ttsfm / ttsfm-web /app.py
NitinBot001's picture
Upload 26 files
f5ec497 verified
"""
TTSFM Web Application
A Flask web application that provides a user-friendly interface
for the TTSFM text-to-speech package.
"""
import os
import json
import logging
from datetime import datetime
from pathlib import Path
from typing import Dict, Any, Optional
from flask import Flask, request, jsonify, send_file, Response, render_template
from flask_cors import CORS
from dotenv import load_dotenv
# Import the TTSFM package
try:
from ttsfm import TTSClient, Voice, AudioFormat, TTSException
from ttsfm.exceptions import APIException, NetworkException, ValidationException
from ttsfm.utils import validate_text_length, split_text_by_length
except ImportError:
# Fallback for development when package is not installed
import sys
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
from ttsfm import TTSClient, Voice, AudioFormat, TTSException
from ttsfm.exceptions import APIException, NetworkException, ValidationException
from ttsfm.utils import validate_text_length, split_text_by_length
# Load environment variables (python-dotenv) before any setting is read
load_dotenv()

# Application-wide logging: INFO and above, timestamped records
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Flask application; static files come from the 'static' folder and are
# exposed under the /static URL prefix. CORS is enabled with its defaults.
app = Flask(__name__, static_url_path='/static', static_folder='static')
CORS(app)

# Runtime settings, each overridable through an environment variable
HOST = os.getenv("HOST", "localhost")
PORT = int(os.getenv("PORT", "8000"))
DEBUG = os.getenv("DEBUG", "false").lower() == "true"

# Single shared TTS client - uses openai.fm directly, no configuration needed
tts_client = TTSClient()
logger.info("Initialized web app with TTSFM using openai.fm free service")
@app.route('/')
def index():
    """Render ``index.html``, the main page of the web interface."""
    page = render_template('index.html')
    return page
@app.route('/playground')
def playground():
    """Render ``playground.html``, the interactive playground page."""
    page = render_template('playground.html')
    return page
@app.route('/docs')
def docs():
    """Render ``docs.html``, the API documentation page."""
    page = render_template('docs.html')
    return page
@app.route('/api/voices', methods=['GET'])
def get_voices():
    """List every member of the ``Voice`` enum as JSON.

    Returns:
        200 with ``voices`` (one entry per enum member: ``id`` is the enum
        value, ``name`` its title-cased form, ``description`` a short label)
        and ``count``; 500 with an ``error`` message if anything fails.
    """
    try:
        catalog = []
        for member in Voice:
            label = member.value.title()
            catalog.append({
                "id": member.value,
                "name": label,
                "description": f"{label} voice",
            })
        payload = {"voices": catalog, "count": len(catalog)}
        return jsonify(payload)
    except Exception as e:
        logger.error(f"Error getting voices: {e}")
        return jsonify({"error": "Failed to get voices"}), 500
@app.route('/api/formats', methods=['GET'])
def get_formats():
    """Describe the supported audio formats as JSON.

    Returns:
        200 with ``formats`` (a fixed list of six descriptors, each carrying
        ``id``, ``name``, ``mime_type``, ``description``, ``quality``,
        ``file_size`` and ``use_case``) and ``count``; 500 with an ``error``
        message if anything fails.
    """
    try:
        # Column order of every row in the table below.
        columns = (
            "id", "name", "mime_type", "description",
            "quality", "file_size", "use_case",
        )
        rows = (
            ("mp3", "MP3", "audio/mpeg",
             "MP3 audio format - good quality, small file size",
             "Good", "Small", "Web, mobile apps, general use"),
            ("opus", "OPUS", "audio/opus",
             "OPUS audio format - excellent quality, small file size",
             "Excellent", "Small", "Web streaming, VoIP"),
            ("aac", "AAC", "audio/aac",
             "AAC audio format - good quality, medium file size",
             "Good", "Medium", "Apple devices, streaming"),
            ("flac", "FLAC", "audio/flac",
             "FLAC audio format - lossless quality, large file size",
             "Lossless", "Large", "High-quality archival"),
            ("wav", "WAV", "audio/wav",
             "WAV audio format - lossless quality, large file size",
             "Lossless", "Large", "Professional audio"),
            ("pcm", "PCM", "audio/pcm",
             "PCM audio format - raw audio data, large file size",
             "Raw", "Large", "Audio processing"),
        )
        formats = [dict(zip(columns, row)) for row in rows]
        payload = {"formats": formats, "count": len(formats)}
        return jsonify(payload)
    except Exception as e:
        logger.error(f"Error getting formats: {e}")
        return jsonify({"error": "Failed to get formats"}), 500
