# Hugging Face Space (status when captured: Sleeping)
import gradio as gr | |
import spaces | |
import pandas as pd | |
import torch | |
from transformers import pipeline, AutoModelForSeq2SeqLM, AutoTokenizer | |
import plotly.graph_objects as go | |
import logging | |
logging.basicConfig(level=logging.INFO) | |
logger = logging.getLogger(__name__) | |
class EventDetector:
    """Tag news snippets with an event type, a generated summary and a sentiment.

    The constructor loads one seq2seq model (``google/mt5-small``) and three
    ``sentiment-analysis`` pipelines. Event types come from keyword matching;
    the seq2seq model only produces the free-text summary. Sentiment is a
    majority vote over the three pipelines.
    """

    # (event type, keyword stems) pairs, checked in order against the
    # lower-cased text; the first group with a matching stem wins.
    _EVENT_KEYWORDS = (
        ("Отчетность", ("отчет", "выручка", "прибыль", "ebitda")),
        ("РЦБ", ("облигаци", "купон", "дефолт")),
        ("Суд", ("суд", "иск", "арбитраж")),
    )

    # Cheap character cap applied before tokenization.
    _MAX_INPUT_CHARS = 512
    # Hard cap on tokens handed to the sentiment models.
    _MAX_INPUT_TOKENS = 512

    def __init__(self):
        """Load the tokenizer, the seq2seq model and the sentiment pipelines.

        Raises:
            Exception: any loading failure is logged and re-raised unchanged.
        """
        try:
            logger.info(f"Using device: {torch.cuda.get_device_name(0) if torch.cuda.is_available() else 'CPU'}")
            self.device = "cuda" if torch.cuda.is_available() else "cpu"
            self.model_name = "google/mt5-small"
            self.tokenizer = AutoTokenizer.from_pretrained(self.model_name)
            self.model = AutoModelForSeq2SeqLM.from_pretrained(self.model_name).to(self.device)
            self.finbert = pipeline("sentiment-analysis", model="ProsusAI/finbert", device=self.device)
            self.roberta = pipeline("sentiment-analysis", model="cardiffnlp/twitter-roberta-base-sentiment", device=self.device)
            self.finbert_tone = pipeline("sentiment-analysis", model="yiyanghkust/finbert-tone", device=self.device)
            logger.info("Models initialized successfully")
        except Exception as e:
            logger.error(f"Model initialization error: {e}")
            raise

    def detect_events(self, text, entity):
        """Return ``(event_type, summary)`` for a news text about ``entity``.

        Args:
            text: news text to analyse.
            entity: name of the entity the news is about.

        Returns:
            A tuple of the event type ("Отчетность", "РЦБ", "Суд" or "Нет")
            and the text generated by the seq2seq model. Empty ``text`` or
            ``entity`` yields ``("Нет", "Invalid input")``; any exception is
            logged and yields ``("Нет", "Error: ...")``.
        """
        if not text or not entity:
            return "Нет", "Invalid input"

        try:
            prompt = (
                f"<s>Analyze the following news about {entity}:\n"
                f"Text: {text}\n"
                "Task: Identify the main event type and provide a brief summary.</s>"
            )
            inputs = self.tokenizer(
                prompt,
                return_tensors="pt",
                padding=True,
                truncation=True,
                max_length=512,
            ).to(self.device)
            outputs = self.model.generate(**inputs, max_length=300, num_return_sequences=1)
            response = self.tokenizer.decode(outputs[0], skip_special_tokens=True)

            # Lower-case once instead of once per keyword group.
            lowered = text.lower()
            event_type = "Нет"
            for candidate, stems in self._EVENT_KEYWORDS:
                if any(stem in lowered for stem in stems):
                    event_type = candidate
                    break

            return event_type, response
        except Exception as e:
            logger.error(f"Event detection error: {e}")
            return "Нет", f"Error: {str(e)}"

    def analyze_sentiment(self, text):
        """Return the majority sentiment across the three sentiment pipelines.

        Args:
            text: text to classify.

        Returns:
            "Positive", "Negative" or "Neutral". A label wins only when at
            least two pipelines agree on it; otherwise the result is
            "Neutral". A pipeline that raises contributes a "Neutral" vote.
        """
        try:
            results = []
            texts = [text[:self._MAX_INPUT_CHARS]]

            for model in (self.finbert, self.roberta, self.finbert_tone):
                try:
                    # A character cap does not bound the token count, so also
                    # ask the pipeline's tokenizer to truncate explicitly.
                    result = model(
                        texts,
                        truncation=True,
                        max_length=self._MAX_INPUT_TOKENS,
                    )[0]
                    results.append(self._get_sentiment(result))
                except Exception as e:
                    logger.error(f"Model inference error: {e}")
                    results.append("Neutral")

            sentiment_counts = pd.Series(results).value_counts()
            return sentiment_counts.index[0] if sentiment_counts.iloc[0] >= 2 else "Neutral"
        except Exception as e:
            logger.error(f"Sentiment analysis error: {e}")
            return "Neutral"

    def _get_sentiment(self, result):
        """Map one pipeline result's ``label`` to Positive/Negative/Neutral.

        The label is compared case-insensitively; anything that is not a
        recognised positive or negative label is treated as "Neutral".
        """
        label = result['label'].lower()
        if label in ("positive", "label_2", "pos"):
            return "Positive"
        if label in ("negative", "label_0", "neg"):
            return "Negative"
        return "Neutral"
