Dataset v3 (#1412)

Co-authored-by: Simon Alibert <75076266+aliberts@users.noreply.github.com>
Co-authored-by: Remi Cadene <re.cadene@gmail.com>
Co-authored-by: Tavish <tavish9.chen@gmail.com>
Co-authored-by: fracapuano <francesco.capuano@huggingface.co>
Co-authored-by: CarolinePascal <caroline8.pascal@gmail.com>
This commit is contained in:
Michel Aractingi
2025-09-15 09:53:30 +02:00
committed by GitHub
parent d602e8169c
commit f55c6e89f0
50 changed files with 4642 additions and 4092 deletions

View File

@@ -11,137 +11,166 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import logging
from pathlib import Path
import datasets
import jsonlines
import pyarrow.compute as pc
import pyarrow.parquet as pq
import numpy as np
import pandas as pd
import pytest
from datasets import Dataset
from lerobot.datasets.utils import (
EPISODES_PATH,
EPISODES_STATS_PATH,
INFO_PATH,
STATS_PATH,
TASKS_PATH,
DEFAULT_CHUNK_SIZE,
DEFAULT_DATA_FILE_SIZE_IN_MB,
DEFAULT_DATA_PATH,
get_hf_dataset_size_in_mb,
update_chunk_file_indices,
write_episodes,
write_info,
write_stats,
write_tasks,
)
def write_hf_dataset(
    hf_dataset: Dataset,
    local_dir: Path,
    data_file_size_mb: float | None = None,
    chunk_size: int | None = None,
) -> None:
    """
    Writes a Hugging Face Dataset to one or more Parquet files in a structured directory format.
    If the dataset size is within `DEFAULT_DATA_FILE_SIZE_IN_MB`, it's saved as a single file.
    Otherwise, the dataset is split into multiple smaller Parquet files, each not exceeding the size limit.
    The file and chunk indices are managed to organize the output files in a hierarchical structure,
    e.g., `data/chunk-000/file-000.parquet`, `data/chunk-000/file-001.parquet`, etc.
    This function ensures that episodes are not split across multiple files.
    Args:
        hf_dataset (Dataset): The Hugging Face Dataset to be written to disk.
        local_dir (Path): The root directory where the dataset files will be stored.
        data_file_size_mb (float, optional): Maximal size for the parquet data file, in MB. Defaults to DEFAULT_DATA_FILE_SIZE_IN_MB.
        chunk_size (int, optional): Maximal number of files within a chunk folder before creating another one. Defaults to DEFAULT_CHUNK_SIZE.
    """
    if data_file_size_mb is None:
        data_file_size_mb = DEFAULT_DATA_FILE_SIZE_IN_MB
    if chunk_size is None:
        chunk_size = DEFAULT_CHUNK_SIZE
    dataset_size_in_mb = get_hf_dataset_size_in_mb(hf_dataset)
    if dataset_size_in_mb <= data_file_size_mb:
        # If the dataset is small enough, write it to a single file.
        path = local_dir / DEFAULT_DATA_PATH.format(chunk_index=0, file_index=0)
        path.parent.mkdir(parents=True, exist_ok=True)
        hf_dataset.to_parquet(path)
        return
    # If the dataset is too large, split it into smaller chunks, keeping episodes whole.
    # Episode boundaries are the rows where `episode_index` changes value; this assumes
    # rows of the same episode are contiguous (sorted/grouped by episode) — TODO confirm.
    episode_indices = np.array(hf_dataset["episode_index"])
    episode_boundaries = np.where(np.diff(episode_indices) != 0)[0] + 1
    # `episode_starts[i]` is the first row of episode i; `episode_ends[i]` is one past its last row.
    episode_starts = np.concatenate(([0], episode_boundaries))
    episode_ends = np.concatenate((episode_boundaries, [len(hf_dataset)]))
    num_episodes = len(episode_starts)
    current_episode_idx = 0
    chunk_idx, file_idx = 0, 0
    while current_episode_idx < num_episodes:
        # Start the shard with a single episode, then greedily extend it one whole episode
        # at a time for as long as the candidate shard stays within the size budget.
        shard_start_row = episode_starts[current_episode_idx]
        shard_end_row = episode_ends[current_episode_idx]
        next_episode_to_try_idx = current_episode_idx + 1
        while next_episode_to_try_idx < num_episodes:
            potential_shard_end_row = episode_ends[next_episode_to_try_idx]
            dataset_shard_candidate = hf_dataset.select(range(shard_start_row, potential_shard_end_row))
            shard_size_mb = get_hf_dataset_size_in_mb(dataset_shard_candidate)
            if shard_size_mb > data_file_size_mb:
                break
            else:
                shard_end_row = potential_shard_end_row
                next_episode_to_try_idx += 1
        dataset_shard = hf_dataset.select(range(shard_start_row, shard_end_row))
        # Shard still holds exactly its initial episode (no extension succeeded, or it is
        # the last episode). If that lone episode alone exceeds the limit, warn but write
        # it whole anyway — episode integrity takes precedence over the size budget.
        if (
            shard_start_row == episode_starts[current_episode_idx]
            and shard_end_row == episode_ends[current_episode_idx]
        ):
            shard_size_mb = get_hf_dataset_size_in_mb(dataset_shard)
            if shard_size_mb > data_file_size_mb:
                logging.warning(
                    # .item() converts the numpy integer row index to a plain Python int.
                    f"Episode with index {hf_dataset[shard_start_row.item()]['episode_index']} has size {shard_size_mb:.2f}MB, "
                    f"which is larger than data_file_size_mb ({data_file_size_mb}MB). "
                    "Writing it to a separate shard anyway to preserve episode integrity."
                )
        # Define the path for the current shard and ensure the directory exists.
        path = local_dir / DEFAULT_DATA_PATH.format(chunk_index=chunk_idx, file_index=file_idx)
        path.parent.mkdir(parents=True, exist_ok=True)
        # Write the shard to a Parquet file.
        dataset_shard.to_parquet(path)
        # Update chunk and file indices for the next iteration.
        chunk_idx, file_idx = update_chunk_file_indices(chunk_idx, file_idx, chunk_size)
        current_episode_idx = next_episode_to_try_idx
@pytest.fixture(scope="session")
def create_info(info_factory):
    """Factory fixture: writes dataset `info` metadata into a directory.

    NOTE(review): this span was a corrupted diff merge of the removed `info_path`
    fixture and the added `create_info` fixture; reconstructed to the added version,
    which delegates serialization to `write_info` instead of dumping JSON by hand.
    """

    def _create_info(dir: Path, info: dict | None = None):
        # Fall back to the factory default only when no info dict is supplied at all.
        if info is None:
            info = info_factory()
        write_info(info, dir)

    return _create_info
@pytest.fixture(scope="session")
def create_stats(stats_factory):
    """Factory fixture: writes aggregated dataset `stats` into a directory.

    NOTE(review): this span was a corrupted diff merge of the removed `stats_path`
    fixture and the added `create_stats` fixture; reconstructed to the added version,
    which delegates serialization to `write_stats` instead of dumping JSON by hand.
    """

    def _create_stats(dir: Path, stats: dict | None = None):
        # Fall back to the factory default only when no stats dict is supplied at all.
        if stats is None:
            stats = stats_factory()
        write_stats(stats, dir)

    return _create_stats
@pytest.fixture(scope="session")
def episodes_stats_path(episodes_stats_factory):
    """Factory fixture: writes per-episode stats as jsonl and returns the file path."""
    # NOTE(review): annotation corrected from `list[dict]` — `.values()` below shows the
    # argument is a mapping (presumably episode_index -> stats dict; confirm with factory).
    def _create_episodes_stats_jsonl_file(dir: Path, episodes_stats: dict | None = None) -> Path:
        # Falsy check: None AND an empty mapping both fall back to the factory default.
        if not episodes_stats:
            episodes_stats = episodes_stats_factory()
        fpath = dir / EPISODES_STATS_PATH
        fpath.parent.mkdir(parents=True, exist_ok=True)
        # One JSON object per line, one line per episode's stats.
        with jsonlines.open(fpath, "w") as writer:
            writer.write_all(episodes_stats.values())
        return fpath
    return _create_episodes_stats_jsonl_file
@pytest.fixture(scope="session")
def create_tasks(tasks_factory):
    """Factory fixture: writes a tasks table into a directory.

    NOTE(review): this span was a corrupted diff merge of the removed `tasks_path`
    fixture and the added `create_tasks` fixture; reconstructed to the added version,
    which takes a pandas DataFrame and delegates serialization to `write_tasks`.
    """

    def _create_tasks(dir: Path, tasks: pd.DataFrame | None = None):
        # Fall back to the factory default only when no tasks table is supplied at all.
        if tasks is None:
            tasks = tasks_factory()
        write_tasks(tasks, dir)

    return _create_tasks
@pytest.fixture(scope="session")
def create_episodes(episodes_factory):
    """Factory fixture: writes episode metadata into a directory.

    NOTE(review): this span was a corrupted diff merge of the removed `episode_path`
    fixture and the added `create_episodes` fixture; reconstructed to the added version,
    which takes a `datasets.Dataset` and delegates serialization to `write_episodes`.
    """

    def _create_episodes(dir: Path, episodes: datasets.Dataset | None = None):
        if episodes is None:
            # TODO(rcadene): add features, fps as arguments
            episodes = episodes_factory()
        write_episodes(episodes, dir)

    return _create_episodes
@pytest.fixture(scope="session")
def create_hf_dataset(hf_dataset_factory):
    """Factory fixture: writes a Hugging Face dataset to sharded Parquet files in `dir`.

    NOTE(review): this span was a corrupted diff merge of the added `create_hf_dataset`
    fixture with the removed `single_episode_parquet_path` / `multi_episode_parquet_path`
    fixtures; reconstructed to the added version, which delegates sharding and file
    layout to the module-level `write_hf_dataset` helper.
    """

    def _create_hf_dataset(
        dir: Path,
        hf_dataset: datasets.Dataset | None = None,
        data_file_size_in_mb: float | None = None,
        chunk_size: int | None = None,
    ):
        # Fall back to the factory default only when no dataset is supplied at all.
        if hf_dataset is None:
            hf_dataset = hf_dataset_factory()
        # None values let write_hf_dataset apply its own documented defaults.
        write_hf_dataset(hf_dataset, dir, data_file_size_in_mb, chunk_size)

    return _create_hf_dataset