Spaces:
Build error
Build error
# app.py | |
import os | |
import json | |
import requests | |
import gradio as gr | |
from dotenv import load_dotenv | |
from fastapi import FastAPI | |
from mcp.server import Server | |
from mcp.server.http import make_http_app | |
from mcp.types import Tool, TextContent | |
# -- 1) CONFIG ----------------------------------------------------------------
# load_dotenv() runs first so the lookups below also see values it provides.
load_dotenv()
SAP_KEY = os.environ.get("SAP_API_KEY")
# Trailing slashes are stripped so "{SAP_BASE}/{path}" never produces "//".
SAP_BASE = os.environ.get("SAP_API_BASE_URL", "").rstrip("/")
# Fail fast at import time when either setting is missing or empty.
if not (SAP_KEY and SAP_BASE):
    raise RuntimeError("Set SAP_API_KEY and SAP_API_BASE_URL in your env vars")

# -- 2) MCP SERVER SETUP ------------------------------------------------------
# Server instance that the tool handlers in this module belong to.
mcp = Server("sap-mcp")
@mcp.list_tools()
async def list_tools():
    """Return the six SAP tools this server offers.

    The ``@mcp.list_tools()`` decorator registers this coroutine as the
    server's tool-listing handler; without it the function is never called.

    Returns:
        A list of ``Tool`` definitions, each with a name, a description and a
        JSON Schema (``inputSchema``) describing its arguments.
    """

    def paging_properties():
        # Fresh dict per call so no two Tool schemas share mutable state.
        return {
            "top": {"type": "integer"},
            "skip": {"type": "integer"},
            "orderby": {"type": "string"},
        }

    def entity_list_schema():
        # Schema shared by the fixed-entity list tools (PR / PO).
        return {
            "type": "object",
            "properties": {
                "filter": {"type": "string"},
                "select": {"type": "string"},
                **paging_properties(),
            },
        }

    return [
        # Generic GET against any path below the base URL.
        Tool(
            name="sap_api_get",
            description="GET any SAP endpoint under your base URL",
            inputSchema={
                "type": "object",
                "properties": {
                    "endpoint": {"type": "string"},
                    "params": {
                        "type": "object",
                        "additionalProperties": {"type": "string"},
                    },
                },
                "required": ["endpoint"],
            },
        ),
        # OData query-style tool: the caller names the service path.
        Tool(
            name="sap_odata_query",
            description="Generic OData query ($filter, $select, $top, …)",
            inputSchema={
                "type": "object",
                "properties": {
                    "service": {"type": "string"},
                    "select": {"type": "string"},
                    "filter": {"type": "string"},
                    **paging_properties(),
                },
                "required": ["service"],
            },
        ),
        # Business partner demo tool.
        Tool(
            name="sap_business_partner",
            description="Query Business Partner data",
            inputSchema={
                "type": "object",
                "properties": {
                    "partner_id": {"type": "string"},
                    "company_name": {"type": "string"},
                    "limit": {"type": "integer"},
                },
            },
        ),
        # Metadata tool; "service" is optional.
        Tool(
            name="sap_metadata",
            description="Fetch $metadata for a given service",
            inputSchema={
                "type": "object",
                "properties": {
                    "service": {"type": "string"},
                },
            },
        ),
        # Purchase Requisition list/filter tool.
        Tool(
            name="purchase_requisition",
            description="List or filter Purchase Requisitions (CE_PURCHASEREQUISITION_0001)",
            inputSchema=entity_list_schema(),
        ),
        # Purchase Order list/filter tool.
        Tool(
            name="purchase_order",
            description="List or filter Purchase Orders (CE_PURCHASEORDER_0001)",
            inputSchema=entity_list_schema(),
        ),
    ]
# requests waits indefinitely by default; cap how long one SAP call may hang.
_SAP_TIMEOUT_SECONDS = 30

# Tools that are plain OData list queries against one fixed entity set.
_ENTITY_SET_TOOLS = {
    "purchase_requisition": "PurchaseRequisition",
    "purchase_order": "PurchaseOrder",
}


def _odata_params(args):
    """Map tool arguments to OData query options ($select, $filter, ...)."""
    params = {
        f"${key}": str(args[key])
        for key in ("select", "filter", "top", "skip", "orderby")
        if args.get(key) is not None
    }
    params["$format"] = "json"
    return params


def _sap_get(path, params=None):
    """GET ``path`` below SAP_BASE with the API key header; raise on HTTP errors."""
    # A leading "/" in the caller's path would otherwise yield "base//path".
    url = f"{SAP_BASE}/{str(path).lstrip('/')}"
    resp = requests.get(
        url,
        headers={"APIKey": SAP_KEY},
        params=params,
        timeout=_SAP_TIMEOUT_SECONDS,
    )
    resp.raise_for_status()
    return resp


