from dataclasses import dataclass import torch from torch import Tensor, nn import torch.utils.checkpoint as ckpt from .layers import ( DoubleStreamBlock, EmbedND, LastLayer, SingleStreamBlock, timestep_embedding, Approximator, distribute_modulations, ) @dataclass class ChromaParams: in_channels: int context_in_dim: int hidden_size: int mlp_ratio: float num_heads: int depth: int depth_single_blocks: int axes_dim: list[int] theta: int qkv_bias: bool guidance_embed: bool approximator_in_dim: int approximator_depth: int approximator_hidden_size: int _use_compiled: bool chroma_params = ChromaParams( in_channels=64, context_in_dim=4096, hidden_size=3072, mlp_ratio=4.0, num_heads=24, depth=19, depth_single_blocks=38, axes_dim=[16, 56, 56], theta=10_000, qkv_bias=True, guidance_embed=True, approximator_in_dim=64, approximator_depth=5, approximator_hidden_size=5120, _use_compiled=False, ) def modify_mask_to_attend_padding(mask, max_seq_length, num_extra_padding=8): """ Modifies attention mask to allow attention to a few extra padding tokens. Args: mask: Original attention mask (1 for tokens to attend to, 0 for masked tokens) max_seq_length: Maximum sequence length of the model num_extra_padding: Number of padding tokens to unmask Returns: Modified mask """ # Get the actual sequence length from the mask seq_length = mask.sum(dim=-1) batch_size = mask.shape[0] modified_mask = mask.clone() for i in range(batch_size): current_seq_len = int(seq_length[i].item()) # Only add extra padding tokens if there's room if current_seq_len < max_seq_length: # Calculate how many padding tokens we can unmask available_padding = max_seq_length - current_seq_len tokens_to_unmask = min(num_extra_padding, available_padding) # Unmask the specified number of padding tokens right after the sequence modified_mask[i, current_seq_len : current_seq_len + tokens_to_unmask] = 1 return modified_mask class Chroma(nn.Module): """ Transformer model for flow matching on sequences. """ def __init__(self, params: ChromaParams): super().__init__() self.params = params self.in_channels = params.in_channels self.out_channels = self.in_channels if params.hidden_size % params.num_heads != 0: raise ValueError( f"Hidden size {params.hidden_size} must be divisible by num_heads {params.num_heads}" ) pe_dim = params.hidden_size // params.num_heads if sum(params.axes_dim) != pe_dim: raise ValueError( f"Got {params.axes_dim} but expected positional dim {pe_dim}" ) self.hidden_size = params.hidden_size self.num_heads = params.num_heads self.pe_embedder = EmbedND( dim=pe_dim, theta=params.theta, axes_dim=params.axes_dim ) self.img_in = nn.Linear(self.in_channels, self.hidden_size, bias=True) # TODO: need proper mapping for this approximator output! # currently the mapping is hardcoded in distribute_modulations function self.distilled_guidance_layer = Approximator( params.approximator_in_dim, self.hidden_size, params.approximator_hidden_size, params.approximator_depth, ) self.txt_in = nn.Linear(params.context_in_dim, self.hidden_size) self.double_blocks = nn.ModuleList( [ DoubleStreamBlock( self.hidden_size, self.num_heads, mlp_ratio=params.mlp_ratio, qkv_bias=params.qkv_bias, use_compiled=params._use_compiled, ) for _ in range(params.depth) ] ) self.single_blocks = nn.ModuleList( [ SingleStreamBlock( self.hidden_size, self.num_heads, mlp_ratio=params.mlp_ratio, use_compiled=params._use_compiled, ) for _ in range(params.depth_single_blocks) ] ) self.final_layer = LastLayer( self.hidden_size, 1, self.out_channels, use_compiled=params._use_compiled, ) # TODO: move this hardcoded value to config self.mod_index_length = 344 # self.mod_index = torch.tensor(list(range(self.mod_index_length)), device=0) self.register_buffer( "mod_index", torch.tensor(list(range(self.mod_index_length)), device="cpu"), persistent=False, ) @property def device(self): # Get the device of the module (assumes all parameters are on the same device) return next(self.parameters()).device def forward( self, img: Tensor, img_ids: Tensor, txt: Tensor, txt_ids: Tensor, txt_mask: Tensor, timesteps: Tensor, guidance: Tensor, attn_padding: int = 1, ) -> Tensor: if img.ndim != 3 or txt.ndim != 3: raise ValueError("Input img and txt tensors must have 3 dimensions.") # running on sequences img img = self.img_in(img) txt = self.txt_in(txt) # TODO: # need to fix grad accumulation issue here for now it's in no grad mode # besides, i don't want to wash out the PFP that's trained on this model weights anyway # the fan out operation here is deleting the backward graph # alternatively doing forward pass for every block manually is doable but slow # custom backward probably be better with torch.no_grad(): distill_timestep = timestep_embedding(timesteps, 16) # TODO: need to add toggle to omit this from schnell but that's not a priority distil_guidance = timestep_embedding(guidance, 16) # get all modulation index modulation_index = timestep_embedding(self.mod_index, 32) # we need to broadcast the modulation index here so each batch has all of the index modulation_index = modulation_index.unsqueeze(0).repeat(img.shape[0], 1, 1) # and we need to broadcast timestep and guidance along too timestep_guidance = ( torch.cat([distill_timestep, distil_guidance], dim=1) .unsqueeze(1) .repeat(1, self.mod_index_length, 1) ) # then and only then we could concatenate it together input_vec = torch.cat([timestep_guidance, modulation_index], dim=-1) mod_vectors = self.distilled_guidance_layer(input_vec.requires_grad_(True)) mod_vectors_dict = distribute_modulations(mod_vectors) ids = torch.cat((txt_ids, img_ids), dim=1) pe = self.pe_embedder(ids) # compute mask # assume max seq length from the batched input max_len = txt.shape[1] # mask with torch.no_grad(): txt_mask_w_padding = modify_mask_to_attend_padding( txt_mask, max_len, attn_padding ) txt_img_mask = torch.cat( [ txt_mask_w_padding, torch.ones([img.shape[0], img.shape[1]], device=txt_mask.device), ], dim=1, ) txt_img_mask = txt_img_mask.float().T @ txt_img_mask.float() txt_img_mask = ( txt_img_mask[None, None, ...] .repeat(txt.shape[0], self.num_heads, 1, 1) .int() .bool() ) # txt_mask_w_padding[txt_mask_w_padding==False] = True for i, block in enumerate(self.double_blocks): # the guidance replaced by FFN output img_mod = mod_vectors_dict[f"double_blocks.{i}.img_mod.lin"] txt_mod = mod_vectors_dict[f"double_blocks.{i}.txt_mod.lin"] double_mod = [img_mod, txt_mod] # just in case in different GPU for simple pipeline parallel if self.training: img.requires_grad_(True) img, txt = ckpt.checkpoint( block, img, txt, pe, double_mod, txt_img_mask ) else: img, txt = block( img=img, txt=txt, pe=pe, distill_vec=double_mod, mask=txt_img_mask ) img = torch.cat((txt, img), 1) for i, block in enumerate(self.single_blocks): single_mod = mod_vectors_dict[f"single_blocks.{i}.modulation.lin"] if self.training: img.requires_grad_(True) img = ckpt.checkpoint(block, img, pe, single_mod, txt_img_mask) else: img = block(img, pe=pe, distill_vec=single_mod, mask=txt_img_mask) img = img[:, txt.shape[1] :, ...] final_mod = mod_vectors_dict["final_layer.adaLN_modulation.1"] img = self.final_layer( img, distill_vec=final_mod ) # (N, T, patch_size ** 2 * out_channels) return img