Output – opdi.output

Output modules for exporting OPDI data.

class opdi.output.CSVExporter(input_dir, output_dir, size_threshold_mb=200)[source]

Bases: object

Exports parquet files to CSV.gz format with deduplication.

This class handles the final data cleanup step, removing duplicate rows and exporting data in both parquet and compressed CSV formats.

Parameters:
  • input_dir (str)

  • output_dir (str)

  • size_threshold_mb (int)

__init__(input_dir, output_dir, size_threshold_mb=200)[source]

Initialize CSV exporter.

Parameters:
  • input_dir (str) – Directory containing input parquet files

  • output_dir (str) – Directory for output files

  • size_threshold_mb (int) – Skip files larger than this size (MB)

clean_and_export_file(input_file)[source]

Clean and export a single parquet file.

Parameters:

input_file (str) – Path to input parquet file

Returns:

Tuple of (original_rows, cleaned_rows)

Return type:

tuple[int, int]
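The (original_rows, cleaned_rows) return value can be illustrated with a small sketch. The exact duplicate-detection rule (whole-row equality, first occurrence kept) is an assumption about the implementation, and dedup_rows is a hypothetical helper, not part of the API:

```python
def dedup_rows(rows: list[tuple]) -> tuple[int, int]:
    """Drop exact duplicate rows, keeping first occurrences.

    Returns (original_rows, cleaned_rows), mirroring the tuple
    returned by clean_and_export_file.
    """
    seen = set()
    cleaned = []
    for row in rows:
        if row not in seen:
            seen.add(row)
            cleaned.append(row)
    return len(rows), len(cleaned)

rows = [("AB123", "takeoff"), ("AB123", "takeoff"), ("CD456", "landing")]
print(dedup_rows(rows))  # (3, 2)
```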

clean_and_export_all()[source]

Clean and export all parquet files in input directory.

Returns:

Dictionary with keys files_processed, files_skipped, total_original_rows, total_cleaned_rows, and duplicates_removed.

Return type:

dict

Example

>>> exporter = CSVExporter(
...     "data/measurements",
...     "data/measurements_clean"
... )
>>> stats = exporter.clean_and_export_all()
>>> print(f"Processed {stats['files_processed']} files")
>>> print(f"Removed {stats['duplicates_removed']} duplicates")
class opdi.output.ParquetExporter(spark, config, output_dir, interval_days=10)[source]

Bases: object

Exports OPDI tables to parquet files.

Supports different interval strategies:

  • Monthly intervals for flight lists

  • N-day intervals for events and measurements

Parameters:
  • spark (pyspark.sql.SparkSession)

  • config (OPDIConfig)

  • output_dir (str)

  • interval_days (int)

__init__(spark, config, output_dir, interval_days=10)[source]

Initialize parquet exporter.

Parameters:
  • spark (pyspark.sql.SparkSession) – Active SparkSession

  • config (OPDIConfig) – OPDI configuration object

  • output_dir (str) – Base directory for output files

  • interval_days (int) – Interval size in days for events/measurements

static safe_to_pandas(df)[source]

Safely convert Spark DataFrame to pandas by casting timestamps to strings.

Parameters:

df (pyspark.sql.DataFrame) – PySpark DataFrame

Returns:

pandas DataFrame

Return type:

pandas.DataFrame

Example

>>> pdf = ParquetExporter.safe_to_pandas(spark_df)
generate_intervals(start_date, end_date, step_days)[source]

Generate rolling date intervals of fixed length.

Parameters:
  • start_date (date) – Interval start

  • end_date (date) – Interval end

  • step_days (int) – Step size in days

Returns:

List of (start_date, end_date) tuples

Return type:

List[Tuple[date, date]]

Example

>>> exporter = ParquetExporter(spark, config, "/data/exports")
>>> intervals = exporter.generate_intervals(
...     date(2024, 1, 1),
...     date(2024, 1, 31),
...     10
... )
>>> print(len(intervals))  # 3 intervals
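The rolling-interval logic can be sketched in pure Python as follows. Clamping the final interval to end_date is an assumption about the implementation:

```python
from datetime import date, timedelta

def generate_intervals(start_date: date, end_date: date, step_days: int):
    """Return consecutive (start, end) tuples of at most step_days days."""
    intervals = []
    cur = start_date
    step = timedelta(days=step_days)
    while cur < end_date:
        nxt = min(cur + step, end_date)  # clamp the last interval
        intervals.append((cur, nxt))
        cur = nxt
    return intervals

# A 30-day range with 10-day steps yields exactly 3 intervals.
print(len(generate_intervals(date(2024, 1, 1), date(2024, 1, 31), 10)))  # 3
```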
generate_month_intervals(start_date, end_date)[source]

Generate full-month intervals.

The first interval begins on the first day of start_date’s month. The last interval ends on the first day of the month after the one containing end_date, or at end_date itself if end_date falls exactly on a month boundary.

Parameters:
  • start_date (date) – Start date

  • end_date (date) – End date (exclusive upper bound)

Returns:

List of (month_start, month_end) tuples

Return type:

List[Tuple[date, date]]

Example

>>> intervals = exporter.generate_month_intervals(
...     date(2024, 1, 15),
...     date(2024, 3, 10)
... )
>>> # Returns: [(2024-01-01, 2024-02-01), (2024-02-01, 2024-03-01), (2024-03-01, 2024-04-01)]
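The month-interval behaviour shown above can be reproduced with a short pure-Python sketch (an illustration of the presumed logic, not the actual implementation):

```python
from datetime import date

def generate_month_intervals(start_date: date, end_date: date):
    """Return (month_start, month_end) tuples of full calendar months."""
    def first_of_next_month(d: date) -> date:
        return date(d.year + (d.month == 12), d.month % 12 + 1, 1)

    intervals = []
    cur = date(start_date.year, start_date.month, 1)  # snap to month start
    while cur < end_date:
        intervals.append((cur, first_of_next_month(cur)))
        cur = first_of_next_month(cur)
    return intervals

# Matches the example: 3 intervals, the last ending 2024-04-01.
print(len(generate_month_intervals(date(2024, 1, 15), date(2024, 3, 10))))  # 3
```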
export_flight_list(month_start, month_end, version='v0.0.2', overwrite=False)[source]

Export OPDI flight list for a monthly interval.

Files are written to:

{output_dir}/flight_list/flight_list_YYYYMM.parquet

Parameters:
  • month_start (date) – Month interval start (1st of month)

  • month_end (date) – Month interval end (first day of the following month, or the overall stop date)

  • version (str) – Version string to add to records

  • overwrite (bool) – Whether to overwrite existing files

Returns:

Path to output file if created, None if skipped

Return type:

str | None

Example

>>> exporter.export_flight_list(
...     date(2024, 1, 1),
...     date(2024, 2, 1),
...     version="v0.0.2"
... )
export_flight_events(start_date, end_date, version='v0.0.2', overwrite=False)[source]

Export OPDI flight events for a date interval.

Files are written to:

{output_dir}/flight_events/flight_events_YYYYMMDD_YYYYMMDD.parquet

Parameters:
  • start_date (date) – Interval start

  • end_date (date) – Interval end

  • version (str) – Version string to add to records

  • overwrite (bool) – Whether to overwrite existing files

Returns:

Path to output file if created, None if skipped

Return type:

str | None

export_measurements(start_date, end_date, version='v0.0.2', overwrite=False)[source]

Export OPDI measurements for a date interval.

Files are written to:

{output_dir}/measurements/measurements_YYYYMMDD_YYYYMMDD.parquet

Parameters:
  • start_date (date) – Interval start

  • end_date (date) – Interval end

  • version (str) – Version string to add to records

  • overwrite (bool) – Whether to overwrite existing files

Returns:

Path to output file if created, None if skipped

Return type:

str | None
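The output file names above follow simple strftime patterns. A sketch of how the paths might be assembled (the helper functions are illustrative, not part of the API):

```python
from datetime import date

def flight_list_path(output_dir: str, month_start: date) -> str:
    return f"{output_dir}/flight_list/flight_list_{month_start:%Y%m}.parquet"

def measurements_path(output_dir: str, start: date, end: date) -> str:
    return (
        f"{output_dir}/measurements/"
        f"measurements_{start:%Y%m%d}_{end:%Y%m%d}.parquet"
    )

print(flight_list_path("/data/exports", date(2024, 1, 1)))
# /data/exports/flight_list/flight_list_202401.parquet
```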

export_all(start_date, end_date, export_flight_list=True, export_flight_events=True, export_measurements=True, version='v0.0.2', overwrite=False, last_n_months=None)[source]

Export all OPDI tables for a date range.

Parameters:
  • start_date (date) – Export start date

  • end_date (date) – Export end date

  • export_flight_list (bool) – Whether to export flight list

  • export_flight_events (bool) – Whether to export flight events

  • export_measurements (bool) – Whether to export measurements

  • version (str) – Version string to add to records

  • overwrite (bool) – Whether to overwrite existing files

  • last_n_months (int | None) – If set, only export last N months

Returns:

Dictionary with keys flight_list_files, event_files, and measurement_files.

Return type:

dict

Example

>>> from datetime import date
>>> exporter = ParquetExporter(spark, config, "/data/exports")
>>> stats = exporter.export_all(
...     date(2024, 1, 1),
...     date(2024, 3, 1),
...     last_n_months=2
... )
>>> print(f"Exported {stats['flight_list_files']} flight list files")
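How last_n_months narrows the range is not spelled out here. One plausible interpretation, moving start_date forward so only the final N calendar months remain, can be sketched as follows (this is an assumption about the behaviour, and apply_last_n_months is a hypothetical helper):

```python
from datetime import date

def apply_last_n_months(start_date: date, end_date: date, last_n_months: int) -> date:
    """Return an adjusted start date covering only the last N calendar months."""
    year, month = end_date.year, end_date.month - last_n_months
    while month <= 0:  # borrow from the previous year
        year -= 1
        month += 12
    # never move the start earlier than the requested range
    return max(start_date, date(year, month, 1))

print(apply_last_n_months(date(2023, 1, 1), date(2024, 3, 1), 2))  # 2024-01-01
```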
opdi.output.clean_and_save_data(input_dir, output_dir, size_threshold_mb=200)[source]

Standalone function to clean and export parquet files.

Parameters:
  • input_dir (str) – Directory containing parquet files

  • output_dir (str) – Directory for cleaned output

  • size_threshold_mb (int) – Skip files larger than this (MB)

Return type:

None

Example

>>> from opdi.output.csv_exporter import clean_and_save_data
>>> clean_and_save_data("data/measurements", "data/measurements_clean")

Parquet Exporter – opdi.output.parquet_exporter

Parquet export module for OPDI data extraction.

Provides functionality to export OPDI tables (flight list, events, measurements) to parquet files with configurable time intervals.

class opdi.output.parquet_exporter.ParquetExporter(spark, config, output_dir, interval_days=10)[source]

Bases: object

Exports OPDI tables to parquet files.

Supports different interval strategies:

  • Monthly intervals for flight lists

  • N-day intervals for events and measurements

Parameters:
  • spark (pyspark.sql.SparkSession)

  • config (OPDIConfig)

  • output_dir (str)

  • interval_days (int)

__init__(spark, config, output_dir, interval_days=10)[source]

Initialize parquet exporter.

Parameters:
  • spark (pyspark.sql.SparkSession) – Active SparkSession

  • config (OPDIConfig) – OPDI configuration object

  • output_dir (str) – Base directory for output files

  • interval_days (int) – Interval size in days for events/measurements

static safe_to_pandas(df)[source]

Safely convert Spark DataFrame to pandas by casting timestamps to strings.

Parameters:

df (pyspark.sql.DataFrame) – PySpark DataFrame

Returns:

pandas DataFrame

Return type:

pandas.DataFrame

Example

>>> pdf = ParquetExporter.safe_to_pandas(spark_df)
generate_intervals(start_date, end_date, step_days)[source]

Generate rolling date intervals of fixed length.

Parameters:
  • start_date (date) – Interval start

  • end_date (date) – Interval end

  • step_days (int) – Step size in days

Returns:

List of (start_date, end_date) tuples

Return type:

List[Tuple[date, date]]

Example

>>> exporter = ParquetExporter(spark, config, "/data/exports")
>>> intervals = exporter.generate_intervals(
...     date(2024, 1, 1),
...     date(2024, 1, 31),
...     10
... )
>>> print(len(intervals))  # 3 intervals
generate_month_intervals(start_date, end_date)[source]

Generate full-month intervals.

The first interval begins on the first day of start_date’s month. The last interval ends on the first day of the month after the one containing end_date, or at end_date itself if end_date falls exactly on a month boundary.

Parameters:
  • start_date (date) – Start date

  • end_date (date) – End date (exclusive upper bound)

Returns:

List of (month_start, month_end) tuples

Return type:

List[Tuple[date, date]]

Example

>>> intervals = exporter.generate_month_intervals(
...     date(2024, 1, 15),
...     date(2024, 3, 10)
... )
>>> # Returns: [(2024-01-01, 2024-02-01), (2024-02-01, 2024-03-01), (2024-03-01, 2024-04-01)]
export_flight_list(month_start, month_end, version='v0.0.2', overwrite=False)[source]

Export OPDI flight list for a monthly interval.

Files are written to:

{output_dir}/flight_list/flight_list_YYYYMM.parquet

Parameters:
  • month_start (date) – Month interval start (1st of month)

  • month_end (date) – Month interval end (first day of the following month, or the overall stop date)

  • version (str) – Version string to add to records

  • overwrite (bool) – Whether to overwrite existing files

Returns:

Path to output file if created, None if skipped

Return type:

str | None

Example

>>> exporter.export_flight_list(
...     date(2024, 1, 1),
...     date(2024, 2, 1),
...     version="v0.0.2"
... )
export_flight_events(start_date, end_date, version='v0.0.2', overwrite=False)[source]

Export OPDI flight events for a date interval.

Files are written to:

{output_dir}/flight_events/flight_events_YYYYMMDD_YYYYMMDD.parquet

Parameters:
  • start_date (date) – Interval start

  • end_date (date) – Interval end

  • version (str) – Version string to add to records

  • overwrite (bool) – Whether to overwrite existing files

Returns:

Path to output file if created, None if skipped

Return type:

str | None

export_measurements(start_date, end_date, version='v0.0.2', overwrite=False)[source]

Export OPDI measurements for a date interval.

Files are written to:

{output_dir}/measurements/measurements_YYYYMMDD_YYYYMMDD.parquet

Parameters:
  • start_date (date) – Interval start

  • end_date (date) – Interval end

  • version (str) – Version string to add to records

  • overwrite (bool) – Whether to overwrite existing files

Returns:

Path to output file if created, None if skipped

Return type:

str | None

export_all(start_date, end_date, export_flight_list=True, export_flight_events=True, export_measurements=True, version='v0.0.2', overwrite=False, last_n_months=None)[source]

Export all OPDI tables for a date range.

Parameters:
  • start_date (date) – Export start date

  • end_date (date) – Export end date

  • export_flight_list (bool) – Whether to export flight list

  • export_flight_events (bool) – Whether to export flight events

  • export_measurements (bool) – Whether to export measurements

  • version (str) – Version string to add to records

  • overwrite (bool) – Whether to overwrite existing files

  • last_n_months (int | None) – If set, only export last N months

Returns:

Dictionary with keys flight_list_files, event_files, and measurement_files.

Return type:

dict

Example

>>> from datetime import date
>>> exporter = ParquetExporter(spark, config, "/data/exports")
>>> stats = exporter.export_all(
...     date(2024, 1, 1),
...     date(2024, 3, 1),
...     last_n_months=2
... )
>>> print(f"Exported {stats['flight_list_files']} flight list files")

CSV Exporter – opdi.output.csv_exporter

CSV export and data cleanup module.

Provides deduplication and export functionality for converting parquet files to compressed CSV format.

class opdi.output.csv_exporter.CSVExporter(input_dir, output_dir, size_threshold_mb=200)[source]

Bases: object

Exports parquet files to CSV.gz format with deduplication.

This class handles the final data cleanup step, removing duplicate rows and exporting data in both parquet and compressed CSV formats.

Parameters:
  • input_dir (str)

  • output_dir (str)

  • size_threshold_mb (int)

__init__(input_dir, output_dir, size_threshold_mb=200)[source]

Initialize CSV exporter.

Parameters:
  • input_dir (str) – Directory containing input parquet files

  • output_dir (str) – Directory for output files

  • size_threshold_mb (int) – Skip files larger than this size (MB)
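The size_threshold_mb guard can be illustrated with standard-library calls; the should_skip helper below is hypothetical, a sketch of the presumed check rather than the actual implementation:

```python
import os
import tempfile

def should_skip(path: str, size_threshold_mb: int = 200) -> bool:
    """Return True when a file exceeds the size threshold and should be skipped."""
    return os.path.getsize(path) > size_threshold_mb * 1024 * 1024

# Demonstrate on a small temporary file.
with tempfile.NamedTemporaryFile(delete=False) as f:
    f.write(b"x" * 1024)  # 1 KiB file
    tmp = f.name
print(should_skip(tmp, size_threshold_mb=200))  # False
os.remove(tmp)
```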

clean_and_export_file(input_file)[source]

Clean and export a single parquet file.

Parameters:

input_file (str) – Path to input parquet file

Returns:

Tuple of (original_rows, cleaned_rows)

Return type:

tuple[int, int]

clean_and_export_all()[source]

Clean and export all parquet files in input directory.

Returns:

Dictionary with keys files_processed, files_skipped, total_original_rows, total_cleaned_rows, and duplicates_removed.

Return type:

dict

Example

>>> exporter = CSVExporter(
...     "data/measurements",
...     "data/measurements_clean"
... )
>>> stats = exporter.clean_and_export_all()
>>> print(f"Processed {stats['files_processed']} files")
>>> print(f"Removed {stats['duplicates_removed']} duplicates")
opdi.output.csv_exporter.clean_and_save_data(input_dir, output_dir, size_threshold_mb=200)[source]

Standalone function to clean and export parquet files.

Parameters:
  • input_dir (str) – Directory containing parquet files

  • output_dir (str) – Directory for cleaned output

  • size_threshold_mb (int) – Skip files larger than this (MB)

Return type:

None

Example

>>> from opdi.output.csv_exporter import clean_and_save_data
>>> clean_and_save_data("data/measurements", "data/measurements_clean")