"""Gradio app for sentiment and event analysis of news items from an Excel file.

The workbook is expected to contain a sheet named ``Публикации`` with the
columns ``Выдержки из текста`` (text excerpt), ``Объект`` (entity) and
``Заголовок`` (headline).
"""

import io
import logging
import time
from collections import Counter

import gradio as gr
import pandas as pd
import plotly.graph_objects as go
# NOTE(review): `spaces` was imported before `torch` in the original file;
# keep that relative order (ZeroGPU setup may depend on it -- confirm).
import spaces
import torch
from rapidfuzz import fuzz
from transformers import AutoModelForSeq2SeqLM, AutoTokenizer, pipeline

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Workbook layout.
SHEET_NAME = 'Публикации'
TEXT_COLUMN = 'Выдержки из текста'
ENTITY_COLUMN = 'Объект'
TITLE_COLUMN = 'Заголовок'

# Processing parameters.
DEDUP_THRESHOLD = 55   # rapidfuzz ratio at/above which two texts are duplicates
BATCH_SIZE = 3         # rows processed between two UI updates
QUOTA_WAIT_TIME = 60   # seconds to wait after a "GPU quota" error
MAX_STORED_TEXT = 1000  # characters of the source text kept in the result table

_POSITIVE_LABELS = frozenset({'positive', 'pos', 'positive tone'})
_NEGATIVE_LABELS = frozenset({'negative', 'neg', 'negative tone'})

# cardiffnlp/twitter-roberta-base-sentiment reports generic LABEL_n names;
# per its model card 0 = negative, 1 = neutral, 2 = positive.
# NOTE(review): confirm against the model card if the checkpoint is changed.
_ROBERTA_LABEL_ALIASES = {
    'label_0': 'Negative',
    'label_1': 'Neutral',
    'label_2': 'Positive',
}

# Colors are keyed by label so a sentiment keeps its color no matter how the
# slices are ordered by frequency.
# NOTE(review): the label/color pairing is assumed from the original palette.
_SENTIMENT_COLORS = {
    'Negative': '#FF6B6B',
    'Positive': '#4ECDC4',
    'Neutral': '#95A5A6',
}
_DEFAULT_COLOR = '#95A5A6'

# Substrings (lower-case) that signal each event type; checked in this order.
_EVENT_KEYWORDS = {
    'Отчетность': ['отчет', 'выручка', 'прибыль', 'ebitda', 'финансов',
                   'результат', 'показател'],
    'РЦБ': ['облигаци', 'купон', 'дефолт', 'реструктуризац',
            'ценные бумаги', 'долг'],
    'Суд': ['суд', 'иск', 'арбитраж', 'разбирательств', 'банкрот'],
}

# Canned summaries used when the model output has no "Summary:" section.
_FALLBACK_SUMMARIES = {
    'Отчетность': "Компания {entity} опубликовала финансовые показатели.",
    'РЦБ': "Обнаружена информация о ценных бумагах компании {entity}.",
    'Суд': "Компания {entity} участвует в судебном разбирательстве.",
}


def fuzzy_deduplicate(df, column, threshold=55):
    """Drop rows whose text is a fuzzy duplicate of an earlier row.

    Args:
        df: Source DataFrame.
        column: Name of the column holding the text to compare.
        threshold: ``fuzz.ratio`` score at or above which a text counts as a
            duplicate of a previously kept text.

    Returns:
        A DataFrame with the kept rows (original index labels preserved).
        Rows whose text is missing (NaN) are always kept.
    """
    seen_texts = []
    indices_to_keep = []
    for i, text in enumerate(df[column]):
        if pd.isna(text):
            indices_to_keep.append(i)
            continue
        text = str(text)
        # all() over an empty list is True, so the first text is always kept.
        if all(fuzz.ratio(text, seen) < threshold for seen in seen_texts):
            seen_texts.append(text)
            indices_to_keep.append(i)
    return df.iloc[indices_to_keep]


def _cell_text(value):
    """Return a stripped string for a cell value; missing cells become ''."""
    if value is None:
        return ""
    if pd.api.types.is_scalar(value) and pd.isna(value):
        # Without this check str(nan) would yield the literal text "nan".
        return ""
    return str(value).strip()


