# NOTE: The following lines are page-scrape residue from the Hugging Face
# Spaces file viewer (space status, file size, blame commit hashes, and the
# line-number gutter). They are not part of the program and are commented out
# so the module parses:
#   Spaces: Paused / Paused — File size: 12,986 Bytes
#   Blame column hashes: 2245de1 5b6d070 dcfe138 5e33bad 15914ab bda84dd
#   45eef8e 107bdd1 ecbb371 d9de825 6926355 d0b7a26 cad7d6a f9046d5 62935c3
#   dbcccf3 6a9c2dc f1bc20d efb6a3d 61df420 (line-number gutter 1..303 removed)
import os
import json
import requests
from fastapi import FastAPI, Request
from fastapi.responses import Response
from fastapi.middleware.cors import CORSMiddleware
from mcp.server.lowlevel import Server, NotificationOptions
from mcp.server.sse import SseServerTransport
from mcp import types as mcp_types
import uvicorn
from sse_starlette import EventSourceResponse
import anyio
import asyncio
import logging
from typing import Dict
# Set up logging
# DEBUG is very chatty (logs every request/response body below); consider INFO
# for production deployments.
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

app = FastAPI()

# Add CORS middleware to allow Deep Agent to connect
# NOTE(review): browsers reject the wildcard origin when credentials are
# allowed per the CORS spec; Starlette compensates by echoing the Origin
# header — confirm this permissive combination is intended.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Adjust for production
    allow_credentials=True,
    allow_methods=["GET", "POST", "OPTIONS"],
    allow_headers=["*"],
)

# Load environment variables
# Both must be provided by the host environment; if either is missing the
# Airtable requests below will fail with 401/404 at call time (no startup check).
AIRTABLE_API_TOKEN = os.getenv("AIRTABLE_API_TOKEN")
AIRTABLE_BASE_ID = os.getenv("AIRTABLE_BASE_ID")
TABLE_ID = "tblQECi5f7m4y2NEV"  # hard-coded Airtable table ID
AIRTABLE_API_URL = f"https://api.airtable.com/v0/{AIRTABLE_BASE_ID}/{TABLE_ID}"
# Helper function for Airtable API requests
def airtable_request(method, endpoint="", data=None, timeout=30):
    """Issue a request against the Airtable REST API and return the parsed JSON.

    Args:
        method: HTTP method name, e.g. "GET" or "POST".
        endpoint: Optional path segment appended to the table URL
            (e.g. a record ID); the bare table URL is used when empty.
        data: Optional JSON-serializable payload sent as the request body.
        timeout: Seconds to wait for the HTTP response (new, defaults to 30;
            callers that omit it keep working).

    Returns:
        The decoded JSON response body as a dict.

    Raises:
        requests.HTTPError: On any 4xx/5xx response from Airtable.
        requests.Timeout: If Airtable does not respond within `timeout` seconds.
    """
    headers = {
        "Authorization": f"Bearer {AIRTABLE_API_TOKEN}",
        "Content-Type": "application/json"
    }
    url = f"{AIRTABLE_API_URL}/{endpoint}" if endpoint else AIRTABLE_API_URL
    # requests has no default timeout; without one a stalled connection would
    # block this (synchronous) call — and the event loop's thread — forever.
    response = requests.request(method, url, headers=headers, json=data, timeout=timeout)
    response.raise_for_status()
    return response.json()
# Tool to list records
async def list_records_tool(request: mcp_types.CallToolRequest):
    """List every record in the configured Airtable table.

    Returns {"success": True, "result": <JSON-encoded records>} when the
    Airtable call succeeds, otherwise {"success": False, "error": <message>}.
    """
    logger.debug(f"Received list_records_tool request: {request}")
    try:
        payload = json.dumps(airtable_request("GET"))
    except Exception as exc:
        failure = {
            "success": False,
            "error": str(exc)
        }
        logger.error(f"list_records_tool error: {failure}")
        return failure
    success = {
        "success": True,
        "result": payload
    }
    logger.debug(f"list_records_tool response: {success}")
    return success