def _unwrap_odata(data):
    """Strip the OData v2 ``{"d": ...}`` envelope when present.

    ``{"d": {"results": [...]}}`` becomes the results list and ``{"d": {...}}``
    becomes the inner object. Anything without a top-level "d" key is returned
    unchanged.
    """
    if not (isinstance(data, dict) and "d" in data):
        return data
    inner = data["d"]
    if isinstance(inner, dict):
        return inner.get("results", inner)
    return inner


def _json_content(data):
    """Wrap ``data`` as a single pretty-printed JSON text content item."""
    return [TextContent(type="text", text=json.dumps(data, indent=2))]


@mcp.call_tool()
async def call_tool(name: str, arguments: dict):
    """Route one of the six tools to an HTTP GET against SAP_BASE.

    The ``@mcp.call_tool()`` decorator registers this coroutine as the
    server's tool-call handler.

    Args:
        name: Tool name as advertised by the tool list.
        arguments: Tool arguments matching that tool's input schema.

    Returns:
        A one-element list of ``TextContent`` holding the response body
        (pretty-printed JSON, or raw text for ``sap_metadata``).

    Raises:
        KeyError: A required argument ("endpoint" / "service") is missing.
        requests.HTTPError: SAP answered with an error status.
        ValueError: ``name`` is not a known tool.
    """
    # NOTE(review): requests is synchronous, so each call blocks the event
    # loop until SAP answers or the timeout expires.
    arguments = arguments or {}

    # 1) Generic GET
    if name == "sap_api_get":
        resp = _sap_get(arguments["endpoint"], arguments.get("params") or None)
        return _json_content(resp.json())

    # 2) Generic OData query against a caller-supplied service path
    if name == "sap_odata_query":
        resp = _sap_get(arguments["service"], _odata_params(arguments))
        return _json_content(_unwrap_odata(resp.json()))

    # 3) Business Partner (mock/demo): no SAP request is made
    if name == "sap_business_partner":
        return [TextContent(type="text", text="(business partner demo)…")]

    # 4) Metadata: service-level document, or the root one when no service given
    if name == "sap_metadata":
        service = arguments.get("service")
        path = f"{service}/$metadata" if service else "$metadata"
        return [TextContent(type="text", text=_sap_get(path).text)]

    # 5) Purchase Requisition / Purchase Order list queries
    if name in _ENTITY_SET_TOOLS:
        resp = _sap_get(_ENTITY_SET_TOOLS[name], _odata_params(arguments))
        return _json_content(_unwrap_odata(resp.json()))

    raise ValueError(f"Unknown tool: {name}")
# HTTP app for the MCP server, built with the "/mcp" prefix.
# NOTE(review): confirm that mcp.server.http.make_http_app exists in the
# installed MCP SDK version.
mcp_app = make_http_app(mcp, prefix="/mcp")

# -- 3) GRADIO UI -------------------------------------------------------------
# Names of all six tools, in the order they appear in the UI dropdown.
TOOLS = [
    "sap_api_get", "sap_odata_query", "sap_business_partner",
    "sap_metadata", "purchase_requisition", "purchase_order",
]
def _as_int(value):
    """Turn whole-number floats (what gr.Number yields) into ints."""
    if isinstance(value, float) and value.is_integer():
        return int(value)
    return value


def _compact(fields):
    """Drop unset (None / "") entries and normalise numeric values."""
    return {
        key: _as_int(value)
        for key, value in fields.items()
        if value is not None and value != ""
    }


