# Spaces: Running / Running  (appears to be page-status text captured with the file; not part of the module)
"""
Screening model service.

High-recall model used for fast risk screening.
"""
import logging
import time
from typing import Dict, Any, List, Optional

from schemas.user_input import UserInput, ScreeningResponse, RiskLevel
from utils.model_loader import model_manager
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class ScreeningService:
    """Run sarcopenia risk screening through the shared ``model_manager``.

    Each requested model type gets a screening prediction (described in the
    original comments as the high-recall model) and, optionally, an advisory
    prediction (described as the high-precision model). The per-model results
    are folded into a single ``ScreeningResponse``.
    """

    # Model types this service runs, in default execution order.
    SUPPORTED_MODELS = ('sarcoI', 'sarcoII')

    # Threshold reported alongside the placeholder result when a screening
    # prediction raises.
    _FALLBACK_THRESHOLDS = {'sarcoI': 0.15, 'sarcoII': 0.09}

    # Core fields that must be present (non-None) before the advisory model
    # is attempted, per model type.
    _ADVISORY_REQUIRED_FEATURES = {
        'sarcoI': ['body_mass_index', 'race_ethnicity', 'WWI', 'age_years'],
        'sarcoII': ['body_mass_index', 'race_ethnicity', 'age_years', 'WWI'],
    }

    # Derived physical-activity features.
    _DERIVED_ACTIVITY_FEATURES = (
        'Total_MET_minutes_week', 'Total_Vigorous_Minutes_week',
        'Total_Moderate_Minutes_week', 'Activity_Diversity_Index',
        'Vigorous_MET_Ratio', 'Activity_Sedentary_Ratio',
    )

    # NHANES questionnaire fields.
    _NHANES_ACTIVITY_FIELDS = (
        'PAQ605', 'PAQ620', 'PAQ635', 'PAQ650', 'PAQ665',  # activity type selection
        'PAQ610', 'PAQ625', 'PAQ640', 'PAQ655', 'PAQ670',  # activity days
        'PAD615', 'PAD630', 'PAD645', 'PAD660', 'PAD675',  # activity duration
        'PAD680',  # sedentary time
    )

    # Basic physical-activity fields.
    _BASIC_ACTIVITY_FIELDS = (
        'vigorous_work_days', 'vigorous_work_minutes',
        'moderate_work_days', 'moderate_work_minutes',
        'walk_bicycle_days', 'walk_bicycle_minutes',
        'vigorous_rec_days', 'vigorous_rec_minutes',
        'moderate_rec_days', 'moderate_rec_minutes',
        'sedentary_minutes',
    )

    def __init__(self):
        """Bind the service to the module-level ``model_manager``."""
        self.model_manager = model_manager

    async def screening_assessment(
        self,
        user_data: UserInput,
        models: Optional[List[str]] = None,
        include_advisory: bool = True,
    ) -> ScreeningResponse:
        """Run the risk-screening assessment.

        Args:
            user_data: User input; converted to a dict with ``model_dump()``.
            models: Model types to run. Defaults to every supported model.
                Entries outside ``SUPPORTED_MODELS`` are logged and skipped.
            include_advisory: Whether to also run the advisory model for each
                model type. Pass ``False`` for a quick assessment.

        Returns:
            ScreeningResponse: Screening results, advisory results (``None``
            where skipped or failed), overall risk, confidence and elapsed
            time.

        NOTE(review): when a prediction (or the whole assessment) fails, a
        low-risk placeholder is returned instead of an error. That keeps the
        endpoint responsive but can hide a failure behind a reassuring
        result; confirm this is the intended contract.
        NOTE(review): the predict_* calls are made synchronously inside this
        coroutine; if they are slow they will hold up the event loop.
        """
        start_time = time.time()
        try:
            user_dict = user_data.model_dump()
            if models is None:
                models = list(self.SUPPORTED_MODELS)

            results = {}
            advisory_results = {}
            for model_type in models:
                if model_type not in self.SUPPORTED_MODELS:
                    logger.warning(f"跳过不支持的模型类型: {model_type!r}")
                    continue

                screening_result = self._run_screening(user_dict, model_type)
                screening_failed = screening_result is None
                if screening_failed:
                    screening_result = self._fallback_screening_result(model_type)
                results[model_type] = screening_result

                # The advisory model runs independently of the screening
                # outcome; in quick-assessment mode it is skipped entirely.
                if include_advisory:
                    advisory_results[model_type] = self._run_advisory(
                        user_dict, model_type, standalone=screening_failed
                    )
                else:
                    advisory_results[model_type] = None

            overall_risk = self._calculate_overall_risk(results)
            confidence = self._calculate_confidence(results)
            processing_time = time.time() - start_time

            screen_i = results.get('sarcoI', {})
            screen_ii = results.get('sarcoII', {})
            advisory_i = advisory_results.get('sarcoI')
            advisory_ii = advisory_results.get('sarcoII')

            response = ScreeningResponse(
                # Screening model results
                sarcoI_risk=RiskLevel(screen_i.get('risk_level', 'low')),
                sarcoI_probability=screen_i.get('probability', 0.0),
                sarcoI_threshold=screen_i.get('threshold', 0.5),
                sarcoII_risk=RiskLevel(screen_ii.get('risk_level', 'low')),
                sarcoII_probability=screen_ii.get('probability', 0.0),
                sarcoII_threshold=screen_ii.get('threshold', 0.5),
                # Advisory model results (None when skipped or failed)
                sarcoI_advisory_risk=RiskLevel(advisory_i.get('risk_level', 'low')) if advisory_i else None,
                sarcoI_advisory_probability=advisory_i.get('probability', 0.0) if advisory_i else None,
                sarcoI_advisory_threshold=advisory_i.get('threshold', 0.36) if advisory_i else None,
                sarcoII_advisory_risk=RiskLevel(advisory_ii.get('risk_level', 'low')) if advisory_ii else None,
                sarcoII_advisory_probability=advisory_ii.get('probability', 0.0) if advisory_ii else None,
                sarcoII_advisory_threshold=advisory_ii.get('threshold', 0.52) if advisory_ii else None,
                # Combined result
                overall_risk=RiskLevel(overall_risk),
                confidence=confidence,
                processing_time=processing_time,
            )
            logger.info(f"筛查评估完成: 综合风险={overall_risk}, 置信度={confidence:.3f}, 耗时={processing_time:.2f}s")
            return response
        except Exception as e:
            logger.exception(f"筛查评估失败: {e}")
            # Placeholder response: probabilities sit below both thresholds.
            return ScreeningResponse(
                sarcoI_risk=RiskLevel.LOW,
                sarcoI_probability=0.05,
                sarcoI_threshold=0.15,
                sarcoII_risk=RiskLevel.LOW,
                sarcoII_probability=0.05,
                sarcoII_threshold=0.09,
                overall_risk=RiskLevel.LOW,
                confidence=0.5,
                processing_time=time.time() - start_time,
            )

    def _run_screening(self, user_dict: Dict[str, Any], model_type: str) -> Optional[Dict[str, Any]]:
        """Return the screening prediction for ``model_type``, or None on failure."""
        try:
            screening_result = self.model_manager.predict_screening(user_dict, model_type)
            # Kept inside the try: a result missing these keys counts as a failure.
            logger.info(f"{model_type}筛查完成: {screening_result['risk_level']} (概率: {screening_result['probability']:.3f})")
            return screening_result
        except Exception as e:
            logger.exception(f"{model_type}筛查失败: {e}")
            return None

    def _fallback_screening_result(self, model_type: str) -> Dict[str, Any]:
        """Build the low-risk placeholder used when a screening prediction fails."""
        return {
            'probability': 0.05,  # clearly below the threshold
            'risk_level': 'low',
            'threshold': self._FALLBACK_THRESHOLDS.get(model_type, 0.09),
            'model_type': f"{model_type}_screening",
        }

    def _run_advisory(
        self,
        user_dict: Dict[str, Any],
        model_type: str,
        *,
        standalone: bool = False,
    ) -> Optional[Dict[str, Any]]:
        """Run the advisory model for ``model_type``.

        Args:
            user_dict: User data as a plain dict.
            model_type: Model type to predict for.
            standalone: True when the screening prediction failed and the
                advisory model is being run on its own; only the log wording
                differs.

        Returns:
            The advisory prediction, or None when the feature check fails or
            the prediction raises.
        """
        independent = "独立" if standalone else ""
        try:
            if standalone:
                logger.info(f"🔄 {model_type}筛查失败,但尝试独立运行建议模型...")
            else:
                logger.info(f"🔍 检查{model_type}建议模型特征...")
                logger.info(f" 用户数据键: {list(user_dict.keys())}")
                logger.info(f" 模型管理器状态: 建议模型={list(self.model_manager.advisory_models.keys())}")

            if not self._has_sufficient_features_for_advisory(user_dict, model_type):
                if standalone:
                    logger.warning(f"❌ {model_type}建议模型跳过: 特征不足")
                else:
                    logger.warning(f"❌ {model_type}建议模型跳过: 特征不足,需详细评估")
                return None

            logger.info(f"✅ {model_type}建议模型特征检查通过,开始{independent}预测...")
            advisory_result = self.model_manager.predict_advisory(user_dict, model_type)
            logger.info(f"✅ {model_type}建议模型{independent}完成: {advisory_result['risk_level']} (概率: {advisory_result['probability']:.3f})")
            return advisory_result
        except Exception as e:
            if standalone:
                logger.exception(f"❌ {model_type}建议模型独立预测也失败: {e}")
            else:
                logger.exception(f"❌ {model_type}建议模型预测失败: {e}")
            return None

    @staticmethod
    def _count_present(user_data: Dict, field_names) -> int:
        """Count the names in ``field_names`` that map to a non-None value."""
        return sum(1 for name in field_names if user_data.get(name) is not None)

    def _has_sufficient_features_for_advisory(self, user_data: Dict, model_type: str) -> bool:
        """Check whether ``user_data`` carries enough fields for the advisory model.

        Every core feature listed for ``model_type`` must be present and
        non-None, and at least one activity group must be populated enough:
        two derived features, three NHANES questionnaire fields, or three
        basic activity fields.
        """
        required = self._ADVISORY_REQUIRED_FEATURES.get(model_type, [])
        missing_core_features = [name for name in required if user_data.get(name) is None]
        if missing_core_features:
            logger.info(f"{model_type}建议模型缺失核心特征: {missing_core_features}")
            return False

        derived_count = self._count_present(user_data, self._DERIVED_ACTIVITY_FEATURES)
        activity_data_count = self._count_present(user_data, self._NHANES_ACTIVITY_FIELDS)
        basic_activity_count = self._count_present(user_data, self._BASIC_ACTIVITY_FIELDS)

        # Lenient validation: satisfying any one group is enough.
        if derived_count >= 2:
            logger.info(f"{model_type}建议模型特征检查通过: 具备{derived_count}个衍生特征")
            return True
        if activity_data_count >= 3:
            logger.info(f"{model_type}建议模型特征检查通过: 具备{activity_data_count}个NHANES问卷字段")
            return True
        if basic_activity_count >= 3:
            logger.info(f"{model_type}建议模型特征检查通过: 具备{basic_activity_count}个基础活动字段")
            return True

        logger.info(f"{model_type}建议模型特征不足: 衍生特征{derived_count}个, NHANES字段{activity_data_count}个, 基础字段{basic_activity_count}个")
        logger.info(f"{model_type}建议模型跳过: 需要更多体力活动数据")
        return False

    def _calculate_overall_risk(self, results: Dict[str, Any]) -> str:
        """Combine the per-model screening results into one risk level.

        With both results available the decision is delegated to
        ``model_manager.get_overall_risk``. With exactly one available, that
        result's own ``risk_level`` is used, so a single-model request is not
        reported as low risk regardless of its outcome. With none, or on any
        error, ``'low'`` is returned.
        """
        try:
            sarcoI_result = results.get('sarcoI', {})
            sarcoII_result = results.get('sarcoII', {})
            if sarcoI_result and sarcoII_result:
                return self.model_manager.get_overall_risk(sarcoI_result, sarcoII_result)

            only_result = sarcoI_result or sarcoII_result
            if only_result:
                return only_result.get('risk_level', 'low')
            return 'low'
        except Exception as e:
            logger.exception(f"综合风险计算失败: {e}")
            return 'low'

    def _calculate_confidence(self, results: Dict[str, Any]) -> float:
        """Derive a confidence score from the screening probabilities.

        Each probability is mapped to ``abs(p - 0.5) * 2``; the mean is
        clamped to ``[0.3, 0.95]``. Returns 0.5 when no result carries a
        probability or when the computation fails.

        NOTE(review): distance is measured from 0.5, not from each model's
        own decision threshold; confirm that is intended.
        """
        try:
            confidences = [
                abs(result['probability'] - 0.5) * 2
                for result in results.values()
                if 'probability' in result
            ]
            if not confidences:
                return 0.5
            avg_confidence = sum(confidences) / len(confidences)
            return min(max(avg_confidence, 0.3), 0.95)
        except Exception as e:
            logger.exception(f"置信度计算失败: {e}")
            return 0.5

    def get_risk_explanation(self, response: ScreeningResponse) -> Dict[str, str]:
        """Return title, description and recommendation for the overall risk.

        The lookup key is ``response.overall_risk.value``; unrecognised values
        fall back to the low-risk explanation.
        """
        explanations = {
            'low': {
                'title': '低风险',
                'description': '当前肌少症风险较低,建议维持健康的生活方式。',
                'recommendation': '继续保持规律运动和均衡饮食,定期进行健康检查。',
            },
            'medium': {
                'title': '中等风险',
                'description': '存在一定的肌少症风险,建议加强预防措施。',
                'recommendation': '增加体力活动,注意蛋白质摄入,考虑咨询专业医生。',
            },
            'high': {
                'title': '高风险',
                'description': '肌少症风险较高,强烈建议寻求专业医疗指导。',
                'recommendation': '请尽快咨询医生,制定个性化的干预方案,进行详细的医学评估。',
            },
        }
        overall_risk = response.overall_risk.value
        return explanations.get(overall_risk, explanations['low'])
# Create the global screening service instance
screening_service = ScreeningService()