File size: 6,601 Bytes
1049797 6e3dbdb 1049797 6e3dbdb 1049797 6e3dbdb 1049797 6e3dbdb 1049797 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 |
# models/performance_optimizer.py
import functools
import time
import threading
from typing import Dict, Any, Optional
from .logging_config import logger
class PerformanceOptimizer:
    """Thread-safe TTL cache used to memoize expensive verification results.

    All access to the two internal dicts is serialized by one lock, so the
    cache is safe to share across threads.
    """
    def __init__(self):
        self._cache: Dict[str, Any] = {}                # key -> cached value
        self._cache_lock = threading.Lock()             # guards both dicts below
        self._cache_ttl = 300                           # default TTL, seconds (5 minutes)
        self._cache_timestamps: Dict[str, float] = {}   # key -> absolute expiry (epoch seconds)

    def cache_result(self, key: str, result: Any, ttl: Optional[int] = None) -> None:
        """Store *result* under *key*, expiring after *ttl* seconds.

        ``ttl=None`` uses the default TTL.  An explicit ``ttl=0`` now means
        "expire immediately" (the previous ``ttl or default`` form silently
        replaced 0 with the 5-minute default).
        """
        with self._cache_lock:
            self._cache[key] = result
            effective_ttl = self._cache_ttl if ttl is None else ttl
            self._cache_timestamps[key] = time.time() + effective_ttl

    def get_cached_result(self, key: str) -> Optional[Any]:
        """Return the cached value for *key*, or None if absent or expired.

        NOTE: a legitimately cached value of ``None`` is indistinguishable
        from a miss for callers of this method.
        """
        with self._cache_lock:
            if key in self._cache:
                if time.time() < self._cache_timestamps.get(key, 0):
                    return self._cache[key]
                # Expired: evict the value and its timestamp together.
                del self._cache[key]
                self._cache_timestamps.pop(key, None)
            return None

    def clear_cache(self) -> None:
        """Remove every cached entry and its expiry timestamp."""
        with self._cache_lock:
            self._cache.clear()
            self._cache_timestamps.clear()

    def get_cache_stats(self) -> Dict[str, Any]:
        """Return a snapshot of cache size, keys, and the default TTL."""
        with self._cache_lock:
            return {
                'cache_size': len(self._cache),
                'cache_keys': list(self._cache.keys()),
                'cache_ttl': self._cache_ttl,
            }
# Module-level singleton shared by the cached_function decorator and
# get_performance_metrics().
performance_optimizer = PerformanceOptimizer()
def timed_function(func):
    """Decorator that logs how long *func* takes, on success and on failure.

    Exceptions are re-raised after logging; the wrapped signature and return
    value are unchanged.
    """
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # perf_counter is monotonic, so durations are immune to system clock
        # adjustments (time.time can jump backwards under NTP).
        start_time = time.perf_counter()
        try:
            result = func(*args, **kwargs)
        except Exception as e:
            execution_time = time.perf_counter() - start_time
            logger.error(f"{func.__name__} failed after {execution_time:.2f} seconds: {str(e)}")
            raise
        execution_time = time.perf_counter() - start_time
        logger.info(f"{func.__name__} executed in {execution_time:.2f} seconds")
        return result
    return wrapper
def cached_function(ttl: int = 300):
    """Decorator factory that memoizes results in the global TTL cache.

    Values are stored wrapped in a 1-tuple so that functions which
    legitimately return None are also cached — previously a cached None
    came back as a miss and the function re-executed on every call.  The
    wrapper format is private to this decorator.

    NOTE(review): the cache key hashes str(args); arguments whose repr
    embeds a memory address (default object repr) will defeat the cache —
    confirm callers pass value-like arguments.
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Key on function name plus a hash of the stringified arguments.
            cache_key = f"{func.__name__}:{hash(str(args) + str(sorted(kwargs.items())))}"
            entry = performance_optimizer.get_cached_result(cache_key)
            if entry is not None:
                logger.debug(f"Cache hit for {func.__name__}")
                return entry[0]
            # Miss: execute and cache the (wrapped) result.
            result = func(*args, **kwargs)
            performance_optimizer.cache_result(cache_key, (result,), ttl)
            logger.debug(f"Cached result for {func.__name__}")
            return result
        return wrapper
    return decorator
def optimize_model_loading():
    """Pre-load the most important NLP models on background threads.

    Best-effort warm-up: every failure (including the 30-second overall
    timeout) is logged and swallowed so application startup never breaks
    because a model could not be pre-loaded.
    """
    try:
        import concurrent.futures

        from .model_loader import load_model

        def load_model_async(model_name):
            # Worker body: load one model, logging (not raising) on failure.
            try:
                model = load_model(model_name)
                logger.info(f"Pre-loaded model: {model_name}")
                return model
            except Exception as e:
                logger.warning(f"Failed to pre-load model {model_name}: {str(e)}")
                return None

        # Only the two most valuable pipelines are warmed, and the pool is
        # kept small to limit memory pressure during startup.
        model_names = [
            "zero-shot-classification",  # Most important
            "summarization",             # Second most important
        ]
        with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
            futures = {executor.submit(load_model_async, name): name for name in model_names}
            # A timeout here raises concurrent.futures.TimeoutError, which is
            # handled (logged) by the outer except.
            for future in concurrent.futures.as_completed(futures, timeout=30):
                model_name = futures[future]
                try:
                    future.result()
                except Exception as e:
                    logger.error(f"Error pre-loading {model_name}: {str(e)}")
        logger.info("Model pre-loading optimization completed")
    except Exception as e:
        logger.error(f"Error in model loading optimization: {str(e)}")
def optimize_image_processing():
    """Configure PIL for large images and return a downscaling helper.

    Returns a callable ``optimize_image(img, max_size=1024)`` that shrinks
    *img* in place (via thumbnail) when its longest side exceeds *max_size*.
    If PIL setup fails, a no-op passthrough with the same signature is
    returned so callers never have to special-case the failure.
    """
    try:
        from PIL import Image

        # WARNING(review): disabling the pixel limit removes Pillow's
        # decompression-bomb protection — only safe if images are trusted
        # or size-checked upstream; confirm before shipping.
        Image.MAX_IMAGE_PIXELS = None

        def optimize_image(img, max_size=1024):
            """Downscale *img* so its longest side is at most *max_size* px."""
            if max(img.size) > max_size:
                img.thumbnail((max_size, max_size), Image.Resampling.LANCZOS)
            return img

        return optimize_image
    except Exception as e:
        logger.error(f"Error in image processing optimization: {str(e)}")
        return lambda img, max_size=1024: img
def get_performance_metrics():
    """Collect a snapshot of process memory, CPU, cache, and thread metrics.

    On any psutil failure a zeroed dict with the same keys is returned so
    callers can consume the result unconditionally.
    """
    import psutil
    import os
    try:
        proc = psutil.Process(os.getpid())
        rss_bytes = proc.memory_info().rss
        snapshot = {
            'memory_usage_mb': rss_bytes / 1024 / 1024,
            'cpu_percent': proc.cpu_percent(),
            'cache_stats': performance_optimizer.get_cache_stats(),
            'thread_count': threading.active_count(),
        }
        return snapshot
    except Exception as e:
        logger.error(f"Error getting performance metrics: {str(e)}")
        # Zeroed fallback keeps downstream consumers working.
        return {
            'memory_usage_mb': 0,
            'cpu_percent': 0,
            'cache_stats': {},
            'thread_count': 0,
        }