File size: 4,821 Bytes
1822f54
 
 
 
 
 
3ce1088
 
1822f54
3ce1088
 
 
 
 
 
1822f54
3ce1088
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1822f54
3ce1088
 
 
 
 
 
1822f54
3ce1088
 
1822f54
3ce1088
 
 
 
1822f54
3ce1088
 
 
 
 
 
 
 
1822f54
3ce1088
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
import os
import uuid
import shutil
import pandas as pd
import polars as pl
import time
import logging
from typing import Optional, Tuple

# Configure logging for this module and anything importing it.
# NOTE: basicConfig configures the *root* logger and is a no-op if the
# application already installed handlers before this module was imported.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# Module-level logger shared by all helpers below.
logger = logging.getLogger(__name__)

def validate_csv_structure(df: pd.DataFrame) -> Tuple[bool, str]:
    """
    Check that a DataFrame satisfies the expected CSV schema.

    Requirements: the frame is non-empty, contains both '_id' and 'text'
    columns, neither column holds missing values, and every '_id' is unique.

    Args:
        df: DataFrame to validate

    Returns:
        Tuple[bool, str]: (is_valid, error_message); message is "" on success
    """
    if df.empty:
        return False, "CSV file is empty"

    # Report every required column that is absent, in one message.
    expected = ['_id', 'text']
    absent = [name for name in expected if name not in df.columns]
    if absent:
        return False, f"Missing required columns: {', '.join(absent)}"

    # Neither key column may contain NaN/None entries.
    if df['_id'].isna().any():
        return False, "Found empty _id values"
    if df['text'].isna().any():
        return False, "Found empty text values"

    # IDs must be unique across all rows.
    if df['_id'].duplicated().any():
        return False, "Found duplicate _id values"

    return True, ""

def getDataFrame(path: str) -> Optional[pl.DataFrame]:
    """
    Load a tab-separated CSV into a validated Polars DataFrame.

    The file is read with pandas, validated via validate_csv_structure,
    stripped of whitespace-only text rows, then converted to Polars.

    Args:
        path: Path to the CSV file

    Returns:
        Optional[pl.DataFrame]: Validated DataFrame, or None on any failure
        (validation error, empty/malformed file, or unexpected exception).
    """
    try:
        frame = pd.read_csv(
            path,
            sep="\t",
            header=0,
            on_bad_lines='warn',
            encoding='utf-8',
        )

        valid, problem = validate_csv_structure(frame)
        if not valid:
            logger.error(problem)
            return None

        # Normalize text and discard rows that become empty after stripping.
        frame['text'] = frame['text'].astype(str).str.strip()
        frame = frame[frame['text'].str.len() > 0]

        if frame.empty:
            logger.error("No valid text data found after cleaning")
            return None

        result = pl.from_pandas(frame)
        logger.info(f"Successfully loaded {len(result)} rows from CSV")
        return result

    except pd.errors.EmptyDataError:
        logger.error("CSV file is empty")
        return None
    except pd.errors.ParserError as e:
        logger.error(f"Error parsing CSV file: {str(e)}")
        return None
    except Exception as e:
        logger.error(f"Unexpected error reading CSV: {str(e)}")
        return None

def save_to_csv(dataframe: pl.DataFrame) -> Optional[str]:
    """
    Save a Polars DataFrame to a tab-separated CSV under the data/ folder.

    The filename combines a Unix timestamp with a short random UUID suffix
    so two saves within the same second cannot overwrite each other (the
    previous timestamp-only name collided in that case). Callers receive
    the actual path, so the name change is backward-compatible.

    Args:
        dataframe: Polars DataFrame to save

    Returns:
        Optional[str]: Path to saved file or None if save fails
    """
    try:
        if dataframe is None or dataframe.is_empty():
            logger.warning("No data to save")
            return None

        # Create data directory if it doesn't exist
        folder_path = "data"
        os.makedirs(folder_path, exist_ok=True)

        # Timestamp keeps files roughly sortable by creation time; the
        # uuid suffix guarantees uniqueness even within the same second.
        timestamp = int(time.time())
        suffix = uuid.uuid4().hex[:8]
        csv_path = f"{folder_path}/results_{timestamp}_{suffix}.csv"

        # Save to CSV with tab separator (matches the loader in getDataFrame)
        dataframe.write_csv(csv_path, separator="\t")
        logger.info(f"Results saved to {csv_path}")

        return csv_path

    except Exception as e:
        logger.error(f"Error saving results: {str(e)}")
        return None

def delete_folder_periodically(path: str, interval: int = 3600) -> None:
    """
    Run forever, removing stale files from *path* every *interval* seconds.

    A regular file is considered stale once its modification time lies more
    than *interval* seconds in the past. Any error is logged and the loop
    keeps running after the usual sleep.

    Args:
        path: Path to folder to clean
        interval: Interval between cleanups in seconds
    """
    while True:
        try:
            if os.path.exists(path):
                now = time.time()
                for entry in os.listdir(path):
                    candidate = os.path.join(path, entry)
                    # Skip subdirectories and anything that is not a file.
                    if not os.path.isfile(candidate):
                        continue
                    # Delete files last modified more than `interval` ago.
                    age = now - os.path.getmtime(candidate)
                    if age > interval:
                        os.remove(candidate)
                        logger.info(f"Deleted old file: {candidate}")

            time.sleep(interval)

        except Exception as e:
            logger.error(f"Error in cleanup task: {str(e)}")
            time.sleep(interval)