# Tool to create a record
async def create_record_tool(request: mcp_types.CallToolRequest):
    """Create one record in the configured Airtable table.

    The tool arguments are expected to contain "record_data": a mapping of
    Airtable field names to values for the new record.

    Returns:
        {"success": True, "result": <JSON-encoded Airtable response>} on
        success, or {"success": False, "error": <message>} on any failure.
    """
    logger.debug(f"Received create_record_tool request: {request}")
    try:
        # MCP's CallToolRequest carries the tool arguments on
        # `params.arguments`. The original read `request.input`, which is not
        # an attribute of CallToolRequest, so every create call raised
        # AttributeError and was reported as an error. getattr keeps this
        # tolerant of dict-like or partial request objects.
        params = getattr(request, "params", None)
        arguments = getattr(params, "arguments", None) or {}
        record_data = arguments.get("record_data", {})
        # Airtable's create endpoint expects {"records": [{"fields": {...}}]}.
        data = {"records": [{"fields": record_data}]}
        response_data = airtable_request("POST", data=data)
        response = {
            "success": True,
            "result": json.dumps(response_data)
        }
        logger.debug(f"create_record_tool response: {response}")
        return response
    except Exception as e:
        response = {
            "success": False,
            "error": str(e)
        }
        logger.error(f"create_record_tool error: {response}")
        return response
