gprocess / app.py
pentarosarium's picture
v.1.20
680c2d5
raw
history blame
21.4 kB
import gradio as gr
import spaces
import pandas as pd
import torch
from transformers import pipeline, AutoModelForSeq2SeqLM, AutoTokenizer
import plotly.graph_objects as go
import logging
import io
from rapidfuzz import fuzz
import time
def fuzzy_deduplicate(df, column, threshold=55):
    """Drop rows whose text in *column* fuzzy-matches an earlier kept row.

    NaN entries are always kept. A non-NaN row survives only when its text
    scores below *threshold* (rapidfuzz ratio, 0-100) against every text
    that has already been kept. Returns the filtered DataFrame.
    """
    kept_positions = []
    kept_texts = []
    for position, value in enumerate(df[column]):
        if pd.isna(value):
            # Missing text cannot be compared; keep the row as-is.
            kept_positions.append(position)
            continue
        candidate = str(value)
        is_duplicate = any(
            fuzz.ratio(candidate, prior) >= threshold for prior in kept_texts
        )
        if not is_duplicate:
            kept_texts.append(candidate)
            kept_positions.append(position)
    return df.iloc[kept_positions]
# Module-level logging configuration: INFO level, module-named logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class ProcessControl:
    """Cooperative stop/error flag shared between the UI and the worker loop.

    FIX: the original file defined this class twice back to back; the first
    definition was dead code, immediately shadowed by this richer one.
    Only the effective (second) definition is kept.
    """

    def __init__(self):
        self.stop_requested = False  # set when the user asks to stop
        self.error = None            # last fatal error recorded, if any

    def request_stop(self):
        """Ask running work to stop at its next check point."""
        self.stop_requested = True

    def should_stop(self):
        """Return True once a stop (or fatal error) has been requested."""
        return self.stop_requested

    def reset(self):
        """Clear the stop flag and any recorded error before a new run."""
        self.stop_requested = False
        self.error = None

    def set_error(self, error):
        """Record a fatal error and request a stop."""
        self.error = error
        self.stop_requested = True
