|
import numpy as np |
|
import math |
|
from typing import Dict, List, Optional, Any, Tuple |
|
from dataclasses import dataclass, field |
|
from enum import Enum |
|
import warnings |
|
import json |
|
import os |
|
from openai import OpenAI |
|
import time |
|
import gradio as gr |
|
|
|
# Install a process-wide "ignore" filter so that no Python warnings are shown
# (this also hides deprecation warnings raised by imported libraries).
warnings.filterwarnings("ignore")
|
|
|
|
|
class RiskProfile(Enum):
    """Risk appetite of the decision maker.

    The values are the lowercase strings the LLM prompt asks for in
    ``user_profile["risk_preference"]``.
    """

    CONSERVATIVE = "conservative"  # maps to the strongest risk aversion
    BALANCED = "balanced"
    AGGRESSIVE = "aggressive"  # maps to a risk-seeking utility
|
|
|
|
|
class ProblemType(Enum):
    """Kind of decision problem, used to pick the evaluation method."""

    STATIC = "static"  # fixed options, evaluated with weighted scoring
    DYNAMIC = "dynamic"  # evaluated with the MCTS search
|
|
|
|
|
class ComplexityLevel(Enum):
    """Coarse complexity rating of a decision problem as reported by the LLM."""

    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
|
|
|
|
|
@dataclass
class DecisionOption:
    """A single candidate choice in a decision problem."""

    # Display name; also the key used to look up pre-computed evaluations.
    name: str
    # Attribute name -> value as extracted by the LLM.
    attributes: Dict[str, float]
    # Optional per-option constraints; every instance gets its own dict.
    constraints: Dict[str, Any] = field(default_factory=dict)
|
|
|
|
|
@dataclass
class DecisionContext:
    """Structured description of a decision problem extracted from user text."""

    # Short summary of the decision to be made.
    description: str
    # Free-form user traits; ``risk_preference`` is read from here.
    user_profile: Dict[str, Any]
    # Candidate choices to rank.
    options: List[DecisionOption]
    # Goals the user wants to optimise.
    objectives: List[str]
    # Textual constraints that must hold.
    constraints: List[str]
|
|
|
|
|
@dataclass
class DecisionFactors:
    """Evaluation strategy for a decision: factors, weights and risk attitude."""

    # Factor descriptors exactly as returned by the LLM.
    primary_factors: List[Dict[str, Any]]
    # Factor name -> weight.
    weights: Dict[str, float]
    # Risk attitude that selects the utility transform.
    risk_profile: RiskProfile
    # Names of the criteria the options are scored against.
    evaluation_criteria: List[str]
|
|
|
|
|
@dataclass
class ProblemAnalysis:
    """Outcome of the problem-type / complexity analysis step."""

    # STATIC selects weighted scoring, DYNAMIC selects MCTS.
    problem_type: ProblemType
    complexity_level: ComplexityLevel
    # Number of MCTS iterations to run for this problem.
    recommended_iterations: int
    # Variance threshold handed to the MCTS early-stop check.
    early_stop_threshold: float
    # Human-readable justification of the analysis.
    explanation: str
|
|
|
|
|
class LLMExtractor:
    """Turn free-text decision problems into structured data via a chat LLM.

    Connection settings are read from the environment variables
    ``OPENAI_API_KEY``, ``OPENAI_API_BASE`` and ``OPENAI_API_MODEL``.
    """

    def __init__(self):
        self.client = OpenAI(
            api_key=os.getenv("OPENAI_API_KEY"),
            base_url=os.getenv("OPENAI_API_BASE"),
        )

    def _call_llm(self, prompt: str, system_prompt: Optional[str] = None) -> str:
        """Send one chat request in JSON mode and return the reply text.

        Args:
            prompt: User message.
            system_prompt: Optional system message placed before the user message.

        Returns:
            The reply with any Markdown code-fence markers removed.

        Raises:
            ValueError: If the reply carries no text content.
            Exception: Any client error is logged to stdout and re-raised.
        """
        messages = []
        if system_prompt:
            messages.append({"role": "system", "content": system_prompt})
        messages.append({"role": "user", "content": prompt})

        try:
            response = self.client.chat.completions.create(
                model=os.getenv("OPENAI_API_MODEL"),
                messages=messages,
                temperature=0.1,
                max_tokens=1500,
                response_format={"type": "json_object"},
            )
            content = response.choices[0].message.content
            # The content field may be null; fail with a clear message rather
            # than an AttributeError on ``None.replace``.
            if content is None:
                raise ValueError("LLM returned a message without text content")
            return content.replace("```json", "").replace("```", "")
        except Exception as e:
            print(f"LLM调用失败: {e}")
            raise

    def analyze_problem_type(self, user_input: str) -> ProblemAnalysis:
        """Classify the problem (static/dynamic, complexity) and pick MCTS settings.

        The LLM answer is normalised (case, whitespace, numeric types). If the
        answer cannot be parsed, a default analysis (static, medium, 1000
        iterations, threshold 0.01) is returned instead of raising.
        """
        system_prompt = """你是一个决策分析专家。请分析用户的决策问题,判断:
1. 问题类型:静态决策(选项固定,结果确定)还是动态博弈(涉及多轮决策、对手策略、环境变化)
2. 复杂度:低(简单比较)、中(多因素权衡)、高(复杂约束和不确定性)
3. 推荐的MCTS迭代次数和早停阈值
请以JSON格式返回结果。"""

        prompt = f"""请分析以下决策问题:
{user_input}

返回格式,请使用 JSON 格式返回,不要进行解释说明:
{{
"problem_type": "static/dynamic",
"complexity_level": "low/medium/high",
"recommended_iterations": 数值,
"early_stop_threshold": 数值,
"explanation": "分析说明"
}}

判断标准:
- 静态决策:选择学校、购买产品、投资组合等固定选项比较
- 动态博弈:游戏策略、谈判、竞争对手分析等涉及多轮交互
- 低复杂度:2-3个选项,1-3个主要因素
- 中复杂度:3-5个选项,3-6个因素
- 高复杂度:5+个选项,6+个因素或复杂约束"""

        response = self._call_llm(prompt, system_prompt)
        try:
            data = json.loads(response)
            # Normalise case/whitespace so that e.g. "Static" is not
            # misclassified; anything other than "static" stays DYNAMIC.
            raw_type = str(data["problem_type"]).strip().lower()
            raw_level = str(data["complexity_level"]).strip().lower()
            return ProblemAnalysis(
                problem_type=(ProblemType.STATIC if raw_type == "static" else ProblemType.DYNAMIC),
                complexity_level=ComplexityLevel(raw_level),
                # The value ends up in ``range()``: force a positive integer.
                recommended_iterations=max(1, int(float(data["recommended_iterations"]))),
                early_stop_threshold=float(data["early_stop_threshold"]),
                explanation=data["explanation"],
            )
        except Exception as e:
            print(f"解析问题分析失败,使用默认设置: {e}")
            return ProblemAnalysis(
                problem_type=ProblemType.STATIC,
                complexity_level=ComplexityLevel.MEDIUM,
                recommended_iterations=1000,
                early_stop_threshold=0.01,
                explanation="使用默认分析结果",
            )

    def extract_decision_factors(self, user_input: str) -> DecisionContext:
        """Extract options, objectives, constraints and user profile from text.

        Raises:
            Exception: Parsing errors are logged to stdout and re-raised.
        """
        system_prompt = """你是一个决策分析专家。请分析用户的决策需求,提取以下信息:
1. 决策选项(options):每个选项的名称和关键属性,只返回必须满足约束条件的决策选项(比如用户拥有的分数、积分、点数、数额必须大于对方拥有或要求数额)
2. 决策目标(objectives):用户想要优化的目标
3. 约束条件(constraints):必须满足的限制条件
4. 用户特征(user_profile):风险偏好、预算、偏好等
请以JSON格式返回结果。"""

        prompt = f"""请分析以下决策需求:
{user_input}

返回格式示例,请使用 JSON 格式返回,不要进行解释说明:
{{
"description": "决策描述",
"options": [
{{
"name": "选项名称",
"attributes": {{
"属性1": 数值,
"属性2": 数值
}}
}}
],
"objectives": ["目标1", "目标2"],
"constraints": ["约束1", "约束2"],
"user_profile": {{
"risk_preference": "conservative/balanced/aggressive",
"其他特征": "值"
}}
}}"""

        response = self._call_llm(prompt, system_prompt)
        try:
            data = json.loads(response)
            options = [
                DecisionOption(
                    name=opt["name"],
                    attributes=opt["attributes"],
                    constraints=opt.get("constraints", {}),
                )
                for opt in data["options"]
            ]
            return DecisionContext(
                description=data["description"],
                user_profile=data["user_profile"],
                options=options,
                objectives=data["objectives"],
                constraints=data["constraints"],
            )
        except Exception as e:
            print(f"解析LLM响应失败: {e}")
            raise

    def extract_evaluation_strategy(self, context: DecisionContext) -> DecisionFactors:
        """Ask the LLM for factors, weights and criteria for this context.

        The risk profile is taken from ``context.user_profile["risk_preference"]``
        (missing or null counts as "balanced", unrecognised values as
        conservative). Weights are converted to ``float``.

        Raises:
            Exception: Parsing errors are logged to stdout and re-raised.
        """
        system_prompt = """你是一个决策策略专家。基于决策上下文,设计评估策略:
1. 识别主要决策因子及其重要性
2. 根据用户风险偏好分配权重
3. 定义评估标准
4. 只考虑用户决策描述涉及的决策因子,不要添加额外因子"""

        prompt = f"""基于以下决策上下文设计评估策略:
决策描述:{context.description}
目标:{', '.join(context.objectives)}
约束:{', '.join(context.constraints)}
用户特征:{json.dumps(context.user_profile, ensure_ascii=False)}

请返回JSON格式的评估策略,不要进行解释说明:
{{
"primary_factors": [
{{
"name": "因子名称",
"type": "quantitative/qualitative",
"importance": "high/medium/low",
"uncertainty_level": 0.1
}}
],
"weights": {{
"因子1": 权重,
"因子2": 权重
}},
"evaluation_criteria": ["标准1", "标准2"],
"risk_adjustment": {{
"method": "utility_function",
"parameter": 0.5
}}
}}"""

        response = self._call_llm(prompt, system_prompt)

        try:
            data = json.loads(response)
            # ``or`` also covers an explicit null; ``str`` guards non-string values.
            risk_pref = str(context.user_profile.get("risk_preference") or "balanced").strip().lower()
            try:
                risk_profile = RiskProfile(risk_pref)
            except ValueError:
                # Unrecognised preference: fall back to the cautious profile.
                risk_profile = RiskProfile.CONSERVATIVE

            # Weights are formatted with ":.3f" and multiplied later on, so
            # they must be real numbers even if the model quoted them.
            weights = {name: float(weight) for name, weight in data["weights"].items()}

            return DecisionFactors(
                primary_factors=data["primary_factors"],
                weights=weights,
                risk_profile=risk_profile,
                evaluation_criteria=data["evaluation_criteria"],
            )
        except Exception as e:
            print(f"解析评估策略失败: {e}")
            raise

    def batch_evaluate_options(
        self, context: DecisionContext, decision_factors: DecisionFactors
    ) -> Dict[str, Tuple[float, Dict[str, float]]]:
        """Score every option with a single LLM call.

        Returns:
            Mapping of option name to ``(weighted_score, dimension_scores)``.
            On any failure (request, JSON, missing keys, non-numeric score)
            the result of :meth:`_simple_batch_evaluate` is returned instead.
        """
        options_data = [{"name": opt.name, "attributes": opt.attributes} for opt in context.options]

        evaluation_prompt = f"""批量评估以下所有选项:
选项列表:{json.dumps(options_data, ensure_ascii=False)}
目标:{', '.join(context.objectives)}
约束:{', '.join(context.constraints)}
评估标准:{', '.join(decision_factors.evaluation_criteria)}
权重:{json.dumps(decision_factors.weights, ensure_ascii=False)}

请为每个选项的每个评估维度打分(0-1),并计算加权综合分,请使用 JSON 格式返回,不要进行解释说明:
{{
"evaluations": {{
"选项名称1": {{
"dimension_scores": {{
"维度1": 分数,
"维度2": 分数
}},
"weighted_score": 加权综合分
}},
"选项名称2": {{
"dimension_scores": {{
"维度1": 分数,
"维度2": 分数
}},
"weighted_score": 加权综合分
}}
}}
}}"""

        try:
            response = self._call_llm(evaluation_prompt)
            eval_data = json.loads(response)
            return {
                option_name: (float(eval_result["weighted_score"]), eval_result["dimension_scores"])
                for option_name, eval_result in eval_data["evaluations"].items()
            }
        except Exception as e:
            print(f"批量评估失败,使用简单评估: {e}")
            return self._simple_batch_evaluate(context, decision_factors)

    def _simple_batch_evaluate(
        self, context: DecisionContext, decision_factors: DecisionFactors
    ) -> Dict[str, Tuple[float, Dict[str, float]]]:
        """Offline fallback scoring that needs no LLM call.

        Numeric attributes are scaled as ``min(1.0, value / 100)``;
        non-numeric ones get 0.5. Attributes without an explicit weight share
        the weight equally.
        """
        results = {}
        for option in context.options:
            scores = {}
            total_score = 0

            for attr_name, attr_value in option.attributes.items():
                normalized_score = min(1.0, attr_value / 100.0) if isinstance(attr_value, (int, float)) else 0.5
                scores[attr_name] = normalized_score
                weight = decision_factors.weights.get(attr_name, 1.0 / len(option.attributes))
                total_score += weight * normalized_score

            results[option.name] = (total_score, scores)

        return results
