"""
Metadata utilities for FramePack Studio.
This module provides functions for generating and saving metadata.
"""
import os
import json
import time
import traceback
import numpy as np
from PIL import Image, ImageDraw, ImageFont
from PIL.PngImagePlugin import PngInfo
from modules.version import APP_VERSION
def get_placeholder_color(model_type):
"""
Get the placeholder image color for a specific model type.
Args:
model_type: The model type string
Returns:
RGB tuple for the placeholder image color
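    Example (hypothetical lookup):
        >>> get_placeholder_color("F1")
        (0, 0, 128)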
"""
# Define color mapping for different model types
color_map = {
"Original": (0, 0, 0), # Black
"F1": (0, 0, 128), # Blue
"Video": (0, 128, 0), # Green
"XY Plot": (128, 128, 0), # Yellow
"F1 with Endframe": (0, 128, 128), # Teal
"Original with Endframe": (128, 0, 128), # Purple
}
# Return the color for the model type, or black as default
return color_map.get(model_type, (0, 0, 0))
# Function to save the starting image with comprehensive metadata
def save_job_start_image(job_params, job_id, settings):
"""
Saves the job's starting input image to the output directory with comprehensive metadata.
This is intended to be called early in the job processing and is the ONLY place metadata should be saved.
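    Returns True if the start image was written, False otherwise. A minimal
    call-site sketch (with assumed job_params/settings contents) appears at the
    bottom of this module under the __main__ guard.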
"""
# Get output directory from settings or job_params
output_dir_path = job_params.get("output_dir") or settings.get("output_dir")
metadata_dir_path = job_params.get("metadata_dir") or settings.get("metadata_dir")
    if not output_dir_path or not metadata_dir_path:
        print(f"[JOB_START_IMG_ERROR] No output/metadata directory found in job_params or settings for job {job_id}")
        return False
    # Ensure directories exist
    os.makedirs(output_dir_path, exist_ok=True)
    os.makedirs(metadata_dir_path, exist_ok=True)
    actual_start_image_target_path = os.path.join(output_dir_path, f'{job_id}.png')
    actual_input_image_np = job_params.get('input_image')
    # Create the comprehensive metadata dictionary (None if metadata saving is disabled)
    metadata_dict = create_metadata(job_params, job_id, settings) or {}
    # Save JSON metadata with the same job_id
    if metadata_dict:
        json_metadata_path = os.path.join(metadata_dir_path, f'{job_id}.json')
        try:
            with open(json_metadata_path, 'w') as f:
                json.dump(metadata_dict, f, indent=2)
        except Exception as e:
            print(f"[JOB_START_IMG_ERROR] Failed to write JSON metadata for job {job_id}: {e}")
            traceback.print_exc()
# Save the input image if it's a numpy array
if actual_input_image_np is not None and isinstance(actual_input_image_np, np.ndarray):
try:
# Create PNG metadata
png_metadata = PngInfo()
png_metadata.add_text("prompt", job_params.get('prompt_text', ''))
png_metadata.add_text("seed", str(job_params.get('seed', 0)))
png_metadata.add_text("model_type", job_params.get('model_type', "Unknown"))
# Add more metadata fields
for key, value in metadata_dict.items():
if isinstance(value, (str, int, float, bool)) or value is None:
png_metadata.add_text(key, str(value))
            # Convert the image to uint8 if needed. Float images are assumed to be
            # normalized to [0, 1] or [-1, 1]; check the [0, 1] case first so it is
            # not swallowed by the wider [-1, 1] range check.
            image_to_save_np = actual_input_image_np
            if actual_input_image_np.dtype != np.uint8:
                if actual_input_image_np.dtype in (np.float32, np.float64) and actual_input_image_np.min() >= 0.0 and actual_input_image_np.max() <= 1.0:
                    image_to_save_np = (actual_input_image_np * 255.0).clip(0, 255).astype(np.uint8)
                elif actual_input_image_np.dtype in (np.float32, np.float64) and actual_input_image_np.min() >= -1.0 and actual_input_image_np.max() <= 1.0:
                    image_to_save_np = ((actual_input_image_np + 1.0) / 2.0 * 255.0).clip(0, 255).astype(np.uint8)
                else:
                    image_to_save_np = actual_input_image_np.clip(0, 255).astype(np.uint8)
# Save the image with metadata
start_image_pil = Image.fromarray(image_to_save_np)
start_image_pil.save(actual_start_image_target_path, pnginfo=png_metadata)
return True # Indicate success
        except Exception as e:
            print(f"[JOB_START_IMG_ERROR] Failed to save start image for job {job_id}: {e}")
            traceback.print_exc()
    return False  # Indicate failure or inability to save
def create_metadata(job_params, job_id, settings, save_placeholder=False):
"""
Create metadata for the job.
Args:
job_params: Dictionary of job parameters
job_id: The job ID
settings: Dictionary of settings
save_placeholder: Whether to save the placeholder image (default: False)
Returns:
        Metadata dictionary, or None if metadata saving is disabled in settings
"""
if not settings.get("save_metadata"):
return None
metadata_dir_path = settings.get("metadata_dir")
output_dir_path = settings.get("output_dir")
os.makedirs(metadata_dir_path, exist_ok=True)
os.makedirs(output_dir_path, exist_ok=True) # Ensure output_dir also exists
# Get model type and determine placeholder image color
model_type = job_params.get('model_type', "Original")
placeholder_color = get_placeholder_color(model_type)
    # Create a placeholder image; prefer explicit height/width and fall back to
    # resolutionH/resolutionW (defaulting to 640) when they are missing or unset.
    height = job_params.get('height') or job_params.get('resolutionH', 640)
    width = job_params.get('width') or job_params.get('resolutionW', 640)
placeholder_img = Image.new('RGB', (width, height), placeholder_color)
# Add XY plot parameters to the image if applicable
if model_type == "XY Plot":
x_param = job_params.get('x_param', '')
y_param = job_params.get('y_param', '')
x_values = job_params.get('x_values', [])
y_values = job_params.get('y_values', [])
draw = ImageDraw.Draw(placeholder_img)
        try:
            # Try to use a system font
            font = ImageFont.truetype("Arial", 20)
        except OSError:
            # Fall back to PIL's built-in default font if Arial is unavailable
            font = ImageFont.load_default()
text = f"X: {x_param} - {x_values}\nY: {y_param} - {y_values}"
draw.text((10, 10), text, fill=(255, 255, 255), font=font)
# Create PNG metadata
metadata = PngInfo()
metadata.add_text("prompt", job_params.get('prompt_text', ''))
metadata.add_text("seed", str(job_params.get('seed', 0)))
# Add model-specific metadata to PNG
if model_type == "XY Plot":
metadata.add_text("x_param", job_params.get('x_param', ''))
metadata.add_text("y_param", job_params.get('y_param', ''))
# Determine end_frame_used value safely (avoiding NumPy array boolean ambiguity)
end_frame_image = job_params.get('end_frame_image')
end_frame_used = False
if end_frame_image is not None:
if isinstance(end_frame_image, np.ndarray):
end_frame_used = end_frame_image.any() # True if any element is non-zero
else:
end_frame_used = True
# Create comprehensive JSON metadata with all possible parameters
# This is created before file saving logic that might use it (e.g. JSON dump)
# but PngInfo 'metadata' is used for images.
metadata_dict = {
# Version information
"app_version": APP_VERSION, # Using numeric version without 'v' prefix for metadata
# Common parameters
"prompt": job_params.get('prompt_text', ''),
"negative_prompt": job_params.get('n_prompt', ''),
"seed": job_params.get('seed', 0),
"steps": job_params.get('steps', 25),
"cfg": job_params.get('cfg', 1.0),
"gs": job_params.get('gs', 10.0),
"rs": job_params.get('rs', 0.0),
"latent_type": job_params.get('latent_type', 'Black'),
"timestamp": time.time(),
"resolutionW": job_params.get('resolutionW', 640),
"resolutionH": job_params.get('resolutionH', 640),
"model_type": model_type,
"generation_type": job_params.get('generation_type', model_type),
"has_input_image": job_params.get('has_input_image', False),
"input_image_path": job_params.get('input_image_path', None),
# Video-related parameters
"total_second_length": job_params.get('total_second_length', 6),
"blend_sections": job_params.get('blend_sections', 4),
"latent_window_size": job_params.get('latent_window_size', 9),
"num_cleaned_frames": job_params.get('num_cleaned_frames', 5),
# Endframe-related parameters
"end_frame_strength": job_params.get('end_frame_strength', None),
"end_frame_image_path": job_params.get('end_frame_image_path', None),
"end_frame_used": str(end_frame_used),
# Video input-related parameters
"input_video": os.path.basename(job_params.get('input_image', '')) if job_params.get('input_image') is not None and model_type == "Video" else None,
"video_path": job_params.get('input_image') if model_type == "Video" else None,
# XY Plot-related parameters
"x_param": job_params.get('x_param', None),
"y_param": job_params.get('y_param', None),
"x_values": job_params.get('x_values', None),
"y_values": job_params.get('y_values', None),
# Combine with source video
"combine_with_source": job_params.get('combine_with_source', False),
# Tea cache parameters
"use_teacache": job_params.get('use_teacache', False),
"teacache_num_steps": job_params.get('teacache_num_steps', 0),
"teacache_rel_l1_thresh": job_params.get('teacache_rel_l1_thresh', 0.0),
# MagCache parameters
"use_magcache": job_params.get('use_magcache', False),
"magcache_threshold": job_params.get('magcache_threshold', 0.1),
"magcache_max_consecutive_skips": job_params.get('magcache_max_consecutive_skips', 2),
"magcache_retention_ratio": job_params.get('magcache_retention_ratio', 0.25),
}
# Add LoRA information if present
selected_loras = job_params.get('selected_loras', [])
lora_values = job_params.get('lora_values', [])
lora_loaded_names = job_params.get('lora_loaded_names', [])
if isinstance(selected_loras, list) and len(selected_loras) > 0:
lora_data = {}
for lora_name in selected_loras:
try:
idx = lora_loaded_names.index(lora_name)
# Fix for NumPy array boolean ambiguity
has_lora_values = lora_values is not None and len(lora_values) > 0
weight = lora_values[idx] if has_lora_values and idx < len(lora_values) else 1.0
# Handle different types of weight values
if isinstance(weight, np.ndarray):
# Convert NumPy array to a scalar value
weight_value = float(weight.item()) if weight.size == 1 else float(weight.mean())
elif isinstance(weight, list):
# Handle list type weights
has_items = weight is not None and len(weight) > 0
weight_value = float(weight[0]) if has_items else 1.0
else:
# Handle scalar weights
weight_value = float(weight) if weight is not None else 1.0
lora_data[lora_name] = weight_value
            except ValueError:
                # LoRA name not present in lora_loaded_names; default its weight to 1.0
                lora_data[lora_name] = 1.0
            except Exception as e:
                print(f"Error resolving weight for LoRA '{lora_name}': {e}")
                lora_data[lora_name] = 1.0
                traceback.print_exc()
metadata_dict["loras"] = lora_data
else:
metadata_dict["loras"] = {}
# This function now only creates the metadata dictionary without saving files
# The actual saving is done by save_job_start_image() at the beginning of the generation process
# This prevents duplicate metadata files from being created
# For backward compatibility, we still create the placeholder image
# and save it if explicitly requested
placeholder_target_path = os.path.join(metadata_dir_path, f'{job_id}.png')
# Save the placeholder image if requested
if save_placeholder:
try:
placeholder_img.save(placeholder_target_path, pnginfo=metadata)
except Exception as e:
traceback.print_exc()
return metadata_dict
def save_last_video_frame(job_params, job_id, settings, last_frame_np):
"""
Saves the last frame of the input video to the output directory with metadata.
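    Returns True if the frame was written, False otherwise.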
"""
output_dir_path = job_params.get("output_dir") or settings.get("output_dir")
    if not output_dir_path:
        print(f"[SAVE_LAST_FRAME_ERROR] No output directory found for job {job_id}.")
        return False
    os.makedirs(output_dir_path, exist_ok=True)
    last_frame_path = os.path.join(output_dir_path, f'{job_id}.png')
    # create_metadata returns None when metadata saving is disabled in settings
    metadata_dict = create_metadata(job_params, job_id, settings) or {}
if last_frame_np is not None and isinstance(last_frame_np, np.ndarray):
try:
png_metadata = PngInfo()
for key, value in metadata_dict.items():
if isinstance(value, (str, int, float, bool)) or value is None:
png_metadata.add_text(key, str(value))
            # Convert to uint8 if needed; check the [0, 1] float range before the
            # wider [-1, 1] range so it is handled correctly.
            image_to_save_np = last_frame_np
            if last_frame_np.dtype != np.uint8:
                if last_frame_np.dtype in (np.float32, np.float64) and last_frame_np.min() >= 0.0 and last_frame_np.max() <= 1.0:
                    image_to_save_np = (last_frame_np * 255.0).clip(0, 255).astype(np.uint8)
                elif last_frame_np.dtype in (np.float32, np.float64) and last_frame_np.min() >= -1.0 and last_frame_np.max() <= 1.0:
                    image_to_save_np = ((last_frame_np + 1.0) / 2.0 * 255.0).clip(0, 255).astype(np.uint8)
                else:
                    image_to_save_np = last_frame_np.clip(0, 255).astype(np.uint8)
last_frame_pil = Image.fromarray(image_to_save_np)
last_frame_pil.save(last_frame_path, pnginfo=png_metadata)
print(f"Saved last video frame for job {job_id} to {last_frame_path}")
return True
        except Exception as e:
            print(f"[SAVE_LAST_FRAME_ERROR] Failed to save last frame for job {job_id}: {e}")
            traceback.print_exc()
    return False
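

# ---------------------------------------------------------------------------
# Minimal smoke-test sketch (not part of FramePack Studio's normal flow).
# The job_params/settings values below are assumptions chosen only to exercise
# this module; the real dictionaries are assembled elsewhere in the pipeline.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    _settings = {
        "save_metadata": True,
        "output_dir": "outputs",
        "metadata_dir": os.path.join("outputs", "metadata"),
    }
    _job_params = {
        "model_type": "Original",
        "prompt_text": "a quick metadata smoke test",
        "seed": 31337,
        "resolutionW": 640,
        "resolutionH": 640,
        "has_input_image": True,
        # A solid gray frame stands in for a real input image.
        "input_image": np.full((640, 640, 3), 128, dtype=np.uint8),
    }
    _job_id = f"demo_{int(time.time())}"
    # Build the metadata dictionary only (create_metadata itself writes no files
    # unless save_placeholder=True is passed).
    _meta = create_metadata(_job_params, _job_id, _settings)
    print(f"create_metadata returned {len(_meta or {})} fields for job {_job_id}")
    # Write <job_id>.png and <job_id>.json to the output/metadata directories.
    print("save_job_start_image ->", save_job_start_image(_job_params, _job_id, _settings))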