DocUA's picture
Нарешті робочий варіант, принаймі для Антропік
228113c
import json
import re
from pathlib import Path
from typing import List, Tuple, Dict, Optional
import logging
import gradio as gr
import pandas as pd
from datetime import datetime
import asyncio
from src.processor import LLMProcessor
from src.utils import (
setup_logging, format_chat_history,
get_messages_from_json, format_mb_recommendation,
validate_json_structure, sanitize_input,
validate_mb_recommendation, get_scenario_type
)
class GradioInterface:
"""Клас для створення веб-інтерфейсу використовуючи Gradio."""
def __init__(self):
"""Ініціалізація інтерфейсу."""
setup_logging()
self.processor = LLMProcessor()
self.current_df = None
self.available_providers = ["anthropic", "openai", "azure"]
self.current_file_name = None
self.mb_indices = []
# Завантаження контенту допомоги
self.help_content = self._load_help_content()
# Зберігаємо поточну інформацію
self.current_patient_info = ""
self.current_chat_history = ""
def _load_help_content(self) -> str:
"""Завантаження контенту допомоги з файлу."""
try:
help_path = Path(__file__).parent.parent / "docs" / "HELP.md"
with open(help_path, 'r', encoding='utf-8') as file:
return file.read()
except Exception as e:
logging.error(f"Error loading help content: {str(e)}")
return "Help content is currently unavailable."
def _process_file(self, file_obj) -> Tuple[str, str, str, str]:
"""
Обробка завантаженого файлу.
Args:
file_obj: Об'єкт завантаженого файлу
Returns:
Tuple[str, str, str, str]: (Статус обробки, Інформація про файл,
Інформація про пацієнта, Історія чату)
"""
if file_obj is None:
return "Error: No file uploaded", "", "", ""
try:
# Отримання шляху до файлу
file_path = file_obj.name
self.current_file_name = Path(file_path).name
# Завантаження та валідація JSON
with open(file_path, 'r', encoding='utf-8') as file:
json_data = json.load(file)
if not validate_json_structure(json_data):
raise ValueError("Invalid JSON structure")
# Отримання повідомлень
messages = get_messages_from_json(json_data)
chat_history, self.mb_indices = format_chat_history(messages)
# Створення DataFrame
rows = []
chat_context = []
patient_summary = json_data.get('Context', '')
for msg in messages:
if isinstance(msg, str):
mb_recommendation = format_mb_recommendation(msg)
if validate_mb_recommendation(json.loads(mb_recommendation)):
rows.append({
'PATIENT_SUMMARY': patient_summary,
'CHAT_CONTEXT': format_chat_history(chat_context)[0],
'MB_RECOMMENDATION': mb_recommendation
})
chat_context.append(msg)
if not rows:
raise ValueError("No valid messages found")
self.current_df = pd.DataFrame(rows)
# Зберігаємо поточні значення
self.current_patient_info = patient_summary
self.current_chat_history = chat_history
# Формування інформації про файл
file_info = (
f"📁 File: {self.current_file_name}\n"
f"📊 Records: {len(messages)}\n"
f"🕒 Processed at: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n"
f"🤖 MB Messages: {len(self.mb_indices)}"
)
logging.info(f"File processed successfully: {self.current_file_name}")
return (
f"✓ File successfully processed. Found {len(self.current_df)} records.",
file_info,
patient_summary,
chat_history
)
except Exception as e:
error_msg = f"File processing error: {str(e)}"
logging.error(error_msg)
return error_msg, "", "", ""
async def _process_row(self, row_number: int, provider: str) -> Tuple[Dict, str]:
"""
Обробка одного рядка даних.
Args:
row_number: Номер рядка для обробки
provider: Провайдер для використання
Returns:
Tuple[Dict, str]: (Результати обробки, Статус обробки)
"""
logging.info(f"Starting _process_row with row_number: {row_number}, provider: {provider}")
if self.current_df is None:
logging.warning("No data loaded, current_df is None")
return {}, "⚠️ Please upload a file first"
try:
mb_index = row_number - 1
if mb_index not in self.mb_indices:
logging.warning(f"Row {row_number} is not an MB message. Valid indices: {self.mb_indices}")
return {}, f"⚠️ Row {row_number} is not a Medical Brain message"
logging.info(f"Processing row {row_number} (index {mb_index})")
row = self.current_df.iloc[mb_index]
logging.debug(f"Row data: {row.to_dict()}")
# Обробка рядка
logging.info("Calling processor.process_single_row...")
processing_details = await self.processor.process_single_row(
patient_summary=sanitize_input(str(row['PATIENT_SUMMARY'])),
chat_context=sanitize_input(str(row['CHAT_CONTEXT'])),
mb_recommendation=str(row['MB_RECOMMENDATION']),
provider=provider
)
logging.info("Received processing details")
logging.debug(f"Processing details: {json.dumps(processing_details, indent=2)}")
# Форматування результатів для відображення
display_results = {
'scenario': processing_details['scenario'],
'timestamp': processing_details['timestamp'],
'provider': provider,
'stages': {}
}
# Додавання проміжних результатів
for stage, data in processing_details['stages'].items():
logging.debug(f"Processing stage: {stage}")
display_results['stages'][stage] = {
'prompt': data['prompt'],
'response': json.loads(data['response'])
}
# Додавання фінальної відповіді
display_results['final_response'] = processing_details['final_response']
logging.info("Results formatting completed")
logging.debug(f"Final display results: {json.dumps(display_results, indent=2)}")
return display_results, "✓ Processing completed successfully"
except Exception as e:
error_msg = f"Error processing row: {str(e)}"
logging.error(error_msg, exc_info=True)
return {}, f"❌ {error_msg}"
def create_interface(self) -> gr.Blocks:
"""Створення інтерфейсу Gradio."""
with gr.Blocks(title="AI Assistant for Medical Brain",
theme=gr.themes.Soft()) as interface:
gr.Markdown("# AI Assistant for Medical Brain")
with gr.Tabs() as tabs:
# Основна вкладка
with gr.Tab("🔍 Main"):
with gr.Row():
# Ліва колонка (завантаження файлу)
with gr.Column(scale=1):
file_input = gr.File(
label="Upload JSON File",
file_types=[".json"]
)
file_status = gr.Textbox(label="Status")
file_info = gr.Textbox(label="File Information")
# Права колонка (дані пацієнта)
with gr.Column(scale=2):
patient_info = gr.TextArea(
label="Patient Information",
lines=10,
max_lines=15
)
chat_history = gr.TextArea(
label="Chat History",
lines=10,
max_lines=20
)
with gr.Row():
with gr.Column(scale=1):
row_number = gr.Number(
label="Message Number",
value=0,
minimum=1,
step=1
)
provider_dropdown = gr.Dropdown(
choices=self.available_providers,
label="Select Provider",
value=self.available_providers[0]
)
process_btn = gr.Button("Process", variant="primary")
with gr.Column(scale=2):
with gr.Tabs() as result_tabs:
with gr.Tab("Final Result"):
final_result = gr.JSON(
label="Processing Result"
)
with gr.Tab("Risk Assessment"):
risk_assessment = gr.JSON(
label="Risk Assessment Details"
)
with gr.Tab("Classification"):
classification = gr.JSON(
label="Risk Classification"
)
with gr.Tab("Raw Data"):
raw_data = gr.JSON(
label="Complete Processing Details"
)
# Вкладка допомоги
with gr.Tab("❓ Help"):
gr.Markdown(self.help_content)
# Події
file_input.change(
fn=self._process_file,
inputs=[file_input],
outputs=[file_status, file_info, patient_info, chat_history]
)
async def process_and_show_details(row_num: int, provider: str):
"""Обробка та відображення результатів."""
if not row_num or row_num < 1:
return {"error": "Invalid row number"}, {}, {}, {}
if not provider or provider not in self.available_providers:
return {"error": "Invalid provider"}, {}, {}, {}
try:
results, status = await self._process_row(row_num, provider)
if 'error' in results:
return results, {}, {}, {}
final_response = results.get('final_response', {})
risk_data = {}
classification_data = {}
stages = results.get('stages', {})
if 'risk_assessment' in stages:
risk_data = stages['risk_assessment'].get('response', {})
if 'red_classification' in stages:
classification_data = stages['red_classification'].get('response', {})
return final_response, risk_data, classification_data, results
except Exception as e:
error = {"error": str(e)}
return error, {}, {}, {}
process_btn.click(
fn=process_and_show_details,
inputs=[row_number, provider_dropdown],
outputs=[final_result, risk_assessment, classification, raw_data]
)
return interface
def launch_interface():
"""Запуск інтерфейсу."""
interface = GradioInterface()
app = interface.create_interface()
app.queue() # Додано для підтримки асинхронних операцій
app.launch(
server_name="0.0.0.0",
server_port=7860,
share=True,
show_error=True
)
if __name__ == "__main__":
launch_interface()