safetyMasterPro / web_interface_webcam.py
mrvero's picture
Upload 4 files
a97f941 verified
#!/usr/bin/env python3
"""
SafetyMaster Pro - Webcam Streaming Version
Optimized for cloud deployment with client-side webcam capture
"""
import cv2
import base64
import json
import time
import os
import numpy as np
from flask import Flask, render_template, jsonify
from flask_socketio import SocketIO, emit
import threading
from datetime import datetime
from io import BytesIO
from PIL import Image
from safety_detector import SafetyDetector
app = Flask(__name__)
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY', 'safety_monitor_secret_key')
socketio = SocketIO(app, cors_allowed_origins="*", async_mode='threading')
# Global variables
detector = None
violation_log = []
def initialize_components():
"""Initialize the safety detector."""
global detector
try:
detector = SafetyDetector()
print("Safety detector initialized successfully")
return True
except Exception as e:
print(f"Error initializing components: {e}")
return False
def base64_to_frame(base64_string):
"""Convert base64 string to OpenCV frame."""
try:
# Decode base64 to bytes
image_bytes = base64.b64decode(base64_string)
# Convert to PIL Image
pil_image = Image.open(BytesIO(image_bytes))
# Convert to OpenCV format (BGR)
frame = cv2.cvtColor(np.array(pil_image), cv2.COLOR_RGB2BGR)
return frame
except Exception as e:
print(f"Error converting base64 to frame: {e}")
return None
def frame_to_base64(frame):
"""Convert OpenCV frame to base64 string."""
try:
# Encode frame as JPEG
_, buffer = cv2.imencode('.jpg', frame, [cv2.IMWRITE_JPEG_QUALITY, 85])
# Convert to base64
frame_base64 = base64.b64encode(buffer).decode('utf-8')
return frame_base64
except Exception as e:
print(f"Error converting frame to base64: {e}")
return None
@app.route('/')
def dashboard():
"""Serve the webcam-enabled dashboard."""
return render_template('dashboard_webcam.html')
@app.route('/health')
def health_check():
"""Health check endpoint."""
return jsonify({
'status': 'healthy',
'service': 'SafetyMaster Pro (Webcam)',
'timestamp': datetime.now().isoformat(),
'detector_loaded': detector is not None
})
@app.route('/api/violations')
def get_violations():
"""Get recent violations."""
try:
return jsonify({
'success': True,
'violations': violation_log[-20:], # Last 20 violations
'total_count': len(violation_log)
})
except Exception as e:
return jsonify({
'success': False,
'message': f'Error getting violations: {str(e)}'
}), 500
@app.route('/api/model_info')
def get_model_info():
"""Get information about the loaded model."""
try:
if detector:
return jsonify({
'success': True,
'model_classes': detector.get_model_classes(),
'device': detector.device
})
else:
return jsonify({
'success': False,
'message': 'Detector not initialized'
}), 500
except Exception as e:
return jsonify({
'success': False,
'message': f'Error getting model info: {str(e)}'
}), 500
@socketio.on('connect')
def handle_connect():
"""Handle client connection."""
print('Client connected')
emit('status', {'message': 'Connected to Safety Monitor'})
@socketio.on('disconnect')
def handle_disconnect():
"""Handle client disconnection."""
print('Client disconnected')
@socketio.on('process_frame')
def handle_process_frame(data):
"""Process a frame sent from the client webcam."""
try:
if not detector:
emit('error', {'message': 'Detector not initialized'})
return
# Get frame data
frame_data = data.get('frame')
if not frame_data:
emit('error', {'message': 'No frame data received'})
return
# Convert base64 to OpenCV frame
frame = base64_to_frame(frame_data)
if frame is None:
emit('error', {'message': 'Failed to decode frame'})
return
# Run AI detection
results = detector.detect_safety_violations(frame)
# Draw detections on frame
annotated_frame = detector.draw_detections(frame, results)
# Convert back to base64
processed_frame_base64 = frame_to_base64(annotated_frame)
# Log violations
if results['violations']:
current_time = datetime.now().isoformat()
for violation in results['violations']:
violation_entry = {
'timestamp': current_time,
'type': violation['type'],
'description': violation['description'],
'severity': violation.get('severity', 'high'),
'count': violation.get('count', 1)
}
violation_log.append(violation_entry)
# Keep only last 50 violations
if len(violation_log) > 50:
violation_log.pop(0)
# Send results back to client
response_data = {
'people_count': results['people_count'],
'safety_equipment': results['safety_equipment'],
'violations': results['violations'],
'fps': results['fps'],
'timestamp': datetime.now().isoformat(),
'processed_frame': processed_frame_base64
}
emit('detection_result', response_data)
except Exception as e:
print(f"Error processing frame: {e}")
emit('error', {'message': f'Error processing frame: {str(e)}'})
@socketio.on('request_model_info')
def handle_model_info_request():
"""Send model information to client."""
try:
if detector:
model_info = {
'classes': detector.get_model_classes(),
'device': detector.device
}
emit('model_info', model_info)
else:
emit('error', {'message': 'Detector not initialized'})
except Exception as e:
emit('error', {'message': f'Error getting model info: {str(e)}'})
def main():
"""Main function to run the web application."""
print("🤖 Loading AI model (this may take a moment on first run)...")
print(" Using pre-uploaded models for faster startup...")
if not initialize_components():
print("❌ Failed to initialize components")
return
# Get port from environment variable (Hugging Face Spaces uses 7860)
port = int(os.environ.get('PORT', 7860))
host = '0.0.0.0' # Required for cloud deployment
print("🚀 Starting SafetyMaster Pro (Webcam Version)...")
print(f" Running on: http://{host}:{port}")
print(" 🎥 Webcam streaming enabled for cloud deployment")
print(" Press Ctrl+C to stop")
try:
socketio.run(app,
host=host,
port=port,
debug=False, # Disable debug in production
use_reloader=False, # Disable reloader in production
allow_unsafe_werkzeug=True)
except KeyboardInterrupt:
print("\n🛑 Shutting down Safety Monitor...")
print(" Safety Monitor stopped")
if __name__ == '__main__':
main()