# Airtable MCP server: a FastAPI application that exposes Airtable
# list/create tools over an MCP SSE transport mounted at /airtable/mcp.
import os
import json
import requests
from fastapi import FastAPI, Request
from fastapi.responses import Response
from fastapi.middleware.cors import CORSMiddleware
from mcp.server.lowlevel import Server, NotificationOptions
from mcp.server.sse import SseServerTransport
from mcp import types as mcp_types
import uvicorn
from sse_starlette import EventSourceResponse
import anyio
import asyncio
import logging
from typing import Dict

# Set up logging
# basicConfig configures the root logger, so DEBUG output from every module
# (including request/response payloads logged below) is emitted.
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

# ASGI application object that the route decorators in this file attach to.
app = FastAPI()

# Add CORS middleware to allow Deep Agent to connect
# NOTE(review): every origin is allowed while credentials are enabled; restrict
# allow_origins before exposing this service publicly.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Adjust for production
    allow_credentials=True,
    allow_methods=["GET", "POST", "OPTIONS"],
    allow_headers=["*"],
)

# Load environment variables
# os.getenv returns None when a variable is unset; in that case the URL built
# below contains the literal text "None" where the base ID should be.
AIRTABLE_API_TOKEN = os.getenv("AIRTABLE_API_TOKEN")
AIRTABLE_BASE_ID = os.getenv("AIRTABLE_BASE_ID")
# The table identifier is hard-coded rather than read from the environment.
TABLE_ID = "tblQECi5f7m4y2NEV"
# Base URL for every Airtable call made by airtable_request.
AIRTABLE_API_URL = f"https://api.airtable.com/v0/{AIRTABLE_BASE_ID}/{TABLE_ID}"

# Helper function for Airtable API requests
def airtable_request(method, endpoint="", data=None, timeout=30):
    """Send an authenticated request to the configured Airtable table.

    Args:
        method: HTTP verb forwarded to ``requests.request`` (e.g. "GET").
        endpoint: Optional path segment appended to ``AIRTABLE_API_URL``;
            when empty, the table URL itself is requested.
        data: Optional payload sent as the JSON request body.
        timeout: Seconds to wait for Airtable before aborting. Without a
            timeout ``requests`` waits indefinitely, which would stall the
            caller forever on a hung connection.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.HTTPError: If the response status is 4xx/5xx.
        requests.Timeout: If Airtable does not answer within ``timeout``.
    """
    headers = {
        "Authorization": f"Bearer {AIRTABLE_API_TOKEN}",
        "Content-Type": "application/json",
    }
    url = f"{AIRTABLE_API_URL}/{endpoint}" if endpoint else AIRTABLE_API_URL
    response = requests.request(
        method, url, headers=headers, json=data, timeout=timeout
    )
    response.raise_for_status()
    return response.json()

# Tool to list records
async def list_records_tool(request: mcp_types.CallToolRequest):
    """Fetch the table's records from Airtable and wrap them in a result dict.

    Args:
        request: The incoming tool-call request; it is only logged, no
            arguments are read from it.

    Returns:
        ``{"success": True, "result": <JSON string>}`` on success, or
        ``{"success": False, "error": <message>}`` if anything raises.
        Exceptions are never propagated to the caller.
    """
    logger.debug(f"Received list_records_tool request: {request}")
    try:
        # airtable_request performs blocking HTTP I/O; run it in a worker
        # thread so the event loop keeps serving other connections meanwhile.
        records = await asyncio.to_thread(airtable_request, "GET")
        response = {
            "success": True,
            "result": json.dumps(records),
        }
        logger.debug(f"list_records_tool response: {response}")
        return response
    except Exception as e:
        # Deliberate best-effort boundary: report the failure in-band.
        response = {
            "success": False,
            "error": str(e),
        }
        logger.error(f"list_records_tool error: {response}")
        return response

