# --- Imports ---
import os
import shutil
import traceback
import asyncio
import subprocess
from datetime import datetime
import gradio as gr
import torch
import numpy as np
import librosa
import soundfile as sf
import yt_dlp
import edge_tts
from fairseq import checkpoint_utils
# --- Local Module Imports ---
from lib.infer_pack.models import (
    SynthesizerTrnMs256NSFsid,
    SynthesizerTrnMs256NSFsid_nono,
    SynthesizerTrnMs768NSFsid,
    SynthesizerTrnMs768NSFsid_nono,
)
from vc_infer_pipeline import VC
from config import Config
# --- Constants and Configuration ---
now_dir = os.getcwd()
config = Config()

# Define paths for read-only models from the repository
HUBERT_PATH = os.path.join(now_dir, "pretraineds", "hubert_base.pt")
RMVPE_PATH = os.path.join(now_dir, "pretraineds", "rmvpe.pt")
WEIGHT_ROOT = os.path.join(now_dir, "weights")
INDEX_ROOT = os.path.join(WEIGHT_ROOT, "index")

# The /app directory is read-only in the Docker environment, so all writable
# artifacts go under /tmp.
main_tmp_dir = "/tmp/rvc_app"
output_dir = os.path.join(main_tmp_dir, "output")      # For demucs output
dl_audio_dir = os.path.join(main_tmp_dir, "dl_audio")  # For yt-dlp output
tts_audio_dir = os.path.join(main_tmp_dir, "tts")      # For TTS output

# Recreate all necessary temporary directories at startup
shutil.rmtree(main_tmp_dir, ignore_errors=True)
os.makedirs(output_dir, exist_ok=True)
os.makedirs(dl_audio_dir, exist_ok=True)
os.makedirs(tts_audio_dir, exist_ok=True)
os.environ["TEMP"] = main_tmp_dir  # Set for any underlying libraries
# --- Model Loading (Loaded Once at Startup) ---
def load_hubert_model():
    """Loads the Hubert feature extractor onto the configured device."""
    print("Loading Hubert model...")
    models, _, _ = checkpoint_utils.load_model_ensemble_and_task([HUBERT_PATH], suffix="")
    hubert_model = models[0].to(config.device)
    hubert_model = hubert_model.half() if config.is_half else hubert_model.float()
    hubert_model.eval()
    print("Hubert model loaded.")
    return hubert_model
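# Module-level load: the Space keeps this process alive between requests, so a
# single load at import time effectively caches the model for the app's lifetime.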
hubert_model = load_hubert_model()

# --- Utility Functions ---
def get_models_and_indices():
    """Scans the weights folders and returns sorted lists of available models and indices."""
    if not os.path.isdir(WEIGHT_ROOT) or not os.path.isdir(INDEX_ROOT):
        return [], []  # Avoid crashing if the folders have not been created yet
    model_files = [f for f in os.listdir(WEIGHT_ROOT) if f.endswith(".pth")]
    index_files = [os.path.join(INDEX_ROOT, f) for f in os.listdir(INDEX_ROOT) if f.endswith(".index") and "trained" not in f]
    return sorted(model_files), sorted(index_files)
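# Note: "trained*.index" files are intermediate RVC training artifacts; the merged
# "added_*.index" files are the ones usable at inference, hence the filter above.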
def get_edge_tts_voices():
    """Fetches the list of available voices for Edge-TTS."""
    try:
        tts_voice_list = asyncio.run(edge_tts.list_voices())
        return [f"{v['ShortName']}-{v['Gender']}" for v in tts_voice_list]
    except Exception as e:
        print(f"Error fetching TTS voices: {e}. Returning a default list.")
        return ["en-US-AnaNeural-Female", "en-US-AriaNeural-Female", "en-GB-SoniaNeural-Female"]
# --- Core Logic ---
def vc_single(sid, input_audio, f0_up_key, f0_method, file_index, index_rate, filter_radius, resample_sr, rms_mix_rate, protect, f0_file, loaded_model):
    if not input_audio: return "You need to upload an audio file.", None
    if not loaded_model or loaded_model["sid"] != sid: return "Model not loaded. Please select a model from the dropdown.", None
    net_g, tgt_sr, vc, version, if_f0 = loaded_model["model"], loaded_model["tgt_sr"], loaded_model["vc"], loaded_model["version"], loaded_model["if_f0"]
    try:
        # The UI passes a filepath (gr.Audio(type="filepath")); also accept a (sr, ndarray) tuple.
        if isinstance(input_audio, str): audio_data, sampling_rate = sf.read(input_audio)
        else: sampling_rate, audio_data = input_audio
        # Normalize integer PCM to float32 in [-1, 1]; float input only needs a cast.
        if np.issubdtype(audio_data.dtype, np.integer): audio_data = (audio_data / np.iinfo(audio_data.dtype).max).astype(np.float32)
        else: audio_data = audio_data.astype(np.float32)
        if audio_data.ndim > 1: audio_data = librosa.to_mono(audio_data.transpose(1, 0))
        if sampling_rate != 16000: audio_data = librosa.resample(y=audio_data, orig_sr=sampling_rate, target_sr=16000)
        times = [0, 0, 0]  # Accumulates npy / f0 / inference timings
        audio_opt = vc.pipeline(hubert_model, net_g, sid, audio_data, "dummy_path", times, int(f0_up_key), f0_method, file_index, index_rate, if_f0, filter_radius, tgt_sr, resample_sr, rms_mix_rate, version, protect, f0_file=f0_file)
        final_sr = resample_sr if resample_sr >= 16000 else tgt_sr
        index_info = f"Using index: {os.path.basename(file_index)}" if file_index and os.path.exists(file_index) else "Index not used."
        info = f"Success. {index_info}\nTime: npy:{times[0]:.2f}s, f0:{times[1]:.2f}s, infer:{times[2]:.2f}s"
        return info, (final_sr, audio_opt)
    except Exception:
        return traceback.format_exc(), None
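# Example of calling vc_single() programmatically (paths and values are illustrative):
#   state, *_ = load_selected_model("MyVoice.pth", 0.33)
#   info, (sr, audio) = vc_single(0, "/tmp/rvc_app/dl_audio/audio.wav", 0, "rmvpe", "", 0.7, 3, 0, 1.0, 0.33, None, state)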
def load_selected_model(sid, protect_val):
    if not sid: return None, gr.update(maximum=2333, visible=False), gr.update(visible=True), gr.update(value=""), gr.update(value="# <center> No model selected")
    print(f"Loading model: {sid}")
    try:
        cpt = torch.load(os.path.join(WEIGHT_ROOT, sid), map_location="cpu")
        tgt_sr, n_spk = cpt["config"][-1], cpt["weight"]["emb_g.weight"].shape[0]
        cpt["config"][-3] = n_spk
        if_f0, version = cpt.get("f0", 1), cpt.get("version", "v1")
        synth_class = {"v1": {1: SynthesizerTrnMs256NSFsid, 0: SynthesizerTrnMs256NSFsid_nono}, "v2": {1: SynthesizerTrnMs768NSFsid, 0: SynthesizerTrnMs768NSFsid_nono}}[version][if_f0]
        net_g = synth_class(*cpt["config"], is_half=config.is_half)
        del net_g.enc_q  # The posterior encoder is only needed during training
        net_g.load_state_dict(cpt["weight"], strict=False)
        net_g.eval().to(config.device)
        net_g = net_g.half() if config.is_half else net_g.float()
        vc = VC(tgt_sr, config)
        loaded_model_state = {"sid": sid, "model": net_g, "tgt_sr": tgt_sr, "vc": vc, "version": version, "if_f0": if_f0, "n_spk": n_spk}
        # Auto-select the first index file whose name contains the model name
        model_name_no_ext = os.path.splitext(sid)[0]
        _, index_files = get_models_and_indices()
        best_index = next((f for f in index_files if model_name_no_ext in os.path.basename(f)), "")
        return loaded_model_state, gr.update(maximum=n_spk - 1, visible=True), gr.update(visible=(if_f0 != 0), value=protect_val), gr.update(value=best_index), gr.update(value=f'## <center> ✅ Loaded: {model_name_no_ext}\n### <center> RVC {version} Model')
    except Exception:
        traceback.print_exc()
        return None, gr.update(visible=False), gr.update(visible=True), gr.update(value=""), gr.update(value=f"# <center> ⚠️ Error loading {sid}")
def run_tts(tts_text, tts_voice):
    if not tts_text or not tts_voice: raise gr.Error("TTS text and voice are required.")
    output_file = os.path.join(tts_audio_dir, "tts_output.mp3")
    try:
        # Drop the "-<Gender>" suffix that get_edge_tts_voices() appended to the short name
        voice_short_name = "-".join(tts_voice.split("-")[:-1])
        asyncio.run(edge_tts.Communicate(tts_text, voice_short_name).save(output_file))
        return "TTS audio generated.", output_file
    except Exception as e:
        return f"TTS failed: {e}", None
def run_youtube_dl(url):
    if not url: raise gr.Error("URL is required.")
    output_path = os.path.join(dl_audio_dir, "audio.wav")
    ydl_opts = {"noplaylist": True, "format": "bestaudio/best", "quiet": True,
                "postprocessors": [{"key": "FFmpegExtractAudio", "preferredcodec": "wav"}],
                "outtmpl": os.path.join(dl_audio_dir, "audio")}
    try:
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            ydl.download([url])
        return "Download complete.", output_path
    except Exception as e:
        return f"Download failed: {e}", None
def run_demucs(audio_path, model="htdemucs_ft"):
    if not audio_path or not os.path.exists(audio_path): raise gr.Error("Input audio for splitting not found.")
    # Build the command as a list so paths containing spaces survive intact
    # (splitting a quoted string would leave literal quote characters in the arguments)
    command = ["demucs", "--two-stems=vocals", "-n", model, audio_path, "-o", output_dir]
    print(f"Running command: {' '.join(command)}")
    try:
        subprocess.run(command, check=True, capture_output=True, text=True)
        input_filename = os.path.splitext(os.path.basename(audio_path))[0]
        vocal_path = os.path.join(output_dir, model, input_filename, "vocals.wav")
        inst_path = os.path.join(output_dir, model, input_filename, "no_vocals.wav")
        if os.path.exists(vocal_path): return "Splitting complete.", vocal_path, inst_path
        return "Splitting failed: vocal file not found.", None, None
    except subprocess.CalledProcessError as e:
        return f"Demucs failed: {e.stderr}", None, None
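# For reference, the equivalent CLI invocation (paths are illustrative):
#   demucs --two-stems=vocals -n htdemucs_ft /tmp/rvc_app/dl_audio/audio.wav -o /tmp/rvc_app/output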
def refresh_model_list_ui():
    models, indices = get_models_and_indices()
    return gr.update(choices=models), gr.update(choices=indices)
# --- Gradio UI Layout ---
initial_models, initial_indices = get_models_and_indices()
tts_voices = get_edge_tts_voices()

with gr.Blocks(theme=gr.themes.Soft(primary_hue="rose", secondary_hue="pink")) as demo:
    gr.Markdown("# 🌺 Modernized RVC Voice Conversion 🌺")
    loaded_model_state = gr.State(value=None)
    with gr.Row():
        sid = gr.Dropdown(label="1. Select Voice Model (.pth)", choices=initial_models)
        refresh_button = gr.Button("🔄 Refresh", variant="secondary")
    selected_model_info = gr.Markdown("# <center> No model selected", elem_id="model-info")
    with gr.Tabs():
        with gr.TabItem("🎙️ Main Inference"):
            with gr.Row():
                with gr.Column(scale=1):
                    gr.Markdown("### Input Audio")
                    input_audio_type = gr.Radio(["Upload", "Microphone", "TTS", "YouTube"], value="Upload", label="Input Source")
                    audio_in = gr.Audio(label="Upload or Record Audio", type="filepath", sources=["upload", "microphone"], visible=True)
                    tts_text_in = gr.Textbox(label="TTS Text", lines=3, visible=False)
                    tts_voice_in = gr.Dropdown(label="TTS Voice", choices=tts_voices, value=tts_voices[0], visible=False)
                    tts_gen_button = gr.Button("Generate TTS Audio", variant="primary", visible=False)
                    yt_url_in = gr.Textbox(label="YouTube URL", visible=False)
                    yt_dl_button = gr.Button("Download from YouTube", variant="primary", visible=False)
                    gr.Markdown("### (Optional) Vocal Separation")
                    run_demucs_button = gr.Button("Separate Vocals from Input", variant="secondary")
                    demucs_output_vocals = gr.Audio(label="Separated Vocals (for conversion)", type="filepath")
                    demucs_output_inst = gr.Audio(label="Separated Instrumentals", type="filepath")
                    demucs_status = gr.Textbox(label="Splitter Status", interactive=False)
                    gr.Markdown("_Use the 'Separated Vocals' as input for the best results._")
                with gr.Column(scale=1):
                    gr.Markdown("### Inference Settings")
                    spk_item = gr.Slider(minimum=0, maximum=2333, step=1, label="Speaker ID", value=0, visible=False, interactive=True)
                    vc_transform0 = gr.Number(label="Transpose (semitones)", value=0)
                    f0method0 = gr.Radio(label="Pitch Extraction Algorithm", choices=["pm", "harvest", "crepe", "rmvpe"] if os.path.exists(RMVPE_PATH) else ["pm", "harvest", "crepe"], value="rmvpe" if os.path.exists(RMVPE_PATH) else "pm", interactive=True)
                    file_index = gr.Dropdown(label="Feature Index File (.index)", choices=initial_indices, interactive=True)
                    index_rate0 = gr.Slider(minimum=0, maximum=1, label="Feature Retrieval Ratio", value=0.7, interactive=True)
                    filter_radius0 = gr.Slider(minimum=0, maximum=7, label="Median Filtering Radius", value=3, step=1, interactive=True)
                    resample_sr0 = gr.Slider(minimum=0, maximum=48000, label="Output Resampling (Hz; 0 = no resampling)", value=0, step=1, interactive=True)
                    rms_mix_rate0 = gr.Slider(minimum=0, maximum=1, label="Volume Envelope Mix Ratio", value=1, interactive=True)
                    protect0 = gr.Slider(minimum=0, maximum=0.5, label="Voice Protection", value=0.33, step=0.01, interactive=True)
                    f0_file0 = gr.File(label="Optional F0 Curve File (.txt)", file_count="single")
                with gr.Column(scale=1):
                    gr.Markdown("### Output")
                    convert_button = gr.Button("✨ Convert", variant="primary")
                    vc_log = gr.Textbox(label="Output Information", interactive=False)
                    vc_output = gr.Audio(label="Converted Audio", interactive=False)
        with gr.TabItem("📚 Add New Models"):
            gr.Markdown(
                "## How to Add New Models\n"
                "1. Go to the 'Files' tab of this Space.\n"
                "2. Navigate to the `weights` folder.\n"
                "3. Click 'Upload file' to add your `.pth` model file.\n"
                "4. Navigate to `weights/index` to upload your `.index` file.\n"
                "5. Come back here and click '🔄 Refresh'."
            )
    # --- Event Wiring ---
    sid.change(load_selected_model, [sid, protect0], [loaded_model_state, spk_item, protect0, file_index, selected_model_info])
    refresh_button.click(refresh_model_list_ui, None, [sid, file_index])
    convert_button.click(vc_single, [spk_item, demucs_output_vocals, vc_transform0, f0method0, file_index, index_rate0, filter_radius0, resample_sr0, rms_mix_rate0, protect0, f0_file0, loaded_model_state], [vc_log, vc_output])

    def update_input_visibility(c):
        return {
            audio_in: gr.update(visible=c in ["Upload", "Microphone"]),
            tts_text_in: gr.update(visible=c == "TTS"),
            tts_voice_in: gr.update(visible=c == "TTS"),
            tts_gen_button: gr.update(visible=c == "TTS"),
            yt_url_in: gr.update(visible=c == "YouTube"),
            yt_dl_button: gr.update(visible=c == "YouTube"),
        }

    input_audio_type.change(update_input_visibility, input_audio_type, [audio_in, tts_text_in, tts_voice_in, tts_gen_button, yt_url_in, yt_dl_button])
    tts_gen_button.click(run_tts, [tts_text_in, tts_voice_in], [demucs_status, audio_in])
    yt_dl_button.click(run_youtube_dl, [yt_url_in], [demucs_status, audio_in])
    run_demucs_button.click(run_demucs, [audio_in], [demucs_status, demucs_output_vocals, demucs_output_inst])

demo.queue(max_size=20).launch()