# -*- coding: utf-8 -*- """ Shared utilities to validate and sanitize MCP tool input_schema in request packets. Ensures JSON Schema correctness, removes empty values, and enforces non-empty `type` and `description` for each property. Special handling for `headers`. """ from typing import Any, Dict, List def _is_empty_value(value: Any) -> bool: if value is None: return True if isinstance(value, str) and value.strip() == "": return True if isinstance(value, (list, dict)) and len(value) == 0: return True return False def _deep_clean(value: Any) -> Any: if isinstance(value, dict): cleaned: Dict[str, Any] = {} for k, v in value.items(): vv = _deep_clean(v) if _is_empty_value(vv): continue cleaned[k] = vv return cleaned if isinstance(value, list): cleaned_list = [] for item in value: ii = _deep_clean(item) if _is_empty_value(ii): continue cleaned_list.append(ii) return cleaned_list if isinstance(value, str): return value.strip() return value def _infer_type_for_property(prop_name: str) -> str: name = prop_name.lower() if name in ("url", "uri", "href", "link"): return "string" if name in ("headers", "options", "params", "payload", "data"): return "object" return "string" def _ensure_property_schema(name: str, schema: Dict[str, Any]) -> Dict[str, Any]: prop = dict(schema) if isinstance(schema, dict) else {} prop = _deep_clean(prop) # Enforce type & description if "type" not in prop or not isinstance(prop.get("type"), str) or not prop["type"].strip(): prop["type"] = _infer_type_for_property(name) if "description" not in prop or not isinstance(prop.get("description"), str) or not prop["description"].strip(): prop["description"] = f"{name} parameter" # Special handling for headers if name.lower() == "headers": prop["type"] = "object" headers_props = prop.get("properties") if not isinstance(headers_props, dict): headers_props = {} headers_props = _deep_clean(headers_props) if not headers_props: headers_props = { "user-agent": { "type": "string", "description": "User-Agent header for the request", } } else: fixed_headers: Dict[str, Any] = {} for hk, hv in headers_props.items(): sub = _deep_clean(hv if isinstance(hv, dict) else {}) if "type" not in sub or not isinstance(sub.get("type"), str) or not sub["type"].strip(): sub["type"] = "string" if "description" not in sub or not isinstance(sub.get("description"), str) or not sub["description"].strip(): sub["description"] = f"{hk} header" fixed_headers[hk] = sub headers_props = fixed_headers prop["properties"] = headers_props if isinstance(prop.get("required"), list): req = [r for r in prop["required"] if isinstance(r, str) and r in headers_props] if req: prop["required"] = req else: prop.pop("required", None) if isinstance(prop.get("additionalProperties"), dict) and len(prop["additionalProperties"]) == 0: prop.pop("additionalProperties", None) return prop def _sanitize_json_schema(schema: Dict[str, Any]) -> Dict[str, Any]: s = _deep_clean(schema if isinstance(schema, dict) else {}) # If properties exist, assume object type if "properties" in s and not isinstance(s.get("type"), str): s["type"] = "object" # Normalize $schema if "$schema" in s and not isinstance(s["$schema"], str): s.pop("$schema", None) if "$schema" not in s: s["$schema"] = "http://json-schema.org/draft-07/schema#" properties = s.get("properties") if isinstance(properties, dict): fixed_props: Dict[str, Any] = {} for name, subschema in properties.items(): fixed_props[name] = _ensure_property_schema(name, subschema if isinstance(subschema, dict) else {}) s["properties"] = fixed_props # Clean required list if isinstance(s.get("required"), list): if isinstance(properties, dict): req = [r for r in s["required"] if isinstance(r, str) and r in properties] else: req = [] if req: s["required"] = req else: s.pop("required", None) # Remove empty additionalProperties object if isinstance(s.get("additionalProperties"), dict) and len(s["additionalProperties"]) == 0: s.pop("additionalProperties", None) return s def sanitize_mcp_input_schema_in_packet(body: Dict[str, Any]) -> Dict[str, Any]: """Validate and sanitize mcp_context.tools[*].input_schema in the given packet. - Removes empty values (empty strings, lists, dicts) - Ensures each property has non-empty `type` and `description` - Special-cases `headers` to include at least `user-agent` when empty - Fixes `required` lists and general JSON Schema shape """ try: body = _deep_clean(body) candidate_roots: List[Dict[str, Any]] = [] if isinstance(body.get("json_data"), dict): candidate_roots.append(body["json_data"]) candidate_roots.append(body) for root in candidate_roots: if not isinstance(root, dict): continue mcp_ctx = root.get("mcp_context") if not isinstance(mcp_ctx, dict): continue tools = mcp_ctx.get("tools") if not isinstance(tools, list): continue fixed_tools: List[Any] = [] for tool in tools: if not isinstance(tool, dict): fixed_tools.append(tool) continue tool_copy = dict(tool) input_schema = tool_copy.get("input_schema") or tool_copy.get("inputSchema") if isinstance(input_schema, dict): tool_copy["input_schema"] = _sanitize_json_schema(input_schema) if "inputSchema" in tool_copy: tool_copy["inputSchema"] = tool_copy["input_schema"] fixed_tools.append(_deep_clean(tool_copy)) mcp_ctx["tools"] = fixed_tools return body except Exception: return body