Add async_read and async_write (it doesnt work)

This commit is contained in:
Remi Cadene
2024-07-05 18:09:29 +02:00
parent 6e77a399a2
commit 3ff789c181
6 changed files with 243 additions and 62 deletions

View File

@@ -1,4 +1,5 @@
import argparse import argparse
import math
import time import time
from dataclasses import dataclass, replace from dataclasses import dataclass, replace
from pathlib import Path from pathlib import Path
@@ -8,6 +9,7 @@ import cv2
import numpy as np import numpy as np
from lerobot.common.robot_devices.cameras.utils import save_color_image from lerobot.common.robot_devices.cameras.utils import save_color_image
from lerobot.common.utils.utils import capture_timestamp_utc
def find_camera_indices(raise_when_empty=False, max_index_search_range=60): def find_camera_indices(raise_when_empty=False, max_index_search_range=60):
@@ -119,9 +121,9 @@ class OpenCVCamera:
self.camera = None self.camera = None
self.is_connected = False self.is_connected = False
self.t = Thread(target=self.capture_image_loop, args=()) self.threads = {}
self.t.daemon = True self.results = {}
self._color_image = None self.logs = {}
def connect(self): def connect(self):
if self.is_connected: if self.is_connected:
@@ -161,7 +163,7 @@ class OpenCVCamera:
actual_width = self.camera.get(cv2.CAP_PROP_FRAME_WIDTH) actual_width = self.camera.get(cv2.CAP_PROP_FRAME_WIDTH)
actual_height = self.camera.get(cv2.CAP_PROP_FRAME_HEIGHT) actual_height = self.camera.get(cv2.CAP_PROP_FRAME_HEIGHT)
if self.fps and self.fps != actual_fps: if self.fps and not math.isclose(self.fps, actual_fps, rel_tol=1e-3):
raise OSError( raise OSError(
f"Can't set {self.fps=} for camera {self.camera_index}. Actual value is {actual_fps}." f"Can't set {self.fps=} for camera {self.camera_index}. Actual value is {actual_fps}."
) )
@@ -175,12 +177,13 @@ class OpenCVCamera:
) )
self.is_connected = True self.is_connected = True
self.t.start()
def capture_image(self, temporary_color: str | None = None) -> np.ndarray: def read(self, temporary_color: str | None = None) -> np.ndarray:
if not self.is_connected: if not self.is_connected:
self.connect() self.connect()
start_time = time.perf_counter()
ret, color_image = self.camera.read() ret, color_image = self.camera.read()
if not ret: if not ret:
raise OSError(f"Can't capture color image from camera {self.camera_index}.") raise OSError(f"Can't capture color image from camera {self.camera_index}.")
@@ -196,20 +199,41 @@ class OpenCVCamera:
if requested_color == "rgb": if requested_color == "rgb":
color_image = cv2.cvtColor(color_image, cv2.COLOR_BGR2RGB) color_image = cv2.cvtColor(color_image, cv2.COLOR_BGR2RGB)
# log the number of seconds it took to read the image
self.logs["delta_timestamp_s"] = time.perf_counter() - start_time
# log the utc time at which the image was received
self.logs["timestamp_utc"] = capture_timestamp_utc()
return color_image return color_image
def capture_image_loop(self): def read_loop(self):
while True: while True:
self._color_image = self.capture_image() self.results["color_image"] = self.read()
def read(self): def async_read(self):
while self._color_image is None: if "read" not in self.threads:
time.sleep(0.1) self.threads["read"] = Thread(target=self.read_loop, args=())
return self._color_image self.threads["read"].daemon = True
self.threads["read"].start()
num_tries = 0
while "color_image" not in self.results:
num_tries += 1
time.sleep(1/self.fps)
if num_tries > self.fps:
if self.threads["read"].ident is None and not self.threads["read"].is_alive():
raise Exception("The thread responsible for `self.async_read()` took too much time to start. There might be an issue. Verify that `self.threads[\"read\"].start()` has been called.")
return self.results["color_image"] #, self.logs["timestamp_utc"]
def disconnect(self): def disconnect(self):
if getattr(self, "camera", None): if getattr(self, "camera", None):
self.camera.release() self.camera.release()
for name in self.threads:
if self.threads[name].is_alive():
# wait for the thread to finish
self.threads[name].join()
def __del__(self): def __del__(self):
self.disconnect() self.disconnect()
@@ -217,8 +241,10 @@ class OpenCVCamera:
def save_images_config(config: OpenCVCameraConfig, out_dir: Path): def save_images_config(config: OpenCVCameraConfig, out_dir: Path):
cameras = [] cameras = []
print(f"Available camera indices: {OpenCVCamera.AVAILABLE_CAMERAS_INDICES}")
for camera_idx in OpenCVCamera.AVAILABLE_CAMERAS_INDICES: available_cam_ids = find_camera_indices()
print(f"Available camera indices: {available_cam_ids}")
for camera_idx in available_cam_ids:
camera = OpenCVCamera(camera_idx, config) camera = OpenCVCamera(camera_idx, config)
cameras.append(camera) cameras.append(camera)

