File size: 15,190 Bytes
ad05511
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e5794b2
 
 
 
ad05511
e5794b2
ad05511
e5794b2
ad05511
 
ee2f9a5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ad05511
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e5794b2
 
 
ad05511
 
e5794b2
 
ad05511
e5794b2
 
ad05511
 
 
 
 
 
 
9674180
ad05511
9674180
 
ad05511
 
 
 
9674180
 
ad05511
 
9674180
ad05511
9674180
 
ad05511
 
9674180
 
ad05511
 
 
 
9674180
ad05511
9674180
ad05511
 
 
9674180
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ad05511
 
9674180
 
 
 
 
 
 
 
ad05511
 
9674180
 
 
 
 
 
 
ad05511
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
"""
筛查模型服务
高召回率模型,用于快速风险筛查
"""

import logging
import time
from typing import Dict, Any, List
from schemas.user_input import UserInput, ScreeningResponse, RiskLevel
from utils.model_loader import model_manager

logger = logging.getLogger(__name__)

class ScreeningService:
    """筛查服务类"""
    
    def __init__(self):
        self.model_manager = model_manager
    
    async def screening_assessment(self, user_data: UserInput, models: List[str] = None, include_advisory: bool = True) -> ScreeningResponse:
        """
        执行风险筛查评估
        
        Args:
            user_data: 用户输入数据
            models: 要使用的模型列表,默认使用所有模型
            include_advisory: 是否包含建议模型预测,快速评估时为False
            
        Returns:
            ScreeningResponse: 筛查结果
        """
        start_time = time.time()
        
        try:
            # 转换用户数据为字典
            user_dict = user_data.model_dump()
            
            # 默认使用所有模型
            if models is None:
                models = ['sarcoI', 'sarcoII']
            
            # 初始化结果
            results = {}
            
            # 执行筛查预测
            advisory_results = {}
            for model_type in models:
                if model_type in ['sarcoI', 'sarcoII']:
                    try:
                        # 筛查模型预测 (高召回率)
                        screening_result = self.model_manager.predict_screening(user_dict, model_type)
                        results[model_type] = screening_result
                        logger.info(f"{model_type}筛查完成: {screening_result['risk_level']} (概率: {screening_result['probability']:.3f})")
                        
                        # 建议模型预测 (高精确率) - 仅在需要时运行
                        if include_advisory:
                            try:
                                logger.info(f"🔍 检查{model_type}建议模型特征...")
                                logger.info(f"   用户数据键: {list(user_dict.keys())}")
                                logger.info(f"   模型管理器状态: 建议模型={list(self.model_manager.advisory_models.keys())}")

                                # 检查是否有足够的特征进行建议模型预测
                                if self._has_sufficient_features_for_advisory(user_dict, model_type):
                                    logger.info(f"✅ {model_type}建议模型特征检查通过,开始预测...")
                                    advisory_result = self.model_manager.predict_advisory(user_dict, model_type)
                                    advisory_results[model_type] = advisory_result
                                    logger.info(f"✅ {model_type}建议模型完成: {advisory_result['risk_level']} (概率: {advisory_result['probability']:.3f})")
                                else:
                                    logger.warning(f"❌ {model_type}建议模型跳过: 特征不足,需详细评估")
                                    advisory_results[model_type] = None
                            except Exception as e:
                                logger.error(f"❌ {model_type}建议模型预测失败: {str(e)}")
                                import traceback
                                logger.error(f"   错误详情: {traceback.format_exc()}")
                                advisory_results[model_type] = None
                        else:
                            # 快速评估模式:完全跳过建议模型
                            advisory_results[model_type] = None
                            
                    except Exception as e:
                        logger.error(f"{model_type}筛查失败: {str(e)}")
                        import traceback
                        logger.error(f"详细错误信息: {traceback.format_exc()}")
                        # 使用默认低风险结果,但使用正确的阈值
                        default_threshold = 0.15 if model_type == 'sarcoI' else 0.09
                        results[model_type] = {
                            'probability': 0.05,  # 使用明显的低风险概率
                            'risk_level': 'low',
                            'threshold': default_threshold,
                            'model_type': f"{model_type}_screening"
                        }
                        # 筛查失败不影响建议模型 - 建议模型独立运行
                        if include_advisory:
                            try:
                                logger.info(f"🔄 {model_type}筛查失败,但尝试独立运行建议模型...")
                                if self._has_sufficient_features_for_advisory(user_dict, model_type):
                                    logger.info(f"✅ {model_type}建议模型特征检查通过,开始独立预测...")
                                    advisory_result = self.model_manager.predict_advisory(user_dict, model_type)
                                    advisory_results[model_type] = advisory_result
                                    logger.info(f"✅ {model_type}建议模型独立完成: {advisory_result['risk_level']} (概率: {advisory_result['probability']:.3f})")
                                else:
                                    logger.warning(f"❌ {model_type}建议模型跳过: 特征不足")
                                    advisory_results[model_type] = None
                            except Exception as advisory_e:
                                logger.error(f"❌ {model_type}建议模型独立预测也失败: {str(advisory_e)}")
                                advisory_results[model_type] = None
                        else:
                            advisory_results[model_type] = None
            
            # 计算综合风险
            overall_risk = self._calculate_overall_risk(results)
            
            # 计算置信度
            confidence = self._calculate_confidence(results)
            
            # 处理时间
            processing_time = time.time() - start_time
            
            # 构建响应 - 包含筛查和建议模型结果
            response = ScreeningResponse(
                # 筛查模型结果 (高召回率)
                sarcoI_risk=RiskLevel(results.get('sarcoI', {}).get('risk_level', 'low')),
                sarcoI_probability=results.get('sarcoI', {}).get('probability', 0.0),
                sarcoI_threshold=results.get('sarcoI', {}).get('threshold', 0.5),
                sarcoII_risk=RiskLevel(results.get('sarcoII', {}).get('risk_level', 'low')),
                sarcoII_probability=results.get('sarcoII', {}).get('probability', 0.0),
                sarcoII_threshold=results.get('sarcoII', {}).get('threshold', 0.5),

                # 建议模型结果 (高精确率)
                sarcoI_advisory_risk=RiskLevel(advisory_results.get('sarcoI', {}).get('risk_level', 'low')) if advisory_results.get('sarcoI') else None,
                sarcoI_advisory_probability=advisory_results.get('sarcoI', {}).get('probability', 0.0) if advisory_results.get('sarcoI') else None,
                sarcoI_advisory_threshold=advisory_results.get('sarcoI', {}).get('threshold', 0.36) if advisory_results.get('sarcoI') else None,
                sarcoII_advisory_risk=RiskLevel(advisory_results.get('sarcoII', {}).get('risk_level', 'low')) if advisory_results.get('sarcoII') else None,
                sarcoII_advisory_probability=advisory_results.get('sarcoII', {}).get('probability', 0.0) if advisory_results.get('sarcoII') else None,
                sarcoII_advisory_threshold=advisory_results.get('sarcoII', {}).get('threshold', 0.52) if advisory_results.get('sarcoII') else None,

                # 综合结果
                overall_risk=RiskLevel(overall_risk),
                confidence=confidence,
                processing_time=processing_time
            )
            
            logger.info(f"筛查评估完成: 综合风险={overall_risk}, 置信度={confidence:.3f}, 耗时={processing_time:.2f}s")
            return response
            
        except Exception as e:
            logger.error(f"筛查评估失败: {str(e)}")
            import traceback
            logger.error(f"详细错误信息: {traceback.format_exc()}")
            # 返回默认安全结果,使用明显的低风险概率
            return ScreeningResponse(
                sarcoI_risk=RiskLevel.LOW,
                sarcoI_probability=0.05,  # 明显低于所有阈值
                sarcoI_threshold=0.15,    # 使用正确的阈值
                sarcoII_risk=RiskLevel.LOW,
                sarcoII_probability=0.05,  # 明显低于所有阈值
                sarcoII_threshold=0.09,    # 使用正确的阈值
                overall_risk=RiskLevel.LOW,
                confidence=0.5,
                processing_time=time.time() - start_time
            )
    
    def _has_sufficient_features_for_advisory(self, user_data: Dict, model_type: str) -> bool:
        """检查是否有足够的特征进行建议模型预测"""
        # 建议模型所需的核心特征列表
        advisory_required_features = {
            'sarcoI': ['body_mass_index', 'race_ethnicity', 'WWI', 'age_years'],
            'sarcoII': ['body_mass_index', 'race_ethnicity', 'age_years', 'WWI']
        }

        required = advisory_required_features.get(model_type, [])

        # 检查核心特征是否存在
        missing_core_features = []
        for feature in required:
            if feature not in user_data or user_data[feature] is None:
                missing_core_features.append(feature)

        if missing_core_features:
            logger.info(f"{model_type}建议模型缺失核心特征: {missing_core_features}")
            return False

        # 检查是否有体力活动相关数据(更宽松的检查)
        # 1. 检查衍生特征
        derived_features = ['Total_MET_minutes_week', 'Total_Vigorous_Minutes_week',
                           'Total_Moderate_Minutes_week', 'Activity_Diversity_Index',
                           'Vigorous_MET_Ratio', 'Activity_Sedentary_Ratio']

        derived_count = sum(1 for feature in derived_features if feature in user_data and user_data[feature] is not None)

        # 2. 检查NHANES问卷数据
        activity_indicators = [
            'PAQ605', 'PAQ620', 'PAQ635', 'PAQ650', 'PAQ665',  # 活动类型选择
            'PAQ610', 'PAQ625', 'PAQ640', 'PAQ655', 'PAQ670',  # 活动天数
            'PAD615', 'PAD630', 'PAD645', 'PAD660', 'PAD675',  # 活动时长
            'PAD680'  # 久坐时间
        ]

        activity_data_count = sum(1 for indicator in activity_indicators
                                if indicator in user_data and user_data[indicator] is not None)

        # 3. 检查基础体力活动字段
        basic_activity_fields = [
            'vigorous_work_days', 'vigorous_work_minutes',
            'moderate_work_days', 'moderate_work_minutes',
            'walk_bicycle_days', 'walk_bicycle_minutes',
            'vigorous_rec_days', 'vigorous_rec_minutes',
            'moderate_rec_days', 'moderate_rec_minutes',
            'sedentary_minutes'
        ]

        basic_activity_count = sum(1 for field in basic_activity_fields
                                 if field in user_data and user_data[field] is not None)

        # 更宽松的验证逻辑:满足任一条件即可
        if derived_count >= 2:
            logger.info(f"{model_type}建议模型特征检查通过: 具备{derived_count}个衍生特征")
            return True
        elif activity_data_count >= 3:
            logger.info(f"{model_type}建议模型特征检查通过: 具备{activity_data_count}个NHANES问卷字段")
            return True
        elif basic_activity_count >= 3:
            logger.info(f"{model_type}建议模型特征检查通过: 具备{basic_activity_count}个基础活动字段")
            return True
        else:
            logger.info(f"{model_type}建议模型特征不足: 衍生特征{derived_count}个, NHANES字段{activity_data_count}个, 基础字段{basic_activity_count}个")
            logger.info(f"{model_type}建议模型跳过: 需要更多体力活动数据")
            return False
    
    def _calculate_overall_risk(self, results: Dict[str, Any]) -> str:
        """计算综合风险等级"""
        try:
            sarcoI_result = results.get('sarcoI', {})
            sarcoII_result = results.get('sarcoII', {})
            
            if not sarcoI_result or not sarcoII_result:
                return 'low'
            
            return self.model_manager.get_overall_risk(sarcoI_result, sarcoII_result)
            
        except Exception as e:
            logger.error(f"综合风险计算失败: {str(e)}")
            return 'low'
    
    def _calculate_confidence(self, results: Dict[str, Any]) -> float:
        """计算预测置信度"""
        try:
            probabilities = []
            
            for model_type, result in results.items():
                if 'probability' in result:
                    prob = result['probability']
                    # 将概率转换为置信度 (距离0.5越远,置信度越高)
                    confidence = abs(prob - 0.5) * 2
                    probabilities.append(confidence)
            
            if probabilities:
                avg_confidence = sum(probabilities) / len(probabilities)
                # 确保置信度在合理范围内
                return min(max(avg_confidence, 0.3), 0.95)
            else:
                return 0.5
                
        except Exception as e:
            logger.error(f"置信度计算失败: {str(e)}")
            return 0.5
    
    def get_risk_explanation(self, response: ScreeningResponse) -> Dict[str, str]:
        """获取风险等级解释"""
        explanations = {
            'low': {
                'title': '低风险',
                'description': '当前肌少症风险较低,建议维持健康的生活方式。',
                'recommendation': '继续保持规律运动和均衡饮食,定期进行健康检查。'
            },
            'medium': {
                'title': '中等风险', 
                'description': '存在一定的肌少症风险,建议加强预防措施。',
                'recommendation': '增加体力活动,注意蛋白质摄入,考虑咨询专业医生。'
            },
            'high': {
                'title': '高风险',
                'description': '肌少症风险较高,强烈建议寻求专业医疗指导。',
                'recommendation': '请尽快咨询医生,制定个性化的干预方案,进行详细的医学评估。'
            }
        }
        
        overall_risk = response.overall_risk.value
        return explanations.get(overall_risk, explanations['low'])

# 创建全局筛查服务实例
screening_service = ScreeningService()