# run7.py
# Updated to implement Option 1 directional crossing:
# - Detect directional crossing of L1 then L2 (L1 and L2 coordinates are provided below)
# - Maintain a global counter that increments only when an ID crosses L1 (outside -> inside)
#   and then later crosses L2 (outside -> inside)
# - Maintain a live "inside polygon" counter
# - Visualize both counters in the Zone Summary panel
# - Keeps all previous features: homography patch, foot-point mapping, travel distance,
#   average time, occlusion tolerance and reappearance inheritance
# Paste and run. The output video and person_times_2.xlsx are saved in the working folder.
import cv2
import numpy as np
import time
import torch
import pandas as pd
from collections import defaultdict, deque
from scipy.ndimage import gaussian_filter1d
from ultralytics import YOLO
import os
import platform
import sys

# Mac-specific optimizations
if platform.system() == "Darwin":
    os.environ['PYTORCH_ENABLE_MPS_FALLBACK'] = '1'
    os.environ['OMP_NUM_THREADS'] = '1'
# ---------------- Points in image (given) - adjust if needed
A = (440.0, 829.0)
B = (883.0, 928.0)
C = (1052.0, 325.0)
D = (739.0, 297.0)
E = (727.0, 688.0)
F = (893.0, 312.0)
POLYGON = np.array([A, B, C, D], dtype=np.float32)

# ---------------- Real-world segment lengths for path C -> B -> A -> D (meters)
SEG_REAL_M = [5.0, 2.5, 5.0]  # C->B, B->A, A->D

# Image path (order C, B, A, D)
PATH_IMAGE = np.array([C, B, A, D], dtype=np.float32)

# Patch base scaling (pixels per meter). Will adapt to fit.
BASE_SCALE_PX_PER_M = 80.0
RIGHT_PANEL_W = 350
SMOOTH_ALPHA = 0.65
MISSING_TIMEOUT = 3.0

# ---------------- Lines (L1, L2) coordinates (image space) - used for counting
L1_p1 = (898.0, 322.0)
L1_p2 = (1020.0, 453.0)
L2_p1 = (786.0, 576.0)
L2_p2 = (977.0, 607.0)

# ---------------- Utilities
def progress_bar(current, total, bar_length=30):
    if total <= 0:
        return
    ratio = current / total
    filled = int(ratio * bar_length)
    bar = "█" * filled + "-" * (bar_length - filled)
    print(f"\r[{bar}] {int(ratio * 100)}% Frame {current}/{total}", end="")

def point_in_polygon(cx, cy, polygon):
    return cv2.pointPolygonTest(polygon.astype(np.int32), (int(cx), int(cy)), False) >= 0

def euclid(a, b):
    return float(np.hypot(a[0] - b[0], a[1] - b[1]))

def fmt(t):
    return time.strftime('%H:%M:%S', time.gmtime(t))
def calculate_foot_from_head(head_box, head_center):
    """Estimate the foot position from a head detection."""
    x1, y1, x2, y2 = head_box
    head_cx, head_cy = head_center
    head_height = y2 - y1
    body_length_est = head_height * 5.5
    foot_x = head_cx
    foot_y = head_cy + body_length_est
    return foot_x, foot_y
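
# Quick sanity check of the head -> foot heuristic (illustrative numbers only, not from the
# video): a 40 px-tall head whose top-center sits at (500, 200) gives an estimated body
# length of 40 * 5.5 = 220 px, so the estimated foot lands at (500.0, 420.0).
# calculate_foot_from_head([480, 200, 520, 240], (500.0, 200.0))  # -> (500.0, 420.0)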
def nms_obb(boxes, scores, threshold=0.4):
    """Non-Maximum Suppression for Oriented Bounding Boxes"""
    if len(boxes) == 0:
        return []
    boxes_np = np.array(boxes)
    scores_np = np.array(scores)
    x_coords = boxes_np[:, 0::2]
    y_coords = boxes_np[:, 1::2]
    x_min = np.min(x_coords, axis=1)
    y_min = np.min(y_coords, axis=1)
    x_max = np.max(x_coords, axis=1)
    y_max = np.max(y_coords, axis=1)
    areas = (x_max - x_min) * (y_max - y_min)
    order = scores_np.argsort()[::-1]
    keep = []
    while order.size > 0:
        i = order[0]
        keep.append(i)
        xx1 = np.maximum(x_min[i], x_min[order[1:]])
        yy1 = np.maximum(y_min[i], y_min[order[1:]])
        xx2 = np.minimum(x_max[i], x_max[order[1:]])
        yy2 = np.minimum(y_max[i], y_max[order[1:]])
        w = np.maximum(0.0, xx2 - xx1)
        h = np.maximum(0.0, yy2 - yy1)
        intersection = w * h
        union = areas[i] + areas[order[1:]] - intersection
        iou = intersection / union
        inds = np.where(iou <= threshold)[0]
        order = order[inds + 1]
    return keep
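
# Minimal nms_obb illustration (hypothetical 8-value corner lists, not real detections):
# two heavily overlapping boxes and one separate box -> the lower-scoring overlap is dropped,
# i.e. indices 0 and 2 are kept.
# boxes = [[0, 0, 10, 0, 10, 10, 0, 10],
#          [1, 1, 11, 1, 11, 11, 1, 11],
#          [50, 50, 60, 50, 60, 60, 50, 60]]
# nms_obb(boxes, [0.9, 0.8, 0.7], threshold=0.4)  # -> keeps indices 0 and 2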
# ---------------- Project point onto polyline (returns along-distance in px and projected point)
def project_point_to_polyline(pt, poly):
    best_dist = None
    best_proj = None
    best_cum = 0.0
    cum = 0.0
    for i in range(1, len(poly)):
        a = np.array(poly[i - 1], dtype=np.float32)
        b = np.array(poly[i], dtype=np.float32)
        v = b - a
        w = np.array(pt, dtype=np.float32) - a
        seg_len = float(np.hypot(v[0], v[1]))
        if seg_len == 0:
            t = 0.0
            proj = a.copy()
        else:
            t = float(np.dot(w, v) / (seg_len * seg_len))
            t = max(0.0, min(1.0, t))
            proj = a + t * v
        d = float(np.hypot(proj[0] - pt[0], proj[1] - pt[1]))
        along_px = cum + t * seg_len
        if best_dist is None or d < best_dist:
            best_dist = d
            best_proj = proj
            best_cum = along_px
        cum += seg_len
    return float(best_cum), (float(best_proj[0]), float(best_proj[1]))
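
# Example (hypothetical polyline, not PATH_IMAGE): for poly = [(0, 0), (10, 0), (10, 10)]
# project_point_to_polyline((4, 3), poly)   # -> (4.0, (4.0, 0.0)): 4 px along the first segment
# project_point_to_polyline((12, 5), poly)  # -> (15.0, (10.0, 5.0)): full first segment + 5 px into the second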
def polyline_pixel_lengths(poly):
    return [euclid(poly[i - 1], poly[i]) for i in range(1, len(poly))]

# ---------------- Compute conversion per segment (image)
img_seg_px_lengths = polyline_pixel_lengths(PATH_IMAGE)
if len(img_seg_px_lengths) != len(SEG_REAL_M):
    raise RuntimeError("PATH_IMAGE and SEG_REAL_M length mismatch")
seg_px_to_m = []
for px_len, m_len in zip(img_seg_px_lengths, SEG_REAL_M):
    seg_px_to_m.append((m_len / px_len) if px_len > 1e-6 else 0.0)

# Helper: compute along-distance in meters from an image point using PATH_IMAGE
def image_point_to_along_m(pt):
    along_px, _ = project_point_to_polyline(pt, PATH_IMAGE)
    px_cum = 0.0
    cum_m = 0.0
    for i, seg_px in enumerate(img_seg_px_lengths):
        next_px = px_cum + seg_px
        if along_px <= next_px + 1e-9:
            offset_px = along_px - px_cum
            along_m = cum_m + offset_px * seg_px_to_m[i]
            return float(max(0.0, min(sum(SEG_REAL_M), along_m)))
        px_cum = next_px
        cum_m += SEG_REAL_M[i]
    return float(sum(SEG_REAL_M))
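
# Example usage: the endpoints of the calibrated path map to the ends of the metric range,
# and intermediate foot points are interpolated per segment via seg_px_to_m.
# image_point_to_along_m(C)  # -> ~0.0 m (start of C -> B -> A -> D)
# image_point_to_along_m(D)  # -> ~12.5 m (= sum(SEG_REAL_M))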
# ---------------- Build patch rectangle layout (pixel coordinates)
def build_patch_layout(scale_px_per_m):
    margin = 18
    rect_w_px = int(2.5 * scale_px_per_m)
    rect_h_px = int(5.0 * scale_px_per_m)
    patch_w = rect_w_px + 2 * margin
    patch_h = rect_h_px + 2 * margin
    left_x = margin
    right_x = margin + rect_w_px
    top_y = margin
    bottom_y = margin + rect_h_px
    # top row: D (left-top), F (mid-top), C (right-top)
    D_p = (left_x, top_y)
    F_p = ((left_x + right_x) // 2, top_y)
    C_p = (right_x, top_y)
    A_p = (left_x, bottom_y)
    B_p = (right_x, bottom_y)
    # E point down from F
    E_p = (F_p[0], top_y + int(rect_h_px * 0.55))
    path_patch = np.array([C_p, B_p, A_p, D_p], dtype=np.float32)  # C->B->A->D
    extras = {"patch_w": patch_w, "patch_h": patch_h, "D": D_p, "F": F_p, "C": C_p,
              "A": A_p, "B": B_p, "E": E_p, "scale": scale_px_per_m}
    return path_patch, extras

PATCH_PATH, PATCH_EXTRAS = build_patch_layout(BASE_SCALE_PX_PER_M)
PATCH_W = PATCH_EXTRAS["patch_w"]
PATCH_H = PATCH_EXTRAS["patch_h"]
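
# With BASE_SCALE_PX_PER_M = 80 the 2.5 m x 5.0 m rectangle becomes 200 x 400 px, plus an
# 18 px margin on every side, so PATCH_W, PATCH_H == 236, 436 before any downscaling that
# process_video applies later when the patch would not fit inside the frame height.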
# ---------------- Line helpers for crossing detection
def line_coeffs(p1, p2):
    # returns a, b, c for the line ax + by + c = 0 through p1 and p2
    (x1, y1), (x2, y2) = p1, p2
    a = y1 - y2
    b = x2 - x1
    c = x1 * y2 - x2 * y1
    return a, b, c

def signed_dist_to_line(p, line_coeff):
    a, b, c = line_coeff
    x, y = p
    return (a * x + b * y + c) / (np.hypot(a, b) + 1e-12)

def segment_intersects(a1, a2, b1, b2):
    # standard segment intersection test
    def ccw(A, B, C):
        return (C[1] - A[1]) * (B[0] - A[0]) > (B[1] - A[1]) * (C[0] - A[0])
    A = a1; B = a2; C = b1; D = b2
    return (ccw(A, C, D) != ccw(B, C, D)) and (ccw(A, B, C) != ccw(A, B, D))

L1_coeff = line_coeffs(L1_p1, L1_p2)
L2_coeff = line_coeffs(L2_p1, L2_p2)

# Determine the "inside" side of each line using the polygon centroid:
poly_centroid = tuple(np.mean(POLYGON, axis=0).tolist())
L1_inside_sign = np.sign(signed_dist_to_line(poly_centroid, L1_coeff))
if L1_inside_sign == 0:
    L1_inside_sign = 1.0
L2_inside_sign = np.sign(signed_dist_to_line(poly_centroid, L2_coeff))
if L2_inside_sign == 0:
    L2_inside_sign = 1.0
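
# Sanity check of the crossing primitives (hypothetical foot positions near L1, not taken
# from any detection): a step from the outside side of L1 onto the centroid side should
# both intersect the L1 segment and flip the signed distance onto L1_inside_sign.
# p_prev, p_curr = (980.0, 350.0), (940.0, 400.0)
# segment_intersects(p_prev, p_curr, L1_p1, L1_p2)                   # -> True
# np.sign(signed_dist_to_line(p_curr, L1_coeff)) == L1_inside_sign   # -> True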
# ---------------- BBox smoother
class BBoxSmoother:
    def __init__(self, buffer_size=5):
        self.buf = buffer_size
        self.hist = defaultdict(lambda: deque(maxlen=buffer_size))

    def smooth(self, boxes, ids):
        out = []
        for box, tid in zip(boxes, ids):
            self.hist[tid].append(box)
            arr = np.array(self.hist[tid])
            if arr.shape[0] >= 3:
                sm = gaussian_filter1d(arr, sigma=1, axis=0)[-1]
            else:
                sm = arr[-1]
            out.append(sm)
        return np.array(out)
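
# Usage sketch (hypothetical boxes): the smoother keeps a short per-ID history and applies a
# Gaussian filter along time once at least three boxes have been seen for that ID.
# smoother = BBoxSmoother(buffer_size=5)
# for frame_boxes in ([[0, 0, 10, 20]], [[1, 0, 11, 20]], [[2, 1, 12, 21]]):
#     smoothed = smoother.smooth(np.array(frame_boxes, dtype=float), [7])  # track ID 7
# "smoothed" then holds the filtered latest box for ID 7.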
# ---------------- Main processing function
def process_video(
    input_video_path="crop_video.mp4",
    output_video_path="people_polygon_tracking_corrected.avi",
    model_name="yolo11x.pt",
    head_model_name="head_detection_model.pt",
    conf_threshold=0.3,
    img_size=1280,
    use_gpu=True,
    enhance_frames=False,
    smooth_bbox_tracks=True,
    missing_timeout=MISSING_TIMEOUT
):
    device = "cuda" if torch.cuda.is_available() and use_gpu else "cpu"
    model = YOLO(model_name)
    PERSON_CLASS = 0
    head_model = YOLO(head_model_name)  # OBB head detection model
    HEAD_CLASS = 0
    bbox_smoother = BBoxSmoother(5) if smooth_bbox_tracks else None

    # persistent state
    inside_state = {}
    entry_time = {}
    accumulated_time = defaultdict(float)
    first_entry_vid = {}
    last_exit_vid = {}
    last_seen = {}
    prev_along = {}
    prev_time = {}
    entry_along = {}
    travel_distance = defaultdict(float)
    display_pos = {}
    head_foot_positions = {}   # stores head detections with estimated foot positions
    person_only_ids = set()    # track person-only detections
    head_only_ids = set()      # track head-only detections

    # crossing trackers
    prev_foot = {}             # {id: (x, y)} previous foot coordinate (image space)
    crossed_l1_flag = {}       # {id: bool} ID crossed L1 in the required direction, not yet counted
    crossed_l2_counted = {}    # {id: bool} ID already triggered the global count by crossing L2 after L1
    prev_l1_dist = {}          # previous signed distance to L1
    prev_l2_dist = {}          # previous signed distance to L2
    global_counter = 0         # counts completed L1 -> L2 sequences
    completed_times = []       # for average time taken
    sequential_entries = []

    cap = cv2.VideoCapture(input_video_path)
    if not cap.isOpened():
        raise RuntimeError("Cannot open input video: " + input_video_path)
    fps = int(cap.get(cv2.CAP_PROP_FPS)) or 25
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    out_w = width + RIGHT_PANEL_W
    out_h = height

    fourcc = cv2.VideoWriter_fourcc(*'mp4v')  # or 'H264' / 'avc1'
    # the mp4v codec needs an .mp4 container; adjust the extension instead of overriding the caller's path
    if not output_video_path.lower().endswith(".mp4"):
        output_video_path = os.path.splitext(output_video_path)[0] + ".mp4"
    writer = cv2.VideoWriter(output_video_path, fourcc, fps, (out_w, out_h))
    if not writer.isOpened():
        raise RuntimeError("Failed to open VideoWriter. Try a different codec or path.")

    # adjust patch scale if the patch is too tall for the frame
    PATCH_PATH_local = PATCH_PATH.copy()
    patch_w = PATCH_W
    patch_h = PATCH_H
    patch_scale = PATCH_EXTRAS["scale"]
    patch_factor = 1.0
    if patch_h > height - 40:
        patch_factor = (height - 60) / patch_h
        PATCH_PATH_local = PATCH_PATH_local * patch_factor
        patch_w = int(patch_w * patch_factor)
        patch_h = int(patch_h * patch_factor)
        patch_scale = patch_scale * patch_factor

    # Homography from POLYGON (image A, B, C, D) to the rectangle corners in patch coordinates
    # (corners are rescaled by the same factor so mapping and drawing stay consistent)
    A_p = PATCH_EXTRAS["A"]
    B_p = PATCH_EXTRAS["B"]
    C_p = PATCH_EXTRAS["C"]
    D_p = PATCH_EXTRAS["D"]
    dest_rect = np.array([A_p, B_p, C_p, D_p], dtype=np.float32) * patch_factor
    H_img2patch = cv2.getPerspectiveTransform(POLYGON.astype(np.float32), dest_rect.astype(np.float32))

    start_time = time.time()
    frame_idx = 0
    # precompute line endpoints for visualization and intersection tests
    L1 = (L1_p1, L1_p2)
    L2 = (L2_p1, L2_p2)
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        frame_idx += 1
        progress_bar(frame_idx, total_frames)
        now = time.time()
        vid_seconds = now - start_time
        if enhance_frames:
            frame = cv2.fastNlMeansDenoisingColored(frame, None, 5, 5, 7, 21)

        results = model.track(
            frame,
            persist=True,
            tracker="bytetrack.yaml",
            classes=[PERSON_CLASS],
            conf=conf_threshold,
            iou=0.5,
            imgsz=img_size,
            device=device,
            half=use_gpu,
            verbose=False
        )

        # Head detection (runs alongside person tracking)
        head_results = head_model(frame, conf=conf_threshold, classes=[HEAD_CLASS], verbose=False)[0]

        # Process head detections
        obb_boxes = []
        obb_scores = []
        obb_data = []
        head_foot_positions = {}  # {estimated_foot_pos: (head_box_data, conf)}
        if head_results.obb is not None and len(head_results.obb) > 0:
            for obb in head_results.obb:
                xyxyxyxy = obb.xyxyxyxy[0].cpu().numpy()
                conf = float(obb.conf[0])
                if conf < conf_threshold:
                    continue
                obb_boxes.append(xyxyxyxy.flatten().tolist())
                obb_scores.append(conf)
                obb_data.append((xyxyxyxy, conf))

        # Apply NMS to head detections
        if len(obb_boxes) > 0:
            keep_indices = nms_obb(obb_boxes, obb_scores, 0.4)
            for idx in keep_indices:
                xyxyxyxy, conf = obb_data[idx]
                # Convert OBB to an axis-aligned bbox
                x_min = int(xyxyxyxy[:, 0].min())
                y_min = int(xyxyxyxy[:, 1].min())
                x_max = int(xyxyxyxy[:, 0].max())
                y_max = int(xyxyxyxy[:, 1].max())
                head_cx = (x_min + x_max) / 2.0
                head_cy = float(y_min)
                # Estimate foot from head
                foot_x, foot_y = calculate_foot_from_head(
                    [x_min, y_min, x_max, y_max],
                    (head_cx, head_cy)
                )
                head_foot_positions[(foot_x, foot_y)] = ((x_min, y_min, x_max, y_max, xyxyxyxy), conf)

        # draw polygon on frame
        cv2.polylines(frame, [POLYGON.astype(np.int32)], True, (255, 0, 0), 3)
        # draw L1 and L2 on frame (blue)
        cv2.line(frame, tuple(map(int, L1_p1)), tuple(map(int, L1_p2)), (255, 180, 0), 3)
        cv2.line(frame, tuple(map(int, L2_p1)), tuple(map(int, L2_p2)), (255, 180, 0), 3)

        right_panel = np.ones((height, RIGHT_PANEL_W, 3), dtype=np.uint8) * 40
        patch = np.ones((patch_h, patch_w, 3), dtype=np.uint8) * 255

        # draw patch structure: rectangle and center divider
        A_px = (int(dest_rect[0][0]), int(dest_rect[0][1]))
        B_px = (int(dest_rect[1][0]), int(dest_rect[1][1]))
        C_px = (int(dest_rect[2][0]), int(dest_rect[2][1]))
        D_px = (int(dest_rect[3][0]), int(dest_rect[3][1]))
        # walls (thick black lines)
        cv2.line(patch, A_px, D_px, (0, 0, 0), 6)  # left
        cv2.line(patch, A_px, B_px, (0, 0, 0), 6)  # bottom
        cv2.line(patch, B_px, C_px, (0, 0, 0), 6)  # right
        cv2.line(patch, D_px, C_px, (0, 0, 0), 6)  # top
        # center divider F -> E
        F_px = ((D_px[0] + C_px[0]) // 2, D_px[1])
        E_px = (F_px[0], D_px[1] + int(patch_h * 0.5))
        cv2.line(patch, F_px, E_px, (0, 0, 0), 6)
        for p in [A_px, B_px, C_px, D_px, F_px, E_px]:
            cv2.circle(patch, p, 5, (0, 0, 0), -1)

        # Match person detections with head detections
        person_head_matches = {}  # {person_id: head_foot_pos}
        matched_heads = set()
        b = results[0].boxes
        detected_ids = set()
        current_inside = []
        current_projs = []
        if b is not None and b.id is not None:
            boxes = b.xyxy.cpu().numpy()
            ids = b.id.cpu().numpy().astype(int)
            if bbox_smoother is not None:
                boxes = bbox_smoother.smooth(boxes, ids)

            # First pass: match person detections with head detections
            for box, tid in zip(boxes, ids):
                x1, y1, x2, y2 = map(int, box)
                person_foot_x = float((x1 + x2) / 2.0)
                person_foot_y = float(y2)
                # Find the closest head detection within a reasonable distance
                best_head = None
                best_dist = 100  # pixel threshold
                for head_foot_pos, (head_box_data, head_conf) in head_foot_positions.items():
                    head_fx, head_fy = head_foot_pos
                    dist = np.sqrt((person_foot_x - head_fx) ** 2 + (person_foot_y - head_fy) ** 2)
                    # Head should be roughly above the person bbox (head bottom near person top)
                    head_box = head_box_data[:4]
                    if head_box[3] < y1 + 50:
                        if dist < best_dist and head_foot_pos not in matched_heads:
                            best_dist = dist
                            best_head = head_foot_pos
                if best_head:
                    person_head_matches[tid] = best_head
                    matched_heads.add(best_head)
                    person_only_ids.discard(tid)
                else:
                    person_only_ids.add(tid)

            # Second pass: per-track state update, crossing detection and drawing
            for box, tid in zip(boxes, ids):
                x1, y1, x2, y2 = map(int, box)
                # Use the head-derived foot if available, otherwise the person bbox foot
                if tid in person_head_matches:
                    fx, fy = person_head_matches[tid]
                    head_box_data, head_conf = head_foot_positions[person_head_matches[tid]]
                    head_box = head_box_data[:4]
                    xyxyxyxy = head_box_data[4]
                    # Draw head OBB (cyan for matched detection)
                    points = xyxyxyxy.astype(np.int32)
                    cv2.polylines(frame, [points], True, (255, 255, 0), 2)
                else:
                    fx = float((x1 + x2) / 2.0)
                    fy = float(y2)  # bottom center (foot)

                detected_ids.add(tid)
                last_seen[tid] = now
                inside = point_in_polygon(fx, fy, POLYGON)
                prev = inside_state.get(tid, False)
                # maintain prev_foot for intersection tests
                prev_pt = prev_foot.get(tid, None)
                current_pt = (fx, fy)
                # Signed distances to both lines
                curr_l1_dist = signed_dist_to_line(current_pt, L1_coeff)
                curr_l2_dist = signed_dist_to_line(current_pt, L2_coeff)

                # Robust crossing detection
                if prev_pt is not None and tid in prev_l1_dist and tid in prev_l2_dist:
                    prev_l1 = prev_l1_dist[tid]
                    prev_l2 = prev_l2_dist[tid]

                    # === L1 CROSSING (3 detection methods) ===
                    # Method 1: segment intersection
                    inter_l1 = segment_intersects(prev_pt, current_pt, L1_p1, L1_p2)
                    # Method 2: sign change in the signed distance
                    prev_sign_l1 = np.sign(prev_l1)
                    curr_sign_l1 = np.sign(curr_l1_dist)
                    if prev_sign_l1 == 0:
                        prev_sign_l1 = 1.0
                    if curr_sign_l1 == 0:
                        curr_sign_l1 = prev_sign_l1
                    sign_change_l1 = (prev_sign_l1 != curr_sign_l1)
                    correct_dir_l1 = (curr_sign_l1 == L1_inside_sign)
                    # Method 3: close-proximity check (catches near-misses)
                    close_to_l1 = abs(curr_l1_dist) < 35      # now within 35 px of L1
                    was_far_l1 = abs(prev_l1) > 40            # was more than 40 px away
                    moving_toward_l1 = abs(curr_l1_dist) < abs(prev_l1)  # getting closer
                    # Trigger L1 crossing if ANY method detects it
                    if (inter_l1 or (sign_change_l1 and correct_dir_l1) or
                            (close_to_l1 and was_far_l1 and moving_toward_l1 and correct_dir_l1)):
                        if inside and not crossed_l1_flag.get(tid, False):
                            crossed_l1_flag[tid] = True
                            print(f"L1 crossed by ID {tid}")

                    # === L2 CROSSING (3 detection methods) ===
                    # Method 1: segment intersection
                    inter_l2 = segment_intersects(prev_pt, current_pt, L2_p1, L2_p2)
                    # Method 2: sign change in the signed distance
                    prev_sign_l2 = np.sign(prev_l2)
                    curr_sign_l2 = np.sign(curr_l2_dist)
                    if prev_sign_l2 == 0:
                        prev_sign_l2 = 1.0
                    if curr_sign_l2 == 0:
                        curr_sign_l2 = prev_sign_l2
                    sign_change_l2 = (prev_sign_l2 != curr_sign_l2)
                    correct_dir_l2 = (curr_sign_l2 == L2_inside_sign)
                    # Method 3: close-proximity check
                    close_to_l2 = abs(curr_l2_dist) < 40      # now within 40 px of L2
                    was_far_l2 = abs(prev_l2) > 20            # was more than 20 px away
                    moving_toward_l2 = abs(curr_l2_dist) < abs(prev_l2)
                    # Trigger L2 crossing if ANY method detects it
                    if (inter_l2 or
                            (sign_change_l2 and correct_dir_l2) or
                            (close_to_l2 and was_far_l2 and moving_toward_l2 and correct_dir_l2)):
                        # Count only if L1 was already crossed and this ID has not been counted yet
                        if inside and crossed_l1_flag.get(tid, False) and not crossed_l2_counted.get(tid, False):
                            global_counter += 1
                            crossed_l2_counted[tid] = True
                            print(f"✓ COUNTED: ID {tid} | Global count now: {global_counter}")
                            entry_vid_time = first_entry_vid.get(tid, vid_seconds)
                            sequential_entries.append({
                                'person_num': global_counter,
                                'tid': tid,
                                'entry_time': entry_vid_time,
                                'exit_time': None,
                                'duration': None
                            })

                # Update distance tracking for the next frame
                prev_l1_dist[tid] = curr_l1_dist
                prev_l2_dist[tid] = curr_l2_dist
                prev_foot[tid] = current_pt
                if inside and not prev:
                    inside_state[tid] = True
                    if tid not in entry_time:
                        entry_time[tid] = now
                    if tid not in first_entry_vid:
                        first_entry_vid[tid] = vid_seconds
                    if tid not in accumulated_time:
                        accumulated_time[tid] = 0.0
                    if tid not in travel_distance:
                        travel_distance[tid] = 0.0

                # draw bbox and update patch state only for persons inside the polygon
                if inside:
                    # Green if matched with a head detection, yellow if person-only
                    color = (0, 200, 0) if tid in person_head_matches else (0, 200, 200)
                    cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
                    cv2.putText(frame, f"ID {tid}", (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.8, color, 2)

                    # map the foot point through the homography to patch coordinates (this is the key)
                    pt_img = np.array([[[fx, fy]]], dtype=np.float32)
                    mapped = cv2.perspectiveTransform(pt_img, H_img2patch)[0][0]
                    mx = float(np.clip(mapped[0], 0, patch_w - 1))
                    my = float(np.clip(mapped[1], 0, patch_h - 1))
                    # smooth display position
                    if tid in display_pos:
                        px_prev, py_prev = display_pos[tid]
                        sx = SMOOTH_ALPHA
                        dx = px_prev * (1 - sx) + mx * sx
                        dy = py_prev * (1 - sx) + my * sx
                    else:
                        dx, dy = mx, my
                    display_pos[tid] = (dx, dy)
                    current_inside.append(tid)

                    # compute along_m using the image-based method for metric consistency
                    along_m = image_point_to_along_m((fx, fy))
                    current_projs.append((tid, along_m))
                    # initialize prev_along on first sight
                    if tid not in prev_along:
                        prev_along[tid] = along_m
                        entry_along[tid] = along_m
                        prev_time[tid] = now
                    # forward-only travel distance
                    delta = along_m - prev_along.get(tid, along_m)
                    if delta > 0:
                        travel_distance[tid] += delta
                    prev_along[tid] = along_m
                    prev_time[tid] = now

        # Head-only recovery: reactivate IDs missed by the person detector this frame
        for head_foot_pos, (head_box_data, head_conf) in head_foot_positions.items():
            if head_foot_pos in matched_heads:
                continue  # already matched with a person
            fx, fy = head_foot_pos
            # only process heads whose estimated foot is inside the polygon
            if not point_in_polygon(fx, fy, POLYGON):
                continue
            # try to match with existing tracked IDs by proximity
            matched_existing = False
            for tid in list(inside_state.keys()):
                if tid in detected_ids:
                    continue  # already detected this frame
                if tid in display_pos:
                    prev_x, prev_y = display_pos[tid]
                    # check whether the head is near the previous position
                    dist = np.sqrt((fx - prev_x) ** 2 + (fy - prev_y) ** 2)
                    if dist < 80:  # pixel threshold
                        # reactivate this ID using the head detection
                        detected_ids.add(tid)
                        last_seen[tid] = now
                        prev_foot[tid] = (fx, fy)
                        matched_existing = True
                        head_only_ids.add(tid)
                        # draw head detection (red for head-only recovery)
                        head_box = head_box_data[:4]
                        xyxyxyxy = head_box_data[4]
                        points = xyxyxyxy.astype(np.int32)
                        cv2.polylines(frame, [points], True, (0, 0, 255), 2)
                        cv2.putText(frame, f"ID {tid} (H)", (int(head_box[0]), int(head_box[1]) - 10),
                                    cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)
                        # continue tracking
                        inside_state[tid] = True
                        current_inside.append(tid)
                        # map through the homography
                        pt_img = np.array([[[fx, fy]]], dtype=np.float32)
                        mapped = cv2.perspectiveTransform(pt_img, H_img2patch)[0][0]
                        mx = float(np.clip(mapped[0], 0, patch_w - 1))
                        my = float(np.clip(mapped[1], 0, patch_h - 1))
                        # smooth display position
                        if tid in display_pos:
                            px_prev, py_prev = display_pos[tid]
                            sx = SMOOTH_ALPHA
                            dx = px_prev * (1 - sx) + mx * sx
                            dy = py_prev * (1 - sx) + my * sx
                        else:
                            dx, dy = mx, my
                        display_pos[tid] = (dx, dy)
                        # track travel distance
                        along_m = image_point_to_along_m((fx, fy))
                        current_projs.append((tid, along_m))
                        if tid not in prev_along:
                            prev_along[tid] = along_m
                            entry_along[tid] = along_m
                            prev_time[tid] = now
                        delta = along_m - prev_along.get(tid, along_m)
                        if delta > 0:
                            travel_distance[tid] += delta
                        prev_along[tid] = along_m
                        prev_time[tid] = now
                        break

        # finalize exits after the missing timeout
        known_ids = set(list(inside_state.keys()) + list(last_seen.keys()))
        for tid in list(known_ids):
            if inside_state.get(tid, False) and tid not in detected_ids:
                ls = last_seen.get(tid, None)
                if ls is None:
                    continue
                missing = now - ls
                if missing > missing_timeout:
                    inside_state[tid] = False
                    if tid in entry_time:
                        accumulated_time[tid] += now - entry_time[tid]
                        exit_vid_time = ls - start_time
                        last_exit_vid[tid] = exit_vid_time
                        completed_times.append(accumulated_time[tid])
                        # update sequential entry exit time
                        for entry in sequential_entries:
                            if entry['tid'] == tid and entry['exit_time'] is None:
                                entry['exit_time'] = exit_vid_time
                                entry['duration'] = accumulated_time[tid]
                                break
                        entry_time.pop(tid, None)
                else:
                    # within the occlusion grace window -> keep inside state
                    pass

        # Reappearance inheritance logic (same as prior): copy neighbor state if an ID is lost and reappears
        current_projs_map = {tid: a for tid, a in current_projs}
        for tid, along in current_projs:
            if tid in prev_along:
                continue
            candidates = []
            for other_tid, other_al in current_projs_map.items():
                if other_tid == tid:
                    continue
                candidates.append((other_tid, other_al))
            if not candidates and prev_along:
                candidates = [(other_tid, prev_along_val) for other_tid, prev_along_val in prev_along.items() if other_tid != tid]
            if not candidates:
                prev_along[tid] = along
                entry_along.setdefault(tid, along)
                prev_time[tid] = now
                continue
            neighbor_tid, neighbor_al = min(candidates, key=lambda x: abs(x[1] - along))
            if abs(neighbor_al - along) < max(0.5, sum(SEG_REAL_M) * 0.5):
                prev_along[tid] = prev_along.get(neighbor_tid, neighbor_al)
                entry_along[tid] = entry_along.get(neighbor_tid, neighbor_al)
                prev_time[tid] = now
                accumulated_time[tid] = accumulated_time.get(neighbor_tid, 0.0)
                if neighbor_tid in entry_time:
                    entry_time[tid] = entry_time[neighbor_tid]
                else:
                    entry_time[tid] = now - accumulated_time[tid]
                # also inherit crossed L1/L2 flags from the neighbor (keeps the global count consistent)
                if crossed_l1_flag.get(neighbor_tid, False) and not crossed_l1_flag.get(tid, False):
                    crossed_l1_flag[tid] = True
                if crossed_l2_counted.get(neighbor_tid, False) and not crossed_l2_counted.get(tid, False):
                    crossed_l2_counted[tid] = True
            else:
                prev_along[tid] = along
                entry_along.setdefault(tid, along)
                prev_time[tid] = now

        # build display list sorted by along-distance for consistent ordering
        disp = []
        for tid in current_inside:
            if tid not in display_pos:
                continue
            dx, dy = display_pos[tid]
            cur_al = prev_along.get(tid, entry_along.get(tid, 0.0))
            t_inside = int(now - entry_time[tid]) if tid in entry_time else int(accumulated_time.get(tid, 0.0))
            trav = travel_distance.get(tid, 0.0)
            disp.append((tid, int(round(dx)), int(round(dy)), t_inside, trav, cur_al))
        disp.sort(key=lambda x: x[5])  # by along-distance

        # draw patch dots and labels (no velocity)
        for tid, xi, yi, t_inside, trav, _ in disp:
            cv2.circle(patch, (xi, yi), 6, (0, 0, 255), -1)
            cv2.putText(patch, f"ID {tid}", (xi + 8, yi - 8), cv2.FONT_HERSHEY_SIMPLEX, 0.45, (0, 0, 0), 1)
            cv2.putText(patch, f"{t_inside}s {trav:.2f}m", (xi + 8, yi + 8), cv2.FONT_HERSHEY_SIMPLEX, 0.4, (0, 0, 0), 1)

        # average time taken over completed visits
        avg_time_taken = float(np.mean(completed_times)) if len(completed_times) > 0 else 0.0

        # top-right summary: show both counters
        panel_h, panel_w = 220, 350
        panel = np.ones((panel_h, panel_w, 3), dtype=np.uint8) * 255
        cv2.putText(panel, "Zone Summary", (12, 24), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 0), 2)
        cv2.putText(panel, f"Inside count: {len(disp)}", (12, 58), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 120, 0), 2)
        cv2.putText(panel, f"Global count: {global_counter}", (12, 92), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 128), 2)
        cv2.putText(panel, f"Avg time taken: {int(avg_time_taken)}s", (12, 126), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 0), 2)
        yv = 150
        for tid, _, _, t_inside, trav, _ in disp[:8]:
            cv2.putText(panel, f"ID {tid}: {t_inside}s, {trav:.2f}m", (12, yv), cv2.FONT_HERSHEY_SIMPLEX, 0.55, (50, 50, 50), 1)
            yv += 18

        final = np.hstack((frame, right_panel))
        # place the summary panel top-right inside the right panel
        panel_x = width + (RIGHT_PANEL_W - panel_w) // 2
        panel_y = 10
        final[panel_y:panel_y + panel_h, panel_x:panel_x + panel_w] = panel
        # place the patch below the panel
        patch_x = width + (RIGHT_PANEL_W - patch_w) // 2
        patch_y = panel_y + panel_h + 10
        if patch_y + patch_h > height:
            patch_y = height - patch_h - 10
        final[patch_y:patch_y + patch_h, patch_x:patch_x + patch_w] = patch
        writer.write(np.ascontiguousarray(final))

    # finalize anyone still inside at the end of the video
    end_t = time.time()
    for tid in list(entry_time.keys()):
        accumulated_time[tid] += end_t - entry_time[tid]
        exit_vid_time = last_seen.get(tid, end_t) - start_time
        last_exit_vid[tid] = exit_vid_time
        completed_times.append(accumulated_time[tid])
        # update sequential entry exit time
        for entry in sequential_entries:
            if entry['tid'] == tid and entry['exit_time'] is None:
                entry['exit_time'] = exit_vid_time
                entry['duration'] = accumulated_time[tid]
                break
        entry_time.pop(tid, None)
        inside_state[tid] = False

    cap.release()
    writer.release()

    # export Excel with sequential person numbers (durations > 0 only)
    rows = []
    for entry in sequential_entries:
        if entry['exit_time'] is not None and entry['duration'] is not None and entry['duration'] > 0:
            rows.append({
                "Person": entry['person_num'],
                "Time in": fmt(entry['entry_time']),
                "Time out": fmt(entry['exit_time']),
                "Time in queue (seconds)": round(float(entry['duration']), 2)
            })
    df = pd.DataFrame(rows, columns=["Person", "Time in", "Time out", "Time in queue (seconds)"])
    if len(df) > 0:
        df.to_excel("person_times_2.xlsx", index=False)
    else:
        pd.DataFrame(columns=["Person", "Time in", "Time out", "Time in queue (seconds)"]).to_excel("person_times_2.xlsx", index=False)
    print("\nFinished. Output:", os.path.abspath(output_video_path))
    print("Saved times:", os.path.abspath("person_times_2.xlsx"))

# # ---------------- Runner
# if __name__ == "__main__":
#     CONFIG = {
#         'input_video_path': "sample_vid_o.mp4",
#         'output_video_path': "output24.avi",
#         'model_name': "yolo11x.pt",
#         'head_model_name': "head_detection_single_video_best.pt",
#         'conf_threshold': 0.3,
#         'img_size': 1280,
#         'use_gpu': True,
#         'enhance_frames': False,
#         'smooth_bbox_tracks': True,
#         'missing_timeout': 3.0
#     }
#     process_video(
#         input_video_path=CONFIG['input_video_path'],
#         output_video_path=CONFIG['output_video_path'],
#         model_name=CONFIG['model_name'],
#         head_model_name=CONFIG['head_model_name'],
#         conf_threshold=CONFIG['conf_threshold'],
#         img_size=CONFIG['img_size'],
#         use_gpu=CONFIG['use_gpu'],
#         enhance_frames=CONFIG['enhance_frames'],
#         smooth_bbox_tracks=CONFIG['smooth_bbox_tracks'],
#         missing_timeout=CONFIG['missing_timeout']
#     )

# ---------------- Gradio Interface
import gradio as gr
import tempfile
import shutil

def gradio_process_video(input_video, conf_threshold=0.3, missing_timeout=3.0):
    """Wrapper around process_video for the Gradio interface."""
    try:
        # Create a temporary directory for the outputs
        temp_dir = tempfile.mkdtemp()
        # Define output paths
        output_video_path = os.path.join(temp_dir, "output_tracking.mp4")
        excel_path = os.path.join(temp_dir, "person_times.xlsx")
        # process_video writes its Excel report to this fixed name in the working folder
        original_excel = "person_times_2.xlsx"
        # Run the processing
        CONFIG = {
            'input_video_path': input_video,
            'output_video_path': output_video_path,
            'model_name': "yolo11x.pt",
            'head_model_name': "head_detection_single_video_best.pt",
            'conf_threshold': float(conf_threshold),
            'img_size': 1280,
            'use_gpu': torch.cuda.is_available(),
            'enhance_frames': False,
            'smooth_bbox_tracks': True,
            'missing_timeout': float(missing_timeout)
        }
        process_video(
            input_video_path=CONFIG['input_video_path'],
            output_video_path=CONFIG['output_video_path'],
            model_name=CONFIG['model_name'],
            head_model_name=CONFIG['head_model_name'],
            conf_threshold=CONFIG['conf_threshold'],
            img_size=CONFIG['img_size'],
            use_gpu=CONFIG['use_gpu'],
            enhance_frames=CONFIG['enhance_frames'],
            smooth_bbox_tracks=CONFIG['smooth_bbox_tracks'],
            missing_timeout=CONFIG['missing_timeout']
        )
        # Copy the generated Excel file into the temp directory for download
        if os.path.exists(original_excel):
            shutil.copy(original_excel, excel_path)
        return output_video_path, excel_path
    except Exception as e:
        print(f"Error processing video: {str(e)}")
        import traceback
        traceback.print_exc()
        return None, None

# Create Gradio interface
with gr.Blocks(title="Queue Tracking System") as demo:
    gr.Markdown(
        """
        # 🎯 Queue Tracking & Analytics System
        Upload a video to track people in a defined polygon area. The system will:
        - Track people entering and exiting the zone
        - Count directional crossings through the L1 and L2 lines
        - Calculate time spent in the queue
        - Measure travel distance
        - Handle both full-body and head-only detections

        **Note:** Processing may take several minutes depending on video length.
        """
    )
    with gr.Row():
        with gr.Column():
            video_input = gr.Video(
                label="Upload Video",
                format="mp4"
            )
            conf_threshold = gr.Slider(
                minimum=0.1,
                maximum=0.9,
                value=0.3,
                step=0.05,
                label="Detection Confidence Threshold",
                info="Lower values detect more objects but may include false positives"
            )
            missing_timeout = gr.Slider(
                minimum=1.0,
                maximum=10.0,
                value=3.0,
                step=0.5,
                label="Missing Timeout (seconds)",
                info="How long to wait before considering that a person has left the zone"
            )
            process_btn = gr.Button("🚀 Process Video", variant="primary", size="lg")
        with gr.Column():
            video_output = gr.Video(
                label="Processed Video with Tracking",
                format="mp4"
            )
            excel_output = gr.File(
                label="Download Excel Report",
                file_types=[".xlsx"]
            )
            gr.Markdown(
                """
                ### 📊 Output Information:
                - **Processed Video**: Shows the tracking overlay with IDs, polygon area, and crossing lines
                - **Excel Report**: Contains entry/exit times and queue duration for each person
                """
            )
    gr.Markdown(
        """
        ---
        ### 🔧 Technical Details:
        - Uses YOLO11x for person detection
        - Custom head detection model for occlusion handling
        - Homographic transformation for accurate spatial mapping
        - ByteTrack for robust ID tracking
        - Directional crossing detection (L1 → L2)
        """
    )
    # Connect the button to the processing function
    process_btn.click(
        fn=gradio_process_video,
        inputs=[video_input, conf_threshold, missing_timeout],
        outputs=[video_output, excel_output]
    )
    # Examples (requires the sample video to be present in the working folder)
    gr.Examples(
        examples=[
            ["sample_vid_o.mp4", 0.3, 3.0],
        ],
        inputs=[video_input, conf_threshold, missing_timeout],
        outputs=[video_output, excel_output],
        fn=gradio_process_video,
        cache_examples=False,
    )

# Launch the app
if __name__ == "__main__":
    demo.launch(
        share=False,            # set to True for a temporary public link
        server_name="0.0.0.0",  # important for Hugging Face Spaces
        server_port=7860        # default port for HF Spaces
    )