import mediapipe as mp
from mediapipe.tasks import python
from mediapipe.tasks.python import vision
import cv2
import numpy as np
import json
from pathlib import Path
import decord
from typing import Dict, Optional, Tuple, Any


class HolisticDetector:
    """
    A class for detecting face, hand, and pose landmarks in videos using MediaPipe.

    Three detectors are created once per instance and reused for every frame:
    a FaceLandmarker and a HandLandmarker (MediaPipe Tasks API) plus the
    legacy Holistic solution, from which only the pose landmarks are used.
    """

    def __init__(self,
                 face_model_path: str,
                 hand_model_path: str,
                 min_detection_confidence: float = 0.1,
                 min_hand_detection_confidence: float = 0.05,
                 max_faces: int = 6,
                 max_hands: int = 6):
        """
        Initialize the HolisticDetector with model paths and configuration.

        Args:
            face_model_path: Path to the face landmarker model file
            hand_model_path: Path to the hand landmarker model file
            min_detection_confidence: Minimum detection confidence passed to the
                Holistic model (used here for pose landmarks)
            min_hand_detection_confidence: Minimum confidence for hand detection
            max_faces: Maximum number of faces to detect
            max_hands: Maximum number of hands to detect
        """
        self.face_model_path = face_model_path
        self.hand_model_path = hand_model_path
        self.min_detection_confidence = min_detection_confidence
        self.min_hand_detection_confidence = min_hand_detection_confidence
        self.max_faces = max_faces
        self.max_hands = max_hands
        self._initialize_detectors()

    def _initialize_detectors(self):
        """Create the face, hand and holistic (pose) detectors from the stored settings."""
        # Face landmarker: blendshapes and transformation matrices are requested
        # from the model even though only the landmarks are read later on.
        base_options_face = python.BaseOptions(model_asset_path=self.face_model_path)
        options_face = vision.FaceLandmarkerOptions(
            base_options=base_options_face,
            output_face_blendshapes=True,
            output_facial_transformation_matrixes=True,
            num_faces=self.max_faces
        )
        self.face_detector = vision.FaceLandmarker.create_from_options(options_face)

        # Hand landmarker
        base_options_hand = python.BaseOptions(model_asset_path=self.hand_model_path)
        options_hand = vision.HandLandmarkerOptions(
            base_options=base_options_hand,
            num_hands=self.max_hands,
            min_hand_detection_confidence=self.min_hand_detection_confidence
        )
        self.hand_detector = vision.HandLandmarker.create_from_options(options_hand)

        # Holistic model, used only for its pose landmarks
        self.mp_holistic = mp.solutions.holistic.Holistic(
            min_detection_confidence=self.min_detection_confidence
        )

    @staticmethod
    def _to_xyz(landmarks) -> list:
        """Convert an iterable of landmark objects into a list of [x, y, z] lists."""
        return [[landmark.x, landmark.y, landmark.z] for landmark in landmarks]

    @staticmethod
    def _empty_counts() -> Dict[str, int]:
        """Return a fresh all-zero detection count dict (used for failed frames)."""
        return {'#face': 0, '#hands': 0, '#pose': 0}

    @staticmethod
    def _read_frame(video, index: int):
        """
        Fetch frame `index` from `video` and return it in a form MediaPipe accepts.

        decord's native bridge returns its own NDArray type, which `mp.Image`
        cannot consume, so any frame exposing `asnumpy()` is converted.
        Frames that are already numpy arrays are returned untouched.
        """
        frame = video[index]
        if hasattr(frame, 'asnumpy'):
            frame = frame.asnumpy()
        # Kept from the original implementation: rewind the reader after each
        # random access. NOTE(review): presumably a decord workaround; confirm
        # it is still required, since it forces a re-seek for every frame.
        if hasattr(video, 'seek'):
            video.seek(0)
        return frame

    def detect_frame_landmarks(self, image: np.ndarray) -> Tuple[Dict[str, int], Dict[str, Any]]:
        """
        Detect landmarks in a single frame.

        Args:
            image: Input image as numpy array (wrapped as an SRGB `mp.Image`)

        Returns:
            Tuple of (bounding_boxes_count, landmarks_data).
            `bounding_boxes_count` maps '#face', '#hands' and '#pose' to the
            number of detections. `landmarks_data` maps 'face_landmarks',
            'hand_landmarks' and 'pose_landmarks' to a list with one
            [[x, y, z], ...] entry per detection, or to None when nothing was
            detected.
        """
        results = self.mp_holistic.process(image)
        mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=image)
        face_prediction = self.face_detector.detect(mp_image)
        hand_prediction = self.hand_detector.detect(mp_image)

        bounding_boxes = {}
        landmarks_data = {}

        # Face landmarks (possibly several faces)
        if face_prediction.face_landmarks:
            bounding_boxes['#face'] = len(face_prediction.face_landmarks)
            landmarks_data['face_landmarks'] = [
                self._to_xyz(face) for face in face_prediction.face_landmarks
            ]
        else:
            bounding_boxes['#face'] = 0
            landmarks_data['face_landmarks'] = None

        # Hand landmarks (possibly several hands)
        if hand_prediction.hand_landmarks:
            bounding_boxes['#hands'] = len(hand_prediction.hand_landmarks)
            landmarks_data['hand_landmarks'] = [
                self._to_xyz(hand) for hand in hand_prediction.hand_landmarks
            ]
        else:
            bounding_boxes['#hands'] = 0
            landmarks_data['hand_landmarks'] = None

        # Pose landmarks: Holistic yields at most one pose, stored as a
        # one-element list so the layout matches faces and hands.
        if results.pose_landmarks:
            bounding_boxes['#pose'] = 1
            landmarks_data['pose_landmarks'] = [
                self._to_xyz(results.pose_landmarks.landmark)
            ]
        else:
            bounding_boxes['#pose'] = 0
            landmarks_data['pose_landmarks'] = None

        return bounding_boxes, landmarks_data

    def process_video(self,
                      video_input,
                      save_results: bool = False,
                      output_dir: Optional[str] = None,
                      video_name: Optional[str] = None) -> Dict[int, Any]:
        """
        Process a video and extract landmarks from all frames.

        Args:
            video_input: Either a path to a video file (str or pathlib.Path) or
                an already opened reader that supports len() and integer
                indexing (e.g. a decord.VideoReader)
            save_results: Whether to save results to files
            output_dir: Directory to save results (required if save_results=True)
            video_name: Name for output files (required if save_results=True and
                video_input is a reader object)

        Returns:
            Dictionary mapping frame index to that frame's landmarks dict, or to
            None for frames whose processing raised an error.

        Raises:
            FileNotFoundError: If video file doesn't exist
            RuntimeError: If the video file exists but cannot be opened
            ValueError: If save_results=True but output_dir is None, or if
                video_name is None when needed
        """
        if save_results and output_dir is None:
            raise ValueError("output_dir must be provided when save_results=True")

        if isinstance(video_input, (str, Path)):
            # Input is a file path
            video_path = Path(video_input)
            if not video_path.exists():
                raise FileNotFoundError(f"Video file not found: {video_input}")
            try:
                video = decord.VideoReader(str(video_path))
            except Exception as e:
                raise RuntimeError(f"Error loading video {video_input}: {e}") from e
            file_name = video_path.stem
        else:
            # Input is a VideoReader object or similar; it is not type-checked,
            # so an unsuitable object fails later at len()/indexing.
            video = video_input
            if save_results and video_name is None:
                raise ValueError("video_name must be provided when save_results=True and video_input is a VideoReader object")
            file_name = video_name or "video"

        result_dict = {}
        stats = {}

        # Frames are fetched one at a time so the whole video never has to be
        # held in memory; a failing frame is recorded as None and skipped.
        for i in range(len(video)):
            try:
                frame_rgb = self._read_frame(video, i)
                bounding_boxes, landmarks = self.detect_frame_landmarks(frame_rgb)
                result_dict[i] = landmarks
                stats[i] = bounding_boxes
            except Exception as e:
                print(f"Error processing frame {i}: {e}")
                result_dict[i] = None
                stats[i] = self._empty_counts()

        if save_results:
            self._save_results(file_name, result_dict, stats, output_dir)

        return result_dict

    def process_video_frames(self,
                             frames: list,
                             save_results: bool = False,
                             output_dir: Optional[str] = None,
                             video_name: str = "video") -> Dict[int, Any]:
        """
        Process a list of frames and extract landmarks.

        Args:
            frames: List of frame images as numpy arrays
            save_results: Whether to save results to files
            output_dir: Directory to save results (required if save_results=True)
            video_name: Name for output files

        Returns:
            Dictionary mapping frame index to that frame's landmarks dict, or to
            None for frames whose processing raised an error.

        Raises:
            ValueError: If save_results=True but output_dir is None
        """
        if save_results and output_dir is None:
            raise ValueError("output_dir must be provided when save_results=True")

        result_dict = {}
        stats = {}

        for i, frame in enumerate(frames):
            try:
                bounding_boxes, landmarks = self.detect_frame_landmarks(frame)
                result_dict[i] = landmarks
                stats[i] = bounding_boxes
            except Exception as e:
                print(f"Error processing frame {i}: {e}")
                result_dict[i] = None
                stats[i] = self._empty_counts()

        if save_results:
            self._save_results(video_name, result_dict, stats, output_dir)

        return result_dict

    def _save_results(self, video_name: str, landmarks_data: Dict, stats_data: Dict, output_dir: str):
        """
        Save landmarks and stats to JSON files.

        Writes `<video_name>_pose.json` and `<video_name>_stats.json` into
        `output_dir`, creating the directory if needed.
        """
        output_path = Path(output_dir)
        output_path.mkdir(parents=True, exist_ok=True)

        # Save landmarks
        landmarks_file = output_path / f"{video_name}_pose.json"
        with open(landmarks_file, 'w') as f:
            json.dump(landmarks_data, f)

        # Save stats
        stats_file = output_path / f"{video_name}_stats.json"
        with open(stats_file, 'w') as f:
            json.dump(stats_data, f)

    def compute_video_stats(self, landmarks_data: Dict) -> Dict[str, Any]:
        """
        Compute statistics from landmarks data.

        Args:
            landmarks_data: Dictionary containing landmarks for each frame
                (a frame's value may be None if it could not be processed)

        Returns:
            Dictionary mapping each frame key to its '#face'/'#hands'/'#pose'
            counts, plus a 'max' entry holding the per-key maximum over all
            frames.
        """
        stats = {}
        max_counts = {'#face': 0, '#hands': 0, '#pose': 0}

        for frame, landmarks in landmarks_data.items():
            if landmarks is None:
                presence = {'#face': 0, '#hands': 0, '#pose': 0}
            else:
                # A missing key, None or an empty list all count as zero.
                presence = {
                    '#face': len(landmarks.get('face_landmarks') or []),
                    '#hands': len(landmarks.get('hand_landmarks') or []),
                    '#pose': len(landmarks.get('pose_landmarks') or [])
                }
            stats[frame] = presence

            # Update max counts
            for key in max_counts:
                max_counts[key] = max(max_counts[key], presence[key])

        stats['max'] = max_counts
        return stats