def execute_tool(tool, endpoint, params_json,
                 service, select, filter_, top, skip, orderby,
                 partner_id, company_name, limit):
    """Run the selected tool through the local MCP HTTP endpoint.

    Only the form fields relevant to ``tool`` are forwarded; empty fields are
    omitted. Numeric fields are sent as integers: the tool schemas declare
    them as integers, and a float would be rendered as e.g. ``$top=5.0``.

    Args:
        tool: Name of the tool to call.
        endpoint, params_json: Inputs for ``sap_api_get`` (params as JSON text).
        service: Service path for ``sap_odata_query`` / ``sap_metadata``.
        select, filter_, top, skip, orderby: OData query options.
        partner_id, company_name, limit: Inputs for ``sap_business_partner``.

    Returns:
        The text items of the tool result joined by blank lines, an error
        message string, or "(no result)" when nothing textual came back.

    Raises:
        requests.HTTPError: The MCP endpoint answered with an error status.
    """
    odata_fields = {
        "select": select, "filter": filter_,
        "top": top, "skip": skip, "orderby": orderby,
    }

    # Build the arguments the way the selected tool's schema expects them.
    args = {}
    if tool == "sap_api_get":
        args["endpoint"] = endpoint
        try:
            args["params"] = json.loads(params_json or "{}")
        except ValueError:  # json.JSONDecodeError is a ValueError subclass
            return "❌ Invalid JSON in params"
    elif tool == "sap_odata_query":
        args = _compact({"service": service, **odata_fields})
    elif tool == "sap_business_partner":
        args = _compact({
            "partner_id": partner_id,
            "company_name": company_name,
            "limit": limit,
        })
    elif tool == "sap_metadata":
        if service:
            args["service"] = service
    elif tool in ("purchase_requisition", "purchase_order"):
        args = _compact(odata_fields)

    payload = {
        "jsonrpc": "2.0",
        "method": "call_tool",
        "params": {"name": tool, "arguments": args},
        "id": 1,
    }
    # Use the same port the app is launched on ($PORT, falling back to 7860).
    port = os.environ.get("PORT", 7860)
    r = requests.post(f"http://localhost:{port}/mcp", json=payload, timeout=60)
    r.raise_for_status()
    body = r.json()

    # Surface JSON-RPC errors instead of reporting an empty result.
    if isinstance(body, dict) and body.get("error"):
        error = body["error"]
        detail = error.get("message", error) if isinstance(error, dict) else error
        return f"MCP error: {detail}"

    result = body.get("result", []) if isinstance(body, dict) else []
    # Accept both a bare list of content items and a {"content": [...]} object.
    if isinstance(result, dict):
        result = result.get("content", [])
    texts = [
        item["text"]
        for item in result
        if isinstance(item, dict) and item.get("type") == "text" and "text" in item
    ]
    return "\n\n".join(texts) or "(no result)"
# The 11 input widgets, in the order they are listed as outputs of tool.change.
_FIELD_ORDER = (
    "endpoint", "params_json",
    "service", "select", "filter_", "top", "skip", "orderby",
    "partner_id", "company_name", "limit",
)

_ODATA_FIELDS = frozenset({"select", "filter_", "top", "skip", "orderby"})

# Which input widgets each tool needs; everything else is hidden.
_VISIBLE_FIELDS = {
    "sap_api_get": frozenset({"endpoint", "params_json"}),
    "sap_odata_query": _ODATA_FIELDS | {"service"},
    "sap_business_partner": frozenset({"partner_id", "company_name", "limit"}),
    "sap_metadata": frozenset({"service"}),
    "purchase_requisition": _ODATA_FIELDS,
    "purchase_order": _ODATA_FIELDS,
}


def toggle_visibility(tool):
    """Show only the input widgets that the selected tool uses.

    Args:
        tool: Selected tool name; an unknown name or None hides every widget.

    Returns:
        A list of 11 ``gr.update(visible=...)`` objects, one per input widget
        in ``_FIELD_ORDER``. Plain booleans would be taken by Gradio as new
        component *values*, not as visibility changes.
    """
    shown = _VISIBLE_FIELDS.get(tool, frozenset())
    return [gr.update(visible=(field in shown)) for field in _FIELD_ORDER]
with gr.Blocks() as demo:
    gr.Markdown("## π SAP MCP + CE_PR & CE_PO Demo")
    tool = gr.Dropdown(TOOLS, label="Tool")

    # One input widget per possible tool argument, created in display order.
    endpoint = gr.Textbox(label="Endpoint")
    params_json = gr.Textbox(label="Params (JSON)")
    service = gr.Textbox(label="Service")
    select = gr.Textbox(label="$select")
    filter_ = gr.Textbox(label="$filter")
    top = gr.Number(label="$top")
    skip = gr.Number(label="$skip")
    orderby = gr.Textbox(label="$orderby")
    partner_id = gr.Textbox(label="Business Partner ID")
    company_name = gr.Textbox(label="Company Name")
    limit = gr.Number(label="Limit")

    run = gr.Button("π Execute")
    output = gr.Textbox(label="Result", lines=12)

    # This order must match both toggle_visibility's returned list and
    # execute_tool's positional parameters after `tool`.
    arg_widgets = [
        endpoint, params_json,
        service, select, filter_, top, skip, orderby,
        partner_id, company_name, limit,
    ]

    # Picking a tool feeds its name to toggle_visibility, whose return values
    # go to the 11 argument widgets.
    tool.change(fn=toggle_visibility, inputs=tool, outputs=arg_widgets)
    # The button runs the tool with the current form contents.
    run.click(fn=execute_tool, inputs=[tool, *arg_widgets], outputs=output)
# Mount the MCP app at /mcp on the app object exposed by the Gradio demo.
# NOTE(review): this reads demo.app before launch() has been called; confirm
# the attribute exists at this point in the installed Gradio version.
demo.app.mount("/mcp", mcp_app)

if __name__ == "__main__":
    # Bind to all interfaces; the port comes from $PORT, falling back to 7860.
    listen_port = int(os.environ.get("PORT", 7860))
    demo.launch(server_name="0.0.0.0", server_port=listen_port)