import cv2 import os import shutil import time from SinglePhoto import FaceSwapper def extract_frames(video_path, frames_dir): if not os.path.exists(frames_dir): os.makedirs(frames_dir) last_idx = -1 else: # Find the last extracted frame index existing = [f for f in os.listdir(frames_dir) if f.startswith("frame_") and f.endswith(".jpg")] if existing: last_idx = max([int(f.split("_")[1].split(".")[0]) for f in existing]) else: last_idx = -1 cap = cv2.VideoCapture(video_path) frame_paths = [] idx = 0 while True: ret, frame = cap.read() if not ret: break frame_path = os.path.join(frames_dir, f"frame_{idx:05d}.jpg") if idx > last_idx: cv2.imwrite(frame_path, frame) frame_paths.append(frame_path) idx += 1 cap.release() return frame_paths def frames_to_video(frames_dir, output_video_path, fps): frames = sorted([os.path.join(frames_dir, f) for f in os.listdir(frames_dir) if f.endswith('.jpg')]) if not frames: print("No frames found in directory.") return first_frame = cv2.imread(frames[0]) height, width, layers = first_frame.shape fourcc = cv2.VideoWriter_fourcc(*'mp4v') out = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height)) for frame_path in frames: frame = cv2.imread(frame_path) out.write(frame) out.release() def main(): # Use files from VideoSwapping folder video_path = os.path.join("VideoSwapping", "data_dst.mp4") source_image_path = os.path.join("VideoSwapping", "data_src.jpg") frames_dir = os.path.join("VideoSwapping", "video_frames") swapped_dir = os.path.join("VideoSwapping", "swapped_frames") output_video_path = os.path.join("VideoSwapping", "output_swapped_video.mp4") source_face_idx = 1 # Ask user for target_face_idx while True: try: user_input = input("Enter target_face_idx (default is 1): ").strip() if user_input == "": dest_face_idx = 1 break dest_face_idx = int(user_input) break except ValueError: print("Invalid input. Please enter an integer value.") print("Choose an option:") print("1. Extract frames only") print("2. Face swap only (requires extracted frames)") print("3. Both extract frames and face swap") choice = input("Enter 1, 2, or 3: ").strip() frame_paths = [] if choice == "1" or choice == "3": print("Extracting frames from video (resuming if needed)...") frame_paths = extract_frames(video_path, frames_dir) print(f"Extracted {len(frame_paths)} frames to {frames_dir}.") if choice == "1": return if choice == "2": # If only face swap, ensure frames are present if not os.path.exists(frames_dir): print("Frames directory does not exist. Please extract frames first.") return frame_paths = sorted([os.path.join(frames_dir, f) for f in os.listdir(frames_dir) if f.endswith('.jpg')]) if choice == "2" or choice == "3": # Prepare output directory if not os.path.exists(swapped_dir): os.makedirs(swapped_dir) # Initialize face swapper swapper = FaceSwapper() # Swap faces on each frame print("Swapping faces on frames...") start_time = time.time() for idx, frame_path in enumerate(frame_paths): frame_name = os.path.basename(frame_path) # Replace 'frame_' with 'swapped_' in the filename if frame_name.startswith("frame_"): swapped_name = "swapped_" + frame_name[len("frame_"):] else: swapped_name = "swapped_" + frame_name out_path = os.path.join(swapped_dir, swapped_name) if os.path.exists(out_path): # Skip already swapped frames print(f"Frame {idx+1}/{len(frame_paths)} already swapped, skipping...", end='\r') continue try: try: swapped = swapper.swap_faces( source_path=source_image_path, source_face_idx=source_face_idx, target_path=frame_path, target_face_idx=dest_face_idx ) except ValueError as ve: if "Target image contains" in str(ve): print(f"\nFrame {idx}: Target face idx {dest_face_idx} not found, trying with idx 1.",end='\r') swapped = swapper.swap_faces( source_path=source_image_path, source_face_idx=source_face_idx, target_path=frame_path, target_face_idx=1 ) else: raise ve cv2.imwrite(out_path, swapped) except Exception as e: print(f"\nFrame {idx}: {e}") # Optionally, copy the original frame if swap fails cv2.imwrite(out_path, cv2.imread(frame_path)) # Estimate time left elapsed = time.time() - start_time avg_time = elapsed / (idx + 1) remaining = avg_time * (len(frame_paths) - (idx + 1)) mins, secs = divmod(int(remaining), 60) print(f"Swapping frame {idx+1}/{len(frame_paths)} | Est. time left: {mins:02d}:{secs:02d}", end='\r') print() # Move to the next line after the loop # Get FPS from original video cap = cv2.VideoCapture(video_path) fps = cap.get(cv2.CAP_PROP_FPS) cap.release() # Combine swapped frames into video print("Combining swapped frames into video...") frames_to_video(swapped_dir, output_video_path, fps) print(f"Done! Output video saved as {output_video_path}") # Ask user if they want to keep the extracted frames and swapped images answer = input("Do you want to keep the extracted frames and swapped images? (y/n): ").strip().lower() if answer == 'n': try: shutil.rmtree(frames_dir) shutil.rmtree(swapped_dir) print("Temporary folders deleted.") except Exception as e: print(f"Error deleting folders: {e}") else: print("Temporary folders kept.") if __name__ == "__main__": main()