|
import gradio as gr |
|
import librosa |
|
import numpy as np |
|
import os |
|
import hashlib |
|
from datetime import datetime |
|
from transformers import pipeline |
|
import soundfile |
|
import torch |
|
from tenacity import retry, stop_after_attempt, wait_fixed |
|
import logging |
|
import tempfile |
|
import shutil |
|
from simple_salesforce import Salesforce |
|
from dotenv import load_dotenv |
|
import pyttsx3 |
|
from cryptography.fernet import Fernet |
|
import asyncio |
|
import base64 |
|
|
|
|
|
# Configure logging once at import time: DEBUG and above is written both to
# the "voice_analyzer.log" file and to the console stream.
logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=[logging.FileHandler("voice_analyzer.log"), logging.StreamHandler()]
)

# Module-level logger shared by the functions in this file.
logger = logging.getLogger(__name__)
|
|
|
|
|
# Load variables from a .env file (when present) into the process environment.
load_dotenv()

# Salesforce credentials come from the environment; the integration is only
# enabled when all three values are non-empty.
SF_USERNAME = os.getenv("SF_USERNAME")
SF_PASSWORD = os.getenv("SF_PASSWORD")
SF_SECURITY_TOKEN = os.getenv("SF_SECURITY_TOKEN")
SF_ENABLED = bool(SF_USERNAME and SF_PASSWORD and SF_SECURITY_TOKEN)

# Shared Salesforce client; stays None when disabled or when login fails.
sf = None
if SF_ENABLED:
    try:
        sf = Salesforce(
            username=SF_USERNAME,
            password=SF_PASSWORD,
            security_token=SF_SECURITY_TOKEN,
        )
        logger.info("Salesforce connection established")
    except Exception as e:
        # A failed login must not stop the app; just switch the feature off.
        logger.error(f"Salesforce connection failed: {str(e)}")
        SF_ENABLED = False
|
|
|
|
|
# Fernet key used by encrypt_data()/decrypt_data(). It is read from the
# ENCRYPTION_KEY environment variable; when that is missing a fresh key is
# generated for this process only.
ENCRYPTION_KEY = os.getenv("ENCRYPTION_KEY")
if not ENCRYPTION_KEY:
    ENCRYPTION_KEY = Fernet.generate_key()
    # An ephemeral key is lost on restart, so anything encrypted with it (for
    # example the values stored in Salesforce) can never be decrypted again.
    # Make that visible instead of failing silently later.
    logger.warning(
        "ENCRYPTION_KEY is not set; generated an ephemeral key. "
        "Data encrypted in this session cannot be decrypted after a restart."
    )

fernet = Fernet(ENCRYPTION_KEY)
|
|
|
|
|
# Optional text-to-speech engine. It stays None when pyttsx3.init() fails,
# and speak_response() skips speaking in that case.
tts_engine = None

try:
    tts_engine = pyttsx3.init()
    # Set the engine's "rate" property to 150.
    tts_engine.setProperty("rate", 150)
    logger.info("pyttsx3 initialized successfully")
except Exception as e:
    # Text-to-speech is a convenience; the app keeps running without it.
    logger.warning(f"Failed to initialize pyttsx3: {str(e)}. Text-to-speech disabled.")
|
|
|
|
|
@retry(stop=stop_after_attempt(3), wait=wait_fixed(2))
def load_whisper_model():
    """Build the Whisper speech-recognition pipeline.

    The call is attempted up to three times, two seconds apart (see the
    ``retry`` decorator). Each failure is logged and re-raised so the
    decorator can decide whether to try again.

    Returns:
        The ``transformers`` pipeline for "openai/whisper-large-v3",
        created with ``device=-1``.
    """
    try:
        asr_pipeline = pipeline(
            task="automatic-speech-recognition",
            model="openai/whisper-large-v3",
            device=-1,
            model_kwargs={"use_safetensors": True},
        )
        logger.info("Whisper-large-v3 model loaded successfully")
        return asr_pipeline
    except Exception as e:
        logger.error(f"Failed to load Whisper model: {str(e)}")
        raise
