2 add reachy 2 to updated lerobot (#1767)

* Start adding Reachy 2 (no camera)

* Fix joint shape

* Remove print

* Modify observation_features

* Fix observation state

* Try adding a fake Reachy teleoperator

* Saving test scripts

* Add reachy2camera to cameras

* Add teleop_left camera to observation

* Create test_reachy2_camera.py

* Update utils.py

* Add all rgb cameras

* Future depth work

* Try adding mobile_base velocity

* Update tests

* Update data_acquisition_server.py

* Update with use_external_commands

* Replay

* Usable with or without mobile base

* No need for new isntance

* Use same ip for cameras

* Remove useless imports

* Add resume

* Divide joints in multiple dicts

* Divide joinits into several dicts in teleoperator

* Fix forgotten method call

* Create test_robot_client.py

* Open gripper on start

* Add arguments for cameras

* Modify get_frame() requested size

* Call generate_joints_dict on _init_

* black + isort

* Add reachy2 in imports

* Add reachy2 dependencies

* Add documentation

* Update reachy2.mdx

* Update reachy2.mdx

* Clean files and add types

* Fix type in send_action

* Remove print

* Delete test files

* Clean code

* Update cameras

* Disconnect from camera

* Run pre-commit hooks

* Update pyproject.toml

* Create test_reachy2.py

* Fix generate_joints

* Update test_reachy2.py

* Update send_action test

* Update reachy2_cameras depth + CameraManager

* Update reachy2_camera tests

* Remove useless import and args

* Rename reachy2_teleoperator

* Create test_reachy2_teleoperator.py

* Fix remainging fake_teleoperator

* Remove useless elements

* Mock cameras in test_reachy2

* Delete commented lines

* Add use_present_position to teleoperator

* Add cameras tests

* Add check no part + test

* Use disable_torque_on_disconnect

* Use odometry for vel with present_position

* Update documentation

* Fix vel value type

* Use ensure_safe_goal_position

* Import joints dict from classes

* Update reachy2.mdx

* Update reachy2.mdx

* Update minimal version

* Update minimal version

* fix(tests) fixes for reachy2 tests; removing reachy2 references from the script

* Add reachy2_sdk fake as plugins

---------

Co-authored-by: Michel Aractingi <michel.aractingi@huggingface.co>
This commit is contained in:
Gaëlle Lannuzel
2025-09-05 11:03:14 +02:00
committed by GitHub
parent d74494d92b
commit 6a3d57031a
22 changed files with 1984 additions and 3 deletions

View File

@@ -35,6 +35,8 @@
title: Koch v1.1
- local: lekiwi
title: LeKiwi
- local: reachy2
title: Reachy 2
title: "Robots"
- sections:
- local: notebooks

288
docs/source/reachy2.mdx Normal file
View File

@@ -0,0 +1,288 @@
# Reachy 2
Reachy 2 is an open-source humanoid robot made by Pollen Robotics, specifically designed for the development of embodied AI and real-world applications.
Check out [Pollen Robotics website](https://www.pollen-robotics.com/reachy/), or access [Reachy 2 documentation](https://docs.pollen-robotics.com/) for more information on the platform!
## Teleoperate Reachy 2
Currently, there are two ways to teleoperate Reachy 2:
- Pollen Robotics VR teleoperation (not included in LeRobot).
- Robot-to-robot teleoperation (use one Reachy 2 to control another).
## Reachy 2 Simulation
**(Linux only)** You can run Reachy 2 in simulation (Gazebo or MuJoCo) using the provided [Docker image](https://hub.docker.com/r/pollenrobotics/reachy2_core).
1. Install [Docker Engine](https://docs.docker.com/engine/).
2. Run (for MuJoCo):
```
docker run --rm -it \
--name reachy \
--privileged \
--network host \
--ipc host \
--device-cgroup-rule='c 189:* rwm' \
--group-add audio \
-e ROS_DOMAIN_ID="$ROS_DOMAIN_ID" \
-e DISPLAY="$DISPLAY" \
-e RCUTILS_CONSOLE_OUTPUT_FORMAT="[{severity}]: {message}" \
-e REACHY2_CORE_SERVICE_FAKE="${REACHY2_CORE_SERVICE_FAKE:-true}" \
-v /dev:/dev \
-v "$HOME/.reachy_config":/home/reachy/.reachy_config_override \
-v "$HOME/.reachy.log":/home/reachy/.ros/log \
-v /usr/lib/x86_64-linux-gnu:/opt/host-libs \
--entrypoint /package/launch.sh \
pollenrobotics/reachy2_core:1.7.5.9_deploy \
start_rviz:=true start_sdk_server:=true mujoco:=true
```
> If MuJoCo runs slowly (low simulation frequency), append `-e LD_LIBRARY_PATH="/opt/host-libs:$LD_LIBRARY_PATH" \` to the previous command to improve performance:
>
> ```
> docker run --rm -it \
> --name reachy \
> --privileged \
> --network host \
> --ipc host \
> --device-cgroup-rule='c 189:* rwm' \
> --group-add audio \
> -e ROS_DOMAIN_ID="$ROS_DOMAIN_ID" \
> -e DISPLAY="$DISPLAY" \
> -e RCUTILS_CONSOLE_OUTPUT_FORMAT="[{severity}]: {message}" \
> -e REACHY2_CORE_SERVICE_FAKE="${REACHY2_CORE_SERVICE_FAKE:-true}" \
> -e LD_LIBRARY_PATH="/opt/host-libs:$LD_LIBRARY_PATH" \
> -v /dev:/dev \
> -v "$HOME/.reachy_config":/home/reachy/.reachy_config_override \
> -v "$HOME/.reachy.log":/home/reachy/.ros/log \
> -v /usr/lib/x86_64-linux-gnu:/opt/host-libs \
> --entrypoint /package/launch.sh \
> pollenrobotics/reachy2_core:1.7.5.9_deploy \
> start_rviz:=true start_sdk_server:=true mujoco:=true
> ```
## Setup
### Prerequisites
- On your robot, check the **service images** meet the minimum versions:
- **reachy2-core >= 1.7.5.2**
- **webrtc >= 2.0.1.1**
Then, if you want to use VR teleoperation:
- Install the [Reachy 2 teleoperation application](https://docs.pollen-robotics.com/teleoperation/teleoperation-introduction/discover-teleoperation/).
Use version **>=v1.2.0**
We recommend using two computers: one for teleoperation (Windows required) and another for recording with LeRobot.
### Install LeRobot
Follow the [installation instructions](https://github.com/huggingface/lerobot#installation) to install LeRobot.
Install LeRobot with Reachy 2 dependencies:
```bash
pip install -e ".[reachy2]"
```
### (Optional but recommended) Install pollen_data_acquisition_server
How you manage Reachy 2 recording sessions is up to you, but the **easiest** way is to use this server so you can control sessions directly from the VR teleoperation app.
> **Note:** Currently, only the VR teleoperation application works as a client for this server, so this step primarily targets teleoperation. Youre free to develop custom clients to manage sessions to your needs.
In your LeRobot environment, install the server from source:
```bash
git clone https://github.com/pollen-robotics/pollen_data_acquisition_server.git
cd pollen_data_acquisition_server
pip install -e .
```
Find the [pollen_data_acquisition_server documentation here](https://github.com/pollen-robotics/pollen_data_acquisition_server).
## Step 1: Recording
### Get Reachy 2 IP address
Before starting teleoperation and data recording, find the [robot's IP address](https://docs.pollen-robotics.com/getting-started/setup-reachy2/connect-reachy2/).
We strongly recommend connecting all devices (PC and robot) via **Ethernet**.
### Launch recording
There are two ways to manage recording sessions when using the Reachy 2 VR teleoperation application:
- **Using the data acquisition server (recommended for VR teleop)**: The VR app orchestrates sessions (via the server it tells LeRobot when to create datasets, start/stop episodes) while also controlling the robots motions.
- **Using LeRobots record script**: LeRobot owns session control and decides when to start/stop episodes. If you also use the VR teleop app, its only for motion control.
### Option 1: Using Pollen data acquisition server (recommended for VR teleop)
Make sure you have installed pollen_data_acquisition_server, as explained in the Setup section.
Launch the data acquisition server to be able to manage your session directly from the teleoperation application:
```bash
python -m pollen_data_acquisition_server.server
```
Then get into the teleoperation application and choose "Data acquisition session".
You can finally setup your session by following the screens displayed.
> Even without the VR app, you can use the `pollen_data_acquisition_server` with your own client implementation.
### Option 2: Using lerobot.record
Reachy 2 is fully supported by LeRobots recording features.
If you choose this option but still want to use the VR teleoperation application, select "Standard session" in the app.
**Example: start a recording without the mobile base:**
First add reachy2 and reachy2_teleoperator to the imports of the record script. Then you can use the following command:
```bash
python -m lerobot.record \
--robot.type=reachy2 \
--robot.ip_address=192.168.0.200 \
--robot.id=r2-0000 \
--robot.use_external_commands=true \
--robot.with_mobile_base=false \
--teleop.type=reachy2_teleoperator \
--teleop.ip_address=192.168.0.200 \
--teleop.with_mobile_base=false \
--dataset.repo_id=pollen_robotics/record_test \
--dataset.single_task="Reachy 2 recording test" \
--dataset.num_episodes=1 \
--dataset.episode_time_s=5 \
--dataset.fps=15 \
--dataset.push_to_hub=true \
--dataset.private=true \
--display_data=true
```
#### Specific Options
**Extended setup overview (all options included):**
```bash
python -m lerobot.record \
--robot.type=reachy2 \
--robot.ip_address=192.168.0.200 \
--robot.use_external_commands=true \
--robot.with_mobile_base=true \
--robot.with_l_arm=true \
--robot.with_r_arm=true \
--robot.with_neck=true \
--robot.with_antennas=true \
--robot.with_left_teleop_camera=true \
--robot.with_right_teleop_camera=true \
--robot.with_torso_camera=false \
--robot.disable_torque_on_disconnect=false \
--robot.max_relative_target=5.0 \
--teleop.type=reachy2_teleoperator \
--teleop.ip_address=192.168.0.200 \
--teleop.use_present_position=false \
--teleop.with_mobile_base=false \
--teleop.with_l_arm=true \
--teleop.with_r_arm=true \
--teleop.with_neck=true \
--teleop.with_antennas=true \
--dataset.repo_id=pollen_robotics/record_test \
--dataset.single_task="Reachy 2 recording test" \
--dataset.num_episodes=1 \
--dataset.episode_time_s=5 \
--dataset.fps=15 \
--dataset.push_to_hub=true \
--dataset.private=true \
--display_data=true
```
##### `--robot.use_external_commands`
Determine whether LeRobot robot.send_action() sends commands to the robot.
**Must** be set to false while using the VR teleoperation application, as the app already sends commands.
##### `--teleop.use_present_position`
Determine whether the teleoperator reads the goal or present position of the robot.
Must be set to true if a compliant Reachy 2 is used to control another one.
##### Use the relevant parts
From our initial tests, recording **all** joints when only some are moving can reduce model quality with certain policies.
To avoid this, you can exclude specific parts from recording and replay using:
````
--robot.with_<part>=false
```,
with `<part>` being one of : `mobile_base`, `l_arm`, `r_arm", `neck`, `antennas`.
It determine whether the corresponding part is recorded in the observations. True if not set.
By default, **all parts are recorded**.
The same per-part mechanism is available in `reachy2_teleoperator` as well.
````
--teleop.with\_<part>
```
with `<part>` being one of : `mobile_base`, `l_arm`, `r_arm", `neck`, `antennas`.
Determine whether the corresponding part is recorded in the actions. True if not set.
> **Important:** In a given session, the **enabled parts must match** on both the robot and the teleoperator.
For example, if the robot runs with `--robot.with_mobile_base=false`, the teleoperator must disable the same part `--teleoperator.with_mobile_base=false`.
##### Use the relevant cameras
You can do the same for **cameras**. By default, only the **teleoperation cameras** are recorded (both `left_teleop_camera` and `right_teleop_camera`). Enable or disable each camera with:
```
--robot.with_left_teleop_camera=<true|false>
--robot.with_right_teleop_camera=<true|false>
--robot.with_torso_camera=<true|false>
````
## Step 2: Replay
Make sure the robot is configured with the same parts as the dataset:
```bash
python -m lerobot.replay \
--robot.type=reachy2 \
--robot.ip_address=192.168.0.200 \
--robot.use_external_commands=false \
--robot.with_mobile_base=false \
--dataset.repo_id=pollen_robotics/record_test \
--dataset.episode=0
--display_data=true
````
## Step 3: Train
```bash
python -m lerobot.scripts.train \
--dataset.repo_id=pollen_robotics/record_test \
--policy.type=act \
--output_dir=outputs/train/reachy2_test \
--job_name=reachy2 \
--policy.device=mps \
--wandb.enable=true \
--policy.repo_id=pollen_robotics/record_test_policy
```
## Step 4: Evaluate
```bash
python -m lerobot.record \
--robot.type=reachy2 \
--robot.ip_address=192.168.0.200 \
--display_data=false \
--dataset.repo_id=pollen_robotics/eval_record_test \
--dataset.single_task="Evaluate reachy2 policy" \
--dataset.num_episodes=10 \
--policy.path=outputs/train/reachy2_test/checkpoints/last/pretrained_model
```

View File

@@ -106,6 +106,7 @@ dynamixel = ["dynamixel-sdk>=3.7.31"]
gamepad = ["lerobot[pygame-dep]", "hidapi>=0.14.0"]
hopejr = ["lerobot[feetech]", "lerobot[pygame-dep]"]
lekiwi = ["lerobot[feetech]", "pyzmq>=26.2.1"]
reachy2 = ["reachy2_sdk>=1.0.14"]
kinematics = ["lerobot[placo-dep]"]
intelrealsense = [
"pyrealsense2>=2.55.1.6486 ; sys_platform != 'darwin'",
@@ -141,6 +142,7 @@ all = [
"lerobot[gamepad]",
"lerobot[hopejr]",
"lerobot[lekiwi]",
"lerobot[reachy2]",
"lerobot[kinematics]",
"lerobot[intelrealsense]",
"lerobot[pi0]",

View File

@@ -0,0 +1,16 @@
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from .configuration_reachy2_camera import Reachy2CameraConfig
from .reachy2_camera import Reachy2Camera

View File

@@ -0,0 +1,78 @@
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from dataclasses import dataclass
from ..configs import CameraConfig, ColorMode
@CameraConfig.register_subclass("reachy2_camera")
@dataclass
class Reachy2CameraConfig(CameraConfig):
"""Configuration class for Reachy 2 camera devices.
This class provides configuration options for Reachy 2 cameras,
supporting both the teleop and depth cameras. It includes settings
for resolution, frame rate, color mode, and the selection of the cameras.
Example configurations:
```python
# Basic configurations
Reachy2CameraConfig(
name="teleop",
image_type="left",
ip_address="192.168.0.200", # IP address of the robot
fps=15,
width=640,
height=480,
color_mode=ColorMode.RGB,
) # Left teleop camera, 640x480 @ 15FPS
```
Attributes:
name: Name of the camera device. Can be "teleop" or "depth".
image_type: Type of image stream. For "teleop" camera, can be "left" or "right".
For "depth" camera, can be "rgb" or "depth". (depth is not supported yet)
fps: Requested frames per second for the color stream.
width: Requested frame width in pixels for the color stream.
height: Requested frame height in pixels for the color stream.
color_mode: Color mode for image output (RGB or BGR). Defaults to RGB.
ip_address: IP address of the robot. Defaults to "localhost".
port: Port number for the camera server. Defaults to 50065.
Note:
- Only 3-channel color output (RGB/BGR) is currently supported.
"""
name: str
image_type: str
color_mode: ColorMode = ColorMode.RGB
ip_address: str | None = "localhost"
port: int = 50065
# use_depth: bool = False
def __post_init__(self):
if self.name not in ["teleop", "depth"]:
raise ValueError(f"`name` is expected to be 'teleop' or 'depth', but {self.name} is provided.")
if (self.name == "teleop" and self.image_type not in ["left", "right"]) or (
self.name == "depth" and self.image_type not in ["rgb", "depth"]
):
raise ValueError(
f"`image_type` is expected to be 'left' or 'right' for teleop camera, and 'rgb' or 'depth' for depth camera, but {self.image_type} is provided."
)
if self.color_mode not in ["rgb", "bgr"]:
raise ValueError(
f"`color_mode` is expected to be 'rgb' or 'bgr', but {self.color_mode} is provided."
)

View File

@@ -0,0 +1,288 @@
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Provides the Reachy2Camera class for capturing frames from Reachy 2 cameras using Reachy 2's CameraManager.
"""
import logging
import os
import platform
import time
from threading import Event, Lock, Thread
from typing import Any
# Fix MSMF hardware transform compatibility for Windows before importing cv2
if platform.system() == "Windows" and "OPENCV_VIDEOIO_MSMF_ENABLE_HW_TRANSFORMS" not in os.environ:
os.environ["OPENCV_VIDEOIO_MSMF_ENABLE_HW_TRANSFORMS"] = "0"
import cv2
import numpy as np
from reachy2_sdk.media.camera import CameraView
from reachy2_sdk.media.camera_manager import CameraManager
from lerobot.errors import DeviceNotConnectedError
from ..camera import Camera
from .configuration_reachy2_camera import ColorMode, Reachy2CameraConfig
logger = logging.getLogger(__name__)
class Reachy2Camera(Camera):
"""
Manages Reachy 2 camera using Reachy 2 CameraManager.
This class provides a high-level interface to connect to, configure, and read
frames from Reachy 2 cameras. It supports both synchronous and asynchronous
frame reading.
An Reachy2Camera instance requires a camera name (e.g., "teleop") and an image
type (e.g., "left") to be specified in the configuration.
The camera's default settings (FPS, resolution, color mode) are used unless
overridden in the configuration.
"""
def __init__(self, config: Reachy2CameraConfig):
"""
Initializes the Reachy2Camera instance.
Args:
config: The configuration settings for the camera.
"""
super().__init__(config)
self.config = config
self.fps = config.fps
self.color_mode = config.color_mode
self.cam_manager: CameraManager | None = None
self.thread: Thread | None = None
self.stop_event: Event | None = None
self.frame_lock: Lock = Lock()
self.latest_frame: np.ndarray | None = None
self.new_frame_event: Event = Event()
def __str__(self) -> str:
return f"{self.__class__.__name__}({self.config.name}, {self.config.image_type})"
@property
def is_connected(self) -> bool:
"""Checks if the camera is currently connected and opened."""
if self.config.name == "teleop":
return self.cam_manager._grpc_connected and self.cam_manager.teleop if self.cam_manager else False
elif self.config.name == "depth":
return self.cam_manager._grpc_connected and self.cam_manager.depth if self.cam_manager else False
else:
raise ValueError(f"Invalid camera name '{self.config.name}'. Expected 'teleop' or 'depth'.")
def connect(self, warmup: bool = True):
"""
Connects to the Reachy2 CameraManager as specified in the configuration.
"""
self.cam_manager = CameraManager(host=self.config.ip_address, port=self.config.port)
self.cam_manager.initialize_cameras()
logger.info(f"{self} connected.")
@staticmethod
def find_cameras(ip_address: str = "localhost", port: int = 50065) -> list[dict[str, Any]]:
"""
Detects available Reachy 2 cameras.
Returns:
List[Dict[str, Any]]: A list of dictionaries,
where each dictionary contains 'name', 'stereo',
and the default profile properties (width, height, fps).
"""
initialized_cameras = []
camera_manager = CameraManager(host=ip_address, port=port)
for camera in [camera_manager.teleop, camera_manager.depth]:
if camera is None:
continue
height, width, _, _, _, _, _ = camera.get_parameters()
camera_info = {
"name": camera._cam_info.name,
"stereo": camera._cam_info.stereo,
"default_profile": {
"width": width,
"height": height,
"fps": 30,
},
}
initialized_cameras.append(camera_info)
camera_manager.disconnect()
return initialized_cameras
def read(self, color_mode: ColorMode | None = None) -> np.ndarray:
"""
Reads a single frame synchronously from the camera.
This is a blocking call.
Args:
color_mode (Optional[ColorMode]): If specified, overrides the default
color mode (`self.color_mode`) for this read operation (e.g.,
request RGB even if default is BGR).
Returns:
np.ndarray: The captured frame as a NumPy array in the format
(height, width, channels), using the specified or default
color mode and applying any configured rotation.
"""
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
start_time = time.perf_counter()
frame = None
if self.cam_manager is None:
raise DeviceNotConnectedError(f"{self} is not connected.")
else:
if self.config.name == "teleop" and hasattr(self.cam_manager, "teleop"):
if self.config.image_type == "left":
frame = self.cam_manager.teleop.get_frame(CameraView.LEFT, size=(640, 480))[0]
elif self.config.image_type == "right":
frame = self.cam_manager.teleop.get_frame(CameraView.RIGHT, size=(640, 480))[0]
elif self.config.name == "depth" and hasattr(self.cam_manager, "depth"):
if self.config.image_type == "depth":
frame = self.cam_manager.depth.get_depth_frame()[0]
elif self.config.image_type == "rgb":
frame = self.cam_manager.depth.get_frame(size=(640, 480))[0]
if frame is None:
return np.empty((0, 0, 3), dtype=np.uint8)
if self.config.color_mode == "rgb":
frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
read_duration_ms = (time.perf_counter() - start_time) * 1e3
logger.debug(f"{self} read took: {read_duration_ms:.1f}ms")
return frame
def _read_loop(self):
"""
Internal loop run by the background thread for asynchronous reading.
On each iteration:
1. Reads a color frame
2. Stores result in latest_frame (thread-safe)
3. Sets new_frame_event to notify listeners
Stops on DeviceNotConnectedError, logs other errors and continues.
"""
while not self.stop_event.is_set():
try:
color_image = self.read()
with self.frame_lock:
self.latest_frame = color_image
self.new_frame_event.set()
except DeviceNotConnectedError:
break
except Exception as e:
logger.warning(f"Error reading frame in background thread for {self}: {e}")
def _start_read_thread(self) -> None:
"""Starts or restarts the background read thread if it's not running."""
if self.thread is not None and self.thread.is_alive():
self.thread.join(timeout=0.1)
if self.stop_event is not None:
self.stop_event.set()
self.stop_event = Event()
self.thread = Thread(target=self._read_loop, args=(), name=f"{self}_read_loop")
self.thread.daemon = True
self.thread.start()
def _stop_read_thread(self) -> None:
"""Signals the background read thread to stop and waits for it to join."""
if self.stop_event is not None:
self.stop_event.set()
if self.thread is not None and self.thread.is_alive():
self.thread.join(timeout=2.0)
self.thread = None
self.stop_event = None
def async_read(self, timeout_ms: float = 200) -> np.ndarray:
"""
Reads the latest available frame asynchronously.
This method retrieves the most recent frame captured by the background
read thread. It does not block waiting for the camera hardware directly,
but may wait up to timeout_ms for the background thread to provide a frame.
Args:
timeout_ms (float): Maximum time in milliseconds to wait for a frame
to become available. Defaults to 200ms (0.2 seconds).
Returns:
np.ndarray: The latest captured frame as a NumPy array in the format
(height, width, channels), processed according to configuration.
Raises:
DeviceNotConnectedError: If the camera is not connected.
TimeoutError: If no frame becomes available within the specified timeout.
RuntimeError: If an unexpected error occurs.
"""
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
if self.thread is None or not self.thread.is_alive():
self._start_read_thread()
if not self.new_frame_event.wait(timeout=timeout_ms / 1000.0):
thread_alive = self.thread is not None and self.thread.is_alive()
raise TimeoutError(
f"Timed out waiting for frame from camera {self} after {timeout_ms} ms. "
f"Read thread alive: {thread_alive}."
)
with self.frame_lock:
frame = self.latest_frame
self.new_frame_event.clear()
if frame is None:
raise RuntimeError(f"Internal error: Event set but no frame available for {self}.")
return frame
def disconnect(self):
"""
Stops the background read thread (if running).
Raises:
DeviceNotConnectedError: If the camera is already disconnected.
"""
if not self.is_connected and self.thread is None:
raise DeviceNotConnectedError(f"{self} not connected.")
if self.thread is not None:
self._stop_read_thread()
if self.cam_manager is not None:
self.cam_manager.disconnect()
logger.info(f"{self} disconnected.")

View File

@@ -37,8 +37,14 @@ def make_cameras_from_configs(camera_configs: dict[str, CameraConfig]) -> dict[s
from .realsense.camera_realsense import RealSenseCamera
cameras[key] = RealSenseCamera(cfg)
elif cfg.type == "reachy2_camera":
from .reachy2_camera.reachy2_camera import Reachy2Camera
cameras[key] = Reachy2Camera(cfg)
else:
raise ValueError(f"The motor type '{cfg.type}' is not valid.")
raise ValueError(f"The camera type '{cfg.type}' is not valid.")
return cameras

View File

@@ -67,7 +67,6 @@ from lerobot.cameras import ( # noqa: F401
CameraConfig, # noqa: F401
)
from lerobot.cameras.opencv.configuration_opencv import OpenCVCameraConfig # noqa: F401
from lerobot.cameras.realsense.configuration_realsense import RealSenseCameraConfig # noqa: F401
from lerobot.configs import parser
from lerobot.configs.policies import PreTrainedConfig
from lerobot.datasets.image_writer import safe_stop_image_writer
@@ -209,7 +208,14 @@ def record_loop(
(
t
for t in teleop
if isinstance(t, (so100_leader.SO100Leader, so101_leader.SO101Leader, koch_leader.KochLeader))
if isinstance(
t,
(
so100_leader.SO100Leader,
so101_leader.SO101Leader,
koch_leader.KochLeader,
),
)
),
None,
)

View File

@@ -55,6 +55,7 @@ from lerobot.robots import ( # noqa: F401
hope_jr,
koch_follower,
make_robot_from_config,
reachy2,
so100_follower,
so101_follower,
)

View File

@@ -0,0 +1,25 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from .configuration_reachy2 import Reachy2RobotConfig
from .robot_reachy2 import (
REACHY2_ANTENNAS_JOINTS,
REACHY2_L_ARM_JOINTS,
REACHY2_NECK_JOINTS,
REACHY2_R_ARM_JOINTS,
REACHY2_VEL,
Reachy2Robot,
)

View File

@@ -0,0 +1,107 @@
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from dataclasses import dataclass, field
from lerobot.cameras import CameraConfig
from lerobot.cameras.configs import ColorMode
from lerobot.cameras.reachy2_camera import Reachy2CameraConfig
from ..config import RobotConfig
@RobotConfig.register_subclass("reachy2")
@dataclass
class Reachy2RobotConfig(RobotConfig):
# `max_relative_target` limits the magnitude of the relative positional target vector for safety purposes.
# Set this to a positive scalar to have the same value for all motors.
max_relative_target: float | None = None
# IP address of the Reachy 2 robot
ip_address: str | None = "localhost"
# If True, turn_off_smoothly() will be sent to the robot before disconnecting.
disable_torque_on_disconnect: bool = False
# Tag for external commands control
# Set to True if you use an external commands system to control the robot,
# such as the official teleoperation application: https://github.com/pollen-robotics/Reachy2Teleoperation
# If True, robot.send_action() will not send commands to the robot.
use_external_commands: bool = False
# Robot parts
# Set to False to not add the corresponding joints part to the robot list of joints.
# By default, all parts are set to True.
with_mobile_base: bool = True
with_l_arm: bool = True
with_r_arm: bool = True
with_neck: bool = True
with_antennas: bool = True
# Robot cameras
# Set to True if you want to use the corresponding cameras in the observations.
# By default, only the teleop cameras are used.
with_left_teleop_camera: bool = True
with_right_teleop_camera: bool = True
with_torso_camera: bool = False
cameras: dict[str, CameraConfig] = field(default_factory=dict)
def __post_init__(self) -> None:
# Add cameras with same ip_address as the robot
if self.with_left_teleop_camera:
self.cameras["teleop_left"] = Reachy2CameraConfig(
name="teleop",
image_type="left",
ip_address=self.ip_address,
fps=15,
width=640,
height=480,
color_mode=ColorMode.RGB,
)
if self.with_right_teleop_camera:
self.cameras["teleop_right"] = Reachy2CameraConfig(
name="teleop",
image_type="right",
ip_address=self.ip_address,
fps=15,
width=640,
height=480,
color_mode=ColorMode.RGB,
)
if self.with_torso_camera:
self.cameras["torso_rgb"] = Reachy2CameraConfig(
name="depth",
image_type="rgb",
ip_address=self.ip_address,
fps=15,
width=640,
height=480,
color_mode=ColorMode.RGB,
)
super().__post_init__()
if not (
self.with_mobile_base
or self.with_l_arm
or self.with_r_arm
or self.with_neck
or self.with_antennas
):
raise ValueError(
"No Reachy2Robot part used.\n"
"At least one part of the robot must be set to True "
"(with_mobile_base, with_l_arm, with_r_arm, with_neck, with_antennas)"
)

View File

@@ -0,0 +1,230 @@
#!/usr/bin/env python
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import time
from typing import Any
import numpy as np
from reachy2_sdk import ReachySDK
from lerobot.cameras.utils import make_cameras_from_configs
from ..robot import Robot
from ..utils import ensure_safe_goal_position
from .configuration_reachy2 import Reachy2RobotConfig
# {lerobot_keys: reachy2_sdk_keys}
REACHY2_NECK_JOINTS = {
"neck_yaw.pos": "head.neck.yaw",
"neck_pitch.pos": "head.neck.pitch",
"neck_roll.pos": "head.neck.roll",
}
REACHY2_ANTENNAS_JOINTS = {
"l_antenna.pos": "head.l_antenna",
"r_antenna.pos": "head.r_antenna",
}
REACHY2_R_ARM_JOINTS = {
"r_shoulder_pitch.pos": "r_arm.shoulder.pitch",
"r_shoulder_roll.pos": "r_arm.shoulder.roll",
"r_elbow_yaw.pos": "r_arm.elbow.yaw",
"r_elbow_pitch.pos": "r_arm.elbow.pitch",
"r_wrist_roll.pos": "r_arm.wrist.roll",
"r_wrist_pitch.pos": "r_arm.wrist.pitch",
"r_wrist_yaw.pos": "r_arm.wrist.yaw",
"r_gripper.pos": "r_arm.gripper",
}
REACHY2_L_ARM_JOINTS = {
"l_shoulder_pitch.pos": "l_arm.shoulder.pitch",
"l_shoulder_roll.pos": "l_arm.shoulder.roll",
"l_elbow_yaw.pos": "l_arm.elbow.yaw",
"l_elbow_pitch.pos": "l_arm.elbow.pitch",
"l_wrist_roll.pos": "l_arm.wrist.roll",
"l_wrist_pitch.pos": "l_arm.wrist.pitch",
"l_wrist_yaw.pos": "l_arm.wrist.yaw",
"l_gripper.pos": "l_arm.gripper",
}
REACHY2_VEL = {
"mobile_base.vx": "vx",
"mobile_base.vy": "vy",
"mobile_base.vtheta": "vtheta",
}
class Reachy2Robot(Robot):
"""
[Reachy 2](https://www.pollen-robotics.com/reachy/), by Pollen Robotics.
"""
config_class = Reachy2RobotConfig
name = "reachy2"
def __init__(self, config: Reachy2RobotConfig):
super().__init__(config)
self.config = config
self.robot_type = self.config.type
self.use_external_commands = self.config.use_external_commands
self.reachy: None | ReachySDK = None
self.cameras = make_cameras_from_configs(config.cameras)
self.logs: dict[str, float] = {}
self.joints_dict: dict[str, str] = self._generate_joints_dict()
@property
def observation_features(self) -> dict[str, Any]:
return {**self.motors_features, **self.camera_features}
@property
def action_features(self) -> dict[str, type]:
return self.motors_features
@property
def camera_features(self) -> dict[str, tuple[int | None, int | None, int]]:
return {cam: (self.cameras[cam].height, self.cameras[cam].width, 3) for cam in self.cameras}
@property
def motors_features(self) -> dict[str, type]:
if self.config.with_mobile_base:
return {
**dict.fromkeys(
self.joints_dict.keys(),
float,
),
**dict.fromkeys(
REACHY2_VEL.keys(),
float,
),
}
else:
return dict.fromkeys(self.joints_dict.keys(), float)
@property
def is_connected(self) -> bool:
return self.reachy.is_connected() if self.reachy is not None else False
def connect(self, calibrate: bool = False) -> None:
self.reachy = ReachySDK(self.config.ip_address)
if not self.is_connected:
raise ConnectionError()
for cam in self.cameras.values():
cam.connect()
self.configure()
def configure(self) -> None:
if self.reachy is not None:
self.reachy.turn_on()
self.reachy.reset_default_limits()
@property
def is_calibrated(self) -> bool:
return True
def calibrate(self) -> None:
pass
def _generate_joints_dict(self) -> dict[str, str]:
joints = {}
if self.config.with_neck:
joints.update(REACHY2_NECK_JOINTS)
if self.config.with_l_arm:
joints.update(REACHY2_L_ARM_JOINTS)
if self.config.with_r_arm:
joints.update(REACHY2_R_ARM_JOINTS)
if self.config.with_antennas:
joints.update(REACHY2_ANTENNAS_JOINTS)
return joints
def _get_state(self) -> dict[str, float]:
if self.reachy is not None:
pos_dict = {k: self.reachy.joints[v].present_position for k, v in self.joints_dict.items()}
if not self.config.with_mobile_base:
return pos_dict
vel_dict = {k: self.reachy.mobile_base.odometry[v] for k, v in REACHY2_VEL.items()}
return {**pos_dict, **vel_dict}
else:
return {}
def get_observation(self) -> dict[str, np.ndarray]:
obs_dict: dict[str, Any] = {}
# Read Reachy 2 state
before_read_t = time.perf_counter()
obs_dict.update(self._get_state())
self.logs["read_pos_dt_s"] = time.perf_counter() - before_read_t
# Capture images from cameras
for cam_key, cam in self.cameras.items():
obs_dict[cam_key] = cam.async_read()
return obs_dict
def send_action(self, action: dict[str, Any]) -> dict[str, Any]:
if self.reachy is not None:
if not self.is_connected:
raise ConnectionError()
before_write_t = time.perf_counter()
vel = {}
goal_pos = {}
for key, val in action.items():
if key not in self.joints_dict:
if key not in REACHY2_VEL:
raise KeyError(f"Key '{key}' is not a valid motor key in Reachy 2.")
else:
vel[REACHY2_VEL[key]] = float(val)
else:
if not self.use_external_commands and self.config.max_relative_target is not None:
goal_pos[key] = float(val)
goal_present_pos = {
key: (
goal_pos[key],
self.reachy.joints[self.joints_dict[key]].present_position,
)
}
safe_goal_pos = ensure_safe_goal_position(
goal_present_pos, float(self.config.max_relative_target)
)
val = safe_goal_pos[key]
self.reachy.joints[self.joints_dict[key]].goal_position = float(val)
if self.config.with_mobile_base:
self.reachy.mobile_base.set_goal_speed(vel["vx"], vel["vy"], vel["vtheta"])
# We don't send the goal positions if we control Reachy 2 externally
if not self.use_external_commands:
self.reachy.send_goal_positions()
if self.config.with_mobile_base:
self.reachy.mobile_base.send_speed_command()
self.logs["write_pos_dt_s"] = time.perf_counter() - before_write_t
return action
def disconnect(self) -> None:
if self.reachy is not None:
for cam in self.cameras.values():
cam.disconnect()
if self.config.disable_torque_on_disconnect:
self.reachy.turn_off_smoothly()
self.reachy.disconnect()

View File

@@ -61,6 +61,10 @@ def make_robot_from_config(config: RobotConfig) -> Robot:
from .bi_so100_follower import BiSO100Follower
return BiSO100Follower(config)
elif config.type == "reachy2":
from .reachy2 import Reachy2Robot
return Reachy2Robot(config)
elif config.type == "mock_robot":
from tests.mocks.mock_robot import MockRobot

View File

@@ -0,0 +1,25 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from .config_reachy2_teleoperator import Reachy2TeleoperatorConfig
from .reachy2_teleoperator import (
REACHY2_ANTENNAS_JOINTS,
REACHY2_L_ARM_JOINTS,
REACHY2_NECK_JOINTS,
REACHY2_R_ARM_JOINTS,
REACHY2_VEL,
Reachy2Teleoperator,
)

View File

@@ -0,0 +1,51 @@
#!/usr/bin/env python
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from dataclasses import dataclass
from ..config import TeleoperatorConfig
@TeleoperatorConfig.register_subclass("reachy2_teleoperator")
@dataclass
class Reachy2TeleoperatorConfig(TeleoperatorConfig):
# IP address of the Reachy 2 robot used as teleoperator
ip_address: str | None = "localhost"
# Whether to use the present position of the joints as actions
# if False, the goal position of the joints will be used
use_present_position: bool = False
# Which parts of the robot to use
with_mobile_base: bool = True
with_l_arm: bool = True
with_r_arm: bool = True
with_neck: bool = True
with_antennas: bool = True
def __post_init__(self):
if not (
self.with_mobile_base
or self.with_l_arm
or self.with_r_arm
or self.with_neck
or self.with_antennas
):
raise ValueError(
"No Reachy2Teleoperator part used.\n"
"At least one part of the robot must be set to True "
"(with_mobile_base, with_l_arm, with_r_arm, with_neck, with_antennas)"
)

View File

@@ -0,0 +1,164 @@
#!/usr/bin/env python
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import time
from reachy2_sdk import ReachySDK
from ..teleoperator import Teleoperator
from .config_reachy2_teleoperator import Reachy2TeleoperatorConfig
logger = logging.getLogger(__name__)
# {lerobot_keys: reachy2_sdk_keys}
REACHY2_NECK_JOINTS = {
"neck_yaw.pos": "head.neck.yaw",
"neck_pitch.pos": "head.neck.pitch",
"neck_roll.pos": "head.neck.roll",
}
REACHY2_ANTENNAS_JOINTS = {
"l_antenna.pos": "head.l_antenna",
"r_antenna.pos": "head.r_antenna",
}
REACHY2_R_ARM_JOINTS = {
"r_shoulder_pitch.pos": "r_arm.shoulder.pitch",
"r_shoulder_roll.pos": "r_arm.shoulder.roll",
"r_elbow_yaw.pos": "r_arm.elbow.yaw",
"r_elbow_pitch.pos": "r_arm.elbow.pitch",
"r_wrist_roll.pos": "r_arm.wrist.roll",
"r_wrist_pitch.pos": "r_arm.wrist.pitch",
"r_wrist_yaw.pos": "r_arm.wrist.yaw",
"r_gripper.pos": "r_arm.gripper",
}
REACHY2_L_ARM_JOINTS = {
"l_shoulder_pitch.pos": "l_arm.shoulder.pitch",
"l_shoulder_roll.pos": "l_arm.shoulder.roll",
"l_elbow_yaw.pos": "l_arm.elbow.yaw",
"l_elbow_pitch.pos": "l_arm.elbow.pitch",
"l_wrist_roll.pos": "l_arm.wrist.roll",
"l_wrist_pitch.pos": "l_arm.wrist.pitch",
"l_wrist_yaw.pos": "l_arm.wrist.yaw",
"l_gripper.pos": "l_arm.gripper",
}
REACHY2_VEL = {
"mobile_base.vx": "vx",
"mobile_base.vy": "vy",
"mobile_base.vtheta": "vtheta",
}
class Reachy2Teleoperator(Teleoperator):
"""
[Reachy 2](https://www.pollen-robotics.com/reachy/), by Pollen Robotics.
"""
config_class = Reachy2TeleoperatorConfig
name = "reachy2_specific"
def __init__(self, config: Reachy2TeleoperatorConfig):
super().__init__(config)
self.config = config
self.reachy: None | ReachySDK = None
self.joints_dict: dict[str, str] = self._generate_joints_dict()
def _generate_joints_dict(self) -> dict[str, str]:
joints = {}
if self.config.with_neck:
joints.update(REACHY2_NECK_JOINTS)
if self.config.with_l_arm:
joints.update(REACHY2_L_ARM_JOINTS)
if self.config.with_r_arm:
joints.update(REACHY2_R_ARM_JOINTS)
if self.config.with_antennas:
joints.update(REACHY2_ANTENNAS_JOINTS)
return joints
@property
def action_features(self) -> dict[str, type]:
if self.config.with_mobile_base:
return {
**dict.fromkeys(
self.joints_dict.keys(),
float,
),
**dict.fromkeys(
REACHY2_VEL.keys(),
float,
),
}
else:
return dict.fromkeys(self.joints_dict.keys(), float)
@property
def feedback_features(self) -> dict[str, type]:
return {}
@property
def is_connected(self) -> bool:
return self.reachy.is_connected() if self.reachy is not None else False
def connect(self, calibrate: bool = True) -> None:
self.reachy = ReachySDK(self.config.ip_address)
if not self.is_connected:
raise ConnectionError()
logger.info(f"{self} connected.")
@property
def is_calibrated(self) -> bool:
return True
def calibrate(self) -> None:
pass
def configure(self) -> None:
pass
def get_action(self) -> dict[str, float]:
start = time.perf_counter()
if self.reachy and self.is_connected:
if self.config.use_present_position:
joint_action = {
k: self.reachy.joints[v].present_position for k, v in self.joints_dict.items()
}
else:
joint_action = {k: self.reachy.joints[v].goal_position for k, v in self.joints_dict.items()}
if not self.config.with_mobile_base:
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read action: {dt_ms:.1f}ms")
return joint_action
if self.config.use_present_position:
vel_action = {k: self.reachy.mobile_base.odometry[v] for k, v in REACHY2_VEL.items()}
else:
vel_action = {k: self.reachy.mobile_base.last_cmd_vel[v] for k, v in REACHY2_VEL.items()}
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read action: {dt_ms:.1f}ms")
return {**joint_action, **vel_action}
def send_feedback(self, feedback: dict[str, float]) -> None:
raise NotImplementedError
def disconnect(self) -> None:
if self.reachy and self.is_connected:
self.reachy.disconnect()

View File

@@ -65,5 +65,9 @@ def make_teleoperator_from_config(config: TeleoperatorConfig) -> Teleoperator:
from .bi_so100_leader import BiSO100Leader
return BiSO100Leader(config)
elif config.type == "reachy2_teleoperator":
from .reachy2_teleoperator import Reachy2Teleoperator
return Reachy2Teleoperator(config)
else:
raise ValueError(config.type)

View File

@@ -0,0 +1,177 @@
#!/usr/bin/env python
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import time
from unittest.mock import MagicMock, patch
import numpy as np
import pytest
from lerobot.cameras.reachy2_camera import Reachy2Camera, Reachy2CameraConfig
from lerobot.errors import DeviceNotConnectedError
PARAMS = [
("teleop", "left"),
("teleop", "right"),
("depth", "rgb"),
# ("depth", "depth"), # Depth camera is not available yet
]
def _make_cam_manager_mock():
c = MagicMock(name="CameraManagerMock")
teleop = MagicMock(name="TeleopCam")
teleop.width = 640
teleop.height = 480
teleop.get_frame = MagicMock(
side_effect=lambda *_, **__: (
np.zeros((480, 640, 3), dtype=np.uint8),
time.time(),
)
)
depth = MagicMock(name="DepthCam")
depth.width = 640
depth.height = 480
depth.get_frame = MagicMock(
side_effect=lambda *_, **__: (
np.zeros((480, 640, 3), dtype=np.uint8),
time.time(),
)
)
c.is_connected.return_value = True
c.teleop = teleop
c.depth = depth
def _connect():
c.teleop = teleop
c.depth = depth
c.is_connected.return_value = True
def _disconnect():
c.teleop = None
c.depth = None
c.is_connected.return_value = False
c.connect = MagicMock(side_effect=_connect)
c.disconnect = MagicMock(side_effect=_disconnect)
# Mock methods
c.initialize_cameras = MagicMock()
return c
@pytest.fixture(
params=PARAMS,
# ids=["teleop-left", "teleop-right", "torso-rgb", "torso-depth"],
ids=["teleop-left", "teleop-right", "torso-rgb"],
)
def camera(request):
name, image_type = request.param
with (
patch(
"lerobot.cameras.reachy2_camera.reachy2_camera.CameraManager",
side_effect=lambda *a, **k: _make_cam_manager_mock(),
),
):
config = Reachy2CameraConfig(name=name, image_type=image_type)
cam = Reachy2Camera(config)
yield cam
if cam.is_connected:
cam.disconnect()
def test_connect(camera):
camera.connect()
assert camera.is_connected
camera.cam_manager.initialize_cameras.assert_called_once()
def test_read(camera):
camera.connect()
img = camera.read()
if camera.config.name == "teleop":
camera.cam_manager.teleop.get_frame.assert_called_once()
elif camera.config.name == "depth":
camera.cam_manager.depth.get_frame.assert_called_once()
assert isinstance(img, np.ndarray)
assert img.shape == (480, 640, 3)
def test_disconnect(camera):
camera.connect()
camera.disconnect()
assert not camera.is_connected
def test_async_read(camera):
camera.connect()
try:
img = camera.async_read()
assert camera.thread is not None
assert camera.thread.is_alive()
assert isinstance(img, np.ndarray)
finally:
if camera.is_connected:
camera.disconnect()
def test_async_read_timeout(camera):
camera.connect()
try:
with pytest.raises(TimeoutError):
camera.async_read(timeout_ms=0)
finally:
if camera.is_connected:
camera.disconnect()
def test_read_before_connect(camera):
with pytest.raises(DeviceNotConnectedError):
_ = camera.read()
def test_disconnect_before_connect(camera):
with pytest.raises(DeviceNotConnectedError):
camera.disconnect()
def test_async_read_before_connect(camera):
with pytest.raises(DeviceNotConnectedError):
_ = camera.async_read()
def test_wrong_camera_name():
with pytest.raises(ValueError):
_ = Reachy2CameraConfig(name="wrong-name", image_type="left")
def test_wrong_image_type():
with pytest.raises(ValueError):
_ = Reachy2CameraConfig(name="teleop", image_type="rgb")
with pytest.raises(ValueError):
_ = Reachy2CameraConfig(name="depth", image_type="left")
def test_wrong_color_mode():
with pytest.raises(ValueError):
_ = Reachy2CameraConfig(name="teleop", image_type="left", color_mode="wrong-color")

View File

@@ -28,6 +28,7 @@ pytest_plugins = [
"tests.fixtures.files",
"tests.fixtures.hub",
"tests.fixtures.optimizers",
"tests.plugins.reachy2_sdk",
]

View File

@@ -0,0 +1,30 @@
import sys
import types
from unittest.mock import MagicMock
def _install_reachy2_sdk_stub():
sdk = types.ModuleType("reachy2_sdk")
sdk.__path__ = []
sdk.ReachySDK = MagicMock(name="ReachySDK")
media = types.ModuleType("reachy2_sdk.media")
media.__path__ = []
camera = types.ModuleType("reachy2_sdk.media.camera")
camera.CameraView = MagicMock(name="CameraView")
camera_manager = types.ModuleType("reachy2_sdk.media.camera_manager")
camera_manager.CameraManager = MagicMock(name="CameraManager")
sdk.media = media
media.camera = camera
media.camera_manager = camera_manager
# Register in sys.modules
sys.modules.setdefault("reachy2_sdk", sdk)
sys.modules.setdefault("reachy2_sdk.media", media)
sys.modules.setdefault("reachy2_sdk.media.camera", camera)
sys.modules.setdefault("reachy2_sdk.media.camera_manager", camera_manager)
def pytest_sessionstart(session):
_install_reachy2_sdk_stub()

View File

@@ -0,0 +1,326 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from unittest.mock import MagicMock, patch
import numpy as np
import pytest
from lerobot.robots.reachy2 import (
REACHY2_ANTENNAS_JOINTS,
REACHY2_L_ARM_JOINTS,
REACHY2_NECK_JOINTS,
REACHY2_R_ARM_JOINTS,
REACHY2_VEL,
Reachy2Robot,
Reachy2RobotConfig,
)
# {lerobot_keys: reachy2_sdk_keys}
REACHY2_JOINTS = {
**REACHY2_NECK_JOINTS,
**REACHY2_ANTENNAS_JOINTS,
**REACHY2_R_ARM_JOINTS,
**REACHY2_L_ARM_JOINTS,
}
PARAMS = [
{}, # default config
{"with_mobile_base": False},
{"with_mobile_base": False, "with_l_arm": False, "with_antennas": False},
{"with_r_arm": False, "with_neck": False, "with_antennas": False},
{"use_external_commands": True, "disable_torque_on_disconnect": True},
{"use_external_commands": True, "with_mobile_base": False, "with_neck": False},
{"disable_torque_on_disconnect": False},
{"max_relative_target": 5},
{"with_right_teleop_camera": False},
{"with_left_teleop_camera": False, "with_right_teleop_camera": False},
{"with_left_teleop_camera": False, "with_torso_camera": True},
]
def _make_reachy2_sdk_mock():
class JointSpy:
__slots__ = (
"present_position",
"_goal_position",
"_on_set",
)
def __init__(self, present_position=0.0, on_set=None):
self.present_position = present_position
self._goal_position = present_position
self._on_set = on_set
@property
def goal_position(self):
return self._goal_position
@goal_position.setter
def goal_position(self, v):
self._goal_position = v
if self._on_set:
self._on_set()
r = MagicMock(name="ReachySDKMock")
r.is_connected.return_value = True
def _connect():
r.is_connected.return_value = True
def _disconnect():
r.is_connected.return_value = False
# Global counter of goal_position sets
r._goal_position_set_total = 0
def _on_any_goal_set():
r._goal_position_set_total += 1
# Mock joints with some dummy positions
joints = {
k: JointSpy(
present_position=float(i),
on_set=_on_any_goal_set,
)
for i, k in enumerate(REACHY2_JOINTS.values())
}
r.joints = joints
# Mock mobile base with some dummy odometry
r.mobile_base = MagicMock()
r.mobile_base.odometry = {
"x": 0.1,
"y": -0.2,
"theta": 21.3,
"vx": 0.001,
"vy": 0.002,
"vtheta": 0.0,
}
r.connect = MagicMock(side_effect=_connect)
r.disconnect = MagicMock(side_effect=_disconnect)
# Mock methods
r.turn_on = MagicMock()
r.reset_default_limits = MagicMock()
r.send_goal_positions = MagicMock()
r.turn_off_smoothly = MagicMock()
r.mobile_base.set_goal_speed = MagicMock()
r.mobile_base.send_speed_command = MagicMock()
return r
def _make_reachy2_camera_mock(*args, **kwargs):
cfg = args[0] if args else kwargs.get("config")
name = getattr(cfg, "name", kwargs.get("name", "cam"))
image_type = getattr(cfg, "image_type", kwargs.get("image_type", "cam"))
width = getattr(cfg, "width", kwargs.get("width", 640))
height = getattr(cfg, "height", kwargs.get("height", 480))
cam = MagicMock(name=f"Reachy2CameraMock:{name}")
cam.name = name
cam.image_type = image_type
cam.width = width
cam.height = height
cam.connect = MagicMock()
cam.disconnect = MagicMock()
cam.async_read = MagicMock(side_effect=lambda: np.zeros((height, width, 3), dtype=np.uint8))
return cam
@pytest.fixture(params=PARAMS, ids=lambda p: "default" if not p else ",".join(p.keys()))
def reachy2(request):
with (
patch(
"lerobot.robots.reachy2.robot_reachy2.ReachySDK",
side_effect=lambda *a, **k: _make_reachy2_sdk_mock(),
),
patch(
"lerobot.cameras.reachy2_camera.reachy2_camera.Reachy2Camera",
side_effect=_make_reachy2_camera_mock,
),
):
overrides = request.param
cfg = Reachy2RobotConfig(ip_address="192.168.0.200", **overrides)
robot = Reachy2Robot(cfg)
yield robot
if robot.is_connected:
robot.disconnect()
def test_connect_disconnect(reachy2):
assert not reachy2.is_connected
reachy2.connect()
assert reachy2.is_connected
reachy2.reachy.turn_on.assert_called_once()
reachy2.reachy.reset_default_limits.assert_called_once()
reachy2.disconnect()
assert not reachy2.is_connected
if reachy2.config.disable_torque_on_disconnect:
reachy2.reachy.turn_off_smoothly.assert_called_once()
else:
reachy2.reachy.turn_off_smoothly.assert_not_called()
reachy2.reachy.disconnect.assert_called_once()
def test_get_joints_dict(reachy2):
reachy2.connect()
if reachy2.config.with_neck:
assert "neck_yaw.pos" in reachy2.joints_dict
assert "neck_pitch.pos" in reachy2.joints_dict
assert "neck_roll.pos" in reachy2.joints_dict
else:
assert "neck_yaw.pos" not in reachy2.joints_dict
assert "neck_pitch.pos" not in reachy2.joints_dict
assert "neck_roll.pos" not in reachy2.joints_dict
if reachy2.config.with_antennas:
assert "l_antenna.pos" in reachy2.joints_dict
assert "r_antenna.pos" in reachy2.joints_dict
else:
assert "l_antenna.pos" not in reachy2.joints_dict
assert "r_antenna.pos" not in reachy2.joints_dict
if reachy2.config.with_r_arm:
assert "r_shoulder_pitch.pos" in reachy2.joints_dict
assert "r_shoulder_roll.pos" in reachy2.joints_dict
assert "r_elbow_yaw.pos" in reachy2.joints_dict
assert "r_elbow_pitch.pos" in reachy2.joints_dict
assert "r_wrist_roll.pos" in reachy2.joints_dict
assert "r_wrist_pitch.pos" in reachy2.joints_dict
assert "r_wrist_yaw.pos" in reachy2.joints_dict
assert "r_gripper.pos" in reachy2.joints_dict
else:
assert "r_shoulder_pitch.pos" not in reachy2.joints_dict
assert "r_shoulder_roll.pos" not in reachy2.joints_dict
assert "r_elbow_yaw.pos" not in reachy2.joints_dict
assert "r_elbow_pitch.pos" not in reachy2.joints_dict
assert "r_wrist_roll.pos" not in reachy2.joints_dict
assert "r_wrist_pitch.pos" not in reachy2.joints_dict
assert "r_wrist_yaw.pos" not in reachy2.joints_dict
assert "r_gripper.pos" not in reachy2.joints_dict
if reachy2.config.with_l_arm:
assert "l_shoulder_pitch.pos" in reachy2.joints_dict
assert "l_shoulder_roll.pos" in reachy2.joints_dict
assert "l_elbow_yaw.pos" in reachy2.joints_dict
assert "l_elbow_pitch.pos" in reachy2.joints_dict
assert "l_wrist_roll.pos" in reachy2.joints_dict
assert "l_wrist_pitch.pos" in reachy2.joints_dict
assert "l_wrist_yaw.pos" in reachy2.joints_dict
assert "l_gripper.pos" in reachy2.joints_dict
else:
assert "l_shoulder_pitch.pos" not in reachy2.joints_dict
assert "l_shoulder_roll.pos" not in reachy2.joints_dict
assert "l_elbow_yaw.pos" not in reachy2.joints_dict
assert "l_elbow_pitch.pos" not in reachy2.joints_dict
assert "l_wrist_roll.pos" not in reachy2.joints_dict
assert "l_wrist_pitch.pos" not in reachy2.joints_dict
assert "l_wrist_yaw.pos" not in reachy2.joints_dict
assert "l_gripper.pos" not in reachy2.joints_dict
def test_get_observation(reachy2):
reachy2.connect()
obs = reachy2.get_observation()
expected_keys = set(reachy2.joints_dict)
expected_keys.update(f"{v}" for v in REACHY2_VEL.keys() if reachy2.config.with_mobile_base)
expected_keys.update(reachy2.cameras.keys())
assert set(obs.keys()) == expected_keys
for motor in reachy2.joints_dict.keys():
assert obs[motor] == reachy2.reachy.joints[REACHY2_JOINTS[motor]].present_position
if reachy2.config.with_mobile_base:
for vel in REACHY2_VEL.keys():
assert obs[vel] == reachy2.reachy.mobile_base.odometry[REACHY2_VEL[vel]]
if reachy2.config.with_left_teleop_camera:
assert obs["teleop_left"].shape == (
reachy2.config.cameras["teleop_left"].height,
reachy2.config.cameras["teleop_left"].width,
3,
)
if reachy2.config.with_right_teleop_camera:
assert obs["teleop_right"].shape == (
reachy2.config.cameras["teleop_right"].height,
reachy2.config.cameras["teleop_right"].width,
3,
)
if reachy2.config.with_torso_camera:
assert obs["torso_rgb"].shape == (
reachy2.config.cameras["torso_rgb"].height,
reachy2.config.cameras["torso_rgb"].width,
3,
)
def test_send_action(reachy2):
reachy2.connect()
action = {k: i * 10.0 for i, k in enumerate(reachy2.joints_dict.keys(), start=1)}
if reachy2.config.with_mobile_base:
action.update({k: i * 0.1 for i, k in enumerate(REACHY2_VEL.keys(), start=1)})
previous_present_position = {
k: reachy2.reachy.joints[REACHY2_JOINTS[k]].present_position for k in reachy2.joints_dict.keys()
}
returned = reachy2.send_action(action)
if reachy2.config.max_relative_target is None:
assert returned == action
assert reachy2.reachy._goal_position_set_total == len(reachy2.joints_dict)
for motor in reachy2.joints_dict.keys():
expected_pos = action[motor]
real_pos = reachy2.reachy.joints[REACHY2_JOINTS[motor]].goal_position
if reachy2.config.max_relative_target is None:
assert real_pos == expected_pos
else:
assert real_pos == previous_present_position[motor] + np.sign(expected_pos) * min(
abs(expected_pos - real_pos), reachy2.config.max_relative_target
)
if reachy2.config.with_mobile_base:
goal_speed = [i * 0.1 for i, _ in enumerate(REACHY2_VEL.keys(), start=1)]
reachy2.reachy.mobile_base.set_goal_speed.assert_called_once_with(*goal_speed)
if reachy2.config.use_external_commands:
reachy2.reachy.send_goal_positions.assert_not_called()
if reachy2.config.with_mobile_base:
reachy2.reachy.mobile_base.send_speed_command.assert_not_called()
else:
reachy2.reachy.send_goal_positions.assert_called_once()
if reachy2.config.with_mobile_base:
reachy2.reachy.mobile_base.send_speed_command.assert_called_once()
def test_no_part_declared():
with pytest.raises(ValueError):
_ = Reachy2RobotConfig(
ip_address="192.168.0.200",
with_mobile_base=False,
with_l_arm=False,
with_r_arm=False,
with_neck=False,
with_antennas=False,
)

View File

@@ -0,0 +1,150 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from unittest.mock import MagicMock, patch
import pytest
from lerobot.teleoperators.reachy2_teleoperator import (
REACHY2_ANTENNAS_JOINTS,
REACHY2_L_ARM_JOINTS,
REACHY2_NECK_JOINTS,
REACHY2_R_ARM_JOINTS,
REACHY2_VEL,
Reachy2Teleoperator,
Reachy2TeleoperatorConfig,
)
# {lerobot_keys: reachy2_sdk_keys}
REACHY2_JOINTS = {
**REACHY2_NECK_JOINTS,
**REACHY2_ANTENNAS_JOINTS,
**REACHY2_R_ARM_JOINTS,
**REACHY2_L_ARM_JOINTS,
}
PARAMS = [
{}, # default config
{"with_mobile_base": False},
{"with_mobile_base": False, "with_l_arm": False, "with_antennas": False},
{"with_r_arm": False, "with_neck": False, "with_antennas": False},
{"with_mobile_base": False, "with_neck": False},
{"use_present_position": True},
]
def _make_reachy2_sdk_mock():
r = MagicMock(name="ReachySDKMock")
r.is_connected.return_value = True
def _connect():
r.is_connected.return_value = True
def _disconnect():
r.is_connected.return_value = False
# Mock joints with some dummy positions
joints = {
k: MagicMock(
present_position=float(i),
goal_position=float(i) + 0.5,
)
for i, k in enumerate(REACHY2_JOINTS.values())
}
r.joints = joints
# Mock mobile base with some dummy odometry
r.mobile_base = MagicMock()
r.mobile_base.last_cmd_vel = {
"vx": -0.2,
"vy": 0.2,
"vtheta": 11.0,
}
r.mobile_base.odometry = {
"x": 1.0,
"y": 2.0,
"theta": 20.0,
"vx": 0.1,
"vy": -0.1,
"vtheta": 8.0,
}
r.connect = MagicMock(side_effect=_connect)
r.disconnect = MagicMock(side_effect=_disconnect)
return r
@pytest.fixture(params=PARAMS, ids=lambda p: "default" if not p else ",".join(p.keys()))
def reachy2(request):
with (
patch(
"lerobot.teleoperators.reachy2_teleoperator.reachy2_teleoperator.ReachySDK",
side_effect=lambda *a, **k: _make_reachy2_sdk_mock(),
),
):
overrides = request.param
cfg = Reachy2TeleoperatorConfig(ip_address="192.168.0.200", **overrides)
robot = Reachy2Teleoperator(cfg)
yield robot
if robot.is_connected:
robot.disconnect()
def test_connect_disconnect(reachy2):
assert not reachy2.is_connected
reachy2.connect()
assert reachy2.is_connected
reachy2.disconnect()
assert not reachy2.is_connected
reachy2.reachy.disconnect.assert_called_once()
def test_get_action(reachy2):
reachy2.connect()
action = reachy2.get_action()
expected_keys = set(reachy2.joints_dict)
expected_keys.update(f"{v}" for v in REACHY2_VEL.keys() if reachy2.config.with_mobile_base)
assert set(action.keys()) == expected_keys
for motor in reachy2.joints_dict.keys():
if reachy2.config.use_present_position:
assert action[motor] == reachy2.reachy.joints[REACHY2_JOINTS[motor]].present_position
else:
assert action[motor] == reachy2.reachy.joints[REACHY2_JOINTS[motor]].goal_position
if reachy2.config.with_mobile_base:
if reachy2.config.use_present_position:
for vel in REACHY2_VEL.keys():
assert action[vel] == reachy2.reachy.mobile_base.odometry[REACHY2_VEL[vel]]
else:
for vel in REACHY2_VEL.keys():
assert action[vel] == reachy2.reachy.mobile_base.last_cmd_vel[REACHY2_VEL[vel]]
def test_no_part_declared():
with pytest.raises(ValueError):
_ = Reachy2TeleoperatorConfig(
ip_address="192.168.0.200",
with_mobile_base=False,
with_l_arm=False,
with_r_arm=False,
with_neck=False,
with_antennas=False,
)