|
import json
import logging
import time
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional

from core.redis_client import redis_client

logger = logging.getLogger(__name__)

# Events are stored with a 30-day TTL; index entries are trimmed to match.
EVENT_TTL_SECONDS = 30 * 24 * 60 * 60  # 2592000


class RedisAnalyticsLogger:
    """Redis-based analytics storage system."""

    def __init__(self):
        self.redis_client = redis_client.get_client()
|
    def store_event(self, event_type: str, data: Dict[str, Any], user_id: Optional[str] = None) -> bool:
        """Store an analytics event in Redis.

        The payload is written as JSON under a timestamped key with a 30-day
        TTL, and the key is indexed in a sorted set (scored by millisecond
        timestamp) so it can be range-queried later.
        """
        try:
            timestamp = int(time.time() * 1000)
            event_data = {
                "event_type": event_type,
                "user_id": user_id,
                "timestamp": datetime.now().isoformat(),
                "data": data,
            }

            key = f"analytics:{event_type}:{timestamp}"
            if user_id:
                key = f"analytics:{user_id}:{event_type}:{timestamp}"

            self.redis_client.setex(key, EVENT_TTL_SECONDS, json.dumps(event_data))

            # Index the key globally and, when a user is given, per user as
            # well; get_events() and get_event_count() read the per-user
            # index, so it must be populated here too.
            index_keys = [f"analytics:index:{event_type}"]
            if user_id:
                index_keys.append(f"analytics:{user_id}:index:{event_type}")
            cutoff = timestamp - EVENT_TTL_SECONDS * 1000
            for index_key in index_keys:
                self.redis_client.zadd(index_key, {key: timestamp})
                # Drop index members whose underlying event keys have expired.
                self.redis_client.zremrangebyscore(index_key, 0, cutoff)

            logger.debug(f"Stored analytics event: {key}")
            return True
        except Exception as e:
            logger.error(f"Failed to store analytics event: {e}")
            return False
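
    # Illustrative key layout for a hypothetical "page_view" event stored for
    # user "u42" at t=1700000000000 (names are examples, not a fixed schema):
    #   analytics:u42:page_view:1700000000000  -> JSON payload (30-day TTL)
    #   analytics:index:page_view              -> global ZSET index, ms scores
    #   analytics:u42:index:page_view          -> per-user ZSET index, ms scores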
|
|
|
    def get_events(self, event_type: str, user_id: Optional[str] = None,
                   start_time: Optional[datetime] = None,
                   end_time: Optional[datetime] = None,
                   limit: int = 100) -> List[Dict[str, Any]]:
        """Retrieve analytics events in the given window, newest first."""
        try:
            index_key = f"analytics:index:{event_type}"
            if user_id:
                index_key = f"analytics:{user_id}:index:{event_type}"

            # Default to the last 30 days, matching the storage TTL.
            if start_time is None:
                start_time = datetime.now() - timedelta(days=30)
            if end_time is None:
                end_time = datetime.now()

            start_timestamp = int(start_time.timestamp() * 1000)
            end_timestamp = int(end_time.timestamp() * 1000)

            # Reverse range query: max score first, then min, newest first.
            event_keys = self.redis_client.zrevrangebyscore(
                index_key,
                end_timestamp,
                start_timestamp,
                start=0,
                num=limit,
            )

            events = []
            for key in event_keys:
                try:
                    data = self.redis_client.get(key)
                    if data:  # expired keys may still linger in the index
                        events.append(json.loads(data))
                except Exception as e:
                    logger.warning(f"Failed to retrieve event {key}: {e}")

            return events
        except Exception as e:
            logger.error(f"Failed to retrieve analytics events: {e}")
            return []
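
    # Illustrative element shape for a hypothetical "page_view" event:
    #   {"event_type": "page_view", "user_id": "u42",
    #    "timestamp": "2024-01-01T12:00:00", "data": {"path": "/home"}}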
|
|
|
    def get_event_count(self, event_type: str, user_id: Optional[str] = None,
                        start_time: Optional[datetime] = None,
                        end_time: Optional[datetime] = None) -> int:
        """Count events in a time range."""
        try:
            index_key = f"analytics:index:{event_type}"
            if user_id:
                index_key = f"analytics:{user_id}:index:{event_type}"

            if start_time is None:
                start_time = datetime.now() - timedelta(days=30)
            if end_time is None:
                end_time = datetime.now()

            start_timestamp = int(start_time.timestamp() * 1000)
            end_timestamp = int(end_time.timestamp() * 1000)

            # zcount takes min then max, the opposite order of zrevrangebyscore.
            return self.redis_client.zcount(index_key, start_timestamp, end_timestamp)
        except Exception as e:
            logger.error(f"Failed to count analytics events: {e}")
            return 0
|
|
|
    def aggregate_events(self, event_type: str, aggregation_field: str,
                         start_time: Optional[datetime] = None,
                         end_time: Optional[datetime] = None) -> Dict[str, int]:
        """Aggregate events by a field of their data, mapping value -> count.

        Note: bounded by get_events' default limit of 100 events.
        """
        try:
            events = self.get_events(event_type, start_time=start_time, end_time=end_time)
            aggregation: Dict[str, int] = {}

            for event in events:
                field_value = event.get("data", {}).get(aggregation_field)
                # Compare against None so falsy values such as 0 or "" are
                # still counted.
                if field_value is not None:
                    aggregation[field_value] = aggregation.get(field_value, 0) + 1

            return aggregation
        except Exception as e:
            logger.error(f"Failed to aggregate events: {e}")
            return {}
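
    # Illustrative result for hypothetical "page_view" events aggregated by a
    # "path" field: {"/home": 12, "/docs": 7, "/pricing": 3}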
|
|
# Shared module-level instance.
redis_logger = RedisAnalyticsLogger()
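
# Minimal usage sketch (illustrative only): assumes a reachable Redis behind
# core.redis_client; event and field names here are hypothetical examples.
if __name__ == "__main__":
    redis_logger.store_event("page_view", {"path": "/home"}, user_id="u42")
    recent = redis_logger.get_events("page_view", user_id="u42", limit=10)
    total = redis_logger.get_event_count("page_view")
    by_path = redis_logger.aggregate_events("page_view", "path")
    print(f"{total} page_view events; fetched {len(recent)}; by path: {by_path}")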
|
|