|
|
|
@retry(stop=stop_after_attempt(3), wait=wait_fixed(2))
def load_symptom_model():
    """Build the symptom text-classification pipeline, with a fallback.

    The primary model is "abhirajeshbhai/symptom-2-disease-net". If it cannot
    be loaded, "distilbert-base-uncased" is tried instead. The whole sequence
    is attempted up to three times, two seconds apart (``retry`` decorator).

    Side effects:
        Sets the module-level ``is_fallback_model`` flag so that callers can
        tell which model is actually in use. Previously the flag was never
        set when the fallback was returned from here.

    Returns:
        The loaded ``transformers`` pipeline.

    Raises:
        Exception: whatever the fallback load raised, when both loads fail.
    """
    global is_fallback_model
    try:
        model = pipeline(
            "text-classification",
            model="abhirajeshbhai/symptom-2-disease-net",
            device=-1,
            model_kwargs={"use_safetensors": True},
            return_all_scores=False
        )
        logger.info("Symptom-2-Disease model loaded successfully")
        is_fallback_model = False
        return model
    except Exception as e:
        logger.error(f"Failed to load Symptom-2-Disease model: {str(e)}")
        try:
            model = pipeline(
                "text-classification",
                model="distilbert-base-uncased",
                device=-1,
                return_all_scores=False
            )
            logger.warning("Fallback to distilbert-base-uncased model")
            # Record that predictions now come from the generic fallback.
            is_fallback_model = True
            return model
        except Exception as fallback_e:
            logger.error(f"Fallback model failed: {str(fallback_e)}")
            raise
|
|
|
# Module-level model handles. Each stays None when loading fails, and the
# functions that use them check for that before running inference.
whisper = None

symptom_classifier = None

# Read by analyze_symptoms() to tag predictions with "(distilbert)".
is_fallback_model = False

try:
    whisper = load_whisper_model()
except Exception as e:
    logger.error(f"Whisper model initialization failed: {str(e)}")

try:
    symptom_classifier = load_symptom_model()
except Exception as e:
    logger.error(f"Symptom model initialization failed: {str(e)}")
    symptom_classifier = None
    # NOTE(review): the flag is set here although no classifier is loaded at
    # all in this branch; analyze_symptoms() returns early when the
    # classifier is None, so the flag has no visible effect on this path.
    is_fallback_model = True
|
|
|
def encrypt_data(data):
    """Encrypt *data* with the module-level Fernet instance.

    Fernet is symmetric authenticated encryption (AES-128-CBC with an
    HMAC-SHA256 signature).

    Args:
        data: Text or bytes to encrypt. Text is encoded to bytes first.

    Returns:
        The Fernet token as a string, or None when encryption fails (the
        failure is logged).
    """
    try:
        payload = data.encode() if isinstance(data, str) else data
        token = fernet.encrypt(payload)
        return token.decode()
    except Exception as e:
        logger.error(f"Encryption failed: {str(e)}")
        return None
|
|
|
def decrypt_data(data):
    """Decrypt a Fernet token produced by ``encrypt_data``.

    Fernet is symmetric authenticated encryption (AES-128-CBC with an
    HMAC-SHA256 signature).

    Args:
        data: The token as text or bytes. Bytes are accepted so that the
            function mirrors ``encrypt_data``, which also takes both.

    Returns:
        The decrypted text, or None when decryption fails (invalid token,
        wrong key, non-text plaintext); the failure is logged.
    """
    try:
        token = data.encode() if isinstance(data, str) else data
        return fernet.decrypt(token).decode()
    except Exception as e:
        logger.error(f"Decryption failed: {str(e)}")
        return None
|
|
|
def compute_file_hash(file_path, algorithm="md5", chunk_size=65536):
    """Return the hex digest of a file's contents.

    The file is read in chunks so large files are never held in memory at
    once. The digest is computed over the raw bytes on disk.

    Args:
        file_path: Path of the file to hash.
        algorithm: Any name accepted by ``hashlib.new``. Defaults to "md5",
            which matches the previous behaviour; pass "sha256" when the
            digest is used for anything security relevant.
        chunk_size: Number of bytes read per iteration.

    Returns:
        The hexadecimal digest, or the string "unknown" when the file cannot
        be read or the algorithm is not available (the failure is logged).
    """
    try:
        digest = hashlib.new(algorithm)
        with open(file_path, "rb") as f:
            for chunk in iter(lambda: f.read(chunk_size), b""):
                digest.update(chunk)
        return digest.hexdigest()
    except Exception as e:
        logger.error(f"Failed to compute file hash: {str(e)}")
        return "unknown"
