# Spaces: Sleeping
# (Page-status banner captured together with the source; it is not part of the program.)
import gradio as gr | |
import numpy as np | |
import logging | |
import warnings | |
import torch | |
import re | |
import time | |
from kokoro import KPipeline | |
import os | |
# import shutil | |
import soundfile as sf # You need to pip install soundfile | |
# --- Output location --------------------------------------------------------
AUDIO_DIR = "audio_exports"  # directory that receives every exported WAV file
AUDIO_FILE_PATH = None

# --- Logging / warning noise -------------------------------------------------
logging.basicConfig(level=logging.INFO)
# Silence two specific torch warning sources; everything else stays visible.
for _category, _module in (
    (UserWarning, "torch.nn.modules.rnn"),
    (FutureWarning, "torch.nn.utils.weight_norm"),
):
    warnings.filterwarnings("ignore", category=_category, module=_module)

# The export directory must exist before anything is written into it.
os.makedirs(AUDIO_DIR, exist_ok=True)

# --- Mutable module state (rebound by the functions below) -------------------
LANG_CODE = "a"  # Kokoro language code; "a" is American English
PIPELINE = None  # set by init_pipeline()
CURRENT_VOICE = "af_bella"  # voice used by the most recent generation

# Timing metrics, in seconds
PIPELINE_LOAD_TIME = 0
AUDIO_GEN_TIME = 0

# Placeholder for the UI widget showing the load time; assigned in create_app()
loading_time_box = None
# UI display name -> single-letter Kokoro language code.
LANG_MAP = {
    "American English (en-us)": "a",
    "British English (en-gb)": "b",
    "Spanish (es)": "e",
    "French (fr-fr)": "f",
    "Hindi (hi)": "h",
    "Italian (it)": "i",
    "Japanese (ja)": "j",
    "Brazilian Portuguese (pt-br)": "p",
    "Mandarin Chinese (zh)": "z",
}

# Inverse lookup: Kokoro language code -> UI display name.
CODE_TO_LANG = dict(zip(LANG_MAP.values(), LANG_MAP.keys()))
# Every available voice, keyed by Kokoro language code. Voice ids follow the
# pattern "<language letter><f|m>_<name>"; within each language the "f" voices
# are listed before the "m" voices.
ALL_VOICES = {
    "a": [
        "af_heart", "af_alloy", "af_aoede", "af_bella", "af_jessica", "af_kore",
        "af_nicole", "af_nova", "af_river", "af_sarah", "af_sky",
        "am_adam", "am_echo", "am_eric", "am_fenrir", "am_liam", "am_michael",
        "am_onyx", "am_puck", "am_santa",
    ],
    "b": [
        "bf_alice", "bf_emma", "bf_isabella", "bf_lily",
        "bm_daniel", "bm_fable", "bm_george", "bm_lewis",
    ],
    "e": ["ef_dora", "em_alex", "em_santa"],
    "f": ["ff_siwis"],
    "h": ["hf_alpha", "hf_beta", "hm_omega", "hm_psi"],
    "i": ["if_sara", "im_nicola"],
    "j": ["jf_alpha", "jf_gongitsune", "jf_nezumi", "jf_tebukuro", "jm_kumo"],
    "p": ["pf_dora", "pm_alex", "pm_santa"],
    "z": [
        "zf_xiaobei", "zf_xiaoni", "zf_xiaoxiao", "zf_xiaoyi",
        "zm_yunjian", "zm_yunxi", "zm_yunxia", "zm_yunyang",
    ],
}
# Letter grades used for voice recommendations. Only these voices carry an
# explicit grade; all others receive a placeholder grade just below.
VOICE_RATINGS = {
    "af_heart": "A",
    "af_bella": "A-",
    "af_nicole": "B-",
    "bf_emma": "B-",
    "ff_siwis": "B-",
}

# Fill in the rest: "C+" for ids starting with "<lang>f_", "C" for anything
# else. setdefault() leaves the explicit grades above untouched.
for lang_code, voices in ALL_VOICES.items():
    _female_prefix = lang_code + "f_"
    for voice in voices:
        VOICE_RATINGS.setdefault(
            voice, "C+" if voice.startswith(_female_prefix) else "C"
        )
