"""Multi-level caching system with memory and disk storage, TTL, and LRU eviction."""

import hashlib
import json
import pickle
import threading
import time
from collections import OrderedDict
from pathlib import Path
from typing import Any, Dict, Optional


class CacheEntry:
    """Represents a cache entry with metadata."""

    def __init__(self, value: Any, ttl: Optional[int] = None):
        self.value = value
        self.created_at = time.time()
        self.last_accessed = self.created_at
        self.access_count = 0
        self.ttl = ttl

    def is_expired(self) -> bool:
        """Check if the cache entry has expired."""
        if self.ttl is None:
            return False
        return time.time() - self.created_at > self.ttl

    def touch(self) -> None:
        """Update access time and count."""
        self.last_accessed = time.time()
        self.access_count += 1
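
# Lifecycle sketch for CacheEntry (values and timings are illustrative):
#
#     entry = CacheEntry("hello", ttl=60)
#     entry.is_expired()                    # False until 60 seconds elapse
#     entry.touch()                         # bumps last_accessed and access_count
#     CacheEntry("pinned", ttl=None).is_expired()  # never expires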


class CacheManager:
    """Multi-level cache manager with memory and disk storage."""

    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.cache_config = config.get("cache", {})

        # Tunables, with defaults used when the config omits a key.
        self.memory_cache_size = self.cache_config.get("embedding_cache_size", 10000)
        self.default_ttl = self.cache_config.get("cache_ttl", 3600)
        self.enable_disk_cache = self.cache_config.get("enable_disk_cache", True)
        self.cache_dir = Path(self.cache_config.get("cache_dir", "./cache"))

        # OrderedDict gives O(1) LRU bookkeeping; an RLock is required because
        # locked methods call other locking methods (e.g. _load_from_disk -> delete).
        self._memory_cache: OrderedDict[str, CacheEntry] = OrderedDict()
        self._lock = threading.RLock()

        if self.enable_disk_cache:
            self.cache_dir.mkdir(parents=True, exist_ok=True)
            self._disk_cache_index = self._load_disk_cache_index()
        else:
            self._disk_cache_index = {}

        self.stats = {
            "memory_hits": 0,
            "disk_hits": 0,
            "misses": 0,
            "evictions": 0,
            "disk_writes": 0,
            "disk_reads": 0,
            "errors": 0,
        }
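
    # Expected config shape, as read by the .get() calls above (every value
    # shown is just the documented default):
    #
    #     config = {
    #         "cache": {
    #             "embedding_cache_size": 10000,
    #             "cache_ttl": 3600,
    #             "enable_disk_cache": True,
    #             "cache_dir": "./cache",
    #         },
    #     }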

    def get(self, key: str) -> Optional[Any]:
        """Get a value from the cache, checking memory first, then disk."""
        with self._lock:
            # Memory tier: drop expired entries; hits move to the MRU end.
            if key in self._memory_cache:
                entry = self._memory_cache[key]

                if entry.is_expired():
                    del self._memory_cache[key]
                else:
                    self._memory_cache.move_to_end(key)
                    entry.touch()
                    self.stats["memory_hits"] += 1
                    return entry.value

            # Disk tier: a hit is promoted back into the memory tier.
            if self.enable_disk_cache and key in self._disk_cache_index:
                try:
                    value = self._load_from_disk(key)
                    if value is not None:
                        self._set_memory_cache(key, value, self.default_ttl)
                        self.stats["disk_hits"] += 1
                        return value
                except Exception as e:
                    self.stats["errors"] += 1
                    print(f"Error loading from disk cache: {e}")

            self.stats["misses"] += 1
            return None

    def set(self, key: str, value: Any, ttl: Optional[int] = None) -> None:
        """Set a value in the cache; ttl=None falls back to the default TTL."""
        ttl = ttl if ttl is not None else self.default_ttl

        with self._lock:
            self._set_memory_cache(key, value, ttl)

            if self.enable_disk_cache:
                try:
                    self._save_to_disk(key, value, ttl)
                except Exception as e:
                    self.stats["errors"] += 1
                    print(f"Error saving to disk cache: {e}")

    def _set_memory_cache(self, key: str, value: Any, ttl: Optional[int]) -> None:
        """Set a value in the memory cache with LRU eviction."""
        # Delete first so the re-inserted key lands at the MRU end.
        if key in self._memory_cache:
            del self._memory_cache[key]

        self._memory_cache[key] = CacheEntry(value, ttl)

        # Evict from the LRU end (the front of the OrderedDict) while over capacity.
        while len(self._memory_cache) > self.memory_cache_size:
            oldest_key = next(iter(self._memory_cache))
            del self._memory_cache[oldest_key]
            self.stats["evictions"] += 1

    def delete(self, key: str) -> bool:
        """Delete a key from both tiers; return True if anything was removed."""
        with self._lock:
            removed = False

            if key in self._memory_cache:
                del self._memory_cache[key]
                removed = True

            if self.enable_disk_cache and key in self._disk_cache_index:
                try:
                    cache_file = self._get_cache_file_path(key)
                    if cache_file.exists():
                        cache_file.unlink()
                    del self._disk_cache_index[key]
                    self._save_disk_cache_index()
                    removed = True
                except Exception as e:
                    self.stats["errors"] += 1
                    print(f"Error deleting from disk cache: {e}")

            return removed

    def clear(self) -> None:
        """Clear all cache entries from both tiers."""
        with self._lock:
            self._memory_cache.clear()

            if self.enable_disk_cache:
                try:
                    for cache_file in self.cache_dir.glob("cache_*.pkl"):
                        cache_file.unlink()
                    self._disk_cache_index.clear()
                    self._save_disk_cache_index()
                except Exception as e:
                    self.stats["errors"] += 1
                    print(f"Error clearing disk cache: {e}")

    def _save_to_disk(self, key: str, value: Any, ttl: Optional[int]) -> None:
        """Save a value to the disk cache."""
        cache_file = self._get_cache_file_path(key)

        cache_data = {
            "value": value,
            "created_at": time.time(),
            "ttl": ttl,
        }

        # pickle can execute arbitrary code on load, so the cache directory
        # must not be writable by untrusted processes.
        with open(cache_file, "wb") as f:
            pickle.dump(cache_data, f, protocol=pickle.HIGHEST_PROTOCOL)

        # The index mirrors the TTL metadata so expiry can be checked
        # without unpickling the payload.
        self._disk_cache_index[key] = {
            "file": cache_file.name,
            "created_at": cache_data["created_at"],
            "ttl": ttl,
        }

        self._save_disk_cache_index()
        self.stats["disk_writes"] += 1

    def _load_from_disk(self, key: str) -> Optional[Any]:
        """Load a value from the disk cache."""
        if key not in self._disk_cache_index:
            return None

        cache_info = self._disk_cache_index[key]

        # Check expiry from the index before touching the file.
        if cache_info["ttl"] is not None:
            age = time.time() - cache_info["created_at"]
            if age > cache_info["ttl"]:
                self.delete(key)
                return None

        cache_file = self.cache_dir / cache_info["file"]

        if not cache_file.exists():
            # Stale index entry: the file is gone, so drop the entry.
            del self._disk_cache_index[key]
            self._save_disk_cache_index()
            return None

        try:
            with open(cache_file, "rb") as f:
                cache_data = pickle.load(f)

            self.stats["disk_reads"] += 1
            return cache_data["value"]
        except Exception:
            # Corrupt cache file: remove both the file and its index entry.
            try:
                cache_file.unlink()
                del self._disk_cache_index[key]
                self._save_disk_cache_index()
            except Exception:
                pass
            return None

    def _get_cache_file_path(self, key: str) -> Path:
        """Get the cache file path for a key."""
        # MD5 only derives a filesystem-safe filename here; it is not used
        # for any security purpose.
        key_hash = hashlib.md5(key.encode()).hexdigest()
        return self.cache_dir / f"cache_{key_hash}.pkl"
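
    # Path derivation sketch (the hex digest is elided; it is just
    # hashlib.md5(key.encode()).hexdigest()):
    #
    #     _get_cache_file_path("some-key") -> cache_dir / "cache_<md5hex>.pkl"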

    def _load_disk_cache_index(self) -> Dict[str, Dict[str, Any]]:
        """Load the disk cache index, returning an empty index on failure."""
        index_file = self.cache_dir / "cache_index.json"

        if not index_file.exists():
            return {}

        try:
            with open(index_file, "r") as f:
                return json.load(f)
        except Exception as e:
            print(f"Error loading cache index: {e}")
            return {}

    def _save_disk_cache_index(self) -> None:
        """Save the disk cache index."""
        index_file = self.cache_dir / "cache_index.json"

        try:
            with open(index_file, "w") as f:
                json.dump(self._disk_cache_index, f, indent=2)
        except Exception as e:
            print(f"Error saving cache index: {e}")

    def cleanup_expired(self) -> int:
        """Remove expired entries from both tiers; return the number removed."""
        removed_count = 0

        with self._lock:
            # Memory tier: collect keys first, then delete, so the dict is
            # not mutated while iterating.
            expired_keys = []
            for key, entry in self._memory_cache.items():
                if entry.is_expired():
                    expired_keys.append(key)

            for key in expired_keys:
                del self._memory_cache[key]
                removed_count += 1

            # Disk tier: expiry is decided from the index metadata alone.
            if self.enable_disk_cache:
                expired_disk_keys = []
                current_time = time.time()

                for key, info in self._disk_cache_index.items():
                    if info["ttl"] is not None:
                        age = current_time - info["created_at"]
                        if age > info["ttl"]:
                            expired_disk_keys.append(key)

                for key in expired_disk_keys:
                    try:
                        cache_file = self.cache_dir / self._disk_cache_index[key]["file"]
                        if cache_file.exists():
                            cache_file.unlink()
                        del self._disk_cache_index[key]
                        removed_count += 1
                    except Exception as e:
                        print(f"Error removing expired cache file: {e}")

                if expired_disk_keys:
                    self._save_disk_cache_index()

        return removed_count
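
    # Periodic cleanup sketch (a hypothetical helper; the interval is illustrative):
    #
    #     def cleanup_loop(cache: "CacheManager", interval: float = 300.0) -> None:
    #         while True:
    #             time.sleep(interval)
    #             cache.cleanup_expired()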

    def get_size_info(self) -> Dict[str, Any]:
        """Get cache size information."""
        with self._lock:
            memory_size = len(self._memory_cache)
            disk_size = len(self._disk_cache_index)

            # Sum the on-disk cache files; stat() failures are ignored.
            disk_usage = 0
            if self.enable_disk_cache and self.cache_dir.exists():
                try:
                    for cache_file in self.cache_dir.glob("cache_*.pkl"):
                        disk_usage += cache_file.stat().st_size
                except Exception:
                    pass

            return {
                "memory_entries": memory_size,
                "disk_entries": disk_size,
                "disk_usage_bytes": disk_usage,
                "disk_usage_mb": disk_usage / (1024 * 1024),
                "memory_limit": self.memory_cache_size,
            }

    def get_stats(self) -> Dict[str, Any]:
        """Get cache statistics, including derived hit rates."""
        # Copy under the lock so the snapshot is internally consistent.
        with self._lock:
            stats = self.stats.copy()

        total_requests = stats["memory_hits"] + stats["disk_hits"] + stats["misses"]
        if total_requests > 0:
            stats["hit_rate"] = (stats["memory_hits"] + stats["disk_hits"]) / total_requests
            stats["memory_hit_rate"] = stats["memory_hits"] / total_requests
            stats["disk_hit_rate"] = stats["disk_hits"] / total_requests
        else:
            stats["hit_rate"] = 0.0
            stats["memory_hit_rate"] = 0.0
            stats["disk_hit_rate"] = 0.0

        stats["size_info"] = self.get_size_info()

        return stats

    def optimize(self) -> Dict[str, int]:
        """Optimize the cache by removing expired entries."""
        # Compaction beyond expiry cleanup is not implemented.
        expired_removed = self.cleanup_expired()

        return {
            "expired_removed": expired_removed,
        }
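

if __name__ == "__main__":
    # Minimal self-contained demo; everything below is illustrative, and the
    # temporary directory keeps the demo away from any real cache directory.
    import tempfile

    with tempfile.TemporaryDirectory() as tmp:
        cache = CacheManager({"cache": {"cache_dir": tmp, "cache_ttl": 60}})
        cache.set("greeting", "hello")
        print(cache.get("greeting"))          # hello (memory hit)
        cache._memory_cache.clear()           # force the next read to go to disk
        print(cache.get("greeting"))          # hello (disk hit, promoted back to memory)
        print(cache.get_stats()["hit_rate"])  # 1.0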