File size: 14,455 Bytes
05fcd0f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
"""
Metadata utilities for FramePack Studio.
This module provides functions for generating and saving metadata.
"""

import os
import json
import time
import traceback # Moved to top
import numpy as np # Added
from PIL import Image, ImageDraw, ImageFont
from PIL.PngImagePlugin import PngInfo

from modules.version import APP_VERSION

def get_placeholder_color(model_type):
    """
    Get the placeholder image color for a specific model type.
    
    Args:
        model_type: The model type string
        
    Returns:
        RGB tuple for the placeholder image color
    """
    # Define color mapping for different model types
    color_map = {
        "Original": (0, 0, 0),           # Black
        "F1": (0, 0, 128),               # Blue
        "Video": (0, 128, 0),            # Green
        "XY Plot": (128, 128, 0),        # Yellow
        "F1 with Endframe": (0, 128, 128),  # Teal
        "Original with Endframe": (128, 0, 128),  # Purple
    }
    
    # Return the color for the model type, or black as default
    return color_map.get(model_type, (0, 0, 0))

# Function to save the starting image with comprehensive metadata
def save_job_start_image(job_params, job_id, settings):
    """
    Saves the job's starting input image to the output directory with comprehensive metadata.
    This is intended to be called early in the job processing and is the ONLY place metadata should be saved.
    """
    # Get output directory from settings or job_params
    output_dir_path = job_params.get("output_dir") or settings.get("output_dir")
    metadata_dir_path = job_params.get("metadata_dir") or settings.get("metadata_dir")
    
    if not output_dir_path:
        print(f"[JOB_START_IMG_ERROR] No output directory found in job_params or settings")
        return False
        
    # Ensure directories exist
    os.makedirs(output_dir_path, exist_ok=True)
    os.makedirs(metadata_dir_path, exist_ok=True)

    actual_start_image_target_path = os.path.join(output_dir_path, f'{job_id}.png')
    actual_input_image_np = job_params.get('input_image')

    # Create comprehensive metadata dictionary
    metadata_dict = create_metadata(job_params, job_id, settings)
    
    # Save JSON metadata with the same job_id
    json_metadata_path = os.path.join(metadata_dir_path, f'{job_id}.json')
 
    try:
        with open(json_metadata_path, 'w') as f:
            import json
            json.dump(metadata_dict, f, indent=2)
    except Exception as e:
        traceback.print_exc()

    # Save the input image if it's a numpy array
    if actual_input_image_np is not None and isinstance(actual_input_image_np, np.ndarray):
        try:
            # Create PNG metadata
            png_metadata = PngInfo()
            png_metadata.add_text("prompt", job_params.get('prompt_text', ''))
            png_metadata.add_text("seed", str(job_params.get('seed', 0)))
            png_metadata.add_text("model_type", job_params.get('model_type', "Unknown"))
            
            # Add more metadata fields
            for key, value in metadata_dict.items():
                if isinstance(value, (str, int, float, bool)) or value is None:
                    png_metadata.add_text(key, str(value))

            # Convert image if needed
            image_to_save_np = actual_input_image_np
            if actual_input_image_np.dtype != np.uint8:
                if actual_input_image_np.max() <= 1.0 and actual_input_image_np.min() >= -1.0 and actual_input_image_np.dtype in [np.float32, np.float64]:
                     image_to_save_np = ((actual_input_image_np + 1.0) / 2.0 * 255.0).clip(0, 255).astype(np.uint8)
                elif actual_input_image_np.max() <= 1.0 and actual_input_image_np.min() >= 0.0 and actual_input_image_np.dtype in [np.float32, np.float64]:
                     image_to_save_np = (actual_input_image_np * 255.0).clip(0,255).astype(np.uint8)
                else:
                     image_to_save_np = actual_input_image_np.clip(0, 255).astype(np.uint8)
            # Save the image with metadata
            start_image_pil = Image.fromarray(image_to_save_np)
            start_image_pil.save(actual_start_image_target_path, pnginfo=png_metadata)
            return True # Indicate success
        except Exception as e:
            traceback.print_exc()
    return False # Indicate failure or inability to save

