# Source code for opdi.ingestion.ourairports

"""
OurAirports reference data ingestion.

Downloads and loads airport reference datasets from OurAirports into
Spark tables. Covers airports, runways, navaids, frequencies, countries,
and regions.

Ported from: OPDI-live/python/v2.0.0/00_etl_ourairports.py
"""

import os
import urllib.request
from datetime import datetime
from typing import Dict, List, Optional

from pyspark.sql import SparkSession
from pyspark.sql.types import (
    BooleanType,
    DoubleType,
    IntegerType,
    StringType,
    StructField,
    StructType,
)

from opdi.config import OPDIConfig

# Base location of the OurAirports CSV exports.
_OURAIRPORTS_BASE = "https://ourairports.com/data"

# Dataset URLs. File names on ourairports.com use hyphens where the
# dataset keys use underscores (e.g. "airport_frequencies" ->
# "airport-frequencies.csv").
DEFAULT_URLS: Dict[str, str] = {
    dataset: f"{_OURAIRPORTS_BASE}/{dataset.replace('_', '-')}.csv"
    for dataset in (
        "airports",
        "runways",
        "navaids",
        "airport_frequencies",
        "countries",
        "regions",
    )
}

# Spark schemas for each dataset.
# Schema for airports.csv; every column is nullable.
SCHEMA_AIRPORTS = StructType(
    [
        StructField(column, dtype, True)
        for column, dtype in (
            ("id", IntegerType()),
            ("ident", StringType()),
            ("type", StringType()),
            ("name", StringType()),
            ("latitude_deg", DoubleType()),
            ("longitude_deg", DoubleType()),
            ("elevation_ft", IntegerType()),
            ("continent", StringType()),
            ("iso_country", StringType()),
            ("iso_region", StringType()),
            ("municipality", StringType()),
            ("scheduled_service", StringType()),
            ("gps_code", StringType()),
            ("iata_code", StringType()),
            ("local_code", StringType()),
            ("home_link", StringType()),
            ("wikipedia_link", StringType()),
            ("keywords", StringType()),
        )
    ]
)

# Schema for runways.csv; every column is nullable. "le_"/"he_" prefixes
# are the low-end/high-end runway identifiers in the OurAirports export.
SCHEMA_RUNWAYS = StructType(
    [
        StructField(column, dtype, True)
        for column, dtype in (
            ("id", IntegerType()),
            ("airport_ref", IntegerType()),
            ("airport_ident", StringType()),
            ("length_ft", IntegerType()),
            ("width_ft", IntegerType()),
            ("surface", StringType()),
            ("lighted", BooleanType()),
            ("closed", BooleanType()),
            ("le_ident", StringType()),
            ("le_latitude_deg", DoubleType()),
            ("le_longitude_deg", DoubleType()),
            ("le_elevation_ft", IntegerType()),
            ("le_heading_degT", DoubleType()),
            ("le_displaced_threshold_ft", IntegerType()),
            ("he_ident", StringType()),
            ("he_latitude_deg", DoubleType()),
            ("he_longitude_deg", DoubleType()),
            ("he_elevation_ft", IntegerType()),
            ("he_heading_degT", DoubleType()),
            ("he_displaced_threshold_ft", IntegerType()),
        )
    ]
)

# Schema for navaids.csv; every column is nullable. Column names (including
# the camelCase "usageType") follow the upstream CSV header exactly.
SCHEMA_NAVAIDS = StructType(
    [
        StructField(column, dtype, True)
        for column, dtype in (
            ("id", IntegerType()),
            ("filename", StringType()),
            ("ident", StringType()),
            ("name", StringType()),
            ("type", StringType()),
            ("frequency_khz", IntegerType()),
            ("latitude_deg", DoubleType()),
            ("longitude_deg", DoubleType()),
            ("elevation_ft", IntegerType()),
            ("iso_country", StringType()),
            ("dme_frequency_khz", IntegerType()),
            ("dme_channel", StringType()),
            ("dme_latitude_deg", DoubleType()),
            ("dme_longitude_deg", DoubleType()),
            ("dme_elevation_ft", IntegerType()),
            ("slaved_variation_deg", DoubleType()),
            ("magnetic_variation_deg", DoubleType()),
            ("usageType", StringType()),
            ("power", StringType()),
            ("associated_airport", StringType()),
        )
    ]
)

# Schema for airport-frequencies.csv; every column is nullable.
SCHEMA_AIRPORT_FREQUENCIES = StructType(
    [
        StructField(column, dtype, True)
        for column, dtype in (
            ("id", IntegerType()),
            ("airport_ref", IntegerType()),
            ("airport_ident", StringType()),
            ("type", StringType()),
            ("description", StringType()),
            ("frequency_mhz", DoubleType()),
        )
    ]
)

# Schema for countries.csv; every column is nullable.
SCHEMA_COUNTRIES = StructType(
    [
        StructField(column, dtype, True)
        for column, dtype in (
            ("id", IntegerType()),
            ("code", StringType()),
            ("name", StringType()),
            ("continent", StringType()),
            ("wikipedia_link", StringType()),
            ("keywords", StringType()),
        )
    ]
)

