diff --git a/examples/port_datasets/droid_rlds/README.md b/examples/port_datasets/droid_rlds/README.md
index 11020d09..b4da4986 100644
--- a/examples/port_datasets/droid_rlds/README.md
+++ b/examples/port_datasets/droid_rlds/README.md
@@ -39,14 +39,15 @@ python examples/port_datasets/droid_rlds/port.py \
 
 ## Port over SLURM
 
-### 1. Port one shard per job
-
-First, install slurm utilities from Hugging Face:
+Install slurm utilities from Hugging Face:
 ```bash
 pip install datatrove
 ```
-Then run this script to start porting shards of the dataset:
+
+### 1. Port one shard per job
+
+Run this script to start porting shards of the dataset:
 ```bash
 python examples/port_datasets/droid_rlds/slurm_port_shards.py \
     --raw-dir /your/data/droid/1.0.1 \
@@ -83,7 +84,7 @@ Check if your jobs are running:
 squeue -u $USER`
 ```
 
-You should see a list with job indices like `15125385_155` where `15125385` is the job index and `155` is the worker index. The output/print of this worker is written in real time in `/your/logs/job_name/slurm_jobs/15125385_155.out`. For instance, you can inspect the content of this file by running `less /your/logs/job_name/slurm_jobs/15125385_155.out`.
+You should see a list with job indices like `15125385_155` where `15125385` is the index of the run and `155` is the worker index. The output/print of this worker is written in real time in `/your/logs/job_name/slurm_jobs/15125385_155.out`. For instance, you can inspect the content of this file by running `less /your/logs/job_name/slurm_jobs/15125385_155.out`.
 
 Check the progression of your jobs by running:
 ```bash