def _map_sentiment(result, aliases=None):
    """Map a pipeline result's label to 'Positive', 'Negative' or 'Neutral'.

    Args:
        result: Mapping with a ``'label'`` key, as returned by a pipeline.
        aliases: Optional model-specific mapping from lower-cased labels to
            one of the three standard names; consulted first.
    """
    label = result['label'].lower()
    if aliases and label in aliases:
        return aliases[label]
    if label in _POSITIVE_LABELS:
        return "Positive"
    if label in _NEGATIVE_LABELS:
        return "Negative"
    return "Neutral"


def _result_row(row, entity, text, sentiment, event_type, event_summary):
    """Build one record of the result table."""
    return {
        'Объект': entity,
        'Заголовок': _cell_text(row.get(TITLE_COLUMN, '')),
        'Sentiment': sentiment,
        'Event_Type': event_type,
        'Event_Summary': event_summary,
        'Текст': text[:MAX_STORED_TEXT],
    }


class ProcessControl:
    """Cooperative stop flag (plus last error) shared between UI callbacks."""

    def __init__(self):
        self.stop_requested = False
        self.error = None

    def request_stop(self):
        """Ask the running analysis to stop at the next batch boundary."""
        self.stop_requested = True

    def should_stop(self):
        """Return True when a stop has been requested."""
        return self.stop_requested

    def reset(self):
        """Clear the stop flag and any recorded error."""
        self.stop_requested = False
        self.error = None

    def set_error(self, error):
        """Record an error and request a stop."""
        self.error = error
        self.stop_requested = True


