# Spaces status: Runtime error
import gradio as gr | |
import os | |
import tempfile | |
import requests | |
import soundfile as sf | |
import json | |
import shutil | |
from pathlib import Path | |
import numpy as np | |
import re | |
from typing import Generator | |
# ===== NEUTTS IMPORTS ===== | |
try:
    # Try multiple import approaches for NeuTTS
    try:
        # Approach 1: Direct import from the structure
        from neutts import NeuTTSAir
    except ImportError:
        try:
            # Approach 2: Import from the module directly
            import sys
            sys.path.append('/usr/local/lib/python3.10/site-packages')
            from neutts import NeuTTSAir
        except ImportError:
            # Approach 3: Use the components directly
            from phonemizer.backend import EspeakBackend
            import perth
            from neucodec import NeuCodec
            from llama_cpp import Llama
            # torch has to be bound before the class body executes: the
            # annotation on `infer` is evaluated at definition time and
            # `_infer_gguf` uses torch.Tensor in an isinstance check.
            import torch

            # Define NeuTTSAir class manually
            class NeuTTSAir:
                """Fallback text-to-speech wrapper assembled from its parts.

                Combines an espeak phonemizer, a GGUF backbone loaded through
                llama_cpp, a NeuCodec audio codec and a Perth watermarker.
                """

                def __init__(self, backbone_repo="neuphonic/neutts-air-q4-gguf", backbone_device="cpu", codec_repo="neuphonic/neucodec", codec_device="cpu"):
                    """Load the phonemizer, backbone, codec and watermarker.

                    Args:
                        backbone_repo: Repo id passed to Llama.from_pretrained.
                        backbone_device: Accepted for interface parity; the
                            backbone is loaded with n_gpu_layers=0 regardless.
                        codec_repo: Repo id passed to NeuCodec.from_pretrained.
                        codec_device: Device the codec module is moved to.
                    """
                    self.sample_rate = 24_000  # presumably Hz of generated audio
                    self.max_context = 2048    # backbone n_ctx and generation cap
                    self.hop_length = 480      # not read by any method in this class
                    print("π§ Loading phonemizer...")
                    self.phonemizer = EspeakBackend(language="en-us", preserve_punctuation=True, with_stress=True)
                    self._load_backbone(backbone_repo, backbone_device)
                    self._load_codec(codec_repo, codec_device)
                    self.watermarker = perth.PerthImplicitWatermarker()
                    print("β NeuTTS-Air initialized!")

                def _load_backbone(self, backbone_repo, backbone_device):
                    """Load the GGUF backbone with CPU-only settings."""
                    print(f"π§ Loading Q4 GGUF backbone: {backbone_repo}")
                    self.backbone = Llama.from_pretrained(
                        repo_id=backbone_repo,
                        filename="*.gguf",
                        n_ctx=self.max_context,
                        n_gpu_layers=0,
                        verbose=False,
                        use_mlock=False,
                        n_threads=2,
                        low_vram=True,
                    )

                def _load_codec(self, codec_repo, codec_device):
                    """Load the codec, switch it to eval mode and move it to codec_device."""
                    print(f"π§ Loading codec: {codec_repo}")
                    self.codec = NeuCodec.from_pretrained(codec_repo)
                    self.codec.eval().to(codec_device)

                def infer(self, text: str, ref_codes: np.ndarray | torch.Tensor, ref_text: str) -> np.ndarray:
                    """Synthesize `text` conditioned on a reference voice.

                    Args:
                        text: Text to speak.
                        ref_codes: Codec codes of the reference audio, as
                            returned by encode_reference.
                        ref_text: Transcript of the reference audio.

                    Returns:
                        The watermarked waveform.

                    Raises:
                        ValueError: If the backbone output has no speech tokens.
                    """
                    generated = self._infer_gguf(ref_codes, ref_text, text)
                    waveform = self._decode(generated)
                    # Use the instance's rate instead of repeating the literal.
                    return self.watermarker.apply_watermark(waveform, sample_rate=self.sample_rate)

                def encode_reference(self, ref_audio_path: str | Path):
                    """Encode a reference recording into codec codes.

                    The audio is loaded as 16 kHz mono and given two leading
                    singleton dimensions before being passed to the codec.
                    """
                    import librosa  # imported lazily, only needed for encoding

                    wav, _ = librosa.load(ref_audio_path, sr=16000, mono=True)
                    wav_tensor = torch.from_numpy(wav).float().unsqueeze(0).unsqueeze(0)
                    with torch.no_grad():
                        ref_codes = self.codec.encode_code(audio_or_path=wav_tensor).squeeze(0).squeeze(0)
                    return ref_codes.numpy() if isinstance(ref_codes, torch.Tensor) else ref_codes

                def _decode(self, codes: str):
                    """Turn `<|speech_N|>` tokens in `codes` into a waveform.

                    Raises:
                        ValueError: If `codes` contains no speech tokens.
                    """
                    speech_ids = [int(num) for num in re.findall(r"<\|speech_(\d+)\|>", codes)]
                    if not speech_ids:
                        raise ValueError("No speech tokens found")
                    with torch.no_grad():
                        codes_tensor = torch.tensor(speech_ids, dtype=torch.long)[None, None, :].to(self.codec.device)
                        recon = self.codec.decode_code(codes_tensor).cpu().numpy()
                    return recon[0, 0, :]

                def _to_phones(self, text: str) -> str:
                    """Phonemize `text` and collapse whitespace runs to single spaces."""
                    phones = self.phonemizer.phonemize([text])
                    return " ".join(phones[0].split())

                def _infer_gguf(self, ref_codes: list, ref_text: str, input_text: str) -> str:
                    """Prompt the backbone and return the generated text.

                    The prompt holds the phonemized reference and input text,
                    followed by the reference codes as speech tokens, so the
                    model continues the speech-token sequence.
                    """
                    ref_text_phones = self._to_phones(ref_text)
                    input_text_phones = self._to_phones(input_text)
                    if isinstance(ref_codes, (torch.Tensor, np.ndarray)):
                        ref_codes = ref_codes.tolist()
                    codes_str = "".join(f"<|speech_{idx}|>" for idx in ref_codes)
                    prompt = f"user: Convert the text to speech:<|TEXT_PROMPT_START|>{ref_text_phones} {input_text_phones}<|TEXT_PROMPT_END|>\nassistant:<|SPEECH_GENERATION_START|>{codes_str}"
                    output = self.backbone(
                        prompt,
                        max_tokens=self.max_context,
                        temperature=1.0,
                        top_k=50,
                        stop=["<|SPEECH_GENERATION_END|>"],
                        echo=False,
                    )
                    return output["choices"][0]["text"]

    NEUTTS_AVAILABLE = True
    print("β NeuTTS-Air loaded successfully!")
