entropy25's picture
Update app.py
457c1e5 verified
raw
history blame
61.5 kB
import torch
import gradio as gr
from transformers import AutoTokenizer, AutoModelForSequenceClassification
import plotly.graph_objects as go
import plotly.express as px
from plotly.subplots import make_subplots
import numpy as np
from wordcloud import WordCloud
from collections import Counter, defaultdict, OrderedDict
import re
import json
import csv
import io
import tempfile
from datetime import datetime
import logging
from functools import lru_cache, wraps
from dataclasses import dataclass
from typing import List, Dict, Optional, Tuple, Any, Callable
from contextlib import contextmanager
import nltk
from nltk.corpus import stopwords
import langdetect
import pandas as pd
import gc
import threading
import asyncio
from concurrent.futures import ThreadPoolExecutor
import time
# Advanced analysis imports
import shap
import lime
from lime.lime_text import LimeTextExplainer
# Configuration
@dataclass
class Config:
MAX_HISTORY_SIZE: int = 1000
BATCH_SIZE_LIMIT: int = 50
MAX_TEXT_LENGTH: int = 512
MIN_WORD_LENGTH: int = 2
CACHE_SIZE: int = 128
BATCH_PROCESSING_SIZE: int = 8
MODEL_CACHE_SIZE: int = 2 # Maximum models to keep in memory
# Supported languages and models
SUPPORTED_LANGUAGES = {
'auto': 'Auto Detect',
'en': 'English',
'zh': 'Chinese',
'es': 'Spanish',
'fr': 'French',
'de': 'German',
'sv': 'Swedish'
}
MODELS = {
'en': "cardiffnlp/twitter-roberta-base-sentiment-latest",
'multilingual': "cardiffnlp/twitter-xlm-roberta-base-sentiment",
'zh': "uer/roberta-base-finetuned-dianping-chinese"
}
# Color themes for Plotly
THEMES = {
'default': {'pos': '#4CAF50', 'neg': '#F44336', 'neu': '#FF9800'},
'ocean': {'pos': '#0077BE', 'neg': '#FF6B35', 'neu': '#00BCD4'},
'dark': {'pos': '#66BB6A', 'neg': '#EF5350', 'neu': '#FFA726'},
'rainbow': {'pos': '#9C27B0', 'neg': '#E91E63', 'neu': '#FF5722'}
}
config = Config()
# Logging setup
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Initialize NLTK
try:
nltk.download('stopwords', quiet=True)
nltk.download('punkt', quiet=True)
STOP_WORDS = set(stopwords.words('english'))
except:
STOP_WORDS = {'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for', 'of', 'with', 'by'}
# Decorators and Context Managers
def handle_errors(default_return=None):
"""Centralized error handling decorator"""
def decorator(func: Callable) -> Callable:
@wraps(func)
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except Exception as e:
logger.error(f"{func.__name__} failed: {e}")
return default_return if default_return is not None else f"Error: {str(e)}"
return wrapper
return decorator
@contextmanager
def memory_cleanup():
"""Context manager for memory cleanup"""
try:
yield
finally:
gc.collect()
if torch.cuda.is_available():
torch.cuda.empty_cache()
class ThemeContext:
"""Theme management context"""
def __init__(self, theme: str = 'default'):
self.theme = theme
self.colors = config.THEMES.get(theme, config.THEMES['default'])
class LRUModelCache:
"""LRU Cache for models with memory management"""
def __init__(self, max_size: int = 2):
self.max_size = max_size
self.cache = OrderedDict()
self.lock = threading.Lock()
def get(self, key):
with self.lock:
if key in self.cache:
# Move to end (most recently used)
self.cache.move_to_end(key)
return self.cache[key]
return None
def put(self, key, value):
with self.lock:
if key in self.cache:
self.cache.move_to_end(key)
else:
if len(self.cache) >= self.max_size:
# Remove least recently used
oldest_key = next(iter(self.cache))
old_model, old_tokenizer = self.cache.pop(oldest_key)
# Force cleanup
del old_model, old_tokenizer
gc.collect()
if torch.cuda.is_available():
torch.cuda.empty_cache()
self.cache[key] = value
def clear(self):
with self.lock:
for model, tokenizer in self.cache.values():
del model, tokenizer
self.cache.clear()
gc.collect()
if torch.cuda.is_available():
torch.cuda.empty_cache()
# Enhanced Model Manager with Optimized Memory Management
class ModelManager:
"""Optimized multi-language model manager with LRU cache and lazy loading"""
_instance = None
def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(self):
if not self._initialized:
self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
self.model_cache = LRUModelCache(config.MODEL_CACHE_SIZE)
self.loading_lock = threading.Lock()
self._initialized = True
logger.info(f"ModelManager initialized on device: {self.device}")
def _load_model(self, model_name: str, cache_key: str):
"""Load model with memory optimization"""
try:
logger.info(f"Loading model: {model_name}")
# Load with memory optimization
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForSequenceClassification.from_pretrained(
model_name,
torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32,
device_map="auto" if torch.cuda.is_available() else None
)
if not torch.cuda.is_available():
model.to(self.device)
# Set to eval mode to save memory
model.eval()
# Cache the model
self.model_cache.put(cache_key, (model, tokenizer))
logger.info(f"Model {model_name} loaded and cached successfully")
return model, tokenizer
except Exception as e:
logger.error(f"Failed to load model {model_name}: {e}")
raise
def get_model(self, language='en'):
"""Get model for specific language with lazy loading and caching"""
# Determine cache key and model name
if language == 'zh':
cache_key = 'zh'
model_name = config.MODELS['zh']
else:
cache_key = 'multilingual'
model_name = config.MODELS['multilingual']
# Try to get from cache first
cached_model = self.model_cache.get(cache_key)
if cached_model is not None:
return cached_model
# Load model if not in cache (with thread safety)
with self.loading_lock:
# Double-check pattern
cached_model = self.model_cache.get(cache_key)
if cached_model is not None:
return cached_model
return self._load_model(model_name, cache_key)
@staticmethod
def detect_language(text: str) -> str:
"""Detect text language"""
try:
detected = langdetect.detect(text)
language_mapping = {
'zh-cn': 'zh',
'zh-tw': 'zh'
}
detected = language_mapping.get(detected, detected)
return detected if detected in config.SUPPORTED_LANGUAGES else 'en'
except:
return 'en'
# Simplified Text Processing
class TextProcessor:
"""Optimized text processing with multi-language support"""
@staticmethod
@lru_cache(maxsize=config.CACHE_SIZE)
def clean_text(text: str, remove_punctuation: bool = True, remove_numbers: bool = False) -> str:
"""Clean text with language awareness"""
text = text.strip()
# Don't clean Chinese text aggressively
if re.search(r'[\u4e00-\u9fff]', text):
return text
text = text.lower()
if remove_numbers:
text = re.sub(r'\d+', '', text)
if remove_punctuation:
text = re.sub(r'[^\w\s]', '', text)
words = text.split()
cleaned_words = [w for w in words if w not in STOP_WORDS and len(w) >= config.MIN_WORD_LENGTH]
return ' '.join(cleaned_words)
@staticmethod
def parse_batch_input(text: str) -> List[str]:
"""Parse batch input from textarea"""
lines = text.strip().split('\n')
return [line.strip() for line in lines if line.strip()]
# Enhanced History Manager
class HistoryManager:
"""Enhanced history management with filtering"""
def __init__(self):
self._history = []
def add(self, entry: Dict):
"""Add entry with timestamp"""
entry['timestamp'] = datetime.now().isoformat()
self._history.append(entry)
if len(self._history) > config.MAX_HISTORY_SIZE:
self._history = self._history[-config.MAX_HISTORY_SIZE:]
def add_batch(self, entries: List[Dict]):
"""Add multiple entries"""
for entry in entries:
self.add(entry)
def get_all(self) -> List[Dict]:
return self._history.copy()
def get_recent(self, n: int = 10) -> List[Dict]:
return self._history[-n:] if self._history else []
def filter_by(self, sentiment: str = None, language: str = None,
min_confidence: float = None) -> List[Dict]:
"""Filter history by criteria"""
filtered = self._history
if sentiment:
filtered = [h for h in filtered if h['sentiment'] == sentiment]
if language:
filtered = [h for h in filtered if h.get('language', 'en') == language]
if min_confidence:
filtered = [h for h in filtered if h['confidence'] >= min_confidence]
return filtered
def clear(self) -> int:
count = len(self._history)
self._history.clear()
return count
def size(self) -> int:
return len(self._history)
def get_stats(self) -> Dict:
"""Get comprehensive statistics"""
if not self._history:
return {}
sentiments = [item['sentiment'] for item in self._history]
confidences = [item['confidence'] for item in self._history]
languages = [item.get('language', 'en') for item in self._history]
return {
'total_analyses': len(self._history),
'positive_count': sentiments.count('Positive'),
'negative_count': sentiments.count('Negative'),
'neutral_count': sentiments.count('Neutral'),
'avg_confidence': np.mean(confidences),
'max_confidence': np.max(confidences),
'min_confidence': np.min(confidences),
'languages_detected': len(set(languages)),
'most_common_language': Counter(languages).most_common(1)[0][0] if languages else 'en'
}
# Core Sentiment Analysis Engine with Performance Optimizations
class SentimentEngine:
"""Optimized multi-language sentiment analysis engine"""
def __init__(self):
self.model_manager = ModelManager()
self.executor = ThreadPoolExecutor(max_workers=4)
@handle_errors(default_return={'sentiment': 'Unknown', 'confidence': 0.0})
def analyze_single(self, text: str, language: str = 'auto', preprocessing_options: Dict = None) -> Dict:
"""Optimized single text analysis"""
if not text.strip():
raise ValueError("Empty text provided")
# Detect language
if language == 'auto':
detected_lang = self.model_manager.detect_language(text)
else:
detected_lang = language
# Get appropriate model
model, tokenizer = self.model_manager.get_model(detected_lang)
# Preprocessing
options = preprocessing_options or {}
processed_text = text
if options.get('clean_text', False) and not re.search(r'[\u4e00-\u9fff]', text):
processed_text = TextProcessor.clean_text(
text,
options.get('remove_punctuation', True),
options.get('remove_numbers', False)
)
# Tokenize and analyze with memory optimization
inputs = tokenizer(processed_text, return_tensors="pt", padding=True,
truncation=True, max_length=config.MAX_TEXT_LENGTH).to(self.model_manager.device)
# Use no_grad for inference to save memory
with torch.no_grad():
outputs = model(**inputs)
probs = torch.nn.functional.softmax(outputs.logits, dim=-1).cpu().numpy()[0]
# Clear GPU cache after inference
if torch.cuda.is_available():
torch.cuda.empty_cache()
# Handle different model outputs
if len(probs) == 3: # negative, neutral, positive
sentiment_idx = np.argmax(probs)
sentiment_labels = ['Negative', 'Neutral', 'Positive']
sentiment = sentiment_labels[sentiment_idx]
confidence = float(probs[sentiment_idx])
result = {
'sentiment': sentiment,
'confidence': confidence,
'neg_prob': float(probs[0]),
'neu_prob': float(probs[1]),
'pos_prob': float(probs[2]),
'has_neutral': True
}
else: # negative, positive
pred = np.argmax(probs)
sentiment = "Positive" if pred == 1 else "Negative"
confidence = float(probs[pred])
result = {
'sentiment': sentiment,
'confidence': confidence,
'neg_prob': float(probs[0]),
'pos_prob': float(probs[1]),
'neu_prob': 0.0,
'has_neutral': False
}
# Add metadata
result.update({
'language': detected_lang,
'word_count': len(text.split()),
'char_count': len(text)
})
return result
def _analyze_text_batch(self, text: str, language: str, preprocessing_options: Dict, index: int) -> Dict:
"""Single text analysis for batch processing"""
try:
result = self.analyze_single(text, language, preprocessing_options)
result['batch_index'] = index
result['text'] = text[:100] + '...' if len(text) > 100 else text
result['full_text'] = text
return result
except Exception as e:
return {
'sentiment': 'Error',
'confidence': 0.0,
'error': str(e),
'batch_index': index,
'text': text[:100] + '...' if len(text) > 100 else text,
'full_text': text
}
@handle_errors(default_return=[])
def analyze_batch(self, texts: List[str], language: str = 'auto',
preprocessing_options: Dict = None, progress_callback=None) -> List[Dict]:
"""Optimized parallel batch processing"""
if len(texts) > config.BATCH_SIZE_LIMIT:
texts = texts[:config.BATCH_SIZE_LIMIT]
if not texts:
return []
# Pre-load model to avoid race conditions
self.model_manager.get_model(language if language != 'auto' else 'en')
# Use ThreadPoolExecutor for parallel processing
with ThreadPoolExecutor(max_workers=min(4, len(texts))) as executor:
futures = []
for i, text in enumerate(texts):
future = executor.submit(
self._analyze_text_batch,
text, language, preprocessing_options, i
)
futures.append(future)
results = []
for i, future in enumerate(futures):
if progress_callback:
progress_callback((i + 1) / len(futures))
try:
result = future.result(timeout=30) # 30 second timeout per text
results.append(result)
except Exception as e:
results.append({
'sentiment': 'Error',
'confidence': 0.0,
'error': f"Timeout or error: {str(e)}",
'batch_index': i,
'text': texts[i][:100] + '...' if len(texts[i]) > 100 else texts[i],
'full_text': texts[i]
})
return results
class AdvancedAnalysisEngine:
"""Advanced analysis using SHAP and LIME with FIXED implementation"""
def __init__(self):
self.model_manager = ModelManager()
def create_prediction_function(self, model, tokenizer, device):
"""Create FIXED prediction function for SHAP/LIME"""
def predict_proba(texts):
# Ensure texts is a list
if isinstance(texts, str):
texts = [texts]
elif isinstance(texts, np.ndarray):
texts = texts.tolist()
# Convert all elements to strings
texts = [str(text) for text in texts]
results = []
batch_size = 16 # Process in smaller batches
for i in range(0, len(texts), batch_size):
batch_texts = texts[i:i + batch_size]
try:
with torch.no_grad():
# Tokenize batch
inputs = tokenizer(
batch_texts,
return_tensors="pt",
padding=True,
truncation=True,
max_length=config.MAX_TEXT_LENGTH
).to(device)
# Batch inference
outputs = model(**inputs)
probs = torch.nn.functional.softmax(outputs.logits, dim=-1).cpu().numpy()
results.extend(probs)
except Exception as e:
logger.error(f"Prediction batch failed: {e}")
# Return neutral predictions for failed batch
batch_size_actual = len(batch_texts)
if hasattr(model.config, 'num_labels') and model.config.num_labels == 3:
neutral_probs = np.array([[0.33, 0.34, 0.33]] * batch_size_actual)
else:
neutral_probs = np.array([[0.5, 0.5]] * batch_size_actual)
results.extend(neutral_probs)
return np.array(results)
return predict_proba
@handle_errors(default_return=("Analysis failed", None, None))
def analyze_with_shap(self, text: str, language: str = 'auto', num_samples: int = 100) -> Tuple[str, go.Figure, Dict]:
"""FIXED SHAP analysis implementation"""
if not text.strip():
return "Please enter text for analysis", None, {}
# Detect language and get model
if language == 'auto':
detected_lang = self.model_manager.detect_language(text)
else:
detected_lang = language
model, tokenizer = self.model_manager.get_model(detected_lang)
try:
# Create FIXED prediction function
predict_fn = self.create_prediction_function(model, tokenizer, self.model_manager.device)
# Test the prediction function first
test_pred = predict_fn([text])
if test_pred is None or len(test_pred) == 0:
return "Prediction function test failed", None, {}
# Use SHAP Text Explainer instead of generic Explainer
explainer = shap.Explainer(predict_fn, masker=shap.maskers.Text(tokenizer))
# Get SHAP values with proper text input
shap_values = explainer([text], max_evals=num_samples)
# Extract data safely
if hasattr(shap_values, 'data') and hasattr(shap_values, 'values'):
tokens = shap_values.data[0] if len(shap_values.data) > 0 else []
values = shap_values.values[0] if len(shap_values.values) > 0 else []
else:
return "SHAP values extraction failed", None, {}
if len(tokens) == 0 or len(values) == 0:
return "No tokens or values extracted from SHAP", None, {}
# Handle multi-dimensional values
if len(values.shape) > 1:
# Use positive class values (last column for 3-class, second for 2-class)
pos_values = values[:, -1] if values.shape[1] >= 2 else values[:, 0]
else:
pos_values = values
# Ensure we have matching lengths
min_len = min(len(tokens), len(pos_values))
tokens = tokens[:min_len]
pos_values = pos_values[:min_len]
# Create visualization
fig = go.Figure()
colors = ['red' if v < 0 else 'green' for v in pos_values]
fig.add_trace(go.Bar(
x=list(range(len(tokens))),
y=pos_values,
text=tokens,
textposition='outside',
marker_color=colors,
name='SHAP Values',
hovertemplate='<b>%{text}</b><br>SHAP Value: %{y:.4f}<extra></extra>'
))
fig.update_layout(
title=f"SHAP Analysis - Token Importance (Samples: {num_samples})",
xaxis_title="Token Index",
yaxis_title="SHAP Value",
height=500,
xaxis=dict(tickmode='array', tickvals=list(range(len(tokens))), ticktext=tokens)
)
# Create analysis summary
analysis_data = {
'method': 'SHAP',
'language': detected_lang,
'total_tokens': len(tokens),
'samples_used': num_samples,
'positive_influence': sum(1 for v in pos_values if v > 0),
'negative_influence': sum(1 for v in pos_values if v < 0),
'most_important_tokens': [(str(tokens[i]), float(pos_values[i]))
for i in np.argsort(np.abs(pos_values))[-5:]]
}
summary_text = f"""
**SHAP Analysis Results:**
- **Language:** {detected_lang.upper()}
- **Total Tokens:** {analysis_data['total_tokens']}
- **Samples Used:** {num_samples}
- **Positive Influence Tokens:** {analysis_data['positive_influence']}
- **Negative Influence Tokens:** {analysis_data['negative_influence']}
- **Most Important Tokens:** {', '.join([f"{token}({score:.3f})" for token, score in analysis_data['most_important_tokens']])}
- **Status:** SHAP analysis completed successfully
"""
return summary_text, fig, analysis_data
except Exception as e:
logger.error(f"SHAP analysis failed: {e}")
error_msg = f"""
**SHAP Analysis Failed:**
- **Error:** {str(e)}
- **Language:** {detected_lang.upper()}
- **Suggestion:** Try with a shorter text or reduce number of samples
**Common fixes:**
- Reduce sample size to 50-100
- Use shorter input text (< 200 words)
- Check if model supports the text language
"""
return error_msg, None, {}
@handle_errors(default_return=("Analysis failed", None, None))
def analyze_with_lime(self, text: str, language: str = 'auto', num_samples: int = 100) -> Tuple[str, go.Figure, Dict]:
"""FIXED LIME analysis implementation - Bug Fix for mode parameter"""
if not text.strip():
return "Please enter text for analysis", None, {}
# Detect language and get model
if language == 'auto':
detected_lang = self.model_manager.detect_language(text)
else:
detected_lang = language
model, tokenizer = self.model_manager.get_model(detected_lang)
try:
# Create FIXED prediction function
predict_fn = self.create_prediction_function(model, tokenizer, self.model_manager.device)
# Test the prediction function first
test_pred = predict_fn([text])
if test_pred is None or len(test_pred) == 0:
return "Prediction function test failed", None, {}
# Determine class names based on model output
num_classes = test_pred.shape[1] if len(test_pred.shape) > 1 else 2
if num_classes == 3:
class_names = ['Negative', 'Neutral', 'Positive']
else:
class_names = ['Negative', 'Positive']
# Initialize LIME explainer - FIXED: Remove 'mode' parameter
explainer = LimeTextExplainer(class_names=class_names)
# Get LIME explanation
exp = explainer.explain_instance(
text,
predict_fn,
num_features=min(20, len(text.split())), # Limit features
num_samples=num_samples
)
# Extract feature importance
lime_data = exp.as_list()
if not lime_data:
return "No LIME features extracted", None, {}
# Create visualization
words = [item[0] for item in lime_data]
scores = [item[1] for item in lime_data]
fig = go.Figure()
colors = ['red' if s < 0 else 'green' for s in scores]
fig.add_trace(go.Bar(
y=words,
x=scores,
orientation='h',
marker_color=colors,
text=[f'{s:.3f}' for s in scores],
textposition='auto',
name='LIME Importance',
hovertemplate='<b>%{y}</b><br>Importance: %{x:.4f}<extra></extra>'
))
fig.update_layout(
title=f"LIME Analysis - Feature Importance (Samples: {num_samples})",
xaxis_title="Importance Score",
yaxis_title="Words/Phrases",
height=500
)
# Create analysis summary
analysis_data = {
'method': 'LIME',
'language': detected_lang,
'features_analyzed': len(lime_data),
'samples_used': num_samples,
'positive_features': sum(1 for _, score in lime_data if score > 0),
'negative_features': sum(1 for _, score in lime_data if score < 0),
'feature_importance': lime_data
}
summary_text = f"""
**LIME Analysis Results:**
- **Language:** {detected_lang.upper()}
- **Features Analyzed:** {analysis_data['features_analyzed']}
- **Classes:** {', '.join(class_names)}
- **Samples Used:** {num_samples}
- **Positive Features:** {analysis_data['positive_features']}
- **Negative Features:** {analysis_data['negative_features']}
- **Top Features:** {', '.join([f"{word}({score:.3f})" for word, score in lime_data[:5]])}
- **Status:** LIME analysis completed successfully
"""
return summary_text, fig, analysis_data
except Exception as e:
logger.error(f"LIME analysis failed: {e}")
error_msg = f"""
**LIME Analysis Failed:**
- **Error:** {str(e)}
- **Language:** {detected_lang.upper()}
- **Suggestion:** Try with a shorter text or reduce number of samples
**Bug Fix Applied:**
- ✅ Removed 'mode' parameter from LimeTextExplainer initialization
- ✅ This should resolve the "unexpected keyword argument 'mode'" error
**Common fixes:**
- Reduce sample size to 50-100
- Use shorter input text (< 200 words)
- Check if model supports the text language
"""
return error_msg, None, {}
# Optimized Plotly Visualization System
class PlotlyVisualizer:
"""Enhanced Plotly visualizations"""
@staticmethod
@handle_errors(default_return=None)
def create_sentiment_gauge(result: Dict, theme: ThemeContext) -> go.Figure:
"""Create animated sentiment gauge"""
colors = theme.colors
if result.get('has_neutral', False):
# Three-way gauge
fig = go.Figure(go.Indicator(
mode="gauge+number+delta",
value=result['pos_prob'] * 100,
domain={'x': [0, 1], 'y': [0, 1]},
title={'text': f"Sentiment: {result['sentiment']}"},
delta={'reference': 50},
gauge={
'axis': {'range': [None, 100]},
'bar': {'color': colors['pos'] if result['sentiment'] == 'Positive' else colors['neg']},
'steps': [
{'range': [0, 33], 'color': colors['neg']},
{'range': [33, 67], 'color': colors['neu']},
{'range': [67, 100], 'color': colors['pos']}
],
'threshold': {
'line': {'color': "red", 'width': 4},
'thickness': 0.75,
'value': 90
}
}
))
else:
# Two-way gauge
fig = go.Figure(go.Indicator(
mode="gauge+number",
value=result['confidence'] * 100,
domain={'x': [0, 1], 'y': [0, 1]},
title={'text': f"Confidence: {result['sentiment']}"},
gauge={
'axis': {'range': [None, 100]},
'bar': {'color': colors['pos'] if result['sentiment'] == 'Positive' else colors['neg']},
'steps': [
{'range': [0, 50], 'color': "lightgray"},
{'range': [50, 100], 'color': "gray"}
]
}
))
fig.update_layout(height=400, font={'size': 16})
return fig
@staticmethod
@handle_errors(default_return=None)
def create_probability_bars(result: Dict, theme: ThemeContext) -> go.Figure:
"""Create probability bar chart"""
colors = theme.colors
if result.get('has_neutral', False):
labels = ['Negative', 'Neutral', 'Positive']
values = [result['neg_prob'], result['neu_prob'], result['pos_prob']]
bar_colors = [colors['neg'], colors['neu'], colors['pos']]
else:
labels = ['Negative', 'Positive']
values = [result['neg_prob'], result['pos_prob']]
bar_colors = [colors['neg'], colors['pos']]
fig = go.Figure(data=[
go.Bar(x=labels, y=values, marker_color=bar_colors,
text=[f'{v:.3f}' for v in values], textposition='outside')
])
fig.update_layout(
title="Sentiment Probabilities",
yaxis_title="Probability",
height=400,
showlegend=False
)
return fig
@staticmethod
@handle_errors(default_return=None)
def create_batch_summary(results: List[Dict], theme: ThemeContext) -> go.Figure:
"""Create batch analysis summary"""
colors = theme.colors
# Count sentiments
sentiments = [r['sentiment'] for r in results if 'sentiment' in r and r['sentiment'] != 'Error']
sentiment_counts = Counter(sentiments)
# Create pie chart
fig = go.Figure(data=[go.Pie(
labels=list(sentiment_counts.keys()),
values=list(sentiment_counts.values()),
marker_colors=[colors.get(s.lower()[:3], '#999999') for s in sentiment_counts.keys()],
textinfo='label+percent',
hole=0.3
)])
fig.update_layout(
title=f"Batch Analysis Summary ({len(results)} texts)",
height=400
)
return fig
@staticmethod
@handle_errors(default_return=None)
def create_confidence_distribution(results: List[Dict]) -> go.Figure:
"""Create confidence distribution plot"""
confidences = [r['confidence'] for r in results if 'confidence' in r and r['sentiment'] != 'Error']
if not confidences:
return go.Figure()
fig = go.Figure(data=[go.Histogram(
x=confidences,
nbinsx=20,
marker_color='skyblue',
opacity=0.7
)])
fig.update_layout(
title="Confidence Distribution",
xaxis_title="Confidence Score",
yaxis_title="Frequency",
height=400
)
return fig
@staticmethod
@handle_errors(default_return=None)
def create_history_dashboard(history: List[Dict], theme: ThemeContext) -> go.Figure:
"""Create comprehensive history dashboard"""
if len(history) < 2:
return go.Figure()
# Create subplots
fig = make_subplots(
rows=2, cols=2,
subplot_titles=['Sentiment Timeline', 'Confidence Distribution',
'Language Distribution', 'Sentiment Summary'],
specs=[[{"secondary_y": False}, {"secondary_y": False}],
[{"type": "pie"}, {"type": "bar"}]]
)
# Extract data
indices = list(range(len(history)))
pos_probs = [item.get('pos_prob', 0) for item in history]
confidences = [item['confidence'] for item in history]
sentiments = [item['sentiment'] for item in history]
languages = [item.get('language', 'en') for item in history]
# Sentiment timeline
colors_map = {'Positive': theme.colors['pos'], 'Negative': theme.colors['neg'], 'Neutral': theme.colors['neu']}
colors = [colors_map.get(s, '#999999') for s in sentiments]
fig.add_trace(
go.Scatter(x=indices, y=pos_probs, mode='lines+markers',
marker=dict(color=colors, size=8),
name='Positive Probability'),
row=1, col=1
)
# Confidence distribution
fig.add_trace(
go.Histogram(x=confidences, nbinsx=10, name='Confidence'),
row=1, col=2
)
# Language distribution
lang_counts = Counter(languages)
fig.add_trace(
go.Pie(labels=list(lang_counts.keys()), values=list(lang_counts.values()),
name="Languages"),
row=2, col=1
)
# Sentiment summary
sent_counts = Counter(sentiments)
sent_colors = [colors_map.get(k, '#999999') for k in sent_counts.keys()]
fig.add_trace(
go.Bar(x=list(sent_counts.keys()), y=list(sent_counts.values()),
marker_color=sent_colors),
row=2, col=2
)
fig.update_layout(height=800, showlegend=False)
return fig
# Universal Data Handler
class DataHandler:
"""Enhanced data operations"""
@staticmethod
@handle_errors(default_return=(None, "Export failed"))
def export_data(data: List[Dict], format_type: str) -> Tuple[Optional[str], str]:
"""Export data with comprehensive information"""
if not data:
return None, "No data to export"
temp_file = tempfile.NamedTemporaryFile(mode='w', delete=False,
suffix=f'.{format_type}', encoding='utf-8')
if format_type == 'csv':
writer = csv.writer(temp_file)
writer.writerow(['Timestamp', 'Text', 'Sentiment', 'Confidence', 'Language',
'Pos_Prob', 'Neg_Prob', 'Neu_Prob', 'Word_Count'])
for entry in data:
writer.writerow([
entry.get('timestamp', ''),
entry.get('text', ''),
entry.get('sentiment', ''),
f"{entry.get('confidence', 0):.4f}",
entry.get('language', 'en'),
f"{entry.get('pos_prob', 0):.4f}",
f"{entry.get('neg_prob', 0):.4f}",
f"{entry.get('neu_prob', 0):.4f}",
entry.get('word_count', 0)
])
elif format_type == 'json':
json.dump(data, temp_file, indent=2, ensure_ascii=False)
temp_file.close()
return temp_file.name, f"Exported {len(data)} entries"
@staticmethod
@handle_errors(default_return="")
def process_file(file) -> str:
"""Process uploaded files"""
if not file:
return ""
content = file.read().decode('utf-8')
if file.name.endswith('.csv'):
csv_file = io.StringIO(content)
reader = csv.reader(csv_file)
try:
next(reader) # Skip header
texts = []
for row in reader:
if row and row[0].strip():
text = row[0].strip().strip('"')
if text:
texts.append(text)
return '\n'.join(texts)
except:
lines = content.strip().split('\n')[1:]
texts = []
for line in lines:
if line.strip():
text = line.strip().strip('"')
if text:
texts.append(text)
return '\n'.join(texts)
return content
class SentimentApp:
"""Optimized multilingual sentiment analysis application"""
def __init__(self):
self.engine = SentimentEngine()
self.advanced_engine = AdvancedAnalysisEngine()
self.history = HistoryManager()
self.data_handler = DataHandler()
# Multi-language examples
self.examples = [
# Auto Detect
["The film had its moments, but overall it felt a bit too long and lacked emotional depth. Some scenes were visually impressive, yet they failed to connect emotionally. By the end, I found myself disengaged and unsatisfied."],
# English
["I was completely blown away by the movie — the performances were raw and powerful, and the story stayed with me long after the credits rolled. Every scene felt purposeful, and the emotional arc was handled with incredible nuance. It's the kind of film that makes you reflect deeply on your own life."],
# Chinese
["这部电影节奏拖沓,剧情老套,完全没有让我产生任何共鸣,是一次失望的观影体验。演员的表演也显得做作,缺乏真实感。看到最后甚至有点不耐烦,整体表现乏善可陈。"],
# Spanish
["Una obra maestra del cine contemporáneo, con actuaciones sobresalientes, un guion bien escrito y una dirección impecable. Cada plano parecía cuidadosamente pensado, y la historia avanzaba con una intensidad emocional que mantenía al espectador cautivado. Definitivamente una película que vale la pena volver a ver."],
# French
["Je m'attendais à beaucoup mieux. Le scénario était confus, les dialogues ennuyeux, et je me suis presque endormi au milieu du film. Même la mise en scène, habituellement un point fort, manquait cruellement d'inspiration cette fois-ci."],
# German
["Der Film war ein emotionales Erlebnis mit großartigen Bildern, einem mitreißenden Soundtrack und einer Geschichte, die zum Nachdenken anregt. Besonders beeindruckend war die schauspielerische Leistung der Hauptdarsteller, die eine tiefe Menschlichkeit vermittelten. Es ist ein Film, der lange nachwirkt."],
# Swedish
["Filmen var en besvikelse – tråkig handling, överdrivet skådespeleri och ett slut som inte gav något avslut alls. Den kändes forcerad och saknade en tydlig röd tråd. Jag gick från biografen med en känsla av tomhet och frustration."]
]
@handle_errors(default_return=("Please enter text", None, None))
def analyze_single(self, text: str, language: str, theme: str, clean_text: bool,
remove_punct: bool, remove_nums: bool):
"""Optimized single text analysis"""
if not text.strip():
return "Please enter text", None, None
# Map display names to language codes
language_map = {v: k for k, v in config.SUPPORTED_LANGUAGES.items()}
language_code = language_map.get(language, 'auto')
preprocessing_options = {
'clean_text': clean_text,
'remove_punctuation': remove_punct,
'remove_numbers': remove_nums
}
with memory_cleanup():
result = self.engine.analyze_single(text, language_code, preprocessing_options)
# Add to history
history_entry = {
'text': text[:100] + '...' if len(text) > 100 else text,
'full_text': text,
'sentiment': result['sentiment'],
'confidence': result['confidence'],
'pos_prob': result.get('pos_prob', 0),
'neg_prob': result.get('neg_prob', 0),
'neu_prob': result.get('neu_prob', 0),
'language': result['language'],
'word_count': result['word_count'],
'analysis_type': 'single'
}
self.history.add(history_entry)
# Create visualizations
theme_ctx = ThemeContext(theme)
gauge_fig = PlotlyVisualizer.create_sentiment_gauge(result, theme_ctx)
bars_fig = PlotlyVisualizer.create_probability_bars(result, theme_ctx)
# Create comprehensive result text
info_text = f"""
**Analysis Results:**
- **Sentiment:** {result['sentiment']} ({result['confidence']:.3f} confidence)
- **Language:** {result['language'].upper()}
- **Statistics:** {result['word_count']} words, {result['char_count']} characters
- **Probabilities:** Positive: {result.get('pos_prob', 0):.3f}, Negative: {result.get('neg_prob', 0):.3f}, Neutral: {result.get('neu_prob', 0):.3f}
"""
return info_text, gauge_fig, bars_fig
@handle_errors(default_return=("Please enter texts", None, None, None))
def analyze_batch(self, batch_text: str, language: str, theme: str,
clean_text: bool, remove_punct: bool, remove_nums: bool):
"""Enhanced batch analysis with parallel processing"""
if not batch_text.strip():
return "Please enter texts (one per line)", None, None, None
# Parse batch input
texts = TextProcessor.parse_batch_input(batch_text)
if len(texts) > config.BATCH_SIZE_LIMIT:
return f"Too many texts. Maximum {config.BATCH_SIZE_LIMIT} allowed.", None, None, None
if not texts:
return "No valid texts found", None, None, None
# Map display names to language codes
language_map = {v: k for k, v in config.SUPPORTED_LANGUAGES.items()}
language_code = language_map.get(language, 'auto')
preprocessing_options = {
'clean_text': clean_text,
'remove_punctuation': remove_punct,
'remove_numbers': remove_nums
}
with memory_cleanup():
results = self.engine.analyze_batch(texts, language_code, preprocessing_options)
# Add to history
batch_entries = []
for result in results:
if 'error' not in result:
entry = {
'text': result['text'],
'full_text': result['full_text'],
'sentiment': result['sentiment'],
'confidence': result['confidence'],
'pos_prob': result.get('pos_prob', 0),
'neg_prob': result.get('neg_prob', 0),
'neu_prob': result.get('neu_prob', 0),
'language': result['language'],
'word_count': result['word_count'],
'analysis_type': 'batch',
'batch_index': result['batch_index']
}
batch_entries.append(entry)
self.history.add_batch(batch_entries)
# Create visualizations
theme_ctx = ThemeContext(theme)
summary_fig = PlotlyVisualizer.create_batch_summary(results, theme_ctx)
confidence_fig = PlotlyVisualizer.create_confidence_distribution(results)
# Create results DataFrame
df_data = []
for result in results:
if 'error' in result:
df_data.append({
'Index': result['batch_index'] + 1,
'Text': result['text'],
'Sentiment': 'Error',
'Confidence': 0.0,
'Language': 'Unknown',
'Error': result['error']
})
else:
df_data.append({
'Index': result['batch_index'] + 1,
'Text': result['text'],
'Sentiment': result['sentiment'],
'Confidence': f"{result['confidence']:.3f}",
'Language': result['language'].upper(),
'Word_Count': result.get('word_count', 0)
})
df = pd.DataFrame(df_data)
# Create summary text
successful_results = [r for r in results if 'error' not in r]
error_count = len(results) - len(successful_results)
if successful_results:
sentiment_counts = Counter([r['sentiment'] for r in successful_results])
avg_confidence = np.mean([r['confidence'] for r in successful_results])
languages = Counter([r['language'] for r in successful_results])
summary_text = f"""
**Batch Analysis Summary:**
- **Total Texts:** {len(texts)}
- **Successful:** {len(successful_results)}
- **Errors:** {error_count}
- **Average Confidence:** {avg_confidence:.3f}
- **Sentiments:** {dict(sentiment_counts)}
- **Languages Detected:** {dict(languages)}
"""
else:
summary_text = f"All {len(texts)} texts failed to analyze."
return summary_text, df, summary_fig, confidence_fig
# FIXED advanced analysis methods with sample size control
@handle_errors(default_return=("Please enter text", None))
def analyze_with_shap(self, text: str, language: str, num_samples: int = 100):
"""Perform FIXED SHAP analysis with configurable samples"""
language_map = {v: k for k, v in config.SUPPORTED_LANGUAGES.items()}
language_code = language_map.get(language, 'auto')
return self.advanced_engine.analyze_with_shap(text, language_code, num_samples)
@handle_errors(default_return=("Please enter text", None))
def analyze_with_lime(self, text: str, language: str, num_samples: int = 100):
"""Perform FIXED LIME analysis with configurable samples"""
language_map = {v: k for k, v in config.SUPPORTED_LANGUAGES.items()}
language_code = language_map.get(language, 'auto')
return self.advanced_engine.analyze_with_lime(text, language_code, num_samples)
@handle_errors(default_return=(None, "No history available"))
def plot_history(self, theme: str = 'default'):
"""Plot comprehensive history analysis"""
history = self.history.get_all()
if len(history) < 2:
return None, f"Need at least 2 analyses for trends. Current: {len(history)}"
theme_ctx = ThemeContext(theme)
with memory_cleanup():
fig = PlotlyVisualizer.create_history_dashboard(history, theme_ctx)
stats = self.history.get_stats()
stats_text = f"""
**History Statistics:**
- **Total Analyses:** {stats.get('total_analyses', 0)}
- **Positive:** {stats.get('positive_count', 0)}
- **Negative:** {stats.get('negative_count', 0)}
- **Neutral:** {stats.get('neutral_count', 0)}
- **Average Confidence:** {stats.get('avg_confidence', 0):.3f}
- **Languages:** {stats.get('languages_detected', 0)}
- **Most Common Language:** {stats.get('most_common_language', 'N/A').upper()}
"""
return fig, stats_text
@handle_errors(default_return=("No data available",))
def get_history_status(self):
"""Get current history status"""
stats = self.history.get_stats()
if not stats:
return "No analyses performed yet"
return f"""
**Current Status:**
- **Total Analyses:** {stats['total_analyses']}
- **Recent Sentiment Distribution:**
* Positive: {stats['positive_count']}
* Negative: {stats['negative_count']}
* Neutral: {stats['neutral_count']}
- **Average Confidence:** {stats['avg_confidence']:.3f}
- **Languages Detected:** {stats['languages_detected']}
"""
# Optimized Gradio Interface
def create_interface():
"""Create comprehensive Gradio interface with optimizations"""
app = SentimentApp()
with gr.Blocks(theme=gr.themes.Soft(), title="Multilingual Sentiment Analyzer") as demo:
gr.Markdown("# 🌍 Multilingual Sentiment Analyzer")
gr.Markdown("AI-powered sentiment analysis with SHAP & LIME explainable AI features")
with gr.Tab("Single Analysis"):
with gr.Row():
with gr.Column():
text_input = gr.Textbox(
label="Enter Text for Analysis",
placeholder="Enter your text in any supported language...",
lines=5
)
with gr.Row():
language_selector = gr.Dropdown(
choices=list(config.SUPPORTED_LANGUAGES.values()),
value="Auto Detect",
label="Language"
)
theme_selector = gr.Dropdown(
choices=list(config.THEMES.keys()),
value="default",
label="Theme"
)
with gr.Row():
clean_text_cb = gr.Checkbox(label="Clean Text", value=False)
remove_punct_cb = gr.Checkbox(label="Remove Punctuation", value=False)
remove_nums_cb = gr.Checkbox(label="Remove Numbers", value=False)
analyze_btn = gr.Button("Analyze", variant="primary", size="lg")
gr.Examples(
examples=app.examples,
inputs=text_input,
cache_examples=False
)
with gr.Column():
result_output = gr.Textbox(label="Analysis Results", lines=8)
with gr.Row():
gauge_plot = gr.Plot(label="Sentiment Gauge")
probability_plot = gr.Plot(label="Probability Distribution")
# FIXED Advanced Analysis Tab
with gr.Tab("Advanced Analysis"):
gr.Markdown("## Explainable AI Analysis")
gr.Markdown("**SHAP and LIME analysis with FIXED implementation** - now handles text input correctly!")
with gr.Row():
with gr.Column():
advanced_text_input = gr.Textbox(
label="Enter Text for Advanced Analysis",
placeholder="Enter text to analyze with SHAP and LIME...",
lines=6,
value="This movie is absolutely fantastic and amazing!"
)
with gr.Row():
advanced_language = gr.Dropdown(
choices=list(config.SUPPORTED_LANGUAGES.values()),
value="Auto Detect",
label="Language"
)
num_samples_slider = gr.Slider(
minimum=50,
maximum=300,
value=100,
step=25,
label="Number of Samples",
info="Lower = Faster, Higher = More Accurate"
)
with gr.Row():
shap_btn = gr.Button("SHAP Analysis", variant="primary")
lime_btn = gr.Button("LIME Analysis", variant="secondary")
gr.Markdown("""
**📊 Analysis Methods:**
- **SHAP**: Token-level importance scores using Text masker
- **LIME**: Feature importance through text perturbation
**⚡ Expected Performance:**
- 50 samples: ~10-20s | 100 samples: ~20-40s | 200+ samples: ~40-80s
""")
with gr.Column():
advanced_results = gr.Textbox(label="Analysis Summary", lines=12)
with gr.Row():
advanced_plot = gr.Plot(label="Feature Importance Visualization")
with gr.Tab("Batch Analysis"):
with gr.Row():
with gr.Column():
file_upload = gr.File(
label="Upload File (CSV/TXT)",
file_types=[".csv", ".txt"]
)
batch_input = gr.Textbox(
label="Batch Input (one text per line)",
placeholder="Enter multiple texts, one per line...",
lines=10
)
with gr.Row():
batch_language = gr.Dropdown(
choices=list(config.SUPPORTED_LANGUAGES.values()),
value="Auto Detect",
label="Language"
)
batch_theme = gr.Dropdown(
choices=list(config.THEMES.keys()),
value="default",
label="Theme"
)
with gr.Row():
batch_clean_cb = gr.Checkbox(label="Clean Text", value=False)
batch_punct_cb = gr.Checkbox(label="Remove Punctuation", value=False)
batch_nums_cb = gr.Checkbox(label="Remove Numbers", value=False)
with gr.Row():
load_file_btn = gr.Button("Load File")
analyze_batch_btn = gr.Button("Analyze Batch", variant="primary")
with gr.Column():
batch_summary = gr.Textbox(label="Batch Summary", lines=8)
batch_results_df = gr.Dataframe(
label="Detailed Results",
headers=["Index", "Text", "Sentiment", "Confidence", "Language", "Word_Count"],
datatype=["number", "str", "str", "str", "str", "number"]
)
with gr.Row():
batch_plot = gr.Plot(label="Batch Analysis Summary")
confidence_dist_plot = gr.Plot(label="Confidence Distribution")
with gr.Tab("History & Analytics"):
with gr.Row():
with gr.Column():
with gr.Row():
refresh_history_btn = gr.Button("Refresh History")
clear_history_btn = gr.Button("Clear History", variant="stop")
status_btn = gr.Button("Get Status")
history_theme = gr.Dropdown(
choices=list(config.THEMES.keys()),
value="default",
label="Dashboard Theme"
)
with gr.Row():
export_csv_btn = gr.Button("Export CSV")
export_json_btn = gr.Button("Export JSON")
with gr.Column():
history_status = gr.Textbox(label="History Status", lines=8)
history_dashboard = gr.Plot(label="History Analytics Dashboard")
with gr.Row():
csv_download = gr.File(label="CSV Download", visible=True)
json_download = gr.File(label="JSON Download", visible=True)
# Event Handlers
# Single Analysis
analyze_btn.click(
app.analyze_single,
inputs=[text_input, language_selector, theme_selector,
clean_text_cb, remove_punct_cb, remove_nums_cb],
outputs=[result_output, gauge_plot, probability_plot]
)
# FIXED Advanced Analysis with sample size control
shap_btn.click(
app.analyze_with_shap,
inputs=[advanced_text_input, advanced_language, num_samples_slider],
outputs=[advanced_results, advanced_plot]
)
lime_btn.click(
app.analyze_with_lime,
inputs=[advanced_text_input, advanced_language, num_samples_slider],
outputs=[advanced_results, advanced_plot]
)
# Batch Analysis
load_file_btn.click(
app.data_handler.process_file,
inputs=file_upload,
outputs=batch_input
)
analyze_batch_btn.click(
app.analyze_batch,
inputs=[batch_input, batch_language, batch_theme,
batch_clean_cb, batch_punct_cb, batch_nums_cb],
outputs=[batch_summary, batch_results_df, batch_plot, confidence_dist_plot]
)
# History & Analytics
refresh_history_btn.click(
app.plot_history,
inputs=history_theme,
outputs=[history_dashboard, history_status]
)
clear_history_btn.click(
lambda: f"Cleared {app.history.clear()} entries",
outputs=history_status
)
status_btn.click(
app.get_history_status,
outputs=history_status
)
export_csv_btn.click(
lambda: app.data_handler.export_data(app.history.get_all(), 'csv'),
outputs=[csv_download, history_status]
)
export_json_btn.click(
lambda: app.data_handler.export_data(app.history.get_all(), 'json'),
outputs=[json_download, history_status]
)
return demo
# Application Entry Point
if __name__ == "__main__":
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
try:
demo = create_interface()
demo.launch(
share=True,
server_name="0.0.0.0",
server_port=7860,
show_error=True
)
except Exception as e:
logger.error(f"Failed to launch application: {e}")
raise