Spaces:
Running
Running
File size: 12,175 Bytes
9314c03 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 |
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Warp API response parsing
Handles parsing of protobuf responses and extraction of OpenAI-compatible content.
"""
from typing import Optional, Dict, List, Any
from ..core.logging import logger
from ..core.protobuf import ensure_proto_runtime, msg_cls
def extract_openai_content_from_response(payload: bytes) -> dict:
"""
Extract OpenAI-compatible content from Warp API response payload.
"""
if not payload:
logger.debug("extract_openai_content_from_response: payload is empty")
return {"content": None, "tool_calls": [], "finish_reason": None, "metadata": {}}
logger.debug(f"extract_openai_content_from_response: processing payload of {len(payload)} bytes")
hex_dump = payload.hex()
logger.debug(f"extract_openai_content_from_response: complete payload hex: {hex_dump}")
try:
ensure_proto_runtime()
ResponseEvent = msg_cls("warp.multi_agent.v1.ResponseEvent")
response = ResponseEvent()
response.ParseFromString(payload)
result = {"content": "", "tool_calls": [], "finish_reason": None, "metadata": {}}
if response.HasField("client_actions"):
for i, action in enumerate(response.client_actions.actions):
if action.HasField("append_to_message_content"):
message = action.append_to_message_content.message
if message.HasField("agent_output"):
agent_output = message.agent_output
if agent_output.text:
result["content"] += agent_output.text
if agent_output.reasoning:
if "reasoning" not in result:
result["reasoning"] = ""
result["reasoning"] += agent_output.reasoning
if message.HasField("tool_call"):
tool_call = message.tool_call
openai_tool_call = {
"id": getattr(tool_call, 'id', f"call_{i}"),
"type": "function",
"function": {
"name": getattr(tool_call, 'name', getattr(tool_call, 'function_name', 'unknown')),
"arguments": getattr(tool_call, 'arguments', getattr(tool_call, 'parameters', '{}'))
}
}
result["tool_calls"].append(openai_tool_call)
elif action.HasField("add_messages_to_task"):
for j, msg in enumerate(action.add_messages_to_task.messages):
if msg.HasField("agent_output") and msg.agent_output.text:
result["content"] += msg.agent_output.text
if msg.HasField("tool_call"):
tool_call = msg.tool_call
tool_name = "unknown"
tool_args = "{}"
tool_call_id = getattr(tool_call, 'tool_call_id', f"call_{i}_{j}")
for field, value in tool_call.ListFields():
if field.name == 'tool_call_id':
continue
tool_name = field.name
if hasattr(value, 'ListFields'):
tool_fields_dict = {}
for tool_field, tool_value in value.ListFields():
if isinstance(tool_value, str):
tool_fields_dict[tool_field.name] = tool_value
elif hasattr(tool_value, '__len__') and not isinstance(tool_value, str):
tool_fields_dict[tool_field.name] = list(tool_value)
else:
tool_fields_dict[tool_field.name] = str(tool_value)
if tool_fields_dict:
import json
tool_args = json.dumps(tool_fields_dict)
break
openai_tool_call = {
"id": tool_call_id,
"type": "function",
"function": {"name": tool_name, "arguments": tool_args}
}
result["tool_calls"].append(openai_tool_call)
elif action.HasField("update_task_message"):
umsg = action.update_task_message.message
if umsg.HasField("agent_output") and umsg.agent_output.text:
result["content"] += umsg.agent_output.text
elif action.HasField("create_task"):
task = action.create_task.task
for j, msg in enumerate(task.messages):
if msg.HasField("agent_output") and msg.agent_output.text:
result["content"] += msg.agent_output.text
elif action.HasField("update_task_summary"):
summary = action.update_task_summary.summary
if summary:
result["content"] += summary
if response.HasField("finished"):
result["finish_reason"] = "stop"
result["metadata"] = {
"response_fields": [field.name for field, _ in response.ListFields()],
"has_client_actions": response.HasField("client_actions"),
"payload_size": len(payload)
}
return result
except Exception as e:
logger.error(f"extract_openai_content_from_response: exception occurred: {e}")
import traceback
logger.error(f"extract_openai_content_from_response: traceback: {traceback.format_exc()}")
return {"content": None, "tool_calls": [], "finish_reason": "error", "metadata": {"error": str(e)}}
def extract_text_from_response(payload: bytes) -> Optional[str]:
result = extract_openai_content_from_response(payload)
return result["content"] if result["content"] else None
def extract_openai_sse_deltas_from_response(payload: bytes) -> List[Dict[str, Any]]:
if not payload:
return []
try:
ensure_proto_runtime()
ResponseEvent = msg_cls("warp.multi_agent.v1.ResponseEvent")
response = ResponseEvent()
response.ParseFromString(payload)
deltas = []
if response.HasField("client_actions"):
for i, action in enumerate(response.client_actions.actions):
if action.HasField("append_to_message_content"):
message = action.append_to_message_content.message
if message.HasField("agent_output"):
agent_output = message.agent_output
if agent_output.text:
deltas.append({"choices": [{"index": 0, "delta": {"content": agent_output.text}, "finish_reason": None}]})
if agent_output.reasoning:
deltas.append({"choices": [{"index": 0, "delta": {"reasoning": agent_output.reasoning}, "finish_reason": None}]})
if message.HasField("tool_call"):
tool_call = message.tool_call
deltas.append({"choices": [{"index": 0, "delta": {"role": "assistant"}, "finish_reason": None}]})
openai_tool_call = {
"id": getattr(tool_call, 'tool_call_id', f"call_{i}"),
"type": "function",
"function": {
"name": getattr(tool_call, 'name', 'unknown'),
"arguments": getattr(tool_call, 'arguments', '{}')
}
}
deltas.append({"choices": [{"index": 0, "delta": {"tool_calls": [openai_tool_call]}, "finish_reason": None}]})
elif action.HasField("add_messages_to_task"):
for j, msg in enumerate(action.add_messages_to_task.messages):
if msg.HasField("agent_output") and msg.agent_output.text:
deltas.append({"choices": [{"index": 0, "delta": {"content": msg.agent_output.text}, "finish_reason": None}]})
if msg.HasField("tool_call"):
tool_call = msg.tool_call
if j == 0:
deltas.append({"choices": [{"index": 0, "delta": {"role": "assistant"}, "finish_reason": None}]})
tool_call_id = getattr(tool_call, 'tool_call_id', f"call_{i}_{j}")
tool_name = "unknown"
tool_args = "{}"
for field, value in tool_call.ListFields():
if field.name == 'tool_call_id':
continue
tool_name = field.name
if hasattr(value, 'ListFields'):
tool_fields_dict = {}
for tool_field, tool_value in value.ListFields():
if isinstance(tool_value, str):
tool_fields_dict[tool_field.name] = tool_value
elif hasattr(tool_value, '__len__') and not isinstance(tool_value, str):
tool_fields_dict[tool_field.name] = list(tool_value)
else:
tool_fields_dict[tool_field.name] = str(tool_value)
if tool_fields_dict:
import json
tool_args = json.dumps(tool_fields_dict)
break
openai_tool_call = {"id": tool_call_id, "type": "function", "function": {"name": tool_name, "arguments": tool_args}}
deltas.append({"choices": [{"index": 0, "delta": {"tool_calls": [openai_tool_call]}, "finish_reason": None}]})
elif action.HasField("update_task_message"):
umsg = action.update_task_message.message
if umsg.HasField("agent_output") and umsg.agent_output.text:
deltas.append({"choices": [{"index": 0, "delta": {"content": umsg.agent_output.text}, "finish_reason": None}]})
elif action.HasField("create_task"):
task = action.create_task.task
for j, msg in enumerate(task.messages):
if msg.HasField("agent_output") and msg.agent_output.text:
deltas.append({"choices": [{"index": 0, "delta": {"content": msg.agent_output.text}, "finish_reason": None}]})
elif action.HasField("update_task_summary"):
summary = action.update_task_summary.summary
if summary:
deltas.append({"choices": [{"index": 0, "delta": {"content": summary}, "finish_reason": None}]})
if response.HasField("finished"):
deltas.append({"choices": [{"index": 0, "delta": {}, "finish_reason": "stop"}]})
return deltas
except Exception as e:
logger.error(f"extract_openai_sse_deltas_from_response: exception occurred: {e}")
import traceback
logger.error(f"extract_openai_sse_deltas_from_response: traceback: {traceback.format_exc()}")
return [] |