import gradio as gr import librosa import numpy as np import os import hashlib from datetime import datetime from transformers import pipeline import soundfile import torch from tenacity import retry, stop_after_attempt, wait_fixed import logging import tempfile import shutil from simple_salesforce import Salesforce from dotenv import load_dotenv import pyttsx3 from cryptography.fernet import Fernet import asyncio import base64 # Set up logging logging.basicConfig( level=logging.DEBUG, format="%(asctime)s - %(levelname)s - %(message)s", handlers=[logging.FileHandler("voice_analyzer.log"), logging.StreamHandler()] ) logger = logging.getLogger(__name__) # Load environment variables load_dotenv() # Salesforce configuration SF_USERNAME = os.getenv("SF_USERNAME") SF_PASSWORD = os.getenv("SF_PASSWORD") SF_SECURITY_TOKEN = os.getenv("SF_SECURITY_TOKEN") SF_ENABLED = all([SF_USERNAME, SF_PASSWORD, SF_SECURITY_TOKEN]) sf = None if SF_ENABLED: try: sf = Salesforce( username=SF_USERNAME, password=SF_PASSWORD, security_token=SF_SECURITY_TOKEN ) logger.info("Salesforce connection established") except Exception as e: logger.error(f"Salesforce connection failed: {str(e)}") SF_ENABLED = False # Encryption setup (AES-256) ENCRYPTION_KEY = os.getenv("ENCRYPTION_KEY") or Fernet.generate_key() fernet = Fernet(ENCRYPTION_KEY) # Initialize text-to-speech with fallback tts_engine = None try: tts_engine = pyttsx3.init() tts_engine.setProperty("rate", 150) logger.info("pyttsx3 initialized successfully") except Exception as e: logger.warning(f"Failed to initialize pyttsx3: {str(e)}. Text-to-speech disabled.") # Initialize local models @retry(stop=stop_after_attempt(3), wait=wait_fixed(2)) def load_whisper_model(): try: model = pipeline( "automatic-speech-recognition", model="openai/whisper-large-v3", device=-1, # CPU; use device=0 for GPU model_kwargs={"use_safetensors": True} ) logger.info("Whisper-large-v3 model loaded successfully") return model except Exception as e: logger.error(f"Failed to load Whisper model: {str(e)}") raise @retry(stop=stop_after_attempt(3), wait=wait_fixed(2)) def load_symptom_model(): try: model = pipeline( "text-classification", model="abhirajeshbhai/symptom-2-disease-net", device=-1, model_kwargs={"use_safetensors": True}, return_all_scores=False ) logger.info("Symptom-2-Disease model loaded successfully") return model except Exception as e: logger.error(f"Failed to load Symptom-2-Disease model: {str(e)}") try: model = pipeline( "text-classification", model="distilbert-base-uncased", device=-1, return_all_scores=False ) logger.warning("Fallback to distilbert-base-uncased model") return model except Exception as fallback_e: logger.error(f"Fallback model failed: {str(fallback_e)}") raise whisper = None symptom_classifier = None is_fallback_model = False try: whisper = load_whisper_model() except Exception as e: logger.error(f"Whisper model initialization failed: {str(e)}") try: symptom_classifier = load_symptom_model() except Exception as e: logger.error(f"Symptom model initialization failed: {str(e)}") symptom_classifier = None is_fallback_model = True def encrypt_data(data): """Encrypt data using AES-256.""" try: if isinstance(data, str): data = data.encode() return fernet.encrypt(data).decode() except Exception as e: logger.error(f"Encryption failed: {str(e)}") return None def decrypt_data(data): """Decrypt AES-256 encrypted data.""" try: return fernet.decrypt(data.encode()).decode() except Exception as e: logger.error(f"Decryption failed: {str(e)}") return None def compute_file_hash(file_path): """Compute MD5 hash of encrypted file.""" try: hash_md5 = hashlib.md5() with open(file_path, "rb") as f: for chunk in iter(lambda: f.read(4096), b""): hash_md5.update(chunk) return hash_md5.hexdigest() except Exception as e: logger.error(f"Failed to compute file hash: {str(e)}") return "unknown" def ensure_writable_dir(directory): """Ensure directory exists and is writable.""" try: os.makedirs(directory, exist_ok=True) test_file = os.path.join(directory, "test") with open(test_file, "w") as f: f.write("test") os.remove(test_file) logger.debug(f"Directory {directory} is writable") return True except Exception as e: logger.error(f"Directory {directory} not writable: {str(e)}") return False async def transcribe_audio(audio_file, language="en"): """Transcribe audio using Whisper model.""" if not whisper: logger.error("Whisper model not loaded") return "Error: Whisper model not loaded" try: logger.debug(f"Transcribing audio: {audio_file} (language: {language})") if not isinstance(audio_file, (str, bytes, os.PathLike)) or not os.path.exists(audio_file): logger.error(f"Invalid or missing audio file: {audio_file}") return "Error: Invalid or missing audio file" audio, sr = librosa.load(audio_file, sr=16000) if len(audio) < 1600: logger.error("Audio too short") return "Error: Audio too short (<0.1s)" if np.max(np.abs(audio)) < 1e-4: logger.error("Audio too quiet") return "Error: Audio too quiet" with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as temp_wav: temp_path = temp_wav.name soundfile.write(audio, sr, temp_path) logger.debug(f"Saved temp WAV: {temp_path}") with torch.no_grad(): result = whisper(temp_path, language=language, generate_kwargs={"num_beams": 5}) transcription = result.get("text", "").strip() logger.info(f"Transcription: {transcription}") try: os.remove(temp_path) logger.debug(f"Deleted temp WAV: {temp_path}") except Exception as e: logger.error(f"Failed to delete temp WAV: {str(e)}") if not transcription: logger.error("Transcription empty") return "Error: Transcription empty" words = transcription.split() if len(words) > 5 and len(set(words)) < len(words) / 2: logger.error("Transcription repetitive") return "Error: Transcription repetitive" return transcription except Exception as e: logger.error(f"Transcription failed: {str(e)}") return f"Error: {str(e)}" def analyze_symptoms(text): """Analyze symptoms using Symptom-2-Disease model.""" if not symptom_classifier: logger.error("Symptom model not loaded") return "Error: Symptom model not loaded", 0.0 try: if not text or not isinstance(text, str) or "Error" in text: logger.error(f"Invalid text input: {text}") return "Error: No valid transcription", 0.0 with torch.no_grad(): result = symptom_classifier(text) logger.debug(f"Raw model output: {result}") # Exhaustive output validation prediction = "No health condition detected" score = 0.0 # Handle all possible output types if result is None: logger.warning("Model output is None") elif isinstance(result, (str, int, float, bool)): logger.warning(f"Invalid model output type: {type(result)}, value: {result}") elif isinstance(result, (tuple, list)): # Flatten nested tuples/lists flattened = [] def flatten(item, depth=0, max_depth=10): if depth > max_depth: logger.warning(f"Max recursion depth exceeded: {item}") return if isinstance(item, (tuple, list)): for subitem in item: flatten(subitem, depth + 1, max_depth) elif isinstance(item, dict): flattened.append(item) else: logger.warning(f"Skipping non-dict item: {item}") flatten(result) if not flattened: logger.warning("Flattened model output is empty") elif not all(isinstance(item, dict) for item in flattened): logger.warning(f"Non-dictionary items in flattened result: {flattened}") elif not all("label" in item and "score" in item for item in flattened): logger.warning(f"Missing label or score in flattened result: {flattened}") else: # Sort by score (descending) and take the highest valid_items = [ item for item in flattened if isinstance(item, dict) and "label" in item and "score" in item and isinstance(item["label"], str) and isinstance(item["score"], (int, float)) and 0 <= item["score"] <= 1 ] if valid_items: sorted_items = sorted(valid_items, key=lambda x: x["score"], reverse=True) prediction = sorted_items[0]["label"] score = sorted_items[0]["score"] elif isinstance(result, dict): logger.debug("Model returned single dictionary") if "label" in result and "score" in result: prediction = result["label"] score = result["score"] else: logger.warning(f"Missing label or score in dictionary: {result}") # Validate prediction and score if not isinstance(prediction, str): logger.warning(f"Invalid label type: {type(prediction)}, value: {prediction}") prediction = "No health condition detected" if not isinstance(score, (int, float)) or score < 0 or score > 1: logger.warning(f"Invalid score: {score}") score = 0.0 if is_fallback_model: logger.warning("Using fallback DistilBERT model") prediction = f"{prediction} (distilbert)" logger.info(f"Prediction: {prediction}, Score: {score:.4f}") return prediction, score except Exception as e: logger.error(f"Symptom analysis failed: {str(e)}") return "Error: Symptom analysis failed", 0.0 def save_to_salesforce(user_id, transcription, prediction, score, feedback, consent_granted): """Save analysis results to Salesforce.""" if not SF_ENABLED or not sf: logger.debug("Salesforce integration disabled or not connected") return try: if consent_granted: encrypted_transcription = encrypt_data(transcription) encrypted_feedback = encrypt_data(feedback) sf.Health_Analysis__c.create({ "User_ID__c": user_id, "Transcription__c": encrypted_transcription[:255], "Prediction__c": prediction[:255], "Confidence_Score__c": float(score), "Feedback__c": encrypted_feedback[:255], "Analysis_Date__c": datetime.utcnow().strftime("%Y-%m-%d") }) logger.info("Saved analysis to Salesforce") except Exception as e: logger.error(f"Failed to save to Salesforce: {str(e)}") def generate_report(): """Generate usage report via Salesforce.""" if not SF_ENABLED or not sf: return "Error: Salesforce not connected" try: query = "SELECT COUNT(Id), Prediction__c FROM Health_Analysis__c GROUP BY Prediction__c" result = sf.query(query) report = "Health Analysis Report\n" for record in result["records"]: count = record["expr0"] prediction = record["Prediction__c"] report += f"Condition: {prediction}, Count: {count}\n" logger.info("Generated usage report") return report except Exception as e: logger.error(f"Failed to generate report: {str(e)}") return f"Error: {str(e)}" async def speak_response(text): """Convert text to speech.""" if not tts_engine: logger.warning("Text-to-speech unavailable; skipping") return try: def sync_speak(): tts_engine.say(text) tts_engine.runAndWait() loop = asyncio.get_event_loop() await loop.run_in_executor(None, sync_speak) logger.debug("Spoke response") except Exception as e: logger.error(f"Text-to-speech failed: {str(e)}") async def analyze_voice(audio_file, language="en", user_id="anonymous", consent_granted=True): """Analyze voice for health indicators.""" try: logger.debug(f"Starting analysis for audio_file: {audio_file}, language: {language}") if audio_file is None or not isinstance(audio_file, (str, bytes, os.PathLike)): logger.error(f"Invalid audio file input: {audio_file}") return "Error: No audio file provided" temp_dir = os.path.join(tempfile.gettempdir(), "gradio") if not ensure_writable_dir(temp_dir): fallback_dir = os.path.join(os.getcwd(), "temp_gradio") if not ensure_writable_dir(fallback_dir): logger.error(f"Temp directories {temp_dir} and {fallback_dir} not writable") return "Error: Temp directories not writable" temp_dir = fallback_dir if not os.path.exists(audio_file): logger.error(f"Audio file not found: {audio_file}") return "Error: Audio file not found" unique_path = os.path.join( temp_dir, f"audio_{datetime.utcnow().strftime('%Y%m%d%H%M%S%f')}_{os.path.basename(audio_file or 'unknown.wav')}" ) try: shutil.copy(audio_file, unique_path) audio_file = unique_path logger.debug(f"Copied to: {audio_file}") except Exception as e: logger.error(f"Failed to copy audio file: {str(e)}") return f"Error: Failed to copy audio file: {str(e)}" file_hash = compute_file_hash(audio_file) logger.info(f"Processing audio, Hash: {file_hash}") audio, sr = librosa.load(audio_file, sr=16000) logger.info(f"Audio loaded: shape={audio.shape}, SR={sr}, Duration={len(audio)/sr:.2f}s") transcription = await transcribe_audio(audio_file, language) if "Error" in transcription: logger.error(f"Transcription error: {transcription}") return transcription if any(keyword in transcription.lower() for keyword in ["medicine", "treatment"]): logger.warning("Medication query detected") feedback = "Error: This tool does not provide medication advice" await speak_response(feedback) return feedback prediction, score = analyze_symptoms(transcription) if "Error" in prediction: logger.error(f"Symptom analysis error: {prediction}") return prediction feedback = ( "No health condition detected, consult a doctor if symptoms persist. This is not a medical diagnosis." if prediction == "No health condition detected" else f"Possible {prediction.lower()} detected based on symptoms like '{transcription.lower()}', consult a doctor. This is not a medical diagnosis." ) logger.info(f"Feedback: {feedback}, Transcription: {transcription}, Prediction: {prediction}, Score: {score:.4f}") # Save to Salesforce save_to_salesforce(user_id, transcription, prediction, score, feedback, consent_granted) try: os.remove(audio_file) logger.debug(f"Deleted audio file: {audio_file}") except Exception as e: logger.error(f"Failed to delete audio file: {str(e)}") # Speak response await speak_response(feedback) return feedback except Exception as e: logger.error(f"Voice analysis failed: {str(e)}") return f"Error: {str(e)}" async def test_with_sample_audio(language="en", user_id="anonymous", consent_granted=True): """Test with synthetic audio.""" temp_dir = os.path.join(tempfile.gettempdir(), "audio_samples") if not ensure_writable_dir(temp_dir): fallback_dir = os.path.join(os.getcwd(), "temp_audio_samples") if not ensure_writable_dir(fallback_dir): logger.error(f"Temp directories {temp_dir} and {fallback_dir} not writable") return f"Error: Temp directories not writable" temp_dir = fallback_dir sample_audio_path = os.path.join(temp_dir, "dummy_test.wav") logger.info(f"Generating synthetic audio at: {sample_audio_path}") sr = 16000 t = np.linspace(0, 2, 2 * sr) freq_mod = 440 + 10 * np.sin(2 * np.pi * 0.5 * t) amplitude_mod = 0.5 + 0.1 * np.sin(2 * np.pi * 0.3 * t) noise = 0.01 * np.random.normal(0, 1, len(t)) dummy_audio = amplitude_mod * np.sin(2 * np.pi * freq_mod * t) + noise try: soundfile.write(dummy_audio, sr, sample_audio_path) logger.info(f"Generated synthetic audio: {sample_audio_path}") except Exception as e: logger.error(f"Failed to write synthetic audio: {str(e)}") return f"Error: Failed to generate synthetic audio: {str(e)}" if not os.path.exists(sample_audio_path): logger.error(f"Synthetic audio not created: {sample_audio_path}") return f"Error: Synthetic audio not created: {sample_audio_path}" mock_transcription = "I have a cough and sore throat" logger.info(f"Mock transcription: {mock_transcription}") prediction, score = analyze_symptoms(mock_transcription) feedback = ( "No health condition detected, consult a doctor if symptoms persist. This is not a medical diagnosis." if prediction == "No health condition detected" else f"Possible {prediction.lower()} detected based on symptoms like '{mock_transcription.lower()}', consult a doctor. This is not a medical diagnosis." ) logger.info(f"Test feedback: {feedback}, Prediction: {prediction}, Score: {score:.4f}") # Save to Salesforce save_to_salesforce(user_id, mock_transcription, prediction, score, feedback, consent_granted) try: os.remove(sample_audio_path) logger.debug(f"Deleted test audio: {sample_audio_path}") except Exception: pass return feedback async def voicebot_interface(audio_file, language="en", user_id="anonymous", consent_granted=True): """Gradio interface wrapper.""" return await analyze_voice(audio_file, language, user_id, consent_granted) # Gradio interface iface = gr.Interface( fn=voicebot_interface, inputs=[ gr.Audio(type="filepath", label="Record or Upload Voice (WAV, MP3, FLAC, 1+ sec)"), gr.Dropdown(["en", "es", "hi", "zh"], label="Language", value="en"), gr.Textbox(label="User ID (optional)", value="anonymous"), gr.Checkbox(label="Consent to store data", value=True) ], outputs=gr.Textbox(label="Health Assessment Feedback"), title="Smart Voicebot for Public Health", description="Record or upload a voice sample describing symptoms (e.g., 'I have a cough') for preliminary health assessment. Supports English, Spanish, Hindi, Mandarin. Not a diagnostic tool. Data is encrypted and stored with consent. Complies with HIPAA/GDPR." ) if __name__ == "__main__": logger.info("Starting Voice Health Analyzer") # Test with synthetic audio loop = asyncio.get_event_loop() print(loop.run_until_complete(test_with_sample_audio())) iface.launch(server_name="0.0.0.0", server_port=7860)