rs / token_verifier.py
apple muncy
Signed-off-by: apple muncy <apple@llama-3.local>
95fad9c
"""Example token verifier implementation using OAuth 2.0 Token Introspection (RFC 7662)."""
import logging
from mcp.server.auth.provider import AccessToken, TokenVerifier
from mcp.shared.auth_utils import check_resource_allowed, resource_url_from_server_url
logger = logging.getLogger(__name__)
class IntrospectionTokenVerifier(TokenVerifier):
"""Example token verifier that uses OAuth 2.0 Token Introspection (RFC 7662).
This is a simple example implementation for demonstration purposes.
Production implementations should consider:
- Connection pooling and reuse
- More sophisticated error handling
- Rate limiting and retry logic
- Comprehensive configuration options
"""
def __init__(
self,
introspection_endpoint: str,
server_url: str,
validate_resource: bool = False,
):
self.introspection_endpoint = introspection_endpoint
self.server_url = server_url
self.validate_resource = validate_resource
self.resource_url = resource_url_from_server_url(server_url)
async def verify_token(self, token: str) -> AccessToken | None:
"""Verify token via introspection endpoint."""
import httpx
# Validate URL to prevent SSRF attacks
if not self.introspection_endpoint.startswith(("https://", "http://localhost", "http://127.0.0.1")):
logger.warning(f"Rejecting introspection endpoint with unsafe scheme: {self.introspection_endpoint}")
return None
# Configure secure HTTP client
timeout = httpx.Timeout(10.0, connect=5.0)
limits = httpx.Limits(max_connections=10, max_keepalive_connections=5)
async with httpx.AsyncClient(
timeout=timeout,
limits=limits,
verify=True, # Enforce SSL verification
) as client:
try:
response = await client.post(
self.introspection_endpoint,
data={"token": token},
headers={"Content-Type": "application/x-www-form-urlencoded"},
)
if response.status_code != 200:
logger.debug(f"Token introspection returned status {response.status_code}")
return None
data = response.json()
if not data.get("active", False):
return None
# RFC 8707 resource validation (only when --oauth-strict is set)
if self.validate_resource and not self._validate_resource(data):
logger.warning(f"Token resource validation failed. Expected: {self.resource_url}")
return None
return AccessToken(
token=token,
client_id=data.get("client_id", "unknown"),
scopes=data.get("scope", "").split() if data.get("scope") else [],
expires_at=data.get("exp"),
resource=data.get("aud"), # Include resource in token
)
except Exception as e:
logger.warning(f"Token introspection failed: {e}")
return None
def _validate_resource(self, token_data: dict) -> bool:
"""Validate token was issued for this resource server."""
if not self.server_url or not self.resource_url:
return False # Fail if strict validation requested but URLs missing
# Check 'aud' claim first (standard JWT audience)
aud = token_data.get("aud")
if isinstance(aud, list):
for audience in aud:
if self._is_valid_resource(audience):
return True
return False
elif aud:
return self._is_valid_resource(aud)
# No resource binding - invalid per RFC 8707
return False
def _is_valid_resource(self, resource: str) -> bool:
"""Check if resource matches this server using hierarchical matching."""
if not self.resource_url:
return False
return check_resource_allowed(requested_resource=self.resource_url, configured_resource=resource)