View File

@@ -1,5 +1,8 @@
import enum import enum
from copy import deepcopy from copy import deepcopy
from queue import Queue
from threading import Thread
import time
import numpy as np import numpy as np
from dynamixel_sdk import ( from dynamixel_sdk import (
@@ -14,6 +17,8 @@ from dynamixel_sdk import (
PortHandler, PortHandler,
) )
from lerobot.common.utils.utils import capture_timestamp_utc
PROTOCOL_VERSION = 2.0 PROTOCOL_VERSION = 2.0
BAUD_RATE = 1_000_000 BAUD_RATE = 1_000_000
TIMEOUT_MS = 1000 TIMEOUT_MS = 1000
@@ -149,6 +154,15 @@ def get_group_sync_key(data_name, motor_names):
group_key = f"{data_name}_" + "_".join(motor_names) group_key = f"{data_name}_" + "_".join(motor_names)
return group_key return group_key
def get_thread_name(fn_name, data_name, motor_names):
group_key = get_group_sync_key(data_name, motor_names)
thread_name = f"{fn_name}_{group_key}"
return thread_name
def get_log_name(var_name, fn_name, data_name, motor_names):
thread_name = get_thread_name(fn_name, data_name, motor_names)
log_name = f"{var_name}_{thread_name}"
return log_name
class TorqueMode(enum.Enum): class TorqueMode(enum.Enum):
ENABLED = 1 ENABLED = 1
@@ -197,6 +211,11 @@ class DynamixelMotorsBus:
self.calibration = None self.calibration = None
self.threads = {}
self.queues = {}
self.results = {}
self.logs = {}
@property @property
def motor_names(self) -> list[int]: def motor_names(self) -> list[int]:
return list(self.motors.keys()) return list(self.motors.keys())
@@ -239,6 +258,8 @@ class DynamixelMotorsBus:
return values return values
def read(self, data_name, motor_names: list[str] | None = None): def read(self, data_name, motor_names: list[str] | None = None):
start_time = time.perf_counter()
if motor_names is None: if motor_names is None:
motor_names = self.motor_names motor_names = self.motor_names
@@ -259,7 +280,12 @@ class DynamixelMotorsBus:
for idx in motor_ids: for idx in motor_ids:
self.group_readers[group_key].addParam(idx) self.group_readers[group_key].addParam(idx)
comm = self.group_readers[group_key].txRxPacket() NUM_TRIES = 10
for _ in range(NUM_TRIES):
comm = self.group_readers[group_key].txRxPacket()
if comm == COMM_SUCCESS:
break
if comm != COMM_SUCCESS: if comm != COMM_SUCCESS:
raise ConnectionError( raise ConnectionError(
f"Read failed due to communication error on port {self.port} for group_key {group_key}: " f"Read failed due to communication error on port {self.port} for group_key {group_key}: "
@@ -283,15 +309,30 @@ class DynamixelMotorsBus:
if data_name in CONVERT_POSITION_TO_ANGLE_REQUIRED: if data_name in CONVERT_POSITION_TO_ANGLE_REQUIRED:
values = motor_position_to_angle(values) values = motor_position_to_angle(values)
# log the number of seconds it took to read the data from the motors
delta_ts_name = get_log_name("delta_timestamp_s", "read", data_name, motor_names)
self.logs[delta_ts_name] = time.perf_counter() - start_time
# log the utc time at which the data was received
ts_utc_name = get_log_name("timestamp_utc", "read", data_name, motor_names)
self.logs[ts_utc_name] = capture_timestamp_utc()
return values return values
def write(self, data_name, values: int | float | np.ndarray, motor_names: str | list[str] | None = None): def write(self, data_name, values: int | float | np.ndarray, motor_names: str | list[str] | None = None):
start_time = time.perf_counter()
if motor_names is None: if motor_names is None:
motor_names = self.motor_names motor_names = self.motor_names
if isinstance(motor_names, str): if isinstance(motor_names, str):
motor_names = [motor_names] motor_names = [motor_names]
if isinstance(values, (int, float, np.integer)):
values = [int(values)] * len(motor_names)
values = np.array(values)
motor_ids = [] motor_ids = []
models = [] models = []
for name in motor_names: for name in motor_names:
@@ -299,11 +340,6 @@ class DynamixelMotorsBus:
motor_ids.append(motor_idx) motor_ids.append(motor_idx)
models.append(model) models.append(model)
if isinstance(values, (int, float, np.integer)):
values = [int(values)] * len(motor_ids)
values = np.array(values)
if data_name in CONVERT_POSITION_TO_ANGLE_REQUIRED: if data_name in CONVERT_POSITION_TO_ANGLE_REQUIRED:
values = motor_angle_to_position(values) values = motor_angle_to_position(values)
@@ -361,6 +397,99 @@ class DynamixelMotorsBus:
f"{self.packet_handler.getTxRxResult(comm)}" f"{self.packet_handler.getTxRxResult(comm)}"
) )
# log the number of seconds it took to write the data to the motors
delta_ts_name = get_log_name("delta_timestamp_s", "write", data_name, motor_names)
self.logs[delta_ts_name] = time.perf_counter() - start_time
# TODO(rcadene): should we log the time before sending the write command?
# log the utc time when the write has been completed
ts_utc_name = get_log_name("timestamp_utc", "write", data_name, motor_names)
self.logs[ts_utc_name] = capture_timestamp_utc()
def read_loop(self, data_name, motor_names: list[str] | None = None):
while True:
thread_name = get_thread_name("read", data_name, motor_names)
self.results[thread_name] = self.read(data_name, motor_names)
def async_read(self, data_name, motor_names: list[str] | None = None):
if motor_names is None:
motor_names = self.motor_names
if isinstance(motor_names, str):
motor_names = [motor_names]
thread_name = get_thread_name("read", data_name, motor_names)
if thread_name not in self.threads:
self.threads[thread_name] = Thread(target=self.read_loop, args=(data_name, motor_names))
self.threads[thread_name].daemon = True
self.threads[thread_name].start()
FPS = 200
num_tries = 0
while thread_name not in self.results:
num_tries += 1
time.sleep(1 / FPS)
if num_tries > FPS:
if self.threads[thread_name].ident is None and not self.threads[thread_name].is_alive():
raise Exception(f"The thread responsible for `self.async_read({data_name}, {motor_names})` took too much time to start. There might be an issue. Verify that `self.threads[thread_name].start()` has been called.")
# ts_utc_name = get_log_name("timestamp_utc", "read", data_name, motor_names)
return self.results[thread_name] #, self.logs[ts_utc_name]
def write_loop(self, data_name, queue: Queue, motor_names: list[str] | None = None):
while True:
values = queue.get()
if values is None: # A way to terminate the thread
break
self.write(data_name, values, motor_names)
queue.task_done()
def async_write(self, data_name, values: int | float | np.ndarray, motor_names: str | list[str] | None = None):
if motor_names is None:
motor_names = self.motor_names
if isinstance(motor_names, str):
motor_names = [motor_names]
if isinstance(values, (int, float, np.integer)):
values = [int(values)] * len(motor_names)
values = np.array(values)
thread_name = get_thread_name("write", data_name, motor_names)
ts_utc_name = get_log_name("timestamp_utc", "write", data_name, motor_names)
if thread_name not in self.threads:
self.queues[thread_name] = Queue()
self.threads[thread_name] = Thread(target=self.write_loop, args=(data_name, self.queues[thread_name], motor_names))
self.threads[thread_name].daemon = True
self.threads[thread_name].start()
self.queues[thread_name].put(values)
FPS = 200
num_tries = 0
while ts_utc_name not in self.logs:
num_tries += 1
time.sleep(1 / FPS)
if num_tries > FPS:
if self.threads[thread_name].ident is None and not self.threads[thread_name].is_alive():
raise Exception(f"The thread responsible for `self.async_write({data_name}, {values}, {motor_names})` took too much time to start. There might be an issue. Verify that `self.threads[thread_name].start()` has been called.")
return self.logs[ts_utc_name]
def __del__(self):
for thread_name in self.queues:
# Send value that corresponds to `break` logic
self.queues[thread_name].put(None)
self.queues[thread_name].join()
for thread_name in self.queues:
self.threads[thread_name].join()
# TODO(rcadene): find a simple way to exit threads created by async_read
# def read(self, data_name, motor_name: str): # def read(self, data_name, motor_name: str):
# motor_idx, model = self.motors[motor_name] # motor_idx, model = self.motors[motor_name]
# addr, bytes = self.model_ctrl_table[model][data_name] # addr, bytes = self.model_ctrl_table[model][data_name]

View File

@@ -34,7 +34,8 @@ def make_robot(name):
), ),
}, },
cameras={ cameras={
"main": OpenCVCamera(1, fps=30, width=640, height=480), "macbookpro": OpenCVCamera(1, fps=30, width=640, height=480),
"iphone": OpenCVCamera(2, fps=30, width=640, height=480),
}, },
) )
else: else:

View File

@@ -244,6 +244,9 @@ class KochRobot:
self.follower_arms = self.config.follower_arms self.follower_arms = self.config.follower_arms
self.cameras = self.config.cameras self.cameras = self.config.cameras
self.async_read = False
self.async_write = False
def init_teleop(self): def init_teleop(self):
if self.calibration_path.exists(): if self.calibration_path.exists():
# Reset all arms before setting calibration # Reset all arms before setting calibration
@@ -265,6 +268,7 @@ class KochRobot:
for name in self.follower_arms: for name in self.follower_arms:
self.follower_arms[name].set_calibration(calibration[f"follower_{name}"]) self.follower_arms[name].set_calibration(calibration[f"follower_{name}"])
self.follower_arms[name].write("Torque_Enable", 1) self.follower_arms[name].write("Torque_Enable", 1)
self.follower_arms[name].write("Torque_Enable", 1)
for name in self.leader_arms: for name in self.leader_arms:
self.leader_arms[name].set_calibration(calibration[f"leader_{name}"]) self.leader_arms[name].set_calibration(calibration[f"leader_{name}"])
@@ -300,7 +304,10 @@ class KochRobot:
# Prepare to assign the positions of the leader to the follower # Prepare to assign the positions of the leader to the follower
leader_pos = {} leader_pos = {}
for name in self.leader_arms: for name in self.leader_arms:
leader_pos[name] = self.leader_arms[name].read("Present_Position") if self.async_read:
leader_pos[name] = self.leader_arms[name].async_read("Present_Position")
else:
leader_pos[name] = self.leader_arms[name].read("Present_Position")
follower_goal_pos = {} follower_goal_pos = {}
for name in self.leader_arms: for name in self.leader_arms:
@@ -308,7 +315,10 @@ class KochRobot:
# Send action # Send action
for name in self.follower_arms: for name in self.follower_arms:
self.follower_arms[name].write("Goal_Position", follower_goal_pos[name]) if self.async_write:
self.follower_arms[name].async_write("Goal_Position", follower_goal_pos[name])
else:
self.follower_arms[name].write("Goal_Position", follower_goal_pos[name])
# Early exit when recording data is not requested # Early exit when recording data is not requested
if not record_data: if not record_data:
@@ -317,7 +327,10 @@ class KochRobot:
# Read follower position # Read follower position
follower_pos = {} follower_pos = {}
for name in self.follower_arms: for name in self.follower_arms:
follower_pos[name] = self.follower_arms[name].read("Present_Position") if self.async_read:
follower_pos[name] = self.follower_arms[name].async_read("Present_Position")
else:
follower_pos[name] = self.follower_arms[name].read("Present_Position")
# Create state by concatenating follower current position # Create state by concatenating follower current position
state = [] state = []
@@ -336,7 +349,7 @@ class KochRobot:
# Capture images from cameras # Capture images from cameras
images = {} images = {}
for name in self.cameras: for name in self.cameras:
images[name] = self.cameras[name].read() images[name] = self.cameras[name].async_read()
# Populate output dictionnaries and format to pytorch # Populate output dictionnaries and format to pytorch
obs_dict, action_dict = {}, {} obs_dict, action_dict = {}, {}
@@ -351,7 +364,10 @@ class KochRobot:
# Read follower position # Read follower position
follower_pos = {} follower_pos = {}
for name in self.follower_arms: for name in self.follower_arms:
follower_pos[name] = self.follower_arms[name].read("Present_Position") if self.async_read:
follower_pos[name] = self.follower_arms[name].async_read("Present_Position")
else:
follower_pos[name] = self.follower_arms[name].read("Present_Position")
# Create state by concatenating follower current position # Create state by concatenating follower current position
state = [] state = []
@@ -363,7 +379,7 @@ class KochRobot:
# Capture images from cameras # Capture images from cameras
images = {} images = {}
for name in self.cameras: for name in self.cameras:
images[name] = self.cameras[name].read() images[name] = self.cameras[name].async_read()
# Populate output dictionnaries and format to pytorch # Populate output dictionnaries and format to pytorch
obs_dict = {} obs_dict = {}
@@ -383,4 +399,7 @@ class KochRobot:
from_idx = to_idx from_idx = to_idx
for name in self.follower_arms: for name in self.follower_arms:
self.follower_arms[name].write("Goal_Position", follower_goal_pos[name].astype(np.int32)) if self.async_write:
self.follower_arms[name].async_write("Goal_Position", follower_goal_pos[name].astype(np.int32))
else:
self.follower_arms[name].write("Goal_Position", follower_goal_pos[name].astype(np.int32))