def create_metadata(job_params, job_id, settings, save_placeholder=False):
    """
    Create metadata for the job.
    
    Args:
        job_params: Dictionary of job parameters
        job_id: The job ID
        settings: Dictionary of settings
        save_placeholder: Whether to save the placeholder image (default: False)
        
    Returns:
        Metadata dictionary
    """
    if not settings.get("save_metadata"):
        return None
    
    metadata_dir_path = settings.get("metadata_dir")
    output_dir_path = settings.get("output_dir")
    os.makedirs(metadata_dir_path, exist_ok=True)
    os.makedirs(output_dir_path, exist_ok=True) # Ensure output_dir also exists
    
    # Get model type and determine placeholder image color
    model_type = job_params.get('model_type', "Original")
    placeholder_color = get_placeholder_color(model_type)
    
    # Create a placeholder image
    height = job_params.get('height', 640)
    width = job_params.get('width', 640)
    
    # Use resolutionH and resolutionW if height and width are not available
    if not height:
        height = job_params.get('resolutionH', 640)
    if not width:
        width = job_params.get('resolutionW', 640)
        
    placeholder_img = Image.new('RGB', (width, height), placeholder_color)
    
    # Add XY plot parameters to the image if applicable
    if model_type == "XY Plot":
        x_param = job_params.get('x_param', '')
        y_param = job_params.get('y_param', '')
        x_values = job_params.get('x_values', [])
        y_values = job_params.get('y_values', [])
        
        draw = ImageDraw.Draw(placeholder_img)
        try:
            # Try to use a system font
            font = ImageFont.truetype("Arial", 20)
        except:
            # Fall back to default font
            font = ImageFont.load_default()
        
        text = f"X: {x_param} - {x_values}\nY: {y_param} - {y_values}"
        draw.text((10, 10), text, fill=(255, 255, 255), font=font)
    
    # Create PNG metadata
    metadata = PngInfo()
    metadata.add_text("prompt", job_params.get('prompt_text', ''))
    metadata.add_text("seed", str(job_params.get('seed', 0)))
    
    # Add model-specific metadata to PNG
    if model_type == "XY Plot":
        metadata.add_text("x_param", job_params.get('x_param', ''))
        metadata.add_text("y_param", job_params.get('y_param', ''))
    
    # Determine end_frame_used value safely (avoiding NumPy array boolean ambiguity)
    end_frame_image = job_params.get('end_frame_image')
    end_frame_used = False
    if end_frame_image is not None:
        if isinstance(end_frame_image, np.ndarray):
            end_frame_used = end_frame_image.any()  # True if any element is non-zero
        else:
            end_frame_used = True
    
    # Create comprehensive JSON metadata with all possible parameters
    # This is created before file saving logic that might use it (e.g. JSON dump)
    # but PngInfo 'metadata' is used for images.
    metadata_dict = {
        # Version information
        "app_version": APP_VERSION,  # Using numeric version without 'v' prefix for metadata
        
        # Common parameters
        "prompt": job_params.get('prompt_text', ''),
        "negative_prompt": job_params.get('n_prompt', ''),
        "seed": job_params.get('seed', 0),
        "steps": job_params.get('steps', 25),
        "cfg": job_params.get('cfg', 1.0),
        "gs": job_params.get('gs', 10.0),
        "rs": job_params.get('rs', 0.0),
        "latent_type": job_params.get('latent_type', 'Black'),
        "timestamp": time.time(),
        "resolutionW": job_params.get('resolutionW', 640),
        "resolutionH": job_params.get('resolutionH', 640),
        "model_type": model_type,
        "generation_type": job_params.get('generation_type', model_type),
        "has_input_image": job_params.get('has_input_image', False),
        "input_image_path": job_params.get('input_image_path', None),
        
        # Video-related parameters
        "total_second_length": job_params.get('total_second_length', 6),
        "blend_sections": job_params.get('blend_sections', 4),
        "latent_window_size": job_params.get('latent_window_size', 9),
        "num_cleaned_frames": job_params.get('num_cleaned_frames', 5),
        
        # Endframe-related parameters
        "end_frame_strength": job_params.get('end_frame_strength', None),
        "end_frame_image_path": job_params.get('end_frame_image_path', None),
        "end_frame_used": str(end_frame_used),
        
        # Video input-related parameters
        "input_video": os.path.basename(job_params.get('input_image', '')) if job_params.get('input_image') is not None and model_type == "Video" else None,
        "video_path": job_params.get('input_image') if model_type == "Video" else None,
        
        # XY Plot-related parameters
        "x_param": job_params.get('x_param', None),
        "y_param": job_params.get('y_param', None),
        "x_values": job_params.get('x_values', None),
        "y_values": job_params.get('y_values', None),

        # Combine with source video
        "combine_with_source": job_params.get('combine_with_source', False),
        
        # Tea cache parameters
        "use_teacache": job_params.get('use_teacache', False),
        "teacache_num_steps": job_params.get('teacache_num_steps', 0),
        "teacache_rel_l1_thresh": job_params.get('teacache_rel_l1_thresh', 0.0),
        # MagCache parameters
        "use_magcache": job_params.get('use_magcache', False),
        "magcache_threshold": job_params.get('magcache_threshold', 0.1),
        "magcache_max_consecutive_skips": job_params.get('magcache_max_consecutive_skips', 2),
        "magcache_retention_ratio": job_params.get('magcache_retention_ratio', 0.25),
    }
    
    # Add LoRA information if present
    selected_loras = job_params.get('selected_loras', [])
    lora_values = job_params.get('lora_values', [])
    lora_loaded_names = job_params.get('lora_loaded_names', [])
    
    if isinstance(selected_loras, list) and len(selected_loras) > 0:
        lora_data = {}
        for lora_name in selected_loras:
            try:
                idx = lora_loaded_names.index(lora_name)
                # Fix for NumPy array boolean ambiguity
                has_lora_values = lora_values is not None and len(lora_values) > 0
                weight = lora_values[idx] if has_lora_values and idx < len(lora_values) else 1.0
                
                # Handle different types of weight values
                if isinstance(weight, np.ndarray):
                    # Convert NumPy array to a scalar value
                    weight_value = float(weight.item()) if weight.size == 1 else float(weight.mean())
                elif isinstance(weight, list):
                    # Handle list type weights
                    has_items = weight is not None and len(weight) > 0
                    weight_value = float(weight[0]) if has_items else 1.0
                else:
                    # Handle scalar weights
                    weight_value = float(weight) if weight is not None else 1.0
                
                lora_data[lora_name] = weight_value
            except ValueError:
                lora_data[lora_name] = 1.0
            except Exception as e:
                lora_data[lora_name] = 1.0
                traceback.print_exc()
        
        metadata_dict["loras"] = lora_data
    else:
        metadata_dict["loras"] = {}

    # This function now only creates the metadata dictionary without saving files
    # The actual saving is done by save_job_start_image() at the beginning of the generation process
    # This prevents duplicate metadata files from being created
    
    # For backward compatibility, we still create the placeholder image
    # and save it if explicitly requested
    placeholder_target_path = os.path.join(metadata_dir_path, f'{job_id}.png')
    
    # Save the placeholder image if requested
    if save_placeholder:
        try:
            placeholder_img.save(placeholder_target_path, pnginfo=metadata)
        except Exception as e:
            traceback.print_exc()
        
    return metadata_dict

