File size: 16,430 Bytes
b55e829
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
69ebf5d
b55e829
 
69ebf5d
 
b55e829
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
69ebf5d
 
b55e829
 
 
 
 
 
 
 
 
 
 
 
 
 
69ebf5d
b55e829
 
 
 
 
 
69ebf5d
 
 
 
 
 
b55e829
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
import json
import logging
from contextlib import AsyncExitStack
from datetime import timedelta
from pathlib import Path
from typing import TYPE_CHECKING, Any, Dict, List, Literal, Optional, Union, overload, Callable

from typing_extensions import NotRequired, TypeAlias, TypedDict, Unpack

if TYPE_CHECKING:
    from mcp import ClientSession

logger = logging.getLogger(__name__)

# Type alias for tool names
ToolName: TypeAlias = str

ServerType: TypeAlias = Literal["stdio", "sse", "http"]


class StdioServerParameters_T(TypedDict):
    command: str
    args: NotRequired[List[str]]
    env: NotRequired[Dict[str, str]]
    cwd: NotRequired[Union[str, Path, None]]


class SSEServerParameters_T(TypedDict):
    url: str
    headers: NotRequired[Dict[str, Any]]
    timeout: NotRequired[float]
    sse_read_timeout: NotRequired[float]


class StreamableHTTPParameters_T(TypedDict):
    url: str
    headers: NotRequired[dict[str, Any]]
    timeout: NotRequired[timedelta]
    sse_read_timeout: NotRequired[timedelta]
    terminate_on_close: NotRequired[bool]


def format_mcp_result(result: Any) -> str:
    #should we format mcp.types result to some result format handled by our framework?
    return str(result)
    """Format MCP tool result for minion brain.step"""
    # if isinstance(result, dict):
    #     # Handle MCP result format
    #     if "content" in result:
    #         content_items = result["content"]
    #         if isinstance(content_items, list):
    #             texts = []
    #             for item in content_items:
    #                 if isinstance(item, dict) and item.get("type") == "text":
    #                     texts.append(item.get("text", ""))
    #             return "\n".join(texts)
    #         elif isinstance(content_items, str):
    #             return content_items
    #
    #     # Handle other dict formats
    #     if "text" in result:
    #         return result["text"]
    #
    #     # Fallback to JSON string
    #     return json.dumps(result, indent=2)
    #
    # elif isinstance(result, str):
    #     return result
    # else:
    #     return str(result)


