Spaces:
Paused
Paused
| """Advanced voice memory system for consistent voice generation.""" | |
| import os | |
| import torch | |
| import torchaudio | |
| import numpy as np | |
| import random | |
| import logging | |
| from typing import Dict, List, Optional | |
| from dataclasses import dataclass | |
| from app.models import Segment | |
# Setup logging: module-level logger named after this module.
logger = logging.getLogger(__name__)
# Path to store voice memories - use persistent location.
# Both the serialized memories (<name>.pt) and the reference WAV files
# written by the functions below live in this directory.
VOICE_MEMORIES_DIR = "/app/voice_memories"
# Create the directory at import time so that later saves find it in place.
os.makedirs(VOICE_MEMORIES_DIR, exist_ok=True)
@dataclass
class VoiceMemory:
    """Store voice characteristics for consistent generation.

    A memory pairs a list of reference audio tensors with the texts that
    were spoken in them.  The pairs are handed to the generator as context
    (see ``get_context_segments``) and are persisted as a single ``.pt``
    file per voice inside ``VOICE_MEMORIES_DIR``.
    """

    name: str  # Voice name (alloy, echo, etc.)
    speaker_id: int  # Speaker ID (0-5)
    # Store multiple audio segments for context
    audio_segments: List[torch.Tensor]
    # Store text prompts that produced good results
    text_segments: List[str]
    # Base characteristics for this voice
    pitch_base: float  # Base pitch characteristic (Hz)
    timbre: str  # Voice quality descriptor

    def get_context_segments(self, device: torch.device, max_segments: int = 2) -> List["Segment"]:
        """Get context segments for this voice.

        Args:
            device: Device the selected audio tensors are moved to.
            max_segments: Upper bound on the number of segments returned;
                values below one yield an empty list.

        Returns:
            Up to ``max_segments`` randomly chosen ``Segment`` objects, or an
            empty list when no audio is stored.
        """
        if not self.audio_segments:
            return []
        # Select a limited number of segments to avoid context overflow.
        # Clamp at zero so a negative limit cannot produce a surprising slice.
        num_segments = max(0, min(len(self.audio_segments), max_segments))
        selected_indices = random.sample(range(len(self.audio_segments)), num_segments)
        return [
            Segment(
                speaker=self.speaker_id,
                # Fall back to a placeholder when an audio clip has no stored text.
                text=self.text_segments[i] if i < len(self.text_segments) else f"Voice sample {i}",
                audio=self.audio_segments[i].to(device),
            )
            for i in selected_indices
        ]

    def update_with_new_audio(self, audio: torch.Tensor, text: str, max_stored: int = 5):
        """Update voice memory with newly generated audio.

        Args:
            audio: Newly generated audio; stored detached and on the CPU.
            text: Text that was spoken in ``audio``.
            max_stored: Maximum number of segments kept; the oldest ones are
                dropped first.
        """
        # Add new audio and text
        self.audio_segments.append(audio.detach().cpu())
        self.text_segments.append(text)
        # Keep only the most recent segments.  Dropping the same number of
        # leading items from both lists keeps audio and text index-aligned,
        # and (unlike a ``[-max_stored:]`` slice) also works for max_stored=0.
        excess = len(self.audio_segments) - max_stored
        if excess > 0:
            self.audio_segments = self.audio_segments[excess:]
            self.text_segments = self.text_segments[excess:]

    def save(self):
        """Save voice memory to persistent storage.

        Writes ``<name>.pt`` into ``VOICE_MEMORIES_DIR``.  Saving is
        best-effort: failures are logged and not re-raised.
        """
        data = {
            "name": self.name,
            "speaker_id": self.speaker_id,
            "audio_segments": self.audio_segments,
            "text_segments": self.text_segments,
            "pitch_base": self.pitch_base,
            "timbre": self.timbre,
        }
        # Save to the persistent directory
        save_path = os.path.join(VOICE_MEMORIES_DIR, f"{self.name}.pt")
        try:
            torch.save(data, save_path)
            logger.info(f"Saved voice memory for {self.name} to {save_path}")
        except Exception as e:
            logger.error(f"Error saving voice memory for {self.name}: {e}")

    @classmethod
    def load(cls, name: str) -> Optional['VoiceMemory']:
        """Load voice memory from persistent storage.

        Args:
            name: Voice name; the file ``<name>.pt`` is looked up in
                ``VOICE_MEMORIES_DIR``.

        Returns:
            The restored memory, or ``None`` when the file does not exist or
            cannot be read (the error is logged).
        """
        path = os.path.join(VOICE_MEMORIES_DIR, f"{name}.pt")
        if not os.path.exists(path):
            logger.info(f"No saved voice memory found for {name} at {path}")
            return None
        try:
            # NOTE: torch.load unpickles the file, so only files written by
            # save() should ever be placed in the memories directory.
            # map_location keeps the stored tensors on the CPU regardless of
            # the device they were saved from.
            data = torch.load(path, map_location="cpu")
            return cls(
                name=data["name"],
                speaker_id=data["speaker_id"],
                audio_segments=data["audio_segments"],
                text_segments=data["text_segments"],
                pitch_base=data["pitch_base"],
                timbre=data["timbre"],
            )
        except Exception as e:
            logger.error(f"Error loading voice memory for {name}: {e}")
            return None
