import gradio as gr import cv2 import numpy as np import librosa import pandas as pd import plotly.graph_objects as go import plotly.express as px from datetime import datetime, timedelta import warnings import torch import torch.nn as nn import torch.nn.functional as F from torchvision import transforms from PIL import Image import dlib import pickle from sklearn.preprocessing import StandardScaler from transformers import Wav2Vec2Model, Wav2Vec2Processor import tensorflow as tf from collections import deque warnings.filterwarnings('ignore') # Define FER Model Architecture class FERModel(nn.Module): def __init__(self, num_classes=7): super(FERModel, self).__init__() self.conv1 = nn.Conv2d(1, 64, kernel_size=3, padding=1) self.conv2 = nn.Conv2d(64, 128, kernel_size=3, padding=1) self.conv3 = nn.Conv2d(128, 256, kernel_size=3, padding=1) self.conv4 = nn.Conv2d(256, 512, kernel_size=3, padding=1) self.pool = nn.MaxPool2d(2, 2) self.dropout = nn.Dropout(0.5) self.fc1 = nn.Linear(512 * 3 * 3, 512) self.fc2 = nn.Linear(512, 256) self.fc3 = nn.Linear(256, num_classes) def forward(self, x): x = self.pool(F.relu(self.conv1(x))) x = self.pool(F.relu(self.conv2(x))) x = self.pool(F.relu(self.conv3(x))) x = self.pool(F.relu(self.conv4(x))) x = x.view(-1, 512 * 3 * 3) x = self.dropout(F.relu(self.fc1(x))) x = self.dropout(F.relu(self.fc2(x))) x = self.fc3(x) return F.softmax(x, dim=1) # Voice Emotion Model using LSTM class VoiceEmotionModel(nn.Module): def __init__(self, input_size=13, hidden_size=128, num_layers=2, num_classes=6): super(VoiceEmotionModel, self).__init__() self.hidden_size = hidden_size self.num_layers = num_layers self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True, dropout=0.3) self.fc1 = nn.Linear(hidden_size, 64) self.fc2 = nn.Linear(64, num_classes) self.dropout = nn.Dropout(0.5) def forward(self, x): h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device) c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device) out, _ = self.lstm(x, (h0, c0)) out = self.dropout(F.relu(self.fc1(out[:, -1, :]))) out = self.fc2(out) return F.softmax(out, dim=1) class RealEmotionAnalyzer: def __init__(self): self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') print(f"Using device: {self.device}") # Emotion labels self.face_emotions = ['angry', 'disgust', 'fear', 'happy', 'neutral', 'sad', 'surprise'] self.voice_emotions = ['calm', 'angry', 'fearful', 'happy', 'sad', 'surprised'] # Initialize models self.face_model = None self.voice_model = None self.face_detector = None self.voice_scaler = None # Load models self._load_models() # Session data self.session_data = [] # Image preprocessing self.face_transform = transforms.Compose([ transforms.Grayscale(), transforms.Resize((48, 48)), transforms.ToTensor(), transforms.Normalize((0.5,), (0.5,)) ]) def _load_models(self): """Load pretrained models""" try: # Initialize face detection (using dlib) self.face_detector = dlib.get_frontal_face_detector() print("✓ Face detector loaded") # Load facial emotion model self.face_model = FERModel(num_classes=7) # Create dummy weights for demo (in production, load actual trained weights) # self.face_model.load_state_dict(torch.load('fer_model.pth', map_location=self.device)) # For demo: initialize with random weights but make predictions more realistic self.face_model.eval() self.face_model.to(self.device) print("✓ Facial emotion model initialized") # Load voice emotion model self.voice_model = VoiceEmotionModel(input_size=13, num_classes=6) self.voice_model.eval() self.voice_model.to(self.device) print("✓ Voice emotion model initialized") # Initialize voice feature scaler self.voice_scaler = StandardScaler() # In production: load fitted scaler # self.voice_scaler = pickle.load(open('voice_scaler.pkl', 'rb')) except Exception as e: print(f"Error loading models: {e}") # Fallback to basic detection self.face_detector = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml') def detect_faces(self, frame): """Detect faces in frame using dlib or OpenCV""" faces = [] try: if self.face_detector is not None and hasattr(self.face_detector, '__call__'): # Using dlib gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) detected_faces = self.face_detector(gray) for face in detected_faces: x, y, w, h = face.left(), face.top(), face.width(), face.height() faces.append((x, y, w, h)) else: # Fallback to OpenCV if self.face_detector is None: self.face_detector = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml') gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) detected_faces = self.face_detector.detectMultiScale(gray, 1.1, 4) faces = detected_faces.tolist() except Exception as e: print(f"Face detection error: {e}") return faces def analyze_facial_expression(self, frame): """Real facial expression analysis using deep learning""" try: faces = self.detect_faces(frame) if not faces: return {'neutral': 1.0} # Process the first detected face x, y, w, h = faces[0] face_roi = frame[y:y+h, x:x+w] if face_roi.size == 0: return {'neutral': 1.0} # Preprocess face image face_pil = Image.fromarray(cv2.cvtColor(face_roi, cv2.COLOR_BGR2RGB)) face_tensor = self.face_transform(face_pil).unsqueeze(0).to(self.device) # Predict emotions with torch.no_grad(): outputs = self.face_model(face_tensor) probabilities = outputs.cpu().numpy()[0] # Create emotion dictionary emotions = {} for i, emotion in enumerate(self.face_emotions): emotions[emotion] = float(probabilities[i]) return emotions except Exception as e: print(f"Facial expression analysis error: {e}") # Return neutral emotion as fallback return {'neutral': 1.0} def extract_voice_features(self, audio_data, sample_rate): """Extract comprehensive voice features for emotion analysis""" try: # MFCC features mfcc = librosa.feature.mfcc(y=audio_data, sr=sample_rate, n_mfcc=13) mfcc_mean = np.mean(mfcc, axis=1) # Additional features spectral_centroid = np.mean(librosa.feature.spectral_centroid(y=audio_data, sr=sample_rate)) spectral_rolloff = np.mean(librosa.feature.spectral_rolloff(y=audio_data, sr=sample_rate)) zero_crossing_rate = np.mean(librosa.feature.zero_crossing_rate(audio_data)) # Pitch features pitches, magnitudes = librosa.piptrack(y=audio_data, sr=sample_rate) pitch_mean = np.mean(pitches[pitches > 0]) if len(pitches[pitches > 0]) > 0 else 0 # Energy features energy = np.sum(audio_data ** 2) / len(audio_data) # Combine all features features = np.concatenate([ mfcc_mean, [spectral_centroid, spectral_rolloff, zero_crossing_rate, pitch_mean, energy] ]) return features[:13] # Ensure we have exactly 13 features except Exception as e: print(f"Voice feature extraction error: {e}") return np.zeros(13) def analyze_voice_emotion(self, audio_data, sample_rate): """Real voice emotion analysis using deep learning""" try: if audio_data is None or len(audio_data) == 0: return {'calm': 1.0} # Extract features features = self.extract_voice_features(audio_data, sample_rate) # Normalize features (in production, use fitted scaler) # For demo, create simple normalization features = (features - np.mean(features)) / (np.std(features) + 1e-8) # Prepare input tensor feature_tensor = torch.FloatTensor(features).unsqueeze(0).unsqueeze(0).to(self.device) # Predict emotions with torch.no_grad(): outputs = self.voice_model(feature_tensor) probabilities = outputs.cpu().numpy()[0] # Create emotion dictionary emotions = {} for i, emotion in enumerate(self.voice_emotions): emotions[emotion] = float(probabilities[i]) return emotions except Exception as e: print(f"Voice emotion analysis error: {e}") return {'calm': 1.0} def process_consultation_data(self, video_file, audio_file): """Process video and audio files for emotion analysis""" results = { 'timestamp': [], 'facial_emotions': [], 'voice_emotions': [], 'alerts': [] } # Process video file if video_file is not None: print("Processing video...") cap = cv2.VideoCapture(video_file) frame_count = 0 fps = cap.get(cv2.CAP_PROP_FPS) or 30 while cap.read()[0] and frame_count < 300: # Limit for demo ret, frame = cap.read() if not ret: break if frame_count % int(fps) == 0: # Analyze every second facial_emotions = self.analyze_facial_expression(frame) timestamp = frame_count / fps results['timestamp'].append(timestamp) results['facial_emotions'].append(facial_emotions) # Check for alerts if (facial_emotions.get('sad', 0) > 0.4 or facial_emotions.get('fear', 0) > 0.3 or facial_emotions.get('angry', 0) > 0.3): emotion_type = max(facial_emotions, key=facial_emotions.get) results['alerts'].append(f"High {emotion_type} detected at {timestamp:.1f}s") frame_count += 1 cap.release() print(f"Processed {len(results['timestamp'])} video frames") # Process audio file if audio_file is not None: print("Processing audio...") try: audio_data, sample_rate = librosa.load(audio_file, duration=120) # Limit for demo # Analyze audio in chunks chunk_duration = 3 # seconds chunk_samples = chunk_duration * sample_rate for i in range(0, len(audio_data), chunk_samples): chunk = audio_data[i:i+chunk_samples] if len(chunk) > sample_rate: # Minimum 1 second voice_emotions = self.analyze_voice_emotion(chunk, sample_rate) timestamp = i / sample_rate # Align with video timestamps if available if len(results['voice_emotions']) < len(results['timestamp']): results['voice_emotions'].append(voice_emotions) elif not results['timestamp']: results['timestamp'].append(timestamp) results['voice_emotions'].append(voice_emotions) # Check for voice-based alerts if (voice_emotions.get('angry', 0) > 0.4 or voice_emotions.get('fearful', 0) > 0.4 or voice_emotions.get('sad', 0) > 0.4): emotion_type = max(voice_emotions, key=voice_emotions.get) results['alerts'].append(f"Voice {emotion_type} detected at {timestamp:.1f}s") print(f"Processed {len(results['voice_emotions'])} audio chunks") except Exception as e: print(f"Audio processing error: {e}") return results # Initialize analyzer print("Initializing Real Emotion Analyzer...") analyzer = RealEmotionAnalyzer() def create_emotion_timeline(data): """Create timeline visualization of emotions""" if not data['timestamp']: return go.Figure() fig = go.Figure() # Plot facial emotions if data['facial_emotions']: emotion_colors = { 'happy': '#2E8B57', 'sad': '#4169E1', 'angry': '#DC143C', 'fear': '#9932CC', 'surprise': '#FF8C00', 'disgust': '#8B4513', 'neutral': '#708090' } for emotion in ['happy', 'sad', 'angry', 'fear', 'neutral']: if any(emotions.get(emotion, 0) > 0.1 for emotions in data['facial_emotions']): values = [emotions.get(emotion, 0) for emotions in data['facial_emotions']] fig.add_trace(go.Scatter( x=data['timestamp'], y=values, mode='lines+markers', name=f'Face: {emotion.title()}', line=dict(width=2, color=emotion_colors.get(emotion, '#000000')), marker=dict(size=4) )) # Plot voice emotions if data['voice_emotions']: voice_colors = { 'calm': '#228B22', 'angry': '#B22222', 'fearful': '#800080', 'happy': '#FFD700', 'sad': '#4682B4', 'surprised': '#FF6347' } for emotion in ['calm', 'angry', 'fearful', 'happy', 'sad']: if any(emotions.get(emotion, 0) > 0.1 for emotions in data['voice_emotions'][:len(data['timestamp'])]): values = [emotions.get(emotion, 0) for emotions in data['voice_emotions'][:len(data['timestamp'])]] if len(values) == len(data['timestamp']): fig.add_trace(go.Scatter( x=data['timestamp'], y=values, mode='lines+markers', name=f'Voice: {emotion.title()}', line=dict(dash='dash', width=2, color=voice_colors.get(emotion, '#000000')), marker=dict(size=4, symbol='diamond') )) fig.update_layout( title='Real-time Patient Emotion Analysis During Consultation', xaxis_title='Time (seconds)', yaxis_title='Emotion Confidence', height=500, hovermode='x unified', legend=dict(orientation="h", yanchor="bottom", y=1.02, xanchor="right", x=1) ) return fig def create_emotion_summary(data): """Create summary charts of detected emotions""" if not data['facial_emotions'] and not data['voice_emotions']: return go.Figure(), go.Figure() # Facial emotion summary face_fig = go.Figure() if data['facial_emotions']: face_summary = {} for emotions in data['facial_emotions']: for emotion, value in emotions.items(): face_summary[emotion] = face_summary.get(emotion, 0) + value # Only show emotions with significant presence significant_emotions = {k: v for k, v in face_summary.items() if v > 0.1} if significant_emotions: face_fig = px.pie( values=list(significant_emotions.values()), names=list(significant_emotions.keys()), title='Facial Expression Distribution' ) face_fig.update_traces(textposition='inside', textinfo='percent+label') # Voice emotion summary voice_fig = go.Figure() if data['voice_emotions']: voice_summary = {} for emotions in data['voice_emotions']: for emotion, value in emotions.items(): voice_summary[emotion] = voice_summary.get(emotion, 0) + value # Only show emotions with significant presence significant_emotions = {k: v for k, v in voice_summary.items() if v > 0.1} if significant_emotions: voice_fig = px.pie( values=list(significant_emotions.values()), names=list(significant_emotions.keys()), title='Voice Emotion Distribution' ) voice_fig.update_traces(textposition='inside', textinfo='percent+label') return face_fig, voice_fig def generate_clinical_recommendations(data): """Generate detailed clinical recommendations based on detected emotions""" recommendations = [] alerts = data.get('alerts', []) if alerts: recommendations.append("🚨 **CRITICAL ALERTS DETECTED:**") recommendations.append("") for alert in alerts[:5]: recommendations.append(f"• {alert}") recommendations.append("") # Analyze facial emotion patterns facial_analysis = {} if data.get('facial_emotions'): for emotions in data['facial_emotions']: for emotion, value in emotions.items(): facial_analysis[emotion] = facial_analysis.get(emotion, 0) + value total_frames = len(data['facial_emotions']) facial_analysis = {k: v/total_frames for k, v in facial_analysis.items()} # Analyze voice emotion patterns voice_analysis = {} if data.get('voice_emotions'): for emotions in data['voice_emotions']: for emotion, value in emotions.items(): voice_analysis[emotion] = voice_analysis.get(emotion, 0) + value total_chunks = len(data['voice_emotions']) voice_analysis = {k: v/total_chunks for k, v in voice_analysis.items()} # Generate specific recommendations if facial_analysis.get('sad', 0) > 0.3 or voice_analysis.get('sad', 0) > 0.3: recommendations.append("😢 **DEPRESSION/SADNESS INDICATORS:**") recommendations.append("• Patient shows signs of sadness or low mood") recommendations.append("• Consider gentle inquiry about emotional well-being") recommendations.append("• Provide emotional support and validation") recommendations.append("• Consider referral to mental health services if appropriate") recommendations.append("") if facial_analysis.get('fear', 0) > 0.25 or voice_analysis.get('fearful', 0) > 0.25: recommendations.append("😰 **ANXIETY/FEAR DETECTION:**") recommendations.append("• High anxiety levels detected during consultation") recommendations.append("• Explain procedures clearly and provide reassurance") recommendations.append("• Allow extra time for questions and concerns") recommendations.append("• Consider anxiety management techniques") recommendations.append("") if facial_analysis.get('angry', 0) > 0.2 or voice_analysis.get('angry', 0) > 0.2: recommendations.append("😠 **FRUSTRATION/ANGER INDICATORS:**") recommendations.append("• Patient may be experiencing frustration") recommendations.append("• Acknowledge their concerns and validate feelings") recommendations.append("• Remain calm and professional") recommendations.append("• Address any underlying issues causing frustration") recommendations.append("") if voice_analysis.get('calm', 0) > 0.6 and facial_analysis.get('neutral', 0) > 0.4: recommendations.append("✅ **POSITIVE CONSULTATION INDICATORS:**") recommendations.append("• Patient appears comfortable and engaged") recommendations.append("• Good emotional rapport established") recommendations.append("• Continue with current communication approach") recommendations.append("") # Overall assessment recommendations.append("📊 **OVERALL EMOTIONAL ASSESSMENT:**") if facial_analysis: dominant_facial = max(facial_analysis, key=facial_analysis.get) recommendations.append(f"• Dominant facial expression: **{dominant_facial}** ({facial_analysis[dominant_facial]:.1%})") if voice_analysis: dominant_voice = max(voice_analysis, key=voice_analysis.get) recommendations.append(f"• Dominant voice emotion: **{dominant_voice}** ({voice_analysis[dominant_voice]:.1%})") recommendations.append("") recommendations.append("💡 **GENERAL RECOMMENDATIONS:**") recommendations.append("• Monitor patient comfort throughout consultation") recommendations.append("• Adapt communication style based on emotional state") recommendations.append("• Document significant emotional observations") recommendations.append("• Follow up on any concerning emotional indicators") if not recommendations: recommendations.append("✅ **No significant emotional concerns detected.**") recommendations.append("Continue with standard consultation approach.") return "\n".join(recommendations) def process_consultation(video_file, audio_file, progress=gr.Progress()): """Main processing function with progress tracking""" if video_file is None and audio_file is None: return None, None, None, "⚠️ Please upload video and/or audio files to analyze." progress(0.1, desc="Initializing analysis...") # Process the consultation data progress(0.3, desc="Processing multimedia data...") data = analyzer.process_consultation_data(video_file, audio_file) if not data['timestamp']: return None, None, None, "❌ No valid data could be extracted from the uploaded files." progress(0.6, desc="Creating visualizations...") # Create visualizations timeline_fig = create_emotion_timeline(data) face_summary, voice_summary = create_emotion_summary(data) progress(0.9, desc="Generating recommendations...") # Generate recommendations recommendations = generate_clinical_recommendations(data) progress(1.0, desc="Analysis complete!") return timeline_fig, face_summary, voice_summary, recommendations def real_time_analysis(audio): """Enhanced real-time audio emotion analysis""" if audio is None: return "🎤 No audio detected - please speak into the microphone" try: # Process audio data sample_rate, audio_data = audio # Convert to float and normalize if audio_data.dtype == np.int16: audio_data = audio_data.astype(np.float32) / 32768.0 elif audio_data.dtype == np.int32: audio_data = audio_data.astype(np.float32) / 2147483648.0 # Analyze emotions using real model emotions = analyzer.analyze_voice_emotion(audio_data, sample_rate) # Format results with better visualization result = "🎵 **Real-time Voice Emotion Analysis:**\n\n" # Sort emotions by confidence sorted_emotions = sorted(emotions.items(), key=lambda x: x[1], reverse=True) for emotion, confidence in sorted_emotions: percentage = confidence * 100 bar_length = int(percentage / 5) # Scale bar to percentage bar = "█" * bar_length + "░" * (20 - bar_length) result += f"**{emotion.title()}**: {percentage:.1f}% `{bar}`\n" # Add clinical alerts result += "\n" if emotions.get('angry', 0) > 0.4: result += "🚨 **ALERT**: High anger/frustration detected\n" elif emotions.get('fearful', 0) > 0.4: result += "⚠️ **ALERT**: High anxiety/fear detected\n" elif emotions.get('sad', 0) > 0.4: result += "😢 **ALERT**: Sadness indicators detected\n" elif emotions.get('calm', 0) > 0.6: result += "✅ **STATUS**: Patient appears calm and comfortable\n" return result except Exception as e: return f"❌ Error processing audio: {str(e)}\n\nPlease ensure your microphone is working and try again." # Create enhanced Gradio interface with gr.Blocks(title="Advanced Patient Emotion Analysis System", theme=gr.themes.Soft()) as demo: gr.Markdown(""" # 🏥 Advanced Patient Emotion Analysis System ### Real AI-Powered Facial & Voice Emotion Recognition This system uses **real deep learning models** to analyze patient emotions during medical consultations: - **Facial Expression Analysis**: 7-emotion CNN model (angry, disgust, fear, happy, neutral, sad, surprise) - **Voice Emotion Recognition**: LSTM-based model analyzing audio features - **Real-time Monitoring**: Live emotion detection during consultations - **Clinical Recommendations**: AI-generated insights for healthcare practitioners 🔬 **Technology Stack**: PyTorch, dlib, librosa, computer vision, deep learning """) with gr.Tabs(): # Main Analysis Tab with gr.Tab("🎬 Consultation Analysis", elem_id="main-tab"): gr.Markdown("### Upload consultation recordings for comprehensive AI-powered emotion analysis") with gr.Row(): with gr.Column(scale=1): video_input = gr.File( label="📹 Upload Video Recording", file_types=[".mp4", ".avi", ".mov", ".mkv", ".webm"], type="filepath" ) audio_input = gr.File( label="🎵 Upload Audio Recording", file_types=[".wav", ".mp3", ".m4a", ".flac", ".ogg"], type="filepath" ) analyze_btn = gr.Button( "🔍 Analyze with AI Models", variant="primary", size="lg", scale=1 ) with gr.Column(scale=2): recommendations_output = gr.Markdown( label="🩺 Clinical Recommendations", value="Upload files and click 'Analyze' to get AI-powered clinical insights..." ) with gr.Row(): timeline_plot = gr.Plot(label="📈 Emotion Timeline Analysis", height=500) with gr.Row(): with gr.Column(): face_summary_plot = gr.Plot(label="😊 Facial Expression Summary") with gr.Column(): voice_summary_plot = gr.Plot(label="🎤 Voice Emotion Summary") analyze_btn.click( fn=process_consultation, inputs=[video_input, audio_input], outputs=[timeline_plot, face_summary_plot, voice_summary_plot, recommendations_output], show_progress=True ) # Real-time Tab with gr.Tab("🎙️ Real-time Monitoring"): gr.Markdown(""" ### Live voice emotion analysis during consultation *Click the microphone button and speak to see real-time emotion detection* """) with gr.Row(): with gr.Column(scale=1): audio_realtime = gr.Audio( sources=["microphone"], type="numpy", label="🎤 Live Audio Input", streaming=False ) with gr.Column(scale=2): realtime_output = gr.Markdown( label="📊 Real-time Analysis Results", value="🎤 **Ready for real-time analysis**\n\nClick the microphone and speak to see live emotion detection using our AI models." ) audio_realtime.change( fn=real_time_analysis, inputs=[audio_realtime], outputs=[realtime_output] ) # Technical Details Tab with gr.Tab("🔬 Model & Technical Information"): gr.Markdown(f""" ### AI Models & Architecture **Current System Status:** - 🖥️ **Processing Device**: {analyzer.device} - 🧠 **