|
import gradio as gr |
|
import librosa |
|
import numpy as np |
|
import torch |
|
from transformers import WhisperProcessor, WhisperForConditionalGeneration |
|
from simple_salesforce import Salesforce |
|
import os |
|
from datetime import datetime |
|
import logging |
|
import webrtcvad |
|
import google.generativeai as genai |
|
from gtts import gTTS |
|
import tempfile |
|
import base64 |
|
import re |
|
from cryptography.fernet import Fernet |
|
import pytz |
|
from reportlab.lib.pagesizes import A4 |
|
from reportlab.lib import colors |
|
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, ListFlowable, ListItem |
|
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle |
|
from reportlab.lib.units import inch |
|
import asyncio |
|
import hashlib |
|
from functools import lru_cache |
|
|
|
|
|
logging.basicConfig(level=logging.DEBUG, format="%(asctime)s - %(levelname)s - %(message)s") |
|
logger = logging.getLogger(__name__) |
|
usage_metrics = {"total_assessments": 0, "assessments_by_language": {}} |
|
|
|
|
|
SF_USERNAME = os.getenv("SF_USERNAME", "smartvoicebot@voice.com") |
|
SF_PASSWORD = os.getenv("SF_PASSWORD", "voicebot1") |
|
SF_SECURITY_TOKEN = os.getenv("SF_SECURITY_TOKEN", "jq4VVHUFti6TmzJDjjegv2h6b") |
|
SF_INSTANCE_URL = os.getenv("SF_INSTANCE_URL", "https://swe42.sfdc-cehfhs.salesforce.com") |
|
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY", "AIzaSyBzr5vVpbe8CV1v70l3pGDp9vRJ76yCxdk") |
|
ENCRYPTION_KEY = os.getenv("ENCRYPTION_KEY", Fernet.generate_key().decode()) |
|
DEFAULT_EMAIL = os.getenv("SALESFORCE_USER_EMAIL", "default@mindcare.com") |
|
|
|
|
|
cipher = Fernet(ENCRYPTION_KEY) |
|
|
|
|
|
try: |
|
sf = Salesforce( |
|
username=SF_USERNAME, |
|
password=SF_PASSWORD, |
|
security_token=SF_SECURITY_TOKEN, |
|
instance_url=SF_INSTANCE_URL |
|
) |
|
logger.info(f"Connected to Salesforce at {SF_INSTANCE_URL}") |
|
except Exception as e: |
|
logger.error(f"Salesforce connection failed: {str(e)}") |
|
sf = None |
|
|
|
|
|
try: |
|
genai.configure(api_key=GEMINI_API_KEY) |
|
gemini_model = genai.GenerativeModel('gemini-1.5-flash') |
|
chat = gemini_model.start_chat(history=[]) |
|
logger.info("Connected to Google Gemini") |
|
except Exception as e: |
|
logger.error(f"Google Gemini initialization failed: {str(e)}") |
|
chat = None |
|
|
|
|
|
SUPPORTED_LANGUAGES = {"English": "english", "Hindi": "hindi", "Spanish": "spanish", "Mandarin": "mandarin"} |
|
SALESFORCE_LANGUAGE_MAP = {"English": "English", "Hindi": "Hindi", "Spanish": "Spanish", "Mandarin": "Mandarin"} |
|
LANGUAGE_CODES = {"English": "en", "Hindi": "hi", "Spanish": "es", "Mandarin": "zh"} |
|
whisper_processor = WhisperProcessor.from_pretrained("openai/whisper-small") |
|
whisper_model = WhisperForConditionalGeneration.from_pretrained("openai/whisper-small") |
|
vad = webrtcvad.Vad(mode=2) |
|
|
|
|
|
base_info = """ |
|
MindCare is an AI health assistant focused on: |
|
- **Mental health**: Emotional support, mindfulness, stress-relief, anxiety management. |
|
- **Medical guidance**: Symptom analysis, possible conditions, medicine recommendations. |
|
- **Decision-making**: Personal, professional, emotional choices. |
|
- **General health**: Lifestyle, nutrition, physical and mental wellness. |
|
- **Emergency assistance**: Suggest professional help or helplines for distress. |
|
Tone: Empathetic, supportive, informative. |
|
""" |
|
mental_health = """ |
|
For stress/anxiety: |
|
- Suggest mindfulness, deep breathing, gratitude journaling. |
|
- Encourage breaks, hobbies, nature. |
|
- Provide affirmations, self-care routines. |
|
For distress: |
|
- Offer emotional support, assure they’re not alone. |
|
- Suggest trusted contacts or professionals. |
|
- Provide crisis helplines. |
|
""" |
|
medical_assistance = """ |
|
For symptoms: |
|
- Analyze and suggest possible conditions. |
|
- Offer general advice, not replacing doctor consultation. |
|
- Suggest lifestyle changes, home remedies. |
|
- Advise medical attention for severe symptoms. |
|
""" |
|
medicine_recommendation = """ |
|
For medicine queries: |
|
- Suggest common antibiotics (e.g., Amoxicillin), painkillers (e.g., Paracetamol, Ibuprofen). |
|
- Note precautions, side effects. |
|
- Stress doctor consultation before use. |
|
""" |
|
decision_guidance = """ |
|
For decisions: |
|
- Weigh pros/cons logically. |
|
- Consider values, goals, emotions. |
|
- Suggest decision matrices or intuitive checks. |
|
- Encourage trusted advice if needed. |
|
""" |
|
emergency_help = """ |
|
For severe distress: |
|
- Provide immediate emotional support. |
|
- Offer crisis helplines (region-specific). |
|
- Encourage talking to trusted contacts or professionals. |
|
- Assure help is available. |
|
""" |
|
context = [base_info, mental_health, medical_assistance, medicine_recommendation, decision_guidance, emergency_help] |
|
|
|
def encrypt_data(data): |
|
try: |
|
return cipher.encrypt(data.encode('utf-8')).decode('utf-8') |
|
except Exception as e: |
|
logger.error(f"Encryption failed: {str(e)}") |
|
return data |
|
|
|
def decrypt_data(encrypted_data): |
|
try: |
|
return cipher.decrypt(encrypted_data.encode('utf-8')).decode('utf-8') |
|
except Exception as e: |
|
logger.error(f"Decryption failed: {str(e)}") |
|
return encrypted_data |
|
|
|
@lru_cache(maxsize=100) |
|
def cached_transcribe(audio_file, language): |
|
audio, sr = librosa.load(audio_file, sr=16000) |
|
language_code = LANGUAGE_CODES.get(language, "en") |
|
return transcribe_audio(audio, language_code) |
|
|
|
def extract_health_features(audio, sr): |
|
try: |
|
audio = librosa.util.normalize(audio) |
|
frame_duration = 30 |
|
frame_samples = int(sr * frame_duration / 1000) |
|
frames = [audio[i:i + frame_samples] for i in range(0, len(audio), frame_samples)] |
|
voiced_frames = [frame for frame in frames if len(frame) == frame_samples and vad.is_speech((frame * 32768).astype(np.int16).tobytes(), sr)] |
|
if not voiced_frames: |
|
raise ValueError("No voiced segments detected") |
|
voiced_audio = np.concatenate(voiced_frames) |
|
|
|
frame_step = max(1, len(voiced_audio) // (sr // 8)) |
|
pitches, magnitudes = librosa.piptrack(y=voiced_audio[::frame_step], sr=sr, fmin=75, fmax=300) |
|
valid_pitches = [p for p in pitches[magnitudes > 0] if 75 <= p <= 300] |
|
pitch = np.mean(valid_pitches) if valid_pitches else 0 |
|
jitter = np.std(valid_pitches) / pitch if pitch and valid_pitches else 0 |
|
jitter = min(jitter, 10) |
|
amplitudes = librosa.feature.rms(y=voiced_audio, frame_length=512, hop_length=128)[0] |
|
shimmer = np.std(amplitudes) / np.mean(amplitudes) if np.mean(amplitudes) else 0 |
|
shimmer = min(shimmer, 10) |
|
energy = np.mean(amplitudes) |
|
|
|
mfcc = np.mean(librosa.feature.mfcc(y=voiced_audio[::4], sr=sr, n_mfcc=4), axis=1) |
|
spectral_centroid = np.mean(librosa.feature.spectral_centroid(y=voiced_audio[::4], sr=sr, n_fft=512, hop_length=128)) |
|
|
|
logger.debug(f"Extracted features: pitch={pitch:.2f}, jitter={jitter*100:.2f}%, shimmer={shimmer*100:.2f}%, energy={energy:.4f}, mfcc_mean={np.mean(mfcc):.2f}, spectral_centroid={spectral_centroid:.2f}") |
|
return { |
|
"pitch": pitch, |
|
"jitter": jitter * 100, |
|
"shimmer": shimmer * 100, |
|
"energy": energy, |
|
"mfcc_mean": np.mean(mfcc), |
|
"spectral_centroid": spectral_centroid |
|
} |
|
except Exception as e: |
|
logger.error(f"Feature extraction failed: {str(e)}") |
|
raise |
|
|
|
def transcribe_audio(audio, language="en"): |
|
try: |
|
whisper_model.config.forced_decoder_ids = whisper_processor.get_decoder_prompt_ids( |
|
language=SUPPORTED_LANGUAGES.get({"en": "English", "hi": "Hindi", "es": "Spanish", "zh": "Mandarin"}.get(language, "English"), "english"), task="transcribe" |
|
) |
|
inputs = whisper_processor(audio, sampling_rate=16000, return_tensors="pt") |
|
with torch.no_grad(): |
|
generated_ids = whisper_model.generate(inputs["input_features"], max_new_tokens=30) |
|
transcription = whisper_processor.batch_decode(generated_ids, skip_special_tokens=True)[0] |
|
logger.info(f"Transcription (language: {language}): {transcription}") |
|
return transcription |
|
except Exception as e: |
|
logger.error(f"Transcription failed: {str(e)}") |
|
return None |
|
|
|
async def get_chatbot_response(message, language="English"): |
|
if not chat or not message: |
|
return "Unable to generate response.", None |
|
language_code = LANGUAGE_CODES.get(language, "en") |
|
full_context = "\n".join(context) + f"\nUser: {message}\nMindCare: Provide response in 6-8 simple bullet points, tailored to the user's input, in a clear and empathetic tone, in {language}." |
|
try: |
|
response = await asyncio.get_event_loop().run_in_executor(None, lambda: chat.send_message(full_context).text) |
|
with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as temp_audio: |
|
tts = gTTS(text=response, lang=language_code, slow=False) |
|
tts.save(temp_audio.name) |
|
audio_path = temp_audio.name |
|
logger.info(f"Generated response: {response[:100]}... and audio at {audio_path}") |
|
return response, audio_path |
|
except Exception as e: |
|
logger.error(f"Chatbot response failed: {str(e)}") |
|
return "Error generating response. Please check your input or API key.", None |
|
|
|
def analyze_symptoms(text, features): |
|
feedback = [] |
|
suggestions = [] |
|
text = text.lower() if text else "" |
|
|
|
|
|
if "cough" in text or "coughing" in text: |
|
feedback.append("You mentioned a cough, which may suggest a cold or respiratory issue.") |
|
suggestions.extend([ |
|
"• Drink warm fluids like herbal tea or water to soothe your throat.", |
|
"• Rest to help your body recover from possible infection.", |
|
"• Use a humidifier to ease throat irritation.", |
|
"• Consider over-the-counter cough remedies, but consult a doctor first.", |
|
"• Monitor symptoms; see a doctor if the cough lasts over a week." |
|
]) |
|
elif "fever" in text or "temperature" in text: |
|
feedback.append("You mentioned a fever, which could indicate an infection.") |
|
suggestions.extend([ |
|
"• Stay hydrated with water or electrolyte drinks.", |
|
"• Rest to support your immune system.", |
|
"• Monitor your temperature regularly.", |
|
"• Use paracetamol to reduce fever, but follow dosage instructions.", |
|
"• Seek medical advice if fever exceeds 100.4°F (38°C) for over 2 days." |
|
]) |
|
elif "headache" in text: |
|
feedback.append("You mentioned a headache, possibly due to stress or dehydration.") |
|
suggestions.extend([ |
|
"• Drink plenty of water to stay hydrated.", |
|
"• Take short breaks to relax your mind.", |
|
"• Try a mild pain reliever like ibuprofen, but consult a doctor.", |
|
"• Practice deep breathing to reduce tension.", |
|
"• Ensure you're getting enough sleep (7-8 hours)." |
|
]) |
|
elif "stress" in text or "anxious" in text or "mental stress" in text: |
|
feedback.append("You mentioned stress or anxiety, which can affect well-being.") |
|
suggestions.extend([ |
|
"• Try 5 minutes of deep breathing to calm your mind.", |
|
"• Write in a journal to process your thoughts.", |
|
"• Take a short walk in nature to relax.", |
|
"• Practice mindfulness or meditation daily.", |
|
"• Talk to a trusted friend or professional for support.", |
|
"• Prioritize sleep and avoid excessive caffeine." |
|
]) |
|
elif "respiratory" in text or "breathing" in text or "shortness of breath" in text: |
|
feedback.append("You mentioned breathing issues, which may indicate asthma or infection.") |
|
suggestions.extend([ |
|
"• Avoid triggers like smoke or allergens.", |
|
"• Practice slow, deep breathing exercises.", |
|
"• Stay in a well-ventilated area.", |
|
"• Monitor symptoms and seek medical help if severe.", |
|
"• Rest to reduce strain on your respiratory system." |
|
]) |
|
elif "cold" in text: |
|
feedback.append("You mentioned a cold, likely a viral infection.") |
|
suggestions.extend([ |
|
"• Drink warm fluids like soup or tea.", |
|
"• Rest to help your body fight the virus.", |
|
"• Use saline nasal spray to relieve congestion.", |
|
"• Take over-the-counter cold remedies, but consult a doctor.", |
|
"• Stay hydrated and avoid strenuous activity." |
|
]) |
|
|
|
|
|
if features["jitter"] > 6.5: |
|
feedback.append(f"High jitter ({features['jitter']:.2f}%) suggests vocal strain or respiratory issues.") |
|
suggestions.append("• Rest your voice and avoid shouting.") |
|
elif features["jitter"] > 4.0: |
|
feedback.append(f"Moderate jitter ({features['jitter']:.2f}%) indicates possible vocal instability.") |
|
suggestions.append("• Sip warm water to soothe your vocal cords.") |
|
|
|
if features["shimmer"] > 7.5: |
|
feedback.append(f"High shimmer ({features['shimmer']:.2f}%) may indicate emotional stress.") |
|
suggestions.append("• Try relaxation techniques like yoga or meditation.") |
|
elif features["shimmer"] > 5.0: |
|
feedback.append(f"Moderate shimmer ({features['shimmer']:.2f}%) suggests mild vocal strain.") |
|
suggestions.append("• Stay hydrated to support vocal health.") |
|
|
|
if features["energy"] < 0.003: |
|
feedback.append(f"Low vocal energy ({features['energy']:.4f}) may indicate fatigue.") |
|
suggestions.append("• Ensure 7-8 hours of sleep nightly.") |
|
elif features["energy"] < 0.007: |
|
feedback.append(f"Low vocal energy ({features['energy']:.4f}) suggests possible tiredness.") |
|
suggestions.append("• Take short naps to boost energy.") |
|
|
|
if features["pitch"] < 70 or features["pitch"] > 290: |
|
feedback.append(f"Unusual pitch ({features['pitch']:.2f} Hz) may indicate vocal issues.") |
|
suggestions.append("• Consult a doctor for a vocal health check.") |
|
elif 70 <= features["pitch"] <= 90 or 270 <= features["pitch"] <= 290: |
|
feedback.append(f"Pitch ({features['pitch']:.2f} Hz) is slightly outside typical range.") |
|
suggestions.append("• Avoid straining your voice during conversations.") |
|
|
|
if features["spectral_centroid"] > 2700: |
|
feedback.append(f"High spectral centroid ({features['spectral_centroid']:.2f} Hz) suggests tense speech.") |
|
suggestions.append("• Practice slow, calm speaking to reduce tension.") |
|
elif features["spectral_centroid"] > 2200: |
|
feedback.append(f"Elevated spectral centroid ({features['spectral_centroid']:.2f} Hz) may indicate mild tension.") |
|
suggestions.append("• Relax your jaw and shoulders while speaking.") |
|
|
|
if not feedback: |
|
feedback.append("No significant health concerns detected from voice or text analysis.") |
|
suggestions.extend([ |
|
"• Maintain a balanced diet with fruits and vegetables.", |
|
"• Exercise regularly for overall health.", |
|
"• Stay hydrated with 8 glasses of water daily.", |
|
"• Get 7-8 hours of sleep each night.", |
|
"• Practice stress-relief techniques like meditation.", |
|
"• Schedule regular health check-ups." |
|
]) |
|
|
|
|
|
suggestions = list(dict.fromkeys(suggestions))[:8] |
|
if len(suggestions) < 6: |
|
suggestions.extend([ |
|
"• Stay active with light exercise like walking.", |
|
"• Practice gratitude to boost mental well-being." |
|
][:6 - len(suggestions)]) |
|
|
|
logger.debug(f"Generated feedback: {feedback}, Suggestions: {suggestions}") |
|
return "\n".join(feedback), "\n".join(suggestions) |
|
|
|
def store_user_consent(email, language): |
|
if not sf: |
|
logger.warning("Salesforce not connected; skipping consent storage") |
|
return None |
|
try: |
|
email_to_use = email.strip() if email and email.strip() else DEFAULT_EMAIL |
|
sanitized_email = email_to_use.replace("'", "\\'").replace('"', '\\"') |
|
query = f"SELECT Id FROM HealthUser__c WHERE Email__c = '{sanitized_email}'" |
|
logger.debug(f"Executing SOQL query: {query}") |
|
user = sf.query(query) |
|
user_id = None |
|
if user["totalSize"] == 0: |
|
logger.info(f"No user found for email: {sanitized_email}, creating new user") |
|
user = sf.HealthUser__c.create({ |
|
"Email__c": sanitized_email, |
|
"Language__c": SALESFORCE_LANGUAGE_MAP.get(language, "English"), |
|
"ConsentGiven__c": True |
|
}) |
|
user_id = user["id"] |
|
logger.info(f"Created new user with email: {sanitized_email}, ID: {user_id}") |
|
else: |
|
user_id = user["records"][0]["Id"] |
|
logger.info(f"Found existing user with email: {sanitized_email}, ID: {user_id}") |
|
sf.HealthUser__c.update(user_id, { |
|
"Language__c": SALESFORCE_LANGUAGE_MAP.get(language, "English"), |
|
"ConsentGiven__c": True |
|
}) |
|
logger.info(f"Updated user with email: {sanitized_email}") |
|
sf.ConsentLog__c.create({ |
|
"HealthUser__c": user_id, |
|
"ConsentType__c": "Voice Analysis", |
|
"ConsentDate__c": datetime.utcnow().isoformat() |
|
}) |
|
logger.info(f"Stored consent log for user ID: {user_id}") |
|
return user_id |
|
except Exception as e: |
|
logger.error(f"Consent storage failed: {str(e)}") |
|
logger.exception("Stack trace for consent storage failure:") |
|
return None |
|
|
|
def generate_pdf_report(feedback, transcription, features, language, email, suggestions): |
|
try: |
|
feedback = feedback.replace('<', '<').replace('>', '>').replace('&', '&') |
|
transcription = transcription.replace('<', '<').replace('>', '>').replace('&', '&') if transcription else "None" |
|
suggestions = suggestions.replace('<', '<').replace('>', '>').replace('&', '&') if suggestions else "None" |
|
email_to_use = email.strip() if email and email.strip() else DEFAULT_EMAIL |
|
email = email_to_use.replace('<', '<').replace('>', '>').replace('&', '&') |
|
language_display = SALESFORCE_LANGUAGE_MAP.get(language, "English") |
|
|
|
ist = pytz.timezone('Asia/Kolkata') |
|
ist_time = datetime.now(ist).strftime("%I:%M %p IST on %B %d, %Y") |
|
logger.debug(f"Generating PDF with IST time: {ist_time}, feedback: {feedback[:100]}..., transcription: {transcription[:100]}..., suggestions: {suggestions[:100]}..., language: {language_display}, email: {email}") |
|
|
|
debug_dir = "/tmp/mindcare_logs" |
|
os.makedirs(debug_dir, exist_ok=True) |
|
timestamp = datetime.now(ist).strftime("%Y%m%d_%H%M%S") |
|
pdf_path = os.path.join(debug_dir, f"report_{timestamp}.pdf") |
|
|
|
doc = SimpleDocTemplate(pdf_path, pagesize=A4, rightMargin=inch, leftMargin=inch, topMargin=inch, bottomMargin=inch) |
|
styles = getSampleStyleSheet() |
|
title_style = ParagraphStyle( |
|
name='Title', |
|
fontSize=16, |
|
leading=20, |
|
alignment=1, |
|
spaceAfter=12, |
|
fontName='Times-Bold' |
|
) |
|
heading_style = ParagraphStyle( |
|
name='Heading1', |
|
fontSize=14, |
|
leading=16, |
|
spaceBefore=12, |
|
spaceAfter=8, |
|
fontName='Times-Bold' |
|
) |
|
subheading_style = ParagraphStyle( |
|
name='Heading2', |
|
fontSize=12, |
|
leading=14, |
|
spaceBefore=10, |
|
spaceAfter=6, |
|
fontName='Times-Bold' |
|
) |
|
normal_style = ParagraphStyle( |
|
name='Normal', |
|
fontSize=12, |
|
leading=14, |
|
spaceAfter=6, |
|
fontName='Times-Roman' |
|
) |
|
bullet_style = ParagraphStyle( |
|
name='Bullet', |
|
fontSize=12, |
|
leading=14, |
|
leftIndent=20, |
|
firstLineIndent=-10, |
|
spaceAfter=4, |
|
fontName='Times-Roman' |
|
) |
|
|
|
story = [] |
|
story.append(Paragraph("MindCare Health Assistant Report", title_style)) |
|
story.append(Paragraph(f"Generated on {ist_time}", normal_style)) |
|
story.append(Spacer(1, 0.5 * inch)) |
|
|
|
story.append(Paragraph("User Information", heading_style)) |
|
user_info = [ |
|
ListItem(Paragraph(f"<b>Email</b>: {email}", bullet_style), bulletText="•"), |
|
ListItem(Paragraph(f"<b>Language</b>: {language_display}", bullet_style), bulletText="•") |
|
] |
|
story.append(ListFlowable(user_info, bulletType='bullet')) |
|
story.append(Spacer(1, 0.25 * inch)) |
|
|
|
story.append(Paragraph("Voice Analysis Results", heading_style)) |
|
story.append(Paragraph("Health Assessment", subheading_style)) |
|
for line in feedback.split('\n'): |
|
if line.strip(): |
|
story.append(Paragraph(line, normal_style)) |
|
story.append(Spacer(1, 0.1 * inch)) |
|
|
|
story.append(Paragraph("Health Suggestions", subheading_style)) |
|
for line in suggestions.split('\n'): |
|
if line.strip(): |
|
story.append(Paragraph(line, normal_style)) |
|
story.append(Spacer(1, 0.1 * inch)) |
|
|
|
story.append(Paragraph("Voice Analysis Details", subheading_style)) |
|
details = [ |
|
ListItem(Paragraph(f"Pitch: {features['pitch']:.2f} Hz", bullet_style), bulletText="•"), |
|
ListItem(Paragraph(f"Jitter: {features['jitter']:.2f}% (voice stability)", bullet_style), bulletText="•"), |
|
ListItem(Paragraph(f"Shimmer: {features['shimmer']:.2f}% (amplitude variation)", bullet_style), bulletText="•"), |
|
ListItem(Paragraph(f"Energy: {features['energy']:.4f} (vocal intensity)", bullet_style), bulletText="•"), |
|
ListItem(Paragraph(f"MFCC Mean: {features['mfcc_mean']:.2f} (timbre quality)", bullet_style), bulletText="•"), |
|
ListItem(Paragraph(f"Spectral Centroid: {features['spectral_centroid']:.2f} Hz (voice brightness)", bullet_style), bulletText="•"), |
|
ListItem(Paragraph(f"Transcription: {transcription}", bullet_style), bulletText="•") |
|
] |
|
story.append(ListFlowable(details, bulletType='bullet')) |
|
story.append(Spacer(1, 0.1 * inch)) |
|
|
|
story.append(Paragraph("Transcription", subheading_style)) |
|
story.append(Paragraph(transcription, normal_style)) |
|
story.append(Spacer(1, 0.1 * inch)) |
|
|
|
story.append(Paragraph("Voice Metrics", subheading_style)) |
|
metrics = [ |
|
ListItem(Paragraph(f"<b>Pitch</b>: {features['pitch']:.2f} Hz", bullet_style), bulletText="•"), |
|
ListItem(Paragraph(f"<b>Jitter</b>: {features['jitter']:.2f}%", bullet_style), bulletText="•"), |
|
ListItem(Paragraph(f"<b>Shimmer</b>: {features['shimmer']:.2f}%", bullet_style), bulletText="•"), |
|
ListItem(Paragraph(f"<b>Energy</b>: {features['energy']:.4f}", bullet_style), bulletText="•"), |
|
ListItem(Paragraph(f"<b>MFCC Mean</b>: {features['mfcc_mean']:.2f}", bullet_style), bulletText="•"), |
|
ListItem(Paragraph(f"<b>Spectral Centroid</b>: {features['spectral_centroid']:.2f} Hz", bullet_style), bulletText="•") |
|
] |
|
story.append(ListFlowable(metrics, bulletType='bullet')) |
|
story.append(Spacer(1, 0.1 * inch)) |
|
|
|
story.append(Paragraph("Disclaimer", heading_style)) |
|
story.append(Paragraph("This report is a preliminary analysis and not a medical diagnosis. Always consult a healthcare provider.", normal_style)) |
|
|
|
doc.build(story) |
|
logger.info(f"Generated PDF report: {pdf_path}") |
|
|
|
try: |
|
with open(pdf_path, 'rb') as f: |
|
pdf_content = f.read() |
|
if len(pdf_content) > 0 and pdf_content.startswith(b'%PDF'): |
|
return pdf_path, None |
|
else: |
|
logger.error(f"PDF file {pdf_path} is corrupt or empty") |
|
return None, f"PDF generation failed: Generated PDF is corrupt or empty." |
|
except Exception as e: |
|
logger.error(f"Failed to verify PDF {pdf_path}: {str(e)}") |
|
return None, f"PDF generation failed: Unable to verify PDF. Error: {str(e)}." |
|
except Exception as e: |
|
logger.error(f"PDF generation failed: {str(e)}") |
|
logger.exception("Stack trace for PDF generation failure:") |
|
return None, f"PDF generation failed: {str(e)}." |
|
|
|
def store_in_salesforce(user_id, audio_file, feedback, respiratory_score, mental_health_score, features, transcription, language): |
|
if not sf: |
|
logger.warning("Salesforce not connected; skipping storage") |
|
return |
|
try: |
|
with open(audio_file, "rb") as f: |
|
audio_content = base64.b64encode(f.read()).decode() |
|
content_version = sf.ContentVersion.create({ |
|
"Title": f"Voice_Assessment_{datetime.utcnow().isoformat()}", |
|
"PathOnClient": os.path.basename(audio_file), |
|
"VersionData": audio_content, |
|
"IsMajorVersion": True |
|
}) |
|
content_document_id = sf.query(f"SELECT ContentDocumentId FROM ContentVersion WHERE Id = '{content_version['id']}'")["records"][0]["ContentDocumentId"] |
|
file_url = f"{SF_INSTANCE_URL}/lightning/r/ContentDocument/{content_document_id}/view" |
|
|
|
feedback_str = feedback[:32767] |
|
assessment = sf.VoiceAssessment__c.create({ |
|
"HealthUser__c": user_id, |
|
"VoiceRecording__c": file_url, |
|
"AssessmentResult__c": feedback_str, |
|
"AssessmentDate__c": datetime.utcnow().isoformat(), |
|
"ConfidenceScore__c": 95.0, |
|
"RespiratoryScore__c": float(respiratory_score), |
|
"MentalHealthScore__c": float(mental_health_score), |
|
"Pitch__c": float(features["pitch"]), |
|
"Jitter__c": float(features["jitter"]), |
|
"Shimmer__c": float(features["shimmer"]), |
|
"Energy__c": float(features["energy"]), |
|
"Transcription__c": transcription or "None", |
|
"Language__c": SALESFORCE_LANGUAGE_MAP.get(language, "English") |
|
}) |
|
sf.ContentDocumentLink.create({ |
|
"ContentDocumentId": content_document_id, |
|
"LinkedEntityId": assessment["id"], |
|
"ShareType": "V" |
|
}) |
|
logger.info(f"Stored assessment in Salesforce: {assessment['id']}") |
|
except Exception as e: |
|
logger.error(f"Salesforce storage failed: {str(e)}") |
|
logger.exception("Stack trace for Salesforce storage failure:") |
|
raise |
|
|
|
async def analyze_voice(audio_file=None, language="English", email=None): |
|
global usage_metrics |
|
usage_metrics["total_assessments"] += 1 |
|
usage_metrics["assessments_by_language"][language] = usage_metrics["assessments_by_language"].get(language, 0) + 1 |
|
|
|
try: |
|
if not audio_file or not os.path.exists(audio_file): |
|
raise ValueError("No valid audio file provided") |
|
|
|
audio, sr = librosa.load(audio_file, sr=16000) |
|
max_duration = 5 |
|
if len(audio) > max_duration * sr: |
|
audio = audio[:max_duration * sr] |
|
logger.info(f"Truncated audio to first {max_duration} seconds for faster processing") |
|
if len(audio) < sr: |
|
raise ValueError("Audio too short (minimum 1 second)") |
|
|
|
language_code = LANGUAGE_CODES.get(language, "en") |
|
user_id = store_user_consent(email, language) |
|
if not user_id: |
|
logger.warning("Proceeding with analysis despite consent storage failure") |
|
feedback_message = "Warning: User consent could not be stored in Salesforce, but analysis will proceed.\n" |
|
else: |
|
feedback_message = "" |
|
|
|
features = extract_health_features(audio, sr) |
|
transcription = cached_transcribe(audio_file, language) |
|
feedback, suggestions = analyze_symptoms(transcription, features) |
|
|
|
respiratory_score = features["jitter"] |
|
mental_health_score = features["shimmer"] |
|
|
|
feedback = feedback_message + feedback + "\n\n**Voice Analysis Details**:\n" |
|
feedback += f"- Pitch: {features['pitch']:.2f} Hz\n" |
|
feedback += f"- Jitter: {features['jitter']:.2f}% (voice stability)\n" |
|
feedback += f"- Shimmer: {features['shimmer']:.2f}% (amplitude variation)\n" |
|
feedback += f"- Energy: {features['energy']:.4f} (vocal intensity)\n" |
|
feedback += f"- MFCC Mean: {features['mfcc_mean']:.2f} (timbre quality)\n" |
|
feedback += f"- Spectral Centroid: {features['spectral_centroid']:.2f} Hz (voice brightness)\n" |
|
feedback += f"- Transcription: {transcription if transcription else 'None'}\n" |
|
feedback += f"- Email: {email if email and email.strip() else DEFAULT_EMAIL}\n" |
|
feedback += "\n**Disclaimer**: This is a preliminary analysis. Consult a healthcare provider for professional evaluation." |
|
|
|
if user_id and sf: |
|
store_in_salesforce(user_id, audio_file, feedback, respiratory_score, mental_health_score, features, transcription, language) |
|
else: |
|
logger.warning("Skipping Salesforce storage due to missing user_id or Salesforce connection") |
|
|
|
file_path, pdf_error = generate_pdf_report(feedback, transcription, features, language, email, suggestions) |
|
if pdf_error: |
|
feedback += f"\n\n**Error**: {pdf_error}" |
|
return feedback, file_path, suggestions, None |
|
|
|
|
|
response_text = suggestions |
|
response, audio_path = await get_chatbot_response(response_text, language) |
|
if audio_path: |
|
logger.info(f"Generated audio response at {audio_path}") |
|
else: |
|
logger.warning("Failed to generate audio response") |
|
return feedback, file_path, response, None |
|
|
|
try: |
|
os.remove(audio_file) |
|
logger.info(f"Deleted audio file: {audio_file}") |
|
except Exception as e: |
|
logger.error(f"Failed to delete audio file: {str(e)}") |
|
|
|
return feedback, file_path, response, audio_path |
|
except Exception as e: |
|
logger.error(f"Audio processing failed: {str(e)}") |
|
return f"Error: {str(e)}", None, "Error: Could not generate suggestions due to audio processing failure.", None |
|
|
|
def launch(): |
|
custom_css = """ |
|
.gradio-container { |
|
max-width: 1200px; |
|
margin: auto; |
|
font-family: 'Roboto', sans-serif; |
|
background-color: var(--background-primary); |
|
color: var(--text-primary); |
|
} |
|
@import url('https://fonts.googleapis.com/css2?family=Roboto:wght@400;700&display=swap'); |
|
h1, h3 { |
|
background: linear-gradient(to right, #007bff, #0056b3); |
|
color: white; |
|
padding: 15px; |
|
border-radius: 8px; |
|
text-align: center; |
|
box-shadow: 0 4px 6px rgba(0, 0, 0, 0.1); |
|
} |
|
.gr-column { |
|
background: var(--background-secondary); |
|
border-radius: 8px; |
|
padding: 20px; |
|
box-shadow: 0 2px 8px rgba(0, 0, 0, 0.1); |
|
margin: 10px; |
|
color: var(--text-primary); |
|
} |
|
.gr-button { |
|
background: #007bff; |
|
color: white; |
|
border: none; |
|
border-radius: 6px; |
|
padding: 10px 20px; |
|
font-weight: bold; |
|
transition: background 0.3s; |
|
} |
|
.gr-button:hover { |
|
background: #0056b3; |
|
} |
|
.gr-textbox, .gr-dropdown, .gr-checkbox, .gr-file, .gr-audio { |
|
border-radius: 6px; |
|
border: 1px solid var(--border-color); |
|
background: var(--background-secondary); |
|
color: var(--text-primary); |
|
} |
|
.gr-textbox textarea { |
|
font-size: 14px; |
|
color: var(--text-primary); |
|
} |
|
#health-results { |
|
background: var(--success-background); |
|
border: 1px solid var(--success-border); |
|
border-radius: 6px; |
|
color: var(--text-primary); |
|
} |
|
.no-microphone-warning { |
|
color: var(--error-text); |
|
font-weight: bold; |
|
} |
|
""" |
|
|
|
def check_microphone_access(): |
|
try: |
|
from navigator import mediaDevices |
|
devices = mediaDevices.enumerateDevices() |
|
for device in devices: |
|
if device.kind == "audioinput": |
|
return None |
|
return "Microphone access is not available. Please upload an audio file or check browser permissions." |
|
except Exception as e: |
|
logger.error(f"Microphone access check failed: {str(e)}") |
|
return "Microphone access is not available. Please upload an audio file or check browser permissions." |
|
|
|
with gr.Blocks(title="MindCare Health Assistant", css=custom_css) as demo: |
|
gr.Markdown("Record your voice or type a message for health assessments and suggestions.") |
|
|
|
with gr.Row(): |
|
with gr.Column(): |
|
gr.Markdown("### Voice Analysis") |
|
mic_warning = gr.Markdown() |
|
mic_warning.value = check_microphone_access() or "" |
|
gr.Markdown("Upload voice (1+ sec) describing symptoms (e.g., 'I have a cough' or 'I feel stressed'). Note: Microphone recording may not be supported in all contexts; use file upload instead.") |
|
email_input = gr.Textbox(label="Enter Your Email", placeholder="e.g., user@example.com", value="") |
|
language_input = gr.Dropdown(choices=list(SUPPORTED_LANGUAGES.keys()), label="Select Language", value="English") |
|
consent_input = gr.Checkbox(label="I consent to data storage and voice analysis", value=True, interactive=False) |
|
audio_input = gr.Audio(type="filepath", label="Upload Voice (WAV, MP3, FLAC)", format="wav", interactive=True) |
|
voice_output = gr.Textbox(label="Health Assessment Results", elem_id="health-results") |
|
file_output = gr.File(label="Download Assessment Report (PDF)", file_types=[".pdf"]) |
|
submit_btn = gr.Button("Submit") |
|
clear_btn = gr.Button("Clear") |
|
|
|
with gr.Column(): |
|
gr.Markdown("### Health Suggestions") |
|
gr.Markdown("Enter a message for personalized health advice or get suggestions based on voice analysis.") |
|
text_input = gr.Textbox(label="Enter your message (optional)") |
|
text_output = gr.Textbox(label="Response") |
|
audio_output = gr.Audio(label="Response Audio") |
|
suggest_submit_btn = gr.Button("Submit") |
|
suggest_clear_btn = gr.Button("Clear") |
|
|
|
submit_btn.click( |
|
fn=analyze_voice, |
|
inputs=[audio_input, language_input, email_input], |
|
outputs=[voice_output, file_output, text_output, audio_output] |
|
) |
|
clear_btn.click( |
|
fn=lambda: (gr.update(value=None), gr.update(value="English"), gr.update(value=""), gr.update(value=""), gr.update(value=None), gr.update(value=""), gr.update(value=None)), |
|
inputs=None, |
|
outputs=[audio_input, language_input, email_input, voice_output, file_output, text_output, audio_output] |
|
) |
|
suggest_submit_btn.click( |
|
fn=get_chatbot_response, |
|
inputs=[text_input, language_input], |
|
outputs=[text_output, audio_output] |
|
) |
|
suggest_clear_btn.click( |
|
fn=lambda: (gr.update(value=""), gr.update(value=""), gr.update(value=None)), |
|
inputs=None, |
|
outputs=[text_input, text_output, audio_output] |
|
) |
|
demo.launch(server_name="0.0.0.0", server_port=7860) |
|
|
|
if __name__ == "__main__": |
|
launch() |