# Wan2.2-Lora / app.py
# rahul7star's picture
# Update app.py
# c3e1669 verified
# PyTorch 2.8 (temporary hack): before torch is imported further down, shell out
# to pip and upgrade torch to a pre-release build (<2.9) from the PyTorch
# nightly cu126 index.
# NOTE(review): the return code of os.system is ignored, so a failed install
# goes unnoticed here — confirm that is intended.
import os
os.system('pip install --upgrade --pre --extra-index-url https://download.pytorch.org/whl/nightly/cu126 "torch<2.9"')
from huggingface_hub import HfApi, upload_file
import os
import uuid
import subprocess
import tempfile
import logging
import shutil
import os
from huggingface_hub import HfApi, upload_file
from datetime import datetime
import uuid
# Actual demo code
import spaces
import torch
from diffusers.pipelines.wan.pipeline_wan_i2v import WanImageToVideoPipeline
from diffusers.models.transformers.transformer_wan import WanTransformer3DModel
from diffusers.utils.export_utils import export_to_video
import gradio as gr
import tempfile
import numpy as np
from PIL import Image
import random
import gc
from optimization import optimize_pipeline_
from huggingface_hub import hf_hub_download
# Hub repo id of the base image-to-video pipeline passed to from_pretrained below.
MODEL_ID = "Wan-AI/Wan2.2-I2V-A14B-Diffusers"
# Hub repo that upscale_and_upload_4k uploads to; can be overridden through the
# HF_UPLOAD_REPO environment variable.
HF_MODEL = os.environ.get("HF_UPLOAD_REPO", "rahul7star/wan22lora-text-img-video-analysis")
from huggingface_hub import HfApi, upload_file
import os
import uuid
import os
import uuid
import logging
from datetime import datetime
def upscale_and_upload_4k(input_video_path: str, input_image, summary_text: str) -> str:
    """
    Upscale a video to 4K and upload it to Hugging Face Hub along with the input image and a text summary.

    The video is re-encoded with ffmpeg to 3840x2160 (lanczos scaling, libx264,
    CRF 18, "slow" preset). The upscaled video, the input image (stored as
    ``input_image.png``) and the summary (stored as ``summary.txt``) are uploaded
    to the ``HF_MODEL`` repo under a fresh
    ``<YYYY-MM-DD>-WAN-I2V/upload_<8 hex chars>`` folder. Every temporary file
    created here is removed before returning, whether the call succeeds or fails.

    Args:
        input_video_path (str): Path to the original video.
        input_image (PIL.Image.Image or path-like): Input image to upload alongside the video.
            A path is copied as-is (no format conversion); any other object is
            saved as PNG through its ``save`` method.
        summary_text (str): Text summary or prompt to upload alongside the video.

    Returns:
        str: Hugging Face folder path where the video, image, and summary were uploaded.

    Raises:
        subprocess.CalledProcessError: If ffmpeg exits with a non-zero status.
    """
    logging.info(f"Upscaling video to 4K for upload: {input_video_path}")

    hub_token = os.environ.get("HUGGINGFACE_HUB_TOKEN")
    temp_paths = []  # every temp file reserved below, removed in the finally block

    def _reserve_temp_path(suffix: str) -> str:
        # Create an empty named temp file, close its handle immediately and
        # remember the path so it is cleaned up even when a later step fails.
        with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as handle:
            temp_paths.append(handle.name)
            return handle.name

    def _upload(local_path: str, path_in_repo: str) -> None:
        # Single place for the upload settings shared by all three files.
        upload_file(
            path_or_fileobj=local_path,
            path_in_repo=path_in_repo,
            repo_id=HF_MODEL,
            repo_type="model",
            token=hub_token,
        )

    try:
        # --- Upscale video ---
        upscaled_path = _reserve_temp_path(".mp4")
        cmd = [
            "ffmpeg",
            "-i", input_video_path,
            "-vf", "scale=3840:2160:flags=lanczos",
            "-c:v", "libx264",
            "-crf", "18",
            "-preset", "slow",
            "-y",  # overwrite the (empty) reserved temp file
            upscaled_path,
        ]
        try:
            subprocess.run(cmd, check=True, capture_output=True)
        except subprocess.CalledProcessError as e:
            # ffmpeg output is not guaranteed to be valid UTF-8; never let the
            # logging of the failure raise a second, unrelated error.
            stderr_text = (e.stderr or b"").decode(errors="replace")
            logging.error(f"FFmpeg failed:\n{stderr_text}")
            raise
        logging.info(f"✅ Upscaled video created at: {upscaled_path}")

        # --- Create HF folder ---
        today_str = datetime.now().strftime("%Y-%m-%d")
        unique_subfolder = f"upload_{uuid.uuid4().hex[:8]}"
        hf_folder = f"{today_str}-WAN-I2V/{unique_subfolder}"

        # --- Upload video ---
        # The upscaled file is stored under the original video's file name.
        video_filename = os.path.basename(input_video_path)
        video_hf_path = f"{hf_folder}/{video_filename}"
        _upload(upscaled_path, video_hf_path)
        logging.info(f"✅ Uploaded 4K video to HF: {video_hf_path}")

        # --- Upload input image ---
        tmp_img_path = _reserve_temp_path(".png")
        if isinstance(input_image, (str, os.PathLike)):
            shutil.copy(input_image, tmp_img_path)
        else:
            input_image.save(tmp_img_path, format="PNG")
        image_hf_path = f"{hf_folder}/input_image.png"
        _upload(tmp_img_path, image_hf_path)
        logging.info(f"✅ Uploaded input image to HF: {image_hf_path}")

        # --- Upload summary text ---
        summary_file = _reserve_temp_path(".txt")
        with open(summary_file, "w", encoding="utf-8") as f:
            f.write(summary_text)
        summary_hf_path = f"{hf_folder}/summary.txt"
        _upload(summary_file, summary_hf_path)
        logging.info(f"✅ Uploaded summary to HF: {summary_hf_path}")

        return hf_folder
    finally:
        # --- Cleanup temporary files ---
        # Best effort: a file that cannot be removed must not mask the real
        # outcome (success or the original exception).
        for temp_path in temp_paths:
            try:
                os.remove(temp_path)
            except OSError as e:
                logging.warning(f"Could not remove temporary file {temp_path}: {e}")