|
|
|
|
|
class UtilityFunction:
    """Risk-adjusted utility transform applied to raw option scores.

    ``risk_aversion`` selects the curve:
      * ``0``   - identity (risk neutral);
      * ``> 0`` - ``1 - exp(-a * v)`` (concave, risk averse);
      * ``< 0`` - ``v ** (1 + |a|)`` (convex on ``v >= 0``, risk seeking).
    """

    def __init__(self, risk_aversion: float = 0.5):
        self.risk_aversion = risk_aversion

    def calculate_utility(self, value: float) -> float:
        """Return the utility of ``value`` under the configured risk attitude.

        For the risk-seeking curve a negative ``value`` is mirrored through the
        origin (``-|v| ** p``). A plain ``v ** p`` with a fractional exponent
        would yield a complex number, which callers cannot clamp or compare.
        """
        if self.risk_aversion == 0:
            return value
        if self.risk_aversion > 0:
            return 1 - math.exp(-self.risk_aversion * value)

        exponent = 1 + abs(self.risk_aversion)
        return math.copysign(abs(value) ** exponent, value)
|
|
|
|
|
class UtilityEvaluator:
    """Scores options for the MCTS search, applying a risk-adjusted utility.

    Args:
        decision_factors: Weights and risk profile for the evaluation.
        pre_evaluations: Option name -> ``(weighted_score, dimension_scores)``
            computed up front; used instead of re-scoring when available.
    """

    def __init__(
        self,
        decision_factors: DecisionFactors,
        pre_evaluations: Dict[str, Tuple[float, Dict[str, float]]],
    ):
        self.decision_factors = decision_factors
        self.utility_func = UtilityFunction(self._get_risk_aversion())
        self.pre_evaluations = pre_evaluations

    def _get_risk_aversion(self) -> float:
        """Map the risk profile to a risk-aversion coefficient.

        Conservative -> 1.0, balanced -> 0.5, anything else -> -0.3.
        """
        if self.decision_factors.risk_profile == RiskProfile.CONSERVATIVE:
            return 1.0
        if self.decision_factors.risk_profile == RiskProfile.BALANCED:
            return 0.5
        return -0.3

    def evaluate_option(self, option: DecisionOption, context: DecisionContext) -> Tuple[float, Dict[str, float]]:
        """Return ``(utility, dimension_scores)`` for ``option``.

        The raw score comes from ``pre_evaluations`` when the option is listed
        there, otherwise from :meth:`_simple_evaluate`. Both paths go through
        the same utility transform so that all options are on one scale.
        """
        if option.name in self.pre_evaluations:
            score, dimension_scores = self.pre_evaluations[option.name]
        else:
            score, dimension_scores = self._simple_evaluate(option, context)
        return self.utility_func.calculate_utility(score), dimension_scores

    def _simple_evaluate(self, option: DecisionOption, context: DecisionContext) -> Tuple[float, Dict[str, float]]:
        """Raw fallback score computed from the option's own attributes.

        Numeric attributes are scaled as ``min(1.0, value / 100)``;
        non-numeric ones get 0.5. Attributes without an explicit weight share
        the weight equally. No utility transform is applied here.
        """
        scores = {}
        total_score = 0

        for attr_name, attr_value in option.attributes.items():
            normalized_score = min(1.0, attr_value / 100.0) if isinstance(attr_value, (int, float)) else 0.5
            scores[attr_name] = normalized_score
            weight = self.decision_factors.weights.get(attr_name, 1.0 / len(option.attributes))
            total_score += weight * normalized_score

        return total_score, scores