class EventDetector:
    """Event-type and sentiment analyzer for company news snippets.

    Lazily loads an mT5 seq2seq model (free-text event summary) and three
    sentiment pipelines (FinBERT, Twitter-RoBERTa, FinBERT-tone) whose votes
    are combined by majority. GPU calls are throttled via ``last_gpu_use``
    and the ``spaces.GPU`` decorators.

    FIX: ``analyze_sentiment`` called ``self.get_sentiment_label`` which was
    never defined, so every sentiment call raised AttributeError and fell
    into the "Neutral" fallback. The method is now implemented.
    """

    def __init__(self):
        try:
            # Only the tokenizer is loaded eagerly; the heavy models wait for
            # initialize_models() so GPU time is allocated on demand.
            self.model_name = "google/mt5-small"
            self.tokenizer = AutoTokenizer.from_pretrained(
                self.model_name,
                legacy=True
            )
            self.model = None
            self.finbert = None
            self.roberta = None
            self.finbert_tone = None
            self.last_gpu_use = 0  # timestamp of the last GPU call (throttling)
            self.initialized = False
            logger.info("EventDetector initialized successfully")
        except Exception as e:
            logger.error(f"Error in EventDetector initialization: {e}")
            raise

    @spaces.GPU(duration=30)
    def initialize_models(self):
        """Load the seq2seq model and the three sentiment pipelines.

        Returns True on success; on failure, cleans up partial state and
        re-raises.
        """
        if self.initialized:
            return True
        try:
            # Throttle: leave at least 2 seconds between GPU allocations.
            current_time = time.time()
            if current_time - self.last_gpu_use < 2:
                time.sleep(2)

            device = "cuda" if torch.cuda.is_available() else "cpu"
            logger.info(f"Initializing models on device: {device}")

            self.model = AutoModelForSeq2SeqLM.from_pretrained(self.model_name).to(device)

            # Initialize sentiment models with proper error handling
            try:
                self.finbert = pipeline(
                    "sentiment-analysis",
                    model="ProsusAI/finbert",
                    device=device,
                    truncation=True,
                    max_length=512
                )
            except Exception as e:
                logger.error(f"Error initializing finbert: {e}")
                raise

            try:
                self.roberta = pipeline(
                    "sentiment-analysis",
                    model="cardiffnlp/twitter-roberta-base-sentiment",
                    device=device,
                    truncation=True,
                    max_length=512
                )
            except Exception as e:
                logger.error(f"Error initializing roberta: {e}")
                raise

            try:
                self.finbert_tone = pipeline(
                    "sentiment-analysis",
                    model="yiyanghkust/finbert-tone",
                    device=device,
                    truncation=True,
                    max_length=512
                )
            except Exception as e:
                logger.error(f"Error initializing finbert_tone: {e}")
                raise

            self.last_gpu_use = time.time()
            self.initialized = True
            logger.info("All models initialized successfully")
            return True

        except Exception as e:
            self.initialized = False
            logger.error(f"Model initialization error: {str(e)}")
            # Clean up any partially initialized models
            self.cleanup()
            raise

    def cleanup(self):
        """Release model references and free cached GPU memory."""
        try:
            self.model = None
            self.finbert = None
            self.roberta = None
            self.finbert_tone = None
            torch.cuda.empty_cache()
            self.initialized = False
        except Exception as e:
            logger.error(f"Error in cleanup: {e}")

    def get_sentiment_label(self, result):
        """Normalize a HF pipeline result dict to Positive/Negative/Neutral.

        FIX: previously missing (see class docstring). Covers the label
        schemes of all three loaded models: ProsusAI/finbert and
        yiyanghkust/finbert-tone emit (positive|negative|neutral), while
        cardiffnlp/twitter-roberta-base-sentiment emits LABEL_0 (negative),
        LABEL_1 (neutral), LABEL_2 (positive).
        """
        label = str(result.get('label', '')).lower()
        if label in ('positive', 'label_2'):
            return "Positive"
        if label in ('negative', 'label_0'):
            return "Negative"
        return "Neutral"

    @spaces.GPU(duration=20)
    def detect_events(self, text, entity):
        """Classify news *text* about *entity*.

        Returns (event_type, summary) where event_type is one of the Russian
        categories "Отчетность" (reporting), "РЦБ" (securities), "Суд"
        (litigation) or "Нет" (none), chosen by keyword matching, and
        summary is the mT5 generation output.
        """
        if not text or not entity:
            return "Нет", "Invalid input"
        try:
            if not self.initialized:
                if not self.initialize_models():
                    return "Нет", "Model initialization failed"

            # Throttle consecutive GPU calls.
            current_time = time.time()
            if current_time - self.last_gpu_use < 2:
                time.sleep(2)

            text = text[:500]  # Truncate text
            prompt = f"""<s>Analyze the following news about {entity}:
Text: {text}
Task: Identify the main event type and provide a brief summary.</s>"""

            device = self.model.device
            inputs = self.tokenizer(
                prompt,
                return_tensors="pt",
                padding=True,
                truncation=True,
                max_length=512
            ).to(device)

            outputs = self.model.generate(
                **inputs,
                max_length=300,
                num_return_sequences=1,
                pad_token_id=self.tokenizer.pad_token_id,
                eos_token_id=self.tokenizer.eos_token_id
            )
            response = self.tokenizer.decode(outputs[0], skip_special_tokens=True)

            # Keyword-based event classification (Russian financial news).
            event_type = "Нет"
            if any(term in text.lower() for term in ['отчет', 'выручка', 'прибыль', 'ebitda']):
                event_type = "Отчетность"
            elif any(term in text.lower() for term in ['облигаци', 'купон', 'дефолт']):
                event_type = "РЦБ"
            elif any(term in text.lower() for term in ['суд', 'иск', 'арбитраж']):
                event_type = "Суд"

            self.last_gpu_use = time.time()
            return event_type, response

        except Exception as e:
            logger.error(f"Event detection error: {str(e)}")
            return "Нет", f"Error: {str(e)}"

    @spaces.GPU(duration=20)
    def analyze_sentiment(self, text):
        """Majority-vote sentiment over the three pipelines.

        Returns "Positive"/"Negative"/"Neutral"; requires at least 2 of 3
        models to agree, otherwise (or on any failure) returns "Neutral".
        """
        try:
            if not self.initialized:
                if not self.initialize_models():
                    return "Neutral"

            # Throttle consecutive GPU calls.
            current_time = time.time()
            if current_time - self.last_gpu_use < 2:
                time.sleep(2)

            truncated_text = text[:500]
            results = []

            try:
                inputs = [truncated_text]

                # Process each model separately with a short delay between
                # them to spread out GPU load.
                if self.finbert:
                    finbert_result = self.finbert(inputs, truncation=True, max_length=512)[0]
                    results.append(self.get_sentiment_label(finbert_result))
                    time.sleep(0.5)

                if self.roberta:
                    roberta_result = self.roberta(inputs, truncation=True, max_length=512)[0]
                    results.append(self.get_sentiment_label(roberta_result))
                    time.sleep(0.5)

                if self.finbert_tone:
                    finbert_tone_result = self.finbert_tone(inputs, truncation=True, max_length=512)[0]
                    results.append(self.get_sentiment_label(finbert_tone_result))

                # Majority vote: the top label must hold at least 2 votes.
                if results:
                    sentiment_counts = pd.Series(results).value_counts()
                    final_sentiment = sentiment_counts.index[0] if sentiment_counts.iloc[0] >= 2 else "Neutral"
                else:
                    final_sentiment = "Neutral"

                self.last_gpu_use = time.time()
                return final_sentiment

            except Exception as e:
                logger.error(f"Model inference error: {e}")
                return "Neutral"

        except Exception as e:
            logger.error(f"Sentiment analysis error: {e}")
            return "Neutral"
