Spaces:
Paused
Paused
from vitpose import VitPose | |
import requests | |
import os | |
from config import API_URL,API_KEY | |
from fastapi import UploadFile | |
import logging | |
import cv2 | |
import numpy as np | |
from dataclasses import dataclass | |
from typing import Optional, Tuple, Dict, List | |
import time | |
import json | |
from fastapi.responses import JSONResponse | |
# Configure the root logger at import time (INFO and above) and create the
# module-level logger used by the functions in this file.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# --- Jump analysis tuning constants -----------------------------------------
# Fraction of the ground-level ankle Y value the ankles must rise above before
# a jump is considered started (and fall back under for it to be finished).
JUMP_THRESHOLD_PERCENT = 0.05
# Maximum number of samples kept in the ankle/head moving-average histories.
SMOOTHING_WINDOW = 5
# Forwarded to the metrics overlay; exact meaning is defined by the overlay
# drawing code -- TODO confirm there.
HORIZONTAL_OFFSET_FACTOR = 0.75
# Maximum number of smoothed head samples kept for the velocity estimate.
VELOCITY_WINDOW = 3
# Forwarded to the metrics overlay (presumably a pixel offset below the feet;
# verify against the overlay drawing code).
METRICS_BELOW_FEET_OFFSET = 20

# --- Drawing colors, as 3-channel tuples in OpenCV's BGR order --------------
BLUE = (255, 0, 0)
GREEN = (0, 255, 0)
YELLOW = (0, 255, 255)
WHITE = (255, 255, 255)
BLACK = (0, 0, 0)
GRAY = (128, 128, 128)
LIGHT_GRAY = (200, 200, 200)

# Name -> color lookup handed to the overlay helpers.
COLORS = dict(
    blue=BLUE,
    green=GREEN,
    yellow=YELLOW,
    white=WHITE,
    black=BLACK,
    gray=GRAY,
    light_gray=LIGHT_GRAY,
)
# Keypoint name -> row index in a per-person keypoint array (17 entries).
KEYPOINT_INDICES = {
    'L_Ankle': 15,
    'L_Ear': 3,
    'L_Elbow': 7,
    'L_Eye': 1,
    'L_Hip': 11,
    'L_Knee': 13,
    'L_Shoulder': 5,
    'L_Wrist': 9,
    'Nose': 0,
    'R_Ankle': 16,
    'R_Ear': 4,
    'R_Elbow': 8,
    'R_Eye': 2,
    'R_Hip': 12,
    'R_Knee': 14,
    'R_Shoulder': 6,
    'R_Wrist': 10,
}

# Pairs of keypoint names joined by a line when the skeleton is drawn.
SKELETON_CONNECTIONS = [
    # Head
    ("Nose", "L_Eye"),
    ("Nose", "R_Eye"),
    ("L_Eye", "L_Ear"),
    ("R_Eye", "R_Ear"),
    # Neck / shoulder girdle
    ("Nose", "L_Shoulder"),
    ("Nose", "R_Shoulder"),
    ("L_Shoulder", "R_Shoulder"),
    # Arms
    ("L_Shoulder", "L_Elbow"),
    ("R_Shoulder", "R_Elbow"),
    ("L_Elbow", "L_Wrist"),
    ("R_Elbow", "R_Wrist"),
    # Torso
    ("L_Shoulder", "L_Hip"),
    ("R_Shoulder", "R_Hip"),
    ("L_Hip", "R_Hip"),
    # Legs
    ("L_Hip", "L_Knee"),
    ("R_Hip", "R_Knee"),
    ("L_Knee", "L_Ankle"),
    ("R_Knee", "R_Ankle"),
]
@dataclass
class JumpMetrics:
    """Mutable state and running results for the jump analysis of one video.

    Declared as a dataclass so that every instance gets its own fields, a
    keyword constructor and a readable ``repr``. ``JumpMetrics()`` with no
    arguments still yields the same defaults as before.
    """

    # Highest head rise of the current/last jump, in meters.
    max_jump_height: float = 0.0
    # Latest vertical head velocity in m/s (positive = moving up).
    velocity_vertical: float = 0.0
    # Highest Sayers power seen so far; never reset between jumps.
    peak_power_sayer: float = 0.0
    # Highest Sayers power of the current jump; reset when a jump starts.
    jump_peak_power: float = 0.0
    # Number of completed jumps.
    repetition_count: int = 0
    # Smoothed ankle Y (pixels) of the first valid frame; None until then.
    ground_level: Optional[float] = None
    # Smoothed head Y (pixels) captured when the jump start is detected.
    takeoff_head_y: Optional[float] = None
    # Smallest head Y (pixels) reached during the current jump.
    max_head_height_px: Optional[float] = None
    # True between the detected start and end of a jump.
    jump_started: bool = False
@dataclass
class OverlayConfig:
    """Styling parameters for the metrics overlay drawn on each frame.

    Declared as a dataclass so individual settings can be overridden per
    instance (``OverlayConfig(corner_radius=0)``); ``OverlayConfig()`` keeps
    the defaults below.
    """

    # Presumably the overlay blend factor; its use is not visible here -- TODO confirm.
    alpha: float = 0.7
    # OpenCV font face used for every text element.
    font: int = cv2.FONT_HERSHEY_SIMPLEX
    # Font scales: metric title, metric value, main title.
    font_scale_title_metric: float = 0.5
    font_scale_value: float = 0.7
    font_scale_title_main: float = 1.2
    # Stroke thickness for metric text and for the main title.
    font_thickness_metric: int = 1
    font_thickness_title_main: int = 1
    # Vertical space reserved for the title line and the value line of a box.
    line_height_title_metric: int = int(20 * 1.2)
    line_height_value: int = int(25 * 1.2)
    # Inner padding of a metric box.
    padding_vertical: int = int(15 * 1.2)
    padding_horizontal: int = int(15 * 1.2)
    # Border thickness and corner radius of a metric box.
    border_thickness: int = 1
    corner_radius: int = 10
    # Presumably the gap between neighbouring metric boxes -- TODO confirm.
    spacing_horizontal: int = 30
    # Y coordinate of the main title baseline.
    title_y_offset: int = 50
    # Presumably the Y position of the metric row; verify against the overlay code.
    metrics_y_offset_alto: int = 80
