| """ | |
| threshold_optimizer.py - Precision Threshold Optimization Module | |
| Finds optimal detection and ReID thresholds with fine-grained tuning | |
| """ | |
| import numpy as np | |
| from typing import Dict, Tuple, List, Optional | |
| from dataclasses import dataclass | |
| from collections import defaultdict | |
| import json | |


@dataclass
class OptimizationMetrics:
    """Metrics for threshold evaluation"""
    threshold: float
    true_positives: int = 0
    false_positives: int = 0
    false_negatives: int = 0
    precision: float = 0.0
    recall: float = 0.0
    f1_score: float = 0.0
    unique_dogs: int = 0
    total_matches: int = 0
    avg_confidence: float = 0.0


class ThresholdOptimizer:
    """
    Finds optimal thresholds with fine-grained precision.
    Handles narrow optimal ranges (e.g. 76-78%).
    """

    def __init__(self):
        # Store performance metrics for different thresholds
        self.reid_metrics: Dict[float, OptimizationMetrics] = {}
        self.detection_metrics: Dict[float, OptimizationMetrics] = {}

        # Track actual performance data as dicts
        self.reid_samples: List[Dict] = []       # keys: similarity, matched_id, correct, timestamp
        self.detection_samples: List[Dict] = []  # keys: confidence, valid, timestamp

        # Optimal thresholds found so far
        self.optimal_reid_threshold = 0.7
        self.optimal_detection_threshold = 0.45

        # Fine-grained search parameters
        self.reid_search_range = (0.65, 0.85)  # focus on the typical good range
        self.reid_search_step = 0.005          # 0.5% steps for precision
        self.detection_search_range = (0.3, 0.7)
        self.detection_search_step = 0.01

    def add_reid_sample(self,
                        similarity: float,
                        matched_dog_id: int,
                        true_dog_id: Optional[int] = None,
                        auto_label: bool = True):
        """
        Add a ReID decision sample.

        Args:
            similarity: Similarity score of the match
            matched_dog_id: Dog ID that was matched
            true_dog_id: Ground-truth dog ID (if known)
            auto_label: Auto-label correctness from the similarity value
        """
        if true_dog_id is not None:
            was_correct = (matched_dog_id == true_dog_id)
        elif auto_label:
            # Auto-labeling: very high and very low similarities are likely
            # correct decisions; mid-range similarities are uncertain
            was_correct = similarity > 0.85 or similarity < 0.4
        else:
            was_correct = None

        self.reid_samples.append({
            'similarity': similarity,
            'matched_id': matched_dog_id,
            'correct': was_correct,
            'timestamp': len(self.reid_samples)
        })

        # Trigger optimization once enough samples have accumulated
        if len(self.reid_samples) % 50 == 0:
            self.optimize_reid_threshold()

    def add_detection_sample(self, confidence: float, was_valid: bool = True):
        """Add a detection sample"""
        self.detection_samples.append({
            'confidence': confidence,
            'valid': was_valid,
            'timestamp': len(self.detection_samples)
        })
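
    # The module docstring promises detection-threshold tuning as well, but no
    # optimizer for it appears in this file. The method below is a minimal,
    # hypothetical sketch (not from the original source): it assumes detection
    # samples carry the boolean 'valid' label recorded above and simply
    # maximizes F1 over the configured search grid.
    def optimize_detection_threshold(self) -> float:
        """Sketch: grid-search the detection threshold on labeled samples."""
        if len(self.detection_samples) < 20:
            return self.optimal_detection_threshold

        best_f1 = -1.0
        best_threshold = self.optimal_detection_threshold
        thresholds = np.arange(self.detection_search_range[0],
                               self.detection_search_range[1],
                               self.detection_search_step)

        for threshold in thresholds:
            # A detection is kept if its confidence clears the threshold
            tp = sum(s['confidence'] >= threshold and s['valid']
                     for s in self.detection_samples)
            fp = sum(s['confidence'] >= threshold and not s['valid']
                     for s in self.detection_samples)
            fn = sum(s['confidence'] < threshold and s['valid']
                     for s in self.detection_samples)

            precision = tp / (tp + fp) if tp + fp else 0.0
            recall = tp / (tp + fn) if tp + fn else 0.0
            f1 = (2 * precision * recall / (precision + recall)
                  if precision + recall else 0.0)

            if f1 > best_f1:
                best_f1 = f1
                best_threshold = float(threshold)

        self.optimal_detection_threshold = best_threshold
        return best_threshold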

    def optimize_reid_threshold(self) -> float:
        """
        Find the optimal ReID threshold with fine precision
        """
        if len(self.reid_samples) < 20:
            return self.optimal_reid_threshold

        # Extract similarities
        similarities = [s['similarity'] for s in self.reid_samples]

        # Dynamic range adjustment based on the data
        data_min = np.percentile(similarities, 5)
        data_max = np.percentile(similarities, 95)

        # Focus the search on the relevant range
        search_min = max(self.reid_search_range[0], data_min)
        search_max = min(self.reid_search_range[1], data_max)

        # Fine-grained grid search
        thresholds = np.arange(search_min, search_max, self.reid_search_step)

        best_score = -1
        best_threshold = self.optimal_reid_threshold
        best_metrics = None

        for threshold in thresholds:
            metrics = self._evaluate_reid_threshold(threshold)
            score = self._calculate_optimization_score(metrics)
            if score > best_score:
                best_score = score
                best_threshold = threshold
                best_metrics = metrics

        # Check whether the optimum sits in a narrow range
        if best_metrics:
            narrow_range = self._find_narrow_optimal_range(thresholds, best_threshold)
            if narrow_range:
                # Use the center of the narrow range for stability
                best_threshold = (narrow_range[0] + narrow_range[1]) / 2

        self.optimal_reid_threshold = best_threshold
        return best_threshold

    def _evaluate_reid_threshold(self, threshold: float) -> OptimizationMetrics:
        """Evaluate performance at a specific threshold"""
        metrics = OptimizationMetrics(threshold=threshold)

        # Group samples into time windows to evaluate consistency
        window_size = 20
        dog_assignments = defaultdict(list)

        for i, sample in enumerate(self.reid_samples):
            window_idx = i // window_size
            sim = sample['similarity']

            # Would this be a match at this threshold?
            would_match = sim >= threshold

            if would_match:
                dog_assignments[window_idx].append(sample['matched_id'])
                metrics.total_matches += 1

                # Estimate correctness
                if sample['correct'] is not None:
                    if sample['correct']:
                        metrics.true_positives += 1
                    else:
                        metrics.false_positives += 1
                elif sim > threshold + 0.1:
                    # Well above the threshold: assume a confident, correct match
                    metrics.true_positives += 1
                elif sim < threshold + 0.02:
                    # Barely above the threshold: treat as a likely false positive
                    metrics.false_positives += 1
            else:
                # Rejected sample labeled incorrect: under the auto-labeling
                # heuristic this is a mid-range similarity, so count it as a
                # potentially missed match (false negative)
                if sample['correct'] is False:
                    metrics.false_negatives += 1

        # Count unique dogs (a proxy for identity fragmentation)
        all_dogs = set()
        for dogs in dog_assignments.values():
            all_dogs.update(dogs)
        metrics.unique_dogs = len(all_dogs)

        # Calculate precision / recall / F1
        if metrics.true_positives + metrics.false_positives > 0:
            metrics.precision = metrics.true_positives / (metrics.true_positives + metrics.false_positives)
        if metrics.true_positives + metrics.false_negatives > 0:
            metrics.recall = metrics.true_positives / (metrics.true_positives + metrics.false_negatives)
        if metrics.precision + metrics.recall > 0:
            metrics.f1_score = 2 * (metrics.precision * metrics.recall) / (metrics.precision + metrics.recall)

        # Average confidence of accepted matches
        match_sims = [s['similarity'] for s in self.reid_samples if s['similarity'] >= threshold]
        metrics.avg_confidence = np.mean(match_sims) if match_sims else 0

        return metrics

    def _calculate_optimization_score(self, metrics: OptimizationMetrics) -> float:
        """Custom scoring for narrow-margin optimization"""
        # Balance precision and recall, with emphasis on precision
        base_score = (metrics.f1_score * 0.4 +
                      metrics.precision * 0.4 +
                      metrics.recall * 0.2)

        # Penalty for too many unique dogs (over-segmentation)
        expected_dogs = len(self.reid_samples) / 50  # rough estimate
        if metrics.unique_dogs > expected_dogs * 1.5:
            base_score *= 0.9

        # Bonus for high average confidence
        if metrics.avg_confidence > 0.8:
            base_score *= 1.1

        # Penalty for thresholds near the extremes (unstable)
        if abs(metrics.threshold - 0.5) > 0.4:
            base_score *= 0.95

        return base_score

    def _find_narrow_optimal_range(self,
                                   thresholds: np.ndarray,
                                   best_threshold: float,
                                   tolerance: float = 0.02) -> Optional[Tuple[float, float]]:
        """Detect whether optimal performance sits in a narrow range"""
        best_score = self._calculate_optimization_score(
            self._evaluate_reid_threshold(best_threshold)
        )

        # Find the range where the score stays within tolerance of the best
        min_threshold = best_threshold
        max_threshold = best_threshold

        for t in thresholds:
            score = self._calculate_optimization_score(
                self._evaluate_reid_threshold(t)
            )
            if score >= best_score * (1 - tolerance):
                min_threshold = min(min_threshold, t)
                max_threshold = max(max_threshold, t)

        # Report the range only if it is narrow (< 5 percentage points wide)
        if max_threshold - min_threshold < 0.05:
            return (min_threshold, max_threshold)
        return None

    def get_optimization_report(self) -> Dict:
        """Get a detailed optimization report"""
        # Analyze the similarity distribution
        if self.reid_samples:
            similarities = [s['similarity'] for s in self.reid_samples]

            # Detect local peaks in the histogram (a bimodal distribution
            # suggests well-separated match / non-match populations)
            hist, bins = np.histogram(similarities, bins=30)
            peaks = []
            for i in range(1, len(hist) - 1):
                if hist[i] > hist[i - 1] and hist[i] > hist[i + 1]:
                    peaks.append(bins[i])

            # Approximate the valley as the midpoint of the first two peaks
            valley = None
            if len(peaks) >= 2:
                valley = (peaks[0] + peaks[1]) / 2
        else:
            similarities = []
            peaks = []
            valley = None

        return {
            'optimal_reid_threshold': self.optimal_reid_threshold,
            'optimal_detection_threshold': self.optimal_detection_threshold,
            'reid_samples': len(self.reid_samples),
            'detection_samples': len(self.detection_samples),
            'similarity_distribution': {
                'min': min(similarities) if similarities else 0,
                'max': max(similarities) if similarities else 0,
                'mean': float(np.mean(similarities)) if similarities else 0,
                'std': float(np.std(similarities)) if similarities else 0,
                'percentiles': {
                    '25%': float(np.percentile(similarities, 25)) if similarities else 0,
                    '50%': float(np.percentile(similarities, 50)) if similarities else 0,
                    '75%': float(np.percentile(similarities, 75)) if similarities else 0,
                    '90%': float(np.percentile(similarities, 90)) if similarities else 0
                }
            },
            'distribution_analysis': {
                'peaks': peaks,
                'valley': valley,
                'bimodal': len(peaks) >= 2
            },
            'current_performance': (self._evaluate_reid_threshold(self.optimal_reid_threshold).__dict__
                                    if self.reid_samples else None)
        }

    def suggest_threshold_adjustment(self) -> str:
        """Provide human-readable threshold suggestions"""
        report = self.get_optimization_report()
        suggestions = []

        # Check whether the optimum sits in a narrow range
        if self.reid_samples:
            narrow_range = self._find_narrow_optimal_range(
                np.arange(self.reid_search_range[0],
                          self.reid_search_range[1],
                          self.reid_search_step),
                self.optimal_reid_threshold
            )
            if narrow_range:
                suggestions.append(
                    f"✓ Optimal ReID threshold is in narrow range: "
                    f"{narrow_range[0]:.1%} - {narrow_range[1]:.1%}. "
                    f"Using {self.optimal_reid_threshold:.1%}"
                )
            else:
                suggestions.append(
                    f"ReID threshold: {self.optimal_reid_threshold:.1%} "
                    f"(clear optimum found)"
                )

        # Check the similarity distribution
        if report['distribution_analysis']['bimodal']:
            valley = report['distribution_analysis']['valley']
            if valley and abs(valley - self.optimal_reid_threshold) > 0.05:
                suggestions.append(
                    f"⚠ Natural separation point at {valley:.1%} "
                    f"differs from optimal {self.optimal_reid_threshold:.1%}"
                )

        # Performance insights
        if report['current_performance']:
            perf = report['current_performance']
            if perf['precision'] < 0.8:
                suggestions.append(
                    f"⚠ Precision is {perf['precision']:.1%}. "
                    f"Consider increasing the threshold slightly."
                )
            if perf['recall'] < 0.7:
                suggestions.append(
                    f"⚠ Recall is {perf['recall']:.1%}. "
                    f"Consider decreasing the threshold slightly."
                )

        return "\n".join(suggestions) if suggestions else "Collecting data..."