|
|
|
def ensure_writable_dir(directory):
    """Create *directory* if needed and check that files can be written to it.

    Args:
        directory: Path of the directory to create and probe.

    Returns:
        True when the directory exists and a probe file could be written,
        False otherwise (the failure is logged).
    """
    try:
        os.makedirs(directory, exist_ok=True)
        # Use a uniquely named probe that is removed on close. A fixed name
        # such as "test" could overwrite and delete a real file with that
        # name, and concurrent checks could delete each other's probe.
        with tempfile.NamedTemporaryFile(mode="w", dir=directory) as probe:
            probe.write("test")
        logger.debug(f"Directory {directory} is writable")
        return True
    except Exception as e:
        logger.error(f"Directory {directory} not writable: {str(e)}")
        return False
|
|
|
async def transcribe_audio(audio_file, language="en"):
    """Transcribe an audio file with the module-level Whisper pipeline.

    The audio is loaded at 16 kHz with librosa, rejected when it is shorter
    than 1600 samples or nearly silent, written to a temporary WAV file and
    passed to the pipeline.

    Args:
        audio_file: Path to the audio file.
        language: Language hint forwarded to Whisper's ``generate`` call.

    Returns:
        The transcription text, or a string starting with "Error:" that
        describes why no transcription is available. Exceptions are caught
        and reported the same way.
    """
    if not whisper:
        logger.error("Whisper model not loaded")
        return "Error: Whisper model not loaded"

    temp_path = None
    try:
        logger.debug(f"Transcribing audio: {audio_file} (language: {language})")
        if not isinstance(audio_file, (str, bytes, os.PathLike)) or not os.path.exists(audio_file):
            logger.error(f"Invalid or missing audio file: {audio_file}")
            return "Error: Invalid or missing audio file"

        audio, sr = librosa.load(audio_file, sr=16000)
        if len(audio) < 1600:
            logger.error("Audio too short")
            return "Error: Audio too short (<0.1s)"
        if np.max(np.abs(audio)) < 1e-4:
            logger.error("Audio too quiet")
            return "Error: Audio too quiet"

        # Reserve a unique file name, then close the handle before writing so
        # the path can be reopened on every platform.
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as temp_wav:
            temp_path = temp_wav.name
        # soundfile.write takes (file, data, samplerate) in that order.
        soundfile.write(temp_path, audio, sr)
        logger.debug(f"Saved temp WAV: {temp_path}")

        with torch.no_grad():
            # The language is a generation option; the pipeline call itself
            # does not accept a ``language`` keyword.
            result = whisper(
                temp_path,
                generate_kwargs={"language": language, "num_beams": 5},
            )
        transcription = result.get("text", "").strip()
        logger.info(f"Transcription: {transcription}")

        if not transcription:
            logger.error("Transcription empty")
            return "Error: Transcription empty"

        # More than half of the words being repeats is treated as a failed
        # (looping) transcription.
        words = transcription.split()
        if len(words) > 5 and len(set(words)) < len(words) / 2:
            logger.error("Transcription repetitive")
            return "Error: Transcription repetitive"
        return transcription
    except Exception as e:
        logger.error(f"Transcription failed: {str(e)}")
        return f"Error: {str(e)}"
    finally:
        # Always remove the temporary WAV, including on errors and on the
        # early returns above.
        if temp_path is not None and os.path.exists(temp_path):
            try:
                os.remove(temp_path)
                logger.debug(f"Deleted temp WAV: {temp_path}")
            except Exception as e:
                logger.error(f"Failed to delete temp WAV: {str(e)}")
|
|
|
def _collect_result_dicts(item, collected, depth=0, max_depth=10):
    """Append every dict found in a (possibly nested) list/tuple to *collected*."""
    if depth > max_depth:
        logger.warning(f"Max recursion depth exceeded: {item}")
        return
    if isinstance(item, (tuple, list)):
        for subitem in item:
            _collect_result_dicts(subitem, collected, depth + 1, max_depth)
    elif isinstance(item, dict):
        collected.append(item)
    else:
        logger.warning(f"Skipping non-dict item: {item}")


