Spaces:
Running
Running
| """Metadata cache for indicators and cube mappings.""" | |
import asyncio
import logging
import os
from datetime import datetime, timedelta
from typing import Any

from .api_client import CubeJsClient, get_client
from .cube_resolver import CubeResolver, get_resolver
from .models import IndicatorMetadata, IndicatorListItem
class IndicatorCache:
    """Cache for indicator metadata and cube resolution.

    Holds indicator metadata fetched from the API and reloads it whenever
    ``refresh`` is called. ``needs_refresh`` reports staleness once
    ``refresh_interval`` has elapsed. Each refresh also feeds the ``/meta``
    response to the CubeResolver obtained from ``get_resolver()``, so that
    indicator IDs can be mapped to data cubes.
    """

    def __init__(
        self,
        refresh_interval_seconds: int | None = None,
    ):
        """Initialize the cache.

        Args:
            refresh_interval_seconds: How long cached data stays fresh.
                ``None`` falls back to the ``CACHE_REFRESH_SECONDS``
                environment variable, or 3600 (1 hour) when that is unset.
                Any other value, including ``0``, is used as given.
        """
        self.refresh_interval = self._resolve_refresh_interval(
            refresh_interval_seconds
        )
        # Indicator metadata keyed by indicator ID.
        self._indicators: dict[int, IndicatorMetadata] = {}
        # Cube resolver from get_resolver(); reloaded from /meta on refresh.
        self._resolver: CubeResolver = get_resolver()
        # Time of the last fully successful refresh; None until the first one.
        self._last_refresh: datetime | None = None
        # Held for the whole of refresh() so reloads never overlap.
        self._refresh_lock = asyncio.Lock()
        # Set once the first refresh has completed.
        self._initialized = False

    @staticmethod
    def _resolve_refresh_interval(seconds: int | None) -> timedelta:
        """Return the refresh interval, consulting the environment only if unset.

        ``None`` means "not specified". An explicit ``0`` is honoured (the
        cache is then always stale) instead of being mistaken for "unset".
        """
        if seconds is None:
            seconds = int(os.getenv("CACHE_REFRESH_SECONDS", "3600"))
        return timedelta(seconds=seconds)

    def is_initialized(self) -> bool:
        """Return True once at least one refresh has completed."""
        return self._initialized

    def needs_refresh(self) -> bool:
        """Return True if no refresh has happened yet or the data is stale."""
        if not self._initialized or self._last_refresh is None:
            return True
        return datetime.now() - self._last_refresh > self.refresh_interval

    def indicators(self) -> dict[int, IndicatorMetadata]:
        """Return a shallow copy of the cached indicators, keyed by ID."""
        return self._indicators.copy()

    def resolver(self) -> CubeResolver:
        """Return the cube resolver instance used by this cache."""
        return self._resolver

    async def initialize(self, client: CubeJsClient | None = None) -> None:
        """Populate the cache for the first time; call at application startup.

        Args:
            client: Optional CubeJsClient instance. Defaults to the instance
                returned by ``get_client()``.
        """
        if client is None:
            client = get_client()
        await self.refresh(client)

    async def refresh(self, client: CubeJsClient | None = None) -> None:
        """Reload indicator metadata and cube metadata from the API.

        Args:
            client: Optional CubeJsClient instance. Defaults to the instance
                returned by ``get_client()``.

        Raises:
            Whatever the client raises while loading; the refresh timestamp
            is then left untouched.
        """
        async with self._refresh_lock:
            if client is None:
                client = get_client()
            await self._load_indicators(client)
            await self._load_cube_metadata(client)
            # Stamp only after both loads succeed. If either one raises,
            # needs_refresh() keeps returning True so a later call retries.
            self._last_refresh = datetime.now()
            self._initialized = True

    async def _load_indicators(self, client: CubeJsClient) -> None:
        """Replace the indicator map with freshly fetched metadata.

        Rows that fail to parse are logged and skipped, so one bad row does
        not abort the whole refresh.
        """
        logger = logging.getLogger(__name__)
        # Note: some dimensions listed in /meta may not exist in actual data.
        # Only include dimensions that have been validated to work.
        dimensions = [
            "indicateur_metadata.id",
            "indicateur_metadata.libelle",
            "indicateur_metadata.unite",
            "indicateur_metadata.description",
            "indicateur_metadata.methode_calcul",
            "indicateur_metadata.annees_disponibles",
            "indicateur_metadata.mailles_disponibles",
            "indicateur_metadata.maille_mini_disponible",
            "indicateur_metadata.couverture_geographique",
            "indicateur_metadata.completion_region",
            "indicateur_metadata.completion_departement",
            "indicateur_metadata.completion_epci",
            "indicateur_metadata.completion_commune",
            "indicateur_metadata.thematique_fnv",
            # Note: secteur_fnv, enjeux_fnv, levier_fnv cause errors despite
            # being in schema.
        ]
        limit = 1000  # Expected to exceed the total number of indicators.
        data = await client.load_indicators_metadata(
            dimensions=dimensions,
            limit=limit,
        )

        # Build the new mapping first, then swap it in with one assignment.
        fresh: dict[int, IndicatorMetadata] = {}
        row_count = 0
        for row in data:
            row_count += 1
            try:
                indicator = IndicatorMetadata.from_api_response(row)
                fresh[indicator.id] = indicator
            except Exception as exc:
                # Deliberate best-effort: skip this row but keep the others.
                logger.warning("Failed to parse indicator: %s", exc)
        if row_count >= limit:
            # Hitting the limit means indicators beyond it may be missing.
            logger.warning(
                "Indicator metadata query returned %d rows (limit %d); "
                "results may be truncated",
                row_count,
                limit,
            )
        self._indicators = fresh

    async def _load_cube_metadata(self, client: CubeJsClient) -> None:
        """Fetch /meta and load it into the cube resolver."""
        meta = await client.get_meta()
        self._resolver.load_from_meta(meta)

    def get_indicator(self, indicator_id: int) -> IndicatorMetadata | None:
        """Get indicator metadata by ID.

        Args:
            indicator_id: The indicator ID.

        Returns:
            The indicator metadata, or None if not found.
        """
        return self._indicators.get(indicator_id)

    def get_cube_name(self, indicator_id: int, maille: str) -> str | None:
        """Get the data cube name for an indicator at a specific maille.

        Args:
            indicator_id: The indicator ID.
            maille: The geographic level.

        Returns:
            The cube name reported by the resolver, or None if not found.
        """
        return self._resolver.find_cube_for_indicator(indicator_id, maille)

    @staticmethod
    def _to_list_item(indicator: IndicatorMetadata) -> IndicatorListItem:
        """Project full indicator metadata onto the lightweight list view."""
        return IndicatorListItem(
            id=indicator.id,
            libelle=indicator.libelle,
            unite=indicator.unite,
            mailles_disponibles=indicator.mailles_disponibles,
            thematique_fnv=indicator.thematique_fnv,
        )

    def list_indicators(
        self,
        thematique: str | None = None,
        maille: str | None = None,
    ) -> list[IndicatorListItem]:
        """List indicators with optional filtering.

        Args:
            thematique: Filter by thematique_fnv (case-insensitive substring
                match). Empty or None disables the filter.
            maille: Keep only indicators available at this geographic level.
                Empty or None disables the filter.

        Returns:
            Matching indicators, sorted by ID.
        """
        # Lower-case the filter once instead of once per indicator.
        needle = thematique.lower() if thematique else None
        results = []
        for indicator in self._indicators.values():
            if needle is not None and (
                not indicator.thematique_fnv
                or needle not in indicator.thematique_fnv.lower()
            ):
                continue
            if maille and not indicator.has_geographic_level(maille):
                continue
            results.append(self._to_list_item(indicator))
        # Sort by ID for consistent ordering.
        results.sort(key=lambda item: item.id)
        return results

    def search_indicators(self, query: str) -> list[IndicatorListItem]:
        """Search indicators by keyword.

        Every whitespace-separated word of ``query`` must appear
        (case-insensitively) in the indicator's libelle or description.

        Args:
            query: Search query string. Blank queries return every indicator.

        Returns:
            Matching indicators. Those whose libelle contains the whole query
            come first; ties are broken by ID.
        """
        if not query or not query.strip():
            return self.list_indicators()

        query_lower = query.lower().strip()
        query_words = query_lower.split()

        results = []
        for indicator in self._indicators.values():
            searchable = " ".join(
                filter(None, [indicator.libelle, indicator.description])
            ).lower()
            if all(word in searchable for word in query_words):
                results.append(self._to_list_item(indicator))

        def sort_key(item: IndicatorListItem) -> tuple[int, int]:
            # A match may come from the description alone, so libelle can be
            # missing; treat it as empty instead of crashing on .lower().
            libelle = (item.libelle or "").lower()
            exact_match = 0 if query_lower in libelle else 1
            return (exact_match, item.id)

        results.sort(key=sort_key)
        return results
