# Source: opdi.utils.spark_helpers

"""
Spark session management utilities for OPDI pipeline.

Provides centralized Spark session creation with consistent configuration
across all pipeline components.
"""

from typing import Dict, Optional
from pyspark.sql import SparkSession
from opdi.config import OPDIConfig, SparkConfig, ProjectConfig


class SparkSessionManager:
    """
    Factory for creating and managing Spark sessions with OPDI configurations.

    This class centralizes all Spark configuration logic, eliminating
    duplication across pipeline scripts and enabling environment-specific
    settings.
    """

    @staticmethod
    def create_session(
        app_name: str = "OPDI Pipeline",
        config: Optional[OPDIConfig] = None,
        env: str = "dev",
        extra_configs: Optional[Dict[str, str]] = None,
    ) -> SparkSession:
        """
        Create a new Spark session with OPDI configuration.

        Args:
            app_name: Name for the Spark application
            config: OPDI configuration object. If None, creates config for
                specified env
            env: Environment name ("dev", "live", "local") - used if config
                is None
            extra_configs: Additional Spark configurations to override defaults

        Returns:
            Configured SparkSession with Hive support enabled

        Example:
            >>> from opdi.utils.spark_helpers import SparkSessionManager
            >>> spark = SparkSessionManager.create_session("Track Processing", env="live")
            >>> df = spark.table("project_opdi.osn_statevectors_v2")
        """
        # Build a config for the requested environment when none was supplied.
        if config is None:
            config = OPDIConfig.for_environment(env)

        # NOTE(review): this mutates the caller-supplied config object in
        # place. Fine if configs are created per-session; consider copying
        # the config first if callers share one instance.
        config.spark.app_name = app_name

        # Flatten the OPDI config into a plain spark key/value dict.
        spark_configs = config.spark.to_spark_config(config.project)

        # Caller-provided overrides win over the defaults.
        if extra_configs:
            spark_configs.update(extra_configs)

        builder = SparkSession.builder.appName(app_name)
        for key, value in spark_configs.items():
            builder = builder.config(key, value)

        # Hive support is required so pipeline code can read catalog tables.
        return builder.enableHiveSupport().getOrCreate()

    @staticmethod
    def get_or_create(
        app_name: str = "OPDI Pipeline",
        config: Optional[OPDIConfig] = None,
        env: str = "dev",
    ) -> SparkSession:
        """
        Get existing Spark session or create new one if none exists.

        This method is useful when you want to reuse an existing session
        within the same application context.

        Args:
            app_name: Name for the Spark application
            config: OPDI configuration object. If None, creates config for
                specified env
            env: Environment name ("dev", "live", "local")

        Returns:
            Active SparkSession (existing or newly created)

        Example:
            >>> spark = SparkSessionManager.get_or_create("My Analysis")
        """
        # NOTE(review): any existing session is returned as-is, even if its
        # app_name or configuration differs from the arguments given here.
        try:
            spark = SparkSession.getActiveSession()
            if spark is not None:
                return spark
        except Exception:
            # Broad by design: any failure probing for an active session
            # falls through to creating a fresh one.
            pass

        # No active session, create new one
        return SparkSessionManager.create_session(app_name, config, env)

    @staticmethod
    def stop_session(spark: SparkSession) -> None:
        """
        Stop the given Spark session and clean up resources.

        Args:
            spark: SparkSession to stop

        Example:
            >>> spark = SparkSessionManager.create_session()
            >>> # ... do work ...
            >>> SparkSessionManager.stop_session(spark)
        """
        if spark is not None:
            spark.stop()

    @staticmethod
    def create_local_session(
        app_name: str = "OPDI Local",
        master: str = "local[*]",
        extra_configs: Optional[Dict[str, str]] = None,
    ) -> SparkSession:
        """
        Create a lightweight Spark session for local testing.

        This creates a minimal Spark session without Iceberg, Hive, or Azure
        dependencies, suitable for unit testing and local development.

        Args:
            app_name: Name for the Spark application
            master: Spark master URL (default: "local[*]" uses all cores)
            extra_configs: Additional Spark configurations

        Returns:
            Local SparkSession for testing

        Example:
            >>> spark = SparkSessionManager.create_local_session("Unit Tests")
            >>> df = spark.createDataFrame([(1, "test")], ["id", "name"])
        """
        builder = SparkSession.builder.appName(app_name).master(master)

        # Minimal configuration for local testing; small shuffle partition
        # count keeps tiny test jobs fast.
        default_configs = {
            "spark.driver.memory": "2G",
            "spark.executor.memory": "2G",
            "spark.sql.shuffle.partitions": "4",
            "spark.ui.showConsoleProgress": "false",
        }
        for key, value in default_configs.items():
            builder = builder.config(key, value)

        # Caller overrides are applied last so they take precedence.
        if extra_configs:
            for key, value in extra_configs.items():
                builder = builder.config(key, value)

        return builder.getOrCreate()
def get_spark(env: str = "dev", app_name: str = "OPDI Pipeline") -> SparkSession:
    """
    Convenience function to get a Spark session with OPDI configuration.

    This is a shorthand for SparkSessionManager.create_session().

    Args:
        env: Environment name ("dev", "live", "local")
        app_name: Name for the Spark application

    Returns:
        Configured SparkSession

    Example:
        >>> from opdi.utils.spark_helpers import get_spark
        >>> spark = get_spark("live", "Flight Events Processing")
    """
    # Delegates entirely to the factory; kept as a function for ergonomic
    # one-line imports in pipeline scripts.
    return SparkSessionManager.create_session(app_name=app_name, env=env)