geethareddy's picture
Update app.py
b7ce4b0 verified
import gradio as gr
import librosa
import numpy as np
import os
import hashlib
from datetime import datetime
from transformers import pipeline
import soundfile
import torch
from tenacity import retry, stop_after_attempt, wait_fixed
import logging
import tempfile
import shutil
from simple_salesforce import Salesforce
from dotenv import load_dotenv
import pyttsx3
from cryptography.fernet import Fernet
import asyncio
import base64
# Set up logging
logging.basicConfig(
level=logging.DEBUG,
format="%(asctime)s - %(levelname)s - %(message)s",
handlers=[logging.FileHandler("voice_analyzer.log"), logging.StreamHandler()]
)
logger = logging.getLogger(__name__)
# Load environment variables
load_dotenv()
# Salesforce configuration
SF_USERNAME = os.getenv("SF_USERNAME")
SF_PASSWORD = os.getenv("SF_PASSWORD")
SF_SECURITY_TOKEN = os.getenv("SF_SECURITY_TOKEN")
SF_ENABLED = all([SF_USERNAME, SF_PASSWORD, SF_SECURITY_TOKEN])
sf = None
if SF_ENABLED:
try:
sf = Salesforce(
username=SF_USERNAME,
password=SF_PASSWORD,
security_token=SF_SECURITY_TOKEN
)
logger.info("Salesforce connection established")
except Exception as e:
logger.error(f"Salesforce connection failed: {str(e)}")
SF_ENABLED = False
# Encryption setup (AES-256)
ENCRYPTION_KEY = os.getenv("ENCRYPTION_KEY") or Fernet.generate_key()
fernet = Fernet(ENCRYPTION_KEY)
# Initialize text-to-speech with fallback
tts_engine = None
try:
tts_engine = pyttsx3.init()
tts_engine.setProperty("rate", 150)
logger.info("pyttsx3 initialized successfully")
except Exception as e:
logger.warning(f"Failed to initialize pyttsx3: {str(e)}. Text-to-speech disabled.")
# Initialize local models
@retry(stop=stop_after_attempt(3), wait=wait_fixed(2))
def load_whisper_model():
try:
model = pipeline(
"automatic-speech-recognition",
model="openai/whisper-large-v3",
device=-1, # CPU; use device=0 for GPU
model_kwargs={"use_safetensors": True}
)
logger.info("Whisper-large-v3 model loaded successfully")
return model
except Exception as e:
logger.error(f"Failed to load Whisper model: {str(e)}")
raise
@retry(stop=stop_after_attempt(3), wait=wait_fixed(2))
def load_symptom_model():
try:
model = pipeline(
"text-classification",
model="abhirajeshbhai/symptom-2-disease-net",
device=-1,
model_kwargs={"use_safetensors": True},
return_all_scores=False
)
logger.info("Symptom-2-Disease model loaded successfully")
return model
except Exception as e:
logger.error(f"Failed to load Symptom-2-Disease model: {str(e)}")
try:
model = pipeline(
"text-classification",
model="distilbert-base-uncased",
device=-1,
return_all_scores=False
)
logger.warning("Fallback to distilbert-base-uncased model")
return model
except Exception as fallback_e:
logger.error(f"Fallback model failed: {str(fallback_e)}")
raise
whisper = None
symptom_classifier = None
is_fallback_model = False
try:
whisper = load_whisper_model()
except Exception as e:
logger.error(f"Whisper model initialization failed: {str(e)}")
try:
symptom_classifier = load_symptom_model()
except Exception as e:
logger.error(f"Symptom model initialization failed: {str(e)}")
symptom_classifier = None
is_fallback_model = True
def encrypt_data(data):
"""Encrypt data using AES-256."""
try:
if isinstance(data, str):
data = data.encode()
return fernet.encrypt(data).decode()
except Exception as e:
logger.error(f"Encryption failed: {str(e)}")
return None
def decrypt_data(data):
"""Decrypt AES-256 encrypted data."""
try:
return fernet.decrypt(data.encode()).decode()
except Exception as e:
logger.error(f"Decryption failed: {str(e)}")
return None
def compute_file_hash(file_path):
"""Compute MD5 hash of encrypted file."""
try:
hash_md5 = hashlib.md5()
with open(file_path, "rb") as f:
for chunk in iter(lambda: f.read(4096), b""):
hash_md5.update(chunk)
return hash_md5.hexdigest()
except Exception as e:
logger.error(f"Failed to compute file hash: {str(e)}")
return "unknown"
def ensure_writable_dir(directory):
"""Ensure directory exists and is writable."""
try:
os.makedirs(directory, exist_ok=True)
test_file = os.path.join(directory, "test")
with open(test_file, "w") as f:
f.write("test")
os.remove(test_file)
logger.debug(f"Directory {directory} is writable")
return True
except Exception as e:
logger.error(f"Directory {directory} not writable: {str(e)}")
return False
async def transcribe_audio(audio_file, language="en"):
"""Transcribe audio using Whisper model."""
if not whisper:
logger.error("Whisper model not loaded")
return "Error: Whisper model not loaded"
try:
logger.debug(f"Transcribing audio: {audio_file} (language: {language})")
if not isinstance(audio_file, (str, bytes, os.PathLike)) or not os.path.exists(audio_file):
logger.error(f"Invalid or missing audio file: {audio_file}")
return "Error: Invalid or missing audio file"
audio, sr = librosa.load(audio_file, sr=16000)
if len(audio) < 1600:
logger.error("Audio too short")
return "Error: Audio too short (<0.1s)"
if np.max(np.abs(audio)) < 1e-4:
logger.error("Audio too quiet")
return "Error: Audio too quiet"
with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as temp_wav:
temp_path = temp_wav.name
soundfile.write(audio, sr, temp_path)
logger.debug(f"Saved temp WAV: {temp_path}")
with torch.no_grad():
result = whisper(temp_path, language=language, generate_kwargs={"num_beams": 5})
transcription = result.get("text", "").strip()
logger.info(f"Transcription: {transcription}")
try:
os.remove(temp_path)
logger.debug(f"Deleted temp WAV: {temp_path}")
except Exception as e:
logger.error(f"Failed to delete temp WAV: {str(e)}")
if not transcription:
logger.error("Transcription empty")
return "Error: Transcription empty"
words = transcription.split()
if len(words) > 5 and len(set(words)) < len(words) / 2:
logger.error("Transcription repetitive")
return "Error: Transcription repetitive"
return transcription
except Exception as e:
logger.error(f"Transcription failed: {str(e)}")
return f"Error: {str(e)}"
def analyze_symptoms(text):
"""Analyze symptoms using Symptom-2-Disease model."""
if not symptom_classifier:
logger.error("Symptom model not loaded")
return "Error: Symptom model not loaded", 0.0
try:
if not text or not isinstance(text, str) or "Error" in text:
logger.error(f"Invalid text input: {text}")
return "Error: No valid transcription", 0.0
with torch.no_grad():
result = symptom_classifier(text)
logger.debug(f"Raw model output: {result}")
# Exhaustive output validation
prediction = "No health condition detected"
score = 0.0
# Handle all possible output types
if result is None:
logger.warning("Model output is None")
elif isinstance(result, (str, int, float, bool)):
logger.warning(f"Invalid model output type: {type(result)}, value: {result}")
elif isinstance(result, (tuple, list)):
# Flatten nested tuples/lists
flattened = []
def flatten(item, depth=0, max_depth=10):
if depth > max_depth:
logger.warning(f"Max recursion depth exceeded: {item}")
return
if isinstance(item, (tuple, list)):
for subitem in item:
flatten(subitem, depth + 1, max_depth)
elif isinstance(item, dict):
flattened.append(item)
else:
logger.warning(f"Skipping non-dict item: {item}")
flatten(result)
if not flattened:
logger.warning("Flattened model output is empty")
elif not all(isinstance(item, dict) for item in flattened):
logger.warning(f"Non-dictionary items in flattened result: {flattened}")
elif not all("label" in item and "score" in item for item in flattened):
logger.warning(f"Missing label or score in flattened result: {flattened}")
else:
# Sort by score (descending) and take the highest
valid_items = [
item for item in flattened
if isinstance(item, dict) and "label" in item and "score" in item
and isinstance(item["label"], str)
and isinstance(item["score"], (int, float)) and 0 <= item["score"] <= 1
]
if valid_items:
sorted_items = sorted(valid_items, key=lambda x: x["score"], reverse=True)
prediction = sorted_items[0]["label"]
score = sorted_items[0]["score"]
elif isinstance(result, dict):
logger.debug("Model returned single dictionary")
if "label" in result and "score" in result:
prediction = result["label"]
score = result["score"]
else:
logger.warning(f"Missing label or score in dictionary: {result}")
# Validate prediction and score
if not isinstance(prediction, str):
logger.warning(f"Invalid label type: {type(prediction)}, value: {prediction}")
prediction = "No health condition detected"
if not isinstance(score, (int, float)) or score < 0 or score > 1:
logger.warning(f"Invalid score: {score}")
score = 0.0
if is_fallback_model:
logger.warning("Using fallback DistilBERT model")
prediction = f"{prediction} (distilbert)"
logger.info(f"Prediction: {prediction}, Score: {score:.4f}")
return prediction, score
except Exception as e:
logger.error(f"Symptom analysis failed: {str(e)}")
return "Error: Symptom analysis failed", 0.0
def save_to_salesforce(user_id, transcription, prediction, score, feedback, consent_granted):
"""Save analysis results to Salesforce."""
if not SF_ENABLED or not sf:
logger.debug("Salesforce integration disabled or not connected")
return
try:
if consent_granted:
encrypted_transcription = encrypt_data(transcription)
encrypted_feedback = encrypt_data(feedback)
sf.Health_Analysis__c.create({
"User_ID__c": user_id,
"Transcription__c": encrypted_transcription[:255],
"Prediction__c": prediction[:255],
"Confidence_Score__c": float(score),
"Feedback__c": encrypted_feedback[:255],
"Analysis_Date__c": datetime.utcnow().strftime("%Y-%m-%d")
})
logger.info("Saved analysis to Salesforce")
except Exception as e:
logger.error(f"Failed to save to Salesforce: {str(e)}")
def generate_report():
"""Generate usage report via Salesforce."""
if not SF_ENABLED or not sf:
return "Error: Salesforce not connected"
try:
query = "SELECT COUNT(Id), Prediction__c FROM Health_Analysis__c GROUP BY Prediction__c"
result = sf.query(query)
report = "Health Analysis Report\n"
for record in result["records"]:
count = record["expr0"]
prediction = record["Prediction__c"]
report += f"Condition: {prediction}, Count: {count}\n"
logger.info("Generated usage report")
return report
except Exception as e:
logger.error(f"Failed to generate report: {str(e)}")
return f"Error: {str(e)}"
async def speak_response(text):
"""Convert text to speech."""
if not tts_engine:
logger.warning("Text-to-speech unavailable; skipping")
return
try:
def sync_speak():
tts_engine.say(text)
tts_engine.runAndWait()
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, sync_speak)
logger.debug("Spoke response")
except Exception as e:
logger.error(f"Text-to-speech failed: {str(e)}")
async def analyze_voice(audio_file, language="en", user_id="anonymous", consent_granted=True):
"""Analyze voice for health indicators."""
try:
logger.debug(f"Starting analysis for audio_file: {audio_file}, language: {language}")
if audio_file is None or not isinstance(audio_file, (str, bytes, os.PathLike)):
logger.error(f"Invalid audio file input: {audio_file}")
return "Error: No audio file provided"
temp_dir = os.path.join(tempfile.gettempdir(), "gradio")
if not ensure_writable_dir(temp_dir):
fallback_dir = os.path.join(os.getcwd(), "temp_gradio")
if not ensure_writable_dir(fallback_dir):
logger.error(f"Temp directories {temp_dir} and {fallback_dir} not writable")
return "Error: Temp directories not writable"
temp_dir = fallback_dir
if not os.path.exists(audio_file):
logger.error(f"Audio file not found: {audio_file}")
return "Error: Audio file not found"
unique_path = os.path.join(
temp_dir,
f"audio_{datetime.utcnow().strftime('%Y%m%d%H%M%S%f')}_{os.path.basename(audio_file or 'unknown.wav')}"
)
try:
shutil.copy(audio_file, unique_path)
audio_file = unique_path
logger.debug(f"Copied to: {audio_file}")
except Exception as e:
logger.error(f"Failed to copy audio file: {str(e)}")
return f"Error: Failed to copy audio file: {str(e)}"
file_hash = compute_file_hash(audio_file)
logger.info(f"Processing audio, Hash: {file_hash}")
audio, sr = librosa.load(audio_file, sr=16000)
logger.info(f"Audio loaded: shape={audio.shape}, SR={sr}, Duration={len(audio)/sr:.2f}s")
transcription = await transcribe_audio(audio_file, language)
if "Error" in transcription:
logger.error(f"Transcription error: {transcription}")
return transcription
if any(keyword in transcription.lower() for keyword in ["medicine", "treatment"]):
logger.warning("Medication query detected")
feedback = "Error: This tool does not provide medication advice"
await speak_response(feedback)
return feedback
prediction, score = analyze_symptoms(transcription)
if "Error" in prediction:
logger.error(f"Symptom analysis error: {prediction}")
return prediction
feedback = (
"No health condition detected, consult a doctor if symptoms persist. This is not a medical diagnosis."
if prediction == "No health condition detected"
else f"Possible {prediction.lower()} detected based on symptoms like '{transcription.lower()}', consult a doctor. This is not a medical diagnosis."
)
logger.info(f"Feedback: {feedback}, Transcription: {transcription}, Prediction: {prediction}, Score: {score:.4f}")
# Save to Salesforce
save_to_salesforce(user_id, transcription, prediction, score, feedback, consent_granted)
try:
os.remove(audio_file)
logger.debug(f"Deleted audio file: {audio_file}")
except Exception as e:
logger.error(f"Failed to delete audio file: {str(e)}")
# Speak response
await speak_response(feedback)
return feedback
except Exception as e:
logger.error(f"Voice analysis failed: {str(e)}")
return f"Error: {str(e)}"
async def test_with_sample_audio(language="en", user_id="anonymous", consent_granted=True):
"""Test with synthetic audio."""
temp_dir = os.path.join(tempfile.gettempdir(), "audio_samples")
if not ensure_writable_dir(temp_dir):
fallback_dir = os.path.join(os.getcwd(), "temp_audio_samples")
if not ensure_writable_dir(fallback_dir):
logger.error(f"Temp directories {temp_dir} and {fallback_dir} not writable")
return f"Error: Temp directories not writable"
temp_dir = fallback_dir
sample_audio_path = os.path.join(temp_dir, "dummy_test.wav")
logger.info(f"Generating synthetic audio at: {sample_audio_path}")
sr = 16000
t = np.linspace(0, 2, 2 * sr)
freq_mod = 440 + 10 * np.sin(2 * np.pi * 0.5 * t)
amplitude_mod = 0.5 + 0.1 * np.sin(2 * np.pi * 0.3 * t)
noise = 0.01 * np.random.normal(0, 1, len(t))
dummy_audio = amplitude_mod * np.sin(2 * np.pi * freq_mod * t) + noise
try:
soundfile.write(dummy_audio, sr, sample_audio_path)
logger.info(f"Generated synthetic audio: {sample_audio_path}")
except Exception as e:
logger.error(f"Failed to write synthetic audio: {str(e)}")
return f"Error: Failed to generate synthetic audio: {str(e)}"
if not os.path.exists(sample_audio_path):
logger.error(f"Synthetic audio not created: {sample_audio_path}")
return f"Error: Synthetic audio not created: {sample_audio_path}"
mock_transcription = "I have a cough and sore throat"
logger.info(f"Mock transcription: {mock_transcription}")
prediction, score = analyze_symptoms(mock_transcription)
feedback = (
"No health condition detected, consult a doctor if symptoms persist. This is not a medical diagnosis."
if prediction == "No health condition detected"
else f"Possible {prediction.lower()} detected based on symptoms like '{mock_transcription.lower()}', consult a doctor. This is not a medical diagnosis."
)
logger.info(f"Test feedback: {feedback}, Prediction: {prediction}, Score: {score:.4f}")
# Save to Salesforce
save_to_salesforce(user_id, mock_transcription, prediction, score, feedback, consent_granted)
try:
os.remove(sample_audio_path)
logger.debug(f"Deleted test audio: {sample_audio_path}")
except Exception:
pass
return feedback
async def voicebot_interface(audio_file, language="en", user_id="anonymous", consent_granted=True):
"""Gradio interface wrapper."""
return await analyze_voice(audio_file, language, user_id, consent_granted)
# Gradio interface
iface = gr.Interface(
fn=voicebot_interface,
inputs=[
gr.Audio(type="filepath", label="Record or Upload Voice (WAV, MP3, FLAC, 1+ sec)"),
gr.Dropdown(["en", "es", "hi", "zh"], label="Language", value="en"),
gr.Textbox(label="User ID (optional)", value="anonymous"),
gr.Checkbox(label="Consent to store data", value=True)
],
outputs=gr.Textbox(label="Health Assessment Feedback"),
title="Smart Voicebot for Public Health",
description="Record or upload a voice sample describing symptoms (e.g., 'I have a cough') for preliminary health assessment. Supports English, Spanish, Hindi, Mandarin. Not a diagnostic tool. Data is encrypted and stored with consent. Complies with HIPAA/GDPR."
)
if __name__ == "__main__":
logger.info("Starting Voice Health Analyzer")
# Test with synthetic audio
loop = asyncio.get_event_loop()
print(loop.run_until_complete(test_with_sample_audio()))
iface.launch(server_name="0.0.0.0", server_port=7860)