# Viewer header (scrape residue): Spaces status "Running"; file size 6,641 bytes; revision e354383.
# The viewer's line-number gutter (1-168) was dropped here because it is not part of the program.
import cv2
import os
import shutil
import time
from SinglePhoto import FaceSwapper
def extract_frames(video_path, frames_dir):
    """Extract every frame of a video to numbered JPEG files, resuming if possible.

    Frames are written as ``frame_00000.jpg``, ``frame_00001.jpg``, ... inside
    ``frames_dir`` (created when missing). Files left by an earlier run are
    reused rather than rewritten, except the highest-numbered one, which is
    written again because an interrupted run may have left it truncated.

    Args:
        video_path: Path of the video file handed to ``cv2.VideoCapture``.
        frames_dir: Directory that receives the JPEG frames.

    Returns:
        Paths of all frames read from the video, in order, including frames
        that were already on disk. Empty when no frame could be read.
    """
    os.makedirs(frames_dir, exist_ok=True)

    # Highest frame index already on disk; -1 means "nothing extracted yet".
    last_idx = -1
    for name in os.listdir(frames_dir):
        if not (name.startswith("frame_") and name.endswith(".jpg")):
            continue
        index_text = name[len("frame_"):-len(".jpg")]
        try:
            last_idx = max(last_idx, int(index_text))
        except ValueError:
            # Stray files such as "frame_backup.jpg" must not abort the resume scan.
            continue

    cap = cv2.VideoCapture(video_path)
    frame_paths = []
    try:
        idx = 0
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            frame_path = os.path.join(frames_dir, f"frame_{idx:05d}.jpg")
            # Rewrite the last known frame (possibly truncated) and fill any gaps;
            # everything else from the previous run is kept as is.
            if idx >= last_idx or not os.path.exists(frame_path):
                cv2.imwrite(frame_path, frame)
            frame_paths.append(frame_path)
            idx += 1
    finally:
        # Release the capture even when decoding or writing raises.
        cap.release()
    return frame_paths
def frames_to_video(frames_dir, output_video_path, fps):
    """Encode the ``.jpg`` files of a directory into an ``mp4v`` video.

    Frames are taken in sorted filename order. The size of the first frame
    defines the video size; later frames with a different size are resized to
    match, and frames that cannot be read are skipped with a message.

    Args:
        frames_dir: Directory containing the ``.jpg`` frames.
        output_video_path: Path of the video file to create.
        fps: Frame rate passed to ``cv2.VideoWriter``.

    Returns:
        None. Problems (no frames, unreadable first frame, writer that cannot
        be opened) are reported with ``print`` and end the function early.
    """
    frames = sorted(
        os.path.join(frames_dir, name)
        for name in os.listdir(frames_dir)
        if name.endswith('.jpg')
    )
    if not frames:
        print("No frames found in directory.")
        return

    first_frame = cv2.imread(frames[0])
    if first_frame is None:
        # cv2.imread signals failure by returning None instead of raising.
        print(f"Could not read first frame: {frames[0]}")
        return
    height, width = first_frame.shape[:2]

    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height))
    try:
        if not out.isOpened():
            print(f"Could not open video writer for {output_video_path}")
            return
        for frame_path in frames:
            frame = cv2.imread(frame_path)
            if frame is None:
                print(f"Skipping unreadable frame: {frame_path}")
                continue
            # The writer expects every frame to match the size it was opened with.
            if frame.shape[:2] != (height, width):
                frame = cv2.resize(frame, (width, height))
            out.write(frame)
    finally:
        # Always finalize the container, even if a frame fails mid-way.
        out.release()
def _prompt_target_face_idx():
    """Ask for the target face index until an integer is entered (blank means 1)."""
    while True:
        user_input = input("Enter target_face_idx (default is 1): ").strip()
        if user_input == "":
            return 1
        try:
            return int(user_input)
        except ValueError:
            print("Invalid input. Please enter an integer value.")


def _swapped_path_for(frame_path, swapped_dir):
    """Map ``frame_XXXXX.jpg`` to ``<swapped_dir>/swapped_XXXXX.jpg``."""
    frame_name = os.path.basename(frame_path)
    if frame_name.startswith("frame_"):
        frame_name = frame_name[len("frame_"):]
    return os.path.join(swapped_dir, "swapped_" + frame_name)