def create_visualizations(df):
    """Build a sentiment pie chart and an event-type bar chart from results.

    Returns a (sentiment_figure, events_figure) pair of Plotly figures, or
    (None, None) when *df* is None/empty or chart construction fails.
    """
    if df is None or df.empty:
        return None, None
    try:
        sentiment_counts = df['Sentiment'].value_counts()
        pie = go.Pie(
            labels=sentiment_counts.index,
            values=sentiment_counts.values,
            marker_colors=['#FF6B6B', '#4ECDC4', '#95A5A6'],
        )
        sentiment_fig = go.Figure(data=[pie])
        sentiment_fig.update_layout(title="Распределение тональности")

        event_counts = df['Event_Type'].value_counts()
        bar = go.Bar(
            x=event_counts.index,
            y=event_counts.values,
            marker_color='#2196F3',
        )
        events_fig = go.Figure(data=[bar])
        events_fig.update_layout(title="Распределение событий")

        return sentiment_fig, events_fig
    except Exception as e:
        logger.error(f"Visualization error: {e}")
        return None, None
@spaces.GPU
def process_file(file_obj):
    """Stream analysis of an Excel file (sheet 'Публикации').

    Generator: yields (dataframe, sentiment_fig, events_fig, status) tuples —
    intermediate progress after each batch, then a final tuple with charts.

    FIX: final results were previously produced with ``return value`` inside
    this generator; a generator's return value travels on StopIteration and
    is never seen by the consumer (Gradio renders only yielded values), so
    the finished state was silently dropped. Final results are now yielded.
    """
    try:
        logger.info("Starting to read Excel file...")
        df = pd.read_excel(file_obj, sheet_name='Публикации')
        logger.info(f"Successfully read Excel file. Shape: {df.shape}")

        # Deduplication
        original_count = len(df)
        df = fuzzy_deduplicate(df, 'Выдержки из текста', threshold=55)
        logger.info(f"Removed {original_count - len(df)} duplicate entries")

        detector = EventDetector()
        processed_rows = []
        total = len(df)

        # Process in small batches with GPU-quota management.
        BATCH_SIZE = 3       # reduced batch size to stay under quota
        QUOTA_WAIT_TIME = 60 # wait time (seconds) when quota is exceeded

        for batch_start in range(0, total, BATCH_SIZE):
            try:
                batch_end = min(batch_start + BATCH_SIZE, total)
                batch = df.iloc[batch_start:batch_end]

                # Initialize models lazily before the first batch.
                if not detector.initialized:
                    detector.initialize_models()
                    time.sleep(1)  # wait after initialization

                for idx, row in batch.iterrows():
                    try:
                        text = str(row.get('Выдержки из текста', ''))
                        if not text.strip():
                            continue
                        entity = str(row.get('Объект', ''))
                        if not entity.strip():
                            continue

                        # Defaults in case the GPU calls below fail.
                        event_type = "Нет"
                        event_summary = ""
                        sentiment = "Neutral"

                        try:
                            event_type, event_summary = detector.detect_events(text, entity)
                            time.sleep(1)  # wait between GPU operations
                            sentiment = detector.analyze_sentiment(text)
                        except Exception as e:
                            if "GPU quota" in str(e):
                                logger.warning("GPU quota exceeded, waiting...")
                                time.sleep(QUOTA_WAIT_TIME)
                                continue
                            else:
                                raise e

                        processed_rows.append({
                            'Объект': entity,
                            'Заголовок': str(row.get('Заголовок', '')),
                            'Sentiment': sentiment,
                            'Event_Type': event_type,
                            'Event_Summary': event_summary,
                            'Текст': text[:1000]
                        })
                        logger.info(f"Processed {idx + 1}/{total} rows")

                    except Exception as e:
                        logger.error(f"Error processing row {idx}: {str(e)}")
                        continue

                # Push intermediate results to the UI after each batch.
                if processed_rows:
                    intermediate_df = pd.DataFrame(processed_rows)
                    yield (
                        intermediate_df,
                        None,
                        None,
                        f"Обработано {len(processed_rows)}/{total} строк"
                    )

                # Wait between batches, then free cached GPU memory.
                time.sleep(2)
                torch.cuda.empty_cache()

            except Exception as e:
                logger.error(f"Batch processing error: {str(e)}")
                if "GPU quota" in str(e):
                    time.sleep(QUOTA_WAIT_TIME)
                    continue

        # Final results (yielded — see FIX note in the docstring).
        if processed_rows:
            result_df = pd.DataFrame(processed_rows)
            fig_sentiment, fig_events = create_visualizations(result_df)
            yield result_df, fig_sentiment, fig_events, "Обработка завершена!"
        else:
            yield None, None, None, "Нет обработанных данных"

    except Exception as e:
        logger.error(f"File processing error: {str(e)}")
        raise
