import os
import subprocess
import threading
import time
from datetime import datetime, timezone

import gradio as gr
import librosa
import numpy as np  # used by the optional sounddevice listener below
import requests
from transformers import pipeline

# 🎙️ Load detection model
try:
    print("[INFO] Loading Hugging Face model...")
    # This model includes 'screaming' among its labels, but it may misclassify
    # high-pitched screams as 'crying' because it was trained on general
    # non-verbal vocalizations. For better scream/cry separation, fine-tune on
    # a task-specific dataset or evaluate a more specialized model.
    classifier = pipeline(
        "audio-classification",
        model="padmalcom/wav2vec2-large-nonverbalvocalization-classification"
    )
    print(f"[INFO] Model labels: {list(classifier.model.config.id2label.values())}")
except Exception as e:
    print(f"[ERROR] Failed to load model: {e}")
    classifier = None


# === Audio Conversion ===
def convert_audio(input_path, output_path="input.wav"):
    """
    Convert an audio file to standard WAV (16 kHz, mono, 16-bit PCM) for
    compatibility with the Hugging Face model.

    Note: the default output is a fixed file in the working directory, which
    assumes one request is processed at a time.
    """
    try:
        cmd = [
            "ffmpeg", "-i", input_path,
            "-acodec", "pcm_s16le", "-ar", "16000", "-ac", "1",
            output_path, "-y"
        ]
        # capture_output=True keeps ffmpeg's stdout/stderr for error reporting
        result = subprocess.run(cmd, check=True, capture_output=True, text=True)
        print(f"[DEBUG] Audio converted to WAV: {output_path}")
        if result.stdout:
            print(f"[DEBUG] ffmpeg stdout: {result.stdout.strip()}")
        if result.stderr:
            print(f"[DEBUG] ffmpeg stderr: {result.stderr.strip()}")
        return output_path
    except subprocess.CalledProcessError as e:
        print(f"[ERROR] ffmpeg conversion failed: {e.stderr.strip()}")
        raise RuntimeError(f"Audio conversion failed: {e.stderr.strip()}")
    except FileNotFoundError:
        print("[ERROR] ffmpeg command not found. Ensure ffmpeg is installed and on your PATH.")
        raise RuntimeError("ffmpeg not found. Please install it.")
    except Exception as e:
        print(f"[ERROR] Unexpected error during audio conversion: {e}")
        raise RuntimeError(f"Unexpected audio conversion error: {e}")


# === Scream Detection ===
def detect_scream(audio_path):
    """
    Run the loaded Hugging Face model on an audio file and return the top
    detected label with its confidence score as a percentage.
    """
    if classifier is None:
        return {"label": "model_not_loaded", "score": 0.0}
    try:
        # librosa resamples to 16 kHz, matching the wav2vec2 feature extractor
        audio, sr = librosa.load(audio_path, sr=16000)
        print(f"[DEBUG] Loaded audio: {len(audio)} samples at {sr} Hz")
        if len(audio) == 0:
            print("[WARNING] Empty audio file provided for detection.")
            return {"label": "no_audio_data", "score": 0.0}

        # The pipeline accepts a raw numpy array, assumed to already be at the
        # model's sampling rate (16 kHz here)
        results = classifier(audio)
        print(f"[DEBUG] Model output: {results}")
        if not results:
            print("[WARNING] Model returned no detection results.")
            return {"label": "no_detection", "score": 0.0}

        # Sort by score descending and take the top prediction; lowercase the
        # label for consistent comparison downstream
        top_prediction = sorted(results, key=lambda x: x["score"], reverse=True)[0]
        return {"label": top_prediction["label"].lower(),
                "score": float(top_prediction["score"]) * 100}
    except Exception as e:
        print(f"[ERROR] Detection failed for {audio_path}: {e}")
        return {"label": "detection_error", "score": 0.0}


# === Send Alert to Salesforce ===
def send_salesforce_alert(audio_meta, detection):
    """
    Send an alert payload to a configured Salesforce endpoint. The endpoint
    URL and token are read from environment variables.
    """
    SF_URL = os.getenv("SF_ALERT_URL")
    SF_TOKEN = os.getenv("SF_API_TOKEN")
    if not SF_URL or not SF_TOKEN:
        print("[ERROR] Salesforce configuration (SF_ALERT_URL or SF_API_TOKEN) missing.")
        raise RuntimeError("Salesforce configuration missing. Cannot send alert.")

    headers = {
        "Authorization": f"Bearer {SF_TOKEN}",
        "Content-Type": "application/json"
    }
    payload = {
        "AudioName": audio_meta.get("filename", "unknown_audio"),
        "DetectedLabel": detection["label"],
        "Score": round(detection["score"], 2),  # round the score for cleaner data
        "AlertLevel": audio_meta["alert_level"],
        "Timestamp": audio_meta["timestamp"],
    }
    print(f"[DEBUG] Sending payload to Salesforce: {payload}")

    try:
        resp = requests.post(SF_URL, json=payload, headers=headers, timeout=10)
        resp.raise_for_status()  # raises HTTPError for 4xx/5xx responses
        try:
            body = resp.json()
        except ValueError:  # endpoint returned a non-JSON body
            body = {"raw": resp.text}
        print(f"[INFO] Salesforce alert sent successfully. Response: {body}")
        return body
    except requests.exceptions.Timeout:
        print("[ERROR] Salesforce alert request timed out.")
        raise RuntimeError("Salesforce alert timed out.")
    except requests.exceptions.RequestException as e:
        print(f"[ERROR] Error sending Salesforce alert: {e}")
        # Include the response body, if any, for easier debugging
        if hasattr(e, "response") and e.response is not None:
            print(f"[ERROR] Salesforce response content: {e.response.text}")
        raise RuntimeError(f"Failed to send Salesforce alert: {e}")
# === Main Gradio Function ===
def process_uploaded(audio_file, system_state, high_thresh, med_thresh):
    """
    Main entry point for the Gradio interface. Converts the uploaded audio,
    runs scream detection, and sends a Salesforce alert when the confidence
    crosses the configured thresholds.
    """
    if system_state != "Start":
        return "🛑 System is stopped. Change 'System State' to 'Start' to enable processing."
    if audio_file is None:
        return "Please upload an audio file or record one."

    print(f"[INFO] Processing uploaded audio: {audio_file}")
    try:
        # Convert audio to the required WAV format
        wav_path = convert_audio(audio_file)
    except RuntimeError as e:
        return f"❌ Audio conversion error: {e}"
    except Exception as e:
        return f"❌ An unexpected error occurred during audio conversion: {e}"

    # Perform scream detection
    detection = detect_scream(wav_path)
    label = detection["label"]
    score = detection["score"]

    # Determine risk level from the detected label and score
    alert_message = f"🟢 Detection: {label} ({score:.1f}%) — "
    level = "None"
    # Match 'scream' as a substring: the model may output 'screaming' or a
    # similar variant rather than the bare label.
    if "scream" in label:
        if score >= high_thresh:
            level = "High-Risk"
        elif score >= med_thresh:
            level = "Medium-Risk"
    elif "crying" in label and score >= med_thresh:
        # 'crying' is a known misclassification target for screams. It is not
        # treated as alertable here; change this to "Low-Risk" (or another
        # level) if crying should also trigger alerts.
        level = "None"
    alert_message += f"Alert Level: {level}"

    audio_meta = {
        "filename": os.path.basename(audio_file),
        "timestamp": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
        "alert_level": level
    }

    # Send to Salesforce when a risk level was determined
    if level in ("High-Risk", "Medium-Risk"):
        try:
            sf_resp = send_salesforce_alert(audio_meta, detection)
            alert_message = (f"✅ Detection: {label} ({score:.1f}%) — {level} — "
                             f"Alert sent to Salesforce (ID: {sf_resp.get('id', 'N/A')})")
        except RuntimeError as e:
            alert_message = f"⚠️ Detection: {label} ({score:.1f}%) — {level} — Salesforce ERROR: {e}"
        except Exception as e:
            alert_message = f"⚠️ Detection: {label} ({score:.1f}%) — {level} — Unexpected Salesforce error: {e}"

    # Clean up the converted WAV file
    if os.path.exists(wav_path):
        os.remove(wav_path)
        print(f"[DEBUG] Cleaned up {wav_path}")

    return alert_message


# === Gradio UI ===
iface = gr.Interface(
    fn=process_uploaded,
    inputs=[
        gr.Audio(type="filepath", label="Upload Audio (or Record)"),
        gr.Radio(["Start", "Stop"], label="System State", value="Start",
                 info="Set to 'Start' to enable audio processing and alerts."),
        gr.Slider(0, 100, value=80, step=1, label="High-Risk Threshold (%)",
                  info="Confidence score for High-Risk scream detection."),
        gr.Slider(0, 100, value=50, step=1, label="Medium-Risk Threshold (%)",
                  info="Confidence score for Medium-Risk scream detection.")
    ],
    outputs="text",
    title="📢 Emotion-Triggered Alarm System",
    description="""
    🎧 Upload or record audio for real-time scream detection.
    ⚠️ At the default thresholds, alerts are sent to Salesforce for High-Risk
    (confidence ≥ 80%) and Medium-Risk (confidence 50-80%) detections.
    The system aims to detect panic-indicating screams.
    """,
    allow_flagging="never"  # as per requirement
)
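
# A quick smoke test of the full upload path outside the UI, assuming a test
# clip exists at "tests/scream.wav" (hypothetical path):
#
#   print(process_uploaded("tests/scream.wav", "Start", high_thresh=80, med_thresh=50))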
# === Optional Real-Time Listener (for Raspberry Pi or similar) ===
# Demonstrates how a real-time listener could be implemented. It requires
# `sounddevice` (backed by PortAudio) plus `numpy`, and the SF_ALERT_URL and
# SF_API_TOKEN environment variables must be set for alerts to go out. It is
# disabled by default because it needs specific hardware/setup.
def pi_listener(high_thresh=80, med_thresh=50, interval=1.0):
    """
    Real-time audio listener for devices like a Raspberry Pi. Captures audio
    in one-second chunks, classifies each chunk, and sends alerts.
    """
    try:
        import sounddevice as sd
    except ImportError:
        print("[ERROR] sounddevice not found. Real-time listener cannot be started.")
        print("Please install it: pip install sounddevice")
        return

    if classifier is None:
        print("[ERROR] Model not loaded. Real-time listener cannot operate.")
        return

    def callback(indata, frames, time_info, status):
        """sounddevice callback: classify each incoming audio chunk."""
        if status:
            print(f"[WARNING] Sounddevice status: {status}")

        # Flatten to a 1-D float32 array; take the first channel if stereo
        wav = indata.squeeze()
        if wav.ndim > 1:
            wav = wav[:, 0]
        wav = wav.astype(np.float32)
        if len(wav) == 0:
            return  # skip empty chunks

        try:
            # Classify the chunk. Inference runs inside the audio callback,
            # so slow hardware may cause input overruns.
            detection_results = classifier(wav)
            if not detection_results:
                return
            top_prediction = sorted(detection_results, key=lambda x: x["score"], reverse=True)[0]
            lbl = top_prediction["label"].lower()
            sc = float(top_prediction["score"]) * 100

            level = "None"
            if "scream" in lbl:  # match 'screaming' and similar variants
                if sc >= high_thresh:
                    level = "High-Risk"
                elif sc >= med_thresh:
                    level = "Medium-Risk"

            if level != "None":
                timestamp = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
                audio_meta = {
                    "filename": f"live-stream-{timestamp}",
                    "timestamp": timestamp,
                    "alert_level": level
                }
                detection_info = {"label": lbl, "score": sc}
                try:
                    send_salesforce_alert(audio_meta, detection_info)
                    print(f"[{timestamp}] {level} scream detected ({sc:.1f}%) – alert sent.")
                except RuntimeError as e:
                    print(f"[{timestamp}] {level} scream detected ({sc:.1f}%) – Salesforce alert failed: {e}")
                except Exception as e:
                    print(f"[{timestamp}] {level} scream detected ({sc:.1f}%) – Unexpected error sending alert: {e}")
        except Exception as e:
            print(f"[ERROR] Error in real-time detection callback: {e}")

    # Start the audio stream; adjust blocksize to trade latency for throughput
    try:
        with sd.InputStream(channels=1, samplerate=16000, callback=callback,
                            blocksize=16000):  # 1-second chunks at 16 kHz
            print("🔊 Real-time detection started...")
            while True:
                time.sleep(interval)  # keep the main thread alive
    except sd.PortAudioError as e:
        print(f"[ERROR] PortAudio error: {e}. Check your audio device setup.")
    except Exception as e:
        print(f"[ERROR] An unexpected error occurred in the real-time listener: {e}")


# === App Entry ===
if __name__ == "__main__":
    # Optional: enable the real-time listener for Raspberry Pi or similar by
    # uncommenting the lines below. Install its dependencies first
    # (pip install sounddevice numpy) and make sure PortAudio is available.
    # pi_thread = threading.Thread(target=pi_listener, daemon=True)
    # pi_thread.start()

    iface.launch(server_name="0.0.0.0", server_port=int(os.getenv("PORT", 7860)))
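
# Example environment setup before launching (both values are placeholders;
# use your own Salesforce endpoint and token):
#
#   export SF_ALERT_URL="https://example.my.salesforce.com/services/apexrest/scream-alert"
#   export SF_API_TOKEN="<your-api-token>"
#   python app.py   # assuming this script is saved as app.py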