import cv2 import threading import queue import time from typing import Optional, Callable, Union import numpy as np class CameraManager: """ Manages video capture from various sources including webcams, IP cameras, and video files. Provides threaded video capture for real-time processing. """ def __init__(self, source: Union[int, str] = 0, buffer_size: int = 10): """ Initialize camera manager. Args: source: Camera source (0 for default webcam, URL for IP camera, path for video file) buffer_size: Size of frame buffer for threading """ self.source = source self.buffer_size = buffer_size self.cap = None self.frame_queue = queue.Queue(maxsize=buffer_size) self.capture_thread = None self.is_running = False self.fps = 60 # Higher FPS target self.frame_width = 640 self.frame_height = 480 def connect(self) -> bool: """ Connect to the video source. Returns: True if connection successful, False otherwise """ try: self.cap = cv2.VideoCapture(self.source) if not self.cap.isOpened(): print(f"Error: Could not open video source: {self.source}") return False # Set camera properties for higher performance self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, self.frame_width) self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, self.frame_height) self.cap.set(cv2.CAP_PROP_FPS, self.fps) self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1) # Reduce buffer to minimize delay # Additional optimizations for higher FPS self.cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc('M', 'J', 'P', 'G')) # Use MJPEG for speed self.cap.set(cv2.CAP_PROP_AUTO_EXPOSURE, 0.25) # Disable auto exposure for consistent timing # Get actual properties self.frame_width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH)) self.frame_height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) self.fps = int(self.cap.get(cv2.CAP_PROP_FPS)) print(f"Connected to camera: {self.frame_width}x{self.frame_height} @ {self.fps}fps") return True except Exception as e: print(f"Error connecting to camera: {e}") return False def start_capture(self) -> bool: """ Start threaded video capture. Returns: True if capture started successfully, False otherwise """ if not self.cap or not self.cap.isOpened(): if not self.connect(): return False if self.is_running: print("Capture is already running") return True self.is_running = True self.capture_thread = threading.Thread(target=self._capture_frames, daemon=True) self.capture_thread.start() print("Video capture started") return True def stop_capture(self): """Stop video capture and clean up resources.""" self.is_running = False if self.capture_thread and self.capture_thread.is_alive(): self.capture_thread.join(timeout=2.0) if self.cap: self.cap.release() self.cap = None # Clear the frame queue while not self.frame_queue.empty(): try: self.frame_queue.get_nowait() except queue.Empty: break print("Video capture stopped") def _capture_frames(self): """Internal method to capture frames in a separate thread.""" while self.is_running and self.cap and self.cap.isOpened(): try: ret, frame = self.cap.read() if not ret: print("Failed to capture frame") if isinstance(self.source, str) and not self.source.isdigit(): # For video files, we might have reached the end print("Reached end of video file") break continue # Add timestamp to frame timestamp = time.time() # If queue is full, remove oldest frame if self.frame_queue.full(): try: self.frame_queue.get_nowait() except queue.Empty: pass # Add new frame to queue self.frame_queue.put((frame, timestamp), block=False) except Exception as e: print(f"Error in frame capture: {e}") time.sleep(0.1) self.is_running = False def get_frame(self) -> Optional[tuple]: """ Get the latest frame from the capture queue. Returns: Tuple of (frame, timestamp) or None if no frame available """ try: return self.frame_queue.get_nowait() except queue.Empty: return None def get_latest_frame(self) -> Optional[tuple]: """ Get the most recent frame, discarding any older frames in the queue. Returns: Tuple of (frame, timestamp) or None if no frame available """ latest_frame = None # Get all frames and keep only the latest while True: try: frame_data = self.frame_queue.get_nowait() latest_frame = frame_data except queue.Empty: break return latest_frame def is_connected(self) -> bool: """ Check if camera is connected and capturing. Returns: True if connected and running, False otherwise """ return self.is_running and self.cap is not None and self.cap.isOpened() def get_properties(self) -> dict: """ Get camera properties. Returns: Dictionary of camera properties """ if not self.cap: return {} return { 'width': self.frame_width, 'height': self.frame_height, 'fps': self.fps, 'source': self.source, 'is_running': self.is_running, 'buffer_size': self.buffer_size } def set_resolution(self, width: int, height: int) -> bool: """ Set camera resolution. Args: width: Frame width height: Frame height Returns: True if successful, False otherwise """ if not self.cap: return False try: self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, width) self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, height) # Verify the change actual_width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH)) actual_height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) self.frame_width = actual_width self.frame_height = actual_height print(f"Resolution set to: {actual_width}x{actual_height}") return True except Exception as e: print(f"Error setting resolution: {e}") return False def __enter__(self): """Context manager entry.""" self.start_capture() return self def __exit__(self, exc_type, exc_val, exc_tb): """Context manager exit.""" self.stop_capture() class MultiCameraManager: """ Manages multiple camera sources simultaneously. """ def __init__(self): self.cameras = {} self.is_running = False def add_camera(self, camera_id: str, source: Union[int, str], buffer_size: int = 10) -> bool: """ Add a camera to the manager. Args: camera_id: Unique identifier for the camera source: Camera source buffer_size: Frame buffer size Returns: True if camera added successfully, False otherwise """ try: camera = CameraManager(source, buffer_size) if camera.connect(): self.cameras[camera_id] = camera print(f"Camera '{camera_id}' added successfully") return True else: print(f"Failed to add camera '{camera_id}'") return False except Exception as e: print(f"Error adding camera '{camera_id}': {e}") return False def remove_camera(self, camera_id: str): """Remove a camera from the manager.""" if camera_id in self.cameras: self.cameras[camera_id].stop_capture() del self.cameras[camera_id] print(f"Camera '{camera_id}' removed") def start_all(self): """Start capture for all cameras.""" for camera_id, camera in self.cameras.items(): if camera.start_capture(): print(f"Started capture for camera '{camera_id}'") else: print(f"Failed to start capture for camera '{camera_id}'") self.is_running = True def stop_all(self): """Stop capture for all cameras.""" for camera_id, camera in self.cameras.items(): camera.stop_capture() print(f"Stopped capture for camera '{camera_id}'") self.is_running = False def get_frame(self, camera_id: str) -> Optional[tuple]: """Get frame from specific camera.""" if camera_id in self.cameras: return self.cameras[camera_id].get_frame() return None def get_all_frames(self) -> dict: """Get frames from all cameras.""" frames = {} for camera_id, camera in self.cameras.items(): frame_data = camera.get_latest_frame() if frame_data: frames[camera_id] = frame_data return frames def get_camera_list(self) -> list: """Get list of all camera IDs.""" return list(self.cameras.keys())