Spaces:
Runtime error
Runtime error
import cv2 | |
import numpy as np | |
from typing import Optional, Tuple, Callable | |
import platform | |
import threading | |
# Only import Windows-specific library if on Windows | |
if platform.system() == "Windows": | |
from pygrabber.dshow_graph import FilterGraph | |
class VideoCapturer: | |
def __init__(self, device_index: int): | |
self.device_index = device_index | |
self.frame_callback = None | |
self._current_frame = None | |
self._frame_ready = threading.Event() | |
self.is_running = False | |
self.cap = None | |
# Initialize Windows-specific components if on Windows | |
if platform.system() == "Windows": | |
self.graph = FilterGraph() | |
# Verify device exists | |
devices = self.graph.get_input_devices() | |
if self.device_index >= len(devices): | |
raise ValueError( | |
f"Invalid device index {device_index}. Available devices: {len(devices)}" | |
) | |
def start(self, width: int = 960, height: int = 540, fps: int = 60) -> bool: | |
"""Initialize and start video capture""" | |
try: | |
if platform.system() == "Windows": | |
# Windows-specific capture methods | |
capture_methods = [ | |
(self.device_index, cv2.CAP_DSHOW), # Try DirectShow first | |
(self.device_index, cv2.CAP_ANY), # Then try default backend | |
(-1, cv2.CAP_ANY), # Try -1 as fallback | |
(0, cv2.CAP_ANY), # Finally try 0 without specific backend | |
] | |
for dev_id, backend in capture_methods: | |
try: | |
self.cap = cv2.VideoCapture(dev_id, backend) | |
if self.cap.isOpened(): | |
break | |
self.cap.release() | |
except Exception: | |
continue | |
else: | |
# Unix-like systems (Linux/Mac) capture method | |
self.cap = cv2.VideoCapture(self.device_index) | |
if not self.cap or not self.cap.isOpened(): | |
raise RuntimeError("Failed to open camera") | |
# Configure format | |
self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, width) | |
self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, height) | |
self.cap.set(cv2.CAP_PROP_FPS, fps) | |
self.is_running = True | |
return True | |
except Exception as e: | |
print(f"Failed to start capture: {str(e)}") | |
if self.cap: | |
self.cap.release() | |
return False | |
def read(self) -> Tuple[bool, Optional[np.ndarray]]: | |
"""Read a frame from the camera""" | |
if not self.is_running or self.cap is None: | |
return False, None | |
ret, frame = self.cap.read() | |
if ret: | |
self._current_frame = frame | |
if self.frame_callback: | |
self.frame_callback(frame) | |
return True, frame | |
return False, None | |
def release(self) -> None: | |
"""Stop capture and release resources""" | |
if self.is_running and self.cap is not None: | |
self.cap.release() | |
self.is_running = False | |
self.cap = None | |
def set_frame_callback(self, callback: Callable[[np.ndarray], None]) -> None: | |
"""Set callback for frame processing""" | |
self.frame_callback = callback | |