Commit before episodes/episodes_stats merging

Remi Cadene
2025-04-09 15:20:15 +02:00
parent 53ecec5fb2
commit c1b28f0b58
12 changed files with 905 additions and 396 deletions


@@ -21,40 +21,62 @@ from collections.abc import Iterator
from itertools import accumulate
from pathlib import Path
from pprint import pformat
import subprocess
import tempfile
from types import SimpleNamespace
from typing import Any
from typing import Any, Tuple
import datasets
import jsonlines
import numpy as np
import packaging.version
import pandas
import torch
from datasets.table import embed_table_storage
from huggingface_hub import DatasetCard, DatasetCardData, HfApi
from huggingface_hub.errors import RevisionNotFoundError
from PIL import Image as PILImage
from torchvision import transforms
from datasets import Dataset, concatenate_datasets
from lerobot.common.datasets.backward_compatibility import (
V21_MESSAGE,
V30_MESSAGE,
BackwardCompatibilityError,
ForwardCompatibilityError,
)
from lerobot.common.robot_devices.robots.utils import Robot
from lerobot.common.utils.utils import is_valid_numpy_dtype_string
from lerobot.configs.types import DictLike, FeatureType, PolicyFeature
import pyarrow.parquet as pq
DEFAULT_CHUNK_SIZE = 1000 # Max number of episodes per chunk
DEFAULT_CHUNK_SIZE = 1000 # Max number of files per chunk
DEFAULT_FILE_SIZE_IN_MB = 500.0 # Max size per file
# Keep legacy for `convert_dataset_v21_to_v30.py`
LEGACY_EPISODES_PATH = "meta/episodes.jsonl"
LEGACY_STATS_PATH = "meta/stats.json"
LEGACY_EPISODES_STATS_PATH = "meta/episodes_stats.jsonl"
LEGACY_TASKS_PATH = "meta/tasks.jsonl"
LEGACY_DEFAULT_VIDEO_PATH = "videos/chunk-{episode_chunk:03d}/{video_key}/episode_{episode_index:06d}.mp4"
LEGACY_DEFAULT_PARQUET_PATH = "data/chunk-{episode_chunk:03d}/episode_{episode_index:06d}.parquet"
# TODO
DEFAULT_IMAGE_PATH = "images/{image_key}/episode_{episode_index:06d}/frame_{frame_index:06d}.png"
EPISODES_DIR = "meta/episodes"
EPISODES_STATS_DIR = "meta/episodes_stats"
TASKS_DIR = "meta/tasks"
DATA_DIR = "data"
VIDEO_DIR = "videos"
INFO_PATH = "meta/info.json"
EPISODES_PATH = "meta/episodes.jsonl"
STATS_PATH = "meta/stats.json"
EPISODES_STATS_PATH = "meta/episodes_stats.jsonl"
TASKS_PATH = "meta/tasks.jsonl"
DEFAULT_VIDEO_PATH = "videos/chunk-{episode_chunk:03d}/{video_key}/episode_{episode_index:06d}.mp4"
DEFAULT_PARQUET_PATH = "data/chunk-{episode_chunk:03d}/episode_{episode_index:06d}.parquet"
DEFAULT_IMAGE_PATH = "images/{image_key}/episode_{episode_index:06d}/frame_{frame_index:06d}.png"
CHUNK_FILE_PATTERN = "chunk-{chunk_index:03d}/file-{file_index:03d}"
DEFAULT_EPISODES_PATH = EPISODES_DIR + "/" + CHUNK_FILE_PATTERN + ".parquet"
DEFAULT_EPISODES_STATS_PATH = EPISODES_STATS_DIR + "/" + CHUNK_FILE_PATTERN + ".parquet"
DEFAULT_TASKS_PATH = TASKS_DIR + "/" + CHUNK_FILE_PATTERN + ".parquet"
DEFAULT_DATA_PATH = DATA_DIR + "/" + CHUNK_FILE_PATTERN + ".parquet"
DEFAULT_VIDEO_PATH = VIDEO_DIR + "/{video_key}/" + CHUNK_FILE_PATTERN + ".mp4"
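# Illustrative examples (not part of the original commit) of the paths these
# patterns produce; the video key below is hypothetical:
#   DEFAULT_EPISODES_PATH.format(chunk_index=0, file_index=0) -> "meta/episodes/chunk-000/file-000.parquet"
#   DEFAULT_DATA_PATH.format(chunk_index=0, file_index=1)     -> "data/chunk-000/file-001.parquet"
#   DEFAULT_VIDEO_PATH.format(video_key="observation.images.top", chunk_index=0, file_index=0)
#       -> "videos/observation.images.top/chunk-000/file-000.mp4"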
DATASET_CARD_TEMPLATE = """
---
@@ -75,6 +97,88 @@ DEFAULT_FEATURES = {
}
def get_hf_dataset_size_in_mb(hf_ds: Dataset) -> float:
    return hf_ds.data.nbytes / (1024 ** 2)
def get_pd_dataframe_size_in_mb(df: pandas.DataFrame) -> float:
    memory_usage_bytes = df.memory_usage(deep=True).sum()
    return memory_usage_bytes / (1024 ** 2)
def get_chunk_file_indices(path: Path) -> Tuple[int, int]:
    if not path.stem.startswith("file-") or not path.parent.name.startswith("chunk-"):
        raise ValueError(f"Path does not follow {CHUNK_FILE_PATTERN}: '{path}'")
    chunk_index = int(path.parent.name.replace("chunk-", ""))
    file_index = int(path.stem.replace("file-", ""))
    return chunk_index, file_index
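# Illustrative (not in the original commit): parsing the indices back out of a
# CHUNK_FILE_PATTERN path.
# >>> get_chunk_file_indices(Path("data/chunk-000/file-001.parquet"))
# (0, 1)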
def update_chunk_file_indices(chunk_idx: int, file_idx: int, chunks_size: int):
    if file_idx == chunks_size - 1:
        file_idx = 0
        chunk_idx += 1
    else:
        file_idx += 1
    return chunk_idx, file_idx
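# Illustrative (not in the original commit): the indices roll over to a new
# chunk once the current chunk holds `chunks_size` files.
# >>> update_chunk_file_indices(chunk_idx=0, file_idx=3, chunks_size=1000)
# (0, 4)
# >>> update_chunk_file_indices(chunk_idx=0, file_idx=999, chunks_size=1000)
# (1, 0)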
def load_nested_dataset(pq_dir: Path) -> Dataset:
    """Find all parquet files in the provided directory ({pq_dir}/chunk-xxx/file-xxx.parquet),
    convert them to pyarrow tables memory-mapped in a cache folder for efficient RAM usage,
    and concatenate all pyarrow references into a single HF Dataset.
    """
    # TODO(rcadene): set num_proc to accelerate conversion to pyarrow
    return concatenate_datasets(
        [Dataset.from_parquet(str(path)) for path in sorted(pq_dir.glob("*/*.parquet"))]
    )
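# e.g. (illustrative, hypothetical root): load_nested_dataset(root / EPISODES_DIR)
# concatenates meta/episodes/chunk-000/file-000.parquet, file-001.parquet, ...
# into a single memory-mapped Dataset.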
def get_latest_parquet_path(pq_dir: Path) -> Path:
    return sorted(pq_dir.glob("*/*.parquet"))[-1]
def get_latest_video_path(pq_dir: Path, video_key: str) -> Path:
    return sorted(pq_dir.glob(f"{video_key}/*/*.mp4"))[-1]
def get_parquet_num_frames(parquet_path: Path) -> int:
    metadata = pq.read_metadata(parquet_path)
    return metadata.num_rows
def get_video_size_in_mb(mp4_path: Path) -> float:
    file_size_bytes = mp4_path.stat().st_size
    file_size_mb = file_size_bytes / (1024 ** 2)
    return file_size_mb
def concat_video_files(paths_to_cat, new_root, video_key, chunk_idx, file_idx):
    # Create a text file with the list of files to concatenate
    with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.txt') as f:
        temp_file_path = f.name
        for ep_path in paths_to_cat:
            f.write(f"file '{str(ep_path)}'\n")
    output_path = new_root / DEFAULT_VIDEO_PATH.format(
        video_key=video_key, chunk_index=chunk_idx, file_index=file_idx
    )
    output_path.parent.mkdir(parents=True, exist_ok=True)
    command = [
        'ffmpeg',
        '-y',
        '-f', 'concat',
        '-safe', '0',
        '-i', str(temp_file_path),
        '-c', 'copy',
        str(output_path)
    ]
    subprocess.run(command, check=True)
    Path(temp_file_path).unlink()
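# Usage sketch (illustrative, not part of the original commit); the paths and
# video key are hypothetical. `-f concat` with `-c copy` remuxes the clips
# without re-encoding, and `-safe 0` allows absolute paths in the list file.
# concat_video_files(
#     paths_to_cat=[Path("/tmp/ep0.mp4"), Path("/tmp/ep1.mp4")],
#     new_root=Path("/tmp/new_dataset"),
#     video_key="observation.images.top",
#     chunk_idx=0,
#     file_idx=0,
# )
# -> writes /tmp/new_dataset/videos/observation.images.top/chunk-000/file-000.mp4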
def get_video_duration_in_s(mp4_file: Path) -> float:
    result = subprocess.run(
        [
            'ffprobe',
            '-v', 'error',
            '-show_entries', 'format=duration',
            '-of', 'default=noprint_wrappers=1:nokey=1',
            mp4_file
        ],
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT
    )
    return float(result.stdout)
def flatten_dict(d: dict, parent_key: str = "", sep: str = "/") -> dict:
    """Flatten a nested dictionary structure by collapsing nested keys into one key with a separator.
@@ -124,6 +228,8 @@ def serialize_dict(stats: dict[str, torch.Tensor | np.ndarray | dict]) -> dict:
    for key, value in flatten_dict(stats).items():
        if isinstance(value, (torch.Tensor, np.ndarray)):
            serialized_dict[key] = value.tolist()
        elif isinstance(value, list) and value and isinstance(value[0], (int, float, list)):
            serialized_dict[key] = value
        elif isinstance(value, np.generic):
            serialized_dict[key] = value.item()
        elif isinstance(value, (int, float)):
@@ -183,7 +289,7 @@ def load_info(local_dir: Path) -> dict:
def write_stats(stats: dict, local_dir: Path):
    serialized_stats = serialize_dict(stats)
    write_json(serialized_stats, local_dir / STATS_PATH)
    write_json(serialized_stats, local_dir / LEGACY_STATS_PATH)
def cast_stats_to_numpy(stats) -> dict[str, dict[str, np.ndarray]]:
@@ -192,50 +298,91 @@ def cast_stats_to_numpy(stats) -> dict[str, dict[str, np.ndarray]]:
def load_stats(local_dir: Path) -> dict[str, dict[str, np.ndarray]]:
    if not (local_dir / STATS_PATH).exists():
    if not (local_dir / LEGACY_STATS_PATH).exists():
        return None
    stats = load_json(local_dir / STATS_PATH)
    stats = load_json(local_dir / LEGACY_STATS_PATH)
    return cast_stats_to_numpy(stats)
def write_task(task_index: int, task: dict, local_dir: Path):
def write_hf_dataset(hf_dataset: Dataset, local_dir: Path):
    if get_hf_dataset_size_in_mb(hf_dataset) > DEFAULT_FILE_SIZE_IN_MB:
        raise NotImplementedError("Contact a maintainer.")
    path = local_dir / DEFAULT_DATA_PATH.format(chunk_index=0, file_index=0)
    path.parent.mkdir(parents=True, exist_ok=True)
    hf_dataset.to_parquet(path)
def write_tasks(tasks: pandas.DataFrame, local_dir: Path):
    path = local_dir / DEFAULT_TASKS_PATH.format(chunk_index=0, file_index=0)
    path.parent.mkdir(parents=True, exist_ok=True)
    tasks.to_parquet(path)
def legacy_write_task(task_index: int, task: dict, local_dir: Path):
    task_dict = {
        "task_index": task_index,
        "task": task,
    }
    append_jsonlines(task_dict, local_dir / TASKS_PATH)
    append_jsonlines(task_dict, local_dir / LEGACY_TASKS_PATH)
def load_tasks(local_dir: Path) -> tuple[dict, dict]:
    tasks = load_jsonlines(local_dir / TASKS_PATH)
def legacy_load_tasks(local_dir: Path) -> tuple[dict, dict]:
    tasks = load_jsonlines(local_dir / LEGACY_TASKS_PATH)
    tasks = {item["task_index"]: item["task"] for item in sorted(tasks, key=lambda x: x["task_index"])}
    task_to_task_index = {task: task_index for task_index, task in tasks.items()}
    return tasks, task_to_task_index
def load_tasks(local_dir: Path):
    tasks = load_nested_dataset(local_dir / TASKS_DIR)
    # TODO(rcadene): optimize this
    task_to_task_index = {d["task"]: d["task_index"] for d in tasks}
    return tasks, task_to_task_index
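# Illustrative (not in the original commit): rows of `tasks` look like
# {"task_index": 0, "task": "pick up the cube"} (hypothetical task string),
# so task_to_task_index["pick up the cube"] == 0.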
def write_episode(episode: dict, local_dir: Path):
    append_jsonlines(episode, local_dir / EPISODES_PATH)
    append_jsonlines(episode, local_dir / LEGACY_EPISODES_PATH)
def write_episodes(episodes: Dataset, local_dir: Path):
    if get_hf_dataset_size_in_mb(episodes) > DEFAULT_FILE_SIZE_IN_MB:
        raise NotImplementedError("Contact a maintainer.")
    fpath = local_dir / DEFAULT_EPISODES_PATH.format(chunk_index=0, file_index=0)
    fpath.parent.mkdir(parents=True, exist_ok=True)
    episodes.to_parquet(fpath)
def load_episodes(local_dir: Path) -> dict:
    episodes = load_jsonlines(local_dir / EPISODES_PATH)
def legacy_load_episodes(local_dir: Path) -> dict:
    episodes = load_jsonlines(local_dir / LEGACY_EPISODES_PATH)
    return {item["episode_index"]: item for item in sorted(episodes, key=lambda x: x["episode_index"])}
def load_episodes(local_dir: Path):
    hf_dataset = load_nested_dataset(local_dir / EPISODES_DIR)
    return hf_dataset
def write_episode_stats(episode_index: int, episode_stats: dict, local_dir: Path):
def legacy_write_episode_stats(episode_index: int, episode_stats: dict, local_dir: Path):
    # We wrap episode_stats in a dictionary since `episode_stats["episode_index"]`
    # is a dictionary of stats and not an integer.
    episode_stats = {"episode_index": episode_index, "stats": serialize_dict(episode_stats)}
    append_jsonlines(episode_stats, local_dir / EPISODES_STATS_PATH)
    append_jsonlines(episode_stats, local_dir / LEGACY_EPISODES_STATS_PATH)
def load_episodes_stats(local_dir: Path) -> dict:
    episodes_stats = load_jsonlines(local_dir / EPISODES_STATS_PATH)
def write_episodes_stats(episodes_stats: Dataset, local_dir: Path):
    if get_hf_dataset_size_in_mb(episodes_stats) > DEFAULT_FILE_SIZE_IN_MB:
        raise NotImplementedError("Contact a maintainer.")
    fpath = local_dir / DEFAULT_EPISODES_STATS_PATH.format(chunk_index=0, file_index=0)
    fpath.parent.mkdir(parents=True, exist_ok=True)
    episodes_stats.to_parquet(fpath)
def legacy_load_episodes_stats(local_dir: Path) -> dict:
    episodes_stats = load_jsonlines(local_dir / LEGACY_EPISODES_STATS_PATH)
    return {
        item["episode_index"]: cast_stats_to_numpy(item["stats"])
        for item in sorted(episodes_stats, key=lambda x: x["episode_index"])
    }
def load_episodes_stats(local_dir: Path):
    hf_dataset = load_nested_dataset(local_dir / EPISODES_STATS_DIR)
    return hf_dataset
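# Illustrative round-trip (not in the original commit); the root is hypothetical.
# root = Path("/tmp/my_dataset")
# write_episodes_stats(stats_ds, root)  # -> meta/episodes_stats/chunk-000/file-000.parquet
# stats_ds = load_episodes_stats(root)  # HF Dataset concatenated from the parquet files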
def backward_compatible_episodes_stats(
    stats: dict[str, dict[str, np.ndarray]], episodes: list[int]
@@ -388,6 +535,7 @@ def get_hf_features_from_features(features: dict) -> datasets.Features:
def get_features_from_robot(robot: Robot, use_videos: bool = True) -> dict:
    # TODO(rcadene): add fps for each feature
    camera_ft = {}
    if robot.cameras:
        camera_ft = {
@@ -442,11 +590,11 @@ def create_empty_dataset_info(
"total_frames": 0,
"total_tasks": 0,
"total_videos": 0,
"total_chunks": 0,
"chunks_size": DEFAULT_CHUNK_SIZE,
"files_size_in_mb": DEFAULT_FILE_SIZE_IN_MB,
"fps": fps,
"splits": {},
"data_path": DEFAULT_PARQUET_PATH,
"data_path": DEFAULT_DATA_PATH,
"video_path": DEFAULT_VIDEO_PATH if use_videos else None,
"features": features,
}