"""Example token verifier implementation using OAuth 2.0 Token Introspection (RFC 7662).""" import logging from mcp.server.auth.provider import AccessToken, TokenVerifier from mcp.shared.auth_utils import check_resource_allowed, resource_url_from_server_url logger = logging.getLogger(__name__) class IntrospectionTokenVerifier(TokenVerifier): """Example token verifier that uses OAuth 2.0 Token Introspection (RFC 7662). This is a simple example implementation for demonstration purposes. Production implementations should consider: - Connection pooling and reuse - More sophisticated error handling - Rate limiting and retry logic - Comprehensive configuration options """ def __init__( self, introspection_endpoint: str, server_url: str, validate_resource: bool = False, ): self.introspection_endpoint = introspection_endpoint self.server_url = server_url self.validate_resource = validate_resource self.resource_url = resource_url_from_server_url(server_url) async def verify_token(self, token: str) -> AccessToken | None: """Verify token via introspection endpoint.""" import httpx # Validate URL to prevent SSRF attacks if not self.introspection_endpoint.startswith(("https://", "http://localhost", "http://127.0.0.1")): logger.warning(f"Rejecting introspection endpoint with unsafe scheme: {self.introspection_endpoint}") return None # Configure secure HTTP client timeout = httpx.Timeout(10.0, connect=5.0) limits = httpx.Limits(max_connections=10, max_keepalive_connections=5) async with httpx.AsyncClient( timeout=timeout, limits=limits, verify=True, # Enforce SSL verification ) as client: try: response = await client.post( self.introspection_endpoint, data={"token": token}, headers={"Content-Type": "application/x-www-form-urlencoded"}, ) if response.status_code != 200: logger.debug(f"Token introspection returned status {response.status_code}") return None data = response.json() if not data.get("active", False): return None # RFC 8707 resource validation (only when --oauth-strict is set) if self.validate_resource and not self._validate_resource(data): logger.warning(f"Token resource validation failed. Expected: {self.resource_url}") return None return AccessToken( token=token, client_id=data.get("client_id", "unknown"), scopes=data.get("scope", "").split() if data.get("scope") else [], expires_at=data.get("exp"), resource=data.get("aud"), # Include resource in token ) except Exception as e: logger.warning(f"Token introspection failed: {e}") return None def _validate_resource(self, token_data: dict) -> bool: """Validate token was issued for this resource server.""" if not self.server_url or not self.resource_url: return False # Fail if strict validation requested but URLs missing # Check 'aud' claim first (standard JWT audience) aud = token_data.get("aud") if isinstance(aud, list): for audience in aud: if self._is_valid_resource(audience): return True return False elif aud: return self._is_valid_resource(aud) # No resource binding - invalid per RFC 8707 return False def _is_valid_resource(self, resource: str) -> bool: """Check if resource matches this server using hierarchical matching.""" if not self.resource_url: return False return check_resource_allowed(requested_resource=self.resource_url, configured_resource=resource)