rdune71's picture
Implement comprehensive analytics system for UI/UX improvements
fc3fdb8
import json
import time
import logging
from typing import Dict, List, Any, Optional
from datetime import datetime, timedelta
from core.redis_client import redis_client
logger = logging.getLogger(__name__)
class RedisAnalyticsLogger:
"""Redis-based analytics storage system"""
def __init__(self):
self.redis_client = redis_client.get_client()
def store_event(self, event_type: str, data: Dict[str, Any], user_id: Optional[str] = None):
"""Store an analytics event in Redis"""
try:
event_data = {
"event_type": event_type,
"timestamp": datetime.now().isoformat(),
"data": data
}
# Create unique key with timestamp
timestamp = int(time.time() * 1000) # Millisecond precision
key = f"analytics:{event_type}:{timestamp}"
if user_id:
key = f"analytics:{user_id}:{event_type}:{timestamp}"
# Store event data
self.redis_client.setex(key, 2592000, json.dumps(event_data)) # 30 days expiry
# Add to sorted set for time-based queries
index_key = f"analytics:index:{event_type}"
self.redis_client.zadd(index_key, {key: timestamp})
logger.debug(f"Stored analytics event: {key}")
return True
except Exception as e:
logger.error(f"Failed to store analytics event: {e}")
return False
def get_events(self, event_type: str, user_id: Optional[str] = None,
start_time: Optional[datetime] = None,
end_time: Optional[datetime] = None,
limit: int = 100) -> List[Dict[str, Any]]:
"""Retrieve analytics events"""
try:
# Determine index key
index_key = f"analytics:index:{event_type}"
if user_id:
index_key = f"analytics:{user_id}:index:{event_type}"
# Calculate time range
if start_time is None:
start_time = datetime.now() - timedelta(days=30)
if end_time is None:
end_time = datetime.now()
start_timestamp = int(start_time.timestamp() * 1000)
end_timestamp = int(end_time.timestamp() * 1000)
# Get event keys in time range
event_keys = self.redis_client.zrevrangebyscore(
index_key,
end_timestamp,
start_timestamp,
start=0,
num=limit
)
# Retrieve event data
events = []
for key in event_keys:
try:
data = self.redis_client.get(key)
if data:
events.append(json.loads(data))
except Exception as e:
logger.warning(f"Failed to retrieve event {key}: {e}")
return events
except Exception as e:
logger.error(f"Failed to retrieve analytics events: {e}")
return []
def get_event_count(self, event_type: str, user_id: Optional[str] = None,
start_time: Optional[datetime] = None,
end_time: Optional[datetime] = None) -> int:
"""Get count of events in time range"""
try:
index_key = f"analytics:index:{event_type}"
if user_id:
index_key = f"analytics:{user_id}:index:{event_type}"
if start_time is None:
start_time = datetime.now() - timedelta(days=30)
if end_time is None:
end_time = datetime.now()
start_timestamp = int(start_time.timestamp() * 1000)
end_timestamp = int(end_time.timestamp() * 1000)
return self.redis_client.zcount(index_key, start_timestamp, end_timestamp)
except Exception as e:
logger.error(f"Failed to count analytics events: {e}")
return 0
def aggregate_events(self, event_type: str, aggregation_field: str,
start_time: Optional[datetime] = None,
end_time: Optional[datetime] = None) -> Dict[str, int]:
"""Aggregate events by a specific field"""
try:
events = self.get_events(event_type, start_time=start_time, end_time=end_time)
aggregation = {}
for event in events:
data = event.get("data", {})
field_value = data.get(aggregation_field)
if field_value:
aggregation[field_value] = aggregation.get(field_value, 0) + 1
return aggregation
except Exception as e:
logger.error(f"Failed to aggregate events: {e}")
return {}
# Global instance
redis_logger = RedisAnalyticsLogger()