Use PytorchModelHubMixin to save models as safetensors (#125)

Co-authored-by: Remi <re.cadene@gmail.com>
This commit is contained in:
Alexander Soare
2024-05-01 16:17:18 +01:00
committed by GitHub
parent 01d5490d44
commit a4891095e4
18 changed files with 556 additions and 527 deletions

View File

@@ -14,6 +14,7 @@ import numpy as np
import torch
import torch.nn.functional as F # noqa: N812
import torchvision
from huggingface_hub import PyTorchModelHubMixin
from torch import Tensor, nn
from torchvision.models._utils import IntermediateLayerGetter
from torchvision.ops.misc import FrozenBatchNorm2d
@@ -22,7 +23,7 @@ from lerobot.common.policies.act.configuration_act import ACTConfig
from lerobot.common.policies.normalize import Normalize, Unnormalize
class ACTPolicy(nn.Module):
class ACTPolicy(nn.Module, PyTorchModelHubMixin):
"""
Action Chunking Transformer Policy as per Learning Fine-Grained Bimanual Manipulation with Low-Cost
Hardware (paper: https://arxiv.org/abs/2304.13705, code: https://github.com/tonyzhaozh/act)
@@ -30,27 +31,31 @@ class ACTPolicy(nn.Module):
name = "act"
def __init__(self, cfg: ACTConfig | None = None, dataset_stats=None):
def __init__(self, config: ACTConfig | None = None, dataset_stats=None):
"""
Args:
cfg: Policy configuration class instance or None, in which case the default instantiation of the
configuration class is used.
config: Policy configuration class instance or None, in which case the default instantiation of
the configuration class is used.
"""
super().__init__()
if cfg is None:
cfg = ACTConfig()
self.cfg = cfg
self.normalize_inputs = Normalize(cfg.input_shapes, cfg.input_normalization_modes, dataset_stats)
self.normalize_targets = Normalize(cfg.output_shapes, cfg.output_normalization_modes, dataset_stats)
self.unnormalize_outputs = Unnormalize(
cfg.output_shapes, cfg.output_normalization_modes, dataset_stats
if config is None:
config = ACTConfig()
self.config = config
self.normalize_inputs = Normalize(
config.input_shapes, config.input_normalization_modes, dataset_stats
)
self.model = ACT(cfg)
self.normalize_targets = Normalize(
config.output_shapes, config.output_normalization_modes, dataset_stats
)
self.unnormalize_outputs = Unnormalize(
config.output_shapes, config.output_normalization_modes, dataset_stats
)
self.model = ACT(config)
def reset(self):
"""This should be called whenever the environment is reset."""
if self.cfg.n_action_steps is not None:
self._action_queue = deque([], maxlen=self.cfg.n_action_steps)
if self.config.n_action_steps is not None:
self._action_queue = deque([], maxlen=self.config.n_action_steps)
@torch.no_grad
def select_action(self, batch: dict[str, Tensor], **_) -> Tensor:
@@ -68,7 +73,7 @@ class ACTPolicy(nn.Module):
if len(self._action_queue) == 0:
# `self.model.forward` returns a (batch_size, n_action_steps, action_dim) tensor, but the queue
# effectively has shape (n_action_steps, batch_size, *), hence the transpose.
actions = self.model(batch)[0][: self.cfg.n_action_steps]
actions = self.model(batch)[0][: self.config.n_action_steps]
# TODO(rcadene): make _forward return output dictionary?
actions = self.unnormalize_outputs({"action": actions})["action"]
@@ -88,7 +93,7 @@ class ACTPolicy(nn.Module):
).mean()
loss_dict = {"l1_loss": l1_loss}
if self.cfg.use_vae:
if self.config.use_vae:
# Calculate Dₖₗ(latent_pdf || standard_normal). Note: After computing the KL-divergence for
# each dimension independently, we sum over the latent dimension to get the total
# KL-divergence per batch element, then take the mean over the batch.
@@ -97,7 +102,7 @@ class ACTPolicy(nn.Module):
(-0.5 * (1 + log_sigma_x2_hat - mu_hat.pow(2) - (log_sigma_x2_hat).exp())).sum(-1).mean()
)
loss_dict["kld_loss"] = mean_kld
loss_dict["loss"] = l1_loss + mean_kld * self.cfg.kl_weight
loss_dict["loss"] = l1_loss + mean_kld * self.config.kl_weight
else:
loss_dict["loss"] = l1_loss
@@ -114,17 +119,10 @@ class ACTPolicy(nn.Module):
"""
# Stack images in the order dictated by input_shapes.
batch["observation.images"] = torch.stack(
[batch[k] for k in self.cfg.input_shapes if k.startswith("observation.images.")],
[batch[k] for k in self.config.input_shapes if k.startswith("observation.images.")],
dim=-4,
)
def save(self, fp):
torch.save(self.state_dict(), fp)
def load(self, fp):
d = torch.load(fp)
self.load_state_dict(d)
class ACT(nn.Module):
"""Action Chunking Transformer: The underlying neural network for ACTPolicy.
@@ -161,36 +159,36 @@ class ACT(nn.Module):
└───────────────────────┘
"""
def __init__(self, cfg: ACTConfig):
def __init__(self, config: ACTConfig):
super().__init__()
self.cfg = cfg
self.config = config
# BERT style VAE encoder with input [cls, *joint_space_configuration, *action_sequence].
# The cls token forms parameters of the latent's distribution (like this [*means, *log_variances]).
if self.cfg.use_vae:
self.vae_encoder = ACTEncoder(cfg)
self.vae_encoder_cls_embed = nn.Embedding(1, cfg.d_model)
if self.config.use_vae:
self.vae_encoder = ACTEncoder(config)
self.vae_encoder_cls_embed = nn.Embedding(1, config.dim_model)
# Projection layer for joint-space configuration to hidden dimension.
self.vae_encoder_robot_state_input_proj = nn.Linear(
cfg.input_shapes["observation.state"][0], cfg.d_model
config.input_shapes["observation.state"][0], config.dim_model
)
# Projection layer for action (joint-space target) to hidden dimension.
self.vae_encoder_action_input_proj = nn.Linear(
cfg.input_shapes["observation.state"][0], cfg.d_model
config.input_shapes["observation.state"][0], config.dim_model
)
self.latent_dim = cfg.latent_dim
self.latent_dim = config.latent_dim
# Projection layer from the VAE encoder's output to the latent distribution's parameter space.
self.vae_encoder_latent_output_proj = nn.Linear(cfg.d_model, self.latent_dim * 2)
self.vae_encoder_latent_output_proj = nn.Linear(config.dim_model, self.latent_dim * 2)
# Fixed sinusoidal positional embedding the whole input to the VAE encoder. Unsqueeze for batch
# dimension.
self.register_buffer(
"vae_encoder_pos_enc",
create_sinusoidal_position_embedding(1 + 1 + cfg.chunk_size, cfg.d_model).unsqueeze(0),
create_sinusoidal_pos_embedding(1 + 1 + config.chunk_size, config.dim_model).unsqueeze(0),
)
# Backbone for image feature extraction.
backbone_model = getattr(torchvision.models, cfg.vision_backbone)(
replace_stride_with_dilation=[False, False, cfg.replace_final_stride_with_dilation],
weights=cfg.pretrained_backbone_weights,
backbone_model = getattr(torchvision.models, config.vision_backbone)(
replace_stride_with_dilation=[False, False, config.replace_final_stride_with_dilation],
weights=config.pretrained_backbone_weights,
norm_layer=FrozenBatchNorm2d,
)
# Note: The assumption here is that we are using a ResNet model (and hence layer4 is the final feature
@@ -199,26 +197,28 @@ class ACT(nn.Module):
self.backbone = IntermediateLayerGetter(backbone_model, return_layers={"layer4": "feature_map"})
# Transformer (acts as VAE decoder when training with the variational objective).
self.encoder = ACTEncoder(cfg)
self.decoder = ACTDecoder(cfg)
self.encoder = ACTEncoder(config)
self.decoder = ACTDecoder(config)
# Transformer encoder input projections. The tokens will be structured like
# [latent, robot_state, image_feature_map_pixels].
self.encoder_robot_state_input_proj = nn.Linear(cfg.input_shapes["observation.state"][0], cfg.d_model)
self.encoder_latent_input_proj = nn.Linear(self.latent_dim, cfg.d_model)
self.encoder_robot_state_input_proj = nn.Linear(
config.input_shapes["observation.state"][0], config.dim_model
)
self.encoder_latent_input_proj = nn.Linear(self.latent_dim, config.dim_model)
self.encoder_img_feat_input_proj = nn.Conv2d(
backbone_model.fc.in_features, cfg.d_model, kernel_size=1
backbone_model.fc.in_features, config.dim_model, kernel_size=1
)
# Transformer encoder positional embeddings.
self.encoder_robot_and_latent_pos_embed = nn.Embedding(2, cfg.d_model)
self.encoder_cam_feat_pos_embed = ACTSinusoidalPositionEmbedding2d(cfg.d_model // 2)
self.encoder_robot_and_latent_pos_embed = nn.Embedding(2, config.dim_model)
self.encoder_cam_feat_pos_embed = ACTSinusoidalPositionEmbedding2d(config.dim_model // 2)
# Transformer decoder.
# Learnable positional embedding for the transformer's decoder (in the style of DETR object queries).
self.decoder_pos_embed = nn.Embedding(cfg.chunk_size, cfg.d_model)
self.decoder_pos_embed = nn.Embedding(config.chunk_size, config.dim_model)
# Final action regression head on the output of the transformer's decoder.
self.action_head = nn.Linear(cfg.d_model, cfg.output_shapes["action"][0])
self.action_head = nn.Linear(config.dim_model, config.output_shapes["action"][0])
self._reset_parameters()
@@ -244,7 +244,7 @@ class ACT(nn.Module):
Tuple containing the latent PDF's parameters (mean, log(σ²)) both as (B, L) tensors where L is the
latent dimension.
"""
if self.cfg.use_vae and self.training:
if self.config.use_vae and self.training:
assert (
"action" in batch
), "actions must be provided when using the variational objective in training mode."
@@ -252,7 +252,7 @@ class ACT(nn.Module):
batch_size = batch["observation.state"].shape[0]
# Prepare the latent for input to the transformer encoder.
if self.cfg.use_vae and "action" in batch:
if self.config.use_vae and "action" in batch:
# Prepare the input to the VAE encoder: [cls, *joint_space_configuration, *action_sequence].
cls_embed = einops.repeat(
self.vae_encoder_cls_embed.weight, "1 d -> b 1 d", b=batch_size
@@ -322,7 +322,7 @@ class ACT(nn.Module):
# Forward pass through the transformer modules.
encoder_out = self.encoder(encoder_in, pos_embed=pos_embed)
decoder_in = torch.zeros(
(self.cfg.chunk_size, batch_size, self.cfg.d_model),
(self.config.chunk_size, batch_size, self.config.dim_model),
dtype=pos_embed.dtype,
device=pos_embed.device,
)
@@ -344,10 +344,10 @@ class ACT(nn.Module):
class ACTEncoder(nn.Module):
"""Convenience module for running multiple encoder layers, maybe followed by normalization."""
def __init__(self, cfg: ACTConfig):
def __init__(self, config: ACTConfig):
super().__init__()
self.layers = nn.ModuleList([ACTEncoderLayer(cfg) for _ in range(cfg.n_encoder_layers)])
self.norm = nn.LayerNorm(cfg.d_model) if cfg.pre_norm else nn.Identity()
self.layers = nn.ModuleList([ACTEncoderLayer(config) for _ in range(config.n_encoder_layers)])
self.norm = nn.LayerNorm(config.dim_model) if config.pre_norm else nn.Identity()
def forward(self, x: Tensor, pos_embed: Tensor | None = None) -> Tensor:
for layer in self.layers:
@@ -357,22 +357,22 @@ class ACTEncoder(nn.Module):
class ACTEncoderLayer(nn.Module):
def __init__(self, cfg: ACTConfig):
def __init__(self, config: ACTConfig):
super().__init__()
self.self_attn = nn.MultiheadAttention(cfg.d_model, cfg.n_heads, dropout=cfg.dropout)
self.self_attn = nn.MultiheadAttention(config.dim_model, config.n_heads, dropout=config.dropout)
# Feed forward layers.
self.linear1 = nn.Linear(cfg.d_model, cfg.dim_feedforward)
self.dropout = nn.Dropout(cfg.dropout)
self.linear2 = nn.Linear(cfg.dim_feedforward, cfg.d_model)
self.linear1 = nn.Linear(config.dim_model, config.dim_feedforward)
self.dropout = nn.Dropout(config.dropout)
self.linear2 = nn.Linear(config.dim_feedforward, config.dim_model)
self.norm1 = nn.LayerNorm(cfg.d_model)
self.norm2 = nn.LayerNorm(cfg.d_model)
self.dropout1 = nn.Dropout(cfg.dropout)
self.dropout2 = nn.Dropout(cfg.dropout)
self.norm1 = nn.LayerNorm(config.dim_model)
self.norm2 = nn.LayerNorm(config.dim_model)
self.dropout1 = nn.Dropout(config.dropout)
self.dropout2 = nn.Dropout(config.dropout)
self.activation = get_activation_fn(cfg.feedforward_activation)
self.pre_norm = cfg.pre_norm
self.activation = get_activation_fn(config.feedforward_activation)
self.pre_norm = config.pre_norm
def forward(self, x, pos_embed: Tensor | None = None) -> Tensor:
skip = x
@@ -395,11 +395,11 @@ class ACTEncoderLayer(nn.Module):
class ACTDecoder(nn.Module):
def __init__(self, cfg: ACTConfig):
def __init__(self, config: ACTConfig):
"""Convenience module for running multiple decoder layers followed by normalization."""
super().__init__()
self.layers = nn.ModuleList([ACTDecoderLayer(cfg) for _ in range(cfg.n_decoder_layers)])
self.norm = nn.LayerNorm(cfg.d_model)
self.layers = nn.ModuleList([ACTDecoderLayer(config) for _ in range(config.n_decoder_layers)])
self.norm = nn.LayerNorm(config.dim_model)
def forward(
self,
@@ -418,25 +418,25 @@ class ACTDecoder(nn.Module):
class ACTDecoderLayer(nn.Module):
def __init__(self, cfg: ACTConfig):
def __init__(self, config: ACTConfig):
super().__init__()
self.self_attn = nn.MultiheadAttention(cfg.d_model, cfg.n_heads, dropout=cfg.dropout)
self.multihead_attn = nn.MultiheadAttention(cfg.d_model, cfg.n_heads, dropout=cfg.dropout)
self.self_attn = nn.MultiheadAttention(config.dim_model, config.n_heads, dropout=config.dropout)
self.multihead_attn = nn.MultiheadAttention(config.dim_model, config.n_heads, dropout=config.dropout)
# Feed forward layers.
self.linear1 = nn.Linear(cfg.d_model, cfg.dim_feedforward)
self.dropout = nn.Dropout(cfg.dropout)
self.linear2 = nn.Linear(cfg.dim_feedforward, cfg.d_model)
self.linear1 = nn.Linear(config.dim_model, config.dim_feedforward)
self.dropout = nn.Dropout(config.dropout)
self.linear2 = nn.Linear(config.dim_feedforward, config.dim_model)
self.norm1 = nn.LayerNorm(cfg.d_model)
self.norm2 = nn.LayerNorm(cfg.d_model)
self.norm3 = nn.LayerNorm(cfg.d_model)
self.dropout1 = nn.Dropout(cfg.dropout)
self.dropout2 = nn.Dropout(cfg.dropout)
self.dropout3 = nn.Dropout(cfg.dropout)
self.norm1 = nn.LayerNorm(config.dim_model)
self.norm2 = nn.LayerNorm(config.dim_model)
self.norm3 = nn.LayerNorm(config.dim_model)
self.dropout1 = nn.Dropout(config.dropout)
self.dropout2 = nn.Dropout(config.dropout)
self.dropout3 = nn.Dropout(config.dropout)
self.activation = get_activation_fn(cfg.feedforward_activation)
self.pre_norm = cfg.pre_norm
self.activation = get_activation_fn(config.feedforward_activation)
self.pre_norm = config.pre_norm
def maybe_add_pos_embed(self, tensor: Tensor, pos_embed: Tensor | None) -> Tensor:
return tensor if pos_embed is None else tensor + pos_embed
@@ -489,7 +489,7 @@ class ACTDecoderLayer(nn.Module):
return x
def create_sinusoidal_position_embedding(num_positions: int, dimension: int) -> Tensor:
def create_sinusoidal_pos_embedding(num_positions: int, dimension: int) -> Tensor:
"""1D sinusoidal positional embeddings as in Attention is All You Need.
Args: