File size: 5,289 Bytes
84669a3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
"""
Original pipeline class for FramePack Studio.
This pipeline handles the "Original" model type.
"""

import os
import time
import json
import numpy as np
from PIL import Image
from PIL.PngImagePlugin import PngInfo
from diffusers_helper.utils import resize_and_center_crop
from diffusers_helper.bucket_tools import find_nearest_bucket
from .base_pipeline import BasePipeline

class OriginalPipeline(BasePipeline):
    """Pipeline for Original generation type."""
    
    def prepare_parameters(self, job_params):
        """
        Prepare parameters for the Original generation job.
        
        Args:
            job_params: Dictionary of job parameters
            
        Returns:
            Processed parameters dictionary
        """
        processed_params = job_params.copy()
        
        # Ensure we have the correct model type
        processed_params['model_type'] = "Original"
        
        return processed_params
    
    def validate_parameters(self, job_params):
        """
        Validate parameters for the Original generation job.
        
        Args:
            job_params: Dictionary of job parameters
            
        Returns:
            Tuple of (is_valid, error_message)
        """
        # Check for required parameters
        required_params = ['prompt_text', 'seed', 'total_second_length', 'steps']
        for param in required_params:
            if param not in job_params:
                return False, f"Missing required parameter: {param}"
        
        # Validate numeric parameters
        if job_params.get('total_second_length', 0) <= 0:
            return False, "Video length must be greater than 0"
        
        if job_params.get('steps', 0) <= 0:
            return False, "Steps must be greater than 0"
        
        return True, None
    
    def preprocess_inputs(self, job_params):
        """
        Preprocess input images for the Original generation type.
        
        Args:
            job_params: Dictionary of job parameters
            
        Returns:
            Processed inputs dictionary
        """
        processed_inputs = {}
        
        # Process input image if provided
        input_image = job_params.get('input_image')
        if input_image is not None:
            # Get resolution parameters
            resolutionW = job_params.get('resolutionW', 640)
            resolutionH = job_params.get('resolutionH', 640)
            
            # Find nearest bucket size
            if job_params.get('has_input_image', True):
                # If we have an input image, use its dimensions to find the nearest bucket
                H, W, _ = input_image.shape
                height, width = find_nearest_bucket(H, W, resolution=resolutionW)
            else:
                # Otherwise, use the provided resolution parameters
                height, width = find_nearest_bucket(resolutionH, resolutionW, (resolutionW+resolutionH)/2)
            
            # Resize and center crop the input image
            input_image_np = resize_and_center_crop(input_image, target_width=width, target_height=height)
            
            # Store the processed image and dimensions
            processed_inputs['input_image'] = input_image_np
            processed_inputs['height'] = height
            processed_inputs['width'] = width
        else:
            # If no input image, create a blank image based on latent_type
            resolutionW = job_params.get('resolutionW', 640)
            resolutionH = job_params.get('resolutionH', 640)
            height, width = find_nearest_bucket(resolutionH, resolutionW, (resolutionW+resolutionH)/2)
            
            latent_type = job_params.get('latent_type', 'Black')
            if latent_type == "White":
                # Create a white image
                input_image_np = np.ones((height, width, 3), dtype=np.uint8) * 255
            elif latent_type == "Noise":
                # Create a noise image
                input_image_np = np.random.randint(0, 256, (height, width, 3), dtype=np.uint8)
            elif latent_type == "Green Screen":
                # Create a green screen image with standard chroma key green (0, 177, 64)
                input_image_np = np.zeros((height, width, 3), dtype=np.uint8)
                input_image_np[:, :, 1] = 177  # Green channel
                input_image_np[:, :, 2] = 64   # Blue channel
                # Red channel remains 0
            else:  # Default to "Black" or any other value
                # Create a black image
                input_image_np = np.zeros((height, width, 3), dtype=np.uint8)
            
            # Store the processed image and dimensions
            processed_inputs['input_image'] = input_image_np
            processed_inputs['height'] = height
            processed_inputs['width'] = width
        
        return processed_inputs
    
    def handle_results(self, job_params, result):
        """
        Handle the results of the Original generation.
        
        Args:
            job_params: The job parameters
            result: The generation result
            
        Returns:
            Processed result
        """
        # For Original generation, we just return the result as-is
        return result