# Tool to create a record
async def create_record_tool(request: mcp_types.CallToolRequest):
    """Create one Airtable record from the ``record_data`` tool argument.

    Args:
        request: The incoming tool-call request. Arguments are read from
            ``request.params.arguments``; an ``input`` attribute is accepted
            as a fallback for callers that pass that older shape.

    Returns:
        ``{"success": True, "result": <JSON string>}`` on success, or
        ``{"success": False, "error": <message>}`` if anything raises.
        Exceptions are never propagated to the caller.
    """
    logger.debug(f"Received create_record_tool request: {request}")
    try:
        # MCP tool calls carry their arguments under params.arguments; the
        # previous `request.input` lookup failed on such requests.
        params = getattr(request, "params", None)
        arguments = getattr(params, "arguments", None)
        if arguments is None:
            arguments = getattr(request, "input", None) or {}
        record_data = arguments.get("record_data", {})
        data = {"records": [{"fields": record_data}]}
        # airtable_request performs blocking HTTP I/O; run it in a worker
        # thread so the event loop keeps serving other connections meanwhile.
        response_data = await asyncio.to_thread(
            airtable_request, "POST", data=data
        )
        response = {
            "success": True,
            "result": json.dumps(response_data),
        }
        logger.debug(f"create_record_tool response: {response}")
        return response
    except Exception as e:
        # Deliberate best-effort boundary: report the failure in-band.
        response = {
            "success": False,
            "error": str(e),
        }
        logger.error(f"create_record_tool error: {response}")
        return response

# Define tools separately (for Deep Agent to discover them)
# Tool definitions advertised to clients (tools/list and the /tools endpoint).
# Each inputSchema is a JSON Schema object describing the tool's arguments;
# the schema root must itself be of type "object".
tools = [
    mcp_types.Tool(
        name="list_airtable_records",
        description="Lists all records in the specified Airtable table",
        # This tool takes no arguments.
        inputSchema={"type": "object", "properties": {}},
    ),
    mcp_types.Tool(
        name="create_airtable_record",
        description="Creates a new record in the specified Airtable table",
        # record_data is left optional: the handler defaults it to {}.
        inputSchema={
            "type": "object",
            "properties": {"record_data": {"type": "object"}},
        },
    ),
]

# Define tool handlers
# Maps each advertised tool name to the coroutine that implements it.
tool_handlers = {
    "list_airtable_records": list_records_tool,
    "create_airtable_record": create_record_tool
}

# Create MCP server
mcp_server = Server(name="airtable-mcp")
# NOTE(review): these are plain attribute assignments; nothing in this file
# shows the Server reading them — confirm tool calls actually reach the handlers.
mcp_server.tool_handlers = tool_handlers  # Set as attribute
mcp_server.tools = tools  # Set tools as attribute for Deep Agent to discover

# Store write streams for each session ID (for SseServerTransport messages)
# Entries are first stored under a "placeholder_<id>" key by handle_sse and
# re-keyed to the real session ID once it is known.
write_streams: Dict[str, anyio.streams.memory.MemoryObjectSendStream] = {}

# Store SSE stream writers for each session ID (for manual messages)
# handle_post_message looks writers up here to push hand-built responses.
sse_stream_writers: Dict[str, anyio.streams.memory.MemoryObjectSendStream] = {}

# Initialize SseServerTransport
# The argument is the path given to the transport; it matches the POST route below.
transport = SseServerTransport("/airtable/mcp")