# Preset choices for the "Split Text Using" dropdown: label -> regex.
SPLIT_PATTERNS = {
    "Paragraphs (one or more newlines)": r"\n+",
    "Sentences (periods, question marks, exclamation points)": r"(?<=[.!?])\s+",
    "Commas and semicolons": r"[,;]\s+",
    # "$^" is the sentinel the rest of the file checks for to mean "do not split".
    "No splitting (process as one chunk)": r"$^",
    # Not a regex: marks the entry whose pattern comes from the custom textbox.
    "Custom": "custom",
}
# All voice ids in one flat list (language order, then per-language order),
# used to populate the voice dropdown.
ALL_VOICES_FLAT = [
    voice_id for language_voices in ALL_VOICES.values() for voice_id in language_voices
]
# Initialize pipeline
def init_pipeline(lang_code="a"):
    """
    Initialize or reload the Kokoro pipeline for a specific language

    The module globals PIPELINE, LANG_CODE and PIPELINE_LOAD_TIME are updated
    together, and only after the new pipeline has been constructed. If
    KPipeline raises, all three keep describing the pipeline that is actually
    loaded, so a later language change is not skipped by mistake.

    Args:
        lang_code: Single-letter Kokoro language code (see LANG_MAP).

    Returns:
        Tuple of (pipeline, load_time_seconds).

    Raises:
        Whatever KPipeline raises while loading; the globals are left
        untouched in that case.
    """
    global PIPELINE, LANG_CODE, PIPELINE_LOAD_TIME
    print(f"Initializing pipeline for language code: {lang_code}")

    # perf_counter is monotonic, so the measured duration cannot be skewed by
    # wall-clock adjustments.
    start_time = time.perf_counter()
    # Build into a local first; nothing global changes if this raises.
    pipeline = KPipeline(lang_code=lang_code, repo_id="hexgrad/Kokoro-82M")
    load_time = time.perf_counter() - start_time

    # Commit the new state in one place.
    PIPELINE = pipeline
    LANG_CODE = lang_code
    PIPELINE_LOAD_TIME = load_time

    # Log language change
    lang_name = CODE_TO_LANG.get(lang_code, f"Unknown ({lang_code})")
    print(f"Pipeline loaded for {lang_name} in {PIPELINE_LOAD_TIME:.6f} seconds")
    return PIPELINE, PIPELINE_LOAD_TIME
# Load the pipeline for the default language once, at import time.
# init_pipeline() returns the pipeline and its load time; both are bound to the
# module globals here.
(PIPELINE, PIPELINE_LOAD_TIME) = init_pipeline(LANG_CODE)
def preview_text_splitting(text, split_pattern):
    """
    Preview how text will be split based on the pattern

    Args:
        text: Text to split.
        split_pattern: Regular expression to split on. "$^" (the "no
            splitting" preset) and an empty pattern both mean "keep the text
            as one chunk".

    Returns:
        List of stripped, non-empty chunks. In no-split mode the list holds
        the text unchanged. If the pattern is not a valid regex (or text is
        not a string), the list holds a single "Error previewing split: ..."
        message instead of raising.
    """
    # An empty pattern matches between every character, so re.split would
    # return one chunk per character; treat it like the no-split sentinel.
    if not split_pattern or split_pattern == "$^":
        return [text]

    try:
        pieces = re.split(split_pattern, text)
    except (re.error, TypeError) as e:
        return [f"Error previewing split: {e}"]

    # re.split yields None for capture groups that did not take part in a
    # match; drop those together with blank chunks.
    return [piece.strip() for piece in pieces if piece and piece.strip()]
