""" DeepSORT Tracker Implementation - Complete Fixed Version Uses Kalman filter for motion prediction + appearance features """ import numpy as np from scipy.optimize import linear_sum_assignment from scipy.spatial.distance import cosine from collections import deque from typing import List, Optional, Tuple import uuid from detection import Detection class KalmanFilter: """Kalman Filter for tracking with constant velocity model""" def __init__(self): self.F = np.eye(8) for i in range(4): self.F[i, i+4] = 1 self.H = np.eye(4, 8) self.Q = np.eye(8) self.Q[4:, 4:] *= 0.01 self.R = np.eye(4) * 10 self.P = np.eye(8) * 1000 self.x = np.zeros(8) def initiate(self, measurement: np.ndarray): self.x[:4] = measurement self.x[4:] = 0 def predict(self): self.x = self.F @ self.x self.P = self.F @ self.P @ self.F.T + self.Q return self.x[:4] def update(self, measurement: np.ndarray): y = measurement - self.H @ self.x S = self.H @ self.P @ self.H.T + self.R K = self.P @ self.H.T @ np.linalg.inv(S) self.x = self.x + K @ y self.P = (np.eye(8) - K @ self.H) @ self.P return self.x[:4] def get_state(self) -> np.ndarray: return self.x[:4] class DeepSORTTrack: """DeepSORT Track with Kalman filter and appearance features""" def __init__(self, detection: Detection, track_id: Optional[int] = None): self.track_id = track_id if track_id else self._generate_id() x1, y1, x2, y2 = detection.bbox cx = (x1 + x2) / 2 cy = (y1 + y2) / 2 w = x2 - x1 h = y2 - y1 self.kf = KalmanFilter() self.kf.initiate(np.array([cx, cy, w, h])) self.detections = [detection] self.confidence = detection.confidence self.hits = 1 self.age = 1 self.time_since_update = 0 self.state = 'tentative' self.features = deque(maxlen=100) if hasattr(detection, 'features') and detection.features is not None: self.features.append(detection.features) self.trajectory = deque(maxlen=30) self.trajectory.append((cx, cy)) self.avg_confidence = self.confidence self.consecutive_misses = 0 def _generate_id(self) -> int: return int(uuid.uuid4().int % 100000) @property def bbox(self) -> List[float]: cx, cy, w, h = self.kf.get_state() return [cx - w/2, cy - h/2, cx + w/2, cy + h/2] def predict(self): self.age += 1 self.time_since_update += 1 self.consecutive_misses += 1 self.kf.predict() def update(self, detection: Detection): x1, y1, x2, y2 = detection.bbox cx = (x1 + x2) / 2 cy = (y1 + y2) / 2 w = x2 - x1 h = y2 - y1 self.kf.update(np.array([cx, cy, w, h])) self.detections.append(detection) self.confidence = detection.confidence self.avg_confidence = 0.9 * self.avg_confidence + 0.1 * self.confidence self.hits += 1 self.time_since_update = 0 self.consecutive_misses = 0 if hasattr(detection, 'features') and detection.features is not None: self.features.append(detection.features) self.trajectory.append((cx, cy)) if self.state == 'tentative' and self.hits >= 3: self.state = 'confirmed' if len(self.detections) > 5: for old_det in self.detections[:-5]: if hasattr(old_det, 'image_crop'): old_det.image_crop = None self.detections = self.detections[-5:] def mark_missed(self): if self.state == 'confirmed': if self.consecutive_misses > 30 or self.time_since_update > 60: self.state = 'deleted' elif self.state == 'tentative': if self.consecutive_misses > 5: self.state = 'deleted' def get_feature(self) -> Optional[np.ndarray]: if not self.features: return None features_array = np.array(list(self.features)) weights = np.exp(np.linspace(-1, 0, len(features_array))) weights /= weights.sum() return np.average(features_array, axis=0, weights=weights) class DeepSORTTracker: """DeepSORT Tracker combining Kalman filter with appearance features""" def __init__(self, max_iou_distance: float = 0.7, max_age: int = 30, n_init: int = 3, nn_budget: int = 100, use_appearance: bool = True): self.max_iou_distance = max_iou_distance self.max_age = max_age self.n_init = n_init self.nn_budget = nn_budget self.use_appearance = use_appearance self.tracks: List[DeepSORTTrack] = [] self.track_id_count = 1 self.gating_threshold_iou = 0.3 self.gating_threshold_appearance = 0.5 def update(self, detections: List[Detection]) -> List[DeepSORTTrack]: for track in self.tracks: track.predict() matches, unmatched_tracks, unmatched_detections = self._match( detections, self.tracks ) for track_idx, det_idx in matches: self.tracks[track_idx].update(detections[det_idx]) for track_idx in unmatched_tracks: self.tracks[track_idx].mark_missed() for det_idx in unmatched_detections: self._initiate_track(detections[det_idx]) self.tracks = [t for t in self.tracks if t.state != 'deleted'] return [t for t in self.tracks if t.state == 'confirmed'] def _match(self, detections: List[Detection], tracks: List[DeepSORTTrack]) -> Tuple: if not tracks or not detections: return [], list(range(len(tracks))), list(range(len(detections))) confirmed_tracks = [i for i, t in enumerate(tracks) if t.state == 'confirmed'] unconfirmed_tracks = [i for i, t in enumerate(tracks) if t.state == 'tentative'] matches_a, unmatched_tracks_a, unmatched_detections = \ self._matching_cascade(detections, tracks, confirmed_tracks) iou_track_candidates = unconfirmed_tracks + unmatched_tracks_a remaining_detections = [detections[i] for i in unmatched_detections] matches_b, unmatched_tracks_b, unmatched_detections_b = \ self._match_iou(remaining_detections, tracks, iou_track_candidates) matches_b = [(t, unmatched_detections[d]) for t, d in matches_b] unmatched_detections = [unmatched_detections[i] for i in unmatched_detections_b] matches = matches_a + matches_b unmatched_tracks = unmatched_tracks_b return matches, unmatched_tracks, unmatched_detections def _matching_cascade(self, detections: List[Detection], tracks: List[DeepSORTTrack], track_indices: List[int]) -> Tuple: matches = [] unmatched_detections = list(range(len(detections))) for level in range(self.max_age): if len(unmatched_detections) == 0: break track_indices_l = [ k for k in track_indices if tracks[k].time_since_update == level ] if len(track_indices_l) == 0: continue detection_subset = [detections[i] for i in unmatched_detections] matches_l, _, unmatched_subset_indices = self._match_features_and_iou( detection_subset, tracks, track_indices_l ) matches_l_remapped = [(t, unmatched_detections[d]) for t, d in matches_l] matches.extend(matches_l_remapped) unmatched_detections = [unmatched_detections[i] for i in unmatched_subset_indices] unmatched_tracks = [ k for k in track_indices if k not in [t for t, _ in matches] ] return matches, unmatched_tracks, unmatched_detections def _match_features_and_iou(self, detections: List[Detection], tracks: List[DeepSORTTrack], track_indices: List[int]) -> Tuple: if not track_indices or not detections: return [], track_indices, list(range(len(detections))) cost_matrix = np.zeros((len(track_indices), len(detections))) for i, track_idx in enumerate(track_indices): track = tracks[track_idx] track_bbox = track.bbox track_feature = track.get_feature() for j, detection in enumerate(detections): det_bbox = detection.bbox iou = self._iou(track_bbox, det_bbox) iou_cost = 1 - iou if self.use_appearance and track_feature is not None: if hasattr(detection, 'features') and detection.features is not None: cosine_dist = cosine(track_feature, detection.features) appearance_cost = cosine_dist else: appearance_cost = 0.5 else: appearance_cost = 0.0 if self.use_appearance and track_feature is not None: cost = 0.5 * iou_cost + 0.5 * appearance_cost else: cost = iou_cost if iou < self.gating_threshold_iou: cost = 1e6 if self.use_appearance and appearance_cost > self.gating_threshold_appearance: cost = 1e6 cost_matrix[i, j] = cost row_indices, col_indices = linear_sum_assignment(cost_matrix) matches = [] unmatched_tracks = list(range(len(track_indices))) unmatched_detections = list(range(len(detections))) for row, col in zip(row_indices, col_indices): if cost_matrix[row, col] < 1e5: matches.append((track_indices[row], col)) unmatched_tracks.remove(row) unmatched_detections.remove(col) unmatched_tracks = [track_indices[i] for i in unmatched_tracks] return matches, unmatched_tracks, unmatched_detections def _match_iou(self, detections: List[Detection], tracks: List[DeepSORTTrack], track_indices: List[int]) -> Tuple: if not track_indices or not detections: return [], track_indices, list(range(len(detections))) iou_matrix = np.zeros((len(track_indices), len(detections))) for i, track_idx in enumerate(track_indices): track = tracks[track_idx] for j, detection in enumerate(detections): iou = self._iou(track.bbox, detection.bbox) iou_matrix[i, j] = 1 - iou row_indices, col_indices = linear_sum_assignment(iou_matrix) matches = [] unmatched_tracks = list(range(len(track_indices))) unmatched_detections = list(range(len(detections))) for row, col in zip(row_indices, col_indices): if iou_matrix[row, col] < self.max_iou_distance: matches.append((track_indices[row], col)) unmatched_tracks.remove(row) unmatched_detections.remove(col) unmatched_tracks = [track_indices[i] for i in unmatched_tracks] return matches, unmatched_tracks, unmatched_detections def _initiate_track(self, detection: Detection): new_track = DeepSORTTrack(detection, self.track_id_count) self.track_id_count += 1 self.tracks.append(new_track) def _iou(self, bbox1: List[float], bbox2: List[float]) -> float: try: x1 = max(bbox1[0], bbox2[0]) y1 = max(bbox1[1], bbox2[1]) x2 = min(bbox1[2], bbox2[2]) y2 = min(bbox1[3], bbox2[3]) if x2 < x1 or y2 < y1: return 0.0 intersection = (x2 - x1) * (y2 - y1) area1 = (bbox1[2] - bbox1[0]) * (bbox1[3] - bbox1[1]) area2 = (bbox2[2] - bbox2[0]) * (bbox2[3] - bbox2[1]) union = area1 + area2 - intersection return intersection / (union + 1e-6) except: return 0.0 def reset(self): self.tracks.clear() self.track_id_count = 1 print("DeepSORT tracker reset") SimpleTracker = DeepSORTTracker RobustTracker = DeepSORTTracker