Apply suggestions from code review camera_opencv

Co-authored-by: Simon Alibert <75076266+aliberts@users.noreply.github.com>
This commit is contained in:
Steven Palma
2025-05-20 13:47:00 +02:00
committed by GitHub
parent 05d8825bcb
commit 3890c38c12

View File

@@ -114,19 +114,18 @@ class OpenCVCamera(Camera):
super().__init__(config)
self.config = config
self.index_or_path: IndexOrPath = config.index_or_path
self.index_or_path = config.index_or_path
self.fps: int | None = config.fps
self.channels: int = config.channels
self.color_mode: ColorMode = config.color_mode
self.fps = config.fps
self.channels = config.channels
self.color_mode = config.color_mode
self.videocapture_camera: cv2.VideoCapture | None = None
self.videocapture: cv2.VideoCapture | None = None
self.thread: Thread | None = None
self.stop_event: Event | None = None
self.frame_queue: queue.Queue = queue.Queue(maxsize=1)
self.logs: dict = {} # NOTE(Steven): Might be removed in the future
self.rotation: int | None = get_cv2_rotation(config.rotation)
self.backend: int = get_cv2_backend() # NOTE(Steven): If we specify backend the opencv open fails
@@ -138,7 +137,6 @@ class OpenCVCamera(Camera):
self.prerotated_width, self.prerotated_height = self.width, self.height
def __str__(self) -> str:
"""Returns a string representation of the camera instance."""
return f"{self.__class__.__name__}({self.index_or_path})"
@property
@@ -170,7 +168,7 @@ class OpenCVCamera(Camera):
self._validate_fps()
self._validate_width_and_height()
def connect(self, do_warmup_read: bool = True):
def connect(self, warmup: bool = True):
"""
Connects to the OpenCV camera specified in the configuration.
@@ -190,7 +188,6 @@ class OpenCVCamera(Camera):
# blocking in multi-threaded applications, especially during data collection.
cv2.setNumThreads(1)
logger.debug(f"Attempting to connect to camera {self.index_or_path} using backend {self.backend}...")
self.videocapture_camera = cv2.VideoCapture(self.index_or_path)
if not self.videocapture_camera.isOpened():
@@ -201,7 +198,6 @@ class OpenCVCamera(Camera):
f"Run 'python -m find_cameras list-cameras' for details."
)
logger.debug(f"Successfully opened camera {self.index_or_path}. Applying configuration...")
self._configure_capture_settings()
if do_warmup_read:
@@ -272,7 +268,7 @@ class OpenCVCamera(Camera):
@staticmethod
def find_cameras(
max_index_search_range=MAX_OPENCV_INDEX, raise_when_empty: bool = True
max_index_search_range=MAX_OPENCV_INDEX
) -> List[Dict[str, Any]]:
"""
Detects available OpenCV cameras connected to the system.
@@ -451,7 +447,7 @@ class OpenCVCamera(Camera):
logger.debug(f"Stopping read loop thread for {self}.")
def _ensure_read_thread_running(self):
def _start_read_thread(self) -> None:
"""Starts or restarts the background read thread if it's not running."""
if self.thread is not None and self.thread.is_alive():
self.thread.join(timeout=0.1)
@@ -510,7 +506,7 @@ class OpenCVCamera(Camera):
logger.exception(f"Unexpected error getting frame from queue for {self}: {e}")
raise RuntimeError(f"Error getting frame from queue for camera {self.index_or_path}: {e}") from e
def _shutdown_read_thread(self):
def _stop_read_thread(self) -> None:
"""Signals the background read thread to stop and waits for it to join."""
if self.stop_event is not None:
logger.debug(f"Signaling stop event for read thread of {self}.")
@@ -539,17 +535,15 @@ class OpenCVCamera(Camera):
"""
if not self.is_connected and self.thread is None:
raise DeviceNotConnectedError(
f"Attempted to disconnect {self}, but it appears already disconnected."
f"{self} not connected."
)
logger.debug(f"Disconnecting from camera {self.index_or_path}...")
if self.thread is not None:
self._shutdown_read_thread()
if self.videocapture_camera is not None:
logger.debug(f"Releasing OpenCV VideoCapture object for {self}.")
self.videocapture_camera.release()
self.videocapture_camera = None
logger.info(f"Camera {self.index_or_path} disconnected successfully.")
logger.info(f"{self} disconnected.")