test(cameras): add minimal realsense test

This commit is contained in:
Steven Palma
2025-05-05 18:36:41 +02:00
parent 15b5d28f45
commit b089c6db3a
5 changed files with 188 additions and 2 deletions

View File

@@ -217,7 +217,7 @@ class RealSenseCamera(Camera):
def _configure_realsense_settings(self) -> rs.config:
"""Creates and configures the RealSense pipeline configuration object."""
rs_config = rs.config()
rs_config.enable_device(self.serial_number)
rs.config.enable_device(rs_config, self.serial_number)
if self.capture_width and self.capture_height and self.fps:
logger.debug(
@@ -300,6 +300,7 @@ class RealSenseCamera(Camera):
self.width, self.height = self.capture_width, self.capture_height
logger.debug(f"Final image dimensions set to: {self.width}x{self.height} (after rotation if any)")
# NOTE(Steven): Add a wamr-up period time config
def connect(self):
"""
Connects to the RealSense camera specified in the configuration.

View File

@@ -36,7 +36,7 @@ class RealSenseCameraConfig(CameraConfig):
name: str | None = None
serial_number: int | None = None
fps: int | None = None
width: int | None = None
width: int | None = None # NOTE(Steven): Make this not None allowed!
height: int | None = None
color_mode: ColorMode = ColorMode.RGB
channels: int | None = 3

Binary file not shown.

View File

@@ -21,6 +21,8 @@ from lerobot.common.cameras.configs import Cv2Rotation
from lerobot.common.cameras.opencv import OpenCVCamera, OpenCVCameraConfig
from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
# NOTE(Steven): Patch get/set calls
def test_base_class_implementation():
config = OpenCVCameraConfig(index_or_path=0)

View File

@@ -0,0 +1,183 @@
#!/usr/bin/env python
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from unittest.mock import patch
import numpy as np
import pytest
from lerobot.common.cameras.configs import Cv2Rotation
from lerobot.common.cameras.intel import RealSenseCamera, RealSenseCameraConfig
from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
TEST_ARTIFACTS_DIR = os.path.join(os.path.dirname(os.path.dirname(__file__)), "artifacts", "cameras")
BAG_FILE_PATH = os.path.join(TEST_ARTIFACTS_DIR, "test.bag")
if not os.path.exists(BAG_FILE_PATH):
print(f"Warning: Bag file not found at {BAG_FILE_PATH}. Some tests might fail or be skipped.")
def mock_rs_config_enable_device_from_file(rs_config_instance, sn):
if not os.path.exists(BAG_FILE_PATH):
raise FileNotFoundError(f"Test bag file not found: {BAG_FILE_PATH}")
print(f"MOCK: Calling enable_device_from_file with: {BAG_FILE_PATH}")
return rs_config_instance.enable_device_from_file(BAG_FILE_PATH, repeat_playback=True)
def mock_rs_config_enable_device_bad_file(rs_config_instance, sn):
return rs_config_instance.enable_device_from_file("non_existent_file.bag", repeat_playback=True)
@patch("pyrealsense2.config.enable_device", side_effect=mock_rs_config_enable_device_from_file)
def test_connect(mock_enable_device):
config = RealSenseCameraConfig(serial_number=42)
camera = RealSenseCamera(config)
camera.connect()
assert camera.is_connected
def test_base_class_implementation():
config = RealSenseCameraConfig(serial_number=42)
_ = RealSenseCamera(config)
@patch("pyrealsense2.config.enable_device", side_effect=mock_rs_config_enable_device_from_file)
def test_connect_already_connected(mock_enable_device):
config = RealSenseCameraConfig(serial_number=42)
camera = RealSenseCamera(config)
camera.connect()
with pytest.raises(DeviceAlreadyConnectedError):
camera.connect()
@patch("pyrealsense2.config.enable_device", side_effect=mock_rs_config_enable_device_bad_file)
def test_connect_invalid_camera_path(mock_enable_device):
config = RealSenseCameraConfig(serial_number=42)
camera = RealSenseCamera(config)
with pytest.raises(ConnectionError):
camera.connect()
@patch("pyrealsense2.config.enable_device", side_effect=mock_rs_config_enable_device_from_file)
def test_invalid_width_connect(mock_enable_device):
config = RealSenseCameraConfig(serial_number=42, width=99999, height=480, fps=30)
camera = RealSenseCamera(config)
with pytest.raises(ConnectionError):
camera.connect()
@patch("pyrealsense2.config.enable_device", side_effect=mock_rs_config_enable_device_from_file)
def test_read(mock_enable_device):
config = RealSenseCameraConfig(serial_number=42, width=640, height=480, fps=30)
camera = RealSenseCamera(config)
camera.connect()
img = camera.read()
assert isinstance(img, np.ndarray)
def test_read_before_connect():
config = RealSenseCameraConfig(serial_number=42)
camera = RealSenseCamera(config)
with pytest.raises(DeviceNotConnectedError):
_ = camera.read()
@patch("pyrealsense2.config.enable_device", side_effect=mock_rs_config_enable_device_from_file)
def test_disconnect(mock_enable_device):
config = RealSenseCameraConfig(serial_number=42)
camera = RealSenseCamera(config)
camera.connect()
camera.disconnect()
assert not camera.is_connected
def test_disconnect_before_connect():
config = RealSenseCameraConfig(serial_number=42)
camera = RealSenseCamera(config)
with pytest.raises(DeviceNotConnectedError):
camera.disconnect()
@patch("pyrealsense2.config.enable_device", side_effect=mock_rs_config_enable_device_from_file)
def test_async_read(mock_enable_device):
config = RealSenseCameraConfig(serial_number=42, width=640, height=480, fps=30)
camera = RealSenseCamera(config)
camera.connect()
img = camera.async_read()
assert camera.thread is not None
assert camera.thread.is_alive()
assert isinstance(img, np.ndarray)
camera.disconnect() # To stop/join the thread. Otherwise get warnings when the test ends
@patch("pyrealsense2.config.enable_device", side_effect=mock_rs_config_enable_device_from_file)
def test_async_read_timeout(mock_enable_device):
config = RealSenseCameraConfig(serial_number=42, width=640, height=480, fps=30)
camera = RealSenseCamera(config)
camera.connect()
with pytest.raises(TimeoutError):
camera.async_read(timeout_ms=1)
camera.disconnect()
def test_async_read_before_connect():
config = RealSenseCameraConfig(serial_number=42)
camera = RealSenseCamera(config)
with pytest.raises(DeviceNotConnectedError):
_ = camera.async_read()
@pytest.mark.parametrize(
"rotation",
[
Cv2Rotation.NO_ROTATION,
Cv2Rotation.ROTATE_90,
Cv2Rotation.ROTATE_180,
Cv2Rotation.ROTATE_270,
],
)
@patch("pyrealsense2.config.enable_device", side_effect=mock_rs_config_enable_device_from_file)
def test_all_rotations(mock_enable_device, rotation):
config = RealSenseCameraConfig(serial_number=42, rotation=rotation)
camera = RealSenseCamera(config)
camera.connect()
img = camera.read()
assert isinstance(img, np.ndarray)
if rotation in (Cv2Rotation.ROTATE_90, Cv2Rotation.ROTATE_270):
assert camera.width == 480
assert camera.height == 640
assert img.shape[:2] == (640, 480)
else:
assert camera.width == 640
assert camera.height == 480
assert img.shape[:2] == (480, 640)