Spaces:
Sleeping
Sleeping
"""Example token verifier implementation using OAuth 2.0 Token Introspection (RFC 7662).""" | |
import logging
from urllib.parse import urlsplit

from mcp.server.auth.provider import AccessToken, TokenVerifier
from mcp.shared.auth_utils import check_resource_allowed, resource_url_from_server_url
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class IntrospectionTokenVerifier(TokenVerifier):
    """Example token verifier that uses OAuth 2.0 Token Introspection (RFC 7662).

    This is a simple example implementation for demonstration purposes.
    Production implementations should consider:
    - Connection pooling and reuse
    - More sophisticated error handling
    - Rate limiting and retry logic
    - Comprehensive configuration options
    """

    # Hosts for which plain (non-TLS) HTTP is tolerated, for local development.
    _PLAIN_HTTP_HOSTS = ("localhost", "127.0.0.1")

    def __init__(
        self,
        introspection_endpoint: str,
        server_url: str,
        validate_resource: bool = False,
    ):
        """Store the verifier configuration.

        Args:
            introspection_endpoint: URL that tokens are POSTed to for introspection.
            server_url: URL of this server; the resource URL used for audience
                checks is derived from it via ``resource_url_from_server_url``.
            validate_resource: When true, a token is only accepted if its
                ``aud`` claim matches this server's resource URL.
        """
        self.introspection_endpoint = introspection_endpoint
        self.server_url = server_url
        self.validate_resource = validate_resource
        self.resource_url = resource_url_from_server_url(server_url)

    @staticmethod
    def _is_safe_endpoint(endpoint: str) -> bool:
        """Return True if *endpoint* is HTTPS, or plain HTTP to a local host.

        The URL is parsed rather than prefix-matched: a prefix test such as
        ``startswith("http://localhost")`` also accepts
        ``http://localhost.evil.example`` and ``http://localhost@evil.example``.
        """
        try:
            parts = urlsplit(endpoint)
            host = parts.hostname
        except ValueError:
            # Raised for malformed URLs (e.g. an unterminated IPv6 literal).
            return False
        if not host:
            return False
        if parts.scheme == "https":
            return True
        return parts.scheme == "http" and host in IntrospectionTokenVerifier._PLAIN_HTTP_HOSTS

    async def verify_token(self, token: str) -> AccessToken | None:
        """Verify token via introspection endpoint.

        Args:
            token: The bearer token to introspect.

        Returns:
            An ``AccessToken`` built from the introspection response when the
            endpoint reports the token as active (and, if resource validation
            is enabled, the audience matches); otherwise ``None``. Any failure
            while calling the endpoint or building the result is logged and
            also yields ``None``.
        """
        # Imported lazily, as in the original, so the module can be imported
        # without httpx being loaded up front.
        import httpx

        # Validate URL to prevent SSRF attacks
        if not self._is_safe_endpoint(self.introspection_endpoint):
            logger.warning(f"Rejecting introspection endpoint with unsafe scheme: {self.introspection_endpoint}")
            return None

        # Configure secure HTTP client
        timeout = httpx.Timeout(10.0, connect=5.0)
        limits = httpx.Limits(max_connections=10, max_keepalive_connections=5)

        async with httpx.AsyncClient(
            timeout=timeout,
            limits=limits,
            verify=True,  # Enforce SSL verification
        ) as client:
            try:
                response = await client.post(
                    self.introspection_endpoint,
                    data={"token": token},
                    headers={"Content-Type": "application/x-www-form-urlencoded"},
                )

                if response.status_code != 200:
                    logger.debug(f"Token introspection returned status {response.status_code}")
                    return None

                data = response.json()
                # An introspection response must be a JSON object; anything
                # else cannot carry the "active" member and is treated as
                # an inactive token.
                if not isinstance(data, dict):
                    logger.warning("Token introspection failed: response body is not a JSON object")
                    return None

                if not data.get("active", False):
                    return None

                # RFC 8707 resource validation (only when validate_resource is enabled)
                if self.validate_resource and not self._validate_resource(data):
                    logger.warning(f"Token resource validation failed. Expected: {self.resource_url}")
                    return None

                scope = data.get("scope")
                # NOTE(review): "aud" may be a list of strings; confirm that
                # AccessToken.resource accepts that shape. If it does not, the
                # construction below raises and the token is rejected by the
                # except clause.
                return AccessToken(
                    token=token,
                    client_id=data.get("client_id", "unknown"),
                    scopes=scope.split() if scope else [],
                    expires_at=data.get("exp"),
                    resource=data.get("aud"),  # Include resource in token
                )
            except Exception as e:
                # Deliberately broad: any failure to introspect means the
                # token cannot be trusted, so it is reported as invalid.
                logger.warning(f"Token introspection failed: {e}")
                return None

    def _validate_resource(self, token_data: dict) -> bool:
        """Validate token was issued for this resource server.

        Args:
            token_data: Parsed introspection response.

        Returns:
            True if the ``aud`` claim (a single value or a list of values)
            contains at least one audience accepted by ``_is_valid_resource``.
            False if the claim is missing or empty, or if this verifier has no
            server/resource URL configured.
        """
        if not self.server_url or not self.resource_url:
            return False  # Fail if strict validation requested but URLs missing

        # Check 'aud' claim (standard JWT audience)
        aud = token_data.get("aud")
        if isinstance(aud, list):
            return any(self._is_valid_resource(audience) for audience in aud)
        if aud:
            return self._is_valid_resource(aud)

        # No resource binding - invalid per RFC 8707
        return False

    def _is_valid_resource(self, resource: str) -> bool:
        """Check if resource matches this server.

        Delegates to ``check_resource_allowed`` with this server's resource URL
        as the requested resource and *resource* as the configured one.
        Returns False when no resource URL is set.
        """
        if not self.resource_url:
            return False
        return check_resource_allowed(requested_resource=self.resource_url, configured_resource=resource)