import cv2
import numpy as np
import os
import shutil
import subprocess
import glob
from tqdm.auto import tqdm
import uuid
import re
from zipfile import ZipFile

gpu = False
os.makedirs("./results", exist_ok=True)

def apply_green_screen(image_path, save_path, foreground_segmenter):
    """
    Replaces the background of the input image with green using a segmentation model.

    Args:
        image_path (str): Path to the input image.
        save_path (str): Path to save the composited result (skipped if falsy).
        foreground_segmenter: Initialized segmentation model exposing
            `estimate_foreground_segmentation(image_rgb)`.

    Returns:
        np.ndarray: The green-screen composited image (BGR).
    """
    # Load image with alpha if available
    image = cv2.imread(image_path, cv2.IMREAD_UNCHANGED)
    if image is None:
        raise FileNotFoundError(f"Image not found: {image_path}")
    # Remove transparency if present
    if image.shape[2] == 4:
        image = cv2.cvtColor(image, cv2.COLOR_BGRA2BGR)
    # Convert to RGB for the model
    image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    # Get segmentation mask
    mask = foreground_segmenter.estimate_foreground_segmentation(image_rgb)
    # Normalize and convert mask to 0-255 uint8
    if mask.max() <= 1.0:
        mask = (mask * 255).astype(np.uint8)
    else:
        mask = mask.astype(np.uint8)
    # Collapse the mask to a single channel
    if mask.ndim == 2:
        mask_gray = mask
    elif mask.shape[2] == 1:
        mask_gray = mask[:, :, 0]
    else:
        mask_gray = cv2.cvtColor(mask, cv2.COLOR_BGR2GRAY)
    _, binary_mask = cv2.threshold(mask_gray, 128, 255, cv2.THRESH_BINARY)
    # Create green background (RGB order, to match image_rgb)
    green_bg = np.full_like(image_rgb, (0, 255, 0), dtype=np.uint8)
    # Create 3-channel mask
    mask_3ch = cv2.cvtColor(binary_mask, cv2.COLOR_GRAY2BGR)
    # Composite: foreground from image, background as green
    output_rgb = np.where(mask_3ch == 255, image_rgb, green_bg)
    # Convert back to BGR for OpenCV
    output_bgr = cv2.cvtColor(output_rgb, cv2.COLOR_RGB2BGR)
    # Save if path is given
    if save_path:
        cv2.imwrite(save_path, output_bgr)
    return output_bgr
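
# Usage sketch. The `SoftForegroundSegmenter` name is an assumption taken from the
# docstring; any object exposing `estimate_foreground_segmentation(image_rgb)` works:
#   segmenter = SoftForegroundSegmenter()
#   composited_bgr = apply_green_screen("person.jpg", "person_green.png", segmenter)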

def create_transparent_foreground(image_path, foreground_segmenter):
    """
    Segments the foreground and writes it out as a transparent PNG in ./results.

    Returns:
        Tuple of (image_rgb, rgba_image, save_path).
    """
    # Build a unique, filesystem-safe output name
    uid = uuid.uuid4().hex[:8].upper()
    base_name = os.path.splitext(os.path.basename(image_path))[0]
    base_name = re.sub(r'[^a-zA-Z\s]', '', base_name)
    base_name = base_name.strip().replace(" ", "_").replace("__", "_")
    save_path = f"./results/{base_name}_{uid}.png"
    save_path = os.path.abspath(save_path)
    image = cv2.imread(image_path, cv2.IMREAD_UNCHANGED)
    if image is None:
        raise FileNotFoundError(f"Image not found: {image_path}")
    # Drop an existing alpha channel before segmentation
    if image.shape[2] == 4:
        image = cv2.cvtColor(image, cv2.COLOR_BGRA2BGR)
    image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    mask = foreground_segmenter.estimate_foreground_segmentation(image_rgb)
    # Normalize the mask to 0-255 uint8
    if mask.max() <= 1.0:
        mask = (mask * 255).astype(np.uint8)
    else:
        mask = mask.astype(np.uint8)
    if mask.ndim == 3 and mask.shape[2] == 3:
        mask = cv2.cvtColor(mask, cv2.COLOR_BGR2GRAY)
    # Binarize the mask and use it as the alpha channel
    _, alpha = cv2.threshold(mask, 128, 255, cv2.THRESH_BINARY)
    rgba_image = np.dstack((image_rgb, alpha))
    cv2.imwrite(save_path, cv2.cvtColor(rgba_image, cv2.COLOR_RGBA2BGRA))
    return image_rgb, rgba_image, save_path

def remove_background_batch_images(img_list, foreground_segmenter):
    """
    Removes the background from a batch of images and zips the transparent PNGs.
    Returns the absolute path of the resulting zip archive.
    """
    # Create unique temp directory
    uid = uuid.uuid4().hex[:8].upper()
    temp_dir = os.path.abspath(f"./results/bg_removed_{uid}")
    os.makedirs(temp_dir, exist_ok=True)
    # Process each image
    for image_path in tqdm(img_list, desc="Removing Backgrounds"):
        _, _, save_path = create_transparent_foreground(image_path, foreground_segmenter)
        shutil.move(save_path, os.path.join(temp_dir, os.path.basename(save_path)))
    # Create zip file
    zip_path = f"{temp_dir}.zip"
    with ZipFile(zip_path, 'w') as zipf:
        for root, _, files in os.walk(temp_dir):
            for file in files:
                file_path = os.path.join(root, file)
                arcname = os.path.relpath(file_path, start=temp_dir)
                zipf.write(file_path, arcname=arcname)
    # shutil.rmtree(temp_dir)
    return os.path.abspath(zip_path)
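
# Usage sketch (hypothetical file names and segmenter instance):
#   zip_path = remove_background_batch_images(["a.jpg", "b.png"], segmenter)
#   # -> ./results/bg_removed_XXXXXXXX.zip containing the transparent PNGs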
def get_sorted_paths(directory, extension="png"): | |
""" | |
Returns full paths of all images with the given extension, sorted by filename (without extension). | |
""" | |
extension = extension.lstrip(".").lower() | |
pattern = os.path.join(directory, f"*.{extension}") | |
files = glob.glob(pattern) | |
files.sort(key=lambda x: int(os.path.splitext(os.path.basename(x))[0])) | |
return files | |
def extract_all_frames_ffmpeg_gpu(video_path, output_dir="frames", extension="png", use_gpu=True): | |
""" | |
Extracts all frames from a video using ffmpeg, with optional GPU acceleration. | |
Returns a sorted list of full paths to the extracted frames. | |
""" | |
if os.path.exists(output_dir): | |
shutil.rmtree(output_dir) | |
os.makedirs(output_dir, exist_ok=True) | |
extension = extension.lstrip(".") | |
output_pattern = os.path.join(output_dir, f"%05d.{extension}") | |
command = [ | |
"ffmpeg", "-i", video_path, output_pattern | |
] | |
if use_gpu: | |
command.insert(1, "cuda") | |
command.insert(1, "-hwaccel") | |
print("Running command:", " ".join(command)) | |
subprocess.run(command, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) | |
return get_sorted_paths(output_dir, extension) | |
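
# The command built above is equivalent to, e.g. (input name illustrative):
#   ffmpeg -hwaccel cuda -i input.mp4 frames/%05d.png    (use_gpu=True)
#   ffmpeg -i input.mp4 frames/%05d.png                  (use_gpu=False)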
def green_screen_batch(frames, foreground_segmenter,output_dir="green_screen_frames"): | |
""" | |
Applies green screen background to a batch of frames and saves the results. | |
Args: | |
frames (List[str]): List of image paths. | |
output_dir (str): Directory to save green-screened output. | |
""" | |
if os.path.exists(output_dir): | |
shutil.rmtree(output_dir) | |
os.makedirs(output_dir, exist_ok=True) | |
green_screen_frames=[] | |
for frame in tqdm(frames, desc="Processing green screen frames"): | |
save_image_path=os.path.join(output_dir, os.path.basename(frame)) | |
result = apply_green_screen( | |
frame, | |
save_image_path, | |
foreground_segmenter | |
) | |
green_screen_frames.append(save_image_path) | |
return green_screen_frames | |

def green_screen_video_maker(original_video, green_screen_frames, batch_size=100):
    """
    Creates video chunks from green-screen frames based on the original video's properties.

    Args:
        original_video (str): Path to the original video file (to read FPS and size).
        green_screen_frames (List[str]): List of green-screen frame paths.
        batch_size (int): Number of frames per chunked video.
    """
    temp_folder = "temp_video"
    if os.path.exists(temp_folder):
        shutil.rmtree(temp_folder)
    os.makedirs(temp_folder, exist_ok=True)
    # Get video info from original video
    cap = cv2.VideoCapture(original_video)
    fps = cap.get(cv2.CAP_PROP_FPS)
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    cap.release()
    total_frames = len(green_screen_frames)
    num_chunks = (total_frames + batch_size - 1) // batch_size  # Ceiling division
    for chunk_idx in tqdm(range(num_chunks), desc="Processing video chunks"):
        chunk_path = os.path.join(temp_folder, f"{chunk_idx + 1}.mp4")
        out = cv2.VideoWriter(chunk_path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (width, height))
        start_idx = chunk_idx * batch_size
        end_idx = min(start_idx + batch_size, total_frames)
        for frame_path in green_screen_frames[start_idx:end_idx]:
            frame = cv2.imread(frame_path)
            frame = cv2.resize(frame, (width, height))  # Ensure matching resolution
            out.write(frame)
        out.release()
def merge_video_chunks(output_path="final_video.mp4", temp_folder="temp_video", use_gpu=True): | |
""" | |
Merges all video chunks from temp_folder into a final single video. | |
""" | |
os.makedirs("./results", exist_ok=True) | |
output_path = f"../results/{output_path}" # relative to temp_folder | |
file_list_path = os.path.join(temp_folder, "chunks.txt") | |
chunk_files=sorted( | |
[f for f in os.listdir(temp_folder) if f.lower().endswith("mp4")], | |
key=lambda x: int(os.path.splitext(x)[0]) | |
) | |
with open(file_list_path, "w") as f: | |
for chunk in chunk_files: | |
f.write(f"file '{chunk}'\n") # β No './' prefix | |
ffmpeg_cmd = ["ffmpeg", "-y", "-f", "concat", "-safe", "0", "-i", "chunks.txt"] | |
if use_gpu: | |
ffmpeg_cmd += ["-c:v", "h264_nvenc", "-preset", "fast"] | |
else: | |
ffmpeg_cmd += ["-c", "copy"] | |
ffmpeg_cmd.append(output_path) | |
# β Run from inside temp_folder, so chunks.txt and mp4 files are local | |
subprocess.run(ffmpeg_cmd, cwd=temp_folder, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) | |
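
# With the defaults above, the command run inside temp_video/ is, e.g.:
#   ffmpeg -y -f concat -safe 0 -i chunks.txt -c copy ../results/final_video.mp4                       (use_gpu=False)
#   ffmpeg -y -f concat -safe 0 -i chunks.txt -c:v h264_nvenc -preset fast ../results/final_video.mp4  (use_gpu=True)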

def extract_audio_from_video(video_path, output_audio_path="output_audio.wav", format="wav", sample_rate=16000, channels=1):
    """
    Extracts audio from a video file using ffmpeg.

    Args:
        video_path (str): Path to the input video file.
        output_audio_path (str): Path to save the extracted audio (e.g., .wav or .mp3).
        format (str): 'wav' or 'mp3'.
        sample_rate (int): Sampling rate in Hz (e.g., 16000 for ASR models).
        channels (int): Number of audio channels (1=mono, 2=stereo).
    """
    # Ensure the output directory exists
    os.makedirs(os.path.dirname(output_audio_path) or ".", exist_ok=True)
    # Build ffmpeg command
    if format.lower() == "wav":
        command = [
            "ffmpeg", "-y",            # Overwrite output
            "-i", video_path,          # Input video
            "-vn",                     # Disable video
            "-ac", str(channels),      # Audio channels (1 = mono)
            "-ar", str(sample_rate),   # Audio sample rate
            "-acodec", "pcm_s16le",    # WAV codec
            output_audio_path
        ]
    elif format.lower() == "mp3":
        command = [
            "ffmpeg", "-y",
            "-i", video_path,
            "-vn",
            "-ac", str(channels),
            "-ar", str(sample_rate),
            "-acodec", "libmp3lame",   # MP3 codec
            output_audio_path
        ]
    else:
        raise ValueError("Unsupported format. Use 'wav' or 'mp3'.")
    # Run command silently
    subprocess.run(command, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)

def add_audio(video_path, audio_path, output_path, use_gpu=False):
    """
    Replaces the audio of a video with a new audio track.

    Args:
        video_path (str): Path to the video file.
        audio_path (str): Path to the audio file.
        output_path (str): Path where the final video will be saved.
        use_gpu (bool): If True, use GPU-accelerated video encoding.
    """
    # `or "."` keeps makedirs from failing when output_path has no directory part
    os.makedirs(os.path.dirname(output_path) or ".", exist_ok=True)
    command = [
        "ffmpeg", "-y",       # Overwrite without asking
        "-i", video_path,     # Input video
        "-i", audio_path,     # Input audio
        "-map", "0:v:0",      # Use video from first input
        "-map", "1:a:0",      # Use audio from second input
        "-shortest"           # Trim to the shortest stream (audio/video)
    ]
    if use_gpu:
        command += ["-c:v", "h264_nvenc", "-preset", "fast"]
    else:
        command += ["-c:v", "copy"]
    command += ["-c:a", "aac", "-b:a", "192k", output_path]
    subprocess.run(command, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
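
# Resulting command for the CPU path, e.g. (file names illustrative):
#   ffmpeg -y -i video.mp4 -i audio.wav -map 0:v:0 -map 1:a:0 -shortest -c:v copy -c:a aac -b:a 192k out.mp4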

def remove_background_from_video(uploaded_video_path, foreground_segmenter):
    """
    Full pipeline: extract frames, green-screen them, rebuild the video, and restore audio.
    Returns the absolute path of the final video.
    """
    # Generate a single UUID to use for all related files
    uid = uuid.uuid4().hex[:8].upper()
    # Define all output paths using that UUID
    base_name = os.path.splitext(os.path.basename(uploaded_video_path))[0]
    base_name = re.sub(r'[^a-zA-Z\s]', '', base_name)
    base_name = base_name.strip().replace(" ", "_")
    temp_video_path = f"./results/{base_name}_chunks_{uid}.mp4"
    audio_path = f"./results/{base_name}_audio_{uid}.wav"
    final_output_path = f"./results/{base_name}_final_{uid}.mp4"
    # Step 1: Extract frames
    frames = extract_all_frames_ffmpeg_gpu(
        video_path=uploaded_video_path,
        output_dir="frames",
        extension="png",
        use_gpu=gpu
    )
    # Step 2: Remove background (green screen)
    green_screen_frames = green_screen_batch(frames, foreground_segmenter)
    # Step 3: Rebuild video from frames
    green_screen_video_maker(uploaded_video_path, green_screen_frames, batch_size=100)
    # Step 4: Merge video chunks
    merge_video_chunks(output_path=os.path.basename(temp_video_path), use_gpu=gpu)
    # Step 5: Extract original audio
    extract_audio_from_video(uploaded_video_path, output_audio_path=audio_path)
    # Step 6: Add audio back (respect the module-level `gpu` flag, as in steps 1 and 4)
    add_audio(
        video_path=temp_video_path,
        audio_path=audio_path,
        output_path=final_output_path,
        use_gpu=gpu
    )
    return os.path.abspath(final_output_path)
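
# Minimal end-to-end usage sketch. The import below is an assumption: this module only
# needs an object exposing `estimate_foreground_segmentation(image_rgb)`, so substitute
# whatever segmentation class the Space actually provides.
if __name__ == "__main__":
    # from segmentation import SoftForegroundSegmenter  # hypothetical import path
    # segmenter = SoftForegroundSegmenter()
    # result_video = remove_background_from_video("input.mp4", segmenter)
    # print("Saved:", result_video)
    pass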