# *****************************************************************************
#  Copyright (c) 2020, NVIDIA CORPORATION.  All rights reserved.
#
#  Redistribution and use in source and binary forms, with or without
#  modification, are permitted provided that the following conditions are met:
#      * Redistributions of source code must retain the above copyright
#        notice, this list of conditions and the following disclaimer.
#      * Redistributions in binary form must reproduce the above copyright
#        notice, this list of conditions and the following disclaimer in the
#        documentation and/or other materials provided with the distribution.
#      * Neither the name of the NVIDIA CORPORATION nor the
#        names of its contributors may be used to endorse or promote products
#        derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL NVIDIA CORPORATION BE LIABLE FOR ANY
# DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
# *****************************************************************************

import functools
import json
import re
from pathlib import Path

import librosa
import numpy as np
import torch
import torch.nn.functional as F
from scipy import ndimage
from scipy.stats import betabinom

import common.layers as layers
from common.text.text_processing import TextProcessing
from common.utils import load_wav_to_torch, load_filepaths_and_text, to_gpu


class BetaBinomialInterpolator:
    """Interpolates alignment prior matrices to save computation.

    Calculating beta-binomial priors is costly. Instead, cache matrices for
    rounded-off sizes and use image interpolation to get priors faster.
    """
    def __init__(self, round_mel_len_to=100, round_text_len_to=20):
        self.round_mel_len_to = round_mel_len_to
        self.round_text_len_to = round_text_len_to
        self.bank = functools.lru_cache(beta_binomial_prior_distribution)

    def round(self, val, to):
        return max(1, int(np.round((val + 1) / to))) * to

    def __call__(self, w, h):
        bw = self.round(w, to=self.round_mel_len_to)
        bh = self.round(h, to=self.round_text_len_to)
        ret = ndimage.zoom(self.bank(bw, bh).T, zoom=(w / bw, h / bh), order=1)
        assert ret.shape[0] == w, ret.shape
        assert ret.shape[1] == h, ret.shape
        return ret
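

# Illustrative sketch (not part of the original pipeline): the interpolator
# returns a dense prior of exactly the requested (mel_len, text_len) shape,
# built by resizing a cached prior computed for rounded-off lengths.
def _demo_betabinomial_interpolator():
    interp = BetaBinomialInterpolator()
    prior = interp(w=437, h=63)  # arbitrary mel/text lengths
    assert prior.shape == (437, 63)
    return prior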


def beta_binomial_prior_distribution(phoneme_count, mel_count, scaling=1.0):
    P = phoneme_count
    M = mel_count
    x = np.arange(0, P)
    mel_text_probs = []
    for i in range(1, M + 1):
        a, b = scaling * i, scaling * (M + 1 - i)
        rv = betabinom(P, a, b)
        mel_i_prob = rv.pmf(x)
        mel_text_probs.append(mel_i_prob)
    return torch.tensor(np.array(mel_text_probs))
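

# Minimal sanity check (illustrative only): the prior comes back as a
# [mel_count, phoneme_count] matrix, one beta-binomial pmf row per mel frame.
def _demo_prior_shape():
    prior = beta_binomial_prior_distribution(phoneme_count=10, mel_count=50)
    assert prior.shape == (50, 10)
    return prior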


def estimate_pitch(wav, mel_len, method='pyin', normalize_mean=None,
                   normalize_std=None, n_formants=1):
    if type(normalize_mean) is float or type(normalize_mean) is list:
        normalize_mean = torch.tensor(normalize_mean)
    if type(normalize_std) is float or type(normalize_std) is list:
        normalize_std = torch.tensor(normalize_std)

    if method == 'pyin':
        snd, sr = librosa.load(wav)
        # fmin/fmax narrowed from librosa's note_to_hz('C2')/('C7') defaults
        # to a 60-400 Hz speech range.
        pitch_mel, voiced_flag, voiced_probs = librosa.pyin(
            snd, fmin=60, fmax=400, frame_length=1024)
        assert np.abs(mel_len - pitch_mel.shape[0]) <= 1.0

        pitch_mel = np.where(np.isnan(pitch_mel), 0.0, pitch_mel)
        pitch_mel = torch.from_numpy(pitch_mel).unsqueeze(0)
        pitch_mel = F.pad(pitch_mel, (0, mel_len - pitch_mel.size(1)))

        if n_formants > 1:
            raise NotImplementedError
    else:
        raise ValueError(f'Unknown pitch estimation method: {method}')

    pitch_mel = pitch_mel.float()

    if normalize_mean is not None:
        assert normalize_std is not None
        pitch_mel = normalize_pitch(pitch_mel, normalize_mean, normalize_std)

    return pitch_mel
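

# Hypothetical usage sketch: 'example.wav' is a placeholder path, and mel_len
# would normally come from the matching mel-spectrogram. pyin output is
# NaN-filled and padded/trimmed so the returned tensor is [1, mel_len].
def _demo_estimate_pitch(wav_path='example.wav', mel_len=500):
    pitch = estimate_pitch(wav_path, mel_len, method='pyin')
    assert pitch.shape == (1, mel_len)
    return pitch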


def normalize_pitch(pitch, mean, std):
    zeros = (pitch == 0.0)
    pitch -= mean[:, None]
    pitch /= std[:, None]
    pitch[zeros] = 0.0
    return pitch
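

# Minimal sketch with made-up values (not dataset statistics): frames marked
# unvoiced with 0.0 stay exactly zero after mean/std normalization.
def _demo_normalize_pitch():
    pitch = torch.tensor([[0.0, 220.0, 230.0, 0.0]])
    norm = normalize_pitch(pitch.clone(), torch.tensor([214.7]),
                           torch.tensor([65.7]))
    assert norm[0, 0] == 0.0 and norm[0, 3] == 0.0
    return norm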


class TTSDataset(torch.utils.data.Dataset):
    """
        1) loads audio, text pairs
        2) normalizes text and converts it to sequences of symbol IDs
        3) computes mel-spectrograms from audio files
    """
    def __init__(self,
                 dataset_path,
                 audiopaths_and_text,
                 text_cleaners,
                 n_mel_channels,
                 symbol_set='smj_expanded',
                 p_arpabet=1.0,
                 n_speakers=1,
                 n_languages=1,  # ANT: added
                 load_mel_from_disk=True,
                 load_pitch_from_disk=True,
                 pitch_mean=214.72203,  # LJSpeech defaults
                 pitch_std=65.72038,
                 max_wav_value=None,
                 sampling_rate=None,
                 filter_length=None,
                 hop_length=None,
                 win_length=None,
                 mel_fmin=None,
                 mel_fmax=None,
                 prepend_space_to_text=False,
                 append_space_to_text=False,
                 pitch_online_dir=None,
                 betabinomial_online_dir=None,
                 use_betabinomial_interpolator=True,
                 pitch_online_method='pyin',
                 **ignored):
        # Expect a list of filenames
        if type(audiopaths_and_text) is str:
            audiopaths_and_text = [audiopaths_and_text]

        self.dataset_path = dataset_path
        # ANT: do we need to add language to common.utils.load_filepaths_and_text? Probably.
        self.audiopaths_and_text = load_filepaths_and_text(
            dataset_path, audiopaths_and_text,
            has_speakers=(n_speakers > 1))
        self.load_mel_from_disk = load_mel_from_disk
        self.n_mel_channels = n_mel_channels  # kept for validating mels loaded from disk
        if not load_mel_from_disk:
            self.max_wav_value = max_wav_value
            self.sampling_rate = sampling_rate
            self.stft = layers.TacotronSTFT(
                filter_length, hop_length, win_length,
                n_mel_channels, sampling_rate, mel_fmin, mel_fmax)
        self.load_pitch_from_disk = load_pitch_from_disk

        self.prepend_space_to_text = prepend_space_to_text
        self.append_space_to_text = append_space_to_text

        assert p_arpabet == 0.0 or p_arpabet == 1.0, (
            'Only 0.0 and 1.0 p_arpabet is currently supported. '
            'Variable probability breaks caching of betabinomial matrices.')

        self.tp = TextProcessing(symbol_set, text_cleaners, p_arpabet=p_arpabet)
        self.n_speakers = n_speakers
        # ANT: added languages; must also add them to the config and probably train.py
        self.n_languages = n_languages
        self.pitch_tmp_dir = pitch_online_dir
        self.f0_method = pitch_online_method
        self.betabinomial_tmp_dir = betabinomial_online_dir
        self.use_betabinomial_interpolator = use_betabinomial_interpolator

        if use_betabinomial_interpolator:
            self.betabinomial_interpolator = BetaBinomialInterpolator()

        # ANT: added the language column here
        expected_columns = (2 + int(load_pitch_from_disk) + (n_speakers > 1)
                            + (n_languages > 1))

        assert not (load_pitch_from_disk and self.pitch_tmp_dir is not None)

        # Strict column-count validation is disabled in this fork:
        # if len(self.audiopaths_and_text[0]) < expected_columns:
        #     raise ValueError(f'Expected {expected_columns} columns in audiopaths file. '
        #                      'The format is <mel_or_wav>|[<pitch>|]<text>[|<speaker_id>]')

        if len(self.audiopaths_and_text[0]) > expected_columns:
            print('WARNING: Audiopaths file has more columns than expected')

        to_tensor = lambda x: torch.Tensor([x]) if type(x) is float else x
        self.pitch_mean = to_tensor(pitch_mean)
        self.pitch_std = to_tensor(pitch_std)

    def __getitem__(self, index):
        # Separate filename and text
        # ANT: added language; assume that if a language column is present,
        # speaker labels are too
        if self.n_speakers > 1 and self.n_languages > 1:
            audiopath, *extra, text, speaker, language = self.audiopaths_and_text[index]
            speaker = int(speaker)
            language = int(language)
        elif self.n_speakers > 1:
            audiopath, *extra, text, speaker = self.audiopaths_and_text[index]
            speaker = int(speaker)
            language = None
        else:
            audiopath, *extra, text = self.audiopaths_and_text[index]
            speaker = None
            language = None

        mel = self.get_mel(audiopath)
        text = self.get_text(text)
        pitch = self.get_pitch(index, mel.size(-1))
        # ANT: if external pitch extraction is used, n_frames may be one off
        # due to rounding differences
        if pitch.size(-1) != mel.size(-1):
            print(pitch.shape, mel.shape, audiopath)
            if pitch.size(-1) < mel.size(-1):
                mel = mel[:, :pitch.size(-1)]
            else:
                pitch = pitch[:, :mel.size(-1)]
        energy = torch.norm(mel.float(), dim=0, p=2)
        attn_prior = self.get_prior(index, mel.shape[1], text.shape[0])
        assert pitch.size(-1) == mel.size(-1)

        # No higher formants?
        if len(pitch.size()) == 1:
            pitch = pitch[None, :]

        return (text, mel, len(text), pitch, energy, speaker, language,
                attn_prior, audiopath)

    def __len__(self):
        return len(self.audiopaths_and_text)

    def get_mel(self, filename):
        if not self.load_mel_from_disk:
            audio, sampling_rate = load_wav_to_torch(filename)
            if sampling_rate != self.stft.sampling_rate:
                raise ValueError("{}: {} SR doesn't match target {} SR".format(
                    filename, sampling_rate, self.stft.sampling_rate))
            audio_norm = audio / self.max_wav_value
            audio_norm = audio_norm.unsqueeze(0)
            audio_norm = torch.autograd.Variable(audio_norm,
                                                 requires_grad=False)
            melspec = self.stft.mel_spectrogram(audio_norm)
            melspec = torch.squeeze(melspec, 0)
        else:
            melspec = torch.load(filename)
            assert melspec.size(0) == self.n_mel_channels, (
                'Mel dimension mismatch: given {}, expected {}'.format(
                    melspec.size(0), self.n_mel_channels))

        return melspec

    def get_text(self, text):
        text = self.tp.encode_text(text)
        # Grab the ID of the space symbol by encoding "A A" and taking the
        # middle token
        space = [self.tp.encode_text("A A")[1]]

        if self.prepend_space_to_text:
            text = space + text

        if self.append_space_to_text:
            text = text + space

        return torch.LongTensor(text)

    def get_prior(self, index, mel_len, text_len):
        if self.use_betabinomial_interpolator:
            return torch.from_numpy(self.betabinomial_interpolator(mel_len,
                                                                   text_len))

        if self.betabinomial_tmp_dir is not None:
            audiopath, *_ = self.audiopaths_and_text[index]
            fname = Path(audiopath).relative_to(self.dataset_path)
            fname = fname.with_suffix('.pt')
            cached_fpath = Path(self.betabinomial_tmp_dir, fname)

            if cached_fpath.is_file():
                return torch.load(cached_fpath)

        attn_prior = beta_binomial_prior_distribution(text_len, mel_len)

        if self.betabinomial_tmp_dir is not None:
            cached_fpath.parent.mkdir(parents=True, exist_ok=True)
            torch.save(attn_prior, cached_fpath)

        return attn_prior

    def get_pitch(self, index, mel_len=None):
        audiopath, *fields = self.audiopaths_and_text[index]

        # ANT: spk is not used, but I'll let it be
        if self.n_speakers > 1 and self.n_languages > 1:
            spk = int(fields[-2])
        elif self.n_speakers > 1:
            spk = int(fields[-1])
        else:
            spk = 0

        if self.load_pitch_from_disk:
            pitchpath = fields[0]
            pitch = torch.load(pitchpath)
            if self.pitch_mean is not None:
                assert self.pitch_std is not None
                pitch = normalize_pitch(pitch, self.pitch_mean, self.pitch_std)
            return pitch

        if self.pitch_tmp_dir is not None:
            fname = Path(audiopath).relative_to(self.dataset_path)
            fname_method = fname.with_suffix('.pt')
            cached_fpath = Path(self.pitch_tmp_dir, fname_method)
            if cached_fpath.is_file():
                return torch.load(cached_fpath)

        # No luck so far - calculate
        wav = audiopath
        if not wav.endswith('.wav'):
            wav = re.sub('/mels/', '/wavs/', wav)
            wav = re.sub('.pt$', '.wav', wav)

        pitch_mel = estimate_pitch(wav, mel_len, self.f0_method,
                                   self.pitch_mean, self.pitch_std)

        if self.pitch_tmp_dir is not None and not cached_fpath.is_file():
            cached_fpath.parent.mkdir(parents=True, exist_ok=True)
            torch.save(pitch_mel, cached_fpath)

        return pitch_mel


class TTSCollate:
    """Zero-pads model inputs and targets based on number of frames per step"""

    def __call__(self, batch):
        """Collate training batch from normalized text and mel-spec"""
        # Right zero-pad all one-hot text sequences to max input length
        input_lengths, ids_sorted_decreasing = torch.sort(
            torch.LongTensor([len(x[0]) for x in batch]),
            dim=0, descending=True)
        max_input_len = input_lengths[0]

        text_padded = torch.LongTensor(len(batch), max_input_len)
        text_padded.zero_()
        for i in range(len(ids_sorted_decreasing)):
            text = batch[ids_sorted_decreasing[i]][0]
            text_padded[i, :text.size(0)] = text

        # Right zero-pad mel-spec
        num_mels = batch[0][1].size(0)
        max_target_len = max([x[1].size(1) for x in batch])

        # Include mel padded and gate padded
        mel_padded = torch.FloatTensor(len(batch), num_mels, max_target_len)
        mel_padded.zero_()
        output_lengths = torch.LongTensor(len(batch))
        for i in range(len(ids_sorted_decreasing)):
            mel = batch[ids_sorted_decreasing[i]][1]
            mel_padded[i, :, :mel.size(1)] = mel
            output_lengths[i] = mel.size(1)

        n_formants = batch[0][3].shape[0]
        pitch_padded = torch.zeros(mel_padded.size(0), n_formants,
                                   mel_padded.size(2), dtype=batch[0][3].dtype)
        energy_padded = torch.zeros_like(pitch_padded[:, 0, :])

        for i in range(len(ids_sorted_decreasing)):
            pitch = batch[ids_sorted_decreasing[i]][3]
            energy = batch[ids_sorted_decreasing[i]][4]
            pitch_padded[i, :, :pitch.shape[1]] = pitch
            energy_padded[i, :energy.shape[0]] = energy

        if batch[0][5] is not None:
            speaker = torch.zeros_like(input_lengths)
            for i in range(len(ids_sorted_decreasing)):
                speaker[i] = batch[ids_sorted_decreasing[i]][5]
        else:
            speaker = None

        # ANT: added language here and increased the attn_prior and audiopaths
        # indices by 1
        if batch[0][6] is not None:
            language = torch.zeros_like(input_lengths)
            for i in range(len(ids_sorted_decreasing)):
                language[i] = batch[ids_sorted_decreasing[i]][6]
        else:
            language = None

        attn_prior_padded = torch.zeros(len(batch), max_target_len,
                                        max_input_len)
        for i in range(len(ids_sorted_decreasing)):
            prior = batch[ids_sorted_decreasing[i]][7]
            attn_prior_padded[i, :prior.size(0), :prior.size(1)] = prior

        # Count number of items - characters in text
        len_x = [x[2] for x in batch]
        len_x = torch.Tensor(len_x)

        audiopaths = [batch[i][8] for i in ids_sorted_decreasing]

        return (text_padded, input_lengths, mel_padded, output_lengths, len_x,
                pitch_padded, energy_padded, speaker, language,
                attn_prior_padded, audiopaths)


def batch_to_gpu(batch):
    # ANT: added language here too
    (text_padded, input_lengths, mel_padded, output_lengths, len_x,
     pitch_padded, energy_padded, speaker, language, attn_prior,
     audiopaths) = batch

    text_padded = to_gpu(text_padded).long()
    input_lengths = to_gpu(input_lengths).long()
    mel_padded = to_gpu(mel_padded).float()
    output_lengths = to_gpu(output_lengths).long()
    pitch_padded = to_gpu(pitch_padded).float()
    energy_padded = to_gpu(energy_padded).float()
    attn_prior = to_gpu(attn_prior).float()
    if speaker is not None:
        speaker = to_gpu(speaker).long()
    if language is not None:
        language = to_gpu(language).long()

    # Alignments act as both inputs and targets - pass shallow copies
    x = [text_padded, input_lengths, mel_padded, output_lengths,
         pitch_padded, energy_padded, speaker, language, attn_prior,
         audiopaths]
    y = [mel_padded, input_lengths, output_lengths]
    len_x = torch.sum(output_lengths)
    return (x, y, len_x)
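

# Hypothetical end-to-end sketch wiring the pieces above into a DataLoader.
# 'data', 'filelist.txt', and the keyword arguments are placeholders; real
# values come from the training config. Guarded so nothing runs on import.
if __name__ == '__main__':
    from torch.utils.data import DataLoader

    dataset = TTSDataset(dataset_path='data',
                         audiopaths_and_text='filelist.txt',
                         text_cleaners=['basic_cleaners'],
                         n_mel_channels=80,
                         n_speakers=2,
                         n_languages=2)
    loader = DataLoader(dataset, batch_size=4, shuffle=True,
                        collate_fn=TTSCollate())
    for batch in loader:
        x, y, num_frames = batch_to_gpu(batch)
        break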