Monitoring – opdi.monitoring

Monitoring and data quality modules for OPDI.

class opdi.monitoring.BasicStatsCollector(spark, config)[source]

Bases: object

Collects basic statistics for OPDI tables.

Provides simple row counts and table existence checks for monitoring pipeline execution and data availability.

DEFAULT_TABLES = ['opdi_flight_list', 'opdi_flight_events', 'opdi_measurements']
__init__(spark, config)[source]

Initialize basic stats collector.

Parameters:
  • spark (pyspark.sql.SparkSession) – Active SparkSession

  • config (OPDIConfig) – OPDI configuration object
get_table_count(table_name)[source]

Get row count for a single table.

Parameters:

table_name (str) – Table name (without project prefix)

Returns:

Number of rows in the table

Raises:

Exception – If table doesn’t exist

Return type:

int
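A minimal sketch of how such a count might be implemented, assuming the collector builds the fully qualified name from a project prefix held in the config object (the standalone `get_table_count` helper and its `project` argument below are hypothetical, for illustration only):

```python
# Hypothetical sketch: count rows via Spark SQL over the fully
# qualified table name ("<project>.<table>"). The `project` argument
# stands in for whatever prefix OPDIConfig supplies.
def get_table_count(spark, project: str, table_name: str) -> int:
    full_name = f"{project}.{table_name}"
    row = spark.sql(f"SELECT COUNT(*) AS n FROM {full_name}").collect()[0]
    return row["n"]
```

Spark itself raises an exception when the table does not exist, which is consistent with the documented `Raises` behaviour.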

get_all_table_counts(tables=None)[source]

Get row counts for multiple tables.

Parameters:

tables (List[str]) – List of table names. If None, uses DEFAULT_TABLES

Returns:

Dictionary mapping table names to row counts

Return type:

Dict[str, int]

Example

>>> collector = BasicStatsCollector(spark, config)
>>> counts = collector.get_all_table_counts()
>>> print(counts)
{'opdi_flight_list': 1500000, 'opdi_flight_events': 3000000}
print_summary(tables=None)[source]

Print formatted summary of table counts.

Parameters:

tables (List[str]) – List of table names. If None, uses DEFAULT_TABLES

Return type:

None

Example

>>> collector = BasicStatsCollector(spark, config)
>>> collector.print_summary()
Row counts per table
opdi_flight_list                      1,500,000
opdi_flight_events                    3,000,000
opdi_measurements                     5,000,000
table_exists(table_name)[source]

Check if a table exists.

Parameters:

table_name (str) – Table name (without project prefix)

Returns:

True if table exists, False otherwise

Return type:

bool
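One plausible implementation uses Spark's catalog API, which accepts a fully qualified name in recent PySpark versions; the standalone helper and `project` argument below are assumptions, not the actual source:

```python
# Hypothetical sketch: delegate the existence check to Spark's catalog
# (spark.catalog.tableExists takes a qualified name in PySpark 3.3+).
def table_exists(spark, project: str, table_name: str) -> bool:
    try:
        return bool(spark.catalog.tableExists(f"{project}.{table_name}"))
    except Exception:
        return False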

get_table_schema(table_name)[source]

Get schema information for a table.

Parameters:

table_name (str) – Table name (without project prefix)

Returns:

List of (column_name, data_type) tuples

Return type:

List[tuple]
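The documented return shape matches what PySpark's `DataFrame.dtypes` already provides, so a sketch could be as simple as the hypothetical helper below (the `project` argument is an assumption):

```python
# Hypothetical sketch: PySpark exposes a table's schema as a list of
# (column_name, data_type) string tuples via DataFrame.dtypes.
def get_table_schema(spark, project: str, table_name: str) -> list[tuple]:
    return spark.table(f"{project}.{table_name}").dtypes
```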

class opdi.monitoring.AdvancedStatsCollector(spark, config, suspicious_threshold=25000000, known_outages=None)[source]

Bases: object

Advanced statistics and data quality monitoring.

Tracks daily row counts, detects anomalies, and cross-references with known outages and MinIO bucket availability.

DEFAULT_KNOWN_OUTAGES = [('2023-01-02 23:00:00', '2023-01-03 10:00:00'), ('2023-01-18 11:00:00', '2023-01-23 07:00:00'), ('2023-06-21 13:00:00', '2023-06-21 22:00:00'), ('2023-11-15 06:00:00', '2023-11-16 08:00:00'), ('2023-11-20 01:00:00', '2023-11-20 03:00:00'), ('2023-12-02 08:00:00', '2023-12-05 03:00:00'), ('2024-05-20 10:00:00', '2024-05-21 05:00:00')]
__init__(spark, config, suspicious_threshold=25000000, known_outages=None)[source]

Initialize advanced stats collector.

Parameters:
  • spark (pyspark.sql.SparkSession) – Active SparkSession

  • config (OPDIConfig) – OPDI configuration object

  • suspicious_threshold (int) – Row count threshold below which data is suspicious

  • known_outages (List[Tuple[str, str]] | None) – List of (start_datetime, end_datetime) outage periods

fetch_daily_row_counts(table_name, date_column='event_time')[source]

Fetch daily row counts from a table.

Leverages partition pruning for efficient scanning.

Parameters:
  • table_name (str) – Table name (without project prefix)

  • date_column (str) – Column to use for date grouping

Returns:

DataFrame with columns date and row_count

Return type:

pandas.DataFrame

Example

>>> collector = AdvancedStatsCollector(spark, config)
>>> df = collector.fetch_daily_row_counts("osn_statevectors_v2")
>>> print(f"Date range: {df['date'].min()} to {df['date'].max()}")
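The per-day aggregation behind this method can be expressed as a single GROUP BY. A hypothetical query builder is sketched below; it assumes `date_column` holds a timestamp and omits whatever partition-pruning predicate the real implementation adds:

```python
# Hypothetical sketch of the daily-count aggregation query:
# group by the calendar date derived from `date_column`.
def build_daily_counts_query(project: str, table_name: str,
                             date_column: str = "event_time") -> str:
    return (
        f"SELECT to_date({date_column}) AS date, COUNT(*) AS row_count "
        f"FROM {project}.{table_name} "
        f"GROUP BY to_date({date_column}) "
        f"ORDER BY date"
    )
```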
fill_missing_dates(df)[source]

Fill in missing dates with 0 row counts.

Parameters:

df (pandas.DataFrame) – DataFrame with date and row_count columns

Returns:

DataFrame with all dates filled in

Return type:

pandas.DataFrame
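In pandas terms, the fill is a reindex over the full daily range. A sketch under the documented assumption of `date` and `row_count` columns (not the actual implementation):

```python
import pandas as pd

# Sketch: reindex the series over every calendar day between the first
# and last observed date; absent dates get a row_count of 0.
def fill_missing_dates(df: pd.DataFrame) -> pd.DataFrame:
    df = df.copy()
    df["date"] = pd.to_datetime(df["date"])
    full_range = pd.date_range(df["date"].min(), df["date"].max(), freq="D")
    return (
        df.set_index("date")
          .reindex(full_range, fill_value=0)
          .rename_axis("date")
          .reset_index()
    )
```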

check_known_outage(date)[source]

Check if a date falls within any known outage period.

Parameters:

date (pandas.Timestamp) – Date to check

Returns:

Outage period string if date is affected, empty string otherwise

Return type:

str
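The overlap test itself is straightforward; a standard-library sketch, assuming outages are `("YYYY-MM-DD HH:MM:SS", "YYYY-MM-DD HH:MM:SS")` tuples as in DEFAULT_KNOWN_OUTAGES:

```python
from datetime import datetime

# Sketch: a date is "affected" if its calendar day overlaps any known
# outage window; return the window as a string, else "".
def check_known_outage(day, outages):
    for start_s, end_s in outages:
        start = datetime.strptime(start_s, "%Y-%m-%d %H:%M:%S").date()
        end = datetime.strptime(end_s, "%Y-%m-%d %H:%M:%S").date()
        if start <= day <= end:
            return f"{start_s} - {end_s}"
    return ""
```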

setup_minio_client()[source]

Set up MinIO client (mc) for accessing OpenSky bucket.

Requires OSN_USERNAME and OSN_KEY environment variables.

Returns:

True if setup successful, False otherwise

Return type:

bool
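A sketch of the preconditions only; the real method additionally registers an `mc` alias for the OpenSky bucket, whose endpoint details are omitted here:

```python
import os
import shutil

# Sketch of the prerequisite checks: the `mc` binary must be on PATH
# and the documented OSN_USERNAME / OSN_KEY variables must be set.
def minio_prerequisites_met() -> bool:
    if shutil.which("mc") is None:
        return False  # MinIO client binary not installed
    return bool(os.environ.get("OSN_USERNAME")) and bool(os.environ.get("OSN_KEY"))
```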

get_minio_files_for_date(date)[source]

Query MinIO bucket for files available on a specific date.

Parameters:

date (pandas.Timestamp) – Date to query

Returns:

Dictionary with ‘file_count’ and ‘hours’ (list of hours with data)

Return type:

Dict[str, Any]

analyze_missing_data(df, check_minio=True)[source]

Analyze missing or suspicious data and add diagnostic columns.

Parameters:
  • df (pandas.DataFrame) – DataFrame with date and row_count columns

  • check_minio (bool) – Whether to check MinIO bucket for file availability

Returns:

DataFrame with the following additional diagnostic columns:

  • is_suspicious: bool

  • known_outage: str

  • minio_file_count: int

  • minio_hours_available: str

  • explanation: str

Return type:

pandas.DataFrame
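As a sketch of the first diagnostic only, flagging days whose count falls below the suspicious threshold is a vectorised comparison; the `flag_suspicious` helper below is hypothetical (the outage and MinIO columns come from the other helpers documented here):

```python
import pandas as pd

# Sketch: mark days whose row_count is below suspicious_threshold.
def flag_suspicious(df: pd.DataFrame,
                    suspicious_threshold: int = 25_000_000) -> pd.DataFrame:
    df = df.copy()
    df["is_suspicious"] = df["row_count"] < suspicious_threshold
    return df
```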

visualize_daily_counts(df, output_path, title='Daily Row Counts - OpenSky Network State Vectors')[source]

Create and save interactive Plotly visualization of daily row counts.

Parameters:
  • df (pandas.DataFrame) – DataFrame with date, row_count, and analysis columns

  • output_path (str) – Path to save the HTML plot

  • title (str) – Chart title

Return type:

None

Example

>>> collector.visualize_daily_counts(
...     df,
...     "daily_counts.html",
...     "Daily Counts - State Vectors"
... )
generate_quality_report(table_name, output_csv, output_html, date_column='event_time', check_minio=True)[source]

Generate complete data quality report.

Parameters:
  • table_name (str) – Table to analyze (without project prefix)

  • output_csv (str) – Path to save CSV report

  • output_html (str) – Path to save HTML visualization

  • date_column (str) – Column to use for date grouping

  • check_minio (bool) – Whether to check MinIO bucket

Returns:

DataFrame with complete analysis

Return type:

pandas.DataFrame

Example

>>> collector = AdvancedStatsCollector(spark, config)
>>> df = collector.generate_quality_report(
...     "osn_statevectors_v2",
...     "daily_counts.csv",
...     "daily_counts.html"
... )
opdi.monitoring.print_table_counts(spark, tables, project='project_opdi')[source]

Standalone function to print table counts.

Parameters:
  • spark (pyspark.sql.SparkSession) – Active SparkSession

  • tables (List[str]) – List of fully qualified table names (e.g., “project_opdi.opdi_flight_list”)

  • project (str) – Project name (default: “project_opdi”)

Return type:

None

Example

>>> from opdi.monitoring.basic_stats import print_table_counts
>>> print_table_counts(spark, ["project_opdi.opdi_flight_list"])

Basic Statistics – opdi.monitoring.basic_stats

Basic statistics collection for OPDI tables.

Provides simple row count and basic statistics for monitoring data pipeline health and completeness.


Advanced Statistics – opdi.monitoring.advanced_stats

Advanced statistics and data quality monitoring for OPDI.

Provides comprehensive data quality analysis including:

  • Daily row count trends

  • Anomaly detection (suspicious low counts)

  • Known outage tracking

  • MinIO bucket availability checks

  • Interactive Plotly visualizations