@dataclass
class FramePosition:
    """Rectangle described by its top-left corner and its size.

    Without ``@dataclass`` the class only carried annotations and could not be
    constructed with values. Units are presumably pixels -- TODO confirm with
    the code that uses it.
    """

    x: int
    y: int
    width: int
    height: int
def process_video(file_name: str, vitpose: VitPose, user_id: str, player_id: str):
    """
    Run pose estimation on a video and upload the annotated result to the webhook.

    The annotated video is written to ``static/<file_name>`` (via
    ``vitpose.output_video_path`` / ``vitpose.frames_to_video``) and then
    posted as a multipart upload to the ``video-processed`` webhook.

    Args:
        file_name (str): Path to the input video file
        vitpose (VitPose): VitPose instance for pose estimation
        user_id (str): ID of the user uploading the video
        player_id (str): ID of the player in the video

    Returns:
        None

    Raises:
        OSError: If the input video or the annotated output cannot be opened
        requests.RequestException: If the webhook request cannot be performed

    Note:
        A non-2xx webhook response is only logged, not raised.
    """
    video_path = file_name

    # Fail fast when the input is missing or unreadable. The previous code read
    # the whole file through an unclosed handle and rewrote it onto itself,
    # which did nothing useful and could truncate the file on a crash.
    with open(video_path, "rb"):
        pass

    logger.info(f"starting task {video_path}")
    new_file_name = os.path.join("static", video_path)
    logger.info(f"new file name {new_file_name}")

    vitpose.output_video_path = new_file_name
    annotated_frames = vitpose.run(video_path)
    vitpose.frames_to_video(annotated_frames)
    logger.info(f"Video processed {video_path}")

    url = API_URL + "/excercises/webhooks/video-processed"
    logger.info(f"Sending video to {url}")
    logger.info(f"video_path: {video_path}")

    # Hand the open file to requests instead of loading it into a local
    # variable first; the handle is closed as soon as the upload finishes.
    with open(new_file_name, "rb") as processed_video:
        response = requests.post(
            url,
            files={"file": (video_path, processed_video, "video/mp4")},
            data={
                "user_id": user_id,
                "typeMessage": "video_processed",
                "file_name": video_path,
                "player_id": player_id,
            },
            stream=True,
            headers={"token": API_KEY},
        )

    logger.info(f"Response: {response.status_code}")
    logger.info(f"Response: {response.text}")
    logger.info(f"Video sent to {url}")
def process_salto_alto(file_name: str,
                       vitpose: VitPose,
                       player_data: dict,
                       exercise_id: str,
                       repetitions) -> dict:
    """
    Analyze a high-jump video and send the computed metrics to the API.

    Args:
        file_name (str): Path to the input video file
        vitpose (VitPose): VitPose instance; its ``pipeline`` attribute is the model used
        player_data (dict): Player information:
            - height: Player height in cm (defaults to 168 when missing)
            - weight: Player weight in kg (defaults to 64 when missing)
            - id: Player identifier (required)
        exercise_id (str): Unique identifier for the exercise
        repetitions (int): Expected number of jump repetitions in the video

    Returns:
        dict: The analysis results that were sent to the API
            (``video_analysis`` and ``repetition_data``)

    Raises:
        ValueError: If the video analysis fails
        KeyError: If ``player_data`` has no ``id``
        requests.RequestException: If the API request fails
    """
    logger.info("start processing")
    model = vitpose.pipeline

    # Height is expressed in centimetres and converted to metres below, so the
    # fallback must be in centimetres too (the old default of 1.68 ended up as
    # 0.0168 m after the conversion).
    reference_height_cm = player_data.get('height', 168)
    body_mass_kg = player_data.get('weight', 64)

    # Derive the output name from the stem so the result can never collide
    # with the input (e.g. ".MP4" inputs or ".mp4" appearing inside a folder name).
    stem, _extension = os.path.splitext(file_name)
    output_video = f"{stem}_analyzed.mp4"

    results_dict = analyze_jump_video(
        model=model,
        input_video=file_name,
        output_video=output_video,
        player_height=float(reference_height_cm) / 100,  # cm to m
        body_mass_kg=float(body_mass_kg),
        repetitions=repetitions
    )
    # analyze_jump_video reports failure by returning None; do not forward
    # that to the API. (A hard-coded debug result used to overwrite the real
    # analysis at this point and has been removed.)
    if results_dict is None:
        raise ValueError("No se pudo analizar el video de salto.")

    logger.info(f"results_dict: {results_dict}")

    # NOTE(review): the original input video is uploaded, not `output_video`;
    # kept as-is -- confirm which file the API expects.
    send_results_api(results_dict,
                     player_data["id"],
                     exercise_id,
                     file_name)
    return results_dict
