MCP_indicators / src /api_client.py
Qdonnars's picture
feat: Implement MCP Server for Indicateurs Territoriaux API
bad6218
"""HTTP client for the Cube.js API of Indicateurs Territoriaux."""
import os
from typing import Any
import httpx
from dotenv import load_dotenv
# Runs at import time: load variables from a .env file into the process
# environment so the os.getenv() lookups in CubeJsClient.__init__
# (INDICATEURS_TE_BASE_URL, INDICATEURS_TE_TOKEN) can see them.
load_dotenv()
class CubeJsClientError(Exception):
    """Base exception for Cube.js client errors.

    AuthenticationError and BadRequestError derive from this class, and the
    client raises it directly for HTTP error statuses other than 400 and 401.
    """
class AuthenticationError(CubeJsClientError):
    """Raised when authentication fails (401).

    The client raises it when the API answers with HTTP status 401.
    """
class BadRequestError(CubeJsClientError):
    """Raised when the request is malformed (400).

    The client raises it when the API answers with HTTP status 400; the
    message carries the error details returned by the server.
    """
class CubeJsClient:
"""HTTP client for the Cube.js REST API.
This client handles authentication and provides methods to interact
with the Indicateurs Territoriaux API endpoints.
"""
def __init__(
self,
base_url: str | None = None,
token: str | None = None,
timeout: float = 30.0,
):
"""Initialize the Cube.js client.
Args:
base_url: Base URL of the API. Defaults to env var INDICATEURS_TE_BASE_URL.
token: JWT authentication token. Defaults to env var INDICATEURS_TE_TOKEN.
timeout: Request timeout in seconds.
"""
self.base_url = (
base_url
or os.getenv("INDICATEURS_TE_BASE_URL")
or "https://api.indicateurs.ecologie.gouv.fr"
)
self.token = token or os.getenv("INDICATEURS_TE_TOKEN")
if not self.token:
raise ValueError(
"No API token provided. Set INDICATEURS_TE_TOKEN environment variable "
"or pass token parameter."
)
self.timeout = timeout
self._client: httpx.AsyncClient | None = None
@property
def headers(self) -> dict[str, str]:
"""HTTP headers for API requests."""
return {
"Authorization": f"Bearer {self.token}",
"Content-Type": "application/json",
}
async def _get_client(self) -> httpx.AsyncClient:
"""Get or create the async HTTP client."""
if self._client is None or self._client.is_closed:
self._client = httpx.AsyncClient(
base_url=self.base_url,
headers=self.headers,
timeout=self.timeout,
)
return self._client
async def close(self) -> None:
"""Close the HTTP client."""
if self._client is not None and not self._client.is_closed:
await self._client.aclose()
self._client = None
async def _handle_response(self, response: httpx.Response) -> dict[str, Any]:
"""Handle API response and raise appropriate errors.
Args:
response: The HTTP response object.
Returns:
Parsed JSON response.
Raises:
AuthenticationError: If the token is invalid or expired (401).
BadRequestError: If the request is malformed (400).
CubeJsClientError: For other HTTP errors.
"""
if response.status_code == 401:
raise AuthenticationError(
"Authentication failed. Your API token may be invalid or expired. "
"Please check your INDICATEURS_TE_TOKEN environment variable."
)
if response.status_code == 400:
try:
error_detail = response.json()
except Exception:
error_detail = response.text
raise BadRequestError(
f"Bad request to API. Details: {error_detail}"
)
if response.status_code >= 400:
raise CubeJsClientError(
f"API request failed with status {response.status_code}: {response.text}"
)
return response.json()
async def get_meta(self) -> dict[str, Any]:
"""Fetch the API schema metadata.
Returns the complete schema including all cubes, their measures,
dimensions, and available filters.
Returns:
Dict containing the API metadata with 'cubes' key.
Raises:
AuthenticationError: If authentication fails.
CubeJsClientError: For other API errors.
"""
client = await self._get_client()
response = await client.get("/cubejs-api/v1/meta")
return await self._handle_response(response)
async def load(self, query: dict[str, Any]) -> dict[str, Any]:
"""Execute a data query against the Cube.js API.
Args:
query: The Cube.js query object containing measures, dimensions,
filters, and other query parameters.
Returns:
Dict containing the query results with 'data' key.
Raises:
AuthenticationError: If authentication fails.
BadRequestError: If the query is malformed.
CubeJsClientError: For other API errors.
Example:
>>> query = {
... "measures": ["indicateur_metadata.count"],
... "dimensions": ["indicateur_metadata.id", "indicateur_metadata.libelle"],
... "limit": 10
... }
>>> result = await client.load(query)
"""
client = await self._get_client()
response = await client.post(
"/cubejs-api/v1/load",
json={"query": query},
)
return await self._handle_response(response)
async def load_indicators_metadata(
self,
dimensions: list[str] | None = None,
filters: list[dict[str, Any]] | None = None,
limit: int = 500,
) -> list[dict[str, Any]]:
"""Load indicator metadata from the indicateur_metadata cube.
Convenience method for querying the indicator metadata cube.
Args:
dimensions: List of dimensions to fetch. Defaults to basic info.
filters: Optional list of filters to apply.
limit: Maximum number of results.
Returns:
List of indicator metadata records.
"""
if dimensions is None:
dimensions = [
"indicateur_metadata.id",
"indicateur_metadata.libelle",
"indicateur_metadata.unite",
"indicateur_metadata.description",
"indicateur_metadata.mailles_disponibles",
"indicateur_metadata.thematique_fnv",
"indicateur_metadata.annees_disponibles",
]
query: dict[str, Any] = {
"dimensions": dimensions,
"limit": limit,
}
if filters:
query["filters"] = filters
result = await self.load(query)
return result.get("data", [])
async def load_sources_metadata(
self,
indicator_id: int | None = None,
limit: int = 100,
) -> list[dict[str, Any]]:
"""Load source metadata from the indicateur_x_source_metadata cube.
Args:
indicator_id: Optional indicator ID to filter sources.
limit: Maximum number of results.
Returns:
List of source metadata records.
"""
dimensions = [
"indicateur_x_source_metadata.id_indicateur",
"indicateur_x_source_metadata.nom_source",
"indicateur_x_source_metadata.libelle",
"indicateur_x_source_metadata.description",
"indicateur_x_source_metadata.producteur_source",
"indicateur_x_source_metadata.distributeur_source",
"indicateur_x_source_metadata.license_source",
"indicateur_x_source_metadata.lien_page",
"indicateur_x_source_metadata.date_derniere_extraction",
]
query: dict[str, Any] = {
"dimensions": dimensions,
"limit": limit,
}
if indicator_id is not None:
query["filters"] = [
{
"member": "indicateur_x_source_metadata.id_indicateur",
"operator": "equals",
"values": [str(indicator_id)],
}
]
result = await self.load(query)
return result.get("data", [])
async def search_indicators_by_libelle(
self,
search_term: str,
limit: int = 50,
) -> list[dict[str, Any]]:
"""Search indicators by keyword in libelle using contains filter.
This uses Cube.js contains operator for server-side filtering.
Note: Limited to single term, for multi-term use client-side filtering.
Args:
search_term: Term to search for in indicator libelle.
limit: Maximum number of results.
Returns:
List of matching indicator metadata records.
"""
query: dict[str, Any] = {
"dimensions": [
"indicateur_metadata.id",
"indicateur_metadata.libelle",
"indicateur_metadata.description",
"indicateur_metadata.unite",
"indicateur_metadata.mailles_disponibles",
"indicateur_metadata.thematique_fnv",
],
"filters": [
{
"member": "indicateur_metadata.libelle",
"operator": "contains",
"values": [search_term],
}
],
"limit": limit,
}
result = await self.load(query)
return result.get("data", [])
# Singleton instance for the application.
# Stays None until get_client() first builds the shared CubeJsClient;
# close_client() closes that instance and resets this back to None.
_client_instance: CubeJsClient | None = None
def get_client() -> CubeJsClient:
    """Return the process-wide CubeJsClient, building it on first call.

    The instance is constructed with no arguments, so its base URL and token
    come from the environment; CubeJsClient raises ValueError when no token
    is configured.

    Returns:
        The shared CubeJsClient instance.
    """
    global _client_instance
    # Fast path: reuse the instance cached by an earlier call.
    if _client_instance is not None:
        return _client_instance
    _client_instance = CubeJsClient()
    return _client_instance
async def close_client() -> None:
    """Close the singleton client instance and forget it.

    Does nothing when no instance has been created. The module-level
    reference is cleared before the client is closed, so the singleton is
    reset even if closing raises, and a later get_client() call builds a
    fresh instance instead of reusing one that is being torn down.
    """
    global _client_instance
    # Detach first: whatever happens during close(), the old instance must
    # no longer be handed out by get_client().
    instance, _client_instance = _client_instance, None
    if instance is not None:
        await instance.close()