Add policy/act_aloha_real.yaml + env/act_real.yaml (#429)

Co-authored-by: Simon Alibert <75076266+aliberts@users.noreply.github.com>
This commit is contained in:
Remi
2024-10-10 17:12:45 +02:00
committed by GitHub
parent c29e70e5a1
commit 97b1feb0b3
20 changed files with 464 additions and 188 deletions

View File

@@ -102,6 +102,7 @@ import argparse
import concurrent.futures
import json
import logging
import multiprocessing
import os
import platform
import shutil
@@ -163,9 +164,9 @@ def say(text, blocking=False):
os.system(cmd)
def save_image(img_tensor, key, frame_index, episode_index, videos_dir):
def save_image(img_tensor, key, frame_index, episode_index, videos_dir: str):
img = Image.fromarray(img_tensor.numpy())
path = videos_dir / f"{key}_episode_{episode_index:06d}" / f"frame_{frame_index:06d}.png"
path = Path(videos_dir) / f"{key}_episode_{episode_index:06d}" / f"frame_{frame_index:06d}.png"
path.parent.mkdir(parents=True, exist_ok=True)
img.save(str(path), quality=100)
@@ -255,6 +256,129 @@ def get_available_arms(robot):
return available_arms
########################################################################################
# Asynchrounous saving of images on disk
########################################################################################
def loop_to_save_images_in_threads(image_queue, num_threads):
if num_threads < 1:
raise NotImplementedError(f"Only `num_threads>=1` is supported for now, but {num_threads=} given.")
with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor:
futures = []
while True:
# Blocks until a frame is available
frame_data = image_queue.get()
# As usually done, exit loop when receiving None to stop the worker
if frame_data is None:
break
image, key, frame_index, episode_index, videos_dir = frame_data
futures.append(executor.submit(save_image, image, key, frame_index, episode_index, videos_dir))
# Before exiting function, wait for all threads to complete
with tqdm.tqdm(total=len(futures), desc="Writing images") as progress_bar:
concurrent.futures.wait(futures)
progress_bar.update(len(futures))
def start_image_writer_processes(image_queue, num_processes, num_threads_per_process):
if num_processes < 1:
raise ValueError(f"Only `num_processes>=1` is supported, but {num_processes=} given.")
if num_threads_per_process < 1:
raise NotImplementedError(
"Only `num_threads_per_process>=1` is supported for now, but {num_threads_per_process=} given."
)
processes = []
for _ in range(num_processes):
process = multiprocessing.Process(
target=loop_to_save_images_in_threads,
args=(image_queue, num_threads_per_process),
)
process.start()
processes.append(process)
return processes
def stop_processes(processes, queue, timeout):
# Send None to each process to signal them to stop
for _ in processes:
queue.put(None)
# Close the queue, no more items can be put in the queue
queue.close()
# Wait maximum 20 seconds for all processes to terminate
for process in processes:
process.join(timeout=timeout)
# If not terminated after 20 seconds, force termination
if process.is_alive():
process.terminate()
# Ensure all background queue threads have finished
queue.join_thread()
def start_image_writer(num_processes, num_threads):
"""This function abstract away the initialisation of processes or/and threads to
save images on disk asynchrounously, which is critical to control a robot and record data
at a high frame rate.
When `num_processes=0`, it returns a dictionary containing a threads pool of size `num_threads`.
When `num_processes>0`, it returns a dictionary containing a processes pool of size `num_processes`,
where each subprocess starts their own threads pool of size `num_threads`.
The optimal number of processes and threads depends on your computer capabilities.
We advise to use 4 threads per camera with 0 processes. If the fps is not stable, try to increase or lower
the number of threads. If it is still not stable, try to use 1 subprocess, or more.
"""
image_writer = {}
if num_processes == 0:
futures = []
threads_pool = concurrent.futures.ThreadPoolExecutor(max_workers=num_threads)
image_writer["threads_pool"], image_writer["futures"] = threads_pool, futures
else:
# TODO(rcadene): When using num_processes>1, `multiprocessing.Manager().Queue()`
# might be better than `multiprocessing.Queue()`. Source: https://www.geeksforgeeks.org/python-multiprocessing-queue-vs-multiprocessing-manager-queue
image_queue = multiprocessing.Queue()
processes_pool = start_image_writer_processes(
image_queue, num_processes=num_processes, num_threads_per_process=num_threads
)
image_writer["processes_pool"], image_writer["image_queue"] = processes_pool, image_queue
return image_writer
def async_save_image(image_writer, image, key, frame_index, episode_index, videos_dir):
"""This function abstract away the saving of an image on disk asynchrounously. It uses a dictionary
called image writer which contains either a pool of processes or a pool of threads.
"""
if "threads_pool" in image_writer:
threads_pool, futures = image_writer["threads_pool"], image_writer["futures"]
futures.append(threads_pool.submit(save_image, image, key, frame_index, episode_index, videos_dir))
else:
image_queue = image_writer["image_queue"]
image_queue.put((image, key, frame_index, episode_index, videos_dir))
def stop_image_writer(image_writer, timeout):
if "threads_pool" in image_writer:
futures = image_writer["futures"]
# Before exiting function, wait for all threads to complete
with tqdm.tqdm(total=len(futures), desc="Writing images") as progress_bar:
concurrent.futures.wait(futures, timeout=timeout)
progress_bar.update(len(futures))
else:
processes_pool, image_queue = image_writer["processes_pool"], image_writer["image_queue"]
stop_processes(processes_pool, image_queue, timeout=timeout)
########################################################################################
# Control modes
########################################################################################
@@ -342,9 +466,11 @@ def record(
run_compute_stats=True,
push_to_hub=True,
tags=None,
num_image_writers_per_camera=4,
num_image_writer_processes=0,
num_image_writer_threads_per_camera=4,
force_override=False,
display_cameras=True,
play_sounds=True,
):
# TODO(rcadene): Add option to record logs
# TODO(rcadene): Clean this function via decomposition in higher level functions
@@ -436,7 +562,8 @@ def record(
while timestamp < warmup_time_s:
if not is_warmup_print:
logging.info("Warming up (no data recording)")
say("Warming up")
if play_sounds:
say("Warming up")
is_warmup_print = True
start_loop_t = time.perf_counter()
@@ -463,16 +590,22 @@ def record(
if has_method(robot, "teleop_safety_stop"):
robot.teleop_safety_stop()
# Save images using threads to reach high fps (30 and more)
# Using `with` to exist smoothly if an execption is raised.
futures = []
num_image_writers = num_image_writers_per_camera * len(robot.cameras)
num_image_writers = max(num_image_writers, 1)
with concurrent.futures.ThreadPoolExecutor(max_workers=num_image_writers) as executor:
has_camera = len(robot.cameras) > 0
if has_camera:
# Initialize processes or/and threads dedicated to save images on disk asynchronously,
# which is critical to control a robot and record data at a high frame rate.
image_writer = start_image_writer(
num_processes=num_image_writer_processes,
num_threads=num_image_writer_threads_per_camera * len(robot.cameras),
)
# Using `try` to exist smoothly if an exception is raised
try:
# Start recording all episodes
while episode_index < num_episodes:
logging.info(f"Recording episode {episode_index}")
say(f"Recording episode {episode_index}")
if play_sounds:
say(f"Recording episode {episode_index}")
ep_dict = {}
frame_index = 0
timestamp = 0
@@ -488,12 +621,16 @@ def record(
image_keys = [key for key in observation if "image" in key]
not_image_keys = [key for key in observation if "image" not in key]
for key in image_keys:
futures += [
executor.submit(
save_image, observation[key], key, frame_index, episode_index, videos_dir
if has_camera > 0:
for key in image_keys:
async_save_image(
image_writer,
image=observation[key],
key=key,
frame_index=frame_index,
episode_index=episode_index,
videos_dir=str(videos_dir),
)
]
if display_cameras and not is_headless():
image_keys = [key for key in observation if "image" in key]
@@ -563,7 +700,8 @@ def record(
if not stop_recording:
# Start resetting env while the executor are finishing
logging.info("Reset the environment")
say("Reset the environment")
if play_sounds:
say("Reset the environment")
timestamp = 0
start_vencod_t = time.perf_counter()
@@ -635,18 +773,23 @@ def record(
if is_last_episode:
logging.info("Done recording")
say("Done recording", blocking=True)
if play_sounds:
say("Done recording", blocking=True)
if not is_headless():
listener.stop()
logging.info("Waiting for threads writing the images on disk to terminate...")
for _ in tqdm.tqdm(
concurrent.futures.as_completed(futures), total=len(futures), desc="Writting images"
):
pass
break
if has_camera > 0:
logging.info("Waiting for image writer to terminate...")
stop_image_writer(image_writer, timeout=20)
except Exception as e:
if has_camera > 0:
logging.info("Waiting for image writer to terminate...")
stop_image_writer(image_writer, timeout=20)
raise e
robot.disconnect()
if display_cameras and not is_headless():
cv2.destroyAllWindows()
@@ -654,7 +797,8 @@ def record(
if video:
logging.info("Encoding videos")
say("Encoding videos")
if play_sounds:
say("Encoding videos")
# Use ffmpeg to convert frames stored as png into mp4 videos
for episode_index in tqdm.tqdm(range(num_episodes)):
for key in image_keys:
@@ -699,7 +843,8 @@ def record(
)
if run_compute_stats:
logging.info("Computing dataset statistics")
say("Computing dataset statistics")
if play_sounds:
say("Computing dataset statistics")
stats = compute_stats(lerobot_dataset)
lerobot_dataset.stats = stats
else:
@@ -721,11 +866,14 @@ def record(
create_branch(repo_id, repo_type="dataset", branch=CODEBASE_VERSION)
logging.info("Exiting")
say("Exiting")
if play_sounds:
say("Exiting")
return lerobot_dataset
def replay(robot: Robot, episode: int, fps: int | None = None, root="data", repo_id="lerobot/debug"):
def replay(
robot: Robot, episode: int, fps: int | None = None, root="data", repo_id="lerobot/debug", play_sounds=True
):
# TODO(rcadene): Add option to record logs
local_dir = Path(root) / repo_id
if not local_dir.exists():
@@ -740,7 +888,8 @@ def replay(robot: Robot, episode: int, fps: int | None = None, root="data", repo
robot.connect()
logging.info("Replaying episode")
say("Replaying episode", blocking=True)
if play_sounds:
say("Replaying episode", blocking=True)
for idx in range(from_idx, to_idx):
start_episode_t = time.perf_counter()
@@ -840,12 +989,23 @@ if __name__ == "__main__":
help="Add tags to your dataset on the hub.",
)
parser_record.add_argument(
"--num-image-writers-per-camera",
"--num-image-writer-processes",
type=int,
default=0,
help=(
"Number of subprocesses handling the saving of frames as PNGs. Set to 0 to use threads only; "
"set to ≥1 to use subprocesses, each using threads to write images. The best number of processes "
"and threads depends on your system. We recommend 4 threads per camera with 0 processes. "
"If fps is unstable, adjust the thread count. If still unstable, try using 1 or more subprocesses."
),
)
parser_record.add_argument(
"--num-image-writer-threads-per-camera",
type=int,
default=4,
help=(
"Number of threads writing the frames as png images on disk, per camera. "
"Too much threads might cause unstable teleoperation fps due to main thread being blocked. "
"Too many threads might cause unstable teleoperation fps due to main thread being blocked. "
"Not enough threads might cause low camera fps."
),
)