Monitoring – opdi.monitoring¶
Monitoring and data quality modules for OPDI.
- class opdi.monitoring.BasicStatsCollector(spark, config)[source]¶
Bases: object
Collects basic statistics for OPDI tables.
Provides simple row counts and table existence checks for monitoring pipeline execution and data availability.
- Parameters:
spark (pyspark.sql.SparkSession)
config (OPDIConfig)
- DEFAULT_TABLES = ['opdi_flight_list', 'opdi_flight_events', 'opdi_measurements']¶
- __init__(spark, config)[source]¶
Initialize basic stats collector.
- Parameters:
spark (pyspark.sql.SparkSession) – Active SparkSession
config (OPDIConfig) – OPDI configuration object
- get_all_table_counts(tables=None)[source]¶
Get row counts for multiple tables.
- Parameters:
tables (List[str]) – List of table names. If None, uses DEFAULT_TABLES
- Returns:
Dictionary mapping table names to row counts
- Return type:
Dict[str, int]
Example
>>> collector = BasicStatsCollector(spark, config)
>>> counts = collector.get_all_table_counts()
>>> print(counts)
{'opdi_flight_list': 1500000, 'opdi_flight_events': 3000000}
- print_summary(tables=None)[source]¶
Print formatted summary of table counts.
- Parameters:
tables (List[str]) – List of table names. If None, uses DEFAULT_TABLES
- Return type:
None
Example
>>> collector = BasicStatsCollector(spark, config)
>>> collector.print_summary()
Row counts per table
opdi_flight_list      1,500,000
opdi_flight_events    3,000,000
opdi_measurements     5,000,000
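The summary format above can be reproduced with a plain Python loop over a name-to-count mapping. This is a Spark-free sketch: the table names and counts come from the example output, while the helper name `format_summary` is ours, not part of the library.

```python
# Hypothetical counts mirroring the print_summary() example output.
counts = {
    "opdi_flight_list": 1_500_000,
    "opdi_flight_events": 3_000_000,
    "opdi_measurements": 5_000_000,
}

def format_summary(counts):
    """Render a 'Row counts per table' summary with thousands separators."""
    lines = ["Row counts per table"]
    for name, n in counts.items():
        # Left-align the table name, right-align the comma-grouped count.
        lines.append(f"{name:<25} {n:>12,}")
    return "\n".join(lines)

print(format_summary(counts))
```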
- class opdi.monitoring.AdvancedStatsCollector(spark, config, suspicious_threshold=25000000, known_outages=None)[source]¶
Bases: object
Advanced statistics and data quality monitoring.
Tracks daily row counts, detects anomalies, and cross-references with known outages and MinIO bucket availability.
- Parameters:
spark (pyspark.sql.SparkSession)
config (OPDIConfig)
suspicious_threshold (int)
- DEFAULT_KNOWN_OUTAGES = [('2023-01-02 23:00:00', '2023-01-03 10:00:00'), ('2023-01-18 11:00:00', '2023-01-23 07:00:00'), ('2023-06-21 13:00:00', '2023-06-21 22:00:00'), ('2023-11-15 06:00:00', '2023-11-16 08:00:00'), ('2023-11-20 01:00:00', '2023-11-20 03:00:00'), ('2023-12-02 08:00:00', '2023-12-05 03:00:00'), ('2024-05-20 10:00:00', '2024-05-21 05:00:00')]¶
- __init__(spark, config, suspicious_threshold=25000000, known_outages=None)[source]¶
Initialize advanced stats collector.
- Parameters:
spark (pyspark.sql.SparkSession) – Active SparkSession
config (OPDIConfig) – OPDI configuration object
suspicious_threshold (int) – Row count threshold below which data is suspicious
known_outages (List[Tuple[str, str]] | None) – List of (start_datetime, end_datetime) outage periods
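The role of suspicious_threshold can be sketched without Spark: a day is flagged as suspicious when its row count falls below the threshold. The daily counts below are illustrative, not real data; only the default threshold value is taken from the signature above.

```python
SUSPICIOUS_THRESHOLD = 25_000_000  # default from the constructor above

daily_counts = {               # illustrative daily row counts
    "2023-01-02": 4_100_000,   # partial day
    "2023-01-04": 31_000_000,  # normal day
    "2023-01-05": 0,           # missing day
}

# Flag every day whose count falls below the threshold.
suspicious_days = {d: n for d, n in daily_counts.items() if n < SUSPICIOUS_THRESHOLD}
print(sorted(suspicious_days))
```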
- fetch_daily_row_counts(table_name, date_column='event_time')[source]¶
Fetch daily row counts from a table.
Leverages partition pruning for efficient scanning.
- Parameters:
table_name (str) – Name of the table to scan
date_column (str) – Column used to derive the daily date (default: "event_time")
- Returns:
DataFrame with columns date, row_count
- Return type:
pandas.DataFrame
Example
>>> collector = AdvancedStatsCollector(spark, config)
>>> df = collector.fetch_daily_row_counts("osn_statevectors_v2")
>>> print(f"Date range: {df['date'].min()} to {df['date'].max()}")
- fill_missing_dates(df)[source]¶
Fill in missing dates with 0 row counts.
- Parameters:
df (pandas.DataFrame) – DataFrame with date and row_count columns
- Returns:
DataFrame with all dates filled in
- Return type:
pandas.DataFrame
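The gap-filling idea can be sketched with the standard library alone: walk the full date range between the first and last observed day and default absent days to 0. The input mapping here is illustrative; the real method operates on a pandas DataFrame.

```python
from datetime import date, timedelta

counts = {date(2023, 1, 1): 10, date(2023, 1, 4): 7}  # illustrative, with a gap

# Walk the full date range and default absent days to a row count of 0.
filled = {}
d, end = min(counts), max(counts)
while d <= end:
    filled[d] = counts.get(d, 0)
    d += timedelta(days=1)

print(filled)
```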
- check_known_outage(date)[source]¶
Check if a date falls within any known outage period.
- Parameters:
date (pandas.Timestamp) – Date to check
- Returns:
Outage period string if date is affected, empty string otherwise
- Return type:
str
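A minimal, self-contained sketch of this interval check, reusing the first DEFAULT_KNOWN_OUTAGES entry shown above. The exact string format of the returned period is an assumption; the library may render it differently.

```python
from datetime import datetime

# First entry of DEFAULT_KNOWN_OUTAGES, shown above.
OUTAGES = [("2023-01-02 23:00:00", "2023-01-03 10:00:00")]

def check_known_outage(ts: str) -> str:
    """Return the matching outage period, or '' when no period covers ts."""
    t = datetime.fromisoformat(ts)
    for start, end in OUTAGES:
        if datetime.fromisoformat(start) <= t <= datetime.fromisoformat(end):
            return f"{start} - {end}"
    return ""

print(check_known_outage("2023-01-03 05:00:00"))
```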
- setup_minio_client()[source]¶
Set up MinIO client (mc) for accessing OpenSky bucket.
Requires OSN_USERNAME and OSN_KEY environment variables.
- Returns:
True if setup successful, False otherwise
- Return type:
bool
- get_minio_files_for_date(date)[source]¶
Query MinIO bucket for files available on a specific date.
- Parameters:
date (pandas.Timestamp) – Date to query
- Returns:
Dictionary with ‘file_count’ and ‘hours’ (list of hours with data)
- Return type:
Dict[str, Any]
- analyze_missing_data(df, check_minio=True)[source]¶
Analyze missing or suspicious data and add diagnostic columns.
- Parameters:
df (pandas.DataFrame) – DataFrame with date and row_count columns
check_minio (bool) – Whether to check MinIO bucket for file availability
- Returns:
DataFrame with additional diagnostic columns:
is_suspicious: bool
known_outage: str
minio_file_count: int
minio_hours_available: str
explanation: str
- Return type:
pandas.DataFrame
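One plausible way the diagnostic signals could combine into the explanation column: suspicious days are first attributed to a known outage, then to missing source files, and otherwise left as unexplained. The decision order and wording are illustrative, not taken from the library.

```python
def explain(row_count, threshold, outage, minio_file_count):
    """Combine diagnostic signals into one human-readable explanation."""
    if row_count >= threshold:
        return ""                                   # nothing to explain
    if outage:
        return f"known outage: {outage}"            # covered by an outage window
    if minio_file_count == 0:
        return "no source files in MinIO bucket"    # upstream data absent
    return "suspicious low count, source files present"

print(explain(0, 25_000_000, "", 0))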
- visualize_daily_counts(df, output_path, title='Daily Row Counts - OpenSky Network State Vectors')[source]¶
Create and save interactive Plotly visualization of daily row counts.
- Parameters:
df (pandas.DataFrame) – DataFrame with date, row_count, and analysis columns
output_path (str) – Path to save the HTML plot
title (str) – Chart title
- Return type:
None
Example
>>> collector.visualize_daily_counts(
...     df,
...     "daily_counts.html",
...     "Daily Counts - State Vectors"
... )
- generate_quality_report(table_name, output_csv, output_html, date_column='event_time', check_minio=True)[source]¶
Generate complete data quality report.
- Parameters:
table_name (str) – Table to analyze
output_csv (str) – Path for the CSV output
output_html (str) – Path for the HTML plot
date_column (str) – Column used to derive the daily date (default: "event_time")
check_minio (bool) – Whether to check MinIO bucket for file availability
- Returns:
DataFrame with complete analysis
- Return type:
pandas.DataFrame
Example
>>> collector = AdvancedStatsCollector(spark, config)
>>> df = collector.generate_quality_report(
...     "osn_statevectors_v2",
...     "daily_counts.csv",
...     "daily_counts.html"
... )
- opdi.monitoring.print_table_counts(spark, tables, project='project_opdi')[source]¶
Standalone function to print table counts.
- Parameters:
spark (pyspark.sql.SparkSession) – Active SparkSession
tables (List[str]) – List of fully qualified table names (e.g., “project_opdi.opdi_flight_list”)
project (str) – Project name (default: “project_opdi”)
- Return type:
None
Example
>>> from opdi.monitoring.basic_stats import print_table_counts
>>> print_table_counts(spark, ["project_opdi.opdi_flight_list"])
Basic Statistics – opdi.monitoring.basic_stats¶
Basic statistics collection for OPDI tables.
Provides simple row count and basic statistics for monitoring data pipeline health and completeness.
- class opdi.monitoring.basic_stats.BasicStatsCollector(spark, config)[source]¶
Bases: object
Collects basic statistics for OPDI tables.
Provides simple row counts and table existence checks for monitoring pipeline execution and data availability.
- Parameters:
spark (pyspark.sql.SparkSession)
config (OPDIConfig)
- DEFAULT_TABLES = ['opdi_flight_list', 'opdi_flight_events', 'opdi_measurements']¶
- __init__(spark, config)[source]¶
Initialize basic stats collector.
- Parameters:
spark (pyspark.sql.SparkSession) – Active SparkSession
config (OPDIConfig) – OPDI configuration object
- get_all_table_counts(tables=None)[source]¶
Get row counts for multiple tables.
- Parameters:
tables (List[str]) – List of table names. If None, uses DEFAULT_TABLES
- Returns:
Dictionary mapping table names to row counts
- Return type:
Dict[str, int]
Example
>>> collector = BasicStatsCollector(spark, config)
>>> counts = collector.get_all_table_counts()
>>> print(counts)
{'opdi_flight_list': 1500000, 'opdi_flight_events': 3000000}
- print_summary(tables=None)[source]¶
Print formatted summary of table counts.
- Parameters:
tables (List[str]) – List of table names. If None, uses DEFAULT_TABLES
- Return type:
None
Example
>>> collector = BasicStatsCollector(spark, config)
>>> collector.print_summary()
Row counts per table
opdi_flight_list      1,500,000
opdi_flight_events    3,000,000
opdi_measurements     5,000,000
- opdi.monitoring.basic_stats.print_table_counts(spark, tables, project='project_opdi')[source]¶
Standalone function to print table counts.
- Parameters:
spark (pyspark.sql.SparkSession) – Active SparkSession
tables (List[str]) – List of fully qualified table names (e.g., “project_opdi.opdi_flight_list”)
project (str) – Project name (default: “project_opdi”)
- Return type:
None
Example
>>> from opdi.monitoring.basic_stats import print_table_counts
>>> print_table_counts(spark, ["project_opdi.opdi_flight_list"])
Advanced Statistics – opdi.monitoring.advanced_stats¶
Advanced statistics and data quality monitoring for OPDI.
Provides comprehensive data quality analysis including:
- Daily row count trends
- Anomaly detection (suspicious low counts)
- Known outage tracking
- MinIO bucket availability checks
- Interactive Plotly visualizations
- class opdi.monitoring.advanced_stats.AdvancedStatsCollector(spark, config, suspicious_threshold=25000000, known_outages=None)[source]¶
Bases: object
Advanced statistics and data quality monitoring.
Tracks daily row counts, detects anomalies, and cross-references with known outages and MinIO bucket availability.
- Parameters:
spark (pyspark.sql.SparkSession)
config (OPDIConfig)
suspicious_threshold (int)
- DEFAULT_KNOWN_OUTAGES = [('2023-01-02 23:00:00', '2023-01-03 10:00:00'), ('2023-01-18 11:00:00', '2023-01-23 07:00:00'), ('2023-06-21 13:00:00', '2023-06-21 22:00:00'), ('2023-11-15 06:00:00', '2023-11-16 08:00:00'), ('2023-11-20 01:00:00', '2023-11-20 03:00:00'), ('2023-12-02 08:00:00', '2023-12-05 03:00:00'), ('2024-05-20 10:00:00', '2024-05-21 05:00:00')]¶
- __init__(spark, config, suspicious_threshold=25000000, known_outages=None)[source]¶
Initialize advanced stats collector.
- Parameters:
spark (pyspark.sql.SparkSession) – Active SparkSession
config (OPDIConfig) – OPDI configuration object
suspicious_threshold (int) – Row count threshold below which data is suspicious
known_outages (List[Tuple[str, str]] | None) – List of (start_datetime, end_datetime) outage periods
- fetch_daily_row_counts(table_name, date_column='event_time')[source]¶
Fetch daily row counts from a table.
Leverages partition pruning for efficient scanning.
- Parameters:
table_name (str) – Name of the table to scan
date_column (str) – Column used to derive the daily date (default: "event_time")
- Returns:
DataFrame with columns date, row_count
- Return type:
pandas.DataFrame
Example
>>> collector = AdvancedStatsCollector(spark, config)
>>> df = collector.fetch_daily_row_counts("osn_statevectors_v2")
>>> print(f"Date range: {df['date'].min()} to {df['date'].max()}")
- fill_missing_dates(df)[source]¶
Fill in missing dates with 0 row counts.
- Parameters:
df (pandas.DataFrame) – DataFrame with date and row_count columns
- Returns:
DataFrame with all dates filled in
- Return type:
pandas.DataFrame
- check_known_outage(date)[source]¶
Check if a date falls within any known outage period.
- Parameters:
date (pandas.Timestamp) – Date to check
- Returns:
Outage period string if date is affected, empty string otherwise
- Return type:
str
- setup_minio_client()[source]¶
Set up MinIO client (mc) for accessing OpenSky bucket.
Requires OSN_USERNAME and OSN_KEY environment variables.
- Returns:
True if setup successful, False otherwise
- Return type:
bool
- get_minio_files_for_date(date)[source]¶
Query MinIO bucket for files available on a specific date.
- Parameters:
date (pandas.Timestamp) – Date to query
- Returns:
Dictionary with ‘file_count’ and ‘hours’ (list of hours with data)
- Return type:
Dict[str, Any]
- analyze_missing_data(df, check_minio=True)[source]¶
Analyze missing or suspicious data and add diagnostic columns.
- Parameters:
df (pandas.DataFrame) – DataFrame with date and row_count columns
check_minio (bool) – Whether to check MinIO bucket for file availability
- Returns:
DataFrame with additional diagnostic columns:
is_suspicious: bool
known_outage: str
minio_file_count: int
minio_hours_available: str
explanation: str
- Return type:
pandas.DataFrame
- visualize_daily_counts(df, output_path, title='Daily Row Counts - OpenSky Network State Vectors')[source]¶
Create and save interactive Plotly visualization of daily row counts.
- Parameters:
df (pandas.DataFrame) – DataFrame with date, row_count, and analysis columns
output_path (str) – Path to save the HTML plot
title (str) – Chart title
- Return type:
None
Example
>>> collector.visualize_daily_counts(
...     df,
...     "daily_counts.html",
...     "Daily Counts - State Vectors"
... )
- generate_quality_report(table_name, output_csv, output_html, date_column='event_time', check_minio=True)[source]¶
Generate complete data quality report.
- Parameters:
table_name (str) – Table to analyze
output_csv (str) – Path for the CSV output
output_html (str) – Path for the HTML plot
date_column (str) – Column used to derive the daily date (default: "event_time")
check_minio (bool) – Whether to check MinIO bucket for file availability
- Returns:
DataFrame with complete analysis
- Return type:
pandas.DataFrame
Example
>>> collector = AdvancedStatsCollector(spark, config)
>>> df = collector.generate_quality_report(
...     "osn_statevectors_v2",
...     "daily_counts.csv",
...     "daily_counts.html"
... )