diff --git a/examples/port_datasets/droid_rlds/port_droid.py b/examples/port_datasets/droid_rlds/port_droid.py
index ca6b9fce..b92cd1f7 100644
--- a/examples/port_datasets/droid_rlds/port_droid.py
+++ b/examples/port_datasets/droid_rlds/port_droid.py
@@ -307,7 +307,7 @@ def generate_lerobot_frames(tf_episode):
 
 def port_droid(
     raw_dir: Path,
-    repo_id: str = None,
+    repo_id: str,
     push_to_hub: bool = False,
     num_shards: int | None = None,
     shard_index: int | None = None,
@@ -349,11 +349,12 @@ def port_droid(
     logging.info(f"Number of episodes {num_episodes}")
 
     for episode_index, episode in enumerate(raw_dataset):
-        logging.info(f"{episode_index} / {num_episodes} episodes processed")
-
         elapsed_time = time.time() - start_time
         d, h, m, s = get_elapsed_time_in_days_hours_minutes_seconds(elapsed_time)
-        logging.info(f"It has been {d} days, {h} hours, {m} minutes, {s:.3f} seconds")
+
+        logging.info(
+            f"{episode_index} / {num_episodes} episodes processed (after {d} days, {h} hours, {m} minutes, {s:.3f} seconds)"
+        )
 
         for frame in generate_lerobot_frames(episode):
             lerobot_dataset.add_frame(frame)
diff --git a/examples/port_datasets/droid_rlds/slurm_aggregate_shards.py b/examples/port_datasets/droid_rlds/slurm_aggregate_shards.py
index 621b3600..d2f2dfdf 100644
--- a/examples/port_datasets/droid_rlds/slurm_aggregate_shards.py
+++ b/examples/port_datasets/droid_rlds/slurm_aggregate_shards.py
@@ -16,6 +16,7 @@
 
 import argparse
 import logging
+from pathlib import Path
 
 import tqdm
 from datatrove.executor import LocalPipelineExecutor
@@ -197,7 +198,7 @@ def make_aggregate_executor(
         "pipeline": [
             AggregateDatasets(repo_ids, repo_id),
         ],
-        "logging_dir": str(logs_dir),
+        "logging_dir": str(logs_dir / job_name),
     }
 
     if slurm:
@@ -235,7 +236,7 @@ def main():
     )
     parser.add_argument(
         "--logs-dir",
-        type=str,
+        type=Path,
         help="Path to logs directory for `datatrove`.",
     )
     parser.add_argument(
diff --git a/examples/port_datasets/droid_rlds/slurm_port_shards.py b/examples/port_datasets/droid_rlds/slurm_port_shards.py
index 89d6bd85..08e36bc3 100644
--- a/examples/port_datasets/droid_rlds/slurm_port_shards.py
+++ b/examples/port_datasets/droid_rlds/slurm_port_shards.py
@@ -67,7 +67,7 @@ def make_port_executor(
         "pipeline": [
             PortDroidShards(raw_dir, repo_id),
         ],
-        "logging_dir": str(logs_dir),
+        "logging_dir": str(logs_dir / job_name),
     }
 
     if slurm:
@@ -111,7 +111,7 @@ def main():
     )
     parser.add_argument(
         "--logs-dir",
-        type=str,
+        type=Path,
         help="Path to logs directory for `datatrove`.",
     )
     parser.add_argument(
diff --git a/examples/port_datasets/droid_rlds/slurm_upload.py b/examples/port_datasets/droid_rlds/slurm_upload.py
index f7f03716..43849468 100644
--- a/examples/port_datasets/droid_rlds/slurm_upload.py
+++ b/examples/port_datasets/droid_rlds/slurm_upload.py
@@ -12,6 +12,7 @@ from huggingface_hub.constants import REPOCARD_NAME
 from examples.port_datasets.droid_rlds.port_droid import DROID_SHARDS
 from lerobot.common.datasets.lerobot_dataset import CODEBASE_VERSION, LeRobotDatasetMetadata
 from lerobot.common.datasets.utils import create_lerobot_dataset_card
+from lerobot.common.utils.utils import init_logging
 
 
 class UploadDataset(PipelineStep):
@@ -23,10 +24,12 @@ class UploadDataset(PipelineStep):
         tags: list | None = None,
         license: str | None = "apache-2.0",
         private: bool = False,
+        distant_repo_id: str | None = None,
         **card_kwargs,
     ):
         super().__init__()
         self.repo_id = repo_id
+        self.distant_repo_id = self.repo_id if distant_repo_id is None else distant_repo_id
         self.branch = branch
         self.tags = tags
         self.license = license
@@ -43,96 +46,123 @@ class UploadDataset(PipelineStep):
         self.create_repo()
 
     def create_repo(self):
-        hub_api = HfApi()
-
+        logging.info(f"Loading meta data from {self.repo_id}...")
         meta = LeRobotDatasetMetadata(self.repo_id)
+
+        logging.info(f"Creating repo {self.distant_repo_id}...")
+        hub_api = HfApi()
         hub_api.create_repo(
-            repo_id=self.repo_id,
+            repo_id=self.distant_repo_id,
             private=self.private,
             repo_type="dataset",
             exist_ok=True,
         )
 
         if self.branch:
             hub_api.create_branch(
-                repo_id=self.repo_id,
+                repo_id=self.distant_repo_id,
                 branch=self.branch,
                 revision=self.revision,
                 repo_type="dataset",
                 exist_ok=True,
             )
 
-        if not hub_api.file_exists(self.repo_id, REPOCARD_NAME, repo_type="dataset", revision=self.branch):
+        if not hub_api.file_exists(
+            self.distant_repo_id, REPOCARD_NAME, repo_type="dataset", revision=self.branch
+        ):
             card = create_lerobot_dataset_card(
                 tags=self.tags, dataset_info=meta.info, license=self.license, **self.card_kwargs
             )
-            card.push_to_hub(repo_id=self.repo_id, repo_type="dataset", revision=self.branch)
+            card.push_to_hub(repo_id=self.distant_repo_id, repo_type="dataset", revision=self.branch)
 
         def list_files_recursively(directory):
             base_path = Path(directory)
             return [str(file.relative_to(base_path)) for file in base_path.rglob("*") if file.is_file()]
 
-        meta = LeRobotDatasetMetadata(self.repo_id)
+        logging.info(f"Listing all local files from {self.repo_id}...")
         self.file_paths = list_files_recursively(meta.root)
         self.file_paths = sorted(self.file_paths)
 
-    def run(self, data=None, rank: int = 0, world_size: int = 1):
-        import logging
-        import random
-        import time
+    def create_chunks(self, lst, n):
         from itertools import islice
 
-        from huggingface_hub import CommitOperationAdd, create_commit, preupload_lfs_files
+        it = iter(lst)
+        return [list(islice(it, size)) for size in [len(lst) // n + (i < len(lst) % n) for i in range(n)]]
+
+    def create_commits(self, additions):
+        import logging
+        import math
+        import random
+        import time
+
+        from huggingface_hub import create_commit
         from huggingface_hub.utils import HfHubHTTPError
 
+        FILES_BETWEEN_COMMITS = 10  # noqa: N806
+        BASE_DELAY = 0.1  # noqa: N806
+        MAX_RETRIES = 12  # noqa: N806
+
+        # Split the files into smaller chunks for faster commit
+        # and avoiding "A commit has happened since" error
+        num_chunks = math.ceil(len(additions) / FILES_BETWEEN_COMMITS)
+        chunks = self.create_chunks(additions, num_chunks)
+
+        for chunk in chunks:
+            retries = 0
+            while True:
+                try:
+                    create_commit(
+                        self.distant_repo_id,
+                        repo_type="dataset",
+                        operations=chunk,
+                        commit_message=f"DataTrove upload ({len(chunk)} files)",
+                        revision=self.branch,
+                    )
+                    logging.info("create_commit completed!")
+                    break
+                except HfHubHTTPError as e:
+                    if "A commit has happened since" in e.server_message:
+                        if retries >= MAX_RETRIES:
+                            logging.error(f"Failed to create commit after {MAX_RETRIES=}. Giving up.")
+                            raise e
+                        logging.info("Commit creation race condition issue. Waiting...")
+                        time.sleep(BASE_DELAY * 2**retries + random.uniform(0, 2))
+                        retries += 1
+                    else:
+                        raise e
+
+    def run(self, data=None, rank: int = 0, world_size: int = 1):
+        import logging
+
+        from datasets.utils.tqdm import disable_progress_bars
+        from huggingface_hub import CommitOperationAdd, preupload_lfs_files
+
         from lerobot.common.datasets.lerobot_dataset import LeRobotDatasetMetadata
         from lerobot.common.utils.utils import init_logging
 
-        BASE_DELAY = 1.0  # noqa: N806
-        MAX_RETRIES = 24  # noqa: N806
-
         init_logging()
+        disable_progress_bars()
 
-        def chunked(lst, n):
-            it = iter(lst)
-            return [list(islice(it, size)) for size in [len(lst) // n + (i < len(lst) % n) for i in range(n)]]
-
-        chunks = chunked(self.file_paths, world_size)
+        chunks = self.create_chunks(self.file_paths, world_size)
         file_paths = chunks[rank]
         if len(file_paths) == 0:
             raise ValueError(file_paths)
 
+        logging.info("Pre-uploading LFS files...")
+        for i, path in enumerate(file_paths):
+            logging.info(f"{i}: {path}")
+
         meta = LeRobotDatasetMetadata(self.repo_id)
         additions = [
             CommitOperationAdd(path_in_repo=path, path_or_fileobj=meta.root / path) for path in file_paths
         ]
-        logging.info(f"Uploading {','.join(file_paths)} to the hub...")
         preupload_lfs_files(
-            repo_id=self.repo_id, repo_type="dataset", additions=additions, revision=self.branch
+            repo_id=self.distant_repo_id, repo_type="dataset", additions=additions, revision=self.branch
         )
-        logging.info(f"Upload of {','.join(file_paths)} to the hub complete!")
 
-        retries = 0
-        while True:
-            try:
-                create_commit(
-                    self.repo_id,
-                    repo_type="dataset",
-                    operations=additions,
-                    commit_message=f"DataTrove upload ({len(additions)} files)",
-                    revision=self.branch,
-                )
-                break
-            except HfHubHTTPError as e:
-                if "A commit has happened since" in e.server_message:
-                    if retries >= MAX_RETRIES:
-                        logging.error(f"Failed to create commit after {MAX_RETRIES=}. Giving up.")
-                        raise e
-                    logging.info("Commit creation race condition issue. Waiting...")
Waiting...") - time.sleep(BASE_DELAY * 2**retries + random.uniform(0, 2)) - retries += 1 - else: - raise e + logging.info("Creating commits...") + self.create_commits(additions) + logging.info("Done!") def make_upload_executor( @@ -142,7 +172,7 @@ def make_upload_executor( "pipeline": [ UploadDataset(repo_id), ], - "logging_dir": str(logs_dir), + "logging_dir": str(logs_dir / job_name), } if slurm: @@ -180,7 +210,7 @@ def main(): ) parser.add_argument( "--logs-dir", - type=str, + type=Path, help="Path to logs directory for `datatrove`.", ) parser.add_argument( @@ -209,7 +239,7 @@ def main(): parser.add_argument( "--cpus-per-task", type=int, - default=4, + default=8, help="Number of cpus that each slurm worker will use.", ) parser.add_argument( @@ -219,6 +249,8 @@ def main(): help="Memory per cpu that each worker will use.", ) + init_logging() + args = parser.parse_args() kwargs = vars(args) kwargs["slurm"] = kwargs.pop("slurm") == 1