except Exception as e:
    # Top-level boundary: any failure while importing or defining the
    # fallback class is reported and leaves the module importable.
    NEUTTS_AVAILABLE = False
    print(f"β NeuTTS-Air import failed: {e}")
# ===== CONFIGURATION ===== | |
# JSON file that stores the saved voice profiles.
CONFIG_FILE = "voice_profiles.json"
# Folder holding the reference audio samples; created up front if missing.
SAMPLE_DIR = "samples"
Path(SAMPLE_DIR).mkdir(parents=True, exist_ok=True)
# ===== VOICE PROFILE MANAGEMENT ===== | |
class VoiceProfileManager:
    """Keep named voice profiles in memory and persist them as JSON.

    Each profile is a dict with the keys "audio_path", "text" and
    "created_at", stored under the profile name.
    """

    def __init__(self, config_file=CONFIG_FILE):
        """Remember the config path and load any existing profiles from it."""
        self.config_file = config_file
        self.profiles = self.load_profiles()

    def load_profiles(self):
        """Return the profiles stored in the config file, or the defaults.

        The built-in "dave" and "andrea" profiles are returned when the
        file does not exist, cannot be read, is not valid JSON, or does not
        contain a JSON object. A damaged file is reported instead of
        raising, so a bad store does not prevent construction.
        """
        if os.path.exists(self.config_file):
            try:
                with open(self.config_file, 'r', encoding='utf-8') as f:
                    loaded = json.load(f)
            except (OSError, ValueError) as e:
                # json.JSONDecodeError and UnicodeDecodeError are ValueErrors.
                print(f"Could not load voice profiles from {self.config_file}: {e}")
            else:
                if isinstance(loaded, dict):
                    return loaded
                print(f"Ignoring {self.config_file}: expected a JSON object of profiles")
        return {
            "dave": {
                "audio_path": "samples/dave.wav",
                "text": "Hey there, this is Dave speaking.",
                "created_at": "default",
            },
            "andrea": {
                "audio_path": "samples/andrea.wav",
                "text": "Hello, my name is Andrea.",
                "created_at": "default",
            },
        }

    def save_profiles(self):
        """Write all profiles to the config file as indented JSON.

        The data goes to a sibling temporary file first and is then swapped
        in with os.replace, so a failure while writing leaves the previous
        file untouched.
        """
        tmp_path = f"{self.config_file}.tmp"
        with open(tmp_path, 'w', encoding='utf-8') as f:
            json.dump(self.profiles, f, indent=2)
        os.replace(tmp_path, self.config_file)

    def add_profile(self, name, audio_path, text):
        """Create or overwrite the profile `name`, save, and return a status message."""
        self.profiles[name] = {
            "audio_path": audio_path,
            "text": text,
            "created_at": str(np.datetime64('now')),
        }
        self.save_profiles()
        return f"β Voice profile '{name}' saved!"

    def get_profile(self, name):
        """Return the profile dict for `name`, or None if it is unknown."""
        return self.profiles.get(name)

    def list_profiles(self):
        """Return the profile names as a list."""
        return list(self.profiles)
# ===== SAMPLE MANAGEMENT ===== | |
def download_default_samples():
    """Download the default sample voices that are not yet on disk.

    For each built-in voice whose audio file is missing from SAMPLE_DIR,
    the audio is fetched and saved together with a text file holding its
    transcript. Network, HTTP and local write failures are reported with a
    message and do not stop the remaining downloads.
    """
    # The GitHub organisation is "neuphonic", matching the model repo ids
    # used elsewhere in this file.
    samples = {
        "dave": {
            "audio": "https://github.com/neuphonic/neutts-air/raw/main/samples/dave.wav",
            "text": "Hey there, this is Dave speaking.",
        },
        "andrea": {
            "audio": "https://github.com/neuphonic/neutts-air/raw/main/samples/andrea.wav",
            "text": "Hello, my name is Andrea.",
        },
    }
    for name, urls in samples.items():
        audio_path = f"{SAMPLE_DIR}/{name}.wav"
        text_path = f"{SAMPLE_DIR}/{name}.txt"
        if not os.path.exists(audio_path):
            print(f"π₯ Downloading {name} sample...")
            try:
                # Download audio
                response = requests.get(urls["audio"], timeout=60)
                response.raise_for_status()  # Check for download errors
                with open(audio_path, 'wb') as f:
                    f.write(response.content)
                # Write text
                with open(text_path, 'w', encoding='utf-8') as f:
                    f.write(urls["text"])
                print(f"β Finished downloading {name}.")
            except (requests.exceptions.RequestException, OSError) as e:
                print(f"β Failed to download {name}: {e}")