forked from tangger/lerobot
test(cameras): add minimal opencv test
This commit is contained in:
@@ -116,13 +116,6 @@ class OpenCVCamera(Camera):
|
||||
self.capture_width: int | None = config.width
|
||||
self.capture_height: int | None = config.height
|
||||
|
||||
if config.rotation in [-90, 90]:
|
||||
self.width: int | None = config.height
|
||||
self.height: int | None = config.width
|
||||
else:
|
||||
self.width: int | None = config.width
|
||||
self.height: int | None = config.height
|
||||
|
||||
self.fps: int | None = config.fps
|
||||
self.channels: int = config.channels
|
||||
self.color_mode: ColorMode = config.color_mode
|
||||
@@ -146,6 +139,7 @@ class OpenCVCamera(Camera):
|
||||
"""Checks if the camera is currently connected and opened."""
|
||||
return isinstance(self.videocapture_camera, cv2.VideoCapture) and self.videocapture_camera.isOpened()
|
||||
|
||||
# NOTE(Steven): Make it a class method and an util for calling it in utils.py (don't raise)
|
||||
def _scan_available_cameras_and_raise(self, index_or_path: IndexOrPath) -> None:
|
||||
"""
|
||||
Scans for available cameras and raises an error if the specified
|
||||
@@ -173,7 +167,7 @@ class OpenCVCamera(Camera):
|
||||
)
|
||||
|
||||
# NOTE(Steven): Moving it to a different function for now. To be evaluated later if it is worth it and if it makes sense to have it as an abstract method
|
||||
def _configure_capture_settings(self, fps: int | None, width: int | None, height: int | None) -> None:
|
||||
def _configure_capture_settings(self) -> None:
|
||||
"""
|
||||
Applies the specified FPS, width, and height settings to the connected camera.
|
||||
|
||||
@@ -194,12 +188,14 @@ class OpenCVCamera(Camera):
|
||||
if not self.is_connected:
|
||||
raise DeviceNotConnectedError(f"Cannot configure settings for {self} as it is not connected.")
|
||||
|
||||
if fps is not None:
|
||||
self._set_fps(fps)
|
||||
if width is not None:
|
||||
self._set_capture_width(width)
|
||||
if height is not None:
|
||||
self._set_capture_height(height)
|
||||
self._set_fps()
|
||||
self._set_capture_width()
|
||||
self._set_capture_height()
|
||||
|
||||
if self.rotation in [cv2.ROTATE_90_CLOCKWISE, cv2.ROTATE_90_COUNTERCLOCKWISE]:
|
||||
self.width, self.height = self.capture_height, self.capture_width
|
||||
else:
|
||||
self.width, self.height = self.capture_width, self.capture_height
|
||||
|
||||
def connect(self):
|
||||
"""
|
||||
@@ -225,53 +221,72 @@ class OpenCVCamera(Camera):
|
||||
self.videocapture_camera = cv2.VideoCapture(self.index_or_path, self.backend)
|
||||
|
||||
if not self.videocapture_camera.isOpened():
|
||||
logger.error(f"Failed to open camera {self.index_or_path}.")
|
||||
self.videocapture_camera.release()
|
||||
self.videocapture_camera = None
|
||||
self._scan_available_cameras_and_raise(self.index_or_path) # This raises an exception everytime
|
||||
raise ConnectionError(
|
||||
f"Failed to open camera {self.index_or_path}. Run NOTE(Steven): Add right file to scan for available cameras."
|
||||
) # NOTE(Steven): Run this _scan_available_cameras_and_raise
|
||||
|
||||
logger.debug(f"Successfully opened camera {self.index_or_path}. Applying configuration...")
|
||||
self._configure_capture_settings(self.fps, self.capture_width, self.capture_height)
|
||||
self._configure_capture_settings()
|
||||
logger.debug(f"Camera {self.index_or_path} connected and configured successfully.")
|
||||
|
||||
def _set_fps(self, fps: int) -> None:
|
||||
def _set_fps(self) -> None:
|
||||
"""Sets the camera's frames per second (FPS)."""
|
||||
success = self.videocapture_camera.set(cv2.CAP_PROP_FPS, float(fps))
|
||||
|
||||
if self.fps is None:
|
||||
self.fps = self.videocapture_camera.get(cv2.CAP_PROP_FPS)
|
||||
logger.info(f"FPS set to camera default: {self.fps}.")
|
||||
return
|
||||
|
||||
success = self.videocapture_camera.set(cv2.CAP_PROP_FPS, float(self.fps))
|
||||
actual_fps = self.videocapture_camera.get(cv2.CAP_PROP_FPS)
|
||||
# Use math.isclose for robust float comparison
|
||||
if not success or not math.isclose(fps, actual_fps, rel_tol=1e-3):
|
||||
if not success or not math.isclose(self.fps, actual_fps, rel_tol=1e-3):
|
||||
logger.warning(
|
||||
f"Requested FPS {fps} for {self}, but camera reported {actual_fps} (set success: {success}). "
|
||||
f"Requested FPS {self.fps} for {self}, but camera reported {actual_fps} (set success: {success}). "
|
||||
"This might be due to camera limitations."
|
||||
)
|
||||
raise RuntimeError(
|
||||
f"Failed to set requested FPS {fps} for {self}. Actual value reported: {actual_fps}."
|
||||
f"Failed to set requested FPS {self.fps} for {self}. Actual value reported: {actual_fps}."
|
||||
)
|
||||
logger.debug(f"FPS set to {actual_fps} for {self}.")
|
||||
|
||||
def _set_capture_width(self, capture_width: int) -> None:
|
||||
def _set_capture_width(self) -> None:
|
||||
"""Sets the camera's frame capture width."""
|
||||
success = self.videocapture_camera.set(cv2.CAP_PROP_FRAME_WIDTH, float(capture_width))
|
||||
|
||||
if self.capture_width is None:
|
||||
self.capture_width = self.videocapture_camera.get(cv2.CAP_PROP_FRAME_WIDTH)
|
||||
logger.info(f"Capture width set to camera default: {self.capture_width}.")
|
||||
return
|
||||
|
||||
success = self.videocapture_camera.set(cv2.CAP_PROP_FRAME_WIDTH, float(self.capture_width))
|
||||
actual_width = round(self.videocapture_camera.get(cv2.CAP_PROP_FRAME_WIDTH))
|
||||
if not success or self.capture_width != actual_width:
|
||||
logger.warning(
|
||||
f"Requested capture width {capture_width} for {self}, but camera reported {actual_width} (set success: {success})."
|
||||
f"Requested capture width {self.capture_width} for {self}, but camera reported {actual_width} (set success: {success})."
|
||||
)
|
||||
raise RuntimeError(
|
||||
f"Failed to set requested capture width {capture_width} for {self}. Actual value: {actual_width}."
|
||||
f"Failed to set requested capture width {self.capture_width} for {self}. Actual value: {actual_width}."
|
||||
)
|
||||
logger.debug(f"Capture width set to {actual_width} for {self}.")
|
||||
|
||||
def _set_capture_height(self, capture_height: int) -> None:
|
||||
def _set_capture_height(self) -> None:
|
||||
"""Sets the camera's frame capture height."""
|
||||
success = self.videocapture_camera.set(cv2.CAP_PROP_FRAME_HEIGHT, float(capture_height))
|
||||
|
||||
if self.capture_height is None:
|
||||
self.capture_height = self.videocapture_camera.get(cv2.CAP_PROP_FRAME_HEIGHT)
|
||||
logger.info(f"Capture height set to camera default: {self.capture_height}.")
|
||||
return
|
||||
|
||||
success = self.videocapture_camera.set(cv2.CAP_PROP_FRAME_HEIGHT, float(self.capture_height))
|
||||
actual_height = round(self.videocapture_camera.get(cv2.CAP_PROP_FRAME_HEIGHT))
|
||||
if not success or self.capture_height != actual_height:
|
||||
logger.warning(
|
||||
f"Requested capture height {capture_height} for {self}, but camera reported {actual_height} (set success: {success})."
|
||||
f"Requested capture height {self.capture_height} for {self}, but camera reported {actual_height} (set success: {success})."
|
||||
)
|
||||
raise RuntimeError(
|
||||
f"Failed to set requested capture height {capture_height} for {self}. Actual value: {actual_height}."
|
||||
f"Failed to set requested capture height {self.capture_height} for {self}. Actual value: {actual_height}."
|
||||
)
|
||||
logger.debug(f"Capture height set to {actual_height} for {self}.")
|
||||
|
||||
@@ -406,9 +421,9 @@ class OpenCVCamera(Camera):
|
||||
processed_image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
|
||||
logger.debug(f"Converted frame from BGR to RGB for {self}.")
|
||||
|
||||
if self.rotation is not None:
|
||||
if self.rotation in [cv2.ROTATE_90_CLOCKWISE, cv2.ROTATE_90_COUNTERCLOCKWISE]:
|
||||
processed_image = cv2.rotate(processed_image, self.rotation)
|
||||
logger.debug(f"Rotated frame by {self.config.rotation} degrees for {self}.")
|
||||
logger.debug(f"Rotated frame by {self.config.rotation} degrees for {self}.")
|
||||
|
||||
return processed_image
|
||||
|
||||
|
||||
@@ -53,7 +53,6 @@ def get_cv2_rotation(rotation: Cv2Rotation) -> int:
|
||||
Cv2Rotation.ROTATE_270: cv2.ROTATE_90_COUNTERCLOCKWISE,
|
||||
Cv2Rotation.ROTATE_90: cv2.ROTATE_90_CLOCKWISE,
|
||||
Cv2Rotation.ROTATE_180: cv2.ROTATE_180,
|
||||
Cv2Rotation.NO_ROTATION: 0,
|
||||
}.get(rotation)
|
||||
|
||||
|
||||
|
||||
BIN
tests/artifacts/cameras/fake_cam.png
Normal file
BIN
tests/artifacts/cameras/fake_cam.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 753 KiB |
@@ -1,13 +1,163 @@
|
||||
# from lerobot.common.cameras.opencv import OpenCVCamera, OpenCVCameraConfig
|
||||
#!/usr/bin/env python
|
||||
|
||||
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import numpy as np
|
||||
import pytest
|
||||
|
||||
from lerobot.common.cameras.configs import Cv2Rotation
|
||||
from lerobot.common.cameras.opencv import OpenCVCamera, OpenCVCameraConfig
|
||||
from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
|
||||
|
||||
|
||||
# class TestCameras(unittest.TestCase):
|
||||
# # NOTE(Steven): Independent test func
|
||||
# def test_connect():
|
||||
# config = OpenCVCameraConfig(camera_id=0)
|
||||
# camera = OpenCVCamera(config)
|
||||
# camera.connect()
|
||||
# assert camera.is_connected
|
||||
def test_base_class_implementation():
|
||||
config = OpenCVCameraConfig(index_or_path=0)
|
||||
|
||||
_ = OpenCVCamera(config)
|
||||
|
||||
|
||||
# # 1. Refactor -> opencv , write some test , think about realsese, iterate
|
||||
def test_connect():
|
||||
config = OpenCVCameraConfig(index_or_path="tests/artifacts/cameras/fake_cam.png")
|
||||
camera = OpenCVCamera(config)
|
||||
|
||||
camera.connect()
|
||||
|
||||
assert camera.is_connected
|
||||
|
||||
|
||||
def test_connect_already_connected():
|
||||
config = OpenCVCameraConfig(index_or_path="tests/artifacts/cameras/fake_cam.png")
|
||||
camera = OpenCVCamera(config)
|
||||
camera.connect()
|
||||
|
||||
with pytest.raises(DeviceAlreadyConnectedError):
|
||||
camera.connect()
|
||||
|
||||
|
||||
def test_connect_invalid_camera_path():
|
||||
config = OpenCVCameraConfig(index_or_path="nonexistent/camera.png")
|
||||
camera = OpenCVCamera(config)
|
||||
|
||||
with pytest.raises(ConnectionError):
|
||||
camera.connect()
|
||||
|
||||
|
||||
def test_invalid_width_connect():
|
||||
config = OpenCVCameraConfig(
|
||||
index_or_path="tests/artifacts/cameras/fake_cam.png",
|
||||
width=99999, # Invalid width to trigger error
|
||||
height=480,
|
||||
)
|
||||
camera = OpenCVCamera(config)
|
||||
|
||||
with pytest.raises(RuntimeError):
|
||||
camera.connect()
|
||||
|
||||
|
||||
def test_read():
|
||||
config = OpenCVCameraConfig(index_or_path="tests/artifacts/cameras/fake_cam.png")
|
||||
camera = OpenCVCamera(config)
|
||||
camera.connect()
|
||||
|
||||
img = camera.read()
|
||||
|
||||
assert isinstance(img, np.ndarray)
|
||||
|
||||
|
||||
def test_read_before_connect():
|
||||
config = OpenCVCameraConfig(index_or_path="tests/artifacts/cameras/fake_cam.png")
|
||||
camera = OpenCVCamera(config)
|
||||
|
||||
with pytest.raises(DeviceNotConnectedError):
|
||||
_ = camera.read()
|
||||
|
||||
|
||||
def test_disconnect():
|
||||
config = OpenCVCameraConfig(index_or_path="tests/artifacts/cameras/fake_cam.png")
|
||||
camera = OpenCVCamera(config)
|
||||
camera.connect()
|
||||
|
||||
camera.disconnect()
|
||||
|
||||
assert not camera.is_connected
|
||||
|
||||
|
||||
def test_disconnect_before_connect():
|
||||
config = OpenCVCameraConfig(index_or_path="tests/artifacts/cameras/fake_cam.png")
|
||||
camera = OpenCVCamera(config)
|
||||
|
||||
with pytest.raises(DeviceNotConnectedError):
|
||||
_ = camera.disconnect()
|
||||
|
||||
|
||||
def test_async_read():
|
||||
config = OpenCVCameraConfig(index_or_path="tests/artifacts/cameras/fake_cam.png")
|
||||
camera = OpenCVCamera(config)
|
||||
camera.connect()
|
||||
|
||||
img = camera.async_read()
|
||||
|
||||
assert camera.thread is not None
|
||||
assert camera.thread.is_alive()
|
||||
assert isinstance(img, np.ndarray)
|
||||
camera.disconnect() # To stop/join the thread. Otherwise get warnings when the test ends
|
||||
|
||||
|
||||
def test_async_read_timeout():
|
||||
config = OpenCVCameraConfig(index_or_path="tests/artifacts/cameras/fake_cam.png")
|
||||
camera = OpenCVCamera(config)
|
||||
camera.connect()
|
||||
|
||||
with pytest.raises(TimeoutError):
|
||||
_ = camera.async_read(timeout_ms=1)
|
||||
|
||||
camera.disconnect()
|
||||
|
||||
|
||||
def test_async_read_before_connect():
|
||||
config = OpenCVCameraConfig(index_or_path="tests/artifacts/cameras/fake_cam.png")
|
||||
camera = OpenCVCamera(config)
|
||||
|
||||
with pytest.raises(DeviceNotConnectedError):
|
||||
_ = camera.async_read()
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"rotation",
|
||||
[
|
||||
Cv2Rotation.NO_ROTATION,
|
||||
Cv2Rotation.ROTATE_90,
|
||||
Cv2Rotation.ROTATE_180,
|
||||
Cv2Rotation.ROTATE_270,
|
||||
],
|
||||
)
|
||||
def test_all_rotations(rotation):
|
||||
config = OpenCVCameraConfig(index_or_path="tests/artifacts/cameras/fake_cam.png", rotation=rotation)
|
||||
camera = OpenCVCamera(config)
|
||||
camera.connect()
|
||||
|
||||
img = camera.read()
|
||||
assert isinstance(img, np.ndarray)
|
||||
|
||||
if rotation in (Cv2Rotation.ROTATE_90, Cv2Rotation.ROTATE_270):
|
||||
assert camera.width == 480
|
||||
assert camera.height == 640
|
||||
assert img.shape[:2] == (640, 480)
|
||||
else:
|
||||
assert camera.width == 640
|
||||
assert camera.height == 480
|
||||
assert img.shape[:2] == (480, 640)
|
||||
|
||||
camera.disconnect()
|
||||
|
||||
Reference in New Issue
Block a user