Spaces:
Sleeping
Sleeping
#!/usr/bin/env python3 | |
""" | |
SafetyMaster Pro - Webcam Streaming Version
Optimized for cloud deployment with client-side webcam capture
""" | |
import cv2 | |
import base64 | |
import json | |
import time | |
import os | |
import numpy as np | |
from flask import Flask, render_template, jsonify | |
from flask_socketio import SocketIO, emit | |
import threading | |
from datetime import datetime | |
from io import BytesIO | |
from PIL import Image | |
from safety_detector import SafetyDetector | |
# Flask application. The session secret is read from the SECRET_KEY
# environment variable, with a built-in default when it is unset.
# NOTE(review): the fallback secret is hard-coded in source -- confirm a real
# SECRET_KEY is always provided in deployment.
app = Flask(__name__)
app.config.update(
    SECRET_KEY=os.environ.get('SECRET_KEY', 'safety_monitor_secret_key'),
)

# Socket.IO server attached to the app: threading async mode, any origin allowed.
socketio = SocketIO(app, async_mode='threading', cors_allowed_origins="*")

# Global variables
detector = None  # set by initialize_components()
violation_log = []  # violation entries appended by handle_process_frame()
def initialize_components():
    """Create the global safety detector.

    Returns:
        True when the SafetyDetector was constructed, False when construction
        raised (the error is printed instead of being propagated).
    """
    global detector
    try:
        detector = SafetyDetector()
    except Exception as exc:
        print(f"Error initializing components: {exc}")
        return False
    else:
        print("Safety detector initialized successfully")
        return True
def base64_to_frame(base64_string):
    """Convert a base64-encoded image into an OpenCV (BGR) frame.

    Args:
        base64_string: Base64 text (or bytes) of an encoded image. For text
            input, a leading data-URL header ("data:image/...;base64,") is
            accepted and ignored.

    Returns:
        The decoded image as a 3-channel BGR numpy array, or None when the
        input cannot be decoded (the error is printed).
    """
    try:
        # Data URLs (e.g. from a browser canvas) carry a header that ends in
        # a comma; a comma never occurs in the base64 payload itself.
        if isinstance(base64_string, str) and base64_string.startswith('data:'):
            base64_string = base64_string.split(',', 1)[-1]

        # Decode base64 to bytes
        image_bytes = base64.b64decode(base64_string)

        # Convert to PIL Image
        pil_image = Image.open(BytesIO(image_bytes))

        # Normalise to 3-channel RGB first: grayscale, palette or RGBA images
        # would otherwise make the RGB -> BGR conversion below fail.
        rgb_pixels = np.array(pil_image.convert('RGB'))

        # Convert to OpenCV format (BGR)
        return cv2.cvtColor(rgb_pixels, cv2.COLOR_RGB2BGR)
    except Exception as e:
        print(f"Error converting base64 to frame: {e}")
        return None
def frame_to_base64(frame):
    """Convert an OpenCV frame to a base64-encoded JPEG string.

    Args:
        frame: Image array accepted by ``cv2.imencode``.

    Returns:
        The JPEG (quality 85) encoded as base64 text, or None when encoding
        fails (the error is printed).
    """
    try:
        # Encode frame as JPEG; imencode reports failure through its first
        # return value, so it must be checked before using the buffer.
        encoded_ok, buffer = cv2.imencode(
            '.jpg', frame, [cv2.IMWRITE_JPEG_QUALITY, 85]
        )
        if not encoded_ok:
            print("Error converting frame to base64: JPEG encoding failed")
            return None

        # Convert to base64
        return base64.b64encode(buffer).decode('utf-8')
    except Exception as e:
        print(f"Error converting frame to base64: {e}")
        return None
def dashboard():
    """Render the webcam dashboard page from its template."""
    # NOTE(review): no @app.route decorator is visible on this function in
    # this view -- confirm it is registered as a route.
    page = render_template('dashboard_webcam.html')
    return page
def health_check():
    """Report service health as JSON.

    The payload holds a fixed status and service name, the current time in
    ISO format, and whether the global detector has been created.
    """
    # NOTE(review): no @app.route decorator is visible on this function in
    # this view -- confirm it is registered as a route.
    payload = {
        'status': 'healthy',
        'service': 'SafetyMaster Pro (Webcam)',
        'timestamp': datetime.now().isoformat(),
        'detector_loaded': detector is not None,
    }
    return jsonify(payload)
def get_violations():
    """Return the most recent logged violations as JSON.

    Responds with at most the last 20 entries of ``violation_log`` together
    with the log's current length; on any error responds with status 500 and
    an error message.
    """
    # NOTE(review): no @app.route decorator is visible on this function in
    # this view -- confirm it is registered as a route.
    try:
        recent = violation_log[-20:]  # Last 20 violations
        payload = {
            'success': True,
            'violations': recent,
            'total_count': len(violation_log),
        }
        return jsonify(payload)
    except Exception as exc:
        failure = {
            'success': False,
            'message': f'Error getting violations: {str(exc)}',
        }
        return jsonify(failure), 500
