Spaces:
Running
Running
from dataclasses import dataclass | |
import torch | |
from torch import Tensor, nn | |
import torch.utils.checkpoint as ckpt | |
from .layers import ( | |
DoubleStreamBlock, | |
EmbedND, | |
LastLayer, | |
SingleStreamBlock, | |
timestep_embedding, | |
Approximator, | |
distribute_modulations, | |
) | |
class ChromaParams: | |
in_channels: int | |
context_in_dim: int | |
hidden_size: int | |
mlp_ratio: float | |
num_heads: int | |
depth: int | |
depth_single_blocks: int | |
axes_dim: list[int] | |
theta: int | |
qkv_bias: bool | |
guidance_embed: bool | |
approximator_in_dim: int | |
approximator_depth: int | |
approximator_hidden_size: int | |
_use_compiled: bool | |
chroma_params = ChromaParams( | |
in_channels=64, | |
context_in_dim=4096, | |
hidden_size=3072, | |
mlp_ratio=4.0, | |
num_heads=24, | |
depth=19, | |
depth_single_blocks=38, | |
axes_dim=[16, 56, 56], | |
theta=10_000, | |
qkv_bias=True, | |
guidance_embed=True, | |
approximator_in_dim=64, | |
approximator_depth=5, | |
approximator_hidden_size=5120, | |
_use_compiled=False, | |
) | |
def modify_mask_to_attend_padding(mask, max_seq_length, num_extra_padding=8): | |
""" | |
Modifies attention mask to allow attention to a few extra padding tokens. | |
Args: | |
mask: Original attention mask (1 for tokens to attend to, 0 for masked tokens) | |
max_seq_length: Maximum sequence length of the model | |
num_extra_padding: Number of padding tokens to unmask | |
Returns: | |
Modified mask | |
""" | |
# Get the actual sequence length from the mask | |
seq_length = mask.sum(dim=-1) | |
batch_size = mask.shape[0] | |
modified_mask = mask.clone() | |
for i in range(batch_size): | |
current_seq_len = int(seq_length[i].item()) | |
# Only add extra padding tokens if there's room | |
if current_seq_len < max_seq_length: | |
# Calculate how many padding tokens we can unmask | |
available_padding = max_seq_length - current_seq_len | |
tokens_to_unmask = min(num_extra_padding, available_padding) | |
# Unmask the specified number of padding tokens right after the sequence | |
modified_mask[i, current_seq_len : current_seq_len + tokens_to_unmask] = 1 | |
return modified_mask | |
class Chroma(nn.Module): | |
""" | |
Transformer model for flow matching on sequences. | |
""" | |
def __init__(self, params: ChromaParams): | |
super().__init__() | |
self.params = params | |
self.in_channels = params.in_channels | |
self.out_channels = self.in_channels | |
if params.hidden_size % params.num_heads != 0: | |
raise ValueError( | |
f"Hidden size {params.hidden_size} must be divisible by num_heads {params.num_heads}" | |
) | |
pe_dim = params.hidden_size // params.num_heads | |
if sum(params.axes_dim) != pe_dim: | |
raise ValueError( | |
f"Got {params.axes_dim} but expected positional dim {pe_dim}" | |
) | |
self.hidden_size = params.hidden_size | |
self.num_heads = params.num_heads | |
self.pe_embedder = EmbedND( | |
dim=pe_dim, theta=params.theta, axes_dim=params.axes_dim | |
) | |
self.img_in = nn.Linear(self.in_channels, self.hidden_size, bias=True) | |
# TODO: need proper mapping for this approximator output! | |
# currently the mapping is hardcoded in distribute_modulations function | |
self.distilled_guidance_layer = Approximator( | |
params.approximator_in_dim, | |
self.hidden_size, | |
params.approximator_hidden_size, | |
params.approximator_depth, | |
) | |
self.txt_in = nn.Linear(params.context_in_dim, self.hidden_size) | |
self.double_blocks = nn.ModuleList( | |
[ | |
DoubleStreamBlock( | |
self.hidden_size, | |
self.num_heads, | |
mlp_ratio=params.mlp_ratio, | |
qkv_bias=params.qkv_bias, | |
use_compiled=params._use_compiled, | |
) | |
for _ in range(params.depth) | |
] | |
) | |
self.single_blocks = nn.ModuleList( | |
[ | |
SingleStreamBlock( | |
self.hidden_size, | |
self.num_heads, | |
mlp_ratio=params.mlp_ratio, | |
use_compiled=params._use_compiled, | |
) | |
for _ in range(params.depth_single_blocks) | |
] | |
) | |
self.final_layer = LastLayer( | |
self.hidden_size, | |
1, | |
self.out_channels, | |
use_compiled=params._use_compiled, | |
) | |
# TODO: move this hardcoded value to config | |
self.mod_index_length = 344 | |
# self.mod_index = torch.tensor(list(range(self.mod_index_length)), device=0) | |
self.register_buffer( | |
"mod_index", | |
torch.tensor(list(range(self.mod_index_length)), device="cpu"), | |
persistent=False, | |
) | |
def device(self): | |
# Get the device of the module (assumes all parameters are on the same device) | |
return next(self.parameters()).device | |
def forward( | |
self, | |
img: Tensor, | |
img_ids: Tensor, | |
txt: Tensor, | |
txt_ids: Tensor, | |
txt_mask: Tensor, | |
timesteps: Tensor, | |
guidance: Tensor, | |
attn_padding: int = 1, | |
) -> Tensor: | |
if img.ndim != 3 or txt.ndim != 3: | |
raise ValueError("Input img and txt tensors must have 3 dimensions.") | |
# running on sequences img | |
img = self.img_in(img) | |
txt = self.txt_in(txt) | |
# TODO: | |
# need to fix grad accumulation issue here for now it's in no grad mode | |
# besides, i don't want to wash out the PFP that's trained on this model weights anyway | |
# the fan out operation here is deleting the backward graph | |
# alternatively doing forward pass for every block manually is doable but slow | |
# custom backward probably be better | |
with torch.no_grad(): | |
distill_timestep = timestep_embedding(timesteps, 16) | |
# TODO: need to add toggle to omit this from schnell but that's not a priority | |
distil_guidance = timestep_embedding(guidance, 16) | |
# get all modulation index | |
modulation_index = timestep_embedding(self.mod_index, 32) | |
# we need to broadcast the modulation index here so each batch has all of the index | |
modulation_index = modulation_index.unsqueeze(0).repeat(img.shape[0], 1, 1) | |
# and we need to broadcast timestep and guidance along too | |
timestep_guidance = ( | |
torch.cat([distill_timestep, distil_guidance], dim=1) | |
.unsqueeze(1) | |
.repeat(1, self.mod_index_length, 1) | |
) | |
# then and only then we could concatenate it together | |
input_vec = torch.cat([timestep_guidance, modulation_index], dim=-1) | |
mod_vectors = self.distilled_guidance_layer(input_vec.requires_grad_(True)) | |
mod_vectors_dict = distribute_modulations(mod_vectors) | |
ids = torch.cat((txt_ids, img_ids), dim=1) | |
pe = self.pe_embedder(ids) | |
# compute mask | |
# assume max seq length from the batched input | |
max_len = txt.shape[1] | |
# mask | |
with torch.no_grad(): | |
txt_mask_w_padding = modify_mask_to_attend_padding( | |
txt_mask, max_len, attn_padding | |
) | |
txt_img_mask = torch.cat( | |
[ | |
txt_mask_w_padding, | |
torch.ones([img.shape[0], img.shape[1]], device=txt_mask.device), | |
], | |
dim=1, | |
) | |
txt_img_mask = txt_img_mask.float().T @ txt_img_mask.float() | |
txt_img_mask = ( | |
txt_img_mask[None, None, ...] | |
.repeat(txt.shape[0], self.num_heads, 1, 1) | |
.int() | |
.bool() | |
) | |
# txt_mask_w_padding[txt_mask_w_padding==False] = True | |
for i, block in enumerate(self.double_blocks): | |
# the guidance replaced by FFN output | |
img_mod = mod_vectors_dict[f"double_blocks.{i}.img_mod.lin"] | |
txt_mod = mod_vectors_dict[f"double_blocks.{i}.txt_mod.lin"] | |
double_mod = [img_mod, txt_mod] | |
# just in case in different GPU for simple pipeline parallel | |
if self.training: | |
img.requires_grad_(True) | |
img, txt = ckpt.checkpoint( | |
block, img, txt, pe, double_mod, txt_img_mask | |
) | |
else: | |
img, txt = block( | |
img=img, txt=txt, pe=pe, distill_vec=double_mod, mask=txt_img_mask | |
) | |
img = torch.cat((txt, img), 1) | |
for i, block in enumerate(self.single_blocks): | |
single_mod = mod_vectors_dict[f"single_blocks.{i}.modulation.lin"] | |
if self.training: | |
img.requires_grad_(True) | |
img = ckpt.checkpoint(block, img, pe, single_mod, txt_img_mask) | |
else: | |
img = block(img, pe=pe, distill_vec=single_mod, mask=txt_img_mask) | |
img = img[:, txt.shape[1] :, ...] | |
final_mod = mod_vectors_dict["final_layer.adaLN_modulation.1"] | |
img = self.final_layer( | |
img, distill_vec=final_mod | |
) # (N, T, patch_size ** 2 * out_channels) | |
return img | |