Spaces:
Sleeping
Sleeping
import json | |
import logging | |
from contextlib import AsyncExitStack | |
from datetime import timedelta | |
from pathlib import Path | |
from typing import TYPE_CHECKING, Any, Dict, List, Literal, Optional, Union, overload, Callable | |
from typing_extensions import NotRequired, TypeAlias, TypedDict, Unpack | |
if TYPE_CHECKING: | |
from mcp import ClientSession | |
logger = logging.getLogger(__name__) | |
# Type alias for tool names | |
ToolName: TypeAlias = str | |
ServerType: TypeAlias = Literal["stdio", "sse", "http"] | |
class StdioServerParameters_T(TypedDict): | |
command: str | |
args: NotRequired[List[str]] | |
env: NotRequired[Dict[str, str]] | |
cwd: NotRequired[Union[str, Path, None]] | |
class SSEServerParameters_T(TypedDict): | |
url: str | |
headers: NotRequired[Dict[str, Any]] | |
timeout: NotRequired[float] | |
sse_read_timeout: NotRequired[float] | |
class StreamableHTTPParameters_T(TypedDict): | |
url: str | |
headers: NotRequired[dict[str, Any]] | |
timeout: NotRequired[timedelta] | |
sse_read_timeout: NotRequired[timedelta] | |
terminate_on_close: NotRequired[bool] | |
def format_mcp_result(result: Any) -> str: | |
#should we format mcp.types result to some result format handled by our framework? | |
return str(result) | |
"""Format MCP tool result for minion brain.step""" | |
# if isinstance(result, dict): | |
# # Handle MCP result format | |
# if "content" in result: | |
# content_items = result["content"] | |
# if isinstance(content_items, list): | |
# texts = [] | |
# for item in content_items: | |
# if isinstance(item, dict) and item.get("type") == "text": | |
# texts.append(item.get("text", "")) | |
# return "\n".join(texts) | |
# elif isinstance(content_items, str): | |
# return content_items | |
# | |
# # Handle other dict formats | |
# if "text" in result: | |
# return result["text"] | |
# | |
# # Fallback to JSON string | |
# return json.dumps(result, indent=2) | |
# | |
# elif isinstance(result, str): | |
# return result | |
# else: | |
# return str(result) | |
class BrainTool: | |
""" | |
Adapter class to convert MCP tools to brain.step compatible format | |
""" | |
def __init__(self, name: str, description: str, parameters: Dict[str, Any], session: "ClientSession"): | |
self.name = name | |
self.description = description | |
self.parameters = parameters | |
self.session = session | |
# Add attributes expected by minion framework | |
self.__name__ = name | |
self.__doc__ = description | |
self.__input_schema__ = parameters | |
async def __call__(self, **kwargs) -> str: | |
"""Execute the tool with given parameters""" | |
try: | |
result = await self.session.call_tool(self.name, kwargs) | |
return format_mcp_result(result) | |
except Exception as e: | |
logger.error(f"Error executing tool {self.name}: {e}") | |
return f"Error: {str(e)}" | |
def to_function_spec(self) -> Dict[str, Any]: | |
"""Convert to function specification format for brain.step""" | |
return { | |
"type": "function", | |
"function": { | |
"name": self.name, | |
"description": self.description, | |
"parameters": self.parameters, | |
} | |
} | |
def to_dict(self) -> Dict[str, Any]: | |
"""Convert to dictionary format""" | |
return { | |
"name": self.name, | |
"description": self.description, | |
"parameters": self.parameters | |
} | |
class MCPBrainClient: | |
""" | |
Client for connecting to MCP servers and providing tools to minion brain.step | |
""" | |
def __init__(self): | |
# Initialize MCP sessions as a dictionary of ClientSession objects | |
self.sessions: Dict[ToolName, "ClientSession"] = {} | |
self.exit_stack = AsyncExitStack() | |
self.available_tools: List[BrainTool] = [] | |
async def __aenter__(self): | |
"""Enter the context manager""" | |
await self.exit_stack.__aenter__() | |
return self | |
async def __aexit__(self, exc_type, exc_val, exc_tb): | |
"""Exit the context manager""" | |
await self.cleanup() | |
async def cleanup(self): | |
"""Clean up resources""" | |
await self.exit_stack.aclose() | |
async def add_mcp_server(self, type: Literal["stdio"], **params: Unpack[StdioServerParameters_T]): ... | |
async def add_mcp_server(self, type: Literal["sse"], **params: Unpack[SSEServerParameters_T]): ... | |
async def add_mcp_server(self, type: Literal["http"], **params: Unpack[StreamableHTTPParameters_T]): ... | |
async def add_mcp_server(self, type: ServerType, **params: Any): | |
"""Connect to an MCP server and add its tools to available tools | |
Args: | |
type (`str`): | |
Type of the server to connect to. Can be one of: | |
- "stdio": Standard input/output server (local) | |
- "sse": Server-sent events (SSE) server | |
- "http": StreamableHTTP server | |
**params (`Dict[str, Any]`): | |
Server parameters that can be either: | |
- For stdio servers: | |
- command (str): The command to run the MCP server | |
- args (List[str], optional): Arguments for the command | |
- env (Dict[str, str], optional): Environment variables for the command | |
- cwd (Union[str, Path, None], optional): Working directory for the command | |
- For SSE servers: | |
- url (str): The URL of the SSE server | |
- headers (Dict[str, Any], optional): Headers for the SSE connection | |
- timeout (float, optional): Connection timeout | |
- sse_read_timeout (float, optional): SSE read timeout | |
- For StreamableHTTP servers: | |
- url (str): The URL of the StreamableHTTP server | |
- headers (Dict[str, Any], optional): Headers for the StreamableHTTP connection | |
- timeout (timedelta, optional): Connection timeout | |
- sse_read_timeout (timedelta, optional): SSE read timeout | |
- terminate_on_close (bool, optional): Whether to terminate on close | |
""" | |
from mcp import ClientSession, StdioServerParameters | |
from mcp import types as mcp_types | |
# Determine server type and create appropriate parameters | |
if type == "stdio": | |
# Handle stdio server | |
from mcp.client.stdio import stdio_client | |
logger.info(f"Connecting to stdio MCP server with command: {params['command']} {params.get('args', [])}") | |
client_kwargs = {"command": params["command"]} | |
for key in ["args", "env", "cwd"]: | |
if params.get(key) is not None: | |
client_kwargs[key] = params[key] | |
server_params = StdioServerParameters(**client_kwargs) | |
read, write = await self.exit_stack.enter_async_context(stdio_client(server_params)) | |
elif type == "sse": | |
# Handle SSE server | |
from mcp.client.sse import sse_client | |
logger.info(f"Connecting to SSE MCP server at: {params['url']}") | |
client_kwargs = {"url": params["url"]} | |
for key in ["headers", "timeout", "sse_read_timeout"]: | |
if params.get(key) is not None: | |
client_kwargs[key] = params[key] | |
read, write = await self.exit_stack.enter_async_context(sse_client(**client_kwargs)) | |
elif type == "http": | |
# Handle StreamableHTTP server | |
from mcp.client.streamable_http import streamablehttp_client | |
logger.info(f"Connecting to StreamableHTTP MCP server at: {params['url']}") | |
client_kwargs = {"url": params["url"]} | |
for key in ["headers", "timeout", "sse_read_timeout", "terminate_on_close"]: | |
if params.get(key) is not None: | |
client_kwargs[key] = params[key] | |
read, write, _ = await self.exit_stack.enter_async_context(streamablehttp_client(**client_kwargs)) | |
else: | |
raise ValueError(f"Unsupported server type: {type}") | |
session = await self.exit_stack.enter_async_context( | |
ClientSession( | |
read_stream=read, | |
write_stream=write, | |
client_info=mcp_types.Implementation( | |
name="minion.MCPBrainClient", | |
version="1.0.0", | |
), | |
) | |
) | |
logger.debug("Initializing session...") | |
await session.initialize() | |
# List available tools | |
response = await session.list_tools() | |
logger.debug("Connected to server with tools:", [tool.name for tool in response.tools]) | |
for tool in response.tools: | |
if tool.name in self.sessions: | |
logger.warning(f"Tool '{tool.name}' already defined by another server. Skipping.") | |
continue | |
# Map tool names to their server for later lookup | |
self.sessions[tool.name] = session | |
# Create BrainTool wrapper | |
brain_tool = BrainTool( | |
name=tool.name, | |
description=tool.description, | |
parameters=tool.inputSchema, | |
session=session | |
) | |
# Add tool to the list of available tools | |
self.available_tools.append(brain_tool) | |
def get_tools_for_brain(self) -> List[BrainTool]: | |
"""Get list of tools in the format expected by brain.step""" | |
return self.available_tools | |
def get_tool_functions(self) -> Dict[str, Callable]: | |
"""Get dictionary of tool functions for direct execution""" | |
return {tool.name: tool for tool in self.available_tools} | |
def get_tool_specs(self) -> List[Dict[str, Any]]: | |
"""Get list of tool specifications in ChatCompletion format""" | |
return [tool.to_function_spec() for tool in self.available_tools] | |
def get_tools_dict(self) -> List[Dict[str, Any]]: | |
"""Get list of tools as dictionaries""" | |
return [tool.to_dict() for tool in self.available_tools] | |
# Helper function to create final answer tool (example implementation) | |
def create_final_answer_tool() -> BrainTool: | |
""" | |
Create a final answer tool that can be used with brain.step | |
This is an example of how to create a local tool without MCP | |
""" | |
class FinalAnswerSession: | |
async def call_tool(self, name: str, args: Dict[str, Any]) -> Dict[str, Any]: | |
return { | |
"content": [ | |
{ | |
"type": "text", | |
"text": args.get("answer", "No answer provided") | |
} | |
] | |
} | |
session = FinalAnswerSession() | |
tool = BrainTool( | |
name="final_answer", | |
description="Provide the final answer to the user's question", | |
parameters={ | |
"type": "object", | |
"properties": { | |
"answer": { | |
"type": "string", | |
"description": "The final answer to provide to the user" | |
} | |
}, | |
"required": ["answer"] | |
}, | |
session=session | |
) | |
return tool | |
def create_calculator_tool() -> BrainTool: | |
""" | |
Create a local calculator tool for basic arithmetic | |
""" | |
class CalculatorSession: | |
async def call_tool(self, name: str, args: Dict[str, Any]) -> Dict[str, Any]: | |
expression = args.get("expression", "") | |
try: | |
# Simple and safe evaluation for basic arithmetic | |
allowed_chars = set("0123456789+-*/()., ") | |
if not all(c in allowed_chars for c in expression): | |
raise ValueError("Invalid characters in expression") | |
result = eval(expression) | |
return { | |
"content": [ | |
{ | |
"type": "text", | |
"text": f"Calculation result: {expression} = {result}" | |
} | |
] | |
} | |
except Exception as e: | |
return { | |
"content": [ | |
{ | |
"type": "text", | |
"text": f"Error: Unable to calculate '{expression}': {str(e)}" | |
} | |
] | |
} | |
session = CalculatorSession() | |
tool = BrainTool( | |
name="calculator", | |
description="Perform basic arithmetic calculations", | |
parameters={ | |
"type": "object", | |
"properties": { | |
"expression": { | |
"type": "string", | |
"description": "Mathematical expression to evaluate (e.g., '2 + 3 * 4')" | |
} | |
}, | |
"required": ["expression"] | |
}, | |
session=session | |
) | |
return tool | |
async def add_filesystem_tool(mcp_client: MCPBrainClient, workspace_paths: List[str] = None) -> None: | |
""" | |
Add filesystem MCP tool to the client | |
Args: | |
mcp_client: The MCP client to add the tool to | |
workspace_paths: List of paths to allow access to. Defaults to current directory. | |
""" | |
if workspace_paths is None: | |
import os | |
workspace_paths = [os.path.abspath(".")] | |
try: | |
await mcp_client.add_mcp_server( | |
"stdio", | |
command="npx", | |
args=["-y", "@modelcontextprotocol/server-filesystem"] + workspace_paths | |
) | |
logger.info(f"✓ Added filesystem tool with paths: {workspace_paths}") | |
except Exception as e: | |
logger.error(f"Failed to add filesystem tool: {e}") | |
raise | |
def create_filesystem_tool_factory(workspace_paths: List[str] = None): | |
""" | |
Create a factory function for the filesystem tool | |
Args: | |
workspace_paths: List of paths to allow access to | |
Returns: | |
Async function that adds filesystem tool to an MCP client | |
""" | |
if workspace_paths is None: | |
import os | |
workspace_paths = [os.path.abspath(".")] | |
async def add_to_client(mcp_client: MCPBrainClient): | |
return await add_filesystem_tool(mcp_client, workspace_paths) | |
return add_to_client | |
class MCPToolConfig: | |
"""Configuration for different MCP tools""" | |
FILESYSTEM_DEFAULT = { | |
"type": "stdio", | |
"command": "npx", | |
"args": ["-y", "@modelcontextprotocol/server-filesystem"], | |
"workspace_paths": None # Will be set to current directory at runtime | |
} | |
def get_filesystem_config(workspace_paths: List[str] = None) -> Dict[str, Any]: | |
"""Get filesystem tool configuration""" | |
config = MCPToolConfig.FILESYSTEM_DEFAULT.copy() | |
if workspace_paths is None: | |
import os | |
workspace_paths = [os.path.abspath(".")] | |
config["workspace_paths"] = workspace_paths | |
config["args"] = ["-y", "@modelcontextprotocol/server-filesystem"] + workspace_paths | |
return config | |
# Example usage: | |
""" | |
# Initialize MCP client | |
async def example_usage(): | |
async with MCPBrainClient() as mcp_client: | |
# Add MCP servers | |
await mcp_client.add_mcp_server("sse", url="http://localhost:8080/sse") | |
# Get tools for brain.step | |
mcp_tools = mcp_client.get_tools_for_brain() | |
# Add final answer tool | |
final_answer_tool = create_final_answer_tool() | |
all_tools = mcp_tools + [final_answer_tool] | |
# Use with brain.step | |
from minion.main.brain import Brain | |
from minion.main import LocalPythonEnv | |
from minion.providers import create_llm_provider | |
# Create brain instance (you'll need to configure this) | |
# brain = Brain(...) | |
# obs, score, *_ = await brain.step( | |
# query="what's the solution 234*568", | |
# route="raw", | |
# check=False, | |
# tools=all_tools | |
# ) | |
""" |