class BrainTool:
    """
    Adapter class to convert MCP tools to brain.step compatible format
    """
    def __init__(self, name: str, description: str, parameters: Dict[str, Any], session: "ClientSession"):
        self.name = name
        self.description = description
        self.parameters = parameters
        self.session = session
        
        # Add attributes expected by minion framework
        self.__name__ = name
        self.__doc__ = description
        self.__input_schema__ = parameters
        
    async def __call__(self, **kwargs) -> str:
        """Execute the tool with given parameters"""
        try:
            result = await self.session.call_tool(self.name, kwargs)
            return format_mcp_result(result)
        except Exception as e:
            logger.error(f"Error executing tool {self.name}: {e}")
            return f"Error: {str(e)}"
    
    def to_function_spec(self) -> Dict[str, Any]:
        """Convert to function specification format for brain.step"""
        return {
            "type": "function",
            "function": {
                "name": self.name,
                "description": self.description,
                "parameters": self.parameters,
            }
        }
    
    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary format"""
        return {
            "name": self.name,
            "description": self.description,
            "parameters": self.parameters
        }


class MCPBrainClient:
    """
    Client for connecting to MCP servers and providing tools to minion brain.step
    """
    
    def __init__(self):
        # Initialize MCP sessions as a dictionary of ClientSession objects
        self.sessions: Dict[ToolName, "ClientSession"] = {}
        self.exit_stack = AsyncExitStack()
        self.available_tools: List[BrainTool] = []

    async def __aenter__(self):
        """Enter the context manager"""
        await self.exit_stack.__aenter__()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Exit the context manager"""
        await self.cleanup()

    async def cleanup(self):
        """Clean up resources"""
        await self.exit_stack.aclose()

    @overload
    async def add_mcp_server(self, type: Literal["stdio"], **params: Unpack[StdioServerParameters_T]): ...

    @overload
    async def add_mcp_server(self, type: Literal["sse"], **params: Unpack[SSEServerParameters_T]): ...

    @overload
    async def add_mcp_server(self, type: Literal["http"], **params: Unpack[StreamableHTTPParameters_T]): ...

    async def add_mcp_server(self, type: ServerType, **params: Any):
        """Connect to an MCP server and add its tools to available tools

        Args:
            type (`str`):
                Type of the server to connect to. Can be one of:
                - "stdio": Standard input/output server (local)
                - "sse": Server-sent events (SSE) server
                - "http": StreamableHTTP server
            **params (`Dict[str, Any]`):
                Server parameters that can be either:
                    - For stdio servers:
                        - command (str): The command to run the MCP server
                        - args (List[str], optional): Arguments for the command
                        - env (Dict[str, str], optional): Environment variables for the command
                        - cwd (Union[str, Path, None], optional): Working directory for the command
                    - For SSE servers:
                        - url (str): The URL of the SSE server
                        - headers (Dict[str, Any], optional): Headers for the SSE connection
                        - timeout (float, optional): Connection timeout
                        - sse_read_timeout (float, optional): SSE read timeout
                    - For StreamableHTTP servers:
                        - url (str): The URL of the StreamableHTTP server
                        - headers (Dict[str, Any], optional): Headers for the StreamableHTTP connection
                        - timeout (timedelta, optional): Connection timeout
                        - sse_read_timeout (timedelta, optional): SSE read timeout
                        - terminate_on_close (bool, optional): Whether to terminate on close
        """
        from mcp import ClientSession, StdioServerParameters
        from mcp import types as mcp_types

        # Determine server type and create appropriate parameters
        if type == "stdio":
            # Handle stdio server
            from mcp.client.stdio import stdio_client

            logger.info(f"Connecting to stdio MCP server with command: {params['command']} {params.get('args', [])}")

            client_kwargs = {"command": params["command"]}
            for key in ["args", "env", "cwd"]:
                if params.get(key) is not None:
                    client_kwargs[key] = params[key]
            server_params = StdioServerParameters(**client_kwargs)
            read, write = await self.exit_stack.enter_async_context(stdio_client(server_params))
        elif type == "sse":
            # Handle SSE server
            from mcp.client.sse import sse_client

            logger.info(f"Connecting to SSE MCP server at: {params['url']}")

            client_kwargs = {"url": params["url"]}
            for key in ["headers", "timeout", "sse_read_timeout"]:
                if params.get(key) is not None:
                    client_kwargs[key] = params[key]
            read, write = await self.exit_stack.enter_async_context(sse_client(**client_kwargs))
        elif type == "http":
            # Handle StreamableHTTP server
            from mcp.client.streamable_http import streamablehttp_client

            logger.info(f"Connecting to StreamableHTTP MCP server at: {params['url']}")

            client_kwargs = {"url": params["url"]}
            for key in ["headers", "timeout", "sse_read_timeout", "terminate_on_close"]:
                if params.get(key) is not None:
                    client_kwargs[key] = params[key]
            read, write, _ = await self.exit_stack.enter_async_context(streamablehttp_client(**client_kwargs))
        else:
            raise ValueError(f"Unsupported server type: {type}")

        session = await self.exit_stack.enter_async_context(
            ClientSession(
                read_stream=read,
                write_stream=write,
                client_info=mcp_types.Implementation(
                    name="minion.MCPBrainClient",
                    version="1.0.0",
                ),
            )
        )

        logger.debug("Initializing session...")
        await session.initialize()

        # List available tools
        response = await session.list_tools()
        logger.debug("Connected to server with tools:", [tool.name for tool in response.tools])

        for tool in response.tools:
            if tool.name in self.sessions:
                logger.warning(f"Tool '{tool.name}' already defined by another server. Skipping.")
                continue

            # Map tool names to their server for later lookup
            self.sessions[tool.name] = session

            # Create BrainTool wrapper
            brain_tool = BrainTool(
                name=tool.name,
                description=tool.description,
                parameters=tool.inputSchema,
                session=session
            )
            
            # Add tool to the list of available tools
            self.available_tools.append(brain_tool)

    def get_tools_for_brain(self) -> List[BrainTool]:
        """Get list of tools in the format expected by brain.step"""
        return self.available_tools

    def get_tool_functions(self) -> Dict[str, Callable]:
        """Get dictionary of tool functions for direct execution"""
        return {tool.name: tool for tool in self.available_tools}

    def get_tool_specs(self) -> List[Dict[str, Any]]:
        """Get list of tool specifications in ChatCompletion format"""
        return [tool.to_function_spec() for tool in self.available_tools]

    def get_tools_dict(self) -> List[Dict[str, Any]]:
        """Get list of tools as dictionaries"""
        return [tool.to_dict() for tool in self.available_tools]