@app.route('/api/validate-text', methods=['POST'])
def validate_text():
    """Validate text length and provide splitting suggestions.

    Request JSON fields:
        text (str, required): text to measure; surrounding whitespace is
            stripped before counting.
        max_length (int, optional): maximum allowed length, default 4096.
            Must be a positive integer.

    Returns:
        200 with ``text_length``, ``max_length``, ``is_valid`` and
        ``needs_splitting``. When the text is too long the response also
        contains ``suggested_chunks`` and ``chunk_preview`` (the first three
        chunks, each cut to 100 characters plus ``...``).
        400 when the body is missing, is not a JSON object, or a field has
        the wrong type/value.
        500 on any unexpected failure.
    """
    try:
        # silent=True makes a malformed body or a non-JSON Content-Type come
        # back as None, instead of raising an HTTP error that the broad
        # handler below would misreport as a 500.
        data = request.get_json(silent=True)
        if not data:
            return jsonify({"error": "No JSON data provided"}), 400
        if not isinstance(data, dict):
            return jsonify({"error": "Request body must be a JSON object"}), 400

        text = data.get('text', '')
        if not isinstance(text, str):
            return jsonify({"error": "Text must be a string"}), 400
        text = text.strip()
        if not text:
            return jsonify({"error": "Text is required"}), 400

        max_length = data.get('max_length', 4096)
        # bool is a subclass of int, so it has to be excluded explicitly.
        if (
            isinstance(max_length, bool)
            or not isinstance(max_length, int)
            or max_length <= 0
        ):
            return jsonify({"error": "max_length must be a positive integer"}), 400

        text_length = len(text)
        is_valid = text_length <= max_length
        result = {
            "text_length": text_length,
            "max_length": max_length,
            "is_valid": is_valid,
            "needs_splitting": not is_valid
        }
        if not is_valid:
            # Provide splitting suggestions
            chunks = split_text_by_length(text, max_length, preserve_words=True)
            result.update({
                "suggested_chunks": len(chunks),
                "chunk_preview": [
                    chunk[:100] + "..." if len(chunk) > 100 else chunk
                    for chunk in chunks[:3]
                ]
            })
        return jsonify(result)
    except Exception as e:
        # logger.exception keeps the traceback for unexpected failures.
        logger.exception(f"Text validation error: {e}")
        return jsonify({"error": "Text validation failed"}), 500
@app.route('/api/generate', methods=['POST'])
def generate_speech():
    """Generate speech from text using the TTSFM package.

    Request JSON fields:
        text (str, required): text to synthesize; whitespace is stripped.
        voice (str, optional): a ``Voice`` value, case-insensitive;
            defaults to ``Voice.ALLOY``.
        format (str, optional): an ``AudioFormat`` value, case-insensitive;
            defaults to ``AudioFormat.MP3``.
        instructions (str, optional): passed to the client; blank or null
            is treated as absent.
        max_length (int, optional): positive integer, default 4096.
        validate_length (optional): forwarded to the client unchanged,
            default True.

    Returns:
        200 with the audio bytes as an attachment named ``speech.<format>``.
        400 for missing/invalid input or a ``ValidationException``.
        The exception's ``status_code`` (500 if unusable) for ``APIException``.
        503 for ``NetworkException``; 500 for any other failure.
    """
    def _bad_request(message):
        # Uniform 400 response for client-side input problems.
        return jsonify({"error": message}), 400

    try:
        # silent=True: malformed JSON / wrong Content-Type yields None here
        # rather than an HTTP error that the generic handler would turn
        # into a 500.
        data = request.get_json(silent=True)
        if not data:
            return _bad_request("No JSON data provided")
        if not isinstance(data, dict):
            return _bad_request("Request body must be a JSON object")

        # Extract and type-check parameters before calling string methods
        text = data.get('text', '')
        if not isinstance(text, str):
            return _bad_request("Text must be a string")
        text = text.strip()
        if not text:
            return _bad_request("Text is required")

        instructions = data.get('instructions')
        if instructions is not None and not isinstance(instructions, str):
            return _bad_request("Instructions must be a string")
        instructions = (instructions or '').strip() or None

        max_length = data.get('max_length', 4096)
        # bool is a subclass of int, so it has to be excluded explicitly.
        if (
            isinstance(max_length, bool)
            or not isinstance(max_length, int)
            or max_length <= 0
        ):
            return _bad_request("max_length must be a positive integer")

        validate_length = data.get('validate_length', True)
        voice = data.get('voice', Voice.ALLOY.value)
        response_format = data.get('format', AudioFormat.MP3.value)

        # Validate voice (AttributeError covers non-string values)
        try:
            voice_enum = Voice(voice.lower())
        except (AttributeError, ValueError):
            return _bad_request(
                f"Invalid voice: {voice}. Must be one of: {[v.value for v in Voice]}"
            )

        # Validate format (AttributeError covers non-string values)
        try:
            format_enum = AudioFormat(response_format.lower())
        except (AttributeError, ValueError):
            return _bad_request(
                f"Invalid format: {response_format}. Must be one of: {[f.value for f in AudioFormat]}"
            )

        logger.info(f"Generating speech: text='{text[:50]}...', voice={voice}, format={response_format}")

        # Generate speech using the TTSFM package with validation
        response = tts_client.generate_speech(
            text=text,
            voice=voice_enum,
            response_format=format_enum,
            instructions=instructions,
            max_length=max_length,
            validate_length=validate_length
        )

        # Return audio data
        return Response(
            response.audio_data,
            mimetype=response.content_type,
            headers={
                'Content-Disposition': f'attachment; filename="speech.{response.format.value}"',
                'Content-Length': str(response.size),
                'X-Audio-Format': response.format.value,
                'X-Audio-Size': str(response.size)
            }
        )
    except ValidationException as e:
        logger.warning(f"Validation error: {e}")
        return jsonify({"error": str(e)}), 400
    except APIException as e:
        logger.error(f"API error: {e}")
        # Fall back to 500 when the exception carries no usable status code
        # (attribute missing, None or 0).
        status = getattr(e, 'status_code', None) or 500
        return jsonify({
            "error": str(e),
            "status_code": status
        }), status
    except NetworkException as e:
        logger.error(f"Network error: {e}")
        return jsonify({
            "error": "TTS service is currently unavailable",
            "details": str(e)
        }), 503
    except TTSException as e:
        logger.error(f"TTS error: {e}")
        return jsonify({"error": str(e)}), 500
    except Exception as e:
        # logger.exception keeps the traceback for unexpected failures.
        logger.exception(f"Unexpected error: {e}")
        return jsonify({"error": "Internal server error"}), 500
