From 87a8cb6d8950f96e49fb9fc0b0eeef80b7fd891f Mon Sep 17 00:00:00 2001 From: Simon Alibert Date: Thu, 8 May 2025 12:43:47 +0200 Subject: [PATCH] Update robot features & naming --- lerobot/common/robots/__init__.py | 3 +- lerobot/common/robots/koch_follower/README.md | 330 ++++++++++++++++++ .../{koch => koch_follower}/__init__.py | 0 .../config_koch_follower.py | 0 .../{koch => koch_follower}/koch_follower.py | 39 +-- lerobot/common/robots/moss/__init__.py | 4 - lerobot/common/robots/moss/robot_moss.py | 223 ------------ .../robots/{moss => moss_follower}/README.md | 0 .../common/robots/moss_follower/__init__.py | 2 + .../configuration_moss.py | 0 .../robots/moss_follower/moss_follower.py | 215 ++++++++++++ lerobot/common/robots/robot.py | 8 +- .../{so100 => so100_follower}/README.md | 0 .../{so100 => so100_follower}/__init__.py | 0 .../config_so100_follower.py | 0 .../so100_follower.py | 42 +-- lerobot/common/robots/stretch3/__init__.py | 2 + .../common/robots/stretch3/robot_stretch3.py | 6 +- lerobot/common/robots/utils.py | 66 ++-- lerobot/common/robots/viperx/viperx.py | 40 +-- 20 files changed, 639 insertions(+), 341 deletions(-) create mode 100644 lerobot/common/robots/koch_follower/README.md rename lerobot/common/robots/{koch => koch_follower}/__init__.py (100%) rename lerobot/common/robots/{koch => koch_follower}/config_koch_follower.py (100%) rename lerobot/common/robots/{koch => koch_follower}/koch_follower.py (90%) delete mode 100644 lerobot/common/robots/moss/__init__.py delete mode 100644 lerobot/common/robots/moss/robot_moss.py rename lerobot/common/robots/{moss => moss_follower}/README.md (100%) create mode 100644 lerobot/common/robots/moss_follower/__init__.py rename lerobot/common/robots/{moss => moss_follower}/configuration_moss.py (100%) create mode 100644 lerobot/common/robots/moss_follower/moss_follower.py rename lerobot/common/robots/{so100 => so100_follower}/README.md (100%) rename lerobot/common/robots/{so100 => so100_follower}/__init__.py (100%) rename lerobot/common/robots/{so100 => so100_follower}/config_so100_follower.py (100%) rename lerobot/common/robots/{so100 => so100_follower}/so100_follower.py (89%) create mode 100644 lerobot/common/robots/stretch3/__init__.py diff --git a/lerobot/common/robots/__init__.py b/lerobot/common/robots/__init__.py index 58b868ed..d8fd0de9 100644 --- a/lerobot/common/robots/__init__.py +++ b/lerobot/common/robots/__init__.py @@ -1,4 +1,3 @@ from .config import RobotConfig from .robot import Robot - -__all__ = ["RobotConfig", "Robot"] +from .utils import make_robot_from_config diff --git a/lerobot/common/robots/koch_follower/README.md b/lerobot/common/robots/koch_follower/README.md new file mode 100644 index 00000000..f802106d --- /dev/null +++ b/lerobot/common/robots/koch_follower/README.md @@ -0,0 +1,330 @@ +# Using the [Koch v1.1](https://github.com/jess-moss/koch-v1-1) with LeRobot + +## Table of Contents + + - [A. Order and Assemble the parts](#a-order-and-assemble-the-parts) + - [B. Install LeRobot](#b-install-lerobot) + - [C. Configure the Motors](#c-configure-the-motors) + - [D. Calibrate](#d-calibrate) + - [E. Teleoperate](#e-teleoperate) + - [F. Record a dataset](#f-record-a-dataset) + - [G. Visualize a dataset](#g-visualize-a-dataset) + - [H. Replay an episode](#h-replay-an-episode) + - [I. Train a policy](#i-train-a-policy) + - [J. Evaluate your policy](#j-evaluate-your-policy) + - [K. More Information](#k-more-information) + +## A. Order and Assemble the parts + +Follow the sourcing and assembling instructions provided on the [Koch v1.1 Github page](https://github.com/jess-moss/koch-v1-1). This will guide you through setting up both the follower and leader arms, as shown in the image below. + +
+ Koch v1.1 leader and follower arms +
+ +For a visual walkthrough of the assembly process, you can refer to [this video tutorial](https://youtu.be/8nQIg9BwwTk). + +> [!IMPORTANT] +> Since the production of this video, we simplified the configuration phase (detailed in [section C](#c-configure-the-motors)) of the motors. +> Because of this, two things differ from the instructions in that video: +> - Don't plug all the motors cables right away and wait for being instructed to do so in [section C](#c-configure-the-motors). +> - Don't screw in the controller board (PCB) to the base right away and wait for being instructed to do so in [section C](#c-configure-the-motors). + + +## B. Install LeRobot + +> [!TIP] +> We use the Command Prompt (cmd) quite a lot. If you are not comfortable using the cmd or want to brush up using the command line you can have a look here: [Command line crash course](https://developer.mozilla.org/en-US/docs/Learn_web_development/Getting_started/Environment_setup/Command_line) + +Follow instructions on our [README](https://github.com/huggingface/lerobot) to install LeRobot. + +In addition to these instructions, you need to install the dynamixel sdk: +```bash +pip install -e ".[dynamixel]" +``` + +## C. Configure the motors + +### 1. Find the USB ports associated to each arm + +For each controller board (Waveshare Serial Bus Servo Driver Board, one for the leader arm and one for the follower), connect it first to your computer through usb. To then find the internal port its connected to -which we will need later on- run the utility script: +```bash +python -m lerobot.find_port +``` + +> [!NOTE] +> Note: On Linux, you might need to give access to the USB ports by running: +> ```bash +> sudo chmod 666 /dev/ttyACM0 +> sudo chmod 666 /dev/ttyACM1 +> ``` + +This will first display all currently available ports on your computer. As prompted by the script, unplug the controller board usb cable from your computer. The script will then detect which port has been disconnected and will display it. + + +Example output when identifying the leader arm's port (e.g., `/dev/tty.usbmodem575E0031751` on Mac, or possibly `/dev/ttyACM0` on Linux): +``` +Finding all available ports for the MotorBus. +['/dev/tty.usbmodem575E0032081', '/dev/tty.usbmodem575E0031751'] +Remove the usb cable from your MotorsBus and press Enter when done. + +[...Disconnect leader arm and press Enter...] + +The port of this MotorsBus is /dev/tty.usbmodem575E0031751 +Reconnect the usb cable. +``` + +You can now reconnect the usb cable to your computer. + +### 2. Set the motors ids and baudrate + +Each motor is identified by a unique id on the bus. When brand new, motors usually come with a default id of `1`. For the communication to work properly between the motors and the controller, we first need to set a unique, different id to each motor. Additionally, the speed at which data is transmitted on the bus is determined by the baudrate. In order to talk to each other, the controller and all the motors need to be configured with the same baudrate. + +To that end, we first need to connect to each motor individually with the controller in order to set these. Since we will write these parameters in the non-volatile section of the motors' internal memory (EEPROM), we'll only need to do this once. + +> [!NOTE] +> Note: If you are repurposing motors from another robot, you will probably also need to perform this step as the ids and baudrate likely won't match. + +Connect the usb cable from your computer and the 5V power supply to the leader arm's controller board. Then, run the following command with the port you got from the previous step. You'll also need to give your leader arm a name with the `id` parameter. + +```bash +python -m lerobot.setup_motors \ + --device.type=so100_leader \ + --device.port=/dev/tty.usbmodem575E0031751 \ # <- paste here the port found at previous step + --device.id=my_awesome_leader_arm # <- give it a nice, unique name +``` + +Note that the command above is equivalent to running the following script: +
+Setup script + + ```python + from lerobot.common.teleoperators.koch import KochLeader, KochLeaderConfig + + config = KochLeaderConfig( + port="/dev/tty.usbmodem575E0031751", + id="my_awesome_leader_arm", + ) + leader = KochLeader(config) + leader.setup_motors() + ``` +
+ + +You should see the following instruction +``` +Connect the controller board to the 'gripper' motor only and press enter. +``` + +As instructed, plug the gripper's motor. Make sure it's the only motor connected to the board, and that the motor itself is not yet daisy chained to any other motor. As you press `[Enter]`, the script will automatically set the id and baudrate for that motor. + + +
+Troubleshooting + + If you get an error at that point, check your cables and make sure they are plugged-in properly: + - Power supply + - USB cable between from your computer to the controller board + - The 3-pin cable from the controller board to the motor. + + If you are using a Waveshare controller board, make sure that the two jumpers are set on the `B` channel (USB). +
+ +You should then see the following message: +``` +'gripper' motor id set to 6 +``` + +Followed by the next instruction: +``` +Connect the controller board to the 'wrist_roll' motor only and press enter. +``` + +You can disconnect the 3-pin cable from the controller board but you can leave it connected to the gripper motor on the other end as it will already be in the right place. Now, plug-in another 3-pin cable to the wrist roll motor and connect it to the controller board. As with the previous motor, make sure it is the only motor connected to the board and that the motor itself isn't connected to any other one. + +Repeat the operation for each motor as instructed. + +> [!TIP] +> Check your cabling at each step before pressing Enter. For instance, the power supply cable is not solidly anchored to the board and might disconnect easily as you manipulate the board. + +When you are done, the script will simply finish, at which point the motors are ready to be used. You can now plug the 3-pin cable from each motor to the next one, and the cable from the first motor (the 'shoulder pan' with id=1) to the controller board, which can now be attached to the base of the arm. + +## D. Calibrate + +Next, you'll need to calibrate your SO-100 robot to ensure that the leader and follower arms have the same position values when they are in the same physical position. This calibration is essential because it allows a neural network trained on one SO-100 robot to work on another. + +#### a. Manual calibration of follower arm + +> [!IMPORTANT] +> Contrarily to step 6 of the [assembly video](https://youtu.be/FioA2oeFZ5I?t=724) which illustrates the auto calibration, we will actually do manual calibration of follower for now. + +You will need to move the follower arm to these positions sequentially: + +| 1. Zero position | 2. Rotated position | 3. Rest position | +| ------------------------------------------------------------------------------------------------------------------------------------------------------------ | --------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------ | +| SO-100 follower arm zero position | SO-100 follower arm rotated position | SO-100 follower arm rest position | + +Make sure both arms are connected and run this script to launch manual calibration: +```bash +python lerobot/scripts/control_robot.py \ + --robot.type=so100 \ + --robot.cameras='{}' \ + --control.type=calibrate \ + --control.arms='["main_follower"]' +``` + +#### b. Manual calibration of leader arm +Follow step 6 of the [assembly video](https://youtu.be/FioA2oeFZ5I?t=724) which illustrates the manual calibration. You will need to move the leader arm to these positions sequentially: + +| 1. Zero position | 2. Rotated position | 3. Rest position | +| ------------------------------------------------------------------------------------------------------------------------------------------------------ | --------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------ | +| SO-100 leader arm zero position | SO-100 leader arm rotated position | SO-100 leader arm rest position | + +Run this script to launch manual calibration: +```bash +python lerobot/scripts/control_robot.py \ + --robot.type=so100 \ + --robot.cameras='{}' \ + --control.type=calibrate \ + --control.arms='["main_leader"]' +``` + +## E. Teleoperate + +**Simple teleop** +Then you are ready to teleoperate your robot! Run this simple script (it won't connect and display the cameras): +```bash +python lerobot/scripts/control_robot.py \ + --robot.type=so100 \ + --robot.cameras='{}' \ + --control.type=teleoperate +``` + + +#### a. Teleop with displaying cameras +Follow [this guide to setup your cameras](https://github.com/huggingface/lerobot/blob/main/examples/7_get_started_with_real_robot.md#c-add-your-cameras-with-opencvcamera). Then you will be able to display the cameras on your computer while you are teleoperating by running the following code. This is useful to prepare your setup before recording your first dataset. + +> **NOTE:** To visualize the data, enable `--control.display_data=true`. This streams the data using `rerun`. + +```bash +python lerobot/scripts/control_robot.py \ + --robot.type=so100 \ + --control.type=teleoperate +``` + +## F. Record a dataset + +Once you're familiar with teleoperation, you can record your first dataset with SO-100. + +If you want to use the Hugging Face hub features for uploading your dataset and you haven't previously done it, make sure you've logged in using a write-access token, which can be generated from the [Hugging Face settings](https://huggingface.co/settings/tokens): +```bash +huggingface-cli login --token ${HUGGINGFACE_TOKEN} --add-to-git-credential +``` + +Store your Hugging Face repository name in a variable to run these commands: +```bash +HF_USER=$(huggingface-cli whoami | head -n 1) +echo $HF_USER +``` + +Record 2 episodes and upload your dataset to the hub: +```bash +python lerobot/scripts/control_robot.py \ + --robot.type=so100 \ + --control.type=record \ + --control.fps=30 \ + --control.single_task="Grasp a lego block and put it in the bin." \ + --control.repo_id=${HF_USER}/so100_test \ + --control.tags='["so100","tutorial"]' \ + --control.warmup_time_s=5 \ + --control.episode_time_s=30 \ + --control.reset_time_s=30 \ + --control.num_episodes=2 \ + --control.push_to_hub=true +``` + +Note: You can resume recording by adding `--control.resume=true`. + +## G. Visualize a dataset + +If you uploaded your dataset to the hub with `--control.push_to_hub=true`, you can [visualize your dataset online](https://huggingface.co/spaces/lerobot/visualize_dataset) by copy pasting your repo id given by: +```bash +echo ${HF_USER}/so100_test +``` + +If you didn't upload with `--control.push_to_hub=false`, you can also visualize it locally with (a window can be opened in the browser `http://127.0.0.1:9090` with the visualization tool): +```bash +python lerobot/scripts/visualize_dataset_html.py \ + --repo-id ${HF_USER}/so100_test \ + --local-files-only 1 +``` + +## H. Replay an episode + +Now try to replay the first episode on your robot: +```bash +python lerobot/scripts/control_robot.py \ + --robot.type=so100 \ + --control.type=replay \ + --control.fps=30 \ + --control.repo_id=${HF_USER}/so100_test \ + --control.episode=0 +``` + +## I. Train a policy + +To train a policy to control your robot, use the [`python lerobot/scripts/train.py`](../lerobot/scripts/train.py) script. A few arguments are required. Here is an example command: +```bash +python lerobot/scripts/train.py \ + --dataset.repo_id=${HF_USER}/so100_test \ + --policy.type=act \ + --output_dir=outputs/train/act_so100_test \ + --job_name=act_so100_test \ + --policy.device=cuda \ + --wandb.enable=true +``` + +Let's explain it: +1. We provided the dataset as argument with `--dataset.repo_id=${HF_USER}/so100_test`. +2. We provided the policy with `policy.type=act`. This loads configurations from [`configuration_act.py`](../lerobot/common/policies/act/configuration_act.py). Importantly, this policy will automatically adapt to the number of motor sates, motor actions and cameras of your robot (e.g. `laptop` and `phone`) which have been saved in your dataset. +4. We provided `policy.device=cuda` since we are training on a Nvidia GPU, but you could use `policy.device=mps` to train on Apple silicon. +5. We provided `wandb.enable=true` to use [Weights and Biases](https://docs.wandb.ai/quickstart) for visualizing training plots. This is optional but if you use it, make sure you are logged in by running `wandb login`. + +Training should take several hours. You will find checkpoints in `outputs/train/act_so100_test/checkpoints`. + +To resume training from a checkpoint, below is an example command to resume from `last` checkpoint of the `act_so100_test` policy: +```bash +python lerobot/scripts/train.py \ + --config_path=outputs/train/act_so100_test/checkpoints/last/pretrained_model/train_config.json \ + --resume=true +``` + +## J. Evaluate your policy + +You can use the `record` function from [`lerobot/scripts/control_robot.py`](../lerobot/scripts/control_robot.py) but with a policy checkpoint as input. For instance, run this command to record 10 evaluation episodes: +```bash +python lerobot/scripts/control_robot.py \ + --robot.type=so100 \ + --control.type=record \ + --control.fps=30 \ + --control.single_task="Grasp a lego block and put it in the bin." \ + --control.repo_id=${HF_USER}/eval_act_so100_test \ + --control.tags='["tutorial"]' \ + --control.warmup_time_s=5 \ + --control.episode_time_s=30 \ + --control.reset_time_s=30 \ + --control.num_episodes=10 \ + --control.push_to_hub=true \ + --control.policy.path=outputs/train/act_so100_test/checkpoints/last/pretrained_model +``` + +As you can see, it's almost the same command as previously used to record your training dataset. Two things changed: +1. There is an additional `--control.policy.path` argument which indicates the path to your policy checkpoint with (e.g. `outputs/train/eval_act_so100_test/checkpoints/last/pretrained_model`). You can also use the model repository if you uploaded a model checkpoint to the hub (e.g. `${HF_USER}/act_so100_test`). +2. The name of dataset begins by `eval` to reflect that you are running inference (e.g. `${HF_USER}/eval_act_so100_test`). + +## K. More Information + +Follow this [previous tutorial](https://github.com/huggingface/lerobot/blob/main/examples/7_get_started_with_real_robot.md#4-train-a-policy-on-your-data) for a more in-depth tutorial on controlling real robots with LeRobot. + +> [!TIP] +> If you have any questions or need help, please reach out on [Discord](https://discord.com/invite/s3KuuzsPFb) in the channel [`#so100-arm`](https://discord.com/channels/1216765309076115607/1237741463832363039). diff --git a/lerobot/common/robots/koch/__init__.py b/lerobot/common/robots/koch_follower/__init__.py similarity index 100% rename from lerobot/common/robots/koch/__init__.py rename to lerobot/common/robots/koch_follower/__init__.py diff --git a/lerobot/common/robots/koch/config_koch_follower.py b/lerobot/common/robots/koch_follower/config_koch_follower.py similarity index 100% rename from lerobot/common/robots/koch/config_koch_follower.py rename to lerobot/common/robots/koch_follower/config_koch_follower.py diff --git a/lerobot/common/robots/koch/koch_follower.py b/lerobot/common/robots/koch_follower/koch_follower.py similarity index 90% rename from lerobot/common/robots/koch/koch_follower.py rename to lerobot/common/robots/koch_follower/koch_follower.py index 4ca996c0..42beda09 100644 --- a/lerobot/common/robots/koch/koch_follower.py +++ b/lerobot/common/robots/koch_follower/koch_follower.py @@ -16,10 +16,11 @@ import logging import time +from functools import cached_property from typing import Any from lerobot.common.cameras.utils import make_cameras_from_configs -from lerobot.common.constants import OBS_IMAGES, OBS_STATE +from lerobot.common.constants import OBS_STATE from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError from lerobot.common.motors import Motor, MotorCalibration, MotorNormMode from lerobot.common.motors.dynamixel import ( @@ -62,27 +63,22 @@ class KochFollower(Robot): self.cameras = make_cameras_from_configs(config.cameras) @property - def state_feature(self) -> dict: + def _motors_ft(self) -> dict[str, type]: + return {f"{motor}.pos": float for motor in self.arm.motors} + + @property + def _cameras_ft(self) -> dict[str, tuple]: return { - "dtype": "float32", - "shape": (len(self.arm),), - "names": {"motors": list(self.arm.motors)}, + cam: (self.config.cameras[cam].height, self.config.cameras[cam].width, 3) for cam in self.cameras } - @property - def action_feature(self) -> dict: - return self.state_feature + @cached_property + def observation_features(self) -> dict[str, type | tuple]: + return {**self._motors_ft, **self._cameras_ft} - @property - def camera_features(self) -> dict[str, dict]: - cam_ft = {} - for cam_key, cam in self.cameras.items(): - cam_ft[cam_key] = { - "shape": (cam.height, cam.width, cam.channels), - "names": ["height", "width", "channels"], - "info": None, - } - return cam_ft + @cached_property + def action_features(self) -> dict[str, type]: + return self._motors_ft @property def is_connected(self) -> bool: @@ -184,13 +180,14 @@ class KochFollower(Robot): # Read arm position start = time.perf_counter() obs_dict[OBS_STATE] = self.arm.sync_read("Present_Position") + obs_dict = {f"{motor}.pos": val for motor, val in obs_dict.items()} dt_ms = (time.perf_counter() - start) * 1e3 logger.debug(f"{self} read state: {dt_ms:.1f}ms") # Capture images from cameras for cam_key, cam in self.cameras.items(): start = time.perf_counter() - obs_dict[f"{OBS_IMAGES}.{cam_key}"] = cam.async_read() + obs_dict[cam_key] = cam.async_read() dt_ms = (time.perf_counter() - start) * 1e3 logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms") @@ -212,7 +209,7 @@ class KochFollower(Robot): if not self.is_connected: raise DeviceNotConnectedError(f"{self} is not connected.") - goal_pos = action + goal_pos = {key.removesuffix(".pos"): val for key, val in action.items() if key.endswith(".pos")} # Cap goal position when too far away from present position. # /!\ Slower fps expected due to reading from the follower. @@ -223,7 +220,7 @@ class KochFollower(Robot): # Send goal position to the arm self.arm.sync_write("Goal_Position", goal_pos) - return goal_pos + return {f"{motor}.pos": val for motor, val in goal_pos.items()} def disconnect(self): if not self.is_connected: diff --git a/lerobot/common/robots/moss/__init__.py b/lerobot/common/robots/moss/__init__.py deleted file mode 100644 index f7c840a3..00000000 --- a/lerobot/common/robots/moss/__init__.py +++ /dev/null @@ -1,4 +0,0 @@ -from .configuration_moss import MossRobotConfig -from .robot_moss import MossRobot - -__all__ = ["MossRobotConfig", "MossRobot"] diff --git a/lerobot/common/robots/moss/robot_moss.py b/lerobot/common/robots/moss/robot_moss.py deleted file mode 100644 index 0f7e5e77..00000000 --- a/lerobot/common/robots/moss/robot_moss.py +++ /dev/null @@ -1,223 +0,0 @@ -#!/usr/bin/env python - -# Copyright 2024 The HuggingFace Inc. team. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import json -import logging -import time - -import numpy as np - -from lerobot.common.cameras.utils import make_cameras_from_configs -from lerobot.common.constants import OBS_IMAGES, OBS_STATE -from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError -from lerobot.common.motors import TorqueMode -from lerobot.common.motors.feetech import ( - FeetechMotorsBus, - apply_feetech_offsets_from_calibration, - run_full_arm_calibration, -) - -from ..robot import Robot -from ..utils import ensure_safe_goal_position -from .configuration_moss import MossRobotConfig - - -class MossRobot(Robot): - """ - [Moss Arm](https://github.com/jess-moss/moss-robot-arms) designed by Jess Moss - """ - - config_class = MossRobotConfig - name = "moss" - - def __init__(self, config: MossRobotConfig): - super().__init__(config) - self.config = config - self.robot_type = config.type - - self.arm = FeetechMotorsBus( - port=self.config.port, - motors={ - "shoulder_pan": config.shoulder_pan, - "shoulder_lift": config.shoulder_lift, - "elbow_flex": config.elbow_flex, - "wrist_flex": config.wrist_flex, - "wrist_roll": config.wrist_roll, - "gripper": config.gripper, - }, - ) - self.cameras = make_cameras_from_configs(config.cameras) - - self.is_connected = False - self.logs = {} - - @property - def state_feature(self) -> dict: - return { - "dtype": "float32", - "shape": (len(self.arm),), - "names": {"motors": list(self.arm.motors)}, - } - - @property - def action_feature(self) -> dict: - return self.state_feature - - @property - def camera_features(self) -> dict[str, dict]: - cam_ft = {} - for cam_key, cam in self.cameras.items(): - cam_ft[cam_key] = { - "shape": (cam.height, cam.width, cam.channels), - "names": ["height", "width", "channels"], - "info": None, - } - return cam_ft - - def connect(self) -> None: - if self.is_connected: - raise DeviceAlreadyConnectedError( - "ManipulatorRobot is already connected. Do not run `robot.connect()` twice." - ) - - logging.info("Connecting arm.") - self.arm.connect() - - # We assume that at connection time, arm is in a rest position, - # and torque can be safely disabled to run calibration. - self.arm.write("Torque_Enable", TorqueMode.DISABLED.value) - self.calibrate() - - # Mode=0 for Position Control - self.arm.write("Mode", 0) - # Set P_Coefficient to lower value to avoid shakiness (Default is 32) - self.arm.write("P_Coefficient", 16) - # Set I_Coefficient and D_Coefficient to default value 0 and 32 - self.arm.write("I_Coefficient", 0) - self.arm.write("D_Coefficient", 32) - # Close the write lock so that Maximum_Acceleration gets written to EPROM address, - # which is mandatory for Maximum_Acceleration to take effect after rebooting. - self.arm.write("Lock", 0) - # Set Maximum_Acceleration to 254 to speedup acceleration and deceleration of - # the motors. Note: this configuration is not in the official STS3215 Memory Table - self.arm.write("Maximum_Acceleration", 254) - self.arm.write("Acceleration", 254) - - logging.info("Activating torque.") - self.arm.write("Torque_Enable", TorqueMode.ENABLED.value) - - # Check arm can be read - self.arm.read("Present_Position") - - # Connect the cameras - for cam in self.cameras.values(): - cam.connect() - - self.is_connected = True - - def calibrate(self) -> None: - """After calibration all motors function in human interpretable ranges. - Rotations are expressed in degrees in nominal range of [-180, 180], - and linear motions (like gripper of Aloha) in nominal range of [0, 100]. - """ - if self.calibration_fpath.exists(): - with open(self.calibration_fpath) as f: - calibration = json.load(f) - else: - # TODO(rcadene): display a warning in __init__ if calibration file not available - logging.info(f"Missing calibration file '{self.calibration_fpath}'") - calibration = run_full_arm_calibration(self.arm, self.robot_type, self.name, "follower") - - logging.info(f"Calibration is done! Saving calibration file '{self.calibration_fpath}'") - self.calibration_fpath.parent.mkdir(parents=True, exist_ok=True) - with open(self.calibration_fpath, "w") as f: - json.dump(calibration, f) - - self.arm.set_calibration(calibration) - apply_feetech_offsets_from_calibration(self.arm, calibration) - - def get_observation(self) -> dict[str, np.ndarray]: - """The returned observations do not have a batch dimension.""" - if not self.is_connected: - raise DeviceNotConnectedError( - "ManipulatorRobot is not connected. You need to run `robot.connect()`." - ) - - obs_dict = {} - - # Read arm position - before_read_t = time.perf_counter() - obs_dict[OBS_STATE] = self.arm.read("Present_Position") - self.logs["read_pos_dt_s"] = time.perf_counter() - before_read_t - - # Capture images from cameras - for cam_key, cam in self.cameras.items(): - before_camread_t = time.perf_counter() - obs_dict[f"{OBS_IMAGES}.{cam_key}"] = cam.async_read() - self.logs[f"read_camera_{cam_key}_dt_s"] = cam.logs["delta_timestamp_s"] - self.logs[f"async_read_camera_{cam_key}_dt_s"] = time.perf_counter() - before_camread_t - - return obs_dict - - def send_action(self, action: np.ndarray) -> np.ndarray: - """Command arm to move to a target joint configuration. - - The relative action magnitude may be clipped depending on the configuration parameter - `max_relative_target`. In this case, the action sent differs from original action. - Thus, this function always returns the action actually sent. - - Args: - action (np.ndarray): array containing the goal positions for the motors. - - Raises: - RobotDeviceNotConnectedError: if robot is not connected. - - Returns: - np.ndarray: the action sent to the motors, potentially clipped. - """ - if not self.is_connected: - raise DeviceNotConnectedError( - "ManipulatorRobot is not connected. You need to run `robot.connect()`." - ) - - goal_pos = action - - # Cap goal position when too far away from present position. - # /!\ Slower fps expected due to reading from the follower. - if self.config.max_relative_target is not None: - present_pos = self.arm.read("Present_Position") - goal_pos = ensure_safe_goal_position(goal_pos, present_pos, self.config.max_relative_target) - - # Send goal position to the arm - self.arm.write("Goal_Position", goal_pos.astype(np.int32)) - - return goal_pos - - def print_logs(self): - # TODO(aliberts): move robot-specific logs logic here - pass - - def disconnect(self): - if not self.is_connected: - raise DeviceNotConnectedError( - "ManipulatorRobot is not connected. You need to run `robot.connect()` before disconnecting." - ) - - self.arm.disconnect() - for cam in self.cameras.values(): - cam.disconnect() - - self.is_connected = False diff --git a/lerobot/common/robots/moss/README.md b/lerobot/common/robots/moss_follower/README.md similarity index 100% rename from lerobot/common/robots/moss/README.md rename to lerobot/common/robots/moss_follower/README.md diff --git a/lerobot/common/robots/moss_follower/__init__.py b/lerobot/common/robots/moss_follower/__init__.py new file mode 100644 index 00000000..2ab82c1d --- /dev/null +++ b/lerobot/common/robots/moss_follower/__init__.py @@ -0,0 +1,2 @@ +from .configuration_moss import MossRobotConfig +from .moss_follower import MossRobot diff --git a/lerobot/common/robots/moss/configuration_moss.py b/lerobot/common/robots/moss_follower/configuration_moss.py similarity index 100% rename from lerobot/common/robots/moss/configuration_moss.py rename to lerobot/common/robots/moss_follower/configuration_moss.py diff --git a/lerobot/common/robots/moss_follower/moss_follower.py b/lerobot/common/robots/moss_follower/moss_follower.py new file mode 100644 index 00000000..e27fb8d7 --- /dev/null +++ b/lerobot/common/robots/moss_follower/moss_follower.py @@ -0,0 +1,215 @@ +#!/usr/bin/env python + +# Copyright 2024 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import time +from functools import cached_property +from typing import Any + +from lerobot.common.cameras.utils import make_cameras_from_configs +from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError +from lerobot.common.motors import Motor, MotorCalibration, MotorNormMode +from lerobot.common.motors.feetech import ( + FeetechMotorsBus, + OperatingMode, +) + +from ..robot import Robot +from ..utils import ensure_safe_goal_position +from .configuration_moss import MossRobotConfig + +logger = logging.getLogger(__name__) + + +class MossRobot(Robot): + """ + [Moss Arm](https://github.com/jess-moss/moss-robot-arms) designed by Jess Moss + """ + + config_class = MossRobotConfig + name = "moss_follower" + + def __init__(self, config: MossRobotConfig): + super().__init__(config) + self.config = config + self.arm = FeetechMotorsBus( + port=self.config.port, + motors={ + "shoulder_pan": Motor(1, "sts3215", MotorNormMode.RANGE_M100_100), + "shoulder_lift": Motor(2, "sts3215", MotorNormMode.RANGE_M100_100), + "elbow_flex": Motor(3, "sts3215", MotorNormMode.RANGE_M100_100), + "wrist_flex": Motor(4, "sts3215", MotorNormMode.RANGE_M100_100), + "wrist_roll": Motor(5, "sts3215", MotorNormMode.RANGE_M100_100), + "gripper": Motor(6, "sts3215", MotorNormMode.RANGE_0_100), + }, + calibration=self.calibration, + ) + self.cameras = make_cameras_from_configs(config.cameras) + + @property + def _motors_ft(self) -> dict[str, type]: + return {f"{motor}.pos": float for motor in self.arm.motors} + + @property + def _cameras_ft(self) -> dict[str, tuple]: + return { + cam: (self.config.cameras[cam].height, self.config.cameras[cam].width, 3) for cam in self.cameras + } + + @cached_property + def observation_features(self) -> dict[str, type | tuple]: + return {**self._motors_ft, **self._cameras_ft} + + @cached_property + def action_features(self) -> dict[str, type]: + return self._motors_ft + + @property + def is_connected(self) -> bool: + # TODO(aliberts): add cam.is_connected for cam in self.cameras + return self.arm.is_connected + + def connect(self) -> None: + """ + We assume that at connection time, arm is in a rest position, + and torque can be safely disabled to run calibration. + """ + if self.is_connected: + raise DeviceAlreadyConnectedError(f"{self} already connected") + + self.arm.connect() + if not self.is_calibrated: + self.calibrate() + + # Connect the cameras + for cam in self.cameras.values(): + cam.connect() + + self.configure() + logger.info(f"{self} connected.") + + @property + def is_calibrated(self) -> bool: + return self.arm.is_calibrated + + def calibrate(self) -> None: + logger.info(f"\nRunning calibration of {self}") + self.arm.disable_torque() + for motor in self.arm.motors: + self.arm.write("Operating_Mode", motor, OperatingMode.POSITION.value) + + input(f"Move {self} to the middle of its range of motion and press ENTER....") + homing_offsets = self.arm.set_half_turn_homings() + + full_turn_motor = "wrist_roll" + unknown_range_motors = [motor for motor in self.arm.motors if motor != full_turn_motor] + print( + f"Move all joints except '{full_turn_motor}' sequentially through their " + "entire ranges of motion.\nRecording positions. Press ENTER to stop..." + ) + range_mins, range_maxes = self.arm.record_ranges_of_motion(unknown_range_motors) + range_mins[full_turn_motor] = 0 + range_maxes[full_turn_motor] = 4095 + + self.calibration = {} + for motor, m in self.arm.motors.items(): + self.calibration[motor] = MotorCalibration( + id=m.id, + drive_mode=0, + homing_offset=homing_offsets[motor], + range_min=range_mins[motor], + range_max=range_maxes[motor], + ) + + self.arm.write_calibration(self.calibration) + self._save_calibration() + print("Calibration saved to", self.calibration_fpath) + + def configure(self) -> None: + with self.arm.torque_disabled(): + self.arm.configure_motors() + for motor in self.arm.motors: + self.arm.write("Operating_Mode", motor, OperatingMode.POSITION.value) + # Set P_Coefficient to lower value to avoid shakiness (Default is 32) + self.arm.write("P_Coefficient", motor, 16) + # Set I_Coefficient and D_Coefficient to default value 0 and 32 + self.arm.write("I_Coefficient", motor, 0) + self.arm.write("D_Coefficient", motor, 32) + + def setup_motors(self) -> None: + for motor in reversed(self.arm.motors): + input(f"Connect the controller board to the '{motor}' motor only and press enter.") + self.arm.setup_motor(motor) + print(f"'{motor}' motor id set to {self.arm.motors[motor].id}") + + def get_observation(self) -> dict[str, Any]: + if not self.is_connected: + raise DeviceNotConnectedError(f"{self} is not connected.") + + # Read arm position + start = time.perf_counter() + obs_dict = self.arm.sync_read("Present_Position") + obs_dict = {f"{motor}.pos": val for motor, val in obs_dict.items()} + dt_ms = (time.perf_counter() - start) * 1e3 + logger.debug(f"{self} read state: {dt_ms:.1f}ms") + + # Capture images from cameras + for cam_key, cam in self.cameras.items(): + start = time.perf_counter() + obs_dict[cam_key] = cam.async_read() + dt_ms = (time.perf_counter() - start) * 1e3 + logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms") + + return obs_dict + + def send_action(self, action: dict[str, Any]) -> dict[str, Any]: + """Command arm to move to a target joint configuration. + + The relative action magnitude may be clipped depending on the configuration parameter + `max_relative_target`. In this case, the action sent differs from original action. + Thus, this function always returns the action actually sent. + + Raises: + RobotDeviceNotConnectedError: if robot is not connected. + + Returns: + the action sent to the motors, potentially clipped. + """ + if not self.is_connected: + raise DeviceNotConnectedError(f"{self} is not connected.") + + goal_pos = {key.removesuffix(".pos"): val for key, val in action.items() if key.endswith(".pos")} + + # Cap goal position when too far away from present position. + # /!\ Slower fps expected due to reading from the follower. + if self.config.max_relative_target is not None: + present_pos = self.arm.sync_read("Present_Position") + goal_present_pos = {key: (g_pos, present_pos[key]) for key, g_pos in goal_pos.items()} + goal_pos = ensure_safe_goal_position(goal_present_pos, self.config.max_relative_target) + + # Send goal position to the arm + self.arm.sync_write("Goal_Position", goal_pos) + return {f"{motor}.pos": val for motor, val in goal_pos.items()} + + def disconnect(self): + if not self.is_connected: + raise DeviceNotConnectedError(f"{self} is not connected.") + + self.arm.disconnect(self.config.disable_torque_on_disconnect) + for cam in self.cameras.values(): + cam.disconnect() + + logger.info(f"{self} disconnected.") diff --git a/lerobot/common/robots/robot.py b/lerobot/common/robots/robot.py index a7ec4eda..e7b58aa3 100644 --- a/lerobot/common/robots/robot.py +++ b/lerobot/common/robots/robot.py @@ -36,15 +36,11 @@ class Robot(abc.ABC): # TODO(aliberts): create a proper Feature class for this that links with datasets @abc.abstractproperty - def state_feature(self) -> dict: + def observation_features(self) -> dict: pass @abc.abstractproperty - def action_feature(self) -> dict: - pass - - @abc.abstractproperty - def camera_features(self) -> dict[str, dict]: + def action_features(self) -> dict: pass @abc.abstractproperty diff --git a/lerobot/common/robots/so100/README.md b/lerobot/common/robots/so100_follower/README.md similarity index 100% rename from lerobot/common/robots/so100/README.md rename to lerobot/common/robots/so100_follower/README.md diff --git a/lerobot/common/robots/so100/__init__.py b/lerobot/common/robots/so100_follower/__init__.py similarity index 100% rename from lerobot/common/robots/so100/__init__.py rename to lerobot/common/robots/so100_follower/__init__.py diff --git a/lerobot/common/robots/so100/config_so100_follower.py b/lerobot/common/robots/so100_follower/config_so100_follower.py similarity index 100% rename from lerobot/common/robots/so100/config_so100_follower.py rename to lerobot/common/robots/so100_follower/config_so100_follower.py diff --git a/lerobot/common/robots/so100/so100_follower.py b/lerobot/common/robots/so100_follower/so100_follower.py similarity index 89% rename from lerobot/common/robots/so100/so100_follower.py rename to lerobot/common/robots/so100_follower/so100_follower.py index aad030bc..3e063c1b 100644 --- a/lerobot/common/robots/so100/so100_follower.py +++ b/lerobot/common/robots/so100_follower/so100_follower.py @@ -16,10 +16,10 @@ import logging import time +from functools import cached_property from typing import Any from lerobot.common.cameras.utils import make_cameras_from_configs -from lerobot.common.constants import OBS_IMAGES, OBS_STATE from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError from lerobot.common.motors import Motor, MotorCalibration, MotorNormMode from lerobot.common.motors.feetech import ( @@ -60,27 +60,22 @@ class SO100Follower(Robot): self.cameras = make_cameras_from_configs(config.cameras) @property - def state_feature(self) -> dict: + def _motors_ft(self) -> dict[str, type]: + return {f"{motor}.pos": float for motor in self.arm.motors} + + @property + def _cameras_ft(self) -> dict[str, tuple]: return { - "dtype": "float32", - "shape": (len(self.arm),), - "names": {"motors": list(self.arm.motors)}, + cam: (self.config.cameras[cam].height, self.config.cameras[cam].width, 3) for cam in self.cameras } - @property - def action_feature(self) -> dict: - return self.state_feature + @cached_property + def observation_features(self) -> dict[str, type | tuple]: + return {**self._motors_ft, **self._cameras_ft} - @property - def camera_features(self) -> dict[str, dict]: - cam_ft = {} - for cam_key, cam in self.cameras.items(): - cam_ft[cam_key] = { - "shape": (cam.height, cam.width, cam.channels), - "names": ["height", "width", "channels"], - "info": None, - } - return cam_ft + @cached_property + def action_features(self) -> dict[str, type]: + return self._motors_ft @property def is_connected(self) -> bool: @@ -164,18 +159,17 @@ class SO100Follower(Robot): if not self.is_connected: raise DeviceNotConnectedError(f"{self} is not connected.") - obs_dict = {} - # Read arm position start = time.perf_counter() - obs_dict[OBS_STATE] = self.arm.sync_read("Present_Position") + obs_dict = self.arm.sync_read("Present_Position") + obs_dict = {f"{motor}.pos": val for motor, val in obs_dict.items()} dt_ms = (time.perf_counter() - start) * 1e3 logger.debug(f"{self} read state: {dt_ms:.1f}ms") # Capture images from cameras for cam_key, cam in self.cameras.items(): start = time.perf_counter() - obs_dict[f"{OBS_IMAGES}.{cam_key}"] = cam.async_read() + obs_dict[cam_key] = cam.async_read() dt_ms = (time.perf_counter() - start) * 1e3 logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms") @@ -197,7 +191,7 @@ class SO100Follower(Robot): if not self.is_connected: raise DeviceNotConnectedError(f"{self} is not connected.") - goal_pos = action + goal_pos = {key.removesuffix(".pos"): val for key, val in action.items() if key.endswith(".pos")} # Cap goal position when too far away from present position. # /!\ Slower fps expected due to reading from the follower. @@ -208,7 +202,7 @@ class SO100Follower(Robot): # Send goal position to the arm self.arm.sync_write("Goal_Position", goal_pos) - return goal_pos + return {f"{motor}.pos": val for motor, val in goal_pos.items()} def disconnect(self): if not self.is_connected: diff --git a/lerobot/common/robots/stretch3/__init__.py b/lerobot/common/robots/stretch3/__init__.py new file mode 100644 index 00000000..e2a859cd --- /dev/null +++ b/lerobot/common/robots/stretch3/__init__.py @@ -0,0 +1,2 @@ +from .configuration_stretch3 import Stretch3RobotConfig +from .robot_stretch3 import Stretch3Robot diff --git a/lerobot/common/robots/stretch3/robot_stretch3.py b/lerobot/common/robots/stretch3/robot_stretch3.py index e07e3f1e..8a7ce9d5 100644 --- a/lerobot/common/robots/stretch3/robot_stretch3.py +++ b/lerobot/common/robots/stretch3/robot_stretch3.py @@ -72,7 +72,7 @@ class Stretch3Robot(Robot): self.action_keys = None @property - def state_feature(self) -> dict: + def observation_features(self) -> dict: return { "dtype": "float32", "shape": (len(STRETCH_MOTORS),), @@ -80,8 +80,8 @@ class Stretch3Robot(Robot): } @property - def action_feature(self) -> dict: - return self.state_feature + def action_features(self) -> dict: + return self.observation_features @property def camera_features(self) -> dict[str, dict]: diff --git a/lerobot/common/robots/utils.py b/lerobot/common/robots/utils.py index 87f159ba..3c543b64 100644 --- a/lerobot/common/robots/utils.py +++ b/lerobot/common/robots/utils.py @@ -1,47 +1,27 @@ import logging from pprint import pformat -from typing import Protocol from lerobot.common.robots import RobotConfig - -def get_arm_id(name, arm_type): - """Returns the string identifier of a robot arm. For instance, for a bimanual manipulator - like Aloha, it could be left_follower, right_follower, left_leader, or right_leader. - """ - return f"{name}_{arm_type}" - - -# TODO(aliberts): Remove and point to lerobot.common.robots.Robot -class Robot(Protocol): - robot_type: str - features: dict - - def connect(self): ... - def run_calibration(self): ... - def teleop_step(self, record_data=False): ... - def capture_observation(self): ... - def send_action(self, action): ... - def disconnect(self): ... +from .robot import Robot def make_robot_config(robot_type: str, **kwargs) -> RobotConfig: if robot_type == "aloha": - from .aloha.configuration_aloha import AlohaRobotConfig + raise NotImplementedError # TODO - return AlohaRobotConfig(**kwargs) elif robot_type == "koch_follower": - from .koch.config_koch_follower import KochFollowerConfig + from .koch_follower.config_koch_follower import KochFollowerConfig return KochFollowerConfig(**kwargs) # elif robot_type == "koch_bimanual": # return KochBimanualRobotConfig(**kwargs) elif robot_type == "moss": - from .moss.configuration_moss import MossRobotConfig + from .moss_follower.configuration_moss import MossRobotConfig return MossRobotConfig(**kwargs) elif robot_type == "so100_leader": - from .so100.config_so100_follower import SO100FollowerConfig + from .so100_follower.config_so100_follower import SO100FollowerConfig return SO100FollowerConfig(**kwargs) elif robot_type == "stretch": @@ -56,23 +36,29 @@ def make_robot_config(robot_type: str, **kwargs) -> RobotConfig: raise ValueError(f"Robot type '{robot_type}' is not available.") -def make_robot_from_config(config: RobotConfig): - from .lekiwi.config_lekiwi import LeKiwiConfig - from .manipulator import ManipulatorRobotConfig +def make_robot_from_config(config: RobotConfig) -> Robot: + if config.type == "koch_follower": + from .koch_follower import KochFollower - if isinstance(config, ManipulatorRobotConfig): - from lerobot.common.robots.manipulator import ManipulatorRobot + return KochFollower(config) + elif config.type == "so100_follower": + from .so100_follower import SO100Follower - return ManipulatorRobot(config) - elif isinstance(config, LeKiwiConfig): - from lerobot.common.robots.lekiwi import LeKiwiClient + return SO100Follower(config) + elif config.type == "lekiwi": + from .lekiwi import LeKiwiClient return LeKiwiClient(config) - ... - else: - from lerobot.common.robots.stretch3.robot_stretch3 import Stretch3Robot + elif config.type == "stretch3": + from .stretch3 import Stretch3Robot return Stretch3Robot(config) + elif config.type == "viperx": + from .viperx import ViperX + + return ViperX(config) + else: + raise ValueError(config.type) def make_robot(robot_type: str, **kwargs) -> Robot: @@ -116,3 +102,11 @@ def ensure_safe_goal_position( ) return safe_goal_positions + + +# TODO(aliberts): Remove +def get_arm_id(name, arm_type): + """Returns the string identifier of a robot arm. For instance, for a bimanual manipulator + like Aloha, it could be left_follower, right_follower, left_leader, or right_leader. + """ + return f"{name}_{arm_type}" diff --git a/lerobot/common/robots/viperx/viperx.py b/lerobot/common/robots/viperx/viperx.py index 7c294864..7639fd72 100644 --- a/lerobot/common/robots/viperx/viperx.py +++ b/lerobot/common/robots/viperx/viperx.py @@ -6,10 +6,11 @@ and send orders to its motors. import logging import time +from functools import cached_property from typing import Any from lerobot.common.cameras.utils import make_cameras_from_configs -from lerobot.common.constants import OBS_IMAGES, OBS_STATE +from lerobot.common.constants import OBS_STATE from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError from lerobot.common.motors import Motor, MotorCalibration, MotorNormMode from lerobot.common.motors.dynamixel import ( @@ -55,28 +56,22 @@ class ViperX(Robot): self.cameras = make_cameras_from_configs(config.cameras) @property - def state_feature(self) -> dict: + def _motors_ft(self) -> dict[str, type]: + return {f"{motor}.pos": float for motor in self.arm.motors} + + @property + def _cameras_ft(self) -> dict[str, tuple]: return { - "dtype": "float32", - "shape": (len(self.arm),), - "names": {"motors": list(self.arm.motors)}, + cam: (self.config.cameras[cam].height, self.config.cameras[cam].width, 3) for cam in self.cameras } - @property - def action_feature(self) -> dict: - return self.state_feature + @cached_property + def observation_features(self) -> dict[str, type | tuple]: + return {**self._motors_ft, **self._cameras_ft} - @property - def camera_features(self) -> dict[str, dict]: - cam_ft = {} - for cam_key, cam in self.cameras.items(): - key = f"observation.images.{cam_key}" - cam_ft[key] = { - "shape": (cam.height, cam.width, cam.channels), - "names": ["height", "width", "channels"], - "info": None, - } - return cam_ft + @cached_property + def action_features(self) -> dict[str, type]: + return self._motors_ft @property def is_connected(self) -> bool: @@ -177,13 +172,14 @@ class ViperX(Robot): # Read arm position start = time.perf_counter() obs_dict[OBS_STATE] = self.arm.sync_read("Present_Position") + obs_dict = {f"{motor}.pos": val for motor, val in obs_dict.items()} dt_ms = (time.perf_counter() - start) * 1e3 logger.debug(f"{self} read state: {dt_ms:.1f}ms") # Capture images from cameras for cam_key, cam in self.cameras.items(): start = time.perf_counter() - obs_dict[f"{OBS_IMAGES}.{cam_key}"] = cam.async_read() + obs_dict[cam_key] = cam.async_read() dt_ms = (time.perf_counter() - start) * 1e3 logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms") @@ -205,7 +201,7 @@ class ViperX(Robot): if not self.is_connected: raise DeviceNotConnectedError(f"{self} is not connected.") - goal_pos = action + goal_pos = {key.removesuffix(".pos"): val for key, val in action.items() if key.endswith(".pos")} # Cap goal position when too far away from present position. # /!\ Slower fps expected due to reading from the follower. @@ -216,7 +212,7 @@ class ViperX(Robot): # Send goal position to the arm self.arm.sync_write("Goal_Position", goal_pos) - return goal_pos + return {f"{motor}.pos": val for motor, val in goal_pos.items()} def disconnect(self): if not self.is_connected: