File size: 12,808 Bytes
11d9dfb
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
"""
Multi-level caching system with memory and disk storage, TTL, and LRU eviction.
"""

import os
import pickle
import time
import threading
from pathlib import Path
from typing import Any, Dict, Optional, Set, Tuple
from collections import OrderedDict
import hashlib
import json

from .error_handler import ResourceError


class CacheEntry:
    """A cached value plus bookkeeping: creation time, last access, hit count and TTL."""

    def __init__(self, value: Any, ttl: Optional[int] = None):
        # Creation and "last accessed" start out as the same instant.
        now = time.time()
        self.value = value
        self.created_at = now
        self.last_accessed = now
        self.access_count = 0
        self.ttl = ttl

    def is_expired(self) -> bool:
        """Return True once more than ``ttl`` seconds have passed since creation.

        An entry whose ``ttl`` is ``None`` never expires.
        """
        return self.ttl is not None and (time.time() - self.created_at) > self.ttl

    def touch(self) -> None:
        """Record one access: bump ``access_count`` and refresh ``last_accessed``."""
        self.access_count += 1
        self.last_accessed = time.time()


class CacheManager:
    """Multi-level cache manager with memory and disk storage.

    Lookups hit an in-memory LRU first and fall back to an optional disk store.

    Configuration is read from ``config["cache"]`` (missing keys use defaults):

    * ``embedding_cache_size`` -- max number of in-memory entries (default 10000).
    * ``cache_ttl`` -- TTL in seconds used when :meth:`set` gets ``ttl=None``
      (default 3600).
    * ``enable_disk_cache`` -- also persist entries to disk (default True).
    * ``cache_dir`` -- disk cache directory, created if missing (default ``./cache``).

    On disk, each key is pickled to ``cache_<md5(key)>.pkl`` and tracked in the
    JSON index ``cache_index.json``. Public methods serialise on an ``RLock``, so
    one instance can be shared between threads; nothing here coordinates several
    processes that point at the same ``cache_dir``.

    Security: disk values are restored with :func:`pickle.load`, which can run
    arbitrary code. Only use a ``cache_dir`` that untrusted users cannot write to.
    """

    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.cache_config = config.get("cache", {})

        # Configuration
        self.memory_cache_size = self.cache_config.get("embedding_cache_size", 10000)
        self.default_ttl = self.cache_config.get("cache_ttl", 3600)
        self.enable_disk_cache = self.cache_config.get("enable_disk_cache", True)
        self.cache_dir = Path(self.cache_config.get("cache_dir", "./cache"))

        # Statistics -- initialised before any disk work so every helper can count errors.
        self.stats = {
            "memory_hits": 0,
            "disk_hits": 0,
            "misses": 0,
            "evictions": 0,
            "disk_writes": 0,
            "disk_reads": 0,
            "errors": 0
        }

        # Memory cache: OrderedDict kept in LRU order (least recently used first).
        self._memory_cache: OrderedDict[str, CacheEntry] = OrderedDict()
        self._lock = threading.RLock()

        # Disk cache index: key -> {"file", "created_at", "ttl"}, persisted as JSON.
        self._disk_cache_index: Dict[str, Dict[str, Any]]
        if self.enable_disk_cache:
            self.cache_dir.mkdir(parents=True, exist_ok=True)
            self._disk_cache_index = self._load_disk_cache_index()
        else:
            self._disk_cache_index = {}

    def get(self, key: str) -> Optional[Any]:
        """Return the cached value for ``key``, or ``None`` on a miss.

        Memory is checked first; an expired memory entry is dropped. On a disk
        hit the value is promoted into memory with the TTL *remaining* on the disk
        entry, so promotion never extends an entry's lifetime. A stored value of
        ``None`` is indistinguishable from a miss (a disk-loaded ``None`` is
        counted as a miss). Disk errors are counted in ``stats["errors"]`` and
        reported, not raised.
        """
        with self._lock:
            entry = self._memory_cache.get(key)
            if entry is not None:
                if entry.is_expired():
                    del self._memory_cache[key]
                else:
                    # Mark as most recently used.
                    self._memory_cache.move_to_end(key)
                    entry.touch()
                    self.stats["memory_hits"] += 1
                    return entry.value

            if self.enable_disk_cache and key in self._disk_cache_index:
                # Snapshot the index entry first: _load_from_disk may drop it.
                info = self._disk_cache_index[key]
                try:
                    value = self._load_from_disk(key)
                    if value is not None:
                        self._set_memory_cache(key, value, self._remaining_ttl(info))
                        self.stats["disk_hits"] += 1
                        return value
                except Exception as e:
                    self.stats["errors"] += 1
                    print(f"Error loading from disk cache: {e}")

            self.stats["misses"] += 1
            return None

    def set(self, key: str, value: Any, ttl: Optional[int] = None) -> None:
        """Store ``value`` under ``key`` in memory and (if enabled) on disk.

        ``ttl=None`` means "use ``default_ttl``", so every entry written through
        this method gets a finite TTL when ``default_ttl`` is finite. Disk
        failures are counted in ``stats["errors"]`` and reported, not raised;
        the value then lives in memory only.
        """
        ttl = ttl if ttl is not None else self.default_ttl

        with self._lock:
            self._set_memory_cache(key, value, ttl)

            if self.enable_disk_cache:
                try:
                    self._save_to_disk(key, value, ttl)
                except Exception as e:
                    self.stats["errors"] += 1
                    print(f"Error saving to disk cache: {e}")

    def _set_memory_cache(self, key: str, value: Any, ttl: Optional[float]) -> None:
        """Insert ``value`` as the most recently used entry and evict LRU overflow.

        Callers in this class invoke it while holding ``self._lock``.
        """
        self._memory_cache[key] = CacheEntry(value, ttl)
        self._memory_cache.move_to_end(key)

        # popitem(last=False) removes the least recently used entry. The emptiness
        # guard stops a non-positive size limit from popping an empty dict.
        while self._memory_cache and len(self._memory_cache) > self.memory_cache_size:
            self._memory_cache.popitem(last=False)
            self.stats["evictions"] += 1

    def delete(self, key: str) -> bool:
        """Remove ``key`` from both tiers; return True if it was present in either."""
        with self._lock:
            removed = self._memory_cache.pop(key, None) is not None

            if self.enable_disk_cache and key in self._disk_cache_index:
                try:
                    self._remove_disk_entry(key)
                    self._save_disk_cache_index()
                    removed = True
                except Exception as e:
                    self.stats["errors"] += 1
                    print(f"Error deleting from disk cache: {e}")

            return removed

    def clear(self) -> None:
        """Drop every memory entry and, if enabled, every ``cache_*.pkl`` file and the index."""
        with self._lock:
            self._memory_cache.clear()

            if self.enable_disk_cache:
                try:
                    for cache_file in self.cache_dir.glob("cache_*.pkl"):
                        cache_file.unlink()
                    self._disk_cache_index.clear()
                    self._save_disk_cache_index()
                except Exception as e:
                    self.stats["errors"] += 1
                    print(f"Error clearing disk cache: {e}")

    def _save_to_disk(self, key: str, value: Any, ttl: Optional[int]) -> None:
        """Pickle ``value`` for ``key`` to disk and record it in the index.

        The file is written atomically (temp file + ``os.replace``). If
        serialising or writing fails, any older disk entry for ``key`` is
        discarded and the exception is re-raised. The old entry would otherwise
        be served once the fresher in-memory value is evicted.
        """
        cache_file = self._get_cache_file_path(key)
        created_at = time.time()
        cache_data = {
            "value": value,
            "created_at": created_at,
            "ttl": ttl
        }

        try:
            payload = pickle.dumps(cache_data, protocol=pickle.HIGHEST_PROTOCOL)
            self._atomic_write(cache_file, payload)
        except Exception:
            self._drop_disk_entry_quietly(key)
            raise

        self._disk_cache_index[key] = {
            "file": cache_file.name,
            "created_at": created_at,
            "ttl": ttl
        }
        self._save_disk_cache_index()
        self.stats["disk_writes"] += 1

    def _load_from_disk(self, key: str) -> Optional[Any]:
        """Return the disk value for ``key``, or ``None`` if absent, expired or unreadable.

        Expired entries are deleted, and index entries whose file is missing
        are dropped. Unreadable (corrupted) files are counted in
        ``stats["errors"]`` and discarded.
        """
        cache_info = self._disk_cache_index.get(key)
        if cache_info is None:
            return None

        if self._is_disk_entry_expired(cache_info, time.time()):
            self.delete(key)
            return None

        # Derive the path from the key rather than trusting the index's "file"
        # field, so an edited index cannot point outside cache_dir.
        cache_file = self._get_cache_file_path(key)

        if not cache_file.exists():
            del self._disk_cache_index[key]
            self._save_disk_cache_index()
            return None

        try:
            with open(cache_file, "rb") as f:
                # SECURITY: pickle.load can execute arbitrary code -- cache_dir must be trusted.
                cache_data = pickle.load(f)
            value = cache_data["value"]
        except Exception:
            self.stats["errors"] += 1
            self._drop_disk_entry_quietly(key)
            return None

        self.stats["disk_reads"] += 1
        return value

    def _remove_disk_entry(self, key: str) -> None:
        """Delete ``key``'s pickle file (if present), then drop it from the in-memory index.

        Does not persist the index. Raises ``OSError`` if the file exists but
        cannot be removed; the index entry is then kept.
        """
        cache_file = self._get_cache_file_path(key)
        if cache_file.exists():
            cache_file.unlink()
        self._disk_cache_index.pop(key, None)

    def _drop_disk_entry_quietly(self, key: str) -> None:
        """Best-effort removal of ``key``'s disk entry; OS errors are ignored.

        The index entry is dropped and persisted even when the file cannot be
        unlinked, so a stale or unreadable file is never served again.
        """
        if self._disk_cache_index.pop(key, None) is None:
            return
        try:
            cache_file = self._get_cache_file_path(key)
            if cache_file.exists():
                cache_file.unlink()
        except OSError:
            pass
        self._save_disk_cache_index()

    @staticmethod
    def _is_disk_entry_expired(info: Dict[str, Any], now: float) -> bool:
        """Return True if index entry ``info`` is past its TTL at time ``now``.

        An entry with no (or a ``None``) ``ttl`` never expires. A missing
        ``created_at`` is treated as the epoch.
        """
        ttl = info.get("ttl")
        return ttl is not None and now - info.get("created_at", 0) > ttl

    @staticmethod
    def _remaining_ttl(info: Dict[str, Any]) -> Optional[float]:
        """Return the seconds left before index entry ``info`` expires (``None`` = never)."""
        ttl = info.get("ttl")
        if ttl is None:
            return None
        age = time.time() - info.get("created_at", 0)
        return max(0.0, ttl - age)

    @staticmethod
    def _atomic_write(path: Path, data: bytes) -> None:
        """Write ``data`` to ``path`` through a sibling temp file and ``os.replace``.

        If writing raises, ``path`` keeps its previous content (or stays absent)
        and the temp file is removed. No ``fsync`` is performed, so durability
        across power loss is not guaranteed.
        """
        # The ".tmp" suffix keeps the temp file out of the "cache_*.pkl" globs.
        tmp_path = path.with_name(f"{path.name}.{os.getpid()}.tmp")
        try:
            with open(tmp_path, "wb") as f:
                f.write(data)
            os.replace(tmp_path, path)
        finally:
            # Only still present if something above failed.
            if tmp_path.exists():
                try:
                    tmp_path.unlink()
                except OSError:
                    pass

    def _get_cache_file_path(self, key: str) -> Path:
        """Return the pickle path for ``key``.

        The key is hashed (MD5) only to get a filesystem-safe filename, not for security.
        """
        key_hash = hashlib.md5(key.encode()).hexdigest()
        return self.cache_dir / f"cache_{key_hash}.pkl"

    def _load_disk_cache_index(self) -> Dict[str, Dict[str, Any]]:
        """Load ``cache_index.json``; return ``{}`` if it is missing, unreadable or malformed.

        Individual entries that are not JSON objects are skipped.
        """
        index_file = self.cache_dir / "cache_index.json"

        if not index_file.exists():
            return {}

        try:
            with open(index_file, "r", encoding="utf-8") as f:
                index = json.load(f)
        except Exception as e:
            print(f"Error loading cache index: {e}")
            return {}

        if not isinstance(index, dict):
            print(f"Error loading cache index: expected a JSON object, got {type(index).__name__}")
            return {}

        return {k: v for k, v in index.items() if isinstance(v, dict)}

    def _save_disk_cache_index(self) -> None:
        """Atomically persist the disk index to ``cache_index.json``.

        Errors are counted in ``stats["errors"]`` and reported, not raised.
        """
        index_file = self.cache_dir / "cache_index.json"

        try:
            payload = json.dumps(self._disk_cache_index, indent=2).encode("utf-8")
            self._atomic_write(index_file, payload)
        except Exception as e:
            self.stats["errors"] += 1
            print(f"Error saving cache index: {e}")

    def cleanup_expired(self) -> int:
        """Purge expired entries from both tiers and return how many were removed.

        A key removed from memory and from disk counts twice. Disk removal
        failures are counted in ``stats["errors"]`` and reported.
        """
        removed_count = 0

        with self._lock:
            expired_keys = [key for key, entry in self._memory_cache.items() if entry.is_expired()]
            for key in expired_keys:
                del self._memory_cache[key]
            removed_count += len(expired_keys)

            if self.enable_disk_cache:
                now = time.time()
                expired_disk_keys = [
                    key
                    for key, info in self._disk_cache_index.items()
                    if self._is_disk_entry_expired(info, now)
                ]

                for key in expired_disk_keys:
                    try:
                        self._remove_disk_entry(key)
                        removed_count += 1
                    except Exception as e:
                        self.stats["errors"] += 1
                        print(f"Error removing expired cache file: {e}")

                if expired_disk_keys:
                    self._save_disk_cache_index()

        return removed_count

    def get_size_info(self) -> Dict[str, Any]:
        """Return entry counts for both tiers and the bytes used by ``cache_*.pkl`` files."""
        with self._lock:
            memory_size = len(self._memory_cache)
            disk_size = len(self._disk_cache_index)

            disk_usage = 0
            if self.enable_disk_cache and self.cache_dir.exists():
                try:
                    for cache_file in self.cache_dir.glob("cache_*.pkl"):
                        try:
                            disk_usage += cache_file.stat().st_size
                        except FileNotFoundError:
                            # Removed between glob() and stat(); it no longer uses space.
                            continue
                except OSError:
                    # Best-effort statistic: report whatever was summed so far.
                    pass

            return {
                "memory_entries": memory_size,
                "disk_entries": disk_size,
                "disk_usage_bytes": disk_usage,
                "disk_usage_mb": disk_usage / (1024 * 1024),
                "memory_limit": self.memory_cache_size
            }

    def get_stats(self) -> Dict[str, Any]:
        """Return a snapshot of the counters plus hit rates and size information."""
        with self._lock:
            stats = self.stats.copy()
            size_info = self.get_size_info()

        total_requests = stats["memory_hits"] + stats["disk_hits"] + stats["misses"]
        if total_requests > 0:
            stats["hit_rate"] = (stats["memory_hits"] + stats["disk_hits"]) / total_requests
            stats["memory_hit_rate"] = stats["memory_hits"] / total_requests
            stats["disk_hit_rate"] = stats["disk_hits"] / total_requests
        else:
            stats["hit_rate"] = 0
            stats["memory_hit_rate"] = 0
            stats["disk_hit_rate"] = 0

        stats["size_info"] = size_info

        return stats

    def optimize(self) -> Dict[str, int]:
        """Remove expired entries; return ``{"expired_removed": <count>}``."""
        expired_removed = self.cleanup_expired()

        # Could add more optimization like defragmenting disk cache

        return {
            "expired_removed": expired_removed
        }