import json import logging from contextlib import AsyncExitStack from datetime import timedelta from pathlib import Path from typing import TYPE_CHECKING, Any, Dict, List, Literal, Optional, Union, overload, Callable from typing_extensions import NotRequired, TypeAlias, TypedDict, Unpack if TYPE_CHECKING: from mcp import ClientSession logger = logging.getLogger(__name__) # Type alias for tool names ToolName: TypeAlias = str ServerType: TypeAlias = Literal["stdio", "sse", "http"] class StdioServerParameters_T(TypedDict): command: str args: NotRequired[List[str]] env: NotRequired[Dict[str, str]] cwd: NotRequired[Union[str, Path, None]] class SSEServerParameters_T(TypedDict): url: str headers: NotRequired[Dict[str, Any]] timeout: NotRequired[float] sse_read_timeout: NotRequired[float] class StreamableHTTPParameters_T(TypedDict): url: str headers: NotRequired[dict[str, Any]] timeout: NotRequired[timedelta] sse_read_timeout: NotRequired[timedelta] terminate_on_close: NotRequired[bool] def format_mcp_result(result: Any) -> str: #should we format mcp.types result to some result format handled by our framework? return str(result) """Format MCP tool result for minion brain.step""" # if isinstance(result, dict): # # Handle MCP result format # if "content" in result: # content_items = result["content"] # if isinstance(content_items, list): # texts = [] # for item in content_items: # if isinstance(item, dict) and item.get("type") == "text": # texts.append(item.get("text", "")) # return "\n".join(texts) # elif isinstance(content_items, str): # return content_items # # # Handle other dict formats # if "text" in result: # return result["text"] # # # Fallback to JSON string # return json.dumps(result, indent=2) # # elif isinstance(result, str): # return result # else: # return str(result) class BrainTool: """ Adapter class to convert MCP tools to brain.step compatible format """ def __init__(self, name: str, description: str, parameters: Dict[str, Any], session: "ClientSession"): self.name = name self.description = description self.parameters = parameters self.session = session # Add attributes expected by minion framework self.__name__ = name self.__doc__ = description self.__input_schema__ = parameters async def __call__(self, **kwargs) -> str: """Execute the tool with given parameters""" try: result = await self.session.call_tool(self.name, kwargs) return format_mcp_result(result) except Exception as e: logger.error(f"Error executing tool {self.name}: {e}") return f"Error: {str(e)}" def to_function_spec(self) -> Dict[str, Any]: """Convert to function specification format for brain.step""" return { "type": "function", "function": { "name": self.name, "description": self.description, "parameters": self.parameters, } } def to_dict(self) -> Dict[str, Any]: """Convert to dictionary format""" return { "name": self.name, "description": self.description, "parameters": self.parameters } class MCPBrainClient: """ Client for connecting to MCP servers and providing tools to minion brain.step """ def __init__(self): # Initialize MCP sessions as a dictionary of ClientSession objects self.sessions: Dict[ToolName, "ClientSession"] = {} self.exit_stack = AsyncExitStack() self.available_tools: List[BrainTool] = [] async def __aenter__(self): """Enter the context manager""" await self.exit_stack.__aenter__() return self async def __aexit__(self, exc_type, exc_val, exc_tb): """Exit the context manager""" await self.cleanup() async def cleanup(self): """Clean up resources""" await self.exit_stack.aclose() @overload async def add_mcp_server(self, type: Literal["stdio"], **params: Unpack[StdioServerParameters_T]): ... @overload async def add_mcp_server(self, type: Literal["sse"], **params: Unpack[SSEServerParameters_T]): ... @overload async def add_mcp_server(self, type: Literal["http"], **params: Unpack[StreamableHTTPParameters_T]): ... async def add_mcp_server(self, type: ServerType, **params: Any): """Connect to an MCP server and add its tools to available tools Args: type (`str`): Type of the server to connect to. Can be one of: - "stdio": Standard input/output server (local) - "sse": Server-sent events (SSE) server - "http": StreamableHTTP server **params (`Dict[str, Any]`): Server parameters that can be either: - For stdio servers: - command (str): The command to run the MCP server - args (List[str], optional): Arguments for the command - env (Dict[str, str], optional): Environment variables for the command - cwd (Union[str, Path, None], optional): Working directory for the command - For SSE servers: - url (str): The URL of the SSE server - headers (Dict[str, Any], optional): Headers for the SSE connection - timeout (float, optional): Connection timeout - sse_read_timeout (float, optional): SSE read timeout - For StreamableHTTP servers: - url (str): The URL of the StreamableHTTP server - headers (Dict[str, Any], optional): Headers for the StreamableHTTP connection - timeout (timedelta, optional): Connection timeout - sse_read_timeout (timedelta, optional): SSE read timeout - terminate_on_close (bool, optional): Whether to terminate on close """ from mcp import ClientSession, StdioServerParameters from mcp import types as mcp_types # Determine server type and create appropriate parameters if type == "stdio": # Handle stdio server from mcp.client.stdio import stdio_client logger.info(f"Connecting to stdio MCP server with command: {params['command']} {params.get('args', [])}") client_kwargs = {"command": params["command"]} for key in ["args", "env", "cwd"]: if params.get(key) is not None: client_kwargs[key] = params[key] server_params = StdioServerParameters(**client_kwargs) read, write = await self.exit_stack.enter_async_context(stdio_client(server_params)) elif type == "sse": # Handle SSE server from mcp.client.sse import sse_client logger.info(f"Connecting to SSE MCP server at: {params['url']}") client_kwargs = {"url": params["url"]} for key in ["headers", "timeout", "sse_read_timeout"]: if params.get(key) is not None: client_kwargs[key] = params[key] read, write = await self.exit_stack.enter_async_context(sse_client(**client_kwargs)) elif type == "http": # Handle StreamableHTTP server from mcp.client.streamable_http import streamablehttp_client logger.info(f"Connecting to StreamableHTTP MCP server at: {params['url']}") client_kwargs = {"url": params["url"]} for key in ["headers", "timeout", "sse_read_timeout", "terminate_on_close"]: if params.get(key) is not None: client_kwargs[key] = params[key] read, write, _ = await self.exit_stack.enter_async_context(streamablehttp_client(**client_kwargs)) else: raise ValueError(f"Unsupported server type: {type}") session = await self.exit_stack.enter_async_context( ClientSession( read_stream=read, write_stream=write, client_info=mcp_types.Implementation( name="minion.MCPBrainClient", version="1.0.0", ), ) ) logger.debug("Initializing session...") await session.initialize() # List available tools response = await session.list_tools() logger.debug("Connected to server with tools:", [tool.name for tool in response.tools]) for tool in response.tools: if tool.name in self.sessions: logger.warning(f"Tool '{tool.name}' already defined by another server. Skipping.") continue # Map tool names to their server for later lookup self.sessions[tool.name] = session # Create BrainTool wrapper brain_tool = BrainTool( name=tool.name, description=tool.description, parameters=tool.inputSchema, session=session ) # Add tool to the list of available tools self.available_tools.append(brain_tool) def get_tools_for_brain(self) -> List[BrainTool]: """Get list of tools in the format expected by brain.step""" return self.available_tools def get_tool_functions(self) -> Dict[str, Callable]: """Get dictionary of tool functions for direct execution""" return {tool.name: tool for tool in self.available_tools} def get_tool_specs(self) -> List[Dict[str, Any]]: """Get list of tool specifications in ChatCompletion format""" return [tool.to_function_spec() for tool in self.available_tools] def get_tools_dict(self) -> List[Dict[str, Any]]: """Get list of tools as dictionaries""" return [tool.to_dict() for tool in self.available_tools] # Helper function to create final answer tool (example implementation) def create_final_answer_tool() -> BrainTool: """ Create a final answer tool that can be used with brain.step This is an example of how to create a local tool without MCP """ class FinalAnswerSession: async def call_tool(self, name: str, args: Dict[str, Any]) -> Dict[str, Any]: return { "content": [ { "type": "text", "text": args.get("answer", "No answer provided") } ] } session = FinalAnswerSession() tool = BrainTool( name="final_answer", description="Provide the final answer to the user's question", parameters={ "type": "object", "properties": { "answer": { "type": "string", "description": "The final answer to provide to the user" } }, "required": ["answer"] }, session=session ) return tool def create_calculator_tool() -> BrainTool: """ Create a local calculator tool for basic arithmetic """ class CalculatorSession: async def call_tool(self, name: str, args: Dict[str, Any]) -> Dict[str, Any]: expression = args.get("expression", "") try: # Simple and safe evaluation for basic arithmetic allowed_chars = set("0123456789+-*/()., ") if not all(c in allowed_chars for c in expression): raise ValueError("Invalid characters in expression") result = eval(expression) return { "content": [ { "type": "text", "text": f"Calculation result: {expression} = {result}" } ] } except Exception as e: return { "content": [ { "type": "text", "text": f"Error: Unable to calculate '{expression}': {str(e)}" } ] } session = CalculatorSession() tool = BrainTool( name="calculator", description="Perform basic arithmetic calculations", parameters={ "type": "object", "properties": { "expression": { "type": "string", "description": "Mathematical expression to evaluate (e.g., '2 + 3 * 4')" } }, "required": ["expression"] }, session=session ) return tool async def add_filesystem_tool(mcp_client: MCPBrainClient, workspace_paths: List[str] = None) -> None: """ Add filesystem MCP tool to the client Args: mcp_client: The MCP client to add the tool to workspace_paths: List of paths to allow access to. Defaults to current directory. """ if workspace_paths is None: import os workspace_paths = [os.path.abspath(".")] try: await mcp_client.add_mcp_server( "stdio", command="npx", args=["-y", "@modelcontextprotocol/server-filesystem"] + workspace_paths ) logger.info(f"✓ Added filesystem tool with paths: {workspace_paths}") except Exception as e: logger.error(f"Failed to add filesystem tool: {e}") raise def create_filesystem_tool_factory(workspace_paths: List[str] = None): """ Create a factory function for the filesystem tool Args: workspace_paths: List of paths to allow access to Returns: Async function that adds filesystem tool to an MCP client """ if workspace_paths is None: import os workspace_paths = [os.path.abspath(".")] async def add_to_client(mcp_client: MCPBrainClient): return await add_filesystem_tool(mcp_client, workspace_paths) return add_to_client class MCPToolConfig: """Configuration for different MCP tools""" FILESYSTEM_DEFAULT = { "type": "stdio", "command": "npx", "args": ["-y", "@modelcontextprotocol/server-filesystem"], "workspace_paths": None # Will be set to current directory at runtime } @staticmethod def get_filesystem_config(workspace_paths: List[str] = None) -> Dict[str, Any]: """Get filesystem tool configuration""" config = MCPToolConfig.FILESYSTEM_DEFAULT.copy() if workspace_paths is None: import os workspace_paths = [os.path.abspath(".")] config["workspace_paths"] = workspace_paths config["args"] = ["-y", "@modelcontextprotocol/server-filesystem"] + workspace_paths return config # Example usage: """ # Initialize MCP client async def example_usage(): async with MCPBrainClient() as mcp_client: # Add MCP servers await mcp_client.add_mcp_server("sse", url="http://localhost:8080/sse") # Get tools for brain.step mcp_tools = mcp_client.get_tools_for_brain() # Add final answer tool final_answer_tool = create_final_answer_tool() all_tools = mcp_tools + [final_answer_tool] # Use with brain.step from minion.main.brain import Brain from minion.main import LocalPythonEnv from minion.providers import create_llm_provider # Create brain instance (you'll need to configure this) # brain = Brain(...) # obs, score, *_ = await brain.step( # query="what's the solution 234*568", # route="raw", # check=False, # tools=all_tools # ) """