Compare commits

..

15 Commits

Author SHA1 Message Date
Steven Palma
e511e7eda5 Merge branch 'main' into fix/lint_warnings 2025-03-10 09:39:00 +01:00
Steven Palma
f5ed3723f0 fix(tests): typo in fixture name 2025-03-07 18:21:55 +01:00
Steven Palma
b104be0d04 Merge branch 'main' into fix/lint_warnings 2025-03-07 18:07:30 +01:00
Steven Palma
f9e4a1f5c4 chore(style): fix format 2025-03-07 17:44:18 +01:00
Steven Palma
0eb56cec14 fix(tests): remove lint warnings/errors 2025-03-07 17:44:18 +01:00
Steven Palma
e59ef036e1 fix(lerobot/common/datasets): remove lint warnings/errors 2025-03-07 16:50:22 +01:00
Steven Palma
9b380eaf67 fix(lerobot/common/envs): remove lint warnings/errors 2025-03-07 16:50:22 +01:00
Steven Palma
1187604ba0 fix(lerobot/common/optim): remove lint warnings/errors 2025-03-07 16:50:22 +01:00
Steven Palma
5c6f2d2cd0 fix(lerobot/common/policies): remove lint warnings/errors 2025-03-07 16:50:22 +01:00
Steven Palma
652fedf69c fix(lerobot/common/robot_devices): remove lint warnings/errors 2025-03-07 16:50:22 +01:00
Steven Palma
85214ec303 fix(lerobot/common/utils): remove lint warnings/errors 2025-03-07 16:50:22 +01:00
Steven Palma
dffa5a18db fix(lerobot/configs): remove lint warning/errors 2025-03-07 16:50:22 +01:00
Steven Palma
301f152a34 fix(lerobot/scripts): remove lint warnings/errors 2025-03-07 16:50:21 +01:00
Steven Palma
0ed08c0b1f fix(examples): remove lint warnings/errors 2025-03-07 14:26:33 +01:00
Steven Palma
254bc707e7 fix(benchmarks): remove lint warnings/errors 2025-03-07 14:25:42 +01:00
148 changed files with 1409 additions and 1336 deletions

View File

@@ -73,7 +73,7 @@ pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
!tests/artifacts
!tests/data
htmlcov/
.tox/
.nox/

2
.gitignore vendored
View File

@@ -78,7 +78,7 @@ pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
!tests/artifacts
!tests/data
htmlcov/
.tox/
.nox/

View File

@@ -12,17 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
exclude: "tests/artifacts/.*\\.safetensors$"
exclude: ^(tests/data)
default_language_version:
python: python3.10
repos:
##### Meta #####
- repo: meta
hooks:
- id: check-useless-excludes
- id: check-hooks-apply
##### Style / Misc. #####
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v5.0.0
@@ -35,37 +28,31 @@ repos:
- id: check-toml
- id: end-of-file-fixer
- id: trailing-whitespace
- repo: https://github.com/crate-ci/typos
rev: v1.30.2
rev: v1.30.0
hooks:
- id: typos
args: [--force-exclude]
- repo: https://github.com/asottile/pyupgrade
rev: v3.19.1
hooks:
- id: pyupgrade
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.9.10
rev: v0.9.9
hooks:
- id: ruff
args: [--fix]
- id: ruff-format
##### Security #####
- repo: https://github.com/gitleaks/gitleaks
rev: v8.24.0
hooks:
- id: gitleaks
- repo: https://github.com/woodruffw/zizmor-pre-commit
rev: v1.4.1
hooks:
- id: zizmor
- repo: https://github.com/PyCQA/bandit
rev: 1.8.3
hooks:

View File

@@ -291,7 +291,7 @@ sudo apt-get install git-lfs
git lfs install
```
Pull artifacts if they're not in [tests/artifacts](tests/artifacts)
Pull artifacts if they're not in [tests/data](tests/data)
```bash
git lfs pull
```

View File

@@ -232,8 +232,8 @@ python lerobot/scripts/eval.py \
--env.type=pusht \
--eval.batch_size=10 \
--eval.n_episodes=10 \
--policy.use_amp=false \
--policy.device=cuda
--use_amp=false \
--device=cuda
```
Note: After training your own policy, you can re-evaluate the checkpoints with:

View File

