| import torch |
| import torch.nn as nn |
|
|
| from einops import rearrange |
| from torchtune.modules import RotaryPositionalEmbeddings |
| from vector_quantize_pytorch import ResidualFSQ |
|
|
| from huggingface_hub import PyTorchModelHubMixin, hf_hub_download |
|
|
| |
| |
|
|
| class RMSNorm(torch.nn.Module): |
| def __init__(self, dim: int, eps: float = 1e-6): |
| r"""https://github.com/meta-llama/llama/blob/main/llama/model.py""" |
| super().__init__() |
| self.eps = eps |
| self.weight = nn.Parameter(torch.ones(dim)) |
|
|
| def forward(self, x): |
| norm_x = torch.mean(x**2, dim=-1, keepdim=True) |
| output = x * torch.rsqrt(norm_x + self.eps) * self.weight |
| return output |
|
|
|
|
| class MLP(nn.Module): |
| def __init__(self, dim: int) -> None: |
| super().__init__() |
|
|
| self.fc1 = nn.Linear(dim, 4 * dim, bias=False) |
| self.silu = nn.SiLU() |
| self.fc2 = nn.Linear(4 * dim, dim, bias=False) |
|
|
| def forward(self, x): |
| x = self.fc1(x) |
| x = self.silu(x) |
| x = self.fc2(x) |
| return x |
|
|
|
|
| class Attention(nn.Module): |
| def __init__( |
| self, dim: int, n_heads: int, rotary_embed: RotaryPositionalEmbeddings |
| ): |
| super().__init__() |
|
|
| assert dim % n_heads == 0 |
|
|
| self.n_heads = n_heads |
| self.dim = dim |
| self.rotary_embed = rotary_embed |
|
|
| self.flash = hasattr(torch.nn.functional, "scaled_dot_product_attention") |
| assert self.flash, "Must have flash attention." |
|
|
| self.c_attn = nn.Linear(dim, 3 * dim, bias=False) |
| self.c_proj = nn.Linear(dim, dim, bias=False) |
|
|
| def forward(self, x): |
| r""" |
| Args: |
| x: (b, t, h*d) |
| |
| Constants: |
| b: batch_size |
| t: time steps |
| r: 3 |
| h: heads_num |
| d: heads_dim |
| """ |
| B, T, C = x.size() |
|
|
| q, k, v = rearrange( |
| self.c_attn(x), "b t (r h d) -> r b h t d", r=3, h=self.n_heads |
| ) |
| |
|
|
| q = self.rotary_embed(q) |
| k = self.rotary_embed(k) |
|
|
| if self.flash: |
| y = torch.nn.functional.scaled_dot_product_attention( |
| q, k, v, attn_mask=None, dropout_p=0, is_causal=False |
| ) |
|
|
| y = rearrange(y, "b h t d -> b t (h d)") |
|
|
| y = self.c_proj(y) |
| |
|
|
| return y |
|
|
|
|
| class TransformerBlock(nn.Module): |
| def __init__( |
| self, dim: int, n_heads: int, rotary_embed: RotaryPositionalEmbeddings |
| ): |
| super().__init__() |
| self.dim = dim |
| self.n_heads = n_heads |
|
|
| self.att_norm = RMSNorm(dim) |
| self.ffn_norm = RMSNorm(dim) |
| self.att = Attention(dim=dim, n_heads=n_heads, rotary_embed=rotary_embed) |
| self.mlp = MLP(dim=dim) |
|
|
| def forward( |
| self, |
| x: torch.Tensor, |
| ): |
| x = x + self.att(self.att_norm(x)) |
| x = x + self.mlp(self.ffn_norm(x)) |
| return x |
|
|
| class ISTFT(nn.Module): |
| """ |
| Custom implementation of ISTFT since torch.istft doesn't allow custom padding (other than `center=True`) with |
| windowing. This is because the NOLA (Nonzero Overlap Add) check fails at the edges. |
| See issue: https://github.com/pytorch/pytorch/issues/62323 |
| Specifically, in the context of neural vocoding we are interested in "same" padding analogous to CNNs. |
| The NOLA constraint is met as we trim padded samples anyway. |
| |
| Args: |
| n_fft (int): Size of Fourier transform. |
| hop_length (int): The distance between neighboring sliding window frames. |
| win_length (int): The size of window frame and STFT filter. |
| padding (str, optional): Type of padding. Options are "center" or "same". Defaults to "same". |
| """ |
|
|
| def __init__( |
| self, n_fft: int, hop_length: int, win_length: int, padding: str = "same" |
| ): |
| super().__init__() |
| if padding not in ["center", "same"]: |
| raise ValueError("Padding must be 'center' or 'same'.") |
| self.padding = padding |
| self.n_fft = n_fft |
| self.hop_length = hop_length |
| self.win_length = win_length |
| window = torch.hann_window(win_length) |
| self.register_buffer("window", window, persistent=False) |
|
|
| def forward(self, spec: torch.Tensor) -> torch.Tensor: |
| """ |
| Compute the Inverse Short Time Fourier Transform (ISTFT) of a complex spectrogram. |
| |
| Args: |
| spec (Tensor): Input complex spectrogram of shape (B, N, T), where B is the batch size, |
| N is the number of frequency bins, and T is the number of time frames. |
| |
| Returns: |
| Tensor: Reconstructed time-domain signal of shape (B, L), where L is the length of the output signal. |
| """ |
| if self.padding == "center": |
| |
| return torch.istft( |
| spec, |
| self.n_fft, |
| self.hop_length, |
| self.win_length, |
| self.window, |
| center=True, |
| ) |
| elif self.padding == "same": |
| pad = (self.win_length - self.hop_length) // 2 |
| else: |
| raise ValueError("Padding must be 'center' or 'same'.") |
|
|
| assert spec.dim() == 3, "Expected a 3D tensor as input" |
| B, N, T = spec.shape |
|
|
| |
| ifft = torch.fft.irfft(spec, self.n_fft, dim=1, norm="backward") |
| ifft = ifft * self.window[None, :, None] |
|
|
| |
| output_size = (T - 1) * self.hop_length + self.win_length |
| y = torch.nn.functional.fold( |
| ifft, |
| output_size=(1, output_size), |
| kernel_size=(1, self.win_length), |
| stride=(1, self.hop_length), |
| )[:, 0, 0, pad:-pad] |
|
|
| |
| window_sq = self.window.square().expand(1, T, -1).transpose(1, 2) |
| window_envelope = torch.nn.functional.fold( |
| window_sq, |
| output_size=(1, output_size), |
| kernel_size=(1, self.win_length), |
| stride=(1, self.hop_length), |
| ).squeeze()[pad:-pad] |
|
|
| |
| assert (window_envelope > 1e-11).all() |
| y = y / window_envelope |
|
|
| return y |
|
|
|
|
| class FourierHead(nn.Module): |
| """Base class for inverse fourier modules.""" |
|
|
| def forward(self, x: torch.Tensor) -> torch.Tensor: |
| """ |
| Args: |
| x (Tensor): Input tensor of shape (B, L, H), where B is the batch size, |
| L is the sequence length, and H denotes the model dimension. |
| |
| Returns: |
| Tensor: Reconstructed time-domain audio signal of shape (B, T), where T is the length of the output signal. |
| """ |
| raise NotImplementedError("Subclasses must implement the forward method.") |
|
|
|
|
| class ISTFTHead(FourierHead): |
| """ |
| ISTFT Head module for predicting STFT complex coefficients. |
| |
| Args: |
| dim (int): Hidden dimension of the model. |
| n_fft (int): Size of Fourier transform. |
| hop_length (int): The distance between neighboring sliding window frames, which should align with |
| the resolution of the input features. |
| padding (str, optional): Type of padding. Options are "center" or "same". Defaults to "same". |
| """ |
|
|
| def __init__(self, dim: int, n_fft: int, hop_length: int, padding: str = "same"): |
| super().__init__() |
| out_dim = n_fft + 2 |
| self.out = torch.nn.Linear(dim, out_dim) |
| self.istft = ISTFT( |
| n_fft=n_fft, hop_length=hop_length, win_length=n_fft, padding=padding |
| ) |
|
|
| def forward(self, x: torch.Tensor) -> torch.Tensor: |
| """ |
| Forward pass of the ISTFTHead module. |
| |
| Args: |
| x (Tensor): Input tensor of shape (B, L, H), where B is the batch size, |
| L is the sequence length, and H denotes the model dimension. |
| |
| Returns: |
| Tensor: Reconstructed time-domain audio signal of shape (B, T), where T is the length of the output signal. |
| """ |
| x_pred = self.out(x) |
| |
| x_pred = x_pred.transpose(1, 2) |
| mag, p = x_pred.chunk(2, dim=1) |
| mag = torch.exp(mag) |
| mag = torch.clip( |
| mag, max=1e2 |
| ) |
| |
| x = torch.cos(p) |
| y = torch.sin(p) |
| |
| |
| |
| |
| |
| S = mag * (x + 1j * y) |
| audio = self.istft(S) |
| return audio.unsqueeze(1), x_pred |
|
|
|
|
| def nonlinearity(x): |
| |
| return x * torch.sigmoid(x) |
|
|
|
|
| def Normalize(in_channels, num_groups=32): |
| return torch.nn.GroupNorm( |
| num_groups=num_groups, num_channels=in_channels, eps=1e-6, affine=True |
| ) |
|
|
|
|
| class ResnetBlock(nn.Module): |
| def __init__( |
| self, |
| *, |
| in_channels, |
| out_channels=None, |
| conv_shortcut=False, |
| dropout, |
| temb_channels=512, |
| ): |
| super().__init__() |
| self.in_channels = in_channels |
| out_channels = in_channels if out_channels is None else out_channels |
| self.out_channels = out_channels |
| self.use_conv_shortcut = conv_shortcut |
|
|
| self.norm1 = Normalize(in_channels) |
| self.conv1 = torch.nn.Conv1d( |
| in_channels, out_channels, kernel_size=3, stride=1, padding=1 |
| ) |
| if temb_channels > 0: |
| self.temb_proj = torch.nn.Linear(temb_channels, out_channels) |
| self.norm2 = Normalize(out_channels) |
| self.dropout = torch.nn.Dropout(dropout) |
| self.conv2 = torch.nn.Conv1d( |
| out_channels, out_channels, kernel_size=3, stride=1, padding=1 |
| ) |
| if self.in_channels != self.out_channels: |
| if self.use_conv_shortcut: |
| self.conv_shortcut = torch.nn.Conv1d( |
| in_channels, out_channels, kernel_size=3, stride=1, padding=1 |
| ) |
| else: |
| self.nin_shortcut = torch.nn.Conv1d( |
| in_channels, out_channels, kernel_size=1, stride=1, padding=0 |
| ) |
|
|
| def forward(self, x, temb=None): |
| h = x |
| h = self.norm1(h) |
| h = nonlinearity(h) |
| h = self.conv1(h) |
|
|
| if temb is not None: |
| h = h + self.temb_proj(nonlinearity(temb))[:, :, None, None] |
|
|
| h = self.norm2(h) |
| h = nonlinearity(h) |
| h = self.dropout(h) |
| h = self.conv2(h) |
|
|
| if self.in_channels != self.out_channels: |
| if self.use_conv_shortcut: |
| x = self.conv_shortcut(x) |
| else: |
| x = self.nin_shortcut(x) |
|
|
| return x + h |
|
|
|
|
| class Backbone(nn.Module): |
| """Base class for the generator's backbone. It preserves the same temporal resolution across all layers.""" |
|
|
| def forward(self, x: torch.Tensor, **kwargs) -> torch.Tensor: |
| """ |
| Args: |
| x (Tensor): Input tensor of shape (B, C, L), where B is the batch size, |
| C denotes output features, and L is the sequence length. |
| |
| Returns: |
| Tensor: Output of shape (B, L, H), where B is the batch size, L is the sequence length, |
| and H denotes the model dimension. |
| """ |
| raise NotImplementedError("Subclasses must implement the forward method.") |
|
|
|
|
| class VocosBackbone(Backbone): |
| """ |
| Vocos backbone module built with ConvNeXt blocks. Supports additional conditioning with Adaptive Layer Normalization |
| |
| Args: |
| input_channels (int): Number of input features channels. |
| dim (int): Hidden dimension of the model. |
| intermediate_dim (int): Intermediate dimension used in ConvNeXtBlock. |
| num_layers (int): Number of ConvNeXtBlock layers. |
| layer_scale_init_value (float, optional): Initial value for layer scaling. Defaults to `1 / num_layers`. |
| adanorm_num_embeddings (int, optional): Number of embeddings for AdaLayerNorm. |
| None means non-conditional model. Defaults to None. |
| """ |
|
|
| def __init__(self, hidden_dim=1024, depth=12, heads=16, pos_meb_dim=64): |
| super().__init__() |
|
|
| self.embed = nn.Conv1d(hidden_dim, hidden_dim, kernel_size=7, padding=3) |
|
|
| self.temb_ch = 0 |
| block_in = hidden_dim |
| dropout = 0.1 |
|
|
| prior_net: List[nn.Module] = [ |
| ResnetBlock( |
| in_channels=block_in, |
| out_channels=block_in, |
| temb_channels=self.temb_ch, |
| dropout=dropout, |
| ), |
| ResnetBlock( |
| in_channels=block_in, |
| out_channels=block_in, |
| temb_channels=self.temb_ch, |
| dropout=dropout, |
| ), |
| ] |
| self.prior_net = nn.Sequential(*prior_net) |
|
|
| depth = depth |
| time_rotary_embed = RotaryPositionalEmbeddings(dim=pos_meb_dim) |
|
|
| transformer_blocks = [ |
| TransformerBlock( |
| dim=hidden_dim, n_heads=heads, rotary_embed=time_rotary_embed |
| ) |
| for _ in range(depth) |
| ] |
|
|
| self.transformers = nn.Sequential(*transformer_blocks) |
| self.final_layer_norm = nn.LayerNorm(hidden_dim, eps=1e-6) |
| post_net: List[nn.Module] = [ |
| ResnetBlock( |
| in_channels=block_in, |
| out_channels=block_in, |
| temb_channels=self.temb_ch, |
| dropout=dropout, |
| ), |
| ResnetBlock( |
| in_channels=block_in, |
| out_channels=block_in, |
| temb_channels=self.temb_ch, |
| dropout=dropout, |
| ), |
| ] |
| self.post_net = nn.Sequential(*post_net) |
|
|
| def forward(self, x: torch.Tensor) -> torch.Tensor: |
| x = x.transpose(1, 2) |
| x = self.embed(x) |
| x = self.prior_net(x) |
| x = x.transpose(1, 2) |
| x = self.transformers(x) |
| x = x.transpose(1, 2) |
| x = self.post_net(x) |
| x = x.transpose(1, 2) |
| x = self.final_layer_norm(x) |
| return x |
|
|
|
|
| def init_weights(m): |
| if isinstance(m, nn.Conv1d): |
| nn.init.trunc_normal_(m.weight, std=0.02) |
| nn.init.constant_(m.bias, 0) |
|
|
| class CodecDecoderVocos(nn.Module): |
| def __init__( |
| self, |
| hidden_dim=1024, |
| depth=12, |
| heads=16, |
| pos_meb_dim=64, |
| hop_length=320, |
| vq_num_quantizers=1, |
| vq_dim=2048, |
| vq_commit_weight=0.25, |
| vq_weight_init=False, |
| vq_full_commit_loss=False, |
| codebook_size=16384, |
| codebook_dim=16, |
| ): |
| super().__init__() |
| self.hop_length = hop_length |
|
|
| self.quantizer = ResidualFSQ( |
| dim=vq_dim, levels=[4, 4, 4, 4, 4, 4, 4, 4], num_quantizers=1 |
| ) |
|
|
| self.backbone = VocosBackbone( |
| hidden_dim=hidden_dim, depth=depth, heads=heads, pos_meb_dim=pos_meb_dim |
| ) |
|
|
| self.head = ISTFTHead( |
| dim=hidden_dim, |
| n_fft=self.hop_length * 4, |
| hop_length=self.hop_length, |
| padding="same", |
| ) |
|
|
| self.reset_parameters() |
|
|
| def forward(self, x, vq=True): |
| if vq is True: |
| |
| x = x.permute(0, 2, 1) |
| x, q = self.quantizer(x) |
| x = x.permute(0, 2, 1) |
| q = q.permute(0, 2, 1) |
| return x, q, None |
| x = self.backbone(x) |
| x, _ = self.head(x) |
|
|
| return x, _ |
|
|
| def vq2emb(self, vq): |
| self.quantizer = self.quantizer.eval() |
| x = self.quantizer.vq2emb(vq) |
| return x |
|
|
| def get_emb(self): |
| self.quantizer = self.quantizer.eval() |
| embs = self.quantizer.get_emb() |
| return embs |
|
|
| def inference_vq(self, vq): |
| x = vq[None, :, :] |
| x = self.model(x) |
| return x |
|
|
| def inference_0(self, x): |
| x, q, loss, perp = self.quantizer(x) |
| x = self.model(x) |
| return x, None |
|
|
| def inference(self, x): |
| x = self.model(x) |
| return x, None |
|
|
| def remove_weight_norm(self): |
| """Remove weight normalization module from all of the layers.""" |
|
|
| def _remove_weight_norm(m): |
| try: |
| torch.nn.utils.remove_weight_norm(m) |
| except ValueError: |
| return |
|
|
| self.apply(_remove_weight_norm) |
|
|
| def apply_weight_norm(self): |
| """Apply weight normalization module from all of the layers.""" |
|
|
| def _apply_weight_norm(m): |
| if isinstance(m, nn.Conv1d) or isinstance(m, nn.ConvTranspose1d): |
| torch.nn.utils.weight_norm(m) |
|
|
| self.apply(_apply_weight_norm) |
|
|
| def reset_parameters(self): |
| self.apply(init_weights) |
|
|
| class NeuCodecDecoder( |
| nn.Module, |
| PyTorchModelHubMixin |
| ): |
|
|
| def __init__(self, sample_rate: int, hop_length: int): |
| super().__init__() |
| self.sample_rate = sample_rate |
| self.hop_length = hop_length |
| self.generator = CodecDecoderVocos(hop_length=hop_length) |
| self.fc_post_a = nn.Linear(2048, 1024) |
|
|
| @property |
| def device(self): |
| return next(self.parameters()).device |
|
|
| def decode_code(self, fsq_codes: torch.Tensor) -> torch.Tensor: |
| """ |
| Args: |
| fsq_codes: torch.Tensor [B, 1, F], 50hz FSQ codes |
| |
| Returns: |
| recon: torch.Tensor [B, 1, T], reconstructed 24kHz audio |
| """ |
|
|
| fsq_post_emb = self.generator.quantizer.get_output_from_indices(fsq_codes.transpose(1, 2)) |
| fsq_post_emb = fsq_post_emb.transpose(1, 2) |
| fsq_post_emb = self.fc_post_a(fsq_post_emb.transpose(1, 2)).transpose(1, 2) |
| recon = self.generator(fsq_post_emb.transpose(1, 2), vq=False)[0] |
| return recon |
|
|