|
|
|
|
|
class TraditionalEvaluator:
    """Classic weighted-scoring evaluator (no tree search).

    Args:
        decision_factors: Weights and risk profile for the evaluation.
        pre_evaluations: Option name -> ``(weighted_score, dimension_scores)``
            computed up front; used instead of re-scoring when available.
    """

    def __init__(
        self,
        decision_factors: DecisionFactors,
        pre_evaluations: Dict[str, Tuple[float, Dict[str, float]]],
    ):
        self.decision_factors = decision_factors
        self.utility_func = UtilityFunction(self._get_risk_aversion())
        self.pre_evaluations = pre_evaluations

    def _get_risk_aversion(self) -> float:
        """Map the risk profile to a risk-aversion coefficient.

        Conservative -> 1.0, balanced -> 0.5, anything else -> -0.3.
        """
        if self.decision_factors.risk_profile == RiskProfile.CONSERVATIVE:
            return 1.0
        if self.decision_factors.risk_profile == RiskProfile.BALANCED:
            return 0.5
        return -0.3

    def evaluate_all_options(self, context: DecisionContext) -> Dict[str, Any]:
        """Evaluate every option and return the ranked results.

        Returns:
            A dict with ``recommendations`` (top three), ``best_choice``
            (``None`` when there are no options), ``all_results``,
            ``analysis`` and ``decision_factors``.
        """
        option_results = []
        for option in context.options:
            score, dimension_scores = self.evaluate_option(option, context)
            option_results.append(
                {
                    "option": option.name,
                    "expected_value": score,
                    "dimension_scores": dimension_scores,
                    "recommendation_score": score,
                }
            )

        option_results.sort(key=lambda x: x["recommendation_score"], reverse=True)

        return {
            "recommendations": option_results[:3],
            "best_choice": option_results[0] if option_results else None,
            "all_results": option_results,
            "analysis": {
                "method": "traditional_weighted_scoring",
                "dimension_leaders": self._get_dimension_leaders(option_results),
            },
            "decision_factors": {
                "weights": self.decision_factors.weights,
                "risk_profile": self.decision_factors.risk_profile.value,
                "evaluation_criteria": self.decision_factors.evaluation_criteria,
            },
        }

    def evaluate_option(self, option: DecisionOption, context: DecisionContext) -> Tuple[float, Dict[str, float]]:
        """Return ``(utility, dimension_scores)`` for ``option``.

        The raw score comes from ``pre_evaluations`` when the option is listed
        there, otherwise from :meth:`_simple_evaluate`. Both paths go through
        the same utility transform; otherwise the ranking would compare
        transformed scores against untransformed ones.
        """
        if option.name in self.pre_evaluations:
            score, dimension_scores = self.pre_evaluations[option.name]
        else:
            score, dimension_scores = self._simple_evaluate(option, context)
        return self.utility_func.calculate_utility(score), dimension_scores

    def _simple_evaluate(self, option: DecisionOption, context: DecisionContext) -> Tuple[float, Dict[str, float]]:
        """Raw fallback score computed from the option's own attributes.

        Numeric attributes are scaled as ``min(1.0, value / 100)``;
        non-numeric ones get 0.5. Attributes without an explicit weight share
        the weight equally. No utility transform is applied here.
        """
        scores = {}
        total_score = 0

        for attr_name, attr_value in option.attributes.items():
            normalized_score = min(1.0, attr_value / 100.0) if isinstance(attr_value, (int, float)) else 0.5
            scores[attr_name] = normalized_score
            weight = self.decision_factors.weights.get(attr_name, 1.0 / len(option.attributes))
            total_score += weight * normalized_score

        return total_score, scores

    def _get_dimension_leaders(self, option_results: List[Dict]) -> Dict[str, Tuple[str, float]]:
        """Return, per dimension, the ``(option, score)`` pair with the highest score.

        Ties keep the option that appears first in ``option_results``.
        """
        dimension_leaders = {}
        for result in option_results:
            for dim, score in result.get("dimension_scores", {}).items():
                if dim not in dimension_leaders or score > dimension_leaders[dim][1]:
                    dimension_leaders[dim] = (result["option"], score)
        return dimension_leaders
|
|
|
|
|
class BayesianMCTSNode:
    """Search-tree node that keeps a Beta(alpha, beta) belief about its reward.

    The root has one potential child per option in the context; every child is
    terminal because applying an action marks the resulting state terminal.
    """

    def __init__(
        self,
        state: Dict,
        parent: Optional["BayesianMCTSNode"] = None,
        action: Any = None,
        context: DecisionContext = None,
    ):
        self.state = state
        self.parent = parent
        self.action = action
        self.context = context
        self.children = []
        # Beta(1, 1) prior.
        self.alpha = 1.0
        self.beta = 1.0
        self.visits = 0
        self.value_history = []
        self.dimension_scores = {}
        # Must come last: reads ``state`` and ``context``.
        self.untried_actions = self._get_available_actions()

    def _get_available_actions(self) -> List[str]:
        """Option names selectable from this node (none for terminal states)."""
        if not self.context or self.state.get("is_terminal", False):
            return []
        return [candidate.name for candidate in self.context.options]

    def is_terminal(self) -> bool:
        """True when the state is flagged terminal or its depth is at least 1."""
        if self.state.get("is_terminal", False):
            return True
        return self.state.get("depth", 0) >= 1

    def is_fully_expanded(self) -> bool:
        """True when every available action already has a child node."""
        return not len(self.untried_actions)

    def ucb_select_child(self, exploration_param: float = 1.414) -> Optional["BayesianMCTSNode"]:
        """Choose a child by UCB1 on the posterior mean; ``None`` without children.

        Unvisited children score infinity; if no child was visited at all, one
        is drawn uniformly at random.
        """
        if not self.children:
            return None

        visit_total = sum(kid.visits for kid in self.children)
        if visit_total == 0:
            return np.random.choice(self.children)

        log_total = math.log(visit_total)
        scores = [
            float("inf")
            if kid.visits == 0
            else kid.get_posterior_mean() + exploration_param * math.sqrt(log_total / kid.visits)
            for kid in self.children
        ]
        return self.children[np.argmax(scores)]

    def thompson_sampling_select(self) -> Optional["BayesianMCTSNode"]:
        """Choose the child whose Beta sample is largest; ``None`` without children."""
        if not self.children:
            return None

        draws = [np.random.beta(kid.alpha, kid.beta) for kid in self.children]
        return self.children[np.argmax(draws)]

    def get_posterior_mean(self) -> float:
        """Mean of the Beta(alpha, beta) belief."""
        return self.alpha / (self.alpha + self.beta)

    def get_posterior_variance(self) -> float:
        """Variance of the Beta(alpha, beta) belief."""
        a, b = self.alpha, self.beta
        total = a + b
        return (a * b) / (total ** 2 * (total + 1))

    def expand(self) -> "BayesianMCTSNode":
        """Create a child for the next untried action; return ``self`` if none remain."""
        if not self.untried_actions:
            return self

        chosen = self.untried_actions.pop()
        child = BayesianMCTSNode(self._apply_action(self.state, chosen), self, chosen, self.context)
        self.children.append(child)
        return child

    def _apply_action(self, state: Dict, action: str) -> Dict:
        """Return a copy of ``state`` with ``action`` selected and marked terminal."""
        successor = state.copy()
        successor["selected_option"] = action
        successor["depth"] = state.get("depth", 0) + 1
        successor["is_terminal"] = True
        return successor

    def update(self, reward: float, dimension_scores: Dict[str, float] = None):
        """Record one visit and fold ``reward`` into the Beta belief.

        The raw reward goes into ``value_history``; the belief update uses the
        reward clamped to [0, 1] with Gaussian noise whose scale is
        ``max(0.01, 1 / (visits + 1))``.
        """
        self.visits += 1
        self.value_history.append(reward)

        if dimension_scores:
            for dim, score in dimension_scores.items():
                self.dimension_scores.setdefault(dim, []).append(score)

        reward = max(0, min(1, reward))
        noise_scale = max(0.01, 1.0 / (self.visits + 1))
        reward += np.random.normal(0, noise_scale)
        reward = max(0, min(1, reward))

        update_rate = 1.0
        self.alpha += reward * update_rate
        self.beta += (1 - reward) * update_rate
