# airtable-mcp / app.py
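"""Minimal MCP server that exposes one Airtable table over SSE.

Wraps two operations (list records, create a record) as MCP tools and
serves them to MCP clients such as Deep Agent via FastAPI + sse-starlette.
"""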
import os
import json
import requests
from fastapi import FastAPI, Request
from fastapi.responses import Response
from fastapi.middleware.cors import CORSMiddleware
from mcp.server.lowlevel import Server
from mcp.server.sse import SseServerTransport
from mcp import types as mcp_types
import uvicorn
from sse_starlette import EventSourceResponse
import anyio
from anyio.streams.memory import MemoryObjectSendStream
import logging
from typing import Dict
# Set up logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
app = FastAPI()
# Add CORS middleware to allow Deep Agent to connect
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Adjust for production
    allow_credentials=True,
    allow_methods=["GET", "POST", "OPTIONS"],
    allow_headers=["*"],
)
# Load environment variables
AIRTABLE_API_TOKEN = os.getenv("AIRTABLE_API_TOKEN")
AIRTABLE_BASE_ID = os.getenv("AIRTABLE_BASE_ID")
TABLE_ID = "tblQECi5f7m4y2NEV"
AIRTABLE_API_URL = f"https://api.airtable.com/v0/{AIRTABLE_BASE_ID}/{TABLE_ID}"
# Helper function for Airtable API requests
def airtable_request(method, endpoint="", data=None):
    headers = {
        "Authorization": f"Bearer {AIRTABLE_API_TOKEN}",
        "Content-Type": "application/json"
    }
    url = f"{AIRTABLE_API_URL}/{endpoint}" if endpoint else AIRTABLE_API_URL
    response = requests.request(method, url, headers=headers, json=data)
    response.raise_for_status()
    return response.json()
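# For reference, Airtable's REST API wraps results in a standard envelope:
# a GET returns {"records": [{"id": "rec...", "createdTime": "...",
# "fields": {...}}, ...]}, and a POST that sends
# {"records": [{"fields": {...}}]} echoes the created records back in the
# same shape.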
# Tool to list records
async def list_records_tool(request: mcp_types.CallToolRequest):
    logger.debug(f"Received list_records_tool request: {request}")
    try:
        records = airtable_request("GET")
        response = {
            "success": True,
            "result": json.dumps(records)
        }
        logger.debug(f"list_records_tool response: {response}")
        return response
    except Exception as e:
        response = {
            "success": False,
            "error": str(e)
        }
        logger.error(f"list_records_tool error: {response}")
        return response
# Tool to create a record
async def create_record_tool(request: mcp_types.CallToolRequest):
    logger.debug(f"Received create_record_tool request: {request}")
    try:
        # CallToolRequest carries its tool arguments in params.arguments,
        # not in an `input` attribute.
        record_data = (request.params.arguments or {}).get("record_data", {})
        data = {"records": [{"fields": record_data}]}
        response_data = airtable_request("POST", data=data)
        response = {
            "success": True,
            "result": json.dumps(response_data)
        }
        logger.debug(f"create_record_tool response: {response}")
        return response
    except Exception as e:
        response = {
            "success": False,
            "error": str(e)
        }
        logger.error(f"create_record_tool error: {response}")
        return response
# Define tools separately (for Deep Agent to discover them)
tools = [
    mcp_types.Tool(
        name="list_airtable_records",
        description="Lists all records in the specified Airtable table",
        inputSchema={"type": "object", "properties": {}}
    ),
    mcp_types.Tool(
        name="create_airtable_record",
        description="Creates a new record in the specified Airtable table",
        inputSchema={
            "type": "object",
            "properties": {"record_data": {"type": "object"}},
            "required": ["record_data"]
        }
    )
]
# Define tool handlers
tool_handlers = {
    "list_airtable_records": list_records_tool,
    "create_airtable_record": create_record_tool
}
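# The create tool expects its arguments shaped like (field names here are
# purely illustrative, not part of the actual base schema):
#   {"record_data": {"Name": "Example", "Status": "Todo"}}
# i.e. a flat mapping of Airtable field names to values.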
# Create MCP server
mcp_server = Server(name="airtable-mcp")
mcp_server.tool_handlers = tool_handlers  # Stored as plain attributes;
mcp_server.tools = tools  # the lowlevel Server API does not read these itself
# Store write streams for each session ID (for SseServerTransport messages)
write_streams: Dict[str, MemoryObjectSendStream] = {}
# Store SSE stream writers for each session ID (for manual messages)
sse_stream_writers: Dict[str, MemoryObjectSendStream] = {}
# Initialize SseServerTransport
transport = SseServerTransport("/airtable/mcp")
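# How the two endpoints below cooperate: the GET handler opens the SSE
# stream and emits an "endpoint" event telling the client where to POST,
# and the POST handler receives JSON-RPC messages tagged with that
# session_id and answers them back over the same SSE stream.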
# SSE endpoint for GET requests
@app.get("/airtable/mcp")
async def handle_sse(request: Request):
    logger.debug("Handling SSE connection request")
    session_id = None  # Extracted later from the transport's endpoint event
    async def sse_writer():
        nonlocal session_id
        logger.debug("Starting SSE writer")
        async with sse_stream_writer, write_stream_reader:
            # Send an initial endpoint event. This is a literal template;
            # the real session_id is parsed out of the transport's own
            # endpoint event below.
            endpoint_data = "/airtable/mcp?session_id={session_id}"
            await sse_stream_writer.send(
                {"event": "endpoint", "data": endpoint_data}
            )
            logger.debug(f"Sent endpoint event: {endpoint_data}")
            async for session_message in write_stream_reader:
                # Handle messages from SseServerTransport
                if hasattr(session_message, 'message'):
                    message_data = session_message.message.model_dump_json(by_alias=True, exclude_none=True)
                    event_data = json.loads(message_data)
                    logger.debug(f"Received SessionMessage from SseServerTransport: {event_data}")
                else:
                    event_data = session_message
                    logger.debug(f"Received dict event from SseServerTransport: {event_data}")
                # Extract session_id from the endpoint event
                if not session_id and event_data.get("event") == "endpoint":
                    endpoint_url = event_data.get("data", "")
                    if "session_id=" in endpoint_url:
                        session_id = endpoint_url.split("session_id=")[1]
                        placeholder_id = f"placeholder_{id(write_stream)}"
                        if placeholder_id in write_streams:
                            write_streams[session_id] = write_streams.pop(placeholder_id)
                        sse_stream_writers[session_id] = sse_stream_writer
                        logger.debug(f"Updated placeholder {placeholder_id} to session_id {session_id}")
                # Forward the event to the client
                await sse_stream_writer.send({
                    "event": event_data.get("event", "message"),
                    "data": event_data.get("data", json.dumps(event_data))
                })
    sse_stream_writer, sse_stream_reader = anyio.create_memory_object_stream(0)
    write_stream = None  # Guard so cleanup below works even if connect_sse fails early
    try:
        async with transport.connect_sse(request.scope, request.receive, request._send) as streams:
            read_stream, write_stream = streams
            write_stream_reader = write_stream  # Both ends are memory-object streams
            # Store the write_stream with a placeholder ID until the real session_id is known
            placeholder_id = f"placeholder_{id(write_stream)}"
            write_streams[placeholder_id] = write_stream
            logger.debug(f"Stored write_stream with placeholder_id: {placeholder_id}")
            logger.debug("Running MCP server with streams")
            await mcp_server.run(read_stream, write_stream, mcp_server.create_initialization_options())
    except Exception as e:
        logger.error(f"Error in handle_sse: {str(e)}")
        # Clean up write_streams and sse_stream_writers on error
        if write_stream is not None:
            write_streams.pop(f"placeholder_{id(write_stream)}", None)
        if session_id:
            write_streams.pop(session_id, None)
            sse_stream_writers.pop(session_id, None)
        raise
    return EventSourceResponse(sse_stream_reader, data_sender_callable=sse_writer)
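# A client can open the stream with, e.g. (assuming the default port):
#   curl -N http://localhost:7860/airtable/mcp
# and should then POST its JSON-RPC messages to the URL carried by the
# first "endpoint" event it receives.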
# Message handling endpoint for POST requests
@app.post("/airtable/mcp")
async def handle_post_message(request: Request):
    logger.debug("Handling POST message request")
    body = await request.body()
    logger.debug(f"Received POST message body: {body}")
    try:
        message = json.loads(body.decode())
        session_id = request.query_params.get("session_id")
        # Use sse_stream_writers to send manual responses directly
        sse_writer = sse_stream_writers.get(session_id) if session_id else None
        write_stream = write_streams.get(session_id) if session_id else None
        if message.get("method") == "initialize" and sse_writer:
            logger.debug("Handling initialize request manually")
            response = {
                "jsonrpc": "2.0",
                "id": message.get("id"),
                "result": {
                    "protocolVersion": "2025-03-26",
                    "capabilities": {
                        "tools": {"listChanged": True},
                        "prompts": {"listChanged": False},
                        "resources": {"subscribe": False, "listChanged": False},
                        "logging": {},
                        "experimental": {}
                    },
                    "serverInfo": {
                        "name": "airtable-mcp",
                        "version": "1.0.0"
                    },
                    "instructions": "Airtable MCP server for listing and creating records."
                }
            }
            logger.debug(f"Manual initialize response: {response}")
            response_data = json.dumps(response)
            await sse_writer.send({
                "event": "message",
                "data": response_data
            })
            logger.debug(f"Sent initialize response directly via SSE for session {session_id}")
            return Response(status_code=202)
        if message.get("method") == "tools/list":
            logger.debug("Handling tools/list request manually")
            response = {
                "jsonrpc": "2.0",
                "id": message.get("id"),
                "result": {
                    "tools": [tool.model_dump(by_alias=True, exclude_none=True) for tool in tools],
                    "nextCursor": None
                }
            }
            logger.debug(f"Manual tools/list response: {response}")
            response_data = json.dumps(response)
            sent = False
            # First, try sending directly via sse_writer
            if sse_writer:
                try:
                    await sse_writer.send({
                        "event": "message",
                        "data": response_data
                    })
                    logger.debug(f"Sent tools/list response directly via SSE for session {session_id}")
                    sent = True
                except Exception as e:
                    logger.error(f"Error sending to session {session_id} via sse_writer: {str(e)}")
                    sse_stream_writers.pop(session_id, None)
            # If not found or failed, look for a placeholder ID and update it
            if not sent and write_stream:
                for sid, ws in list(write_streams.items()):
                    if sid.startswith("placeholder_"):
                        try:
                            write_streams[session_id] = ws
                            write_streams.pop(sid, None)
                            # Only retry the send when a writer exists;
                            # sse_writer may be None at this point.
                            if sse_writer:
                                sse_stream_writers[session_id] = sse_writer
                                await sse_writer.send({
                                    "event": "message",
                                    "data": response_data
                                })
                                sent = True
                            logger.debug(f"Updated placeholder {sid} to session_id {session_id}")
                            break
                        except Exception as e:
                            logger.error(f"Error sending to placeholder {sid}: {str(e)}")
                            write_streams.pop(sid, None)
                            sse_stream_writers.pop(session_id, None)
            if not sent:
                logger.warning("Failed to send tools/list response: no active write_streams or sse_writer found")
            return Response(status_code=202)
        # If neither sse_writer nor write_stream is available, log and handle gracefully
        if not sse_writer and not write_stream:
            logger.error(f"No sse_writer or write_stream found for session_id: {session_id}")
            return Response(status_code=202)
        await transport.handle_post_message(request.scope, request.receive, request._send)
        logger.debug("POST message handled successfully")
    except Exception as e:
        logger.error(f"Error handling POST message: {str(e)}")
        return Response(status_code=202)
    return Response(status_code=202)
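# The matching POST leg looks like this (the session_id value comes from
# the endpoint event on the SSE stream; <id> is a placeholder):
#   curl -X POST "http://localhost:7860/airtable/mcp?session_id=<id>" \
#        -H "Content-Type: application/json" \
#        -d '{"jsonrpc": "2.0", "id": 1, "method": "tools/list"}'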
# Health check endpoint
@app.get("/health")
async def health_check():
    return {"status": "healthy"}
# Endpoint to list tools (for debugging)
@app.get("/tools")
async def list_tools():
    return {"tools": [tool.model_dump() for tool in tools]}
if __name__ == "__main__":
    port = int(os.getenv("PORT", 7860))
    uvicorn.run(app, host="0.0.0.0", port=port)
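# Quick smoke test once the server is up (default port shown):
#   curl http://localhost:7860/health   -> {"status": "healthy"}
#   curl http://localhost:7860/tools    -> JSON description of both tools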