FPS-Studio / modules /pipelines /video_pipeline.py
rahul7star's picture
Migrated from GitHub
05fcd0f verified
"""
Video pipeline class for FramePack Studio.
This pipeline handles the "Video" model type.
"""
import os
import time
import json
import numpy as np
from PIL import Image
from PIL.PngImagePlugin import PngInfo
from diffusers_helper.utils import resize_and_center_crop
from diffusers_helper.bucket_tools import find_nearest_bucket
from .base_pipeline import BasePipeline
class VideoPipeline(BasePipeline):
"""Pipeline for Video generation type."""
def prepare_parameters(self, job_params):
"""
Prepare parameters for the Video generation job.
Args:
job_params: Dictionary of job parameters
Returns:
Processed parameters dictionary
"""
processed_params = job_params.copy()
# Ensure we have the correct model type
processed_params['model_type'] = "Video"
return processed_params
def validate_parameters(self, job_params):
"""
Validate parameters for the Video generation job.
Args:
job_params: Dictionary of job parameters
Returns:
Tuple of (is_valid, error_message)
"""
# Check for required parameters
required_params = ['prompt_text', 'seed', 'total_second_length', 'steps']
for param in required_params:
if param not in job_params:
return False, f"Missing required parameter: {param}"
# Validate numeric parameters
if job_params.get('total_second_length', 0) <= 0:
return False, "Video length must be greater than 0"
if job_params.get('steps', 0) <= 0:
return False, "Steps must be greater than 0"
# Check for input video (stored in input_image for Video model)
if not job_params.get('input_image'):
return False, "Input video is required for Video model"
# Check if combine_with_source is provided (optional)
combine_with_source = job_params.get('combine_with_source')
if combine_with_source is not None and not isinstance(combine_with_source, bool):
return False, "combine_with_source must be a boolean value"
return True, None
def preprocess_inputs(self, job_params):
"""
Preprocess input video for the Video generation type.
Args:
job_params: Dictionary of job parameters
Returns:
Processed inputs dictionary
"""
processed_inputs = {}
# Get the input video (stored in input_image for Video model)
input_video = job_params.get('input_image')
if not input_video:
raise ValueError("Input video is required for Video model")
# Store the input video
processed_inputs['input_video'] = input_video
# Note: The following code will be executed in the worker function:
# 1. The worker will call video_encode on the generator to get video_latents and input_video_pixels
# 2. Then it will store these values for later use:
# input_video_pixels = input_video_pixels.cpu()
# video_latents = video_latents.cpu()
#
# 3. If the generator has the set_full_video_latents method, it will store the video latents:
# if hasattr(current_generator, 'set_full_video_latents'):
# current_generator.set_full_video_latents(video_latents.clone())
# print(f"Stored full input video latents in VideoModelGenerator. Shape: {video_latents.shape}")
#
# 4. For the Video model, history_latents is initialized with the video_latents:
# history_latents = video_latents
# print(f"Initialized history_latents with video context. Shape: {history_latents.shape}")
processed_inputs['input_files_dir'] = job_params.get('input_files_dir')
# Pass through the combine_with_source parameter if it exists
if 'combine_with_source' in job_params:
processed_inputs['combine_with_source'] = job_params.get('combine_with_source')
print(f"Video pipeline: combine_with_source = {processed_inputs['combine_with_source']}")
# Pass through the num_cleaned_frames parameter if it exists
if 'num_cleaned_frames' in job_params:
processed_inputs['num_cleaned_frames'] = job_params.get('num_cleaned_frames')
print(f"Video pipeline: num_cleaned_frames = {processed_inputs['num_cleaned_frames']}")
# Get resolution parameters
resolutionW = job_params.get('resolutionW', 640)
resolutionH = job_params.get('resolutionH', 640)
# Find nearest bucket size
height, width = find_nearest_bucket(resolutionH, resolutionW, (resolutionW+resolutionH)/2)
# Store the dimensions
processed_inputs['height'] = height
processed_inputs['width'] = width
return processed_inputs
def handle_results(self, job_params, result):
"""
Handle the results of the Video generation.
Args:
job_params: The job parameters
result: The generation result
Returns:
Processed result
"""
# For Video generation, we just return the result as-is
return result
# Using the centralized create_metadata method from BasePipeline