Source code for opdi.output.csv_exporter
"""
CSV export and data cleanup module.
Provides deduplication and export functionality for converting
parquet files to compressed CSV format.
"""
import os
import glob
import pandas as pd
[docs]
class CSVExporter:
    """
    Exports parquet files to CSV.gz format with deduplication.

    This class handles the final data cleanup step, removing duplicate
    rows and exporting data in both parquet and compressed CSV formats.
    """

    def __init__(
        self,
        input_dir: str,
        output_dir: str,
        size_threshold_mb: int = 200,
    ):
        """
        Initialize CSV exporter.

        Args:
            input_dir: Directory containing input parquet files
            output_dir: Directory for output files (created if it does
                not exist yet)
            size_threshold_mb: An input file is skipped when both of its
                output files already exist and each one is larger than
                this size (MB)
        """
        self.input_dir = input_dir
        self.output_dir = output_dir
        self.size_threshold_mb = size_threshold_mb
        self.size_threshold_bytes = size_threshold_mb * 1024 * 1024
        # Create output directory
        os.makedirs(output_dir, exist_ok=True)

    def _output_paths(self, base_filename: str) -> tuple[str, str]:
        """
        Build the output paths for a given base filename.

        Args:
            base_filename: Base filename without extension

        Returns:
            Tuple of (parquet_path, csv_gz_path) inside the output directory
        """
        return (
            os.path.join(self.output_dir, f"{base_filename}.parquet"),
            os.path.join(self.output_dir, f"{base_filename}.csv.gz"),
        )

    def _should_skip_file(self, base_filename: str) -> bool:
        """
        Check if file should be skipped based on existing output file sizes.

        Args:
            base_filename: Base filename without extension

        Returns:
            True if both output files exist and are larger than threshold
        """
        parquet_path, csv_path = self._output_paths(base_filename)
        if not (os.path.exists(parquet_path) and os.path.exists(csv_path)):
            return False
        return (
            os.path.getsize(parquet_path) > self.size_threshold_bytes
            and os.path.getsize(csv_path) > self.size_threshold_bytes
        )

    def _export_file(self, input_file: str) -> tuple[bool, int, int]:
        """
        Deduplicate one parquet file and write both output formats.

        Unlike ``clean_and_export_file`` this reports explicitly whether
        the file was skipped, so that a processed file containing zero
        rows can be told apart from a skipped one.

        Args:
            input_file: Path to input parquet file

        Returns:
            Tuple of (skipped, original_rows, cleaned_rows); the row
            counts are both 0 when ``skipped`` is True
        """
        # NOTE(review): everything after the FIRST dot is dropped, so
        # "a.b.parquet" and "a.c.parquet" map to the same output name.
        base_filename = os.path.basename(input_file).split(".")[0]

        if self._should_skip_file(base_filename):
            print(
                f"Skipping {input_file} - cleaned files exist and are "
                f"larger than {self.size_threshold_mb}MB"
            )
            return (True, 0, 0)

        print(f"Processing {input_file}...")

        df = pd.read_parquet(input_file)
        original_rows = len(df)

        # Only rows that are identical across all columns are removed
        df = df.drop_duplicates()
        cleaned_rows = len(df)

        duplicates_removed = original_rows - cleaned_rows
        if duplicates_removed > 0:
            print(f" Removed {duplicates_removed:,} duplicate rows")

        parquet_output, csv_output = self._output_paths(base_filename)
        df.to_parquet(parquet_output)
        df.to_csv(csv_output, index=False, compression="gzip")

        print(f" Exported: {cleaned_rows:,} rows")
        print(f" → {parquet_output}")
        print(f" → {csv_output}")
        return (False, original_rows, cleaned_rows)

    def clean_and_export_file(self, input_file: str) -> tuple[int, int]:
        """
        Clean and export a single parquet file.

        Args:
            input_file: Path to input parquet file

        Returns:
            Tuple of (original_rows, cleaned_rows); ``(0, 0)`` when the
            file was skipped because sufficiently large outputs already
            exist (an input file with zero rows also yields ``(0, 0)``)
        """
        _, original_rows, cleaned_rows = self._export_file(input_file)
        return (original_rows, cleaned_rows)

    def clean_and_export_all(self) -> dict:
        """
        Clean and export all parquet files in input directory.

        Files are handled in sorted path order so that runs are
        reproducible.

        Returns:
            Dictionary with keys ``files_processed``, ``files_skipped``,
            ``total_original_rows``, ``total_cleaned_rows``, and
            ``duplicates_removed``.

        Example:
            >>> exporter = CSVExporter(
            ...     "data/measurements",
            ...     "data/measurements_clean"
            ... )
            >>> stats = exporter.clean_and_export_all()
            >>> print(f"Processed {stats['files_processed']} files")
            >>> print(f"Removed {stats['duplicates_removed']} duplicates")
        """
        # glob returns paths in arbitrary order; sort for determinism
        parquet_files = sorted(glob.glob(os.path.join(self.input_dir, "*.parquet")))

        if not parquet_files:
            print(f"No parquet files found in {self.input_dir}")
            return {
                "files_processed": 0,
                "files_skipped": 0,
                "total_original_rows": 0,
                "total_cleaned_rows": 0,
                "duplicates_removed": 0,
            }

        print(f"Found {len(parquet_files)} parquet files")
        print(f"Input directory: {self.input_dir}")
        print(f"Output directory: {self.output_dir}")
        print(f"Size threshold: {self.size_threshold_mb} MB\n")

        files_processed = 0
        files_skipped = 0
        total_original = 0
        total_cleaned = 0

        for file in parquet_files:
            skipped, original, cleaned = self._export_file(file)
            # Rely on the explicit flag: an empty input file is still a
            # processed file even though both of its row counts are 0.
            if skipped:
                files_skipped += 1
                continue
            files_processed += 1
            total_original += original
            total_cleaned += cleaned

        duplicates_removed = total_original - total_cleaned

        # Print summary
        print("\n" + "=" * 60)
        print("CLEANUP SUMMARY")
        print("=" * 60)
        print(f"Files processed: {files_processed}")
        print(f"Files skipped: {files_skipped}")
        print(f"Total original rows: {total_original:,}")
        print(f"Total cleaned rows: {total_cleaned:,}")
        print(f"Duplicates removed: {duplicates_removed:,}")
        print("=" * 60)

        return {
            "files_processed": files_processed,
            "files_skipped": files_skipped,
            "total_original_rows": total_original,
            "total_cleaned_rows": total_cleaned,
            "duplicates_removed": duplicates_removed,
        }
[docs]
def clean_and_save_data(input_dir: str, output_dir: str, size_threshold_mb: int = 200) -> None:
    """
    Convenience wrapper: build a ``CSVExporter`` and run it over a directory.

    The statistics dictionary produced by the exporter is discarded; use
    ``CSVExporter.clean_and_export_all`` directly when it is needed.

    Args:
        input_dir: Directory holding the parquet files to clean
        output_dir: Directory that receives the cleaned output files
        size_threshold_mb: Size threshold (MB) handed to the exporter,
            which uses it to decide whether existing outputs allow an
            input file to be skipped

    Example:
        >>> from opdi.output.csv_exporter import clean_and_save_data
        >>> clean_and_save_data("data/measurements", "data/measurements_clean")
    """
    CSVExporter(
        input_dir=input_dir,
        output_dir=output_dir,
        size_threshold_mb=size_threshold_mb,
    ).clean_and_export_all()