import gradio as gr import torch import numpy as np import librosa import soundfile as sf from transformers import pipeline, AutoTokenizer, AutoModelForCausalLM import warnings import json import time from datetime import datetime import os import sys import gc # Import with enhanced error handling try: from dia.model import Dia DIA_AVAILABLE = True print("✅ Dia TTS library imported successfully") except ImportError as e: print(f"⚠️ Dia TTS not available: {e}") DIA_AVAILABLE = False warnings.filterwarnings("ignore") # Global models asr_pipe = None qwen_model = None qwen_tokenizer = None tts_model = None tts_type = None class ConversationManager: def __init__(self, max_exchanges=5): self.history = [] self.max_exchanges = max_exchanges self.current_emotion = "neutral" def add_exchange(self, user_input, ai_response, emotion="neutral"): self.history.append({ "timestamp": datetime.now().isoformat(), "user": user_input, "ai": ai_response, "emotion": emotion }) if len(self.history) > self.max_exchanges: self.history = self.history[-self.max_exchanges:] def get_context(self): context = "" for exchange in self.history[-3:]: context += f"User: {exchange['user']}\nAI: {exchange['ai']}\n" return context def clear(self): self.history = [] self.current_emotion = "neutral" def optimize_gpu_memory(): """Optimize GPU memory usage""" if torch.cuda.is_available(): torch.cuda.empty_cache() torch.cuda.synchronize() gc.collect() def check_system_info(): """Check system capabilities""" print("🔍 System Information:") print(f"Python: {sys.version}") print(f"PyTorch: {torch.__version__}") if torch.cuda.is_available(): print(f"✅ CUDA: {torch.cuda.get_device_name()}") print(f"💾 GPU Memory: {torch.cuda.get_device_properties(0).total_memory / 1e9:.1f} GB") print(f"🔥 CUDA Version: {torch.version.cuda}") # Check current memory usage allocated = torch.cuda.memory_allocated() / 1e9 cached = torch.cuda.memory_reserved() / 1e9 print(f"📊 Current GPU Usage: {allocated:.1f}GB allocated, {cached:.1f}GB cached") else: print("⚠️ CUDA not available, using CPU") def load_models(): """Load all models with FIXED Dia loading""" global asr_pipe, qwen_model, qwen_tokenizer, tts_model, tts_type print("🚀 Loading Maya AI models...") optimize_gpu_memory() # Load ASR model (Whisper) print("🎤 Loading Whisper for ASR...") try: asr_pipe = pipeline( "automatic-speech-recognition", model="openai/whisper-base", torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32, device=0 if torch.cuda.is_available() else -1 ) print("✅ Whisper ASR loaded successfully!") optimize_gpu_memory() except Exception as e: print(f"❌ Error loading Whisper: {e}") return False # Load Qwen model print("🧠 Loading Qwen2.5-1.5B for conversation...") try: model_name = "Qwen/Qwen2.5-1.5B-Instruct" qwen_tokenizer = AutoTokenizer.from_pretrained( model_name, trust_remote_code=True ) qwen_model = AutoModelForCausalLM.from_pretrained( model_name, torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32, device_map="auto" if torch.cuda.is_available() else None, trust_remote_code=True ) print("✅ Qwen loaded successfully!") optimize_gpu_memory() except Exception as e: print(f"❌ Error loading Qwen: {e}") return False # FIXED: Load Dia TTS without unsupported parameters if DIA_AVAILABLE: try: print("Attempting to load Dia TTS with FIXED parameters...") # Clear memory before loading Dia optimize_gpu_memory() # FIXED: Remove unsupported parameters tts_model = Dia.from_pretrained( "nari-labs/Dia-1.6B", compute_dtype="float16" if torch.cuda.is_available() else "float32" # Removed: low_cpu_mem_usage=True (not supported by Dia) ) # Move to GPU if available if torch.cuda.is_available(): tts_model = tts_model.cuda() tts_type = "dia" print("✅ Dia TTS loaded successfully!") optimize_gpu_memory() return True except Exception as e: print(f"⚠️ Dia TTS failed to load: {e}") tts_model = None print("⚠️ No TTS available, running in text-only mode") tts_type = "none" return True def detect_emotion_from_text(text): """Enhanced emotion detection from text""" text_lower = text.lower() emotions = { 'happy': ['happy', 'great', 'awesome', 'wonderful', 'excited', 'laugh', 'amazing', 'fantastic', 'excellent', 'brilliant', 'perfect', 'love', 'joy', 'cheerful'], 'sad': ['sad', 'upset', 'disappointed', 'cry', 'terrible', 'awful', 'depressed', 'miserable', 'heartbroken', 'devastated', 'gloomy', 'melancholy'], 'angry': ['angry', 'mad', 'furious', 'annoyed', 'frustrated', 'hate', 'rage', 'irritated', 'outraged', 'livid', 'enraged'], 'surprised': ['wow', 'incredible', 'surprised', 'unbelievable', 'shocking', 'astonishing', 'remarkable', 'extraordinary', 'mind-blowing'], 'neutral': [] } emotion_scores = {} for emotion, keywords in emotions.items(): score = sum(1 for keyword in keywords if keyword in text_lower) if score > 0: emotion_scores[emotion] = score if emotion_scores: return max(emotion_scores, key=emotion_scores.get) return 'neutral' def speech_to_text_with_emotion(audio_input): """Enhanced STT with proper audio processing""" try: if audio_input is None: return "", "neutral" print("🎤 Processing audio input...") if isinstance(audio_input, tuple): sample_rate, audio_data = audio_input print(f"Audio input: sample_rate={sample_rate}, shape={audio_data.shape}") # Handle different audio formats if audio_data.dtype == np.int16: audio_data = audio_data.astype(np.float32) / 32768.0 elif audio_data.dtype == np.int32: audio_data = audio_data.astype(np.float32) / 2147483648.0 elif audio_data.dtype != np.float32: audio_data = audio_data.astype(np.float32) # Handle stereo audio if len(audio_data.shape) > 1: audio_data = audio_data.mean(axis=1) else: audio_data = audio_input sample_rate = 16000 # Validate audio if len(audio_data) < 1600: return "Audio too short, please speak for at least 1 second", "neutral" max_amplitude = np.max(np.abs(audio_data)) if max_amplitude < 0.01: return "Audio too quiet, please speak louder", "neutral" # Normalize audio if max_amplitude > 0: audio_data = audio_data / max_amplitude * 0.95 # Resample to 16kHz if needed if sample_rate != 16000: print(f"Resampling from {sample_rate}Hz to 16000Hz...") audio_data = librosa.resample(audio_data, orig_sr=sample_rate, target_sr=16000) print("🔄 Running Whisper ASR...") result = asr_pipe(audio_data, language='en') # Force English transcription = result['text'].strip() print(f"Transcription: '{transcription}'") if not transcription or len(transcription) < 2: return "No clear speech detected, please try speaking more clearly", "neutral" emotion = detect_emotion_from_text(transcription) print(f"Detected emotion: {emotion}") return transcription, emotion except Exception as e: print(f"❌ Error in STT: {e}") return "Sorry, I couldn't understand that. Please try again.", "neutral" def generate_contextual_response(user_input, emotion, conversation_manager): """Enhanced response generation with memory optimization""" try: optimize_gpu_memory() context = conversation_manager.get_context() emotional_prompts = { "happy": "Respond with genuine enthusiasm and joy. Use positive language and show excitement.", "sad": "Respond with empathy and comfort. Be gentle and understanding.", "angry": "Respond calmly and try to help. Be patient and de-escalate.", "surprised": "Share in their surprise and show curiosity. Be engaging.", "neutral": "Respond naturally and conversationally. Be helpful and friendly." } system_prompt = f"""You are Maya, a friendly AI assistant with emotional intelligence. {emotional_prompts.get(emotion, emotional_prompts['neutral'])} Previous context: {context} User emotion: {emotion} Guidelines: - Keep responses very concise (1 sentence maximum) - Be natural and conversational - Show empathy and understanding - Provide helpful responses """ messages = [ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_input} ] text = qwen_tokenizer.apply_chat_template( messages, tokenize=False, add_generation_prompt=True ) model_inputs = qwen_tokenizer([text], return_tensors="pt") if torch.cuda.is_available(): model_inputs = model_inputs.to(qwen_model.device) with torch.no_grad(): generated_ids = qwen_model.generate( model_inputs.input_ids, max_new_tokens=50, do_sample=True, temperature=0.7, top_p=0.9, repetition_penalty=1.1, pad_token_id=qwen_tokenizer.eos_token_id, attention_mask=model_inputs.attention_mask ) generated_ids = [ output_ids[len(input_ids):] for input_ids, output_ids in zip(model_inputs.input_ids, generated_ids) ] response = qwen_tokenizer.batch_decode(generated_ids, skip_special_tokens=True)[0] response = response.strip() if response.startswith("Maya:"): response = response[5:].strip() optimize_gpu_memory() return response except Exception as e: print(f"Error in response generation: {e}") return "I'm sorry, I'm having trouble processing that right now." def text_to_speech_emotional(text, emotion="neutral"): """FIXED TTS with proper Dia configuration""" try: if tts_model is None: print(f"🔊 Maya says ({emotion}): {text}") return None optimize_gpu_memory() if tts_type == "dia": # Simplified text processing for Dia enhanced_text = f"[S1] {text}" # Limit text length to prevent memory issues if len(enhanced_text) > 200: enhanced_text = enhanced_text[:200] + "..." print(f"Generating Dia TTS for: {enhanced_text}") try: with torch.no_grad(): audio_output = tts_model.generate( enhanced_text, use_torch_compile=False, verbose=False ) # Enhanced audio processing if isinstance(audio_output, torch.Tensor): audio_output = audio_output.cpu().numpy() # Ensure proper audio format if len(audio_output.shape) > 1: audio_output = audio_output.squeeze() # Conservative normalization if len(audio_output) > 0: # Remove DC offset audio_output = audio_output - np.mean(audio_output) # Gentle normalization max_val = np.max(np.abs(audio_output)) if max_val > 0: audio_output = audio_output / max_val * 0.8 # Ensure correct data type audio_output = audio_output.astype(np.float32) # Validate audio output if np.any(np.isnan(audio_output)) or np.any(np.isinf(audio_output)): print("❌ Audio contains NaN or Inf values") return None print(f"✅ Generated audio: shape={audio_output.shape}, dtype={audio_output.dtype}, range=[{audio_output.min():.3f}, {audio_output.max():.3f}]") optimize_gpu_memory() return (44100, audio_output) except Exception as e: print(f"❌ Error in Dia generation: {e}") optimize_gpu_memory() return None else: print(f"🔊 Maya says ({emotion}): {text}") return None except Exception as e: print(f"❌ Error in TTS: {e}") optimize_gpu_memory() print(f"🔊 Maya says ({emotion}): {text}") return None # Initialize conversation manager conv_manager = ConversationManager() def start_call(): """Initialize call and return greeting""" conv_manager.clear() optimize_gpu_memory() greeting_text = "Hello! I'm Maya. How can I help you today?" greeting_audio = text_to_speech_emotional(greeting_text, "happy") tts_status = f"Using {tts_type.upper()} TTS" if tts_type != "none" else "Text-only mode" return greeting_audio, greeting_text, f"📞 Call started! Maya is ready. {tts_status}" def process_conversation(audio_input): """Main conversation processing pipeline""" if audio_input is None: return None, "Please record some audio first.", "", "❌ No audio input received." try: print("🔄 Processing conversation...") optimize_gpu_memory() # STT + Emotion Detection user_text, emotion = speech_to_text_with_emotion(audio_input) # Check for STT errors error_phrases = ["audio too short", "audio too quiet", "no clear speech", "sorry", "couldn't understand"] if any(phrase in user_text.lower() for phrase in error_phrases): return None, user_text, "", f"❌ STT Issue: {user_text}" if not user_text or user_text.strip() == "": return None, "I didn't catch that. Please speak louder and closer to the microphone.", "", "❌ No speech detected." # Generate response ai_response = generate_contextual_response(user_text, emotion, conv_manager) # Convert to speech response_audio = text_to_speech_emotional(ai_response, emotion) # Update history conv_manager.add_exchange(user_text, ai_response, emotion) # Memory status if torch.cuda.is_available(): allocated = torch.cuda.memory_allocated() / 1e9 status = f"✅ Success! | Emotion: {emotion} | Exchange: {len(conv_manager.history)}/5 | GPU: {allocated:.1f}GB" else: status = f"✅ Success! | Emotion: {emotion} | Exchange: {len(conv_manager.history)}/5" return response_audio, ai_response, user_text, status except Exception as e: error_msg = f"❌ Error: {str(e)}" print(error_msg) optimize_gpu_memory() return None, "I'm sorry, I encountered an error. Please try again.", "", error_msg def get_conversation_history(): """Return conversation history""" if not conv_manager.history: return "No conversation history yet. Start a call to begin!" history_text = "📋 **Conversation History:**\n\n" for i, exchange in enumerate(conv_manager.history, 1): timestamp = exchange['timestamp'][:19].replace('T', ' ') history_text += f"**Exchange {i}** ({timestamp}) - Emotion: {exchange['emotion']}\n" history_text += f"👤 **You:** {exchange['user']}\n" history_text += f"🤖 **Maya:** {exchange['ai']}\n\n" return history_text def end_call(): """End call with memory cleanup""" farewell_text = "Thank you for talking with me! Have a wonderful day!" farewell_audio = text_to_speech_emotional(farewell_text, "happy") conv_manager.clear() optimize_gpu_memory() return farewell_audio, farewell_text, "📞❌ Call ended. Thank you!" def create_interface(): """Create Gradio interface""" with gr.Blocks( title="Maya AI - Speech-to-Speech Assistant", theme=gr.themes.Soft() ) as demo: gr.HTML("""
Advanced Speech-to-Speech Conversational AI
Natural • Emotional • Contextual • Intelligent
🔧 Fixed Issues: