""" threshold_optimizer.py - Precision Threshold Optimization Module Finds optimal detection and ReID thresholds with fine-grained tuning """ import numpy as np from typing import Dict, Tuple, List, Optional from dataclasses import dataclass from collections import defaultdict import json @dataclass class OptimizationMetrics: """Metrics for threshold evaluation""" threshold: float true_positives: int = 0 false_positives: int = 0 false_negatives: int = 0 precision: float = 0.0 recall: float = 0.0 f1_score: float = 0.0 unique_dogs: int = 0 total_matches: int = 0 avg_confidence: float = 0.0 class ThresholdOptimizer: """ Finds optimal thresholds with fine-grained precision Handles narrow optimal ranges (like 76-78%) """ def __init__(self): # Store performance metrics for different thresholds self.reid_metrics: Dict[float, OptimizationMetrics] = {} self.detection_metrics: Dict[float, OptimizationMetrics] = {} # Track actual performance data self.reid_samples = [] # (similarity, was_correct_match) self.detection_samples = [] # (confidence, was_valid_detection) # Optimal thresholds found self.optimal_reid_threshold = 0.7 self.optimal_detection_threshold = 0.45 # Fine-grained search parameters self.reid_search_range = (0.65, 0.85) # Focus on typical good range self.reid_search_step = 0.005 # 0.5% steps for precision self.detection_search_range = (0.3, 0.7) self.detection_search_step = 0.01 def add_reid_sample(self, similarity: float, matched_dog_id: int, true_dog_id: Optional[int] = None, auto_label: bool = True): """ Add a ReID decision sample Args: similarity: Similarity score matched_dog_id: Dog ID that was matched true_dog_id: Ground truth dog ID (if known) auto_label: Auto-label based on similarity distribution """ if true_dog_id is not None: was_correct = (matched_dog_id == true_dog_id) elif auto_label: # Auto-labeling: very high/low similarities are likely correct was_correct = similarity > 0.85 or similarity < 0.4 else: was_correct = None self.reid_samples.append({ 'similarity': similarity, 'matched_id': matched_dog_id, 'correct': was_correct, 'timestamp': len(self.reid_samples) }) # Trigger optimization if enough samples if len(self.reid_samples) % 50 == 0: self.optimize_reid_threshold() def add_detection_sample(self, confidence: float, was_valid: bool = True): """Add a detection sample""" self.detection_samples.append({ 'confidence': confidence, 'valid': was_valid, 'timestamp': len(self.detection_samples) }) def optimize_reid_threshold(self) -> float: """ Find optimal ReID threshold with fine precision """ if len(self.reid_samples) < 20: return self.optimal_reid_threshold # Extract similarities similarities = [s['similarity'] for s in self.reid_samples] # Dynamic range adjustment based on data data_min = np.percentile(similarities, 5) data_max = np.percentile(similarities, 95) # Focus search on relevant range search_min = max(self.reid_search_range[0], data_min) search_max = min(self.reid_search_range[1], data_max) # Fine-grained grid search thresholds = np.arange(search_min, search_max, self.reid_search_step) best_score = -1 best_threshold = self.optimal_reid_threshold best_metrics = None for threshold in thresholds: metrics = self._evaluate_reid_threshold(threshold) score = self._calculate_optimization_score(metrics) if score > best_score: best_score = score best_threshold = threshold best_metrics = metrics # Check if we're in a narrow optimal range if best_metrics: narrow_range = self._find_narrow_optimal_range(thresholds, best_threshold) if narrow_range: # Use center of narrow range 
                best_threshold = (narrow_range[0] + narrow_range[1]) / 2

        self.optimal_reid_threshold = best_threshold
        return best_threshold

    def _evaluate_reid_threshold(self, threshold: float) -> OptimizationMetrics:
        """Evaluate performance at a specific threshold."""
        metrics = OptimizationMetrics(threshold=threshold)

        # Group samples by time windows to evaluate consistency
        window_size = 20
        dog_assignments = defaultdict(list)

        for i, sample in enumerate(self.reid_samples):
            window_idx = i // window_size
            sim = sample['similarity']

            # Would this be a match at this threshold?
            would_match = sim >= threshold

            if would_match:
                dog_assignments[window_idx].append(sample['matched_id'])
                metrics.total_matches += 1

                # Estimate correctness
                if sample['correct'] is not None:
                    if sample['correct']:
                        metrics.true_positives += 1
                    else:
                        metrics.false_positives += 1
                elif sim > threshold + 0.1:
                    # Unlabeled but well above threshold: assume true positive
                    metrics.true_positives += 1
                elif sim < threshold + 0.02:
                    # Unlabeled and borderline: assume false positive
                    metrics.false_positives += 1
            else:
                # Rejected at this threshold; a known-correct match that gets
                # rejected counts as a false negative
                if sample['correct'] is True:
                    metrics.false_negatives += 1

        # Calculate unique dogs and fragmentation
        all_dogs = set()
        for dogs in dog_assignments.values():
            all_dogs.update(dogs)
        metrics.unique_dogs = len(all_dogs)

        # Calculate precision/recall/F1
        if metrics.true_positives + metrics.false_positives > 0:
            metrics.precision = metrics.true_positives / (metrics.true_positives + metrics.false_positives)
        if metrics.true_positives + metrics.false_negatives > 0:
            metrics.recall = metrics.true_positives / (metrics.true_positives + metrics.false_negatives)
        if metrics.precision + metrics.recall > 0:
            metrics.f1_score = 2 * (metrics.precision * metrics.recall) / (metrics.precision + metrics.recall)

        # Average confidence of matches
        match_sims = [s['similarity'] for s in self.reid_samples if s['similarity'] >= threshold]
        metrics.avg_confidence = np.mean(match_sims) if match_sims else 0

        return metrics

    def _calculate_optimization_score(self, metrics: OptimizationMetrics) -> float:
        """Custom scoring for narrow-margin optimization."""
        # Balance between precision and recall with emphasis on precision
        base_score = (metrics.f1_score * 0.4 +
                      metrics.precision * 0.4 +
                      metrics.recall * 0.2)

        # Penalty for too many unique dogs (over-segmentation)
        expected_dogs = len(self.reid_samples) / 50  # Rough estimate
        if metrics.unique_dogs > expected_dogs * 1.5:
            base_score *= 0.9

        # Bonus for high average confidence
        if metrics.avg_confidence > 0.8:
            base_score *= 1.1

        # Penalty for being too close to boundaries (unstable)
        if abs(metrics.threshold - 0.5) > 0.4:  # Too extreme
            base_score *= 0.95

        return base_score

    def _find_narrow_optimal_range(self, thresholds: np.ndarray,
                                   best_threshold: float,
                                   tolerance: float = 0.02) -> Optional[Tuple[float, float]]:
        """Detect if optimal performance is in a narrow range."""
        best_score = self._calculate_optimization_score(
            self._evaluate_reid_threshold(best_threshold)
        )

        # Find range where score is within tolerance of best
        min_threshold = best_threshold
        max_threshold = best_threshold

        for t in thresholds:
            score = self._calculate_optimization_score(
                self._evaluate_reid_threshold(t)
            )
            if score >= best_score * (1 - tolerance):
                min_threshold = min(min_threshold, t)
                max_threshold = max(max_threshold, t)

        # Check if range is narrow (< 5% difference)
        if max_threshold - min_threshold < 0.05:
            return (min_threshold, max_threshold)

        return None

    def get_optimization_report(self) -> Dict:
        """Get a detailed optimization report."""
        # Analyze similarity distribution
        if self.reid_samples:
            similarities = [s['similarity'] for s in self.reid_samples]

            # Detect bimodal distribution
            hist, bins = np.histogram(similarities, bins=30)
            peaks = []
            for i in range(1, len(hist) - 1):
                if hist[i] > hist[i - 1] and hist[i] > hist[i + 1]:
                    peaks.append(bins[i])

            # Find valley between peaks
            valley = None
            if len(peaks) >= 2:
                valley = (peaks[0] + peaks[1]) / 2
        else:
            similarities = []
            peaks = []
            valley = None

        return {
            'optimal_reid_threshold': self.optimal_reid_threshold,
            'optimal_detection_threshold': self.optimal_detection_threshold,
            'reid_samples': len(self.reid_samples),
            'detection_samples': len(self.detection_samples),
            'similarity_distribution': {
                'min': min(similarities) if similarities else 0,
                'max': max(similarities) if similarities else 0,
                'mean': np.mean(similarities) if similarities else 0,
                'std': np.std(similarities) if similarities else 0,
                'percentiles': {
                    '25%': np.percentile(similarities, 25) if similarities else 0,
                    '50%': np.percentile(similarities, 50) if similarities else 0,
                    '75%': np.percentile(similarities, 75) if similarities else 0,
                    '90%': np.percentile(similarities, 90) if similarities else 0
                }
            },
            'distribution_analysis': {
                'peaks': peaks,
                'valley': valley,
                'bimodal': len(peaks) >= 2
            },
            'current_performance': self._evaluate_reid_threshold(
                self.optimal_reid_threshold
            ).__dict__ if self.reid_samples else None
        }

    def suggest_threshold_adjustment(self) -> str:
        """Provide human-readable threshold suggestions."""
        report = self.get_optimization_report()
        suggestions = []

        # Check if threshold is in a narrow range
        if self.reid_samples:
            narrow_range = self._find_narrow_optimal_range(
                np.arange(0.65, 0.85, 0.005),
                self.optimal_reid_threshold
            )
            if narrow_range:
                suggestions.append(
                    f"✓ Optimal ReID threshold is in narrow range: "
                    f"{narrow_range[0]:.1%} - {narrow_range[1]:.1%}. "
                    f"Using {self.optimal_reid_threshold:.1%}"
                )
            else:
                suggestions.append(
                    f"ReID threshold: {self.optimal_reid_threshold:.1%} "
                    f"(clear optimum found)"
                )

        # Check distribution
        if report['distribution_analysis']['bimodal']:
            valley = report['distribution_analysis']['valley']
            if valley:
                if abs(valley - self.optimal_reid_threshold) > 0.05:
                    suggestions.append(
                        f"⚠ Natural separation point at {valley:.1%} "
                        f"differs from optimal {self.optimal_reid_threshold:.1%}"
                    )

        # Performance insights
        if report['current_performance']:
            perf = report['current_performance']
            if perf['precision'] < 0.8:
                suggestions.append(
                    f"⚠ Precision is {perf['precision']:.1%}. "
                    f"Consider increasing threshold slightly."
                )
            if perf['recall'] < 0.7:
                suggestions.append(
                    f"⚠ Recall is {perf['recall']:.1%}. "
                    f"Consider decreasing threshold slightly."
                )

        return "\n".join(suggestions) if suggestions else "Collecting data..."
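

# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative only): feeds synthetic similarity scores
# into the optimizer and prints the resulting threshold and suggestions. The
# dog IDs and score distributions below are made up for demonstration; real
# callers would pass similarities produced by their ReID model.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    rng = np.random.default_rng(42)
    optimizer = ThresholdOptimizer()

    # Simulate 100 ReID decisions: confident matches around 0.88 and
    # lower-similarity matches around 0.62
    for _ in range(50):
        optimizer.add_reid_sample(similarity=float(rng.normal(0.88, 0.03)),
                                  matched_dog_id=1)
        optimizer.add_reid_sample(similarity=float(rng.normal(0.62, 0.05)),
                                  matched_dog_id=2)

    optimizer.optimize_reid_threshold()
    print(f"Optimal ReID threshold: {optimizer.optimal_reid_threshold:.3f}")
    print(optimizer.suggest_threshold_adjustment())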