"""Gradio app for AI-assisted monitoring of Russian-language financial news.

Pipeline per uploaded Excel workbook (sheet 'Публикации'):
fuzzy de-duplication -> RU->EN translation -> three-model sentiment vote ->
Groq LLM impact estimate -> MT5 event classification -> Excel report built
from the ``sample_file.xlsx`` template.
"""

import io
import logging
import os
import tempfile
import time

import gradio as gr
import spaces  # NOTE: kept ahead of torch/transformers, as in the original import order
import pandas as pd
import torch
from transformers import pipeline, AutoModelForSeq2SeqLM, AutoTokenizer
import plotly.graph_objects as go
from rapidfuzz import fuzz
from langchain_openai import ChatOpenAI
from langchain.prompts import PromptTemplate
from openpyxl import load_workbook
from openpyxl.utils.dataframe import dataframe_to_rows

# Read with .get(): a missing key must disable Groq gracefully instead of
# raising KeyError at import time (which made the fallback branch unreachable).
groq_key = os.environ.get('groq_key')

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

_DEFAULT_IMPACT = "Неопределенный эффект"

# Canonical impact categories used in the Groq prompt.
# Order matters for substring matching: "незначительный риск убытков" contains
# "значительный риск убытков", so the longer one must be tested first.
_IMPACT_CATEGORIES = (
    "Незначительный риск убытков",
    "Значительный риск убытков",
    "Умеренный риск убытков",
    "Вероятность прибыли",
    _DEFAULT_IMPACT,
)

# Higher number = more severe; anything not listed ranks 0.
_IMPACT_SEVERITY = {
    "Значительный риск убытков": 3,
    "Умеренный риск убытков": 2,
    "Незначительный риск убытков": 1,
}

# Labels emitted by the three sentiment pipelines, lower-cased.
# 'label_0/1/2' are the raw ids of cardiffnlp/twitter-roberta-base-sentiment
# (0 = negative, 1 = neutral, 2 = positive per its model card).
_POSITIVE_LABELS = frozenset({'positive', 'pos', 'positive tone', 'label_2'})
_NEGATIVE_LABELS = frozenset({'negative', 'neg', 'negative tone', 'label_0'})
_NEUTRAL_LABELS = frozenset({'neutral', 'label_1'})

# Pie-slice colours keyed by sentiment so a colour never depends on frequency
# order. NOTE(review): red/teal/grey -> Negative/Positive/Neutral is inferred
# from the original colour list order — confirm.
_SENTIMENT_COLORS = {
    'Negative': '#FF6B6B',
    'Positive': '#4ECDC4',
    'Neutral': '#95A5A6',
}
_FALLBACK_COLOR = '#95A5A6'


def _cell_text(value):
    """Return a stripped string for a spreadsheet cell, '' for missing/NaN."""
    if value is None:
        return ''
    try:
        if pd.isna(value):
            return ''
    except (TypeError, ValueError):
        # pd.isna on non-scalars may not yield a plain bool; treat as present.
        pass
    return str(value).strip()


def fuzzy_deduplicate(df, column, threshold=55):
    """Drop rows whose text in *column* is a fuzzy duplicate of an earlier row.

    A row is kept when its text has ``fuzz.ratio`` strictly below *threshold*
    against every previously kept text. Rows with a missing value in *column*
    are always kept. Returns the kept rows via ``df.iloc`` (original index
    labels are preserved).
    """
    seen_texts = []
    indices_to_keep = []
    for position, text in enumerate(df[column]):
        if pd.isna(text):
            indices_to_keep.append(position)
            continue
        text = str(text)
        # all() over an empty list is True, so the first text is always kept.
        if all(fuzz.ratio(text, seen) < threshold for seen in seen_texts):
            seen_texts.append(text)
            indices_to_keep.append(position)
    return df.iloc[indices_to_keep]


class ProcessControl:
    """Cooperative stop flag (plus last error) shared between UI callbacks."""

    def __init__(self):
        self.stop_requested = False
        self.error = None

    def request_stop(self):
        """Ask the running job to stop at its next check point."""
        self.stop_requested = True

    def should_stop(self):
        """Return True once a stop has been requested (or an error was set)."""
        return self.stop_requested

    def reset(self):
        """Clear the stop flag and any stored error before a new run."""
        self.stop_requested = False
        self.error = None

    def set_error(self, error):
        """Record *error* and request a stop."""
        self.error = error
        self.stop_requested = True


class EventDetector:
    """Bundle of translation, sentiment, LLM-impact and event-detection models."""

    def __init__(self):
        """Load every model on CUDA when available, otherwise on CPU.

        Raises:
            Exception: any model-loading error is logged and re-raised.
        """
        try:
            device = "cuda" if torch.cuda.is_available() else "cpu"
            logger.info(f"Initializing models on device: {device}")

            self.initialize_models(device)

            self.device = device
            self.initialized = True
            logger.info("All models initialized successfully")

        except Exception as e:
            logger.error(f"Error in EventDetector initialization: {str(e)}")
            raise

    @spaces.GPU(duration=30)
    def initialize_models(self, device=None):
        """Initialize all models on *device*.

        Args:
            device: "cuda" / "cpu". When omitted, the device remembered from a
                previous initialization is reused, else it is auto-detected.
                (The default exists because ``process_file`` calls this
                method without arguments.)
        """
        if device is None:
            device = getattr(self, "device", None) or (
                "cuda" if torch.cuda.is_available() else "cpu"
            )

        # Translation models (RU->EN for analysis, EN->RU for the reasoning).
        self.translator = pipeline(
            "translation",
            model="Helsinki-NLP/opus-mt-ru-en",
            device=device
        )
        self.rutranslator = pipeline(
            "translation",
            model="Helsinki-NLP/opus-mt-en-ru",
            device=device
        )

        # Sentiment models.
        self.finbert = pipeline(
            "sentiment-analysis",
            model="ProsusAI/finbert",
            device=device,
            truncation=True,
            max_length=512
        )
        self.roberta = pipeline(
            "sentiment-analysis",
            model="cardiffnlp/twitter-roberta-base-sentiment",
            device=device,
            truncation=True,
            max_length=512
        )
        self.finbert_tone = pipeline(
            "sentiment-analysis",
            model="yiyanghkust/finbert-tone",
            device=device,
            truncation=True,
            max_length=512
        )

        # MT5 model used by detect_events.
        self.model_name = "google/mt5-small"
        self.tokenizer = AutoTokenizer.from_pretrained(
            self.model_name,
            legacy=True
        )
        self.model = AutoModelForSeq2SeqLM.from_pretrained(self.model_name).to(device)

        # Groq LLM (OpenAI-compatible endpoint). The original tested the
        # string literal 'groq_key', which is always truthy.
        if groq_key:
            self.groq = ChatOpenAI(
                base_url="https://api.groq.com/openai/v1",
                model="llama-3.1-70b-versatile",
                openai_api_key=groq_key,
                temperature=0.0
            )
        else:
            logger.warning("Groq API key not found, impact estimation will be limited")
            self.groq = None

        # Keep state consistent when this is called directly (re-init after cleanup).
        self.device = device
        self.initialized = True

    @spaces.GPU(duration=20)
    def _translate_text(self, text):
        """Translate Russian *text* to English in 450-character chunks.

        Returns "" for empty or non-string input. On a translation error the
        error is logged and the (stripped) input text is returned unchanged.
        """
        try:
            if not text or not isinstance(text, str):
                return ""

            text = text.strip()
            if not text:
                return ""

            # Fixed-width character chunks keep each piece within model limits.
            max_length = 450
            chunks = [text[i:i + max_length] for i in range(0, len(text), max_length)]

            translated_chunks = []
            for chunk in chunks:
                result = self.translator(chunk)[0]['translation_text']
                translated_chunks.append(result)
                time.sleep(0.1)  # Brief pause between chunks (kept from original)

            return " ".join(translated_chunks)

        except Exception as e:
            logger.error(f"Translation error: {str(e)}")
            return text

    @staticmethod
    def _map_sentiment(result):
        """Map one pipeline result ({'label', 'score'}) to Positive/Negative/Neutral.

        Thresholds are asymmetric on purpose: positive needs > 0.75, negative
        only > 0.6, neutral > 0.8; anything else counts as Negative when the
        score is above 0.4 and Neutral otherwise.
        """
        label = result['label'].lower()
        score = result['score']

        if label in _POSITIVE_LABELS and score > 0.75:
            return "Positive"
        if label in _NEGATIVE_LABELS and score > 0.6:
            return "Negative"
        if label in _NEUTRAL_LABELS and score > 0.8:
            return "Neutral"
        # Uncertain prediction: lean negative in a financial-risk context.
        return "Negative" if score > 0.4 else "Neutral"

    @spaces.GPU(duration=20)
    def analyze_sentiment(self, text):
        """Return the majority sentiment of FinBERT, RoBERTa and FinBERT-tone.

        Negative wins with two or more votes, then Positive with two or more;
        otherwise the result is "Neutral". Empty input and any error also
        yield "Neutral".
        """
        try:
            if not text or not isinstance(text, str):
                return "Neutral"

            text = text.strip()
            if not text:
                return "Neutral"

            sentiments = [
                self._map_sentiment(self.finbert(text)[0]),
                self._map_sentiment(self.roberta(text)[0]),
                self._map_sentiment(self.finbert_tone(text)[0]),
            ]

            # Negative consensus is checked first so it takes priority.
            if sentiments.count("Negative") >= 2:
                return "Negative"
            if sentiments.count("Positive") >= 2:
                return "Positive"
            return "Neutral"

        except Exception as e:
            logger.error(f"Sentiment analysis error: {str(e)}")
            return "Neutral"

    @staticmethod
    def _normalize_impact(raw_impact):
        """Reduce the LLM's impact text to one canonical category when possible.

        The model may echo numbering, quotes or the English gloss (e.g.
        '4. "Вероятность прибыли" (Potential profit)'); callers compare the
        impact by equality, so such decorations must be stripped.
        """
        lowered = raw_impact.lower()
        for category in _IMPACT_CATEGORIES:
            if category.lower() in lowered:
                return category
        cleaned = raw_impact.strip().strip('"\'[]').strip()
        return cleaned or _DEFAULT_IMPACT

    def _estimate_impact(self, text, entity):
        """Query Groq and return ``(impact, reasoning, from_llm)``.

        ``from_llm`` is True only when *reasoning* is text produced by the
        model; fallback and error reasons are already Russian and must not be
        passed through the EN->RU translator.
        """
        try:
            if not self.groq:
                return _DEFAULT_IMPACT, "Groq API недоступен", False

            template = """
            You are a financial analyst. Analyze this news about {entity} and assess its potential impact.

            News: {news}

            Classify the impact into one of these categories:
            1. "Значительный риск убытков" (Significant loss risk)
            2. "Умеренный риск убытков" (Moderate loss risk)
            3. "Незначительный риск убытков" (Minor loss risk)
            4. "Вероятность прибыли" (Potential profit)
            5. "Неопределенный эффект" (Uncertain effect)

            Format your response exactly as:
            Impact: [category]
            Reasoning: [explanation in 2-3 sentences]
            """

            prompt = PromptTemplate(template=template, input_variables=["entity", "news"])
            chain = prompt | self.groq

            response = chain.invoke({
                "entity": entity,
                "news": text
            })

            response_text = response.content if hasattr(response, 'content') else str(response)

            if "Impact:" in response_text and "Reasoning:" in response_text:
                # Split once so a later "Reasoning:" inside the text is kept.
                head, reasoning = response_text.split("Reasoning:", 1)
                impact = self._normalize_impact(head.split("Impact:", 1)[1])
                return impact, reasoning.strip(), True

            return _DEFAULT_IMPACT, "Не удалось определить влияние", False

        except Exception as e:
            logger.error(f"Impact estimation error: {str(e)}")
            return _DEFAULT_IMPACT, f"Ошибка анализа: {str(e)}", False

    def estimate_impact(self, text, entity):
        """Estimate the financial impact of *text* about *entity* using Groq.

        Returns:
            ``(impact, reasoning)``. When Groq is unavailable, the reply
            cannot be parsed, or the call fails, *impact* is
            "Неопределенный эффект" and *reasoning* explains why (in Russian).
        """
        impact, reasoning, _ = self._estimate_impact(text, entity)
        return impact, reasoning

    @spaces.GPU(duration=60)
    def process_text(self, text, entity):
        """Run the full per-article pipeline.

        Translates *text*, votes on sentiment, asks Groq for the impact,
        translates the model's reasoning back to Russian and classifies the
        event type from the original text.

        Returns:
            dict with keys 'translated_text', 'sentiment', 'impact',
            'reasoning', 'event_type', 'event_summary'. On any error a
            neutral placeholder dict is returned and the error is logged.
        """
        try:
            translated_text = self._translate_text(text)
            initial_sentiment = self.analyze_sentiment(translated_text)

            # Groq analysis is requested for every text, not only negatives.
            impact, reasoning, from_llm = self._estimate_impact(translated_text, entity)
            if from_llm and reasoning:
                reasoning = self.rutranslator(reasoning)[0]['translation_text']

            # A "potential profit" verdict from Groq overrides the vote.
            final_sentiment = initial_sentiment
            if impact == "Вероятность прибыли":
                final_sentiment = "Positive"

            event_type, event_summary = self.detect_events(text, entity)

            return {
                'translated_text': translated_text,
                'sentiment': final_sentiment,
                'impact': impact,
                'reasoning': reasoning,
                'event_type': event_type,
                'event_summary': event_summary
            }

        except Exception as e:
            logger.error(f"Text processing error: {str(e)}")
            return {
                'translated_text': '',
                'sentiment': 'Neutral',
                'impact': 'Неопределенный эффект',
                'reasoning': f'Ошибка обработки: {str(e)}',
                'event_type': 'Нет',
                'event_summary': ''
            }

    @spaces.GPU(duration=20)
    def detect_events(self, text, entity):
        """Classify *text* into an event category with the MT5 model.

        Returns:
            ``(category, summary)`` where category is one of "Отчетность",
            "Суд", "РЦБ", "Нет". Invalid input, an unparsable model reply or
            an error all yield "Нет" with an explanatory second element.
        """
        if not text or not entity:
            return "Нет", "Invalid input"

        try:
            prompt = f"""Analyze this news about {entity}:

            Text: {text}

            Classify this news into ONE of these categories:
            1. "Отчетность" if about: financial reports, revenue, profit, EBITDA, financial results, quarterly/annual reports
            2. "Суд" if about: court cases, lawsuits, arbitration, bankruptcy, legal proceedings
            3. "РЦБ" if about: bonds, securities, defaults, debt restructuring, coupon payments
            4. "Нет" if none of the above

            Provide classification and 2-3 sentence summary focusing on key facts.

            Format response exactly as:
            Category: [category name]
            Summary: [brief factual summary]"""

            inputs = self.tokenizer(
                prompt,
                return_tensors="pt",
                padding=True,
                truncation=True,
                max_length=512
            ).to(self.device)

            # Greedy decoding: temperature/top_p were dropped because they are
            # ignored when do_sample=False and only triggered warnings.
            outputs = self.model.generate(
                **inputs,
                max_length=200,
                num_return_sequences=1,
                do_sample=False,
                no_repeat_ngram_size=3
            )

            response = self.tokenizer.decode(outputs[0], skip_special_tokens=True)

            if "Category:" in response and "Summary:" in response:
                head, summary = response.split("Summary:", 1)
                category = head.split("Category:", 1)[1].strip()

                # Anything outside the closed set is treated as "no event".
                valid_categories = {"Отчетность", "Суд", "РЦБ", "Нет"}
                category = category if category in valid_categories else "Нет"

                return category, summary.strip()

            return "Нет", "Could not classify event"

        except Exception as e:
            logger.error(f"Event detection error: {str(e)}")
            return "Нет", f"Error in event detection: {str(e)}"

    def cleanup(self):
        """Drop model references and empty the CUDA cache; never raises."""
        try:
            self.model = None
            self.tokenizer = None
            self.translator = None
            self.rutranslator = None
            self.finbert = None
            self.roberta = None
            self.finbert_tone = None
            torch.cuda.empty_cache()
            self.initialized = False
            logger.info("Cleaned up GPU resources")
        except Exception as e:
            logger.error(f"Error in cleanup: {str(e)}")


