|
""" |
|
Video pipeline class for FramePack Studio. |
|
This pipeline handles the "Video" model type. |
|
""" |
|
|
|
import os |
|
import time |
|
import json |
|
import numpy as np |
|
from PIL import Image |
|
from PIL.PngImagePlugin import PngInfo |
|
from diffusers_helper.utils import resize_and_center_crop |
|
from diffusers_helper.bucket_tools import find_nearest_bucket |
|
from .base_pipeline import BasePipeline |
|
|
|
class VideoPipeline(BasePipeline):
    """Pipeline for the "Video" generation type.

    Provides the four job hooks defined below: parameter preparation,
    parameter validation, input preprocessing and result handling.
    """

    def prepare_parameters(self, job_params):
        """
        Prepare parameters for the Video generation job.

        Args:
            job_params: Dictionary of job parameters

        Returns:
            A copy of ``job_params`` (made with ``.copy()``) whose
            ``'model_type'`` entry is set to ``"Video"``.
        """
        processed_params = job_params.copy()

        # This pipeline only serves the Video model, so the type is pinned
        # regardless of what the caller supplied.
        processed_params['model_type'] = "Video"

        return processed_params

    def validate_parameters(self, job_params):
        """
        Validate parameters for the Video generation job.

        Checks run in a fixed order and the first failure is reported:
        required keys, positive video length, positive step count, presence
        of the input video, and the type of the optional
        ``combine_with_source`` flag.

        Args:
            job_params: Dictionary of job parameters

        Returns:
            Tuple of (is_valid, error_message). ``error_message`` is ``None``
            when the parameters are valid.
        """
        required_params = ['prompt_text', 'seed', 'total_second_length', 'steps']
        for param in required_params:
            if param not in job_params:
                return False, f"Missing required parameter: {param}"

        # Both values must be strictly positive. Presence was established
        # above, so direct indexing is safe here.
        positive_checks = (
            ('total_second_length', "Video length must be greater than 0"),
            ('steps', "Steps must be greater than 0"),
        )
        for param, not_positive_message in positive_checks:
            value = job_params[param]
            try:
                if value <= 0:
                    return False, not_positive_message
            except (TypeError, ValueError):
                # A present-but-unorderable value (None, a string, a
                # container, ...) cannot be compared with 0. Report it as a
                # validation failure instead of letting the exception escape
                # from the validator.
                return False, (
                    f"Parameter {param} must be a number, "
                    f"got {type(value).__name__}"
                )

        # The input video is delivered under the 'input_image' key.
        if not job_params.get('input_image'):
            return False, "Input video is required for Video model"

        # combine_with_source is optional, but when given it must be a bool.
        combine_with_source = job_params.get('combine_with_source')
        if combine_with_source is not None and not isinstance(combine_with_source, bool):
            return False, "combine_with_source must be a boolean value"

        return True, None

    def preprocess_inputs(self, job_params):
        """
        Preprocess input video for the Video generation type.

        Args:
            job_params: Dictionary of job parameters

        Returns:
            Processed inputs dictionary with these entries:

            * ``'input_video'``: the value found under ``'input_image'``.
            * ``'input_files_dir'``: copied with ``.get`` (``None`` if absent).
            * ``'combine_with_source'`` and ``'num_cleaned_frames'``: copied
              only when the key is present in ``job_params``; each copied
              value is also printed.
            * ``'height'`` and ``'width'``: the pair returned by
              ``find_nearest_bucket``.

        Raises:
            ValueError: If ``'input_image'`` is missing or falsy.
        """
        processed_inputs = {}

        # The input video is delivered under the 'input_image' key.
        input_video = job_params.get('input_image')
        if not input_video:
            raise ValueError("Input video is required for Video model")

        processed_inputs['input_video'] = input_video

        # Directory of input files, passed through unchanged.
        processed_inputs['input_files_dir'] = job_params.get('input_files_dir')

        # Optional settings are forwarded only when the caller provided them.
        if 'combine_with_source' in job_params:
            processed_inputs['combine_with_source'] = job_params.get('combine_with_source')
            print(f"Video pipeline: combine_with_source = {processed_inputs['combine_with_source']}")

        if 'num_cleaned_frames' in job_params:
            processed_inputs['num_cleaned_frames'] = job_params.get('num_cleaned_frames')
            print(f"Video pipeline: num_cleaned_frames = {processed_inputs['num_cleaned_frames']}")

        # Requested resolution; each side defaults to 640 when not supplied.
        resolution_w = job_params.get('resolutionW', 640)
        resolution_h = job_params.get('resolutionH', 640)

        # The third argument is the mean of the two requested sides.
        # NOTE(review): presumably this selects the bucket resolution -
        # confirm against find_nearest_bucket.
        height, width = find_nearest_bucket(
            resolution_h, resolution_w, (resolution_w + resolution_h) / 2
        )

        processed_inputs['height'] = height
        processed_inputs['width'] = width

        return processed_inputs

    def handle_results(self, job_params, result):
        """
        Handle the results of the Video generation.

        Args:
            job_params: The job parameters (not used by this pipeline)
            result: The generation result

        Returns:
            ``result``, returned unchanged.
        """
        return result
|
|
|
|
|
|