diff --git a/lerobot/common/robots/__init__.py b/lerobot/common/robots/__init__.py index 58b868ed..d8fd0de9 100644 --- a/lerobot/common/robots/__init__.py +++ b/lerobot/common/robots/__init__.py @@ -1,4 +1,3 @@ from .config import RobotConfig from .robot import Robot - -__all__ = ["RobotConfig", "Robot"] +from .utils import make_robot_from_config diff --git a/lerobot/common/robots/koch_follower/README.md b/lerobot/common/robots/koch_follower/README.md new file mode 100644 index 00000000..f802106d --- /dev/null +++ b/lerobot/common/robots/koch_follower/README.md @@ -0,0 +1,330 @@ +# Using the [Koch v1.1](https://github.com/jess-moss/koch-v1-1) with LeRobot + +## Table of Contents + + - [A. Order and Assemble the parts](#a-order-and-assemble-the-parts) + - [B. Install LeRobot](#b-install-lerobot) + - [C. Configure the Motors](#c-configure-the-motors) + - [D. Calibrate](#d-calibrate) + - [E. Teleoperate](#e-teleoperate) + - [F. Record a dataset](#f-record-a-dataset) + - [G. Visualize a dataset](#g-visualize-a-dataset) + - [H. Replay an episode](#h-replay-an-episode) + - [I. Train a policy](#i-train-a-policy) + - [J. Evaluate your policy](#j-evaluate-your-policy) + - [K. More Information](#k-more-information) + +## A. Order and Assemble the parts + +Follow the sourcing and assembling instructions provided on the [Koch v1.1 Github page](https://github.com/jess-moss/koch-v1-1). This will guide you through setting up both the follower and leader arms, as shown in the image below. + +
+
|
|
|
+
+Make sure both arms are connected and run this script to launch manual calibration:
+```bash
+python lerobot/scripts/control_robot.py \
+ --robot.type=so100 \
+ --robot.cameras='{}' \
+ --control.type=calibrate \
+ --control.arms='["main_follower"]'
+```
+
+#### b. Manual calibration of leader arm
+Follow step 6 of the [assembly video](https://youtu.be/FioA2oeFZ5I?t=724) which illustrates the manual calibration. You will need to move the leader arm to these positions sequentially:
+
+| 1. Zero position | 2. Rotated position | 3. Rest position |
+| ------------------------------------------------------------------------------------------------------------------------------------------------------ | --------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------ |
+|
|
|
|
+
+Run this script to launch manual calibration:
+```bash
+python lerobot/scripts/control_robot.py \
+ --robot.type=so100 \
+ --robot.cameras='{}' \
+ --control.type=calibrate \
+ --control.arms='["main_leader"]'
+```
+
+## E. Teleoperate
+
+**Simple teleop**
+Then you are ready to teleoperate your robot! Run this simple script (it won't connect and display the cameras):
+```bash
+python lerobot/scripts/control_robot.py \
+ --robot.type=so100 \
+ --robot.cameras='{}' \
+ --control.type=teleoperate
+```
+
+
+#### a. Teleop with displaying cameras
+Follow [this guide to setup your cameras](https://github.com/huggingface/lerobot/blob/main/examples/7_get_started_with_real_robot.md#c-add-your-cameras-with-opencvcamera). Then you will be able to display the cameras on your computer while you are teleoperating by running the following code. This is useful to prepare your setup before recording your first dataset.
+
+> **NOTE:** To visualize the data, enable `--control.display_data=true`. This streams the data using `rerun`.
+
+```bash
+python lerobot/scripts/control_robot.py \
+ --robot.type=so100 \
+ --control.type=teleoperate
+```
+
+## F. Record a dataset
+
+Once you're familiar with teleoperation, you can record your first dataset with SO-100.
+
+If you want to use the Hugging Face hub features for uploading your dataset and you haven't previously done it, make sure you've logged in using a write-access token, which can be generated from the [Hugging Face settings](https://huggingface.co/settings/tokens):
+```bash
+huggingface-cli login --token ${HUGGINGFACE_TOKEN} --add-to-git-credential
+```
+
+Store your Hugging Face repository name in a variable to run these commands:
+```bash
+HF_USER=$(huggingface-cli whoami | head -n 1)
+echo $HF_USER
+```
+
+Record 2 episodes and upload your dataset to the hub:
+```bash
+python lerobot/scripts/control_robot.py \
+ --robot.type=so100 \
+ --control.type=record \
+ --control.fps=30 \
+ --control.single_task="Grasp a lego block and put it in the bin." \
+ --control.repo_id=${HF_USER}/so100_test \
+ --control.tags='["so100","tutorial"]' \
+ --control.warmup_time_s=5 \
+ --control.episode_time_s=30 \
+ --control.reset_time_s=30 \
+ --control.num_episodes=2 \
+ --control.push_to_hub=true
+```
+
+Note: You can resume recording by adding `--control.resume=true`.
+
+## G. Visualize a dataset
+
+If you uploaded your dataset to the hub with `--control.push_to_hub=true`, you can [visualize your dataset online](https://huggingface.co/spaces/lerobot/visualize_dataset) by copy pasting your repo id given by:
+```bash
+echo ${HF_USER}/so100_test
+```
+
+If you didn't upload with `--control.push_to_hub=false`, you can also visualize it locally with (a window can be opened in the browser `http://127.0.0.1:9090` with the visualization tool):
+```bash
+python lerobot/scripts/visualize_dataset_html.py \
+ --repo-id ${HF_USER}/so100_test \
+ --local-files-only 1
+```
+
+## H. Replay an episode
+
+Now try to replay the first episode on your robot:
+```bash
+python lerobot/scripts/control_robot.py \
+ --robot.type=so100 \
+ --control.type=replay \
+ --control.fps=30 \
+ --control.repo_id=${HF_USER}/so100_test \
+ --control.episode=0
+```
+
+## I. Train a policy
+
+To train a policy to control your robot, use the [`python lerobot/scripts/train.py`](../lerobot/scripts/train.py) script. A few arguments are required. Here is an example command:
+```bash
+python lerobot/scripts/train.py \
+ --dataset.repo_id=${HF_USER}/so100_test \
+ --policy.type=act \
+ --output_dir=outputs/train/act_so100_test \
+ --job_name=act_so100_test \
+ --policy.device=cuda \
+ --wandb.enable=true
+```
+
+Let's explain it:
+1. We provided the dataset as argument with `--dataset.repo_id=${HF_USER}/so100_test`.
+2. We provided the policy with `policy.type=act`. This loads configurations from [`configuration_act.py`](../lerobot/common/policies/act/configuration_act.py). Importantly, this policy will automatically adapt to the number of motor sates, motor actions and cameras of your robot (e.g. `laptop` and `phone`) which have been saved in your dataset.
+4. We provided `policy.device=cuda` since we are training on a Nvidia GPU, but you could use `policy.device=mps` to train on Apple silicon.
+5. We provided `wandb.enable=true` to use [Weights and Biases](https://docs.wandb.ai/quickstart) for visualizing training plots. This is optional but if you use it, make sure you are logged in by running `wandb login`.
+
+Training should take several hours. You will find checkpoints in `outputs/train/act_so100_test/checkpoints`.
+
+To resume training from a checkpoint, below is an example command to resume from `last` checkpoint of the `act_so100_test` policy:
+```bash
+python lerobot/scripts/train.py \
+ --config_path=outputs/train/act_so100_test/checkpoints/last/pretrained_model/train_config.json \
+ --resume=true
+```
+
+## J. Evaluate your policy
+
+You can use the `record` function from [`lerobot/scripts/control_robot.py`](../lerobot/scripts/control_robot.py) but with a policy checkpoint as input. For instance, run this command to record 10 evaluation episodes:
+```bash
+python lerobot/scripts/control_robot.py \
+ --robot.type=so100 \
+ --control.type=record \
+ --control.fps=30 \
+ --control.single_task="Grasp a lego block and put it in the bin." \
+ --control.repo_id=${HF_USER}/eval_act_so100_test \
+ --control.tags='["tutorial"]' \
+ --control.warmup_time_s=5 \
+ --control.episode_time_s=30 \
+ --control.reset_time_s=30 \
+ --control.num_episodes=10 \
+ --control.push_to_hub=true \
+ --control.policy.path=outputs/train/act_so100_test/checkpoints/last/pretrained_model
+```
+
+As you can see, it's almost the same command as previously used to record your training dataset. Two things changed:
+1. There is an additional `--control.policy.path` argument which indicates the path to your policy checkpoint with (e.g. `outputs/train/eval_act_so100_test/checkpoints/last/pretrained_model`). You can also use the model repository if you uploaded a model checkpoint to the hub (e.g. `${HF_USER}/act_so100_test`).
+2. The name of dataset begins by `eval` to reflect that you are running inference (e.g. `${HF_USER}/eval_act_so100_test`).
+
+## K. More Information
+
+Follow this [previous tutorial](https://github.com/huggingface/lerobot/blob/main/examples/7_get_started_with_real_robot.md#4-train-a-policy-on-your-data) for a more in-depth tutorial on controlling real robots with LeRobot.
+
+> [!TIP]
+> If you have any questions or need help, please reach out on [Discord](https://discord.com/invite/s3KuuzsPFb) in the channel [`#so100-arm`](https://discord.com/channels/1216765309076115607/1237741463832363039).
diff --git a/lerobot/common/robots/koch/__init__.py b/lerobot/common/robots/koch_follower/__init__.py
similarity index 100%
rename from lerobot/common/robots/koch/__init__.py
rename to lerobot/common/robots/koch_follower/__init__.py
diff --git a/lerobot/common/robots/koch/config_koch_follower.py b/lerobot/common/robots/koch_follower/config_koch_follower.py
similarity index 100%
rename from lerobot/common/robots/koch/config_koch_follower.py
rename to lerobot/common/robots/koch_follower/config_koch_follower.py
diff --git a/lerobot/common/robots/koch/koch_follower.py b/lerobot/common/robots/koch_follower/koch_follower.py
similarity index 90%
rename from lerobot/common/robots/koch/koch_follower.py
rename to lerobot/common/robots/koch_follower/koch_follower.py
index 4ca996c0..42beda09 100644
--- a/lerobot/common/robots/koch/koch_follower.py
+++ b/lerobot/common/robots/koch_follower/koch_follower.py
@@ -16,10 +16,11 @@
import logging
import time
+from functools import cached_property
from typing import Any
from lerobot.common.cameras.utils import make_cameras_from_configs
-from lerobot.common.constants import OBS_IMAGES, OBS_STATE
+from lerobot.common.constants import OBS_STATE
from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
from lerobot.common.motors import Motor, MotorCalibration, MotorNormMode
from lerobot.common.motors.dynamixel import (
@@ -62,27 +63,22 @@ class KochFollower(Robot):
self.cameras = make_cameras_from_configs(config.cameras)
@property
- def state_feature(self) -> dict:
+ def _motors_ft(self) -> dict[str, type]:
+ return {f"{motor}.pos": float for motor in self.arm.motors}
+
+ @property
+ def _cameras_ft(self) -> dict[str, tuple]:
return {
- "dtype": "float32",
- "shape": (len(self.arm),),
- "names": {"motors": list(self.arm.motors)},
+ cam: (self.config.cameras[cam].height, self.config.cameras[cam].width, 3) for cam in self.cameras
}
- @property
- def action_feature(self) -> dict:
- return self.state_feature
+ @cached_property
+ def observation_features(self) -> dict[str, type | tuple]:
+ return {**self._motors_ft, **self._cameras_ft}
- @property
- def camera_features(self) -> dict[str, dict]:
- cam_ft = {}
- for cam_key, cam in self.cameras.items():
- cam_ft[cam_key] = {
- "shape": (cam.height, cam.width, cam.channels),
- "names": ["height", "width", "channels"],
- "info": None,
- }
- return cam_ft
+ @cached_property
+ def action_features(self) -> dict[str, type]:
+ return self._motors_ft
@property
def is_connected(self) -> bool:
@@ -184,13 +180,14 @@ class KochFollower(Robot):
# Read arm position
start = time.perf_counter()
obs_dict[OBS_STATE] = self.arm.sync_read("Present_Position")
+ obs_dict = {f"{motor}.pos": val for motor, val in obs_dict.items()}
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read state: {dt_ms:.1f}ms")
# Capture images from cameras
for cam_key, cam in self.cameras.items():
start = time.perf_counter()
- obs_dict[f"{OBS_IMAGES}.{cam_key}"] = cam.async_read()
+ obs_dict[cam_key] = cam.async_read()
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms")
@@ -212,7 +209,7 @@ class KochFollower(Robot):
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
- goal_pos = action
+ goal_pos = {key.removesuffix(".pos"): val for key, val in action.items() if key.endswith(".pos")}
# Cap goal position when too far away from present position.
# /!\ Slower fps expected due to reading from the follower.
@@ -223,7 +220,7 @@ class KochFollower(Robot):
# Send goal position to the arm
self.arm.sync_write("Goal_Position", goal_pos)
- return goal_pos
+ return {f"{motor}.pos": val for motor, val in goal_pos.items()}
def disconnect(self):
if not self.is_connected:
diff --git a/lerobot/common/robots/moss/__init__.py b/lerobot/common/robots/moss/__init__.py
deleted file mode 100644
index f7c840a3..00000000
--- a/lerobot/common/robots/moss/__init__.py
+++ /dev/null
@@ -1,4 +0,0 @@
-from .configuration_moss import MossRobotConfig
-from .robot_moss import MossRobot
-
-__all__ = ["MossRobotConfig", "MossRobot"]
diff --git a/lerobot/common/robots/moss/robot_moss.py b/lerobot/common/robots/moss/robot_moss.py
deleted file mode 100644
index 0f7e5e77..00000000
--- a/lerobot/common/robots/moss/robot_moss.py
+++ /dev/null
@@ -1,223 +0,0 @@
-#!/usr/bin/env python
-
-# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import json
-import logging
-import time
-
-import numpy as np
-
-from lerobot.common.cameras.utils import make_cameras_from_configs
-from lerobot.common.constants import OBS_IMAGES, OBS_STATE
-from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
-from lerobot.common.motors import TorqueMode
-from lerobot.common.motors.feetech import (
- FeetechMotorsBus,
- apply_feetech_offsets_from_calibration,
- run_full_arm_calibration,
-)
-
-from ..robot import Robot
-from ..utils import ensure_safe_goal_position
-from .configuration_moss import MossRobotConfig
-
-
-class MossRobot(Robot):
- """
- [Moss Arm](https://github.com/jess-moss/moss-robot-arms) designed by Jess Moss
- """
-
- config_class = MossRobotConfig
- name = "moss"
-
- def __init__(self, config: MossRobotConfig):
- super().__init__(config)
- self.config = config
- self.robot_type = config.type
-
- self.arm = FeetechMotorsBus(
- port=self.config.port,
- motors={
- "shoulder_pan": config.shoulder_pan,
- "shoulder_lift": config.shoulder_lift,
- "elbow_flex": config.elbow_flex,
- "wrist_flex": config.wrist_flex,
- "wrist_roll": config.wrist_roll,
- "gripper": config.gripper,
- },
- )
- self.cameras = make_cameras_from_configs(config.cameras)
-
- self.is_connected = False
- self.logs = {}
-
- @property
- def state_feature(self) -> dict:
- return {
- "dtype": "float32",
- "shape": (len(self.arm),),
- "names": {"motors": list(self.arm.motors)},
- }
-
- @property
- def action_feature(self) -> dict:
- return self.state_feature
-
- @property
- def camera_features(self) -> dict[str, dict]:
- cam_ft = {}
- for cam_key, cam in self.cameras.items():
- cam_ft[cam_key] = {
- "shape": (cam.height, cam.width, cam.channels),
- "names": ["height", "width", "channels"],
- "info": None,
- }
- return cam_ft
-
- def connect(self) -> None:
- if self.is_connected:
- raise DeviceAlreadyConnectedError(
- "ManipulatorRobot is already connected. Do not run `robot.connect()` twice."
- )
-
- logging.info("Connecting arm.")
- self.arm.connect()
-
- # We assume that at connection time, arm is in a rest position,
- # and torque can be safely disabled to run calibration.
- self.arm.write("Torque_Enable", TorqueMode.DISABLED.value)
- self.calibrate()
-
- # Mode=0 for Position Control
- self.arm.write("Mode", 0)
- # Set P_Coefficient to lower value to avoid shakiness (Default is 32)
- self.arm.write("P_Coefficient", 16)
- # Set I_Coefficient and D_Coefficient to default value 0 and 32
- self.arm.write("I_Coefficient", 0)
- self.arm.write("D_Coefficient", 32)
- # Close the write lock so that Maximum_Acceleration gets written to EPROM address,
- # which is mandatory for Maximum_Acceleration to take effect after rebooting.
- self.arm.write("Lock", 0)
- # Set Maximum_Acceleration to 254 to speedup acceleration and deceleration of
- # the motors. Note: this configuration is not in the official STS3215 Memory Table
- self.arm.write("Maximum_Acceleration", 254)
- self.arm.write("Acceleration", 254)
-
- logging.info("Activating torque.")
- self.arm.write("Torque_Enable", TorqueMode.ENABLED.value)
-
- # Check arm can be read
- self.arm.read("Present_Position")
-
- # Connect the cameras
- for cam in self.cameras.values():
- cam.connect()
-
- self.is_connected = True
-
- def calibrate(self) -> None:
- """After calibration all motors function in human interpretable ranges.
- Rotations are expressed in degrees in nominal range of [-180, 180],
- and linear motions (like gripper of Aloha) in nominal range of [0, 100].
- """
- if self.calibration_fpath.exists():
- with open(self.calibration_fpath) as f:
- calibration = json.load(f)
- else:
- # TODO(rcadene): display a warning in __init__ if calibration file not available
- logging.info(f"Missing calibration file '{self.calibration_fpath}'")
- calibration = run_full_arm_calibration(self.arm, self.robot_type, self.name, "follower")
-
- logging.info(f"Calibration is done! Saving calibration file '{self.calibration_fpath}'")
- self.calibration_fpath.parent.mkdir(parents=True, exist_ok=True)
- with open(self.calibration_fpath, "w") as f:
- json.dump(calibration, f)
-
- self.arm.set_calibration(calibration)
- apply_feetech_offsets_from_calibration(self.arm, calibration)
-
- def get_observation(self) -> dict[str, np.ndarray]:
- """The returned observations do not have a batch dimension."""
- if not self.is_connected:
- raise DeviceNotConnectedError(
- "ManipulatorRobot is not connected. You need to run `robot.connect()`."
- )
-
- obs_dict = {}
-
- # Read arm position
- before_read_t = time.perf_counter()
- obs_dict[OBS_STATE] = self.arm.read("Present_Position")
- self.logs["read_pos_dt_s"] = time.perf_counter() - before_read_t
-
- # Capture images from cameras
- for cam_key, cam in self.cameras.items():
- before_camread_t = time.perf_counter()
- obs_dict[f"{OBS_IMAGES}.{cam_key}"] = cam.async_read()
- self.logs[f"read_camera_{cam_key}_dt_s"] = cam.logs["delta_timestamp_s"]
- self.logs[f"async_read_camera_{cam_key}_dt_s"] = time.perf_counter() - before_camread_t
-
- return obs_dict
-
- def send_action(self, action: np.ndarray) -> np.ndarray:
- """Command arm to move to a target joint configuration.
-
- The relative action magnitude may be clipped depending on the configuration parameter
- `max_relative_target`. In this case, the action sent differs from original action.
- Thus, this function always returns the action actually sent.
-
- Args:
- action (np.ndarray): array containing the goal positions for the motors.
-
- Raises:
- RobotDeviceNotConnectedError: if robot is not connected.
-
- Returns:
- np.ndarray: the action sent to the motors, potentially clipped.
- """
- if not self.is_connected:
- raise DeviceNotConnectedError(
- "ManipulatorRobot is not connected. You need to run `robot.connect()`."
- )
-
- goal_pos = action
-
- # Cap goal position when too far away from present position.
- # /!\ Slower fps expected due to reading from the follower.
- if self.config.max_relative_target is not None:
- present_pos = self.arm.read("Present_Position")
- goal_pos = ensure_safe_goal_position(goal_pos, present_pos, self.config.max_relative_target)
-
- # Send goal position to the arm
- self.arm.write("Goal_Position", goal_pos.astype(np.int32))
-
- return goal_pos
-
- def print_logs(self):
- # TODO(aliberts): move robot-specific logs logic here
- pass
-
- def disconnect(self):
- if not self.is_connected:
- raise DeviceNotConnectedError(
- "ManipulatorRobot is not connected. You need to run `robot.connect()` before disconnecting."
- )
-
- self.arm.disconnect()
- for cam in self.cameras.values():
- cam.disconnect()
-
- self.is_connected = False
diff --git a/lerobot/common/robots/moss/README.md b/lerobot/common/robots/moss_follower/README.md
similarity index 100%
rename from lerobot/common/robots/moss/README.md
rename to lerobot/common/robots/moss_follower/README.md
diff --git a/lerobot/common/robots/moss_follower/__init__.py b/lerobot/common/robots/moss_follower/__init__.py
new file mode 100644
index 00000000..2ab82c1d
--- /dev/null
+++ b/lerobot/common/robots/moss_follower/__init__.py
@@ -0,0 +1,2 @@
+from .configuration_moss import MossRobotConfig
+from .moss_follower import MossRobot
diff --git a/lerobot/common/robots/moss/configuration_moss.py b/lerobot/common/robots/moss_follower/configuration_moss.py
similarity index 100%
rename from lerobot/common/robots/moss/configuration_moss.py
rename to lerobot/common/robots/moss_follower/configuration_moss.py
diff --git a/lerobot/common/robots/moss_follower/moss_follower.py b/lerobot/common/robots/moss_follower/moss_follower.py
new file mode 100644
index 00000000..e27fb8d7
--- /dev/null
+++ b/lerobot/common/robots/moss_follower/moss_follower.py
@@ -0,0 +1,215 @@
+#!/usr/bin/env python
+
+# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import time
+from functools import cached_property
+from typing import Any
+
+from lerobot.common.cameras.utils import make_cameras_from_configs
+from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
+from lerobot.common.motors import Motor, MotorCalibration, MotorNormMode
+from lerobot.common.motors.feetech import (
+ FeetechMotorsBus,
+ OperatingMode,
+)
+
+from ..robot import Robot
+from ..utils import ensure_safe_goal_position
+from .configuration_moss import MossRobotConfig
+
+logger = logging.getLogger(__name__)
+
+
+class MossRobot(Robot):
+ """
+ [Moss Arm](https://github.com/jess-moss/moss-robot-arms) designed by Jess Moss
+ """
+
+ config_class = MossRobotConfig
+ name = "moss_follower"
+
+ def __init__(self, config: MossRobotConfig):
+ super().__init__(config)
+ self.config = config
+ self.arm = FeetechMotorsBus(
+ port=self.config.port,
+ motors={
+ "shoulder_pan": Motor(1, "sts3215", MotorNormMode.RANGE_M100_100),
+ "shoulder_lift": Motor(2, "sts3215", MotorNormMode.RANGE_M100_100),
+ "elbow_flex": Motor(3, "sts3215", MotorNormMode.RANGE_M100_100),
+ "wrist_flex": Motor(4, "sts3215", MotorNormMode.RANGE_M100_100),
+ "wrist_roll": Motor(5, "sts3215", MotorNormMode.RANGE_M100_100),
+ "gripper": Motor(6, "sts3215", MotorNormMode.RANGE_0_100),
+ },
+ calibration=self.calibration,
+ )
+ self.cameras = make_cameras_from_configs(config.cameras)
+
+ @property
+ def _motors_ft(self) -> dict[str, type]:
+ return {f"{motor}.pos": float for motor in self.arm.motors}
+
+ @property
+ def _cameras_ft(self) -> dict[str, tuple]:
+ return {
+ cam: (self.config.cameras[cam].height, self.config.cameras[cam].width, 3) for cam in self.cameras
+ }
+
+ @cached_property
+ def observation_features(self) -> dict[str, type | tuple]:
+ return {**self._motors_ft, **self._cameras_ft}
+
+ @cached_property
+ def action_features(self) -> dict[str, type]:
+ return self._motors_ft
+
+ @property
+ def is_connected(self) -> bool:
+ # TODO(aliberts): add cam.is_connected for cam in self.cameras
+ return self.arm.is_connected
+
+ def connect(self) -> None:
+ """
+ We assume that at connection time, arm is in a rest position,
+ and torque can be safely disabled to run calibration.
+ """
+ if self.is_connected:
+ raise DeviceAlreadyConnectedError(f"{self} already connected")
+
+ self.arm.connect()
+ if not self.is_calibrated:
+ self.calibrate()
+
+ # Connect the cameras
+ for cam in self.cameras.values():
+ cam.connect()
+
+ self.configure()
+ logger.info(f"{self} connected.")
+
+ @property
+ def is_calibrated(self) -> bool:
+ return self.arm.is_calibrated
+
+ def calibrate(self) -> None:
+ logger.info(f"\nRunning calibration of {self}")
+ self.arm.disable_torque()
+ for motor in self.arm.motors:
+ self.arm.write("Operating_Mode", motor, OperatingMode.POSITION.value)
+
+ input(f"Move {self} to the middle of its range of motion and press ENTER....")
+ homing_offsets = self.arm.set_half_turn_homings()
+
+ full_turn_motor = "wrist_roll"
+ unknown_range_motors = [motor for motor in self.arm.motors if motor != full_turn_motor]
+ print(
+ f"Move all joints except '{full_turn_motor}' sequentially through their "
+ "entire ranges of motion.\nRecording positions. Press ENTER to stop..."
+ )
+ range_mins, range_maxes = self.arm.record_ranges_of_motion(unknown_range_motors)
+ range_mins[full_turn_motor] = 0
+ range_maxes[full_turn_motor] = 4095
+
+ self.calibration = {}
+ for motor, m in self.arm.motors.items():
+ self.calibration[motor] = MotorCalibration(
+ id=m.id,
+ drive_mode=0,
+ homing_offset=homing_offsets[motor],
+ range_min=range_mins[motor],
+ range_max=range_maxes[motor],
+ )
+
+ self.arm.write_calibration(self.calibration)
+ self._save_calibration()
+ print("Calibration saved to", self.calibration_fpath)
+
+ def configure(self) -> None:
+ with self.arm.torque_disabled():
+ self.arm.configure_motors()
+ for motor in self.arm.motors:
+ self.arm.write("Operating_Mode", motor, OperatingMode.POSITION.value)
+ # Set P_Coefficient to lower value to avoid shakiness (Default is 32)
+ self.arm.write("P_Coefficient", motor, 16)
+ # Set I_Coefficient and D_Coefficient to default value 0 and 32
+ self.arm.write("I_Coefficient", motor, 0)
+ self.arm.write("D_Coefficient", motor, 32)
+
+ def setup_motors(self) -> None:
+ for motor in reversed(self.arm.motors):
+ input(f"Connect the controller board to the '{motor}' motor only and press enter.")
+ self.arm.setup_motor(motor)
+ print(f"'{motor}' motor id set to {self.arm.motors[motor].id}")
+
+ def get_observation(self) -> dict[str, Any]:
+ if not self.is_connected:
+ raise DeviceNotConnectedError(f"{self} is not connected.")
+
+ # Read arm position
+ start = time.perf_counter()
+ obs_dict = self.arm.sync_read("Present_Position")
+ obs_dict = {f"{motor}.pos": val for motor, val in obs_dict.items()}
+ dt_ms = (time.perf_counter() - start) * 1e3
+ logger.debug(f"{self} read state: {dt_ms:.1f}ms")
+
+ # Capture images from cameras
+ for cam_key, cam in self.cameras.items():
+ start = time.perf_counter()
+ obs_dict[cam_key] = cam.async_read()
+ dt_ms = (time.perf_counter() - start) * 1e3
+ logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms")
+
+ return obs_dict
+
+ def send_action(self, action: dict[str, Any]) -> dict[str, Any]:
+ """Command arm to move to a target joint configuration.
+
+ The relative action magnitude may be clipped depending on the configuration parameter
+ `max_relative_target`. In this case, the action sent differs from original action.
+ Thus, this function always returns the action actually sent.
+
+ Raises:
+ RobotDeviceNotConnectedError: if robot is not connected.
+
+ Returns:
+ the action sent to the motors, potentially clipped.
+ """
+ if not self.is_connected:
+ raise DeviceNotConnectedError(f"{self} is not connected.")
+
+ goal_pos = {key.removesuffix(".pos"): val for key, val in action.items() if key.endswith(".pos")}
+
+ # Cap goal position when too far away from present position.
+ # /!\ Slower fps expected due to reading from the follower.
+ if self.config.max_relative_target is not None:
+ present_pos = self.arm.sync_read("Present_Position")
+ goal_present_pos = {key: (g_pos, present_pos[key]) for key, g_pos in goal_pos.items()}
+ goal_pos = ensure_safe_goal_position(goal_present_pos, self.config.max_relative_target)
+
+ # Send goal position to the arm
+ self.arm.sync_write("Goal_Position", goal_pos)
+ return {f"{motor}.pos": val for motor, val in goal_pos.items()}
+
+ def disconnect(self):
+ if not self.is_connected:
+ raise DeviceNotConnectedError(f"{self} is not connected.")
+
+ self.arm.disconnect(self.config.disable_torque_on_disconnect)
+ for cam in self.cameras.values():
+ cam.disconnect()
+
+ logger.info(f"{self} disconnected.")
diff --git a/lerobot/common/robots/robot.py b/lerobot/common/robots/robot.py
index a7ec4eda..e7b58aa3 100644
--- a/lerobot/common/robots/robot.py
+++ b/lerobot/common/robots/robot.py
@@ -36,15 +36,11 @@ class Robot(abc.ABC):
# TODO(aliberts): create a proper Feature class for this that links with datasets
@abc.abstractproperty
- def state_feature(self) -> dict:
+ def observation_features(self) -> dict:
pass
@abc.abstractproperty
- def action_feature(self) -> dict:
- pass
-
- @abc.abstractproperty
- def camera_features(self) -> dict[str, dict]:
+ def action_features(self) -> dict:
pass
@abc.abstractproperty
diff --git a/lerobot/common/robots/so100/README.md b/lerobot/common/robots/so100_follower/README.md
similarity index 100%
rename from lerobot/common/robots/so100/README.md
rename to lerobot/common/robots/so100_follower/README.md
diff --git a/lerobot/common/robots/so100/__init__.py b/lerobot/common/robots/so100_follower/__init__.py
similarity index 100%
rename from lerobot/common/robots/so100/__init__.py
rename to lerobot/common/robots/so100_follower/__init__.py
diff --git a/lerobot/common/robots/so100/config_so100_follower.py b/lerobot/common/robots/so100_follower/config_so100_follower.py
similarity index 100%
rename from lerobot/common/robots/so100/config_so100_follower.py
rename to lerobot/common/robots/so100_follower/config_so100_follower.py
diff --git a/lerobot/common/robots/so100/so100_follower.py b/lerobot/common/robots/so100_follower/so100_follower.py
similarity index 89%
rename from lerobot/common/robots/so100/so100_follower.py
rename to lerobot/common/robots/so100_follower/so100_follower.py
index aad030bc..3e063c1b 100644
--- a/lerobot/common/robots/so100/so100_follower.py
+++ b/lerobot/common/robots/so100_follower/so100_follower.py
@@ -16,10 +16,10 @@
import logging
import time
+from functools import cached_property
from typing import Any
from lerobot.common.cameras.utils import make_cameras_from_configs
-from lerobot.common.constants import OBS_IMAGES, OBS_STATE
from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
from lerobot.common.motors import Motor, MotorCalibration, MotorNormMode
from lerobot.common.motors.feetech import (
@@ -60,27 +60,22 @@ class SO100Follower(Robot):
self.cameras = make_cameras_from_configs(config.cameras)
@property
- def state_feature(self) -> dict:
+ def _motors_ft(self) -> dict[str, type]:
+ return {f"{motor}.pos": float for motor in self.arm.motors}
+
+ @property
+ def _cameras_ft(self) -> dict[str, tuple]:
return {
- "dtype": "float32",
- "shape": (len(self.arm),),
- "names": {"motors": list(self.arm.motors)},
+ cam: (self.config.cameras[cam].height, self.config.cameras[cam].width, 3) for cam in self.cameras
}
- @property
- def action_feature(self) -> dict:
- return self.state_feature
+ @cached_property
+ def observation_features(self) -> dict[str, type | tuple]:
+ return {**self._motors_ft, **self._cameras_ft}
- @property
- def camera_features(self) -> dict[str, dict]:
- cam_ft = {}
- for cam_key, cam in self.cameras.items():
- cam_ft[cam_key] = {
- "shape": (cam.height, cam.width, cam.channels),
- "names": ["height", "width", "channels"],
- "info": None,
- }
- return cam_ft
+ @cached_property
+ def action_features(self) -> dict[str, type]:
+ return self._motors_ft
@property
def is_connected(self) -> bool:
@@ -164,18 +159,17 @@ class SO100Follower(Robot):
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
- obs_dict = {}
-
# Read arm position
start = time.perf_counter()
- obs_dict[OBS_STATE] = self.arm.sync_read("Present_Position")
+ obs_dict = self.arm.sync_read("Present_Position")
+ obs_dict = {f"{motor}.pos": val for motor, val in obs_dict.items()}
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read state: {dt_ms:.1f}ms")
# Capture images from cameras
for cam_key, cam in self.cameras.items():
start = time.perf_counter()
- obs_dict[f"{OBS_IMAGES}.{cam_key}"] = cam.async_read()
+ obs_dict[cam_key] = cam.async_read()
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms")
@@ -197,7 +191,7 @@ class SO100Follower(Robot):
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
- goal_pos = action
+ goal_pos = {key.removesuffix(".pos"): val for key, val in action.items() if key.endswith(".pos")}
# Cap goal position when too far away from present position.
# /!\ Slower fps expected due to reading from the follower.
@@ -208,7 +202,7 @@ class SO100Follower(Robot):
# Send goal position to the arm
self.arm.sync_write("Goal_Position", goal_pos)
- return goal_pos
+ return {f"{motor}.pos": val for motor, val in goal_pos.items()}
def disconnect(self):
if not self.is_connected:
diff --git a/lerobot/common/robots/stretch3/__init__.py b/lerobot/common/robots/stretch3/__init__.py
new file mode 100644
index 00000000..e2a859cd
--- /dev/null
+++ b/lerobot/common/robots/stretch3/__init__.py
@@ -0,0 +1,2 @@
+from .configuration_stretch3 import Stretch3RobotConfig
+from .robot_stretch3 import Stretch3Robot
diff --git a/lerobot/common/robots/stretch3/robot_stretch3.py b/lerobot/common/robots/stretch3/robot_stretch3.py
index e07e3f1e..8a7ce9d5 100644
--- a/lerobot/common/robots/stretch3/robot_stretch3.py
+++ b/lerobot/common/robots/stretch3/robot_stretch3.py
@@ -72,7 +72,7 @@ class Stretch3Robot(Robot):
self.action_keys = None
@property
- def state_feature(self) -> dict:
+ def observation_features(self) -> dict:
return {
"dtype": "float32",
"shape": (len(STRETCH_MOTORS),),
@@ -80,8 +80,8 @@ class Stretch3Robot(Robot):
}
@property
- def action_feature(self) -> dict:
- return self.state_feature
+ def action_features(self) -> dict:
+ return self.observation_features
@property
def camera_features(self) -> dict[str, dict]:
diff --git a/lerobot/common/robots/utils.py b/lerobot/common/robots/utils.py
index 87f159ba..3c543b64 100644
--- a/lerobot/common/robots/utils.py
+++ b/lerobot/common/robots/utils.py
@@ -1,47 +1,27 @@
import logging
from pprint import pformat
-from typing import Protocol
from lerobot.common.robots import RobotConfig
-
-def get_arm_id(name, arm_type):
- """Returns the string identifier of a robot arm. For instance, for a bimanual manipulator
- like Aloha, it could be left_follower, right_follower, left_leader, or right_leader.
- """
- return f"{name}_{arm_type}"
-
-
-# TODO(aliberts): Remove and point to lerobot.common.robots.Robot
-class Robot(Protocol):
- robot_type: str
- features: dict
-
- def connect(self): ...
- def run_calibration(self): ...
- def teleop_step(self, record_data=False): ...
- def capture_observation(self): ...
- def send_action(self, action): ...
- def disconnect(self): ...
+from .robot import Robot
def make_robot_config(robot_type: str, **kwargs) -> RobotConfig:
if robot_type == "aloha":
- from .aloha.configuration_aloha import AlohaRobotConfig
+ raise NotImplementedError # TODO
- return AlohaRobotConfig(**kwargs)
elif robot_type == "koch_follower":
- from .koch.config_koch_follower import KochFollowerConfig
+ from .koch_follower.config_koch_follower import KochFollowerConfig
return KochFollowerConfig(**kwargs)
# elif robot_type == "koch_bimanual":
# return KochBimanualRobotConfig(**kwargs)
elif robot_type == "moss":
- from .moss.configuration_moss import MossRobotConfig
+ from .moss_follower.configuration_moss import MossRobotConfig
return MossRobotConfig(**kwargs)
elif robot_type == "so100_leader":
- from .so100.config_so100_follower import SO100FollowerConfig
+ from .so100_follower.config_so100_follower import SO100FollowerConfig
return SO100FollowerConfig(**kwargs)
elif robot_type == "stretch":
@@ -56,23 +36,29 @@ def make_robot_config(robot_type: str, **kwargs) -> RobotConfig:
raise ValueError(f"Robot type '{robot_type}' is not available.")
-def make_robot_from_config(config: RobotConfig):
- from .lekiwi.config_lekiwi import LeKiwiConfig
- from .manipulator import ManipulatorRobotConfig
+def make_robot_from_config(config: RobotConfig) -> Robot:
+ if config.type == "koch_follower":
+ from .koch_follower import KochFollower
- if isinstance(config, ManipulatorRobotConfig):
- from lerobot.common.robots.manipulator import ManipulatorRobot
+ return KochFollower(config)
+ elif config.type == "so100_follower":
+ from .so100_follower import SO100Follower
- return ManipulatorRobot(config)
- elif isinstance(config, LeKiwiConfig):
- from lerobot.common.robots.lekiwi import LeKiwiClient
+ return SO100Follower(config)
+ elif config.type == "lekiwi":
+ from .lekiwi import LeKiwiClient
return LeKiwiClient(config)
- ...
- else:
- from lerobot.common.robots.stretch3.robot_stretch3 import Stretch3Robot
+ elif config.type == "stretch3":
+ from .stretch3 import Stretch3Robot
return Stretch3Robot(config)
+ elif config.type == "viperx":
+ from .viperx import ViperX
+
+ return ViperX(config)
+ else:
+ raise ValueError(config.type)
def make_robot(robot_type: str, **kwargs) -> Robot:
@@ -116,3 +102,11 @@ def ensure_safe_goal_position(
)
return safe_goal_positions
+
+
+# TODO(aliberts): Remove
+def get_arm_id(name, arm_type):
+ """Returns the string identifier of a robot arm. For instance, for a bimanual manipulator
+ like Aloha, it could be left_follower, right_follower, left_leader, or right_leader.
+ """
+ return f"{name}_{arm_type}"
diff --git a/lerobot/common/robots/viperx/viperx.py b/lerobot/common/robots/viperx/viperx.py
index 7c294864..7639fd72 100644
--- a/lerobot/common/robots/viperx/viperx.py
+++ b/lerobot/common/robots/viperx/viperx.py
@@ -6,10 +6,11 @@ and send orders to its motors.
import logging
import time
+from functools import cached_property
from typing import Any
from lerobot.common.cameras.utils import make_cameras_from_configs
-from lerobot.common.constants import OBS_IMAGES, OBS_STATE
+from lerobot.common.constants import OBS_STATE
from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
from lerobot.common.motors import Motor, MotorCalibration, MotorNormMode
from lerobot.common.motors.dynamixel import (
@@ -55,28 +56,22 @@ class ViperX(Robot):
self.cameras = make_cameras_from_configs(config.cameras)
@property
- def state_feature(self) -> dict:
+ def _motors_ft(self) -> dict[str, type]:
+ return {f"{motor}.pos": float for motor in self.arm.motors}
+
+ @property
+ def _cameras_ft(self) -> dict[str, tuple]:
return {
- "dtype": "float32",
- "shape": (len(self.arm),),
- "names": {"motors": list(self.arm.motors)},
+ cam: (self.config.cameras[cam].height, self.config.cameras[cam].width, 3) for cam in self.cameras
}
- @property
- def action_feature(self) -> dict:
- return self.state_feature
+ @cached_property
+ def observation_features(self) -> dict[str, type | tuple]:
+ return {**self._motors_ft, **self._cameras_ft}
- @property
- def camera_features(self) -> dict[str, dict]:
- cam_ft = {}
- for cam_key, cam in self.cameras.items():
- key = f"observation.images.{cam_key}"
- cam_ft[key] = {
- "shape": (cam.height, cam.width, cam.channels),
- "names": ["height", "width", "channels"],
- "info": None,
- }
- return cam_ft
+ @cached_property
+ def action_features(self) -> dict[str, type]:
+ return self._motors_ft
@property
def is_connected(self) -> bool:
@@ -177,13 +172,14 @@ class ViperX(Robot):
# Read arm position
start = time.perf_counter()
obs_dict[OBS_STATE] = self.arm.sync_read("Present_Position")
+ obs_dict = {f"{motor}.pos": val for motor, val in obs_dict.items()}
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read state: {dt_ms:.1f}ms")
# Capture images from cameras
for cam_key, cam in self.cameras.items():
start = time.perf_counter()
- obs_dict[f"{OBS_IMAGES}.{cam_key}"] = cam.async_read()
+ obs_dict[cam_key] = cam.async_read()
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms")
@@ -205,7 +201,7 @@ class ViperX(Robot):
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
- goal_pos = action
+ goal_pos = {key.removesuffix(".pos"): val for key, val in action.items() if key.endswith(".pos")}
# Cap goal position when too far away from present position.
# /!\ Slower fps expected due to reading from the follower.
@@ -216,7 +212,7 @@ class ViperX(Robot):
# Send goal position to the arm
self.arm.sync_write("Goal_Position", goal_pos)
- return goal_pos
+ return {f"{motor}.pos": val for motor, val in goal_pos.items()}
def disconnect(self):
if not self.is_connected: