# NOTE: extraction artifacts removed here (file-size line, commit hash,
# line-number index) — they were not valid Python source.
import json
import time
import logging
from typing import Dict, List, Any, Optional
from datetime import datetime, timedelta
from core.redis_client import redis_client
logger = logging.getLogger(__name__)
class RedisAnalyticsLogger:
"""Redis-based analytics storage system"""
def __init__(self):
self.redis_client = redis_client.get_client()
def store_event(self, event_type: str, data: Dict[str, Any], user_id: Optional[str] = None):
"""Store an analytics event in Redis"""
try:
event_data = {
"event_type": event_type,
"timestamp": datetime.now().isoformat(),
"data": data
}
# Create unique key with timestamp
timestamp = int(time.time() * 1000) # Millisecond precision
key = f"analytics:{event_type}:{timestamp}"
if user_id:
key = f"analytics:{user_id}:{event_type}:{timestamp}"
# Store event data
self.redis_client.setex(key, 2592000, json.dumps(event_data)) # 30 days expiry
# Add to sorted set for time-based queries
index_key = f"analytics:index:{event_type}"
self.redis_client.zadd(index_key, {key: timestamp})
logger.debug(f"Stored analytics event: {key}")
return True
except Exception as e:
logger.error(f"Failed to store analytics event: {e}")
return False
def get_events(self, event_type: str, user_id: Optional[str] = None,
start_time: Optional[datetime] = None,
end_time: Optional[datetime] = None,
limit: int = 100) -> List[Dict[str, Any]]:
"""Retrieve analytics events"""
try:
# Determine index key
index_key = f"analytics:index:{event_type}"
if user_id:
index_key = f"analytics:{user_id}:index:{event_type}"
# Calculate time range
if start_time is None:
start_time = datetime.now() - timedelta(days=30)
if end_time is None:
end_time = datetime.now()
start_timestamp = int(start_time.timestamp() * 1000)
end_timestamp = int(end_time.timestamp() * 1000)
# Get event keys in time range
event_keys = self.redis_client.zrevrangebyscore(
index_key,
end_timestamp,
start_timestamp,
start=0,
num=limit
)
# Retrieve event data
events = []
for key in event_keys:
try:
data = self.redis_client.get(key)
if data:
events.append(json.loads(data))
except Exception as e:
logger.warning(f"Failed to retrieve event {key}: {e}")
return events
except Exception as e:
logger.error(f"Failed to retrieve analytics events: {e}")
return []
def get_event_count(self, event_type: str, user_id: Optional[str] = None,
start_time: Optional[datetime] = None,
end_time: Optional[datetime] = None) -> int:
"""Get count of events in time range"""
try:
index_key = f"analytics:index:{event_type}"
if user_id:
index_key = f"analytics:{user_id}:index:{event_type}"
if start_time is None:
start_time = datetime.now() - timedelta(days=30)
if end_time is None:
end_time = datetime.now()
start_timestamp = int(start_time.timestamp() * 1000)
end_timestamp = int(end_time.timestamp() * 1000)
return self.redis_client.zcount(index_key, start_timestamp, end_timestamp)
except Exception as e:
logger.error(f"Failed to count analytics events: {e}")
return 0
def aggregate_events(self, event_type: str, aggregation_field: str,
start_time: Optional[datetime] = None,
end_time: Optional[datetime] = None) -> Dict[str, int]:
"""Aggregate events by a specific field"""
try:
events = self.get_events(event_type, start_time=start_time, end_time=end_time)
aggregation = {}
for event in events:
data = event.get("data", {})
field_value = data.get(aggregation_field)
if field_value:
aggregation[field_value] = aggregation.get(field_value, 0) + 1
return aggregation
except Exception as e:
logger.error(f"Failed to aggregate events: {e}")
return {}
# Global instance
# NOTE: shared singleton created at import time — importing this module
# therefore calls redis_client.get_client() and needs a configured Redis
# connection to be available.
redis_logger = RedisAnalyticsLogger()