Compare commits
13 Commits
user/fraca
...
user/azoui
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0961836de5 | ||
|
|
dcb24837e1 | ||
|
|
a75d00970f | ||
|
|
c7b204f4a6 | ||
|
|
4df18de636 | ||
|
|
8dc69c6126 | ||
|
|
7d481e6048 | ||
|
|
b43ece8934 | ||
|
|
c10c5a0e64 | ||
|
|
a8db91c40e | ||
|
|
0f5f7ac780 | ||
|
|
768e36660d | ||
|
|
790d6740ba |
@@ -48,7 +48,7 @@ repos:
|
||||
- id: pyupgrade
|
||||
|
||||
- repo: https://github.com/astral-sh/ruff-pre-commit
|
||||
rev: v0.11.4
|
||||
rev: v0.11.5
|
||||
hooks:
|
||||
- id: ruff
|
||||
args: [--fix]
|
||||
@@ -57,7 +57,7 @@ repos:
|
||||
|
||||
##### Security #####
|
||||
- repo: https://github.com/gitleaks/gitleaks
|
||||
rev: v8.24.2
|
||||
rev: v8.24.3
|
||||
hooks:
|
||||
- id: gitleaks
|
||||
|
||||
|
||||
@@ -103,13 +103,20 @@ When using `miniconda`, install `ffmpeg` in your environment:
|
||||
conda install ffmpeg -c conda-forge
|
||||
```
|
||||
|
||||
> **NOTE:** This usually installs `ffmpeg 7.X` for your platform compiled with the `libsvtav1` encoder. If `libsvtav1` is not supported (check supported encoders with `ffmpeg -encoders`), you can:
|
||||
> - _[On any platform]_ Explicitly install `ffmpeg 7.X` using:
|
||||
> ```bash
|
||||
> conda install ffmpeg=7.1.1 -c conda-forge
|
||||
> ```
|
||||
> - _[On Linux only]_ Install [ffmpeg build dependencies](https://trac.ffmpeg.org/wiki/CompilationGuide/Ubuntu#GettheDependencies) and [compile ffmpeg from source with libsvtav1](https://trac.ffmpeg.org/wiki/CompilationGuide/Ubuntu#libsvtav1), and make sure you use the corresponding ffmpeg binary to your install with `which ffmpeg`.
|
||||
|
||||
Install 🤗 LeRobot:
|
||||
```bash
|
||||
pip install -e .
|
||||
```
|
||||
|
||||
> **NOTE:** If you encounter build errors, you may need to install additional dependencies (`cmake`, `build-essential`, and `ffmpeg libs`). On Linux, run:
|
||||
`sudo apt-get install cmake build-essential python-dev pkg-config libavformat-dev libavcodec-dev libavdevice-dev libavutil-dev libswscale-dev libswresample-dev libavfilter-dev pkg-config`. For other systems, see: [Compiling PyAV](https://pyav.org/docs/develop/overview/installation.html#bring-your-own-ffmpeg)
|
||||
`sudo apt-get install cmake build-essential python3-dev pkg-config libavformat-dev libavcodec-dev libavdevice-dev libavutil-dev libswscale-dev libswresample-dev libavfilter-dev pkg-config`. For other systems, see: [Compiling PyAV](https://pyav.org/docs/develop/overview/installation.html#bring-your-own-ffmpeg)
|
||||
|
||||
For simulations, 🤗 LeRobot comes with gymnasium environments that can be installed as extras:
|
||||
- [aloha](https://github.com/huggingface/gym-aloha)
|
||||
|
||||
@@ -14,7 +14,7 @@ RUN apt-get update && apt-get install -y --no-install-recommends \
|
||||
tcpdump sysstat screen tmux \
|
||||
libglib2.0-0 libgl1-mesa-glx libegl1-mesa \
|
||||
speech-dispatcher portaudio19-dev libgeos-dev \
|
||||
python${PYTHON_VERSION} python${PYTHON_VERSION}-venv \
|
||||
python${PYTHON_VERSION} python${PYTHON_VERSION}-venv python${PYTHON_VERSION}-dev \
|
||||
&& apt-get clean && rm -rf /var/lib/apt/lists/*
|
||||
|
||||
# Install ffmpeg build dependencies. See:
|
||||
|
||||
@@ -4,7 +4,7 @@ This tutorial will explain the training script, how to use it, and particularly
|
||||
|
||||
## The training script
|
||||
|
||||
LeRobot offers a training script at [`lerobot/scripts/train.py`](../../lerobot/scripts/train.py). At a high level it does the following:
|
||||
LeRobot offers a training script at [`lerobot/scripts/train.py`](../lerobot/scripts/train.py). At a high level it does the following:
|
||||
|
||||
- Initialize/load a configuration for the following steps using.
|
||||
- Instantiates a dataset.
|
||||
@@ -21,7 +21,7 @@ In the training script, the main function `train` expects a `TrainPipelineConfig
|
||||
def train(cfg: TrainPipelineConfig):
|
||||
```
|
||||
|
||||
You can inspect the `TrainPipelineConfig` defined in [`lerobot/configs/train.py`](../../lerobot/configs/train.py) (which is heavily commented and meant to be a reference to understand any option)
|
||||
You can inspect the `TrainPipelineConfig` defined in [`lerobot/configs/train.py`](../lerobot/configs/train.py) (which is heavily commented and meant to be a reference to understand any option)
|
||||
|
||||
When running the script, inputs for the command line are parsed thanks to the `@parser.wrap()` decorator and an instance of this class is automatically generated. Under the hood, this is done with [Draccus](https://github.com/dlwh/draccus) which is a tool dedicated for this purpose. If you're familiar with Hydra, Draccus can similarly load configurations from config files (.json, .yaml) and also override their values through command line inputs. Unlike Hydra, these configurations are pre-defined in the code through dataclasses rather than being defined entirely in config files. This allows for more rigorous serialization/deserialization, typing, and to manipulate configuration as objects directly in the code and not as dictionaries or namespaces (which enables nice features in an IDE such as autocomplete, jump-to-def, etc.)
|
||||
|
||||
@@ -50,7 +50,7 @@ By default, every field takes its default value specified in the dataclass. If a
|
||||
|
||||
## Specifying values from the CLI
|
||||
|
||||
Let's say that we want to train [Diffusion Policy](../../lerobot/common/policies/diffusion) on the [pusht](https://huggingface.co/datasets/lerobot/pusht) dataset, using the [gym_pusht](https://github.com/huggingface/gym-pusht) environment for evaluation. The command to do so would look like this:
|
||||
Let's say that we want to train [Diffusion Policy](../lerobot/common/policies/diffusion) on the [pusht](https://huggingface.co/datasets/lerobot/pusht) dataset, using the [gym_pusht](https://github.com/huggingface/gym-pusht) environment for evaluation. The command to do so would look like this:
|
||||
```bash
|
||||
python lerobot/scripts/train.py \
|
||||
--dataset.repo_id=lerobot/pusht \
|
||||
@@ -60,10 +60,10 @@ python lerobot/scripts/train.py \
|
||||
|
||||
Let's break this down:
|
||||
- To specify the dataset, we just need to specify its `repo_id` on the hub which is the only required argument in the `DatasetConfig`. The rest of the fields have default values and in this case we are fine with those so we can just add the option `--dataset.repo_id=lerobot/pusht`.
|
||||
- To specify the policy, we can just select diffusion policy using `--policy` appended with `.type`. Here, `.type` is a special argument which allows us to select config classes inheriting from `draccus.ChoiceRegistry` and that have been decorated with the `register_subclass()` method. To have a better explanation of this feature, have a look at this [Draccus demo](https://github.com/dlwh/draccus?tab=readme-ov-file#more-flexible-configuration-with-choice-types). In our code, we use this mechanism mainly to select policies, environments, robots, and some other components like optimizers. The policies available to select are located in [lerobot/common/policies](../../lerobot/common/policies)
|
||||
- Similarly, we select the environment with `--env.type=pusht`. The different environment configs are available in [`lerobot/common/envs/configs.py`](../../lerobot/common/envs/configs.py)
|
||||
- To specify the policy, we can just select diffusion policy using `--policy` appended with `.type`. Here, `.type` is a special argument which allows us to select config classes inheriting from `draccus.ChoiceRegistry` and that have been decorated with the `register_subclass()` method. To have a better explanation of this feature, have a look at this [Draccus demo](https://github.com/dlwh/draccus?tab=readme-ov-file#more-flexible-configuration-with-choice-types). In our code, we use this mechanism mainly to select policies, environments, robots, and some other components like optimizers. The policies available to select are located in [lerobot/common/policies](../lerobot/common/policies)
|
||||
- Similarly, we select the environment with `--env.type=pusht`. The different environment configs are available in [`lerobot/common/envs/configs.py`](../lerobot/common/envs/configs.py)
|
||||
|
||||
Let's see another example. Let's say you've been training [ACT](../../lerobot/common/policies/act) on [lerobot/aloha_sim_insertion_human](https://huggingface.co/datasets/lerobot/aloha_sim_insertion_human) using the [gym-aloha](https://github.com/huggingface/gym-aloha) environment for evaluation with:
|
||||
Let's see another example. Let's say you've been training [ACT](../lerobot/common/policies/act) on [lerobot/aloha_sim_insertion_human](https://huggingface.co/datasets/lerobot/aloha_sim_insertion_human) using the [gym-aloha](https://github.com/huggingface/gym-aloha) environment for evaluation with:
|
||||
```bash
|
||||
python lerobot/scripts/train.py \
|
||||
--policy.type=act \
|
||||
@@ -74,7 +74,7 @@ python lerobot/scripts/train.py \
|
||||
> Notice we added `--output_dir` to explicitly tell where to write outputs from this run (checkpoints, training state, configs etc.). This is not mandatory and if you don't specify it, a default directory will be created from the current date and time, env.type and policy.type. This will typically look like `outputs/train/2025-01-24/16-10-05_aloha_act`.
|
||||
|
||||
We now want to train a different policy for aloha on another task. We'll change the dataset and use [lerobot/aloha_sim_transfer_cube_human](https://huggingface.co/datasets/lerobot/aloha_sim_transfer_cube_human) instead. Of course, we also need to change the task of the environment as well to match this other task.
|
||||
Looking at the [`AlohaEnv`](../../lerobot/common/envs/configs.py) config, the task is `"AlohaInsertion-v0"` by default, which corresponds to the task we trained on in the command above. The [gym-aloha](https://github.com/huggingface/gym-aloha?tab=readme-ov-file#description) environment also has the `AlohaTransferCube-v0` task which corresponds to this other task we want to train on. Putting this together, we can train this new policy on this different task using:
|
||||
Looking at the [`AlohaEnv`](../lerobot/common/envs/configs.py) config, the task is `"AlohaInsertion-v0"` by default, which corresponds to the task we trained on in the command above. The [gym-aloha](https://github.com/huggingface/gym-aloha?tab=readme-ov-file#description) environment also has the `AlohaTransferCube-v0` task which corresponds to this other task we want to train on. Putting this together, we can train this new policy on this different task using:
|
||||
```bash
|
||||
python lerobot/scripts/train.py \
|
||||
--policy.type=act \
|
||||
|
||||
@@ -830,11 +830,6 @@ It contains:
|
||||
- `dtRphone:33.84 (29.5hz)` which is the delta time of capturing an image from the phone camera in the thread running asynchronously.
|
||||
|
||||
Troubleshooting:
|
||||
- On Linux, if you encounter any issue during video encoding with `ffmpeg: unknown encoder libsvtav1`, you can:
|
||||
- install with conda-forge by running `conda install -c conda-forge ffmpeg` (it should be compiled with `libsvtav1`),
|
||||
> **NOTE:** This usually installs `ffmpeg 7.X` for your platform (check the version installed with `ffmpeg -encoders | grep libsvtav1`). If it isn't `ffmpeg 7.X` or lacks `libsvtav1` support, you can explicitly install `ffmpeg 7.X` using: `conda install ffmpeg=7.1.1 -c conda-forge`
|
||||
- or, install [ffmpeg build dependencies](https://trac.ffmpeg.org/wiki/CompilationGuide/Ubuntu#GettheDependencies) and [compile ffmpeg from source with libsvtav1](https://trac.ffmpeg.org/wiki/CompilationGuide/Ubuntu#libsvtav1),
|
||||
- and, make sure you use the corresponding ffmpeg binary to your install with `which ffmpeg`.
|
||||
- On Linux, if the left and right arrow keys and escape key don't have any effect during data recording, make sure you've set the `$DISPLAY` environment variable. See [pynput limitations](https://pynput.readthedocs.io/en/latest/limitations.html#linux).
|
||||
|
||||
At the end of data recording, your dataset will be uploaded on your Hugging Face page (e.g. https://huggingface.co/datasets/cadene/koch_test) that you can obtain by running:
|
||||
|
||||
@@ -1 +0,0 @@
|
||||
# Common mocks for robot devices and testing
|
||||
@@ -1,101 +0,0 @@
|
||||
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
from functools import cache
|
||||
|
||||
import numpy as np
|
||||
|
||||
CAP_V4L2 = 200
|
||||
CAP_DSHOW = 700
|
||||
CAP_AVFOUNDATION = 1200
|
||||
CAP_ANY = -1
|
||||
|
||||
CAP_PROP_FPS = 5
|
||||
CAP_PROP_FRAME_WIDTH = 3
|
||||
CAP_PROP_FRAME_HEIGHT = 4
|
||||
COLOR_RGB2BGR = 4
|
||||
COLOR_BGR2RGB = 4
|
||||
|
||||
ROTATE_90_COUNTERCLOCKWISE = 2
|
||||
ROTATE_90_CLOCKWISE = 0
|
||||
ROTATE_180 = 1
|
||||
|
||||
|
||||
@cache
|
||||
def _generate_image(width: int, height: int):
|
||||
return np.random.randint(0, 256, size=(height, width, 3), dtype=np.uint8)
|
||||
|
||||
|
||||
def cvtColor(color_image, color_conversion): # noqa: N802
|
||||
if color_conversion in [COLOR_RGB2BGR, COLOR_BGR2RGB]:
|
||||
return color_image[:, :, [2, 1, 0]]
|
||||
else:
|
||||
raise NotImplementedError(color_conversion)
|
||||
|
||||
|
||||
def rotate(color_image, rotation):
|
||||
if rotation is None:
|
||||
return color_image
|
||||
elif rotation == ROTATE_90_CLOCKWISE:
|
||||
return np.rot90(color_image, k=1)
|
||||
elif rotation == ROTATE_180:
|
||||
return np.rot90(color_image, k=2)
|
||||
elif rotation == ROTATE_90_COUNTERCLOCKWISE:
|
||||
return np.rot90(color_image, k=3)
|
||||
else:
|
||||
raise NotImplementedError(rotation)
|
||||
|
||||
|
||||
class VideoCapture:
|
||||
def __init__(self, *args, **kwargs):
|
||||
self._mock_dict = {
|
||||
CAP_PROP_FPS: 30,
|
||||
CAP_PROP_FRAME_WIDTH: 640,
|
||||
CAP_PROP_FRAME_HEIGHT: 480,
|
||||
}
|
||||
self._is_opened = True
|
||||
|
||||
def isOpened(self): # noqa: N802
|
||||
return self._is_opened
|
||||
|
||||
def set(self, propId: int, value: float) -> bool: # noqa: N803
|
||||
if not self._is_opened:
|
||||
raise RuntimeError("Camera is not opened")
|
||||
self._mock_dict[propId] = value
|
||||
return True
|
||||
|
||||
def get(self, propId: int) -> float: # noqa: N803
|
||||
if not self._is_opened:
|
||||
raise RuntimeError("Camera is not opened")
|
||||
value = self._mock_dict[propId]
|
||||
if value == 0:
|
||||
if propId == CAP_PROP_FRAME_HEIGHT:
|
||||
value = 480
|
||||
elif propId == CAP_PROP_FRAME_WIDTH:
|
||||
value = 640
|
||||
return value
|
||||
|
||||
def read(self):
|
||||
if not self._is_opened:
|
||||
raise RuntimeError("Camera is not opened")
|
||||
h = self.get(CAP_PROP_FRAME_HEIGHT)
|
||||
w = self.get(CAP_PROP_FRAME_WIDTH)
|
||||
ret = True
|
||||
return ret, _generate_image(width=w, height=h)
|
||||
|
||||
def release(self):
|
||||
self._is_opened = False
|
||||
|
||||
def __del__(self):
|
||||
if self._is_opened:
|
||||
self.release()
|
||||
@@ -1,148 +0,0 @@
|
||||
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import enum
|
||||
|
||||
import numpy as np
|
||||
|
||||
|
||||
class stream(enum.Enum): # noqa: N801
|
||||
color = 0
|
||||
depth = 1
|
||||
|
||||
|
||||
class format(enum.Enum): # noqa: N801
|
||||
rgb8 = 0
|
||||
z16 = 1
|
||||
|
||||
|
||||
class config: # noqa: N801
|
||||
def enable_device(self, device_id: str):
|
||||
self.device_enabled = device_id
|
||||
|
||||
def enable_stream(self, stream_type: stream, width=None, height=None, color_format=None, fps=None):
|
||||
self.stream_type = stream_type
|
||||
# Overwrite default values when possible
|
||||
self.width = 848 if width is None else width
|
||||
self.height = 480 if height is None else height
|
||||
self.color_format = format.rgb8 if color_format is None else color_format
|
||||
self.fps = 30 if fps is None else fps
|
||||
|
||||
|
||||
class RSColorProfile:
|
||||
def __init__(self, config):
|
||||
self.config = config
|
||||
|
||||
def fps(self):
|
||||
return self.config.fps
|
||||
|
||||
def width(self):
|
||||
return self.config.width
|
||||
|
||||
def height(self):
|
||||
return self.config.height
|
||||
|
||||
|
||||
class RSColorStream:
|
||||
def __init__(self, config):
|
||||
self.config = config
|
||||
|
||||
def as_video_stream_profile(self):
|
||||
return RSColorProfile(self.config)
|
||||
|
||||
|
||||
class RSProfile:
|
||||
def __init__(self, config):
|
||||
self.config = config
|
||||
|
||||
def get_stream(self, color_format):
|
||||
del color_format # unused
|
||||
return RSColorStream(self.config)
|
||||
|
||||
|
||||
class pipeline: # noqa: N801
|
||||
def __init__(self):
|
||||
self.started = False
|
||||
self.config = None
|
||||
|
||||
def start(self, config):
|
||||
self.started = True
|
||||
self.config = config
|
||||
return RSProfile(self.config)
|
||||
|
||||
def stop(self):
|
||||
if not self.started:
|
||||
raise RuntimeError("You need to start the camera before stop.")
|
||||
self.started = False
|
||||
self.config = None
|
||||
|
||||
def wait_for_frames(self, timeout_ms=50000):
|
||||
del timeout_ms # unused
|
||||
return RSFrames(self.config)
|
||||
|
||||
|
||||
class RSFrames:
|
||||
def __init__(self, config):
|
||||
self.config = config
|
||||
|
||||
def get_color_frame(self):
|
||||
return RSColorFrame(self.config)
|
||||
|
||||
def get_depth_frame(self):
|
||||
return RSDepthFrame(self.config)
|
||||
|
||||
|
||||
class RSColorFrame:
|
||||
def __init__(self, config):
|
||||
self.config = config
|
||||
|
||||
def get_data(self):
|
||||
data = np.ones((self.config.height, self.config.width, 3), dtype=np.uint8)
|
||||
# Create a difference between rgb and bgr
|
||||
data[:, :, 0] = 2
|
||||
return data
|
||||
|
||||
|
||||
class RSDepthFrame:
|
||||
def __init__(self, config):
|
||||
self.config = config
|
||||
|
||||
def get_data(self):
|
||||
return np.ones((self.config.height, self.config.width), dtype=np.uint16)
|
||||
|
||||
|
||||
class RSDevice:
|
||||
def __init__(self):
|
||||
pass
|
||||
|
||||
def get_info(self, camera_info) -> str:
|
||||
del camera_info # unused
|
||||
# return fake serial number
|
||||
return "123456789"
|
||||
|
||||
|
||||
class context: # noqa: N801
|
||||
def __init__(self):
|
||||
pass
|
||||
|
||||
def query_devices(self):
|
||||
return [RSDevice()]
|
||||
|
||||
|
||||
class camera_info: # noqa: N801
|
||||
# fake name
|
||||
name = "Intel RealSense D435I"
|
||||
|
||||
def __init__(self, serial_number):
|
||||
del serial_number
|
||||
pass
|
||||
@@ -1 +0,0 @@
|
||||
# Mocks for motor modules
|
||||
@@ -1,107 +0,0 @@
|
||||
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
"""Mocked classes and functions from dynamixel_sdk to allow for continuous integration
|
||||
and testing code logic that requires hardware and devices (e.g. robot arms, cameras)
|
||||
|
||||
Warning: These mocked versions are minimalist. They do not exactly mock every behaviors
|
||||
from the original classes and functions (e.g. return types might be None instead of boolean).
|
||||
"""
|
||||
|
||||
# from dynamixel_sdk import COMM_SUCCESS
|
||||
|
||||
DEFAULT_BAUDRATE = 9_600
|
||||
COMM_SUCCESS = 0 # tx or rx packet communication success
|
||||
|
||||
|
||||
def convert_to_bytes(value, bytes):
|
||||
# TODO(rcadene): remove need to mock `convert_to_bytes` by implemented the inverse transform
|
||||
# `convert_bytes_to_value`
|
||||
del bytes # unused
|
||||
return value
|
||||
|
||||
|
||||
def get_default_motor_values(motor_index):
|
||||
return {
|
||||
# Key (int) are from X_SERIES_CONTROL_TABLE
|
||||
7: motor_index, # ID
|
||||
8: DEFAULT_BAUDRATE, # Baud_rate
|
||||
10: 0, # Drive_Mode
|
||||
64: 0, # Torque_Enable
|
||||
# Set 2560 since calibration values for Aloha gripper is between start_pos=2499 and end_pos=3144
|
||||
# For other joints, 2560 will be autocorrected to be in calibration range
|
||||
132: 2560, # Present_Position
|
||||
}
|
||||
|
||||
|
||||
class PortHandler:
|
||||
def __init__(self, port):
|
||||
self.port = port
|
||||
# factory default baudrate
|
||||
self.baudrate = DEFAULT_BAUDRATE
|
||||
|
||||
def openPort(self): # noqa: N802
|
||||
return True
|
||||
|
||||
def closePort(self): # noqa: N802
|
||||
pass
|
||||
|
||||
def setPacketTimeoutMillis(self, timeout_ms): # noqa: N802
|
||||
del timeout_ms # unused
|
||||
|
||||
def getBaudRate(self): # noqa: N802
|
||||
return self.baudrate
|
||||
|
||||
def setBaudRate(self, baudrate): # noqa: N802
|
||||
self.baudrate = baudrate
|
||||
|
||||
|
||||
class PacketHandler:
|
||||
def __init__(self, protocol_version):
|
||||
del protocol_version # unused
|
||||
# Use packet_handler.data to communicate across Read and Write
|
||||
self.data = {}
|
||||
|
||||
|
||||
class GroupSyncRead:
|
||||
def __init__(self, port_handler, packet_handler, address, bytes):
|
||||
self.packet_handler = packet_handler
|
||||
|
||||
def addParam(self, motor_index): # noqa: N802
|
||||
# Initialize motor default values
|
||||
if motor_index not in self.packet_handler.data:
|
||||
self.packet_handler.data[motor_index] = get_default_motor_values(motor_index)
|
||||
|
||||
def txRxPacket(self): # noqa: N802
|
||||
return COMM_SUCCESS
|
||||
|
||||
def getData(self, index, address, bytes): # noqa: N802
|
||||
return self.packet_handler.data[index][address]
|
||||
|
||||
|
||||
class GroupSyncWrite:
|
||||
def __init__(self, port_handler, packet_handler, address, bytes):
|
||||
self.packet_handler = packet_handler
|
||||
self.address = address
|
||||
|
||||
def addParam(self, index, data): # noqa: N802
|
||||
# Initialize motor default values
|
||||
if index not in self.packet_handler.data:
|
||||
self.packet_handler.data[index] = get_default_motor_values(index)
|
||||
self.changeParam(index, data)
|
||||
|
||||
def txPacket(self): # noqa: N802
|
||||
return COMM_SUCCESS
|
||||
|
||||
def changeParam(self, index, data): # noqa: N802
|
||||
self.packet_handler.data[index][self.address] = data
|
||||
@@ -1,125 +0,0 @@
|
||||
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
"""Mocked classes and functions from dynamixel_sdk to allow for continuous integration
|
||||
and testing code logic that requires hardware and devices (e.g. robot arms, cameras)
|
||||
|
||||
Warning: These mocked versions are minimalist. They do not exactly mock every behaviors
|
||||
from the original classes and functions (e.g. return types might be None instead of boolean).
|
||||
"""
|
||||
|
||||
# from dynamixel_sdk import COMM_SUCCESS
|
||||
|
||||
DEFAULT_BAUDRATE = 1_000_000
|
||||
COMM_SUCCESS = 0 # tx or rx packet communication success
|
||||
|
||||
|
||||
def convert_to_bytes(value, bytes):
|
||||
# TODO(rcadene): remove need to mock `convert_to_bytes` by implemented the inverse transform
|
||||
# `convert_bytes_to_value`
|
||||
del bytes # unused
|
||||
return value
|
||||
|
||||
|
||||
def get_default_motor_values(motor_index):
|
||||
return {
|
||||
# Key (int) are from SCS_SERIES_CONTROL_TABLE
|
||||
5: motor_index, # ID
|
||||
6: DEFAULT_BAUDRATE, # Baud_rate
|
||||
10: 0, # Drive_Mode
|
||||
21: 32, # P_Coefficient
|
||||
22: 32, # D_Coefficient
|
||||
23: 0, # I_Coefficient
|
||||
40: 0, # Torque_Enable
|
||||
41: 254, # Acceleration
|
||||
31: -2047, # Offset
|
||||
33: 0, # Mode
|
||||
55: 1, # Lock
|
||||
# Set 2560 since calibration values for Aloha gripper is between start_pos=2499 and end_pos=3144
|
||||
# For other joints, 2560 will be autocorrected to be in calibration range
|
||||
56: 2560, # Present_Position
|
||||
58: 0, # Present_Speed
|
||||
69: 0, # Present_Current
|
||||
85: 150, # Maximum_Acceleration
|
||||
}
|
||||
|
||||
|
||||
class PortHandler:
|
||||
def __init__(self, port):
|
||||
self.port = port
|
||||
# factory default baudrate
|
||||
self.baudrate = DEFAULT_BAUDRATE
|
||||
self.ser = SerialMock()
|
||||
|
||||
def openPort(self): # noqa: N802
|
||||
return True
|
||||
|
||||
def closePort(self): # noqa: N802
|
||||
pass
|
||||
|
||||
def setPacketTimeoutMillis(self, timeout_ms): # noqa: N802
|
||||
del timeout_ms # unused
|
||||
|
||||
def getBaudRate(self): # noqa: N802
|
||||
return self.baudrate
|
||||
|
||||
def setBaudRate(self, baudrate): # noqa: N802
|
||||
self.baudrate = baudrate
|
||||
|
||||
|
||||
class PacketHandler:
|
||||
def __init__(self, protocol_version):
|
||||
del protocol_version # unused
|
||||
# Use packet_handler.data to communicate across Read and Write
|
||||
self.data = {}
|
||||
|
||||
|
||||
class GroupSyncRead:
|
||||
def __init__(self, port_handler, packet_handler, address, bytes):
|
||||
self.packet_handler = packet_handler
|
||||
|
||||
def addParam(self, motor_index): # noqa: N802
|
||||
# Initialize motor default values
|
||||
if motor_index not in self.packet_handler.data:
|
||||
self.packet_handler.data[motor_index] = get_default_motor_values(motor_index)
|
||||
|
||||
def txRxPacket(self): # noqa: N802
|
||||
return COMM_SUCCESS
|
||||
|
||||
def getData(self, index, address, bytes): # noqa: N802
|
||||
return self.packet_handler.data[index][address]
|
||||
|
||||
|
||||
class GroupSyncWrite:
|
||||
def __init__(self, port_handler, packet_handler, address, bytes):
|
||||
self.packet_handler = packet_handler
|
||||
self.address = address
|
||||
|
||||
def addParam(self, index, data): # noqa: N802
|
||||
if index not in self.packet_handler.data:
|
||||
self.packet_handler.data[index] = get_default_motor_values(index)
|
||||
self.changeParam(index, data)
|
||||
|
||||
def txPacket(self): # noqa: N802
|
||||
return COMM_SUCCESS
|
||||
|
||||
def changeParam(self, index, data): # noqa: N802
|
||||
self.packet_handler.data[index][self.address] = data
|
||||
|
||||
|
||||
class SerialMock:
|
||||
def reset_output_buffer(self):
|
||||
pass
|
||||
|
||||
def reset_input_buffer(self):
|
||||
pass
|
||||
@@ -48,7 +48,7 @@ def find_cameras(raise_when_empty=True, mock=False) -> list[dict]:
|
||||
connected to the computer.
|
||||
"""
|
||||
if mock:
|
||||
import lerobot.common.mocks.cameras.mock_pyrealsense2 as rs
|
||||
import tests.cameras.mock_pyrealsense2 as rs
|
||||
else:
|
||||
import pyrealsense2 as rs
|
||||
|
||||
@@ -100,7 +100,7 @@ def save_images_from_cameras(
|
||||
serial_numbers = [cam["serial_number"] for cam in camera_infos]
|
||||
|
||||
if mock:
|
||||
import lerobot.common.mocks.cameras.mock_cv2 as cv2
|
||||
import tests.cameras.mock_cv2 as cv2
|
||||
else:
|
||||
import cv2
|
||||
|
||||
@@ -253,7 +253,7 @@ class IntelRealSenseCamera:
|
||||
self.logs = {}
|
||||
|
||||
if self.mock:
|
||||
import lerobot.common.mocks.cameras.mock_cv2 as cv2
|
||||
import tests.cameras.mock_cv2 as cv2
|
||||
else:
|
||||
import cv2
|
||||
|
||||
@@ -287,7 +287,7 @@ class IntelRealSenseCamera:
|
||||
)
|
||||
|
||||
if self.mock:
|
||||
import lerobot.common.mocks.cameras.mock_pyrealsense2 as rs
|
||||
import tests.cameras.mock_pyrealsense2 as rs
|
||||
else:
|
||||
import pyrealsense2 as rs
|
||||
|
||||
@@ -375,7 +375,7 @@ class IntelRealSenseCamera:
|
||||
)
|
||||
|
||||
if self.mock:
|
||||
import lerobot.common.mocks.cameras.mock_cv2 as cv2
|
||||
import tests.cameras.mock_cv2 as cv2
|
||||
else:
|
||||
import cv2
|
||||
|
||||
@@ -512,13 +512,13 @@ if __name__ == "__main__":
|
||||
)
|
||||
parser.add_argument(
|
||||
"--width",
|
||||
type=str,
|
||||
type=int,
|
||||
default=640,
|
||||
help="Set the width for all cameras. If not provided, use the default width of each camera.",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--height",
|
||||
type=str,
|
||||
type=int,
|
||||
default=480,
|
||||
help="Set the height for all cameras. If not provided, use the default height of each camera.",
|
||||
)
|
||||
|
||||
@@ -80,7 +80,7 @@ def _find_cameras(
|
||||
possible_camera_ids: list[int | str], raise_when_empty=False, mock=False
|
||||
) -> list[int | str]:
|
||||
if mock:
|
||||
import lerobot.common.mocks.cameras.mock_cv2 as cv2
|
||||
import tests.cameras.mock_cv2 as cv2
|
||||
else:
|
||||
import cv2
|
||||
|
||||
@@ -269,7 +269,7 @@ class OpenCVCamera:
|
||||
self.logs = {}
|
||||
|
||||
if self.mock:
|
||||
import lerobot.common.mocks.cameras.mock_cv2 as cv2
|
||||
import tests.cameras.mock_cv2 as cv2
|
||||
else:
|
||||
import cv2
|
||||
|
||||
@@ -286,7 +286,7 @@ class OpenCVCamera:
|
||||
raise RobotDeviceAlreadyConnectedError(f"OpenCVCamera({self.camera_index}) is already connected.")
|
||||
|
||||
if self.mock:
|
||||
import lerobot.common.mocks.cameras.mock_cv2 as cv2
|
||||
import tests.cameras.mock_cv2 as cv2
|
||||
else:
|
||||
import cv2
|
||||
|
||||
@@ -398,7 +398,7 @@ class OpenCVCamera:
|
||||
# so we convert the image color from BGR to RGB.
|
||||
if requested_color_mode == "rgb":
|
||||
if self.mock:
|
||||
import lerobot.common.mocks.cameras.mock_cv2 as cv2
|
||||
import tests.cameras.mock_cv2 as cv2
|
||||
else:
|
||||
import cv2
|
||||
|
||||
@@ -492,13 +492,13 @@ if __name__ == "__main__":
|
||||
)
|
||||
parser.add_argument(
|
||||
"--width",
|
||||
type=str,
|
||||
type=int,
|
||||
default=None,
|
||||
help="Set the width for all cameras. If not provided, use the default width of each camera.",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--height",
|
||||
type=str,
|
||||
type=int,
|
||||
default=None,
|
||||
help="Set the height for all cameras. If not provided, use the default height of each camera.",
|
||||
)
|
||||
|
||||
@@ -332,7 +332,7 @@ class DynamixelMotorsBus:
|
||||
)
|
||||
|
||||
if self.mock:
|
||||
import lerobot.common.mocks.motors.mock_dynamixel_sdk as dxl
|
||||
import tests.motors.mock_dynamixel_sdk as dxl
|
||||
else:
|
||||
import dynamixel_sdk as dxl
|
||||
|
||||
@@ -356,7 +356,7 @@ class DynamixelMotorsBus:
|
||||
|
||||
def reconnect(self):
|
||||
if self.mock:
|
||||
import lerobot.common.mocks.motors.mock_dynamixel_sdk as dxl
|
||||
import tests.motors.mock_dynamixel_sdk as dxl
|
||||
else:
|
||||
import dynamixel_sdk as dxl
|
||||
|
||||
@@ -646,7 +646,7 @@ class DynamixelMotorsBus:
|
||||
|
||||
def read_with_motor_ids(self, motor_models, motor_ids, data_name, num_retry=NUM_READ_RETRY):
|
||||
if self.mock:
|
||||
import lerobot.common.mocks.motors.mock_dynamixel_sdk as dxl
|
||||
import tests.motors.mock_dynamixel_sdk as dxl
|
||||
else:
|
||||
import dynamixel_sdk as dxl
|
||||
|
||||
@@ -691,7 +691,7 @@ class DynamixelMotorsBus:
|
||||
start_time = time.perf_counter()
|
||||
|
||||
if self.mock:
|
||||
import lerobot.common.mocks.motors.mock_dynamixel_sdk as dxl
|
||||
import tests.motors.mock_dynamixel_sdk as dxl
|
||||
else:
|
||||
import dynamixel_sdk as dxl
|
||||
|
||||
@@ -757,7 +757,7 @@ class DynamixelMotorsBus:
|
||||
|
||||
def write_with_motor_ids(self, motor_models, motor_ids, data_name, values, num_retry=NUM_WRITE_RETRY):
|
||||
if self.mock:
|
||||
import lerobot.common.mocks.motors.mock_dynamixel_sdk as dxl
|
||||
import tests.motors.mock_dynamixel_sdk as dxl
|
||||
else:
|
||||
import dynamixel_sdk as dxl
|
||||
|
||||
@@ -793,7 +793,7 @@ class DynamixelMotorsBus:
|
||||
start_time = time.perf_counter()
|
||||
|
||||
if self.mock:
|
||||
import lerobot.common.mocks.motors.mock_dynamixel_sdk as dxl
|
||||
import tests.motors.mock_dynamixel_sdk as dxl
|
||||
else:
|
||||
import dynamixel_sdk as dxl
|
||||
|
||||
|
||||
@@ -313,7 +313,7 @@ class FeetechMotorsBus:
|
||||
)
|
||||
|
||||
if self.mock:
|
||||
import lerobot.common.mocks.motors.mock_scservo_sdk as scs
|
||||
import tests.motors.mock_scservo_sdk as scs
|
||||
else:
|
||||
import scservo_sdk as scs
|
||||
|
||||
@@ -337,7 +337,7 @@ class FeetechMotorsBus:
|
||||
|
||||
def reconnect(self):
|
||||
if self.mock:
|
||||
import lerobot.common.mocks.motors.mock_scservo_sdk as scs
|
||||
import tests.motors.mock_scservo_sdk as scs
|
||||
else:
|
||||
import scservo_sdk as scs
|
||||
|
||||
@@ -664,7 +664,7 @@ class FeetechMotorsBus:
|
||||
|
||||
def read_with_motor_ids(self, motor_models, motor_ids, data_name, num_retry=NUM_READ_RETRY):
|
||||
if self.mock:
|
||||
import lerobot.common.mocks.motors.mock_scservo_sdk as scs
|
||||
import tests.motors.mock_scservo_sdk as scs
|
||||
else:
|
||||
import scservo_sdk as scs
|
||||
|
||||
@@ -702,7 +702,7 @@ class FeetechMotorsBus:
|
||||
|
||||
def read(self, data_name, motor_names: str | list[str] | None = None):
|
||||
if self.mock:
|
||||
import lerobot.common.mocks.motors.mock_scservo_sdk as scs
|
||||
import tests.motors.mock_scservo_sdk as scs
|
||||
else:
|
||||
import scservo_sdk as scs
|
||||
|
||||
@@ -782,7 +782,7 @@ class FeetechMotorsBus:
|
||||
|
||||
def write_with_motor_ids(self, motor_models, motor_ids, data_name, values, num_retry=NUM_WRITE_RETRY):
|
||||
if self.mock:
|
||||
import lerobot.common.mocks.motors.mock_scservo_sdk as scs
|
||||
import tests.motors.mock_scservo_sdk as scs
|
||||
else:
|
||||
import scservo_sdk as scs
|
||||
|
||||
@@ -818,7 +818,7 @@ class FeetechMotorsBus:
|
||||
start_time = time.perf_counter()
|
||||
|
||||
if self.mock:
|
||||
import lerobot.common.mocks.motors.mock_scservo_sdk as scs
|
||||
import tests.motors.mock_scservo_sdk as scs
|
||||
else:
|
||||
import scservo_sdk as scs
|
||||
|
||||
|
||||
@@ -443,7 +443,7 @@ class So100RobotConfig(ManipulatorRobotConfig):
|
||||
leader_arms: dict[str, MotorsBusConfig] = field(
|
||||
default_factory=lambda: {
|
||||
"main": FeetechMotorsBusConfig(
|
||||
port="/dev/tty.usbmodem58760429101",
|
||||
port="/dev/tty.usbmodem58760431091",
|
||||
motors={
|
||||
# name: (index, model)
|
||||
"shoulder_pan": [1, "sts3215"],
|
||||
@@ -460,7 +460,7 @@ class So100RobotConfig(ManipulatorRobotConfig):
|
||||
follower_arms: dict[str, MotorsBusConfig] = field(
|
||||
default_factory=lambda: {
|
||||
"main": FeetechMotorsBusConfig(
|
||||
port="/dev/tty.usbmodem58760435821",
|
||||
port="/dev/tty.usbmodem585A0076891",
|
||||
motors={
|
||||
# name: (index, model)
|
||||
"shoulder_pan": [1, "sts3215"],
|
||||
|
||||
@@ -1,53 +0,0 @@
|
||||
// fmt: off
|
||||
// flake8: noqa
|
||||
// !/usr/bin/env python
|
||||
|
||||
// Copyright 2024 The HuggingFace Inc. team.
|
||||
// All rights reserved.
|
||||
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
syntax = "proto3";
|
||||
|
||||
package async_inference;
|
||||
|
||||
// AsyncInference: from Robot perspective
|
||||
// Robot send observations to & executes action received from a remote Policy server
|
||||
service AsyncInference {
|
||||
// Robot -> Policy to share observations with a remote inference server
|
||||
// Policy -> Robot to share actions predicted for given observations
|
||||
rpc SendObservations(stream Observation) returns (Empty);
|
||||
rpc StreamActions(Empty) returns (stream Action);
|
||||
rpc Ready(Empty) returns (Empty);
|
||||
}
|
||||
|
||||
enum TransferState {
|
||||
TRANSFER_UNKNOWN = 0;
|
||||
TRANSFER_BEGIN = 1;
|
||||
TRANSFER_MIDDLE = 2;
|
||||
TRANSFER_END = 3;
|
||||
}
|
||||
|
||||
// Messages
|
||||
message Observation {
|
||||
// sent by Robot, to remote Policy
|
||||
TransferState transfer_state = 1;
|
||||
bytes data = 2;
|
||||
}
|
||||
|
||||
message Action {
|
||||
// sent by remote Policy, to Robot
|
||||
TransferState transfer_state = 1;
|
||||
bytes data = 2;
|
||||
}
|
||||
|
||||
message Empty {}
|
||||
@@ -1,46 +0,0 @@
|
||||
# fmt: off
|
||||
# flake8: noqa
|
||||
# -*- coding: utf-8 -*-
|
||||
# Generated by the protocol buffer compiler. DO NOT EDIT!
|
||||
# NO CHECKED-IN PROTOBUF GENCODE
|
||||
# source: async_inference.proto
|
||||
# Protobuf Python Version: 5.29.0
|
||||
"""Generated protocol buffer code."""
|
||||
from google.protobuf import descriptor as _descriptor
|
||||
from google.protobuf import descriptor_pool as _descriptor_pool
|
||||
from google.protobuf import runtime_version as _runtime_version
|
||||
from google.protobuf import symbol_database as _symbol_database
|
||||
from google.protobuf.internal import builder as _builder
|
||||
_runtime_version.ValidateProtobufRuntimeVersion(
|
||||
_runtime_version.Domain.PUBLIC,
|
||||
5,
|
||||
29,
|
||||
0,
|
||||
'',
|
||||
'async_inference.proto'
|
||||
)
|
||||
# @@protoc_insertion_point(imports)
|
||||
|
||||
_sym_db = _symbol_database.Default()
|
||||
|
||||
|
||||
|
||||
|
||||
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x15\x61sync_inference.proto\x12\x0f\x61sync_inference\"S\n\x0bObservation\x12\x36\n\x0etransfer_state\x18\x01 \x01(\x0e\x32\x1e.async_inference.TransferState\x12\x0c\n\x04\x64\x61ta\x18\x02 \x01(\x0c\"N\n\x06\x41\x63tion\x12\x36\n\x0etransfer_state\x18\x01 \x01(\x0e\x32\x1e.async_inference.TransferState\x12\x0c\n\x04\x64\x61ta\x18\x02 \x01(\x0c\"\x07\n\x05\x45mpty*`\n\rTransferState\x12\x14\n\x10TRANSFER_UNKNOWN\x10\x00\x12\x12\n\x0eTRANSFER_BEGIN\x10\x01\x12\x13\n\x0fTRANSFER_MIDDLE\x10\x02\x12\x10\n\x0cTRANSFER_END\x10\x03\x32\xd9\x01\n\x0e\x41syncInference\x12J\n\x10SendObservations\x12\x1c.async_inference.Observation\x1a\x16.async_inference.Empty(\x01\x12\x42\n\rStreamActions\x12\x16.async_inference.Empty\x1a\x17.async_inference.Action0\x01\x12\x37\n\x05Ready\x12\x16.async_inference.Empty\x1a\x16.async_inference.Emptyb\x06proto3')
|
||||
|
||||
_globals = globals()
|
||||
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
|
||||
_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'async_inference_pb2', _globals)
|
||||
if not _descriptor._USE_C_DESCRIPTORS:
|
||||
DESCRIPTOR._loaded_options = None
|
||||
_globals['_TRANSFERSTATE']._serialized_start=216
|
||||
_globals['_TRANSFERSTATE']._serialized_end=312
|
||||
_globals['_OBSERVATION']._serialized_start=42
|
||||
_globals['_OBSERVATION']._serialized_end=125
|
||||
_globals['_ACTION']._serialized_start=127
|
||||
_globals['_ACTION']._serialized_end=205
|
||||
_globals['_EMPTY']._serialized_start=207
|
||||
_globals['_EMPTY']._serialized_end=214
|
||||
_globals['_ASYNCINFERENCE']._serialized_start=315
|
||||
_globals['_ASYNCINFERENCE']._serialized_end=532
|
||||
# @@protoc_insertion_point(module_scope)
|
||||
@@ -1,193 +0,0 @@
|
||||
# fmt: off
|
||||
# flake8: noqa
|
||||
# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
|
||||
"""Client and server classes corresponding to protobuf-defined services."""
|
||||
import grpc
|
||||
import warnings
|
||||
|
||||
import async_inference_pb2 as async__inference__pb2
|
||||
|
||||
GRPC_GENERATED_VERSION = '1.71.0'
|
||||
GRPC_VERSION = grpc.__version__
|
||||
_version_not_supported = False
|
||||
|
||||
try:
|
||||
from grpc._utilities import first_version_is_lower
|
||||
_version_not_supported = first_version_is_lower(GRPC_VERSION, GRPC_GENERATED_VERSION)
|
||||
except ImportError:
|
||||
_version_not_supported = True
|
||||
|
||||
if _version_not_supported:
|
||||
raise RuntimeError(
|
||||
f'The grpc package installed is at version {GRPC_VERSION},'
|
||||
+ f' but the generated code in async_inference_pb2_grpc.py depends on'
|
||||
+ f' grpcio>={GRPC_GENERATED_VERSION}.'
|
||||
+ f' Please upgrade your grpc module to grpcio>={GRPC_GENERATED_VERSION}'
|
||||
+ f' or downgrade your generated code using grpcio-tools<={GRPC_VERSION}.'
|
||||
)
|
||||
|
||||
|
||||
class AsyncInferenceStub:
|
||||
"""AsyncInference: from Robot perspective
|
||||
Robot send observations to & executes action received from a remote Policy server
|
||||
"""
|
||||
|
||||
def __init__(self, channel):
|
||||
"""Constructor.
|
||||
|
||||
Args:
|
||||
channel: A grpc.Channel.
|
||||
"""
|
||||
self.SendObservations = channel.stream_unary(
|
||||
'/async_inference.AsyncInference/SendObservations',
|
||||
request_serializer=async__inference__pb2.Observation.SerializeToString,
|
||||
response_deserializer=async__inference__pb2.Empty.FromString,
|
||||
_registered_method=True)
|
||||
self.StreamActions = channel.unary_stream(
|
||||
'/async_inference.AsyncInference/StreamActions',
|
||||
request_serializer=async__inference__pb2.Empty.SerializeToString,
|
||||
response_deserializer=async__inference__pb2.Action.FromString,
|
||||
_registered_method=True)
|
||||
self.Ready = channel.unary_unary(
|
||||
'/async_inference.AsyncInference/Ready',
|
||||
request_serializer=async__inference__pb2.Empty.SerializeToString,
|
||||
response_deserializer=async__inference__pb2.Empty.FromString,
|
||||
_registered_method=True)
|
||||
|
||||
|
||||
class AsyncInferenceServicer:
|
||||
"""AsyncInference: from Robot perspective
|
||||
Robot send observations to & executes action received from a remote Policy server
|
||||
"""
|
||||
|
||||
def SendObservations(self, request_iterator, context):
|
||||
"""Robot -> Policy to share observations with a remote inference server
|
||||
Policy -> Robot to share actions predicted for given observations
|
||||
"""
|
||||
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
|
||||
context.set_details('Method not implemented!')
|
||||
raise NotImplementedError('Method not implemented!')
|
||||
|
||||
def StreamActions(self, request, context):
|
||||
"""Missing associated documentation comment in .proto file."""
|
||||
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
|
||||
context.set_details('Method not implemented!')
|
||||
raise NotImplementedError('Method not implemented!')
|
||||
|
||||
def Ready(self, request, context):
|
||||
"""Missing associated documentation comment in .proto file."""
|
||||
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
|
||||
context.set_details('Method not implemented!')
|
||||
raise NotImplementedError('Method not implemented!')
|
||||
|
||||
|
||||
def add_AsyncInferenceServicer_to_server(servicer, server):
|
||||
rpc_method_handlers = {
|
||||
'SendObservations': grpc.stream_unary_rpc_method_handler(
|
||||
servicer.SendObservations,
|
||||
request_deserializer=async__inference__pb2.Observation.FromString,
|
||||
response_serializer=async__inference__pb2.Empty.SerializeToString,
|
||||
),
|
||||
'StreamActions': grpc.unary_stream_rpc_method_handler(
|
||||
servicer.StreamActions,
|
||||
request_deserializer=async__inference__pb2.Empty.FromString,
|
||||
response_serializer=async__inference__pb2.Action.SerializeToString,
|
||||
),
|
||||
'Ready': grpc.unary_unary_rpc_method_handler(
|
||||
servicer.Ready,
|
||||
request_deserializer=async__inference__pb2.Empty.FromString,
|
||||
response_serializer=async__inference__pb2.Empty.SerializeToString,
|
||||
),
|
||||
}
|
||||
generic_handler = grpc.method_handlers_generic_handler(
|
||||
'async_inference.AsyncInference', rpc_method_handlers)
|
||||
server.add_generic_rpc_handlers((generic_handler,))
|
||||
server.add_registered_method_handlers('async_inference.AsyncInference', rpc_method_handlers)
|
||||
|
||||
|
||||
# This class is part of an EXPERIMENTAL API.
|
||||
class AsyncInference:
|
||||
"""AsyncInference: from Robot perspective
|
||||
Robot send observations to & executes action received from a remote Policy server
|
||||
"""
|
||||
|
||||
@staticmethod
|
||||
def SendObservations(request_iterator,
|
||||
target,
|
||||
options=(),
|
||||
channel_credentials=None,
|
||||
call_credentials=None,
|
||||
insecure=False,
|
||||
compression=None,
|
||||
wait_for_ready=None,
|
||||
timeout=None,
|
||||
metadata=None):
|
||||
return grpc.experimental.stream_unary(
|
||||
request_iterator,
|
||||
target,
|
||||
'/async_inference.AsyncInference/SendObservations',
|
||||
async__inference__pb2.Observation.SerializeToString,
|
||||
async__inference__pb2.Empty.FromString,
|
||||
options,
|
||||
channel_credentials,
|
||||
insecure,
|
||||
call_credentials,
|
||||
compression,
|
||||
wait_for_ready,
|
||||
timeout,
|
||||
metadata,
|
||||
_registered_method=True)
|
||||
|
||||
@staticmethod
|
||||
def StreamActions(request,
|
||||
target,
|
||||
options=(),
|
||||
channel_credentials=None,
|
||||
call_credentials=None,
|
||||
insecure=False,
|
||||
compression=None,
|
||||
wait_for_ready=None,
|
||||
timeout=None,
|
||||
metadata=None):
|
||||
return grpc.experimental.unary_stream(
|
||||
request,
|
||||
target,
|
||||
'/async_inference.AsyncInference/StreamActions',
|
||||
async__inference__pb2.Empty.SerializeToString,
|
||||
async__inference__pb2.Action.FromString,
|
||||
options,
|
||||
channel_credentials,
|
||||
insecure,
|
||||
call_credentials,
|
||||
compression,
|
||||
wait_for_ready,
|
||||
timeout,
|
||||
metadata,
|
||||
_registered_method=True)
|
||||
|
||||
@staticmethod
|
||||
def Ready(request,
|
||||
target,
|
||||
options=(),
|
||||
channel_credentials=None,
|
||||
call_credentials=None,
|
||||
insecure=False,
|
||||
compression=None,
|
||||
wait_for_ready=None,
|
||||
timeout=None,
|
||||
metadata=None):
|
||||
return grpc.experimental.unary_unary(
|
||||
request,
|
||||
target,
|
||||
'/async_inference.AsyncInference/Ready',
|
||||
async__inference__pb2.Empty.SerializeToString,
|
||||
async__inference__pb2.Empty.FromString,
|
||||
options,
|
||||
channel_credentials,
|
||||
insecure,
|
||||
call_credentials,
|
||||
compression,
|
||||
wait_for_ready,
|
||||
timeout,
|
||||
metadata,
|
||||
_registered_method=True)
|
||||
@@ -1,199 +0,0 @@
|
||||
import itertools
|
||||
import pickle # nosec
|
||||
import time
|
||||
from concurrent import futures
|
||||
from queue import Queue
|
||||
from typing import Generator, List, Optional
|
||||
|
||||
import async_inference_pb2 # type: ignore
|
||||
import async_inference_pb2_grpc # type: ignore
|
||||
import grpc
|
||||
import torch
|
||||
from datasets import load_dataset
|
||||
|
||||
from lerobot.common.policies.act.modeling_act import ACTPolicy
|
||||
from lerobot.scripts.server.robot_client import TimedAction, TimedObservation, environment_dt
|
||||
|
||||
inference_latency = 1 / 3
|
||||
idle_wait = 0.1
|
||||
|
||||
|
||||
class PolicyServer(async_inference_pb2_grpc.AsyncInferenceServicer):
|
||||
def __init__(self):
|
||||
# TODO: Add device specification for policy inference at init
|
||||
self.device = "mps"
|
||||
start = time.time()
|
||||
self.policy = ACTPolicy.from_pretrained("fracapuano/act_so100_test")
|
||||
self.policy.to(self.device)
|
||||
end = time.time()
|
||||
print(f"Time taken to put policy on {self.device}: {end - start} seconds")
|
||||
|
||||
# Initialize dataset action generator
|
||||
self.action_generator = itertools.cycle(self._stream_action_chunks_from_dataset())
|
||||
|
||||
self._setup_server()
|
||||
|
||||
self.actions_per_chunk = 20
|
||||
self.actions_overlap = 10
|
||||
|
||||
def _setup_server(self) -> None:
|
||||
"""Flushes server state when new client connects."""
|
||||
# only running inference on the latest observation received by the server
|
||||
self.observation_queue = Queue(maxsize=1)
|
||||
|
||||
def Ready(self, request, context): # noqa: N802
|
||||
self._setup_server()
|
||||
print("Client connected and ready")
|
||||
|
||||
return async_inference_pb2.Empty()
|
||||
|
||||
def SendObservations(self, request_iterator, context): # noqa: N802
|
||||
"""Receive observations from the robot client"""
|
||||
# client_id = context.peer()
|
||||
# print(f"Receiving observations from {client_id}")
|
||||
|
||||
for observation in request_iterator:
|
||||
timed_observation = pickle.loads(observation.data) # nosec
|
||||
|
||||
# If queue is full, get the old observation to make room
|
||||
if self.observation_queue.full():
|
||||
# pops from queue
|
||||
_ = self.observation_queue.get_nowait()
|
||||
|
||||
# Now put the new observation (never blocks as queue is non-full here)
|
||||
self.observation_queue.put(timed_observation)
|
||||
print("Received observation no: ", timed_observation.get_timestep())
|
||||
|
||||
return async_inference_pb2.Empty()
|
||||
|
||||
def StreamActions(self, request, context): # noqa: N802
|
||||
"""Stream actions to the robot client"""
|
||||
# client_id = context.peer()
|
||||
# print(f"Client {client_id} connected for action streaming")
|
||||
|
||||
# Generate action based on the most recent observation and its timestep
|
||||
obs = self.observation_queue.get()
|
||||
print("Running inference for timestep: ", obs.get_timestep())
|
||||
|
||||
if obs:
|
||||
yield self._predict_action_chunk(obs)
|
||||
|
||||
else:
|
||||
print("No observation in queue yet!")
|
||||
time.sleep(idle_wait)
|
||||
|
||||
return async_inference_pb2.Empty()
|
||||
|
||||
def _time_action_chunk(self, t_0: float, action_chunk: list[torch.Tensor], i_0: int) -> list[TimedAction]:
|
||||
"""Turn a chunk of actions into a list of TimedAction instances,
|
||||
with the first action corresponding to t_0 and the rest corresponding to
|
||||
t_0 + i*environment_dt for i in range(len(action_chunk))
|
||||
"""
|
||||
return [
|
||||
TimedAction(t_0 + i * environment_dt, action, i_0 + i) for i, action in enumerate(action_chunk)
|
||||
]
|
||||
|
||||
@torch.no_grad()
|
||||
def _predict_action_chunk(self, observation_t: TimedObservation) -> list[TimedAction]:
|
||||
"""Predict an action based on the observation"""
|
||||
self.policy.eval()
|
||||
|
||||
observation = {}
|
||||
for k, v in observation_t.get_observation().items():
|
||||
if "image" in k:
|
||||
observation[k] = v.permute(2, 0, 1).unsqueeze(0).to(self.device)
|
||||
else:
|
||||
observation[k] = v.unsqueeze(0).to(self.device)
|
||||
|
||||
# Remove batch dimension
|
||||
action_tensor = self.policy.select_action(observation).squeeze(0)
|
||||
|
||||
if action_tensor.dim() == 1:
|
||||
# No chunk dimension, so repeat action to create a (dummy) chunk of actions
|
||||
action_tensor = action_tensor.cpu().repeat(self.actions_per_chunk, 1)
|
||||
|
||||
action_chunk = self._time_action_chunk(
|
||||
observation_t.get_timestamp(), list(action_tensor), observation_t.get_timestep()
|
||||
)
|
||||
|
||||
action_bytes = pickle.dumps(action_chunk) # nosec
|
||||
# Create and return the Action message
|
||||
action = async_inference_pb2.Action(transfer_state=observation_t.transfer_state, data=action_bytes)
|
||||
|
||||
time.sleep(inference_latency) # slow action generation, emulates inference time (ACT is very fast)
|
||||
|
||||
return action
|
||||
|
||||
def _stream_action_chunks_from_dataset(self) -> Generator[List[torch.Tensor], None, None]:
|
||||
"""Stream chunks of actions from a prerecorded dataset.
|
||||
|
||||
Returns:
|
||||
Generator that yields chunks of actions from the dataset
|
||||
"""
|
||||
dataset = load_dataset("fracapuano/so100_test", split="train").with_format("torch")
|
||||
|
||||
# 1. Select the action column only, where you will find tensors with 6 elements
|
||||
actions = dataset["action"]
|
||||
action_indices = torch.arange(len(actions))
|
||||
|
||||
# 2. Chunk the iterable of tensors into chunks with 10 elements each
|
||||
# sending only first element for debugging
|
||||
indices_chunks = action_indices.unfold(
|
||||
0, self.actions_per_chunk, self.actions_per_chunk - self.actions_overlap
|
||||
)
|
||||
|
||||
for idx_chunk in indices_chunks:
|
||||
yield actions[idx_chunk[0] : idx_chunk[-1] + 1, :]
|
||||
|
||||
def _read_action_chunk(self, observation: Optional[TimedObservation] = None):
|
||||
"""Dummy function for predicting action chunk given observation.
|
||||
|
||||
Instead of computing actions on-the-fly, this method streams
|
||||
actions from a prerecorded dataset.
|
||||
"""
|
||||
import warnings
|
||||
|
||||
warnings.warn(
|
||||
"This method is deprecated and will be removed in the future.", DeprecationWarning, stacklevel=2
|
||||
)
|
||||
|
||||
if not observation:
|
||||
observation = TimedObservation(timestamp=time.time(), observation={}, timestep=0)
|
||||
transfer_state = 0
|
||||
else:
|
||||
transfer_state = observation.transfer_state
|
||||
|
||||
# Get chunk of actions from the generator
|
||||
actions_chunk = next(self.action_generator)
|
||||
|
||||
# Return a list of TimedActions, with timestamps starting from the observation timestamp
|
||||
action_data = self._time_action_chunk(
|
||||
observation.get_timestamp(), actions_chunk, observation.get_timestep()
|
||||
)
|
||||
action_bytes = pickle.dumps(action_data) # nosec
|
||||
|
||||
# Create and return the Action message
|
||||
action = async_inference_pb2.Action(transfer_state=transfer_state, data=action_bytes)
|
||||
|
||||
time.sleep(inference_latency) # slow action generation, emulates inference time
|
||||
|
||||
return action
|
||||
|
||||
|
||||
def serve():
|
||||
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
|
||||
async_inference_pb2_grpc.add_AsyncInferenceServicer_to_server(PolicyServer(), server)
|
||||
server.add_insecure_port("[::]:50051")
|
||||
server.start()
|
||||
print("PolicyServer started on port 50051")
|
||||
|
||||
try:
|
||||
while True:
|
||||
time.sleep(86400) # Sleep for a day, or until interrupted
|
||||
except KeyboardInterrupt:
|
||||
server.stop(0)
|
||||
print("Server stopped")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
serve()
|
||||
@@ -1,357 +0,0 @@
|
||||
import pickle # nosec
|
||||
import threading
|
||||
import time
|
||||
from queue import Empty, Queue
|
||||
from typing import Any, Optional
|
||||
|
||||
import async_inference_pb2 # type: ignore
|
||||
import async_inference_pb2_grpc # type: ignore
|
||||
import grpc
|
||||
import torch
|
||||
|
||||
from lerobot.common.robot_devices.robots.utils import make_robot
|
||||
|
||||
environment_dt = 1 / 30
|
||||
idle_wait = 0.1
|
||||
|
||||
|
||||
class TimedData:
|
||||
def __init__(self, timestamp: float, data: Any, timestep: int):
|
||||
"""Initialize a TimedData object.
|
||||
|
||||
Args:
|
||||
timestamp: Unix timestamp relative to data's creation.
|
||||
data: The actual data to wrap a timestamp around.
|
||||
"""
|
||||
self.timestamp = timestamp
|
||||
self.data = data
|
||||
self.timestep = timestep
|
||||
|
||||
def get_data(self):
|
||||
return self.data
|
||||
|
||||
def get_timestamp(self):
|
||||
return self.timestamp
|
||||
|
||||
def get_timestep(self):
|
||||
return self.timestep
|
||||
|
||||
|
||||
class TimedAction(TimedData):
|
||||
def __init__(self, timestamp: float, action: torch.Tensor, timestep: int):
|
||||
super().__init__(timestamp=timestamp, data=action, timestep=timestep)
|
||||
|
||||
def get_action(self):
|
||||
return self.get_data()
|
||||
|
||||
|
||||
class TimedObservation(TimedData):
|
||||
def __init__(
|
||||
self, timestamp: float, observation: dict[str, torch.Tensor], timestep: int, transfer_state: int = 0
|
||||
):
|
||||
super().__init__(timestamp=timestamp, data=observation, timestep=timestep)
|
||||
self.transfer_state = transfer_state
|
||||
|
||||
def get_observation(self):
|
||||
return self.get_data()
|
||||
|
||||
|
||||
class RobotClient:
|
||||
def __init__(
|
||||
self,
|
||||
# cfg: RobotConfig,
|
||||
server_address="localhost:50051",
|
||||
use_robot=True,
|
||||
):
|
||||
self.channel = grpc.insecure_channel(server_address)
|
||||
self.stub = async_inference_pb2_grpc.AsyncInferenceStub(self.channel)
|
||||
|
||||
self.running = False
|
||||
self.first_observation_sent = False
|
||||
self.latest_action = 0
|
||||
self.action_chunk_size = 20
|
||||
|
||||
self.action_queue = Queue()
|
||||
self.start_barrier = threading.Barrier(3)
|
||||
|
||||
# Create a lock for robot access
|
||||
self.robot_lock = threading.Lock()
|
||||
|
||||
self.use_robot = use_robot
|
||||
if self.use_robot:
|
||||
self.robot = make_robot("so100")
|
||||
self.robot.connect()
|
||||
|
||||
time.sleep(idle_wait) # sleep waiting for cameras to activate
|
||||
print("Robot connected")
|
||||
|
||||
self.robot_reading = True
|
||||
|
||||
def timestamps(self):
|
||||
"""Get the timestamps of the actions in the queue"""
|
||||
return sorted([action.get_timestep() for action in self.action_queue.queue])
|
||||
|
||||
def start(self):
|
||||
"""Start the robot client and connect to the policy server"""
|
||||
try:
|
||||
# client-server handshake
|
||||
self.stub.Ready(async_inference_pb2.Empty())
|
||||
print("Connected to policy server")
|
||||
|
||||
self.running = True
|
||||
return True
|
||||
|
||||
except grpc.RpcError as e:
|
||||
print(f"Failed to connect to policy server: {e}")
|
||||
return False
|
||||
|
||||
def stop(self):
|
||||
"""Stop the robot client"""
|
||||
self.running = False
|
||||
if self.use_robot and hasattr(self, "robot"):
|
||||
self.robot.disconnect()
|
||||
self.channel.close()
|
||||
|
||||
def send_observation(
|
||||
self,
|
||||
obs: TimedObservation,
|
||||
transfer_state: async_inference_pb2.TransferState = async_inference_pb2.TRANSFER_MIDDLE,
|
||||
) -> bool:
|
||||
"""Send observation to the policy server.
|
||||
Returns True if the observation was sent successfully, False otherwise."""
|
||||
if not self.running:
|
||||
print("Client not running")
|
||||
return False
|
||||
|
||||
assert isinstance(obs, TimedObservation), "Input observation needs to be a TimedObservation!"
|
||||
|
||||
observation_bytes = pickle.dumps(obs)
|
||||
observation = async_inference_pb2.Observation(transfer_state=transfer_state, data=observation_bytes)
|
||||
|
||||
try:
|
||||
_ = self.stub.SendObservations(iter([observation]))
|
||||
if transfer_state == async_inference_pb2.TRANSFER_BEGIN:
|
||||
self.first_observation_sent = True
|
||||
return True
|
||||
|
||||
except grpc.RpcError as e:
|
||||
print(f"Error sending observation: {e}")
|
||||
return False
|
||||
|
||||
def _validate_action(self, action: TimedAction):
|
||||
"""Received actions are keps only when they have been produced for now or later, never before"""
|
||||
return not action.get_timestamp() < self.latest_action
|
||||
|
||||
def _validate_action_chunk(self, actions: list[TimedAction]):
|
||||
assert len(actions) == self.action_chunk_size, (
|
||||
f"Action batch size must match action chunk!size: {len(actions)} != {self.action_chunk_size}"
|
||||
)
|
||||
assert all(self._validate_action(action) for action in actions), "Invalid action in chunk"
|
||||
|
||||
return True
|
||||
|
||||
def _inspect_action_queue(self):
|
||||
print("Queue size: ", self.action_queue.qsize())
|
||||
print("Queue contents: ", sorted([action.get_timestep() for action in self.action_queue.queue]))
|
||||
|
||||
def _clear_queue(self):
|
||||
"""Clear the existing queue"""
|
||||
while not self.action_queue.empty():
|
||||
try:
|
||||
self.action_queue.get_nowait()
|
||||
except Empty:
|
||||
break
|
||||
|
||||
def _fill_action_queue(self, actions: list[TimedAction]):
|
||||
"""Fill the action queue with incoming valid actions"""
|
||||
for action in actions:
|
||||
if self._validate_action(action):
|
||||
self.action_queue.put(action)
|
||||
|
||||
def _update_action_queue(self, actions: list[TimedAction]):
|
||||
"""Aggregate incoming actions into the action queue.
|
||||
Raises NotImplementedError as this is not implemented yet.
|
||||
|
||||
Args:
|
||||
actions: List of TimedAction instances to queue
|
||||
"""
|
||||
# TODO: Implement this
|
||||
raise NotImplementedError("Not implemented")
|
||||
|
||||
def _clear_and_fill_action_queue(self, actions: list[TimedAction]):
|
||||
"""Clear the existing queue and fill it with new actions.
|
||||
This is a higher-level function that combines clearing and filling operations.
|
||||
|
||||
Args:
|
||||
actions: List of TimedAction instances to queue
|
||||
"""
|
||||
print("*** Current latest action: ", self.latest_action, "***")
|
||||
print("\t**** Current queue content ****: ")
|
||||
self._inspect_action_queue()
|
||||
|
||||
print("\t*** Incoming actions ****: ")
|
||||
print([a.get_timestep() for a in actions])
|
||||
|
||||
self._clear_queue()
|
||||
self._fill_action_queue(actions)
|
||||
|
||||
print("\t*** Queue after clearing and filling ****: ")
|
||||
self._inspect_action_queue()
|
||||
|
||||
def receive_actions(self):
|
||||
"""Receive actions from the policy server"""
|
||||
# Wait at barrier for synchronized start
|
||||
self.start_barrier.wait()
|
||||
print("Action receiving thread starting")
|
||||
|
||||
while self.running:
|
||||
try:
|
||||
# Use StreamActions to get a stream of actions from the server
|
||||
for actions_chunk in self.stub.StreamActions(async_inference_pb2.Empty()):
|
||||
# Deserialize bytes back into list[TimedAction]
|
||||
timed_actions = pickle.loads(actions_chunk.data) # nosec
|
||||
|
||||
# strategy for queue composition is specified in the method
|
||||
self._clear_and_fill_action_queue(timed_actions)
|
||||
|
||||
except grpc.RpcError as e:
|
||||
print(f"Error receiving actions: {e}")
|
||||
time.sleep(idle_wait) # Avoid tight loop on error
|
||||
|
||||
def _get_next_action(self) -> Optional[TimedAction]:
|
||||
"""Get the next action from the queue"""
|
||||
try:
|
||||
action = self.action_queue.get_nowait()
|
||||
return action
|
||||
|
||||
except Empty:
|
||||
return None
|
||||
|
||||
def execute_actions(self):
|
||||
"""Continuously execute actions from the queue"""
|
||||
# Wait at barrier for synchronized start
|
||||
self.start_barrier.wait()
|
||||
print("Action execution thread starting")
|
||||
|
||||
while self.running:
|
||||
# Get the next action from the queue
|
||||
time.sleep(environment_dt)
|
||||
timed_action = self._get_next_action()
|
||||
|
||||
if timed_action is not None:
|
||||
# self.latest_action = timed_action.get_timestep()
|
||||
self.latest_action = timed_action.get_timestamp()
|
||||
|
||||
# Convert action to tensor and send to robot
|
||||
if self.use_robot:
|
||||
# Acquire lock before accessing the robot
|
||||
if self.robot_lock.acquire(timeout=1.0): # Wait up to 1 second to acquire the lock
|
||||
try:
|
||||
self.robot.send_action(timed_action.get_action())
|
||||
finally:
|
||||
# Always release the lock in a finally block to ensure it's released
|
||||
self.robot_lock.release()
|
||||
else:
|
||||
print("Could not acquire robot lock for action execution, retrying next cycle")
|
||||
|
||||
else:
|
||||
# No action available, wait and retry fetching from queue
|
||||
time.sleep(idle_wait)
|
||||
|
||||
def stream_observations(self, get_observation_fn):
|
||||
"""Continuously stream observations to the server"""
|
||||
# Wait at barrier for synchronized start
|
||||
self.start_barrier.wait()
|
||||
print("Observation streaming thread starting")
|
||||
|
||||
first_observation = True
|
||||
while self.running:
|
||||
try:
|
||||
# Get serialized observation bytes from the function
|
||||
time.sleep(environment_dt)
|
||||
observation = get_observation_fn()
|
||||
|
||||
# Skip if observation is None (couldn't acquire lock)
|
||||
if observation is None:
|
||||
continue
|
||||
|
||||
# Set appropriate transfer state
|
||||
if first_observation:
|
||||
state = async_inference_pb2.TRANSFER_BEGIN
|
||||
first_observation = False
|
||||
else:
|
||||
state = async_inference_pb2.TRANSFER_MIDDLE
|
||||
|
||||
self.send_observation(observation, state)
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error in observation sender: {e}")
|
||||
time.sleep(idle_wait)
|
||||
|
||||
|
||||
def async_client():
|
||||
# Example of how to use the RobotClient
|
||||
client = RobotClient()
|
||||
|
||||
if client.start():
|
||||
# Function to generate mock observations
|
||||
def get_observation():
|
||||
# Create a counter attribute if it doesn't exist
|
||||
if not hasattr(get_observation, "counter"):
|
||||
get_observation.counter = 0
|
||||
|
||||
# Acquire lock before accessing the robot
|
||||
observation_content = None
|
||||
if client.robot_lock.acquire(timeout=1.0): # Wait up to 1 second to acquire the lock
|
||||
try:
|
||||
observation_content = client.robot.capture_observation()
|
||||
finally:
|
||||
# Always release the lock in a finally block to ensure it's released
|
||||
client.robot_lock.release()
|
||||
else:
|
||||
print("Could not acquire robot lock for observation capture, skipping this cycle")
|
||||
return None # Return None to indicate no observation was captured
|
||||
|
||||
observation = TimedObservation(
|
||||
timestamp=time.time(), observation=observation_content, timestep=get_observation.counter
|
||||
)
|
||||
|
||||
# Increment counter for next call
|
||||
get_observation.counter += 1
|
||||
|
||||
return observation
|
||||
|
||||
print("Starting all threads...")
|
||||
|
||||
# Create and start observation sender thread
|
||||
obs_thread = threading.Thread(target=client.stream_observations, args=(get_observation,))
|
||||
obs_thread.daemon = True
|
||||
|
||||
# Create and start action receiver thread
|
||||
action_receiver_thread = threading.Thread(target=client.receive_actions)
|
||||
action_receiver_thread.daemon = True
|
||||
|
||||
# Create action execution thread
|
||||
action_execution_thread = threading.Thread(target=client.execute_actions)
|
||||
action_execution_thread.daemon = True
|
||||
|
||||
# Start all threads
|
||||
obs_thread.start()
|
||||
action_receiver_thread.start()
|
||||
action_execution_thread.start()
|
||||
|
||||
try:
|
||||
# Main thread just keeps everything alive
|
||||
while client.running:
|
||||
time.sleep(idle_wait)
|
||||
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
|
||||
finally:
|
||||
client.stop()
|
||||
print("Client stopped")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
async_client()
|
||||
@@ -174,7 +174,10 @@ def run_server(
|
||||
dataset.meta.get_video_file_path(episode_id, key) for key in dataset.meta.video_keys
|
||||
]
|
||||
videos_info = [
|
||||
{"url": url_for("static", filename=video_path), "filename": video_path.parent.name}
|
||||
{
|
||||
"url": url_for("static", filename=str(video_path).replace("\\", "/")),
|
||||
"filename": video_path.parent.name,
|
||||
}
|
||||
for video_path in video_paths
|
||||
]
|
||||
tasks = dataset.meta.episodes[episode_id]["tasks"]
|
||||
@@ -381,7 +384,7 @@ def visualize_dataset_html(
|
||||
if isinstance(dataset, LeRobotDataset):
|
||||
ln_videos_dir = static_dir / "videos"
|
||||
if not ln_videos_dir.exists():
|
||||
ln_videos_dir.symlink_to((dataset.root / "videos").resolve())
|
||||
ln_videos_dir.symlink_to((dataset.root / "videos").resolve().as_posix())
|
||||
|
||||
if serve:
|
||||
run_server(dataset, episodes, host, port, static_dir, template_dir)
|
||||
|
||||
@@ -49,7 +49,7 @@ dependencies = [
|
||||
"datasets>=2.19.0",
|
||||
"deepdiff>=7.0.1",
|
||||
"diffusers>=0.27.2",
|
||||
"draccus>=0.10.0",
|
||||
"draccus==0.10.0",
|
||||
"einops>=0.8.0",
|
||||
"flask>=3.0.3",
|
||||
"gdown>=5.1.0",
|
||||
|
||||
@@ -37,7 +37,6 @@ def test_diffuser_scheduler(optimizer):
|
||||
"base_lrs": [0.001],
|
||||
"last_epoch": 1,
|
||||
"lr_lambdas": [None],
|
||||
"verbose": False,
|
||||
}
|
||||
assert scheduler.state_dict() == expected_state_dict
|
||||
|
||||
@@ -56,7 +55,6 @@ def test_vqbet_scheduler(optimizer):
|
||||
"base_lrs": [0.001],
|
||||
"last_epoch": 1,
|
||||
"lr_lambdas": [None],
|
||||
"verbose": False,
|
||||
}
|
||||
assert scheduler.state_dict() == expected_state_dict
|
||||
|
||||
@@ -77,7 +75,6 @@ def test_cosine_decay_with_warmup_scheduler(optimizer):
|
||||
"base_lrs": [0.001],
|
||||
"last_epoch": 1,
|
||||
"lr_lambdas": [None],
|
||||
"verbose": False,
|
||||
}
|
||||
assert scheduler.state_dict() == expected_state_dict
|
||||
|
||||
|
||||
Reference in New Issue
Block a user