mustafa2ak committed (verified)
Commit 1f6d97d · Parent(s): fd5e5c8

Update tracking.py

Files changed (1): tracking.py (+375, -479)
tracking.py CHANGED
Removed (previous implementation, EnhancedTracker with track validation):

"""
Enhanced Tracking with Track Validation and Improved Accuracy (Enhancement 7)
"""
import numpy as np
from typing import List, Optional, Tuple, Dict
from scipy.optimize import linear_sum_assignment
from collections import deque
import uuid
from detection import Detection
import warnings
warnings.filterwarnings('ignore')


class Track:
    """Enhanced track with validation and quality metrics"""

    def __init__(self, detection: Detection, track_id: Optional[int] = None):
        self.track_id = track_id if track_id else self._generate_id()
        self.bbox = detection.bbox.copy() if hasattr(detection, 'bbox') else [0, 0, 100, 100]
        self.detections = [detection]
        self.confidence = detection.confidence if hasattr(detection, 'confidence') else 0.5

        # Track state
        self.age = 1
        self.time_since_update = 0
        self.state = 'tentative'
        self.hits = 1
        self.consecutive_misses = 0

        # ENHANCEMENT 7: Track validation
        self.validation_score = 0      # Increases with consistent detections
        self.min_validation_hits = 3   # Require 3 consistent detections
        self.is_validated = False

        # Store center points for trajectory
        cx = (self.bbox[0] + self.bbox[2]) / 2
        cy = (self.bbox[1] + self.bbox[3]) / 2
        self.trajectory = deque(maxlen=30)
        self.trajectory.append((cx, cy))

        # Enhanced motion model
        self.velocity = np.array([0.0, 0.0])
        self.acceleration = np.array([0.0, 0.0])

        # Appearance features
        self.appearance_features = []
        if hasattr(detection, 'features'):
            self.appearance_features.append(detection.features)

        # Size tracking with validation
        self.sizes = deque(maxlen=10)
        width = max(1, self.bbox[2] - self.bbox[0])
        height = max(1, self.bbox[3] - self.bbox[1])
        self.sizes.append((width, height))
        self.initial_size = (width, height)

        # Track quality metrics
        self.avg_confidence = self.confidence
        self.max_confidence = self.confidence
        self.confidence_history = deque(maxlen=10)
        self.confidence_history.append(self.confidence)

    def _generate_id(self) -> int:
        return int(uuid.uuid4().int % 100000)

    def validate_detection_consistency(self, detection: Detection) -> bool:
        """
        ENHANCEMENT 7: Validate detection consistency before accepting.
        Checks size similarity and position consistency.
        """
        if not hasattr(detection, 'bbox') or len(detection.bbox) != 4:
            return False

        # Check size similarity
        new_width = detection.bbox[2] - detection.bbox[0]
        new_height = detection.bbox[3] - detection.bbox[1]

        if self.initial_size[0] > 0 and self.initial_size[1] > 0:
            width_ratio = new_width / self.initial_size[0]
            height_ratio = new_height / self.initial_size[1]

            # Allow 50% size variation max (prevents wild mismatches)
            if width_ratio < 0.5 or width_ratio > 2.0:
                return False
            if height_ratio < 0.5 or height_ratio > 2.0:
                return False

        # Check position consistency (not jumping too far)
        new_cx = (detection.bbox[0] + detection.bbox[2]) / 2
        new_cy = (detection.bbox[1] + detection.bbox[3]) / 2

        if len(self.trajectory) > 0:
            last_cx, last_cy = self.trajectory[-1]
            distance = np.sqrt((new_cx - last_cx) ** 2 + (new_cy - last_cy) ** 2)

            # Max reasonable movement per frame (adjust based on your video)
            max_movement = 100  # pixels
            if distance > max_movement:
                return False

        return True

    def predict(self):
        """Enhanced motion prediction"""
        self.age += 1
        self.time_since_update += 1
        self.consecutive_misses += 1

        try:
            if len(self.trajectory) >= 3:
                positions = np.array(list(self.trajectory))[-3:]
                self.velocity = positions[-1] - positions[-2]

                # Limit velocity
                max_velocity = 50
                velocity_magnitude = np.linalg.norm(self.velocity)
                if velocity_magnitude > max_velocity:
                    self.velocity = self.velocity / velocity_magnitude * max_velocity

                if len(positions) == 3:
                    prev_velocity = positions[-2] - positions[-3]
                    self.acceleration = (self.velocity - prev_velocity) * 0.3

                predicted_pos = positions[-1] + self.velocity * 0.7 + self.acceleration * 0.1

                if self.sizes:
                    avg_width = np.mean([s[0] for s in self.sizes])
                    avg_height = np.mean([s[1] for s in self.sizes])
                else:
                    avg_width = max(10, self.bbox[2] - self.bbox[0])
                    avg_height = max(10, self.bbox[3] - self.bbox[1])

                self.bbox = [
                    predicted_pos[0] - avg_width / 2,
                    predicted_pos[1] - avg_height / 2,
                    predicted_pos[0] + avg_width / 2,
                    predicted_pos[1] + avg_height / 2
                ]
        except Exception:
            pass
    def update(self, detection: Detection):
        """Update track with validation"""
        try:
            # ENHANCEMENT 7: Validate consistency
            if not self.validate_detection_consistency(detection):
                print(f" ⚠️ Track {self.track_id}: Rejected inconsistent detection")
                return

            # Update bbox
            if hasattr(detection, 'bbox'):
                self.bbox = detection.bbox.copy()
            self.detections.append(detection)

            # Update confidence
            if hasattr(detection, 'confidence'):
                self.confidence = detection.confidence
                self.confidence_history.append(self.confidence)
                self.avg_confidence = np.mean(list(self.confidence_history))
                self.max_confidence = max(self.max_confidence, self.confidence)

            self.hits += 1
            self.time_since_update = 0
            self.consecutive_misses = 0

            # ENHANCEMENT 7: Update validation score
            self.validation_score += 1
            if self.validation_score >= self.min_validation_hits and not self.is_validated:
                self.is_validated = True
                print(f" ✅ Track {self.track_id} validated after {self.validation_score} consistent hits")

            # Update trajectory
            cx = (self.bbox[0] + self.bbox[2]) / 2
            cy = (self.bbox[1] + self.bbox[3]) / 2
            self.trajectory.append((cx, cy))

            # Update size history
            width = max(1, self.bbox[2] - self.bbox[0])
            height = max(1, self.bbox[3] - self.bbox[1])
            self.sizes.append((width, height))

            # Store appearance features
            if hasattr(detection, 'features'):
                self.appearance_features.append(detection.features)
                if len(self.appearance_features) > 5:
                    self.appearance_features = self.appearance_features[-5:]

            # Confirm track after validation
            if self.is_validated and self.state == 'tentative':
                self.state = 'confirmed'

            # Keep only recent detections
            if len(self.detections) > 5:
                for old_det in self.detections[:-5]:
                    if hasattr(old_det, 'image_crop'):
                        old_det.image_crop = None
                self.detections = self.detections[-5:]

        except Exception as e:
            print(f"Track update error: {e}")
    def mark_missed(self):
        """Mark track as missed"""
        # ENHANCEMENT 7: Only delete validated tracks after longer period
        if self.is_validated and self.state == 'confirmed':
            if self.consecutive_misses > 20:  # Extended buffer for validated tracks
                self.state = 'deleted'
            elif self.time_since_update > 40:
                self.state = 'deleted'
        elif self.state == 'tentative':
            if self.consecutive_misses > 3:
                self.state = 'deleted'

        # Reduce validation score when missed
        self.validation_score = max(0, self.validation_score - 0.5)


class EnhancedTracker:
    """
    Enhanced Tracker with Track Validation (Enhancement 7)
    """

    def __init__(self,
                 match_threshold: float = 0.35,
                 track_buffer: int = 40,  # Increased from 30
                 min_iou_for_match: float = 0.15,
                 use_appearance: bool = False):

        self.match_threshold = match_threshold
        self.track_buffer = track_buffer
        self.min_iou_for_match = min_iou_for_match
        self.use_appearance = use_appearance

        self.tracks: List[Track] = []
        self.track_id_count = 1

        # ENHANCEMENT 7: Size constraints for valid dogs
        self.min_dog_size = 30    # Minimum width/height in pixels
        self.max_dog_size = 800   # Maximum width/height in pixels

        # Enhanced parameters
        self.max_center_distance = 120
        self.min_size_similarity = 0.4

        self.debug = False

    def _is_valid_detection_size(self, detection: Detection) -> bool:
        """ENHANCEMENT 7: Size-based filtering"""
        if not hasattr(detection, 'bbox') or len(detection.bbox) != 4:
            return False

        width = detection.bbox[2] - detection.bbox[0]
        height = detection.bbox[3] - detection.bbox[1]

        # Filter too small or too large
        if width < self.min_dog_size or height < self.min_dog_size:
            return False
        if width > self.max_dog_size or height > self.max_dog_size:
            return False

        # Aspect ratio check (dogs shouldn't be super wide or super tall)
        if width > 0 and height > 0:
            aspect_ratio = width / height
            if aspect_ratio < 0.3 or aspect_ratio > 3.0:
                return False

        return True

    def _check_appearance_similarity(self, detection: Detection) -> bool:
        """
        ENHANCEMENT 7: Check if detection is too similar to existing tracks.
        Prevents duplicate tracks for the same dog.
        """
        if not hasattr(detection, 'bbox'):
            return True

        det_center = self._get_center(detection.bbox)
        det_size = (detection.bbox[2] - detection.bbox[0],
                    detection.bbox[3] - detection.bbox[1])

        for track in self.tracks:
            if track.state == 'deleted':
                continue

            track_center = self._get_center(track.bbox)
            track_size = (track.bbox[2] - track.bbox[0],
                          track.bbox[3] - track.bbox[1])

            # Check center distance
            distance = np.linalg.norm(np.array(det_center) - np.array(track_center))

            # Check size similarity
            size_diff = abs(det_size[0] - track_size[0]) + abs(det_size[1] - track_size[1])
            avg_size = (det_size[0] + det_size[1] + track_size[0] + track_size[1]) / 4

            # If very close and similar size, it's likely the same dog
            if distance < 40 and size_diff < avg_size * 0.3:
                return False  # Too similar to existing track

        return True
    def update(self, detections: List[Detection]) -> List[Track]:
        """Update tracks with enhanced validation"""
        # ENHANCEMENT 7: Filter detections by size
        valid_detections = [d for d in detections if self._is_valid_detection_size(d)]

        if len(valid_detections) < len(detections):
            print(f" 🔍 Filtered {len(detections) - len(valid_detections)} invalid size detections")

        detections = valid_detections

        if not detections:
            for track in self.tracks:
                track.predict()
                track.mark_missed()

            # Move lost tracks to sleeping (call ReID's move_to_sleeping)
            self._handle_lost_tracks()

            self.tracks = [t for t in self.tracks if t.state != 'deleted']
            return [t for t in self.tracks if t.state == 'confirmed']

        try:
            # Predict existing tracks
            for track in self.tracks:
                track.predict()

            # Split tracks by state
            confirmed_tracks = [t for t in self.tracks if t.state == 'confirmed']
            tentative_tracks = [t for t in self.tracks if t.state == 'tentative']

            matched_track_indices = set()
            matched_det_indices = set()

            # Stage 1: Match confirmed tracks
            if confirmed_tracks and detections:
                matched_track_indices, matched_det_indices = self._associate_tracks(
                    confirmed_tracks, detections,
                    matched_track_indices, matched_det_indices,
                    threshold_mult=1.0
                )

            # Stage 2: Match tentative tracks
            if tentative_tracks:
                unmatched_dets = [detections[i] for i in range(len(detections))
                                  if i not in matched_det_indices]

                if unmatched_dets:
                    temp_det_mapping = [i for i in range(len(detections))
                                        if i not in matched_det_indices]

                    tent_matched_tracks, tent_matched_dets = self._associate_tracks(
                        tentative_tracks, unmatched_dets,
                        set(), set(),
                        threshold_mult=0.7
                    )

                    for det_idx in tent_matched_dets:
                        matched_det_indices.add(temp_det_mapping[det_idx])

            # Mark unmatched tracks
            for i, track in enumerate(confirmed_tracks):
                if i not in matched_track_indices:
                    track.mark_missed()

            for track in tentative_tracks:
                if track.time_since_update > 0:
                    track.mark_missed()

            # ENHANCEMENT 7: Create new tracks with validation
            for det_idx in range(len(detections)):
                if det_idx not in matched_det_indices:
                    detection = detections[det_idx]

                    # Check appearance similarity before creating track
                    if self._check_appearance_similarity(detection):
                        new_track = Track(detection, self.track_id_count)
                        self.track_id_count += 1
                        self.tracks.append(new_track)
                    else:
                        print(" 🚫 Rejected duplicate track candidate")

            # Handle lost tracks
            self._handle_lost_tracks()

            # Remove deleted tracks
            self.tracks = [t for t in self.tracks if t.state != 'deleted']

            # Return only validated confirmed tracks
            return [t for t in self.tracks if t.state == 'confirmed' and t.is_validated]

        except Exception as e:
            print(f"Tracker update error: {e}")
            return [t for t in self.tracks if t.state == 'confirmed']
    def _handle_lost_tracks(self):
        """Handle tracks that are about to be deleted"""
        for track in self.tracks:
            # If track is validated and about to be deleted, could move to sleeping
            if (track.is_validated and
                    track.state == 'confirmed' and
                    track.consecutive_misses == 18):  # Just before deletion at 20

                # This would be called by demo to move to ReID sleeping tracks
                if hasattr(self, '_reid_callback'):
                    self._reid_callback(track.track_id)

    def set_reid_callback(self, callback):
        """Set callback to ReID for moving tracks to sleeping"""
        self._reid_callback = callback

    def _associate_tracks(self, tracks: List[Track], detections: List[Detection],
                          existing_matched_tracks: set, existing_matched_dets: set,
                          threshold_mult: float = 1.0) -> Tuple[set, set]:
        """Track-detection association"""
        if not tracks or not detections:
            return existing_matched_tracks, existing_matched_dets

        try:
            cost_matrix = self._calculate_enhanced_cost_matrix(tracks, detections)

            if cost_matrix.size == 0:
                return existing_matched_tracks, existing_matched_dets

            row_ind, col_ind = linear_sum_assignment(cost_matrix)

            matched_tracks = existing_matched_tracks.copy()
            matched_dets = existing_matched_dets.copy()

            for r, c in zip(row_ind, col_ind):
                if r >= len(tracks) or c >= len(detections):
                    continue

                threshold = (1 - self.match_threshold * threshold_mult)
                if cost_matrix[r, c] < threshold:
                    tracks[r].update(detections[c])
                    matched_tracks.add(r)
                    matched_dets.add(c)

            return matched_tracks, matched_dets

        except Exception as e:
            print(f"Association error: {e}")
            return existing_matched_tracks, existing_matched_dets
    def _calculate_enhanced_cost_matrix(self, tracks: List[Track],
                                        detections: List[Detection]) -> np.ndarray:
        """Calculate cost matrix"""
        try:
            if not tracks or not detections:
                return np.array([])

            n_tracks = len(tracks)
            n_dets = len(detections)
            cost_matrix = np.ones((n_tracks, n_dets))

            for t_idx, track in enumerate(tracks):
                if not hasattr(track, 'bbox') or len(track.bbox) != 4:
                    continue

                track_center = np.array(self._get_center(track.bbox))
                track_size = np.array([
                    max(1, track.bbox[2] - track.bbox[0]),
                    max(1, track.bbox[3] - track.bbox[1])
                ])

                for d_idx, detection in enumerate(detections):
                    if not hasattr(detection, 'bbox') or len(detection.bbox) != 4:
                        continue

                    # IoU cost
                    iou = self._iou(track.bbox, detection.bbox)

                    # Center distance
                    det_center = np.array(self._get_center(detection.bbox))
                    distance = np.linalg.norm(track_center - det_center)

                    # Size similarity
                    det_size = np.array([
                        max(1, detection.bbox[2] - detection.bbox[0]),
                        max(1, detection.bbox[3] - detection.bbox[1])
                    ])

                    size_ratio = np.minimum(track_size, det_size) / (np.maximum(track_size, det_size) + 1e-6)
                    size_cost = 1 - np.mean(size_ratio)

                    if iou >= self.min_iou_for_match and distance < self.max_center_distance:
                        iou_cost = 1 - iou
                        dist_cost = distance / self.max_center_distance

                        total_cost = (0.6 * iou_cost +
                                      0.25 * dist_cost +
                                      0.15 * size_cost)

                        # Boost validated tracks
                        if track.is_validated:
                            total_cost *= 0.9

                        cost_matrix[t_idx, d_idx] = total_cost
                    else:
                        cost_matrix[t_idx, d_idx] = 1.0

            return cost_matrix

        except Exception as e:
            print(f"Cost matrix error: {e}")
            return np.ones((len(tracks), len(detections)))

    def _get_center(self, bbox: List[float]) -> Tuple[float, float]:
        """Get bbox center"""
        try:
            if len(bbox) >= 4:
                return ((bbox[0] + bbox[2]) / 2, (bbox[1] + bbox[3]) / 2)
            return (0, 0)
        except Exception:
            return (0, 0)
    def _iou(self, bbox1: List[float], bbox2: List[float]) -> float:
        """Calculate IoU"""
        try:
            if len(bbox1) < 4 or len(bbox2) < 4:
                return 0.0

            x1 = max(bbox1[0], bbox2[0])
            y1 = max(bbox1[1], bbox2[1])
            x2 = min(bbox1[2], bbox2[2])
            y2 = min(bbox1[3], bbox2[3])

            if x2 <= x1 or y2 <= y1:
                return 0.0

            intersection = (x2 - x1) * (y2 - y1)
            area1 = max(1, (bbox1[2] - bbox1[0]) * (bbox1[3] - bbox1[1]))
            area2 = max(1, (bbox2[2] - bbox2[0]) * (bbox2[3] - bbox2[1]))
            union = area1 + area2 - intersection

            return max(0, min(1, intersection / (union + 1e-6)))

        except Exception:
            return 0.0

    def set_match_threshold(self, threshold: float):
        """Update matching threshold"""
        self.match_threshold = max(0.1, min(0.8, threshold))
        print(f"Tracking threshold: {self.match_threshold:.2f}")

    def reset(self):
        """Reset tracker"""
        self.tracks.clear()
        self.track_id_count = 1
        print("Tracker reset")

    def get_statistics(self) -> Dict:
        """Get statistics"""
        confirmed = len([t for t in self.tracks if t.state == 'confirmed'])
        validated = len([t for t in self.tracks if t.is_validated])
        tentative = len([t for t in self.tracks if t.state == 'tentative'])

        return {
            'total_tracks': len(self.tracks),
            'confirmed_tracks': confirmed,
            'validated_tracks': validated,
            'tentative_tracks': tentative,
            'next_id': self.track_id_count
        }


# Compatibility aliases
SimpleTracker = EnhancedTracker
RobustTracker = EnhancedTracker
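The replacement below swaps the heuristic velocity/acceleration model for a constant-velocity Kalman filter. For reference, the KalmanFilter class that follows implements the standard predict and update steps:

\[
\hat{x}_{k|k-1} = F x_{k-1}, \qquad P_{k|k-1} = F P_{k-1} F^\top + Q
\]
\[
y_k = z_k - H \hat{x}_{k|k-1}, \qquad S_k = H P_{k|k-1} H^\top + R, \qquad K_k = P_{k|k-1} H^\top S_k^{-1}
\]
\[
x_k = \hat{x}_{k|k-1} + K_k y_k, \qquad P_k = (I - K_k H) P_{k|k-1}
\]

with state \(x = [c_x, c_y, w, h, v_x, v_y, v_w, v_h]\) and measurement \(z = [c_x, c_y, w, h]\), matching the F, H, Q, R, and P attributes in the code.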
  """
2
+ DeepSORT Tracker Implementation
3
+ Uses Kalman filter for motion prediction + appearance features
4
  """
5
  import numpy as np
 
6
  from scipy.optimize import linear_sum_assignment
7
+ from scipy.spatial.distance import cosine
8
  from collections import deque
9
+ from typing import List, Optional, Tuple
10
  import uuid
11
  from detection import Detection
 
 
12
 
13
+ class KalmanFilter:
14
+ """
15
+ Kalman Filter for tracking with constant velocity model
16
+ State: [x, y, w, h, vx, vy, vw, vh]
17
+ """
18
+ def __init__(self):
19
+ # State transition matrix (constant velocity model)
20
+ self.F = np.eye(8)
21
+ for i in range(4):
22
+ self.F[i, i+4] = 1
23
+
24
+ # Measurement matrix (we observe position and size)
25
+ self.H = np.eye(4, 8)
26
+
27
+ # Process noise
28
+ self.Q = np.eye(8)
29
+ self.Q[4:, 4:] *= 0.01 # Smaller noise for velocities
30
+
31
+ # Measurement noise
32
+ self.R = np.eye(4) * 10
33
+
34
+ # State covariance
35
+ self.P = np.eye(8) * 1000
36
+
37
+ # State vector [x, y, w, h, vx, vy, vw, vh]
38
+ self.x = np.zeros(8)
39
+
40
+ def initiate(self, measurement: np.ndarray):
41
+ """Initialize filter with first measurement [x, y, w, h]"""
42
+ self.x[:4] = measurement
43
+ self.x[4:] = 0 # Zero velocity
44
+
45
+ def predict(self):
46
+ """Predict next state"""
47
+ self.x = self.F @ self.x
48
+ self.P = self.F @ self.P @ self.F.T + self.Q
49
+ return self.x[:4]
50
+
51
+ def update(self, measurement: np.ndarray):
52
+ """Update with new measurement"""
53
+ # Innovation
54
+ y = measurement - self.H @ self.x
55
+
56
+ # Innovation covariance
57
+ S = self.H @ self.P @ self.H.T + self.R
58
+
59
+ # Kalman gain
60
+ K = self.P @ self.H.T @ np.linalg.inv(S)
61
+
62
+ # Update state
63
+ self.x = self.x + K @ y
64
+
65
+ # Update covariance
66
+ self.P = (np.eye(8) - K @ self.H) @ self.P
67
+
68
+ return self.x[:4]
69
+
70
+ def get_state(self) -> np.ndarray:
71
+ """Get current state [x, y, w, h]"""
72
+ return self.x[:4]
73
+
74
class DeepSORTTrack:
    """
    DeepSORT track with Kalman filter and appearance features
    """

    def __init__(self, detection: Detection, track_id: Optional[int] = None):
        self.track_id = track_id if track_id else self._generate_id()

        # Convert bbox to center format [cx, cy, w, h]
        x1, y1, x2, y2 = detection.bbox
        cx = (x1 + x2) / 2
        cy = (y1 + y2) / 2
        w = x2 - x1
        h = y2 - y1

        # Initialize Kalman filter
        self.kf = KalmanFilter()
        self.kf.initiate(np.array([cx, cy, w, h]))

        # Track state
        self.detections = [detection]
        self.confidence = detection.confidence
        self.hits = 1
        self.age = 1
        self.time_since_update = 0
        self.state = 'tentative'

        # Appearance features for ReID
        self.features = deque(maxlen=100)  # Store last 100 features
        if hasattr(detection, 'features') and detection.features is not None:
            self.features.append(detection.features)

        # Trajectory
        self.trajectory = deque(maxlen=30)
        self.trajectory.append((cx, cy))

        # Quality metrics
        self.avg_confidence = self.confidence
        self.consecutive_misses = 0

    def _generate_id(self) -> int:
        return int(uuid.uuid4().int % 100000)

    @property
    def bbox(self) -> List[float]:
        """Get bbox in [x1, y1, x2, y2] format"""
        cx, cy, w, h = self.kf.get_state()
        return [
            cx - w / 2,
            cy - h / 2,
            cx + w / 2,
            cy + h / 2
        ]

    def predict(self):
        """Predict next state using Kalman filter"""
        self.age += 1
        self.time_since_update += 1
        self.consecutive_misses += 1

        # Predict with Kalman filter
        self.kf.predict()

    def update(self, detection: Detection):
        """Update track with new detection"""
        # Convert to center format
        x1, y1, x2, y2 = detection.bbox
        cx = (x1 + x2) / 2
        cy = (y1 + y2) / 2
        w = x2 - x1
        h = y2 - y1

        # Update Kalman filter
        self.kf.update(np.array([cx, cy, w, h]))

        # Update track state
        self.detections.append(detection)
        self.confidence = detection.confidence
        self.avg_confidence = 0.9 * self.avg_confidence + 0.1 * self.confidence

        self.hits += 1
        self.time_since_update = 0
        self.consecutive_misses = 0

        # Update features
        if hasattr(detection, 'features') and detection.features is not None:
            self.features.append(detection.features)

        # Update trajectory
        self.trajectory.append((cx, cy))

        # Confirm track
        if self.state == 'tentative' and self.hits >= 3:
            self.state = 'confirmed'

        # Keep only recent detections
        if len(self.detections) > 5:
            for old_det in self.detections[:-5]:
                if hasattr(old_det, 'image_crop'):
                    old_det.image_crop = None
            self.detections = self.detections[-5:]

    def mark_missed(self):
        """Mark track as missed"""
        if self.state == 'confirmed':
            if self.consecutive_misses > 30 or self.time_since_update > 60:
                self.state = 'deleted'
        elif self.state == 'tentative':
            if self.consecutive_misses > 5:
                self.state = 'deleted'

    def get_feature(self) -> Optional[np.ndarray]:
        """Get averaged appearance feature"""
        if not self.features:
            return None
        # Use exponential moving average for recent features
        features_array = np.array(list(self.features))
        weights = np.exp(np.linspace(-1, 0, len(features_array)))
        weights /= weights.sum()
        return np.average(features_array, axis=0, weights=weights)
class DeepSORTTracker:
    """
    DeepSORT tracker combining a Kalman filter motion model
    with appearance feature matching
    """

    def __init__(self,
                 max_iou_distance: float = 0.7,
                 max_age: int = 30,
                 n_init: int = 3,
                 nn_budget: int = 100,
                 use_appearance: bool = True):
        """
        Args:
            max_iou_distance: Maximum IoU distance for matching (0.7 = 30% IoU)
            max_age: Maximum frames to keep lost tracks
            n_init: Number of frames to confirm a track
            nn_budget: Maximum size of appearance feature gallery
            use_appearance: Whether to use appearance features
        """
        self.max_iou_distance = max_iou_distance
        self.max_age = max_age
        self.n_init = n_init
        self.nn_budget = nn_budget
        self.use_appearance = use_appearance

        self.tracks: List[DeepSORTTrack] = []
        self.track_id_count = 1

        # Gating thresholds
        self.gating_threshold_iou = 0.3         # Minimum IoU for association
        self.gating_threshold_appearance = 0.5  # Maximum cosine distance

    def update(self, detections: List[Detection]) -> List[DeepSORTTrack]:
        """Update tracks with new detections"""
        # Predict existing tracks
        for track in self.tracks:
            track.predict()

        # Match detections to tracks
        matches, unmatched_tracks, unmatched_detections = self._match(
            detections, self.tracks
        )

        # Update matched tracks
        for track_idx, det_idx in matches:
            self.tracks[track_idx].update(detections[det_idx])

        # Mark unmatched tracks as missed
        for track_idx in unmatched_tracks:
            self.tracks[track_idx].mark_missed()

        # Create new tracks for unmatched detections
        for det_idx in unmatched_detections:
            self._initiate_track(detections[det_idx])

        # Remove deleted tracks
        self.tracks = [t for t in self.tracks if t.state != 'deleted']

        # Return confirmed tracks
        return [t for t in self.tracks if t.state == 'confirmed']

    def _match(self, detections: List[Detection], tracks: List[DeepSORTTrack]) -> Tuple:
        """
        Match detections to tracks using cascade matching.
        Returns: (matches, unmatched_tracks, unmatched_detections)
        """
        if not tracks or not detections:
            return [], list(range(len(tracks))), list(range(len(detections)))

        # Split tracks by state
        confirmed_tracks = [i for i, t in enumerate(tracks) if t.state == 'confirmed']
        unconfirmed_tracks = [i for i, t in enumerate(tracks) if t.state == 'tentative']

        # Stage 1: Match confirmed tracks
        matches_a, unmatched_tracks_a, unmatched_detections = \
            self._matching_cascade(detections, tracks, confirmed_tracks)

        # Stage 2: Match unconfirmed tracks (plus confirmed tracks left over from stage 1)
        iou_track_candidates = unconfirmed_tracks + unmatched_tracks_a

        # Filter detections used in stage 1
        remaining_detections = [detections[i] for i in unmatched_detections]

        matches_b, unmatched_tracks_b, unmatched_detections_b = \
            self._match_iou(remaining_detections, tracks, iou_track_candidates)

        # Remap stage-2 detection indices back to the original detection list
        matches_b = [(t, unmatched_detections[d]) for t, d in matches_b]
        unmatched_detections = [unmatched_detections[i] for i in unmatched_detections_b]

        matches = matches_a + matches_b
        unmatched_tracks = unmatched_tracks_b

        return matches, unmatched_tracks, unmatched_detections
    def _matching_cascade(self, detections: List[Detection],
                          tracks: List[DeepSORTTrack],
                          track_indices: List[int]) -> Tuple:
        """Cascade matching by time since update"""
        matches = []
        unmatched_detections = list(range(len(detections)))

        # Group tracks by time since update
        for level in range(self.max_age):
            if len(unmatched_detections) == 0:
                break

            track_indices_l = [
                k for k in track_indices
                if tracks[k].time_since_update == level
            ]

            if len(track_indices_l) == 0:
                continue

            # Match at this level; detection indices returned here are local
            # to the subset of still-unmatched detections
            matches_l, _, unmatched_local = self._match_features_and_iou(
                [detections[i] for i in unmatched_detections],
                tracks,
                track_indices_l
            )

            # Remap local detection indices back to indices into `detections`
            matches_l = [(t, unmatched_detections[d]) for t, d in matches_l]
            matches.extend(matches_l)

            # Keep only detections that are still unmatched
            unmatched_detections = [unmatched_detections[i] for i in unmatched_local]

        unmatched_tracks = [
            k for k in track_indices
            if k not in [t for t, _ in matches]
        ]

        return matches, unmatched_tracks, unmatched_detections

    def _match_features_and_iou(self, detections: List[Detection],
                                tracks: List[DeepSORTTrack],
                                track_indices: List[int]) -> Tuple:
        """Match using both appearance features and IoU"""
        if not track_indices or not detections:
            return [], track_indices, list(range(len(detections)))

        # Calculate cost matrix
        cost_matrix = np.zeros((len(track_indices), len(detections)))

        for i, track_idx in enumerate(track_indices):
            track = tracks[track_idx]
            track_bbox = track.bbox
            track_feature = track.get_feature()

            for j, detection in enumerate(detections):
                det_bbox = detection.bbox

                # IoU cost
                iou = self._iou(track_bbox, det_bbox)
                iou_cost = 1 - iou

                # Appearance cost
                if self.use_appearance and track_feature is not None:
                    if hasattr(detection, 'features') and detection.features is not None:
                        appearance_cost = cosine(track_feature, detection.features)
                    else:
                        appearance_cost = 0.5  # Neutral if no features
                else:
                    appearance_cost = 0.0

                # Combined cost (weighted)
                if self.use_appearance and track_feature is not None:
                    cost = 0.5 * iou_cost + 0.5 * appearance_cost
                else:
                    cost = iou_cost

                # Gating: set to a large value if outside thresholds
                if iou < self.gating_threshold_iou:
                    cost = 1e6
                if self.use_appearance and appearance_cost > self.gating_threshold_appearance:
                    cost = 1e6

                cost_matrix[i, j] = cost

        # Hungarian algorithm
        row_indices, col_indices = linear_sum_assignment(cost_matrix)

        matches = []
        unmatched_tracks = list(range(len(track_indices)))
        unmatched_detections = list(range(len(detections)))

        for row, col in zip(row_indices, col_indices):
            if cost_matrix[row, col] < 1e5:
                matches.append((track_indices[row], col))
                unmatched_tracks.remove(row)
                unmatched_detections.remove(col)

        unmatched_tracks = [track_indices[i] for i in unmatched_tracks]

        return matches, unmatched_tracks, unmatched_detections
    def _match_iou(self, detections: List[Detection],
                   tracks: List[DeepSORTTrack],
                   track_indices: List[int]) -> Tuple:
        """Simple IoU matching for unconfirmed tracks"""
        if not track_indices or not detections:
            return [], track_indices, list(range(len(detections)))

        # IoU distance matrix
        iou_matrix = np.zeros((len(track_indices), len(detections)))

        for i, track_idx in enumerate(track_indices):
            track = tracks[track_idx]
            for j, detection in enumerate(detections):
                iou = self._iou(track.bbox, detection.bbox)
                iou_matrix[i, j] = 1 - iou

        # Hungarian matching
        row_indices, col_indices = linear_sum_assignment(iou_matrix)

        matches = []
        unmatched_tracks = list(range(len(track_indices)))
        unmatched_detections = list(range(len(detections)))

        for row, col in zip(row_indices, col_indices):
            if iou_matrix[row, col] < self.max_iou_distance:
                matches.append((track_indices[row], col))
                unmatched_tracks.remove(row)
                unmatched_detections.remove(col)

        unmatched_tracks = [track_indices[i] for i in unmatched_tracks]

        return matches, unmatched_tracks, unmatched_detections

    def _initiate_track(self, detection: Detection):
        """Create new track"""
        new_track = DeepSORTTrack(detection, self.track_id_count)
        self.track_id_count += 1
        self.tracks.append(new_track)

    def _iou(self, bbox1: List[float], bbox2: List[float]) -> float:
        """Calculate IoU between two bboxes"""
        try:
            x1 = max(bbox1[0], bbox2[0])
            y1 = max(bbox1[1], bbox2[1])
            x2 = min(bbox1[2], bbox2[2])
            y2 = min(bbox1[3], bbox2[3])

            if x2 <= x1 or y2 <= y1:
                return 0.0

            intersection = (x2 - x1) * (y2 - y1)
            area1 = (bbox1[2] - bbox1[0]) * (bbox1[3] - bbox1[1])
            area2 = (bbox2[2] - bbox2[0]) * (bbox2[3] - bbox2[1])
            union = area1 + area2 - intersection

            return intersection / (union + 1e-6)
        except Exception:
            return 0.0

    def reset(self):
        """Reset tracker"""
        self.tracks.clear()
        self.track_id_count = 1
        print("DeepSORT tracker reset")


# Compatibility aliases
SimpleTracker = DeepSORTTracker
RobustTracker = DeepSORTTracker
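A minimal usage sketch of the new tracker, not part of the committed file: it drives DeepSORTTracker over a few synthetic frames. SimpleDetection below is a hypothetical stand-in for the project's Detection class, assumed to expose bbox as [x1, y1, x2, y2], a confidence score, and an optional features vector; the real class in detection.py may differ.

# Hedged usage sketch; SimpleDetection is a hypothetical stand-in for detection.Detection.
from dataclasses import dataclass
from typing import List, Optional
import numpy as np

from tracking import DeepSORTTracker

@dataclass
class SimpleDetection:
    bbox: List[float]                       # [x1, y1, x2, y2]
    confidence: float = 0.9
    features: Optional[np.ndarray] = None   # optional appearance embedding

tracker = DeepSORTTracker(max_age=30, n_init=3, use_appearance=False)

# Two objects drifting right by a few pixels per frame
confirmed = []
for frame_idx in range(4):
    dx = 5 * frame_idx
    frame = [
        SimpleDetection([100 + dx, 100, 180 + dx, 160]),
        SimpleDetection([400 + dx, 220, 470 + dx, 300]),
    ]
    confirmed = tracker.update(frame)   # tracks are confirmed after 3 hits

for t in confirmed:
    print(t.track_id, [round(v, 1) for v in t.bbox])

With use_appearance=True, each detection would also need a features embedding; matching then combines IoU with cosine distance and rejects candidates whose appearance distance exceeds the 0.5 gating threshold.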