from __future__ import annotations

import asyncio
import json
import uuid
from typing import Any, Dict, Optional

from .logging import logger

from .config import (
    WARMUP_REQUEST_RETRIES,
    WARMUP_REQUEST_DELAY_S,
)
from .packets import packet_template
from .state import STATE, ensure_tool_ids

# Import warp2protobuf module functions directly, replacing the previous HTTP calls
from warp2protobuf.core.protobuf_utils import dict_to_protobuf_bytes
from warp2protobuf.warp.api_client import send_protobuf_to_warp_api_parsed
from warp2protobuf.core.schema_sanitizer import sanitize_mcp_input_schema_in_packet


async def bridge_send_stream(packet: Dict[str, Any]) -> Dict[str, Any]:
    """直接调用warp2protobuf模块,替代HTTP调用"""
    try:
        logger.info("[OpenAI Compat] Bridge request (direct call)")
        logger.info("[OpenAI Compat] Bridge request payload: %s", json.dumps(packet, ensure_ascii=False))
        
        # Sanitize MCP input schemas in the packet
        wrapped = {"json_data": packet}
        wrapped = sanitize_mcp_input_schema_in_packet(wrapped)
        actual_data = wrapped.get("json_data", packet)
        
        # Encode the JSON payload as protobuf
        protobuf_bytes = dict_to_protobuf_bytes(actual_data, "warp.multi_agent.v1.Request")
        logger.info(f"[OpenAI Compat] JSON编码为protobuf成功: {len(protobuf_bytes)} 字节")
        
        # Call the Warp API client directly
        response_text, conversation_id, task_id, parsed_events = await send_protobuf_to_warp_api_parsed(protobuf_bytes)
        
        # Build the response payload
        result = {
            "response": response_text,
            "conversation_id": conversation_id,
            "task_id": task_id,
            "request_size": len(protobuf_bytes),
            "response_size": len(response_text),
            "message_type": "warp.multi_agent.v1.Request",
            "parsed_events": parsed_events,
            "events_count": len(parsed_events),
            "events_summary": {}
        }
        
        if parsed_events:
            event_type_counts = {}
            for event in parsed_events:
                event_type = event.get("event_type", "UNKNOWN")
                event_type_counts[event_type] = event_type_counts.get(event_type, 0) + 1
            result["events_summary"] = event_type_counts
        
        logger.info(f"[OpenAI Compat] Bridge调用成功,响应长度: {len(response_text)} 字符,事件数量: {len(parsed_events)}")
        return result
        
    except Exception as e:
        import traceback
        error_details = {
            "error": str(e),
            "error_type": type(e).__name__,
            "traceback": traceback.format_exc(),
        }
        logger.error("[OpenAI Compat] Bridge call failed: %s", error_details)
        raise Exception(f"bridge_error: {e}") from e
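

# Illustrative helper (a sketch, not part of the original module): shows how a caller
# might drive bridge_send_stream() with a packet built from packet_template(). The
# "warmup"-style query and the result keys mirror what initialize_once() below relies on.
async def _example_bridge_roundtrip(query: str = "warmup") -> Dict[str, Any]:
    pkt = packet_template()
    pkt["task_context"]["active_task_id"] = str(uuid.uuid4())
    pkt["input"]["user_inputs"]["inputs"].append({"user_query": {"query": query}})
    result = await bridge_send_stream(pkt)
    logger.info(
        "[OpenAI Compat] Example round-trip: conversation=%s events=%s",
        result.get("conversation_id"), result.get("events_summary"),
    )
    return result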


async def initialize_once() -> None:
    """初始化函数,现在使用直接调用,无需HTTP健康检查"""
    if STATE.conversation_id:
        return

    ensure_tool_ids()

    first_task_id = STATE.baseline_task_id or str(uuid.uuid4())
    STATE.baseline_task_id = first_task_id

    # The HTTP health check was removed: module functions are now invoked directly.
    logger.info("[OpenAI Compat] Using direct module calls; skipping HTTP health check")

    pkt = packet_template()
    pkt["task_context"]["active_task_id"] = first_task_id
    pkt["input"]["user_inputs"]["inputs"].append({"user_query": {"query": "warmup"}})

    resp: Dict[str, Any] = {}
    last_exc: Optional[Exception] = None
    for attempt in range(1, WARMUP_REQUEST_RETRIES + 1):
        try:
            resp = await bridge_send_stream(pkt)
            break
        except Exception as e:
            last_exc = e
            logger.warning(
                "[OpenAI Compat] Warmup attempt %d/%d failed: %s",
                attempt, WARMUP_REQUEST_RETRIES, last_exc,
            )
            if attempt < WARMUP_REQUEST_RETRIES:
                # Non-blocking backoff between retries; time.sleep() would stall the event loop.
                await asyncio.sleep(WARMUP_REQUEST_DELAY_S)
            else:
                raise

    STATE.conversation_id = resp.get("conversation_id") or STATE.conversation_id
    ret_task_id = resp.get("task_id")
    if isinstance(ret_task_id, str) and ret_task_id:
        STATE.baseline_task_id = ret_task_id
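

# Illustrative startup helper (a sketch, not part of the original module): the intended
# call pattern is to await initialize_once() once before serving requests; it is a
# no-op when STATE.conversation_id is already populated.
async def _example_warmup_on_startup() -> None:
    await initialize_once()
    logger.info(
        "[OpenAI Compat] Warmup complete: conversation_id=%s baseline_task_id=%s",
        STATE.conversation_id, STATE.baseline_task_id,
    )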