# ShesterG's picture
# Add application file
# ceeabec
import cv2
import numpy as np
import os
import pickle
import gzip
from datetime import datetime
from pathlib import Path
import decord
import argparse
import json
import time
from typing import Dict, Optional, Tuple, List, Union, Any
import tempfile
class HandExtractor:
    """
    Extract fixed-size crops of the left and right hands from video frames.

    Detected hands are assigned to the left/right wrist of the pose landmarks,
    cropped with a square padded bounding box and resized to ``output_size``.
    Landmark coordinates are multiplied by the image width/height, i.e. they
    are treated as normalized to the frame.

    Attributes:
        output_size: Size of the output hand frames as (width, height).
        scale_factor: Expansion factor applied to each hand bounding box.
        distance_threshold: Maximum distance (in landmark coordinate units)
            between a pose wrist and a detected hand wrist for a match.
    """

    def __init__(self, output_size: Tuple[int, int] = (224, 224),
                 scale_factor: float = 1.5, distance_threshold: float = 0.1):
        """
        Initialize the HandExtractor.

        Args:
            output_size: Size of the output hand frames (width, height)
            scale_factor: Scale factor for bounding box expansion
            distance_threshold: Distance threshold for hand-pose matching
        """
        self.output_size = output_size
        self.scale_factor = scale_factor
        self.distance_threshold = distance_threshold

    def _blank_frame(self) -> np.ndarray:
        """Return a black frame with the same array shape as a resized crop."""
        # output_size is (width, height) while image arrays are indexed
        # (rows, cols), so the dimensions must be swapped here.
        width, height = self.output_size
        return np.zeros((height, width, 3), dtype=np.uint8)

    def _held_or_blank(self, previous: Optional[np.ndarray]) -> np.ndarray:
        """Return the last good crop if there is one, otherwise a blank frame."""
        return previous if previous is not None else self._blank_frame()

    def _open_video(self, video_input) -> Tuple[Any, str]:
        """
        Return ``(video, default_name)`` for a path or an already-open frame source.

        Raises:
            FileNotFoundError: If ``video_input`` is a path that does not exist.
        """
        if isinstance(video_input, (str, Path)):
            video_path = Path(video_input)
            if not video_path.exists():
                raise FileNotFoundError(f"Video file not found: {video_input}")
            return decord.VideoReader(str(video_path)), video_path.stem
        # Anything else is used as-is; it only needs len() and integer indexing.
        return video_input, "video"

    def _closest_hand(self, wrist: Optional[List[float]],
                      hand_landmarks: List[List[List[float]]]) -> Optional[List[List[float]]]:
        """Return the detected hand whose wrist is nearest to ``wrist``, if within the threshold."""
        if wrist is None:
            return None
        wrist_xy = np.array(wrist[0:2])
        best_hand = None
        best_distance = float("inf")
        for hand in hand_landmarks:
            # Index 0 of a hand's landmarks is its wrist.
            distance = np.linalg.norm(wrist_xy - np.array(hand[0][0:2]))
            if distance < best_distance:
                best_distance = distance
                best_hand = hand
        return best_hand if best_distance < self.distance_threshold else None

    def _crop_hand(self, image: np.ndarray, landmarks: List[List[float]]) -> Optional[np.ndarray]:
        """Crop the padded hand box from ``image`` and resize it to ``output_size``."""
        box = self.get_bounding_box(landmarks, image.shape, self.scale_factor)
        box = self.adjust_bounding_box(box, image.shape)
        return self.resize_frame(self.crop_frame(image, box), self.output_size)

    def resize_frame(self, frame: np.ndarray, frame_size: Tuple[int, int]) -> Optional[np.ndarray]:
        """Resize frame to ``frame_size`` (width, height); return None for a missing or empty frame."""
        if frame is None or frame.size == 0:
            return None
        return cv2.resize(frame, frame_size, interpolation=cv2.INTER_AREA)

    def crop_frame(self, image: np.ndarray, bounding_box: Tuple[int, int, int, int]) -> np.ndarray:
        """Crop ``image`` to the (x, y, w, h) bounding box using array slicing."""
        x, y, w, h = bounding_box
        return image[y:y + h, x:x + w]

    def get_bounding_box(self, landmarks: List[List[float]], image_shape: Tuple[int, int, int],
                         scale_factor: float = 1.2) -> Tuple[int, int, int, int]:
        """
        Get a square, padded bounding box around the landmarks.

        The box is centred on the mean landmark position, its side is the
        larger side of the landmarks' bounding rectangle, and it is then grown
        by ``scale_factor``. The result may extend outside the image.

        Args:
            landmarks: Landmark points; the first two values of each point are
                multiplied by the image width and height respectively.
            image_shape: Shape of the image (height, width, channels)
            scale_factor: Expansion factor. The 1.2 default applies only to
                direct calls; extract_hand_frames passes ``self.scale_factor``.

        Returns:
            Bounding box as (x, y, w, h) in pixels.
        """
        ih, iw, _ = image_shape
        # int32 is the integer point type cv2.boundingRect works on.
        landmarks_px = np.array(
            [(int(point[0] * iw), int(point[1] * ih)) for point in landmarks],
            dtype=np.int32,
        )
        center_x, center_y = np.mean(landmarks_px, axis=0, dtype=int)
        _, _, wb, hb = cv2.boundingRect(landmarks_px)
        box_size = max(wb, hb)
        half_size = box_size // 2
        padding = int((scale_factor - 1) * box_size / 2)
        x = int(center_x) - half_size - padding
        y = int(center_y) - half_size - padding
        side = box_size + 2 * padding
        return x, y, side, side

    def adjust_bounding_box(self, bounding_box: Tuple[int, int, int, int],
                            image_shape: Tuple[int, int, int]) -> Tuple[int, int, int, int]:
        """
        Adjust bounding box to fit within image boundaries.

        The box is shifted back inside the image where possible; a box that is
        wider or taller than the image is shrunk to the image size so the
        returned (x, y, w, h) always describes pixels that exist.
        """
        x, y, w, h = bounding_box
        ih, iw, _ = image_shape
        # A box larger than the image cannot be shifted into place: shrink it first.
        w = min(w, iw)
        h = min(h, ih)
        # Shift left/up if the box extends beyond the right/bottom edge
        if x + w > iw:
            x = iw - w
        if y + h > ih:
            y = ih - h
        # Ensure the origin is not negative
        x = max(x, 0)
        y = max(y, 0)
        return x, y, w, h

    def select_hands(self, pose_landmarks: List[List[float]], hand_landmarks: Optional[List[List[List[float]]]],
                     image_shape: Tuple[int, int, int]) -> Tuple[Optional[List[List[float]]], Optional[List[List[float]]]]:
        """
        Select left and right hands from detected hand landmarks based on pose wrist positions.

        Each pose wrist is matched independently to the nearest detected hand
        wrist, so the same detection can be returned for both sides when both
        pose wrists lie within the threshold of it.

        Args:
            pose_landmarks: Pose landmarks from MediaPipe
            hand_landmarks: Hand landmarks from MediaPipe
            image_shape: Shape of the image (height, width, channels); currently unused

        Returns:
            Tuple of (left_hand_landmarks, right_hand_landmarks); an entry is
            None when no detected hand is close enough to that wrist.
        """
        if hand_landmarks is None or len(hand_landmarks) == 0:
            return None, None
        # Wrists are read from pose indices 15 (left) and 16 (right); a pose
        # without them cannot be matched.
        if pose_landmarks is None or len(pose_landmarks) <= 16:
            return None, None
        left_hand_landmarks = self._closest_hand(pose_landmarks[15], hand_landmarks)
        right_hand_landmarks = self._closest_hand(pose_landmarks[16], hand_landmarks)
        return left_hand_landmarks, right_hand_landmarks

    def extract_hand_frames(self, video_input, landmarks_data: Dict[int, Any]) -> Tuple[List[np.ndarray], List[np.ndarray]]:
        """
        Extract hand frames from video based on landmarks.

        One left and one right frame is produced per video frame. When a hand
        cannot be located, the previous crop of that hand is repeated, or a
        black frame is used if there is none yet. A frame with no landmarks
        entry reuses the most recent landmarks.

        Args:
            video_input: Path to a video file (str or Path), a decord.VideoReader,
                or any object supporting len() and integer indexing of frames
            landmarks_data: Maps frame index to a dict with 'pose_landmarks'
                (a sequence whose first element is the pose landmark list)
                and 'hand_landmarks' (list of hands, or None)

        Returns:
            Tuple of (left_hand_frames, right_hand_frames) as lists of numpy arrays

        Raises:
            FileNotFoundError: If ``video_input`` is a path that does not exist.
        """
        video, _ = self._open_video(video_input)

        left_hand_frames = []
        right_hand_frames = []
        prev_left_frame = None
        prev_right_frame = None
        prev_landmarks = None

        for i in range(len(video)):
            frame = video[i]
            # NOTE(review): rewinding after every read is kept from the original;
            # presumably a workaround for decoder state growth - confirm.
            if hasattr(video, 'seek'):
                video.seek(0)
            # decord returns its own NDArray type; OpenCV needs a numpy array.
            if hasattr(frame, 'asnumpy'):
                frame = frame.asnumpy()

            frame_landmarks = landmarks_data.get(i, None)
            if frame_landmarks is None:
                # Fall back to the latest landmarks seen, if any.
                frame_landmarks = prev_landmarks
            else:
                prev_landmarks = frame_landmarks

            pose = None if frame_landmarks is None else frame_landmarks.get('pose_landmarks')
            if pose is None or len(pose) == 0:
                # Nothing to locate the hands with: hold the last crop or emit black.
                left_hand_frames.append(self._held_or_blank(prev_left_frame))
                right_hand_frames.append(self._held_or_blank(prev_right_frame))
                continue

            left_hand_landmarks, right_hand_landmarks = self.select_hands(
                pose[0],
                frame_landmarks.get('hand_landmarks'),
                frame.shape
            )

            left_crop = None
            right_crop = None
            if left_hand_landmarks is not None or right_hand_landmarks is not None:
                # Swap the first and third colour channels, only when a crop is needed.
                # NOTE(review): decord frames are RGB, so the result is presumably
                # the BGR order cv2.VideoWriter expects - confirm for other sources.
                swapped = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                if left_hand_landmarks is not None:
                    left_crop = self._crop_hand(swapped, left_hand_landmarks)
                if right_hand_landmarks is not None:
                    right_crop = self._crop_hand(swapped, right_hand_landmarks)

            # A failed crop (None) is treated like a missing hand.
            if left_crop is not None:
                prev_left_frame = left_crop
            if right_crop is not None:
                prev_right_frame = right_crop
            left_hand_frames.append(self._held_or_blank(prev_left_frame))
            right_hand_frames.append(self._held_or_blank(prev_right_frame))

        return left_hand_frames, right_hand_frames

    def extract_and_save_hand_videos(self, video_input, landmarks_data: Dict[int, Any],
                                     output_dir: str, video_name: Optional[str] = None) -> Tuple[str, str]:
        """
        Extract hand frames and save as video files.

        Writes ``<video_name>_hand1.mp4`` (left) and ``<video_name>_hand2.mp4``
        (right) into ``output_dir``, replacing existing files of those names.
        Frames are extracted before the writers are opened, so a failure during
        extraction does not leave empty output files behind.

        Args:
            video_input: Path to a video file (str or Path), a decord.VideoReader,
                or any object supporting len() and integer indexing of frames
            landmarks_data: Dictionary containing pose and hand landmarks for each frame
            output_dir: Directory to save the hand videos (created if missing)
            video_name: Name for output videos; defaults to the file stem for
                a path input and to "video" otherwise

        Returns:
            Tuple of (left_hand_video_path, right_hand_video_path)

        Raises:
            FileNotFoundError: If ``video_input`` is a path that does not exist.
        """
        video, default_name = self._open_video(video_input)
        if video_name is None:
            video_name = default_name

        # Frame sources without get_avg_fps (e.g. a list of frames) get 30 fps.
        fps = video.get_avg_fps() if hasattr(video, 'get_avg_fps') else 30.0

        output_path = Path(output_dir)
        output_path.mkdir(parents=True, exist_ok=True)
        left_hand_path = output_path / f"{video_name}_hand1.mp4"
        right_hand_path = output_path / f"{video_name}_hand2.mp4"

        # Remove existing files
        for stale in (left_hand_path, right_hand_path):
            if stale.exists():
                stale.unlink()

        left_frames, right_frames = self.extract_hand_frames(video, landmarks_data)

        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        left_writer = cv2.VideoWriter(str(left_hand_path), fourcc, fps, self.output_size)
        right_writer = cv2.VideoWriter(str(right_hand_path), fourcc, fps, self.output_size)
        try:
            for left_frame, right_frame in zip(left_frames, right_frames):
                left_writer.write(left_frame)
                right_writer.write(right_frame)
        finally:
            # Always finalize the files, even if a write fails.
            left_writer.release()
            right_writer.release()

        return str(left_hand_path), str(right_hand_path)
