Spaces:
Running
Running
# -*- coding: utf-8 -*-
"""
Shared utilities to validate and sanitize MCP tool input_schema in request packets.
Ensures JSON Schema correctness, removes empty values, and enforces non-empty
`type` and `description` for each property. Special handling for `headers`.
"""
import logging
from typing import Any, Dict, List
def _is_empty_value(value: Any) -> bool: | |
if value is None: | |
return True | |
if isinstance(value, str) and value.strip() == "": | |
return True | |
if isinstance(value, (list, dict)) and len(value) == 0: | |
return True | |
return False | |
def _deep_clean(value: Any) -> Any:
    """Recursively strip strings and drop empty entries from nested data.

    - Strings are returned with surrounding whitespace removed.
    - Dicts and lists are rebuilt as new containers; any entry whose cleaned
      value is empty according to ``_is_empty_value`` is omitted.
    - Every other value is returned unchanged.

    The input containers are never modified in place.
    """
    if isinstance(value, str):
        return value.strip()

    if isinstance(value, dict):
        kept: Dict[str, Any] = {}
        for key, raw in value.items():
            cleaned = _deep_clean(raw)
            # Emptiness is judged after cleaning, so a dict that only held
            # blank strings collapses to {} and is dropped as well.
            if not _is_empty_value(cleaned):
                kept[key] = cleaned
        return kept

    if isinstance(value, list):
        cleaned_items = (_deep_clean(item) for item in value)
        return [item for item in cleaned_items if not _is_empty_value(item)]

    return value
def _infer_type_for_property(prop_name: str) -> str: | |
name = prop_name.lower() | |
if name in ("url", "uri", "href", "link"): | |
return "string" | |
if name in ("headers", "options", "params", "payload", "data"): | |
return "object" | |
return "string" | |
def _is_nonblank_str(value: Any) -> bool:
    """Return True when *value* is a string containing non-whitespace text."""
    return isinstance(value, str) and bool(value.strip())


def _ensure_property_schema(name: str, schema: Dict[str, Any]) -> Dict[str, Any]:
    """Return a cleaned copy of a property schema with ``type`` and ``description`` set.

    Args:
        name: The property's key in the parent ``properties`` mapping. It is
            used for name-based type inference, for the default description,
            and to detect the special ``headers`` property (case-insensitive).
        schema: The property's sub-schema. Anything that is not a dict is
            treated as an empty schema.

    Returns:
        A new dict. The input is passed through ``_deep_clean`` first, so
        blank strings and empty containers are removed, and then:

        * A missing or blank ``type`` is filled in. Structure wins over the
          name: a schema carrying a ``properties`` mapping becomes
          ``"object"`` and one carrying ``items`` becomes ``"array"``;
          otherwise ``_infer_type_for_property(name)`` decides.
        * A missing or blank ``description`` becomes ``"<name> parameter"``.
        * For ``headers``: ``type`` is forced to ``"object"``; every nested
          header gets a default ``type`` of ``"string"`` and a default
          description of ``"<key> header"``; if no nested headers remain, a
          single ``user-agent`` header is supplied; ``required`` is pruned
          to names that exist among the nested headers.
    """
    prop: Dict[str, Any] = _deep_clean(dict(schema)) if isinstance(schema, dict) else {}

    # Enforce type & description.
    if not _is_nonblank_str(prop.get("type")):
        # Prefer what the schema's own structure implies; falling back to a
        # name-based guess here would label e.g. {"properties": {...}} as
        # "string" and make the schema self-contradictory.
        if isinstance(prop.get("properties"), dict):
            prop["type"] = "object"
        elif isinstance(prop.get("items"), (dict, list)):
            prop["type"] = "array"
        else:
            prop["type"] = _infer_type_for_property(name)
    if not _is_nonblank_str(prop.get("description")):
        prop["description"] = f"{name} parameter"

    # Special handling for headers.
    if name.lower() == "headers":
        prop["type"] = "object"

        raw_headers = prop.get("properties")
        headers_props: Dict[str, Any] = (
            _deep_clean(raw_headers) if isinstance(raw_headers, dict) else {}
        )

        if headers_props:
            fixed_headers: Dict[str, Any] = {}
            for header_name, header_schema in headers_props.items():
                sub = _deep_clean(header_schema) if isinstance(header_schema, dict) else {}
                if not _is_nonblank_str(sub.get("type")):
                    sub["type"] = "string"
                if not _is_nonblank_str(sub.get("description")):
                    sub["description"] = f"{header_name} header"
                fixed_headers[header_name] = sub
            headers_props = fixed_headers
        else:
            # Nothing usable was declared: supply one minimal header so the
            # object schema is never left without properties.
            headers_props = {
                "user-agent": {
                    "type": "string",
                    "description": "User-Agent header for the request",
                }
            }
        prop["properties"] = headers_props

        # Keep only required names that actually exist among the headers.
        required = prop.get("required")
        if isinstance(required, list):
            kept = [r for r in required if isinstance(r, str) and r in headers_props]
            if kept:
                prop["required"] = kept
            else:
                prop.pop("required", None)

    # Defensive only: _deep_clean above already drops empty dict values.
    if isinstance(prop.get("additionalProperties"), dict) and not prop["additionalProperties"]:
        prop.pop("additionalProperties", None)

    return prop
