#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Warp API response parsing

Handles parsing of protobuf responses and extraction of OpenAI-compatible content.
"""
import json
import traceback
from typing import Optional, Dict, List, Any, Iterator, Tuple

from ..core.logging import logger
from ..core.protobuf import ensure_proto_runtime, msg_cls

# Fully-qualified protobuf message name of a single response event.
_RESPONSE_EVENT_TYPE = "warp.multi_agent.v1.ResponseEvent"

# Attribute names that may carry a tool call's identifier.  These are also
# skipped when scanning a tool call's populated fields for the tool itself.
_TOOL_CALL_ID_FIELDS = ("tool_call_id", "id")

# Item kinds yielded by _iter_response_items().
_KIND_CONTENT = "content"
_KIND_REASONING = "reasoning"
_KIND_ROLE = "role"
_KIND_TOOL_CALL = "tool_call"

# Element types json.dumps can serialize without help.
_JSON_SCALARS = (str, int, float, bool)


def _parse_response_event(payload: bytes) -> Any:
    """Parse *payload* into a ``ResponseEvent`` protobuf message."""
    ensure_proto_runtime()
    response_cls = msg_cls(_RESPONSE_EVENT_TYPE)
    response = response_cls()
    response.ParseFromString(payload)
    return response


def _first_non_empty_attr(obj: Any, names: Tuple[str, ...]) -> Any:
    """Return the first truthy attribute of *obj* among *names*, else None.

    Attributes that do not exist and attributes holding an empty value are
    treated the same way, because an unset protobuf string field reads back
    as ``""`` rather than raising.
    """
    for name in names:
        value = getattr(obj, name, None)
        if value:
            return value
    return None


def _json_safe(value: Any) -> Any:
    """Convert one tool-argument field value into something json.dumps accepts.

    Strings pass through, sized containers become lists, and everything else
    is stringified.  Container elements that are not JSON scalars (for
    example nested messages or bytes) are stringified individually so that a
    single exotic argument cannot make serialization raise.
    """
    if isinstance(value, str):
        return value
    if hasattr(value, "__len__"):
        return [
            item if item is None or isinstance(item, _JSON_SCALARS) else str(item)
            for item in value
        ]
    return str(value)


def _tool_name_and_args_from_fields(tool_call: Any) -> Tuple[str, str]:
    """Derive ``(name, arguments_json)`` from a tool call's populated fields.

    The first populated field that is not an identifier names the tool; when
    that field's value is itself a message, its populated sub-fields become
    the JSON-encoded arguments.  Only that first field is considered.
    """
    list_fields = getattr(tool_call, "ListFields", None)
    if list_fields is None:
        return "unknown", "{}"

    for field, value in list_fields():
        if field.name in _TOOL_CALL_ID_FIELDS:
            continue
        arguments = "{}"
        if hasattr(value, "ListFields"):
            arg_fields = {
                sub_field.name: _json_safe(sub_value)
                for sub_field, sub_value in value.ListFields()
            }
            if arg_fields:
                arguments = json.dumps(arg_fields)
        return field.name, arguments

    return "unknown", "{}"


def _to_openai_tool_call(tool_call: Any, fallback_id: str) -> Dict[str, Any]:
    """Convert a tool-call message into an OpenAI ``tool_calls`` entry.

    Explicit ``name``/``function_name`` and ``arguments``/``parameters``
    attributes are preferred when they are present and non-empty; otherwise
    the name and arguments are derived from the populated fields.
    *fallback_id* is used when the message carries no non-empty identifier.
    """
    call_id = _first_non_empty_attr(tool_call, _TOOL_CALL_ID_FIELDS) or fallback_id

    name = _first_non_empty_attr(tool_call, ("name", "function_name"))
    if name:
        arguments = _first_non_empty_attr(tool_call, ("arguments", "parameters")) or "{}"
    else:
        name, arguments = _tool_name_and_args_from_fields(tool_call)

    return {
        "id": call_id,
        "type": "function",
        "function": {"name": name, "arguments": arguments},
    }


def _iter_response_items(response: Any) -> Iterator[Tuple[str, Any]]:
    """Yield ``(kind, value)`` pairs for *response* in payload order.

    Kinds are ``"content"`` and ``"reasoning"`` (text fragments), ``"role"``
    (an assistant-role marker preceding tool calls) and ``"tool_call"`` (an
    OpenAI tool-call dict).  Both public extractors consume this generator,
    which keeps the streaming and non-streaming views of a payload in sync.
    """
    if not response.HasField("client_actions"):
        return

    for i, action in enumerate(response.client_actions.actions):
        if action.HasField("append_to_message_content"):
            message = action.append_to_message_content.message
            if message.HasField("agent_output"):
                agent_output = message.agent_output
                if agent_output.text:
                    yield _KIND_CONTENT, agent_output.text
                if agent_output.reasoning:
                    yield _KIND_REASONING, agent_output.reasoning
            if message.HasField("tool_call"):
                yield _KIND_ROLE, "assistant"
                yield _KIND_TOOL_CALL, _to_openai_tool_call(message.tool_call, f"call_{i}")

        elif action.HasField("add_messages_to_task"):
            for j, msg in enumerate(action.add_messages_to_task.messages):
                if msg.HasField("agent_output") and msg.agent_output.text:
                    yield _KIND_CONTENT, msg.agent_output.text
                if msg.HasField("tool_call"):
                    # The role marker is only emitted when the tool call sits
                    # in the first message of the batch.
                    if j == 0:
                        yield _KIND_ROLE, "assistant"
                    yield _KIND_TOOL_CALL, _to_openai_tool_call(msg.tool_call, f"call_{i}_{j}")

        elif action.HasField("update_task_message"):
            umsg = action.update_task_message.message
            if umsg.HasField("agent_output") and umsg.agent_output.text:
                yield _KIND_CONTENT, umsg.agent_output.text

        elif action.HasField("create_task"):
            for msg in action.create_task.task.messages:
                if msg.HasField("agent_output") and msg.agent_output.text:
                    yield _KIND_CONTENT, msg.agent_output.text

        elif action.HasField("update_task_summary"):
            summary = action.update_task_summary.summary
            if summary:
                yield _KIND_CONTENT, summary


def _sse_chunk(delta: Dict[str, Any], finish_reason: Optional[str] = None) -> Dict[str, Any]:
    """Wrap *delta* in the single-choice chunk envelope used for SSE output."""
    return {"choices": [{"index": 0, "delta": delta, "finish_reason": finish_reason}]}


def extract_openai_content_from_response(payload: bytes) -> dict:
    """
    Extract OpenAI-compatible content from Warp API response payload.

    Args:
        payload: Serialized ``ResponseEvent`` protobuf bytes.

    Returns:
        A dict with the keys ``content`` (concatenated text, ``None`` for an
        empty payload or on failure), ``tool_calls`` (list of OpenAI tool-call
        dicts), ``finish_reason`` (``"stop"`` when the event has ``finished``,
        ``"error"`` on failure, otherwise ``None``) and ``metadata``.  A
        ``reasoning`` key is added only when reasoning text was found.

    Parsing errors are logged and reported through the returned dict; they
    are never raised to the caller.
    """
    if not payload:
        logger.debug("extract_openai_content_from_response: payload is empty")
        return {"content": None, "tool_calls": [], "finish_reason": None, "metadata": {}}

    logger.debug(f"extract_openai_content_from_response: processing payload of {len(payload)} bytes")
    hex_dump = payload.hex()
    logger.debug(f"extract_openai_content_from_response: complete payload hex: {hex_dump}")

    try:
        response = _parse_response_event(payload)

        content_parts: List[str] = []
        reasoning_parts: List[str] = []
        tool_calls: List[Dict[str, Any]] = []
        for kind, value in _iter_response_items(response):
            if kind == _KIND_CONTENT:
                content_parts.append(value)
            elif kind == _KIND_REASONING:
                reasoning_parts.append(value)
            elif kind == _KIND_TOOL_CALL:
                tool_calls.append(value)
            # Role markers only matter for the streaming representation.

        result = {
            "content": "".join(content_parts),
            "tool_calls": tool_calls,
            "finish_reason": None,
            "metadata": {},
        }
        if reasoning_parts:
            result["reasoning"] = "".join(reasoning_parts)

        if response.HasField("finished"):
            result["finish_reason"] = "stop"

        result["metadata"] = {
            "response_fields": [field.name for field, _ in response.ListFields()],
            "has_client_actions": response.HasField("client_actions"),
            "payload_size": len(payload),
        }
        return result
    except Exception as e:
        # Boundary handler: callers expect an error-shaped result, not a raise.
        logger.error(f"extract_openai_content_from_response: exception occurred: {e}")
        logger.error(f"extract_openai_content_from_response: traceback: {traceback.format_exc()}")
        return {"content": None, "tool_calls": [], "finish_reason": "error", "metadata": {"error": str(e)}}


def extract_text_from_response(payload: bytes) -> Optional[str]:
    """Return the text content of *payload*, or ``None`` when there is none."""
    result = extract_openai_content_from_response(payload)
    return result["content"] if result["content"] else None


def extract_openai_sse_deltas_from_response(payload: bytes) -> List[Dict[str, Any]]:
    """
    Convert a Warp API response payload into OpenAI-style streaming chunks.

    Args:
        payload: Serialized ``ResponseEvent`` protobuf bytes.

    Returns:
        A list of chunk dicts, one per extracted fragment, in payload order.
        Each chunk has a single choice whose ``delta`` holds ``content``,
        ``reasoning``, ``role`` or ``tool_calls``.  A final chunk with an
        empty delta and ``finish_reason="stop"`` is appended when the event
        has ``finished``.  An empty list is returned for an empty payload or
        when parsing fails (the failure is logged, not raised).
    """
    if not payload:
        return []

    try:
        response = _parse_response_event(payload)

        deltas = []
        for kind, value in _iter_response_items(response):
            if kind == _KIND_TOOL_CALL:
                deltas.append(_sse_chunk({"tool_calls": [value]}))
            else:
                # content / reasoning / role map directly onto a delta key.
                deltas.append(_sse_chunk({kind: value}))

        if response.HasField("finished"):
            deltas.append(_sse_chunk({}, "stop"))

        return deltas
    except Exception as e:
        # Boundary handler: streaming callers expect a (possibly empty) list.
        logger.error(f"extract_openai_sse_deltas_from_response: exception occurred: {e}")
        logger.error(f"extract_openai_sse_deltas_from_response: traceback: {traceback.format_exc()}")
        return []