def save_last_video_frame(job_params, job_id, settings, last_frame_np):
    """
    Saves the last frame of the input video to the output directory with metadata.
    """
    output_dir_path = job_params.get("output_dir") or settings.get("output_dir")
    
    if not output_dir_path:
        print(f"[SAVE_LAST_FRAME_ERROR] No output directory found.")
        return False
        
    os.makedirs(output_dir_path, exist_ok=True)

    last_frame_path = os.path.join(output_dir_path, f'{job_id}.png')

    metadata_dict = create_metadata(job_params, job_id, settings)
    
    if last_frame_np is not None and isinstance(last_frame_np, np.ndarray):
        try:
            png_metadata = PngInfo()
            for key, value in metadata_dict.items():
                if isinstance(value, (str, int, float, bool)) or value is None:
                    png_metadata.add_text(key, str(value))

            image_to_save_np = last_frame_np
            if last_frame_np.dtype != np.uint8:
                if last_frame_np.max() <= 1.0 and last_frame_np.min() >= -1.0 and last_frame_np.dtype in [np.float32, np.float64]:
                     image_to_save_np = ((last_frame_np + 1.0) / 2.0 * 255.0).clip(0, 255).astype(np.uint8)
                elif last_frame_np.max() <= 1.0 and last_frame_np.min() >= 0.0 and last_frame_np.dtype in [np.float32, np.float64]:
                     image_to_save_np = (last_frame_np * 255.0).clip(0,255).astype(np.uint8)
                else:
                     image_to_save_np = last_frame_np.clip(0, 255).astype(np.uint8)

            last_frame_pil = Image.fromarray(image_to_save_np)
            last_frame_pil.save(last_frame_path, pnginfo=png_metadata)
            print(f"Saved last video frame for job {job_id} to {last_frame_path}")
            return True
        except Exception as e:
            traceback.print_exc()
    return False