class EventDetector:
    """Bundle of sentiment pipelines and a seq2seq model for event summaries."""

    def __init__(self):
        """Load all models onto CUDA when available, otherwise the CPU.

        Raises:
            Exception: Any loading error is logged and re-raised.
        """
        try:
            device = "cuda" if torch.cuda.is_available() else "cpu"
            logger.info(f"Initializing models on device: {device}")

            # Three sentiment classifiers, combined by majority vote.
            self.finbert = pipeline(
                "sentiment-analysis",
                model="ProsusAI/finbert",
                device=device,
                truncation=True,
                max_length=512,
            )
            self.roberta = pipeline(
                "sentiment-analysis",
                model="cardiffnlp/twitter-roberta-base-sentiment",
                device=device,
                truncation=True,
                max_length=512,
            )
            self.finbert_tone = pipeline(
                "sentiment-analysis",
                model="yiyanghkust/finbert-tone",
                device=device,
                truncation=True,
                max_length=512,
            )

            # Seq2seq model used to summarize detected events.
            self.model_name = "google/mt5-small"
            self.tokenizer = AutoTokenizer.from_pretrained(
                self.model_name,
                legacy=True,
            )
            self.model = AutoModelForSeq2SeqLM.from_pretrained(
                self.model_name
            ).to(device)

            self.device = device
            self.initialized = True
            logger.info("All models initialized successfully")
        except Exception as e:
            logger.error(f"Error in EventDetector initialization: {str(e)}")
            raise

    @spaces.GPU(duration=30)
    def initialize_models(self):
        """Return the initialization status (kept for compatibility).

        Models are loaded in ``__init__``; this method loads nothing.
        """
        return self.initialized

    def analyze_sentiment(self, text):
        """Classify ``text`` by majority vote of the three sentiment models.

        Args:
            text: Text to classify.

        Returns:
            'Positive', 'Negative' or 'Neutral'. 'Neutral' is returned for
            empty or non-string input, when no two models agree, and when
            any model raises (the error is logged).
        """
        try:
            if not text or not isinstance(text, str):
                return "Neutral"
            text = text.strip()
            if not text:
                return "Neutral"

            sentiments = [
                _map_sentiment(self.finbert(text)[0]),
                _map_sentiment(self.roberta(text)[0], _ROBERTA_LABEL_ALIASES),
                _map_sentiment(self.finbert_tone(text)[0]),
            ]

            # With three votes, a label with >= 2 votes is the unique winner.
            winner, votes = Counter(sentiments).most_common(1)[0]
            return winner if votes >= 2 else "Neutral"
        except Exception as e:
            logger.error(f"Sentiment analysis error: {str(e)}")
            return "Neutral"

    def detect_events(self, text, entity):
        """Detect an event type by keyword and summarize it with the model.

        Args:
            text: News text.
            entity: Name of the entity the news is about.

        Returns:
            A ``(event_type, summary)`` tuple. ``event_type`` is one of the
            keys of ``_EVENT_KEYWORDS`` or ``"Нет"`` when nothing matched,
            the input was empty, or an error occurred (the second element
            then carries an explanatory message).
        """
        if not text or not entity:
            return "Нет", "Invalid input"

        try:
            text = str(text).strip()
            entity = str(entity).strip()
            if not text or not entity:
                return "Нет", "Empty input"

            # The first event type with any keyword in the text wins.
            text_lower = text.lower()
            detected_event = next(
                (
                    event_type
                    for event_type, terms in _EVENT_KEYWORDS.items()
                    if any(term in text_lower for term in terms)
                ),
                None,
            )
            if detected_event is None:
                return "Нет", "No significant event detected"

            prompt = (
                f"Summarize this {detected_event} news about {entity}: "
                f"Text: {text} "
                "Create a brief, factual summary focusing on the main points. "
                "Format: Summary: [2-3 sentence summary]"
            )

            inputs = self.tokenizer(
                prompt,
                return_tensors="pt",
                padding=True,
                truncation=True,
                max_length=512,
            ).to(self.device)

            outputs = self.model.generate(
                **inputs,
                max_length=200,
                num_return_sequences=1,
                do_sample=False,
                pad_token_id=self.tokenizer.pad_token_id,
                eos_token_id=self.tokenizer.eos_token_id,
                no_repeat_ngram_size=3,
            )
            response = self.tokenizer.decode(
                outputs[0], skip_special_tokens=True
            )

            if "Summary:" in response:
                summary = response.split("Summary:")[1].strip()
            else:
                # The model did not follow the format; use a canned sentence.
                summary = _FALLBACK_SUMMARIES[detected_event].format(
                    entity=entity
                )
            return detected_event, summary
        except Exception as e:
            logger.error(f"Event detection error: {str(e)}")
            return "Нет", f"Error in event detection: {str(e)}"

    def cleanup(self):
        """Drop model references and empty the CUDA cache; errors are logged."""
        try:
            self.model = None
            self.finbert = None
            self.roberta = None
            self.finbert_tone = None
            torch.cuda.empty_cache()
            self.initialized = False
            logger.info("Cleaned up GPU resources")
        except Exception as e:
            logger.error(f"Error in cleanup: {str(e)}")


def create_visualizations(df):
    """Build the sentiment pie chart and the event-type bar chart.

    Args:
        df: Result table with ``Sentiment`` and ``Event_Type`` columns.

    Returns:
        ``(fig_sentiment, fig_events)``; ``(None, None)`` when ``df`` is
        None or empty, or when building a figure fails (the error is logged).
    """
    if df is None or df.empty:
        return None, None

    try:
        sentiments = df['Sentiment'].value_counts()
        colors = [
            _SENTIMENT_COLORS.get(label, _DEFAULT_COLOR)
            for label in sentiments.index
        ]
        fig_sentiment = go.Figure(data=[go.Pie(
            labels=sentiments.index,
            values=sentiments.values,
            marker_colors=colors,
        )])
        fig_sentiment.update_layout(title="Распределение тональности")

        events = df['Event_Type'].value_counts()
        fig_events = go.Figure(data=[go.Bar(
            x=events.index,
            y=events.values,
            marker_color='#2196F3',
        )])
        fig_events.update_layout(title="Распределение событий")

        return fig_sentiment, fig_events
    except Exception as e:
        logger.error(f"Visualization error: {e}")
        return None, None