# SSE endpoint for GET requests
@app.get("/airtable/mcp")
async def handle_sse(request: Request):
    """Open an SSE session on the shared transport and run the MCP server.

    The transport's streams are registered in ``write_streams`` under a
    placeholder key while the MCP server runs. ``sse_writer`` re-keys that
    entry to the real session ID when it sees an ``endpoint`` event.

    Args:
        request: The incoming GET request whose ASGI scope/receive/send are
            handed to the SSE transport.

    Returns:
        An ``EventSourceResponse`` fed by ``sse_writer``.

    Raises:
        Exception: Any error raised while connecting or running the server is
            logged, the session's registry entries are removed, and the
            error is re-raised.

    NOTE(review): the response object is only built after ``mcp_server.run``
    returns, i.e. after the transport context has exited — confirm that
    ``sse_writer`` can still use the streams at that point.
    """
    logger.debug("Handling SSE connection request")
    session_id = None  # We'll extract this later
    # Stays None until the transport has handed over its streams, so the
    # error path knows whether anything was registered. The old handler
    # recomputed the key from `write_stream`, which is unbound when
    # connect_sse itself fails, and so masked the real error.
    placeholder_id = None

    async def sse_writer():
        """Forward transport messages to the client-facing SSE stream."""
        nonlocal session_id
        logger.debug("Starting SSE writer")
        async with sse_stream_writer, write_stream_reader:
            # Send the initial endpoint event manually to capture the session_id
            # NOTE(review): this is a plain string, not an f-string, so the
            # client receives the literal text "{session_id}" — confirm intent.
            endpoint_data = "/airtable/mcp?session_id={session_id}"
            await sse_stream_writer.send(
                {"event": "endpoint", "data": endpoint_data}
            )
            logger.debug(f"Sent endpoint event: {endpoint_data}")
            async for session_message in write_stream_reader:
                # Handle messages from SseServerTransport
                if hasattr(session_message, 'message'):
                    message_data = session_message.message.model_dump_json(by_alias=True, exclude_none=True)
                    event_data = json.loads(message_data)
                    logger.debug(f"Received SessionMessage from SseServerTransport: {event_data}")
                else:
                    event_data = session_message
                    logger.debug(f"Received dict event from SseServerTransport: {event_data}")
                # Extract session_id from the endpoint event
                if not session_id and event_data.get("event") == "endpoint":
                    endpoint_url = event_data.get("data", "")
                    if "session_id=" in endpoint_url:
                        session_id = endpoint_url.split("session_id=")[1]
                        # Local to sse_writer: the key the outer function
                        # registered this connection under.
                        placeholder_id = f"placeholder_{id(write_stream)}"
                        if placeholder_id in write_streams:
                            write_streams[session_id] = write_streams.pop(placeholder_id)
                            sse_stream_writers[session_id] = sse_stream_writer
                            logger.debug(f"Updated placeholder {placeholder_id} to session_id {session_id}")
                # Forward the event to the client
                await sse_stream_writer.send({
                    "event": event_data.get("event", "message"),
                    "data": event_data.get("data", json.dumps(event_data))
                })

    # Zero-capacity stream: each send waits until the receiving side reads it.
    sse_stream_writer, sse_stream_reader = anyio.create_memory_object_stream(0)
    try:
        async with transport.connect_sse(request.scope, request.receive, request._send) as streams:
            read_stream, write_stream = streams
            write_stream_reader = write_stream  # Since streams are MemoryObject streams
            # Store the write_stream with a placeholder ID
            placeholder_id = f"placeholder_{id(write_stream)}"
            write_streams[placeholder_id] = write_stream
            logger.debug(f"Stored write_stream with placeholder_id: {placeholder_id}")
            logger.debug("Running MCP server with streams")
            await mcp_server.run(read_stream, write_stream, mcp_server.create_initialization_options())
    except Exception as e:
        logger.error(f"Error in handle_sse: {str(e)}")
        # Clean up write_streams and sse_stream_writers on error
        if placeholder_id is not None:
            write_streams.pop(placeholder_id, None)
        if session_id:
            write_streams.pop(session_id, None)
            sse_stream_writers.pop(session_id, None)
        raise
    return EventSourceResponse(sse_stream_reader, data_sender_callable=sse_writer)

