Pipeline – opdi.pipeline

Core pipeline transformation modules for OPDI.

Provides the main data transformation pipeline:

  • Track creation from raw state vectors

  • Flight list generation (departures, arrivals, overflights)

  • Flight event detection (phases, crossings, airport events)

class opdi.pipeline.TrackProcessor(spark, config, log_file_path='OPDI_live/logs/02_osn-tracks-etl-log.parquet')[source]

Bases: object

Processes raw state vectors into structured flight tracks.

This class implements the core track creation logic, including:

  • Unique track ID generation using SHA256 hashing

  • Track splitting based on time gaps and altitude

  • H3 hexagonal encoding at multiple resolutions

  • Cumulative distance calculation

  • Altitude outlier detection and smoothing

CRITICAL: The track ID generation algorithm must remain unchanged as it ensures consistency with historical data and downstream systems.
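As a hedged illustration of the hashing idea only (the actual fields fed into the hash are defined in the pipeline source and, per the note above, must not be changed), a minimal sketch might look like:

```python
import hashlib

def make_track_id(icao24: str, first_seen_ts: int) -> str:
    """Illustrative only: derive a deterministic track ID by hashing
    identifying fields. The field choice here (icao24 + first timestamp)
    is an assumption, not the actual OPDI key."""
    key = f"{icao24}_{first_seen_ts}"
    return hashlib.sha256(key.encode("utf-8")).hexdigest()
```

The key property this sketch demonstrates is determinism: the same input always yields the same 64-character hex ID, which is what keeps IDs stable across reprocessing runs.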

__init__(spark, config, log_file_path='OPDI_live/logs/02_osn-tracks-etl-log.parquet')[source]

Initialize track processor.

process_month(month, skip_if_processed=True)[source]

Process tracks for a single month.

Parameters:
  • month (date) – Date representing the first day of the month to process

  • skip_if_processed (bool) – If True, skip if month already processed

Return type:

None

Example

>>> from datetime import date
>>> processor = TrackProcessor(spark, config)
>>> processor.process_month(date(2024, 1, 1))
process_date_range(start_month, end_month, skip_if_processed=True)[source]

Process tracks for a range of months.

Parameters:
  • start_month (date) – First month to process (first day of month)

  • end_month (date) – Last month to process (first day of month)

  • skip_if_processed (bool) – If True, skip already processed months

Return type:

None

Example

>>> from datetime import date
>>> processor = TrackProcessor(spark, config)
>>> processor.process_date_range(
...     date(2024, 1, 1),
...     date(2024, 3, 1)
... )
create_table_if_not_exists()[source]

Create the osn_tracks Iceberg table if it doesn’t exist.

This should be run once before first track processing.

Return type:

None

class opdi.pipeline.FlightListProcessor(spark, config, log_dir='OPDI_live/logs')[source]

Bases: object

Generates the OPDI flight list from processed track data.

The flight list is produced in two phases:

  • DAI (Departures/Arrivals/Internal) – Identifies flights with known departure and/or arrival airports by matching track points to H3 airport detection zones within 30 NM and below FL40.

  • Overflights – Captures remaining tracks (no airport match) that have ADS-B signals lasting at least 5 minutes.

Each flight is enriched with aircraft metadata from the OSN aircraft database (registration, model, typecode, operator).
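The two-phase rule above can be sketched in plain Python. The field names (`adep`, `ades`, `duration_s`) are illustrative stand-ins, not the actual flight-list schema:

```python
def classify_flight(adep, ades, duration_s):
    """Mirror the two-phase rule described above:
    - any airport match               -> DAI record (departure/arrival/internal)
    - no match, >= 5 min ADS-B signal -> overflight
    - otherwise the track is dropped.
    """
    if adep is not None or ades is not None:
        return "DAI"
    if duration_s >= 5 * 60:
        return "overflight"
    return None
```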


Example

>>> processor = FlightListProcessor(spark, config)
>>> processor.process_date_range(date(2024, 1, 1), date(2024, 3, 1))
MAX_FL = 40
process_dai(month, airports_hex_path, skip_if_processed=True)[source]

Process Departures/Arrivals/Internal flights for a month.

Parameters:
  • month (date) – Month to process.

  • airports_hex_path (str) – Path to preprocessed airport hex zones parquet.

  • skip_if_processed (bool) – Skip if month already processed.

Return type:

None

process_overflights(month, skip_if_processed=True)[source]

Process overflight records for a month.

Parameters:
  • month (date) – Month to process.

  • skip_if_processed (bool) – Skip if month already processed.

Return type:

None

process_date_range(start_month, end_month, airports_hex_path, skip_if_processed=True)[source]

Process the complete flight list for a range of months.

Runs DAI processing first, then overflight processing for each month.

Parameters:
  • start_month (date) – First month to process.

  • end_month (date) – Last month to process.

  • airports_hex_path (str) – Path to preprocessed airport hex zones parquet.

  • skip_if_processed (bool) – Skip already processed months.

Return type:

None

Example

>>> processor = FlightListProcessor(spark, config)
>>> processor.process_date_range(
...     date(2024, 1, 1),
...     date(2024, 6, 1),
...     "data/airport_hex/zones_res7_processed.parquet"
... )
create_table_if_not_exists()[source]

Create the opdi_flight_list_v2 Iceberg table if it doesn’t exist.

Return type:

None

class opdi.pipeline.FlightEventProcessor(spark, config, log_dir='OPDI_live/logs')[source]

Bases: object

Orchestrates the extraction of flight events and measurements.

Combines horizontal (phase), vertical (FL crossing), airport, and first/last-seen event detection into a single processing pipeline. Writes results to opdi_flight_events and opdi_measurements tables.


Example

>>> processor = FlightEventProcessor(spark, config)
>>> processor.process_date_range(date(2024, 1, 1), date(2024, 6, 1))
process_month(month, skip_if_processed=True)[source]

Process all flight events for a single month.

Parameters:
  • month (date) – Month to process.

  • skip_if_processed (bool) – Skip event types already processed.

Return type:

None

process_date_range(start_month, end_month, skip_if_processed=True)[source]

Process flight events for a range of months.

Parameters:
  • start_month (date) – First month to process.

  • end_month (date) – Last month to process.

  • skip_if_processed (bool) – Skip already processed months/event types.

Return type:

None

Example

>>> processor = FlightEventProcessor(spark, config)
>>> processor.process_date_range(date(2024, 1, 1), date(2024, 6, 1))
create_tables_if_not_exist()[source]

Create opdi_flight_events and opdi_measurements Iceberg tables.

Return type:

None

Tracks – opdi.pipeline.tracks

Track processing and enrichment module.

Transforms raw state vectors into structured flight tracks with:

  • Track ID generation (SHA256-based)

  • Track splitting based on time gaps

  • H3 geospatial encoding

  • Distance calculations

  • Altitude cleaning

class opdi.pipeline.tracks.TrackProcessor(spark, config, log_file_path='OPDI_live/logs/02_osn-tracks-etl-log.parquet')[source]

Bases: object

Processes raw state vectors into structured flight tracks.

This class implements the core track creation logic, including:

  • Unique track ID generation using SHA256 hashing

  • Track splitting based on time gaps and altitude

  • H3 hexagonal encoding at multiple resolutions

  • Cumulative distance calculation

  • Altitude outlier detection and smoothing

CRITICAL: The track ID generation algorithm must remain unchanged as it ensures consistency with historical data and downstream systems.

__init__(spark, config, log_file_path='OPDI_live/logs/02_osn-tracks-etl-log.parquet')[source]

Initialize track processor.

process_month(month, skip_if_processed=True)[source]

Process tracks for a single month.

Parameters:
  • month (date) – Date representing the first day of the month to process

  • skip_if_processed (bool) – If True, skip if month already processed

Return type:

None

Example

>>> from datetime import date
>>> processor = TrackProcessor(spark, config)
>>> processor.process_month(date(2024, 1, 1))
process_date_range(start_month, end_month, skip_if_processed=True)[source]

Process tracks for a range of months.

Parameters:
  • start_month (date) – First month to process (first day of month)

  • end_month (date) – Last month to process (first day of month)

  • skip_if_processed (bool) – If True, skip already processed months

Return type:

None

Example

>>> from datetime import date
>>> processor = TrackProcessor(spark, config)
>>> processor.process_date_range(
...     date(2024, 1, 1),
...     date(2024, 3, 1)
... )
create_table_if_not_exists()[source]

Create the osn_tracks Iceberg table if it doesn’t exist.

This should be run once before first track processing.

Return type:

None

Flights – opdi.pipeline.flights

Flight list generation module.

Creates flight-level summaries from processed tracks by:

  1. Detecting departures and arrivals using H3 airport zones

  2. Classifying flights as take-off, landing, or overflight

  3. Enriching with aircraft metadata from the OSN aircraft database

  4. Producing the OPDI flight list table

Ported from: OPDI-live/python/v2.0.0/03_opdi_flight_list_v2.py

class opdi.pipeline.flights.FlightListProcessor(spark, config, log_dir='OPDI_live/logs')[source]

Bases: object

Generates the OPDI flight list from processed track data.

The flight list is produced in two phases:

  • DAI (Departures/Arrivals/Internal) – Identifies flights with known departure and/or arrival airports by matching track points to H3 airport detection zones within 30 NM and below FL40.

  • Overflights – Captures remaining tracks (no airport match) that have ADS-B signals lasting at least 5 minutes.

Each flight is enriched with aircraft metadata from the OSN aircraft database (registration, model, typecode, operator).


Example

>>> processor = FlightListProcessor(spark, config)
>>> processor.process_date_range(date(2024, 1, 1), date(2024, 3, 1))
MAX_FL = 40
__init__(spark, config, log_dir='OPDI_live/logs')[source]
process_dai(month, airports_hex_path, skip_if_processed=True)[source]

Process Departures/Arrivals/Internal flights for a month.

Parameters:
  • month (date) – Month to process.

  • airports_hex_path (str) – Path to preprocessed airport hex zones parquet.

  • skip_if_processed (bool) – Skip if month already processed.

Return type:

None

process_overflights(month, skip_if_processed=True)[source]

Process overflight records for a month.

Parameters:
  • month (date) – Month to process.

  • skip_if_processed (bool) – Skip if month already processed.

Return type:

None

process_date_range(start_month, end_month, airports_hex_path, skip_if_processed=True)[source]

Process the complete flight list for a range of months.

Runs DAI processing first, then overflight processing for each month.

Parameters:
  • start_month (date) – First month to process.

  • end_month (date) – Last month to process.

  • airports_hex_path (str) – Path to preprocessed airport hex zones parquet.

  • skip_if_processed (bool) – Skip already processed months.

Return type:

None

Example

>>> processor = FlightListProcessor(spark, config)
>>> processor.process_date_range(
...     date(2024, 1, 1),
...     date(2024, 6, 1),
...     "data/airport_hex/zones_res7_processed.parquet"
... )
create_table_if_not_exists()[source]

Create the opdi_flight_list_v2 Iceberg table if it doesn’t exist.

Return type:

None

Events – opdi.pipeline.events

Flight events and measurements ETL module.

Detects and records flight events (milestones) from track data.

Event types:

  • Horizontal segment events – flight phases (GND, CL, DE, CR, LVL), top-of-climb, top-of-descent, take-off, landing using fuzzy logic

  • Vertical crossing events – flight level crossings (FL50, FL70, FL100, FL245)

  • Airport events – entry/exit of runway, taxiway, apron via H3 layout matching

  • First/last seen events per track

Also produces measurement records (distance flown, time passed) linked to each event.

Ported from OPDI-live/python/v2.0.0/04_opdi_flight_events_etl.py.

opdi.pipeline.events.zmf(column, a, b)[source]

Z-shaped membership function. Returns 1 below a and 0 above b.

opdi.pipeline.events.gaussmf(column, mean, sigma)[source]

Gaussian membership function. Peak at mean, width controlled by sigma.

opdi.pipeline.events.smf(column, a, b)[source]

S-shaped membership function. Returns 0 below a, 1 above b.
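The pipeline's versions operate on PySpark Columns; the following scalar sketches follow the standard (scikit-fuzzy style) definitions of the same shapes:

```python
import math

def zmf(x, a, b):
    # Z-shaped: 1 up to a, smooth quadratic fall-off, 0 from b onward.
    if x <= a:
        return 1.0
    if x >= b:
        return 0.0
    mid = (a + b) / 2.0
    if x <= mid:
        return 1.0 - 2.0 * ((x - a) / (b - a)) ** 2
    return 2.0 * ((x - b) / (b - a)) ** 2

def smf(x, a, b):
    # S-shaped: mirror image of zmf (0 below a, 1 above b).
    return 1.0 - zmf(x, a, b)

def gaussmf(x, mean, sigma):
    # Gaussian bell: peak of 1 at `mean`, width controlled by `sigma`.
    return math.exp(-((x - mean) ** 2) / (2.0 * sigma ** 2))
```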

opdi.pipeline.events.calculate_horizontal_segment_events(sdf_input)[source]

Detect flight phase transitions and horizontal segment events.

Uses fuzzy logic membership functions to classify each state vector into flight phases (GND, CL, DE, CR, LVL) based on altitude, rate of climb, and speed. Detects phase transitions to produce events: level-start, level-end, top-of-climb, top-of-descent, take-off, landing.

Parameters:

sdf_input (pyspark.sql.DataFrame) – Input DataFrame with track data including baro_altitude_c, vert_rate, velocity.

Returns:

DataFrame of detected events with columns track_id, type, event_time, lon, lat, altitude_ft, cumulative_distance_nm, cumulative_time_s, info.

Return type:

pyspark.sql.DataFrame
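A crisp, simplified approximation of the phase labels can help fix intuition. The real implementation blends fuzzy memberships over altitude, vertical rate, and speed; the thresholds below are illustrative only:

```python
def classify_phase(alt_ft, vert_rate_fpm, speed_kt):
    """Crisp stand-in for the fuzzy phase classification described above.
    Thresholds are illustrative, not the OPDI tuning."""
    if alt_ft < 100 and speed_kt < 50:
        return "GND"          # on the ground
    if vert_rate_fpm > 500:
        return "CL"           # climb
    if vert_rate_fpm < -500:
        return "DE"           # descent
    if alt_ft > 25000:
        return "CR"           # cruise
    return "LVL"              # level segment below cruise altitudes
```

Phase-transition events (top-of-climb, top-of-descent, etc.) then fall out of comparing consecutive labels along a track.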

opdi.pipeline.events.calculate_vertical_crossing_events(sdf_input)[source]

Detect flight level crossing events (FL50, FL70, FL100, FL245).

For each track, identifies the first and last time the aircraft crosses each monitored flight level boundary.

Parameters:

sdf_input (pyspark.sql.DataFrame) – Input DataFrame with baro_altitude_c.

Returns:

DataFrame of crossing events with columns track_id, type, event_time, lon, lat, altitude_ft, cumulative_distance_nm, cumulative_time_s, info.

Return type:

pyspark.sql.DataFrame
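The per-track crossing logic can be sketched on a plain, time-ordered altitude sequence (the pipeline computes this with Spark window functions rather than per-track Python loops):

```python
def crossing_events(altitudes_ft, fl):
    """Return indices of the first and last crossings of flight level `fl`
    (hundreds of feet) in a time-ordered altitude sequence, or None if the
    track never crosses it. Illustrative per-track sketch."""
    threshold = fl * 100
    crossings = [
        i for i in range(1, len(altitudes_ft))
        if (altitudes_ft[i - 1] < threshold) != (altitudes_ft[i] < threshold)
    ]
    if not crossings:
        return None
    return crossings[0], crossings[-1]
```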

opdi.pipeline.events.calculate_firstseen_lastseen_events(sdf_input)[source]

Calculate first_seen and last_seen events for each track.

Parameters:

sdf_input (pyspark.sql.DataFrame) – Input DataFrame with track data.

Returns:

DataFrame with first/last seen events.

Return type:

pyspark.sql.DataFrame

opdi.pipeline.events.calculate_airport_events(sv, month, spark, project)[source]

Detect airport infrastructure entry/exit events using H3 layout matching.

Matches low-altitude track points against airport layout H3 hexagons (resolution 12) to detect when aircraft enter and exit runways, taxiways, aprons, and other ground infrastructure.

Parameters:
  • sv (pyspark.sql.DataFrame) – Input DataFrame with low-altitude track points.

  • month (date) – Month to process.

  • spark – Active Spark session.

  • project – Project identifier.
Returns:

DataFrame of airport entry/exit events.

Return type:

pyspark.sql.DataFrame
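The entry/exit detection idea can be sketched with an in-memory layout map. Cell ids and element names below are placeholders; the real layout comes from preprocessed H3 resolution-12 airport data:

```python
def airport_events(track_hexes, layout):
    """Emit (index, event, element) tuples when consecutive track points
    change which layout element (runway, taxiway, apron, ...) their H3
    cell falls in. Simplified per-track sketch of the matching idea."""
    events = []
    prev = None
    for i, h in enumerate(track_hexes):
        cur = layout.get(h)  # None when the point is off the airport layout
        if cur != prev:
            if prev is not None:
                events.append((i, "exit", prev))
            if cur is not None:
                events.append((i, "entry", cur))
        prev = cur
    return events
```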

opdi.pipeline.events.add_time_measure(sdf_input)[source]

Add cumulative time (seconds from track start) to each state vector.

Parameters:

sdf_input (pyspark.sql.DataFrame)

Return type:

pyspark.sql.DataFrame
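A scalar sketch of the same computation (the pipeline uses a window over track_id ordered by event_time):

```python
def add_time_measure(event_times):
    """Seconds elapsed from track start for each timestamp in a
    time-ordered sequence. Per-track illustration only."""
    t0 = event_times[0]
    return [t - t0 for t in event_times]
```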

opdi.pipeline.events.add_distance_measure(df)[source]

Calculate segment and cumulative distance (NM) using Haversine formula.

Uses native PySpark functions for distributed computation.

Parameters:

df (pyspark.sql.DataFrame) – DataFrame with lat, lon, track_id, event_time columns.

Returns:

DataFrame with segment_distance_nm and cumulative_distance_nm.

Return type:

pyspark.sql.DataFrame
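The Haversine computation can be sketched in plain Python; the pipeline implements the same formula with native PySpark column expressions for distributed execution:

```python
import math

EARTH_RADIUS_NM = 3440.065  # mean Earth radius in nautical miles

def haversine_nm(lat1, lon1, lat2, lon2):
    """Great-circle distance in NM between two points given in degrees."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = math.radians(lat2 - lat1)
    dlam = math.radians(lon2 - lon1)
    a = (math.sin(dphi / 2) ** 2
         + math.cos(phi1) * math.cos(phi2) * math.sin(dlam / 2) ** 2)
    return 2 * EARTH_RADIUS_NM * math.asin(math.sqrt(a))

def cumulative_distance_nm(points):
    """Running sum of segment distances for an ordered list of (lat, lon)."""
    total, out = 0.0, [0.0]
    for (la1, lo1), (la2, lo2) in zip(points, points[1:]):
        total += haversine_nm(la1, lo1, la2, lo2)
        out.append(total)
    return out
```

One degree of longitude at the equator is roughly 60 NM, which gives a quick sanity check on the radius constant.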

class opdi.pipeline.events.FlightEventProcessor(spark, config, log_dir='OPDI_live/logs')[source]

Bases: object

Orchestrates the extraction of flight events and measurements.

Combines horizontal (phase), vertical (FL crossing), airport, and first/last-seen event detection into a single processing pipeline. Writes results to opdi_flight_events and opdi_measurements tables.


Example

>>> processor = FlightEventProcessor(spark, config)
>>> processor.process_date_range(date(2024, 1, 1), date(2024, 6, 1))
__init__(spark, config, log_dir='OPDI_live/logs')[source]
process_month(month, skip_if_processed=True)[source]

Process all flight events for a single month.

Parameters:
  • month (date) – Month to process.

  • skip_if_processed (bool) – Skip event types already processed.

Return type:

None

process_date_range(start_month, end_month, skip_if_processed=True)[source]

Process flight events for a range of months.

Parameters:
  • start_month (date) – First month to process.

  • end_month (date) – Last month to process.

  • skip_if_processed (bool) – Skip already processed months/event types.

Return type:

None

Example

>>> processor = FlightEventProcessor(spark, config)
>>> processor.process_date_range(date(2024, 1, 1), date(2024, 6, 1))
create_tables_if_not_exist()[source]

Create opdi_flight_events and opdi_measurements Iceberg tables.

Return type:

None