def send_results_api(results_dict: dict,
                     player_id: str,
                     exercise_id: str,
                     video_path: str) -> requests.Response:
    """
    Upload a video file together with its analysis results to the API webhook.

    Args:
        results_dict (dict): Analysis results; serialized to a JSON string and
            sent as the ``results`` form field
        player_id (str): Unique identifier for the player
        exercise_id (str): Unique identifier for the exercise
        video_path (str): Path to the video file to upload

    Returns:
        requests.Response: HTTP response from the API endpoint. The status is
        only logged; a non-2xx answer does not raise.

    Raises:
        FileNotFoundError: If the video file doesn't exist
        requests.RequestException: If the API request cannot be performed
        TypeError: If results_dict contains values that are not JSON serializable
    """
    url = API_URL + "/excercises/webhooks/video-processed-results"
    logger.info(f"Sending video results to {url}")

    # Serialize before opening the file so a serialization error does not
    # leave a file handle to clean up.
    form_fields = {
        'player_id': player_id,
        'exercise_id': exercise_id,
        'results': json.dumps(results_dict),
    }

    with open(video_path, 'rb') as video_file:
        upload = {'file': (os.path.basename(video_path), video_file, 'video/mp4')}
        response = requests.post(
            url,
            headers={"token": API_KEY},
            files=upload,
            data=form_fields,
            stream=True
        )
        logger.info(f"Response: {response.status_code}")
        logger.info(f"Response: {response.text}")
        return response
def setup_video_capture(input_video: str, output_video: str) -> Tuple[cv2.VideoCapture, cv2.VideoWriter, int, int]:
    """
    Open the input video and create an ``mp4v`` writer with the same geometry.

    Args:
        input_video (str): Path to the input video file
        output_video (str): Path for the output video file

    Returns:
        Tuple[cv2.VideoCapture, cv2.VideoWriter, int, int]:
            - capture opened on ``input_video``
            - writer targeting ``output_video`` with the input's FPS and size
            - frame width in pixels
            - frame height in pixels

    Raises:
        ValueError: If the input video cannot be opened

    Note:
        The writer is returned without checking that it opened successfully.
    """
    capture = cv2.VideoCapture(input_video)
    if not capture.isOpened():
        raise ValueError("Error al abrir el video")

    # Mirror the source stream's properties on the writer.
    frame_rate = capture.get(cv2.CAP_PROP_FPS)
    frame_width = int(capture.get(cv2.CAP_PROP_FRAME_WIDTH))
    frame_height = int(capture.get(cv2.CAP_PROP_FRAME_HEIGHT))

    codec = cv2.VideoWriter_fourcc(*'mp4v')
    writer = cv2.VideoWriter(output_video, codec, frame_rate, (frame_width, frame_height))
    return capture, writer, frame_width, frame_height
def calibrate_pose_detection(model, cap, player_height: float) -> Tuple[float, int, int]:
    """
    Calibrate the pixel-to-meter scale and shoulder reference from the first frame.

    The scale is the pixel distance between the nose and the higher of the two
    ankles divided by the player's real height. One frame is consumed from
    ``cap``.

    Args:
        model: Pose model; called with a frame, must expose
            ``pose_estimator_config.label2id``
        cap: OpenCV VideoCapture object
        player_height (float): Actual height of the player in meters

    Returns:
        Tuple[float, int, int]:
            - PX_PER_METER: Conversion factor from pixels to meters (always > 0)
            - initial_left_shoulder_x: X-coordinate of left shoulder in pixels
            - initial_right_shoulder_x: X-coordinate of right shoulder in pixels

    Raises:
        ValueError: If ``player_height`` is not positive, the frame cannot be
            read, or the required keypoints are missing or implausible
        IndexError: If the model returns fewer keypoints than the label map expects
    """
    # A non-positive height would yield an infinite or negative scale.
    if player_height is None or player_height <= 0:
        raise ValueError("La altura del jugador debe ser mayor que cero.")

    ret, frame = cap.read()
    if not ret:
        raise ValueError("Error al leer el video")

    output = model(frame)
    keypoints = output.keypoints_xy.float().cpu().numpy()
    labels = model.pose_estimator_config.label2id
    nose_keypoint = labels["Nose"]
    L_ankle_keypoint = labels["L_Ankle"]
    R_ankle_keypoint = labels["R_Ankle"]
    L_shoulder_keypoint = labels["L_Shoulder"]
    R_shoulder_keypoint = labels["R_Shoulder"]

    calibration_error = "No se pudo calibrar la escala o detectar los hombros en el primer frame."
    if keypoints is None or len(keypoints) == 0 or len(keypoints[0]) == 0:
        raise ValueError(calibration_error)

    kpts_first = keypoints[0]
    nose = kpts_first[nose_keypoint]
    left_ankle = kpts_first[L_ankle_keypoint]
    right_ankle = kpts_first[R_ankle_keypoint]
    left_shoulder = kpts_first[L_shoulder_keypoint]
    right_shoulder = kpts_first[R_shoulder_keypoint]

    # Same visibility rule as process_frame_keypoints: a keypoint counts as
    # detected only when both of its coordinates are positive.
    required = (nose, left_ankle, right_ankle, left_shoulder, right_shoulder)
    if not all(point[0] > 0 and point[1] > 0 for point in required):
        raise ValueError(calibration_error)

    # Image Y grows downwards, so the higher ankle has the smaller Y.
    initial_person_height_px = min(left_ankle[1], right_ankle[1]) - nose[1]
    if not initial_person_height_px > 0:
        raise ValueError(calibration_error)

    PX_PER_METER = initial_person_height_px / player_height
    initial_left_shoulder_x = int(left_shoulder[0])
    initial_right_shoulder_x = int(right_shoulder[0])
    return PX_PER_METER, initial_left_shoulder_x, initial_right_shoulder_x