def create_visualizations(df):
    """Build the sentiment pie chart and the event-type bar chart.

    Expects columns 'Sentiment' and 'Event_Type'. Returns
    ``(fig_sentiment, fig_events)``, or ``(None, None)`` for an empty/None
    frame or on any plotting error.
    """
    if df is None or df.empty:
        return None, None

    try:
        sentiments = df['Sentiment'].value_counts()
        slice_colors = [
            _SENTIMENT_COLORS.get(label, _FALLBACK_COLOR)
            for label in sentiments.index
        ]
        fig_sentiment = go.Figure(data=[go.Pie(
            labels=sentiments.index,
            values=sentiments.values,
            marker_colors=slice_colors
        )])
        fig_sentiment.update_layout(title="Распределение тональности")

        events = df['Event_Type'].value_counts()
        fig_events = go.Figure(data=[go.Bar(
            x=events.index,
            y=events.values,
            marker_color='#2196F3'
        )])
        fig_events.update_layout(title="Распределение событий")

        return fig_sentiment, fig_events

    except Exception as e:
        logger.error(f"Visualization error: {e}")
        return None, None


@spaces.GPU
def process_file(file_obj):
    """Process an Excel file in small batches, yielding progressive results.

    This is a generator. After each batch it yields
    ``(DataFrame, None, None, progress_message)``; the last item yielded is
    ``(DataFrame | None, fig_sentiment, fig_events, final_message)``.
    (The final tuple was previously ``return``-ed, which iterating callers
    never receive from a generator.)

    Raises:
        Exception: errors outside per-row/per-batch handling are logged and
            re-raised.
    """
    try:
        logger.info("Starting to read Excel file...")
        df = pd.read_excel(file_obj, sheet_name='Публикации')
        logger.info(f"Successfully read Excel file. Shape: {df.shape}")

        original_count = len(df)
        df = fuzzy_deduplicate(df, 'Выдержки из текста', threshold=55)
        logger.info(f"Removed {original_count - len(df)} duplicate entries")

        detector = EventDetector()
        processed_rows = []
        total = len(df)

        BATCH_SIZE = 3        # Small batches keep each GPU slice short
        QUOTA_WAIT_TIME = 60  # Seconds to wait when the GPU quota is exceeded

        for batch_start in range(0, total, BATCH_SIZE):
            try:
                batch = df.iloc[batch_start:batch_start + BATCH_SIZE]

                # Re-create models if a previous cleanup released them.
                if not detector.initialized:
                    detector.initialize_models()
                    time.sleep(1)

                for idx, row in batch.iterrows():
                    try:
                        text = _cell_text(row.get('Выдержки из текста', ''))
                        if not text:
                            continue

                        entity = _cell_text(row.get('Объект', ''))
                        if not entity:
                            continue

                        event_type = "Нет"
                        event_summary = ""
                        sentiment = "Neutral"

                        try:
                            event_type, event_summary = detector.detect_events(text, entity)
                            time.sleep(1)  # Pause between GPU operations
                            sentiment = detector.analyze_sentiment(text)
                        except Exception as e:
                            if "GPU quota" in str(e):
                                # The row is skipped (not retried) after the wait.
                                logger.warning("GPU quota exceeded, waiting...")
                                time.sleep(QUOTA_WAIT_TIME)
                                continue
                            raise

                        processed_rows.append({
                            'Объект': entity,
                            'Заголовок': _cell_text(row.get('Заголовок', '')),
                            'Sentiment': sentiment,
                            'Event_Type': event_type,
                            'Event_Summary': event_summary,
                            'Текст': text[:1000]
                        })

                        logger.info(f"Processed {len(processed_rows)}/{total} rows")

                    except Exception as e:
                        logger.error(f"Error processing row {idx}: {str(e)}")
                        continue

                if processed_rows:
                    intermediate_df = pd.DataFrame(processed_rows)
                    yield (
                        intermediate_df,
                        None,
                        None,
                        f"Обработано {len(processed_rows)}/{total} строк"
                    )

                time.sleep(2)  # Pause between batches
                torch.cuda.empty_cache()

            except Exception as e:
                logger.error(f"Batch processing error: {str(e)}")
                if "GPU quota" in str(e):
                    time.sleep(QUOTA_WAIT_TIME)
                continue

        if processed_rows:
            result_df = pd.DataFrame(processed_rows)
            fig_sentiment, fig_events = create_visualizations(result_df)
            yield result_df, fig_sentiment, fig_events, "Обработка завершена!"
        else:
            yield None, None, None, "Нет обработанных данных"

    except Exception as e:
        logger.error(f"File processing error: {str(e)}")
        raise


