# demo/app.py
import gradio as gr
import torch
import time
import logging
import random
import numpy as np
from diffusers import StableDiffusionXLPipeline, EulerAncestralDiscreteScheduler
import threading
import queue
import spaces

# Import the component and ALL its utilities
from gradio_livelog import LiveLog
from gradio_livelog.utils import ProgressTracker, capture_logs

# --- 1. LOGIC FOR THE "LIVELOG FEATURE DEMO" TAB ---

# Configure logging for the demo
logging.basicConfig(level=logging.INFO, format='%(levelname)s: %(message)s')


def run_process(disable_console: bool, run_error_case: bool):
    """
    The generator function for the interactive feature demo.
    """
    with capture_logs(disable_console=disable_console) as get_logs:
        total_steps = 100
        tracker = ProgressTracker(total=total_steps, description="Simulating a process...")
        logging.info(f"Starting simulated process with {total_steps} steps...")
        for record in get_logs():
            yield {"type": "log", "level": record.levelname, "content": record.getMessage()}

        for i in range(total_steps):
            time.sleep(0.03)
            current_step = i + 1

            if current_step == 10:
                logging.warning(f"Low disk space warning at step {current_step}.")
            elif current_step == 30:
                logging.log(logging.INFO + 5, f"Asset pack loaded successfully at step {current_step}.")
            elif current_step == 75:
                logging.critical(f"Checksum mismatch! Data may be corrupt at step {current_step}.")

            if run_error_case and current_step == 50:
                logging.error("A fatal simulation error occurred! Aborting.")
                for record in get_logs():
                    yield {"type": "log", "level": record.levelname, "content": record.getMessage()}
                yield tracker.update(advance=0, status="error")
                return

            yield tracker.update()
            for record in get_logs():
                level_name = "SUCCESS" if record.levelno == logging.INFO + 5 else record.levelname
                yield {"type": "log", "level": level_name, "content": record.getMessage()}

        logging.log(logging.INFO + 5, "Process completed successfully!")
        for record in get_logs():
            level_name = "SUCCESS" if record.levelno == logging.INFO + 5 else record.levelname
            yield {"type": "log", "level": level_name, "content": record.getMessage()}
        yield tracker.update(advance=0, status="success")


def update_livelog_properties(mode, color, lines, scroll):
    return gr.update(display_mode=mode, background_color=color, line_numbers=lines, autoscroll=scroll)


def clear_output():
    return None


def run_success_case(disable_console: bool):
    yield from run_process(disable_console=disable_console, run_error_case=False)


def run_error_case(disable_console: bool):
    yield from run_process(disable_console=disable_console, run_error_case=True)


# --- 2. LOGIC FOR THE "Diffusion Pipeline Integration" TAB ---
MODEL_ID = "SG161222/RealVisXL_V5.0"
MAX_SEED = np.iinfo(np.int32).max

print("Loading Stable Diffusion model... This may take a moment.")
pipe = StableDiffusionXLPipeline.from_pretrained(
    MODEL_ID,
    torch_dtype=torch.float16,
    use_safetensors=True,
    add_watermarker=False
)
pipe.enable_vae_tiling()
pipe.enable_model_cpu_offload()
pipe.scheduler = EulerAncestralDiscreteScheduler.from_config(pipe.scheduler.config)
print("Model loaded successfully.")
pipe.set_progress_bar_config(disable=True)


@spaces.GPU(duration=60, enable_queue=True)
def run_diffusion_in_thread(prompt: str, update_queue: queue.Queue):
    """This function contains the blocking pipe() call and runs in a worker thread."""
    tracker = None
    try:
        seed = random.randint(0, MAX_SEED)
        generator = torch.Generator(device="cuda").manual_seed(seed)
        prompt_style = (
            f"hyper-realistic 8K image of {prompt}. "
            "ultra-detailed, lifelike, high-resolution, sharp, vibrant colors, photorealistic"
        )
        negative_prompt_style = "cartoonish, low resolution, blurry, simplistic, abstract, deformed, ugly"
        num_inference_steps = 20

        update_queue.put((None, {"type": "log", "level": "INFO", "content": f"Using seed: {seed}"}))
        update_queue.put((None, {"type": "log", "level": "INFO", "content": "Starting diffusion process..."}))

        tracker = ProgressTracker(total=num_inference_steps, description="Diffusion Steps")

        def progress_callback(pipe_instance, step, timestep, callback_kwargs):
            update_dict = tracker.update()
            update_queue.put((None, update_dict))
            return callback_kwargs

        images = pipe(
            prompt=prompt_style,
            negative_prompt=negative_prompt_style,
            width=1024,
            height=1024,
            guidance_scale=3.0,
            num_inference_steps=num_inference_steps,
            generator=generator,
            callback_on_step_end=progress_callback
        ).images

        update_queue.put((images, {"type": "log", "level": "SUCCESS", "content": "Image generated successfully!"}))
        final_update = tracker.update(advance=0, status="success")
        update_queue.put((images, final_update))
    except Exception as e:
        logging.error(f"Error in diffusion thread: {e}", exc_info=True)
        if tracker:
            error_update = tracker.update(advance=0, status="error")
            update_queue.put((None, error_update))
        update_queue.put((None, {"type": "log", "level": "ERROR", "content": f"An error occurred: {e}"}))
    finally:
        update_queue.put(None)


def generate(prompt):
    """This function starts the worker thread and yields updates from the queue."""
    yield None, {"type": "log", "level": "INFO", "content": "Preparing generation..."}

    update_queue = queue.Queue()
    diffusion_thread = threading.Thread(target=run_diffusion_in_thread, args=(prompt, update_queue))
    diffusion_thread.start()

    while True:
        update = update_queue.get()
        if update is None:
            break
        yield update


# --- 3. THE COMBINED GRADIO UI with TABS ---
with gr.Blocks(theme=gr.themes.Ocean()) as demo:
    gr.HTML("