@app.route('/api/generate-batch', methods=['POST'])
def generate_speech_batch():
    """Generate speech from long text by splitting into chunks.

    Request JSON fields:
        text (str, required): text to synthesize; whitespace is stripped.
        voice (str, optional): a ``Voice`` value, case-insensitive;
            defaults to ``Voice.ALLOY``.
        format (str, optional): an ``AudioFormat`` value, case-insensitive;
            defaults to ``AudioFormat.MP3``.
        instructions (str, optional): blank or null is treated as absent.
        max_length (int, optional): positive integer chunk size, default 4096.
        preserve_words (optional): forwarded to ``split_text_by_length``,
            default True.

    Returns:
        200 with ``total_chunks``, ``successful_chunks`` and ``results``.
        Each result has ``chunk_index`` (1-based) and ``chunk_text`` (first
        100 characters), plus either base64 ``audio_data`` with
        ``content_type``/``size``/``format`` or an ``error`` string when that
        chunk failed. One failing chunk does not abort the others.
        400 for missing/invalid input; 500 on any unexpected failure.
    """
    # Imported once per call rather than once per chunk inside the loop.
    import base64

    try:
        # silent=True: malformed JSON / wrong Content-Type yields None here
        # rather than an HTTP error that the broad handler below would turn
        # into a 500.
        data = request.get_json(silent=True)
        if not data:
            return jsonify({"error": "No JSON data provided"}), 400
        if not isinstance(data, dict):
            return jsonify({"error": "Request body must be a JSON object"}), 400

        text = data.get('text', '')
        if not isinstance(text, str):
            return jsonify({"error": "Text must be a string"}), 400
        text = text.strip()
        if not text:
            return jsonify({"error": "Text is required"}), 400

        instructions = data.get('instructions')
        if instructions is not None and not isinstance(instructions, str):
            return jsonify({"error": "Instructions must be a string"}), 400
        instructions = (instructions or '').strip() or None

        max_length = data.get('max_length', 4096)
        # bool is a subclass of int, so it has to be excluded explicitly.
        if (
            isinstance(max_length, bool)
            or not isinstance(max_length, int)
            or max_length <= 0
        ):
            return jsonify({"error": "max_length must be a positive integer"}), 400

        preserve_words = data.get('preserve_words', True)
        voice = data.get('voice', Voice.ALLOY.value)
        response_format = data.get('format', AudioFormat.MP3.value)
        if not isinstance(voice, str) or not isinstance(response_format, str):
            return jsonify({"error": "voice and format must be strings"}), 400

        # Validate voice and format
        try:
            voice_enum = Voice(voice.lower())
            format_enum = AudioFormat(response_format.lower())
        except ValueError as e:
            return jsonify({"error": f"Invalid voice or format: {e}"}), 400

        # Split text into chunks
        chunks = split_text_by_length(text, max_length, preserve_words)
        if not chunks:
            return jsonify({"error": "No valid text chunks found"}), 400

        logger.info(f"Processing {len(chunks)} chunks for batch generation")

        # Generate speech for each chunk
        results = []
        for position, chunk in enumerate(chunks, start=1):
            # Short preview echoed back so callers can match audio to text.
            preview = chunk[:100] + "..." if len(chunk) > 100 else chunk
            try:
                response = tts_client.generate_speech(
                    text=chunk,
                    voice=voice_enum,
                    response_format=format_enum,
                    instructions=instructions,
                    max_length=max_length,
                    validate_length=False  # Already split
                )
                # Convert to base64 for JSON response
                audio_b64 = base64.b64encode(response.audio_data).decode('utf-8')
                results.append({
                    "chunk_index": position,
                    "chunk_text": preview,
                    "audio_data": audio_b64,
                    "content_type": response.content_type,
                    "size": response.size,
                    "format": response.format.value
                })
            except Exception as e:
                # Deliberate best effort: record the failure and keep going.
                logger.error(f"Failed to generate chunk {position}: {e}")
                results.append({
                    "chunk_index": position,
                    "chunk_text": preview,
                    "error": str(e)
                })

        return jsonify({
            "total_chunks": len(chunks),
            "successful_chunks": sum("audio_data" in r for r in results),
            "results": results
        })
    except Exception as e:
        # logger.exception keeps the traceback for unexpected failures.
        logger.exception(f"Batch generation error: {e}")
        return jsonify({"error": "Batch generation failed"}), 500
