# MIT License # Copyright (c) Microsoft # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: # The above copyright notice and this permission notice shall be included in all # copies or substantial portions of the Software. # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. # Copyright (c) [2025] [Microsoft] # Copyright (c) [2025] [Chongjie Ye] # SPDX-License-Identifier: MIT # This file has been modified by Chongjie Ye on 2025/04/10 # Original file was released under MIT, with the full license text # available at https://github.com/atong01/conditional-flow-matching/blob/1.0.7/LICENSE. # This modified file is released under the same license. from typing import * import torch import torch.nn as nn import torch.nn.functional as F from ...modules import sparse as sp from .base import SparseTransformerBase class SLatEncoder(SparseTransformerBase): def __init__( self, resolution: int, in_channels: int, model_channels: int, latent_channels: int, num_blocks: int, num_heads: Optional[int] = None, num_head_channels: Optional[int] = 64, mlp_ratio: float = 4, attn_mode: Literal["full", "shift_window", "shift_sequence", "shift_order", "swin"] = "swin", window_size: int = 8, pe_mode: Literal["ape", "rope"] = "ape", use_fp16: bool = False, use_checkpoint: bool = False, qk_rms_norm: bool = False, ): super().__init__( in_channels=in_channels, model_channels=model_channels, num_blocks=num_blocks, num_heads=num_heads, num_head_channels=num_head_channels, mlp_ratio=mlp_ratio, attn_mode=attn_mode, window_size=window_size, pe_mode=pe_mode, use_fp16=use_fp16, use_checkpoint=use_checkpoint, qk_rms_norm=qk_rms_norm, ) self.resolution = resolution self.out_layer = sp.SparseLinear(model_channels, 2 * latent_channels) self.initialize_weights() if use_fp16: self.convert_to_fp16() def initialize_weights(self) -> None: super().initialize_weights() # Zero-out output layers: nn.init.constant_(self.out_layer.weight, 0) nn.init.constant_(self.out_layer.bias, 0) def forward(self, x: sp.SparseTensor, sample_posterior=True, return_raw=False): h = super().forward(x) h = h.type(x.dtype) h = h.replace(F.layer_norm(h.feats, h.feats.shape[-1:])) h = self.out_layer(h) # Sample from the posterior distribution mean, logvar = h.feats.chunk(2, dim=-1) if sample_posterior: std = torch.exp(0.5 * logvar) z = mean + std * torch.randn_like(std) else: z = mean z = h.replace(z) if return_raw: return z, mean, logvar else: return z