from __future__ import annotations import json import time import uuid from typing import Any, Dict, Optional from .logging import logger from .config import ( WARMUP_INIT_RETRIES, WARMUP_INIT_DELAY_S, WARMUP_REQUEST_RETRIES, WARMUP_REQUEST_DELAY_S, ) from .packets import packet_template from .state import STATE, ensure_tool_ids # 导入warp2protobuf模块的函数,替代HTTP调用 from warp2protobuf.core.protobuf_utils import dict_to_protobuf_bytes from warp2protobuf.warp.api_client import send_protobuf_to_warp_api_parsed from warp2protobuf.core.schema_sanitizer import sanitize_mcp_input_schema_in_packet async def bridge_send_stream(packet: Dict[str, Any]) -> Dict[str, Any]: """直接调用warp2protobuf模块,替代HTTP调用""" try: logger.info("[OpenAI Compat] Bridge request (direct call)") logger.info("[OpenAI Compat] Bridge request payload: %s", json.dumps(packet, ensure_ascii=False)) # 应用schema清理 wrapped = {"json_data": packet} wrapped = sanitize_mcp_input_schema_in_packet(wrapped) actual_data = wrapped.get("json_data", packet) # 转换为protobuf protobuf_bytes = dict_to_protobuf_bytes(actual_data, "warp.multi_agent.v1.Request") logger.info(f"[OpenAI Compat] JSON编码为protobuf成功: {len(protobuf_bytes)} 字节") # 直接调用API客户端 response_text, conversation_id, task_id, parsed_events = await send_protobuf_to_warp_api_parsed(protobuf_bytes) # 构造响应 result = { "response": response_text, "conversation_id": conversation_id, "task_id": task_id, "request_size": len(protobuf_bytes), "response_size": len(response_text), "message_type": "warp.multi_agent.v1.Request", "parsed_events": parsed_events, "events_count": len(parsed_events), "events_summary": {} } if parsed_events: event_type_counts = {} for event in parsed_events: event_data = event.get("parsed_data", {}) event_type = event.get("event_type", "UNKNOWN") event_type_counts[event_type] = event_type_counts.get(event_type, 0) + 1 result["events_summary"] = event_type_counts logger.info(f"[OpenAI Compat] Bridge调用成功,响应长度: {len(response_text)} 字符,事件数量: {len(parsed_events)}") return result except Exception as e: import traceback error_details = { "error": str(e), "error_type": type(e).__name__, "traceback": traceback.format_exc(), } logger.error(f"[OpenAI Compat] Bridge调用失败: {e}") raise Exception(f"bridge_error: {e}") async def initialize_once() -> None: """初始化函数,现在使用直接调用,无需HTTP健康检查""" if STATE.conversation_id: return ensure_tool_ids() first_task_id = STATE.baseline_task_id or str(uuid.uuid4()) STATE.baseline_task_id = first_task_id # 移除HTTP健康检查,因为现在是直接调用模块函数 logger.info("[OpenAI Compat] 使用直接模块调用,跳过HTTP健康检查") pkt = packet_template() pkt["task_context"]["active_task_id"] = first_task_id pkt["input"]["user_inputs"]["inputs"].append({"user_query": {"query": "warmup"}}) last_exc: Optional[Exception] = None for attempt in range(1, WARMUP_REQUEST_RETRIES + 1): try: resp = await bridge_send_stream(pkt) break except Exception as e: last_exc = e logger.warning(f"[OpenAI Compat] Warmup attempt {attempt}/{WARMUP_REQUEST_RETRIES} failed: {e}") if attempt < WARMUP_REQUEST_RETRIES: time.sleep(WARMUP_REQUEST_DELAY_S) else: raise STATE.conversation_id = resp.get("conversation_id") or STATE.conversation_id ret_task_id = resp.get("task_id") if isinstance(ret_task_id, str) and ret_task_id: STATE.baseline_task_id = ret_task_id