# demo/app.py

import spaces
import gradio as gr
import torch
from diffusers import StableDiffusionXLPipeline, EulerAncestralDiscreteScheduler
import queue
import threading
import asyncio
import sys
import logging
import random
import numpy as np

# Import the component and ALL its utilities
from gradio_livelog import LiveLog
from gradio_livelog.utils import ProgressTracker, Tee, TqdmToQueueWriter, capture_logs

# --- 1. SETUP ---
MODEL_ID = "SG161222/RealVisXL_V5.0_Lightning"
MAX_SEED = np.iinfo(np.int32).max

# --- 2. LOGIC FOR THE "LIVELOG FEATURE DEMO" TAB ---
app_logger = logging.getLogger("logging_app")
app_logger.setLevel(logging.INFO)

# Avoid adding duplicate handlers if the script is reloaded
if not app_logger.handlers:
    console_handler = logging.StreamHandler()
    console_handler.flush = sys.stderr.flush
    app_logger.addHandler(console_handler)


async def run_process(disable_console: bool, rate_unit: str, run_error_case: bool):
    # You can watch more than one logger by listing several names in log_name.
    # If you do not pass log_name, the default logger is used.
    with capture_logs(log_level=logging.INFO, log_name=["logging_app"], disable_console=disable_console) as get_logs:
        total_steps = 100
        tracker = ProgressTracker(total=total_steps, description="Simulating a process...", rate_unit=rate_unit)
        all_logs = []
        last_log_content = None

        initial_log = f"Starting simulated process with {total_steps} steps..."
        app_logger.info(initial_log)
        logs = [
            {
                "type": "log",
                "level": "SUCCESS" if record.levelno == logging.INFO + 5 else record.levelname,
                "content": record.getMessage()
            }
            for record in get_logs()
        ]
        all_logs.extend(logs)
        last_log_content = logs[-1]["content"] if logs else None
        yield tracker.update(advance=0, status="running", logs=all_logs, log_content=None)

        for i in range(total_steps):
            await asyncio.sleep(0.03)
            current_step = i + 1

            if current_step == 10:
                app_logger.warning(f"Low disk space warning at step {current_step}.")
            elif current_step == 30:
                app_logger.log(logging.INFO + 5, f"Asset pack loaded successfully at step {current_step}.")
            elif current_step == 75:
                app_logger.critical(f"Checksum mismatch! Data may be corrupt at step {current_step}.")

            if run_error_case and current_step == 50:
                app_logger.error("A fatal simulation error occurred! Aborting.")
                logs = [
                    {
                        "type": "log",
                        "level": "SUCCESS" if record.levelno == logging.INFO + 5 else record.levelname,
                        "content": record.getMessage()
                    }
                    for record in get_logs()
                ]
                all_logs.extend(logs)
                last_log_content = logs[-1]["content"] if logs else last_log_content
                yield tracker.update(advance=0, status="error", logs=all_logs, log_content=last_log_content)
                return

            logs = [
                {
                    "type": "log",
                    "level": "SUCCESS" if record.levelno == logging.INFO + 5 else record.levelname,
                    "content": record.getMessage()
                }
                for record in get_logs()
            ]
            all_logs.extend(logs)
            if logs:
                last_log_content = logs[-1]["content"]
            yield tracker.update(advance=1, status="running", logs=all_logs, log_content=last_log_content)

        final_log = "Process completed successfully!"
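        # NOTE: logging.INFO + 5 is the custom "SUCCESS" level used throughout
        # this demo; the mapping above translates it to the "SUCCESS" string so
        # the LiveLog component can style it differently from plain INFO.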
        app_logger.log(logging.INFO + 5, final_log)
        logs = [
            {
                "type": "log",
                "level": "SUCCESS" if record.levelno == logging.INFO + 5 else record.levelname,
                "content": record.getMessage()
            }
            for record in get_logs()
        ]
        all_logs.extend(logs)
        last_log_content = logs[-1]["content"] if logs else last_log_content
        yield tracker.update(advance=0, status="success", logs=all_logs, log_content=last_log_content)


def update_livelog_properties(mode, color, lines, scroll):
    return gr.update(display_mode=mode, background_color=color, line_numbers=lines, autoscroll=scroll)


def clear_output():
    return None


async def run_success_case(disable_console: bool, rate_unit: str):
    yield None
    async for update in run_process(disable_console=disable_console, rate_unit=rate_unit, run_error_case=False):
        yield update


async def run_error_case(disable_console: bool, rate_unit: str):
    yield None
    async for update in run_process(disable_console=disable_console, rate_unit=rate_unit, run_error_case=True):
        yield update


# --- 3. LOGIC FOR THE "DIFFUSION PIPELINE INTEGRATION" TAB ---
diffusion_pipeline = None
pipeline_lock = threading.Lock()


def load_pipeline(on_load=True):
    """A function to load the model, ensuring it's only done once."""
    global diffusion_pipeline
    with pipeline_lock:
        if diffusion_pipeline is None:
            print("Loading Stable Diffusion model for the first time...")
            pipe = StableDiffusionXLPipeline.from_pretrained(
                MODEL_ID,
                torch_dtype=torch.float16,
                use_safetensors=True,
                add_watermarker=False,
                device_map="cuda"
            )
            pipe.enable_vae_tiling()
            # pipe.enable_model_cpu_offload()  # disable this on Hugging Face Spaces
            pipe.scheduler = EulerAncestralDiscreteScheduler.from_config(pipe.scheduler.config)
            diffusion_pipeline = pipe
            print("Model loaded successfully!")
    if not on_load:
        return diffusion_pipeline


@spaces.GPU(duration=60, enable_queue=True)
def run_diffusion_in_thread(prompt: str, disable_console: bool, update_queue: queue.Queue):
    """
    Runs the diffusion pipeline in a worker thread, using capture_logs to
    listen to internal log records while streaming structured updates
    (logs + progress + images) back to the UI through a queue.
    """
    tracker = None
    # As above, log_name may list several loggers; omit it to capture the default logger.
    with capture_logs(log_level=logging.INFO, log_name=["logging_app"], disable_console=disable_console) as get_logs:
        try:
            pipe = load_pipeline(on_load=False)

            # We will capture the pipeline's tqdm s/it progress instead.
            rate_queue = queue.Queue()
            tqdm_writer = TqdmToQueueWriter(rate_queue)
            progress_bar_handler = Tee(sys.stderr, tqdm_writer)
            pipe.set_progress_bar_config(
                file=progress_bar_handler,  # if you don't need to see the tqdm progress on the console, set file=tqdm_writer instead
                disable=False,
                ncols=100,
                dynamic_ncols=True,
                ascii=" █"
            )

            seed = random.randint(0, MAX_SEED)
            generator = torch.Generator(device="cuda").manual_seed(seed)
            prompt_style = f"hyper-realistic 8K image of {prompt}. ultra-detailed, lifelike, high-resolution, sharp, vibrant colors, photorealistic"
            negative_prompt_style = "cartoonish, low resolution, blurry, simplistic, abstract, deformed, ugly"
            num_inference_steps = 10

            all_logs = []
            last_known_rate_data = None

            # Helper function to process and store new logs
            def process_and_send_updates(status="running", advance=0, final_image_payload=None):
                """
                The core callback: it captures new log records, formats them,
                and sends a complete update object (logs + progress) to the UI
                queue. Call it after each log record so the UI stays in sync.
                """
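                # Drain the rate queue without blocking: tqdm may have pushed
                # several rate samples since the last call, and only the most
                # recent one needs to be displayed.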
""" nonlocal all_logs, last_known_rate_data new_rate_data = None while not rate_queue.empty(): try: new_rate_data = rate_queue.get_nowait() except queue.Empty: break if new_rate_data is not None: last_known_rate_data = new_rate_data new_records = get_logs() if new_records: new_logs = [{ "type": "log", "level": "SUCCESS" if r.levelno == logging.INFO + 5 else r.levelname, "content": r.getMessage() } for r in new_records] all_logs.extend(new_logs) # Use the tracker to generate the progress update dictionary if it exists. # If not, create a preliminary update dictionary. update_dict = {} if tracker: update_dict = tracker.update( advance=advance, status=status, logs=all_logs, rate_data=last_known_rate_data ) else: # Initial state before the tracker is created. update_dict = { "type": "progress", "logs": all_logs, "current": 0, "total": num_inference_steps, "desc": "Diffusion Steps" # Description is sent once } # Put the update on the queue. The image payload is usually None # until the very end. update_queue.put((final_image_payload, update_dict)) app_logger.info(f"Using seed: {seed}") process_and_send_updates() app_logger.info("Starting diffusion process...") process_and_send_updates() tracker = ProgressTracker(total=num_inference_steps, description="Diffusion Steps", rate_unit='it/s') def progress_callback(pipe_instance, step, timestep, callback_kwargs): process_and_send_updates(advance=1) return callback_kwargs images = pipe( prompt=prompt_style, negative_prompt=negative_prompt_style, width=1024, height=1024, guidance_scale=3.0, num_inference_steps=num_inference_steps, generator=generator, callback_on_step_end=progress_callback ).images app_logger.log(logging.INFO + 5, "Image generated successfully!") process_and_send_updates(status="success", final_image_payload=images) except Exception as e: app_logger.error(f"Error in diffusion thread: {e}, process aborted!", exc_info=True) process_and_send_updates(status="error") finally: update_queue.put(None) @spaces.GPU(duration=60, enable_queue=True) def generate(prompt): """This function starts the worker thread and yields updates from the queue.""" yield None, None, gr.update(interactive=False) update_queue = queue.Queue() diffusion_thread = threading.Thread(target=run_diffusion_in_thread, args=(prompt, False, update_queue)) diffusion_thread.start() final_images = None log_update = None while True: update = update_queue.get() if update is None: break images, log_update = update if images: final_images = images yield final_images, log_update, gr.skip() yield final_images, log_update, gr.update(interactive=True) # --- 4. THE COMBINED GRADIO UI with TABS --- with gr.Blocks(theme=gr.themes.Ocean()) as demo: gr.HTML("