# sentiment-multi/data_utils.py
import csv
import json
import io
import tempfile
import re
import numpy as np
from datetime import datetime
from functools import lru_cache
from collections import Counter
from typing import List, Dict, Optional, Tuple
import nltk
from nltk.corpus import stopwords
from config import config
from models import handle_errors
# Initialize NLTK; fall back to a minimal built-in stop word list if the
# corpus download fails (e.g. in offline environments)
try:
    nltk.download('stopwords', quiet=True)
    nltk.download('punkt', quiet=True)
    STOP_WORDS = set(stopwords.words('english'))
except Exception:
    STOP_WORDS = {'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to',
                  'for', 'of', 'with', 'by'}
# Simplified Text Processing
class TextProcessor:
"""Optimized text processing with multi-language support"""
@staticmethod
@lru_cache(maxsize=config.CACHE_SIZE)
def clean_text(text: str, remove_punctuation: bool = True, remove_numbers: bool = False) -> str:
"""Clean text with language awareness"""
text = text.strip()
        # Leave CJK text as-is: lowercasing, English stop words, and
        # whitespace tokenization don't apply to Chinese
        if re.search(r'[\u4e00-\u9fff]', text):
            return text
text = text.lower()
if remove_numbers:
text = re.sub(r'\d+', '', text)
if remove_punctuation:
text = re.sub(r'[^\w\s]', '', text)
words = text.split()
cleaned_words = [w for w in words if w not in STOP_WORDS and len(w) >= config.MIN_WORD_LENGTH]
return ' '.join(cleaned_words)
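    # Example (illustrative, not from the repo): assuming config.MIN_WORD_LENGTH <= 3
    # and the NLTK English stop word list loaded above,
    #   clean_text("The quick brown fox!")  ->  "quick brown fox"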
@staticmethod
def parse_batch_input(text: str) -> List[str]:
"""Parse batch input from textarea"""
lines = text.strip().split('\n')
return [line.strip() for line in lines if line.strip()]
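    # Usage sketch (hypothetical input): parse_batch_input splits a textarea
    # blob into non-empty, stripped lines:
    #   parse_batch_input("good movie\n\nbad plot")  ->  ["good movie", "bad plot"]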
# Enhanced History Manager
class HistoryManager:
"""Enhanced history management with filtering"""
def __init__(self):
self._history = []
def add(self, entry: Dict):
"""Add entry with timestamp"""
entry['timestamp'] = datetime.now().isoformat()
self._history.append(entry)
if len(self._history) > config.MAX_HISTORY_SIZE:
self._history = self._history[-config.MAX_HISTORY_SIZE:]
def add_batch(self, entries: List[Dict]):
"""Add multiple entries"""
for entry in entries:
self.add(entry)
def get_all(self) -> List[Dict]:
return self._history.copy()
def get_recent(self, n: int = 10) -> List[Dict]:
return self._history[-n:] if self._history else []
    def filter_by(self, sentiment: Optional[str] = None, language: Optional[str] = None,
                  min_confidence: Optional[float] = None) -> List[Dict]:
        """Filter history by sentiment, language, and/or minimum confidence"""
        filtered = self._history
        if sentiment:
            filtered = [h for h in filtered if h.get('sentiment') == sentiment]
        if language:
            filtered = [h for h in filtered if h.get('language', 'en') == language]
        if min_confidence is not None:
            # `is not None` so a 0.0 threshold isn't silently ignored
            filtered = [h for h in filtered if h.get('confidence', 0) >= min_confidence]
        return filtered
def clear(self) -> int:
count = len(self._history)
self._history.clear()
return count
def size(self) -> int:
return len(self._history)
def get_stats(self) -> Dict:
"""Get comprehensive statistics"""
if not self._history:
return {}
sentiments = [item['sentiment'] for item in self._history]
confidences = [item['confidence'] for item in self._history]
languages = [item.get('language', 'en') for item in self._history]
return {
'total_analyses': len(self._history),
'positive_count': sentiments.count('Positive'),
'negative_count': sentiments.count('Negative'),
'neutral_count': sentiments.count('Neutral'),
            # Cast NumPy scalars to plain floats so the stats stay JSON-serializable
            'avg_confidence': float(np.mean(confidences)),
            'max_confidence': float(np.max(confidences)),
            'min_confidence': float(np.min(confidences)),
'languages_detected': len(set(languages)),
'most_common_language': Counter(languages).most_common(1)[0][0] if languages else 'en'
}
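# Usage sketch for HistoryManager (illustrative entry; field names follow what
# get_stats and DataHandler.export_data expect):
#   history = HistoryManager()
#   history.add({'text': 'great film', 'sentiment': 'Positive',
#                'confidence': 0.97, 'language': 'en'})
#   history.filter_by(sentiment='Positive', min_confidence=0.9)  # -> [entry]
#   history.get_stats()['total_analyses']                        # -> 1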
# Universal Data Handler
class DataHandler:
"""Enhanced data operations"""
@staticmethod
@handle_errors(default_return=(None, "Export failed"))
def export_data(data: List[Dict], format_type: str) -> Tuple[Optional[str], str]:
"""Export data with comprehensive information"""
if not data:
return None, "No data to export"
        # newline='' prevents csv.writer from emitting blank rows on Windows
        temp_file = tempfile.NamedTemporaryFile(mode='w', delete=False, suffix=f'.{format_type}',
                                                encoding='utf-8', newline='')
if format_type == 'csv':
writer = csv.writer(temp_file)
writer.writerow(['Timestamp', 'Text', 'Sentiment', 'Confidence', 'Language',
'Pos_Prob', 'Neg_Prob', 'Neu_Prob', 'Word_Count'])
for entry in data:
writer.writerow([
entry.get('timestamp', ''),
entry.get('text', ''),
entry.get('sentiment', ''),
f"{entry.get('confidence', 0):.4f}",
entry.get('language', 'en'),
f"{entry.get('pos_prob', 0):.4f}",
f"{entry.get('neg_prob', 0):.4f}",
f"{entry.get('neu_prob', 0):.4f}",
entry.get('word_count', 0)
])
        elif format_type == 'json':
            json.dump(data, temp_file, indent=2, ensure_ascii=False)
        else:
            temp_file.close()
            return None, f"Unsupported export format: {format_type}"
        temp_file.close()
        return temp_file.name, f"Exported {len(data)} entries"
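    # Example (sketch): export accumulated history to a temp CSV; callers own
    # the returned file path and should delete it when done.
    #   path, msg = DataHandler.export_data(history.get_all(), 'csv')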
@staticmethod
@handle_errors(default_return="")
def process_file(file) -> str:
"""Process uploaded files"""
if not file:
return ""
        content = file.read()
        if isinstance(content, bytes):
            content = content.decode('utf-8')
if file.name.endswith('.csv'):
csv_file = io.StringIO(content)
reader = csv.reader(csv_file)
            try:
                next(reader, None)  # Skip the header row if present
                texts = []
                for row in reader:
                    if row and row[0].strip():
                        text = row[0].strip().strip('"')
                        if text:
                            texts.append(text)
                return '\n'.join(texts)
            except csv.Error:
                # Fall back to naive line splitting if CSV parsing fails
                lines = content.strip().split('\n')[1:]
                texts = []
                for line in lines:
                    text = line.strip().strip('"')
                    if text:
                        texts.append(text)
                return '\n'.join(texts)
return content
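# Minimal smoke test; a sketch, assuming `config` and `models.handle_errors`
# from this repo are importable. Run with `python data_utils.py`.
if __name__ == "__main__":
    print(TextProcessor.clean_text("The quick brown fox jumps over the lazy dog!"))

    history = HistoryManager()
    history.add({'text': 'great film', 'sentiment': 'Positive',
                 'confidence': 0.97, 'language': 'en'})
    history.add({'text': 'terrible plot', 'sentiment': 'Negative',
                 'confidence': 0.88, 'language': 'en'})
    print(history.get_stats())

    path, message = DataHandler.export_data(history.get_all(), 'json')
    print(message, '->', path)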