Spaces:
Runtime error
Runtime error
| import os | |
| import json | |
| import httpx | |
| from fastapi import FastAPI, Request, HTTPException | |
| from mcp.server import Server | |
| from mcp.server.sse import sse_server | |
| from mcp.types import Tool, TextContent | |
| # Create MCP Server | |
| server = Server("sap-mcp-proxy") | |
| # SAP function | |
| async def fetch_supplier_invoices(): | |
| api_key = os.getenv("API_KEY") | |
| if not api_key: | |
| return {"error": "API_KEY not configured"} | |
| url = "https://sandbox.api.sap.com/s4hanacloud/sap/opu/odata/sap/API_SUPPLIERINVOICE_PROCESS_SRV/A_BR_SupplierInvoiceNFDocument?$top=50&$inlinecount=allpages" | |
| headers = {"APIKey": api_key, "Accept": "application/json"} | |
| async with httpx.AsyncClient() as client: | |
| resp = await client.get(url, headers=headers) | |
| return resp.json() if resp.status_code == 200 else {"error": resp.status_code} | |
| # Register tools | |
| async def list_tools() -> list[Tool]: | |
| return [Tool(name="get_supplier_invoices", description="Fetch supplier invoices from SAP", inputSchema={"type": "object", "properties": {}})] | |
| async def call_tool(name: str, arguments: dict) -> list[TextContent]: | |
| if name == "get_supplier_invoices": | |
| result = await fetch_supplier_invoices() | |
| return [TextContent(type="text", text=json.dumps(result))] | |
| raise ValueError(f"Unknown tool: {name}") | |
| # Create app using MCP's built-in SSE server helper | |
| app = sse_server(server) | |
| # Add auth middleware | |
| API_GATEWAY_KEY = os.getenv("API_GATEWAY_KEY") | |
| async def check_api_key(request: Request, call_next): | |
| if request.url.path in ["/", "/health"]: | |
| return await call_next(request) | |
| token = request.headers.get("X-API-Key") | |
| if not token or token != API_GATEWAY_KEY: | |
| raise HTTPException(status_code=401, detail="Unauthorized") | |
| return await call_next(request) | |