# ltx_worker_base.py (GPU-C: cuda:2)
# Worker that generates the video fragments at base resolution.
# This file is part of the Euia-AducSdr project and is licensed under the AGPL v3.
# Copyright (C) August 4, 2025 Carlos Rodrigues dos Santos
import torch
import gc
import os
import yaml
import numpy as np
import imageio
from pathlib import Path
import huggingface_hub
from inference import (
    create_ltx_video_pipeline,
    ConditioningItem,
    calculate_padding,
    prepare_conditioning,
)
class LtxGenerator:
    def __init__(self, device_id='cuda:2'):
        print("BASE-CAMERA WORKER: Initializing...")
        self.device = torch.device(device_id if torch.cuda.is_available() else 'cpu')
        print(f"BASE-CAMERA WORKER: Using device: {self.device}")

        config_file_path = "configs/ltxv-13b-0.9.8-distilled.yaml"
        with open(config_file_path, "r") as file:
            self.config = yaml.safe_load(file)

        LTX_REPO = "Lightricks/LTX-Video"
        models_dir = "downloaded_models_gradio"
        Path(models_dir).mkdir(parents=True, exist_ok=True)

        print("BASE-CAMERA WORKER: Loading LTX pipeline on the CPU (idle state)...")
        distilled_model_actual_path = huggingface_hub.hf_hub_download(
            repo_id=LTX_REPO,
            filename=self.config["checkpoint_path"],
            local_dir=models_dir,
            local_dir_use_symlinks=False
        )
        self.pipeline = create_ltx_video_pipeline(
            ckpt_path=distilled_model_actual_path,
            precision=self.config["precision"],
            text_encoder_model_name_or_path=self.config["text_encoder_model_name_or_path"],
            sampler=self.config["sampler"],
            device='cpu'
        )
        print("BASE-CAMERA WORKER: Ready (on CPU).")
    def to_gpu(self):
        """Move the pipeline to the GPU before generation."""
        if self.pipeline and torch.cuda.is_available():
            print(f"BASE-CAMERA WORKER: Moving LTX to {self.device}...")
            self.pipeline.to(self.device)

    def to_cpu(self):
        """Offload the pipeline back to the CPU and free GPU memory."""
        if self.pipeline:
            print(f"BASE-CAMERA WORKER: Offloading LTX from GPU {self.device}...")
            self.pipeline.to('cpu')
            gc.collect()
            if torch.cuda.is_available():
                torch.cuda.empty_cache()
    def generate_video_fragment(
        self, motion_prompt: str, conditioning_items_data: list,
        width: int, height: int, seed: int, cfg: float, video_total_frames: int,
        video_fps: int, num_inference_steps: int, use_attention_slicing: bool,
        current_fragment_index: int, output_path: str, progress
    ):
        progress(0.1, desc=f"[LTX Base Camera] Filming scene {current_fragment_index}...")
        target_device = self.pipeline.device

        if use_attention_slicing:
            self.pipeline.enable_attention_slicing()

        # Unpack the conditioning tuples: (media_path, start_frame, strength).
        media_paths = [item[0] for item in conditioning_items_data]
        start_frames = [item[1] for item in conditioning_items_data]
        strengths = [item[2] for item in conditioning_items_data]

        # Pad the target resolution up to the nearest multiple of 32.
        padded_h, padded_w = ((height - 1) // 32 + 1) * 32, ((width - 1) // 32 + 1) * 32
        padding_vals = calculate_padding(height, width, padded_h, padded_w)

        conditioning_items = prepare_conditioning(
            conditioning_media_paths=media_paths, conditioning_strengths=strengths,
            conditioning_start_frames=start_frames, height=height, width=width,
            num_frames=video_total_frames, padding=padding_vals, pipeline=self.pipeline,
        )
        for item in conditioning_items:
            item.media_item = item.media_item.to(target_device)

        # The pipeline expects a frame count of the form 8n + 1; round to the closest valid value.
        actual_num_frames = int(round((float(video_total_frames) - 1.0) / 8.0) * 8 + 1)

        first_pass_config = self.config.get("first_pass", {}).copy()
        first_pass_config['num_inference_steps'] = int(num_inference_steps)

        kwargs = {
            "prompt": motion_prompt, "negative_prompt": "blurry, distorted, bad quality, artifacts",
            "height": padded_h, "width": padded_w, "num_frames": actual_num_frames,
            "frame_rate": video_fps,
            "generator": torch.Generator(device=target_device).manual_seed(int(seed) + current_fragment_index),
            "output_type": "pt", "guidance_scale": float(cfg),
            "timesteps": first_pass_config.get("timesteps"),
            "conditioning_items": conditioning_items,
            "decode_timestep": self.config.get("decode_timestep"),
            "decode_noise_scale": self.config.get("decode_noise_scale"),
            "stochastic_sampling": self.config.get("stochastic_sampling"),
            "image_cond_noise_scale": 0.15, "is_video": True, "vae_per_channel_normalize": True,
            "mixed_precision": (self.config.get("precision") == "mixed_precision"),
            "enhance_prompt": False, "decode_every": 4, "num_inference_steps": int(num_inference_steps)
        }

        result_tensor = self.pipeline(**kwargs).images

        # Crop the padding off and convert the tensor to uint8 frames (T, H, W, C).
        pad_l, pad_r, pad_t, pad_b = map(int, padding_vals)
        slice_h = -pad_b if pad_b > 0 else None
        slice_w = -pad_r if pad_r > 0 else None
        cropped_tensor = result_tensor[:, :, :actual_num_frames, pad_t:slice_h, pad_l:slice_w]
        video_np = (cropped_tensor[0].permute(1, 2, 3, 0).cpu().float().numpy() * 255).astype(np.uint8)

        with imageio.get_writer(output_path, fps=video_fps, codec='libx264', quality=8) as writer:
            for frame in video_np:
                writer.append_data(frame)

        if use_attention_slicing and self.pipeline:
            self.pipeline.disable_attention_slicing()

        return output_path, actual_num_frames
# --- Singleton instance for the base worker ---
ltx_base_singleton = LtxGenerator(device_id='cuda:2')
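
# A minimal usage sketch (not part of the original file): it assumes the
# orchestrator moves the worker to the GPU before generation and back to the
# CPU afterwards, and that conditioning_items_data is a list of
# (media_path, start_frame, strength) tuples, as unpacked above. The keyframe
# path, output path, parameter values, and the stand-in progress callback
# below are hypothetical placeholders.
if __name__ == "__main__":
    def _progress(fraction, desc=""):
        # Stand-in for the Gradio-style progress callback the worker expects.
        print(f"[{fraction:.0%}] {desc}")

    ltx_base_singleton.to_gpu()
    try:
        fragment_path, n_frames = ltx_base_singleton.generate_video_fragment(
            motion_prompt="slow dolly-in on the subject",
            conditioning_items_data=[("keyframe_000.png", 0, 1.0)],  # hypothetical keyframe
            width=704, height=480, seed=42, cfg=3.0,
            video_total_frames=121, video_fps=24,
            num_inference_steps=8, use_attention_slicing=True,
            current_fragment_index=1, output_path="fragment_001.mp4",
            progress=_progress,
        )
        print(f"Wrote {n_frames} frames to {fragment_path}")
    finally:
        ltx_base_singleton.to_cpu()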