# Convenience function for backward compatibility and simple usage
def video_holistic(video_input,
                   face_model_path: str,
                   hand_model_path: str,
                   save_results: bool = False,
                   output_dir: Optional[str] = None,
                   video_name: Optional[str] = None) -> Dict[int, Any]:
    """
    Convenience function to process a video and extract holistic landmarks.

    A new HolisticDetector (with default thresholds) is built on every call, so
    prefer reusing a HolisticDetector instance when processing many videos.

    Args:
        video_input: Either a path to video file or a decord.VideoReader object
        face_model_path: Path to the face landmarker model file
        hand_model_path: Path to the hand landmarker model file
        save_results: Whether to save results to files
        output_dir: Directory to save results
        video_name: Name for output files (required if save_results=True and
            video_input is a VideoReader)

    Returns:
        Dictionary containing landmarks for each frame
    """
    detector = HolisticDetector(face_model_path, hand_model_path)
    return detector.process_video(video_input, save_results, output_dir, video_name)


# Utility functions for batch processing
def load_file(filename: str):
    """
    Load a pickled and gzipped file.

    SECURITY: unpickling executes arbitrary code embedded in the file. Only
    call this on files from a trusted source.
    """
    import pickle
    import gzip

    with gzip.open(filename, "rb") as f:
        return pickle.load(f)