def _pick_top_prediction(result):
    """Return (label, score) of the best entry in a classifier result.

    Falls back to ("No health condition detected", 0.0) whenever the result
    has an unexpected shape or contains no usable entry.
    """
    default = ("No health condition detected", 0.0)

    if result is None:
        logger.warning("Model output is None")
        return default
    if isinstance(result, (str, int, float, bool)):
        logger.warning(f"Invalid model output type: {type(result)}, value: {result}")
        return default
    if isinstance(result, dict):
        logger.debug("Model returned single dictionary")
        if "label" in result and "score" in result:
            return result["label"], result["score"]
        logger.warning(f"Missing label or score in dictionary: {result}")
        return default
    if not isinstance(result, (tuple, list)):
        return default

    candidates = []
    _collect_result_dicts(result, candidates)
    if not candidates:
        logger.warning("Flattened model output is empty")
        return default
    if not all("label" in item and "score" in item for item in candidates):
        logger.warning(f"Missing label or score in flattened result: {candidates}")
        return default

    valid_items = [
        item for item in candidates
        if isinstance(item["label"], str)
        and isinstance(item["score"], (int, float))
        and 0 <= item["score"] <= 1
    ]
    if not valid_items:
        return default
    # max() keeps the first of equally scored entries, like a stable
    # descending sort would.
    best = max(valid_items, key=lambda item: item["score"])
    return best["label"], best["score"]


def analyze_symptoms(text):
    """Classify a symptom description with the module-level classifier.

    Args:
        text: The transcription to classify.

    Returns:
        A ``(prediction, score)`` tuple. ``prediction`` is the top label,
        "No health condition detected" when the model output is unusable,
        or a string starting with "Error:" when analysis cannot run; in the
        error case the score is 0.0. When the fallback model is in use,
        detected labels get a " (distilbert)" suffix.
    """
    if not symptom_classifier:
        logger.error("Symptom model not loaded")
        return "Error: Symptom model not loaded", 0.0
    try:
        # Only strings produced by this module's error paths ("Error: ...")
        # are rejected; a transcription that merely contains the word
        # "Error" is still valid input.
        if not text or not isinstance(text, str) or text.startswith("Error:"):
            logger.error(f"Invalid text input: {text}")
            return "Error: No valid transcription", 0.0

        with torch.no_grad():
            result = symptom_classifier(text)
        logger.debug(f"Raw model output: {result}")

        prediction, score = _pick_top_prediction(result)

        # Final sanity check on whatever the model produced.
        if not isinstance(prediction, str):
            logger.warning(f"Invalid label type: {type(prediction)}, value: {prediction}")
            prediction = "No health condition detected"
        if not isinstance(score, (int, float)) or score < 0 or score > 1:
            logger.warning(f"Invalid score: {score}")
            score = 0.0

        if is_fallback_model:
            logger.warning("Using fallback DistilBERT model")
            # Callers compare against the exact "no condition" sentinel, so
            # only real labels are tagged.
            if prediction != "No health condition detected":
                prediction = f"{prediction} (distilbert)"
        logger.info(f"Prediction: {prediction}, Score: {score:.4f}")
        return prediction, score
    except Exception as e:
        logger.error(f"Symptom analysis failed: {str(e)}")
        return "Error: Symptom analysis failed", 0.0
