From 2475645f5f2a1f3cfc8f6d48ea1dbd305baaee88 Mon Sep 17 00:00:00 2001
From: Michel Aractingi
Date: Fri, 23 May 2025 17:08:51 +0200
Subject: [PATCH] Modified kinematics code to be independent of drive mode

Modified gym_manipulator.py and find_joint_limits to adhere to the refactor of robot devices

Modified the configuration of envs to take into account the refactor
---
 lerobot/calibrate.py                          |   2 +-
 lerobot/common/envs/configs.py                |  36 +-
 lerobot/common/model/__init__.py              |   3 +-
 lerobot/common/model/kinematics.py            |  85 +++-
 lerobot/common/motors/motors_bus.py           |  21 +-
 .../config_so100_follower_end_effector.py     |  10 +-
 .../so100_follower_end_effector.py            |  27 +-
 .../gamepad/configuration_gamepad.py          |   6 +-
 .../teleoperators/gamepad/teleop_gamepad.py   |   3 +-
 lerobot/scripts/server/find_joint_limits.py   | 161 +++----
 lerobot/scripts/server/gym_manipulator.py     | 438 ++++++++++--------
 11 files changed, 437 insertions(+), 355 deletions(-)

diff --git a/lerobot/calibrate.py b/lerobot/calibrate.py
index 6780577f..2ff9bd93 100644
--- a/lerobot/calibrate.py
+++ b/lerobot/calibrate.py
@@ -40,7 +40,7 @@ from lerobot.common.robots import (  # noqa: F401
     lekiwi,
     make_robot_from_config,
     so100_follower,
-    so101_follower,
+    so100_follower_end_effector,
 )
 from lerobot.common.teleoperators import (  # noqa: F401
     Teleoperator,
diff --git a/lerobot/common/envs/configs.py b/lerobot/common/envs/configs.py
index 22d437b5..0e1be280 100644
--- a/lerobot/common/envs/configs.py
+++ b/lerobot/common/envs/configs.py
@@ -18,8 +18,9 @@ from typing import Any, Dict, Optional, Tuple
 import draccus
 
-from lerobot.common.constants import ACTION, OBS_ENV, OBS_IMAGE, OBS_IMAGES, OBS_ROBOT
-from lerobot.common.robot_devices.robots.configs import RobotConfig
+from lerobot.common.constants import ACTION, OBS_ENV_STATE, OBS_IMAGE, OBS_IMAGES, OBS_STATE
+from lerobot.common.robots import RobotConfig
+from lerobot.common.teleoperators.config import TeleoperatorConfig
 from lerobot.configs.types import FeatureType, PolicyFeature
 
@@ -97,8 +98,8 @@ class PushtEnv(EnvConfig):
     features_map: dict[str, str] = field(
         default_factory=lambda: {
             "action": ACTION,
-            "agent_pos": OBS_ROBOT,
-            "environment_state": OBS_ENV,
+            "agent_pos": OBS_STATE,
+            "environment_state": OBS_ENV_STATE,
             "pixels": OBS_IMAGE,
         }
     )
@@ -139,7 +140,7 @@ class XarmEnv(EnvConfig):
     features_map: dict[str, str] = field(
         default_factory=lambda: {
             "action": ACTION,
-            "agent_pos": OBS_ROBOT,
+            "agent_pos": OBS_STATE,
             "pixels": OBS_IMAGE,
         }
     )
@@ -168,22 +169,23 @@ class VideoRecordConfig:
     trajectory_name: str = "trajectory"
 
-@dataclass
-class EEActionSpaceConfig:
-    """Configuration parameters for end-effector action space."""
+# @dataclass
+# class EEActionSpaceConfig:
+#     """Configuration parameters for end-effector action space."""
 
-    x_step_size: float
-    y_step_size: float
-    z_step_size: float
-    bounds: Dict[str, Any]  # Contains 'min' and 'max' keys with position bounds
-    control_mode: str = "gamepad"
+#     x_step_size: float
+#     y_step_size: float
+#     z_step_size: float
+#     bounds: Dict[str, Any]  # Contains 'min' and 'max' keys with position bounds
+#     control_mode: str = "gamepad"
 
 @dataclass
 class EnvTransformConfig:
     """Configuration for environment wrappers."""
 
-    ee_action_space_params: EEActionSpaceConfig = field(default_factory=EEActionSpaceConfig)
+    # ee_action_space_params: EEActionSpaceConfig = field(default_factory=EEActionSpaceConfig)
+    control_mode: str = "gamepad"
     display_cameras: bool = False
     add_joint_velocity_to_observation: bool = False
     add_current_to_observation: bool = False
@@ -205,6 +207,7 @@ class HILSerlRobotEnvConfig(EnvConfig):
     """Configuration for the HILSerlRobotEnv environment."""
 
     robot: Optional[RobotConfig] = None
+    teleop: Optional[TeleoperatorConfig] = None
     wrapper: Optional[EnvTransformConfig] = None
     fps: int = 10
     name: str = "real_robot"
@@ -252,12 +255,13 @@ class HILEnvConfig(EnvConfig):
         default_factory=lambda: {
             "action": ACTION,
             "observation.image": OBS_IMAGE,
-            "observation.state": OBS_ROBOT,
+            "observation.state": OBS_STATE,
         }
     )
     ################# args from hilserlrobotenv
     reward_classifier_pretrained_path: Optional[str] = None
-    robot: Optional[RobotConfig] = None
+    robot_config: Optional[RobotConfig] = None
+    teleop_config: Optional[TeleoperatorConfig] = None
     wrapper: Optional[EnvTransformConfig] = None
     mode: str = None  # Either "record", "replay", None
     repo_id: Optional[str] = None
diff --git a/lerobot/common/model/__init__.py b/lerobot/common/model/__init__.py
index f4ab4b4f..543bfc37 100644
--- a/lerobot/common/model/__init__.py
+++ b/lerobot/common/model/__init__.py
@@ -1,3 +1,4 @@
 from .kinematics import RobotKinematics
+from lerobot.common.model.kinematics_utils import load_model, forward_kinematics, inverse_kinematics
 
-__all__ = ["RobotKinematics"]
+__all__ = ["RobotKinematics", "load_model", "forward_kinematics", "inverse_kinematics"]
diff --git a/lerobot/common/model/kinematics.py b/lerobot/common/model/kinematics.py
index c42d9b2f..a97b4813 100644
--- a/lerobot/common/model/kinematics.py
+++ b/lerobot/common/model/kinematics.py
@@ -125,6 +125,14 @@ class RobotKinematics:
             "shoulder": [0, 0, 0],
             "base": [0, 0, 0.02],
         },
+        "so101": {
+            "gripper": [0.33, 0.0, 0.285],
+            "wrist": [0.30, 0.0, 0.267],
+            "forearm": [0.25, 0.0, 0.266],
+            "humerus": [0.06, 0.0, 0.264],
+            "shoulder": [0.0, 0.0, 0.238],
+            "base": [0.0, 0.0, 0.12],
+        },
     }
 
     def __init__(self, robot_type="so100"):
@@ -308,10 +316,15 @@ class RobotKinematics:
     def fk_humerus(self, robot_pos_deg):
         """Forward kinematics for the humerus frame."""
         robot_pos_rad = robot_pos_deg / 180 * np.pi
+
+        theta_shoulder_pan = robot_pos_rad[0]
+        # NOTE: Negate shoulder lift angle for all robot types
+        theta_shoulder_lift = -robot_pos_rad[1]
+
         return (
             self.X_WoBo
-            @ screw_axis_to_transform(self.S_BS, robot_pos_rad[0])
-            @ screw_axis_to_transform(self.S_BH, robot_pos_rad[1])
+            @ screw_axis_to_transform(self.S_BS, theta_shoulder_pan)
+            @ screw_axis_to_transform(self.S_BH, theta_shoulder_lift)
             @ self.X_HoHc
             @ self.X_BH
         )
@@ -319,11 +332,17 @@ class RobotKinematics:
     def fk_forearm(self, robot_pos_deg):
         """Forward kinematics for the forearm frame."""
         robot_pos_rad = robot_pos_deg / 180 * np.pi
+
+        theta_shoulder_pan = robot_pos_rad[0]
+        # NOTE: Negate shoulder lift angle for all robot types
+        theta_shoulder_lift = -robot_pos_rad[1]
+        theta_elbow_flex = robot_pos_rad[2]
+
         return (
             self.X_WoBo
-            @ screw_axis_to_transform(self.S_BS, robot_pos_rad[0])
-            @ screw_axis_to_transform(self.S_BH, robot_pos_rad[1])
-            @ screw_axis_to_transform(self.S_BF, robot_pos_rad[2])
+            @ screw_axis_to_transform(self.S_BS, theta_shoulder_pan)
+            @ screw_axis_to_transform(self.S_BH, theta_shoulder_lift)
+            @ screw_axis_to_transform(self.S_BF, theta_elbow_flex)
             @ self.X_FoFc  # spellchecker:disable-line
             @ self.X_BF
         )
@@ -331,12 +350,19 @@ class RobotKinematics:
     def fk_wrist(self, robot_pos_deg):
         """Forward kinematics for the wrist frame."""
         robot_pos_rad = robot_pos_deg / 180 * np.pi
+
+        theta_shoulder_pan = robot_pos_rad[0]
+        # NOTE: Negate shoulder lift angle for all robot types
+        theta_shoulder_lift = -robot_pos_rad[1]
+        theta_elbow_flex = robot_pos_rad[2]
+        theta_wrist_flex = robot_pos_rad[3]
+
         return (
             self.X_WoBo
-            @ screw_axis_to_transform(self.S_BS, robot_pos_rad[0])
-            @ screw_axis_to_transform(self.S_BH, robot_pos_rad[1])
-            @ screw_axis_to_transform(self.S_BF, robot_pos_rad[2])
-            @ screw_axis_to_transform(self.S_BR, robot_pos_rad[3])
+            @ screw_axis_to_transform(self.S_BS, theta_shoulder_pan)
+            @ screw_axis_to_transform(self.S_BH, theta_shoulder_lift)
+            @ screw_axis_to_transform(self.S_BF, theta_elbow_flex)
+            @ screw_axis_to_transform(self.S_BR, theta_wrist_flex)
             @ self.X_RoRc
             @ self.X_BR
             @ self.wrist_X0
@@ -345,26 +371,42 @@ class RobotKinematics:
     def fk_gripper(self, robot_pos_deg):
         """Forward kinematics for the gripper frame."""
         robot_pos_rad = robot_pos_deg / 180 * np.pi
+
+        theta_shoulder_pan = robot_pos_rad[0]
+        # NOTE: Negate shoulder lift angle for all robot types
+        theta_shoulder_lift = -robot_pos_rad[1]
+        theta_elbow_flex = robot_pos_rad[2]
+        theta_wrist_flex = robot_pos_rad[3]
+        theta_wrist_roll = robot_pos_rad[4]
+
         return (
             self.X_WoBo
-            @ screw_axis_to_transform(self.S_BS, robot_pos_rad[0])
-            @ screw_axis_to_transform(self.S_BH, robot_pos_rad[1])
-            @ screw_axis_to_transform(self.S_BF, robot_pos_rad[2])
-            @ screw_axis_to_transform(self.S_BR, robot_pos_rad[3])
-            @ screw_axis_to_transform(self.S_BG, robot_pos_rad[4])
+            @ screw_axis_to_transform(self.S_BS, theta_shoulder_pan)
+            @ screw_axis_to_transform(self.S_BH, theta_shoulder_lift)
+            @ screw_axis_to_transform(self.S_BF, theta_elbow_flex)
+            @ screw_axis_to_transform(self.S_BR, theta_wrist_flex)
+            @ screw_axis_to_transform(self.S_BG, theta_wrist_roll)
             @ self._fk_gripper_post
         )
 
     def fk_gripper_tip(self, robot_pos_deg):
         """Forward kinematics for the gripper tip frame."""
         robot_pos_rad = robot_pos_deg / 180 * np.pi
+
+        theta_shoulder_pan = robot_pos_rad[0]
+        # Negate shoulder lift angle for all robot types
+        theta_shoulder_lift = -robot_pos_rad[1]
+        theta_elbow_flex = robot_pos_rad[2]
+        theta_wrist_flex = robot_pos_rad[3]
+        theta_wrist_roll = robot_pos_rad[4]
+
         return (
             self.X_WoBo
-            @ screw_axis_to_transform(self.S_BS, robot_pos_rad[0])
-            @ screw_axis_to_transform(self.S_BH, robot_pos_rad[1])
-            @ screw_axis_to_transform(self.S_BF, robot_pos_rad[2])
-            @ screw_axis_to_transform(self.S_BR, robot_pos_rad[3])
-            @ screw_axis_to_transform(self.S_BG, robot_pos_rad[4])
+            @ screw_axis_to_transform(self.S_BS, theta_shoulder_pan)
+            @ screw_axis_to_transform(self.S_BH, theta_shoulder_lift)
+            @ screw_axis_to_transform(self.S_BF, theta_elbow_flex)
+            @ screw_axis_to_transform(self.S_BR, theta_wrist_flex)
+            @ screw_axis_to_transform(self.S_BG, theta_wrist_roll)
             @ self.X_GoGt
             @ self.X_BoGo
             @ self.gripper_X0
@@ -422,7 +464,7 @@ class RobotKinematics:
             jac[:, el_ix] = Sdot
         return jac
 
-    def ik(self, current_joint_state, desired_ee_pose, position_only=True, fk_func=None):
+    def ik(self, current_joint_pos, desired_ee_pose, position_only=True, fk_func=None):
         """Inverse kinematics using gradient descent.
 
         Args:
@@ -438,6 +480,7 @@
             fk_func = self.fk_gripper
 
         # Do gradient descent.
+        current_joint_state = current_joint_pos.copy()
         max_iterations = 5
         learning_rate = 1
         for _ in range(max_iterations):
@@ -535,7 +578,7 @@ if __name__ == "__main__":
 
     # Run tests for all robot types
     results = {}
-    for robot_type in ["koch", "so100", "moss"]:
+    for robot_type in ["koch", "so100", "moss", "so101"]:
         results[robot_type] = run_test(robot_type)
 
     # Print overall summary
diff --git a/lerobot/common/motors/motors_bus.py b/lerobot/common/motors/motors_bus.py
index b1d32bbd..c1a423a2 100644
--- a/lerobot/common/motors/motors_bus.py
+++ b/lerobot/common/motors/motors_bus.py
@@ -795,12 +796,12 @@ class MotorsBus(abc.ABC):
                 norm = ((bounded_val - min_) / (max_ - min_)) * 100
                 normalized_values[id_] = 100 - norm if drive_mode else norm
             elif self.motors[motor].norm_mode is MotorNormMode.DEGREE:
+                homing_offset = self.calibration[motor].homing_offset
+                resolution = self.model_resolution_table[self.motors[motor].model]
                 if drive_mode:
                     val *= -1
-                val += homing_offset
-                normalized_values[id_] = (
-                    val / (self.model_resolution_table[self.motors[motor].model] // 2) * 180
-                )
+                middle_pos = homing_offset + resolution // 2
+                normalized_values[id_] = ((val - middle_pos) / (resolution // 2)) * 180
             else:
                 # TODO(alibers): velocity and degree modes
                 raise NotImplementedError
@@ -830,12 +831,14 @@ class MotorsBus(abc.ABC):
                 unnormalized_values[id_] = int((bounded_val / 100) * (max_ - min_) + min_)
             elif self.motors[motor].norm_mode is MotorNormMode.DEGREE:
                 homing_offset = self.calibration[motor].homing_offset
-                unnormalized_values[id_] = int(
-                    val / 180 * (self.model_resolution_table[self.motors[motor].model] // 2)
-                )
-                unnormalized_values[id_] -= homing_offset
+                resolution = self.model_resolution_table[self.motors[motor].model]
+                middle_pos = homing_offset + resolution // 2
+                unnormalized_values[id_] = int((val / 180) * (resolution // 2)) + middle_pos
                 if drive_mode:
                     unnormalized_values[id_] *= -1
             else:
                 # TODO(aliberts): degree mode
                 raise NotImplementedError
diff --git a/lerobot/common/robots/so100_follower_end_effector/config_so100_follower_end_effector.py b/lerobot/common/robots/so100_follower_end_effector/config_so100_follower_end_effector.py
index 6f0f8c54..59736e97 100644
--- a/lerobot/common/robots/so100_follower_end_effector/config_so100_follower_end_effector.py
+++ b/lerobot/common/robots/so100_follower_end_effector/config_so100_follower_end_effector.py
@@ -41,9 +41,13 @@ class SO100FollowerEndEffectorConfig(RobotConfig):
     cameras: dict[str, CameraConfig] = field(default_factory=dict)
 
     # Default bounds for the end-effector position (in meters)
-    bounds: Dict[str, List[float]] = field(
+    end_effector_bounds: Dict[str, List[float]] = field(
         default_factory=lambda: {
-            "min": [0.1, -0.3, 0.0],  # min x, y, z
-            "max": [0.4, 0.3, 0.4],  # max x, y, z
+            "min": [-1.0, -1.0, 0.0],  # min x, y, z
+            "max": [1.0, 1.0, 1.0],  # max x, y, z
         }
     )
+
+    max_gripper_pos: float = 50
+
+    urdf_path: str = "so101_new_calib.urdf"
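[Editor's note] The DEGREE-mode conversion in the motors_bus.py hunks above maps raw encoder ticks to degrees around a "middle" position derived from the homing offset. As a sanity check, here is a minimal round-trip sketch of that arithmetic; the 4096-tick resolution and zero homing offset are illustrative assumptions, not values from the patch:

```python
# Minimal sketch of the DEGREE-mode conversion introduced above.
# Assumes a 4096-tick encoder and homing_offset = 0 (illustrative values).
resolution = 4096
homing_offset = 0
middle_pos = homing_offset + resolution // 2  # 2048 ticks maps to 0 degrees

def ticks_to_degrees(val: int, drive_mode: bool = False) -> float:
    # Mirrors _normalize: negate the raw value first when drive_mode is set.
    if drive_mode:
        val = -val
    return ((val - middle_pos) / (resolution // 2)) * 180

def degrees_to_ticks(deg: float, drive_mode: bool = False) -> int:
    # Mirrors _unnormalize: negate the final tick value when drive_mode is set.
    ticks = int((deg / 180) * (resolution // 2)) + middle_pos
    return -ticks if drive_mode else ticks

assert ticks_to_degrees(3072) == 90.0        # a quarter turn above the middle
assert degrees_to_ticks(90.0) == 3072        # and back again
assert ticks_to_degrees(degrees_to_ticks(-45.0)) == -45.0
```

Note the parenthesization `(resolution // 2)`: keeping the integer half-resolution as one factor is what makes the two directions exact inverses of each other.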
diff --git a/lerobot/common/robots/so100_follower_end_effector/so100_follower_end_effector.py b/lerobot/common/robots/so100_follower_end_effector/so100_follower_end_effector.py
index 694c3542..4dad7336 100644
--- a/lerobot/common/robots/so100_follower_end_effector/so100_follower_end_effector.py
+++ b/lerobot/common/robots/so100_follower_end_effector/so100_follower_end_effector.py
@@ -59,13 +59,13 @@ class SO100FollowerEndEffector(SO100Follower):
         self.config = config
 
         # Initialize the kinematics module for the so100 robot
-        self.kinematics = RobotKinematics(robot_type="so100")
+        self.kinematics = RobotKinematics(robot_type="so101")
 
         # Set the forward kinematics function
         self.fk_function = self.kinematics.fk_gripper_tip
 
         # Store the bounds for end-effector position
-        self.bounds = self.config.bounds
+        self.end_effector_bounds = self.config.end_effector_bounds
 
         # Store the joint mins and maxs
         self.joint_mins = None
@@ -115,13 +115,16 @@ class SO100FollowerEndEffector(SO100Follower):
             )
             action = np.zeros(4, dtype=np.float32)
 
         # Read current joint positions
         current_joint_pos = self.bus.sync_read("Present_Position")
 
+        # Order the joint position dict into an array (gripper last)
         joint_names = ["shoulder_pan", "shoulder_lift", "elbow_flex", "wrist_flex", "wrist_roll", "gripper"]
 
         # Convert the joint positions from min-max to degrees
         current_joint_pos = np.array([current_joint_pos[name] for name in joint_names])
 
         # Calculate current end-effector position using forward kinematics
         current_ee_pos = self.fk_function(current_joint_pos)
@@ -132,11 +135,11 @@ class SO100FollowerEndEffector(SO100Follower):
 
         # Add delta to position and clip to bounds
         desired_ee_pos[:3, 3] = current_ee_pos[:3, 3] + action[:3]
-        if self.bounds is not None:
+        if self.end_effector_bounds is not None:
             desired_ee_pos[:3, 3] = np.clip(
                 desired_ee_pos[:3, 3],
-                self.bounds["min"],
-                self.bounds["max"],
+                self.end_effector_bounds["min"],
+                self.end_effector_bounds["max"],
             )
 
         # Compute inverse kinematics to get joint positions
@@ -154,14 +157,10 @@ class SO100FollowerEndEffector(SO100Follower):
         }
 
         # Handle gripper separately if included in action
-        if "gripper.pos" in action:
-            joint_action["gripper.pos"] = action["gripper.pos"]
-        else:
-            # Keep current gripper position
-            joint_action["gripper.pos"] = current_joint_pos[-1]
-
-        import time
-
-        time.sleep(0.001)
+        joint_action["gripper.pos"] = np.clip(
+            current_joint_pos[-1] + (action[-1] - 1) * self.config.max_gripper_pos,
+            0,
+            self.config.max_gripper_pos,
+        )
 
         # Send joint space action to parent class
         return super().send_action(joint_action)
diff --git a/lerobot/common/teleoperators/gamepad/configuration_gamepad.py b/lerobot/common/teleoperators/gamepad/configuration_gamepad.py
index 10cdf6aa..f6a73787 100644
--- a/lerobot/common/teleoperators/gamepad/configuration_gamepad.py
+++ b/lerobot/common/teleoperators/gamepad/configuration_gamepad.py
@@ -25,8 +25,8 @@ class GamepadTeleopConfig(TeleoperatorConfig):
     # TODO(Steven): Consider setting in here the keys that we want to capture/listen
     mock: bool = False
 
-    x_step_size: float = 0.05
-    y_step_size: float = 0.05
-    z_step_size: float = 0.05
+    x_step_size: float = 0.1
+    y_step_size: float = 0.1
+    z_step_size: float = 0.1
 
     use_gripper: bool = True
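[Editor's note] The gripper handling in send_action above maps a discrete command (0 = close, 1 = stay, 2 = open — the GamepadTeleop convention used later in this patch) to a bounded position delta via `(action[-1] - 1) * max_gripper_pos`. A minimal sketch of that mapping, assuming the new config default `max_gripper_pos = 50`:

```python
import numpy as np

MAX_GRIPPER_POS = 50.0  # assumed to match the new config default above

def next_gripper_pos(current_pos: float, command: float) -> float:
    """command: 0.0 = close, 1.0 = stay, 2.0 = open (GamepadTeleop convention)."""
    delta = (command - 1.0) * MAX_GRIPPER_POS
    return float(np.clip(current_pos + delta, 0.0, MAX_GRIPPER_POS))

assert next_gripper_pos(25.0, 1.0) == 25.0   # stay: zero delta
assert next_gripper_pos(25.0, 2.0) == 50.0   # open: clipped at the max
assert next_gripper_pos(25.0, 0.0) == 0.0    # close: clipped at the min
```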
diff --git a/lerobot/common/teleoperators/gamepad/teleop_gamepad.py b/lerobot/common/teleoperators/gamepad/teleop_gamepad.py
index 5d4a5caf..a8e1ed9a 100644
--- a/lerobot/common/teleoperators/gamepad/teleop_gamepad.py
+++ b/lerobot/common/teleoperators/gamepad/teleop_gamepad.py
@@ -122,7 +122,8 @@ class GamepadTeleop(Teleoperator):
             "delta_z": gamepad_action[2],
         }
 
-        gripper_action = None
+        # Default gripper action is to stay
+        gripper_action = GripperAction.STAY.value
         if self.config.use_gripper:
             gripper_command = self.gamepad.gripper_command()
             gripper_action = gripper_action_map[gripper_command]
diff --git a/lerobot/scripts/server/find_joint_limits.py b/lerobot/scripts/server/find_joint_limits.py
index dd7ed1d8..178ea7fa 100644
--- a/lerobot/scripts/server/find_joint_limits.py
+++ b/lerobot/scripts/server/find_joint_limits.py
@@ -14,122 +14,93 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import argparse
+"""
+Simple script to find the joint and end-effector limits of a robot while teleoperating it.
+
+Example:
+
+```shell
+python -m lerobot.scripts.server.find_joint_limits \
+    --robot.type=so100_follower \
+    --robot.port=/dev/tty.usbmodem58760431541 \
+    --robot.id=black \
+    --teleop.type=so100_leader \
+    --teleop.port=/dev/tty.usbmodem58760431551 \
+    --teleop.id=blue
+```
+"""
+
 import time
 
-import cv2
 import numpy as np
+import draccus
 
-from lerobot.common.robot_devices.control_utils import is_headless
-from lerobot.common.robot_devices.robots.configs import RobotConfig
-from lerobot.common.robot_devices.robots.utils import make_robot_from_config
-from lerobot.configs import parser
-from lerobot.scripts.server.kinematics import RobotKinematics
+from lerobot.common.robots import (  # noqa: F401
+    RobotConfig,
+    koch_follower,
+    make_robot_from_config,
+    so100_follower,
+    so100_follower_end_effector,
+)
+from lerobot.common.teleoperators import (
+    TeleoperatorConfig,
+    make_teleoperator_from_config,
+)
+
+from dataclasses import dataclass
+from lerobot.common.teleoperators import gamepad, koch_leader, so100_leader  # noqa: F401
+from lerobot.common.model.kinematics import RobotKinematics
 
-def find_joint_bounds(
-    robot,
-    control_time_s=30,
-    display_cameras=False,
-):
-    if not robot.is_connected:
-        robot.connect()
+@dataclass
+class FindJointLimitsConfig:
+    teleop: TeleoperatorConfig
+    robot: RobotConfig
+    # Limit the maximum frames per second. By default, no limit.
+    fps: int | None = None
+    teleop_time_s: float | None = None
+    # Display all cameras on screen
+    display_data: bool = False
 
-    start_episode_t = time.perf_counter()
-    pos_list = []
-    while True:
-        observation, action = robot.teleop_step(record_data=True)
 
+@draccus.wrap()
+def find_joint_and_ee_bounds(cfg: FindJointLimitsConfig):
+    teleop = make_teleoperator_from_config(cfg.teleop)
+    robot = make_robot_from_config(cfg.robot)
 
-        # Wait for 5 seconds to stabilize the robot initial position
-        if time.perf_counter() - start_episode_t < 5:
-            continue
-
-        pos_list.append(robot.follower_arms["main"].read("Present_Position"))
-
-        if display_cameras and not is_headless():
-            image_keys = [key for key in observation if "image" in key]
-            for key in image_keys:
-                cv2.imshow(key, cv2.cvtColor(observation[key].numpy(), cv2.COLOR_RGB2BGR))
-            cv2.waitKey(1)
-
-        if time.perf_counter() - start_episode_t > control_time_s:
-            max = np.max(np.stack(pos_list), 0)
-            min = np.min(np.stack(pos_list), 0)
-            print(f"Max angle position per joint {max}")
-            print(f"Min angle position per joint {min}")
-            break
-
-
-def find_ee_bounds(
-    robot,
-    control_time_s=30,
-    display_cameras=False,
-):
-    if not robot.is_connected:
-        robot.connect()
+    teleop.connect()
+    robot.connect()
 
     start_episode_t = time.perf_counter()
     ee_list = []
+    pos_list = []
+    robot_type = cfg.robot.type.split("_")[0]
+    kinematics = RobotKinematics(robot_type)
+    control_time_s = cfg.teleop_time_s if cfg.teleop_time_s is not None else 30
     while True:
-        observation, action = robot.teleop_step(record_data=True)
+        action = teleop.get_action()
+        robot.send_action(action)
 
         # Wait for 5 seconds to stabilize the robot initial position
         if time.perf_counter() - start_episode_t < 5:
            continue
 
-        kinematics = RobotKinematics(robot.robot_type)
-        joint_positions = robot.follower_arms["main"].read("Present_Position")
-        print(f"Joint positions: {joint_positions}")
+        joint_positions = robot.bus.sync_read("Present_Position")
+        joint_positions = np.array([joint_positions[key] for key in joint_positions.keys()])
         ee_list.append(kinematics.fk_gripper_tip(joint_positions)[:3, 3])
+        pos_list.append(joint_positions)
 
-        if display_cameras and not is_headless():
-            image_keys = [key for key in observation if "image" in key]
-            for key in image_keys:
-                cv2.imshow(key, cv2.cvtColor(observation[key].numpy(), cv2.COLOR_RGB2BGR))
-            cv2.waitKey(1)
 
         if time.perf_counter() - start_episode_t > control_time_s:
-            max = np.max(np.stack(ee_list), 0)
-            min = np.min(np.stack(ee_list), 0)
-            print(f"Max ee position {max}")
-            print(f"Min ee position {min}")
+            max_ee = np.max(np.stack(ee_list), 0)
+            min_ee = np.min(np.stack(ee_list), 0)
+            max_pos = np.max(np.stack(pos_list), 0)
+            min_pos = np.min(np.stack(pos_list), 0)
+            print(f"Max ee position {max_ee}")
+            print(f"Min ee position {min_ee}")
+            print(f"Max joint position {max_pos}")
+            print(f"Min joint position {min_pos}")
            break
 
 
 if __name__ == "__main__":
-    # Create argparse for script-specific arguments
-    parser = argparse.ArgumentParser(add_help=False)  # Set add_help=False to avoid conflict
-    parser.add_argument(
-        "--mode",
-        type=str,
-        default="joint",
-        choices=["joint", "ee"],
-        help="Mode to run the script in. Can be 'joint' or 'ee'.",
-    )
-    parser.add_argument(
-        "--control-time-s",
-        type=int,
-        default=30,
-        help="Time step to use for control.",
-    )
-    parser.add_argument(
-        "--robot-type",
-        type=str,
-        default="so100",
-        help="Robot type (so100, koch, aloha, etc.)",
-    )
-
-    # Only parse known args, leaving robot config args for Hydra if used
-    args = parser.parse_args()
-
-    # Create robot with the appropriate config
-    robot_config = RobotConfig.get_choice_class(args.robot_type)(mock=False)
-    robot = make_robot_from_config(robot_config)
-
-    if args.mode == "joint":
-        find_joint_bounds(robot, args.control_time_s)
-    elif args.mode == "ee":
-        find_ee_bounds(robot, args.control_time_s)
-
-    if robot.is_connected:
-        robot.disconnect()
+    find_joint_and_ee_bounds()
diff --git a/lerobot/scripts/server/gym_manipulator.py b/lerobot/scripts/server/gym_manipulator.py
index 7ec22e31..aafed343 100644
--- a/lerobot/scripts/server/gym_manipulator.py
+++ b/lerobot/scripts/server/gym_manipulator.py
@@ -28,19 +28,32 @@ import torchvision.transforms.functional as F  # noqa: N812
 
 from lerobot.common.envs.configs import EnvConfig
 from lerobot.common.envs.utils import preprocess_observation
-from lerobot.common.robot_devices.control_utils import (
-    busy_wait,
-    is_headless,
-    reset_follower_position,
-)
-from lerobot.common.robot_devices.robots.utils import make_robot_from_config
+from lerobot.common.utils.robot_utils import busy_wait
+
+from lerobot.common.robots import RobotConfig, make_robot_from_config, so100_follower_end_effector  # noqa: F401
+from lerobot.common.cameras import intel, opencv  # noqa: F401
 from lerobot.common.utils.utils import log_say
 from lerobot.configs import parser
-from lerobot.scripts.server.kinematics import RobotKinematics
+from lerobot.common.model.kinematics import RobotKinematics
+from lerobot.common.teleoperators.gamepad.teleop_gamepad import GamepadTeleop
+
+from lerobot.common.teleoperators.gamepad.configuration_gamepad import GamepadTeleopConfig
+from lerobot.common.teleoperators import make_teleoperator_from_config
+from lerobot.common.teleoperators import gamepad  # noqa: F401
 
 logging.basicConfig(level=logging.INFO)
 
 MAX_GRIPPER_COMMAND = 30
 
+def reset_follower_position(robot_arm, target_position):
+    current_position_dict = robot_arm.bus.sync_read("Present_Position")
+    current_position = np.array(
+        [current_position_dict[name] for name in current_position_dict.keys()], dtype=np.float32
+    )
+    trajectory = torch.from_numpy(
+        np.linspace(current_position, target_position, 50)
+    )  # NOTE: the number of interpolation steps (50) is an arbitrary choice
+    for pose in trajectory:
+        action_dict = {name: p.item() for name, p in zip(current_position_dict.keys(), pose)}
+        robot_arm.bus.sync_write("Goal_Position", action_dict)
+        busy_wait(0.015)
 
 class TorchBox(gym.spaces.Box):
     """
@@ -207,10 +220,16 @@ class RobotEnv(gym.Env):
         self.current_step = 0
         self.episode_data = None
 
-        self.current_joint_positions = self.robot.follower_arms["main"].read("Present_Position")
+        # Read initial joint positions using the bus
+        self.current_joint_positions = self._get_joint_positions()
 
         self._setup_spaces()
 
+    def _get_joint_positions(self) -> np.ndarray:
+        """Helper to convert the bus.sync_read("Present_Position") dict to an ordered numpy array."""
+        joint_positions_dict = self.robot.bus.sync_read("Present_Position")
+        return np.array([joint_positions_dict[name] for name in joint_positions_dict.keys()], dtype=np.float32)
+
     def _setup_spaces(self):
         """
         Dynamically configure the observation and action spaces based on the robot's capabilities.
@@ -223,7 +242,7 @@
         - The action space is defined as a Box space representing joint position commands. It is defined as
           relative (delta) or absolute, based on the configuration.
         """
-        example_obs = self.robot.capture_observation()
+        example_obs = self.robot.get_observation()
 
         # Define observation spaces for images and other states.
         image_keys = [key for key in example_obs if "image" in key]
@@ -241,7 +260,7 @@
         self.observation_space = gym.spaces.Dict(observation_spaces)
 
         # Define the action space for joint positions along with setting an intervention flag.
-        action_dim = len(self.robot.follower_arms["main"].read("Present_Position"))
+        action_dim = len(self.robot.action_features)
         bounds = {}
         bounds["min"] = np.ones(action_dim) * -1000
         bounds["max"] = np.ones(action_dim) * 1000
@@ -270,7 +289,7 @@
         super().reset(seed=seed, options=options)
 
         # Capture the initial observation.
-        observation = self.robot.capture_observation()
+        observation = self.robot.get_observation()
 
         # Reset episode tracking variables.
         self.current_step = 0
@@ -296,10 +315,10 @@
                 - truncated (bool): True if the episode was truncated (e.g., time constraints).
                 - info (dict): Additional debugging information including intervention status.
         """
-        self.current_joint_positions = self.robot.follower_arms["main"].read("Present_Position")
+        self.current_joint_positions = self._get_joint_positions()
 
         self.robot.send_action(torch.from_numpy(action))
-        observation = self.robot.capture_observation()
+        observation = self.robot.get_observation()
 
         if self.display_cameras:
             self.render()
@@ -324,7 +343,7 @@
         """
         import cv2
 
-        observation = self.robot.capture_observation()
+        observation = self.robot.get_observation()
         image_keys = [key for key in observation if "image" in key]
 
         for key in image_keys:
@@ -447,11 +466,11 @@ class AddCurrentToObservation(gym.ObservationWrapper):
 
         Returns:
             The modified observation with current values.
""" - present_current = ( - self.unwrapped.robot.follower_arms["main"].read("Present_Current").astype(np.float32) - ) + present_current_dict = self.unwrapped.robot.bus.sync_read("Present_Current") + present_current_observation = torch.tensor([present_current_dict[name] for name in present_current_dict.keys()], dtype=np.float32) + observation["observation.state"] = torch.cat( - [observation["observation.state"], torch.from_numpy(present_current)], dim=-1 + [observation["observation.state"], present_current_observation], dim=-1 ) return observation @@ -778,14 +797,14 @@ class ResetWrapper(gym.Wrapper): start_time = time.perf_counter() if self.reset_pose is not None: log_say("Reset the environment.", play_sounds=True) - reset_follower_position(self.robot.follower_arms["main"], self.reset_pose) + reset_follower_position(self.unwrapped.robot, self.reset_pose) log_say("Reset the environment done.", play_sounds=True) - if len(self.robot.leader_arms) > 0: - self.robot.leader_arms["main"].write("Torque_Enable", 1) - log_say("Reset the leader robot.", play_sounds=True) - reset_follower_position(self.robot.leader_arms["main"], self.reset_pose) - log_say("Reset the leader robot done.", play_sounds=True) + # if len(self.robot.leader_arms) > 0: + # self.robot.leader_arms["main"].write("Torque_Enable", 1) + # log_say("Reset the leader robot.", play_sounds=True) + # reset_follower_position(self.robot.leader_arms["main"], self.reset_pose) + # log_say("Reset the leader robot done.", play_sounds=True) else: log_say( f"Manually reset the environment for {self.reset_time_s} seconds.", @@ -890,7 +909,8 @@ class GripperPenaltyWrapper(gym.RewardWrapper): Returns: Tuple of (observation, reward, terminated, truncated, info) with penalty applied. """ - self.last_gripper_state = self.unwrapped.robot.follower_arms["main"].read("Present_Position")[-1] + self.last_gripper_state = self.unwrapped.robot.bus.sync_read("Present_Position")["gripper"] + gripper_action = action[-1] obs, reward, terminated, truncated, info = self.env.step(action) gripper_penalty = self.reward(reward, gripper_action) @@ -969,9 +989,11 @@ class GripperActionWrapper(gym.ActionWrapper): np.sign(gripper_command) if abs(gripper_command) > self.quantization_threshold else 0.0 ) gripper_command = gripper_command * MAX_GRIPPER_COMMAND - gripper_state = self.unwrapped.robot.follower_arms["main"].read("Present_Position")[-1] - gripper_action = np.clip(gripper_state + gripper_command, 0, MAX_GRIPPER_COMMAND) - action[-1] = gripper_action.item() + + gripper_state = self.unwrapped.robot.bus.sync_read("Present_Position")["gripper"] + + gripper_action_value = np.clip(gripper_state + gripper_command, 0, MAX_GRIPPER_COMMAND) + action[-1] = gripper_action_value.item() return action def reset(self, **kwargs): @@ -1056,7 +1078,8 @@ class EEActionWrapper(gym.ActionWrapper): gripper_command = action[-1] action = action[:-1] - current_joint_pos = self.unwrapped.robot.follower_arms["main"].read("Present_Position") + current_joint_pos = self.unwrapped._get_observation() + current_ee_pos = self.fk_function(current_joint_pos) desired_ee_pos[:3, 3] = np.clip( current_ee_pos[:3, 3] + action, @@ -1118,7 +1141,8 @@ class EEObservationWrapper(gym.ObservationWrapper): Returns: Enhanced observation with end-effector pose information. 
""" - current_joint_pos = self.unwrapped.robot.follower_arms["main"].read("Present_Position") + current_joint_pos = self.unwrapped._get_observation() + current_ee_pos = self.fk_function(current_joint_pos) observation["observation.state"] = torch.cat( [ @@ -1156,8 +1180,8 @@ class BaseLeaderControlWrapper(gym.Wrapper): use_gripper: Whether to include gripper control. """ super().__init__(env) - self.robot_leader = env.unwrapped.robot.leader_arms["main"] - self.robot_follower = env.unwrapped.robot.follower_arms["main"] + self.robot_leader = env.unwrapped.teleop_device + self.robot_follower = env.unwrapped.robot self.use_geared_leader_arm = use_geared_leader_arm self.ee_action_space_params = ee_action_space_params self.use_ee_action_space = ee_action_space_params is not None @@ -1229,24 +1253,14 @@ class BaseLeaderControlWrapper(gym.Wrapper): This method sets up keyboard event handling if not in headless mode. """ - if is_headless(): - logging.warning( - "Headless environment detected. On-screen cameras display and keyboard inputs will not be available." - ) - return - try: - from pynput import keyboard + from pynput import keyboard - def on_press(key): - with self.event_lock: - self._handle_key_press(key, keyboard) + def on_press(key): + with self.event_lock: + self._handle_key_press(key, keyboard) - self.listener = keyboard.Listener(on_press=on_press) - self.listener.start() - - except ImportError: - logging.warning("Could not import pynput. Keyboard interface will not be available.") - self.listener = None + self.listener = keyboard.Listener(on_press=on_press) + self.listener.start() def _check_intervention(self): """ @@ -1273,8 +1287,11 @@ class BaseLeaderControlWrapper(gym.Wrapper): self.robot_leader.write("Torque_Enable", 0) self.leader_torque_enabled = False - leader_pos = self.robot_leader.read("Present_Position") - follower_pos = self.robot_follower.read("Present_Position") + leader_pos_dict = self.robot_leader.bus.sync_read("Present_Position") + follower_pos_dict = self.robot_follower.bus.sync_read("Present_Position") + + leader_pos = np.array([leader_pos_dict[name] for name in leader_pos_dict.keys()], dtype=np.float32) + follower_pos = np.array([follower_pos_dict[name] for name in follower_pos_dict.keys()], dtype=np.float32) # [:3, 3] Last column of the transformation matrix corresponds to the xyz translation leader_ee = self.kinematics.fk_gripper_tip(leader_pos)[:3, 3] @@ -1320,7 +1337,10 @@ class BaseLeaderControlWrapper(gym.Wrapper): self.robot_leader.write("Torque_Enable", 1) self.leader_torque_enabled = True - follower_pos = self.robot_follower.read("Present_Position") + + follower_pos_dict = self.robot_follower.bus.sync_read("Present_Position") + follower_pos = np.array([follower_pos_dict[name] for name in follower_pos_dict.keys()], dtype=np.float32) + self.robot_leader.write("Goal_Position", follower_pos) def step(self, action): @@ -1385,7 +1405,6 @@ class BaseLeaderControlWrapper(gym.Wrapper): self.listener.stop() return self.env.close() - class GearedLeaderControlWrapper(BaseLeaderControlWrapper): """ Wrapper that enables manual intervention via keyboard. 
@@ -1497,8 +1516,11 @@ class GearedLeaderAutomaticControlWrapper(BaseLeaderControlWrapper):
             return False
 
         # Get current positions
-        leader_positions = self.robot_leader.read("Present_Position")
-        follower_positions = self.robot_follower.read("Present_Position")
+        leader_positions_dict = self.robot_leader.bus.sync_read("Present_Position")
+        follower_positions_dict = self.robot_follower.bus.sync_read("Present_Position")
+
+        leader_positions = np.array([leader_positions_dict[name] for name in leader_positions_dict.keys()], dtype=np.float32)
+        follower_positions = np.array([follower_positions_dict[name] for name in follower_positions_dict.keys()], dtype=np.float32)
 
         leader_ee = self.kinematics.fk_gripper_tip(leader_positions)[:3, 3]
         follower_ee = self.kinematics.fk_gripper_tip(follower_positions)[:3, 3]
@@ -1568,52 +1590,35 @@ class GamepadControlWrapper(gym.Wrapper):
     def __init__(
         self,
         env,
-        x_step_size=1.0,
-        y_step_size=1.0,
-        z_step_size=1.0,
-        use_gripper=False,
+        teleop_device,  # Accepts an instantiated teleoperator
+        use_gripper=False,  # This should align with teleop_device's config
         auto_reset=False,
-        input_threshold=0.001,
     ):
         """
         Initialize the gamepad controller wrapper.
 
         Args:
             env: The environment to wrap.
-            x_step_size: Base movement step size for X axis in meters.
-            y_step_size: Base movement step size for Y axis in meters.
-            z_step_size: Base movement step size for Z axis in meters.
-            use_gripper: Whether to include gripper control.
+            teleop_device: The instantiated teleoperation device (e.g., GamepadTeleop).
+            use_gripper: Whether to include gripper control (should match teleop_device.config.use_gripper).
             auto_reset: Whether to auto reset the environment when episode ends.
-            input_threshold: Minimum movement delta to consider as active input.
""" super().__init__(env) - # use HidApi for macos - if sys.platform == "darwin": - # NOTE: On macOS, pygame doesn’t reliably detect input from some controllers so we fall back to hidapi - from lerobot.scripts.server.end_effector_control_utils import GamepadControllerHID + self.teleop_device = teleop_device + # Ensure the teleop_device is connected if it has a connect method + if hasattr(self.teleop_device, 'connect') and not self.teleop_device.is_connected: + self.teleop_device.connect() + + # self.controller attribute is removed - self.controller = GamepadControllerHID( - x_step_size=x_step_size, - y_step_size=y_step_size, - z_step_size=z_step_size, - ) - else: - from lerobot.scripts.server.end_effector_control_utils import GamepadController - - self.controller = GamepadController( - x_step_size=x_step_size, - y_step_size=y_step_size, - z_step_size=z_step_size, - ) self.auto_reset = auto_reset - self.use_gripper = use_gripper - self.input_threshold = input_threshold - self.controller.start() + # use_gripper from args should ideally match teleop_device.config.use_gripper + # For now, we use the one passed, but it can lead to inconsistency if not set correctly from config + self.use_gripper = use_gripper - logging.info("Gamepad control wrapper initialized") - print("Gamepad controls:") + logging.info("Gamepad control wrapper initialized with provided teleop_device.") + print("Gamepad controls (managed by the provided teleop_device - specific button mappings might vary):") print(" Left analog stick: Move in X-Y plane") print(" Right analog stick: Move in Z axis (up/down)") print(" X/Square button: End episode (FAILURE)") @@ -1628,42 +1633,43 @@ class GamepadControlWrapper(gym.Wrapper): Returns: Tuple containing: - - is_active: Whether gamepad input is active - - action: The action derived from gamepad input + - is_active: Whether gamepad input is active (from teleop_device.gamepad.should_intervene()) + - action: The action derived from gamepad input (from teleop_device.get_action()) - terminate_episode: Whether episode termination was requested - success: Whether episode success was signaled - rerecord_episode: Whether episode rerecording was requested """ - # Update the controller to get fresh inputs - self.controller.update() + if not hasattr(self.teleop_device, 'gamepad') or self.teleop_device.gamepad is None: + raise AttributeError("teleop_device does not have a 'gamepad' attribute or it is None. 
 
-        # Get movement deltas from the controller
-        delta_x, delta_y, delta_z = self.controller.get_deltas()
-
-        intervention_is_active = self.controller.should_intervene()
-
-        # Create action from gamepad input
-        gamepad_action = np.array([delta_x, delta_y, delta_z], dtype=np.float32)
-
-        if self.use_gripper:
-            gripper_command = self.controller.gripper_command()
-            if gripper_command == "open":
-                gamepad_action = np.concatenate([gamepad_action, [2.0]])
-            elif gripper_command == "close":
-                gamepad_action = np.concatenate([gamepad_action, [0.0]])
-            else:
-                gamepad_action = np.concatenate([gamepad_action, [1.0]])
-
-        # Check episode ending buttons
-        # We'll rely on controller.get_episode_end_status() which returns "success", "failure", or None
-        episode_end_status = self.controller.get_episode_end_status()
+        # Get status flags from the underlying gamepad controller within the teleop_device
+        self.teleop_device.gamepad.update()  # Ensure gamepad state is fresh
+        intervention_is_active = self.teleop_device.gamepad.should_intervene()
+        episode_end_status = self.teleop_device.gamepad.get_episode_end_status()
+
         terminate_episode = episode_end_status is not None
         success = episode_end_status == "success"
         rerecord_episode = episode_end_status == "rerecord_episode"
 
+        # Get the action dictionary from the teleop_device
+        action_dict = self.teleop_device.get_action()
+
+        # Convert action_dict to a numpy array, ordered as delta_x, delta_y, delta_z, gripper (if use_gripper).
+        action_list = [action_dict['delta_x'], action_dict['delta_y'], action_dict['delta_z']]
+        if self.use_gripper:
+            # GamepadTeleop encodes the gripper action as 0 (close), 1 (stay) or 2 (open);
+            # pass the value through directly; downstream wrappers use the same convention.
+            gripper_val = action_dict.get('gripper', 1.0)  # Default to 1.0 (stay) if not present
+            action_list.append(float(gripper_val))
+
+        gamepad_action_np = np.array(action_list, dtype=np.float32)
+
         return (
             intervention_is_active,
-            gamepad_action,
+            gamepad_action_np,
             terminate_episode,
             success,
             rerecord_episode,
@@ -1693,10 +1699,10 @@
             logging.info(f"Episode manually ended: {'SUCCESS' if success else 'FAILURE'}")
 
         # Only override the action if gamepad is active
-        action = gamepad_action if is_intervention else action
+        action_to_step = gamepad_action if is_intervention else action
 
         # Step the environment
-        obs, reward, terminated, truncated, info = self.env.step(action)
+        obs, reward, terminated, truncated, info = self.env.step(action_to_step)
 
         # Add episode ending if requested via gamepad
         terminated = terminated or truncated or terminate_episode
@@ -1709,7 +1715,14 @@
             action = torch.from_numpy(action)
 
         info["is_intervention"] = is_intervention
+        # As in BaseLeaderControlWrapper, store the human's action in info when an
+        # intervention occurred; otherwise keep the policy's action.
info["action_intervention"] = action + if is_intervention: + info["action_intervention"] = gamepad_action + info["rerecord_episode"] = rerecord_episode # If episode ended, reset the state @@ -1731,9 +1744,8 @@ class GamepadControlWrapper(gym.Wrapper): Returns: Result of closing the wrapped environment. """ - # Stop the controller - if hasattr(self, "controller"): - self.controller.stop() + if hasattr(self.teleop_device, 'disconnect'): + self.teleop_device.disconnect() # Call the parent close method return self.env.close() @@ -1792,9 +1804,9 @@ class GymHilObservationProcessorWrapper(gym.ObservationWrapper): ########################################################### -def make_robot_env(cfg) -> gym.vector.VectorEnv: +def make_robot_env(cfg: EnvConfig) -> gym.Env: """ - Factory function to create a vectorized robot environment. + Factory function to create a robot environment. This function builds a robot environment with all necessary wrappers based on the provided configuration. @@ -1803,8 +1815,7 @@ def make_robot_env(cfg) -> gym.vector.VectorEnv: cfg: Configuration object containing environment parameters. Returns: - - A vectorized gym environment with all necessary wrappers applied. + A gym environment with all necessary wrappers applied. """ if cfg.type == "hil": import gym_hil # noqa: F401 @@ -1824,24 +1835,60 @@ def make_robot_env(cfg) -> gym.vector.VectorEnv: env = TorchActionWrapper(env=env, device=cfg.device) return env + if not hasattr(cfg, 'robot') or not hasattr(cfg, 'teleop'): + raise ValueError("Configuration for 'gym_manipulator' must be HILSerlRobotEnvConfig with robot and teleop.") + + if cfg.robot is None: + raise ValueError("RobotConfig (cfg.robot) must be provided for gym_manipulator environment.") robot = make_robot_from_config(cfg.robot) + + # Instantiate teleoperation device + teleop_device = None + if cfg.teleop is not None: + # Ensure the config is of the correct type for GamepadTeleop if that's what make_teleoperator expects + # or if we are instantiating directly. + # For GamepadTeleop, its config is LerobotGamepadTeleopConfig. + if cfg.teleop.type == "gamepad": + if not isinstance(cfg.teleop, GamepadTeleopConfig): + # If the main teleop is just a base TeleoperatorConfig with type="gamepad", + # we might need to create the specific LerobotGamepadTeleopConfig here, + # or ensure the user provides the specific one in the main YAML config. + # For now, assume cfg.teleop IS a LerobotGamepadTeleopConfig instance. + raise ValueError("cfg.teleop must be an instance of LerobotGamepadTeleopConfig when type is 'gamepad'.") + # Directly instantiate GamepadTeleop, as make_teleoperator might expect a different setup or registry. + # Or, if make_teleoperator is robust, it could handle it. 
+            teleop_device = GamepadTeleop(config=cfg.teleop)
+        else:
+            # For other types, rely on make_teleoperator_from_config if it is generic enough
+            try:
+                teleop_device = make_teleoperator_from_config(cfg.teleop)
+            except Exception as e:
+                raise NotImplementedError(
+                    f"Teleop device type '{cfg.teleop.type}' not supported or failed instantiation "
+                    f"with make_teleoperator_from_config: {e}"
+                ) from e
+
+    if teleop_device is not None:
+        teleop_device.connect()
 
     # Create base environment
     env = RobotEnv(
         robot=robot,
-        display_cameras=cfg.wrapper.display_cameras,
+        display_cameras=cfg.wrapper.display_cameras if cfg.wrapper else False,
     )
 
     # Add observation and image processing
-    if cfg.wrapper.add_joint_velocity_to_observation:
-        env = AddJointVelocityToObservation(env=env, fps=cfg.fps)
-    if cfg.wrapper.add_current_to_observation:
-        env = AddCurrentToObservation(env=env)
-    if cfg.wrapper.add_ee_pose_to_observation:
-        env = EEObservationWrapper(env=env, ee_pose_limits=cfg.wrapper.ee_action_space_params.bounds)
+    if cfg.wrapper:
+        if cfg.wrapper.add_joint_velocity_to_observation:
+            env = AddJointVelocityToObservation(env=env, fps=cfg.fps)
+        if cfg.wrapper.add_current_to_observation:
+            env = AddCurrentToObservation(env=env)
+        if cfg.wrapper.add_ee_pose_to_observation:
+            ee_params = getattr(cfg.wrapper, "ee_action_space_params", None)
+            if ee_params is None or ee_params.bounds is None:
+                raise ValueError("EEActionSpaceConfig with bounds must be provided for EEObservationWrapper.")
+            env = EEObservationWrapper(env=env, ee_pose_limits=ee_params.bounds)
 
     env = ConvertToLeRobotObservation(env=env, device=cfg.device)
 
-    if cfg.wrapper.crop_params_dict is not None:
+    if cfg.wrapper and cfg.wrapper.crop_params_dict is not None:
         env = ImageCropResizeWrapper(
             env=env,
             crop_params_dict=cfg.wrapper.crop_params_dict,
@@ -1852,49 +1899,56 @@
     reward_classifier = init_reward_classifier(cfg)
     if reward_classifier is not None:
         env = RewardWrapper(env=env, reward_classifier=reward_classifier, device=cfg.device)
-    env = TimeLimitWrapper(env=env, control_time_s=cfg.wrapper.control_time_s, fps=cfg.fps)
 
-    if cfg.wrapper.use_gripper:
-        env = GripperActionWrapper(env=env, quantization_threshold=cfg.wrapper.gripper_quantization_threshold)
-        if cfg.wrapper.gripper_penalty is not None:
-            env = GripperPenaltyWrapper(
+    if cfg.wrapper:
+        env = TimeLimitWrapper(env=env, control_time_s=cfg.wrapper.control_time_s, fps=cfg.fps)
+        if cfg.wrapper.use_gripper:
+            env = GripperActionWrapper(env=env, quantization_threshold=cfg.wrapper.gripper_quantization_threshold)
+            if cfg.wrapper.gripper_penalty is not None:
+                env = GripperPenaltyWrapper(
+                    env=env,
+                    penalty=cfg.wrapper.gripper_penalty,
+                )
+
+        # env = EEActionWrapper(
+        #     env=env,
+        #     ee_action_space_params=cfg.wrapper.ee_action_space_params,
+        #     use_gripper=cfg.wrapper.use_gripper,
+        # )
+
+        # Control mode specific wrappers
+        control_mode = cfg.wrapper.control_mode
+        if control_mode == "gamepad":
+            if teleop_device is None:
+                raise ValueError("A teleop_device must be instantiated for gamepad control mode.")
+            if not isinstance(teleop_device, GamepadTeleop):
+                raise ValueError(f"teleop_device must be an instance of GamepadTeleop for gamepad control mode, got {type(teleop_device)}.")
+            env = GamepadControlWrapper(
                 env=env,
-                penalty=cfg.wrapper.gripper_penalty,
+                teleop_device=teleop_device,
+                use_gripper=cfg.wrapper.use_gripper,
             )
-
-    env = EEActionWrapper(
-        env=env,
-        ee_action_space_params=cfg.wrapper.ee_action_space_params,
-        use_gripper=cfg.wrapper.use_gripper,
-    )
-
-    if cfg.wrapper.ee_action_space_params.control_mode == "gamepad":
-        env = GamepadControlWrapper(
-            env=env,
-            x_step_size=cfg.wrapper.ee_action_space_params.x_step_size,
-            y_step_size=cfg.wrapper.ee_action_space_params.y_step_size,
-            z_step_size=cfg.wrapper.ee_action_space_params.z_step_size,
-            use_gripper=cfg.wrapper.use_gripper,
-        )
-    elif cfg.wrapper.ee_action_space_params.control_mode == "leader":
-        env = GearedLeaderControlWrapper(
-            env=env,
-            ee_action_space_params=cfg.wrapper.ee_action_space_params,
-            use_gripper=cfg.wrapper.use_gripper,
-        )
-    elif cfg.wrapper.ee_action_space_params.control_mode == "leader_automatic":
-        env = GearedLeaderAutomaticControlWrapper(
-            env=env,
-            ee_action_space_params=cfg.wrapper.ee_action_space_params,
-            use_gripper=cfg.wrapper.use_gripper,
-        )
-    else:
-        raise ValueError(f"Invalid control mode: {cfg.wrapper.ee_action_space_params.control_mode}")
-    env = ResetWrapper(
-        env=env,
-        reset_pose=cfg.wrapper.fixed_reset_joint_positions,
-        reset_time_s=cfg.wrapper.reset_time_s,
-    )
+        elif control_mode == "leader":
+            env = GearedLeaderControlWrapper(
+                env=env,
+                ee_action_space_params=getattr(cfg.wrapper, "ee_action_space_params", None),
+                use_gripper=cfg.wrapper.use_gripper,
+            )
+        elif control_mode == "leader_automatic":
+            env = GearedLeaderAutomaticControlWrapper(
+                env=env,
+                ee_action_space_params=getattr(cfg.wrapper, "ee_action_space_params", None),
+                use_gripper=cfg.wrapper.use_gripper,
+            )
+        else:
+            raise ValueError(f"Invalid control mode: {control_mode}")
+
+        env = ResetWrapper(
+            env=env,
+            reset_pose=cfg.wrapper.fixed_reset_joint_positions,
+            reset_time_s=cfg.wrapper.reset_time_s,
+        )
 
     env = BatchCompatibleWrapper(env=env)
     env = TorchActionWrapper(env=env, device=cfg.device)
@@ -2131,8 +2185,7 @@ def replay_episode(env, cfg):
 
 @parser.wrap()
 def main(cfg: EnvConfig):
-    """
-    Main entry point for the robot environment script.
+    """Main entry point for the robot environment script.
 
     This function runs the robot environment in one of several modes
     based on the provided configuration.
@@ -2168,35 +2221,38 @@ def main(cfg: EnvConfig):
 
     env.reset()
 
-    # Initialize the smoothed action as a random sample.
-    smoothed_action = env.action_space.sample()
+    # # Initialize the smoothed action as a random sample.
+    # smoothed_action = env.action_space.sample()
 
-    # Smoothing coefficient (alpha) defines how much of the new random sample to mix in.
-    # A value close to 0 makes the trajectory very smooth (slow to change), while a value close to 1 is less smooth.
-    alpha = 1.0
+    # # Smoothing coefficient (alpha) defines how much of the new random sample to mix in.
+    # # A value close to 0 makes the trajectory very smooth (slow to change), while a value close to 1 is less smooth.
+    # alpha = 1.0
 
-    num_episode = 0
-    successes = []
-    while num_episode < 10:
-        start_loop_s = time.perf_counter()
-        # Sample a new random action from the robot's action space.
-        new_random_action = env.action_space.sample()
-        # Update the smoothed action using an exponential moving average.
-        smoothed_action = alpha * new_random_action + (1 - alpha) * smoothed_action
+    # num_episode = 0
+    # successes = []
+    # while num_episode < 10:
+    #     start_loop_s = time.perf_counter()
+    #     # Sample a new random action from the robot's action space.
+    #     new_random_action = env.action_space.sample()
+    #     # Update the smoothed action using an exponential moving average.
+    #     smoothed_action = alpha * new_random_action + (1 - alpha) * smoothed_action
 
-        # Execute the step: wrap the NumPy action in a torch tensor.
-        obs, reward, terminated, truncated, info = env.step(smoothed_action)
-        if terminated or truncated:
-            successes.append(reward)
-            env.reset()
-            num_episode += 1
+    #     # Execute the step: wrap the NumPy action in a torch tensor.
+    #     obs, reward, terminated, truncated, info = env.step(smoothed_action)
+    #     if terminated or truncated:
+    #         successes.append(reward)
+    #         env.reset()
+    #         num_episode += 1
 
-        dt_s = time.perf_counter() - start_loop_s
-        busy_wait(1 / cfg.fps - dt_s)
+    #     dt_s = time.perf_counter() - start_loop_s
+    #     busy_wait(1 / cfg.fps - dt_s)
 
-    logging.info(f"Success after 20 steps {successes}")
-    logging.info(f"success rate {sum(successes) / len(successes)}")
+    # logging.info(f"Success after 20 steps {successes}")
+    # logging.info(f"success rate {sum(successes) / len(successes)}")
 
 
 if __name__ == "__main__":
     main()
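[Editor's note] For reviewers trying out the refactored entry point, here is a minimal, untested sketch of how the new robot/teleop split is meant to be wired through make_robot_env. Class and field names are taken from the hunks above where visible; the exact import paths, the `port`/`id` values, and the `device` field are assumptions and should be adjusted to the actual tree and hardware:

```python
# Sketch only: wiring the refactored config objects into make_robot_env.
# Import paths and port/id values are assumptions, not confirmed by this patch.
from lerobot.common.envs.configs import EnvTransformConfig, HILSerlRobotEnvConfig
from lerobot.common.robots.so100_follower_end_effector.config_so100_follower_end_effector import (
    SO100FollowerEndEffectorConfig,
)
from lerobot.common.teleoperators.gamepad.configuration_gamepad import GamepadTeleopConfig
from lerobot.scripts.server.gym_manipulator import make_robot_env

cfg = HILSerlRobotEnvConfig(
    robot=SO100FollowerEndEffectorConfig(port="/dev/tty.usbmodem58760431541", id="black"),  # placeholders
    teleop=GamepadTeleopConfig(use_gripper=True),
    wrapper=EnvTransformConfig(control_mode="gamepad"),
    fps=10,
)

env = make_robot_env(cfg)
obs, info = env.reset()
```

The key constraint encoded in make_robot_env above: when `wrapper.control_mode` is "gamepad", `cfg.teleop` must be a GamepadTeleopConfig so that a GamepadTeleop instance can be handed to GamepadControlWrapper.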