Rename openx to droid + Improve all (not tested)

This commit is contained in:
Remi Cadene
2025-03-18 16:28:09 +00:00
parent 7866c1f7d1
commit 1a5c1ef9c7
14 changed files with 1241 additions and 2709 deletions

View File

@@ -0,0 +1,143 @@
# Port DROID 1.0.1 dataset to LeRobotDataset
## Download
TODO
It will take 2 TB in your local disk.
## Port on a single computer
First, install tensorflow dataset utilities to read from raw files:
```bash
pip install tensorflow
pip install tensorflow_datasets
```
Then run this script to start porting the dataset:
```bash
python examples/port_datasets/droid_rlds/port_droid.py \
--raw-dir /your/data/droid/1.0.1 \
--repo-id your_id/droid_1.0.1 \
--push-to-hub
```
It will take 400GB in your local disk.
As usual, your LeRobotDataset will be stored in your huggingface/lerobot cache folder.
WARNING: it will take 7 days for porting the dataset locally and 3 days to upload, so we will need to parallelize over multiple nodes on a slurm cluster.
NOTE: For development, run this script to start porting a shard:
```bash
python examples/port_datasets/droid_rlds/port.py \
--raw-dir /your/data/droid/1.0.1 \
--repo-id your_id/droid_1.0.1 \
--num-shards 2048 \
--shard-index 0
```
## Port over SLURM
### 1. Port one shard per job
First, install slurm utilities from Hugging Face:
```bash
pip install datatrove
```
Then run this script to start porting shards of the dataset:
```bash
python examples/port_datasets/droid_rlds/slurm_port_shards.py \
--raw-dir /your/data/droid/1.0.1 \
--repo-id your_id/droid_1.0.1 \
--logs-dir /your/logs \
--job-name port_droid \
--partition your_partition \
--workers 2048 \
--cpus-per-task 8 \
--mem-per-cpu 1950M
```
**Note on how to set your command line arguments**
Regarding `--partition`, find yours by running:
```bash
info --format="%R"`
```
and select the CPU partition if you have one. No GPU needed.
Regarding `--workers`, it is the number of slurm jobs you will launch in parallel. 2048 is the maximum number, since there is 2048 shards in Droid. This big number will certainly max-out your cluster.
Regarding `--cpus-per-task` and `--mem-per-cpu`, by default it will use ~16GB of RAM (8*1950M) which is recommended to load the raw frames and 8 CPUs which can be useful to parallelize the encoding of the frames.
Find the number of CPUs and Memory of the nodes of your partition by running:
```bash
sinfo -N -p your_partition -h -o "%N cpus=%c mem=%m"
```
**Useful commands to check progress and debug**
Check if your jobs are running:
```bash
squeue -u $USER`
```
You should see a list with job indices like `15125385_155` where `15125385` is the job index and `155` is the worker index. The output/print of this worker is written in real time in `/your/logs/job_name/slurm_jobs/15125385_155.out`. For instance, you can inspect the content of this file by running `less /your/logs/job_name/slurm_jobs/15125385_155.out`.
Check the progression of your jobs by running:
```bash
jobs_status /your/logs
```
If it's not 100% and no more slurm job is running, it means that some of them failed. Inspect the logs by running:
```bash
failed_logs /your/logs/job_name
```
If there is an issue in the code, you can fix it in debug mode with `--slurm 0` which allows to set breakpoint:
```bash
python examples/port_datasets/droid_rlds/slurm_port_shards.py --slurm 0 ...
```
And you can relaunch the same command, which will skip the completed jobs:
```bash
python examples/port_datasets/droid_rlds/slurm_port_shards.py --slurm 1 ...
```
Once all jobs are completed, you will have one dataset per shard (e.g. `droid_1.0.1_world_2048_rank_1594`) saved on disk in your `/lerobot/home/dir/your_id` directory. You can find your `/lerobot/home/dir` by running:
```bash
python -c "from lerobot.common.constants import HF_LEROBOT_HOME;print(HF_LEROBOT_HOME)"
```
### 2. Aggregate all shards
Run this script to start aggregation:
```bash
python examples/port_datasets/droid_rlds/slurm_aggregate_shards.py \
--repo-id your_id/droid_1.0.1 \
--logs-dir /your/logs \
--job-name aggr_droid \
--partition your_partition \
--workers 2048 \
--cpus-per-task 8 \
--mem-per-cpu 1950M
```
Once all jobs are completed, you will have one dataset your `/lerobot/home/dir/your_id/droid_1.0.1` directory.
### 3. Upload dataset
Run this script to start uploading:
```bash
python examples/port_datasets/droid_rlds/slurm_upload.py \
--repo-id your_id/droid_1.0.1 \
--logs-dir /your/logs \
--job-name aggr_droid \
--partition your_partition \
--workers 50 \
--cpus-per-task 4 \
--mem-per-cpu 1950M
```