# Hub repository that hosts every LoRA checkpoint referenced in LORA_SETS.
LORA_REPO_ID = "rahul7star/wan2.2Lora"

# Selectable LoRA sets, keyed by the name shown in the UI. Each set pairs a
# high-noise and a low-noise checkpoint file with the adapter name it is
# registered under when loaded.
LORA_SETS = {
    "NF": dict(
        high_noise=dict(file="DR34ML4Y_I2V_14B_HIGH.safetensors", adapter_name="nf_high"),
        low_noise=dict(file="DR34ML4Y_I2V_14B_LOW.safetensors", adapter_name="nf_low"),
    ),
    "BP": dict(
        high_noise=dict(file="Wan2.2_BP-v1-HighNoise-I2V_T2V.safetensors", adapter_name="bp_high"),
        low_noise=dict(file="Wan2.2_BP-v1-LowNoise-I2V_T2V.safetensors", adapter_name="bp_low"),
    ),
    "Py-v1": dict(
        high_noise=dict(file="wan2.2_i2v_highnoise_pov_missionary_v1.0.safetensors", adapter_name="py_high"),
        low_noise=dict(file="wan2.2_i2v_lownoise_pov_missionary_v1.0.safetensors", adapter_name="py_low"),
    ),
}

# Frame size (pixels) every input image is cropped and resized to.
LANDSCAPE_WIDTH, LANDSCAPE_HEIGHT = 832, 576

# Largest value offered by the seed slider / random seed draw.
MAX_SEED = np.iinfo(np.int32).max