def generate_audio(text, voice, split_pattern=r"\n+", speed=1.0, output_dir=AUDIO_DIR):
    """
    Generate audio using pure Kokoro with support for splitting

    Each chunk yielded by the pipeline is written to
    ``<output_dir>/chunk_<n>_<voice>.wav`` and the joined audio to
    ``<output_dir>/combined_<voice>.wav``. File names depend only on the chunk
    number and the voice, so a later run with the same voice overwrites them.

    Timing figures reported in timing_info:
      * per-chunk "Gen" covers only the work done after the pipeline has
        yielded a chunk (phoneme bookkeeping and tensor -> NumPy conversion);
      * per-chunk "Save" covers writing that chunk's WAV file;
      * "Kokoro processing time" is the total loop time minus those two, i.e.
        the time spent waiting on the pipeline generator.

    Args:
        text: Text to synthesize
        voice: Voice to use
        split_pattern: Pattern to split text into chunks ("$^" = no splitting)
        speed: Speech speed
        output_dir: Directory to save audio files

    Returns:
        Tuple of (audio_tuple, phonemes, split_info, timing_info).
        audio_tuple is (sample_rate, samples). When the pipeline yields no
        audio at all (for example blank input) audio_tuple is None, phonemes
        is "", split_info explains that nothing was generated, and no
        combined file is written.
    """
    global CURRENT_VOICE, AUDIO_GEN_TIME

    # Create output directory if it doesn't exist
    os.makedirs(output_dir, exist_ok=True)

    # Update current voice
    if voice != CURRENT_VOICE:
        print(f"Voice changed from {CURRENT_VOICE} to {voice}")
        CURRENT_VOICE = voice

    # "$^" is passed through to the pipeline unchanged; it is only announced here.
    if split_pattern == "$^":
        print("Using no-split mode (processing as one chunk)")

    # Preview how text will be split
    chunks_preview = preview_text_splitting(text, split_pattern)
    split_info = f"Text split into {len(chunks_preview)} chunks using pattern: '{split_pattern}'"
    print(split_info)

    all_audio = []
    all_phonemes = []
    sample_rate = 24000  # Kokoro's sample rate
    chunk_gen_times = []
    chunk_save_times = []

    # Creating the generator is timed separately from consuming it.
    # perf_counter is monotonic, so the subtractions below cannot be skewed by
    # wall-clock adjustments.
    generator_start_time = time.perf_counter()
    generator = PIPELINE(text, voice=voice, speed=speed, split_pattern=split_pattern)
    generator_init_time = time.perf_counter() - generator_start_time
    print(f"Generator initialization: {generator_init_time:.6f}s")

    gen_start_time = time.perf_counter()
    for i, (_graphemes, ps, audio) in enumerate(generator):
        chunk_start_time = time.perf_counter()

        # Save the phonemes for each chunk
        all_phonemes.append(f"Chunk {i + 1}: {ps}")

        # Convert PyTorch tensor to NumPy array if needed
        if isinstance(audio, torch.Tensor):
            audio_chunk = audio.detach().cpu().numpy()
        else:
            audio_chunk = audio
        all_audio.append(audio_chunk)

        chunk_gen_time = time.perf_counter() - chunk_start_time
        chunk_gen_times.append(chunk_gen_time)
        print(f"Chunk {i + 1} generated in {chunk_gen_time:.6f}s")

        # Save individual chunk to file
        save_start_time = time.perf_counter()
        chunk_filename = os.path.join(output_dir, f"chunk_{i + 1}_{voice}.wav")
        sf.write(chunk_filename, audio_chunk, sample_rate)
        chunk_save_time = time.perf_counter() - save_start_time
        chunk_save_times.append(chunk_save_time)
        print(f"Chunk {i + 1} saved to {chunk_filename} in {chunk_save_time:.6f}s")

    # Whole-loop time, including the time spent inside the pipeline generator
    iter_total_time = time.perf_counter() - gen_start_time
    print(f"Total iteration time: {iter_total_time:.6f}s")

    # Nothing to combine or save: report it instead of writing a zero-length
    # WAV and handing the caller an empty sample array.
    if not all_audio:
        AUDIO_GEN_TIME = time.perf_counter() - gen_start_time
        split_info = "No audio was generated: the input produced no chunks to synthesize"
        print(split_info)
        return None, "", split_info, f"Total processing time: {AUDIO_GEN_TIME:.6f}s"

    # The "hidden" pipeline time is what remains after our own measured work
    sum_chunk_processing = sum(chunk_gen_times) + sum(chunk_save_times)
    kokoro_processing_time = iter_total_time - sum_chunk_processing

    # Combine chunks (a single chunk is used as-is, without copying)
    chunks_count = len(all_audio)
    if chunks_count > 1:
        combine_start_time = time.perf_counter()
        audio_data = np.concatenate(all_audio)
        combine_time = time.perf_counter() - combine_start_time
        print(f"Combined {chunks_count} chunks in {combine_time:.6f}s")
    else:
        audio_data = all_audio[0]
        combine_time = 0

    # Save combined file
    save_combined_start = time.perf_counter()
    combined_filename = os.path.join(output_dir, f"combined_{voice}.wav")
    sf.write(combined_filename, audio_data, sample_rate)
    save_combined_time = time.perf_counter() - save_combined_start
    print(f"Combined audio saved to {combined_filename} in {save_combined_time:.6f}s")

    # Total time (measured from the start of iteration; generator creation excluded)
    AUDIO_GEN_TIME = time.perf_counter() - gen_start_time

    # Detailed timing report
    timing_lines = [f"Kokoro processing time: {kokoro_processing_time:.6f}s"]
    if chunks_count > 1:
        timing_lines.append("\nChunk details:")
        timing_lines.extend(
            f" Chunk {n}: Gen {t:.6f}s, Save {s:.6f}s"
            for n, (t, s) in enumerate(zip(chunk_gen_times, chunk_save_times), start=1)
        )
        timing_lines.append(f"\nCombine chunks: {combine_time:.6f}s")
    timing_lines.append(f"Save combined: {save_combined_time:.6f}s")

    post_processing = sum_chunk_processing + combine_time + save_combined_time
    timing_lines.append(f"\nTotal Kokoro time: {kokoro_processing_time:.6f}s")
    timing_lines.append(f"Total post-processing: {post_processing:.6f}s")
    timing_lines.append(f"Total processing time: {AUDIO_GEN_TIME:.6f}s")
    timing_info = "\n".join(timing_lines)

    # Combine phonemes
    phonemes = "\n\n".join(all_phonemes)

    # Final split summary reflects what the pipeline actually produced
    if chunks_count > 1:
        split_info = (
            f"Text was split into {chunks_count} chunks and saved to {output_dir}"
        )
    else:
        split_info = f"Text processed as a single chunk and saved to {output_dir}"

    return (sample_rate, audio_data), phonemes, split_info, timing_info
