Pipeline – opdi.pipeline¶
Core pipeline transformation modules for OPDI.
Provides the main data transformation pipeline:

- Track creation from raw state vectors
- Flight list generation (departures, arrivals, overflights)
- Flight event detection (phases, crossings, airport events)
- class opdi.pipeline.TrackProcessor(spark, config, log_file_path='OPDI_live/logs/02_osn-tracks-etl-log.parquet')[source]¶
Bases: object
Processes raw state vectors into structured flight tracks.
This class implements the core track creation logic, including:

- Unique track ID generation using SHA256 hashing
- Track splitting based on time gaps and altitude
- H3 hexagonal encoding at multiple resolutions
- Cumulative distance calculation
- Altitude outlier detection and smoothing
CRITICAL: The track ID generation algorithm must remain unchanged as it ensures consistency with historical data and downstream systems.
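The SHA256-based ID scheme can be illustrated with a minimal sketch. The fields hashed below (icao24, first timestamp, split segment index) are assumptions chosen for illustration, not the actual OPDI key derivation, which must be treated as fixed:

```python
import hashlib

def make_track_id(icao24: str, first_seen_unix: int, segment: int = 0) -> str:
    """Hypothetical sketch: derive a stable track ID by hashing
    identifying fields. The field choice is illustrative only."""
    key = f"{icao24}-{first_seen_unix}-{segment}"
    return hashlib.sha256(key.encode("utf-8")).hexdigest()

# Deterministic: the same inputs always yield the same 64-char hex ID,
# which is what keeps IDs consistent across reprocessing runs.
track_id = make_track_id("4840d6", 1704067200)
```

Determinism is the property that matters here: re-running the pipeline over the same month must reproduce identical IDs for downstream joins.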
- Parameters:
spark (pyspark.sql.SparkSession)
config (OPDIConfig)
log_file_path (str)
- __init__(spark, config, log_file_path='OPDI_live/logs/02_osn-tracks-etl-log.parquet')[source]¶
Initialize track processor.
- Parameters:
spark (pyspark.sql.SparkSession) – Active SparkSession
config (OPDIConfig) – OPDI configuration object
log_file_path (str) – Path to parquet file tracking processed months
- process_month(month, skip_if_processed=True)[source]¶
Process tracks for a single month.
- Parameters:
month (datetime.date) – Month to process.
skip_if_processed (bool) – If True, skip months already recorded in the log.
- Return type:
None
Example
>>> from datetime import date
>>> processor = TrackProcessor(spark, config)
>>> processor.process_month(date(2024, 1, 1))
- process_date_range(start_month, end_month, skip_if_processed=True)[source]¶
Process tracks for a range of months.
- Parameters:
start_month (datetime.date) – First month to process.
end_month (datetime.date) – Last month to process.
skip_if_processed (bool) – If True, skip months already recorded in the log.
- Return type:
None
Example
>>> from datetime import date
>>> processor = TrackProcessor(spark, config)
>>> processor.process_date_range(
...     date(2024, 1, 1),
...     date(2024, 3, 1)
... )
- class opdi.pipeline.FlightListProcessor(spark, config, log_dir='OPDI_live/logs')[source]¶
Bases: object
Generates the OPDI flight list from processed track data.
The flight list is produced in two phases:
DAI (Departures/Arrivals/Internal) – Identifies flights with known departure and/or arrival airports by matching track points to H3 airport detection zones within 30 NM and below FL40.
Overflights – Captures remaining tracks (no airport match) that have ADS-B signals lasting at least 5 minutes.
Each flight is enriched with aircraft metadata from the OSN aircraft database (registration, model, typecode, operator).
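The DAI matching thresholds described above (within 30 NM, below FL40) can be sketched as a plain-Python predicate; the helper name and scalar form are hypothetical, since the actual matching is done against H3 zones in Spark:

```python
MAX_DISTANCE_NM = 30   # track point must lie within 30 NM of the airport
MAX_FL = 40            # and below FL40, i.e. 4,000 ft barometric altitude

def in_detection_zone(distance_nm: float, altitude_ft: float) -> bool:
    """Hypothetical helper mirroring the DAI matching thresholds."""
    return distance_nm <= MAX_DISTANCE_NM and altitude_ft < MAX_FL * 100

# A point 12 NM out at 2,500 ft would count as an airport match;
# the same point at 5,000 ft (above FL40) would not.
```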
- Parameters:
spark (pyspark.sql.SparkSession) – Active SparkSession.
config (OPDIConfig) – OPDI configuration object.
log_dir (str) – Directory for processing progress logs.
Example
>>> processor = FlightListProcessor(spark, config)
>>> processor.process_date_range(date(2024, 1, 1), date(2024, 3, 1))
- MAX_FL = 40¶
- process_dai(month, airports_hex_path, skip_if_processed=True)[source]¶
Process Departures/Arrivals/Internal (DAI) flights for a month.
- Parameters:
month (datetime.date) – Month to process.
airports_hex_path (str) – Path to the H3 airport detection zones parquet file.
skip_if_processed (bool) – If True, skip months already processed.
- process_date_range(start_month, end_month, airports_hex_path, skip_if_processed=True)[source]¶
Process the complete flight list for a range of months.
Runs DAI processing first, then overflight processing for each month.
- Parameters:
start_month (datetime.date) – First month to process.
end_month (datetime.date) – Last month to process.
airports_hex_path (str) – Path to the H3 airport detection zones parquet file.
skip_if_processed (bool) – If True, skip months already processed.
- Return type:
None
Example
>>> processor = FlightListProcessor(spark, config)
>>> processor.process_date_range(
...     date(2024, 1, 1),
...     date(2024, 6, 1),
...     "data/airport_hex/zones_res7_processed.parquet"
... )
- class opdi.pipeline.FlightEventProcessor(spark, config, log_dir='OPDI_live/logs')[source]¶
Bases: object
Orchestrates the extraction of flight events and measurements.
Combines horizontal (phase), vertical (FL crossing), airport, and first/last-seen event detection into a single processing pipeline. Writes results to opdi_flight_events and opdi_measurements tables.
- Parameters:
spark (pyspark.sql.SparkSession) – Active SparkSession.
config (OPDIConfig) – OPDI configuration object.
log_dir (str) – Directory for processing progress logs.
Example
>>> processor = FlightEventProcessor(spark, config)
>>> processor.process_date_range(date(2024, 1, 1), date(2024, 6, 1))
Tracks – opdi.pipeline.tracks¶
Track processing and enrichment module.
Transforms raw state vectors into structured flight tracks with:

- Track ID generation (SHA256-based)
- Track splitting based on time gaps
- H3 geospatial encoding
- Distance calculations
- Altitude cleaning
- class opdi.pipeline.tracks.TrackProcessor(spark, config, log_file_path='OPDI_live/logs/02_osn-tracks-etl-log.parquet')[source]¶
Bases: object
Processes raw state vectors into structured flight tracks.
This class implements the core track creation logic, including:

- Unique track ID generation using SHA256 hashing
- Track splitting based on time gaps and altitude
- H3 hexagonal encoding at multiple resolutions
- Cumulative distance calculation
- Altitude outlier detection and smoothing
CRITICAL: The track ID generation algorithm must remain unchanged as it ensures consistency with historical data and downstream systems.
- Parameters:
spark (pyspark.sql.SparkSession)
config (OPDIConfig)
log_file_path (str)
- __init__(spark, config, log_file_path='OPDI_live/logs/02_osn-tracks-etl-log.parquet')[source]¶
Initialize track processor.
- Parameters:
spark (pyspark.sql.SparkSession) – Active SparkSession
config (OPDIConfig) – OPDI configuration object
log_file_path (str) – Path to parquet file tracking processed months
- process_month(month, skip_if_processed=True)[source]¶
Process tracks for a single month.
- Parameters:
month (datetime.date) – Month to process.
skip_if_processed (bool) – If True, skip months already recorded in the log.
- Return type:
None
Example
>>> from datetime import date
>>> processor = TrackProcessor(spark, config)
>>> processor.process_month(date(2024, 1, 1))
- process_date_range(start_month, end_month, skip_if_processed=True)[source]¶
Process tracks for a range of months.
- Parameters:
start_month (datetime.date) – First month to process.
end_month (datetime.date) – Last month to process.
skip_if_processed (bool) – If True, skip months already recorded in the log.
- Return type:
None
Example
>>> from datetime import date
>>> processor = TrackProcessor(spark, config)
>>> processor.process_date_range(
...     date(2024, 1, 1),
...     date(2024, 3, 1)
... )
Flights – opdi.pipeline.flights¶
Flight list generation module.
Creates flight-level summaries from processed tracks by:

1. Detecting departures and arrivals using H3 airport zones
2. Classifying flights as take-off, landing, or overflight
3. Enriching with aircraft metadata from the OSN aircraft database
4. Producing the OPDI flight list table
Ported from: OPDI-live/python/v2.0.0/03_opdi_flight_list_v2.py
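The DAI-then-overflight decision described below can be sketched as a simplified scalar function. The helper name and argument shapes are hypothetical; only the 5-minute overflight threshold comes from the documented behaviour:

```python
OVERFLIGHT_MIN_S = 5 * 60  # overflights require >= 5 minutes of ADS-B signal

def classify_flight(dep_airport, arr_airport, adsb_duration_s):
    """Hypothetical sketch of the two-phase flight list classification."""
    if dep_airport or arr_airport:
        return "DAI"          # departure and/or arrival airport matched
    if adsb_duration_s >= OVERFLIGHT_MIN_S:
        return "overflight"   # no airport match, but a long enough track
    return None               # too short to keep
```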
- class opdi.pipeline.flights.FlightListProcessor(spark, config, log_dir='OPDI_live/logs')[source]¶
Bases: object
Generates the OPDI flight list from processed track data.
The flight list is produced in two phases:
DAI (Departures/Arrivals/Internal) – Identifies flights with known departure and/or arrival airports by matching track points to H3 airport detection zones within 30 NM and below FL40.
Overflights – Captures remaining tracks (no airport match) that have ADS-B signals lasting at least 5 minutes.
Each flight is enriched with aircraft metadata from the OSN aircraft database (registration, model, typecode, operator).
- Parameters:
spark (pyspark.sql.SparkSession) – Active SparkSession.
config (OPDIConfig) – OPDI configuration object.
log_dir (str) – Directory for processing progress logs.
Example
>>> processor = FlightListProcessor(spark, config)
>>> processor.process_date_range(date(2024, 1, 1), date(2024, 3, 1))
- MAX_FL = 40¶
- __init__(spark, config, log_dir='OPDI_live/logs')[source]¶
- Parameters:
spark (pyspark.sql.SparkSession) – Active SparkSession
config (OPDIConfig) – OPDI configuration object
log_dir (str) – Directory for processing progress logs
- process_dai(month, airports_hex_path, skip_if_processed=True)[source]¶
Process Departures/Arrivals/Internal (DAI) flights for a month.
- Parameters:
month (datetime.date) – Month to process.
airports_hex_path (str) – Path to the H3 airport detection zones parquet file.
skip_if_processed (bool) – If True, skip months already processed.
- process_date_range(start_month, end_month, airports_hex_path, skip_if_processed=True)[source]¶
Process the complete flight list for a range of months.
Runs DAI processing first, then overflight processing for each month.
- Parameters:
start_month (datetime.date) – First month to process.
end_month (datetime.date) – Last month to process.
airports_hex_path (str) – Path to the H3 airport detection zones parquet file.
skip_if_processed (bool) – If True, skip months already processed.
- Return type:
None
Example
>>> processor = FlightListProcessor(spark, config)
>>> processor.process_date_range(
...     date(2024, 1, 1),
...     date(2024, 6, 1),
...     "data/airport_hex/zones_res7_processed.parquet"
... )
Events – opdi.pipeline.events¶
Flight events and measurements ETL module.
Detects and records flight events (milestones) from track data.
Event types:
- Horizontal segment events – flight phases (GND, CL, DE, CR, LVL), top-of-climb, top-of-descent, take-off, landing, detected using fuzzy logic
- Vertical crossing events – flight level crossings (FL50, FL70, FL100, FL245)
- Airport events – entry/exit of runway, taxiway, apron via H3 layout matching
- First/last seen events per track
Also produces measurement records (distance flown, time passed) linked to each event.
Ported from OPDI-live/python/v2.0.0/04_opdi_flight_events_etl.py.
- opdi.pipeline.events.zmf(column, a, b)[source]¶
Z-shaped membership function. Returns 1 below a and 0 above b, decreasing smoothly in between.
- opdi.pipeline.events.gaussmf(column, mean, sigma)[source]¶
Gaussian membership function. Peak at mean, width controlled by sigma.
- opdi.pipeline.events.smf(column, a, b)[source]¶
S-shaped membership function. Returns 0 below a, 1 above b.
- opdi.pipeline.events.calculate_horizontal_segment_events(sdf_input)[source]¶
Detect flight phase transitions and horizontal segment events.
Uses fuzzy logic membership functions to classify each state vector into flight phases (GND, CL, DE, CR, LVL) based on altitude, rate of climb, and speed. Detects phase transitions to produce events: level-start, level-end, top-of-climb, top-of-descent, take-off, landing.
- Parameters:
sdf_input (pyspark.sql.DataFrame) – Input DataFrame with track data including baro_altitude_c, vert_rate, velocity.
- Returns:
DataFrame of detected events with columns track_id, type, event_time, lon, lat, altitude_ft, cumulative_distance_nm, cumulative_time_s, info.
- Return type:
pyspark.sql.DataFrame
- opdi.pipeline.events.calculate_vertical_crossing_events(sdf_input)[source]¶
Detect flight level crossing events (FL50, FL70, FL100, FL245).
For each track, identifies the first and last time the aircraft crosses each monitored flight level boundary.
- Parameters:
sdf_input (pyspark.sql.DataFrame) – Input DataFrame with baro_altitude_c.
- Returns:
DataFrame of crossing events with columns track_id, type, event_time, lon, lat, altitude_ft, cumulative_distance_nm, cumulative_time_s, info.
- Return type:
pyspark.sql.DataFrame
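The first/last crossing logic can be sketched in plain Python over a list of flight-level samples. This is an illustrative index-based version; the real function works on Spark DataFrames using baro_altitude_c:

```python
MONITORED_FLS = [50, 70, 100, 245]  # the monitored flight level boundaries

def crossing_events(track_fls):
    """Return (fl, first_idx, last_idx) for each monitored FL the track
    crosses; index i marks a crossing between samples i-1 and i."""
    events = []
    for fl in MONITORED_FLS:
        hits = [i for i in range(1, len(track_fls))
                if (track_fls[i - 1] < fl) != (track_fls[i] < fl)]
        if hits:
            events.append((fl, hits[0], hits[-1]))
    return events

# A climb-then-descent profile crosses each boundary once on the way up
# and once on the way down.
```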
- opdi.pipeline.events.calculate_firstseen_lastseen_events(sdf_input)[source]¶
Calculate first_seen and last_seen events for each track.
- Parameters:
sdf_input (pyspark.sql.DataFrame) – Input DataFrame with track data.
- Returns:
DataFrame with first/last seen events.
- Return type:
pyspark.sql.DataFrame
- opdi.pipeline.events.calculate_airport_events(sv, month, spark, project)[source]¶
Detect airport infrastructure entry/exit events using H3 layout matching.
Matches low-altitude track points against airport layout H3 hexagons (resolution 12) to detect when aircraft enter and exit runways, taxiways, aprons, and other ground infrastructure.
- Parameters:
sv (pyspark.sql.DataFrame) – Input tracks DataFrame.
month (date) – Month being processed (for flight list lookup).
spark (pyspark.sql.SparkSession) – Active SparkSession.
project (str) – Project/database name.
- Returns:
DataFrame of airport entry/exit events.
- Return type:
pyspark.sql.DataFrame
- opdi.pipeline.events.add_time_measure(sdf_input)[source]¶
Add cumulative time (seconds from track start) to each state vector.
- Parameters:
sdf_input (pyspark.sql.DataFrame)
- Return type:
pyspark.sql.DataFrame
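The cumulative-time idea reduces to subtracting the track's first timestamp from every sample. A plain-Python sketch (the real function uses a Spark window partitioned by track_id, an assumption on our part):

```python
def cumulative_time_s(event_times):
    """Seconds elapsed from the first state vector of a single track."""
    t0 = event_times[0]
    return [t - t0 for t in event_times]
```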
- opdi.pipeline.events.add_distance_measure(df)[source]¶
Calculate segment and cumulative distance (NM) using Haversine formula.
Uses native PySpark functions for distributed computation.
- Parameters:
df (pyspark.sql.DataFrame) – DataFrame with lat, lon, track_id, event_time columns.
- Returns:
DataFrame with segment_distance_nm and cumulative_distance_nm.
- Return type:
pyspark.sql.DataFrame
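The per-segment Haversine step can be sketched in plain Python. The real function expresses the same formula with native PySpark column functions for distributed execution; the Earth radius constant below is an assumption:

```python
import math

EARTH_RADIUS_NM = 3440.065  # mean Earth radius in nautical miles (assumed)

def haversine_nm(lat1, lon1, lat2, lon2):
    """Great-circle distance in NM between two (lat, lon) points in degrees."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = math.radians(lat2 - lat1)
    dlam = math.radians(lon2 - lon1)
    a = (math.sin(dphi / 2) ** 2
         + math.cos(phi1) * math.cos(phi2) * math.sin(dlam / 2) ** 2)
    return 2 * EARTH_RADIUS_NM * math.asin(math.sqrt(a))
```

One degree of longitude along the equator comes out at roughly 60 NM, a convenient sanity check for the units.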
- class opdi.pipeline.events.FlightEventProcessor(spark, config, log_dir='OPDI_live/logs')[source]¶
Bases: object
Orchestrates the extraction of flight events and measurements.
Combines horizontal (phase), vertical (FL crossing), airport, and first/last-seen event detection into a single processing pipeline. Writes results to opdi_flight_events and opdi_measurements tables.
- Parameters:
spark (pyspark.sql.SparkSession) – Active SparkSession.
config (OPDIConfig) – OPDI configuration object.
log_dir (str) – Directory for processing progress logs.
Example
>>> processor = FlightEventProcessor(spark, config)
>>> processor.process_date_range(date(2024, 1, 1), date(2024, 6, 1))
- __init__(spark, config, log_dir='OPDI_live/logs')[source]¶
- Parameters:
spark (pyspark.sql.SparkSession) – Active SparkSession
config (OPDIConfig) – OPDI configuration object
log_dir (str) – Directory for processing progress logs