Spaces:
Sleeping
Sleeping
| """ | |
| DeepSORT Tracker Implementation - Complete Fixed Version | |
| Uses Kalman filter for motion prediction + appearance features | |
| """ | |
| import numpy as np | |
| from scipy.optimize import linear_sum_assignment | |
| from scipy.spatial.distance import cosine | |
| from collections import deque | |
| from typing import List, Optional, Tuple | |
| import uuid | |
| from detection import Detection | |
| class KalmanFilter: | |
| """Kalman Filter for tracking with constant velocity model""" | |
| def __init__(self): | |
| self.F = np.eye(8) | |
| for i in range(4): | |
| self.F[i, i+4] = 1 | |
| self.H = np.eye(4, 8) | |
| self.Q = np.eye(8) | |
| self.Q[4:, 4:] *= 0.01 | |
| self.R = np.eye(4) * 10 | |
| self.P = np.eye(8) * 1000 | |
| self.x = np.zeros(8) | |
| def initiate(self, measurement: np.ndarray): | |
| self.x[:4] = measurement | |
| self.x[4:] = 0 | |
| def predict(self): | |
| self.x = self.F @ self.x | |
| self.P = self.F @ self.P @ self.F.T + self.Q | |
| return self.x[:4] | |
| def update(self, measurement: np.ndarray): | |
| y = measurement - self.H @ self.x | |
| S = self.H @ self.P @ self.H.T + self.R | |
| K = self.P @ self.H.T @ np.linalg.inv(S) | |
| self.x = self.x + K @ y | |
| self.P = (np.eye(8) - K @ self.H) @ self.P | |
| return self.x[:4] | |
| def get_state(self) -> np.ndarray: | |
| return self.x[:4] | |
| class DeepSORTTrack: | |
| """DeepSORT Track with Kalman filter and appearance features""" | |
| def __init__(self, detection: Detection, track_id: Optional[int] = None): | |
| self.track_id = track_id if track_id else self._generate_id() | |
| x1, y1, x2, y2 = detection.bbox | |
| cx = (x1 + x2) / 2 | |
| cy = (y1 + y2) / 2 | |
| w = x2 - x1 | |
| h = y2 - y1 | |
| self.kf = KalmanFilter() | |
| self.kf.initiate(np.array([cx, cy, w, h])) | |
| self.detections = [detection] | |
| self.confidence = detection.confidence | |
| self.hits = 1 | |
| self.age = 1 | |
| self.time_since_update = 0 | |
| self.state = 'tentative' | |
| self.features = deque(maxlen=100) | |
| if hasattr(detection, 'features') and detection.features is not None: | |
| self.features.append(detection.features) | |
| self.trajectory = deque(maxlen=30) | |
| self.trajectory.append((cx, cy)) | |
| self.avg_confidence = self.confidence | |
| self.consecutive_misses = 0 | |
| def _generate_id(self) -> int: | |
| return int(uuid.uuid4().int % 100000) | |
| def bbox(self) -> List[float]: | |
| cx, cy, w, h = self.kf.get_state() | |
| return [cx - w/2, cy - h/2, cx + w/2, cy + h/2] | |
| def predict(self): | |
| self.age += 1 | |
| self.time_since_update += 1 | |
| self.consecutive_misses += 1 | |
| self.kf.predict() | |
| def update(self, detection: Detection): | |
| x1, y1, x2, y2 = detection.bbox | |
| cx = (x1 + x2) / 2 | |
| cy = (y1 + y2) / 2 | |
| w = x2 - x1 | |
| h = y2 - y1 | |
| self.kf.update(np.array([cx, cy, w, h])) | |
| self.detections.append(detection) | |
| self.confidence = detection.confidence | |
| self.avg_confidence = 0.9 * self.avg_confidence + 0.1 * self.confidence | |
| self.hits += 1 | |
| self.time_since_update = 0 | |
| self.consecutive_misses = 0 | |
| if hasattr(detection, 'features') and detection.features is not None: | |
| self.features.append(detection.features) | |
| self.trajectory.append((cx, cy)) | |
| if self.state == 'tentative' and self.hits >= 3: | |
| self.state = 'confirmed' | |
| if len(self.detections) > 5: | |
| for old_det in self.detections[:-5]: | |
| if hasattr(old_det, 'image_crop'): | |
| old_det.image_crop = None | |
| self.detections = self.detections[-5:] | |
| def mark_missed(self): | |
| if self.state == 'confirmed': | |
| if self.consecutive_misses > 30 or self.time_since_update > 60: | |
| self.state = 'deleted' | |
| elif self.state == 'tentative': | |
| if self.consecutive_misses > 5: | |
| self.state = 'deleted' | |
| def get_feature(self) -> Optional[np.ndarray]: | |
| if not self.features: | |
| return None | |
| features_array = np.array(list(self.features)) | |
| weights = np.exp(np.linspace(-1, 0, len(features_array))) | |
| weights /= weights.sum() | |
| return np.average(features_array, axis=0, weights=weights) | |
| class DeepSORTTracker: | |
| """DeepSORT Tracker combining Kalman filter with appearance features""" | |
| def __init__(self, | |
| max_iou_distance: float = 0.7, | |
| max_age: int = 30, | |
| n_init: int = 3, | |
| nn_budget: int = 100, | |
| use_appearance: bool = True): | |
| self.max_iou_distance = max_iou_distance | |
| self.max_age = max_age | |
| self.n_init = n_init | |
| self.nn_budget = nn_budget | |
| self.use_appearance = use_appearance | |
| self.tracks: List[DeepSORTTrack] = [] | |
| self.track_id_count = 1 | |
| self.gating_threshold_iou = 0.3 | |
| self.gating_threshold_appearance = 0.5 | |
| def update(self, detections: List[Detection]) -> List[DeepSORTTrack]: | |
| for track in self.tracks: | |
| track.predict() | |
| matches, unmatched_tracks, unmatched_detections = self._match( | |
| detections, self.tracks | |
| ) | |
| for track_idx, det_idx in matches: | |
| self.tracks[track_idx].update(detections[det_idx]) | |
| for track_idx in unmatched_tracks: | |
| self.tracks[track_idx].mark_missed() | |
| for det_idx in unmatched_detections: | |
| self._initiate_track(detections[det_idx]) | |
| self.tracks = [t for t in self.tracks if t.state != 'deleted'] | |
| return [t for t in self.tracks if t.state == 'confirmed'] | |
| def _match(self, detections: List[Detection], tracks: List[DeepSORTTrack]) -> Tuple: | |
| if not tracks or not detections: | |
| return [], list(range(len(tracks))), list(range(len(detections))) | |
| confirmed_tracks = [i for i, t in enumerate(tracks) if t.state == 'confirmed'] | |
| unconfirmed_tracks = [i for i, t in enumerate(tracks) if t.state == 'tentative'] | |
| matches_a, unmatched_tracks_a, unmatched_detections = \ | |
| self._matching_cascade(detections, tracks, confirmed_tracks) | |
| iou_track_candidates = unconfirmed_tracks + unmatched_tracks_a | |
| remaining_detections = [detections[i] for i in unmatched_detections] | |
| matches_b, unmatched_tracks_b, unmatched_detections_b = \ | |
| self._match_iou(remaining_detections, tracks, iou_track_candidates) | |
| matches_b = [(t, unmatched_detections[d]) for t, d in matches_b] | |
| unmatched_detections = [unmatched_detections[i] for i in unmatched_detections_b] | |
| matches = matches_a + matches_b | |
| unmatched_tracks = unmatched_tracks_b | |
| return matches, unmatched_tracks, unmatched_detections | |
| def _matching_cascade(self, detections: List[Detection], | |
| tracks: List[DeepSORTTrack], | |
| track_indices: List[int]) -> Tuple: | |
| matches = [] | |
| unmatched_detections = list(range(len(detections))) | |
| for level in range(self.max_age): | |
| if len(unmatched_detections) == 0: | |
| break | |
| track_indices_l = [ | |
| k for k in track_indices | |
| if tracks[k].time_since_update == level | |
| ] | |
| if len(track_indices_l) == 0: | |
| continue | |
| detection_subset = [detections[i] for i in unmatched_detections] | |
| matches_l, _, unmatched_subset_indices = self._match_features_and_iou( | |
| detection_subset, | |
| tracks, | |
| track_indices_l | |
| ) | |
| matches_l_remapped = [(t, unmatched_detections[d]) for t, d in matches_l] | |
| matches.extend(matches_l_remapped) | |
| unmatched_detections = [unmatched_detections[i] for i in unmatched_subset_indices] | |
| unmatched_tracks = [ | |
| k for k in track_indices | |
| if k not in [t for t, _ in matches] | |
| ] | |
| return matches, unmatched_tracks, unmatched_detections | |
| def _match_features_and_iou(self, detections: List[Detection], | |
| tracks: List[DeepSORTTrack], | |
| track_indices: List[int]) -> Tuple: | |
| if not track_indices or not detections: | |
| return [], track_indices, list(range(len(detections))) | |
| cost_matrix = np.zeros((len(track_indices), len(detections))) | |
| for i, track_idx in enumerate(track_indices): | |
| track = tracks[track_idx] | |
| track_bbox = track.bbox | |
| track_feature = track.get_feature() | |
| for j, detection in enumerate(detections): | |
| det_bbox = detection.bbox | |
| iou = self._iou(track_bbox, det_bbox) | |
| iou_cost = 1 - iou | |
| if self.use_appearance and track_feature is not None: | |
| if hasattr(detection, 'features') and detection.features is not None: | |
| cosine_dist = cosine(track_feature, detection.features) | |
| appearance_cost = cosine_dist | |
| else: | |
| appearance_cost = 0.5 | |
| else: | |
| appearance_cost = 0.0 | |
| if self.use_appearance and track_feature is not None: | |
| cost = 0.5 * iou_cost + 0.5 * appearance_cost | |
| else: | |
| cost = iou_cost | |
| if iou < self.gating_threshold_iou: | |
| cost = 1e6 | |
| if self.use_appearance and appearance_cost > self.gating_threshold_appearance: | |
| cost = 1e6 | |
| cost_matrix[i, j] = cost | |
| row_indices, col_indices = linear_sum_assignment(cost_matrix) | |
| matches = [] | |
| unmatched_tracks = list(range(len(track_indices))) | |
| unmatched_detections = list(range(len(detections))) | |
| for row, col in zip(row_indices, col_indices): | |
| if cost_matrix[row, col] < 1e5: | |
| matches.append((track_indices[row], col)) | |
| unmatched_tracks.remove(row) | |
| unmatched_detections.remove(col) | |
| unmatched_tracks = [track_indices[i] for i in unmatched_tracks] | |
| return matches, unmatched_tracks, unmatched_detections | |
| def _match_iou(self, detections: List[Detection], | |
| tracks: List[DeepSORTTrack], | |
| track_indices: List[int]) -> Tuple: | |
| if not track_indices or not detections: | |
| return [], track_indices, list(range(len(detections))) | |
| iou_matrix = np.zeros((len(track_indices), len(detections))) | |
| for i, track_idx in enumerate(track_indices): | |
| track = tracks[track_idx] | |
| for j, detection in enumerate(detections): | |
| iou = self._iou(track.bbox, detection.bbox) | |
| iou_matrix[i, j] = 1 - iou | |
| row_indices, col_indices = linear_sum_assignment(iou_matrix) | |
| matches = [] | |
| unmatched_tracks = list(range(len(track_indices))) | |
| unmatched_detections = list(range(len(detections))) | |
| for row, col in zip(row_indices, col_indices): | |
| if iou_matrix[row, col] < self.max_iou_distance: | |
| matches.append((track_indices[row], col)) | |
| unmatched_tracks.remove(row) | |
| unmatched_detections.remove(col) | |
| unmatched_tracks = [track_indices[i] for i in unmatched_tracks] | |
| return matches, unmatched_tracks, unmatched_detections | |
| def _initiate_track(self, detection: Detection): | |
| new_track = DeepSORTTrack(detection, self.track_id_count) | |
| self.track_id_count += 1 | |
| self.tracks.append(new_track) | |
| def _iou(self, bbox1: List[float], bbox2: List[float]) -> float: | |
| try: | |
| x1 = max(bbox1[0], bbox2[0]) | |
| y1 = max(bbox1[1], bbox2[1]) | |
| x2 = min(bbox1[2], bbox2[2]) | |
| y2 = min(bbox1[3], bbox2[3]) | |
| if x2 < x1 or y2 < y1: | |
| return 0.0 | |
| intersection = (x2 - x1) * (y2 - y1) | |
| area1 = (bbox1[2] - bbox1[0]) * (bbox1[3] - bbox1[1]) | |
| area2 = (bbox2[2] - bbox2[0]) * (bbox2[3] - bbox2[1]) | |
| union = area1 + area2 - intersection | |
| return intersection / (union + 1e-6) | |
| except: | |
| return 0.0 | |
| def reset(self): | |
| self.tracks.clear() | |
| self.track_id_count = 1 | |
| print("DeepSORT tracker reset") | |
| SimpleTracker = DeepSORTTracker | |
| RobustTracker = DeepSORTTracker |