From 8d360927af768eb09ce152e4d8a027f572ce3494 Mon Sep 17 00:00:00 2001
From: Remi Cadene
Date: Fri, 16 May 2025 17:40:34 +0000
Subject: [PATCH] WIP aggregate

---
 .../droid_rlds/slurm_port_shards.py        |   7 +-
 lerobot/common/datasets/aggregate.py       | 460 +++++++++++-------
 lerobot/common/datasets/lerobot_dataset.py |   2 +-
 3 files changed, 286 insertions(+), 183 deletions(-)

diff --git a/examples/port_datasets/droid_rlds/slurm_port_shards.py b/examples/port_datasets/droid_rlds/slurm_port_shards.py
index 7a1e8dd2b..36b4d795b 100644
--- a/examples/port_datasets/droid_rlds/slurm_port_shards.py
+++ b/examples/port_datasets/droid_rlds/slurm_port_shards.py
@@ -6,7 +6,6 @@
 from datatrove.executor.slurm import SlurmPipelineExecutor
 from datatrove.pipeline.base import PipelineStep
 from examples.port_datasets.droid_rlds.port_droid import DROID_SHARDS
-from lerobot.common.datasets.lerobot_dataset import LeRobotDatasetMetadata


 class PortDroidShards(PipelineStep):
@@ -30,6 +29,12 @@ class PortDroidShards(PipelineStep):

         shard_repo_id = f"{self.repo_id}_world_{world_size}_rank_{rank}"

+        try:
+            # Skip shards that were already ported and pass validation.
+            validate_dataset(shard_repo_id)
+            return
+        except Exception:
+            pass
+
         port_droid(
             self.raw_dir,
             shard_repo_id,

diff --git a/lerobot/common/datasets/aggregate.py b/lerobot/common/datasets/aggregate.py
index 2cf58ff57..c698c3a51 100644
--- a/lerobot/common/datasets/aggregate.py
+++ b/lerobot/common/datasets/aggregate.py
@@ -5,6 +5,7 @@ from pathlib import Path
 import pandas as pd
 import tqdm

+from lerobot.common.constants import HF_LEROBOT_HOME
 from lerobot.common.datasets.compute_stats import aggregate_stats
 from lerobot.common.datasets.lerobot_dataset import LeRobotDataset, LeRobotDatasetMetadata
 from lerobot.common.datasets.utils import (
@@ -47,12 +48,12 @@ def validate_all_metadata(all_metadata: list[LeRobotDatasetMetadata]):
     return fps, robot_type, features


-def update_episode_frame_task(df, episode_index_to_add, old_tasks, new_tasks, frame_index_to_add):
+def update_data_df(df, src_meta, dst_meta):
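+    """Remap a shard's data table into the aggregated dataset's index space.
+
+    Episode and frame indices are shifted by the totals accumulated so far in
+    ``dst_meta.info``, and each row's task_index is re-resolved by task string
+    against ``dst_meta.tasks``. For example, if the aggregated dataset already
+    holds 10 episodes and 2500 frames, a row with episode_index=0 and index=0
+    becomes episode_index=10 and index=2500.
+    """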
row["meta/episodes/file_index"] + meta_idx["file_index"] + row["data/chunk_index"] = row["data/chunk_index"] + data_idx["chunk_index"] + row["data/file_index"] = row["data/file_index"] + data_idx["file_index"] + for key, video_idx in videos_idx.items(): + row[f"videos/{key}/chunk_index"] = row[f"videos/{key}/chunk_index"] + video_idx["chunk_index"] + row[f"videos/{key}/file_index"] = row[f"videos/{key}/file_index"] + video_idx["file_index"] + row[f"videos/{key}/from_timestamp"] = ( + row[f"videos/{key}/from_timestamp"] + video_idx["latest_duration"] ) - row[f"videos/{key}/file_index"] = row[f"videos/{key}/file_index"] + videos_file_index_to_add[key] - row["dataset_from_index"] = row["dataset_from_index"] + frame_index_to_add - row["dataset_to_index"] = row["dataset_to_index"] + frame_index_to_add + row[f"videos/{key}/to_timestamp"] = ( + row[f"videos/{key}/to_timestamp"] + video_idx["latest_duration"] + ) + row["dataset_from_index"] = row["dataset_from_index"] + dst_meta.info["total_frames"] + row["dataset_to_index"] = row["dataset_to_index"] + dst_meta.info["total_frames"] + row["episode_index"] = row["episode_index"] + dst_meta.info["total_episodes"] return row return df.apply(_update, axis=1) @@ -88,214 +91,309 @@ def update_meta_data( def aggregate_datasets(repo_ids: list[str], aggr_repo_id: str, roots: list[Path] = None, aggr_root=None): logging.info("Start aggregate_datasets") - if roots is None: - all_metadata = [LeRobotDatasetMetadata(repo_id) for repo_id in repo_ids] - else: - all_metadata = [ + # Load metadata + all_metadata = ( + [LeRobotDatasetMetadata(repo_id) for repo_id in repo_ids] + if roots is None + else [ LeRobotDatasetMetadata(repo_id, root=root) for repo_id, root in zip(repo_ids, roots, strict=False) ] - + ) fps, robot_type, features = validate_all_metadata(all_metadata) - video_keys = [key for key in features if features[key]["dtype"] == "video"] + video_keys = [k for k, v in features.items() if v["dtype"] == "video"] - # Create resulting dataset folder - aggr_meta = LeRobotDatasetMetadata.create( + # Initialize output dataset metadata + dst_meta = LeRobotDatasetMetadata.create( repo_id=aggr_repo_id, fps=fps, robot_type=robot_type, features=features, root=aggr_root, ) - aggr_root = aggr_meta.root + # Aggregate task info logging.info("Find all tasks") - unique_tasks = pd.concat([meta.tasks for meta in all_metadata]).index.unique() - aggr_meta.tasks = pd.DataFrame({"task_index": range(len(unique_tasks))}, index=unique_tasks) + unique_tasks = pd.concat([m.tasks for m in all_metadata]).index.unique() + dst_meta.tasks = pd.DataFrame({"task_index": range(len(unique_tasks))}, index=unique_tasks) - num_episodes = 0 - num_frames = 0 + # Track counters and indices + meta_idx = {"chunk": 0, "file": 0} + data_idx = {"chunk": 0, "file": 0} + videos_idx = { + key: {"chunk": 0, "file": 0, "latest_duration": 0, "episode_duration": 0} for key in video_keys + } - aggr_meta_chunk_idx = 0 - aggr_meta_file_idx = 0 + # Process each dataset + for src_meta in tqdm.tqdm(all_metadata, desc="Copy data and videos"): + videos_idx = aggregate_videos(src_meta, dst_meta, videos_idx) + data_idx = aggregate_data(src_meta, dst_meta, data_idx) + meta_idx = aggregate_metadata(src_meta, dst_meta, meta_idx, data_idx, videos_idx, video_keys) - aggr_data_chunk_idx = 0 - aggr_data_file_idx = 0 + dst_meta.info["total_episodes"] += src_meta.total_episodes + dst_meta.info["total_frames"] += src_meta.total_frames - aggr_videos_chunk_idx = dict.fromkeys(video_keys, 0) - aggr_videos_file_idx = 
     logging.info("Start aggregate_datasets")

-    if roots is None:
-        all_metadata = [LeRobotDatasetMetadata(repo_id) for repo_id in repo_ids]
-    else:
-        all_metadata = [
+    # Load metadata
+    all_metadata = (
+        [LeRobotDatasetMetadata(repo_id) for repo_id in repo_ids]
+        if roots is None
+        else [
             LeRobotDatasetMetadata(repo_id, root=root) for repo_id, root in zip(repo_ids, roots, strict=False)
         ]
-
+    )
     fps, robot_type, features = validate_all_metadata(all_metadata)
-    video_keys = [key for key in features if features[key]["dtype"] == "video"]
+    video_keys = [k for k, v in features.items() if v["dtype"] == "video"]

-    # Create resulting dataset folder
-    aggr_meta = LeRobotDatasetMetadata.create(
+    # Initialize output dataset metadata
+    dst_meta = LeRobotDatasetMetadata.create(
         repo_id=aggr_repo_id,
         fps=fps,
         robot_type=robot_type,
         features=features,
         root=aggr_root,
     )
-    aggr_root = aggr_meta.root

+    # Aggregate task info
     logging.info("Find all tasks")
-    unique_tasks = pd.concat([meta.tasks for meta in all_metadata]).index.unique()
-    aggr_meta.tasks = pd.DataFrame({"task_index": range(len(unique_tasks))}, index=unique_tasks)
+    unique_tasks = pd.concat([m.tasks for m in all_metadata]).index.unique()
+    dst_meta.tasks = pd.DataFrame({"task_index": range(len(unique_tasks))}, index=unique_tasks)

-    num_episodes = 0
-    num_frames = 0
-
-    aggr_meta_chunk_idx = 0
-    aggr_meta_file_idx = 0
-
-    aggr_data_chunk_idx = 0
-    aggr_data_file_idx = 0
-
-    aggr_videos_chunk_idx = dict.fromkeys(video_keys, 0)
-    aggr_videos_file_idx = dict.fromkeys(video_keys, 0)
+    # Track counters and indices
+    meta_idx = {"chunk": 0, "file": 0}
+    data_idx = {"chunk": 0, "file": 0}
+    videos_idx = {
+        key: {"chunk": 0, "file": 0, "latest_duration": 0, "episode_duration": 0} for key in video_keys
+    }
+
+    # Process each dataset
+    for src_meta in tqdm.tqdm(all_metadata, desc="Copy data and videos"):
+        videos_idx = aggregate_videos(src_meta, dst_meta, videos_idx)
+        data_idx = aggregate_data(src_meta, dst_meta, data_idx)
+        meta_idx = aggregate_metadata(src_meta, dst_meta, meta_idx, data_idx, videos_idx)
+
+        dst_meta.info["total_episodes"] += src_meta.total_episodes
+        dst_meta.info["total_frames"] += src_meta.total_frames
+
+    finalize_aggregation(dst_meta, all_metadata)
+    logging.info("Aggregation complete.")

-    for meta in tqdm.tqdm(all_metadata, desc="Copy data and videos"):
-        # Aggregate episodes meta data
-        meta_chunk_file_ids = {
-            (c, f)
-            for c, f in zip(
-                meta.episodes["meta/episodes/chunk_index"],
-                meta.episodes["meta/episodes/file_index"],
-                strict=False,
-            )
-        }
-        for chunk_idx, file_idx in meta_chunk_file_ids:
-            path = meta.root / DEFAULT_EPISODES_PATH.format(chunk_index=chunk_idx, file_index=file_idx)
-            df = pd.read_parquet(path)
-            df = update_meta_data(
-                df,
-                aggr_meta_chunk_idx,
-                aggr_meta_file_idx,
-                aggr_data_chunk_idx,
-                aggr_data_file_idx,
-                aggr_videos_chunk_idx,
-                aggr_videos_file_idx,
-                num_frames,
-            )
-            aggr_path = aggr_root / DEFAULT_EPISODES_PATH.format(
-                chunk_index=aggr_meta_chunk_idx, file_index=aggr_meta_file_idx
-            )
-            if not aggr_path.exists():
-                aggr_path.parent.mkdir(parents=True, exist_ok=True)
-                df.to_parquet(aggr_path)
-            else:
-                size_in_mb = get_parquet_file_size_in_mb(path)
-                aggr_size_in_mb = get_parquet_file_size_in_mb(aggr_path)
-                if aggr_size_in_mb + size_in_mb >= DEFAULT_DATA_FILE_SIZE_IN_MB:
-                    # Size limit is reached, prepare new parquet file
-                    aggr_meta_chunk_idx, aggr_meta_file_idx = update_chunk_file_indices(
-                        aggr_meta_chunk_idx, aggr_meta_file_idx, DEFAULT_CHUNK_SIZE
-                    )
-                    aggr_path = aggr_root / DEFAULT_EPISODES_PATH.format(
-                        chunk_index=aggr_meta_chunk_idx, file_index=aggr_meta_file_idx
-                    )
-                    aggr_path.parent.mkdir(parents=True, exist_ok=True)
-                    df.to_parquet(aggr_path)
-                else:
-                    # Update the existing parquet file with new rows
-                    aggr_df = pd.read_parquet(aggr_path)
-                    df = pd.concat([aggr_df, df], ignore_index=True)
-                    df.to_parquet(aggr_path)
-
-        # Aggregate videos if any
-        for key in video_keys:
-            video_chunk_file_ids = {
-                (c, f)
-                for c, f in zip(
-                    meta.episodes[f"videos/{key}/chunk_index"],
-                    meta.episodes[f"videos/{key}/file_index"],
-                    strict=False,
-                )
-            }
-            for chunk_idx, file_idx in video_chunk_file_ids:
-                path = meta.root / DEFAULT_VIDEO_PATH.format(
-                    video_key=key, chunk_index=chunk_idx, file_index=file_idx
-                )
-                aggr_path = aggr_root / DEFAULT_VIDEO_PATH.format(
-                    video_key=key,
-                    chunk_index=aggr_videos_chunk_idx[key],
-                    file_index=aggr_videos_file_idx[key],
-                )
-                if not aggr_path.exists():
-                    # First video
-                    aggr_path.parent.mkdir(parents=True, exist_ok=True)
-                    shutil.copy(str(path), str(aggr_path))
-                else:
-                    size_in_mb = get_video_size_in_mb(path)
-                    aggr_size_in_mb = get_video_size_in_mb(aggr_path)
-
-                    if aggr_size_in_mb + size_in_mb >= DEFAULT_VIDEO_FILE_SIZE_IN_MB:
-                        # Size limit is reached, prepare new parquet file
-                        aggr_videos_chunk_idx[key], aggr_videos_file_idx[key] = update_chunk_file_indices(
-                            aggr_videos_chunk_idx[key], aggr_videos_file_idx[key], DEFAULT_CHUNK_SIZE
-                        )
-                        aggr_path = aggr_root / DEFAULT_VIDEO_PATH.format(
-                            video_key=key,
-                            chunk_index=aggr_videos_chunk_idx[key],
-                            file_index=aggr_videos_file_idx[key],
-                        )
-                        aggr_path.parent.mkdir(parents=True, exist_ok=True)
-                        shutil.copy(str(path), str(aggr_path))
-                    else:
-                        # Update the existing parquet file with new rows
-                        concat_video_files(
-                            [aggr_path, path],
-                            aggr_root,
-                            key,
-                            aggr_videos_chunk_idx[key],
-                            aggr_videos_file_idx[key],
-                        )
-                        # copy_command = f"cp {video_path} {aggr_video_path} &"
-                        # subprocess.Popen(copy_command, shell=True)
-
-        # Aggregate data
-        data_chunk_file_ids = {
-            (c, f)
-            for c, f in zip(meta.episodes["data/chunk_index"], meta.episodes["data/file_index"], strict=False)
-        }
-        for chunk_idx, file_idx in data_chunk_file_ids:
-            path = meta.root / DEFAULT_DATA_PATH.format(chunk_index=chunk_idx, file_index=file_idx)
-            df = pd.read_parquet(path)
-            df = update_episode_frame_task(df, num_episodes, meta.tasks, aggr_meta.tasks, num_frames)
-
-            aggr_path = aggr_root / DEFAULT_DATA_PATH.format(
-                chunk_index=aggr_data_chunk_idx, file_index=aggr_data_file_idx
-            )
-            if not aggr_path.exists():
-                aggr_path.parent.mkdir(parents=True, exist_ok=True)
-                df.to_parquet(aggr_path)
-            else:
-                size_in_mb = get_parquet_file_size_in_mb(path)
-                aggr_size_in_mb = get_parquet_file_size_in_mb(aggr_path)
-
-                if aggr_size_in_mb + size_in_mb >= DEFAULT_DATA_FILE_SIZE_IN_MB:
-                    # Size limit is reached, prepare new parquet file
-                    aggr_data_chunk_idx, aggr_data_file_idx = update_chunk_file_indices(
-                        aggr_data_chunk_idx, aggr_data_file_idx, DEFAULT_CHUNK_SIZE
-                    )
-                    aggr_path = aggr_root / DEFAULT_DATA_PATH.format(
-                        chunk_index=aggr_data_chunk_idx, file_index=aggr_data_file_idx
-                    )
-                    aggr_path.parent.mkdir(parents=True, exist_ok=True)
-                    df.to_parquet(aggr_path)
-                else:
-                    # Update the existing parquet file with new rows
-                    aggr_df = pd.read_parquet(aggr_path)
-                    df = pd.concat([aggr_df, df], ignore_index=True)
-                    df.to_parquet(aggr_path)
-
-        num_episodes += meta.total_episodes
-        num_frames += meta.total_frames
+
+# -------------------------------
+# Helper Functions
+# -------------------------------
+
+
+def aggregate_videos(src_meta, dst_meta, videos_idx):
+    """
+    Aggregates video chunks from a dataset into the aggregated dataset folder.
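+
+    For each video key, source files are appended to the current destination
+    file until adding another would exceed DEFAULT_VIDEO_FILE_SIZE_IN_MB; the
+    (chunk, file) indices are then rotated with update_chunk_file_indices and a
+    fresh destination file is started.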
+    """
+    for key, video_idx in videos_idx.items():
+        # Get unique (chunk, file) combinations
+        unique_chunk_file_pairs = {
+            (chunk, file)
+            for chunk, file in zip(
+                src_meta.episodes[f"videos/{key}/chunk_index"],
+                src_meta.episodes[f"videos/{key}/file_index"],
+                strict=False,
+            )
+        }
+
+        # Current target chunk/file index
+        chunk_idx = video_idx["chunk"]
+        file_idx = video_idx["file"]
+
+        for src_chunk_idx, src_file_idx in unique_chunk_file_pairs:
+            src_path = src_meta.root / DEFAULT_VIDEO_PATH.format(
+                video_key=key,
+                chunk_index=src_chunk_idx,
+                file_index=src_file_idx,
+            )
+            dst_path = dst_meta.root / DEFAULT_VIDEO_PATH.format(
+                video_key=key,
+                chunk_index=chunk_idx,
+                file_index=file_idx,
+            )
+            if not dst_path.exists():
+                # First write to this destination file
+                dst_path.parent.mkdir(parents=True, exist_ok=True)
+                shutil.copy(str(src_path), str(dst_path))
+                continue
+
+            # Check file sizes before appending
+            src_size = get_video_size_in_mb(src_path)
+            dst_size = get_video_size_in_mb(dst_path)
+
+            if dst_size + src_size >= DEFAULT_VIDEO_FILE_SIZE_IN_MB:
+                # Rotate to a new chunk/file
+                chunk_idx, file_idx = update_chunk_file_indices(chunk_idx, file_idx, DEFAULT_CHUNK_SIZE)
+                dst_path = dst_meta.root / DEFAULT_VIDEO_PATH.format(
+                    video_key=key,
+                    chunk_index=chunk_idx,
+                    file_index=file_idx,
+                )
+                dst_path.parent.mkdir(parents=True, exist_ok=True)
+                shutil.copy(str(src_path), str(dst_path))
+            else:
+                # Append to existing video file
+                concat_video_files(
+                    [dst_path, src_path],
+                    dst_meta.root,
+                    key,
+                    chunk_idx,
+                    file_idx,
+                )
+
+        # Update the video index tracking
+        video_idx["chunk"] = chunk_idx
+        video_idx["file"] = file_idx
+
+    return videos_idx
+
+
+def aggregate_data(src_meta, dst_meta, data_idx):
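+    """Copy a shard's data parquet files into the aggregated dataset.
+
+    Rows are first remapped with update_data_df, then appended to the current
+    destination parquet file via write_parquet_safely, which rotates the
+    (chunk, file) indices in ``data_idx`` once the size budget is exceeded.
+    """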
+    unique_chunk_file_ids = {
+        (c, f)
+        for c, f in zip(
+            src_meta.episodes["data/chunk_index"], src_meta.episodes["data/file_index"], strict=False
+        )
+    }
+    for src_chunk_idx, src_file_idx in unique_chunk_file_ids:
+        src_path = src_meta.root / DEFAULT_DATA_PATH.format(
+            chunk_index=src_chunk_idx, file_index=src_file_idx
+        )
+        df = pd.read_parquet(src_path)
+        df = update_data_df(df, src_meta, dst_meta)
+
+        dst_path = dst_meta.root / DEFAULT_DATA_PATH.format(
+            chunk_index=data_idx["chunk"], file_index=data_idx["file"]
+        )
+        data_idx = write_parquet_safely(
+            df,
+            src_path,
+            dst_path,
+            data_idx,
+            DEFAULT_DATA_FILE_SIZE_IN_MB,
+            DEFAULT_CHUNK_SIZE,
+            DEFAULT_DATA_PATH,
+        )
+
+    return data_idx
+
+
+def aggregate_metadata(src_meta, dst_meta, meta_idx, data_idx, videos_idx):
+    chunk_file_ids = {
+        (c, f)
+        for c, f in zip(
+            src_meta.episodes["meta/episodes/chunk_index"],
+            src_meta.episodes["meta/episodes/file_index"],
+            strict=False,
+        )
+    }
+
+    for chunk_idx, file_idx in chunk_file_ids:
+        src_path = src_meta.root / DEFAULT_EPISODES_PATH.format(chunk_index=chunk_idx, file_index=file_idx)
+        df = pd.read_parquet(src_path)
+        df = update_meta_data(
+            df,
+            dst_meta,
+            meta_idx,
+            data_idx,
+            videos_idx,
+        )
+
+        # for k in video_keys:
+        #     video_idx[k]["latest_duration"] += video_idx[k]["episode_duration"]
+
+        dst_path = dst_meta.root / DEFAULT_EPISODES_PATH.format(
+            chunk_index=meta_idx["chunk"], file_index=meta_idx["file"]
+        )
+        meta_idx = write_parquet_safely(
+            df,
+            src_path,
+            dst_path,
+            meta_idx,
+            DEFAULT_DATA_FILE_SIZE_IN_MB,
+            DEFAULT_CHUNK_SIZE,
+            DEFAULT_EPISODES_PATH,
+        )
+
+    return meta_idx
+
+
+def write_parquet_safely(
+    df: pd.DataFrame,
+    src_path: Path,
+    dst_path: Path,
+    idx: dict[str, int],
+    max_mb: float,
+    chunk_size: int,
+    default_path: str,
+):
+    """
+    Safely appends or creates a Parquet file at dst_path based on size constraints.
+
+    Parameters:
+        df (pd.DataFrame): Data to write.
+        src_path (Path): Path to source file (used to get size).
+        dst_path (Path): Target path for writing.
+        idx (dict): Dictionary containing 'chunk' and 'file' indices.
+        max_mb (float): Maximum allowed file size in MB.
+        chunk_size (int): Maximum number of files per chunk.
+        default_path (str): Format string for generating a new file path.
+
+    Returns:
+        dict: Updated index dictionary.
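+
+    Example:
+        Illustrative call (mirrors the use in aggregate_data): append a shard's
+        rows to the current data file, rotating to a new chunk/file once the
+        size budget would be exceeded.
+
+        >>> data_idx = write_parquet_safely(
+        ...     df,
+        ...     src_path,
+        ...     dst_path,
+        ...     {"chunk": 0, "file": 0},
+        ...     DEFAULT_DATA_FILE_SIZE_IN_MB,
+        ...     DEFAULT_CHUNK_SIZE,
+        ...     DEFAULT_DATA_PATH,
+        ... )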
+ """ + + # If destination file doesn't exist, just write the new one + if not dst_path.exists(): + dst_path.parent.mkdir(parents=True, exist_ok=True) + df.to_parquet(dst_path) + return idx + + # Otherwise, check if we exceed the size limit + src_size = get_parquet_file_size_in_mb(src_path) + dst_size = get_parquet_file_size_in_mb(dst_path) + + if dst_size + src_size >= max_mb: + # File is too large, move to a new one + idx["chunk"], idx["file"] = update_chunk_file_indices(idx["chunk"], idx["file"], chunk_size) + new_path = dst_path.parent / default_path.format(chunk_index=idx["chunk"], file_index=idx["file"]) + new_path.parent.mkdir(parents=True, exist_ok=True) + df.to_parquet(new_path) + else: + # Append to existing file + existing_df = pd.read_parquet(dst_path) + combined_df = pd.concat([existing_df, df], ignore_index=True) + combined_df.to_parquet(dst_path) + + return idx + + +def finalize_aggregation(aggr_meta, all_metadata): logging.info("write tasks") write_tasks(aggr_meta.tasks, aggr_meta.root) logging.info("write info") - aggr_meta.info["total_tasks"] = len(aggr_meta.tasks) - aggr_meta.info["total_episodes"] = sum([meta.total_episodes for meta in all_metadata]) - aggr_meta.info["total_frames"] = sum([meta.total_frames for meta in all_metadata]) - aggr_meta.info["splits"] = {"train": f"0:{aggr_meta.total_episodes}"} + aggr_meta.info.update( + { + "total_tasks": len(aggr_meta.tasks), + "total_episodes": sum(m.total_episodes for m in all_metadata), + "total_frames": sum(m.total_frames for m in all_metadata), + "splits": {"train": f"0:{sum(m.total_episodes for m in all_metadata)}"}, + } + ) write_info(aggr_meta.info, aggr_meta.root) logging.info("write stats") - aggr_meta.stats = aggregate_stats([meta.stats for meta in all_metadata]) + aggr_meta.stats = aggregate_stats([m.stats for m in all_metadata]) write_stats(aggr_meta.stats, aggr_meta.root) if __name__ == "__main__": init_logging() - aggr_repo_id = "cadene/aggregate_test" - aggr_root = Path(f"/tmp/{aggr_repo_id}") + + num_shards = 2048 + repo_id = "cadene/droid_1.0.1_v30" + aggr_repo_id = f"{repo_id}_compact_6" + tags = ["openx"] + + # num_shards = 210 + # repo_id = "cadene/agibot_alpha_v30" + # aggr_repo_id = f"{repo_id}" + # tags = None + + # aggr_root = Path(f"/tmp/{aggr_repo_id}") + aggr_root = HF_LEROBOT_HOME / aggr_repo_id if aggr_root.exists(): shutil.rmtree(aggr_root) + repo_ids = [] + roots = [] + for rank in range(num_shards): + shard_repo_id = f"{repo_id}_world_{num_shards}_rank_{rank}" + shard_root = HF_LEROBOT_HOME / shard_repo_id + try: + meta = LeRobotDatasetMetadata(shard_repo_id, root=shard_root) + if len(meta.video_keys) == 0: + continue + repo_ids.append(shard_repo_id) + roots.append(shard_root) + except: + pass + + if rank == 1: + break + aggregate_datasets( - ["lerobot/aloha_sim_transfer_cube_human", "lerobot/aloha_sim_insertion_human"], + repo_ids, aggr_repo_id, + roots=roots, aggr_root=aggr_root, ) aggr_dataset = LeRobotDataset(repo_id=aggr_repo_id, root=aggr_root) - - for i in tqdm.tqdm(range(len(aggr_dataset))): - aggr_dataset[i] - pass - - aggr_dataset.push_to_hub(tags=["openx"]) + # for i in tqdm.tqdm(range(len(aggr_dataset))): + # aggr_dataset[i] + # pass + aggr_dataset.push_to_hub(tags=tags, upload_large_folder=True) diff --git a/lerobot/common/datasets/lerobot_dataset.py b/lerobot/common/datasets/lerobot_dataset.py index 4df310284..cf70bbf46 100644 --- a/lerobot/common/datasets/lerobot_dataset.py +++ b/lerobot/common/datasets/lerobot_dataset.py @@ -31,7 +31,7 @@ from datasets import Dataset from 
 from huggingface_hub import HfApi, snapshot_download
 from huggingface_hub.constants import REPOCARD_NAME
 from huggingface_hub.errors import RevisionNotFoundError
-
+from torch.profiler import record_function
 from lerobot.common.constants import HF_LEROBOT_HOME
 from lerobot.common.datasets.compute_stats import aggregate_stats, compute_episode_stats
 from lerobot.common.datasets.image_writer import AsyncImageWriter, write_image