# Define tools separately (for Deep Agent to discover them)
tools = [
    mcp_types.Tool(
        name="list_airtable_records",
        description="Lists all records in the specified Airtable table",
        # An MCP tool's inputSchema must be a JSON Schema object; the bare {}
        # used previously is not one and strict clients reject it. "Takes no
        # arguments" is expressed as an empty object schema.
        inputSchema={"type": "object", "properties": {}}
    ),
    mcp_types.Tool(
        name="create_airtable_record",
        description="Creates a new record in the specified Airtable table",
        # The previous value {"record_data": {...}} was not valid JSON Schema:
        # per-field schemas must live under a "properties" key.
        inputSchema={
            "type": "object",
            "properties": {"record_data": {"type": "object"}},
            "required": ["record_data"]
        }
    )
]
# Define tool handlers
# Maps each advertised tool name (must match the `name` fields in `tools`
# above) to the async coroutine that services it.
tool_handlers = {
    "list_airtable_records": list_records_tool,
    "create_airtable_record": create_record_tool
}
# Create MCP server
mcp_server = Server(name="airtable-mcp")
mcp_server.tool_handlers = tool_handlers  # Set as attribute
mcp_server.tools = tools  # Set tools as attribute for Deep Agent to discover
# NOTE(review): the MCP SDK's Server registers tools via its decorator API
# (list_tools/call_tool); nothing visible here reads these ad-hoc attributes —
# confirm they are consumed somewhere, otherwise tool calls routed through
# mcp_server.run() will not reach the handlers above.
# Store write streams for each session ID (for SseServerTransport messages)
write_streams: Dict[str, anyio.streams.memory.MemoryObjectSendStream] = {}
# Store SSE stream writers for each session ID (for manual messages)
sse_stream_writers: Dict[str, anyio.streams.memory.MemoryObjectSendStream] = {}
# Initialize SseServerTransport
# The path passed here is the POST endpoint URL the transport advertises to
# connected SSE clients (must match the @app.post route below).
transport = SseServerTransport("/airtable/mcp")
# SSE endpoint for GET requests
@app.get("/airtable/mcp")
async def handle_sse(request: Request):
    """SSE endpoint: bridge one client connection to the MCP server.

    Wires SseServerTransport's streams to an EventSourceResponse: `sse_writer`
    (run by the response as data_sender_callable) forwards transport events to
    the client, captures the session_id from the "endpoint" event, and swaps
    the placeholder entry in the global `write_streams`/`sse_stream_writers`
    registries to that session_id so the POST handler can reply directly.
    """
    logger.debug("Handling SSE connection request")
    session_id = None  # We'll extract this later
    async def sse_writer():
        nonlocal session_id
        logger.debug("Starting SSE writer")
        # `write_stream_reader` is a closure over the name assigned in the
        # try-block below; EventSourceResponse invokes this coroutine after
        # that assignment has happened.
        async with sse_stream_writer, write_stream_reader:
            # Send the initial endpoint event manually to capture the session_id
            # NOTE(review): this is NOT an f-string — the literal text
            # "{session_id}" is sent to the client (and session_id is still
            # None here anyway), so clients would POST to a placeholder URL.
            # Confirm whether the transport's own endpoint event supersedes it.
            endpoint_data = "/airtable/mcp?session_id={session_id}"
            await sse_stream_writer.send(
                {"event": "endpoint", "data": endpoint_data}
            )
            logger.debug(f"Sent endpoint event: {endpoint_data}")
            async for session_message in write_stream_reader:
                # Handle messages from SseServerTransport
                if hasattr(session_message, 'message'):
                    # SessionMessage wrapper: serialize the inner JSON-RPC
                    # message, then re-parse to a dict for uniform handling.
                    message_data = session_message.message.model_dump_json(by_alias=True, exclude_none=True)
                    event_data = json.loads(message_data)
                    logger.debug(f"Received SessionMessage from SseServerTransport: {event_data}")
                else:
                    event_data = session_message
                    logger.debug(f"Received dict event from SseServerTransport: {event_data}")
                # Extract session_id from the endpoint event
                if not session_id and event_data.get("event") == "endpoint":
                    endpoint_url = event_data.get("data", "")
                    if "session_id=" in endpoint_url:
                        session_id = endpoint_url.split("session_id=")[1]
                        # Re-key the registries from the placeholder to the
                        # real session id so POSTs can find this connection.
                        placeholder_id = f"placeholder_{id(write_stream)}"
                        if placeholder_id in write_streams:
                            write_streams[session_id] = write_streams.pop(placeholder_id)
                        sse_stream_writers[session_id] = sse_stream_writer
                        logger.debug(f"Updated placeholder {placeholder_id} to session_id {session_id}")
                # Forward the event to the client
                await sse_stream_writer.send({
                    "event": event_data.get("event", "message"),
                    "data": event_data.get("data", json.dumps(event_data))
                })
    # Zero-buffer in-memory stream: .send() blocks until the SSE response
    # consumes the item.
    sse_stream_writer, sse_stream_reader = anyio.create_memory_object_stream(0)
    try:
        async with transport.connect_sse(request.scope, request.receive, request._send) as streams:
            read_stream, write_stream = streams
            # NOTE(review): `write_stream` is the *send* half handed back by
            # connect_sse, yet sse_writer iterates it with `async for` as if
            # it were a receive stream — verify against the transport's
            # stream types that this actually yields messages.
            write_stream_reader = write_stream  # Since streams are MemoryObject streams
            # Store the write_stream with a placeholder ID
            placeholder_id = f"placeholder_{id(write_stream)}"
            write_streams[placeholder_id] = write_stream
            logger.debug(f"Stored write_stream with placeholder_id: {placeholder_id}")
            logger.debug("Running MCP server with streams")
            # Blocks for the lifetime of the MCP session.
            await mcp_server.run(read_stream, write_stream, mcp_server.create_initialization_options())
    except Exception as e:
        logger.error(f"Error in handle_sse: {str(e)}")
        # Clean up write_streams and sse_stream_writers on error
        placeholder_id = f"placeholder_{id(write_stream)}"
        write_streams.pop(placeholder_id, None)
        if session_id:
            write_streams.pop(session_id, None)
            sse_stream_writers.pop(session_id, None)
        raise
    # NOTE(review): this return executes only after mcp_server.run() finishes;
    # confirm connect_sse starts the SSE response itself, otherwise the
    # EventSourceResponse below would begin too late.
    return EventSourceResponse(sse_stream_reader, data_sender_callable=sse_writer)
