minion-space / mcp_integration.py
femtowin's picture
Simplify filesystem tool to only use current directory absolute path
69ebf5d
import json
import logging
from contextlib import AsyncExitStack
from datetime import timedelta
from pathlib import Path
from typing import TYPE_CHECKING, Any, Dict, List, Literal, Optional, Union, overload, Callable
from typing_extensions import NotRequired, TypeAlias, TypedDict, Unpack
if TYPE_CHECKING:
from mcp import ClientSession
logger = logging.getLogger(__name__)
# Type alias for tool names
ToolName: TypeAlias = str
ServerType: TypeAlias = Literal["stdio", "sse", "http"]
class StdioServerParameters_T(TypedDict):
command: str
args: NotRequired[List[str]]
env: NotRequired[Dict[str, str]]
cwd: NotRequired[Union[str, Path, None]]
class SSEServerParameters_T(TypedDict):
url: str
headers: NotRequired[Dict[str, Any]]
timeout: NotRequired[float]
sse_read_timeout: NotRequired[float]
class StreamableHTTPParameters_T(TypedDict):
url: str
headers: NotRequired[dict[str, Any]]
timeout: NotRequired[timedelta]
sse_read_timeout: NotRequired[timedelta]
terminate_on_close: NotRequired[bool]
def format_mcp_result(result: Any) -> str:
#should we format mcp.types result to some result format handled by our framework?
return str(result)
"""Format MCP tool result for minion brain.step"""
# if isinstance(result, dict):
# # Handle MCP result format
# if "content" in result:
# content_items = result["content"]
# if isinstance(content_items, list):
# texts = []
# for item in content_items:
# if isinstance(item, dict) and item.get("type") == "text":
# texts.append(item.get("text", ""))
# return "\n".join(texts)
# elif isinstance(content_items, str):
# return content_items
#
# # Handle other dict formats
# if "text" in result:
# return result["text"]
#
# # Fallback to JSON string
# return json.dumps(result, indent=2)
#
# elif isinstance(result, str):
# return result
# else:
# return str(result)
class BrainTool:
"""
Adapter class to convert MCP tools to brain.step compatible format
"""
def __init__(self, name: str, description: str, parameters: Dict[str, Any], session: "ClientSession"):
self.name = name
self.description = description
self.parameters = parameters
self.session = session
# Add attributes expected by minion framework
self.__name__ = name
self.__doc__ = description
self.__input_schema__ = parameters
async def __call__(self, **kwargs) -> str:
"""Execute the tool with given parameters"""
try:
result = await self.session.call_tool(self.name, kwargs)
return format_mcp_result(result)
except Exception as e:
logger.error(f"Error executing tool {self.name}: {e}")
return f"Error: {str(e)}"
def to_function_spec(self) -> Dict[str, Any]:
"""Convert to function specification format for brain.step"""
return {
"type": "function",
"function": {
"name": self.name,
"description": self.description,
"parameters": self.parameters,
}
}
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary format"""
return {
"name": self.name,
"description": self.description,
"parameters": self.parameters
}
class MCPBrainClient:
"""
Client for connecting to MCP servers and providing tools to minion brain.step
"""
def __init__(self):
# Initialize MCP sessions as a dictionary of ClientSession objects
self.sessions: Dict[ToolName, "ClientSession"] = {}
self.exit_stack = AsyncExitStack()
self.available_tools: List[BrainTool] = []
async def __aenter__(self):
"""Enter the context manager"""
await self.exit_stack.__aenter__()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Exit the context manager"""
await self.cleanup()
async def cleanup(self):
"""Clean up resources"""
await self.exit_stack.aclose()
@overload
async def add_mcp_server(self, type: Literal["stdio"], **params: Unpack[StdioServerParameters_T]): ...
@overload
async def add_mcp_server(self, type: Literal["sse"], **params: Unpack[SSEServerParameters_T]): ...
@overload
async def add_mcp_server(self, type: Literal["http"], **params: Unpack[StreamableHTTPParameters_T]): ...
async def add_mcp_server(self, type: ServerType, **params: Any):
"""Connect to an MCP server and add its tools to available tools
Args:
type (`str`):
Type of the server to connect to. Can be one of:
- "stdio": Standard input/output server (local)
- "sse": Server-sent events (SSE) server
- "http": StreamableHTTP server
**params (`Dict[str, Any]`):
Server parameters that can be either:
- For stdio servers:
- command (str): The command to run the MCP server
- args (List[str], optional): Arguments for the command
- env (Dict[str, str], optional): Environment variables for the command
- cwd (Union[str, Path, None], optional): Working directory for the command
- For SSE servers:
- url (str): The URL of the SSE server
- headers (Dict[str, Any], optional): Headers for the SSE connection
- timeout (float, optional): Connection timeout
- sse_read_timeout (float, optional): SSE read timeout
- For StreamableHTTP servers:
- url (str): The URL of the StreamableHTTP server
- headers (Dict[str, Any], optional): Headers for the StreamableHTTP connection
- timeout (timedelta, optional): Connection timeout
- sse_read_timeout (timedelta, optional): SSE read timeout
- terminate_on_close (bool, optional): Whether to terminate on close
"""
from mcp import ClientSession, StdioServerParameters
from mcp import types as mcp_types
# Determine server type and create appropriate parameters
if type == "stdio":
# Handle stdio server
from mcp.client.stdio import stdio_client
logger.info(f"Connecting to stdio MCP server with command: {params['command']} {params.get('args', [])}")
client_kwargs = {"command": params["command"]}
for key in ["args", "env", "cwd"]:
if params.get(key) is not None:
client_kwargs[key] = params[key]
server_params = StdioServerParameters(**client_kwargs)
read, write = await self.exit_stack.enter_async_context(stdio_client(server_params))
elif type == "sse":
# Handle SSE server
from mcp.client.sse import sse_client
logger.info(f"Connecting to SSE MCP server at: {params['url']}")
client_kwargs = {"url": params["url"]}
for key in ["headers", "timeout", "sse_read_timeout"]:
if params.get(key) is not None:
client_kwargs[key] = params[key]
read, write = await self.exit_stack.enter_async_context(sse_client(**client_kwargs))
elif type == "http":
# Handle StreamableHTTP server
from mcp.client.streamable_http import streamablehttp_client
logger.info(f"Connecting to StreamableHTTP MCP server at: {params['url']}")
client_kwargs = {"url": params["url"]}
for key in ["headers", "timeout", "sse_read_timeout", "terminate_on_close"]:
if params.get(key) is not None:
client_kwargs[key] = params[key]
read, write, _ = await self.exit_stack.enter_async_context(streamablehttp_client(**client_kwargs))
else:
raise ValueError(f"Unsupported server type: {type}")
session = await self.exit_stack.enter_async_context(
ClientSession(
read_stream=read,
write_stream=write,
client_info=mcp_types.Implementation(
name="minion.MCPBrainClient",
version="1.0.0",
),
)
)
logger.debug("Initializing session...")
await session.initialize()
# List available tools
response = await session.list_tools()
logger.debug("Connected to server with tools:", [tool.name for tool in response.tools])
for tool in response.tools:
if tool.name in self.sessions:
logger.warning(f"Tool '{tool.name}' already defined by another server. Skipping.")
continue
# Map tool names to their server for later lookup
self.sessions[tool.name] = session
# Create BrainTool wrapper
brain_tool = BrainTool(
name=tool.name,
description=tool.description,
parameters=tool.inputSchema,
session=session
)
# Add tool to the list of available tools
self.available_tools.append(brain_tool)
def get_tools_for_brain(self) -> List[BrainTool]:
"""Get list of tools in the format expected by brain.step"""
return self.available_tools
def get_tool_functions(self) -> Dict[str, Callable]:
"""Get dictionary of tool functions for direct execution"""
return {tool.name: tool for tool in self.available_tools}
def get_tool_specs(self) -> List[Dict[str, Any]]:
"""Get list of tool specifications in ChatCompletion format"""
return [tool.to_function_spec() for tool in self.available_tools]
def get_tools_dict(self) -> List[Dict[str, Any]]:
"""Get list of tools as dictionaries"""
return [tool.to_dict() for tool in self.available_tools]
# Helper function to create final answer tool (example implementation)
def create_final_answer_tool() -> BrainTool:
"""
Create a final answer tool that can be used with brain.step
This is an example of how to create a local tool without MCP
"""
class FinalAnswerSession:
async def call_tool(self, name: str, args: Dict[str, Any]) -> Dict[str, Any]:
return {
"content": [
{
"type": "text",
"text": args.get("answer", "No answer provided")
}
]
}
session = FinalAnswerSession()
tool = BrainTool(
name="final_answer",
description="Provide the final answer to the user's question",
parameters={
"type": "object",
"properties": {
"answer": {
"type": "string",
"description": "The final answer to provide to the user"
}
},
"required": ["answer"]
},
session=session
)
return tool
def create_calculator_tool() -> BrainTool:
"""
Create a local calculator tool for basic arithmetic
"""
class CalculatorSession:
async def call_tool(self, name: str, args: Dict[str, Any]) -> Dict[str, Any]:
expression = args.get("expression", "")
try:
# Simple and safe evaluation for basic arithmetic
allowed_chars = set("0123456789+-*/()., ")
if not all(c in allowed_chars for c in expression):
raise ValueError("Invalid characters in expression")
result = eval(expression)
return {
"content": [
{
"type": "text",
"text": f"Calculation result: {expression} = {result}"
}
]
}
except Exception as e:
return {
"content": [
{
"type": "text",
"text": f"Error: Unable to calculate '{expression}': {str(e)}"
}
]
}
session = CalculatorSession()
tool = BrainTool(
name="calculator",
description="Perform basic arithmetic calculations",
parameters={
"type": "object",
"properties": {
"expression": {
"type": "string",
"description": "Mathematical expression to evaluate (e.g., '2 + 3 * 4')"
}
},
"required": ["expression"]
},
session=session
)
return tool
async def add_filesystem_tool(mcp_client: MCPBrainClient, workspace_paths: List[str] = None) -> None:
"""
Add filesystem MCP tool to the client
Args:
mcp_client: The MCP client to add the tool to
workspace_paths: List of paths to allow access to. Defaults to current directory.
"""
if workspace_paths is None:
import os
workspace_paths = [os.path.abspath(".")]
try:
await mcp_client.add_mcp_server(
"stdio",
command="npx",
args=["-y", "@modelcontextprotocol/server-filesystem"] + workspace_paths
)
logger.info(f"✓ Added filesystem tool with paths: {workspace_paths}")
except Exception as e:
logger.error(f"Failed to add filesystem tool: {e}")
raise
def create_filesystem_tool_factory(workspace_paths: List[str] = None):
"""
Create a factory function for the filesystem tool
Args:
workspace_paths: List of paths to allow access to
Returns:
Async function that adds filesystem tool to an MCP client
"""
if workspace_paths is None:
import os
workspace_paths = [os.path.abspath(".")]
async def add_to_client(mcp_client: MCPBrainClient):
return await add_filesystem_tool(mcp_client, workspace_paths)
return add_to_client
class MCPToolConfig:
"""Configuration for different MCP tools"""
FILESYSTEM_DEFAULT = {
"type": "stdio",
"command": "npx",
"args": ["-y", "@modelcontextprotocol/server-filesystem"],
"workspace_paths": None # Will be set to current directory at runtime
}
@staticmethod
def get_filesystem_config(workspace_paths: List[str] = None) -> Dict[str, Any]:
"""Get filesystem tool configuration"""
config = MCPToolConfig.FILESYSTEM_DEFAULT.copy()
if workspace_paths is None:
import os
workspace_paths = [os.path.abspath(".")]
config["workspace_paths"] = workspace_paths
config["args"] = ["-y", "@modelcontextprotocol/server-filesystem"] + workspace_paths
return config
# Example usage:
"""
# Initialize MCP client
async def example_usage():
async with MCPBrainClient() as mcp_client:
# Add MCP servers
await mcp_client.add_mcp_server("sse", url="http://localhost:8080/sse")
# Get tools for brain.step
mcp_tools = mcp_client.get_tools_for_brain()
# Add final answer tool
final_answer_tool = create_final_answer_tool()
all_tools = mcp_tools + [final_answer_tool]
# Use with brain.step
from minion.main.brain import Brain
from minion.main import LocalPythonEnv
from minion.providers import create_llm_provider
# Create brain instance (you'll need to configure this)
# brain = Brain(...)
# obs, score, *_ = await brain.step(
# query="what's the solution 234*568",
# route="raw",
# check=False,
# tools=all_tools
# )
"""