API Reference

Complete reference documentation auto-generated from the opdi source code. All modules use Google-style docstrings.

Modules

Package-Level Exports

OPDI - Open Performance Data Initiative

A Python package for processing OpenSky Network aviation data through modular ETL pipelines. Provides ingestion, transformation, and output layers for European air-traffic data analysis.

Quick start:

from opdi.config import OPDIConfig
from opdi.utils.spark_helpers import get_spark

config = OPDIConfig.for_environment("dev")
spark  = get_spark("dev")

Full pipeline:

from datetime import date
from opdi.runner import run_pipeline

run_pipeline(env="live", start_date=date(2024, 1, 1), end_date=date(2024, 6, 1))

class opdi.OPDIConfig(project=<factory>, spark=<factory>, h3=<factory>, ingestion=<factory>)[source]

Bases: object

Main OPDI configuration container.

Parameters:
project: ProjectConfig
spark: SparkConfig
h3: H3Config
ingestion: IngestionConfig
classmethod for_environment(env='dev')[source]

Create configuration for specific environment.

Parameters:

env (str) – Environment name (“dev”, “live”, or “local”)

Returns:

OPDIConfig instance with environment-specific settings

Return type:

OPDIConfig

classmethod default()[source]

Create default configuration (dev environment).

Return type:

OPDIConfig
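The two factory methods above can be sketched with a stdlib-only stand-in (the environment names "dev", "live", and "local" come from the docs; the field and the validation logic are illustrative assumptions, not the real implementation):

```python
from dataclasses import dataclass

VALID_ENVS = {"dev", "live", "local"}  # environment names documented above


@dataclass
class ConfigSketch:
    """Illustrative stand-in for OPDIConfig's environment factories."""

    env: str

    @classmethod
    def for_environment(cls, env: str = "dev") -> "ConfigSketch":
        # Reject anything outside the documented environment names.
        if env not in VALID_ENVS:
            raise ValueError(f"unknown environment: {env!r}")
        return cls(env=env)

    @classmethod
    def default(cls) -> "ConfigSketch":
        # default() is documented as the dev environment.
        return cls.for_environment("dev")
```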

class opdi.ProjectConfig(project_name='project_opdi', warehouse_path='abfs://storage-fs@cdpdllive.dfs.core.windows.net/data/project/opdi.db/unmanaged', hadoop_filesystem='abfs://storage-fs@cdpdllive.dfs.core.windows.net/data/project/opdi.db/unmanaged')[source]

Bases: object

Project-level configuration.

Parameters:
  • project_name (str)

  • warehouse_path (str)

  • hadoop_filesystem (str)

project_name: str = 'project_opdi'

Database/catalog name for Iceberg tables.

warehouse_path: str = 'abfs://storage-fs@cdpdllive.dfs.core.windows.net/data/project/opdi.db/unmanaged'

Warehouse path for Iceberg catalog.

hadoop_filesystem: str = 'abfs://storage-fs@cdpdllive.dfs.core.windows.net/data/project/opdi.db/unmanaged'

Hadoop filesystem path for Kerberos access.
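Because ProjectConfig is a plain dataclass, a single field can be overridden while keeping the other defaults, e.g. with dataclasses.replace. A minimal sketch with a stand-in (the field names match the docs; the paths are placeholders, not the real defaults):

```python
from dataclasses import dataclass, replace


@dataclass
class ProjectConfigSketch:
    """Stand-in with the documented ProjectConfig fields."""

    project_name: str = "project_opdi"
    warehouse_path: str = "abfs://example/warehouse"      # placeholder path
    hadoop_filesystem: str = "abfs://example/warehouse"   # placeholder path


base = ProjectConfigSketch()
# Point the warehouse at a local directory for testing, keeping other fields.
local = replace(base, warehouse_path="file:///tmp/opdi-warehouse")
```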

class opdi.SparkConfig(app_name='OPDI Pipeline', driver_cores='1', driver_memory='8G', driver_max_result_size='6g', executor_memory='12G', executor_memory_overhead='3G', executor_cores='2', executor_instances='3', dynamic_allocation_max_executors='10', network_timeout='800s', executor_heartbeat_interval='400s', shuffle_compress='true', shuffle_spill_compress='true', ui_show_console_progress='false', iceberg_jar_path='/opt/spark/optional-lib/iceberg-spark-runtime-3.5_2.12-1.5.2.1.23.17218.0-1.jar', handle_timestamp_without_timezone='true', hadoop_group='eur-app-opdi')[source]

Bases: object

Spark session configuration.

Parameters:
  • app_name (str)

  • driver_cores (str)

  • driver_memory (str)

  • driver_max_result_size (str)

  • executor_memory (str)

  • executor_memory_overhead (str)

  • executor_cores (str)

  • executor_instances (str)

  • dynamic_allocation_max_executors (str)

  • network_timeout (str)

  • executor_heartbeat_interval (str)

  • shuffle_compress (str)

  • shuffle_spill_compress (str)

  • ui_show_console_progress (str)

  • iceberg_jar_path (str)

  • handle_timestamp_without_timezone (str)

  • hadoop_group (str)

app_name: str = 'OPDI Pipeline'

Spark application name.

driver_cores: str = '1'
driver_memory: str = '8G'
driver_max_result_size: str = '6g'
executor_memory: str = '12G'
executor_memory_overhead: str = '3G'
executor_cores: str = '2'
executor_instances: str = '3'
dynamic_allocation_max_executors: str = '10'
network_timeout: str = '800s'
executor_heartbeat_interval: str = '400s'
shuffle_compress: str = 'true'
shuffle_spill_compress: str = 'true'
ui_show_console_progress: str = 'false'
iceberg_jar_path: str = '/opt/spark/optional-lib/iceberg-spark-runtime-3.5_2.12-1.5.2.1.23.17218.0-1.jar'

Path to Iceberg Spark runtime JAR.

handle_timestamp_without_timezone: str = 'true'

Handle timestamps without timezone in Iceberg.

hadoop_group: str = 'eur-app-opdi'

Required group for Azure filesystem access.

to_spark_config(project_config)[source]

Convert to Spark configuration dictionary.

Parameters:

project_config (ProjectConfig) – Project configuration for warehouse paths

Returns:

Dictionary of Spark configuration key-value pairs

Return type:

Dict[str, str]
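The dictionary to_spark_config() returns presumably maps the dataclass fields onto standard Spark property names. The sketch below shows a plausible mapping; the property names are real Spark configuration keys, but which keys the real method emits (and the catalog key used for the warehouse path) is an assumption:

```python
from types import SimpleNamespace


def spark_conf_sketch(spark, project) -> dict:
    """Illustrative mapping from SparkConfig fields to Spark properties.

    `spark` and `project` are any objects with the documented attributes;
    the real to_spark_config() may emit different or additional keys.
    """
    return {
        "spark.app.name": spark.app_name,
        "spark.driver.cores": spark.driver_cores,
        "spark.driver.memory": spark.driver_memory,
        "spark.driver.maxResultSize": spark.driver_max_result_size,
        "spark.executor.memory": spark.executor_memory,
        "spark.executor.memoryOverhead": spark.executor_memory_overhead,
        "spark.executor.cores": spark.executor_cores,
        "spark.executor.instances": spark.executor_instances,
        "spark.dynamicAllocation.maxExecutors": spark.dynamic_allocation_max_executors,
        "spark.network.timeout": spark.network_timeout,
        "spark.executor.heartbeatInterval": spark.executor_heartbeat_interval,
        "spark.shuffle.compress": spark.shuffle_compress,
        "spark.shuffle.spill.compress": spark.shuffle_spill_compress,
        "spark.ui.showConsoleProgress": spark.ui_show_console_progress,
        "spark.jars": spark.iceberg_jar_path,
        # Warehouse location comes from the ProjectConfig argument.
        "spark.sql.catalog.spark_catalog.warehouse": project.warehouse_path,
    }


# Demo with the documented defaults (the JAR path is a shortened placeholder).
spark_cfg = SimpleNamespace(
    app_name="OPDI Pipeline", driver_cores="1", driver_memory="8G",
    driver_max_result_size="6g", executor_memory="12G",
    executor_memory_overhead="3G", executor_cores="2", executor_instances="3",
    dynamic_allocation_max_executors="10", network_timeout="800s",
    executor_heartbeat_interval="400s", shuffle_compress="true",
    shuffle_spill_compress="true", ui_show_console_progress="false",
    iceberg_jar_path="/opt/spark/optional-lib/iceberg.jar",
)
project_cfg = SimpleNamespace(warehouse_path="file:///tmp/warehouse")
conf = spark_conf_sketch(spark_cfg, project_cfg)
```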

class opdi.H3Config(airport_detection_resolution=7, airport_layout_resolution=12, track_resolutions=<factory>, airspace_resolution=7)[source]

Bases: object

H3 hexagonal indexing configuration.

Parameters:
  • airport_detection_resolution (int)

  • airport_layout_resolution (int)

  • track_resolutions (List[int])

  • airspace_resolution (int)

airport_detection_resolution: int = 7

H3 resolution for airport detection zones (~5.2 km² hexagons).

airport_layout_resolution: int = 12

H3 resolution for airport ground layouts (~307 m² hexagons).

track_resolutions: List[int]

H3 resolutions for track encoding.

airspace_resolution: int = 7

H3 resolution for airspace encoding.
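The resolutions above trade precision for index size: each H3 resolution step shrinks average cell area by roughly a factor of 7. A stdlib-only back-of-envelope check of the areas quoted above (the resolution-0 average area is taken from the H3 documentation; the result is approximate because real H3 cell areas vary slightly):

```python
# Average H3 resolution-0 hexagon area, per the H3 documentation (km^2).
RES0_AVG_AREA_KM2 = 4_357_449.41


def approx_hex_area_km2(resolution: int) -> float:
    """Approximate average H3 cell area: each resolution step shrinks area ~7x."""
    return RES0_AVG_AREA_KM2 / 7 ** resolution


detection_km2 = approx_hex_area_km2(7)       # airport_detection_resolution
layout_m2 = approx_hex_area_km2(12) * 1e6    # airport_layout_resolution, in m^2
```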

class opdi.IngestionConfig(minio_endpoint='https://s3.opensky-network.org', osn_aircraft_db_url='https://s3.opensky-network.org/data-samples/metadata/aircraft-database-complete-2024-10.csv', ourairports_base_url='https://ourairports.com/data/', ourairports_datasets=<factory>, batch_size=250, track_gap_threshold_minutes=30, track_gap_low_altitude_minutes=15, track_gap_low_altitude_meters=1524.0, max_vertical_rate_mps=25.4, altitude_smoothing_window_minutes=5)[source]

Bases: object

Data ingestion configuration.

Parameters:
  • minio_endpoint (str)

  • osn_aircraft_db_url (str)

  • ourairports_base_url (str)

  • ourairports_datasets (Dict[str, str])

  • batch_size (int)

  • track_gap_threshold_minutes (int)

  • track_gap_low_altitude_minutes (int)

  • track_gap_low_altitude_meters (float)

  • max_vertical_rate_mps (float)

  • altitude_smoothing_window_minutes (int)

minio_endpoint: str = 'https://s3.opensky-network.org'

MinIO endpoint for OpenSky Network data.

osn_aircraft_db_url: str = 'https://s3.opensky-network.org/data-samples/metadata/aircraft-database-complete-2024-10.csv'

URL for OpenSky Network aircraft database.

ourairports_base_url: str = 'https://ourairports.com/data/'

Base URL for OurAirports CSV datasets.

ourairports_datasets: Dict[str, str]

OurAirports dataset filenames.

batch_size: int = 250

Number of files to process in a single batch (state vectors).

track_gap_threshold_minutes: int = 30

Time gap threshold for splitting tracks (minutes).

track_gap_low_altitude_minutes: int = 15

Time gap threshold at low altitude for splitting tracks (minutes).

track_gap_low_altitude_meters: float = 1524.0

Altitude threshold for low altitude track splitting (meters, ~5000 ft).

max_vertical_rate_mps: float = 25.4

Maximum realistic vertical rate in m/s (~5000 ft/min).

altitude_smoothing_window_minutes: int = 5

Window size for altitude smoothing (minutes).
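The three track-splitting thresholds above combine into a single gap test; a minimal sketch (the rule that the shorter threshold applies below track_gap_low_altitude_meters is inferred from the field descriptions, not confirmed by the source):

```python
TRACK_GAP_THRESHOLD_MIN = 30      # track_gap_threshold_minutes
TRACK_GAP_LOW_ALT_MIN = 15        # track_gap_low_altitude_minutes
LOW_ALTITUDE_METERS = 1524.0      # track_gap_low_altitude_meters (~5000 ft)


def should_split_track(gap_minutes: float, altitude_m: float) -> bool:
    """Return True when a gap between consecutive points starts a new track.

    At low altitude a shorter gap already splits the track, since an
    aircraft near the ground may have landed during the silent interval.
    """
    threshold = (TRACK_GAP_LOW_ALT_MIN if altitude_m < LOW_ALTITUDE_METERS
                 else TRACK_GAP_THRESHOLD_MIN)
    return gap_minutes > threshold
```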