chore(robots): deprecate stretch, viperx and widowx robots (#2205)

Author: Steven Palma
Date: 2025-10-17 15:36:19 +02:00
Committed by: GitHub
Parent: 44bf283701
Commit: 2c18210d02
17 changed files with 0 additions and 1293 deletions

View File

@@ -114,11 +114,6 @@ intelrealsense = [
"pyrealsense2-macosx>=2.54,<2.55.0 ; sys_platform == 'darwin'",
]
phone = ["hebi-py>=2.8.0,<2.12.0", "teleop>=0.1.0,<0.2.0"]
# stretch = [
# "hello-robot-stretch-body>=0.7.27 ; sys_platform == 'linux'",
# "pyrender @ git+https://github.com/mmatl/pyrender.git ; sys_platform == 'linux'",
# "pyrealsense2>=2.55.1.6486 ; sys_platform != 'darwin'"
# ] # TODO: Currently not supported
# Policies
pi = ["transformers @ git+https://github.com/huggingface/transformers.git@fix/lerobot_openpi"]

View File

@@ -1,173 +0,0 @@
This tutorial explains how to use [Stretch 3](https://hello-robot.com/stretch-3-product) with LeRobot.
## Setup
Familiarize yourself with Stretch by following its [tutorials](https://docs.hello-robot.com/0.3/getting_started/hello_robot/) (recommended).
To use LeRobot on Stretch, 3 options are available:
- [tethered setup](https://docs.hello-robot.com/0.3/getting_started/connecting_to_stretch/#tethered-setup)
- [untethered setup](https://docs.hello-robot.com/0.3/getting_started/connecting_to_stretch/#untethered-setup)
- ssh directly into Stretch (you will first need to install and configure openssh-server on Stretch using one of the two setups above; see the sketch below)
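
For the ssh option, here is a minimal sketch of the setup (assuming Stretch's stock Ubuntu install; the user and hostname placeholders below are hypothetical and should be replaced with your own):
```bash
# On Stretch, via a tethered or untethered session:
sudo apt update && sudo apt install openssh-server

# Then, from your own machine (replace the placeholders with your robot's user and hostname/IP):
ssh <user>@<stretch-ip-or-hostname>
```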
## Install LeRobot
On Stretch's CLI, follow these steps:
1. [Install Miniconda](https://docs.anaconda.com/miniconda/#quick-command-line-install):
```bash
mkdir -p ~/miniconda3
wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh -O ~/miniconda3/miniconda.sh
bash ~/miniconda3/miniconda.sh -b -u -p ~/miniconda3
rm ~/miniconda3/miniconda.sh
~/miniconda3/bin/conda init bash
```
2. Comment out these lines in `~/.profile` (they can interfere with the paths used by conda, and `~/.local/bin` should already be in your `PATH` anyway); the expected result is sketched right after this snippet:
```
# set PATH so it includes user's private bin if it exists
if [ -d "$HOME/.local/bin" ] ; then
PATH="$HOME/.local/bin:$PATH"
fi
```
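After commenting, that section of `~/.profile` would look roughly like this (a sketch; the rest of the file stays unchanged):
```
# set PATH so it includes user's private bin if it exists
# if [ -d "$HOME/.local/bin" ] ; then
#     PATH="$HOME/.local/bin:$PATH"
# fi
```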
3. Restart shell or `source ~/.bashrc`
4. Create and activate a fresh conda environment for lerobot
```bash
conda create -y -n lerobot python=3.10 && conda activate lerobot
```
5. Clone LeRobot:
```bash
git clone https://github.com/huggingface/lerobot.git ~/lerobot
```
6. When using `miniconda`, install `ffmpeg` in your environment:
```bash
conda install ffmpeg -c conda-forge
```
7. Install LeRobot with stretch dependencies:
```bash
cd ~/lerobot && pip install -e ".[stretch]"
```
> **Note:** If you get this message, you can ignore it: `ERROR: pip's dependency resolver does not currently take into account all the packages that are installed.`
8. Run a [system check](https://docs.hello-robot.com/0.3/getting_started/stretch_hardware_overview/#system-check) to make sure your robot is ready:
```bash
stretch_system_check.py
```
> **Note:** You may need to free the "robot process" after booting Stretch by running `stretch_free_robot_process.py`, as sketched below. For more info, see Stretch's [doc](https://docs.hello-robot.com/0.3/getting_started/stretch_hardware_overview/#turning-off-gamepad-teleoperation).
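For example, if the check reports that another process is holding the robot, you could free it and re-run the check (a sketch using only the commands named above):
```bash
stretch_free_robot_process.py   # release the robot process held by another session
stretch_system_check.py         # run the system check again
```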
You should get something like this:
```bash
For use with S T R E T C H (R) from Hello Robot Inc.
---------------------------------------------------------------------
Model = Stretch 3
Tool = DexWrist 3 w/ Gripper
Serial Number = stretch-se3-3054
---- Checking Hardware ----
[Pass] Comms are ready
[Pass] Actuators are ready
[Warn] Sensors not ready (IMU AZ = -10.19 out of range -10.1 to -9.5)
[Pass] Battery voltage is 13.6 V
---- Checking Software ----
[Pass] Ubuntu 22.04 is ready
[Pass] All APT pkgs are setup correctly
[Pass] Firmware is up-to-date
[Pass] Python pkgs are up-to-date
[Pass] ROS2 Humble is ready
```
## Teleoperate, record a dataset and run a policy
**Calibrate (Optional)**
Before operating Stretch, you first need to [home](https://docs.hello-robot.com/0.3/getting_started/stretch_hardware_overview/#homing) it. Make sure to give Stretch some space, as this procedure will move the robot's arm and gripper. Then run this command:
```bash
python lerobot/scripts/control_robot.py \
--robot.type=stretch \
--control.type=calibrate
```
This is equivalent to running `stretch_robot_home.py`
> **Note:** If you run any of the LeRobot scripts below and Stretch is not properly homed, it will automatically home/calibrate first.
**Teleoperate**
Before trying teleoperation, you need to activate the gamepad controller by pressing the middle button. For more info, see Stretch's [doc](https://docs.hello-robot.com/0.3/getting_started/hello_robot/#gamepad-teleoperation).
Now try out teleoperation (see above documentation to learn about the gamepad controls):
> **NOTE:** To visualize the data, enable `--control.display_data=true`. This streams the data using `rerun`.
```bash
python lerobot/scripts/control_robot.py \
--robot.type=stretch \
--control.type=teleoperate
```
This is essentially the same as running `stretch_gamepad_teleop.py`
**Record a dataset**
Once you're familiar with the gamepad controls and after a bit of practice, you can try to record your first dataset with Stretch.
If you want to use the Hugging Face hub features for uploading your dataset and you haven't previously done it, make sure you've logged in using a write-access token, which can be generated from the [Hugging Face settings](https://huggingface.co/settings/tokens):
```bash
huggingface-cli login --token ${HUGGINGFACE_TOKEN} --add-to-git-credential
```
Store your Hugging Face repository name in a variable to run these commands:
```bash
HF_USER=$(huggingface-cli whoami | head -n 1)
echo $HF_USER
```
Record one episode:
```bash
python lerobot/scripts/control_robot.py \
--robot.type=stretch \
--control.type=record \
--control.fps=30 \
--control.single_task="Grasp a lego block and put it in the bin." \
--control.repo_id=${HF_USER}/stretch_test \
--control.tags='["tutorial"]' \
--control.warmup_time_s=5 \
--control.episode_time_s=30 \
--control.reset_time_s=30 \
--control.num_episodes=2 \
--control.push_to_hub=true
```
> **Note:** If you're using ssh to connect to Stretch and run this script, you won't be able to visualize its camera feeds (though they will still be recorded). To see the camera streams, use the [tethered](https://docs.hello-robot.com/0.3/getting_started/connecting_to_stretch/#tethered-setup) or [untethered setup](https://docs.hello-robot.com/0.3/getting_started/connecting_to_stretch/#untethered-setup).
**Replay an episode**
Now try to replay this episode (make sure the robot's initial position is the same):
```bash
python lerobot/scripts/control_robot.py \
--robot.type=stretch \
--control.type=replay \
--control.fps=30 \
--control.repo_id=${HF_USER}/stretch_test \
--control.episode=0
```
If you need help, please reach out on Discord in the channel `#stretch3-mobile-arm`.

View File

@@ -1,18 +0,0 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from .configuration_stretch3 import Stretch3RobotConfig
from .robot_stretch3 import Stretch3Robot

View File

@@ -1,51 +0,0 @@
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from dataclasses import dataclass, field
from lerobot.cameras import CameraConfig
from lerobot.cameras.opencv import OpenCVCameraConfig
from lerobot.cameras.realsense import RealSenseCameraConfig
from ..config import RobotConfig
@RobotConfig.register_subclass("stretch3")
@dataclass
class Stretch3RobotConfig(RobotConfig):
# cameras
cameras: dict[str, CameraConfig] = field(
default_factory=lambda: {
"navigation": OpenCVCameraConfig(
index_or_path="/dev/hello-nav-head-camera",
fps=10,
width=1280,
height=720,
rotation=-90,
),
"head": RealSenseCameraConfig(
name="Intel RealSense D435I",
fps=30,
width=640,
height=480,
rotation=90,
),
"wrist": RealSenseCameraConfig(
name="Intel RealSense D405",
fps=30,
width=640,
height=480,
),
}
)

View File

@@ -1,180 +0,0 @@
#!/usr/bin/env python
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import time
import numpy as np
from stretch_body.gamepad_teleop import GamePadTeleop
from stretch_body.robot import Robot as StretchAPI
from stretch_body.robot_params import RobotParams
from lerobot.cameras.utils import make_cameras_from_configs
from lerobot.datasets.utils import get_nested_item
from lerobot.utils.constants import OBS_IMAGES, OBS_STATE
from ..robot import Robot
from .configuration_stretch3 import Stretch3RobotConfig
# {lerobot_keys: stretch.api.keys}
STRETCH_MOTORS = {
"head_pan.pos": "head.head_pan.pos",
"head_tilt.pos": "head.head_tilt.pos",
"lift.pos": "lift.pos",
"arm.pos": "arm.pos",
"wrist_pitch.pos": "end_of_arm.wrist_pitch.pos",
"wrist_roll.pos": "end_of_arm.wrist_roll.pos",
"wrist_yaw.pos": "end_of_arm.wrist_yaw.pos",
"gripper.pos": "end_of_arm.stretch_gripper.pos",
"base_x.vel": "base.x_vel",
"base_y.vel": "base.y_vel",
"base_theta.vel": "base.theta_vel",
}
class Stretch3Robot(Robot):
"""[Stretch 3](https://hello-robot.com/stretch-3-product), by Hello Robot."""
config_class = Stretch3RobotConfig
name = "stretch3"
def __init__(self, config: Stretch3RobotConfig):
raise NotImplementedError
super().__init__(config)
self.config = config
self.robot_type = self.config.type
self.api = StretchAPI()
self.cameras = make_cameras_from_configs(config.cameras)
self.is_connected = False
self.logs = {}
self.teleop = None # TODO remove
# TODO(aliberts): test this
RobotParams.set_logging_level("WARNING")
RobotParams.set_logging_formatter("brief_console_formatter")
self.state_keys = None
self.action_keys = None
@property
def observation_features(self) -> dict:
return {
"dtype": "float32",
"shape": (len(STRETCH_MOTORS),),
"names": {"motors": list(STRETCH_MOTORS)},
}
@property
def action_features(self) -> dict:
return self.observation_features
@property
def camera_features(self) -> dict[str, dict]:
cam_ft = {}
for cam_key, cam in self.cameras.items():
cam_ft[cam_key] = {
"shape": (cam.height, cam.width, cam.channels),
"names": ["height", "width", "channels"],
"info": None,
}
return cam_ft
def connect(self) -> None:
self.is_connected = self.api.startup()
if not self.is_connected:
print("Another process is already using Stretch. Try running 'stretch_free_robot_process.py'")
raise ConnectionError()
for cam in self.cameras.values():
cam.connect()
self.is_connected = self.is_connected and cam.is_connected
if not self.is_connected:
print("Could not connect to the cameras, check that all cameras are plugged-in.")
raise ConnectionError()
self.calibrate()
def calibrate(self) -> None:
if not self.api.is_homed():
self.api.home()
def _get_state(self) -> dict:
status = self.api.get_status()
return {k: get_nested_item(status, v, sep=".") for k, v in STRETCH_MOTORS.items()}
def get_observation(self) -> dict[str, np.ndarray]:
obs_dict = {}
# Read Stretch state
before_read_t = time.perf_counter()
state = self._get_state()
self.logs["read_pos_dt_s"] = time.perf_counter() - before_read_t
if self.state_keys is None:
self.state_keys = list(state)
state = np.asarray(list(state.values()))
obs_dict[OBS_STATE] = state
# Capture images from cameras
for cam_key, cam in self.cameras.items():
before_camread_t = time.perf_counter()
obs_dict[f"{OBS_IMAGES}.{cam_key}"] = cam.async_read()
self.logs[f"read_camera_{cam_key}_dt_s"] = cam.logs["delta_timestamp_s"]
self.logs[f"async_read_camera_{cam_key}_dt_s"] = time.perf_counter() - before_camread_t
return obs_dict
def send_action(self, action: np.ndarray) -> np.ndarray:
if not self.is_connected:
raise ConnectionError()
if self.teleop is None:
self.teleop = GamePadTeleop(robot_instance=False)
self.teleop.startup(robot=self)
if self.action_keys is None:
dummy_action = self.teleop.gamepad_controller.get_state()
self.action_keys = list(dummy_action.keys())
action_dict = dict(zip(self.action_keys, action.tolist(), strict=True))
before_write_t = time.perf_counter()
self.teleop.do_motion(state=action_dict, robot=self)
self.push_command()
self.logs["write_pos_dt_s"] = time.perf_counter() - before_write_t
# TODO(aliberts): return action_sent when motion is limited
return action
def teleop_safety_stop(self) -> None:
if self.teleop is not None:
self.teleop._safety_stop(robot=self)
def disconnect(self) -> None:
self.api.stop()
if self.teleop is not None:
self.teleop.gamepad_controller.stop()
self.teleop.stop()
for cam in self.cameras.values():
cam.disconnect()
self.is_connected = False

View File

@@ -40,14 +40,6 @@ def make_robot_from_config(config: RobotConfig) -> Robot:
from .lekiwi import LeKiwi
return LeKiwi(config)
elif config.type == "stretch3":
from .stretch3 import Stretch3Robot
return Stretch3Robot(config)
elif config.type == "viperx":
from .viperx import ViperX
return ViperX(config)
elif config.type == "hope_jr_hand":
from .hope_jr import HopeJrHand

View File

@@ -1,196 +0,0 @@
This tutorial explains how to use [Aloha and Aloha 2 stationary](https://www.trossenrobotics.com/aloha-stationary) with LeRobot.
## Setup
Follow the [documentation from Trossen Robotics](https://docs.trossenrobotics.com/aloha_docs/2.0/getting_started/stationary/hardware_setup.html) for setting up the hardware and plugging the 4 arms and 4 cameras into your computer.
## Install LeRobot
On your computer:
1. [Install Miniconda](https://docs.anaconda.com/miniconda/#quick-command-line-install):
```bash
mkdir -p ~/miniconda3
wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh -O ~/miniconda3/miniconda.sh
bash ~/miniconda3/miniconda.sh -b -u -p ~/miniconda3
rm ~/miniconda3/miniconda.sh
~/miniconda3/bin/conda init bash
```
2. Restart shell or `source ~/.bashrc`
3. Create and activate a fresh conda environment for lerobot
```bash
conda create -y -n lerobot python=3.10 && conda activate lerobot
```
4. Clone LeRobot:
```bash
git clone https://github.com/huggingface/lerobot.git ~/lerobot
```
5. When using `miniconda`, install `ffmpeg` in your environment:
```bash
conda install ffmpeg -c conda-forge
```
6. Install LeRobot with dependencies for the Aloha motors (dynamixel) and cameras (intelrealsense):
```bash
cd ~/lerobot && pip install -e ".[dynamixel, intelrealsense]"
```
## Teleoperate
/!\ **FOR SAFETY, READ THIS** /!\
Teleoperation consists of manually operating the leader arms to move the follower arms. Importantly:
1. Make sure your leader arms are in the same position as the follower arms, so that the follower arms don't move too fast trying to match the leader arms.
2. Our code assumes that your robot has been assembled following Trossen Robotics' instructions. This allows us to skip calibration, as we use the pre-defined calibration files in `.cache/calibration/aloha_default` (see the sketch after this list). If you replace a motor, make sure you follow the exact instructions from Trossen Robotics.
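If you want to double-check which calibration files will be picked up, you can list the default calibration directory (a sketch; adjust the path if your cache lives elsewhere):
```bash
ls .cache/calibration/aloha_default
```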
By running the following code, you can start your first **SAFE** teleoperation:
> **NOTE:** To visualize the data, enable `--control.display_data=true`. This streams the data using `rerun`.
```bash
python lerobot/scripts/control_robot.py \
--robot.type=aloha \
--robot.max_relative_target=5 \
--control.type=teleoperate
```
By adding `--robot.max_relative_target=5`, we explicitly set the `max_relative_target` value defined in [`ViperXConfig`](./config_viperx.py). Capping it at `5` limits the magnitude of each movement for more safety, but it makes the teleoperation less smooth. When you feel confident, you can disable this limit by adding `--robot.max_relative_target=null` to the command line:
```bash
python lerobot/scripts/control_robot.py \
--robot.type=aloha \
--robot.max_relative_target=null \
--control.type=teleoperate
```
## Record a dataset
Once you're familiar with teleoperation, you can record your first dataset with Aloha.
If you want to use the Hugging Face hub features for uploading your dataset and you haven't previously done it, make sure you've logged in using a write-access token, which can be generated from the [Hugging Face settings](https://huggingface.co/settings/tokens):
```bash
huggingface-cli login --token ${HUGGINGFACE_TOKEN} --add-to-git-credential
```
Store your Hugging Face repository name in a variable to run these commands:
```bash
HF_USER=$(huggingface-cli whoami | head -n 1)
echo $HF_USER
```
Record 2 episodes and upload your dataset to the hub:
```bash
python lerobot/scripts/control_robot.py \
--robot.type=aloha \
--robot.max_relative_target=null \
--control.type=record \
--control.fps=30 \
--control.single_task="Grasp a lego block and put it in the bin." \
--control.repo_id=${HF_USER}/aloha_test \
--control.tags='["tutorial"]' \
--control.warmup_time_s=5 \
--control.episode_time_s=30 \
--control.reset_time_s=30 \
--control.num_episodes=2 \
--control.push_to_hub=true
```
## Visualize a dataset
If you uploaded your dataset to the hub with `--control.push_to_hub=true`, you can [visualize your dataset online](https://huggingface.co/spaces/lerobot/visualize_dataset) by copy pasting your repo id given by:
```bash
echo ${HF_USER}/aloha_test
```
If you didn't upload your dataset (i.e. you used `--control.push_to_hub=false`), you can also visualize it locally with [Rerun](https://github.com/rerun-io/rerun):
```bash
lerobot-dataset-viz \
--repo-id ${HF_USER}/aloha_test --episode 0
```
## Replay an episode
/!\ **FOR SAFETY, READ THIS** /!\
Replay consists of automatically replaying the sequence of actions (i.e. goal positions for your motors) recorded in a given dataset episode. Make sure the current initial position of your robot is similar to the one in your episode, so that your follower arms don't move too fast to reach the first goal positions. For safety, you might want to add `--robot.max_relative_target=5` to your command line, as explained above.
Now try to replay the first episode on your robot:
```bash
python lerobot/scripts/control_robot.py \
--robot.type=aloha \
--robot.max_relative_target=null \
--control.type=replay \
--control.fps=30 \
--control.repo_id=${HF_USER}/aloha_test \
--control.episode=0
```
## Train a policy
To train a policy to control your robot, use the [`lerobot-train`](../src/lerobot/scripts/train.py) script. A few arguments are required. Here is an example command:
```bash
lerobot-train \
--dataset.repo_id=${HF_USER}/aloha_test \
--policy.type=act \
--output_dir=outputs/train/act_aloha_test \
--job_name=act_aloha_test \
--policy.device=cuda \
--wandb.enable=true
```
Let's explain it:
1. We provided the dataset as argument with `--dataset.repo_id=${HF_USER}/aloha_test`.
2. We provided the policy with `policy.type=act`. This loads configurations from [`configuration_act.py`](../src/lerobot/policies/act/configuration_act.py). Importantly, this policy automatically adapts to the number of motor states, motor actions, and cameras of your robot that have been saved in your dataset.
3. We provided `policy.device=cuda` since we are training on an Nvidia GPU, but you could use `policy.device=mps` to train on Apple silicon.
4. We provided `wandb.enable=true` to use [Weights and Biases](https://docs.wandb.ai/quickstart) for visualizing training plots. This is optional but if you use it, make sure you are logged in by running `wandb login`.
For more information on the `train` script see the previous tutorial: [`examples/4_train_policy_with_script.md`](../examples/4_train_policy_with_script.md)
Training should take several hours. You will find checkpoints in `outputs/train/act_aloha_test/checkpoints`.
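For instance, you could peek at the checkpoints produced so far (a sketch; the exact sub-folders depend on your run):
```bash
ls outputs/train/act_aloha_test/checkpoints
# The most recent checkpoint sits under .../last/pretrained_model,
# which is the path used for evaluation below.
```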
## Evaluate your policy
You can use the `record` function from [`lerobot/scripts/control_robot.py`](../src/lerobot/scripts/control_robot.py) but with a policy checkpoint as input. For instance, run this command to record 10 evaluation episodes:
```bash
python lerobot/scripts/control_robot.py \
--robot.type=aloha \
--control.type=record \
--control.fps=30 \
--control.single_task="Grasp a lego block and put it in the bin." \
--control.repo_id=${HF_USER}/eval_act_aloha_test \
--control.tags='["tutorial"]' \
--control.warmup_time_s=5 \
--control.episode_time_s=30 \
--control.reset_time_s=30 \
--control.num_episodes=10 \
--control.push_to_hub=true \
--control.policy.path=outputs/train/act_aloha_test/checkpoints/last/pretrained_model \
--control.num_image_writer_processes=1
```
As you can see, it's almost the same command as the one previously used to record your training dataset. A few things changed:
1. There is an additional `--control.policy.path` argument which indicates the path to your policy checkpoint (e.g. `outputs/train/act_aloha_test/checkpoints/last/pretrained_model`). You can also use the model repository if you uploaded a model checkpoint to the hub (e.g. `${HF_USER}/act_aloha_test`).
2. The name of the dataset begins with `eval` to reflect that you are running inference (e.g. `${HF_USER}/eval_act_aloha_test`); you can visualize it afterwards just like the training dataset, as shown below.
3. We use `--control.num_image_writer_processes=1` instead of the default value (`0`). On our computer, using a dedicated process to write images from the 4 cameras to disk allows us to reach a constant 30 fps during inference. Feel free to explore different values for `--control.num_image_writer_processes`.
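You can then visualize the evaluation episodes the same way as the training dataset, e.g. locally with Rerun (a sketch reusing the viewer command shown earlier):
```bash
lerobot-dataset-viz \
    --repo-id ${HF_USER}/eval_act_aloha_test --episode 0
```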
## More
If you have any questions or need help, please reach out on Discord in the channel `#aloha-arm`.

View File

@@ -1,18 +0,0 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from .config_viperx import ViperXConfig
from .viperx import ViperX

View File

@@ -1,45 +0,0 @@
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from dataclasses import dataclass, field
from lerobot.cameras import CameraConfig
from ..config import RobotConfig
@RobotConfig.register_subclass("viperx")
@dataclass
class ViperXConfig(RobotConfig):
port: str # Port to connect to the arm
disable_torque_on_disconnect: bool = True
# /!\ FOR SAFETY, READ THIS /!\
# `max_relative_target` limits the magnitude of the relative positional target vector for safety purposes.
# Set this to a positive scalar to have the same value for all motors, or a dictionary that maps motor
# names to the max_relative_target value for that motor.
# For Aloha, for every goal position request, motor rotations are capped at 5 degrees by default.
# When you feel more confident with teleoperation or running the policy, you can raise
# this safety limit or even remove it by setting it to `null`.
# Also, everything is expected to work safely out-of-the-box, but we highly advise
# first teleoperating the grippers only (by commenting out the rest of the motors in this config),
# then gradually adding more motors (by uncommenting) until you can teleoperate both arms fully.
max_relative_target: float | dict[str, float] = 5.0
# cameras
cameras: dict[str, CameraConfig] = field(default_factory=dict)
# Troubleshooting: If one of your IntelRealSense cameras freezes during
# data recording due to bandwidth limits, you might need to plug the camera
# into another USB hub or PCIe card.

View File

@@ -1,233 +0,0 @@
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import time
from functools import cached_property
from typing import Any
from lerobot.cameras.utils import make_cameras_from_configs
from lerobot.motors import Motor, MotorCalibration, MotorNormMode
from lerobot.motors.dynamixel import (
DynamixelMotorsBus,
OperatingMode,
)
from lerobot.utils.constants import OBS_STATE
from lerobot.utils.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
from ..robot import Robot
from ..utils import ensure_safe_goal_position
from .config_viperx import ViperXConfig
logger = logging.getLogger(__name__)
class ViperX(Robot):
"""
[ViperX](https://www.trossenrobotics.com/viperx-300) developed by Trossen Robotics
"""
config_class = ViperXConfig
name = "viperx"
def __init__(
self,
config: ViperXConfig,
):
raise NotImplementedError
super().__init__(config)
self.config = config
self.bus = DynamixelMotorsBus(
port=self.config.port,
motors={
"waist": Motor(1, "xm540-w270", MotorNormMode.RANGE_M100_100),
"shoulder": Motor(2, "xm540-w270", MotorNormMode.RANGE_M100_100),
"shoulder_shadow": Motor(3, "xm540-w270", MotorNormMode.RANGE_M100_100),
"elbow": Motor(4, "xm540-w270", MotorNormMode.RANGE_M100_100),
"elbow_shadow": Motor(5, "xm540-w270", MotorNormMode.RANGE_M100_100),
"forearm_roll": Motor(6, "xm540-w270", MotorNormMode.RANGE_M100_100),
"wrist_angle": Motor(7, "xm540-w270", MotorNormMode.RANGE_M100_100),
"wrist_rotate": Motor(8, "xm430-w350", MotorNormMode.RANGE_M100_100),
"gripper": Motor(9, "xm430-w350", MotorNormMode.RANGE_0_100),
},
)
self.cameras = make_cameras_from_configs(config.cameras)
@property
def _motors_ft(self) -> dict[str, type]:
return {f"{motor}.pos": float for motor in self.bus.motors}
@property
def _cameras_ft(self) -> dict[str, tuple]:
return {
cam: (self.config.cameras[cam].height, self.config.cameras[cam].width, 3) for cam in self.cameras
}
@cached_property
def observation_features(self) -> dict[str, type | tuple]:
return {**self._motors_ft, **self._cameras_ft}
@cached_property
def action_features(self) -> dict[str, type]:
return self._motors_ft
@property
def is_connected(self) -> bool:
return self.bus.is_connected and all(cam.is_connected for cam in self.cameras.values())
def connect(self, calibrate: bool = True) -> None:
"""
We assume that at connection time, the arm is in a rest position,
and torque can be safely disabled to run calibration.
"""
if self.is_connected:
raise DeviceAlreadyConnectedError(f"{self} already connected")
self.bus.connect()
if not self.is_calibrated and calibrate:
self.calibrate()
for cam in self.cameras.values():
cam.connect()
self.configure()
logger.info(f"{self} connected.")
@property
def is_calibrated(self) -> bool:
return self.bus.is_calibrated
def calibrate(self) -> None:
raise NotImplementedError # TODO(aliberts): adapt code below (copied from koch)
logger.info(f"\nRunning calibration of {self}")
self.bus.disable_torque()
for motor in self.bus.motors:
self.bus.write("Operating_Mode", motor, OperatingMode.EXTENDED_POSITION.value)
input("Move robot to the middle of its range of motion and press ENTER....")
homing_offsets = self.bus.set_half_turn_homings()
full_turn_motors = ["shoulder_pan", "wrist_roll"]
unknown_range_motors = [motor for motor in self.bus.motors if motor not in full_turn_motors]
print(
f"Move all joints except {full_turn_motors} sequentially through their entire "
"ranges of motion.\nRecording positions. Press ENTER to stop..."
)
range_mins, range_maxes = self.bus.record_ranges_of_motion(unknown_range_motors)
for motor in full_turn_motors:
range_mins[motor] = 0
range_maxes[motor] = 4095
self.calibration = {}
for motor, m in self.bus.motors.items():
self.calibration[motor] = MotorCalibration(
id=m.id,
drive_mode=0,
homing_offset=homing_offsets[motor],
range_min=range_mins[motor],
range_max=range_maxes[motor],
)
self.bus.write_calibration(self.calibration)
self._save_calibration()
logger.info(f"Calibration saved to {self.calibration_fpath}")
def configure(self) -> None:
with self.bus.torque_disabled():
self.bus.configure_motors()
# Set secondary/shadow ID for shoulder and elbow. These joints have two motors.
# As a result, if only one of them is required to move to a certain position,
# the other will follow. This is to avoid breaking the motors.
self.bus.write("Secondary_ID", "shoulder_shadow", 2)
self.bus.write("Secondary_ID", "elbow_shadow", 4)
# Set a velocity limit of 131 as advised by Trossen Robotics
# TODO(aliberts): remove as it's actually useless in position control
self.bus.write("Velocity_Limit", 131)
# Use 'extended position mode' for all motors except the gripper, because in joint mode the servos
# can't rotate more than 360 degrees (from 0 to 4095). Since some mistakes can happen while assembling
# the arm, you could end up with a servo at position 0 or 4095 at a crucial point.
# See: https://emanual.robotis.com/docs/en/dxl/x/x_series/#operating-mode11
for motor in self.bus.motors:
if motor != "gripper":
self.bus.write("Operating_Mode", motor, OperatingMode.EXTENDED_POSITION.value)
# Use 'current-based position control' for the follower gripper so that it is limited by its current
# limit. It can grasp an object without forcing too much, even though its goal position is a
# complete grasp (both gripper fingers are commanded to close until they touch).
self.bus.write("Operating_Mode", "gripper", OperatingMode.CURRENT_POSITION.value)
def get_observation(self) -> dict[str, Any]:
"""The returned observations do not have a batch dimension."""
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
obs_dict = {}
# Read arm position
start = time.perf_counter()
obs_dict[OBS_STATE] = self.bus.sync_read("Present_Position")
obs_dict = {f"{motor}.pos": val for motor, val in obs_dict.items()}
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read state: {dt_ms:.1f}ms")
# Capture images from cameras
for cam_key, cam in self.cameras.items():
start = time.perf_counter()
obs_dict[cam_key] = cam.async_read()
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms")
return obs_dict
def send_action(self, action: dict[str, float]) -> dict[str, float]:
"""Command arm to move to a target joint configuration.
The relative action magnitude may be clipped depending on the configuration parameter
`max_relative_target`. In this case, the action sent differs from the original action.
Thus, this function always returns the action actually sent.
Args:
action (dict[str, float]): The goal positions for the motors.
Returns:
dict[str, float]: The action sent to the motors, potentially clipped.
"""
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
goal_pos = {key.removesuffix(".pos"): val for key, val in action.items() if key.endswith(".pos")}
# Cap goal position when too far away from present position.
# /!\ Slower fps expected due to reading from the follower.
if self.config.max_relative_target is not None:
present_pos = self.bus.sync_read("Present_Position")
goal_present_pos = {key: (g_pos, present_pos[key]) for key, g_pos in goal_pos.items()}
goal_pos = ensure_safe_goal_position(goal_present_pos, self.config.max_relative_target)
# Send goal position to the arm
self.bus.sync_write("Goal_Position", goal_pos)
return {f"{motor}.pos": val for motor, val in goal_pos.items()}
def disconnect(self):
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
self.bus.disconnect(self.config.disable_torque_on_disconnect)
for cam in self.cameras.values():
cam.disconnect()
logger.info(f"{self} disconnected.")

View File

@@ -1,18 +0,0 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from .configuration_stretch3 import Stretch3GamePadConfig
from .stretch3_gamepad import Stretch3GamePad

View File

@@ -1,25 +0,0 @@
#!/usr/bin/env python
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from dataclasses import dataclass
from ..config import TeleoperatorConfig
@TeleoperatorConfig.register_subclass("stretch3")
@dataclass
class Stretch3GamePadConfig(TeleoperatorConfig):
"""Stretch3GamePadConfig"""

View File

@@ -1,117 +0,0 @@
#!/usr/bin/env python
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import time
import numpy as np
from stretch_body.gamepad_teleop import GamePadTeleop
from stretch_body.robot_params import RobotParams
from lerobot.utils.errors import DeviceAlreadyConnectedError
from ..teleoperator import Teleoperator
from .configuration_stretch3 import Stretch3GamePadConfig
# from stretch_body.gamepad_controller.GamePadController
GAMEPAD_BUTTONS = [
"middle_led_ring_button_pressed",
"left_stick_x",
"left_stick_y",
"right_stick_x",
"right_stick_y",
"left_stick_button_pressed",
"right_stick_button_pressed",
"bottom_button_pressed",
"top_button_pressed",
"left_button_pressed",
"right_button_pressed",
"left_shoulder_button_pressed",
"right_shoulder_button_pressed",
"select_button_pressed",
"start_button_pressed",
"left_trigger_pulled",
"right_trigger_pulled",
"bottom_pad_pressed",
"top_pad_pressed",
"left_pad_pressed",
"right_pad_pressed",
]
class Stretch3GamePad(Teleoperator):
"""[Stretch 3](https://hello-robot.com/stretch-3-product), by Hello Robot."""
config_class = Stretch3GamePadConfig
name = "stretch3"
def __init__(self, config: Stretch3GamePadConfig):
raise NotImplementedError
super().__init__(config)
self.config = config
self.robot_type = self.config.type
self.api = GamePadTeleop(robot_instance=False)
self.is_connected = False
self.logs = {}
# TODO(aliberts): test this
RobotParams.set_logging_level("WARNING")
RobotParams.set_logging_formatter("brief_console_formatter")
@property
def action_features(self) -> dict:
return {
"dtype": "float32",
"shape": (len(GAMEPAD_BUTTONS),),
"names": {"buttons": GAMEPAD_BUTTONS},
}
@property
def feedback_features(self) -> dict:
return {}
def connect(self) -> None:
if self.is_connected:
raise DeviceAlreadyConnectedError(
"ManipulatorRobot is already connected. Do not run `robot.connect()` twice."
)
self.api.startup()
self.api._update_state() # Check controller can be read & written
self.api._update_modes()
self.is_connected = True
def calibrate(self) -> None:
pass
def get_action(self) -> np.ndarray:
# Read Stretch state
before_read_t = time.perf_counter()
action = self.api.gamepad_controller.get_state()
self.logs["read_pos_dt_s"] = time.perf_counter() - before_read_t
action = np.asarray(list(action.values()))
return action
def send_feedback(self, feedback: np.ndarray) -> None:
pass
def disconnect(self) -> None:
self.api.stop()
self.is_connected = False

View File

@@ -49,14 +49,6 @@ def make_teleoperator_from_config(config: TeleoperatorConfig) -> Teleoperator:
from .so101_leader import SO101Leader
return SO101Leader(config)
elif config.type == "stretch3":
from .stretch3_gamepad import Stretch3GamePad
return Stretch3GamePad(config)
elif config.type == "widowx":
from .widowx import WidowX
return WidowX(config)
elif config.type == "mock_teleop":
from tests.mocks.mock_teleop import MockTeleop

View File

@@ -1,18 +0,0 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from .config_widowx import WidowXConfig
from .widowx import WidowX

View File

@@ -1,25 +0,0 @@
#!/usr/bin/env python
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from dataclasses import dataclass
from ..config import TeleoperatorConfig
@TeleoperatorConfig.register_subclass("widowx")
@dataclass
class WidowXConfig(TeleoperatorConfig):
port: str # Port to connect to the arm

View File

@@ -1,155 +0,0 @@
#!/usr/bin/env python
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import time
from lerobot.motors import Motor, MotorCalibration, MotorNormMode
from lerobot.motors.dynamixel import (
DriveMode,
DynamixelMotorsBus,
OperatingMode,
)
from lerobot.utils.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
from ..teleoperator import Teleoperator
from .config_widowx import WidowXConfig
logger = logging.getLogger(__name__)
class WidowX(Teleoperator):
"""
[WidowX](https://www.trossenrobotics.com/widowx-250) developed by Trossen Robotics
"""
config_class = WidowXConfig
name = "widowx"
def __init__(self, config: WidowXConfig):
raise NotImplementedError
super().__init__(config)
self.config = config
self.bus = DynamixelMotorsBus(
port=self.config.port,
motors={
"waist": Motor(1, "xm430-w350", MotorNormMode.RANGE_M100_100),
"shoulder": Motor(2, "xm430-w350", MotorNormMode.RANGE_M100_100),
"shoulder_shadow": Motor(3, "xm430-w350", MotorNormMode.RANGE_M100_100),
"elbow": Motor(4, "xm430-w350", MotorNormMode.RANGE_M100_100),
"elbow_shadow": Motor(5, "xm430-w350", MotorNormMode.RANGE_M100_100),
"forearm_roll": Motor(6, "xm430-w350", MotorNormMode.RANGE_M100_100),
"wrist_angle": Motor(7, "xm430-w350", MotorNormMode.RANGE_M100_100),
"wrist_rotate": Motor(8, "xl430-w250", MotorNormMode.RANGE_M100_100),
"gripper": Motor(9, "xc430-w150", MotorNormMode.RANGE_0_100),
},
)
@property
def action_features(self) -> dict[str, type]:
return {f"{motor}.pos": float for motor in self.bus.motors}
@property
def feedback_features(self) -> dict[str, type]:
return {}
@property
def is_connected(self) -> bool:
return self.bus.is_connected
def connect(self, calibrate: bool = True):
if self.is_connected:
raise DeviceAlreadyConnectedError(f"{self} already connected")
self.bus.connect()
if not self.is_calibrated and calibrate:
self.calibrate()
self.configure()
logger.info(f"{self} connected.")
@property
def is_calibrated(self) -> bool:
return self.bus.is_calibrated
def calibrate(self) -> None:
raise NotImplementedError # TODO(aliberts): adapt code below (copied from koch)
logger.info(f"\nRunning calibration of {self}")
self.bus.disable_torque()
for motor in self.bus.motors:
self.bus.write("Operating_Mode", motor, OperatingMode.EXTENDED_POSITION.value)
self.bus.write("Drive_Mode", "elbow_flex", DriveMode.INVERTED.value)
drive_modes = {motor: 1 if motor == "elbow_flex" else 0 for motor in self.bus.motors}
input("Move robot to the middle of its range of motion and press ENTER....")
homing_offsets = self.bus.set_half_turn_homings()
full_turn_motors = ["shoulder_pan", "wrist_roll"]
unknown_range_motors = [motor for motor in self.bus.motors if motor not in full_turn_motors]
print(
f"Move all joints except {full_turn_motors} sequentially through their "
"entire ranges of motion.\nRecording positions. Press ENTER to stop..."
)
range_mins, range_maxes = self.bus.record_ranges_of_motion(unknown_range_motors)
for motor in full_turn_motors:
range_mins[motor] = 0
range_maxes[motor] = 4095
self.calibration = {}
for motor, m in self.bus.motors.items():
self.calibration[motor] = MotorCalibration(
id=m.id,
drive_mode=drive_modes[motor],
homing_offset=homing_offsets[motor],
range_min=range_mins[motor],
range_max=range_maxes[motor],
)
self.bus.write_calibration(self.calibration)
self._save_calibration()
logger.info(f"Calibration saved to {self.calibration_fpath}")
def configure(self) -> None:
self.bus.disable_torque()
self.bus.configure_motors()
# Set secondary/shadow ID for shoulder and elbow. These joints have two motors.
# As a result, if only one of them is required to move to a certain position,
# the other will follow. This is to avoid breaking the motors.
self.bus.write("Secondary_ID", "shoulder_shadow", 2)
self.bus.write("Secondary_ID", "elbow_shadow", 4)
def get_action(self) -> dict[str, float]:
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
start = time.perf_counter()
action = self.bus.sync_read("Present_Position")
action = {f"{motor}.pos": val for motor, val in action.items()}
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read action: {dt_ms:.1f}ms")
return action
def send_feedback(self, feedback: dict[str, float]) -> None:
raise NotImplementedError
def disconnect(self) -> None:
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
self.bus.disconnect()
logger.info(f"{self} disconnected.")