# Dictionary of voice memories, keyed by voice name.
# Starts empty; entries are added by initialize_voices() and
# create_custom_voice() further down in this module.
VOICE_MEMORIES: Dict[str, VoiceMemory] = {}
# Voice characteristics.
# One row per built-in voice: (name, base pitch in Hz, timbre, description).
# Row order matters: a voice's position is used as its speaker ID elsewhere.
_VOICE_PROFILE_ROWS = (
    ("alloy", 220.0, "balanced", "A balanced, natural voice with medium pitch"),
    ("echo", 330.0, "resonant", "A resonant voice with a reverberant quality"),
    ("fable", 523.0, "bright", "A bright, higher-pitched voice with clear articulation"),
    ("onyx", 165.0, "deep", "A deep, authoritative voice with lower pitch"),
    ("nova", 392.0, "warm", "A warm, smooth voice with pleasant midrange tone"),
    ("shimmer", 587.0, "light", "A light, airy voice with higher frequencies"),
)
VOICE_CHARACTERISTICS = {
    voice: {"pitch": pitch, "timbre": timbre, "description": description}
    for voice, pitch, timbre, description in _VOICE_PROFILE_ROWS
}
# Voice intro texts - carefully crafted to capture voice characteristics.
# One row per voice: the voice name followed by its three intro sentences.
_VOICE_INTRO_ROWS = (
    (
        "alloy",
        "Hello, I'm Alloy. My voice is designed to be clear and balanced.",
        "This is the Alloy voice. I aim to sound natural and easy to understand.",
        "Welcome, I'm the voice known as Alloy. I have a balanced, medium-range tone.",
    ),
    (
        "echo",
        "Hello, I'm Echo. My voice has a rich, resonant quality.",
        "This is the Echo voice. Notice my distinctive resonance and depth.",
        "Welcome, I'm the voice known as Echo. My tone is designed to resonate clearly.",
    ),
    (
        "fable",
        "Hello, I'm Fable. My voice is bright and articulate.",
        "This is the Fable voice. I have a higher pitch with clear pronunciation.",
        "Welcome, I'm the voice known as Fable. I speak with a bright, energetic tone.",
    ),
    (
        "onyx",
        "Hello, I'm Onyx. My voice is deep and authoritative.",
        "This is the Onyx voice. I speak with a lower pitch and commanding presence.",
        "Welcome, I'm the voice known as Onyx. My tone is deep and resonant.",
    ),
    (
        "nova",
        "Hello, I'm Nova. My voice is warm and harmonious.",
        "This is the Nova voice. I have a smooth, pleasant mid-range quality.",
        "Welcome, I'm the voice known as Nova. I speak with a warm, friendly tone.",
    ),
    (
        "shimmer",
        "Hello, I'm Shimmer. My voice is light and expressive.",
        "This is the Shimmer voice. I have a higher-pitched, airy quality.",
        "Welcome, I'm the voice known as Shimmer. My tone is bright and crisp.",
    ),
)
# Star-unpacking yields each voice's sentences as a fresh list.
VOICE_INTROS = {voice: intro_lines for voice, *intro_lines in _VOICE_INTRO_ROWS}
def _synthesize_seed_waveform(voice_name: str, pitch: float, sample_rate: int, duration: float = 1.0) -> np.ndarray:
    """Build the deterministic, peak-normalized placeholder tone for a voice."""
    t = np.linspace(0, duration, int(sample_rate * duration), endpoint=False)

    def tone(ratio: float = 1.0) -> np.ndarray:
        # Sine partial at ``ratio`` times the base pitch.
        return np.sin(2 * np.pi * pitch * ratio * t)

    # Create characteristic waveform
    if voice_name == "alloy":
        audio = 0.5 * tone() + 0.3 * tone(2)
    elif voice_name == "echo":
        audio = tone() * np.exp(-t * 3)
    elif voice_name == "fable":
        audio = 0.7 * tone()
    elif voice_name == "onyx":
        audio = 0.8 * tone() + 0.1 * tone(0.5)
    elif voice_name == "nova":
        audio = 0.4 * tone() + 0.4 * tone(0.5)
    else:  # shimmer
        audio = 0.3 * tone() + 0.2 * tone(1.5) + 0.1 * tone(2)
    # Normalize to a peak amplitude of 1
    return audio / np.max(np.abs(audio))


def initialize_voices(sample_rate: int = 24000):
    """Initialize voice memories with consistent base samples.

    For every built-in voice (the keys of ``VOICE_CHARACTERISTICS``, whose
    position doubles as the speaker ID) a saved memory is loaded when one
    exists; otherwise a new memory is created from a one-second synthetic
    tone, saved, and registered in ``VOICE_MEMORIES``.

    Args:
        sample_rate: Sample rate in Hz used for the synthetic seed audio and
            for the reference WAV file.
    """
    # Check if persistent directory exists, create if needed
    os.makedirs(VOICE_MEMORIES_DIR, exist_ok=True)
    logger.info(f"Using voice memories directory: {VOICE_MEMORIES_DIR}")
    for speaker_id, (voice_name, characteristics) in enumerate(VOICE_CHARACTERISTICS.items()):
        # First try to load existing memories from persistent storage
        memory = VoiceMemory.load(voice_name)
        if memory:
            VOICE_MEMORIES[voice_name] = memory
            logger.info(f"Loaded existing voice memory for {voice_name} with {len(memory.audio_segments)} segments")
            continue
        # If no memory exists, create a new one from a synthetic seed tone.
        # The tone is a closed-form function of the pitch, so no random
        # numbers are drawn and the global NumPy RNG is left untouched.
        audio = _synthesize_seed_waveform(voice_name, characteristics["pitch"], sample_rate)
        audio_tensor = torch.tensor(audio, dtype=torch.float32)
        memory = VoiceMemory(
            name=voice_name,
            speaker_id=speaker_id,
            audio_segments=[audio_tensor],
            text_segments=[f"This is the voice of {voice_name}"],
            pitch_base=characteristics["pitch"],
            timbre=characteristics["timbre"],
        )
        # Save the voice memory to persistent storage
        memory.save()
        # Store in dictionary
        VOICE_MEMORIES[voice_name] = memory
        # Save as wav for reference.  This file is informational only, so a
        # failed write must not abort the loop and leave the registry with
        # only some of the voices.
        save_path = os.path.join(VOICE_MEMORIES_DIR, f"{voice_name}_seed.wav")
        try:
            torchaudio.save(save_path, audio_tensor.unsqueeze(0), sample_rate)
        except Exception as e:
            logger.warning(f"Could not write seed WAV for {voice_name} to {save_path}: {e}")
        logger.info(f"Initialized new voice memory for {voice_name}")