|
|
|
|
|
class BayesianMCTS:
    """One-level Monte Carlo tree search over decision options.

    Args:
        context: Decision context holding the options.
        decision_factors: Weights / risk profile, echoed into the results.
        evaluator: Provides ``evaluate_option(option, context)``.
        iterations: Maximum number of search iterations (coerced to ``int``).
        progress_callback: Optional ``callback(fraction, message)``.
        early_stop_threshold: Variance threshold for the early-stop check.
        min_iterations: Iterations to run before early stopping is considered.
        selection_method: ``"ucb"``, ``"thompson"`` or anything else for the
            70/30 Thompson/UCB mix.
    """

    def __init__(
        self,
        context: DecisionContext,
        decision_factors: DecisionFactors,
        evaluator: UtilityEvaluator,
        iterations: int = 1000,
        progress_callback=None,
        early_stop_threshold: float = 0.01,
        min_iterations: int = 100,
        selection_method: str = "mixed",
    ):
        self.context = context
        self.decision_factors = decision_factors
        self.evaluator = evaluator
        # The count may originate from an LLM reply; ``range()`` needs an int.
        self.iterations = int(iterations)
        self.progress_callback = progress_callback
        self.early_stop_threshold = early_stop_threshold
        self.min_iterations = min_iterations
        self.selection_method = selection_method

    def search(self) -> Dict[str, Any]:
        """Run the search and return the ranked results.

        With no options in the context, the result without recommendations is
        returned immediately, since there is nothing to select or simulate.
        """
        initial_state = {"is_terminal": False, "depth": 0}
        root = BayesianMCTSNode(initial_state, context=self.context)

        if not self.context.options:
            return self._get_results(root)

        best_scores_history: List[float] = []

        for iteration in range(self.iterations):
            if self.progress_callback and (iteration + 1) % 100 == 0:
                progress = (iteration + 1) / self.iterations
                self.progress_callback(progress, f"MCTS搜索进度: {iteration + 1}/{self.iterations}")

            node = self._select(root)

            if not node.is_terminal() and not node.is_fully_expanded():
                node = node.expand()

            reward, dimension_scores = self._simulate(node)
            self._backpropagate(node, reward, dimension_scores)

            if iteration >= self.min_iterations and iteration % 50 == 0:
                if self._should_early_stop(root, best_scores_history):
                    if self.progress_callback:
                        self.progress_callback(1.0, f"MCTS早停触发,在第 {iteration + 1} 次迭代停止")
                    break

        return self._get_results(root)

    def _select(self, node: BayesianMCTSNode) -> BayesianMCTSNode:
        """Descend while nodes are fully expanded and return the node reached.

        Always returns a node: if a node has no child to pick, the descent
        stops there instead of returning ``None``.
        """
        while not node.is_terminal() and node.is_fully_expanded():
            if self.selection_method == "ucb":
                child = node.ucb_select_child()
            elif self.selection_method == "thompson":
                child = node.thompson_sampling_select()
            elif np.random.random() < 0.7:
                child = node.thompson_sampling_select()
            else:
                child = node.ucb_select_child()

            if child is None:
                break
            node = child
        return node

    def _simulate(self, node: BayesianMCTSNode) -> Tuple[float, Dict[str, float]]:
        """Evaluate the node's option (or a random one) with Gaussian noise.

        Returns:
            ``(reward, dimension_scores)`` with the reward clamped to [0, 1].
        """
        option = None
        if "selected_option" in node.state:
            option_name = node.state["selected_option"]
            option = next((opt for opt in self.context.options if opt.name == option_name), None)
        if option is None:
            # No (known) option is attached to this node: sample one uniformly.
            option = np.random.choice(self.context.options)

        base_reward, dim_scores = self.evaluator.evaluate_option(option, self.context)
        noise = np.random.normal(0, 0.05)
        reward = max(0, min(1, base_reward + noise))
        return reward, dim_scores

    def _backpropagate(self, node: BayesianMCTSNode, reward: float, dimension_scores: Dict[str, float]):
        """Apply the reward to ``node`` and every ancestor up to the root."""
        while node is not None:
            node.update(reward, dimension_scores)
            node = node.parent

    def _should_early_stop(self, root: BayesianMCTSNode, best_scores_history: List[float]) -> bool:
        """Decide whether the search has converged.

        Appends the current best posterior mean to ``best_scores_history``
        (mutated in place). Stops only when at least ten samples exist, the
        variance of the last ten is below ``early_stop_threshold`` and the best
        child leads the second best by more than 0.1.
        """
        if not root.children:
            return False

        current_best_scores = [child.get_posterior_mean() for child in root.children]
        best_scores_history.append(max(current_best_scores))

        if len(best_scores_history) < 10:
            return False

        score_variance = np.var(best_scores_history[-10:])
        if score_variance < self.early_stop_threshold:
            sorted_scores = sorted(current_best_scores, reverse=True)
            if len(sorted_scores) >= 2 and sorted_scores[0] - sorted_scores[1] > 0.1:
                return True

        return False

    def _get_results(self, root: BayesianMCTSNode) -> Dict[str, Any]:
        """Build the result dict from the root's children, best first."""
        if not root.children:
            return {
                "recommendations": [],
                "analysis": {"method": "mcts"},
                "decision_context": self.context.description,
            }

        option_results = []
        for child in root.children:
            lower, upper = self._calculate_confidence_interval(child)

            avg_dimension_scores = {
                dim: np.mean(scores) for dim, scores in child.dimension_scores.items() if scores
            }

            uncertainty = child.get_posterior_variance()
            if child.visits > 1:
                uncertainty = max(uncertainty, np.var(child.value_history) / child.visits)

            posterior_mean = child.get_posterior_mean()
            option_results.append(
                {
                    "option": child.action,
                    "expected_value": posterior_mean,
                    "uncertainty": uncertainty,
                    "visits": child.visits,
                    "confidence_interval": (lower, upper),
                    "dimension_scores": avg_dimension_scores,
                    # Penalise the mean slightly for uncertain estimates.
                    "recommendation_score": posterior_mean * (1 - 0.3 * uncertainty),
                }
            )

        option_results.sort(key=lambda x: x["recommendation_score"], reverse=True)
        analysis = self._generate_analysis(option_results)

        return {
            "recommendations": option_results[:3],
            "best_choice": option_results[0] if option_results else None,
            "all_results": option_results,
            "analysis": analysis,
            "decision_context": self.context.description,
            "decision_factors": {
                "weights": self.decision_factors.weights,
                "risk_profile": self.decision_factors.risk_profile.value,
                "evaluation_criteria": self.decision_factors.evaluation_criteria,
            },
        }

    def _calculate_confidence_interval(self, node: BayesianMCTSNode, confidence: float = 0.95) -> Tuple[float, float]:
        """Normal-approximation confidence interval of the node's mean reward.

        Args:
            node: Node whose ``value_history`` is summarised.
            confidence: Two-sided confidence level, strictly between 0 and 1.

        Returns:
            ``(lower, upper)`` clipped to [0, 1]; ``(0.0, 1.0)`` when fewer than
            two rewards were observed.

        Raises:
            ValueError: If ``confidence`` is not strictly between 0 and 1.
        """
        from statistics import NormalDist

        if not 0.0 < confidence < 1.0:
            raise ValueError("confidence must be strictly between 0 and 1")

        values = node.value_history
        if node.visits < 2 or len(values) < 2:
            return (0.0, 1.0)

        # The default level keeps the conventional rounded quantile so existing
        # results stay bit-identical; other levels use the exact quantile.
        z_score = 1.96 if confidence == 0.95 else NormalDist().inv_cdf(0.5 + confidence / 2.0)

        mean_val = np.mean(values)
        std_val = np.std(values, ddof=1)
        margin = z_score * std_val / np.sqrt(len(values))
        return (max(0, mean_val - margin), min(1, mean_val + margin))

    def _generate_analysis(self, option_results: List[Dict]) -> Dict[str, Any]:
        """Summarise the search: dimension leaders, lead of the best option, visit stats.

        ``option_results`` is expected to be sorted best first.
        """
        if not option_results:
            return {"method": "mcts"}

        dimension_leaders = {}
        for result in option_results:
            for dim, score in result.get("dimension_scores", {}).items():
                if dim not in dimension_leaders or score > dimension_leaders[dim][1]:
                    dimension_leaders[dim] = (result["option"], score)

        best = option_results[0]
        second_best = option_results[1] if len(option_results) > 1 else None

        return {
            "method": "mcts",
            "selection_strategy": self.selection_method,
            "dimension_leaders": dimension_leaders,
            "confidence_in_best": best["expected_value"] - (second_best["expected_value"] if second_best else 0),
            "exploration_statistics": {
                "total_visits": sum(r["visits"] for r in option_results),
                "visit_distribution": {r["option"]: r["visits"] for r in option_results},
            },
            "uncertainty_analysis": {r["option"]: r["uncertainty"] for r in option_results},
        }