# Convenience function for backward compatibility
def extract_hand_frames(video_input, landmarks_data: Dict[int, Any],
                        output_size: Tuple[int, int] = (224, 224)) -> Tuple[List[np.ndarray], List[np.ndarray]]:
    """
    Extract left and right hand frames from a video in a single call.

    Builds a HandExtractor with the given ``output_size`` (every other setting
    keeps its default) and delegates to its ``extract_hand_frames`` method.

    Args:
        video_input: Either a path to video file (str) or a decord.VideoReader object
        landmarks_data: Dictionary containing pose and hand landmarks for each frame
        output_size: Size of the output hand frames (width, height)

    Returns:
        Tuple of (left_hand_frames, right_hand_frames) as lists of numpy arrays
    """
    return HandExtractor(output_size=output_size).extract_hand_frames(video_input, landmarks_data)
def video_holistic(video_file: str, hand_path: str, problem_file_path: str, pose_path: str):
    """
    Original function for backward compatibility with command-line usage.

    Loads ``<pose_path>/<video stem>_pose.json``, converts its frame-index keys
    to integers and writes the two hand videos for ``video_file`` into
    ``hand_path``.

    Errors are not propagated: any exception is printed and ``video_file`` is
    appended as a line to ``problem_file_path``.

    Args:
        video_file: Path of the video to process
        hand_path: Directory that receives the hand videos
        problem_file_path: Text file collecting the paths of videos that failed
        pose_path: Directory containing the ``*_pose.json`` landmark files
    """
    try:
        video = decord.VideoReader(video_file)
        video_name = Path(video_file).stem
        landmark_json_path = Path(pose_path) / f"{video_name}_pose.json"

        # Load landmarks (JSON is UTF-8 text; do not depend on the locale encoding)
        with open(landmark_json_path, 'r', encoding='utf-8') as rd:
            landmarks_data = json.load(rd)

        # JSON object keys are always strings; the extractor looks frames up by int index
        landmarks_data = {int(k): v for k, v in landmarks_data.items()}

        # Extract hand videos
        extractor = HandExtractor()
        extractor.extract_and_save_hand_videos(video, landmarks_data, hand_path, video_name)
    except Exception as e:
        # Best-effort batch step: record the failure and let the caller move on.
        print(f"Error processing {video_file}: {e}")
        with open(problem_file_path, "a") as p:
            p.write(video_file + "\n")
