Ingestion – opdi.ingestion¶
Data ingestion modules for OPDI pipeline.
Provides connectors for loading data from external sources: OpenSky Network state vectors, aircraft database, and OurAirports.
- class opdi.ingestion.StateVectorIngestion(spark, config, local_download_path='OPDI_live/data/ec-datadump', log_file_path='OPDI_live/logs/01_osn_statevectors_etl.log')[source]¶
Bases: object
Handles ingestion of OpenSky Network state vectors from MinIO storage.
This class manages the complete workflow of downloading state vector parquet files from OpenSky’s S3-compatible MinIO server, processing them, and writing to Iceberg tables with daily partitioning.
- Parameters:
spark (pyspark.sql.SparkSession)
config (OPDIConfig)
local_download_path (str)
log_file_path (str)
- COLUMN_MAPPING = {'alert': 'alert', 'baroAltitude': 'baro_altitude', 'callsign': 'callsign', 'eventTime': 'event_time', 'geoAltitude': 'geo_altitude', 'heading': 'heading', 'icao24': 'icao24', 'lastContact': 'last_contact', 'lastPosUpdate': 'last_pos_update', 'lat': 'lat', 'lon': 'lon', 'onGround': 'on_ground', 'serials': 'serials', 'spi': 'spi', 'squawk': 'squawk', 'velocity': 'velocity', 'vertRate': 'vert_rate'}¶
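COLUMN_MAPPING renames OpenSky's camelCase columns to the snake_case names used in the Iceberg tables. A minimal sketch of how such a mapping might be applied (shown with a subset of the mapping; with Spark, the generated expressions could feed `df.selectExpr(*exprs)` — the helper name here is illustrative, not part of the class):

```python
# Subset of COLUMN_MAPPING, for illustration only.
COLUMN_MAPPING = {
    "eventTime": "event_time",
    "baroAltitude": "baro_altitude",
    "icao24": "icao24",
}

def rename_exprs(mapping):
    """Build `src AS dst` SQL expressions from a column-name mapping."""
    return [f"`{src}` AS `{dst}`" for src, dst in mapping.items()]

exprs = rename_exprs(COLUMN_MAPPING)
```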
- __init__(spark, config, local_download_path='OPDI_live/data/ec-datadump', log_file_path='OPDI_live/logs/01_osn_statevectors_etl.log')[source]¶
Initialize state vector ingestion.
- Parameters:
spark (pyspark.sql.SparkSession) – Active SparkSession
config (OPDIConfig) – OPDI configuration object
local_download_path (str) – Local directory for temporary file downloads
log_file_path (str) – Path to file tracking processed files
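The log file at log_file_path records which files have already been processed, so re-runs can skip them. A minimal sketch of how such a plain-text log might work (these helper names are illustrative, not the class's actual internals):

```python
import os

def load_processed(log_file_path):
    """Return the set of file names already recorded as processed."""
    if not os.path.exists(log_file_path):
        return set()
    with open(log_file_path) as f:
        return {line.strip() for line in f if line.strip()}

def mark_processed(log_file_path, file_name):
    """Append one processed file name to the log."""
    with open(log_file_path, "a") as f:
        f.write(file_name + "\n")
```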
- setup_minio_client()[source]¶
Set up MinIO client (mc) for accessing OpenSky Network data.
Requires OSN_USERNAME and OSN_KEY environment variables to be set.
- Returns:
True if setup successful, False otherwise
- Raises:
EnvironmentError – If OSN credentials are not set
- Return type:
bool
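Before the MinIO client can be configured, both credentials must be present in the environment. A sketch of the pre-flight check only (illustrative function; the real method additionally configures the `mc` client with these credentials):

```python
import os

def check_osn_credentials():
    """Raise EnvironmentError unless OSN_USERNAME and OSN_KEY are set."""
    user = os.environ.get("OSN_USERNAME")
    key = os.environ.get("OSN_KEY")
    if not user or not key:
        raise EnvironmentError("OSN_USERNAME and OSN_KEY must be set")
    return True
```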
- list_available_files(year_filter=None)[source]¶
List available state vector files on the OpenSky MinIO server.
- remove_partial_files()[source]¶
Remove partially downloaded files (*.parquet.part.minio).
MinIO creates .part files during download that may remain if the download is interrupted. This method cleans them up before processing.
- Return type:
None
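The cleanup described above amounts to deleting anything matching the partial-download pattern in the download directory. A minimal standalone sketch (not the class's actual implementation):

```python
import glob
import os

def remove_partial_files(download_path):
    """Delete leftover MinIO partial downloads (*.parquet.part.minio)."""
    pattern = os.path.join(download_path, "*.parquet.part.minio")
    for path in glob.glob(pattern):
        os.remove(path)
```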
- cleanup_local_files(file_names)[source]¶
Delete local files after successful processing to save disk space.
- ingest(year_filter=None, dry_run=False)[source]¶
Run the complete ingestion workflow.
Downloads state vectors in batches, processes them, and writes to Iceberg.
- Parameters:
year_filter (list | None) – Optional list of years to restrict ingestion; if None, all available years are processed
dry_run (bool) – If True, list the files that would be processed without downloading or writing
- Returns:
Dictionary with statistics: {'files_processed': N, 'files_skipped': M}
- Return type:
dict
Example
>>> from opdi.ingestion import StateVectorIngestion
>>> from opdi.utils.spark_helpers import get_spark
>>> from opdi.config import OPDIConfig
>>>
>>> config = OPDIConfig.for_environment("live")
>>> spark = get_spark("live", "State Vector Ingestion")
>>> ingestion = StateVectorIngestion(spark, config)
>>> stats = ingestion.ingest(year_filter=[2024])
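ingest() processes files in batches rather than all at once. The exact batch size is internal to the class, but the chunking pattern might look like this (illustrative helper, not part of the API):

```python
def batched(items, batch_size):
    """Yield successive batches of at most batch_size items."""
    for i in range(0, len(items), batch_size):
        yield items[i:i + batch_size]
```

Each batch of file names can then be downloaded, processed, and logged before the next one starts, which bounds local disk usage.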
- class opdi.ingestion.AircraftDatabaseIngestion(spark, config, url=None)[source]¶
Bases: object
Handles ingestion of the OpenSky Network aircraft database.
The aircraft database contains metadata about aircraft, including:
- ICAO 24-bit address
- Registration number
- Aircraft model and type
- Operator information
- Parameters:
spark (pyspark.sql.SparkSession)
config (OPDIConfig)
url (str | None)
- __init__(spark, config, url=None)[source]¶
Initialize aircraft database ingestion.
- Parameters:
spark (pyspark.sql.SparkSession) – Active SparkSession
config (OPDIConfig) – OPDI configuration object
url (str | None) – URL to aircraft database CSV. If None, uses config default
- download_and_convert()[source]¶
Download aircraft database CSV and convert to Spark DataFrame.
- Returns:
Spark DataFrame with aircraft metadata
- Raises:
Exception – If download or conversion fails
- Return type:
pyspark.sql.DataFrame
- write_to_table(df, mode='append')[source]¶
Write aircraft database to Iceberg table.
- Parameters:
df (pyspark.sql.DataFrame) – DataFrame to write
mode (str) – Write mode - “append” or “overwrite”
- Return type:
None
- create_table_if_not_exists()[source]¶
Create the osn_aircraft_db Iceberg table if it doesn’t exist.
This should be run once before first ingestion.
- Return type:
None
- ingest(mode='append')[source]¶
Run the complete ingestion workflow.
Downloads the aircraft database and writes to Iceberg table.
- Parameters:
mode (str) – Write mode - “append” or “overwrite”
- Returns:
Number of records ingested
- Return type:
int
Example
>>> from opdi.ingestion import AircraftDatabaseIngestion
>>> from opdi.utils.spark_helpers import get_spark
>>> from opdi.config import OPDIConfig
>>>
>>> config = OPDIConfig.for_environment("live")
>>> spark = get_spark("live", "Aircraft DB Ingestion")
>>> ingestion = AircraftDatabaseIngestion(spark, config)
>>> count = ingestion.ingest(mode="overwrite")
>>> print(f"Ingested {count} aircraft records")
- get_aircraft_info(icao24)[source]¶
Look up aircraft information by ICAO 24-bit address.
- Parameters:
icao24 (str) – ICAO 24-bit transponder ID (e.g., “a12b34”)
- Returns:
Dictionary with aircraft info, or None if not found
- Return type:
dict | None
Example
>>> ingestion = AircraftDatabaseIngestion(spark, config)
>>> info = ingestion.get_aircraft_info("a12b34")
>>> if info:
...     print(f"Registration: {info['registration']}")
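ICAO 24-bit addresses are six hexadecimal characters, and lookups are typically keyed by the lowercase form. A sketch of the normalization a caller might apply before the lookup (illustrative helper, not part of the class):

```python
import re

def normalize_icao24(icao24):
    """Normalize an ICAO 24-bit address to lowercase hex, or None if invalid."""
    s = icao24.strip().lower()
    return s if re.fullmatch(r"[0-9a-f]{6}", s) else None
```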
- class opdi.ingestion.OurAirportsIngestion(spark, config, target_database='project_aiu', temp_dir='.')[source]¶
Bases: object
ETL pipeline for OurAirports reference data.
Downloads CSV datasets from OurAirports, loads them into Spark DataFrames with proper schemas, and writes them to database tables.
Handles 6 datasets: airports, runways, navaids, airport_frequencies, countries, and regions.
- Parameters:
spark (pyspark.sql.SparkSession) – Active SparkSession.
config (OPDIConfig) – OPDI configuration object.
target_database (str) – Database to write tables to (default: ‘project_aiu’).
temp_dir (str) – Directory for temporary CSV downloads.
Example
>>> ingestion = OurAirportsIngestion(spark, config)
>>> ingestion.ingest_all()
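The six datasets handled above are each fetched as a CSV. A sketch of what a default dataset-to-URL mapping might look like; the hostname shown is an assumption about where OurAirports publishes its CSVs (the actual defaults live in the OPDI configuration), and note that the published file name airport-frequencies.csv maps to the snake_case table name airport_frequencies:

```python
# Illustrative dataset names as published by OurAirports (hyphenated).
DATASETS = ["airports", "runways", "navaids",
            "airport-frequencies", "countries", "regions"]

def default_urls(base="https://davidmegginson.github.io/ourairports-data"):
    """Map each dataset name (snake_case) to an assumed CSV download URL."""
    return {name.replace("-", "_"): f"{base}/{name}.csv" for name in DATASETS}
```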
- create_tables(drop_existing=False)[source]¶
Create all OurAirports tables.
- Parameters:
drop_existing (bool) – If True, drop and recreate existing tables.
- Return type:
None
- ingest_all(urls=None, drop_existing=True)[source]¶
Download and ingest all OurAirports datasets.
- Parameters:
urls (dict | None) – Mapping of dataset names to CSV URLs; if None, the configured defaults are used
drop_existing (bool) – If True, drop and recreate the tables before ingesting
- Returns:
Dictionary mapping dataset names to row counts.
- Return type:
dict
Example
>>> ingestion = OurAirportsIngestion(spark, config)
>>> stats = ingestion.ingest_all()
>>> print(stats)
{'airports': 76543, 'runways': 45678, ...}
State Vectors – opdi.ingestion.osn_statevectors¶
OpenSky Network state vectors ingestion module.
Downloads and processes state vector data from OpenSky Network’s MinIO server and writes to Iceberg tables with proper partitioning.
- class opdi.ingestion.osn_statevectors.StateVectorIngestion(spark, config, local_download_path='OPDI_live/data/ec-datadump', log_file_path='OPDI_live/logs/01_osn_statevectors_etl.log')[source]¶
Bases: object
Handles ingestion of OpenSky Network state vectors from MinIO storage.
This class manages the complete workflow of downloading state vector parquet files from OpenSky’s S3-compatible MinIO server, processing them, and writing to Iceberg tables with daily partitioning.
- Parameters:
spark (pyspark.sql.SparkSession)
config (OPDIConfig)
local_download_path (str)
log_file_path (str)
- COLUMN_MAPPING = {'alert': 'alert', 'baroAltitude': 'baro_altitude', 'callsign': 'callsign', 'eventTime': 'event_time', 'geoAltitude': 'geo_altitude', 'heading': 'heading', 'icao24': 'icao24', 'lastContact': 'last_contact', 'lastPosUpdate': 'last_pos_update', 'lat': 'lat', 'lon': 'lon', 'onGround': 'on_ground', 'serials': 'serials', 'spi': 'spi', 'squawk': 'squawk', 'velocity': 'velocity', 'vertRate': 'vert_rate'}¶
- __init__(spark, config, local_download_path='OPDI_live/data/ec-datadump', log_file_path='OPDI_live/logs/01_osn_statevectors_etl.log')[source]¶
Initialize state vector ingestion.
- Parameters:
spark (pyspark.sql.SparkSession) – Active SparkSession
config (OPDIConfig) – OPDI configuration object
local_download_path (str) – Local directory for temporary file downloads
log_file_path (str) – Path to file tracking processed files
- setup_minio_client()[source]¶
Set up MinIO client (mc) for accessing OpenSky Network data.
Requires OSN_USERNAME and OSN_KEY environment variables to be set.
- Returns:
True if setup successful, False otherwise
- Raises:
EnvironmentError – If OSN credentials are not set
- Return type:
bool
- list_available_files(year_filter=None)[source]¶
List available state vector files on the OpenSky MinIO server.
- remove_partial_files()[source]¶
Remove partially downloaded files (*.parquet.part.minio).
MinIO creates .part files during download that may remain if the download is interrupted. This method cleans them up before processing.
- Return type:
None
- cleanup_local_files(file_names)[source]¶
Delete local files after successful processing to save disk space.
- ingest(year_filter=None, dry_run=False)[source]¶
Run the complete ingestion workflow.
Downloads state vectors in batches, processes them, and writes to Iceberg.
- Parameters:
year_filter (list | None) – Optional list of years to restrict ingestion; if None, all available years are processed
dry_run (bool) – If True, list the files that would be processed without downloading or writing
- Returns:
Dictionary with statistics: {'files_processed': N, 'files_skipped': M}
- Return type:
dict
Example
>>> from opdi.ingestion import StateVectorIngestion
>>> from opdi.utils.spark_helpers import get_spark
>>> from opdi.config import OPDIConfig
>>>
>>> config = OPDIConfig.for_environment("live")
>>> spark = get_spark("live", "State Vector Ingestion")
>>> ingestion = StateVectorIngestion(spark, config)
>>> stats = ingestion.ingest(year_filter=[2024])
Aircraft Database – opdi.ingestion.osn_aircraft_db¶
OpenSky Network aircraft database ingestion module.
Downloads aircraft metadata from OpenSky Network and loads into Iceberg tables.
- class opdi.ingestion.osn_aircraft_db.AircraftDatabaseIngestion(spark, config, url=None)[source]¶
Bases: object
Handles ingestion of the OpenSky Network aircraft database.
The aircraft database contains metadata about aircraft, including:
- ICAO 24-bit address
- Registration number
- Aircraft model and type
- Operator information
- Parameters:
spark (pyspark.sql.SparkSession)
config (OPDIConfig)
url (str | None)
- __init__(spark, config, url=None)[source]¶
Initialize aircraft database ingestion.
- Parameters:
spark (pyspark.sql.SparkSession) – Active SparkSession
config (OPDIConfig) – OPDI configuration object
url (str | None) – URL to aircraft database CSV. If None, uses config default
- download_and_convert()[source]¶
Download aircraft database CSV and convert to Spark DataFrame.
- Returns:
Spark DataFrame with aircraft metadata
- Raises:
Exception – If download or conversion fails
- Return type:
pyspark.sql.DataFrame
- write_to_table(df, mode='append')[source]¶
Write aircraft database to Iceberg table.
- Parameters:
df (pyspark.sql.DataFrame) – DataFrame to write
mode (str) – Write mode - “append” or “overwrite”
- Return type:
None
- create_table_if_not_exists()[source]¶
Create the osn_aircraft_db Iceberg table if it doesn’t exist.
This should be run once before first ingestion.
- Return type:
None
- ingest(mode='append')[source]¶
Run the complete ingestion workflow.
Downloads the aircraft database and writes to Iceberg table.
- Parameters:
mode (str) – Write mode - “append” or “overwrite”
- Returns:
Number of records ingested
- Return type:
int
Example
>>> from opdi.ingestion import AircraftDatabaseIngestion
>>> from opdi.utils.spark_helpers import get_spark
>>> from opdi.config import OPDIConfig
>>>
>>> config = OPDIConfig.for_environment("live")
>>> spark = get_spark("live", "Aircraft DB Ingestion")
>>> ingestion = AircraftDatabaseIngestion(spark, config)
>>> count = ingestion.ingest(mode="overwrite")
>>> print(f"Ingested {count} aircraft records")
- get_aircraft_info(icao24)[source]¶
Look up aircraft information by ICAO 24-bit address.
- Parameters:
icao24 (str) – ICAO 24-bit transponder ID (e.g., “a12b34”)
- Returns:
Dictionary with aircraft info, or None if not found
- Return type:
dict | None
Example
>>> ingestion = AircraftDatabaseIngestion(spark, config)
>>> info = ingestion.get_aircraft_info("a12b34")
>>> if info:
...     print(f"Registration: {info['registration']}")
OurAirports – opdi.ingestion.ourairports¶
OurAirports reference data ingestion.
Downloads and loads airport reference datasets from OurAirports into Spark tables. Covers airports, runways, navaids, frequencies, countries, and regions.
Ported from: OPDI-live/python/v2.0.0/00_etl_ourairports.py
- class opdi.ingestion.ourairports.OurAirportsIngestion(spark, config, target_database='project_aiu', temp_dir='.')[source]¶
Bases: object
ETL pipeline for OurAirports reference data.
Downloads CSV datasets from OurAirports, loads them into Spark DataFrames with proper schemas, and writes them to database tables.
Handles 6 datasets: airports, runways, navaids, airport_frequencies, countries, and regions.
- Parameters:
spark (pyspark.sql.SparkSession) – Active SparkSession.
config (OPDIConfig) – OPDI configuration object.
target_database (str) – Database to write tables to (default: ‘project_aiu’).
temp_dir (str) – Directory for temporary CSV downloads.
Example
>>> ingestion = OurAirportsIngestion(spark, config)
>>> ingestion.ingest_all()
- __init__(spark, config, target_database='project_aiu', temp_dir='.')[source]¶
- Parameters:
spark (pyspark.sql.SparkSession)
config (OPDIConfig)
target_database (str)
temp_dir (str)
- create_tables(drop_existing=False)[source]¶
Create all OurAirports tables.
- Parameters:
drop_existing (bool) – If True, drop and recreate existing tables.
- Return type:
None
- ingest_all(urls=None, drop_existing=True)[source]¶
Download and ingest all OurAirports datasets.
- Parameters:
urls (dict | None) – Mapping of dataset names to CSV URLs; if None, the configured defaults are used
drop_existing (bool) – If True, drop and recreate the tables before ingesting
- Returns:
Dictionary mapping dataset names to row counts.
- Return type:
dict
Example
>>> ingestion = OurAirportsIngestion(spark, config)
>>> stats = ingestion.ingest_all()
>>> print(stats)
{'airports': 76543, 'runways': 45678, ...}