# Process-wide cache instance, created lazily by get_cache().
_cache_instance: IndicatorCache | None = None


def get_cache() -> IndicatorCache:
    """Get or create the singleton IndicatorCache instance.

    Returns:
        The shared IndicatorCache instance.
    """
    global _cache_instance
    if _cache_instance is None:
        _cache_instance = IndicatorCache()
    return _cache_instance


async def initialize_cache(client: CubeJsClient | None = None) -> IndicatorCache:
    """Initialize the singleton cache if that has not happened yet.

    This should be called at application startup.

    Args:
        client: Optional CubeJsClient instance forwarded to the cache.

    Returns:
        The shared cache instance.
    """
    cache = get_cache()
    # is_initialized is a method and must be called. The bare attribute is a
    # bound method, which is always truthy, so initialization was skipped.
    if not cache.is_initialized():
        await cache.initialize(client)
    return cache


async def refresh_cache_if_needed(client: CubeJsClient | None = None) -> None:
    """Refresh the singleton cache only when it reports itself as stale.

    Args:
        client: Optional CubeJsClient instance forwarded to the cache.
    """
    cache = get_cache()
    # needs_refresh is a method; testing the bare attribute is always true
    # and forced a full reload on every call.
    if cache.needs_refresh():
        await cache.refresh(client)