|
|
|
def _encrypt_for_field(text, max_len=255):
    """Encrypt *text* into a Fernet token of at most *max_len* characters.

    A Fernet token cannot be truncated after encryption: a shortened token
    fails authentication and can never be decrypted. When the full token
    would be too long, the plaintext is shortened instead and encrypted
    again, so the stored value stays decryptable.

    Returns the token, or None when encryption fails.
    """
    token = encrypt_data(text)
    if token is None or len(token) <= max_len:
        return token

    # Token layout: 57 fixed bytes (version, timestamp, IV, HMAC) plus the
    # ciphertext, which is the plaintext padded up to the next multiple of
    # 16 bytes (at least one padding byte); the whole thing is base64
    # encoded, i.e. 4 characters per 3 bytes.
    max_token_bytes = (max_len // 4) * 3
    max_plain_bytes = ((max_token_bytes - 57) // 16) * 16 - 1
    if max_plain_bytes <= 0:
        return None

    raw = text if isinstance(text, bytes) else str(text).encode("utf-8")
    # Drop a partially cut multi-byte character so the result decodes again.
    clipped = raw[:max_plain_bytes].decode("utf-8", errors="ignore")
    logger.warning(
        f"Value shortened to {max_plain_bytes} bytes before encryption to fit {max_len} characters"
    )
    return encrypt_data(clipped)


def save_to_salesforce(user_id, transcription, prediction, score, feedback, consent_granted):
    """Store one analysis result as a ``Health_Analysis__c`` record.

    Nothing is written when the Salesforce integration is disabled, when
    the user did not consent, or when encryption fails. Transcription and
    feedback are stored encrypted; every text field is limited to 255
    characters.

    Args:
        user_id: Value stored in ``User_ID__c``.
        transcription: Transcribed text (stored encrypted).
        prediction: Predicted label.
        score: Confidence score, stored as float.
        feedback: Feedback text shown to the user (stored encrypted).
        consent_granted: Whether the user agreed to data storage.

    Returns:
        None. Failures are logged, never raised.
    """
    if not SF_ENABLED or not sf:
        logger.debug("Salesforce integration disabled or not connected")
        return
    if not consent_granted:
        logger.debug("Consent not granted; nothing saved to Salesforce")
        return
    try:
        encrypted_transcription = _encrypt_for_field(transcription)
        encrypted_feedback = _encrypt_for_field(feedback)
        if encrypted_transcription is None or encrypted_feedback is None:
            # Never fall back to storing plaintext.
            logger.error("Failed to save to Salesforce: encryption failed")
            return
        sf.Health_Analysis__c.create({
            "User_ID__c": user_id,
            "Transcription__c": encrypted_transcription,
            "Prediction__c": prediction[:255],
            "Confidence_Score__c": float(score),
            "Feedback__c": encrypted_feedback,
            "Analysis_Date__c": datetime.utcnow().strftime("%Y-%m-%d")
        })
        logger.info("Saved analysis to Salesforce")
    except Exception as e:
        logger.error(f"Failed to save to Salesforce: {str(e)}")
|
|
|
def generate_report():
    """Build a plain-text count of stored analyses per predicted condition.

    Runs a grouped COUNT query against ``Health_Analysis__c`` and formats
    one line per group.

    Returns:
        The report text, or a string starting with "Error:" when Salesforce
        is not connected or the query fails.
    """
    if not (SF_ENABLED and sf):
        return "Error: Salesforce not connected"
    try:
        query = "SELECT COUNT(Id), Prediction__c FROM Health_Analysis__c GROUP BY Prediction__c"
        result = sf.query(query)
        lines = ["Health Analysis Report\n"]
        for record in result["records"]:
            # "expr0" is the key the COUNT(Id) aggregate is read from.
            count = record["expr0"]
            prediction = record["Prediction__c"]
            lines.append(f"Condition: {prediction}, Count: {count}\n")
        logger.info("Generated usage report")
        return "".join(lines)
    except Exception as e:
        logger.error(f"Failed to generate report: {str(e)}")
        return f"Error: {str(e)}"
|
|
|
async def speak_response(text):
    """Speak *text* through the module-level pyttsx3 engine.

    The blocking engine calls run in the event loop's default executor.
    When no engine is available the call is a logged no-op; failures while
    speaking are logged and swallowed.
    """
    if not tts_engine:
        logger.warning("Text-to-speech unavailable; skipping")
        return
    try:
        def _say_blocking():
            tts_engine.say(text)
            tts_engine.runAndWait()

        running_loop = asyncio.get_running_loop()
        await running_loop.run_in_executor(None, _say_blocking)
        logger.debug("Spoke response")
    except Exception as e:
        logger.error(f"Text-to-speech failed: {str(e)}")
|
|
|
async def analyze_voice(audio_file, language="en", user_id="anonymous", consent_granted=True):
    """Run the full pipeline on one audio file and return feedback text.

    Steps: copy the upload to a private temp file, transcribe it, refuse
    medication/treatment questions, classify the symptoms, store the result
    in Salesforce (subject to consent) and speak the feedback.

    Args:
        audio_file: Path to the recorded or uploaded audio.
        language: Language hint passed to the transcription step.
        user_id: Identifier stored with the result.
        consent_granted: Whether the result may be stored.

    Returns:
        The feedback sentence, or a string starting with "Error:" when any
        step fails. Exceptions are caught and reported the same way.
    """
    working_copy = None
    try:
        logger.debug(f"Starting analysis for audio_file: {audio_file}, language: {language}")
        if audio_file is None or not isinstance(audio_file, (str, bytes, os.PathLike)):
            logger.error(f"Invalid audio file input: {audio_file}")
            return "Error: No audio file provided"

        temp_dir = os.path.join(tempfile.gettempdir(), "gradio")
        if not ensure_writable_dir(temp_dir):
            fallback_dir = os.path.join(os.getcwd(), "temp_gradio")
            if not ensure_writable_dir(fallback_dir):
                logger.error(f"Temp directories {temp_dir} and {fallback_dir} not writable")
                return "Error: Temp directories not writable"
            temp_dir = fallback_dir

        if not os.path.exists(audio_file):
            logger.error(f"Audio file not found: {audio_file}")
            return "Error: Audio file not found"

        # fsdecode keeps a bytes path from showing up as "b'...'" in the name.
        source_name = os.path.basename(os.fsdecode(audio_file))
        unique_path = os.path.join(
            temp_dir,
            f"audio_{datetime.utcnow().strftime('%Y%m%d%H%M%S%f')}_{source_name}"
        )
        # Remember the copy before creating it so that even a partially
        # written file is removed in the finally block.
        working_copy = unique_path
        try:
            shutil.copy(audio_file, unique_path)
            audio_file = unique_path
            logger.debug(f"Copied to: {audio_file}")
        except Exception as e:
            logger.error(f"Failed to copy audio file: {str(e)}")
            return f"Error: Failed to copy audio file: {str(e)}"

        file_hash = compute_file_hash(audio_file)
        logger.info(f"Processing audio, Hash: {file_hash}")

        audio, sr = librosa.load(audio_file, sr=16000)
        logger.info(f"Audio loaded: shape={audio.shape}, SR={sr}, Duration={len(audio)/sr:.2f}s")

        transcription = await transcribe_audio(audio_file, language)
        # Error results always start with "Error:"; a transcription that
        # merely contains the word "Error" is valid speech.
        if transcription.startswith("Error:"):
            logger.error(f"Transcription error: {transcription}")
            return transcription

        if any(keyword in transcription.lower() for keyword in ["medicine", "treatment"]):
            logger.warning("Medication query detected")
            feedback = "Error: This tool does not provide medication advice"
            await speak_response(feedback)
            return feedback

        prediction, score = analyze_symptoms(transcription)
        if prediction.startswith("Error:"):
            logger.error(f"Symptom analysis error: {prediction}")
            return prediction

        # startswith also covers a sentinel that carries a model suffix.
        feedback = (
            "No health condition detected, consult a doctor if symptoms persist. This is not a medical diagnosis."
            if prediction.startswith("No health condition detected")
            else f"Possible {prediction.lower()} detected based on symptoms like '{transcription.lower()}', consult a doctor. This is not a medical diagnosis."
        )
        logger.info(f"Feedback: {feedback}, Transcription: {transcription}, Prediction: {prediction}, Score: {score:.4f}")

        save_to_salesforce(user_id, transcription, prediction, score, feedback, consent_granted)

        await speak_response(feedback)

        return feedback
    except Exception as e:
        logger.error(f"Voice analysis failed: {str(e)}")
        return f"Error: {str(e)}"
    finally:
        # The copied recording is removed on every exit path, including the
        # early error returns.
        if working_copy is not None and os.path.exists(working_copy):
            try:
                os.remove(working_copy)
                logger.debug(f"Deleted audio file: {working_copy}")
            except Exception as e:
                logger.error(f"Failed to delete audio file: {str(e)}")
|
|
|
async def test_with_sample_audio(language="en", user_id="anonymous", consent_granted=True):
    """Smoke-test the pipeline with synthetic audio and a mock transcription.

    A two second modulated tone is written to a temp WAV file to check that
    audio files can be produced; the symptom classifier is then run on a
    fixed sentence and the result is saved like a real analysis.

    Args:
        language: Accepted for signature parity with ``analyze_voice``; not
            used by this function.
        user_id: Identifier stored with the result.
        consent_granted: Whether the result may be stored.

    Returns:
        The feedback sentence, or a string starting with "Error:".
    """
    temp_dir = os.path.join(tempfile.gettempdir(), "audio_samples")
    if not ensure_writable_dir(temp_dir):
        fallback_dir = os.path.join(os.getcwd(), "temp_audio_samples")
        if not ensure_writable_dir(fallback_dir):
            logger.error(f"Temp directories {temp_dir} and {fallback_dir} not writable")
            return "Error: Temp directories not writable"
        temp_dir = fallback_dir

    sample_audio_path = os.path.join(temp_dir, "dummy_test.wav")
    logger.info(f"Generating synthetic audio at: {sample_audio_path}")

    # Tone around 440 Hz with slow frequency and amplitude modulation plus a
    # little noise.
    sr = 16000
    t = np.linspace(0, 2, 2 * sr)
    freq_mod = 440 + 10 * np.sin(2 * np.pi * 0.5 * t)
    amplitude_mod = 0.5 + 0.1 * np.sin(2 * np.pi * 0.3 * t)
    noise = 0.01 * np.random.normal(0, 1, len(t))
    dummy_audio = amplitude_mod * np.sin(2 * np.pi * freq_mod * t) + noise

    try:
        try:
            # soundfile.write takes (file, data, samplerate) in that order.
            soundfile.write(sample_audio_path, dummy_audio, sr)
            logger.info(f"Generated synthetic audio: {sample_audio_path}")
        except Exception as e:
            logger.error(f"Failed to write synthetic audio: {str(e)}")
            return f"Error: Failed to generate synthetic audio: {str(e)}"

        if not os.path.exists(sample_audio_path):
            logger.error(f"Synthetic audio not created: {sample_audio_path}")
            return f"Error: Synthetic audio not created: {sample_audio_path}"

        mock_transcription = "I have a cough and sore throat"
        logger.info(f"Mock transcription: {mock_transcription}")

        prediction, score = analyze_symptoms(mock_transcription)
        # Report classifier errors as they are instead of wrapping them in a
        # "Possible ... detected" sentence.
        if prediction.startswith("Error:"):
            logger.error(f"Symptom analysis error: {prediction}")
            return prediction

        # startswith also covers a sentinel that carries a model suffix.
        feedback = (
            "No health condition detected, consult a doctor if symptoms persist. This is not a medical diagnosis."
            if prediction.startswith("No health condition detected")
            else f"Possible {prediction.lower()} detected based on symptoms like '{mock_transcription.lower()}', consult a doctor. This is not a medical diagnosis."
        )
        logger.info(f"Test feedback: {feedback}, Prediction: {prediction}, Score: {score:.4f}")

        save_to_salesforce(user_id, mock_transcription, prediction, score, feedback, consent_granted)

        return feedback
    finally:
        # Best-effort cleanup on every exit path; a failure is logged rather
        # than silently ignored.
        if os.path.exists(sample_audio_path):
            try:
                os.remove(sample_audio_path)
                logger.debug(f"Deleted test audio: {sample_audio_path}")
            except Exception as e:
                logger.warning(f"Failed to delete test audio: {str(e)}")
|
|
|
async def voicebot_interface(audio_file, language="en", user_id="anonymous", consent_granted=True):
    """Entry point called by the Gradio UI; delegates to ``analyze_voice``.

    The arguments are passed through unchanged and the text returned by
    ``analyze_voice`` is returned as is.
    """
    feedback = await analyze_voice(audio_file, language, user_id, consent_granted)
    return feedback
|
|
|
|
|
# Gradio UI definition. The four inputs map, in order, onto the parameters of
# voicebot_interface(): audio file path, language code, user id and consent
# flag. The single text output shows the returned feedback string.
iface = gr.Interface(
    fn=voicebot_interface,
    inputs=[
        # type="filepath" hands the function a path on disk.
        gr.Audio(type="filepath", label="Record or Upload Voice (WAV, MP3, FLAC, 1+ sec)"),
        gr.Dropdown(["en", "es", "hi", "zh"], label="Language", value="en"),
        gr.Textbox(label="User ID (optional)", value="anonymous"),
        gr.Checkbox(label="Consent to store data", value=True)
    ],
    outputs=gr.Textbox(label="Health Assessment Feedback"),
    title="Smart Voicebot for Public Health",
    description="Record or upload a voice sample describing symptoms (e.g., 'I have a cough') for preliminary health assessment. Supports English, Spanish, Hindi, Mandarin. Not a diagnostic tool. Data is encrypted and stored with consent. Complies with HIPAA/GDPR."
)
|
|
|
if __name__ == "__main__":
    logger.info("Starting Voice Health Analyzer")

    # Run the self-test once before serving. asyncio.run() creates and closes
    # its own event loop; calling asyncio.get_event_loop() with no running
    # loop is deprecated in current Python versions.
    print(asyncio.run(test_with_sample_audio()))

    # Listen on all interfaces, port 7860.
    iface.launch(server_name="0.0.0.0", server_port=7860)