add mock=False

This commit is contained in:
Remi Cadene
2024-09-27 12:02:14 +02:00
parent 8da08935d4
commit a7350d9b65
12 changed files with 344 additions and 420 deletions

View File

@@ -14,9 +14,7 @@ from dataclasses import dataclass, replace
from pathlib import Path from pathlib import Path
from threading import Thread from threading import Thread
import cv2
import numpy as np import numpy as np
import pyrealsense2 as rs
from PIL import Image from PIL import Image
from lerobot.common.robot_devices.utils import ( from lerobot.common.robot_devices.utils import (
@@ -29,14 +27,23 @@ from lerobot.scripts.control_robot import busy_wait
SERIAL_NUMBER_INDEX = 1 SERIAL_NUMBER_INDEX = 1
def find_camera_indices(raise_when_empty=True) -> list[int]: def find_camera_indices(raise_when_empty=True, mock=False) -> list[int]:
""" """
Find the serial numbers of the Intel RealSense cameras Find the serial numbers of the Intel RealSense cameras
connected to the computer. connected to the computer.
""" """
if mock:
from tests.mock_intelrealsense import (
RSCameraInfo,
RSContext,
)
else:
from pyrealsense2 import camera_info as RSCameraInfo # noqa: N812
from pyrealsense2 import context as RSContext # noqa: N812
camera_ids = [] camera_ids = []
for device in rs.context().query_devices(): for device in RSContext().query_devices():
serial_number = int(device.get_info(rs.camera_info(SERIAL_NUMBER_INDEX))) serial_number = int(device.get_info(RSCameraInfo(SERIAL_NUMBER_INDEX)))
camera_ids.append(serial_number) camera_ids.append(serial_number)
if raise_when_empty and len(camera_ids) == 0: if raise_when_empty and len(camera_ids) == 0:
@@ -65,18 +72,24 @@ def save_images_from_cameras(
width=None, width=None,
height=None, height=None,
record_time_s=2, record_time_s=2,
mock=False,
): ):
""" """
Initializes all the cameras and saves images to the directory. Useful to visually identify the camera Initializes all the cameras and saves images to the directory. Useful to visually identify the camera
associated to a given camera index. associated to a given camera index.
""" """
if camera_ids is None: if camera_ids is None:
camera_ids = find_camera_indices() camera_ids = find_camera_indices(mock=mock)
if mock:
from tests.mock_opencv import COLOR_RGB2BGR, cvtColor
else:
from cv2 import COLOR_RGB2BGR, cvtColor
print("Connecting cameras") print("Connecting cameras")
cameras = [] cameras = []
for cam_idx in camera_ids: for cam_idx in camera_ids:
camera = IntelRealSenseCamera(cam_idx, fps=fps, width=width, height=height) camera = IntelRealSenseCamera(cam_idx, fps=fps, width=width, height=height, mock=mock)
camera.connect() camera.connect()
print( print(
f"IntelRealSenseCamera({camera.camera_index}, fps={camera.fps}, width={camera.width}, height={camera.height}, color_mode={camera.color_mode})" f"IntelRealSenseCamera({camera.camera_index}, fps={camera.fps}, width={camera.width}, height={camera.height}, color_mode={camera.color_mode})"
@@ -104,7 +117,8 @@ def save_images_from_cameras(
image = camera.read() if fps is None else camera.async_read() image = camera.read() if fps is None else camera.async_read()
if image is None: if image is None:
print("No Frame") print("No Frame")
bgr_converted_image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
bgr_converted_image = cvtColor(image, COLOR_RGB2BGR)
executor.submit( executor.submit(
save_image, save_image,
@@ -150,6 +164,7 @@ class IntelRealSenseCameraConfig:
color_mode: str = "rgb" color_mode: str = "rgb"
use_depth: bool = False use_depth: bool = False
force_hardware_reset: bool = True force_hardware_reset: bool = True
mock: bool = False
def __post_init__(self): def __post_init__(self):
if self.color_mode not in ["rgb", "bgr"]: if self.color_mode not in ["rgb", "bgr"]:
@@ -231,6 +246,7 @@ class IntelRealSenseCamera:
self.color_mode = config.color_mode self.color_mode = config.color_mode
self.use_depth = config.use_depth self.use_depth = config.use_depth
self.force_hardware_reset = config.force_hardware_reset self.force_hardware_reset = config.force_hardware_reset
self.mock = config.mock
self.camera = None self.camera = None
self.is_connected = False self.is_connected = False
@@ -246,22 +262,35 @@ class IntelRealSenseCamera:
f"IntelRealSenseCamera({self.camera_index}) is already connected." f"IntelRealSenseCamera({self.camera_index}) is already connected."
) )
config = rs.config() if self.mock:
from tests.mock_intelrealsense import (
RSConfig,
RSFormat,
RSPipeline,
RSStream,
)
else:
from pyrealsense2 import config as RSConfig # noqa: N812
from pyrealsense2 import format as RSFormat # noqa: N812
from pyrealsense2 import pipeline as RSPipeline # noqa: N812
from pyrealsense2 import stream as RSStream # noqa: N812
config = RSConfig()
config.enable_device(str(self.camera_index)) config.enable_device(str(self.camera_index))
if self.fps and self.width and self.height: if self.fps and self.width and self.height:
# TODO(rcadene): can we set rgb8 directly? # TODO(rcadene): can we set rgb8 directly?
config.enable_stream(rs.stream.color, self.width, self.height, rs.format.rgb8, self.fps) config.enable_stream(RSStream.color, self.width, self.height, RSFormat.rgb8, self.fps)
else: else:
config.enable_stream(rs.stream.color) config.enable_stream(RSStream.color)
if self.use_depth: if self.use_depth:
if self.fps and self.width and self.height: if self.fps and self.width and self.height:
config.enable_stream(rs.stream.depth, self.width, self.height, rs.format.z16, self.fps) config.enable_stream(RSStream.depth, self.width, self.height, RSFormat.z16, self.fps)
else: else:
config.enable_stream(rs.stream.depth) config.enable_stream(RSStream.depth)
self.camera = rs.pipeline() self.camera = RSPipeline()
try: try:
profile = self.camera.start(config) profile = self.camera.start(config)
is_camera_open = True is_camera_open = True
@@ -282,7 +311,7 @@ class IntelRealSenseCamera:
raise OSError(f"Can't access IntelRealSenseCamera({self.camera_index}).") raise OSError(f"Can't access IntelRealSenseCamera({self.camera_index}).")
color_stream = profile.get_stream(rs.stream.color) color_stream = profile.get_stream(RSStream.color)
color_profile = color_stream.as_video_stream_profile() color_profile = color_stream.as_video_stream_profile()
actual_fps = color_profile.fps() actual_fps = color_profile.fps()
actual_width = color_profile.width() actual_width = color_profile.width()
@@ -343,7 +372,12 @@ class IntelRealSenseCamera:
# IntelRealSense uses RGB format as default (red, green, blue). # IntelRealSense uses RGB format as default (red, green, blue).
if requested_color_mode == "bgr": if requested_color_mode == "bgr":
color_image = cv2.cvtColor(color_image, cv2.COLOR_RGB2BGR) if self.mock:
from tests.mock_opencv import COLOR_RGB2BGR, cvtColor
else:
from cv2 import COLOR_RGB2BGR, cvtColor
color_image = cvtColor(color_image, COLOR_RGB2BGR)
h, w, _ = color_image.shape h, w, _ = color_image.shape
if h != self.height or w != self.width: if h != self.height or w != self.width:

View File

@@ -13,7 +13,6 @@ from dataclasses import dataclass, replace
from pathlib import Path from pathlib import Path
from threading import Thread from threading import Thread
import cv2
import numpy as np import numpy as np
from PIL import Image from PIL import Image
@@ -24,10 +23,6 @@ from lerobot.common.robot_devices.utils import (
) )
from lerobot.common.utils.utils import capture_timestamp_utc from lerobot.common.utils.utils import capture_timestamp_utc
# Use 1 thread to avoid blocking the main thread. Especially useful during data collection
# when other threads are used to save the images.
cv2.setNumThreads(1)
# The maximum opencv device index depends on your operating system. For instance, # The maximum opencv device index depends on your operating system. For instance,
# if you have 3 cameras, they should be associated to index 0, 1, and 2. This is the case # if you have 3 cameras, they should be associated to index 0, 1, and 2. This is the case
# on MacOS. However, on Ubuntu, the indices are different like 6, 16, 23. # on MacOS. However, on Ubuntu, the indices are different like 6, 16, 23.
@@ -36,7 +31,7 @@ cv2.setNumThreads(1)
MAX_OPENCV_INDEX = 60 MAX_OPENCV_INDEX = 60
def find_camera_indices(raise_when_empty=False, max_index_search_range=MAX_OPENCV_INDEX): def find_camera_indices(raise_when_empty=False, max_index_search_range=MAX_OPENCV_INDEX, mock=False):
if platform.system() == "Linux": if platform.system() == "Linux":
# Linux uses camera ports # Linux uses camera ports
print("Linux detected. Finding available camera indices through scanning '/dev/video*' ports") print("Linux detected. Finding available camera indices through scanning '/dev/video*' ports")
@@ -51,9 +46,14 @@ def find_camera_indices(raise_when_empty=False, max_index_search_range=MAX_OPENC
) )
possible_camera_ids = range(max_index_search_range) possible_camera_ids = range(max_index_search_range)
if mock:
from tests.mock_opencv import VideoCapture
else:
from cv2 import VideoCapture
camera_ids = [] camera_ids = []
for camera_idx in possible_camera_ids: for camera_idx in possible_camera_ids:
camera = cv2.VideoCapture(camera_idx) camera = VideoCapture(camera_idx)
is_open = camera.isOpened() is_open = camera.isOpened()
camera.release() camera.release()
@@ -78,19 +78,25 @@ def save_image(img_array, camera_index, frame_index, images_dir):
def save_images_from_cameras( def save_images_from_cameras(
images_dir: Path, camera_ids: list[int] | None = None, fps=None, width=None, height=None, record_time_s=2 images_dir: Path,
camera_ids: list[int] | None = None,
fps=None,
width=None,
height=None,
record_time_s=2,
mock=False,
): ):
""" """
Initializes all the cameras and saves images to the directory. Useful to visually identify the camera Initializes all the cameras and saves images to the directory. Useful to visually identify the camera
associated to a given camera index. associated to a given camera index.
""" """
if camera_ids is None: if camera_ids is None:
camera_ids = find_camera_indices() camera_ids = find_camera_indices(mock=mock)
print("Connecting cameras") print("Connecting cameras")
cameras = [] cameras = []
for cam_idx in camera_ids: for cam_idx in camera_ids:
camera = OpenCVCamera(cam_idx, fps=fps, width=width, height=height) camera = OpenCVCamera(cam_idx, fps=fps, width=width, height=height, mock=mock)
camera.connect() camera.connect()
print( print(
f"OpenCVCamera({camera.camera_index}, fps={camera.fps}, width={camera.width}, " f"OpenCVCamera({camera.camera_index}, fps={camera.fps}, width={camera.width}, "
@@ -156,6 +162,7 @@ class OpenCVCameraConfig:
width: int | None = None width: int | None = None
height: int | None = None height: int | None = None
color_mode: str = "rgb" color_mode: str = "rgb"
mock: bool = False
def __post_init__(self): def __post_init__(self):
if self.color_mode not in ["rgb", "bgr"]: if self.color_mode not in ["rgb", "bgr"]:
@@ -215,6 +222,7 @@ class OpenCVCamera:
self.width = config.width self.width = config.width
self.height = config.height self.height = config.height
self.color_mode = config.color_mode self.color_mode = config.color_mode
self.mock = config.mock
self.camera = None self.camera = None
self.is_connected = False self.is_connected = False
@@ -235,11 +243,31 @@ class OpenCVCamera:
if self.is_connected: if self.is_connected:
raise RobotDeviceAlreadyConnectedError(f"OpenCVCamera({self.camera_index}) is already connected.") raise RobotDeviceAlreadyConnectedError(f"OpenCVCamera({self.camera_index}) is already connected.")
if self.mock:
from tests.mock_opencv import (
CAP_PROP_FPS,
CAP_PROP_FRAME_HEIGHT,
CAP_PROP_FRAME_WIDTH,
VideoCapture,
)
else:
import cv2
from cv2 import (
CAP_PROP_FPS,
CAP_PROP_FRAME_HEIGHT,
CAP_PROP_FRAME_WIDTH,
VideoCapture,
)
# Use 1 thread to avoid blocking the main thread. Especially useful during data collection
# when other threads are used to save the images.
cv2.setNumThreads(1)
camera_idx = f"/dev/video{self.camera_index}" if platform.system() == "Linux" else self.camera_index camera_idx = f"/dev/video{self.camera_index}" if platform.system() == "Linux" else self.camera_index
with self.lock: with self.lock:
# First create a temporary camera trying to access `camera_index`, # First create a temporary camera trying to access `camera_index`,
# and verify it is a valid camera by calling `isOpened`. # and verify it is a valid camera by calling `isOpened`.
tmp_camera = cv2.VideoCapture(camera_idx) tmp_camera = VideoCapture(camera_idx)
is_camera_open = tmp_camera.isOpened() is_camera_open = tmp_camera.isOpened()
# Release camera to make it accessible for `find_camera_indices` # Release camera to make it accessible for `find_camera_indices`
tmp_camera.release() tmp_camera.release()
@@ -262,18 +290,18 @@ class OpenCVCamera:
# Note: For some unknown reason, calling `isOpened` blocks the camera which then # Note: For some unknown reason, calling `isOpened` blocks the camera which then
# needs to be re-created. # needs to be re-created.
with self.lock: with self.lock:
self.camera = cv2.VideoCapture(camera_idx) self.camera = VideoCapture(camera_idx)
if self.fps is not None: if self.fps is not None:
self.camera.set(cv2.CAP_PROP_FPS, self.fps) self.camera.set(CAP_PROP_FPS, self.fps)
if self.width is not None: if self.width is not None:
self.camera.set(cv2.CAP_PROP_FRAME_WIDTH, self.width) self.camera.set(CAP_PROP_FRAME_WIDTH, self.width)
if self.height is not None: if self.height is not None:
self.camera.set(cv2.CAP_PROP_FRAME_HEIGHT, self.height) self.camera.set(CAP_PROP_FRAME_HEIGHT, self.height)
actual_fps = self.camera.get(cv2.CAP_PROP_FPS) actual_fps = self.camera.get(CAP_PROP_FPS)
actual_width = self.camera.get(cv2.CAP_PROP_FRAME_WIDTH) actual_width = self.camera.get(CAP_PROP_FRAME_WIDTH)
actual_height = self.camera.get(cv2.CAP_PROP_FRAME_HEIGHT) actual_height = self.camera.get(CAP_PROP_FRAME_HEIGHT)
# Using `math.isclose` since actual fps can be a float (e.g. 29.9 instead of 30) # Using `math.isclose` since actual fps can be a float (e.g. 29.9 instead of 30)
if self.fps is not None and not math.isclose(self.fps, actual_fps, rel_tol=1e-3): if self.fps is not None and not math.isclose(self.fps, actual_fps, rel_tol=1e-3):
@@ -327,7 +355,12 @@ class OpenCVCamera:
# However, Deep Learning framework such as LeRobot uses RGB format as default to train neural networks, # However, Deep Learning framework such as LeRobot uses RGB format as default to train neural networks,
# so we convert the image color from BGR to RGB. # so we convert the image color from BGR to RGB.
if requested_color_mode == "rgb": if requested_color_mode == "rgb":
color_image = cv2.cvtColor(color_image, cv2.COLOR_BGR2RGB) if self.mock:
from tests.mock_opencv import COLOR_BGR2RGB, cvtColor
else:
from cv2 import COLOR_BGR2RGB, cvtColor
color_image = cvtColor(color_image, COLOR_BGR2RGB)
h, w, _ = color_image.shape h, w, _ = color_image.shape
if h != self.height or w != self.width: if h != self.height or w != self.width:

View File

@@ -8,17 +8,6 @@ from pathlib import Path
import numpy as np import numpy as np
import tqdm import tqdm
from dynamixel_sdk import (
COMM_SUCCESS,
DXL_HIBYTE,
DXL_HIWORD,
DXL_LOBYTE,
DXL_LOWORD,
GroupSyncRead,
GroupSyncWrite,
PacketHandler,
PortHandler,
)
from lerobot.common.robot_devices.utils import RobotDeviceAlreadyConnectedError, RobotDeviceNotConnectedError from lerobot.common.robot_devices.utils import RobotDeviceAlreadyConnectedError, RobotDeviceNotConnectedError
from lerobot.common.utils.utils import capture_timestamp_utc from lerobot.common.utils.utils import capture_timestamp_utc
@@ -154,6 +143,8 @@ MODEL_BAUDRATE_TABLE = {
NUM_READ_RETRY = 10 NUM_READ_RETRY = 10
NUM_WRITE_RETRY = 10 NUM_WRITE_RETRY = 10
COMM_SUCCESS = 0
def convert_degrees_to_steps(degrees: float | np.ndarray, models: str | list[str]) -> np.ndarray: def convert_degrees_to_steps(degrees: float | np.ndarray, models: str | list[str]) -> np.ndarray:
"""This function converts the degree range to the step range for indicating motors rotation. """This function converts the degree range to the step range for indicating motors rotation.
@@ -166,7 +157,17 @@ def convert_degrees_to_steps(degrees: float | np.ndarray, models: str | list[str
return steps return steps
def convert_to_bytes(value, bytes): def convert_to_bytes(value, bytes, mock=False):
if mock:
return value
from dynamixel_sdk import (
DXL_HIBYTE,
DXL_HIWORD,
DXL_LOBYTE,
DXL_LOWORD,
)
# Note: No need to convert back into unsigned int, since this byte preprocessing # Note: No need to convert back into unsigned int, since this byte preprocessing
# already handles it for us. # already handles it for us.
if bytes == 1: if bytes == 1:
@@ -333,9 +334,11 @@ class DynamixelMotorsBus:
motors: dict[str, tuple[int, str]], motors: dict[str, tuple[int, str]],
extra_model_control_table: dict[str, list[tuple]] | None = None, extra_model_control_table: dict[str, list[tuple]] | None = None,
extra_model_resolution: dict[str, int] | None = None, extra_model_resolution: dict[str, int] | None = None,
mock=False,
): ):
self.port = port self.port = port
self.motors = motors self.motors = motors
self.mock = mock
self.model_ctrl_table = deepcopy(MODEL_CONTROL_TABLE) self.model_ctrl_table = deepcopy(MODEL_CONTROL_TABLE)
if extra_model_control_table: if extra_model_control_table:
@@ -359,6 +362,11 @@ class DynamixelMotorsBus:
f"DynamixelMotorsBus({self.port}) is already connected. Do not call `motors_bus.connect()` twice." f"DynamixelMotorsBus({self.port}) is already connected. Do not call `motors_bus.connect()` twice."
) )
if self.mock:
from tests.mock_dynamixel import PacketHandler, PortHandler
else:
from dynamixel_sdk import PacketHandler, PortHandler
self.port_handler = PortHandler(self.port) self.port_handler = PortHandler(self.port)
self.packet_handler = PacketHandler(PROTOCOL_VERSION) self.packet_handler = PacketHandler(PROTOCOL_VERSION)
@@ -392,10 +400,17 @@ class DynamixelMotorsBus:
self.configure_motors() self.configure_motors()
def reconnect(self): def reconnect(self):
if self.mock:
from tests.mock_dynamixel import PacketHandler, PortHandler
else:
from dynamixel_sdk import PacketHandler, PortHandler
self.port_handler = PortHandler(self.port) self.port_handler = PortHandler(self.port)
self.packet_handler = PacketHandler(PROTOCOL_VERSION) self.packet_handler = PacketHandler(PROTOCOL_VERSION)
if not self.port_handler.openPort(): if not self.port_handler.openPort():
raise OSError(f"Failed to open port '{self.port}'.") raise OSError(f"Failed to open port '{self.port}'.")
self.is_connected = True self.is_connected = True
def are_motors_configured(self): def are_motors_configured(self):
@@ -781,6 +796,11 @@ class DynamixelMotorsBus:
return values return values
def _read_with_motor_ids(self, motor_models, motor_ids, data_name): def _read_with_motor_ids(self, motor_models, motor_ids, data_name):
if self.mock:
from tests.mock_dynamixel import GroupSyncRead
else:
from dynamixel_sdk import GroupSyncRead
return_list = True return_list = True
if not isinstance(motor_ids, list): if not isinstance(motor_ids, list):
return_list = False return_list = False
@@ -817,6 +837,11 @@ class DynamixelMotorsBus:
start_time = time.perf_counter() start_time = time.perf_counter()
if self.mock:
from tests.mock_dynamixel import GroupSyncRead
else:
from dynamixel_sdk import GroupSyncRead
if motor_names is None: if motor_names is None:
motor_names = self.motor_names motor_names = self.motor_names
@@ -876,6 +901,11 @@ class DynamixelMotorsBus:
return values return values
def _write_with_motor_ids(self, motor_models, motor_ids, data_name, values): def _write_with_motor_ids(self, motor_models, motor_ids, data_name, values):
if self.mock:
from tests.mock_dynamixel import GroupSyncWrite
else:
from dynamixel_sdk import GroupSyncWrite
if not isinstance(motor_ids, list): if not isinstance(motor_ids, list):
motor_ids = [motor_ids] motor_ids = [motor_ids]
if not isinstance(values, list): if not isinstance(values, list):
@@ -885,7 +915,7 @@ class DynamixelMotorsBus:
addr, bytes = self.model_ctrl_table[motor_models[0]][data_name] addr, bytes = self.model_ctrl_table[motor_models[0]][data_name]
group = GroupSyncWrite(self.port_handler, self.packet_handler, addr, bytes) group = GroupSyncWrite(self.port_handler, self.packet_handler, addr, bytes)
for idx, value in zip(motor_ids, values, strict=True): for idx, value in zip(motor_ids, values, strict=True):
data = convert_to_bytes(value, bytes) data = convert_to_bytes(value, bytes, self.mock)
group.addParam(idx, data) group.addParam(idx, data)
comm = group.txPacket() comm = group.txPacket()
@@ -903,6 +933,11 @@ class DynamixelMotorsBus:
start_time = time.perf_counter() start_time = time.perf_counter()
if self.mock:
from tests.mock_dynamixel import GroupSyncWrite
else:
from dynamixel_sdk import GroupSyncWrite
if motor_names is None: if motor_names is None:
motor_names = self.motor_names motor_names = self.motor_names
@@ -937,7 +972,7 @@ class DynamixelMotorsBus:
) )
for idx, value in zip(motor_ids, values, strict=True): for idx, value in zip(motor_ids, values, strict=True):
data = convert_to_bytes(value, bytes) data = convert_to_bytes(value, bytes, self.mock)
if init_group: if init_group:
self.group_writers[group_key].addParam(idx, data) self.group_writers[group_key].addParam(idx, data)
else: else:

View File

@@ -62,8 +62,8 @@ def is_camera_available(camera_type):
del camera del camera
return True return True
except Exception: except Exception:
traceback.print_exc()
print(f"\nA {camera_type} camera is not available.") print(f"\nA {camera_type} camera is not available.")
traceback.print_exc()
return False return False
@@ -83,3 +83,12 @@ def is_motor_available(motor_type):
traceback.print_exc() traceback.print_exc()
print(f"\nA {motor_type} motor is not available.") print(f"\nA {motor_type} motor is not available.")
return False return False
@pytest.fixture
def patch_builtins_input(monkeypatch):
def print_text(text=None):
if text is not None:
print(text)
monkeypatch.setattr("builtins.input", print_text)

View File

@@ -5,19 +5,20 @@ Warning: These mocked versions are minimalist. They do not exactly mock every be
from the original classes and functions (e.g. return types might be None instead of boolean). from the original classes and functions (e.g. return types might be None instead of boolean).
""" """
from dynamixel_sdk import COMM_SUCCESS # from dynamixel_sdk import COMM_SUCCESS
DEFAULT_BAUDRATE = 9_600 DEFAULT_BAUDRATE = 9_600
COMM_SUCCESS = 0 # tx or rx packet communication success
def mock_convert_to_bytes(value, bytes): def convert_to_bytes(value, bytes):
# TODO(rcadene): remove need to mock `convert_to_bytes` by implemented the inverse transform # TODO(rcadene): remove need to mock `convert_to_bytes` by implemented the inverse transform
# `convert_bytes_to_value` # `convert_bytes_to_value`
del bytes # unused del bytes # unused
return value return value
class MockPortHandler: class PortHandler:
def __init__(self, port): def __init__(self, port):
self.port = port self.port = port
# factory default baudrate # factory default baudrate
@@ -39,14 +40,14 @@ class MockPortHandler:
self.baudrate = baudrate self.baudrate = baudrate
class MockPacketHandler: class PacketHandler:
def __init__(self, protocol_version): def __init__(self, protocol_version):
del protocol_version # unused del protocol_version # unused
# Use packet_handler.data to communicate across Read and Write # Use packet_handler.data to communicate across Read and Write
self.data = {} self.data = {}
class MockGroupSyncRead: class GroupSyncRead:
def __init__(self, port_handler, packet_handler, address, bytes): def __init__(self, port_handler, packet_handler, address, bytes):
self.packet_handler = packet_handler self.packet_handler = packet_handler
@@ -71,7 +72,7 @@ class MockGroupSyncRead:
return self.packet_handler.data[index][address] return self.packet_handler.data[index][address]
class MockGroupSyncWrite: class GroupSyncWrite:
def __init__(self, port_handler, packet_handler, address, bytes): def __init__(self, port_handler, packet_handler, address, bytes):
self.packet_handler = packet_handler self.packet_handler = packet_handler
self.address = address self.address = address

View File

@@ -3,33 +3,33 @@ import enum
import numpy as np import numpy as np
class MockStream(enum.Enum): class RSStream(enum.Enum):
color = 0 color = 0
depth = 1 depth = 1
class MockFormat(enum.Enum): class RSFormat(enum.Enum):
rgb8 = 0 rgb8 = 0
z16 = 1 z16 = 1
class MockConfig: class RSConfig:
def enable_device(self, device_id: str): def enable_device(self, device_id: str):
self.device_enabled = device_id self.device_enabled = device_id
def enable_stream( def enable_stream(
self, stream_type: MockStream, width=None, height=None, color_format: MockFormat = None, fps=None self, stream_type: RSStream, width=None, height=None, color_format: RSFormat = None, fps=None
): ):
self.stream_type = stream_type self.stream_type = stream_type
# Overwrite default values when possible # Overwrite default values when possible
self.width = 848 if width is None else width self.width = 848 if width is None else width
self.height = 480 if height is None else height self.height = 480 if height is None else height
self.color_format = MockFormat.rgb8 if color_format is None else color_format self.color_format = RSFormat.rgb8 if color_format is None else color_format
self.fps = 30 if fps is None else fps self.fps = 30 if fps is None else fps
class MockColorProfile: class RSColorProfile:
def __init__(self, config: MockConfig): def __init__(self, config: RSConfig):
self.config = config self.config = config
def fps(self): def fps(self):
@@ -42,32 +42,32 @@ class MockColorProfile:
return self.config.height return self.config.height
class MockColorStream: class RSColorStream:
def __init__(self, config: MockConfig): def __init__(self, config: RSConfig):
self.config = config self.config = config
def as_video_stream_profile(self): def as_video_stream_profile(self):
return MockColorProfile(self.config) return RSColorProfile(self.config)
class MockProfile: class RSProfile:
def __init__(self, config: MockConfig): def __init__(self, config: RSConfig):
self.config = config self.config = config
def get_stream(self, color_format: MockFormat): def get_stream(self, color_format: RSFormat):
del color_format # unused del color_format # unused
return MockColorStream(self.config) return RSColorStream(self.config)
class MockPipeline: class RSPipeline:
def __init__(self): def __init__(self):
self.started = False self.started = False
self.config = None self.config = None
def start(self, config: MockConfig): def start(self, config: RSConfig):
self.started = True self.started = True
self.config = config self.config = config
return MockProfile(self.config) return RSProfile(self.config)
def stop(self): def stop(self):
if not self.started: if not self.started:
@@ -77,22 +77,22 @@ class MockPipeline:
def wait_for_frames(self, timeout_ms=50000): def wait_for_frames(self, timeout_ms=50000):
del timeout_ms # unused del timeout_ms # unused
return MockFrames(self.config) return RSFrames(self.config)
class MockFrames: class RSFrames:
def __init__(self, config: MockConfig): def __init__(self, config: RSConfig):
self.config = config self.config = config
def get_color_frame(self): def get_color_frame(self):
return MockColorFrame(self.config) return RSColorFrame(self.config)
def get_depth_frame(self): def get_depth_frame(self):
return MockDepthFrame(self.config) return RSDepthFrame(self.config)
class MockColorFrame: class RSColorFrame:
def __init__(self, config: MockConfig): def __init__(self, config: RSConfig):
self.config = config self.config = config
def get_data(self): def get_data(self):
@@ -102,15 +102,15 @@ class MockColorFrame:
return data return data
class MockDepthFrame: class RSDepthFrame:
def __init__(self, config: MockConfig): def __init__(self, config: RSConfig):
self.config = config self.config = config
def get_data(self): def get_data(self):
return np.ones((self.config.height, self.config.width), dtype=np.uint16) return np.ones((self.config.height, self.config.width), dtype=np.uint16)
class MockDevice: class RSDevice:
def __init__(self): def __init__(self):
pass pass
@@ -120,9 +120,15 @@ class MockDevice:
return "123456789" return "123456789"
class MockContext: class RSContext:
def __init__(self): def __init__(self):
pass pass
def query_devices(self): def query_devices(self):
return [MockDevice()] return [RSDevice()]
class RSCameraInfo:
def __init__(self, serial_number):
del serial_number
pass

View File

@@ -1,20 +1,31 @@
from functools import cache from functools import cache
import cv2
import numpy as np import numpy as np
CAP_PROP_FPS = 5
CAP_PROP_FRAME_WIDTH = 3
CAP_PROP_FRAME_HEIGHT = 4
COLOR_BGR2RGB = 4
@cache @cache
def _generate_image(width: int, height: int): def _generate_image(width: int, height: int):
return np.random.randint(0, 256, size=(height, width, 3), dtype=np.uint8) return np.random.randint(0, 256, size=(height, width, 3), dtype=np.uint8)
class MockVideoCapture: def cvtColor(color_image, color_convertion): # noqa: N802
if color_convertion == COLOR_BGR2RGB:
return color_image[:, :, [2, 1, 0]]
else:
raise NotImplementedError(color_convertion)
class VideoCapture:
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
self._mock_dict = { self._mock_dict = {
cv2.CAP_PROP_FPS: 30, CAP_PROP_FPS: 30,
cv2.CAP_PROP_FRAME_WIDTH: 640, CAP_PROP_FRAME_WIDTH: 640,
cv2.CAP_PROP_FRAME_HEIGHT: 480, CAP_PROP_FRAME_HEIGHT: 480,
} }
self._is_opened = True self._is_opened = True
@@ -32,17 +43,17 @@ class MockVideoCapture:
raise RuntimeError("Camera is not opened") raise RuntimeError("Camera is not opened")
value = self._mock_dict[propId] value = self._mock_dict[propId]
if value == 0: if value == 0:
if propId == cv2.CAP_PROP_FRAME_HEIGHT: if propId == CAP_PROP_FRAME_HEIGHT:
value = 480 value = 480
elif propId == cv2.CAP_PROP_FRAME_WIDTH: elif propId == CAP_PROP_FRAME_WIDTH:
value = 640 value = 640
return value return value
def read(self): def read(self):
if not self._is_opened: if not self._is_opened:
raise RuntimeError("Camera is not opened") raise RuntimeError("Camera is not opened")
h = self.get(cv2.CAP_PROP_FRAME_HEIGHT) h = self.get(CAP_PROP_FRAME_HEIGHT)
w = self.get(cv2.CAP_PROP_FRAME_WIDTH) w = self.get(CAP_PROP_FRAME_WIDTH)
ret = True ret = True
return ret, _generate_image(width=w, height=h) return ret, _generate_image(width=w, height=h)

View File

@@ -24,9 +24,8 @@ pytest -sx 'tests/test_cameras.py::test_camera[intelrealsense-True]'
import numpy as np import numpy as np
import pytest import pytest
from lerobot import available_cameras
from lerobot.common.robot_devices.utils import RobotDeviceAlreadyConnectedError, RobotDeviceNotConnectedError from lerobot.common.robot_devices.utils import RobotDeviceAlreadyConnectedError, RobotDeviceNotConnectedError
from tests.utils import make_camera, require_camera, require_mock_camera from tests.utils import TEST_CAMERA_TYPES, make_camera, require_camera
# Maximum absolute difference between two consecutive images recored by a camera. # Maximum absolute difference between two consecutive images recored by a camera.
# This value differs with respect to the camera. # This value differs with respect to the camera.
@@ -37,7 +36,9 @@ def compute_max_pixel_difference(first_image, second_image):
return np.abs(first_image.astype(float) - second_image.astype(float)).max() return np.abs(first_image.astype(float) - second_image.astype(float)).max()
def _test_camera(camera_type): @pytest.mark.parametrize("camera_type, mock", TEST_CAMERA_TYPES)
@require_camera
def test_camera(request, camera_type, mock):
"""Test assumes that `camera.read()` returns the same image when called multiple times in a row. """Test assumes that `camera.read()` returns the same image when called multiple times in a row.
So the environment should not change (you shouldnt be in front of the camera) and the camera should not be moving. So the environment should not change (you shouldnt be in front of the camera) and the camera should not be moving.
@@ -47,8 +48,11 @@ def _test_camera(camera_type):
# TODO(rcadene): measure fps in nightly? # TODO(rcadene): measure fps in nightly?
# TODO(rcadene): test logs # TODO(rcadene): test logs
if camera_type == "opencv" and not mock:
pytest.skip("TODO(rcadene): fix test for opencv physical camera")
# Test instantiating # Test instantiating
camera = make_camera(camera_type) camera = make_camera(camera_type, mock=mock)
# Test reading, async reading, disconnecting before connecting raises an error # Test reading, async reading, disconnecting before connecting raises an error
with pytest.raises(RobotDeviceNotConnectedError): with pytest.raises(RobotDeviceNotConnectedError):
@@ -62,7 +66,7 @@ def _test_camera(camera_type):
del camera del camera
# Test connecting # Test connecting
camera = make_camera(camera_type) camera = make_camera(camera_type, mock=mock)
camera.connect() camera.connect()
assert camera.is_connected assert camera.is_connected
assert camera.fps is not None assert camera.fps is not None
@@ -102,12 +106,12 @@ def _test_camera(camera_type):
assert camera.thread is None assert camera.thread is None
# Test disconnecting with `__del__` # Test disconnecting with `__del__`
camera = make_camera(camera_type) camera = make_camera(camera_type, mock=mock)
camera.connect() camera.connect()
del camera del camera
# Test acquiring a bgr image # Test acquiring a bgr image
camera = make_camera(camera_type, color_mode="bgr") camera = make_camera(camera_type, color_mode="bgr", mock=mock)
camera.connect() camera.connect()
assert camera.color_mode == "bgr" assert camera.color_mode == "bgr"
bgr_color_image = camera.read() bgr_color_image = camera.read()
@@ -120,7 +124,7 @@ def _test_camera(camera_type):
# TODO(rcadene): Add a test for a camera that supports fps=60 # TODO(rcadene): Add a test for a camera that supports fps=60
# Test width and height can be set # Test width and height can be set
camera = make_camera(camera_type, fps=30, width=1280, height=720) camera = make_camera(camera_type, fps=30, width=1280, height=720, mock=mock)
camera.connect() camera.connect()
assert camera.fps == 30 assert camera.fps == 30
assert camera.width == 1280 assert camera.width == 1280
@@ -133,41 +137,19 @@ def _test_camera(camera_type):
del camera del camera
# Test not supported width and height raise an error # Test not supported width and height raise an error
camera = make_camera(camera_type, fps=30, width=0, height=0) camera = make_camera(camera_type, fps=30, width=0, height=0, mock=mock)
with pytest.raises(OSError): with pytest.raises(OSError):
camera.connect() camera.connect()
del camera del camera
def _test_save_images_from_cameras(tmpdir, camera_type): @pytest.mark.parametrize("camera_type, mock", TEST_CAMERA_TYPES)
@require_camera
def test_save_images_from_cameras(tmpdir, request, camera_type, mock):
# TODO(rcadene): refactor # TODO(rcadene): refactor
if camera_type == "opencv": if camera_type == "opencv":
from lerobot.common.robot_devices.cameras.opencv import save_images_from_cameras from lerobot.common.robot_devices.cameras.opencv import save_images_from_cameras
elif camera_type == "intelrealsense": elif camera_type == "intelrealsense":
from lerobot.common.robot_devices.cameras.intelrealsense import save_images_from_cameras from lerobot.common.robot_devices.cameras.intelrealsense import save_images_from_cameras
save_images_from_cameras(tmpdir, record_time_s=1) save_images_from_cameras(tmpdir, record_time_s=1, mock=mock)
@pytest.mark.parametrize("camera_type", available_cameras)
@require_mock_camera
def test_camera_mock(monkeypatch, camera_type):
_test_camera(camera_type)
@pytest.mark.parametrize("camera_type", available_cameras)
@require_camera
def test_camera(request, camera_type):
_test_camera(camera_type)
@pytest.mark.parametrize("camera_type", available_cameras)
@require_mock_camera
def test_save_images_from_cameras_mock(tmpdir, monkeypatch, camera_type):
_test_save_images_from_cameras(tmpdir, camera_type)
@pytest.mark.parametrize("camera_type", available_cameras)
@require_camera
def test_save_images_from_cameras(tmpdir, request, camera_type):
_test_save_images_from_cameras(tmpdir, camera_type)

View File

@@ -27,33 +27,44 @@ from pathlib import Path
import pytest import pytest
from lerobot import available_robots
from lerobot.common.policies.factory import make_policy from lerobot.common.policies.factory import make_policy
from lerobot.common.utils.utils import init_hydra_config from lerobot.common.utils.utils import init_hydra_config
from lerobot.scripts.control_robot import calibrate, get_available_arms, record, replay, teleoperate from lerobot.scripts.control_robot import calibrate, get_available_arms, record, replay, teleoperate
from tests.test_robots import make_robot from tests.test_robots import make_robot
from tests.utils import DEFAULT_CONFIG_PATH, DEVICE, require_mock_robot, require_robot from tests.utils import DEFAULT_CONFIG_PATH, DEVICE, TEST_ROBOT_TYPES, require_robot
def _test_teleoperate(robot_type): @pytest.mark.parametrize("robot_type, mock", TEST_ROBOT_TYPES)
robot = make_robot(robot_type) @require_robot
def test_teleoperate(request, robot_type, mock):
robot = make_robot(robot_type, mock=mock)
teleoperate(robot, teleop_time_s=1) teleoperate(robot, teleop_time_s=1)
teleoperate(robot, fps=30, teleop_time_s=1) teleoperate(robot, fps=30, teleop_time_s=1)
teleoperate(robot, fps=60, teleop_time_s=1) teleoperate(robot, fps=60, teleop_time_s=1)
del robot del robot
def _test_calibrate(robot_type): @pytest.mark.parametrize("robot_type, mock", TEST_ROBOT_TYPES)
robot = make_robot(robot_type) @require_robot
def test_calibrate(request, robot_type, mock):
if mock:
request.getfixturevalue("patch_builtins_input")
robot = make_robot(robot_type, mock=mock)
calibrate(robot, arms=get_available_arms(robot)) calibrate(robot, arms=get_available_arms(robot))
del robot del robot
def _test_record_without_cameras(tmpdir, robot_type): @pytest.mark.parametrize("robot_type, mock", TEST_ROBOT_TYPES)
@require_robot
def test_record_without_cameras(tmpdir, request, robot_type, mock):
if mock:
request.getfixturevalue("patch_builtins_input")
root = Path(tmpdir) root = Path(tmpdir)
repo_id = "lerobot/debug" repo_id = "lerobot/debug"
robot = make_robot(robot_type, overrides=["~cameras"]) robot = make_robot(robot_type, overrides=["~cameras"], mock=mock)
record( record(
robot, robot,
fps=30, fps=30,
@@ -68,14 +79,19 @@ def _test_record_without_cameras(tmpdir, robot_type):
) )
def _test_record_and_replay_and_policy(tmpdir, robot_type): @pytest.mark.parametrize("robot_type, mock", TEST_ROBOT_TYPES)
@require_robot
def test_record_and_replay_and_policy(tmpdir, request, robot_type, mock):
if mock:
request.getfixturevalue("patch_builtins_input")
env_name = "koch_real" env_name = "koch_real"
policy_name = "act_koch_real" policy_name = "act_koch_real"
root = Path(tmpdir) root = Path(tmpdir)
repo_id = "lerobot/debug" repo_id = "lerobot/debug"
robot = make_robot(robot_type) robot = make_robot(robot_type, mock=mock)
dataset = record( dataset = record(
robot, robot,
fps=30, fps=30,
@@ -115,51 +131,3 @@ def _test_record_and_replay_and_policy(tmpdir, robot_type):
) )
del robot del robot
@pytest.mark.parametrize("robot_type", available_robots)
@require_mock_robot
def test_teleoperate_mock(monkeypatch, robot_type):
_test_teleoperate(robot_type)
@pytest.mark.parametrize("robot_type", available_robots)
@require_robot
def test_teleoperate(request, robot_type):
_test_teleoperate(robot_type)
@pytest.mark.parametrize("robot_type", available_robots)
@require_mock_robot
def test_calibrate_mock(monkeypatch, robot_type):
_test_calibrate(robot_type)
@pytest.mark.parametrize("robot_type", available_robots)
@require_robot
def test_calibrate(request, robot_type):
_test_calibrate(robot_type)
@pytest.mark.parametrize("robot_type", available_robots)
@require_mock_robot
def test_record_without_cameras_mock(tmpdir, monkeypatch, robot_type):
_test_record_without_cameras(tmpdir, robot_type)
@pytest.mark.parametrize("robot_type", available_robots)
@require_robot
def test_record_without_cameras(tmpdir, request, robot_type):
_test_record_without_cameras(tmpdir, robot_type)
@pytest.mark.parametrize("robot_type", available_robots)
@require_mock_robot
def test_record_and_replay_and_policy_mock(tmpdir, monkeypatch, robot_type):
_test_record_and_replay_and_policy(tmpdir, robot_type)
@pytest.mark.parametrize("robot_type", available_robots)
@require_robot
def test_record_and_replay_and_policy(tmpdir, request, robot_type):
_test_record_and_replay_and_policy(tmpdir, robot_type)

View File

@@ -26,20 +26,35 @@ pytest -sx 'tests/test_motors.py::test_motors_bus[dynamixel-True]'
# TODO(rcadene): add compatibility with other motors bus # TODO(rcadene): add compatibility with other motors bus
import time import time
import traceback
import numpy as np import numpy as np
import pytest import pytest
from lerobot import available_motors from lerobot.common.robot_devices.motors.dynamixel import find_port
from lerobot.common.robot_devices.utils import RobotDeviceAlreadyConnectedError, RobotDeviceNotConnectedError from lerobot.common.robot_devices.utils import RobotDeviceAlreadyConnectedError, RobotDeviceNotConnectedError
from tests.utils import make_motors_bus, mock_builtins_input, require_motor from tests.utils import TEST_MOTOR_TYPES, make_motors_bus, require_motor
def _test_configure_motors_all_ids_1(motor_type): @pytest.mark.parametrize("motor_type, mock", TEST_MOTOR_TYPES)
@require_motor
def test_find_port_mock(request, motor_type, mock):
if mock:
request.getfixturevalue("patch_builtins_input")
with pytest.raises(OSError):
find_port()
else:
find_port()
@pytest.mark.parametrize("motor_type, mock", TEST_MOTOR_TYPES)
@require_motor
def test_configure_motors_all_ids_1_mock(request, motor_type, mock):
if mock:
request.getfixturevalue("patch_builtins_input")
input("Are you sure you want to re-configure the motors? Press enter to continue...") input("Are you sure you want to re-configure the motors? Press enter to continue...")
# This test expect the configuration was already correct. # This test expect the configuration was already correct.
motors_bus = make_motors_bus(motor_type) motors_bus = make_motors_bus(motor_type, mock=mock)
motors_bus.connect() motors_bus.connect()
motors_bus.write("Baud_Rate", [0] * len(motors_bus.motors)) motors_bus.write("Baud_Rate", [0] * len(motors_bus.motors))
motors_bus.set_bus_baudrate(9_600) motors_bus.set_bus_baudrate(9_600)
@@ -47,14 +62,19 @@ def _test_configure_motors_all_ids_1(motor_type):
del motors_bus del motors_bus
# Test configure # Test configure
motors_bus = make_motors_bus(motor_type) motors_bus = make_motors_bus(motor_type, mock=mock)
motors_bus.connect() motors_bus.connect()
assert motors_bus.are_motors_configured() assert motors_bus.are_motors_configured()
del motors_bus del motors_bus
def _test_motors_bus(motor_type): @pytest.mark.parametrize("motor_type, mock", TEST_MOTOR_TYPES)
motors_bus = make_motors_bus(motor_type) @require_motor
def test_motors_bus(request, motor_type, mock):
if mock:
request.getfixturevalue("patch_builtins_input")
motors_bus = make_motors_bus(motor_type, mock=mock)
# Test reading and writting before connecting raises an error # Test reading and writting before connecting raises an error
with pytest.raises(RobotDeviceNotConnectedError): with pytest.raises(RobotDeviceNotConnectedError):
@@ -68,7 +88,7 @@ def _test_motors_bus(motor_type):
del motors_bus del motors_bus
# Test connecting # Test connecting
motors_bus = make_motors_bus(motor_type) motors_bus = make_motors_bus(motor_type, mock=mock)
motors_bus.connect() motors_bus.connect()
# Test connecting twice raises an error # Test connecting twice raises an error
@@ -110,82 +130,3 @@ def _test_motors_bus(motor_type):
time.sleep(1) time.sleep(1)
new_values = motors_bus.read("Present_Position") new_values = motors_bus.read("Present_Position")
assert (new_values == values).all() assert (new_values == values).all()
@pytest.mark.parametrize("motor_type", available_motors)
def test_find_port_mock(monkeypatch, motor_type):
mock_motor(monkeypatch, motor_type)
from lerobot.common.robot_devices.motors.dynamixel import find_port
# To run find_port without user input
mock_builtins_input(monkeypatch)
with pytest.raises(OSError):
find_port()
@pytest.mark.parametrize("motor_type", available_motors)
@require_motor
def test_find_port(request, motor_type):
from lerobot.common.robot_devices.motors.dynamixel import find_port
find_port()
@pytest.mark.parametrize("motor_type", available_motors)
def test_configure_motors_all_ids_1_mock(monkeypatch, motor_type):
mock_motor(monkeypatch, motor_type)
_test_configure_motors_all_ids_1(motor_type)
@pytest.mark.parametrize("motor_type", available_motors)
@require_motor
def test_configure_motors_all_ids_1(request, motor_type):
_test_configure_motors_all_ids_1(motor_type)
@pytest.mark.parametrize("motor_type", available_motors)
def test_motors_bus_mock(monkeypatch, motor_type):
mock_motor(monkeypatch, motor_type)
_test_motors_bus(motor_type)
@pytest.mark.parametrize("motor_type", available_motors)
@require_motor
def test_motors_bus(request, motor_type):
_test_motors_bus(motor_type)
def mock_motor(monkeypatch, motor_type):
if motor_type not in available_motors:
raise ValueError(
f"The motor type '{motor_type}' is not valid. Expected one of these '{available_motors}"
)
if motor_type == "dynamixel":
try:
import dynamixel_sdk
from tests.mock_dynamixel import (
MockGroupSyncRead,
MockGroupSyncWrite,
MockPacketHandler,
MockPortHandler,
mock_convert_to_bytes,
)
monkeypatch.setattr(dynamixel_sdk, "GroupSyncRead", MockGroupSyncRead)
monkeypatch.setattr(dynamixel_sdk, "GroupSyncWrite", MockGroupSyncWrite)
monkeypatch.setattr(dynamixel_sdk, "PacketHandler", MockPacketHandler)
monkeypatch.setattr(dynamixel_sdk, "PortHandler", MockPortHandler)
# Import dynamixel AFTER mocking dynamixel_sdk to use mocked classes
from lerobot.common.robot_devices.motors import dynamixel
# TODO(rcadene): remove need to mock `convert_to_bytes` by implemented the inverse transform
# `convert_bytes_to_value`
monkeypatch.setattr(dynamixel, "convert_to_bytes", mock_convert_to_bytes)
except ImportError:
traceback.print_exc()
pytest.skip("To avoid skipping tests mocking dynamixel motors, run `pip install dynamixel-sdk`.")
else:
raise NotImplementedError("Implement mocking logic for new motor.")

View File

@@ -28,22 +28,26 @@ from pathlib import Path
import pytest import pytest
import torch import torch
from lerobot import available_robots from lerobot.common.robot_devices.robots.manipulator import ManipulatorRobot
from lerobot.common.robot_devices.utils import RobotDeviceAlreadyConnectedError, RobotDeviceNotConnectedError from lerobot.common.robot_devices.utils import RobotDeviceAlreadyConnectedError, RobotDeviceNotConnectedError
from tests.utils import make_robot, require_mock_robot, require_robot from tests.utils import TEST_ROBOT_TYPES, make_robot, require_robot
def _test_robot(tmpdir, robot_type, mock): @pytest.mark.parametrize("robot_type, mock", TEST_ROBOT_TYPES)
@require_robot
def test_robot(tmpdir, request, robot_type, mock):
# TODO(rcadene): measure fps in nightly? # TODO(rcadene): measure fps in nightly?
# TODO(rcadene): test logs # TODO(rcadene): test logs
# TODO(rcadene): add compatibility with other robots # TODO(rcadene): add compatibility with other robots
from lerobot.common.robot_devices.robots.manipulator import ManipulatorRobot
if robot_type == "aloha" and mock: if robot_type == "aloha" and mock:
# To simplify unit test, we do not rerun manual calibration for Aloha mock=True. # To simplify unit test, we do not rerun manual calibration for Aloha mock=True.
# Instead, we use the files from '.cache/calibration/aloha_default' # Instead, we use the files from '.cache/calibration/aloha_default'
overrides_calibration_dir = None overrides_calibration_dir = None
else: else:
if mock:
request.getfixturevalue("patch_builtins_input")
# Create an empty calibration directory to trigger manual calibration # Create an empty calibration directory to trigger manual calibration
tmpdir = Path(tmpdir) tmpdir = Path(tmpdir)
calibration_dir = tmpdir / robot_type calibration_dir = tmpdir / robot_type
@@ -72,7 +76,7 @@ def _test_robot(tmpdir, robot_type, mock):
del robot del robot
# Test connecting (triggers manual calibration) # Test connecting (triggers manual calibration)
robot = make_robot(robot_type, overrides=overrides_calibration_dir) robot = make_robot(robot_type, overrides=overrides_calibration_dir, mock=mock)
robot.connect() robot.connect()
assert robot.is_connected assert robot.is_connected
@@ -84,7 +88,7 @@ def _test_robot(tmpdir, robot_type, mock):
del robot del robot
# Test teleop can run # Test teleop can run
robot = make_robot(robot_type, overrides=overrides_calibration_dir) robot = make_robot(robot_type, overrides=overrides_calibration_dir, mock=mock)
if overrides_calibration_dir is not None: if overrides_calibration_dir is not None:
robot.calibration_dir = calibration_dir robot.calibration_dir = calibration_dir
robot.connect() robot.connect()
@@ -133,15 +137,3 @@ def _test_robot(tmpdir, robot_type, mock):
for name in robot.cameras: for name in robot.cameras:
assert not robot.cameras[name].is_connected assert not robot.cameras[name].is_connected
del robot del robot
@pytest.mark.parametrize("robot_type", available_robots)
@require_mock_robot
def test_robot_mock(tmpdir, monkeypatch, robot_type):
_test_robot(tmpdir, robot_type, mock=True)
@pytest.mark.parametrize("robot_type", available_robots)
@require_robot
def test_robot(tmpdir, request, robot_type):
_test_robot(tmpdir, robot_type, mock=False)

View File

@@ -15,7 +15,7 @@
# limitations under the License. # limitations under the License.
import os import os
import platform import platform
import traceback from copy import copy
from functools import wraps from functools import wraps
import pytest import pytest
@@ -207,14 +207,17 @@ def require_robot(func):
# Access the pytest request context to get the mockeypatch fixture # Access the pytest request context to get the mockeypatch fixture
request = kwargs.get("request") request = kwargs.get("request")
robot_type = kwargs.get("robot_type") robot_type = kwargs.get("robot_type")
mock = kwargs.get("mock")
if robot_type is None: if robot_type is None:
raise ValueError("The 'robot_type' must be an argument of the test function.") raise ValueError("The 'robot_type' must be an argument of the test function.")
if request is None: if request is None:
raise ValueError("The 'request' fixture must be an argument of the test function.") raise ValueError("The 'request' fixture must be an argument of the test function.")
if mock is None:
raise ValueError("The 'mock' variable must be an argument of the test function.")
# Run test with a real robot. Skip test if robot connection fails. # Run test with a real robot. Skip test if robot connection fails.
if not request.getfixturevalue("is_robot_available"): if not mock and not request.getfixturevalue("is_robot_available"):
pytest.skip(f"A {robot_type} robot is not available.") pytest.skip(f"A {robot_type} robot is not available.")
return func(*args, **kwargs) return func(*args, **kwargs)
@@ -227,13 +230,16 @@ def require_camera(func):
def wrapper(*args, **kwargs): def wrapper(*args, **kwargs):
request = kwargs.get("request") request = kwargs.get("request")
camera_type = kwargs.get("camera_type") camera_type = kwargs.get("camera_type")
mock = kwargs.get("mock")
if request is None: if request is None:
raise ValueError("The 'request' fixture must be an argument of the test function.") raise ValueError("The 'request' fixture must be an argument of the test function.")
if camera_type is None: if camera_type is None:
raise ValueError("The 'camera_type' must be an argument of the test function.") raise ValueError("The 'camera_type' must be an argument of the test function.")
if mock is None:
raise ValueError("The 'mock' variable must be an argument of the test function.")
if not request.getfixturevalue("is_camera_available"): if not mock and not request.getfixturevalue("is_camera_available"):
pytest.skip(f"A {camera_type} camera is not available.") pytest.skip(f"A {camera_type} camera is not available.")
return func(*args, **kwargs) return func(*args, **kwargs)
@@ -247,13 +253,16 @@ def require_motor(func):
# Access the pytest request context to get the mockeypatch fixture # Access the pytest request context to get the mockeypatch fixture
request = kwargs.get("request") request = kwargs.get("request")
motor_type = kwargs.get("motor_type") motor_type = kwargs.get("motor_type")
mock = kwargs.get("mock")
if request is None: if request is None:
raise ValueError("The 'request' fixture must be an argument of the test function.") raise ValueError("The 'request' fixture must be an argument of the test function.")
if motor_type is None: if motor_type is None:
raise ValueError("The 'motor_type' must be an argument of the test function.") raise ValueError("The 'motor_type' must be an argument of the test function.")
if mock is None:
raise ValueError("The 'mock' variable must be an argument of the test function.")
if not request.getfixturevalue("is_motor_available"): if not mock and not request.getfixturevalue("is_motor_available"):
pytest.skip(f"A {motor_type} motor is not available.") pytest.skip(f"A {motor_type} motor is not available.")
return func(*args, **kwargs) return func(*args, **kwargs)
@@ -261,134 +270,37 @@ def require_motor(func):
return wrapper return wrapper
def require_mock_robot(func): def make_robot(robot_type: str, overrides: list[str] | None = None, mock=False) -> Robot:
""" if mock:
Decorator over test function to mock the robot overrides = [] if overrides is None else copy(overrides)
The decorated function must have two arguments `monkeypatch` and `robot_type`. if robot_type == "koch":
overrides.append("+leader_arms.main.mock=true")
overrides.append("+follower_arms.main.mock=true")
overrides.append("+cameras.laptop.mock=true")
overrides.append("+cameras.phone.mock=true")
Example of usage: elif robot_type == "koch_bimanual":
```python overrides.append("+leader_arms.left.mock=true")
@pytest.mark.parametrize( overrides.append("+leader_arms.right.mock=true")
"robot_type", ["koch", "aloha"] overrides.append("+follower_arms.left.mock=true")
) overrides.append("+follower_arms.right.mock=true")
@require_robot overrides.append("+cameras.laptop.mock=true")
def test_require_robot(request, robot_type): overrides.append("+cameras.phone.mock=true")
pass
```
"""
@wraps(func) elif robot_type == "aloha":
def wrapper(*args, **kwargs): overrides.append("+leader_arms.left.mock=true")
# Access the pytest request context to get the mockeypatch fixture overrides.append("+leader_arms.right.mock=true")
monkeypatch = kwargs.get("monkeypatch") overrides.append("+follower_arms.left.mock=true")
robot_type = kwargs.get("robot_type") overrides.append("+follower_arms.right.mock=true")
overrides.append("+cameras.cam_high.mock=true")
overrides.append("+cameras.cam_low.mock=true")
overrides.append("+cameras.cam_left_wrist.mock=true")
overrides.append("+cameras.cam_right_wrist.mock=true")
if monkeypatch is None: else:
raise ValueError("The 'monkeypatch' fixture must be an argument of the test function.") raise NotImplementedError(robot_type)
if robot_type is None:
raise ValueError("The 'robot_type' must be an argument of the test function.")
mock_robot(monkeypatch, robot_type)
return func(*args, **kwargs)
return wrapper
def require_mock_camera(func):
@wraps(func)
def wrapper(*args, **kwargs):
# Access the pytest request context to get the mockeypatch fixture
monkeypatch = kwargs.get("monkeypatch")
camera_type = kwargs.get("camera_type")
if monkeypatch is None:
raise ValueError("The 'monkeypatch' fixture must be an argument of the test function.")
if camera_type is None:
raise ValueError("The 'camera_type' must be an argument of the test function.")
mock_camera(monkeypatch, camera_type)
return func(*args, **kwargs)
return wrapper
def mock_motor(**kwargs):
pass
def mock_robot(monkeypatch, robot_type):
if robot_type not in available_robots:
raise ValueError(
f"The camera type '{robot_type}' is not valid. Expected one of these '{available_robots}"
)
if robot_type in ["koch", "koch_bimanual"]:
mock_camera(monkeypatch, "opencv")
mock_motor(monkeypatch, "dynamixel")
elif robot_type == "aloha":
mock_camera(monkeypatch, "intelrealsense")
mock_motor(monkeypatch, "dynamixel")
else:
raise NotImplementedError("Implement mocking logic for new robot.")
# To run calibration without user input
mock_builtins_input(monkeypatch)
def mock_camera(monkeypatch, camera_type):
if camera_type not in available_cameras:
raise ValueError(
f"The motor type '{camera_type}' is not valid. Expected one of these '{available_cameras}"
)
if camera_type == "opencv":
try:
import cv2
from tests.mock_opencv import MockVideoCapture
monkeypatch.setattr(cv2, "VideoCapture", MockVideoCapture)
except ImportError:
traceback.print_exc()
pytest.skip("To avoid skipping tests mocking opencv cameras, run `pip install opencv-python`.")
elif camera_type == "intelrealsense":
try:
import pyrealsense2 as rs
from tests.mock_intelrealsense import (
MockConfig,
MockContext,
MockFormat,
MockPipeline,
MockStream,
)
monkeypatch.setattr(rs, "config", MockConfig)
monkeypatch.setattr(rs, "pipeline", MockPipeline)
monkeypatch.setattr(rs, "stream", MockStream)
monkeypatch.setattr(rs, "format", MockFormat)
monkeypatch.setattr(rs, "context", MockContext)
except ImportError:
traceback.print_exc()
pytest.skip(
"To avoid skipping tests mocking intelrealsense cameras, run `pip install pyrealsense2`."
)
else:
raise NotImplementedError("Implement mocking logic for new camera.")
def mock_builtins_input(monkeypatch):
def print_text(text=None):
if text is not None:
print(text)
monkeypatch.setattr("builtins.input", print_text)
def make_robot(robot_type: str, overrides: list[str] | None = None) -> Robot:
config_path = ROBOT_CONFIG_PATH_TEMPLATE.format(robot=robot_type) config_path = ROBOT_CONFIG_PATH_TEMPLATE.format(robot=robot_type)
robot_cfg = init_hydra_config(config_path, overrides) robot_cfg = init_hydra_config(config_path, overrides)
robot = make_robot_from_cfg(robot_cfg) robot = make_robot_from_cfg(robot_cfg)