Utilities – opdi.utils

Shared utility functions for the OPDI pipeline.

Provides datetime helpers, Spark session management, geospatial calculations, and H3 hexagonal indexing utilities.

opdi.utils.generate_months(start_date, end_date)[source]

Generate a list of dates corresponding to the first day of each month between two dates.

Parameters:
  • start_date (date) – The starting date

  • end_date (date) – The ending date

Returns:

List of date objects for the first day of each month within the specified range

Return type:

List[date]

Example

>>> generate_months(date(2024, 1, 1), date(2024, 3, 1))
[date(2024, 1, 1), date(2024, 2, 1), date(2024, 3, 1)]
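The documented behaviour can be reproduced with a plain-Python sketch (a hypothetical stdlib-only implementation for illustration; the actual OPDI code may differ):

```python
from datetime import date

def generate_months(start_date: date, end_date: date) -> list:
    """Return the first day of each month between start_date and end_date."""
    months = []
    current = date(start_date.year, start_date.month, 1)
    while current <= end_date:
        months.append(current)
        # Advance to the first day of the following month.
        if current.month == 12:
            current = date(current.year + 1, 1, 1)
        else:
            current = date(current.year, current.month + 1, 1)
    return months
```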
opdi.utils.generate_intervals(start_date, end_date, step_days=10)[source]

Generate a list of date intervals with specified step size.

Parameters:
  • start_date (date) – The starting date

  • end_date (date) – The ending date

  • step_days (int) – Number of days in each interval (default: 10)

Returns:

List of (start, end) date tuples representing intervals

Return type:

List[Tuple[date, date]]

Example

>>> generate_intervals(date(2024, 1, 1), date(2024, 1, 25), step_days=10)
[(date(2024, 1, 1), date(2024, 1, 10)),
 (date(2024, 1, 11), date(2024, 1, 20)),
 (date(2024, 1, 21), date(2024, 1, 25))]
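A stdlib-only sketch consistent with the example above (hypothetical; note the final interval is clipped to end_date, so it may be shorter than step_days):

```python
from datetime import date, timedelta

def generate_intervals(start_date: date, end_date: date, step_days: int = 10) -> list:
    """Split [start_date, end_date] into inclusive intervals of step_days days."""
    intervals = []
    current = start_date
    while current <= end_date:
        # Each interval spans step_days calendar days, clipped at end_date.
        interval_end = min(current + timedelta(days=step_days - 1), end_date)
        intervals.append((current, interval_end))
        current = interval_end + timedelta(days=1)
    return intervals
```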
opdi.utils.get_start_end_of_month(dt)[source]

Return Unix timestamps for the first and last second of the given month.

Parameters:

dt (date) – Date object for the desired month

Returns:

Tuple of (first_second_timestamp, last_second_timestamp) as floats

Return type:

Tuple[float, float]

Example

>>> get_start_end_of_month(date(2024, 1, 1))
(1704067200.0, 1706745599.0)  # Jan 1 00:00:00 to Jan 31 23:59:59 UTC
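The timestamps in the example correspond to UTC month boundaries. A stdlib-only sketch that reproduces them (hypothetical implementation, assuming UTC):

```python
import calendar
from datetime import date, datetime, timezone

def get_start_end_of_month(dt: date) -> tuple:
    """Unix timestamps (UTC) for the first and last second of dt's month."""
    last_day = calendar.monthrange(dt.year, dt.month)[1]
    start = datetime(dt.year, dt.month, 1, tzinfo=timezone.utc)
    end = datetime(dt.year, dt.month, last_day, 23, 59, 59, tzinfo=timezone.utc)
    return start.timestamp(), end.timestamp()
```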
opdi.utils.get_data_within_timeframe(spark, table_name, month, time_col='event_time', unix_time=True)[source]

Retrieve records from a Spark table within the given monthly timeframe.

Parameters:
  • spark (pyspark.sql.SparkSession) – Active SparkSession object

  • table_name (str) – Name of the Spark table to query

  • month (date) – Start date of the month (first day)

  • time_col (str) – Name of the time column to filter on (default: ‘event_time’)

  • unix_time (bool) – Whether the time column is already in Unix timestamp format (default: True)

Returns:

DataFrame containing the records within the specified timeframe

Return type:

pyspark.sql.DataFrame

Example

>>> df = get_data_within_timeframe(spark, "project_opdi.osn_tracks", date(2024, 1, 1))
>>> df.count()  # Returns number of tracks in January 2024
opdi.utils.get_data_within_interval(spark, table_name, start_date, end_date, time_col='event_time', unix_time=True)[source]

Retrieve records from a Spark table within a specific date interval.

Parameters:
  • spark (pyspark.sql.SparkSession) – Active SparkSession object

  • table_name (str) – Name of the Spark table to query

  • start_date (date) – Start date of the interval (inclusive)

  • end_date (date) – End date of the interval (inclusive)

  • time_col (str) – Name of the time column to filter on (default: ‘event_time’)

  • unix_time (bool) – Whether the time column is already in Unix timestamp format (default: True)

Returns:

DataFrame containing the records within the specified interval

Return type:

pyspark.sql.DataFrame

Example

>>> df = get_data_within_interval(
...     spark, "project_opdi.flight_events",
...     date(2024, 1, 1), date(2024, 1, 10)
... )
class opdi.utils.SparkSessionManager[source]

Bases: object

Factory for creating and managing Spark sessions with OPDI configurations.

This class centralizes all Spark configuration logic, eliminating duplication across pipeline scripts and enabling environment-specific settings.

static create_session(app_name='OPDI Pipeline', config=None, env='dev', extra_configs=None)[source]

Create a new Spark session with OPDI configuration.

Parameters:
  • app_name (str) – Name for the Spark application

  • config (OPDIConfig | None) – OPDI configuration object. If None, creates config for specified env

  • env (str) – Environment name (“dev”, “live”, “local”) - used if config is None

  • extra_configs (Dict[str, str] | None) – Additional Spark configurations to override defaults

Returns:

Configured SparkSession with Hive support enabled

Return type:

pyspark.sql.SparkSession

Example

>>> from opdi.utils.spark_helpers import SparkSessionManager
>>> spark = SparkSessionManager.create_session("Track Processing", env="live")
>>> df = spark.table("project_opdi.osn_statevectors_v2")
static get_or_create(app_name='OPDI Pipeline', config=None, env='dev')[source]

Get existing Spark session or create new one if none exists.

This method is useful when you want to reuse an existing session within the same application context.

Parameters:
  • app_name (str) – Name for the Spark application

  • config (OPDIConfig | None) – OPDI configuration object. If None, creates config for specified env

  • env (str) – Environment name (“dev”, “live”, “local”)

Returns:

Active SparkSession (existing or newly created)

Return type:

pyspark.sql.SparkSession

Example

>>> spark = SparkSessionManager.get_or_create("My Analysis")
static stop_session(spark)[source]

Stop the given Spark session and clean up resources.

Parameters:

spark (pyspark.sql.SparkSession) – SparkSession to stop

Return type:

None

Example

>>> spark = SparkSessionManager.create_session()
>>> # ... do work ...
>>> SparkSessionManager.stop_session(spark)
static create_local_session(app_name='OPDI Local', master='local[*]', extra_configs=None)[source]

Create a lightweight Spark session for local testing.

This creates a minimal Spark session without Iceberg, Hive, or Azure dependencies, suitable for unit testing and local development.

Parameters:
  • app_name (str) – Name for the Spark application

  • master (str) – Spark master URL (default: “local[*]” uses all cores)

  • extra_configs (Dict[str, str] | None) – Additional Spark configurations

Returns:

Local SparkSession for testing

Return type:

pyspark.sql.SparkSession

Example

>>> spark = SparkSessionManager.create_local_session("Unit Tests")
>>> df = spark.createDataFrame([(1, "test")], ["id", "name"])
opdi.utils.get_spark(env='dev', app_name='OPDI Pipeline')[source]

Convenience function to get a Spark session with OPDI configuration.

This is a shorthand for SparkSessionManager.create_session().

Parameters:
  • env (str) – Environment name (“dev”, “live”, “local”)

  • app_name (str) – Name for the Spark application

Returns:

Configured SparkSession

Return type:

pyspark.sql.SparkSession

Example

>>> from opdi.utils.spark_helpers import get_spark
>>> spark = get_spark("live", "Flight Events Processing")
opdi.utils.haversine_distance(lat1, lon1, lat2, lon2, unit='nm')[source]

Calculate the great circle distance between two points on Earth using the Haversine formula.

Parameters:
  • lat1 (float) – Latitude of first point in degrees

  • lon1 (float) – Longitude of first point in degrees

  • lat2 (float) – Latitude of second point in degrees

  • lon2 (float) – Longitude of second point in degrees

  • unit (str) – Output unit - “km”, “nm” (nautical miles), or “m” (meters)

Returns:

Distance between the two points in the specified unit

Return type:

float

Example

>>> # Distance from Brussels (EBBR) to Paris (LFPG)
>>> dist = haversine_distance(50.9014, 4.4844, 49.0097, 2.5478, unit="nm")
>>> print(f"{dist:.1f} NM")
136.0 NM
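A stdlib-only sketch of the Haversine formula (hypothetical; the radius constants are assumed values, and the standard spherical formula gives roughly 136 NM for the Brussels–Paris pair above):

```python
import math

# Assumed mean Earth radius per output unit.
EARTH_RADIUS = {"km": 6371.0, "m": 6371000.0, "nm": 3440.065}

def haversine_distance(lat1, lon1, lat2, lon2, unit="nm"):
    """Great circle distance between two (lat, lon) points in degrees."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = math.radians(lat2 - lat1)
    dlmb = math.radians(lon2 - lon1)
    a = (math.sin(dphi / 2) ** 2
         + math.cos(phi1) * math.cos(phi2) * math.sin(dlmb / 2) ** 2)
    c = 2 * math.asin(math.sqrt(a))  # central angle in radians
    return EARTH_RADIUS[unit] * c
```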
opdi.utils.add_cumulative_distance(df, lat_col='lat', lon_col='lon', track_id_col='track_id', time_col='event_time')[source]

Calculate the great circle distance between consecutive points and cumulative distance.

Uses native PySpark functions for distributed computation of distances along tracks.

Parameters:
  • df (pyspark.sql.DataFrame) – Input Spark DataFrame

  • lat_col (str) – Name of latitude column (default: “lat”)

  • lon_col (str) – Name of longitude column (default: “lon”)

  • track_id_col (str) – Name of track ID column for partitioning (default: “track_id”)

  • time_col (str) – Name of time column for ordering (default: “event_time”)

Returns:

DataFrame with two additional columns:

  • segment_distance_nm: Distance from previous point in nautical miles

  • cumulative_distance_nm: Total distance from track start

Return type:

pyspark.sql.DataFrame

Example

>>> df_with_distance = add_cumulative_distance(tracks_df)
>>> df_with_distance.select("track_id", "cumulative_distance_nm").show()
opdi.utils.calculate_bearing(lat1, lon1, lat2, lon2)[source]

Calculate the initial bearing (forward azimuth) between two points.

Parameters:
  • lat1 (float) – Latitude of first point in degrees

  • lon1 (float) – Longitude of first point in degrees

  • lat2 (float) – Latitude of second point in degrees

  • lon2 (float) – Longitude of second point in degrees

Returns:

Bearing in degrees (0-360), where 0° is North, 90° is East, etc.

Return type:

float

Example

>>> bearing = calculate_bearing(50.9014, 4.4844, 49.0097, 2.5478)
>>> print(f"Bearing: {bearing:.1f}°")
Bearing: 214.1°
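The standard forward-azimuth formula, sketched in plain Python (hypothetical implementation; for the Brussels–Paris pair above it yields roughly 214°, i.e. south-southwest):

```python
import math

def calculate_bearing(lat1, lon1, lat2, lon2):
    """Initial bearing from point 1 to point 2, in degrees clockwise from North."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dlmb = math.radians(lon2 - lon1)
    x = math.sin(dlmb) * math.cos(phi2)
    y = (math.cos(phi1) * math.sin(phi2)
         - math.sin(phi1) * math.cos(phi2) * math.cos(dlmb))
    # atan2 yields (-180, 180]; normalize to [0, 360).
    return (math.degrees(math.atan2(x, y)) + 360) % 360
```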
opdi.utils.destination_point(lat, lon, bearing, distance_nm)[source]

Calculate destination point given start point, bearing, and distance.

Parameters:
  • lat (float) – Starting latitude in degrees

  • lon (float) – Starting longitude in degrees

  • bearing (float) – Bearing in degrees (0-360)

  • distance_nm (float) – Distance in nautical miles

Returns:

Tuple of (destination_latitude, destination_longitude) in degrees

Return type:

Tuple[float, float]

Example

>>> # Point 100 NM north of Brussels
>>> dest_lat, dest_lon = destination_point(50.9014, 4.4844, 0, 100)
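The direct problem on a sphere can be sketched as follows (hypothetical stdlib-only implementation; the Earth radius in nautical miles is an assumed constant). Heading due north by 100 NM should raise the latitude by about 100/60 ≈ 1.67°:

```python
import math

EARTH_RADIUS_NM = 3440.065  # assumed mean Earth radius in nautical miles

def destination_point(lat, lon, bearing, distance_nm):
    """Destination (lat, lon) reached from a start point along a bearing."""
    phi1 = math.radians(lat)
    lmb1 = math.radians(lon)
    theta = math.radians(bearing)
    delta = distance_nm / EARTH_RADIUS_NM  # angular distance in radians
    phi2 = math.asin(math.sin(phi1) * math.cos(delta)
                     + math.cos(phi1) * math.sin(delta) * math.cos(theta))
    lmb2 = lmb1 + math.atan2(math.sin(theta) * math.sin(delta) * math.cos(phi1),
                             math.cos(delta) - math.sin(phi1) * math.sin(phi2))
    return math.degrees(phi2), math.degrees(lmb2)
```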
opdi.utils.meters_to_flight_level(meters)[source]

Convert altitude in meters to flight level.

Flight level is altitude in hundreds of feet, so FL100 = 10,000 feet.

Parameters:

meters (float) – Altitude in meters

Returns:

Flight level (integer)

Return type:

int

Example

>>> meters_to_flight_level(3048)  # 10,000 feet
100
opdi.utils.flight_level_to_meters(flight_level)[source]

Convert flight level to altitude in meters.

Parameters:

flight_level (int) – Flight level (e.g., 100 for FL100)

Returns:

Altitude in meters

Return type:

float

Example

>>> flight_level_to_meters(100)
3048.0
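Both conversions reduce to the exact foot-to-meter factor 0.3048, as in this sketch (hypothetical implementation matching the documented examples):

```python
def meters_to_flight_level(meters: float) -> int:
    """Altitude in meters -> flight level (hundreds of feet)."""
    feet = meters / 0.3048
    return round(feet / 100)

def flight_level_to_meters(flight_level: int) -> float:
    """Flight level -> altitude in meters."""
    return flight_level * 100 * 0.3048
```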
opdi.utils.h3_list_prep(h3_resolutions)[source]

Generate H3 column names for a list of resolutions.

Parameters:

h3_resolutions (List[int]) – List of H3 resolution levels (e.g., [7, 12])

Returns:

List of column names formatted as “h3_res_{resolution}”

Return type:

List[str]

Example

>>> h3_list_prep([7, 12])
['h3_res_7', 'h3_res_12']
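This helper is essentially a naming convention; a one-line sketch (hypothetical implementation):

```python
def h3_list_prep(h3_resolutions):
    """Map H3 resolution levels to their conventional column names."""
    return [f"h3_res_{r}" for r in h3_resolutions]
```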
opdi.utils.get_h3_coords(h3_index)[source]

Get latitude and longitude coordinates for an H3 index.

Parameters:

h3_index (str) – H3 hexagon index string

Returns:

Tuple of (latitude, longitude) in degrees

Return type:

Tuple[float, float]

Example

>>> lat, lon = get_h3_coords('871fb46655fffff')
>>> print(f"Lat: {lat:.4f}, Lon: {lon:.4f}")
opdi.utils.compact_h3_set(h3_set)[source]

Compact a set of H3 indices by merging child hexagons into parents where possible.

This reduces the number of hexagons needed to represent the same area, improving storage and query performance.

Parameters:

h3_set (Set[str]) – Set of H3 index strings

Returns:

Compacted set of H3 indices

Return type:

Set[str]

Example

>>> h3_indices = {'871fb46655fffff', '871fb46656fffff', ...}
>>> compacted = compact_h3_set(h3_indices)
>>> len(compacted) < len(h3_indices)  # Compacted set is smaller
True
opdi.utils.uncompact_h3_set(h3_set, target_resolution)[source]

Uncompact a set of H3 indices to a target resolution.

Expands parent hexagons into their children at the specified resolution.

Parameters:
  • h3_set (Set[str]) – Set of H3 index strings

  • target_resolution (int) – Target H3 resolution level (0-15)

Returns:

Uncompacted set of H3 indices at target resolution

Return type:

Set[str]

Example

>>> compacted = {'861fb467fffffff'}  # Resolution 6
>>> uncompacted = uncompact_h3_set(compacted, 7)  # Expand to resolution 7
>>> len(uncompacted) > len(compacted)  # More hexagons at finer resolution
True
opdi.utils.h3_distance(h3_index1, h3_index2)[source]

Calculate the grid distance between two H3 indices.

Grid distance is the number of hexagon steps between two indices. Both indices must be at the same resolution.

Parameters:
  • h3_index1 (str) – First H3 index

  • h3_index2 (str) – Second H3 index

Returns:

Number of hexagon steps between the two indices

Return type:

int

Example

>>> dist = h3_distance('871fb46655fffff', '871fb46656fffff')
>>> print(f"Grid distance: {dist} hexagons")
opdi.utils.k_ring(h3_index, k)[source]

Get all hexagons within k steps of the given hexagon (including the center).

Parameters:
  • h3_index (str) – Center H3 hexagon index

  • k (int) – Number of steps (radius)

Returns:

Set of H3 indices within k steps

Return type:

Set[str]

Example

>>> # Get hexagon and its immediate neighbors (k=1)
>>> neighbors = k_ring('871fb46655fffff', 1)
>>> len(neighbors)  # Center + 6 neighbors
7
opdi.utils.hex_ring(h3_index, k)[source]

Get hexagons exactly k steps away from the given hexagon (hollow ring).

Parameters:
  • h3_index (str) – Center H3 hexagon index

  • k (int) – Number of steps (radius)

Returns:

Set of H3 indices exactly k steps away

Return type:

Set[str]

Example

>>> # Get only the immediate neighbors (not the center)
>>> ring = hex_ring('871fb46655fffff', 1)
>>> len(ring)  # Just the 6 neighbors
6
opdi.utils.polyfill_geojson(geojson_geometry, resolution, geo_json_conformant=False)[source]

Fill a GeoJSON geometry with H3 hexagons at the specified resolution.

Parameters:
  • geojson_geometry (dict) – GeoJSON geometry dict (Polygon or MultiPolygon)

  • resolution (int) – H3 resolution level (0-15)

  • geo_json_conformant (bool) – Whether coordinates are in GeoJSON format [lon, lat] vs [lat, lon]

Returns:

Set of H3 indices covering the geometry

Return type:

Set[str]

Example

>>> polygon = {
...     "type": "Polygon",
...     "coordinates": [[
...         [4.4, 50.8], [4.5, 50.8], [4.5, 50.9], [4.4, 50.9], [4.4, 50.8]
...     ]]
... }
>>> hexagons = polyfill_geojson(polygon, resolution=7, geo_json_conformant=True)
opdi.utils.is_valid_h3_index(h3_index)[source]

Check if a string is a valid H3 index.

Parameters:

h3_index (str) – String to validate

Returns:

True if valid H3 index, False otherwise

Return type:

bool

Example

>>> is_valid_h3_index('871fb46655fffff')
True
>>> is_valid_h3_index('invalid')
False

DateTime Helpers – opdi.utils.datetime_helpers

Date and time utility functions for OPDI pipeline.

Provides functions for generating date ranges, converting dates to Unix timestamps, and filtering Spark DataFrames by time windows.

opdi.utils.datetime_helpers.generate_months(start_date, end_date)[source]

Generate a list of dates corresponding to the first day of each month between two dates.

Parameters:
  • start_date (date) – The starting date

  • end_date (date) – The ending date

Returns:

List of date objects for the first day of each month within the specified range

Return type:

List[date]

Example

>>> generate_months(date(2024, 1, 1), date(2024, 3, 1))
[date(2024, 1, 1), date(2024, 2, 1), date(2024, 3, 1)]
opdi.utils.datetime_helpers.generate_intervals(start_date, end_date, step_days=10)[source]

Generate a list of date intervals with specified step size.

Parameters:
  • start_date (date) – The starting date

  • end_date (date) – The ending date

  • step_days (int) – Number of days in each interval (default: 10)

Returns:

List of (start, end) date tuples representing intervals

Return type:

List[Tuple[date, date]]

Example

>>> generate_intervals(date(2024, 1, 1), date(2024, 1, 25), step_days=10)
[(date(2024, 1, 1), date(2024, 1, 10)),
 (date(2024, 1, 11), date(2024, 1, 20)),
 (date(2024, 1, 21), date(2024, 1, 25))]
opdi.utils.datetime_helpers.get_start_end_of_month(dt)[source]

Return Unix timestamps for the first and last second of the given month.

Parameters:

dt (date) – Date object for the desired month

Returns:

Tuple of (first_second_timestamp, last_second_timestamp) as floats

Return type:

Tuple[float, float]

Example

>>> get_start_end_of_month(date(2024, 1, 1))
(1704067200.0, 1706745599.0)  # Jan 1 00:00:00 to Jan 31 23:59:59 UTC
opdi.utils.datetime_helpers.get_data_within_timeframe(spark, table_name, month, time_col='event_time', unix_time=True)[source]

Retrieve records from a Spark table within the given monthly timeframe.

Parameters:
  • spark (pyspark.sql.SparkSession) – Active SparkSession object

  • table_name (str) – Name of the Spark table to query

  • month (date) – Start date of the month (first day)

  • time_col (str) – Name of the time column to filter on (default: ‘event_time’)

  • unix_time (bool) – Whether the time column is already in Unix timestamp format (default: True)

Returns:

DataFrame containing the records within the specified timeframe

Return type:

pyspark.sql.DataFrame

Example

>>> df = get_data_within_timeframe(spark, "project_opdi.osn_tracks", date(2024, 1, 1))
>>> df.count()  # Returns number of tracks in January 2024
opdi.utils.datetime_helpers.get_data_within_interval(spark, table_name, start_date, end_date, time_col='event_time', unix_time=True)[source]

Retrieve records from a Spark table within a specific date interval.

Parameters:
  • spark (pyspark.sql.SparkSession) – Active SparkSession object

  • table_name (str) – Name of the Spark table to query

  • start_date (date) – Start date of the interval (inclusive)

  • end_date (date) – End date of the interval (inclusive)

  • time_col (str) – Name of the time column to filter on (default: ‘event_time’)

  • unix_time (bool) – Whether the time column is already in Unix timestamp format (default: True)

Returns:

DataFrame containing the records within the specified interval

Return type:

pyspark.sql.DataFrame

Example

>>> df = get_data_within_interval(
...     spark, "project_opdi.flight_events",
...     date(2024, 1, 1), date(2024, 1, 10)
... )

Spark Helpers – opdi.utils.spark_helpers

Spark session management utilities for OPDI pipeline.

Provides centralized Spark session creation with consistent configuration across all pipeline components.

class opdi.utils.spark_helpers.SparkSessionManager[source]

Bases: object

Factory for creating and managing Spark sessions with OPDI configurations.

This class centralizes all Spark configuration logic, eliminating duplication across pipeline scripts and enabling environment-specific settings.

static create_session(app_name='OPDI Pipeline', config=None, env='dev', extra_configs=None)[source]

Create a new Spark session with OPDI configuration.

Parameters:
  • app_name (str) – Name for the Spark application

  • config (OPDIConfig | None) – OPDI configuration object. If None, creates config for specified env

  • env (str) – Environment name (“dev”, “live”, “local”) - used if config is None

  • extra_configs (Dict[str, str] | None) – Additional Spark configurations to override defaults

Returns:

Configured SparkSession with Hive support enabled

Return type:

pyspark.sql.SparkSession

Example

>>> from opdi.utils.spark_helpers import SparkSessionManager
>>> spark = SparkSessionManager.create_session("Track Processing", env="live")
>>> df = spark.table("project_opdi.osn_statevectors_v2")
static get_or_create(app_name='OPDI Pipeline', config=None, env='dev')[source]

Get existing Spark session or create new one if none exists.

This method is useful when you want to reuse an existing session within the same application context.

Parameters:
  • app_name (str) – Name for the Spark application

  • config (OPDIConfig | None) – OPDI configuration object. If None, creates config for specified env

  • env (str) – Environment name (“dev”, “live”, “local”)

Returns:

Active SparkSession (existing or newly created)

Return type:

pyspark.sql.SparkSession

Example

>>> spark = SparkSessionManager.get_or_create("My Analysis")
static stop_session(spark)[source]

Stop the given Spark session and clean up resources.

Parameters:

spark (pyspark.sql.SparkSession) – SparkSession to stop

Return type:

None

Example

>>> spark = SparkSessionManager.create_session()
>>> # ... do work ...
>>> SparkSessionManager.stop_session(spark)
static create_local_session(app_name='OPDI Local', master='local[*]', extra_configs=None)[source]

Create a lightweight Spark session for local testing.

This creates a minimal Spark session without Iceberg, Hive, or Azure dependencies, suitable for unit testing and local development.

Parameters:
  • app_name (str) – Name for the Spark application

  • master (str) – Spark master URL (default: “local[*]” uses all cores)

  • extra_configs (Dict[str, str] | None) – Additional Spark configurations

Returns:

Local SparkSession for testing

Return type:

pyspark.sql.SparkSession

Example

>>> spark = SparkSessionManager.create_local_session("Unit Tests")
>>> df = spark.createDataFrame([(1, "test")], ["id", "name"])
opdi.utils.spark_helpers.get_spark(env='dev', app_name='OPDI Pipeline')[source]

Convenience function to get a Spark session with OPDI configuration.

This is a shorthand for SparkSessionManager.create_session().

Parameters:
  • env (str) – Environment name (“dev”, “live”, “local”)

  • app_name (str) – Name for the Spark application

Returns:

Configured SparkSession

Return type:

pyspark.sql.SparkSession

Example

>>> from opdi.utils.spark_helpers import get_spark
>>> spark = get_spark("live", "Flight Events Processing")

Geospatial – opdi.utils.geospatial

Geospatial utility functions for OPDI pipeline.

Provides functions for distance calculations, coordinate transformations, and geometry operations used throughout the aviation data pipeline.

opdi.utils.geospatial.haversine_distance(lat1, lon1, lat2, lon2, unit='nm')[source]

Calculate the great circle distance between two points on Earth using the Haversine formula.

Parameters:
  • lat1 (float) – Latitude of first point in degrees

  • lon1 (float) – Longitude of first point in degrees

  • lat2 (float) – Latitude of second point in degrees

  • lon2 (float) – Longitude of second point in degrees

  • unit (str) – Output unit - “km”, “nm” (nautical miles), or “m” (meters)

Returns:

Distance between the two points in the specified unit

Return type:

float

Example

>>> # Distance from Brussels (EBBR) to Paris (LFPG)
>>> dist = haversine_distance(50.9014, 4.4844, 49.0097, 2.5478, unit="nm")
>>> print(f"{dist:.1f} NM")
136.0 NM
opdi.utils.geospatial.add_cumulative_distance(df, lat_col='lat', lon_col='lon', track_id_col='track_id', time_col='event_time')[source]

Calculate the great circle distance between consecutive points and cumulative distance.

Uses native PySpark functions for distributed computation of distances along tracks.

Parameters:
  • df (pyspark.sql.DataFrame) – Input Spark DataFrame

  • lat_col (str) – Name of latitude column (default: “lat”)

  • lon_col (str) – Name of longitude column (default: “lon”)

  • track_id_col (str) – Name of track ID column for partitioning (default: “track_id”)

  • time_col (str) – Name of time column for ordering (default: “event_time”)

Returns:

DataFrame with two additional columns:

  • segment_distance_nm: Distance from previous point in nautical miles

  • cumulative_distance_nm: Total distance from track start

Return type:

pyspark.sql.DataFrame

Example

>>> df_with_distance = add_cumulative_distance(tracks_df)
>>> df_with_distance.select("track_id", "cumulative_distance_nm").show()
opdi.utils.geospatial.calculate_bearing(lat1, lon1, lat2, lon2)[source]

Calculate the initial bearing (forward azimuth) between two points.

Parameters:
  • lat1 (float) – Latitude of first point in degrees

  • lon1 (float) – Longitude of first point in degrees

  • lat2 (float) – Latitude of second point in degrees

  • lon2 (float) – Longitude of second point in degrees

Returns:

Bearing in degrees (0-360), where 0° is North, 90° is East, etc.

Return type:

float

Example

>>> bearing = calculate_bearing(50.9014, 4.4844, 49.0097, 2.5478)
>>> print(f"Bearing: {bearing:.1f}°")
Bearing: 214.1°
opdi.utils.geospatial.destination_point(lat, lon, bearing, distance_nm)[source]

Calculate destination point given start point, bearing, and distance.

Parameters:
  • lat (float) – Starting latitude in degrees

  • lon (float) – Starting longitude in degrees

  • bearing (float) – Bearing in degrees (0-360)

  • distance_nm (float) – Distance in nautical miles

Returns:

Tuple of (destination_latitude, destination_longitude) in degrees

Return type:

Tuple[float, float]

Example

>>> # Point 100 NM north of Brussels
>>> dest_lat, dest_lon = destination_point(50.9014, 4.4844, 0, 100)
opdi.utils.geospatial.meters_to_flight_level(meters)[source]

Convert altitude in meters to flight level.

Flight level is altitude in hundreds of feet, so FL100 = 10,000 feet.

Parameters:

meters (float) – Altitude in meters

Returns:

Flight level (integer)

Return type:

int

Example

>>> meters_to_flight_level(3048)  # 10,000 feet
100
opdi.utils.geospatial.flight_level_to_meters(flight_level)[source]

Convert flight level to altitude in meters.

Parameters:

flight_level (int) – Flight level (e.g., 100 for FL100)

Returns:

Altitude in meters

Return type:

float

Example

>>> flight_level_to_meters(100)
3048.0

H3 Helpers – opdi.utils.h3_helpers

H3 hexagonal indexing utility functions for OPDI pipeline.

Provides helper functions for working with H3 geospatial indices, including column name generation, coordinate extraction, and set operations.

opdi.utils.h3_helpers.h3_list_prep(h3_resolutions)[source]

Generate H3 column names for a list of resolutions.

Parameters:

h3_resolutions (List[int]) – List of H3 resolution levels (e.g., [7, 12])

Returns:

List of column names formatted as “h3_res_{resolution}”

Return type:

List[str]

Example

>>> h3_list_prep([7, 12])
['h3_res_7', 'h3_res_12']
opdi.utils.h3_helpers.get_h3_coords(h3_index)[source]

Get latitude and longitude coordinates for an H3 index.

Parameters:

h3_index (str) – H3 hexagon index string

Returns:

Tuple of (latitude, longitude) in degrees

Return type:

Tuple[float, float]

Example

>>> lat, lon = get_h3_coords('871fb46655fffff')
>>> print(f"Lat: {lat:.4f}, Lon: {lon:.4f}")
opdi.utils.h3_helpers.compact_h3_set(h3_set)[source]

Compact a set of H3 indices by merging child hexagons into parents where possible.

This reduces the number of hexagons needed to represent the same area, improving storage and query performance.

Parameters:

h3_set (Set[str]) – Set of H3 index strings

Returns:

Compacted set of H3 indices

Return type:

Set[str]

Example

>>> h3_indices = {'871fb46655fffff', '871fb46656fffff', ...}
>>> compacted = compact_h3_set(h3_indices)
>>> len(compacted) < len(h3_indices)  # Compacted set is smaller
True
opdi.utils.h3_helpers.uncompact_h3_set(h3_set, target_resolution)[source]

Uncompact a set of H3 indices to a target resolution.

Expands parent hexagons into their children at the specified resolution.

Parameters:
  • h3_set (Set[str]) – Set of H3 index strings

  • target_resolution (int) – Target H3 resolution level (0-15)

Returns:

Uncompacted set of H3 indices at target resolution

Return type:

Set[str]

Example

>>> compacted = {'861fb467fffffff'}  # Resolution 6
>>> uncompacted = uncompact_h3_set(compacted, 7)  # Expand to resolution 7
>>> len(uncompacted) > len(compacted)  # More hexagons at finer resolution
True
opdi.utils.h3_helpers.h3_distance(h3_index1, h3_index2)[source]

Calculate the grid distance between two H3 indices.

Grid distance is the number of hexagon steps between two indices. Both indices must be at the same resolution.

Parameters:
  • h3_index1 (str) – First H3 index

  • h3_index2 (str) – Second H3 index

Returns:

Number of hexagon steps between the two indices

Return type:

int

Example

>>> dist = h3_distance('871fb46655fffff', '871fb46656fffff')
>>> print(f"Grid distance: {dist} hexagons")
opdi.utils.h3_helpers.get_h3_resolution(h3_index)[source]

Get the resolution level of an H3 index.

Parameters:

h3_index (str) – H3 hexagon index string

Returns:

Resolution level (0-15)

Return type:

int

Example

>>> res = get_h3_resolution('871fb46655fffff')
>>> print(f"Resolution: {res}")
Resolution: 7
opdi.utils.h3_helpers.k_ring(h3_index, k)[source]

Get all hexagons within k steps of the given hexagon (including the center).

Parameters:
  • h3_index (str) – Center H3 hexagon index

  • k (int) – Number of steps (radius)

Returns:

Set of H3 indices within k steps

Return type:

Set[str]

Example

>>> # Get hexagon and its immediate neighbors (k=1)
>>> neighbors = k_ring('871fb46655fffff', 1)
>>> len(neighbors)  # Center + 6 neighbors
7
opdi.utils.h3_helpers.hex_ring(h3_index, k)[source]

Get hexagons exactly k steps away from the given hexagon (hollow ring).

Parameters:
  • h3_index (str) – Center H3 hexagon index

  • k (int) – Number of steps (radius)

Returns:

Set of H3 indices exactly k steps away

Return type:

Set[str]

Example

>>> # Get only the immediate neighbors (not the center)
>>> ring = hex_ring('871fb46655fffff', 1)
>>> len(ring)  # Just the 6 neighbors
6
opdi.utils.h3_helpers.h3_to_parent(h3_index, parent_resolution)[source]

Get the parent hexagon at a coarser resolution.

Parameters:
  • h3_index (str) – Child H3 index

  • parent_resolution (int) – Resolution of parent (must be less than child resolution)

Returns:

Parent H3 index at specified resolution

Return type:

str

Example

>>> child = '871fb46655fffff'  # Resolution 7
>>> parent = h3_to_parent(child, 6)
>>> print(f"Parent: {parent}")
opdi.utils.h3_helpers.h3_to_children(h3_index, child_resolution)[source]

Get all child hexagons at a finer resolution.

Parameters:
  • h3_index (str) – Parent H3 index

  • child_resolution (int) – Resolution of children (must be greater than parent resolution)

Returns:

Set of child H3 indices at specified resolution

Return type:

Set[str]

Example

>>> parent = '861fb467fffffff'  # Resolution 6
>>> children = h3_to_children(parent, 7)
>>> print(f"Number of children: {len(children)}")
Number of children: 7
opdi.utils.h3_helpers.polyfill_geojson(geojson_geometry, resolution, geo_json_conformant=False)[source]

Fill a GeoJSON geometry with H3 hexagons at the specified resolution.

Parameters:
  • geojson_geometry (dict) – GeoJSON geometry dict (Polygon or MultiPolygon)

  • resolution (int) – H3 resolution level (0-15)

  • geo_json_conformant (bool) – Whether coordinates are in GeoJSON format [lon, lat] vs [lat, lon]

Returns:

Set of H3 indices covering the geometry

Return type:

Set[str]

Example

>>> polygon = {
...     "type": "Polygon",
...     "coordinates": [[
...         [4.4, 50.8], [4.5, 50.8], [4.5, 50.9], [4.4, 50.9], [4.4, 50.8]
...     ]]
... }
>>> hexagons = polyfill_geojson(polygon, resolution=7, geo_json_conformant=True)
opdi.utils.h3_helpers.is_valid_h3_index(h3_index)[source]

Check if a string is a valid H3 index.

Parameters:

h3_index (str) – String to validate

Returns:

True if valid H3 index, False otherwise

Return type:

bool

Example

>>> is_valid_h3_index('871fb46655fffff')
True
>>> is_valid_h3_index('invalid')
False