def create_interface():
    """Build and return the Gradio Blocks app (upload -> analyze -> charts).

    FIX: the nested ``analyze`` handler is a generator (it yields progress),
    yet its no-file path, its final results and its error path used
    ``return value`` — a generator's return value is never delivered to
    Gradio, so those states were silently dropped. They are now yielded.
    """
    control = ProcessControl()

    with gr.Blocks(theme=gr.themes.Soft()) as app:
        gr.Markdown("# AI-анализ мониторинга новостей v.1.20")

        with gr.Row():
            file_input = gr.File(
                label="Загрузите Excel файл",
                file_types=[".xlsx"],
                type="binary"
            )

        with gr.Row():
            with gr.Column(scale=1):
                analyze_btn = gr.Button(
                    "▶️ Начать анализ",
                    variant="primary",
                    size="lg"
                )
            with gr.Column(scale=1):
                stop_btn = gr.Button(
                    "⏹️ Остановить",
                    variant="stop",
                    size="lg"
                )

        with gr.Row():
            progress = gr.Textbox(
                label="Статус обработки",
                interactive=False,
                value="Ожидание файла..."
            )

        with gr.Row():
            stats = gr.DataFrame(
                label="Результаты анализа",
                interactive=False,
                wrap=True
            )

        with gr.Row():
            with gr.Column(scale=1):
                sentiment_plot = gr.Plot(label="Распределение тональности")
            with gr.Column(scale=1):
                events_plot = gr.Plot(label="Распределение событий")

        def stop_processing():
            """Stop-button handler: raise the shared stop flag."""
            control.request_stop()
            return "Остановка обработки..."

        @spaces.GPU(duration=300)  # 5 minutes duration for the entire analysis
        def analyze(file_bytes):
            """Generator handler: stream (df, fig, fig, status) updates."""
            if file_bytes is None:
                gr.Warning("Пожалуйста, загрузите файл")
                yield None, None, None, "Ожидание файла..."
                return

            try:
                # Reset stop flag for a fresh run.
                control.reset()

                file_obj = io.BytesIO(file_bytes)
                logger.info("File loaded into BytesIO successfully")

                detector = EventDetector()

                # Initialize models with GPU
                @spaces.GPU(duration=30)
                def init_models():
                    return detector.initialize_models()

                if not init_models():
                    raise Exception("Failed to initialize models")

                # Per-row processing with its own GPU allocation.
                @spaces.GPU(duration=20)
                def process_batch(batch, entity):
                    event_type, event_summary = detector.detect_events(batch, entity)
                    time.sleep(1)  # wait between GPU operations
                    sentiment = detector.analyze_sentiment(batch)
                    return event_type, event_summary, sentiment

                # Read and deduplicate data
                df = pd.read_excel(file_obj, sheet_name='Публикации')
                original_count = len(df)
                df = fuzzy_deduplicate(df, 'Выдержки из текста', threshold=55)
                logger.info(f"Removed {original_count - len(df)} duplicate entries")

                processed_rows = []
                total = len(df)
                batch_size = 3

                for batch_start in range(0, total, batch_size):
                    if control.should_stop():
                        break

                    batch_end = min(batch_start + batch_size, total)
                    batch = df.iloc[batch_start:batch_end]

                    for idx, row in batch.iterrows():
                        try:
                            text = str(row.get('Выдержки из текста', '')).strip()
                            entity = str(row.get('Объект', '')).strip()
                            if not text or not entity:
                                continue

                            # Process with GPU
                            event_type, event_summary, sentiment = process_batch(text, entity)

                            processed_rows.append({
                                'Объект': entity,
                                'Заголовок': str(row.get('Заголовок', '')),
                                'Sentiment': sentiment,
                                'Event_Type': event_type,
                                'Event_Summary': event_summary,
                                'Текст': text[:1000]
                            })

                        except Exception as e:
                            logger.error(f"Error processing row {idx}: {str(e)}")
                            continue

                    # Push intermediate results to the UI after each batch.
                    if processed_rows:
                        result_df = pd.DataFrame(processed_rows)
                        fig_sentiment, fig_events = create_visualizations(result_df)
                        yield (
                            result_df,
                            fig_sentiment,
                            fig_events,
                            f"Обработано {len(processed_rows)}/{total} строк"
                        )

                    # Cleanup GPU resources after batch
                    torch.cuda.empty_cache()
                    time.sleep(2)

                # Final results (yielded — see FIX note in the docstring).
                if processed_rows:
                    final_df = pd.DataFrame(processed_rows)
                    fig_sentiment, fig_events = create_visualizations(final_df)
                    yield final_df, fig_sentiment, fig_events, "Обработка завершена!"
                else:
                    yield None, None, None, "Нет обработанных данных"

            except Exception as e:
                error_msg = f"Ошибка анализа: {str(e)}"
                logger.error(error_msg)
                # NOTE(review): gr.Error is instantiated but not raised here
                # (raising would abort the stream) — confirm this matches the
                # intended UX; the message is surfaced via the status box.
                gr.Error(error_msg)
                yield None, None, None, error_msg

        stop_btn.click(fn=stop_processing, outputs=[progress])

        analyze_btn.click(
            fn=analyze,
            inputs=[file_input],
            outputs=[stats, sentiment_plot, events_plot, progress]
        )

    return app
# Script entry point: build the UI and launch it (share=True requests a
# public Gradio link in addition to the local server).
if __name__ == "__main__":
    app = create_interface()
    app.launch(share=True)