Files
lerobot/tests/cameras/test_reachy2_camera.py
Gaëlle Lannuzel 6a3d57031a 2 add reachy 2 to updated lerobot (#1767)
* Start adding Reachy 2 (no camera)

* Fix joint shape

* Remove print

* Modify observation_features

* Fix observation state

* Try adding a fake Reachy teleoperator

* Saving test scripts

* Add reachy2camera to cameras

* Add teleop_left camera to observation

* Create test_reachy2_camera.py

* Update utils.py

* Add all rgb cameras

* Future depth work

* Try adding mobile_base velocity

* Update tests

* Update data_acquisition_server.py

* Update with use_external_commands

* Replay

* Usable with or without mobile base

* No need for new isntance

* Use same ip for cameras

* Remove useless imports

* Add resume

* Divide joints in multiple dicts

* Divide joinits into several dicts in teleoperator

* Fix forgotten method call

* Create test_robot_client.py

* Open gripper on start

* Add arguments for cameras

* Modify get_frame() requested size

* Call generate_joints_dict on _init_

* black + isort

* Add reachy2 in imports

* Add reachy2 dependencies

* Add documentation

* Update reachy2.mdx

* Update reachy2.mdx

* Clean files and add types

* Fix type in send_action

* Remove print

* Delete test files

* Clean code

* Update cameras

* Disconnect from camera

* Run pre-commit hooks

* Update pyproject.toml

* Create test_reachy2.py

* Fix generate_joints

* Update test_reachy2.py

* Update send_action test

* Update reachy2_cameras depth + CameraManager

* Update reachy2_camera tests

* Remove useless import and args

* Rename reachy2_teleoperator

* Create test_reachy2_teleoperator.py

* Fix remainging fake_teleoperator

* Remove useless elements

* Mock cameras in test_reachy2

* Delete commented lines

* Add use_present_position to teleoperator

* Add cameras tests

* Add check no part + test

* Use disable_torque_on_disconnect

* Use odometry for vel with present_position

* Update documentation

* Fix vel value type

* Use ensure_safe_goal_position

* Import joints dict from classes

* Update reachy2.mdx

* Update reachy2.mdx

* Update minimal version

* Update minimal version

* fix(tests) fixes for reachy2 tests; removing reachy2 references from the script

* Add reachy2_sdk fake as plugins

---------

Co-authored-by: Michel Aractingi <michel.aractingi@huggingface.co>
2025-09-05 11:03:14 +02:00

178 lines
4.6 KiB
Python

#!/usr/bin/env python
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import time
from unittest.mock import MagicMock, patch
import numpy as np
import pytest
from lerobot.cameras.reachy2_camera import Reachy2Camera, Reachy2CameraConfig
from lerobot.errors import DeviceNotConnectedError
PARAMS = [
("teleop", "left"),
("teleop", "right"),
("depth", "rgb"),
# ("depth", "depth"), # Depth camera is not available yet
]
def _make_cam_manager_mock():
c = MagicMock(name="CameraManagerMock")
teleop = MagicMock(name="TeleopCam")
teleop.width = 640
teleop.height = 480
teleop.get_frame = MagicMock(
side_effect=lambda *_, **__: (
np.zeros((480, 640, 3), dtype=np.uint8),
time.time(),
)
)
depth = MagicMock(name="DepthCam")
depth.width = 640
depth.height = 480
depth.get_frame = MagicMock(
side_effect=lambda *_, **__: (
np.zeros((480, 640, 3), dtype=np.uint8),
time.time(),
)
)
c.is_connected.return_value = True
c.teleop = teleop
c.depth = depth
def _connect():
c.teleop = teleop
c.depth = depth
c.is_connected.return_value = True
def _disconnect():
c.teleop = None
c.depth = None
c.is_connected.return_value = False
c.connect = MagicMock(side_effect=_connect)
c.disconnect = MagicMock(side_effect=_disconnect)
# Mock methods
c.initialize_cameras = MagicMock()
return c
@pytest.fixture(
params=PARAMS,
# ids=["teleop-left", "teleop-right", "torso-rgb", "torso-depth"],
ids=["teleop-left", "teleop-right", "torso-rgb"],
)
def camera(request):
name, image_type = request.param
with (
patch(
"lerobot.cameras.reachy2_camera.reachy2_camera.CameraManager",
side_effect=lambda *a, **k: _make_cam_manager_mock(),
),
):
config = Reachy2CameraConfig(name=name, image_type=image_type)
cam = Reachy2Camera(config)
yield cam
if cam.is_connected:
cam.disconnect()
def test_connect(camera):
camera.connect()
assert camera.is_connected
camera.cam_manager.initialize_cameras.assert_called_once()
def test_read(camera):
camera.connect()
img = camera.read()
if camera.config.name == "teleop":
camera.cam_manager.teleop.get_frame.assert_called_once()
elif camera.config.name == "depth":
camera.cam_manager.depth.get_frame.assert_called_once()
assert isinstance(img, np.ndarray)
assert img.shape == (480, 640, 3)
def test_disconnect(camera):
camera.connect()
camera.disconnect()
assert not camera.is_connected
def test_async_read(camera):
camera.connect()
try:
img = camera.async_read()
assert camera.thread is not None
assert camera.thread.is_alive()
assert isinstance(img, np.ndarray)
finally:
if camera.is_connected:
camera.disconnect()
def test_async_read_timeout(camera):
camera.connect()
try:
with pytest.raises(TimeoutError):
camera.async_read(timeout_ms=0)
finally:
if camera.is_connected:
camera.disconnect()
def test_read_before_connect(camera):
with pytest.raises(DeviceNotConnectedError):
_ = camera.read()
def test_disconnect_before_connect(camera):
with pytest.raises(DeviceNotConnectedError):
camera.disconnect()
def test_async_read_before_connect(camera):
with pytest.raises(DeviceNotConnectedError):
_ = camera.async_read()
def test_wrong_camera_name():
with pytest.raises(ValueError):
_ = Reachy2CameraConfig(name="wrong-name", image_type="left")
def test_wrong_image_type():
with pytest.raises(ValueError):
_ = Reachy2CameraConfig(name="teleop", image_type="rgb")
with pytest.raises(ValueError):
_ = Reachy2CameraConfig(name="depth", image_type="left")
def test_wrong_color_mode():
with pytest.raises(ValueError):
_ = Reachy2CameraConfig(name="teleop", image_type="left", color_mode="wrong-color")