# Output frame rate and the frame-count range requested from the pipeline.
FIXED_FPS = 16
MIN_FRAMES_MODEL, MAX_FRAMES_MODEL = 8, 81

# The same frame-count range expressed in seconds, rounded to one decimal.
MIN_DURATION = round(MIN_FRAMES_MODEL / FIXED_FPS, 1)
MAX_DURATION = round(MAX_FRAMES_MODEL / FIXED_FPS, 1)
# NOTE(review): this copy of the file lost its indentation; the loop extents
# below (which statements sit inside each `for`) are reconstructed — confirm
# against the deployed app.py.

# Build the image-to-video pipeline from MODEL_ID, supplying both transformers
# from the 'cbensimon/Wan2.2-I2V-A14B-bf16-Diffusers' repo in bfloat16 placed
# directly on CUDA, then move the whole pipeline to CUDA.
pipe = WanImageToVideoPipeline.from_pretrained(MODEL_ID,
    transformer=WanTransformer3DModel.from_pretrained('cbensimon/Wan2.2-I2V-A14B-bf16-Diffusers',
        subfolder='transformer',
        torch_dtype=torch.bfloat16,
        device_map='cuda',
    ),
    transformer_2=WanTransformer3DModel.from_pretrained('cbensimon/Wan2.2-I2V-A14B-bf16-Diffusers',
        subfolder='transformer_2',
        torch_dtype=torch.bfloat16,
        device_map='cuda',
    ),
    torch_dtype=torch.bfloat16,
).to('cuda')

# Run the project-local optimization helper once, with a blank image at the
# fixed landscape size and the maximum frame count as example inputs.
# NOTE(review): what optimize_pipeline_ does internally lives in optimization.py
# and is not visible here.
optimize_pipeline_(pipe,
    image=Image.new('RGB', (LANDSCAPE_WIDTH, LANDSCAPE_HEIGHT)),
    prompt='prompt',
    height=LANDSCAPE_HEIGHT,
    width=LANDSCAPE_WIDTH,
    num_frames=MAX_FRAMES_MODEL,
)

# Register every LoRA checkpoint from LORA_SETS on the pipeline under its
# adapter name, so generate_video can later select adapters by name.
for name, lora_set in LORA_SETS.items():
    print(f"---LoRA 集合: {name} ---")
    # Load the high-noise checkpoint of this set
    high_noise_config = lora_set["high_noise"]
    print(f"High Noise: {high_noise_config['file']}...")
    pipe.load_lora_weights(LORA_REPO_ID, weight_name=high_noise_config['file'], adapter_name=high_noise_config['adapter_name'])
    print("High Noise LoRA 加载完成。")
    # Load the low-noise checkpoint of this set
    low_noise_config = lora_set["low_noise"]
    print(f" Low Noise: {low_noise_config['file']}...")
    pipe.load_lora_weights(LORA_REPO_ID, weight_name=low_noise_config['file'], adapter_name=low_noise_config['adapter_name'])
    print("Low Noise LoRA ")
# Presumably a once-only "all done" message whose text was truncated.
print("。")

# Run garbage collection and release cached CUDA memory, three rounds in a row.
for i in range(3):
    gc.collect()
    torch.cuda.synchronize()
    torch.cuda.empty_cache()
# Prompt pre-filled in the UI's "Prompt" textbox.
default_prompt_i2v = "make this image come alive, cinematic motion, smooth animation"
# Negative prompt (written in Chinese) used as generate_video's default and
# pre-filled in the "Negative Prompt" textbox. It is a comma-separated list of
# unwanted traits (e.g. overexposure, static frames, low quality, extra fingers).
default_negative_prompt = "色调艳丽, 过曝, 静态, 细节模糊不清, 字幕, 风格, 作品, 画作, 画面, 静止, 整体发灰, 最差质量, 低质量, JPEG压缩残留, 丑陋的, 残缺的, 多余的手指, 画得不好的手部, 画得不好的脸部, 畸形的, 毁容的, 形态畸形的肢体, 手指融合, 静止不动的画面, 杂乱的背景, 三条腿, 背景人很多, 倒着走"
def resize_image(image: Image.Image) -> Image.Image:
    """Fit *image* to the model's frame size while keeping its orientation.

    Landscape and square inputs go straight through ``resize_image_landscape``.
    A portrait input (height greater than width) is rotated by 90 degrees,
    resized as a landscape image, then rotated by 270 degrees to restore its
    original orientation.
    """
    is_portrait = image.height > image.width
    if not is_portrait:
        return resize_image_landscape(image)

    rotation = Image.Transpose
    sideways = image.transpose(rotation.ROTATE_90)
    return resize_image_landscape(sideways).transpose(rotation.ROTATE_270)
