import torch
import gradio as gr
from transformers import AutoTokenizer, AutoModelForSequenceClassification
import plotly.graph_objects as go
import plotly.express as px
from plotly.subplots import make_subplots
import numpy as np
from wordcloud import WordCloud
from collections import Counter, defaultdict, OrderedDict
import re
import json
import csv
import io
import tempfile
from datetime import datetime
import logging
from functools import lru_cache, wraps
from dataclasses import dataclass
from typing import List, Dict, Optional, Tuple, Any, Callable
from contextlib import contextmanager
import nltk
from nltk.corpus import stopwords
import langdetect
import pandas as pd
import gc
import threading
import asyncio
from concurrent.futures import ThreadPoolExecutor
import time

import shap
import lime
from lime.lime_text import LimeTextExplainer


@dataclass
class Config:
    MAX_HISTORY_SIZE: int = 1000
    BATCH_SIZE_LIMIT: int = 50
    MAX_TEXT_LENGTH: int = 512
    MIN_WORD_LENGTH: int = 2
    CACHE_SIZE: int = 128
    BATCH_PROCESSING_SIZE: int = 8
    MODEL_CACHE_SIZE: int = 2

    SUPPORTED_LANGUAGES = {
        'auto': 'Auto Detect',
        'en': 'English',
        'zh': 'Chinese',
        'es': 'Spanish',
        'fr': 'French',
        'de': 'German',
        'sv': 'Swedish'
    }

    MODELS = {
        'en': "cardiffnlp/twitter-roberta-base-sentiment-latest",
        'multilingual': "cardiffnlp/twitter-xlm-roberta-base-sentiment",
        'zh': "uer/roberta-base-finetuned-dianping-chinese"
    }

    THEMES = {
        'default': {'pos': '#4CAF50', 'neg': '#F44336', 'neu': '#FF9800'},
        'ocean': {'pos': '#0077BE', 'neg': '#FF6B35', 'neu': '#00BCD4'},
        'dark': {'pos': '#66BB6A', 'neg': '#EF5350', 'neu': '#FFA726'},
        'rainbow': {'pos': '#9C27B0', 'neg': '#E91E63', 'neu': '#FF5722'}
    }


config = Config()

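# Note (illustrative, not part of the original logic): `config` is the single source of
# tunables used throughout this module, e.g. tokenizer(..., max_length=config.MAX_TEXT_LENGTH),
# LRUModelCache(config.MODEL_CACHE_SIZE) and the config.BATCH_SIZE_LIMIT cap on batch jobs;
# adjust these limits before launching if your hardware differs.
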
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Stopword list: prefer NLTK's, fall back to a small built-in set if the download fails
try:
    nltk.download('stopwords', quiet=True)
    nltk.download('punkt', quiet=True)
    STOP_WORDS = set(stopwords.words('english'))
except Exception:
    STOP_WORDS = {'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for', 'of', 'with', 'by'}


def handle_errors(default_return=None):
    """Centralized error handling decorator"""
    def decorator(func: Callable) -> Callable:
        @wraps(func)
        def wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except Exception as e:
                logger.error(f"{func.__name__} failed: {e}")
                return default_return if default_return is not None else f"Error: {str(e)}"
        return wrapper
    return decorator


@contextmanager
def memory_cleanup():
    """Context manager for memory cleanup"""
    try:
        yield
    finally:
        gc.collect()
        if torch.cuda.is_available():
            torch.cuda.empty_cache()


class ThemeContext:
    """Theme management context"""
    def __init__(self, theme: str = 'default'):
        self.theme = theme
        self.colors = config.THEMES.get(theme, config.THEMES['default'])


class LRUModelCache:
    """LRU Cache for models with memory management"""
    def __init__(self, max_size: int = 2):
        self.max_size = max_size
        self.cache = OrderedDict()
        self.lock = threading.Lock()

    def get(self, key):
        with self.lock:
            if key in self.cache:
                # Mark as most recently used
                self.cache.move_to_end(key)
                return self.cache[key]
            return None

    def put(self, key, value):
        with self.lock:
            if key in self.cache:
                self.cache.move_to_end(key)
            else:
                if len(self.cache) >= self.max_size:
                    # Evict the least recently used (model, tokenizer) pair and free its memory
                    oldest_key = next(iter(self.cache))
                    old_model, old_tokenizer = self.cache.pop(oldest_key)
                    del old_model, old_tokenizer
                    gc.collect()
                    if torch.cuda.is_available():
                        torch.cuda.empty_cache()

            self.cache[key] = value

    def clear(self):
        with self.lock:
            for model, tokenizer in self.cache.values():
                del model, tokenizer
            self.cache.clear()
            gc.collect()
            if torch.cuda.is_available():
                torch.cuda.empty_cache()

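# Note (for reference): ModelManager below keeps at most config.MODEL_CACHE_SIZE
# (model, tokenizer) pairs resident through this cache; evicting the least recently
# used pair is what bounds GPU/CPU memory when switching between languages.
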
class ModelManager:
    """Optimized multi-language model manager with LRU cache and lazy loading"""
    _instance = None

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance._initialized = False
        return cls._instance

    def __init__(self):
        if not self._initialized:
            self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
            self.model_cache = LRUModelCache(config.MODEL_CACHE_SIZE)
            self.loading_lock = threading.Lock()
            self._initialized = True
            logger.info(f"ModelManager initialized on device: {self.device}")

    def _load_model(self, model_name: str, cache_key: str):
        """Load model with memory optimization"""
        try:
            logger.info(f"Loading model: {model_name}")

            tokenizer = AutoTokenizer.from_pretrained(model_name)
            model = AutoModelForSequenceClassification.from_pretrained(
                model_name,
                torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32,
                device_map="auto" if torch.cuda.is_available() else None
            )

            if not torch.cuda.is_available():
                model.to(self.device)

            model.eval()

            self.model_cache.put(cache_key, (model, tokenizer))
            logger.info(f"Model {model_name} loaded and cached successfully")

            return model, tokenizer

        except Exception as e:
            logger.error(f"Failed to load model {model_name}: {e}")
            raise

    def get_model(self, language='en'):
        """Get model for a specific language with lazy loading and caching"""
        if language == 'zh':
            cache_key = 'zh'
            model_name = config.MODELS['zh']
        else:
            cache_key = 'multilingual'
            model_name = config.MODELS['multilingual']

        cached_model = self.model_cache.get(cache_key)
        if cached_model is not None:
            return cached_model

        with self.loading_lock:
            # Double-checked locking: another thread may have loaded the model in the meantime
            cached_model = self.model_cache.get(cache_key)
            if cached_model is not None:
                return cached_model

            return self._load_model(model_name, cache_key)

    @staticmethod
    def detect_language(text: str) -> str:
        """Detect text language"""
        try:
            detected = langdetect.detect(text)
            language_mapping = {
                'zh-cn': 'zh',
                'zh-tw': 'zh'
            }
            detected = language_mapping.get(detected, detected)
            return detected if detected in config.SUPPORTED_LANGUAGES else 'en'
        except Exception:
            return 'en'


class TextProcessor:
    """Optimized text processing with multi-language support"""

    @staticmethod
    @lru_cache(maxsize=config.CACHE_SIZE)
    def clean_text(text: str, remove_punctuation: bool = True, remove_numbers: bool = False) -> str:
        """Clean text with language awareness"""
        text = text.strip()

        # Leave Chinese text untouched: lowercasing and stopword removal only apply to English
        if re.search(r'[\u4e00-\u9fff]', text):
            return text

        text = text.lower()

        if remove_numbers:
            text = re.sub(r'\d+', '', text)

        if remove_punctuation:
            text = re.sub(r'[^\w\s]', '', text)

        words = text.split()
        cleaned_words = [w for w in words if w not in STOP_WORDS and len(w) >= config.MIN_WORD_LENGTH]
        return ' '.join(cleaned_words)

    @staticmethod
    def parse_batch_input(text: str) -> List[str]:
        """Parse batch input from a textarea (one text per line)"""
        lines = text.strip().split('\n')
        return [line.strip() for line in lines if line.strip()]


class HistoryManager:
    """Enhanced history management with filtering"""
    def __init__(self):
        self._history = []

    def add(self, entry: Dict):
        """Add entry with timestamp"""
        entry['timestamp'] = datetime.now().isoformat()
        self._history.append(entry)
        if len(self._history) > config.MAX_HISTORY_SIZE:
            self._history = self._history[-config.MAX_HISTORY_SIZE:]

    def add_batch(self, entries: List[Dict]):
        """Add multiple entries"""
        for entry in entries:
            self.add(entry)

    def get_all(self) -> List[Dict]:
        return self._history.copy()

    def get_recent(self, n: int = 10) -> List[Dict]:
        return self._history[-n:] if self._history else []

    def filter_by(self, sentiment: str = None, language: str = None,
                  min_confidence: float = None) -> List[Dict]:
        """Filter history by criteria"""
        filtered = self._history

        if sentiment:
            filtered = [h for h in filtered if h['sentiment'] == sentiment]
        if language:
            filtered = [h for h in filtered if h.get('language', 'en') == language]
        if min_confidence:
            filtered = [h for h in filtered if h['confidence'] >= min_confidence]

        return filtered

    def clear(self) -> int:
        count = len(self._history)
        self._history.clear()
        return count

    def size(self) -> int:
        return len(self._history)

    def get_stats(self) -> Dict:
        """Get comprehensive statistics"""
        if not self._history:
            return {}

        sentiments = [item['sentiment'] for item in self._history]
        confidences = [item['confidence'] for item in self._history]
        languages = [item.get('language', 'en') for item in self._history]

        return {
            'total_analyses': len(self._history),
            'positive_count': sentiments.count('Positive'),
            'negative_count': sentiments.count('Negative'),
            'neutral_count': sentiments.count('Neutral'),
            'avg_confidence': np.mean(confidences),
            'max_confidence': np.max(confidences),
            'min_confidence': np.min(confidences),
            'languages_detected': len(set(languages)),
            'most_common_language': Counter(languages).most_common(1)[0][0] if languages else 'en'
        }


class SentimentEngine:
    """Optimized multi-language sentiment analysis engine"""

    def __init__(self):
        self.model_manager = ModelManager()
        self.executor = ThreadPoolExecutor(max_workers=4)

    @handle_errors(default_return={'sentiment': 'Unknown', 'confidence': 0.0})
    def analyze_single(self, text: str, language: str = 'auto', preprocessing_options: Dict = None) -> Dict:
        """Optimized single text analysis"""
        if not text.strip():
            raise ValueError("Empty text provided")

        if language == 'auto':
            detected_lang = self.model_manager.detect_language(text)
        else:
            detected_lang = language

        model, tokenizer = self.model_manager.get_model(detected_lang)

        options = preprocessing_options or {}
        processed_text = text
        if options.get('clean_text', False) and not re.search(r'[\u4e00-\u9fff]', text):
            processed_text = TextProcessor.clean_text(
                text,
                options.get('remove_punctuation', True),
                options.get('remove_numbers', False)
            )

        inputs = tokenizer(processed_text, return_tensors="pt", padding=True,
                           truncation=True, max_length=config.MAX_TEXT_LENGTH).to(self.model_manager.device)

        with torch.no_grad():
            outputs = model(**inputs)
            probs = torch.nn.functional.softmax(outputs.logits, dim=-1).cpu().numpy()[0]

        if torch.cuda.is_available():
            torch.cuda.empty_cache()

        # Three-class models expose negative/neutral/positive; binary models only negative/positive
        if len(probs) == 3:
            sentiment_idx = np.argmax(probs)
            sentiment_labels = ['Negative', 'Neutral', 'Positive']
            sentiment = sentiment_labels[sentiment_idx]
            confidence = float(probs[sentiment_idx])

            result = {
                'sentiment': sentiment,
                'confidence': confidence,
                'neg_prob': float(probs[0]),
                'neu_prob': float(probs[1]),
                'pos_prob': float(probs[2]),
                'has_neutral': True
            }
        else:
            pred = np.argmax(probs)
            sentiment = "Positive" if pred == 1 else "Negative"
            confidence = float(probs[pred])

            result = {
                'sentiment': sentiment,
                'confidence': confidence,
                'neg_prob': float(probs[0]),
                'pos_prob': float(probs[1]),
                'neu_prob': 0.0,
                'has_neutral': False
            }

        result.update({
            'language': detected_lang,
            'word_count': len(text.split()),
            'char_count': len(text)
        })

        return result

    def _analyze_text_batch(self, text: str, language: str, preprocessing_options: Dict, index: int) -> Dict:
        """Single text analysis for batch processing"""
        try:
            result = self.analyze_single(text, language, preprocessing_options)
            result['batch_index'] = index
            result['text'] = text[:100] + '...' if len(text) > 100 else text
            result['full_text'] = text
            return result
        except Exception as e:
            return {
                'sentiment': 'Error',
                'confidence': 0.0,
                'error': str(e),
                'batch_index': index,
                'text': text[:100] + '...' if len(text) > 100 else text,
                'full_text': text
            }

    @handle_errors(default_return=[])
    def analyze_batch(self, texts: List[str], language: str = 'auto',
                      preprocessing_options: Dict = None, progress_callback=None) -> List[Dict]:
        """Optimized parallel batch processing"""
        if len(texts) > config.BATCH_SIZE_LIMIT:
            texts = texts[:config.BATCH_SIZE_LIMIT]

        if not texts:
            return []

        # Warm the model cache once so worker threads don't race to load the model
        self.model_manager.get_model(language if language != 'auto' else 'en')

        with ThreadPoolExecutor(max_workers=min(4, len(texts))) as executor:
            futures = []
            for i, text in enumerate(texts):
                future = executor.submit(
                    self._analyze_text_batch,
                    text, language, preprocessing_options, i
                )
                futures.append(future)

            results = []
            for i, future in enumerate(futures):
                if progress_callback:
                    progress_callback((i + 1) / len(futures))

                try:
                    result = future.result(timeout=30)
                    results.append(result)
                except Exception as e:
                    results.append({
                        'sentiment': 'Error',
                        'confidence': 0.0,
                        'error': f"Timeout or error: {str(e)}",
                        'batch_index': i,
                        'text': texts[i][:100] + '...' if len(texts[i]) > 100 else texts[i],
                        'full_text': texts[i]
                    })

        return results


class AdvancedAnalysisEngine:
    """Advanced analysis using SHAP and LIME"""

    def __init__(self):
        self.model_manager = ModelManager()

    def create_prediction_function(self, model, tokenizer, device):
        """Create a prediction function suitable for SHAP and LIME"""
        def predict_proba(texts):
            # SHAP/LIME may pass a single string, a list, or an ndarray of strings
            if isinstance(texts, str):
                texts = [texts]
            elif isinstance(texts, np.ndarray):
                texts = texts.tolist()

            texts = [str(text) for text in texts]

            results = []
            batch_size = 16

            for i in range(0, len(texts), batch_size):
                batch_texts = texts[i:i + batch_size]

                try:
                    with torch.no_grad():
                        inputs = tokenizer(
                            batch_texts,
                            return_tensors="pt",
                            padding=True,
                            truncation=True,
                            max_length=config.MAX_TEXT_LENGTH
                        ).to(device)

                        outputs = model(**inputs)
                        probs = torch.nn.functional.softmax(outputs.logits, dim=-1).cpu().numpy()

                        results.extend(probs)

                except Exception as e:
                    logger.error(f"Prediction batch failed: {e}")

                    # Fall back to near-uniform probabilities so the output shape stays consistent
                    batch_size_actual = len(batch_texts)
                    if hasattr(model.config, 'num_labels') and model.config.num_labels == 3:
                        neutral_probs = np.array([[0.33, 0.34, 0.33]] * batch_size_actual)
                    else:
                        neutral_probs = np.array([[0.5, 0.5]] * batch_size_actual)
                    results.extend(neutral_probs)

            return np.array(results)

        return predict_proba

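    # Note: both SHAP and LIME below call predict_proba with a batch of strings and
    # expect an (n_samples, n_classes) array of probabilities in return; the uniform
    # fallback above preserves that shape even when a model batch fails.
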
    @handle_errors(default_return=("Analysis failed", None, None))
    def analyze_with_shap(self, text: str, language: str = 'auto', num_samples: int = 100) -> Tuple[str, go.Figure, Dict]:
        """SHAP analysis of token-level importance"""
        if not text.strip():
            return "Please enter text for analysis", None, {}

        if language == 'auto':
            detected_lang = self.model_manager.detect_language(text)
        else:
            detected_lang = language

        model, tokenizer = self.model_manager.get_model(detected_lang)

        try:
            predict_fn = self.create_prediction_function(model, tokenizer, self.model_manager.device)

            # Sanity-check the prediction function before handing it to SHAP
            test_pred = predict_fn([text])
            if test_pred is None or len(test_pred) == 0:
                return "Prediction function test failed", None, {}

            explainer = shap.Explainer(predict_fn, masker=shap.maskers.Text(tokenizer))

            shap_values = explainer([text], max_evals=num_samples)

            if hasattr(shap_values, 'data') and hasattr(shap_values, 'values'):
                tokens = shap_values.data[0] if len(shap_values.data) > 0 else []
                values = shap_values.values[0] if len(shap_values.values) > 0 else []
            else:
                return "SHAP values extraction failed", None, {}

            if len(tokens) == 0 or len(values) == 0:
                return "No tokens or values extracted from SHAP", None, {}

            # For multi-class outputs, use the last column (positive class) as the signal
            if len(values.shape) > 1:
                pos_values = values[:, -1] if values.shape[1] >= 2 else values[:, 0]
            else:
                pos_values = values

            # Align token and value lengths defensively
            min_len = min(len(tokens), len(pos_values))
            tokens = tokens[:min_len]
            pos_values = pos_values[:min_len]

            fig = go.Figure()

            colors = ['red' if v < 0 else 'green' for v in pos_values]

            fig.add_trace(go.Bar(
                x=list(range(len(tokens))),
                y=pos_values,
                text=tokens,
                textposition='outside',
                marker_color=colors,
                name='SHAP Values',
                hovertemplate='<b>%{text}</b><br>SHAP Value: %{y:.4f}<extra></extra>'
            ))

            fig.update_layout(
                title=f"SHAP Analysis - Token Importance (Samples: {num_samples})",
                xaxis_title="Token Index",
                yaxis_title="SHAP Value",
                height=500,
                xaxis=dict(tickmode='array', tickvals=list(range(len(tokens))), ticktext=tokens)
            )

            analysis_data = {
                'method': 'SHAP',
                'language': detected_lang,
                'total_tokens': len(tokens),
                'samples_used': num_samples,
                'positive_influence': sum(1 for v in pos_values if v > 0),
                'negative_influence': sum(1 for v in pos_values if v < 0),
                'most_important_tokens': [(str(tokens[i]), float(pos_values[i]))
                                          for i in np.argsort(np.abs(pos_values))[-5:]]
            }

            summary_text = f"""
**SHAP Analysis Results:**
- **Language:** {detected_lang.upper()}
- **Total Tokens:** {analysis_data['total_tokens']}
- **Samples Used:** {num_samples}
- **Positive Influence Tokens:** {analysis_data['positive_influence']}
- **Negative Influence Tokens:** {analysis_data['negative_influence']}
- **Most Important Tokens:** {', '.join([f"{token}({score:.3f})" for token, score in analysis_data['most_important_tokens']])}
- **Status:** SHAP analysis completed successfully
"""

            return summary_text, fig, analysis_data

        except Exception as e:
            logger.error(f"SHAP analysis failed: {e}")
            error_msg = f"""
**SHAP Analysis Failed:**
- **Error:** {str(e)}
- **Language:** {detected_lang.upper()}
- **Suggestion:** Try a shorter text or reduce the number of samples

**Common fixes:**
- Reduce the sample size to 50-100
- Use shorter input text (< 200 words)
- Check that the model supports the text language
"""
            return error_msg, None, {}

    @handle_errors(default_return=("Analysis failed", None, None))
    def analyze_with_lime(self, text: str, language: str = 'auto', num_samples: int = 100) -> Tuple[str, go.Figure, Dict]:
        """LIME analysis of word-level feature importance"""
        if not text.strip():
            return "Please enter text for analysis", None, {}

        if language == 'auto':
            detected_lang = self.model_manager.detect_language(text)
        else:
            detected_lang = language

        model, tokenizer = self.model_manager.get_model(detected_lang)

        try:
            predict_fn = self.create_prediction_function(model, tokenizer, self.model_manager.device)

            # Sanity-check the prediction function before handing it to LIME
            test_pred = predict_fn([text])
            if test_pred is None or len(test_pred) == 0:
                return "Prediction function test failed", None, {}

            # Infer the class names from the prediction output shape
            num_classes = test_pred.shape[1] if len(test_pred.shape) > 1 else 2
            if num_classes == 3:
                class_names = ['Negative', 'Neutral', 'Positive']
            else:
                class_names = ['Negative', 'Positive']

            explainer = LimeTextExplainer(class_names=class_names)

            exp = explainer.explain_instance(
                text,
                predict_fn,
                num_features=min(20, len(text.split())),
                num_samples=num_samples
            )

            lime_data = exp.as_list()

            if not lime_data:
                return "No LIME features extracted", None, {}

            words = [item[0] for item in lime_data]
            scores = [item[1] for item in lime_data]

            fig = go.Figure()

            colors = ['red' if s < 0 else 'green' for s in scores]

            fig.add_trace(go.Bar(
                y=words,
                x=scores,
                orientation='h',
                marker_color=colors,
                text=[f'{s:.3f}' for s in scores],
                textposition='auto',
                name='LIME Importance',
                hovertemplate='<b>%{y}</b><br>Importance: %{x:.4f}<extra></extra>'
            ))

            fig.update_layout(
                title=f"LIME Analysis - Feature Importance (Samples: {num_samples})",
                xaxis_title="Importance Score",
                yaxis_title="Words/Phrases",
                height=500
            )

            analysis_data = {
                'method': 'LIME',
                'language': detected_lang,
                'features_analyzed': len(lime_data),
                'samples_used': num_samples,
                'positive_features': sum(1 for _, score in lime_data if score > 0),
                'negative_features': sum(1 for _, score in lime_data if score < 0),
                'feature_importance': lime_data
            }

            summary_text = f"""
**LIME Analysis Results:**
- **Language:** {detected_lang.upper()}
- **Features Analyzed:** {analysis_data['features_analyzed']}
- **Classes:** {', '.join(class_names)}
- **Samples Used:** {num_samples}
- **Positive Features:** {analysis_data['positive_features']}
- **Negative Features:** {analysis_data['negative_features']}
- **Top Features:** {', '.join([f"{word}({score:.3f})" for word, score in lime_data[:5]])}
- **Status:** LIME analysis completed successfully
"""

            return summary_text, fig, analysis_data

        except Exception as e:
            logger.error(f"LIME analysis failed: {e}")
            error_msg = f"""
**LIME Analysis Failed:**
- **Error:** {str(e)}
- **Language:** {detected_lang.upper()}
- **Suggestion:** Try a shorter text or reduce the number of samples

**Common fixes:**
- Reduce the sample size to 50-100
- Use shorter input text (< 200 words)
- Check that the model supports the text language
"""
            return error_msg, None, {}


class PlotlyVisualizer:
    """Enhanced Plotly visualizations"""

    @staticmethod
    @handle_errors(default_return=None)
    def create_sentiment_gauge(result: Dict, theme: ThemeContext) -> go.Figure:
        """Create animated sentiment gauge"""
        colors = theme.colors

        if result.get('has_neutral', False):
            fig = go.Figure(go.Indicator(
                mode="gauge+number+delta",
                value=result['pos_prob'] * 100,
                domain={'x': [0, 1], 'y': [0, 1]},
                title={'text': f"Sentiment: {result['sentiment']}"},
                delta={'reference': 50},
                gauge={
                    'axis': {'range': [None, 100]},
                    'bar': {'color': colors['pos'] if result['sentiment'] == 'Positive' else colors['neg']},
                    'steps': [
                        {'range': [0, 33], 'color': colors['neg']},
                        {'range': [33, 67], 'color': colors['neu']},
                        {'range': [67, 100], 'color': colors['pos']}
                    ],
                    'threshold': {
                        'line': {'color': "red", 'width': 4},
                        'thickness': 0.75,
                        'value': 90
                    }
                }
            ))
        else:
            fig = go.Figure(go.Indicator(
                mode="gauge+number",
                value=result['confidence'] * 100,
                domain={'x': [0, 1], 'y': [0, 1]},
                title={'text': f"Confidence: {result['sentiment']}"},
                gauge={
                    'axis': {'range': [None, 100]},
                    'bar': {'color': colors['pos'] if result['sentiment'] == 'Positive' else colors['neg']},
                    'steps': [
                        {'range': [0, 50], 'color': "lightgray"},
                        {'range': [50, 100], 'color': "gray"}
                    ]
                }
            ))

        fig.update_layout(height=400, font={'size': 16})
        return fig

    @staticmethod
    @handle_errors(default_return=None)
    def create_probability_bars(result: Dict, theme: ThemeContext) -> go.Figure:
        """Create probability bar chart"""
        colors = theme.colors

        if result.get('has_neutral', False):
            labels = ['Negative', 'Neutral', 'Positive']
            values = [result['neg_prob'], result['neu_prob'], result['pos_prob']]
            bar_colors = [colors['neg'], colors['neu'], colors['pos']]
        else:
            labels = ['Negative', 'Positive']
            values = [result['neg_prob'], result['pos_prob']]
            bar_colors = [colors['neg'], colors['pos']]

        fig = go.Figure(data=[
            go.Bar(x=labels, y=values, marker_color=bar_colors,
                   text=[f'{v:.3f}' for v in values], textposition='outside')
        ])

        fig.update_layout(
            title="Sentiment Probabilities",
            yaxis_title="Probability",
            height=400,
            showlegend=False
        )

        return fig

    @staticmethod
    @handle_errors(default_return=None)
    def create_batch_summary(results: List[Dict], theme: ThemeContext) -> go.Figure:
        """Create batch analysis summary"""
        colors = theme.colors

        sentiments = [r['sentiment'] for r in results if 'sentiment' in r and r['sentiment'] != 'Error']
        sentiment_counts = Counter(sentiments)

        fig = go.Figure(data=[go.Pie(
            labels=list(sentiment_counts.keys()),
            values=list(sentiment_counts.values()),
            marker_colors=[colors.get(s.lower()[:3], '#999999') for s in sentiment_counts.keys()],
            textinfo='label+percent',
            hole=0.3
        )])

        fig.update_layout(
            title=f"Batch Analysis Summary ({len(results)} texts)",
            height=400
        )

        return fig

    @staticmethod
    @handle_errors(default_return=None)
    def create_confidence_distribution(results: List[Dict]) -> go.Figure:
        """Create confidence distribution plot"""
        confidences = [r['confidence'] for r in results if 'confidence' in r and r['sentiment'] != 'Error']

        if not confidences:
            return go.Figure()

        fig = go.Figure(data=[go.Histogram(
            x=confidences,
            nbinsx=20,
            marker_color='skyblue',
            opacity=0.7
        )])

        fig.update_layout(
            title="Confidence Distribution",
            xaxis_title="Confidence Score",
            yaxis_title="Frequency",
            height=400
        )

        return fig

    @staticmethod
    @handle_errors(default_return=None)
    def create_history_dashboard(history: List[Dict], theme: ThemeContext) -> go.Figure:
        """Create comprehensive history dashboard"""
        if len(history) < 2:
            return go.Figure()

        fig = make_subplots(
            rows=2, cols=2,
            subplot_titles=['Sentiment Timeline', 'Confidence Distribution',
                            'Language Distribution', 'Sentiment Summary'],
            specs=[[{"secondary_y": False}, {"secondary_y": False}],
                   [{"type": "pie"}, {"type": "bar"}]]
        )

        indices = list(range(len(history)))
        pos_probs = [item.get('pos_prob', 0) for item in history]
        confidences = [item['confidence'] for item in history]
        sentiments = [item['sentiment'] for item in history]
        languages = [item.get('language', 'en') for item in history]

        colors_map = {'Positive': theme.colors['pos'], 'Negative': theme.colors['neg'], 'Neutral': theme.colors['neu']}
        colors = [colors_map.get(s, '#999999') for s in sentiments]

        fig.add_trace(
            go.Scatter(x=indices, y=pos_probs, mode='lines+markers',
                       marker=dict(color=colors, size=8),
                       name='Positive Probability'),
            row=1, col=1
        )

        fig.add_trace(
            go.Histogram(x=confidences, nbinsx=10, name='Confidence'),
            row=1, col=2
        )

        lang_counts = Counter(languages)
        fig.add_trace(
            go.Pie(labels=list(lang_counts.keys()), values=list(lang_counts.values()),
                   name="Languages"),
            row=2, col=1
        )

        sent_counts = Counter(sentiments)
        sent_colors = [colors_map.get(k, '#999999') for k in sent_counts.keys()]
        fig.add_trace(
            go.Bar(x=list(sent_counts.keys()), y=list(sent_counts.values()),
                   marker_color=sent_colors),
            row=2, col=2
        )

        fig.update_layout(height=800, showlegend=False)
        return fig


class DataHandler:
    """Enhanced data operations"""

    @staticmethod
    @handle_errors(default_return=(None, "Export failed"))
    def export_data(data: List[Dict], format_type: str) -> Tuple[Optional[str], str]:
        """Export data with comprehensive information"""
        if not data:
            return None, "No data to export"

        temp_file = tempfile.NamedTemporaryFile(mode='w', delete=False,
                                                suffix=f'.{format_type}', encoding='utf-8')

        if format_type == 'csv':
            writer = csv.writer(temp_file)
            writer.writerow(['Timestamp', 'Text', 'Sentiment', 'Confidence', 'Language',
                             'Pos_Prob', 'Neg_Prob', 'Neu_Prob', 'Word_Count'])
            for entry in data:
                writer.writerow([
                    entry.get('timestamp', ''),
                    entry.get('text', ''),
                    entry.get('sentiment', ''),
                    f"{entry.get('confidence', 0):.4f}",
                    entry.get('language', 'en'),
                    f"{entry.get('pos_prob', 0):.4f}",
                    f"{entry.get('neg_prob', 0):.4f}",
                    f"{entry.get('neu_prob', 0):.4f}",
                    entry.get('word_count', 0)
                ])
        elif format_type == 'json':
            json.dump(data, temp_file, indent=2, ensure_ascii=False)

        temp_file.close()
        return temp_file.name, f"Exported {len(data)} entries"

    @staticmethod
    @handle_errors(default_return="")
    def process_file(file) -> str:
        """Process an uploaded file and return its texts, one per line"""
        if not file:
            return ""

        # Depending on the Gradio version, gr.File yields a filepath string or a file-like
        # object with a .name attribute; read from the underlying path in either case.
        file_path = file if isinstance(file, str) else file.name
        with open(file_path, 'r', encoding='utf-8') as f:
            content = f.read()

        if file_path.endswith('.csv'):
            csv_file = io.StringIO(content)
            reader = csv.reader(csv_file)
            try:
                next(reader)  # skip the header row
                texts = []
                for row in reader:
                    if row and row[0].strip():
                        text = row[0].strip().strip('"')
                        if text:
                            texts.append(text)
                return '\n'.join(texts)
            except Exception:
                # Fallback: treat the file as plain lines, skipping the header
                lines = content.strip().split('\n')[1:]
                texts = []
                for line in lines:
                    if line.strip():
                        text = line.strip().strip('"')
                        if text:
                            texts.append(text)
                return '\n'.join(texts)

        return content


class SentimentApp:
    """Optimized multilingual sentiment analysis application"""

    def __init__(self):
        self.engine = SentimentEngine()
        self.advanced_engine = AdvancedAnalysisEngine()
        self.history = HistoryManager()
        self.data_handler = DataHandler()

        # Multilingual example inputs (English, Chinese, Spanish, French, German, Swedish)
        self.examples = [
            ["The film had its moments, but overall it felt a bit too long and lacked emotional depth. Some scenes were visually impressive, yet they failed to connect emotionally. By the end, I found myself disengaged and unsatisfied."],
            ["I was completely blown away by the movie — the performances were raw and powerful, and the story stayed with me long after the credits rolled. Every scene felt purposeful, and the emotional arc was handled with incredible nuance. It's the kind of film that makes you reflect deeply on your own life."],
            ["这部电影节奏拖沓,剧情老套,完全没有让我产生任何共鸣,是一次失望的观影体验。演员的表演也显得做作,缺乏真实感。看到最后甚至有点不耐烦,整体表现乏善可陈。"],
            ["Una obra maestra del cine contemporáneo, con actuaciones sobresalientes, un guion bien escrito y una dirección impecable. Cada plano parecía cuidadosamente pensado, y la historia avanzaba con una intensidad emocional que mantenía al espectador cautivado. Definitivamente una película que vale la pena volver a ver."],
            ["Je m'attendais à beaucoup mieux. Le scénario était confus, les dialogues ennuyeux, et je me suis presque endormi au milieu du film. Même la mise en scène, habituellement un point fort, manquait cruellement d'inspiration cette fois-ci."],
            ["Der Film war ein emotionales Erlebnis mit großartigen Bildern, einem mitreißenden Soundtrack und einer Geschichte, die zum Nachdenken anregt. Besonders beeindruckend war die schauspielerische Leistung der Hauptdarsteller, die eine tiefe Menschlichkeit vermittelten. Es ist ein Film, der lange nachwirkt."],
            ["Filmen var en besvikelse – tråkig handling, överdrivet skådespeleri och ett slut som inte gav något avslut alls. Den kändes forcerad och saknade en tydlig röd tråd. Jag gick från biografen med en känsla av tomhet och frustration."]
        ]

    @handle_errors(default_return=("Please enter text", None, None))
    def analyze_single(self, text: str, language: str, theme: str, clean_text: bool,
                       remove_punct: bool, remove_nums: bool):
        """Optimized single text analysis"""
        if not text.strip():
            return "Please enter text", None, None

        # Map the display name (e.g. "Auto Detect") back to its language code
        language_map = {v: k for k, v in config.SUPPORTED_LANGUAGES.items()}
        language_code = language_map.get(language, 'auto')

        preprocessing_options = {
            'clean_text': clean_text,
            'remove_punctuation': remove_punct,
            'remove_numbers': remove_nums
        }

        with memory_cleanup():
            result = self.engine.analyze_single(text, language_code, preprocessing_options)

            history_entry = {
                'text': text[:100] + '...' if len(text) > 100 else text,
                'full_text': text,
                'sentiment': result['sentiment'],
                'confidence': result['confidence'],
                'pos_prob': result.get('pos_prob', 0),
                'neg_prob': result.get('neg_prob', 0),
                'neu_prob': result.get('neu_prob', 0),
                'language': result['language'],
                'word_count': result['word_count'],
                'analysis_type': 'single'
            }
            self.history.add(history_entry)

            theme_ctx = ThemeContext(theme)
            gauge_fig = PlotlyVisualizer.create_sentiment_gauge(result, theme_ctx)
            bars_fig = PlotlyVisualizer.create_probability_bars(result, theme_ctx)

            info_text = f"""
**Analysis Results:**
- **Sentiment:** {result['sentiment']} ({result['confidence']:.3f} confidence)
- **Language:** {result['language'].upper()}
- **Statistics:** {result['word_count']} words, {result['char_count']} characters
- **Probabilities:** Positive: {result.get('pos_prob', 0):.3f}, Negative: {result.get('neg_prob', 0):.3f}, Neutral: {result.get('neu_prob', 0):.3f}
"""

            return info_text, gauge_fig, bars_fig

def analyze_batch(self, batch_text: str, language: str, theme: str, |
|
clean_text: bool, remove_punct: bool, remove_nums: bool): |
|
"""Enhanced batch analysis with parallel processing""" |
|
if not batch_text.strip(): |
|
return "Please enter texts (one per line)", None, None, None |
|
|
|
|
|
texts = TextProcessor.parse_batch_input(batch_text) |
|
|
|
if len(texts) > config.BATCH_SIZE_LIMIT: |
|
return f"Too many texts. Maximum {config.BATCH_SIZE_LIMIT} allowed.", None, None, None |
|
|
|
if not texts: |
|
return "No valid texts found", None, None, None |
|
|
|
|
|
language_map = {v: k for k, v in config.SUPPORTED_LANGUAGES.items()} |
|
language_code = language_map.get(language, 'auto') |
|
|
|
preprocessing_options = { |
|
'clean_text': clean_text, |
|
'remove_punctuation': remove_punct, |
|
'remove_numbers': remove_nums |
|
} |
|
|
|
with memory_cleanup(): |
|
results = self.engine.analyze_batch(texts, language_code, preprocessing_options) |
|
|
|
|
|
batch_entries = [] |
|
for result in results: |
|
if 'error' not in result: |
|
entry = { |
|
'text': result['text'], |
|
'full_text': result['full_text'], |
|
'sentiment': result['sentiment'], |
|
'confidence': result['confidence'], |
|
'pos_prob': result.get('pos_prob', 0), |
|
'neg_prob': result.get('neg_prob', 0), |
|
'neu_prob': result.get('neu_prob', 0), |
|
'language': result['language'], |
|
'word_count': result['word_count'], |
|
'analysis_type': 'batch', |
|
'batch_index': result['batch_index'] |
|
} |
|
batch_entries.append(entry) |
|
|
|
self.history.add_batch(batch_entries) |
|
|
|
|
|
theme_ctx = ThemeContext(theme) |
|
summary_fig = PlotlyVisualizer.create_batch_summary(results, theme_ctx) |
|
confidence_fig = PlotlyVisualizer.create_confidence_distribution(results) |
|
|
|
|
|
df_data = [] |
|
for result in results: |
|
if 'error' in result: |
|
df_data.append({ |
|
'Index': result['batch_index'] + 1, |
|
'Text': result['text'], |
|
'Sentiment': 'Error', |
|
'Confidence': 0.0, |
|
'Language': 'Unknown', |
|
'Error': result['error'] |
|
}) |
|
else: |
|
df_data.append({ |
|
'Index': result['batch_index'] + 1, |
|
'Text': result['text'], |
|
'Sentiment': result['sentiment'], |
|
'Confidence': f"{result['confidence']:.3f}", |
|
'Language': result['language'].upper(), |
|
'Word_Count': result.get('word_count', 0) |
|
}) |
|
|
|
df = pd.DataFrame(df_data) |
|
|
|
|
|
successful_results = [r for r in results if 'error' not in r] |
|
error_count = len(results) - len(successful_results) |
|
|
|
if successful_results: |
|
sentiment_counts = Counter([r['sentiment'] for r in successful_results]) |
|
avg_confidence = np.mean([r['confidence'] for r in successful_results]) |
|
languages = Counter([r['language'] for r in successful_results]) |
|
|
|
summary_text = f""" |
|
**Batch Analysis Summary:** |
|
- **Total Texts:** {len(texts)} |
|
- **Successful:** {len(successful_results)} |
|
- **Errors:** {error_count} |
|
- **Average Confidence:** {avg_confidence:.3f} |
|
- **Sentiments:** {dict(sentiment_counts)} |
|
- **Languages Detected:** {dict(languages)} |
|
""" |
|
else: |
|
summary_text = f"All {len(texts)} texts failed to analyze." |
|
|
|
return summary_text, df, summary_fig, confidence_fig |
|
|
|
|
|
    @handle_errors(default_return=("Please enter text", None))
    def analyze_with_shap(self, text: str, language: str, num_samples: int = 100):
        """Run SHAP analysis with a configurable number of samples"""
        language_map = {v: k for k, v in config.SUPPORTED_LANGUAGES.items()}
        language_code = language_map.get(language, 'auto')

        # The engine returns (summary, figure, data); the UI only consumes the first two
        summary, fig, _ = self.advanced_engine.analyze_with_shap(text, language_code, num_samples)
        return summary, fig

    @handle_errors(default_return=("Please enter text", None))
    def analyze_with_lime(self, text: str, language: str, num_samples: int = 100):
        """Run LIME analysis with a configurable number of samples"""
        language_map = {v: k for k, v in config.SUPPORTED_LANGUAGES.items()}
        language_code = language_map.get(language, 'auto')

        summary, fig, _ = self.advanced_engine.analyze_with_lime(text, language_code, num_samples)
        return summary, fig

    @handle_errors(default_return=(None, "No history available"))
    def plot_history(self, theme: str = 'default'):
        """Plot comprehensive history analysis"""
        history = self.history.get_all()
        if len(history) < 2:
            return None, f"Need at least 2 analyses for trends. Current: {len(history)}"

        theme_ctx = ThemeContext(theme)

        with memory_cleanup():
            fig = PlotlyVisualizer.create_history_dashboard(history, theme_ctx)
            stats = self.history.get_stats()

            stats_text = f"""
**History Statistics:**
- **Total Analyses:** {stats.get('total_analyses', 0)}
- **Positive:** {stats.get('positive_count', 0)}
- **Negative:** {stats.get('negative_count', 0)}
- **Neutral:** {stats.get('neutral_count', 0)}
- **Average Confidence:** {stats.get('avg_confidence', 0):.3f}
- **Languages:** {stats.get('languages_detected', 0)}
- **Most Common Language:** {stats.get('most_common_language', 'N/A').upper()}
"""

            return fig, stats_text

    @handle_errors(default_return="No data available")
    def get_history_status(self):
        """Get current history status"""
        stats = self.history.get_stats()
        if not stats:
            return "No analyses performed yet"

        return f"""
**Current Status:**
- **Total Analyses:** {stats['total_analyses']}
- **Recent Sentiment Distribution:**
  * Positive: {stats['positive_count']}
  * Negative: {stats['negative_count']}
  * Neutral: {stats['neutral_count']}
- **Average Confidence:** {stats['avg_confidence']:.3f}
- **Languages Detected:** {stats['languages_detected']}
"""


def create_interface():
    """Create comprehensive Gradio interface with optimizations"""
    app = SentimentApp()

    with gr.Blocks(theme=gr.themes.Soft(), title="Multilingual Sentiment Analyzer") as demo:
        gr.Markdown("# 🌍 Multilingual Sentiment Analyzer")
        gr.Markdown("AI-powered sentiment analysis with SHAP & LIME explainable AI features")

        with gr.Tab("Single Analysis"):
            with gr.Row():
                with gr.Column():
                    text_input = gr.Textbox(
                        label="Enter Text for Analysis",
                        placeholder="Enter your text in any supported language...",
                        lines=5
                    )

                    with gr.Row():
                        language_selector = gr.Dropdown(
                            choices=list(config.SUPPORTED_LANGUAGES.values()),
                            value="Auto Detect",
                            label="Language"
                        )
                        theme_selector = gr.Dropdown(
                            choices=list(config.THEMES.keys()),
                            value="default",
                            label="Theme"
                        )

                    with gr.Row():
                        clean_text_cb = gr.Checkbox(label="Clean Text", value=False)
                        remove_punct_cb = gr.Checkbox(label="Remove Punctuation", value=False)
                        remove_nums_cb = gr.Checkbox(label="Remove Numbers", value=False)

                    analyze_btn = gr.Button("Analyze", variant="primary", size="lg")

                    gr.Examples(
                        examples=app.examples,
                        inputs=text_input,
                        cache_examples=False
                    )

                with gr.Column():
                    result_output = gr.Textbox(label="Analysis Results", lines=8)

                    with gr.Row():
                        gauge_plot = gr.Plot(label="Sentiment Gauge")
                        probability_plot = gr.Plot(label="Probability Distribution")

        with gr.Tab("Advanced Analysis"):
            gr.Markdown("## Explainable AI Analysis")
            gr.Markdown("SHAP and LIME attribution for the model's sentiment predictions.")

            with gr.Row():
                with gr.Column():
                    advanced_text_input = gr.Textbox(
                        label="Enter Text for Advanced Analysis",
                        placeholder="Enter text to analyze with SHAP and LIME...",
                        lines=6,
                        value="This movie is absolutely fantastic and amazing!"
                    )

                    with gr.Row():
                        advanced_language = gr.Dropdown(
                            choices=list(config.SUPPORTED_LANGUAGES.values()),
                            value="Auto Detect",
                            label="Language"
                        )

                        num_samples_slider = gr.Slider(
                            minimum=50,
                            maximum=300,
                            value=100,
                            step=25,
                            label="Number of Samples",
                            info="Lower = Faster, Higher = More Accurate"
                        )

                    with gr.Row():
                        shap_btn = gr.Button("SHAP Analysis", variant="primary")
                        lime_btn = gr.Button("LIME Analysis", variant="secondary")

                    gr.Markdown("""
**📊 Analysis Methods:**
- **SHAP**: Token-level importance scores using the Text masker
- **LIME**: Feature importance through text perturbation

**⚡ Expected Performance:**
- 50 samples: ~10-20s | 100 samples: ~20-40s | 200+ samples: ~40-80s
""")

                with gr.Column():
                    advanced_results = gr.Textbox(label="Analysis Summary", lines=12)

                    with gr.Row():
                        advanced_plot = gr.Plot(label="Feature Importance Visualization")

        with gr.Tab("Batch Analysis"):
            with gr.Row():
                with gr.Column():
                    file_upload = gr.File(
                        label="Upload File (CSV/TXT)",
                        file_types=[".csv", ".txt"]
                    )
                    batch_input = gr.Textbox(
                        label="Batch Input (one text per line)",
                        placeholder="Enter multiple texts, one per line...",
                        lines=10
                    )

                    with gr.Row():
                        batch_language = gr.Dropdown(
                            choices=list(config.SUPPORTED_LANGUAGES.values()),
                            value="Auto Detect",
                            label="Language"
                        )
                        batch_theme = gr.Dropdown(
                            choices=list(config.THEMES.keys()),
                            value="default",
                            label="Theme"
                        )

                    with gr.Row():
                        batch_clean_cb = gr.Checkbox(label="Clean Text", value=False)
                        batch_punct_cb = gr.Checkbox(label="Remove Punctuation", value=False)
                        batch_nums_cb = gr.Checkbox(label="Remove Numbers", value=False)

                    with gr.Row():
                        load_file_btn = gr.Button("Load File")
                        analyze_batch_btn = gr.Button("Analyze Batch", variant="primary")

                with gr.Column():
                    batch_summary = gr.Textbox(label="Batch Summary", lines=8)
                    batch_results_df = gr.Dataframe(
                        label="Detailed Results",
                        headers=["Index", "Text", "Sentiment", "Confidence", "Language", "Word_Count"],
                        datatype=["number", "str", "str", "str", "str", "number"]
                    )

            with gr.Row():
                batch_plot = gr.Plot(label="Batch Analysis Summary")
                confidence_dist_plot = gr.Plot(label="Confidence Distribution")

        with gr.Tab("History & Analytics"):
            with gr.Row():
                with gr.Column():
                    with gr.Row():
                        refresh_history_btn = gr.Button("Refresh History")
                        clear_history_btn = gr.Button("Clear History", variant="stop")
                        status_btn = gr.Button("Get Status")

                    history_theme = gr.Dropdown(
                        choices=list(config.THEMES.keys()),
                        value="default",
                        label="Dashboard Theme"
                    )

                    with gr.Row():
                        export_csv_btn = gr.Button("Export CSV")
                        export_json_btn = gr.Button("Export JSON")

                with gr.Column():
                    history_status = gr.Textbox(label="History Status", lines=8)

            history_dashboard = gr.Plot(label="History Analytics Dashboard")

            with gr.Row():
                csv_download = gr.File(label="CSV Download", visible=True)
                json_download = gr.File(label="JSON Download", visible=True)

        # Event wiring
        analyze_btn.click(
            app.analyze_single,
            inputs=[text_input, language_selector, theme_selector,
                    clean_text_cb, remove_punct_cb, remove_nums_cb],
            outputs=[result_output, gauge_plot, probability_plot]
        )

        shap_btn.click(
            app.analyze_with_shap,
            inputs=[advanced_text_input, advanced_language, num_samples_slider],
            outputs=[advanced_results, advanced_plot]
        )

        lime_btn.click(
            app.analyze_with_lime,
            inputs=[advanced_text_input, advanced_language, num_samples_slider],
            outputs=[advanced_results, advanced_plot]
        )

        load_file_btn.click(
            app.data_handler.process_file,
            inputs=file_upload,
            outputs=batch_input
        )

        analyze_batch_btn.click(
            app.analyze_batch,
            inputs=[batch_input, batch_language, batch_theme,
                    batch_clean_cb, batch_punct_cb, batch_nums_cb],
            outputs=[batch_summary, batch_results_df, batch_plot, confidence_dist_plot]
        )

        refresh_history_btn.click(
            app.plot_history,
            inputs=history_theme,
            outputs=[history_dashboard, history_status]
        )

        clear_history_btn.click(
            lambda: f"Cleared {app.history.clear()} entries",
            outputs=history_status
        )

        status_btn.click(
            app.get_history_status,
            outputs=history_status
        )

        export_csv_btn.click(
            lambda: app.data_handler.export_data(app.history.get_all(), 'csv'),
            outputs=[csv_download, history_status]
        )

        export_json_btn.click(
            lambda: app.data_handler.export_data(app.history.get_all(), 'json'),
            outputs=[json_download, history_status]
        )

    return demo


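# Deployment note (assumption about the intended setup): share=True below requests a
# public *.gradio.live link and server_name="0.0.0.0" binds all network interfaces;
# for purely local use, share=False with the default server_name is the safer choice.
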
if __name__ == "__main__":
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )

    try:
        demo = create_interface()
        demo.launch(
            share=True,
            server_name="0.0.0.0",
            server_port=7860,
            show_error=True
        )
    except Exception as e:
        logger.error(f"Failed to launch application: {e}")
        raise