"""
threshold_optimizer.py - Precision Threshold Optimization Module
Finds optimal detection and ReID thresholds with fine-grained tuning
"""
import numpy as np
from typing import Dict, Tuple, List, Optional
from dataclasses import dataclass
from collections import defaultdict
import json
@dataclass
class OptimizationMetrics:
"""Metrics for threshold evaluation"""
threshold: float
true_positives: int = 0
false_positives: int = 0
false_negatives: int = 0
precision: float = 0.0
recall: float = 0.0
f1_score: float = 0.0
unique_dogs: int = 0
total_matches: int = 0
avg_confidence: float = 0.0
class ThresholdOptimizer:
"""
Finds optimal thresholds with fine-grained precision
Handles narrow optimal ranges (like 76-78%)
"""
def __init__(self):
# Store performance metrics for different thresholds
self.reid_metrics: Dict[float, OptimizationMetrics] = {}
self.detection_metrics: Dict[float, OptimizationMetrics] = {}
# Track actual performance data
self.reid_samples = [] # (similarity, was_correct_match)
self.detection_samples = [] # (confidence, was_valid_detection)
# Optimal thresholds found
self.optimal_reid_threshold = 0.7
self.optimal_detection_threshold = 0.45
# Fine-grained search parameters
self.reid_search_range = (0.65, 0.85) # Focus on typical good range
self.reid_search_step = 0.005 # 0.5% steps for precision
self.detection_search_range = (0.3, 0.7)
self.detection_search_step = 0.01
def add_reid_sample(self,
similarity: float,
matched_dog_id: int,
true_dog_id: Optional[int] = None,
auto_label: bool = True):
"""
Add a ReID decision sample
Args:
similarity: Similarity score
matched_dog_id: Dog ID that was matched
true_dog_id: Ground truth dog ID (if known)
auto_label: Auto-label based on similarity distribution
"""
if true_dog_id is not None:
was_correct = (matched_dog_id == true_dog_id)
elif auto_label:
            # Auto-labeling heuristic: treat extreme similarities as reliable
            # labels (very high => correct match, very low => correct rejection)
            was_correct = similarity > 0.85 or similarity < 0.4
else:
was_correct = None
self.reid_samples.append({
'similarity': similarity,
'matched_id': matched_dog_id,
'correct': was_correct,
'timestamp': len(self.reid_samples)
})
# Trigger optimization if enough samples
if len(self.reid_samples) % 50 == 0:
self.optimize_reid_threshold()
def add_detection_sample(self, confidence: float, was_valid: bool = True):
"""Add a detection sample"""
self.detection_samples.append({
'confidence': confidence,
'valid': was_valid,
'timestamp': len(self.detection_samples)
})
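    # Illustrative sketch (assumption, analogous to optimize_reid_threshold below):
    # tune the detection confidence threshold with a simple F1 grid search over
    # the recorded detection samples.
    def optimize_detection_threshold(self) -> float:
        """Find the detection threshold that maximizes F1 over recorded samples (sketch)"""
        if len(self.detection_samples) < 20:
            return self.optimal_detection_threshold
        best_f1 = -1.0
        best_threshold = self.optimal_detection_threshold
        thresholds = np.arange(self.detection_search_range[0],
                               self.detection_search_range[1],
                               self.detection_search_step)
        for threshold in thresholds:
            # Detections at or above the threshold are kept; 'valid' is the label
            tp = sum(1 for s in self.detection_samples
                     if s['confidence'] >= threshold and s['valid'])
            fp = sum(1 for s in self.detection_samples
                     if s['confidence'] >= threshold and not s['valid'])
            fn = sum(1 for s in self.detection_samples
                     if s['confidence'] < threshold and s['valid'])
            precision = tp / (tp + fp) if (tp + fp) > 0 else 0.0
            recall = tp / (tp + fn) if (tp + fn) > 0 else 0.0
            f1 = 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0.0
            if f1 > best_f1:
                best_f1 = f1
                best_threshold = threshold
        self.optimal_detection_threshold = best_threshold
        return best_threshold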
def optimize_reid_threshold(self) -> float:
"""
Find optimal ReID threshold with fine precision
"""
if len(self.reid_samples) < 20:
return self.optimal_reid_threshold
# Extract similarities
similarities = [s['similarity'] for s in self.reid_samples]
# Dynamic range adjustment based on data
data_min = np.percentile(similarities, 5)
data_max = np.percentile(similarities, 95)
# Focus search on relevant range
search_min = max(self.reid_search_range[0], data_min)
search_max = min(self.reid_search_range[1], data_max)
# Fine-grained grid search
thresholds = np.arange(search_min, search_max, self.reid_search_step)
best_score = -1
best_threshold = self.optimal_reid_threshold
best_metrics = None
for threshold in thresholds:
metrics = self._evaluate_reid_threshold(threshold)
score = self._calculate_optimization_score(metrics)
if score > best_score:
best_score = score
best_threshold = threshold
best_metrics = metrics
# Check if we're in a narrow optimal range
if best_metrics:
narrow_range = self._find_narrow_optimal_range(thresholds, best_threshold)
if narrow_range:
# Use center of narrow range for stability
best_threshold = (narrow_range[0] + narrow_range[1]) / 2
self.optimal_reid_threshold = best_threshold
return best_threshold
def _evaluate_reid_threshold(self, threshold: float) -> OptimizationMetrics:
"""Evaluate performance at specific threshold"""
metrics = OptimizationMetrics(threshold=threshold)
# Group samples by time windows to evaluate consistency
window_size = 20
dog_assignments = defaultdict(list)
for i, sample in enumerate(self.reid_samples):
window_idx = i // window_size
sim = sample['similarity']
# Would this be a match at this threshold?
would_match = sim >= threshold
if would_match:
dog_assignments[window_idx].append(sample['matched_id'])
metrics.total_matches += 1
# Estimate correctness
if sample['correct'] is not None:
if sample['correct']:
metrics.true_positives += 1
else:
metrics.false_positives += 1
                elif sim > threshold + 0.1:  # Well above threshold: assume a true match
                    metrics.true_positives += 1
                elif sim < threshold + 0.02:  # Barely above threshold: assume spurious
                    metrics.false_positives += 1
            else:
                # Rejected at this threshold: a known-correct match would be missed here
                if sample['correct']:
                    metrics.false_negatives += 1
# Calculate unique dogs and fragmentation
all_dogs = set()
for dogs in dog_assignments.values():
all_dogs.update(dogs)
metrics.unique_dogs = len(all_dogs)
# Calculate precision/recall/F1
if metrics.true_positives + metrics.false_positives > 0:
metrics.precision = metrics.true_positives / (metrics.true_positives + metrics.false_positives)
if metrics.true_positives + metrics.false_negatives > 0:
metrics.recall = metrics.true_positives / (metrics.true_positives + metrics.false_negatives)
if metrics.precision + metrics.recall > 0:
metrics.f1_score = 2 * (metrics.precision * metrics.recall) / (metrics.precision + metrics.recall)
# Average confidence of matches
match_sims = [s['similarity'] for s in self.reid_samples if s['similarity'] >= threshold]
metrics.avg_confidence = np.mean(match_sims) if match_sims else 0
return metrics
def _calculate_optimization_score(self, metrics: OptimizationMetrics) -> float:
"""Custom scoring for narrow-margin optimization"""
# Balance between precision and recall with emphasis on precision
base_score = (metrics.f1_score * 0.4 +
metrics.precision * 0.4 +
metrics.recall * 0.2)
# Penalty for too many unique dogs (over-segmentation)
expected_dogs = len(self.reid_samples) / 50 # Rough estimate
if metrics.unique_dogs > expected_dogs * 1.5:
base_score *= 0.9
# Bonus for high average confidence
if metrics.avg_confidence > 0.8:
base_score *= 1.1
# Penalty for being too close to boundaries (unstable)
if abs(metrics.threshold - 0.5) > 0.4: # Too extreme
base_score *= 0.95
return base_score
def _find_narrow_optimal_range(self,
thresholds: np.ndarray,
best_threshold: float,
tolerance: float = 0.02) -> Optional[Tuple[float, float]]:
"""Detect if optimal performance is in a narrow range"""
best_score = self._calculate_optimization_score(
self._evaluate_reid_threshold(best_threshold)
)
# Find range where score is within tolerance of best
min_threshold = best_threshold
max_threshold = best_threshold
for t in thresholds:
score = self._calculate_optimization_score(
self._evaluate_reid_threshold(t)
)
if score >= best_score * (1 - tolerance):
min_threshold = min(min_threshold, t)
max_threshold = max(max_threshold, t)
# Check if range is narrow (< 5% difference)
if max_threshold - min_threshold < 0.05:
return (min_threshold, max_threshold)
return None
def get_optimization_report(self) -> Dict:
"""Get detailed optimization report"""
# Analyze similarity distribution
if self.reid_samples:
similarities = [s['similarity'] for s in self.reid_samples]
# Detect bimodal distribution
hist, bins = np.histogram(similarities, bins=30)
peaks = []
for i in range(1, len(hist)-1):
if hist[i] > hist[i-1] and hist[i] > hist[i+1]:
peaks.append(bins[i])
            # Approximate the valley between the first two peaks by their midpoint
            valley = None
            if len(peaks) >= 2:
                valley = (peaks[0] + peaks[1]) / 2
else:
similarities = []
peaks = []
valley = None
return {
'optimal_reid_threshold': self.optimal_reid_threshold,
'optimal_detection_threshold': self.optimal_detection_threshold,
'reid_samples': len(self.reid_samples),
'detection_samples': len(self.detection_samples),
'similarity_distribution': {
'min': min(similarities) if similarities else 0,
'max': max(similarities) if similarities else 0,
'mean': np.mean(similarities) if similarities else 0,
'std': np.std(similarities) if similarities else 0,
'percentiles': {
'25%': np.percentile(similarities, 25) if similarities else 0,
'50%': np.percentile(similarities, 50) if similarities else 0,
'75%': np.percentile(similarities, 75) if similarities else 0,
'90%': np.percentile(similarities, 90) if similarities else 0
}
},
'distribution_analysis': {
'peaks': peaks,
'valley': valley,
'bimodal': len(peaks) >= 2
},
'current_performance': self._evaluate_reid_threshold(self.optimal_reid_threshold).__dict__ if self.reid_samples else None
}
def suggest_threshold_adjustment(self) -> str:
"""Provide human-readable threshold suggestions"""
report = self.get_optimization_report()
suggestions = []
# Check if threshold is in narrow range
if self.reid_samples:
narrow_range = self._find_narrow_optimal_range(
np.arange(0.65, 0.85, 0.005),
self.optimal_reid_threshold
)
if narrow_range:
suggestions.append(
f"✓ Optimal ReID threshold is in narrow range: "
f"{narrow_range[0]:.1%} - {narrow_range[1]:.1%}. "
f"Using {self.optimal_reid_threshold:.1%}"
)
else:
suggestions.append(
f"ReID threshold: {self.optimal_reid_threshold:.1%} "
f"(clear optimum found)"
)
# Check distribution
if report['distribution_analysis']['bimodal']:
valley = report['distribution_analysis']['valley']
if valley:
if abs(valley - self.optimal_reid_threshold) > 0.05:
suggestions.append(
f"⚠ Natural separation point at {valley:.1%} "
f"differs from optimal {self.optimal_reid_threshold:.1%}"
)
# Performance insights
if report['current_performance']:
perf = report['current_performance']
if perf['precision'] < 0.8:
suggestions.append(
f"⚠ Precision is {perf['precision']:.1%}. "
f"Consider increasing threshold slightly."
)
if perf['recall'] < 0.7:
suggestions.append(
f"⚠ Recall is {perf['recall']:.1%}. "
f"Consider decreasing threshold slightly."
)
return "\n".join(suggestions) if suggestions else "Collecting data..." |