def get_voice_context(voice_name: str, device: torch.device, max_segments: int = 2) -> List[Segment]:
    """Return generator context segments for ``voice_name``.

    The registry is initialized on first use.  A voice that is not
    registered is served from the "alloy" memory after a warning is logged.

    Args:
        voice_name: Name of the requested voice.
        device: Device the returned audio tensors are moved to.
        max_segments: Upper bound on the number of segments returned.

    Returns:
        The segments chosen by the memory's ``get_context_segments``.
    """
    # Lazy initialization: an empty registry means nothing was loaded yet.
    if not VOICE_MEMORIES:
        initialize_voices()
    memory = VOICE_MEMORIES.get(voice_name)
    if memory is None:
        # Default to alloy if voice not found
        logger.warning(f"Voice {voice_name} not found, defaulting to alloy")
        memory = VOICE_MEMORIES["alloy"]
    return memory.get_context_segments(device, max_segments=max_segments)
def update_voice_memory(voice_name: str, audio: torch.Tensor, text: str):
    """Update voice memory with newly generated audio and save to persistent storage.

    The registry is initialized on first use.  When ``voice_name`` is not
    registered nothing is stored and a warning is logged.

    Args:
        voice_name: Name of the voice whose memory is updated.
        audio: Newly generated audio for that voice.
        text: Text that was spoken in ``audio``.
    """
    if not VOICE_MEMORIES:
        initialize_voices()
    memory = VOICE_MEMORIES.get(voice_name)
    if memory is None:
        # Surface the dropped update instead of ignoring it silently; the
        # sample is deliberately not attributed to a different voice.
        logger.warning(f"Voice {voice_name} not found, voice memory not updated")
        return
    memory.update_with_new_audio(audio, text)
    memory.save()
    logger.info(f"Updated voice memory for {voice_name}, now has {len(memory.audio_segments)} segments")
def _load_reference_wav(path: str, sample_rate: int) -> torch.Tensor:
    """Load a saved WAV, squeeze its leading dimension and resample to ``sample_rate`` if needed."""
    audio_tensor, sr = torchaudio.load(path)
    audio_tensor = audio_tensor.squeeze(0)
    if sr != sample_rate:
        audio_tensor = torchaudio.functional.resample(audio_tensor, orig_freq=sr, new_freq=sample_rate)
    return audio_tensor


def _collect_intro_samples(generator, voice_name: str, speaker_id: int, sample_texts: List[str]):
    """Load or generate one audio clip per intro text; return (audio_segments, text_segments)."""
    audio_segments = []
    text_segments = []
    for i, sample_text in enumerate(sample_texts):
        try:
            # Check if we already have a sample
            sample_path = os.path.join(VOICE_MEMORIES_DIR, f"{voice_name}_sample_{i}.wav")
            if os.path.exists(sample_path):
                logger.info(f"Found existing sample {i+1} for {voice_name}, loading from {sample_path}")
                audio_segments.append(_load_reference_wav(sample_path, generator.sample_rate))
                text_segments.append(sample_text)
                continue
            # Generate without context first for seed samples
            logger.info(f"Generating sample {i+1}/{len(sample_texts)} for {voice_name}: '{sample_text}'")
            audio = generator.generate(
                text=sample_text,
                speaker=speaker_id,
                context=[],  # No context for initial samples
                max_audio_length_ms=10000,
                temperature=0.7,  # Lower temperature for more stable output
                topk=30,
            )
            # Keep the segment before writing the WAV so that a failed write
            # does not discard audio that was generated successfully.
            audio_segments.append(audio.detach().cpu())
            text_segments.append(sample_text)
            # Save as WAV for reference to persistent storage
            torchaudio.save(sample_path, audio.unsqueeze(0).cpu(), generator.sample_rate)
            logger.info(f"Generated sample {i+1} for {voice_name}, length: {audio.shape[0]/generator.sample_rate:.2f}s")
        except Exception as e:
            # One failing sample must not prevent the remaining ones.
            logger.error(f"Error generating sample {i+1} for {voice_name}: {e}")
    return audio_segments, text_segments