View File

@@ -17,7 +17,7 @@ import logging
import os.path as osp import os.path as osp
import random import random
from contextlib import contextmanager from contextlib import contextmanager
from datetime import datetime from datetime import datetime, timezone
from pathlib import Path from pathlib import Path
from typing import Any, Generator from typing import Any, Generator
@@ -172,3 +172,7 @@ def print_cuda_memory_usage():
print("Maximum GPU Memory Allocated: {:.2f} MB".format(torch.cuda.max_memory_allocated(0) / 1024**2)) print("Maximum GPU Memory Allocated: {:.2f} MB".format(torch.cuda.max_memory_allocated(0) / 1024**2))
print("Current GPU Memory Reserved: {:.2f} MB".format(torch.cuda.memory_reserved(0) / 1024**2)) print("Current GPU Memory Reserved: {:.2f} MB".format(torch.cuda.memory_reserved(0) / 1024**2))
print("Maximum GPU Memory Reserved: {:.2f} MB".format(torch.cuda.max_memory_reserved(0) / 1024**2)) print("Maximum GPU Memory Reserved: {:.2f} MB".format(torch.cuda.max_memory_reserved(0) / 1024**2))
def capture_timestamp_utc():
return datetime.now(timezone.utc)

View File

@@ -101,7 +101,7 @@ def save_image(img_tensor, key, frame_index, episode_index, videos_dir):
def busy_wait(seconds): def busy_wait(seconds):
# Significantly more accurate than `time.sleep`, and mendatory for our use case, # Significantly more accurate than `time.sleep`, and mendatory for our use case,
# but it consumes CPU cycles. # but it consumes CPU cycles.
# TODO(rcadene): find an alternative # TODO(rcadene): find an alternative: from python 11, time.sleep is precise
end_time = time.perf_counter() + seconds end_time = time.perf_counter() + seconds
while time.perf_counter() < end_time: while time.perf_counter() < end_time:
pass pass
@@ -156,42 +156,46 @@ def record_dataset(
videos_dir = local_dir / "videos" videos_dir = local_dir / "videos"
videos_dir.mkdir(parents=True, exist_ok=True) videos_dir.mkdir(parents=True, exist_ok=True)
start_time = time.perf_counter()
is_warmup_print = False
is_record_print = False
ep_dicts = []
# Save images using threads to reach high fps (30 and more) # Save images using threads to reach high fps (30 and more)
# Using `with` ensures the program exists smoothly if an execption is raised. # Using `with` ensures the program exists smoothly if an execption is raised.
with concurrent.futures.ThreadPoolExecutor() as executor: with concurrent.futures.ThreadPoolExecutor() as executor:
timestamp = 0
start_time = time.perf_counter()
is_warmup_print = False
while timestamp < warmup_time_s:
if not is_warmup_print:
print("Warming up by skipping frames")
os.system('say "Warmup" &')
is_warmup_print = True
now = time.perf_counter()
observation, action = robot.teleop_step(record_data=True)
dt_s = time.perf_counter() - now
busy_wait(1 / fps - dt_s)
dt_s = time.perf_counter() - now
print(f"Latency (ms): {dt_s * 1000:.2f}\tFrequency: {1 / dt_s:.2f} (Warmup)")
timestamp = time.perf_counter() - start_time
ep_dicts = []
for episode_index in range(num_episodes): for episode_index in range(num_episodes):
ep_dict = {} ep_dict = {}
frame_index = 0 frame_index = 0
timestamp = 0
while True: start_time = time.perf_counter()
if not is_warmup_print: is_record_print = False
print("Warming up by skipping frames") while timestamp < episode_time_s:
os.system('say "Warmup"')
is_warmup_print = True
now = time.perf_counter()
observation, action = robot.teleop_step(record_data=True)
timestamp = time.perf_counter() - start_time
if timestamp < warmup_time_s:
dt_s = time.perf_counter() - now
busy_wait(1 / fps - dt_s)
dt_s = time.perf_counter() - now
print(f"Latency (ms): {dt_s * 1000:.2f}\tFrequency: {1 / dt_s:.2f} (Warmup)")
continue
if not is_record_print: if not is_record_print:
print("Recording") print(f"Recording episode {episode_index}")
os.system(f'say "Recording episode {episode_index}"') os.system(f'say "Recording episode {episode_index}" &')
is_record_print = True is_record_print = True
now = time.perf_counter()
observation, action = robot.teleop_step(record_data=True)
image_keys = [key for key in observation if "image" in key] image_keys = [key for key in observation if "image" in key]
not_image_keys = [key for key in observation if "image" not in key] not_image_keys = [key for key in observation if "image" not in key]
@@ -216,11 +220,9 @@ def record_dataset(
dt_s = time.perf_counter() - now dt_s = time.perf_counter() - now
print(f"Latency (ms): {dt_s * 1000:.2f}\tFrequency: {1 / dt_s:.2f}") print(f"Latency (ms): {dt_s * 1000:.2f}\tFrequency: {1 / dt_s:.2f}")
if timestamp > episode_time_s - warmup_time_s: timestamp = time.perf_counter() - start_time
break
print("Encoding to `LeRobotDataset` format") print("Encoding images to videos")
os.system('say "Encoding"')
num_frames = frame_index num_frames = frame_index
@@ -271,7 +273,7 @@ def record_dataset(
for key in image_keys: for key in image_keys:
time.sleep(10) time.sleep(10)
tmp_imgs_dir = videos_dir / f"{key}_episode_{episode_index:06d}" tmp_imgs_dir = videos_dir / f"{key}_episode_{episode_index:06d}"
# shutil.rmtree(tmp_imgs_dir) shutil.rmtree(tmp_imgs_dir)
lerobot_dataset = LeRobotDataset.from_preloaded( lerobot_dataset = LeRobotDataset.from_preloaded(
repo_id=repo_id, repo_id=repo_id,