Spaces:
Running
Running
from __future__ import annotations | |
import json | |
import time | |
import uuid | |
from typing import Any, Dict, Optional | |
from .logging import logger | |
from .config import ( | |
WARMUP_INIT_RETRIES, | |
WARMUP_INIT_DELAY_S, | |
WARMUP_REQUEST_RETRIES, | |
WARMUP_REQUEST_DELAY_S, | |
) | |
from .packets import packet_template | |
from .state import STATE, ensure_tool_ids | |
# 导入warp2protobuf模块的函数,替代HTTP调用 | |
from warp2protobuf.core.protobuf_utils import dict_to_protobuf_bytes | |
from warp2protobuf.warp.api_client import send_protobuf_to_warp_api_parsed | |
from warp2protobuf.core.schema_sanitizer import sanitize_mcp_input_schema_in_packet | |
async def bridge_send_stream(packet: Dict[str, Any]) -> Dict[str, Any]: | |
"""直接调用warp2protobuf模块,替代HTTP调用""" | |
try: | |
logger.info("[OpenAI Compat] Bridge request (direct call)") | |
logger.info("[OpenAI Compat] Bridge request payload: %s", json.dumps(packet, ensure_ascii=False)) | |
# 应用schema清理 | |
wrapped = {"json_data": packet} | |
wrapped = sanitize_mcp_input_schema_in_packet(wrapped) | |
actual_data = wrapped.get("json_data", packet) | |
# 转换为protobuf | |
protobuf_bytes = dict_to_protobuf_bytes(actual_data, "warp.multi_agent.v1.Request") | |
logger.info(f"[OpenAI Compat] JSON编码为protobuf成功: {len(protobuf_bytes)} 字节") | |
# 直接调用API客户端 | |
response_text, conversation_id, task_id, parsed_events = await send_protobuf_to_warp_api_parsed(protobuf_bytes) | |
# 构造响应 | |
result = { | |
"response": response_text, | |
"conversation_id": conversation_id, | |
"task_id": task_id, | |
"request_size": len(protobuf_bytes), | |
"response_size": len(response_text), | |
"message_type": "warp.multi_agent.v1.Request", | |
"parsed_events": parsed_events, | |
"events_count": len(parsed_events), | |
"events_summary": {} | |
} | |
if parsed_events: | |
event_type_counts = {} | |
for event in parsed_events: | |
event_data = event.get("parsed_data", {}) | |
event_type = event.get("event_type", "UNKNOWN") | |
event_type_counts[event_type] = event_type_counts.get(event_type, 0) + 1 | |
result["events_summary"] = event_type_counts | |
logger.info(f"[OpenAI Compat] Bridge调用成功,响应长度: {len(response_text)} 字符,事件数量: {len(parsed_events)}") | |
return result | |
except Exception as e: | |
import traceback | |
error_details = { | |
"error": str(e), | |
"error_type": type(e).__name__, | |
"traceback": traceback.format_exc(), | |
} | |
logger.error(f"[OpenAI Compat] Bridge调用失败: {e}") | |
raise Exception(f"bridge_error: {e}") | |
async def initialize_once() -> None: | |
"""初始化函数,现在使用直接调用,无需HTTP健康检查""" | |
if STATE.conversation_id: | |
return | |
ensure_tool_ids() | |
first_task_id = STATE.baseline_task_id or str(uuid.uuid4()) | |
STATE.baseline_task_id = first_task_id | |
# 移除HTTP健康检查,因为现在是直接调用模块函数 | |
logger.info("[OpenAI Compat] 使用直接模块调用,跳过HTTP健康检查") | |
pkt = packet_template() | |
pkt["task_context"]["active_task_id"] = first_task_id | |
pkt["input"]["user_inputs"]["inputs"].append({"user_query": {"query": "warmup"}}) | |
last_exc: Optional[Exception] = None | |
for attempt in range(1, WARMUP_REQUEST_RETRIES + 1): | |
try: | |
resp = await bridge_send_stream(pkt) | |
break | |
except Exception as e: | |
last_exc = e | |
logger.warning(f"[OpenAI Compat] Warmup attempt {attempt}/{WARMUP_REQUEST_RETRIES} failed: {e}") | |
if attempt < WARMUP_REQUEST_RETRIES: | |
time.sleep(WARMUP_REQUEST_DELAY_S) | |
else: | |
raise | |
STATE.conversation_id = resp.get("conversation_id") or STATE.conversation_id | |
ret_task_id = resp.get("task_id") | |
if isinstance(ret_task_id, str) and ret_task_id: | |
STATE.baseline_task_id = ret_task_id |