Fix aggregate (num_frames, dataset_from_index, index)

commit 588bf96559
parent e11d2e4197
Author: Remi Cadene
Date:   2025-05-06 15:13:35 +00:00

2 changed files with 38 additions and 19 deletions

Changed file 1 of 2

@@ -47,9 +47,10 @@ def validate_all_metadata(all_metadata: list[LeRobotDatasetMetadata]):
     return fps, robot_type, features
 
 
-def update_episode_and_task(df, episode_index_to_add, old_tasks, new_tasks):
+def update_episode_frame_task(df, episode_index_to_add, old_tasks, new_tasks, frame_index_to_add):
     def _update(row):
         row["episode_index"] = row["episode_index"] + episode_index_to_add
+        row["index"] = row["index"] + frame_index_to_add
         task = old_tasks.iloc[row["task_index"]].name
         row["task_index"] = new_tasks.loc[task].task_index.item()
         return row
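The renamed helper now shifts the global frame "index" as well as "episode_index", so rows copied from a source dataset land right after the frames already aggregated. A self-contained sketch of how such a row-wise update behaves on toy data; the df.apply(_update, axis=1) call and the task-table layout are assumptions, since the hunk does not show the rest of the function body.

import pandas as pd


def update_episode_frame_task(df, episode_index_to_add, old_tasks, new_tasks, frame_index_to_add):
    def _update(row):
        row["episode_index"] = row["episode_index"] + episode_index_to_add
        row["index"] = row["index"] + frame_index_to_add  # new: offset the global frame index too
        task = old_tasks.iloc[row["task_index"]].name
        row["task_index"] = new_tasks.loc[task].task_index.item()
        return row

    return df.apply(_update, axis=1)  # assumed: the hunk does not show how _update is applied


# Two frames of episode 0, appended after an aggregate that already holds
# 3 episodes and 100 frames, where "pick cube" was re-registered as task 4.
old_tasks = pd.DataFrame({"task_index": [0]}, index=["pick cube"])
new_tasks = pd.DataFrame({"task_index": [4]}, index=["pick cube"])
frames = pd.DataFrame({"episode_index": [0, 0], "index": [0, 1], "task_index": [0, 0]})
print(update_episode_frame_task(frames, 3, old_tasks, new_tasks, 100))
# episode_index becomes 3, index becomes 100/101, task_index is remapped to 4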
@@ -150,8 +151,10 @@ def aggregate_datasets(repo_ids: list[str], aggr_repo_id: str, roots: list[Path]
         aggr_path = aggr_root / DEFAULT_EPISODES_PATH.format(
             chunk_index=aggr_meta_chunk_idx, file_index=aggr_meta_file_idx
         )
-        if aggr_path.exists():
+        if not aggr_path.exists():
+            aggr_path.parent.mkdir(parents=True, exist_ok=True)
+            df.to_parquet(aggr_path)
+        else:
             size_in_mb = get_parquet_file_size_in_mb(path)
             aggr_size_in_mb = get_parquet_file_size_in_mb(aggr_path)
@@ -160,13 +163,16 @@ def aggregate_datasets(repo_ids: list[str], aggr_repo_id: str, roots: list[Path]
                 aggr_meta_chunk_idx, aggr_meta_file_idx = update_chunk_file_indices(
                     aggr_meta_chunk_idx, aggr_meta_file_idx, DEFAULT_CHUNK_SIZE
                 )
+                aggr_path = aggr_root / DEFAULT_EPISODES_PATH.format(
+                    chunk_index=aggr_meta_chunk_idx, file_index=aggr_meta_file_idx
+                )
+                aggr_path.parent.mkdir(parents=True, exist_ok=True)
+                df.to_parquet(aggr_path)
             else:
                 # Update the existing parquet file with new rows
                 aggr_df = pd.read_parquet(aggr_path)
                 df = pd.concat([aggr_df, df], ignore_index=True)
-
-        aggr_path.parent.mkdir(parents=True, exist_ok=True)
-        df.to_parquet(aggr_path)
+                df.to_parquet(aggr_path)
 
     # Aggregate videos if any
     for key in video_keys:
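Both episode-metadata hunks above implement the same write policy, which the data hunks below repeat: create the parquet file if it does not exist yet, roll over to freshly bumped chunk/file indices when the new rows would blow the size budget, and otherwise concatenate into the existing file. A minimal standalone sketch of that policy; the path template, the 100 MB budget, and the index-bumping helper are stand-ins, not the script's DEFAULT_EPISODES_PATH, size constant, or update_chunk_file_indices.

from pathlib import Path

import pandas as pd

EPISODES_PATH = "meta/episodes/chunk-{chunk_index:03d}/file-{file_index:03d}.parquet"  # hypothetical
MAX_FILE_SIZE_IN_MB = 100  # hypothetical budget
CHUNK_SIZE = 1000  # hypothetical number of files per chunk directory


def get_file_size_in_mb(path: Path) -> float:
    return path.stat().st_size / (1024 * 1024)


def bump_indices(chunk_idx: int, file_idx: int, chunk_size: int) -> tuple[int, int]:
    # Next file index, rolling into a new chunk directory when the chunk is full.
    file_idx += 1
    if file_idx >= chunk_size:
        chunk_idx, file_idx = chunk_idx + 1, 0
    return chunk_idx, file_idx


def append_parquet(df: pd.DataFrame, src_path: Path, aggr_root: Path, chunk_idx: int, file_idx: int):
    aggr_path = aggr_root / EPISODES_PATH.format(chunk_index=chunk_idx, file_index=file_idx)
    if not aggr_path.exists():
        # First write for these indices: just create the file.
        aggr_path.parent.mkdir(parents=True, exist_ok=True)
        df.to_parquet(aggr_path)
    elif get_file_size_in_mb(src_path) + get_file_size_in_mb(aggr_path) >= MAX_FILE_SIZE_IN_MB:
        # Budget would be exceeded: bump the indices and start a fresh file there.
        chunk_idx, file_idx = bump_indices(chunk_idx, file_idx, CHUNK_SIZE)
        aggr_path = aggr_root / EPISODES_PATH.format(chunk_index=chunk_idx, file_index=file_idx)
        aggr_path.parent.mkdir(parents=True, exist_ok=True)
        df.to_parquet(aggr_path)
    else:
        # Small enough: merge the new rows into the existing file and rewrite it.
        df = pd.concat([pd.read_parquet(aggr_path), df], ignore_index=True)
        df.to_parquet(aggr_path)
    return chunk_idx, file_idx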
@@ -187,7 +193,11 @@ def aggregate_datasets(repo_ids: list[str], aggr_repo_id: str, roots: list[Path]
                 chunk_index=aggr_videos_chunk_idx[key],
                 file_index=aggr_videos_file_idx[key],
             )
-            if aggr_path.exists():
+            if not aggr_path.exists():
+                # First video
+                aggr_path.parent.mkdir(parents=True, exist_ok=True)
+                shutil.copy(str(path), str(aggr_path))
+            else:
                 size_in_mb = get_video_size_in_mb(path)
                 aggr_size_in_mb = get_video_size_in_mb(aggr_path)
@@ -196,6 +206,13 @@ def aggregate_datasets(repo_ids: list[str], aggr_repo_id: str, roots: list[Path]
                     aggr_videos_chunk_idx[key], aggr_videos_file_idx[key] = update_chunk_file_indices(
                         aggr_videos_chunk_idx[key], aggr_videos_file_idx[key], DEFAULT_CHUNK_SIZE
                     )
+                    aggr_path = aggr_root / DEFAULT_VIDEO_PATH.format(
+                        video_key=key,
+                        chunk_index=aggr_videos_chunk_idx[key],
+                        file_index=aggr_videos_file_idx[key],
+                    )
+                    aggr_path.parent.mkdir(parents=True, exist_ok=True)
+                    shutil.copy(str(path), str(aggr_path))
                 else:
                     # Update the existing parquet file with new rows
                     concat_video_files(
@@ -205,10 +222,6 @@ def aggregate_datasets(repo_ids: list[str], aggr_repo_id: str, roots: list[Path]
                         aggr_videos_chunk_idx[key],
                         aggr_videos_file_idx[key],
                     )
-            else:
-                aggr_path.parent.mkdir(parents=True, exist_ok=True)
-                shutil.copy(str(path), str(aggr_path))
-
         # copy_command = f"cp {video_path} {aggr_video_path} &"
         # subprocess.Popen(copy_command, shell=True)
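The video branch follows the same three-way decision, except the "create" and "roll over" cases copy the source file and the "append" case goes through concat_video_files. A hedged sketch of that branching; since concat_video_files's full signature is not visible in the hunk, an ffmpeg concat-demuxer call stands in for it, and the 500 MB budget is made up.

import shutil
import subprocess
import tempfile
from pathlib import Path

MAX_VIDEO_SIZE_IN_MB = 500  # hypothetical budget


def get_video_size_in_mb(path: Path) -> float:
    return path.stat().st_size / (1024 * 1024)


def concat_videos(paths: list[Path], out_path: Path) -> None:
    # Stand-in for concat_video_files: lossless stream concat via ffmpeg's concat demuxer.
    with tempfile.NamedTemporaryFile("w", suffix=".txt", delete=False) as f:
        f.writelines(f"file '{p.resolve()}'\n" for p in paths)
        list_file = f.name
    subprocess.run(
        ["ffmpeg", "-y", "-f", "concat", "-safe", "0", "-i", list_file, "-c", "copy", str(out_path)],
        check=True,
    )
    Path(list_file).unlink(missing_ok=True)


def append_video(src_path: Path, aggr_path: Path, next_aggr_path: Path) -> Path:
    if not aggr_path.exists():
        # First video for this camera key and these indices: a plain copy is enough.
        aggr_path.parent.mkdir(parents=True, exist_ok=True)
        shutil.copy(str(src_path), str(aggr_path))
        return aggr_path
    if get_video_size_in_mb(src_path) + get_video_size_in_mb(aggr_path) >= MAX_VIDEO_SIZE_IN_MB:
        # Budget exceeded: start a new aggregate file at the bumped indices.
        next_aggr_path.parent.mkdir(parents=True, exist_ok=True)
        shutil.copy(str(src_path), str(next_aggr_path))
        return next_aggr_path
    # Otherwise append the incoming video onto the existing aggregate file.
    tmp_path = aggr_path.with_name(aggr_path.stem + ".tmp" + aggr_path.suffix)
    concat_videos([aggr_path, src_path], tmp_path)
    tmp_path.replace(aggr_path)
    return aggr_path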
@@ -220,13 +233,15 @@ def aggregate_datasets(repo_ids: list[str], aggr_repo_id: str, roots: list[Path]
         for chunk_idx, file_idx in data_chunk_file_ids:
             path = meta.root / DEFAULT_DATA_PATH.format(chunk_index=chunk_idx, file_index=file_idx)
             df = pd.read_parquet(path)
-            # TODO(rcadene): update frame index
-            df = update_episode_and_task(df, num_episodes, meta.tasks, aggr_meta.tasks)
+            df = update_episode_frame_task(df, num_episodes, meta.tasks, aggr_meta.tasks, num_frames)
             aggr_path = aggr_root / DEFAULT_DATA_PATH.format(
                 chunk_index=aggr_data_chunk_idx, file_index=aggr_data_file_idx
             )
-            if aggr_path.exists():
+            if not aggr_path.exists():
+                aggr_path.parent.mkdir(parents=True, exist_ok=True)
+                df.to_parquet(aggr_path)
+            else:
                 size_in_mb = get_parquet_file_size_in_mb(path)
                 aggr_size_in_mb = get_parquet_file_size_in_mb(aggr_path)
@@ -235,13 +250,16 @@ def aggregate_datasets(repo_ids: list[str], aggr_repo_id: str, roots: list[Path]
                     aggr_data_chunk_idx, aggr_data_file_idx = update_chunk_file_indices(
                         aggr_data_chunk_idx, aggr_data_file_idx, DEFAULT_CHUNK_SIZE
                     )
+                    aggr_path = aggr_root / DEFAULT_DATA_PATH.format(
+                        chunk_index=aggr_data_chunk_idx, file_index=aggr_data_file_idx
+                    )
+                    aggr_path.parent.mkdir(parents=True, exist_ok=True)
+                    df.to_parquet(aggr_path)
                 else:
                     # Update the existing parquet file with new rows
                     aggr_df = pd.read_parquet(aggr_path)
                     df = pd.concat([aggr_df, df], ignore_index=True)
-
-            aggr_path.parent.mkdir(parents=True, exist_ok=True)
-            df.to_parquet(aggr_path)
+                    df.to_parquet(aggr_path)
 
         num_episodes += meta.total_episodes
         num_frames += meta.total_frames
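After each source dataset is folded in, the running totals advance by that dataset's episode and frame counts, and those totals are exactly the offsets passed to update_episode_frame_task for the next dataset. A toy illustration with made-up counts:

datasets = [
    {"total_episodes": 10, "total_frames": 1200},
    {"total_episodes": 4, "total_frames": 560},
    {"total_episodes": 7, "total_frames": 900},
]

num_episodes, num_frames = 0, 0
for meta in datasets:
    # Every row copied from this dataset gets episode_index += num_episodes and
    # index += num_frames, so its first frame lands right after the frames already aggregated.
    print(f"offsets for this dataset: episodes+={num_episodes}, frames+={num_frames}")
    num_episodes += meta["total_episodes"]
    num_frames += meta["total_frames"]

# offsets printed: 0/0, then 10/1200, then 14/1760; final totals: 21 episodes, 2660 frames.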

Changed file 2 of 2

@@ -276,6 +276,7 @@ class LeRobotDatasetMetadata:
         ep_dataset = Dataset.from_dict(episode_dict)
         ep_size_in_mb = get_hf_dataset_size_in_mb(ep_dataset)
         df = pd.DataFrame(ep_dataset)
+        num_frames = episode_dict["length"][0]
 
         if self.episodes is None:
             # Initialize indices and frame count for a new dataset made of the first episode data
@@ -283,7 +284,7 @@ class LeRobotDatasetMetadata:
             df["meta/episodes/chunk_index"] = [chunk_idx]
             df["meta/episodes/file_index"] = [file_idx]
             df["dataset_from_index"] = [0]
-            df["dataset_to_index"] = [len(df)]
+            df["dataset_to_index"] = [num_frames]
         else:
             # Retrieve information from the latest parquet file
             latest_ep = self.episodes[-1]
@@ -301,7 +302,7 @@ class LeRobotDatasetMetadata:
             df["meta/episodes/chunk_index"] = [chunk_idx]
             df["meta/episodes/file_index"] = [file_idx]
             df["dataset_from_index"] = [latest_ep["dataset_to_index"]]
-            df["dataset_to_index"] = [latest_ep["dataset_to_index"] + len(df)]
+            df["dataset_to_index"] = [latest_ep["dataset_to_index"] + num_frames]
 
             if latest_size_in_mb + ep_size_in_mb < self.data_files_size_in_mb:
                 # Size limit wasnt reached, concatenate latest dataframe with new one
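The second file's fix targets dataset_from_index/dataset_to_index: df holds a single row per episode here, so len(df) is always 1, while episode_dict["length"][0] is the episode's actual frame count. A minimal sketch of the corrected bookkeeping; the record layout is an assumption based only on the fields visible in the hunks.

episodes: list[dict] = []  # stands in for the aggregated meta/episodes records


def add_episode(episode_dict: dict) -> dict:
    num_frames = episode_dict["length"][0]  # the fix: episode length, not len(df), which is 1
    if not episodes:
        record = {"dataset_from_index": 0, "dataset_to_index": num_frames}
    else:
        latest_ep = episodes[-1]
        record = {
            "dataset_from_index": latest_ep["dataset_to_index"],
            "dataset_to_index": latest_ep["dataset_to_index"] + num_frames,
        }
    episodes.append(record)
    return record


print(add_episode({"length": [250]}))  # {'dataset_from_index': 0, 'dataset_to_index': 250}
print(add_episode({"length": [180]}))  # {'dataset_from_index': 250, 'dataset_to_index': 430}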