""" | |
Multi-level caching system with memory and disk storage, TTL, and LRU eviction. | |
""" | |
import os | |
import pickle | |
import time | |
import threading | |
from pathlib import Path | |
from typing import Any, Dict, Optional, Set, Tuple | |
from collections import OrderedDict | |
import hashlib | |
import json | |
from .error_handler import ResourceError | |


class CacheEntry:
    """Represents a cache entry with metadata."""

    def __init__(self, value: Any, ttl: Optional[int] = None):
        self.value = value
        self.created_at = time.time()
        self.last_accessed = self.created_at
        self.access_count = 0
        self.ttl = ttl

    def is_expired(self) -> bool:
        """Check if the cache entry has expired."""
        if self.ttl is None:
            return False
        return time.time() - self.created_at > self.ttl

    def touch(self) -> None:
        """Update access time and count."""
        self.last_accessed = time.time()
        self.access_count += 1
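
# Illustrative sketch of the TTL semantics above (comments only, not executed):
#
#     entry = CacheEntry("hello", ttl=1)
#     entry.is_expired()   # False right after creation
#     time.sleep(1.1)
#     entry.is_expired()   # True once more than `ttl` seconds have elapsed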


class CacheManager:
    """Multi-level cache manager with memory and disk storage."""

    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.cache_config = config.get("cache", {})

        # Configuration
        self.memory_cache_size = self.cache_config.get("embedding_cache_size", 10000)
        self.default_ttl = self.cache_config.get("cache_ttl", 3600)
        self.enable_disk_cache = self.cache_config.get("enable_disk_cache", True)
        self.cache_dir = Path(self.cache_config.get("cache_dir", "./cache"))

        # Memory cache (LRU)
        self._memory_cache: OrderedDict[str, CacheEntry] = OrderedDict()
        self._lock = threading.RLock()

        # Disk cache setup
        if self.enable_disk_cache:
            self.cache_dir.mkdir(parents=True, exist_ok=True)
            self._disk_cache_index = self._load_disk_cache_index()
        else:
            self._disk_cache_index = {}

        # Statistics
        self.stats = {
            "memory_hits": 0,
            "disk_hits": 0,
            "misses": 0,
            "evictions": 0,
            "disk_writes": 0,
            "disk_reads": 0,
            "errors": 0
        }
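
    # Expected config shape (a sketch inferred from the .get() defaults in
    # __init__ above; every key is optional, defaults shown):
    #
    #     {
    #         "cache": {
    #             "embedding_cache_size": 10000,
    #             "cache_ttl": 3600,
    #             "enable_disk_cache": True,
    #             "cache_dir": "./cache",
    #         }
    #     }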

    def get(self, key: str) -> Optional[Any]:
        """Get a value from the cache, checking memory first, then disk."""
        with self._lock:
            # Check memory cache first
            if key in self._memory_cache:
                entry = self._memory_cache[key]
                if entry.is_expired():
                    del self._memory_cache[key]
                else:
                    # Move to end (most recently used)
                    self._memory_cache.move_to_end(key)
                    entry.touch()
                    self.stats["memory_hits"] += 1
                    return entry.value

            # Check disk cache; promote hits into the memory cache
            if self.enable_disk_cache and key in self._disk_cache_index:
                try:
                    value = self._load_from_disk(key)
                    if value is not None:
                        self._set_memory_cache(key, value, self.default_ttl)
                        self.stats["disk_hits"] += 1
                        return value
                except Exception as e:
                    self.stats["errors"] += 1
                    print(f"Error loading from disk cache: {e}")

            self.stats["misses"] += 1
            return None

    def set(self, key: str, value: Any, ttl: Optional[int] = None) -> None:
        """Set value in cache."""
        ttl = ttl if ttl is not None else self.default_ttl

        with self._lock:
            # Set in memory cache
            self._set_memory_cache(key, value, ttl)

            # Set in disk cache if enabled
            if self.enable_disk_cache:
                try:
                    self._save_to_disk(key, value, ttl)
                except Exception as e:
                    self.stats["errors"] += 1
                    print(f"Error saving to disk cache: {e}")

    def _set_memory_cache(self, key: str, value: Any, ttl: Optional[int]) -> None:
        """Set value in memory cache with LRU eviction."""
        # Remove any existing entry so re-insertion places the key at the
        # most-recently-used end of the OrderedDict
        if key in self._memory_cache:
            del self._memory_cache[key]

        # Add new entry
        self._memory_cache[key] = CacheEntry(value, ttl)

        # Enforce size limit: the first key in the OrderedDict is the least
        # recently used, so evict from the front
        while len(self._memory_cache) > self.memory_cache_size:
            oldest_key = next(iter(self._memory_cache))
            del self._memory_cache[oldest_key]
            self.stats["evictions"] += 1

    def delete(self, key: str) -> bool:
        """Delete key from cache."""
        with self._lock:
            removed = False

            # Remove from memory cache
            if key in self._memory_cache:
                del self._memory_cache[key]
                removed = True

            # Remove from disk cache
            if self.enable_disk_cache and key in self._disk_cache_index:
                try:
                    cache_file = self._get_cache_file_path(key)
                    if cache_file.exists():
                        cache_file.unlink()
                    del self._disk_cache_index[key]
                    self._save_disk_cache_index()
                    removed = True
                except Exception as e:
                    self.stats["errors"] += 1
                    print(f"Error deleting from disk cache: {e}")

            return removed

    def clear(self) -> None:
        """Clear all cache entries."""
        with self._lock:
            # Clear memory cache
            self._memory_cache.clear()

            # Clear disk cache
            if self.enable_disk_cache:
                try:
                    for cache_file in self.cache_dir.glob("cache_*.pkl"):
                        cache_file.unlink()
                    self._disk_cache_index.clear()
                    self._save_disk_cache_index()
                except Exception as e:
                    self.stats["errors"] += 1
                    print(f"Error clearing disk cache: {e}")

    def _save_to_disk(self, key: str, value: Any, ttl: Optional[int]) -> None:
        """Save value to disk cache."""
        cache_file = self._get_cache_file_path(key)
        # Note: pickle can execute arbitrary code on load, so cache files
        # should only ever be read back from a trusted cache directory
        cache_data = {
            "value": value,
            "created_at": time.time(),
            "ttl": ttl
        }
        with open(cache_file, "wb") as f:
            pickle.dump(cache_data, f, protocol=pickle.HIGHEST_PROTOCOL)

        # Update index
        self._disk_cache_index[key] = {
            "file": cache_file.name,
            "created_at": cache_data["created_at"],
            "ttl": ttl
        }
        self._save_disk_cache_index()
        self.stats["disk_writes"] += 1

    def _load_from_disk(self, key: str) -> Optional[Any]:
        """Load value from disk cache."""
        if key not in self._disk_cache_index:
            return None

        cache_info = self._disk_cache_index[key]

        # Check if expired; delete() re-acquires the lock, which is safe
        # because _lock is a re-entrant RLock
        if cache_info["ttl"] is not None:
            age = time.time() - cache_info["created_at"]
            if age > cache_info["ttl"]:
                self.delete(key)
                return None

        cache_file = self.cache_dir / cache_info["file"]
        if not cache_file.exists():
            # Remove from index if the file no longer exists
            del self._disk_cache_index[key]
            self._save_disk_cache_index()
            return None

        try:
            with open(cache_file, "rb") as f:
                cache_data = pickle.load(f)
            self.stats["disk_reads"] += 1
            return cache_data["value"]
        except Exception:
            # Remove corrupted cache file
            try:
                cache_file.unlink()
                del self._disk_cache_index[key]
                self._save_disk_cache_index()
            except Exception:
                pass
            return None

    def _get_cache_file_path(self, key: str) -> Path:
        """Get cache file path for key."""
        # Hash the key to create a valid filename (MD5 is used here for
        # naming only, not for security)
        key_hash = hashlib.md5(key.encode()).hexdigest()
        return self.cache_dir / f"cache_{key_hash}.pkl"

    def _load_disk_cache_index(self) -> Dict[str, Dict[str, Any]]:
        """Load disk cache index."""
        index_file = self.cache_dir / "cache_index.json"
        if not index_file.exists():
            return {}
        try:
            with open(index_file, "r") as f:
                return json.load(f)
        except Exception as e:
            print(f"Error loading cache index: {e}")
            return {}

    def _save_disk_cache_index(self) -> None:
        """Save disk cache index."""
        index_file = self.cache_dir / "cache_index.json"
        try:
            with open(index_file, "w") as f:
                json.dump(self._disk_cache_index, f, indent=2)
        except Exception as e:
            print(f"Error saving cache index: {e}")

    def cleanup_expired(self) -> int:
        """Remove expired entries from cache."""
        removed_count = 0
        with self._lock:
            # Clean memory cache
            expired_keys = []
            for key, entry in self._memory_cache.items():
                if entry.is_expired():
                    expired_keys.append(key)
            for key in expired_keys:
                del self._memory_cache[key]
                removed_count += 1

            # Clean disk cache
            if self.enable_disk_cache:
                expired_disk_keys = []
                current_time = time.time()
                for key, info in self._disk_cache_index.items():
                    if info["ttl"] is not None:
                        age = current_time - info["created_at"]
                        if age > info["ttl"]:
                            expired_disk_keys.append(key)

                for key in expired_disk_keys:
                    try:
                        cache_file = self.cache_dir / self._disk_cache_index[key]["file"]
                        if cache_file.exists():
                            cache_file.unlink()
                        del self._disk_cache_index[key]
                        removed_count += 1
                    except Exception as e:
                        print(f"Error removing expired cache file: {e}")

                if expired_disk_keys:
                    self._save_disk_cache_index()

        return removed_count

    def get_size_info(self) -> Dict[str, Any]:
        """Get cache size information."""
        with self._lock:
            memory_size = len(self._memory_cache)
            disk_size = len(self._disk_cache_index)

            # Calculate disk usage
            disk_usage = 0
            if self.enable_disk_cache and self.cache_dir.exists():
                try:
                    for cache_file in self.cache_dir.glob("cache_*.pkl"):
                        disk_usage += cache_file.stat().st_size
                except Exception:
                    pass

            return {
                "memory_entries": memory_size,
                "disk_entries": disk_size,
                "disk_usage_bytes": disk_usage,
                "disk_usage_mb": disk_usage / (1024 * 1024),
                "memory_limit": self.memory_cache_size
            }

    def get_stats(self) -> Dict[str, Any]:
        """Get cache statistics."""
        # Take the (re-entrant) lock so the snapshot is consistent with
        # concurrent get/set calls
        with self._lock:
            stats = self.stats.copy()
            total_requests = stats["memory_hits"] + stats["disk_hits"] + stats["misses"]
            if total_requests > 0:
                stats["hit_rate"] = (stats["memory_hits"] + stats["disk_hits"]) / total_requests
                stats["memory_hit_rate"] = stats["memory_hits"] / total_requests
                stats["disk_hit_rate"] = stats["disk_hits"] / total_requests
            else:
                stats["hit_rate"] = 0
                stats["memory_hit_rate"] = 0
                stats["disk_hit_rate"] = 0
            stats["size_info"] = self.get_size_info()
            return stats

    def optimize(self) -> Dict[str, int]:
        """Optimize cache by removing expired entries and compacting."""
        expired_removed = self.cleanup_expired()

        # Could add more optimization here, e.g. defragmenting the disk cache
        return {
            "expired_removed": expired_removed
        }
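

if __name__ == "__main__":
    # Minimal usage sketch of the public API (illustrative only). The config
    # shape is an assumption matching the defaults read in __init__; the disk
    # cache is disabled so the demo writes no files. Because of the
    # package-relative import at the top, run this as a module, e.g.
    # `python -m yourpackage.cache_manager` (package name assumed).
    demo_config = {
        "cache": {
            "embedding_cache_size": 100,
            "cache_ttl": 60,
            "enable_disk_cache": False,
        }
    }
    cache = CacheManager(demo_config)

    cache.set("greeting", {"text": "hello"})
    print(cache.get("greeting"))      # {'text': 'hello'} -- a memory hit
    print(cache.get("missing-key"))   # None -- counted as a miss
    cache.delete("greeting")
    print(cache.get("greeting"))      # None after deletion

    print(cache.get_stats())          # hit/miss counters plus size info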