def resize_image_landscape(image: Image.Image) -> Image.Image:
    """Center-crop *image* to the target aspect ratio, then resize it.

    The target is ``LANDSCAPE_WIDTH`` x ``LANDSCAPE_HEIGHT``. An input wider
    than the target ratio loses equal strips on the left and right; any other
    input loses equal strips on the top and bottom. The crop is then resampled
    to the target size with the LANCZOS filter.
    """
    wanted_ratio = LANDSCAPE_WIDTH / LANDSCAPE_HEIGHT
    src_w, src_h = image.size

    if src_w / src_h > wanted_ratio:
        # Too wide: keep the full height and trim the sides.
        crop_w = round(src_h * wanted_ratio)
        x0 = (src_w - crop_w) // 2
        box = (x0, 0, x0 + crop_w, src_h)
    else:
        # Too tall (or exact ratio): keep the full width and trim top/bottom.
        crop_h = round(src_w / wanted_ratio)
        y0 = (src_h - crop_h) // 2
        box = (0, y0, src_w, y0 + crop_h)

    cropped = image.crop(box)
    return cropped.resize((LANDSCAPE_WIDTH, LANDSCAPE_HEIGHT), Image.LANCZOS)
def get_duration(
    input_image,
    prompt,
    steps,
    negative_prompt,
    duration_seconds,
    guidance_scale,
    guidance_scale_2,
    seed,
    randomize_seed,
    selected_loras,
    progress,
):
    """Return the GPU duration budget for one ``generate_video`` call.

    The signature mirrors ``generate_video`` because this function is handed to
    ``spaces.GPU(duration=...)``; only ``steps`` influences the result, which
    is 15 per inference step (presumably seconds — confirm against the
    ``spaces`` documentation).
    """
    budget_per_step = 15
    step_count = int(steps)
    return step_count * budget_per_step
