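"""Emotion-triggered alarm system.

Detects screams in uploaded or recorded audio with a Hugging Face
audio-classification model, serves a Gradio interface for manual checks, and
sends High-/Medium-Risk alerts to a Salesforce endpoint. Audio conversion
requires ffmpeg on the PATH; alerting requires the SF_ALERT_URL and
SF_API_TOKEN environment variables. pi_listener() provides an optional
real-time listener for devices such as a Raspberry Pi.
"""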
import gradio as gr
import subprocess
import os
import threading
import time
import librosa
import requests
import numpy as np
from datetime import datetime
from transformers import pipeline

# Load the audio-classification model once at startup. If loading fails, the
# app keeps running and reports "model_not_loaded" instead of crashing.
try:
    print("[INFO] Loading Hugging Face model...")
    classifier = pipeline(
        "audio-classification",
        model="padmalcom/wav2vec2-large-nonverbalvocalization-classification"
    )
    print(f"[INFO] Model labels: {classifier.model.config.id2label.values()}")
except Exception as e:
    print(f"[ERROR] Failed to load model: {e}")
    classifier = None


def convert_audio(input_path, output_path="input.wav"):
    """
    Converts audio files to a standard WAV format (16kHz, mono, 16-bit PCM).
    This ensures compatibility with the Hugging Face model.
    """
    try:
        # "-y" goes before the output path so ffmpeg overwrites input.wav on repeat runs.
        cmd = [
            "ffmpeg", "-y", "-i", input_path,
            "-acodec", "pcm_s16le", "-ar", "16000", "-ac", "1",
            output_path
        ]

        result = subprocess.run(cmd, check=True, capture_output=True, text=True)
        print(f"[DEBUG] Audio converted to WAV: {output_path}")
        if result.stdout:
            print(f"[DEBUG] ffmpeg stdout: {result.stdout.strip()}")
        if result.stderr:
            print(f"[DEBUG] ffmpeg stderr: {result.stderr.strip()}")
        return output_path
    except subprocess.CalledProcessError as e:
        print(f"[ERROR] ffmpeg conversion failed: {e.stderr.strip()}")
        raise RuntimeError(f"Audio conversion failed: {e.stderr.strip()}")
    except FileNotFoundError:
        print("[ERROR] ffmpeg command not found. Please ensure ffmpeg is installed and in your PATH.")
        raise RuntimeError("ffmpeg not found. Please install it.")
    except Exception as e:
        print(f"[ERROR] Unexpected error during audio conversion: {e}")
        raise RuntimeError(f"Unexpected audio conversion error: {e}")


def detect_scream(audio_path):
    """
    Detects screams in an audio file using the loaded Hugging Face model.
    Returns the top detected label and its confidence score.
    """
    if classifier is None:
        return {"label": "model_not_loaded", "score": 0.0}

    try:
        # librosa resamples to 16 kHz mono, matching the model's expected input.
        audio, sr = librosa.load(audio_path, sr=16000)
        print(f"[DEBUG] Loaded audio: {len(audio)} samples at {sr} Hz")

        if len(audio) == 0:
            print("[WARNING] Empty audio file provided for detection.")
            return {"label": "no_audio_data", "score": 0.0}

        results = classifier(audio)
        print(f"[DEBUG] Model output: {results}")

        if not results:
            print("[WARNING] Model returned no detection results.")
            return {"label": "no_detection", "score": 0.0}

        # Keep the highest-confidence prediction and report its score as a percentage.
        top_prediction = sorted(results, key=lambda x: x['score'], reverse=True)[0]
        return {"label": top_prediction["label"].lower(), "score": float(top_prediction["score"]) * 100}
    except Exception as e:
        print(f"[ERROR] Detection failed for {audio_path}: {e}")
        return {"label": "detection_error", "score": 0.0}


def send_salesforce_alert(audio_meta, detection):
    """
    Sends an alert payload to a configured Salesforce endpoint.
    Retrieves Salesforce URL and token from environment variables.
    """
    SF_URL = os.getenv("SF_ALERT_URL")
    SF_TOKEN = os.getenv("SF_API_TOKEN")

    if not SF_URL or not SF_TOKEN:
        print("[ERROR] Salesforce configuration (SF_ALERT_URL or SF_API_TOKEN) missing.")
        raise RuntimeError("Salesforce configuration missing. Cannot send alert.")

    headers = {
        "Authorization": f"Bearer {SF_TOKEN}",
        "Content-Type": "application/json"
    }
    payload = {
        "AudioName": audio_meta.get("filename", "unknown_audio"),
        "DetectedLabel": detection["label"],
        "Score": round(detection["score"], 2),
        "AlertLevel": audio_meta["alert_level"],
        "Timestamp": audio_meta["timestamp"],
    }

    print(f"[DEBUG] Sending payload to Salesforce: {payload}")
    try:
        resp = requests.post(SF_URL, json=payload, headers=headers, timeout=10)
        resp.raise_for_status()
        print(f"[INFO] Salesforce alert sent successfully. Response: {resp.json()}")
        return resp.json()
    except requests.exceptions.Timeout:
        print("[ERROR] Salesforce alert request timed out.")
        raise RuntimeError("Salesforce alert timed out.")
    except requests.exceptions.RequestException as e:
        print(f"[ERROR] Error sending Salesforce alert: {e}")
        if hasattr(e, 'response') and e.response is not None:
            print(f"[ERROR] Salesforce response content: {e.response.text}")
        raise RuntimeError(f"Failed to send Salesforce alert: {e}")


def process_uploaded(audio_file, system_state, high_thresh, med_thresh):
    """
    Main function for the Gradio interface. Processes uploaded audio,
    performs scream detection, and sends alerts to Salesforce based on thresholds.
    """
    if system_state != "Start":
        return "🛑 System is stopped. Change 'System State' to 'Start' to enable processing."

    if audio_file is None:
        return "Please upload an audio file or record one."

    print(f"[INFO] Processing uploaded audio: {audio_file}")

    try:
        wav_path = convert_audio(audio_file)
    except RuntimeError as e:
        return f"❌ Audio conversion error: {e}"
    except Exception as e:
        return f"❌ An unexpected error occurred during audio conversion: {e}"

    detection = detect_scream(wav_path)
    label = detection["label"]
    score = detection["score"]

    alert_message = f"📢 Detection: {label} ({score:.1f}%) → "
    level = "None"

    # Map the detected label and confidence onto an alert level.
    if "scream" in label:
        if score >= high_thresh:
            level = "High-Risk"
        elif score >= med_thresh:
            level = "Medium-Risk"
    elif "crying" in label and score >= med_thresh:
        # Crying is recognized but, as written, does not raise an alert level.
        level = "None"

    alert_message += f"Alert Level: {level}"

    audio_meta = {
        "filename": os.path.basename(audio_file),
        "timestamp": datetime.utcnow().isoformat() + "Z",
        "alert_level": level
    }

    if level in ("High-Risk", "Medium-Risk"):
        try:
            sf_resp = send_salesforce_alert(audio_meta, detection)
            alert_message = (
                f"✅ Detection: {label} ({score:.1f}%) → {level} → "
                f"Alert sent to Salesforce (ID: {sf_resp.get('id', 'N/A')})"
            )
        except RuntimeError as e:
            alert_message = f"⚠️ Detection: {label} ({score:.1f}%) → {level} → Salesforce ERROR: {e}"
        except Exception as e:
            alert_message = f"⚠️ Detection: {label} ({score:.1f}%) → {level} → Unexpected Salesforce error: {e}"

    # Clean up the temporary WAV file.
    if os.path.exists(wav_path):
        os.remove(wav_path)
        print(f"[DEBUG] Cleaned up {wav_path}")

    return alert_message


iface = gr.Interface(
    fn=process_uploaded,
    inputs=[
        gr.Audio(type="filepath", label="Upload Audio (or Record)"),
        gr.Radio(["Start", "Stop"], label="System State", value="Start",
                 info="Set to 'Start' to enable audio processing and alerts."),
        gr.Slider(0, 100, value=80, step=1, label="High-Risk Threshold (%)",
                  info="Confidence score for High-Risk scream detection."),
        gr.Slider(0, 100, value=50, step=1, label="Medium-Risk Threshold (%)",
                  info="Confidence score for Medium-Risk scream detection.")
    ],
    outputs="text",
    title="📢 Emotion-Triggered Alarm System",
    description="""
    🎧 Upload or record audio for real-time scream detection.
    ⚠️ Alerts are sent to Salesforce for High-Risk (confidence ≥ 80%) and Medium-Risk (confidence 50-80%) detections, using the thresholds set below.
    The system aims to detect panic-indicating screams.
    """,
    allow_flagging="never"
)
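
# Note: `iface.launch(share=True)` could be used for a temporary public demo
# link; by default this app launches locally in the __main__ block below.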


def pi_listener(high_thresh=80, med_thresh=50, interval=1.0):
    """
    Simulates a real-time audio listener for devices like a Raspberry Pi.
    Captures audio chunks, processes them, and sends alerts.
    """
    try:
        import sounddevice as sd
        import numpy as np
    except ImportError:
        print("[ERROR] sounddevice or numpy not found. Real-time listener cannot be started.")
        print("Please install them: pip install sounddevice numpy")
        return

    if classifier is None:
        print("[ERROR] Model not loaded. Real-time listener cannot operate.")
        return

    def callback(indata, frames, time_info, status):
        """Callback function for sounddevice to process audio chunks."""
        if status:
            print(f"[WARNING] Sounddevice status: {status}")

        # Flatten to a mono float32 array; the stream is opened at 16 kHz,
        # matching the model's expected sampling rate.
        wav = indata.squeeze()
        if wav.ndim > 1:
            wav = wav[:, 0]
        wav = wav.astype(np.float32)

        if len(wav) == 0:
            return

        try:
            detection_results = classifier(wav)
            if not detection_results:
                return

            top_prediction = sorted(detection_results, key=lambda x: x['score'], reverse=True)[0]
            lbl, sc = top_prediction["label"].lower(), float(top_prediction["score"]) * 100

            level = "None"
            if "scream" in lbl:
                if sc >= high_thresh:
                    level = "High-Risk"
                elif sc >= med_thresh:
                    level = "Medium-Risk"

            if level != "None":
                timestamp = datetime.utcnow().isoformat() + "Z"
                audio_meta = {
                    "filename": f"live-stream-{timestamp}",
                    "timestamp": timestamp,
                    "alert_level": level
                }
                detection_info = {"label": lbl, "score": sc}

                try:
                    send_salesforce_alert(audio_meta, detection_info)
                    print(f"[{timestamp}] {level} scream detected ({sc:.1f}%) → alert sent.")
                except RuntimeError as e:
                    print(f"[{timestamp}] {level} scream detected ({sc:.1f}%) → Salesforce alert failed: {e}")
                except Exception as e:
                    print(f"[{timestamp}] {level} scream detected ({sc:.1f}%) → Unexpected error sending alert: {e}")

        except Exception as e:
            print(f"[ERROR] Error in real-time detection callback: {e}")

    try:
        # One-second blocks at 16 kHz; the callback runs once per block.
        with sd.InputStream(channels=1, samplerate=16000, callback=callback, blocksize=16000):
            print("🎤 Real-time detection started...")
            while True:
                time.sleep(interval)
    except sd.PortAudioError as e:
        print(f"[ERROR] PortAudio error: {e}. Check your audio device setup.")
    except Exception as e:
        print(f"[ERROR] An unexpected error occurred in the real-time listener: {e}")


if __name__ == "__main__":
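    # Optional sketch (not enabled): on a device with a microphone, the
    # real-time listener defined above could run alongside the web UI in a
    # background daemon thread, e.g.:
    #
    #   threading.Thread(target=pi_listener, kwargs={"high_thresh": 80, "med_thresh": 50}, daemon=True).start()
    #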
    # Launch the Gradio UI, binding to all interfaces on the configured port.
    iface.launch(server_name="0.0.0.0", server_port=int(os.getenv("PORT", 7860)))