def process_frame_keypoints(model, frame):
    """
    Run the pose model on one frame and extract the ankle and head heights.

    A frame is usable only when the nose, both ankles and both shoulders of
    the first detected person all have strictly positive coordinates.

    Args:
        model: Pose model; called with the frame, must expose
            ``pose_estimator_config.label2id``
        frame: Input video frame

    Returns:
        Tuple ``(success, current_ankle_y, current_head_y, keypoints)``:
            - success (bool): True when every required keypoint is usable
            - current_ankle_y: Smaller Y of the two ankles, or None
            - current_head_y: Y of the nose, or None
            - keypoints: Full keypoint array from the model, or None

        Any exception is printed and reported as ``(False, None, None, None)``.
    """
    not_usable = (False, None, None, None)
    try:
        keypoints = model(frame).keypoints_xy.float().cpu().numpy()
        label_ids = model.pose_estimator_config.label2id
        nose_idx = label_ids["Nose"]
        left_ankle_idx = label_ids["L_Ankle"]
        right_ankle_idx = label_ids["R_Ankle"]
        left_shoulder_idx = label_ids["L_Shoulder"]
        right_shoulder_idx = label_ids["R_Shoulder"]

        # Nothing detected at all.
        if (keypoints is None or len(keypoints) == 0
                or len(keypoints[0]) == 0 or keypoints.size == 0):
            return not_usable

        person = keypoints[0]
        needed = (nose_idx, left_ankle_idx, right_ankle_idx,
                  left_shoulder_idx, right_shoulder_idx)
        # The model returned fewer keypoints than the label map refers to.
        if any(idx >= len(person) for idx in needed):
            return not_usable

        # Every required keypoint must have positive x and y.
        if not all(person[idx][0] > 0 and person[idx][1] > 0 for idx in needed):
            return not_usable

        ankle_y = min(person[left_ankle_idx][1], person[right_ankle_idx][1])
        head_y = person[nose_idx][1]
        return True, ankle_y, head_y, keypoints
    except Exception as e:
        print(f"Error processing frame: {e}")
        return not_usable
def detect_jump_events(metrics: JumpMetrics, smoothed_ankle_y: float, smoothed_head_y: float,
                       repetition_data: List[Dict], player_height: float, body_mass_kg: float,
                       repetitions: int) -> bool:
    """
    Track jump start/end from the ankle height and record finished jumps.

    The first call stores the current ankle Y as ground level. Afterwards a
    jump starts when the ankles rise above ground level by more than
    JUMP_THRESHOLD_PERCENT (relative to the ground-level Y) and ends when they
    drop back to or below that threshold.

    Args:
        metrics (JumpMetrics): Jump state; updated in place
        smoothed_ankle_y (float): Current smoothed ankle Y-coordinate
        smoothed_head_y (float): Current smoothed head Y-coordinate
        repetition_data (List[Dict]): Receives one entry per finished jump
        player_height (float): Player height in meters
        body_mass_kg (float): Player body mass in kilograms (not used here)
        repetitions (int): Target number of repetitions

    Returns:
        bool: True only on the call that completes the last expected jump
    """
    # The very first sample defines the baseline; nothing to compare yet.
    if metrics.ground_level is None:
        metrics.ground_level = smoothed_ankle_y
        metrics.takeoff_head_y = smoothed_head_y
        return False

    baseline = metrics.ground_level
    if baseline > 0:
        rise_fraction = (baseline - smoothed_ankle_y) / baseline
    else:
        rise_fraction = 0

    if not metrics.jump_started:
        if rise_fraction > JUMP_THRESHOLD_PERCENT:
            # Take-off: reset the per-jump accumulators.
            metrics.jump_started = True
            metrics.takeoff_head_y = smoothed_head_y
            metrics.max_jump_height = 0
            metrics.max_head_height_px = smoothed_head_y
            metrics.jump_peak_power = 0.0
        return False

    # A jump is in progress; keep going until the ankles are back down.
    if not rise_fraction <= JUMP_THRESHOLD_PERCENT:
        return False

    # Landing: store the finished repetition.
    metrics.repetition_count += 1
    total_height = calculate_high_jump(player_height, metrics.max_jump_height)
    repetition_data.append({
        "repetition": metrics.repetition_count,
        "distancia_elevada": round(metrics.max_jump_height, 2),
        "salto_alto": round(total_height, 2),
        "potencia_sayer": round(metrics.jump_peak_power, 2)
    })
    metrics.jump_started = False
    return metrics.repetition_count >= repetitions
def calculate_jump_metrics(metrics: JumpMetrics, smoothed_head_y: float, PX_PER_METER: float,
                           body_mass_kg: float, head_y_buffer: List[float], fps: float):
    """
    Update the running jump metrics while a jump is in progress.

    Does nothing unless ``metrics.jump_started`` is set.

    Args:
        metrics (JumpMetrics): Jump state; updated in place
        smoothed_head_y (float): Current smoothed head Y-coordinate in pixels
        PX_PER_METER (float): Conversion factor from pixels to meters
        body_mass_kg (float): Player body mass in kilograms
        head_y_buffer (List[float]): Not used by this function
        fps (float): Not used by this function

    Returns:
        None

    Side Effects:
        - Raises metrics.max_jump_height to the current head rise if larger
        - Lowers metrics.max_head_height_px to the current head Y if smaller
        - Raises metrics.jump_peak_power and, from there, metrics.peak_power_sayer
    """
    if not metrics.jump_started:
        return

    # Head rise since take-off, in meters (image Y decreases going up).
    rise_m = (metrics.takeoff_head_y - smoothed_head_y) / PX_PER_METER

    metrics.max_jump_height = max(metrics.max_jump_height, rise_m)
    metrics.max_head_height_px = min(metrics.max_head_height_px, smoothed_head_y)

    # A rise of exactly zero skips the power estimate.
    if not rise_m:
        return

    power_now = calculate_peak_power_sayer(rise_m, body_mass_kg)
    if power_now > metrics.jump_peak_power:
        metrics.jump_peak_power = power_now
        # The all-time peak is only revisited when the per-jump peak improves.
        if power_now > metrics.peak_power_sayer:
            metrics.peak_power_sayer = power_now
