"""
Original with Endframe pipeline class for FramePack Studio.
This pipeline handles the "Original with Endframe" model type.
"""

import numpy as np

from diffusers_helper.utils import resize_and_center_crop
from diffusers_helper.bucket_tools import find_nearest_bucket
from .base_pipeline import BasePipeline

class OriginalWithEndframePipeline(BasePipeline):
    """Pipeline for Original with Endframe generation type."""
    
    def prepare_parameters(self, job_params):
        """
        Prepare parameters for the Original with Endframe generation job.
        
        Args:
            job_params: Dictionary of job parameters
            
        Returns:
            Processed parameters dictionary
        """
        processed_params = job_params.copy()
        
        # Ensure we have the correct model type
        processed_params['model_type'] = "Original with Endframe"
        
        return processed_params
    
    def validate_parameters(self, job_params):
        """
        Validate parameters for the Original with Endframe generation job.
        
        Args:
            job_params: Dictionary of job parameters
            
        Returns:
            Tuple of (is_valid, error_message)
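        
        Example (illustrative; assumes an already-constructed pipeline instance):
            pipeline.validate_parameters({'prompt_text': 'a cat', 'seed': 1})
            # -> (False, 'Missing required parameter: total_second_length')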
        """
        # Check for required parameters
        required_params = ['prompt_text', 'seed', 'total_second_length', 'steps']
        for param in required_params:
            if param not in job_params:
                return False, f"Missing required parameter: {param}"
        
        # Validate numeric parameters
        if job_params.get('total_second_length', 0) <= 0:
            return False, "Video length must be greater than 0"
        
        if job_params.get('steps', 0) <= 0:
            return False, "Steps must be greater than 0"
        
        # Validate end frame parameters
        end_frame_strength = job_params.get('end_frame_strength', 0)
        if not 0 <= end_frame_strength <= 1:
            return False, "End frame strength must be between 0 and 1"
        
        return True, None
    
    def preprocess_inputs(self, job_params):
        """
        Preprocess input images for the Original with Endframe generation type.
        
        Args:
            job_params: Dictionary of job parameters
            
        Returns:
            Processed inputs dictionary
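        
        The returned dictionary looks like (illustrative; height/width are
        chosen by find_nearest_bucket):
            {'input_image': <H x W x 3 uint8 array>,
             'height': int, 'width': int,
             'end_frame_image': <H x W x 3 uint8 array>}  # only when provided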
        """
        processed_inputs = {}
        
        # Process input image if provided
        input_image = job_params.get('input_image')
        if input_image is not None:
            # Get resolution parameters
            resolutionW = job_params.get('resolutionW', 640)
            resolutionH = job_params.get('resolutionH', 640)
            
            # Find nearest bucket size
            if job_params.get('has_input_image', True):
                # If we have an input image, use its dimensions to find the nearest bucket
                H, W, _ = input_image.shape
                height, width = find_nearest_bucket(H, W, resolution=resolutionW)
            else:
                # Otherwise, use the provided resolution parameters
                height, width = find_nearest_bucket(resolutionH, resolutionW, resolution=(resolutionW + resolutionH) / 2)
            
            # Resize and center crop the input image
            input_image_np = resize_and_center_crop(input_image, target_width=width, target_height=height)
            
            # Store the processed image and dimensions
            processed_inputs['input_image'] = input_image_np
            processed_inputs['height'] = height
            processed_inputs['width'] = width
        else:
            # If no input image, create a blank image based on latent_type
            resolutionW = job_params.get('resolutionW', 640)
            resolutionH = job_params.get('resolutionH', 640)
            height, width = find_nearest_bucket(resolutionH, resolutionW, resolution=(resolutionW + resolutionH) / 2)
            
            latent_type = job_params.get('latent_type', 'Black')
            if latent_type == "White":
                # Create a white image
                input_image_np = np.ones((height, width, 3), dtype=np.uint8) * 255
            elif latent_type == "Noise":
                # Create a noise image
                input_image_np = np.random.randint(0, 256, (height, width, 3), dtype=np.uint8)
            elif latent_type == "Green Screen":
                # Create a green screen image with standard chroma key green (0, 177, 64)
                input_image_np = np.zeros((height, width, 3), dtype=np.uint8)
                input_image_np[:, :, 1] = 177  # Green channel
                input_image_np[:, :, 2] = 64   # Blue channel
                # Red channel remains 0
            else:
                # Default: a black image (covers "Black" and any unrecognized latent_type)
                input_image_np = np.zeros((height, width, 3), dtype=np.uint8)
            
            # Store the processed image and dimensions
            processed_inputs['input_image'] = input_image_np
            processed_inputs['height'] = height
            processed_inputs['width'] = width
        
        # Process end frame image if provided
        end_frame_image = job_params.get('end_frame_image')
        if end_frame_image is not None:
            # Use the same dimensions as the input image
            height = processed_inputs['height']
            width = processed_inputs['width']
            
            # Resize and center crop the end frame image
            end_frame_np = resize_and_center_crop(end_frame_image, target_width=width, target_height=height)
            
            # Store the processed end frame image
            processed_inputs['end_frame_image'] = end_frame_np
        
        return processed_inputs
    
    def handle_results(self, job_params, result):
        """
        Handle the results of the Original with Endframe generation.
        
        Args:
            job_params: The job parameters
            result: The generation result
            
        Returns:
            Processed result
        """
        # Original with Endframe generation needs no post-processing,
        # so the result is returned unchanged.
        return result
    
    # Using the centralized create_metadata method from BasePipeline
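

# Illustrative usage sketch. Assumptions (not part of this module): the
# pipeline is normally constructed by the studio's pipeline factory, and
# `settings`, `first_frame_np`, and `last_frame_np` are hypothetical names;
# the job dict keys below mirror the checks in validate_parameters().
#
#   pipeline = OriginalWithEndframePipeline(settings)
#   params = pipeline.prepare_parameters({
#       'prompt_text': 'a slow pan across a mountain lake',
#       'seed': 31337,
#       'total_second_length': 5,
#       'steps': 25,
#       'end_frame_strength': 1.0,
#       'resolutionW': 640,
#       'resolutionH': 640,
#       'input_image': first_frame_np,      # H x W x 3 uint8 array, or None
#       'end_frame_image': last_frame_np,   # H x W x 3 uint8 array, or None
#   })
#   ok, error = pipeline.validate_parameters(params)
#   if ok:
#       inputs = pipeline.preprocess_inputs(params)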