Spaces:
Paused
Paused
import os | |
import gc | |
import sys | |
import re | |
import numpy as np | |
import torch | |
import imageio | |
import gradio as gr | |
import subprocess | |
import devicetorch | |
import json | |
import math | |
import shutil | |
import traceback | |
from datetime import datetime | |
from pathlib import Path | |
from huggingface_hub import snapshot_download | |
from tqdm.auto import tqdm | |
from torchvision.transforms.functional import to_tensor, to_pil_image | |
from modules.toolbox.rife_core import RIFEHandler | |
from modules.toolbox.esrgan_core import ESRGANUpscaler | |
from modules.toolbox.message_manager import MessageManager | |
device_name_str = devicetorch.get(torch) | |
VIDEO_QUALITY = 8 # Used by imageio.mimwrite quality/quantizer | |
class VideoProcessor: | |
def __init__(self, message_manager: MessageManager, settings): | |
self.message_manager = message_manager | |
self.rife_handler = RIFEHandler(message_manager) | |
self.device_obj = torch.device(device_name_str) # Store device_obj | |
self.esrgan_upscaler = ESRGANUpscaler(message_manager, self.device_obj) | |
self.settings = settings | |
self.project_root = Path(__file__).resolve().parents[2] | |
# FFmpeg/FFprobe paths and status flags | |
self.ffmpeg_exe = None | |
self.ffprobe_exe = None | |
self.has_ffmpeg = False | |
self.has_ffprobe = False | |
self.ffmpeg_source = None | |
self.ffprobe_source = None | |
self._tb_initialize_ffmpeg() # Finds executables and sets flags | |
studio_output_dir = Path(self.settings.get("output_dir")) | |
self.postprocessed_output_root_dir = studio_output_dir / "postprocessed_output" | |
self._base_temp_output_dir = self.postprocessed_output_root_dir / "temp_processing" | |
self._base_permanent_save_dir = self.postprocessed_output_root_dir / "saved_videos" | |
self.toolbox_video_output_dir = self._base_temp_output_dir | |
self.toolbox_permanent_save_dir = self._base_permanent_save_dir | |
os.makedirs(self.postprocessed_output_root_dir, exist_ok=True) | |
os.makedirs(self._base_temp_output_dir, exist_ok=True) | |
os.makedirs(self._base_permanent_save_dir, exist_ok=True) | |
# Note: Renamed to a more generic name as it holds more than just extracted frames now | |
self.frames_io_dir = self.postprocessed_output_root_dir / "frames" | |
self.extracted_frames_target_path = self.frames_io_dir / "extracted_frames" | |
os.makedirs(self.extracted_frames_target_path, exist_ok=True) | |
self.reassembled_video_target_path = self.frames_io_dir / "reassembled_videos" | |
os.makedirs(self.reassembled_video_target_path, exist_ok=True) | |
# --- NEW BATCH PROCESSING FUNCTION --- | |
def tb_process_video_batch(self, video_paths: list, pipeline_config: dict, progress=gr.Progress()): | |
""" | |
Processes a batch of videos according to a defined pipeline of operations. | |
- Batch jobs are ALWAYS saved to a new, unique, timestamped folder in 'saved_videos'. | |
- Single video pipeline jobs respect the 'Autosave' setting for the FINAL output only. | |
- Intermediate files are always created in and cleaned from the temp directory. | |
- The very last successfully processed video (from single or batch) is kept for the UI. | |
""" | |
original_autosave_state = self.settings.get("toolbox_autosave_enabled", True) | |
is_batch_job = len(video_paths) > 1 | |
batch_output_dir = None | |
last_successful_video_path_for_ui = None | |
try: | |
if is_batch_job: | |
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") | |
batch_output_dir = self._base_permanent_save_dir / f"batch_process_{timestamp}" | |
os.makedirs(batch_output_dir, exist_ok=True) | |
self.message_manager.add_message(f"Batch outputs will be saved to: {batch_output_dir}", "SUCCESS") | |
self.set_autosave_mode(False, silent=True) | |
operations = pipeline_config.get("operations", []) | |
if not operations: | |
self.message_manager.add_warning("No operations were selected for the pipeline. Nothing to do.") | |
return None | |
op_names = [op['name'].replace('_', ' ').title() for op in operations] | |
self.message_manager.add_message(f"🚀 Starting pipeline for {len(video_paths)} videos. Pipeline: {' -> '.join(op_names)}") | |
total_videos = len(video_paths) | |
for i, original_video_path in enumerate(video_paths): | |
progress(i / total_videos, desc=f"Video {i+1}/{total_videos}: {os.path.basename(original_video_path)}") | |
self.message_manager.add_message(f"\n--- Processing Video {i+1}/{total_videos}: {os.path.basename(original_video_path)} ---", "INFO") | |
current_video_path = original_video_path | |
video_failed = False | |
path_to_clean = None | |
for op_config in operations: | |
op_name = op_config["name"] | |
op_params = op_config["params"] | |
self.message_manager.add_message(f" -> Step: Applying {op_name.replace('_', ' ')}...") | |
output_path = None | |
try: | |
if op_name == "upscale": output_path = self.tb_upscale_video(current_video_path, **op_params, progress=progress) | |
elif op_name == "frame_adjust": output_path = self.tb_process_frames(current_video_path, **op_params, progress=progress) | |
elif op_name == "filters": output_path = self.tb_apply_filters(current_video_path, **op_params, progress=progress) | |
elif op_name == "loop": output_path = self.tb_create_loop(current_video_path, **op_params, progress=progress) | |
elif op_name == "export": output_path = self.tb_export_video(current_video_path, **op_params, progress=progress) | |
if output_path and os.path.exists(output_path): | |
self.message_manager.add_success(f" -> Step '{op_name}' completed. Output: {os.path.basename(output_path)}") | |
if path_to_clean and os.path.exists(path_to_clean): | |
try: | |
os.remove(path_to_clean) | |
self.message_manager.add_message(f" -> Cleaned intermediate file: {os.path.basename(path_to_clean)}", "DEBUG") | |
except OSError as e: | |
self.message_manager.add_warning(f"Could not clean intermediate file {path_to_clean}: {e}") | |
current_video_path = output_path | |
path_to_clean = output_path | |
else: | |
video_failed = True; break | |
except Exception as e: | |
video_failed = True | |
self.message_manager.add_error(f"An unexpected error occurred during step '{op_name}': {e}") | |
self.message_manager.add_error(traceback.format_exc()) | |
break | |
if not video_failed: | |
final_temp_path = current_video_path | |
is_last_video_in_batch = (i == total_videos - 1) | |
if is_batch_job: | |
# For batch jobs, copy the final output to the permanent batch folder. | |
final_dest_path = batch_output_dir / os.path.basename(final_temp_path) | |
shutil.copy2(final_temp_path, final_dest_path) # Use copy2 to keep temp file for UI | |
self.message_manager.add_success(f"--- Successfully processed. Final output saved to: {final_dest_path} ---") | |
if is_last_video_in_batch: | |
# This is the very last video of the whole batch, keep its temp path for the UI player. | |
last_successful_video_path_for_ui = final_temp_path | |
else: | |
# This is a completed video but not the last one in the batch, so we can clean its temp file. | |
try: os.remove(final_temp_path) | |
except OSError: pass | |
else: # Single video pipeline run. | |
if original_autosave_state: | |
final_dest_path = self._base_permanent_save_dir / os.path.basename(final_temp_path) | |
shutil.move(final_temp_path, final_dest_path) # Move, as it's saved permanently | |
self.message_manager.add_success(f"--- Successfully processed. Final output saved to: {final_dest_path} ---") | |
last_successful_video_path_for_ui = final_dest_path | |
else: | |
# Autosave off, so the final file remains in the temp folder for the UI. | |
self.message_manager.add_success(f"--- Successfully processed. Final output is in temp folder: {final_temp_path} ---") | |
last_successful_video_path_for_ui = final_temp_path | |
else: | |
self.message_manager.add_warning(f"--- Processing failed for {os.path.basename(original_video_path)} ---") | |
if path_to_clean and os.path.exists(path_to_clean): | |
try: os.remove(path_to_clean) | |
except OSError as e: self.message_manager.add_warning(f"Could not clean failed intermediate file {path_to_clean}: {e}") | |
gc.collect() | |
devicetorch.empty_cache(torch) | |
progress(1.0, desc="Pipeline complete.") | |
self.message_manager.add_message("\n✅ Pipeline processing finished.", "SUCCESS") | |
return last_successful_video_path_for_ui | |
finally: | |
# Restore the user's original autosave setting silently. | |
self.set_autosave_mode(original_autosave_state, silent=True) | |
def _tb_initialize_ffmpeg(self): | |
"""Finds FFmpeg/FFprobe and sets status flags and sources.""" | |
( | |
self.ffmpeg_exe, | |
self.ffmpeg_source, | |
self.ffprobe_exe, | |
self.ffprobe_source, | |
) = self._tb_find_ffmpeg_executables() | |
self.has_ffmpeg = bool(self.ffmpeg_exe) | |
self.has_ffprobe = bool(self.ffprobe_exe) | |
self._report_ffmpeg_status() | |
def _tb_find_ffmpeg_executables(self): | |
""" | |
Finds ffmpeg and ffprobe with a priority system. | |
Priority: 1. Bundled -> 2. System PATH -> 3. imageio-ffmpeg | |
Returns (ffmpeg_path, ffmpeg_source, ffprobe_path, ffprobe_source) | |
""" | |
ffmpeg_path, ffprobe_path = None, None | |
ffmpeg_source, ffprobe_source = None, None | |
ffmpeg_name = "ffmpeg.exe" if sys.platform == "win32" else "ffmpeg" | |
ffprobe_name = "ffprobe.exe" if sys.platform == "win32" else "ffprobe" | |
try: | |
script_dir = os.path.dirname(os.path.abspath(__file__)) | |
bin_dir = os.path.join(script_dir, 'bin') | |
bundled_ffmpeg = os.path.join(bin_dir, ffmpeg_name) | |
bundled_ffprobe = os.path.join(bin_dir, ffprobe_name) | |
if os.path.exists(bundled_ffmpeg): | |
ffmpeg_path = bundled_ffmpeg | |
ffmpeg_source = "Bundled" | |
if os.path.exists(bundled_ffprobe): | |
ffprobe_path = bundled_ffprobe | |
ffprobe_source = "Bundled" | |
except Exception: | |
pass | |
if not ffmpeg_path: | |
path_from_env = shutil.which(ffmpeg_name) | |
if path_from_env: | |
ffmpeg_path = path_from_env | |
ffmpeg_source = "System PATH" | |
if not ffprobe_path: | |
path_from_env = shutil.which(ffprobe_name) | |
if path_from_env: | |
ffprobe_path = path_from_env | |
ffprobe_source = "System PATH" | |
if not ffmpeg_path: | |
try: | |
imageio_ffmpeg_exe = imageio.plugins.ffmpeg.get_exe() | |
if os.path.isfile(imageio_ffmpeg_exe): | |
ffmpeg_path = imageio_ffmpeg_exe | |
ffmpeg_source = "imageio-ffmpeg" | |
except Exception: | |
pass | |
return ffmpeg_path, ffmpeg_source, ffprobe_path, ffprobe_source | |
def _report_ffmpeg_status(self): | |
"""Provides a summary of FFmpeg/FFprobe status based on what was found.""" | |
if self.ffmpeg_source == "Bundled" and self.ffprobe_source == "Bundled": | |
self.message_manager.add_message(f"Bundled FFmpeg found: {self.ffmpeg_exe}", "SUCCESS") | |
self.message_manager.add_message(f"Bundled FFprobe found: {self.ffprobe_exe}", "SUCCESS") | |
self.message_manager.add_message("All video and audio features are enabled.", "SUCCESS") | |
return | |
if self.has_ffmpeg: | |
self.message_manager.add_message(f"FFmpeg found via {self.ffmpeg_source}: {self.ffmpeg_exe}", "SUCCESS") | |
else: | |
self.message_manager.add_error("Critical: FFmpeg executable could not be found. Most video processing operations will fail. Please try running the setup script.") | |
if self.has_ffprobe: | |
self.message_manager.add_message(f"FFprobe found via {self.ffprobe_source}: {self.ffprobe_exe}", "SUCCESS") | |
else: | |
self.message_manager.add_warning("FFprobe not found. Audio detection and full video analysis will be limited.") | |
if self.ffmpeg_source != "Bundled": | |
self.message_manager.add_warning("For full functionality, please run the 'setup_ffmpeg.py' script.") | |
def tb_get_frames_from_folder(self, folder_name: str) -> list: | |
""" | |
Gets a sorted list of image file paths from a given folder name. | |
This is the backend for the "Load Frames to Studio" button. | |
""" | |
if not folder_name: | |
return [] | |
full_folder_path = os.path.join(self.extracted_frames_target_path, folder_name) | |
if not os.path.isdir(full_folder_path): | |
self.message_manager.add_error(f"Cannot load frames: Directory not found at {full_folder_path}") | |
return [] | |
frame_files = [] | |
try: | |
for filename in os.listdir(full_folder_path): | |
if filename.lower().endswith(('.png', '.jpg', '.jpeg', '.bmp', '.webp')): | |
frame_files.append(os.path.join(full_folder_path, filename)) | |
# Natural sort to handle frame_0, frame_1, ... frame_10 correctly | |
def natural_sort_key(s): | |
return [int(text) if text.isdigit() else text.lower() for text in re.split('([0-9]+)', s)] | |
frame_files.sort(key=natural_sort_key) | |
return frame_files | |
except Exception as e: | |
self.message_manager.add_error(f"Error reading frames from '{folder_name}': {e}") | |
return [] | |
def tb_delete_single_frame(self, frame_path_to_delete: str) -> str: | |
"""Deletes a single frame file from the disk, logs the action, and returns a status message.""" | |
if not frame_path_to_delete or not isinstance(frame_path_to_delete, str): | |
# This message is returned to the app's info box | |
msg_for_infobox = "Error: Invalid frame path provided for deletion." | |
# The message manager gets a more detailed log entry | |
self.message_manager.add_error("Could not delete frame: Invalid path provided to processor.") | |
return msg_for_infobox | |
try: | |
filename = os.path.basename(frame_path_to_delete) | |
if os.path.isfile(frame_path_to_delete): | |
os.remove(frame_path_to_delete) | |
# Add a success message to the main log | |
self.message_manager.add_success(f"Deleted frame: {filename}") | |
# Return a concise status for the info box | |
return f"✅ Deleted: {filename}" | |
else: | |
self.message_manager.add_error(f"Could not delete frame. File not found: {frame_path_to_delete}") | |
return f"Error: Frame not found" | |
except OSError as e: | |
self.message_manager.add_error(f"Error deleting frame {filename}: {e}") | |
return f"Error deleting frame: {e}" | |
def tb_save_single_frame(self, source_frame_path: str) -> str | None: | |
"""Saves a copy of a single frame to the permanent 'saved_videos' directory.""" | |
if not source_frame_path or not os.path.isfile(source_frame_path): | |
self.message_manager.add_error("Source frame to save does not exist or is invalid.") | |
return None | |
try: | |
source_path_obj = Path(source_frame_path) | |
parent_folder_name = source_path_obj.parent.name | |
frame_filename = source_path_obj.name | |
# Create a descriptive filename to avoid collisions | |
timestamp = datetime.now().strftime("%y%m%d_%H%M%S") | |
dest_filename = f"saved_frame_{parent_folder_name}_{timestamp}_{frame_filename}" | |
destination_path = os.path.join(self.toolbox_permanent_save_dir, dest_filename) | |
os.makedirs(self.toolbox_permanent_save_dir, exist_ok=True) | |
shutil.copy2(source_frame_path, destination_path) | |
self.message_manager.add_success(f"Saved frame to permanent storage: {destination_path}") | |
return destination_path | |
except Exception as e: | |
self.message_manager.add_error(f"Error saving frame to permanent storage: {e}") | |
self.message_manager.add_error(traceback.format_exc()) | |
return None | |
def set_autosave_mode(self, autosave_enabled: bool, silent: bool = False): | |
if autosave_enabled: | |
self.toolbox_video_output_dir = self._base_permanent_save_dir | |
if not silent: | |
self.message_manager.add_message("Autosave ENABLED: Processed videos will be saved to the permanent folder.", "SUCCESS") | |
else: | |
self.toolbox_video_output_dir = self._base_temp_output_dir | |
if not silent: | |
self.message_manager.add_message("Autosave DISABLED: Processed videos will be saved to the temporary folder.", "INFO") | |
def _tb_log_ffmpeg_error(self, e_ffmpeg: subprocess.CalledProcessError, operation_description: str): | |
self.message_manager.add_error(f"FFmpeg failed during {operation_description}.") | |
ffmpeg_stderr_str = e_ffmpeg.stderr.strip() if e_ffmpeg.stderr else "" | |
ffmpeg_stdout_str = e_ffmpeg.stdout.strip() if e_ffmpeg.stdout else "" | |
details_log = [] | |
if ffmpeg_stderr_str: details_log.append(f"FFmpeg Stderr: {ffmpeg_stderr_str}") | |
if ffmpeg_stdout_str: details_log.append(f"FFmpeg Stdout: {ffmpeg_stdout_str}") | |
if details_log: | |
self.message_manager.add_message("FFmpeg Output:\n" + "\n".join(details_log), "INFO") | |
else: | |
self.message_manager.add_message(f"No specific output from FFmpeg. (Return code: {e_ffmpeg.returncode}, Command: '{e_ffmpeg.cmd}')", "INFO") | |
def _tb_get_video_frame_count(self, video_path: str) -> int | None: | |
""" | |
Uses ffprobe to get an accurate frame count by requesting JSON output for robust parsing. | |
Tries a fast metadata read first, then falls back to a slower but more accurate full stream count. | |
""" | |
if not self.has_ffprobe: | |
self.message_manager.add_message("Cannot get frame count: ffprobe not found.", "DEBUG") | |
return None | |
# --- Tier 1: Fast metadata read using JSON output --- | |
try: | |
ffprobe_cmd_fast = [ | |
self.ffprobe_exe, | |
"-v", "error", | |
"-probesize", "5M", # Input option | |
"-i", video_path, # The input file | |
"-select_streams", "v:0", | |
"-show_entries", "stream=nb_frames", | |
"-of", "json" | |
] | |
result = subprocess.run(ffprobe_cmd_fast, capture_output=True, text=True, check=True, errors='ignore') | |
data = json.loads(result.stdout) | |
frame_count_str = data.get("streams", [{}])[0].get("nb_frames", "N/A") | |
if frame_count_str.isdigit() and int(frame_count_str) > 0: | |
self.message_manager.add_message(f"Frame count from metadata: {frame_count_str}", "DEBUG") | |
return int(frame_count_str) | |
else: | |
self.message_manager.add_warning(f"Fast metadata frame count was invalid ('{frame_count_str}'). Falling back to full count.") | |
except Exception as e: | |
self.message_manager.add_warning(f"Fast metadata read failed: {e}. Falling back to full count.") | |
# --- Tier 2: Slow, accurate full-stream count using JSON output --- | |
try: | |
self.message_manager.add_message("Performing full, accurate frame count with ffprobe (this may take a moment)...", "INFO") | |
ffprobe_cmd_accurate = [ | |
self.ffprobe_exe, | |
"-v", "error", | |
"-probesize", "5M", # Input option | |
"-i", video_path, # The input file | |
"-count_frames", | |
"-select_streams", "v:0", | |
"-show_entries", "stream=nb_read_frames", | |
"-of", "json" | |
] | |
result = subprocess.run(ffprobe_cmd_accurate, capture_output=True, text=True, check=True, errors='ignore') | |
data = json.loads(result.stdout) | |
frame_count_str = data.get("streams", [{}])[0].get("nb_read_frames", "N/A") | |
if frame_count_str.isdigit() and int(frame_count_str) > 0: | |
self.message_manager.add_message(f"Accurate frame count from full scan: {frame_count_str}", "DEBUG") | |
return int(frame_count_str) | |
else: | |
self.message_manager.add_error(f"Full ffprobe scan returned invalid frame count: '{frame_count_str}'.") | |
return None | |
except Exception as e: | |
self.message_manager.add_error(f"Critical error during full ffprobe frame count: {e}") | |
self.message_manager.add_error(traceback.format_exc()) | |
return None | |
def tb_extract_frames(self, video_path, extraction_rate, progress=gr.Progress()): | |
if video_path is None: | |
self.message_manager.add_warning("No input video for frame extraction.") | |
return None | |
if not isinstance(extraction_rate, int) or extraction_rate < 1: | |
self.message_manager.add_error("Extraction rate must be a positive integer (1 for all frames, N for every Nth frame).") | |
return None | |
resolved_video_path = str(Path(video_path).resolve()) | |
output_folder_name = self._tb_generate_output_folder_path( | |
resolved_video_path, | |
suffix=f"extracted_every_{extraction_rate}") | |
os.makedirs(output_folder_name, exist_ok=True) | |
self.message_manager.add_message( | |
f"Starting frame extraction for {os.path.basename(resolved_video_path)} (every {extraction_rate} frame(s))." | |
) | |
self.message_manager.add_message(f"Outputting to: {output_folder_name}") | |
reader = None | |
try: | |
total_frames = self._tb_get_video_frame_count(resolved_video_path) | |
# If we know the total frames, we can provide accurate progress. | |
if total_frames: | |
progress(0, desc=f"Extracting 0 / {total_frames} frames...") | |
else: | |
self.message_manager.add_warning("Could not determine total frames. Progress will be indeterminate.") | |
progress(0, desc="Extracting frames (total unknown)...") | |
reader = imageio.get_reader(resolved_video_path) | |
extracted_count = 0 | |
# --- MANUAL PROGRESS LOOP --- | |
for i, frame in enumerate(reader): | |
# Update progress manually every few frames to avoid overwhelming the UI | |
if total_frames and i % 10 == 0: | |
progress(i / total_frames, desc=f"Extracting {i} / {total_frames} frames...") | |
if i % extraction_rate == 0: | |
frame_filename = f"frame_{extracted_count:06d}.png" | |
output_frame_path = os.path.join(output_folder_name, frame_filename) | |
imageio.imwrite(output_frame_path, frame, format='PNG') | |
extracted_count += 1 | |
# --- FINAL UPDATE --- | |
progress(1.0, desc="Extraction complete.") | |
self.message_manager.add_success(f"Successfully extracted {extracted_count} frames to: {output_folder_name}") | |
return output_folder_name | |
except Exception as e: | |
self.message_manager.add_error(f"Error during frame extraction: {e}") | |
self.message_manager.add_error(traceback.format_exc()) | |
progress(1.0, desc="Error during extraction.") | |
return None | |
finally: | |
if reader: | |
reader.close() | |
gc.collect() | |
def tb_get_extracted_frame_folders(self) -> list: | |
if not os.path.exists(self.extracted_frames_target_path): | |
self.message_manager.add_warning(f"Extracted frames directory not found: {self.extracted_frames_target_path}") | |
return [] | |
try: | |
folders = [ | |
d for d in os.listdir(self.extracted_frames_target_path) | |
if os.path.isdir(os.path.join(self.extracted_frames_target_path, d)) | |
] | |
folders.sort() | |
return folders | |
except Exception as e: | |
self.message_manager.add_error(f"Error scanning for extracted frame folders: {e}") | |
return [] | |
def tb_delete_extracted_frames_folder(self, folder_name_to_delete: str) -> bool: | |
if not folder_name_to_delete: | |
self.message_manager.add_warning("No folder selected for deletion.") | |
return False | |
folder_path_to_delete = os.path.join(self.extracted_frames_target_path, folder_name_to_delete) | |
if not os.path.exists(folder_path_to_delete) or not os.path.isdir(folder_path_to_delete): | |
self.message_manager.add_error(f"Folder not found or is not a directory: {folder_path_to_delete}") | |
return False | |
try: | |
shutil.rmtree(folder_path_to_delete) | |
self.message_manager.add_success(f"Successfully deleted folder: {folder_name_to_delete}") | |
return True | |
except Exception as e: | |
self.message_manager.add_error(f"Error deleting folder '{folder_name_to_delete}': {e}") | |
self.message_manager.add_error(traceback.format_exc()) | |
return False | |
def tb_reassemble_frames_to_video(self, frames_source, output_fps, output_base_name_override=None, progress=gr.Progress()): | |
if not frames_source: | |
self.message_manager.add_warning("No frames source (folder or files) provided for reassembly.") | |
return None | |
try: | |
output_fps = int(output_fps) | |
if output_fps <= 0: | |
self.message_manager.add_error("Output FPS must be a positive number.") | |
return None | |
except ValueError: | |
self.message_manager.add_error("Invalid FPS value for reassembly.") | |
return None | |
self.message_manager.add_message(f"Starting frame reassembly to video at {output_fps} FPS.") | |
frame_info_list = [] | |
frames_data_prepared = False | |
try: | |
# This logic now primarily handles a directory path string | |
if isinstance(frames_source, str) and os.path.isdir(frames_source): | |
self.message_manager.add_message(f"Processing frames from directory: {frames_source}") | |
# Use our existing function to get a sorted list of frame paths | |
sorted_frame_paths = self.tb_get_frames_from_folder(os.path.basename(frames_source)) | |
for full_path in sorted_frame_paths: | |
frame_info_list.append({ | |
'original_like_filename': os.path.basename(full_path), | |
'temp_path': full_path | |
}) | |
else: | |
self.message_manager.add_error("Invalid frames_source type for reassembly. Expected a directory path.") | |
return None | |
if not frame_info_list: | |
self.message_manager.add_warning("No valid image files found in the provided source to reassemble.") | |
return None | |
self.message_manager.add_message(f"Found {len(frame_info_list)} frames for reassembly.") | |
output_file_basename = "reassembled_video" | |
if output_base_name_override and isinstance(output_base_name_override, str) and output_base_name_override.strip(): | |
sanitized_name = "".join(c if c.isalnum() or c in (' ', '_', '-') else '_' for c in output_base_name_override.strip()) | |
output_file_basename = Path(sanitized_name).stem | |
if not output_file_basename: output_file_basename = "reassembled_video" | |
self.message_manager.add_message(f"Using custom output video base name: {output_file_basename}") | |
output_video_path = self._tb_generate_output_path( | |
input_material_name=output_file_basename, | |
suffix=f"{output_fps}fps_reassembled", | |
target_dir=self.reassembled_video_target_path, | |
ext=".mp4" | |
) | |
frames_data = [] | |
frames_data_prepared = True | |
self.message_manager.add_message("Reading frame images (in sorted order)...") | |
frame_iterator = frame_info_list | |
if frame_info_list and progress is not None and hasattr(progress, 'tqdm'): | |
frame_iterator = progress.tqdm(frame_info_list, desc="Reading frames") | |
for frame_info in frame_iterator: | |
frame_actual_path = frame_info['temp_path'] | |
filename_for_log = frame_info['original_like_filename'] | |
try: | |
if not filename_for_log.lower().endswith(('.png', '.jpg', '.jpeg', '.bmp', '.webp')): | |
self.message_manager.add_warning(f"Skipping non-standard image file: {filename_for_log}.") | |
continue | |
frames_data.append(imageio.imread(frame_actual_path)) | |
except Exception as e_read_frame: | |
self.message_manager.add_warning(f"Could not read frame ({filename_for_log}): {e_read_frame}. Skipping.") | |
if not frames_data: | |
self.message_manager.add_error("No valid frames could be successfully read for reassembly.") | |
return None | |
self.message_manager.add_message(f"Writing {len(frames_data)} frames to video: {output_video_path}") | |
imageio.mimwrite(output_video_path, frames_data, fps=output_fps, quality=VIDEO_QUALITY, macro_block_size=None) | |
self.message_manager.add_success(f"Successfully reassembled {len(frames_data)} frames into: {output_video_path}") | |
return output_video_path | |
except Exception as e: | |
self.message_manager.add_error(f"Error during frame reassembly: {e}") | |
self.message_manager.add_error(traceback.format_exc()) | |
if "Could not find a backend" in str(e) or "No such file or directory: 'ffmpeg'" in str(e).lower(): | |
self.message_manager.add_error("This might indicate an issue with FFmpeg backend for imageio. Ensure 'imageio-ffmpeg' is installed or FFmpeg is in PATH.") | |
return None | |
finally: | |
if frames_data_prepared and 'frames_data' in locals(): | |
del frames_data | |
gc.collect() | |
def _tb_get_video_duration(self, video_path: str) -> str | None: | |
"""Uses ffprobe to get the duration of a video file as a string.""" | |
if not self.has_ffprobe: | |
return None | |
try: | |
ffprobe_cmd = [ | |
self.ffprobe_exe, "-v", "error", "-show_entries", | |
"format=duration", "-of", "default=noprint_wrappers=1:nokey=1", video_path | |
] | |
result = subprocess.run(ffprobe_cmd, capture_output=True, text=True, check=True, errors='ignore') | |
return result.stdout.strip() | |
except Exception: | |
return None | |
def tb_join_videos(self, video_paths: list, output_base_name_override=None, progress=gr.Progress()): | |
if not video_paths or len(video_paths) < 2: | |
self.message_manager.add_warning("Please select at least two videos to join.") | |
return None | |
if not self.has_ffmpeg: | |
self.message_manager.add_error("FFmpeg is required for joining videos. This operation cannot proceed.") | |
return None | |
self.message_manager.add_message(f"🚀 Starting video join process for {len(video_paths)} videos...") | |
progress(0.1, desc="Analyzing input videos...") | |
# --- 1. STANDARDIZE DIMENSIONS --- | |
# Get dimensions of the first video to use as the standard for all others. | |
first_video_dims = self._tb_get_video_dimensions(video_paths[0]) | |
if not all(first_video_dims): | |
self.message_manager.add_error("Could not determine dimensions of the first video. Cannot proceed.") | |
return None | |
target_w, target_h = first_video_dims | |
self.message_manager.add_message(f"Standardizing all videos to {target_w}x{target_h} for joining.") | |
# --- 2. BUILD THE FFMPEG COMMAND --- | |
ffmpeg_cmd = [self.ffmpeg_exe, "-y", "-loglevel", "error"] | |
filter_complex_parts = [] | |
video_stream_labels = [] | |
audio_stream_labels = [] | |
# Loop through each input video to prepare its streams. | |
for i, path in enumerate(video_paths): | |
ffmpeg_cmd.extend(["-i", str(Path(path).resolve())]) | |
# --- VIDEO STREAM PREPARATION --- | |
video_label = f"v{i}" | |
# Scale video, pad to fit, set aspect ratio, and ensure standard pixel format. | |
filter_complex_parts.append( | |
f"[{i}:v:0]scale={target_w}:{target_h}:force_original_aspect_ratio=decrease,pad={target_w}:{target_h}:-1:-1:color=black,setsar=1,format=yuv420p[{video_label}]" | |
) | |
video_stream_labels.append(f"[{video_label}]") | |
# --- AUDIO STREAM PREPARATION --- | |
audio_label = f"a{i}" | |
if self._tb_has_audio_stream(path): | |
# If audio exists, standardize it to a common format. | |
filter_complex_parts.append( | |
f"[{i}:a:0]aformat=sample_fmts=fltp:sample_rates=44100:channel_layouts=stereo[{audio_label}]" | |
) | |
else: | |
# If no audio, get the video's duration first. | |
duration = self._tb_get_video_duration(path) | |
if duration: | |
# Then, generate a silent audio track of that exact duration. | |
self.message_manager.add_message(f"'{Path(path).name}' has no audio. Generating silent track of {float(duration):.2f}s.", "INFO") | |
filter_complex_parts.append( | |
f"anullsrc=channel_layout=stereo:sample_rate=44100,atrim=duration={duration}[{audio_label}]" | |
) | |
else: | |
# If we can't get duration, we can't create a silent track, so we must skip it. | |
self.message_manager.add_warning(f"Could not get duration for '{Path(path).name}' to generate silent audio. This track's audio will be skipped.") | |
continue | |
audio_stream_labels.append(f"[{audio_label}]") | |
# --- 3. CONCATENATE THE STREAMS --- | |
# Join all the prepared video and audio streams together into final output streams. | |
filter_complex_parts.append(f"{''.join(video_stream_labels)}concat=n={len(video_paths)}:v=1:a=0[outv]") | |
# Only add the audio concat filter if we successfully prepared audio streams. | |
if audio_stream_labels: | |
filter_complex_parts.append(f"{''.join(audio_stream_labels)}concat=n={len(audio_stream_labels)}:v=0:a=1[outa]") | |
final_filter_complex = ";".join(filter_complex_parts) | |
ffmpeg_cmd.extend(["-filter_complex", final_filter_complex]) | |
# --- 4. MAP AND ENCODE THE FINAL VIDEO --- | |
# Map the final concatenated video stream to the output. | |
ffmpeg_cmd.extend(["-map", "[outv]"]) | |
# If we have a final audio stream, map that too. | |
if audio_stream_labels: | |
ffmpeg_cmd.extend(["-map", "[outa]"]) | |
# Determine the output filename. | |
if output_base_name_override and isinstance(output_base_name_override, str) and output_base_name_override.strip(): | |
sanitized_name = "".join(c for c in output_base_name_override.strip() if c.isalnum() or c in (' ', '_', '-')).strip() | |
base_name_to_use = Path(sanitized_name).stem if sanitized_name else Path(video_paths[0]).stem | |
else: | |
base_name_to_use = Path(video_paths[0]).stem | |
output_path = self._tb_generate_output_path( | |
base_name_to_use, | |
suffix=f"joined_{len(video_paths)}_videos", | |
target_dir=self.toolbox_video_output_dir | |
) | |
# Set standard, high-compatibility encoding options. | |
ffmpeg_cmd.extend([ | |
"-c:v", "libx264", "-preset", "medium", "-crf", "20", | |
"-c:a", "aac", "-b:a", "192k", output_path | |
]) | |
# --- 5. EXECUTE THE COMMAND --- | |
try: | |
self.message_manager.add_message("Running FFmpeg to join videos. This may take a while...") | |
progress(0.5, desc=f"Joining {len(video_paths)} videos...") | |
subprocess.run(ffmpeg_cmd, check=True, capture_output=True, text=True, errors='ignore') | |
progress(1.0, desc="Join complete.") | |
self.message_manager.add_success(f"✅ Videos successfully joined! Output: {output_path}") | |
return output_path | |
except subprocess.CalledProcessError as e_join: | |
self._tb_log_ffmpeg_error(e_join, "video joining") | |
return None | |
except Exception as e: | |
self.message_manager.add_error(f"An unexpected error occurred during video joining: {e}") | |
self.message_manager.add_error(traceback.format_exc()) | |
return None | |
finally: | |
gc.collect() | |
def _tb_clean_filename(self, filename): | |
filename = re.sub(r'_\d{6}_\d{6}', '', filename) # Example timestamp pattern | |
filename = re.sub(r'_\d{6}_\d{4}', '', filename) # Another example | |
return filename.strip('_') | |
def tb_export_video(self, video_path: str, export_format: str, quality_slider: int, max_width: int, | |
output_base_name_override=None, progress=gr.Progress()): | |
if not video_path: | |
self.message_manager.add_warning("No input video for exporting.") | |
return None | |
if not self.has_ffmpeg: | |
self.message_manager.add_error("FFmpeg is required for exporting. This operation cannot proceed.") | |
return None | |
self.message_manager.add_message(f"🚀 Starting export to {export_format.upper()}...") | |
progress(0, desc=f"Preparing to export to {export_format.upper()}...") | |
resolved_video_path = str(Path(video_path).resolve()) | |
# --- Base FFmpeg Command --- | |
ffmpeg_cmd = [self.ffmpeg_exe, "-y", "-loglevel", "error", "-i", resolved_video_path] | |
# --- Video Filters (Resizing) --- | |
vf_parts = [] | |
# The scale filter resizes while maintaining aspect ratio. '-2' ensures the height is an even number for codec compatibility. | |
vf_parts.append(f"scale={max_width}:-2") | |
# --- Format-Specific Settings --- | |
ext = f".{export_format.lower()}" | |
if export_format == "MP4": | |
# CRF (Constant Rate Factor) is the quality setting for x264. Lower is higher quality. | |
# We map our 0-100 slider to a good CRF range (e.g., 28 (low) to 18 (high)). | |
crf_value = int(28 - (quality_slider / 100) * 10) | |
self.message_manager.add_message(f"Exporting MP4 with CRF: {crf_value} (Quality: {quality_slider}%)") | |
ffmpeg_cmd.extend(["-c:v", "libx264", "-crf", str(crf_value), "-preset", "medium"]) | |
ffmpeg_cmd.extend(["-c:a", "aac", "-b:a", "128k"]) # Keep audio, but compress it | |
elif export_format == "WebM": | |
# Similar to MP4, but for the VP9 codec. A good CRF range is ~35 (low) to 25 (high). | |
crf_value = int(35 - (quality_slider / 100) * 10) | |
self.message_manager.add_message(f"Exporting WebM with CRF: {crf_value} (Quality: {quality_slider}%)") | |
ffmpeg_cmd.extend(["-c:v", "libvpx-vp9", "-crf", str(crf_value), "-b:v", "0"]) | |
ffmpeg_cmd.extend(["-c:a", "libopus", "-b:a", "96k"]) # Use Opus for WebM audio | |
elif export_format == "GIF": | |
# High-quality GIF generation is a two-pass process. | |
self.message_manager.add_message("Generating high-quality GIF (2-pass)...") | |
# Pass 1: Generate a color palette. | |
palette_path = os.path.join(self._base_temp_output_dir, f"palette_{Path(video_path).stem}.png") | |
vf_parts.append("split[s0][s1];[s0]palettegen[p];[s1][p]paletteuse") | |
ffmpeg_cmd.extend(["-an"]) # No audio in GIFs | |
if vf_parts: | |
ffmpeg_cmd.extend(["-vf", ",".join(vf_parts)]) | |
# --- Output Path --- | |
if output_base_name_override and isinstance(output_base_name_override, str) and output_base_name_override.strip(): | |
sanitized_name = "".join(c for c in output_base_name_override.strip() if c.isalnum() or c in (' ', '_', '-')).strip() | |
base_name_to_use = Path(sanitized_name).stem if sanitized_name else Path(video_path).stem | |
else: | |
base_name_to_use = Path(video_path).stem | |
# --- SPECIAL HANDLING FOR GIF OUTPUT PATH --- | |
if export_format == "GIF": | |
# GIFs are always saved to the permanent directory to avoid being lost | |
# by Gradio's re-encoding for the video player preview. | |
target_dir_for_export = self._base_permanent_save_dir | |
self.message_manager.add_message("GIF export detected. Output will be forced to the permanent 'saved_videos' folder, ignoring Autosave setting.", "INFO") | |
else: | |
# For MP4/WebM, respect the current autosave setting | |
target_dir_for_export = self.toolbox_video_output_dir | |
output_path = self._tb_generate_output_path( | |
base_name_to_use, | |
suffix=f"exported_{quality_slider}q_{max_width}w", | |
target_dir=target_dir_for_export, | |
ext=ext | |
) | |
ffmpeg_cmd.append(output_path) | |
# --- Execute --- | |
try: | |
progress(0.5, desc=f"Encoding to {export_format.upper()}...") | |
subprocess.run(ffmpeg_cmd, check=True, capture_output=True, text=True, errors='ignore') | |
progress(1.0, desc="Export complete!") | |
# Add specific messaging for GIF vs other formats | |
if export_format == "GIF": | |
self.message_manager.add_success(f"✅ GIF successfully created and saved to: {output_path}") | |
self.message_manager.add_warning("⚠️ Note: The video player shows a re-encoded MP4 for preview. Your original GIF is in the output folder.") | |
else: | |
self.message_manager.add_success(f"✅ Successfully exported to {export_format.upper()}! Output: {output_path}") | |
return output_path | |
except subprocess.CalledProcessError as e: | |
self._tb_log_ffmpeg_error(e, f"export to {export_format.upper()}") | |
return None | |
except Exception as e: | |
self.message_manager.add_error(f"An unexpected error occurred during export: {e}") | |
self.message_manager.add_error(traceback.format_exc()) | |
return None | |
finally: | |
gc.collect() | |
def _tb_generate_output_path(self, input_material_name, suffix, target_dir, ext=".mp4"): | |
base_name = Path(input_material_name).stem | |
if not base_name: base_name = "untitled_video" | |
cleaned_name = self._tb_clean_filename(base_name) | |
timestamp = datetime.now().strftime("%y%m%d_%H%M%S") | |
filename = f"{cleaned_name}_{suffix}_{timestamp}{ext}" | |
return os.path.join(target_dir, filename) | |
def _tb_generate_output_folder_path(self, input_video_path, suffix): | |
base_name = Path(input_video_path).stem | |
if not base_name: base_name = "untitled_video_frames" | |
cleaned_name = self._tb_clean_filename(base_name) | |
timestamp = datetime.now().strftime("%y%m%d_%H%M%S") | |
folder_name = f"{cleaned_name}_{suffix}_{timestamp}" | |
return os.path.join(self.extracted_frames_target_path, folder_name) | |
def tb_copy_video_to_permanent_storage(self, temp_video_path): | |
if not temp_video_path or not os.path.exists(temp_video_path): | |
self.message_manager.add_error("No video file provided or file does not exist to save.") | |
return temp_video_path | |
try: | |
video_filename = Path(temp_video_path).name | |
permanent_video_path = os.path.join(self.toolbox_permanent_save_dir, video_filename) | |
os.makedirs(self.toolbox_permanent_save_dir, exist_ok=True) | |
self.message_manager.add_message(f"Copying '{video_filename}' to permanent storage: '{permanent_video_path}'") | |
shutil.copy2(temp_video_path, permanent_video_path) | |
self.message_manager.add_success(f"Video saved to: {permanent_video_path}") | |
return permanent_video_path | |
except Exception as e: | |
self.message_manager.add_error(f"Error saving video to permanent storage: {e}") | |
self.message_manager.add_error(traceback.format_exc()) | |
return temp_video_path | |
def tb_analyze_video_input(self, video_path): | |
if video_path is None: | |
self.message_manager.add_warning("No video provided for analysis.") | |
return "Please upload a video." | |
resolved_video_path = str(Path(video_path).resolve()) | |
analysis_report_lines = [] # Use a list to build the report string | |
file_size_bytes = 0 | |
file_size_display = "N/A" | |
try: | |
if os.path.exists(resolved_video_path): | |
file_size_bytes = os.path.getsize(resolved_video_path) | |
if file_size_bytes < 1024: | |
file_size_display = f"{file_size_bytes} B" | |
elif file_size_bytes < 1024**2: | |
file_size_display = f"{file_size_bytes/1024:.2f} KB" | |
elif file_size_bytes < 1024**3: | |
file_size_display = f"{file_size_bytes/1024**2:.2f} MB" | |
else: | |
file_size_display = f"{file_size_bytes/1024**3:.2f} GB" | |
except Exception as e: | |
self.message_manager.add_warning(f"Could not get file size: {e}") | |
# Variables to hold parsed info, initialized to defaults | |
video_width, video_height = 0, 0 | |
num_frames_value = None # For the upscale warning | |
duration_display, fps_display, resolution_display, nframes_display, has_audio_str = "N/A", "N/A", "N/A", "N/A", "No" | |
analysis_source = "imageio" # Default analysis source | |
if self.has_ffprobe: | |
self.message_manager.add_message(f"Analyzing video with ffprobe: {os.path.basename(video_path)}") | |
try: | |
probe_cmd = [ | |
self.ffprobe_exe, "-v", "error", "-show_format", "-show_streams", | |
"-of", "json", resolved_video_path | |
] | |
result = subprocess.run(probe_cmd, capture_output=True, text=True, check=True, errors='ignore') | |
probe_data = json.loads(result.stdout) | |
video_stream = next((s for s in probe_data.get("streams", []) if s.get("codec_type") == "video"), None) | |
audio_stream = next((s for s in probe_data.get("streams", []) if s.get("codec_type") == "audio"), None) | |
if not video_stream: | |
self.message_manager.add_error("No video stream found in the file (ffprobe).") | |
else: | |
analysis_source = "ffprobe" | |
duration_str = probe_data.get("format", {}).get("duration", "0") | |
duration = float(duration_str) if duration_str and duration_str.replace('.', '', 1).isdigit() else 0.0 | |
duration_display = f"{duration:.2f} seconds" | |
r_frame_rate_str = video_stream.get("r_frame_rate", "0/0") | |
avg_frame_rate_str = video_stream.get("avg_frame_rate", "0/0") | |
calculated_fps = 0.0 | |
def parse_fps(fps_s): | |
if isinstance(fps_s, (int, float)): return float(fps_s) | |
if isinstance(fps_s, str) and "/" in fps_s: | |
try: num, den = map(float, fps_s.split('/')); return num / den if den != 0 else 0.0 | |
except ValueError: return 0.0 | |
try: return float(fps_s) | |
except ValueError: return 0.0 | |
r_fps_val = parse_fps(r_frame_rate_str); avg_fps_val = parse_fps(avg_frame_rate_str) | |
if r_fps_val > 0: calculated_fps = r_fps_val; fps_display = f"{r_fps_val:.2f} FPS" | |
if avg_fps_val > 0 and abs(r_fps_val - avg_fps_val) > 0.01 : # Only show average if meaningfully different | |
calculated_fps = avg_fps_val # Prefer average if it's different and valid | |
fps_display = f"{avg_fps_val:.2f} FPS (Avg, r: {r_fps_val:.2f})" | |
elif avg_fps_val > 0 and r_fps_val <=0: | |
calculated_fps = avg_fps_val; fps_display = f"{avg_fps_val:.2f} FPS (Average)" | |
video_width = video_stream.get("width", 0) | |
video_height = video_stream.get("height", 0) | |
resolution_display = f"{video_width}x{video_height}" if video_width and video_height else "N/A" | |
nframes_str_probe = video_stream.get("nb_frames") | |
if nframes_str_probe and nframes_str_probe.isdigit(): | |
num_frames_value = int(nframes_str_probe) | |
nframes_display = str(num_frames_value) | |
elif duration > 0 and calculated_fps > 0: | |
num_frames_value = int(duration * calculated_fps) | |
nframes_display = f"{num_frames_value} (Calculated)" | |
if audio_stream: | |
has_audio_str = (f"Yes (Codec: {audio_stream.get('codec_name', 'N/A')}, " | |
f"Channels: {audio_stream.get('channels', 'N/A')}, " | |
f"Rate: {audio_stream.get('sample_rate', 'N/A')} Hz)") | |
self.message_manager.add_success("Video analysis complete (using ffprobe).") | |
except (subprocess.CalledProcessError, json.JSONDecodeError, Exception) as e_ffprobe: | |
self.message_manager.add_warning(f"ffprobe analysis failed ({type(e_ffprobe).__name__}). Trying imageio fallback.") | |
if isinstance(e_ffprobe, subprocess.CalledProcessError): | |
self._tb_log_ffmpeg_error(e_ffprobe, "video analysis with ffprobe") | |
analysis_source = "imageio" # Ensure fallback if ffprobe fails midway | |
if analysis_source == "imageio": # Either ffprobe not available, or it failed | |
self.message_manager.add_message(f"Analyzing video with imageio: {os.path.basename(video_path)}") | |
reader = None | |
try: | |
reader = imageio.get_reader(resolved_video_path) | |
meta = reader.get_meta_data() | |
duration_imgio_val = meta.get('duration') | |
duration_display = f"{float(duration_imgio_val):.2f} seconds" if duration_imgio_val is not None else "N/A" | |
fps_val_imgio = meta.get('fps') | |
fps_display = f"{float(fps_val_imgio):.2f} FPS" if fps_val_imgio is not None else "N/A" | |
size_imgio = meta.get('size') | |
if isinstance(size_imgio, tuple) and len(size_imgio) == 2: | |
video_width, video_height = int(size_imgio[0]), int(size_imgio[1]) | |
resolution_display = f"{video_width}x{video_height}" | |
else: | |
resolution_display = "N/A" | |
nframes_val_imgio_meta = meta.get('nframes') | |
if nframes_val_imgio_meta not in [float('inf'), "N/A", None] and isinstance(nframes_val_imgio_meta, (int,float)): | |
num_frames_value = int(nframes_val_imgio_meta) | |
nframes_display = str(num_frames_value) | |
elif hasattr(reader, 'count_frames'): | |
try: | |
nframes_val_imgio_count = reader.count_frames() | |
if nframes_val_imgio_count != float('inf'): | |
num_frames_value = int(nframes_val_imgio_count) | |
nframes_display = f"{num_frames_value} (Counted)" | |
else: nframes_display = "Unknown (Stream or very long)" | |
except Exception: nframes_display = "Unknown (Frame count failed)" | |
has_audio_str = "(Audio info not available via imageio)" | |
self.message_manager.add_success("Video analysis complete (using imageio).") | |
except Exception as e_imgio: | |
self.message_manager.add_error(f"Error analyzing video with imageio: {e_imgio}") | |
import traceback | |
self.message_manager.add_error(traceback.format_exc()) | |
return f"Error analyzing video: Both ffprobe (if attempted) and imageio failed." | |
finally: | |
if reader: reader.close() | |
# --- Construct Main Analysis Report --- | |
analysis_report_lines.append(f"Video Analysis ({analysis_source}):") | |
analysis_report_lines.append(f"File: {os.path.basename(video_path)}") | |
analysis_report_lines.append("------------------------------------") | |
analysis_report_lines.append(f"File Size: {file_size_display}") | |
analysis_report_lines.append(f"Duration: {duration_display}") | |
analysis_report_lines.append(f"Frame Rate: {fps_display}") | |
analysis_report_lines.append(f"Resolution: {resolution_display}") | |
analysis_report_lines.append(f"Frames: {nframes_display}") | |
analysis_report_lines.append(f"Audio: {has_audio_str}") | |
analysis_report_lines.append(f"Source: {video_path}") | |
# --- Append UPSCALE ADVISORY Conditionally --- | |
if video_width > 0 and video_height > 0: # Ensure we have dimensions | |
HD_WIDTH_THRESHOLD = 1920 | |
FOUR_K_WIDTH_THRESHOLD = 3800 | |
is_hd_or_larger = (video_width >= HD_WIDTH_THRESHOLD or video_height >= (HD_WIDTH_THRESHOLD * 9/16 * 0.95)) # Adjusted height for aspect ratios | |
is_4k_or_larger = (video_width >= FOUR_K_WIDTH_THRESHOLD or video_height >= (FOUR_K_WIDTH_THRESHOLD * 9/16 * 0.95)) | |
upscale_warnings = [] | |
if is_4k_or_larger: | |
upscale_warnings.append( | |
"This video is 4K resolution or higher. Upscaling (e.g., to 8K+) will be very " | |
"slow, memory-intensive, and may cause issues. Proceed with caution." | |
) | |
elif is_hd_or_larger: | |
upscale_warnings.append( | |
"This video is HD or larger. Upscaling (e.g., to 4K+) will be resource-intensive " | |
"and slow. Ensure your system is prepared." | |
) | |
if num_frames_value and num_frames_value > 900: # e.g., > 30 seconds at 30fps | |
upscale_warnings.append( | |
f"With {num_frames_value} frames, upscaling will also be very time-consuming." | |
) | |
if upscale_warnings: | |
analysis_report_lines.append("\n--- UPSCALE ADVISORY ---") | |
for warning_msg in upscale_warnings: | |
analysis_report_lines.append(f"⚠️ {warning_msg}") | |
# analysis_report_lines.append("------------------------") # Optional closing separator | |
return "\n".join(analysis_report_lines) | |
def _tb_has_audio_stream(self, video_path_to_check): | |
if not self.has_ffprobe: # Critical check | |
self.message_manager.add_warning( | |
"FFprobe not available. Cannot reliably determine if video has audio. " | |
"Assuming no audio for operations requiring this check. " | |
"Install FFmpeg with ffprobe for full audio support." | |
) | |
return False | |
try: | |
resolved_path = str(Path(video_path_to_check).resolve()) | |
ffprobe_cmd = [ | |
self.ffprobe_exe, "-v", "error", "-select_streams", "a:0", | |
"-show_entries", "stream=codec_type", "-of", "csv=p=0", resolved_path | |
] | |
# check=False because a non-zero return often means no audio stream, which is a valid outcome here. | |
audio_check_result = subprocess.run(ffprobe_cmd, capture_output=True, text=True, check=False, errors='ignore') | |
if audio_check_result.returncode == 0 and "audio" in audio_check_result.stdout.strip().lower(): | |
return True | |
else: | |
# Optionally log if ffprobe ran but found no audio, or if it errored for other reasons | |
# if audio_check_result.returncode != 0 and audio_check_result.stderr: | |
# self.message_manager.add_message(f"FFprobe check for audio stream in {os.path.basename(video_path_to_check)} completed. Stderr: {audio_check_result.stderr.strip()}", "DEBUG") | |
return False | |
except FileNotFoundError: | |
self.message_manager.add_warning("FFprobe executable not found during audio stream check (should have been caught by self.has_ffprobe). Assuming no audio.") | |
return False # Should ideally not happen if self.has_ffprobe is true and self.ffprobe_exe is set | |
except Exception as e: | |
self.message_manager.add_warning(f"Error checking for audio stream in {os.path.basename(video_path_to_check)}: {e}. Assuming no audio.") | |
return False | |
def tb_process_frames(self, video_path, target_fps_mode, speed_factor, use_streaming: bool, progress=gr.Progress()): | |
if video_path is None: self.message_manager.add_warning("No input video for frame processing."); return None | |
final_output_path = None | |
reader = None | |
writer = None | |
video_stream_output_path = None | |
try: | |
interpolation_factor = 1 | |
if "2x" in target_fps_mode: interpolation_factor = 2 | |
elif "4x" in target_fps_mode: interpolation_factor = 4 | |
should_interpolate = interpolation_factor > 1 | |
self.message_manager.add_message(f"Starting frame processing: FPS Mode: {target_fps_mode}, Speed: {speed_factor}x") | |
progress(0, desc="Initializing...") | |
resolved_video_path = str(Path(video_path).resolve()) | |
reader = imageio.get_reader(resolved_video_path) | |
meta_data = reader.get_meta_data() | |
original_fps = meta_data.get('fps', 30.0) | |
output_fps = original_fps * interpolation_factor | |
self.message_manager.add_message( | |
f"User selected {'Streaming (low memory)' if use_streaming else 'In-Memory (fast)'} mode for frame processing." | |
) | |
if use_streaming and speed_factor != 1.0: | |
self.message_manager.add_warning("Speed factor is not applied in Streaming Interpolation mode. Processing at 1.0x speed.") | |
speed_factor = 1.0 | |
op_suffix_parts = [] | |
if speed_factor != 1.0: op_suffix_parts.append(f"speed{speed_factor:.2f}x".replace('.',',')) | |
if should_interpolate: op_suffix_parts.append(f"RIFE{interpolation_factor}x") | |
op_suffix = "_".join(op_suffix_parts) if op_suffix_parts else "processed" | |
temp_video_suffix = f"{op_suffix}_temp_video" | |
video_stream_output_path = self._tb_generate_output_path(resolved_video_path, suffix=temp_video_suffix, target_dir=self.toolbox_video_output_dir) | |
final_muxed_output_path = video_stream_output_path.replace("_temp_video", "") | |
# --- PROCESSING BLOCK --- | |
if use_streaming: | |
# --- STREAMING (LOW MEMORY) PATH - WITH FULL 4X LOGIC --- | |
if not should_interpolate: | |
self.message_manager.add_warning("Streaming mode selected but no interpolation chosen. Writing video without changes.") | |
writer = imageio.get_writer(video_stream_output_path, fps=original_fps, quality=VIDEO_QUALITY, macro_block_size=None) | |
for frame in reader: | |
writer.append_data(frame) | |
else: | |
writer = imageio.get_writer(video_stream_output_path, fps=output_fps, quality=VIDEO_QUALITY, macro_block_size=None) | |
self.message_manager.add_message(f"Attempting to load RIFE model for {interpolation_factor}x interpolation...") | |
if not self.rife_handler._ensure_model_downloaded_and_loaded(): | |
self.message_manager.add_error("RIFE model could not be loaded. Aborting."); return None | |
n_frames = self._tb_get_video_frame_count(resolved_video_path) | |
if n_frames is None: | |
self.message_manager.add_error("Cannot determine video length for streaming progress. Aborting.") | |
return None | |
num_passes = int(math.log2(interpolation_factor)) | |
desc = f"RIFE Pass 1/{num_passes} (Streaming)" | |
self.message_manager.add_message(desc) | |
try: | |
frame1_np = next(iter(reader)) | |
except StopIteration: | |
self.message_manager.add_warning("Video has no frames."); return None | |
# This list will only be used if we are doing a 4x (2-pass) interpolation | |
intermediate_frames_for_pass2 = [frame1_np] if num_passes > 1 else None | |
# Loop for the first pass (2x) | |
for i, frame2_np in enumerate(reader, 1): | |
progress(i / (n_frames - 1), desc=desc) | |
# For 2x mode (num_passes == 1), we write directly to the file. | |
if num_passes == 1: | |
writer.append_data(frame1_np) | |
middle_frame_np = self.rife_handler.interpolate_between_frames(frame1_np, frame2_np) | |
if middle_frame_np is not None: | |
if num_passes == 1: | |
writer.append_data(middle_frame_np) | |
# For 4x mode, we collect the 2x results in a list. | |
if intermediate_frames_for_pass2 is not None: | |
intermediate_frames_for_pass2.append(middle_frame_np) | |
else: # On failure, duplicate previous frame | |
if num_passes == 1: | |
writer.append_data(frame1_np) | |
if intermediate_frames_for_pass2 is not None: | |
intermediate_frames_for_pass2.append(frame1_np) | |
# Add the "end" frame of the pair to our intermediate list for the next pass | |
if intermediate_frames_for_pass2 is not None: | |
intermediate_frames_for_pass2.append(frame2_np) | |
frame1_np = frame2_np | |
if num_passes == 1: | |
writer.append_data(frame1_np) | |
if num_passes > 1 and intermediate_frames_for_pass2: | |
self.message_manager.add_message(f"RIFE Pass 2/{num_passes}: Interpolating 2x frames (in-memory)...") | |
pass2_iterator = progress.tqdm( | |
range(len(intermediate_frames_for_pass2) - 1), | |
desc=f"RIFE Pass 2/{num_passes}" | |
) | |
# Loop through the 2x frames to create 4x frames, mirroring the IN-MEMORY logic. | |
for i in pass2_iterator: | |
p2_frame1 = intermediate_frames_for_pass2[i] | |
p2_frame2 = intermediate_frames_for_pass2[i+1] | |
# Write the "start" frame of the pair | |
writer.append_data(p2_frame1) | |
# Interpolate and write the middle frame | |
p2_middle = self.rife_handler.interpolate_between_frames(p2_frame1, p2_frame2) | |
if p2_middle is not None: | |
writer.append_data(p2_middle) | |
else: # On failure, duplicate | |
writer.append_data(p2_frame1) | |
# After the loop, write the very last frame of the entire list | |
writer.append_data(intermediate_frames_for_pass2[-1]) | |
else: | |
# --- IN-MEMORY (FAST) PATH --- | |
self.message_manager.add_message("Reading all video frames into memory...") | |
video_frames = [frame for frame in reader] | |
processed_frames = video_frames | |
if speed_factor != 1.0: | |
self.message_manager.add_message(f"Adjusting speed by {speed_factor}x (in-memory)...") | |
if speed_factor > 1.0: | |
indices = np.arange(0, len(video_frames), speed_factor).astype(int) | |
processed_frames = [video_frames[i] for i in indices if i < len(video_frames)] | |
else: | |
new_len = int(len(video_frames) / speed_factor) | |
indices = np.linspace(0, len(video_frames) - 1, new_len).astype(int) | |
processed_frames = [video_frames[i] for i in indices] | |
if should_interpolate and len(processed_frames) > 1: | |
self.message_manager.add_message(f"Loading RIFE for {interpolation_factor}x interpolation (in-memory)...") | |
if not self.rife_handler._ensure_model_downloaded_and_loaded(): | |
self.message_manager.add_error("RIFE model could not be loaded."); return None | |
num_passes = int(math.log2(interpolation_factor)) | |
for p in range(num_passes): | |
self.message_manager.add_message(f"RIFE Pass {p+1}/{num_passes} (in-memory)...") | |
interpolated_this_pass = [] | |
frame_iterator = progress.tqdm(range(len(processed_frames) - 1), desc=f"RIFE Pass {p+1}/{num_passes}") | |
for i in frame_iterator: | |
interpolated_this_pass.append(processed_frames[i]) | |
middle_frame = self.rife_handler.interpolate_between_frames(processed_frames[i], processed_frames[i+1]) | |
interpolated_this_pass.append(middle_frame if middle_frame is not None else processed_frames[i]) | |
interpolated_this_pass.append(processed_frames[-1]) | |
processed_frames = interpolated_this_pass | |
self.message_manager.add_message(f"Writing {len(processed_frames)} frames to file...") | |
writer = imageio.get_writer(video_stream_output_path, fps=output_fps, quality=VIDEO_QUALITY, macro_block_size=None) | |
for frame in progress.tqdm(processed_frames, desc="Writing frames"): | |
writer.append_data(frame) | |
# --- Universal Teardown & Muxing --- | |
if writer: writer.close() | |
if reader: reader.close() | |
writer, reader = None, None | |
final_output_path = final_muxed_output_path | |
can_process_audio = self.has_ffmpeg | |
original_video_has_audio = self._tb_has_audio_stream(resolved_video_path) if can_process_audio else False | |
if can_process_audio and original_video_has_audio: | |
self.message_manager.add_message("Original video has audio. Processing audio with FFmpeg...") | |
progress(0.9, desc="Processing audio...") | |
ffmpeg_mux_cmd = [self.ffmpeg_exe, "-y", "-loglevel", "error", "-i", video_stream_output_path] | |
audio_filters = [] | |
if speed_factor != 1.0: | |
if 0.5 <= speed_factor <= 100.0: | |
audio_filters.append(f"atempo={speed_factor:.4f}") | |
elif speed_factor < 0.5: # Needs multiple 0.5 steps | |
num_half_steps = int(np.ceil(np.log(speed_factor) / np.log(0.5))) | |
for _ in range(num_half_steps): audio_filters.append("atempo=0.5") | |
final_factor = speed_factor / (0.5**num_half_steps) | |
if abs(final_factor - 1.0) > 1e-4 and 0.5 <= final_factor <= 100.0: # Add final adjustment if needed | |
audio_filters.append(f"atempo={final_factor:.4f}") | |
elif speed_factor > 100.0: # Needs multiple 2.0 (or higher, like 100.0) steps | |
num_double_steps = int(np.ceil(np.log(speed_factor / 100.0) / np.log(2.0))) # Example for steps of 2 after 100 | |
audio_filters.append("atempo=100.0") # Max one step | |
remaining_factor = speed_factor / 100.0 | |
if abs(remaining_factor - 1.0) > 1e-4 and 0.5 <= remaining_factor <= 100.0: | |
audio_filters.append(f"atempo={remaining_factor:.4f}") | |
self.message_manager.add_message(f"Applying audio speed adjustment with atempo: {','.join(audio_filters) if audio_filters else 'None (speed_factor out of simple atempo range or 1.0)'}") | |
ffmpeg_mux_cmd.extend(["-i", resolved_video_path]) # Input for audio | |
ffmpeg_mux_cmd.extend(["-c:v", "copy"]) | |
if audio_filters: | |
ffmpeg_mux_cmd.extend(["-filter:a", ",".join(audio_filters)]) | |
# Always re-encode audio to AAC for MP4 compatibility, even if no speed change, | |
# as original audio might not be AAC. | |
ffmpeg_mux_cmd.extend(["-c:a", "aac", "-b:a", "192k"]) | |
ffmpeg_mux_cmd.extend(["-map", "0:v:0", "-map", "1:a:0?", "-shortest", final_muxed_output_path]) | |
try: | |
subprocess.run(ffmpeg_mux_cmd, check=True, capture_output=True, text=True) | |
self.message_manager.add_success(f"Video saved with processed audio: {final_muxed_output_path}") | |
except subprocess.CalledProcessError as e_mux: | |
self._tb_log_ffmpeg_error(e_mux, "audio processing/muxing") | |
if os.path.exists(final_muxed_output_path): os.remove(final_muxed_output_path) | |
os.rename(video_stream_output_path, final_muxed_output_path) | |
else: | |
if original_video_has_audio and not can_process_audio: | |
self.message_manager.add_warning("Original video has audio, but FFmpeg is not available to process it. Output will be silent.") | |
if os.path.exists(final_muxed_output_path) and final_muxed_output_path != video_stream_output_path: | |
os.remove(final_muxed_output_path) | |
os.rename(video_stream_output_path, final_muxed_output_path) | |
if os.path.exists(video_stream_output_path) and video_stream_output_path != final_muxed_output_path: | |
try: os.remove(video_stream_output_path) | |
except Exception as e_clean: self.message_manager.add_warning(f"Could not remove temp video file {video_stream_output_path}: {e_clean}") | |
progress(1.0, desc="Complete.") | |
self.message_manager.add_success(f"Frame processing complete: {final_output_path}") | |
return final_output_path | |
except Exception as e: | |
self.message_manager.add_error(f"Error during frame processing: {e}") | |
import traceback; self.message_manager.add_error(traceback.format_exc()) | |
progress(1.0, desc="Error.") | |
return None | |
finally: | |
if reader and not reader.closed: reader.close() | |
if writer and not writer.closed: writer.close() | |
if self.rife_handler: self.rife_handler.unload_model() | |
devicetorch.empty_cache(torch); gc.collect() | |
def tb_create_loop(self, video_path, loop_type, num_loops, progress=gr.Progress()): | |
if video_path is None: self.message_manager.add_warning("No input video for loop creation."); return None | |
if not self.has_ffmpeg: # FFmpeg is essential for this function's stream_loop and complex filter | |
self.message_manager.add_error("FFmpeg is required for creating video loops. This operation cannot proceed.") | |
return video_path # Return original video path | |
if loop_type == "none": self.message_manager.add_message("Loop type 'none'. No action."); return video_path | |
progress(0, desc="Initializing loop creation...") | |
resolved_video_path = str(Path(video_path).resolve()) | |
output_path = self._tb_generate_output_path( | |
resolved_video_path, | |
suffix=f"{loop_type}_{num_loops}x", | |
target_dir=self.toolbox_video_output_dir | |
) | |
self.message_manager.add_message(f"Creating {loop_type} ({num_loops}x) for {os.path.basename(resolved_video_path)}...") | |
ping_pong_unit_path = None | |
original_video_has_audio = self._tb_has_audio_stream(resolved_video_path) # Check once | |
try: | |
progress(0.2, desc=f"Preparing {loop_type} loop...") | |
if loop_type == "ping-pong": | |
ping_pong_unit_path = self._tb_generate_output_path( | |
resolved_video_path, | |
suffix="pingpong_unit_temp", | |
target_dir=self.toolbox_video_output_dir | |
) | |
# Create video-only ping-pong unit first | |
ffmpeg_pp_unit_cmd = [ | |
self.ffmpeg_exe, "-y", "-loglevel", "error", | |
"-i", resolved_video_path, | |
"-vf", "split[main][tmp];[tmp]reverse[rev];[main][rev]concat=n=2:v=1:a=0", # Video only | |
"-an", ping_pong_unit_path | |
] | |
subprocess.run(ffmpeg_pp_unit_cmd, check=True, capture_output=True, text=True) | |
self.message_manager.add_message(f"Created ping-pong unit (video-only): {ping_pong_unit_path}") | |
ffmpeg_cmd = [ | |
self.ffmpeg_exe, "-y", "-loglevel", "error", | |
"-stream_loop", str(num_loops - 1), # Loop the video unit N-1 times (total N plays) | |
"-i", ping_pong_unit_path, | |
"-c:v", "copy" | |
] | |
if original_video_has_audio: | |
self.message_manager.add_message("Original video has audio. Will loop audio for ping-pong.") | |
# Audio duration needs to match 2T * num_loops. FFmpeg aloop is complex. | |
# A simpler approach for ping-pong audio might be to create a 2T audio segment (original + reversed original) | |
# then loop that audio segment N times and mux with the N-times looped ping-pong video. | |
# Current FFmpeg command for aloop: | |
audio_loop_count_for_ffmpeg = (num_loops * 2) -1 # Total plays of original audio needed, minus one for initial play by -i | |
ffmpeg_cmd.extend([ | |
"-i", resolved_video_path, # Original video for audio source | |
"-filter_complex", f"[1:a]areverse[areva];[1:a][areva]concat=n=2:v=0:a=1[ppa];[ppa]aloop=loop={num_loops-1}:size=2147483647[a_looped]", | |
"-map", "0:v:0", "-map", "[a_looped]", | |
"-c:a", "aac", "-b:a", "192k", "-shortest" | |
]) | |
else: | |
self.message_manager.add_message("No audio in original or detection issue. Creating video-only ping-pong loop.") | |
ffmpeg_cmd.extend(["-an"]) | |
ffmpeg_cmd.append(output_path) | |
else: # Regular 'loop' | |
# Hi Gemini/Claude! | |
# The 'num_loops' parameter comes from the UI slider labeled "Number of Loops/Repeats". | |
# The user's intent is that this slider value (num_loops) represents the number of | |
# *additional* times the video should be repeated after its initial playthrough. | |
# For example, if num_loops = 1 (from slider), the video should play twice (original + 1 repeat). | |
# | |
# FFmpeg's -stream_loop option takes a value (let's call it X_ffmpeg), | |
# meaning the input is looped X_ffmpeg times *in addition* to the first play. | |
# So, X_ffmpeg should be equal to the slider value 'num_loops'. | |
ffmpeg_stream_loop_value = num_loops | |
# Ensure ffmpeg_stream_loop_value is non-negative. | |
# Given the UI slider minimum is typically 1, num_loops should always be >= 1. | |
# This check is for robustness if the input num_loops could ever be less than 0 | |
# (e.g., if UI constraints change or input comes from elsewhere). | |
if ffmpeg_stream_loop_value < 0: | |
ffmpeg_stream_loop_value = 0 # Should ideally not be hit if slider min is 1. | |
# Total plays will be the original play + ffmpeg_stream_loop_value additional plays. | |
total_plays = ffmpeg_stream_loop_value + 1 | |
self.message_manager.add_message( | |
f"Regular loop: original video + {ffmpeg_stream_loop_value} additional repeat(s). Total {total_plays} plays." | |
) | |
ffmpeg_cmd = [ | |
self.ffmpeg_exe, "-y", "-loglevel", "error", | |
"-stream_loop", str(ffmpeg_stream_loop_value), # This now uses num_loops directly | |
"-i", resolved_video_path, | |
"-c:v", "copy" | |
] | |
if original_video_has_audio: | |
self.message_manager.add_message("Original video has audio. Re-encoding to AAC for looped MP4 (if not already AAC).") | |
ffmpeg_cmd.extend(["-c:a", "aac", "-b:a", "192k", "-map", "0:v:0", "-map", "0:a:0?"]) | |
else: | |
self.message_manager.add_message("No audio in original or detection issue. Looped video will be silent.") | |
ffmpeg_cmd.extend(["-an", "-map", "0:v:0"]) | |
ffmpeg_cmd.append(output_path) | |
self.message_manager.add_message(f"Processing video {loop_type} with FFmpeg...") | |
progress(0.5, desc=f"Running FFmpeg for {loop_type}...") | |
subprocess.run(ffmpeg_cmd, check=True, capture_output=True, text=True, errors='ignore') | |
progress(1.0, desc=f"{loop_type.capitalize()} loop created successfully.") | |
self.message_manager.add_success(f"Loop creation complete: {output_path}") | |
return output_path | |
except subprocess.CalledProcessError as e_loop: | |
self._tb_log_ffmpeg_error(e_loop, f"{loop_type} creation") | |
progress(1.0, desc=f"Error creating {loop_type}.") | |
return None | |
except Exception as e: | |
self.message_manager.add_error(f"Error creating loop: {e}") | |
import traceback; self.message_manager.add_error(traceback.format_exc()) | |
progress(1.0, desc="Error creating loop.") | |
return None | |
finally: | |
if ping_pong_unit_path and os.path.exists(ping_pong_unit_path): | |
try: os.remove(ping_pong_unit_path) | |
except Exception as e_clean_pp: self.message_manager.add_warning(f"Could not remove temp ping-pong unit: {e_clean_pp}") | |
gc.collect() | |
def _tb_get_video_dimensions(self, video_path): | |
video_width, video_height = 0, 0 | |
# Prefer ffprobe if available for dimensions | |
if self.has_ffprobe: | |
try: | |
probe_cmd = [self.ffprobe_exe, "-v", "error", "-select_streams", "v:0", | |
"-show_entries", "stream=width,height", "-of", "csv=s=x:p=0", video_path] | |
result = subprocess.run(probe_cmd, capture_output=True, text=True, check=True, errors='ignore') | |
w_str, h_str = result.stdout.strip().split('x') | |
video_width, video_height = int(w_str), int(h_str) | |
if video_width > 0 and video_height > 0: return video_width, video_height | |
except Exception as e_probe_dim: | |
self.message_manager.add_warning(f"ffprobe failed to get dimensions ({e_probe_dim}), trying imageio.") | |
# Fallback to imageio | |
reader = None | |
try: | |
reader = imageio.get_reader(video_path) | |
meta = reader.get_meta_data() | |
size_imgio = meta.get('size') | |
if size_imgio and isinstance(size_imgio, tuple) and len(size_imgio) == 2: | |
video_width, video_height = int(size_imgio[0]), int(size_imgio[1]) | |
except Exception as e_meta: | |
self.message_manager.add_warning(f"Error getting video dimensions for vignette (imageio): {e_meta}. Defaulting aspect to 1/1.") | |
finally: | |
if reader: reader.close() | |
return video_width, video_height # Might be 0,0 if all failed | |
def _tb_create_vignette_filter(self, strength_percent, width, height): | |
min_angle_rad = math.pi / 3.5; max_angle_rad = math.pi / 2 | |
normalized_strength = strength_percent / 100.0 | |
angle_rad = min_angle_rad + normalized_strength * (max_angle_rad - min_angle_rad) | |
vignette_aspect_ratio_val = "1/1" | |
if width > 0 and height > 0: vignette_aspect_ratio_val = f"{width/height:.4f}" | |
return f"vignette=angle={angle_rad:.4f}:mode=forward:eval=init:aspect={vignette_aspect_ratio_val}" | |
def tb_apply_filters(self, video_path, brightness, contrast, saturation, temperature, | |
sharpen, blur, denoise, vignette, s_curve_contrast, film_grain_strength, | |
progress=gr.Progress()): | |
if video_path is None: self.message_manager.add_warning("No input video for filters."); return None | |
if not self.has_ffmpeg: # FFmpeg is essential for this function | |
self.message_manager.add_error("FFmpeg is required for applying video filters. This operation cannot proceed.") | |
return video_path | |
progress(0, desc="Initializing filter application...") | |
resolved_video_path = str(Path(video_path).resolve()) | |
output_path = self._tb_generate_output_path(resolved_video_path, "filtered", self.toolbox_video_output_dir) | |
self.message_manager.add_message(f"🎨 Applying filters to {os.path.basename(resolved_video_path)}...") | |
video_width, video_height = 0,0 | |
if vignette > 0: # Only get dimensions if vignette is used | |
video_width, video_height = self._tb_get_video_dimensions(resolved_video_path) | |
if video_width > 0 and video_height > 0: self.message_manager.add_message(f"Video dimensions for vignette: {video_width}x{video_height}", "DEBUG") | |
filters, applied_filter_descriptions = [], [] | |
# Filter definitions | |
if denoise > 0: filters.append(f"hqdn3d={denoise*0.8:.1f}:{denoise*0.6:.1f}:{denoise*0.7:.1f}:{denoise*0.5:.1f}"); applied_filter_descriptions.append(f"Denoise (hqdn3d)") | |
if temperature != 0: mid_shift = (temperature/100.0)*0.3; filters.append(f"colorbalance=rm={mid_shift:.2f}:bm={-mid_shift:.2f}"); applied_filter_descriptions.append(f"Color Temp") | |
eq_parts = []; desc_eq = [] | |
if brightness != 0: eq_parts.append(f"brightness={brightness/100.0:.2f}"); desc_eq.append(f"Brightness") | |
if contrast != 1: eq_parts.append(f"contrast={contrast:.2f}"); desc_eq.append(f"Contrast (Linear)") | |
if saturation != 1: eq_parts.append(f"saturation={saturation:.2f}"); desc_eq.append(f"Saturation") | |
if eq_parts: filters.append(f"eq={':'.join(eq_parts)}"); applied_filter_descriptions.append(" & ".join(desc_eq)) | |
if s_curve_contrast > 0: s = s_curve_contrast/100.0; y1 = 0.25-s*(0.25-0.10); y2 = 0.75+s*(0.90-0.75); filters.append(f"curves=all='0/0 0.25/{y1:.2f} 0.75/{y2:.2f} 1/1'"); applied_filter_descriptions.append(f"S-Curve Contrast") | |
if blur > 0: filters.append(f"gblur=sigma={blur*0.4:.1f}"); applied_filter_descriptions.append(f"Blur") | |
if sharpen > 0: filters.append(f"unsharp=luma_msize_x=5:luma_msize_y=5:luma_amount={sharpen*0.3:.2f}"); applied_filter_descriptions.append(f"Sharpen") | |
if film_grain_strength > 0: filters.append(f"noise=alls={film_grain_strength*0.5:.1f}:allf=t+u"); applied_filter_descriptions.append(f"Film Grain") | |
if vignette > 0: filters.append(self._tb_create_vignette_filter(vignette, video_width, video_height)); applied_filter_descriptions.append(f"Vignette") | |
# --- CORRECTED LOGIC --- | |
if applied_filter_descriptions: | |
self.message_manager.add_message("🔧 Applying FFmpeg filters: " + ", ".join(applied_filter_descriptions)) | |
else: | |
self.message_manager.add_message("ℹ️ No filters selected. Passing video through (re-encoding).") | |
progress(0.2, desc="Preparing filter command...") | |
original_video_has_audio = self._tb_has_audio_stream(resolved_video_path) | |
try: | |
ffmpeg_cmd = [ | |
self.ffmpeg_exe, "-y", "-loglevel", "error", "-i", resolved_video_path | |
] | |
# Conditionally add the video filter flag only if there are filters to apply | |
if filters: | |
ffmpeg_cmd.extend(["-vf", ",".join(filters)]) | |
# Add the rest of the encoding options | |
ffmpeg_cmd.extend([ | |
"-c:v", "libx264", "-preset", "medium", "-crf", "20", | |
"-pix_fmt", "yuv420p", | |
"-map", "0:v:0" | |
]) | |
if original_video_has_audio: | |
self.message_manager.add_message("Original video has audio. Re-encoding to AAC for filtered video.", "INFO") | |
ffmpeg_cmd.extend(["-c:a", "aac", "-b:a", "192k", "-map", "0:a:0?"]) | |
else: | |
self.message_manager.add_message("No audio in original or detection issue. Filtered video will be silent.", "INFO") | |
ffmpeg_cmd.extend(["-an"]) | |
ffmpeg_cmd.append(output_path) | |
self.message_manager.add_message("🔄 Processing with FFmpeg...") | |
progress(0.5, desc="Running FFmpeg for filters...") | |
subprocess.run(ffmpeg_cmd, check=True, capture_output=True, text=True, errors='ignore') | |
progress(1.0, desc="Filters applied successfully.") | |
self.message_manager.add_success(f"✅ Filter step complete! Output: {output_path}") | |
return output_path | |
except subprocess.CalledProcessError as e_filters: | |
self._tb_log_ffmpeg_error(e_filters, "filter application") | |
progress(1.0, desc="Error applying filters."); return None | |
except Exception as e: | |
self.message_manager.add_error(f"❌ An unexpected error occurred: {e}") | |
import traceback; self.message_manager.add_error(traceback.format_exc()) | |
progress(1.0, desc="Error applying filters."); return None | |
finally: gc.collect() | |
def tb_upscale_video(self, video_path, model_key: str, output_scale_factor_ui: float, | |
tile_size: int, enhance_face: bool, | |
denoise_strength_ui: float | None, | |
use_streaming: bool, # New parameter from UI | |
progress=gr.Progress()): | |
if video_path is None: self.message_manager.add_warning("No input video for upscaling."); return None | |
reader = None | |
writer = None | |
final_output_path = None | |
video_stream_output_path = None | |
try: | |
# --- Model Loading and Setup --- | |
if model_key not in self.esrgan_upscaler.supported_models: | |
self.message_manager.add_error(f"Upscale model key '{model_key}' not found in supported models."); return None | |
model_native_scale = self.esrgan_upscaler.supported_models[model_key].get('scale', 0) | |
tile_size_str_for_log = str(tile_size) if tile_size > 0 else "Auto" | |
face_enhance_str_for_log = "+FaceEnhance" if enhance_face else "" | |
denoise_str_for_log = "" | |
if model_key == "RealESR-general-x4v3" and denoise_strength_ui is not None: | |
denoise_str_for_log = f", DNI: {denoise_strength_ui:.2f}" | |
self.message_manager.add_message( | |
f"Preparing to load ESRGAN model '{model_key}' for {output_scale_factor_ui:.2f}x target upscale " | |
f"(Native: {model_native_scale}x, Tile: {tile_size_str_for_log}{face_enhance_str_for_log}{denoise_str_for_log})." | |
) | |
progress(0.05, desc=f"Loading ESRGAN model '{model_key}'...") | |
upsampler_instance = self.esrgan_upscaler.load_model( | |
model_key=model_key, | |
tile_size=tile_size, | |
denoise_strength=denoise_strength_ui if model_key == "RealESR-general-x4v3" else None | |
) | |
if not upsampler_instance: | |
self.message_manager.add_error(f"Could not load ESRGAN model '{model_key}'. Aborting."); return None | |
if enhance_face: | |
if not self.esrgan_upscaler._load_face_enhancer(bg_upsampler=upsampler_instance): | |
self.message_manager.add_warning("GFPGAN load failed. Proceeding without face enhancement.") | |
enhance_face = False | |
self.message_manager.add_message(f"ESRGAN model '{model_key}' loaded. Initializing process...") | |
resolved_video_path = str(Path(video_path).resolve()) | |
reader = imageio.get_reader(resolved_video_path) | |
meta_data = reader.get_meta_data() | |
original_fps = meta_data.get('fps', 30.0) | |
# --- Define output paths --- | |
temp_video_suffix_base = f"upscaled_{model_key}{'_FaceEnhance' if enhance_face else ''}" | |
if model_key == "RealESR-general-x4v3" and denoise_strength_ui is not None: | |
temp_video_suffix_base += f"_dni{denoise_strength_ui:.2f}" | |
temp_video_suffix = temp_video_suffix_base.replace(".","p") + "_temp_video" | |
video_stream_output_path = self._tb_generate_output_path(resolved_video_path, temp_video_suffix, self.toolbox_video_output_dir) | |
final_muxed_output_path = video_stream_output_path.replace("_temp_video", "") | |
self.message_manager.add_message( | |
f"User selected {'Streaming (low memory)' if use_streaming else 'In-Memory (fast)'} mode." | |
) | |
# --- PROCESSING BLOCK --- | |
if use_streaming: | |
# --- STREAMING (LOW MEMORY) PATH --- | |
self.message_manager.add_message("Processing frame-by-frame...") | |
n_frames = self._tb_get_video_frame_count(resolved_video_path) | |
if n_frames is None: | |
self.message_manager.add_error("Cannot use streaming mode because the total number of frames could not be determined. Aborting.") | |
return None | |
writer = imageio.get_writer(video_stream_output_path, fps=original_fps, quality=VIDEO_QUALITY, macro_block_size=None) | |
# Use a range-based loop and get_data() instead of iterating the reader directly | |
for i in progress.tqdm(range(n_frames), desc="Upscaling Frames (Streaming)"): | |
frame_np = reader.get_data(i) # Explicitly get frame i | |
upscaled_frame_np = self.esrgan_upscaler.upscale_frame(frame_np, model_key, float(output_scale_factor_ui), enhance_face) | |
if upscaled_frame_np is not None: | |
writer.append_data(upscaled_frame_np) | |
else: | |
self.message_manager.add_error(f"Failed to upscale frame {i}. Skipping.") | |
if "out of memory" in self.message_manager.get_recent_errors_as_str(count=1).lower(): | |
self.message_manager.add_error("CUDA OOM. Aborting video upscale."); return None | |
# We can be more aggressive with GC in streaming mode | |
if (i + 1) % 10 == 0: | |
gc.collect() | |
writer.close() | |
writer = None | |
else: | |
# --- IN-MEMORY (FAST) PATH --- | |
self.message_manager.add_message("Processing all frames in memory...") | |
all_frames = [frame for frame in progress.tqdm(reader, desc="Reading all frames")] | |
upscaled_frames = [] | |
frame_iterator = progress.tqdm(all_frames, desc="Upscaling Frames (In-Memory)") | |
for frame_np in frame_iterator: | |
upscaled_frame_np = self.esrgan_upscaler.upscale_frame(frame_np, model_key, float(output_scale_factor_ui), enhance_face) | |
if upscaled_frame_np is not None: | |
upscaled_frames.append(upscaled_frame_np) | |
else: | |
if "out of memory" in self.message_manager.get_recent_errors_as_str(count=1).lower(): | |
self.message_manager.add_error("CUDA OOM. Aborting video upscale."); return None | |
self.message_manager.add_message("Writing upscaled video file...") | |
imageio.mimwrite(video_stream_output_path, upscaled_frames, fps=original_fps, quality=VIDEO_QUALITY, macro_block_size=None) | |
# --- Teardown and Audio Muxing --- | |
reader.close() | |
reader = None | |
self.message_manager.add_message(f"Upscaled video stream saved to: {video_stream_output_path}") | |
progress(0.85, desc="Upscaled video stream saved.") | |
final_output_path = final_muxed_output_path | |
can_process_audio = self.has_ffmpeg | |
original_video_has_audio = self._tb_has_audio_stream(resolved_video_path) if can_process_audio else False | |
if can_process_audio and original_video_has_audio: | |
progress(0.90, desc="Muxing audio...") | |
self.message_manager.add_message("Original video has audio. Muxing audio with FFmpeg...") | |
ffmpeg_mux_cmd = [ | |
self.ffmpeg_exe, "-y", "-loglevel", "error", | |
"-i", video_stream_output_path, "-i", resolved_video_path, | |
"-c:v", "copy", "-c:a", "aac", "-b:a", "192k", | |
"-map", "0:v:0", "-map", "1:a:0?", "-shortest", final_muxed_output_path | |
] | |
try: | |
subprocess.run(ffmpeg_mux_cmd, check=True, capture_output=True, text=True) | |
self.message_manager.add_success(f"Upscaled video saved with audio: {final_muxed_output_path}") | |
except subprocess.CalledProcessError as e_mux: | |
self._tb_log_ffmpeg_error(e_mux, "audio muxing for upscaled video") | |
if os.path.exists(final_muxed_output_path): os.remove(final_muxed_output_path) | |
os.rename(video_stream_output_path, final_muxed_output_path) | |
else: | |
if original_video_has_audio and not can_process_audio: | |
self.message_manager.add_warning("Original video has audio, but FFmpeg is not available to process it. Upscaled output will be silent.") | |
if os.path.exists(final_muxed_output_path) and final_muxed_output_path != video_stream_output_path: | |
os.remove(final_muxed_output_path) | |
os.rename(video_stream_output_path, final_muxed_output_path) | |
progress(1.0, desc="Upscaling complete.") | |
self.message_manager.add_success(f"Video upscaling complete: {final_output_path}") | |
return final_output_path | |
except Exception as e: | |
self.message_manager.add_error(f"Error during video upscaling: {e}") | |
import traceback; self.message_manager.add_error(traceback.format_exc()) | |
progress(1.0, desc="Error during upscaling."); return None | |
finally: | |
if reader and not reader.closed: reader.close() | |
if writer and not writer.closed: writer.close() | |
if video_stream_output_path and os.path.exists(video_stream_output_path) and final_output_path and video_stream_output_path != final_output_path: | |
try: os.remove(video_stream_output_path) | |
except Exception as e_clean: self.message_manager.add_warning(f"Could not remove temp upscaled video: {e_clean}") | |
if self.esrgan_upscaler: | |
self.esrgan_upscaler.unload_model(model_key) | |
if enhance_face: | |
self.esrgan_upscaler._unload_face_enhancer() | |
devicetorch.empty_cache(torch); gc.collect() | |
def tb_open_output_folder(self): | |
folder_path = os.path.abspath(self.postprocessed_output_root_dir) | |
try: | |
os.makedirs(folder_path, exist_ok=True) | |
if sys.platform == 'win32': subprocess.run(['explorer', folder_path]) | |
elif sys.platform == 'darwin': subprocess.run(['open', folder_path]) | |
else: subprocess.run(['xdg-open', folder_path]) | |
self.message_manager.add_success(f"Opened postprocessed output folder: {folder_path}") | |
except Exception as e: | |
self.message_manager.add_error(f"Error opening folder {folder_path}: {e}") | |
def _tb_clean_directory(self, dir_path, dir_description): | |
""" | |
Helper to clean a single temp directory and return a single, formatted status line. | |
""" | |
LABEL_WIDTH = 32 # Width for the description label for alignment | |
status_icon = "✅" | |
status_text = "" | |
# Make path relative for cleaner logging | |
try: | |
display_path = os.path.relpath(dir_path, self.project_root) if dir_path else "N/A" | |
except (ValueError, TypeError): | |
display_path = str(dir_path) # Fallback if path is weird | |
if not dir_path or not os.path.exists(dir_path): | |
status_icon = "ℹ️" | |
status_text = "Path not found or not set." | |
return f"[{status_icon}] {dir_description:<{LABEL_WIDTH}} : {status_text}" | |
try: | |
items = os.listdir(dir_path) | |
file_count = sum(1 for item in items if os.path.isfile(os.path.join(dir_path, item))) | |
dir_count = sum(1 for item in items if os.path.isdir(os.path.join(dir_path, item))) | |
if file_count == 0 and dir_count == 0: | |
status_text = f"Already empty at '{display_path}'" | |
else: | |
shutil.rmtree(dir_path) | |
# --- Dynamic String Building --- | |
summary_parts = [] | |
if file_count > 0: | |
summary_parts.append(f"{file_count} file{'s' if file_count != 1 else ''}") | |
if dir_count > 0: | |
summary_parts.append(f"{dir_count} folder{'s' if dir_count != 1 else ''}") | |
status_text = f"Cleaned ({' and '.join(summary_parts)}) from '{display_path}'" | |
os.makedirs(dir_path, exist_ok=True) | |
except Exception as e: | |
status_icon = "❌" | |
status_text = f"ERROR cleaning '{display_path}': {e}" | |
return f"[{status_icon}] {dir_description:<{LABEL_WIDTH}} : {status_text}" | |
def tb_clear_temporary_files(self): | |
""" | |
Clears all temporary file locations and returns a formatted summary string. | |
""" | |
# 1. Clean Post-processing Temp Folder | |
postproc_temp_dir = self._base_temp_output_dir | |
postproc_summary_line = self._tb_clean_directory(postproc_temp_dir, "Post-processing temp folder") | |
# 2. Clean Gradio Temp Folder | |
gradio_temp_dir = self.settings.get("gradio_temp_dir") | |
gradio_summary_line = self._tb_clean_directory(gradio_temp_dir, "Gradio temp folder") | |
# Join the individual lines into a single string for printing | |
return f"{postproc_summary_line}\n{gradio_summary_line}" | |