"""
OpenSky Network state vectors ingestion module.
Downloads and processes state vector data from OpenSky Network's MinIO server
and writes to Iceberg tables with proper partitioning.
"""
import os
import shlex
import subprocess
import time
from datetime import date
from typing import Dict, List, Optional, Set

from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import coalesce, col, from_unixtime, to_date

from opdi.config import OPDIConfig
[docs]
class StateVectorIngestion:
    """
    Handles ingestion of OpenSky Network state vectors from MinIO storage.

    This class manages the complete workflow of downloading state vector parquet
    files from OpenSky's S3-compatible MinIO server, processing them, and writing
    them to an Iceberg table partitioned by day.
    """

    # Column name mapping from camelCase (OSN) to snake_case (OPDI standard)
    COLUMN_MAPPING = {
        "eventTime": "event_time",
        "icao24": "icao24",
        "lat": "lat",
        "lon": "lon",
        "velocity": "velocity",
        "heading": "heading",
        "vertRate": "vert_rate",
        "callsign": "callsign",
        "onGround": "on_ground",
        "alert": "alert",
        "spi": "spi",
        "squawk": "squawk",
        "baroAltitude": "baro_altitude",
        "geoAltitude": "geo_altitude",
        "lastPosUpdate": "last_pos_update",
        "lastContact": "last_contact",
        "serials": "serials",
    }

    def __init__(
        self,
        spark: SparkSession,
        config: OPDIConfig,
        local_download_path: str = "OPDI_live/data/ec-datadump",
        log_file_path: str = "OPDI_live/logs/01_osn_statevectors_etl.log",
    ):
        """
        Initialize state vector ingestion.

        Args:
            spark: Active SparkSession.
            config: OPDI configuration object; ``config.project.project_name``
                and ``config.ingestion.batch_size`` are read.
            local_download_path: Local directory for temporary file downloads.
            log_file_path: Path to the file tracking processed files.
        """
        self.spark = spark
        self.config = config
        self.local_download_path = local_download_path
        self.log_file_path = log_file_path
        self.project = config.project.project_name
        self.batch_size = config.ingestion.batch_size
        # Ensure directories exist.
        os.makedirs(local_download_path, exist_ok=True)
        # dirname() is "" for a bare file name, and os.makedirs("") raises
        # FileNotFoundError, so only create a directory when there is one.
        log_dir = os.path.dirname(log_file_path)
        if log_dir:
            os.makedirs(log_dir, exist_ok=True)

    def _run_command(self, command: str) -> tuple[int, str, str]:
        """
        Execute a shell command and return its exit status and output.

        Args:
            command: Shell command to execute. It runs with ``shell=True``, so
                any interpolated values must already be quoted
                (e.g. with :func:`shlex.quote`).

        Returns:
            Tuple of (return_code, stdout, stderr). The output streams are
            decoded as UTF-8 (undecodable bytes replaced) and stripped.
        """
        result = subprocess.run(command, shell=True, capture_output=True)
        return (
            result.returncode,
            result.stdout.decode(errors="replace").strip(),
            result.stderr.decode(errors="replace").strip(),
        )

    def _execute_shell_command(self, command: str) -> tuple[str, str]:
        """
        Execute a shell command and return stdout and stderr.

        Kept for backward compatibility. Use :meth:`_run_command` when the
        exit status is needed to decide success.

        Args:
            command: Shell command to execute.

        Returns:
            Tuple of (stdout, stderr) as stripped strings.
        """
        _, stdout, stderr = self._run_command(command)
        return stdout, stderr

    def setup_minio_client(self) -> bool:
        """
        Set up the MinIO client (mc) for accessing OpenSky Network data.

        Downloads the ``mc`` binary into the current working directory, makes it
        executable and registers the ``opensky`` alias. Requires the OSN_USERNAME
        and OSN_KEY environment variables to be set.

        Returns:
            True if the alias was configured successfully, False otherwise.

        Raises:
            EnvironmentError: If OSN credentials are not set.
        """
        if "OSN_USERNAME" not in os.environ or "OSN_KEY" not in os.environ:
            raise EnvironmentError(
                "OSN_USERNAME and OSN_KEY environment variables must be set. "
                "Obtain credentials from OpenSky Network."
            )
        print("Setting up MinIO client...")
        self._run_command("curl -O https://dl.min.io/client/mc/release/linux-amd64/mc")
        self._run_command("chmod +x mc")
        # The expansions are quoted so credentials containing whitespace or glob
        # characters reach mc as single arguments. Failures of the two commands
        # above surface here as a non-zero exit status (e.g. "./mc: not found").
        returncode, _, stderr = self._run_command(
            './mc alias set opensky https://s3.opensky-network.org "$OSN_USERNAME" "$OSN_KEY"'
        )
        if returncode != 0 or (stderr and "error" in stderr.lower()):
            print(f"Error setting up MinIO: {stderr}")
            return False
        print("MinIO client configured successfully.")
        return True

    def list_available_files(self, year_filter: Optional[List[int]] = None) -> List[str]:
        """
        List available state vector files on the OpenSky MinIO server.

        Args:
            year_filter: List of years to keep (e.g. [2024, 2025]). If None,
                uses 2022-2026. A path is kept when it contains ``"<year>-"``.

        Returns:
            List of file paths on the MinIO server.
        """
        if year_filter is None:
            year_filter = [2022, 2023, 2024, 2025, 2026]
        print("Listing available files on OpenSky MinIO...")
        _, stdout, stderr = self._run_command(
            './mc find opensky/ec-datadump/ --path "*/states_*.parquet"'
        )
        if stderr:
            print(f"Warning while listing files: {stderr}")
        year_patterns = [f"{year}-" for year in year_filter]
        filtered_files = [
            path
            for path in stdout.splitlines()
            if path and any(pattern in path for pattern in year_patterns)
        ]
        print(f"Found {len(filtered_files)} files matching year filter.")
        return filtered_files

    def load_processed_files(self) -> Set[str]:
        """
        Load the set of already processed file names from the log file.

        Returns:
            Set of processed file names (empty if the log does not exist yet).
        """
        try:
            with open(self.log_file_path, "r", encoding="utf-8") as log_file:
                return {line for line in log_file.read().splitlines() if line}
        except FileNotFoundError:
            return set()

    def mark_files_processed(self, file_names: List[str]) -> None:
        """
        Mark files as processed by appending their names to the log file.

        Args:
            file_names: List of file names to mark as processed.
        """
        with open(self.log_file_path, "a", encoding="utf-8") as log_file:
            log_file.writelines(f"{file_name}\n" for file_name in file_names)

    def remove_partial_files(self) -> None:
        """
        Remove partially downloaded files (``*.parquet.part.minio``).

        MinIO creates ``.part`` files during download that may remain if a
        download is interrupted. This cleans them up before processing.
        """
        try:
            files = os.listdir(self.local_download_path)
        except FileNotFoundError:
            return
        for filename in files:
            if filename.endswith(".parquet.part.minio"):
                os.remove(os.path.join(self.local_download_path, filename))
                print(f"Removed partial file: {filename}")

    def download_files(self, file_paths: List[str]) -> List[str]:
        """
        Download files from MinIO to local storage.

        Files whose name is already in the processed log are skipped.

        Args:
            file_paths: List of full MinIO file paths to download.

        Returns:
            List of successfully downloaded file names.
        """
        downloaded_files = []
        processed_files = self.load_processed_files()
        for file_path in file_paths:
            file_name = file_path.split("/")[-1]
            if file_name in processed_files:
                continue
            local_file_path = os.path.join(self.local_download_path, file_name)
            # Both paths are shell-quoted: remote names come from the server
            # listing, and the local directory may contain spaces.
            cp_command = f"./mc cp {shlex.quote(file_path)} {shlex.quote(local_file_path)}"
            returncode, _, err = self._run_command(cp_command)
            # The exit status, not mere stderr output, decides success.
            if returncode != 0:
                print(f"Error downloading {file_name}: {err}")
            else:
                downloaded_files.append(file_name)
        return downloaded_files

    def process_and_write_batch(self, file_names: List[str]) -> None:
        """
        Process downloaded files and append them to the Iceberg table.

        Args:
            file_names: List of file names to process (must be in
                local_download_path). Only these files are read.
        """
        if not file_names:
            return
        # Read only this batch's files. Reading the whole download folder would
        # also ingest leftovers (e.g. files whose download reported an error),
        # which are never marked processed and would be appended again on
        # every subsequent batch.
        file_paths = [os.path.join(self.local_download_path, name) for name in file_names]
        raw_df = self.spark.read.option("mergeSchema", "true").parquet(*file_paths)

        # Rename columns from camelCase to snake_case.
        df = raw_df
        for camel_case, snake_case in self.COLUMN_MAPPING.items():
            df = df.withColumnRenamed(camel_case, snake_case)

        # Handle legacy 'time' column.
        if "time" in df.columns:
            if "event_time" in df.columns:
                # The batch mixes legacy and current files. mergeSchema keeps both
                # columns, and each is null for rows from files that lack it.
                # Renaming would create a duplicate 'event_time', so merge instead.
                df = df.withColumn(
                    "event_time", coalesce(col("event_time"), col("time"))
                ).drop("time")
            else:
                df = df.withColumnRenamed("time", "event_time")

        # Convert Unix timestamp to Spark timestamp.
        df = df.withColumn("event_time", from_unixtime(col("event_time")).cast("timestamp"))

        # Sort by day so rows reach the writer clustered by the table's
        # days(event_time) partition. A hash repartition() before the global sort
        # would only add a shuffle, because orderBy() range-partitions the data
        # again. The helper column is dropped because Iceberg derives the
        # partition from event_time itself.
        df_sorted = (
            df.withColumn("event_time_day", to_date(col("event_time")))
            .orderBy("event_time_day")
            .drop("event_time_day")
        )

        table_name = f"`{self.project}`.`osn_statevectors_v2`"
        df_sorted.writeTo(table_name).append()
        # The transformations above do not change the row count, so count the
        # plain read instead of re-running the sorted plan.
        print(f"Written {raw_df.count()} records to {table_name}")

    def cleanup_local_files(self, file_names: List[str]) -> None:
        """
        Delete local files after successful processing to save disk space.

        Missing files are ignored.

        Args:
            file_names: List of file names to delete.
        """
        for file_name in file_names:
            try:
                os.remove(os.path.join(self.local_download_path, file_name))
            except FileNotFoundError:
                pass

    def ingest(
        self,
        year_filter: Optional[List[int]] = None,
        dry_run: bool = False,
    ) -> Dict[str, int]:
        """
        Run the complete ingestion workflow.

        Downloads state vectors in batches of ``batch_size`` files, processes
        them, and writes them to Iceberg.

        Args:
            year_filter: List of years to process (e.g. [2024, 2025]).
            dry_run: If True, only list files without downloading/processing.

        Returns:
            Dictionary with statistics: {'files_processed': N, 'files_skipped': M}.

        Raises:
            RuntimeError: If the MinIO client cannot be configured.
            ValueError: If the configured batch size is not positive.

        Example:
            >>> from opdi.ingestion import StateVectorIngestion
            >>> from opdi.utils.spark_helpers import get_spark
            >>> from opdi.config import OPDIConfig
            >>>
            >>> config = OPDIConfig.for_environment("live")
            >>> spark = get_spark("live", "State Vector Ingestion")
            >>> ingestion = StateVectorIngestion(spark, config)
            >>> stats = ingestion.ingest(year_filter=[2024])
        """
        if not self.setup_minio_client():
            raise RuntimeError("Failed to set up MinIO client")

        files_to_download = self.list_available_files(year_filter)
        processed_files = self.load_processed_files()
        pending_files = [
            path for path in files_to_download if path.split("/")[-1] not in processed_files
        ]
        files_skipped = len(files_to_download) - len(pending_files)
        print(f"Total files: {len(files_to_download)}")
        print(f"Already processed: {files_skipped}")
        print(f"To process: {len(pending_files)}")

        if dry_run:
            print("Dry run - no files will be downloaded.")
            return {"files_processed": 0, "files_skipped": len(files_to_download)}

        if self.batch_size <= 0:
            raise ValueError(f"batch_size must be a positive integer, got {self.batch_size}")

        total_batches = (len(pending_files) + self.batch_size - 1) // self.batch_size
        files_processed = 0
        for batch_num, start in enumerate(
            range(0, len(pending_files), self.batch_size), start=1
        ):
            print(f"\n=== Processing batch {batch_num} of {total_batches} ===")
            batch_files = pending_files[start : start + self.batch_size]

            downloaded_files = self.download_files(batch_files)
            if not downloaded_files:
                continue

            # Brief pause for file system consistency, then drop partial downloads.
            time.sleep(1)
            self.remove_partial_files()

            try:
                self.process_and_write_batch(downloaded_files)
                # Record success right after the Iceberg append and before
                # deleting the local copies. A failure during cleanup then cannot
                # cause this batch to be downloaded and appended a second time.
                self.mark_files_processed(downloaded_files)
                self.cleanup_local_files(downloaded_files)
                files_processed += len(downloaded_files)
                print(f"Batch complete. Processed {len(downloaded_files)} files.")
            except Exception as e:
                print(f"Error processing batch: {e}")
                # If the write failed, the files are not marked as processed and
                # will be retried on the next run.
                raise

        print("\n=== Ingestion complete ===")
        print(f"Files processed: {files_processed}")
        return {
            "files_processed": files_processed,
            "files_skipped": files_skipped,
        }

    def create_table_if_not_exists(self) -> None:
        """
        Create the osn_statevectors_v2 Iceberg table if it doesn't exist.

        This should be run once before the first ingestion. The table comment
        embeds the current date, but only at creation time.
        """
        today = date.today().strftime("%d %B %Y")
        create_table_sql = f"""
            CREATE TABLE IF NOT EXISTS `{self.project}`.`osn_statevectors_v2` (
                event_time TIMESTAMP COMMENT 'Timestamp for which the state vector was valid.',
                icao24 STRING COMMENT '24-bit ICAO transponder ID for tracking airframes.',
                lat DOUBLE COMMENT 'Last known latitude of the aircraft.',
                lon DOUBLE COMMENT 'Last known longitude of the aircraft.',
                velocity DOUBLE COMMENT 'Speed over ground in meters per second.',
                heading DOUBLE COMMENT 'Direction of movement (track angle) from geographic north.',
                vert_rate DOUBLE COMMENT 'Vertical speed in meters per second.',
                callsign STRING COMMENT 'Callsign broadcast by the aircraft.',
                on_ground BOOLEAN COMMENT 'Surface positions (true) or airborne (false).',
                alert BOOLEAN COMMENT 'Special ATC indicator.',
                spi BOOLEAN COMMENT 'Special ATC indicator.',
                squawk STRING COMMENT '4-digit transponder code for ATC identification.',
                baro_altitude DOUBLE COMMENT 'Altitude measured by barometer (meters).',
                geo_altitude DOUBLE COMMENT 'Altitude from GNSS/GPS sensor (meters).',
                last_pos_update DOUBLE COMMENT 'Unix timestamp of position age.',
                last_contact DOUBLE COMMENT 'Unix timestamp of last signal received.',
                serials ARRAY<INT> COMMENT 'List of ADS-B receiver serials.'
            )
            USING iceberg
            PARTITIONED BY (days(event_time))
            COMMENT 'OpenSky Network state vectors. Last updated: {today}.'
        """
        self.spark.sql(create_table_sql)
        print(f"Table {self.project}.osn_statevectors_v2 created/verified.")