@app.route('/api/status', methods=['GET'])
def get_status():
    """Report whether the TTS backend currently answers requests.

    Sends a real synthesis request for the word "test" through the shared
    client; the audio itself is discarded.

    Returns:
        200 with ``status: online``, the service name, package version and
        a timestamp when the probe succeeds; 503 with ``status: error`` and
        the exception text when it fails.
    """
    service_label = "openai.fm (free)"
    try:
        # Probe request - only success or failure matters, not the audio.
        tts_client.generate_speech(
            text="test",
            voice=Voice.ALLOY,
            response_format=AudioFormat.MP3,
        )
        online = {
            "status": "online",
            "tts_service": service_label,
            "package_version": "3.0.0",
            "timestamp": datetime.now().isoformat(),
        }
        return jsonify(online)
    except Exception as e:
        logger.error(f"Status check failed: {e}")
        offline = {
            "status": "error",
            "tts_service": service_label,
            "error": str(e),
            "timestamp": datetime.now().isoformat(),
        }
        return jsonify(offline), 503
@app.route('/api/health', methods=['GET'])
def health_check():
    """Liveness probe: always answers ``healthy`` with the current timestamp.

    Unlike the status endpoint this does not contact the TTS service.
    """
    now = datetime.now().isoformat()
    return jsonify({"status": "healthy", "timestamp": now})
# OpenAI-compatible API endpoints
@app.route('/v1/audio/speech', methods=['POST'])
def openai_speech():
    """OpenAI-compatible speech generation endpoint.

    Request JSON fields:
        input (str, required): text to synthesize; whitespace is stripped.
        voice (str, optional): a ``Voice`` value, case-insensitive;
            default ``alloy``.
        response_format (str, optional): an ``AudioFormat`` value,
            case-insensitive; default ``mp3``.
        instructions (str, optional): blank or null is treated as absent.
        model, speed: accepted for compatibility but ignored.

    Returns:
        200 with the audio bytes. Errors use the nested shape
        ``{"error": {"message", "type", "code"}}``: 400 for invalid input
        or ``ValidationException``, the exception's ``status_code`` (500 if
        unusable) for ``APIException``, 503 for ``NetworkException`` and 500
        for anything else.
    """
    def _invalid_request(message, code):
        # Uniform 400 response in the OpenAI-style error envelope.
        return jsonify({
            "error": {
                "message": message,
                "type": "invalid_request_error",
                "code": code
            }
        }), 400

    try:
        # silent=True: malformed JSON / wrong Content-Type yields None here
        # rather than an HTTP error that the generic handler would turn
        # into a 500.
        data = request.get_json(silent=True)
        if not data:
            return _invalid_request("No JSON data provided", "missing_data")
        if not isinstance(data, dict):
            return _invalid_request("Request body must be a JSON object", "invalid_body")

        # Extract OpenAI-compatible parameters. 'model' and 'speed' may be
        # present in the body; they are accepted but intentionally not read.
        input_text = data.get('input', '')
        if not isinstance(input_text, str):
            return _invalid_request("Input text must be a string", "invalid_input")
        input_text = input_text.strip()
        if not input_text:
            return _invalid_request("Input text is required", "missing_input")

        instructions = data.get('instructions')
        if instructions is not None and not isinstance(instructions, str):
            return _invalid_request("Instructions must be a string", "invalid_instructions")
        instructions = (instructions or '').strip() or None

        voice = data.get('voice', 'alloy')
        response_format = data.get('response_format', 'mp3')

        # Validate voice (AttributeError covers non-string values)
        try:
            voice_enum = Voice(voice.lower())
        except (AttributeError, ValueError):
            return _invalid_request(
                f"Invalid voice: {voice}. Must be one of: {[v.value for v in Voice]}",
                "invalid_voice",
            )

        # Validate format (AttributeError covers non-string values)
        try:
            format_enum = AudioFormat(response_format.lower())
        except (AttributeError, ValueError):
            return _invalid_request(
                f"Invalid response_format: {response_format}. Must be one of: {[f.value for f in AudioFormat]}",
                "invalid_format",
            )

        logger.info(f"OpenAI API: Generating speech: text='{input_text[:50]}...', voice={voice}, format={response_format}")

        # Generate speech using the TTSFM package
        response = tts_client.generate_speech(
            text=input_text,
            voice=voice_enum,
            response_format=format_enum,
            instructions=instructions,
            max_length=4096,
            validate_length=True
        )

        # Return audio data in OpenAI format
        return Response(
            response.audio_data,
            mimetype=response.content_type,
            headers={
                'Content-Type': response.content_type,
                'Content-Length': str(response.size),
                'X-Audio-Format': response.format.value,
                'X-Audio-Size': str(response.size),
                'X-Powered-By': 'TTSFM-OpenAI-Compatible'
            }
        )
    except ValidationException as e:
        logger.warning(f"OpenAI API validation error: {e}")
        return _invalid_request(str(e), "validation_error")
    except APIException as e:
        logger.error(f"OpenAI API error: {e}")
        # Fall back to 500 when the exception carries no usable status code
        # (attribute missing, None or 0).
        status = getattr(e, 'status_code', None) or 500
        return jsonify({
            "error": {
                "message": str(e),
                "type": "api_error",
                "code": "tts_error"
            }
        }), status
    except NetworkException as e:
        logger.error(f"OpenAI API network error: {e}")
        return jsonify({
            "error": {
                "message": "TTS service is currently unavailable",
                "type": "service_unavailable_error",
                "code": "service_unavailable"
            }
        }), 503
    except Exception as e:
        # logger.exception keeps the traceback for unexpected failures.
        logger.exception(f"OpenAI API unexpected error: {e}")
        return jsonify({
            "error": {
                "message": "An unexpected error occurred",
                "type": "internal_error",
                "code": "internal_error"
            }
        }), 500
