|
import json |
|
import logging |
|
import re |
|
import asyncio |
|
from datetime import datetime |
|
from pathlib import Path |
|
from typing import Dict, List, Optional, Union, Tuple |
|
import time |
|
import random |
|
|
|
from src.config.config import ( |
|
LOGS_DIR, |
|
MIN_DELAY, |
|
MAX_DELAY, |
|
MAX_RETRIES |
|
) |
|
|
|
def validate_risk_assessment(response: str) -> bool: |
|
"""Валідація відповіді оцінки ризику.""" |
|
try: |
|
data = json.loads(response) |
|
|
|
|
|
required_fields = ['high_risk_level', 'agree_with_mb', 'reason'] |
|
if not all(field in data for field in required_fields): |
|
logging.error("Missing required fields in risk assessment") |
|
return False |
|
|
|
|
|
if not isinstance(data['high_risk_level'], bool): |
|
logging.error("high_risk_level must be boolean") |
|
return False |
|
|
|
if not isinstance(data['agree_with_mb'], str): |
|
logging.error("agree_with_mb must be string") |
|
return False |
|
|
|
if data['agree_with_mb'] not in ['yes', 'no']: |
|
logging.error("agree_with_mb must be 'yes' or 'no'") |
|
return False |
|
|
|
if not isinstance(data['reason'], str) or not data['reason'].strip(): |
|
logging.error("reason must be non-empty string") |
|
return False |
|
|
|
return True |
|
|
|
except json.JSONDecodeError as e: |
|
logging.error(f"JSON decode error in risk assessment: {str(e)}") |
|
return False |
|
except Exception as e: |
|
logging.error(f"Validation error in risk assessment: {str(e)}") |
|
return False |
|
|
|
def validate_classification_response(response: str) -> bool: |
|
"""Валідація відповіді класифікації.""" |
|
try: |
|
data = json.loads(response) |
|
|
|
|
|
required_fields = ['HighRiskLevel', 'RelevantCategories', 'Reason'] |
|
if not all(field in data for field in required_fields): |
|
logging.error("Missing required fields in classification") |
|
return False |
|
|
|
|
|
if not isinstance(data['HighRiskLevel'], int): |
|
logging.error("HighRiskLevel must be integer") |
|
return False |
|
|
|
if not (7 <= data['HighRiskLevel'] <= 10): |
|
logging.error("HighRiskLevel must be between 7 and 10") |
|
return False |
|
|
|
|
|
if not isinstance(data['RelevantCategories'], list): |
|
logging.error("RelevantCategories must be a list") |
|
return False |
|
|
|
|
|
valid_categories = { |
|
'ALTERED_MENTAL_STATUS_CONFUSION', |
|
'ALLERGIC_REACTION_ANGIOEDEMA', |
|
'ANIMAL_OR_HUMAN_BITES', |
|
'BACK_PAIN', |
|
'DIZZINESS_LIGHTHEADEDNESS', |
|
'FLANK_PAIN', |
|
'HEMOPTYSIS_COUGHING_UP_BLOOD', |
|
'PELVIC_PAIN_IN_WOMEN', |
|
'SEIZURE', |
|
'SUDDEN_HEARING_LOSS', |
|
'SYNCOPE_NEAR_SYNCOPE_FAINTING', |
|
'TESTICULAR_PAIN_SWELLING', |
|
'UPPER_GI_BLEEDING_HEMATEMESIS', |
|
'ABDOMINAL_PAIN_IN_PREGNANCY', |
|
'DECREASED_FETAL_MOVEMENTS_IN_PREGNANCY', |
|
'HYPERTENSION_DISORDERS_IN_PREGNANCY', |
|
'NAUSEA_AND_VOMITING_IN_PREGNANCY', |
|
'SUSPECTED_LABOR_IN_PREGNANCY', |
|
'VAGINAL_BLEEDING_IN_PREGNANCY' |
|
} |
|
|
|
if data['RelevantCategories'] and not all( |
|
isinstance(cat, str) and cat in valid_categories |
|
for cat in data['RelevantCategories'] |
|
): |
|
logging.error("Invalid category format or unknown category") |
|
return False |
|
|
|
return True |
|
|
|
except json.JSONDecodeError as e: |
|
logging.error(f"JSON decode error in classification: {str(e)}") |
|
return False |
|
except Exception as e: |
|
logging.error(f"Validation error in classification: {str(e)}") |
|
return False |
|
|
|
def validate_final_response(response: str) -> bool: |
|
"""Валідація фінальної відповіді.""" |
|
try: |
|
data = json.loads(response) |
|
|
|
|
|
if not isinstance(data, list): |
|
logging.error("Response must be a list") |
|
return False |
|
|
|
|
|
for item in data: |
|
|
|
required_fields = ['Id', 'NotificationPriority', 'Direction', 'Message', 'HighRisk', 'Reason'] |
|
if not all(field in item for field in required_fields): |
|
logging.error(f"Missing required fields: {set(required_fields) - set(item.keys())}") |
|
return False |
|
|
|
|
|
if not isinstance(item['Message'], dict): |
|
logging.error("Message must be an object") |
|
return False |
|
|
|
if not all(field in item['Message'] for field in ['Subject', 'Body']): |
|
logging.error("Message must have Subject and Body") |
|
return False |
|
|
|
|
|
if not isinstance(item['Id'], str): |
|
logging.error("Id must be string") |
|
return False |
|
|
|
if not isinstance(item['NotificationPriority'], (str, int)): |
|
logging.error("NotificationPriority must be string or integer") |
|
return False |
|
|
|
if not isinstance(item['Direction'], str): |
|
logging.error("Direction must be string") |
|
return False |
|
|
|
|
|
valid_directions = {'system_to_patient', 'system_to_provider', 'system_to_office'} |
|
if item['Direction'] not in valid_directions: |
|
logging.error(f"Invalid direction: {item['Direction']}") |
|
return False |
|
|
|
if not isinstance(item['HighRisk'], bool): |
|
logging.error("HighRisk must be boolean") |
|
return False |
|
|
|
if not isinstance(item['Reason'], str) or not item['Reason'].strip(): |
|
logging.error("Reason must be non-empty string") |
|
return False |
|
|
|
return True |
|
|
|
except json.JSONDecodeError as e: |
|
logging.error(f"JSON decode error in final response: {str(e)}") |
|
return False |
|
except Exception as e: |
|
logging.error(f"Validation error in final response: {str(e)}") |
|
return False |
|
|
|
def clean_json_response(response: str) -> str: |
|
""" |
|
Очищення відповіді API від некоректних символів та форматування. |
|
|
|
Args: |
|
response: Відповідь від API |
|
|
|
Returns: |
|
str: Очищений JSON рядок |
|
""" |
|
try: |
|
|
|
response = ' '.join(response.split()) |
|
|
|
|
|
if response.startswith('{'): |
|
start = response.find('{') |
|
end = response.rfind('}') + 1 |
|
|
|
elif response.startswith('['): |
|
start = response.find('[') |
|
end = response.rfind(']') + 1 |
|
else: |
|
raise ValueError("No JSON structure found") |
|
|
|
|
|
json_str = response[start:end] |
|
|
|
|
|
json_str = ''.join(char for char in json_str if ord(char) >= 32) |
|
|
|
|
|
json_str = json_str.replace('"', '"') |
|
|
|
|
|
json_str = json_str.replace('\\', '\\\\') |
|
json_str = json_str.replace('\n', '\\n') |
|
|
|
|
|
json.loads(json_str) |
|
|
|
return json_str |
|
|
|
except Exception as e: |
|
logging.error(f"Error cleaning JSON response: {str(e)}") |
|
raise ValueError(f"JSON cleaning error: {str(e)}") |
|
|
|
|
|
def setup_logging() -> None: |
|
"""Налаштування системи логування.""" |
|
log_file = LOGS_DIR / f"api_calls_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log" |
|
log_file.parent.mkdir(parents=True, exist_ok=True) |
|
|
|
|
|
formatter = logging.Formatter( |
|
'%(asctime)s - %(levelname)s - %(filename)s:%(lineno)d - %(message)s' |
|
) |
|
|
|
|
|
file_handler = logging.FileHandler(str(log_file), encoding='utf-8') |
|
file_handler.setFormatter(formatter) |
|
file_handler.setLevel(logging.DEBUG) |
|
|
|
|
|
console_handler = logging.StreamHandler() |
|
console_handler.setFormatter(formatter) |
|
console_handler.setLevel(logging.INFO) |
|
|
|
|
|
root_logger = logging.getLogger() |
|
root_logger.setLevel(logging.DEBUG) |
|
root_logger.addHandler(file_handler) |
|
root_logger.addHandler(console_handler) |
|
|
|
logging.info("Logging system initialized") |
|
|
|
def retry_with_backoff(func): |
|
"""Декоратор для повторних спроб з експоненційною затримкою.""" |
|
async def wrapper(*args, **kwargs): |
|
for attempt in range(MAX_RETRIES): |
|
try: |
|
return await func(*args, **kwargs) |
|
except Exception as e: |
|
if attempt == MAX_RETRIES - 1: |
|
logging.error(f"All retry attempts exhausted. Last error: {str(e)}") |
|
raise |
|
|
|
delay = (2 ** attempt) + random.uniform(MIN_DELAY, MAX_DELAY) |
|
logging.warning(f"Attempt {attempt + 1} failed. Waiting {delay:.1f}s...") |
|
await asyncio.sleep(delay) |
|
return None |
|
return wrapper |
|
|
|
def format_message(message: Dict) -> Optional[str]: |
|
""" |
|
Форматування повідомлення з JSON структури. |
|
|
|
Args: |
|
message: Словник з даними повідомлення |
|
|
|
Returns: |
|
Optional[str]: Відформатоване повідомлення або None у випадку помилки |
|
""" |
|
try: |
|
|
|
required_fields = ['Timestamp', 'Direction', 'Subject', 'Body'] |
|
if not all(field in message for field in required_fields): |
|
logging.error(f"Missing required fields in message: {message}") |
|
return None |
|
|
|
|
|
timestamp = datetime.strptime(message['Timestamp'], '%Y-%m-%dT%H:%M:%S.%f%z') |
|
|
|
|
|
formatted_msg = ( |
|
f"{timestamp.strftime('%m/%d/%Y %H:%M:%S')} " |
|
f"{message['Direction']}:\n" |
|
f"{message.get('Subject', '').strip()}\n" |
|
f"{message.get('Body', '').strip()}" |
|
) |
|
|
|
return formatted_msg.strip() |
|
|
|
except Exception as e: |
|
logging.error(f"Error formatting message: {str(e)}") |
|
return None |
|
|
|
def format_chat_history(messages: List[str]) -> Tuple[str, List[int]]: |
|
""" |
|
Форматування історії чату з маркуванням повідомлень MB. |
|
|
|
Args: |
|
messages: Список повідомлень |
|
|
|
Returns: |
|
Tuple[str, List[int]]: (Форматована історія, Індекси повідомлень MB) |
|
""" |
|
try: |
|
formatted_messages = [] |
|
mb_indices = [] |
|
current_date = None |
|
|
|
for idx, msg in enumerate(messages): |
|
if not isinstance(msg, str): |
|
continue |
|
|
|
try: |
|
|
|
date_match = re.match(r'(\d{2}/\d{2}/\d{4})', msg) |
|
if not date_match: |
|
continue |
|
|
|
msg_date = date_match.group(1) |
|
|
|
|
|
if msg_date != current_date: |
|
current_date = msg_date |
|
formatted_messages.append(f"\n📅 {current_date}\n{'─' * 40}") |
|
|
|
|
|
if 'system_to' in msg: |
|
icon = '🧠' |
|
mb_indices.append(idx) |
|
elif 'provider_to' in msg: |
|
icon = '💊' |
|
elif 'office_to' in msg: |
|
icon = '🏥' |
|
elif 'patient_to' in msg: |
|
icon = '👤' |
|
else: |
|
icon = '❓' |
|
|
|
|
|
formatted_messages.append(f"{idx + 1}. {icon} {msg}") |
|
|
|
except Exception as e: |
|
logging.error(f"Error formatting message {idx}: {str(e)}") |
|
continue |
|
|
|
return "\n".join(formatted_messages), mb_indices |
|
|
|
except Exception as e: |
|
logging.error(f"Error formatting chat history: {str(e)}") |
|
return "", [] |
|
|
|
def get_messages_from_json(json_data: Dict) -> List[str]: |
|
""" |
|
Отримання повідомлень з JSON структури. |
|
|
|
Args: |
|
json_data: JSON дані |
|
|
|
Returns: |
|
List[str]: Список форматованих повідомлень |
|
""" |
|
try: |
|
messages = [] |
|
raw_messages = json_data.get('History', []) |
|
|
|
|
|
sorted_messages = sorted(raw_messages, key=lambda x: x['Timestamp']) |
|
|
|
for message in sorted_messages: |
|
try: |
|
formatted_msg = format_message(message) |
|
if formatted_msg: |
|
messages.append(formatted_msg) |
|
|
|
except Exception as e: |
|
logging.error(f"Error processing message: {str(e)}") |
|
continue |
|
|
|
return messages |
|
|
|
except Exception as e: |
|
logging.error(f"Error extracting messages: {str(e)}") |
|
return [] |
|
|
|
def format_mb_recommendation(msg: str) -> str: |
|
""" |
|
Форматування рекомендації MB у JSON структуру. |
|
|
|
Args: |
|
msg: Текст повідомлення |
|
|
|
Returns: |
|
str: JSON структура рекомендації |
|
""" |
|
try: |
|
|
|
pattern = r'^(\d{2}/\d{2}/\d{4} \d{2}:\d{2}:\d{2}) ([^:]+):\n(.+)' |
|
matches = re.findall(pattern, msg, flags=re.DOTALL) |
|
|
|
if not matches: |
|
raise ValueError("Invalid message format") |
|
|
|
timestamp, direction, content = matches[0] |
|
|
|
|
|
recommendation = { |
|
"Id": str(hash(msg))[:8], |
|
"NotificationPriority": "3", |
|
"Direction": direction, |
|
"Timestamp": timestamp, |
|
"Message": { |
|
"Subject": content.strip(), |
|
"Body": "" |
|
} |
|
} |
|
|
|
return json.dumps(recommendation, indent=2, ensure_ascii=False) |
|
|
|
except Exception as e: |
|
logging.error(f"Error formatting MB recommendation: {str(e)}") |
|
raise |
|
def validate_json_structure(data: Dict) -> bool: |
|
""" |
|
Валідація базової структури вхідного JSON. |
|
|
|
Args: |
|
data: JSON дані для перевірки |
|
|
|
Returns: |
|
bool: True якщо структура валідна, False інакше |
|
""" |
|
try: |
|
|
|
if not isinstance(data, dict): |
|
logging.error("Input data is not a dictionary") |
|
return False |
|
|
|
|
|
required_fields = ['Context', 'History'] |
|
if not all(field in data for field in required_fields): |
|
logging.error(f"Missing required fields: {set(required_fields) - set(data.keys())}") |
|
return False |
|
|
|
|
|
if not isinstance(data['History'], list): |
|
logging.error("History must be a list") |
|
return False |
|
|
|
|
|
for idx, message in enumerate(data['History']): |
|
if not isinstance(message, dict): |
|
logging.error(f"Message {idx} is not a dictionary") |
|
return False |
|
|
|
required_msg_fields = ['Timestamp', 'Direction', 'Subject', 'Body'] |
|
missing_fields = [field for field in required_msg_fields if field not in message] |
|
|
|
if missing_fields: |
|
logging.error(f"Message {idx} missing fields: {missing_fields}") |
|
return False |
|
|
|
|
|
try: |
|
datetime.strptime(message['Timestamp'], '%Y-%m-%dT%H:%M:%S.%f%z') |
|
except ValueError as e: |
|
logging.error(f"Invalid timestamp format in message {idx}: {str(e)}") |
|
return False |
|
|
|
|
|
if not message['Direction'].strip(): |
|
logging.error(f"Empty Direction in message {idx}") |
|
return False |
|
|
|
|
|
if not isinstance(message.get('Subject', ''), str): |
|
logging.error(f"Subject is not a string in message {idx}") |
|
return False |
|
|
|
if not isinstance(message.get('Body', ''), str): |
|
logging.error(f"Body is not a string in message {idx}") |
|
return False |
|
|
|
logging.info("JSON structure validation passed successfully") |
|
return True |
|
|
|
except Exception as e: |
|
logging.error(f"Error validating JSON structure: {str(e)}") |
|
return False |
|
|
|
def sanitize_input(text: str) -> str: |
|
""" |
|
Очищення введеного тексту від потенційно небезпечних символів. |
|
|
|
Args: |
|
text: Вхідний текст для очищення |
|
|
|
Returns: |
|
str: Очищений текст |
|
""" |
|
try: |
|
if not isinstance(text, str): |
|
logging.warning(f"Input is not a string, converting: {type(text)}") |
|
text = str(text) |
|
|
|
|
|
text = ''.join(char for char in text if ord(char) >= 32 or char in '\n\t') |
|
|
|
|
|
text = ' '.join(text.split()) |
|
|
|
|
|
text = text.replace('<!--', '').replace('-->', '') |
|
text = text.replace('<script', '').replace('</script>', '') |
|
|
|
|
|
text = text.replace('"', '"').replace('"', '"') |
|
text = text.replace(''', "'").replace(''', "'") |
|
|
|
|
|
text = text.strip('`~!@#$%^&*()_+-=[]{}\\|;:\'",.<>?/') |
|
|
|
return text.strip() |
|
|
|
except Exception as e: |
|
logging.error(f"Error sanitizing input: {str(e)}") |
|
return "" |
|
|
|
def validate_mb_recommendation(recommendation: Dict) -> bool: |
|
""" |
|
Валідація структури рекомендації Medical Brain. |
|
|
|
Args: |
|
recommendation: Словник з рекомендацією для перевірки |
|
|
|
Returns: |
|
bool: True якщо структура валідна, False інакше |
|
""" |
|
try: |
|
|
|
if not isinstance(recommendation, dict): |
|
logging.error("Recommendation is not a dictionary") |
|
return False |
|
|
|
|
|
required_fields = ['Id', 'NotificationPriority', 'Direction', 'Message', 'Timestamp'] |
|
if not all(field in recommendation for field in required_fields): |
|
missing = set(required_fields) - set(recommendation.keys()) |
|
logging.error(f"Missing required fields in recommendation: {missing}") |
|
return False |
|
|
|
|
|
message = recommendation.get('Message', {}) |
|
if not isinstance(message, dict): |
|
logging.error("Message must be a dictionary") |
|
return False |
|
|
|
message_fields = ['Subject', 'Body'] |
|
if not all(field in message for field in message_fields): |
|
missing = set(message_fields) - set(message.keys()) |
|
logging.error(f"Missing required fields in Message: {missing}") |
|
return False |
|
|
|
|
|
if not isinstance(recommendation['Id'], str): |
|
logging.error("Id must be a string") |
|
return False |
|
|
|
if not isinstance(recommendation['NotificationPriority'], (str, int)): |
|
logging.error("NotificationPriority must be string or integer") |
|
return False |
|
|
|
if not isinstance(recommendation['Direction'], str): |
|
logging.error("Direction must be a string") |
|
return False |
|
|
|
if not isinstance(message['Subject'], str): |
|
logging.error("Subject must be a string") |
|
return False |
|
|
|
if not isinstance(message['Body'], str): |
|
logging.error("Body must be a string") |
|
return False |
|
|
|
|
|
try: |
|
datetime.strptime(recommendation['Timestamp'], '%m/%d/%Y %H:%M:%S') |
|
except ValueError as e: |
|
logging.error(f"Invalid timestamp format: {str(e)}") |
|
return False |
|
|
|
|
|
if not recommendation['Id'].strip(): |
|
logging.error("Id cannot be empty") |
|
return False |
|
|
|
if isinstance(recommendation['NotificationPriority'], str): |
|
if not recommendation['NotificationPriority'].isdigit(): |
|
logging.error("NotificationPriority must be numeric") |
|
return False |
|
|
|
if not recommendation['Direction'].strip(): |
|
logging.error("Direction cannot be empty") |
|
return False |
|
|
|
logging.debug("MB recommendation validation passed") |
|
return True |
|
|
|
except Exception as e: |
|
logging.error(f"Error validating MB recommendation: {str(e)}") |
|
return False |
|
|
|
def get_scenario_type(risk_assessment: Dict) -> str: |
|
""" |
|
Визначення типу сценарію на основі оцінки ризику. |
|
|
|
Args: |
|
risk_assessment: Словник з результатами оцінки ризику |
|
|
|
Returns: |
|
str: Тип сценарію ('RED', 'YELLOW', або 'GREEN') |
|
""" |
|
try: |
|
|
|
required_fields = ['high_risk_level', 'agree_with_mb'] |
|
if not all(field in risk_assessment for field in required_fields): |
|
logging.error(f"Missing required fields for scenario detection: {set(required_fields) - set(risk_assessment.keys())}") |
|
raise ValueError("Invalid risk assessment data") |
|
|
|
|
|
if risk_assessment['high_risk_level']: |
|
scenario = 'RED' |
|
elif risk_assessment['agree_with_mb'] == 'yes': |
|
scenario = 'GREEN' |
|
else: |
|
scenario = 'YELLOW' |
|
|
|
logging.info(f"Detected scenario type: {scenario}") |
|
return scenario |
|
|
|
except Exception as e: |
|
logging.error(f"Error determining scenario type: {str(e)}") |
|
|
|
return 'RED' |