Spaces:
Running
Running
""" | |
TTSFM Web Application | |
A Flask web application that provides a user-friendly interface | |
for the TTSFM text-to-speech package. | |
""" | |
import os | |
import json | |
import logging | |
from datetime import datetime | |
from pathlib import Path | |
from typing import Dict, Any, Optional | |
from flask import Flask, request, jsonify, send_file, Response, render_template | |
from flask_cors import CORS | |
from dotenv import load_dotenv | |
# Import the TTSFM package | |
try: | |
from ttsfm import TTSClient, Voice, AudioFormat, TTSException | |
from ttsfm.exceptions import APIException, NetworkException, ValidationException | |
from ttsfm.utils import validate_text_length, split_text_by_length | |
except ImportError: | |
# Fallback for development when package is not installed | |
import sys | |
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..')) | |
from ttsfm import TTSClient, Voice, AudioFormat, TTSException | |
from ttsfm.exceptions import APIException, NetworkException, ValidationException | |
from ttsfm.utils import validate_text_length, split_text_by_length | |
# Load environment variables | |
load_dotenv() | |
# Configure logging | |
logging.basicConfig( | |
level=logging.INFO, | |
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' | |
) | |
logger = logging.getLogger(__name__) | |
# Create Flask app | |
app = Flask(__name__, static_folder='static', static_url_path='/static') | |
CORS(app) | |
# Configuration | |
HOST = os.getenv("HOST", "localhost") | |
PORT = int(os.getenv("PORT", "8000")) | |
DEBUG = os.getenv("DEBUG", "false").lower() == "true" | |
# Create TTS client - now uses openai.fm directly, no configuration needed | |
tts_client = TTSClient() | |
logger.info("Initialized web app with TTSFM using openai.fm free service") | |
def index(): | |
"""Serve the main web interface.""" | |
return render_template('index.html') | |
def playground(): | |
"""Serve the interactive playground.""" | |
return render_template('playground.html') | |
def docs(): | |
"""Serve the API documentation.""" | |
return render_template('docs.html') | |
def get_voices(): | |
"""Get list of available voices.""" | |
try: | |
voices = [ | |
{ | |
"id": voice.value, | |
"name": voice.value.title(), | |
"description": f"{voice.value.title()} voice" | |
} | |
for voice in Voice | |
] | |
return jsonify({ | |
"voices": voices, | |
"count": len(voices) | |
}) | |
except Exception as e: | |
logger.error(f"Error getting voices: {e}") | |
return jsonify({"error": "Failed to get voices"}), 500 | |
def get_formats(): | |
"""Get list of supported audio formats.""" | |
try: | |
formats = [ | |
{ | |
"id": "mp3", | |
"name": "MP3", | |
"mime_type": "audio/mpeg", | |
"description": "MP3 audio format - good quality, small file size", | |
"quality": "Good", | |
"file_size": "Small", | |
"use_case": "Web, mobile apps, general use" | |
}, | |
{ | |
"id": "opus", | |
"name": "OPUS", | |
"mime_type": "audio/opus", | |
"description": "OPUS audio format - excellent quality, small file size", | |
"quality": "Excellent", | |
"file_size": "Small", | |
"use_case": "Web streaming, VoIP" | |
}, | |
{ | |
"id": "aac", | |
"name": "AAC", | |
"mime_type": "audio/aac", | |
"description": "AAC audio format - good quality, medium file size", | |
"quality": "Good", | |
"file_size": "Medium", | |
"use_case": "Apple devices, streaming" | |
}, | |
{ | |
"id": "flac", | |
"name": "FLAC", | |
"mime_type": "audio/flac", | |
"description": "FLAC audio format - lossless quality, large file size", | |
"quality": "Lossless", | |
"file_size": "Large", | |
"use_case": "High-quality archival" | |
}, | |
{ | |
"id": "wav", | |
"name": "WAV", | |
"mime_type": "audio/wav", | |
"description": "WAV audio format - lossless quality, large file size", | |
"quality": "Lossless", | |
"file_size": "Large", | |
"use_case": "Professional audio" | |
}, | |
{ | |
"id": "pcm", | |
"name": "PCM", | |
"mime_type": "audio/pcm", | |
"description": "PCM audio format - raw audio data, large file size", | |
"quality": "Raw", | |
"file_size": "Large", | |
"use_case": "Audio processing" | |
} | |
] | |
return jsonify({ | |
"formats": formats, | |
"count": len(formats) | |
}) | |
except Exception as e: | |
logger.error(f"Error getting formats: {e}") | |
return jsonify({"error": "Failed to get formats"}), 500 | |
def validate_text(): | |
"""Validate text length and provide splitting suggestions.""" | |
try: | |
data = request.get_json() | |
if not data: | |
return jsonify({"error": "No JSON data provided"}), 400 | |
text = data.get('text', '').strip() | |
max_length = data.get('max_length', 4096) | |
if not text: | |
return jsonify({"error": "Text is required"}), 400 | |
text_length = len(text) | |
is_valid = text_length <= max_length | |
result = { | |
"text_length": text_length, | |
"max_length": max_length, | |
"is_valid": is_valid, | |
"needs_splitting": not is_valid | |
} | |
if not is_valid: | |
# Provide splitting suggestions | |
chunks = split_text_by_length(text, max_length, preserve_words=True) | |
result.update({ | |
"suggested_chunks": len(chunks), | |
"chunk_preview": [chunk[:100] + "..." if len(chunk) > 100 else chunk for chunk in chunks[:3]] | |
}) | |
return jsonify(result) | |
except Exception as e: | |
logger.error(f"Text validation error: {e}") | |
return jsonify({"error": "Text validation failed"}), 500 | |
def generate_speech(): | |
"""Generate speech from text using the TTSFM package.""" | |
try: | |
# Parse request data | |
data = request.get_json() | |
if not data: | |
return jsonify({"error": "No JSON data provided"}), 400 | |
# Extract parameters | |
text = data.get('text', '').strip() | |
voice = data.get('voice', Voice.ALLOY.value) | |
response_format = data.get('format', AudioFormat.MP3.value) | |
instructions = data.get('instructions', '').strip() or None | |
max_length = data.get('max_length', 4096) | |
validate_length = data.get('validate_length', True) | |
# Validate required fields | |
if not text: | |
return jsonify({"error": "Text is required"}), 400 | |
# Validate voice | |
try: | |
voice_enum = Voice(voice.lower()) | |
except ValueError: | |
return jsonify({ | |
"error": f"Invalid voice: {voice}. Must be one of: {[v.value for v in Voice]}" | |
}), 400 | |
# Validate format | |
try: | |
format_enum = AudioFormat(response_format.lower()) | |
except ValueError: | |
return jsonify({ | |
"error": f"Invalid format: {response_format}. Must be one of: {[f.value for f in AudioFormat]}" | |
}), 400 | |
logger.info(f"Generating speech: text='{text[:50]}...', voice={voice}, format={response_format}") | |
# Generate speech using the TTSFM package with validation | |
response = tts_client.generate_speech( | |
text=text, | |
voice=voice_enum, | |
response_format=format_enum, | |
instructions=instructions, | |
max_length=max_length, | |
validate_length=validate_length | |
) | |
# Return audio data | |
return Response( | |
response.audio_data, | |
mimetype=response.content_type, | |
headers={ | |
'Content-Disposition': f'attachment; filename="speech.{response.format.value}"', | |
'Content-Length': str(response.size), | |
'X-Audio-Format': response.format.value, | |
'X-Audio-Size': str(response.size) | |
} | |
) | |
except ValidationException as e: | |
logger.warning(f"Validation error: {e}") | |
return jsonify({"error": str(e)}), 400 | |
except APIException as e: | |
logger.error(f"API error: {e}") | |
return jsonify({ | |
"error": str(e), | |
"status_code": getattr(e, 'status_code', 500) | |
}), getattr(e, 'status_code', 500) | |
except NetworkException as e: | |
logger.error(f"Network error: {e}") | |
return jsonify({ | |
"error": "TTS service is currently unavailable", | |
"details": str(e) | |
}), 503 | |
except TTSException as e: | |
logger.error(f"TTS error: {e}") | |
return jsonify({"error": str(e)}), 500 | |
except Exception as e: | |
logger.error(f"Unexpected error: {e}") | |
return jsonify({"error": "Internal server error"}), 500 | |
def generate_speech_batch(): | |
"""Generate speech from long text by splitting into chunks.""" | |
try: | |
data = request.get_json() | |
if not data: | |
return jsonify({"error": "No JSON data provided"}), 400 | |
text = data.get('text', '').strip() | |
voice = data.get('voice', Voice.ALLOY.value) | |
response_format = data.get('format', AudioFormat.MP3.value) | |
instructions = data.get('instructions', '').strip() or None | |
max_length = data.get('max_length', 4096) | |
preserve_words = data.get('preserve_words', True) | |
if not text: | |
return jsonify({"error": "Text is required"}), 400 | |
# Validate voice and format | |
try: | |
voice_enum = Voice(voice.lower()) | |
format_enum = AudioFormat(response_format.lower()) | |
except ValueError as e: | |
return jsonify({"error": f"Invalid voice or format: {e}"}), 400 | |
# Split text into chunks | |
chunks = split_text_by_length(text, max_length, preserve_words) | |
if not chunks: | |
return jsonify({"error": "No valid text chunks found"}), 400 | |
logger.info(f"Processing {len(chunks)} chunks for batch generation") | |
# Generate speech for each chunk | |
results = [] | |
for i, chunk in enumerate(chunks): | |
try: | |
response = tts_client.generate_speech( | |
text=chunk, | |
voice=voice_enum, | |
response_format=format_enum, | |
instructions=instructions, | |
max_length=max_length, | |
validate_length=False # Already split | |
) | |
# Convert to base64 for JSON response | |
import base64 | |
audio_b64 = base64.b64encode(response.audio_data).decode('utf-8') | |
results.append({ | |
"chunk_index": i + 1, | |
"chunk_text": chunk[:100] + "..." if len(chunk) > 100 else chunk, | |
"audio_data": audio_b64, | |
"content_type": response.content_type, | |
"size": response.size, | |
"format": response.format.value | |
}) | |
except Exception as e: | |
logger.error(f"Failed to generate chunk {i+1}: {e}") | |
results.append({ | |
"chunk_index": i + 1, | |
"chunk_text": chunk[:100] + "..." if len(chunk) > 100 else chunk, | |
"error": str(e) | |
}) | |
return jsonify({ | |
"total_chunks": len(chunks), | |
"successful_chunks": len([r for r in results if "audio_data" in r]), | |
"results": results | |
}) | |
except Exception as e: | |
logger.error(f"Batch generation error: {e}") | |
return jsonify({"error": "Batch generation failed"}), 500 | |
def get_status(): | |
"""Get service status.""" | |
try: | |
# Try to make a simple request to check if the TTS service is available | |
test_response = tts_client.generate_speech( | |
text="test", | |
voice=Voice.ALLOY, | |
response_format=AudioFormat.MP3 | |
) | |
return jsonify({ | |
"status": "online", | |
"tts_service": "openai.fm (free)", | |
"package_version": "3.0.0", | |
"timestamp": datetime.now().isoformat() | |
}) | |
except Exception as e: | |
logger.error(f"Status check failed: {e}") | |
return jsonify({ | |
"status": "error", | |
"tts_service": "openai.fm (free)", | |
"error": str(e), | |
"timestamp": datetime.now().isoformat() | |
}), 503 | |
def health_check(): | |
"""Simple health check endpoint.""" | |
return jsonify({ | |
"status": "healthy", | |
"timestamp": datetime.now().isoformat() | |
}) | |
# OpenAI-compatible API endpoints | |
def openai_speech(): | |
"""OpenAI-compatible speech generation endpoint.""" | |
try: | |
# Parse request data | |
data = request.get_json() | |
if not data: | |
return jsonify({ | |
"error": { | |
"message": "No JSON data provided", | |
"type": "invalid_request_error", | |
"code": "missing_data" | |
} | |
}), 400 | |
# Extract OpenAI-compatible parameters | |
model = data.get('model', 'gpt-4o-mini-tts') # Accept but ignore model | |
input_text = data.get('input', '').strip() | |
voice = data.get('voice', 'alloy') | |
response_format = data.get('response_format', 'mp3') | |
instructions = data.get('instructions', '').strip() or None | |
speed = data.get('speed', 1.0) # Accept but ignore speed | |
# Validate required fields | |
if not input_text: | |
return jsonify({ | |
"error": { | |
"message": "Input text is required", | |
"type": "invalid_request_error", | |
"code": "missing_input" | |
} | |
}), 400 | |
# Validate voice | |
try: | |
voice_enum = Voice(voice.lower()) | |
except ValueError: | |
return jsonify({ | |
"error": { | |
"message": f"Invalid voice: {voice}. Must be one of: {[v.value for v in Voice]}", | |
"type": "invalid_request_error", | |
"code": "invalid_voice" | |
} | |
}), 400 | |
# Validate format | |
try: | |
format_enum = AudioFormat(response_format.lower()) | |
except ValueError: | |
return jsonify({ | |
"error": { | |
"message": f"Invalid response_format: {response_format}. Must be one of: {[f.value for f in AudioFormat]}", | |
"type": "invalid_request_error", | |
"code": "invalid_format" | |
} | |
}), 400 | |
logger.info(f"OpenAI API: Generating speech: text='{input_text[:50]}...', voice={voice}, format={response_format}") | |
# Generate speech using the TTSFM package | |
response = tts_client.generate_speech( | |
text=input_text, | |
voice=voice_enum, | |
response_format=format_enum, | |
instructions=instructions, | |
max_length=4096, | |
validate_length=True | |
) | |
# Return audio data in OpenAI format | |
return Response( | |
response.audio_data, | |
mimetype=response.content_type, | |
headers={ | |
'Content-Type': response.content_type, | |
'Content-Length': str(response.size), | |
'X-Audio-Format': response.format.value, | |
'X-Audio-Size': str(response.size), | |
'X-Powered-By': 'TTSFM-OpenAI-Compatible' | |
} | |
) | |
except ValidationException as e: | |
logger.warning(f"OpenAI API validation error: {e}") | |
return jsonify({ | |
"error": { | |
"message": str(e), | |
"type": "invalid_request_error", | |
"code": "validation_error" | |
} | |
}), 400 | |
except APIException as e: | |
logger.error(f"OpenAI API error: {e}") | |
return jsonify({ | |
"error": { | |
"message": str(e), | |
"type": "api_error", | |
"code": "tts_error" | |
} | |
}), getattr(e, 'status_code', 500) | |
except NetworkException as e: | |
logger.error(f"OpenAI API network error: {e}") | |
return jsonify({ | |
"error": { | |
"message": "TTS service is currently unavailable", | |
"type": "service_unavailable_error", | |
"code": "service_unavailable" | |
} | |
}), 503 | |
except Exception as e: | |
logger.error(f"OpenAI API unexpected error: {e}") | |
return jsonify({ | |
"error": { | |
"message": "An unexpected error occurred", | |
"type": "internal_error", | |
"code": "internal_error" | |
} | |
}), 500 | |
def openai_models(): | |
"""OpenAI-compatible models endpoint.""" | |
return jsonify({ | |
"object": "list", | |
"data": [ | |
{ | |
"id": "gpt-4o-mini-tts", | |
"object": "model", | |
"created": 1699564800, | |
"owned_by": "ttsfm", | |
"permission": [], | |
"root": "gpt-4o-mini-tts", | |
"parent": None | |
} | |
] | |
}) | |
def not_found(error): | |
"""Handle 404 errors.""" | |
return jsonify({"error": "Endpoint not found"}), 404 | |
def method_not_allowed(error): | |
"""Handle 405 errors.""" | |
return jsonify({"error": "Method not allowed"}), 405 | |
def internal_error(error): | |
"""Handle 500 errors.""" | |
logger.error(f"Internal server error: {error}") | |
return jsonify({"error": "Internal server error"}), 500 | |
if __name__ == '__main__': | |
logger.info(f"Starting TTSFM web application on {HOST}:{PORT}") | |
logger.info("Using openai.fm free TTS service") | |
logger.info(f"Debug mode: {DEBUG}") | |
try: | |
app.run( | |
host=HOST, | |
port=PORT, | |
debug=DEBUG | |
) | |
except KeyboardInterrupt: | |
logger.info("Application stopped by user") | |
except Exception as e: | |
logger.error(f"Failed to start application: {e}") | |
finally: | |
# Clean up TTS client | |
tts_client.close() | |