diff --git a/lerobot/common/robot_devices/cameras/intelrealsense.py b/lerobot/common/robot_devices/cameras/intelrealsense.py index 4a9310ac..5d604524 100644 --- a/lerobot/common/robot_devices/cameras/intelrealsense.py +++ b/lerobot/common/robot_devices/cameras/intelrealsense.py @@ -14,9 +14,7 @@ from dataclasses import dataclass, replace from pathlib import Path from threading import Thread -import cv2 import numpy as np -import pyrealsense2 as rs from PIL import Image from lerobot.common.robot_devices.utils import ( @@ -29,14 +27,23 @@ from lerobot.scripts.control_robot import busy_wait SERIAL_NUMBER_INDEX = 1 -def find_camera_indices(raise_when_empty=True) -> list[int]: +def find_camera_indices(raise_when_empty=True, mock=False) -> list[int]: """ Find the serial numbers of the Intel RealSense cameras connected to the computer. """ + if mock: + from tests.mock_intelrealsense import ( + RSCameraInfo, + RSContext, + ) + else: + from pyrealsense2 import camera_info as RSCameraInfo # noqa: N812 + from pyrealsense2 import context as RSContext # noqa: N812 + camera_ids = [] - for device in rs.context().query_devices(): - serial_number = int(device.get_info(rs.camera_info(SERIAL_NUMBER_INDEX))) + for device in RSContext().query_devices(): + serial_number = int(device.get_info(RSCameraInfo(SERIAL_NUMBER_INDEX))) camera_ids.append(serial_number) if raise_when_empty and len(camera_ids) == 0: @@ -65,18 +72,24 @@ def save_images_from_cameras( width=None, height=None, record_time_s=2, + mock=False, ): """ Initializes all the cameras and saves images to the directory. Useful to visually identify the camera associated to a given camera index. """ if camera_ids is None: - camera_ids = find_camera_indices() + camera_ids = find_camera_indices(mock=mock) + + if mock: + from tests.mock_opencv import COLOR_RGB2BGR, cvtColor + else: + from cv2 import COLOR_RGB2BGR, cvtColor print("Connecting cameras") cameras = [] for cam_idx in camera_ids: - camera = IntelRealSenseCamera(cam_idx, fps=fps, width=width, height=height) + camera = IntelRealSenseCamera(cam_idx, fps=fps, width=width, height=height, mock=mock) camera.connect() print( f"IntelRealSenseCamera({camera.camera_index}, fps={camera.fps}, width={camera.width}, height={camera.height}, color_mode={camera.color_mode})" @@ -104,7 +117,8 @@ def save_images_from_cameras( image = camera.read() if fps is None else camera.async_read() if image is None: print("No Frame") - bgr_converted_image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR) + + bgr_converted_image = cvtColor(image, COLOR_RGB2BGR) executor.submit( save_image, @@ -150,6 +164,7 @@ class IntelRealSenseCameraConfig: color_mode: str = "rgb" use_depth: bool = False force_hardware_reset: bool = True + mock: bool = False def __post_init__(self): if self.color_mode not in ["rgb", "bgr"]: @@ -231,6 +246,7 @@ class IntelRealSenseCamera: self.color_mode = config.color_mode self.use_depth = config.use_depth self.force_hardware_reset = config.force_hardware_reset + self.mock = config.mock self.camera = None self.is_connected = False @@ -246,22 +262,35 @@ class IntelRealSenseCamera: f"IntelRealSenseCamera({self.camera_index}) is already connected." ) - config = rs.config() + if self.mock: + from tests.mock_intelrealsense import ( + RSConfig, + RSFormat, + RSPipeline, + RSStream, + ) + else: + from pyrealsense2 import config as RSConfig # noqa: N812 + from pyrealsense2 import format as RSFormat # noqa: N812 + from pyrealsense2 import pipeline as RSPipeline # noqa: N812 + from pyrealsense2 import stream as RSStream # noqa: N812 + + config = RSConfig() config.enable_device(str(self.camera_index)) if self.fps and self.width and self.height: # TODO(rcadene): can we set rgb8 directly? - config.enable_stream(rs.stream.color, self.width, self.height, rs.format.rgb8, self.fps) + config.enable_stream(RSStream.color, self.width, self.height, RSFormat.rgb8, self.fps) else: - config.enable_stream(rs.stream.color) + config.enable_stream(RSStream.color) if self.use_depth: if self.fps and self.width and self.height: - config.enable_stream(rs.stream.depth, self.width, self.height, rs.format.z16, self.fps) + config.enable_stream(RSStream.depth, self.width, self.height, RSFormat.z16, self.fps) else: - config.enable_stream(rs.stream.depth) + config.enable_stream(RSStream.depth) - self.camera = rs.pipeline() + self.camera = RSPipeline() try: profile = self.camera.start(config) is_camera_open = True @@ -282,7 +311,7 @@ class IntelRealSenseCamera: raise OSError(f"Can't access IntelRealSenseCamera({self.camera_index}).") - color_stream = profile.get_stream(rs.stream.color) + color_stream = profile.get_stream(RSStream.color) color_profile = color_stream.as_video_stream_profile() actual_fps = color_profile.fps() actual_width = color_profile.width() @@ -343,7 +372,12 @@ class IntelRealSenseCamera: # IntelRealSense uses RGB format as default (red, green, blue). if requested_color_mode == "bgr": - color_image = cv2.cvtColor(color_image, cv2.COLOR_RGB2BGR) + if self.mock: + from tests.mock_opencv import COLOR_RGB2BGR, cvtColor + else: + from cv2 import COLOR_RGB2BGR, cvtColor + + color_image = cvtColor(color_image, COLOR_RGB2BGR) h, w, _ = color_image.shape if h != self.height or w != self.width: diff --git a/lerobot/common/robot_devices/cameras/opencv.py b/lerobot/common/robot_devices/cameras/opencv.py index a2a4407e..9ff47b6a 100644 --- a/lerobot/common/robot_devices/cameras/opencv.py +++ b/lerobot/common/robot_devices/cameras/opencv.py @@ -13,7 +13,6 @@ from dataclasses import dataclass, replace from pathlib import Path from threading import Thread -import cv2 import numpy as np from PIL import Image @@ -24,10 +23,6 @@ from lerobot.common.robot_devices.utils import ( ) from lerobot.common.utils.utils import capture_timestamp_utc -# Use 1 thread to avoid blocking the main thread. Especially useful during data collection -# when other threads are used to save the images. -cv2.setNumThreads(1) - # The maximum opencv device index depends on your operating system. For instance, # if you have 3 cameras, they should be associated to index 0, 1, and 2. This is the case # on MacOS. However, on Ubuntu, the indices are different like 6, 16, 23. @@ -36,7 +31,7 @@ cv2.setNumThreads(1) MAX_OPENCV_INDEX = 60 -def find_camera_indices(raise_when_empty=False, max_index_search_range=MAX_OPENCV_INDEX): +def find_camera_indices(raise_when_empty=False, max_index_search_range=MAX_OPENCV_INDEX, mock=False): if platform.system() == "Linux": # Linux uses camera ports print("Linux detected. Finding available camera indices through scanning '/dev/video*' ports") @@ -51,9 +46,14 @@ def find_camera_indices(raise_when_empty=False, max_index_search_range=MAX_OPENC ) possible_camera_ids = range(max_index_search_range) + if mock: + from tests.mock_opencv import VideoCapture + else: + from cv2 import VideoCapture + camera_ids = [] for camera_idx in possible_camera_ids: - camera = cv2.VideoCapture(camera_idx) + camera = VideoCapture(camera_idx) is_open = camera.isOpened() camera.release() @@ -78,19 +78,25 @@ def save_image(img_array, camera_index, frame_index, images_dir): def save_images_from_cameras( - images_dir: Path, camera_ids: list[int] | None = None, fps=None, width=None, height=None, record_time_s=2 + images_dir: Path, + camera_ids: list[int] | None = None, + fps=None, + width=None, + height=None, + record_time_s=2, + mock=False, ): """ Initializes all the cameras and saves images to the directory. Useful to visually identify the camera associated to a given camera index. """ if camera_ids is None: - camera_ids = find_camera_indices() + camera_ids = find_camera_indices(mock=mock) print("Connecting cameras") cameras = [] for cam_idx in camera_ids: - camera = OpenCVCamera(cam_idx, fps=fps, width=width, height=height) + camera = OpenCVCamera(cam_idx, fps=fps, width=width, height=height, mock=mock) camera.connect() print( f"OpenCVCamera({camera.camera_index}, fps={camera.fps}, width={camera.width}, " @@ -156,6 +162,7 @@ class OpenCVCameraConfig: width: int | None = None height: int | None = None color_mode: str = "rgb" + mock: bool = False def __post_init__(self): if self.color_mode not in ["rgb", "bgr"]: @@ -215,6 +222,7 @@ class OpenCVCamera: self.width = config.width self.height = config.height self.color_mode = config.color_mode + self.mock = config.mock self.camera = None self.is_connected = False @@ -235,11 +243,31 @@ class OpenCVCamera: if self.is_connected: raise RobotDeviceAlreadyConnectedError(f"OpenCVCamera({self.camera_index}) is already connected.") + if self.mock: + from tests.mock_opencv import ( + CAP_PROP_FPS, + CAP_PROP_FRAME_HEIGHT, + CAP_PROP_FRAME_WIDTH, + VideoCapture, + ) + else: + import cv2 + from cv2 import ( + CAP_PROP_FPS, + CAP_PROP_FRAME_HEIGHT, + CAP_PROP_FRAME_WIDTH, + VideoCapture, + ) + + # Use 1 thread to avoid blocking the main thread. Especially useful during data collection + # when other threads are used to save the images. + cv2.setNumThreads(1) + camera_idx = f"/dev/video{self.camera_index}" if platform.system() == "Linux" else self.camera_index with self.lock: # First create a temporary camera trying to access `camera_index`, # and verify it is a valid camera by calling `isOpened`. - tmp_camera = cv2.VideoCapture(camera_idx) + tmp_camera = VideoCapture(camera_idx) is_camera_open = tmp_camera.isOpened() # Release camera to make it accessible for `find_camera_indices` tmp_camera.release() @@ -262,18 +290,18 @@ class OpenCVCamera: # Note: For some unknown reason, calling `isOpened` blocks the camera which then # needs to be re-created. with self.lock: - self.camera = cv2.VideoCapture(camera_idx) + self.camera = VideoCapture(camera_idx) if self.fps is not None: - self.camera.set(cv2.CAP_PROP_FPS, self.fps) + self.camera.set(CAP_PROP_FPS, self.fps) if self.width is not None: - self.camera.set(cv2.CAP_PROP_FRAME_WIDTH, self.width) + self.camera.set(CAP_PROP_FRAME_WIDTH, self.width) if self.height is not None: - self.camera.set(cv2.CAP_PROP_FRAME_HEIGHT, self.height) + self.camera.set(CAP_PROP_FRAME_HEIGHT, self.height) - actual_fps = self.camera.get(cv2.CAP_PROP_FPS) - actual_width = self.camera.get(cv2.CAP_PROP_FRAME_WIDTH) - actual_height = self.camera.get(cv2.CAP_PROP_FRAME_HEIGHT) + actual_fps = self.camera.get(CAP_PROP_FPS) + actual_width = self.camera.get(CAP_PROP_FRAME_WIDTH) + actual_height = self.camera.get(CAP_PROP_FRAME_HEIGHT) # Using `math.isclose` since actual fps can be a float (e.g. 29.9 instead of 30) if self.fps is not None and not math.isclose(self.fps, actual_fps, rel_tol=1e-3): @@ -327,7 +355,12 @@ class OpenCVCamera: # However, Deep Learning framework such as LeRobot uses RGB format as default to train neural networks, # so we convert the image color from BGR to RGB. if requested_color_mode == "rgb": - color_image = cv2.cvtColor(color_image, cv2.COLOR_BGR2RGB) + if self.mock: + from tests.mock_opencv import COLOR_BGR2RGB, cvtColor + else: + from cv2 import COLOR_BGR2RGB, cvtColor + + color_image = cvtColor(color_image, COLOR_BGR2RGB) h, w, _ = color_image.shape if h != self.height or w != self.width: diff --git a/lerobot/common/robot_devices/motors/dynamixel.py b/lerobot/common/robot_devices/motors/dynamixel.py index 491963fe..b6dd8aff 100644 --- a/lerobot/common/robot_devices/motors/dynamixel.py +++ b/lerobot/common/robot_devices/motors/dynamixel.py @@ -8,17 +8,6 @@ from pathlib import Path import numpy as np import tqdm -from dynamixel_sdk import ( - COMM_SUCCESS, - DXL_HIBYTE, - DXL_HIWORD, - DXL_LOBYTE, - DXL_LOWORD, - GroupSyncRead, - GroupSyncWrite, - PacketHandler, - PortHandler, -) from lerobot.common.robot_devices.utils import RobotDeviceAlreadyConnectedError, RobotDeviceNotConnectedError from lerobot.common.utils.utils import capture_timestamp_utc @@ -154,6 +143,8 @@ MODEL_BAUDRATE_TABLE = { NUM_READ_RETRY = 10 NUM_WRITE_RETRY = 10 +COMM_SUCCESS = 0 + def convert_degrees_to_steps(degrees: float | np.ndarray, models: str | list[str]) -> np.ndarray: """This function converts the degree range to the step range for indicating motors rotation. @@ -166,7 +157,17 @@ def convert_degrees_to_steps(degrees: float | np.ndarray, models: str | list[str return steps -def convert_to_bytes(value, bytes): +def convert_to_bytes(value, bytes, mock=False): + if mock: + return value + + from dynamixel_sdk import ( + DXL_HIBYTE, + DXL_HIWORD, + DXL_LOBYTE, + DXL_LOWORD, + ) + # Note: No need to convert back into unsigned int, since this byte preprocessing # already handles it for us. if bytes == 1: @@ -333,9 +334,11 @@ class DynamixelMotorsBus: motors: dict[str, tuple[int, str]], extra_model_control_table: dict[str, list[tuple]] | None = None, extra_model_resolution: dict[str, int] | None = None, + mock=False, ): self.port = port self.motors = motors + self.mock = mock self.model_ctrl_table = deepcopy(MODEL_CONTROL_TABLE) if extra_model_control_table: @@ -359,6 +362,11 @@ class DynamixelMotorsBus: f"DynamixelMotorsBus({self.port}) is already connected. Do not call `motors_bus.connect()` twice." ) + if self.mock: + from tests.mock_dynamixel import PacketHandler, PortHandler + else: + from dynamixel_sdk import PacketHandler, PortHandler + self.port_handler = PortHandler(self.port) self.packet_handler = PacketHandler(PROTOCOL_VERSION) @@ -392,10 +400,17 @@ class DynamixelMotorsBus: self.configure_motors() def reconnect(self): + if self.mock: + from tests.mock_dynamixel import PacketHandler, PortHandler + else: + from dynamixel_sdk import PacketHandler, PortHandler + self.port_handler = PortHandler(self.port) self.packet_handler = PacketHandler(PROTOCOL_VERSION) + if not self.port_handler.openPort(): raise OSError(f"Failed to open port '{self.port}'.") + self.is_connected = True def are_motors_configured(self): @@ -781,6 +796,11 @@ class DynamixelMotorsBus: return values def _read_with_motor_ids(self, motor_models, motor_ids, data_name): + if self.mock: + from tests.mock_dynamixel import GroupSyncRead + else: + from dynamixel_sdk import GroupSyncRead + return_list = True if not isinstance(motor_ids, list): return_list = False @@ -817,6 +837,11 @@ class DynamixelMotorsBus: start_time = time.perf_counter() + if self.mock: + from tests.mock_dynamixel import GroupSyncRead + else: + from dynamixel_sdk import GroupSyncRead + if motor_names is None: motor_names = self.motor_names @@ -876,6 +901,11 @@ class DynamixelMotorsBus: return values def _write_with_motor_ids(self, motor_models, motor_ids, data_name, values): + if self.mock: + from tests.mock_dynamixel import GroupSyncWrite + else: + from dynamixel_sdk import GroupSyncWrite + if not isinstance(motor_ids, list): motor_ids = [motor_ids] if not isinstance(values, list): @@ -885,7 +915,7 @@ class DynamixelMotorsBus: addr, bytes = self.model_ctrl_table[motor_models[0]][data_name] group = GroupSyncWrite(self.port_handler, self.packet_handler, addr, bytes) for idx, value in zip(motor_ids, values, strict=True): - data = convert_to_bytes(value, bytes) + data = convert_to_bytes(value, bytes, self.mock) group.addParam(idx, data) comm = group.txPacket() @@ -903,6 +933,11 @@ class DynamixelMotorsBus: start_time = time.perf_counter() + if self.mock: + from tests.mock_dynamixel import GroupSyncWrite + else: + from dynamixel_sdk import GroupSyncWrite + if motor_names is None: motor_names = self.motor_names @@ -937,7 +972,7 @@ class DynamixelMotorsBus: ) for idx, value in zip(motor_ids, values, strict=True): - data = convert_to_bytes(value, bytes) + data = convert_to_bytes(value, bytes, self.mock) if init_group: self.group_writers[group_key].addParam(idx, data) else: diff --git a/tests/conftest.py b/tests/conftest.py index ac1d0b76..826c6bf4 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -62,8 +62,8 @@ def is_camera_available(camera_type): del camera return True except Exception: - traceback.print_exc() print(f"\nA {camera_type} camera is not available.") + traceback.print_exc() return False @@ -83,3 +83,12 @@ def is_motor_available(motor_type): traceback.print_exc() print(f"\nA {motor_type} motor is not available.") return False + + +@pytest.fixture +def patch_builtins_input(monkeypatch): + def print_text(text=None): + if text is not None: + print(text) + + monkeypatch.setattr("builtins.input", print_text) diff --git a/tests/mock_dynamixel.py b/tests/mock_dynamixel.py index a3063e63..6d0ed20e 100644 --- a/tests/mock_dynamixel.py +++ b/tests/mock_dynamixel.py @@ -5,19 +5,20 @@ Warning: These mocked versions are minimalist. They do not exactly mock every be from the original classes and functions (e.g. return types might be None instead of boolean). """ -from dynamixel_sdk import COMM_SUCCESS +# from dynamixel_sdk import COMM_SUCCESS DEFAULT_BAUDRATE = 9_600 +COMM_SUCCESS = 0 # tx or rx packet communication success -def mock_convert_to_bytes(value, bytes): +def convert_to_bytes(value, bytes): # TODO(rcadene): remove need to mock `convert_to_bytes` by implemented the inverse transform # `convert_bytes_to_value` del bytes # unused return value -class MockPortHandler: +class PortHandler: def __init__(self, port): self.port = port # factory default baudrate @@ -39,14 +40,14 @@ class MockPortHandler: self.baudrate = baudrate -class MockPacketHandler: +class PacketHandler: def __init__(self, protocol_version): del protocol_version # unused # Use packet_handler.data to communicate across Read and Write self.data = {} -class MockGroupSyncRead: +class GroupSyncRead: def __init__(self, port_handler, packet_handler, address, bytes): self.packet_handler = packet_handler @@ -71,7 +72,7 @@ class MockGroupSyncRead: return self.packet_handler.data[index][address] -class MockGroupSyncWrite: +class GroupSyncWrite: def __init__(self, port_handler, packet_handler, address, bytes): self.packet_handler = packet_handler self.address = address diff --git a/tests/mock_intelrealsense.py b/tests/mock_intelrealsense.py index c1d6add2..b9c6ccf9 100644 --- a/tests/mock_intelrealsense.py +++ b/tests/mock_intelrealsense.py @@ -3,33 +3,33 @@ import enum import numpy as np -class MockStream(enum.Enum): +class RSStream(enum.Enum): color = 0 depth = 1 -class MockFormat(enum.Enum): +class RSFormat(enum.Enum): rgb8 = 0 z16 = 1 -class MockConfig: +class RSConfig: def enable_device(self, device_id: str): self.device_enabled = device_id def enable_stream( - self, stream_type: MockStream, width=None, height=None, color_format: MockFormat = None, fps=None + self, stream_type: RSStream, width=None, height=None, color_format: RSFormat = None, fps=None ): self.stream_type = stream_type # Overwrite default values when possible self.width = 848 if width is None else width self.height = 480 if height is None else height - self.color_format = MockFormat.rgb8 if color_format is None else color_format + self.color_format = RSFormat.rgb8 if color_format is None else color_format self.fps = 30 if fps is None else fps -class MockColorProfile: - def __init__(self, config: MockConfig): +class RSColorProfile: + def __init__(self, config: RSConfig): self.config = config def fps(self): @@ -42,32 +42,32 @@ class MockColorProfile: return self.config.height -class MockColorStream: - def __init__(self, config: MockConfig): +class RSColorStream: + def __init__(self, config: RSConfig): self.config = config def as_video_stream_profile(self): - return MockColorProfile(self.config) + return RSColorProfile(self.config) -class MockProfile: - def __init__(self, config: MockConfig): +class RSProfile: + def __init__(self, config: RSConfig): self.config = config - def get_stream(self, color_format: MockFormat): + def get_stream(self, color_format: RSFormat): del color_format # unused - return MockColorStream(self.config) + return RSColorStream(self.config) -class MockPipeline: +class RSPipeline: def __init__(self): self.started = False self.config = None - def start(self, config: MockConfig): + def start(self, config: RSConfig): self.started = True self.config = config - return MockProfile(self.config) + return RSProfile(self.config) def stop(self): if not self.started: @@ -77,22 +77,22 @@ class MockPipeline: def wait_for_frames(self, timeout_ms=50000): del timeout_ms # unused - return MockFrames(self.config) + return RSFrames(self.config) -class MockFrames: - def __init__(self, config: MockConfig): +class RSFrames: + def __init__(self, config: RSConfig): self.config = config def get_color_frame(self): - return MockColorFrame(self.config) + return RSColorFrame(self.config) def get_depth_frame(self): - return MockDepthFrame(self.config) + return RSDepthFrame(self.config) -class MockColorFrame: - def __init__(self, config: MockConfig): +class RSColorFrame: + def __init__(self, config: RSConfig): self.config = config def get_data(self): @@ -102,15 +102,15 @@ class MockColorFrame: return data -class MockDepthFrame: - def __init__(self, config: MockConfig): +class RSDepthFrame: + def __init__(self, config: RSConfig): self.config = config def get_data(self): return np.ones((self.config.height, self.config.width), dtype=np.uint16) -class MockDevice: +class RSDevice: def __init__(self): pass @@ -120,9 +120,15 @@ class MockDevice: return "123456789" -class MockContext: +class RSContext: def __init__(self): pass def query_devices(self): - return [MockDevice()] + return [RSDevice()] + + +class RSCameraInfo: + def __init__(self, serial_number): + del serial_number + pass diff --git a/tests/mock_opencv.py b/tests/mock_opencv.py index 8a89c0b5..31d808cb 100644 --- a/tests/mock_opencv.py +++ b/tests/mock_opencv.py @@ -1,20 +1,31 @@ from functools import cache -import cv2 import numpy as np +CAP_PROP_FPS = 5 +CAP_PROP_FRAME_WIDTH = 3 +CAP_PROP_FRAME_HEIGHT = 4 +COLOR_BGR2RGB = 4 + @cache def _generate_image(width: int, height: int): return np.random.randint(0, 256, size=(height, width, 3), dtype=np.uint8) -class MockVideoCapture: +def cvtColor(color_image, color_convertion): # noqa: N802 + if color_convertion == COLOR_BGR2RGB: + return color_image[:, :, [2, 1, 0]] + else: + raise NotImplementedError(color_convertion) + + +class VideoCapture: def __init__(self, *args, **kwargs): self._mock_dict = { - cv2.CAP_PROP_FPS: 30, - cv2.CAP_PROP_FRAME_WIDTH: 640, - cv2.CAP_PROP_FRAME_HEIGHT: 480, + CAP_PROP_FPS: 30, + CAP_PROP_FRAME_WIDTH: 640, + CAP_PROP_FRAME_HEIGHT: 480, } self._is_opened = True @@ -32,17 +43,17 @@ class MockVideoCapture: raise RuntimeError("Camera is not opened") value = self._mock_dict[propId] if value == 0: - if propId == cv2.CAP_PROP_FRAME_HEIGHT: + if propId == CAP_PROP_FRAME_HEIGHT: value = 480 - elif propId == cv2.CAP_PROP_FRAME_WIDTH: + elif propId == CAP_PROP_FRAME_WIDTH: value = 640 return value def read(self): if not self._is_opened: raise RuntimeError("Camera is not opened") - h = self.get(cv2.CAP_PROP_FRAME_HEIGHT) - w = self.get(cv2.CAP_PROP_FRAME_WIDTH) + h = self.get(CAP_PROP_FRAME_HEIGHT) + w = self.get(CAP_PROP_FRAME_WIDTH) ret = True return ret, _generate_image(width=w, height=h) diff --git a/tests/test_cameras.py b/tests/test_cameras.py index 2adeb92e..72da248d 100644 --- a/tests/test_cameras.py +++ b/tests/test_cameras.py @@ -24,9 +24,8 @@ pytest -sx 'tests/test_cameras.py::test_camera[intelrealsense-True]' import numpy as np import pytest -from lerobot import available_cameras from lerobot.common.robot_devices.utils import RobotDeviceAlreadyConnectedError, RobotDeviceNotConnectedError -from tests.utils import make_camera, require_camera, require_mock_camera +from tests.utils import TEST_CAMERA_TYPES, make_camera, require_camera # Maximum absolute difference between two consecutive images recored by a camera. # This value differs with respect to the camera. @@ -37,7 +36,9 @@ def compute_max_pixel_difference(first_image, second_image): return np.abs(first_image.astype(float) - second_image.astype(float)).max() -def _test_camera(camera_type): +@pytest.mark.parametrize("camera_type, mock", TEST_CAMERA_TYPES) +@require_camera +def test_camera(request, camera_type, mock): """Test assumes that `camera.read()` returns the same image when called multiple times in a row. So the environment should not change (you shouldnt be in front of the camera) and the camera should not be moving. @@ -47,8 +48,11 @@ def _test_camera(camera_type): # TODO(rcadene): measure fps in nightly? # TODO(rcadene): test logs + if camera_type == "opencv" and not mock: + pytest.skip("TODO(rcadene): fix test for opencv physical camera") + # Test instantiating - camera = make_camera(camera_type) + camera = make_camera(camera_type, mock=mock) # Test reading, async reading, disconnecting before connecting raises an error with pytest.raises(RobotDeviceNotConnectedError): @@ -62,7 +66,7 @@ def _test_camera(camera_type): del camera # Test connecting - camera = make_camera(camera_type) + camera = make_camera(camera_type, mock=mock) camera.connect() assert camera.is_connected assert camera.fps is not None @@ -102,12 +106,12 @@ def _test_camera(camera_type): assert camera.thread is None # Test disconnecting with `__del__` - camera = make_camera(camera_type) + camera = make_camera(camera_type, mock=mock) camera.connect() del camera # Test acquiring a bgr image - camera = make_camera(camera_type, color_mode="bgr") + camera = make_camera(camera_type, color_mode="bgr", mock=mock) camera.connect() assert camera.color_mode == "bgr" bgr_color_image = camera.read() @@ -120,7 +124,7 @@ def _test_camera(camera_type): # TODO(rcadene): Add a test for a camera that supports fps=60 # Test width and height can be set - camera = make_camera(camera_type, fps=30, width=1280, height=720) + camera = make_camera(camera_type, fps=30, width=1280, height=720, mock=mock) camera.connect() assert camera.fps == 30 assert camera.width == 1280 @@ -133,41 +137,19 @@ def _test_camera(camera_type): del camera # Test not supported width and height raise an error - camera = make_camera(camera_type, fps=30, width=0, height=0) + camera = make_camera(camera_type, fps=30, width=0, height=0, mock=mock) with pytest.raises(OSError): camera.connect() del camera -def _test_save_images_from_cameras(tmpdir, camera_type): +@pytest.mark.parametrize("camera_type, mock", TEST_CAMERA_TYPES) +@require_camera +def test_save_images_from_cameras(tmpdir, request, camera_type, mock): # TODO(rcadene): refactor if camera_type == "opencv": from lerobot.common.robot_devices.cameras.opencv import save_images_from_cameras elif camera_type == "intelrealsense": from lerobot.common.robot_devices.cameras.intelrealsense import save_images_from_cameras - save_images_from_cameras(tmpdir, record_time_s=1) - - -@pytest.mark.parametrize("camera_type", available_cameras) -@require_mock_camera -def test_camera_mock(monkeypatch, camera_type): - _test_camera(camera_type) - - -@pytest.mark.parametrize("camera_type", available_cameras) -@require_camera -def test_camera(request, camera_type): - _test_camera(camera_type) - - -@pytest.mark.parametrize("camera_type", available_cameras) -@require_mock_camera -def test_save_images_from_cameras_mock(tmpdir, monkeypatch, camera_type): - _test_save_images_from_cameras(tmpdir, camera_type) - - -@pytest.mark.parametrize("camera_type", available_cameras) -@require_camera -def test_save_images_from_cameras(tmpdir, request, camera_type): - _test_save_images_from_cameras(tmpdir, camera_type) + save_images_from_cameras(tmpdir, record_time_s=1, mock=mock) diff --git a/tests/test_control_robot.py b/tests/test_control_robot.py index 9940c888..75238004 100644 --- a/tests/test_control_robot.py +++ b/tests/test_control_robot.py @@ -27,33 +27,44 @@ from pathlib import Path import pytest -from lerobot import available_robots from lerobot.common.policies.factory import make_policy from lerobot.common.utils.utils import init_hydra_config from lerobot.scripts.control_robot import calibrate, get_available_arms, record, replay, teleoperate from tests.test_robots import make_robot -from tests.utils import DEFAULT_CONFIG_PATH, DEVICE, require_mock_robot, require_robot +from tests.utils import DEFAULT_CONFIG_PATH, DEVICE, TEST_ROBOT_TYPES, require_robot -def _test_teleoperate(robot_type): - robot = make_robot(robot_type) +@pytest.mark.parametrize("robot_type, mock", TEST_ROBOT_TYPES) +@require_robot +def test_teleoperate(request, robot_type, mock): + robot = make_robot(robot_type, mock=mock) teleoperate(robot, teleop_time_s=1) teleoperate(robot, fps=30, teleop_time_s=1) teleoperate(robot, fps=60, teleop_time_s=1) del robot -def _test_calibrate(robot_type): - robot = make_robot(robot_type) +@pytest.mark.parametrize("robot_type, mock", TEST_ROBOT_TYPES) +@require_robot +def test_calibrate(request, robot_type, mock): + if mock: + request.getfixturevalue("patch_builtins_input") + + robot = make_robot(robot_type, mock=mock) calibrate(robot, arms=get_available_arms(robot)) del robot -def _test_record_without_cameras(tmpdir, robot_type): +@pytest.mark.parametrize("robot_type, mock", TEST_ROBOT_TYPES) +@require_robot +def test_record_without_cameras(tmpdir, request, robot_type, mock): + if mock: + request.getfixturevalue("patch_builtins_input") + root = Path(tmpdir) repo_id = "lerobot/debug" - robot = make_robot(robot_type, overrides=["~cameras"]) + robot = make_robot(robot_type, overrides=["~cameras"], mock=mock) record( robot, fps=30, @@ -68,14 +79,19 @@ def _test_record_without_cameras(tmpdir, robot_type): ) -def _test_record_and_replay_and_policy(tmpdir, robot_type): +@pytest.mark.parametrize("robot_type, mock", TEST_ROBOT_TYPES) +@require_robot +def test_record_and_replay_and_policy(tmpdir, request, robot_type, mock): + if mock: + request.getfixturevalue("patch_builtins_input") + env_name = "koch_real" policy_name = "act_koch_real" root = Path(tmpdir) repo_id = "lerobot/debug" - robot = make_robot(robot_type) + robot = make_robot(robot_type, mock=mock) dataset = record( robot, fps=30, @@ -115,51 +131,3 @@ def _test_record_and_replay_and_policy(tmpdir, robot_type): ) del robot - - -@pytest.mark.parametrize("robot_type", available_robots) -@require_mock_robot -def test_teleoperate_mock(monkeypatch, robot_type): - _test_teleoperate(robot_type) - - -@pytest.mark.parametrize("robot_type", available_robots) -@require_robot -def test_teleoperate(request, robot_type): - _test_teleoperate(robot_type) - - -@pytest.mark.parametrize("robot_type", available_robots) -@require_mock_robot -def test_calibrate_mock(monkeypatch, robot_type): - _test_calibrate(robot_type) - - -@pytest.mark.parametrize("robot_type", available_robots) -@require_robot -def test_calibrate(request, robot_type): - _test_calibrate(robot_type) - - -@pytest.mark.parametrize("robot_type", available_robots) -@require_mock_robot -def test_record_without_cameras_mock(tmpdir, monkeypatch, robot_type): - _test_record_without_cameras(tmpdir, robot_type) - - -@pytest.mark.parametrize("robot_type", available_robots) -@require_robot -def test_record_without_cameras(tmpdir, request, robot_type): - _test_record_without_cameras(tmpdir, robot_type) - - -@pytest.mark.parametrize("robot_type", available_robots) -@require_mock_robot -def test_record_and_replay_and_policy_mock(tmpdir, monkeypatch, robot_type): - _test_record_and_replay_and_policy(tmpdir, robot_type) - - -@pytest.mark.parametrize("robot_type", available_robots) -@require_robot -def test_record_and_replay_and_policy(tmpdir, request, robot_type): - _test_record_and_replay_and_policy(tmpdir, robot_type) diff --git a/tests/test_motors.py b/tests/test_motors.py index fcf33a3e..f3f5650c 100644 --- a/tests/test_motors.py +++ b/tests/test_motors.py @@ -26,20 +26,35 @@ pytest -sx 'tests/test_motors.py::test_motors_bus[dynamixel-True]' # TODO(rcadene): add compatibility with other motors bus import time -import traceback import numpy as np import pytest -from lerobot import available_motors +from lerobot.common.robot_devices.motors.dynamixel import find_port from lerobot.common.robot_devices.utils import RobotDeviceAlreadyConnectedError, RobotDeviceNotConnectedError -from tests.utils import make_motors_bus, mock_builtins_input, require_motor +from tests.utils import TEST_MOTOR_TYPES, make_motors_bus, require_motor -def _test_configure_motors_all_ids_1(motor_type): +@pytest.mark.parametrize("motor_type, mock", TEST_MOTOR_TYPES) +@require_motor +def test_find_port_mock(request, motor_type, mock): + if mock: + request.getfixturevalue("patch_builtins_input") + with pytest.raises(OSError): + find_port() + else: + find_port() + + +@pytest.mark.parametrize("motor_type, mock", TEST_MOTOR_TYPES) +@require_motor +def test_configure_motors_all_ids_1_mock(request, motor_type, mock): + if mock: + request.getfixturevalue("patch_builtins_input") + input("Are you sure you want to re-configure the motors? Press enter to continue...") # This test expect the configuration was already correct. - motors_bus = make_motors_bus(motor_type) + motors_bus = make_motors_bus(motor_type, mock=mock) motors_bus.connect() motors_bus.write("Baud_Rate", [0] * len(motors_bus.motors)) motors_bus.set_bus_baudrate(9_600) @@ -47,14 +62,19 @@ def _test_configure_motors_all_ids_1(motor_type): del motors_bus # Test configure - motors_bus = make_motors_bus(motor_type) + motors_bus = make_motors_bus(motor_type, mock=mock) motors_bus.connect() assert motors_bus.are_motors_configured() del motors_bus -def _test_motors_bus(motor_type): - motors_bus = make_motors_bus(motor_type) +@pytest.mark.parametrize("motor_type, mock", TEST_MOTOR_TYPES) +@require_motor +def test_motors_bus(request, motor_type, mock): + if mock: + request.getfixturevalue("patch_builtins_input") + + motors_bus = make_motors_bus(motor_type, mock=mock) # Test reading and writting before connecting raises an error with pytest.raises(RobotDeviceNotConnectedError): @@ -68,7 +88,7 @@ def _test_motors_bus(motor_type): del motors_bus # Test connecting - motors_bus = make_motors_bus(motor_type) + motors_bus = make_motors_bus(motor_type, mock=mock) motors_bus.connect() # Test connecting twice raises an error @@ -110,82 +130,3 @@ def _test_motors_bus(motor_type): time.sleep(1) new_values = motors_bus.read("Present_Position") assert (new_values == values).all() - - -@pytest.mark.parametrize("motor_type", available_motors) -def test_find_port_mock(monkeypatch, motor_type): - mock_motor(monkeypatch, motor_type) - from lerobot.common.robot_devices.motors.dynamixel import find_port - - # To run find_port without user input - mock_builtins_input(monkeypatch) - with pytest.raises(OSError): - find_port() - - -@pytest.mark.parametrize("motor_type", available_motors) -@require_motor -def test_find_port(request, motor_type): - from lerobot.common.robot_devices.motors.dynamixel import find_port - - find_port() - - -@pytest.mark.parametrize("motor_type", available_motors) -def test_configure_motors_all_ids_1_mock(monkeypatch, motor_type): - mock_motor(monkeypatch, motor_type) - _test_configure_motors_all_ids_1(motor_type) - - -@pytest.mark.parametrize("motor_type", available_motors) -@require_motor -def test_configure_motors_all_ids_1(request, motor_type): - _test_configure_motors_all_ids_1(motor_type) - - -@pytest.mark.parametrize("motor_type", available_motors) -def test_motors_bus_mock(monkeypatch, motor_type): - mock_motor(monkeypatch, motor_type) - _test_motors_bus(motor_type) - - -@pytest.mark.parametrize("motor_type", available_motors) -@require_motor -def test_motors_bus(request, motor_type): - _test_motors_bus(motor_type) - - -def mock_motor(monkeypatch, motor_type): - if motor_type not in available_motors: - raise ValueError( - f"The motor type '{motor_type}' is not valid. Expected one of these '{available_motors}" - ) - - if motor_type == "dynamixel": - try: - import dynamixel_sdk - - from tests.mock_dynamixel import ( - MockGroupSyncRead, - MockGroupSyncWrite, - MockPacketHandler, - MockPortHandler, - mock_convert_to_bytes, - ) - - monkeypatch.setattr(dynamixel_sdk, "GroupSyncRead", MockGroupSyncRead) - monkeypatch.setattr(dynamixel_sdk, "GroupSyncWrite", MockGroupSyncWrite) - monkeypatch.setattr(dynamixel_sdk, "PacketHandler", MockPacketHandler) - monkeypatch.setattr(dynamixel_sdk, "PortHandler", MockPortHandler) - - # Import dynamixel AFTER mocking dynamixel_sdk to use mocked classes - from lerobot.common.robot_devices.motors import dynamixel - - # TODO(rcadene): remove need to mock `convert_to_bytes` by implemented the inverse transform - # `convert_bytes_to_value` - monkeypatch.setattr(dynamixel, "convert_to_bytes", mock_convert_to_bytes) - except ImportError: - traceback.print_exc() - pytest.skip("To avoid skipping tests mocking dynamixel motors, run `pip install dynamixel-sdk`.") - else: - raise NotImplementedError("Implement mocking logic for new motor.") diff --git a/tests/test_robots.py b/tests/test_robots.py index d3160804..93c1a1fe 100644 --- a/tests/test_robots.py +++ b/tests/test_robots.py @@ -28,22 +28,26 @@ from pathlib import Path import pytest import torch -from lerobot import available_robots +from lerobot.common.robot_devices.robots.manipulator import ManipulatorRobot from lerobot.common.robot_devices.utils import RobotDeviceAlreadyConnectedError, RobotDeviceNotConnectedError -from tests.utils import make_robot, require_mock_robot, require_robot +from tests.utils import TEST_ROBOT_TYPES, make_robot, require_robot -def _test_robot(tmpdir, robot_type, mock): +@pytest.mark.parametrize("robot_type, mock", TEST_ROBOT_TYPES) +@require_robot +def test_robot(tmpdir, request, robot_type, mock): # TODO(rcadene): measure fps in nightly? # TODO(rcadene): test logs # TODO(rcadene): add compatibility with other robots - from lerobot.common.robot_devices.robots.manipulator import ManipulatorRobot if robot_type == "aloha" and mock: # To simplify unit test, we do not rerun manual calibration for Aloha mock=True. # Instead, we use the files from '.cache/calibration/aloha_default' overrides_calibration_dir = None else: + if mock: + request.getfixturevalue("patch_builtins_input") + # Create an empty calibration directory to trigger manual calibration tmpdir = Path(tmpdir) calibration_dir = tmpdir / robot_type @@ -72,7 +76,7 @@ def _test_robot(tmpdir, robot_type, mock): del robot # Test connecting (triggers manual calibration) - robot = make_robot(robot_type, overrides=overrides_calibration_dir) + robot = make_robot(robot_type, overrides=overrides_calibration_dir, mock=mock) robot.connect() assert robot.is_connected @@ -84,7 +88,7 @@ def _test_robot(tmpdir, robot_type, mock): del robot # Test teleop can run - robot = make_robot(robot_type, overrides=overrides_calibration_dir) + robot = make_robot(robot_type, overrides=overrides_calibration_dir, mock=mock) if overrides_calibration_dir is not None: robot.calibration_dir = calibration_dir robot.connect() @@ -133,15 +137,3 @@ def _test_robot(tmpdir, robot_type, mock): for name in robot.cameras: assert not robot.cameras[name].is_connected del robot - - -@pytest.mark.parametrize("robot_type", available_robots) -@require_mock_robot -def test_robot_mock(tmpdir, monkeypatch, robot_type): - _test_robot(tmpdir, robot_type, mock=True) - - -@pytest.mark.parametrize("robot_type", available_robots) -@require_robot -def test_robot(tmpdir, request, robot_type): - _test_robot(tmpdir, robot_type, mock=False) diff --git a/tests/utils.py b/tests/utils.py index 4bd47413..51356edf 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -15,7 +15,7 @@ # limitations under the License. import os import platform -import traceback +from copy import copy from functools import wraps import pytest @@ -207,14 +207,17 @@ def require_robot(func): # Access the pytest request context to get the mockeypatch fixture request = kwargs.get("request") robot_type = kwargs.get("robot_type") + mock = kwargs.get("mock") if robot_type is None: raise ValueError("The 'robot_type' must be an argument of the test function.") if request is None: raise ValueError("The 'request' fixture must be an argument of the test function.") + if mock is None: + raise ValueError("The 'mock' variable must be an argument of the test function.") # Run test with a real robot. Skip test if robot connection fails. - if not request.getfixturevalue("is_robot_available"): + if not mock and not request.getfixturevalue("is_robot_available"): pytest.skip(f"A {robot_type} robot is not available.") return func(*args, **kwargs) @@ -227,13 +230,16 @@ def require_camera(func): def wrapper(*args, **kwargs): request = kwargs.get("request") camera_type = kwargs.get("camera_type") + mock = kwargs.get("mock") if request is None: raise ValueError("The 'request' fixture must be an argument of the test function.") if camera_type is None: raise ValueError("The 'camera_type' must be an argument of the test function.") + if mock is None: + raise ValueError("The 'mock' variable must be an argument of the test function.") - if not request.getfixturevalue("is_camera_available"): + if not mock and not request.getfixturevalue("is_camera_available"): pytest.skip(f"A {camera_type} camera is not available.") return func(*args, **kwargs) @@ -247,13 +253,16 @@ def require_motor(func): # Access the pytest request context to get the mockeypatch fixture request = kwargs.get("request") motor_type = kwargs.get("motor_type") + mock = kwargs.get("mock") if request is None: raise ValueError("The 'request' fixture must be an argument of the test function.") if motor_type is None: raise ValueError("The 'motor_type' must be an argument of the test function.") + if mock is None: + raise ValueError("The 'mock' variable must be an argument of the test function.") - if not request.getfixturevalue("is_motor_available"): + if not mock and not request.getfixturevalue("is_motor_available"): pytest.skip(f"A {motor_type} motor is not available.") return func(*args, **kwargs) @@ -261,134 +270,37 @@ def require_motor(func): return wrapper -def require_mock_robot(func): - """ - Decorator over test function to mock the robot +def make_robot(robot_type: str, overrides: list[str] | None = None, mock=False) -> Robot: + if mock: + overrides = [] if overrides is None else copy(overrides) - The decorated function must have two arguments `monkeypatch` and `robot_type`. + if robot_type == "koch": + overrides.append("+leader_arms.main.mock=true") + overrides.append("+follower_arms.main.mock=true") + overrides.append("+cameras.laptop.mock=true") + overrides.append("+cameras.phone.mock=true") - Example of usage: - ```python - @pytest.mark.parametrize( - "robot_type", ["koch", "aloha"] - ) - @require_robot - def test_require_robot(request, robot_type): - pass - ``` - """ + elif robot_type == "koch_bimanual": + overrides.append("+leader_arms.left.mock=true") + overrides.append("+leader_arms.right.mock=true") + overrides.append("+follower_arms.left.mock=true") + overrides.append("+follower_arms.right.mock=true") + overrides.append("+cameras.laptop.mock=true") + overrides.append("+cameras.phone.mock=true") - @wraps(func) - def wrapper(*args, **kwargs): - # Access the pytest request context to get the mockeypatch fixture - monkeypatch = kwargs.get("monkeypatch") - robot_type = kwargs.get("robot_type") + elif robot_type == "aloha": + overrides.append("+leader_arms.left.mock=true") + overrides.append("+leader_arms.right.mock=true") + overrides.append("+follower_arms.left.mock=true") + overrides.append("+follower_arms.right.mock=true") + overrides.append("+cameras.cam_high.mock=true") + overrides.append("+cameras.cam_low.mock=true") + overrides.append("+cameras.cam_left_wrist.mock=true") + overrides.append("+cameras.cam_right_wrist.mock=true") - if monkeypatch is None: - raise ValueError("The 'monkeypatch' fixture must be an argument of the test function.") + else: + raise NotImplementedError(robot_type) - if robot_type is None: - raise ValueError("The 'robot_type' must be an argument of the test function.") - - mock_robot(monkeypatch, robot_type) - return func(*args, **kwargs) - - return wrapper - - -def require_mock_camera(func): - @wraps(func) - def wrapper(*args, **kwargs): - # Access the pytest request context to get the mockeypatch fixture - monkeypatch = kwargs.get("monkeypatch") - camera_type = kwargs.get("camera_type") - - if monkeypatch is None: - raise ValueError("The 'monkeypatch' fixture must be an argument of the test function.") - if camera_type is None: - raise ValueError("The 'camera_type' must be an argument of the test function.") - - mock_camera(monkeypatch, camera_type) - return func(*args, **kwargs) - - return wrapper - - -def mock_motor(**kwargs): - pass - - -def mock_robot(monkeypatch, robot_type): - if robot_type not in available_robots: - raise ValueError( - f"The camera type '{robot_type}' is not valid. Expected one of these '{available_robots}" - ) - - if robot_type in ["koch", "koch_bimanual"]: - mock_camera(monkeypatch, "opencv") - mock_motor(monkeypatch, "dynamixel") - elif robot_type == "aloha": - mock_camera(monkeypatch, "intelrealsense") - mock_motor(monkeypatch, "dynamixel") - else: - raise NotImplementedError("Implement mocking logic for new robot.") - - # To run calibration without user input - mock_builtins_input(monkeypatch) - - -def mock_camera(monkeypatch, camera_type): - if camera_type not in available_cameras: - raise ValueError( - f"The motor type '{camera_type}' is not valid. Expected one of these '{available_cameras}" - ) - - if camera_type == "opencv": - try: - import cv2 - - from tests.mock_opencv import MockVideoCapture - - monkeypatch.setattr(cv2, "VideoCapture", MockVideoCapture) - except ImportError: - traceback.print_exc() - pytest.skip("To avoid skipping tests mocking opencv cameras, run `pip install opencv-python`.") - - elif camera_type == "intelrealsense": - try: - import pyrealsense2 as rs - - from tests.mock_intelrealsense import ( - MockConfig, - MockContext, - MockFormat, - MockPipeline, - MockStream, - ) - - monkeypatch.setattr(rs, "config", MockConfig) - monkeypatch.setattr(rs, "pipeline", MockPipeline) - monkeypatch.setattr(rs, "stream", MockStream) - monkeypatch.setattr(rs, "format", MockFormat) - monkeypatch.setattr(rs, "context", MockContext) - except ImportError: - traceback.print_exc() - pytest.skip( - "To avoid skipping tests mocking intelrealsense cameras, run `pip install pyrealsense2`." - ) - else: - raise NotImplementedError("Implement mocking logic for new camera.") - - -def mock_builtins_input(monkeypatch): - def print_text(text=None): - if text is not None: - print(text) - - monkeypatch.setattr("builtins.input", print_text) - - -def make_robot(robot_type: str, overrides: list[str] | None = None) -> Robot: config_path = ROBOT_CONFIG_PATH_TEMPLATE.format(robot=robot_type) robot_cfg = init_hydra_config(config_path, overrides) robot = make_robot_from_cfg(robot_cfg)