import csv
import json
import io
import os
import tempfile
import re
import numpy as np
from datetime import datetime
from functools import lru_cache
from collections import Counter
from typing import List, Dict, Optional, Tuple

import nltk
from nltk.corpus import stopwords

from config import config
from models import handle_errors
# Initialize NLTK; fall back to a minimal stop-word list if the download fails
try:
    nltk.download('stopwords', quiet=True)
    nltk.download('punkt', quiet=True)
    STOP_WORDS = set(stopwords.words('english'))
except Exception:
    # Offline fallback: a small hand-picked English stop-word set
    STOP_WORDS = {'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for', 'of', 'with', 'by'}
# Simplified Text Processing
class TextProcessor:
    """Optimized text processing with multi-language support"""

    @staticmethod
    def clean_text(text: str, remove_punctuation: bool = True, remove_numbers: bool = False) -> str:
        """Clean text with language awareness"""
        text = text.strip()
        # Don't clean Chinese text aggressively: lowercasing, punctuation
        # stripping, and English stop-word removal are meaningless for CJK text
        if re.search(r'[\u4e00-\u9fff]', text):
            return text
        text = text.lower()
        if remove_numbers:
            text = re.sub(r'\d+', '', text)
        if remove_punctuation:
            text = re.sub(r'[^\w\s]', '', text)
        words = text.split()
        cleaned_words = [w for w in words if w not in STOP_WORDS and len(w) >= config.MIN_WORD_LENGTH]
        return ' '.join(cleaned_words)

    @staticmethod
    def parse_batch_input(text: str) -> List[str]:
        """Parse batch input from a textarea, one entry per non-empty line"""
        lines = text.strip().split('\n')
        return [line.strip() for line in lines if line.strip()]
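
# Usage sketch (illustrative; assumes config.MIN_WORD_LENGTH = 2):
#   TextProcessor.clean_text("The Quick, Brown Fox!")      ->  "quick brown fox"
#   TextProcessor.parse_batch_input("good movie\n\n bad movie ")
#       ->  ["good movie", "bad movie"]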
# Enhanced History Manager
class HistoryManager:
    """Enhanced history management with filtering"""

    def __init__(self):
        self._history = []

    def add(self, entry: Dict):
        """Add entry with timestamp"""
        entry['timestamp'] = datetime.now().isoformat()
        self._history.append(entry)
        if len(self._history) > config.MAX_HISTORY_SIZE:
            self._history = self._history[-config.MAX_HISTORY_SIZE:]

    def add_batch(self, entries: List[Dict]):
        """Add multiple entries"""
        for entry in entries:
            self.add(entry)

    def get_all(self) -> List[Dict]:
        return self._history.copy()

    def get_recent(self, n: int = 10) -> List[Dict]:
        return self._history[-n:] if self._history else []
    def filter_by(self, sentiment: Optional[str] = None, language: Optional[str] = None,
                  min_confidence: Optional[float] = None) -> List[Dict]:
        """Filter history by sentiment label, language code, and/or confidence floor"""
        filtered = self._history
        if sentiment:
            filtered = [h for h in filtered if h['sentiment'] == sentiment]
        if language:
            filtered = [h for h in filtered if h.get('language', 'en') == language]
        if min_confidence is not None:  # explicit check so a 0.0 threshold still applies
            filtered = [h for h in filtered if h['confidence'] >= min_confidence]
        return filtered
    def clear(self) -> int:
        count = len(self._history)
        self._history.clear()
        return count

    def size(self) -> int:
        return len(self._history)
    def get_stats(self) -> Dict:
        """Get comprehensive statistics"""
        if not self._history:
            return {}
        sentiments = [item['sentiment'] for item in self._history]
        confidences = [item['confidence'] for item in self._history]
        languages = [item.get('language', 'en') for item in self._history]
        return {
            'total_analyses': len(self._history),
            'positive_count': sentiments.count('Positive'),
            'negative_count': sentiments.count('Negative'),
            'neutral_count': sentiments.count('Neutral'),
            # Cast NumPy scalars to plain floats so the dict stays JSON-serializable
            'avg_confidence': float(np.mean(confidences)),
            'max_confidence': float(np.max(confidences)),
            'min_confidence': float(np.min(confidences)),
            'languages_detected': len(set(languages)),
            'most_common_language': Counter(languages).most_common(1)[0][0] if languages else 'en'
        }
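
# Usage sketch (illustrative entry shape; real entries come from the analyzer):
#   history = HistoryManager()
#   history.add({'text': 'great', 'sentiment': 'Positive', 'confidence': 0.97})
#   history.filter_by(sentiment='Positive', min_confidence=0.9)  -> [that entry]
#   history.get_stats()['total_analyses']                        -> 1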
# Universal Data Handler
class DataHandler:
    """Enhanced data operations"""

    @staticmethod
    def export_data(data: List[Dict], format_type: str) -> Tuple[Optional[str], str]:
        """Export data with comprehensive information"""
        if not data:
            return None, "No data to export"
        # newline='' prevents csv.writer from inserting blank rows on Windows
        temp_file = tempfile.NamedTemporaryFile(mode='w', delete=False, newline='',
                                                suffix=f'.{format_type}', encoding='utf-8')
        if format_type == 'csv':
            writer = csv.writer(temp_file)
            writer.writerow(['Timestamp', 'Text', 'Sentiment', 'Confidence', 'Language',
                             'Pos_Prob', 'Neg_Prob', 'Neu_Prob', 'Word_Count'])
            for entry in data:
                writer.writerow([
                    entry.get('timestamp', ''),
                    entry.get('text', ''),
                    entry.get('sentiment', ''),
                    f"{entry.get('confidence', 0):.4f}",
                    entry.get('language', 'en'),
                    f"{entry.get('pos_prob', 0):.4f}",
                    f"{entry.get('neg_prob', 0):.4f}",
                    f"{entry.get('neu_prob', 0):.4f}",
                    entry.get('word_count', 0)
                ])
        elif format_type == 'json':
            json.dump(data, temp_file, indent=2, ensure_ascii=False)
        else:
            # Unknown format: clean up the temp file instead of returning an empty one
            temp_file.close()
            os.unlink(temp_file.name)
            return None, f"Unsupported export format: {format_type}"
        temp_file.close()
        return temp_file.name, f"Exported {len(data)} entries"
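
    # Usage sketch (illustrative): given entries shaped like those produced by
    # HistoryManager above,
    #   path, msg = DataHandler.export_data(history.get_all(), 'csv')
    # returns the temp-file path and a message such as "Exported 3 entries".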
    @staticmethod
    def process_file(file) -> str:
        """Process uploaded files, returning one text entry per line"""
        if not file:
            return ""
        content = file.read().decode('utf-8')
        if file.name.endswith('.csv'):
            csv_file = io.StringIO(content)
            reader = csv.reader(csv_file)
            try:
                next(reader)  # Skip header row
                texts = []
                for row in reader:
                    if row and row[0].strip():
                        text = row[0].strip().strip('"')
                        if text:
                            texts.append(text)
                return '\n'.join(texts)
            except (csv.Error, StopIteration):
                # Malformed CSV: fall back to naive line splitting, still skipping the header
                lines = content.strip().split('\n')[1:]
                texts = []
                for line in lines:
                    if line.strip():
                        text = line.strip().strip('"')
                        if text:
                            texts.append(text)
                return '\n'.join(texts)
        return content
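
# Minimal end-to-end sketch (assumes config provides MIN_WORD_LENGTH and
# MAX_HISTORY_SIZE, as the imports above imply; entry fields are illustrative):
if __name__ == "__main__":
    history = HistoryManager()
    for text in TextProcessor.parse_batch_input("Great product!\nTerrible service."):
        history.add({
            'text': TextProcessor.clean_text(text),
            'sentiment': 'Positive',   # placeholder label; a real model would predict this
            'confidence': 0.9,
            'language': 'en',
        })
    print(history.get_stats())
    path, message = DataHandler.export_data(history.get_all(), 'json')
    print(message, '->', path)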