""" Original with Endframe pipeline class for FramePack Studio. This pipeline handles the "Original with Endframe" model type. """ import os import time import json import numpy as np from PIL import Image from PIL.PngImagePlugin import PngInfo from diffusers_helper.utils import resize_and_center_crop from diffusers_helper.bucket_tools import find_nearest_bucket from .base_pipeline import BasePipeline class OriginalWithEndframePipeline(BasePipeline): """Pipeline for Original with Endframe generation type.""" def prepare_parameters(self, job_params): """ Prepare parameters for the Original with Endframe generation job. Args: job_params: Dictionary of job parameters Returns: Processed parameters dictionary """ processed_params = job_params.copy() # Ensure we have the correct model type processed_params['model_type'] = "Original with Endframe" return processed_params def validate_parameters(self, job_params): """ Validate parameters for the Original with Endframe generation job. Args: job_params: Dictionary of job parameters Returns: Tuple of (is_valid, error_message) """ # Check for required parameters required_params = ['prompt_text', 'seed', 'total_second_length', 'steps'] for param in required_params: if param not in job_params: return False, f"Missing required parameter: {param}" # Validate numeric parameters if job_params.get('total_second_length', 0) <= 0: return False, "Video length must be greater than 0" if job_params.get('steps', 0) <= 0: return False, "Steps must be greater than 0" # Validate end frame parameters if job_params.get('end_frame_strength', 0) < 0 or job_params.get('end_frame_strength', 0) > 1: return False, "End frame strength must be between 0 and 1" return True, None def preprocess_inputs(self, job_params): """ Preprocess input images for the Original with Endframe generation type. Args: job_params: Dictionary of job parameters Returns: Processed inputs dictionary """ processed_inputs = {} # Process input image if provided input_image = job_params.get('input_image') if input_image is not None: # Get resolution parameters resolutionW = job_params.get('resolutionW', 640) resolutionH = job_params.get('resolutionH', 640) # Find nearest bucket size if job_params.get('has_input_image', True): # If we have an input image, use its dimensions to find the nearest bucket H, W, _ = input_image.shape height, width = find_nearest_bucket(H, W, resolution=resolutionW) else: # Otherwise, use the provided resolution parameters height, width = find_nearest_bucket(resolutionH, resolutionW, (resolutionW+resolutionH)/2) # Resize and center crop the input image input_image_np = resize_and_center_crop(input_image, target_width=width, target_height=height) # Store the processed image and dimensions processed_inputs['input_image'] = input_image_np processed_inputs['height'] = height processed_inputs['width'] = width else: # If no input image, create a blank image based on latent_type resolutionW = job_params.get('resolutionW', 640) resolutionH = job_params.get('resolutionH', 640) height, width = find_nearest_bucket(resolutionH, resolutionW, (resolutionW+resolutionH)/2) latent_type = job_params.get('latent_type', 'Black') if latent_type == "White": # Create a white image input_image_np = np.ones((height, width, 3), dtype=np.uint8) * 255 elif latent_type == "Noise": # Create a noise image input_image_np = np.random.randint(0, 256, (height, width, 3), dtype=np.uint8) elif latent_type == "Green Screen": # Create a green screen image with standard chroma key green (0, 177, 64) input_image_np = np.zeros((height, width, 3), dtype=np.uint8) input_image_np[:, :, 1] = 177 # Green channel input_image_np[:, :, 2] = 64 # Blue channel # Red channel remains 0 else: # Default to "Black" or any other value # Create a black image input_image_np = np.zeros((height, width, 3), dtype=np.uint8) # Store the processed image and dimensions processed_inputs['input_image'] = input_image_np processed_inputs['height'] = height processed_inputs['width'] = width # Process end frame image if provided end_frame_image = job_params.get('end_frame_image') if end_frame_image is not None: # Use the same dimensions as the input image height = processed_inputs['height'] width = processed_inputs['width'] # Resize and center crop the end frame image end_frame_np = resize_and_center_crop(end_frame_image, target_width=width, target_height=height) # Store the processed end frame image processed_inputs['end_frame_image'] = end_frame_np return processed_inputs def handle_results(self, job_params, result): """ Handle the results of the Original with Endframe generation. Args: job_params: The job parameters result: The generation result Returns: Processed result """ # For Original with Endframe generation, we just return the result as-is return result # Using the centralized create_metadata method from BasePipeline