Extend reward classifier for multiple camera views (#626)

This commit is contained in:
Michel Aractingi
2025-01-13 13:57:49 +01:00
parent d1d6ffd23c
commit 181727c0fe
9 changed files with 186 additions and 50 deletions

View File

@@ -25,13 +25,13 @@ from glob import glob
from pathlib import Path
import torch
import wandb
from huggingface_hub.constants import SAFETENSORS_SINGLE_FILE
from omegaconf import DictConfig, OmegaConf
from termcolor import colored
from torch.optim import Optimizer
from torch.optim.lr_scheduler import LRScheduler
import wandb
from lerobot.common.policies.policy_protocol import Policy
from lerobot.common.utils.utils import get_global_random_state, set_global_random_state

View File

@@ -13,6 +13,7 @@ class ClassifierConfig:
model_name: str = "microsoft/resnet-50"
device: str = "cpu"
model_type: str = "cnn" # "transformer" or "cnn"
num_cameras: int = 2
def save_pretrained(self, save_dir):
"""Save config to json file."""

View File

@@ -97,7 +97,7 @@ class Classifier(
raise ValueError("Unsupported transformer architecture since hidden_size is not found")
self.classifier_head = nn.Sequential(
nn.Linear(input_dim, self.config.hidden_dim),
nn.Linear(input_dim * self.config.num_cameras, self.config.hidden_dim),
nn.Dropout(self.config.dropout_rate),
nn.LayerNorm(self.config.hidden_dim),
nn.ReLU(),
@@ -130,11 +130,11 @@ class Classifier(
return outputs.pooler_output
return outputs.last_hidden_state[:, 0, :]
def forward(self, x: torch.Tensor) -> ClassifierOutput:
def forward(self, xs: torch.Tensor) -> ClassifierOutput:
"""Forward pass of the classifier."""
# For training, we expect input to be a tensor directly from LeRobotDataset
encoder_output = self._get_encoder_output(x)
logits = self.classifier_head(encoder_output)
encoder_outputs = torch.hstack([self._get_encoder_output(x) for x in xs])
logits = self.classifier_head(encoder_outputs)
if self.config.num_classes == 2:
logits = logits.squeeze(-1)
@@ -142,4 +142,10 @@ class Classifier(
else:
probabilities = torch.softmax(logits, dim=-1)
return ClassifierOutput(logits=logits, probabilities=probabilities, hidden_states=encoder_output)
return ClassifierOutput(logits=logits, probabilities=probabilities, hidden_states=encoder_outputs)
def predict_reward(self, x):
if self.config.num_classes == 2:
return (self.forward(x).probabilities > 0.5).float()
else:
return torch.argmax(self.forward(x).probabilities, dim=1)

View File

@@ -25,6 +25,7 @@ from copy import copy
from functools import cache
import rerun as rr
import numpy as np
import torch
from deepdiff import DeepDiff
from termcolor import colored
@@ -316,7 +317,17 @@ def reset_environment(robot, events, reset_time_s, fps):
)
def stop_recording(robot, listener, display_data):
def reset_follower_position(robot: Robot, target_position, num_steps: int = 30, step_wait_s: float = 0.015):
    """Smoothly move the follower arm from its current pose to ``target_position``.

    Reads the arm's present position, linearly interpolates a trajectory to the
    target, and streams each intermediate pose to the robot with a short wait
    between steps so the motion is gradual rather than a single jump.

    Args:
        robot: Robot whose "main" follower arm is driven.
        target_position: Desired final joint position (array-like, same layout
            as the arm's "Present_Position" reading).
        num_steps: Number of interpolation points along the trajectory.
            Defaults to 30 (an arbitrary value that gives a smooth motion).
        step_wait_s: Wait between consecutive poses, in seconds.
    """
    current_position = robot.follower_arms["main"].read("Present_Position")
    # Linear interpolation from current to target pose, inclusive of endpoints.
    trajectory = torch.from_numpy(np.linspace(current_position, target_position, num_steps))
    for pose in trajectory:
        robot.send_action(pose)
        busy_wait(step_wait_s)
def stop_recording(robot, listener, display_cameras):
robot.disconnect()
if not is_headless() and listener is not None: