""" Updates after 0416: Import half in config Rebuild npy without filling v2 support No f0 model support Fix int16: Added support for no index Changed f0 algorithm to harvest (seems like this is the only thing that affects CPU usage), but the effect is not good without this change """ import os, sys, traceback, re import json now_dir = os.getcwd() sys.path.append(now_dir) from config import Config Config = Config() import PySimpleGUI as sg import sounddevice as sd import noisereduce as nr import numpy as np from fairseq import checkpoint_utils import librosa, torch, pyworld, faiss, time, threading import torch.nn.functional as F import torchaudio.transforms as tat import scipy.signal as signal import torchcrepe # import matplotlib.pyplot as plt from infer_pack.models import ( SynthesizerTrnMs256NSFsid, SynthesizerTrnMs256NSFsid_nono, SynthesizerTrnMs768NSFsid, SynthesizerTrnMs768NSFsid_nono, ) from i18n import I18nAuto i18n = I18nAuto() device = torch.device("cuda" if torch.cuda.is_available() else "cpu") current_dir = os.getcwd() class RVC: def __init__( self, key, f0_method, hubert_path, pth_path, index_path, npy_path, index_rate ) -> None: """ initialization """ try: self.f0_up_key = key self.time_step = 160 / 16000 * 1000 self.f0_min = 50 self.f0_max = 1100 self.f0_mel_min = 1127 * np.log(1 + self.f0_min / 700) self.f0_mel_max = 1127 * np.log(1 + self.f0_max / 700) self.f0_method = f0_method self.sr = 16000 self.window = 160 # Get Torch Device if(torch.cuda.is_available()): self.torch_device = torch.device(f"cuda:{0 % torch.cuda.device_count()}") elif torch.backends.mps.is_available(): self.torch_device = torch.device("mps") else: self.torch_device = torch.device("cpu") if index_rate != 0: self.index = faiss.read_index(index_path) # self.big_npy = np.load(npy_path) self.big_npy = self.index.reconstruct_n(0, self.index.ntotal) print("index search enabled") self.index_rate = index_rate model_path = hubert_path print("load model(s) from {}".format(model_path)) models, saved_cfg, task = checkpoint_utils.load_model_ensemble_and_task( [model_path], suffix="", ) self.model = models[0] self.model = self.model.to(device) if Config.is_half: self.model = self.model.half() else: self.model = self.model.float() self.model.eval() cpt = torch.load(pth_path, map_location="cpu") self.tgt_sr = cpt["config"][-1] cpt["config"][-3] = cpt["weight"]["emb_g.weight"].shape[0] # n_spk self.if_f0 = cpt.get("f0", 1) self.version = cpt.get("version", "v1") if self.version == "v1": if self.if_f0 == 1: self.net_g = SynthesizerTrnMs256NSFsid( *cpt["config"], is_half=Config.is_half ) else: self.net_g = SynthesizerTrnMs256NSFsid_nono(*cpt["config"]) elif self.version == "v2": if self.if_f0 == 1: self.net_g = SynthesizerTrnMs768NSFsid( *cpt["config"], is_half=Config.is_half ) else: self.net_g = SynthesizerTrnMs768NSFsid_nono(*cpt["config"]) del self.net_g.enc_q print(self.net_g.load_state_dict(cpt["weight"], strict=False)) self.net_g.eval().to(device) if Config.is_half: self.net_g = self.net_g.half() else: self.net_g = self.net_g.float() except: print(traceback.format_exc()) def get_regular_crepe_computation(self, x, f0_min, f0_max, model="full"): batch_size = 512 # Compute pitch using first gpu audio = torch.tensor(np.copy(x))[None].float() f0, pd = torchcrepe.predict( audio, self.sr, self.window, f0_min, f0_max, model, batch_size=batch_size, device=self.torch_device, return_periodicity=True, ) pd = torchcrepe.filter.median(pd, 3) f0 = torchcrepe.filter.mean(f0, 3) f0[pd < 0.1] = 0 f0 = f0[0].cpu().numpy() return f0 
    def get_harvest_computation(self, x, f0_min, f0_max):
        f0, t = pyworld.harvest(
            x.astype(np.double),
            fs=self.sr,
            f0_ceil=f0_max,
            f0_floor=f0_min,
            frame_period=10,
        )
        f0 = pyworld.stonemask(x.astype(np.double), f0, t, self.sr)
        f0 = signal.medfilt(f0, 3)
        return f0

    def get_f0(self, x, f0_up_key, inp_f0=None):
        # Calculate padding and f0 details here
        p_len = x.shape[0] // 512  # FIXME: this probably isn't correct yet
        x_pad = 1
        f0_min = 50
        f0_max = 1100
        f0_mel_min = 1127 * np.log(1 + f0_min / 700)
        f0_mel_max = 1127 * np.log(1 + f0_max / 700)

        f0 = 0
        # Dispatch to the selected f0 method
        if self.f0_method == "harvest":
            f0 = self.get_harvest_computation(x, f0_min, f0_max)
        elif self.f0_method == "reg-crepe":
            f0 = self.get_regular_crepe_computation(x, f0_min, f0_max)
        elif self.f0_method == "reg-crepe-tiny":
            f0 = self.get_regular_crepe_computation(x, f0_min, f0_max, "tiny")

        # Calculate f0_coarse and f0_bak here
        f0 *= pow(2, f0_up_key / 12)
        # with open("test.txt","w")as f:f.write("\n".join([str(i)for i in f0.tolist()]))
        tf0 = self.sr // self.window  # f0 points per second
        if inp_f0 is not None:
            delta_t = np.round(
                (inp_f0[:, 0].max() - inp_f0[:, 0].min()) * tf0 + 1
            ).astype("int16")
            replace_f0 = np.interp(
                list(range(delta_t)), inp_f0[:, 0] * 100, inp_f0[:, 1]
            )
            shape = f0[x_pad * tf0 : x_pad * tf0 + len(replace_f0)].shape[0]
            f0[x_pad * tf0 : x_pad * tf0 + len(replace_f0)] = replace_f0[:shape]
        # with open("test_opt.txt","w")as f:f.write("\n".join([str(i)for i in f0.tolist()]))
        f0bak = f0.copy()
        f0_mel = 1127 * np.log(1 + f0 / 700)
        f0_mel[f0_mel > 0] = (f0_mel[f0_mel > 0] - f0_mel_min) * 254 / (
            f0_mel_max - f0_mel_min
        ) + 1
        f0_mel[f0_mel <= 1] = 1
        f0_mel[f0_mel > 255] = 255
        # np.int was removed in NumPy >= 1.24; use a fixed-width integer type
        f0_coarse = np.rint(f0_mel).astype(np.int64)
        return f0_coarse, f0bak  # 1-0

    def infer(self, feats: torch.Tensor) -> np.ndarray:
        """
        Inference function
        """
        audio = feats.clone().cpu().numpy()
        assert feats.dim() == 1, feats.dim()
        feats = feats.view(1, -1)
        padding_mask = torch.BoolTensor(feats.shape).fill_(False)
        if Config.is_half:
            feats = feats.half()
        else:
            feats = feats.float()
        inputs = {
            "source": feats.to(device),
            "padding_mask": padding_mask.to(device),
            "output_layer": 9 if self.version == "v1" else 12,
        }
        torch.cuda.synchronize()
        with torch.no_grad():
            logits = self.model.extract_features(**inputs)
            feats = (
                self.model.final_proj(logits[0]) if self.version == "v1" else logits[0]
            )

        # Index optimization
        try:
            if (
                hasattr(self, "index")
                and hasattr(self, "big_npy")
                and self.index_rate != 0
            ):
                npy = feats[0].cpu().numpy().astype("float32")
                score, ix = self.index.search(npy, k=8)
                weight = np.square(1 / score)
                weight /= weight.sum(axis=1, keepdims=True)
                npy = np.sum(self.big_npy[ix] * np.expand_dims(weight, axis=2), axis=1)
                if Config.is_half:
                    npy = npy.astype("float16")
                feats = (
                    torch.from_numpy(npy).unsqueeze(0).to(device) * self.index_rate
                    + (1 - self.index_rate) * feats
                )
            else:
                print("index search FAIL or disabled")
        except:
            traceback.print_exc()
            print("index search FAIL")

        feats = F.interpolate(feats.permute(0, 2, 1), scale_factor=2).permute(0, 2, 1)
        torch.cuda.synchronize()
        print(feats.shape)
        if self.if_f0 == 1:
            pitch, pitchf = self.get_f0(audio, self.f0_up_key)
            p_len = min(feats.shape[1], 13000, pitch.shape[0])  # cap to avoid exhausting GPU memory
        else:
            pitch, pitchf = None, None
            p_len = min(feats.shape[1], 13000)  # cap to avoid exhausting GPU memory
        torch.cuda.synchronize()
        # print(feats.shape,pitch.shape)
        feats = feats[:, :p_len, :]
        if self.if_f0 == 1:
            pitch = pitch[:p_len]
            pitchf = pitchf[:p_len]
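            # The trimmed pitch arrays are converted to batched tensors below:
            # the coarse pitch becomes LongTensor embedding indices and the
            # continuous f0 curve a FloatTensor, both shaped (1, p_len).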
            pitch = torch.LongTensor(pitch).unsqueeze(0).to(device)
            pitchf = torch.FloatTensor(pitchf).unsqueeze(0).to(device)
        p_len = torch.LongTensor([p_len]).to(device)
        ii = 0  # sid
        sid = torch.LongTensor([ii]).to(device)
        with torch.no_grad():
            if self.if_f0 == 1:
                infered_audio = (
                    self.net_g.infer(feats, p_len, pitch, pitchf, sid)[0][0, 0]
                    .data.cpu()
                    .float()
                )
            else:
                infered_audio = (
                    self.net_g.infer(feats, p_len, sid)[0][0, 0].data.cpu().float()
                )
        torch.cuda.synchronize()
        return infered_audio


class GUIConfig:
    def __init__(self) -> None:
        self.hubert_path: str = ""
        self.pth_path: str = ""
        self.index_path: str = ""
        self.npy_path: str = ""
        self.f0_method: str = ""
        self.pitch: int = 12
        self.samplerate: int = 44100
        self.block_time: float = 1.0  # s
        self.buffer_num: int = 1
        self.threhold: int = -30
        self.crossfade_time: float = 0.08
        self.extra_time: float = 0.04
        self.I_noise_reduce = False
        self.O_noise_reduce = False
        self.index_rate = 0.3


class GUI:
    def __init__(self) -> None:
        self.config = GUIConfig()
        self.flag_vc = False
        self.launcher()

    def load(self):
        input_devices, output_devices, _, _ = self.get_devices()
        try:
            with open("values1.json", "r") as j:
                data = json.load(j)
        except:
            # Write out a fresh defaults file (f0_method is injected into the
            # json when settings are saved from the event handler)
            with open("values1.json", "w") as j:
                data = {
                    "pth_path": " ",
                    "index_path": " ",
                    "sg_input_device": input_devices[sd.default.device[0]],
                    "sg_output_device": output_devices[sd.default.device[1]],
                    "threhold": "-45",
                    "pitch": "0",
                    "index_rate": "0",
                    "block_time": "1",
                    "crossfade_length": "0.04",
                    "extra_time": "1",
                }
                json.dump(data, j)  # previously the defaults were never written out
        return data

    def launcher(self):
        data = self.load()
        sg.theme("DarkTeal12")
        input_devices, output_devices, _, _ = self.get_devices()
        layout = [
            [
                sg.Frame(
                    title="Proudly forked by Mangio621",
                    layout=[[sg.Image("./mangio_utils/lol.png")]],
                ),
                sg.Frame(
                    title=i18n("Load model"),
                    layout=[
                        [
                            sg.Input(
                                default_text="hubert_base.pt",
                                key="hubert_path",
                                disabled=True,
                            ),
                            sg.FileBrowse(
                                i18n("Hubert model"),
                                initial_folder=os.path.join(os.getcwd()),
                                file_types=(("pt files", "*.pt"),),
                            ),
                        ],
                        [
                            sg.Input(
                                default_text=data.get("pth_path", ""),
                                key="pth_path",
                            ),
                            sg.FileBrowse(
                                i18n("Select .pth file"),
                                initial_folder=os.path.join(os.getcwd(), "weights"),
                                file_types=(("pth files", "*.pth"),),
                            ),
                        ],
                        [
                            sg.Input(
                                default_text=data.get("index_path", ""),
                                key="index_path",
                            ),
                            sg.FileBrowse(
                                i18n("Select .index file"),
                                initial_folder=os.path.join(os.getcwd(), "logs"),
                                file_types=(("index files", "*.index"),),
                            ),
                        ],
                        [
                            sg.Input(
                                default_text="You don't need to write this.",
                                key="npy_path",
                                disabled=True,
                            ),
                            sg.FileBrowse(
                                i18n("Select .npy file"),
                                initial_folder=os.path.join(os.getcwd(), "logs"),
                                file_types=(("npy files", "*.npy"),),
                            ),
                        ],
                    ],
                )
            ],
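            # The radio keys in the frame below ("harvest", "reg-crepe",
            # "reg-crepe-tiny") double as method identifiers: they must match
            # the strings dispatched on in RVC.get_f0.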
npy"),), ), ], ], ) ], [ # Mangio f0 Selection frame Here sg.Frame( layout=[ [ sg.Radio("Harvest", "f0_method", key="harvest", default=True), sg.Radio("Crepe", "f0_method", key="reg-crepe"), sg.Radio("Crepe Tiny", "f0_method", key="reg-crepe-tiny"), ] ], title="Select an f0 Method", ) ], [ sg.Frame( layout=[ [ sg.Text(i18n("input device")), sg.Combo( input_devices, key="sg_input_device", default_value=data.get("sg_input_device", ""), ), ], [ sg.Text(i18n("output device")), sg.Combo( output_devices, key="sg_output_device", default_value=data.get("sg_output_device", ""), ), ], ], title=i18n("Audio device (please use the same type of driver)"), ) ], [ sg.Frame( layout=[ [ sg.Text(i18n("response threshold")), sg.Slider( range=(-60, 0), key="threhold", resolution=1, orientation="h", default_value=data.get("threhold", ""), ), ], [ sg.Text(i18n("Tone settings")), sg.Slider( range=(-24, 24), key="pitch", resolution=1, orientation="h", default_value=data.get("pitch", ""), ), ], [ sg.Text(i18n("Index Rate")), sg.Slider( range=(0.0, 1.0), key="index_rate", resolution=0.01, orientation="h", default_value=data.get("index_rate", ""), ), ], ], title=i18n("General settings"), ), sg.Frame( layout=[ [ sg.Text(i18n("Sample length")), sg.Slider( range=(0.1, 3.0), key="block_time", resolution=0.1, orientation="h", default_value=data.get("block_time", ""), ), ], [ sg.Text(i18n("Fade Length")), sg.Slider( range=(0.01, 0.15), key="crossfade_length", resolution=0.01, orientation="h", default_value=data.get("crossfade_length", ""), ), ], [ sg.Text(i18n("Additional reasoning time")), sg.Slider( range=(0.05, 3.00), key="extra_time", resolution=0.01, orientation="h", default_value=data.get("extra_time", ""), ), ], [ sg.Checkbox(i18n("Input noise reduction"), key="I_noise_reduce"), sg.Checkbox(i18n("Output noise reduction"), key="O_noise_reduce"), ], ], title=i18n("Performance settings"), ), ], [ sg.Button(i18n("Start audio conversion"), key="start_vc"), sg.Button(i18n("Stop audio conversion"), key="stop_vc"), sg.Text(i18n("Inference time (ms):")), sg.Text("0", key="infer_time"), ], ] self.window = sg.Window("RVC - GUI", layout=layout) self.event_handler() def event_handler(self): while True: event, values = self.window.read() if event == sg.WINDOW_CLOSED: self.flag_vc = False exit() if event == "start_vc" and self.flag_vc == False: if self.set_values(values) == True: print("using_cuda:" + str(torch.cuda.is_available())) self.start_vc() settings = { "pth_path": values["pth_path"], "index_path": values["index_path"], "f0_method": self.get_f0_method_from_radios(values), "sg_input_device": values["sg_input_device"], "sg_output_device": values["sg_output_device"], "threhold": values["threhold"], "pitch": values["pitch"], "index_rate": values["index_rate"], "block_time": values["block_time"], "crossfade_length": values["crossfade_length"], "extra_time": values["extra_time"], } with open("values1.json", "w") as j: json.dump(settings, j) if event == "stop_vc" and self.flag_vc == True: self.flag_vc = False # Function that returns the used f0 method in string format "harvest" def get_f0_method_from_radios(self, values): f0_array = [ {"name": "harvest", "val": values['harvest']}, {"name": "reg-crepe", "val": values['reg-crepe']}, {"name": "reg-crepe-tiny", "val": values['reg-crepe-tiny']}, ] # Filter through to find a true value used_f0 = "" for f0 in f0_array: if(f0['val'] == True): used_f0 = f0['name'] break if(used_f0 == ""): used_f0 = "harvest" # Default Harvest if used_f0 is empty somehow return used_f0 def set_values(self, 
    def set_values(self, values):
        if len(values["pth_path"].strip()) == 0:
            sg.popup(i18n("Please select a pth file"))
            return False
        if len(values["index_path"].strip()) == 0:
            sg.popup(i18n("Please select an index file"))
            return False
        pattern = re.compile("[^\x00-\x7F]+")
        if pattern.findall(values["hubert_path"]):
            sg.popup(i18n("The hubert model path cannot contain non-ASCII characters"))
            return False
        if pattern.findall(values["pth_path"]):
            sg.popup(i18n("The pth file path cannot contain non-ASCII characters"))
            return False
        if pattern.findall(values["index_path"]):
            sg.popup(i18n("The index file path cannot contain non-ASCII characters"))
            return False
        self.set_devices(values["sg_input_device"], values["sg_output_device"])
        self.config.hubert_path = os.path.join(current_dir, "hubert_base.pt")
        self.config.pth_path = values["pth_path"]
        self.config.index_path = values["index_path"]
        self.config.npy_path = values["npy_path"]
        self.config.f0_method = self.get_f0_method_from_radios(values)
        self.config.threhold = values["threhold"]
        self.config.pitch = values["pitch"]
        self.config.block_time = values["block_time"]
        self.config.crossfade_time = values["crossfade_length"]
        self.config.extra_time = values["extra_time"]
        self.config.I_noise_reduce = values["I_noise_reduce"]
        self.config.O_noise_reduce = values["O_noise_reduce"]
        self.config.index_rate = values["index_rate"]
        return True

    def start_vc(self):
        torch.cuda.empty_cache()
        self.flag_vc = True
        self.block_frame = int(self.config.block_time * self.config.samplerate)
        self.crossfade_frame = int(self.config.crossfade_time * self.config.samplerate)
        self.sola_search_frame = int(0.012 * self.config.samplerate)
        self.delay_frame = int(0.01 * self.config.samplerate)  # reserve 0.01 s of lead-in
        self.extra_frame = int(self.config.extra_time * self.config.samplerate)
        self.rvc = None
        self.rvc = RVC(
            self.config.pitch,
            self.config.f0_method,
            self.config.hubert_path,
            self.config.pth_path,
            self.config.index_path,
            self.config.npy_path,
            self.config.index_rate,
        )
        self.input_wav: np.ndarray = np.zeros(
            self.extra_frame
            + self.crossfade_frame
            + self.sola_search_frame
            + self.block_frame,
            dtype="float32",
        )
        self.output_wav: torch.Tensor = torch.zeros(
            self.block_frame, device=device, dtype=torch.float32
        )
        self.sola_buffer: torch.Tensor = torch.zeros(
            self.crossfade_frame, device=device, dtype=torch.float32
        )
        self.fade_in_window: torch.Tensor = torch.linspace(
            0.0, 1.0, steps=self.crossfade_frame, device=device, dtype=torch.float32
        )
        self.fade_out_window: torch.Tensor = 1 - self.fade_in_window
        self.resampler1 = tat.Resample(
            orig_freq=self.config.samplerate, new_freq=16000, dtype=torch.float32
        )
        self.resampler2 = tat.Resample(
            orig_freq=self.rvc.tgt_sr,
            new_freq=self.config.samplerate,
            dtype=torch.float32,
        )
        thread_vc = threading.Thread(target=self.soundinput)
        thread_vc.start()

    def soundinput(self):
        """
        Accept audio input
        """
        with sd.Stream(
            callback=self.audio_callback,
            blocksize=self.block_frame,
            samplerate=self.config.samplerate,
            dtype="float32",
        ):
            while self.flag_vc:
                time.sleep(self.config.block_time)
                print("Audio block passed.")
        print("Ending VC")

    def audio_callback(
        self, indata: np.ndarray, outdata: np.ndarray, frames, times, status
    ):
        """
        Audio processing
        """
        start_time = time.perf_counter()
        indata = librosa.to_mono(indata.T)
        if self.config.I_noise_reduce:
            indata[:] = nr.reduce_noise(y=indata, sr=self.config.samplerate)

        """noise gate"""
        frame_length = 2048
        hop_length = 1024
        rms = librosa.feature.rms(
            y=indata, frame_length=frame_length, hop_length=hop_length
        )
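        # Noise gate: each 1024-sample hop whose RMS level, converted to dB
        # relative to full scale (ref=1.0), falls below the user threshold is
        # zeroed before the block is appended to the rolling input buffer.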
        db_threhold = (
            librosa.amplitude_to_db(rms, ref=1.0)[0] < self.config.threhold
        )
        # print(rms.shape,db.shape,db)
        for i in range(db_threhold.shape[0]):
            if db_threhold[i]:
                indata[i * hop_length : (i + 1) * hop_length] = 0
        self.input_wav[:] = np.append(self.input_wav[self.block_frame :], indata)

        # infer
        print("input_wav:" + str(self.input_wav.shape))
        # print('infered_wav:'+str(infer_wav.shape))
        infer_wav: torch.Tensor = self.resampler2(
            self.rvc.infer(self.resampler1(torch.from_numpy(self.input_wav)))
        )[-self.crossfade_frame - self.sola_search_frame - self.block_frame :].to(
            device
        )
        print("infer_wav:" + str(infer_wav.shape))

        # SOLA algorithm from https://github.com/yxlllc/DDSP-SVC
        cor_nom = F.conv1d(
            infer_wav[None, None, : self.crossfade_frame + self.sola_search_frame],
            self.sola_buffer[None, None, :],
        )
        cor_den = torch.sqrt(
            F.conv1d(
                infer_wav[None, None, : self.crossfade_frame + self.sola_search_frame]
                ** 2,
                torch.ones(1, 1, self.crossfade_frame, device=device),
            )
            + 1e-8
        )
        sola_offset = torch.argmax(cor_nom[0, 0] / cor_den[0, 0])
        print("sola offset: " + str(int(sola_offset)))

        # crossfade
        self.output_wav[:] = infer_wav[sola_offset : sola_offset + self.block_frame]
        self.output_wav[: self.crossfade_frame] *= self.fade_in_window
        self.output_wav[: self.crossfade_frame] += self.sola_buffer[:]
        if sola_offset < self.sola_search_frame:
            self.sola_buffer[:] = (
                infer_wav[
                    -self.sola_search_frame
                    - self.crossfade_frame
                    + sola_offset : -self.sola_search_frame
                    + sola_offset
                ]
                * self.fade_out_window
            )
        else:
            self.sola_buffer[:] = (
                infer_wav[-self.crossfade_frame :] * self.fade_out_window
            )

        if self.config.O_noise_reduce:
            outdata[:] = np.tile(
                nr.reduce_noise(
                    y=self.output_wav[:].cpu().numpy(), sr=self.config.samplerate
                ),
                (2, 1),
            ).T
        else:
            outdata[:] = self.output_wav[:].repeat(2, 1).t().cpu().numpy()
        total_time = time.perf_counter() - start_time
        self.window["infer_time"].update(int(total_time * 1000))
        print("infer time:" + str(total_time))
        print("f0_method: " + str(self.config.f0_method))

    def get_devices(self, update: bool = True):
        """Get the list of audio devices"""
        if update:
            sd._terminate()
            sd._initialize()
        devices = sd.query_devices()
        hostapis = sd.query_hostapis()
        for hostapi in hostapis:
            for device_idx in hostapi["devices"]:
                devices[device_idx]["hostapi_name"] = hostapi["name"]
        input_devices = [
            f"{d['name']} ({d['hostapi_name']})"
            for d in devices
            if d["max_input_channels"] > 0
        ]
        output_devices = [
            f"{d['name']} ({d['hostapi_name']})"
            for d in devices
            if d["max_output_channels"] > 0
        ]
        input_devices_indices = [
            d["index"] if "index" in d else d["name"]
            for d in devices
            if d["max_input_channels"] > 0
        ]
        output_devices_indices = [
            d["index"] if "index" in d else d["name"]
            for d in devices
            if d["max_output_channels"] > 0
        ]
        return (
            input_devices,
            output_devices,
            input_devices_indices,
            output_devices_indices,
        )

    def set_devices(self, input_device, output_device):
        """Set the input and output devices"""
        (
            input_devices,
            output_devices,
            input_device_indices,
            output_device_indices,
        ) = self.get_devices()
        sd.default.device[0] = input_device_indices[input_devices.index(input_device)]
        sd.default.device[1] = output_device_indices[
            output_devices.index(output_device)
        ]
        print("input device:" + str(sd.default.device[0]) + ":" + str(input_device))
        print("output device:" + str(sd.default.device[1]) + ":" + str(output_device))


gui = GUI()
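# Note: instantiating GUI() above opens the PySimpleGUI window immediately, so
# importing this module launches the app. Run the script directly, e.g.
# `python gui.py` (filename assumed; use whatever this file is saved as).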