# Source code for opdi.monitoring.basic_stats

"""
Basic statistics collection for OPDI tables.

Provides simple row count and basic statistics for monitoring
data pipeline health and completeness.
"""

from typing import List, Dict
from pyspark.sql import SparkSession

from opdi.config import OPDIConfig


class BasicStatsCollector:
    """
    Collects basic statistics for OPDI tables.

    Provides simple row counts, table existence checks and schema lookups
    for monitoring pipeline execution and data availability. Every table
    name passed to a method is qualified with the configured project name
    (``<project>.<table_name>``) before it is used in a SQL statement.
    """

    # Tables inspected when a caller does not supply an explicit list.
    DEFAULT_TABLES = [
        "opdi_flight_list",
        "opdi_flight_events",
        "opdi_measurements",
    ]

    def __init__(self, spark: SparkSession, config: OPDIConfig):
        """
        Initialize basic stats collector.

        Args:
            spark: Active SparkSession
            config: OPDI configuration object; ``config.project.project_name``
                is used as the prefix for every table name.
        """
        self.spark = spark
        self.config = config
        self.project = config.project.project_name

    def get_table_count(self, table_name: str) -> int:
        """
        Get row count for a single table.

        Args:
            table_name: Table name (without project prefix)

        Returns:
            Number of rows in the table

        Raises:
            Exception: Whatever the Spark session raises when the query
                fails (for example because the table doesn't exist).
        """
        full_table_name = f"{self.project}.{table_name}"
        rows = self.spark.sql(
            f"SELECT COUNT(*) AS cnt FROM {full_table_name}"
        ).collect()
        return rows[0]["cnt"]

    def get_all_table_counts(self, tables: List[str] = None) -> Dict[str, int]:
        """
        Get row counts for multiple tables.

        A failure on one table does not abort the run: the error is printed
        and the table is reported with a count of ``-1``.

        Args:
            tables: List of table names. If None, uses DEFAULT_TABLES

        Returns:
            Dictionary mapping table names (without project prefix) to row
            counts, with ``-1`` for tables whose count could not be read.

        Example:
            >>> collector = BasicStatsCollector(spark, config)
            >>> counts = collector.get_all_table_counts()
            >>> print(counts)
            {'opdi_flight_list': 1500000, 'opdi_flight_events': 3000000}
        """
        if tables is None:
            tables = self.DEFAULT_TABLES

        results = {}
        for table in tables:
            try:
                results[table] = self.get_table_count(table)
            except Exception as exc:  # best effort: keep going on failure
                print(f"Error getting count for {table}: {exc}")
                results[table] = -1  # Indicate error
        return results

    def print_summary(self, tables: List[str] = None) -> None:
        """
        Print formatted summary of table counts.

        Each line shows the project-qualified table name followed by its
        row count. Tables whose count cannot be read are shown as ``ERROR``
        with the exception message on the following line.

        Args:
            tables: List of table names. If None, uses DEFAULT_TABLES

        Example:
            >>> collector = BasicStatsCollector(spark, config)
            >>> collector.print_summary()
            Row counts per table
            ------------------------------------------------------------
            <project>.opdi_flight_list                       1,500,000
            <project>.opdi_flight_events                     3,000,000
            <project>.opdi_measurements                      5,000,000
            ------------------------------------------------------------
        """
        if tables is None:
            tables = self.DEFAULT_TABLES

        separator = "-" * 60
        print("\nRow counts per table")
        print(separator)
        for table in tables:
            full_table_name = f"{self.project}.{table}"
            try:
                count = self.get_table_count(table)
            except Exception as exc:  # report the failure and continue
                print(f"{full_table_name:<45} {'ERROR':>12}")
                print(f" └─ {str(exc)}")
            else:
                print(f"{full_table_name:<45} {count:>12,}")
        print(separator)

    def table_exists(self, table_name: str) -> bool:
        """
        Check if a table exists.

        The check issues a ``DESCRIBE`` statement and treats any exception
        as "table does not exist".

        Args:
            table_name: Table name (without project prefix)

        Returns:
            True if the DESCRIBE statement succeeds, False otherwise
        """
        full_table_name = f"{self.project}.{table_name}"
        try:
            self.spark.sql(f"DESCRIBE {full_table_name}")
        except Exception:
            return False
        return True

    def get_table_schema(self, table_name: str) -> List[tuple]:
        """
        Get schema information for a table.

        ``DESCRIBE`` output for partitioned tables contains more than the
        column list: after the real columns Spark appends metadata rows
        (a blank separator and/or ``#``-prefixed section headers such as
        ``# Partition Information``, followed by partition details). Only
        the rows before the first such marker are returned, so the result
        holds each column exactly once.

        Args:
            table_name: Table name (without project prefix)

        Returns:
            List of (column_name, data_type) tuples, in table order

        Raises:
            Exception: Whatever the Spark session raises when the DESCRIBE
                statement fails (for example because the table doesn't
                exist).
        """
        full_table_name = f"{self.project}.{table_name}"
        schema_df = self.spark.sql(f"DESCRIBE {full_table_name}")

        columns = []
        for row in schema_df.collect():
            col_name = row["col_name"]
            marker = (col_name or "").strip()
            # A blank name or a "#" header starts the metadata section;
            # everything from here on is not a column definition.
            if not marker or marker.startswith("#"):
                break
            columns.append((col_name, row["data_type"]))
        return columns