devme's picture
Upload 90 files
9314c03 verified
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Warp API response parsing
Handles parsing of protobuf responses and extraction of OpenAI-compatible content.
"""
from typing import Optional, Dict, List, Any
from ..core.logging import logger
from ..core.protobuf import ensure_proto_runtime, msg_cls
def extract_openai_content_from_response(payload: bytes) -> dict:
"""
Extract OpenAI-compatible content from Warp API response payload.
"""
if not payload:
logger.debug("extract_openai_content_from_response: payload is empty")
return {"content": None, "tool_calls": [], "finish_reason": None, "metadata": {}}
logger.debug(f"extract_openai_content_from_response: processing payload of {len(payload)} bytes")
hex_dump = payload.hex()
logger.debug(f"extract_openai_content_from_response: complete payload hex: {hex_dump}")
try:
ensure_proto_runtime()
ResponseEvent = msg_cls("warp.multi_agent.v1.ResponseEvent")
response = ResponseEvent()
response.ParseFromString(payload)
result = {"content": "", "tool_calls": [], "finish_reason": None, "metadata": {}}
if response.HasField("client_actions"):
for i, action in enumerate(response.client_actions.actions):
if action.HasField("append_to_message_content"):
message = action.append_to_message_content.message
if message.HasField("agent_output"):
agent_output = message.agent_output
if agent_output.text:
result["content"] += agent_output.text
if agent_output.reasoning:
if "reasoning" not in result:
result["reasoning"] = ""
result["reasoning"] += agent_output.reasoning
if message.HasField("tool_call"):
tool_call = message.tool_call
openai_tool_call = {
"id": getattr(tool_call, 'id', f"call_{i}"),
"type": "function",
"function": {
"name": getattr(tool_call, 'name', getattr(tool_call, 'function_name', 'unknown')),
"arguments": getattr(tool_call, 'arguments', getattr(tool_call, 'parameters', '{}'))
}
}
result["tool_calls"].append(openai_tool_call)
elif action.HasField("add_messages_to_task"):
for j, msg in enumerate(action.add_messages_to_task.messages):
if msg.HasField("agent_output") and msg.agent_output.text:
result["content"] += msg.agent_output.text
if msg.HasField("tool_call"):
tool_call = msg.tool_call
tool_name = "unknown"
tool_args = "{}"
tool_call_id = getattr(tool_call, 'tool_call_id', f"call_{i}_{j}")
for field, value in tool_call.ListFields():
if field.name == 'tool_call_id':
continue
tool_name = field.name
if hasattr(value, 'ListFields'):
tool_fields_dict = {}
for tool_field, tool_value in value.ListFields():
if isinstance(tool_value, str):
tool_fields_dict[tool_field.name] = tool_value
elif hasattr(tool_value, '__len__') and not isinstance(tool_value, str):
tool_fields_dict[tool_field.name] = list(tool_value)
else:
tool_fields_dict[tool_field.name] = str(tool_value)
if tool_fields_dict:
import json
tool_args = json.dumps(tool_fields_dict)
break
openai_tool_call = {
"id": tool_call_id,
"type": "function",
"function": {"name": tool_name, "arguments": tool_args}
}
result["tool_calls"].append(openai_tool_call)
elif action.HasField("update_task_message"):
umsg = action.update_task_message.message
if umsg.HasField("agent_output") and umsg.agent_output.text:
result["content"] += umsg.agent_output.text
elif action.HasField("create_task"):
task = action.create_task.task
for j, msg in enumerate(task.messages):
if msg.HasField("agent_output") and msg.agent_output.text:
result["content"] += msg.agent_output.text
elif action.HasField("update_task_summary"):
summary = action.update_task_summary.summary
if summary:
result["content"] += summary
if response.HasField("finished"):
result["finish_reason"] = "stop"
result["metadata"] = {
"response_fields": [field.name for field, _ in response.ListFields()],
"has_client_actions": response.HasField("client_actions"),
"payload_size": len(payload)
}
return result
except Exception as e:
logger.error(f"extract_openai_content_from_response: exception occurred: {e}")
import traceback
logger.error(f"extract_openai_content_from_response: traceback: {traceback.format_exc()}")
return {"content": None, "tool_calls": [], "finish_reason": "error", "metadata": {"error": str(e)}}
def extract_text_from_response(payload: bytes) -> Optional[str]:
result = extract_openai_content_from_response(payload)
return result["content"] if result["content"] else None
def extract_openai_sse_deltas_from_response(payload: bytes) -> List[Dict[str, Any]]:
if not payload:
return []
try:
ensure_proto_runtime()
ResponseEvent = msg_cls("warp.multi_agent.v1.ResponseEvent")
response = ResponseEvent()
response.ParseFromString(payload)
deltas = []
if response.HasField("client_actions"):
for i, action in enumerate(response.client_actions.actions):
if action.HasField("append_to_message_content"):
message = action.append_to_message_content.message
if message.HasField("agent_output"):
agent_output = message.agent_output
if agent_output.text:
deltas.append({"choices": [{"index": 0, "delta": {"content": agent_output.text}, "finish_reason": None}]})
if agent_output.reasoning:
deltas.append({"choices": [{"index": 0, "delta": {"reasoning": agent_output.reasoning}, "finish_reason": None}]})
if message.HasField("tool_call"):
tool_call = message.tool_call
deltas.append({"choices": [{"index": 0, "delta": {"role": "assistant"}, "finish_reason": None}]})
openai_tool_call = {
"id": getattr(tool_call, 'tool_call_id', f"call_{i}"),
"type": "function",
"function": {
"name": getattr(tool_call, 'name', 'unknown'),
"arguments": getattr(tool_call, 'arguments', '{}')
}
}
deltas.append({"choices": [{"index": 0, "delta": {"tool_calls": [openai_tool_call]}, "finish_reason": None}]})
elif action.HasField("add_messages_to_task"):
for j, msg in enumerate(action.add_messages_to_task.messages):
if msg.HasField("agent_output") and msg.agent_output.text:
deltas.append({"choices": [{"index": 0, "delta": {"content": msg.agent_output.text}, "finish_reason": None}]})
if msg.HasField("tool_call"):
tool_call = msg.tool_call
if j == 0:
deltas.append({"choices": [{"index": 0, "delta": {"role": "assistant"}, "finish_reason": None}]})
tool_call_id = getattr(tool_call, 'tool_call_id', f"call_{i}_{j}")
tool_name = "unknown"
tool_args = "{}"
for field, value in tool_call.ListFields():
if field.name == 'tool_call_id':
continue
tool_name = field.name
if hasattr(value, 'ListFields'):
tool_fields_dict = {}
for tool_field, tool_value in value.ListFields():
if isinstance(tool_value, str):
tool_fields_dict[tool_field.name] = tool_value
elif hasattr(tool_value, '__len__') and not isinstance(tool_value, str):
tool_fields_dict[tool_field.name] = list(tool_value)
else:
tool_fields_dict[tool_field.name] = str(tool_value)
if tool_fields_dict:
import json
tool_args = json.dumps(tool_fields_dict)
break
openai_tool_call = {"id": tool_call_id, "type": "function", "function": {"name": tool_name, "arguments": tool_args}}
deltas.append({"choices": [{"index": 0, "delta": {"tool_calls": [openai_tool_call]}, "finish_reason": None}]})
elif action.HasField("update_task_message"):
umsg = action.update_task_message.message
if umsg.HasField("agent_output") and umsg.agent_output.text:
deltas.append({"choices": [{"index": 0, "delta": {"content": umsg.agent_output.text}, "finish_reason": None}]})
elif action.HasField("create_task"):
task = action.create_task.task
for j, msg in enumerate(task.messages):
if msg.HasField("agent_output") and msg.agent_output.text:
deltas.append({"choices": [{"index": 0, "delta": {"content": msg.agent_output.text}, "finish_reason": None}]})
elif action.HasField("update_task_summary"):
summary = action.update_task_summary.summary
if summary:
deltas.append({"choices": [{"index": 0, "delta": {"content": summary}, "finish_reason": None}]})
if response.HasField("finished"):
deltas.append({"choices": [{"index": 0, "delta": {}, "finish_reason": "stop"}]})
return deltas
except Exception as e:
logger.error(f"extract_openai_sse_deltas_from_response: exception occurred: {e}")
import traceback
logger.error(f"extract_openai_sse_deltas_from_response: traceback: {traceback.format_exc()}")
return []