"""
Metadata utilities for FramePack Studio.

This module provides functions for generating and saving metadata.
"""

import os
import json
import time
import traceback

import numpy as np
from PIL import Image, ImageDraw, ImageFont
from PIL.PngImagePlugin import PngInfo

from modules.version import APP_VERSION
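# Typical usage (illustrative sketch; the exact keys present in job_params and
# settings are defined by the calling pipeline, not enforced by this module):
#
#     metadata_dict = create_metadata(job_params, job_id, settings)
#     save_job_start_image(job_params, job_id, settings)
#     get_placeholder_color("F1")  # -> (0, 0, 128)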
|
|
|
def get_placeholder_color(model_type):
    """
    Get the placeholder image color for a specific model type.

    Args:
        model_type: The model type string

    Returns:
        RGB tuple for the placeholder image color
    """
    color_map = {
        "Original": (0, 0, 0),
        "F1": (0, 0, 128),
        "Video": (0, 128, 0),
        "XY Plot": (128, 128, 0),
        "F1 with Endframe": (0, 128, 128),
        "Original with Endframe": (128, 0, 128),
    }

    return color_map.get(model_type, (0, 0, 0))
|
|
def save_job_start_image(job_params, job_id, settings):
    """
    Saves the job's starting input image to the output directory with comprehensive metadata.
    This is intended to be called early in the job processing and is the ONLY place metadata should be saved.
    """
    output_dir_path = job_params.get("output_dir") or settings.get("output_dir")
    metadata_dir_path = job_params.get("metadata_dir") or settings.get("metadata_dir")

    if not output_dir_path:
        print("[JOB_START_IMG_ERROR] No output directory found in job_params or settings")
        return False

    # Fall back to the output directory if no metadata directory is configured.
    if not metadata_dir_path:
        metadata_dir_path = output_dir_path

    os.makedirs(output_dir_path, exist_ok=True)
    os.makedirs(metadata_dir_path, exist_ok=True)

    actual_start_image_target_path = os.path.join(output_dir_path, f'{job_id}.png')
    actual_input_image_np = job_params.get('input_image')

    # create_metadata returns None when metadata saving is disabled in settings.
    metadata_dict = create_metadata(job_params, job_id, settings) or {}

    # Write the full metadata dictionary as JSON alongside the job outputs.
    json_metadata_path = os.path.join(metadata_dir_path, f'{job_id}.json')
    if metadata_dict:
        try:
            with open(json_metadata_path, 'w') as f:
                json.dump(metadata_dict, f, indent=2)
        except Exception:
            traceback.print_exc()
|
|
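    # Save the start image itself, embedding the metadata as PNG text chunks so the
    # generation parameters travel with the image.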
|
    if actual_input_image_np is not None and isinstance(actual_input_image_np, np.ndarray):
        try:
            png_metadata = PngInfo()
            png_metadata.add_text("prompt", job_params.get('prompt_text', ''))
            png_metadata.add_text("seed", str(job_params.get('seed', 0)))
            png_metadata.add_text("model_type", job_params.get('model_type', "Unknown"))

            # Mirror every scalar metadata value into the PNG text chunks.
            for key, value in metadata_dict.items():
                if isinstance(value, (str, int, float, bool)) or value is None:
                    png_metadata.add_text(key, str(value))

            # Convert float images to uint8. Check the [0, 1] range before the wider
            # [-1, 1] range, otherwise [0, 1] data would be rescaled incorrectly.
            image_to_save_np = actual_input_image_np
            if actual_input_image_np.dtype != np.uint8:
                is_float = actual_input_image_np.dtype in (np.float32, np.float64)
                if is_float and actual_input_image_np.min() >= 0.0 and actual_input_image_np.max() <= 1.0:
                    image_to_save_np = (actual_input_image_np * 255.0).clip(0, 255).astype(np.uint8)
                elif is_float and actual_input_image_np.min() >= -1.0 and actual_input_image_np.max() <= 1.0:
                    image_to_save_np = ((actual_input_image_np + 1.0) / 2.0 * 255.0).clip(0, 255).astype(np.uint8)
                else:
                    image_to_save_np = actual_input_image_np.clip(0, 255).astype(np.uint8)

            start_image_pil = Image.fromarray(image_to_save_np)
            start_image_pil.save(actual_start_image_target_path, pnginfo=png_metadata)
            return True
        except Exception:
            traceback.print_exc()
            return False

    # No usable ndarray input image; nothing was saved.
    return False
|
|
|
def create_metadata(job_params, job_id, settings, save_placeholder=False):
    """
    Create metadata for the job.

    Args:
        job_params: Dictionary of job parameters
        job_id: The job ID
        settings: Dictionary of settings
        save_placeholder: Whether to save the placeholder image (default: False)

    Returns:
        Metadata dictionary, or None if metadata saving is disabled in settings
    """
    if not settings.get("save_metadata"):
        return None

    metadata_dir_path = settings.get("metadata_dir")
    output_dir_path = settings.get("output_dir")
    os.makedirs(metadata_dir_path, exist_ok=True)
    os.makedirs(output_dir_path, exist_ok=True)
|
    model_type = job_params.get('model_type', "Original")
    placeholder_color = get_placeholder_color(model_type)

    # Placeholder dimensions: prefer explicit height/width, fall back to the requested resolution.
    height = job_params.get('height', 640)
    width = job_params.get('width', 640)
    if not height:
        height = job_params.get('resolutionH', 640)
    if not width:
        width = job_params.get('resolutionW', 640)

    placeholder_img = Image.new('RGB', (width, height), placeholder_color)
|
    # For XY plots, annotate the placeholder with the plot axes and values.
    if model_type == "XY Plot":
        x_param = job_params.get('x_param', '')
        y_param = job_params.get('y_param', '')
        x_values = job_params.get('x_values', [])
        y_values = job_params.get('y_values', [])

        draw = ImageDraw.Draw(placeholder_img)
        try:
            font = ImageFont.truetype("Arial", 20)
        except Exception:
            font = ImageFont.load_default()

        text = f"X: {x_param} - {x_values}\nY: {y_param} - {y_values}"
        draw.text((10, 10), text, fill=(255, 255, 255), font=font)
|
    metadata = PngInfo()
    metadata.add_text("prompt", job_params.get('prompt_text', ''))
    metadata.add_text("seed", str(job_params.get('seed', 0)))

    if model_type == "XY Plot":
        metadata.add_text("x_param", job_params.get('x_param', ''))
        metadata.add_text("y_param", job_params.get('y_param', ''))

    # Record whether an end frame was actually supplied (a non-empty array, or any non-array value, counts as used).
    end_frame_image = job_params.get('end_frame_image')
    end_frame_used = False
    if end_frame_image is not None:
        if isinstance(end_frame_image, np.ndarray):
            end_frame_used = end_frame_image.any()
        else:
            end_frame_used = True
|
    metadata_dict = {
        "app_version": APP_VERSION,

        # Core generation parameters.
        "prompt": job_params.get('prompt_text', ''),
        "negative_prompt": job_params.get('n_prompt', ''),
        "seed": job_params.get('seed', 0),
        "steps": job_params.get('steps', 25),
        "cfg": job_params.get('cfg', 1.0),
        "gs": job_params.get('gs', 10.0),
        "rs": job_params.get('rs', 0.0),
        "latent_type": job_params.get('latent_type', 'Black'),
        "timestamp": time.time(),
        "resolutionW": job_params.get('resolutionW', 640),
        "resolutionH": job_params.get('resolutionH', 640),
        "model_type": model_type,
        "generation_type": job_params.get('generation_type', model_type),
        "has_input_image": job_params.get('has_input_image', False),
        "input_image_path": job_params.get('input_image_path', None),

        # Video length and windowing.
        "total_second_length": job_params.get('total_second_length', 6),
        "blend_sections": job_params.get('blend_sections', 4),
        "latent_window_size": job_params.get('latent_window_size', 9),
        "num_cleaned_frames": job_params.get('num_cleaned_frames', 5),

        # End-frame parameters.
        "end_frame_strength": job_params.get('end_frame_strength', None),
        "end_frame_image_path": job_params.get('end_frame_image_path', None),
        "end_frame_used": str(end_frame_used),

        # Video-input parameters (input_image holds the video path for Video jobs).
        "input_video": os.path.basename(job_params.get('input_image', '')) if job_params.get('input_image') is not None and model_type == "Video" else None,
        "video_path": job_params.get('input_image') if model_type == "Video" else None,

        # XY plot parameters.
        "x_param": job_params.get('x_param', None),
        "y_param": job_params.get('y_param', None),
        "x_values": job_params.get('x_values', None),
        "y_values": job_params.get('y_values', None),

        "combine_with_source": job_params.get('combine_with_source', False),

        # TeaCache settings.
        "use_teacache": job_params.get('use_teacache', False),
        "teacache_num_steps": job_params.get('teacache_num_steps', 0),
        "teacache_rel_l1_thresh": job_params.get('teacache_rel_l1_thresh', 0.0)
    }
|
|
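    # Record which LoRAs were selected and the weight applied to each one.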
|
    selected_loras = job_params.get('selected_loras', [])
    lora_values = job_params.get('lora_values', [])
    lora_loaded_names = job_params.get('lora_loaded_names', [])

    if isinstance(selected_loras, list) and len(selected_loras) > 0:
        lora_data = {}
        for lora_name in selected_loras:
            try:
                idx = lora_loaded_names.index(lora_name)
                has_lora_values = lora_values is not None and len(lora_values) > 0
                weight = lora_values[idx] if has_lora_values and idx < len(lora_values) else 1.0

                # Weights may arrive as arrays, lists or scalars; reduce each one to a single float.
                if isinstance(weight, np.ndarray):
                    weight_value = float(weight.item()) if weight.size == 1 else float(weight.mean())
                elif isinstance(weight, list):
                    weight_value = float(weight[0]) if len(weight) > 0 else 1.0
                else:
                    weight_value = float(weight) if weight is not None else 1.0

                lora_data[lora_name] = weight_value
            except ValueError:
                # The selected LoRA is not among the loaded names; fall back to a weight of 1.0.
                lora_data[lora_name] = 1.0
            except Exception:
                lora_data[lora_name] = 1.0
                traceback.print_exc()

        metadata_dict["loras"] = lora_data
    else:
        metadata_dict["loras"] = {}
|
    # Optionally save the placeholder image (with the PNG metadata) to the metadata directory.
    placeholder_target_path = os.path.join(metadata_dir_path, f'{job_id}.png')
    if save_placeholder:
        try:
            placeholder_img.save(placeholder_target_path, pnginfo=metadata)
        except Exception:
            traceback.print_exc()

    return metadata_dict
|
|
|
def save_last_video_frame(job_params, job_id, settings, last_frame_np):
    """
    Saves the last frame of the input video to the output directory with metadata.
    """
    output_dir_path = job_params.get("output_dir") or settings.get("output_dir")

    if not output_dir_path:
        print("[SAVE_LAST_FRAME_ERROR] No output directory found.")
        return False

    os.makedirs(output_dir_path, exist_ok=True)

    last_frame_path = os.path.join(output_dir_path, f'{job_id}.png')

    # create_metadata returns None when metadata saving is disabled in settings.
    metadata_dict = create_metadata(job_params, job_id, settings) or {}

    if last_frame_np is not None and isinstance(last_frame_np, np.ndarray):
        try:
            png_metadata = PngInfo()
            for key, value in metadata_dict.items():
                if isinstance(value, (str, int, float, bool)) or value is None:
                    png_metadata.add_text(key, str(value))

            # Convert the frame to uint8; as above, check the [0, 1] float range
            # before the wider [-1, 1] range.
            image_to_save_np = last_frame_np
            if last_frame_np.dtype != np.uint8:
                is_float = last_frame_np.dtype in (np.float32, np.float64)
                if is_float and last_frame_np.min() >= 0.0 and last_frame_np.max() <= 1.0:
                    image_to_save_np = (last_frame_np * 255.0).clip(0, 255).astype(np.uint8)
                elif is_float and last_frame_np.min() >= -1.0 and last_frame_np.max() <= 1.0:
                    image_to_save_np = ((last_frame_np + 1.0) / 2.0 * 255.0).clip(0, 255).astype(np.uint8)
                else:
                    image_to_save_np = last_frame_np.clip(0, 255).astype(np.uint8)

            last_frame_pil = Image.fromarray(image_to_save_np)
            last_frame_pil.save(last_frame_path, pnginfo=png_metadata)
            print(f"Saved last video frame for job {job_id} to {last_frame_path}")
            return True
        except Exception:
            traceback.print_exc()
            return False

    return False
|
|