import json
from typing import Any, Dict, List, Optional

from pydantic import BaseModel


class LLMResponseError(Exception):
    """Represents an error in an LLM response.

    Attributes:
        message: Error message
        model: Model name
        response: Original response object
    """

    def __init__(self, message: str, model: str = "unknown", response: Any = None):
        """
        Initialize the LLM response error.

        Args:
            message: Error message
            model: Model name
            response: Original response object
        """
        self.message = message
        self.model = model
        self.response = response
        super().__init__(f"LLM Error ({model}): {message}. Response: {response}")
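
# Usage sketch (hedged, illustrative only): LLMResponseError carries the model
# name and raw response alongside the message, so callers can log structured
# context instead of just str(err), e.g.:
#
#   try:
#       resp = ModelResponse.from_openai_response(api_response)
#   except LLMResponseError as err:
#       print(err.model, err.message)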


class Function(BaseModel):
    """
    Represents a function invoked by a model.
    """
    name: str
    arguments: Optional[str] = None


class ToolCall(BaseModel):
    """
    Represents a tool call made by a model.
    """
    id: str
    type: str = "function"
    function: Optional[Function] = None

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> Optional['ToolCall']:
        """
        Create a ToolCall from its dictionary representation.

        Args:
            data: Dictionary containing tool call data

        Returns:
            ToolCall object, or None if ``data`` is empty
        """
        if not data:
            return None

        tool_id = data.get('id', f"call_{hash(str(data)) & 0xffffffff:08x}")
        tool_type = data.get('type', 'function')
        function_data = data.get('function', {})
        name = function_data.get('name')
        arguments = function_data.get('arguments')

        # Ensure arguments is serialized as a string
        if arguments is not None and not isinstance(arguments, str):
            arguments = json.dumps(arguments, ensure_ascii=False)

        function = Function(name=name, arguments=arguments)
        return cls(
            id=tool_id,
            type=tool_type,
            function=function,
        )
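
    # Usage sketch (illustrative payload, not from a real API call). Note that
    # dict-valued arguments are JSON-encoded on the way in:
    #
    #   call = ToolCall.from_dict({
    #       "id": "call_abc123",
    #       "function": {"name": "get_weather", "arguments": {"city": "Paris"}},
    #   })
    #   call.function.arguments   # '{"city": "Paris"}'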

    def to_dict(self) -> Dict[str, Any]:
        """
        Convert the ToolCall to its dictionary representation.

        Returns:
            Dictionary representation
        """
        return {
            "id": self.id,
            "type": self.type,
            "function": {
                "name": self.function.name,
                "arguments": self.function.arguments
            }
        }

    def __repr__(self):
        return json.dumps(self.to_dict(), ensure_ascii=False)

    def __iter__(self):
        """
        Make ToolCall dict-like for JSON serialization.
        """
        yield from self.to_dict().items()
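
    # Because __iter__ yields (key, value) pairs, dict(call) rebuilds the same
    # payload as call.to_dict(), so json.dumps(call, default=dict) should
    # serialize a ToolCall directly; __repr__ reuses to_dict() for logging.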


class ModelResponse:
    """
    Unified model response class that encapsulates responses from different LLM providers.
    """

    def __init__(
        self,
        id: str,
        model: str,
        content: Optional[str] = None,
        tool_calls: Optional[List[ToolCall]] = None,
        usage: Optional[Dict[str, int]] = None,
        error: Optional[str] = None,
        raw_response: Any = None,
        message: Optional[Dict[str, Any]] = None
    ):
        """
        Initialize a ModelResponse object.

        Args:
            id: Response ID
            model: Model name used
            content: Generated text content
            tool_calls: List of tool calls
            usage: Usage statistics (token counts, etc.)
            error: Error message (if any)
            raw_response: Original response object
            message: Complete message object, usable in subsequent API calls
        """
        self.id = id
        self.model = model
        self.content = content
        self.tool_calls = tool_calls
        self.usage = usage or {
            "completion_tokens": 0,
            "prompt_tokens": 0,
            "total_tokens": 0
        }
        self.error = error
        self.raw_response = raw_response

        # If no message is provided, construct one from the other fields
        if message is None:
            self.message = {
                "role": "assistant",
                "content": content
            }
            if tool_calls:
                self.message["tool_calls"] = [tool_call.to_dict() for tool_call in tool_calls]
        else:
            self.message = message
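
    # Construction sketch: when no explicit message is passed, an assistant-role
    # message is synthesized from content/tool_calls (values are illustrative):
    #
    #   resp = ModelResponse(id="resp-1", model="gpt-4o", content="hi")
    #   resp.get_message()   # {"role": "assistant", "content": "hi"}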

    @classmethod
    def from_openai_response(cls, response: Any) -> 'ModelResponse':
        """
        Create a ModelResponse from an OpenAI response object.

        Args:
            response: OpenAI response object

        Returns:
            ModelResponse object

        Raises:
            LLMResponseError: When the LLM response contains an error
        """
        # Handle error cases
        if hasattr(response, 'error') or (isinstance(response, dict) and response.get('error')):
            error_msg = response.error if hasattr(response, 'error') else response.get('error', 'Unknown error')
            raise LLMResponseError(
                error_msg,
                response.model if hasattr(response, 'model') else response.get('model', 'unknown'),
                response
            )

        # Normal case
        message = None
        if hasattr(response, 'choices') and response.choices:
            message = response.choices[0].message
        elif isinstance(response, dict) and response.get('choices'):
            message = response['choices'][0].get('message', {})

        if not message:
            raise LLMResponseError(
                "No message found in response",
                response.model if hasattr(response, 'model') else response.get('model', 'unknown'),
                response
            )

        # Extract usage information
        usage = {}
        if hasattr(response, 'usage'):
            usage = {
                "completion_tokens": getattr(response.usage, 'completion_tokens', 0),
                "prompt_tokens": getattr(response.usage, 'prompt_tokens', 0),
                "total_tokens": getattr(response.usage, 'total_tokens', 0)
            }
        elif isinstance(response, dict) and response.get('usage'):
            usage = response['usage']

        # Build the message object
        message_dict = {}
        if hasattr(message, '__dict__'):
            # Convert object to dictionary
            for key, value in message.__dict__.items():
                if not key.startswith('_'):
                    message_dict[key] = value
        elif isinstance(message, dict):
            message_dict = message
        else:
            # Extract common properties
            message_dict = {
                "role": "assistant",
                "content": getattr(message, 'content', ""),
                "tool_calls": getattr(message, 'tool_calls', None),
            }
        message_dict["content"] = message_dict.get('content') or ''

        # Process tool calls
        processed_tool_calls = []
        raw_tool_calls = message.tool_calls if hasattr(message, 'tool_calls') else message_dict.get('tool_calls')
        if raw_tool_calls:
            for tool_call in raw_tool_calls:
                if isinstance(tool_call, dict):
                    processed_tool_calls.append(ToolCall.from_dict(tool_call))
                else:
                    # Handle an OpenAI tool-call object
                    tool_call_dict = {
                        "id": getattr(tool_call, 'id', f"call_{hash(str(tool_call)) & 0xffffffff:08x}"),
                        "type": getattr(tool_call, 'type', "function")
                    }
                    if hasattr(tool_call, 'function'):
                        function = tool_call.function
                        tool_call_dict["function"] = {
                            "name": getattr(function, 'name', None),
                            "arguments": getattr(function, 'arguments', None)
                        }
                    processed_tool_calls.append(ToolCall.from_dict(tool_call_dict))

        if message_dict and processed_tool_calls:
            message_dict["tool_calls"] = [tool_call.to_dict() for tool_call in processed_tool_calls]

        # Create and return the ModelResponse
        return cls(
            id=response.id if hasattr(response, 'id') else response.get('id', 'unknown'),
            model=response.model if hasattr(response, 'model') else response.get('model', 'unknown'),
            content=(message.content if hasattr(message, 'content') else message.get('content')) or "",
            tool_calls=processed_tool_calls or None,
            usage=usage,
            raw_response=response,
            message=message_dict
        )
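
    # Usage sketch with a dict-shaped payload (the same shape the OpenAI chat
    # completions API returns; values here are illustrative):
    #
    #   resp = ModelResponse.from_openai_response({
    #       "id": "chatcmpl-1",
    #       "model": "gpt-4o",
    #       "choices": [{"message": {"role": "assistant", "content": "Hello"}}],
    #       "usage": {"completion_tokens": 1, "prompt_tokens": 5, "total_tokens": 6},
    #   })
    #   resp.content                 # "Hello"
    #   resp.usage["total_tokens"]   # 6
    #
    # Object-shaped responses from the official SDK take the hasattr branches
    # instead; both paths converge on the same ModelResponse fields.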

    @classmethod
    def from_openai_stream_chunk(cls, chunk: Any) -> 'ModelResponse':
        """
        Create a ModelResponse from an OpenAI stream response chunk.

        Args:
            chunk: OpenAI stream chunk

        Returns:
            ModelResponse object

        Raises:
            LLMResponseError: When the LLM response contains an error
        """
        # Handle error cases
        if hasattr(chunk, 'error') or (isinstance(chunk, dict) and chunk.get('error')):
            error_msg = chunk.error if hasattr(chunk, 'error') else chunk.get('error', 'Unknown error')
            raise LLMResponseError(
                error_msg,
                chunk.model if hasattr(chunk, 'model') else chunk.get('model', 'unknown'),
                chunk
            )

        # Handle a finish-reason chunk (end of stream)
        if hasattr(chunk, 'choices') and chunk.choices and chunk.choices[0].finish_reason:
            return cls(
                id=chunk.id if hasattr(chunk, 'id') else chunk.get('id', 'unknown'),
                model=chunk.model if hasattr(chunk, 'model') else chunk.get('model', 'unknown'),
                content=None,
                raw_response=chunk,
                message={"role": "assistant", "content": "", "finish_reason": chunk.choices[0].finish_reason}
            )

        # Normal chunk with delta content
        content = None
        processed_tool_calls = []
        if hasattr(chunk, 'choices') and chunk.choices:
            delta = chunk.choices[0].delta
            if hasattr(delta, 'content') and delta.content:
                content = delta.content
            if hasattr(delta, 'tool_calls') and delta.tool_calls:
                for tool_call in delta.tool_calls:
                    if isinstance(tool_call, dict):
                        processed_tool_calls.append(ToolCall.from_dict(tool_call))
                    else:
                        # Handle an OpenAI tool-call object
                        tool_call_dict = {
                            "id": getattr(tool_call, 'id', f"call_{hash(str(tool_call)) & 0xffffffff:08x}"),
                            "type": getattr(tool_call, 'type', "function")
                        }
                        if hasattr(tool_call, 'function'):
                            function = tool_call.function
                            tool_call_dict["function"] = {
                                "name": getattr(function, 'name', None),
                                "arguments": getattr(function, 'arguments', None)
                            }
                        processed_tool_calls.append(ToolCall.from_dict(tool_call_dict))
        elif isinstance(chunk, dict) and chunk.get('choices'):
            delta = chunk['choices'][0].get('delta', {}) or chunk['choices'][0].get('message', {})
            content = delta.get('content')
            raw_tool_calls = delta.get('tool_calls')
            if raw_tool_calls:
                for tool_call in raw_tool_calls:
                    processed_tool_calls.append(ToolCall.from_dict(tool_call))

        # Extract usage information
        usage = {}
        if hasattr(chunk, 'usage'):
            usage = {
                "completion_tokens": getattr(chunk.usage, 'completion_tokens', 0),
                "prompt_tokens": getattr(chunk.usage, 'prompt_tokens', 0),
                "total_tokens": getattr(chunk.usage, 'total_tokens', 0)
            }
        elif isinstance(chunk, dict) and chunk.get('usage'):
            usage = chunk['usage']

        # Create the message object
        message = {
            "role": "assistant",
            "content": content or "",
            "tool_calls": [tool_call.to_dict() for tool_call in processed_tool_calls]
            if processed_tool_calls else None,
            "is_chunk": True
        }

        # Create and return the ModelResponse
        return cls(
            id=chunk.id if hasattr(chunk, 'id') else chunk.get('id', 'unknown'),
            model=chunk.model if hasattr(chunk, 'model') else chunk.get('model', 'unknown'),
            content=content,
            tool_calls=processed_tool_calls or None,
            usage=usage,
            raw_response=chunk,
            message=message
        )
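
    # Usage sketch for a dict-shaped delta chunk (illustrative values):
    #
    #   chunk = {
    #       "id": "chatcmpl-1",
    #       "model": "gpt-4o",
    #       "choices": [{"delta": {"content": "Hel"}}],
    #   }
    #   part = ModelResponse.from_openai_stream_chunk(chunk)
    #   part.content               # "Hel"
    #   part.message["is_chunk"]   # True — callers accumulate these deltas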

    @classmethod
    def from_anthropic_stream_chunk(cls, chunk: Any) -> 'ModelResponse':
        """
        Create a ModelResponse from an Anthropic stream response chunk.

        Args:
            chunk: Anthropic stream chunk

        Returns:
            ModelResponse object

        Raises:
            LLMResponseError: When the LLM response contains an error
        """
        try:
            # Handle error cases
            if not chunk or (isinstance(chunk, dict) and chunk.get('error')):
                error_msg = chunk.get('error', 'Unknown error') if isinstance(chunk, dict) else 'Empty response'
                model = chunk.get('model', 'unknown') if isinstance(chunk, dict) else 'unknown'
                raise LLMResponseError(error_msg, model, chunk)

            # Handle stop reason (end of stream)
            if hasattr(chunk, 'stop_reason') and chunk.stop_reason:
                return cls(
                    id=chunk.id if hasattr(chunk, 'id') else 'unknown',
                    model=chunk.model if hasattr(chunk, 'model') else 'claude',
                    content=None,
                    raw_response=chunk,
                    message={"role": "assistant", "content": "", "stop_reason": chunk.stop_reason}
                )

            # Handle delta content
            content = None
            processed_tool_calls = []
            if hasattr(chunk, 'delta') and chunk.delta:
                delta = chunk.delta
                if hasattr(delta, 'text') and delta.text:
                    content = delta.text
                elif hasattr(delta, 'tool_use') and delta.tool_use:
                    tool_call_dict = {
                        "id": f"call_{delta.tool_use.id}",
                        "type": "function",
                        "function": {
                            "name": delta.tool_use.name,
                            "arguments": delta.tool_use.input if isinstance(delta.tool_use.input, str)
                            else json.dumps(delta.tool_use.input, ensure_ascii=False)
                        }
                    }
                    processed_tool_calls.append(ToolCall.from_dict(tool_call_dict))

            # Create the message object
            message = {
                "role": "assistant",
                "content": content or "",
                "tool_calls": [tool_call.to_dict() for tool_call in processed_tool_calls]
                if processed_tool_calls else None,
                "is_chunk": True
            }

            # Create and return the ModelResponse
            return cls(
                id=chunk.id if hasattr(chunk, 'id') else 'unknown',
                model=chunk.model if hasattr(chunk, 'model') else 'claude',
                content=content,
                tool_calls=processed_tool_calls or None,
                raw_response=chunk,
                message=message
            )
        except LLMResponseError:
            raise
        except Exception as e:
            model = chunk.model if hasattr(chunk, 'model') else (
                chunk.get('model', 'unknown') if isinstance(chunk, dict) else 'unknown')
            raise LLMResponseError(f"Error processing Anthropic stream chunk: {e}", model, chunk)
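
    # Usage sketch; Anthropic stream events are SDK objects, so a stand-in with
    # the same attribute shape (types.SimpleNamespace) illustrates the
    # text-delta path (attribute names assumed from the handling above):
    #
    #   from types import SimpleNamespace
    #   chunk = SimpleNamespace(id="msg_1", model="claude", stop_reason=None,
    #                           delta=SimpleNamespace(text="Hel", tool_use=None))
    #   ModelResponse.from_anthropic_stream_chunk(chunk).content   # "Hel"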

    @classmethod
    def from_anthropic_response(cls, response: Any) -> 'ModelResponse':
        """
        Create a ModelResponse from an Anthropic response object.

        Args:
            response: Anthropic response object

        Returns:
            ModelResponse object

        Raises:
            LLMResponseError: When the LLM response contains an error
        """
        try:
            # Handle error cases
            if not response or (isinstance(response, dict) and response.get('error')):
                error_msg = response.get('error', 'Unknown error') if isinstance(response, dict) else 'Empty response'
                model = response.get('model', 'unknown') if isinstance(response, dict) else 'unknown'
                raise LLMResponseError(error_msg, model, response)

            # Build the message content
            message = {
                "content": "",
                "role": "assistant",
                "tool_calls": None,
            }
            processed_tool_calls = []
            if hasattr(response, 'content') and response.content:
                for content_block in response.content:
                    if content_block.type == "text":
                        message["content"] = content_block.text
                    elif content_block.type == "tool_use":
                        tool_call_dict = {
                            "id": f"call_{content_block.id}",
                            "type": "function",
                            "function": {
                                "name": content_block.name,
                                "arguments": content_block.input if isinstance(content_block.input, str)
                                else json.dumps(content_block.input, ensure_ascii=False)
                            }
                        }
                        processed_tool_calls.append(ToolCall.from_dict(tool_call_dict))

            if processed_tool_calls:
                message["tool_calls"] = [tool_call.to_dict() for tool_call in processed_tool_calls]

            # Extract usage information
            usage = {
                "completion_tokens": 0,
                "prompt_tokens": 0,
                "total_tokens": 0
            }
            if hasattr(response, 'usage'):
                usage["completion_tokens"] = getattr(response.usage, 'output_tokens', 0)
                usage["prompt_tokens"] = getattr(response.usage, 'input_tokens', 0)
                usage["total_tokens"] = usage["completion_tokens"] + usage["prompt_tokens"]

            # Create the ModelResponse
            return cls(
                id=response.id if hasattr(response, 'id')
                else f"chatcmpl-anthropic-{hash(str(response)) & 0xffffffff:08x}",
                model=response.model if hasattr(response, 'model') else "claude",
                content=message["content"],
                tool_calls=processed_tool_calls or None,
                usage=usage,
                raw_response=response,
                message=message
            )
        except LLMResponseError:
            raise
        except Exception as e:
            model = response.model if hasattr(response, 'model') else (
                response.get('model', 'unknown') if isinstance(response, dict) else 'unknown')
            raise LLMResponseError(f"Error processing Anthropic response: {e}", model, response)
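
    # Usage sketch, again with SimpleNamespace standing in for the SDK's
    # response object (content blocks carry a .type discriminator):
    #
    #   from types import SimpleNamespace
    #   resp = ModelResponse.from_anthropic_response(SimpleNamespace(
    #       id="msg_1", model="claude",
    #       content=[SimpleNamespace(type="text", text="Hello")],
    #       usage=SimpleNamespace(input_tokens=5, output_tokens=1),
    #   ))
    #   resp.content                 # "Hello"
    #   resp.usage["total_tokens"]   # 6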

    @classmethod
    def from_error(cls, error_msg: str, model: str = "unknown") -> 'ModelResponse':
        """
        Create a ModelResponse from an error message.

        Args:
            error_msg: Error message
            model: Model name

        Returns:
            ModelResponse object
        """
        return cls(
            id="error",
            model=model,
            error=error_msg,
            message={"role": "assistant", "content": f"Error: {error_msg}"}
        )
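
    # Usage sketch: wrap a caught failure so downstream code always receives a
    # ModelResponse rather than an exception (illustrative values):
    #
    #   resp = ModelResponse.from_error("rate limit exceeded", model="gpt-4o")
    #   resp.error                # "rate limit exceeded"
    #   resp.message["content"]   # "Error: rate limit exceeded"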

    def to_dict(self) -> Dict[str, Any]:
        """
        Convert the ModelResponse to its dictionary representation.

        Returns:
            Dictionary representation
        """
        tool_calls_dict = None
        if self.tool_calls:
            tool_calls_dict = [tool_call.to_dict() for tool_call in self.tool_calls]
        return {
            "id": self.id,
            "model": self.model,
            "content": self.content,
            "tool_calls": tool_calls_dict,
            "usage": self.usage,
            "error": self.error,
            "message": self.message
        }

    def get_message(self) -> Dict[str, Any]:
        """
        Return a message object that can be passed directly to subsequent API calls.

        Returns:
            Message object dictionary
        """
        return self.message

    def serialize_tool_calls(self) -> List[Dict[str, Any]]:
        """
        Convert tool call objects to JSON-compatible dictionaries, handling
        OpenAI object types.

        Returns:
            List[Dict[str, Any]]: Tool calls list in JSON format
        """
        if not self.tool_calls:
            return []
        result = []
        for tool_call in self.tool_calls:
            if hasattr(tool_call, 'to_dict'):
                result.append(tool_call.to_dict())
            elif isinstance(tool_call, dict):
                result.append(tool_call)
            else:
                # Fallback: stringify anything we do not recognize
                result.append(str(tool_call))
        return result

    def __repr__(self):
        return json.dumps(self.to_dict(), ensure_ascii=False, indent=None,
                          default=lambda obj: obj.to_dict() if hasattr(obj, 'to_dict') else str(obj))

    def _serialize_message(self) -> Dict[str, Any]:
        """
        Serialize the message object.

        Returns:
            Dict[str, Any]: Serialized message dictionary
        """
        if not self.message:
            return {}
        result = {}
        # Copy fields, normalizing tool_calls via serialize_tool_calls()
        for key, value in self.message.items():
            if key == 'tool_calls':
                result[key] = self.serialize_tool_calls()
            else:
                result[key] = value
        return result
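

if __name__ == "__main__":
    # Minimal smoke test (hedged: sample values are illustrative, not from a
    # real API). Exercises dict-shaped parsing, tool-call round-tripping, and
    # the error path.
    demo = ModelResponse.from_openai_response({
        "id": "chatcmpl-demo",
        "model": "demo-model",
        "choices": [{"message": {
            "role": "assistant",
            "content": None,
            "tool_calls": [{
                "id": "call_1",
                "type": "function",
                "function": {"name": "lookup", "arguments": {"q": "cats"}},
            }],
        }}],
    })
    print(demo.serialize_tool_calls())
    print(ModelResponse.from_error("boom").to_dict())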