import logging
import os
import shutil
import time
import uuid
from typing import Optional, Tuple

import pandas as pd
import polars as pl

# Configure module-wide logging once at import time.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


def validate_csv_structure(df: pd.DataFrame) -> Tuple[bool, str]:
    """
    Validate the structure of the DataFrame.

    Requires a non-empty frame with `_id` and `text` columns, no NaN
    values in either, and unique `_id` values.

    Args:
        df: DataFrame to validate

    Returns:
        Tuple[bool, str]: (is_valid, error_message); error_message is ""
        when the frame is valid.
    """
    # Check if DataFrame is empty
    if df.empty:
        return False, "CSV file is empty"

    # Check required columns
    required_columns = ['_id', 'text']
    missing_columns = [col for col in required_columns if col not in df.columns]
    if missing_columns:
        return False, f"Missing required columns: {', '.join(missing_columns)}"

    # Validate _id column
    if df['_id'].isna().any():
        return False, "Found empty _id values"

    # Validate text column
    if df['text'].isna().any():
        return False, "Found empty text values"

    # Check for duplicate _id values
    if df['_id'].duplicated().any():
        return False, "Found duplicate _id values"

    return True, ""


def getDataFrame(path: str) -> Optional[pl.DataFrame]:
    """
    Read and validate a tab-separated CSV file into a Polars DataFrame.

    Args:
        path: Path to the CSV file

    Returns:
        Optional[pl.DataFrame]: The validated DataFrame, or None if the
        file cannot be read, fails structural validation, or contains no
        non-empty text rows. Errors are logged rather than raised.
    """
    try:
        # Read CSV with tab separator; malformed lines are warned about,
        # not fatal.
        data = pd.read_csv(
            path,
            sep="\t",
            header=0,
            on_bad_lines='warn',
            encoding='utf-8'
        )

        # Validate structure before any cleaning.
        is_valid, error_message = validate_csv_structure(data)
        if not is_valid:
            logger.error(error_message)
            return None

        # Clean text column: strip whitespace, drop rows left empty.
        data['text'] = data['text'].astype(str).str.strip()
        data = data[data['text'].str.len() > 0]
        if data.empty:
            logger.error("No valid text data found after cleaning")
            return None

        # Convert to Polars DataFrame
        pl_df = pl.from_pandas(data)
        logger.info(f"Successfully loaded {len(pl_df)} rows from CSV")
        return pl_df

    except pd.errors.EmptyDataError:
        logger.error("CSV file is empty")
        return None
    except pd.errors.ParserError as e:
        logger.error(f"Error parsing CSV file: {str(e)}")
        return None
    except Exception as e:
        logger.error(f"Unexpected error reading CSV: {str(e)}")
        return None


def save_to_csv(dataframe: pl.DataFrame) -> Optional[str]:
    """
    Save a DataFrame to a tab-separated CSV file under the "data" folder.

    Args:
        dataframe: Polars DataFrame to save

    Returns:
        Optional[str]: Path to the saved file, or None if there was
        nothing to save or the write failed (errors are logged).
    """
    try:
        if dataframe is None or dataframe.is_empty():
            logger.warning("No data to save")
            return None

        # Create data directory if it doesn't exist
        folder_path = "data"
        os.makedirs(folder_path, exist_ok=True)

        # Generate a unique filename. A whole-second timestamp alone can
        # collide (two saves in the same second would silently overwrite
        # each other), so append a short random uuid suffix as well.
        timestamp = int(time.time())
        unique_suffix = uuid.uuid4().hex[:8]
        csv_path = os.path.join(
            folder_path, f"results_{timestamp}_{unique_suffix}.csv"
        )

        # Save to CSV with tab separator
        dataframe.write_csv(csv_path, separator="\t")

        logger.info(f"Results saved to {csv_path}")
        return csv_path

    except Exception as e:
        logger.error(f"Error saving results: {str(e)}")
        return None


def delete_folder_periodically(path: str, interval: int = 3600) -> None:
    """
    Periodically delete files older than `interval` seconds from a folder.

    Runs forever (sleeps `interval` seconds between sweeps); intended to
    be started in a background thread. Errors in a sweep are logged and
    the loop continues.

    Args:
        path: Path to folder to clean
        interval: Interval between cleanups in seconds; also the maximum
            file age before deletion
    """
    while True:
        try:
            if os.path.exists(path):
                # Get current time once per sweep
                current_time = time.time()

                # Check each file in the directory
                for filename in os.listdir(path):
                    file_path = os.path.join(path, filename)
                    if os.path.isfile(file_path):
                        # Delete files whose mtime is older than the interval
                        file_age = current_time - os.path.getmtime(file_path)
                        if file_age > interval:
                            os.remove(file_path)
                            logger.info(f"Deleted old file: {file_path}")

            time.sleep(interval)
        except Exception as e:
            logger.error(f"Error in cleanup task: {str(e)}")
            time.sleep(interval)