def get_model_info():
    """Return the loaded model's classes and device as JSON.

    Responds with status 500 when the detector has not been initialized or
    when querying it raises.
    """
    # NOTE(review): no @app.route decorator is visible on this function in
    # this view -- confirm it is registered as a route.
    try:
        # Guard clause: nothing to report without a detector.
        if not detector:
            missing = {
                'success': False,
                'message': 'Detector not initialized',
            }
            return jsonify(missing), 500

        info = {
            'success': True,
            'model_classes': detector.get_model_classes(),
            'device': detector.device,
        }
        return jsonify(info)
    except Exception as exc:
        failure = {
            'success': False,
            'message': f'Error getting model info: {str(exc)}',
        }
        return jsonify(failure), 500
def handle_connect():
    """Log a new client connection and send the client a status greeting."""
    # NOTE(review): no @socketio.on decorator is visible on this handler in
    # this view -- confirm it is registered for the connect event.
    print('Client connected')
    greeting = {'message': 'Connected to Safety Monitor'}
    emit('status', greeting)
def handle_disconnect():
    """Log that a client has disconnected."""
    # NOTE(review): no @socketio.on decorator is visible on this handler in
    # this view -- confirm it is registered for the disconnect event.
    notice = 'Client disconnected'
    print(notice)
def handle_process_frame(data):
    """Run safety detection on one client webcam frame and emit the result.

    Args:
        data: Event payload; its 'frame' entry holds the base64-encoded image.

    Emits:
        'detection_result' with people count, safety equipment, violations,
        fps, a timestamp and the annotated frame (base64) on success.
        'error' with a message when the detector is missing, the frame is
        absent or cannot be decoded, or processing raises.
    """
    # NOTE(review): no @socketio.on decorator is visible on this handler in
    # this view -- confirm the event name it is registered under.
    try:
        if not detector:
            emit('error', {'message': 'Detector not initialized'})
            return

        # Get frame data
        frame_data = data.get('frame')
        if not frame_data:
            emit('error', {'message': 'No frame data received'})
            return

        # Convert base64 to OpenCV frame
        frame = base64_to_frame(frame_data)
        if frame is None:
            emit('error', {'message': 'Failed to decode frame'})
            return

        # Run AI detection
        results = detector.detect_safety_violations(frame)

        # Draw detections on frame
        annotated_frame = detector.draw_detections(frame, results)

        # Convert back to base64
        processed_frame_base64 = frame_to_base64(annotated_frame)

        # Log violations
        _record_violations(results['violations'])

        # Send results back to client
        response_data = {
            'people_count': results['people_count'],
            'safety_equipment': results['safety_equipment'],
            'violations': results['violations'],
            'fps': results['fps'],
            'timestamp': datetime.now().isoformat(),
            'processed_frame': processed_frame_base64,
        }
        emit('detection_result', response_data)
    except Exception as e:
        print(f"Error processing frame: {e}")
        emit('error', {'message': f'Error processing frame: {str(e)}'})


def _record_violations(violations, max_entries=50):
    """Append violations to the global log, keeping only the newest entries."""
    if not violations:
        return

    # One timestamp is shared by every violation reported for the same frame.
    current_time = datetime.now().isoformat()
    for violation in violations:
        violation_log.append({
            'timestamp': current_time,
            'type': violation['type'],
            'description': violation['description'],
            'severity': violation.get('severity', 'high'),
            'count': violation.get('count', 1),
        })

    # Trim in place (the list is mutated, never rebound) by the full excess,
    # so the bound holds even when one frame reports several violations.
    excess = len(violation_log) - max_entries
    if excess > 0:
        del violation_log[:excess]
def handle_model_info_request():
    """Emit the model's classes and device to the requesting client.

    Emits 'model_info' on success, or 'error' when the detector is not
    initialized or querying it raises.
    """
    # NOTE(review): no @socketio.on decorator is visible on this handler in
    # this view -- confirm the event name it is registered under.
    try:
        # Guard clause: report the missing detector and stop.
        if not detector:
            emit('error', {'message': 'Detector not initialized'})
            return

        classes = detector.get_model_classes()
        device = detector.device
        emit('model_info', {'classes': classes, 'device': device})
    except Exception as exc:
        emit('error', {'message': f'Error getting model info: {str(exc)}'})
def main():
    """Initialize the detector, then serve the app until interrupted.

    Returns early (after printing a failure message) when the detector cannot
    be initialized. The listening port is read from the PORT environment
    variable and defaults to 7860.
    """
    print("🤖 Loading AI model (this may take a moment on first run)...")
    print(" Using pre-uploaded models for faster startup...")

    initialized = initialize_components()
    if not initialized:
        print("❌ Failed to initialize components")
        return

    host = '0.0.0.0'  # Required for cloud deployment
    # Get port from environment variable (Hugging Face Spaces uses 7860)
    port = int(os.environ.get('PORT', 7860))

    startup_banner = (
        "🚀 Starting SafetyMaster Pro (Webcam Version)...",
        f" Running on: http://{host}:{port}",
        " 🎥 Webcam streaming enabled for cloud deployment",
        " Press Ctrl+C to stop",
    )
    for banner_line in startup_banner:
        print(banner_line)

    server_options = {
        'host': host,
        'port': port,
        'debug': False,  # Disable debug in production
        'use_reloader': False,  # Disable reloader in production
        'allow_unsafe_werkzeug': True,
    }
    try:
        socketio.run(app, **server_options)
    except KeyboardInterrupt:
        print("\n🛑 Shutting down Safety Monitor...")
        print(" Safety Monitor stopped")


if __name__ == '__main__':
    main()