def _write_table(ws, frame):
    """Write *frame* (header + rows) into *ws* starting at cell A1."""
    rows = dataframe_to_rows(frame, index=False, header=True)
    for r_idx, row in enumerate(rows, start=1):
        for c_idx, value in enumerate(row, start=1):
            ws.cell(row=r_idx, column=c_idx, value=value)


def _write_row(ws, row_idx, first_col, values):
    """Write *values* into consecutive cells of row *row_idx* from *first_col*."""
    for offset, value in enumerate(values):
        ws.cell(row=row_idx, column=first_col + offset, value=value)


def _entity_summary(df):
    """Return per-entity counts and worst impact, most negative mentions first."""
    entity_stats = []
    for entity in df['Объект'].unique():
        entity_df = df[df['Объект'] == entity]
        negative_df = entity_df[entity_df['Sentiment'] == 'Negative']

        impact = _DEFAULT_IMPACT
        if len(negative_df) > 0 and 'Impact' in negative_df.columns:
            impacts = negative_df['Impact'].dropna()
            if len(impacts) > 0:
                # max() keeps the first of equally severe values, so unknown
                # categories degrade to "first impact seen".
                impact = max(impacts, key=lambda v: _IMPACT_SEVERITY.get(v, 0))

        entity_stats.append({
            'Объект': entity,
            'Всего': len(entity_df),
            'Негативные': len(negative_df),
            'Позитивные': len(entity_df[entity_df['Sentiment'] == 'Positive']),
            'Impact': impact,
        })

    return sorted(entity_stats, key=lambda x: x['Негативные'], reverse=True)