@app.route('/v1/models', methods=['GET'])
def openai_models():
    """OpenAI-compatible model listing.

    Returns a fixed list containing the single ``gpt-4o-mini-tts`` entry.
    """
    model_id = "gpt-4o-mini-tts"
    model_entry = {
        "id": model_id,
        "object": "model",
        "created": 1699564800,
        "owned_by": "ttsfm",
        "permission": [],
        "root": model_id,
        "parent": None,
    }
    return jsonify({"object": "list", "data": [model_entry]})
@app.errorhandler(404)
def not_found(error):
    """Return a JSON error body with status 404."""
    body = jsonify({"error": "Endpoint not found"})
    return body, 404
@app.errorhandler(405)
def method_not_allowed(error):
    """Return a JSON error body with status 405."""
    body = jsonify({"error": "Method not allowed"})
    return body, 405
@app.errorhandler(500)
def internal_error(error):
    """Log the failure and return a JSON error body with status 500."""
    logger.error(f"Internal server error: {error}")
    body = jsonify({"error": "Internal server error"})
    return body, 500
if __name__ == '__main__':
    # Announce the effective configuration before the server starts.
    for startup_line in (
        f"Starting TTSFM web application on {HOST}:{PORT}",
        "Using openai.fm free TTS service",
        f"Debug mode: {DEBUG}",
    ):
        logger.info(startup_line)

    try:
        app.run(host=HOST, port=PORT, debug=DEBUG)
    except KeyboardInterrupt:
        logger.info("Application stopped by user")
    except Exception as e:
        logger.error(f"Failed to start application: {e}")
    finally:
        # Release the shared TTS client however the server exits
        tts_client.close()