|
|
|
|
|
class IntelligentDecisionSystem:
    """Pipeline facade: LLM extraction, option scoring and report assembly."""

    def __init__(self):
        self.llm_extractor = LLMExtractor()
        self.decision_history = []

    def make_decision(self, user_input: str, progress_callback=None, force_mcts: bool = False) -> Dict[str, Any]:
        """Analyse ``user_input`` end to end and return the decision report.

        Args:
            user_input: Free-text description of the decision.
            progress_callback: Optional ``callback(fraction, message)``.
            force_mcts: Treat the problem as dynamic regardless of the analysis.
        """

        def notify(fraction, message):
            # Progress reporting is optional.
            if progress_callback:
                progress_callback(fraction, message)

        extractor = self.llm_extractor

        notify(0.1, "正在分析问题类型和复杂度...")
        problem_analysis = extractor.analyze_problem_type(user_input)
        if force_mcts:
            problem_analysis.problem_type = ProblemType.DYNAMIC
        notify(0.2, f"问题类型: {problem_analysis.problem_type.value}, 复杂度: {problem_analysis.complexity_level.value}")

        notify(0.3, "正在分析您的决策需求...")
        context = extractor.extract_decision_factors(user_input)
        notify(0.4, f"识别到 {len(context.options)} 个决策选项")

        notify(0.5, "正在制定评估策略...")
        decision_factors = extractor.extract_evaluation_strategy(context)

        notify(0.6, "正在批量预评估所有选项...")
        pre_evaluations = extractor.batch_evaluate_options(context, decision_factors)

        if problem_analysis.problem_type == ProblemType.STATIC:
            notify(0.8, "使用传统加权评分方法进行决策...")
            scorer = TraditionalEvaluator(decision_factors, pre_evaluations)
            results = scorer.evaluate_all_options(context)
            results["problem_analysis"] = problem_analysis
            notify(1.0, "决策分析完成!")
        else:
            notify(0.7, f"使用MCTS方法进行动态决策搜索({problem_analysis.recommended_iterations}次迭代)...")
            searcher = BayesianMCTS(
                context,
                decision_factors,
                UtilityEvaluator(decision_factors, pre_evaluations),
                iterations=problem_analysis.recommended_iterations,
                progress_callback=progress_callback,
                early_stop_threshold=problem_analysis.early_stop_threshold,
                selection_method="mixed",
            )
            results = searcher.search()
            results["problem_analysis"] = problem_analysis

        self._record_decision(context, results)
        return self._format_decision_report(results, context)

    def _record_decision(self, context: DecisionContext, results: Dict[str, Any]):
        """Append the chosen option to the history, keeping at most 100 entries."""
        winner = results.get("best_choice")
        if not winner:
            return

        self.decision_history.append(
            {
                "context": context,
                "chosen_option": winner["option"],
                "expected_value": winner["expected_value"],
                "timestamp": time.time(),
            }
        )
        if len(self.decision_history) > 100:
            self.decision_history.pop(0)

    def _format_decision_report(self, results: Dict[str, Any], context: DecisionContext) -> Dict[str, Any]:
        """Assemble the report dict from raw results and the context."""
        summary = {
            "context": context.description,
            "objectives": context.objectives,
            "constraints": context.constraints,
        }
        return {
            "decision_summary": summary,
            "problem_analysis": results.get("problem_analysis"),
            "recommendations": results.get("recommendations", []),
            "best_choice": results.get("best_choice"),
            "detailed_analysis": results.get("analysis", {}),
            "decision_factors": results.get("decision_factors", {}),
            "confidence_level": self._calculate_confidence_level(results),
        }

    def _calculate_confidence_level(self, results: Dict[str, Any]) -> str:
        """Grade the recommendation as ``very_high``/``high``/``medium``/``low``.

        Weighted-scoring results are graded by the lead of the best option
        alone; MCTS results additionally require low uncertainty.
        """
        ranked = results.get("recommendations")
        if not ranked:
            return "low"

        best = ranked[0]
        runner_up = ranked[1] if len(ranked) > 1 else None
        # (minimum lead, maximum uncertainty, label), strictest first.
        tiers = ((0.2, 0.05, "very_high"), (0.15, 0.1, "high"), (0.1, 0.15, "medium"))

        if results.get("analysis", {}).get("method") == "traditional_weighted_scoring":
            if runner_up is None:
                return "medium"
            lead = best["expected_value"] - runner_up["expected_value"]
            for min_lead, _, label in tiers:
                if lead > min_lead:
                    return label
            return "low"

        uncertainty = best.get("uncertainty", 0.0)
        if runner_up is None:
            if uncertainty < 0.05:
                return "high"
            if uncertainty < 0.1:
                return "medium"
            return "low"

        lead = best["expected_value"] - runner_up["expected_value"]
        for min_lead, max_uncertainty, label in tiers:
            if lead > min_lead and uncertainty < max_uncertainty:
                return label
        return "low"
