Add online training with TD-MPC as proof of concept (#338)

This commit is contained in:
Alexander Soare
2024-07-25 11:16:38 +01:00
committed by GitHub
parent abbb1d2367
commit f8a6574698
25 changed files with 1291 additions and 233 deletions

View File

@@ -15,20 +15,25 @@
# limitations under the License.
import logging
import time
from concurrent.futures import ThreadPoolExecutor
from contextlib import nullcontext
from copy import deepcopy
from pathlib import Path
from pprint import pformat
from threading import Lock
import hydra
import numpy as np
import torch
from deepdiff import DeepDiff
from omegaconf import DictConfig, OmegaConf
from omegaconf import DictConfig, ListConfig, OmegaConf
from termcolor import colored
from torch import nn
from torch.cuda.amp import GradScaler
from lerobot.common.datasets.factory import make_dataset, resolve_delta_timestamps
from lerobot.common.datasets.lerobot_dataset import MultiLeRobotDataset
from lerobot.common.datasets.online_buffer import OnlineBuffer, compute_sampler_weights
from lerobot.common.datasets.sampler import EpisodeAwareSampler
from lerobot.common.datasets.utils import cycle
from lerobot.common.envs.factory import make_env
@@ -107,6 +112,7 @@ def update_policy(
grad_scaler: GradScaler,
lr_scheduler=None,
use_amp: bool = False,
lock=None,
):
"""Returns a dictionary of items for logging."""
start_time = time.perf_counter()
@@ -129,7 +135,8 @@ def update_policy(
# Optimizer's gradients are already unscaled, so scaler.step does not unscale them,
# although it still skips optimizer.step() if the gradients contain infs or NaNs.
grad_scaler.step(optimizer)
with lock if lock is not None else nullcontext():
grad_scaler.step(optimizer)
# Updates the scale for next iteration.
grad_scaler.update()
@@ -149,11 +156,12 @@ def update_policy(
"update_s": time.perf_counter() - start_time,
**{k: v for k, v in output_dict.items() if k != "loss"},
}
info.update({k: v for k, v in output_dict.items() if k not in info})
return info
def log_train_info(logger: Logger, info, step, cfg, dataset, is_offline):
def log_train_info(logger: Logger, info, step, cfg, dataset, is_online):
loss = info["loss"]
grad_norm = info["grad_norm"]
lr = info["lr"]
@@ -187,12 +195,12 @@ def log_train_info(logger: Logger, info, step, cfg, dataset, is_offline):
info["num_samples"] = num_samples
info["num_episodes"] = num_episodes
info["num_epochs"] = num_epochs
info["is_offline"] = is_offline
info["is_online"] = is_online
logger.log_dict(info, step, mode="train")
def log_eval_info(logger, info, step, cfg, dataset, is_offline):
def log_eval_info(logger, info, step, cfg, dataset, is_online):
eval_s = info["eval_s"]
avg_sum_reward = info["avg_sum_reward"]
pc_success = info["pc_success"]
@@ -221,7 +229,7 @@ def log_eval_info(logger, info, step, cfg, dataset, is_offline):
info["num_samples"] = num_samples
info["num_episodes"] = num_episodes
info["num_epochs"] = num_epochs
info["is_offline"] = is_offline
info["is_online"] = is_online
logger.log_dict(info, step, mode="eval")
@@ -234,6 +242,9 @@ def train(cfg: DictConfig, out_dir: str | None = None, job_name: str | None = No
init_logging()
if cfg.training.online_steps > 0 and isinstance(cfg.dataset_repo_id, ListConfig):
raise NotImplementedError("Online training with LeRobotMultiDataset is not implemented.")
# If we are resuming a run, we need to check that a checkpoint exists in the log directory, and we need
# to check for any differences between the provided config and the checkpoint's config.
if cfg.resume:
@@ -279,9 +290,6 @@ def train(cfg: DictConfig, out_dir: str | None = None, job_name: str | None = No
# log metrics to terminal and wandb
logger = Logger(cfg, out_dir, wandb_job_name=job_name)
if cfg.training.online_steps > 0:
raise NotImplementedError("Online training is not implemented yet.")
set_global_seed(cfg.seed)
# Check device is available
@@ -336,7 +344,7 @@ def train(cfg: DictConfig, out_dir: str | None = None, job_name: str | None = No
logging.info(f"{num_total_params=} ({format_big_number(num_total_params)})")
# Note: this helper will be used in offline and online training loops.
def evaluate_and_checkpoint_if_needed(step):
def evaluate_and_checkpoint_if_needed(step, is_online):
_num_digits = max(6, len(str(cfg.training.offline_steps + cfg.training.online_steps)))
step_identifier = f"{step:0{_num_digits}d}"
@@ -352,7 +360,7 @@ def train(cfg: DictConfig, out_dir: str | None = None, job_name: str | None = No
max_episodes_rendered=4,
start_seed=cfg.seed,
)
log_eval_info(logger, eval_info["aggregated"], step, cfg, offline_dataset, is_offline=True)
log_eval_info(logger, eval_info["aggregated"], step, cfg, offline_dataset, is_online=is_online)
if cfg.wandb.enable:
logger.log_video(eval_info["video_paths"][0], step, mode="eval")
logging.info("Resume training")
@@ -396,8 +404,9 @@ def train(cfg: DictConfig, out_dir: str | None = None, job_name: str | None = No
dl_iter = cycle(dataloader)
policy.train()
offline_step = 0
for _ in range(step, cfg.training.offline_steps):
if step == 0:
if offline_step == 0:
logging.info("Start offline training on a fixed dataset")
start_time = time.perf_counter()
@@ -420,13 +429,207 @@ def train(cfg: DictConfig, out_dir: str | None = None, job_name: str | None = No
train_info["dataloading_s"] = dataloading_s
if step % cfg.training.log_freq == 0:
log_train_info(logger, train_info, step, cfg, offline_dataset, is_offline=True)
log_train_info(logger, train_info, step, cfg, offline_dataset, is_online=False)
# Note: evaluate_and_checkpoint_if_needed happens **after** the `step`th training update has completed,
# so we pass in step + 1.
evaluate_and_checkpoint_if_needed(step + 1)
evaluate_and_checkpoint_if_needed(step + 1, is_online=False)
step += 1
offline_step += 1 # noqa: SIM113
if cfg.training.online_steps == 0:
if eval_env:
eval_env.close()
logging.info("End of training")
return
# Online training.
# Create an env dedicated to online episodes collection from policy rollout.
online_env = make_env(cfg, n_envs=cfg.training.online_rollout_batch_size)
resolve_delta_timestamps(cfg)
online_buffer_path = logger.log_dir / "online_buffer"
if cfg.resume and not online_buffer_path.exists():
# If we are resuming a run, we default to the data shapes and buffer capacity from the saved online
# buffer.
logging.warning(
"When online training is resumed, we load the latest online buffer from the prior run, "
"and this might not coincide with the state of the buffer as it was at the moment the checkpoint "
"was made. This is because the online buffer is updated on disk during training, independently "
"of our explicit checkpointing mechanisms."
)
online_dataset = OnlineBuffer(
online_buffer_path,
data_spec={
**{k: {"shape": v, "dtype": np.dtype("float32")} for k, v in policy.config.input_shapes.items()},
**{k: {"shape": v, "dtype": np.dtype("float32")} for k, v in policy.config.output_shapes.items()},
"next.reward": {"shape": (), "dtype": np.dtype("float32")},
"next.done": {"shape": (), "dtype": np.dtype("?")},
"next.success": {"shape": (), "dtype": np.dtype("?")},
},
buffer_capacity=cfg.training.online_buffer_capacity,
fps=online_env.unwrapped.metadata["render_fps"],
delta_timestamps=cfg.training.delta_timestamps,
)
# If we are doing online rollouts asynchronously, deepcopy the policy to use for online rollouts (this
# makes it possible to do online rollouts in parallel with training updates).
online_rollout_policy = deepcopy(policy) if cfg.training.do_online_rollout_async else policy
# Create dataloader for online training.
concat_dataset = torch.utils.data.ConcatDataset([offline_dataset, online_dataset])
sampler_weights = compute_sampler_weights(
offline_dataset,
offline_drop_n_last_frames=cfg.training.get("drop_n_last_frames", 0),
online_dataset=online_dataset,
# +1 because online rollouts return an extra frame for the "final observation". Note: we don't have
# this final observation in the offline datasets, but we might add them in future.
online_drop_n_last_frames=cfg.training.get("drop_n_last_frames", 0) + 1,
online_sampling_ratio=cfg.training.online_sampling_ratio,
)
sampler = torch.utils.data.WeightedRandomSampler(
sampler_weights,
num_samples=len(concat_dataset),
replacement=True,
)
dataloader = torch.utils.data.DataLoader(
concat_dataset,
batch_size=cfg.training.batch_size,
num_workers=cfg.training.num_workers,
sampler=sampler,
pin_memory=device.type != "cpu",
drop_last=True,
)
dl_iter = cycle(dataloader)
# Lock and thread pool executor for asynchronous online rollouts. When asynchronous mode is disabled,
# these are still used but effectively do nothing.
lock = Lock()
# Note: 1 worker because we only ever want to run one set of online rollouts at a time. Batch
# parallelization of rollouts is handled within the job.
executor = ThreadPoolExecutor(max_workers=1)
online_step = 0
online_rollout_s = 0 # time take to do online rollout
update_online_buffer_s = 0 # time taken to update the online buffer with the online rollout data
# Time taken waiting for the online buffer to finish being updated. This is relevant when using the async
# online rollout option.
await_update_online_buffer_s = 0
rollout_start_seed = cfg.training.online_env_seed
while True:
if online_step == cfg.training.online_steps:
break
if online_step == 0:
logging.info("Start online training by interacting with environment")
def sample_trajectory_and_update_buffer():
nonlocal rollout_start_seed
with lock:
online_rollout_policy.load_state_dict(policy.state_dict())
online_rollout_policy.eval()
start_rollout_time = time.perf_counter()
with torch.no_grad():
eval_info = eval_policy(
online_env,
online_rollout_policy,
n_episodes=cfg.training.online_rollout_n_episodes,
max_episodes_rendered=min(10, cfg.training.online_rollout_n_episodes),
videos_dir=logger.log_dir / "online_rollout_videos",
return_episode_data=True,
start_seed=(
rollout_start_seed := (rollout_start_seed + cfg.training.batch_size) % 1000000
),
)
online_rollout_s = time.perf_counter() - start_rollout_time
with lock:
start_update_buffer_time = time.perf_counter()
online_dataset.add_data(eval_info["episodes"])
# Update the concatenated dataset length used during sampling.
concat_dataset.cumulative_sizes = concat_dataset.cumsum(concat_dataset.datasets)
# Update the sampling weights.
sampler.weights = compute_sampler_weights(
offline_dataset,
offline_drop_n_last_frames=cfg.training.get("drop_n_last_frames", 0),
online_dataset=online_dataset,
# +1 because online rollouts return an extra frame for the "final observation". Note: we don't have
# this final observation in the offline datasets, but we might add them in future.
online_drop_n_last_frames=cfg.training.get("drop_n_last_frames", 0) + 1,
online_sampling_ratio=cfg.training.online_sampling_ratio,
)
sampler.num_samples = len(concat_dataset)
update_online_buffer_s = time.perf_counter() - start_update_buffer_time
return online_rollout_s, update_online_buffer_s
future = executor.submit(sample_trajectory_and_update_buffer)
# If we aren't doing async rollouts, or if we haven't yet gotten enough examples in our buffer, wait
# here until the rollout and buffer update is done, before proceeding to the policy update steps.
if (
not cfg.training.do_online_rollout_async
or len(online_dataset) <= cfg.training.online_buffer_seed_size
):
online_rollout_s, update_online_buffer_s = future.result()
if len(online_dataset) <= cfg.training.online_buffer_seed_size:
logging.info(
f"Seeding online buffer: {len(online_dataset)}/{cfg.training.online_buffer_seed_size}"
)
continue
policy.train()
for _ in range(cfg.training.online_steps_between_rollouts):
with lock:
start_time = time.perf_counter()
batch = next(dl_iter)
dataloading_s = time.perf_counter() - start_time
for key in batch:
batch[key] = batch[key].to(cfg.device, non_blocking=True)
train_info = update_policy(
policy,
batch,
optimizer,
cfg.training.grad_clip_norm,
grad_scaler=grad_scaler,
lr_scheduler=lr_scheduler,
use_amp=cfg.use_amp,
lock=lock,
)
train_info["dataloading_s"] = dataloading_s
train_info["online_rollout_s"] = online_rollout_s
train_info["update_online_buffer_s"] = update_online_buffer_s
train_info["await_update_online_buffer_s"] = await_update_online_buffer_s
with lock:
train_info["online_buffer_size"] = len(online_dataset)
if step % cfg.training.log_freq == 0:
log_train_info(logger, train_info, step, cfg, online_dataset, is_online=True)
# Note: evaluate_and_checkpoint_if_needed happens **after** the `step`th training update has completed,
# so we pass in step + 1.
evaluate_and_checkpoint_if_needed(step + 1, is_online=True)
step += 1
online_step += 1
# If we're doing async rollouts, we should now wait until we've completed them before proceeding
# to do the next batch of rollouts.
if future.running():
start = time.perf_counter()
online_rollout_s, update_online_buffer_s = future.result()
await_update_online_buffer_s = time.perf_counter() - start
if online_step >= cfg.training.online_steps:
break
if eval_env:
eval_env.close()