# ***************************************************************************** # Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions are met: # * Redistributions of source code must retain the above copyright # notice, this list of conditions and the following disclaimer. # * Redistributions in binary form must reproduce the above copyright # notice, this list of conditions and the following disclaimer in the # documentation and/or other materials provided with the distribution. # * Neither the name of the NVIDIA CORPORATION nor the # names of its contributors may be used to endorse or promote products # derived from this software without specific prior written permission. # # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE # DISCLAIMED. IN NO EVENT SHALL NVIDIA CORPORATION BE LIABLE FOR ANY # DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES # (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND # ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
#
# *****************************************************************************

import functools
import json
import re
from pathlib import Path

import librosa
import numpy as np
import torch
import torch.nn.functional as F
from scipy import ndimage
from scipy.stats import betabinom

import common.layers as layers
from common.text.text_processing import TextProcessing
from common.utils import load_wav_to_torch, load_filepaths_and_text, to_gpu


class BetaBinomialInterpolator:
    """Interpolates alignment prior matrices to save computation.

    Calculating beta-binomial priors is costly. Instead cache popular sizes
    and use image interpolation to get priors faster.

    Args:
        round_mel_len_to: bucket size the first call argument is rounded to.
        round_text_len_to: bucket size the second call argument is rounded to.
    """

    def __init__(self, round_mel_len_to=100, round_text_len_to=20):
        self.round_mel_len_to = round_mel_len_to
        self.round_text_len_to = round_text_len_to
        # Per-instance LRU cache keyed on the rounded (bucketed) sizes.
        self.bank = functools.lru_cache(beta_binomial_prior_distribution)

    def round(self, val, to):
        """Round ``val + 1`` to the nearest multiple of ``to`` (at least ``to``)."""
        return max(1, int(np.round((val + 1) / to))) * to

    def __call__(self, w, h):
        """Return a prior as a ``numpy`` array of shape ``(w, h)``.

        ``TTSDataset.get_prior`` calls this with ``(mel_len, text_len)``.
        """
        bw = self.round(w, to=self.round_mel_len_to)
        bh = self.round(h, to=self.round_text_len_to)
        # The cached matrix has shape (bh, bw); transpose it to (bw, bh) and
        # rescale linearly (order=1) to the requested (w, h).
        ret = ndimage.zoom(self.bank(bw, bh).T, zoom=(w / bw, h / bh), order=1)
        assert ret.shape[0] == w, ret.shape
        assert ret.shape[1] == h, ret.shape
        return ret


def beta_binomial_prior_distribution(phoneme_count, mel_count, scaling=1.0):
    """Build a beta-binomial prior matrix.

    Args:
        phoneme_count: number of columns ``P``; the pmf is evaluated on
            ``0 .. P - 1``.
        mel_count: number of rows ``M``; row ``i`` (1-based) uses the
            parameters ``a = scaling * i`` and ``b = scaling * (M + 1 - i)``.
        scaling: multiplier applied to both beta parameters.

    Returns:
        ``torch.Tensor`` with ``mel_count`` rows and ``phoneme_count`` columns.
    """
    P = phoneme_count
    M = mel_count
    x = np.arange(0, P)
    mel_text_probs = []
    for i in range(1, M + 1):
        a, b = scaling * i, scaling * (M + 1 - i)
        rv = betabinom(P, a, b)
        mel_text_probs.append(rv.pmf(x))
    return torch.tensor(np.array(mel_text_probs))