class _LoraSwitcher:
    """Step-end callback that swaps LoRA adapters part-way through denoising.

    At step index 0 the high-noise adapters of the selected LoRA sets are
    activated (or, when nothing is selected, any still-active adapters are
    given weight 0). From ``switch_step`` onwards the low-noise adapters are
    activated instead, once per run.

    NOTE(review): the pipeline invokes this at the *end* of each step, so every
    change made here first applies to the following step — confirm that this
    timing is what is wanted.
    NOTE(review): ``fuse_lora()`` is called on every activation and nothing
    here ever calls ``unfuse_lora()``; fused weights may therefore persist
    across the switch and across requests — confirm.
    """

    def __init__(self, selected_lora_names, switch_step):
        self.switch_step = switch_step
        self.switched = False
        self.high_noise_adapters = []
        self.low_noise_adapters = []
        # Names that are not keys of LORA_SETS are silently ignored.
        for name in selected_lora_names or ():
            if name in LORA_SETS:
                self.high_noise_adapters.append(LORA_SETS[name]["high_noise"]["adapter_name"])
                self.low_noise_adapters.append(LORA_SETS[name]["low_noise"]["adapter_name"])

    def __call__(self, pipe, step_index, timestep, callback_kwargs):
        # Reset the LoRA state at the start of every run.
        if step_index == 0:
            self.switched = False
            if self.high_noise_adapters:
                # LoRAs were selected: activate their high-noise versions.
                print(f"激活 High Noise LoRA: {self.high_noise_adapters}")
                pipe.set_adapters(self.high_noise_adapters, adapter_weights=[1.0] * len(self.high_noise_adapters))
                # Also fuse them; fusing is best effort, a failure is only reported.
                try:
                    print(f"Fuse High Noise LoRA: {self.high_noise_adapters}")
                    pipe.fuse_lora()
                except Exception as e:
                    print(f"Fuse High Noise LoRA 失败: {e}")
            else:
                # No LoRA selected: neutralise adapters left active by an
                # earlier run by setting their weights to 0.
                active_adapters = pipe.get_active_adapters()
                if active_adapters:
                    print(f"未选择 LoRA,通过设置权重为0来禁用残留的 LoRA: {active_adapters}")
                    pipe.set_adapters(active_adapters, adapter_weights=[0.0] * len(active_adapters))

        # Switch to the low-noise adapters once (only when LoRAs were selected).
        if self.low_noise_adapters and step_index >= self.switch_step and not self.switched:
            print(f"在第 {step_index} 步切换到 Low Noise LoRA: {self.low_noise_adapters}")
            pipe.set_adapters(self.low_noise_adapters, adapter_weights=[1.0] * len(self.low_noise_adapters))
            try:
                print(f"Fuse Low Noise LoRA: {self.low_noise_adapters}")
                pipe.fuse_lora()
            except Exception as e:
                print(f"Fuse Low Noise LoRA 失败: {e}")
            self.switched = True

        return callback_kwargs


@spaces.GPU(duration=get_duration)
def generate_video(
    input_image,
    prompt,
    steps=4,
    negative_prompt=default_negative_prompt,
    duration_seconds=MAX_DURATION,
    guidance_scale=1,
    guidance_scale_2=1,
    seed=42,
    randomize_seed=False,
    selected_loras=None,
    progress=gr.Progress(track_tqdm=True),
):
    """Generate a video from an input image and a prompt.

    Args:
        input_image: Source image (a PIL image, as delivered by the UI).
        prompt: Text prompt passed to the pipeline.
        steps: Number of inference steps; LoRAs switch from the high-noise to
            the low-noise adapters at ``steps // 2``.
        negative_prompt: Negative prompt passed to the pipeline.
        duration_seconds: Requested clip length; converted to frames at
            ``FIXED_FPS`` and clamped to
            ``[MIN_FRAMES_MODEL, MAX_FRAMES_MODEL]``.
        guidance_scale: Guidance scale passed to the pipeline.
        guidance_scale_2: Second guidance scale passed to the pipeline.
        seed: Seed used when ``randomize_seed`` is false.
        randomize_seed: Draw a random seed in ``[0, MAX_SEED]`` instead.
        selected_loras: Names of the ``LORA_SETS`` entries to apply; ``None``
            or an empty list applies none.
        progress: Gradio progress tracker (tracks tqdm progress bars).

    Returns:
        tuple: ``(video_path, current_seed)`` — path of the exported ``.mp4``
        file and the seed that was actually used.

    Raises:
        gr.Error: If no input image was provided.
    """
    if input_image is None:
        raise gr.Error("Please upload an input image.")

    print("potmpt is ")
    print(prompt)

    # np.clip returns a NumPy scalar; hand the pipeline a plain Python int.
    num_frames = int(np.clip(int(round(duration_seconds * FIXED_FPS)), MIN_FRAMES_MODEL, MAX_FRAMES_MODEL))
    current_seed = random.randint(0, MAX_SEED) if randomize_seed else int(seed)
    resized_image = resize_image(input_image)

    num_inference_steps = int(steps)
    switch_step = num_inference_steps // 2
    lora_switcher_callback = _LoraSwitcher(selected_loras, switch_step)

    output_frames_list = pipe(
        image=resized_image,
        prompt=prompt,
        negative_prompt=negative_prompt,
        height=resized_image.height,
        width=resized_image.width,
        num_frames=num_frames,
        guidance_scale=float(guidance_scale),
        guidance_scale_2=float(guidance_scale_2),
        num_inference_steps=num_inference_steps,
        generator=torch.Generator(device="cuda").manual_seed(current_seed),
        callback_on_step_end=lora_switcher_callback,
    ).frames[0]

    # Reserve a persistent temp path; the file is handed to the UI, so it is
    # deliberately not deleted here.
    with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as tmpfile:
        video_path = tmpfile.name
    export_to_video(output_frames_list, video_path, fps=FIXED_FPS)

    # Upscale-and-upload step, currently disabled:
    #upscale_and_upload_4k(video_path, input_image, prompt)
    return video_path, current_seed