def _add_character_sample(generator, memory, voice_name: str, speaker_id: int) -> None:
    """Load or generate the longer "character" sample and append it to ``memory``."""
    # The same transcript is used whether the audio is generated now or
    # reloaded from disk, so the stored text always matches what is spoken
    # in the stored audio.
    character_sample_text = (
        f"I'm the voice assistant known as {voice_name}. "
        "I'm designed to have a distinctive voice that you can easily recognize. "
        "My speech patterns and tone should remain consistent throughout our conversation."
    )
    character_path = os.path.join(VOICE_MEMORIES_DIR, f"{voice_name}_character.wav")
    try:
        reuse_existing = os.path.exists(character_path)
        if reuse_existing:
            logger.info(f"Found existing character sample for {voice_name}, loading from {character_path}")
            character_audio = _load_reference_wav(character_path, generator.sample_rate)
        else:
            # Condition the generation on the first intro sample so the
            # longer clip follows the voice established there.
            context = [
                Segment(
                    speaker=speaker_id,
                    text=memory.text_segments[0],
                    audio=memory.audio_segments[0].to(generator.device),
                )
            ]
            logger.info(f"Generating character sample for {voice_name} with context")
            generated = generator.generate(
                text=character_sample_text,
                speaker=speaker_id,
                context=context,
                max_audio_length_ms=15000,
                temperature=0.7,
                topk=30,
            )
            # Save this comprehensive character sample to persistent storage
            torchaudio.save(character_path, generated.unsqueeze(0).cpu(), generator.sample_rate)
            character_audio = generated.detach().cpu()
        # Add this to the memory as well
        memory.audio_segments.append(character_audio)
        memory.text_segments.append(character_sample_text)
        memory.save()
        if not reuse_existing:
            logger.info(f"Generated character sample for {voice_name}, length: {character_audio.shape[0]/generator.sample_rate:.2f}s")
    except Exception as e:
        logger.error(f"Error generating character sample for {voice_name}: {e}")


def generate_voice_samples(app_state):
    """Generate high-quality voice samples for each voice.

    For every voice in ``VOICE_INTROS`` (position = speaker ID) the intro
    texts are turned into audio, reusing WAV files already present in
    ``VOICE_MEMORIES_DIR``.  The registered memory of the voice is then
    replaced with these samples and, when at least two are available,
    extended with a longer "character" sample generated with the first
    intro sample as context.  Errors for individual samples are logged and
    do not stop the remaining work.

    Args:
        app_state: The FastAPI app state containing the generator
    """
    generator = app_state.generator
    if not generator:
        logger.error("Cannot generate voice samples: generator not available")
        return
    logger.info("Beginning voice sample generation...")
    # Ensure persistent directory exists
    os.makedirs(VOICE_MEMORIES_DIR, exist_ok=True)
    # Make sure there are memories to update, as the other entry points do;
    # otherwise the generated samples would have nowhere to be stored.
    if not VOICE_MEMORIES:
        initialize_voices()
    for speaker_id, (voice_name, sample_texts) in enumerate(VOICE_INTROS.items()):
        # Generate a collection of samples for this voice
        logger.info(f"Generating samples for voice: {voice_name}")
        audio_segments, text_segments = _collect_intro_samples(generator, voice_name, speaker_id, sample_texts)
        memory = VOICE_MEMORIES.get(voice_name)
        if memory is None or not audio_segments:
            continue
        # Replace existing samples with these high quality ones
        memory.audio_segments = audio_segments
        memory.text_segments = text_segments
        memory.save()
        logger.info(f"Updated voice memory for {voice_name} with {len(audio_segments)} high-quality samples")
        # Now generate a second pass with context from these samples
        if len(audio_segments) >= 2:
            _add_character_sample(generator, memory, voice_name, speaker_id)
