#!/usr/bin/env python3
"""
SafetyMaster Pro - Gradio Interface
Real-time safety equipment detection with modern web UI

Optimized for easy deployment on Hugging Face Spaces, Gradio Cloud, and other platforms
"""

import time
import json
import os
from datetime import datetime
from typing import Dict, List, Tuple, Optional
import threading
import queue

import gradio as gr
import cv2
import numpy as np
import PIL.Image

# Import our existing safety detector
from safety_detector import SafetyDetector
from camera_manager import CameraManager


class SafetyMasterGradio:
    """Gradio interface for SafetyMaster Pro.

    Holds a ``SafetyDetector``, an optional ``CameraManager`` used while live
    monitoring is active, a bounded queue of annotated frames produced by the
    background monitoring thread, and an in-memory log of recent violations.
    """

    # Maximum number of entries retained in ``violation_log``.
    MAX_LOG_ENTRIES = 100
    # Number of most recent entries rendered by ``get_violation_log``.
    LOG_DISPLAY_COUNT = 10

    def __init__(self):
        """Initialize the Gradio interface state and load the detector."""
        self.detector = None
        self.camera_manager = None
        self.monitoring_active = False
        self.violation_log = []
        self.frame_queue = queue.Queue(maxsize=10)

        # Initialize detector
        self._initialize_detector()

    def _initialize_detector(self):
        """Create the safety detector.

        Returns:
            True if the detector was created, False if construction raised.
            On failure ``self.detector`` keeps its previous value.
        """
        try:
            print("🤖 Loading AI model for safety detection...")
            self.detector = SafetyDetector()
            print("✅ Safety detector initialized successfully")
            return True
        except Exception as e:
            print(f"❌ Error initializing detector: {e}")
            return False

    @staticmethod
    def _error_json(message: str) -> str:
        """Serialize an error message as a JSON object string."""
        return json.dumps({'error': message})

    @staticmethod
    def _build_image_summary(results: Dict) -> str:
        """Render the human-readable summary for one detection result dict."""
        people_count = results.get('people_count', 0)
        safety_equipment = results.get('safety_equipment', {})
        violations = results.get('violations', [])

        summary_parts = [
            f"👥 People Detected: {people_count}",
            f"⚡ Processing Time: {results.get('processing_time', 0):.3f}s",
        ]

        if safety_equipment:
            summary_parts.append("\n🛡️ Safety Equipment Detected:")
            for equipment, count in safety_equipment.items():
                if count > 0:
                    summary_parts.append(
                        f" • {equipment.replace('_', ' ').title()}: {count}"
                    )

        if violations:
            summary_parts.append(f"\n⚠️ Safety Violations Found: {len(violations)}")
            for violation in violations:
                severity_emoji = "🔴" if violation.get('severity') == 'high' else "🟡"
                summary_parts.append(
                    f" {severity_emoji} {violation.get('description', 'Unknown violation')}"
                )
        else:
            summary_parts.append("\n✅ No Safety Violations Detected")

        return "\n".join(summary_parts)

    def detect_safety_violations_image(
        self, image: PIL.Image.Image
    ) -> Tuple[Optional[PIL.Image.Image], str, str]:
        """Detect safety violations in an uploaded image.

        Args:
            image: PIL image from Gradio, or None when nothing was uploaded.

        Returns:
            Tuple of (annotated_image, violations_json, summary_text).
            ``violations_json`` is always a valid JSON document; on failure it
            is an object with a single ``error`` key, because the value is
            routed to a ``gr.JSON`` component, which parses string values.
        """
        if image is None:
            return None, self._error_json("No image provided"), "Please upload an image"

        if self.detector is None:
            return (
                image,
                self._error_json("Detector not initialized"),
                "Error: AI model not loaded",
            )

        try:
            # Convert PIL to OpenCV format. Forcing RGB first keeps the
            # RGB->BGR conversion valid for grayscale/palette inputs.
            cv_image = cv2.cvtColor(np.array(image.convert("RGB")), cv2.COLOR_RGB2BGR)

            # Run detection and draw annotations
            results = self.detector.detect_safety_violations(cv_image)
            annotated_frame = self.detector.draw_detections(cv_image, results)

            # Convert back to PIL for Gradio
            annotated_image = PIL.Image.fromarray(
                cv2.cvtColor(annotated_frame, cv2.COLOR_BGR2RGB)
            )

            violations = results.get('violations', [])

            # Format detailed results as JSON
            violations_json = json.dumps({
                'people_detected': results.get('people_count', 0),
                'safety_equipment_detected': results.get('safety_equipment', {}),
                'violations': violations,
                'processing_time': results.get('processing_time', 0),
                'timestamp': datetime.now().isoformat(),
            }, indent=2)

            summary_text = self._build_image_summary(results)

            # Log violation if any
            if violations:
                self._log_violation(violations, 'image_upload')

            return annotated_image, violations_json, summary_text

        except Exception as e:
            error_msg = f"Error processing image: {str(e)}"
            # json.dumps escapes quotes/backslashes that may occur in the message.
            return image, self._error_json(error_msg), f"❌ {error_msg}"

    def start_camera_monitoring(self) -> Tuple[str, str]:
        """Start real-time camera monitoring.

        Opens camera source 0 and starts a daemon thread running
        ``_camera_monitoring_loop``.

        Returns:
            Tuple of (camera_status, detail_message) for display.
        """
        try:
            if self.monitoring_active:
                return "⚠️ Monitoring already active", "Camera monitoring is already running"

            # Without a detector the background loop could only raise.
            if self.detector is None:
                return "❌ Detector not initialized", "Error: AI model not loaded"

            # Initialize camera; only publish it on self once capture started.
            camera = CameraManager(source=0)
            if not camera.start_capture():
                return (
                    "❌ Failed to start camera",
                    "Could not access camera. Please check permissions.",
                )

            self.camera_manager = camera
            # Drop frames left over from a previous session.
            self._drain_frame_queue()
            self.monitoring_active = True

            # Start monitoring thread
            monitor_thread = threading.Thread(
                target=self._camera_monitoring_loop, daemon=True
            )
            monitor_thread.start()

            return "✅ Camera monitoring started", "Real-time safety monitoring is now active"

        except Exception as e:
            return f"❌ Error: {str(e)}", f"Failed to start monitoring: {str(e)}"

    def stop_camera_monitoring(self) -> Tuple[str, str]:
        """Stop real-time camera monitoring.

        Clears the active flag, stops and releases the camera manager (if
        any) and discards queued frames.

        Returns:
            Tuple of (camera_status, detail_message) for display.
        """
        try:
            self.monitoring_active = False

            if self.camera_manager:
                self.camera_manager.stop_capture()
                self.camera_manager = None

            self._drain_frame_queue()

            return "🛑 Camera monitoring stopped", "Real-time monitoring has been stopped"

        except Exception as e:
            return f"❌ Error: {str(e)}", f"Failed to stop monitoring: {str(e)}"

    def _drain_frame_queue(self):
        """Empty ``frame_queue`` and return the last item removed (or None)."""
        latest = None
        while True:
            try:
                latest = self.frame_queue.get_nowait()
            except queue.Empty:
                return latest

    def _enqueue_frame(self, item):
        """Put ``item`` on ``frame_queue``, evicting the oldest entry if full."""
        try:
            self.frame_queue.put_nowait(item)
        except queue.Full:
            # Remove old frame and add new one
            try:
                self.frame_queue.get_nowait()
                self.frame_queue.put_nowait(item)
            except (queue.Empty, queue.Full):
                # The consumer raced us; dropping a single frame is acceptable.
                pass

    def _camera_monitoring_loop(self):
        """Background loop: grab frames, run detection, queue annotated frames.

        Runs until monitoring is deactivated or the camera manager this loop
        started with is replaced or cleared.
        """
        # Bind the camera once so a concurrent stop (which sets
        # self.camera_manager to None) cannot cause an attribute access on
        # None, and so a stale loop exits if monitoring is restarted.
        camera = self.camera_manager

        while (
            self.monitoring_active
            and camera is not None
            and self.camera_manager is camera
        ):
            try:
                frame_data = camera.get_latest_frame()

                if frame_data is not None:
                    frame, _timestamp = frame_data

                    # Run detection and draw annotations
                    results = self.detector.detect_safety_violations(frame)
                    annotated_frame = self.detector.draw_detections(frame, results)

                    # Convert to PIL for Gradio
                    pil_image = PIL.Image.fromarray(
                        cv2.cvtColor(annotated_frame, cv2.COLOR_BGR2RGB)
                    )

                    # Add to queue (non-blocking)
                    self._enqueue_frame((pil_image, results))

                    # Log violations
                    if results.get('violations'):
                        self._log_violation(results['violations'], 'camera_monitoring')

                time.sleep(0.1)  # caps the loop at roughly 10 iterations/second

            except Exception as e:
                print(f"Error in camera monitoring: {e}")
                time.sleep(1)

    def get_camera_frame(self) -> Tuple[Optional[PIL.Image.Image], str]:
        """Get the latest camera frame for Gradio display.

        Returns:
            Tuple of (image, status_text). ``image`` is None when monitoring
            is inactive, no frame is queued yet, or an error occurred.
        """
        try:
            if not self.monitoring_active:
                return None, "Camera monitoring not active"

            # Drain the queue so the newest frame is shown, not the oldest.
            latest = self._drain_frame_queue()
            if latest is None:
                return None, "Waiting for camera frame..."

            pil_image, results = latest

            # Create status text
            people_count = results.get('people_count', 0)
            violations = results.get('violations', [])

            status_parts = [
                f"👥 People: {people_count}",
                f"⚠️ Violations: {len(violations)}",
                f"🕒 {datetime.now().strftime('%H:%M:%S')}",
            ]

            if violations:
                status_parts.append("🔴 SAFETY VIOLATIONS DETECTED!")
            else:
                status_parts.append("✅ All Clear")

            return pil_image, " | ".join(status_parts)

        except Exception as e:
            return None, f"Error: {str(e)}"

    def _log_violation(self, violations: List[Dict], source: str):
        """Append violations to the internal log.

        Args:
            violations: Violation dicts; ``type``, ``description`` and
                ``severity`` keys are read, with defaults when missing.
            source: Label recorded with each entry (e.g. ``'image_upload'``).
        """
        timestamp = datetime.now().isoformat()

        self.violation_log.extend(
            {
                'timestamp': timestamp,
                'source': source,
                'type': violation.get('type', 'unknown'),
                'description': violation.get('description', 'Unknown violation'),
                'severity': violation.get('severity', 'medium'),
            }
            for violation in violations
        )

        # Keep only the most recent entries
        if len(self.violation_log) > self.MAX_LOG_ENTRIES:
            self.violation_log = self.violation_log[-self.MAX_LOG_ENTRIES:]

    def get_violation_log(self) -> str:
        """Get the formatted violation log, newest entry first."""
        # Snapshot: the monitoring thread may append while we format.
        entries = list(self.violation_log)
        if not entries:
            return "No violations recorded"

        parts = ["📋 Recent Safety Violations:\n\n"]

        # Show the most recent violations, newest first
        recent_violations = entries[-self.LOG_DISPLAY_COUNT:]
        for i, violation in enumerate(reversed(recent_violations), 1):
            timestamp = datetime.fromisoformat(violation['timestamp']).strftime('%H:%M:%S')
            severity_emoji = "🔴" if violation['severity'] == 'high' else "🟡"

            parts.append(
                f"{i}. [{timestamp}] {severity_emoji} {violation['description']}\n"
            )
            parts.append(
                f" Source: {violation['source']} | Type: {violation['type']}\n\n"
            )

        if len(entries) > self.LOG_DISPLAY_COUNT:
            parts.append(
                f"... and {len(entries) - self.LOG_DISPLAY_COUNT} more violations\n"
            )

        parts.append(f"\nTotal violations logged: {len(entries)}")

        return "".join(parts)

    def get_model_info(self) -> str:
        """Get Markdown-formatted information about the loaded model."""
        if self.detector is None:
            return "❌ Detector not initialized"

        try:
            classes = self.detector.get_model_classes()
            device = getattr(self.detector, 'device', 'unknown')

            # Accept either a sequence of names or an id -> name mapping.
            if isinstance(classes, dict):
                class_names = [str(name) for name in classes.values()]
            else:
                class_names = [str(name) for name in classes]

            shown_classes = ', '.join(class_names[:10])
            ellipsis = '...' if len(class_names) > 10 else ''

            info_lines = [
                "🤖 **SafetyMaster Pro AI Model Information**",
                "",
                f"**Device**: {device}",
                "**Model Type**: YOLOv8 PPE Detection",
                f"**Classes Detected**: {len(class_names)} total",
                "",
                "**Safety Equipment**:",
                "• Hard Hats / Helmets",
                "• Safety Vests",
                "• Face Masks",
                "• Safety Glasses",
                "• Gloves",
                "• Hearing Protection",
                "",
                "**Violations Detected**:",
                "• Missing Hard Hat",
                "• Missing Safety Vest",
                "• Missing Face Mask",
                "• Person without PPE",
                "",
                f"**Model Classes**: {shown_classes}{ellipsis}",
            ]

            return "\n".join(info_lines).strip()

        except Exception as e:
            return f"❌ Error getting model info: {str(e)}"

    def create_interface(self) -> gr.Blocks:
        """Create the Gradio interface.

        Builds four tabs (image analysis, live camera monitoring, violation
        log, model info) and wires them to this object's methods.

        Returns:
            The constructed ``gr.Blocks`` app (not yet launched).
        """
        # Custom CSS for better styling
        css = """
        .gradio-container {
            max-width: 1200px !important;
        }
        .violation-box {
            background-color: #fee;
            border: 2px solid #f88;
            border-radius: 8px;
            padding: 10px;
        }
        .success-box {
            background-color: #efe;
            border: 2px solid #8f8;
            border-radius: 8px;
            padding: 10px;
        }
        """

        with gr.Blocks(
            title="SafetyMaster Pro - AI Safety Monitoring",
            theme=gr.themes.Soft(),
            css=css
        ) as interface:

            # Header (explicit newlines so the Markdown structure does not
            # depend on source indentation)
            gr.Markdown(
                "# 🛡️ SafetyMaster Pro - AI Safety Monitoring\n"
                "\n"
                "**Real-time PPE detection and safety compliance monitoring**\n"
                "\n"
                "Detects: Hard Hats, Safety Vests, Face Masks, Safety Glasses, "
                "and Safety Violations"
            )

            with gr.Tabs():

                # Tab 1: Image Upload Detection
                with gr.Tab("📷 Image Analysis"):
                    gr.Markdown(
                        "### Upload an image to detect safety equipment and violations"
                    )

                    with gr.Row():
                        with gr.Column(scale=1):
                            input_image = gr.Image(
                                type="pil",
                                label="Upload Image",
                                height=400
                            )

                            detect_btn = gr.Button(
                                "🔍 Analyze Safety Compliance",
                                variant="primary",
                                size="lg"
                            )

                        with gr.Column(scale=1):
                            output_image = gr.Image(
                                label="Detection Results",
                                height=400
                            )

                    with gr.Row():
                        with gr.Column():
                            summary_text = gr.Textbox(
                                label="📊 Summary",
                                lines=8,
                                max_lines=15
                            )

                        with gr.Column():
                            violations_json = gr.JSON(
                                label="🔍 Detailed Results",
                                height=300
                            )

                    # Connect the detection function
                    detect_btn.click(
                        fn=self.detect_safety_violations_image,
                        inputs=[input_image],
                        outputs=[output_image, violations_json, summary_text]
                    )

                # Tab 2: Real-time Camera Monitoring
                with gr.Tab("📹 Live Camera Monitoring"):
                    gr.Markdown("### Real-time safety monitoring using your camera")

                    with gr.Row():
                        start_btn = gr.Button("▶️ Start Monitoring", variant="primary")
                        stop_btn = gr.Button("⏹️ Stop Monitoring", variant="stop")

                    with gr.Row():
                        camera_status = gr.Textbox(
                            label="📡 Camera Status",
                            value="Camera not started",
                            interactive=False
                        )
                        frame_status = gr.Textbox(
                            label="📊 Live Status",
                            value="No data",
                            interactive=False
                        )

                    live_image = gr.Image(
                        label="🔴 Live Camera Feed",
                        height=500
                    )

                    # Connect camera functions
                    start_btn.click(
                        fn=self.start_camera_monitoring,
                        outputs=[camera_status, frame_status]
                    )

                    stop_btn.click(
                        fn=self.stop_camera_monitoring,
                        outputs=[camera_status, frame_status]
                    )

                    # Auto-refresh camera feed every 2 seconds
                    interface.load(
                        fn=self.get_camera_frame,
                        outputs=[live_image, frame_status],
                        every=2
                    )

                # Tab 3: Violation Log
                with gr.Tab("📋 Violation Log"):
                    gr.Markdown("### Recent safety violations and compliance history")

                    refresh_log_btn = gr.Button("🔄 Refresh Log", variant="secondary")

                    violation_log_display = gr.Textbox(
                        label="📋 Violation History",
                        lines=20,
                        max_lines=30,
                        value="No violations recorded"
                    )

                    refresh_log_btn.click(
                        fn=self.get_violation_log,
                        outputs=[violation_log_display]
                    )

                    # Auto-refresh log every 10 seconds
                    interface.load(
                        fn=self.get_violation_log,
                        outputs=[violation_log_display],
                        every=10
                    )

                # Tab 4: Model Information
                with gr.Tab("🤖 AI Model Info"):
                    gr.Markdown("### Information about the AI detection model")

                    model_info_display = gr.Markdown(
                        value=self.get_model_info()
                    )

                    refresh_model_btn = gr.Button("🔄 Refresh Model Info")
                    refresh_model_btn.click(
                        fn=self.get_model_info,
                        outputs=[model_info_display]
                    )

            # Footer
            gr.Markdown(
                "---\n"
                "\n"
                "**SafetyMaster Pro** - Powered by YOLOv8 AI Detection | "
                "Built with ❤️ for workplace safety\n"
                "\n"
                "⚠️ **Note**: For camera monitoring, please allow camera access "
                "when prompted by your browser."
            )

        return interface


def main():
    """Main function to launch the Gradio app.

    Binds to all interfaces on the port given by the ``PORT`` environment
    variable (default 7860).
    """
    print("🚀 Starting SafetyMaster Pro - Gradio Interface")

    # Create the Gradio app
    app = SafetyMasterGradio()
    interface = app.create_interface()

    # Launch configuration
    launch_kwargs = {
        "server_name": "0.0.0.0",  # Allow external access
        "server_port": int(os.environ.get("PORT", 7860)),  # Use PORT env var or default
        "share": False,  # Set to True for public sharing
        "debug": False,
        "show_error": True,
        "quiet": False
    }

    print(f"🌐 Launching on port {launch_kwargs['server_port']}")
    # Report the port actually in use rather than the default.
    print(f"📱 Access the app at: http://localhost:{launch_kwargs['server_port']}")

    # Launch the interface
    interface.launch(**launch_kwargs)


if __name__ == "__main__":
    main()