WIP aggregate

Remi Cadene
2025-05-16 17:40:34 +00:00
parent e88af0e588
commit 8d360927af
3 changed files with 286 additions and 183 deletions

View File

@@ -6,7 +6,6 @@ from datatrove.executor.slurm import SlurmPipelineExecutor
from datatrove.pipeline.base import PipelineStep
from examples.port_datasets.droid_rlds.port_droid import DROID_SHARDS
from lerobot.common.datasets.lerobot_dataset import LeRobotDatasetMetadata
class PortDroidShards(PipelineStep):
@@ -30,6 +29,12 @@ class PortDroidShards(PipelineStep):
shard_repo_id = f"{self.repo_id}_world_{world_size}_rank_{rank}"
try:
validate_dataset(shard_repo_id)
return
except Exception:
pass  # shard not ported yet (or incomplete): port it below
port_droid(
self.raw_dir,
shard_repo_id,
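For context, the guard added above makes each shard-porting task idempotent: if the shard already exists and passes validation, the worker returns early instead of re-porting it. Below is a minimal, self-contained sketch of that pattern; port_if_needed and the validate/port callables it takes are hypothetical stand-ins for validate_dataset and port_droid.

from pathlib import Path
from typing import Callable


def port_if_needed(
    raw_dir: Path,
    shard_repo_id: str,
    validate: Callable[[str], None],  # raises if the shard is missing or incomplete
    port: Callable[[Path, str], None],  # performs the expensive conversion
) -> None:
    """Skip the work if the shard already validates; otherwise (re)port it."""
    try:
        validate(shard_repo_id)
        return  # already ported, nothing to do
    except Exception:
        pass  # fall through and (re)port the shard
    port(raw_dir, shard_repo_id)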

View File

@@ -5,6 +5,7 @@ from pathlib import Path
import pandas as pd
import tqdm
from lerobot.common.constants import HF_LEROBOT_HOME
from lerobot.common.datasets.compute_stats import aggregate_stats
from lerobot.common.datasets.lerobot_dataset import LeRobotDataset, LeRobotDatasetMetadata
from lerobot.common.datasets.utils import (
@@ -47,12 +48,12 @@ def validate_all_metadata(all_metadata: list[LeRobotDatasetMetadata]):
return fps, robot_type, features
def update_episode_frame_task(df, episode_index_to_add, old_tasks, new_tasks, frame_index_to_add):
def update_data_df(df, src_meta, dst_meta):
def _update(row):
row["episode_index"] = row["episode_index"] + episode_index_to_add
row["index"] = row["index"] + frame_index_to_add
task = old_tasks.iloc[row["task_index"]].name
row["task_index"] = new_tasks.loc[task].task_index.item()
row["episode_index"] = row["episode_index"] + dst_meta["total_episodes"]
row["index"] = row["index"] + dst_meta["total_frames"]
task = src_meta.tasks.iloc[row["task_index"]].name
row["task_index"] = dst_meta.tasks.loc[task].task_index.item()
return row
return df.apply(_update, axis=1)
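To make the index bookkeeping concrete, here is a small self-contained example of what this row-wise update does to a data shard, using toy numbers and plain DataFrames in place of the LeRobotDatasetMetadata objects (column values and task names are illustrative only):

import pandas as pd

# Two frames of episode 0, all labelled with source task index 0 ("pick")
df = pd.DataFrame({"episode_index": [0, 0], "index": [0, 1], "task_index": [0, 0]})
src_tasks = pd.DataFrame({"task_index": [0]}, index=["pick"])  # tasks of the source dataset
dst_tasks = pd.DataFrame({"task_index": [0, 1]}, index=["place", "pick"])  # tasks of the aggregate
dst_total_episodes, dst_total_frames = 10, 500  # already present in the destination

def _update(row):
    row["episode_index"] += dst_total_episodes  # shift episode ids after existing episodes
    row["index"] += dst_total_frames  # shift global frame ids after existing frames
    task = src_tasks.iloc[row["task_index"]].name  # task string in the source dataset
    row["task_index"] = dst_tasks.loc[task, "task_index"]  # remap to the aggregate's task id
    return row

print(df.apply(_update, axis=1))  # episode_index -> 10, index -> 500/501, task_index -> 1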
@@ -60,26 +61,28 @@ def update_episode_frame_task(df, episode_index_to_add, old_tasks, new_tasks, fr
def update_meta_data(
df,
meta_chunk_index_to_add,
meta_file_index_to_add,
data_chunk_index_to_add,
data_file_index_to_add,
videos_chunk_index_to_add,
videos_file_index_to_add,
frame_index_to_add,
dst_meta,
meta_idx,
data_idx,
videos_idx,
):
def _update(row):
row["meta/episodes/chunk_index"] = row["meta/episodes/chunk_index"] + meta_chunk_index_to_add
row["meta/episodes/file_index"] = row["meta/episodes/file_index"] + meta_file_index_to_add
row["data/chunk_index"] = row["data/chunk_index"] + data_chunk_index_to_add
row["data/file_index"] = row["data/file_index"] + data_file_index_to_add
for key in videos_chunk_index_to_add:
row[f"videos/{key}/chunk_index"] = (
row[f"videos/{key}/chunk_index"] + videos_chunk_index_to_add[key]
row["meta/episodes/chunk_index"] = row["meta/episodes/chunk_index"] + meta_idx["chunk_index"]
row["meta/episodes/file_index"] = row["meta/episodes/file_index"] + meta_idx["file_index"]
row["data/chunk_index"] = row["data/chunk_index"] + data_idx["chunk_index"]
row["data/file_index"] = row["data/file_index"] + data_idx["file_index"]
for key, video_idx in videos_idx.items():
row[f"videos/{key}/chunk_index"] = row[f"videos/{key}/chunk_index"] + video_idx["chunk_index"]
row[f"videos/{key}/file_index"] = row[f"videos/{key}/file_index"] + video_idx["file_index"]
row[f"videos/{key}/from_timestamp"] = (
row[f"videos/{key}/from_timestamp"] + video_idx["latest_duration"]
)
row[f"videos/{key}/file_index"] = row[f"videos/{key}/file_index"] + videos_file_index_to_add[key]
row["dataset_from_index"] = row["dataset_from_index"] + frame_index_to_add
row["dataset_to_index"] = row["dataset_to_index"] + frame_index_to_add
row[f"videos/{key}/to_timestamp"] = (
row[f"videos/{key}/to_timestamp"] + video_idx["latest_duration"]
)
row["dataset_from_index"] = row["dataset_from_index"] + dst_meta.info["total_frames"]
row["dataset_to_index"] = row["dataset_to_index"] + dst_meta.info["total_frames"]
row["episode_index"] = row["episode_index"] + dst_meta.info["total_episodes"]
return row
return df.apply(_update, axis=1)
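The episodes-metadata update follows the same idea: chunk and file indices are shifted by the aggregate's current write position, dataset_from/to_index by the frames already written, and video timestamps by the duration already present in the destination video file. A toy, vectorised illustration for a single hypothetical video key "cam":

import pandas as pd

episodes = pd.DataFrame({
    "videos/cam/chunk_index": [0],
    "videos/cam/file_index": [0],
    "videos/cam/from_timestamp": [0.0],
    "videos/cam/to_timestamp": [12.5],
})
video_idx = {"chunk": 2, "file": 7, "latest_duration": 300.0}  # destination write cursor

episodes["videos/cam/chunk_index"] += video_idx["chunk"]
episodes["videos/cam/file_index"] += video_idx["file"]
episodes["videos/cam/from_timestamp"] += video_idx["latest_duration"]
episodes["videos/cam/to_timestamp"] += video_idx["latest_duration"]
print(episodes)  # chunk 2, file 7, timestamps 300.0 -> 312.5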
@@ -88,214 +91,309 @@ def update_meta_data(
def aggregate_datasets(repo_ids: list[str], aggr_repo_id: str, roots: list[Path] = None, aggr_root=None):
logging.info("Start aggregate_datasets")
if roots is None:
all_metadata = [LeRobotDatasetMetadata(repo_id) for repo_id in repo_ids]
else:
all_metadata = [
# Load metadata
all_metadata = (
[LeRobotDatasetMetadata(repo_id) for repo_id in repo_ids]
if roots is None
else [
LeRobotDatasetMetadata(repo_id, root=root) for repo_id, root in zip(repo_ids, roots, strict=False)
]
)
fps, robot_type, features = validate_all_metadata(all_metadata)
video_keys = [key for key in features if features[key]["dtype"] == "video"]
video_keys = [k for k, v in features.items() if v["dtype"] == "video"]
# Create resulting dataset folder
aggr_meta = LeRobotDatasetMetadata.create(
# Initialize output dataset metadata
dst_meta = LeRobotDatasetMetadata.create(
repo_id=aggr_repo_id,
fps=fps,
robot_type=robot_type,
features=features,
root=aggr_root,
)
aggr_root = aggr_meta.root
# Aggregate task info
logging.info("Find all tasks")
unique_tasks = pd.concat([meta.tasks for meta in all_metadata]).index.unique()
aggr_meta.tasks = pd.DataFrame({"task_index": range(len(unique_tasks))}, index=unique_tasks)
unique_tasks = pd.concat([m.tasks for m in all_metadata]).index.unique()
dst_meta.tasks = pd.DataFrame({"task_index": range(len(unique_tasks))}, index=unique_tasks)
num_episodes = 0
num_frames = 0
# Track counters and indices
meta_idx = {"chunk": 0, "file": 0}
data_idx = {"chunk": 0, "file": 0}
videos_idx = {
key: {"chunk": 0, "file": 0, "latest_duration": 0, "episode_duration": 0} for key in video_keys
}
aggr_meta_chunk_idx = 0
aggr_meta_file_idx = 0
# Process each dataset
for src_meta in tqdm.tqdm(all_metadata, desc="Copy data and videos"):
videos_idx = aggregate_videos(src_meta, dst_meta, videos_idx)
data_idx = aggregate_data(src_meta, dst_meta, data_idx)
meta_idx = aggregate_metadata(src_meta, dst_meta, meta_idx, data_idx, videos_idx, video_keys)
aggr_data_chunk_idx = 0
aggr_data_file_idx = 0
dst_meta.info["total_episodes"] += src_meta.total_episodes
dst_meta.info["total_frames"] += src_meta.total_frames
aggr_videos_chunk_idx = dict.fromkeys(video_keys, 0)
aggr_videos_file_idx = dict.fromkeys(video_keys, 0)
finalize_aggregation(dst_meta, all_metadata)
logging.info("Aggregation complete.")
for meta in tqdm.tqdm(all_metadata, desc="Copy data and videos"):
# Aggregate episodes meta data
meta_chunk_file_ids = {
(c, f)
for c, f in zip(
meta.episodes["meta/episodes/chunk_index"],
meta.episodes["meta/episodes/file_index"],
# -------------------------------
# Helper Functions
# -------------------------------
def aggregate_videos(src_meta, dst_meta, videos_idx):
"""
Aggregates video chunks from a dataset into the aggregated dataset folder.
"""
for key, video_idx in videos_idx.items():
# Get unique (chunk, file) combinations
unique_chunk_file_pairs = {
(chunk, file)
for chunk, file in zip(
src_meta.episodes[f"videos/{key}/chunk_index"],
src_meta.episodes[f"videos/{key}/file_index"],
strict=False,
)
}
for chunk_idx, file_idx in meta_chunk_file_ids:
path = meta.root / DEFAULT_EPISODES_PATH.format(chunk_index=chunk_idx, file_index=file_idx)
df = pd.read_parquet(path)
df = update_meta_data(
df,
aggr_meta_chunk_idx,
aggr_meta_file_idx,
aggr_data_chunk_idx,
aggr_data_file_idx,
aggr_videos_chunk_idx,
aggr_videos_file_idx,
num_frames,
# Current target chunk/file index
chunk_idx = video_idx["chunk"]
file_idx = video_idx["file"]
for src_chunk_idx, src_file_idx in unique_chunk_file_pairs:
src_path = src_meta.root / DEFAULT_VIDEO_PATH.format(
video_key=key,
chunk_index=src_chunk_idx,
file_index=src_file_idx,
)
aggr_path = aggr_root / DEFAULT_EPISODES_PATH.format(
chunk_index=aggr_meta_chunk_idx, file_index=aggr_meta_file_idx
dst_path = dst_meta.root / DEFAULT_VIDEO_PATH.format(
video_key=key,
chunk_index=chunk_idx,
file_index=file_idx,
)
if not aggr_path.exists():
aggr_path.parent.mkdir(parents=True, exist_ok=True)
df.to_parquet(aggr_path)
else:
size_in_mb = get_parquet_file_size_in_mb(path)
aggr_size_in_mb = get_parquet_file_size_in_mb(aggr_path)
if aggr_size_in_mb + size_in_mb >= DEFAULT_DATA_FILE_SIZE_IN_MB:
# Size limit is reached, prepare new parquet file
aggr_meta_chunk_idx, aggr_meta_file_idx = update_chunk_file_indices(
aggr_meta_chunk_idx, aggr_meta_file_idx, DEFAULT_CHUNK_SIZE
)
aggr_path = aggr_root / DEFAULT_EPISODES_PATH.format(
chunk_index=aggr_meta_chunk_idx, file_index=aggr_meta_file_idx
)
aggr_path.parent.mkdir(parents=True, exist_ok=True)
df.to_parquet(aggr_path)
else:
# Update the existing parquet file with new rows
aggr_df = pd.read_parquet(aggr_path)
df = pd.concat([aggr_df, df], ignore_index=True)
df.to_parquet(aggr_path)
if not dst_path.exists():
# First write to this destination file
dst_path.parent.mkdir(parents=True, exist_ok=True)
shutil.copy(str(src_path), str(dst_path))
continue
# Aggregate videos if any
for key in video_keys:
video_chunk_file_ids = {
(c, f)
for c, f in zip(
meta.episodes[f"videos/{key}/chunk_index"],
meta.episodes[f"videos/{key}/file_index"],
strict=False,
)
}
for chunk_idx, file_idx in video_chunk_file_ids:
path = meta.root / DEFAULT_VIDEO_PATH.format(
video_key=key, chunk_index=chunk_idx, file_index=file_idx
)
aggr_path = aggr_root / DEFAULT_VIDEO_PATH.format(
# Check file sizes before appending
src_size = get_video_size_in_mb(src_path)
dst_size = get_video_size_in_mb(dst_path)
if dst_size + src_size >= DEFAULT_VIDEO_FILE_SIZE_IN_MB:
# Rotate to a new chunk/file
chunk_idx, file_idx = update_chunk_file_indices(chunk_idx, file_idx, DEFAULT_CHUNK_SIZE)
dst_path = dst_meta.root / DEFAULT_VIDEO_PATH.format(
video_key=key,
chunk_index=aggr_videos_chunk_idx[key],
file_index=aggr_videos_file_idx[key],
chunk_index=chunk_idx,
file_index=file_idx,
)
if not aggr_path.exists():
# First video
aggr_path.parent.mkdir(parents=True, exist_ok=True)
shutil.copy(str(path), str(aggr_path))
else:
size_in_mb = get_video_size_in_mb(path)
aggr_size_in_mb = get_video_size_in_mb(aggr_path)
if aggr_size_in_mb + size_in_mb >= DEFAULT_VIDEO_FILE_SIZE_IN_MB:
# Size limit is reached, prepare new parquet file
aggr_videos_chunk_idx[key], aggr_videos_file_idx[key] = update_chunk_file_indices(
aggr_videos_chunk_idx[key], aggr_videos_file_idx[key], DEFAULT_CHUNK_SIZE
)
aggr_path = aggr_root / DEFAULT_VIDEO_PATH.format(
video_key=key,
chunk_index=aggr_videos_chunk_idx[key],
file_index=aggr_videos_file_idx[key],
)
aggr_path.parent.mkdir(parents=True, exist_ok=True)
shutil.copy(str(path), str(aggr_path))
else:
# Update the existing parquet file with new rows
concat_video_files(
[aggr_path, path],
aggr_root,
key,
aggr_videos_chunk_idx[key],
aggr_videos_file_idx[key],
)
# copy_command = f"cp {video_path} {aggr_video_path} &"
# subprocess.Popen(copy_command, shell=True)
# Aggregate data
data_chunk_file_ids = {
(c, f)
for c, f in zip(meta.episodes["data/chunk_index"], meta.episodes["data/file_index"], strict=False)
}
for chunk_idx, file_idx in data_chunk_file_ids:
path = meta.root / DEFAULT_DATA_PATH.format(chunk_index=chunk_idx, file_index=file_idx)
df = pd.read_parquet(path)
df = update_episode_frame_task(df, num_episodes, meta.tasks, aggr_meta.tasks, num_frames)
aggr_path = aggr_root / DEFAULT_DATA_PATH.format(
chunk_index=aggr_data_chunk_idx, file_index=aggr_data_file_idx
)
if not aggr_path.exists():
aggr_path.parent.mkdir(parents=True, exist_ok=True)
df.to_parquet(aggr_path)
dst_path.parent.mkdir(parents=True, exist_ok=True)
shutil.copy(str(src_path), str(dst_path))
else:
size_in_mb = get_parquet_file_size_in_mb(path)
aggr_size_in_mb = get_parquet_file_size_in_mb(aggr_path)
# Append to existing video file
concat_video_files(
[dst_path, src_path],
dst_meta.root,
key,
chunk_idx,
file_idx,
)
if aggr_size_in_mb + size_in_mb >= DEFAULT_DATA_FILE_SIZE_IN_MB:
# Size limit is reached, prepare new parquet file
aggr_data_chunk_idx, aggr_data_file_idx = update_chunk_file_indices(
aggr_data_chunk_idx, aggr_data_file_idx, DEFAULT_CHUNK_SIZE
)
aggr_path = aggr_root / DEFAULT_DATA_PATH.format(
chunk_index=aggr_data_chunk_idx, file_index=aggr_data_file_idx
)
aggr_path.parent.mkdir(parents=True, exist_ok=True)
df.to_parquet(aggr_path)
else:
# Update the existing parquet file with new rows
aggr_df = pd.read_parquet(aggr_path)
df = pd.concat([aggr_df, df], ignore_index=True)
df.to_parquet(aggr_path)
# Update the video index tracking
video_idx["chunk_idx"] = chunk_idx
video_idx["file_idx"] = file_idx
num_episodes += meta.total_episodes
num_frames += meta.total_frames
return videos_idx
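The append-or-rotate decision used here (and again for the parquet files below) is just a size check plus a chunk/file counter. A self-contained sketch of that logic, assuming the same semantics as update_chunk_file_indices (the file index wraps into a new chunk after a fixed number of files per chunk):

def next_write_position(
    chunk_idx: int,
    file_idx: int,
    dst_size_mb: float,
    src_size_mb: float,
    max_file_size_mb: float,
    files_per_chunk: int,
) -> tuple[int, int, bool]:
    """Return (chunk_idx, file_idx, append): append=True means keep writing into the current file."""
    if dst_size_mb + src_size_mb < max_file_size_mb:
        return chunk_idx, file_idx, True  # still fits: append to the current destination file
    file_idx += 1  # size limit reached: rotate to the next file
    if file_idx >= files_per_chunk:  # current chunk is full: start a new chunk
        chunk_idx += 1
        file_idx = 0
    return chunk_idx, file_idx, False


print(next_write_position(0, 99, 450.0, 80.0, 500.0, 100))  # -> (1, 0, False)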
def aggregate_data(src_meta, dst_meta, data_idx):
"""
Copies the data parquet files of one source dataset into the aggregated dataset, remapping indices and appending to (or rotating) the destination files as needed.
"""
unique_chunk_file_ids = {
(c, f)
for c, f in zip(
src_meta.episodes["data/chunk_index"], src_meta.episodes["data/file_index"], strict=False
)
}
for src_chunk_idx, src_file_idx in unique_chunk_file_ids:
src_path = src_meta.root / DEFAULT_DATA_PATH.format(
chunk_index=src_chunk_idx, file_index=src_file_idx
)
df = pd.read_parquet(src_path)
df = update_data_df(df, src_meta, dst_meta)
dst_path = dst_meta.root / DEFAULT_DATA_PATH.format(
chunk_index=data_idx["chunk"], file_index=data_idx["file"]
)
data_idx = write_parquet_safely(
df,
src_path,
dst_path,
data_idx,
DEFAULT_DATA_FILE_SIZE_IN_MB,
DEFAULT_CHUNK_SIZE,
DEFAULT_DATA_PATH,
)
return data_idx
def aggregate_metadata(src_meta, dst_meta, meta_idx, data_idx, videos_idx, video_keys):
"""
Copies the per-episode metadata of one source dataset into the aggregated dataset, shifting chunk/file indices, dataset frame ranges and video timestamps.
"""
chunk_file_ids = {
(c, f)
for c, f in zip(
src_meta.episodes["meta/episodes/chunk_index"],
src_meta.episodes["meta/episodes/file_index"],
strict=False,
)
}
for chunk_idx, file_idx in chunk_file_ids:
src_path = src_meta.root / DEFAULT_EPISODES_PATH.format(chunk_index=chunk_idx, file_index=file_idx)
df = pd.read_parquet(src_path)
df = update_meta_data(
df,
dst_meta,
meta_idx,
data_idx,
videos_idx,
)
# for k in video_keys:
# video_idx[k]["latest_duration"] += video_idx[k]["episode_duration"]
dst_path = dst_meta.root / DEFAULT_EPISODES_PATH.format(
chunk_index=meta_idx["chunk"], file_index=meta_idx["file"]
)
write_parquet_safely(
df,
src_path,
dst_path,
meta_idx,
DEFAULT_DATA_FILE_SIZE_IN_MB,
DEFAULT_CHUNK_SIZE,
DEFAULT_EPISODES_PATH,
)
return meta_idx
def write_parquet_safely(
df: pd.DataFrame,
src_path: Path,
dst_path: Path,
idx: dict[str, int],
max_mb: float,
chunk_size: int,
default_path: str,
):
"""
Safely appends or creates a Parquet file at dst_path based on size constraints.
Parameters:
df (pd.DataFrame): Data to write.
src_path (Path): Path to source file (used to get size).
dst_path (Path): Target path for writing.
idx (dict): Dictionary containing 'chunk' and 'file' indices.
max_mb (float): Maximum allowed file size in MB.
chunk_size (int): Maximum number of files per chunk.
default_path (str): Format string for generating a new file path.
Returns:
dict: Updated index dictionary.
"""
# If destination file doesn't exist, just write the new one
if not dst_path.exists():
dst_path.parent.mkdir(parents=True, exist_ok=True)
df.to_parquet(dst_path)
return idx
# Otherwise, check if we exceed the size limit
src_size = get_parquet_file_size_in_mb(src_path)
dst_size = get_parquet_file_size_in_mb(dst_path)
if dst_size + src_size >= max_mb:
# File is too large, move to a new one
idx["chunk"], idx["file"] = update_chunk_file_indices(idx["chunk"], idx["file"], chunk_size)
new_path = dst_path.parent / default_path.format(chunk_index=idx["chunk"], file_index=idx["file"])
new_path.parent.mkdir(parents=True, exist_ok=True)
df.to_parquet(new_path)
else:
# Append to existing file
existing_df = pd.read_parquet(dst_path)
combined_df = pd.concat([existing_df, df], ignore_index=True)
combined_df.to_parquet(dst_path)
return idx
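A short usage sketch of write_parquet_safely with toy values; the paths and the format string are hypothetical, and the helpers it calls (get_parquet_file_size_in_mb, update_chunk_file_indices) are assumed to be the ones imported at the top of this file:

import pandas as pd
from pathlib import Path

df = pd.DataFrame({"episode_index": [0], "index": [0], "task_index": [0]})
data_idx = {"chunk": 0, "file": 0}
src_path = Path("/tmp/src/chunk-000/file-000.parquet")  # hypothetical source shard file
dst_path = Path("/tmp/dst/chunk-000/file-000.parquet")  # current destination file

data_idx = write_parquet_safely(
    df,
    src_path,
    dst_path,
    data_idx,
    max_mb=100.0,
    chunk_size=1000,
    default_path="chunk-{chunk_index:03d}/file-{file_index:03d}.parquet",  # hypothetical layout
)
print(data_idx)  # {'chunk': 0, 'file': 0} unless appending would have exceeded max_mb

Note that the function mutates and returns the same idx dict, so callers can either rebind the return value (as aggregate_data does) or rely on the in-place update (as aggregate_metadata does).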
def finalize_aggregation(aggr_meta, all_metadata):
logging.info("write tasks")
write_tasks(aggr_meta.tasks, aggr_meta.root)
logging.info("write info")
aggr_meta.info["total_tasks"] = len(aggr_meta.tasks)
aggr_meta.info["total_episodes"] = sum([meta.total_episodes for meta in all_metadata])
aggr_meta.info["total_frames"] = sum([meta.total_frames for meta in all_metadata])
aggr_meta.info["splits"] = {"train": f"0:{aggr_meta.total_episodes}"}
aggr_meta.info.update(
{
"total_tasks": len(aggr_meta.tasks),
"total_episodes": sum(m.total_episodes for m in all_metadata),
"total_frames": sum(m.total_frames for m in all_metadata),
"splits": {"train": f"0:{sum(m.total_episodes for m in all_metadata)}"},
}
)
write_info(aggr_meta.info, aggr_meta.root)
logging.info("write stats")
aggr_meta.stats = aggregate_stats([meta.stats for meta in all_metadata])
aggr_meta.stats = aggregate_stats([m.stats for m in all_metadata])
write_stats(aggr_meta.stats, aggr_meta.root)
if __name__ == "__main__":
init_logging()
aggr_repo_id = "cadene/aggregate_test"
aggr_root = Path(f"/tmp/{aggr_repo_id}")
num_shards = 2048
repo_id = "cadene/droid_1.0.1_v30"
aggr_repo_id = f"{repo_id}_compact_6"
tags = ["openx"]
# num_shards = 210
# repo_id = "cadene/agibot_alpha_v30"
# aggr_repo_id = f"{repo_id}"
# tags = None
# aggr_root = Path(f"/tmp/{aggr_repo_id}")
aggr_root = HF_LEROBOT_HOME / aggr_repo_id
if aggr_root.exists():
shutil.rmtree(aggr_root)
repo_ids = []
roots = []
for rank in range(num_shards):
shard_repo_id = f"{repo_id}_world_{num_shards}_rank_{rank}"
shard_root = HF_LEROBOT_HOME / shard_repo_id
try:
meta = LeRobotDatasetMetadata(shard_repo_id, root=shard_root)
if len(meta.video_keys) == 0:
continue
repo_ids.append(shard_repo_id)
roots.append(shard_root)
except Exception:
pass  # shard missing or invalid: skip it
# WIP: stop after the first two shards while debugging
if rank == 1:
break
aggregate_datasets(
["lerobot/aloha_sim_transfer_cube_human", "lerobot/aloha_sim_insertion_human"],
repo_ids,
aggr_repo_id,
roots=roots,
aggr_root=aggr_root,
)
aggr_dataset = LeRobotDataset(repo_id=aggr_repo_id, root=aggr_root)
for i in tqdm.tqdm(range(len(aggr_dataset))):
aggr_dataset[i]
pass
aggr_dataset.push_to_hub(tags=["openx"])
# for i in tqdm.tqdm(range(len(aggr_dataset))):
# aggr_dataset[i]
# pass
aggr_dataset.push_to_hub(tags=tags, upload_large_folder=True)

View File

@@ -31,7 +31,7 @@ from datasets import Dataset
from huggingface_hub import HfApi, snapshot_download
from huggingface_hub.constants import REPOCARD_NAME
from huggingface_hub.errors import RevisionNotFoundError
from torch.profiler import record_function
from lerobot.common.constants import HF_LEROBOT_HOME
from lerobot.common.datasets.compute_stats import aggregate_stats, compute_episode_stats
from lerobot.common.datasets.image_writer import AsyncImageWriter, write_image