# Message handling endpoint for POST requests
@app.post("/airtable/mcp")
async def handle_post_message(request: Request):
    """Handle a JSON-RPC message POSTed by an MCP client.

    ``initialize`` (when an SSE writer exists for the session) and
    ``tools/list`` are answered with hand-built responses pushed over the
    session's SSE writer. Every other message is delegated to the SSE
    transport, provided the session has a registered stream.

    Args:
        request: The incoming POST request; the session is identified by the
            ``session_id`` query parameter.

    Returns:
        Always a 202 response. Failures (bad JSON, unknown session, send
        errors) are logged rather than surfaced to the client.
    """
    logger.debug("Handling POST message request")
    body = await request.body()
    logger.debug(f"Received POST message body: {body}")
    try:
        message = json.loads(body.decode())
        session_id = request.query_params.get("session_id")
        # Use sse_stream_writers to send manual responses directly
        sse_writer = sse_stream_writers.get(session_id) if session_id else None
        write_stream = write_streams.get(session_id) if session_id else None
        if message.get("method") == "initialize" and sse_writer:
            logger.debug("Handling initialize request manually")
            response = {
                "jsonrpc": "2.0",
                "id": message.get("id"),
                "result": {
                    "protocolVersion": "2025-03-26",
                    "capabilities": {
                        "tools": {
                            "listChanged": True
                        },
                        "prompts": {
                            "listChanged": False
                        },
                        "resources": {
                            "subscribe": False,
                            "listChanged": False
                        },
                        "logging": {},
                        "experimental": {}
                    },
                    "serverInfo": {
                        "name": "airtable-mcp",
                        "version": "1.0.0"
                    },
                    "instructions": "Airtable MCP server for listing and creating records."
                }
            }
            logger.debug(f"Manual initialize response: {response}")
            response_data = json.dumps(response)
            await sse_writer.send({
                "event": "message",
                "data": response_data
            })
            logger.debug(f"Sent initialize response directly via SSE for session {session_id}")
            return Response(status_code=202)
        if message.get("method") == "tools/list":
            logger.debug("Handling tools/list request manually")
            response = {
                "jsonrpc": "2.0",
                "id": message.get("id"),
                "result": {
                    "tools": [tool.model_dump(by_alias=True, exclude_none=True) for tool in tools],
                    "nextCursor": None
                }
            }
            logger.debug(f"Manual tools/list response: {response}")
            response_data = json.dumps(response)
            sent = False
            # First, try sending directly via sse_writer
            if sse_writer:
                try:
                    await sse_writer.send({
                        "event": "message",
                        "data": response_data
                    })
                    logger.debug(f"Sent tools/list response directly via SSE for session {session_id}")
                    sent = True
                except Exception as e:
                    logger.error(f"Error sending to session {session_id} via sse_writer: {str(e)}")
                    sse_stream_writers.pop(session_id, None)
            # If not found or failed, look for a placeholder ID and update it.
            # The fallback needs a writer to send through: without one, every
            # attempt failed on `None.send` and the error handler dropped each
            # placeholder registration in turn, so it is skipped entirely.
            if not sent and write_stream and sse_writer:
                for sid, ws in list(write_streams.items()):
                    if sid.startswith("placeholder_"):
                        try:
                            write_streams[session_id] = ws
                            sse_stream_writers[session_id] = sse_writer
                            write_streams.pop(sid, None)
                            await sse_writer.send({
                                "event": "message",
                                "data": response_data
                            })
                            logger.debug(f"Updated placeholder {sid} to session_id {session_id} and sent tools/list response")
                            sent = True
                            break
                        except Exception as e:
                            logger.error(f"Error sending to placeholder {sid}: {str(e)}")
                            write_streams.pop(sid, None)
                            sse_stream_writers.pop(session_id, None)
            if not sent:
                logger.warning("Failed to send tools/list response: no active write_streams or sse_writer found")
            return Response(status_code=202)
        # If neither sse_writer nor write_stream is available, log and handle gracefully
        if not sse_writer and not write_stream:
            logger.error(f"No sse_writer or write_stream found for session_id: {session_id}")
            return Response(status_code=202)
        # NOTE(review): the request body was already read above; confirm the
        # transport can still obtain the payload from request.receive, and
        # that it does not also send its own response on request._send.
        await transport.handle_post_message(request.scope, request.receive, request._send)
        logger.debug("POST message handled successfully")
    except Exception as e:
        # Best-effort boundary: log and still acknowledge the request.
        logger.error(f"Error handling POST message: {str(e)}")
        return Response(status_code=202)
    return Response(status_code=202)

# Health check endpoint
@app.get("/health")
async def health_check():
    """Liveness probe: unconditionally report the service as healthy."""
    status_payload = {"status": "healthy"}
    return status_payload

# Endpoint to list tools (for debugging)
@app.get("/tools")
async def list_tools():
    """Return every registered tool definition as a plain dict (debug aid)."""
    dumped_tools = []
    for tool_definition in tools:
        dumped_tools.append(tool_definition.model_dump())
    return {"tools": dumped_tools}

if __name__ == "__main__":
    # Serve on all interfaces; the port comes from PORT, defaulting to 7860.
    uvicorn.run(app, host="0.0.0.0", port=int(os.environ.get("PORT", 7860)))