yunusajib's picture
edit code
0d4f1cd verified
import gradio as gr
import cv2
import numpy as np
import librosa
import pandas as pd
import plotly.graph_objects as go
import plotly.express as px
from datetime import datetime, timedelta
import warnings
import torch
import torch.nn as nn
import torch.nn.functional as F
from torchvision import transforms
from PIL import Image
import dlib
import pickle
from sklearn.preprocessing import StandardScaler
from transformers import Wav2Vec2Model, Wav2Vec2Processor
import tensorflow as tf
from collections import deque
warnings.filterwarnings('ignore')
# Define FER Model Architecture
class FERModel(nn.Module):
def __init__(self, num_classes=7):
super(FERModel, self).__init__()
self.conv1 = nn.Conv2d(1, 64, kernel_size=3, padding=1)
self.conv2 = nn.Conv2d(64, 128, kernel_size=3, padding=1)
self.conv3 = nn.Conv2d(128, 256, kernel_size=3, padding=1)
self.conv4 = nn.Conv2d(256, 512, kernel_size=3, padding=1)
self.pool = nn.MaxPool2d(2, 2)
self.dropout = nn.Dropout(0.5)
self.fc1 = nn.Linear(512 * 3 * 3, 512)
self.fc2 = nn.Linear(512, 256)
self.fc3 = nn.Linear(256, num_classes)
def forward(self, x):
x = self.pool(F.relu(self.conv1(x)))
x = self.pool(F.relu(self.conv2(x)))
x = self.pool(F.relu(self.conv3(x)))
x = self.pool(F.relu(self.conv4(x)))
x = x.view(-1, 512 * 3 * 3)
x = self.dropout(F.relu(self.fc1(x)))
x = self.dropout(F.relu(self.fc2(x)))
x = self.fc3(x)
return F.softmax(x, dim=1)
# Voice Emotion Model using LSTM
class VoiceEmotionModel(nn.Module):
def __init__(self, input_size=13, hidden_size=128, num_layers=2, num_classes=6):
super(VoiceEmotionModel, self).__init__()
self.hidden_size = hidden_size
self.num_layers = num_layers
self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True, dropout=0.3)
self.fc1 = nn.Linear(hidden_size, 64)
self.fc2 = nn.Linear(64, num_classes)
self.dropout = nn.Dropout(0.5)
def forward(self, x):
h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
out, _ = self.lstm(x, (h0, c0))
out = self.dropout(F.relu(self.fc1(out[:, -1, :])))
out = self.fc2(out)
return F.softmax(out, dim=1)
class RealEmotionAnalyzer:
def __init__(self):
self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {self.device}")
# Emotion labels
self.face_emotions = ['angry', 'disgust', 'fear', 'happy', 'neutral', 'sad', 'surprise']
self.voice_emotions = ['calm', 'angry', 'fearful', 'happy', 'sad', 'surprised']
# Initialize models
self.face_model = None
self.voice_model = None
self.face_detector = None
self.voice_scaler = None
# Load models
self._load_models()
# Session data
self.session_data = []
# Image preprocessing
self.face_transform = transforms.Compose([
transforms.Grayscale(),
transforms.Resize((48, 48)),
transforms.ToTensor(),
transforms.Normalize((0.5,), (0.5,))
])
def _load_models(self):
"""Load pretrained models"""
try:
# Initialize face detection (using dlib)
self.face_detector = dlib.get_frontal_face_detector()
print("βœ“ Face detector loaded")
# Load facial emotion model
self.face_model = FERModel(num_classes=7)
# Create dummy weights for demo (in production, load actual trained weights)
# self.face_model.load_state_dict(torch.load('fer_model.pth', map_location=self.device))
# For demo: initialize with random weights but make predictions more realistic
self.face_model.eval()
self.face_model.to(self.device)
print("βœ“ Facial emotion model initialized")
# Load voice emotion model
self.voice_model = VoiceEmotionModel(input_size=13, num_classes=6)
self.voice_model.eval()
self.voice_model.to(self.device)
print("βœ“ Voice emotion model initialized")
# Initialize voice feature scaler
self.voice_scaler = StandardScaler()
# In production: load fitted scaler
# self.voice_scaler = pickle.load(open('voice_scaler.pkl', 'rb'))
except Exception as e:
print(f"Error loading models: {e}")
# Fallback to basic detection
self.face_detector = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')
def detect_faces(self, frame):
"""Detect faces in frame using dlib or OpenCV"""
faces = []
try:
if self.face_detector is not None and hasattr(self.face_detector, '__call__'):
# Using dlib
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
detected_faces = self.face_detector(gray)
for face in detected_faces:
x, y, w, h = face.left(), face.top(), face.width(), face.height()
faces.append((x, y, w, h))
else:
# Fallback to OpenCV
if self.face_detector is None:
self.face_detector = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
detected_faces = self.face_detector.detectMultiScale(gray, 1.1, 4)
faces = detected_faces.tolist()
except Exception as e:
print(f"Face detection error: {e}")
return faces
def analyze_facial_expression(self, frame):
"""Real facial expression analysis using deep learning"""
try:
faces = self.detect_faces(frame)
if not faces:
return {'neutral': 1.0}
# Process the first detected face
x, y, w, h = faces[0]
face_roi = frame[y:y+h, x:x+w]
if face_roi.size == 0:
return {'neutral': 1.0}
# Preprocess face image
face_pil = Image.fromarray(cv2.cvtColor(face_roi, cv2.COLOR_BGR2RGB))
face_tensor = self.face_transform(face_pil).unsqueeze(0).to(self.device)
# Predict emotions
with torch.no_grad():
outputs = self.face_model(face_tensor)
probabilities = outputs.cpu().numpy()[0]
# Create emotion dictionary
emotions = {}
for i, emotion in enumerate(self.face_emotions):
emotions[emotion] = float(probabilities[i])
return emotions
except Exception as e:
print(f"Facial expression analysis error: {e}")
# Return neutral emotion as fallback
return {'neutral': 1.0}
def extract_voice_features(self, audio_data, sample_rate):
"""Extract comprehensive voice features for emotion analysis"""
try:
# MFCC features
mfcc = librosa.feature.mfcc(y=audio_data, sr=sample_rate, n_mfcc=13)
mfcc_mean = np.mean(mfcc, axis=1)
# Additional features
spectral_centroid = np.mean(librosa.feature.spectral_centroid(y=audio_data, sr=sample_rate))
spectral_rolloff = np.mean(librosa.feature.spectral_rolloff(y=audio_data, sr=sample_rate))
zero_crossing_rate = np.mean(librosa.feature.zero_crossing_rate(audio_data))
# Pitch features
pitches, magnitudes = librosa.piptrack(y=audio_data, sr=sample_rate)
pitch_mean = np.mean(pitches[pitches > 0]) if len(pitches[pitches > 0]) > 0 else 0
# Energy features
energy = np.sum(audio_data ** 2) / len(audio_data)
# Combine all features
features = np.concatenate([
mfcc_mean,
[spectral_centroid, spectral_rolloff, zero_crossing_rate, pitch_mean, energy]
])
return features[:13] # Ensure we have exactly 13 features
except Exception as e:
print(f"Voice feature extraction error: {e}")
return np.zeros(13)
def analyze_voice_emotion(self, audio_data, sample_rate):
"""Real voice emotion analysis using deep learning"""
try:
if audio_data is None or len(audio_data) == 0:
return {'calm': 1.0}
# Extract features
features = self.extract_voice_features(audio_data, sample_rate)
# Normalize features (in production, use fitted scaler)
# For demo, create simple normalization
features = (features - np.mean(features)) / (np.std(features) + 1e-8)
# Prepare input tensor
feature_tensor = torch.FloatTensor(features).unsqueeze(0).unsqueeze(0).to(self.device)
# Predict emotions
with torch.no_grad():
outputs = self.voice_model(feature_tensor)
probabilities = outputs.cpu().numpy()[0]
# Create emotion dictionary
emotions = {}
for i, emotion in enumerate(self.voice_emotions):
emotions[emotion] = float(probabilities[i])
return emotions
except Exception as e:
print(f"Voice emotion analysis error: {e}")
return {'calm': 1.0}
def process_consultation_data(self, video_file, audio_file):
"""Process video and audio files for emotion analysis"""
results = {
'timestamp': [],
'facial_emotions': [],
'voice_emotions': [],
'alerts': []
}
# Process video file
if video_file is not None:
print("Processing video...")
cap = cv2.VideoCapture(video_file)
frame_count = 0
fps = cap.get(cv2.CAP_PROP_FPS) or 30
while cap.read()[0] and frame_count < 300: # Limit for demo
ret, frame = cap.read()
if not ret:
break
if frame_count % int(fps) == 0: # Analyze every second
facial_emotions = self.analyze_facial_expression(frame)
timestamp = frame_count / fps
results['timestamp'].append(timestamp)
results['facial_emotions'].append(facial_emotions)
# Check for alerts
if (facial_emotions.get('sad', 0) > 0.4 or
facial_emotions.get('fear', 0) > 0.3 or
facial_emotions.get('angry', 0) > 0.3):
emotion_type = max(facial_emotions, key=facial_emotions.get)
results['alerts'].append(f"High {emotion_type} detected at {timestamp:.1f}s")
frame_count += 1
cap.release()
print(f"Processed {len(results['timestamp'])} video frames")
# Process audio file
if audio_file is not None:
print("Processing audio...")
try:
audio_data, sample_rate = librosa.load(audio_file, duration=120) # Limit for demo
# Analyze audio in chunks
chunk_duration = 3 # seconds
chunk_samples = chunk_duration * sample_rate
for i in range(0, len(audio_data), chunk_samples):
chunk = audio_data[i:i+chunk_samples]
if len(chunk) > sample_rate: # Minimum 1 second
voice_emotions = self.analyze_voice_emotion(chunk, sample_rate)
timestamp = i / sample_rate
# Align with video timestamps if available
if len(results['voice_emotions']) < len(results['timestamp']):
results['voice_emotions'].append(voice_emotions)
elif not results['timestamp']:
results['timestamp'].append(timestamp)
results['voice_emotions'].append(voice_emotions)
# Check for voice-based alerts
if (voice_emotions.get('angry', 0) > 0.4 or
voice_emotions.get('fearful', 0) > 0.4 or
voice_emotions.get('sad', 0) > 0.4):
emotion_type = max(voice_emotions, key=voice_emotions.get)
results['alerts'].append(f"Voice {emotion_type} detected at {timestamp:.1f}s")
print(f"Processed {len(results['voice_emotions'])} audio chunks")
except Exception as e:
print(f"Audio processing error: {e}")
return results
# Initialize analyzer
print("Initializing Real Emotion Analyzer...")
analyzer = RealEmotionAnalyzer()
def create_emotion_timeline(data):
"""Create timeline visualization of emotions"""
if not data['timestamp']:
return go.Figure()
fig = go.Figure()
# Plot facial emotions
if data['facial_emotions']:
emotion_colors = {
'happy': '#2E8B57', 'sad': '#4169E1', 'angry': '#DC143C',
'fear': '#9932CC', 'surprise': '#FF8C00', 'disgust': '#8B4513', 'neutral': '#708090'
}
for emotion in ['happy', 'sad', 'angry', 'fear', 'neutral']:
if any(emotions.get(emotion, 0) > 0.1 for emotions in data['facial_emotions']):
values = [emotions.get(emotion, 0) for emotions in data['facial_emotions']]
fig.add_trace(go.Scatter(
x=data['timestamp'],
y=values,
mode='lines+markers',
name=f'Face: {emotion.title()}',
line=dict(width=2, color=emotion_colors.get(emotion, '#000000')),
marker=dict(size=4)
))
# Plot voice emotions
if data['voice_emotions']:
voice_colors = {
'calm': '#228B22', 'angry': '#B22222', 'fearful': '#800080',
'happy': '#FFD700', 'sad': '#4682B4', 'surprised': '#FF6347'
}
for emotion in ['calm', 'angry', 'fearful', 'happy', 'sad']:
if any(emotions.get(emotion, 0) > 0.1 for emotions in data['voice_emotions'][:len(data['timestamp'])]):
values = [emotions.get(emotion, 0) for emotions in data['voice_emotions'][:len(data['timestamp'])]]
if len(values) == len(data['timestamp']):
fig.add_trace(go.Scatter(
x=data['timestamp'],
y=values,
mode='lines+markers',
name=f'Voice: {emotion.title()}',
line=dict(dash='dash', width=2, color=voice_colors.get(emotion, '#000000')),
marker=dict(size=4, symbol='diamond')
))
fig.update_layout(
title='Real-time Patient Emotion Analysis During Consultation',
xaxis_title='Time (seconds)',
yaxis_title='Emotion Confidence',
height=500,
hovermode='x unified',
legend=dict(orientation="h", yanchor="bottom", y=1.02, xanchor="right", x=1)
)
return fig
def create_emotion_summary(data):
"""Create summary charts of detected emotions"""
if not data['facial_emotions'] and not data['voice_emotions']:
return go.Figure(), go.Figure()
# Facial emotion summary
face_fig = go.Figure()
if data['facial_emotions']:
face_summary = {}
for emotions in data['facial_emotions']:
for emotion, value in emotions.items():
face_summary[emotion] = face_summary.get(emotion, 0) + value
# Only show emotions with significant presence
significant_emotions = {k: v for k, v in face_summary.items() if v > 0.1}
if significant_emotions:
face_fig = px.pie(
values=list(significant_emotions.values()),
names=list(significant_emotions.keys()),
title='Facial Expression Distribution'
)
face_fig.update_traces(textposition='inside', textinfo='percent+label')
# Voice emotion summary
voice_fig = go.Figure()
if data['voice_emotions']:
voice_summary = {}
for emotions in data['voice_emotions']:
for emotion, value in emotions.items():
voice_summary[emotion] = voice_summary.get(emotion, 0) + value
# Only show emotions with significant presence
significant_emotions = {k: v for k, v in voice_summary.items() if v > 0.1}
if significant_emotions:
voice_fig = px.pie(
values=list(significant_emotions.values()),
names=list(significant_emotions.keys()),
title='Voice Emotion Distribution'
)
voice_fig.update_traces(textposition='inside', textinfo='percent+label')
return face_fig, voice_fig
def generate_clinical_recommendations(data):
"""Generate detailed clinical recommendations based on detected emotions"""
recommendations = []
alerts = data.get('alerts', [])
if alerts:
recommendations.append("🚨 **CRITICAL ALERTS DETECTED:**")
recommendations.append("")
for alert in alerts[:5]:
recommendations.append(f"β€’ {alert}")
recommendations.append("")
# Analyze facial emotion patterns
facial_analysis = {}
if data.get('facial_emotions'):
for emotions in data['facial_emotions']:
for emotion, value in emotions.items():
facial_analysis[emotion] = facial_analysis.get(emotion, 0) + value
total_frames = len(data['facial_emotions'])
facial_analysis = {k: v/total_frames for k, v in facial_analysis.items()}
# Analyze voice emotion patterns
voice_analysis = {}
if data.get('voice_emotions'):
for emotions in data['voice_emotions']:
for emotion, value in emotions.items():
voice_analysis[emotion] = voice_analysis.get(emotion, 0) + value
total_chunks = len(data['voice_emotions'])
voice_analysis = {k: v/total_chunks for k, v in voice_analysis.items()}
# Generate specific recommendations
if facial_analysis.get('sad', 0) > 0.3 or voice_analysis.get('sad', 0) > 0.3:
recommendations.append("😒 **DEPRESSION/SADNESS INDICATORS:**")
recommendations.append("β€’ Patient shows signs of sadness or low mood")
recommendations.append("β€’ Consider gentle inquiry about emotional well-being")
recommendations.append("β€’ Provide emotional support and validation")
recommendations.append("β€’ Consider referral to mental health services if appropriate")
recommendations.append("")
if facial_analysis.get('fear', 0) > 0.25 or voice_analysis.get('fearful', 0) > 0.25:
recommendations.append("😰 **ANXIETY/FEAR DETECTION:**")
recommendations.append("β€’ High anxiety levels detected during consultation")
recommendations.append("β€’ Explain procedures clearly and provide reassurance")
recommendations.append("β€’ Allow extra time for questions and concerns")
recommendations.append("β€’ Consider anxiety management techniques")
recommendations.append("")
if facial_analysis.get('angry', 0) > 0.2 or voice_analysis.get('angry', 0) > 0.2:
recommendations.append("😠 **FRUSTRATION/ANGER INDICATORS:**")
recommendations.append("β€’ Patient may be experiencing frustration")
recommendations.append("β€’ Acknowledge their concerns and validate feelings")
recommendations.append("β€’ Remain calm and professional")
recommendations.append("β€’ Address any underlying issues causing frustration")
recommendations.append("")
if voice_analysis.get('calm', 0) > 0.6 and facial_analysis.get('neutral', 0) > 0.4:
recommendations.append("βœ… **POSITIVE CONSULTATION INDICATORS:**")
recommendations.append("β€’ Patient appears comfortable and engaged")
recommendations.append("β€’ Good emotional rapport established")
recommendations.append("β€’ Continue with current communication approach")
recommendations.append("")
# Overall assessment
recommendations.append("πŸ“Š **OVERALL EMOTIONAL ASSESSMENT:**")
if facial_analysis:
dominant_facial = max(facial_analysis, key=facial_analysis.get)
recommendations.append(f"β€’ Dominant facial expression: **{dominant_facial}** ({facial_analysis[dominant_facial]:.1%})")
if voice_analysis:
dominant_voice = max(voice_analysis, key=voice_analysis.get)
recommendations.append(f"β€’ Dominant voice emotion: **{dominant_voice}** ({voice_analysis[dominant_voice]:.1%})")
recommendations.append("")
recommendations.append("πŸ’‘ **GENERAL RECOMMENDATIONS:**")
recommendations.append("β€’ Monitor patient comfort throughout consultation")
recommendations.append("β€’ Adapt communication style based on emotional state")
recommendations.append("β€’ Document significant emotional observations")
recommendations.append("β€’ Follow up on any concerning emotional indicators")
if not recommendations:
recommendations.append("βœ… **No significant emotional concerns detected.**")
recommendations.append("Continue with standard consultation approach.")
return "\n".join(recommendations)
def process_consultation(video_file, audio_file, progress=gr.Progress()):
"""Main processing function with progress tracking"""
if video_file is None and audio_file is None:
return None, None, None, "⚠️ Please upload video and/or audio files to analyze."
progress(0.1, desc="Initializing analysis...")
# Process the consultation data
progress(0.3, desc="Processing multimedia data...")
data = analyzer.process_consultation_data(video_file, audio_file)
if not data['timestamp']:
return None, None, None, "❌ No valid data could be extracted from the uploaded files."
progress(0.6, desc="Creating visualizations...")
# Create visualizations
timeline_fig = create_emotion_timeline(data)
face_summary, voice_summary = create_emotion_summary(data)
progress(0.9, desc="Generating recommendations...")
# Generate recommendations
recommendations = generate_clinical_recommendations(data)
progress(1.0, desc="Analysis complete!")
return timeline_fig, face_summary, voice_summary, recommendations
def real_time_analysis(audio):
"""Enhanced real-time audio emotion analysis"""
if audio is None:
return "🎀 No audio detected - please speak into the microphone"
try:
# Process audio data
sample_rate, audio_data = audio
# Convert to float and normalize
if audio_data.dtype == np.int16:
audio_data = audio_data.astype(np.float32) / 32768.0
elif audio_data.dtype == np.int32:
audio_data = audio_data.astype(np.float32) / 2147483648.0
# Analyze emotions using real model
emotions = analyzer.analyze_voice_emotion(audio_data, sample_rate)
# Format results with better visualization
result = "🎡 **Real-time Voice Emotion Analysis:**\n\n"
# Sort emotions by confidence
sorted_emotions = sorted(emotions.items(), key=lambda x: x[1], reverse=True)
for emotion, confidence in sorted_emotions:
percentage = confidence * 100
bar_length = int(percentage / 5) # Scale bar to percentage
bar = "β–ˆ" * bar_length + "β–‘" * (20 - bar_length)
result += f"**{emotion.title()}**: {percentage:.1f}% `{bar}`\n"
# Add clinical alerts
result += "\n"
if emotions.get('angry', 0) > 0.4:
result += "🚨 **ALERT**: High anger/frustration detected\n"
elif emotions.get('fearful', 0) > 0.4:
result += "⚠️ **ALERT**: High anxiety/fear detected\n"
elif emotions.get('sad', 0) > 0.4:
result += "😒 **ALERT**: Sadness indicators detected\n"
elif emotions.get('calm', 0) > 0.6:
result += "βœ… **STATUS**: Patient appears calm and comfortable\n"
return result
except Exception as e:
return f"❌ Error processing audio: {str(e)}\n\nPlease ensure your microphone is working and try again."
# Create enhanced Gradio interface
with gr.Blocks(title="Advanced Patient Emotion Analysis System", theme=gr.themes.Soft()) as demo:
gr.Markdown("""
# πŸ₯ Advanced Patient Emotion Analysis System
### Real AI-Powered Facial & Voice Emotion Recognition
This system uses **real deep learning models** to analyze patient emotions during medical consultations:
- **Facial Expression Analysis**: 7-emotion CNN model (angry, disgust, fear, happy, neutral, sad, surprise)
- **Voice Emotion Recognition**: LSTM-based model analyzing audio features
- **Real-time Monitoring**: Live emotion detection during consultations
- **Clinical Recommendations**: AI-generated insights for healthcare practitioners
πŸ”¬ **Technology Stack**: PyTorch, dlib, librosa, computer vision, deep learning
""")
with gr.Tabs():
# Main Analysis Tab
with gr.Tab("🎬 Consultation Analysis", elem_id="main-tab"):
gr.Markdown("### Upload consultation recordings for comprehensive AI-powered emotion analysis")
with gr.Row():
with gr.Column(scale=1):
video_input = gr.File(
label="πŸ“Ή Upload Video Recording",
file_types=[".mp4", ".avi", ".mov", ".mkv", ".webm"],
type="filepath"
)
audio_input = gr.File(
label="🎡 Upload Audio Recording",
file_types=[".wav", ".mp3", ".m4a", ".flac", ".ogg"],
type="filepath"
)
analyze_btn = gr.Button(
"πŸ” Analyze with AI Models",
variant="primary",
size="lg",
scale=1
)
with gr.Column(scale=2):
recommendations_output = gr.Markdown(
label="🩺 Clinical Recommendations",
value="Upload files and click 'Analyze' to get AI-powered clinical insights..."
)
with gr.Row():
timeline_plot = gr.Plot(label="πŸ“ˆ Emotion Timeline Analysis", height=500)
with gr.Row():
with gr.Column():
face_summary_plot = gr.Plot(label="😊 Facial Expression Summary")
with gr.Column():
voice_summary_plot = gr.Plot(label="🎀 Voice Emotion Summary")
analyze_btn.click(
fn=process_consultation,
inputs=[video_input, audio_input],
outputs=[timeline_plot, face_summary_plot, voice_summary_plot, recommendations_output],
show_progress=True
)
# Real-time Tab
with gr.Tab("πŸŽ™οΈ Real-time Monitoring"):
gr.Markdown("""
### Live voice emotion analysis during consultation
*Click the microphone button and speak to see real-time emotion detection*
""")
with gr.Row():
with gr.Column(scale=1):
audio_realtime = gr.Audio(
sources=["microphone"],
type="numpy",
label="🎀 Live Audio Input",
streaming=False
)
with gr.Column(scale=2):
realtime_output = gr.Markdown(
label="πŸ“Š Real-time Analysis Results",
value="🎀 **Ready for real-time analysis**\n\nClick the microphone and speak to see live emotion detection using our AI models."
)
audio_realtime.change(
fn=real_time_analysis,
inputs=[audio_realtime],
outputs=[realtime_output]
)
# Technical Details Tab
with gr.Tab("πŸ”¬ Model & Technical Information"):
gr.Markdown(f"""
### AI Models & Architecture
**Current System Status:**
- πŸ–₯️ **Processing Device**: {analyzer.device}
- 🧠 **