def on_language_change(language_display):
    """
    React to a new selection in the language dropdown.

    Reloads the pipeline when the selected language differs from the loaded
    one, then builds a status message that includes voice suggestions.

    Args:
        language_display: Display name from LANG_MAP; unknown names fall back
            to language code "a".

    Returns:
        Tuple of (status_text, load_time_text), where load_time_text is the
        last pipeline load time formatted as "<seconds>s".
    """
    global PIPELINE, LANG_CODE, PIPELINE_LOAD_TIME

    new_lang_code = LANG_MAP.get(language_display, "a")

    # Reloading is only needed when the language actually changed.
    if new_lang_code != LANG_CODE:
        print(
            f"Language changed from {LANG_CODE} to {new_lang_code}. Reloading pipeline..."
        )
        PIPELINE, PIPELINE_LOAD_TIME = init_pipeline(new_lang_code)

    # Suggest the voices of this language graded B- or better ...
    top_grades = ("A", "A-", "B", "B-")
    suggestions = [
        f"{name} ({VOICE_RATINGS[name]})"
        for name in ALL_VOICES.get(new_lang_code, [])
        if VOICE_RATINGS.get(name) in top_grades
    ]
    # ... and fall back to the first three voices when none qualifies.
    if not suggestions and new_lang_code in ALL_VOICES:
        suggestions = [f"{name}" for name in ALL_VOICES[new_lang_code][:3]]

    message_parts = [
        f"Language changed to {language_display}. Pipeline loaded in {PIPELINE_LOAD_TIME:.6f} seconds."
    ]
    if suggestions:
        message_parts.append(f"Recommended voices: {', '.join(suggestions)}")

    return "\n".join(message_parts), f"{PIPELINE_LOAD_TIME:.6f}s"
