# app.py
# Implements Option 1 directional crossing:
# - Detect directional crossing of L1 then L2 (L1 and L2 coordinates provided below)
# - Maintain a global counter that increments only when an ID crosses L1 (outside -> inside) and later crosses L2 (outside -> inside)
# - Maintain a live "inside polygon" counter
# - Visualize both counters in the Zone Summary panel
# - Keeps all previous features: homography patch, foot-point mapping, travel distance, avg time, occlusion tolerance and reappearance inheritance
# The output video and person_times_2.xlsx are saved in the working folder; a Gradio interface at the bottom wraps process_video for interactive use.
import cv2
import numpy as np
import time
import torch
import pandas as pd
from collections import defaultdict, deque
from scipy.ndimage import gaussian_filter1d
from ultralytics import YOLO
import os
import platform
import sys
# Mac-specific optimizations (os is already imported above)
if platform.system() == "Darwin":
    os.environ['PYTORCH_ENABLE_MPS_FALLBACK'] = '1'
    os.environ['OMP_NUM_THREADS'] = '1'
# ---------------- Points in image (given) - adjust if needed
A = (440.0, 829.0)
B = (883.0, 928.0)
C = (1052.0, 325.0)
D = (739.0, 297.0)
E = (727.0, 688.0)
F = (893.0, 312.0)
POLYGON = np.array([A, B, C, D], dtype=np.float32)
# ---------------- Real-world segment lengths for path C -> B -> A -> D (meters)
SEG_REAL_M = [5.0, 2.5, 5.0] # C->B, B->A, A->D
# image path (order C,B,A,D)
PATH_IMAGE = np.array([C, B, A, D], dtype=np.float32)
# Patch base scaling (pixels per meter). Will adapt to fit.
BASE_SCALE_PX_PER_M = 80.0
RIGHT_PANEL_W = 350
SMOOTH_ALPHA = 0.65
MISSING_TIMEOUT = 3.0
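# SMOOTH_ALPHA is the weight given to the newest mapped point when smoothing patch dots
# (higher = more responsive); MISSING_TIMEOUT is how many seconds a tracked ID may go
# undetected before it is finalized as having exited.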
# ---------------- Lines (L1, L2) coordinates (image space) - use these for counting
L1_p1 = (898.0, 322.0)
L1_p2 = (1020.0, 453.0)
L2_p1 = (786.0, 576.0)
L2_p2 = (977.0, 607.0)
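# Counting rule: an ID increments the global counter only after it crosses L1 into the
# polygon side and then, later, crosses L2 in the same inward direction.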
# ---------------- Utilities
def progress_bar(current, total, bar_length=30):
if total <= 0:
return
ratio = current / total
filled = int(ratio * bar_length)
bar = "█" * filled + "-" * (bar_length - filled)
print(f"\r[{bar}] {int(ratio * 100)}% Frame {current}/{total}", end="")
def point_in_polygon(cx, cy, polygon):
return cv2.pointPolygonTest(polygon.astype(np.int32), (int(cx), int(cy)), False) >= 0
def euclid(a, b):
return float(np.hypot(a[0]-b[0], a[1]-b[1]))
def fmt(t):
return time.strftime('%H:%M:%S', time.gmtime(t))
def calculate_foot_from_head(head_box, head_center):
"""Calculate foot position from head detection."""
x1, y1, x2, y2 = head_box
head_cx, head_cy = head_center
head_height = y2 - y1
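    # Heuristic: the feet are assumed to sit roughly 5.5 head-box heights below the supplied head point.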
body_length_est = head_height * 5.5
foot_x = head_cx
foot_y = head_cy + body_length_est
return foot_x, foot_y
def nms_obb(boxes, scores, threshold=0.4):
"""Non-Maximum Suppression for Oriented Bounding Boxes"""
if len(boxes) == 0:
return []
boxes_np = np.array(boxes)
scores_np = np.array(scores)
x_coords = boxes_np[:, 0::2]
y_coords = boxes_np[:, 1::2]
x_min = np.min(x_coords, axis=1)
y_min = np.min(y_coords, axis=1)
x_max = np.max(x_coords, axis=1)
y_max = np.max(y_coords, axis=1)
areas = (x_max - x_min) * (y_max - y_min)
order = scores_np.argsort()[::-1]
keep = []
while order.size > 0:
i = order[0]
keep.append(i)
xx1 = np.maximum(x_min[i], x_min[order[1:]])
yy1 = np.maximum(y_min[i], y_min[order[1:]])
xx2 = np.minimum(x_max[i], x_max[order[1:]])
yy2 = np.minimum(y_max[i], y_max[order[1:]])
w = np.maximum(0.0, xx2 - xx1)
h = np.maximum(0.0, yy2 - yy1)
intersection = w * h
union = areas[i] + areas[order[1:]] - intersection
iou = intersection / union
inds = np.where(iou <= threshold)[0]
order = order[inds + 1]
return keep
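# Note: the IoU above is computed on the axis-aligned hull of each OBB, which is used
# here as a cheap approximation of rotated-box overlap when suppressing duplicate heads.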
# ---------------- Project point onto polyline (returns along distance in px and proj point)
def project_point_to_polyline(pt, poly):
best_dist = None
best_proj = None
best_cum = 0.0
cum = 0.0
for i in range(1, len(poly)):
a = np.array(poly[i-1], dtype=np.float32)
b = np.array(poly[i], dtype=np.float32)
v = b - a
w = np.array(pt, dtype=np.float32) - a
seg_len = float(np.hypot(v[0], v[1]))
if seg_len == 0:
t = 0.0
proj = a.copy()
else:
t = float(np.dot(w, v) / (seg_len*seg_len))
t = max(0.0, min(1.0, t))
proj = a + t*v
d = float(np.hypot(proj[0]-pt[0], proj[1]-pt[1]))
along_px = cum + t * seg_len
if best_dist is None or d < best_dist:
best_dist = d
best_proj = proj
best_cum = along_px
cum += seg_len
return float(best_cum), (float(best_proj[0]), float(best_proj[1]))
def polyline_pixel_lengths(poly):
return [euclid(poly[i-1], poly[i]) for i in range(1, len(poly))]
# ---------------- Compute conversion per segment (image)
img_seg_px_lengths = polyline_pixel_lengths(PATH_IMAGE)
if len(img_seg_px_lengths) != len(SEG_REAL_M):
raise RuntimeError("PATH_IMAGE and SEG_REAL_M length mismatch")
seg_px_to_m = []
for px_len, m_len in zip(img_seg_px_lengths, SEG_REAL_M):
seg_px_to_m.append((m_len / px_len) if px_len > 1e-6 else 0.0)
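# Each image segment gets its own pixels-per-meter ratio, so along-path distances are
# converted to meters segment by segment instead of with a single global scale factor.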
# helper: compute along_m from an image point using image PATH_IMAGE
def image_point_to_along_m(pt):
along_px, _ = project_point_to_polyline(pt, PATH_IMAGE)
px_cum = 0.0
cum_m = 0.0
for i, seg_px in enumerate(img_seg_px_lengths):
next_px = px_cum + seg_px
if along_px <= next_px + 1e-9:
offset_px = along_px - px_cum
along_m = cum_m + offset_px * seg_px_to_m[i]
return float(max(0.0, min(sum(SEG_REAL_M), along_m)))
px_cum = next_px
cum_m += SEG_REAL_M[i]
return float(sum(SEG_REAL_M))
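# Worked example: if the projected along_px lands 30% of the way into the second image
# segment (B->A), then along_m = 5.0 (C->B) + 0.3 * 2.5 = 5.75 m.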
# ---------------- Build patch rectangle layout (pixel coordinates)
def build_patch_layout(scale_px_per_m):
margin = 18
rect_w_px = int(2.5 * scale_px_per_m)
rect_h_px = int(5.0 * scale_px_per_m)
patch_w = rect_w_px + 2*margin
patch_h = rect_h_px + 2*margin
left_x = margin
right_x = margin + rect_w_px
top_y = margin
bottom_y = margin + rect_h_px
# top row: D (left-top), F (mid-top), C (right-top)
D_p = (left_x, top_y)
F_p = ( (left_x + right_x)//2, top_y )
C_p = (right_x, top_y)
A_p = (left_x, bottom_y)
B_p = (right_x, bottom_y)
# E point down from F
E_p = (F_p[0], top_y + int(rect_h_px * 0.55))
path_patch = np.array([C_p, B_p, A_p, D_p], dtype=np.float32) # C->B->A->D
extras = {"patch_w": patch_w, "patch_h": patch_h, "D": D_p, "F": F_p, "C": C_p, "A": A_p, "B": B_p, "E": E_p, "scale": scale_px_per_m}
return path_patch, extras
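# The patch is a top-down schematic of the 2.5 m x 5.0 m zone drawn at scale_px_per_m,
# with corners A, B, C, D matching the image polygon and an interior divider F->E.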
PATCH_PATH, PATCH_EXTRAS = build_patch_layout(BASE_SCALE_PX_PER_M)
PATCH_W = PATCH_EXTRAS["patch_w"]
PATCH_H = PATCH_EXTRAS["patch_h"]
# ---------------- Line helpers for crossing detection
def line_coeffs(p1, p2):
# returns a,b,c for line ax+by+c=0
(x1,y1), (x2,y2) = p1, p2
a = y1 - y2
b = x2 - x1
c = x1*y2 - x2*y1
return a, b, c
def signed_dist_to_line(p, line_coeff):
a,b,c = line_coeff
x,y = p
return (a*x + b*y + c) / (np.hypot(a,b) + 1e-12)
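# The sign of the result indicates which side of the (infinite) line the point lies on;
# the magnitude is the perpendicular distance in pixels. Opposite signs mean opposite sides.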
def segment_intersects(a1,a2,b1,b2):
# standard segment intersection test
def ccw(A,B,C):
return (C[1]-A[1])*(B[0]-A[0]) > (B[1]-A[1])*(C[0]-A[0])
A=a1; B=a2; C=b1; D=b2
return (ccw(A,C,D) != ccw(B,C,D)) and (ccw(A,B,C) != ccw(A,B,D))
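# Orientation (CCW) test: the segments intersect iff b1 and b2 lie on opposite sides of
# a1-a2 AND a1 and a2 lie on opposite sides of b1-b2 (degenerate collinear cases are not handled).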
L1_coeff = line_coeffs(L1_p1, L1_p2)
L2_coeff = line_coeffs(L2_p1, L2_p2)
# Determine inside side for each line using polygon centroid:
poly_centroid = tuple(np.mean(POLYGON, axis=0).tolist())
L1_inside_sign = np.sign(signed_dist_to_line(poly_centroid, L1_coeff))
if L1_inside_sign == 0:
L1_inside_sign = 1.0
L2_inside_sign = np.sign(signed_dist_to_line(poly_centroid, L2_coeff))
if L2_inside_sign == 0:
L2_inside_sign = 1.0
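# The polygon centroid defines the "inside" half-plane for each line: a crossing counts as
# inward when the current foot point ends up on the same side as the centroid.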
# ---------------- BBox smoother
class BBoxSmoother:
def __init__(self, buffer_size=5):
self.buf = buffer_size
self.hist = defaultdict(lambda: deque(maxlen=buffer_size))
def smooth(self, boxes, ids):
out = []
for box, tid in zip(boxes, ids):
self.hist[tid].append(box)
arr = np.array(self.hist[tid])
if arr.shape[0] >= 3:
sm = gaussian_filter1d(arr, sigma=1, axis=0)[-1]
else:
sm = arr[-1]
out.append(sm)
return np.array(out)
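# Per-ID box smoothing: each track keeps its last `buffer_size` boxes and, once at least
# three are available, the newest box is replaced by a Gaussian-filtered estimate.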
# ---------------- Main processing function
def process_video(
input_video_path="crop_video.mp4",
output_video_path="people_polygon_tracking_corrected.avi",
model_name="yolo11x.pt",
head_model_name="head_detection_model.pt",
conf_threshold=0.3,
img_size=1280,
use_gpu=True,
enhance_frames=False,
smooth_bbox_tracks=True,
missing_timeout=MISSING_TIMEOUT
):
device = "cuda" if torch.cuda.is_available() and use_gpu else "cpu"
model = YOLO(model_name)
PERSON_CLASS = 0
head_model = YOLO(head_model_name) # Your OBB head detection model
HEAD_CLASS = 0
bbox_smoother = BBoxSmoother(5) if smooth_bbox_tracks else None
# persistent state
inside_state = {}
entry_time = {}
accumulated_time = defaultdict(float)
first_entry_vid = {}
last_exit_vid = {}
last_seen = {}
prev_along = {}
prev_time = {}
entry_along = {}
travel_distance = defaultdict(float)
display_pos = {}
head_foot_positions = {} # Stores head detections with estimated foot positions
person_only_ids = set() # Track person-only detections
head_only_ids = set() # Track head-only detections
# crossing trackers
prev_foot = {} # {id: (x,y)} previous foot coordinate (image space)
crossed_l1_flag = {} # {id: bool} whether this id has crossed L1 (in required direction) and not yet used to count
crossed_l2_counted = {} # {id: bool} whether this id has already triggered the global count by crossing L2 after L1
prev_l1_dist = {} # Track distance to L1
prev_l2_dist = {} # Track distance to L2
global_counter = 0 # counts completed L1->L2 sequences
completed_times = [] # for avg time taken
sequential_entries = []
cap = cv2.VideoCapture(input_video_path)
if not cap.isOpened():
raise RuntimeError("Cannot open input video: " + input_video_path)
fps = int(cap.get(cv2.CAP_PROP_FPS)) or 25
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
out_w = width + RIGHT_PANEL_W
out_h = height
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')  # or 'H264' / 'avc1'
    # Respect the caller-supplied output path (the Gradio wrapper passes a temp .mp4 path);
    # with the mp4v codec the file should use the .mp4 extension.
writer = cv2.VideoWriter(output_video_path, fourcc, fps, (out_w, out_h))
if not writer.isOpened():
raise RuntimeError("Failed to open VideoWriter. Try different codec or path.")
# adjust patch scale if too tall
PATCH_PATH_local = PATCH_PATH.copy()
patch_w = PATCH_W
patch_h = PATCH_H
patch_scale = PATCH_EXTRAS["scale"]
if patch_h > height - 40:
factor = (height - 60) / patch_h
PATCH_PATH_local = PATCH_PATH_local * factor
patch_w = int(patch_w * factor)
patch_h = int(patch_h * factor)
patch_scale = patch_scale * factor
# Create homography from POLYGON (image A,B,C,D) to rect corners in patch coordinates (A_p,B_p,C_p,D_p)
A_p = PATCH_EXTRAS["A"]
B_p = PATCH_EXTRAS["B"]
C_p = PATCH_EXTRAS["C"]
D_p = PATCH_EXTRAS["D"]
dest_rect = np.array([A_p, B_p, C_p, D_p], dtype=np.float32)
H_img2patch = cv2.getPerspectiveTransform(POLYGON.astype(np.float32), dest_rect.astype(np.float32))
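    # H_img2patch maps the four image-space polygon corners onto the patch rectangle;
    # cv2.perspectiveTransform later warps individual foot points into patch coordinates.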
start_time = time.time()
frame_idx = 0
    # keep the line endpoints as tuples for reference in visualization and intersection tests
    L1 = (L1_p1, L1_p2)
    L2 = (L2_p1, L2_p2)
while True:
ret, frame = cap.read()
if not ret:
break
frame_idx += 1
progress_bar(frame_idx, total_frames)
now = time.time()
vid_seconds = now - start_time
if enhance_frames:
frame = cv2.fastNlMeansDenoisingColored(frame, None, 5,5,7,21)
results = model.track(
frame,
persist=True,
tracker="bytetrack.yaml",
classes=[PERSON_CLASS],
conf=conf_threshold,
iou=0.5,
imgsz=img_size,
device=device,
half=use_gpu,
verbose=False
)
        # Head detection (runs every frame alongside the person tracker)
head_results = head_model(frame, conf=conf_threshold, classes=[HEAD_CLASS], verbose=False)[0]
# Process head detections
obb_boxes = []
obb_scores = []
obb_data = []
head_foot_positions = {} # {estimated_foot_pos: (head_box, conf)}
if head_results.obb is not None and len(head_results.obb) > 0:
for obb in head_results.obb:
xyxyxyxy = obb.xyxyxyxy[0].cpu().numpy()
conf = float(obb.conf[0])
if conf < conf_threshold:
continue
obb_boxes.append(xyxyxyxy.flatten().tolist())
obb_scores.append(conf)
obb_data.append((xyxyxyxy, conf))
# Apply NMS to head detections
if len(obb_boxes) > 0:
keep_indices = nms_obb(obb_boxes, obb_scores, 0.4)
for idx in keep_indices:
xyxyxyxy, conf = obb_data[idx]
# Convert OBB to axis-aligned bbox
x_min = int(xyxyxyxy[:, 0].min())
y_min = int(xyxyxyxy[:, 1].min())
x_max = int(xyxyxyxy[:, 0].max())
y_max = int(xyxyxyxy[:, 1].max())
head_cx = (x_min + x_max) / 2.0
head_cy = float(y_min)
# Calculate foot from head
foot_x, foot_y = calculate_foot_from_head(
[x_min, y_min, x_max, y_max],
(head_cx, head_cy)
)
head_foot_positions[(foot_x, foot_y)] = ((x_min, y_min, x_max, y_max, xyxyxyxy), conf)
# draw polygon on frame
cv2.polylines(frame, [POLYGON.astype(np.int32)], True, (255,0,0), 3)
# draw L1 and L2 on frame (blue)
cv2.line(frame, tuple(map(int, L1_p1)), tuple(map(int, L1_p2)), (255,180,0), 3)
cv2.line(frame, tuple(map(int, L2_p1)), tuple(map(int, L2_p2)), (255,180,0), 3)
right_panel = np.ones((height, RIGHT_PANEL_W, 3), dtype=np.uint8) * 40
patch = np.ones((patch_h, patch_w, 3), dtype=np.uint8) * 255
# draw patch structure: rectangle and center divider
A_px = (int(dest_rect[0][0]), int(dest_rect[0][1]))
B_px = (int(dest_rect[1][0]), int(dest_rect[1][1]))
C_px = (int(dest_rect[2][0]), int(dest_rect[2][1]))
D_px = (int(dest_rect[3][0]), int(dest_rect[3][1]))
# walls (thick black lines)
cv2.line(patch, A_px, D_px, (0,0,0), 6) # left
cv2.line(patch, A_px, B_px, (0,0,0), 6) # bottom
cv2.line(patch, B_px, C_px, (0,0,0), 6) # right
cv2.line(patch, D_px, C_px, (0,0,0), 6) # top
# center divider F->E
F_px = ( (D_px[0] + C_px[0])//2, D_px[1] )
E_px = (F_px[0], D_px[1] + int((patch_h) * 0.5))
cv2.line(patch, F_px, E_px, (0,0,0), 6)
for p in [A_px, B_px, C_px, D_px, F_px, E_px]:
cv2.circle(patch, p, 5, (0,0,0), -1)
# Match person detections with head detections
person_head_matches = {} # {person_id: head_foot_pos}
matched_heads = set()
b = results[0].boxes
detected_ids = set()
current_inside = []
current_projs = []
if b is not None and b.id is not None:
boxes = b.xyxy.cpu().numpy()
ids = b.id.cpu().numpy().astype(int)
if bbox_smoother is not None:
boxes = bbox_smoother.smooth(boxes, ids)
# First pass: match person detections with head detections
for box, tid in zip(boxes, ids):
x1, y1, x2, y2 = map(int, box)
person_foot_x = float((x1 + x2) / 2.0)
person_foot_y = float(y2)
# Find closest head detection within reasonable distance
best_head = None
best_dist = 100 # pixels threshold
for head_foot_pos, (head_box_data, head_conf) in head_foot_positions.items():
head_fx, head_fy = head_foot_pos
dist = np.sqrt((person_foot_x - head_fx)**2 + (person_foot_y - head_fy)**2)
# Check if head is roughly above person bbox (y_head < y_person_top)
head_box = head_box_data[:4]
if head_box[3] < y1 + 50: # head bottom should be near person top
if dist < best_dist and head_foot_pos not in matched_heads:
best_dist = dist
best_head = head_foot_pos
if best_head:
person_head_matches[tid] = best_head
matched_heads.add(best_head)
person_only_ids.discard(tid)
else:
person_only_ids.add(tid)
for box, tid in zip(boxes, ids):
x1, y1, x2, y2 = map(int, box)
# Use head-derived foot if available, otherwise use person bbox foot
if tid in person_head_matches:
fx, fy = person_head_matches[tid]
head_box_data, head_conf = head_foot_positions[person_head_matches[tid]]
head_box = head_box_data[:4]
xyxyxyxy = head_box_data[4]
# Draw head OBB (cyan for matched detection)
points = xyxyxyxy.astype(np.int32)
cv2.polylines(frame, [points], True, (255, 255, 0), 2)
else:
fx = float((x1 + x2) / 2.0)
fy = float(y2) # bottom center (foot)
detected_ids.add(tid)
last_seen[tid] = now
inside = point_in_polygon(fx, fy, POLYGON)
prev = inside_state.get(tid, False)
# maintain prev_foot for intersection tests
prev_pt = prev_foot.get(tid, None)
current_pt = (fx, fy)
                # Crossing detection for L1/L2: the earlier intersection-only version was
                # replaced by the multi-method logic below.
# Calculate signed distances to both lines
curr_l1_dist = signed_dist_to_line(current_pt, L1_coeff)
curr_l2_dist = signed_dist_to_line(current_pt, L2_coeff)
# Robust crossing detection
if prev_pt is not None and tid in prev_l1_dist and tid in prev_l2_dist:
prev_l1 = prev_l1_dist[tid]
prev_l2 = prev_l2_dist[tid]
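                    # Three redundant detectors (segment intersection, sign change of the signed
                    # distance, and near-approach) are OR-ed together so a crossing is not missed
                    # when the foot point jumps several pixels between frames.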
# === L1 CROSSING (3 detection methods) ===
# Method 1: Segment intersection (current method)
inter_l1 = segment_intersects(prev_pt, current_pt, L1_p1, L1_p2)
# Method 2: Sign change in distance
prev_sign_l1 = np.sign(prev_l1)
curr_sign_l1 = np.sign(curr_l1_dist)
if prev_sign_l1 == 0:
prev_sign_l1 = 1.0
if curr_sign_l1 == 0:
curr_sign_l1 = prev_sign_l1
sign_change_l1 = (prev_sign_l1 != curr_sign_l1)
correct_dir_l1 = (curr_sign_l1 == L1_inside_sign)
                    # Method 3: Close proximity check (catches near-misses)
                    close_to_l1 = abs(curr_l1_dist) < 35   # now within 35 pixels of L1
                    was_far_l1 = abs(prev_l1) > 40         # was more than 40 pixels away
                    moving_toward_l1 = abs(curr_l1_dist) < abs(prev_l1)  # getting closer
# Trigger L1 crossing if ANY method detects it
if (inter_l1 or (sign_change_l1 and correct_dir_l1) or
(close_to_l1 and was_far_l1 and moving_toward_l1 and correct_dir_l1)):
if inside and not crossed_l1_flag.get(tid, False):
crossed_l1_flag[tid] = True
print(f"L1 crossed by ID {tid}")
# === L2 CROSSING (3 detection methods) ===
# Method 1: Segment intersection
inter_l2 = segment_intersects(prev_pt, current_pt, L2_p1, L2_p2)
# Method 2: Sign change in distance
prev_sign_l2 = np.sign(prev_l2)
curr_sign_l2 = np.sign(curr_l2_dist)
if prev_sign_l2 == 0:
prev_sign_l2 = 1.0
if curr_sign_l2 == 0:
curr_sign_l2 = prev_sign_l2
sign_change_l2 = (prev_sign_l2 != curr_sign_l2)
correct_dir_l2 = (curr_sign_l2 == L2_inside_sign)
# Method 3: Close proximity check
close_to_l2 = abs(curr_l2_dist) < 40
was_far_l2 = abs(prev_l2) > 20
moving_toward_l2 = abs(curr_l2_dist) < abs(prev_l2)
# Trigger L2 crossing if ANY method detects it
if (inter_l2 or
(sign_change_l2 and correct_dir_l2) or
(close_to_l2 and was_far_l2 and moving_toward_l2 and correct_dir_l2)):
# Count only if L1 was already crossed and not yet counted
if inside and crossed_l1_flag.get(tid, False) and not crossed_l2_counted.get(tid, False):
global_counter += 1
crossed_l2_counted[tid] = True
print(f"✓ COUNTED: ID {tid} | Global count now: {global_counter}")
entry_vid_time = first_entry_vid.get(tid, vid_seconds)
sequential_entries.append({
'person_num': global_counter,
'tid': tid,
'entry_time': entry_vid_time,
'exit_time': None,
'duration': None
})
# Update distance tracking for next frame
prev_l1_dist[tid] = curr_l1_dist
prev_l2_dist[tid] = curr_l2_dist
prev_foot[tid] = current_pt
if inside and not prev:
inside_state[tid] = True
if tid not in entry_time:
entry_time[tid] = now
if tid not in first_entry_vid:
first_entry_vid[tid] = vid_seconds
if tid not in accumulated_time:
accumulated_time[tid] = 0.0
if tid not in travel_distance:
travel_distance[tid] = 0.0
# draw bbox only for inside persons
if inside:
# Green if matched with head, yellow if person-only
color = (0, 200, 0) if tid in person_head_matches else (0, 200, 200)
cv2.rectangle(frame, (x1,y1), (x2,y2), color, 2)
cv2.putText(frame, f"ID {tid}", (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.8, color, 2)
# map foot point through homography to patch coordinates (this is the key)
pt_img = np.array([[[fx, fy]]], dtype=np.float32)
mapped = cv2.perspectiveTransform(pt_img, H_img2patch)[0][0]
mx = float(np.clip(mapped[0], 0, patch_w - 1))
my = float(np.clip(mapped[1], 0, patch_h - 1))
# smooth display position
if tid in display_pos:
px_prev, py_prev = display_pos[tid]
sx = SMOOTH_ALPHA
dx = px_prev*(1 - sx) + mx*sx
dy = py_prev*(1 - sx) + my*sx
else:
dx, dy = mx, my
display_pos[tid] = (dx, dy)
current_inside.append(tid)
# compute along_m using image-based method for metric consistency
along_m = image_point_to_along_m((fx, fy))
current_projs.append((tid, along_m))
# initialize prev_along if first time
if tid not in prev_along:
prev_along[tid] = along_m
entry_along[tid] = along_m
prev_time[tid] = now
# compute forward-only travel distance
delta = along_m - prev_along.get(tid, along_m)
if delta > 0:
travel_distance[tid] += delta
prev_along[tid] = along_m
prev_time[tid] = now
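            # Head-only recovery: an unmatched head detection inside the polygon can re-activate
            # a recently-lost track ID that was last seen within ~80 px of the estimated foot point.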
for head_foot_pos, (head_box_data, head_conf) in head_foot_positions.items():
if head_foot_pos in matched_heads:
continue # Already matched with a person
fx, fy = head_foot_pos
# Only process if inside polygon
if not point_in_polygon(fx, fy, POLYGON):
continue
# Try to match with existing tracked IDs by proximity
matched_existing = False
for tid in list(inside_state.keys()):
if tid in detected_ids:
continue # Already detected this frame
if tid in display_pos:
prev_x, prev_y = display_pos[tid]
# Check if head is near previous position
dist = np.sqrt((fx - prev_x)**2 + (fy - prev_y)**2)
if dist < 80: # pixels threshold
# Reactivate this ID using head detection
detected_ids.add(tid)
last_seen[tid] = now
prev_foot[tid] = (fx, fy)
matched_existing = True
head_only_ids.add(tid)
# Draw head detection (red for head-only recovery)
head_box = head_box_data[:4]
xyxyxyxy = head_box_data[4]
points = xyxyxyxy.astype(np.int32)
cv2.polylines(frame, [points], True, (0, 0, 255), 2)
cv2.putText(frame, f"ID {tid} (H)", (int(head_box[0]), int(head_box[1]) - 10),
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)
# Continue tracking
inside_state[tid] = True
current_inside.append(tid)
# Map through homography
pt_img = np.array([[[fx, fy]]], dtype=np.float32)
mapped = cv2.perspectiveTransform(pt_img, H_img2patch)[0][0]
mx = float(np.clip(mapped[0], 0, patch_w - 1))
my = float(np.clip(mapped[1], 0, patch_h - 1))
# Smooth display position
if tid in display_pos:
px_prev, py_prev = display_pos[tid]
sx = SMOOTH_ALPHA
dx = px_prev*(1 - sx) + mx*sx
dy = py_prev*(1 - sx) + my*sx
else:
dx, dy = mx, my
display_pos[tid] = (dx, dy)
# Track travel distance
along_m = image_point_to_along_m((fx, fy))
current_projs.append((tid, along_m))
if tid not in prev_along:
prev_along[tid] = along_m
entry_along[tid] = along_m
prev_time[tid] = now
delta = along_m - prev_along.get(tid, along_m)
if delta > 0:
travel_distance[tid] += delta
prev_along[tid] = along_m
prev_time[tid] = now
break
# finalize exits after missing timeout
known_ids = set(list(inside_state.keys()) + list(last_seen.keys()))
for tid in list(known_ids):
if inside_state.get(tid, False) and tid not in detected_ids:
ls = last_seen.get(tid, None)
if ls is None:
continue
missing = now - ls
if missing > missing_timeout:
inside_state[tid] = False
if tid in entry_time:
accumulated_time[tid] += now - entry_time[tid]
exit_vid_time = ls - start_time
last_exit_vid[tid] = exit_vid_time
completed_times.append(accumulated_time[tid])
# Update sequential entry exit time
for entry in sequential_entries:
if entry['tid'] == tid and entry['exit_time'] is None:
entry['exit_time'] = exit_vid_time
entry['duration'] = accumulated_time[tid]
break
entry_time.pop(tid, None)
else:
# within occlusion grace window -> keep inside state
pass
# Reappearance inheritance logic (same as prior): copy neighbor state if ID lost & reappears
current_projs_map = {tid: a for tid, a in current_projs}
for tid, along in current_projs:
if tid in prev_along:
continue
candidates = []
for other_tid, other_al in current_projs_map.items():
if other_tid == tid:
continue
candidates.append((other_tid, other_al))
if not candidates and prev_along:
candidates = [(other_tid, prev_along_val) for other_tid, prev_along_val in prev_along.items() if other_tid != tid]
if not candidates:
prev_along[tid] = along
entry_along.setdefault(tid, along)
prev_time[tid] = now
continue
neighbor_tid, neighbor_al = min(candidates, key=lambda x: abs(x[1] - along))
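            # Inherit state only when the new ID appears within half the total path length
            # (but at least 0.5 m) of the nearest candidate's along-path position.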
if abs(neighbor_al - along) < max(0.5, sum(SEG_REAL_M)*0.5):
prev_along[tid] = prev_along.get(neighbor_tid, neighbor_al)
entry_along[tid] = entry_along.get(neighbor_tid, neighbor_al)
prev_time[tid] = now
accumulated_time[tid] = accumulated_time.get(neighbor_tid, 0.0)
if neighbor_tid in entry_time:
entry_time[tid] = entry_time[neighbor_tid]
else:
entry_time[tid] = now - accumulated_time[tid]
# also inherit crossed L1/L2 flags if neighbor had them (helps maintain global count consistency)
if crossed_l1_flag.get(neighbor_tid, False) and not crossed_l1_flag.get(tid, False):
crossed_l1_flag[tid] = True
if crossed_l2_counted.get(neighbor_tid, False) and not crossed_l2_counted.get(tid, False):
crossed_l2_counted[tid] = True
else:
prev_along[tid] = along
entry_along.setdefault(tid, along)
prev_time[tid] = now
# build display list sorted by along for consistent ordering
disp = []
for tid in current_inside:
if tid not in display_pos:
continue
dx, dy = display_pos[tid]
cur_al = prev_along.get(tid, entry_along.get(tid, 0.0))
t_inside = int(now - entry_time[tid]) if tid in entry_time else int(accumulated_time.get(tid, 0.0))
trav = travel_distance.get(tid, 0.0)
disp.append((tid, int(round(dx)), int(round(dy)), t_inside, trav, cur_al))
disp.sort(key=lambda x: x[5]) # by along
# draw patch dots and labels (no velocity)
for tid, xi, yi, t_inside, trav, _ in disp:
cv2.circle(patch, (xi, yi), 6, (0,0,255), -1)
cv2.putText(patch, f"ID {tid}", (xi+8, yi-8), cv2.FONT_HERSHEY_SIMPLEX, 0.45, (0,0,0), 1)
cv2.putText(patch, f"{t_inside}s {trav:.2f}m", (xi+8, yi+8), cv2.FONT_HERSHEY_SIMPLEX, 0.4, (0,0,0), 1)
# compute avg time taken from completed_times
avg_time_taken = float(np.mean(completed_times)) if len(completed_times) > 0 else 0.0
# top-right summary: show both counters
panel_h, panel_w = 220, 350
panel = np.ones((panel_h, panel_w, 3), dtype=np.uint8) * 255
cv2.putText(panel, "Zone Summary", (12, 24), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0,0,0), 2)
cv2.putText(panel, f"Inside count: {len(disp)}", (12, 58), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0,120,0), 2)
cv2.putText(panel, f"Global count: {global_counter}", (12, 92), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0,0,128), 2)
cv2.putText(panel, f"Avg time taken: {int(avg_time_taken)}s", (12, 126), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0,0,0), 2)
yv = 150
for tid, _, _, t_inside, trav, _ in disp[:8]:
cv2.putText(panel, f"ID {tid}: {t_inside}s, {trav:.2f}m", (12, yv), cv2.FONT_HERSHEY_SIMPLEX, 0.55, (50,50,50), 1)
yv += 18
final = np.hstack((frame, right_panel))
# place panel top-right inside right panel
panel_x = width + (RIGHT_PANEL_W - panel_w)//2
panel_y = 10
final[panel_y:panel_y+panel_h, panel_x:panel_x+panel_w] = panel
# place patch below panel
patch_x = width + (RIGHT_PANEL_W - patch_w)//2
patch_y = panel_y + panel_h + 10
if patch_y + patch_h > height:
patch_y = height - patch_h - 10
final[patch_y:patch_y+patch_h, patch_x:patch_x+patch_w] = patch
writer.write(np.ascontiguousarray(final))
# finalize
end_t = time.time()
for tid in list(entry_time.keys()):
accumulated_time[tid] += end_t - entry_time[tid]
exit_vid_time = last_seen.get(tid, end_t) - start_time
last_exit_vid[tid] = exit_vid_time
completed_times.append(accumulated_time[tid])
# Update sequential entry exit time
for entry in sequential_entries:
if entry['tid'] == tid and entry['exit_time'] is None:
entry['exit_time'] = exit_vid_time
entry['duration'] = accumulated_time[tid]
break
entry_time.pop(tid, None)
inside_state[tid] = False
cap.release()
writer.release()
    # export Excel with sequential person numbers (only entries with duration > 0)
rows = []
for entry in sequential_entries:
if entry['exit_time'] is not None and entry['duration'] is not None and entry['duration'] > 0:
rows.append({
"Person": entry['person_num'],
"Time in": fmt(entry['entry_time']),
"Time out": fmt(entry['exit_time']),
"Time in queue (seconds)": round(float(entry['duration']), 2)
})
df = pd.DataFrame(rows, columns=["Person","Time in","Time out","Time in queue (seconds)"])
if len(df) > 0:
df.to_excel("person_times_2.xlsx", index=False)
else:
        pd.DataFrame(columns=["Person","Time in","Time out","Time in queue (seconds)"]).to_excel("person_times_2.xlsx", index=False)
print("\nFinished. Output:", os.path.abspath(output_video_path))
print("Saved times:", os.path.abspath("person_times_2.xlsx"))
# # ---------------- Runner
# if __name__ == "__main__":
# CONFIG = {
# 'input_video_path': "sample_vid_o.mp4",
# 'output_video_path': "output24.avi",
# 'model_name': "yolo11x.pt",
# 'head_model_name': "head_detection_single_video_best.pt",
# 'conf_threshold': 0.3,
# 'img_size': 1280,
# 'use_gpu': True,
# 'enhance_frames': False,
# 'smooth_bbox_tracks': True,
# 'missing_timeout': 3.0
# }
# process_video(
# input_video_path = CONFIG['input_video_path'],
# output_video_path = CONFIG['output_video_path'],
# model_name = CONFIG['model_name'],
# head_model_name = CONFIG['head_model_name'],
# conf_threshold = CONFIG['conf_threshold'],
# img_size = CONFIG['img_size'],
# use_gpu = CONFIG['use_gpu'],
# enhance_frames = CONFIG['enhance_frames'],
# smooth_bbox_tracks = CONFIG['smooth_bbox_tracks'],
# missing_timeout = CONFIG['missing_timeout']
# )
# ---------------- Gradio Interface
import gradio as gr
import tempfile
import shutil
def gradio_process_video(input_video, conf_threshold=0.3, missing_timeout=3.0):
"""
Wrapper function for Gradio interface
"""
try:
# Create temporary directory for outputs
temp_dir = tempfile.mkdtemp()
# Define output paths
output_video_path = os.path.join(temp_dir, "output_tracking.mp4")
excel_path = os.path.join(temp_dir, "person_times.xlsx")
        # process_video writes its Excel report to the working directory; copy it into temp_dir afterwards
        original_excel = "person_times_2.xlsx"
# Run the processing
CONFIG = {
'input_video_path': input_video,
'output_video_path': output_video_path,
'model_name': "yolo11x.pt",
'head_model_name': "head_detection_single_video_best.pt",
'conf_threshold': float(conf_threshold),
'img_size': 1280,
'use_gpu': torch.cuda.is_available(),
'enhance_frames': False,
'smooth_bbox_tracks': True,
'missing_timeout': float(missing_timeout)
}
process_video(
input_video_path=CONFIG['input_video_path'],
output_video_path=CONFIG['output_video_path'],
model_name=CONFIG['model_name'],
head_model_name=CONFIG['head_model_name'],
conf_threshold=CONFIG['conf_threshold'],
img_size=CONFIG['img_size'],
use_gpu=CONFIG['use_gpu'],
enhance_frames=CONFIG['enhance_frames'],
smooth_bbox_tracks=CONFIG['smooth_bbox_tracks'],
missing_timeout=CONFIG['missing_timeout']
)
# Copy the generated excel file to temp directory
if os.path.exists(original_excel):
shutil.copy(original_excel, excel_path)
return output_video_path, excel_path
except Exception as e:
print(f"Error processing video: {str(e)}")
import traceback
traceback.print_exc()
return None, None
# Create Gradio interface
with gr.Blocks(title="Queue Tracking System") as demo:
gr.Markdown(
"""
# 🎯 Queue Tracking & Analytics System
Upload a video to track people in a defined polygon area. The system will:
- Track people entering and exiting the zone
- Count directional crossings through L1 and L2 lines
- Calculate time spent in queue
- Measure travel distance
- Detect both full body and head-only detections
**Note:** Processing may take several minutes depending on video length.
"""
)
with gr.Row():
with gr.Column():
video_input = gr.Video(
label="Upload Video",
format="mp4"
)
conf_threshold = gr.Slider(
minimum=0.1,
maximum=0.9,
value=0.3,
step=0.05,
label="Detection Confidence Threshold",
info="Lower values detect more objects but may include false positives"
)
missing_timeout = gr.Slider(
minimum=1.0,
maximum=10.0,
value=3.0,
step=0.5,
label="Missing Timeout (seconds)",
info="How long to wait before considering a person has left the zone"
)
process_btn = gr.Button("🚀 Process Video", variant="primary", size="lg")
with gr.Column():
video_output = gr.Video(
label="Processed Video with Tracking",
format="mp4"
)
excel_output = gr.File(
label="Download Excel Report",
file_types=[".xlsx"]
)
gr.Markdown(
"""
### 📊 Output Information:
- **Processed Video**: Shows tracking overlay with IDs, polygon area, and crossing lines
- **Excel Report**: Contains entry/exit times and queue duration for each person
"""
)
gr.Markdown(
"""
---
### 🔧 Technical Details:
- Uses YOLO11x for person detection
- Custom head detection model for occlusion handling
- Homographic transformation for accurate spatial mapping
- ByteTrack for robust ID tracking
- Directional crossing detection (L1 → L2)
"""
)
# Connect the button to the processing function
process_btn.click(
fn=gradio_process_video,
inputs=[video_input, conf_threshold, missing_timeout],
outputs=[video_output, excel_output]
)
# Add examples if you have sample videos
gr.Examples(
examples=[
["sample_vid_o.mp4", 0.3, 3.0],
],
inputs=[video_input, conf_threshold, missing_timeout],
outputs=[video_output, excel_output],
fn=gradio_process_video,
cache_examples=False,
)
# Launch the app
if __name__ == "__main__":
demo.launch(
share=False, # Set to True if you want a temporary public link
server_name="0.0.0.0", # Important for Hugging Face Spaces
server_port=7860 # Default port for HF Spaces
)