Spaces:
Runtime error
Runtime error
#!/usr/bin/env python3
"""
Safety Monitor Demo Script
Real-time safety compliance detection using webcam or video file.
"""
import cv2 | |
import argparse | |
import time | |
import sys | |
from safety_detector import SafetyDetector | |
from camera_manager import CameraManager | |
def _confidence_value(text):
    """Parse a ``--confidence`` argument and require it to lie in [0.0, 1.0]."""
    try:
        value = float(text)
    except ValueError:
        raise argparse.ArgumentTypeError(
            f"invalid float value: {text!r}") from None
    # The chained comparison is False for NaN, so NaN is rejected as well.
    if not 0.0 <= value <= 1.0:
        raise argparse.ArgumentTypeError(
            f"confidence must be between 0.0 and 1.0, got {text!r}")
    return value


def _build_parser():
    """Create the command-line parser for the demo script."""
    parser = argparse.ArgumentParser(description='Safety Monitor Demo')
    parser.add_argument('--source', type=str, default='0',
                        help='Video source (0 for webcam, path for video file, URL for IP camera)')
    parser.add_argument('--model', type=str, default=None,
                        help='Path to custom YOLO model (optional)')
    parser.add_argument('--confidence', type=_confidence_value, default=0.5,
                        help='Detection confidence threshold (0.1-1.0)')
    # NOTE(review): --save-violations is parsed but nothing in this function
    # consumes it; wire it into the detector once its API is confirmed.
    parser.add_argument('--save-violations', action='store_true',
                        help='Save violation images to disk')
    parser.add_argument('--fullscreen', action='store_true',
                        help='Display in fullscreen mode')
    return parser


def main():
    """Run the interactive safety-monitoring demo.

    Parses ``sys.argv``, builds a ``SafetyDetector`` and a ``CameraManager``,
    then loops: fetch the latest frame, run detection, show the annotated
    frame and react to keyboard input (SPACE pauses/resumes, S saves the
    displayed frame, Q/ESC quits).

    Returns:
        int: 0 when the loop ends normally or via Ctrl+C, 1 when the camera
        cannot be started or an unexpected exception is raised.

    Raises:
        SystemExit: raised by ``argparse`` for ``--help`` or invalid
            arguments (including an out-of-range ``--confidence``).
    """
    args = _build_parser().parse_args()

    # A purely numeric source is a webcam index; anything else is a path/URL.
    source = int(args.source) if args.source.isdigit() else args.source

    # NOTE(review): the leading glyphs in the messages below look mis-encoded
    # (probably emoji); they are kept byte-for-byte until the intended
    # characters are confirmed.
    print("๐ Safety Monitor Demo Starting...")
    print(f"๐น Video Source: {source}")
    print(f"๐ฏ Confidence Threshold: {args.confidence}")
    print(f"๐ค Model: {'Custom' if args.model else 'YOLOv8 (default)'}")
    print("\nControls:")
    print(" SPACE - Pause/Resume")
    print(" S - Save current frame")
    print(" Q/ESC - Quit")
    print("-" * 50)

    window_name = 'Safety Monitor'

    # Everything the cleanup path reads is bound before the try block, so a
    # failure during model loading or camera start-up cannot raise
    # UnboundLocalError while the final statistics are printed.
    detector = None
    camera = None
    total_violations = 0

    try:
        # Initialize safety detector
        print("Loading safety detection model...")
        detector = SafetyDetector(args.model, args.confidence)
        print("โ Safety detector initialized")

        # Initialize camera
        print("Connecting to camera...")
        camera = CameraManager(source)
        if not camera.start_capture():
            print("โ Failed to start camera capture")
            return 1
        print("โ Camera connected and capturing")
        print("\n๐ Starting real-time safety monitoring...\n")

        # The window only needs to be configured once, not on every frame.
        if args.fullscreen:
            cv2.namedWindow(window_name, cv2.WINDOW_NORMAL)
            cv2.setWindowProperty(window_name, cv2.WND_PROP_FULLSCREEN,
                                  cv2.WINDOW_FULLSCREEN)

        # Stats tracking
        frame_count = 0
        fps_start_time = time.time()
        paused = False
        live_frame = None   # most recent annotated frame
        pause_frame = None  # live_frame plus the PAUSED banner, built lazily

        while True:
            if not paused:
                frame_data = camera.get_latest_frame()
                if frame_data is None:
                    # No frame available yet; back off briefly and retry.
                    time.sleep(0.01)
                    continue

                frame, _ = frame_data
                frame_count += 1

                # Process frame for safety detection
                live_frame, analysis = detector.process_frame(frame)
                total_violations += analysis['violations']

                # Report FPS and the latest analysis roughly once per second.
                now = time.time()
                elapsed = now - fps_start_time
                if elapsed >= 1.0:
                    fps = frame_count / elapsed
                    frame_count = 0
                    fps_start_time = now
                    print(f"\r๐ FPS: {fps:.1f} | People: {analysis['total_people']} | "
                          f"Compliant: {analysis['compliant_people']} | "
                          f"Violations: {analysis['violations']} | "
                          f"Total Violations: {total_violations}", end='', flush=True)

                display_frame = live_frame
            else:
                # Pausing is only possible after a frame has been shown, so
                # live_frame is always set here. Build the banner frame once
                # per pause instead of copying on every iteration.
                if pause_frame is None:
                    pause_frame = live_frame.copy()
                    cv2.putText(pause_frame, "PAUSED - Press SPACE to resume",
                                (10, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 2)
                display_frame = pause_frame

            # Show the frame
            cv2.imshow(window_name, display_frame)

            # Handle keyboard input (upper- and lowercase letters accepted).
            key = cv2.waitKey(1) & 0xFF
            if key in (ord('q'), ord('Q'), 27):  # Q or ESC
                break
            elif key == ord(' '):  # SPACE
                paused = not paused
                pause_frame = None
                if paused:
                    print("\nโธ๏ธ PAUSED")
                else:
                    print("\nโถ๏ธ RESUMED")
            elif key in (ord('s'), ord('S')):  # S
                timestamp_str = time.strftime("%Y%m%d_%H%M%S")
                filename = f"safety_monitor_capture_{timestamp_str}.jpg"
                cv2.imwrite(filename, display_frame)
                print(f"\n๐ธ Frame saved as {filename}")

    except KeyboardInterrupt:
        print("\n\nโน๏ธ Monitoring stopped by user")
    except Exception as e:
        # Top-level boundary of the demo: report the error and exit non-zero.
        print(f"\nโ Error: {e}")
        return 1
    finally:
        # Cleanup
        if camera is not None:
            camera.stop_capture()
        cv2.destroyAllWindows()

        # Final stats
        print("\n\n๐ Final Statistics:")
        print(f" Total Violations Detected: {total_violations}")
        if detector is not None:
            violation_summary = detector.get_violation_summary()
            print(f" Violations Saved: {violation_summary['total_violations']}")
        print(" Thank you for using Safety Monitor! ๐")

    return 0
# Script entry point: run the demo and hand main()'s status code to the shell.
if __name__ == "__main__":
    raise SystemExit(main())