Cross‑lingual voice cloning powered by the Coqui TTS XTTS v2 model. Provide a few seconds of a reference voice, choose a language, and synthesize any text in that cloned voice.
Upload a short reference clip (WAV/MP3/M4A/FLAC/OGG/OPUS)
Select target language
Type the text you want the cloned voice to speak
Note: First run may take longer while the model downloads and loads. A loading indicator will be shown.
Before you start
This demo runs the XTTS model locally. The first request may take a little longer while the model loads. Subsequent runs will be faster. Thanks for your patience.
Your reference audio stays on this machine. The generated audio will appear when processing completes.
Cloning in progress
Preparing
Validating inputs
Uploading reference
Sending audio to server
Waiting for server
Request queued
Loading model
First run can be slow
Generating audio
Synthesizing speech
Finalizing
Preparing playback
'''
# Template text for the "/record" page. It is passed unchanged to
# render_template_string() by the record() view. Declared as a raw string so
# that backslashes in the template are not treated as Python escapes.
RECORD_HTML = r'''
Record Your Voice • XTTS Demo
Try your own voice by recording a short, clear clip. Then choose a language and synthesize any text in your cloned voice.
Idle
00:00
Use a quiet environment and speak naturally for 5–10 seconds.
Before you start
This demo runs the XTTS model locally. The first request may take a little longer while the model loads. Subsequent runs will be faster. Thanks for your patience.
Your voice recording stays on this machine. The generated audio will appear when processing completes.
Cloning in progress
Preparing
Validating inputs
Uploading reference
Sending audio to server
Waiting for server
Request queued
Loading model
First run can be slow
Generating audio
Synthesizing speech
Finalizing
Preparing playback
'''
@app.route("/record")
def record():
    """Serve the voice-recording page rendered from ``RECORD_HTML``."""
    page = render_template_string(RECORD_HTML)
    return page
@app.route("/")
def index():
    """Serve the landing page rendered from ``INDEX_HTML``."""
    page = render_template_string(INDEX_HTML)
    return page
@app.route("/outputs/<path:filename>")
def serve_output(filename: str):
    """Serve a generated audio file from ``OUTPUT_DIR`` for inline playback.

    The URL rule has to declare the ``filename`` variable. Without it Flask
    calls the view with no arguments (raising ``TypeError``), and
    ``url_for("serve_output", filename=...)`` can only emit a query string
    instead of a ``/outputs/<name>`` path.

    Args:
        filename: Path of the requested file relative to ``OUTPUT_DIR``.

    Returns:
        The file response produced by ``send_from_directory`` (sent inline,
        not as a download).
    """
    return send_from_directory(OUTPUT_DIR, filename, as_attachment=False)
# ---------------- Progress tracking and async job execution ---------------- #
# In-memory job registry: job id -> job record (see _new_job for the shape).
# The helpers below take JOBS_LOCK before touching it.
JOBS: dict = {}
JOBS_LOCK = threading.Lock()

# Ordered pipeline stages reported to the client. Each entry starts out as
# "pending"; the "sub" text is the default detail line for that stage.
STEPS_TEMPLATE = [
    {"label": label, "sub": sub, "status": "pending"}
    for label, sub in (
        ("Preparing", "Validating inputs"),
        ("Uploading reference", "Saving audio"),
        ("Waiting for server", "Queued"),
        ("Loading model", "First run may be slow"),
        ("Generating audio", "Synthesizing speech"),
        ("Finalizing", "Preparing playback"),
    )
]
def _new_job() -> dict:
    """Build a fresh job record in the ``pending`` state.

    The step list is copied entry by entry from ``STEPS_TEMPLATE`` so that
    later per-job mutations never touch the shared template.

    Returns:
        A dict with ``status``, ``steps``, ``error``, ``audio_url`` and the
        creation timestamp under ``created``.
    """
    steps = [
        {"label": tpl["label"], "sub": tpl["sub"], "status": "pending"}
        for tpl in STEPS_TEMPLATE
    ]
    return {
        "status": "pending",
        "steps": steps,
        "error": None,
        "audio_url": None,
        "created": time.time(),
    }
# Cleanup policy for job registry
JOB_TTL_SECONDS = 3600  # 1 hour
MAX_JOBS = 500


def _cleanup_jobs() -> None:
    """Prune the job registry by age, then by size.

    Phase 1 drops every job older than ``JOB_TTL_SECONDS`` (a job without a
    ``created`` timestamp is treated as brand new and kept).

    Phase 2 runs only if the registry is still above ``MAX_JOBS`` and evicts
    the oldest finished jobs (status ``done`` or ``error``) until the cap is
    met or no finished jobs remain. Unfinished jobs are never evicted for
    size reasons.

    The overflow is measured after phase 1. Measuring it before, against the
    unpruned registry, counts expired jobs twice and evicts finished jobs
    whose results a client may still be polling for.
    """
    now = time.time()
    with JOBS_LOCK:
        # Phase 1: age-based expiry.
        expired = [
            jid
            for jid, job in JOBS.items()
            if now - job.get("created", now) > JOB_TTL_SECONDS
        ]
        for jid in expired:
            JOBS.pop(jid, None)

        # Phase 2: size cap, computed against what is actually left.
        overflow = len(JOBS) - MAX_JOBS
        if overflow > 0:
            finished = sorted(
                (
                    jid
                    for jid, job in JOBS.items()
                    if job.get("status") in ("done", "error")
                ),
                key=lambda jid: JOBS[jid].get("created", 0),
            )
            for jid in finished[:overflow]:
                JOBS.pop(jid, None)
def _set_step(job_id: str, idx: int, status: str, sub: str | None = None) -> None:
    """Update one progress step of a job under ``JOBS_LOCK``.

    Does nothing when the job id is not in the registry.

    Args:
        job_id: Registry key of the job.
        idx: Position of the step in the job's ``steps`` list.
        status: New status string for the step.
        sub: Optional replacement for the step's detail text; ``None`` keeps
            the current text.
    """
    changes = {"status": status}
    if sub is not None:
        changes["sub"] = sub
    with JOBS_LOCK:
        entry = JOBS.get(job_id)
        if entry:
            entry["steps"][idx].update(changes)
def _set_job_status(job_id: str, status: str) -> None:
    """Set the overall status of a job; unknown job ids are ignored."""
    with JOBS_LOCK:
        entry = JOBS.get(job_id)
        if not entry:
            return
        entry["status"] = status
def _set_job_error(job_id: str, msg: str) -> None:
    """Mark a job as failed and store its error message; unknown ids are ignored."""
    with JOBS_LOCK:
        entry = JOBS.get(job_id)
        if not entry:
            return
        entry.update(status="error", error=msg)
def _set_job_audio(job_id: str, audio_url: str) -> None:
    """Record the URL of a job's generated audio; unknown job ids are ignored."""
    with JOBS_LOCK:
        entry = JOBS.get(job_id)
        if not entry:
            return
        entry["audio_url"] = audio_url
def _run_job(
    job_id: str,
    *,
    text: str,
    language: str,
    device: str | None,
    input_path: str,
    output_name: str,
    output_path: str,
) -> None:
    """Run one cloning job to completion, reporting progress step by step.

    Intended as a thread target: it never raises. Any exception marks the
    step that was in progress as ``error`` and stores ``str(exc)`` on the job.

    Args:
        job_id: Registry key of the job to update.
        text: Text to synthesize.
        language: Language code forwarded to ``do_clone``.
        device: Device selector forwarded to the model helpers (may be None).
        input_path: Path of the saved reference audio.
        output_name: File name of the result, used to build its URL.
        output_path: Full path the synthesized audio is written to.
    """
    current_step = -1
    try:
        _set_job_status(job_id, "running")

        # Steps 0-2 (Preparing, Uploading reference, Waiting for server):
        # no work is performed for them here, so each is marked active and
        # then immediately done.
        for current_step in range(3):
            _set_step(job_id, current_step, "active")
            _set_step(job_id, current_step, "done")

        # Step 3: Loading model (skipped when it is already loaded).
        current_step = 3
        if is_model_loaded(device):
            _set_step(job_id, 3, "done", sub="Model already in memory")
        else:
            _set_step(job_id, 3, "active")
            warm_model(device)
            _set_step(job_id, 3, "done")

        # Step 4: Generating audio, converting the reference first if needed.
        current_step = 4
        _set_step(job_id, 4, "active", sub="Synthesizing speech")
        ref_path = input_path
        if _should_convert_to_wav(input_path):
            if not _ffmpeg_path():
                raise RuntimeError("Reference format not supported by backend. Please install ffmpeg or upload WAV/OGG/OPUS/MP3/M4A.")
            _set_step(job_id, 4, "active", sub="Converting reference audio")
            ref_path = _convert_to_wav(input_path)
            _set_step(job_id, 4, "active", sub="Synthesizing speech")
        do_clone(
            text=text,
            speaker_wav=ref_path,
            language=language,
            output=output_path,
            device=device,
        )
        _set_step(job_id, 4, "done")

        # Step 5: Finalizing. url_for is avoided because this runs in a
        # background thread without an app context; a relative path is used.
        current_step = 5
        _set_step(job_id, 5, "active")
        _set_job_audio(job_id, f"/outputs/{output_name}")
        _set_step(job_id, 5, "done")
        _set_job_status(job_id, "done")
    except Exception as exc:
        # A failure before any step started is attributed to step 0.
        _set_step(job_id, max(current_step, 0), "error")
        _set_job_error(job_id, str(exc))
@app.route("/api/clone_start", methods=["POST"])
def api_clone_start():
    """Validate a cloning request, save the upload and start a background job.

    Form fields: ``text`` (required), ``language`` (defaults to ``"en"``),
    ``device`` (optional) and the uploaded file ``reference`` (required).

    Returns:
        ``{"success": True, "job_id": ...}`` on success, or a 400 JSON error
        when the text or the reference file is missing or has an unsupported
        type.
    """
    _cleanup_jobs()

    text = (request.form.get("text") or "").strip()
    language = (request.form.get("language") or "en").strip()
    device = (request.form.get("device") or None)
    file = request.files.get("reference")

    if not text:
        return jsonify({"success": False, "error": "Text is required."}), 400
    if not file or file.filename == "":
        return jsonify({"success": False, "error": "Reference audio file is required."}), 400
    if not allowed_file(file.filename):
        return jsonify({"success": False, "error": "Unsupported file type. Use wav, mp3, m4a, flac, ogg, or opus."}), 400

    job_id = uuid.uuid4().hex

    filename = secure_filename(file.filename)
    # secure_filename() drops non-ASCII characters, so a name such as
    # "录音.wav" comes back as "wav" with no extension. Restore a plain
    # extension so later steps still see the real audio format.
    ext = os.path.splitext(file.filename)[1].lower()
    ext_is_plain = ext[1:].isascii() and ext[1:].isalnum()
    if ext_is_plain and not filename.lower().endswith(ext):
        filename = f"reference{ext}"

    # A millisecond timestamp alone can repeat across concurrent requests and
    # make two jobs overwrite each other's files; add part of the job id.
    ts = int(time.time() * 1000)
    unique = f"{ts}_{job_id[:8]}"
    input_path = os.path.join(UPLOAD_DIR, f"{unique}_{filename}")
    output_name = f"clone_{unique}.wav"
    output_path = os.path.join(OUTPUT_DIR, output_name)

    # Save upload before returning job id
    file.save(input_path)

    with JOBS_LOCK:
        JOBS[job_id] = _new_job()

    threading.Thread(
        target=_run_job,
        kwargs={
            "job_id": job_id,
            "text": text,
            "language": language,
            "device": device,
            "input_path": input_path,
            "output_name": output_name,
            "output_path": output_path,
        },
        daemon=True,
    ).start()

    return jsonify({"success": True, "job_id": job_id})
@app.route("/api/clone_status/<job_id>", methods=["GET"])
def api_clone_status(job_id: str):
    """Report the current state of a cloning job.

    The URL rule has to declare ``<job_id>``; without it Flask calls the view
    with no arguments and every status request fails.

    The response payload is copied while ``JOBS_LOCK`` is held, so the worker
    thread cannot change the step list between lookup and serialization.

    Args:
        job_id: Identifier returned by the start endpoint.

    Returns:
        JSON with ``status``, ``steps``, ``error`` and ``audio_url``, or a 404
        JSON error when the job id is not in the registry.
    """
    _cleanup_jobs()
    with JOBS_LOCK:
        job = JOBS.get(job_id)
        snapshot = None
        if job:
            snapshot = {
                "success": True,
                "status": job["status"],
                "steps": [dict(step) for step in job["steps"]],
                "error": job["error"],
                "audio_url": job["audio_url"],
            }
    if snapshot is None:
        return jsonify({"success": False, "error": "Invalid job id"}), 404
    return jsonify(snapshot)
@app.route("/api/clone", methods=["POST"])
def api_clone():
    """Clone a voice synchronously and return the URL of the generated audio.

    Form fields: ``text`` (required), ``language`` (defaults to ``"en"``),
    ``device`` (optional) and the uploaded file ``reference`` (required).

    Returns:
        ``{"success": True, "audio_url": ...}`` on success; a 400 JSON error
        for invalid input or a failed/unavailable reference conversion; a 500
        JSON error when synthesis itself raises.
    """
    text = (request.form.get("text") or "").strip()
    language = (request.form.get("language") or "en").strip()
    device = (request.form.get("device") or None)
    file = request.files.get("reference")

    if not text:
        return jsonify({"success": False, "error": "Text is required."}), 400
    if not file or file.filename == "":
        return jsonify({"success": False, "error": "Reference audio file is required."}), 400
    if not allowed_file(file.filename):
        return jsonify({"success": False, "error": "Unsupported file type. Use wav, mp3, m4a, flac, ogg, or opus."}), 400

    filename = secure_filename(file.filename)
    # secure_filename() drops non-ASCII characters, so a name such as
    # "录音.wav" comes back as "wav" with no extension. Restore a plain
    # extension so later steps still see the real audio format.
    ext = os.path.splitext(file.filename)[1].lower()
    ext_is_plain = ext[1:].isascii() and ext[1:].isalnum()
    if ext_is_plain and not filename.lower().endswith(ext):
        filename = f"reference{ext}"

    # A millisecond timestamp alone can repeat across concurrent requests and
    # make them overwrite each other's files; add a random suffix.
    ts = int(time.time() * 1000)
    unique = f"{ts}_{uuid.uuid4().hex[:8]}"
    input_path = os.path.join(UPLOAD_DIR, f"{unique}_{filename}")
    output_name = f"clone_{unique}.wav"
    output_path = os.path.join(OUTPUT_DIR, output_name)

    file.save(input_path)

    # Convert to WAV if necessary (for formats like WEBM/M4A)
    ref_path = input_path
    if _should_convert_to_wav(input_path):
        if _ffmpeg_path():
            try:
                ref_path = _convert_to_wav(input_path)
            except Exception as e:
                return jsonify({"success": False, "error": str(e)}), 400
        else:
            return jsonify({"success": False, "error": "Reference format not supported by backend. Install ffmpeg or upload WAV/OGG/OPUS/MP3/M4A."}), 400

    try:
        # Perform cloning
        do_clone(text=text, speaker_wav=ref_path, language=language, output=output_path, device=device)
    except Exception as e:
        return jsonify({"success": False, "error": str(e)}), 500

    audio_url = url_for("serve_output", filename=output_name)
    return jsonify({"success": True, "audio_url": audio_url})
if __name__ == "__main__":
    # Local development entry point: loopback interface only, debug mode on,
    # auto-reloader off.
    app.run(
        host="127.0.0.1",
        port=5000,
        debug=True,
        use_reloader=False,
    )