Output – opdi.output¶
Output modules for exporting OPDI data.
- class opdi.output.CSVExporter(input_dir, output_dir, size_threshold_mb=200)[source]¶
Bases: object
Exports parquet files to CSV.gz format with deduplication.
This class handles the final data cleanup step, removing duplicate rows and exporting data in both parquet and compressed CSV formats.
- clean_and_export_all()[source]¶
Clean and export all parquet files in input directory.
- Returns:
Dictionary with keys files_processed, files_skipped, total_original_rows, total_cleaned_rows, and duplicates_removed.
- Return type:
dict
Example
>>> exporter = CSVExporter(
...     "data/measurements",
...     "data/measurements_clean"
... )
>>> stats = exporter.clean_and_export_all()
>>> print(f"Processed {stats['files_processed']} files")
>>> print(f"Removed {stats['duplicates_removed']} duplicates")
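The deduplication-and-export step can be sketched with the standard library alone. The helper names below (dedupe_rows, write_csv_gz) are illustrative, not part of opdi, and the parquet-reading step is omitted; this shows only the row-level cleanup the class description refers to.

```python
import csv
import gzip

def dedupe_rows(rows):
    """Drop exact duplicate rows, keeping the first occurrence in order."""
    seen = set()
    unique = []
    for row in rows:
        key = tuple(row)
        if key not in seen:
            seen.add(key)
            unique.append(row)
    return unique

def write_csv_gz(path, header, rows):
    """Write header + rows to a gzip-compressed CSV file."""
    with gzip.open(path, "wt", newline="") as fh:
        writer = csv.writer(fh)
        writer.writerow(header)
        writer.writerows(rows)
```

The duplicates_removed statistic reported by clean_and_export_all would then be len(rows) - len(dedupe_rows(rows)) per file.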
- class opdi.output.ParquetExporter(spark, config, output_dir, interval_days=10)[source]¶
Bases: object
Exports OPDI tables to parquet files.
Supports different interval strategies:
- Monthly intervals for flight lists
- N-day intervals for events and measurements
- Parameters:
spark (pyspark.sql.SparkSession)
config (OPDIConfig)
output_dir (str)
interval_days (int)
- __init__(spark, config, output_dir, interval_days=10)[source]¶
Initialize parquet exporter.
- Parameters:
spark (pyspark.sql.SparkSession) – Active SparkSession
config (OPDIConfig) – OPDI configuration object
output_dir (str) – Base directory for output files
interval_days (int) – Interval size in days for events/measurements
- static safe_to_pandas(df)[source]¶
Safely convert Spark DataFrame to pandas by casting timestamps to strings.
- Parameters:
df (pyspark.sql.DataFrame) – PySpark DataFrame
- Returns:
pandas DataFrame
- Return type:
pandas.DataFrame
Example
>>> pdf = ParquetExporter.safe_to_pandas(spark_df)
- generate_intervals(start_date, end_date, step_days)[source]¶
Generate rolling date intervals of fixed length.
- Parameters:
start_date (date) – Interval generation start date
end_date (date) – Interval generation end date
step_days (int) – Length of each interval in days
- Returns:
List of (start_date, end_date) tuples
- Return type:
list[tuple[date, date]]
Example
>>> exporter = ParquetExporter(spark, config, "/data/exports")
>>> intervals = exporter.generate_intervals(
...     date(2024, 1, 1),
...     date(2024, 1, 31),
...     10
... )
>>> print(len(intervals))  # 4 intervals
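The rolling-interval logic can be sketched in plain datetime arithmetic. This is an illustration consistent with the 4-interval count in the example, not the package's actual implementation; in particular, how the real method handles the final partial window may differ (here the last window may extend past end_date).

```python
from datetime import date, timedelta

def generate_intervals(start_date, end_date, step_days):
    """Yield consecutive (start, end) windows of step_days length."""
    intervals = []
    step = timedelta(days=step_days)
    current = start_date
    # A window is opened for every start date up to and including end_date,
    # which yields 4 windows for Jan 1 .. Jan 31 with a 10-day step.
    while current <= end_date:
        intervals.append((current, current + step))
        current += step
    return intervals
```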
- generate_month_intervals(start_date, end_date)[source]¶
Generate full-month intervals.
The first interval begins on the first day of start_date’s month. The last interval ends on the first day of the month after the one containing end_date, or at end_date itself if end_date falls exactly on a month boundary.
- Parameters:
start_date (date) – Interval generation start date
end_date (date) – Interval generation end date
- Returns:
List of (month_start, month_end) tuples
- Return type:
list[tuple[date, date]]
Example
>>> intervals = exporter.generate_month_intervals(
...     date(2024, 1, 15),
...     date(2024, 3, 10)
... )
>>> # Returns: [(2024-01-01, 2024-02-01), (2024-02-01, 2024-03-01), (2024-03-01, 2024-04-01)]
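The month-interval logic can likewise be sketched with plain dates; an illustration consistent with the example above (helper names are not part of opdi):

```python
from datetime import date

def first_of_month(d):
    """First day of d's month."""
    return date(d.year, d.month, 1)

def next_month(d):
    """First day of the month after d's month."""
    if d.month == 12:
        return date(d.year + 1, 1, 1)
    return date(d.year, d.month + 1, 1)

def generate_month_intervals(start_date, end_date):
    """Full (month_start, next_month_start) windows covering the range."""
    intervals = []
    cur = first_of_month(start_date)
    # Stop once cur reaches end_date, so an end_date exactly on a month
    # boundary closes the range rather than opening a new month window.
    while cur < end_date:
        intervals.append((cur, next_month(cur)))
        cur = next_month(cur)
    return intervals
```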
- export_flight_list(month_start, month_end, version='v0.0.2', overwrite=False)[source]¶
Export OPDI flight list for a monthly interval.
- Files are written to:
{output_dir}/flight_list/flight_list_YYYYMM.parquet
- Parameters:
month_start (date) – First day of the month to export
month_end (date) – First day of the following month
version (str) – Version string to add to records
overwrite (bool) – Whether to overwrite existing files
- Returns:
Path to output file if created, None if skipped
- Return type:
str | None
Example
>>> exporter.export_flight_list(
...     date(2024, 1, 1),
...     date(2024, 2, 1),
...     version="v0.0.2"
... )
- export_flight_events(start_date, end_date, version='v0.0.2', overwrite=False)[source]¶
Export OPDI flight events for a date interval.
- Files are written to:
{output_dir}/flight_events/flight_events_YYYYMMDD_YYYYMMDD.parquet
- export_measurements(start_date, end_date, version='v0.0.2', overwrite=False)[source]¶
Export OPDI measurements for a date interval.
- Files are written to:
{output_dir}/measurements/measurements_YYYYMMDD_YYYYMMDD.parquet
- export_all(start_date, end_date, export_flight_list=True, export_flight_events=True, export_measurements=True, version='v0.0.2', overwrite=False, last_n_months=None)[source]¶
Export all OPDI tables for a date range.
- Parameters:
start_date (date) – Export start date
end_date (date) – Export end date
export_flight_list (bool) – Whether to export flight list
export_flight_events (bool) – Whether to export flight events
export_measurements (bool) – Whether to export measurements
version (str) – Version string to add to records
overwrite (bool) – Whether to overwrite existing files
last_n_months (int | None) – If set, only export last N months
- Returns:
Dictionary with keys flight_list_files, event_files, and measurement_files.
- Return type:
dict
Example
>>> from datetime import date
>>> exporter = ParquetExporter(spark, config, "/data/exports")
>>> stats = exporter.export_all(
...     date(2024, 1, 1),
...     date(2024, 3, 1),
...     last_n_months=2
... )
>>> print(f"Exported {stats['flight_list_files']} flight list files")
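The last_n_months option presumably trims the start of the export range to the first day of the month N months before end_date; the month arithmetic that requires can be sketched as follows (the helper name is hypothetical, not part of opdi):

```python
from datetime import date

def last_n_months_start(end_date, n_months):
    """First day of the month n_months before end_date's month."""
    m = end_date.month - n_months
    year = end_date.year + (m - 1) // 12   # floor division handles year rollover
    month = (m - 1) % 12 + 1               # wrap month back into 1..12
    return date(year, month, 1)
```

With end_date=date(2024, 3, 1) and last_n_months=2 as in the example, the effective start would be max(start_date, last_n_months_start(end_date, 2)), i.e. 2024-01-01.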
- opdi.output.clean_and_save_data(input_dir, output_dir, size_threshold_mb=200)[source]¶
Standalone function to clean and export parquet files.
- Parameters:
input_dir (str)
output_dir (str)
size_threshold_mb (int)
- Return type:
None
Example
>>> from opdi.output.csv_exporter import clean_and_save_data
>>> clean_and_save_data("data/measurements", "data/measurements_clean")
Parquet Exporter – opdi.output.parquet_exporter¶
Parquet export module for OPDI data extraction.
Provides functionality to export OPDI tables (flight list, events, measurements) to parquet files with configurable time intervals.
- class opdi.output.parquet_exporter.ParquetExporter(spark, config, output_dir, interval_days=10)[source]¶
Bases: object
Exports OPDI tables to parquet files.
Supports different interval strategies:
- Monthly intervals for flight lists
- N-day intervals for events and measurements
- Parameters:
spark (pyspark.sql.SparkSession)
config (OPDIConfig)
output_dir (str)
interval_days (int)
- __init__(spark, config, output_dir, interval_days=10)[source]¶
Initialize parquet exporter.
- Parameters:
spark (pyspark.sql.SparkSession) – Active SparkSession
config (OPDIConfig) – OPDI configuration object
output_dir (str) – Base directory for output files
interval_days (int) – Interval size in days for events/measurements
- static safe_to_pandas(df)[source]¶
Safely convert Spark DataFrame to pandas by casting timestamps to strings.
- Parameters:
df (pyspark.sql.DataFrame) – PySpark DataFrame
- Returns:
pandas DataFrame
- Return type:
pandas.DataFrame
Example
>>> pdf = ParquetExporter.safe_to_pandas(spark_df)
- generate_intervals(start_date, end_date, step_days)[source]¶
Generate rolling date intervals of fixed length.
- Parameters:
start_date (date) – Interval generation start date
end_date (date) – Interval generation end date
step_days (int) – Length of each interval in days
- Returns:
List of (start_date, end_date) tuples
- Return type:
list[tuple[date, date]]
Example
>>> exporter = ParquetExporter(spark, config, "/data/exports")
>>> intervals = exporter.generate_intervals(
...     date(2024, 1, 1),
...     date(2024, 1, 31),
...     10
... )
>>> print(len(intervals))  # 4 intervals
- generate_month_intervals(start_date, end_date)[source]¶
Generate full-month intervals.
The first interval begins on the first day of start_date’s month. The last interval ends on the first day of the month after the one containing end_date, or at end_date itself if end_date falls exactly on a month boundary.
- Parameters:
start_date (date) – Interval generation start date
end_date (date) – Interval generation end date
- Returns:
List of (month_start, month_end) tuples
- Return type:
list[tuple[date, date]]
Example
>>> intervals = exporter.generate_month_intervals(
...     date(2024, 1, 15),
...     date(2024, 3, 10)
... )
>>> # Returns: [(2024-01-01, 2024-02-01), (2024-02-01, 2024-03-01), (2024-03-01, 2024-04-01)]
- export_flight_list(month_start, month_end, version='v0.0.2', overwrite=False)[source]¶
Export OPDI flight list for a monthly interval.
- Files are written to:
{output_dir}/flight_list/flight_list_YYYYMM.parquet
- Parameters:
month_start (date) – First day of the month to export
month_end (date) – First day of the following month
version (str) – Version string to add to records
overwrite (bool) – Whether to overwrite existing files
- Returns:
Path to output file if created, None if skipped
- Return type:
str | None
Example
>>> exporter.export_flight_list(
...     date(2024, 1, 1),
...     date(2024, 2, 1),
...     version="v0.0.2"
... )
- export_flight_events(start_date, end_date, version='v0.0.2', overwrite=False)[source]¶
Export OPDI flight events for a date interval.
- Files are written to:
{output_dir}/flight_events/flight_events_YYYYMMDD_YYYYMMDD.parquet
- export_measurements(start_date, end_date, version='v0.0.2', overwrite=False)[source]¶
Export OPDI measurements for a date interval.
- Files are written to:
{output_dir}/measurements/measurements_YYYYMMDD_YYYYMMDD.parquet
- export_all(start_date, end_date, export_flight_list=True, export_flight_events=True, export_measurements=True, version='v0.0.2', overwrite=False, last_n_months=None)[source]¶
Export all OPDI tables for a date range.
- Parameters:
start_date (date) – Export start date
end_date (date) – Export end date
export_flight_list (bool) – Whether to export flight list
export_flight_events (bool) – Whether to export flight events
export_measurements (bool) – Whether to export measurements
version (str) – Version string to add to records
overwrite (bool) – Whether to overwrite existing files
last_n_months (int | None) – If set, only export last N months
- Returns:
Dictionary with keys flight_list_files, event_files, and measurement_files.
- Return type:
dict
Example
>>> from datetime import date
>>> exporter = ParquetExporter(spark, config, "/data/exports")
>>> stats = exporter.export_all(
...     date(2024, 1, 1),
...     date(2024, 3, 1),
...     last_n_months=2
... )
>>> print(f"Exported {stats['flight_list_files']} flight list files")
CSV Exporter – opdi.output.csv_exporter¶
CSV export and data cleanup module.
Provides deduplication and export functionality for converting parquet files to compressed CSV format.
- class opdi.output.csv_exporter.CSVExporter(input_dir, output_dir, size_threshold_mb=200)[source]¶
Bases: object
Exports parquet files to CSV.gz format with deduplication.
This class handles the final data cleanup step, removing duplicate rows and exporting data in both parquet and compressed CSV formats.
- clean_and_export_all()[source]¶
Clean and export all parquet files in input directory.
- Returns:
Dictionary with keys files_processed, files_skipped, total_original_rows, total_cleaned_rows, and duplicates_removed.
- Return type:
dict
Example
>>> exporter = CSVExporter(
...     "data/measurements",
...     "data/measurements_clean"
... )
>>> stats = exporter.clean_and_export_all()
>>> print(f"Processed {stats['files_processed']} files")
>>> print(f"Removed {stats['duplicates_removed']} duplicates")
- opdi.output.csv_exporter.clean_and_save_data(input_dir, output_dir, size_threshold_mb=200)[source]¶
Standalone function to clean and export parquet files.
- Parameters:
input_dir (str)
output_dir (str)
size_threshold_mb (int)
- Return type:
None
Example
>>> from opdi.output.csv_exporter import clean_and_save_data >>> clean_and_save_data("data/measurements", "data/measurements_clean")