# Gradio UI: a two-column layout with the inputs on the left and the generated
# video on the right.
# NOTE(review): container nesting was reconstructed because this copy of the
# file lost its indentation — confirm against the deployed app.py.
with gr.Blocks() as demo:
    gr.Markdown("# Fast 4 steps Wan 2.2 I2V (14B) with Lightning LoRA")
    gr.Markdown("run Wan 2.2 in just 4-8 steps, with [Lightning LoRA](https://huggingface.co/Kijai/WanVideo_comfy/tree/main/Wan22-Lightning), fp8 quantization & AoT compilation - compatible with 🧨 diffusers and ZeroGPU⚡️")
    with gr.Row(): # ensures columns align in height
        # Left column: every input control plus the generate button.
        with gr.Column():
            input_image_component = gr.Image(
                type="pil",
                label="Input Image (auto-resized to target H/W)",
                interactive=True,
                elem_classes=["flex-image"]
            )
            prompt_input = gr.Textbox(label="Prompt", value=default_prompt_i2v)
            duration_seconds_input = gr.Slider(
                minimum=MIN_DURATION,
                maximum=MAX_DURATION,
                step=0.1,
                value=3.5,
                label="Duration (seconds)",
                info=f"Clamped to model's {MIN_FRAMES_MODEL}-{MAX_FRAMES_MODEL} frames at {FIXED_FPS}fps."
            )
            # One checkbox per LORA_SETS key; several may be ticked at once.
            lora_selection_checkbox = gr.CheckboxGroup(
                choices=list(LORA_SETS.keys()),
                label="选择要应用的 LoRA (可多选)",
                info="选择一个或多个 LoRA 风格进行组合。"
            )
            with gr.Accordion("Advanced Settings", open=False):
                negative_prompt_input = gr.Textbox(label="Negative Prompt", value=default_negative_prompt, lines=3)
                seed_input = gr.Slider(label="Seed", minimum=0, maximum=MAX_SEED, step=1, value=42, interactive=True)
                randomize_seed_checkbox = gr.Checkbox(label="Randomize seed", value=True, interactive=True)
                steps_slider = gr.Slider(minimum=1, maximum=30, step=1, value=6, label="Inference Steps")
                guidance_scale_input = gr.Slider(minimum=0.0, maximum=10.0, step=0.5, value=1, label="Guidance Scale - high noise stage")
                guidance_scale_2_input = gr.Slider(minimum=0.0, maximum=10.0, step=0.5, value=1, label="Guidance Scale 2 - low noise stage")
            generate_button = gr.Button("Generate Video", variant="primary")
        # Right column: the generated video.
        with gr.Column():
            video_output = gr.Video(label="Generated Video", autoplay=True, interactive=False, elem_classes=["stretch-video"])
    # Order must match generate_video's positional parameters (progress excluded).
    ui_inputs = [
        input_image_component, prompt_input, steps_slider,
        negative_prompt_input, duration_seconds_input,
        guidance_scale_input, guidance_scale_2_input, seed_input, randomize_seed_checkbox,
        lora_selection_checkbox
    ]
    # The returned seed is written back into the seed slider.
    generate_button.click(fn=generate_video, inputs=ui_inputs, outputs=[video_output, seed_input])
# Script entry point: enable Gradio's request queue and start the app.
if __name__ == "__main__":
    demo.queue().launch()