diff --git a/lerobot/common/robot_devices/cameras/opencv.py b/lerobot/common/robot_devices/cameras/opencv.py index d2c10a568..0fc02e0e3 100644 --- a/lerobot/common/robot_devices/cameras/opencv.py +++ b/lerobot/common/robot_devices/cameras/opencv.py @@ -1,4 +1,5 @@ import argparse +import math import time from dataclasses import dataclass, replace from pathlib import Path @@ -8,6 +9,7 @@ import cv2 import numpy as np from lerobot.common.robot_devices.cameras.utils import save_color_image +from lerobot.common.utils.utils import capture_timestamp_utc def find_camera_indices(raise_when_empty=False, max_index_search_range=60): @@ -119,9 +121,9 @@ class OpenCVCamera: self.camera = None self.is_connected = False - self.t = Thread(target=self.capture_image_loop, args=()) - self.t.daemon = True - self._color_image = None + self.threads = {} + self.results = {} + self.logs = {} def connect(self): if self.is_connected: @@ -161,7 +163,7 @@ class OpenCVCamera: actual_width = self.camera.get(cv2.CAP_PROP_FRAME_WIDTH) actual_height = self.camera.get(cv2.CAP_PROP_FRAME_HEIGHT) - if self.fps and self.fps != actual_fps: + if self.fps and not math.isclose(self.fps, actual_fps, rel_tol=1e-3): raise OSError( f"Can't set {self.fps=} for camera {self.camera_index}. Actual value is {actual_fps}." ) @@ -175,12 +177,13 @@ class OpenCVCamera: ) self.is_connected = True - self.t.start() - def capture_image(self, temporary_color: str | None = None) -> np.ndarray: + def read(self, temporary_color: str | None = None) -> np.ndarray: if not self.is_connected: self.connect() + start_time = time.perf_counter() + ret, color_image = self.camera.read() if not ret: raise OSError(f"Can't capture color image from camera {self.camera_index}.") @@ -196,20 +199,41 @@ class OpenCVCamera: if requested_color == "rgb": color_image = cv2.cvtColor(color_image, cv2.COLOR_BGR2RGB) + # log the number of seconds it took to read the image + self.logs["delta_timestamp_s"] = time.perf_counter() - start_time + + # log the utc time at which the image was received + self.logs["timestamp_utc"] = capture_timestamp_utc() + return color_image - def capture_image_loop(self): + def read_loop(self): while True: - self._color_image = self.capture_image() + self.results["color_image"] = self.read() - def read(self): - while self._color_image is None: - time.sleep(0.1) - return self._color_image + def async_read(self): + if "read" not in self.threads: + self.threads["read"] = Thread(target=self.read_loop, args=()) + self.threads["read"].daemon = True + self.threads["read"].start() + + num_tries = 0 + while "color_image" not in self.results: + num_tries += 1 + time.sleep(1/self.fps) + if num_tries > self.fps: + if self.threads["read"].ident is None and not self.threads["read"].is_alive(): + raise Exception("The thread responsible for `self.async_read()` took too much time to start. There might be an issue. Verify that `self.threads[\"read\"].start()` has been called.") + + return self.results["color_image"] #, self.logs["timestamp_utc"] def disconnect(self): if getattr(self, "camera", None): self.camera.release() + for name in self.threads: + if self.threads[name].is_alive(): + # wait for the thread to finish + self.threads[name].join() def __del__(self): self.disconnect() @@ -217,8 +241,10 @@ class OpenCVCamera: def save_images_config(config: OpenCVCameraConfig, out_dir: Path): cameras = [] - print(f"Available camera indices: {OpenCVCamera.AVAILABLE_CAMERAS_INDICES}") - for camera_idx in OpenCVCamera.AVAILABLE_CAMERAS_INDICES: + + available_cam_ids = find_camera_indices() + print(f"Available camera indices: {available_cam_ids}") + for camera_idx in available_cam_ids: camera = OpenCVCamera(camera_idx, config) cameras.append(camera) diff --git a/lerobot/common/robot_devices/motors/dynamixel.py b/lerobot/common/robot_devices/motors/dynamixel.py index 88b0cf9f4..c88eebcef 100644 --- a/lerobot/common/robot_devices/motors/dynamixel.py +++ b/lerobot/common/robot_devices/motors/dynamixel.py @@ -1,5 +1,8 @@ import enum from copy import deepcopy +from queue import Queue +from threading import Thread +import time import numpy as np from dynamixel_sdk import ( @@ -14,6 +17,8 @@ from dynamixel_sdk import ( PortHandler, ) +from lerobot.common.utils.utils import capture_timestamp_utc + PROTOCOL_VERSION = 2.0 BAUD_RATE = 1_000_000 TIMEOUT_MS = 1000 @@ -149,6 +154,15 @@ def get_group_sync_key(data_name, motor_names): group_key = f"{data_name}_" + "_".join(motor_names) return group_key +def get_thread_name(fn_name, data_name, motor_names): + group_key = get_group_sync_key(data_name, motor_names) + thread_name = f"{fn_name}_{group_key}" + return thread_name + +def get_log_name(var_name, fn_name, data_name, motor_names): + thread_name = get_thread_name(fn_name, data_name, motor_names) + log_name = f"{var_name}_{thread_name}" + return log_name class TorqueMode(enum.Enum): ENABLED = 1 @@ -197,6 +211,11 @@ class DynamixelMotorsBus: self.calibration = None + self.threads = {} + self.queues = {} + self.results = {} + self.logs = {} + @property def motor_names(self) -> list[int]: return list(self.motors.keys()) @@ -239,6 +258,8 @@ class DynamixelMotorsBus: return values def read(self, data_name, motor_names: list[str] | None = None): + start_time = time.perf_counter() + if motor_names is None: motor_names = self.motor_names @@ -259,7 +280,12 @@ class DynamixelMotorsBus: for idx in motor_ids: self.group_readers[group_key].addParam(idx) - comm = self.group_readers[group_key].txRxPacket() + NUM_TRIES = 10 + for _ in range(NUM_TRIES): + comm = self.group_readers[group_key].txRxPacket() + if comm == COMM_SUCCESS: + break + if comm != COMM_SUCCESS: raise ConnectionError( f"Read failed due to communication error on port {self.port} for group_key {group_key}: " @@ -283,15 +309,30 @@ class DynamixelMotorsBus: if data_name in CONVERT_POSITION_TO_ANGLE_REQUIRED: values = motor_position_to_angle(values) + # log the number of seconds it took to read the data from the motors + delta_ts_name = get_log_name("delta_timestamp_s", "read", data_name, motor_names) + self.logs[delta_ts_name] = time.perf_counter() - start_time + + # log the utc time at which the data was received + ts_utc_name = get_log_name("timestamp_utc", "read", data_name, motor_names) + self.logs[ts_utc_name] = capture_timestamp_utc() + return values def write(self, data_name, values: int | float | np.ndarray, motor_names: str | list[str] | None = None): + start_time = time.perf_counter() + if motor_names is None: motor_names = self.motor_names if isinstance(motor_names, str): motor_names = [motor_names] + if isinstance(values, (int, float, np.integer)): + values = [int(values)] * len(motor_names) + + values = np.array(values) + motor_ids = [] models = [] for name in motor_names: @@ -299,11 +340,6 @@ class DynamixelMotorsBus: motor_ids.append(motor_idx) models.append(model) - if isinstance(values, (int, float, np.integer)): - values = [int(values)] * len(motor_ids) - - values = np.array(values) - if data_name in CONVERT_POSITION_TO_ANGLE_REQUIRED: values = motor_angle_to_position(values) @@ -361,6 +397,99 @@ class DynamixelMotorsBus: f"{self.packet_handler.getTxRxResult(comm)}" ) + # log the number of seconds it took to write the data to the motors + delta_ts_name = get_log_name("delta_timestamp_s", "write", data_name, motor_names) + self.logs[delta_ts_name] = time.perf_counter() - start_time + + # TODO(rcadene): should we log the time before sending the write command? + # log the utc time when the write has been completed + ts_utc_name = get_log_name("timestamp_utc", "write", data_name, motor_names) + self.logs[ts_utc_name] = capture_timestamp_utc() + + def read_loop(self, data_name, motor_names: list[str] | None = None): + while True: + thread_name = get_thread_name("read", data_name, motor_names) + self.results[thread_name] = self.read(data_name, motor_names) + + def async_read(self, data_name, motor_names: list[str] | None = None): + if motor_names is None: + motor_names = self.motor_names + + if isinstance(motor_names, str): + motor_names = [motor_names] + + thread_name = get_thread_name("read", data_name, motor_names) + + if thread_name not in self.threads: + self.threads[thread_name] = Thread(target=self.read_loop, args=(data_name, motor_names)) + self.threads[thread_name].daemon = True + self.threads[thread_name].start() + + FPS = 200 + num_tries = 0 + while thread_name not in self.results: + num_tries += 1 + time.sleep(1 / FPS) + if num_tries > FPS: + if self.threads[thread_name].ident is None and not self.threads[thread_name].is_alive(): + raise Exception(f"The thread responsible for `self.async_read({data_name}, {motor_names})` took too much time to start. There might be an issue. Verify that `self.threads[thread_name].start()` has been called.") + + # ts_utc_name = get_log_name("timestamp_utc", "read", data_name, motor_names) + return self.results[thread_name] #, self.logs[ts_utc_name] + + def write_loop(self, data_name, queue: Queue, motor_names: list[str] | None = None): + while True: + values = queue.get() + if values is None: # A way to terminate the thread + break + self.write(data_name, values, motor_names) + queue.task_done() + + def async_write(self, data_name, values: int | float | np.ndarray, motor_names: str | list[str] | None = None): + if motor_names is None: + motor_names = self.motor_names + + if isinstance(motor_names, str): + motor_names = [motor_names] + + if isinstance(values, (int, float, np.integer)): + values = [int(values)] * len(motor_names) + + values = np.array(values) + + thread_name = get_thread_name("write", data_name, motor_names) + ts_utc_name = get_log_name("timestamp_utc", "write", data_name, motor_names) + + if thread_name not in self.threads: + self.queues[thread_name] = Queue() + self.threads[thread_name] = Thread(target=self.write_loop, args=(data_name, self.queues[thread_name], motor_names)) + self.threads[thread_name].daemon = True + self.threads[thread_name].start() + + self.queues[thread_name].put(values) + + FPS = 200 + num_tries = 0 + while ts_utc_name not in self.logs: + num_tries += 1 + time.sleep(1 / FPS) + if num_tries > FPS: + if self.threads[thread_name].ident is None and not self.threads[thread_name].is_alive(): + raise Exception(f"The thread responsible for `self.async_write({data_name}, {values}, {motor_names})` took too much time to start. There might be an issue. Verify that `self.threads[thread_name].start()` has been called.") + + return self.logs[ts_utc_name] + + def __del__(self): + for thread_name in self.queues: + # Send value that corresponds to `break` logic + self.queues[thread_name].put(None) + self.queues[thread_name].join() + + for thread_name in self.queues: + self.threads[thread_name].join() + + # TODO(rcadene): find a simple way to exit threads created by async_read + # def read(self, data_name, motor_name: str): # motor_idx, model = self.motors[motor_name] # addr, bytes = self.model_ctrl_table[model][data_name] diff --git a/lerobot/common/robot_devices/robots/factory.py b/lerobot/common/robot_devices/robots/factory.py index afbf822ef..944661157 100644 --- a/lerobot/common/robot_devices/robots/factory.py +++ b/lerobot/common/robot_devices/robots/factory.py @@ -34,7 +34,8 @@ def make_robot(name): ), }, cameras={ - "main": OpenCVCamera(1, fps=30, width=640, height=480), + "macbookpro": OpenCVCamera(1, fps=30, width=640, height=480), + "iphone": OpenCVCamera(2, fps=30, width=640, height=480), }, ) else: diff --git a/lerobot/common/robot_devices/robots/koch.py b/lerobot/common/robot_devices/robots/koch.py index fd0636125..6a3cc798a 100644 --- a/lerobot/common/robot_devices/robots/koch.py +++ b/lerobot/common/robot_devices/robots/koch.py @@ -244,6 +244,9 @@ class KochRobot: self.follower_arms = self.config.follower_arms self.cameras = self.config.cameras + self.async_read = False + self.async_write = False + def init_teleop(self): if self.calibration_path.exists(): # Reset all arms before setting calibration @@ -265,6 +268,7 @@ class KochRobot: for name in self.follower_arms: self.follower_arms[name].set_calibration(calibration[f"follower_{name}"]) self.follower_arms[name].write("Torque_Enable", 1) + self.follower_arms[name].write("Torque_Enable", 1) for name in self.leader_arms: self.leader_arms[name].set_calibration(calibration[f"leader_{name}"]) @@ -300,7 +304,10 @@ class KochRobot: # Prepare to assign the positions of the leader to the follower leader_pos = {} for name in self.leader_arms: - leader_pos[name] = self.leader_arms[name].read("Present_Position") + if self.async_read: + leader_pos[name] = self.leader_arms[name].async_read("Present_Position") + else: + leader_pos[name] = self.leader_arms[name].read("Present_Position") follower_goal_pos = {} for name in self.leader_arms: @@ -308,7 +315,10 @@ class KochRobot: # Send action for name in self.follower_arms: - self.follower_arms[name].write("Goal_Position", follower_goal_pos[name]) + if self.async_write: + self.follower_arms[name].async_write("Goal_Position", follower_goal_pos[name]) + else: + self.follower_arms[name].write("Goal_Position", follower_goal_pos[name]) # Early exit when recording data is not requested if not record_data: @@ -317,7 +327,10 @@ class KochRobot: # Read follower position follower_pos = {} for name in self.follower_arms: - follower_pos[name] = self.follower_arms[name].read("Present_Position") + if self.async_read: + follower_pos[name] = self.follower_arms[name].async_read("Present_Position") + else: + follower_pos[name] = self.follower_arms[name].read("Present_Position") # Create state by concatenating follower current position state = [] @@ -336,7 +349,7 @@ class KochRobot: # Capture images from cameras images = {} for name in self.cameras: - images[name] = self.cameras[name].read() + images[name] = self.cameras[name].async_read() # Populate output dictionnaries and format to pytorch obs_dict, action_dict = {}, {} @@ -351,7 +364,10 @@ class KochRobot: # Read follower position follower_pos = {} for name in self.follower_arms: - follower_pos[name] = self.follower_arms[name].read("Present_Position") + if self.async_read: + follower_pos[name] = self.follower_arms[name].async_read("Present_Position") + else: + follower_pos[name] = self.follower_arms[name].read("Present_Position") # Create state by concatenating follower current position state = [] @@ -363,7 +379,7 @@ class KochRobot: # Capture images from cameras images = {} for name in self.cameras: - images[name] = self.cameras[name].read() + images[name] = self.cameras[name].async_read() # Populate output dictionnaries and format to pytorch obs_dict = {} @@ -383,4 +399,7 @@ class KochRobot: from_idx = to_idx for name in self.follower_arms: - self.follower_arms[name].write("Goal_Position", follower_goal_pos[name].astype(np.int32)) + if self.async_write: + self.follower_arms[name].async_write("Goal_Position", follower_goal_pos[name].astype(np.int32)) + else: + self.follower_arms[name].write("Goal_Position", follower_goal_pos[name].astype(np.int32)) diff --git a/lerobot/common/utils/utils.py b/lerobot/common/utils/utils.py index c429efbdb..79db627af 100644 --- a/lerobot/common/utils/utils.py +++ b/lerobot/common/utils/utils.py @@ -17,7 +17,7 @@ import logging import os.path as osp import random from contextlib import contextmanager -from datetime import datetime +from datetime import datetime, timezone from pathlib import Path from typing import Any, Generator @@ -172,3 +172,7 @@ def print_cuda_memory_usage(): print("Maximum GPU Memory Allocated: {:.2f} MB".format(torch.cuda.max_memory_allocated(0) / 1024**2)) print("Current GPU Memory Reserved: {:.2f} MB".format(torch.cuda.memory_reserved(0) / 1024**2)) print("Maximum GPU Memory Reserved: {:.2f} MB".format(torch.cuda.max_memory_reserved(0) / 1024**2)) + + +def capture_timestamp_utc(): + return datetime.now(timezone.utc) diff --git a/lerobot/scripts/control_robot.py b/lerobot/scripts/control_robot.py index 5d83adde2..ba6481fac 100644 --- a/lerobot/scripts/control_robot.py +++ b/lerobot/scripts/control_robot.py @@ -101,7 +101,7 @@ def save_image(img_tensor, key, frame_index, episode_index, videos_dir): def busy_wait(seconds): # Significantly more accurate than `time.sleep`, and mendatory for our use case, # but it consumes CPU cycles. - # TODO(rcadene): find an alternative + # TODO(rcadene): find an alternative: from python 11, time.sleep is precise end_time = time.perf_counter() + seconds while time.perf_counter() < end_time: pass @@ -156,42 +156,46 @@ def record_dataset( videos_dir = local_dir / "videos" videos_dir.mkdir(parents=True, exist_ok=True) - start_time = time.perf_counter() - - is_warmup_print = False - is_record_print = False - ep_dicts = [] - # Save images using threads to reach high fps (30 and more) # Using `with` ensures the program exists smoothly if an execption is raised. with concurrent.futures.ThreadPoolExecutor() as executor: + + timestamp = 0 + start_time = time.perf_counter() + is_warmup_print = False + while timestamp < warmup_time_s: + if not is_warmup_print: + print("Warming up by skipping frames") + os.system('say "Warmup" &') + is_warmup_print = True + + now = time.perf_counter() + observation, action = robot.teleop_step(record_data=True) + + dt_s = time.perf_counter() - now + busy_wait(1 / fps - dt_s) + + dt_s = time.perf_counter() - now + print(f"Latency (ms): {dt_s * 1000:.2f}\tFrequency: {1 / dt_s:.2f} (Warmup)") + + timestamp = time.perf_counter() - start_time + + ep_dicts = [] for episode_index in range(num_episodes): ep_dict = {} frame_index = 0 - - while True: - if not is_warmup_print: - print("Warming up by skipping frames") - os.system('say "Warmup"') - is_warmup_print = True - now = time.perf_counter() - - observation, action = robot.teleop_step(record_data=True) - timestamp = time.perf_counter() - start_time - - if timestamp < warmup_time_s: - dt_s = time.perf_counter() - now - busy_wait(1 / fps - dt_s) - - dt_s = time.perf_counter() - now - print(f"Latency (ms): {dt_s * 1000:.2f}\tFrequency: {1 / dt_s:.2f} (Warmup)") - continue - + timestamp = 0 + start_time = time.perf_counter() + is_record_print = False + while timestamp < episode_time_s: if not is_record_print: - print("Recording") - os.system(f'say "Recording episode {episode_index}"') + print(f"Recording episode {episode_index}") + os.system(f'say "Recording episode {episode_index}" &') is_record_print = True + now = time.perf_counter() + observation, action = robot.teleop_step(record_data=True) + image_keys = [key for key in observation if "image" in key] not_image_keys = [key for key in observation if "image" not in key] @@ -216,11 +220,9 @@ def record_dataset( dt_s = time.perf_counter() - now print(f"Latency (ms): {dt_s * 1000:.2f}\tFrequency: {1 / dt_s:.2f}") - if timestamp > episode_time_s - warmup_time_s: - break + timestamp = time.perf_counter() - start_time - print("Encoding to `LeRobotDataset` format") - os.system('say "Encoding"') + print("Encoding images to videos") num_frames = frame_index @@ -271,7 +273,7 @@ def record_dataset( for key in image_keys: time.sleep(10) tmp_imgs_dir = videos_dir / f"{key}_episode_{episode_index:06d}" - # shutil.rmtree(tmp_imgs_dir) + shutil.rmtree(tmp_imgs_dir) lerobot_dataset = LeRobotDataset.from_preloaded( repo_id=repo_id,