Source code for opdi.reference.h3_airport_zones

"""
H3 airport detection zone generator.

Creates concentric hexagonal rings around airports for flight detection.
Each airport gets rings at configurable radii (default: 0-40 NM) encoded
as H3 hexagons at resolution 7.

Ported from: OPDI-live/python/v2.0.0/00_create_h3_airport_detection_areas.py
"""

import json
import math
import os
from typing import List, Optional

import numpy as np
import pandas as pd
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import udf, col, lit, array_except, explode
from pyspark.sql.types import (
    StructType,
    StructField,
    StringType,
    IntegerType,
    FloatType,
    ArrayType,
)
import h3
import h3_pyspark

from opdi.config import OPDIConfig


[docs] def generate_circle_polygon( lon: float, lat: float, radius_nautical_miles: float, num_points: int = 360 ) -> str: """ Generate a GeoJSON polygon approximating a circle around a point. Uses the destination-point formula to compute points along a circle at a given radius from a center coordinate. Args: lon: Center longitude in decimal degrees. lat: Center latitude in decimal degrees. radius_nautical_miles: Circle radius in nautical miles. num_points: Number of polygon vertices (higher = smoother circle). Returns: GeoJSON Polygon string. """ radius_km = radius_nautical_miles * 1.852 def degrees_to_radians(degrees): return degrees * math.pi / 180 def calculate_point(lon, lat, distance_km, bearing): R = 6371.01 # Earth's radius in km lat_rad = degrees_to_radians(lat) lon_rad = degrees_to_radians(lon) distance_rad = distance_km / R bearing_rad = degrees_to_radians(bearing) lat_new_rad = math.asin( math.sin(lat_rad) * math.cos(distance_rad) + math.cos(lat_rad) * math.sin(distance_rad) * math.cos(bearing_rad) ) lon_new_rad = lon_rad + math.atan2( math.sin(bearing_rad) * math.sin(distance_rad) * math.cos(lat_rad), math.cos(distance_rad) - math.sin(lat_rad) * math.sin(lat_new_rad), ) return [math.degrees(lon_new_rad), math.degrees(lat_new_rad)] points = [ calculate_point(lon, lat, radius_km, (360 / num_points) * i) for i in range(num_points) ] points.append(points[0]) # Close the polygon return json.dumps({"type": "Polygon", "coordinates": [points]})
[docs] class AirportDetectionZoneGenerator: """ Generates H3 hexagonal detection zones around airports. Creates concentric rings around each airport at configurable radii, encoded as H3 hexagons. These zones are used by the flight list pipeline to detect departures and arrivals. The default configuration creates 6 concentric rings at 0-5, 5-10, 10-20, 20-30, and 30-40 NM from the airport reference point. Args: spark: Active SparkSession. config: OPDI configuration object. resolution: H3 resolution for hexagons (default: from config). num_points: Number of points for circle polygon approximation. radii_nm: List of ring boundary radii in nautical miles. Example: >>> generator = AirportDetectionZoneGenerator(spark, config) >>> df = generator.generate() >>> generator.save_to_parquet("data/airport_hex/zones_res7.parquet") """ # European bounding box with offset for edge airports BBOX_OFFSET = 3 # degrees LAT_MIN = 26.74617 LAT_MAX = 70.25976 LON_MIN = -25.86653 LON_MAX = 49.65699 AIRPORT_SCHEMA = StructType([ StructField("id", StringType(), True), StructField("ident", StringType(), True), StructField("type", StringType(), True), StructField("name", StringType(), True), StructField("latitude_deg", FloatType(), True), StructField("longitude_deg", FloatType(), True), StructField("elevation_ft", FloatType(), True), StructField("continent", StringType(), True), StructField("iso_country", StringType(), True), StructField("iso_region", StringType(), True), StructField("municipality", StringType(), True), StructField("scheduled_service", StringType(), True), StructField("gps_code", StringType(), True), StructField("iata_code", StringType(), True), StructField("local_code", StringType(), True), StructField("home_link", StringType(), True), StructField("wikipedia_link", StringType(), True), StructField("keywords", StringType(), True), ])
[docs] def __init__( self, spark: SparkSession, config: OPDIConfig, resolution: Optional[int] = None, num_points: int = 720, radii_nm: Optional[List[float]] = None, ): self.spark = spark self.config = config self.resolution = resolution or config.h3.airport_detection_resolution self.num_points = num_points self.radii_nm = radii_nm or [0, 5, 10, 20, 30, 40] self._result_df: Optional[pd.DataFrame] = None # Register UDF self._circle_udf = udf(generate_circle_polygon, StringType())
def _load_airports( self, airports_url: str = "https://davidmegginson.github.io/ourairports-data/airports.csv", ) -> DataFrame: """ Load airport data from OurAirports and filter to European bounding box. Args: airports_url: URL to OurAirports airports CSV. Returns: Spark DataFrame with airport data filtered to Europe. """ df_apt = pd.read_csv(airports_url) # European bounding box filter offset = self.BBOX_OFFSET f_lat = df_apt.latitude_deg.between( self.LAT_MIN - offset, self.LAT_MAX + offset ) f_lon = df_apt.longitude_deg.between( self.LON_MIN - offset, self.LON_MAX + offset ) df_apt = df_apt[f_lat & f_lon] # Ensure column types df_apt.columns = df_apt.columns.astype(str) df_apt = df_apt.astype({ "latitude_deg": "float64", "longitude_deg": "float64", "elevation_ft": "float64", }) return self.spark.createDataFrame(df_apt, schema=self.AIRPORT_SCHEMA) def _build_ring_config(self) -> DataFrame: """ Build Spark DataFrame defining concentric ring boundaries. Returns: Spark DataFrame with ring configuration (area_type, min/max radii). """ area_type = [f"C{x + 10}" for x in self.radii_nm] df = pd.DataFrame({ "max_resolution": self.resolution, "number_of_points_c": self.num_points, "area_type": area_type, "min_c_radius_nm": self.radii_nm, }) df["max_c_radius_nm"] = ( df["min_c_radius_nm"].shift(-1).fillna(np.max(self.radii_nm) + 10) ) df[["min_c_radius_nm", "max_c_radius_nm"]] = df[ ["min_c_radius_nm", "max_c_radius_nm"] ].astype(float) df["m_col"] = 1 schema = StructType([ StructField("max_resolution", IntegerType(), True), StructField("number_of_points_c", IntegerType(), True), StructField("area_type", StringType(), True), StructField("min_c_radius_nm", FloatType(), True), StructField("max_c_radius_nm", FloatType(), True), StructField("m_col", IntegerType(), True), ]) return self.spark.createDataFrame(df, schema=schema)
[docs] def generate( self, airports_url: str = "https://davidmegginson.github.io/ourairports-data/airports.csv", ) -> pd.DataFrame: """ Generate H3 detection zones for all airports in the European bounding box. Each airport-ring combination produces a set of H3 hex IDs formed by subtracting the inner circle hexagons from the outer circle hexagons, creating a ring-shaped detection zone. Args: airports_url: URL to OurAirports airports CSV. Returns: Pandas DataFrame with airport detection zone hex IDs. Columns include all airport metadata plus area_type and hex_id. """ print("Loading airports...") airports_df = self._load_airports(airports_url) print("Building ring configuration...") ring_config = self._build_ring_config() # Cross join airports with ring configuration airports_m = airports_df.withColumn("m_col", lit(1)).join( ring_config, on="m_col", how="left" ) print(f"Generating H3 zones at resolution {self.resolution}...") result = ( airports_m.withColumn( "inner_circle_polygon", self._circle_udf( col("longitude_deg"), col("latitude_deg"), col("min_c_radius_nm"), col("number_of_points_c"), ), ) .withColumn( "outer_circle_polygon", self._circle_udf( col("longitude_deg"), col("latitude_deg"), col("max_c_radius_nm"), col("number_of_points_c"), ), ) .withColumn( "inner_circle_hex_ids", h3_pyspark.polyfill( col("inner_circle_polygon"), col("max_resolution"), lit(True) ), ) .withColumn( "outer_circle_hex_ids", h3_pyspark.polyfill( col("outer_circle_polygon"), col("max_resolution"), lit(True) ), ) .withColumn( "hex_id", array_except( col("outer_circle_hex_ids"), col("inner_circle_hex_ids") ), ) .drop( "inner_circle_polygon", "outer_circle_polygon", "inner_circle_hex_ids", "outer_circle_hex_ids", ) .toPandas() ) self._result_df = result print(f"Generated {len(result)} airport-ring combinations.") return result
[docs] def save_to_parquet(self, output_path: str) -> None: """ Save generated detection zones to a parquet file. Args: output_path: Path to output parquet file. Raises: RuntimeError: If generate() has not been called first. """ if self._result_df is None: raise RuntimeError("Call generate() before saving.") os.makedirs(os.path.dirname(output_path) or ".", exist_ok=True) self._result_df.to_parquet(output_path) print(f"Saved detection zones to {output_path}")
[docs] def prepare_for_flight_list( self, max_radius_nm: float = 30.0, airport_types: Optional[List[str]] = None, ) -> pd.DataFrame: """ Prepare detection zone data for use by the flight list pipeline. Filters zones to the specified maximum radius and airport types, explodes hex arrays, adds H3 coordinates, and computes distance from airport center. Args: max_radius_nm: Maximum detection radius in nautical miles. airport_types: List of airport types to include (default: large, medium, small). Returns: Pandas DataFrame ready for use in flight list generation. Columns: apt_ident, apt_hex_id, distance_from_center, apt_latitude_deg, apt_longitude_deg. Raises: RuntimeError: If generate() has not been called first. """ if self._result_df is None: raise RuntimeError("Call generate() before preparing for flight list.") if airport_types is None: airport_types = ["large_airport", "medium_airport", "small_airport"] df = self._result_df.copy() # Filter by airport type and radius df = df[df["type"].isin(airport_types)] df = df[df["max_c_radius_nm"] <= max_radius_nm] # Explode hex arrays and remove nulls df = df[["ident", "hex_id", "latitude_deg", "longitude_deg"]].explode("hex_id") df = df[~df.hex_id.isna()] # Get H3 coordinates for each hex df["geo"] = df["hex_id"].apply(lambda h: h3.h3_to_geo(h)) df["lat"] = df["geo"].apply(lambda g: g[0]) df["lon"] = df["geo"].apply(lambda g: g[1]) df = df.drop("geo", axis=1) # European bounding box filter f_lat = np.logical_and(df.lat >= self.LAT_MIN, df.lat <= self.LAT_MAX) f_lon = np.logical_and(df.lon >= self.LON_MIN, df.lon <= self.LON_MAX) df = df[np.logical_and(f_lat, f_lon)] # Add center hex ID for each airport df["center_hex_id"] = df.apply( lambda row: h3.geo_to_h3( row["latitude_deg"], row["longitude_deg"], resolution=self.resolution ), axis=1, ) # Prefix column names df.columns = ["apt_" + x for x in df.columns] # Calculate H3 grid distance from center def calc_dist(h1, h2): try: return h3.h3_distance(h1, h2) except Exception: return None df["distance_from_center"] = df.apply( lambda row: calc_dist(row["apt_hex_id"], row["apt_center_hex_id"]), axis=1, ) df = df[~pd.isnull(df["distance_from_center"])] # Select final columns df = df[ [ "apt_ident", "apt_hex_id", "distance_from_center", "apt_latitude_deg", "apt_longitude_deg", ] ] return df