# indextts/utils/maskgct_utils.py
import os
import time

import json5
import librosa
import numpy as np
import safetensors
import torch
from huggingface_hub import hf_hub_download
from transformers import SeamlessM4TFeatureExtractor, Wav2Vec2BertModel

from indextts.utils.maskgct.models.codec.amphion_codec.codec import CodecEncoder, CodecDecoder
from indextts.utils.maskgct.models.codec.kmeans.repcodec_model import RepCodec
from indextts.utils.maskgct.models.tts.maskgct.maskgct_s2a import MaskGCT_S2A
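

# ``_load_config`` below references ``override_config`` and
# ``get_lowercase_keys_config``, which are not defined in this file. Minimal
# implementations, modeled on Amphion's config utilities (an assumption, not
# verified here), are provided so the module is self-contained.
def override_config(base_config, new_config):
    """Recursively merge ``new_config`` into ``base_config`` (new values win)."""
    for k, v in new_config.items():
        if isinstance(v, dict):
            base_config[k] = override_config(base_config.get(k, {}), v)
        else:
            base_config[k] = v
    return base_config


def get_lowercase_keys_config(cfg):
    """Return a copy of ``cfg`` with all (nested) keys lower-cased."""
    updated = {}
    for k, v in cfg.items():
        if isinstance(v, dict):
            v = get_lowercase_keys_config(v)
        updated[k.lower()] = v
    return updated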


def _load_config(config_fn, lowercase=False):
    """Load a JSON5 configuration file into a dictionary.

    Args:
        config_fn (str): path to the configuration file
        lowercase (bool, optional): whether to lower-case all keys. Defaults to False.

    Returns:
        dict: dictionary that stores the configuration
    """
    with open(config_fn, "r") as f:
        data = f.read()
    config_ = json5.loads(data)
    if "base_config" in config_:
        # Inherit from a base configuration (path resolved relative to the
        # WORK_DIR environment variable) and override it with this file's values.
        p_config_path = os.path.join(os.getenv("WORK_DIR"), config_["base_config"])
        p_config_ = _load_config(p_config_path)
        config_ = override_config(p_config_, config_)
    if lowercase:
        # change keys in config_ to lower case
        config_ = get_lowercase_keys_config(config_)
    return config_


def load_config(config_fn, lowercase=False):
    """Load a JSON5 configuration file into a JsonHParams object.

    Args:
        config_fn (str): path to the configuration file
        lowercase (bool, optional): whether to lower-case all keys. Defaults to False.

    Returns:
        JsonHParams: an object that stores the configuration
    """
    config_ = _load_config(config_fn, lowercase=lowercase)
    # wrap the configuration dict in a JsonHParams object
    cfg = JsonHParams(**config_)
    return cfg


class JsonHParams:
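    """Attribute-style wrapper around a (possibly nested) configuration dict.

    Nested dicts are converted recursively, so values can be read either as
    attributes (``cfg.model.encoder``) or as items (``cfg["model"]["encoder"]``).
    """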
def __init__(self, **kwargs):
for k, v in kwargs.items():
            if isinstance(v, dict):
v = JsonHParams(**v)
self[k] = v
def keys(self):
return self.__dict__.keys()
def items(self):
return self.__dict__.items()
def values(self):
return self.__dict__.values()
def __len__(self):
return len(self.__dict__)
def __getitem__(self, key):
return getattr(self, key)
def __setitem__(self, key, value):
return setattr(self, key, value)
def __contains__(self, key):
return key in self.__dict__
def __repr__(self):
return self.__dict__.__repr__()


def build_semantic_model(path_="./models/tts/maskgct/ckpt/wav2vec2bert_stats.pt"):
    """Load the Wav2Vec2-BERT semantic model and its feature mean/std statistics."""
    semantic_model = Wav2Vec2BertModel.from_pretrained("facebook/w2v-bert-2.0")
    semantic_model.eval()
    # map_location="cpu" so the statistics load even on CPU-only machines;
    # they are moved to the right device/dtype when used.
    stat_mean_var = torch.load(path_, map_location="cpu")
    semantic_mean = stat_mean_var["mean"]
    semantic_std = torch.sqrt(stat_mean_var["var"])
    return semantic_model, semantic_mean, semantic_std
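

# Example (sketch): this repo does not ship ``wav2vec2bert_stats.pt``. One way to
# obtain it is from the upstream MaskGCT release on the Hugging Face Hub; the repo
# id and filename below are assumptions taken from that project, not verified here.
#
#   stats_path = hf_hub_download("amphion/MaskGCT", filename="semantic_codec/wav2vec2bert_stats.pt")
#   semantic_model, semantic_mean, semantic_std = build_semantic_model(stats_path)

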
def build_semantic_codec(cfg):
semantic_codec = RepCodec(cfg=cfg)
semantic_codec.eval()
return semantic_codec
def build_s2a_model(cfg, device):
soundstorm_model = MaskGCT_S2A(cfg=cfg)
soundstorm_model.eval()
soundstorm_model.to(device)
return soundstorm_model
def build_acoustic_codec(cfg, device):
codec_encoder = CodecEncoder(cfg=cfg.encoder)
codec_decoder = CodecDecoder(cfg=cfg.decoder)
codec_encoder.eval()
codec_decoder.eval()
codec_encoder.to(device)
codec_decoder.to(device)
return codec_encoder, codec_decoder
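

# Example (sketch): after building, model weights are typically restored from
# safetensors checkpoints. The paths below are placeholders, not files that ship
# with this repo.
#
#   from safetensors.torch import load_model
#   load_model(semantic_codec, "ckpt/semantic_codec/model.safetensors")
#   load_model(codec_encoder, "ckpt/acoustic_codec/model.safetensors")
#   load_model(codec_decoder, "ckpt/acoustic_codec/model_1.safetensors")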


class Inference_Pipeline:
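    """Bundle of the semantic and acoustic models needed for MaskGCT-style
    semantic-to-acoustic inference: feature extraction, semantic/acoustic
    quantization, two-stage S2A generation, and codec decoding."""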
def __init__(
self,
semantic_model,
semantic_codec,
semantic_mean,
semantic_std,
codec_encoder,
codec_decoder,
s2a_model_1layer,
s2a_model_full,
):
self.semantic_model = semantic_model
self.semantic_codec = semantic_codec
self.semantic_mean = semantic_mean
self.semantic_std = semantic_std
self.codec_encoder = codec_encoder
self.codec_decoder = codec_decoder
self.s2a_model_1layer = s2a_model_1layer
self.s2a_model_full = s2a_model_full
@torch.no_grad()
def get_emb(self, input_features, attention_mask):
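        """Extract normalized semantic features from Wav2Vec2-BERT.

        Runs the semantic model, takes the hidden state at index 17, and
        normalizes it with the precomputed mean/std. Returns a (B, T, C) tensor.
        """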
vq_emb = self.semantic_model(
input_features=input_features,
attention_mask=attention_mask,
output_hidden_states=True,
)
feat = vq_emb.hidden_states[17] # (B, T, C)
feat = (feat - self.semantic_mean.to(feat)) / self.semantic_std.to(feat)
return feat
@torch.no_grad()
def extract_acoustic_code(self, speech):
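        """Encode a waveform into discrete acoustic codes.

        The codec encoder produces continuous embeddings that are quantized by
        the codec decoder's residual VQ; the codes are returned permuted to
        (B, T, num_quantizers).
        """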
vq_emb = self.codec_encoder(speech.unsqueeze(1))
_, vq, _, _, _ = self.codec_decoder.quantizer(vq_emb)
acoustic_code = vq.permute(1, 2, 0)
return acoustic_code
@torch.no_grad()
def get_scode(self, inputs):
        semantic_code, _ = self.semantic_codec.quantize(inputs)  # discrete semantic token ids
        # To map codes back to embeddings:
        #   vq = self.semantic_codec.quantizer.vq2emb(semantic_code.unsqueeze(1))
        #   vq = vq.transpose(1, 2)
        return semantic_code
@torch.no_grad()
def semantic2acoustic(
self,
combine_semantic_code,
acoustic_code,
n_timesteps=[25, 10, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1],
cfg=2.5,
rescale_cfg=0.75,
):
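        """Generate acoustic codes from semantic codes and decode them to audio.

        Two-stage coarse-to-fine generation: the 1-layer S2A model predicts the
        first codebook, then the full S2A model predicts all 12 codebooks
        conditioned on it (passed as ``gt_code``). Returns a tuple of
        (prompt audio + generated audio, generated audio) as NumPy arrays.
        """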
semantic_code = combine_semantic_code
cond = self.s2a_model_1layer.cond_emb(semantic_code)
prompt = acoustic_code[:, :, :]
predict_1layer = self.s2a_model_1layer.reverse_diffusion(
cond=cond,
prompt=prompt,
temp=1.5,
filter_thres=0.98,
n_timesteps=n_timesteps[:1],
cfg=cfg,
rescale_cfg=rescale_cfg,
)
cond = self.s2a_model_full.cond_emb(semantic_code)
prompt = acoustic_code[:, :, :]
predict_full = self.s2a_model_full.reverse_diffusion(
cond=cond,
prompt=prompt,
temp=1.5,
filter_thres=0.98,
n_timesteps=n_timesteps,
cfg=cfg,
rescale_cfg=rescale_cfg,
gt_code=predict_1layer,
)
vq_emb = self.codec_decoder.vq2emb(
predict_full.permute(2, 0, 1), n_quantizers=12
)
recovered_audio = self.codec_decoder(vq_emb)
prompt_vq_emb = self.codec_decoder.vq2emb(
prompt.permute(2, 0, 1), n_quantizers=12
)
recovered_prompt_audio = self.codec_decoder(prompt_vq_emb)
recovered_prompt_audio = recovered_prompt_audio[0][0].cpu().numpy()
recovered_audio = recovered_audio[0][0].cpu().numpy()
combine_audio = np.concatenate([recovered_prompt_audio, recovered_audio])
return combine_audio, recovered_audio
def s2a_inference(
self,
prompt_speech_path,
combine_semantic_code,
cfg=2.5,
n_timesteps_s2a=[25, 10, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1],
cfg_s2a=2.5,
rescale_cfg_s2a=0.75,
):
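        """Run semantic-to-acoustic inference against a prompt recording.

        The prompt is loaded at 24 kHz and converted to acoustic codes that serve
        as the acoustic prompt for ``semantic2acoustic``; only the generated audio
        (without the prompt) is returned. Note that ``cfg`` is currently unused;
        classifier-free guidance is controlled by ``cfg_s2a``.
        """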
speech = librosa.load(prompt_speech_path, sr=24000)[0]
acoustic_code = self.extract_acoustic_code(
torch.tensor(speech).unsqueeze(0).to(combine_semantic_code.device)
)
_, recovered_audio = self.semantic2acoustic(
combine_semantic_code,
acoustic_code,
n_timesteps=n_timesteps_s2a,
cfg=cfg_s2a,
rescale_cfg=rescale_cfg_s2a,
)
return recovered_audio
@torch.no_grad()
def gt_inference(
self,
prompt_speech_path,
combine_semantic_code,
):
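        """Reconstruct the prompt audio via a codec encode/decode round trip.

        Useful as a reference ("ground truth" through the codec) when judging
        generated audio; ``combine_semantic_code`` is only used to pick the
        target device.
        """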
speech = librosa.load(prompt_speech_path, sr=24000)[0]
        # Alternative (kept for reference): round-trip through the quantizer, as in
        # extract_acoustic_code:
        #   acoustic_code = self.extract_acoustic_code(
        #       torch.tensor(speech).unsqueeze(0).to(combine_semantic_code.device)
        #   )
        #   prompt = acoustic_code[:, :, :]
        #   prompt_vq_emb = self.codec_decoder.vq2emb(
        #       prompt.permute(2, 0, 1), n_quantizers=12
        #   )
        prompt_vq_emb = self.codec_encoder(
            torch.tensor(speech).unsqueeze(0).unsqueeze(1).to(combine_semantic_code.device)
        )
recovered_prompt_audio = self.codec_decoder(prompt_vq_emb)
recovered_prompt_audio = recovered_prompt_audio[0][0].cpu().numpy()
return recovered_prompt_audio
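

if __name__ == "__main__":
    # Minimal wiring sketch (not part of the library): shows how the builders and
    # Inference_Pipeline fit together. The config path and structure (cfg.model.*),
    # checkpoint paths, and the prompt file are placeholders/assumptions; semantic
    # codes normally come from the upstream text-to-semantic model, which is not
    # defined in this file.
    from safetensors.torch import load_model

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    cfg = load_config("models/tts/maskgct/config/maskgct.json")  # placeholder path

    semantic_model, semantic_mean, semantic_std = build_semantic_model(
        "./models/tts/maskgct/ckpt/wav2vec2bert_stats.pt"
    )
    semantic_model.to(device)

    semantic_codec = build_semantic_codec(cfg.model.semantic_codec)
    load_model(semantic_codec, "ckpt/semantic_codec/model.safetensors")  # placeholder
    semantic_codec.to(device)

    codec_encoder, codec_decoder = build_acoustic_codec(cfg.model.acoustic_codec, device)
    load_model(codec_encoder, "ckpt/acoustic_codec/model.safetensors")  # placeholder
    load_model(codec_decoder, "ckpt/acoustic_codec/model_1.safetensors")  # placeholder

    s2a_model_1layer = build_s2a_model(cfg.model.s2a_model.s2a_1layer, device)
    load_model(s2a_model_1layer, "ckpt/s2a_model/s2a_model_1layer/model.safetensors")  # placeholder
    s2a_model_full = build_s2a_model(cfg.model.s2a_model.s2a_full, device)
    load_model(s2a_model_full, "ckpt/s2a_model/s2a_model_full/model.safetensors")  # placeholder

    pipeline = Inference_Pipeline(
        semantic_model,
        semantic_codec,
        semantic_mean,
        semantic_std,
        codec_encoder,
        codec_decoder,
        s2a_model_1layer,
        s2a_model_full,
    )

    # Quick sanity check: round-trip a prompt recording through the acoustic codec.
    # The dummy semantic-code tensor is only used to pick the device.
    audio = pipeline.gt_inference(
        "prompt.wav",  # placeholder: any prompt recording (loaded at 24 kHz)
        torch.zeros(1, 1, dtype=torch.long, device=device),
    )
    print("reconstructed prompt audio:", audio.shape)

    # With real semantic codes from the text-to-semantic stage:
    #   generated = pipeline.s2a_inference("prompt.wav", combine_semantic_code)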