"""Redis-backed storage and querying of analytics events."""

import json
import logging
import time
import uuid
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Tuple

from core.redis_client import redis_client

logger = logging.getLogger(__name__)

# Event payloads expire after 30 days; index entries older than this are
# pruned because the payload they point at no longer exists.
_EVENT_TTL_SECONDS = 30 * 24 * 60 * 60

# Default look-back window used when a query gives no start_time.
_DEFAULT_WINDOW = timedelta(days=30)


class RedisAnalyticsLogger:
    """Redis-based analytics storage system.

    Every event is stored as a JSON string under its own key with a 30-day
    expiry. The key is also added to sorted-set indexes (one per event type,
    plus one per user and event type when a ``user_id`` is given), scored by
    the event's millisecond timestamp so events can be queried by time range.
    """

    def __init__(self):
        self.redis_client = redis_client.get_client()

    @staticmethod
    def _index_key(event_type: str, user_id: Optional[str] = None) -> str:
        """Return the sorted-set index key for an event type (and user)."""
        if user_id:
            return f"analytics:{user_id}:index:{event_type}"
        return f"analytics:index:{event_type}"

    @staticmethod
    def _time_range_ms(start_time: Optional[datetime],
                       end_time: Optional[datetime]) -> Tuple[int, int]:
        """Return ``(start, end)`` as millisecond timestamps.

        A missing ``start_time`` defaults to 30 days ago and a missing
        ``end_time`` defaults to now.
        """
        if start_time is None:
            start_time = datetime.now() - _DEFAULT_WINDOW
        if end_time is None:
            end_time = datetime.now()
        return (int(start_time.timestamp() * 1000),
                int(end_time.timestamp() * 1000))

    def _prune_index(self, index_key: str, now_ms: int) -> None:
        """Drop index entries whose payload has already expired.

        Best effort: a failure here is logged and never fails the write that
        triggered it.
        """
        cutoff = now_ms - _EVENT_TTL_SECONDS * 1000
        try:
            self.redis_client.zremrangebyscore(index_key, "-inf", cutoff)
        except Exception as e:
            logger.warning("Failed to prune analytics index %s: %s",
                           index_key, e)

    def store_event(self, event_type: str, data: Dict[str, Any],
                    user_id: Optional[str] = None):
        """Store an analytics event in Redis.

        Args:
            event_type: Name of the event; used in the key and the index.
            data: JSON-serializable payload stored under the ``"data"`` field.
            user_id: Optional user the event belongs to. When given, the
                event is also indexed per user so user-scoped queries in
                ``get_events`` / ``get_event_count`` can find it.

        Returns:
            True if the event was stored and indexed, False on any failure
            (the error is logged, never raised).
        """
        try:
            now = time.time()
            timestamp = int(now * 1000)  # Millisecond precision
            event_data = {
                "event_type": event_type,
                "timestamp": datetime.fromtimestamp(now).isoformat(),
                "data": data,
            }

            # A random suffix keeps events that share the same millisecond
            # from overwriting each other.
            suffix = uuid.uuid4().hex
            if user_id:
                key = f"analytics:{user_id}:{event_type}:{timestamp}:{suffix}"
            else:
                key = f"analytics:{event_type}:{timestamp}:{suffix}"

            # Store event data
            self.redis_client.setex(
                key, _EVENT_TTL_SECONDS, json.dumps(event_data)
            )

            # Index for time-based queries. The per-user index must be
            # written as well, because the read paths use it whenever a
            # user_id is supplied.
            index_keys = [self._index_key(event_type)]
            if user_id:
                index_keys.append(self._index_key(event_type, user_id))
            for index_key in index_keys:
                self.redis_client.zadd(index_key, {key: timestamp})

            # The payload has a TTL but sorted-set members do not; prune so
            # the indexes do not grow without bound.
            for index_key in index_keys:
                self._prune_index(index_key, timestamp)

            logger.debug("Stored analytics event: %s", key)
            return True

        except Exception as e:
            logger.error("Failed to store analytics event: %s", e)
            return False

    def get_events(self, event_type: str, user_id: Optional[str] = None,
                   start_time: Optional[datetime] = None,
                   end_time: Optional[datetime] = None,
                   limit: Optional[int] = 100) -> List[Dict[str, Any]]:
        """Retrieve analytics events, newest first.

        Args:
            event_type: Event type to look up.
            user_id: When given, only events stored for this user.
            start_time: Start of the range; defaults to 30 days ago.
            end_time: End of the range; defaults to now.
            limit: Maximum number of index entries to read. ``None`` reads
                every entry in the range.

        Returns:
            The decoded event dicts. Index entries whose payload is missing
            or cannot be decoded are skipped. Returns an empty list on
            failure (the error is logged, never raised).
        """
        try:
            index_key = self._index_key(event_type, user_id)
            start_timestamp, end_timestamp = self._time_range_ms(
                start_time, end_time
            )

            # Get event keys in time range
            if limit is None:
                event_keys = self.redis_client.zrevrangebyscore(
                    index_key, end_timestamp, start_timestamp
                )
            else:
                event_keys = self.redis_client.zrevrangebyscore(
                    index_key, end_timestamp, start_timestamp,
                    start=0, num=limit
                )

            # Retrieve event data
            events = []
            for key in event_keys:
                try:
                    raw = self.redis_client.get(key)
                    if raw:
                        events.append(json.loads(raw))
                except Exception as e:
                    logger.warning("Failed to retrieve event %s: %s", key, e)

            return events

        except Exception as e:
            logger.error("Failed to retrieve analytics events: %s", e)
            return []

    def get_event_count(self, event_type: str, user_id: Optional[str] = None,
                        start_time: Optional[datetime] = None,
                        end_time: Optional[datetime] = None) -> int:
        """Get count of events in time range.

        The count is taken from the index, so it can include entries whose
        payload has expired but has not been pruned yet.

        Args:
            event_type: Event type to count.
            user_id: When given, only events stored for this user.
            start_time: Start of the range; defaults to 30 days ago.
            end_time: End of the range; defaults to now.

        Returns:
            Number of indexed events in the range, or 0 on failure (the
            error is logged, never raised).
        """
        try:
            index_key = self._index_key(event_type, user_id)
            start_timestamp, end_timestamp = self._time_range_ms(
                start_time, end_time
            )
            return self.redis_client.zcount(
                index_key, start_timestamp, end_timestamp
            )

        except Exception as e:
            logger.error("Failed to count analytics events: %s", e)
            return 0

    def aggregate_events(self, event_type: str, aggregation_field: str,
                         start_time: Optional[datetime] = None,
                         end_time: Optional[datetime] = None) -> Dict[str, int]:
        """Aggregate events by a specific field.

        Counts how many events in the range carry each distinct value of
        ``data[aggregation_field]``. Events without that field (or with a
        ``None`` value) are ignored; falsy values such as ``0``, ``False``
        or ``""`` are counted.

        Args:
            event_type: Event type to aggregate.
            aggregation_field: Key inside each event's ``"data"`` dict.
            start_time: Start of the range; defaults to 30 days ago.
            end_time: End of the range; defaults to now.

        Returns:
            Mapping of field value to occurrence count, or an empty dict on
            failure (the error is logged, never raised).
        """
        try:
            # limit=None: aggregate over every event in the range rather
            # than only the newest 100.
            events = self.get_events(
                event_type, start_time=start_time, end_time=end_time,
                limit=None
            )

            aggregation: Dict[Any, int] = {}
            for event in events:
                data = event.get("data")
                if not isinstance(data, dict):
                    continue
                field_value = data.get(aggregation_field)
                if field_value is None:
                    continue
                try:
                    aggregation[field_value] = (
                        aggregation.get(field_value, 0) + 1
                    )
                except TypeError:
                    # Unhashable value (list/dict): skip this event instead
                    # of discarding the whole aggregation.
                    logger.debug(
                        "Skipping unhashable value for field %s",
                        aggregation_field,
                    )

            return aggregation

        except Exception as e:
            logger.error("Failed to aggregate events: %s", e)
            return {}


# Global instance
redis_logger = RedisAnalyticsLogger()