def _swap_frames(swapper, frame_paths, swapped_dir, source_image_path,
                 source_face_idx, dest_face_idx):
    """Swap the source face into each frame, skipping frames already swapped."""
    total = len(frame_paths)
    start_time = time.time()
    processed = 0  # frames actually handled in this run (skipped ones excluded)
    for idx, frame_path in enumerate(frame_paths):
        frame_no = idx + 1  # 1-based number used in every message
        out_path = _swapped_path_for(frame_path, swapped_dir)
        if os.path.exists(out_path):
            # Resume support: output from a previous run is reused.
            print(f"Frame {frame_no}/{total} already swapped, skipping...", end='\r')
            continue
        try:
            try:
                swapped = swapper.swap_faces(
                    source_path=source_image_path,
                    source_face_idx=source_face_idx,
                    target_path=frame_path,
                    target_face_idx=dest_face_idx
                )
            except ValueError as ve:
                # NOTE(review): presumably this message marks an out-of-range
                # target index; confirm against FaceSwapper.swap_faces.
                # Retrying with index 1 only makes sense if it was not 1 already.
                if dest_face_idx == 1 or "Target image contains" not in str(ve):
                    raise
                print(f"\nFrame {frame_no}: Target face idx {dest_face_idx} not found, trying with idx 1.")
                swapped = swapper.swap_faces(
                    source_path=source_image_path,
                    source_face_idx=source_face_idx,
                    target_path=frame_path,
                    target_face_idx=1
                )
            cv2.imwrite(out_path, swapped)
        except Exception as e:
            # Best effort per frame: report, keep the original frame, carry on.
            print(f"\nFrame {frame_no}: {e}")
            try:
                # A plain file copy cannot fail on undecodable image data and
                # avoids re-encoding the JPEG.
                shutil.copyfile(frame_path, out_path)
            except OSError as copy_error:
                print(f"Frame {frame_no}: could not copy original frame: {copy_error}")
        # Estimate time left from frames processed in this run only, so that
        # instantly-skipped frames do not make the estimate too optimistic.
        processed += 1
        avg_time = (time.time() - start_time) / processed
        remaining = avg_time * (total - frame_no)
        mins, secs = divmod(int(remaining), 60)
        print(f"Swapping frame {frame_no}/{total} | Est. time left: {mins:02d}:{secs:02d}", end='\r')
    print()  # Move to the next line after the loop


def main():
    """Run the interactive extract / face-swap / re-encode workflow.

    All inputs and outputs live in the ``VideoSwapping`` folder:
    ``data_dst.mp4`` (target video), ``data_src.jpg`` (source face),
    ``video_frames`` and ``swapped_frames`` (working folders) and
    ``output_swapped_video.mp4`` (result). The user picks the target face
    index, then one of: extract frames only, swap only, or both. After a swap
    run the user may delete the working folders.
    """
    # Use files from VideoSwapping folder
    video_path = os.path.join("VideoSwapping", "data_dst.mp4")
    source_image_path = os.path.join("VideoSwapping", "data_src.jpg")
    frames_dir = os.path.join("VideoSwapping", "video_frames")
    swapped_dir = os.path.join("VideoSwapping", "swapped_frames")
    output_video_path = os.path.join("VideoSwapping", "output_swapped_video.mp4")
    source_face_idx = 1
    fallback_fps = 30.0  # used only when the source video reports no frame rate

    dest_face_idx = _prompt_target_face_idx()

    print("Choose an option:")
    print("1. Extract frames only")
    print("2. Face swap only (requires extracted frames)")
    print("3. Both extract frames and face swap")
    choice = input("Enter 1, 2, or 3: ").strip()
    if choice not in ("1", "2", "3"):
        # Stop here so an invalid choice can never reach the cleanup prompt.
        print("Invalid choice. Please enter 1, 2, or 3.")
        return

    if choice in ("1", "3"):
        print("Extracting frames from video (resuming if needed)...")
        frame_paths = extract_frames(video_path, frames_dir)
        print(f"Extracted {len(frame_paths)} frames to {frames_dir}.")
        if choice == "1":
            return
    else:
        # If only face swap, ensure frames are present
        if not os.path.exists(frames_dir):
            print("Frames directory does not exist. Please extract frames first.")
            return
        frame_paths = sorted(
            os.path.join(frames_dir, f)
            for f in os.listdir(frames_dir)
            if f.endswith('.jpg')
        )

    if not frame_paths:
        # Nothing to swap; do not build a video from stale swapped frames.
        print("No frames available to swap.")
        return

    os.makedirs(swapped_dir, exist_ok=True)
    swapper = FaceSwapper()
    print("Swapping faces on frames...")
    _swap_frames(swapper, frame_paths, swapped_dir, source_image_path,
                 source_face_idx, dest_face_idx)

    # Get FPS from original video
    cap = cv2.VideoCapture(video_path)
    try:
        fps = cap.get(cv2.CAP_PROP_FPS)
    finally:
        cap.release()
    if fps <= 0:
        # OpenCV reports 0 when the frame rate is unavailable.
        print(f"Could not read FPS from {video_path}; using {fallback_fps}.")
        fps = fallback_fps

    # Combine swapped frames into video
    print("Combining swapped frames into video...")
    frames_to_video(swapped_dir, output_video_path, fps)
    print(f"Done! Output video saved as {output_video_path}")

    # Ask user if they want to keep the extracted frames and swapped images
    answer = input("Do you want to keep the extracted frames and swapped images? (y/n): ").strip().lower()
    if answer == 'n':
        all_deleted = True
        # Delete each folder independently so one failure does not skip the other.
        for folder in (frames_dir, swapped_dir):
            try:
                shutil.rmtree(folder)
            except OSError as e:
                all_deleted = False
                print(f"Error deleting folders: {e}")
        if all_deleted:
            print("Temporary folders deleted.")
    else:
        print("Temporary folders kept.")
# Run the interactive workflow only when executed as a script, not on import.
if __name__ == "__main__":
    main()