def create_custom_voice(
    app_state,
    name: str,
    initial_text: str,
    speaker_id: int = 0,
    pitch: Optional[float] = None,
    timbre: str = "custom"
) -> Dict:
    """Create a new custom voice.

    A sample is generated for ``initial_text`` without context, wrapped in a
    new ``VoiceMemory``, saved, and registered in ``VOICE_MEMORIES``.  A
    reference WAV of the sample is also written on a best-effort basis.

    Args:
        app_state: The FastAPI app state containing the generator
        name: Name for the new voice; it is used as a file name, so it must
            be a plain name without directory components
        initial_text: Text for the initial voice sample
        speaker_id: Base speaker ID (0-5)
        pitch: Base pitch in Hz (optional); defaults to the pitch of the
            built-in voice at position ``speaker_id``, or to the last
            built-in voice when ``speaker_id`` is outside that range
        timbre: Voice quality descriptor

    Returns:
        Dict with creation status and voice info
    """
    generator = app_state.generator
    if not generator:
        return {"status": "error", "message": "Generator not available"}
    # The name is interpolated into paths below VOICE_MEMORIES_DIR, so refuse
    # anything that could point outside of that directory.
    if (
        not isinstance(name, str)
        or not name
        or name in (".", "..")
        or os.path.basename(name) != name
    ):
        return {"status": "error", "message": f"Invalid voice name '{name}'"}
    # Check if voice already exists
    if not VOICE_MEMORIES:
        initialize_voices()
    if name in VOICE_MEMORIES:
        return {"status": "error", "message": f"Voice '{name}' already exists"}
    # Generate a voice sample
    try:
        logger.info(f"Creating custom voice '{name}' with text: '{initial_text}'")
        audio = generator.generate(
            text=initial_text,
            speaker=speaker_id,
            context=[],
            max_audio_length_ms=10000,
            temperature=0.7,
        )
        # Determine base pitch if not provided: reuse the pitch of the
        # built-in voice whose position matches the speaker ID.
        if pitch is None:
            builtin_pitches = {
                index: traits["pitch"]
                for index, traits in enumerate(VOICE_CHARACTERISTICS.values())
            }
            fallback_pitch = builtin_pitches[len(builtin_pitches) - 1]
            pitch = builtin_pitches.get(speaker_id, fallback_pitch)
        # Create a new voice memory
        memory = VoiceMemory(
            name=name,
            speaker_id=speaker_id,
            audio_segments=[audio.detach().cpu()],
            text_segments=[initial_text],
            pitch_base=pitch,
            timbre=timbre,
        )
        # Save the voice memory to persistent storage
        memory.save()
        VOICE_MEMORIES[name] = memory
        # Save sample as WAV for reference to persistent storage.  The voice
        # is already registered at this point, so a failed reference write is
        # only logged; reporting an error here would contradict the state.
        sample_path = os.path.join(VOICE_MEMORIES_DIR, f"{name}_sample.wav")
        try:
            torchaudio.save(sample_path, audio.unsqueeze(0).cpu(), generator.sample_rate)
        except Exception as e:
            logger.warning(f"Could not write sample WAV for '{name}' to {sample_path}: {e}")
        logger.info(f"Created custom voice '{name}' successfully")
        return {
            "status": "success",
            "message": f"Voice '{name}' created successfully",
            "voice": {
                "name": name,
                "speaker_id": speaker_id,
                "pitch": pitch,
                "timbre": timbre,
                "sample_length_seconds": audio.shape[0] / generator.sample_rate,
            },
        }
    except Exception as e:
        logger.error(f"Error creating custom voice '{name}': {e}")
        return {
            "status": "error",
            "message": f"Error creating voice: {str(e)}",
        }