def calculate_velocity(head_y_buffer: List[float], PX_PER_METER: float, fps: float) -> float:
    """
    Estimate vertical velocity from the recent head positions.

    The velocity is the displacement between the oldest and newest buffered
    sample divided by the time between them.

    Args:
        head_y_buffer (List[float]): Recent head Y-coordinates in pixels, oldest first
        PX_PER_METER (float): Conversion factor from pixels to meters
        fps (float): Video frame rate in frames per second

    Returns:
        float: Vertical velocity in meters per second (positive = upward motion).
        Returns 0.0 when the buffer holds fewer than VELOCITY_WINDOW (or fewer
        than two) samples, the scale is missing or zero, or fps is not positive.

    Note:
        - N samples span N - 1 frame intervals, so the elapsed time is
          (N - 1) / fps. Using N / fps underestimated the velocity.
        - Y-coordinates decrease as objects move upward in image coordinates
    """
    sample_count = len(head_y_buffer)
    # `not PX_PER_METER` covers both None and a zero scale (division by zero).
    if sample_count < max(VELOCITY_WINDOW, 2) or not PX_PER_METER or fps <= 0:
        return 0.0

    delta_y_pixels = head_y_buffer[0] - head_y_buffer[-1]
    delta_y_meters = delta_y_pixels / PX_PER_METER
    elapsed_seconds = (sample_count - 1) / fps
    return delta_y_meters / elapsed_seconds
def draw_skeleton(frame, keypoints):
    """
    Draw the first detected person's keypoints and skeleton on a frame.

    Args:
        frame (numpy.ndarray): Video frame to draw on (modified in-place)
        keypoints (numpy.ndarray or None): Detected keypoints; only the first
            entry (``keypoints[0]``) is drawn

    Returns:
        None

    Side Effects:
        - Draws a filled GREEN circle for every keypoint with positive coordinates
        - Draws a YELLOW line for every SKELETON_CONNECTIONS pair whose two
          endpoints both have positive coordinates

    Note:
        - None or empty keypoints are ignored
        - Drawing errors are printed and swallowed
    """
    if keypoints is None or len(keypoints) == 0 or len(keypoints[0]) == 0:
        return

    try:
        person = keypoints[0]
        total = len(person)

        # Joints first.
        for joint in person:
            if joint[0] > 0 and joint[1] > 0:
                cv2.circle(frame, (int(joint[0]), int(joint[1])), 5, GREEN, -1)

        # Then the bones between them.
        for start_name, end_name in SKELETON_CONNECTIONS:
            a = KEYPOINT_INDICES[start_name]
            b = KEYPOINT_INDICES[end_name]
            if a >= total or b >= total:
                continue
            first, second = person[a], person[b]
            if not (first[0] > 0 and first[1] > 0 and second[0] > 0 and second[1] > 0):
                continue
            cv2.line(frame,
                     (int(first[0]), int(first[1])),
                     (int(second[0]), int(second[1])),
                     YELLOW, 2)
    except Exception as e:
        print(f"Error drawing skeleton: {e}")
def analyze_jump_video(model: VitPose,
                       input_video: str,
                       output_video: str,
                       player_height: float,
                       body_mass_kg: float,
                       repetitions: int) -> dict | None:
    """
    Analyze a jump video, write an annotated copy and collect per-jump metrics.

    Args:
        model: Pose model used for calibration and per-frame keypoints
        input_video: Path to input video
        output_video: Path to output video
        player_height: Height of the person in meters
        body_mass_kg: Weight of the person in kg
        repetitions: Expected number of repetitions

    Returns:
        Dictionary with ``video_analysis`` (output path) and ``repetition_data``
        (one entry per detected jump), or None if any step raised.

    Note:
        - Processing stops as soon as the expected number of repetitions has
          been detected, so later frames are not written to the output video.
        - The capture and the writer are always released, also on failure, so
          the output file is finalized and no handles leak.
    """
    cap = None
    out = None
    try:
        # Setup video capture and writer
        cap, out, width, height = setup_video_capture(input_video, output_video)
        fps = cap.get(cv2.CAP_PROP_FPS)

        # Calibrate pose detection (consumes the first frame)
        PX_PER_METER, initial_left_shoulder_x, initial_right_shoulder_x = calibrate_pose_detection(
            model, cap, player_height)

        # Reopen the video so processing starts again at the first frame
        cap.release()
        cap = cv2.VideoCapture(input_video)

        # Initialize tracking variables
        metrics = JumpMetrics()
        repetition_data = []
        head_y_history = []
        ankle_y_history = []
        head_y_buffer = []
        last_detected_ankles_y = None

        # Process each frame
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            annotated_frame = frame.copy()

            # All repetitions already counted: pass frames through untouched
            if metrics.repetition_count >= repetitions:
                out.write(annotated_frame)
                continue

            # Process frame keypoints
            keypoints_valid, current_ankle_y, current_head_y, keypoints = process_frame_keypoints(
                model, annotated_frame)

            if keypoints_valid:
                last_detected_ankles_y = current_ankle_y

                # Smooth positions with a short moving average
                ankle_y_history.append(current_ankle_y)
                if len(ankle_y_history) > SMOOTHING_WINDOW:
                    ankle_y_history.pop(0)
                smoothed_ankle_y = np.mean(ankle_y_history)

                head_y_history.append(current_head_y)
                if len(head_y_history) > SMOOTHING_WINDOW:
                    head_y_history.pop(0)
                smoothed_head_y = np.mean(head_y_history)

                # Calculate velocity
                head_y_buffer.append(smoothed_head_y)
                if len(head_y_buffer) > VELOCITY_WINDOW:
                    head_y_buffer.pop(0)
                metrics.velocity_vertical = calculate_velocity(head_y_buffer, PX_PER_METER, fps)

                # Detect jump events
                should_stop = detect_jump_events(metrics, smoothed_ankle_y, smoothed_head_y,
                                                 repetition_data, player_height, body_mass_kg,
                                                 repetitions)
                if should_stop:
                    break

                # Calculate jump metrics during jump
                calculate_jump_metrics(metrics, smoothed_head_y, PX_PER_METER, body_mass_kg,
                                       head_y_buffer, fps)
            else:
                last_detected_ankles_y = None
                metrics.velocity_vertical = 0.0

            # Draw overlay and skeleton
            high_jump = calculate_high_jump(player_height, metrics.max_jump_height)
            annotated_frame = draw_metrics_overlay(
                frame=annotated_frame,
                max_jump_height=metrics.max_jump_height,
                salto_alto=high_jump,
                velocity_vertical=metrics.velocity_vertical,
                peak_power_sayer=metrics.peak_power_sayer,
                repetition_count=metrics.repetition_count,
                last_detected_ankles_y=last_detected_ankles_y,
                initial_left_shoulder_x=initial_left_shoulder_x,
                initial_right_shoulder_x=initial_right_shoulder_x,
                width=width,
                height=height,
                colors=COLORS,
                metrics_below_feet_offset=METRICS_BELOW_FEET_OFFSET,
                horizontal_offset_factor=HORIZONTAL_OFFSET_FACTOR
            )

            if keypoints_valid and keypoints is not None:
                draw_skeleton(annotated_frame, keypoints)

            out.write(annotated_frame)

        # Prepare results with plain Python numbers so they serialize to JSON
        return {
            "video_analysis": {
                "output_video": str(output_video),
            },
            "repetition_data": [
                {
                    "repetition": int(rep["repetition"]),
                    "distancia_elevada": float(rep["distancia_elevada"]),
                    "salto_alto": float(rep["salto_alto"]),
                    "potencia_sayer": float(rep["potencia_sayer"])
                } for rep in repetition_data
            ]
        }
    except Exception as e:
        # Callers rely on None to signal failure; keep the traceback in the log.
        logger.exception(f"Error in analyze_jump_video: {e}")
        return None
    finally:
        if cap is not None:
            cap.release()
        if out is not None:
            out.release()
