Clean logging, Refactor
@@ -177,6 +177,14 @@ class PushtExperienceReplay(TensorDictReplayBuffer):
             transform=transform,
         )
 
+    @property
+    def num_samples(self):
+        return len(self)
+
+    @property
+    def num_episodes(self):
+        return len(self._storage._storage["episode"].unique())
+
     @property
     def data_path_root(self):
         if self.streaming:

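Note: the new num_samples / num_episodes properties count frames and distinct episode ids in the flat storage. A minimal sketch of the same idiom on a plain tensor (the names below are illustrative, not from the commit):

    import torch

    episode_ids = torch.tensor([0, 0, 0, 1, 1, 2])  # hypothetical per-frame episode index
    print(len(episode_ids))           # 6 frames, analogous to num_samples
    print(len(episode_ids.unique()))  # 3 episodes, analogous to num_episodes
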
@@ -109,6 +109,14 @@ class SimxarmExperienceReplay(TensorDictReplayBuffer):
             transform=transform,
         )
 
+    @property
+    def num_samples(self):
+        return len(self)
+
+    @property
+    def num_episodes(self):
+        return len(self._storage._storage["episode"].unique())
+
     @property
     def data_path_root(self):
         if self.streaming:

@@ -8,25 +8,6 @@ import pandas as pd
 from omegaconf import OmegaConf
 from termcolor import colored
 
-CONSOLE_FORMAT = [
-    ("episode", "E", "int"),
-    ("step", "S", "int"),
-    ("avg_sum_reward", "RS", "float"),
-    ("avg_max_reward", "RM", "float"),
-    ("pc_success", "SR", "float"),
-    ("total_time", "T", "time"),
-]
-AGENT_METRICS = [
-    "consistency_loss",
-    "reward_loss",
-    "value_loss",
-    "total_loss",
-    "weighted_loss",
-    "pi_loss",
-    "grad_norm",
-]
-
-
 def make_dir(dir_path):
     """Create directory if it does not already exist."""
     with contextlib.suppress(OSError):

@@ -80,10 +61,11 @@ class Logger:
     """Primary logger object. Logs either locally or using wandb."""
 
     def __init__(self, log_dir, job_name, cfg):
-        self._log_dir = make_dir(Path(log_dir))
+        self._log_dir = Path(log_dir)
+        self._log_dir.mkdir(parents=True, exist_ok=True)
         self._job_name = job_name
-        self._model_dir = make_dir(self._log_dir / "models")
-        self._buffer_dir = make_dir(self._log_dir / "buffers")
+        self._model_dir = self._log_dir / "models"
+        self._buffer_dir = self._log_dir / "buffers"
         self._save_model = cfg.save_model
         self._save_buffer = cfg.save_buffer
         self._group = cfg_to_group(cfg)

@@ -121,10 +103,11 @@ class Logger:
             print(colored("Logs will be synced with wandb.", "blue", attrs=["bold"]))
             self._wandb = wandb
 
-    def save_model(self, agent, identifier):
+    def save_model(self, policy, identifier):
         if self._save_model:
+            self._model_dir.mkdir(parents=True, exist_ok=True)
             fp = self._model_dir / f"{str(identifier)}.pt"
-            agent.save(fp)
+            policy.save(fp)
             if self._wandb:
                 artifact = self._wandb.Artifact(
                     self._group + "-" + str(self._seed) + "-" + str(identifier),

@@ -134,6 +117,7 @@ class Logger:
             self._wandb.log_artifact(artifact)
 
     def save_buffer(self, buffer, identifier):
+        self._buffer_dir.mkdir(parents=True, exist_ok=True)
        fp = self._buffer_dir / f"{str(identifier)}.pkl"
        buffer.save(fp)
        if self._wandb:

@@ -153,31 +137,13 @@ class Logger:
             self._wandb.finish()
             print_run(self._cfg, self._eval[-1][-1])
 
-    def _format(self, key, value, ty):
-        if ty == "int":
-            return f'{colored(key + ":", "yellow")} {int(value):,}'
-        elif ty == "float":
-            return f'{colored(key + ":", "yellow")} {value:.01f}'
-        elif ty == "time":
-            value = str(datetime.timedelta(seconds=int(value)))
-            return f'{colored(key + ":", "yellow")} {value}'
-        else:
-            raise f"invalid log format type: {ty}"
-
-    def _print(self, d, category):
-        category = colored(category, "blue" if category == "train" else "green")
-        pieces = [f" {category:<14}"]
-        for k, disp_k, ty in CONSOLE_FORMAT:
-            pieces.append(f"{self._format(disp_k, d.get(k, 0), ty):<26}")
-        print(" ".join(pieces))
-
-    def log(self, d, category="train"):
-        assert category in {"train", "eval"}
+    def log_dict(self, d, step, mode="train"):
+        assert mode in {"train", "eval"}
         if self._wandb is not None:
             for k, v in d.items():
-                self._wandb.log({category + "/" + k: v}, step=d["step"])
-        if category == "eval":
-            keys = ["step", "avg_sum_reward", "avg_max_reward", "pc_success"]
-            self._eval.append(np.array([d[key] for key in keys]))
-            pd.DataFrame(np.array(self._eval)).to_csv(self._log_dir / "eval.log", header=keys, index=None)
-        self._print(d, category)
+                self._wandb.log({f"{mode}/{k}": v}, step=step)
+
+    def log_video(self, video, step, mode="train"):
+        assert mode in {"train", "eval"}
+        wandb_video = self._wandb.Video(video, fps=self.cfg.fps, format="mp4")
+        self._wandb.log({f"{mode}/video": wandb_video}, step=step)

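Note: a hedged sketch of calling the renamed logging API. The cfg object, step value, and video array are placeholders, and wandb.Video expects a (time, channel, height, width) uint8 array when given raw frames:

    # assuming a Logger built as in __init__ above, with cfg and video defined elsewhere
    logger = Logger(log_dir="outputs/run", job_name="pusht", cfg=cfg)
    logger.log_dict({"loss": 0.42, "grad_norm": 1.3}, step=1000, mode="train")
    logger.log_video(video, step=1000, mode="eval")  # video: np.uint8, shape (t, c, h, w)
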
@@ -1,4 +1,5 @@
 import copy
+import time
 
 import hydra
 import torch

@@ -110,6 +111,8 @@ class DiffusionPolicy(nn.Module):
         return action
 
     def update(self, replay_buffer, step):
+        start_time = time.time()
+
         self.diffusion.train()
 
         num_slices = self.cfg.batch_size

@@ -125,19 +128,31 @@ class DiffusionPolicy(nn.Module):
 
             out = {
                 "obs": {
-                    "image": batch["observation", "image"].to(self.device),
-                    "agent_pos": batch["observation", "state"].to(self.device),
+                    "image": batch["observation", "image"].to(
+                        self.device, non_blocking=True
+                    ),
+                    "agent_pos": batch["observation", "state"].to(
+                        self.device, non_blocking=True
+                    ),
                 },
-                "action": batch["action"].to(self.device),
+                "action": batch["action"].to(self.device, non_blocking=True),
             }
             return out
 
         batch = replay_buffer.sample(batch_size) if self.cfg.balanced_sampling else replay_buffer.sample()
         batch = process_batch(batch, self.cfg.horizon, num_slices)
 
+        data_s = time.time() - start_time
+
         loss = self.diffusion.compute_loss(batch)
         loss.backward()
 
         grad_norm = torch.nn.utils.clip_grad_norm_(
             self.diffusion.parameters(),
             self.cfg.grad_clip_norm,
             error_if_nonfinite=False,
         )
 
         self.optimizer.step()
         self.optimizer.zero_grad()
         self.lr_scheduler.step()

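Note: the non_blocking=True copies added above can overlap the host-to-GPU transfer with compute, but only when the source tensor lives in pinned (page-locked) host memory; otherwise the copy degrades to a synchronous one. A standalone illustration, not part of the commit:

    import torch

    if torch.cuda.is_available():
        x = torch.randn(64, 3, 96, 96).pin_memory()  # pinned host buffer
        y = x.to("cuda", non_blocking=True)          # async copy on the current stream
        torch.cuda.synchronize()                     # wait before reading y from the host
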
@@ -145,9 +160,12 @@ class DiffusionPolicy(nn.Module):
         if self.ema is not None:
             self.ema.step(self.diffusion)
 
-        metrics = {
-            "total_loss": loss.item(),
+        info = {
+            "loss": loss.item(),
             "grad_norm": float(grad_norm),
+            "lr": self.lr_scheduler.get_last_lr()[0],
+            "data_s": data_s,
+            "update_s": time.time() - start_time,
         }
 
         # TODO(rcadene): remove hardcoding

@@ -155,7 +173,7 @@ class DiffusionPolicy(nn.Module):
         if step % 168 == 0:
             self.global_step += 1
 
-        return metrics
+        return info
 
     def save(self, fp):
         torch.save(self.state_dict(), fp)

@@ -1,5 +1,6 @@
 # ruff: noqa: N806
 
+import time
 from copy import deepcopy
 
 import einops

@@ -285,6 +286,7 @@ class TDMPC(nn.Module):
 
     def update(self, replay_buffer, step, demo_buffer=None):
         """Main update function. Corresponds to one iteration of the model learning."""
+        start_time = time.time()
 
         num_slices = self.cfg.batch_size
         batch_size = self.cfg.horizon * num_slices

@@ -326,6 +328,14 @@ class TDMPC(nn.Module):
         }
         reward = batch["next", "reward"]
 
+        # TODO(rcadene): add non_blocking=True
+        # for key in obs:
+        #     obs[key] = obs[key].to(self.device, non_blocking=True)
+        #     next_obses[key] = next_obses[key].to(self.device, non_blocking=True)
+
+        # action = action.to(self.device, non_blocking=True)
+        # reward = reward.to(self.device, non_blocking=True)
+
         # TODO(rcadene): rearrange directly in offline dataset
         if reward.ndim == 2:
             reward = einops.rearrange(reward, "h t -> h t 1")

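Note: the rearrange pattern "h t -> h t 1" appends a trailing singleton dimension so reward broadcasts against per-step value tensors. A quick standalone check:

    import einops
    import torch

    reward = torch.zeros(5, 256)  # (horizon, batch of slices)
    print(einops.rearrange(reward, "h t -> h t 1").shape)  # torch.Size([5, 256, 1])
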
@@ -399,6 +409,8 @@ class TDMPC(nn.Module):
         self.std = h.linear_schedule(self.cfg.std_schedule, step)
         self.model.train()
 
+        data_s = time.time() - start_time
+
         # Compute targets
         with torch.no_grad():
             next_z = self.model.encode(next_obses)

@@ -482,21 +494,23 @@ class TDMPC(nn.Module):
             h.ema(self.model._Qs, self.model_target._Qs, self.cfg.tau)
 
         self.model.eval()
-        metrics = {
+
+        info = {
             "consistency_loss": float(consistency_loss.mean().item()),
             "reward_loss": float(reward_loss.mean().item()),
             "Q_value_loss": float(q_value_loss.mean().item()),
             "V_value_loss": float(v_value_loss.mean().item()),
-            "total_loss": float(total_loss.mean().item()),
-            "weighted_loss": float(weighted_loss.mean().item()),
+            "sum_loss": float(total_loss.mean().item()),
+            "loss": float(weighted_loss.mean().item()),
             "grad_norm": float(grad_norm),
+            "lr": self.cfg.lr,
+            "data_s": data_s,
+            "update_s": time.time() - start_time,
         }
-        # for key in ["demo_batch_size", "expectile"]:
-        #     if hasattr(self, key):
-        metrics["demo_batch_size"] = demo_batch_size
-        metrics["expectile"] = expectile
-        metrics.update(value_info)
-        metrics.update(pi_update_info)
+        info["demo_batch_size"] = demo_batch_size
+        info["expectile"] = expectile
+        info.update(value_info)
+        info.update(pi_update_info)
 
         self.step[0] = step
-        return metrics
+        return info

@@ -1,4 +1,6 @@
+import logging
 import random
+from datetime import datetime
 
 import numpy as np
 import torch

@@ -10,3 +12,34 @@ def set_seed(seed):
     np.random.seed(seed)
     torch.manual_seed(seed)
     torch.cuda.manual_seed_all(seed)
+
+
+def init_logging():
+    def custom_format(record):
+        dt = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+        fnameline = f"{record.pathname}:{record.lineno}"
+        message = f"{record.levelname} {dt} {fnameline[-15:]:>15} {record.msg}"
+        return message
+
+    logging.basicConfig(level=logging.INFO)
+
+    for handler in logging.root.handlers[:]:
+        logging.root.removeHandler(handler)
+
+    formatter = logging.Formatter()
+    formatter.format = custom_format
+    console_handler = logging.StreamHandler()
+    console_handler.setFormatter(formatter)
+    logging.getLogger().addHandler(console_handler)
+
+
+def format_number_KMB(num):
+    suffixes = ["", "K", "M", "B", "T", "Q"]
+    divisor = 1000.0
+
+    for suffix in suffixes:
+        if abs(num) < divisor:
+            return f"{num:.0f}{suffix}"
+        num /= divisor
+
+    return num

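Note: expected behavior of the new helpers, as a standalone check (assuming init_logging and format_number_KMB are imported from the utils module above; the timestamp shown is illustrative):

    import logging

    init_logging()
    logging.info("hello")  # e.g. "INFO 2024-03-01 12:00:00     utils.py:42 hello"
    print(format_number_KMB(3_000_000))  # "3M" (value is rounded by the :.0f format)
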