from diffusers_helper.hf_login import login import json import os import shutil from pathlib import PurePath, Path import time import argparse import traceback import einops import numpy as np import torch import datetime import spaces # Version information from modules.version import APP_VERSION # Set environment variables os.environ['HF_HOME'] = os.path.abspath(os.path.realpath(os.path.join(os.path.dirname(__file__), './hf_download'))) os.environ['TOKENIZERS_PARALLELISM'] = 'false' # Prevent tokenizers parallelism warning import gradio as gr from PIL import Image from PIL.PngImagePlugin import PngInfo from diffusers import AutoencoderKLHunyuanVideo from transformers import LlamaModel, CLIPTextModel, LlamaTokenizerFast, CLIPTokenizer from diffusers_helper.hunyuan import encode_prompt_conds, vae_decode, vae_encode, vae_decode_fake from diffusers_helper.utils import save_bcthw_as_mp4, crop_or_pad_yield_mask, soft_append_bcthw, resize_and_center_crop, generate_timestamp from diffusers_helper.models.hunyuan_video_packed import HunyuanVideoTransformer3DModelPacked from diffusers_helper.pipelines.k_diffusion_hunyuan import sample_hunyuan from diffusers_helper.memory import cpu, gpu, get_cuda_free_memory_gb, move_model_to_device_with_memory_preservation, offload_model_from_device_for_memory_preservation, fake_diffusers_current_device, DynamicSwapInstaller, unload_complete_models, load_model_as_complete from diffusers_helper.thread_utils import AsyncStream from diffusers_helper.gradio.progress_bar import make_progress_bar_html from transformers import SiglipImageProcessor, SiglipVisionModel from diffusers_helper.clip_vision import hf_clip_vision_encode from diffusers_helper.bucket_tools import find_nearest_bucket from diffusers_helper import lora_utils from diffusers_helper.lora_utils import load_lora, unload_all_loras # Import model generators from modules.generators import create_model_generator # Global cache for prompt embeddings prompt_embedding_cache = {} # Import from modules from modules.video_queue import VideoJobQueue, JobStatus from modules.prompt_handler import parse_timestamped_prompt from modules.interface import create_interface, format_queue_status from modules.settings import Settings from modules import DUMMY_LORA_NAME # Import the constant from modules.pipelines.metadata_utils import create_metadata from modules.pipelines.worker import worker # Try to suppress annoyingly persistent Windows asyncio proactor errors if os.name == 'nt': # Windows only import asyncio from functools import wraps # Replace the problematic proactor event loop with selector event loop asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) # Patch the base transport's close method def silence_event_loop_closed(func): @wraps(func) def wrapper(self, *args, **kwargs): try: return func(self, *args, **kwargs) except RuntimeError as e: if str(e) != 'Event loop is closed': raise return wrapper # Apply the patch if hasattr(asyncio.proactor_events._ProactorBasePipeTransport, '_call_connection_lost'): asyncio.proactor_events._ProactorBasePipeTransport._call_connection_lost = silence_event_loop_closed( asyncio.proactor_events._ProactorBasePipeTransport._call_connection_lost) # ADDED: Debug function to verify LoRA state def verify_lora_state(transformer, label=""): """Debug function to verify the state of LoRAs in a transformer model""" if transformer is None: print(f"[{label}] Transformer is None, cannot verify LoRA state") return has_loras = False if hasattr(transformer, 'peft_config'): adapter_names = list(transformer.peft_config.keys()) if transformer.peft_config else [] if adapter_names: has_loras = True print(f"[{label}] Transformer has LoRAs: {', '.join(adapter_names)}") else: print(f"[{label}] Transformer has no LoRAs in peft_config") else: print(f"[{label}] Transformer has no peft_config attribute") # Check for any LoRA modules for name, module in transformer.named_modules(): if hasattr(module, 'lora_A') and module.lora_A: has_loras = True # print(f"[{label}] Found lora_A in module {name}") if hasattr(module, 'lora_B') and module.lora_B: has_loras = True # print(f"[{label}] Found lora_B in module {name}") if not has_loras: print(f"[{label}] No LoRA components found in transformer") parser = argparse.ArgumentParser() parser.add_argument('--share', action='store_true') parser.add_argument("--server", type=str, default='0.0.0.0') parser.add_argument("--port", type=int, required=False) parser.add_argument("--inbrowser", action='store_true') parser.add_argument("--lora", type=str, default=None, help="Lora path (comma separated for multiple)") parser.add_argument("--offline", action='store_true', help="Run in offline mode") args = parser.parse_args() print(args) if args.offline: print("Offline mode enabled.") os.environ['HF_HUB_OFFLINE'] = '1' else: if 'HF_HUB_OFFLINE' in os.environ: del os.environ['HF_HUB_OFFLINE'] free_mem_gb = get_cuda_free_memory_gb(gpu) high_vram = free_mem_gb > 60 print(f'Free VRAM {free_mem_gb} GB') print(f'High-VRAM Mode: {high_vram}') # Load models text_encoder = LlamaModel.from_pretrained("hunyuanvideo-community/HunyuanVideo", subfolder='text_encoder', torch_dtype=torch.float16).cpu() text_encoder_2 = CLIPTextModel.from_pretrained("hunyuanvideo-community/HunyuanVideo", subfolder='text_encoder_2', torch_dtype=torch.float16).cpu() tokenizer = LlamaTokenizerFast.from_pretrained("hunyuanvideo-community/HunyuanVideo", subfolder='tokenizer') tokenizer_2 = CLIPTokenizer.from_pretrained("hunyuanvideo-community/HunyuanVideo", subfolder='tokenizer_2') vae = AutoencoderKLHunyuanVideo.from_pretrained("hunyuanvideo-community/HunyuanVideo", subfolder='vae', torch_dtype=torch.float16).cpu() feature_extractor = SiglipImageProcessor.from_pretrained("lllyasviel/flux_redux_bfl", subfolder='feature_extractor') image_encoder = SiglipVisionModel.from_pretrained("lllyasviel/flux_redux_bfl", subfolder='image_encoder', torch_dtype=torch.float16).cpu() # Initialize model generator placeholder current_generator = None # Will hold the currently active model generator # Load models based on VRAM availability later # Configure models vae.eval() text_encoder.eval() text_encoder_2.eval() image_encoder.eval() if not high_vram: vae.enable_slicing() vae.enable_tiling() vae.to(dtype=torch.float16) image_encoder.to(dtype=torch.float16) text_encoder.to(dtype=torch.float16) text_encoder_2.to(dtype=torch.float16) vae.requires_grad_(False) text_encoder.requires_grad_(False) text_encoder_2.requires_grad_(False) image_encoder.requires_grad_(False) # Create lora directory if it doesn't exist lora_dir = os.path.join(os.path.dirname(__file__), 'loras') os.makedirs(lora_dir, exist_ok=True) # Initialize LoRA support - moved scanning after settings load lora_names = [] lora_values = [] # This seems unused for population, might be related to weights later script_dir = os.path.dirname(os.path.abspath(__file__)) # Define default LoRA folder path relative to the script directory (used if setting is missing) default_lora_folder = os.path.join(script_dir, "loras") os.makedirs(default_lora_folder, exist_ok=True) # Ensure default exists if not high_vram: # DynamicSwapInstaller is same as huggingface's enable_sequential_offload but 3x faster DynamicSwapInstaller.install_model(text_encoder, device=gpu) else: text_encoder.to(gpu) text_encoder_2.to(gpu) image_encoder.to(gpu) vae.to(gpu) stream = AsyncStream() outputs_folder = './outputs/' os.makedirs(outputs_folder, exist_ok=True) # Initialize settings settings = Settings() # NEW: auto-cleanup on start-up option in Settings if settings.get("auto_cleanup_on_startup", False): print("--- Running Automatic Startup Cleanup ---") # Import the processor instance from modules.toolbox_app import tb_processor # Call the single cleanup function and print its summary. cleanup_summary = tb_processor.tb_clear_temporary_files() print(f"{cleanup_summary}") # This cleaner print handles the multiline string well print("--- Startup Cleanup Complete ---") # --- Populate LoRA names AFTER settings are loaded --- lora_folder_from_settings: str = settings.get("lora_dir", default_lora_folder) # Use setting, fallback to default print(f"Scanning for LoRAs in: {lora_folder_from_settings}") if os.path.isdir(lora_folder_from_settings): try: for root, _, files in os.walk(lora_folder_from_settings): for file in files: if file.endswith('.safetensors') or file.endswith('.pt'): lora_relative_path = os.path.relpath(os.path.join(root, file), lora_folder_from_settings) lora_name = str(PurePath(lora_relative_path).with_suffix('')) lora_names.append(lora_name) print(f"Found LoRAs: {lora_names}") # Temp solution for only 1 lora if len(lora_names) == 1: lora_names.append(DUMMY_LORA_NAME) except Exception as e: print(f"Error scanning LoRA directory '{lora_folder_from_settings}': {e}") else: print(f"LoRA directory not found: {lora_folder_from_settings}") # --- End LoRA population --- # Create job queue job_queue = VideoJobQueue() # Function to load a LoRA file def load_lora_file(lora_file: str | PurePath): if not lora_file: return None, "No file selected" try: # Get the filename from the path lora_path = PurePath(lora_file) lora_name = lora_path.name # Copy the file to the lora directory lora_dest = PurePath(lora_dir, lora_path) import shutil shutil.copy(lora_file, lora_dest) # Load the LoRA global current_generator, lora_names if current_generator is None: return None, "Error: No model loaded to apply LoRA to. Generate something first." # Unload any existing LoRAs first current_generator.unload_loras() # Load the single LoRA selected_loras = [lora_path.stem] current_generator.load_loras(selected_loras, lora_dir, selected_loras) # Add to lora_names if not already there lora_base_name = lora_path.stem if lora_base_name not in lora_names: lora_names.append(lora_base_name) # Get the current device of the transformer device = next(current_generator.transformer.parameters()).device # Move all LoRA adapters to the same device as the base model current_generator.move_lora_adapters_to_device(device) print(f"Loaded LoRA: {lora_name} to {current_generator.get_model_name()} model") return gr.update(choices=lora_names), f"Successfully loaded LoRA: {lora_name}" except Exception as e: print(f"Error loading LoRA: {e}") return None, f"Error loading LoRA: {e}" @torch.no_grad() def get_cached_or_encode_prompt(prompt, text_encoder, text_encoder_2, tokenizer, tokenizer_2, target_device): """ Retrieves prompt embeddings from cache or encodes them if not found. Stores encoded embeddings (on CPU) in the cache. Returns embeddings moved to the target_device. """ if prompt in prompt_embedding_cache: print(f"Cache hit for prompt: {prompt[:60]}...") llama_vec_cpu, llama_mask_cpu, clip_l_pooler_cpu = prompt_embedding_cache[prompt] # Move cached embeddings (from CPU) to the target device llama_vec = llama_vec_cpu.to(target_device) llama_attention_mask = llama_mask_cpu.to(target_device) if llama_mask_cpu is not None else None clip_l_pooler = clip_l_pooler_cpu.to(target_device) return llama_vec, llama_attention_mask, clip_l_pooler else: print(f"Cache miss for prompt: {prompt[:60]}...") llama_vec, clip_l_pooler = encode_prompt_conds( prompt, text_encoder, text_encoder_2, tokenizer, tokenizer_2 ) llama_vec, llama_attention_mask = crop_or_pad_yield_mask(llama_vec, length=512) # Store CPU copies in cache prompt_embedding_cache[prompt] = (llama_vec.cpu(), llama_attention_mask.cpu() if llama_attention_mask is not None else None, clip_l_pooler.cpu()) # Return embeddings already on the target device (as encode_prompt_conds uses the model's device) return llama_vec, llama_attention_mask, clip_l_pooler # Set the worker function for the job queue - using the imported worker from modules/pipelines/worker.py job_queue.set_worker_function(worker) def get_duration(model_type, input_image, end_frame_image, # NEW end_frame_strength, # NEW prompt_text, n_prompt, seed, total_second_length, latent_window_size, steps, cfg, gs, rs, use_teacache, teacache_num_steps, teacache_rel_l1_thresh, use_magcache, magcache_threshold, magcache_max_consecutive_skips, magcache_retention_ratio, blend_sections, latent_type, clean_up_videos, selected_loras, resolutionW, resolutionH, input_image_path, combine_with_source, num_cleaned_frames, *lora_args, save_metadata_checked=True): return total_second_length * 60 @spaces.GPU(duration=get_duration) def process( model_type, input_image, end_frame_image, # NEW end_frame_strength, # NEW prompt_text, n_prompt, seed, total_second_length, latent_window_size, steps, cfg, gs, rs, use_teacache, teacache_num_steps, teacache_rel_l1_thresh, use_magcache, magcache_threshold, magcache_max_consecutive_skips, magcache_retention_ratio, blend_sections, latent_type, clean_up_videos, selected_loras, resolutionW, resolutionH, input_image_path, combine_with_source, num_cleaned_frames, *lora_args, save_metadata_checked=True, # NEW: Parameter to control metadata saving ): # Create a blank black image if no # Create a default image based on the selected latent_type has_input_image = True if input_image is None: has_input_image = False default_height, default_width = resolutionH, resolutionW if latent_type == "White": # Create a white image input_image = np.ones((default_height, default_width, 3), dtype=np.uint8) * 255 print("No input image provided. Using a blank white image.") elif latent_type == "Noise": # Create a noise image input_image = np.random.randint(0, 256, (default_height, default_width, 3), dtype=np.uint8) print("No input image provided. Using a random noise image.") elif latent_type == "Green Screen": # Create a green screen image with standard chroma key green (0, 177, 64) input_image = np.zeros((default_height, default_width, 3), dtype=np.uint8) input_image[:, :, 1] = 177 # Green channel input_image[:, :, 2] = 64 # Blue channel # Red channel remains 0 print("No input image provided. Using a standard chroma key green screen.") else: # Default to "Black" or any other value # Create a black image input_image = np.zeros((default_height, default_width, 3), dtype=np.uint8) print(f"No input image provided. Using a blank black image (latent_type: {latent_type}).") # Handle input files - copy to input_files_dir to prevent them from being deleted by temp cleanup input_files_dir = settings.get("input_files_dir") os.makedirs(input_files_dir, exist_ok=True) # Process input image (if it's a file path) input_image_path = None if isinstance(input_image, str) and os.path.exists(input_image): # It's a file path, copy it to input_files_dir filename = os.path.basename(input_image) input_image_path = os.path.join(input_files_dir, f"{generate_timestamp()}_{filename}") try: shutil.copy2(input_image, input_image_path) print(f"Copied input image to {input_image_path}") # For Video model, we'll use the path if model_type == "Video": input_image = input_image_path except Exception as e: print(f"Error copying input image: {e}") # Process end frame image (if it's a file path) end_frame_image_path = None if isinstance(end_frame_image, str) and os.path.exists(end_frame_image): # It's a file path, copy it to input_files_dir filename = os.path.basename(end_frame_image) end_frame_image_path = os.path.join(input_files_dir, f"{generate_timestamp()}_{filename}") try: shutil.copy2(end_frame_image, end_frame_image_path) print(f"Copied end frame image to {end_frame_image_path}") except Exception as e: print(f"Error copying end frame image: {e}") # Extract lora_loaded_names from lora_args lora_loaded_names = lora_args[0] if lora_args and len(lora_args) > 0 else [] lora_values = lora_args[1:] if lora_args and len(lora_args) > 1 else [] # Create job parameters job_params = { 'model_type': model_type, 'input_image': input_image.copy() if hasattr(input_image, 'copy') else input_image, # Handle both image arrays and video paths 'end_frame_image': end_frame_image.copy() if end_frame_image is not None else None, 'end_frame_strength': end_frame_strength, 'prompt_text': prompt_text, 'n_prompt': n_prompt, 'seed': seed, 'total_second_length': total_second_length, 'latent_window_size': latent_window_size, 'latent_type': latent_type, 'steps': steps, 'cfg': cfg, 'gs': gs, 'rs': rs, 'blend_sections': blend_sections, 'use_teacache': use_teacache, 'teacache_num_steps': teacache_num_steps, 'teacache_rel_l1_thresh': teacache_rel_l1_thresh, 'use_magcache': use_magcache, 'magcache_threshold': magcache_threshold, 'magcache_max_consecutive_skips': magcache_max_consecutive_skips, 'magcache_retention_ratio': magcache_retention_ratio, 'selected_loras': selected_loras, 'has_input_image': has_input_image, 'output_dir': settings.get("output_dir"), 'metadata_dir': settings.get("metadata_dir"), 'input_files_dir': input_files_dir, # Add input_files_dir to job parameters 'input_image_path': input_image_path, # Add the path to the copied input image 'end_frame_image_path': end_frame_image_path, # Add the path to the copied end frame image 'resolutionW': resolutionW, # Add resolution parameter 'resolutionH': resolutionH, 'lora_loaded_names': lora_loaded_names, 'combine_with_source': combine_with_source, # Add combine_with_source parameter 'num_cleaned_frames': num_cleaned_frames, 'save_metadata_checked': save_metadata_checked, # NEW: Add save_metadata_checked parameter } # Print teacache parameters for debugging print(f"Teacache parameters: use_teacache={use_teacache}, teacache_num_steps={teacache_num_steps}, teacache_rel_l1_thresh={teacache_rel_l1_thresh}") # Add LoRA values if provided - extract them from the tuple if lora_values: # Convert tuple to list lora_values_list = list(lora_values) job_params['lora_values'] = lora_values_list # Add job to queue job_id = job_queue.add_job(job_params) # Set the generation_type attribute on the job object directly job = job_queue.get_job(job_id) if job: job.generation_type = model_type # Set generation_type to model_type for display in queue print(f"Added job {job_id} to queue") queue_status = update_queue_status() # Return immediately after adding to queue # Return separate updates for start_button and end_button to prevent cross-contamination return None, job_id, None, '', f'Job added to queue. Job ID: {job_id}', gr.update(value="🚀 Add to Queue", interactive=True), gr.update(value="❌ Cancel Current Job", interactive=True) def end_process(): """Cancel the current running job and update the queue status""" print("Cancelling current job") with job_queue.lock: if job_queue.current_job: job_id = job_queue.current_job.id print(f"Cancelling job {job_id}") # Send the end signal to the job's stream if job_queue.current_job.stream: job_queue.current_job.stream.input_queue.push('end') # Mark the job as cancelled job_queue.current_job.status = JobStatus.CANCELLED job_queue.current_job.completed_at = time.time() # Set completion time # Force an update to the queue status return update_queue_status() def update_queue_status(): """Update queue status and refresh job positions""" jobs = job_queue.get_all_jobs() for job in jobs: if job.status == JobStatus.PENDING: job.queue_position = job_queue.get_queue_position(job.id) # Make sure to update current running job info if job_queue.current_job: # Make sure the running job is showing status = RUNNING job_queue.current_job.status = JobStatus.RUNNING # Update the toolbar stats pending_count = 0 running_count = 0 completed_count = 0 for job in jobs: if hasattr(job, 'status'): status = str(job.status) if status == "JobStatus.PENDING": pending_count += 1 elif status == "JobStatus.RUNNING": running_count += 1 elif status == "JobStatus.COMPLETED": completed_count += 1 return format_queue_status(jobs) def monitor_job(job_id=None): """ Monitor a specific job and update the UI with the latest video segment as soon as it's available. If no job_id is provided, check if there's a current job in the queue. ALWAYS shows the current running job, regardless of the job_id provided. """ last_video = None # Track the last video file shown last_job_status = None # Track the previous job status to detect status changes last_progress_update_time = time.time() # Track when we last updated the progress last_preview = None # Track the last preview image shown force_update = True # Force an update on first iteration # Flag to indicate we're waiting for a job transition waiting_for_transition = False transition_start_time = None max_transition_wait = 5.0 # Maximum time to wait for transition in seconds def get_preview_updates(preview_value): """Create preview updates that respect the latents_display_top setting""" display_top = settings.get("latents_display_top", False) if display_top: # Top display enabled: update top preview with value, don't update right preview return gr.update(), preview_value if preview_value is not None else gr.update() else: # Right column display: update right preview with value, don't update top preview return preview_value if preview_value is not None else gr.update(), gr.update() while True: # ALWAYS check if there's a current running job that's different from our tracked job_id with job_queue.lock: current_job = job_queue.current_job if current_job and current_job.id != job_id and current_job.status == JobStatus.RUNNING: # Always switch to the current running job job_id = current_job.id waiting_for_transition = False force_update = True # Yield a temporary update to show we're switching jobs right_preview, top_preview = get_preview_updates(None) yield last_video, right_preview, top_preview, '', 'Switching to current job...', gr.update(interactive=True), gr.update(value="❌ Cancel Current Job", visible=True) continue # Check if we're waiting for a job transition if waiting_for_transition: current_time = time.time() # If we've been waiting too long, stop waiting if current_time - transition_start_time > max_transition_wait: waiting_for_transition = False # Check one more time for a current job with job_queue.lock: current_job = job_queue.current_job if current_job and current_job.status == JobStatus.RUNNING: # Switch to whatever job is currently running job_id = current_job.id force_update = True right_preview, top_preview = get_preview_updates(None) yield last_video, right_preview, top_preview, '', 'Switching to current job...', gr.update(interactive=True), gr.update(value="❌ Cancel Current Job", visible=True) continue else: # If still waiting, sleep briefly and continue time.sleep(0.1) continue job = job_queue.get_job(job_id) if not job: # Correctly yield 7 items for the startup/no-job case # This ensures the status text goes to the right component and the buttons are set correctly. yield None, None, None, 'No job ID provided', '', gr.update(value="🚀 Add to Queue", interactive=True, visible=True), gr.update(interactive=False, visible=False) return # If a new video file is available, yield it immediately if job.result and job.result != last_video: last_video = job.result # You can also update preview/progress here if desired right_preview, top_preview = get_preview_updates(None) yield last_video, right_preview, top_preview, '', '', gr.update(interactive=True), gr.update(interactive=True) # Handle job status and progress if job.status == JobStatus.PENDING: position = job_queue.get_queue_position(job_id) right_preview, top_preview = get_preview_updates(None) yield last_video, right_preview, top_preview, '', f'Waiting in queue. Position: {position}', gr.update(interactive=True), gr.update(interactive=True) elif job.status == JobStatus.RUNNING: # Only reset the cancel button when a job transitions from another state to RUNNING # This ensures we don't reset the button text during cancellation if last_job_status != JobStatus.RUNNING: # Check if the button text is already "Cancelling..." - if so, don't change it # This prevents the button from changing back to "Cancel Current Job" during cancellation button_update = gr.update(interactive=True, value="❌ Cancel Current Job", visible=True) else: # Keep current text and state - important to not override "Cancelling..." text button_update = gr.update(interactive=True, visible=True) # Check if we have progress data and if it's time to update current_time = time.time() update_needed = force_update or (current_time - last_progress_update_time > 0.05) # More frequent updates # Always check for progress data, even if we don't have a preview yet if job.progress_data and update_needed: preview = job.progress_data.get('preview') desc = job.progress_data.get('desc', '') html = job.progress_data.get('html', '') # Only update the preview if it has changed or we're forcing an update # Ensure all components get an update current_preview_value = job.progress_data.get('preview') if job.progress_data else None current_desc_value = job.progress_data.get('desc', 'Processing...') if job.progress_data else 'Processing...' current_html_value = job.progress_data.get('html', make_progress_bar_html(0, 'Processing...')) if job.progress_data else make_progress_bar_html(0, 'Processing...') if current_preview_value is not None and (current_preview_value is not last_preview or force_update): last_preview = current_preview_value # Always update if force_update is true, or if it's time for a periodic update if force_update or update_needed: last_progress_update_time = current_time force_update = False right_preview, top_preview = get_preview_updates(last_preview) yield job.result, right_preview, top_preview, current_desc_value, current_html_value, gr.update(interactive=True), button_update # Fallback for periodic update if no new progress data but job is still running elif current_time - last_progress_update_time > 0.5: # More frequent fallback update last_progress_update_time = current_time force_update = False # Reset force_update after a yield current_desc_value = job.progress_data.get('desc', 'Processing...') if job.progress_data else 'Processing...' current_html_value = job.progress_data.get('html', make_progress_bar_html(0, 'Processing...')) if job.progress_data else make_progress_bar_html(0, 'Processing...') right_preview, top_preview = get_preview_updates(last_preview) yield job.result, right_preview, top_preview, current_desc_value, current_html_value, gr.update(interactive=True), button_update elif job.status == JobStatus.COMPLETED: # Show the final video and reset the button text right_preview, top_preview = get_preview_updates(last_preview) yield job.result, right_preview, top_preview, 'Completed', make_progress_bar_html(100, 'Completed'), gr.update(value="🚀 Add to Queue"), gr.update(interactive=True, value="❌ Cancel Current Job", visible=False) break elif job.status == JobStatus.FAILED: # Show error and reset the button text right_preview, top_preview = get_preview_updates(last_preview) yield job.result, right_preview, top_preview, f'Error: {job.error}', make_progress_bar_html(0, 'Failed'), gr.update(value="🚀 Add to Queue"), gr.update(interactive=True, value="❌ Cancel Current Job", visible=False) break elif job.status == JobStatus.CANCELLED: # Show cancelled message and reset the button text right_preview, top_preview = get_preview_updates(last_preview) yield job.result, right_preview, top_preview, 'Job cancelled', make_progress_bar_html(0, 'Cancelled'), gr.update(interactive=True), gr.update(interactive=True, value="❌ Cancel Current Job", visible=False) break # Update last_job_status for the next iteration last_job_status = job.status # Wait a bit before checking again time.sleep(0.05) # Reduced wait time for more responsive updates # Set Gradio temporary directory from settings os.environ["GRADIO_TEMP_DIR"] = settings.get("gradio_temp_dir") # Create the interface interface = create_interface( process_fn=process, monitor_fn=monitor_job, end_process_fn=end_process, update_queue_status_fn=update_queue_status, load_lora_file_fn=load_lora_file, job_queue=job_queue, settings=settings, lora_names=lora_names # Explicitly pass the found LoRA names ) # Launch the interface interface.launch( share=True, )