Spaces:
Running
Running
File size: 4,315 Bytes
9314c03 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 |
from __future__ import annotations
import json
import time
import uuid
from typing import Any, Dict, Optional
from .logging import logger
from .config import (
WARMUP_INIT_RETRIES,
WARMUP_INIT_DELAY_S,
WARMUP_REQUEST_RETRIES,
WARMUP_REQUEST_DELAY_S,
)
from .packets import packet_template
from .state import STATE, ensure_tool_ids
# 导入warp2protobuf模块的函数,替代HTTP调用
from warp2protobuf.core.protobuf_utils import dict_to_protobuf_bytes
from warp2protobuf.warp.api_client import send_protobuf_to_warp_api_parsed
from warp2protobuf.core.schema_sanitizer import sanitize_mcp_input_schema_in_packet
async def bridge_send_stream(packet: Dict[str, Any]) -> Dict[str, Any]:
    """Send a request packet to Warp by calling the warp2protobuf modules directly.

    This replaces the former HTTP bridge call: the packet is sanitized,
    encoded as a ``warp.multi_agent.v1.Request`` protobuf message and handed
    to ``send_protobuf_to_warp_api_parsed``.

    Args:
        packet: The request packet as a JSON-compatible dictionary.

    Returns:
        A dictionary with the keys ``response``, ``conversation_id``,
        ``task_id``, ``request_size``, ``response_size``, ``message_type``,
        ``parsed_events``, ``events_count`` and ``events_summary`` (a mapping
        of event type to the number of parsed events of that type).

    Raises:
        Exception: With the message ``bridge_error: <original message>`` when
            any step fails. The original exception is attached as
            ``__cause__`` so its traceback is preserved for the caller.
    """
    try:
        logger.info("[OpenAI Compat] Bridge request (direct call)")
        logger.info("[OpenAI Compat] Bridge request payload: %s", json.dumps(packet, ensure_ascii=False))

        # Apply schema sanitization. The packet is passed to the sanitizer
        # wrapped under "json_data" and read back from the same key.
        wrapped = {"json_data": packet}
        wrapped = sanitize_mcp_input_schema_in_packet(wrapped)
        actual_data = wrapped.get("json_data", packet)

        # Encode the sanitized dictionary as protobuf bytes.
        protobuf_bytes = dict_to_protobuf_bytes(actual_data, "warp.multi_agent.v1.Request")
        logger.info(f"[OpenAI Compat] JSON编码为protobuf成功: {len(protobuf_bytes)} 字节")

        # Call the API client directly (no HTTP hop to a bridge server).
        response_text, conversation_id, task_id, parsed_events = await send_protobuf_to_warp_api_parsed(protobuf_bytes)

        # Count how many parsed events there are of each type; events that
        # carry no "event_type" are counted under "UNKNOWN".
        event_type_counts: Dict[str, int] = {}
        for event in parsed_events:
            event_type = event.get("event_type", "UNKNOWN")
            event_type_counts[event_type] = event_type_counts.get(event_type, 0) + 1

        result = {
            "response": response_text,
            "conversation_id": conversation_id,
            "task_id": task_id,
            "request_size": len(protobuf_bytes),
            "response_size": len(response_text),
            "message_type": "warp.multi_agent.v1.Request",
            "parsed_events": parsed_events,
            "events_count": len(parsed_events),
            "events_summary": event_type_counts,
        }
        logger.info(f"[OpenAI Compat] Bridge调用成功,响应长度: {len(response_text)} 字符,事件数量: {len(parsed_events)}")
        return result
    except Exception as e:
        logger.error(f"[OpenAI Compat] Bridge调用失败: {e}")
        # Chain explicitly so the root cause and its traceback stay attached
        # to the generic bridge error that callers catch.
        raise Exception(f"bridge_error: {e}") from e
async def initialize_once() -> None:
    """Send a single warmup request so ``STATE`` holds a conversation id.

    Does nothing when ``STATE.conversation_id`` is already set. Otherwise it
    makes sure tool ids exist, picks (or generates) a baseline task id, sends
    a packet with the query ``"warmup"`` through ``bridge_send_stream`` and
    stores the returned conversation id and task id on ``STATE``.

    No HTTP health check is performed, because the bridge is called as a
    module function rather than over HTTP.

    Raises:
        Exception: Re-raises the error from the last warmup attempt when all
            attempts fail.
    """
    # Function-scope import: needed for the non-blocking retry delay below.
    import asyncio

    if STATE.conversation_id:
        return

    ensure_tool_ids()
    first_task_id = STATE.baseline_task_id or str(uuid.uuid4())
    STATE.baseline_task_id = first_task_id

    logger.info("[OpenAI Compat] 使用直接模块调用,跳过HTTP健康检查")

    pkt = packet_template()
    pkt["task_context"]["active_task_id"] = first_task_id
    pkt["input"]["user_inputs"]["inputs"].append({"user_query": {"query": "warmup"}})

    # Always make at least one attempt, so a zero or negative retry setting
    # cannot leave `resp` unbound after the loop.
    total_attempts = max(1, WARMUP_REQUEST_RETRIES)
    for attempt in range(1, total_attempts + 1):
        try:
            resp = await bridge_send_stream(pkt)
            break
        except Exception as e:
            logger.warning(f"[OpenAI Compat] Warmup attempt {attempt}/{total_attempts} failed: {e}")
            if attempt >= total_attempts:
                raise
            # Yield to the event loop while waiting; a blocking time.sleep()
            # here would stall every other coroutine on the loop.
            await asyncio.sleep(WARMUP_REQUEST_DELAY_S)

    # Keep the previous conversation id if the response did not return one.
    STATE.conversation_id = resp.get("conversation_id") or STATE.conversation_id
    ret_task_id = resp.get("task_id")
    if isinstance(ret_task_id, str) and ret_task_id:
        STATE.baseline_task_id = ret_task_id