safetyMaster / gradio_interface.py
mrvero's picture
Upload folder using huggingface_hub
0469d65 verified
#!/usr/bin/env python3
"""
SafetyMaster Pro - Gradio Interface
Real-time safety equipment detection with modern web UI
Optimized for easy deployment on Hugging Face Spaces, Gradio Cloud, and other platforms
"""
import gradio as gr
import cv2
import numpy as np
import PIL.Image
import time
import json
import os
from datetime import datetime
from typing import Dict, List, Tuple, Optional
import threading
import queue
# Import our existing safety detector
from safety_detector import SafetyDetector
from camera_manager import CameraManager
class SafetyMasterGradio:
    """Gradio front end for SafetyMaster Pro.

    Holds a ``SafetyDetector`` used for inference and, while live monitoring
    is running, a ``CameraManager`` plus a daemon thread that pushes annotated
    frames into ``frame_queue`` for the UI to poll.  Detected violations are
    appended to ``violation_log`` (bounded to the most recent entries).
    """

    # Maximum number of entries retained in ``violation_log``.
    _MAX_LOG_ENTRIES = 100
    # Number of most recent entries rendered by ``get_violation_log``.
    _LOG_DISPLAY_COUNT = 10

    def __init__(self):
        """Set up empty state and try to load the safety detector."""
        self.detector = None
        self.camera_manager = None
        self.monitoring_active = False
        self.violation_log = []
        # Bounded queue: the producer drops the oldest frame when it is full.
        self.frame_queue = queue.Queue(maxsize=10)
        self._initialize_detector()

    def _initialize_detector(self):
        """Create the ``SafetyDetector``.

        Returns:
            True when the detector was created, False when construction
            raised (``self.detector`` then stays ``None``).
        """
        try:
            print("πŸ€– Loading AI model for safety detection...")
            self.detector = SafetyDetector()
            print("βœ… Safety detector initialized successfully")
            return True
        except Exception as e:
            print(f"❌ Error initializing detector: {e}")
            return False

    @staticmethod
    def _error_json(message: str) -> str:
        """Return ``message`` wrapped as a valid JSON document ``{"error": ...}``."""
        return json.dumps({"error": message})

    @staticmethod
    def _severity_marker(severity) -> str:
        """Return the marker shown for a violation severity ('high' vs. anything else)."""
        return "πŸ”΄" if severity == 'high' else "🟑"

    def _format_summary(self, people_count, processing_time, safety_equipment, violations) -> str:
        """Build the human-readable, newline-separated summary for one detection result."""
        summary_parts = [
            f"πŸ‘₯ People Detected: {people_count}",
            f"⚑ Processing Time: {processing_time:.3f}s",
        ]

        if safety_equipment:
            summary_parts.append("\nπŸ›‘οΈ Safety Equipment Detected:")
            for equipment, count in safety_equipment.items():
                if count > 0:
                    summary_parts.append(f" β€’ {equipment.replace('_', ' ').title()}: {count}")

        if violations:
            summary_parts.append(f"\n⚠️ Safety Violations Found: {len(violations)}")
            for violation in violations:
                marker = self._severity_marker(violation.get('severity'))
                summary_parts.append(f" {marker} {violation.get('description', 'Unknown violation')}")
        else:
            summary_parts.append("\nβœ… No Safety Violations Detected")

        return "\n".join(summary_parts)

    def detect_safety_violations_image(self, image: PIL.Image.Image) -> Tuple[PIL.Image.Image, str, str]:
        """
        Detect safety violations in an uploaded image.

        Args:
            image: PIL Image from Gradio (``None`` when nothing was uploaded).

        Returns:
            Tuple of (annotated_image, violations_json, summary_text).
            ``violations_json`` is always a valid JSON document because it is
            wired to a ``gr.JSON`` output; failures are reported as
            ``{"error": "<message>"}``.
        """
        if image is None:
            return None, self._error_json("No image provided"), "Please upload an image"
        if self.detector is None:
            return image, self._error_json("Detector not initialized"), "Error: AI model not loaded"

        try:
            # Convert PIL (RGB) to OpenCV (BGR) format
            cv_image = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR)

            results = self.detector.detect_safety_violations(cv_image)
            annotated_frame = self.detector.draw_detections(cv_image, results)

            # Convert back to PIL (RGB) for Gradio
            annotated_image = PIL.Image.fromarray(cv2.cvtColor(annotated_frame, cv2.COLOR_BGR2RGB))

            violations = results.get('violations', [])
            people_count = results.get('people_count', 0)
            safety_equipment = results.get('safety_equipment', {})
            processing_time = results.get('processing_time', 0)

            violations_json = json.dumps({
                'people_detected': people_count,
                'safety_equipment_detected': safety_equipment,
                'violations': violations,
                'processing_time': processing_time,
                'timestamp': datetime.now().isoformat()
            }, indent=2)

            summary_text = self._format_summary(
                people_count, processing_time, safety_equipment, violations
            )

            if violations:
                self._log_violation(violations, 'image_upload')

            return annotated_image, violations_json, summary_text
        except Exception as e:
            error_msg = f"Error processing image: {e}"
            # json.dumps escapes quotes/backslashes that may occur in the message.
            return image, self._error_json(error_msg), f"❌ {error_msg}"

    def start_camera_monitoring(self) -> Tuple[str, str]:
        """Start real-time camera monitoring on a background daemon thread.

        Returns:
            Tuple of (camera_status, live_status) messages for the UI.
        """
        try:
            if self.monitoring_active:
                return "⚠️ Monitoring already active", "Camera monitoring is already running"

            # Without a detector the worker thread could only fail on every frame.
            if self.detector is None:
                return "❌ Detector not initialized", "Error: AI model not loaded"

            self.camera_manager = CameraManager(source=0)
            if not self.camera_manager.start_capture():
                return "❌ Failed to start camera", "Could not access camera. Please check permissions."

            # Do not show frames left over from a previous monitoring session.
            self._drain_frame_queue()
            self.monitoring_active = True

            monitor_thread = threading.Thread(target=self._camera_monitoring_loop, daemon=True)
            monitor_thread.start()

            return "βœ… Camera monitoring started", "Real-time safety monitoring is now active"
        except Exception as e:
            return f"❌ Error: {e}", f"Failed to start monitoring: {e}"

    def stop_camera_monitoring(self) -> Tuple[str, str]:
        """Stop real-time camera monitoring and release the camera.

        Returns:
            Tuple of (camera_status, live_status) messages for the UI.
        """
        try:
            self.monitoring_active = False
            if self.camera_manager:
                self.camera_manager.stop_capture()
                self.camera_manager = None
            self._drain_frame_queue()
            return "πŸ›‘ Camera monitoring stopped", "Real-time monitoring has been stopped"
        except Exception as e:
            return f"❌ Error: {e}", f"Failed to stop monitoring: {e}"

    def _drain_frame_queue(self):
        """Discard every frame currently waiting in ``frame_queue``."""
        while True:
            try:
                self.frame_queue.get_nowait()
            except queue.Empty:
                return

    def _enqueue_frame(self, pil_image, results):
        """Queue a frame without blocking, replacing the oldest one when full."""
        item = (pil_image, results)
        try:
            self.frame_queue.put_nowait(item)
        except queue.Full:
            try:
                self.frame_queue.get_nowait()
                self.frame_queue.put_nowait(item)
            except (queue.Empty, queue.Full):
                # The consumer or another producer raced us; drop this frame.
                pass

    def _camera_monitoring_loop(self):
        """Background loop: grab frames, run detection, queue annotated results.

        The loop is tied to the camera manager that was current when the
        thread started.  It exits when monitoring is switched off or when
        ``self.camera_manager`` is cleared/replaced, so a quick stop-then-start
        cannot leave two loops running, and the loop never dereferences a
        manager that ``stop_camera_monitoring`` has set to ``None``.
        """
        camera = self.camera_manager
        while self.monitoring_active and camera is not None and self.camera_manager is camera:
            try:
                frame_data = camera.get_latest_frame()
                if frame_data is not None:
                    frame, _timestamp = frame_data

                    results = self.detector.detect_safety_violations(frame)
                    annotated_frame = self.detector.draw_detections(frame, results)

                    # Convert to PIL (RGB) for Gradio
                    pil_image = PIL.Image.fromarray(cv2.cvtColor(annotated_frame, cv2.COLOR_BGR2RGB))
                    self._enqueue_frame(pil_image, results)

                    if results.get('violations'):
                        self._log_violation(results['violations'], 'camera_monitoring')

                time.sleep(0.1)  # 10 FPS
            except Exception as e:
                print(f"Error in camera monitoring: {e}")
                time.sleep(1)

    def get_camera_frame(self) -> Tuple[PIL.Image.Image, str]:
        """Get the next queued camera frame for Gradio display.

        Returns:
            Tuple of (frame, status_text); ``frame`` is ``None`` when
            monitoring is off, no frame is queued yet, or an error occurred.
        """
        try:
            if not self.monitoring_active:
                return None, "Camera monitoring not active"

            try:
                pil_image, results = self.frame_queue.get_nowait()
            except queue.Empty:
                return None, "Waiting for camera frame..."

            people_count = results.get('people_count', 0)
            violations = results.get('violations', [])

            status_parts = [
                f"πŸ‘₯ People: {people_count}",
                f"⚠️ Violations: {len(violations)}",
                f"πŸ•’ {datetime.now().strftime('%H:%M:%S')}"
            ]
            if violations:
                status_parts.append("πŸ”΄ SAFETY VIOLATIONS DETECTED!")
            else:
                status_parts.append("βœ… All Clear")

            return pil_image, " | ".join(status_parts)
        except Exception as e:
            return None, f"Error: {e}"

    def _log_violation(self, violations: List[Dict], source: str):
        """Append ``violations`` to the internal log, keeping only the newest entries.

        Args:
            violations: Violation dicts; ``type``, ``description`` and
                ``severity`` are read with fallbacks when missing.
            source: Label recorded with each entry (e.g. 'image_upload').
        """
        timestamp = datetime.now().isoformat()
        for violation in violations:
            self.violation_log.append({
                'timestamp': timestamp,
                'source': source,
                'type': violation.get('type', 'unknown'),
                'description': violation.get('description', 'Unknown violation'),
                'severity': violation.get('severity', 'medium')
            })

        # Keep only the most recent entries.
        if len(self.violation_log) > self._MAX_LOG_ENTRIES:
            self.violation_log = self.violation_log[-self._MAX_LOG_ENTRIES:]

    def get_violation_log(self) -> str:
        """Return the most recent violations (newest first) as display text."""
        # Read the attribute once: the monitoring thread may rebind it.
        log = self.violation_log
        if not log:
            return "No violations recorded"

        total = len(log)
        pieces = ["πŸ“‹ Recent Safety Violations:\n\n"]

        recent_violations = log[-self._LOG_DISPLAY_COUNT:]
        for i, violation in enumerate(reversed(recent_violations), 1):
            timestamp = datetime.fromisoformat(violation['timestamp']).strftime('%H:%M:%S')
            marker = self._severity_marker(violation['severity'])
            pieces.append(f"{i}. [{timestamp}] {marker} {violation['description']}\n")
            pieces.append(f" Source: {violation['source']} | Type: {violation['type']}\n\n")

        if total > self._LOG_DISPLAY_COUNT:
            pieces.append(f"... and {total - self._LOG_DISPLAY_COUNT} more violations\n")

        pieces.append(f"\nTotal violations logged: {total}")
        return "".join(pieces)

    def get_model_info(self) -> str:
        """Return a Markdown description of the loaded model, or an error message."""
        if self.detector is None:
            return "❌ Detector not initialized"

        try:
            classes = self.detector.get_model_classes()
            device = getattr(self.detector, 'device', 'unknown')

            # Show at most the first ten class names.
            class_preview = ', '.join(classes[:10]) + ('...' if len(classes) > 10 else '')

            info_lines = [
                "πŸ€– **SafetyMaster Pro AI Model Information**",
                f"**Device**: {device}",
                "**Model Type**: YOLOv8 PPE Detection",
                f"**Classes Detected**: {len(classes)} total",
                "**Safety Equipment**:",
                "β€’ Hard Hats / Helmets",
                "β€’ Safety Vests",
                "β€’ Face Masks",
                "β€’ Safety Glasses",
                "β€’ Gloves",
                "β€’ Hearing Protection",
                "**Violations Detected**:",
                "β€’ Missing Hard Hat",
                "β€’ Missing Safety Vest",
                "β€’ Missing Face Mask",
                "β€’ Person without PPE",
                f"**Model Classes**: {class_preview}",
            ]
            return "\n".join(info_lines).strip()
        except Exception as e:
            return f"❌ Error getting model info: {e}"

    def create_interface(self) -> gr.Blocks:
        """Build and return the tabbed Gradio ``Blocks`` interface.

        Tabs: image analysis, live camera monitoring, violation log and model
        information.  Event handlers are bound to this instance's methods.
        """
        # Custom CSS for better styling
        css = (
            "\n"
            ".gradio-container {\n"
            "max-width: 1200px !important;\n"
            "}\n"
            ".violation-box {\n"
            "background-color: #fee;\n"
            "border: 2px solid #f88;\n"
            "border-radius: 8px;\n"
            "padding: 10px;\n"
            "}\n"
            ".success-box {\n"
            "background-color: #efe;\n"
            "border: 2px solid #8f8;\n"
            "border-radius: 8px;\n"
            "padding: 10px;\n"
            "}\n"
        )

        header_markdown = (
            "\n"
            "# πŸ›‘οΈ SafetyMaster Pro - AI Safety Monitoring\n"
            "**Real-time PPE detection and safety compliance monitoring**\n"
            "Detects: Hard Hats, Safety Vests, Face Masks, Safety Glasses, and Safety Violations\n"
        )

        footer_markdown = (
            "\n"
            "---\n"
            "**SafetyMaster Pro** - Powered by YOLOv8 AI Detection | Built with ❀️ for workplace safety\n"
            "⚠️ **Note**: For camera monitoring, please allow camera access when prompted by your browser.\n"
        )

        with gr.Blocks(
            title="SafetyMaster Pro - AI Safety Monitoring",
            theme=gr.themes.Soft(),
            css=css
        ) as interface:
            # Header
            gr.Markdown(header_markdown)

            with gr.Tabs():
                # Tab 1: Image Upload Detection
                with gr.Tab("πŸ“· Image Analysis"):
                    gr.Markdown("### Upload an image to detect safety equipment and violations")

                    with gr.Row():
                        with gr.Column(scale=1):
                            input_image = gr.Image(
                                type="pil",
                                label="Upload Image",
                                height=400
                            )
                            detect_btn = gr.Button(
                                "πŸ” Analyze Safety Compliance",
                                variant="primary",
                                size="lg"
                            )
                        with gr.Column(scale=1):
                            output_image = gr.Image(
                                label="Detection Results",
                                height=400
                            )

                    with gr.Row():
                        with gr.Column():
                            summary_text = gr.Textbox(
                                label="πŸ“Š Summary",
                                lines=8,
                                max_lines=15
                            )
                        with gr.Column():
                            violations_json = gr.JSON(
                                label="πŸ” Detailed Results",
                                height=300
                            )

                    # Connect the detection function
                    detect_btn.click(
                        fn=self.detect_safety_violations_image,
                        inputs=[input_image],
                        outputs=[output_image, violations_json, summary_text]
                    )

                # Tab 2: Real-time Camera Monitoring
                with gr.Tab("πŸ“Ή Live Camera Monitoring"):
                    gr.Markdown("### Real-time safety monitoring using your camera")

                    with gr.Row():
                        start_btn = gr.Button("▢️ Start Monitoring", variant="primary")
                        stop_btn = gr.Button("⏹️ Stop Monitoring", variant="stop")

                    with gr.Row():
                        camera_status = gr.Textbox(
                            label="πŸ“‘ Camera Status",
                            value="Camera not started",
                            interactive=False
                        )
                        frame_status = gr.Textbox(
                            label="πŸ“Š Live Status",
                            value="No data",
                            interactive=False
                        )

                    live_image = gr.Image(
                        label="πŸ”΄ Live Camera Feed",
                        height=500
                    )

                    # Connect camera functions
                    start_btn.click(
                        fn=self.start_camera_monitoring,
                        outputs=[camera_status, frame_status]
                    )
                    stop_btn.click(
                        fn=self.stop_camera_monitoring,
                        outputs=[camera_status, frame_status]
                    )

                    # Auto-refresh camera feed every 2 seconds
                    interface.load(
                        fn=self.get_camera_frame,
                        outputs=[live_image, frame_status],
                        every=2
                    )

                # Tab 3: Violation Log
                with gr.Tab("πŸ“‹ Violation Log"):
                    gr.Markdown("### Recent safety violations and compliance history")

                    refresh_log_btn = gr.Button("πŸ”„ Refresh Log", variant="secondary")
                    violation_log_display = gr.Textbox(
                        label="πŸ“‹ Violation History",
                        lines=20,
                        max_lines=30,
                        value="No violations recorded"
                    )

                    refresh_log_btn.click(
                        fn=self.get_violation_log,
                        outputs=[violation_log_display]
                    )

                    # Auto-refresh log every 10 seconds
                    interface.load(
                        fn=self.get_violation_log,
                        outputs=[violation_log_display],
                        every=10
                    )

                # Tab 4: Model Information
                with gr.Tab("πŸ€– AI Model Info"):
                    gr.Markdown("### Information about the AI detection model")

                    model_info_display = gr.Markdown(
                        value=self.get_model_info()
                    )
                    refresh_model_btn = gr.Button("πŸ”„ Refresh Model Info")
                    refresh_model_btn.click(
                        fn=self.get_model_info,
                        outputs=[model_info_display]
                    )

            # Footer
            gr.Markdown(footer_markdown)

        return interface
def main():
    """Build the SafetyMaster Pro interface and serve it.

    The server binds to all interfaces.  The port is read from the ``PORT``
    environment variable and falls back to 7860 when it is not set; the
    printed access URL uses that same port.
    """
    print("πŸš€ Starting SafetyMaster Pro - Gradio Interface")

    # Create the Gradio app
    app = SafetyMasterGradio()
    interface = app.create_interface()

    # Launch configuration
    launch_kwargs = {
        "server_name": "0.0.0.0",  # Allow external access
        "server_port": int(os.environ.get("PORT", 7860)),  # Use PORT env var or default
        "share": False,  # Set to True for public sharing
        "debug": False,
        "show_error": True,
        "quiet": False
    }

    port = launch_kwargs["server_port"]
    print(f"🌐 Launching on port {port}")
    # Report the port actually in use rather than a hard-coded 7860.
    print(f"πŸ“± Access the app at: http://localhost:{port}")

    # Launch the interface
    interface.launch(**launch_kwargs)
# Launch the app only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()