# NOTE: web file-viewer page chrome (status banner, commit-hash gutter and
# line-number gutter) was captured here along with the source and is not part
# of the module. Reported file size: 11,796 bytes.
"""
reid.py - Improved Single-Model Dog Re-Identification System
Enhanced with better feature matching and temporal consistency
"""
import numpy as np
import cv2
import torch
import torch.nn as nn
import torchvision.models as models
import torchvision.transforms as transforms
from sklearn.metrics.pairwise import cosine_similarity
from typing import Dict, List, Optional, Tuple
import time
from dataclasses import dataclass
import warnings
warnings.filterwarnings('ignore')
@dataclass
class DogFeatures:
    """One stored appearance sample for a dog.

    Instances are appended to the per-dog lists held by the re-identification
    database and are later compared against new embeddings.
    """

    # Embedding vector for the crop (the matcher flattens it before comparing).
    features: np.ndarray
    # Detection confidence of the crop; used as a weight when averaging
    # similarities, so a value of 0 contributes no weight.
    confidence: float = 0.0
    # Quality score of the sample (defaults to a neutral 0.5).
    quality_score: float = 0.5
    # Value of the matcher's frame counter when the sample was stored.
    frame_num: int = 0
class SingleModelReID:
    """Re-identify dogs across tracks using ResNet50 embeddings.

    The class keeps an in-memory gallery (``dog_database``) mapping each dog id
    to a list of ``DogFeatures`` samples. ``match_or_register`` compares the
    embedding of a track's latest crop against that gallery and either returns
    an existing dog id or registers a new one.

    NOTE(review): ``current_frame`` is incremented once per
    ``match_or_register`` call, not once per video frame, so the frame-based
    windows below (30 / 60 / 150) only correspond to wall-clock time if the
    caller invokes the matcher once per frame - confirm against the caller.
    """

    # Laplacian variance below this value is treated as "too blurry".
    _BLUR_VARIANCE_MIN = 50
    # Number of most recent gallery samples compared per dog.
    _MATCH_WINDOW = 8
    # Gallery size that triggers pruning, and the size kept afterwards.
    _PRUNE_TRIGGER = 15
    _PRUNE_KEEP = 10
    # Minimum lead over the runner-up for a high-scoring match to be trusted.
    _AMBIGUITY_MARGIN = 0.1

    def __init__(self, device: str = 'cuda'):
        """Set up thresholds, empty state and the ResNet50 feature extractor.

        Args:
            device: Requested torch device. Falls back to ``'cpu'`` when CUDA
                is not available.

        If the backbone cannot be created the error is printed and
        ``self.model`` is left as ``None``; ``extract_features`` then returns
        ``None`` for every input.
        """
        self.device = device if torch.cuda.is_available() else 'cpu'

        # Similarity thresholds used by match_or_register.
        self.primary_threshold = 0.45    # default acceptance threshold
        self.secondary_threshold = 0.35  # relaxed threshold for recently seen dogs
        self.new_dog_threshold = 0.55    # scores at/above this get an ambiguity check

        # Gallery: dog_id -> list of DogFeatures.
        self.dog_database = {}
        self.next_dog_id = 1

        # track_id -> dog_id, and track_id -> list of match scores.
        self.track_to_dog = {}
        self.track_confidence = {}

        # dog_id -> value of current_frame when the dog was last matched.
        self.recent_matches = {}
        self.current_frame = 0

        # Defined up front so both attributes exist even if setup fails early.
        self.model = None
        self.transform = None
        try:
            backbone = models.resnet50(weights='IMAGENET1K_V1')
            # Drop the final classification layer; keep everything up to pooling.
            self.model = nn.Sequential(*list(backbone.children())[:-1])
            self.model.to(self.device).eval()

            # Resize to 256x256, centre-crop to 224 and apply ImageNet
            # mean/std normalisation.
            self.transform = transforms.Compose([
                transforms.ToPILImage(),
                transforms.Resize((256, 256)),
                transforms.CenterCrop(224),
                transforms.ToTensor(),
                transforms.Normalize(
                    mean=[0.485, 0.456, 0.406],
                    std=[0.229, 0.224, 0.225]
                )
            ])
            print("Enhanced ResNet50 ReID initialized")
        except Exception as e:
            print(f"ResNet50 init error: {e}")
            self.model = None
            self.transform = None

    def extract_features(self, image: np.ndarray) -> Optional[np.ndarray]:
        """Return an L2-normalised embedding for ``image``.

        Args:
            image: Crop to embed. It is converted with ``COLOR_BGR2GRAY`` /
                ``COLOR_BGR2RGB``, so a 3-channel BGR array is expected.

        Returns:
            The squeezed, L2-normalised model output as a NumPy array, or
            ``None`` when no model is loaded, the image is missing/empty, the
            crop is too blurry, or any processing step fails.
        """
        if self.model is None or image is None or image.size == 0:
            return None

        try:
            # Blur gate: low Laplacian variance means few sharp edges.
            gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
            if cv2.Laplacian(gray, cv2.CV_64F).var() < self._BLUR_VARIANCE_MIN:
                return None

            img_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
            img_tensor = self.transform(img_rgb).unsqueeze(0).to(self.device)
            with torch.no_grad():
                features = self.model(img_tensor)
            features = features.squeeze().cpu().numpy()
            # Epsilon guards against division by zero for an all-zero output.
            return features / (np.linalg.norm(features) + 1e-7)
        except Exception:
            # Best effort: an unusable crop is reported as "no features"
            # without masking KeyboardInterrupt/SystemExit.
            return None

    def match_or_register(self, track) -> Tuple[int, float]:
        """Resolve ``track`` to a dog id, registering a new dog if needed.

        Args:
            track: Object exposing ``track_id`` and ``detections``; each
                detection must expose ``image_crop`` and ``confidence``.

        Returns:
            ``(dog_id, score)``. ``(0, 0.0)`` means no usable crop or no
            features could be extracted. A newly registered dog is returned
            with score ``1.0``.
        """
        self.current_frame += 1

        # Newest of the last five detections that still carries a crop.
        detection = next(
            (det for det in reversed(track.detections[-5:])
             if det.image_crop is not None),
            None,
        )
        if detection is None:
            return 0, 0.0

        history = self.track_confidence.setdefault(track.track_id, [])

        # Track already bound to a dog: keep the binding, refresh the gallery.
        if track.track_id in self.track_to_dog:
            return self._refresh_known_track(track, detection, history)

        features = self.extract_features(detection.image_crop)
        if features is None:
            return 0, 0.0

        # Score the query against every dog that has stored samples.
        match_details = {
            dog_id: self._gallery_similarity(features, dog_id, feature_list)
            for dog_id, feature_list in self.dog_database.items()
            if feature_list
        }
        best_dog_id = None
        best_score = -1.0
        for dog_id, score in match_details.items():
            if score > best_score:
                best_dog_id, best_score = dog_id, score

        # Relax the threshold for a candidate matched within the last 60 calls.
        threshold = self.primary_threshold
        if best_dog_id is not None:
            last_seen = self.recent_matches.get(best_dog_id)
            if last_seen is not None and self.current_frame - last_seen < 60:
                threshold = self.secondary_threshold

        if best_dog_id is not None and best_score >= threshold:
            if best_score < self.new_dog_threshold or len(match_details) < 3:
                accept = True
            else:
                # High score with several candidates: only trust it when it
                # clearly beats the runner-up; otherwise the query resembles
                # multiple dogs and is treated as a new one.
                runner_up = sorted(match_details.values(), reverse=True)[1]
                accept = best_score - runner_up >= self._AMBIGUITY_MARGIN
            if accept:
                return self._accept_match(
                    track, detection, features, best_dog_id, best_score
                )

        # No acceptable match: register a new dog.
        new_dog_id = self.next_dog_id
        self.next_dog_id += 1
        self.dog_database[new_dog_id] = [self._make_entry(features, detection)]
        self.track_to_dog[track.track_id] = new_dog_id
        self.track_confidence[track.track_id] = [1.0]
        self.recent_matches[new_dog_id] = self.current_frame
        return new_dog_id, 1.0

    def _make_entry(self, features, detection):
        """Build a gallery sample stamped with the current frame counter."""
        return DogFeatures(
            features=features,
            confidence=detection.confidence,
            frame_num=self.current_frame
        )

    def _refresh_known_track(self, track, detection, history) -> Tuple[int, float]:
        """Handle a track that is already mapped to a dog id."""
        existing_dog_id = self.track_to_dog[track.track_id]

        # Only every third call adds a sample, to limit gallery growth.
        if self.current_frame % 3 == 0:
            features = self.extract_features(detection.image_crop)
            if features is not None and existing_dog_id in self.dog_database:
                self.dog_database[existing_dog_id].append(
                    self._make_entry(features, detection)
                )
                self._prune_features(existing_dog_id)

        self.recent_matches[existing_dog_id] = self.current_frame

        # Mean of the last ten recorded scores, or the detector confidence
        # when nothing has been recorded for this track yet.
        if history:
            recent_conf = float(np.mean(history[-10:]))
        else:
            recent_conf = detection.confidence
        return existing_dog_id, recent_conf

    def _gallery_similarity(self, features, dog_id, feature_list) -> float:
        """Weighted cosine similarity of ``features`` to one dog's recent samples."""
        # Small bonus for dogs matched within the last 30 calls, fading linearly.
        recency_bonus = 0.0
        last_seen = self.recent_matches.get(dog_id)
        if last_seen is not None:
            frames_since = self.current_frame - last_seen
            if frames_since < 30:
                recency_bonus = 0.05 * (1 - frames_since / 30)

        recent = feature_list[-self._MATCH_WINDOW:]
        gallery = np.stack([sample.features.reshape(-1) for sample in recent])
        # One batched call instead of one call per stored sample.
        similarities = cosine_similarity(features.reshape(1, -1), gallery)[0]

        # Weight = detection confidence with exponential decay by sample age.
        ages = np.array(
            [self.current_frame - sample.frame_num for sample in recent],
            dtype=float,
        )
        weights = np.array(
            [sample.confidence for sample in recent], dtype=float
        ) * np.exp(-ages / 100)

        total = weights.sum()
        if np.isfinite(total) and total > 0:
            score = np.average(similarities, weights=weights / total)
        else:
            # All weights are zero (e.g. confidence 0): normalising would
            # produce NaN, so fall back to a plain mean.
            score = similarities.mean()
        return float(score + recency_bonus)

    def _accept_match(self, track, detection, features, dog_id, score) -> Tuple[int, float]:
        """Bind ``track`` to ``dog_id`` and store the new sample."""
        self.dog_database[dog_id].append(self._make_entry(features, detection))
        self._prune_features(dog_id)
        self.track_to_dog[track.track_id] = dog_id
        self.track_confidence[track.track_id].append(score)
        self.recent_matches[dog_id] = self.current_frame
        return dog_id, score

    def _prune_features(self, dog_id: int):
        """Trim a dog's gallery to its best samples once it exceeds 15 entries.

        Samples are ranked by ``confidence + 0.001 * frame_num`` and the top 10
        are kept. The survivors are put back in chronological order because
        matching treats the tail of the list as the most recent samples.
        """
        if dog_id not in self.dog_database:
            return

        samples = self.dog_database[dog_id]
        if len(samples) > self._PRUNE_TRIGGER:
            kept = sorted(
                samples,
                key=lambda s: s.confidence + (0.001 * s.frame_num),
                reverse=True,
            )[:self._PRUNE_KEEP]
            kept.sort(key=lambda s: s.frame_num)
            self.dog_database[dog_id] = kept

    def match_or_register_all(self, track) -> Dict:
        """Run ``match_or_register`` and wrap the result under a ``'ResNet50'`` key.

        ``'processing_time'`` is always reported as 0.
        """
        dog_id, confidence = self.match_or_register(track)
        return {
            'ResNet50': {
                'dog_id': dog_id,
                'confidence': confidence,
                'processing_time': 0
            }
        }

    def reset_all(self):
        """Clear the gallery and all tracking state; ids restart at 1."""
        self.dog_database.clear()
        self.track_to_dog.clear()
        self.track_confidence.clear()
        self.recent_matches.clear()
        self.next_dog_id = 1
        self.current_frame = 0

    def set_all_thresholds(self, threshold: float):
        """Set the primary threshold and derive the other two from it.

        The primary threshold is clamped to [0.3, 0.9]; the secondary one is
        0.1 lower (but at least 0.25) and the new-dog one 0.1 higher (but at
        most 0.9).
        """
        self.primary_threshold = max(0.3, min(0.9, threshold))
        self.secondary_threshold = max(0.25, self.primary_threshold - 0.1)
        self.new_dog_threshold = min(0.9, self.primary_threshold + 0.1)

    def get_statistics(self) -> Dict:
        """Return summary counters under a ``'ResNet50'`` key.

        ``active_dogs`` counts dogs matched within the last 150 calls.
        """
        active_dogs = sum(
            1 for last_seen in self.recent_matches.values()
            if self.current_frame - last_seen < 150
        )
        if self.dog_database:
            avg_features = np.mean(
                [len(samples) for samples in self.dog_database.values()]
            )
        else:
            avg_features = 0
        return {
            'ResNet50': {
                'total_dogs': self.next_dog_id - 1,
                'dogs_in_database': len(self.dog_database),
                'active_dogs': active_dogs,
                'avg_features_per_dog': avg_features,
                'threshold': self.primary_threshold
            }
        }
# Compatibility aliases: alternative class names bound to SingleModelReID
# (presumably so code importing the earlier names keeps working - verify
# against callers before removing).
ImprovedResNet50ReID = SingleModelReID
DualModelReID = SingleModelReID