@spaces.GPU
def process_file(file_obj):
    """Analyze an Excel workbook, yielding progress after every batch.

    This is a generator: each item is a
    ``(dataframe, sentiment_figure, events_figure, status)`` tuple.
    Intermediate items carry no figures; the last item carries the final
    table and figures, or ``(None, None, None, message)`` when no row could
    be processed.

    Args:
        file_obj: Path or file-like object accepted by ``pd.read_excel``.

    Raises:
        Exception: Errors while reading the workbook or creating the
            detector are logged and re-raised. Per-row and per-batch
            errors are logged and skipped.
    """
    try:
        logger.info("Starting to read Excel file...")
        df = pd.read_excel(file_obj, sheet_name=SHEET_NAME)
        logger.info(f"Successfully read Excel file. Shape: {df.shape}")

        original_count = len(df)
        df = fuzzy_deduplicate(df, TEXT_COLUMN, threshold=DEDUP_THRESHOLD)
        logger.info(f"Removed {original_count - len(df)} duplicate entries")

        detector = EventDetector()
        processed_rows = []
        total = len(df)

        for batch_start in range(0, total, BATCH_SIZE):
            try:
                batch = df.iloc[batch_start:batch_start + BATCH_SIZE]

                if not detector.initialized:
                    detector.initialize_models()
                    time.sleep(1)  # Wait after initialization

                # `position` is the 1-based row position after deduplication;
                # `idx` is the original index label and may have gaps.
                for position, (idx, row) in enumerate(
                    batch.iterrows(), start=batch_start + 1
                ):
                    try:
                        text = _cell_text(row.get(TEXT_COLUMN, ''))
                        entity = _cell_text(row.get(ENTITY_COLUMN, ''))
                        if not text or not entity:
                            continue

                        try:
                            event_type, event_summary = detector.detect_events(
                                text, entity
                            )
                            time.sleep(1)  # Wait between GPU operations
                            sentiment = detector.analyze_sentiment(text)
                        except Exception as e:
                            if "GPU quota" in str(e):
                                logger.warning("GPU quota exceeded, waiting...")
                                time.sleep(QUOTA_WAIT_TIME)
                                continue
                            raise

                        processed_rows.append(_result_row(
                            row, entity, text,
                            sentiment, event_type, event_summary,
                        ))
                        logger.info(f"Processed {position}/{total} rows")
                    except Exception as e:
                        logger.error(f"Error processing row {idx}: {str(e)}")
                        continue

                if processed_rows:
                    yield (
                        pd.DataFrame(processed_rows),
                        None,
                        None,
                        f"Обработано {len(processed_rows)}/{total} строк",
                    )

                time.sleep(2)  # Wait between batches
                torch.cuda.empty_cache()
            except Exception as e:
                logger.error(f"Batch processing error: {str(e)}")
                if "GPU quota" in str(e):
                    time.sleep(QUOTA_WAIT_TIME)
                continue

        # A generator's `return value` is not delivered to consumers that
        # simply iterate, so the final result is yielded.
        if processed_rows:
            result_df = pd.DataFrame(processed_rows)
            fig_sentiment, fig_events = create_visualizations(result_df)
            yield result_df, fig_sentiment, fig_events, "Обработка завершена!"
        else:
            yield None, None, None, "Нет обработанных данных"
    except Exception as e:
        logger.error(f"File processing error: {str(e)}")
        raise