def _sanitize_json_schema(schema: Dict[str, Any]) -> Dict[str, Any]:
    """Return a cleaned, normalized copy of a tool's JSON Schema.

    Steps, in order:

    1. Deep-clean the schema (a non-dict input is treated as ``{}``).
    2. If ``properties`` is present and ``type`` is not a string, set
       ``type`` to ``"object"``.
    3. Make sure ``$schema`` is a string, defaulting to the draft-07 URI.
    4. Run every entry of a dict-valued ``properties`` through
       ``_ensure_property_schema``.
    5. Prune ``required`` to string names present in ``properties``; drop
       the key entirely when nothing remains.
    6. Drop an empty ``additionalProperties`` object.

    NOTE: because step 1 removes empty dicts, a property whose sub-schema is
    empty after cleaning is gone before step 4 can give it defaults.
    """
    result = _deep_clean(schema) if isinstance(schema, dict) else {}

    # A schema that declares properties describes an object.
    if not isinstance(result.get("type"), str) and "properties" in result:
        result["type"] = "object"

    # Normalize $schema: discard a non-string value, then default it.
    if not isinstance(result.get("$schema"), str):
        result.pop("$schema", None)
        result["$schema"] = "http://json-schema.org/draft-07/schema#"

    declared = result.get("properties")
    has_property_map = isinstance(declared, dict)
    if has_property_map:
        result["properties"] = {
            prop_name: _ensure_property_schema(
                prop_name, sub if isinstance(sub, dict) else {}
            )
            for prop_name, sub in declared.items()
        }

    # Clean the required list against the declared property names.
    required = result.get("required")
    if isinstance(required, list):
        kept: List[str] = []
        if has_property_map:
            kept = [r for r in required if isinstance(r, str) and r in declared]
        if kept:
            result["required"] = kept
        else:
            result.pop("required", None)

    # Remove an empty additionalProperties object.
    extra = result.get("additionalProperties")
    if isinstance(extra, dict) and len(extra) == 0:
        result.pop("additionalProperties", None)

    return result
def sanitize_mcp_input_schema_in_packet(body: Dict[str, Any]) -> Dict[str, Any]:
    """Validate and sanitize mcp_context.tools[*].input_schema in the given packet.

    - Removes empty values (empty strings, lists, dicts)
    - Ensures each property has non-empty `type` and `description`
    - Special-cases `headers` to include at least `user-agent` when empty
    - Fixes `required` lists and general JSON Schema shape

    ``mcp_context`` is looked up both under ``body["json_data"]`` (when that
    is a dict) and on ``body`` itself. A tool's schema is read from
    ``input_schema`` or, failing that, ``inputSchema``; the sanitized schema
    is always written to ``input_schema`` and mirrored to ``inputSchema``
    when that key is present on the tool.

    Args:
        body: The request packet. It is not modified; the work happens on
            the rebuilt containers returned by ``_deep_clean``.

    Returns:
        The cleaned packet. This function is best-effort and never raises:
        if sanitization fails part-way, a warning is logged and whatever
        state ``body`` had reached is returned (the original input if the
        initial cleaning itself failed). A non-dict ``body`` is returned
        after cleaning only.
    """
    try:
        body = _deep_clean(body)
        if not isinstance(body, dict):
            # Nothing to look up in a non-mapping packet.
            return body

        candidate_roots: List[Dict[str, Any]] = []
        json_data = body.get("json_data")
        if isinstance(json_data, dict):
            candidate_roots.append(json_data)
        candidate_roots.append(body)

        for root in candidate_roots:
            mcp_ctx = root.get("mcp_context")
            if not isinstance(mcp_ctx, dict):
                continue
            tools = mcp_ctx.get("tools")
            if not isinstance(tools, list):
                continue

            fixed_tools: List[Any] = []
            for tool in tools:
                if not isinstance(tool, dict):
                    # Leave malformed tool entries as they are.
                    fixed_tools.append(tool)
                    continue
                tool_copy = dict(tool)
                input_schema = tool_copy.get("input_schema") or tool_copy.get("inputSchema")
                if isinstance(input_schema, dict):
                    tool_copy["input_schema"] = _sanitize_json_schema(input_schema)
                    if "inputSchema" in tool_copy:
                        tool_copy["inputSchema"] = tool_copy["input_schema"]
                fixed_tools.append(_deep_clean(tool_copy))
            mcp_ctx["tools"] = fixed_tools
        return body
    except Exception:
        # Deliberately best-effort: sanitization must not break the request,
        # but the failure should be visible rather than silently swallowed.
        logging.getLogger(__name__).warning(
            "Failed to sanitize MCP tool input schemas; returning packet as-is",
            exc_info=True,
        )
        return body