# Helper function to create final answer tool (example implementation)
def create_final_answer_tool() -> BrainTool:
    """
    Create a final answer tool that can be used with brain.step
    This is an example of how to create a local tool without MCP
    """
    class FinalAnswerSession:
        async def call_tool(self, name: str, args: Dict[str, Any]) -> Dict[str, Any]:
            return {
                "content": [
                    {
                        "type": "text",
                        "text": args.get("answer", "No answer provided")
                    }
                ]
            }
    
    session = FinalAnswerSession()
    
    tool = BrainTool(
        name="final_answer",
        description="Provide the final answer to the user's question",
        parameters={
            "type": "object",
            "properties": {
                "answer": {
                    "type": "string",
                    "description": "The final answer to provide to the user"
                }
            },
            "required": ["answer"]
        },
        session=session
    )
    
    return tool


def create_calculator_tool() -> BrainTool:
    """
    Create a local calculator tool for basic arithmetic
    """
    class CalculatorSession:
        async def call_tool(self, name: str, args: Dict[str, Any]) -> Dict[str, Any]:
            expression = args.get("expression", "")
            try:
                # Simple and safe evaluation for basic arithmetic
                allowed_chars = set("0123456789+-*/()., ")
                if not all(c in allowed_chars for c in expression):
                    raise ValueError("Invalid characters in expression")
                
                result = eval(expression)
                return {
                    "content": [
                        {
                            "type": "text",
                            "text": f"Calculation result: {expression} = {result}"
                        }
                    ]
                }
            except Exception as e:
                return {
                    "content": [
                        {
                            "type": "text",
                            "text": f"Error: Unable to calculate '{expression}': {str(e)}"
                        }
                    ]
                }
    
    session = CalculatorSession()
    
    tool = BrainTool(
        name="calculator",
        description="Perform basic arithmetic calculations",
        parameters={
            "type": "object",
            "properties": {
                "expression": {
                    "type": "string",
                    "description": "Mathematical expression to evaluate (e.g., '2 + 3 * 4')"
                }
            },
            "required": ["expression"]
        },
        session=session
    )
    
    return tool


async def add_filesystem_tool(mcp_client: MCPBrainClient, workspace_paths: List[str] = None) -> None:
    """
    Add filesystem MCP tool to the client
    
    Args:
        mcp_client: The MCP client to add the tool to
        workspace_paths: List of paths to allow access to. Defaults to current directory.
    """
    if workspace_paths is None:
        import os
        workspace_paths = [os.path.abspath(".")]
    
    try:
        await mcp_client.add_mcp_server(
            "stdio",
            command="npx",
            args=["-y", "@modelcontextprotocol/server-filesystem"] + workspace_paths
        )
        logger.info(f"✓ Added filesystem tool with paths: {workspace_paths}")
    except Exception as e:
        logger.error(f"Failed to add filesystem tool: {e}")
        raise


def create_filesystem_tool_factory(workspace_paths: List[str] = None):
    """
    Create a factory function for the filesystem tool
    
    Args:
        workspace_paths: List of paths to allow access to
        
    Returns:
        Async function that adds filesystem tool to an MCP client
    """
    if workspace_paths is None:
        import os
        workspace_paths = [os.path.abspath(".")]
    
    async def add_to_client(mcp_client: MCPBrainClient):
        return await add_filesystem_tool(mcp_client, workspace_paths)
    
    return add_to_client


class MCPToolConfig:
    """Configuration for different MCP tools"""
    
    FILESYSTEM_DEFAULT = {
        "type": "stdio",
        "command": "npx",
        "args": ["-y", "@modelcontextprotocol/server-filesystem"],
        "workspace_paths": None  # Will be set to current directory at runtime
    }
    
    @staticmethod
    def get_filesystem_config(workspace_paths: List[str] = None) -> Dict[str, Any]:
        """Get filesystem tool configuration"""
        config = MCPToolConfig.FILESYSTEM_DEFAULT.copy()
        if workspace_paths is None:
            import os
            workspace_paths = [os.path.abspath(".")]
        
        config["workspace_paths"] = workspace_paths
        config["args"] = ["-y", "@modelcontextprotocol/server-filesystem"] + workspace_paths
        return config


# Example usage:
"""
# Initialize MCP client
async def example_usage():
    async with MCPBrainClient() as mcp_client:
        # Add MCP servers
        await mcp_client.add_mcp_server("sse", url="http://localhost:8080/sse")
        
        # Get tools for brain.step
        mcp_tools = mcp_client.get_tools_for_brain()
        
        # Add final answer tool
        final_answer_tool = create_final_answer_tool()
        all_tools = mcp_tools + [final_answer_tool]
        
        # Use with brain.step
        from minion.main.brain import Brain
        from minion.main import LocalPythonEnv
        from minion.providers import create_llm_provider
        
        # Create brain instance (you'll need to configure this)
        # brain = Brain(...)
        
        # obs, score, *_ = await brain.step(
        #     query="what's the solution 234*568",
        #     route="raw",
        #     check=False,
        #     tools=all_tools
        # )
"""