def _cell_text(row, column):
    """Return the cell at ``column`` as a string, with missing/NaN cells as ''."""
    value = row.get(column, '')
    # Empty Excel cells are read as NaN; str(NaN) would give the text 'nan'.
    return '' if pd.isna(value) else str(value)


def process_file(file):
    """Run event detection and sentiment analysis over an Excel workbook.

    Reads the 'Публикации' sheet, analyses each row with a freshly created
    ``EventDetector`` and reports progress through ``gr.Info`` toasts.

    Args:
        file: a path string, or an object whose ``.name`` attribute is the
            path of the workbook.

    Returns:
        A DataFrame with the columns 'Объект', 'Заголовок', 'Sentiment',
        'Event_Type', 'Event_Summary' and 'Текст', one row per input row.
        On any exception the error is logged, shown to the user as a warning
        toast, and an empty DataFrame with the same columns is returned.
    """
    output_columns = ['Объект', 'Заголовок', 'Sentiment', 'Event_Type', 'Event_Summary', 'Текст']

    try:
        gr.Info("Starting file processing...")

        path = file if isinstance(file, str) else file.name
        df = pd.read_excel(path, sheet_name='Публикации')

        detector = EventDetector()
        processed_rows = []
        total = len(df)

        # Count rows by position: the index labels need not be 0..n-1.
        for position, (_, row) in enumerate(df.iterrows()):
            if position % 10 == 0:
                gr.Info(f"Processing {position}/{total} rows...")

            text = _cell_text(row, 'Выдержки из текста')
            entity = _cell_text(row, 'Объект')

            event_type, event_summary = detector.detect_events(text, entity)
            sentiment = detector.analyze_sentiment(text)

            processed_rows.append({
                'Объект': entity,
                'Заголовок': _cell_text(row, 'Заголовок'),
                'Sentiment': sentiment,
                'Event_Type': event_type,
                'Event_Summary': event_summary,
                'Текст': text,
            })

        # Passing the columns keeps the schema stable even for a sheet with no rows.
        result_df = pd.DataFrame(processed_rows, columns=output_columns)
        gr.Info("File processing complete!")
        return result_df
    except Exception as e:
        logger.error(f"File processing error: {e}")
        # gr.Error only reaches the user when raised; a warning toast keeps
        # this function's "return an empty frame on failure" contract.
        gr.Warning(f"Error processing file: {str(e)}")
        return pd.DataFrame(columns=output_columns)
