Spaces:
Sleeping
Sleeping
| """HTTP client for the Cube.js API of Indicateurs Territoriaux.""" | |
| import os | |
| from typing import Any | |
| import httpx | |
| from dotenv import load_dotenv | |
| load_dotenv() | |
class CubeJsClientError(Exception):
    """Root of the exception hierarchy raised by the Cube.js client."""
class AuthenticationError(CubeJsClientError):
    """Signals that the API rejected the credentials (HTTP 401)."""
class BadRequestError(CubeJsClientError):
    """Signals that the API considered the request malformed (HTTP 400)."""
class CubeJsClient:
    """HTTP client for the Cube.js REST API.

    This client handles authentication and provides methods to interact
    with the Indicateurs Territoriaux API endpoints.

    The underlying ``httpx.AsyncClient`` is created lazily on the first
    request and is released by :meth:`close`, or automatically when the
    instance is used in an ``async with`` block.
    """

    def __init__(
        self,
        base_url: str | None = None,
        token: str | None = None,
        timeout: float = 30.0,
    ):
        """Initialize the Cube.js client.

        Args:
            base_url: Base URL of the API. Defaults to env var
                INDICATEURS_TE_BASE_URL, then to
                https://api.indicateurs.ecologie.gouv.fr.
            token: JWT authentication token. Defaults to env var
                INDICATEURS_TE_TOKEN.
            timeout: Request timeout in seconds.

        Raises:
            ValueError: If no token is passed and INDICATEURS_TE_TOKEN is
                unset or empty.
        """
        self.base_url = (
            base_url
            or os.getenv("INDICATEURS_TE_BASE_URL")
            or "https://api.indicateurs.ecologie.gouv.fr"
        )
        self.token = token or os.getenv("INDICATEURS_TE_TOKEN")
        if not self.token:
            raise ValueError(
                "No API token provided. Set INDICATEURS_TE_TOKEN environment variable "
                "or pass token parameter."
            )
        self.timeout = timeout
        # Created on demand by _get_client(); None until the first request.
        self._client: httpx.AsyncClient | None = None

    @property
    def headers(self) -> dict[str, str]:
        """HTTP headers for API requests.

        Exposed as a property because ``_get_client`` passes ``self.headers``
        directly to ``httpx.AsyncClient``, which needs the mapping itself
        rather than a bound method.
        """
        return {
            "Authorization": f"Bearer {self.token}",
            "Content-Type": "application/json",
        }

    async def _get_client(self) -> httpx.AsyncClient:
        """Get or create the async HTTP client."""
        # Rebuild the client if it was never created or has been closed.
        if self._client is None or self._client.is_closed:
            self._client = httpx.AsyncClient(
                base_url=self.base_url,
                headers=self.headers,
                timeout=self.timeout,
            )
        return self._client

    async def close(self) -> None:
        """Close the HTTP client.

        Safe to call when no client was ever created or it is already closed.
        """
        if self._client is not None and not self._client.is_closed:
            await self._client.aclose()
        self._client = None

    async def __aenter__(self) -> "CubeJsClient":
        """Enter an ``async with`` block and return this client."""
        return self

    async def __aexit__(self, *exc_info: object) -> None:
        """Close the HTTP client when leaving an ``async with`` block."""
        await self.close()

    async def _handle_response(self, response: httpx.Response) -> dict[str, Any]:
        """Handle API response and raise appropriate errors.

        Args:
            response: The HTTP response object.

        Returns:
            Parsed JSON response.

        Raises:
            AuthenticationError: If the token is invalid or expired (401).
            BadRequestError: If the request is malformed (400).
            CubeJsClientError: For other HTTP errors.
        """
        status = response.status_code
        if status == 401:
            raise AuthenticationError(
                "Authentication failed. Your API token may be invalid or expired. "
                "Please check your INDICATEURS_TE_TOKEN environment variable."
            )
        if status == 400:
            # Prefer the structured error body; fall back to the raw text when
            # the body is not valid JSON (decode errors are ValueError subclasses).
            try:
                error_detail = response.json()
            except ValueError:
                error_detail = response.text
            raise BadRequestError(
                f"Bad request to API. Details: {error_detail}"
            )
        if status >= 400:
            raise CubeJsClientError(
                f"API request failed with status {response.status_code}: {response.text}"
            )
        return response.json()

    async def get_meta(self) -> dict[str, Any]:
        """Fetch the API schema metadata.

        Returns the complete schema including all cubes, their measures,
        dimensions, and available filters.

        Returns:
            Dict containing the API metadata with 'cubes' key.

        Raises:
            AuthenticationError: If authentication fails.
            CubeJsClientError: For other API errors.
        """
        client = await self._get_client()
        response = await client.get("/cubejs-api/v1/meta")
        return await self._handle_response(response)

    async def load(self, query: dict[str, Any]) -> dict[str, Any]:
        """Execute a data query against the Cube.js API.

        Args:
            query: The Cube.js query object containing measures, dimensions,
                filters, and other query parameters.

        Returns:
            Dict containing the query results with 'data' key.

        Raises:
            AuthenticationError: If authentication fails.
            BadRequestError: If the query is malformed.
            CubeJsClientError: For other API errors.

        Example:
            >>> query = {
            ...     "measures": ["indicateur_metadata.count"],
            ...     "dimensions": ["indicateur_metadata.id", "indicateur_metadata.libelle"],
            ...     "limit": 10
            ... }
            >>> result = await client.load(query)
        """
        client = await self._get_client()
        response = await client.post(
            "/cubejs-api/v1/load",
            json={"query": query},
        )
        return await self._handle_response(response)

    async def load_indicators_metadata(
        self,
        dimensions: list[str] | None = None,
        filters: list[dict[str, Any]] | None = None,
        limit: int = 500,
    ) -> list[dict[str, Any]]:
        """Load indicator metadata from the indicateur_metadata cube.

        Convenience method for querying the indicator metadata cube.

        Args:
            dimensions: List of dimensions to fetch. Defaults to basic info.
            filters: Optional list of filters to apply.
            limit: Maximum number of results.

        Returns:
            List of indicator metadata records (empty if the response has
            no 'data' key).
        """
        if dimensions is None:
            dimensions = [
                "indicateur_metadata.id",
                "indicateur_metadata.libelle",
                "indicateur_metadata.unite",
                "indicateur_metadata.description",
                "indicateur_metadata.mailles_disponibles",
                "indicateur_metadata.thematique_fnv",
                "indicateur_metadata.annees_disponibles",
            ]
        query: dict[str, Any] = {
            "dimensions": dimensions,
            "limit": limit,
        }
        # Only send a "filters" key when there is something to filter on.
        if filters:
            query["filters"] = filters
        result = await self.load(query)
        return result.get("data", [])

    async def load_sources_metadata(
        self,
        indicator_id: int | None = None,
        limit: int = 100,
    ) -> list[dict[str, Any]]:
        """Load source metadata from the indicateur_x_source_metadata cube.

        Args:
            indicator_id: Optional indicator ID to filter sources.
            limit: Maximum number of results.

        Returns:
            List of source metadata records (empty if the response has
            no 'data' key).
        """
        dimensions = [
            "indicateur_x_source_metadata.id_indicateur",
            "indicateur_x_source_metadata.nom_source",
            "indicateur_x_source_metadata.libelle",
            "indicateur_x_source_metadata.description",
            "indicateur_x_source_metadata.producteur_source",
            "indicateur_x_source_metadata.distributeur_source",
            "indicateur_x_source_metadata.license_source",
            "indicateur_x_source_metadata.lien_page",
            "indicateur_x_source_metadata.date_derniere_extraction",
        ]
        query: dict[str, Any] = {
            "dimensions": dimensions,
            "limit": limit,
        }
        # "is not None" so that an indicator ID of 0 is still filtered on.
        if indicator_id is not None:
            query["filters"] = [
                {
                    "member": "indicateur_x_source_metadata.id_indicateur",
                    "operator": "equals",
                    # The ID is sent as a string inside the filter values list.
                    "values": [str(indicator_id)],
                }
            ]
        result = await self.load(query)
        return result.get("data", [])

    async def search_indicators_by_libelle(
        self,
        search_term: str,
        limit: int = 50,
    ) -> list[dict[str, Any]]:
        """Search indicators by keyword in libelle using contains filter.

        This uses Cube.js contains operator for server-side filtering.
        Note: Limited to single term, for multi-term use client-side filtering.

        Args:
            search_term: Term to search for in indicator libelle.
            limit: Maximum number of results.

        Returns:
            List of matching indicator metadata records (empty if the
            response has no 'data' key).
        """
        query: dict[str, Any] = {
            "dimensions": [
                "indicateur_metadata.id",
                "indicateur_metadata.libelle",
                "indicateur_metadata.description",
                "indicateur_metadata.unite",
                "indicateur_metadata.mailles_disponibles",
                "indicateur_metadata.thematique_fnv",
            ],
            "filters": [
                {
                    "member": "indicateur_metadata.libelle",
                    "operator": "contains",
                    "values": [search_term],
                }
            ],
            "limit": limit,
        }
        result = await self.load(query)
        return result.get("data", [])
# Module-level client shared by the application; built on first use.
_client_instance: CubeJsClient | None = None


def get_client() -> CubeJsClient:
    """Return the shared CubeJsClient, constructing it on the first call.

    Returns:
        The shared CubeJsClient instance.
    """
    global _client_instance
    if _client_instance is not None:
        return _client_instance
    _client_instance = CubeJsClient()
    return _client_instance
async def close_client() -> None:
    """Close the shared client, if one exists, and forget it."""
    global _client_instance
    shared = _client_instance
    if shared is None:
        return
    await shared.close()
    # Reset so the next get_client() call builds a fresh instance.
    _client_instance = None