Adapted gym_manipulator to teh new convention in robot devices

changed the motor bus tranformation to degrees but considering the min and max
fixed the kinematics in the so100_follower_end_effector
This commit is contained in:
Michel Aractingi
2025-05-26 19:06:26 +02:00
committed by AdilZouitine
parent e044208534
commit df96e5b3b2
9 changed files with 133 additions and 163 deletions

View File

@@ -1,5 +1,3 @@
from lerobot.common.model.kinematics_utils import forward_kinematics, inverse_kinematics, load_model
from lerobot.common.model.kinematics_utils import RobotKinematics
from .kinematics import RobotKinematics
__all__ = ["RobotKinematics", "load_model", "forward_kinematics", "inverse_kinematics"]
__all__ = ["RobotKinematics"]

View File

@@ -799,7 +799,8 @@ class MotorsBus(abc.ABC):
resolution = self.model_resolution_table[self.motors[motor].model]
if drive_mode:
val *= -1
middle_pos = homing_offset + resolution // 2
# middle_pos = homing_offset + (resolution - 1) // 2
middle_pos = int((max_ + min_) / 2)
normalized_values[id_] = ((val - middle_pos) / (resolution // 2)) * 180
else:
# TODO(alibers): velocity and degree modes
@@ -831,7 +832,8 @@ class MotorsBus(abc.ABC):
elif self.motors[motor].norm_mode is MotorNormMode.DEGREE:
homing_offset = self.calibration[motor].homing_offset
resolution = self.model_resolution_table[self.motors[motor].model]
middle_pos = homing_offset + resolution // 2
# middle_pos = homing_offset + resolution // 2
middle_pos = int((max_ + min_) / 2)
unnormalized_values[id_] = int((val / 180) * resolution // 2) + middle_pos
if drive_mode:
unnormalized_values[id_] *= -1

View File

@@ -43,11 +43,11 @@ class SO100FollowerEndEffectorConfig(RobotConfig):
# Default bounds for the end-effector position (in meters)
end_effector_bounds: Dict[str, List[float]] = field(
default_factory=lambda: {
"min": [-1.0, -1.0, -0.0], # min x, y, z
"min": [-1.0, -1.0, -1.0], # min x, y, z
"max": [1.0, 1.0, 1.0], # max x, y, z
}
)
max_gripper_pos: float = 50
urdf_path: str = "/Users/michel_aractingi/code/lerobot/so101_new_calib.urdf"
urdf_path: str = "/Users/michel_aractingi/code/SO-ARM100/Simulation/SO101/so101_new_calib.urdf"

View File

@@ -75,6 +75,9 @@ class SO100FollowerEndEffector(SO100Follower):
self.joint_mins = None
self.joint_maxs = None
self.current_ee_pos = None
self.current_joint_pos = None
@property
def action_features(self) -> Dict[str, Any]:
"""
@@ -119,25 +122,21 @@ class SO100FollowerEndEffector(SO100Follower):
)
action = np.zeros(4, dtype=np.float32)
self.bus.sync_write("Torque_Enable", 0)
# Read current joint positions
current_joint_pos = self.bus.sync_read("Present_Position")
# Convert dict to ordered list without gripper
joint_names = ["shoulder_pan", "shoulder_lift", "elbow_flex", "wrist_flex", "wrist_roll", "gripper"]
# Convert the joint positions from min-max to degrees
current_joint_pos = np.array([current_joint_pos[name] for name in joint_names])
print(current_joint_pos)
if self.current_joint_pos is None:
# Read current joint positions
current_joint_pos = self.bus.sync_read("Present_Position")
self.current_joint_pos = np.array([current_joint_pos[name] for name in self.bus.motors.keys()])
# Calculate current end-effector position using forward kinematics
current_ee_pos = self.fk_function(current_joint_pos)
if self.current_ee_pos is None:
self.current_ee_pos = self.fk_function(self.current_joint_pos)
# Set desired end-effector position by adding delta
desired_ee_pos = np.eye(4)
desired_ee_pos[:3, :3] = current_ee_pos[:3, :3] # Keep orientation
desired_ee_pos[:3, :3] = self.current_ee_pos[:3, :3] # Keep orientation
# Add delta to position and clip to bounds
desired_ee_pos[:3, 3] = current_ee_pos[:3, 3] + action[:3]
desired_ee_pos[:3, 3] = self.current_ee_pos[:3, 3] + action[:3]
if self.end_effector_bounds is not None:
desired_ee_pos[:3, 3] = np.clip(
desired_ee_pos[:3, 3],
@@ -147,7 +146,7 @@ class SO100FollowerEndEffector(SO100Follower):
# Compute inverse kinematics to get joint positions
target_joint_values_in_degrees = self.kinematics.ik(
current_joint_pos,
self.current_joint_pos,
desired_ee_pos,
position_only=True,
fk_func=self.fk_function,
@@ -155,16 +154,21 @@ class SO100FollowerEndEffector(SO100Follower):
# Create joint space action dictionary
joint_action = {
f"{joint_names[i]}.pos": target_joint_values_in_degrees[i]
for i in range(len(joint_names) - 1) # Exclude gripper
f"{key}.pos": target_joint_values_in_degrees[i]
for i, key in enumerate(self.bus.motors.keys())
}
# Handle gripper separately if included in action
joint_action["gripper.pos"] = np.clip(
current_joint_pos[-1] + (action[-1] - 1) * self.config.max_gripper_pos,
self.current_joint_pos[-1] + (action[-1] - 1) * self.config.max_gripper_pos,
0,
self.config.max_gripper_pos,
)
self.current_ee_pos = desired_ee_pos.copy()
self.current_joint_pos = target_joint_values_in_degrees.copy()
self.current_joint_pos[-1] = joint_action["gripper.pos"]
# Send joint space action to parent class
return super().send_action(joint_action)

View File

@@ -25,8 +25,8 @@ class GamepadTeleopConfig(TeleoperatorConfig):
# TODO(Steven): Consider setting in here the keys that we want to capture/listen
mock: bool = False
x_step_size: float = 0.1
y_step_size: float = 0.1
z_step_size: float = 0.1
x_step_size: float = 0.02
y_step_size: float = 0.02
z_step_size: float = 0.02
use_gripper: bool = True

View File

@@ -16,6 +16,7 @@
import logging
import time
from typing import Any
from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
from lerobot.common.motors import Motor, MotorCalibration, MotorNormMode
@@ -27,6 +28,7 @@ from lerobot.common.motors.feetech import (
from ..teleoperator import Teleoperator
from .config_so101_leader import SO101LeaderConfig
logger = logging.getLogger(__name__)
@@ -44,11 +46,11 @@ class SO101Leader(Teleoperator):
self.bus = FeetechMotorsBus(
port=self.config.port,
motors={
"shoulder_pan": Motor(1, "sts3215", MotorNormMode.RANGE_M100_100),
"shoulder_lift": Motor(2, "sts3215", MotorNormMode.RANGE_M100_100),
"elbow_flex": Motor(3, "sts3215", MotorNormMode.RANGE_M100_100),
"wrist_flex": Motor(4, "sts3215", MotorNormMode.RANGE_M100_100),
"wrist_roll": Motor(5, "sts3215", MotorNormMode.RANGE_M100_100),
"shoulder_pan": Motor(1, "sts3215", MotorNormMode.DEGREE),
"shoulder_lift": Motor(2, "sts3215", MotorNormMode.DEGREE),
"elbow_flex": Motor(3, "sts3215", MotorNormMode.DEGREE),
"wrist_flex": Motor(4, "sts3215", MotorNormMode.DEGREE),
"wrist_roll": Motor(5, "sts3215", MotorNormMode.DEGREE),
"gripper": Motor(6, "sts3215", MotorNormMode.RANGE_0_100),
},
calibration=self.calibration,
@@ -139,4 +141,4 @@ class SO101Leader(Teleoperator):
DeviceNotConnectedError(f"{self} is not connected.")
self.bus.disconnect()
logger.info(f"{self} disconnected.")
logger.info(f"{self} disconnected.")

View File

@@ -36,7 +36,7 @@ from dataclasses import dataclass
import draccus
import numpy as np
from lerobot.common.model.kinematics import RobotKinematics
from lerobot.common.model.kinematics_utils import RobotKinematics
from lerobot.common.robots import ( # noqa: F401
RobotConfig,
koch_follower,
@@ -63,6 +63,8 @@ class FindJointLimitsConfig:
# Display all cameras on screen
display_data: bool = False
urdf_path: str = "/Users/michel_aractingi/code/SO-ARM100/Simulation/SO101/so101_new_calib.urdf"
@draccus.wrap()
def find_joint_and_ee_bounds(cfg: FindJointLimitsConfig):
@@ -75,21 +77,17 @@ def find_joint_and_ee_bounds(cfg: FindJointLimitsConfig):
start_episode_t = time.perf_counter()
ee_list = []
pos_list = []
robot_type = cfg.robot.type.split("_")[0]
kinematics = RobotKinematics(robot_type)
control_time_s = 30
kinematics = RobotKinematics(cfg.urdf_path)
control_time_s = 10
while True:
action = teleop.get_action()
robot.send_action(action)
observation = robot.get_observation()
# Wait for 5 seconds to stabilize the robot initial position
if time.perf_counter() - start_episode_t < 5:
continue
joint_positions = robot.bus.sync_read("Present_Position")
joint_positions = np.array([joint_positions[key] for key in joint_positions.keys()])
ee_list.append(kinematics.fk_gripper_tip(joint_positions)[:3, 3])
ee_pos, ee_rot, _ = kinematics.forward_kinematics(joint_positions * np.pi / 180)
ee_list.append(ee_pos.copy())
pos_list.append(joint_positions)
if time.perf_counter() - start_episode_t > control_time_s:

View File

@@ -1,4 +1,4 @@
#!/usr/bin/env python
# !/usr/bin/env python
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
@@ -37,6 +37,7 @@ from lerobot.common.robots import ( # noqa: F401
from lerobot.common.teleoperators import (
gamepad, # noqa: F401
make_teleoperator_from_config,
so101_leader,
)
from lerobot.common.teleoperators.gamepad.configuration_gamepad import GamepadTeleopConfig
from lerobot.common.teleoperators.gamepad.teleop_gamepad import GamepadTeleop
@@ -45,7 +46,6 @@ from lerobot.common.utils.utils import log_say
from lerobot.configs import parser
logging.basicConfig(level=logging.INFO)
MAX_GRIPPER_COMMAND = 30
def reset_follower_position(robot_arm, target_position):
@@ -279,10 +279,10 @@ class RobotEnv(gym.Env):
self.observation_space = gym.spaces.Dict(observation_spaces)
# Define the action space for joint positions along with setting an intervention flag.
action_dim = len(self.robot.action_features["shape"])
action_dim = self.robot.action_features["shape"][0]
bounds = {}
bounds["min"] = np.ones(action_dim) * -1000
bounds["max"] = np.ones(action_dim) * 1000
bounds["min"] = np.ones(action_dim) * -0.01
bounds["max"] = np.ones(action_dim) * 0.01
self.action_space = gym.spaces.Box(
low=bounds["min"],
@@ -334,10 +334,10 @@ class RobotEnv(gym.Env):
- truncated (bool): True if the episode was truncated (e.g., time constraints).
- info (dict): Additional debugging information including intervention status.
"""
self.current_joint_positions = self._get_observation()["observation.state"]
self.current_joint_positions = self._get_observation()["agent_pos"]
self.robot.send_action(torch.from_numpy(action))
observation = self._get_observation()["observation.state"]
action_dict = {"delta_x": action[0], "delta_y": action[1], "delta_z": action[2], "gripper": action[3]}
self.robot.send_action(action_dict)
if self.display_cameras:
self.render()
@@ -349,7 +349,7 @@ class RobotEnv(gym.Env):
truncated = False
return (
observation,
self._get_observation(),
reward,
terminated,
truncated,
@@ -810,11 +810,11 @@ class ResetWrapper(gym.Wrapper):
reset_follower_position(self.unwrapped.robot, self.reset_pose)
log_say("Reset the environment done.", play_sounds=True)
# if len(self.robot.leader_arms) > 0:
# self.robot.leader_arms["main"].write("Torque_Enable", 1)
# log_say("Reset the leader robot.", play_sounds=True)
# reset_follower_position(self.robot.leader_arms["main"], self.reset_pose)
# log_say("Reset the leader robot done.", play_sounds=True)
if hasattr(self.env, "robot_leader"):
self.env.robot_leader.bus.sync_write("Torque_Enable", 1)
log_say("Reset the leader robot.", play_sounds=True)
reset_follower_position(self.env.robot_leader, self.reset_pose)
log_say("Reset the leader robot done.", play_sounds=True)
else:
log_say(
f"Manually reset the environment for {self.reset_time_s} seconds.",
@@ -822,7 +822,8 @@ class ResetWrapper(gym.Wrapper):
)
start_time = time.perf_counter()
while time.perf_counter() - start_time < self.reset_time_s:
self.robot.teleop_step()
action = self.env.robot_leader.get_action()
self.unwrapped.robot.send_action(action)
log_say("Manual reset of the environment done.", play_sounds=True)
@@ -899,7 +900,7 @@ class GripperPenaltyWrapper(gym.RewardWrapper):
Returns:
Modified reward with penalty applied if necessary.
"""
gripper_state_normalized = self.last_gripper_state / MAX_GRIPPER_COMMAND
gripper_state_normalized = self.last_gripper_state / self.unwrapped.robot.config.max_gripper_pos
action_normalized = action - 1.0 # action / MAX_GRIPPER_COMMAND
@@ -998,11 +999,11 @@ class GripperActionWrapper(gym.ActionWrapper):
gripper_command = (
np.sign(gripper_command) if abs(gripper_command) > self.quantization_threshold else 0.0
)
gripper_command = gripper_command * MAX_GRIPPER_COMMAND
gripper_command = gripper_command * self.unwrapped.robot.config.max_gripper_pos
gripper_state = self.unwrapped.robot.bus.sync_read("Present_Position")["gripper"]
gripper_action_value = np.clip(gripper_state + gripper_command, 0, MAX_GRIPPER_COMMAND)
gripper_action_value = np.clip(gripper_state + gripper_command, 0, self.unwrapped.robot.config.max_gripper_pos)
action[-1] = gripper_action_value.item()
return action
@@ -1178,23 +1179,21 @@ class BaseLeaderControlWrapper(gym.Wrapper):
"""
def __init__(
self, env, use_geared_leader_arm: bool = False, ee_action_space_params=None, use_gripper=False
self, env, teleop_device, use_geared_leader_arm: bool = False, use_gripper=False
):
"""
Initialize the base leader control wrapper.
Args:
env: The environment to wrap.
teleop_device: The teleoperation device.
use_geared_leader_arm: Whether to use a geared leader arm setup.
ee_action_space_params: Parameters defining the end-effector action space.
use_gripper: Whether to include gripper control.
"""
super().__init__(env)
self.robot_leader = env.unwrapped.teleop_device
self.robot_leader = teleop_device
self.robot_follower = env.unwrapped.robot
self.use_geared_leader_arm = use_geared_leader_arm
self.ee_action_space_params = ee_action_space_params
self.use_ee_action_space = ee_action_space_params is not None
self.use_gripper: bool = use_gripper
# Set up keyboard event tracking
@@ -1207,16 +1206,18 @@ class BaseLeaderControlWrapper(gym.Wrapper):
self.prev_leader_ee = None
self.prev_leader_pos = None
self.leader_torque_enabled = True
self.prev_leader_gripper = None
# Configure leader arm
# NOTE: Lower the gains of leader arm for automatic take-over
# With lower gains we can manually move the leader arm without risk of injury to ourselves or the robot
# With higher gains, it would be dangerous and difficult to modify the leader's pose while torque is enabled
# Default value for P_coeff is 32
self.robot_leader.write("Torque_Enable", 1)
self.robot_leader.write("P_Coefficient", 16)
self.robot_leader.write("I_Coefficient", 0)
self.robot_leader.write("D_Coefficient", 16)
self.robot_leader.bus.sync_write("Torque_Enable", 1)
for motor in self.robot_leader.bus.motors:
self.robot_leader.bus.write("P_Coefficient", motor, 16)
self.robot_leader.bus.write("I_Coefficient", motor, 0)
self.robot_leader.bus.write("D_Coefficient", motor, 16)
self._init_keyboard_listener()
@@ -1294,7 +1295,7 @@ class BaseLeaderControlWrapper(gym.Wrapper):
Tuple of (modified_action, intervention_action).
"""
if self.leader_torque_enabled:
self.robot_leader.write("Torque_Enable", 0)
self.robot_leader.bus.sync_write("Torque_Enable", 0)
self.leader_torque_enabled = False
leader_pos_dict = self.robot_leader.bus.sync_read("Present_Position")
@@ -1315,29 +1316,33 @@ class BaseLeaderControlWrapper(gym.Wrapper):
# NOTE: Using the leader's position delta for teleoperation is too noisy
# Instead, we move the follower to match the leader's absolute position,
# and record the leader's position changes as the intervention action
action = leader_ee - follower_ee
action_intervention = leader_ee - self.prev_leader_ee
action = np.clip(leader_ee - follower_ee, -0.01, 0.01)
# action_intervention = leader_ee - self.prev_leader_ee
self.prev_leader_ee = leader_ee
if self.use_gripper:
if self.prev_leader_gripper is None:
self.prev_leader_gripper = np.clip(leader_pos[-1], 0, self.robot_follower.config.max_gripper_pos)
# Get gripper action delta based on leader pose
leader_gripper = leader_pos[-1]
follower_gripper = follower_pos[-1]
gripper_delta = leader_gripper - follower_gripper
leader_gripper = leader_pos[-1]
# follower_gripper = follower_pos[-1]
gripper_delta = leader_gripper - self.prev_leader_gripper
print(leader_gripper, self.prev_leader_gripper, gripper_delta)
# Normalize by max angle and quantize to {0,1,2}
normalized_delta = gripper_delta / MAX_GRIPPER_COMMAND
if normalized_delta > 0.3:
normalized_delta = gripper_delta / self.robot_follower.config.max_gripper_pos
if normalized_delta >= 0.7:
gripper_action = 2
elif normalized_delta < -0.3:
elif normalized_delta <= 0.3:
gripper_action = 0
else:
gripper_action = 1
action = np.append(action, gripper_action)
action_intervention = np.append(action_intervention, gripper_delta)
# action_intervention = np.append(action_intervention, gripper_delta)
return action, action_intervention
return action # , action_intervention
def _handle_leader_teleoperation(self):
"""
@@ -1346,7 +1351,7 @@ class BaseLeaderControlWrapper(gym.Wrapper):
This method synchronizes the leader robot position with the follower.
"""
if not self.leader_torque_enabled:
self.robot_leader.write("Torque_Enable", 1)
self.robot_leader.bus.sync_write("Torque_Enable", 1)
self.leader_torque_enabled = True
follower_pos_dict = self.robot_follower.bus.sync_read("Present_Position")
@@ -1354,7 +1359,10 @@ class BaseLeaderControlWrapper(gym.Wrapper):
[follower_pos_dict[name] for name in follower_pos_dict.keys()], dtype=np.float32
)
self.robot_leader.write("Goal_Position", follower_pos)
# self.robot_leader.write("Goal_Position", follower_pos)
# self.robot_leader.send_action(follower_pos)
goal_pos = {f"{motor}": follower_pos[i] for i, motor in enumerate(self.robot_leader.bus.motors)}
self.robot_leader.bus.sync_write("Goal_Position", goal_pos)
def step(self, action):
"""
@@ -1367,11 +1375,10 @@ class BaseLeaderControlWrapper(gym.Wrapper):
Tuple of (observation, reward, terminated, truncated, info).
"""
is_intervention = self._check_intervention()
action_intervention = None
# NOTE:
if is_intervention:
action, action_intervention = self._handle_intervention(action)
action = self._handle_intervention(action)
else:
self._handle_leader_teleoperation()
@@ -1380,7 +1387,9 @@ class BaseLeaderControlWrapper(gym.Wrapper):
# Add intervention info
info["is_intervention"] = is_intervention
info["action_intervention"] = action_intervention if is_intervention else None
info["action_intervention"] = action if is_intervention else None
self.prev_leader_gripper = np.clip(self.robot_leader.bus.sync_read("Present_Position")["gripper"], 0, self.robot_follower.config.max_gripper_pos)
# Check for success or manual termination
success = self.keyboard_events["episode_success"]
@@ -1482,7 +1491,7 @@ class GearedLeaderAutomaticControlWrapper(BaseLeaderControlWrapper):
def __init__(
self,
env,
ee_action_space_params=None,
teleop_device,
use_gripper=False,
intervention_threshold=0.005,
release_threshold=5e-6,
@@ -1493,13 +1502,13 @@ class GearedLeaderAutomaticControlWrapper(BaseLeaderControlWrapper):
Args:
env: The environment to wrap.
ee_action_space_params: Parameters defining the end-effector action space.
teleop_device: The teleoperation device.
use_gripper: Whether to include gripper control.
intervention_threshold: Error threshold to trigger intervention.
release_threshold: Error threshold to release intervention.
queue_size: Number of error measurements to track for smoothing.
"""
super().__init__(env, ee_action_space_params=ee_action_space_params, use_gripper=use_gripper)
super().__init__(env, teleop_device, use_gripper=use_gripper)
# Error tracking parameters
self.intervention_threshold = intervention_threshold # Threshold to trigger intervention
@@ -1560,6 +1569,8 @@ class GearedLeaderAutomaticControlWrapper(BaseLeaderControlWrapper):
self.ee_error_over_time_queue[-1] - self.previous_ee_error_over_time_over_time
)
self.previous_ee_error_over_time_over_time = self.ee_error_over_time_queue[-1]
self.intervention_threshold = 0.02
# Determine if intervention should start or stop based on the thresholds set in the constructor
if not self.is_intervention_active and avg_error_over_time > self.intervention_threshold:
@@ -1721,10 +1732,10 @@ class GamepadControlWrapper(gym.Wrapper):
logging.info(f"Episode manually ended: {'SUCCESS' if success else 'FAILURE'}")
# Only override the action if gamepad is active
action_to_step = gamepad_action if is_intervention else action
action = gamepad_action if is_intervention else action
# Step the environment
obs, reward, terminated, truncated, info = self.env.step(action_to_step)
obs, reward, terminated, truncated, info = self.env.step(action)
# Add episode ending if requested via gamepad
terminated = terminated or truncated or terminate_episode
@@ -1742,8 +1753,6 @@ class GamepadControlWrapper(gym.Wrapper):
# If not intervention, policy's action is `action`.
# For consistency, let's store the *human's* action if intervention occurred.
info["action_intervention"] = action
if is_intervention:
info["action_intervention"] = gamepad_action
info["rerecord_episode"] = rerecord_episode
@@ -1866,34 +1875,7 @@ def make_robot_env(cfg: EnvConfig) -> gym.Env:
raise ValueError("RobotConfig (cfg.robot) must be provided for gym_manipulator environment.")
robot = make_robot_from_config(cfg.robot)
# Instantiate teleoperation device
teleop_device = None
if cfg.teleop is not None:
# Ensure the config is of the correct type for GamepadTeleop if that's what make_teleoperator expects
# or if we are instantiating directly.
# For GamepadTeleop, its config is LerobotGamepadTeleopConfig.
if cfg.teleop.type == "gamepad":
if not isinstance(cfg.teleop, GamepadTeleopConfig):
# If the main teleop is just a base TeleoperatorConfig with type="gamepad",
# we might need to create the specific LerobotGamepadTeleopConfig here,
# or ensure the user provides the specific one in the main YAML config.
# For now, assume cfg.teleop IS a LerobotGamepadTeleopConfig instance.
raise ValueError(
"cfg.teleop must be an instance of LerobotGamepadTeleopConfig when type is 'gamepad'."
)
# Directly instantiate GamepadTeleop, as make_teleoperator might expect a different setup or registry.
# Or, if make_teleoperator is robust, it could handle it.
# Given we know it's GamepadTeleop from common.teleoperators for this path:
teleop_device = GamepadTeleop(config=cfg.teleop)
else:
# For other types, rely on make_teleoperator if it's generic enough
try:
teleop_device = make_teleoperator_from_config(cfg.teleop)
except Exception as e:
raise NotImplementedError(
f"Teleop device type '{cfg.teleop.type}' not supported or failed instantiation with make_teleoperator_from_config: {e}"
)
teleop_device = make_teleoperator_from_config(cfg.teleop)
teleop_device.connect()
# Create base environment
@@ -1937,21 +1919,10 @@ def make_robot_env(cfg: EnvConfig) -> gym.Env:
penalty=cfg.wrapper.gripper_penalty,
)
# env = EEActionWrapper(
# env=env,
# ee_action_space_params=cfg.wrapper.ee_action_space_params,
# use_gripper=cfg.wrapper.use_gripper,
# )
# Control mode specific wrappers
control_mode = cfg.wrapper.control_mode
if control_mode == "gamepad":
if teleop_device is None:
raise ValueError("A teleop_device must be instantiated for gamepad control mode.")
if not isinstance(teleop_device, GamepadTeleop):
raise ValueError(
f"teleop_device must be an instance of GamepadTeleop for gamepad control mode, got {type(teleop_device)}."
)
assert isinstance(teleop_device, GamepadTeleop), "teleop_device must be an instance of GamepadTeleop for gamepad control mode"
env = GamepadControlWrapper(
env=env,
teleop_device=teleop_device,
@@ -1960,20 +1931,16 @@ def make_robot_env(cfg: EnvConfig) -> gym.Env:
elif control_mode == "leader":
env = GearedLeaderControlWrapper(
env=env,
ee_action_space_params=cfg.wrapper.ee_action_space_params,
teleop_device=teleop_device,
use_gripper=cfg.wrapper.use_gripper,
)
elif control_mode == "leader_automatic":
env = GearedLeaderAutomaticControlWrapper(
env=env,
ee_action_space_params=cfg.wrapper.ee_action_space_params,
teleop_device=teleop_device,
use_gripper=cfg.wrapper.use_gripper,
)
elif control_mode not in [
"gamepad",
"leader",
"leader_automatic",
]: # Ensure there's a fallback or error for unsupported control modes
else:
raise ValueError(f"Invalid control mode: {control_mode}")
env = ResetWrapper(
@@ -2254,34 +2221,34 @@ def main(cfg: EnvConfig):
env.reset()
# # Initialize the smoothed action as a random sample.
# smoothed_action = env.action_space.sample()
# Initialize the smoothed action as a random sample.
smoothed_action = env.action_space.sample() * 0.0
# # Smoothing coefficient (alpha) defines how much of the new random sample to mix in.
# # A value close to 0 makes the trajectory very smooth (slow to change), while a value close to 1 is less smooth.
# alpha = 1.0
# Smoothing coefficient (alpha) defines how much of the new random sample to mix in.
# A value close to 0 makes the trajectory very smooth (slow to change), while a value close to 1 is less smooth.
alpha = 1.0
# num_episode = 0
# successes = []
# while num_episode < 10:
# start_loop_s = time.perf_counter()
# # Sample a new random action from the robot's action space.
# new_random_action = env.action_space.sample()
# # Update the smoothed action using an exponential moving average.
# smoothed_action = alpha * new_random_action + (1 - alpha) * smoothed_action
num_episode = 0
successes = []
while num_episode < 10:
start_loop_s = time.perf_counter()
# Sample a new random action from the robot's action space.
new_random_action = env.action_space.sample()
# Update the smoothed action using an exponential moving average.
smoothed_action = alpha * new_random_action + (1 - alpha) * smoothed_action
# # Execute the step: wrap the NumPy action in a torch tensor.
# obs, reward, terminated, truncated, info = env.step(smoothed_action)
# if terminated or truncated:
# successes.append(reward)
# env.reset()
# num_episode += 1
# Execute the step: wrap the NumPy action in a torch tensor.
obs, reward, terminated, truncated, info = env.step(smoothed_action)
if terminated or truncated:
successes.append(reward)
env.reset()
num_episode += 1
# dt_s = time.perf_counter() - start_loop_s
# busy_wait(1 / cfg.fps - dt_s)
dt_s = time.perf_counter() - start_loop_s
busy_wait(1 / cfg.fps - dt_s)
# logging.info(f"Success after 20 steps {successes}")
# logging.info(f"success rate {sum(successes) / len(successes)}")
logging.info(f"Success after 20 steps {successes}")
logging.info(f"success rate {sum(successes) / len(successes)}")
if __name__ == "__main__":

View File

@@ -91,7 +91,6 @@ def teleop_loop(
if isinstance(val, float):
rr.log(f"action_{act}", rr.Scalar(val))
breakpoint()
robot.send_action(action)
loop_s = time.perf_counter() - loop_start