def is_string_in_file(file_path: str, target_string: str) -> bool:
    """
    Check if a string exists in a file.

    The match is a substring test performed line by line. Any error while
    opening or reading the file is printed and treated as "not found".
    """
    try:
        with Path(file_path).open("r") as f:
            return any(target_string in line for line in f)
    except Exception as e:
        print(f"Error: {e}")
        return False


def main():
    """
    Main function for command-line usage.

    Splits the pickled files list into batches of `--batch_size`, processes the
    batch selected by `--index`, and writes one pose JSON and one stats JSON
    per video. Videos whose outputs already exist, or that are listed in the
    problem file, are skipped; videos that fail are appended to the problem
    file. Processing stops once `--time_limit` seconds have elapsed.
    """
    import argparse
    import time

    parser = argparse.ArgumentParser()
    parser.add_argument('--index', type=int, required=True,
                        help='index of the sub_list to work with')
    parser.add_argument('--batch_size', type=int, required=True,
                        help='batch size')
    parser.add_argument('--pose_path', type=str, required=True,
                        help='path to where the pose data will be saved')
    parser.add_argument('--stats_path', type=str, required=True,
                        help='path to where the stats data will be saved')
    parser.add_argument('--time_limit', type=int, required=True,
                        help='time limit')
    parser.add_argument('--files_list', type=str, required=True,
                        help='files list')
    parser.add_argument('--problem_file_path', type=str, required=True,
                        help='problem file path')
    parser.add_argument('--face_model_path', type=str, required=True,
                        help='face model path')
    parser.add_argument('--hand_model_path', type=str, required=True,
                        help='hand model path')

    args = parser.parse_args()
    start_time = time.time()

    # Initialize detector
    detector = HolisticDetector(args.face_model_path, args.hand_model_path)

    # Load the files list
    fixed_list = load_file(args.files_list)

    # Create folders if they do not exist
    Path(args.pose_path).mkdir(parents=True, exist_ok=True)
    Path(args.stats_path).mkdir(parents=True, exist_ok=True)

    # Create problem file if it doesn't exist
    problem_file = Path(args.problem_file_path)
    if not problem_file.exists():
        problem_file.touch()

    # Process videos in batches
    video_batches = [fixed_list[i:i + args.batch_size]
                     for i in range(0, len(fixed_list), args.batch_size)]

    for video_file in video_batches[args.index]:
        current_time = time.time()
        if current_time - start_time > args.time_limit:
            print("Time limit reached. Stopping execution.")
            break

        # Entries may be str or Path; the problem file stores plain text.
        video_entry = str(video_file)

        # Check if output files already exist
        video_name = Path(video_file).stem
        landmark_json_path = Path(args.pose_path) / f"{video_name}_pose.json"
        stats_json_path = Path(args.stats_path) / f"{video_name}_stats.json"

        if landmark_json_path.exists() and stats_json_path.exists():
            print(f"Skipping {video_file} - output files already exist")
            continue
        if is_string_in_file(args.problem_file_path, video_entry):
            print(f"Skipping {video_file} - found in problem file")
            continue

        try:
            print(f"Processing {video_file}")
            # The keyword must match process_video's parameter name
            # (`video_input`); the previous `video_file_path=` raised
            # TypeError for every video.
            result_dict = detector.process_video(
                video_input=video_file,
                save_results=True,
                output_dir=args.pose_path
            )

            # Also save stats separately for compatibility
            stats = detector.compute_video_stats(result_dict)
            with open(stats_json_path, 'w') as f:
                json.dump(stats, f)

            print(f"Successfully processed {video_file}")

        except Exception as e:
            print(f"Error processing {video_file}: {e}")
            # Add to problem file so later runs skip this video
            with open(args.problem_file_path, "a") as p:
                p.write(f"{video_entry}\n")


if __name__ == "__main__":
    main()