"""
Metadata utilities for FramePack Studio.
This module provides functions for generating and saving metadata.
"""
import os
import json
import time
import traceback
import numpy as np
from PIL import Image, ImageDraw, ImageFont
from PIL.PngImagePlugin import PngInfo
from modules.version import APP_VERSION
def get_placeholder_color(model_type):
"""
Get the placeholder image color for a specific model type.
Args:
model_type: The model type string
Returns:
RGB tuple for the placeholder image color
"""
# Define color mapping for different model types
color_map = {
"Original": (0, 0, 0), # Black
"F1": (0, 0, 128), # Blue
"Video": (0, 128, 0), # Green
"XY Plot": (128, 128, 0), # Yellow
"F1 with Endframe": (0, 128, 128), # Teal
"Original with Endframe": (128, 0, 128), # Purple
}
# Return the color for the model type, or black as default
return color_map.get(model_type, (0, 0, 0))
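# Example (illustrative): get_placeholder_color("F1") returns (0, 0, 128), while an
# unrecognised model type such as "Custom" falls back to black, (0, 0, 0).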
# Function to save the starting image with comprehensive metadata
def save_job_start_image(job_params, job_id, settings):
"""
Saves the job's starting input image to the output directory with comprehensive metadata.
This is intended to be called early in the job processing and is the ONLY place metadata should be saved.
"""
    # Get output and metadata directories from job_params, falling back to settings
    output_dir_path = job_params.get("output_dir") or settings.get("output_dir")
    metadata_dir_path = job_params.get("metadata_dir") or settings.get("metadata_dir")
    if not output_dir_path or not metadata_dir_path:
        print("[JOB_START_IMG_ERROR] No output/metadata directory found in job_params or settings")
        return False
    # Ensure directories exist
    os.makedirs(output_dir_path, exist_ok=True)
    os.makedirs(metadata_dir_path, exist_ok=True)
actual_start_image_target_path = os.path.join(output_dir_path, f'{job_id}.png')
actual_input_image_np = job_params.get('input_image')
    # Create comprehensive metadata dictionary (None when metadata saving is disabled)
    metadata_dict = create_metadata(job_params, job_id, settings) or {}
    # Save JSON metadata with the same job_id
    json_metadata_path = os.path.join(metadata_dir_path, f'{job_id}.json')
    try:
        with open(json_metadata_path, 'w') as f:
            json.dump(metadata_dict, f, indent=2)
    except Exception:
        print(f"[JOB_START_IMG_ERROR] Failed to write JSON metadata to {json_metadata_path}")
        traceback.print_exc()
# Save the input image if it's a numpy array
if actual_input_image_np is not None and isinstance(actual_input_image_np, np.ndarray):
try:
# Create PNG metadata
png_metadata = PngInfo()
png_metadata.add_text("prompt", job_params.get('prompt_text', ''))
png_metadata.add_text("seed", str(job_params.get('seed', 0)))
png_metadata.add_text("model_type", job_params.get('model_type', "Unknown"))
# Add more metadata fields
for key, value in metadata_dict.items():
if isinstance(value, (str, int, float, bool)) or value is None:
png_metadata.add_text(key, str(value))
            # Convert the image to uint8 if needed
            image_to_save_np = actual_input_image_np
            if actual_input_image_np.dtype != np.uint8:
                if actual_input_image_np.min() >= 0.0 and actual_input_image_np.max() <= 1.0 and actual_input_image_np.dtype in [np.float32, np.float64]:
                    # Non-negative float image: assume [0, 1] range and scale to [0, 255]
                    image_to_save_np = (actual_input_image_np * 255.0).clip(0, 255).astype(np.uint8)
                elif actual_input_image_np.min() >= -1.0 and actual_input_image_np.max() <= 1.0 and actual_input_image_np.dtype in [np.float32, np.float64]:
                    # Signed float image: assume [-1, 1] range and rescale to [0, 255]
                    image_to_save_np = ((actual_input_image_np + 1.0) / 2.0 * 255.0).clip(0, 255).astype(np.uint8)
                else:
                    # Any other dtype: assume values are already in [0, 255]
                    image_to_save_np = actual_input_image_np.clip(0, 255).astype(np.uint8)
# Save the image with metadata
start_image_pil = Image.fromarray(image_to_save_np)
start_image_pil.save(actual_start_image_target_path, pnginfo=png_metadata)
return True # Indicate success
        except Exception:
            traceback.print_exc()
            return False  # Indicate failure or inability to save
    # No usable input image was provided (missing or not a NumPy array)
    return False
def create_metadata(job_params, job_id, settings, save_placeholder=False):
"""
Create metadata for the job.
Args:
job_params: Dictionary of job parameters
job_id: The job ID
settings: Dictionary of settings
save_placeholder: Whether to save the placeholder image (default: False)
Returns:
        Metadata dictionary, or None if metadata saving is disabled in settings
"""
if not settings.get("save_metadata"):
return None
metadata_dir_path = settings.get("metadata_dir")
output_dir_path = settings.get("output_dir")
os.makedirs(metadata_dir_path, exist_ok=True)
os.makedirs(output_dir_path, exist_ok=True) # Ensure output_dir also exists
# Get model type and determine placeholder image color
model_type = job_params.get('model_type', "Original")
placeholder_color = get_placeholder_color(model_type)
    # Create a placeholder image, falling back to resolutionH/resolutionW
    # when explicit height/width values are not provided
    height = job_params.get('height') or job_params.get('resolutionH', 640)
    width = job_params.get('width') or job_params.get('resolutionW', 640)
placeholder_img = Image.new('RGB', (width, height), placeholder_color)
# Add XY plot parameters to the image if applicable
if model_type == "XY Plot":
x_param = job_params.get('x_param', '')
y_param = job_params.get('y_param', '')
x_values = job_params.get('x_values', [])
y_values = job_params.get('y_values', [])
draw = ImageDraw.Draw(placeholder_img)
try:
# Try to use a system font
font = ImageFont.truetype("Arial", 20)
        except Exception:
            # Fall back to PIL's built-in default font
font = ImageFont.load_default()
text = f"X: {x_param} - {x_values}\nY: {y_param} - {y_values}"
draw.text((10, 10), text, fill=(255, 255, 255), font=font)
# Create PNG metadata
metadata = PngInfo()
metadata.add_text("prompt", job_params.get('prompt_text', ''))
metadata.add_text("seed", str(job_params.get('seed', 0)))
# Add model-specific metadata to PNG
if model_type == "XY Plot":
metadata.add_text("x_param", job_params.get('x_param', ''))
metadata.add_text("y_param", job_params.get('y_param', ''))
# Determine end_frame_used value safely (avoiding NumPy array boolean ambiguity)
end_frame_image = job_params.get('end_frame_image')
end_frame_used = False
if end_frame_image is not None:
if isinstance(end_frame_image, np.ndarray):
end_frame_used = end_frame_image.any() # True if any element is non-zero
else:
end_frame_used = True
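    # Note: a plain `if end_frame_image:` would raise "ValueError: The truth value of an
    # array with more than one element is ambiguous" for multi-element arrays, hence the
    # explicit isinstance/.any() handling above.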
    # Create the comprehensive JSON metadata dictionary with all supported parameters.
    # This dict is written to the JSON sidecar file; the PngInfo 'metadata' object above
    # is only embedded into saved images.
metadata_dict = {
# Version information
"app_version": APP_VERSION, # Using numeric version without 'v' prefix for metadata
# Common parameters
"prompt": job_params.get('prompt_text', ''),
"negative_prompt": job_params.get('n_prompt', ''),
"seed": job_params.get('seed', 0),
"steps": job_params.get('steps', 25),
"cfg": job_params.get('cfg', 1.0),
"gs": job_params.get('gs', 10.0),
"rs": job_params.get('rs', 0.0),
"latent_type": job_params.get('latent_type', 'Black'),
"timestamp": time.time(),
"resolutionW": job_params.get('resolutionW', 640),
"resolutionH": job_params.get('resolutionH', 640),
"model_type": model_type,
"generation_type": job_params.get('generation_type', model_type),
"has_input_image": job_params.get('has_input_image', False),
"input_image_path": job_params.get('input_image_path', None),
# Video-related parameters
"total_second_length": job_params.get('total_second_length', 6),
"blend_sections": job_params.get('blend_sections', 4),
"latent_window_size": job_params.get('latent_window_size', 9),
"num_cleaned_frames": job_params.get('num_cleaned_frames', 5),
# Endframe-related parameters
"end_frame_strength": job_params.get('end_frame_strength', None),
"end_frame_image_path": job_params.get('end_frame_image_path', None),
"end_frame_used": str(end_frame_used),
# Video input-related parameters
"input_video": os.path.basename(job_params.get('input_image', '')) if job_params.get('input_image') is not None and model_type == "Video" else None,
"video_path": job_params.get('input_image') if model_type == "Video" else None,
# XY Plot-related parameters
"x_param": job_params.get('x_param', None),
"y_param": job_params.get('y_param', None),
"x_values": job_params.get('x_values', None),
"y_values": job_params.get('y_values', None),
# Combine with source video
"combine_with_source": job_params.get('combine_with_source', False),
# Tea cache parameters
"use_teacache": job_params.get('use_teacache', False),
"teacache_num_steps": job_params.get('teacache_num_steps', 0),
"teacache_rel_l1_thresh": job_params.get('teacache_rel_l1_thresh', 0.0),
# MagCache parameters
"use_magcache": job_params.get('use_magcache', False),
"magcache_threshold": job_params.get('magcache_threshold', 0.1),
"magcache_max_consecutive_skips": job_params.get('magcache_max_consecutive_skips', 2),
"magcache_retention_ratio": job_params.get('magcache_retention_ratio', 0.25),
}
# Add LoRA information if present
selected_loras = job_params.get('selected_loras', [])
lora_values = job_params.get('lora_values', [])
lora_loaded_names = job_params.get('lora_loaded_names', [])
if isinstance(selected_loras, list) and len(selected_loras) > 0:
lora_data = {}
for lora_name in selected_loras:
try:
idx = lora_loaded_names.index(lora_name)
# Fix for NumPy array boolean ambiguity
has_lora_values = lora_values is not None and len(lora_values) > 0
weight = lora_values[idx] if has_lora_values and idx < len(lora_values) else 1.0
# Handle different types of weight values
if isinstance(weight, np.ndarray):
# Convert NumPy array to a scalar value
weight_value = float(weight.item()) if weight.size == 1 else float(weight.mean())
elif isinstance(weight, list):
# Handle list type weights
has_items = weight is not None and len(weight) > 0
weight_value = float(weight[0]) if has_items else 1.0
else:
# Handle scalar weights
weight_value = float(weight) if weight is not None else 1.0
lora_data[lora_name] = weight_value
            except ValueError:
                # LoRA name not present in lora_loaded_names; fall back to the default weight
                lora_data[lora_name] = 1.0
            except Exception:
                traceback.print_exc()
                lora_data[lora_name] = 1.0
metadata_dict["loras"] = lora_data
else:
metadata_dict["loras"] = {}
# This function now only creates the metadata dictionary without saving files
# The actual saving is done by save_job_start_image() at the beginning of the generation process
# This prevents duplicate metadata files from being created
# For backward compatibility, we still create the placeholder image
# and save it if explicitly requested
placeholder_target_path = os.path.join(metadata_dir_path, f'{job_id}.png')
# Save the placeholder image if requested
if save_placeholder:
try:
placeholder_img.save(placeholder_target_path, pnginfo=metadata)
        except Exception:
traceback.print_exc()
return metadata_dict
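# Illustrative call (the argument values below are placeholders, not FramePack Studio defaults):
#   meta = create_metadata(
#       {"prompt_text": "a cat", "seed": 42, "model_type": "F1"},
#       "job_0001",
#       {"save_metadata": True, "output_dir": "outputs", "metadata_dir": "outputs/metadata"},
#   )
#   meta["model_type"]  # -> "F1"
#   meta["loras"]       # -> {} when no LoRAs were selected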
def save_last_video_frame(job_params, job_id, settings, last_frame_np):
"""
Saves the last frame of the input video to the output directory with metadata.
"""
    output_dir_path = job_params.get("output_dir") or settings.get("output_dir")
    if not output_dir_path:
        print("[SAVE_LAST_FRAME_ERROR] No output directory found.")
        return False
    os.makedirs(output_dir_path, exist_ok=True)
    last_frame_path = os.path.join(output_dir_path, f'{job_id}.png')
    # Metadata dictionary (None when metadata saving is disabled)
    metadata_dict = create_metadata(job_params, job_id, settings) or {}
if last_frame_np is not None and isinstance(last_frame_np, np.ndarray):
try:
png_metadata = PngInfo()
for key, value in metadata_dict.items():
if isinstance(value, (str, int, float, bool)) or value is None:
png_metadata.add_text(key, str(value))
            # Convert the frame to uint8 if needed
            image_to_save_np = last_frame_np
            if last_frame_np.dtype != np.uint8:
                if last_frame_np.min() >= 0.0 and last_frame_np.max() <= 1.0 and last_frame_np.dtype in [np.float32, np.float64]:
                    # Non-negative float frame: assume [0, 1] range and scale to [0, 255]
                    image_to_save_np = (last_frame_np * 255.0).clip(0, 255).astype(np.uint8)
                elif last_frame_np.min() >= -1.0 and last_frame_np.max() <= 1.0 and last_frame_np.dtype in [np.float32, np.float64]:
                    # Signed float frame: assume [-1, 1] range and rescale to [0, 255]
                    image_to_save_np = ((last_frame_np + 1.0) / 2.0 * 255.0).clip(0, 255).astype(np.uint8)
                else:
                    # Any other dtype: assume values are already in [0, 255]
                    image_to_save_np = last_frame_np.clip(0, 255).astype(np.uint8)
last_frame_pil = Image.fromarray(image_to_save_np)
last_frame_pil.save(last_frame_path, pnginfo=png_metadata)
print(f"Saved last video frame for job {job_id} to {last_frame_path}")
return True
        except Exception:
            traceback.print_exc()
    # No usable last frame was provided, or saving failed
    return False
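
if __name__ == "__main__":
    # Minimal smoke-test sketch (not part of the FramePack Studio pipeline): exercises
    # save_job_start_image with illustrative, hypothetical job parameters and settings.
    # The directory layout and parameter values below are assumptions for demonstration only.
    import tempfile

    with tempfile.TemporaryDirectory() as tmp_dir:
        demo_settings = {
            "save_metadata": True,
            "output_dir": os.path.join(tmp_dir, "outputs"),
            "metadata_dir": os.path.join(tmp_dir, "metadata"),
        }
        demo_job_params = {
            "model_type": "Original",
            "prompt_text": "a demo prompt",
            "seed": 123,
            # A black 64x64 RGB image stands in for a real starting frame
            "input_image": np.zeros((64, 64, 3), dtype=np.uint8),
        }
        ok = save_job_start_image(demo_job_params, "demo_job", demo_settings)
        print(f"save_job_start_image succeeded: {ok}")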