def on_split_pattern_change(pattern_name, custom_pattern):
    """
    React to a new selection in the split-pattern dropdown.

    Args:
        pattern_name: Key of SPLIT_PATTERNS chosen in the dropdown.
        custom_pattern: Current content of the custom-pattern textbox.

    Returns:
        Tuple of (pattern, visibility_update): for "Custom" the textbox
        content is passed through and the textbox is shown; for any preset
        the preset's regex is returned and the textbox is hidden.
    """
    is_custom = pattern_name == "Custom"
    # The preset table is only consulted for non-custom choices.
    pattern = custom_pattern if is_custom else SPLIT_PATTERNS[pattern_name]
    return pattern, gr.update(visible=is_custom)
def preview_splits(text, pattern):
    """
    Build the human-readable split preview shown in the UI.

    Args:
        text: Text that would be synthesized.
        pattern: Regex used for splitting ("$^" means no splitting).

    Returns:
        A one-line notice for the no-split case, otherwise a numbered listing
        of the chunks with each chunk shortened to 100 characters plus "...".
    """
    chunks = preview_text_splitting(text, pattern)

    if pattern == "$^" and len(chunks) == 1:
        return "Text will be processed as a single chunk (no splitting)"

    sections = [f"Text will be split into {len(chunks)} chunks:\n\n"]
    for number, chunk in enumerate(chunks, start=1):
        # Keep the preview compact: long chunks are cut at 100 characters.
        shown = chunk if len(chunk) <= 100 else chunk[:100] + "..."
        sections.append(f"Chunk {number}: {shown}\n\n")
    return "".join(sections)