# Message handling endpoint for POST requests
@app.post("/airtable/mcp")
async def handle_post_message(request: Request):
    """POST endpoint for client-to-server JSON-RPC messages.

    `initialize` and `tools/list` are answered manually by pushing a
    hand-built JSON-RPC response over the session's stored SSE writer,
    bypassing mcp_server; everything else is delegated to
    transport.handle_post_message. Every code path returns 202 (Accepted),
    including errors — failures are visible only in the logs.
    """
    logger.debug("Handling POST message request")
    body = await request.body()
    logger.debug(f"Received POST message body: {body}")
    try:
        # NOTE(review): a malformed body raises here and falls through to the
        # generic except -> 202, so the client gets no parse-error response.
        message = json.loads(body.decode())
        session_id = request.query_params.get("session_id")
        # Use sse_stream_writers to send manual responses directly
        sse_writer = sse_stream_writers.get(session_id) if session_id else None
        write_stream = write_streams.get(session_id) if session_id else None
        if message.get("method") == "initialize" and sse_writer:
            logger.debug("Handling initialize request manually")
            # Hand-rolled initialize result advertising this server's
            # capabilities; protocolVersion is pinned to "2025-03-26".
            response = {
                "jsonrpc": "2.0",
                "id": message.get("id"),
                "result": {
                    "protocolVersion": "2025-03-26",
                    "capabilities": {
                        "tools": {
                            "listChanged": True
                        },
                        "prompts": {
                            "listChanged": False
                        },
                        "resources": {
                            "subscribe": False,
                            "listChanged": False
                        },
                        "logging": {},
                        "experimental": {}
                    },
                    "serverInfo": {
                        "name": "airtable-mcp",
                        "version": "1.0.0"
                    },
                    "instructions": "Airtable MCP server for listing and creating records."
                }
            }
            logger.debug(f"Manual initialize response: {response}")
            response_data = json.dumps(response)
            await sse_writer.send({
                "event": "message",
                "data": response_data
            })
            logger.debug(f"Sent initialize response directly via SSE for session {session_id}")
            return Response(status_code=202)
        if message.get("method") == "tools/list":
            logger.debug("Handling tools/list request manually")
            response = {
                "jsonrpc": "2.0",
                "id": message.get("id"),
                "result": {
                    "tools": [tool.model_dump(by_alias=True, exclude_none=True) for tool in tools],
                    "nextCursor": None
                }
            }
            logger.debug(f"Manual tools/list response: {response}")
            response_data = json.dumps(response)
            sent = False
            # First, try sending directly via sse_writer
            if sse_writer:
                try:
                    await sse_writer.send({
                        "event": "message",
                        "data": response_data
                    })
                    logger.debug(f"Sent tools/list response directly via SSE for session {session_id}")
                    sent = True
                except Exception as e:
                    # Drop the dead writer so later requests don't retry it.
                    logger.error(f"Error sending to session {session_id} via sse_writer: {str(e)}")
                    sse_stream_writers.pop(session_id, None)
            # If not found or failed, look for a placeholder ID and update it
            if not sent and write_stream:
                for sid, ws in list(write_streams.items()):
                    if sid.startswith("placeholder_"):
                        try:
                            # Re-key the placeholder entry to this session.
                            # NOTE(review): sse_writer can be None (or just
                            # failed) on this path, yet it is stored and then
                            # .send() is awaited on it — this branch looks
                            # unreachable-without-error; confirm intent.
                            write_streams[session_id] = ws
                            sse_stream_writers[session_id] = sse_writer
                            write_streams.pop(sid, None)
                            await sse_writer.send({
                                "event": "message",
                                "data": response_data
                            })
                            logger.debug(f"Updated placeholder {sid} to session_id {session_id} and sent tools/list response")
                            sent = True
                            break
                        except Exception as e:
                            logger.error(f"Error sending to placeholder {sid}: {str(e)}")
                            write_streams.pop(sid, None)
                            sse_stream_writers.pop(session_id, None)
            if not sent:
                logger.warning(f"Failed to send tools/list response: no active write_streams or sse_writer found")
            return Response(status_code=202)
        # If neither sse_writer nor write_stream is available, log and handle gracefully
        if not sse_writer and not write_stream:
            logger.error(f"No sse_writer or write_stream found for session_id: {session_id}")
            return Response(status_code=202)
        # All other methods go through the SSE transport's own POST handler.
        await transport.handle_post_message(request.scope, request.receive, request._send)
        logger.debug("POST message handled successfully")
    except Exception as e:
        logger.error(f"Error handling POST message: {str(e)}")
        return Response(status_code=202)
    return Response(status_code=202)
# Health check endpoint
@app.get("/health")
async def health_check():
    """Liveness probe: always reports the service as healthy."""
    status_report = {"status": "healthy"}
    return status_report
# Endpoint to list tools (for debugging)
@app.get("/tools")
async def list_tools():
    """Debug endpoint: expose the registered MCP tool definitions as JSON."""
    dumped = [tool_def.model_dump() for tool_def in tools]
    return {"tools": dumped}
if __name__ == "__main__":
    # Bind port comes from the PORT environment variable when set (container
    # hosting convention), defaulting to 7860; listen on all interfaces.
    port = int(os.getenv("PORT", 7860))
    uvicorn.run(app, host="0.0.0.0", port=port)