import os import json import logging import anyio import asyncio from fastapi import FastAPI, Request from fastapi.responses import Response from sse_starlette import EventSourceResponse from mcp.server.lowlevel import Server # Set up logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) app = FastAPI() # Define the MCP server server = Server(name="airtable-mcp") # Store write streams for each session ID write_streams = {} # Configure environment variables (for logging purposes) token = os.getenv("AIRTABLE_API_TOKEN") base_id = os.getenv("AIRTABLE_BASE_ID") logger.info(f"Using Airtable token: {token}") logger.info(f"Using Airtable base ID: {base_id}") @app.get("/airtable/mcp") async def handle_sse(request: Request): try: session_id = None async def sse_writer(): nonlocal session_id async with sse_stream_writer, read_stream_reader: endpoint_data = f"/airtable/mcp?session_id={{session_id}}" await sse_stream_writer.send({"event": "endpoint", "data": endpoint_data}) async for message in read_stream_reader: message_data = json.loads(message) if isinstance(message, str) else message if message_data.get("event") == "endpoint": endpoint_url = message_data.get("data", "") if "session_id=" in endpoint_url: session_id = endpoint_url.split("session_id=")[1] placeholder_id = f"placeholder_{id(write_stream)}" if placeholder_id in write_streams: write_streams[session_id] = write_streams.pop(placeholder_id) logger.info(f"Updated placeholder {placeholder_id} to session_id {session_id}") await sse_stream_writer.send(message_data) # Keep-alive loop to maintain the SSE connection while True: await sse_stream_writer.send({"event": "ping", "data": "keep-alive"}) await asyncio.sleep(15) # Send keep-alive every 15 seconds # Create separate send and receive streams for reading and writing read_stream_writer, read_stream_reader = anyio.create_memory_object_stream(0) write_stream_writer, write_stream_reader = anyio.create_memory_object_stream(0) placeholder_id = f"placeholder_{id(write_stream_writer)}" write_streams[placeholder_id] = write_stream_writer logger.info("Starting MCP server with streams") await server.run(read_stream_reader, write_stream_writer, server.create_initialization_options()) logger.info("MCP server running") return EventSourceResponse(read_stream_writer, data_sender_callable=sse_writer) except Exception as e: logger.error(f"Error in handle_sse: {str(e)}") raise @app.post("/airtable/mcp") async def handle_post(request: Request): try: body = await request.body() message = json.loads(body.decode()) session_id = request.query_params.get("session_id") logger.info(f"Received POST with session_id: {session_id}, message: {message}") # Try to find the write_stream write_stream = write_streams.get(session_id) if not write_stream: for sid, ws in list(write_streams.items()): if sid.startswith("placeholder_"): write_streams[session_id] = ws write_streams.pop(sid) write_stream = write_streams[session_id] logger.info(f"Associated placeholder {sid} with session_id {session_id}") break if message.get("method") == "initialize" and write_stream: response = { "jsonrpc": "2.0", "id": message.get("id"), "result": { "protocolVersion": "2025-03-26", "capabilities": { "tools": {"listChanged": True}, "prompts": {"listChanged": False}, "resources": {"subscribe": False, "listChanged": False}, "logging": {}, "experimental": {} }, "serverInfo": { "name": "airtable-mcp", "version": "1.0.0" }, "instructions": "Airtable MCP server for listing and creating records." } } response_data = json.dumps(response) await write_stream.send({"event": "message", "data": response_data}) logger.info(f"Sent initialize response for session {session_id} via write_stream") return Response(status_code=202) if message.get("method") == "tools/list" and write_stream: response = { "jsonrpc": "2.0", "id": message.get("id"), "result": { "tools": [ {"name": "list_airtable_records", "description": "Lists all records in the specified Airtable table", "inputSchema": {}}, {"name": "create_airtable_record", "description": "Creates a new record in the specified Airtable table", "inputSchema": {"record_data": {"type": "object"}}} ], "nextCursor": None } } response_data = json.dumps(response) try: await write_stream.send({"event": "message", "data": response_data}) logger.info(f"Successfully sent tools/list response for session {session_id} via write_stream") except Exception as e: logger.error(f"Failed to send response for session {session_id} via write_stream: {str(e)}") return Response(status_code=202) return Response(status_code=202) if not write_stream: logger.error(f"No write_stream found for session_id: {session_id}") return Response(status_code=202) return Response(status_code=202) except Exception as e: logger.error(f"Error handling POST message: {str(e)}") return Response(status_code=202) if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=7860)