Spaces:
Sleeping
Sleeping
import cv2 | |
import threading | |
import queue | |
import time | |
from typing import Optional, Callable, Union | |
import numpy as np | |
class CameraManager: | |
""" | |
Manages video capture from various sources including webcams, IP cameras, and video files. | |
Provides threaded video capture for real-time processing. | |
""" | |
def __init__(self, source: Union[int, str] = 0, buffer_size: int = 10): | |
""" | |
Initialize camera manager. | |
Args: | |
source: Camera source (0 for default webcam, URL for IP camera, path for video file) | |
buffer_size: Size of frame buffer for threading | |
""" | |
self.source = source | |
self.buffer_size = buffer_size | |
self.cap = None | |
self.frame_queue = queue.Queue(maxsize=buffer_size) | |
self.capture_thread = None | |
self.is_running = False | |
self.fps = 60 # Higher FPS target | |
self.frame_width = 640 | |
self.frame_height = 480 | |
def connect(self) -> bool: | |
""" | |
Connect to the video source. | |
Returns: | |
True if connection successful, False otherwise | |
""" | |
try: | |
self.cap = cv2.VideoCapture(self.source) | |
if not self.cap.isOpened(): | |
print(f"Error: Could not open video source: {self.source}") | |
return False | |
# Set camera properties for higher performance | |
self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, self.frame_width) | |
self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, self.frame_height) | |
self.cap.set(cv2.CAP_PROP_FPS, self.fps) | |
self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1) # Reduce buffer to minimize delay | |
# Additional optimizations for higher FPS | |
self.cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc('M', 'J', 'P', 'G')) # Use MJPEG for speed | |
self.cap.set(cv2.CAP_PROP_AUTO_EXPOSURE, 0.25) # Disable auto exposure for consistent timing | |
# Get actual properties | |
self.frame_width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH)) | |
self.frame_height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) | |
self.fps = int(self.cap.get(cv2.CAP_PROP_FPS)) | |
print(f"Connected to camera: {self.frame_width}x{self.frame_height} @ {self.fps}fps") | |
return True | |
except Exception as e: | |
print(f"Error connecting to camera: {e}") | |
return False | |
def start_capture(self) -> bool: | |
""" | |
Start threaded video capture. | |
Returns: | |
True if capture started successfully, False otherwise | |
""" | |
if not self.cap or not self.cap.isOpened(): | |
if not self.connect(): | |
return False | |
if self.is_running: | |
print("Capture is already running") | |
return True | |
self.is_running = True | |
self.capture_thread = threading.Thread(target=self._capture_frames, daemon=True) | |
self.capture_thread.start() | |
print("Video capture started") | |
return True | |
def stop_capture(self): | |
"""Stop video capture and clean up resources.""" | |
self.is_running = False | |
if self.capture_thread and self.capture_thread.is_alive(): | |
self.capture_thread.join(timeout=2.0) | |
if self.cap: | |
self.cap.release() | |
self.cap = None | |
# Clear the frame queue | |
while not self.frame_queue.empty(): | |
try: | |
self.frame_queue.get_nowait() | |
except queue.Empty: | |
break | |
print("Video capture stopped") | |
def _capture_frames(self): | |
"""Internal method to capture frames in a separate thread.""" | |
while self.is_running and self.cap and self.cap.isOpened(): | |
try: | |
ret, frame = self.cap.read() | |
if not ret: | |
print("Failed to capture frame") | |
if isinstance(self.source, str) and not self.source.isdigit(): | |
# For video files, we might have reached the end | |
print("Reached end of video file") | |
break | |
continue | |
# Add timestamp to frame | |
timestamp = time.time() | |
# If queue is full, remove oldest frame | |
if self.frame_queue.full(): | |
try: | |
self.frame_queue.get_nowait() | |
except queue.Empty: | |
pass | |
# Add new frame to queue | |
self.frame_queue.put((frame, timestamp), block=False) | |
except Exception as e: | |
print(f"Error in frame capture: {e}") | |
time.sleep(0.1) | |
self.is_running = False | |
def get_frame(self) -> Optional[tuple]: | |
""" | |
Get the latest frame from the capture queue. | |
Returns: | |
Tuple of (frame, timestamp) or None if no frame available | |
""" | |
try: | |
return self.frame_queue.get_nowait() | |
except queue.Empty: | |
return None | |
def get_latest_frame(self) -> Optional[tuple]: | |
""" | |
Get the most recent frame, discarding any older frames in the queue. | |
Returns: | |
Tuple of (frame, timestamp) or None if no frame available | |
""" | |
latest_frame = None | |
# Get all frames and keep only the latest | |
while True: | |
try: | |
frame_data = self.frame_queue.get_nowait() | |
latest_frame = frame_data | |
except queue.Empty: | |
break | |
return latest_frame | |
def is_connected(self) -> bool: | |
""" | |
Check if camera is connected and capturing. | |
Returns: | |
True if connected and running, False otherwise | |
""" | |
return self.is_running and self.cap is not None and self.cap.isOpened() | |
def get_properties(self) -> dict: | |
""" | |
Get camera properties. | |
Returns: | |
Dictionary of camera properties | |
""" | |
if not self.cap: | |
return {} | |
return { | |
'width': self.frame_width, | |
'height': self.frame_height, | |
'fps': self.fps, | |
'source': self.source, | |
'is_running': self.is_running, | |
'buffer_size': self.buffer_size | |
} | |
def set_resolution(self, width: int, height: int) -> bool: | |
""" | |
Set camera resolution. | |
Args: | |
width: Frame width | |
height: Frame height | |
Returns: | |
True if successful, False otherwise | |
""" | |
if not self.cap: | |
return False | |
try: | |
self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, width) | |
self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, height) | |
# Verify the change | |
actual_width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH)) | |
actual_height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) | |
self.frame_width = actual_width | |
self.frame_height = actual_height | |
print(f"Resolution set to: {actual_width}x{actual_height}") | |
return True | |
except Exception as e: | |
print(f"Error setting resolution: {e}") | |
return False | |
def __enter__(self): | |
"""Context manager entry.""" | |
self.start_capture() | |
return self | |
def __exit__(self, exc_type, exc_val, exc_tb): | |
"""Context manager exit.""" | |
self.stop_capture() | |
class MultiCameraManager: | |
""" | |
Manages multiple camera sources simultaneously. | |
""" | |
def __init__(self): | |
self.cameras = {} | |
self.is_running = False | |
def add_camera(self, camera_id: str, source: Union[int, str], | |
buffer_size: int = 10) -> bool: | |
""" | |
Add a camera to the manager. | |
Args: | |
camera_id: Unique identifier for the camera | |
source: Camera source | |
buffer_size: Frame buffer size | |
Returns: | |
True if camera added successfully, False otherwise | |
""" | |
try: | |
camera = CameraManager(source, buffer_size) | |
if camera.connect(): | |
self.cameras[camera_id] = camera | |
print(f"Camera '{camera_id}' added successfully") | |
return True | |
else: | |
print(f"Failed to add camera '{camera_id}'") | |
return False | |
except Exception as e: | |
print(f"Error adding camera '{camera_id}': {e}") | |
return False | |
def remove_camera(self, camera_id: str): | |
"""Remove a camera from the manager.""" | |
if camera_id in self.cameras: | |
self.cameras[camera_id].stop_capture() | |
del self.cameras[camera_id] | |
print(f"Camera '{camera_id}' removed") | |
def start_all(self): | |
"""Start capture for all cameras.""" | |
for camera_id, camera in self.cameras.items(): | |
if camera.start_capture(): | |
print(f"Started capture for camera '{camera_id}'") | |
else: | |
print(f"Failed to start capture for camera '{camera_id}'") | |
self.is_running = True | |
def stop_all(self): | |
"""Stop capture for all cameras.""" | |
for camera_id, camera in self.cameras.items(): | |
camera.stop_capture() | |
print(f"Stopped capture for camera '{camera_id}'") | |
self.is_running = False | |
def get_frame(self, camera_id: str) -> Optional[tuple]: | |
"""Get frame from specific camera.""" | |
if camera_id in self.cameras: | |
return self.cameras[camera_id].get_frame() | |
return None | |
def get_all_frames(self) -> dict: | |
"""Get frames from all cameras.""" | |
frames = {} | |
for camera_id, camera in self.cameras.items(): | |
frame_data = camera.get_latest_frame() | |
if frame_data: | |
frames[camera_id] = frame_data | |
return frames | |
def get_camera_list(self) -> list: | |
"""Get list of all camera IDs.""" | |
return list(self.cameras.keys()) |