def calculate_peak_power_sayer(jump_height_m, body_mass_kg):
    """
    Estimate peak power with the linear Sayers formula.

    ``power = 60.7 * height_cm + 45.3 * mass_kg - 2055``

    Args:
        jump_height_m: Jump height in meters (converted to centimeters internally)
        body_mass_kg: Body mass in kg

    Returns:
        Estimated peak power in watts
    """
    height_term = 60.7 * (jump_height_m * 100)
    mass_term = 45.3 * body_mass_kg
    return height_term + mass_term - 2055
def calculate_high_jump(player_height: float, max_jump_height: float) -> float:
    """
    Return the height reached: the player's height plus the jump rise.

    Args:
        player_height: Player height in meters
        max_jump_height: Relative jump height in meters

    Returns:
        Sum of both heights, in meters
    """
    reached_height = player_height + max_jump_height
    return reached_height
def draw_rounded_rect(img, pt1, pt2, color, thickness=-1, lineType=cv2.LINE_AA, radius=10):
    """
    Draw a rectangle with rounded corners on an image.

    Args:
        img (numpy.ndarray): Image to draw on (modified in-place)
        pt1 (tuple): Top-left corner coordinates (x, y)
        pt2 (tuple): Bottom-right corner coordinates (x, y)
        color (tuple): BGR color tuple (B, G, R)
        thickness (int, optional): Line thickness. Negative (e.g. -1 /
            cv2.FILLED) for a filled shape. Defaults to -1.
        lineType (int, optional): Type of line drawing. Defaults to cv2.LINE_AA.
        radius (int, optional): Corner radius in pixels. Defaults to 10.

    Returns:
        numpy.ndarray: The modified image with rounded rectangle drawn

    Note:
        - OpenCV measures ellipse angles clockwise on screen, starting at the
          +x axis (image Y points down). Each corner therefore needs the
          quadrant facing away from the box centre: 180-270 top-left, 270-360
          top-right, 0-90 bottom-right, 90-180 bottom-left. The old angles
          pointed inwards and left the corners notched.
        - The radius is limited to half of the shorter side.
        - A radius of 0 or less (after limiting) draws a regular rectangle.
        - With a positive thickness only the outline (four arcs joined by four
          straight edges) is drawn.
    """
    x1, y1 = pt1
    x2, y2 = pt2

    # A corner cannot be larger than half of the box in either direction.
    radius = min(radius, (x2 - x1) // 2, (y2 - y1) // 2)
    if radius <= 0:
        return cv2.rectangle(img, pt1, pt2, color, thickness, lineType)

    corner_arcs = (
        ((x1 + radius, y1 + radius), 180),  # top-left
        ((x2 - radius, y1 + radius), 270),  # top-right
        ((x2 - radius, y2 - radius), 0),    # bottom-right
        ((x1 + radius, y2 - radius), 90),   # bottom-left
    )
    for center, start_angle in corner_arcs:
        img = cv2.ellipse(img, center, (radius, radius), 0, start_angle, start_angle + 90,
                          color, thickness, lineType)

    if thickness < 0:
        # Filled: two overlapping rectangles cover everything but the corners,
        # which the filled quarter-discs above take care of.
        img = cv2.rectangle(img, (x1, y1 + radius), (x2, y2 - radius), color, thickness, lineType)
        img = cv2.rectangle(img, (x1 + radius, y1), (x2 - radius, y2), color, thickness, lineType)
    else:
        # Outline: straight edges between the corner arcs.
        img = cv2.line(img, (x1 + radius, y1), (x2 - radius, y1), color, thickness, lineType)
        img = cv2.line(img, (x1 + radius, y2), (x2 - radius, y2), color, thickness, lineType)
        img = cv2.line(img, (x1, y1 + radius), (x1, y2 - radius), color, thickness, lineType)
        img = cv2.line(img, (x2, y1 + radius), (x2, y2 - radius), color, thickness, lineType)
    return img
def draw_main_title(overlay, config: OverlayConfig, width: int, colors: Dict):
    """
    Write the title "Ejercicio de Salto" horizontally centered near the top.

    Args:
        overlay (numpy.ndarray): Image to draw on (modified in-place)
        config (OverlayConfig): Supplies the font, main-title scale/thickness
            and the title's vertical offset
        width (int): Width of the video frame in pixels
        colors (Dict): Color lookup; the "white" entry is used

    Returns:
        None
    """
    title_text = "Ejercicio de Salto"
    scale = config.font_scale_title_main
    stroke = config.font_thickness_title_main

    # getTextSize returns ((width, height), baseline); only the width matters.
    (text_width, _text_height), _baseline = cv2.getTextSize(title_text, config.font, scale, stroke)
    origin = ((width - text_width) // 2, config.title_y_offset)

    cv2.putText(overlay, title_text, origin, config.font, scale,
                colors["white"], stroke, cv2.LINE_AA)
def calculate_metric_box_size(title: str, value: str, config: OverlayConfig) -> Tuple[int, int]:
    """
    Compute the width and height of the box needed for one metric.

    Args:
        title (str): The metric title text
        value (str): The metric value text
        config (OverlayConfig): Font and spacing settings

    Returns:
        Tuple[int, int]:
            - bg_width: Widest of the two rendered texts plus horizontal
              padding on both sides
            - bg_height: Title line height plus value line height plus
              vertical padding on both sides
    """
    stroke = config.font_thickness_metric
    text_widths = [
        cv2.getTextSize(text, config.font, scale, stroke)[0][0]
        for text, scale in (
            (title, config.font_scale_title_metric),
            (value, config.font_scale_value),
        )
    ]

    box_width = max(text_widths) + 2 * config.padding_horizontal
    box_height = (config.line_height_title_metric
                  + config.line_height_value
                  + 2 * config.padding_vertical)
    return box_width, box_height
def draw_metric_box(overlay, title: str, value: str, x: int, y: int, bg_width: int, bg_height: int,
                    config: OverlayConfig, colors: Dict):
    """
    Render one metric box (background, outline, title line, value line).

    The background is produced by ``draw_rounded_rect`` filled with
    ``colors["gray"]``; a rectangular outline in ``colors["white"]`` with
    ``config.border_thickness`` is then drawn over the same corners. The title
    is written in ``colors["light_gray"]`` and the value in ``colors["white"]``,
    each horizontally centred inside the box.

    Args:
        overlay: Image to draw on.
        title (str): Text for the upper line.
        value (str): Text for the lower line.
        x (int): X-coordinate of the box's top-left corner.
        y (int): Y-coordinate of the box's top-left corner.
        bg_width (int): Box width in pixels.
        bg_height (int): Box height in pixels.
        config (OverlayConfig): Font, padding, line-height and border settings.
        colors (Dict): Must contain the keys "gray", "white" and "light_gray".

    Returns:
        The image returned by ``draw_rounded_rect`` with the outline and both
        text lines drawn on it. Callers should use this return value.
        NOTE(review): whether that is the same array as ``overlay`` depends on
        ``draw_rounded_rect``, which is defined elsewhere.
    """
    top_left = (x, y)
    bottom_right = (x + bg_width, y + bg_height)

    # Filled background first, then the outline on top of it.
    canvas = draw_rounded_rect(overlay, top_left, bottom_right, colors["gray"],
                               cv2.FILLED, cv2.LINE_AA, config.corner_radius)
    cv2.rectangle(canvas, top_left, bottom_right, colors["white"], config.border_thickness, cv2.LINE_AA)

    font = config.font
    stroke = config.font_thickness_metric
    top_pad = y + config.padding_vertical

    # (text, font scale, colour key, baseline y). The +2 / +5 are the original
    # hand-tuned vertical nudges for each line.
    text_rows = (
        (title, config.font_scale_title_metric, "light_gray",
         top_pad + config.line_height_title_metric // 2 + 2),
        (value, config.font_scale_value, "white",
         top_pad + config.line_height_title_metric + config.line_height_value // 2 + 5),
    )
    for text, scale, color_key, baseline_y in text_rows:
        (text_w, _), _ = cv2.getTextSize(text, font, scale, stroke)
        centered_x = x + (bg_width - text_w) // 2
        cv2.putText(canvas, text, (centered_x, baseline_y), font, scale,
                    colors[color_key], stroke, cv2.LINE_AA)

    return canvas
def calculate_positions(width: int, height: int, last_detected_ankles_y: Optional[float],
                        initial_left_shoulder_x: Optional[int], initial_right_shoulder_x: Optional[int],
                        config: OverlayConfig, horizontal_offset_factor: float,
                        metrics_below_feet_offset: int) -> Dict[str, Tuple[int, int]]:
    """
    Work out the top-left corner of every metric box for one frame.

    Box sizes used for the layout are measured with placeholder values
    ("0.00 m", "0.00 m/s") via ``calculate_metric_box_size``.

    Args:
        width (int): Frame width in pixels.
        height (int): Frame height in pixels.
        last_detected_ankles_y (Optional[float]): Last known ankle Y, or None.
        initial_left_shoulder_x (Optional[int]): Accepted for interface
            compatibility; it is not read by this function.
        initial_right_shoulder_x (Optional[int]): Right-shoulder X reference,
            or None.
        config (OverlayConfig): Supplies ``title_y_offset``,
            ``spacing_horizontal`` and ``metrics_y_offset_alto`` (plus whatever
            ``calculate_metric_box_size`` reads).
        horizontal_offset_factor (float): Controls how far into the space to
            the right of the shoulder the "alto" box is pushed.
        metrics_below_feet_offset (int): Vertical gap used when placing boxes
            below the ankles.

    Returns:
        Dict[str, Tuple[int, int]]: ``(x, y)`` per metric, inserted in this
        order:
            - "relativo": x = 20; just above the ankles when known (or below
              them if that would come within 50 px of the title offset),
              otherwise ``height - 150``.
            - "alto": right-aligned with a 20 px margin by default, or placed
              relative to the right shoulder when that position clears the
              "relativo" box and the right edge.
            - "reps": directly under "relativo" with a 10 px gap.
            - "velocidad", "potencia": present only when ankles are known;
              stacked below the feet, using the X that centres the velocity
              box.
    """
    ankles_known = last_detected_ankles_y is not None
    left_margin = 20

    # --- Relative jump box (left edge, follows the ankles when available) ---
    rel_w, rel_h = calculate_metric_box_size("SALTO RELATIVO", "0.00 m", config)
    if not ankles_known:
        rel_y = height - 150
    else:
        rel_y = int(last_detected_ankles_y - rel_h - 10)
        # Too close to the title: drop the box below the feet instead.
        if rel_y < config.title_y_offset + 50:
            rel_y = int(last_detected_ankles_y + metrics_below_feet_offset)

    # --- High jump box (right side) ---
    alto_w, _alto_h = calculate_metric_box_size("SALTO ALTO", "0.00 m", config)
    alto_x = width - alto_w - 20
    if initial_right_shoulder_x is not None:
        room_right = width - initial_right_shoulder_x
        candidate_x = initial_right_shoulder_x + int(room_right * (1 - horizontal_offset_factor)) - alto_w
        clears_relativo = candidate_x > left_margin + rel_w + config.spacing_horizontal + 10
        inside_frame = candidate_x + alto_w < width - 10
        if clears_relativo and inside_frame:
            alto_x = candidate_x

    layout = {
        "relativo": (left_margin, rel_y),
        "alto": (alto_x, config.metrics_y_offset_alto),
        "reps": (left_margin, rel_y + rel_h + 10),
    }

    # --- Velocity and power boxes (only when the feet position is known) ---
    if ankles_known:
        vel_w, vel_h = calculate_metric_box_size("VELOCIDAD VERTICAL", "0.00 m/s", config)
        center_x = int(width / 2 - vel_w / 2)
        # Truncate first, then subtract the box height, to keep rounding
        # identical to the established layout.
        lower_edge_y = int(last_detected_ankles_y + metrics_below_feet_offset + vel_h)
        layout["velocidad"] = (center_x, lower_edge_y - vel_h)
        layout["potencia"] = (center_x, lower_edge_y + 5)

    return layout
def draw_metrics_overlay(frame, max_jump_height, salto_alto, velocity_vertical, peak_power_sayer,
                         repetition_count, last_detected_ankles_y, initial_left_shoulder_x,
                         initial_right_shoulder_x, width, height, colors, metrics_below_feet_offset=20,
                         horizontal_offset_factor=0.75):
    """
    Compose the metrics overlay for one frame and blend it onto the frame.

    A copy of ``frame`` receives the main title and one box per entry returned
    by ``calculate_positions``; the copy is then alpha-blended with the
    untouched frame using ``config.alpha``.

    Args:
        frame: Input frame (not modified; drawing happens on a copy).
        max_jump_height: Relative jump height in meters (shown clamped to >= 0).
        salto_alto: Absolute jump height in meters (shown clamped to >= 0).
        velocity_vertical: Vertical velocity in m/s (shown as absolute value).
        peak_power_sayer: Peak power in watts.
        repetition_count: Number of repetitions.
        last_detected_ankles_y: Y-coordinate of last detected ankles, or None.
        initial_left_shoulder_x: X-coordinate of left shoulder.
        initial_right_shoulder_x: X-coordinate of right shoulder.
        width: Frame width.
        height: Frame height.
        colors: Dictionary with color values.
        metrics_below_feet_offset: Offset for metrics below feet.
        horizontal_offset_factor: Factor for horizontal offset.

    Returns:
        The blended frame produced by ``cv2.addWeighted``.
    """
    canvas = frame.copy()
    config = OverlayConfig()

    draw_main_title(canvas, config, width, colors)

    positions = calculate_positions(width, height, last_detected_ankles_y,
                                    initial_left_shoulder_x, initial_right_shoulder_x,
                                    config, horizontal_offset_factor, metrics_below_feet_offset)

    # (position key, box title, value formatter), in drawing order. Formatters
    # are callables so a value is only formatted when its box is actually
    # placed (velocity/power exist only when ankles were detected).
    metric_boxes = (
        ("relativo", "SALTO RELATIVO", lambda: f"{max(0, max_jump_height):.2f} m"),
        ("alto", "SALTO ALTO", lambda: f"{max(0, salto_alto):.2f} m"),
        ("reps", "REPETICIONES", lambda: f"{repetition_count}"),
        ("velocidad", "VELOCIDAD VERTICAL", lambda: f"{abs(velocity_vertical):.2f} m/s"),
        ("potencia", "POTENCIA SAYER", lambda: f"{peak_power_sayer:.2f} W"),
    )
    for key, label, format_value in metric_boxes:
        if key not in positions:
            continue
        text = format_value()
        # Size the box for the real value, not the layout placeholder.
        box_w, box_h = calculate_metric_box_size(label, text, config)
        box_x, box_y = positions[key]
        canvas = draw_metric_box(canvas, label, text, box_x, box_y, box_w, box_h, config, colors)

    # Blend the drawn copy over the original frame.
    return cv2.addWeighted(canvas, config.alpha, frame, 1 - config.alpha, 0)