import gradio as gr import spaces import pandas as pd import torch from transformers import pipeline, AutoModelForSeq2SeqLM, AutoTokenizer import plotly.graph_objects as go import logging import io from rapidfuzz import fuzz import time def fuzzy_deduplicate(df, column, threshold=55): """Deduplicate rows based on fuzzy matching of text content""" seen_texts = [] indices_to_keep = [] for i, text in enumerate(df[column]): if pd.isna(text): indices_to_keep.append(i) continue text = str(text) if not seen_texts or all(fuzz.ratio(text, seen) < threshold for seen in seen_texts): seen_texts.append(text) indices_to_keep.append(i) return df.iloc[indices_to_keep] logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class ProcessControl: def __init__(self): self.stop_requested = False def request_stop(self): self.stop_requested = True def should_stop(self): return self.stop_requested def reset(self): self.stop_requested = False class ProcessControl: def __init__(self): self.stop_requested = False self.error = None def request_stop(self): self.stop_requested = True def should_stop(self): return self.stop_requested def reset(self): self.stop_requested = False self.error = None def set_error(self, error): self.error = error self.stop_requested = True class EventDetector: def __init__(self): try: self.model_name = "google/mt5-small" self.tokenizer = AutoTokenizer.from_pretrained( self.model_name, legacy=True ) self.model = None self.finbert = None self.roberta = None self.finbert_tone = None self.last_gpu_use = 0 self.initialized = False logger.info("EventDetector initialized successfully") except Exception as e: logger.error(f"Error in EventDetector initialization: {e}") raise @spaces.GPU(duration=30) def initialize_models(self): if self.initialized: return True try: current_time = time.time() if current_time - self.last_gpu_use < 2: time.sleep(2) device = "cuda" if torch.cuda.is_available() else "cpu" logger.info(f"Initializing models on device: {device}") self.model = AutoModelForSeq2SeqLM.from_pretrained(self.model_name).to(device) # Initialize sentiment models with proper error handling try: self.finbert = pipeline( "sentiment-analysis", model="ProsusAI/finbert", device=device, truncation=True, max_length=512 ) except Exception as e: logger.error(f"Error initializing finbert: {e}") raise try: self.roberta = pipeline( "sentiment-analysis", model="cardiffnlp/twitter-roberta-base-sentiment", device=device, truncation=True, max_length=512 ) except Exception as e: logger.error(f"Error initializing roberta: {e}") raise try: self.finbert_tone = pipeline( "sentiment-analysis", model="yiyanghkust/finbert-tone", device=device, truncation=True, max_length=512 ) except Exception as e: logger.error(f"Error initializing finbert_tone: {e}") raise self.last_gpu_use = time.time() self.initialized = True logger.info("All models initialized successfully") return True except Exception as e: self.initialized = False logger.error(f"Model initialization error: {str(e)}") # Clean up any partially initialized models self.cleanup() raise def cleanup(self): """Clean up GPU resources""" try: self.model = None self.finbert = None self.roberta = None self.finbert_tone = None torch.cuda.empty_cache() self.initialized = False except Exception as e: logger.error(f"Error in cleanup: {e}") @spaces.GPU(duration=20) def detect_events(self, text, entity): if not text or not entity: return "Нет", "Invalid input" try: if not self.initialized: if not self.initialize_models(): return "Нет", "Model initialization failed" current_time = time.time() if current_time - self.last_gpu_use < 2: time.sleep(2) text = text[:500] # Truncate text prompt = f"""Analyze the following news about {entity}: Text: {text} Task: Identify the main event type and provide a brief summary.""" device = self.model.device inputs = self.tokenizer( prompt, return_tensors="pt", padding=True, truncation=True, max_length=512 ).to(device) outputs = self.model.generate( **inputs, max_length=300, num_return_sequences=1, pad_token_id=self.tokenizer.pad_token_id, eos_token_id=self.tokenizer.eos_token_id ) response = self.tokenizer.decode(outputs[0], skip_special_tokens=True) # Event classification event_type = "Нет" if any(term in text.lower() for term in ['отчет', 'выручка', 'прибыль', 'ebitda']): event_type = "Отчетность" elif any(term in text.lower() for term in ['облигаци', 'купон', 'дефолт']): event_type = "РЦБ" elif any(term in text.lower() for term in ['суд', 'иск', 'арбитраж']): event_type = "Суд" self.last_gpu_use = time.time() return event_type, response except Exception as e: logger.error(f"Event detection error: {str(e)}") return "Нет", f"Error: {str(e)}" @spaces.GPU(duration=20) def analyze_sentiment(self, text): try: if not self.initialized: if not self.initialize_models(): return "Neutral" current_time = time.time() if current_time - self.last_gpu_use < 2: time.sleep(2) truncated_text = text[:500] results = [] try: inputs = [truncated_text] sentiment_results = [] # Process each model separately with delay if self.finbert: finbert_result = self.finbert(inputs, truncation=True, max_length=512)[0] results.append(self.get_sentiment_label(finbert_result)) time.sleep(0.5) if self.roberta: roberta_result = self.roberta(inputs, truncation=True, max_length=512)[0] results.append(self.get_sentiment_label(roberta_result)) time.sleep(0.5) if self.finbert_tone: finbert_tone_result = self.finbert_tone(inputs, truncation=True, max_length=512)[0] results.append(self.get_sentiment_label(finbert_tone_result)) # Get majority vote if results: sentiment_counts = pd.Series(results).value_counts() final_sentiment = sentiment_counts.index[0] if sentiment_counts.iloc[0] >= 2 else "Neutral" else: final_sentiment = "Neutral" self.last_gpu_use = time.time() return final_sentiment except Exception as e: logger.error(f"Model inference error: {e}") return "Neutral" except Exception as e: logger.error(f"Sentiment analysis error: {e}") return "Neutral" def create_visualizations(df): if df is None or df.empty: return None, None try: sentiments = df['Sentiment'].value_counts() fig_sentiment = go.Figure(data=[go.Pie( labels=sentiments.index, values=sentiments.values, marker_colors=['#FF6B6B', '#4ECDC4', '#95A5A6'] )]) fig_sentiment.update_layout(title="Распределение тональности") events = df['Event_Type'].value_counts() fig_events = go.Figure(data=[go.Bar( x=events.index, y=events.values, marker_color='#2196F3' )]) fig_events.update_layout(title="Распределение событий") return fig_sentiment, fig_events except Exception as e: logger.error(f"Visualization error: {e}") return None, None @spaces.GPU def process_file(file_obj): try: logger.info("Starting to read Excel file...") df = pd.read_excel(file_obj, sheet_name='Публикации') logger.info(f"Successfully read Excel file. Shape: {df.shape}") # Deduplication original_count = len(df) df = fuzzy_deduplicate(df, 'Выдержки из текста', threshold=55) logger.info(f"Removed {original_count - len(df)} duplicate entries") detector = EventDetector() processed_rows = [] total = len(df) # Process in smaller batches with quota management BATCH_SIZE = 3 # Reduced batch size QUOTA_WAIT_TIME = 60 # Wait time when quota is exceeded for batch_start in range(0, total, BATCH_SIZE): try: batch_end = min(batch_start + BATCH_SIZE, total) batch = df.iloc[batch_start:batch_end] # Initialize models for batch if not detector.initialized: detector.initialize_models() time.sleep(1) # Wait after initialization for idx, row in batch.iterrows(): try: text = str(row.get('Выдержки из текста', '')) if not text.strip(): continue entity = str(row.get('Объект', '')) if not entity.strip(): continue # Process with GPU quota management event_type = "Нет" event_summary = "" sentiment = "Neutral" try: event_type, event_summary = detector.detect_events(text, entity) time.sleep(1) # Wait between GPU operations sentiment = detector.analyze_sentiment(text) except Exception as e: if "GPU quota" in str(e): logger.warning("GPU quota exceeded, waiting...") time.sleep(QUOTA_WAIT_TIME) continue else: raise e processed_rows.append({ 'Объект': entity, 'Заголовок': str(row.get('Заголовок', '')), 'Sentiment': sentiment, 'Event_Type': event_type, 'Event_Summary': event_summary, 'Текст': text[:1000] }) logger.info(f"Processed {idx + 1}/{total} rows") except Exception as e: logger.error(f"Error processing row {idx}: {str(e)}") continue # Create intermediate results if processed_rows: intermediate_df = pd.DataFrame(processed_rows) yield ( intermediate_df, None, None, f"Обработано {len(processed_rows)}/{total} строк" ) # Wait between batches time.sleep(2) # Cleanup GPU resources after each batch torch.cuda.empty_cache() except Exception as e: logger.error(f"Batch processing error: {str(e)}") if "GPU quota" in str(e): time.sleep(QUOTA_WAIT_TIME) continue # Final results if processed_rows: result_df = pd.DataFrame(processed_rows) fig_sentiment, fig_events = create_visualizations(result_df) return result_df, fig_sentiment, fig_events, "Обработка завершена!" else: return None, None, None, "Нет обработанных данных" except Exception as e: logger.error(f"File processing error: {str(e)}") raise def create_interface(): control = ProcessControl() with gr.Blocks(theme=gr.themes.Soft()) as app: gr.Markdown("# AI-анализ мониторинга новостей v.1.20") with gr.Row(): file_input = gr.File( label="Загрузите Excel файл", file_types=[".xlsx"], type="binary" ) with gr.Row(): with gr.Column(scale=1): analyze_btn = gr.Button( "▶️ Начать анализ", variant="primary", size="lg" ) with gr.Column(scale=1): stop_btn = gr.Button( "⏹️ Остановить", variant="stop", size="lg" ) with gr.Row(): progress = gr.Textbox( label="Статус обработки", interactive=False, value="Ожидание файла..." ) with gr.Row(): stats = gr.DataFrame( label="Результаты анализа", interactive=False, wrap=True ) with gr.Row(): with gr.Column(scale=1): sentiment_plot = gr.Plot(label="Распределение тональности") with gr.Column(scale=1): events_plot = gr.Plot(label="Распределение событий") def stop_processing(): control.request_stop() return "Остановка обработки..." @spaces.GPU(duration=300) # 5 minutes duration for the entire analysis def analyze(file_bytes): if file_bytes is None: gr.Warning("Пожалуйста, загрузите файл") return None, None, None, "Ожидание файла..." try: # Reset stop flag control.reset() file_obj = io.BytesIO(file_bytes) logger.info("File loaded into BytesIO successfully") detector = EventDetector() # Initialize models with GPU @spaces.GPU(duration=30) def init_models(): return detector.initialize_models() if not init_models(): raise Exception("Failed to initialize models") # Process in batches with GPU allocation @spaces.GPU(duration=20) def process_batch(batch, entity): event_type, event_summary = detector.detect_events(batch, entity) time.sleep(1) # Wait between GPU operations sentiment = detector.analyze_sentiment(batch) return event_type, event_summary, sentiment # Read and deduplicate data df = pd.read_excel(file_obj, sheet_name='Публикации') original_count = len(df) df = fuzzy_deduplicate(df, 'Выдержки из текста', threshold=55) logger.info(f"Removed {original_count - len(df)} duplicate entries") processed_rows = [] total = len(df) batch_size = 3 for batch_start in range(0, total, batch_size): if control.should_stop(): break batch_end = min(batch_start + batch_size, total) batch = df.iloc[batch_start:batch_end] for idx, row in batch.iterrows(): try: text = str(row.get('Выдержки из текста', '')).strip() entity = str(row.get('Объект', '')).strip() if not text or not entity: continue # Process with GPU event_type, event_summary, sentiment = process_batch(text, entity) processed_rows.append({ 'Объект': entity, 'Заголовок': str(row.get('Заголовок', '')), 'Sentiment': sentiment, 'Event_Type': event_type, 'Event_Summary': event_summary, 'Текст': text[:1000] }) except Exception as e: logger.error(f"Error processing row {idx}: {str(e)}") continue # Create intermediate results if processed_rows: result_df = pd.DataFrame(processed_rows) fig_sentiment, fig_events = create_visualizations(result_df) yield ( result_df, fig_sentiment, fig_events, f"Обработано {len(processed_rows)}/{total} строк" ) # Cleanup GPU resources after batch torch.cuda.empty_cache() time.sleep(2) if processed_rows: final_df = pd.DataFrame(processed_rows) fig_sentiment, fig_events = create_visualizations(final_df) return final_df, fig_sentiment, fig_events, "Обработка завершена!" else: return None, None, None, "Нет обработанных данных" except Exception as e: error_msg = f"Ошибка анализа: {str(e)}" logger.error(error_msg) gr.Error(error_msg) return None, None, None, error_msg stop_btn.click(fn=stop_processing, outputs=[progress]) analyze_btn.click( fn=analyze, inputs=[file_input], outputs=[stats, sentiment_plot, events_plot, progress] ) return app if __name__ == "__main__": app = create_interface() app.launch(share=True)