View File

@@ -0,0 +1,410 @@
#!/usr/bin/env python
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import logging
import time
from pathlib import Path
import numpy as np
import tensorflow_datasets as tfds
from lerobot.common.datasets.lerobot_dataset import LeRobotDataset
from lerobot.common.utils.utils import get_elapsed_time_in_days_hours_minutes_seconds
DROID_SHARDS = 2048
DROID_FPS = 15
DROID_ROBOT_TYPE = "Franka"
# Dataset schema slightly adapted from: https://droid-dataset.github.io/droid/the-droid-dataset.html#-dataset-schema
DROID_FEATURES = {
# true on first step of the episode
"is_first": {
"dtype": "bool",
"shape": (1,),
"names": None,
},
# true on last step of the episode
"is_last": {
"dtype": "bool",
"shape": (1,),
"names": None,
},
# true on last step of the episode if it is a terminal step, True for demos
"is_terminal": {
"dtype": "bool",
"shape": (1,),
"names": None,
},
# language_instruction is also stored as "task" to follow LeRobot standard
"language_instruction": {
"dtype": "string",
"shape": (1,),
"names": None,
},
"language_instruction_2": {
"dtype": "string",
"shape": (1,),
"names": None,
},
"language_instruction_3": {
"dtype": "string",
"shape": (1,),
"names": None,
},
"observation.state.gripper_position": {
"dtype": "float32",
"shape": (1,),
"names": {
"axes": ["gripper"],
},
},
"observation.state.cartesian_position": {
"dtype": "float32",
"shape": (6,),
"names": {
"axes": ["x", "y", "z", "roll", "pitch", "yaw"],
},
},
"observation.state.joint_position": {
"dtype": "float32",
"shape": (7,),
"names": {
"axes": ["x", "y", "z", "roll", "pitch", "yaw"],
},
},
# Add this new feature to follow LeRobot standard of using joint position + gripper
"observation.state": {
"dtype": "float32",
"shape": (8,),
"names": {
"axes": ["joint_0", "joint_1", "joint_2", "joint_3", "joint_4", "joint_5", "joint_6", "gripper"],
},
},
# Initially called wrist_image_left
"observation.images.wrist_left": {
"dtype": "video",
"shape": (180, 320, 3),
"names": [
"height",
"width",
"channels",
],
},
# Initially called exterior_image_1_left
"observation.images.exterior_1_left": {
"dtype": "video",
"shape": (180, 320, 3),
"names": [
"height",
"width",
"channels",
],
},
# Initially called exterior_image_2_left
"observation.images.exterior_2_left": {
"dtype": "video",
"shape": (180, 320, 3),
"names": [
"height",
"width",
"channels",
],
},
"action.gripper_position": {
"dtype": "float32",
"shape": (1,),
"names": {
"axes": ["gripper"],
},
},
"action.gripper_velocity": {
"dtype": "float32",
"shape": (1,),
"names": {
"axes": ["gripper"],
},
},
"action.cartesian_position": {
"dtype": "float32",
"shape": (6,),
"names": {
"axes": ["x", "y", "z", "roll", "pitch", "yaw"],
},
},
"action.cartesian_velocity": {
"dtype": "float32",
"shape": (6,),
"names": {
"axes": ["x", "y", "z", "roll", "pitch", "yaw"],
},
},
"action.joint_position": {
"dtype": "float32",
"shape": (7,),
"names": {
"axes": ["joint_0", "joint_1", "joint_2", "joint_3", "joint_4", "joint_5", "joint_6"],
},
},
"action.joint_velocity": {
"dtype": "float32",
"shape": (7,),
"names": {
"axes": ["joint_0", "joint_1", "joint_2", "joint_3", "joint_4", "joint_5", "joint_6"],
},
},
# This feature was called "action" in RLDS dataset and consists of [6x joint velocities, 1x gripper position]
"action.original": {
"dtype": "float32",
"shape": (7,),
"names": {
"axes": ["x", "y", "z", "roll", "pitch", "yaw", "gripper"],
},
},
# Add this new feature to follow LeRobot standard of using joint position + gripper
"action": {
"dtype": "float32",
"shape": (8,),
"names": {
"axes": ["joint_0", "joint_1", "joint_2", "joint_3", "joint_4", "joint_5", "joint_6", "gripper"],
},
},
"discount": {
"dtype": "float32",
"shape": (1,),
"names": None,
},
"reward": {
"dtype": "float32",
"shape": (1,),
"names": None,
},
# Meta data that are the same for all frames in the episode
"task_category": {
"dtype": "string",
"shape": (1,),
"names": None,
},
"building": {
"dtype": "string",
"shape": (1,),
"names": None,
},
"collector_id": {
"dtype": "string",
"shape": (1,),
"names": None,
},
"date": {
"dtype": "string",
"shape": (1,),
"names": None,
},
"camera_extrinsics.wrist_left": {
"dtype": "float32",
"shape": (6,),
"names": {
"axes": ["x", "y", "z", "roll", "pitch", "yaw"],
},
},
"camera_extrinsics.exterior_1_left": {
"dtype": "float32",
"shape": (6,),
"names": {
"axes": ["x", "y", "z", "roll", "pitch", "yaw"],
},
},
"camera_extrinsics.exterior_2_left": {
"dtype": "float32",
"shape": (6,),
"names": {
"axes": ["x", "y", "z", "roll", "pitch", "yaw"],
},
},
"is_episode_successful": {
"dtype": "bool",
"shape": (1,),
"names": None,
},
}
def is_episode_successful(tf_episode_metadata):
# Adapted from: https://github.com/droid-dataset/droid_policy_learning/blob/dd1020eb20d981f90b5ff07dc80d80d5c0cb108b/robomimic/utils/rlds_utils.py#L8
return "/success/" in tf_episode_metadata["file_path"].numpy().decode()
def generate_lerobot_frames(tf_episode):
m = tf_episode["episode_metadata"]
frame_meta = {
"task_category": m["building"].numpy().decode(),
"building": m["building"].numpy().decode(),
"collector_id": m["collector_id"].numpy().decode(),
"date": m["date"].numpy().decode(),
"camera_extrinsics.wrist_left": m["extrinsics_wrist_cam"].numpy(),
"camera_extrinsics.exterior_1_left": m["extrinsics_exterior_cam_1"].numpy(),
"camera_extrinsics.exterior_2_left": m["extrinsics_exterior_cam_2"].numpy(),
"is_episode_successful": np.array([is_episode_successful(m)]),
}
for f in tf_episode["steps"]:
# Dataset schema slightly adapted from: https://droid-dataset.github.io/droid/the-droid-dataset.html#-dataset-schema
frame = {
"is_first": np.array([f["is_first"].numpy()]),
"is_last": np.array([f["is_last"].numpy()]),
"is_terminal": np.array([f["is_terminal"].numpy()]),
"language_instruction": f["language_instruction"].numpy().decode(),
"language_instruction_2": f["language_instruction_2"].numpy().decode(),
"language_instruction_3": f["language_instruction_3"].numpy().decode(),
"observation.state.gripper_position": f["observation"]["gripper_position"].numpy(),
"observation.state.cartesian_position": f["observation"]["cartesian_position"].numpy(),
"observation.state.joint_position": f["observation"]["joint_position"].numpy(),
"observation.images.wrist_left": f["observation"]["wrist_image_left"].numpy(),
"observation.images.exterior_1_left": f["observation"]["exterior_image_1_left"].numpy(),
"observation.images.exterior_2_left": f["observation"]["exterior_image_2_left"].numpy(),
"action.gripper_position": f["action_dict"]["gripper_position"].numpy(),
"action.gripper_velocity": f["action_dict"]["gripper_velocity"].numpy(),
"action.cartesian_position": f["action_dict"]["cartesian_position"].numpy(),
"action.cartesian_velocity": f["action_dict"]["cartesian_velocity"].numpy(),
"action.joint_position": f["action_dict"]["joint_position"].numpy(),
"action.joint_velocity": f["action_dict"]["joint_velocity"].numpy(),
"discount": np.array([f["discount"].numpy()]),
"reward": np.array([f["reward"].numpy()]),
"action.original": f["action"].numpy(),
}
# language_instruction is also stored as "task" to follow LeRobot standard
frame["task"] = frame["language_instruction"]
# Add this new feature to follow LeRobot standard of using joint position + gripper
frame["observation.state"] = np.concatenate(
[frame["observation.state.joint_position"], frame["observation.state.gripper_position"]]
)
frame["action"] = np.concatenate([frame["action.joint_position"], frame["action.gripper_position"]])
# Meta data that are the same for all frames in the episode
frame.update(frame_meta)
# Cast fp64 to fp32
for key in frame:
if isinstance(frame[key], np.ndarray) and frame[key].dtype == np.float64:
frame[key] = frame[key].astype(np.float32)
yield frame
def port_droid(
raw_dir: Path,
repo_id: str = None,
push_to_hub: bool = False,
num_shards: int | None = None,
shard_index: int | None = None,
):
dataset_name = raw_dir.parent.name
version = raw_dir.name
data_dir = raw_dir.parent.parent
builder = tfds.builder(f"{dataset_name}/{version}", data_dir=data_dir, version="")
if num_shards is not None:
tfds_num_shards = builder.info.splits["train"].num_shards
if tfds_num_shards != DROID_SHARDS:
raise ValueError(
f"Number of shards of Droid dataset is expected to be {DROID_SHARDS} but is {tfds_num_shards}."
)
if num_shards != tfds_num_shards:
raise ValueError(
f"We only shard over the fixed number of shards provided by tensorflow dataset ({tfds_num_shards}), but {num_shards} shards provided instead."
)
if shard_index >= tfds_num_shards:
raise ValueError(
f"Shard index is greater than the num of shards ({shard_index} >= {num_shards})."
)
raw_dataset = builder.as_dataset(split=f"train[{shard_index}shard]")
else:
raw_dataset = builder.as_dataset(split="train")
lerobot_dataset = LeRobotDataset.create(
repo_id=repo_id,
robot_type=DROID_ROBOT_TYPE,
fps=DROID_FPS,
features=DROID_FEATURES,
)
start_time = time.time()
num_episodes = raw_dataset.cardinality().numpy().item()
logging.info(f"Number of episodes {num_episodes}")
for episode_index, episode in enumerate(raw_dataset):
logging.info(f"{episode_index} / {num_episodes} episodes processed")
elapsed_time = time.time() - start_time
d, h, m, s = get_elapsed_time_in_days_hours_minutes_seconds(elapsed_time)
logging.info(f"It has been {d} days, {h} hours, {m} minutes, {s:.3f} seconds")
for frame in generate_lerobot_frames(episode):
lerobot_dataset.add_frame(frame)
lerobot_dataset.save_episode()
logging.info("Save_episode")
if push_to_hub:
lerobot_dataset.push_to_hub(
# Add openx tag, since it belongs to the openx collection of datasets
tags=["openx"],
private=False,
)
def main():
parser = argparse.ArgumentParser()
parser.add_argument(
"--raw-dir",
type=Path,
required=True,
help="Directory containing input raw datasets (e.g. `path/to/dataset` or `path/to/dataset/version).",
)
parser.add_argument(
"--repo-id",
type=str,
help="Repositery identifier on Hugging Face: a community or a user name `/` the name of the dataset, required when push-to-hub is True",
)
parser.add_argument(
"--push-to-hub",
action="store_true",
help="Upload to hub.",
)
parser.add_argument(
"--num-shards",
type=int,
default=None,
help="Number of shards. Can be either None to load the full dataset, or 2048 to load one of the 2048 tensorflow dataset files.",
)
parser.add_argument(
"--shard-index",
type=int,
default=None,
help="Index of the shard. Can be either None to load the full dataset, or in [0,2047] to load one of the 2048 tensorflow dataset files.",
)
args = parser.parse_args()
port_droid(**vars(args))
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,287 @@
#!/usr/bin/env python
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import logging
import tqdm
from datatrove.executor import LocalPipelineExecutor
from datatrove.executor.slurm import SlurmPipelineExecutor
from datatrove.pipeline.base import PipelineStep
from examples.port_datasets.droid_rlds.port_droid import DROID_SHARDS
from lerobot.common.datasets.aggregate import validate_all_metadata
from lerobot.common.datasets.lerobot_dataset import LeRobotDatasetMetadata
from lerobot.common.datasets.utils import write_episode, write_episode_stats, write_info, write_task
from lerobot.common.utils.utils import init_logging
class AggregateDatasets(PipelineStep):
def __init__(
self,
repo_ids: list[str],
aggregated_repo_id: str,
):
super().__init__()
self.repo_ids = repo_ids
self.aggr_repo_id = aggregated_repo_id
self.create_aggr_dataset()
def create_aggr_dataset(self):
init_logging()
logging.info("Start aggregate_datasets")
all_metadata = [LeRobotDatasetMetadata(repo_id) for repo_id in self.repo_ids]
fps, robot_type, features = validate_all_metadata(all_metadata)
# Create resulting dataset folder
aggr_meta = LeRobotDatasetMetadata.create(
repo_id=self.aggr_repo_id,
fps=fps,
robot_type=robot_type,
features=features,
)
logging.info("Find all tasks")
# find all tasks, deduplicate them, create new task indices for each dataset
# indexed by dataset index
datasets_task_index_to_aggr_task_index = {}
aggr_task_index = 0
for dataset_index, meta in enumerate(tqdm.tqdm(all_metadata, desc="Find all tasks")):
task_index_to_aggr_task_index = {}
for task_index, task in meta.tasks.items():
if task not in aggr_meta.task_to_task_index:
# add the task to aggr tasks mappings
aggr_meta.tasks[aggr_task_index] = task
aggr_meta.task_to_task_index[task] = aggr_task_index
aggr_task_index += 1
# add task_index anyway
task_index_to_aggr_task_index[task_index] = aggr_meta.task_to_task_index[task]
datasets_task_index_to_aggr_task_index[dataset_index] = task_index_to_aggr_task_index
logging.info("Prepare copy data and videos")
datasets_ep_idx_to_aggr_ep_idx = {}
datasets_aggr_episode_index_shift = {}
aggr_episode_index_shift = 0
for dataset_index, meta in enumerate(tqdm.tqdm(all_metadata, desc="Prepare copy data and videos")):
ep_idx_to_aggr_ep_idx = {}
for episode_index in range(meta.total_episodes):
aggr_episode_index = episode_index + aggr_episode_index_shift
ep_idx_to_aggr_ep_idx[episode_index] = aggr_episode_index
datasets_ep_idx_to_aggr_ep_idx[dataset_index] = ep_idx_to_aggr_ep_idx
datasets_aggr_episode_index_shift[dataset_index] = aggr_episode_index_shift
# populate episodes
for episode_index, episode_dict in meta.episodes.items():
aggr_episode_index = episode_index + aggr_episode_index_shift
episode_dict["episode_index"] = aggr_episode_index
aggr_meta.episodes[aggr_episode_index] = episode_dict
# populate episodes_stats
for episode_index, episode_stats in meta.episodes_stats.items():
aggr_episode_index = episode_index + aggr_episode_index_shift
aggr_meta.episodes_stats[aggr_episode_index] = episode_stats
# populate info
aggr_meta.info["total_episodes"] += meta.total_episodes
aggr_meta.info["total_frames"] += meta.total_frames
aggr_meta.info["total_videos"] += len(aggr_meta.video_keys) * meta.total_episodes
aggr_episode_index_shift += meta.total_episodes
logging.info("Write meta data")
aggr_meta.info["total_tasks"] = len(aggr_meta.tasks)
aggr_meta.info["total_chunks"] = aggr_meta.get_episode_chunk(aggr_episode_index_shift - 1)
aggr_meta.info["splits"] = {"train": f"0:{aggr_meta.info['total_episodes']}"}
# create a new episodes jsonl with updated episode_index using write_episode
for episode_dict in tqdm.tqdm(aggr_meta.episodes.values(), desc="Write episodes"):
write_episode(episode_dict, aggr_meta.root)
# create a new episode_stats jsonl with updated episode_index using write_episode_stats
for episode_index, episode_stats in tqdm.tqdm(
aggr_meta.episodes_stats.items(), desc="Write episodes stats"
):
write_episode_stats(episode_index, episode_stats, aggr_meta.root)
# create a new task jsonl with updated episode_index using write_task
for task_index, task in tqdm.tqdm(aggr_meta.tasks.items(), desc="Write tasks"):
write_task(task_index, task, aggr_meta.root)
write_info(aggr_meta.info, aggr_meta.root)
self.datasets_task_index_to_aggr_task_index = datasets_task_index_to_aggr_task_index
self.datasets_ep_idx_to_aggr_ep_idx = datasets_ep_idx_to_aggr_ep_idx
self.datasets_aggr_episode_index_shift = datasets_aggr_episode_index_shift
logging.info("Meta data done writing!")
def run(self, data=None, rank: int = 0, world_size: int = 1):
import logging
import shutil
import pandas as pd
from lerobot.common.datasets.aggregate import get_update_episode_and_task_func
from lerobot.common.datasets.lerobot_dataset import LeRobotDatasetMetadata
from lerobot.common.utils.utils import init_logging
init_logging()
aggr_meta = LeRobotDatasetMetadata(self.aggr_repo_id)
all_metadata = [LeRobotDatasetMetadata(repo_id) for repo_id in self.repo_ids]
if world_size != len(all_metadata):
raise ValueError()
dataset_index = rank
meta = all_metadata[dataset_index]
aggr_episode_index_shift = self.datasets_aggr_episode_index_shift[dataset_index]
logging.info("Copy data")
for episode_index in range(meta.total_episodes):
aggr_episode_index = self.datasets_ep_idx_to_aggr_ep_idx[dataset_index][episode_index]
data_path = meta.root / meta.get_data_file_path(episode_index)
aggr_data_path = aggr_meta.root / aggr_meta.get_data_file_path(aggr_episode_index)
# update episode_index and task_index
df = pd.read_parquet(data_path)
update_row_func = get_update_episode_and_task_func(
aggr_episode_index_shift, self.datasets_task_index_to_aggr_task_index[dataset_index]
)
df = df.apply(update_row_func, axis=1)
aggr_data_path.parent.mkdir(parents=True, exist_ok=True)
df.to_parquet(aggr_data_path)
logging.info("Copy videos")
for episode_index in range(meta.total_episodes):
aggr_episode_index = episode_index + aggr_episode_index_shift
for vid_key in meta.video_keys:
video_path = meta.root / meta.get_video_file_path(episode_index, vid_key)
aggr_video_path = aggr_meta.root / aggr_meta.get_video_file_path(aggr_episode_index, vid_key)
aggr_video_path.parent.mkdir(parents=True, exist_ok=True)
shutil.copy(video_path, aggr_video_path)
# copy_command = f"cp {video_path} {aggr_video_path} &"
# subprocess.Popen(copy_command, shell=True)
logging.info("Done!")
def make_aggregate_executor(
repo_ids, repo_id, job_name, logs_dir, workers, partition, cpus_per_task, mem_per_cpu, slurm=True
):
kwargs = {
"pipeline": [
AggregateDatasets(repo_ids, repo_id),
],
"logging_dir": str(logs_dir),
}
if slurm:
kwargs.update(
{
"job_name": job_name,
"tasks": DROID_SHARDS,
"workers": workers,
"time": "08:00:00",
"partition": partition,
"cpus_per_task": cpus_per_task,
"sbatch_args": {"mem-per-cpu": mem_per_cpu},
}
)
executor = SlurmPipelineExecutor(**kwargs)
else:
kwargs.update(
{
"tasks": DROID_SHARDS,
"workers": 1,
}
)
executor = LocalPipelineExecutor(**kwargs)
return executor
def main():
parser = argparse.ArgumentParser()
parser.add_argument(
"--repo-id",
type=str,
help="Repositery identifier on Hugging Face: a community or a user name `/` the name of the dataset, required when push-to-hub is True.",
)
parser.add_argument(
"--logs-dir",
type=str,
help="Path to logs directory for `datatrove`.",
)
parser.add_argument(
"--job-name",
type=str,
default="port_droid",
help="Job name used in slurm, and name of the directory created inside the provided logs directory.",
)
parser.add_argument(
"--slurm",
type=int,
default=1,
help="Launch over slurm. Use `--slurm 0` to launch sequentially (useful to debug).",
)
parser.add_argument(
"--workers",
type=int,
default=2048,
help="Number of slurm workers. It should be less than the maximum number of shards.",
)
parser.add_argument(
"--partition",
type=str,
help="Slurm partition. Ideally a CPU partition. No need for GPU partition.",
)
parser.add_argument(
"--cpus-per-task",
type=int,
default=8,
help="Number of cpus that each slurm worker will use.",
)
parser.add_argument(
"--mem-per-cpu",
type=str,
default="1950M",
help="Memory per cpu that each worker will use.",
)
args = parser.parse_args()
kwargs = vars(args)
kwargs["slurm"] = kwargs.pop("slurm") == 1
repo_ids = [f"{args.repo_id}_world_{DROID_SHARDS}_rank_{rank}" for rank in range(DROID_SHARDS)]
aggregate_executor = make_aggregate_executor(repo_ids, **kwargs)
aggregate_executor.run()
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,161 @@
import argparse
from pathlib import Path
from datatrove.executor import LocalPipelineExecutor
from datatrove.executor.slurm import SlurmPipelineExecutor
from datatrove.pipeline.base import PipelineStep
from examples.port_datasets.droid_rlds.port_droid import DROID_SHARDS
from lerobot.common.datasets.lerobot_dataset import LeRobotDatasetMetadata
def validate_shard(repo_id):
"""Sanity check that ensure meta data can be loaded and all files are present."""
meta = LeRobotDatasetMetadata(repo_id)
if meta.total_episodes == 0:
raise ValueError("Number of episodes is 0.")
for ep_idx in range(meta.total_episodes):
data_path = meta.root / meta.get_data_file_path(ep_idx)
if not data_path.exists():
raise ValueError(f"Parquet file is missing in: {data_path}")
for vid_key in meta.video_keys:
vid_path = meta.root / meta.get_video_file_path(ep_idx, vid_key)
if not vid_path.exists():
raise ValueError(f"Video file is missing in: {vid_path}")
class PortDroidShards(PipelineStep):
def __init__(
self,
raw_dir: Path | str,
repo_id: str = None,
):
super().__init__()
self.raw_dir = Path(raw_dir)
self.repo_id = repo_id
def run(self, data=None, rank: int = 0, world_size: int = 1):
from datasets.utils.tqdm import disable_progress_bars
from examples.port_datasets.droid_rlds.port_droid import port_droid
from lerobot.common.utils.utils import init_logging
init_logging()
disable_progress_bars()
shard_repo_id = f"{self.repo_id}_world_{world_size}_rank_{rank}"
port_droid(
self.raw_dir,
shard_repo_id,
push_to_hub=False,
num_shards=world_size,
shard_index=rank,
)
validate_shard(shard_repo_id)
def make_port_executor(
raw_dir, repo_id, job_name, logs_dir, workers, partition, cpus_per_task, mem_per_cpu, slurm=True
):
kwargs = {
"pipeline": [
PortDroidShards(raw_dir, repo_id),
],
"logging_dir": str(logs_dir),
}
if slurm:
kwargs.update(
{
"job_name": job_name,
"tasks": DROID_SHARDS,
"workers": workers,
"time": "08:00:00",
"partition": partition,
"cpus_per_task": cpus_per_task,
"sbatch_args": {"mem-per-cpu": mem_per_cpu},
}
)
executor = SlurmPipelineExecutor(**kwargs)
else:
kwargs.update(
{
"tasks": 1,
"workers": 1,
}
)
executor = LocalPipelineExecutor(**kwargs)
return executor
def main():
parser = argparse.ArgumentParser()
parser.add_argument(
"--raw-dir",
type=Path,
required=True,
help="Directory containing input raw datasets (e.g. `path/to/dataset` or `path/to/dataset/version).",
)
parser.add_argument(
"--repo-id",
type=str,
help="Repositery identifier on Hugging Face: a community or a user name `/` the name of the dataset, required when push-to-hub is True.",
)
parser.add_argument(
"--logs-dir",
type=str,
help="Path to logs directory for `datatrove`.",
)
parser.add_argument(
"--job-name",
type=str,
default="port_droid",
help="Job name used in slurm, and name of the directory created inside the provided logs directory.",
)
parser.add_argument(
"--slurm",
type=int,
default=1,
help="Launch over slurm. Use `--slurm 0` to launch sequentially (useful to debug).",
)
parser.add_argument(
"--workers",
type=int,
default=2048,
help="Number of slurm workers. It should be less than the maximum number of shards.",
)
parser.add_argument(
"--partition",
type=str,
help="Slurm partition. Ideally a CPU partition. No need for GPU partition.",
)
parser.add_argument(
"--cpus-per-task",
type=int,
default=8,
help="Number of cpus that each slurm worker will use.",
)
parser.add_argument(
"--mem-per-cpu",
type=str,
default="1950M",
help="Memory per cpu that each worker will use.",
)
args = parser.parse_args()
kwargs = vars(args)
kwargs["slurm"] = kwargs.pop("slurm") == 1
port_executor = make_port_executor(**kwargs)
port_executor.run()
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,230 @@
import argparse
import logging
import os
from pathlib import Path
from datatrove.executor import LocalPipelineExecutor
from datatrove.executor.slurm import SlurmPipelineExecutor
from datatrove.pipeline.base import PipelineStep
from huggingface_hub import HfApi
from huggingface_hub.constants import REPOCARD_NAME
from examples.port_datasets.droid_rlds.port_droid import DROID_SHARDS
from lerobot.common.datasets.lerobot_dataset import CODEBASE_VERSION, LeRobotDatasetMetadata
from lerobot.common.datasets.utils import create_lerobot_dataset_card
class UploadDataset(PipelineStep):
def __init__(
self,
repo_id: str,
branch: str | None = None,
revision: str | None = None,
tags: list | None = None,
license: str | None = "apache-2.0",
private: bool = False,
**card_kwargs,
):
super().__init__()
self.repo_id = repo_id
self.branch = branch
self.tags = tags
self.license = license
self.private = private
self.card_kwargs = card_kwargs
self.revision = revision if revision else CODEBASE_VERSION
if os.environ.get("HF_HUB_ENABLE_HF_TRANSFER", "0") != "1":
logging.warning(
'HF_HUB_ENABLE_HF_TRANSFER is not set to "1". Install hf_transfer and set the env '
"variable for faster uploads:\npip install hf-transfer\nexport HF_HUB_ENABLE_HF_TRANSFER=1"
)
self.create_repo()
def create_repo(self):
hub_api = HfApi()
meta = LeRobotDatasetMetadata(self.repo_id)
hub_api.create_repo(
repo_id=self.repo_id,
private=self.private,
repo_type="dataset",
exist_ok=True,
)
if self.branch:
hub_api.create_branch(
repo_id=self.repo_id,
branch=self.branch,
revision=self.revision,
repo_type="dataset",
exist_ok=True,
)
if not hub_api.file_exists(self.repo_id, REPOCARD_NAME, repo_type="dataset", revision=self.branch):
card = create_lerobot_dataset_card(
tags=self.tags, dataset_info=meta.info, license=self.license, **self.card_kwargs
)
card.push_to_hub(repo_id=self.repo_id, repo_type="dataset", revision=self.branch)
def list_files_recursively(directory):
base_path = Path(directory)
return [str(file.relative_to(base_path)) for file in base_path.rglob("*") if file.is_file()]
meta = LeRobotDatasetMetadata(self.repo_id)
self.file_paths = list_files_recursively(meta.root)
self.file_paths = sorted(self.file_paths)
def run(self, data=None, rank: int = 0, world_size: int = 1):
import logging
import random
import time
from itertools import islice
from huggingface_hub import CommitOperationAdd, create_commit, preupload_lfs_files
from huggingface_hub.utils import HfHubHTTPError
from lerobot.common.datasets.lerobot_dataset import LeRobotDatasetMetadata
from lerobot.common.utils.utils import init_logging
BASE_DELAY = 1.0 # noqa: N806
MAX_RETRIES = 24 # noqa: N806
init_logging()
def chunked(lst, n):
it = iter(lst)
return [list(islice(it, size)) for size in [len(lst) // n + (i < len(lst) % n) for i in range(n)]]
chunks = chunked(self.file_paths, world_size)
file_paths = chunks[rank]
if len(file_paths) == 0:
raise ValueError(file_paths)
meta = LeRobotDatasetMetadata(self.repo_id)
additions = [
CommitOperationAdd(path_in_repo=path, path_or_fileobj=meta.root / path) for path in file_paths
]
logging.info(f"Uploading {','.join(file_paths)} to the hub...")
preupload_lfs_files(
repo_id=self.repo_id, repo_type="dataset", additions=additions, revision=self.branch
)
logging.info(f"Upload of {','.join(file_paths)} to the hub complete!")
retries = 0
while True:
try:
create_commit(
self.repo_id,
repo_type="dataset",
operations=additions,
commit_message=f"DataTrove upload ({len(additions)} files)",
revision=self.branch,
)
break
except HfHubHTTPError as e:
if "A commit has happened since" in e.server_message:
if retries >= MAX_RETRIES:
logging.error(f"Failed to create commit after {MAX_RETRIES=}. Giving up.")
raise e
logging.info("Commit creation race condition issue. Waiting...")
time.sleep(BASE_DELAY * 2**retries + random.uniform(0, 2))
retries += 1
else:
raise e
def make_upload_executor(
repo_id, job_name, logs_dir, workers, partition, cpus_per_task, mem_per_cpu, slurm=True
):
kwargs = {
"pipeline": [
UploadDataset(repo_id),
],
"logging_dir": str(logs_dir),
}
if slurm:
kwargs.update(
{
"job_name": job_name,
"tasks": DROID_SHARDS,
"workers": workers,
"time": "08:00:00",
"partition": partition,
"cpus_per_task": cpus_per_task,
"sbatch_args": {"mem-per-cpu": mem_per_cpu},
}
)
executor = SlurmPipelineExecutor(**kwargs)
else:
kwargs.update(
{
"tasks": DROID_SHARDS,
"workers": 1,
}
)
executor = LocalPipelineExecutor(**kwargs)
return executor
def main():
parser = argparse.ArgumentParser()
parser.add_argument(
"--repo-id",
type=str,
help="Repositery identifier on Hugging Face: a community or a user name `/` the name of the dataset, required when push-to-hub is True.",
)
parser.add_argument(
"--logs-dir",
type=str,
help="Path to logs directory for `datatrove`.",
)
parser.add_argument(
"--job-name",
type=str,
default="port_droid",
help="Job name used in slurm, and name of the directory created inside the provided logs directory.",
)
parser.add_argument(
"--slurm",
type=int,
default=1,
help="Launch over slurm. Use `--slurm 0` to launch sequentially (useful to debug).",
)
parser.add_argument(
"--workers",
type=int,
default=2048,
help="Number of slurm workers. It should be less than the maximum number of shards.",
)
parser.add_argument(
"--partition",
type=str,
help="Slurm partition. Ideally a CPU partition. No need for GPU partition.",
)
parser.add_argument(
"--cpus-per-task",
type=int,
default=8,
help="Number of cpus that each slurm worker will use.",
)
parser.add_argument(
"--mem-per-cpu",
type=str,
default="1950M",
help="Memory per cpu that each worker will use.",
)
args = parser.parse_args()
kwargs = vars(args)
kwargs["slurm"] = kwargs.pop("slurm") == 1
upload_executor = make_upload_executor(**kwargs)
upload_executor.run()
if __name__ == "__main__":
main()