|
|
|
|
|
def format_decision_report_for_chat(report: Dict[str, Any]) -> str:
    """Render a decision report as Markdown text for the chat window."""
    summary = report["decision_summary"]
    details = report["detailed_analysis"]

    lines = [
        "# 🎯 智能决策分析报告",
        "=" * 60,
        "\n## 📋 决策场景",
        f"**描述**: {summary['context']}",
        f"**优化目标**: {', '.join(summary['objectives'])}",
        f"**约束条件**: {', '.join(summary['constraints'])}",
    ]

    # Problem analysis; the MCTS parameters are shown for dynamic problems only.
    if report.get("problem_analysis"):
        analysis = report["problem_analysis"]
        lines += [
            "\n## 🔍 问题分析",
            f"- **问题类型**: {analysis.problem_type.value}",
            f"- **复杂度**: {analysis.complexity_level.value}",
            f"- **分析说明**: {analysis.explanation}",
        ]
        if analysis.problem_type == ProblemType.DYNAMIC:
            lines += [
                f"- **推荐迭代次数**: {analysis.recommended_iterations}",
                f"- **早停阈值**: {analysis.early_stop_threshold}",
            ]

    # Confidence level with a traffic-light marker.
    markers = {"very_high": "🟢", "high": "🟡", "medium": "🟠", "low": "🔴"}
    level = report["confidence_level"]
    lines.append(f"\n## 📊 决策信心水平: {markers.get(level, '⚪')} {level.upper()}")

    # Ranked recommendations; ranks beyond the second share the bronze medal.
    medals = {1: "🥇", 2: "🥈"}
    lines.append("\n## 🏆 推荐方案排序")
    for rank, rec in enumerate(report["recommendations"], 1):
        lines.append(f"\n### {medals.get(rank, '🥉')} {rank}. {rec['option']}")
        lines.append(f"- **综合评分**: {rec['expected_value']:.3f}")
        lines.append(f"- **推荐指数**: {rec['recommendation_score']:.3f}")

        # These keys are only present in MCTS results.
        if "confidence_interval" in rec:
            low, high = rec["confidence_interval"][0], rec["confidence_interval"][1]
            lines.append(f"- **置信区间**: [{low:.3f}, {high:.3f}]")
        if "uncertainty" in rec:
            lines.append(f"- **不确定性**: {rec['uncertainty']:.4f}")
        if "visits" in rec:
            lines.append(f"- **访问次数**: {rec['visits']}")

        if rec["dimension_scores"]:
            lines.append("- **维度得分**:")
            lines.extend(f" - {dim}: {score:.3f}" for dim, score in rec["dimension_scores"].items())

    # Best option per dimension.
    if details.get("dimension_leaders"):
        lines.append("\n## 🎖️ 各维度最佳选项")
        for dim, (option, score) in details["dimension_leaders"].items():
            lines.append(f"- **{dim}**: {option} ({score:.3f})")

    # Factor weights.
    factors = report.get("decision_factors", {})
    if factors.get("weights"):
        lines.append("\n## ⚖️ 决策因子权重")
        lines.extend(f"- {factor}: {weight:.3f}" for factor, weight in factors["weights"].items())

    lines.append(f"\n## 📈 风险偏好: {factors.get('risk_profile', '未知')}")

    method = details.get("method", "unknown")
    lines.append(f"\n## 🔧 决策方法: {method.upper()}")

    if method == "mcts":
        lines.append(f"- **选择策略**: {details.get('selection_strategy', 'mixed')}")

        if details.get("exploration_statistics"):
            stats = details["exploration_statistics"]
            lines.append(f"- **总访问次数**: {stats['total_visits']}")
            lines.append("- **访问分布**:")
            lines.extend(f" - {option}: {visits}" for option, visits in stats["visit_distribution"].items())

    return "\n".join(lines)
