From 588bf965595dd9e3bbf076cdb5dbb78e2ffd0893 Mon Sep 17 00:00:00 2001
From: Remi Cadene
Date: Tue, 6 May 2025 15:13:35 +0000
Subject: [PATCH] Fix aggregate (num_frames, dataset_from_index, index)

---
 lerobot/common/datasets/aggregate.py       | 52 +++++++++++++++-------
 lerobot/common/datasets/lerobot_dataset.py |  5 ++-
 2 files changed, 38 insertions(+), 19 deletions(-)

diff --git a/lerobot/common/datasets/aggregate.py b/lerobot/common/datasets/aggregate.py
index 5b5768fd..2cf58ff5 100644
--- a/lerobot/common/datasets/aggregate.py
+++ b/lerobot/common/datasets/aggregate.py
@@ -47,9 +47,10 @@ def validate_all_metadata(all_metadata: list[LeRobotDatasetMetadata]):
     return fps, robot_type, features


-def update_episode_and_task(df, episode_index_to_add, old_tasks, new_tasks):
+def update_episode_frame_task(df, episode_index_to_add, old_tasks, new_tasks, frame_index_to_add):
     def _update(row):
         row["episode_index"] = row["episode_index"] + episode_index_to_add
+        row["index"] = row["index"] + frame_index_to_add
         task = old_tasks.iloc[row["task_index"]].name
         row["task_index"] = new_tasks.loc[task].task_index.item()
         return row
@@ -150,8 +151,10 @@ def aggregate_datasets(repo_ids: list[str], aggr_repo_id: str, roots: list[Path]
         aggr_path = aggr_root / DEFAULT_EPISODES_PATH.format(
             chunk_index=aggr_meta_chunk_idx, file_index=aggr_meta_file_idx
         )
-
-        if aggr_path.exists():
+        if not aggr_path.exists():
+            aggr_path.parent.mkdir(parents=True, exist_ok=True)
+            df.to_parquet(aggr_path)
+        else:
             size_in_mb = get_parquet_file_size_in_mb(path)
             aggr_size_in_mb = get_parquet_file_size_in_mb(aggr_path)

@@ -160,13 +163,16 @@ def aggregate_datasets(repo_ids: list[str], aggr_repo_id: str, roots: list[Path]
                 aggr_meta_chunk_idx, aggr_meta_file_idx = update_chunk_file_indices(
                     aggr_meta_chunk_idx, aggr_meta_file_idx, DEFAULT_CHUNK_SIZE
                 )
+                aggr_path = aggr_root / DEFAULT_EPISODES_PATH.format(
+                    chunk_index=aggr_meta_chunk_idx, file_index=aggr_meta_file_idx
+                )
+                aggr_path.parent.mkdir(parents=True, exist_ok=True)
+                df.to_parquet(aggr_path)
             else:
                 # Update the existing parquet file with new rows
                 aggr_df = pd.read_parquet(aggr_path)
                 df = pd.concat([aggr_df, df], ignore_index=True)
-
-        aggr_path.parent.mkdir(parents=True, exist_ok=True)
-        df.to_parquet(aggr_path)
+                df.to_parquet(aggr_path)

     # Aggregate videos if any
     for key in video_keys:
@@ -187,7 +193,11 @@ def aggregate_datasets(repo_ids: list[str], aggr_repo_id: str, roots: list[Path]
                 chunk_index=aggr_videos_chunk_idx[key],
                 file_index=aggr_videos_file_idx[key],
             )
-            if aggr_path.exists():
+            if not aggr_path.exists():
+                # First video
+                aggr_path.parent.mkdir(parents=True, exist_ok=True)
+                shutil.copy(str(path), str(aggr_path))
+            else:
                 size_in_mb = get_video_size_in_mb(path)
                 aggr_size_in_mb = get_video_size_in_mb(aggr_path)

@@ -196,6 +206,13 @@ def aggregate_datasets(repo_ids: list[str], aggr_repo_id: str, roots: list[Path]
                     aggr_videos_chunk_idx[key], aggr_videos_file_idx[key] = update_chunk_file_indices(
                         aggr_videos_chunk_idx[key], aggr_videos_file_idx[key], DEFAULT_CHUNK_SIZE
                     )
+                    aggr_path = aggr_root / DEFAULT_VIDEO_PATH.format(
+                        video_key=key,
+                        chunk_index=aggr_videos_chunk_idx[key],
+                        file_index=aggr_videos_file_idx[key],
+                    )
+                    aggr_path.parent.mkdir(parents=True, exist_ok=True)
+                    shutil.copy(str(path), str(aggr_path))
                 else:
                     # Update the existing parquet file with new rows
                     concat_video_files(
@@ -205,10 +222,6 @@ def aggregate_datasets(repo_ids: list[str], aggr_repo_id: str, roots: list[Path]
                         aggr_videos_chunk_idx[key],
                         aggr_videos_file_idx[key],
                     )
-            else:
-                aggr_path.parent.mkdir(parents=True, exist_ok=True)
-                shutil.copy(str(path), str(aggr_path))
-
             # copy_command = f"cp {video_path} {aggr_video_path} &"
             # subprocess.Popen(copy_command, shell=True)

@@ -220,13 +233,15 @@ def aggregate_datasets(repo_ids: list[str], aggr_repo_id: str, roots: list[Path]
         for chunk_idx, file_idx in data_chunk_file_ids:
             path = meta.root / DEFAULT_DATA_PATH.format(chunk_index=chunk_idx, file_index=file_idx)
             df = pd.read_parquet(path)
-            # TODO(rcadene): update frame index
-            df = update_episode_and_task(df, num_episodes, meta.tasks, aggr_meta.tasks)
+            df = update_episode_frame_task(df, num_episodes, meta.tasks, aggr_meta.tasks, num_frames)

             aggr_path = aggr_root / DEFAULT_DATA_PATH.format(
                 chunk_index=aggr_data_chunk_idx, file_index=aggr_data_file_idx
             )
-            if aggr_path.exists():
+            if not aggr_path.exists():
+                aggr_path.parent.mkdir(parents=True, exist_ok=True)
+                df.to_parquet(aggr_path)
+            else:
                 size_in_mb = get_parquet_file_size_in_mb(path)
                 aggr_size_in_mb = get_parquet_file_size_in_mb(aggr_path)

@@ -235,13 +250,16 @@ def aggregate_datasets(repo_ids: list[str], aggr_repo_id: str, roots: list[Path]
                     aggr_data_chunk_idx, aggr_data_file_idx = update_chunk_file_indices(
                         aggr_data_chunk_idx, aggr_data_file_idx, DEFAULT_CHUNK_SIZE
                     )
+                    aggr_path = aggr_root / DEFAULT_DATA_PATH.format(
+                        chunk_index=aggr_data_chunk_idx, file_index=aggr_data_file_idx
+                    )
+                    aggr_path.parent.mkdir(parents=True, exist_ok=True)
+                    df.to_parquet(aggr_path)
                 else:
                     # Update the existing parquet file with new rows
                     aggr_df = pd.read_parquet(aggr_path)
                     df = pd.concat([aggr_df, df], ignore_index=True)
-
-            aggr_path.parent.mkdir(parents=True, exist_ok=True)
-            df.to_parquet(aggr_path)
+                    df.to_parquet(aggr_path)

         num_episodes += meta.total_episodes
         num_frames += meta.total_frames

diff --git a/lerobot/common/datasets/lerobot_dataset.py b/lerobot/common/datasets/lerobot_dataset.py
index dddd11ab..0d280063 100644
--- a/lerobot/common/datasets/lerobot_dataset.py
+++ b/lerobot/common/datasets/lerobot_dataset.py
@@ -276,6 +276,7 @@ class LeRobotDatasetMetadata:
         ep_dataset = Dataset.from_dict(episode_dict)
         ep_size_in_mb = get_hf_dataset_size_in_mb(ep_dataset)
         df = pd.DataFrame(ep_dataset)
+        num_frames = episode_dict["length"][0]

         if self.episodes is None:
             # Initialize indices and frame count for a new dataset made of the first episode data
@@ -283,7 +284,7 @@
             df["meta/episodes/chunk_index"] = [chunk_idx]
             df["meta/episodes/file_index"] = [file_idx]
             df["dataset_from_index"] = [0]
-            df["dataset_to_index"] = [len(df)]
+            df["dataset_to_index"] = [num_frames]
         else:
             # Retrieve information from the latest parquet file
             latest_ep = self.episodes[-1]
@@ -301,7 +302,7 @@
             df["meta/episodes/chunk_index"] = [chunk_idx]
             df["meta/episodes/file_index"] = [file_idx]
             df["dataset_from_index"] = [latest_ep["dataset_to_index"]]
-            df["dataset_to_index"] = [latest_ep["dataset_to_index"] + len(df)]
+            df["dataset_to_index"] = [latest_ep["dataset_to_index"] + num_frames]

             if latest_size_in_mb + ep_size_in_mb < self.data_files_size_in_mb:
                 # Size limit wasnt reached, concatenate latest dataframe with new one
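
Note: renaming update_episode_and_task to update_episode_frame_task also resolves the
removed "TODO(rcadene): update frame index" by shifting each row's global frame index
together with its episode index. Below is a minimal sketch of that re-indexing on
synthetic data; it is vectorized for brevity, whereas the patch applies the same
arithmetic row-wise inside _update, and the offsets 10 and 500 are made-up stand-ins
for the accumulated num_episodes and num_frames counters.

    import pandas as pd

    # Toy stand-in for one source dataset's data parquet: 2 episodes, 4 frames.
    df = pd.DataFrame(
        {
            "index": [0, 1, 2, 3],  # global frame index within the source dataset
            "episode_index": [0, 0, 1, 1],
        }
    )

    episode_index_to_add = 10  # assumed: episodes already in the aggregated dataset
    frame_index_to_add = 500   # assumed: frames already in the aggregated dataset

    # Shifting both counters keeps `index` and `episode_index` globally unique
    # after the source rows are appended to the aggregated dataset.
    df["episode_index"] += episode_index_to_add
    df["index"] += frame_index_to_add

    print(df)
    #    index  episode_index
    # 0    500             10
    # 1    501             10
    # 2    502             11
    # 3    503             11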
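
Note on the lerobot_dataset.py hunks: the per-episode metadata dataframe holds exactly
one row, so the old bound len(df) always evaluated to 1 regardless of episode length;
num_frames taken from episode_dict["length"][0] is the actual frame count. A small
illustration, using a hypothetical single-episode episode_dict with made-up values,
shaped like the one the surrounding method builds:

    import pandas as pd

    episode_dict = {"episode_index": [7], "length": [250]}  # assumed toy values
    df = pd.DataFrame(episode_dict)

    assert len(df) == 1  # one row per episode, so len(df) is not the frame count
    num_frames = episode_dict["length"][0]  # 250 frames in this episode

    df["dataset_from_index"] = [0]
    df["dataset_to_index"] = [num_frames]  # previously [len(df)], i.e. always [1]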