Moved the end-effector step sizes from the teleop device to the robot; simplified the automatic takeover code

Michel Aractingi
2025-05-28 18:38:33 +02:00
committed by AdilZouitine
parent 1edfbf792a
commit 8feda920da
8 changed files with 138 additions and 165 deletions
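Taken together, the hunks below shift responsibility for scaling: the teleop devices now emit normalized deltas in [-1, 1], and the robot's end_effector_step_sizes config converts them into metric steps. A minimal sketch of that contract, using the config values from the first hunk (the helper function is illustrative, not part of the diff):

# Minimal sketch (illustrative, not from the diff) of the new contract:
# the teleop device produces normalized deltas in [-1, 1]; the robot owns the
# metric step sizes and scales the deltas before solving inverse kinematics.
import numpy as np

end_effector_step_sizes = {"x": 0.02, "y": 0.02, "z": 0.02}  # meters per unit of action

def scale_teleop_action(action: dict, step_sizes: dict) -> np.ndarray:
    """Convert a normalized teleop action into metric end-effector deltas."""
    return np.array(
        [
            action["delta_x"] * step_sizes["x"],
            action["delta_y"] * step_sizes["y"],
            action["delta_z"] * step_sizes["z"],
            action["gripper"],  # the gripper command passes through unscaled
        ],
        dtype=np.float32,
    )

# A full positive deflection on x now maps to a 2 cm step:
scale_teleop_action({"delta_x": 1.0, "delta_y": 0.0, "delta_z": 0.0, "gripper": 1.0}, end_effector_step_sizes)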

View File

@@ -50,4 +50,12 @@ class SO100FollowerEndEffectorConfig(RobotConfig):
max_gripper_pos: float = 50
end_effector_step_sizes: Dict[str, float] = field(
default_factory=lambda: {
"x": 0.02,
"y": 0.02,
"z": 0.02,
}
)
urdf_path: str = "/Users/michel_aractingi/code/SO-ARM100/Simulation/SO101/so101_new_calib.urdf"

View File

@@ -106,6 +106,7 @@ class SO100FollowerEndEffector(SO100Follower):
Returns:
The joint-space action that was sent to the motors
"""
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
@@ -113,7 +114,12 @@ class SO100FollowerEndEffector(SO100Follower):
if isinstance(action, dict):
if all(k in action for k in ["delta_x", "delta_y", "delta_z", "gripper"]):
action = np.array(
[action["delta_x"], action["delta_y"], action["delta_z"], action["gripper"]],
[
action["delta_x"] * self.config.end_effector_step_sizes["x"],
action["delta_y"] * self.config.end_effector_step_sizes["y"],
action["delta_z"] * self.config.end_effector_step_sizes["z"],
action["gripper"],
],
dtype=np.float32,
)
else:

View File

@@ -25,8 +25,4 @@ class GamepadTeleopConfig(TeleoperatorConfig):
# TODO(Steven): Consider setting here the keys we want to capture/listen to
mock: bool = False
x_step_size: float = 0.02
y_step_size: float = 0.02
z_step_size: float = 0.02
use_gripper: bool = True

View File

@@ -20,14 +20,14 @@ import time
import numpy as np
import torch
from lerobot.common.robots.kinematics import RobotKinematics
from lerobot.common.model.kinematics import RobotKinematics
from lerobot.common.utils.robot_utils import busy_wait
class InputController:
"""Base class for input controllers that generate motion deltas."""
def __init__(self, x_step_size=0.01, y_step_size=0.01, z_step_size=0.01):
def __init__(self, x_step_size=1.0, y_step_size=1.0, z_step_size=1.0):
"""
Initialize the controller.
@@ -102,7 +102,7 @@ class InputController:
class KeyboardController(InputController):
"""Generate motion deltas from keyboard input."""
def __init__(self, x_step_size=0.01, y_step_size=0.01, z_step_size=0.01):
def __init__(self, x_step_size=1.0, y_step_size=1.0, z_step_size=1.0):
super().__init__(x_step_size, y_step_size, z_step_size)
self.key_states = {
"forward_x": False,
@@ -215,7 +215,7 @@ class KeyboardController(InputController):
class GamepadController(InputController):
"""Generate motion deltas from gamepad input."""
def __init__(self, x_step_size=0.01, y_step_size=0.01, z_step_size=0.01, deadzone=0.1):
def __init__(self, x_step_size=1.0, y_step_size=1.0, z_step_size=1.0, deadzone=0.1):
super().__init__(x_step_size, y_step_size, z_step_size)
self.deadzone = deadzone
self.joystick = None
@@ -330,9 +330,9 @@ class GamepadControllerHID(InputController):
def __init__(
self,
x_step_size=0.01,
y_step_size=0.01,
z_step_size=0.01,
x_step_size=1.0,
y_step_size=1.0,
z_step_size=1.0,
deadzone=0.1,
vendor_id=0x046D,
product_id=0xC219,
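With the defaults changed to 1.0, the keyboard and gamepad controllers now report normalized deltas rather than metric steps. A small illustrative sketch of that behavior (the deadzone handling is assumed; only the 1.0 default comes from the hunk):

# Illustrative sketch (not from the diff): with step sizes of 1.0 the controller's
# output is simply the deadzone-filtered stick value, i.e. a normalized delta.
def apply_deadzone(value: float, deadzone: float = 0.1) -> float:
    """Suppress small stick noise; keep sign and magnitude otherwise."""
    return 0.0 if abs(value) < deadzone else value

def stick_to_delta(stick_value: float, step_size: float = 1.0) -> float:
    """Map a raw stick reading in [-1, 1] to a motion delta."""
    return apply_deadzone(stick_value) * step_size

assert stick_to_delta(0.05) == 0.0   # inside the deadzone
assert stick_to_delta(-0.8) == -0.8  # normalized delta; metric scaling happens in the robot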

View File

@@ -93,11 +93,7 @@ class GamepadTeleop(Teleoperator):
else:
from lerobot.scripts.server.end_effector_control_utils import GamepadController as Gamepad
self.gamepad = Gamepad(
x_step_size=self.config.x_step_size,
y_step_size=self.config.y_step_size,
z_step_size=self.config.z_step_size,
)
self.gamepad = Gamepad(x_step_size=1.0, y_step_size=1.0, z_step_size=1.0)
self.gamepad.start()
def calibrate(self) -> None:

View File

@@ -82,11 +82,10 @@ def find_joint_and_ee_bounds(cfg: FindJointLimitsConfig):
while True:
action = teleop.get_action()
robot.send_action(action)
observation = robot.get_observation()
joint_positions = robot.bus.sync_read("Present_Position")
joint_positions = np.array([joint_positions[key] for key in joint_positions.keys()])
ee_pos, ee_rot, _ = kinematics.forward_kinematics(joint_positions * np.pi / 180)
joint_positions = np.array([joint_positions[key] for key in joint_positions])
ee_pos, _, _ = kinematics.forward_kinematics(joint_positions * np.pi / 180)
ee_list.append(ee_pos.copy())
pos_list.append(joint_positions)
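The hunk reads joint positions in degrees and converts them to radians before forward kinematics; only the translation part of the resulting transform is kept. A toy sketch of that pattern (the two-link kinematics below is invented for illustration, not the real RobotKinematics):

# Toy sketch (invented 2-link kinematics, not the real RobotKinematics): convert
# degrees to radians, run forward kinematics, and read the end-effector position
# from the translation column [:3, 3] of the homogeneous transform.
import numpy as np

def toy_forward_kinematics(joint_rad: np.ndarray) -> np.ndarray:
    """Planar 2-link arm with unit link lengths, returned as a 4x4 transform."""
    x = np.cos(joint_rad[0]) + np.cos(joint_rad[0] + joint_rad[1])
    y = np.sin(joint_rad[0]) + np.sin(joint_rad[0] + joint_rad[1])
    transform = np.eye(4)
    transform[:3, 3] = [x, y, 0.0]
    return transform

joint_positions_deg = np.array([30.0, 45.0])
ee_pos = toy_forward_kinematics(joint_positions_deg * np.pi / 180)[:3, 3]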

View File

@@ -37,6 +37,7 @@ from lerobot.common.robots import ( # noqa: F401
from lerobot.common.teleoperators import (
gamepad, # noqa: F401
make_teleoperator_from_config,
so101_leader, # noqa: F401
)
from lerobot.common.teleoperators.gamepad.teleop_gamepad import GamepadTeleop
from lerobot.common.utils.robot_utils import busy_wait
@@ -49,13 +50,13 @@ logging.basicConfig(level=logging.INFO)
def reset_follower_position(robot_arm, target_position):
current_position_dict = robot_arm.bus.sync_read("Present_Position")
current_position = np.array(
[current_position_dict[name] for name in current_position_dict.keys()], dtype=np.float32
[current_position_dict[name] for name in current_position_dict], dtype=np.float32
)
trajectory = torch.from_numpy(
np.linspace(current_position, target_position, 50)
) # NOTE: 50 is just an arbitrary number of interpolation steps
for pose in trajectory:
action_dict = {name: pose for name, pose in zip(current_position_dict.keys(), pose, strict=False)}
action_dict = dict(zip(current_position_dict, pose, strict=False))
robot_arm.bus.sync_write("Goal_Position", action_dict)
busy_wait(0.015)
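The reset helper interpolates from the current pose to the target over 50 waypoints and streams them to the bus. A minimal sketch of the same pattern (motor names and positions are invented; the hardware writes are left as comments):

# Minimal sketch of the reset trajectory (invented motor names and positions):
# linearly interpolate from the current pose to the target and stream each waypoint.
import numpy as np

current = {"shoulder_pan": 10.0, "elbow_flex": -5.0}  # invented readings, degrees
target = np.array([0.0, 0.0], dtype=np.float32)

start = np.array([current[name] for name in current], dtype=np.float32)
trajectory = np.linspace(start, target, 50)  # 50 is an arbitrary waypoint count

for pose in trajectory:
    goal = dict(zip(current, pose, strict=False))  # {"shoulder_pan": ..., "elbow_flex": ...}
    # robot_arm.bus.sync_write("Goal_Position", goal); busy_wait(0.015)  # hardware calls elided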
@@ -200,6 +201,7 @@ class RobotEnv(gym.Env):
def __init__(
self,
robot,
use_gripper: bool = False,
display_cameras: bool = False,
):
"""
@@ -225,12 +227,14 @@ class RobotEnv(gym.Env):
self.current_step = 0
self.episode_data = None
self._joint_names = [f"{key}.pos" for key in self.robot.bus.motors.keys()]
self._joint_names = [f"{key}.pos" for key in self.robot.bus.motors]
self._image_keys = self.robot.cameras.keys()
# Read initial joint positions using the bus
self.current_joint_positions = self._get_observation()["agent_pos"]
self.use_gripper = use_gripper
self._setup_spaces()
def _get_observation(self) -> np.ndarray:
@@ -277,10 +281,15 @@ class RobotEnv(gym.Env):
self.observation_space = gym.spaces.Dict(observation_spaces)
# Define the action space for joint positions along with setting an intervention flag.
action_dim = self.robot.action_features["shape"][0]
action_dim = 3
bounds = {}
bounds["min"] = np.ones(action_dim) * -0.01
bounds["max"] = np.ones(action_dim) * 0.01
bounds["min"] = -np.ones(action_dim)
bounds["max"] = np.ones(action_dim)
if self.use_gripper:
action_dim += 1
bounds["min"] = np.concatenate([bounds["min"], [0]])
bounds["max"] = np.concatenate([bounds["max"], [2]])
self.action_space = gym.spaces.Box(
low=bounds["min"],
@@ -1174,7 +1183,14 @@ class BaseLeaderControlWrapper(gym.Wrapper):
where the human can control a leader robot to guide the follower robot's movements.
"""
def __init__(self, env, teleop_device, use_geared_leader_arm: bool = False, use_gripper=False):
def __init__(
self,
env,
teleop_device,
end_effector_step_sizes,
use_geared_leader_arm: bool = False,
use_gripper=False,
):
"""
Initialize the base leader control wrapper.
@@ -1189,6 +1205,7 @@ class BaseLeaderControlWrapper(gym.Wrapper):
self.robot_follower = env.unwrapped.robot
self.use_geared_leader_arm = use_geared_leader_arm
self.use_gripper: bool = use_gripper
self.end_effector_step_sizes = np.array(list(end_effector_step_sizes.values()))
# Set up keyboard event tracking
self._init_keyboard_events()
@@ -1197,8 +1214,6 @@ class BaseLeaderControlWrapper(gym.Wrapper):
# Initialize robot control
robot_type = getattr(env.unwrapped.robot.config, "robot_type", "so100")
self.kinematics = RobotKinematics(robot_type)
self.prev_leader_ee = None
self.prev_leader_pos = None
self.leader_torque_enabled = True
self.prev_leader_gripper = None
@@ -1213,6 +1228,7 @@ class BaseLeaderControlWrapper(gym.Wrapper):
self.robot_leader.bus.write("I_Coefficient", motor, 0)
self.robot_leader.bus.write("D_Coefficient", motor, 16)
self.leader_tracking_error_queue = deque(maxlen=4)
self._init_keyboard_listener()
def _init_keyboard_events(self):
@@ -1295,24 +1311,18 @@ class BaseLeaderControlWrapper(gym.Wrapper):
leader_pos_dict = self.robot_leader.bus.sync_read("Present_Position")
follower_pos_dict = self.robot_follower.bus.sync_read("Present_Position")
leader_pos = np.array([leader_pos_dict[name] for name in leader_pos_dict.keys()], dtype=np.float32)
follower_pos = np.array(
[follower_pos_dict[name] for name in follower_pos_dict.keys()], dtype=np.float32
)
leader_pos = np.array([leader_pos_dict[name] for name in leader_pos_dict], dtype=np.float32)
follower_pos = np.array([follower_pos_dict[name] for name in follower_pos_dict], dtype=np.float32)
self.leader_tracking_error_queue.append(np.linalg.norm(follower_pos[:-1] - leader_pos[:-1]))
# [:3, 3] Last column of the transformation matrix corresponds to the xyz translation
leader_ee = self.kinematics.fk_gripper_tip(leader_pos)[:3, 3]
follower_ee = self.kinematics.fk_gripper_tip(follower_pos)[:3, 3]
if self.prev_leader_ee is None:
self.prev_leader_ee = leader_ee
# NOTE: Using the leader's position delta for teleoperation is too noisy
# Instead, we move the follower to match the leader's absolute position,
# and record the leader's position changes as the intervention action
action = np.clip(leader_ee - follower_ee, -0.01, 0.01)
# action_intervention = leader_ee - self.prev_leader_ee
self.prev_leader_ee = leader_ee
action = np.clip(leader_ee - follower_ee, -self.end_effector_step_sizes, self.end_effector_step_sizes)
# Normalize the action to the range [-1, 1]
action = action / self.end_effector_step_sizes
if self.use_gripper:
if self.prev_leader_gripper is None:
@@ -1322,23 +1332,20 @@ class BaseLeaderControlWrapper(gym.Wrapper):
# Get gripper action delta based on leader pose
leader_gripper = leader_pos[-1]
# follower_gripper = follower_pos[-1]
gripper_delta = leader_gripper - self.prev_leader_gripper
print(leader_gripper, self.prev_leader_gripper, gripper_delta)
# Normalize by max angle and quantize to {0,1,2}
normalized_delta = gripper_delta / self.robot_follower.config.max_gripper_pos
if normalized_delta >= 0.7:
if normalized_delta >= 0.3:
gripper_action = 2
elif normalized_delta <= 0.3:
elif normalized_delta <= 0.1:
gripper_action = 0
else:
gripper_action = 1
action = np.append(action, gripper_action)
# action_intervention = np.append(action_intervention, gripper_delta)
return action # , action_intervention
return action
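A compact, standalone restatement of the new intervention action (clipping and thresholds taken from the hunk above; the function itself is illustrative): the leader/follower end-effector difference is clipped to the per-axis step sizes, normalized to [-1, 1], and the gripper delta is quantized to {0, 1, 2}.

# Illustrative restatement of the hunk above: clip the end-effector difference to
# the per-axis step sizes, normalize to [-1, 1], and quantize the gripper delta
# into {0, 1, 2} using the new 0.1 / 0.3 thresholds.
import numpy as np

def intervention_action(leader_ee, follower_ee, step_sizes, normalized_gripper_delta):
    delta = np.clip(leader_ee - follower_ee, -step_sizes, step_sizes)
    delta = delta / step_sizes  # now in [-1, 1]
    if normalized_gripper_delta >= 0.3:
        gripper = 2.0
    elif normalized_gripper_delta <= 0.1:
        gripper = 0.0
    else:
        gripper = 1.0
    return np.append(delta, gripper).astype(np.float32)

step_sizes = np.array([0.02, 0.02, 0.02])
intervention_action(
    leader_ee=np.array([0.30, 0.05, 0.12]),
    follower_ee=np.array([0.29, 0.05, 0.15]),
    step_sizes=step_sizes,
    normalized_gripper_delta=0.5,
)  # -> [0.5, 0.0, -1.0, 2.0]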
def _handle_leader_teleoperation(self):
"""
@@ -1346,20 +1353,24 @@ class BaseLeaderControlWrapper(gym.Wrapper):
This method synchronizes the leader robot position with the follower.
"""
prev_leader_pos_dict = self.robot_leader.bus.sync_read("Present_Position")
prev_leader_pos = np.array(
[prev_leader_pos_dict[name] for name in prev_leader_pos_dict], dtype=np.float32
)
if not self.leader_torque_enabled:
self.robot_leader.bus.sync_write("Torque_Enable", 1)
self.leader_torque_enabled = True
follower_pos_dict = self.robot_follower.bus.sync_read("Present_Position")
follower_pos = np.array(
[follower_pos_dict[name] for name in follower_pos_dict.keys()], dtype=np.float32
)
follower_pos = np.array([follower_pos_dict[name] for name in follower_pos_dict], dtype=np.float32)
# self.robot_leader.write("Goal_Position", follower_pos)
# self.robot_leader.send_action(follower_pos)
goal_pos = {f"{motor}": follower_pos[i] for i, motor in enumerate(self.robot_leader.bus.motors)}
self.robot_leader.bus.sync_write("Goal_Position", goal_pos)
self.leader_tracking_error_queue.append(np.linalg.norm(follower_pos[:-1] - prev_leader_pos[:-1]))
def step(self, action):
"""
Execute a step with possible human intervention.
@@ -1411,9 +1422,8 @@ class BaseLeaderControlWrapper(gym.Wrapper):
Returns:
The initial observation and info.
"""
self.prev_leader_ee = None
self.prev_leader_pos = None
self.keyboard_events = dict.fromkeys(self.keyboard_events, False)
self.leader_tracking_error_queue.clear()
return super().reset(**kwargs)
def close(self):
@@ -1492,10 +1502,10 @@ class GearedLeaderAutomaticControlWrapper(BaseLeaderControlWrapper):
self,
env,
teleop_device,
end_effector_step_sizes,
use_gripper=False,
intervention_threshold=0.005,
release_threshold=5e-6,
queue_size=3,
intervention_threshold=10.0,
release_threshold=1e-2,
):
"""
Initialize the automatic intervention wrapper.
@@ -1508,18 +1518,11 @@ class GearedLeaderAutomaticControlWrapper(BaseLeaderControlWrapper):
release_threshold: Error threshold to release intervention.
queue_size: Number of error measurements to track for smoothing.
"""
super().__init__(env, teleop_device, use_gripper=use_gripper)
super().__init__(env, teleop_device, end_effector_step_sizes, use_gripper=use_gripper)
# Error tracking parameters
self.intervention_threshold = intervention_threshold # Threshold to trigger intervention
self.release_threshold = release_threshold # Threshold to release intervention
self.queue_size = queue_size # Number of error measurements to keep
# Error tracking variables
self.ee_error_queue = deque(maxlen=self.queue_size)
self.ee_error_over_time_queue = deque(maxlen=self.queue_size)
self.previous_ee_error = 0.0
self.previous_ee_error_over_time_over_time = 0.0
self.is_intervention_active = False
self.start_time = time.perf_counter()
@@ -1534,59 +1537,32 @@ class GearedLeaderAutomaticControlWrapper(BaseLeaderControlWrapper):
Returns:
Boolean indicating whether intervention should be active.
"""
# Skip intervention logic for the first few steps to collect data
if time.perf_counter() - self.start_time < 1.0: # Wait 1 second before enabling
# Condition for starting the intervention
# If the teleoperation error is too high, it means a user has grasped the leader robot and wants to take over
if (
not self.is_intervention_active
and len(self.leader_tracking_error_queue) == self.leader_tracking_error_queue.maxlen
and np.var(list(self.leader_tracking_error_queue)[-2:]) > self.intervention_threshold
):
self.is_intervention_active = True
self.leader_tracking_error_queue.clear()
log_say("Intervention started", play_sounds=True)
return True
# Track the error over time in leader_tracking_error_queue
# If the variance of the tracking error is too low, that means the user has let go of the leader robot and the intervention is over
if (
self.is_intervention_active
and len(self.leader_tracking_error_queue) == self.leader_tracking_error_queue.maxlen
and np.var(self.leader_tracking_error_queue) < self.release_threshold
):
self.is_intervention_active = False
self.leader_tracking_error_queue.clear()
log_say("Intervention ended", play_sounds=True)
return False
# Get current positions
leader_positions_dict = self.robot_leader.bus.sync_read("Present_Position")
follower_positions_dict = self.robot_follower.bus.sync_read("Present_Position")
leader_positions = np.array(
[leader_positions_dict[name] for name in leader_positions_dict.keys()], dtype=np.float32
)
follower_positions = np.array(
[follower_positions_dict[name] for name in follower_positions_dict.keys()], dtype=np.float32
)
leader_ee = self.kinematics.fk_gripper_tip(leader_positions)[:3, 3]
follower_ee = self.kinematics.fk_gripper_tip(follower_positions)[:3, 3]
ee_error = np.linalg.norm(leader_ee - follower_ee)
ee_error_over_time = np.abs(ee_error - self.previous_ee_error)
self.ee_error_queue.append(ee_error)
self.ee_error_over_time_queue.append(ee_error_over_time)
# Calculate averages if we have enough data
if len(self.ee_error_over_time_queue) >= self.queue_size:
if self.previous_ee_error_over_time_over_time is None:
# Initialize the previous error over time
self.previous_ee_error_over_time_over_time = self.ee_error_over_time_queue[-1]
else:
# Calculate the rate of change of error over time
avg_error_over_time = np.abs(
self.ee_error_over_time_queue[-1] - self.previous_ee_error_over_time_over_time
)
self.previous_ee_error_over_time_over_time = self.ee_error_over_time_queue[-1]
self.intervention_threshold = 0.02
# Determine if intervention should start or stop based on the thresholds set in the constructor
if not self.is_intervention_active and avg_error_over_time > self.intervention_threshold:
# Transition to intervention mode
self.is_intervention_active = True
logging.info(f"Starting automatic intervention: error rate {avg_error_over_time:.4f}")
self.ee_error_over_time_queue.clear()
self.previous_ee_error_over_time_over_time = None
elif self.is_intervention_active and avg_error_over_time < self.release_threshold:
# End intervention mode
self.is_intervention_active = False
logging.info(f"Ending automatic intervention: error rate {avg_error_over_time:.4f}")
self.ee_error_over_time_queue.clear()
self.previous_ee_error_over_time_over_time = None
# If no change has happened that merits switching the intervention state, return the current state
return self.is_intervention_active
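The simplified takeover logic replaces the old rate-of-change bookkeeping with a single variance test on the leader tracking error. A condensed, self-contained sketch of that decision rule (thresholds and the deque size of 4 come from the diff; the class wrapper is illustrative):

# Illustrative condensation of the new takeover rule: a human grabbing the leader
# arm makes the leader/follower tracking error jump, so a high variance over the
# most recent errors starts an intervention; a low variance means the arm was
# released, so the intervention ends.
from collections import deque
import numpy as np

class TakeoverDetector:
    def __init__(self, intervention_threshold=10.0, release_threshold=1e-2, queue_len=4):
        self.errors = deque(maxlen=queue_len)
        self.intervention_threshold = intervention_threshold
        self.release_threshold = release_threshold
        self.active = False

    def update(self, tracking_error: float) -> bool:
        self.errors.append(tracking_error)
        if len(self.errors) == self.errors.maxlen:
            if not self.active and np.var(list(self.errors)[-2:]) > self.intervention_threshold:
                self.active = True   # a grab makes the tracking error jump
                self.errors.clear()
            elif self.active and np.var(self.errors) < self.release_threshold:
                self.active = False  # a steady error means the arm was released
                self.errors.clear()
        return self.active

detector = TakeoverDetector()
for error in [0.1, 0.1, 12.0, 0.1]:  # a sudden jump in tracking error -> takeover starts
    active = detector.update(error)
assert active is True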
def reset(self, **kwargs):
@@ -1600,11 +1576,6 @@ class GearedLeaderAutomaticControlWrapper(BaseLeaderControlWrapper):
The initial observation and info.
"""
self.is_intervention_active = False
self.start_time = time.perf_counter()
self.ee_error_queue.clear()
self.ee_error_over_time_queue.clear()
self.previous_ee_error = 0.0
self.old_avg_error_over_time = 0.0
return super().reset(**kwargs)
@@ -1881,6 +1852,7 @@ def make_robot_env(cfg: EnvConfig) -> gym.Env:
# Create base environment
env = RobotEnv(
robot=robot,
use_gripper=cfg.wrapper.use_gripper,
display_cameras=cfg.wrapper.display_cameras if cfg.wrapper else False,
)
@@ -1907,50 +1879,47 @@ def make_robot_env(cfg: EnvConfig) -> gym.Env:
if reward_classifier is not None:
env = RewardWrapper(env=env, reward_classifier=reward_classifier, device=cfg.device)
if cfg.wrapper:
env = TimeLimitWrapper(env=env, control_time_s=cfg.wrapper.control_time_s, fps=cfg.fps)
if cfg.wrapper.use_gripper:
# env = GripperActionWrapper(
# env=env, quantization_threshold=cfg.wrapper.gripper_quantization_threshold
# )
if cfg.wrapper.gripper_penalty is not None:
env = GripperPenaltyWrapper(
env=env,
penalty=cfg.wrapper.gripper_penalty,
)
# Control mode specific wrappers
control_mode = cfg.wrapper.control_mode
if control_mode == "gamepad":
assert isinstance(teleop_device, GamepadTeleop), (
"teleop_device must be an instance of GamepadTeleop for gamepad control mode"
)
env = GamepadControlWrapper(
env=env,
teleop_device=teleop_device,
use_gripper=cfg.wrapper.use_gripper,
)
elif control_mode == "leader":
env = GearedLeaderControlWrapper(
env=env,
teleop_device=teleop_device,
use_gripper=cfg.wrapper.use_gripper,
)
elif control_mode == "leader_automatic":
env = GearedLeaderAutomaticControlWrapper(
env=env,
teleop_device=teleop_device,
use_gripper=cfg.wrapper.use_gripper,
)
else:
raise ValueError(f"Invalid control mode: {control_mode}")
env = ResetWrapper(
env = TimeLimitWrapper(env=env, control_time_s=cfg.wrapper.control_time_s, fps=cfg.fps)
if cfg.wrapper.use_gripper and cfg.wrapper.gripper_penalty is not None:
env = GripperPenaltyWrapper(
env=env,
reset_pose=cfg.wrapper.fixed_reset_joint_positions,
reset_time_s=cfg.wrapper.reset_time_s,
penalty=cfg.wrapper.gripper_penalty,
)
# Control mode specific wrappers
control_mode = cfg.wrapper.control_mode
if control_mode == "gamepad":
assert isinstance(teleop_device, GamepadTeleop), (
"teleop_device must be an instance of GamepadTeleop for gamepad control mode"
)
env = GamepadControlWrapper(
env=env,
teleop_device=teleop_device,
use_gripper=cfg.wrapper.use_gripper,
)
elif control_mode == "leader":
env = GearedLeaderControlWrapper(
env=env,
teleop_device=teleop_device,
end_effector_step_sizes=cfg.robot.end_effector_step_sizes,
use_gripper=cfg.wrapper.use_gripper,
)
elif control_mode == "leader_automatic":
env = GearedLeaderAutomaticControlWrapper(
env=env,
teleop_device=teleop_device,
end_effector_step_sizes=cfg.robot.end_effector_step_sizes,
use_gripper=cfg.wrapper.use_gripper,
)
else:
raise ValueError(f"Invalid control mode: {control_mode}")
env = ResetWrapper(
env=env,
reset_pose=cfg.wrapper.fixed_reset_joint_positions,
reset_time_s=cfg.wrapper.reset_time_s,
)
env = BatchCompatibleWrapper(env=env)
env = TorchActionWrapper(env=env, device=cfg.device)
@@ -2234,7 +2203,7 @@ def main(cfg: EnvConfig):
while num_episode < 10:
start_loop_s = time.perf_counter()
# Sample a new random action from the robot's action space.
new_random_action = env.action_space.sample() * 0.0
new_random_action = env.action_space.sample()
# Update the smoothed action using an exponential moving average.
smoothed_action = alpha * new_random_action + (1 - alpha) * smoothed_action
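The test loop now keeps the sampled actions (instead of zeroing them) and only smooths them. A tiny sketch of the exponential moving average it applies (the alpha value is assumed; it is defined outside this hunk):

# Tiny sketch of the action smoothing in the test loop (alpha is assumed; only the
# update rule appears in the hunk): an exponential moving average over random actions.
import numpy as np

rng = np.random.default_rng(0)
alpha = 0.4  # assumed smoothing factor
smoothed_action = np.zeros(4)
for _ in range(5):
    new_random_action = rng.uniform(-1.0, 1.0, size=4)  # stands in for env.action_space.sample()
    smoothed_action = alpha * new_random_action + (1 - alpha) * smoothed_action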

View File

@@ -91,7 +91,6 @@ from torch import nn
from torch.multiprocessing import Queue
from torch.optim.optimizer import Optimizer
from lerobot.common.cameras import so100_follower_end_effector # noqa: F401
from lerobot.common.constants import (
CHECKPOINTS_DIR,
LAST_CHECKPOINT_LINK,