diff --git a/lerobot/common/datasets/push_dataset_to_hub/_download_raw.py b/lerobot/common/datasets/push_dataset_to_hub/_download_raw.py
index 263c929d..d11ebabf 100644
--- a/lerobot/common/datasets/push_dataset_to_hub/_download_raw.py
+++ b/lerobot/common/datasets/push_dataset_to_hub/_download_raw.py
@@ -4,20 +4,21 @@ useless dependencies when using datasets.
 """

 import io
+import shutil
 from pathlib import Path

 import tqdm


-def download_raw(root, dataset_id) -> Path:
+def download_raw(raw_dir, dataset_id):
     if "pusht" in dataset_id:
-        return download_pusht(root=root, dataset_id=dataset_id)
+        return download_pusht(raw_dir)
     elif "xarm" in dataset_id:
-        return download_xarm(root=root, dataset_id=dataset_id)
+        return download_xarm(raw_dir)
     elif "aloha" in dataset_id:
-        return download_aloha(root=root, dataset_id=dataset_id)
+        return download_aloha(raw_dir, dataset_id)
     elif "umi" in dataset_id:
-        return download_umi(root=root, dataset_id=dataset_id)
+        return download_umi(raw_dir)
     else:
         raise ValueError(dataset_id)

@@ -50,42 +51,37 @@ def download_and_extract_zip(url: str, destination_folder: Path) -> bool:
         return False


-def download_pusht(root: str, dataset_id: str = "pusht", fps: int = 10) -> Path:
+def download_pusht(raw_dir: Path):
     pusht_url = "https://diffusion-policy.cs.columbia.edu/data/training/pusht.zip"
-    pusht_zarr = Path("pusht/pusht_cchi_v7_replay.zarr")
-    root = Path(root)
-    raw_dir: Path = root / f"{dataset_id}_raw"
-    zarr_path: Path = (raw_dir / pusht_zarr).resolve()
+    zarr_path = raw_dir / "pusht_cchi_v7_replay.zarr"
     if not zarr_path.is_dir():
         raw_dir.mkdir(parents=True, exist_ok=True)
-        download_and_extract_zip(pusht_url, raw_dir)
-    return zarr_path
+        download_and_extract_zip(pusht_url, raw_dir)
+        # the zip contains a top-level `pusht/` directory (see the old `pusht_zarr` path above),
+        # so move the zarr up to `raw_dir` and remove the leftover directory
+        shutil.move(str(raw_dir / "pusht" / "pusht_cchi_v7_replay.zarr"), str(zarr_path))
+        shutil.rmtree(raw_dir / "pusht")


-def download_xarm(root: str, dataset_id: str, fps: int = 15) -> Path:
-    root = Path(root)
-    raw_dir: Path = root / "xarm_datasets_raw"
-    if not raw_dir.exists():
-        import zipfile
+def download_xarm(raw_dir: Path):
+    """Download all xarm datasets at once"""
+    import zipfile

-        import gdown
+    import gdown

-        raw_dir.mkdir(parents=True, exist_ok=True)
-        # from https://github.com/fyhMer/fowm/blob/main/scripts/download_datasets.py
-        url = "https://drive.google.com/uc?id=1nhxpykGtPDhmQKm-_B8zBSywVRdgeVya"
-        zip_path = raw_dir / "data.zip"
-        gdown.download(url, str(zip_path), quiet=False)
-        print("Extracting...")
-        with zipfile.ZipFile(str(zip_path), "r") as zip_f:
-            for member in zip_f.namelist():
-                if member.startswith("data/xarm") and member.endswith(".pkl"):
-                    print(member)
-                    zip_f.extract(member=member)
-        zip_path.unlink()
-
-    dataset_path: Path = root / f"{dataset_id}"
-    return dataset_path
+    raw_dir.mkdir(parents=True, exist_ok=True)
+    # from https://github.com/fyhMer/fowm/blob/main/scripts/download_datasets.py
+    url = "https://drive.google.com/uc?id=1nhxpykGtPDhmQKm-_B8zBSywVRdgeVya"
+    zip_path = raw_dir / "data.zip"
+    gdown.download(url, str(zip_path), quiet=False)
+    print("Extracting...")
+    with zipfile.ZipFile(str(zip_path), "r") as zip_f:
+        for path in zip_f.namelist():
+            if path.startswith("data/xarm") and path.endswith(".pkl"):
+                zip_f.extract(member=path)
+                # move each `data/xarm_*/buffer.pkl` to its corresponding raw directory,
+                # e.g. `data/xarm_lift_medium_raw/buffer.pkl`
+                member_dir = Path(path.replace("/buffer.pkl", ""))
+                member_raw_dir = Path(path.replace("/buffer.pkl", "_raw"))
+                member_raw_dir.mkdir(parents=True, exist_ok=True)
+                shutil.move(path, str(member_raw_dir / "buffer.pkl"))
+                shutil.rmtree(member_dir)
+    zip_path.unlink()


 def download_aloha(root: str, dataset_id: str) -> Path:
@@ -148,13 +144,9 @@ def download_aloha(root: str, dataset_id: str) -> Path:
     return raw_dir


-def download_umi(root: str, dataset_id: str) -> Path:
+def download_umi(raw_dir: Path):
     url_cup_in_the_wild = "https://real.stanford.edu/umi/data/zarr_datasets/cup_in_the_wild.zarr.zip"
-    cup_in_the_wild_zarr = Path("umi/cup_in_the_wild/cup_in_the_wild.zarr")
-
-    root = Path(root)
-    raw_dir: Path = root / f"{dataset_id}_raw"
-    zarr_path: Path = (raw_dir / cup_in_the_wild_zarr).resolve()
+    zarr_path = raw_dir / "cup_in_the_wild.zarr"
     if not zarr_path.is_dir():
         raw_dir.mkdir(parents=True, exist_ok=True)
         download_and_extract_zip(url_cup_in_the_wild, zarr_path)
@@ -162,7 +154,7 @@ def download_umi(root: str, dataset_id: str) -> Path:


 if __name__ == "__main__":
-    root = "data"
+    data_dir = Path("data")
     dataset_ids = [
         "pusht",
         "xarm_lift_medium",
@@ -176,4 +168,5 @@ if __name__ == "__main__":
         "umi_cup_in_the_wild",
     ]
     for dataset_id in dataset_ids:
-        download_raw(root=root, dataset_id=dataset_id)
+        raw_dir = data_dir / f"{dataset_id}_raw"
+        download_raw(raw_dir, dataset_id)
diff --git a/lerobot/common/datasets/push_dataset_to_hub/aloha_hdf5_format.py b/lerobot/common/datasets/push_dataset_to_hub/aloha_hdf5_format.py
new file mode 100644
index 00000000..6ef534fb
--- /dev/null
+++ b/lerobot/common/datasets/push_dataset_to_hub/aloha_hdf5_format.py
@@ -0,0 +1,171 @@
+"""
+Contains utilities to process raw data format of HDF5 files like in: https://github.com/tonyzhaozh/act
+"""
+
+import re
+import shutil
+from pathlib import Path
+
+import h5py
+import torch
+import tqdm
+from datasets import Dataset, Features, Image, Sequence, Value
+from PIL import Image as PILImage
+
+from lerobot.common.datasets.push_dataset_to_hub.utils import concatenate_episodes, save_images_concurrently
+from lerobot.common.datasets.utils import (
+    hf_transform_to_torch,
+)
+from lerobot.common.datasets.video_utils import encode_video_frames
+
+
+def is_valid_raw_format(raw_dir) -> bool:
+    cameras = ["top"]
+
+    hdf5_files: list[Path] = list(raw_dir.glob("episode_*.hdf5"))
+    if len(hdf5_files) == 0:
+        return False
+    try:
+        hdf5_files = sorted(hdf5_files, key=lambda x: int(re.search(r"episode_(\d+).hdf5", x.name).group(1)))
+    except AttributeError:
+        # All file names must contain a numerical identifier matching 'episode_(\\d+).hdf5'
+        return False
+
+    # Check if the sequence is consecutive, e.g. episode_0, episode_1, episode_2, etc.
+    # If not, return False
+    previous_number = None
+    for file in hdf5_files:
+        current_number = int(re.search(r"episode_(\d+).hdf5", file.name).group(1))
+        if previous_number is not None and current_number - previous_number != 1:
+            return False
+        previous_number = current_number
+
+    for file in hdf5_files:
+        try:
+            with h5py.File(file, "r") as f:
+                # Check for the expected datasets within the HDF5 file
+                required_datasets = ["/action", "/observations/qpos"]
+                # Add camera-specific image datasets to the required datasets
+                camera_datasets = [f"/observations/images/{cam}" for cam in cameras]
+                required_datasets.extend(camera_datasets)
+
+                if not all(dataset in f for dataset in required_datasets):
+                    return False
+        except OSError:
+            return False
+    return True
+
+
+def load_from_raw(raw_dir, out_dir, fps, video, debug):
+    hdf5_files = list(raw_dir.glob("*.hdf5"))
+    hdf5_files = sorted(hdf5_files, key=lambda x: int(re.search(r"episode_(\d+)", x.name).group(1)))
+    ep_dicts = []
+    episode_data_index = {"from": [], "to": []}
+
+    id_from = 0
+
+    for ep_path in tqdm.tqdm(hdf5_files):
+        with h5py.File(ep_path, "r") as ep:
+            ep_idx = int(re.search(r"episode_(\d+)", ep_path.name).group(1))
+            num_frames = ep["/action"].shape[0]
+
+            # last step of demonstration is considered done
+            done = torch.zeros(num_frames, dtype=torch.bool)
+            done[-1] = True
+
+            state = torch.from_numpy(ep["/observations/qpos"][:])
+            action = torch.from_numpy(ep["/action"][:])
+
+            ep_dict = {}
+
+            cameras = list(ep["/observations/images"].keys())
+            for cam in cameras:
+                img_key = f"observation.images.{cam}"
+                imgs_array = ep[f"/observations/images/{cam}"][:]  # b h w c
+                if video:
+                    # save png images in temporary directory
+                    tmp_imgs_dir = out_dir / "tmp_images"
+                    save_images_concurrently(imgs_array, tmp_imgs_dir)
+
+                    # encode images to a mp4 video
+                    video_path = out_dir / "videos" / f"{img_key}_episode_{ep_idx:06d}.mp4"
+                    encode_video_frames(tmp_imgs_dir, video_path, fps)
+
+                    # clean temporary images directory
+                    shutil.rmtree(tmp_imgs_dir)
+
+                    # store the episode index
+                    ep_dict[img_key] = torch.tensor([ep_idx] * num_frames, dtype=torch.int)
+                else:
+                    ep_dict[img_key] = [PILImage.fromarray(x) for x in imgs_array]
+
+            ep_dict["observation.state"] = state
+            ep_dict["action"] = action
+            ep_dict["episode_index"] = torch.tensor([ep_idx] * num_frames)
+            ep_dict["frame_index"] = torch.arange(0, num_frames, 1)
+            ep_dict["timestamp"] = torch.arange(0, num_frames, 1) / fps
+            ep_dict["next.done"] = done
+            # TODO(rcadene): compute reward and success
+            # ep_dict["next.reward"] = reward
+            # ep_dict["next.success"] = success
+
+            assert isinstance(ep_idx, int)
+            ep_dicts.append(ep_dict)
+
+            episode_data_index["from"].append(id_from)
+            episode_data_index["to"].append(id_from + num_frames)
+
+        id_from += num_frames
+
+        # process first episode only
+        if debug:
+            break
+
+    data_dict = concatenate_episodes(ep_dicts)
+    return data_dict, episode_data_index
+
+
+def to_hf_dataset(data_dict, video) -> Dataset:
+    features = {}
+
+    image_keys = [key for key in data_dict if "observation.images." in key]
+    for image_key in image_keys:
+        if video:
+            features[image_key] = Value(dtype="int64", id="video")
+        else:
+            features[image_key] = Image()
+
+    features["observation.state"] = Sequence(
+        length=data_dict["observation.state"].shape[1], feature=Value(dtype="float32", id=None)
+    )
+    features["action"] = Sequence(
+        length=data_dict["action"].shape[1], feature=Value(dtype="float32", id=None)
+    )
+    features["episode_index"] = Value(dtype="int64", id=None)
+    features["frame_index"] = Value(dtype="int64", id=None)
+    features["timestamp"] = Value(dtype="float32", id=None)
+    features["next.done"] = Value(dtype="bool", id=None)
+    features["index"] = Value(dtype="int64", id=None)
+    # TODO(rcadene): add reward and success
+    # features["next.reward"] = Value(dtype="float32", id=None)
+    # features["next.success"] = Value(dtype="bool", id=None)
+
+    hf_dataset = Dataset.from_dict(data_dict, features=Features(features))
+    hf_dataset.set_transform(hf_transform_to_torch)
+    return hf_dataset
+
+
+def from_raw_to_lerobot_format(raw_dir: Path, out_dir: Path, fps=None, video=True, debug=False):
+    assert is_valid_raw_format(raw_dir), f"{raw_dir} does not match the expected format."
+
+    if fps is None:
+        fps = 50
+
+    data_dict, episode_data_index = load_from_raw(raw_dir, out_dir, fps, video, debug)
+    hf_dataset = to_hf_dataset(data_dict, video)
+
+    info = {
+        "fps": fps,
+        "video": video,
+    }
+    return hf_dataset, episode_data_index, info
diff --git a/lerobot/common/datasets/push_dataset_to_hub/aloha_processor.py b/lerobot/common/datasets/push_dataset_to_hub/aloha_processor.py
deleted file mode 100644
index f6a66577..00000000
--- a/lerobot/common/datasets/push_dataset_to_hub/aloha_processor.py
+++ /dev/null
@@ -1,199 +0,0 @@
-import re
-from pathlib import Path
-
-import h5py
-import torch
-import tqdm
-from datasets import Dataset, Features, Image, Sequence, Value
-from PIL import Image as PILImage
-
-from lerobot.common.datasets.push_dataset_to_hub.utils import concatenate_episodes
-from lerobot.common.datasets.utils import (
-    hf_transform_to_torch,
-)
-
-
-class AlohaProcessor:
-    """
-    Process HDF5 files formatted like in: https://github.com/tonyzhaozh/act
-
-    Attributes:
-        folder_path (Path): Path to the directory containing HDF5 files.
-        cameras (list[str]): List of camera identifiers to check in the files.
-        fps (int): Frames per second used in timestamp calculations.
-
-    Methods:
-        is_valid() -> bool:
-            Validates if each HDF5 file within the folder contains all required datasets.
-        preprocess() -> dict:
-            Processes the files and returns structured data suitable for further analysis.
-        to_hf_dataset(data_dict: dict) -> Dataset:
-            Converts processed data into a Hugging Face Dataset object.
-    """
-
-    def __init__(self, folder_path: Path, cameras: list[str] | None = None, fps: int | None = None):
-        """
-        Initializes the AlohaProcessor with a specified directory path containing HDF5 files,
-        an optional list of cameras, and a frame rate.
-
-        Args:
-            folder_path (Path): The directory path where HDF5 files are stored.
-            cameras (list[str] | None): Optional list of cameras to validate within the files. Defaults to ['top'] if None.
-            fps (int): Frame rate for the datasets, used in time calculations. Default is 50.
- - Examples: - >>> processor = AlohaProcessor(Path("path_to_hdf5_directory"), ["camera1", "camera2"]) - >>> processor.is_valid() - True - """ - self.folder_path = folder_path - if cameras is None: - cameras = ["top"] - self.cameras = cameras - if fps is None: - fps = 50 - self._fps = fps - - @property - def fps(self) -> int: - return self._fps - - def is_valid(self) -> bool: - """ - Validates the HDF5 files in the specified folder to ensure they contain the required datasets - for actions, positions, and images for each specified camera. - - Returns: - bool: True if all files are valid HDF5 files with all required datasets, False otherwise. - """ - hdf5_files: list[Path] = list(self.folder_path.glob("episode_*.hdf5")) - if len(hdf5_files) == 0: - return False - try: - hdf5_files = sorted( - hdf5_files, key=lambda x: int(re.search(r"episode_(\d+).hdf5", x.name).group(1)) - ) - except AttributeError: - # All file names must contain a numerical identifier matching 'episode_(\\d+).hdf5 - return False - - # Check if the sequence is consecutive eg episode_0, episode_1, episode_2, etc. - # If not, return False - previous_number = None - for file in hdf5_files: - current_number = int(re.search(r"episode_(\d+).hdf5", file.name).group(1)) - if previous_number is not None and current_number - previous_number != 1: - return False - previous_number = current_number - - for file in hdf5_files: - try: - with h5py.File(file, "r") as file: - # Check for the expected datasets within the HDF5 file - required_datasets = ["/action", "/observations/qpos"] - # Add camera-specific image datasets to the required datasets - camera_datasets = [f"/observations/images/{cam}" for cam in self.cameras] - required_datasets.extend(camera_datasets) - - if not all(dataset in file for dataset in required_datasets): - return False - except OSError: - return False - return True - - def preprocess(self): - """ - Collects episode data from the HDF5 file and returns it as an AlohaStep named tuple. - - Returns: - AlohaStep: Named tuple containing episode data. - - Raises: - ValueError: If the file is not valid. 
- """ - if not self.is_valid(): - raise ValueError("The HDF5 file is invalid or does not contain the required datasets.") - - hdf5_files = list(self.folder_path.glob("*.hdf5")) - hdf5_files = sorted(hdf5_files, key=lambda x: int(re.search(r"episode_(\d+)", x.name).group(1))) - ep_dicts = [] - episode_data_index = {"from": [], "to": []} - - id_from = 0 - - for ep_path in tqdm.tqdm(hdf5_files): - with h5py.File(ep_path, "r") as ep: - ep_id = int(re.search(r"episode_(\d+)", ep_path.name).group(1)) - num_frames = ep["/action"].shape[0] - - # last step of demonstration is considered done - done = torch.zeros(num_frames, dtype=torch.bool) - done[-1] = True - - state = torch.from_numpy(ep["/observations/qpos"][:]) - action = torch.from_numpy(ep["/action"][:]) - - ep_dict = {} - - for cam in self.cameras: - image = torch.from_numpy(ep[f"/observations/images/{cam}"][:]) # b h w c - ep_dict[f"observation.images.{cam}"] = [PILImage.fromarray(x.numpy()) for x in image] - - ep_dict.update( - { - "observation.state": state, - "action": action, - "episode_index": torch.tensor([ep_id] * num_frames), - "frame_index": torch.arange(0, num_frames, 1), - "timestamp": torch.arange(0, num_frames, 1) / self.fps, - # TODO(rcadene): compute reward and success - # "next.reward": reward, - "next.done": done, - # "next.success": success, - } - ) - - assert isinstance(ep_id, int) - ep_dicts.append(ep_dict) - - episode_data_index["from"].append(id_from) - episode_data_index["to"].append(id_from + num_frames) - - id_from += num_frames - - data_dict = concatenate_episodes(ep_dicts) - return data_dict, episode_data_index - - def to_hf_dataset(self, data_dict) -> Dataset: - """ - Converts a dictionary of data into a Hugging Face Dataset object. - - Args: - data_dict (dict): A dictionary containing the data to be converted. - - Returns: - Dataset: The converted Hugging Face Dataset object. 
-        """
-        image_features = {f"observation.images.{cam}": Image() for cam in self.cameras}
-        features = {
-            "observation.state": Sequence(
-                length=data_dict["observation.state"].shape[1], feature=Value(dtype="float32", id=None)
-            ),
-            "action": Sequence(length=data_dict["action"].shape[1], feature=Value(dtype="float32", id=None)),
-            "episode_index": Value(dtype="int64", id=None),
-            "frame_index": Value(dtype="int64", id=None),
-            "timestamp": Value(dtype="float32", id=None),
-            # "next.reward": Value(dtype="float32", id=None),
-            "next.done": Value(dtype="bool", id=None),
-            # "next.success": Value(dtype="bool", id=None),
-            "index": Value(dtype="int64", id=None),
-        }
-        update_features = {**image_features, **features}
-        features = Features(update_features)
-        hf_dataset = Dataset.from_dict(data_dict, features=features)
-        hf_dataset.set_transform(hf_transform_to_torch)
-
-        return hf_dataset
-
-    def cleanup(self):
-        pass
diff --git a/lerobot/common/datasets/push_dataset_to_hub/pusht_zarr_format.py b/lerobot/common/datasets/push_dataset_to_hub/pusht_zarr_format.py
new file mode 100644
index 00000000..03b765ce
--- /dev/null
+++ b/lerobot/common/datasets/push_dataset_to_hub/pusht_zarr_format.py
@@ -0,0 +1,215 @@
+"""Process zarr files formatted like in: https://github.com/real-stanford/diffusion_policy"""
+
+import shutil
+from pathlib import Path
+
+import numpy as np
+import torch
+import tqdm
+import zarr
+from datasets import Dataset, Features, Image, Sequence, Value
+from PIL import Image as PILImage
+
+from lerobot.common.datasets.push_dataset_to_hub.utils import concatenate_episodes, save_images_concurrently
+from lerobot.common.datasets.utils import (
+    hf_transform_to_torch,
+)
+from lerobot.common.datasets.video_utils import encode_video_frames
+
+
+def is_valid_raw_format(raw_dir) -> bool:
+    zarr_path = raw_dir / "pusht_cchi_v7_replay.zarr"
+    try:
+        zarr_data = zarr.open(zarr_path, mode="r")
+    except Exception:
+        # TODO (azouitine): Handle the exception properly
+        return False
+    required_datasets = {
+        "data/action",
+        "data/img",
+        "data/keypoint",
+        "data/n_contacts",
+        "data/state",
+        "meta/episode_ends",
+    }
+    for dataset in required_datasets:
+        if dataset not in zarr_data:
+            return False
+    nb_frames = zarr_data["data/img"].shape[0]
+
+    required_datasets.remove("meta/episode_ends")
+
+    return all(nb_frames == zarr_data[dataset].shape[0] for dataset in required_datasets)
+
+
+def load_from_raw(raw_dir, out_dir, fps, video, debug):
+    try:
+        import pymunk
+        from gym_pusht.envs.pusht import PushTEnv, pymunk_to_shapely
+
+        from lerobot.common.datasets.push_dataset_to_hub._diffusion_policy_replay_buffer import (
+            ReplayBuffer as DiffusionPolicyReplayBuffer,
+        )
+    except ModuleNotFoundError as e:
+        print("`gym_pusht` is not installed. Please install it with `pip install 'lerobot[gym_pusht]'`")
+        raise e
+    # as defined in the gym-pusht env: https://github.com/huggingface/gym-pusht/blob/e0684ff988d223808c0a9dcfaba9dc4991791370/gym_pusht/envs/pusht.py#L174
+    success_threshold = 0.95  # 95% coverage
+
+    zarr_path = raw_dir / "pusht_cchi_v7_replay.zarr"
+    zarr_data = DiffusionPolicyReplayBuffer.copy_from_path(zarr_path)
+
+    episode_ids = torch.from_numpy(zarr_data.get_episode_idxs())
+    num_episodes = zarr_data.meta["episode_ends"].shape[0]
+    assert (
+        len({zarr_data[key].shape[0] for key in zarr_data.keys()}) == 1  # noqa: SIM118
+    ), "Some data types don't have the same number of total frames."
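+    # note: `episode_ids` assigns an episode index to every frame; it is used in the loop
+    # below to sanity-check that frames and `episode_ends` boundaries are consistent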
+
+    # TODO(rcadene): verify that goal pose is expected to be fixed
+    goal_pos_angle = np.array([256, 256, np.pi / 4])  # x, y, theta (in radians)
+    goal_body = PushTEnv.get_goal_pose_body(goal_pos_angle)
+
+    imgs = torch.from_numpy(zarr_data["img"])  # b h w c
+    states = torch.from_numpy(zarr_data["state"])
+    actions = torch.from_numpy(zarr_data["action"])
+
+    ep_dicts = []
+    episode_data_index = {"from": [], "to": []}
+
+    id_from = 0
+    for ep_idx in tqdm.tqdm(range(num_episodes)):
+        id_to = zarr_data.meta["episode_ends"][ep_idx]
+        num_frames = id_to - id_from
+
+        # sanity check
+        assert (episode_ids[id_from:id_to] == ep_idx).all()
+
+        # get image
+        image = imgs[id_from:id_to]
+        assert image.min() >= 0.0
+        assert image.max() <= 255.0
+        image = image.type(torch.uint8)
+
+        # get state
+        state = states[id_from:id_to]
+        agent_pos = state[:, :2]
+        block_pos = state[:, 2:4]
+        block_angle = state[:, 4]
+
+        # get reward, success, done
+        reward = torch.zeros(num_frames)
+        success = torch.zeros(num_frames, dtype=torch.bool)
+        done = torch.zeros(num_frames, dtype=torch.bool)
+        for i in range(num_frames):
+            space = pymunk.Space()
+            space.gravity = 0, 0
+            space.damping = 0
+
+            # Add walls.
+            walls = [
+                PushTEnv.add_segment(space, (5, 506), (5, 5), 2),
+                PushTEnv.add_segment(space, (5, 5), (506, 5), 2),
+                PushTEnv.add_segment(space, (506, 5), (506, 506), 2),
+                PushTEnv.add_segment(space, (5, 506), (506, 506), 2),
+            ]
+            space.add(*walls)
+
+            block_body = PushTEnv.add_tee(space, block_pos[i].tolist(), block_angle[i].item())
+            goal_geom = pymunk_to_shapely(goal_body, block_body.shapes)
+            block_geom = pymunk_to_shapely(block_body, block_body.shapes)
+            intersection_area = goal_geom.intersection(block_geom).area
+            goal_area = goal_geom.area
+            coverage = intersection_area / goal_area
+            reward[i] = np.clip(coverage / success_threshold, 0, 1)
+            success[i] = coverage > success_threshold
+
+        # last step of demonstration is considered done
+        done[-1] = True
+
+        ep_dict = {}
+
+        imgs_array = [x.numpy() for x in image]
+        if video:
+            # save png images in temporary directory
+            tmp_imgs_dir = out_dir / "tmp_images"
+            save_images_concurrently(imgs_array, tmp_imgs_dir)
+
+            # encode images to a mp4 video
+            video_path = out_dir / "videos" / f"observation.image_episode_{ep_idx:06d}.mp4"
+            encode_video_frames(tmp_imgs_dir, video_path, fps)
+
+            # clean temporary images directory
+            shutil.rmtree(tmp_imgs_dir)
+
+            # store the episode index
+            ep_dict["observation.image"] = torch.tensor([ep_idx] * num_frames, dtype=torch.int)
+        else:
+            ep_dict["observation.image"] = [PILImage.fromarray(x) for x in imgs_array]
+
+        ep_dict["observation.state"] = agent_pos
+        ep_dict["action"] = actions[id_from:id_to]
+        ep_dict["episode_index"] = torch.tensor([ep_idx] * num_frames, dtype=torch.int)
+        ep_dict["frame_index"] = torch.arange(0, num_frames, 1)
+        ep_dict["timestamp"] = torch.arange(0, num_frames, 1) / fps
+        # ep_dict["next.observation.image"] = image[1:]
+        # ep_dict["next.observation.state"] = agent_pos[1:]
+        # TODO(rcadene): verify that reward and done are aligned with image and agent_pos
+        ep_dict["next.reward"] = torch.cat([reward[1:], reward[[-1]]])
+        ep_dict["next.done"] = torch.cat([done[1:], done[[-1]]])
+        ep_dict["next.success"] = torch.cat([success[1:], success[[-1]]])
+        ep_dicts.append(ep_dict)
+
+        episode_data_index["from"].append(id_from)
+        episode_data_index["to"].append(id_from + num_frames)
+
+        id_from += num_frames
+
+        # process first episode only
+        if debug:
+            break
+
+    data_dict = concatenate_episodes(ep_dicts)
+    return
data_dict, episode_data_index + + +def to_hf_dataset(data_dict, video): + features = {} + + if video: + features["observation.image"] = Value(dtype="int64", id="video") + else: + features["observation.image"] = Image() + + features["observation.state"] = Sequence( + length=data_dict["observation.state"].shape[1], feature=Value(dtype="float32", id=None) + ) + features["action"] = Sequence( + length=data_dict["action"].shape[1], feature=Value(dtype="float32", id=None) + ) + features["episode_index"] = Value(dtype="int64", id=None) + features["frame_index"] = Value(dtype="int64", id=None) + features["timestamp"] = Value(dtype="float32", id=None) + features["next.reward"] = Value(dtype="float32", id=None) + features["next.done"] = Value(dtype="bool", id=None) + features["next.success"] = Value(dtype="bool", id=None) + features["index"] = Value(dtype="int64", id=None) + + hf_dataset = Dataset.from_dict(data_dict, features=Features(features)) + hf_dataset.set_transform(hf_transform_to_torch) + return hf_dataset + + +def from_raw_to_lerobot_format(raw_dir: Path, out_dir: Path, fps=None, video=True, debug=False): + assert is_valid_raw_format(raw_dir), f"{raw_dir} does not match the expected format." + + if fps is None: + fps = 10 + + data_dict, episode_data_index = load_from_raw(raw_dir, out_dir, fps, video, debug) + hf_dataset = to_hf_dataset(data_dict, video) + + info = { + "fps": fps, + "video": video, + } + return hf_dataset, episode_data_index, info diff --git a/lerobot/common/datasets/push_dataset_to_hub/umi_zarr_format.py b/lerobot/common/datasets/push_dataset_to_hub/umi_zarr_format.py new file mode 100644 index 00000000..4d2cba15 --- /dev/null +++ b/lerobot/common/datasets/push_dataset_to_hub/umi_zarr_format.py @@ -0,0 +1,203 @@ +"""Process UMI (Universal Manipulation Interface) data stored in Zarr format like in: https://github.com/real-stanford/universal_manipulation_interface""" + +import shutil +from pathlib import Path + +import numpy as np +import torch +import tqdm +import zarr +from datasets import Dataset, Features, Image, Sequence, Value +from PIL import Image as PILImage + +from lerobot.common.datasets.push_dataset_to_hub._umi_imagecodecs_numcodecs import register_codecs +from lerobot.common.datasets.push_dataset_to_hub.utils import concatenate_episodes, save_images_concurrently +from lerobot.common.datasets.utils import ( + hf_transform_to_torch, +) +from lerobot.common.datasets.video_utils import encode_video_frames + + +def is_valid_raw_format(raw_dir) -> bool: + zarr_path = raw_dir / "cup_in_the_wild.zarr" + + try: + zarr_data = zarr.open(zarr_path, mode="r") + except Exception: + # TODO (azouitine): Handle the exception properly + return False + required_datasets = { + "data/robot0_demo_end_pose", + "data/robot0_demo_start_pose", + "data/robot0_eef_pos", + "data/robot0_eef_rot_axis_angle", + "data/robot0_gripper_width", + "meta/episode_ends", + "data/camera0_rgb", + } + for dataset in required_datasets: + if dataset not in zarr_data: + return False + + register_codecs() + + nb_frames = zarr_data["data/camera0_rgb"].shape[0] + + required_datasets.remove("meta/episode_ends") + + return all(nb_frames == zarr_data[dataset].shape[0] for dataset in required_datasets) + + +def load_from_raw(raw_dir, out_dir, fps, video, debug): + zarr_path = raw_dir / "cup_in_the_wild.zarr" + zarr_data = zarr.open(zarr_path, mode="r") + + # We process the image data separately because it is too large to fit in memory + end_pose = torch.from_numpy(zarr_data["data/robot0_demo_end_pose"][:]) + 
start_pos = torch.from_numpy(zarr_data["data/robot0_demo_start_pose"][:])
+    eef_pos = torch.from_numpy(zarr_data["data/robot0_eef_pos"][:])
+    eef_rot_axis_angle = torch.from_numpy(zarr_data["data/robot0_eef_rot_axis_angle"][:])
+    gripper_width = torch.from_numpy(zarr_data["data/robot0_gripper_width"][:])
+
+    states_pos = torch.cat([eef_pos, eef_rot_axis_angle], dim=1)
+    states = torch.cat([states_pos, gripper_width], dim=1)
+
+    episode_ends = zarr_data["meta/episode_ends"][:]
+    num_episodes = episode_ends.shape[0]
+
+    episode_ids = torch.from_numpy(get_episode_idxs(episode_ends))
+
+    # `episode_ends` is converted to a torch tensor only now, because the jit-compiled
+    # `get_episode_idxs` above does not support torch tensors
+    episode_ends = torch.from_numpy(episode_ends)
+
+    ep_dicts = []
+    episode_data_index = {"from": [], "to": []}
+
+    id_from = 0
+    for ep_idx in tqdm.tqdm(range(num_episodes)):
+        id_to = episode_ends[ep_idx]
+        num_frames = id_to - id_from
+
+        # sanity check
+        assert (episode_ids[id_from:id_to] == ep_idx).all()
+
+        # TODO(rcadene): save temporary images of the episode?
+
+        state = states[id_from:id_to]
+
+        ep_dict = {}
+
+        # load 57MB of images in RAM (400x224x224x3 uint8)
+        imgs_array = zarr_data["data/camera0_rgb"][id_from:id_to]
+        if video:
+            # save png images in temporary directory
+            tmp_imgs_dir = out_dir / "tmp_images"
+            save_images_concurrently(imgs_array, tmp_imgs_dir)
+
+            # encode images to a mp4 video
+            video_path = out_dir / "videos" / f"observation.image_episode_{ep_idx:06d}.mp4"
+            encode_video_frames(tmp_imgs_dir, video_path, fps)
+
+            # clean temporary images directory
+            shutil.rmtree(tmp_imgs_dir)
+
+            # store the episode index
+            ep_dict["observation.image"] = torch.tensor([ep_idx] * num_frames, dtype=torch.int)
+        else:
+            ep_dict["observation.image"] = [PILImage.fromarray(x) for x in imgs_array]
+
+        ep_dict["observation.state"] = state
+        ep_dict["episode_index"] = torch.tensor([ep_idx] * num_frames, dtype=torch.int)
+        ep_dict["frame_index"] = torch.arange(0, num_frames, 1)
+        ep_dict["timestamp"] = torch.arange(0, num_frames, 1) / fps
+        ep_dict["episode_data_index_from"] = torch.tensor([id_from] * num_frames)
+        ep_dict["episode_data_index_to"] = torch.tensor([id_from + num_frames] * num_frames)
+        ep_dict["end_pose"] = end_pose[id_from:id_to]
+        ep_dict["start_pos"] = start_pos[id_from:id_to]
+        ep_dict["gripper_width"] = gripper_width[id_from:id_to]
+        ep_dicts.append(ep_dict)
+
+        episode_data_index["from"].append(id_from)
+        episode_data_index["to"].append(id_from + num_frames)
+        id_from += num_frames
+
+        # process first episode only
+        if debug:
+            break
+
+    data_dict = concatenate_episodes(ep_dicts)
+
+    total_frames = id_from
+    data_dict["index"] = torch.arange(0, total_frames, 1)
+
+    return data_dict, episode_data_index
+
+
+def to_hf_dataset(data_dict, video):
+    features = {}
+
+    if video:
+        features["observation.image"] = Value(dtype="int64", id="video")
+    else:
+        features["observation.image"] = Image()
+
+    features["observation.state"] = Sequence(
+        length=data_dict["observation.state"].shape[1], feature=Value(dtype="float32", id=None)
+    )
+    features["episode_index"] = Value(dtype="int64", id=None)
+    features["frame_index"] = Value(dtype="int64", id=None)
+    features["timestamp"] = Value(dtype="float32", id=None)
+    features["index"] = Value(dtype="int64", id=None)
+    features["episode_data_index_from"] = Value(dtype="int64", id=None)
+    features["episode_data_index_to"] = Value(dtype="int64", id=None)
+    # `start_pos` and `end_pose` respectively represent the positions of the end-effector
+    # at the beginning and the end of the episode.
+    # `gripper_width` indicates the distance between the grippers, and this value is included
+    # in the state vector, which comprises the concatenation of the end-effector position
+    # and gripper width.
+    features["end_pose"] = Sequence(
+        length=data_dict["end_pose"].shape[1], feature=Value(dtype="float32", id=None)
+    )
+    features["start_pos"] = Sequence(
+        length=data_dict["start_pos"].shape[1], feature=Value(dtype="float32", id=None)
+    )
+    features["gripper_width"] = Sequence(
+        length=data_dict["gripper_width"].shape[1], feature=Value(dtype="float32", id=None)
+    )
+
+    hf_dataset = Dataset.from_dict(data_dict, features=Features(features))
+    hf_dataset.set_transform(hf_transform_to_torch)
+    return hf_dataset
+
+
+def from_raw_to_lerobot_format(raw_dir: Path, out_dir: Path, fps=None, video=True, debug=False):
+    assert is_valid_raw_format(raw_dir), f"{raw_dir} does not match the expected format."
+
+    if fps is None:
+        # For umi cup in the wild: https://arxiv.org/pdf/2402.10329#table.caption.16
+        fps = 10
+
+    data_dict, episode_data_index = load_from_raw(raw_dir, out_dir, fps, video, debug)
+    hf_dataset = to_hf_dataset(data_dict, video)
+
+    info = {
+        "fps": fps,
+        "video": video,
+    }
+    return hf_dataset, episode_data_index, info
+
+
+def get_episode_idxs(episode_ends: np.ndarray) -> np.ndarray:
+    # Optimized and simplified version of this function: https://github.com/real-stanford/universal_manipulation_interface/blob/298776ce251f33b6b3185a98d6e7d1f9ad49168b/diffusion_policy/common/replay_buffer.py#L374
+    from numba import jit
+
+    @jit(nopython=True)
+    def _get_episode_idxs(episode_ends):
+        result = np.zeros((episode_ends[-1],), dtype=np.int64)
+        start_idx = 0
+        for episode_number, end_idx in enumerate(episode_ends):
+            result[start_idx:end_idx] = episode_number
+            start_idx = end_idx
+        return result
+
+    return _get_episode_idxs(episode_ends)
diff --git a/lerobot/common/datasets/push_dataset_to_hub/utils.py b/lerobot/common/datasets/push_dataset_to_hub/utils.py
index 1076eb4e..1b12c0b7 100644
--- a/lerobot/common/datasets/push_dataset_to_hub/utils.py
+++ b/lerobot/common/datasets/push_dataset_to_hub/utils.py
@@ -1,3 +1,8 @@
+from concurrent.futures import ThreadPoolExecutor
+from pathlib import Path
+
+import numpy
+import PIL.Image
 import torch
@@ -18,3 +23,16 @@ def concatenate_episodes(ep_dicts):
     total_frames = data_dict["frame_index"].shape[0]
     data_dict["index"] = torch.arange(0, total_frames, 1)
     return data_dict
+
+
+def save_images_concurrently(imgs_array: numpy.ndarray, out_dir: Path, max_workers: int = 4):
+    out_dir = Path(out_dir)
+    out_dir.mkdir(parents=True, exist_ok=True)
+
+    def save_image(img_array, i, out_dir):
+        img = PIL.Image.fromarray(img_array)
+        img.save(str(out_dir / f"frame_{i:06d}.png"), quality=100)
+
+    num_images = len(imgs_array)
+    with ThreadPoolExecutor(max_workers=max_workers) as executor:
+        [executor.submit(save_image, imgs_array[i], i, out_dir) for i in range(num_images)]
diff --git a/lerobot/common/datasets/push_dataset_to_hub/xarm_pkl_format.py b/lerobot/common/datasets/push_dataset_to_hub/xarm_pkl_format.py
new file mode 100644
index 00000000..ef46566d
--- /dev/null
+++ b/lerobot/common/datasets/push_dataset_to_hub/xarm_pkl_format.py
@@ -0,0 +1,173 @@
+"""Process pickle files formatted like in: https://github.com/fyhMer/fowm"""
+
+import pickle
+import shutil
+from pathlib import Path
+
+import einops
+import torch
+import tqdm
+from datasets import Dataset, Features, Image, Sequence, Value
+from PIL import Image as PILImage
+
+from
lerobot.common.datasets.push_dataset_to_hub.utils import concatenate_episodes, save_images_concurrently +from lerobot.common.datasets.utils import ( + hf_transform_to_torch, +) +from lerobot.common.datasets.video_utils import encode_video_frames + + +def is_valid_raw_format(raw_dir): + keys = {"actions", "rewards", "dones"} + nested_keys = {"observations": {"rgb", "state"}, "next_observations": {"rgb", "state"}} + + xarm_files = list(raw_dir.glob("*.pkl")) + if len(xarm_files) != 1: + return False + + try: + with open(xarm_files[0], "rb") as f: + dataset_dict = pickle.load(f) + except Exception: + return False + + if not isinstance(dataset_dict, dict): + return False + + if not all(k in dataset_dict for k in keys): + return False + + # Check for consistent lengths in nested keys + try: + expected_len = len(dataset_dict["actions"]) + if any(len(dataset_dict[key]) != expected_len for key in keys if key in dataset_dict): + return False + + for key, subkeys in nested_keys.items(): + nested_dict = dataset_dict.get(key, {}) + if any(len(nested_dict[subkey]) != expected_len for subkey in subkeys if subkey in nested_dict): + return False + except KeyError: # If any expected key or subkey is missing + return False + + return True # All checks passed + + +def load_from_raw(raw_dir, out_dir, fps, video, debug): + xarm_files = list(raw_dir.glob("*.pkl")) + + with open(xarm_files[0], "rb") as f: + dataset_dict = pickle.load(f) + ep_dicts = [] + episode_data_index = {"from": [], "to": []} + + id_from = 0 + id_to = 0 + ep_idx = 0 + total_frames = dataset_dict["actions"].shape[0] + for i in tqdm.tqdm(range(total_frames)): + id_to += 1 + + if not dataset_dict["dones"][i]: + continue + + num_frames = id_to - id_from + + image = torch.tensor(dataset_dict["observations"]["rgb"][id_from:id_to]) + image = einops.rearrange(image, "b c h w -> b h w c") + state = torch.tensor(dataset_dict["observations"]["state"][id_from:id_to]) + action = torch.tensor(dataset_dict["actions"][id_from:id_to]) + # TODO(rcadene): we have a missing last frame which is the observation when the env is done + # it is critical to have this frame for tdmpc to predict a "done observation/state" + # next_image = torch.tensor(dataset_dict["next_observations"]["rgb"][id_from:id_to]) + # next_state = torch.tensor(dataset_dict["next_observations"]["state"][id_from:id_to]) + next_reward = torch.tensor(dataset_dict["rewards"][id_from:id_to]) + next_done = torch.tensor(dataset_dict["dones"][id_from:id_to]) + + ep_dict = {} + + imgs_array = [x.numpy() for x in image] + if video: + # save png images in temporary directory + tmp_imgs_dir = out_dir / "tmp_images" + save_images_concurrently(imgs_array, tmp_imgs_dir) + + # encode images to a mp4 video + video_path = out_dir / "videos" / f"observation.image_episode_{ep_idx:06d}.mp4" + encode_video_frames(tmp_imgs_dir, video_path, fps) + + # clean temporary images directory + shutil.rmtree(tmp_imgs_dir) + + # store the episode index + ep_dict["observation.image"] = torch.tensor([ep_idx] * num_frames, dtype=torch.int) + else: + ep_dict["observation.image"] = [PILImage.fromarray(x) for x in imgs_array] + + ep_dict["observation.state"] = state + ep_dict["action"] = action + ep_dict["episode_index"] = torch.tensor([ep_idx] * num_frames, dtype=torch.int) + ep_dict["frame_index"] = torch.arange(0, num_frames, 1) + ep_dict["timestamp"] = torch.arange(0, num_frames, 1) / fps + # ep_dict["next.observation.image"] = next_image + # ep_dict["next.observation.state"] = next_state + ep_dict["next.reward"] = next_reward 
+ ep_dict["next.done"] = next_done + ep_dicts.append(ep_dict) + + episode_data_index["from"].append(id_from) + episode_data_index["to"].append(id_from + num_frames) + + id_from = id_to + ep_idx += 1 + + # process first episode only + if debug: + break + + data_dict = concatenate_episodes(ep_dicts) + return data_dict, episode_data_index + + +def to_hf_dataset(data_dict, video): + features = {} + + if video: + features["observation.image"] = Value(dtype="int64", id="video") + else: + features["observation.image"] = Image() + + features["observation.state"] = Sequence( + length=data_dict["observation.state"].shape[1], feature=Value(dtype="float32", id=None) + ) + features["action"] = Sequence( + length=data_dict["action"].shape[1], feature=Value(dtype="float32", id=None) + ) + features["episode_index"] = Value(dtype="int64", id=None) + features["frame_index"] = Value(dtype="int64", id=None) + features["timestamp"] = Value(dtype="float32", id=None) + features["next.reward"] = Value(dtype="float32", id=None) + features["next.done"] = Value(dtype="bool", id=None) + features["index"] = Value(dtype="int64", id=None) + # TODO(rcadene): add success + # features["next.success"] = Value(dtype='bool', id=None) + + hf_dataset = Dataset.from_dict(data_dict, features=Features(features)) + hf_dataset.set_transform(hf_transform_to_torch) + return hf_dataset + + +def from_raw_to_lerobot_format(raw_dir: Path, out_dir: Path, fps=None, video=True, debug=False): + assert is_valid_raw_format(raw_dir), f"{raw_dir} does not match the expected format." + + if fps is None: + fps = 15 + + data_dict, episode_data_index = load_from_raw(raw_dir, out_dir, fps, video, debug) + hf_dataset = to_hf_dataset(data_dict, video) + + info = { + "fps": fps, + "video": video, + } + return hf_dataset, episode_data_index, info diff --git a/lerobot/scripts/push_dataset_to_hub.py b/lerobot/scripts/push_dataset_to_hub.py index 0c04fd37..ffcecc93 100644 --- a/lerobot/scripts/push_dataset_to_hub.py +++ b/lerobot/scripts/push_dataset_to_hub.py @@ -1,295 +1,215 @@ +""" +Use this script to convert your dataset into our dataset format and upload it to the Hugging Face hub, +or store it locally. Our dataset format is lightweight, fast to load from, and does not require any +installation of neural net specific packages like pytorch, tensorflow, jax. 
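+Each supported raw format (`pusht_zarr`, `umi_zarr`, `aloha_hdf5`, `xarm_pkl`) has a matching
+`from_raw_to_lerobot_format` conversion function, selected through the `--raw-format` argument
+(see `get_from_raw_to_lerobot_format_fn` below).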
+ +Example: +``` +python lerobot/scripts/push_dataset_to_hub.py \ +--data-dir data \ +--dataset-id pusht \ +--raw-format pusht_zarr \ +--community-id lerobot \ +--revision v1.2 \ +--dry-run 1 \ +--save-to-disk 1 \ +--save-tests-to-disk 0 \ +--debug 1 + +python lerobot/scripts/push_dataset_to_hub.py \ +--data-dir data \ +--dataset-id xarm_lift_medium \ +--raw-format xarm_pkl \ +--community-id lerobot \ +--revision v1.2 \ +--dry-run 1 \ +--save-to-disk 1 \ +--save-tests-to-disk 0 \ +--debug 1 + +python lerobot/scripts/push_dataset_to_hub.py \ +--data-dir data \ +--dataset-id aloha_sim_insertion_scripted \ +--raw-format aloha_hdf5 \ +--community-id lerobot \ +--revision v1.2 \ +--dry-run 1 \ +--save-to-disk 1 \ +--save-tests-to-disk 0 \ +--debug 1 + +python lerobot/scripts/push_dataset_to_hub.py \ +--data-dir data \ +--dataset-id umi_cup_in_the_wild \ +--raw-format umi_zarr \ +--community-id lerobot \ +--revision v1.2 \ +--dry-run 1 \ +--save-to-disk 1 \ +--save-tests-to-disk 0 \ +--debug 1 +``` +""" + import argparse import json import shutil from pathlib import Path -from typing import Any, Protocol import torch -from datasets import Dataset from huggingface_hub import HfApi from safetensors.torch import save_file from lerobot.common.datasets.push_dataset_to_hub._download_raw import download_raw -from lerobot.common.datasets.push_dataset_to_hub.aloha_processor import ( - AlohaProcessor, -) -from lerobot.common.datasets.push_dataset_to_hub.pusht_processor import PushTProcessor -from lerobot.common.datasets.push_dataset_to_hub.umi_processor import UmiProcessor -from lerobot.common.datasets.push_dataset_to_hub.xarm_processor import XarmProcessor from lerobot.common.datasets.utils import compute_stats, flatten_dict -def push_lerobot_dataset_to_hub( - hf_dataset: Dataset, - episode_data_index: dict[str, list[int]], - info: dict[str, Any], - stats: dict[str, dict[str, torch.Tensor]], - root: Path, - revision: str, - dataset_id: str, - community_id: str = "lerobot", - dry_run: bool = False, -) -> None: - """ - Pushes a dataset to the Hugging Face Hub. +def get_from_raw_to_lerobot_format_fn(raw_format): + if raw_format == "pusht_zarr": + from lerobot.common.datasets.push_dataset_to_hub.pusht_zarr_format import from_raw_to_lerobot_format + elif raw_format == "umi_zarr": + from lerobot.common.datasets.push_dataset_to_hub.umi_zarr_format import from_raw_to_lerobot_format + elif raw_format == "aloha_hdf5": + from lerobot.common.datasets.push_dataset_to_hub.aloha_hdf5_format import from_raw_to_lerobot_format + elif raw_format == "xarm_pkl": + from lerobot.common.datasets.push_dataset_to_hub.xarm_pkl_format import from_raw_to_lerobot_format + else: + raise ValueError(raw_format) - Args: - hf_dataset (Dataset): The dataset to be pushed. - episode_data_index (dict[str, list[int]]): The index of episode data. - info (dict[str, Any]): Information about the dataset, eg. fps. - stats (dict[str, dict[str, torch.Tensor]]): Statistics of the dataset. - root (Path): The root directory of the dataset. - revision (str): The revision of the dataset. - dataset_id (str): The ID of the dataset. - community_id (str, optional): The ID of the community or the user where the - dataset will be stored. Defaults to "lerobot". - dry_run (bool, optional): If True, performs a dry run without actually pushing the dataset. Defaults to False. 
- """ - if not dry_run: - # push to main to indicate latest version - hf_dataset.push_to_hub(f"{community_id}/{dataset_id}", token=True) + return from_raw_to_lerobot_format - # push to version branch - hf_dataset.push_to_hub(f"{community_id}/{dataset_id}", token=True, revision=revision) - # create and store meta_data - meta_data_dir = root / community_id / dataset_id / "meta_data" +def save_meta_data(info, stats, episode_data_index, meta_data_dir): meta_data_dir.mkdir(parents=True, exist_ok=True) - # info + # save info info_path = meta_data_dir / "info.json" - with open(str(info_path), "w") as f: json.dump(info, f, indent=4) - # stats + + # save stats stats_path = meta_data_dir / "stats.safetensors" save_file(flatten_dict(stats), stats_path) - # episode_data_index + # save episode_data_index episode_data_index = {key: torch.tensor(episode_data_index[key]) for key in episode_data_index} ep_data_idx_path = meta_data_dir / "episode_data_index.safetensors" save_file(episode_data_index, ep_data_idx_path) - if not dry_run: - api = HfApi() +def push_meta_data_to_hub(meta_data_dir, repo_id, revision): + api = HfApi() + + def upload(filename, revision): api.upload_file( - path_or_fileobj=info_path, - path_in_repo=str(info_path).replace(f"{root}/{community_id}/{dataset_id}", ""), - repo_id=f"{community_id}/{dataset_id}", - repo_type="dataset", - ) - api.upload_file( - path_or_fileobj=info_path, - path_in_repo=str(info_path).replace(f"{root}/{community_id}/{dataset_id}", ""), - repo_id=f"{community_id}/{dataset_id}", - repo_type="dataset", + path_or_fileobj=meta_data_dir / filename, + path_in_repo=f"meta_data/{filename}", + repo_id=repo_id, revision=revision, + repo_type="dataset", ) - # stats - api.upload_file( - path_or_fileobj=stats_path, - path_in_repo=str(stats_path).replace(f"{root}/{community_id}/{dataset_id}", ""), - repo_id=f"{community_id}/{dataset_id}", - repo_type="dataset", - ) - api.upload_file( - path_or_fileobj=stats_path, - path_in_repo=str(stats_path).replace(f"{root}/{community_id}/{dataset_id}", ""), - repo_id=f"{community_id}/{dataset_id}", - repo_type="dataset", - revision=revision, - ) - - api.upload_file( - path_or_fileobj=ep_data_idx_path, - path_in_repo=str(ep_data_idx_path).replace(f"{root}/{community_id}/{dataset_id}", ""), - repo_id=f"{community_id}/{dataset_id}", - repo_type="dataset", - ) - api.upload_file( - path_or_fileobj=ep_data_idx_path, - path_in_repo=str(ep_data_idx_path).replace(f"{root}/{community_id}/{dataset_id}", ""), - repo_id=f"{community_id}/{dataset_id}", - repo_type="dataset", - revision=revision, - ) - - # copy in tests folder, the first episode and the meta_data directory - num_items_first_ep = episode_data_index["to"][0] - episode_data_index["from"][0] - hf_dataset.select(range(num_items_first_ep)).with_format("torch").save_to_disk( - f"tests/data/{community_id}/{dataset_id}/train" - ) - if Path(f"tests/data/{community_id}/{dataset_id}/meta_data").exists(): - shutil.rmtree(f"tests/data/{community_id}/{dataset_id}/meta_data") - shutil.copytree(meta_data_dir, f"tests/data/{community_id}/{dataset_id}/meta_data") + upload("info.json", "main") + upload("info.json", revision) + upload("stats.safetensors", "main") + upload("stats.safetensors", revision) + upload("episode_data_index.safetensors", "main") + upload("episode_data_index.safetensors", revision) def push_dataset_to_hub( + data_dir: Path, dataset_id: str, - root: Path, - fps: int | None, - dataset_folder: Path | None = None, - dry_run: bool = False, - revision: str = "v1.1", - community_id: str = 
"lerobot", - no_preprocess: bool = False, - path_save_to_disk: str | None = None, - **kwargs, -) -> None: - """ - Download a raw dataset if needed or access a local raw dataset, detect the raw format (e.g. aloha, pusht, umi) and process it accordingly in a common data format which is then pushed to the Hugging Face Hub. + raw_format: str | None, + community_id: str, + revision: str, + dry_run: bool, + save_to_disk: bool, + tests_data_dir: Path, + save_tests_to_disk: bool, + fps: int, + video: bool, + debug: bool, +): + raw_dir = data_dir / f"{dataset_id}_raw" - Args: - dataset_id (str): The ID of the dataset. - root (Path): The root directory where the dataset will be downloaded. - fps (int | None): The desired frames per second for the dataset. - dataset_folder (Path | None, optional): The path to the dataset folder. If not provided, the dataset will be downloaded using the dataset ID. Defaults to None. - dry_run (bool, optional): If True, performs a dry run without actually pushing the dataset. Defaults to False. - revision (str, optional): Version of the `push_dataset_to_hub.py` codebase used to preprocess the dataset. Defaults to "v1.1". - community_id (str, optional): The ID of the community. Defaults to "lerobot". - no_preprocess (bool, optional): If True, does not preprocesses the dataset. Defaults to False. - path_save_to_disk (str | None, optional): The path to save the dataset to disk. Works when `dry_run` is True, which allows to only save on disk without uploading. By default, the dataset is not saved on disk. - **kwargs: Additional keyword arguments for the preprocessor init method. + out_dir = data_dir / community_id / dataset_id + meta_data_dir = out_dir / "meta_data" + videos_dir = out_dir / "videos" + tests_out_dir = tests_data_dir / community_id / dataset_id + tests_meta_data_dir = tests_out_dir / "meta_data" - """ - if dataset_folder is None: - dataset_folder = download_raw(root=root, dataset_id=dataset_id) + if out_dir.exists(): + shutil.rmtree(out_dir) - if not no_preprocess: - processor = guess_dataset_type(dataset_folder=dataset_folder, fps=fps, **kwargs) - data_dict, episode_data_index = processor.preprocess() - hf_dataset = processor.to_hf_dataset(data_dict) + if tests_out_dir.exists(): + shutil.rmtree(tests_out_dir) - info = { - "fps": processor.fps, - } - stats: dict[str, dict[str, torch.Tensor]] = compute_stats(hf_dataset) + if not raw_dir.exists(): + download_raw(raw_dir, dataset_id) - push_lerobot_dataset_to_hub( - hf_dataset=hf_dataset, - episode_data_index=episode_data_index, - info=info, - stats=stats, - root=root, - revision=revision, - dataset_id=dataset_id, - community_id=community_id, - dry_run=dry_run, - ) - if path_save_to_disk: - hf_dataset.with_format("torch").save_to_disk(dataset_path=str(path_save_to_disk)) + if raw_format is None: + # TODO(rcadene, adilzouitine): implement auto_find_raw_format + raise NotImplementedError() + # raw_format = auto_find_raw_format(raw_dir) - processor.cleanup() + from_raw_to_lerobot_format = get_from_raw_to_lerobot_format_fn(raw_format) + # convert dataset from original raw format to LeRobot format + hf_dataset, episode_data_index, info = from_raw_to_lerobot_format(raw_dir, out_dir, fps, video, debug) -class DatasetProcessor(Protocol): - """A class for processing datasets. + stats = compute_stats(hf_dataset) - This class provides methods for validating, preprocessing, and converting datasets. 
+    if save_to_disk:
+        hf_dataset = hf_dataset.with_format(None)  # to remove transforms that can't be saved
+        hf_dataset.save_to_disk(str(out_dir))

-    Args:
-        folder_path (str): The path to the folder containing the dataset.
-        fps (int | None): The frames per second of the dataset. If None, the default value is used.
-        *args: Additional positional arguments.
-        **kwargs: Additional keyword arguments.
-    """
+    if not dry_run or save_to_disk:
+        # mandatory for upload
+        save_meta_data(info, stats, episode_data_index, meta_data_dir)

-    def __init__(self, folder_path: str, fps: int | None, *args, **kwargs) -> None: ...
+    if not dry_run:
+        repo_id = f"{community_id}/{dataset_id}"
+        hf_dataset.push_to_hub(repo_id, token=True, revision="main")
+        hf_dataset.push_to_hub(repo_id, token=True, revision=revision)
+        push_meta_data_to_hub(meta_data_dir, repo_id, revision)
+        if video:
+            # the meta data helper above only uploads meta files, so upload the videos folder directly
+            HfApi().upload_folder(
+                folder_path=videos_dir, path_in_repo="videos", repo_id=repo_id, repo_type="dataset"
+            )

-    def is_valid(self) -> bool:
-        """Check if the dataset is valid.
+    if save_tests_to_disk:
+        # get the first episode
+        num_items_first_ep = episode_data_index["to"][0] - episode_data_index["from"][0]
+        test_hf_dataset = hf_dataset.select(range(num_items_first_ep))

-        Returns:
-            bool: True if the dataset is valid, False otherwise.
-        """
-        ...
+        test_hf_dataset = test_hf_dataset.with_format(None)
+        test_hf_dataset.save_to_disk(str(tests_out_dir / "train"))

-    def preprocess(self) -> tuple[dict, dict]:
-        """Preprocess the dataset.
-
-        Returns:
-            tuple[dict, dict]: A tuple containing two dictionaries representing the preprocessed data.
-        """
-        ...
-
-    def to_hf_dataset(self, data_dict: dict) -> Dataset:
-        """Convert the preprocessed data to a Hugging Face dataset.
-
-        Args:
-            data_dict (dict): The preprocessed data.
-
-        Returns:
-            Dataset: The converted Hugging Face dataset.
-        """
-        ...
-
-    @property
-    def fps(self) -> int:
-        """Get the frames per second of the dataset.
-
-        Returns:
-            int: The frames per second.
-        """
-        ...
-
-    def cleanup(self):
-        """Clean up any resources used by the dataset processor."""
-        ...
-
-
-def guess_dataset_type(dataset_folder: Path, **processor_kwargs) -> DatasetProcessor:
-    if (processor := AlohaProcessor(folder_path=dataset_folder, **processor_kwargs)).is_valid():
-        return processor
-    if (processor := XarmProcessor(folder_path=dataset_folder, **processor_kwargs)).is_valid():
-        return processor
-    if (processor := PushTProcessor(folder_path=dataset_folder, **processor_kwargs)).is_valid():
-        return processor
-    if (processor := UmiProcessor(folder_path=dataset_folder, **processor_kwargs)).is_valid():
-        return processor
-    # TODO: Propose a registration mechanism for new dataset types
-    raise ValueError(f"Could not guess dataset type for folder {dataset_folder}")
+        # copy meta data to tests directory
+        if tests_meta_data_dir.exists():
+            shutil.rmtree(tests_meta_data_dir)
+        shutil.copytree(meta_data_dir, tests_meta_data_dir)


 def main():
-    """
-    Main function to process command line arguments and push dataset to Hugging Face Hub.
-
-    Parses command line arguments to get dataset details and conditions under which the dataset
-    is processed and pushed. It manages dataset preparation and uploading based on the user-defined parameters.
-    """
-    parser = argparse.ArgumentParser(
-        description="Push a dataset to the Hugging Face Hub with optional parameters for customization.",
-        epilog="""
-    Example usage:
-        python -m lerobot.scripts.push_dataset_to_hub --dataset-folder /path/to/dataset --dataset-id example_dataset --root /path/to/root --dry-run --revision v2.0 --community-id example_community --fps 30 --path-save-to-disk /path/to/save --no-preprocess
-
-    This processes and optionally pushes 'example_dataset' located in '/path/to/dataset' to Hugging Face Hub,
-    with various parameters to control the processing and uploading behavior.
-    """,
-    )
+    parser = argparse.ArgumentParser()
     parser.add_argument(
-        "--dataset-folder",
+        "--data-dir",
         type=Path,
-        default=None,
-        help="The filesystem path to the dataset folder. If not provided, the dataset must be identified and managed by other means.",
+        required=True,
+        help="Root directory containing datasets (e.g. `data` or `tmp/data` or `/tmp/lerobot/data`).",
     )
     parser.add_argument(
         "--dataset-id",
         type=str,
         required=True,
-        help="Unique identifier for the dataset to be processed and uploaded.",
+        help="Name of the dataset (e.g. `pusht`, `aloha_sim_insertion_human`), which matches the folder where the data is stored (e.g. `data/pusht`).",
     )
     parser.add_argument(
-        "--root", type=Path, required=True, help="Root directory where the dataset operations are managed."
-    )
-    parser.add_argument(
-        "--dry-run",
-        action="store_true",
-        help="Simulate the push process without uploading any data, for testing purposes.",
+        "--raw-format",
+        type=str,
+        help="Dataset type (e.g. `pusht_zarr`, `umi_zarr`, `aloha_hdf5`, `xarm_pkl`). Automatic detection is not implemented yet, so this should be provided.",
     )
     parser.add_argument(
         "--community-id",
@@ -297,41 +217,57 @@ def main():
         default="lerobot",
         help="Community or user ID under which the dataset will be hosted on the Hub.",
     )
-    parser.add_argument(
-        "--fps",
-        type=int,
-        help="Target frame rate for video or image sequence datasets. Optional and applicable only if the dataset includes temporal media.",
-    )
     parser.add_argument(
         "--revision",
         type=str,
-        default="v1.0",
-        help="Dataset version identifier to manage different iterations of the dataset.",
+        default="v1.2",
+        help="Codebase version used to generate the dataset.",
     )
     parser.add_argument(
-        "--no-preprocess",
-        action="store_true",
-        help="Does not preprocess the dataset, set this flag if you only want dowload the dataset raw.",
+        "--dry-run",
+        type=int,
+        default=0,
+        help="Run everything without uploading to the hub, for testing purposes or for storing a dataset locally.",
    )
     parser.add_argument(
-        "--path-save-to-disk",
+        "--save-to-disk",
+        type=int,
+        default=1,
+        help="Save the dataset in the directory specified by `--data-dir`.",
+    )
+    parser.add_argument(
+        "--tests-data-dir",
         type=Path,
-        help="Optional path where the processed dataset can be saved locally.",
+        default="tests/data",
+        help="Directory where test artifact datasets are stored.",
+    )
+    parser.add_argument(
+        "--save-tests-to-disk",
+        type=int,
+        default=1,
+        help="Save a 1-episode copy of the dataset for unit tests in the directory specified by `--tests-data-dir`.",
+    )
+    parser.add_argument(
+        "--fps",
+        type=int,
+        help="Frame rate used to collect videos. If not provided, the default value specified in the code is used.",
+    )
+    parser.add_argument(
+        "--video",
+        type=int,
+        # TODO(rcadene): enable when video PR merges
+        default=0,
+        help="Convert each episode of the raw dataset to an mp4 video. This option allows 60 times lower disk space consumption and 25 times faster loading time during training.",
+    )
+    parser.add_argument(
+        "--debug",
+        type=int,
+        default=0,
+        help="Debug mode: process the first episode only.",
+    )
     args = parser.parse_args()
-
-    push_dataset_to_hub(
-        dataset_folder=args.dataset_folder,
-        dataset_id=args.dataset_id,
-        root=args.root,
-        fps=args.fps,
-        dry_run=args.dry_run,
-        community_id=args.community_id,
-        revision=args.revision,
-        no_preprocess=args.no_preprocess,
-        path_save_to_disk=args.path_save_to_disk,
-    )
+    push_dataset_to_hub(**vars(args))


 if __name__ == "__main__":
diff --git a/tests/test_datasets.py b/tests/test_datasets.py
index 9438abb7..d7f54ecb 100644
--- a/tests/test_datasets.py
+++ b/tests/test_datasets.py
@@ -12,7 +12,9 @@ from safetensors.torch import load_file
 import lerobot
 from lerobot.common.datasets.factory import make_dataset
-from lerobot.common.datasets.lerobot_dataset import LeRobotDataset
+from lerobot.common.datasets.lerobot_dataset import (
+    LeRobotDataset,
+)
 from lerobot.common.datasets.utils import (
     compute_stats,
     flatten_dict,
@@ -22,8 +24,7 @@ from lerobot.common.datasets.utils import (
     unflatten_dict,
 )
 from lerobot.common.utils.utils import init_hydra_config
-
-from .utils import DEFAULT_CONFIG_PATH, DEVICE
+from tests.utils import DEFAULT_CONFIG_PATH, DEVICE


 @pytest.mark.parametrize("env_name, repo_id, policy_name", lerobot.env_dataset_policy_triplets)