def create_interface():
    """Build the Gradio Blocks UI and wire up its callbacks.

    Returns:
        The ``gr.Blocks`` application (not yet launched).
    """
    control = ProcessControl()

    with gr.Blocks(theme=gr.themes.Soft()) as app:
        gr.Markdown("# AI-анализ мониторинга новостей v.1.24+")

        with gr.Row():
            file_input = gr.File(
                label="Загрузите Excel файл",
                file_types=[".xlsx"],
                type="binary",
            )

        with gr.Row():
            with gr.Column(scale=1):
                analyze_btn = gr.Button(
                    "▶️ Начать анализ",
                    variant="primary",
                    size="lg",
                )
            with gr.Column(scale=1):
                stop_btn = gr.Button(
                    "⏹️ Остановить",
                    variant="stop",
                    size="lg",
                )

        with gr.Row():
            progress = gr.Textbox(
                label="Статус обработки",
                interactive=False,
                value="Ожидание файла...",
            )

        with gr.Row():
            stats = gr.DataFrame(
                label="Результаты анализа",
                interactive=False,
                wrap=True,
            )

        with gr.Row():
            with gr.Column(scale=1):
                sentiment_plot = gr.Plot(label="Распределение тональности")
            with gr.Column(scale=1):
                events_plot = gr.Plot(label="Распределение событий")

        def stop_processing():
            """Request a stop; takes effect at the next batch boundary."""
            control.request_stop()
            return "Остановка обработки..."

        @spaces.GPU(duration=300)  # 5 minutes for the entire analysis
        def analyze(file_bytes):
            """Stream analysis results for the uploaded workbook.

            Generator yielding ``(table, sentiment_fig, events_fig, status)``
            tuples for the four output components. Every outcome, including
            the early exit, the final result and errors, is yielded, because
            a generator's ``return value`` never reaches the UI.
            """
            if file_bytes is None:
                gr.Warning("Пожалуйста, загрузите файл")
                yield None, None, None, "Ожидание файла..."
                return

            try:
                control.reset()

                file_obj = io.BytesIO(file_bytes)
                logger.info("File loaded into BytesIO successfully")

                detector = EventDetector()

                @spaces.GPU(duration=30)
                def init_models():
                    return detector.initialize_models()

                if not init_models():
                    raise RuntimeError("Failed to initialize models")

                @spaces.GPU(duration=20)
                def process_batch(batch, entity):
                    event_type, event_summary = detector.detect_events(
                        batch, entity
                    )
                    time.sleep(1)  # Wait between GPU operations
                    sentiment = detector.analyze_sentiment(batch)
                    return event_type, event_summary, sentiment

                df = pd.read_excel(file_obj, sheet_name=SHEET_NAME)
                original_count = len(df)
                df = fuzzy_deduplicate(
                    df, TEXT_COLUMN, threshold=DEDUP_THRESHOLD
                )
                logger.info(
                    f"Removed {original_count - len(df)} duplicate entries"
                )

                processed_rows = []
                total = len(df)

                for batch_start in range(0, total, BATCH_SIZE):
                    if control.should_stop():
                        break

                    batch = df.iloc[batch_start:batch_start + BATCH_SIZE]

                    for idx, row in batch.iterrows():
                        try:
                            text = _cell_text(row.get(TEXT_COLUMN, ''))
                            entity = _cell_text(row.get(ENTITY_COLUMN, ''))
                            if not text or not entity:
                                continue

                            event_type, event_summary, sentiment = (
                                process_batch(text, entity)
                            )
                            processed_rows.append(_result_row(
                                row, entity, text,
                                sentiment, event_type, event_summary,
                            ))
                        except Exception as e:
                            logger.error(
                                f"Error processing row {idx}: {str(e)}"
                            )
                            continue

                    if processed_rows:
                        result_df = pd.DataFrame(processed_rows)
                        fig_sentiment, fig_events = create_visualizations(
                            result_df
                        )
                        yield (
                            result_df,
                            fig_sentiment,
                            fig_events,
                            f"Обработано {len(processed_rows)}/{total} строк",
                        )

                    torch.cuda.empty_cache()
                    time.sleep(2)

                if processed_rows:
                    final_df = pd.DataFrame(processed_rows)
                    fig_sentiment, fig_events = create_visualizations(final_df)
                    status = (
                        "Обработка остановлена пользователем"
                        if control.should_stop()
                        else "Обработка завершена!"
                    )
                    yield final_df, fig_sentiment, fig_events, status
                else:
                    yield None, None, None, "Нет обработанных данных"
            except Exception as e:
                error_msg = f"Ошибка анализа: {str(e)}"
                logger.error(error_msg)
                # gr.Error only has an effect when raised, which would
                # discard the status text; show a toast and keep the message.
                gr.Warning(error_msg)
                yield None, None, None, error_msg

        stop_btn.click(fn=stop_processing, outputs=[progress])

        analyze_btn.click(
            fn=analyze,
            inputs=[file_input],
            outputs=[stats, sentiment_plot, events_plot, progress],
        )

    return app


if __name__ == "__main__":
    app = create_interface()
    app.launch(share=True)