Compare commits

70 Commits

Author SHA1 Message Date
Remi Cadene
82df3feaee TOREMOVE: isolate aloha on __init__ to see if it creates the bug 2024-10-07 12:12:32 +02:00
Remi Cadene
2a8a9dc25a TOREMOVE: remove aloha from __init__ to test if this creates the bug 2024-10-07 12:11:24 +02:00
Remi Cadene
dc08c3bfa4 small 2024-10-07 12:10:46 +02:00
Remi Cadene
68fff561de Merge remote-tracking branch 'origin/main' into user/rcadene/2024_09_10_train_aloha 2024-10-04 19:08:55 +02:00
Remi Cadene
433e950348 Merge remote-tracking branch 'origin/main' into user/rcadene/2024_09_10_train_aloha 2024-10-03 17:16:59 +02:00
Remi Cadene
e58e59411a Add num_workers >=1 capabilities (default to 1) 2024-09-28 16:05:54 +02:00
Remi Cadene
3369d351a7 Fix slow fps 2024-09-28 15:41:15 +02:00
Remi Cadene
8b89d03d74 Merge remote-tracking branch 'origin/user/rcadene/2024_09_10_train_aloha' into user/rcadene/2024_09_10_train_aloha 2024-09-28 15:01:15 +02:00
Remi Cadene
77ba43d25b WIP: add multiprocess 2024-09-28 15:00:38 +02:00
Remi Cadene
9b76ee9eb0 Merge remote-tracking branch 'origin/user/rcadene/2024_09_01_mock_robot_devices' into user/rcadene/2024_09_10_train_aloha 2024-09-28 14:32:33 +02:00
Remi Cadene
48911e0cd3 Merge remote-tracking branch 'origin/main' into user/rcadene/2024_09_10_train_aloha 2024-09-28 13:25:51 +02:00
Remi Cadene
5c73bec913 Address Jess comments 2024-09-28 13:11:45 +02:00
Remi
1de04e4756 Merge branch 'main' into user/rcadene/2024_09_01_mock_robot_devices 2024-09-27 18:04:56 +02:00
Remi Cadene
83cfe60783 tests 2024-09-27 17:46:49 +02:00
Remi Cadene
0e63f7c1b5 test 2024-09-27 17:42:48 +02:00
Remi Cadene
bc479cb2d4 test 2024-09-27 17:22:51 +02:00
Remi Cadene
2c9defabdd test 2024-09-27 17:15:21 +02:00
Remi Cadene
cc5c623179 test 2024-09-27 17:12:40 +02:00
Remi Cadene
88c2ed419e fix unit tests 2024-09-27 17:03:27 +02:00
Remi Cadene
2e694fcf8f test 2024-09-27 16:56:53 +02:00
Remi Cadene
9dea00ee9e retest 2024-09-27 16:39:53 +02:00
Remi Cadene
50a979d6de Check if file exists 2024-09-27 16:33:58 +02:00
Remi Cadene
76cc47956a add 2024-09-27 16:21:27 +02:00
Remi Cadene
675d4286c8 add 2024-09-27 16:20:00 +02:00
Remi Cadene
da1888a378 revert to all tests 2024-09-27 14:59:17 +02:00
Remi Cadene
3f9f3dd027 Add pyserial 2024-09-27 14:57:32 +02:00
Remi Cadene
c704eb94c0 improve except 2024-09-27 13:54:32 +02:00
Remi Cadene
0352c61b00 Add more exception except 2024-09-27 13:44:41 +02:00
Remi Cadene
e499d60742 fix unit test 2024-09-27 12:29:58 +02:00
Remi Cadene
81f17d505e if not '~cameras' in overrides 2024-09-27 12:21:06 +02:00
Remi Cadene
bf7e906b70 add +COLOR_RGB2BGR 2024-09-27 12:11:48 +02:00
Remi Cadene
a7350d9b65 add mock=False 2024-09-27 12:02:14 +02:00
Remi Cadene
8da08935d4 move mock_motor in test_motors.py 2024-09-26 16:45:04 +02:00
Remi Cadene
7450adc72b no more require_mock_motor 2024-09-26 16:40:24 +02:00
Remi Cadene
e66900e387 mock_motor instead of require_mock_motor 2024-09-26 16:35:37 +02:00
Remi Cadene
89b2b7397e fix unit tests 2024-09-26 16:31:23 +02:00
Remi Cadene
48be576cc6 fix unit tests 2024-09-26 16:28:08 +02:00
Remi Cadene
395720a5de Revert "Remove @require_x"
This reverts commit 8a7b5c45c7.
2024-09-26 14:35:26 +02:00
Remi Cadene
8a7b5c45c7 Remove @require_x 2024-09-26 14:35:17 +02:00
Remi Cadene
b6b7fda5f8 custom pytest speedup (TOREMOVE) 2024-09-26 13:53:31 +02:00
Remi Cadene
8b36223832 fix unit tests 2024-09-26 13:51:45 +02:00
Remi Cadene
a236382590 fix unit tests 2024-09-26 13:19:29 +02:00
Remi Cadene
3cb85bcd4b Fix unit test 2024-09-26 13:09:08 +02:00
Remi Cadene
f2b1842d69 fix unit test 2024-09-26 11:48:22 +02:00
Remi Cadene
500d505bf6 Add support for video=False in record (no tested yet) 2024-09-26 11:41:32 +02:00
Remi Cadene
2c0171632f fix aloha mock 2024-09-25 15:18:21 +02:00
Remi Cadene
bded8cbbe9 Fix unit tests 2024-09-25 14:11:28 +02:00
Remi Cadene
6377d2a96c mock) 2024-09-25 12:29:53 +02:00
Remi Cadene
558420115e mock=False 2024-09-25 12:22:22 +02:00
Remi Cadene
bcf27b8c01 Skip mocking tests with minimal pytest 2024-09-25 12:11:27 +02:00
Remi
f0452c222a Merge branch 'main' into user/rcadene/2024_09_01_mock_robot_devices 2024-09-25 11:36:58 +02:00
Remi Cadene
1bf284562e pre-commit run --all-files 2024-09-25 11:36:08 +02:00
Simon Alibert
886923a890 Fix opencv segmentation fault (#442)
Co-authored-by: Remi <remi.cadene@huggingface.co>
2024-09-25 11:29:59 +02:00
Remi Cadene
adc8dc9bfb Address comments 2024-09-16 14:53:45 +02:00
Remi Cadene
624551bea9 Address comments 2024-09-16 14:52:27 +02:00
Remi Cadene
6636db5b51 Address comments 2024-09-16 14:51:25 +02:00
Remi
ccc0586d45 Apply suggestions from code review
Co-authored-by: Simon Alibert <75076266+aliberts@users.noreply.github.com>
2024-09-16 14:49:19 +02:00
Remi
bab19d9b1d Merge branch 'main' into user/rcadene/2024_09_10_train_aloha 2024-09-15 17:44:52 +02:00
Remi Cadene
783b78ae9a Fix unit test test_policies, backward, Remove no_state from test 2024-09-15 17:30:48 +02:00
Remi Cadene
e47856add6 Fix unit test test_policies, backward, Remove no_state from test 2024-09-15 17:22:12 +02:00
Remi Cadene
3f993d5250 fix typo 2024-09-12 02:44:46 +02:00
Remi Cadene
cd4d2257d3 Fix unit test 2024-09-12 02:43:21 +02:00
Remi Cadene
53ebf9cf9f Mock robots (WIP segmentation fault) 2024-09-12 01:43:32 +02:00
Remi Cadene
4151630c24 Mock dynamixel_sdk 2024-09-12 01:08:44 +02:00
Remi Cadene
bc0e691280 force push aloha_real.yaml 2024-09-10 23:31:05 +02:00
Remi Cadene
e1763aa906 Clean + Add act_aloha_real.yaml + Add act_real.yaml 2024-09-10 19:45:59 +02:00
Remi Cadene
3bd5ea4d7a WIP 2024-09-10 18:30:39 +02:00
Remi Cadene
44b8394365 add dynamic import for cv2 and pyrealsense2 2024-09-09 19:32:35 +02:00
Remi Cadene
2469c99053 fix unit tests 2024-09-09 19:19:05 +02:00
Remi Cadene
96cc2433d6 Mock OpenCVCamera 2024-09-09 13:37:37 +02:00
16 changed files with 149 additions and 1347 deletions

@@ -65,6 +65,7 @@ htmlcov/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
@@ -72,11 +73,6 @@ coverage.xml
.hypothesis/
.pytest_cache/
# Ignore .cache except calibration
.cache/*
!.cache/calibration/
!.cache/calibration/**
# Translations
*.mo
*.pot

@@ -1,179 +0,0 @@
This tutorial explains how to use [Aloha and Aloha 2 stationary](https://www.trossenrobotics.com/aloha-stationary) with LeRobot.
## Setup
Follow the [documentation from Trossen Robotics](https://docs.trossenrobotics.com/aloha_docs/getting_started/stationary/hardware_setup.html) for setting up the hardware and plugging the 4 arms and 4 cameras to your computer.
## Install LeRobot
On your computer:
1. [Install Miniconda](https://docs.anaconda.com/miniconda/#quick-command-line-install):
```bash
mkdir -p ~/miniconda3
wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh -O ~/miniconda3/miniconda.sh
bash ~/miniconda3/miniconda.sh -b -u -p ~/miniconda3
rm ~/miniconda3/miniconda.sh
~/miniconda3/bin/conda init bash
```
2. Restart shell or `source ~/.bashrc`
3. Create and activate a fresh conda environment for lerobot
```bash
conda create -y -n lerobot python=3.10 && conda activate lerobot
```
4. Clone LeRobot:
```bash
git clone https://github.com/huggingface/lerobot.git ~/lerobot
```
5. Install LeRobot with dependencies for the Aloha motors (dynamixel) and cameras (intelrealsense):
```bash
cd ~/lerobot && pip install -e ".[dynamixel intelrealsense]"
```
And install extra dependencies for recording datasets on Linux:
```bash
conda install -y -c conda-forge ffmpeg
pip uninstall -y opencv-python
conda install -y -c conda-forge "opencv>=4.10.0"
```
## Teleoperate
**/!\ FOR SAFETY, READ THIS /!\**
Teleoperation consists in manually operating the leader arms to move the follower arms. Importantly:
1. Make sure your leader arms are in the same position as the follower arms, so that the follower arms don't move too fast to match the leader arms,
2. Our code assumes that your robot has been assembled following Trossen Robotics instructions. This allows us to skip calibration, as we use the pre-defined calibration files in `.cache/calibration/aloha_default`. If you replace a motor, make sure you follow the exact instructions from Trossen Robotics.
By running the following code, you can start your first **SAFE** teleoperation:
```bash
python lerobot/scripts/control_robot.py teleoperate \
--robot-path lerobot/configs/robot/aloha.yaml \
--robot-overrides max_relative_target=5
```
By adding `--robot-overrides max_relative_target=5`, we override the default value for `max_relative_target` defined in `lerobot/configs/robot/aloha.yaml`. A value of `5` limits the magnitude of each movement for added safety, at the cost of less smooth teleoperation. When you feel confident, you can disable this limit by adding `--robot-overrides max_relative_target=null` to the command line:
```bash
python lerobot/scripts/control_robot.py teleoperate \
--robot-path lerobot/configs/robot/aloha.yaml \
--robot-overrides max_relative_target=null
```
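To make the effect of `max_relative_target` more concrete, here is a minimal sketch of how such a cap could work, assuming it bounds how far each goal position may lie from the current position in a single step. The function name `clamp_goal_positions` and the plain-list representation are hypothetical and are not taken from LeRobot's code.

```python
def clamp_goal_positions(present, goal, max_relative_target):
    """Limit each goal so it is at most `max_relative_target` away from the present position.

    Hypothetical helper for illustration; `None` disables the limit, mirroring
    `max_relative_target=null` on the command line.
    """
    if max_relative_target is None:
        return list(goal)
    clamped = []
    for pos, tgt in zip(present, goal):
        # Clip the requested displacement to [-max_relative_target, +max_relative_target]
        delta = max(-max_relative_target, min(max_relative_target, tgt - pos))
        clamped.append(pos + delta)
    return clamped


present = [0.0, 10.0, -20.0]
goal = [3.0, 40.0, -50.0]
print(clamp_goal_positions(present, goal, 5))     # [3.0, 15.0, -25.0]
print(clamp_goal_positions(present, goal, None))  # [3.0, 40.0, -50.0]
```

With a cap like this, a follower arm that starts far from the leader arm approaches it in small steps instead of jumping, which is why teleoperation feels less smooth while the limit is active.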
## Record a dataset
Once you're familiar with teleoperation, you can record your first dataset with Aloha.
If you want to use the Hugging Face hub features for uploading your dataset and you haven't previously done it, make sure you've logged in using a write-access token, which can be generated from the [Hugging Face settings](https://huggingface.co/settings/tokens):
```bash
huggingface-cli login --token ${HUGGINGFACE_TOKEN} --add-to-git-credential
```
Store your Hugging Face repository name in a variable to run these commands:
```bash
HF_USER=$(huggingface-cli whoami | head -n 1)
echo $HF_USER
```
Record 2 episodes and upload your dataset to the hub:
```bash
python lerobot/scripts/control_robot.py record \
--robot-path lerobot/configs/robot/aloha.yaml \
--robot-overrides max_relative_target=null \
--fps 30 \
--root data \
--repo-id ${HF_USER}/aloha_test \
--tags aloha tutorial \
--warmup-time-s 5 \
--episode-time-s 40 \
--reset-time-s 10 \
--num-episodes 2 \
--push-to-hub 1
```
## Visualize a dataset
If you uploaded your dataset to the hub with `--push-to-hub 1`, you can [visualize your dataset online](https://huggingface.co/spaces/lerobot/visualize_dataset) by copying and pasting your repo id, given by:
```bash
echo ${HF_USER}/aloha_test
```
If you didn't upload your dataset (i.e. you used `--push-to-hub 0`), you can still visualize it locally with:
```bash
python lerobot/scripts/visualize_dataset_html.py \
--root data \
--repo-id ${HF_USER}/aloha_test
```
## Replay an episode
**/!\ FOR SAFETY, READ THIS /!\**
Replay consists in automatically replaying the sequence of actions (i.e. goal positions for your motors) recorded in a given dataset episode. Make sure the current initial position of your robot is similar to the one in your episode, so that your follower arms don't move too fast to go to the first goal positions. For safety, you might want to add `--robot-overrides max_relative_target=5` to your command line as explained above.
Now try to replay the first episode on your robot:
```bash
python lerobot/scripts/control_robot.py replay \
--robot-path lerobot/configs/robot/aloha.yaml \
--robot-overrides max_relative_target=null \
--fps 30 \
--root data \
--repo-id ${HF_USER}/aloha_test \
--episode 0
```
## Train a policy
To train a policy to control your robot, use the [`python lerobot/scripts/train.py`](../lerobot/scripts/train.py) script. A few arguments are required. Here is an example command:
```bash
DATA_DIR=data python lerobot/scripts/train.py \
dataset_repo_id=${HF_USER}/aloha_test \
policy=act_aloha_real \
env=aloha_real \
hydra.run.dir=outputs/train/act_aloha_test \
hydra.job.name=act_aloha_test \
device=cuda \
wandb.enable=true
```
Let's explain it:
1. We provided the dataset as argument with `dataset_repo_id=${HF_USER}/aloha_test`.
2. We provided the policy with `policy=act_aloha_real`. This loads configurations from [`lerobot/configs/policy/act_aloha_real.yaml`](../lerobot/configs/policy/act_aloha_real.yaml). Importantly, this policy uses 4 cameras as input `cam_right_wrist`, `cam_left_wrist`, `cam_high`, and `cam_low`.
3. We provided an environment as argument with `env=aloha_real`. This loads configurations from [`lerobot/configs/env/aloha_real.yaml`](../lerobot/configs/env/aloha_real.yaml). Note: this yaml defines 18 dimensions for the `state_dim` and `action_dim`, corresponding to 18 motors, not 14 motors as used in previous Aloha work. This is because we include the `shoulder_shadow` and `elbow_shadow` motors for simplicity.
4. We provided `device=cuda` since we are training on an NVIDIA GPU.
5. We provided `wandb.enable=true` to use [Weights and Biases](https://docs.wandb.ai/quickstart) for visualizing training plots. This is optional but if you use it, make sure you are logged in by running `wandb login`.
6. We added `DATA_DIR=data` to access your dataset stored in your local `data` directory. If you don't provide `DATA_DIR`, your dataset will be downloaded from the Hugging Face hub to your cache folder `$HOME/.cache/huggingface`. In future versions of `lerobot`, both directories will be in sync.
Training should take several hours. You will find checkpoints in `outputs/train/act_aloha_test/checkpoints`.
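The 18 versus 14 dimensions mentioned in point 3 can be checked with a little arithmetic. The sketch below assumes two follower arms with nine motors each, two of which per arm are the `shoulder_shadow` and `elbow_shadow` motors. The other joint names and the `left`/`right` prefixes are assumptions for illustration; check your robot configuration for the actual list.

```python
# Assumed joint names per arm, for illustration only.
JOINTS_WITH_SHADOWS = [
    "waist", "shoulder", "shoulder_shadow", "elbow", "elbow_shadow",
    "forearm_roll", "wrist_angle", "wrist_rotate", "gripper",
]
ARMS = ["left", "right"]


def motor_names(include_shadows=True):
    """List one name per motor, optionally dropping the shadow motors."""
    joints = [j for j in JOINTS_WITH_SHADOWS if include_shadows or not j.endswith("_shadow")]
    return [f"{arm}_{joint}" for arm in ARMS for joint in joints]


print(len(motor_names(include_shadows=True)))   # 18
print(len(motor_names(include_shadows=False)))  # 14
```

A dataset recorded with one convention will not match a policy configured for the other, so `state_dim` and `action_dim` should agree with how the data was recorded.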
## Evaluate your policy
You can use the `record` function from [`lerobot/scripts/control_robot.py`](../lerobot/scripts/control_robot.py) but with a policy checkpoint as input. For instance, run this command to record 10 evaluation episodes:
```bash
python lerobot/scripts/control_robot.py record \
--robot-path lerobot/configs/robot/aloha.yaml \
--robot-overrides max_relative_target=null \
--fps 30 \
--root data \
--repo-id ${HF_USER}/eval_act_aloha_test \
--tags aloha tutorial eval \
--warmup-time-s 5 \
--episode-time-s 40 \
--reset-time-s 10 \
--num-episodes 10 \
--num-image-writer-processes 1 \
-p outputs/train/act_aloha_test/checkpoints/last/pretrained_model
```
As you can see, it's almost the same command as previously used to record your training dataset. Three things changed:
1. There is an additional `-p` argument which indicates the path to your policy checkpoint (e.g. `-p outputs/train/act_aloha_test/checkpoints/last/pretrained_model`). You can also use the model repository if you uploaded a model checkpoint to the hub (e.g. `-p ${HF_USER}/act_aloha_test`).
2. The name of the dataset begins with `eval` to reflect that you are running inference (e.g. `--repo-id ${HF_USER}/eval_act_aloha_test`).
3. We use `--num-image-writer-processes 1` instead of the default value (`0`). On our computer, using a dedicated process to write images from the 4 cameras to disk lets us reach a constant 30 fps during inference. Feel free to explore different values for `--num-image-writer-processes`.
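The idea behind a dedicated image writer is that the control loop only enqueues frames and never waits for the disk. Here is a minimal sketch of that queue-and-sentinel pattern. To stay short and portable it uses a background thread rather than a separate process, and it writes placeholder bytes instead of real PNG images. The names `writer_loop` and `record_frames` are hypothetical.

```python
import queue
import tempfile
import threading
from pathlib import Path


def writer_loop(frame_queue):
    """Pop (path, payload) items and write them to disk until a None sentinel arrives."""
    while True:
        item = frame_queue.get()  # blocks until an item is available
        if item is None:
            break
        path, payload = item
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_bytes(payload)


def record_frames(out_dir, num_frames):
    """Simulate a control loop that hands frames to a background writer."""
    frame_queue = queue.Queue()
    writer = threading.Thread(target=writer_loop, args=(frame_queue,))
    writer.start()
    for frame_index in range(num_frames):
        fake_png = bytes([frame_index % 256]) * 16  # stand-in for encoded image bytes
        frame_queue.put((Path(out_dir) / f"frame_{frame_index:06d}.png", fake_png))
    frame_queue.put(None)  # signal the writer to stop
    writer.join()          # wait until every queued frame is on disk
    return sorted(p.name for p in Path(out_dir).iterdir())


with tempfile.TemporaryDirectory() as tmp:
    print(record_frames(tmp, 3))
```

Moving the writer into its own process follows the same shape with a `multiprocessing.Queue` and a `multiprocessing.Process`. Whether that helps depends on how much of the time goes to image encoding rather than disk I/O.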
## More
Follow this [previous tutorial](https://github.com/huggingface/lerobot/blob/main/examples/7_get_started_with_real_robot.md#4-train-a-policy-on-your-data) for a more in-depth explanation.
If you have any questions or need help, please reach out on Discord in the channel `#aloha-arm`.

@@ -195,8 +195,8 @@ available_policies = [
# lists all available robots from `lerobot/common/robot_devices/robots`
available_robots = [
"koch",
"koch_bimanual",
# "koch",
# "koch_bimanual",
"aloha",
]

@@ -40,7 +40,7 @@ def make_env(cfg: DictConfig, n_envs: int | None = None) -> gym.vector.VectorEnv
)
raise e
gym_handle = f"{package_name}/{cfg.env.task}" if cfg.env.get('handle') is None else cfg.env.handle
gym_handle = f"{package_name}/{cfg.env.task}"
gym_kwgs = dict(cfg.env.get("gym", {}))
if cfg.env.get("episode_length"):

@@ -18,11 +18,6 @@ import numpy as np
import torch
from torch import Tensor
##############################################
### TODO this script is modified to hackathon purposes and should be reset after.
##############################################
PIXELS_KEY="image_front"
def preprocess_observation(observations: dict[str, np.ndarray]) -> dict[str, Tensor]:
"""Convert environment observation to LeRobot format observation.
@@ -33,24 +28,28 @@ def preprocess_observation(observations: dict[str, np.ndarray]) -> dict[str, Ten
"""
# map to expected inputs for the policy
return_observations = {}
#if PIXELS_KEY in observations:
# if isinstance(observations[PIXELS_KEY], dict):
# imgs = {f"observation.images.{key}": img for key, img in observations["pixels"].items()}
# else:
# imgs = {"observation.image": observations["pixels"]}
imgs = {"observation.images.image_front": observations["image_front"]}
for imgkey, img in imgs.items():
img = torch.from_numpy(img)
# sanity check that images are channel last
_, h, w, c = img.shape
assert c < h and c < w, f"expect channel last images, but instead got {img.shape=}"
# sanity check that images are uint8
assert img.dtype == torch.uint8, f"expect torch.uint8, but instead {img.dtype=}"
# convert to channel first of type float32 in range [0,1]
img = einops.rearrange(img, "b h w c -> b c h w").contiguous()
img = img.type(torch.float32)
img /= 255
return_observations[imgkey] = img
if "pixels" in observations:
if isinstance(observations["pixels"], dict):
imgs = {f"observation.images.{key}": img for key, img in observations["pixels"].items()}
else:
imgs = {"observation.image": observations["pixels"]}
for imgkey, img in imgs.items():
img = torch.from_numpy(img)
# sanity check that images are channel last
_, h, w, c = img.shape
assert c < h and c < w, f"expect channel last images, but instead got {img.shape=}"
# sanity check that images are uint8
assert img.dtype == torch.uint8, f"expect torch.uint8, but instead {img.dtype=}"
# convert to channel first of type float32 in range [0,1]
img = einops.rearrange(img, "b h w c -> b c h w").contiguous()
img = img.type(torch.float32)
img /= 255
return_observations[imgkey] = img
if "environment_state" in observations:
return_observations["observation.environment_state"] = torch.from_numpy(
@@ -59,5 +58,5 @@ def preprocess_observation(observations: dict[str, np.ndarray]) -> dict[str, Ten
# TODO(rcadene): enable pixels only baseline with `obs_type="pixels"` in environment by removing
# requirement for "agent_pos"
return_observations["observation.state"] = torch.from_numpy(observations["arm_qpos"]).float()
return_observations["observation.state"] = torch.from_numpy(observations["agent_pos"]).float()
return return_observations
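
The image handling in the hunk above (channel-last `uint8` in, channel-first `float32` in `[0, 1]` out) can be exercised in isolation. This is a minimal sketch that uses NumPy as a stand-in for the `torch` and `einops` calls in the diff. The function name `to_channel_first_float` is hypothetical.

```python
import numpy as np


def to_channel_first_float(img):
    """Convert a (batch, height, width, channel) uint8 array to
    (batch, channel, height, width) float32 values in [0, 1]."""
    _, h, w, c = img.shape
    # Same sanity checks as in the diff: channel-last layout and uint8 pixels
    assert c < h and c < w, f"expect channel last images, but instead got {img.shape=}"
    assert img.dtype == np.uint8, f"expect uint8, but instead {img.dtype=}"
    out = np.ascontiguousarray(img.transpose(0, 3, 1, 2)).astype(np.float32)
    out /= 255
    return out


batch = np.full((2, 4, 5, 3), 255, dtype=np.uint8)
converted = to_channel_first_float(batch)
print(converted.shape, converted.dtype, converted.max())
```

The `c < h and c < w` check is a heuristic: it catches channel-first input for typical image sizes, but it would also reject images smaller than four pixels on a side.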

@@ -137,8 +137,6 @@ class TDMPCPolicy(
if self._use_image:
batch = dict(batch) # shallow copy so that adding a key doesn't modify the original
batch["observation.image"] = batch[self.input_image_key]
#TODO michel_aractingi temp fix to remove before merge
del batch[self.input_image_key]
self._queues = populate_queues(self._queues, batch)

@@ -156,7 +156,7 @@ def save_images_from_cameras(
executor.submit(
save_image,
image,
camera.camera_index,
camera.index,
frame_index,
images_dir,
)

@@ -364,7 +364,6 @@ class ManipulatorRobot:
for name in self.follower_arms:
print(f"Connecting {name} follower arm.")
self.follower_arms[name].connect()
for name in self.leader_arms:
print(f"Connecting {name} leader arm.")
self.leader_arms[name].connect()

@@ -5,6 +5,6 @@ fps: 30
env:
name: real_world
task: null
state_dim: 18
action_dim: 18
state_dim: 14
action_dim: 14
fps: ${fps}

@@ -10,7 +10,7 @@ max_relative_target: null
leader_arms:
main:
_target_: lerobot.common.robot_devices.motors.dynamixel.DynamixelMotorsBus
port: /dev/tty.usbmodem58760430441
port: /dev/tty.usbmodem575E0031751
motors:
# name: (index, model)
shoulder_pan: [1, "xl330-m077"]

@@ -164,9 +164,9 @@ def say(text, blocking=False):
os.system(cmd)
def save_image(img_tensor, key, frame_index, episode_index, videos_dir: str):
def save_image(img_tensor, key, frame_index, episode_index, videos_dir):
img = Image.fromarray(img_tensor.numpy())
path = Path(videos_dir) / f"{key}_episode_{episode_index:06d}" / f"frame_{frame_index:06d}.png"
path = videos_dir / f"{key}_episode_{episode_index:06d}" / f"frame_{frame_index:06d}.png"
path.parent.mkdir(parents=True, exist_ok=True)
img.save(str(path), quality=100)
@@ -240,6 +240,48 @@ def is_headless():
return True
def loop_to_save_frame_in_threads(frame_queue, num_image_writers):
with concurrent.futures.ThreadPoolExecutor(max_workers=num_image_writers) as executor:
futures = []
while True:
# Blocks until a frame is available
frame_data = frame_queue.get()
# Exit if we send None to stop the worker
if frame_data is None:
# Wait for all submitted futures to complete before exiting
for _ in tqdm.tqdm(
concurrent.futures.as_completed(futures), total=len(futures), desc="Writting images"
):
pass
break
frame, key, frame_index, episode_index, videos_dir = frame_data
futures.append(executor.submit(save_image, frame, key, frame_index, episode_index, videos_dir))
def start_frame_workers(frame_queue, num_image_writers, num_workers=1):
workers = []
for _ in range(num_workers):
worker = multiprocessing.Process(
target=loop_to_save_frame_in_threads,
args=(frame_queue, num_image_writers),
)
worker.start()
workers.append(worker)
return workers
def stop_workers(workers, frame_queue):
# Send None to each process to signal it to stop
for _ in workers:
frame_queue.put(None)
# Wait for all processes to terminate
for process in workers:
process.join()
def has_method(_object: object, method_name: str):
return hasattr(_object, method_name) and callable(getattr(_object, method_name))
@@ -256,129 +298,6 @@ def get_available_arms(robot):
return available_arms
########################################################################################
# Asynchronous saving of images on disk
########################################################################################
def loop_to_save_images_in_threads(image_queue, num_threads):
if num_threads < 1:
raise NotImplementedError(f"Only `num_threads>=1` is supported for now, but {num_threads=} given.")
with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor:
futures = []
while True:
# Blocks until a frame is available
frame_data = image_queue.get()
# As usually done, exit loop when receiving None to stop the worker
if frame_data is None:
break
image, key, frame_index, episode_index, videos_dir = frame_data
futures.append(executor.submit(save_image, image, key, frame_index, episode_index, videos_dir))
# Before exiting function, wait for all threads to complete
with tqdm.tqdm(total=len(futures), desc="Writing images") as progress_bar:
concurrent.futures.wait(futures)
progress_bar.update(len(futures))
def start_image_writer_processes(image_queue, num_processes, num_threads_per_process):
if num_processes < 1:
raise ValueError(f"Only `num_processes>=1` is supported, but {num_processes=} given.")
if num_threads_per_process < 1:
raise NotImplementedError(
"Only `num_threads_per_process>=1` is supported for now, but {num_threads_per_process=} given."
)
processes = []
for _ in range(num_processes):
process = multiprocessing.Process(
target=loop_to_save_images_in_threads,
args=(image_queue, num_threads_per_process),
)
process.start()
processes.append(process)
return processes
def stop_processes(processes, queue, timeout):
# Send None to each process to signal them to stop
for _ in processes:
queue.put(None)
# Close the queue, no more items can be put in the queue
queue.close()
# Wait maximum 20 seconds for all processes to terminate
for process in processes:
process.join(timeout=timeout)
# If not terminated after 20 seconds, force termination
if process.is_alive():
process.terminate()
# Ensure all background queue threads have finished
queue.join_thread()
def start_image_writer(num_processes, num_threads):
"""This function abstract away the initialisation of processes or/and threads to
save images on disk asynchrounously, which is critical to control a robot and record data
at a high frame rate.
When `num_processes=0`, it returns a dictionary containing a threads pool of size `num_threads`.
When `num_processes>0`, it returns a dictionary containing a processes pool of size `num_processes`,
where each subprocess starts their own threads pool of size `num_threads`.
The optimal number of processes and threads depends on your computer capabilities.
We advise to use 4 threads per camera with 0 processes. If the fps is not stable, try to increase or lower
the number of threads. If it is still not stable, try to use 1 subprocess, or more.
"""
image_writer = {}
if num_processes == 0:
futures = []
threads_pool = concurrent.futures.ThreadPoolExecutor(max_workers=num_threads)
image_writer["threads_pool"], image_writer["futures"] = threads_pool, futures
else:
# TODO(rcadene): When using num_processes>1, `multiprocessing.Manager().Queue()`
# might be better than `multiprocessing.Queue()`. Source: https://www.geeksforgeeks.org/python-multiprocessing-queue-vs-multiprocessing-manager-queue
image_queue = multiprocessing.Queue()
processes_pool = start_image_writer_processes(
image_queue, num_processes=num_processes, num_threads_per_process=num_threads
)
image_writer["processes_pool"], image_writer["image_queue"] = processes_pool, image_queue
return image_writer
def async_save_image(image_writer, image, key, frame_index, episode_index, videos_dir):
"""This function abstract away the saving of an image on disk asynchrounously. It uses a dictionary
called image writer which contains either a pool of processes or a pool of threads.
"""
if "threads_pool" in image_writer:
threads_pool, futures = image_writer["threads_pool"], image_writer["futures"]
futures.append(threads_pool.submit(save_image, image, key, frame_index, episode_index, videos_dir))
else:
image_queue = image_writer["image_queue"]
image_queue.put((image, key, frame_index, episode_index, videos_dir))
def stop_image_writer(image_writer, timeout):
if "threads_pool" in image_writer:
futures = image_writer["futures"]
# Before exiting function, wait for all threads to complete
with tqdm.tqdm(total=len(futures), desc="Writing images") as progress_bar:
concurrent.futures.wait(futures, timeout=timeout)
progress_bar.update(len(futures))
else:
processes_pool, image_queue = image_writer["processes_pool"], image_writer["image_queue"]
stop_processes(processes_pool, image_queue, timeout=timeout)
########################################################################################
# Control modes
########################################################################################
@@ -466,11 +385,9 @@ def record(
run_compute_stats=True,
push_to_hub=True,
tags=None,
num_image_writer_processes=0,
num_image_writer_threads_per_camera=4,
num_image_writers_per_camera=4,
force_override=False,
display_cameras=True,
play_sounds=True,
):
# TODO(rcadene): Add option to record logs
# TODO(rcadene): Clean this function via decomposition in higher level functions
@@ -562,8 +479,7 @@ def record(
while timestamp < warmup_time_s:
if not is_warmup_print:
logging.info("Warming up (no data recording)")
if play_sounds:
say("Warming up")
say("Warming up")
is_warmup_print = True
start_loop_t = time.perf_counter()
@@ -590,22 +506,19 @@ def record(
if has_method(robot, "teleop_safety_stop"):
robot.teleop_safety_stop()
has_camera = len(robot.cameras) > 0
if has_camera:
# Initialize processes or/and threads dedicated to save images on disk asynchronously,
# which is critical to control a robot and record data at a high frame rate.
image_writer = start_image_writer(
num_processes=num_image_writer_processes,
num_threads=num_image_writer_threads_per_camera * len(robot.cameras),
)
# Save images using threads to reach high fps (30 and more)
# Using `with` to exit smoothly if an exception is raised.
num_image_writers = num_image_writers_per_camera * len(robot.cameras)
num_image_writers = max(num_image_writers, 1)
frame_queue = multiprocessing.Queue()
frame_workers = start_frame_workers(frame_queue, num_image_writers)
# Using `try` to exit smoothly if an exception is raised
try:
# Start recording all episodes
while episode_index < num_episodes:
logging.info(f"Recording episode {episode_index}")
if play_sounds:
say(f"Recording episode {episode_index}")
say(f"Recording episode {episode_index}")
ep_dict = {}
frame_index = 0
timestamp = 0
@@ -621,16 +534,8 @@ def record(
image_keys = [key for key in observation if "image" in key]
not_image_keys = [key for key in observation if "image" not in key]
if has_camera > 0:
for key in image_keys:
async_save_image(
image_writer,
image=observation[key],
key=key,
frame_index=frame_index,
episode_index=episode_index,
videos_dir=str(videos_dir),
)
for key in image_keys:
frame_queue.put((observation[key], key, frame_index, episode_index, videos_dir))
if display_cameras and not is_headless():
image_keys = [key for key in observation if "image" in key]
@@ -700,8 +605,7 @@ def record(
if not stop_recording:
# Start resetting env while the executor are finishing
logging.info("Reset the environment")
if play_sounds:
say("Reset the environment")
say("Reset the environment")
timestamp = 0
start_vencod_t = time.perf_counter()
@@ -773,23 +677,18 @@ def record(
if is_last_episode:
logging.info("Done recording")
if play_sounds:
say("Done recording", blocking=True)
say("Done recording", blocking=True)
if not is_headless():
listener.stop()
if has_camera > 0:
logging.info("Waiting for image writer to terminate...")
stop_image_writer(image_writer, timeout=20)
logging.info("Waiting for threads writing the images on disk to terminate...")
stop_workers(frame_workers, frame_queue)
except Exception as e:
if has_camera > 0:
logging.info("Waiting for image writer to terminate...")
stop_image_writer(image_writer, timeout=20)
raise e
except Exception:
traceback.print_exc()
stop_workers(frame_workers, frame_queue)
robot.disconnect()
if display_cameras and not is_headless():
cv2.destroyAllWindows()
@@ -797,8 +696,7 @@ def record(
if video:
logging.info("Encoding videos")
if play_sounds:
say("Encoding videos")
say("Encoding videos")
# Use ffmpeg to convert frames stored as png into mp4 videos
for episode_index in tqdm.tqdm(range(num_episodes)):
for key in image_keys:
@@ -843,8 +741,7 @@ def record(
)
if run_compute_stats:
logging.info("Computing dataset statistics")
if play_sounds:
say("Computing dataset statistics")
say("Computing dataset statistics")
stats = compute_stats(lerobot_dataset)
lerobot_dataset.stats = stats
else:
@@ -866,14 +763,11 @@ def record(
create_branch(repo_id, repo_type="dataset", branch=CODEBASE_VERSION)
logging.info("Exiting")
if play_sounds:
say("Exiting")
say("Exiting")
return lerobot_dataset
def replay(
robot: Robot, episode: int, fps: int | None = None, root="data", repo_id="lerobot/debug", play_sounds=True
):
def replay(robot: Robot, episode: int, fps: int | None = None, root="data", repo_id="lerobot/debug"):
# TODO(rcadene): Add option to record logs
local_dir = Path(root) / repo_id
if not local_dir.exists():
@@ -888,8 +782,7 @@ def replay(
robot.connect()
logging.info("Replaying episode")
if play_sounds:
say("Replaying episode", blocking=True)
say("Replaying episode", blocking=True)
for idx in range(from_idx, to_idx):
start_episode_t = time.perf_counter()
@@ -989,23 +882,12 @@ if __name__ == "__main__":
help="Add tags to your dataset on the hub.",
)
parser_record.add_argument(
"--num-image-writer-processes",
type=int,
default=0,
help=(
"Number of subprocesses handling the saving of frames as PNGs. Set to 0 to use threads only; "
"set to ≥1 to use subprocesses, each using threads to write images. The best number of processes "
"and threads depends on your system. We recommend 4 threads per camera with 0 processes. "
"If fps is unstable, adjust the thread count. If still unstable, try using 1 or more subprocesses."
),
)
parser_record.add_argument(
"--num-image-writer-threads-per-camera",
"--num-image-writers-per-camera",
type=int,
default=4,
help=(
"Number of threads writing the frames as png images on disk, per camera. "
"Too many threads might cause unstable teleoperation fps due to main thread being blocked. "
"Too much threads might cause unstable teleoperation fps due to main thread being blocked. "
"Not enough threads might cause low camera fps."
),
)
@@ -1056,7 +938,6 @@ if __name__ == "__main__":
control_mode = args.mode
robot_path = args.robot_path
robot_overrides = args.robot_overrides
kwargs = vars(args)
del kwargs["mode"]
del kwargs["robot_path"]
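The `--num-image-writers-per-camera` option above sizes a thread pool that writes frames to disk while the control loop keeps running. The following is a minimal sketch of that pattern, not the script's actual implementation: `save_frame` and `write_frames` are hypothetical names, and raw bytes stand in for PNG encoding so the example needs only the standard library.

```python
import concurrent.futures
import tempfile
from pathlib import Path


def save_frame(data: bytes, key: str, frame_index: int, out_dir: Path) -> Path:
    """Write one frame to <out_dir>/<key>/frame_XXXXXX.bin (stand-in for a PNG write)."""
    path = out_dir / key / f"frame_{frame_index:06d}.bin"
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_bytes(data)
    return path


def write_frames(frames_per_camera: dict, out_dir: Path, num_writers_per_camera: int = 4) -> list:
    """Submit every frame to a thread pool sized per camera, then wait for all writes."""
    # Assumption: one group of writer threads per camera, with at least one thread overall.
    num_writers = max(num_writers_per_camera * len(frames_per_camera), 1)
    futures = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=num_writers) as executor:
        for key, frames in frames_per_camera.items():
            for frame_index, data in enumerate(frames):
                futures.append(executor.submit(save_frame, data, key, frame_index, out_dir))
        # as_completed yields each future as soon as its write has finished.
        return [future.result() for future in concurrent.futures.as_completed(futures)]


if __name__ == "__main__":
    out_dir = Path(tempfile.mkdtemp())
    written = write_frames({"cam_a": [b"x"] * 3, "cam_b": [b"y"] * 3}, out_dir)
    print(f"wrote {len(written)} frames to {out_dir}")
```

Submitting writes as futures keeps the producer loop from blocking on disk I/O; the final `as_completed` pass only waits for whatever is still pending.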

View File

@@ -1,857 +0,0 @@
"""
Utilities to control a robot in simulation.
Useful to record a dataset, replay a recorded episode and record an evaluation dataset.
Examples of usage:
- Unlimited teleoperation at a limited frequency of 30 Hz, to simulate data recording frequency.
You can modify this value depending on how fast your simulation can run:
```bash
python lerobot/scripts/control_sim_robot.py teleoperate \
--fps 30 \
--robot-path lerobot/configs/robot/your_robot_config.yaml \
--sim-config lerobot/configs/env/your_sim_config.yaml
```
- Record one episode in order to test replay:
```bash
python lerobot/scripts/control_sim_robot.py record \
--robot-path lerobot/configs/robot/your_robot_config.yaml \
--sim-config lerobot/configs/env/your_sim_config.yaml \
--fps 30 \
--root tmp/data \
--repo-id $USER/robot_sim_test \
--num-episodes 1 \
--run-compute-stats 0
```
- Visualize dataset:
```bash
python lerobot/scripts/visualize_dataset.py \
--root tmp/data \
--repo-id $USER/robot_sim_test \
--episode-index 0
```
- Replay this test episode:
```bash
python lerobot/scripts/control_sim_robot.py replay \
--sim-config lerobot/configs/env/your_sim_config.yaml \
--fps 30 \
--root tmp/data \
--repo-id $USER/robot_sim_test \
--episodes 0
```
- Record a full dataset in order to train a policy,
30 seconds of recording for each episode, and 10 seconds to reset the environment in between episodes:
```bash
python lerobot/scripts/control_sim_robot.py record \
--robot-path lerobot/configs/robot/your_robot_config.yaml \
--sim-config lerobot/configs/env/your_sim_config.yaml \
--fps 30 \
--root data \
--repo-id $USER/robot_sim_test \
--num-episodes 50 \
--episode-time-s 30 \
--reset-time-s 10
```
**NOTE**: You can use your keyboard to control data recording flow.
- Tap right arrow key '->' to early exit while recording an episode and go to resetting the environment.
- Tap right arrow key '->' to early exit while resetting the environment and go to recording the next episode.
- Tap left arrow key '<-' to early exit and re-record the current episode.
- Tap escape key 'esc' to stop the data recording.
This might require a sudo permission to allow your terminal to monitor keyboard events.
**NOTE**: You can resume data recording by running the same recording command again.
To start from scratch instead of resuming, pass `--force-override 1`, which deletes the local dataset directory first.
"""
import argparse
import concurrent.futures
import json
import logging
import multiprocessing.process
import os
import platform
import shutil
import time
import traceback
from functools import cache
from pathlib import Path
import gymnasium as gym
import multiprocessing
from contextlib import nullcontext
import importlib
import cv2
import torch
import numpy as np
import tqdm
from omegaconf import DictConfig
from PIL import Image
from datasets import Dataset, Features, Sequence, Value
# from safetensors.torch import load_file, save_file
from lerobot.common.datasets.compute_stats import compute_stats
from lerobot.common.datasets.lerobot_dataset import CODEBASE_VERSION, LeRobotDataset
from lerobot.common.datasets.video_utils import VideoFrame, encode_video_frames
from lerobot.common.datasets.push_dataset_to_hub.utils import concatenate_episodes, get_default_encoding
from lerobot.common.datasets.utils import calculate_episode_data_index, create_branch, hf_transform_to_torch
from lerobot.common.utils.utils import get_safe_torch_device, init_hydra_config, init_logging, set_global_seed
from lerobot.common.datasets.video_utils import encode_video_frames
from lerobot.common.robot_devices.robots.factory import make_robot
from lerobot.common.policies.factory import make_policy
from lerobot.common.robot_devices.robots.utils import Robot
from lerobot.common.robot_devices.utils import busy_wait
from lerobot.common.envs.factory import make_env
from lerobot.common.utils.utils import init_hydra_config, init_logging
from lerobot.scripts.eval import get_pretrained_policy_path
from lerobot.scripts.push_dataset_to_hub import (
push_dataset_card_to_hub,
push_meta_data_to_hub,
push_videos_to_hub,
save_meta_data,
)
########################################################################################
# Utilities
########################################################################################
def say(text, blocking=False):
# Check if mac, linux, or windows.
if platform.system() == "Darwin":
cmd = f'say "{text}"'
elif platform.system() == "Linux":
cmd = f'spd-say "{text}"'
elif platform.system() == "Windows":
cmd = (
'PowerShell -Command "Add-Type -AssemblyName System.Speech; '
f"(New-Object System.Speech.Synthesis.SpeechSynthesizer).Speak('{text}')\""
)
if not blocking and platform.system() in ["Darwin", "Linux"]:
# TODO(rcadene): Make it work for Windows
# Use the ampersand to run command in the background
cmd += " &"
os.system(cmd)
def save_image(img_arr, key, frame_index, episode_index, videos_dir):
img = Image.fromarray(img_arr)
path = videos_dir / f"{key}_episode_{episode_index:06d}" / f"frame_{frame_index:06d}.png"
path.parent.mkdir(parents=True, exist_ok=True)
img.save(str(path), quality=100)
def show_image_observations(observation_queue:multiprocessing.Queue):
keys = None
while True:
observations = observation_queue.get()
images = []
if keys is None: keys = [k for k in observations if 'image' in k]
for key in keys:
images.append(observations[key])#.squeeze(0))
cat_image = np.concatenate(images, 1)
cv2.imshow('observations', cv2.cvtColor(cat_image, cv2.COLOR_RGB2BGR))
cv2.waitKey(1)
def none_or_int(value):
if value == "None":
return None
return int(value)
@cache
def is_headless():
"""Detects if python is running without a monitor."""
try:
import pynput # noqa
return False
except Exception:
print(
"Error trying to import pynput. Switching to headless mode. "
"As a result, the video stream from the cameras won't be shown, "
"and you won't be able to change the control flow with keyboards. "
"For more info, see traceback below.\n"
)
traceback.print_exc()
print()
return True
def get_action_from_policy(policy, observation, device, use_amp=False):
with (
torch.inference_mode(),
torch.autocast(device_type=device.type)
if device.type == "cuda" and use_amp
else nullcontext(),
):
# Convert to pytorch format: channel first and float32 in [0,1] with batch dimension
for name in observation:
if "image" in name:
observation[name] = observation[name].type(torch.float32) / 255
observation[name] = observation[name].permute(2, 0, 1).contiguous()
observation[name] = observation[name].unsqueeze(0)
observation[name] = observation[name].to(device)
# Compute the next action with the policy
# based on the current observation
action = policy.select_action(observation)
# Remove batch dimension
action = action.squeeze(0)
# Move to cpu, if not already the case
return action.to("cpu")
def init_read_leader(robot, fps, **kwargs):
axis_directions = kwargs.get('axis_directions', [1])
offsets = kwargs.get('offsets', [0])
command_queue = multiprocessing.Queue(1000)
read_leader = multiprocessing.Process(target=read_commands_from_leader, args=(robot, command_queue, fps, axis_directions, offsets))
return read_leader, command_queue
def read_commands_from_leader(robot: Robot, queue: multiprocessing.Queue, fps: int, axis_directions: list, offsets: list, stop_flag=None):
if not robot.is_connected:
robot.connect()
# Constants needed to transform the real robot's joint positions into the sim's convention,
# which depends on the robot description used in that sim.
start_pos = np.array(robot.leader_arms.main.calibration['start_pos'])
axis_directions = np.array(axis_directions)
offsets = np.array(offsets) * np.pi
counts_to_radians = 2.0 * np.pi / 4096
if stop_flag is None:
stop_flag = multiprocessing.Value('b', False)
#TODO(michel_aractingi): temp fix to disable calibration while reading from the leader arms
# different calculation for joint commands would be needed
robot.leader_arms.main.calibration = None
while True:
#with stop_flag.get_lock():
# stop_flag_value = stop_flag.value
start_loop_t = time.perf_counter()
#if not stop_flag_value:
real_positions = np.array(robot.leader_arms.main.read('Present_Position'))
joint_commands = axis_directions * (real_positions - start_pos) * counts_to_radians + offsets
queue.put(joint_commands)
if fps is not None:
dt_s = time.perf_counter() - start_loop_t
busy_wait(1 / fps - dt_s)
#else:
#queue.get() #TODO (michel_aractingi): remove elements from queue in case get_lock is delayed
#print('here!!!')
#busy_wait(0.01)
def create_rl_hf_dataset(data_dict):
features = {}
keys = [key for key in data_dict if "observation.images." in key]
for key in keys:
features[key] = VideoFrame()
features["observation.state"] = Sequence(
length=data_dict["observation.state"].shape[1], feature=Value(dtype="float32", id=None)
)
if "observation.velocity" in data_dict:
features["observation.velocity"] = Sequence(
length=data_dict["observation.velocity"].shape[1], feature=Value(dtype="float32", id=None)
)
if "observation.effort" in data_dict:
features["observation.effort"] = Sequence(
length=data_dict["observation.effort"].shape[1], feature=Value(dtype="float32", id=None)
)
features["action"] = Sequence(
length=data_dict["action"].shape[1], feature=Value(dtype="float32", id=None)
)
features["next.reward"] = Value(dtype="float32", id=None)
features["seed"] = Value(dtype="int64", id=None)
features["next.success"] = Value(dtype="bool", id=None)
features["episode_index"] = Value(dtype="int64", id=None)
features["frame_index"] = Value(dtype="int64", id=None)
features["timestamp"] = Value(dtype="float32", id=None)
features["next.done"] = Value(dtype="bool", id=None)
features["index"] = Value(dtype="int64", id=None)
hf_dataset = Dataset.from_dict(data_dict, features=Features(features))
hf_dataset.set_transform(hf_transform_to_torch)
return hf_dataset
########################################################################################
# Control modes
########################################################################################
def teleoperate(env, robot: Robot, teleop_time_s=None, **kwargs):
env = env()
env.reset()
read_leader, command_queue = init_read_leader(robot, **kwargs)
start_teleop_t = time.perf_counter()
read_leader.start()
while True:
action = command_queue.get()
env.step(np.expand_dims(action, 0))
if teleop_time_s is not None and time.perf_counter() - start_teleop_t > teleop_time_s:
read_leader.terminate()
command_queue.close()
print("Teleoperation processes finished.")
break
def record(
env,
robot: Robot,
policy: torch.nn.Module | None = None,
policy_cfg: DictConfig | None = None,
fps: int | None = None,
root="data",
repo_id="lerobot/debug",
episode_time_s=30,
num_episodes=50,
video=True,
run_compute_stats=True,
push_to_hub=True,
tags=None,
num_image_writers_per_camera=4,
force_override=False,
visualize_images=0,
**kwargs
):
local_dir = Path(root) / repo_id
if local_dir.exists() and force_override:
shutil.rmtree(local_dir)
episodes_dir = local_dir / "episodes"
episodes_dir.mkdir(parents=True, exist_ok=True)
videos_dir = local_dir / "videos"
videos_dir.mkdir(parents=True, exist_ok=True)
# Logic to resume data recording
rec_info_path = episodes_dir / "data_recording_info.json"
if rec_info_path.exists():
with open(rec_info_path) as f:
rec_info = json.load(f)
episode_index = rec_info["last_episode_index"] + 1
else:
episode_index = 0
if is_headless():
logging.warning(
"Headless environment detected. On-screen cameras display and keyboard inputs will not be available."
)
# Allow to exit early while recording an episode or resetting the environment,
# by tapping the right arrow key '->'. This might require a sudo permission
# to allow your terminal to monitor keyboard events.
exit_early = False
rerecord_episode = False
stop_recording = False
# Only import pynput if not in a headless environment
if not is_headless():
from pynput import keyboard
def on_press(key):
nonlocal exit_early, rerecord_episode, stop_recording
try:
if key == keyboard.Key.right:
print("Right arrow key pressed. Exiting loop...")
exit_early = True
elif key == keyboard.Key.left:
print("Left arrow key pressed. Exiting loop and rerecord the last episode...")
rerecord_episode = True
exit_early = True
elif key == keyboard.Key.esc:
print("Escape key pressed. Stopping data recording...")
stop_recording = True
exit_early = True
except Exception as e:
print(f"Error handling key press: {e}")
listener = keyboard.Listener(on_press=on_press)
listener.start()
# create env
env = env()
# Save images using threads to reach high fps (30 and more).
# Use `with` to exit cleanly if an exception is raised.
futures = []
num_image_writers = num_image_writers_per_camera * 2 ###############
num_image_writers = max(num_image_writers, 1)
# Load policy if any
if policy is not None:
# Check device is available
device = get_safe_torch_device(policy_cfg.device, log=True)
policy.eval()
policy.to(device)
torch.backends.cudnn.benchmark = True
torch.backends.cuda.matmul.allow_tf32 = True
set_global_seed(policy_cfg.seed)
# override fps using policy fps
fps = policy_cfg.env.fps
else:
read_leader, command_queue = init_read_leader(robot, fps, **kwargs)
if not is_headless() and visualize_images:
observations_queue = multiprocessing.Queue(1000)
show_images = multiprocessing.Process(target=show_image_observations, args=(observations_queue, ))
show_images.start()
state_keys_dict = env_cfg.state_keys
image_keys = env_cfg.image_keys
with concurrent.futures.ThreadPoolExecutor(max_workers=num_image_writers) as executor:
# Start recording all episodes
# start reading from leader, disable stop flag in leader process
while episode_index < num_episodes:
logging.info(f"Recording episode {episode_index}")
say(f"Recording episode {episode_index}")
ep_dict = {'action':[], 'next.reward':[], 'next.success':[]}
for k in state_keys_dict:
ep_dict[k] = []
frame_index = 0
timestamp = 0
start_episode_t = time.perf_counter()
# save seed so we can restore the environment state when we want to replay the trajectories
seed = np.random.randint(0,1e5)
observation, info = env.reset(seed=seed)
#with stop_reading_leader.get_lock():
#stop_reading_leader.Value = 0
if policy is None:
read_leader.start()
while timestamp < episode_time_s:
if policy is None:
action = command_queue.get()
else:
action = get_action_from_policy(policy, observation, device)
for key in image_keys:
str_key = key if key.startswith('observation.images.') else 'observation.images.' + key
futures += [
executor.submit(
save_image, observation[key], str_key, frame_index, episode_index, videos_dir)
]
if not is_headless() and visualize_images:
observations_queue.put(observation)
for key, obs_key in state_keys_dict.items():
ep_dict[key].append(torch.from_numpy(observation[obs_key]))
# Advance the sim environment
if len(action.shape) == 1:
action = np.expand_dims(action, 0)
observation, reward, terminated, _ , info = env.step(action)
success = info.get('is_success', False)
ep_dict['action'].append(torch.from_numpy(action))
ep_dict['next.reward'].append(torch.tensor(reward))
ep_dict['next.success'].append(torch.tensor(success))
frame_index += 1
timestamp = time.perf_counter() - start_episode_t
if exit_early or terminated:
exit_early = False
break
# enable stop reading leader flag
#with stop_reading_leader.get_lock():
#stop_reading_leader.Value = 1
# TODO (michel_aractinig): temp fix until I figure out the problem with shared memory
# stop_reading_leader is blocking
if policy is None:
command_queue.close()
read_leader.terminate()
read_leader, command_queue = init_read_leader(robot, fps, **kwargs)
timestamp = 0
# During env reset we save the data and encode the videos
num_frames = frame_index
for key in image_keys:
if not key.startswith('observation.images.'):
key = 'observation.images.' + key
if video:
tmp_imgs_dir = videos_dir / f"{key}_episode_{episode_index:06d}"
fname = f"{key}_episode_{episode_index:06d}.mp4"
video_path = local_dir / "videos" / fname
if video_path.exists():
video_path.unlink()
# Store the reference to the video frame, even though the videos are not yet encoded
ep_dict[key] = []
for i in range(num_frames):
ep_dict[key].append({"path": f"videos/{fname}", "timestamp": i / fps})
else:
imgs_dir = videos_dir / f"{key}_episode_{episode_index:06d}"
ep_dict[key] = []
for i in range(num_frames):
img_path = imgs_dir / f"frame_{i:06d}.png"
ep_dict[key].append({"path": str(img_path)})
for key in state_keys_dict:
ep_dict[key] = torch.vstack(ep_dict[key]) * 180.0 / np.pi
ep_dict['action'] = torch.vstack(ep_dict['action']) * 180.0 / np.pi
ep_dict['next.reward'] = torch.stack(ep_dict['next.reward'])
ep_dict['next.success'] = torch.stack(ep_dict['next.success'])
ep_dict["seed"] = torch.tensor([seed] * num_frames)
ep_dict["episode_index"] = torch.tensor([episode_index] * num_frames)
ep_dict["frame_index"] = torch.arange(0, num_frames, 1)
ep_dict["timestamp"] = torch.arange(0, num_frames, 1) / fps
done = torch.zeros(num_frames, dtype=torch.bool)
done[-1] = True
ep_dict["next.done"] = done
ep_path = episodes_dir / f"episode_{episode_index}.pth"
print("Saving episode dictionary...")
torch.save(ep_dict, ep_path)
rec_info = {
"last_episode_index": episode_index,
}
with open(rec_info_path, "w") as f:
json.dump(rec_info, f)
is_last_episode = stop_recording or (episode_index == (num_episodes - 1))
# Skip updating episode index which forces re-recording episode
if rerecord_episode:
rerecord_episode = False
continue
episode_index += 1
if is_last_episode:
logging.info("Done recording")
say("Done recording", blocking=True)
logging.info("Waiting for threads writing the images on disk to terminate...")
for _ in tqdm.tqdm(
concurrent.futures.as_completed(futures), total=len(futures), desc="Writing images"
):
pass
if not is_headless() and visualize_images:
show_images.terminate()
observations_queue.close()
break
else:
print('Waiting for two seconds before starting the next recording session.....')
busy_wait(2)
num_episodes = episode_index
if video:
logging.info("Encoding videos")
say("Encoding videos")
# Use ffmpeg to convert frames stored as png into mp4 videos
for episode_index in tqdm.tqdm(range(num_episodes)):
for key in image_keys:
if not key.startswith('observation.images.'):
key = 'observation.images.' + key
tmp_imgs_dir = videos_dir / f"{key}_episode_{episode_index:06d}"
fname = f"{key}_episode_{episode_index:06d}.mp4"
video_path = local_dir / "videos" / fname
if video_path.exists():
# Skip if video is already encoded. Could be the case when resuming data recording.
continue
# note: `encode_video_frames` is a blocking call. Making it asynchronous shouldn't speedup encoding,
# since video encoding with ffmpeg is already using multithreading.
encode_video_frames(tmp_imgs_dir, video_path, fps, overwrite=True)
shutil.rmtree(tmp_imgs_dir)
logging.info("Concatenating episodes")
ep_dicts = []
for episode_index in tqdm.tqdm(range(num_episodes)):
ep_path = episodes_dir / f"episode_{episode_index}.pth"
ep_dict = torch.load(ep_path)
ep_dicts.append(ep_dict)
data_dict = concatenate_episodes(ep_dicts)
total_frames = data_dict["frame_index"].shape[0]
data_dict["index"] = torch.arange(0, total_frames, 1)
hf_dataset = create_rl_hf_dataset(data_dict)
episode_data_index = calculate_episode_data_index(hf_dataset)
info = {
"codebase_version": CODEBASE_VERSION,
"fps": fps,
"video": video,
}
if video:
info["encoding"] = get_default_encoding()
lerobot_dataset = LeRobotDataset.from_preloaded(
repo_id=repo_id,
hf_dataset=hf_dataset,
episode_data_index=episode_data_index,
info=info,
videos_dir=videos_dir,
)
if run_compute_stats:
logging.info("Computing dataset statistics")
say("Computing dataset statistics")
stats = compute_stats(lerobot_dataset)
lerobot_dataset.stats = stats
else:
stats = {}
logging.info("Skipping computation of the dataset statistics")
hf_dataset = hf_dataset.with_format(None) # to remove transforms that can't be saved
hf_dataset.save_to_disk(str(local_dir / "train"))
meta_data_dir = local_dir / "meta_data"
save_meta_data(info, stats, episode_data_index, meta_data_dir)
if push_to_hub:
hf_dataset.push_to_hub(repo_id, revision="main")
push_meta_data_to_hub(repo_id, meta_data_dir, revision="main")
push_dataset_card_to_hub(repo_id, revision="main", tags=tags)
if video:
push_videos_to_hub(repo_id, videos_dir, revision="main")
create_branch(repo_id, repo_type="dataset", branch=CODEBASE_VERSION)
logging.info("Exiting")
say("Exiting")
return lerobot_dataset
def replay(env,
episodes: list,
fps: int | None = None,
root="data",
repo_id="lerobot/debug"):
env = env()
local_dir = Path(root) / repo_id
if not local_dir.exists():
raise ValueError(local_dir)
dataset = LeRobotDataset(repo_id, root=root)
items = dataset.hf_dataset.select_columns("action")
seeds = dataset.hf_dataset.select_columns("seed")['seed']
for episode in episodes:
from_idx = dataset.episode_data_index["from"][episode].item()
to_idx = dataset.episode_data_index["to"][episode].item()
env.reset(seed=seeds[from_idx].item())
logging.info("Replaying episode")
say("Replaying episode", blocking=True)
for idx in range(from_idx, to_idx):
start_episode_t = time.perf_counter()
action = items[idx]["action"]
env.step(action.numpy() * np.pi / 180.0)
dt_s = time.perf_counter() - start_episode_t
busy_wait(1 / fps - dt_s)
# wait before playing next episode
busy_wait(5)
if __name__ == "__main__":
parser = argparse.ArgumentParser()
subparsers = parser.add_subparsers(dest="mode", required=True)
# Set common options for all the subparsers
base_parser = argparse.ArgumentParser(add_help=False)
base_parser.add_argument(
"--robot-path",
type=str,
default="lerobot/configs/robot/koch.yaml",
help="Path to robot yaml file used to instantiate the robot using `make_robot` factory function.",
)
base_parser.add_argument(
"--sim-config",
help="Path to a yaml config you want to use for initializing a sim environment based on gym ",
)
parser_teleop = subparsers.add_parser("teleoperate", parents=[base_parser])
parser_teleop.add_argument(
"--fps", type=none_or_int, default=None, help="Frames per second (set to None to disable)"
)
parser_record = subparsers.add_parser("record", parents=[base_parser])
parser_record.add_argument(
"--fps", type=none_or_int, default=None, help="Frames per second (set to None to disable)"
)
parser_record.add_argument(
"--root",
type=Path,
default="data",
help="Root directory where the dataset will be stored locally at '{root}/{repo_id}' (e.g. 'data/hf_username/dataset_name').",
)
parser_record.add_argument(
"--repo-id",
type=str,
default="lerobot/test",
help="Dataset identifier. By convention it should match '{hf_username}/{dataset_name}' (e.g. `lerobot/test`).",
)
parser_record.add_argument(
"--episode-time-s",
type=int,
default=60,
help="Number of seconds for data recording for each episode.",
)
parser_record.add_argument(
"--reset-time-s",
type=int,
default=60,
help="Number of seconds for resetting the environment after each episode.",
)
parser_record.add_argument("--num-episodes", type=int, default=50, help="Number of episodes to record.")
parser_record.add_argument(
"--run-compute-stats",
type=int,
default=1,
help="By default, run the computation of the data statistics at the end of data collection. Compute intensive and not required to just replay an episode.",
)
parser_record.add_argument(
"--push-to-hub",
type=int,
default=1,
help="Upload dataset to Hugging Face hub.",
)
parser_record.add_argument(
"--tags",
type=str,
nargs="*",
help="Add tags to your dataset on the hub.",
)
parser_record.add_argument(
"--num-image-writers-per-camera",
type=int,
default=4,
help=(
"Number of threads writing the frames as png images on disk, per camera. "
"Too many threads might cause unstable teleoperation fps due to main thread being blocked. "
"Not enough threads might cause low camera fps."
),
)
parser_record.add_argument(
"--force-override",
type=int,
default=0,
help="By default, data recording is resumed. When set to 1, delete the local directory and start data recording from scratch.",
)
parser_record.add_argument(
"--visualize-images",
type=int,
default=0,
help="Visualize image observations with opencv.",
)
parser_record.add_argument(
"-p",
"--pretrained-policy-name-or-path",
type=str,
help=(
"Either the repo ID of a model hosted on the Hub or a path to a directory containing weights "
"saved using `Policy.save_pretrained`."
),
)
parser_record.add_argument(
"--policy-overrides",
type=str,
nargs="*",
help="Any key=value arguments to override config values (use dots for.nested=overrides)",
)
parser_replay = subparsers.add_parser("replay", parents=[base_parser])
parser_replay.add_argument(
"--fps", type=none_or_int, default=None, help="Frames per second (set to None to disable)"
)
parser_replay.add_argument(
"--root",
type=Path,
default="data",
help="Root directory where the dataset will be stored locally at '{root}/{repo_id}' (e.g. 'data/hf_username/dataset_name').",
)
parser_replay.add_argument(
"--repo-id",
type=str,
default="lerobot/test",
help="Dataset identifier. By convention it should match '{hf_username}/{dataset_name}' (e.g. `lerobot/test`).",
)
parser_replay.add_argument("--episodes", nargs='+', type=int, default=[0], help="Indices of the episodes to replay.")
args = parser.parse_args()
init_logging()
control_mode = args.mode
robot_path = args.robot_path
env_config_path = args.sim_config
kwargs = vars(args)
del kwargs["mode"]
del kwargs["robot_path"]
del kwargs["sim_config"]
# make gym env
env_cfg = init_hydra_config(env_config_path)
#env_fn = lambda: make_env(env_cfg, n_envs=1)
package_name = f"gym_{env_cfg.env.name}"
importlib.import_module(f"gym_{env_cfg.env.name}")
env_fn = lambda: gym.make(env_cfg.env.handle, disable_env_checker=True, **env_cfg.env.gym)
robot = None
if control_mode != 'replay':
# make robot
robot_overrides = ['~cameras', '~follower_arms']
robot_cfg = init_hydra_config(robot_path, robot_overrides)
robot = make_robot(robot_cfg)
kwargs.update(env_cfg.calibration)
if control_mode == "teleoperate":
teleoperate(env_fn, robot, **kwargs)
elif control_mode == "record":
pretrained_policy_name_or_path = args.pretrained_policy_name_or_path
policy_overrides = args.policy_overrides
del kwargs["pretrained_policy_name_or_path"]
del kwargs["policy_overrides"]
if pretrained_policy_name_or_path is not None:
pretrained_policy_path = get_pretrained_policy_path(pretrained_policy_name_or_path)
kwargs["policy_cfg"] = init_hydra_config(pretrained_policy_path / "config.yaml", policy_overrides)
kwargs["policy"] = make_policy(hydra_cfg=kwargs["policy_cfg"], pretrained_policy_name_or_path=pretrained_policy_path)
record(env_fn, robot, **kwargs)
elif control_mode == "replay":
replay(env_fn, **kwargs)
else:
raise ValueError(f"Invalid control mode: '{control_mode}', only valid modes are teleoperate, record and replay." )
if robot and robot.is_connected:
# Disconnect manually to avoid a "Core dump" during process
# termination due to camera threads not properly exiting.
robot.disconnect()
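The joint-command transform inside `read_commands_from_leader` (encoder counts relative to a start position, scaled to radians, then shifted by an offset given in multiples of pi) can be sketched on its own. This is a minimal sketch under stated assumptions: `counts_to_joint_commands` is a hypothetical helper, it takes one direction and one offset per joint rather than relying on NumPy broadcasting, and the 4096 counts per revolution is taken from the constant in the code above.

```python
import math


def counts_to_joint_commands(positions, start_pos, axis_directions, offsets, counts_per_rev=4096):
    """Convert raw encoder counts to joint angles in radians.

    positions, start_pos: raw encoder counts per joint.
    axis_directions: +1 or -1 per joint, to flip axes that turn the other way in the sim.
    offsets: per-joint offsets expressed in multiples of pi.
    """
    counts_to_radians = 2.0 * math.pi / counts_per_rev
    return [
        direction * (position - start) * counts_to_radians + offset * math.pi
        for position, start, direction, offset in zip(positions, start_pos, axis_directions, offsets)
    ]


if __name__ == "__main__":
    # A quarter turn forward on joint 0; a quarter turn backward on a flipped joint 1 with a pi/2 offset.
    commands = counts_to_joint_commands([3072, 1024], [2048, 2048], [1, -1], [0.0, 0.5])
    print(commands)
```

Keeping the transform in a pure function like this would make it possible to check calibration values without a connected leader arm.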

View File

@@ -158,14 +158,14 @@ def rollout(
action = action.to("cpu").numpy()
assert action.ndim == 2, "Action dimensions should be (batch, action_dim)"
# Apply the next action. TODO (michel_aractingi) temp fix
# Apply the next action.
observation, reward, terminated, truncated, info = env.step(action)
if render_callback is not None:
render_callback(env)
# VectorEnv stores is_success in `info["final_info"][env_index]["is_success"]`. "final_info" isn't
# available if none of the envs finished.
if "final_info" in info:
if "final_info" in info:
successes = [info["is_success"] if info is not None else False for info in info["final_info"]]
else:
successes = [False] * env.num_envs
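The success extraction in this hunk can be read in isolation as follows. This is a minimal sketch, assuming the `info` layout described in the comment above (a `"final_info"` entry holding one dict or `None` per environment); `extract_successes` is a hypothetical helper name, not part of the script.

```python
def extract_successes(info: dict, num_envs: int) -> list:
    """Return one success flag per environment for the current step."""
    if "final_info" in info:
        # Environments that did not finish on this step have a None entry.
        return [
            bool(final["is_success"]) if final is not None else False
            for final in info["final_info"]
        ]
    # No environment finished on this step, so nothing can have succeeded yet.
    return [False] * num_envs


if __name__ == "__main__":
    step_info = {"final_info": [{"is_success": True}, None, {"is_success": False}]}
    print(extract_successes(step_info, num_envs=3))
    print(extract_successes({}, num_envs=2))
```

Using a separate loop variable (`final`) avoids shadowing `info` inside the comprehension, which the original line does.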

View File

@@ -135,8 +135,8 @@ def update_policy(
# Optimizer's gradients are already unscaled, so scaler.step does not unscale them,
# although it still skips optimizer.step() if the gradients contain infs or NaNs.
#with lock if lock is not None else nullcontext():
grad_scaler.step(optimizer)
with lock if lock is not None else nullcontext():
grad_scaler.step(optimizer)
# Updates the scale for next iteration.
grad_scaler.update()
@@ -311,11 +311,6 @@ def train(cfg: DictConfig, out_dir: str | None = None, job_name: str | None = No
logging.info("make_dataset")
offline_dataset = make_dataset(cfg)
remove_indices=['observation.images.image_top', 'observation.velocity', 'seed']
# temp fix michel_Aractingi TODO
offline_dataset.hf_dataset = offline_dataset.hf_dataset.remove_columns(remove_indices)
if isinstance(offline_dataset, MultiLeRobotDataset):
logging.info(
"Multiple datasets were provided. Applied the following index mapping to the provided datasets: "
@@ -509,9 +504,6 @@ def train(cfg: DictConfig, out_dir: str | None = None, job_name: str | None = No
num_samples=len(concat_dataset),
replacement=True,
)
# TODO michel_aractingi temp fix for inconsistent keys
dataloader = torch.utils.data.DataLoader(
concat_dataset,
batch_size=cfg.training.batch_size,
@@ -546,8 +538,8 @@ def train(cfg: DictConfig, out_dir: str | None = None, job_name: str | None = No
def sample_trajectory_and_update_buffer():
nonlocal rollout_start_seed
#with lock:
online_rollout_policy.load_state_dict(policy.state_dict())
with lock:
online_rollout_policy.load_state_dict(policy.state_dict())
online_rollout_policy.eval()
start_rollout_time = time.perf_counter()
with torch.no_grad():
@@ -564,35 +556,37 @@ def train(cfg: DictConfig, out_dir: str | None = None, job_name: str | None = No
)
online_rollout_s = time.perf_counter() - start_rollout_time
#with lock:
start_update_buffer_time = time.perf_counter()
online_dataset.add_data(eval_info["episodes"])
# Update the concatenated dataset length used during sampling.
concat_dataset.cumulative_sizes = concat_dataset.cumsum(concat_dataset.datasets)
# Update the sampling weights.
sampler.weights = compute_sampler_weights(
offline_dataset,
offline_drop_n_last_frames=cfg.training.get("drop_n_last_frames", 0),
online_dataset=online_dataset,
# +1 because online rollouts return an extra frame for the "final observation". Note: we don't have
# this final observation in the offline datasets, but we might add them in future.
online_drop_n_last_frames=cfg.training.get("drop_n_last_frames", 0) + 1,
online_sampling_ratio=cfg.training.online_sampling_ratio,
)
sampler.num_samples = len(concat_dataset)
update_online_buffer_s = time.perf_counter() - start_update_buffer_time
with lock:
start_update_buffer_time = time.perf_counter()
online_dataset.add_data(eval_info["episodes"])
# Update the concatenated dataset length used during sampling.
concat_dataset.cumulative_sizes = concat_dataset.cumsum(concat_dataset.datasets)
# Update the sampling weights.
sampler.weights = compute_sampler_weights(
offline_dataset,
offline_drop_n_last_frames=cfg.training.get("drop_n_last_frames", 0),
online_dataset=online_dataset,
# +1 because online rollouts return an extra frame for the "final observation". Note: we don't have
# this final observation in the offline datasets, but we might add them in future.
online_drop_n_last_frames=cfg.training.get("drop_n_last_frames", 0) + 1,
online_sampling_ratio=cfg.training.online_sampling_ratio,
)
sampler.num_samples = len(concat_dataset)
update_online_buffer_s = time.perf_counter() - start_update_buffer_time
return online_rollout_s, update_online_buffer_s
# TODO remove parallelization for sim
#future = executor.submit(sample_trajectory_and_update_buffer)
future = executor.submit(sample_trajectory_and_update_buffer)
# If we aren't doing async rollouts, or if we haven't yet gotten enough examples in our buffer, wait
# here until the rollout and buffer update is done, before proceeding to the policy update steps.
if (
not cfg.training.do_online_rollout_async
or len(online_dataset) <= cfg.training.online_buffer_seed_size
):
online_rollout_s, update_online_buffer_s = sample_trajectory_and_update_buffer()#future.result()
online_rollout_s, update_online_buffer_s = future.result()
if len(online_dataset) <= cfg.training.online_buffer_seed_size:
logging.info(
@@ -602,15 +596,12 @@ def train(cfg: DictConfig, out_dir: str | None = None, job_name: str | None = No
policy.train()
for _ in range(cfg.training.online_steps_between_rollouts):
#with lock:
start_time = time.perf_counter()
batch = next(dl_iter)
dataloading_s = time.perf_counter() - start_time
with lock:
start_time = time.perf_counter()
batch = next(dl_iter)
dataloading_s = time.perf_counter() - start_time
for key in batch:
# TODO michel aractingi convert float64 to float32 for mac
if batch[key].dtype == torch.float64:
batch[key] = batch[key].float()
batch[key] = batch[key].to(cfg.device, non_blocking=True)
train_info = update_policy(
@@ -628,8 +619,8 @@ def train(cfg: DictConfig, out_dir: str | None = None, job_name: str | None = No
train_info["online_rollout_s"] = online_rollout_s
train_info["update_online_buffer_s"] = update_online_buffer_s
train_info["await_update_online_buffer_s"] = await_update_online_buffer_s
#with lock:
train_info["online_buffer_size"] = len(online_dataset)
with lock:
train_info["online_buffer_size"] = len(online_dataset)
if step % cfg.training.log_freq == 0:
log_train_info(logger, train_info, step, cfg, online_dataset, is_online=True)
@@ -643,10 +634,10 @@ def train(cfg: DictConfig, out_dir: str | None = None, job_name: str | None = No
# If we're doing async rollouts, we should now wait until we've completed them before proceeding
# to do the next batch of rollouts.
#if future.running():
#start = time.perf_counter()
#online_rollout_s, update_online_buffer_s = sample_trajectory_and_update_buffer()#future.result()
#await_update_online_buffer_s = time.perf_counter() - start
if future.running():
start = time.perf_counter()
online_rollout_s, update_online_buffer_s = future.result()
await_update_online_buffer_s = time.perf_counter() - start
if online_step >= cfg.training.online_steps:
break
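
The hunks above move between calling `sample_trajectory_and_update_buffer()` inline and submitting it to an executor, with a lock guarding both the buffer update and the reads in the training loop. A minimal sketch of that pattern follows. It assumes a plain list as a stand-in for `online_dataset`, and `run_async_rollout_demo` and `sample_and_update` are hypothetical names. It also waits on `future.result()` unconditionally, rather than checking `future.running()` first as the diff does.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from threading import Lock


def run_async_rollout_demo(num_rollouts=3, steps_between_rollouts=2):
    """Run rollouts in a worker thread while the main thread reads the buffer."""
    buffer = []  # hypothetical stand-in for `online_dataset`
    lock = Lock()
    observed_sizes = []

    def sample_and_update(rollout_idx):
        start = time.perf_counter()
        episode = [rollout_idx] * 4  # stand-in for one rollout's frames
        # Only the buffer mutation is guarded, so readers never see a
        # partially added episode.
        with lock:
            buffer.extend(episode)
        return time.perf_counter() - start

    with ThreadPoolExecutor(max_workers=1) as executor:
        for rollout_idx in range(num_rollouts):
            future = executor.submit(sample_and_update, rollout_idx)
            for _ in range(steps_between_rollouts):
                # Stand-in for a policy update step that reads the buffer.
                with lock:
                    observed_sizes.append(len(buffer))
            # Wait for the rollout to finish before starting the next one.
            future.result()

    return buffer, observed_sizes
```

Because every read and write of the buffer happens under the same lock, each observed size is a whole number of episodes, even though the exact interleaving of rollout and update steps is not deterministic.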


@@ -52,9 +52,8 @@ def is_robot_available(robot_type):
print(f"\nInstall module '{e.name}'")
elif isinstance(e, SerialException):
print("\nNo physical motors bus detected.")
else:
traceback.print_exc()
traceback.print_exc()
return False
@@ -78,9 +77,8 @@ def is_camera_available(camera_type):
print(f"\nInstall module '{e.name}'")
elif isinstance(e, ValueError) and "camera_index" in e.args[0]:
print("\nNo physical camera detected.")
else:
traceback.print_exc()
traceback.print_exc()
return False
@@ -104,9 +102,8 @@ def is_motor_available(motor_type):
print(f"\nInstall module '{e.name}'")
elif isinstance(e, SerialException):
print("\nNo physical motors bus detected.")
else:
traceback.print_exc()
traceback.print_exc()
return False
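
The three hunks above differ only in whether `traceback.print_exc()` sits under an `else:` or runs for every exception. A minimal sketch of the availability-probe pattern is given below. It assumes a hypothetical helper `is_device_available` that takes a connect callable, and a hypothetical `always_print_traceback` flag that switches between the two variants shown in the diff.

```python
import traceback


def is_device_available(connect_fn, always_print_traceback=True):
    """Return True if `connect_fn()` succeeds, False otherwise.

    `connect_fn` and `always_print_traceback` are illustrative names,
    not part of the code shown in the diff.
    """
    try:
        connect_fn()
        return True
    except Exception as e:
        known = isinstance(e, ModuleNotFoundError)
        if known:
            print(f"\nInstall module '{e.name}'")
        # Either always show the traceback, or only for unrecognized errors.
        if always_print_traceback or not known:
            traceback.print_exc()
        return False
```

Printing the traceback unconditionally is noisier, but it keeps the full failure context in the test log even when a short hint was already printed.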


@@ -23,7 +23,6 @@ pytest -sx 'tests/test_control_robot.py::test_teleoperate[aloha-True]'
```
"""
import multiprocessing
from pathlib import Path
import pytest
@@ -38,7 +37,7 @@ from tests.utils import DEFAULT_CONFIG_PATH, DEVICE, TEST_ROBOT_TYPES, require_r
@pytest.mark.parametrize("robot_type, mock", TEST_ROBOT_TYPES)
@require_robot
def test_teleoperate(tmpdir, request, robot_type, mock):
if mock and robot_type != "aloha":
if mock:
request.getfixturevalue("patch_builtins_input")
# Create an empty calibration directory to trigger manual calibration
@@ -79,7 +78,7 @@ def test_record_without_cameras(tmpdir, request, robot_type, mock):
# Avoid using cameras
overrides = ["~cameras"]
if mock and robot_type != "aloha":
if mock:
request.getfixturevalue("patch_builtins_input")
# Create an empty calibration directory to trigger manual calibration
@@ -102,14 +101,13 @@ def test_record_without_cameras(tmpdir, request, robot_type, mock):
run_compute_stats=False,
push_to_hub=False,
video=False,
play_sounds=False,
)
@pytest.mark.parametrize("robot_type, mock", TEST_ROBOT_TYPES)
@require_robot
def test_record_and_replay_and_policy(tmpdir, request, robot_type, mock):
if mock and robot_type != "aloha":
if mock:
request.getfixturevalue("patch_builtins_input")
# Create an empty calibration directory to trigger manual calibration
@@ -117,9 +115,12 @@ def test_record_and_replay_and_policy(tmpdir, request, robot_type, mock):
calibration_dir = Path(tmpdir) / robot_type
overrides = [f"calibration_dir={calibration_dir}"]
else:
# Use the default .cache/calibration folder when mock=False or for aloha
# Use the default .cache/calibration folder when mock=False
overrides = None
if robot_type == "aloha":
pytest.skip("TODO(rcadene): enable test once aloha_real and act_aloha_real are merged")
env_name = "koch_real"
policy_name = "act_koch_real"
@@ -140,10 +141,9 @@ def test_record_and_replay_and_policy(tmpdir, request, robot_type, mock):
video=False,
# TODO(rcadene): display cameras through cv2 sometimes crashes on mac
display_cameras=False,
play_sounds=False,
)
replay(robot, episode=0, fps=30, root=root, repo_id=repo_id, play_sounds=False)
replay(robot, episode=0, fps=30, root=root, repo_id=repo_id)
# TODO(rcadene, aliberts): rethink this design
if robot_type == "aloha":
@@ -171,27 +171,6 @@ def test_record_and_replay_and_policy(tmpdir, request, robot_type, mock):
policy = make_policy(hydra_cfg=cfg, dataset_stats=dataset.stats)
# In `examples/9_use_aloha.md`, we advise using `num_image_writer_processes=1`
# during inference, to reach consistent fps, so we test this here.
if robot_type == "aloha":
num_image_writer_processes = 1
# `multiprocessing.set_start_method("spawn", force=True)` avoids a hanging issue
# before exiting pytest. However, it outputs the following error in the log:
# Traceback (most recent call last):
# File "<string>", line 1, in <module>
# File "/Users/rcadene/miniconda3/envs/lerobot/lib/python3.10/multiprocessing/spawn.py", line 116, in spawn_main
# exitcode = _main(fd, parent_sentinel)
# File "/Users/rcadene/miniconda3/envs/lerobot/lib/python3.10/multiprocessing/spawn.py", line 126, in _main
# self = reduction.pickle.load(from_parent)
# File "/Users/rcadene/miniconda3/envs/lerobot/lib/python3.10/multiprocessing/synchronize.py", line 110, in __setstate__
# self._semlock = _multiprocessing.SemLock._rebuild(*state)
# FileNotFoundError: [Errno 2] No such file or directory
# TODO(rcadene, aliberts): fix FileNotFoundError in multiprocessing
multiprocessing.set_start_method("spawn", force=True)
else:
num_image_writer_processes = 0
record(
robot,
policy,
@@ -203,8 +182,6 @@ def test_record_and_replay_and_policy(tmpdir, request, robot_type, mock):
push_to_hub=False,
video=False,
display_cameras=False,
play_sounds=False,
num_image_writer_processes=num_image_writer_processes,
)
del robot