File size: 4,821 Bytes
1822f54 3ce1088 1822f54 3ce1088 1822f54 3ce1088 1822f54 3ce1088 1822f54 3ce1088 1822f54 3ce1088 1822f54 3ce1088 1822f54 3ce1088 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 |
import os
import uuid
import shutil
import pandas as pd
import polars as pl
import time
import logging
from typing import Optional, Tuple
# Configure logging
# Root-logger setup: INFO level, timestamped "time - name - level - message" lines.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# Module-level logger; inherits the handler/format configured above.
logger = logging.getLogger(__name__)
def validate_csv_structure(df: pd.DataFrame) -> Tuple[bool, str]:
    """
    Validate the structure of the DataFrame.

    Checks, in order: the frame is non-empty, the required '_id' and
    'text' columns exist, neither column contains missing values, and
    '_id' values are unique. Stops at the first failed check.

    Args:
        df: DataFrame to validate

    Returns:
        Tuple[bool, str]: (is_valid, error_message); the message is
        empty when validation succeeds.
    """
    # An empty frame fails before any column inspection.
    if df.empty:
        return False, "CSV file is empty"

    # Both required columns must exist before per-column checks run.
    missing = [name for name in ('_id', 'text') if name not in df.columns]
    if missing:
        return False, f"Missing required columns: {', '.join(missing)}"

    id_col = df['_id']
    text_col = df['text']

    if id_col.isna().any():
        return False, "Found empty _id values"
    if text_col.isna().any():
        return False, "Found empty text values"
    if id_col.duplicated().any():
        return False, "Found duplicate _id values"

    return True, ""
def getDataFrame(path: str) -> Optional[pl.DataFrame]:
    """
    Read and validate a tab-separated CSV file into a Polars DataFrame.

    The file is loaded with pandas, validated via validate_csv_structure,
    the 'text' column is stripped of surrounding whitespace and rows left
    blank by stripping are dropped, then the result is converted to Polars.

    Args:
        path: Path to the CSV file

    Returns:
        Optional[pl.DataFrame]: The validated DataFrame or None if validation fails
    """
    try:
        # Read CSV with tab separator; malformed lines are warned about, not fatal.
        data = pd.read_csv(
            path,
            sep="\t",
            header=0,
            on_bad_lines='warn',
            encoding='utf-8'
        )
        # Validate structure
        is_valid, error_message = validate_csv_structure(data)
        if not is_valid:
            logger.error(error_message)
            return None
        # Clean text column: strip whitespace, then drop rows emptied by stripping.
        data['text'] = data['text'].astype(str).str.strip()
        data = data[data['text'].str.len() > 0]
        if data.empty:
            logger.error("No valid text data found after cleaning")
            return None
        # Convert to Polars DataFrame
        pl_df = pl.from_pandas(data)
        logger.info(f"Successfully loaded {len(pl_df)} rows from CSV")
        return pl_df
    except FileNotFoundError:
        # Most common failure mode; previously fell through to the generic
        # handler and was logged as "Unexpected error" without the path.
        logger.error(f"CSV file not found: {path}")
        return None
    except pd.errors.EmptyDataError:
        logger.error("CSV file is empty")
        return None
    except pd.errors.ParserError as e:
        logger.error(f"Error parsing CSV file: {str(e)}")
        return None
    except Exception as e:
        logger.error(f"Unexpected error reading CSV: {str(e)}")
        return None
def save_to_csv(dataframe: pl.DataFrame) -> Optional[str]:
    """
    Save DataFrame to CSV file.

    Writes the frame as a tab-separated file under the local "data"
    directory, naming the file with the current unix timestamp so
    successive saves do not collide.

    Args:
        dataframe: Polars DataFrame to save

    Returns:
        Optional[str]: Path to saved file or None if save fails
    """
    try:
        # Nothing to write for a missing or empty frame.
        if dataframe is None or dataframe.is_empty():
            logger.warning("No data to save")
            return None

        # Ensure the output directory exists before writing.
        out_dir = "data"
        os.makedirs(out_dir, exist_ok=True)

        # Timestamped filename keeps result files from overwriting each other.
        stamp = int(time.time())
        destination = f"{out_dir}/results_{stamp}.csv"

        # Tab separator mirrors the input format read by getDataFrame.
        dataframe.write_csv(destination, separator="\t")
        logger.info(f"Results saved to {destination}")
        return destination
    except Exception as e:
        logger.error(f"Error saving results: {str(e)}")
        return None
def delete_folder_periodically(path: str, interval: int = 3600) -> None:
    """
    Periodically clean up the data folder.

    Runs forever: every `interval` seconds, removes regular files inside
    `path` whose modification time is more than `interval` seconds old.
    Subdirectories are left untouched. Errors are logged and the loop
    keeps running.

    Args:
        path: Path to folder to clean
        interval: Interval between cleanups in seconds
    """
    while True:
        try:
            if os.path.exists(path):
                now = time.time()
                for entry in os.listdir(path):
                    candidate = os.path.join(path, entry)
                    # Only prune regular files; skip anything else.
                    if not os.path.isfile(candidate):
                        continue
                    # Age is measured from last modification time.
                    if now - os.path.getmtime(candidate) > interval:
                        os.remove(candidate)
                        logger.info(f"Deleted old file: {candidate}")
            time.sleep(interval)
        except Exception as e:
            # Keep the background task alive even if a pass fails.
            logger.error(f"Error in cleanup task: {str(e)}")
            time.sleep(interval)