@@ -67,6 +67,7 @@ def parse_int_or_none(value) -> int | None:
def check_datasets_formats(repo_ids: list) -> None:
for repo_id in repo_ids:
dataset = LeRobotDataset(repo_id)
# TODO(Steven): Seems this API has changed
if dataset.video:
raise ValueError(
f"Use only image dataset for running this benchmark. Video dataset provided: {repo_id}"

View File

@@ -454,8 +454,8 @@ Next, you'll need to calibrate your SO-100 robot to ensure that the leader and f
You will need to move the follower arm to these positions sequentially:
| 1. Zero position | 2. Rotated position | 3. Rest position |
| ------------------------------------------------------------------------------------------------------------------------------------------------------------ | --------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------ |
| 1. Zero position | 2. Rotated position | 3. Rest position |
|---|---|---|
| <img src="../media/so100/follower_zero.webp?raw=true" alt="SO-100 follower arm zero position" title="SO-100 follower arm zero position" style="width:100%;"> | <img src="../media/so100/follower_rotated.webp?raw=true" alt="SO-100 follower arm rotated position" title="SO-100 follower arm rotated position" style="width:100%;"> | <img src="../media/so100/follower_rest.webp?raw=true" alt="SO-100 follower arm rest position" title="SO-100 follower arm rest position" style="width:100%;"> |
Make sure both arms are connected and run this script to launch manual calibration:
@@ -470,8 +470,8 @@ python lerobot/scripts/control_robot.py \
#### b. Manual calibration of leader arm
Follow step 6 of the [assembly video](https://youtu.be/FioA2oeFZ5I?t=724) which illustrates the manual calibration. You will need to move the leader arm to these positions sequentially:
| 1. Zero position | 2. Rotated position | 3. Rest position |
| ------------------------------------------------------------------------------------------------------------------------------------------------------ | --------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------ |
| 1. Zero position | 2. Rotated position | 3. Rest position |
|---|---|---|
| <img src="../media/so100/leader_zero.webp?raw=true" alt="SO-100 leader arm zero position" title="SO-100 leader arm zero position" style="width:100%;"> | <img src="../media/so100/leader_rotated.webp?raw=true" alt="SO-100 leader arm rotated position" title="SO-100 leader arm rotated position" style="width:100%;"> | <img src="../media/so100/leader_rest.webp?raw=true" alt="SO-100 leader arm rest position" title="SO-100 leader arm rest position" style="width:100%;"> |
Run this script to launch manual calibration:
@@ -571,14 +571,14 @@ python lerobot/scripts/train.py \
--policy.type=act \
--output_dir=outputs/train/act_so100_test \
--job_name=act_so100_test \
--policy.device=cuda \
--device=cuda \
--wandb.enable=true
```
Let's explain it:
1. We provided the dataset as argument with `--dataset.repo_id=${HF_USER}/so100_test`.
2. We provided the policy with `policy.type=act`. This loads configurations from [`configuration_act.py`](../lerobot/common/policies/act/configuration_act.py). Importantly, this policy will automatically adapt to the number of motor sates, motor actions and cameras of your robot (e.g. `laptop` and `phone`) which have been saved in your dataset.
4. We provided `policy.device=cuda` since we are training on a Nvidia GPU, but you could use `policy.device=mps` to train on Apple silicon.
4. We provided `device=cuda` since we are training on a Nvidia GPU, but you could use `device=mps` to train on Apple silicon.
5. We provided `wandb.enable=true` to use [Weights and Biases](https://docs.wandb.ai/quickstart) for visualizing training plots. This is optional but if you use it, make sure you are logged in by running `wandb login`.
Training should take several hours. You will find checkpoints in `outputs/train/act_so100_test/checkpoints`.

View File

@@ -366,8 +366,8 @@ Now we have to calibrate the leader arm and the follower arm. The wheel motors d
You will need to move the follower arm to these positions sequentially:
| 1. Zero position | 2. Rotated position | 3. Rest position |
| ----------------------------------------------------------------------------------------------------------------------------------------------------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| 1. Zero position | 2. Rotated position | 3. Rest position |
|---|---|---|
| <img src="../media/lekiwi/mobile_calib_zero.webp?raw=true" alt="SO-100 follower arm zero position" title="SO-100 follower arm zero position" style="width:100%;"> | <img src="../media/lekiwi/mobile_calib_rotated.webp?raw=true" alt="SO-100 follower arm rotated position" title="SO-100 follower arm rotated position" style="width:100%;"> | <img src="../media/lekiwi/mobile_calib_rest.webp?raw=true" alt="SO-100 follower arm rest position" title="SO-100 follower arm rest position" style="width:100%;"> |
Make sure the arm is connected to the Raspberry Pi and run this script (on the Raspberry Pi) to launch manual calibration:
@@ -385,8 +385,8 @@ If you have the **wired** LeKiwi version please run all commands including this
### Calibrate leader arm
Then to calibrate the leader arm (which is attached to the laptop/pc). You will need to move the leader arm to these positions sequentially:
| 1. Zero position | 2. Rotated position | 3. Rest position |
| ------------------------------------------------------------------------------------------------------------------------------------------------------ | --------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------ |
| 1. Zero position | 2. Rotated position | 3. Rest position |
|---|---|---|
| <img src="../media/so100/leader_zero.webp?raw=true" alt="SO-100 leader arm zero position" title="SO-100 leader arm zero position" style="width:100%;"> | <img src="../media/so100/leader_rotated.webp?raw=true" alt="SO-100 leader arm rotated position" title="SO-100 leader arm rotated position" style="width:100%;"> | <img src="../media/so100/leader_rest.webp?raw=true" alt="SO-100 leader arm rest position" title="SO-100 leader arm rest position" style="width:100%;"> |
Run this script (on your laptop/pc) to launch manual calibration:
@@ -416,22 +416,22 @@ python lerobot/scripts/control_robot.py \
You should see on your laptop something like this: ```[INFO] Connected to remote robot at tcp://172.17.133.91:5555 and video stream at tcp://172.17.133.91:5556.``` Now you can move the leader arm and use the keyboard (w,a,s,d) to drive forward, left, backwards, right. And use (z,x) to turn left or turn right. You can use (r,f) to increase and decrease the speed of the mobile robot. There are three speed modes, see the table below:
| Speed Mode | Linear Speed (m/s) | Rotation Speed (deg/s) |
| ---------- | ------------------ | ---------------------- |
| Fast | 0.4 | 90 |
| Medium | 0.25 | 60 |
| Slow | 0.1 | 30 |
|------------|-------------------|-----------------------|
| Fast | 0.4 | 90 |
| Medium | 0.25 | 60 |
| Slow | 0.1 | 30 |
| Key | Action |
| --- | -------------- |
| W | Move forward |
| A | Move left |
| S | Move backward |
| D | Move right |
| Z | Turn left |
| X | Turn right |
| R | Increase speed |
| F | Decrease speed |
| Key | Action |
|------|--------------------------------|
| W | Move forward |
| A | Move left |
| S | Move backward |
| D | Move right |
| Z | Turn left |
| X | Turn right |
| R | Increase speed |
| F | Decrease speed |
> [!TIP]
> If you use a different keyboard you can change the keys for each command in the [`LeKiwiRobotConfig`](../lerobot/common/robot_devices/robots/configs.py).
@@ -549,14 +549,14 @@ python lerobot/scripts/train.py \
--policy.type=act \
--output_dir=outputs/train/act_lekiwi_test \
--job_name=act_lekiwi_test \
--policy.device=cuda \
--device=cuda \
--wandb.enable=true
```
Let's explain it:
1. We provided the dataset as argument with `--dataset.repo_id=${HF_USER}/lekiwi_test`.
2. We provided the policy with `policy.type=act`. This loads configurations from [`configuration_act.py`](../lerobot/common/policies/act/configuration_act.py). Importantly, this policy will automatically adapt to the number of motor sates, motor actions and cameras of your robot (e.g. `laptop` and `phone`) which have been saved in your dataset.
4. We provided `policy.device=cuda` since we are training on a Nvidia GPU, but you could use `policy.device=mps` to train on Apple silicon.
4. We provided `device=cuda` since we are training on a Nvidia GPU, but you could use `device=mps` to train on Apple silicon.
5. We provided `wandb.enable=true` to use [Weights and Biases](https://docs.wandb.ai/quickstart) for visualizing training plots. This is optional but if you use it, make sure you are logged in by running `wandb login`.
Training should take several hours. You will find checkpoints in `outputs/train/act_lekiwi_test/checkpoints`.

View File

@@ -176,8 +176,8 @@ Next, you'll need to calibrate your Moss v1 robot to ensure that the leader and
You will need to move the follower arm to these positions sequentially:
| 1. Zero position | 2. Rotated position | 3. Rest position |
| ------------------------------------------------------------------------------------------------------------------------------------------------------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| 1. Zero position | 2. Rotated position | 3. Rest position |
|---|---|---|
| <img src="../media/moss/follower_zero.webp?raw=true" alt="Moss v1 follower arm zero position" title="Moss v1 follower arm zero position" style="width:100%;"> | <img src="../media/moss/follower_rotated.webp?raw=true" alt="Moss v1 follower arm rotated position" title="Moss v1 follower arm rotated position" style="width:100%;"> | <img src="../media/moss/follower_rest.webp?raw=true" alt="Moss v1 follower arm rest position" title="Moss v1 follower arm rest position" style="width:100%;"> |
Make sure both arms are connected and run this script to launch manual calibration:
@@ -192,8 +192,8 @@ python lerobot/scripts/control_robot.py \
**Manual calibration of leader arm**
Follow step 6 of the [assembly video](https://www.youtube.com/watch?v=DA91NJOtMic) which illustrates the manual calibration. You will need to move the leader arm to these positions sequentially:
| 1. Zero position | 2. Rotated position | 3. Rest position |
| ------------------------------------------------------------------------------------------------------------------------------------------------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------- |
| 1. Zero position | 2. Rotated position | 3. Rest position |
|---|---|---|
| <img src="../media/moss/leader_zero.webp?raw=true" alt="Moss v1 leader arm zero position" title="Moss v1 leader arm zero position" style="width:100%;"> | <img src="../media/moss/leader_rotated.webp?raw=true" alt="Moss v1 leader arm rotated position" title="Moss v1 leader arm rotated position" style="width:100%;"> | <img src="../media/moss/leader_rest.webp?raw=true" alt="Moss v1 leader arm rest position" title="Moss v1 leader arm rest position" style="width:100%;"> |
Run this script to launch manual calibration:
@@ -293,14 +293,14 @@ python lerobot/scripts/train.py \
--policy.type=act \
--output_dir=outputs/train/act_moss_test \
--job_name=act_moss_test \
--policy.device=cuda \
--device=cuda \
--wandb.enable=true
```
Let's explain it:
1. We provided the dataset as argument with `--dataset.repo_id=${HF_USER}/moss_test`.
2. We provided the policy with `policy.type=act`. This loads configurations from [`configuration_act.py`](../lerobot/common/policies/act/configuration_act.py). Importantly, this policy will automatically adapt to the number of motor sates, motor actions and cameras of your robot (e.g. `laptop` and `phone`) which have been saved in your dataset.
4. We provided `policy.device=cuda` since we are training on a Nvidia GPU, but you could use `policy.device=mps` to train on Apple silicon.
4. We provided `device=cuda` since we are training on a Nvidia GPU, but you could use `device=mps` to train on Apple silicon.
5. We provided `wandb.enable=true` to use [Weights and Biases](https://docs.wandb.ai/quickstart) for visualizing training plots. This is optional but if you use it, make sure you are logged in by running `wandb login`.
Training should take several hours. You will find checkpoints in `outputs/train/act_moss_test/checkpoints`.

View File

@@ -1,5 +1,5 @@
This tutorial will explain the training script, how to use it, and particularly how to configure everything needed for the training run.
> **Note:** The following assume you're running these commands on a machine equipped with a cuda GPU. If you don't have one (or if you're using a Mac), you can add `--policy.device=cpu` (`--policy.device=mps` respectively). However, be advised that the code executes much slower on cpu.
> **Note:** The following assume you're running these commands on a machine equipped with a cuda GPU. If you don't have one (or if you're using a Mac), you can add `--device=cpu` (`--device=mps` respectively). However, be advised that the code executes much slower on cpu.
## The training script

View File

@@ -386,14 +386,14 @@ When you connect your robot for the first time, the [`ManipulatorRobot`](../lero
Here are the positions you'll move the follower arm to:
| 1. Zero position | 2. Rotated position | 3. Rest position |
| ----------------------------------------------------------------------------------------------------------------------------------------------------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| 1. Zero position | 2. Rotated position | 3. Rest position |
|---|---|---|
| <img src="../media/koch/follower_zero.webp?raw=true" alt="Koch v1.1 follower arm zero position" title="Koch v1.1 follower arm zero position" style="width:100%;"> | <img src="../media/koch/follower_rotated.webp?raw=true" alt="Koch v1.1 follower arm rotated position" title="Koch v1.1 follower arm rotated position" style="width:100%;"> | <img src="../media/koch/follower_rest.webp?raw=true" alt="Koch v1.1 follower arm rest position" title="Koch v1.1 follower arm rest position" style="width:100%;"> |
And here are the corresponding positions for the leader arm:
| 1. Zero position | 2. Rotated position | 3. Rest position |
| ----------------------------------------------------------------------------------------------------------------------------------------------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------- |
| 1. Zero position | 2. Rotated position | 3. Rest position |
|---|---|---|
| <img src="../media/koch/leader_zero.webp?raw=true" alt="Koch v1.1 leader arm zero position" title="Koch v1.1 leader arm zero position" style="width:100%;"> | <img src="../media/koch/leader_rotated.webp?raw=true" alt="Koch v1.1 leader arm rotated position" title="Koch v1.1 leader arm rotated position" style="width:100%;"> | <img src="../media/koch/leader_rest.webp?raw=true" alt="Koch v1.1 leader arm rest position" title="Koch v1.1 leader arm rest position" style="width:100%;"> |
You can watch a [video tutorial of the calibration procedure](https://youtu.be/8drnU9uRY24) for more details.
@@ -898,14 +898,14 @@ python lerobot/scripts/train.py \
--policy.type=act \
--output_dir=outputs/train/act_koch_test \
--job_name=act_koch_test \
--policy.device=cuda \
--device=cuda \
--wandb.enable=true
```
Let's explain it:
1. We provided the dataset as argument with `--dataset.repo_id=${HF_USER}/koch_test`.
2. We provided the policy with `policy.type=act`. This loads configurations from [`configuration_act.py`](../lerobot/common/policies/act/configuration_act.py). Importantly, this policy will automatically adapt to the number of motor sates, motor actions and cameras of your robot (e.g. `laptop` and `phone`) which have been saved in your dataset.
4. We provided `policy.device=cuda` since we are training on a Nvidia GPU, but you could use `policy.device=mps` to train on Apple silicon.
4. We provided `device=cuda` since we are training on a Nvidia GPU, but you could use `device=mps` to train on Apple silicon.
5. We provided `wandb.enable=true` to use [Weights and Biases](https://docs.wandb.ai/quickstart) for visualizing training plots. This is optional but if you use it, make sure you are logged in by running `wandb login`.
For more information on the `train` script see the previous tutorial: [`examples/4_train_policy_with_script.md`](../examples/4_train_policy_with_script.md)

View File

@@ -135,14 +135,14 @@ python lerobot/scripts/train.py \
--policy.type=act \
--output_dir=outputs/train/act_aloha_test \
--job_name=act_aloha_test \
--policy.device=cuda \
--device=cuda \
--wandb.enable=true
```
Let's explain it:
1. We provided the dataset as argument with `--dataset.repo_id=${HF_USER}/aloha_test`.
2. We provided the policy with `policy.type=act`. This loads configurations from [`configuration_act.py`](../lerobot/common/policies/act/configuration_act.py). Importantly, this policy will automatically adapt to the number of motor sates, motor actions and cameras of your robot (e.g. `laptop` and `phone`) which have been saved in your dataset.
4. We provided `policy.device=cuda` since we are training on a Nvidia GPU, but you could use `policy.device=mps` to train on Apple silicon.
4. We provided `device=cuda` since we are training on a Nvidia GPU, but you could use `device=mps` to train on Apple silicon.
5. We provided `wandb.enable=true` to use [Weights and Biases](https://docs.wandb.ai/quickstart) for visualizing training plots. This is optional but if you use it, make sure you are logged in by running `wandb login`.
For more information on the `train` script see the previous tutorial: [`examples/4_train_policy_with_script.md`](../examples/4_train_policy_with_script.md)

View File

@@ -222,7 +222,7 @@ def main(raw_dir: Path, repo_id: str, mode: str = "video", push_to_hub: bool = T
if __name__ == "__main__":
# To try this script, modify the repo id with your own HuggingFace user (e.g cadene/pusht)
repo_id = "lerobot/pusht"
repository_id = "lerobot/pusht"
modes = ["video", "image", "keypoints"]
# Uncomment if you want to try with a specific mode
@@ -230,13 +230,13 @@ if __name__ == "__main__":
# modes = ["image"]
# modes = ["keypoints"]
raw_dir = Path("data/lerobot-raw/pusht_raw")
for mode in modes:
if mode in ["image", "keypoints"]:
repo_id += f"_{mode}"
data_dir = Path("data/lerobot-raw/pusht_raw")
for available_mode in modes:
if available_mode in ["image", "keypoints"]:
repository_id += f"_{available_mode}"
# download and load raw dataset, create LeRobotDataset, populate it, push to hub
main(raw_dir, repo_id=repo_id, mode=mode)
main(data_dir, repo_id=repository_id, mode=available_mode)
# Uncomment if you want to load the local dataset and explore it
# dataset = LeRobotDataset(repo_id=repo_id)

View File

@@ -13,8 +13,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from pprint import pformat
import torch
@@ -98,17 +96,17 @@ def make_dataset(cfg: TrainPipelineConfig) -> LeRobotDataset | MultiLeRobotDatas
)
else:
raise NotImplementedError("The MultiLeRobotDataset isn't supported for now.")
dataset = MultiLeRobotDataset(
cfg.dataset.repo_id,
# TODO(aliberts): add proper support for multi dataset
# delta_timestamps=delta_timestamps,
image_transforms=image_transforms,
video_backend=cfg.dataset.video_backend,
)
logging.info(
"Multiple datasets were provided. Applied the following index mapping to the provided datasets: "
f"{pformat(dataset.repo_id_to_index, indent=2)}"
)
# dataset = MultiLeRobotDataset(
# cfg.dataset.repo_id,
# # TODO(aliberts): add proper support for multi dataset
# # delta_timestamps=delta_timestamps,
# image_transforms=image_transforms,
# video_backend=cfg.dataset.video_backend,
# )
# logging.info(
# "Multiple datasets were provided. Applied the following index mapping to the provided datasets: "
# f"{pformat(dataset.repo_id_to_index, indent=2)}"
# )
if cfg.dataset.use_imagenet_stats:
for key in dataset.meta.camera_keys:

View File

@@ -81,21 +81,21 @@ def write_image(image: np.ndarray | PIL.Image.Image, fpath: Path):
print(f"Error writing image {fpath}: {e}")
def worker_thread_loop(queue: queue.Queue):
def worker_thread_loop(task_queue: queue.Queue):
while True:
item = queue.get()
item = task_queue.get()
if item is None:
queue.task_done()
task_queue.task_done()
break
image_array, fpath = item
write_image(image_array, fpath)
queue.task_done()
task_queue.task_done()
def worker_process(queue: queue.Queue, num_threads: int):
def worker_process(task_queue: queue.Queue, num_threads: int):
threads = []
for _ in range(num_threads):
t = threading.Thread(target=worker_thread_loop, args=(queue,))
t = threading.Thread(target=worker_thread_loop, args=(task_queue,))
t.daemon = True
t.start()
threads.append(t)

View File

@@ -87,6 +87,7 @@ class LeRobotDatasetMetadata:
self.repo_id = repo_id
self.revision = revision if revision else CODEBASE_VERSION
self.root = Path(root) if root is not None else HF_LEROBOT_HOME / repo_id
self.stats = None
try:
if force_cache_sync:
@@ -102,10 +103,10 @@ class LeRobotDatasetMetadata:
def load_metadata(self):
self.info = load_info(self.root)
check_version_compatibility(self.repo_id, self._version, CODEBASE_VERSION)
check_version_compatibility(self.repo_id, self.version, CODEBASE_VERSION)
self.tasks, self.task_to_task_index = load_tasks(self.root)
self.episodes = load_episodes(self.root)
if self._version < packaging.version.parse("v2.1"):
if self.version < packaging.version.parse("v2.1"):
self.stats = load_stats(self.root)
self.episodes_stats = backward_compatible_episodes_stats(self.stats, self.episodes)
else:
@@ -127,7 +128,7 @@ class LeRobotDatasetMetadata:
)
@property
def _version(self) -> packaging.version.Version:
def version(self) -> packaging.version.Version:
"""Codebase version used to create this dataset."""
return packaging.version.parse(self.info["codebase_version"])
@@ -321,8 +322,9 @@ class LeRobotDatasetMetadata:
robot_type = robot.robot_type
if not all(cam.fps == fps for cam in robot.cameras.values()):
logging.warning(
f"Some cameras in your {robot.robot_type} robot don't have an fps matching the fps of your dataset."
"In this case, frames from lower fps cameras will be repeated to fill in the blanks."
"Some cameras in your %s robot don't have an fps matching the fps of your dataset."
"In this case, frames from lower fps cameras will be repeated to fill in the blanks.",
robot.robot_type,
)
elif features is None:
raise ValueError(
@@ -486,7 +488,7 @@ class LeRobotDataset(torch.utils.data.Dataset):
self.meta = LeRobotDatasetMetadata(
self.repo_id, self.root, self.revision, force_cache_sync=force_cache_sync
)
if self.episodes is not None and self.meta._version >= packaging.version.parse("v2.1"):
if self.episodes is not None and self.meta.version >= packaging.version.parse("v2.1"):
episodes_stats = [self.meta.episodes_stats[ep_idx] for ep_idx in self.episodes]
self.stats = aggregate_stats(episodes_stats)
@@ -518,7 +520,7 @@ class LeRobotDataset(torch.utils.data.Dataset):
self,
branch: str | None = None,
tags: list | None = None,
license: str | None = "apache-2.0",
dataset_license: str | None = "apache-2.0",
tag_version: bool = True,
push_videos: bool = True,
private: bool = False,
@@ -561,7 +563,7 @@ class LeRobotDataset(torch.utils.data.Dataset):
if not hub_api.file_exists(self.repo_id, REPOCARD_NAME, repo_type="dataset", revision=branch):
card = create_lerobot_dataset_card(
tags=tags, dataset_info=self.meta.info, license=license, **card_kwargs
tags=tags, dataset_info=self.meta.info, license=dataset_license, **card_kwargs
)
card.push_to_hub(repo_id=self.repo_id, repo_type="dataset", revision=branch)
@@ -842,6 +844,7 @@ class LeRobotDataset(torch.utils.data.Dataset):
save the current episode in self.episode_buffer, which is filled with 'add_frame'. Defaults to
None.
"""
episode_buffer = None
if not episode_data:
episode_buffer = self.episode_buffer
@@ -1086,8 +1089,9 @@ class MultiLeRobotDataset(torch.utils.data.Dataset):
for repo_id, ds in zip(self.repo_ids, self._datasets, strict=True):
extra_keys = set(ds.features).difference(intersection_features)
logging.warning(
f"keys {extra_keys} of {repo_id} were disabled as they are not contained in all the "
"other datasets."
"keys %s of %s were disabled as they are not contained in all the other datasets.",
extra_keys,
repo_id,
)
self.disabled_features.update(extra_keys)

View File

@@ -53,7 +53,7 @@ def rechunk_recompress_array(group, name, chunks=None, chunk_length=None, compre
# rechunk recompress
group.move(name, tmp_key)
old_arr = group[tmp_key]
n_copied, n_skipped, n_bytes_copied = zarr.copy(
_n_copied, _n_skipped, _n_bytes_copied = zarr.copy(
source=old_arr,
dest=group,
name=name,
@@ -192,7 +192,7 @@ class ReplayBuffer:
else:
root = zarr.group(store=store)
# copy without recompression
n_copied, n_skipped, n_bytes_copied = zarr.copy_store(
_n_copied, _n_skipped, _n_bytes_copied = zarr.copy_store(
source=src_store, dest=store, source_path="/meta", dest_path="/meta", if_exists=if_exists
)
data_group = root.create_group("data", overwrite=True)
@@ -205,7 +205,7 @@ class ReplayBuffer:
if cks == value.chunks and cpr == value.compressor:
# copy without recompression
this_path = "/data/" + key
n_copied, n_skipped, n_bytes_copied = zarr.copy_store(
_n_copied, _n_skipped, _n_bytes_copied = zarr.copy_store(
source=src_store,
dest=store,
source_path=this_path,
@@ -214,7 +214,7 @@ class ReplayBuffer:
)
else:
# copy with recompression
n_copied, n_skipped, n_bytes_copied = zarr.copy(
_n_copied, _n_skipped, _n_bytes_copied = zarr.copy(
source=value,
dest=data_group,
name=key,
@@ -275,7 +275,7 @@ class ReplayBuffer:
compressors = {}
if self.backend == "zarr":
# recompression free copy
n_copied, n_skipped, n_bytes_copied = zarr.copy_store(
_n_copied, _n_skipped, _n_bytes_copied = zarr.copy_store(
source=self.root.store,
dest=store,
source_path="/meta",
@@ -297,7 +297,7 @@ class ReplayBuffer:
if cks == value.chunks and cpr == value.compressor:
# copy without recompression
this_path = "/data/" + key
n_copied, n_skipped, n_bytes_copied = zarr.copy_store(
_n_copied, _n_skipped, _n_bytes_copied = zarr.copy_store(
source=self.root.store,
dest=store,
source_path=this_path,

View File

@@ -162,9 +162,9 @@ def download_raw(raw_dir: Path, repo_id: str):
)
raw_dir.mkdir(parents=True, exist_ok=True)
logging.info(f"Start downloading from huggingface.co/{user_id} for {dataset_id}")
logging.info("Start downloading from huggingface.co/%s for %s", user_id, dataset_id)
snapshot_download(repo_id, repo_type="dataset", local_dir=raw_dir)
logging.info(f"Finish downloading from huggingface.co/{user_id} for {dataset_id}")
logging.info("Finish downloading from huggingface.co/%s for %s", user_id, dataset_id)
def download_all_raw_datasets(data_dir: Path | None = None):

View File

@@ -72,7 +72,7 @@ def check_format(raw_dir) -> bool:
assert data[f"/observations/images/{camera}"].ndim == 2
else:
assert data[f"/observations/images/{camera}"].ndim == 4
b, h, w, c = data[f"/observations/images/{camera}"].shape
_, h, w, c = data[f"/observations/images/{camera}"].shape
assert c < h and c < w, f"Expect (h,w,c) image format but ({h=},{w=},{c=}) provided."
@@ -103,6 +103,7 @@ def load_from_raw(
state = torch.from_numpy(ep["/observations/qpos"][:])
action = torch.from_numpy(ep["/action"][:])
velocity = None
if "/observations/qvel" in ep:
velocity = torch.from_numpy(ep["/observations/qvel"][:])
if "/observations/effort" in ep:

View File

@@ -96,6 +96,7 @@ def from_raw_to_lerobot_format(
if fps is None:
fps = 30
# TODO(Steven): Is this meant to call cam_png_format.load_from_raw?
data_dict = load_from_raw(raw_dir, videos_dir, fps, video, episodes)
hf_dataset = to_hf_dataset(data_dict, video)
episode_data_index = calculate_episode_data_index(hf_dataset)

View File

@@ -42,7 +42,9 @@ def check_format(raw_dir) -> bool:
return True
def load_from_raw(raw_dir: Path, videos_dir: Path, fps: int, video: bool, episodes: list[int] | None = None):
def load_from_raw(
raw_dir: Path, videos_dir: Path, fps: int, _video: bool, _episodes: list[int] | None = None
):
# Load data stream that will be used as reference for the timestamps synchronization
reference_files = list(raw_dir.glob("observation.images.cam_*.parquet"))
if len(reference_files) == 0:

View File

@@ -55,7 +55,7 @@ def save_images_concurrently(imgs_array: numpy.array, out_dir: Path, max_workers
num_images = len(imgs_array)
with ThreadPoolExecutor(max_workers=max_workers) as executor:
[executor.submit(save_image, imgs_array[i], i, out_dir) for i in range(num_images)]
_ = [executor.submit(save_image, imgs_array[i], i, out_dir) for i in range(num_images)]
def get_default_encoding() -> dict:
@@ -92,24 +92,23 @@ def calculate_episode_data_index(hf_dataset: datasets.Dataset) -> Dict[str, torc
episode_data_index = {"from": [], "to": []}
current_episode = None
"""
The episode_index is a list of integers, each representing the episode index of the corresponding example.
For instance, the following is a valid episode_index:
[0, 0, 0, 1, 1, 1, 1, 2, 2, 2, 2, 2]
Below, we iterate through the episode_index and populate the episode_data_index dictionary with the starting and
ending index of each episode. For the episode_index above, the episode_data_index dictionary will look like this:
{
"from": [0, 3, 7],
"to": [3, 7, 12]
}
"""
# The episode_index is a list of integers, each representing the episode index of the corresponding example.
# For instance, the following is a valid episode_index:
# [0, 0, 0, 1, 1, 1, 1, 2, 2, 2, 2, 2]
#
# Below, we iterate through the episode_index and populate the episode_data_index dictionary with the starting and
# ending index of each episode. For the episode_index above, the episode_data_index dictionary will look like this:
# {
# "from": [0, 3, 7],
# "to": [3, 7, 12]
# }
if len(hf_dataset) == 0:
episode_data_index = {
"from": torch.tensor([]),
"to": torch.tensor([]),
}
return episode_data_index
idx = None
for idx, episode_idx in enumerate(hf_dataset["episode_index"]):
if episode_idx != current_episode:
# We encountered a new episode, so we append its starting location to the "from" list

View File

@@ -23,6 +23,7 @@ from torchvision.transforms.v2 import Transform
from torchvision.transforms.v2 import functional as F # noqa: N812
# TODO(Steven): Missing transform() implementation
class RandomSubsetApply(Transform):
"""Apply a random subset of N transformations from a list of transformations.
@@ -218,6 +219,7 @@ def make_transform_from_config(cfg: ImageTransformConfig):
raise ValueError(f"Transform '{cfg.type}' is not valid.")
# TODO(Steven): Missing transform() implementation
class ImageTransforms(Transform):
"""A class to compose image transforms based on configuration."""

View File

@@ -135,21 +135,21 @@ def serialize_dict(stats: dict[str, torch.Tensor | np.ndarray | dict]) -> dict:
def embed_images(dataset: datasets.Dataset) -> datasets.Dataset:
# Embed image bytes into the table before saving to parquet
format = dataset.format
ds_format = dataset.format
dataset = dataset.with_format("arrow")
dataset = dataset.map(embed_table_storage, batched=False)
dataset = dataset.with_format(**format)
dataset = dataset.with_format(**ds_format)
return dataset
def load_json(fpath: Path) -> Any:
with open(fpath) as f:
with open(fpath, encoding="utf-8") as f:
return json.load(f)
def write_json(data: dict, fpath: Path) -> None:
fpath.parent.mkdir(exist_ok=True, parents=True)
with open(fpath, "w") as f:
with open(fpath, "w", encoding="utf-8") as f:
json.dump(data, f, indent=4, ensure_ascii=False)
@@ -300,7 +300,7 @@ def check_version_compatibility(
if v_check.major < v_current.major and enforce_breaking_major:
raise BackwardCompatibilityError(repo_id, v_check)
elif v_check.minor < v_current.minor:
logging.warning(V21_MESSAGE.format(repo_id=repo_id, version=v_check))
logging.warning("%s", V21_MESSAGE.format(repo_id=repo_id, version=v_check))
def get_repo_versions(repo_id: str) -> list[packaging.version.Version]:
@@ -348,7 +348,9 @@ def get_safe_version(repo_id: str, version: str | packaging.version.Version) ->
if compatibles:
return_version = max(compatibles)
if return_version < target_version:
logging.warning(f"Revision {version} for {repo_id} not found, using version v{return_version}")
logging.warning(
"Revision %s for %s not found, using version v%s", version, repo_id, return_version
)
return f"v{return_version}"
lower_major = [v for v in hub_versions if v.major < target_version.major]
@@ -403,7 +405,7 @@ def dataset_to_policy_features(features: dict[str, dict]) -> dict[str, PolicyFea
for key, ft in features.items():
shape = ft["shape"]
if ft["dtype"] in ["image", "video"]:
type = FeatureType.VISUAL
feature_type = FeatureType.VISUAL
if len(shape) != 3:
raise ValueError(f"Number of dimensions of {key} != 3 (shape={shape})")
@@ -412,16 +414,16 @@ def dataset_to_policy_features(features: dict[str, dict]) -> dict[str, PolicyFea
if names[2] in ["channel", "channels"]: # (h, w, c) -> (c, h, w)
shape = (shape[2], shape[0], shape[1])
elif key == "observation.environment_state":
type = FeatureType.ENV
feature_type = FeatureType.ENV
elif key.startswith("observation"):
type = FeatureType.STATE
feature_type = FeatureType.STATE
elif key == "action":
type = FeatureType.ACTION
feature_type = FeatureType.ACTION
else:
continue
policy_features[key] = PolicyFeature(
type=type,
type=feature_type,
shape=shape,
)

View File

@@ -871,11 +871,11 @@ def batch_convert():
try:
convert_dataset(repo_id, LOCAL_DIR, **kwargs)
status = f"{repo_id}: success."
with open(logfile, "a") as file:
with open(logfile, "a", encoding="utf-8") as file:
file.write(status + "\n")
except Exception:
status = f"{repo_id}: failed\n {traceback.format_exc()}"
with open(logfile, "a") as file:
with open(logfile, "a", encoding="utf-8") as file:
file.write(status + "\n")
continue

View File

@@ -190,11 +190,11 @@ def convert_stats_to_json(v1_dir: Path, v2_dir: Path) -> None:
json_path = v2_dir / STATS_PATH
json_path.parent.mkdir(exist_ok=True, parents=True)
with open(json_path, "w") as f:
with open(json_path, "w", encoding="utf-8") as f:
json.dump(serialized_stats, f, indent=4)
# Sanity check
with open(json_path) as f:
with open(json_path, encoding="utf-8") as f:
stats_json = json.load(f)
stats_json = flatten_dict(stats_json)
@@ -213,7 +213,7 @@ def get_features_from_hf_dataset(
dtype = ft.dtype
shape = (1,)
names = None
if isinstance(ft, datasets.Sequence):
elif isinstance(ft, datasets.Sequence):
assert isinstance(ft.feature, datasets.Value)
dtype = ft.feature.dtype
shape = (ft.length,)
@@ -232,6 +232,8 @@ def get_features_from_hf_dataset(
dtype = "video"
shape = None # Add shape later
names = ["height", "width", "channels"]
else:
raise NotImplementedError(f"Feature type {ft._type} not supported.")
features[key] = {
"dtype": dtype,
@@ -358,9 +360,9 @@ def move_videos(
if len(video_dirs) == 1:
video_path = video_dirs[0] / video_file
else:
for dir in video_dirs:
if (dir / video_file).is_file():
video_path = dir / video_file
for v_dir in video_dirs:
if (v_dir / video_file).is_file():
video_path = v_dir / video_file
break
video_path.rename(work_dir / target_path)
@@ -652,6 +654,7 @@ def main():
if not args.local_dir:
args.local_dir = Path("/tmp/lerobot_dataset_v2")
robot_config = None
if args.robot is not None:
robot_config = make_robot_config(args.robot)

View File

@@ -50,7 +50,7 @@ def fix_dataset(repo_id: str) -> str:
return f"{repo_id}: skipped (no diff)"
if diff_meta_parquet:
logging.warning(f"In info.json not in parquet: {meta_features - parquet_features}")
logging.warning("In info.json not in parquet: %s", meta_features - parquet_features)
assert diff_meta_parquet == {"language_instruction"}
lerobot_metadata.features.pop("language_instruction")
write_info(lerobot_metadata.info, lerobot_metadata.root)
@@ -79,7 +79,7 @@ def batch_fix():
status = f"{repo_id}: failed\n {traceback.format_exc()}"
logging.info(status)
with open(logfile, "a") as file:
with open(logfile, "a", encoding="utf-8") as file:
file.write(status + "\n")

View File

@@ -46,7 +46,7 @@ def batch_convert():
except Exception:
status = f"{repo_id}: failed\n {traceback.format_exc()}"
with open(logfile, "a") as file:
with open(logfile, "a", encoding="utf-8") as file:
file.write(status + "\n")

View File

@@ -45,6 +45,9 @@ V21 = "v2.1"
class SuppressWarnings:
def __init__(self):
self.previous_level = None
def __enter__(self):
self.previous_level = logging.getLogger().getEffectiveLevel()
logging.getLogger().setLevel(logging.ERROR)

View File

@@ -83,7 +83,7 @@ def decode_video_frames_torchvision(
for frame in reader:
current_ts = frame["pts"]
if log_loaded_timestamps:
logging.info(f"frame loaded at timestamp={current_ts:.4f}")
logging.info("frame loaded at timestamp=%.4f", current_ts)
loaded_frames.append(frame["data"])
loaded_ts.append(current_ts)
if current_ts >= last_ts:
@@ -118,7 +118,7 @@ def decode_video_frames_torchvision(
closest_ts = loaded_ts[argmin_]
if log_loaded_timestamps:
logging.info(f"{closest_ts=}")
logging.info("closest_ts=%s", closest_ts)
# convert to the pytorch format which is float32 in [0,1] range (and channel first)
closest_frames = closest_frames.type(torch.float32) / 255
@@ -227,7 +227,9 @@ def get_audio_info(video_path: Path | str) -> dict:
"json",
str(video_path),
]
result = subprocess.run(ffprobe_audio_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
result = subprocess.run(
ffprobe_audio_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, check=True
)
if result.returncode != 0:
raise RuntimeError(f"Error running ffprobe: {result.stderr}")
@@ -263,7 +265,9 @@ def get_video_info(video_path: Path | str) -> dict:
"json",
str(video_path),
]
result = subprocess.run(ffprobe_video_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
result = subprocess.run(
ffprobe_video_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, check=True
)
if result.returncode != 0:
raise RuntimeError(f"Error running ffprobe: {result.stderr}")

View File

@@ -32,7 +32,8 @@ class EnvConfig(draccus.ChoiceRegistry, abc.ABC):
def type(self) -> str:
return self.get_choice_name(self.__class__)
@abc.abstractproperty
@property
@abc.abstractmethod
def gym_kwargs(self) -> dict:
raise NotImplementedError()

View File

@@ -44,7 +44,7 @@ class OptimizerConfig(draccus.ChoiceRegistry, abc.ABC):
return "adam"
@abc.abstractmethod
def build(self) -> torch.optim.Optimizer:
def build(self, params: dict) -> torch.optim.Optimizer:
raise NotImplementedError

View File

@@ -140,7 +140,7 @@ class ACTConfig(PreTrainedConfig):
def __post_init__(self):
super().__post_init__()
"""Input validation (not exhaustive)."""
# Input validation (not exhaustive).
if not self.vision_backbone.startswith("resnet"):
raise ValueError(
f"`vision_backbone` must be one of the ResNet variants. Got {self.vision_backbone}."

View File

@@ -222,6 +222,8 @@ class ACTTemporalEnsembler:
self.chunk_size = chunk_size
self.ensemble_weights = torch.exp(-temporal_ensemble_coeff * torch.arange(chunk_size))
self.ensemble_weights_cumsum = torch.cumsum(self.ensemble_weights, dim=0)
self.ensembled_actions = None
self.ensembled_actions_count = None
self.reset()
def reset(self):

View File

@@ -162,7 +162,7 @@ class DiffusionConfig(PreTrainedConfig):
def __post_init__(self):
super().__post_init__()
"""Input validation (not exhaustive)."""
# Input validation (not exhaustive).
if not self.vision_backbone.startswith("resnet"):
raise ValueError(
f"`vision_backbone` must be one of the ResNet variants. Got {self.vision_backbone}."

View File

@@ -170,6 +170,7 @@ def _make_noise_scheduler(name: str, **kwargs: dict) -> DDPMScheduler | DDIMSche
raise ValueError(f"Unsupported noise scheduler type {name}")
# TODO(Steven): Missing forward() implementation
class DiffusionModel(nn.Module):
def __init__(self, config: DiffusionConfig):
super().__init__()
@@ -203,6 +204,7 @@ class DiffusionModel(nn.Module):
)
if config.num_inference_steps is None:
# TODO(Steven): Consider type check?
self.num_inference_steps = self.noise_scheduler.config.num_train_timesteps
else:
self.num_inference_steps = config.num_inference_steps
@@ -333,7 +335,7 @@ class DiffusionModel(nn.Module):
# Sample a random noising timestep for each item in the batch.
timesteps = torch.randint(
low=0,
high=self.noise_scheduler.config.num_train_timesteps,
high=self.noise_scheduler.config.num_train_timesteps, # TODO(Steven): Consider type check?
size=(trajectory.shape[0],),
device=trajectory.device,
).long()

View File

@@ -69,12 +69,12 @@ def create_stats_buffers(
}
)
elif norm_mode is NormalizationMode.MIN_MAX:
min = torch.ones(shape, dtype=torch.float32) * torch.inf
max = torch.ones(shape, dtype=torch.float32) * torch.inf
min_norm = torch.ones(shape, dtype=torch.float32) * torch.inf
max_norm = torch.ones(shape, dtype=torch.float32) * torch.inf
buffer = nn.ParameterDict(
{
"min": nn.Parameter(min, requires_grad=False),
"max": nn.Parameter(max, requires_grad=False),
"min": nn.Parameter(min_norm, requires_grad=False),
"max": nn.Parameter(max_norm, requires_grad=False),
}
)
@@ -170,12 +170,12 @@ class Normalize(nn.Module):
assert not torch.isinf(std).any(), _no_stats_error_str("std")
batch[key] = (batch[key] - mean) / (std + 1e-8)
elif norm_mode is NormalizationMode.MIN_MAX:
min = buffer["min"]
max = buffer["max"]
assert not torch.isinf(min).any(), _no_stats_error_str("min")
assert not torch.isinf(max).any(), _no_stats_error_str("max")
min_norm = buffer["min"]
max_norm = buffer["max"]
assert not torch.isinf(min_norm).any(), _no_stats_error_str("min")
assert not torch.isinf(max_norm).any(), _no_stats_error_str("max")
# normalize to [0,1]
batch[key] = (batch[key] - min) / (max - min + 1e-8)
batch[key] = (batch[key] - min_norm) / (max_norm - min_norm + 1e-8)
# normalize to [-1, 1]
batch[key] = batch[key] * 2 - 1
else:
@@ -243,12 +243,12 @@ class Unnormalize(nn.Module):
assert not torch.isinf(std).any(), _no_stats_error_str("std")
batch[key] = batch[key] * std + mean
elif norm_mode is NormalizationMode.MIN_MAX:
min = buffer["min"]
max = buffer["max"]
assert not torch.isinf(min).any(), _no_stats_error_str("min")
assert not torch.isinf(max).any(), _no_stats_error_str("max")
min_norm = buffer["min"]
max_norm = buffer["max"]
assert not torch.isinf(min_norm).any(), _no_stats_error_str("min")
assert not torch.isinf(max_norm).any(), _no_stats_error_str("max")
batch[key] = (batch[key] + 1) / 2
batch[key] = batch[key] * (max - min) + min
batch[key] = batch[key] * (max_norm - min_norm) + min_norm
else:
raise ValueError(norm_mode)
return batch

View File

@@ -91,7 +91,7 @@ class PI0Config(PreTrainedConfig):
super().__post_init__()
# TODO(Steven): Validate device and amp? in all policy configs?
"""Input validation (not exhaustive)."""
# Input validation (not exhaustive).
if self.n_action_steps > self.chunk_size:
raise ValueError(
f"The chunk size is the upper bound for the number of action steps per model invocation. Got "

View File

@@ -55,7 +55,7 @@ def main():
with open(save_dir / "noise.pkl", "rb") as f:
noise = pickle.load(f)
with open(ckpt_jax_dir / "assets/norm_stats.json") as f:
with open(ckpt_jax_dir / "assets/norm_stats.json", encoding="utf-8") as f:
norm_stats = json.load(f)
# Override stats

View File

@@ -318,7 +318,7 @@ def update_keys_with_prefix(d: dict, prefix: str) -> dict:
return {f"{prefix}{key}": value for key, value in d.items()}
def convert_pi0_checkpoint(checkpoint_dir: str, precision: str, tokenizer_id: str, output_path: str):
def convert_pi0_checkpoint(checkpoint_dir: str, precision: str, _tokenizer_id: str, output_path: str):
# Break down orbax ckpts - they are in OCDBT
initial_params = slice_initial_orbax_checkpoint(checkpoint_dir=checkpoint_dir)
# process projection params
@@ -432,6 +432,6 @@ if __name__ == "__main__":
convert_pi0_checkpoint(
checkpoint_dir=args.checkpoint_dir,
precision=args.precision,
tokenizer_id=args.tokenizer_hub_id,
_tokenizer_id=args.tokenizer_hub_id,
output_path=args.output_path,
)

View File

@@ -16,6 +16,7 @@ import torch
import torch.nn.functional as F # noqa: N812
from packaging.version import Version
# TODO(Steven): Consider settings this a dependency constraint
if Version(torch.__version__) > Version("2.5.0"):
# Ffex attention is only available from torch 2.5 onwards
from torch.nn.attention.flex_attention import (
@@ -121,7 +122,7 @@ def flex_attention_forward(
)
# mask is applied inside the kernel, ideally more efficiently than score_mod.
attn_output, attention_weights = flex_attention(
attn_output, _attention_weights = flex_attention(
query_states,
key_states,
value_states,

View File

@@ -162,7 +162,7 @@ class TDMPCConfig(PreTrainedConfig):
def __post_init__(self):
super().__post_init__()
"""Input validation (not exhaustive)."""
# Input validation (not exhaustive).
if self.n_gaussian_samples <= 0:
raise ValueError(
f"The number of gaussian samples for CEM should be non-zero. Got `{self.n_gaussian_samples=}`"

View File

@@ -88,6 +88,9 @@ class TDMPCPolicy(PreTrainedPolicy):
for param in self.model_target.parameters():
param.requires_grad = False
self._queues = None
self._prev_mean: torch.Tensor | None = None
self.reset()
def get_optim_params(self) -> dict:
@@ -108,7 +111,7 @@ class TDMPCPolicy(PreTrainedPolicy):
self._queues["observation.environment_state"] = deque(maxlen=1)
# Previous mean obtained from the cross-entropy method (CEM) used during MPC. It is used to warm start
# CEM for the next step.
self._prev_mean: torch.Tensor | None = None
self._prev_mean = None
@torch.no_grad()
def select_action(self, batch: dict[str, Tensor]) -> Tensor:
@@ -514,6 +517,7 @@ class TDMPCPolicy(PreTrainedPolicy):
update_ema_parameters(self.model_target, self.model, self.config.target_model_momentum)
# TODO(Steven): forward implementation missing
class TDMPCTOLD(nn.Module):
"""Task-Oriented Latent Dynamics (TOLD) model used in TD-MPC."""

View File

@@ -144,7 +144,7 @@ class VQBeTConfig(PreTrainedConfig):
def __post_init__(self):
super().__post_init__()
"""Input validation (not exhaustive)."""
# Input validation (not exhaustive).
if not self.vision_backbone.startswith("resnet"):
raise ValueError(
f"`vision_backbone` must be one of the ResNet variants. Got {self.vision_backbone}."

View File

@@ -70,6 +70,8 @@ class VQBeTPolicy(PreTrainedPolicy):
self.vqbet = VQBeTModel(config)
self._queues = None
self.reset()
def get_optim_params(self) -> dict:
@@ -535,7 +537,7 @@ class VQBeTHead(nn.Module):
cbet_logits, "(NT) (G C) -> (NT) G C", G=self.vqvae_model.vqvae_num_layers
)
cbet_probs = torch.softmax(cbet_logits / self.config.bet_softmax_temperature, dim=-1)
NT, G, choices = cbet_probs.shape
NT, _G, choices = cbet_probs.shape
sampled_centers = einops.rearrange(
torch.multinomial(cbet_probs.view(-1, choices), num_samples=1),
"(NT G) 1 -> NT G",
@@ -578,7 +580,7 @@ class VQBeTHead(nn.Module):
"decoded_action": decoded_action,
}
def loss_fn(self, pred, target, **kwargs):
def loss_fn(self, pred, target, **_kwargs):
"""
for given ground truth action values (target), and prediction (pred) this function calculates the overall loss.
@@ -605,7 +607,7 @@ class VQBeTHead(nn.Module):
# Figure out the loss for the actions.
# First, we need to find the closest cluster center for each ground truth action.
with torch.no_grad():
state_vq, action_bins = self.vqvae_model.get_code(action_seq) # action_bins: NT, G
_state_vq, action_bins = self.vqvae_model.get_code(action_seq) # action_bins: NT, G
# Now we can compute the loss.
@@ -762,6 +764,7 @@ def _replace_submodules(
return root_module
# TODO(Steven): Missing implementation of forward, is it maybe vqvae_forward?
class VqVae(nn.Module):
def __init__(
self,
@@ -876,13 +879,13 @@ class FocalLoss(nn.Module):
self.gamma = gamma
self.size_average = size_average
def forward(self, input, target):
if len(input.shape) == 3:
N, T, _ = input.shape
logpt = F.log_softmax(input, dim=-1)
def forward(self, forward_input, target):
if len(forward_input.shape) == 3:
N, T, _ = forward_input.shape
logpt = F.log_softmax(forward_input, dim=-1)
logpt = logpt.gather(-1, target.view(N, T, 1)).view(N, T)
elif len(input.shape) == 2:
logpt = F.log_softmax(input, dim=-1)
elif len(forward_input.shape) == 2:
logpt = F.log_softmax(forward_input, dim=-1)
logpt = logpt.gather(-1, target.view(-1, 1)).view(-1)
pt = logpt.exp()

View File

@@ -34,63 +34,58 @@ from lerobot.common.policies.vqbet.configuration_vqbet import VQBeTConfig
# ruff: noqa: N806
"""
This file is part of a VQ-BeT that utilizes code from the following repositories:
# This file is part of a VQ-BeT that utilizes code from the following repositories:
#
# - Vector Quantize PyTorch code is licensed under the MIT License:
# Original source: https://github.com/lucidrains/vector-quantize-pytorch
#
# - nanoGPT part is an adaptation of Andrej Karpathy's nanoGPT implementation in PyTorch.
# Original source: https://github.com/karpathy/nanoGPT
#
# We also made some changes to the original code to adapt it to our needs. The changes are described in the code below.
- Vector Quantize PyTorch code is licensed under the MIT License:
Original source: https://github.com/lucidrains/vector-quantize-pytorch
- nanoGPT part is an adaptation of Andrej Karpathy's nanoGPT implementation in PyTorch.
Original source: https://github.com/karpathy/nanoGPT
We also made some changes to the original code to adapt it to our needs. The changes are described in the code below.
"""
"""
This is a part for nanoGPT that utilizes code from the following repository:
- Andrej Karpathy's nanoGPT implementation in PyTorch.
Original source: https://github.com/karpathy/nanoGPT
- The nanoGPT code is licensed under the MIT License:
MIT License
Copyright (c) 2022 Andrej Karpathy
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
- We've made some changes to the original code to adapt it to our needs.
Changed variable names:
- n_head -> gpt_n_head
- n_embd -> gpt_hidden_dim
- block_size -> gpt_block_size
- n_layer -> gpt_n_layer
class GPT(nn.Module):
- removed unused functions `def generate`, `def estimate_mfu`, and `def from_pretrained`
- changed the `configure_optimizers` to `def configure_parameters` and made it to return only the parameters of the model: we use an external optimizer in our training loop.
- in the function `forward`, we removed target loss calculation parts, since it will be calculated in the training loop (after passing through bin prediction and offset prediction heads).
"""
# This is a part for nanoGPT that utilizes code from the following repository:
#
# - Andrej Karpathy's nanoGPT implementation in PyTorch.
# Original source: https://github.com/karpathy/nanoGPT
#
# - The nanoGPT code is licensed under the MIT License:
#
# MIT License
#
# Copyright (c) 2022 Andrej Karpathy
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#
# - We've made some changes to the original code to adapt it to our needs.
#
# Changed variable names:
# - n_head -> gpt_n_head
# - n_embd -> gpt_hidden_dim
# - block_size -> gpt_block_size
# - n_layer -> gpt_n_layer
#
#
# class GPT(nn.Module):
# - removed unused functions `def generate`, `def estimate_mfu`, and `def from_pretrained`
# - changed the `configure_optimizers` to `def configure_parameters` and made it to return only the parameters of the model: we use an external optimizer in our training loop.
# - in the function `forward`, we removed target loss calculation parts, since it will be calculated in the training loop (after passing through bin prediction and offset prediction heads).
class CausalSelfAttention(nn.Module):
@@ -200,9 +195,9 @@ class GPT(nn.Module):
n_params = sum(p.numel() for p in self.parameters())
print("number of parameters: {:.2f}M".format(n_params / 1e6))
def forward(self, input, targets=None):
device = input.device
b, t, d = input.size()
def forward(self, forward_input):
device = forward_input.device
_, t, _ = forward_input.size()
assert t <= self.config.gpt_block_size, (
f"Cannot forward sequence of length {t}, block size is only {self.config.gpt_block_size}"
)
@@ -211,7 +206,7 @@ class GPT(nn.Module):
pos = torch.arange(0, t, dtype=torch.long, device=device).unsqueeze(0) # shape (1, t)
# forward the GPT model itself
tok_emb = self.transformer.wte(input) # token embeddings of shape (b, t, gpt_hidden_dim)
tok_emb = self.transformer.wte(forward_input) # token embeddings of shape (b, t, gpt_hidden_dim)
pos_emb = self.transformer.wpe(pos) # position embeddings of shape (1, t, gpt_hidden_dim)
x = self.transformer.drop(tok_emb + pos_emb)
for block in self.transformer.h:
@@ -285,51 +280,48 @@ class GPT(nn.Module):
return decay, no_decay
"""
This file is a part for Residual Vector Quantization that utilizes code from the following repository:
- Phil Wang's vector-quantize-pytorch implementation in PyTorch.
Original source: https://github.com/lucidrains/vector-quantize-pytorch
- The vector-quantize-pytorch code is licensed under the MIT License:
MIT License
Copyright (c) 2020 Phil Wang
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
- We've made some changes to the original code to adapt it to our needs.
class ResidualVQ(nn.Module):
- added `self.register_buffer('freeze_codebook', torch.tensor(False))` to the __init__ method:
This enables the user to save an indicator whether the codebook is frozen or not.
- changed the name of function `get_codes_from_indices` → `get_codebook_vector_from_indices`:
This is to make the function name more descriptive.
class VectorQuantize(nn.Module):
- removed the `use_cosine_sim` and `layernorm_after_project_in` parameters from the __init__ method:
These parameters are not used in the code.
- changed the name of function `get_codes_from_indices` → `get_codebook_vector_from_indices`:
This is to make the function name more descriptive.
"""
# This file is a part for Residual Vector Quantization that utilizes code from the following repository:
#
# - Phil Wang's vector-quantize-pytorch implementation in PyTorch.
# Original source: https://github.com/lucidrains/vector-quantize-pytorch
#
# - The vector-quantize-pytorch code is licensed under the MIT License:
#
# MIT License
#
# Copyright (c) 2020 Phil Wang
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#
# - We've made some changes to the original code to adapt it to our needs.
#
# class ResidualVQ(nn.Module):
# - added `self.register_buffer('freeze_codebook', torch.tensor(False))` to the __init__ method:
# This enables the user to save an indicator whether the codebook is frozen or not.
# - changed the name of function `get_codes_from_indices` → `get_codebook_vector_from_indices`:
# This is to make the function name more descriptive.
#
# class VectorQuantize(nn.Module):
# - removed the `use_cosine_sim` and `layernorm_after_project_in` parameters from the __init__ method:
# These parameters are not used in the code.
# - changed the name of function `get_codes_from_indices` → `get_codebook_vector_from_indices`:
# This is to make the function name more descriptive.
class ResidualVQ(nn.Module):
@@ -479,6 +471,9 @@ class ResidualVQ(nn.Module):
should_quantize_dropout = self.training and self.quantize_dropout and not return_loss
null_indices = None
null_loss = None
# sample a layer index at which to dropout further residual quantization
# also prepare null indices and loss
@@ -933,7 +928,7 @@ class VectorQuantize(nn.Module):
return quantize, embed_ind, loss
def noop(*args, **kwargs):
def noop(*_args, **_kwargs):
pass

View File

@@ -48,7 +48,7 @@ def find_cameras(raise_when_empty=True, mock=False) -> list[dict]:
connected to the computer.
"""
if mock:
import tests.cameras.mock_pyrealsense2 as rs
import tests.mock_pyrealsense2 as rs
else:
import pyrealsense2 as rs
@@ -77,9 +77,9 @@ def save_image(img_array, serial_number, frame_index, images_dir):
path = images_dir / f"camera_{serial_number}_frame_{frame_index:06d}.png"
path.parent.mkdir(parents=True, exist_ok=True)
img.save(str(path), quality=100)
logging.info(f"Saved image: {path}")
logging.info("Saved image: %s", path)
except Exception as e:
logging.error(f"Failed to save image for camera {serial_number} frame {frame_index}: {e}")
logging.error("Failed to save image for camera %s frame %s: %s", serial_number, frame_index, e)
def save_images_from_cameras(
@@ -100,7 +100,7 @@ def save_images_from_cameras(
serial_numbers = [cam["serial_number"] for cam in camera_infos]
if mock:
import tests.cameras.mock_cv2 as cv2
import tests.mock_cv2 as cv2
else:
import cv2
@@ -114,7 +114,7 @@ def save_images_from_cameras(
camera = IntelRealSenseCamera(config)
camera.connect()
print(
f"IntelRealSenseCamera({camera.serial_number}, fps={camera.fps}, width={camera.capture_width}, height={camera.capture_height}, color_mode={camera.color_mode})"
f"IntelRealSenseCamera({camera.serial_number}, fps={camera.fps}, width={camera.width}, height={camera.height}, color_mode={camera.color_mode})"
)
cameras.append(camera)
@@ -224,20 +224,9 @@ class IntelRealSenseCamera:
self.serial_number = self.find_serial_number_from_name(config.name)
else:
self.serial_number = config.serial_number
# Store the raw (capture) resolution from the config.
self.capture_width = config.width
self.capture_height = config.height
# If rotated by ±90, swap width and height.
if config.rotation in [-90, 90]:
self.width = config.height
self.height = config.width
else:
self.width = config.width
self.height = config.height
self.fps = config.fps
self.width = config.width
self.height = config.height
self.channels = config.channels
self.color_mode = config.color_mode
self.use_depth = config.use_depth
@@ -253,10 +242,11 @@ class IntelRealSenseCamera:
self.logs = {}
if self.mock:
import tests.cameras.mock_cv2 as cv2
import tests.mock_cv2 as cv2
else:
import cv2
# TODO(alibets): Do we keep original width/height or do we define them after rotation?
self.rotation = None
if config.rotation == -90:
self.rotation = cv2.ROTATE_90_COUNTERCLOCKWISE
@@ -287,26 +277,22 @@ class IntelRealSenseCamera:
)
if self.mock:
import tests.cameras.mock_pyrealsense2 as rs
import tests.mock_pyrealsense2 as rs
else:
import pyrealsense2 as rs
config = rs.config()
config.enable_device(str(self.serial_number))
if self.fps and self.capture_width and self.capture_height:
if self.fps and self.width and self.height:
# TODO(rcadene): can we set rgb8 directly?
config.enable_stream(
rs.stream.color, self.capture_width, self.capture_height, rs.format.rgb8, self.fps
)
config.enable_stream(rs.stream.color, self.width, self.height, rs.format.rgb8, self.fps)
else:
config.enable_stream(rs.stream.color)
if self.use_depth:
if self.fps and self.capture_width and self.capture_height:
config.enable_stream(
rs.stream.depth, self.capture_width, self.capture_height, rs.format.z16, self.fps
)
if self.fps and self.width and self.height:
config.enable_stream(rs.stream.depth, self.width, self.height, rs.format.z16, self.fps)
else:
config.enable_stream(rs.stream.depth)
@@ -344,18 +330,18 @@ class IntelRealSenseCamera:
raise OSError(
f"Can't set {self.fps=} for IntelRealSenseCamera({self.serial_number}). Actual value is {actual_fps}."
)
if self.capture_width is not None and self.capture_width != actual_width:
if self.width is not None and self.width != actual_width:
raise OSError(
f"Can't set {self.capture_width=} for IntelRealSenseCamera({self.serial_number}). Actual value is {actual_width}."
f"Can't set {self.width=} for IntelRealSenseCamera({self.serial_number}). Actual value is {actual_width}."
)
if self.capture_height is not None and self.capture_height != actual_height:
if self.height is not None and self.height != actual_height:
raise OSError(
f"Can't set {self.capture_height=} for IntelRealSenseCamera({self.serial_number}). Actual value is {actual_height}."
f"Can't set {self.height=} for IntelRealSenseCamera({self.serial_number}). Actual value is {actual_height}."
)
self.fps = round(actual_fps)
self.capture_width = round(actual_width)
self.capture_height = round(actual_height)
self.width = round(actual_width)
self.height = round(actual_height)
self.is_connected = True
@@ -375,7 +361,7 @@ class IntelRealSenseCamera:
)
if self.mock:
import tests.cameras.mock_cv2 as cv2
import tests.mock_cv2 as cv2
else:
import cv2
@@ -401,7 +387,7 @@ class IntelRealSenseCamera:
color_image = cv2.cvtColor(color_image, cv2.COLOR_RGB2BGR)
h, w, _ = color_image.shape
if h != self.capture_height or w != self.capture_width:
if h != self.height or w != self.width:
raise OSError(
f"Can't capture color image with expected height and width ({self.height} x {self.width}). ({h} x {w}) returned instead."
)
@@ -423,7 +409,7 @@ class IntelRealSenseCamera:
depth_map = np.asanyarray(depth_frame.get_data())
h, w = depth_map.shape
if h != self.capture_height or w != self.capture_width:
if h != self.height or w != self.width:
raise OSError(
f"Can't capture depth map with expected height and width ({self.height} x {self.width}). ({h} x {w}) returned instead."
)
@@ -461,7 +447,7 @@ class IntelRealSenseCamera:
num_tries += 1
time.sleep(1 / self.fps)
if num_tries > self.fps and (self.thread.ident is None or not self.thread.is_alive()):
raise Exception(
raise TimeoutError(
"The thread responsible for `self.async_read()` took too much time to start. There might be an issue. Verify that `self.thread.start()` has been called."
)

View File

@@ -45,7 +45,7 @@ from lerobot.common.utils.utils import capture_timestamp_utc
MAX_OPENCV_INDEX = 60
def find_cameras(raise_when_empty=False, max_index_search_range=MAX_OPENCV_INDEX, mock=False) -> list[dict]:
def find_cameras(max_index_search_range=MAX_OPENCV_INDEX, mock=False) -> list[dict]:
cameras = []
if platform.system() == "Linux":
print("Linux detected. Finding available camera indices through scanning '/dev/video*' ports")
@@ -80,7 +80,7 @@ def _find_cameras(
possible_camera_ids: list[int | str], raise_when_empty=False, mock=False
) -> list[int | str]:
if mock:
import tests.cameras.mock_cv2 as cv2
import tests.mock_cv2 as cv2
else:
import cv2
@@ -144,8 +144,8 @@ def save_images_from_cameras(
camera = OpenCVCamera(config)
camera.connect()
print(
f"OpenCVCamera({camera.camera_index}, fps={camera.fps}, width={camera.capture_width}, "
f"height={camera.capture_height}, color_mode={camera.color_mode})"
f"OpenCVCamera({camera.camera_index}, fps={camera.fps}, width={camera.width}, "
f"height={camera.height}, color_mode={camera.color_mode})"
)
cameras.append(camera)
@@ -244,19 +244,9 @@ class OpenCVCamera:
else:
raise ValueError(f"Please check the provided camera_index: {self.camera_index}")
# Store the raw (capture) resolution from the config.
self.capture_width = config.width
self.capture_height = config.height
# If rotated by ±90, swap width and height.
if config.rotation in [-90, 90]:
self.width = config.height
self.height = config.width
else:
self.width = config.width
self.height = config.height
self.fps = config.fps
self.width = config.width
self.height = config.height
self.channels = config.channels
self.color_mode = config.color_mode
self.mock = config.mock
@@ -269,10 +259,11 @@ class OpenCVCamera:
self.logs = {}
if self.mock:
import tests.cameras.mock_cv2 as cv2
import tests.mock_cv2 as cv2
else:
import cv2
# TODO(aliberts): Do we keep original width/height or do we define them after rotation?
self.rotation = None
if config.rotation == -90:
self.rotation = cv2.ROTATE_90_COUNTERCLOCKWISE
@@ -286,7 +277,7 @@ class OpenCVCamera:
raise RobotDeviceAlreadyConnectedError(f"OpenCVCamera({self.camera_index}) is already connected.")
if self.mock:
import tests.cameras.mock_cv2 as cv2
import tests.mock_cv2 as cv2
else:
import cv2
@@ -334,10 +325,10 @@ class OpenCVCamera:
if self.fps is not None:
self.camera.set(cv2.CAP_PROP_FPS, self.fps)
if self.capture_width is not None:
self.camera.set(cv2.CAP_PROP_FRAME_WIDTH, self.capture_width)
if self.capture_height is not None:
self.camera.set(cv2.CAP_PROP_FRAME_HEIGHT, self.capture_height)
if self.width is not None:
self.camera.set(cv2.CAP_PROP_FRAME_WIDTH, self.width)
if self.height is not None:
self.camera.set(cv2.CAP_PROP_FRAME_HEIGHT, self.height)
actual_fps = self.camera.get(cv2.CAP_PROP_FPS)
actual_width = self.camera.get(cv2.CAP_PROP_FRAME_WIDTH)
@@ -349,22 +340,19 @@ class OpenCVCamera:
raise OSError(
f"Can't set {self.fps=} for OpenCVCamera({self.camera_index}). Actual value is {actual_fps}."
)
if self.capture_width is not None and not math.isclose(
self.capture_width, actual_width, rel_tol=1e-3
):
if self.width is not None and not math.isclose(self.width, actual_width, rel_tol=1e-3):
raise OSError(
f"Can't set {self.capture_width=} for OpenCVCamera({self.camera_index}). Actual value is {actual_width}."
f"Can't set {self.width=} for OpenCVCamera({self.camera_index}). Actual value is {actual_width}."
)
if self.capture_height is not None and not math.isclose(
self.capture_height, actual_height, rel_tol=1e-3
):
if self.height is not None and not math.isclose(self.height, actual_height, rel_tol=1e-3):
raise OSError(
f"Can't set {self.capture_height=} for OpenCVCamera({self.camera_index}). Actual value is {actual_height}."
f"Can't set {self.height=} for OpenCVCamera({self.camera_index}). Actual value is {actual_height}."
)
self.fps = round(actual_fps)
self.capture_width = round(actual_width)
self.capture_height = round(actual_height)
self.width = round(actual_width)
self.height = round(actual_height)
self.is_connected = True
def read(self, temporary_color_mode: str | None = None) -> np.ndarray:
@@ -398,14 +386,14 @@ class OpenCVCamera:
# so we convert the image color from BGR to RGB.
if requested_color_mode == "rgb":
if self.mock:
import tests.cameras.mock_cv2 as cv2
import tests.mock_cv2 as cv2
else:
import cv2
color_image = cv2.cvtColor(color_image, cv2.COLOR_BGR2RGB)
h, w, _ = color_image.shape
if h != self.capture_height or w != self.capture_width:
if h != self.height or w != self.width:
raise OSError(
f"Can't capture color image with expected height and width ({self.height} x {self.width}). ({h} x {w}) returned instead."
)

View File

@@ -169,7 +169,8 @@ def convert_degrees_to_steps(degrees: float | np.ndarray, models: str | list[str
return steps
def convert_to_bytes(value, bytes, mock=False):
# TODO(Steven): Similar function in feetch.py, should be moved to a common place.
def convert_to_bytes(value, byte, mock=False):
if mock:
return value
@@ -177,16 +178,16 @@ def convert_to_bytes(value, bytes, mock=False):
# Note: No need to convert back into unsigned int, since this byte preprocessing
# already handles it for us.
if bytes == 1:
if byte == 1:
data = [
dxl.DXL_LOBYTE(dxl.DXL_LOWORD(value)),
]
elif bytes == 2:
elif byte == 2:
data = [
dxl.DXL_LOBYTE(dxl.DXL_LOWORD(value)),
dxl.DXL_HIBYTE(dxl.DXL_LOWORD(value)),
]
elif bytes == 4:
elif byte == 4:
data = [
dxl.DXL_LOBYTE(dxl.DXL_LOWORD(value)),
dxl.DXL_HIBYTE(dxl.DXL_LOWORD(value)),
@@ -196,7 +197,7 @@ def convert_to_bytes(value, bytes, mock=False):
else:
raise NotImplementedError(
f"Value of the number of bytes to be sent is expected to be in [1, 2, 4], but "
f"{bytes} is provided instead."
f"{byte} is provided instead."
)
return data
@@ -228,9 +229,9 @@ def assert_same_address(model_ctrl_table, motor_models, data_name):
all_addr = []
all_bytes = []
for model in motor_models:
addr, bytes = model_ctrl_table[model][data_name]
addr, byte = model_ctrl_table[model][data_name]
all_addr.append(addr)
all_bytes.append(bytes)
all_bytes.append(byte)
if len(set(all_addr)) != 1:
raise NotImplementedError(
@@ -332,7 +333,7 @@ class DynamixelMotorsBus:
)
if self.mock:
import tests.motors.mock_dynamixel_sdk as dxl
import tests.mock_dynamixel_sdk as dxl
else:
import dynamixel_sdk as dxl
@@ -356,7 +357,7 @@ class DynamixelMotorsBus:
def reconnect(self):
if self.mock:
import tests.motors.mock_dynamixel_sdk as dxl
import tests.mock_dynamixel_sdk as dxl
else:
import dynamixel_sdk as dxl
@@ -576,6 +577,8 @@ class DynamixelMotorsBus:
# (start_pos - values[i]) / resolution <= factor <= (end_pos - values[i]) / resolution
low_factor = (start_pos - values[i]) / resolution
upp_factor = (end_pos - values[i]) / resolution
else:
raise ValueError(f"Unknown calibration mode '{calib_mode}'.")
if not in_range:
# Get first integer between the two bounds
@@ -596,10 +599,15 @@ class DynamixelMotorsBus:
elif CalibrationMode[calib_mode] == CalibrationMode.LINEAR:
out_of_range_str = f"{LOWER_BOUND_LINEAR} < {calib_val} < {UPPER_BOUND_LINEAR} %"
in_range_str = f"{LOWER_BOUND_LINEAR} < {calib_val} < {UPPER_BOUND_LINEAR} %"
else:
raise ValueError(f"Unknown calibration mode '{calib_mode}'.")
logging.warning(
f"Auto-correct calibration of motor '{name}' by shifting value by {abs(factor)} full turns, "
f"from '{out_of_range_str}' to '{in_range_str}'."
"Auto-correct calibration of motor '%s' by shifting value by {abs(factor)} full turns, "
"from '%s' to '%s'.",
name,
out_of_range_str,
in_range_str,
)
# A full turn corresponds to 360 degrees but also to 4096 steps for a motor resolution of 4096.
@@ -646,7 +654,7 @@ class DynamixelMotorsBus:
def read_with_motor_ids(self, motor_models, motor_ids, data_name, num_retry=NUM_READ_RETRY):
if self.mock:
import tests.motors.mock_dynamixel_sdk as dxl
import tests.mock_dynamixel_sdk as dxl
else:
import dynamixel_sdk as dxl
@@ -656,8 +664,8 @@ class DynamixelMotorsBus:
motor_ids = [motor_ids]
assert_same_address(self.model_ctrl_table, self.motor_models, data_name)
addr, bytes = self.model_ctrl_table[motor_models[0]][data_name]
group = dxl.GroupSyncRead(self.port_handler, self.packet_handler, addr, bytes)
addr, byte = self.model_ctrl_table[motor_models[0]][data_name]
group = dxl.GroupSyncRead(self.port_handler, self.packet_handler, addr, byte)
for idx in motor_ids:
group.addParam(idx)
@@ -674,7 +682,7 @@ class DynamixelMotorsBus:
values = []
for idx in motor_ids:
value = group.getData(idx, addr, bytes)
value = group.getData(idx, addr, byte)
values.append(value)
if return_list:
@@ -691,7 +699,7 @@ class DynamixelMotorsBus:
start_time = time.perf_counter()
if self.mock:
import tests.motors.mock_dynamixel_sdk as dxl
import tests.mock_dynamixel_sdk as dxl
else:
import dynamixel_sdk as dxl
@@ -709,13 +717,13 @@ class DynamixelMotorsBus:
models.append(model)
assert_same_address(self.model_ctrl_table, models, data_name)
addr, bytes = self.model_ctrl_table[model][data_name]
addr, byte = self.model_ctrl_table[model][data_name]
group_key = get_group_sync_key(data_name, motor_names)
if data_name not in self.group_readers:
# create new group reader
self.group_readers[group_key] = dxl.GroupSyncRead(
self.port_handler, self.packet_handler, addr, bytes
self.port_handler, self.packet_handler, addr, byte
)
for idx in motor_ids:
self.group_readers[group_key].addParam(idx)
@@ -733,7 +741,7 @@ class DynamixelMotorsBus:
values = []
for idx in motor_ids:
value = self.group_readers[group_key].getData(idx, addr, bytes)
value = self.group_readers[group_key].getData(idx, addr, byte)
values.append(value)
values = np.array(values)
@@ -757,7 +765,7 @@ class DynamixelMotorsBus:
def write_with_motor_ids(self, motor_models, motor_ids, data_name, values, num_retry=NUM_WRITE_RETRY):
if self.mock:
import tests.motors.mock_dynamixel_sdk as dxl
import tests.mock_dynamixel_sdk as dxl
else:
import dynamixel_sdk as dxl
@@ -767,10 +775,10 @@ class DynamixelMotorsBus:
values = [values]
assert_same_address(self.model_ctrl_table, motor_models, data_name)
addr, bytes = self.model_ctrl_table[motor_models[0]][data_name]
group = dxl.GroupSyncWrite(self.port_handler, self.packet_handler, addr, bytes)
addr, byte = self.model_ctrl_table[motor_models[0]][data_name]
group = dxl.GroupSyncWrite(self.port_handler, self.packet_handler, addr, byte)
for idx, value in zip(motor_ids, values, strict=True):
data = convert_to_bytes(value, bytes, self.mock)
data = convert_to_bytes(value, byte, self.mock)
group.addParam(idx, data)
for _ in range(num_retry):
@@ -793,7 +801,7 @@ class DynamixelMotorsBus:
start_time = time.perf_counter()
if self.mock:
import tests.motors.mock_dynamixel_sdk as dxl
import tests.mock_dynamixel_sdk as dxl
else:
import dynamixel_sdk as dxl
@@ -821,17 +829,17 @@ class DynamixelMotorsBus:
values = values.tolist()
assert_same_address(self.model_ctrl_table, models, data_name)
addr, bytes = self.model_ctrl_table[model][data_name]
addr, byte = self.model_ctrl_table[model][data_name]
group_key = get_group_sync_key(data_name, motor_names)
init_group = data_name not in self.group_readers
if init_group:
self.group_writers[group_key] = dxl.GroupSyncWrite(
self.port_handler, self.packet_handler, addr, bytes
self.port_handler, self.packet_handler, addr, byte
)
for idx, value in zip(motor_ids, values, strict=True):
data = convert_to_bytes(value, bytes, self.mock)
data = convert_to_bytes(value, byte, self.mock)
if init_group:
self.group_writers[group_key].addParam(idx, data)
else:

View File

@@ -13,6 +13,8 @@
# limitations under the License.
import enum
import logging
import math
import time
import traceback
from copy import deepcopy
@@ -30,6 +32,13 @@ TIMEOUT_MS = 1000
MAX_ID_RANGE = 252
# The following bounds define the lower and upper joints range (after calibration).
# For joints in degree (i.e. revolute joints), their nominal range is [-180, 180] degrees
# which corresponds to a half rotation on the left and half rotation on the right.
# Some joints might require higher range, so we allow up to [-270, 270] degrees until
# an error is raised.
LOWER_BOUND_DEGREE = -270
UPPER_BOUND_DEGREE = 270
# For joints in percentage (i.e. joints that move linearly like the prismatic joint of a gripper),
# their nominal range is [0, 100] %. For instance, for Aloha gripper, 0% is fully
# closed, and 100% is fully open. To account for slight calibration issue, we allow up to
@@ -39,6 +48,7 @@ UPPER_BOUND_LINEAR = 110
HALF_TURN_DEGREE = 180
# See this link for STS3215 Memory Table:
# https://docs.google.com/spreadsheets/d/1GVs7W1VS1PqdhA1nW-abeyAHhTUxKUdR/edit?usp=sharing&ouid=116566590112741600240&rtpof=true&sd=true
# data_name: (address, size_byte)
@@ -104,6 +114,8 @@ SCS_SERIES_BAUDRATE_TABLE = {
}
CALIBRATION_REQUIRED = ["Goal_Position", "Present_Position"]
CONVERT_UINT32_TO_INT32_REQUIRED = ["Goal_Position", "Present_Position"]
MODEL_CONTROL_TABLE = {
"scs_series": SCS_SERIES_CONTROL_TABLE,
@@ -125,66 +137,18 @@ NUM_READ_RETRY = 20
NUM_WRITE_RETRY = 20
def convert_ticks_to_degrees(ticks, model):
resolutions = MODEL_RESOLUTION[model]
# Convert the ticks to degrees
return ticks * (360.0 / resolutions)
def convert_degrees_to_ticks(degrees, model):
resolutions = MODEL_RESOLUTION[model]
# Convert degrees to motor ticks
return int(degrees * (resolutions / 360.0))
def adjusted_to_homing_ticks(raw_motor_ticks: int, model: str, motorbus, motor_id: int) -> int:
def convert_degrees_to_steps(degrees: float | np.ndarray, models: str | list[str]) -> np.ndarray:
"""This function converts the degree range to the step range for indicating motors rotation.
It assumes a motor achieves a full rotation by going from -180 degree position to +180.
The motor resolution (e.g. 4096) corresponds to the number of steps needed to achieve a full rotation.
"""
Takes a raw reading [0..(res-1)] (e.g. 0..4095) and shifts it so that '2048'
becomes 0 in the homed coordinate system ([-2048..+2047] for 4096 resolution).
"""
resolutions = MODEL_RESOLUTION[model]
# Shift raw ticks by half-resolution so 2048 -> 0, then wrap [0..res-1].
ticks = (raw_motor_ticks - (resolutions // 2)) % resolutions
# If above halfway, fold it into negative territory => [-2048..+2047].
if ticks > (resolutions // 2):
ticks -= resolutions
# Flip sign if drive_mode is set.
drive_mode = 0
if motorbus.calibration is not None:
drive_mode = motorbus.calibration["drive_mode"][motor_id - 1]
if drive_mode:
ticks *= -1
return ticks
resolutions = [MODEL_RESOLUTION[model] for model in models]
steps = degrees / 180 * np.array(resolutions) / 2
steps = steps.astype(int)
return steps
def adjusted_to_motor_ticks(adjusted_pos: int, model: str, motorbus, motor_id: int) -> int:
"""
Inverse of adjusted_to_homing_ticks(). Takes a 'homed' position in [-2048..+2047]
and recovers the raw [0..(res-1)] ticks with 2048 as midpoint.
"""
# Flip sign if drive_mode was set.
drive_mode = 0
if motorbus.calibration is not None:
drive_mode = motorbus.calibration["drive_mode"][motor_id - 1]
if drive_mode:
adjusted_pos *= -1
resolutions = MODEL_RESOLUTION[model]
# Shift by +half-resolution and wrap into [0..res-1].
# This undoes the earlier shift by -half-resolution.
ticks = (adjusted_pos + (resolutions // 2)) % resolutions
return ticks
def convert_to_bytes(value, bytes, mock=False):
def convert_to_bytes(value, byte, mock=False):
if mock:
return value
@@ -192,16 +156,16 @@ def convert_to_bytes(value, bytes, mock=False):
# Note: No need to convert back into unsigned int, since this byte preprocessing
# already handles it for us.
if bytes == 1:
if byte == 1:
data = [
scs.SCS_LOBYTE(scs.SCS_LOWORD(value)),
]
elif bytes == 2:
elif byte == 2:
data = [
scs.SCS_LOBYTE(scs.SCS_LOWORD(value)),
scs.SCS_HIBYTE(scs.SCS_LOWORD(value)),
]
elif bytes == 4:
elif byte == 4:
data = [
scs.SCS_LOBYTE(scs.SCS_LOWORD(value)),
scs.SCS_HIBYTE(scs.SCS_LOWORD(value)),
@@ -211,7 +175,7 @@ def convert_to_bytes(value, bytes, mock=False):
else:
raise NotImplementedError(
f"Value of the number of bytes to be sent is expected to be in [1, 2, 4], but "
f"{bytes} is provided instead."
f"{byte} is provided instead."
)
return data
@@ -243,9 +207,9 @@ def assert_same_address(model_ctrl_table, motor_models, data_name):
all_addr = []
all_bytes = []
for model in motor_models:
addr, bytes = model_ctrl_table[model][data_name]
addr, byte = model_ctrl_table[model][data_name]
all_addr.append(addr)
all_bytes.append(bytes)
all_bytes.append(byte)
if len(set(all_addr)) != 1:
raise NotImplementedError(
@@ -340,6 +304,8 @@ class FeetechMotorsBus:
self.group_writers = {}
self.logs = {}
self.track_positions = {}
def connect(self):
if self.is_connected:
raise RobotDeviceAlreadyConnectedError(
@@ -347,7 +313,7 @@ class FeetechMotorsBus:
)
if self.mock:
import tests.motors.mock_scservo_sdk as scs
import tests.mock_scservo_sdk as scs
else:
import scservo_sdk as scs
@@ -371,7 +337,7 @@ class FeetechMotorsBus:
def reconnect(self):
if self.mock:
import tests.motors.mock_scservo_sdk as scs
import tests.mock_scservo_sdk as scs
else:
import scservo_sdk as scs
@@ -436,7 +402,33 @@ class FeetechMotorsBus:
def set_calibration(self, calibration: dict[str, list]):
self.calibration = calibration
def apply_calibration_autocorrect(self, values: np.ndarray | list, motor_names: list[str] | None):
"""This function apply the calibration, automatically detects out of range errors for motors values and attempt to correct.
For more info, see docstring of `apply_calibration` and `autocorrect_calibration`.
"""
try:
values = self.apply_calibration(values, motor_names)
except JointOutOfRangeError as e:
print(e)
self.autocorrect_calibration(values, motor_names)
values = self.apply_calibration(values, motor_names)
return values
def apply_calibration(self, values: np.ndarray | list, motor_names: list[str] | None):
"""Convert from unsigned int32 joint position range [0, 2**32[ to the universal float32 nominal degree range ]-180.0, 180.0[ with
a "zero position" at 0 degree.
Note: We say "nominal degree range" since the motors can take values outside this range. For instance, 190 degrees, if the motor
rotate more than a half a turn from the zero position. However, most motors can't rotate more than 180 degrees and will stay in this range.
Joints values are original in [0, 2**32[ (unsigned int32). Each motor are expected to complete a full rotation
when given a goal position that is + or - their resolution. For instance, feetech xl330-m077 have a resolution of 4096, and
at any position in their original range, let's say the position 56734, they complete a full rotation clockwise by moving to 60830,
or anticlockwise by moving to 52638. The position in the original range is arbitrary and might change a lot between each motor.
To harmonize between motors of the same model, different robots, or even models of different brands, we propose to work
in the centered nominal degree range ]-180, 180[.
"""
if motor_names is None:
motor_names = self.motor_names
@@ -448,11 +440,34 @@ class FeetechMotorsBus:
calib_mode = self.calibration["calib_mode"][calib_idx]
if CalibrationMode[calib_mode] == CalibrationMode.DEGREE:
motor_idx, model = self.motors[name]
drive_mode = self.calibration["drive_mode"][calib_idx]
homing_offset = self.calibration["homing_offset"][calib_idx]
_, model = self.motors[name]
resolution = self.model_resolution[model]
# Convert raw motor ticks to homed ticks, then convert the homed ticks to degrees
values[i] = adjusted_to_homing_ticks(values[i], model, self, motor_idx)
values[i] = convert_ticks_to_degrees(values[i], model)
# Update direction of rotation of the motor to match between leader and follower.
# In fact, the motor of the leader for a given joint can be assembled in an
# opposite direction in term of rotation than the motor of the follower on the same joint.
if drive_mode:
values[i] *= -1
# Convert from range [-2**31, 2**31[ to
# nominal range ]-resolution, resolution[ (e.g. ]-2048, 2048[)
values[i] += homing_offset
# Convert from range ]-resolution, resolution[ to
# universal float32 centered degree range ]-180, 180[
values[i] = values[i] / (resolution // 2) * HALF_TURN_DEGREE
if (values[i] < LOWER_BOUND_DEGREE) or (values[i] > UPPER_BOUND_DEGREE):
raise JointOutOfRangeError(
f"Wrong motor position range detected for {name}. "
f"Expected to be in nominal range of [-{HALF_TURN_DEGREE}, {HALF_TURN_DEGREE}] degrees (a full rotation), "
f"with a maximum range of [{LOWER_BOUND_DEGREE}, {UPPER_BOUND_DEGREE}] degrees to account for joints that can rotate a bit more, "
f"but present value is {values[i]} degree. "
"This might be due to a cable connection issue creating an artificial 360 degrees jump in motor values. "
"You need to recalibrate by running: `python lerobot/scripts/control_robot.py calibrate`"
)
elif CalibrationMode[calib_mode] == CalibrationMode.LINEAR:
start_pos = self.calibration["start_pos"][calib_idx]
@@ -474,6 +489,111 @@ class FeetechMotorsBus:
return values
def autocorrect_calibration(self, values: np.ndarray | list, motor_names: list[str] | None):
"""This function automatically detects issues with values of motors after calibration, and correct for these issues.
Some motors might have values outside of expected maximum bounds after calibration.
For instance, for a joint in degree, its value can be outside [-270, 270] degrees, which is totally unexpected given
a nominal range of [-180, 180] degrees, which represents half a turn to the left or right starting from zero position.
Known issues:
#1: Motor value randomly shifts of a full turn, caused by hardware/connection errors.
#2: Motor internal homing offset is shifted of a full turn, caused by using default calibration (e.g Aloha).
#3: motor internal homing offset is shifted of less or more than a full turn, caused by using default calibration
or by human error during manual calibration.
Issues #1 and #2 can be solved by shifting the calibration homing offset by a full turn.
Issue #3 will be visually detected by user and potentially captured by the safety feature `max_relative_target`,
that will slow down the motor, raise an error asking to recalibrate. Manual recalibrating will solve the issue.
Note: A full turn corresponds to 360 degrees but also to 4096 steps for a motor resolution of 4096.
"""
if motor_names is None:
motor_names = self.motor_names
# Convert from unsigned int32 original range [0, 2**32] to signed float32 range
values = values.astype(np.float32)
for i, name in enumerate(motor_names):
calib_idx = self.calibration["motor_names"].index(name)
calib_mode = self.calibration["calib_mode"][calib_idx]
if CalibrationMode[calib_mode] == CalibrationMode.DEGREE:
drive_mode = self.calibration["drive_mode"][calib_idx]
homing_offset = self.calibration["homing_offset"][calib_idx]
_, model = self.motors[name]
resolution = self.model_resolution[model]
if drive_mode:
values[i] *= -1
# Convert from initial range to range [-180, 180] degrees
calib_val = (values[i] + homing_offset) / (resolution // 2) * HALF_TURN_DEGREE
in_range = (calib_val > LOWER_BOUND_DEGREE) and (calib_val < UPPER_BOUND_DEGREE)
# Solve this inequality to find the factor to shift the range into [-180, 180] degrees
# values[i] = (values[i] + homing_offset + resolution * factor) / (resolution // 2) * HALF_TURN_DEGREE
# - HALF_TURN_DEGREE <= (values[i] + homing_offset + resolution * factor) / (resolution // 2) * HALF_TURN_DEGREE <= HALF_TURN_DEGREE
# (- HALF_TURN_DEGREE / HALF_TURN_DEGREE * (resolution // 2) - values[i] - homing_offset) / resolution <= factor <= (HALF_TURN_DEGREE / 180 * (resolution // 2) - values[i] - homing_offset) / resolution
low_factor = (
-HALF_TURN_DEGREE / HALF_TURN_DEGREE * (resolution // 2) - values[i] - homing_offset
) / resolution
upp_factor = (
HALF_TURN_DEGREE / HALF_TURN_DEGREE * (resolution // 2) - values[i] - homing_offset
) / resolution
elif CalibrationMode[calib_mode] == CalibrationMode.LINEAR:
start_pos = self.calibration["start_pos"][calib_idx]
end_pos = self.calibration["end_pos"][calib_idx]
# Convert from initial range to range [0, 100] in %
calib_val = (values[i] - start_pos) / (end_pos - start_pos) * 100
in_range = (calib_val > LOWER_BOUND_LINEAR) and (calib_val < UPPER_BOUND_LINEAR)
# Solve this inequality to find the factor to shift the range into [0, 100] %
# values[i] = (values[i] - start_pos + resolution * factor) / (end_pos + resolution * factor - start_pos - resolution * factor) * 100
# values[i] = (values[i] - start_pos + resolution * factor) / (end_pos - start_pos) * 100
# 0 <= (values[i] - start_pos + resolution * factor) / (end_pos - start_pos) * 100 <= 100
# (start_pos - values[i]) / resolution <= factor <= (end_pos - values[i]) / resolution
low_factor = (start_pos - values[i]) / resolution
upp_factor = (end_pos - values[i]) / resolution
else:
raise ValueError(f"Unknown calibration mode {calib_mode}")
if not in_range:
# Get first integer between the two bounds
if low_factor < upp_factor:
factor = math.ceil(low_factor)
if factor > upp_factor:
raise ValueError(f"No integer found between bounds [{low_factor=}, {upp_factor=}]")
else:
factor = math.ceil(upp_factor)
if factor > low_factor:
raise ValueError(f"No integer found between bounds [{low_factor=}, {upp_factor=}]")
if CalibrationMode[calib_mode] == CalibrationMode.DEGREE:
out_of_range_str = f"{LOWER_BOUND_DEGREE} < {calib_val} < {UPPER_BOUND_DEGREE} degrees"
in_range_str = f"{LOWER_BOUND_DEGREE} < {calib_val} < {UPPER_BOUND_DEGREE} degrees"
elif CalibrationMode[calib_mode] == CalibrationMode.LINEAR:
out_of_range_str = f"{LOWER_BOUND_LINEAR} < {calib_val} < {UPPER_BOUND_LINEAR} %"
in_range_str = f"{LOWER_BOUND_LINEAR} < {calib_val} < {UPPER_BOUND_LINEAR} %"
else:
raise ValueError(f"Unknown calibration mode {calib_mode}")
logging.warning(
"Auto-correct calibration of motor '%s' by shifting value by %s full turns, "
"from '%s' to '%s'.",
name,
abs(factor),
out_of_range_str,
in_range_str,
)
# A full turn corresponds to 360 degrees but also to 4096 steps for a motor resolution of 4096.
self.calibration["homing_offset"][calib_idx] += resolution * factor
def revert_calibration(self, values: np.ndarray | list, motor_names: list[str] | None):
"""Inverse of `apply_calibration`."""
if motor_names is None:
@@ -484,11 +604,23 @@ class FeetechMotorsBus:
calib_mode = self.calibration["calib_mode"][calib_idx]
if CalibrationMode[calib_mode] == CalibrationMode.DEGREE:
motor_idx, model = self.motors[name]
drive_mode = self.calibration["drive_mode"][calib_idx]
homing_offset = self.calibration["homing_offset"][calib_idx]
_, model = self.motors[name]
resolution = self.model_resolution[model]
# Convert degrees to homed ticks, then convert the homed ticks to raw ticks
values[i] = convert_degrees_to_ticks(values[i], model)
values[i] = adjusted_to_motor_ticks(values[i], model, self, motor_idx)
# Convert from nominal 0-centered degree range [-180, 180] to
# 0-centered resolution range (e.g. [-2048, 2048] for resolution=4096)
values[i] = values[i] / HALF_TURN_DEGREE * (resolution // 2)
# Subtract the homing offsets to come back to actual motor range of values
# which can be arbitrary.
values[i] -= homing_offset
# Remove drive mode, which is the rotation direction of the motor, to come back to
# actual motor rotation direction which can be arbitrary.
if drive_mode:
values[i] *= -1
elif CalibrationMode[calib_mode] == CalibrationMode.LINEAR:
start_pos = self.calibration["start_pos"][calib_idx]
@@ -501,9 +633,46 @@ class FeetechMotorsBus:
values = np.round(values).astype(np.int32)
return values
def avoid_rotation_reset(self, values, motor_names, data_name):
if data_name not in self.track_positions:
self.track_positions[data_name] = {
"prev": [None] * len(self.motor_names),
# Assume False at initialization
"below_zero": [False] * len(self.motor_names),
"above_max": [False] * len(self.motor_names),
}
track = self.track_positions[data_name]
if motor_names is None:
motor_names = self.motor_names
for i, name in enumerate(motor_names):
idx = self.motor_names.index(name)
if track["prev"][idx] is None:
track["prev"][idx] = values[i]
continue
# Detect a full rotation occurred
if abs(track["prev"][idx] - values[i]) > 2048:
# Position went below 0 and got reset to 4095
if track["prev"][idx] < values[i]:
# So we set negative value by adding a full rotation
values[i] -= 4096
# Position went above 4095 and got reset to 0
elif track["prev"][idx] > values[i]:
# So we add a full rotation
values[i] += 4096
track["prev"][idx] = values[i]
return values
def read_with_motor_ids(self, motor_models, motor_ids, data_name, num_retry=NUM_READ_RETRY):
if self.mock:
import tests.motors.mock_scservo_sdk as scs
import tests.mock_scservo_sdk as scs
else:
import scservo_sdk as scs
@@ -513,8 +682,8 @@ class FeetechMotorsBus:
motor_ids = [motor_ids]
assert_same_address(self.model_ctrl_table, self.motor_models, data_name)
addr, bytes = self.model_ctrl_table[motor_models[0]][data_name]
group = scs.GroupSyncRead(self.port_handler, self.packet_handler, addr, bytes)
addr, byte = self.model_ctrl_table[motor_models[0]][data_name]
group = scs.GroupSyncRead(self.port_handler, self.packet_handler, addr, byte)
for idx in motor_ids:
group.addParam(idx)
@@ -531,7 +700,7 @@ class FeetechMotorsBus:
values = []
for idx in motor_ids:
value = group.getData(idx, addr, bytes)
value = group.getData(idx, addr, byte)
values.append(value)
if return_list:
@@ -541,7 +710,7 @@ class FeetechMotorsBus:
def read(self, data_name, motor_names: str | list[str] | None = None):
if self.mock:
import tests.motors.mock_scservo_sdk as scs
import tests.mock_scservo_sdk as scs
else:
import scservo_sdk as scs
@@ -566,7 +735,7 @@ class FeetechMotorsBus:
models.append(model)
assert_same_address(self.model_ctrl_table, models, data_name)
addr, bytes = self.model_ctrl_table[model][data_name]
addr, byte = self.model_ctrl_table[model][data_name]
group_key = get_group_sync_key(data_name, motor_names)
if data_name not in self.group_readers:
@@ -574,9 +743,9 @@ class FeetechMotorsBus:
self.port_handler.ser.reset_output_buffer()
self.port_handler.ser.reset_input_buffer()
# Create new group reader
# create new group reader
self.group_readers[group_key] = scs.GroupSyncRead(
self.port_handler, self.packet_handler, addr, bytes
self.port_handler, self.packet_handler, addr, byte
)
for idx in motor_ids:
self.group_readers[group_key].addParam(idx)
@@ -594,13 +763,20 @@ class FeetechMotorsBus:
values = []
for idx in motor_ids:
value = self.group_readers[group_key].getData(idx, addr, bytes)
value = self.group_readers[group_key].getData(idx, addr, byte)
values.append(value)
values = np.array(values)
# Convert to signed int to use range [-2048, 2048] for our motor positions.
if data_name in CONVERT_UINT32_TO_INT32_REQUIRED:
values = values.astype(np.int32)
if data_name in CALIBRATION_REQUIRED:
values = self.avoid_rotation_reset(values, motor_names, data_name)
if data_name in CALIBRATION_REQUIRED and self.calibration is not None:
values = self.apply_calibration(values, motor_names)
values = self.apply_calibration_autocorrect(values, motor_names)
# log the number of seconds it took to read the data from the motors
delta_ts_name = get_log_name("delta_timestamp_s", "read", data_name, motor_names)
@@ -614,7 +790,7 @@ class FeetechMotorsBus:
def write_with_motor_ids(self, motor_models, motor_ids, data_name, values, num_retry=NUM_WRITE_RETRY):
if self.mock:
import tests.motors.mock_scservo_sdk as scs
import tests.mock_scservo_sdk as scs
else:
import scservo_sdk as scs
@@ -624,10 +800,10 @@ class FeetechMotorsBus:
values = [values]
assert_same_address(self.model_ctrl_table, motor_models, data_name)
addr, bytes = self.model_ctrl_table[motor_models[0]][data_name]
group = scs.GroupSyncWrite(self.port_handler, self.packet_handler, addr, bytes)
addr, byte = self.model_ctrl_table[motor_models[0]][data_name]
group = scs.GroupSyncWrite(self.port_handler, self.packet_handler, addr, byte)
for idx, value in zip(motor_ids, values, strict=True):
data = convert_to_bytes(value, bytes, self.mock)
data = convert_to_bytes(value, byte, self.mock)
group.addParam(idx, data)
for _ in range(num_retry):
@@ -650,7 +826,7 @@ class FeetechMotorsBus:
start_time = time.perf_counter()
if self.mock:
import tests.motors.mock_scservo_sdk as scs
import tests.mock_scservo_sdk as scs
else:
import scservo_sdk as scs
@@ -678,17 +854,17 @@ class FeetechMotorsBus:
values = values.tolist()
assert_same_address(self.model_ctrl_table, models, data_name)
addr, bytes = self.model_ctrl_table[model][data_name]
addr, byte = self.model_ctrl_table[model][data_name]
group_key = get_group_sync_key(data_name, motor_names)
init_group = data_name not in self.group_readers
if init_group:
self.group_writers[group_key] = scs.GroupSyncWrite(
self.port_handler, self.packet_handler, addr, bytes
self.port_handler, self.packet_handler, addr, byte
)
for idx, value in zip(motor_ids, values, strict=True):
data = convert_to_bytes(value, bytes, self.mock)
data = convert_to_bytes(value, byte, self.mock)
if init_group:
self.group_writers[group_key].addParam(idx, data)
else:

View File

@@ -11,11 +11,18 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Logic to calibrate a robot arm built with feetech motors"""
# TODO(rcadene, aliberts): move this logic into the robot code when refactoring
import time
import numpy as np
from lerobot.common.robot_devices.motors.feetech import (
CalibrationMode,
TorqueMode,
convert_degrees_to_steps,
)
from lerobot.common.robot_devices.motors.utils import MotorsBus
@@ -23,164 +30,471 @@ URL_TEMPLATE = (
"https://raw.githubusercontent.com/huggingface/lerobot/main/media/{robot}/{arm}_{position}.webp"
)
# The following positions are provided in nominal degree range ]-180, +180[
# For more info on these constants, see comments in the code where they get used.
ZERO_POSITION_DEGREE = 0
ROTATED_POSITION_DEGREE = 90
def disable_torque(arm: MotorsBus):
def assert_drive_mode(drive_mode):
# `drive_mode` is in [0,1] with 0 means original rotation direction for the motor, and 1 means inverted.
if not np.all(np.isin(drive_mode, [0, 1])):
raise ValueError(f"`drive_mode` contains values other than 0 or 1: ({drive_mode})")
def apply_drive_mode(position, drive_mode):
assert_drive_mode(drive_mode)
# Convert `drive_mode` from [0, 1] with 0 indicates original rotation direction and 1 inverted,
# to [-1, 1] with 1 indicates original rotation direction and -1 inverted.
signed_drive_mode = -(drive_mode * 2 - 1)
position *= signed_drive_mode
return position
def move_until_block(arm, motor_name, positive_direction=True, while_move_hook=None):
count = 0
while True:
present_pos = arm.read("Present_Position", motor_name)
if positive_direction:
# Move +100 steps every time. Lower the steps to lower the speed at which the arm moves.
arm.write("Goal_Position", present_pos + 100, motor_name)
else:
arm.write("Goal_Position", present_pos - 100, motor_name)
if while_move_hook is not None:
while_move_hook()
present_pos = arm.read("Present_Position", motor_name).item()
present_speed = arm.read("Present_Speed", motor_name).item()
present_current = arm.read("Present_Current", motor_name).item()
# present_load = arm.read("Present_Load", motor_name).item()
# present_voltage = arm.read("Present_Voltage", motor_name).item()
# present_temperature = arm.read("Present_Temperature", motor_name).item()
# print(f"{present_pos=}")
# print(f"{present_speed=}")
# print(f"{present_current=}")
# print(f"{present_load=}")
# print(f"{present_voltage=}")
# print(f"{present_temperature=}")
if present_speed == 0 and present_current > 40:
count += 1
if count > 100 or present_current > 300:
return present_pos
else:
count = 0
def move_to_calibrate(
arm,
motor_name,
invert_drive_mode=False,
positive_first=True,
in_between_move_hook=None,
while_move_hook=None,
):
initial_pos = arm.read("Present_Position", motor_name)
p_present_pos = None
n_present_pos = None
if positive_first:
p_present_pos = move_until_block(
arm, motor_name, positive_direction=True, while_move_hook=while_move_hook
)
else:
n_present_pos = move_until_block(
arm, motor_name, positive_direction=False, while_move_hook=while_move_hook
)
if in_between_move_hook is not None:
in_between_move_hook()
if positive_first:
n_present_pos = move_until_block(
arm, motor_name, positive_direction=False, while_move_hook=while_move_hook
)
else:
p_present_pos = move_until_block(
arm, motor_name, positive_direction=True, while_move_hook=while_move_hook
)
zero_pos = (n_present_pos + p_present_pos) / 2
calib_data = {
"initial_pos": initial_pos,
"homing_offset": zero_pos if invert_drive_mode else -zero_pos,
"invert_drive_mode": invert_drive_mode,
"drive_mode": -1 if invert_drive_mode else 0,
"zero_pos": zero_pos,
"start_pos": n_present_pos if invert_drive_mode else p_present_pos,
"end_pos": p_present_pos if invert_drive_mode else n_present_pos,
}
return calib_data
def apply_offset(calib, offset):
calib["zero_pos"] += offset
if calib["drive_mode"]:
calib["homing_offset"] += offset
else:
calib["homing_offset"] -= offset
return calib
def run_arm_auto_calibration(arm: MotorsBus, robot_type: str, arm_name: str, arm_type: str):
if robot_type == "so100":
return run_arm_auto_calibration_so100(arm, robot_type, arm_name, arm_type)
elif robot_type == "moss":
return run_arm_auto_calibration_moss(arm, robot_type, arm_name, arm_type)
else:
raise ValueError(robot_type)
def run_arm_auto_calibration_so100(arm: MotorsBus, robot_type: str, arm_name: str, arm_type: str):
"""All the offsets and magic numbers are hand tuned, and are unique to SO-100 follower arms"""
if (arm.read("Torque_Enable") != TorqueMode.DISABLED.value).any():
raise ValueError("To run calibration, the torque must be disabled on all motors.")
if not (robot_type == "so100" and arm_type == "follower"):
raise NotImplementedError("Auto calibration only supports the follower of so100 arms for now.")
def get_calibration_modes(arm: MotorsBus):
"""Returns calibration modes for each motor (DEGREE for rotational, LINEAR for gripper)."""
return [
CalibrationMode.LINEAR.name if name == "gripper" else CalibrationMode.DEGREE.name
for name in arm.motor_names
]
print(f"\nRunning calibration of {robot_type} {arm_name} {arm_type}...")
print("\nMove arm to initial position")
print("See: " + URL_TEMPLATE.format(robot=robot_type, arm=arm_type, position="initial"))
input("Press Enter to continue...")
def reset_offset(motor_id, motor_bus):
# Open the write lock, changes to EEPROM do NOT persist yet
motor_bus.write("Lock", 1)
# Lower the acceleration of the motors (in [0,254])
initial_acceleration = arm.read("Acceleration")
arm.write("Lock", 0)
arm.write("Acceleration", 10)
time.sleep(1)
# Set offset to 0
motor_name = motor_bus.motor_names[motor_id - 1]
motor_bus.write("Offset", 0, motor_names=[motor_name])
arm.write("Torque_Enable", TorqueMode.ENABLED.value)
# Close the write lock, changes to EEPROM do persist
motor_bus.write("Lock", 0)
print(f'{arm.read("Present_Position", "elbow_flex")=}')
# Confirm that the offset is zero by reading it back
confirmed_offset = motor_bus.read("Offset")[motor_id - 1]
print(f"Offset for motor {motor_id} reset to: {confirmed_offset}")
return confirmed_offset
calib = {}
init_wf_pos = arm.read("Present_Position", "wrist_flex")
init_sl_pos = arm.read("Present_Position", "shoulder_lift")
init_ef_pos = arm.read("Present_Position", "elbow_flex")
arm.write("Goal_Position", init_wf_pos - 800, "wrist_flex")
arm.write("Goal_Position", init_sl_pos + 150 + 1024, "shoulder_lift")
arm.write("Goal_Position", init_ef_pos - 2048, "elbow_flex")
time.sleep(2)
def calibrate_homing_motor(motor_id, motor_bus):
reset_offset(motor_id, motor_bus)
print("Calibrate shoulder_pan")
calib["shoulder_pan"] = move_to_calibrate(arm, "shoulder_pan")
arm.write("Goal_Position", calib["shoulder_pan"]["zero_pos"], "shoulder_pan")
time.sleep(1)
home_ticks = motor_bus.read("Present_Position")[motor_id - 1] # Read index starts at 0
print(f"Encoder offset (present position in homing position): {home_ticks}")
print("Calibrate gripper")
calib["gripper"] = move_to_calibrate(arm, "gripper", invert_drive_mode=True)
time.sleep(1)
return home_ticks
print("Calibrate wrist_flex")
calib["wrist_flex"] = move_to_calibrate(arm, "wrist_flex")
calib["wrist_flex"] = apply_offset(calib["wrist_flex"], offset=80)
def in_between_move_hook_elbow():
nonlocal arm, calib
time.sleep(2)
ef_pos = arm.read("Present_Position", "elbow_flex")
sl_pos = arm.read("Present_Position", "shoulder_lift")
arm.write("Goal_Position", ef_pos + 1024, "elbow_flex")
arm.write("Goal_Position", sl_pos - 1024, "shoulder_lift")
time.sleep(2)
def calibrate_linear_motor(motor_id, motor_bus):
motor_names = motor_bus.motor_names
motor_name = motor_names[motor_id - 1]
print("Calibrate elbow_flex")
calib["elbow_flex"] = move_to_calibrate(
arm, "elbow_flex", positive_first=False, in_between_move_hook=in_between_move_hook_elbow
)
calib["elbow_flex"] = apply_offset(calib["elbow_flex"], offset=80 - 1024)
reset_offset(motor_id, motor_bus)
arm.write("Goal_Position", calib["elbow_flex"]["zero_pos"] + 1024 + 512, "elbow_flex")
time.sleep(1)
input(f"Close the {motor_name}, then press Enter...")
start_pos = motor_bus.read("Present_Position")[motor_id - 1] # Read index starts ar 0
print(f" [Motor {motor_id}] start position recorded: {start_pos}")
def in_between_move_hook_shoulder():
nonlocal arm, calib
arm.write("Goal_Position", calib["elbow_flex"]["zero_pos"], "elbow_flex")
input(f"Open the {motor_name} fully, then press Enter...")
end_pos = motor_bus.read("Present_Position")[motor_id - 1] # Read index starts ar 0
print(f" [Motor {motor_id}] end position recorded: {end_pos}")
print("Calibrate shoulder_lift")
calib["shoulder_lift"] = move_to_calibrate(
arm,
"shoulder_lift",
invert_drive_mode=True,
positive_first=False,
in_between_move_hook=in_between_move_hook_shoulder,
)
# add an 30 steps as offset to align with body
calib["shoulder_lift"] = apply_offset(calib["shoulder_lift"], offset=1024 - 50)
return start_pos, end_pos
def while_move_hook():
nonlocal arm, calib
positions = {
"shoulder_lift": round(calib["shoulder_lift"]["zero_pos"] - 1600),
"elbow_flex": round(calib["elbow_flex"]["zero_pos"] + 1700),
"wrist_flex": round(calib["wrist_flex"]["zero_pos"] + 800),
"gripper": round(calib["gripper"]["end_pos"]),
}
arm.write("Goal_Position", list(positions.values()), list(positions.keys()))
arm.write("Goal_Position", round(calib["shoulder_lift"]["zero_pos"] - 1600), "shoulder_lift")
time.sleep(2)
arm.write("Goal_Position", round(calib["elbow_flex"]["zero_pos"] + 1700), "elbow_flex")
time.sleep(2)
arm.write("Goal_Position", round(calib["wrist_flex"]["zero_pos"] + 800), "wrist_flex")
time.sleep(2)
arm.write("Goal_Position", round(calib["gripper"]["end_pos"]), "gripper")
time.sleep(2)
def single_motor_calibration(arm: MotorsBus, motor_id: int):
"""Calibrates a single motor and returns its calibration data for updating the calibration file."""
print("Calibrate wrist_roll")
calib["wrist_roll"] = move_to_calibrate(
arm, "wrist_roll", invert_drive_mode=True, positive_first=False, while_move_hook=while_move_hook
)
disable_torque(arm)
print(f"\n--- Calibrating Motor {motor_id} ---")
arm.write("Goal_Position", calib["wrist_roll"]["zero_pos"], "wrist_roll")
time.sleep(1)
arm.write("Goal_Position", calib["gripper"]["start_pos"], "gripper")
time.sleep(1)
arm.write("Goal_Position", calib["wrist_flex"]["zero_pos"], "wrist_flex")
time.sleep(1)
arm.write("Goal_Position", calib["elbow_flex"]["zero_pos"] + 2048, "elbow_flex")
arm.write("Goal_Position", calib["shoulder_lift"]["zero_pos"] - 2048, "shoulder_lift")
time.sleep(1)
arm.write("Goal_Position", calib["shoulder_pan"]["zero_pos"], "shoulder_pan")
time.sleep(1)
start_pos = 0
end_pos = 0
encoder_offset = 0
calib_modes = []
for name in arm.motor_names:
if name == "gripper":
calib_modes.append(CalibrationMode.LINEAR.name)
else:
calib_modes.append(CalibrationMode.DEGREE.name)
if motor_id == 6:
start_pos, end_pos = calibrate_linear_motor(motor_id, arm)
else:
input("Move the motor to (zero) position, then press Enter...")
encoder_offset = calibrate_homing_motor(motor_id, arm)
print(f"Calibration for motor ID:{motor_id} done.")
# Create a calibration dictionary for the single motor
calib_dict = {
"homing_offset": int(encoder_offset),
"drive_mode": 0,
"start_pos": int(start_pos),
"end_pos": int(end_pos),
"calib_mode": get_calibration_modes(arm)[motor_id - 1],
"motor_name": arm.motor_names[motor_id - 1],
"homing_offset": [calib[name]["homing_offset"] for name in arm.motor_names],
"drive_mode": [calib[name]["drive_mode"] for name in arm.motor_names],
"start_pos": [calib[name]["start_pos"] for name in arm.motor_names],
"end_pos": [calib[name]["end_pos"] for name in arm.motor_names],
"calib_mode": calib_modes,
"motor_names": arm.motor_names,
}
# Re-enable original accerlation
arm.write("Lock", 0)
arm.write("Acceleration", initial_acceleration)
time.sleep(1)
return calib_dict
def run_full_arm_calibration(arm: MotorsBus, robot_type: str, arm_name: str, arm_type: str):
"""
Runs a full calibration process for all motors in a robotic arm.
def run_arm_auto_calibration_moss(arm: MotorsBus, robot_type: str, arm_name: str, arm_type: str):
"""All the offsets and magic numbers are hand tuned, and are unique to SO-100 follower arms"""
if (arm.read("Torque_Enable") != TorqueMode.DISABLED.value).any():
raise ValueError("To run calibration, the torque must be disabled on all motors.")
This function calibrates each motor in the arm, determining encoder offsets and
start/end positions for linear and rotational motors. The calibration data is then
stored in a dictionary for later use.
**Calibration Process:**
- The user is prompted to move the arm to its homing position before starting.
- Motors with rotational motion are calibrated using a homing method.
- Linear actuators (e.g., grippers) are calibrated separately.
- Encoder offsets, start positions, and end positions are recorded.
**Example Usage:**
```python
run_full_arm_calibration(arm, "so100", "left", "follower")
```
"""
disable_torque(arm)
if not (robot_type == "moss" and arm_type == "follower"):
raise NotImplementedError("Auto calibration only supports the follower of moss arms for now.")
print(f"\nRunning calibration of {robot_type} {arm_name} {arm_type}...")
print("\nMove arm to homing position (middle)")
print(
"See: " + URL_TEMPLATE.format(robot=robot_type, arm=arm_type, position="zero")
) # TODO(pepijn): replace with new instruction homing pos (all motors in middle) in tutorial
print("\nMove arm to initial position")
print("See: " + URL_TEMPLATE.format(robot=robot_type, arm=arm_type, position="initial"))
input("Press Enter to continue...")
start_positions = np.zeros(len(arm.motor_indices))
end_positions = np.zeros(len(arm.motor_indices))
encoder_offsets = np.zeros(len(arm.motor_indices))
# Lower the acceleration of the motors (in [0,254])
initial_acceleration = arm.read("Acceleration")
arm.write("Lock", 0)
arm.write("Acceleration", 10)
time.sleep(1)
modes = get_calibration_modes(arm)
arm.write("Torque_Enable", TorqueMode.ENABLED.value)
for i, motor_id in enumerate(arm.motor_indices):
if modes[i] == CalibrationMode.DEGREE.name:
encoder_offsets[i] = calibrate_homing_motor(motor_id, arm)
start_positions[i] = 0
end_positions[i] = 0
sl_pos = arm.read("Present_Position", "shoulder_lift")
arm.write("Goal_Position", sl_pos - 1024 - 450, "shoulder_lift")
ef_pos = arm.read("Present_Position", "elbow_flex")
arm.write("Goal_Position", ef_pos + 1024 + 450, "elbow_flex")
time.sleep(2)
for i, motor_id in enumerate(arm.motor_indices):
if modes[i] == CalibrationMode.LINEAR.name:
start_positions[i], end_positions[i] = calibrate_linear_motor(motor_id, arm)
encoder_offsets[i] = 0
calib = {}
print("\nMove arm to rest position")
input("Press Enter to continue...")
print("Calibrate shoulder_pan")
calib["shoulder_pan"] = move_to_calibrate(arm, "shoulder_pan")
arm.write("Goal_Position", calib["shoulder_pan"]["zero_pos"], "shoulder_pan")
time.sleep(1)
print(f"\n calibration of {robot_type} {arm_name} {arm_type} done!")
print("Calibrate gripper")
calib["gripper"] = move_to_calibrate(arm, "gripper", invert_drive_mode=True)
time.sleep(1)
# Force drive_mode values (can be static)
drive_modes = [0, 1, 0, 0, 1, 0]
print("Calibrate wrist_flex")
calib["wrist_flex"] = move_to_calibrate(arm, "wrist_flex", invert_drive_mode=True)
calib["wrist_flex"] = apply_offset(calib["wrist_flex"], offset=-210 + 1024)
wr_pos = arm.read("Present_Position", "wrist_roll")
arm.write("Goal_Position", calib["wrist_flex"]["zero_pos"] - 1024, "wrist_flex")
time.sleep(1)
arm.write("Goal_Position", wr_pos - 1024, "wrist_roll")
time.sleep(1)
arm.write("Goal_Position", calib["wrist_flex"]["zero_pos"] - 2048, "wrist_flex")
time.sleep(1)
arm.write("Goal_Position", calib["gripper"]["end_pos"], "gripper")
time.sleep(1)
print("Calibrate wrist_roll")
calib["wrist_roll"] = move_to_calibrate(arm, "wrist_roll", invert_drive_mode=True)
calib["wrist_roll"] = apply_offset(calib["wrist_roll"], offset=790)
arm.write("Goal_Position", calib["wrist_roll"]["zero_pos"] - 1024, "wrist_roll")
arm.write("Goal_Position", calib["gripper"]["start_pos"], "gripper")
arm.write("Goal_Position", calib["wrist_flex"]["zero_pos"] - 1024, "wrist_flex")
time.sleep(1)
arm.write("Goal_Position", calib["wrist_roll"]["zero_pos"], "wrist_roll")
arm.write("Goal_Position", calib["wrist_flex"]["zero_pos"] - 2048, "wrist_flex")
def in_between_move_elbow_flex_hook():
nonlocal arm, calib
arm.write("Goal_Position", calib["wrist_flex"]["zero_pos"], "wrist_flex")
print("Calibrate elbow_flex")
calib["elbow_flex"] = move_to_calibrate(
arm,
"elbow_flex",
invert_drive_mode=True,
in_between_move_hook=in_between_move_elbow_flex_hook,
)
arm.write("Goal_Position", calib["wrist_flex"]["zero_pos"] - 1024, "wrist_flex")
def in_between_move_shoulder_lift_hook():
nonlocal arm, calib
sl = arm.read("Present_Position", "shoulder_lift")
arm.write("Goal_Position", sl - 1500, "shoulder_lift")
time.sleep(1)
arm.write("Goal_Position", calib["elbow_flex"]["zero_pos"] + 1536, "elbow_flex")
time.sleep(1)
arm.write("Goal_Position", calib["wrist_flex"]["start_pos"], "wrist_flex")
time.sleep(1)
print("Calibrate shoulder_lift")
calib["shoulder_lift"] = move_to_calibrate(
arm, "shoulder_lift", in_between_move_hook=in_between_move_shoulder_lift_hook
)
calib["shoulder_lift"] = apply_offset(calib["shoulder_lift"], offset=-1024)
arm.write("Goal_Position", calib["wrist_flex"]["zero_pos"] - 1024, "wrist_flex")
time.sleep(1)
arm.write("Goal_Position", calib["shoulder_lift"]["zero_pos"] + 2048, "shoulder_lift")
arm.write("Goal_Position", calib["elbow_flex"]["zero_pos"] - 1024 - 400, "elbow_flex")
time.sleep(2)
calib_modes = []
for name in arm.motor_names:
if name == "gripper":
calib_modes.append(CalibrationMode.LINEAR.name)
else:
calib_modes.append(CalibrationMode.DEGREE.name)
calib_dict = {
"homing_offset": encoder_offsets.astype(int).tolist(),
"drive_mode": drive_modes,
"start_pos": start_positions.astype(int).tolist(),
"end_pos": end_positions.astype(int).tolist(),
"calib_mode": get_calibration_modes(arm),
"homing_offset": [calib[name]["homing_offset"] for name in arm.motor_names],
"drive_mode": [calib[name]["drive_mode"] for name in arm.motor_names],
"start_pos": [calib[name]["start_pos"] for name in arm.motor_names],
"end_pos": [calib[name]["end_pos"] for name in arm.motor_names],
"calib_mode": calib_modes,
"motor_names": arm.motor_names,
}
# Re-enable original accerlation
arm.write("Lock", 0)
arm.write("Acceleration", initial_acceleration)
time.sleep(1)
return calib_dict
def run_arm_manual_calibration(arm: MotorsBus, robot_type: str, arm_name: str, arm_type: str):
"""This function ensures that a neural network trained on data collected on a given robot
can work on another robot. For instance before calibration, setting a same goal position
for each motor of two different robots will get two very different positions. But after calibration,
the two robots will move to the same position.To this end, this function computes the homing offset
and the drive mode for each motor of a given robot.
Homing offset is used to shift the motor position to a ]-2048, +2048[ nominal range (when the motor uses 2048 steps
to complete a half a turn). This range is set around an arbitrary "zero position" corresponding to all motor positions
being 0. During the calibration process, you will need to manually move the robot to this "zero position".
Drive mode is used to invert the rotation direction of the motor. This is useful when some motors have been assembled
in the opposite orientation for some robots. During the calibration process, you will need to manually move the robot
to the "rotated position".
After calibration, the homing offsets and drive modes are stored in a cache.
Example of usage:
```python
run_arm_calibration(arm, "so100", "left", "follower")
```
"""
if (arm.read("Torque_Enable") != TorqueMode.DISABLED.value).any():
raise ValueError("To run calibration, the torque must be disabled on all motors.")
print(f"\nRunning calibration of {robot_type} {arm_name} {arm_type}...")
print("\nMove arm to zero position")
print("See: " + URL_TEMPLATE.format(robot=robot_type, arm=arm_type, position="zero"))
input("Press Enter to continue...")
# We arbitrarily chose our zero target position to be a straight horizontal position with gripper upwards and closed.
# It is easy to identify and all motors are in a "quarter turn" position. Once calibration is done, this position will
# correspond to every motor angle being 0. If you set all 0 as Goal Position, the arm will move in this position.
zero_target_pos = convert_degrees_to_steps(ZERO_POSITION_DEGREE, arm.motor_models)
# Compute homing offset so that `present_position + homing_offset ~= target_position`.
zero_pos = arm.read("Present_Position")
homing_offset = zero_target_pos - zero_pos
# The rotated target position corresponds to a rotation of a quarter turn from the zero position.
# This allows to identify the rotation direction of each motor.
# For instance, if the motor rotates 90 degree, and its value is -90 after applying the homing offset, then we know its rotation direction
# is inverted. However, for the calibration being successful, we need everyone to follow the same target position.
# Sometimes, there is only one possible rotation direction. For instance, if the gripper is closed, there is only one direction which
# corresponds to opening the gripper. When the rotation direction is ambiguous, we arbitrarily rotate clockwise from the point of view
# of the previous motor in the kinetic chain.
print("\nMove arm to rotated target position")
print("See: " + URL_TEMPLATE.format(robot=robot_type, arm=arm_type, position="rotated"))
input("Press Enter to continue...")
rotated_target_pos = convert_degrees_to_steps(ROTATED_POSITION_DEGREE, arm.motor_models)
# Find drive mode by rotating each motor by a quarter of a turn.
# Drive mode indicates if the motor rotation direction should be inverted (=1) or not (=0).
rotated_pos = arm.read("Present_Position")
drive_mode = (rotated_pos < zero_pos).astype(np.int32)
# Re-compute homing offset to take into account drive mode
rotated_drived_pos = apply_drive_mode(rotated_pos, drive_mode)
homing_offset = rotated_target_pos - rotated_drived_pos
print("\nMove arm to rest position")
print("See: " + URL_TEMPLATE.format(robot=robot_type, arm=arm_type, position="rest"))
input("Press Enter to continue...")
print()
# Joints with rotational motions are expressed in degrees in nominal range of [-180, 180]
calib_modes = []
for name in arm.motor_names:
if name == "gripper":
calib_modes.append(CalibrationMode.LINEAR.name)
else:
calib_modes.append(CalibrationMode.DEGREE.name)
calib_dict = {
"homing_offset": homing_offset.tolist(),
"drive_mode": drive_mode.tolist(),
"start_pos": zero_pos.tolist(),
"end_pos": rotated_pos.tolist(),
"calib_mode": calib_modes,
"motor_names": arm.motor_names,
}
return calib_dict
def run_full_auto_arm_calibration(arm: MotorsBus, robot_type: str, arm_name: str, arm_type: str):
"""TODO(pepijn): Add this method later as extra
Example of usage:
```python
run_full_auto_arm_calibration(arm, "so100", "left", "follower")
```
"""
print(f"\nRunning calibration of {robot_type} {arm_name} {arm_type}...")

View File

@@ -67,14 +67,14 @@ def calibrate_follower_arm(motors_bus, calib_dir_str):
return
if calib_file.exists():
with open(calib_file) as f:
with open(calib_file, encoding="utf-8") as f:
calibration = json.load(f)
print(f"[INFO] Loaded calibration from {calib_file}")
else:
print("[INFO] Calibration file not found. Running manual calibration...")
calibration = run_arm_manual_calibration(motors_bus, "lekiwi", "follower_arm", "follower")
print(f"[INFO] Calibration complete. Saving to {calib_file}")
with open(calib_file, "w") as f:
with open(calib_file, "w", encoding="utf-8") as f:
json.dump(calibration, f)
try:
motors_bus.set_calibration(calibration)

View File

@@ -47,80 +47,15 @@ def ensure_safe_goal_position(
if not torch.allclose(goal_pos, safe_goal_pos):
logging.warning(
"Relative goal position magnitude had to be clamped to be safe.\n"
f" requested relative goal position target: {diff}\n"
f" clamped relative goal position target: {safe_diff}"
" requested relative goal position target: %s\n"
" clamped relative goal position target: %s",
diff,
safe_diff,
)
return safe_goal_pos
def apply_feetech_offsets_from_calibration(motorsbus, calibration_dict: dict):
"""
Reads 'calibration_dict' containing 'homing_offset' and 'motor_names',
then writes each motor's offset to the servo's internal Offset (0x1F) in EPROM.
This version is modified so each homed position (originally 0) will now read
2047, i.e. 180° away from 0 in the 4096-count circle. Offsets are permanently
stored in EEPROM, so the servo's Present_Position is hardware-shifted even
after power cycling.
Steps:
1) Subtract 2047 from the old offset (so 0 -> 2047).
2) Clamp to [-2047..+2047].
3) Encode sign bit and magnitude into a 12-bit number.
"""
homing_offsets = calibration_dict["homing_offset"]
motor_names = calibration_dict["motor_names"]
start_pos = calibration_dict["start_pos"]
# Open the write lock, changes to EEPROM do NOT persist yet
motorsbus.write("Lock", 1)
# For each motor, set the 'Offset' parameter
for m_name, old_offset in zip(motor_names, homing_offsets, strict=False):
# If bus doesnt have a motor named m_name, skip
if m_name not in motorsbus.motors:
print(f"Warning: '{m_name}' not found in motorsbus.motors; skipping offset.")
continue
if m_name == "gripper":
old_offset = start_pos # If gripper set the offset to the start position of the gripper
continue
# Shift the offset so the homed position reads 2047
new_offset = old_offset - 2047
# Clamp to [-2047..+2047]
if new_offset > 2047:
new_offset = 2047
print(
f"Warning: '{new_offset}' is getting clamped because its larger then 2047; This should not happen!"
)
elif new_offset < -2047:
new_offset = -2047
print(
f"Warning: '{new_offset}' is getting clamped because its smaller then -2047; This should not happen!"
)
# Determine the direction (sign) bit and magnitude
direction_bit = 1 if new_offset < 0 else 0
magnitude = abs(new_offset)
# Combine sign bit (bit 11) with the magnitude (bits 0..10)
servo_offset = (direction_bit << 11) | magnitude
# Write offset to servo
motorsbus.write("Offset", servo_offset, motor_names=m_name)
print(
f"Set offset for {m_name}: "
f"old_offset={old_offset}, new_offset={new_offset}, servo_encoded={magnitude} + direction={direction_bit}"
)
motorsbus.write("Lock", 0)
print("Offsets have been saved to EEPROM successfully.")
class ManipulatorRobot:
# TODO(rcadene): Implement force feedback
"""This class allows to control any manipulator robot of various number of motors.
@@ -312,6 +247,8 @@ class ManipulatorRobot:
from lerobot.common.robot_devices.motors.dynamixel import TorqueMode
elif self.robot_type in ["so100", "moss", "lekiwi"]:
from lerobot.common.robot_devices.motors.feetech import TorqueMode
else:
raise NotImplementedError(f"Robot type {self.robot_type} is not supported")
# We assume that at connection time, arms are in a rest position, and torque can
# be safely disabled to run calibration and/or set robot preset configurations.
@@ -369,7 +306,7 @@ class ManipulatorRobot:
arm_calib_path = self.calibration_dir / f"{arm_id}.json"
if arm_calib_path.exists():
with open(arm_calib_path) as f:
with open(arm_calib_path, encoding="utf-8") as f:
calibration = json.load(f)
else:
# TODO(rcadene): display a warning in __init__ if calibration file not available
@@ -382,36 +319,24 @@ class ManipulatorRobot:
elif self.robot_type in ["so100", "moss", "lekiwi"]:
from lerobot.common.robot_devices.robots.feetech_calibration import (
run_full_arm_calibration,
run_arm_manual_calibration,
)
calibration = run_full_arm_calibration(arm, self.robot_type, name, arm_type)
calibration = run_arm_manual_calibration(arm, self.robot_type, name, arm_type)
print(f"Calibration is done! Saving calibration file '{arm_calib_path}'")
arm_calib_path.parent.mkdir(parents=True, exist_ok=True)
with open(arm_calib_path, "w") as f:
with open(arm_calib_path, "w", encoding="utf-8") as f:
json.dump(calibration, f)
return calibration
# For each follower arm
for name, arm_bus in self.follower_arms.items():
calibration = load_or_run_calibration_(name, arm_bus, "follower")
arm_bus.set_calibration(calibration)
# If this is a Feetech robot, also set the servo offset into EEPROM
if self.robot_type in ["so100", "lekiwi"]:
apply_feetech_offsets_from_calibration(arm_bus, calibration)
# For each leader arm
for name, arm_bus in self.leader_arms.items():
calibration = load_or_run_calibration_(name, arm_bus, "leader")
arm_bus.set_calibration(calibration)
# Optionally also set offset for leader if you want the servo offsets as well
if self.robot_type in ["so100", "lekiwi"]:
apply_feetech_offsets_from_calibration(arm_bus, calibration)
for name, arm in self.follower_arms.items():
calibration = load_or_run_calibration_(name, arm, "follower")
arm.set_calibration(calibration)
for name, arm in self.leader_arms.items():
calibration = load_or_run_calibration_(name, arm, "leader")
arm.set_calibration(calibration)
def set_koch_robot_preset(self):
def set_operating_mode_(arm):

View File

@@ -262,14 +262,14 @@ class MobileManipulator:
arm_calib_path = self.calibration_dir / f"{arm_id}.json"
if arm_calib_path.exists():
with open(arm_calib_path) as f:
with open(arm_calib_path, encoding="utf-8") as f:
calibration = json.load(f)
else:
print(f"Missing calibration file '{arm_calib_path}'")
calibration = run_arm_manual_calibration(arm, self.robot_type, name, arm_type)
print(f"Calibration is done! Saving calibration file '{arm_calib_path}'")
arm_calib_path.parent.mkdir(parents=True, exist_ok=True)
with open(arm_calib_path, "w") as f:
with open(arm_calib_path, "w", encoding="utf-8") as f:
json.dump(calibration, f)
return calibration
@@ -372,6 +372,7 @@ class MobileManipulator:
present_speed = self.last_present_speed
# TODO(Steven): [WARN] Plenty of general exceptions
except Exception as e:
print(f"[DEBUG] Error decoding video message: {e}")
# If decode fails, fall back to old data

View File

@@ -68,9 +68,9 @@ class TimeBenchmark(ContextDecorator):
Block took approximately 10.00 milliseconds
"""
def __init__(self, print=False):
def __init__(self, print_time=False):
self.local = threading.local()
self.print_time = print
self.print_time = print_time
def __enter__(self):
self.local.start_time = time.perf_counter()

View File

@@ -46,7 +46,7 @@ def is_package_available(pkg_name: str, return_version: bool = False) -> tuple[b
else:
# For packages other than "torch", don't attempt the fallback and set as not available
package_exists = False
logging.debug(f"Detected {pkg_name} version: {package_version}")
logging.debug("Detected %s version: %s", {pkg_name}, package_version)
if return_version:
return package_exists, package_version
else:

View File

@@ -27,6 +27,8 @@ class AverageMeter:
def __init__(self, name: str, fmt: str = ":f"):
self.name = name
self.fmt = fmt
self.val = 0.0
self.avg = 0.0
self.reset()
def reset(self) -> None:

View File

@@ -69,7 +69,7 @@ def get_safe_torch_device(try_device: str, log: bool = False) -> torch.device:
case _:
device = torch.device(try_device)
if log:
logging.warning(f"Using custom {try_device} device.")
logging.warning("Using custom %s device.", try_device)
return device

View File

@@ -86,7 +86,7 @@ class WandBLogger:
resume="must" if cfg.resume else None,
)
print(colored("Logs will be synced with wandb.", "blue", attrs=["bold"]))
logging.info(f"Track this run --> {colored(wandb.run.get_url(), 'yellow', attrs=['bold'])}")
logging.info("Track this run --> %s", colored(wandb.run.get_url(), "yellow", attrs=["bold"]))
self._wandb = wandb
def log_policy(self, checkpoint_dir: Path):
@@ -108,7 +108,7 @@ class WandBLogger:
for k, v in d.items():
if not isinstance(v, (int, float, str)):
logging.warning(
f'WandB logging of key "{k}" was ignored as its type is not handled by this wrapper.'
'WandB logging of key "%s" was ignored as its type is not handled by this wrapper.', k
)
continue
self._wandb.log({f"{mode}/{k}": v}, step=step)

View File

@@ -11,9 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import importlib
import inspect
import pkgutil
import sys
from argparse import ArgumentError
from functools import wraps
@@ -25,7 +23,6 @@ import draccus
from lerobot.common.utils.utils import has_method
PATH_KEY = "path"
PLUGIN_DISCOVERY_SUFFIX = "discover_packages_path"
draccus.set_config_type("json")
@@ -61,86 +58,6 @@ def parse_arg(arg_name: str, args: Sequence[str] | None = None) -> str | None:
return None
def parse_plugin_args(plugin_arg_suffix: str, args: Sequence[str]) -> dict:
"""Parse plugin-related arguments from command-line arguments.
This function extracts arguments from command-line arguments that match a specified suffix pattern.
It processes arguments in the format '--key=value' and returns them as a dictionary.
Args:
plugin_arg_suffix (str): The suffix to identify plugin-related arguments.
cli_args (Sequence[str]): A sequence of command-line arguments to parse.
Returns:
dict: A dictionary containing the parsed plugin arguments where:
- Keys are the argument names (with '--' prefix removed if present)
- Values are the corresponding argument values
Example:
>>> args = ['--env.discover_packages_path=my_package',
... '--other_arg=value']
>>> parse_plugin_args('discover_packages_path', args)
{'env.discover_packages_path': 'my_package'}
"""
plugin_args = {}
for arg in args:
if "=" in arg and plugin_arg_suffix in arg:
key, value = arg.split("=", 1)
# Remove leading '--' if present
if key.startswith("--"):
key = key[2:]
plugin_args[key] = value
return plugin_args
class PluginLoadError(Exception):
"""Raised when a plugin fails to load."""
def load_plugin(plugin_path: str) -> None:
"""Load and initialize a plugin from a given Python package path.
This function attempts to load a plugin by importing its package and any submodules.
Plugin registration is expected to happen during package initialization, i.e. when
the package is imported the gym environment should be registered and the config classes
registered with their parents using the `register_subclass` decorator.
Args:
plugin_path (str): The Python package path to the plugin (e.g. "mypackage.plugins.myplugin")
Raises:
PluginLoadError: If the plugin cannot be loaded due to import errors or if the package path is invalid.
Examples:
>>> load_plugin("external_plugin.core") # Loads plugin from external package
Notes:
- The plugin package should handle its own registration during import
- All submodules in the plugin package will be imported
- Implementation follows the plugin discovery pattern from Python packaging guidelines
See Also:
https://packaging.python.org/en/latest/guides/creating-and-discovering-plugins/
"""
try:
package_module = importlib.import_module(plugin_path, __package__)
except (ImportError, ModuleNotFoundError) as e:
raise PluginLoadError(
f"Failed to load plugin '{plugin_path}'. Verify the path and installation: {str(e)}"
) from e
def iter_namespace(ns_pkg):
return pkgutil.iter_modules(ns_pkg.__path__, ns_pkg.__name__ + ".")
try:
for _finder, pkg_name, _ispkg in iter_namespace(package_module):
importlib.import_module(pkg_name)
except ImportError as e:
raise PluginLoadError(
f"Failed to load plugin '{plugin_path}'. Verify the path and installation: {str(e)}"
) from e
def get_path_arg(field_name: str, args: Sequence[str] | None = None) -> str | None:
return parse_arg(f"{field_name}.{PATH_KEY}", args)
@@ -188,13 +105,10 @@ def filter_path_args(fields_to_filter: str | list[str], args: Sequence[str] | No
def wrap(config_path: Path | None = None):
"""
HACK: Similar to draccus.wrap but does three additional things:
HACK: Similar to draccus.wrap but does two additional things:
- Will remove '.path' arguments from CLI in order to process them later on.
- If a 'config_path' is passed and the main config class has a 'from_pretrained' method, will
initialize it from there to allow to fetch configs from the hub directly
- Will load plugins specified in the CLI arguments. These plugins will typically register
their own subclasses of config classes, so that draccus can find the right class to instantiate
from the CLI '.type' arguments
"""
def wrapper_outer(fn):
@@ -207,14 +121,6 @@ def wrap(config_path: Path | None = None):
args = args[1:]
else:
cli_args = sys.argv[1:]
plugin_args = parse_plugin_args(PLUGIN_DISCOVERY_SUFFIX, cli_args)
for plugin_cli_arg, plugin_path in plugin_args.items():
try:
load_plugin(plugin_path)
except PluginLoadError as e:
# add the relevant CLI arg to the error message
raise PluginLoadError(f"{e}\nFailed plugin CLI Arg: {plugin_cli_arg}") from e
cli_args = filter_arg(plugin_cli_arg, cli_args)
config_path_cli = parse_arg("config_path", cli_args)
if has_method(argtype, "__get_path_fields__"):
path_fields = argtype.__get_path_fields__()

View File

@@ -64,13 +64,14 @@ class PreTrainedConfig(draccus.ChoiceRegistry, HubMixin, abc.ABC):
self.pretrained_path = None
if not self.device or not is_torch_device_available(self.device):
auto_device = auto_select_torch_device()
logging.warning(f"Device '{self.device}' is not available. Switching to '{auto_device}'.")
logging.warning("Device '%s' is not available. Switching to '%s'.", self.device, auto_device)
self.device = auto_device.type
# Automatically deactivate AMP if necessary
if self.use_amp and not is_amp_available(self.device):
logging.warning(
f"Automatic Mixed Precision (amp) is not available on device '{self.device}'. Deactivating AMP."
"Automatic Mixed Precision (amp) is not available on device '%s'. Deactivating AMP.",
self.device,
)
self.use_amp = False
@@ -78,15 +79,18 @@ class PreTrainedConfig(draccus.ChoiceRegistry, HubMixin, abc.ABC):
def type(self) -> str:
return self.get_choice_name(self.__class__)
@abc.abstractproperty
@property
@abc.abstractmethod
def observation_delta_indices(self) -> list | None:
raise NotImplementedError
@abc.abstractproperty
@property
@abc.abstractmethod
def action_delta_indices(self) -> list | None:
raise NotImplementedError
@abc.abstractproperty
@property
@abc.abstractmethod
def reward_delta_indices(self) -> list | None:
raise NotImplementedError
@@ -128,7 +132,7 @@ class PreTrainedConfig(draccus.ChoiceRegistry, HubMixin, abc.ABC):
return None
def _save_pretrained(self, save_directory: Path) -> None:
with open(save_directory / CONFIG_NAME, "w") as f, draccus.config_type("json"):
with open(save_directory / CONFIG_NAME, "w", encoding="utf-8") as f, draccus.config_type("json"):
draccus.dump(self, f, indent=4)
@classmethod

View File

@@ -123,7 +123,10 @@ class TrainPipelineConfig(HubMixin):
return draccus.encode(self)
def _save_pretrained(self, save_directory: Path) -> None:
with open(save_directory / TRAIN_CONFIG_NAME, "w") as f, draccus.config_type("json"):
with (
open(save_directory / TRAIN_CONFIG_NAME, "w", encoding="utf-8") as f,
draccus.config_type("json"),
):
draccus.dump(self, f, indent=4)
@classmethod

View File

@@ -1,176 +0,0 @@
import argparse
import json
import time
from pathlib import Path
from lerobot.common.robot_devices.motors.feetech import (
FeetechMotorsBus,
adjusted_to_homing_ticks,
adjusted_to_motor_ticks,
convert_degrees_to_ticks,
convert_ticks_to_degrees,
)
from lerobot.common.robot_devices.robots.configs import So100RobotConfig
from lerobot.common.robot_devices.robots.utils import make_robot_from_config
def apply_feetech_offsets_from_calibration(motorsbus: FeetechMotorsBus, calibration_dict: dict):
"""
Reads 'calibration_dict' containing 'homing_offset' and 'motor_names',
then writes each motor's offset to the servo's internal Offset (0x1F) in EPROM.
This version is modified so each homed position (originally 0) will now read
2047, i.e. 180° away from 0 in the 4096-count circle. Offsets are permanently
stored in EEPROM, so the servo's Present_Position is hardware-shifted even
after power cycling.
Steps:
1) Subtract 2047 from the old offset (so 0 -> 2047).
2) Clamp to [-2047..+2047].
3) Encode sign bit and magnitude into a 12-bit number.
"""
homing_offsets = calibration_dict["homing_offset"]
motor_names = calibration_dict["motor_names"]
start_pos = calibration_dict["start_pos"]
# Open the write lock, changes to EEPROM do NOT persist yet
motorsbus.write("Lock", 1)
# For each motor, set the 'Offset' parameter
for m_name, old_offset in zip(motor_names, homing_offsets, strict=False):
# If bus doesnt have a motor named m_name, skip
if m_name not in motorsbus.motors:
print(f"Warning: '{m_name}' not found in motorsbus.motors; skipping offset.")
continue
if m_name == "gripper":
old_offset = start_pos # If gripper set the offset to the start position of the gripper
continue
# Shift the offset so the homed position reads 2047
new_offset = old_offset - 2047
# Clamp to [-2047..+2047]
if new_offset > 2047:
new_offset = 2047
print(
f"Warning: '{new_offset}' is getting clamped because its larger then 2047; This should not happen!"
)
elif new_offset < -2047:
new_offset = -2047
print(
f"Warning: '{new_offset}' is getting clamped because its smaller then -2047; This should not happen!"
)
# Determine the direction (sign) bit and magnitude
direction_bit = 1 if new_offset < 0 else 0
magnitude = abs(new_offset)
# Combine sign bit (bit 11) with the magnitude (bits 0..10)
servo_offset = (direction_bit << 11) | magnitude
# Write offset to servo
motorsbus.write("Offset", servo_offset, motor_names=m_name)
print(
f"Set offset for {m_name}: "
f"old_offset={old_offset}, new_offset={new_offset}, servo_encoded={magnitude} + direction={direction_bit}"
)
motorsbus.write("Lock", 0)
print("Offsets have been saved to EEPROM successfully.")
def debug_feetech_positions(cfg, arm_arg: str):
"""
Reads each joints (1) raw ticks, (2) homed ticks, (3) degrees, and (4) invert-adjusted ticks.
:param arm_arg: One of "main_leader" or "main_follower".
"""
robot = make_robot_from_config(cfg)
# Parse which arm we want: 'main_leader' or 'main_follower'
if arm_arg not in ("main_leader", "main_follower"):
raise ValueError("Please specify --arm=main_leader or --arm=main_follower")
bus_config = robot.leader_arms["main"] if arm_arg == "main_leader" else robot.follower_arms["main"]
bus = FeetechMotorsBus(bus_config)
# Load calibration if it exists
calib_file = Path(robot.calibration_dir) / f"{arm_arg}.json"
if calib_file.exists():
with open(calib_file) as f:
calibration_dict = json.load(f)
bus.set_calibration(calibration_dict)
print(f"Loaded calibration from {calib_file}")
else:
print(f"No calibration file found at {calib_file}, skipping calibration set.")
bus.connect()
print(f"Connected to Feetech bus on port: {bus.port}")
# Apply offset to servo EEPROM so the servos Present_Position is hardware-shifted
if calibration_dict is not None:
print("Applying offsets from calibration object to servo EEPROM. Be careful—this is permanent!")
apply_feetech_offsets_from_calibration(bus, calibration_dict)
# Disable torque on all motors so you can move them freely by hand
bus.write("Torque_Enable", 0, motor_names=bus.motor_names)
print("Torque disabled on all joints.")
try:
print("\nPress Ctrl+C to quit.\n")
while True:
# Read *raw* positions (no calibration).
raw_positions = bus.read_with_motor_ids(
bus.motor_models, bus.motor_indices, data_name="Present_Position"
)
# Read *already-homed* positions
homed_positions = bus.read("Present_Position")
for i, name in enumerate(bus.motor_names):
motor_idx, model = bus.motors[name]
raw_ticks = raw_positions[i] # 0..4095
homed_val = homed_positions[i] # degrees or % if linear
# Manually compute "adjusted ticks" from raw ticks
manual_adjusted = adjusted_to_homing_ticks(raw_ticks, model, bus, motor_idx)
# Convert to degrees
manual_degs = convert_ticks_to_degrees(manual_adjusted, model)
# Convert that deg back to ticks
manual_ticks = convert_degrees_to_ticks(manual_degs, model)
# Then invert them using offset & bus drive mode
inv_ticks = adjusted_to_motor_ticks(manual_ticks, model, bus, motor_idx)
print(
f"{name:15s} | "
f"RAW={raw_ticks:4d} | "
f"HOMED_FROM_READ={homed_val:7.2f} | "
f"HOMED_TICKS={manual_adjusted:6d} | "
f"MANUAL_ADJ_DEG={manual_degs:7.2f} | "
f"MANUAL_ADJ_TICKS={manual_ticks:6d} | "
f"INV_TICKS={inv_ticks:4d} "
)
print("----------------------------------------------------")
time.sleep(0.25)
except KeyboardInterrupt:
pass
finally:
print("\nExiting. Disconnecting bus...")
bus.disconnect()
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Debug Feetech positions.")
parser.add_argument(
"--arm",
type=str,
choices=["main_leader", "main_follower"],
default="main_leader",
help="Which arm to debug: 'main_leader' or 'main_follower'.",
)
args = parser.parse_args()
cfg = So100RobotConfig()
debug_feetech_positions(cfg, arm_arg=args.arm)

View File

@@ -90,6 +90,7 @@ def configure_motor(port, brand, model, motor_idx_des, baudrate_des):
print("Scanning all baudrates and motor indices")
all_baudrates = set(series_baudrate_table.values())
motor_index = -1 # Set the motor index to an out-of-range value.
baudrate = None
for baudrate in all_baudrates:
motor_bus.set_bus_baudrate(baudrate)
@@ -145,12 +146,13 @@ def configure_motor(port, brand, model, motor_idx_des, baudrate_des):
# the motors. Note: this configuration is not in the official STS3215 Memory Table
motor_bus.write("Lock", 0)
motor_bus.write("Maximum_Acceleration", 254)
motor_bus.write("Max_Angle_Limit", 4095) # default 4095
motor_bus.write("Min_Angle_Limit", 0) # default 0
motor_bus.write("Offset", 0)
motor_bus.write("Mode", 0)
motor_bus.write("Goal_Position", 2048)
motor_bus.write("Lock", 1)
time.sleep(4)
print("Present Position", motor_bus.read("Present_Position"))
motor_bus.write("Offset", 0)
time.sleep(4)
print("Offset", motor_bus.read("Offset"))
except Exception as e:

View File

@@ -81,6 +81,7 @@ This might require a sudo permission to allow your terminal to monitor keyboard
**NOTE**: You can resume/continue data recording by running the same data recording command twice.
"""
# TODO(Steven): This script should be updated to use the new robot API and the new dataset API.
import argparse
import importlib
import logging

View File

@@ -59,7 +59,7 @@ np_version = np.__version__ if HAS_NP else "N/A"
torch_version = torch.__version__ if HAS_TORCH else "N/A"
torch_cuda_available = torch.cuda.is_available() if HAS_TORCH else "N/A"
cuda_version = torch._C._cuda_getCompiledVersion() if HAS_TORCH and torch.version.cuda is not None else "N/A"
cuda_version = torch.version.cuda if HAS_TORCH and torch.version.cuda is not None else "N/A"
# TODO(aliberts): refactor into an actual command `lerobot env`

View File

@@ -259,6 +259,10 @@ def eval_policy(
threads = [] # for video saving threads
n_episodes_rendered = 0 # for saving the correct number of videos
video_paths: list[str] = [] # max_episodes_rendered > 0:
ep_frames: list[np.ndarray] = [] # max_episodes_rendered > 0
episode_data: dict | None = None # return_episode_data == True
# Callback for visualization.
def render_frame(env: gym.vector.VectorEnv):
# noqa: B023
@@ -271,19 +275,11 @@ def eval_policy(
# Here we must render all frames and discard any we don't need.
ep_frames.append(np.stack(env.call("render")[:n_to_render_now]))
if max_episodes_rendered > 0:
video_paths: list[str] = []
if return_episode_data:
episode_data: dict | None = None
# we dont want progress bar when we use slurm, since it clutters the logs
progbar = trange(n_batches, desc="Stepping through eval batches", disable=inside_slurm())
for batch_ix in progbar:
# Cache frames for rendering videos. Each item will be (b, h, w, c), and the list indexes the rollout
# step.
if max_episodes_rendered > 0:
ep_frames: list[np.ndarray] = []
if start_seed is None:
seeds = None
@@ -320,13 +316,19 @@ def eval_policy(
else:
all_seeds.append(None)
# FIXME: episode_data is either None or it doesn't exist
if return_episode_data:
if episode_data is None:
start_data_index = 0
elif isinstance(episode_data, dict):
start_data_index = episode_data["index"][-1].item() + 1
else:
start_data_index = 0
this_episode_data = _compile_episode_data(
rollout_data,
done_indices,
start_episode_index=batch_ix * env.num_envs,
start_data_index=(0 if episode_data is None else (episode_data["index"][-1].item() + 1)),
start_data_index=start_data_index,
fps=env.unwrapped.metadata["render_fps"],
)
if episode_data is None:
@@ -453,6 +455,7 @@ def _compile_episode_data(
return data_dict
# TODO(Steven): [WARN] Redefining built-in 'eval'
@parser.wrap()
def eval_main(cfg: EvalPipelineConfig):
logging.info(pformat(asdict(cfg)))
@@ -489,7 +492,7 @@ def eval_main(cfg: EvalPipelineConfig):
print(info["aggregated"])
# Save info
with open(Path(cfg.output_dir) / "eval_info.json", "w") as f:
with open(Path(cfg.output_dir) / "eval_info.json", "w", encoding="utf-8") as f:
json.dump(info, f, indent=2)
env.close()

View File

@@ -53,6 +53,7 @@ import torch
from huggingface_hub import HfApi
from safetensors.torch import save_file
# TODO(Steven): #711 Broke this
from lerobot.common.datasets.compute_stats import compute_stats
from lerobot.common.datasets.lerobot_dataset import CODEBASE_VERSION, LeRobotDataset
from lerobot.common.datasets.push_dataset_to_hub.utils import check_repo_id
@@ -89,7 +90,7 @@ def save_meta_data(
# save info
info_path = meta_data_dir / "info.json"
with open(str(info_path), "w") as f:
with open(str(info_path), "w", encoding="utf-8") as f:
json.dump(info, f, indent=4)
# save stats
@@ -120,11 +121,11 @@ def push_dataset_card_to_hub(
repo_id: str,
revision: str | None,
tags: list | None = None,
license: str = "apache-2.0",
dataset_license: str = "apache-2.0",
**card_kwargs,
):
"""Creates and pushes a LeRobotDataset Card with appropriate tags to easily find it on the hub."""
card = create_lerobot_dataset_card(tags=tags, license=license, **card_kwargs)
card = create_lerobot_dataset_card(tags=tags, license=dataset_license, **card_kwargs)
card.push_to_hub(repo_id=repo_id, repo_type="dataset", revision=revision)
@@ -213,6 +214,7 @@ def push_dataset_to_hub(
encoding,
)
# TODO(Steven): This doesn't seem to exist, maybe it was removed/changed recently?
lerobot_dataset = LeRobotDataset.from_preloaded(
repo_id=repo_id,
hf_dataset=hf_dataset,

View File

@@ -155,12 +155,14 @@ def train(cfg: TrainPipelineConfig):
logging.info(colored("Output dir:", "yellow", attrs=["bold"]) + f" {cfg.output_dir}")
if cfg.env is not None:
logging.info(f"{cfg.env.task=}")
logging.info(f"{cfg.steps=} ({format_big_number(cfg.steps)})")
logging.info(f"{dataset.num_frames=} ({format_big_number(dataset.num_frames)})")
logging.info(f"{dataset.num_episodes=}")
logging.info(f"{num_learnable_params=} ({format_big_number(num_learnable_params)})")
logging.info(f"{num_total_params=} ({format_big_number(num_total_params)})")
logging.info("cfg.env.task=%s", cfg.env.task)
logging.info("cfg.steps=%s (%s)", cfg.steps, format_big_number(cfg.steps))
logging.info("dataset.num_frames=%s (%s)", dataset.num_frames, format_big_number(dataset.num_frames))
logging.info("dataset.num_episodes=%s", dataset.num_episodes)
logging.info(
"num_learnable_params=%s (%s)", num_learnable_params, format_big_number(num_learnable_params)
)
logging.info("num_total_params=%s (%s)", num_total_params, format_big_number(num_total_params))
# create dataloader for offline training
if hasattr(cfg.policy, "drop_n_last_frames"):
@@ -238,7 +240,7 @@ def train(cfg: TrainPipelineConfig):
train_tracker.reset_averages()
if cfg.save_checkpoint and is_saving_step:
logging.info(f"Checkpoint policy after step {step}")
logging.info("Checkpoint policy after step %s", step)
checkpoint_dir = get_step_checkpoint_dir(cfg.output_dir, cfg.steps, step)
save_checkpoint(checkpoint_dir, step, cfg, policy, optimizer, lr_scheduler)
update_last_checkpoint(checkpoint_dir)
@@ -247,7 +249,7 @@ def train(cfg: TrainPipelineConfig):
if cfg.env and is_eval_step:
step_id = get_step_identifier(step, cfg.steps)
logging.info(f"Eval policy at step {step}")
logging.info("Eval policy at step %s", step)
with (
torch.no_grad(),
torch.autocast(device_type=device.type) if cfg.policy.use_amp else nullcontext(),

View File

@@ -265,25 +265,13 @@ def main():
),
)
parser.add_argument(
"--tolerance-s",
type=float,
default=1e-4,
help=(
"Tolerance in seconds used to ensure data timestamps respect the dataset fps value"
"This is argument passed to the constructor of LeRobotDataset and maps to its tolerance_s constructor argument"
"If not given, defaults to 1e-4."
),
)
args = parser.parse_args()
kwargs = vars(args)
repo_id = kwargs.pop("repo_id")
root = kwargs.pop("root")
tolerance_s = kwargs.pop("tolerance_s")
logging.info("Loading dataset")
dataset = LeRobotDataset(repo_id, root=root, tolerance_s=tolerance_s)
dataset = LeRobotDataset(repo_id, root=root)
visualize_dataset(dataset, **vars(args))

View File

@@ -150,7 +150,7 @@ def run_server(
400,
)
dataset_version = (
str(dataset.meta._version) if isinstance(dataset, LeRobotDataset) else dataset.codebase_version
str(dataset.meta.version) if isinstance(dataset, LeRobotDataset) else dataset.codebase_version
)
match = re.search(r"v(\d+)\.", dataset_version)
if match:
@@ -358,7 +358,7 @@ def visualize_dataset_html(
if force_override:
shutil.rmtree(output_dir)
else:
logging.info(f"Output directory already exists. Loading from it: '{output_dir}'")
logging.info("Output directory already exists. Loading from it: '%s'", {output_dir})
output_dir.mkdir(parents=True, exist_ok=True)
@@ -446,31 +446,15 @@ def main():
help="Delete the output directory if it exists already.",
)
parser.add_argument(
"--tolerance-s",
type=float,
default=1e-4,
help=(
"Tolerance in seconds used to ensure data timestamps respect the dataset fps value"
"This is argument passed to the constructor of LeRobotDataset and maps to its tolerance_s constructor argument"
"If not given, defaults to 1e-4."
),
)
args = parser.parse_args()
kwargs = vars(args)
repo_id = kwargs.pop("repo_id")
load_from_hf_hub = kwargs.pop("load_from_hf_hub")
root = kwargs.pop("root")
tolerance_s = kwargs.pop("tolerance_s")
dataset = None
if repo_id:
dataset = (
LeRobotDataset(repo_id, root=root, tolerance_s=tolerance_s)
if not load_from_hf_hub
else get_dataset_info(repo_id)
)
dataset = LeRobotDataset(repo_id, root=root) if not load_from_hf_hub else get_dataset_info(repo_id)
visualize_dataset_html(dataset, **vars(args))

View File

@@ -56,6 +56,7 @@ dependencies = [
"gymnasium==0.29.1", # TODO(rcadene, aliberts): Make gym 1.0.0 work
"h5py>=3.10.0",
"huggingface-hub[hf-transfer,cli]>=0.27.1 ; python_version < '4.0'",
"hydra-core>=1.3.2",
"imageio[ffmpeg]>=2.34.0",
"jsonlines>=4.0.0",
"numba>=0.59.0",
@@ -102,7 +103,30 @@ requires-poetry = ">=2.1"
[tool.ruff]
line-length = 110
target-version = "py310"
exclude = ["tests/artifacts/**/*.safetensors"]
exclude = [
"tests/data",
".bzr",
".direnv",
".eggs",
".git",
".git-rewrite",
".hg",
".mypy_cache",
".nox",
".pants.d",
".pytype",
".ruff_cache",
".svn",
".tox",
".venv",
"__pypackages__",
"_build",
"buck-out",
"build",
"dist",
"node_modules",
"venv",
]
[tool.ruff.lint]
select = ["E4", "E7", "E9", "F", "I", "N", "B", "C4", "SIM"]

View File

@@ -1,89 +0,0 @@
import sys
from dataclasses import dataclass
from pathlib import Path
from typing import Generator
import pytest
from lerobot.common.envs.configs import EnvConfig
from lerobot.configs.parser import PluginLoadError, load_plugin, parse_plugin_args, wrap
def create_plugin_code(*, base_class: str = "EnvConfig", plugin_name: str = "test_env") -> str:
"""Creates a dummy plugin module that implements its own EnvConfig subclass."""
return f"""
from dataclasses import dataclass
from lerobot.common.envs.configs import {base_class}
@{base_class}.register_subclass("{plugin_name}")
@dataclass
class TestPluginConfig:
value: int = 42
"""
@pytest.fixture
def plugin_dir(tmp_path: Path) -> Generator[Path, None, None]:
"""Creates a temporary plugin package structure."""
plugin_pkg = tmp_path / "test_plugin"
plugin_pkg.mkdir()
(plugin_pkg / "__init__.py").touch()
with open(plugin_pkg / "my_plugin.py", "w") as f:
f.write(create_plugin_code())
# Add tmp_path to Python path so we can import from it
sys.path.insert(0, str(tmp_path))
yield plugin_pkg
sys.path.pop(0)
def test_parse_plugin_args():
cli_args = [
"--env.type=test",
"--model.discover_packages_path=some.package",
"--env.discover_packages_path=other.package",
]
plugin_args = parse_plugin_args("discover_packages_path", cli_args)
assert plugin_args == {
"model.discover_packages_path": "some.package",
"env.discover_packages_path": "other.package",
}
def test_load_plugin_success(plugin_dir: Path):
# Import should work and register the plugin with the real EnvConfig
load_plugin("test_plugin")
assert "test_env" in EnvConfig.get_known_choices()
plugin_cls = EnvConfig.get_choice_class("test_env")
plugin_instance = plugin_cls()
assert plugin_instance.value == 42
def test_load_plugin_failure():
with pytest.raises(PluginLoadError) as exc_info:
load_plugin("nonexistent_plugin")
assert "Failed to load plugin 'nonexistent_plugin'" in str(exc_info.value)
def test_wrap_with_plugin(plugin_dir: Path):
@dataclass
class Config:
env: EnvConfig
@wrap()
def dummy_func(cfg: Config):
return cfg
# Test loading plugin via CLI args
sys.argv = [
"dummy_script.py",
"--env.discover_packages_path=test_plugin",
"--env.type=test_env",
]
cfg = dummy_func()
assert isinstance(cfg, Config)
assert isinstance(cfg.env, EnvConfig.get_choice_class("test_env"))
assert cfg.env.value == 42

Some files were not shown because too many files have changed in this diff Show More