def create_output_file(df, uploaded_file):
    """Fill the ``sample_file.xlsx`` template from the processed DataFrame.

    Args:
        df: processed rows; columns used include 'Объект', 'Заголовок',
            'Выдержки из текста', 'Sentiment', 'Event_Type', 'Event_Summary'
            and, when present, 'Impact', 'Reasoning', 'Translated'.
        uploaded_file: accepted for interface compatibility; not read here.

    Returns:
        ``io.BytesIO`` positioned at 0 containing the workbook, or ``None``
        when anything fails (the error and the frame's columns are logged).
    """
    try:
        wb = load_workbook("sample_file.xlsx")

        # 1. 'Публикации': full dump of the processed frame.
        _write_table(wb['Публикации'], df)

        # 2. 'Мониторинг': rows with a detected event, from row 4, column E.
        ws = wb['Мониторинг']
        events_df = df[df['Event_Type'] != 'Нет']
        for row_idx, (_, row) in enumerate(events_df.iterrows(), start=4):
            _write_row(ws, row_idx, 5, [
                row['Объект'],
                row['Заголовок'],
                row['Event_Type'],
                row['Event_Summary'],
                row['Выдержки из текста'],
            ])

        # 3. 'Сводка': per-entity statistics, from row 4, column E.
        ws = wb['Сводка']
        for row_idx, stats in enumerate(_entity_summary(df), start=4):
            _write_row(ws, row_idx, 5, [
                stats['Объект'],
                stats['Всего'],
                stats['Негативные'],
                stats['Позитивные'],
                stats['Impact'],
            ])

        # 4. 'Значимые': every non-neutral row, from row 3, column C.
        ws = wb['Значимые']
        sentiment_df = df[df['Sentiment'].isin(['Negative', 'Positive'])]
        for row_idx, (_, row) in enumerate(sentiment_df.iterrows(), start=3):
            _write_row(ws, row_idx, 3, [
                row['Объект'],
                'релевантно',
                row['Sentiment'],
                row.get('Impact', '-'),
                row['Заголовок'],
                row['Выдержки из текста'],
            ])

        # 5. 'Анализ': negative rows with the reasoning, from row 4, column E.
        ws = wb['Анализ']
        negative_df = df[df['Sentiment'] == 'Negative']
        for row_idx, (_, row) in enumerate(negative_df.iterrows(), start=4):
            _write_row(ws, row_idx, 5, [
                row['Объект'],
                row['Заголовок'],
                "Риск убытка",
                row.get('Reasoning', '-'),
                row['Выдержки из текста'],
            ])

        # 6. 'Тех.приложение': technical appendix (created when missing).
        if 'Тех.приложение' not in wb.sheetnames:
            wb.create_sheet('Тех.приложение')
        tech_cols = ['Объект', 'Заголовок', 'Выдержки из текста', 'Translated',
                     'Sentiment', 'Impact', 'Reasoning']
        # Only export columns that exist so one missing column does not
        # discard the whole report.
        present_cols = [col for col in tech_cols if col in df.columns]
        _write_table(wb['Тех.приложение'], df[present_cols])

        output = io.BytesIO()
        wb.save(output)
        output.seek(0)
        return output

    except Exception as e:
        logger.error(f"Error creating output file: {str(e)}")
        logger.error(f"DataFrame shape: {df.shape}")
        logger.error(f"Available columns: {df.columns.tolist()}")
        return None


def _save_download(output_bytes_io, filename):
    """Write the workbook bytes to *filename* inside a fresh temp directory.

    A per-call directory keeps the download name stable while preventing
    concurrent sessions from overwriting each other's results.
    """
    out_dir = tempfile.mkdtemp(prefix="news_analysis_")
    path = os.path.join(out_dir, filename)
    with open(path, "wb") as f:
        f.write(output_bytes_io.getvalue())
    return path


def create_interface():
    """Build and return the Gradio Blocks application."""
    control = ProcessControl()

    with gr.Blocks(theme=gr.themes.Soft()) as app:
        # State slot for file data (currently not wired to any callback).
        current_file = gr.State(None)

        gr.Markdown("# AI-анализ мониторинга новостей v.1.58")

        with gr.Row():
            file_input = gr.File(
                label="Загрузите Excel файл",
                file_types=[".xlsx"],
                type="binary"
            )

        with gr.Row():
            with gr.Column(scale=1):
                analyze_btn = gr.Button(
                    "▶️ Начать анализ",
                    variant="primary",
                    size="lg"
                )
            with gr.Column(scale=1):
                stop_btn = gr.Button(
                    "⏹️ Остановить",
                    variant="stop",
                    size="lg"
                )

        with gr.Row():
            status_box = gr.Textbox(
                label="Статус дедупликации",
                interactive=False,
                value=""
            )

        with gr.Row():
            progress = gr.Textbox(
                label="Статус обработки",
                interactive=False,
                value="Ожидание файла..."
            )

        with gr.Row():
            stats = gr.DataFrame(
                label="Результаты анализа",
                interactive=False,
                wrap=True
            )

        with gr.Row():
            with gr.Column(scale=1):
                sentiment_plot = gr.Plot(label="Распределение тональности")
            with gr.Column(scale=1):
                events_plot = gr.Plot(label="Распределение событий")

        with gr.Row():
            file_output = gr.File(
                label="Скачать результаты",
                visible=True,
                interactive=True
            )

        def stop_processing():
            """Request a cooperative stop; honoured between batches."""
            control.request_stop()
            return "Остановка обработки..."

        @spaces.GPU(duration=300)
        def process_and_download(file_bytes):
            """Analyse the uploaded workbook and prepare the report download.

            Returns a 6-tuple matching the click outputs: (results frame,
            sentiment figure, events figure, download path or None,
            processing status, de-duplication status).
            """
            if file_bytes is None:
                gr.Warning("Пожалуйста, загрузите файл")
                return (
                    pd.DataFrame(),
                    None,
                    None,
                    None,
                    "Ожидание файла...",
                    ""
                )

            # A stop requested during a previous run must not abort this one.
            control.reset()
            detector = None  # Bound up front so `finally` is always safe

            try:
                file_obj = io.BytesIO(file_bytes)
                logger.info("File loaded into BytesIO successfully")

                detector = EventDetector()

                df = pd.read_excel(file_obj, sheet_name='Публикации')
                original_count = len(df)
                df = fuzzy_deduplicate(df, 'Выдержки из текста', threshold=55)
                removed_count = original_count - len(df)
                dedup_message = f"Удалено {removed_count} дубликатов из {original_count} записей"
                logger.info(f"Removed {removed_count} duplicate entries")

                processed_rows = []
                total = len(df)
                batch_size = 3
                stopped = False

                for batch_start in range(0, total, batch_size):
                    if control.should_stop():
                        stopped = True
                        break

                    batch = df.iloc[batch_start:batch_start + batch_size]

                    for idx, row in batch.iterrows():
                        try:
                            text = _cell_text(row.get('Выдержки из текста', ''))
                            entity = _cell_text(row.get('Объект', ''))

                            if not text or not entity:
                                continue

                            results = detector.process_text(text, entity)

                            processed_rows.append({
                                'Объект': entity,
                                'Заголовок': _cell_text(row.get('Заголовок', '')),
                                'Translated': results['translated_text'],
                                'Sentiment': results['sentiment'],
                                'Impact': results['impact'],
                                'Reasoning': results['reasoning'],
                                'Event_Type': results['event_type'],
                                'Event_Summary': results['event_summary'],
                                'Выдержки из текста': text
                            })

                        except Exception as e:
                            logger.error(f"Error processing row {idx}: {str(e)}")
                            continue

                if not processed_rows:
                    return (
                        pd.DataFrame(),
                        None,
                        None,
                        None,
                        "Нет обработанных данных",
                        dedup_message
                    )

                result_df = pd.DataFrame(processed_rows)
                fig_sentiment, fig_events = create_visualizations(result_df)
                output_bytes_io = create_output_file(result_df, file_obj)

                if stopped:
                    filename = "partial_results.xlsx"
                    status = f"Обработка остановлена. Обработано {len(processed_rows)}/{total} строк"
                else:
                    filename = "results.xlsx"
                    status = "Обработка завершена!"

                if output_bytes_io is None:
                    # Still show the table and plots; only the download is missing.
                    download_path = None
                    status += " (не удалось создать файл результатов)"
                else:
                    download_path = _save_download(output_bytes_io, filename)

                return (
                    result_df,
                    fig_sentiment,
                    fig_events,
                    download_path,
                    status,
                    dedup_message
                )

            except Exception as e:
                error_msg = f"Ошибка анализа: {str(e)}"
                logger.error(error_msg)
                # gr.Error only has an effect when raised, which would discard
                # the outputs below; a warning toast is shown instead.
                gr.Warning(error_msg)
                return (
                    pd.DataFrame(),
                    None,
                    None,
                    None,
                    error_msg,
                    ""
                )
            finally:
                if detector is not None:
                    detector.cleanup()

        stop_btn.click(fn=stop_processing, outputs=[progress])

        analyze_btn.click(
            fn=process_and_download,
            inputs=[file_input],
            outputs=[
                stats,
                sentiment_plot,
                events_plot,
                file_output,
                progress,
                status_box
            ]
        )

    return app


if __name__ == "__main__":
    app = create_interface()
    app.launch(share=True)