import gradio as gr
import librosa
import numpy as np
import torch
from transformers import WhisperProcessor, WhisperForConditionalGeneration
from simple_salesforce import Salesforce
import os
from datetime import datetime
import logging
import webrtcvad

# Set up logging for usage metrics and debugging
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)

usage_metrics = {"total_assessments": 0}  # Simple in-memory metric (to be expanded with Salesforce)
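# Note: usage_metrics lives in process memory only, so it resets on every
# restart and is not shared across workers; hence the plan to move it into
# Salesforce.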
# Salesforce credentials (kept out of source via environment variables)
SF_USERNAME = os.getenv("SF_USERNAME")
SF_PASSWORD = os.getenv("SF_PASSWORD")
SF_SECURITY_TOKEN = os.getenv("SF_SECURITY_TOKEN")
SF_INSTANCE_URL = os.getenv("SF_INSTANCE_URL", "https://login.salesforce.com")

# Initialize Salesforce; the app degrades gracefully if credentials are absent
sf = None
try:
    if all([SF_USERNAME, SF_PASSWORD, SF_SECURITY_TOKEN]):
        sf = Salesforce(
            username=SF_USERNAME,
            password=SF_PASSWORD,
            security_token=SF_SECURITY_TOKEN,
            instance_url=SF_INSTANCE_URL,
        )
        logger.info("Connected to Salesforce for user management")
    else:
        logger.warning("Salesforce credentials missing; user management disabled")
except Exception as e:
    logger.error(f"Salesforce connection failed: {str(e)}")
# Load Whisper model for speech-to-text
whisper_processor = WhisperProcessor.from_pretrained("openai/whisper-tiny")
whisper_model = WhisperForConditionalGeneration.from_pretrained("openai/whisper-tiny")
whisper_model.config.forced_decoder_ids = whisper_processor.get_decoder_prompt_ids(language="english", task="transcribe")
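# Pinning language="english" and task="transcribe" stops whisper-tiny from
# auto-detecting the language or translating. Newer transformers releases
# prefer passing language/task directly to generate() instead of setting
# forced_decoder_ids on the config.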
# Initialize VAD
vad = webrtcvad.Vad(mode=2)  # Moderate mode for balanced voice detection
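# webrtcvad only accepts 16-bit mono PCM at 8, 16, 32, or 48 kHz, split into
# 10/20/30 ms frames; the 30 ms framing and int16 conversion below satisfy this.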
def extract_health_features(audio, sr):
    """Extract health-related audio features from a mono 16 kHz signal."""
    try:
        # Normalize audio to [-1, 1]
        peak = np.max(np.abs(audio))
        audio = audio / peak if peak != 0 else audio

        # Voice Activity Detection: keep only frames webrtcvad flags as speech
        frame_duration = 30  # ms
        frame_samples = int(sr * frame_duration / 1000)
        frames = [audio[i:i + frame_samples] for i in range(0, len(audio), frame_samples)]
        voiced_frames = [
            frame for frame in frames
            if len(frame) == frame_samples
            and vad.is_speech(
                # Clip before casting so values at the peak don't wrap in int16
                np.clip(frame * 32767, -32768, 32767).astype(np.int16).tobytes(), sr
            )
        ]
        if not voiced_frames:
            raise ValueError("No voiced segments detected")
        voiced_audio = np.concatenate(voiced_frames)

        # Pitch (F0) restricted to a typical adult range (75-300 Hz)
        pitches, magnitudes = librosa.piptrack(y=voiced_audio, sr=sr, fmin=75, fmax=300)
        valid_pitches = [p for p in pitches[magnitudes > 0] if 75 <= p <= 300]
        pitch = np.mean(valid_pitches) if valid_pitches else 0

        # Jitter approximated as the coefficient of variation of F0 (a ratio here)
        jitter = np.std(valid_pitches) / pitch if pitch and valid_pitches else 0
        if jitter > 0.10:  # Cap extreme jitter (likely noise) at a 10% ratio
            jitter = 0.10
            logger.warning("Jitter capped at 10% due to possible noise or distortion")

        # Shimmer approximated as the coefficient of variation of frame RMS amplitude
        amplitudes = librosa.feature.rms(y=voiced_audio, frame_length=2048, hop_length=512)[0]
        shimmer = np.std(amplitudes) / np.mean(amplitudes) if np.mean(amplitudes) else 0
        if shimmer > 0.10:  # Cap extreme shimmer (likely noise) at a 10% ratio
            shimmer = 0.10
            logger.warning("Shimmer capped at 10% due to possible noise or distortion")

        # Energy: mean RMS over the voiced audio (reuses the RMS frames above)
        energy = np.mean(amplitudes)

        return {
            "pitch": pitch,
            "jitter": jitter * 100,    # Convert ratio to percentage
            "shimmer": shimmer * 100,  # Convert ratio to percentage
            "energy": energy,
        }
    except Exception as e:
        logger.error(f"Feature extraction failed: {str(e)}")
        raise
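# The returned dict uses pitch in Hz, jitter/shimmer as percentages, and energy
# as mean RMS on the normalized [-1, 1] scale. These are rough proxies: clinical
# jitter/shimmer are measured cycle-to-cycle (as in Praat), not via the
# coefficient-of-variation shortcut used here.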
def transcribe_audio(audio):
    """Transcribe audio to text using Whisper."""
    try:
        inputs = whisper_processor(audio, sampling_rate=16000, return_tensors="pt")
        with torch.no_grad():
            generated_ids = whisper_model.generate(inputs["input_features"])
        transcription = whisper_processor.batch_decode(generated_ids, skip_special_tokens=True)[0]
        logger.info(f"Transcription: {transcription}")
        return transcription
    except Exception as e:
        logger.error(f"Transcription failed: {str(e)}")
        return ""
def analyze_symptoms(text):
    """Mock symptom-to-disease analysis (placeholder for symptom-2-disease-net)."""
    text = text.lower()
    feedback = []
    if "cough" in text or "difficulty breathing" in text:
        feedback.append("Based on your input, you may have a respiratory issue, such as bronchitis or asthma. Please consult a doctor.")
    elif "stress" in text or "tired" in text or "fatigue" in text:  # "stress" also matches "stressed"
        feedback.append("Your description suggests possible stress or fatigue, potentially linked to anxiety or exhaustion. Consider seeking medical advice.")
    else:
        feedback.append("Your input didn't clearly indicate specific symptoms. Please describe any health concerns (e.g., cough, stress) and consult a healthcare provider for a thorough check.")
    return "\n".join(feedback)
def analyze_voice(audio_file=None):
    """Analyze voice for health indicators."""
    usage_metrics["total_assessments"] += 1
    logger.info(f"Total assessments: {usage_metrics['total_assessments']}")
    try:
        # Load audio from file if provided
        if audio_file and os.path.exists(audio_file):
            audio, sr = librosa.load(audio_file, sr=16000)
        else:
            raise ValueError("No valid audio file provided for analysis")
        if len(audio) < sr:
            raise ValueError("Audio too short (minimum 1 second)")

        # Extract voice features
        features = extract_health_features(audio, sr)

        # Transcribe audio for symptom analysis
        transcription = transcribe_audio(audio)
        symptom_feedback = analyze_symptoms(transcription) if transcription else "No transcription available. Please record again with clear speech."

        # Rule-based analysis of voice features with personalized feedback
        feedback = []
        respiratory_score = features["jitter"]
        mental_health_score = features["shimmer"]
        if respiratory_score > 1.0:
            feedback.append(f"Your voice indicates elevated jitter ({respiratory_score:.2f}%), which may suggest respiratory issues. Consult a doctor.")
        if mental_health_score > 5.0:
            feedback.append(f"Your voice shows elevated shimmer ({mental_health_score:.2f}%), possibly indicating stress or emotional strain. Consider a health check.")
        if features["energy"] < 0.01:
            feedback.append(f"Your vocal energy is low ({features['energy']:.4f}), which might point to fatigue. Seek medical advice if this persists.")
        if not feedback and not symptom_feedback.startswith("No transcription"):
            feedback.append("Your voice analysis shows no immediate health concerns based on current data.")

        # Combine voice and symptom feedback
        feedback.append("\n**Symptom Feedback (Based on Your Input)**:")
        feedback.append(symptom_feedback)
        feedback.append("\n**Voice Analysis Details**:")
        feedback.append(f"Pitch: {features['pitch']:.2f} Hz (average fundamental frequency)")
        feedback.append(f"Jitter: {respiratory_score:.2f}% (pitch variation; higher values may indicate respiratory issues)")
        feedback.append(f"Shimmer: {mental_health_score:.2f}% (amplitude variation; higher values may indicate stress)")
        feedback.append(f"Energy: {features['energy']:.4f} (vocal intensity; lower values may indicate fatigue)")
        feedback.append(f"Transcription: {transcription if transcription else 'None'}")
        feedback.append("\n**Disclaimer**: This is a preliminary analysis, not a medical diagnosis. Always consult a healthcare provider for professional evaluation.")
        feedback_str = "\n".join(feedback)

        # Store in Salesforce (assumes user consent has been obtained upstream)
        if sf:
            store_in_salesforce(audio_file, feedback_str, respiratory_score, mental_health_score, features, transcription)

        # Delete the raw recording so no audio is retained (HIPAA/GDPR-minded cleanup)
        if audio_file and os.path.exists(audio_file):
            try:
                os.remove(audio_file)
                logger.info(f"Deleted audio file: {audio_file} for compliance")
            except Exception as e:
                logger.error(f"Failed to delete audio file: {str(e)}")
        return feedback_str
    except Exception as e:
        logger.error(f"Audio processing failed: {str(e)}")
        return f"Error: {str(e)}"
def store_in_salesforce(audio_file, feedback, respiratory_score, mental_health_score, features, transcription):
    """Store assessment results in a custom Salesforce object (encryption at rest is assumed to be handled org-side, e.g. via Salesforce Shield)."""
    try:
        sf.HealthAssessment__c.create({
            "AssessmentDate__c": datetime.utcnow().isoformat(),
            "Feedback__c": feedback,
            "RespiratoryScore__c": float(respiratory_score),
            "MentalHealthScore__c": float(mental_health_score),
            "AudioFileName__c": os.path.basename(audio_file) if audio_file else "user_recorded_audio",
            "Pitch__c": float(features["pitch"]),
            "Jitter__c": float(features["jitter"]),
            "Shimmer__c": float(features["shimmer"]),
            "Energy__c": float(features["energy"]),
            "Transcription__c": transcription
        })
        logger.info("Stored assessment in Salesforce")
    except Exception as e:
        logger.error(f"Salesforce storage failed: {str(e)}")
# Gradio interface with accessibility focus
iface = gr.Interface(
    fn=analyze_voice,
    inputs=gr.Audio(type="filepath", label="Record or Upload Your Voice (WAV, MP3, FLAC, 1+ sec)", format="wav"),
    outputs=gr.Textbox(label="Health Assessment Results", elem_id="health-results"),
    title="Smart Voicebot for Public Health",
    description="Record or upload your voice (minimum 1 second) to receive a preliminary health check. Speak clearly in English about your symptoms (e.g., 'I have a cough' or 'I feel stressed'). This tool is accessible via web and mobile.",
    theme="default",  # Basic theme; enhance for screen readers later
    allow_flagging="never"  # Prevent data retention without consent
)

if __name__ == "__main__":
    logger.info("Starting Voice Health Analyzer")  # Timestamps come from the logging format
    iface.launch(server_name="0.0.0.0", server_port=7860)