|
|
|
|
|
|
|
def create_gradio_interface():
    """Build the Gradio chat UI for the decision assistant.

    Returns:
        The ``gr.Blocks`` application (not yet launched).
    """
    decision_system = IntelligentDecisionSystem()

    def process_decision(message, history, force_mcts=True):
        """Run the decision pipeline for ``message`` and return the updated chat history.

        Errors raised by the pipeline are shown as the assistant reply instead
        of propagating to Gradio.
        """
        # Tolerate a missing history so list concatenation below cannot fail.
        history = history or []

        if not message.strip():
            return history + [["请输入您的决策问题", "请描述您需要帮助的决策问题,我将为您提供智能分析和建议。"]]

        # Add the user message with a pending (None) reply.
        history = history + [[message, None]]

        try:
            progress_messages = []

            def progress_callback(progress, status):
                progress_messages.append(f"⏳ {status}")
                # Only fill the reply slot while it is still pending.
                if history and history[-1][1] is None:
                    history[-1][1] = "\n".join(progress_messages)
                return history

            report = decision_system.make_decision(
                message,
                progress_callback=progress_callback,
                force_mcts=force_mcts,
            )

            history[-1][1] = format_decision_report_for_chat(report)

        except Exception as e:
            error_msg = f"❌ 分析过程中出现错误: {str(e)}\n\n请检查您的问题描述是否清晰,或稍后重试。"
            history[-1][1] = error_msg

        return history

    with gr.Blocks(
        title="智能决策助手",
        theme=gr.themes.Soft(),
        css="""
.chat-message {
font-size: 14px;
}
""",
    ) as demo:

        chatbot = gr.Chatbot(
            label="决策分析对话",
            height=600,
            show_label=True,
            container=True,
            bubble_full_width=False,
        )

        with gr.Row():
            msg = gr.Textbox(
                label="输入您的决策问题",
                placeholder="请详细描述您的决策场景、可选方案和目标...",
                lines=3,
                max_lines=10,
                show_label=True,
                container=True,
            )

        with gr.Row():
            submit_btn = gr.Button("🎯 智能分析", variant="primary", size="lg")
            clear_btn = gr.Button("🗑️ 清空对话", variant="stop", size="lg")

        gr.Markdown("### 📋 示例问题(点击快速填入)")

        example_1 = gr.Button("🏫 学校选择问题", size="sm")
        example_2 = gr.Button("🎮 游戏策略问题", size="sm")
        example_3 = gr.Button("💼 供应商选择问题", size="sm")

        def submit_message(message, history):
            """Handle a submission: return the new history and clear the textbox."""
            return process_decision(message, history), ""

        def submit_with_mcts(message, history):
            """Like ``submit_message`` but with MCTS forced explicitly.

            Not bound to any UI event. It previously called an undefined
            ``process_with_mcts`` and would have raised ``NameError``.
            """
            return process_decision(message, history, force_mcts=True), ""

        def clear_chat():
            """Reset the conversation."""
            return []

        submit_btn.click(
            submit_message,
            inputs=[msg, chatbot],
            outputs=[chatbot, msg],
        )

        # Pressing Enter in the textbox behaves like the submit button.
        msg.submit(
            submit_message,
            inputs=[msg, chatbot],
            outputs=[chatbot, msg],
        )

        clear_btn.click(
            clear_chat,
            outputs=[chatbot],
        )

        def fill_example_1():
            """Example text: school selection."""
            return """我需要为孩子选择一所小学学校。我们的积分大约是103.75分,如果是报B学校还可以再加 3.5 积分。
可选学校:
1. A学校:教学质量很好(9分),要求105分,有直升机会,无额外加积分,离家比较近
2. B学校:教学质量中等(6分),要求103分,没有直升,可以额外加3.5积分,离家比较近
3. C学校:教学质量一般(2分),要求90分,没有直升,无额外加积分,离家很远

我们比较看重教学质量,但也要把握录取概率,另外所有学校的积分有小概率在去年基础上加减 1 积分左右。"""

        def fill_example_2():
            """Example text: strategy game."""
            return """我在玩一个策略游戏,需要选择下一步行动。当前情况:
1. 我有10金币,对手有80金币
2. 我可以选择:攻击(消耗30金币,可能获得50金币),防守(消耗10金币,减少损失),发展经济(消耗40金币,下回合+60金币),投降,平局
3. 对手可能会根据我的选择调整策略
4. 游戏还有3回合结束

我的目标是最终金币数量最多,需要考虑对手的反应。"""

        def fill_example_3():
            """Example text: supplier selection."""
            return """公司需要选择新的原材料供应商,有以下几个选项:

1. 供应商A:价格较高(单价120元),质量优秀(质量分9.2),交货及时率95%,距离较近
2. 供应商B:价格中等(单价100元),质量良好(质量分7.8),交货及时率88%,距离中等
3. 供应商C:价格便宜(单价80元),质量一般(质量分6.5),交货及时率75%,距离较远

我们的预算有限,但对质量和交货时间都有要求。年采购量预计10万件。"""

        example_1.click(fill_example_1, outputs=[msg])
        example_2.click(fill_example_2, outputs=[msg])
        example_3.click(fill_example_3, outputs=[msg])

        with gr.Accordion("📖 详细使用说明", open=False):
            gr.Markdown("""
## 🔧 功能说明
- **强制使用MCTS**: 无论问题类型,都使用MCTS方法进行深度分析
- 适合需要考虑更多不确定性的复杂决策
- 分析时间较长,但结果更全面

### 📊 报告内容说明
- **综合评分**: 基于所有因素的加权综合得分
- **推荐指数**: 考虑不确定性后的最终推荐分数
- **置信区间**: MCTS方法提供的结果可信度范围
- **不确定性**: 决策结果的不确定程度
- **维度得分**: 各个评估维度的详细得分

### 💡 最佳实践
1. **详细描述**: 提供尽可能详细的背景信息
2. **量化信息**: 尽量提供具体的数值和指标
3. **明确目标**: 清楚说明您的优化目标和约束条件
4. **多轮对话**: 可以基于分析结果进一步提问和讨论
""")

    return demo
|
|
|
|
|
def main():
    """Build the Gradio interface and serve it on port 7860 on all interfaces."""
    launch_options = {
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "share": False,
        "debug": True,
        "show_error": True,
        "quiet": False,
    }
    app = create_gradio_interface()
    app.launch(**launch_options)
|
|
|
|
|
# Start the web UI only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|
|