Spaces:
Running
on
Zero
Running
on
Zero
from dataclasses import dataclass | |
from pathlib import Path | |
import os | |
import librosa | |
import torch | |
import perth | |
import torch.nn.functional as F | |
from safetensors.torch import load_file as load_safetensors | |
from huggingface_hub import snapshot_download | |
from .models.t3 import T3 | |
from .models.t3.modules.t3_config import T3Config | |
from .models.s3tokenizer import S3_SR, drop_invalid_tokens | |
from .models.s3gen import S3GEN_SR, S3Gen | |
from .models.tokenizers import MTLTokenizer | |
from .models.voice_encoder import VoiceEncoder | |
from .models.t3.modules.cond_enc import T3Cond | |
REPO_ID = "ResembleAI/chatterbox" | |
# Supported languages for the multilingual model | |
SUPPORTED_LANGUAGES = { | |
"ar": "Arabic", | |
"da": "Danish", | |
"de": "German", | |
"el": "Greek", | |
"en": "English", | |
"es": "Spanish", | |
"fi": "Finnish", | |
"fr": "French", | |
"he": "Hebrew", | |
"hi": "Hindi", | |
"it": "Italian", | |
"ja": "Japanese", | |
"ko": "Korean", | |
"ms": "Malay", | |
"nl": "Dutch", | |
"no": "Norwegian", | |
"pl": "Polish", | |
"pt": "Portuguese", | |
"ru": "Russian", | |
"sv": "Swedish", | |
"sw": "Swahili", | |
"tr": "Turkish", | |
"zh": "Chinese", | |
} | |
def punc_norm(text: str) -> str: | |
""" | |
Quick cleanup func for punctuation from LLMs or | |
containing chars not seen often in the dataset | |
""" | |
if len(text) == 0: | |
return "You need to add some text for me to talk." | |
# Capitalise first letter | |
if text[0].islower(): | |
text = text[0].upper() + text[1:] | |
# Remove multiple space chars | |
text = " ".join(text.split()) | |
# Replace uncommon/llm punc | |
punc_to_replace = [ | |
("...", ", "), | |
("…", ", "), | |
(":", ","), | |
(" - ", ", "), | |
(";", ", "), | |
("—", "-"), | |
("–", "-"), | |
(" ,", ","), | |
("“", "\""), | |
("”", "\""), | |
("‘", "'"), | |
("’", "'"), | |
] | |
for old_char_sequence, new_char in punc_to_replace: | |
text = text.replace(old_char_sequence, new_char) | |
# Add full stop if no ending punc | |
text = text.rstrip(" ") | |
sentence_enders = {".", "!", "?", "-", ",","、",",","。","?","!"} | |
if not any(text.endswith(p) for p in sentence_enders): | |
text += "." | |
return text | |
class Conditionals: | |
""" | |
Conditionals for T3 and S3Gen | |
- T3 conditionals: | |
- speaker_emb | |
- clap_emb | |
- cond_prompt_speech_tokens | |
- cond_prompt_speech_emb | |
- emotion_adv | |
- S3Gen conditionals: | |
- prompt_token | |
- prompt_token_len | |
- prompt_feat | |
- prompt_feat_len | |
- embedding | |
""" | |
t3: T3Cond | |
gen: dict | |
def to(self, device): | |
self.t3 = self.t3.to(device=device) | |
for k, v in self.gen.items(): | |
if torch.is_tensor(v): | |
self.gen[k] = v.to(device=device) | |
return self | |
def save(self, fpath: Path): | |
arg_dict = dict( | |
t3=self.t3.__dict__, | |
gen=self.gen | |
) | |
torch.save(arg_dict, fpath) | |
def load(cls, fpath, map_location="cpu"): | |
kwargs = torch.load(fpath, map_location=map_location, weights_only=True) | |
return cls(T3Cond(**kwargs['t3']), kwargs['gen']) | |
class ChatterboxMultilingualTTS: | |
ENC_COND_LEN = 6 * S3_SR | |
DEC_COND_LEN = 10 * S3GEN_SR | |
def __init__( | |
self, | |
t3: T3, | |
s3gen: S3Gen, | |
ve: VoiceEncoder, | |
tokenizer: MTLTokenizer, | |
device: str, | |
conds: Conditionals = None, | |
): | |
self.sr = S3GEN_SR # sample rate of synthesized audio | |
self.t3 = t3 | |
self.s3gen = s3gen | |
self.ve = ve | |
self.tokenizer = tokenizer | |
self.device = device | |
self.conds = conds | |
self.watermarker = perth.PerthImplicitWatermarker() | |
def get_supported_languages(cls): | |
"""Return dictionary of supported language codes and names.""" | |
return SUPPORTED_LANGUAGES.copy() | |
def from_local(cls, ckpt_dir, device) -> 'ChatterboxMultilingualTTS': | |
ckpt_dir = Path(ckpt_dir) | |
ve = VoiceEncoder() | |
ve.load_state_dict( | |
torch.load(ckpt_dir / "ve.pt", weights_only=True) | |
) | |
ve.to(device).eval() | |
t3 = T3(T3Config.multilingual()) | |
t3_state = load_safetensors(ckpt_dir / "t3_23lang.safetensors") | |
if "model" in t3_state.keys(): | |
t3_state = t3_state["model"][0] | |
t3.load_state_dict(t3_state) | |
t3.to(device).eval() | |
s3gen = S3Gen() | |
s3gen.load_state_dict( | |
torch.load(ckpt_dir / "s3gen.pt", weights_only=True) | |
) | |
s3gen.to(device).eval() | |
tokenizer = MTLTokenizer( | |
str(ckpt_dir / "mtl_tokenizer.json") | |
) | |
conds = None | |
if (builtin_voice := ckpt_dir / "conds.pt").exists(): | |
conds = Conditionals.load(builtin_voice).to(device) | |
return cls(t3, s3gen, ve, tokenizer, device, conds=conds) | |
def from_pretrained(cls, device: torch.device) -> 'ChatterboxMultilingualTTS': | |
ckpt_dir = Path( | |
snapshot_download( | |
repo_id=REPO_ID, | |
repo_type="model", | |
revision="main", | |
allow_patterns=["ve.pt", "t3_23lang.safetensors", "s3gen.pt", "mtl_tokenizer.json", "conds.pt", "Cangjie5_TC.json"], | |
token=os.getenv("HF_TOKEN"), | |
) | |
) | |
return cls.from_local(ckpt_dir, device) | |
def prepare_conditionals(self, wav_fpath, exaggeration=0.5): | |
## Load reference wav | |
s3gen_ref_wav, _sr = librosa.load(wav_fpath, sr=S3GEN_SR) | |
ref_16k_wav = librosa.resample(s3gen_ref_wav, orig_sr=S3GEN_SR, target_sr=S3_SR) | |
s3gen_ref_wav = s3gen_ref_wav[:self.DEC_COND_LEN] | |
s3gen_ref_dict = self.s3gen.embed_ref(s3gen_ref_wav, S3GEN_SR, device=self.device) | |
# Speech cond prompt tokens | |
t3_cond_prompt_tokens = None | |
if plen := self.t3.hp.speech_cond_prompt_len: | |
s3_tokzr = self.s3gen.tokenizer | |
t3_cond_prompt_tokens, _ = s3_tokzr.forward([ref_16k_wav[:self.ENC_COND_LEN]], max_len=plen) | |
t3_cond_prompt_tokens = torch.atleast_2d(t3_cond_prompt_tokens).to(self.device) | |
# Voice-encoder speaker embedding | |
ve_embed = torch.from_numpy(self.ve.embeds_from_wavs([ref_16k_wav], sample_rate=S3_SR)) | |
ve_embed = ve_embed.mean(axis=0, keepdim=True).to(self.device) | |
t3_cond = T3Cond( | |
speaker_emb=ve_embed, | |
cond_prompt_speech_tokens=t3_cond_prompt_tokens, | |
emotion_adv=exaggeration * torch.ones(1, 1, 1), | |
).to(device=self.device) | |
self.conds = Conditionals(t3_cond, s3gen_ref_dict) | |
def generate( | |
self, | |
text, | |
language_id, | |
audio_prompt_path=None, | |
exaggeration=0.5, | |
cfg_weight=0.5, | |
temperature=0.8, | |
repetition_penalty=2.0, | |
min_p=0.05, | |
top_p=1.0, | |
): | |
# Validate language_id | |
if language_id and language_id.lower() not in SUPPORTED_LANGUAGES: | |
supported_langs = ", ".join(SUPPORTED_LANGUAGES.keys()) | |
raise ValueError( | |
f"Unsupported language_id '{language_id}'. " | |
f"Supported languages: {supported_langs}" | |
) | |
if audio_prompt_path: | |
self.prepare_conditionals(audio_prompt_path, exaggeration=exaggeration) | |
else: | |
assert self.conds is not None, "Please `prepare_conditionals` first or specify `audio_prompt_path`" | |
# Update exaggeration if needed | |
if float(exaggeration) != float(self.conds.t3.emotion_adv[0, 0, 0].item()): | |
_cond: T3Cond = self.conds.t3 | |
self.conds.t3 = T3Cond( | |
speaker_emb=_cond.speaker_emb, | |
cond_prompt_speech_tokens=_cond.cond_prompt_speech_tokens, | |
emotion_adv=exaggeration * torch.ones(1, 1, 1), | |
).to(device=self.device) | |
# Norm and tokenize text | |
text = punc_norm(text) | |
text_tokens = self.tokenizer.text_to_tokens(text, language_id=language_id.lower() if language_id else None).to(self.device) | |
text_tokens = torch.cat([text_tokens, text_tokens], dim=0) # Need two seqs for CFG | |
sot = self.t3.hp.start_text_token | |
eot = self.t3.hp.stop_text_token | |
text_tokens = F.pad(text_tokens, (1, 0), value=sot) | |
text_tokens = F.pad(text_tokens, (0, 1), value=eot) | |
with torch.inference_mode(): | |
speech_tokens = self.t3.inference( | |
t3_cond=self.conds.t3, | |
text_tokens=text_tokens, | |
max_new_tokens=1000, # TODO: use the value in config | |
temperature=temperature, | |
cfg_weight=cfg_weight, | |
repetition_penalty=repetition_penalty, | |
min_p=min_p, | |
top_p=top_p, | |
) | |
# Extract only the conditional batch. | |
speech_tokens = speech_tokens[0] | |
# TODO: output becomes 1D | |
speech_tokens = drop_invalid_tokens(speech_tokens) | |
speech_tokens = speech_tokens.to(self.device) | |
wav, _ = self.s3gen.inference( | |
speech_tokens=speech_tokens, | |
ref_dict=self.conds.gen, | |
) | |
wav = wav.squeeze(0).detach().cpu().numpy() | |
watermarked_wav = self.watermarker.apply_watermark(wav, sample_rate=self.sr) | |
return torch.from_numpy(watermarked_wav).unsqueeze(0) | |