import json
import logging
import re
from typing import Dict, List, Optional, Union
from pathlib import Path
from datetime import datetime
import asyncio

import anthropic
# Async client classes are used because every API call below is awaited.
from openai import AsyncOpenAI, AsyncAzureOpenAI
import pandas as pd

from src.config.config import (
    OPENAI_API_KEY,
    ANTHROPIC_API_KEY,
    AZURE_API_KEY,
    AZURE_ENDPOINT,
    AVAILABLE_MODELS,
    MAX_TOKENS,
    TEMPERATURE,
)
from src.prompts import (
    BASE_PROMPT,
    RISK_ASSESSMENT_PROMPT,
    YELLOW_PROMPT,
    RED_PROMPT_CLASS,
    RED_PROMPT,
)
from src.utils import (
    clean_json_response,
    validate_risk_assessment,
    validate_classification_response,
    validate_final_response,
    retry_with_backoff,
)

class LLMProcessor:
    """Processes data through the different LLM provider APIs."""

    def __init__(self):
        """Initialize the processor."""
        self.client_openai = None
        self.client_anthropic = None
        self.client_azure = None
        self.category_instructions = {}

        self.initialize_clients()
        self.load_category_instructions()

    def initialize_clients(self) -> None:
        """Initialize the API clients with keys from the configuration."""
        try:
            if OPENAI_API_KEY:
                self.client_openai = AsyncOpenAI(api_key=OPENAI_API_KEY)
                logging.info("OpenAI client initialized")
            else:
                logging.warning("OpenAI API key not found")

            if ANTHROPIC_API_KEY:
                self.client_anthropic = anthropic.AsyncAnthropic(api_key=ANTHROPIC_API_KEY)
                logging.info("Anthropic client initialized")
            else:
                logging.warning("Anthropic API key not found")

            if AZURE_API_KEY and AZURE_ENDPOINT:
                self.client_azure = AsyncAzureOpenAI(
                    azure_endpoint=AZURE_ENDPOINT,
                    api_key=AZURE_API_KEY,
                    api_version="2024-05-01-preview",
                )
                logging.info("Azure OpenAI client initialized")
            else:
                logging.warning("Azure OpenAI credentials not found")

        except Exception as e:
            logging.error(f"Error initializing clients: {e}")

    def load_category_instructions(self) -> None:
        """Load the per-category guidance instructions from disk."""
        try:
            guidance_path = Path(__file__).parent / "high-risk-symptoms-guidance"
            if not guidance_path.exists():
                logging.error("High risk symptoms guidance directory not found")
                return

            # Each *.txt file holds the guidance for one category; the file
            # stem serves as the category name.
            for file_path in guidance_path.glob("*.txt"):
                category_name = file_path.stem
                try:
                    with open(file_path, 'r', encoding='utf-8') as f:
                        self.category_instructions[category_name] = f.read().strip()
                    logging.info(f"Loaded instructions for category: {category_name}")
                except Exception as e:
                    logging.error(f"Error loading instructions for {category_name}: {e}")

            logging.info(f"Loaded {len(self.category_instructions)} category instructions")

        except Exception as e:
            logging.error(f"Error loading category instructions: {e}")

    def get_category_instruction(self, category: str) -> str:
        """Return the guidance instruction for a category, with a safe fallback."""
        instruction = self.category_instructions.get(category)
        if not instruction:
            logging.warning(f"No instructions found for category: {category}")
            return "No specific guidance available for this category."
        return instruction

    def create_prompt(self, patient_summary: str, chat_context: str,
                      mb_recommendation: str, prompt_template: str,
                      classification: Optional[dict] = None) -> str:
        """
        Build a prompt for the LLM from a template.

        Args:
            patient_summary: Information about the patient
            chat_context: The conversation history
            mb_recommendation: The MB recommendation
            prompt_template: The prompt template
            classification: Classification results, used only in the red scenario

        Returns:
            str: The assembled prompt
        """
        logging.info("Creating prompt with template...")

        # Fill the template placeholders; see the sketch after this method.
        prompt = prompt_template.replace('{{context}}', patient_summary)
        prompt = prompt.replace('{{history}}', chat_context)
        prompt = prompt.replace('{{notifications}}', mb_recommendation)

        if classification:
            # Collect the guidance text for every category flagged as relevant.
            categories = classification.get('RelevantCategories', [])
            category_instructions = {}

            for category in categories:
                instruction = self.get_category_instruction(category)
                category_instructions[category] = instruction
                logging.debug(f"Added instruction for category: {category}")

            prompt = prompt.replace('{{high_risk_level}}',
                                    str(classification.get('HighRiskLevel', '')))
            prompt = prompt.replace('{{relevant_categories}}',
                                    json.dumps(category_instructions, ensure_ascii=False))
            prompt = prompt.replace('{{reason}}',
                                    classification.get('Reason', ''))

        return prompt
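
    # A minimal sketch of the placeholder convention create_prompt expects.
    # The template text below is hypothetical; the real templates
    # (RISK_ASSESSMENT_PROMPT, YELLOW_PROMPT, ...) live in src.prompts and
    # may differ:
    #
    #     template = (
    #         "Patient summary:\n{{context}}\n\n"
    #         "Conversation history:\n{{history}}\n\n"
    #         "Pending notification:\n{{notifications}}"
    #     )
    #     prompt = processor.create_prompt(summary, history, recommendation, template)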

    @retry_with_backoff
    async def _process_with_provider(self, prompt: str, provider: str,
                                     prompt_type: str = "response") -> str:
        """Run a prompt through the given provider and validate the response."""
        logging.info(f"Processing with {provider}, prompt type: {prompt_type}")

        try:
            # Token and temperature limits are keyed by prompt type and fall
            # back to the "response" defaults.
            max_tokens = MAX_TOKENS.get(prompt_type, MAX_TOKENS["response"])
            temperature = TEMPERATURE.get(prompt_type, TEMPERATURE["response"])

            if provider == 'anthropic':
                response = await self.process_with_anthropic(prompt, max_tokens, temperature)
            elif provider == 'openai':
                response = await self.process_with_openai(prompt, max_tokens, temperature)
            elif provider == 'azure':
                response = await self.process_with_azure(prompt, max_tokens, temperature)
            else:
                raise ValueError(f"Unknown API provider: {provider}")

            logging.debug(f"Raw {prompt_type} response: {response}")

            cleaned_response = clean_json_response(response)
            logging.debug(f"Cleaned {prompt_type} response: {cleaned_response}")

            # Each prompt type has its own response schema, checked by the
            # matching validator.
            if prompt_type == "risk":
                if not validate_risk_assessment(cleaned_response):
                    raise ValueError("Invalid risk assessment response format")
            elif prompt_type == "classification":
                if not validate_classification_response(cleaned_response):
                    raise ValueError("Invalid classification response format")
            else:
                if not validate_final_response(cleaned_response):
                    raise ValueError("Invalid final response format")

            logging.info(f"Response from {provider} ({prompt_type}) validated successfully")
            return cleaned_response

        except Exception as e:
            logging.error(f"Error in {prompt_type} processing: {e}")
            raise

    @retry_with_backoff
    async def process_with_anthropic(self, prompt: str, max_tokens: int, temperature: float) -> str:
        """Process a prompt through the Anthropic API."""
        if not self.client_anthropic:
            raise ValueError("Anthropic client not initialized")

        try:
            message = await self.client_anthropic.messages.create(
                model=AVAILABLE_MODELS["anthropic"],
                max_tokens=max_tokens,
                temperature=temperature,
                messages=[{
                    "role": "user",
                    "content": [{
                        "type": "text",
                        "text": prompt
                    }]
                }]
            )

            response = message.content[0].text

            logging.debug(f"Raw Anthropic response: {response}")
            return response

        except Exception as e:
            logging.error(f"Error in Anthropic API call: {e}")
            raise

    @retry_with_backoff
    async def process_with_openai(self, prompt: str, max_tokens: int,
                                  temperature: float) -> str:
        """Process a prompt through the OpenAI API."""
        if not self.client_openai:
            raise ValueError("OpenAI client not initialized")

        try:
            response = await self.client_openai.chat.completions.create(
                model=AVAILABLE_MODELS["openai"],
                messages=[
                    {"role": "user", "content": prompt}
                ],
                temperature=temperature,
                max_tokens=max_tokens,
                response_format={"type": "json_object"}
            )
            return response.choices[0].message.content
        except Exception as e:
            logging.error(f"Error in OpenAI API call: {e}")
            raise

    @retry_with_backoff
    async def process_with_azure(self, prompt: str, max_tokens: int,
                                 temperature: float) -> str:
        """Process a prompt through the Azure OpenAI API."""
        if not self.client_azure:
            raise ValueError("Azure OpenAI client not initialized")

        try:
            response = await self.client_azure.chat.completions.create(
                model=AVAILABLE_MODELS["azure"],
                messages=[
                    {"role": "user", "content": prompt}
                ],
                response_format={"type": "json_object"},
                temperature=temperature,
                max_tokens=max_tokens
            )
            return response.choices[0].message.content
        except Exception as e:
            logging.error(f"Error in Azure API call: {e}")
            raise

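    # Scenario routing implemented by process_single_row, driven by the
    # risk-assessment response:
    #
    #   high_risk_level == False, agree_with_mb == True   -> GREEN
    #       (pass the MB recommendation through unchanged)
    #   high_risk_level == False, agree_with_mb == False  -> YELLOW
    #       (regenerate the recommendation with YELLOW_PROMPT)
    #   high_risk_level == True                           -> RED
    #       (classify the risk with RED_PROMPT_CLASS, then respond with
    #        RED_PROMPT plus the per-category guidance)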
    async def process_single_row(self, patient_summary: str, chat_context: str,
                                 mb_recommendation: str, provider: str) -> Dict:
        """
        Process a single data row, recording every intermediate step.

        Args:
            patient_summary: Information about the patient
            chat_context: The conversation history
            mb_recommendation: The MB recommendation
            provider: The LLM provider to use

        Returns:
            Dict: Processing results, including the intermediate data
        """
        try:
            logging.info(f"Starting process_single_row with provider: {provider}")
            logging.debug(f"Input data lengths - summary: {len(patient_summary)}, "
                          f"context: {len(chat_context)}, "
                          f"recommendation: {len(mb_recommendation)}")

            processing_details = {
                'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
                'provider': provider,
                'stages': {},
                'mb_recommendation': mb_recommendation
            }

            # Stage 1: the risk assessment decides which scenario applies.
            logging.info("Creating risk assessment prompt...")
            risk_prompt = self.create_prompt(
                patient_summary, chat_context, mb_recommendation,
                RISK_ASSESSMENT_PROMPT
            )
            logging.debug(f"Risk assessment prompt created, length: {len(risk_prompt)}")

            logging.info("Processing risk assessment...")
            risk_response = await self._process_with_provider(
                risk_prompt, provider, "risk"
            )

            processing_details['stages']['risk_assessment'] = {
                'prompt': risk_prompt,
                'response': risk_response
            }

            risk_data = json.loads(risk_response)
            logging.info(f"Risk assessment result: {json.dumps(risk_data, indent=2)}")

            if not risk_data['high_risk_level']:
                if risk_data['agree_with_mb']:
                    # GREEN: no high risk and the model agrees with MB, so the
                    # original recommendation is passed through unchanged.
                    logging.info("Processing GREEN scenario")
                    processing_details['scenario'] = 'GREEN'
                    mb_data = json.loads(mb_recommendation)
                    final_response = [{
                        "Id": mb_data.get('Id', ''),
                        "NotificationPriority": mb_data.get('NotificationPriority', '3'),
                        "Direction": mb_data.get('Direction', ''),
                        "Message": mb_data.get('Message', {}),
                        "HighRisk": False,
                        "Reason": risk_data['reason']
                    }]

                else:
                    # YELLOW: no high risk, but the model disagrees with MB,
                    # so the recommendation is regenerated.
                    logging.info("Processing YELLOW scenario")
                    processing_details['scenario'] = 'YELLOW'
                    yellow_prompt = self.create_prompt(
                        patient_summary, chat_context, mb_recommendation,
                        YELLOW_PROMPT
                    )

                    yellow_response = await self._process_with_provider(
                        yellow_prompt, provider, "final"
                    )

                    processing_details['stages']['yellow_scenario'] = {
                        'prompt': yellow_prompt,
                        'response': yellow_response
                    }

                    final_response = json.loads(yellow_response)

            else:
                # RED: high risk detected; first classify it, then build the
                # final answer with the category-specific guidance.
                logging.info("Processing RED scenario")
                processing_details['scenario'] = 'RED'

                logging.info("Creating classification prompt...")
                class_prompt = self.create_prompt(
                    patient_summary, chat_context, mb_recommendation,
                    RED_PROMPT_CLASS
                )

                logging.info("Processing classification...")
                class_response = await self._process_with_provider(
                    class_prompt, provider, "classification"
                )

                processing_details['stages']['red_classification'] = {
                    'prompt': class_prompt,
                    'response': class_response
                }

                class_data = json.loads(class_response)
                logging.info(f"Classification result: {json.dumps(class_data, indent=2)}")

                logging.info("Creating final red scenario prompt...")
                red_prompt = self.create_prompt(
                    patient_summary, chat_context, mb_recommendation,
                    RED_PROMPT,
                    classification=class_data
                )

                logging.info("Processing final recommendations...")
                red_response = await self._process_with_provider(
                    red_prompt, provider, "final"
                )

                processing_details['stages']['red_final'] = {
                    'prompt': red_prompt,
                    'response': red_response
                }

                final_response = json.loads(red_response)

            processing_details['final_response'] = final_response
            logging.info("Processing completed successfully")

            return processing_details

        except Exception as e:
            logging.error(f"Error in process_single_row: {e}", exc_info=True)
            raise ValueError(f"Processing error: {e}") from e

    def process_json_file(self, file_path: Union[str, Path]) -> pd.DataFrame:
        """
        Process a JSON file and build a DataFrame from it.

        Args:
            file_path: Path to the JSON file

        Returns:
            pd.DataFrame: DataFrame with the processed data
        """
        try:
            logging.info(f"Processing JSON file: {file_path}")

            with open(file_path, 'r', encoding='utf-8') as file:
                json_data = json.load(file)

            if not self.validate_input_json(json_data):
                raise ValueError("Invalid JSON structure")

            messages = json_data.get('History', [])
            patient_summary = json_data.get('Context', '')

            # ISO-8601 timestamps with a uniform UTC offset sort
            # chronologically as plain strings.
            messages.sort(key=lambda x: x['Timestamp'])

            rows = []
            chat_context = []

            # Build one row per message; each row sees only the history that
            # precedes it, so the context grows as the loop advances.
            for msg in messages:
                try:
                    formatted_message = self.format_message(msg)
                    if formatted_message:
                        mb_recommendation = self.create_mb_recommendation(msg)

                        rows.append({
                            'PATIENT_SUMMARY': patient_summary,
                            'CHAT_CONTEXT': self.format_chat_history(chat_context),
                            'MB_RECOMMENDATION': mb_recommendation
                        })

                        chat_context.append(formatted_message)

                except Exception as e:
                    logging.error(f"Error processing message: {e}")
                    continue

            if not rows:
                raise ValueError("No valid messages found in file")

            logging.info(f"Processed {len(rows)} messages successfully")
            return pd.DataFrame(rows)

        except Exception as e:
            logging.error(f"Error processing file: {e}")
            raise

    def validate_input_json(self, data: Dict) -> bool:
        """Validate the structure of the input JSON."""
        try:
            if not isinstance(data, dict):
                logging.error("Input data is not a dictionary")
                return False

            required_fields = ['Context', 'History']
            if not all(field in data for field in required_fields):
                logging.error("Missing required fields in input JSON")
                return False

            if not isinstance(data['History'], list):
                logging.error("History must be a list")
                return False

            for message in data['History']:
                required_msg_fields = ['Timestamp', 'Direction', 'Subject', 'Body']
                if not all(field in message for field in required_msg_fields):
                    logging.error(f"Message missing required fields: {message}")
                    return False

            return True

        except Exception as e:
            logging.error(f"Error validating input JSON: {e}")
            return False
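
    # The minimal input shape accepted by validate_input_json; the values
    # below are illustrative only:
    #
    #     {
    #       "Context": "62-year-old patient, post-op day 3 ...",
    #       "History": [
    #         {
    #           "Timestamp": "2024-01-15T09:30:00.000000+0000",
    #           "Direction": "patient_to_office",
    #           "Subject": "Pain question",
    #           "Body": "My incision is more painful today."
    #         }
    #       ]
    #     }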

    def format_message(self, message: Dict) -> Optional[str]:
        """Format a single message for the chat history."""
        try:
            timestamp = datetime.strptime(
                message['Timestamp'],
                '%Y-%m-%dT%H:%M:%S.%f%z'
            )

            return (
                f"{timestamp.strftime('%m/%d/%Y %H:%M:%S')} "
                f"{message['Direction']}:\n"
                f"{message['Subject']}\n"
                f"{message['Body']}"
            )

        except Exception as e:
            logging.error(f"Error formatting message: {e}")
            return None
    def format_chat_history(self, messages: List[str]) -> str:
        """Format the chat history with date separators and sender icons."""
        try:
            formatted_messages = []
            current_date = None

            for idx, msg in enumerate(messages, 1):
                if not isinstance(msg, str):
                    continue

                # Formatted messages start with "MM/DD/YYYY"; skip anything
                # that does not.
                date_match = re.match(r'(\d{2}/\d{2}/\d{4})', msg)
                if not date_match:
                    continue

                msg_date = date_match.group(1)

                # Emit a dated separator whenever the day changes.
                if msg_date != current_date:
                    current_date = msg_date
                    formatted_messages.append(f"\n📅 {current_date}\n{'─' * 40}")

                # Pick an icon from the sender part of the Direction field.
                if 'system_to' in msg:
                    icon = '🧠'
                elif 'provider_to' in msg:
                    icon = '💊'
                elif 'office_to' in msg:
                    icon = '🏥'
                elif 'patient_to' in msg:
                    icon = '👤'
                else:
                    icon = '❓'

                formatted_messages.append(f"{idx}. {icon} {msg}")

            return "\n".join(formatted_messages)

        except Exception as e:
            logging.error(f"Error formatting chat history: {e}")
            return ""

    def create_mb_recommendation(self, message: Dict) -> str:
        """Build the MB recommendation structure as a JSON string."""
        try:
            recommendation = {
                # Note: the hash()-based fallback Id is stable only within a
                # single process because of string-hash randomization.
                "Id": message.get('Id', str(hash(message['Timestamp']))[:8]),
                "NotificationPriority": message.get('NotificationPriority', '3'),
                "Direction": message['Direction'],
                "Timestamp": datetime.strptime(
                    message['Timestamp'],
                    '%Y-%m-%dT%H:%M:%S.%f%z'
                ).strftime('%m/%d/%Y %H:%M:%S'),
                "Message": {
                    "Subject": message['Subject'],
                    "Body": message['Body']
                }
            }

            return json.dumps(recommendation, indent=2, ensure_ascii=False)

        except Exception as e:
            logging.error(f"Error creating MB recommendation: {e}")
            raise
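

# Minimal usage sketch. The input path is hypothetical, and src.config.config
# must provide at least one valid API key. process_single_row is a coroutine,
# hence asyncio.run().
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    processor = LLMProcessor()
    df = processor.process_json_file("data/example_conversation.json")

    async def run_all() -> list:
        results = []
        for _, row in df.iterrows():
            result = await processor.process_single_row(
                row['PATIENT_SUMMARY'],
                row['CHAT_CONTEXT'],
                row['MB_RECOMMENDATION'],
                provider="openai",
            )
            results.append(result)
        return results

    all_results = asyncio.run(run_all())
    print(json.dumps([r['final_response'] for r in all_results],
                     ensure_ascii=False, indent=2))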