import json import logging import re from typing import Dict, List, Optional, Union, Tuple from pathlib import Path from datetime import datetime import asyncio import anthropic from openai import OpenAI from openai import AzureOpenAI import pandas as pd from src.config.config import ( OPENAI_API_KEY, ANTHROPIC_API_KEY, AZURE_API_KEY, AZURE_ENDPOINT, AVAILABLE_MODELS, MAX_TOKENS, TEMPERATURE ) from src.prompts import ( BASE_PROMPT, RISK_ASSESSMENT_PROMPT, YELLOW_PROMPT, RED_PROMPT_CLASS, RED_PROMPT ) from src.utils import ( clean_json_response, validate_risk_assessment, validate_classification_response, validate_final_response, retry_with_backoff ) class LLMProcessor: """Клас для обробки даних через різні LLM API.""" def __init__(self): """Ініціалізація процесора.""" self.client_openai = None self.client_anthropic = None self.client_azure = None self.category_instructions = {} self.initialize_clients() self.load_category_instructions() def initialize_clients(self) -> None: """Ініціалізація клієнтів API з ключами з конфігурації.""" try: if OPENAI_API_KEY: self.client_openai = OpenAI(api_key=OPENAI_API_KEY) logging.info("OpenAI client initialized") else: logging.warning("OpenAI API key not found") if ANTHROPIC_API_KEY: self.client_anthropic = anthropic.Anthropic(api_key=ANTHROPIC_API_KEY) logging.info("Anthropic client initialized") else: logging.warning("Anthropic API key not found") if AZURE_API_KEY and AZURE_ENDPOINT: self.client_azure = AzureOpenAI( azure_endpoint=AZURE_ENDPOINT, api_key=AZURE_API_KEY, api_version="2024-05-01-preview" ) logging.info("Azure OpenAI client initialized") else: logging.warning("Azure OpenAI credentials not found") except Exception as e: logging.error(f"Error initializing clients: {str(e)}") def load_category_instructions(self) -> None: """Завантаження інструкцій для категорій.""" try: guidance_path = Path(__file__).parent / "high-risk-symptoms-guidance" if not guidance_path.exists(): logging.error("High risk symptoms guidance directory not found") return # Завантаження всіх txt файлів for file_path in guidance_path.glob("*.txt"): category_name = file_path.stem try: with open(file_path, 'r', encoding='utf-8') as f: self.category_instructions[category_name] = f.read().strip() logging.info(f"Loaded instructions for category: {category_name}") except Exception as e: logging.error(f"Error loading instructions for {category_name}: {e}") logging.info(f"Loaded {len(self.category_instructions)} category instructions") except Exception as e: logging.error(f"Error loading category instructions: {e}") def get_category_instruction(self, category: str) -> str: """Отримання інструкцій для категорії.""" instruction = self.category_instructions.get(category) if not instruction: logging.warning(f"No instructions found for category: {category}") return "No specific guidance available for this category." return instruction def create_prompt(self, patient_summary: str, chat_context: str, mb_recommendation: str, prompt_template: str, classification: Optional[dict] = None) -> str: """ Створення промпту для LLM. Args: patient_summary: Інформація про пацієнта chat_context: Історія спілкування mb_recommendation: Рекомендація MB prompt_template: Шаблон промпту classification: Результати класифікації для червоного сценарію Returns: str: Готовий промпт """ logging.info("Creating prompt with template...") prompt = prompt_template.replace('{{context}}', patient_summary) prompt = prompt.replace('{{history}}', chat_context) prompt = prompt.replace('{{notifications}}', mb_recommendation) if classification: # Додавання інструкцій для категорій categories = classification.get('RelevantCategories', []) category_instructions = {} for category in categories: instruction = self.get_category_instruction(category) category_instructions[category] = instruction logging.debug(f"Added instruction for category: {category}") prompt = prompt.replace('{{high_risk_level}}', str(classification.get('HighRiskLevel', ''))) prompt = prompt.replace('{{relevant_categories}}', json.dumps(category_instructions, ensure_ascii=False)) prompt = prompt.replace('{{reason}}', classification.get('Reason', '')) return prompt @retry_with_backoff async def _process_with_provider(self, prompt: str, provider: str, prompt_type: str = "response") -> str: """Обробка промпту через провайдера.""" logging.info(f"Processing with {provider}, prompt type: {prompt_type}") try: max_tokens = MAX_TOKENS.get(prompt_type, MAX_TOKENS["response"]) temperature = TEMPERATURE.get(prompt_type, TEMPERATURE["response"]) if provider == 'anthropic': response = await self.process_with_anthropic(prompt, max_tokens, temperature) elif provider == 'openai': response = await self.process_with_openai(prompt, max_tokens, temperature) elif provider == 'azure': response = await self.process_with_azure(prompt, max_tokens, temperature) else: raise ValueError(f"Unknown API provider: {provider}") # Детальне логування відповіді logging.debug(f"Raw {prompt_type} response: {response}") # Очищення та валідація відповіді cleaned_response = clean_json_response(response) logging.debug(f"Cleaned {prompt_type} response: {cleaned_response}") # Валідація відповідно до типу if prompt_type == "risk": if not validate_risk_assessment(cleaned_response): raise ValueError("Invalid risk assessment response format") elif prompt_type == "classification": if not validate_classification_response(cleaned_response): raise ValueError("Invalid classification response format") else: if not validate_final_response(cleaned_response): raise ValueError("Invalid final response format") logging.info(f"Response from {provider} ({prompt_type}) validated successfully") return cleaned_response except Exception as e: logging.error(f"Error in {prompt_type} processing: {str(e)}") raise @retry_with_backoff async def process_with_anthropic(self, prompt: str, max_tokens: int, temperature: float) -> str: """Обробка через Anthropic API.""" if not self.client_anthropic: raise ValueError("Anthropic client not initialized") try: message = self.client_anthropic.messages.create( model=AVAILABLE_MODELS["anthropic"], max_tokens=max_tokens, temperature=temperature, messages=[{ "role": "user", "content": [{ "type": "text", "text": prompt }] }] ) response = message.content[0].text # Логування сирої відповіді для діагностики logging.debug(f"Raw Anthropic response: {response}") return response except Exception as e: # У випадку помилки логувати деталі відповіді, якщо вони є logging.error(f"Error in Anthropic API call: {str(e)}. Raw response: {response if 'response' in locals() else 'No response received'}") raise @retry_with_backoff async def process_with_openai(self, prompt: str, max_tokens: int, temperature: float) -> str: """Обробка через OpenAI API.""" if not self.client_openai: raise ValueError("OpenAI client not initialized") try: response = await self.client_openai.chat.completions.create( model=AVAILABLE_MODELS["openai"], messages=[ {"role": "user", "content": prompt} ], temperature=temperature, max_tokens=max_tokens, response_format={"type": "json_object"} ) return response.choices[0].message.content except Exception as e: logging.error(f"Error in OpenAI API call: {str(e)}") raise @retry_with_backoff async def process_with_azure(self, prompt: str, max_tokens: int, temperature: float) -> str: """Обробка через Azure OpenAI API.""" if not self.client_azure: raise ValueError("Azure OpenAI client not initialized") try: response = await self.client_azure.chat.completions.create( model=AVAILABLE_MODELS["azure"], messages=[ {"role": "user", "content": prompt} ], response_format={"type": "json_object"}, temperature=temperature, max_tokens=max_tokens ) return response.choices[0].message.content except Exception as e: logging.error(f"Error in Azure API call: {str(e)}") raise async def process_single_row(self, patient_summary: str, chat_context: str, mb_recommendation: str, provider: str) -> Dict: """ Обробка одного рядка даних з деталізацією процесу. Args: patient_summary: Інформація про пацієнта chat_context: Історія спілкування mb_recommendation: Рекомендація MB provider: Провайдер LLM Returns: Dict: Результати обробки з проміжними даними """ try: logging.info(f"Starting process_single_row with provider: {provider}") logging.debug(f"Input data lengths - summary: {len(patient_summary)}, " f"context: {len(chat_context)}, " f"recommendation: {len(mb_recommendation)}") processing_details = { 'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'), 'provider': provider, 'stages': {}, 'mb_recommendation': mb_recommendation } # Етап 1: Оцінка ризику logging.info("Creating risk assessment prompt...") risk_prompt = self.create_prompt( patient_summary, chat_context, mb_recommendation, RISK_ASSESSMENT_PROMPT ) logging.debug(f"Risk assessment prompt created, length: {len(risk_prompt)}") logging.info("Processing risk assessment...") risk_response = await self._process_with_provider( risk_prompt, provider, "risk" ) processing_details['stages']['risk_assessment'] = { 'prompt': risk_prompt, 'response': risk_response } risk_data = json.loads(risk_response) logging.info(f"Risk assessment result: {json.dumps(risk_data, indent=2)}") # Визначення сценарію та подальша обробка if not risk_data['high_risk_level']: if risk_data['agree_with_mb']: # Зелений сценарій logging.info("Processing GREEN scenario") processing_details['scenario'] = 'GREEN' mb_data = json.loads(mb_recommendation) final_response = [{ "Id": mb_data.get('Id', ''), "NotificationPriority": mb_data.get('NotificationPriority', '3'), "Direction": mb_data.get('Direction', ''), "Message": mb_data.get('Message', {}), "HighRisk": False, "Reason": risk_data['reason'] }] else: # Жовтий сценарій logging.info("Processing YELLOW scenario") processing_details['scenario'] = 'YELLOW' yellow_prompt = self.create_prompt( patient_summary, chat_context, mb_recommendation, YELLOW_PROMPT ) yellow_response = await self._process_with_provider( yellow_prompt, provider, "final" ) processing_details['stages']['yellow_scenario'] = { 'prompt': yellow_prompt, 'response': yellow_response } final_response = json.loads(yellow_response) else: # Червоний сценарій logging.info("Processing RED scenario") processing_details['scenario'] = 'RED' # Крок 1: Класифікація ризику logging.info("Creating classification prompt...") class_prompt = self.create_prompt( patient_summary, chat_context, mb_recommendation, RED_PROMPT_CLASS ) logging.info("Processing classification...") class_response = await self._process_with_provider( class_prompt, provider, "classification" ) processing_details['stages']['red_classification'] = { 'prompt': class_prompt, 'response': class_response } class_data = json.loads(class_response) logging.info(f"Classification result: {json.dumps(class_data, indent=2)}") # Крок 2: Генерація фінальної відповіді logging.info("Creating final red scenario prompt...") red_prompt = self.create_prompt( patient_summary, chat_context, mb_recommendation, RED_PROMPT, classification=class_data ) logging.info("Processing final recommendations...") red_response = await self._process_with_provider( red_prompt, provider, "final" ) processing_details['stages']['red_final'] = { 'prompt': red_prompt, 'response': red_response } final_response = json.loads(red_response) # Додаємо фінальну відповідь до результатів processing_details['final_response'] = final_response logging.info("Processing completed successfully") return processing_details except Exception as e: logging.error(f"Error in process_single_row: {str(e)}", exc_info=True) raise ValueError(f"Processing error: {str(e)}") def process_json_file(self, file_path: Union[str, Path]) -> pd.DataFrame: """ Обробка JSON файлу та підготовка DataFrame. Args: file_path: Шлях до JSON файлу Returns: pd.DataFrame: DataFrame з обробленими даними """ try: logging.info(f"Processing JSON file: {file_path}") with open(file_path, 'r', encoding='utf-8') as file: json_data = json.load(file) # Валідація структури JSON if not self.validate_input_json(json_data): raise ValueError("Invalid JSON structure") messages = json_data.get('History', []) patient_summary = json_data.get('Context', '') # Сортування повідомлень за часом messages.sort(key=lambda x: x['Timestamp']) rows = [] chat_context = [] for msg in messages: try: # Форматування повідомлення formatted_message = self.format_message(msg) if formatted_message: mb_recommendation = self.create_mb_recommendation(msg) rows.append({ 'PATIENT_SUMMARY': patient_summary, 'CHAT_CONTEXT': self.format_chat_history(chat_context), 'MB_RECOMMENDATION': mb_recommendation }) chat_context.append(formatted_message) except Exception as e: logging.error(f"Error processing message: {str(e)}") continue if not rows: raise ValueError("No valid messages found in file") logging.info(f"Processed {len(rows)} messages successfully") return pd.DataFrame(rows) except Exception as e: logging.error(f"Error processing file: {str(e)}") raise def validate_input_json(self, data: Dict) -> bool: """Валідація вхідного JSON.""" try: if not isinstance(data, dict): logging.error("Input data is not a dictionary") return False required_fields = ['Context', 'History'] if not all(field in data for field in required_fields): logging.error("Missing required fields in input JSON") return False if not isinstance(data['History'], list): logging.error("History must be a list") return False for message in data['History']: required_msg_fields = ['Timestamp', 'Direction', 'Subject', 'Body'] if not all(field in message for field in required_msg_fields): logging.error(f"Message missing required fields: {message}") return False return True except Exception as e: logging.error(f"Error validating input JSON: {str(e)}") return False def format_message(self, message: Dict) -> Optional[str]: """Форматування повідомлення.""" try: timestamp = datetime.strptime( message['Timestamp'], '%Y-%m-%dT%H:%M:%S.%f%z' ) return ( f"{timestamp.strftime('%m/%d/%Y %H:%M:%S')} " f"{message['Direction']}:\n" f"{message['Subject']}\n" f"{message['Body']}" ) except Exception as e: logging.error(f"Error formatting message: {str(e)}") return None def format_chat_history(self, messages: List[str]) -> str: """Форматування історії чату.""" try: formatted_messages = [] current_date = None for idx, msg in enumerate(messages, 1): if not isinstance(msg, str): continue # Отримання дати date_match = re.match(r'(\d{2}/\d{2}/\d{4})', msg) if not date_match: continue msg_date = date_match.group(1) # Додавання роздільника дати if msg_date != current_date: current_date = msg_date formatted_messages.append(f"\n📅 {current_date}\n{'─' * 40}") # Визначення типу повідомлення if 'system_to' in msg: icon = '🧠' elif 'provider_to' in msg: icon = '💊' elif 'office_to' in msg: icon = '🏥' elif 'patient_to' in msg: icon = '👤' else: icon = '❓' formatted_messages.append(f"{idx}. {icon} {msg}") return "\n".join(formatted_messages) except Exception as e: logging.error(f"Error formatting chat history: {str(e)}") return "" def create_mb_recommendation(self, message: Dict) -> str: """Створення структури рекомендації MB.""" try: recommendation = { "Id": message.get('Id', str(hash(message['Timestamp']))[:8]), "NotificationPriority": message.get('NotificationPriority', '3'), "Direction": message['Direction'], "Timestamp": datetime.strptime( message['Timestamp'], '%Y-%m-%dT%H:%M:%S.%f%z' ).strftime('%m/%d/%Y %H:%M:%S'), "Message": { "Subject": message['Subject'], "Body": message['Body'] } } return json.dumps(recommendation, indent=2, ensure_ascii=False) except Exception as e: logging.error(f"Error creating MB recommendation: {str(e)}") raise