# Schema for regions.csv; every column is nullable.
SCHEMA_REGIONS = StructType(
    [
        StructField(column, dtype, True)
        for column, dtype in (
            ("id", IntegerType()),
            ("code", StringType()),
            ("local_code", StringType()),
            ("name", StringType()),
            ("continent", StringType()),
            ("iso_country", StringType()),
            ("wikipedia_link", StringType()),
            ("keywords", StringType()),
        )
    ]
)

# Dataset name -> Spark schema, in ingestion order. Keys must match the
# keys of DEFAULT_URLS and the oa_<name> table suffixes.
SCHEMAS: Dict[str, StructType] = dict(
    airports=SCHEMA_AIRPORTS,
    runways=SCHEMA_RUNWAYS,
    navaids=SCHEMA_NAVAIDS,
    airport_frequencies=SCHEMA_AIRPORT_FREQUENCIES,
    countries=SCHEMA_COUNTRIES,
    regions=SCHEMA_REGIONS,
)


class OurAirportsIngestion:
    """
    ETL pipeline for OurAirports reference data.

    Downloads CSV datasets from OurAirports, loads them into Spark
    DataFrames with proper schemas, and writes them to database tables.
    Handles 6 datasets: airports, runways, navaids, airport_frequencies,
    countries, and regions.

    Args:
        spark: Active SparkSession.
        config: OPDI configuration object.
        target_database: Database to write tables to (default: 'project_aiu').
        temp_dir: Directory for temporary CSV downloads.

    Example:
        >>> ingestion = OurAirportsIngestion(spark, config)
        >>> ingestion.ingest_all()
    """

    def __init__(
        self,
        spark: SparkSession,
        config: OPDIConfig,
        target_database: str = "project_aiu",
        temp_dir: str = ".",
    ):
        self.spark = spark
        self.config = config
        self.target_database = target_database
        self.temp_dir = temp_dir
        # Single scratch file reused for each dataset download; removed
        # after every ingest (see ingest_dataset).
        self._temp_file = os.path.join(temp_dir, "ourairports_temp.csv")

    def _get_create_table_sql(self, table_name: str) -> str:
        """
        Generate CREATE TABLE SQL for a given OurAirports dataset.

        Args:
            table_name: Dataset key (e.g. 'airports', 'runways').

        Returns:
            A CREATE TABLE IF NOT EXISTS statement for `<db>.oa_<name>`.

        Raises:
            KeyError: If table_name is not a known dataset.
        """
        # Embedded in the table COMMENT so consumers can see data freshness.
        today = datetime.today().strftime("%d %B %Y")
        db = self.target_database

        # Column lists must stay in the same order as the SCHEMAS entries:
        # ingest_dataset writes with insertInto, which is position-based.
        sqls = {
            "airports": f"""
                CREATE TABLE IF NOT EXISTS `{db}`.`oa_airports` (
                    id INT,
                    ident STRING,
                    type STRING,
                    name STRING,
                    latitude_deg DOUBLE,
                    longitude_deg DOUBLE,
                    elevation_ft INT,
                    continent STRING,
                    iso_country STRING,
                    iso_region STRING,
                    municipality STRING,
                    scheduled_service STRING,
                    gps_code STRING,
                    iata_code STRING,
                    local_code STRING,
                    home_link STRING,
                    wikipedia_link STRING,
                    keywords STRING
                )
                COMMENT 'OurAirports airports data. Last updated: {today}.'
                STORED AS parquet
                TBLPROPERTIES ('transactional'='false')
            """,
            "runways": f"""
                CREATE TABLE IF NOT EXISTS `{db}`.`oa_runways` (
                    id INT,
                    airport_ref INT,
                    airport_ident STRING,
                    length_ft INT,
                    width_ft INT,
                    surface STRING,
                    lighted BOOLEAN,
                    closed BOOLEAN,
                    le_ident STRING,
                    le_latitude_deg DOUBLE,
                    le_longitude_deg DOUBLE,
                    le_elevation_ft INT,
                    le_heading_degT DOUBLE,
                    le_displaced_threshold_ft INT,
                    he_ident STRING,
                    he_latitude_deg DOUBLE,
                    he_longitude_deg DOUBLE,
                    he_elevation_ft INT,
                    he_heading_degT DOUBLE,
                    he_displaced_threshold_ft INT
                )
                COMMENT 'OurAirports runways data. Last updated: {today}.'
                STORED AS parquet
                TBLPROPERTIES ('transactional'='false')
            """,
            "navaids": f"""
                CREATE TABLE IF NOT EXISTS `{db}`.`oa_navaids` (
                    id INT,
                    filename STRING,
                    ident STRING,
                    name STRING,
                    type STRING,
                    frequency_khz INT,
                    latitude_deg DOUBLE,
                    longitude_deg DOUBLE,
                    elevation_ft INT,
                    iso_country STRING,
                    dme_frequency_khz INT,
                    dme_channel STRING,
                    dme_latitude_deg DOUBLE,
                    dme_longitude_deg DOUBLE,
                    dme_elevation_ft INT,
                    slaved_variation_deg DOUBLE,
                    magnetic_variation_deg DOUBLE,
                    usageType STRING,
                    power STRING,
                    associated_airport STRING
                )
                COMMENT 'OurAirports navaids data. Last updated: {today}.'
                STORED AS parquet
                TBLPROPERTIES ('transactional'='false')
            """,
            "airport_frequencies": f"""
                CREATE TABLE IF NOT EXISTS `{db}`.`oa_airport_frequencies` (
                    id INT,
                    airport_ref INT,
                    airport_ident STRING,
                    type STRING,
                    description STRING,
                    frequency_mhz DOUBLE
                )
                COMMENT 'OurAirports airport frequencies. Last updated: {today}.'
                STORED AS parquet
                TBLPROPERTIES ('transactional'='false')
            """,
            "countries": f"""
                CREATE TABLE IF NOT EXISTS `{db}`.`oa_countries` (
                    id INT,
                    code STRING,
                    name STRING,
                    continent STRING,
                    wikipedia_link STRING,
                    keywords STRING
                )
                COMMENT 'OurAirports countries data. Last updated: {today}.'
                STORED AS parquet
                TBLPROPERTIES ('transactional'='false')
            """,
            "regions": f"""
                CREATE TABLE IF NOT EXISTS `{db}`.`oa_regions` (
                    id INT,
                    code STRING,
                    local_code STRING,
                    name STRING,
                    continent STRING,
                    iso_country STRING,
                    wikipedia_link STRING,
                    keywords STRING
                )
                COMMENT 'OurAirports regions data. Last updated: {today}.'
                STORED AS parquet
                TBLPROPERTIES ('transactional'='false')
            """,
        }
        return sqls[table_name]

    def create_tables(self, drop_existing: bool = False) -> None:
        """
        Create all OurAirports tables.

        Args:
            drop_existing: If True, drop and recreate existing tables.
        """
        db = self.target_database
        table_names = [
            "oa_airports",
            "oa_runways",
            "oa_navaids",
            "oa_airport_frequencies",
            "oa_countries",
            "oa_regions",
        ]

        if drop_existing:
            for table in table_names:
                self.spark.sql(f"DROP TABLE IF EXISTS `{db}`.`{table}`")
                print(f"Dropped {db}.{table}")

        for dataset in SCHEMAS:
            sql = self._get_create_table_sql(dataset)
            self.spark.sql(sql)
            print(f"Created/verified {db}.oa_{dataset}")

    def ingest_dataset(
        self,
        name: str,
        url: Optional[str] = None,
    ) -> int:
        """
        Download and ingest a single OurAirports dataset.

        Args:
            name: Dataset name (airports, runways, navaids, etc.).
            url: Override URL for the CSV download.

        Returns:
            Number of rows ingested.

        Raises:
            KeyError: If name is not a known dataset.
        """
        url = url or DEFAULT_URLS[name]
        schema = SCHEMAS[name]

        print(f"Downloading {name} from {url}...")
        try:
            urllib.request.urlretrieve(url, self._temp_file)

            df = self.spark.read.csv(self._temp_file, header=True, schema=schema)
            row_count = df.count()

            table_name = f"{self.target_database}.oa_{name}"
            # insertInto resolves columns by position, so the schema column
            # order must match the CREATE TABLE DDL.
            df.write.mode("overwrite").insertInto(table_name)
        finally:
            # Always remove the scratch CSV, even on failure, so a stale
            # download is never picked up by a later run. (Previously the
            # file was only cleaned up at the end of a successful
            # ingest_all(), leaking it on error or standalone use.)
            if os.path.exists(self._temp_file):
                os.remove(self._temp_file)

        print(f" Ingested {row_count:,} rows into {table_name}")
        return row_count

    def ingest_all(
        self,
        urls: Optional[Dict[str, str]] = None,
        drop_existing: bool = True,
    ) -> Dict[str, int]:
        """
        Download and ingest all OurAirports datasets.

        Args:
            urls: Override URLs for each dataset.
            drop_existing: If True, drop and recreate tables first.

        Returns:
            Dictionary mapping dataset names to row counts.

        Example:
            >>> ingestion = OurAirportsIngestion(spark, config)
            >>> stats = ingestion.ingest_all()
            >>> print(stats)
            {'airports': 76543, 'runways': 45678, ...}
        """
        urls = urls or DEFAULT_URLS

        self.create_tables(drop_existing=drop_existing)

        stats = {}
        for name in SCHEMAS:
            dataset_url = urls.get(name, DEFAULT_URLS[name])
            stats[name] = self.ingest_dataset(name, dataset_url)

        # Defensive cleanup; ingest_dataset already removes the temp file.
        if os.path.exists(self._temp_file):
            os.remove(self._temp_file)

        print("\nOurAirports ingestion complete.")
        print("-" * 40)
        for name, count in stats.items():
            print(f" {name:<25} {count:>10,} rows")
        print("-" * 40)

        return stats