"""Metadata cache for indicators and cube mappings.""" import asyncio import os from datetime import datetime, timedelta from typing import Any from .api_client import CubeJsClient, get_client from .cube_resolver import CubeResolver, get_resolver from .models import IndicatorMetadata, IndicatorListItem class IndicatorCache: """Cache for indicator metadata and cube resolution. This cache stores indicator metadata loaded at startup and periodically refreshes to pick up new indicators. It also initializes the CubeResolver for mapping indicator IDs to data cubes. """ def __init__( self, refresh_interval_seconds: int | None = None, ): """Initialize the cache. Args: refresh_interval_seconds: How often to refresh the cache. Defaults to CACHE_REFRESH_SECONDS env var or 3600 (1 hour). """ self.refresh_interval = timedelta( seconds=refresh_interval_seconds or int(os.getenv("CACHE_REFRESH_SECONDS", "3600")) ) # Indicator metadata by ID self._indicators: dict[int, IndicatorMetadata] = {} # Reference to the cube resolver self._resolver: CubeResolver = get_resolver() # Last refresh timestamp self._last_refresh: datetime | None = None # Lock for thread-safe refresh self._refresh_lock = asyncio.Lock() # Flag to indicate if initial load is complete self._initialized = False @property def is_initialized(self) -> bool: """Check if the cache has been initialized.""" return self._initialized @property def needs_refresh(self) -> bool: """Check if the cache needs to be refreshed.""" if not self._initialized or self._last_refresh is None: return True return datetime.now() - self._last_refresh > self.refresh_interval @property def indicators(self) -> dict[int, IndicatorMetadata]: """Get all cached indicators.""" return self._indicators.copy() @property def resolver(self) -> CubeResolver: """Get the cube resolver instance.""" return self._resolver async def initialize(self, client: CubeJsClient | None = None) -> None: """Initialize the cache with data from the API. This should be called at application startup. Args: client: Optional CubeJsClient instance. If not provided, uses the singleton instance. """ if client is None: client = get_client() await self.refresh(client) async def refresh(self, client: CubeJsClient | None = None) -> None: """Refresh the cache from the API. Args: client: Optional CubeJsClient instance. """ async with self._refresh_lock: if client is None: client = get_client() # Load indicator metadata await self._load_indicators(client) # Load and parse /meta for cube resolution await self._load_cube_metadata(client) self._last_refresh = datetime.now() self._initialized = True async def _load_indicators(self, client: CubeJsClient) -> None: """Load all indicator metadata from the API.""" # Note: Some dimensions listed in /meta may not exist in actual data # Only include dimensions that have been validated to work dimensions = [ "indicateur_metadata.id", "indicateur_metadata.libelle", "indicateur_metadata.unite", "indicateur_metadata.description", "indicateur_metadata.methode_calcul", "indicateur_metadata.annees_disponibles", "indicateur_metadata.mailles_disponibles", "indicateur_metadata.maille_mini_disponible", "indicateur_metadata.couverture_geographique", "indicateur_metadata.completion_region", "indicateur_metadata.completion_departement", "indicateur_metadata.completion_epci", "indicateur_metadata.completion_commune", "indicateur_metadata.thematique_fnv", # Note: secteur_fnv, enjeux_fnv, levier_fnv cause errors despite being in schema ] data = await client.load_indicators_metadata( dimensions=dimensions, limit=1000, # Should be enough for all indicators ) self._indicators.clear() for row in data: try: indicator = IndicatorMetadata.from_api_response(row) self._indicators[indicator.id] = indicator except Exception as e: # Log but don't fail on individual indicator parsing errors print(f"Warning: Failed to parse indicator: {e}") async def _load_cube_metadata(self, client: CubeJsClient) -> None: """Load cube metadata from /meta and initialize the resolver.""" meta = await client.get_meta() self._resolver.load_from_meta(meta) def get_indicator(self, indicator_id: int) -> IndicatorMetadata | None: """Get indicator metadata by ID. Args: indicator_id: The indicator ID. Returns: The indicator metadata, or None if not found. """ return self._indicators.get(indicator_id) def get_cube_name(self, indicator_id: int, maille: str) -> str | None: """Get the data cube name for an indicator at a specific maille. Args: indicator_id: The indicator ID. maille: The geographic level. Returns: The cube name, or None if not found. """ return self._resolver.find_cube_for_indicator(indicator_id, maille) def list_indicators( self, thematique: str | None = None, maille: str | None = None, ) -> list[IndicatorListItem]: """List indicators with optional filtering. Args: thematique: Filter by thematique_fnv (case-insensitive partial match). maille: Filter by available geographic level. Returns: List of matching indicators. """ results = [] for indicator in self._indicators.values(): # Apply thematique filter if thematique: if not indicator.thematique_fnv: continue if thematique.lower() not in indicator.thematique_fnv.lower(): continue # Apply maille filter if maille: if not indicator.has_geographic_level(maille): continue results.append( IndicatorListItem( id=indicator.id, libelle=indicator.libelle, unite=indicator.unite, mailles_disponibles=indicator.mailles_disponibles, thematique_fnv=indicator.thematique_fnv, ) ) # Sort by ID for consistent ordering results.sort(key=lambda x: x.id) return results def search_indicators(self, query: str) -> list[IndicatorListItem]: """Search indicators by keyword. Searches in libelle and description fields (case-insensitive). Args: query: Search query string. Returns: List of matching indicators. """ if not query or not query.strip(): return self.list_indicators() query_lower = query.lower().strip() query_words = query_lower.split() results = [] for indicator in self._indicators.values(): # Search in libelle and description searchable = " ".join( filter(None, [indicator.libelle, indicator.description]) ).lower() # Check if all query words are present if all(word in searchable for word in query_words): results.append( IndicatorListItem( id=indicator.id, libelle=indicator.libelle, unite=indicator.unite, mailles_disponibles=indicator.mailles_disponibles, thematique_fnv=indicator.thematique_fnv, ) ) # Sort by relevance (exact match in libelle first, then by ID) def sort_key(item: IndicatorListItem) -> tuple[int, int]: exact_match = 0 if query_lower in item.libelle.lower() else 1 return (exact_match, item.id) results.sort(key=sort_key) return results # Singleton cache instance _cache_instance: IndicatorCache | None = None def get_cache() -> IndicatorCache: """Get or create the singleton IndicatorCache instance. Returns: The shared IndicatorCache instance. """ global _cache_instance if _cache_instance is None: _cache_instance = IndicatorCache() return _cache_instance async def initialize_cache(client: CubeJsClient | None = None) -> IndicatorCache: """Initialize the singleton cache. This should be called at application startup. Args: client: Optional CubeJsClient instance. Returns: The initialized cache. """ cache = get_cache() if not cache.is_initialized: await cache.initialize(client) return cache async def refresh_cache_if_needed(client: CubeJsClient | None = None) -> None: """Refresh the cache if it's stale. Args: client: Optional CubeJsClient instance. """ cache = get_cache() if cache.needs_refresh: await cache.refresh(client)