|
import os |
|
import uuid |
|
import shutil |
|
import pandas as pd |
|
import polars as pl |
|
import time |
|
import logging |
|
from typing import Optional, Tuple |
|
|
|
|
|
# Configure root logging once at import time; the module-level logger
# below (and any child loggers) inherit this handler and format.
logging.basicConfig(

    level=logging.INFO,

    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'

)

logger = logging.getLogger(__name__)
|
|
|
def validate_csv_structure(df: pd.DataFrame) -> Tuple[bool, str]:
    """
    Check that a DataFrame has the structure expected downstream.

    Validation order: non-empty frame, required columns present, no
    missing ids/texts, unique ids.

    Args:
        df: DataFrame to validate

    Returns:
        Tuple[bool, str]: (is_valid, error_message); the message is
        empty when validation succeeds.
    """
    if df.empty:
        return False, "CSV file is empty"

    # Both columns are required; report every absent one at once.
    missing = [name for name in ('_id', 'text') if name not in df.columns]
    if missing:
        return False, f"Missing required columns: {', '.join(missing)}"

    ids = df['_id']
    if ids.isna().any():
        return False, "Found empty _id values"

    if df['text'].isna().any():
        return False, "Found empty text values"

    if ids.duplicated().any():
        return False, "Found duplicate _id values"

    return True, ""
|
|
|
def getDataFrame(path: str) -> Optional[pl.DataFrame]:
    """
    Read a tab-separated CSV file, validate and clean it, and return it
    as a Polars DataFrame.

    Args:
        path: Path to the CSV file

    Returns:
        Optional[pl.DataFrame]: The validated DataFrame, or None when
        reading, validation, or cleaning fails (the cause is logged).
    """
    try:
        frame = pd.read_csv(
            path,
            sep="\t",
            header=0,
            on_bad_lines='warn',
            encoding='utf-8',
        )

        valid, reason = validate_csv_structure(frame)
        if not valid:
            logger.error(reason)
            return None

        # Normalize text and drop rows whose text is empty after stripping.
        frame['text'] = frame['text'].astype(str).str.strip()
        frame = frame[frame['text'].str.len() > 0]

        if frame.empty:
            logger.error("No valid text data found after cleaning")
            return None

        result = pl.from_pandas(frame)
        logger.info(f"Successfully loaded {len(result)} rows from CSV")
        return result

    except pd.errors.EmptyDataError:
        logger.error("CSV file is empty")
        return None
    except pd.errors.ParserError as e:
        logger.error(f"Error parsing CSV file: {str(e)}")
        return None
    except Exception as e:
        logger.error(f"Unexpected error reading CSV: {str(e)}")
        return None
|
|
|
def save_to_csv(dataframe: pl.DataFrame) -> Optional[str]:
    """
    Save DataFrame to a tab-separated CSV file under the "data" folder.

    Args:
        dataframe: Polars DataFrame to save

    Returns:
        Optional[str]: Path to saved file or None if save fails
        (or if there is nothing to save).
    """
    try:
        if dataframe is None or dataframe.is_empty():
            logger.warning("No data to save")
            return None

        folder_path = "data"
        os.makedirs(folder_path, exist_ok=True)

        # Timestamp plus a short random suffix: a plain 1-second
        # timestamp lets two saves in the same second overwrite
        # each other.
        timestamp = int(time.time())
        suffix = uuid.uuid4().hex[:8]
        csv_path = os.path.join(folder_path, f"results_{timestamp}_{suffix}.csv")

        dataframe.write_csv(csv_path, separator="\t")
        logger.info(f"Results saved to {csv_path}")

        return csv_path

    except Exception as e:
        logger.error(f"Error saving results: {str(e)}")
        return None
|
|
|
def delete_folder_periodically(path: str, interval: int = 3600) -> None:
    """
    Periodically clean up the data folder.

    Runs forever: on each pass, removes every regular file in ``path``
    whose modification time is more than ``interval`` seconds old, then
    sleeps for ``interval`` seconds. Intended to run in a background
    thread.

    Args:
        path: Path to folder to clean
        interval: Interval between cleanups in seconds
    """
    while True:
        try:
            if os.path.exists(path):
                now = time.time()
                for entry in os.listdir(path):
                    candidate = os.path.join(path, entry)
                    # Only regular files are eligible; leave directories alone.
                    if not os.path.isfile(candidate):
                        continue
                    if now - os.path.getmtime(candidate) > interval:
                        os.remove(candidate)
                        logger.info(f"Deleted old file: {candidate}")
            time.sleep(interval)
        except Exception as e:
            # Keep the cleanup thread alive no matter what went wrong.
            logger.error(f"Error in cleanup task: {str(e)}")
            time.sleep(interval)