Ingestion – opdi.ingestion

Data ingestion modules for OPDI pipeline.

Provides connectors for loading data from external sources: OpenSky Network state vectors, aircraft database, and OurAirports.

class opdi.ingestion.StateVectorIngestion(spark, config, local_download_path='OPDI_live/data/ec-datadump', log_file_path='OPDI_live/logs/01_osn_statevectors_etl.log')[source]

Bases: object

Handles ingestion of OpenSky Network state vectors from MinIO storage.

This class manages the complete workflow of downloading state vector parquet files from OpenSky’s S3-compatible MinIO server, processing them, and writing to Iceberg tables with daily partitioning.

COLUMN_MAPPING = {'alert': 'alert', 'baroAltitude': 'baro_altitude', 'callsign': 'callsign', 'eventTime': 'event_time', 'geoAltitude': 'geo_altitude', 'heading': 'heading', 'icao24': 'icao24', 'lastContact': 'last_contact', 'lastPosUpdate': 'last_pos_update', 'lat': 'lat', 'lon': 'lon', 'onGround': 'on_ground', 'serials': 'serials', 'spi': 'spi', 'squawk': 'squawk', 'velocity': 'velocity', 'vertRate': 'vert_rate'}
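
As a plain-Python illustration of how this mapping renames raw OpenSky columns to the snake_case table schema (a sketch using a partial copy of the mapping; the class applies the same renaming to Spark columns, e.g. via aliased selects):

```python
# Partial copy of COLUMN_MAPPING, for illustration only.
COLUMN_MAPPING = {
    "eventTime": "event_time",
    "baroAltitude": "baro_altitude",
    "vertRate": "vert_rate",
    "icao24": "icao24",
}

def rename_record(record, mapping=COLUMN_MAPPING):
    # Rename each raw OpenSky key to its snake_case table column;
    # keys not in the mapping pass through unchanged.
    return {mapping.get(k, k): v for k, v in record.items()}
```
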
__init__(spark, config, local_download_path='OPDI_live/data/ec-datadump', log_file_path='OPDI_live/logs/01_osn_statevectors_etl.log')[source]

Initialize state vector ingestion.

Parameters:
  • spark (pyspark.sql.SparkSession) – Active SparkSession

  • config (OPDIConfig) – OPDI configuration object

  • local_download_path (str) – Local directory for temporary file downloads

  • log_file_path (str) – Path to file tracking processed files

setup_minio_client()[source]

Set up MinIO client (mc) for accessing OpenSky Network data.

Requires OSN_USERNAME and OSN_KEY environment variables to be set.

Returns:

True if setup successful, False otherwise

Raises:

EnvironmentError – If OSN credentials are not set

Return type:

bool
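
The documented precondition (OSN_USERNAME and OSN_KEY must be set) can be checked up front. A minimal sketch, where check_osn_credentials is a hypothetical standalone helper, not a method of the class:

```python
import os

def check_osn_credentials():
    # Mirror setup_minio_client()'s documented precondition: both
    # OSN_USERNAME and OSN_KEY must be present in the environment.
    missing = [v for v in ("OSN_USERNAME", "OSN_KEY") if not os.environ.get(v)]
    if missing:
        raise EnvironmentError(f"OSN credentials not set: {', '.join(missing)}")
    return True
```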

list_available_files(year_filter=None)[source]

List available state vector files on OpenSky MinIO server.

Parameters:

year_filter (List[int] | None) – List of years to filter (e.g., [2024, 2025]). If None, uses 2022-2026.

Returns:

List of file paths on MinIO server

Return type:

List[str]
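
The year filtering can be sketched in plain Python (a hypothetical standalone version of the documented behaviour; the real method lists files on the MinIO server first):

```python
def filter_files_by_year(file_paths, year_filter=None):
    # Default to the documented 2022-2026 window when no filter is given.
    years = year_filter if year_filter is not None else range(2022, 2027)
    tokens = [str(y) for y in years]
    # Keep paths whose name mentions one of the requested years.
    return [p for p in file_paths if any(t in p for t in tokens)]
```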

load_processed_files()[source]

Load the set of already processed files from log.

Returns:

Set of processed file names

Return type:

Set[str]

mark_files_processed(file_names)[source]

Mark files as processed by appending to log file.

Parameters:

file_names (List[str]) – List of file names to mark as processed

Return type:

None
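
Together, load_processed_files() and mark_files_processed() implement a simple append-only log for incremental ingestion. A standalone sketch of the pattern (hypothetical code, not the class's actual implementation):

```python
import os

def load_processed_files(log_path):
    # Return the set of file names already recorded in the log.
    if not os.path.exists(log_path):
        return set()
    with open(log_path) as f:
        return {line.strip() for line in f if line.strip()}

def mark_files_processed(log_path, file_names):
    # Append newly processed file names, one per line.
    with open(log_path, "a") as f:
        for name in file_names:
            f.write(name + "\n")
```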

remove_partial_files()[source]

Remove partially downloaded files (*.parquet.part.minio).

MinIO creates .part files during download that may remain if download is interrupted. This cleans them up before processing.

Return type:

None
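
A minimal sketch of the cleanup, assuming only the *.parquet.part.minio glob pattern from the description above:

```python
import glob
import os

def remove_partial_files(download_dir):
    # Delete leftover MinIO partial downloads so they are never
    # mistaken for complete parquet files during processing.
    removed = []
    for path in glob.glob(os.path.join(download_dir, "*.parquet.part.minio")):
        os.remove(path)
        removed.append(os.path.basename(path))
    return removed
```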

download_files(file_paths)[source]

Download files from MinIO to local storage.

Parameters:

file_paths (List[str]) – List of full MinIO file paths to download

Returns:

List of successfully downloaded file names

Return type:

List[str]

process_and_write_batch(file_names)[source]

Process downloaded files and write to Iceberg table.

Parameters:

file_names (List[str]) – List of file names to process (must be in local_download_path)

Return type:

None

cleanup_local_files(file_names)[source]

Delete local files after successful processing to save disk space.

Parameters:

file_names (List[str]) – List of file names to delete

Return type:

None
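
The download, process, and cleanup methods above are designed to run in batches. A sketch of the loop that ingest() is documented to drive, with the callables standing in for the class methods of the same names (the batch size is an assumption for illustration):

```python
def run_batches(files, batch_size, download, process, cleanup):
    # Download a batch, process it, then delete the local copies,
    # so disk usage stays bounded by one batch at a time.
    processed = 0
    for i in range(0, len(files), batch_size):
        batch = files[i:i + batch_size]
        downloaded = download(batch)
        process(downloaded)
        cleanup(downloaded)
        processed += len(downloaded)
    return processed
```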

ingest(year_filter=None, dry_run=False)[source]

Run the complete ingestion workflow.

Downloads state vectors in batches, processes them, and writes to Iceberg.

Parameters:
  • year_filter (List[int] | None) – List of years to process (e.g., [2024, 2025])

  • dry_run (bool) – If True, only list files without downloading/processing

Returns:

Dictionary with statistics: {'files_processed': N, 'files_skipped': M}

Return type:

Dict[str, int]

Example

>>> from opdi.ingestion import StateVectorIngestion
>>> from opdi.utils.spark_helpers import get_spark
>>> from opdi.config import OPDIConfig
>>>
>>> config = OPDIConfig.for_environment("live")
>>> spark = get_spark("live", "State Vector Ingestion")
>>> ingestion = StateVectorIngestion(spark, config)
>>> stats = ingestion.ingest(year_filter=[2024])
create_table_if_not_exists()[source]

Create the osn_statevectors_v2 Iceberg table if it doesn’t exist.

This should be run once before first ingestion.

Return type:

None
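
A hypothetical DDL sketch of what a daily-partitioned Iceberg table for these columns could look like. Only the table name and the snake_case column names come from the docs above; the column types, catalog/database qualification, and the exact partition transform are assumptions:

```python
# Hypothetical DDL; types and partition transform are assumptions,
# not the class's actual CREATE statement.
CREATE_STATEVECTORS_SQL = """
CREATE TABLE IF NOT EXISTS osn_statevectors_v2 (
    event_time TIMESTAMP,
    icao24 STRING,
    lat DOUBLE,
    lon DOUBLE,
    velocity DOUBLE,
    heading DOUBLE,
    vert_rate DOUBLE,
    callsign STRING,
    on_ground BOOLEAN,
    alert BOOLEAN,
    spi BOOLEAN,
    squawk STRING,
    baro_altitude DOUBLE,
    geo_altitude DOUBLE,
    last_pos_update DOUBLE,
    last_contact DOUBLE,
    serials STRING
) USING iceberg
PARTITIONED BY (days(event_time))
"""
```

Such a statement would typically be executed once via spark.sql(CREATE_STATEVECTORS_SQL).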

class opdi.ingestion.AircraftDatabaseIngestion(spark, config, url=None)[source]

Bases: object

Handles ingestion of OpenSky Network aircraft database.

The aircraft database contains metadata about aircraft, including:

  • ICAO 24-bit address

  • Registration number

  • Aircraft model and type

  • Operator information

__init__(spark, config, url=None)[source]

Initialize aircraft database ingestion.

Parameters:
  • spark (pyspark.sql.SparkSession) – Active SparkSession

  • config (OPDIConfig) – OPDI configuration object

  • url (str | None) – URL to aircraft database CSV. If None, uses config default

download_and_convert()[source]

Download aircraft database CSV and convert to Spark DataFrame.

Returns:

Spark DataFrame with aircraft metadata

Raises:

Exception – If download or conversion fails

Return type:

pyspark.sql.DataFrame
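
The CSV-parsing half of download_and_convert() can be sketched without Spark (parse_aircraft_csv is a hypothetical helper; the real method then builds a Spark DataFrame from the parsed rows):

```python
import csv
import io

def parse_aircraft_csv(csv_text):
    # Parse the downloaded aircraft-database CSV into row dicts keyed
    # by the header names (e.g. icao24, registration, model, operator).
    return list(csv.DictReader(io.StringIO(csv_text)))
```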

write_to_table(df, mode='append')[source]

Write aircraft database to Iceberg table.

Parameters:
  • df (pyspark.sql.DataFrame) – Aircraft database DataFrame to write

  • mode (str) – Write mode - “append” or “overwrite”

Return type:

None

create_table_if_not_exists()[source]

Create the osn_aircraft_db Iceberg table if it doesn’t exist.

This should be run once before first ingestion.

Return type:

None

ingest(mode='append')[source]

Run the complete ingestion workflow.

Downloads the aircraft database and writes to Iceberg table.

Parameters:

mode (str) – Write mode - “append” or “overwrite”

Returns:

Number of records ingested

Return type:

int

Example

>>> from opdi.ingestion import AircraftDatabaseIngestion
>>> from opdi.utils.spark_helpers import get_spark
>>> from opdi.config import OPDIConfig
>>>
>>> config = OPDIConfig.for_environment("live")
>>> spark = get_spark("live", "Aircraft DB Ingestion")
>>> ingestion = AircraftDatabaseIngestion(spark, config)
>>> count = ingestion.ingest(mode="overwrite")
>>> print(f"Ingested {count} aircraft records")
get_aircraft_info(icao24)[source]

Look up aircraft information by ICAO 24-bit address.

Parameters:

icao24 (str) – ICAO 24-bit transponder ID (e.g., “a12b34”)

Returns:

Dictionary with aircraft info, or None if not found

Return type:

dict | None

Example

>>> ingestion = AircraftDatabaseIngestion(spark, config)
>>> info = ingestion.get_aircraft_info("a12b34")
>>> if info:
...     print(f"Registration: {info['registration']}")
class opdi.ingestion.OurAirportsIngestion(spark, config, target_database='project_aiu', temp_dir='.')[source]

Bases: object

ETL pipeline for OurAirports reference data.

Downloads CSV datasets from OurAirports, loads them into Spark DataFrames with proper schemas, and writes them to database tables.

Handles 6 datasets: airports, runways, navaids, airport_frequencies, countries, and regions.

Parameters:
  • spark (pyspark.sql.SparkSession) – Active SparkSession.

  • config (OPDIConfig) – OPDI configuration object.

  • target_database (str) – Database to write tables to (default: ‘project_aiu’).

  • temp_dir (str) – Directory for temporary CSV downloads.

Example

>>> ingestion = OurAirportsIngestion(spark, config)
>>> ingestion.ingest_all()
create_tables(drop_existing=False)[source]

Create all OurAirports tables.

Parameters:

drop_existing (bool) – If True, drop and recreate existing tables.

Return type:

None

ingest_dataset(name, url=None)[source]

Download and ingest a single OurAirports dataset.

Parameters:
  • name (str) – Dataset name (airports, runways, navaids, etc.).

  • url (str | None) – Override URL for the CSV download.

Returns:

Number of rows ingested.

Return type:

int
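
The url override behaviour can be sketched as follows. The default URLs shown point at the public OurAirports data mirror but should be treated as illustrative assumptions, not the class's actual configuration:

```python
DEFAULT_URLS = {
    # Illustrative defaults; treat the exact URLs as assumptions.
    "airports": "https://davidmegginson.github.io/ourairports-data/airports.csv",
    "runways": "https://davidmegginson.github.io/ourairports-data/runways.csv",
}

def resolve_url(name, url=None, defaults=DEFAULT_URLS):
    # Mirror the documented behaviour of ingest_dataset(name, url=None):
    # an explicit url overrides the default for that dataset.
    if url is not None:
        return url
    if name not in defaults:
        raise KeyError(f"Unknown dataset: {name}")
    return defaults[name]
```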

ingest_all(urls=None, drop_existing=True)[source]

Download and ingest all OurAirports datasets.

Parameters:
  • urls (Dict[str, str] | None) – Override URLs for each dataset.

  • drop_existing (bool) – If True, drop and recreate tables first.

Returns:

Dictionary mapping dataset names to row counts.

Return type:

Dict[str, int]

Example

>>> ingestion = OurAirportsIngestion(spark, config)
>>> stats = ingestion.ingest_all()
>>> print(stats)
{'airports': 76543, 'runways': 45678, ...}

State Vectors – opdi.ingestion.osn_statevectors

OpenSky Network state vectors ingestion module.

Downloads and processes state vector data from OpenSky Network’s MinIO server and writes to Iceberg tables with proper partitioning.

class opdi.ingestion.osn_statevectors.StateVectorIngestion(spark, config, local_download_path='OPDI_live/data/ec-datadump', log_file_path='OPDI_live/logs/01_osn_statevectors_etl.log')[source]

Bases: object

Handles ingestion of OpenSky Network state vectors from MinIO storage.

This class manages the complete workflow of downloading state vector parquet files from OpenSky’s S3-compatible MinIO server, processing them, and writing to Iceberg tables with daily partitioning.

COLUMN_MAPPING = {'alert': 'alert', 'baroAltitude': 'baro_altitude', 'callsign': 'callsign', 'eventTime': 'event_time', 'geoAltitude': 'geo_altitude', 'heading': 'heading', 'icao24': 'icao24', 'lastContact': 'last_contact', 'lastPosUpdate': 'last_pos_update', 'lat': 'lat', 'lon': 'lon', 'onGround': 'on_ground', 'serials': 'serials', 'spi': 'spi', 'squawk': 'squawk', 'velocity': 'velocity', 'vertRate': 'vert_rate'}
__init__(spark, config, local_download_path='OPDI_live/data/ec-datadump', log_file_path='OPDI_live/logs/01_osn_statevectors_etl.log')[source]

Initialize state vector ingestion.

Parameters:
  • spark (pyspark.sql.SparkSession) – Active SparkSession

  • config (OPDIConfig) – OPDI configuration object

  • local_download_path (str) – Local directory for temporary file downloads

  • log_file_path (str) – Path to file tracking processed files

setup_minio_client()[source]

Set up MinIO client (mc) for accessing OpenSky Network data.

Requires OSN_USERNAME and OSN_KEY environment variables to be set.

Returns:

True if setup successful, False otherwise

Raises:

EnvironmentError – If OSN credentials are not set

Return type:

bool

list_available_files(year_filter=None)[source]

List available state vector files on OpenSky MinIO server.

Parameters:

year_filter (List[int] | None) – List of years to filter (e.g., [2024, 2025]). If None, uses 2022-2026.

Returns:

List of file paths on MinIO server

Return type:

List[str]

load_processed_files()[source]

Load the set of already processed files from log.

Returns:

Set of processed file names

Return type:

Set[str]

mark_files_processed(file_names)[source]

Mark files as processed by appending to log file.

Parameters:

file_names (List[str]) – List of file names to mark as processed

Return type:

None

remove_partial_files()[source]

Remove partially downloaded files (*.parquet.part.minio).

MinIO creates .part files during download that may remain if download is interrupted. This cleans them up before processing.

Return type:

None

download_files(file_paths)[source]

Download files from MinIO to local storage.

Parameters:

file_paths (List[str]) – List of full MinIO file paths to download

Returns:

List of successfully downloaded file names

Return type:

List[str]

process_and_write_batch(file_names)[source]

Process downloaded files and write to Iceberg table.

Parameters:

file_names (List[str]) – List of file names to process (must be in local_download_path)

Return type:

None

cleanup_local_files(file_names)[source]

Delete local files after successful processing to save disk space.

Parameters:

file_names (List[str]) – List of file names to delete

Return type:

None

ingest(year_filter=None, dry_run=False)[source]

Run the complete ingestion workflow.

Downloads state vectors in batches, processes them, and writes to Iceberg.

Parameters:
  • year_filter (List[int] | None) – List of years to process (e.g., [2024, 2025])

  • dry_run (bool) – If True, only list files without downloading/processing

Returns:

Dictionary with statistics: {'files_processed': N, 'files_skipped': M}

Return type:

Dict[str, int]

Example

>>> from opdi.ingestion import StateVectorIngestion
>>> from opdi.utils.spark_helpers import get_spark
>>> from opdi.config import OPDIConfig
>>>
>>> config = OPDIConfig.for_environment("live")
>>> spark = get_spark("live", "State Vector Ingestion")
>>> ingestion = StateVectorIngestion(spark, config)
>>> stats = ingestion.ingest(year_filter=[2024])
create_table_if_not_exists()[source]

Create the osn_statevectors_v2 Iceberg table if it doesn’t exist.

This should be run once before first ingestion.

Return type:

None

Aircraft Database – opdi.ingestion.osn_aircraft_db

OpenSky Network aircraft database ingestion module.

Downloads aircraft metadata from OpenSky Network and loads into Iceberg tables.

class opdi.ingestion.osn_aircraft_db.AircraftDatabaseIngestion(spark, config, url=None)[source]

Bases: object

Handles ingestion of OpenSky Network aircraft database.

The aircraft database contains metadata about aircraft, including:

  • ICAO 24-bit address

  • Registration number

  • Aircraft model and type

  • Operator information

__init__(spark, config, url=None)[source]

Initialize aircraft database ingestion.

Parameters:
  • spark (pyspark.sql.SparkSession) – Active SparkSession

  • config (OPDIConfig) – OPDI configuration object

  • url (str | None) – URL to aircraft database CSV. If None, uses config default

download_and_convert()[source]

Download aircraft database CSV and convert to Spark DataFrame.

Returns:

Spark DataFrame with aircraft metadata

Raises:

Exception – If download or conversion fails

Return type:

pyspark.sql.DataFrame

write_to_table(df, mode='append')[source]

Write aircraft database to Iceberg table.

Parameters:
  • df (pyspark.sql.DataFrame) – Aircraft database DataFrame to write

  • mode (str) – Write mode - “append” or “overwrite”

Return type:

None

create_table_if_not_exists()[source]

Create the osn_aircraft_db Iceberg table if it doesn’t exist.

This should be run once before first ingestion.

Return type:

None

ingest(mode='append')[source]

Run the complete ingestion workflow.

Downloads the aircraft database and writes to Iceberg table.

Parameters:

mode (str) – Write mode - “append” or “overwrite”

Returns:

Number of records ingested

Return type:

int

Example

>>> from opdi.ingestion import AircraftDatabaseIngestion
>>> from opdi.utils.spark_helpers import get_spark
>>> from opdi.config import OPDIConfig
>>>
>>> config = OPDIConfig.for_environment("live")
>>> spark = get_spark("live", "Aircraft DB Ingestion")
>>> ingestion = AircraftDatabaseIngestion(spark, config)
>>> count = ingestion.ingest(mode="overwrite")
>>> print(f"Ingested {count} aircraft records")
get_aircraft_info(icao24)[source]

Look up aircraft information by ICAO 24-bit address.

Parameters:

icao24 (str) – ICAO 24-bit transponder ID (e.g., “a12b34”)

Returns:

Dictionary with aircraft info, or None if not found

Return type:

dict | None

Example

>>> ingestion = AircraftDatabaseIngestion(spark, config)
>>> info = ingestion.get_aircraft_info("a12b34")
>>> if info:
...     print(f"Registration: {info['registration']}")

OurAirports – opdi.ingestion.ourairports

OurAirports reference data ingestion.

Downloads and loads airport reference datasets from OurAirports into Spark tables. Covers airports, runways, navaids, frequencies, countries, and regions.

Ported from: OPDI-live/python/v2.0.0/00_etl_ourairports.py

class opdi.ingestion.ourairports.OurAirportsIngestion(spark, config, target_database='project_aiu', temp_dir='.')[source]

Bases: object

ETL pipeline for OurAirports reference data.

Downloads CSV datasets from OurAirports, loads them into Spark DataFrames with proper schemas, and writes them to database tables.

Handles 6 datasets: airports, runways, navaids, airport_frequencies, countries, and regions.

Parameters:
  • spark (pyspark.sql.SparkSession) – Active SparkSession.

  • config (OPDIConfig) – OPDI configuration object.

  • target_database (str) – Database to write tables to (default: ‘project_aiu’).

  • temp_dir (str) – Directory for temporary CSV downloads.

Example

>>> ingestion = OurAirportsIngestion(spark, config)
>>> ingestion.ingest_all()
__init__(spark, config, target_database='project_aiu', temp_dir='.')[source]

Initialize OurAirports ingestion. Parameters are documented on the class above.
create_tables(drop_existing=False)[source]

Create all OurAirports tables.

Parameters:

drop_existing (bool) – If True, drop and recreate existing tables.

Return type:

None

ingest_dataset(name, url=None)[source]

Download and ingest a single OurAirports dataset.

Parameters:
  • name (str) – Dataset name (airports, runways, navaids, etc.).

  • url (str | None) – Override URL for the CSV download.

Returns:

Number of rows ingested.

Return type:

int

ingest_all(urls=None, drop_existing=True)[source]

Download and ingest all OurAirports datasets.

Parameters:
  • urls (Dict[str, str] | None) – Override URLs for each dataset.

  • drop_existing (bool) – If True, drop and recreate tables first.

Returns:

Dictionary mapping dataset names to row counts.

Return type:

Dict[str, int]

Example

>>> ingestion = OurAirportsIngestion(spark, config)
>>> stats = ingestion.ingest_all()
>>> print(stats)
{'airports': 76543, 'runways': 45678, ...}