|
import json |
|
import re |
|
from pathlib import Path |
|
from typing import List, Tuple, Dict, Optional |
|
import logging |
|
import gradio as gr |
|
import pandas as pd |
|
from datetime import datetime |
|
import asyncio |
|
|
|
from src.processor import LLMProcessor |
|
from src.utils import ( |
|
setup_logging, format_chat_history, |
|
get_messages_from_json, format_mb_recommendation, |
|
validate_json_structure, sanitize_input, |
|
validate_mb_recommendation, get_scenario_type |
|
) |
|
|
|
class GradioInterface:
    """Gradio web interface for the Medical Brain AI assistant.

    The interface lets a user upload a JSON conversation export, browse the
    patient summary and chat history, and run a single Medical Brain (MB)
    message through ``LLMProcessor`` with a selectable provider.
    """

    def __init__(self):
        """Set up logging, the LLM processor and the per-upload state."""
        setup_logging()
        self.processor = LLMProcessor()
        self.current_df = None
        self.available_providers = ["anthropic", "openai", "azure"]
        self.current_file_name = None
        self.mb_indices = []
        # Maps the 0-based position of a message in the uploaded file to the
        # positional row of ``current_df`` that holds its recommendation.
        # ``current_df`` only contains validated MB messages, so the two
        # numberings differ as soon as the file contains any other message.
        self._df_row_by_message_index: Dict[int, int] = {}

        self.help_content = self._load_help_content()

        self.current_patient_info = ""
        self.current_chat_history = ""

    def _load_help_content(self) -> str:
        """Read the help page from ``docs/HELP.md`` next to the package.

        Returns:
            The file content, or a fallback message when the file cannot be
            read (the error is logged, never raised).
        """
        try:
            help_path = Path(__file__).parent.parent / "docs" / "HELP.md"
            with open(help_path, 'r', encoding='utf-8') as file:
                return file.read()
        except Exception as e:
            logging.error(f"Error loading help content: {str(e)}")
            return "Help content is currently unavailable."

    def _process_file(self, file_obj) -> Tuple[str, str, str, str]:
        """Load an uploaded JSON file and prepare it for processing.

        Args:
            file_obj: Value delivered by the ``gr.File`` component: either a
                path string / ``Path`` or an object exposing the path as
                ``.name``. ``None`` means nothing is uploaded.

        Returns:
            Tuple[str, str, str, str]: (processing status, file information,
            patient information, chat history). On failure the status holds
            the error text and the other three items are empty strings.
        """
        if file_obj is None:
            return "Error: No file uploaded", "", "", ""

        try:
            # Accept both a plain path and a tempfile-like wrapper.
            if isinstance(file_obj, (str, Path)):
                file_path = file_obj
            else:
                file_path = file_obj.name
            file_name = Path(file_path).name

            with open(file_path, 'r', encoding='utf-8') as file:
                json_data = json.load(file)

            if not validate_json_structure(json_data):
                raise ValueError("Invalid JSON structure")

            messages = get_messages_from_json(json_data)
            chat_history, mb_indices = format_chat_history(messages)

            rows = []
            row_by_message_index: Dict[int, int] = {}
            chat_context = []
            patient_summary = json_data.get('Context', '')

            for msg_index, msg in enumerate(messages):
                # String entries are treated as MB recommendations.
                if isinstance(msg, str):
                    mb_recommendation = format_mb_recommendation(msg)
                    if validate_mb_recommendation(json.loads(mb_recommendation)):
                        row_by_message_index[msg_index] = len(rows)
                        rows.append({
                            'PATIENT_SUMMARY': patient_summary,
                            # Context is everything that preceded this message.
                            'CHAT_CONTEXT': format_chat_history(chat_context)[0],
                            'MB_RECOMMENDATION': mb_recommendation
                        })
                chat_context.append(msg)

            if not rows:
                raise ValueError("No valid messages found")

            # Commit the new state only after the whole file was accepted, so
            # a failed upload never leaves the indices of one file combined
            # with the DataFrame of another.
            self.current_df = pd.DataFrame(rows)
            self.current_file_name = file_name
            self.mb_indices = mb_indices
            self._df_row_by_message_index = row_by_message_index
            self.current_patient_info = patient_summary
            self.current_chat_history = chat_history

            file_info = (
                f"📁 File: {self.current_file_name}\n"
                f"📊 Records: {len(messages)}\n"
                f"🕒 Processed at: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n"
                f"🤖 MB Messages: {len(self.mb_indices)}"
            )

            logging.info(f"File processed successfully: {self.current_file_name}")
            return (
                f"✓ File successfully processed. Found {len(self.current_df)} records.",
                file_info,
                patient_summary,
                chat_history
            )

        except Exception as e:
            error_msg = f"File processing error: {str(e)}"
            logging.error(error_msg)
            return error_msg, "", "", ""

    @staticmethod
    def _log_debug_json(label: str, payload) -> None:
        """Log ``payload`` as indented JSON at DEBUG level.

        Serialisation is skipped unless DEBUG is enabled, and values JSON
        cannot encode are stringified, so logging can never abort processing.
        """
        if logging.getLogger().isEnabledFor(logging.DEBUG):
            logging.debug("%s: %s", label, json.dumps(payload, indent=2, default=str))

    async def _process_row(self, row_number: int, provider: str) -> Tuple[Dict, str]:
        """Run one MB message through the LLM processor.

        Args:
            row_number: 1-based number of the message in the uploaded file.
            provider: Provider name passed on to the processor.

        Returns:
            Tuple[Dict, str]: (display results, status text). The dict is
            empty whenever processing did not succeed; the status then
            explains why.
        """
        logging.info(f"Starting _process_row with row_number: {row_number}, provider: {provider}")

        if self.current_df is None:
            logging.warning("No data loaded, current_df is None")
            return {}, "⚠️ Please upload a file first"

        try:
            # The UI may deliver a float; positional indexing needs an int.
            row_number = int(row_number)
            mb_index = row_number - 1
            if mb_index not in self.mb_indices:
                logging.warning(f"Row {row_number} is not an MB message. Valid indices: {self.mb_indices}")
                return {}, f"⚠️ Row {row_number} is not a Medical Brain message"

            # Translate the message index into the DataFrame position; the
            # DataFrame holds only validated MB messages.
            df_position = self._df_row_by_message_index.get(mb_index)
            if df_position is None:
                logging.warning(f"Row {row_number} has no valid MB recommendation")
                return {}, f"⚠️ Row {row_number} has no valid MB recommendation"

            logging.info(f"Processing row {row_number} (index {mb_index})")
            row = self.current_df.iloc[df_position]

            logging.debug(f"Row data: {row.to_dict()}")

            logging.info("Calling processor.process_single_row...")
            processing_details = await self.processor.process_single_row(
                patient_summary=sanitize_input(str(row['PATIENT_SUMMARY'])),
                chat_context=sanitize_input(str(row['CHAT_CONTEXT'])),
                mb_recommendation=str(row['MB_RECOMMENDATION']),
                provider=provider
            )
            logging.info("Received processing details")
            self._log_debug_json("Processing details", processing_details)

            display_results = {
                'scenario': processing_details['scenario'],
                'timestamp': processing_details['timestamp'],
                'provider': provider,
                'stages': {}
            }

            for stage, data in processing_details['stages'].items():
                logging.debug(f"Processing stage: {stage}")
                display_results['stages'][stage] = {
                    'prompt': data['prompt'],
                    # Stage responses arrive as JSON text; decode for display.
                    'response': json.loads(data['response'])
                }

            display_results['final_response'] = processing_details['final_response']
            logging.info("Results formatting completed")
            self._log_debug_json("Final display results", display_results)

            return display_results, "✓ Processing completed successfully"

        except Exception as e:
            error_msg = f"Error processing row: {str(e)}"
            logging.error(error_msg, exc_info=True)
            return {}, f"❌ {error_msg}"

    async def _process_and_show_details(self, row_num, provider: str):
        """Handle a click on "Process" and split the result over the tabs.

        Args:
            row_num: Message number entered in the UI.
            provider: Provider selected in the dropdown.

        Returns:
            A 4-tuple (final response, risk assessment, classification, raw
            results). On any failure the first item is ``{"error": ...}`` and
            the remaining three are empty dicts.
        """
        if not row_num or row_num < 1:
            return {"error": "Invalid row number"}, {}, {}, {}

        if not provider or provider not in self.available_providers:
            return {"error": "Invalid provider"}, {}, {}, {}

        try:
            results, status = await self._process_row(int(row_num), provider)
            if 'error' in results:
                return results, {}, {}, {}
            if not results:
                # _process_row reports failures through the status text only;
                # surface it instead of showing four empty panels.
                return {"error": status}, {}, {}, {}

            final_response = results.get('final_response', {})
            risk_data = {}
            classification_data = {}

            stages = results.get('stages', {})
            if 'risk_assessment' in stages:
                risk_data = stages['risk_assessment'].get('response', {})

            if 'red_classification' in stages:
                classification_data = stages['red_classification'].get('response', {})

            return final_response, risk_data, classification_data, results

        except Exception as e:
            error = {"error": str(e)}
            return error, {}, {}, {}

    def create_interface(self) -> gr.Blocks:
        """Build the Gradio Blocks layout and wire up its event handlers.

        Returns:
            The assembled ``gr.Blocks`` application (not yet launched).
        """
        with gr.Blocks(title="AI Assistant for Medical Brain",
                       theme=gr.themes.Soft()) as interface:
            gr.Markdown("# AI Assistant for Medical Brain")

            with gr.Tabs():

                with gr.Tab("🔍 Main"):
                    with gr.Row():

                        # Left column: upload and file status.
                        with gr.Column(scale=1):
                            file_input = gr.File(
                                label="Upload JSON File",
                                file_types=[".json"]
                            )
                            file_status = gr.Textbox(label="Status")
                            file_info = gr.Textbox(label="File Information")

                        # Right column: content of the uploaded file.
                        with gr.Column(scale=2):
                            patient_info = gr.TextArea(
                                label="Patient Information",
                                lines=10,
                                max_lines=15
                            )
                            chat_history = gr.TextArea(
                                label="Chat History",
                                lines=10,
                                max_lines=20
                            )

                    with gr.Row():
                        with gr.Column(scale=1):
                            row_number = gr.Number(
                                label="Message Number",
                                # Default must satisfy the minimum below.
                                value=1,
                                minimum=1,
                                step=1,
                                # Deliver an int rather than a float.
                                precision=0
                            )
                            provider_dropdown = gr.Dropdown(
                                choices=self.available_providers,
                                label="Select Provider",
                                value=self.available_providers[0]
                            )
                            process_btn = gr.Button("Process", variant="primary")

                        with gr.Column(scale=2):
                            with gr.Tabs():
                                with gr.Tab("Final Result"):
                                    final_result = gr.JSON(
                                        label="Processing Result"
                                    )
                                with gr.Tab("Risk Assessment"):
                                    risk_assessment = gr.JSON(
                                        label="Risk Assessment Details"
                                    )
                                with gr.Tab("Classification"):
                                    classification = gr.JSON(
                                        label="Risk Classification"
                                    )
                                with gr.Tab("Raw Data"):
                                    raw_data = gr.JSON(
                                        label="Complete Processing Details"
                                    )

                with gr.Tab("❓ Help"):
                    gr.Markdown(self.help_content)

            # Event listeners have to be registered inside the Blocks context.
            file_input.change(
                fn=self._process_file,
                inputs=[file_input],
                outputs=[file_status, file_info, patient_info, chat_history]
            )

            process_btn.click(
                fn=self._process_and_show_details,
                inputs=[row_number, provider_dropdown],
                outputs=[final_result, risk_assessment, classification, raw_data]
            )

        return interface
|
|
|
def launch_interface():
    """Build the Gradio application, enable its queue and start the server."""
    # NOTE(review): share=True asks Gradio for a public share link and
    # "0.0.0.0" binds on all network interfaces, while the UI displays
    # patient information - confirm this exposure is intended.
    launch_options = {
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "share": True,
        "show_error": True,
    }

    app = GradioInterface().create_interface()
    app.queue()
    app.launch(**launch_options)
|
|
|
# Start the web interface only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    launch_interface()