def estimate_pitch(wav, mel_len, method='pyin', normalize_mean=None,
                   normalize_std=None, n_formants=1):
    """Estimate a frame-level pitch track for an audio file.

    Args:
        wav: path of the audio file; it is loaded with ``librosa.load``.
        mel_len: expected number of frames. The estimate may differ from it
            by at most one frame and is padded/cropped to exactly ``mel_len``.
        method: only ``'pyin'`` is supported.
        normalize_mean: optional mean (float, list or tensor) for
            normalization; requires ``normalize_std``.
        normalize_std: optional standard deviation, same forms as the mean.
        n_formants: must be 1; higher values are not implemented.

    Returns:
        Float tensor of shape ``(1, mel_len)``. NaN entries returned by
        ``librosa.pyin`` are replaced by 0 and stay 0 after normalization.

    Raises:
        ValueError: if ``method`` is not ``'pyin'``.
        NotImplementedError: if ``n_formants > 1``.
    """
    if isinstance(normalize_mean, (float, list)):
        normalize_mean = torch.tensor(normalize_mean)

    if isinstance(normalize_std, (float, list)):
        normalize_std = torch.tensor(normalize_std)

    if method == 'pyin':
        snd, sr = librosa.load(wav)
        # Search range narrowed to 60-400; the previous setting was
        # note_to_hz('C2') .. note_to_hz('C7').
        pitch_mel, voiced_flag, voiced_probs = librosa.pyin(
            snd, fmin=60, fmax=400, frame_length=1024)
        assert np.abs(mel_len - pitch_mel.shape[0]) <= 1.0

        pitch_mel = np.where(np.isnan(pitch_mel), 0.0, pitch_mel)
        pitch_mel = torch.from_numpy(pitch_mel).unsqueeze(0)
        # A negative pad amount crops, so this handles both +1 and -1 frame.
        pitch_mel = F.pad(pitch_mel, (0, mel_len - pitch_mel.size(1)))

        if n_formants > 1:
            raise NotImplementedError
    else:
        raise ValueError

    pitch_mel = pitch_mel.float()

    if normalize_mean is not None:
        assert normalize_std is not None
        pitch_mel = normalize_pitch(pitch_mel, normalize_mean, normalize_std)

    return pitch_mel


def normalize_pitch(pitch, mean, std):
    """Standardize ``pitch`` in place, keeping zero entries at zero.

    Args:
        pitch: tensor indexed as ``(n_formants, frames)``; modified in place.
        mean: per-formant mean as a scalar, list, 0-dim or 1-D tensor.
        std: per-formant standard deviation, same forms as ``mean``.

    Returns:
        The same ``pitch`` tensor, normalized.
    """
    zeros = (pitch == 0.0)
    # Reshape to a column so scalars and 0-dim tensors (which cannot be
    # indexed with [:, None]) broadcast the same way as 1-D statistics.
    pitch -= torch.as_tensor(mean).reshape(-1, 1)
    pitch /= torch.as_tensor(std).reshape(-1, 1)
    # Entries that were exactly 0 before normalization are reset to 0.
    pitch[zeros] = 0.0
    return pitch