def create_app():
    """
    Build the Gradio interface and connect its event handlers.

    Layout: a title, a status line, and one row of three columns (text and
    split settings; language, voice and speed; generate button and outputs).
    The load-time label is stored in the module-level loading_time_box.

    Returns:
        The assembled gr.Blocks object (not yet launched).
    """
    global loading_time_box, PIPELINE_LOAD_TIME

    def on_generate(text, language_display, voice, split_pattern, speed):
        # language_display arrives because the language dropdown is wired in as
        # an input; it is not forwarded to generate_audio.
        audio_tuple, phonemes, split_info, timing_info = generate_audio(
            text,
            voice,
            split_pattern=split_pattern,
            speed=speed,
            output_dir=AUDIO_DIR,
        )
        # Reordered to match the output components wired to the button below.
        return audio_tuple, timing_info, phonemes, split_info

    default_text = "Hello!\n\nThis is a multi-paragraph test.\nWith multiple lines.\n\nKokoro can split on paragraphs, sentences, or other patterns."
    app_theme = gr.themes.Soft(
        font=[
            gr.themes.GoogleFont("Lato"),
            gr.themes.GoogleFont("Roboto"),
            "system-ui",
            "sans-serif",
        ]
    )

    with gr.Blocks(theme=app_theme) as ui:
        # Header
        gr.Markdown("# Kokoro TTS Demo")
        gr.Markdown("#### Pure Kokoro Implementation with Enhanced Text Splitting")

        # Filled in by the language-change handler
        status_message = gr.Markdown("")

        with gr.Row():
            # ---- Column 1: text and splitting ----
            with gr.Column(scale=1):
                text_input = gr.TextArea(
                    label="Input Text",
                    value=default_text,
                    lines=8,
                )

                with gr.Accordion("About Text Splitting in Kokoro", open=False):
                    gr.Markdown("""
                    ### Understanding Text Splitting
                    The splitting pattern controls how Kokoro breaks your text into manageable chunks for processing.
                    **Common patterns:**
                    - `\\n+`: Split on one or more newlines (paragraphs)
                    - `(?<=[.!?])\\s+`: Split after periods, question marks, and exclamation points (sentences)
                    - `[,;]\\s+`: Split after commas and semicolons
                    - `$^`: Special pattern that won't match anything (processes the entire text as one chunk)
                    **Benefits of splitting:**
                    - Better phrasing and natural pauses
                    - Improved handling of longer texts
                    - More consistent pronunciation across chunks
                    **When to use different patterns:**
                    - Paragraph splits: Good for clearly separated content
                    - Sentence splits: Maintains sentence integrity but creates more natural breaks
                    - No splitting: Best for very short texts or when you want continuous flow
                    The preview feature lets you see exactly how your text will be divided before generating audio.
                    """)

                split_pattern_dropdown = gr.Dropdown(
                    label="Split Text Using",
                    choices=list(SPLIT_PATTERNS.keys()),
                    value="Paragraphs (one or more newlines)",
                    info="Select how to split your text into chunks",
                )
                # Holds the regex that is actually used: the dropdown handler
                # writes the chosen preset here and only shows the box for "Custom".
                custom_pattern_input = gr.Textbox(
                    label="Custom Split Pattern (Regular Expression)",
                    value=r"\n+",
                    visible=False,
                    info="Enter a custom regex pattern for splitting text",
                )
                preview_button = gr.Button("Preview Text Splitting")
                split_preview = gr.Textbox(
                    label="Split Preview",
                    value="Click 'Preview Text Splitting' to see how your text will be divided",
                    lines=5,
                )

            # ---- Column 2: language, voice, speed ----
            with gr.Column(scale=1):
                language_input = gr.Dropdown(
                    label="Language",
                    choices=list(LANG_MAP.keys()),
                    value="American English (en-us)",
                    info="Select the language for text processing",
                )
                loading_time_box = gr.Label(
                    label="Lang loaded in", value=f"{PIPELINE_LOAD_TIME:.6f}s"
                )

                with gr.Accordion("Voice Selection", open=True):
                    voice_input = gr.Dropdown(
                        label="Voice",
                        choices=sorted(ALL_VOICES_FLAT),
                        value="af_bella",
                        info="Select voice for synthesis",
                    )
                    gr.Markdown("""
                    **Voice naming convention**:
                    - First letter = language: a=American, b=British, f=French, etc.
                    - Second letter = gender: f=female, m=male
                    - After underscore = voice name
                    """)

                speed_input = gr.Slider(
                    label="Speech Speed",
                    minimum=0.5,
                    maximum=1.5,
                    step=0.1,
                    value=1.0,
                    info="Adjust speaking rate",
                )

            # ---- Column 3: action and results ----
            with gr.Column(scale=1):
                submit_button = gr.Button("Generate Audio", variant="primary")
                audio_output = gr.Audio(
                    label="Generated Audio", format="wav", show_download_button=True
                )
                audio_gen_timing_output = gr.Textbox(
                    label="Performance Metrics", lines=12
                )
                phonemes_output = gr.Textbox(label="Phoneme Representation", lines=10)
                split_info_output = gr.Textbox(label="Processing Information", lines=5)

        # ---- Event wiring ----
        # Language dropdown -> reload pipeline, refresh status and load time
        language_input.change(
            fn=on_language_change,
            inputs=[language_input],
            outputs=[status_message, loading_time_box],
        )

        # Split dropdown -> new textbox value, then its visibility update
        # (both results target the same textbox).
        split_pattern_dropdown.change(
            fn=on_split_pattern_change,
            inputs=[split_pattern_dropdown, custom_pattern_input],
            outputs=[custom_pattern_input, custom_pattern_input],
        )

        # Preview button -> textual split preview
        preview_button.click(
            fn=preview_splits,
            inputs=[text_input, custom_pattern_input],
            outputs=[split_preview],
        )

        # Generate button -> audio, timing, phonemes, split summary
        submit_button.click(
            fn=on_generate,
            inputs=[
                text_input,
                language_input,
                voice_input,
                custom_pattern_input,
                speed_input,
            ],
            outputs=[
                audio_output,
                audio_gen_timing_output,
                phonemes_output,
                split_info_output,
            ],
        )

    return ui
# Build the interface, then start serving it.
ui = create_app()

_launch_options = {
    "debug": True,
    "server_name": "0.0.0.0",  # Make accessible externally
    "server_port": 7860,  # Choose your port
    "share": True,  # True also requests a public link
}
ui.launch(**_launch_options)