# Utility functions for batch processing
def load_file(filename: str):
    """
    Read a gzip-compressed pickle file and return the object stored in it.

    Unpickling can execute arbitrary code, so this must only be used on
    files from a trusted source.
    """
    with gzip.open(filename, "rb") as stream:
        payload = pickle.load(stream)
    return payload
def is_string_in_file(file_path: str, target_string: str) -> bool:
    """
    Report whether ``target_string`` occurs as a substring of any line in the file.

    The file is scanned line by line and scanning stops at the first hit.
    Any error (for example a missing file) is printed and reported as False.
    """
    try:
        with Path(file_path).open("r") as handle:
            found = any(target_string in row for row in handle)
    except Exception as e:
        print(f"Error: {e}")
        return False
    return found
def main():
    """
    Main function for command-line usage.

    Splits the pickled list of video paths into batches of ``--batch_size``,
    selects the batch at ``--index`` and processes its videos one by one until
    the batch is done or ``--time_limit`` seconds have elapsed (checked before
    each video). Videos whose ``<stem>_hand2.mp4`` already exists in
    ``--hand_path``, or whose path appears in the problem file, are skipped.

    An invalid ``--batch_size`` or an out-of-range ``--index`` ends the program
    with an argparse usage error instead of a traceback.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--index', type=int, required=True,
                        help='index of the sub_list to work with')
    parser.add_argument('--batch_size', type=int, required=True,
                        help='batch size')
    parser.add_argument('--time_limit', type=int, required=True,
                        help='time limit')
    parser.add_argument('--files_list', type=str, required=True,
                        help='files list')
    parser.add_argument('--problem_file_path', type=str, required=True,
                        help='problem file path')
    parser.add_argument('--pose_path', type=str, required=True,
                        help='pose path')
    parser.add_argument('--hand_path', type=str, required=True,
                        help='hand path')
    args = parser.parse_args()

    # A zero step would make range() raise; a negative one yields no batches.
    if args.batch_size <= 0:
        parser.error("--batch_size must be a positive integer")

    start_time = time.time()

    # Create directories if they do not exist
    Path(args.hand_path).mkdir(parents=True, exist_ok=True)

    # Load files list
    fixed_list = load_file(args.files_list)

    # Create problem file if it doesn't exist
    if not os.path.exists(args.problem_file_path):
        with open(args.problem_file_path, "w") as f:
            f.write("")

    # Process videos in batches
    video_batches = [fixed_list[i:i + args.batch_size] for i in range(0, len(fixed_list), args.batch_size)]
    try:
        current_batch = video_batches[args.index]
    except IndexError:
        # parser.error prints the message and exits with status 2.
        parser.error(f"--index {args.index} is out of range for {len(video_batches)} batch(es)")

    for video_file in current_batch:
        if time.time() - start_time > args.time_limit:
            print("Time limit reached. Stopping execution.")
            break

        video_name = Path(video_file).stem
        # The second hand video is the completion marker for a video.
        clip_hand2_path = Path(args.hand_path) / f"{video_name}_hand2.mp4"
        if clip_hand2_path.exists():
            print(f"Skipping {video_file} - output already exists")
            continue
        if is_string_in_file(args.problem_file_path, video_file):
            print(f"Skipping {video_file} - found in problem file")
            continue

        try:
            print(f"Processing {video_file}")
            video_holistic(video_file, args.hand_path, args.problem_file_path, args.pose_path)
            print(f"Successfully processed {video_file}")
        except Exception as e:
            # Keep the batch going: record the failure and continue with the next video.
            print(f"Error processing {video_file}: {e}")
            with open(args.problem_file_path, "a") as p:
                p.write(video_file + "\n")
# Run the batch-processing command line only when this file is executed as a
# script, not when it is imported as a module.
if __name__ == "__main__":
    main()