class TTSDataset(torch.utils.data.Dataset):
    """Dataset of text / mel / pitch / energy / attention-prior samples.

    1) loads audio,text pairs
    2) encodes text into integer sequences with ``TextProcessing``
    3) computes mel-spectrograms from audio files (or loads them from disk).

    Each row of the file list is expected to hold the audio (or mel) path
    first, optionally a pitch path next, then the text, then optionally a
    speaker id and a language id, in that order.
    """

    def __init__(self,
                 dataset_path,
                 audiopaths_and_text,
                 text_cleaners,
                 n_mel_channels,
                 symbol_set='smj_expanded',
                 p_arpabet=1.0,
                 n_speakers=1,
                 n_languages=1,  # ANT: added
                 load_mel_from_disk=True,
                 load_pitch_from_disk=True,
                 pitch_mean=214.72203,  # LJSpeech defaults
                 pitch_std=65.72038,
                 max_wav_value=None,
                 sampling_rate=None,
                 filter_length=None,
                 hop_length=None,
                 win_length=None,
                 mel_fmin=None,
                 mel_fmax=None,
                 prepend_space_to_text=False,
                 append_space_to_text=False,
                 pitch_online_dir=None,
                 betabinomial_online_dir=None,
                 use_betabinomial_interpolator=True,
                 pitch_online_method='pyin',
                 **ignored):
        # Expect a list of filenames
        if isinstance(audiopaths_and_text, str):
            audiopaths_and_text = [audiopaths_and_text]

        self.dataset_path = dataset_path
        # TODO(ANT): load_filepaths_and_text probably needs to know about the
        # language column as well.
        self.audiopaths_and_text = load_filepaths_and_text(
            dataset_path, audiopaths_and_text,
            has_speakers=(n_speakers > 1))

        # Stored unconditionally: get_mel validates disk-loaded mels against
        # it, and self.stft only exists when mels are computed from audio.
        self.n_mel_channels = n_mel_channels
        self.load_mel_from_disk = load_mel_from_disk
        if not load_mel_from_disk:
            self.max_wav_value = max_wav_value
            self.sampling_rate = sampling_rate
            self.stft = layers.TacotronSTFT(
                filter_length, hop_length, win_length,
                n_mel_channels, sampling_rate, mel_fmin, mel_fmax)
        self.load_pitch_from_disk = load_pitch_from_disk

        self.prepend_space_to_text = prepend_space_to_text
        self.append_space_to_text = append_space_to_text

        assert p_arpabet == 0.0 or p_arpabet == 1.0, (
            'Only 0.0 and 1.0 p_arpabet is currently supported. '
            'Variable probability breaks caching of betabinomial matrices.')

        self.tp = TextProcessing(symbol_set, text_cleaners,
                                 p_arpabet=p_arpabet)
        self.n_speakers = n_speakers
        # ANT: added languages, must add to config and probably train.py too
        self.n_languages = n_languages
        self.pitch_tmp_dir = pitch_online_dir
        self.f0_method = pitch_online_method
        self.betabinomial_tmp_dir = betabinomial_online_dir
        self.use_betabinomial_interpolator = use_betabinomial_interpolator

        if use_betabinomial_interpolator:
            self.betabinomial_interpolator = BetaBinomialInterpolator()

        # ANT: added language here
        expected_columns = (2 + int(load_pitch_from_disk)
                            + (n_speakers > 1) + (n_languages > 1))

        # Pitch is either read from disk or computed (and cached) online.
        assert not (load_pitch_from_disk and self.pitch_tmp_dir is not None)

        # NOTE: the "fewer columns than expected" error is intentionally
        # disabled here; only the "more columns" warning remains active.
        if len(self.audiopaths_and_text[0]) > expected_columns:
            print('WARNING: Audiopaths file has more columns than expected')

        self.pitch_mean = self._to_tensor(pitch_mean)
        self.pitch_std = self._to_tensor(pitch_std)

    @staticmethod
    def _to_tensor(x):
        """Wrap a plain number in a 1-element tensor; pass others through."""
        return torch.Tensor([x]) if isinstance(x, (int, float)) else x

    def __getitem__(self, index):
        """Return one sample as a tuple.

        Returns:
            ``(text, mel, len(text), pitch, energy, speaker, language,
            attn_prior, audiopath)``; ``speaker`` / ``language`` are ``None``
            when the dataset has no such column.
        """
        # Separate filename and text.
        # ANT: added language; assume that if language is present, speaker
        # labels are too.
        row = self.audiopaths_and_text[index]
        if self.n_speakers > 1 and self.n_languages > 1:
            audiopath, *extra, text, speaker, language = row
            speaker = int(speaker)
            language = int(language)
        elif self.n_speakers > 1:
            audiopath, *extra, text, speaker = row
            speaker = int(speaker)
            # Must be bound: it is part of the returned tuple.
            language = None
        else:
            audiopath, *extra, text = row
            speaker = None
            language = None

        mel = self.get_mel(audiopath)
        text = self.get_text(text)
        pitch = self.get_pitch(index, mel.size(-1))

        # ANT: if external pitch extraction is used, n_frames may be one off
        # due to rounding differences; trim the longer one.
        if pitch.size(-1) != mel.size(-1):
            print(pitch.shape, mel.shape, audiopath)
            if pitch.size(-1) < mel.size(-1):
                mel = mel[:, :pitch.size(-1)]
            else:
                pitch = pitch[..., :mel.size(-1)]

        # Per-frame energy: L2 norm over the mel-channel axis.
        energy = torch.norm(mel.float(), dim=0, p=2)
        attn_prior = self.get_prior(index, mel.shape[1], text.shape[0])

        assert pitch.size(-1) == mel.size(-1)

        # No higher formants?
        if len(pitch.size()) == 1:
            pitch = pitch[None, :]

        return (text, mel, len(text), pitch, energy, speaker, language,
                attn_prior, audiopath)

    def __len__(self):
        """Number of rows in the loaded file list."""
        return len(self.audiopaths_and_text)

    def get_mel(self, filename):
        """Return the mel-spectrogram for ``filename``.

        When ``load_mel_from_disk`` is false the file is read as audio and
        converted with the STFT layer; otherwise it is loaded with
        ``torch.load``.

        Raises:
            ValueError: if the audio sampling rate differs from the STFT's,
                or a loaded mel has an unexpected number of channels.
        """
        if not self.load_mel_from_disk:
            audio, sampling_rate = load_wav_to_torch(filename)
            if sampling_rate != self.stft.sampling_rate:
                print(filename)
                raise ValueError("{} SR doesn't match target {} SR".format(
                    sampling_rate, self.stft.sampling_rate))
            audio_norm = audio / self.max_wav_value
            audio_norm = audio_norm.unsqueeze(0)
            audio_norm = torch.autograd.Variable(audio_norm,
                                                 requires_grad=False)
            melspec = self.stft.mel_spectrogram(audio_norm)
            melspec = torch.squeeze(melspec, 0)
        else:
            # NOTE(review): torch.load unpickles the file; only point the
            # dataset at trusted mel files.
            melspec = torch.load(filename)
            if melspec.size(0) != self.n_mel_channels:
                raise ValueError(
                    'Mel dimension mismatch: given {}, expected {}'.format(
                        melspec.size(0), self.n_mel_channels))

        return melspec

    def get_text(self, text):
        """Encode ``text`` and return it as a ``torch.LongTensor``.

        Optionally prepends/appends the middle symbol of the encoded string
        ``"A A"`` (presumably the space symbol; depends on TextProcessing).
        """
        text = self.tp.encode_text(text)
        space = [self.tp.encode_text("A A")[1]]

        if self.prepend_space_to_text:
            text = space + text
            print("prepending")

        if self.append_space_to_text:
            text = text + space
            print("appending")

        return torch.LongTensor(text)

    def get_prior(self, index, mel_len, text_len):
        """Return the attention prior indexed as ``(mel_len, text_len)``.

        Uses the interpolator when enabled; otherwise computes the exact
        prior, with an optional on-disk cache under ``betabinomial_tmp_dir``.
        """
        if self.use_betabinomial_interpolator:
            return torch.from_numpy(
                self.betabinomial_interpolator(mel_len, text_len))

        if self.betabinomial_tmp_dir is not None:
            audiopath, *_ = self.audiopaths_and_text[index]
            fname = Path(audiopath).relative_to(self.dataset_path)
            fname = fname.with_suffix('.pt')
            cached_fpath = Path(self.betabinomial_tmp_dir, fname)

            if cached_fpath.is_file():
                return torch.load(cached_fpath)

        attn_prior = beta_binomial_prior_distribution(text_len, mel_len)

        if self.betabinomial_tmp_dir is not None:
            cached_fpath.parent.mkdir(parents=True, exist_ok=True)
            torch.save(attn_prior, cached_fpath)

        return attn_prior

    def get_pitch(self, index, mel_len=None):
        """Return the pitch tensor for sample ``index``.

        Resolution order: pitch file listed in the row (normalized with the
        dataset statistics), cached estimate under ``pitch_tmp_dir``, and
        finally an online estimate from the wav file, which is cached when a
        cache directory is configured.
        """
        audiopath, *fields = self.audiopaths_and_text[index]

        if self.load_pitch_from_disk:
            pitchpath = fields[0]
            pitch = torch.load(pitchpath)
            if self.pitch_mean is not None:
                assert self.pitch_std is not None
                pitch = normalize_pitch(pitch, self.pitch_mean,
                                        self.pitch_std)
            return pitch

        if self.pitch_tmp_dir is not None:
            fname = Path(audiopath).relative_to(self.dataset_path)
            fname_method = fname.with_suffix('.pt')
            cached_fpath = Path(self.pitch_tmp_dir, fname_method)
            if cached_fpath.is_file():
                return torch.load(cached_fpath)

        # No luck so far - calculate
        wav = audiopath
        if not wav.endswith('.wav'):
            # Map a mel path onto the corresponding wav path. The dot is
            # escaped so only a literal ".pt" suffix is rewritten.
            wav = re.sub('/mels/', '/wavs/', wav)
            wav = re.sub(r'\.pt$', '.wav', wav)

        pitch_mel = estimate_pitch(wav, mel_len, self.f0_method,
                                   self.pitch_mean, self.pitch_std)

        if self.pitch_tmp_dir is not None and not cached_fpath.is_file():
            cached_fpath.parent.mkdir(parents=True, exist_ok=True)
            torch.save(pitch_mel, cached_fpath)

        return pitch_mel