def create_visualizations(df):
    """Build the sentiment pie chart and the event-type bar chart.

    Args:
        df: DataFrame with 'Sentiment' and 'Event_Type' columns, or None.

    Returns:
        ``(fig_sentiment, fig_events)`` as Plotly figures, or ``(None, None)``
        when ``df`` is None or empty, or when building a figure raises (the
        error is logged).
    """
    if df is None or df.empty:
        return None, None

    # Colors are keyed by label so a sentiment keeps its color whatever its
    # rank in the counts.
    # NOTE(review): red=Negative, teal=Positive, gray=Neutral is inferred from
    # the hues of the original palette — confirm the intended mapping.
    sentiment_colors = {
        'Negative': '#FF6B6B',
        'Positive': '#4ECDC4',
        'Neutral': '#95A5A6',
    }
    fallback_color = '#95A5A6'

    try:
        sentiments = df['Sentiment'].value_counts()
        fig_sentiment = go.Figure(data=[go.Pie(
            labels=sentiments.index,
            values=sentiments.values,
            marker_colors=[
                sentiment_colors.get(label, fallback_color)
                for label in sentiments.index
            ],
        )])
        fig_sentiment.update_layout(title="Распределение тональности")

        events = df['Event_Type'].value_counts()
        fig_events = go.Figure(data=[go.Bar(
            x=events.index,
            y=events.values,
            marker_color='#2196F3',
        )])
        fig_events.update_layout(title="Распределение событий")

        return fig_sentiment, fig_events
    except Exception as e:
        logger.error(f"Visualization error: {e}")
        return None, None
def create_interface():
    """Build the Gradio Blocks UI and wire the analysis handler to the button.

    Returns:
        The ``gr.Blocks`` application, not yet launched.
    """
    with gr.Blocks(theme=gr.themes.Soft()) as app:
        gr.Markdown("# AI-анализ мониторинга новостей v.1.06")

        with gr.Row():
            file_input = gr.File(
                label="Загрузите Excel файл",
                file_types=[".xlsx"],
                # "filepath" hands the handler a path string; "binary" would
                # hand it raw bytes, which carry no path to read from.
                type="filepath",
            )

        with gr.Row():
            analyze_btn = gr.Button(
                "Начать анализ",
                variant="primary",
            )

        with gr.Row():
            progress = gr.Textbox(
                label="Статус",
                interactive=False,
            )

        with gr.Row():
            stats = gr.DataFrame(
                label="Результаты анализа",
                interactive=False,
                wrap=True,
            )

        with gr.Row():
            with gr.Column():
                sentiment_plot = gr.Plot(label="Распределение тональности")
            with gr.Column():
                events_plot = gr.Plot(label="Распределение событий")

        def analyze(file):
            """Process the uploaded workbook, yielding (table, pie, bar, status).

            Written as a generator so the "started" status is shown before the
            long-running processing begins; component values can only be
            changed through the handler's outputs.
            """
            if file is None:
                gr.Warning("Пожалуйста, загрузите файл")
                yield None, None, None, "Ожидание файла"
                return

            yield None, None, None, "Обработка начата..."

            try:
                # Accept a plain path as well as an object exposing `.name`.
                path = file if isinstance(file, str) else file.name
                df = process_file(path)

                if df.empty:
                    outputs = (None, None, None, "Нет данных для обработки")
                else:
                    fig_sentiment, fig_events = create_visualizations(df)
                    outputs = (df, fig_sentiment, fig_events, "Обработка завершена")
            except Exception as e:
                logger.error(f"Analysis error: {e}")
                # gr.Error is only displayed when raised; use a warning toast
                # so the status box can still report the failure.
                gr.Warning(f"Ошибка анализа: {str(e)}")
                outputs = (None, None, None, f"Ошибка: {str(e)}")

            yield outputs

        analyze_btn.click(
            analyze,
            inputs=[file_input],
            outputs=[stats, sentiment_plot, events_plot, progress],
        )

    return app
if __name__ == "__main__":
    # Script entry point: build the UI and serve it with a public share link.
    create_interface().launch(share=True)