"""
threshold_optimizer.py - Precision Threshold Optimization Module
Finds optimal detection and ReID thresholds with fine-grained tuning
"""
import numpy as np
from typing import Dict, Tuple, List, Optional
from dataclasses import dataclass
from collections import defaultdict
@dataclass
class OptimizationMetrics:
"""Metrics for threshold evaluation"""
threshold: float
true_positives: int = 0
false_positives: int = 0
false_negatives: int = 0
precision: float = 0.0
recall: float = 0.0
f1_score: float = 0.0
unique_dogs: int = 0
total_matches: int = 0
avg_confidence: float = 0.0
class ThresholdOptimizer:
"""
Finds optimal thresholds with fine-grained precision
Handles narrow optimal ranges (like 76-78%)
"""
def __init__(self):
# Store performance metrics for different thresholds
self.reid_metrics: Dict[float, OptimizationMetrics] = {}
self.detection_metrics: Dict[float, OptimizationMetrics] = {}
# Track actual performance data
self.reid_samples = [] # (similarity, was_correct_match)
self.detection_samples = [] # (confidence, was_valid_detection)
# Optimal thresholds found
self.optimal_reid_threshold = 0.7
self.optimal_detection_threshold = 0.45
# Fine-grained search parameters
self.reid_search_range = (0.65, 0.85) # Focus on typical good range
self.reid_search_step = 0.005 # 0.5% steps for precision
self.detection_search_range = (0.3, 0.7)
self.detection_search_step = 0.01
def add_reid_sample(self,
similarity: float,
matched_dog_id: int,
true_dog_id: Optional[int] = None,
auto_label: bool = True):
"""
Add a ReID decision sample
Args:
similarity: Similarity score
matched_dog_id: Dog ID that was matched
true_dog_id: Ground truth dog ID (if known)
auto_label: Auto-label based on similarity distribution
"""
if true_dog_id is not None:
was_correct = (matched_dog_id == true_dog_id)
        elif auto_label:
            # Auto-label: very high similarity => likely a true match,
            # very low => likely a different dog, mid-range => unknown
            was_correct = True if similarity > 0.85 else (False if similarity < 0.4 else None)
        else:
            was_correct = None
self.reid_samples.append({
'similarity': similarity,
'matched_id': matched_dog_id,
'correct': was_correct,
'timestamp': len(self.reid_samples)
})
# Trigger optimization if enough samples
if len(self.reid_samples) % 50 == 0:
self.optimize_reid_threshold()
def add_detection_sample(self, confidence: float, was_valid: bool = True):
"""Add a detection sample"""
self.detection_samples.append({
'confidence': confidence,
'valid': was_valid,
'timestamp': len(self.detection_samples)
})
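    # --- Illustrative sketch (not part of the original file) ---
    # Detection samples are collected above but never used to update
    # optimal_detection_threshold. A minimal grid search mirroring the ReID
    # logic could look like this, assuming the 'valid' flags are reliable labels.
    def optimize_detection_threshold(self) -> float:
        """Sketch: grid-search the detection threshold on collected samples"""
        if len(self.detection_samples) < 20:
            return self.optimal_detection_threshold
        best_f1 = -1.0
        best_threshold = self.optimal_detection_threshold
        thresholds = np.arange(self.detection_search_range[0],
                               self.detection_search_range[1],
                               self.detection_search_step)
        for threshold in thresholds:
            # Count detections kept/dropped at this threshold against the labels
            tp = sum(1 for s in self.detection_samples
                     if s['valid'] and s['confidence'] >= threshold)
            fp = sum(1 for s in self.detection_samples
                     if not s['valid'] and s['confidence'] >= threshold)
            fn = sum(1 for s in self.detection_samples
                     if s['valid'] and s['confidence'] < threshold)
            precision = tp / (tp + fp) if (tp + fp) > 0 else 0.0
            recall = tp / (tp + fn) if (tp + fn) > 0 else 0.0
            f1 = (2 * precision * recall / (precision + recall)
                  if (precision + recall) > 0 else 0.0)
            if f1 > best_f1:
                best_f1 = f1
                best_threshold = float(threshold)
        self.optimal_detection_threshold = best_threshold
        return best_threshold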
def optimize_reid_threshold(self) -> float:
"""
Find optimal ReID threshold with fine precision
"""
if len(self.reid_samples) < 20:
return self.optimal_reid_threshold
# Extract similarities
similarities = [s['similarity'] for s in self.reid_samples]
# Dynamic range adjustment based on data
data_min = np.percentile(similarities, 5)
data_max = np.percentile(similarities, 95)
# Focus search on relevant range
search_min = max(self.reid_search_range[0], data_min)
search_max = min(self.reid_search_range[1], data_max)
# Fine-grained grid search
thresholds = np.arange(search_min, search_max, self.reid_search_step)
best_score = -1
best_threshold = self.optimal_reid_threshold
best_metrics = None
for threshold in thresholds:
metrics = self._evaluate_reid_threshold(threshold)
score = self._calculate_optimization_score(metrics)
if score > best_score:
best_score = score
best_threshold = threshold
best_metrics = metrics
# Check if we're in a narrow optimal range
if best_metrics:
narrow_range = self._find_narrow_optimal_range(thresholds, best_threshold)
if narrow_range:
# Use center of narrow range for stability
best_threshold = (narrow_range[0] + narrow_range[1]) / 2
self.optimal_reid_threshold = best_threshold
return best_threshold
def _evaluate_reid_threshold(self, threshold: float) -> OptimizationMetrics:
"""Evaluate performance at specific threshold"""
metrics = OptimizationMetrics(threshold=threshold)
# Group samples by time windows to evaluate consistency
window_size = 20
dog_assignments = defaultdict(list)
for i, sample in enumerate(self.reid_samples):
window_idx = i // window_size
sim = sample['similarity']
# Would this be a match at this threshold?
would_match = sim >= threshold
if would_match:
dog_assignments[window_idx].append(sample['matched_id'])
metrics.total_matches += 1
# Estimate correctness
if sample['correct'] is not None:
if sample['correct']:
metrics.true_positives += 1
else:
metrics.false_positives += 1
elif sim > threshold + 0.1: # High confidence match
metrics.true_positives += 1
elif sim < threshold + 0.02: # Borderline match
metrics.false_positives += 1
            else:
                # Rejecting a sample whose candidate was the true dog is a missed match
                if sample['correct'] is True:
                    metrics.false_negatives += 1
# Calculate unique dogs and fragmentation
all_dogs = set()
for dogs in dog_assignments.values():
all_dogs.update(dogs)
metrics.unique_dogs = len(all_dogs)
# Calculate precision/recall/F1
if metrics.true_positives + metrics.false_positives > 0:
metrics.precision = metrics.true_positives / (metrics.true_positives + metrics.false_positives)
if metrics.true_positives + metrics.false_negatives > 0:
metrics.recall = metrics.true_positives / (metrics.true_positives + metrics.false_negatives)
if metrics.precision + metrics.recall > 0:
metrics.f1_score = 2 * (metrics.precision * metrics.recall) / (metrics.precision + metrics.recall)
# Average confidence of matches
match_sims = [s['similarity'] for s in self.reid_samples if s['similarity'] >= threshold]
metrics.avg_confidence = np.mean(match_sims) if match_sims else 0
return metrics
def _calculate_optimization_score(self, metrics: OptimizationMetrics) -> float:
"""Custom scoring for narrow-margin optimization"""
# Balance between precision and recall with emphasis on precision
base_score = (metrics.f1_score * 0.4 +
metrics.precision * 0.4 +
metrics.recall * 0.2)
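        # Worked example (illustrative numbers): precision=0.90, recall=0.70
        # gives f1 ≈ 0.7875, so base_score ≈ 0.4*0.7875 + 0.4*0.90 + 0.2*0.70 = 0.815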
# Penalty for too many unique dogs (over-segmentation)
expected_dogs = len(self.reid_samples) / 50 # Rough estimate
if metrics.unique_dogs > expected_dogs * 1.5:
base_score *= 0.9
# Bonus for high average confidence
if metrics.avg_confidence > 0.8:
base_score *= 1.1
# Penalty for being too close to boundaries (unstable)
if abs(metrics.threshold - 0.5) > 0.4: # Too extreme
base_score *= 0.95
return base_score
def _find_narrow_optimal_range(self,
thresholds: np.ndarray,
best_threshold: float,
tolerance: float = 0.02) -> Optional[Tuple[float, float]]:
"""Detect if optimal performance is in a narrow range"""
best_score = self._calculate_optimization_score(
self._evaluate_reid_threshold(best_threshold)
)
# Find range where score is within tolerance of best
min_threshold = best_threshold
max_threshold = best_threshold
for t in thresholds:
score = self._calculate_optimization_score(
self._evaluate_reid_threshold(t)
)
if score >= best_score * (1 - tolerance):
min_threshold = min(min_threshold, t)
max_threshold = max(max_threshold, t)
# Check if range is narrow (< 5% difference)
if max_threshold - min_threshold < 0.05:
return (min_threshold, max_threshold)
return None
def get_optimization_report(self) -> Dict:
"""Get detailed optimization report"""
# Analyze similarity distribution
if self.reid_samples:
similarities = [s['similarity'] for s in self.reid_samples]
# Detect bimodal distribution
hist, bins = np.histogram(similarities, bins=30)
peaks = []
for i in range(1, len(hist)-1):
if hist[i] > hist[i-1] and hist[i] > hist[i+1]:
peaks.append(bins[i])
            # Approximate the valley as the midpoint between the first two peaks
            valley = None
            if len(peaks) >= 2:
                valley = (peaks[0] + peaks[1]) / 2
else:
similarities = []
peaks = []
valley = None
return {
'optimal_reid_threshold': self.optimal_reid_threshold,
'optimal_detection_threshold': self.optimal_detection_threshold,
'reid_samples': len(self.reid_samples),
'detection_samples': len(self.detection_samples),
'similarity_distribution': {
'min': min(similarities) if similarities else 0,
'max': max(similarities) if similarities else 0,
'mean': np.mean(similarities) if similarities else 0,
'std': np.std(similarities) if similarities else 0,
'percentiles': {
'25%': np.percentile(similarities, 25) if similarities else 0,
'50%': np.percentile(similarities, 50) if similarities else 0,
'75%': np.percentile(similarities, 75) if similarities else 0,
'90%': np.percentile(similarities, 90) if similarities else 0
}
},
'distribution_analysis': {
'peaks': peaks,
'valley': valley,
'bimodal': len(peaks) >= 2
},
'current_performance': self._evaluate_reid_threshold(self.optimal_reid_threshold).__dict__ if self.reid_samples else None
}
def suggest_threshold_adjustment(self) -> str:
"""Provide human-readable threshold suggestions"""
report = self.get_optimization_report()
suggestions = []
# Check if threshold is in narrow range
if self.reid_samples:
            narrow_range = self._find_narrow_optimal_range(
                np.arange(self.reid_search_range[0],
                          self.reid_search_range[1],
                          self.reid_search_step),
                self.optimal_reid_threshold
            )
if narrow_range:
suggestions.append(
f"✓ Optimal ReID threshold is in narrow range: "
f"{narrow_range[0]:.1%} - {narrow_range[1]:.1%}. "
f"Using {self.optimal_reid_threshold:.1%}"
)
else:
suggestions.append(
f"ReID threshold: {self.optimal_reid_threshold:.1%} "
f"(clear optimum found)"
)
# Check distribution
if report['distribution_analysis']['bimodal']:
valley = report['distribution_analysis']['valley']
if valley:
if abs(valley - self.optimal_reid_threshold) > 0.05:
suggestions.append(
f"⚠ Natural separation point at {valley:.1%} "
f"differs from optimal {self.optimal_reid_threshold:.1%}"
)
# Performance insights
if report['current_performance']:
perf = report['current_performance']
if perf['precision'] < 0.8:
suggestions.append(
f"⚠ Precision is {perf['precision']:.1%}. "
f"Consider increasing threshold slightly."
)
if perf['recall'] < 0.7:
suggestions.append(
f"⚠ Recall is {perf['recall']:.1%}. "
f"Consider decreasing threshold slightly."
)
return "\n".join(suggestions) if suggestions else "Collecting data..."