diff --git a/lerobot/common/cameras/opencv/camera_opencv.py b/lerobot/common/cameras/opencv/camera_opencv.py index 824c5ca3..d3d7ad62 100644 --- a/lerobot/common/cameras/opencv/camera_opencv.py +++ b/lerobot/common/cameras/opencv/camera_opencv.py @@ -116,13 +116,6 @@ class OpenCVCamera(Camera): self.capture_width: int | None = config.width self.capture_height: int | None = config.height - if config.rotation in [-90, 90]: - self.width: int | None = config.height - self.height: int | None = config.width - else: - self.width: int | None = config.width - self.height: int | None = config.height - self.fps: int | None = config.fps self.channels: int = config.channels self.color_mode: ColorMode = config.color_mode @@ -146,6 +139,7 @@ class OpenCVCamera(Camera): """Checks if the camera is currently connected and opened.""" return isinstance(self.videocapture_camera, cv2.VideoCapture) and self.videocapture_camera.isOpened() + # NOTE(Steven): Make it a class method and an util for calling it in utils.py (don't raise) def _scan_available_cameras_and_raise(self, index_or_path: IndexOrPath) -> None: """ Scans for available cameras and raises an error if the specified @@ -173,7 +167,7 @@ class OpenCVCamera(Camera): ) # NOTE(Steven): Moving it to a different function for now. To be evaluated later if it is worth it and if it makes sense to have it as an abstract method - def _configure_capture_settings(self, fps: int | None, width: int | None, height: int | None) -> None: + def _configure_capture_settings(self) -> None: """ Applies the specified FPS, width, and height settings to the connected camera. @@ -194,12 +188,14 @@ class OpenCVCamera(Camera): if not self.is_connected: raise DeviceNotConnectedError(f"Cannot configure settings for {self} as it is not connected.") - if fps is not None: - self._set_fps(fps) - if width is not None: - self._set_capture_width(width) - if height is not None: - self._set_capture_height(height) + self._set_fps() + self._set_capture_width() + self._set_capture_height() + + if self.rotation in [cv2.ROTATE_90_CLOCKWISE, cv2.ROTATE_90_COUNTERCLOCKWISE]: + self.width, self.height = self.capture_height, self.capture_width + else: + self.width, self.height = self.capture_width, self.capture_height def connect(self): """ @@ -225,53 +221,72 @@ class OpenCVCamera(Camera): self.videocapture_camera = cv2.VideoCapture(self.index_or_path, self.backend) if not self.videocapture_camera.isOpened(): - logger.error(f"Failed to open camera {self.index_or_path}.") self.videocapture_camera.release() self.videocapture_camera = None - self._scan_available_cameras_and_raise(self.index_or_path) # This raises an exception everytime + raise ConnectionError( + f"Failed to open camera {self.index_or_path}. Run NOTE(Steven): Add right file to scan for available cameras." + ) # NOTE(Steven): Run this _scan_available_cameras_and_raise logger.debug(f"Successfully opened camera {self.index_or_path}. Applying configuration...") - self._configure_capture_settings(self.fps, self.capture_width, self.capture_height) + self._configure_capture_settings() logger.debug(f"Camera {self.index_or_path} connected and configured successfully.") - def _set_fps(self, fps: int) -> None: + def _set_fps(self) -> None: """Sets the camera's frames per second (FPS).""" - success = self.videocapture_camera.set(cv2.CAP_PROP_FPS, float(fps)) + + if self.fps is None: + self.fps = self.videocapture_camera.get(cv2.CAP_PROP_FPS) + logger.info(f"FPS set to camera default: {self.fps}.") + return + + success = self.videocapture_camera.set(cv2.CAP_PROP_FPS, float(self.fps)) actual_fps = self.videocapture_camera.get(cv2.CAP_PROP_FPS) # Use math.isclose for robust float comparison - if not success or not math.isclose(fps, actual_fps, rel_tol=1e-3): + if not success or not math.isclose(self.fps, actual_fps, rel_tol=1e-3): logger.warning( - f"Requested FPS {fps} for {self}, but camera reported {actual_fps} (set success: {success}). " + f"Requested FPS {self.fps} for {self}, but camera reported {actual_fps} (set success: {success}). " "This might be due to camera limitations." ) raise RuntimeError( - f"Failed to set requested FPS {fps} for {self}. Actual value reported: {actual_fps}." + f"Failed to set requested FPS {self.fps} for {self}. Actual value reported: {actual_fps}." ) logger.debug(f"FPS set to {actual_fps} for {self}.") - def _set_capture_width(self, capture_width: int) -> None: + def _set_capture_width(self) -> None: """Sets the camera's frame capture width.""" - success = self.videocapture_camera.set(cv2.CAP_PROP_FRAME_WIDTH, float(capture_width)) + + if self.capture_width is None: + self.capture_width = self.videocapture_camera.get(cv2.CAP_PROP_FRAME_WIDTH) + logger.info(f"Capture width set to camera default: {self.capture_width}.") + return + + success = self.videocapture_camera.set(cv2.CAP_PROP_FRAME_WIDTH, float(self.capture_width)) actual_width = round(self.videocapture_camera.get(cv2.CAP_PROP_FRAME_WIDTH)) if not success or self.capture_width != actual_width: logger.warning( - f"Requested capture width {capture_width} for {self}, but camera reported {actual_width} (set success: {success})." + f"Requested capture width {self.capture_width} for {self}, but camera reported {actual_width} (set success: {success})." ) raise RuntimeError( - f"Failed to set requested capture width {capture_width} for {self}. Actual value: {actual_width}." + f"Failed to set requested capture width {self.capture_width} for {self}. Actual value: {actual_width}." ) logger.debug(f"Capture width set to {actual_width} for {self}.") - def _set_capture_height(self, capture_height: int) -> None: + def _set_capture_height(self) -> None: """Sets the camera's frame capture height.""" - success = self.videocapture_camera.set(cv2.CAP_PROP_FRAME_HEIGHT, float(capture_height)) + + if self.capture_height is None: + self.capture_height = self.videocapture_camera.get(cv2.CAP_PROP_FRAME_HEIGHT) + logger.info(f"Capture height set to camera default: {self.capture_height}.") + return + + success = self.videocapture_camera.set(cv2.CAP_PROP_FRAME_HEIGHT, float(self.capture_height)) actual_height = round(self.videocapture_camera.get(cv2.CAP_PROP_FRAME_HEIGHT)) if not success or self.capture_height != actual_height: logger.warning( - f"Requested capture height {capture_height} for {self}, but camera reported {actual_height} (set success: {success})." + f"Requested capture height {self.capture_height} for {self}, but camera reported {actual_height} (set success: {success})." ) raise RuntimeError( - f"Failed to set requested capture height {capture_height} for {self}. Actual value: {actual_height}." + f"Failed to set requested capture height {self.capture_height} for {self}. Actual value: {actual_height}." ) logger.debug(f"Capture height set to {actual_height} for {self}.") @@ -406,9 +421,9 @@ class OpenCVCamera(Camera): processed_image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) logger.debug(f"Converted frame from BGR to RGB for {self}.") - if self.rotation is not None: + if self.rotation in [cv2.ROTATE_90_CLOCKWISE, cv2.ROTATE_90_COUNTERCLOCKWISE]: processed_image = cv2.rotate(processed_image, self.rotation) - logger.debug(f"Rotated frame by {self.config.rotation} degrees for {self}.") + logger.debug(f"Rotated frame by {self.config.rotation} degrees for {self}.") return processed_image diff --git a/lerobot/common/cameras/utils.py b/lerobot/common/cameras/utils.py index 1968c0a3..674cfb0a 100644 --- a/lerobot/common/cameras/utils.py +++ b/lerobot/common/cameras/utils.py @@ -53,7 +53,6 @@ def get_cv2_rotation(rotation: Cv2Rotation) -> int: Cv2Rotation.ROTATE_270: cv2.ROTATE_90_COUNTERCLOCKWISE, Cv2Rotation.ROTATE_90: cv2.ROTATE_90_CLOCKWISE, Cv2Rotation.ROTATE_180: cv2.ROTATE_180, - Cv2Rotation.NO_ROTATION: 0, }.get(rotation) diff --git a/tests/artifacts/cameras/fake_cam.png b/tests/artifacts/cameras/fake_cam.png new file mode 100644 index 00000000..29aef4ed Binary files /dev/null and b/tests/artifacts/cameras/fake_cam.png differ diff --git a/tests/cameras/test_opencv.py b/tests/cameras/test_opencv.py index 5791019b..820f2c4e 100644 --- a/tests/cameras/test_opencv.py +++ b/tests/cameras/test_opencv.py @@ -1,13 +1,163 @@ -# from lerobot.common.cameras.opencv import OpenCVCamera, OpenCVCameraConfig +#!/usr/bin/env python + +# Copyright 2024 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import numpy as np +import pytest + +from lerobot.common.cameras.configs import Cv2Rotation +from lerobot.common.cameras.opencv import OpenCVCamera, OpenCVCameraConfig +from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError -# class TestCameras(unittest.TestCase): -# # NOTE(Steven): Independent test func -# def test_connect(): -# config = OpenCVCameraConfig(camera_id=0) -# camera = OpenCVCamera(config) -# camera.connect() -# assert camera.is_connected +def test_base_class_implementation(): + config = OpenCVCameraConfig(index_or_path=0) + + _ = OpenCVCamera(config) -# # 1. Refactor -> opencv , write some test , think about realsese, iterate +def test_connect(): + config = OpenCVCameraConfig(index_or_path="tests/artifacts/cameras/fake_cam.png") + camera = OpenCVCamera(config) + + camera.connect() + + assert camera.is_connected + + +def test_connect_already_connected(): + config = OpenCVCameraConfig(index_or_path="tests/artifacts/cameras/fake_cam.png") + camera = OpenCVCamera(config) + camera.connect() + + with pytest.raises(DeviceAlreadyConnectedError): + camera.connect() + + +def test_connect_invalid_camera_path(): + config = OpenCVCameraConfig(index_or_path="nonexistent/camera.png") + camera = OpenCVCamera(config) + + with pytest.raises(ConnectionError): + camera.connect() + + +def test_invalid_width_connect(): + config = OpenCVCameraConfig( + index_or_path="tests/artifacts/cameras/fake_cam.png", + width=99999, # Invalid width to trigger error + height=480, + ) + camera = OpenCVCamera(config) + + with pytest.raises(RuntimeError): + camera.connect() + + +def test_read(): + config = OpenCVCameraConfig(index_or_path="tests/artifacts/cameras/fake_cam.png") + camera = OpenCVCamera(config) + camera.connect() + + img = camera.read() + + assert isinstance(img, np.ndarray) + + +def test_read_before_connect(): + config = OpenCVCameraConfig(index_or_path="tests/artifacts/cameras/fake_cam.png") + camera = OpenCVCamera(config) + + with pytest.raises(DeviceNotConnectedError): + _ = camera.read() + + +def test_disconnect(): + config = OpenCVCameraConfig(index_or_path="tests/artifacts/cameras/fake_cam.png") + camera = OpenCVCamera(config) + camera.connect() + + camera.disconnect() + + assert not camera.is_connected + + +def test_disconnect_before_connect(): + config = OpenCVCameraConfig(index_or_path="tests/artifacts/cameras/fake_cam.png") + camera = OpenCVCamera(config) + + with pytest.raises(DeviceNotConnectedError): + _ = camera.disconnect() + + +def test_async_read(): + config = OpenCVCameraConfig(index_or_path="tests/artifacts/cameras/fake_cam.png") + camera = OpenCVCamera(config) + camera.connect() + + img = camera.async_read() + + assert camera.thread is not None + assert camera.thread.is_alive() + assert isinstance(img, np.ndarray) + camera.disconnect() # To stop/join the thread. Otherwise get warnings when the test ends + + +def test_async_read_timeout(): + config = OpenCVCameraConfig(index_or_path="tests/artifacts/cameras/fake_cam.png") + camera = OpenCVCamera(config) + camera.connect() + + with pytest.raises(TimeoutError): + _ = camera.async_read(timeout_ms=1) + + camera.disconnect() + + +def test_async_read_before_connect(): + config = OpenCVCameraConfig(index_or_path="tests/artifacts/cameras/fake_cam.png") + camera = OpenCVCamera(config) + + with pytest.raises(DeviceNotConnectedError): + _ = camera.async_read() + + +@pytest.mark.parametrize( + "rotation", + [ + Cv2Rotation.NO_ROTATION, + Cv2Rotation.ROTATE_90, + Cv2Rotation.ROTATE_180, + Cv2Rotation.ROTATE_270, + ], +) +def test_all_rotations(rotation): + config = OpenCVCameraConfig(index_or_path="tests/artifacts/cameras/fake_cam.png", rotation=rotation) + camera = OpenCVCamera(config) + camera.connect() + + img = camera.read() + assert isinstance(img, np.ndarray) + + if rotation in (Cv2Rotation.ROTATE_90, Cv2Rotation.ROTATE_270): + assert camera.width == 480 + assert camera.height == 640 + assert img.shape[:2] == (640, 480) + else: + assert camera.width == 640 + assert camera.height == 480 + assert img.shape[:2] == (480, 640) + + camera.disconnect()