class TTSCollate:
    """Zero-pads model inputs and targets based on number of frames per step"""

    def __call__(self, batch):
        """Collate a list of ``TTSDataset`` samples into padded tensors.

        Samples are ordered by decreasing text length. Returns
        ``(text_padded, input_lengths, mel_padded, output_lengths, len_x,
        pitch_padded, energy_padded, speaker, language, attn_prior_padded,
        audiopaths)``.
        """
        # Right zero-pad all text sequences to max input length
        input_lengths, ids_sorted_decreasing = torch.sort(
            torch.LongTensor([len(x[0]) for x in batch]),
            dim=0, descending=True)
        max_input_len = input_lengths[0]
        ordered = [batch[idx] for idx in ids_sorted_decreasing]

        text_padded = torch.LongTensor(len(batch), max_input_len)
        text_padded.zero_()
        for i, sample in enumerate(ordered):
            text = sample[0]
            text_padded[i, :text.size(0)] = text

        # Right zero-pad mel-spec
        num_mels = batch[0][1].size(0)
        max_target_len = max([x[1].size(1) for x in batch])

        # Include mel padded and gate padded
        mel_padded = torch.FloatTensor(len(batch), num_mels, max_target_len)
        mel_padded.zero_()
        output_lengths = torch.LongTensor(len(batch))
        for i, sample in enumerate(ordered):
            mel = sample[1]
            mel_padded[i, :, :mel.size(1)] = mel
            output_lengths[i] = mel.size(1)

        n_formants = batch[0][3].shape[0]
        pitch_padded = torch.zeros(mel_padded.size(0), n_formants,
                                   mel_padded.size(2),
                                   dtype=batch[0][3].dtype)
        energy_padded = torch.zeros_like(pitch_padded[:, 0, :])

        for i, sample in enumerate(ordered):
            pitch = sample[3]
            energy = sample[4]
            pitch_padded[i, :, :pitch.shape[1]] = pitch
            energy_padded[i, :energy.shape[0]] = energy

        if batch[0][5] is not None:
            speaker = torch.zeros_like(input_lengths)
            for i, sample in enumerate(ordered):
                speaker[i] = sample[5]
        else:
            speaker = None

        # ANT: added language here and increased the attn_prior and
        # audiopaths index by 1
        if batch[0][6] is not None:
            language = torch.zeros_like(input_lengths)
            for i, sample in enumerate(ordered):
                language[i] = sample[6]
        else:
            language = None

        attn_prior_padded = torch.zeros(len(batch), max_target_len,
                                        max_input_len)
        for i, sample in enumerate(ordered):
            prior = sample[7]
            attn_prior_padded[i, :prior.size(0), :prior.size(1)] = prior

        # Count number of items - characters in text. Kept in the incoming
        # batch order; batch_to_gpu recomputes its own len_x.
        len_x = torch.Tensor([x[2] for x in batch])

        audiopaths = [sample[8] for sample in ordered]

        return (text_padded, input_lengths, mel_padded, output_lengths, len_x,
                pitch_padded, energy_padded, speaker, language,
                attn_prior_padded, audiopaths)


