from diffusers_helper.hf_login import login import json import os import shutil from pathlib import PurePath, Path import time import argparse import traceback import einops import numpy as np import torch import datetime # Version information from modules.version import APP_VERSION # Set environment variables os.environ['HF_HOME'] = os.path.abspath(os.path.realpath(os.path.join(os.path.dirname(__file__), './hf_download'))) os.environ['TOKENIZERS_PARALLELISM'] = 'false' # Prevent tokenizers parallelism warning import gradio as gr from PIL import Image from PIL.PngImagePlugin import PngInfo from diffusers import AutoencoderKLHunyuanVideo from transformers import LlamaModel, CLIPTextModel, LlamaTokenizerFast, CLIPTokenizer from diffusers_helper.hunyuan import encode_prompt_conds, vae_decode, vae_encode, vae_decode_fake from diffusers_helper.utils import save_bcthw_as_mp4, crop_or_pad_yield_mask, soft_append_bcthw, resize_and_center_crop, generate_timestamp from diffusers_helper.models.hunyuan_video_packed import HunyuanVideoTransformer3DModelPacked from diffusers_helper.pipelines.k_diffusion_hunyuan import sample_hunyuan from diffusers_helper.memory import cpu, gpu, get_cuda_free_memory_gb, move_model_to_device_with_memory_preservation, offload_model_from_device_for_memory_preservation, fake_diffusers_current_device, DynamicSwapInstaller, unload_complete_models, load_model_as_complete from diffusers_helper.thread_utils import AsyncStream from diffusers_helper.gradio.progress_bar import make_progress_bar_html from transformers import SiglipImageProcessor, SiglipVisionModel from diffusers_helper.clip_vision import hf_clip_vision_encode from diffusers_helper.bucket_tools import find_nearest_bucket from diffusers_helper import lora_utils from diffusers_helper.lora_utils import load_lora, unload_all_loras # Import model generators from modules.generators import create_model_generator # Global cache for prompt embeddings prompt_embedding_cache = {} # Import from modules from modules.video_queue import VideoJobQueue, JobStatus from modules.prompt_handler import parse_timestamped_prompt from modules.interface import create_interface, format_queue_status from modules.settings import Settings from modules import DUMMY_LORA_NAME # Import the constant from modules.pipelines.metadata_utils import create_metadata from modules.pipelines.worker import worker # Try to suppress annoyingly persistent Windows asyncio proactor errors if os.name == 'nt': # Windows only import asyncio from functools import wraps # Replace the problematic proactor event loop with selector event loop asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) # Patch the base transport's close method def silence_event_loop_closed(func): @wraps(func) def wrapper(self, *args, **kwargs): try: return func(self, *args, **kwargs) except RuntimeError as e: if str(e) != 'Event loop is closed': raise return wrapper # Apply the patch if hasattr(asyncio.proactor_events._ProactorBasePipeTransport, '_call_connection_lost'): asyncio.proactor_events._ProactorBasePipeTransport._call_connection_lost = silence_event_loop_closed( asyncio.proactor_events._ProactorBasePipeTransport._call_connection_lost) # ADDED: Debug function to verify LoRA state def verify_lora_state(transformer, label=""): """Debug function to verify the state of LoRAs in a transformer model""" if transformer is None: print(f"[{label}] Transformer is None, cannot verify LoRA state") return has_loras = False if hasattr(transformer, 'peft_config'): adapter_names = list(transformer.peft_config.keys()) if transformer.peft_config else [] if adapter_names: has_loras = True print(f"[{label}] Transformer has LoRAs: {', '.join(adapter_names)}") else: print(f"[{label}] Transformer has no LoRAs in peft_config") else: print(f"[{label}] Transformer has no peft_config attribute") # Check for any LoRA modules for name, module in transformer.named_modules(): if hasattr(module, 'lora_A') and module.lora_A: has_loras = True # print(f"[{label}] Found lora_A in module {name}") if hasattr(module, 'lora_B') and module.lora_B: has_loras = True # print(f"[{label}] Found lora_B in module {name}") if not has_loras: print(f"[{label}] No LoRA components found in transformer") parser = argparse.ArgumentParser() parser.add_argument('--share', action='store_true') parser.add_argument("--server", type=str, default='0.0.0.0') parser.add_argument("--port", type=int, required=False) parser.add_argument("--inbrowser", action='store_true') parser.add_argument("--lora", type=str, default=None, help="Lora path (comma separated for multiple)") parser.add_argument("--offline", action='store_true', help="Run in offline mode") args = parser.parse_args() print(args) if args.offline: print("Offline mode enabled.") os.environ['HF_HUB_OFFLINE'] = '1' else: if 'HF_HUB_OFFLINE' in os.environ: del os.environ['HF_HUB_OFFLINE'] free_mem_gb = get_cuda_free_memory_gb(gpu) high_vram = free_mem_gb > 60 print(f'Free VRAM {free_mem_gb} GB') print(f'High-VRAM Mode: {high_vram}') # Load models text_encoder = LlamaModel.from_pretrained("hunyuanvideo-community/HunyuanVideo", subfolder='text_encoder', torch_dtype=torch.float16).cpu() text_encoder_2 = CLIPTextModel.from_pretrained("hunyuanvideo-community/HunyuanVideo", subfolder='text_encoder_2', torch_dtype=torch.float16).cpu() tokenizer = LlamaTokenizerFast.from_pretrained("hunyuanvideo-community/HunyuanVideo", subfolder='tokenizer') tokenizer_2 = CLIPTokenizer.from_pretrained("hunyuanvideo-community/HunyuanVideo", subfolder='tokenizer_2') vae = AutoencoderKLHunyuanVideo.from_pretrained("hunyuanvideo-community/HunyuanVideo", subfolder='vae', torch_dtype=torch.float16).cpu() feature_extractor = SiglipImageProcessor.from_pretrained("lllyasviel/flux_redux_bfl", subfolder='feature_extractor') image_encoder = SiglipVisionModel.from_pretrained("lllyasviel/flux_redux_bfl", subfolder='image_encoder', torch_dtype=torch.float16).cpu() # Initialize model generator placeholder current_generator = None # Will hold the currently active model generator # Load models based on VRAM availability later # Configure models vae.eval() text_encoder.eval() text_encoder_2.eval() image_encoder.eval() if not high_vram: vae.enable_slicing() vae.enable_tiling() vae.to(dtype=torch.float16) image_encoder.to(dtype=torch.float16) text_encoder.to(dtype=torch.float16) text_encoder_2.to(dtype=torch.float16) vae.requires_grad_(False) text_encoder.requires_grad_(False) text_encoder_2.requires_grad_(False) image_encoder.requires_grad_(False) # Create lora directory if it doesn't exist lora_dir = os.path.join(os.path.dirname(__file__), 'loras') os.makedirs(lora_dir, exist_ok=True) # Initialize LoRA support - moved scanning after settings load lora_names = [] lora_values = [] # This seems unused for population, might be related to weights later script_dir = os.path.dirname(os.path.abspath(__file__)) # Define default LoRA folder path relative to the script directory (used if setting is missing) default_lora_folder = os.path.join(script_dir, "loras") os.makedirs(default_lora_folder, exist_ok=True) # Ensure default exists if not high_vram: # DynamicSwapInstaller is same as huggingface's enable_sequential_offload but 3x faster DynamicSwapInstaller.install_model(text_encoder, device=gpu) else: text_encoder.to(gpu) text_encoder_2.to(gpu) image_encoder.to(gpu) vae.to(gpu) stream = AsyncStream() outputs_folder = './outputs/' os.makedirs(outputs_folder, exist_ok=True) # Initialize settings settings = Settings() # NEW: auto-cleanup on start-up option in Settings if settings.get("auto_cleanup_on_startup", False): print("--- Running Automatic Startup Cleanup ---") # Import the processor instance from modules.toolbox_app import tb_processor # Call the single cleanup function and print its summary. cleanup_summary = tb_processor.tb_clear_temporary_files() print(f"{cleanup_summary}") # This cleaner print handles the multiline string well print("--- Startup Cleanup Complete ---") # --- Populate LoRA names AFTER settings are loaded --- lora_folder_from_settings: str = settings.get("lora_dir", default_lora_folder) # Use setting, fallback to default print(f"Scanning for LoRAs in: {lora_folder_from_settings}") if os.path.isdir(lora_folder_from_settings): try: for root, _, files in os.walk(lora_folder_from_settings): for file in files: if file.endswith('.safetensors') or file.endswith('.pt'): lora_relative_path = os.path.relpath(os.path.join(root, file), lora_folder_from_settings) lora_name = str(PurePath(lora_relative_path).with_suffix('')) lora_names.append(lora_name) print(f"Found LoRAs: {lora_names}") # Temp solution for only 1 lora if len(lora_names) == 1: lora_names.append(DUMMY_LORA_NAME) except Exception as e: print(f"Error scanning LoRA directory '{lora_folder_from_settings}': {e}") else: print(f"LoRA directory not found: {lora_folder_from_settings}") # --- End LoRA population --- # Create job queue job_queue = VideoJobQueue() # Function to load a LoRA file def load_lora_file(lora_file: str | PurePath): if not lora_file: return None, "No file selected" try: # Get the filename from the path lora_path = PurePath(lora_file) lora_name = lora_path.name # Copy the file to the lora directory lora_dest = PurePath(lora_dir, lora_path) import shutil shutil.copy(lora_file, lora_dest) # Load the LoRA global current_generator, lora_names if current_generator is None: return None, "Error: No model loaded to apply LoRA to. Generate something first." # Unload any existing LoRAs first current_generator.unload_loras() # Load the single LoRA selected_loras = [lora_path.stem] current_generator.load_loras(selected_loras, lora_dir, selected_loras) # Add to lora_names if not already there lora_base_name = lora_path.stem if lora_base_name not in lora_names: lora_names.append(lora_base_name) # Get the current device of the transformer device = next(current_generator.transformer.parameters()).device # Move all LoRA adapters to the same device as the base model current_generator.move_lora_adapters_to_device(device) print(f"Loaded LoRA: {lora_name} to {current_generator.get_model_name()} model") return gr.update(choices=lora_names), f"Successfully loaded LoRA: {lora_name}" except Exception as e: print(f"Error loading LoRA: {e}") return None, f"Error loading LoRA: {e}" @torch.no_grad() def get_cached_or_encode_prompt(prompt, text_encoder, text_encoder_2, tokenizer, tokenizer_2, target_device): """ Retrieves prompt embeddings from cache or encodes them if not found. Stores encoded embeddings (on CPU) in the cache. Returns embeddings moved to the target_device. """ if prompt in prompt_embedding_cache: print(f"Cache hit for prompt: {prompt[:60]}...") llama_vec_cpu, llama_mask_cpu, clip_l_pooler_cpu = prompt_embedding_cache[prompt] # Move cached embeddings (from CPU) to the target device llama_vec = llama_vec_cpu.to(target_device) llama_attention_mask = llama_mask_cpu.to(target_device) if llama_mask_cpu is not None else None clip_l_pooler = clip_l_pooler_cpu.to(target_device) return llama_vec, llama_attention_mask, clip_l_pooler else: print(f"Cache miss for prompt: {prompt[:60]}...") llama_vec, clip_l_pooler = encode_prompt_conds( prompt, text_encoder, text_encoder_2, tokenizer, tokenizer_2 ) llama_vec, llama_attention_mask = crop_or_pad_yield_mask(llama_vec, length=512) # Store CPU copies in cache prompt_embedding_cache[prompt] = (llama_vec.cpu(), llama_attention_mask.cpu() if llama_attention_mask is not None else None, clip_l_pooler.cpu()) # Return embeddings already on the target device (as encode_prompt_conds uses the model's device) return llama_vec, llama_attention_mask, clip_l_pooler # Set the worker function for the job queue - using the imported worker from modules/pipelines/worker.py job_queue.set_worker_function(worker) def process( model_type, input_image, end_frame_image, # NEW end_frame_strength, # NEW prompt_text, n_prompt, seed, total_second_length, latent_window_size, steps, cfg, gs, rs, use_teacache, teacache_num_steps, teacache_rel_l1_thresh, use_magcache, magcache_threshold, magcache_max_consecutive_skips, magcache_retention_ratio, blend_sections, latent_type, clean_up_videos, selected_loras, resolutionW, resolutionH, input_image_path, combine_with_source, num_cleaned_frames, *lora_args, save_metadata_checked=True, # NEW: Parameter to control metadata saving ): # Create a blank black image if no # Create a default image based on the selected latent_type has_input_image = True if input_image is None: has_input_image = False default_height, default_width = resolutionH, resolutionW if latent_type == "White": # Create a white image input_image = np.ones((default_height, default_width, 3), dtype=np.uint8) * 255 print("No input image provided. Using a blank white image.") elif latent_type == "Noise": # Create a noise image input_image = np.random.randint(0, 256, (default_height, default_width, 3), dtype=np.uint8) print("No input image provided. Using a random noise image.") elif latent_type == "Green Screen": # Create a green screen image with standard chroma key green (0, 177, 64) input_image = np.zeros((default_height, default_width, 3), dtype=np.uint8) input_image[:, :, 1] = 177 # Green channel input_image[:, :, 2] = 64 # Blue channel # Red channel remains 0 print("No input image provided. Using a standard chroma key green screen.") else: # Default to "Black" or any other value # Create a black image input_image = np.zeros((default_height, default_width, 3), dtype=np.uint8) print(f"No input image provided. Using a blank black image (latent_type: {latent_type}).") # Handle input files - copy to input_files_dir to prevent them from being deleted by temp cleanup input_files_dir = settings.get("input_files_dir") os.makedirs(input_files_dir, exist_ok=True) # Process input image (if it's a file path) input_image_path = None if isinstance(input_image, str) and os.path.exists(input_image): # It's a file path, copy it to input_files_dir filename = os.path.basename(input_image) input_image_path = os.path.join(input_files_dir, f"{generate_timestamp()}_{filename}") try: shutil.copy2(input_image, input_image_path) print(f"Copied input image to {input_image_path}") # For Video model, we'll use the path if model_type == "Video": input_image = input_image_path except Exception as e: print(f"Error copying input image: {e}") # Process end frame image (if it's a file path) end_frame_image_path = None if isinstance(end_frame_image, str) and os.path.exists(end_frame_image): # It's a file path, copy it to input_files_dir filename = os.path.basename(end_frame_image) end_frame_image_path = os.path.join(input_files_dir, f"{generate_timestamp()}_{filename}") try: shutil.copy2(end_frame_image, end_frame_image_path) print(f"Copied end frame image to {end_frame_image_path}") except Exception as e: print(f"Error copying end frame image: {e}") # Extract lora_loaded_names from lora_args lora_loaded_names = lora_args[0] if lora_args and len(lora_args) > 0 else [] lora_values = lora_args[1:] if lora_args and len(lora_args) > 1 else [] # Create job parameters job_params = { 'model_type': model_type, 'input_image': input_image.copy() if hasattr(input_image, 'copy') else input_image, # Handle both image arrays and video paths 'end_frame_image': end_frame_image.copy() if end_frame_image is not None else None, 'end_frame_strength': end_frame_strength, 'prompt_text': prompt_text, 'n_prompt': n_prompt, 'seed': seed, 'total_second_length': total_second_length, 'latent_window_size': latent_window_size, 'latent_type': latent_type, 'steps': steps, 'cfg': cfg, 'gs': gs, 'rs': rs, 'blend_sections': blend_sections, 'use_teacache': use_teacache, 'teacache_num_steps': teacache_num_steps, 'teacache_rel_l1_thresh': teacache_rel_l1_thresh, 'use_magcache': use_magcache, 'magcache_threshold': magcache_threshold, 'magcache_max_consecutive_skips': magcache_max_consecutive_skips, 'magcache_retention_ratio': magcache_retention_ratio, 'selected_loras': selected_loras, 'has_input_image': has_input_image, 'output_dir': settings.get("output_dir"), 'metadata_dir': settings.get("metadata_dir"), 'input_files_dir': input_files_dir, # Add input_files_dir to job parameters 'input_image_path': input_image_path, # Add the path to the copied input image 'end_frame_image_path': end_frame_image_path, # Add the path to the copied end frame image 'resolutionW': resolutionW, # Add resolution parameter 'resolutionH': resolutionH, 'lora_loaded_names': lora_loaded_names, 'combine_with_source': combine_with_source, # Add combine_with_source parameter 'num_cleaned_frames': num_cleaned_frames, 'save_metadata_checked': save_metadata_checked, # NEW: Add save_metadata_checked parameter } # Print teacache parameters for debugging print(f"Teacache parameters: use_teacache={use_teacache}, teacache_num_steps={teacache_num_steps}, teacache_rel_l1_thresh={teacache_rel_l1_thresh}") # Add LoRA values if provided - extract them from the tuple if lora_values: # Convert tuple to list lora_values_list = list(lora_values) job_params['lora_values'] = lora_values_list # Add job to queue job_id = job_queue.add_job(job_params) # Set the generation_type attribute on the job object directly job = job_queue.get_job(job_id) if job: job.generation_type = model_type # Set generation_type to model_type for display in queue print(f"Added job {job_id} to queue") queue_status = update_queue_status() # Return immediately after adding to queue # Return separate updates for start_button and end_button to prevent cross-contamination return None, job_id, None, '', f'Job added to queue. Job ID: {job_id}', gr.update(value="🚀 Add to Queue", interactive=True), gr.update(value="❌ Cancel Current Job", interactive=True) def end_process(): """Cancel the current running job and update the queue status""" print("Cancelling current job") with job_queue.lock: if job_queue.current_job: job_id = job_queue.current_job.id print(f"Cancelling job {job_id}") # Send the end signal to the job's stream if job_queue.current_job.stream: job_queue.current_job.stream.input_queue.push('end') # Mark the job as cancelled job_queue.current_job.status = JobStatus.CANCELLED job_queue.current_job.completed_at = time.time() # Set completion time # Force an update to the queue status return update_queue_status() def update_queue_status(): """Update queue status and refresh job positions""" jobs = job_queue.get_all_jobs() for job in jobs: if job.status == JobStatus.PENDING: job.queue_position = job_queue.get_queue_position(job.id) # Make sure to update current running job info if job_queue.current_job: # Make sure the running job is showing status = RUNNING job_queue.current_job.status = JobStatus.RUNNING # Update the toolbar stats pending_count = 0 running_count = 0 completed_count = 0 for job in jobs: if hasattr(job, 'status'): status = str(job.status) if status == "JobStatus.PENDING": pending_count += 1 elif status == "JobStatus.RUNNING": running_count += 1 elif status == "JobStatus.COMPLETED": completed_count += 1 return format_queue_status(jobs) def monitor_job(job_id=None): """ Monitor a specific job and update the UI with the latest video segment as soon as it's available. If no job_id is provided, check if there's a current job in the queue. ALWAYS shows the current running job, regardless of the job_id provided. """ last_video = None # Track the last video file shown last_job_status = None # Track the previous job status to detect status changes last_progress_update_time = time.time() # Track when we last updated the progress last_preview = None # Track the last preview image shown force_update = True # Force an update on first iteration # Flag to indicate we're waiting for a job transition waiting_for_transition = False transition_start_time = None max_transition_wait = 5.0 # Maximum time to wait for transition in seconds def get_preview_updates(preview_value): """Create preview updates that respect the latents_display_top setting""" display_top = settings.get("latents_display_top", False) if display_top: # Top display enabled: update top preview with value, don't update right preview return gr.update(), preview_value if preview_value is not None else gr.update() else: # Right column display: update right preview with value, don't update top preview return preview_value if preview_value is not None else gr.update(), gr.update() while True: # ALWAYS check if there's a current running job that's different from our tracked job_id with job_queue.lock: current_job = job_queue.current_job if current_job and current_job.id != job_id and current_job.status == JobStatus.RUNNING: # Always switch to the current running job job_id = current_job.id waiting_for_transition = False force_update = True # Yield a temporary update to show we're switching jobs right_preview, top_preview = get_preview_updates(None) yield last_video, right_preview, top_preview, '', 'Switching to current job...', gr.update(interactive=True), gr.update(value="❌ Cancel Current Job", visible=True) continue # Check if we're waiting for a job transition if waiting_for_transition: current_time = time.time() # If we've been waiting too long, stop waiting if current_time - transition_start_time > max_transition_wait: waiting_for_transition = False # Check one more time for a current job with job_queue.lock: current_job = job_queue.current_job if current_job and current_job.status == JobStatus.RUNNING: # Switch to whatever job is currently running job_id = current_job.id force_update = True right_preview, top_preview = get_preview_updates(None) yield last_video, right_preview, top_preview, '', 'Switching to current job...', gr.update(interactive=True), gr.update(value="❌ Cancel Current Job", visible=True) continue else: # If still waiting, sleep briefly and continue time.sleep(0.1) continue job = job_queue.get_job(job_id) if not job: # Correctly yield 7 items for the startup/no-job case # This ensures the status text goes to the right component and the buttons are set correctly. yield None, None, None, 'No job ID provided', '', gr.update(value="🚀 Add to Queue", interactive=True, visible=True), gr.update(interactive=False, visible=False) return # If a new video file is available, yield it immediately if job.result and job.result != last_video: last_video = job.result # You can also update preview/progress here if desired right_preview, top_preview = get_preview_updates(None) yield last_video, right_preview, top_preview, '', '', gr.update(interactive=True), gr.update(interactive=True) # Handle job status and progress if job.status == JobStatus.PENDING: position = job_queue.get_queue_position(job_id) right_preview, top_preview = get_preview_updates(None) yield last_video, right_preview, top_preview, '', f'Waiting in queue. Position: {position}', gr.update(interactive=True), gr.update(interactive=True) elif job.status == JobStatus.RUNNING: # Only reset the cancel button when a job transitions from another state to RUNNING # This ensures we don't reset the button text during cancellation if last_job_status != JobStatus.RUNNING: # Check if the button text is already "Cancelling..." - if so, don't change it # This prevents the button from changing back to "Cancel Current Job" during cancellation button_update = gr.update(interactive=True, value="❌ Cancel Current Job", visible=True) else: # Keep current text and state - important to not override "Cancelling..." text button_update = gr.update(interactive=True, visible=True) # Check if we have progress data and if it's time to update current_time = time.time() update_needed = force_update or (current_time - last_progress_update_time > 0.05) # More frequent updates # Always check for progress data, even if we don't have a preview yet if job.progress_data and update_needed: preview = job.progress_data.get('preview') desc = job.progress_data.get('desc', '') html = job.progress_data.get('html', '') # Only update the preview if it has changed or we're forcing an update # Ensure all components get an update current_preview_value = job.progress_data.get('preview') if job.progress_data else None current_desc_value = job.progress_data.get('desc', 'Processing...') if job.progress_data else 'Processing...' current_html_value = job.progress_data.get('html', make_progress_bar_html(0, 'Processing...')) if job.progress_data else make_progress_bar_html(0, 'Processing...') if current_preview_value is not None and (current_preview_value is not last_preview or force_update): last_preview = current_preview_value # Always update if force_update is true, or if it's time for a periodic update if force_update or update_needed: last_progress_update_time = current_time force_update = False right_preview, top_preview = get_preview_updates(last_preview) yield job.result, right_preview, top_preview, current_desc_value, current_html_value, gr.update(interactive=True), button_update # Fallback for periodic update if no new progress data but job is still running elif current_time - last_progress_update_time > 0.5: # More frequent fallback update last_progress_update_time = current_time force_update = False # Reset force_update after a yield current_desc_value = job.progress_data.get('desc', 'Processing...') if job.progress_data else 'Processing...' current_html_value = job.progress_data.get('html', make_progress_bar_html(0, 'Processing...')) if job.progress_data else make_progress_bar_html(0, 'Processing...') right_preview, top_preview = get_preview_updates(last_preview) yield job.result, right_preview, top_preview, current_desc_value, current_html_value, gr.update(interactive=True), button_update elif job.status == JobStatus.COMPLETED: # Show the final video and reset the button text right_preview, top_preview = get_preview_updates(last_preview) yield job.result, right_preview, top_preview, 'Completed', make_progress_bar_html(100, 'Completed'), gr.update(value="🚀 Add to Queue"), gr.update(interactive=True, value="❌ Cancel Current Job", visible=False) break elif job.status == JobStatus.FAILED: # Show error and reset the button text right_preview, top_preview = get_preview_updates(last_preview) yield job.result, right_preview, top_preview, f'Error: {job.error}', make_progress_bar_html(0, 'Failed'), gr.update(value="🚀 Add to Queue"), gr.update(interactive=True, value="❌ Cancel Current Job", visible=False) break elif job.status == JobStatus.CANCELLED: # Show cancelled message and reset the button text right_preview, top_preview = get_preview_updates(last_preview) yield job.result, right_preview, top_preview, 'Job cancelled', make_progress_bar_html(0, 'Cancelled'), gr.update(interactive=True), gr.update(interactive=True, value="❌ Cancel Current Job", visible=False) break # Update last_job_status for the next iteration last_job_status = job.status # Wait a bit before checking again time.sleep(0.05) # Reduced wait time for more responsive updates # Set Gradio temporary directory from settings os.environ["GRADIO_TEMP_DIR"] = settings.get("gradio_temp_dir") # Create the interface interface = create_interface( process_fn=process, monitor_fn=monitor_job, end_process_fn=end_process, update_queue_status_fn=update_queue_status, load_lora_file_fn=load_lora_file, job_queue=job_queue, settings=settings, lora_names=lora_names # Explicitly pass the found LoRA names ) # Launch the interface interface.launch( server_name=args.server, server_port=args.port, share=args.share, inbrowser=args.inbrowser, allowed_paths=[settings.get("output_dir"), settings.get("metadata_dir")], )