def batch_to_gpu(batch):
    """Move a collated batch through ``to_gpu`` and split it into ``(x, y, len_x)``.

    ``x`` is the list of model inputs, ``y`` the list of targets and
    ``len_x`` the sum of the output (mel) lengths in the batch.
    """
    # ANT: added language here too
    (text_padded, input_lengths, mel_padded, output_lengths, len_x,
     pitch_padded, energy_padded, speaker, language, attn_prior,
     audiopaths) = batch

    text_padded = to_gpu(text_padded).long()
    input_lengths = to_gpu(input_lengths).long()
    mel_padded = to_gpu(mel_padded).float()
    output_lengths = to_gpu(output_lengths).long()
    pitch_padded = to_gpu(pitch_padded).float()
    energy_padded = to_gpu(energy_padded).float()
    attn_prior = to_gpu(attn_prior).float()
    if speaker is not None:
        speaker = to_gpu(speaker).long()
    if language is not None:
        language = to_gpu(language).long()

    # Alignments act as both inputs and targets - pass shallow copies
    x = [text_padded, input_lengths, mel_padded, output_lengths,
         pitch_padded, energy_padded, speaker, language, attn_prior,
         audiopaths]
    y = [mel_padded, input_lengths, output_lengths]
    # The collated len_x is discarded in favour of the total frame count.
    len_x = torch.sum(output_lengths)
    return (x, y, len_x)