Modified kinematics code to be independent of drive mode

Modified gym_manipulator.py and find_joint_limits to follow the robot devices refactor
Modified the env configurations to account for the refactor
This commit is contained in:
Michel Aractingi
2025-05-23 17:08:51 +02:00
committed by AdilZouitine
parent ba477e81ce
commit 2475645f5f
11 changed files with 437 additions and 355 deletions

View File

@@ -40,7 +40,7 @@ from lerobot.common.robots import ( # noqa: F401
lekiwi,
make_robot_from_config,
so100_follower,
so101_follower,
so100_follower_end_effector,
)
from lerobot.common.teleoperators import ( # noqa: F401
Teleoperator,

View File

@@ -18,8 +18,9 @@ from typing import Any, Dict, Optional, Tuple
import draccus
from lerobot.common.constants import ACTION, OBS_ENV, OBS_IMAGE, OBS_IMAGES, OBS_ROBOT
from lerobot.common.robot_devices.robots.configs import RobotConfig
from lerobot.common.constants import ACTION, OBS_ENV_STATE, OBS_IMAGE, OBS_IMAGES, OBS_STATE
from lerobot.common.robots import RobotConfig
from lerobot.common.teleoperators.config import TeleoperatorConfig
from lerobot.configs.types import FeatureType, PolicyFeature
@@ -97,8 +98,8 @@ class PushtEnv(EnvConfig):
features_map: dict[str, str] = field(
default_factory=lambda: {
"action": ACTION,
"agent_pos": OBS_ROBOT,
"environment_state": OBS_ENV,
"agent_pos": OBS_STATE,
"environment_state": OBS_ENV_STATE,
"pixels": OBS_IMAGE,
}
)
@@ -139,7 +140,7 @@ class XarmEnv(EnvConfig):
features_map: dict[str, str] = field(
default_factory=lambda: {
"action": ACTION,
"agent_pos": OBS_ROBOT,
"agent_pos": OBS_STATE,
"pixels": OBS_IMAGE,
}
)
@@ -168,22 +169,23 @@ class VideoRecordConfig:
trajectory_name: str = "trajectory"
@dataclass
class EEActionSpaceConfig:
"""Configuration parameters for end-effector action space."""
# @dataclass
# class EEActionSpaceConfig:
# """Configuration parameters for end-effector action space."""
x_step_size: float
y_step_size: float
z_step_size: float
bounds: Dict[str, Any] # Contains 'min' and 'max' keys with position bounds
control_mode: str = "gamepad"
# x_step_size: float
# y_step_size: float
# z_step_size: float
# bounds: Dict[str, Any] # Contains 'min' and 'max' keys with position bounds
# control_mode: str = "gamepad"
@dataclass
class EnvTransformConfig:
"""Configuration for environment wrappers."""
ee_action_space_params: EEActionSpaceConfig = field(default_factory=EEActionSpaceConfig)
# ee_action_space_params: EEActionSpaceConfig = field(default_factory=EEActionSpaceConfig)
control_mode: str = "gamepad"
display_cameras: bool = False
add_joint_velocity_to_observation: bool = False
add_current_to_observation: bool = False
@@ -205,6 +207,7 @@ class HILSerlRobotEnvConfig(EnvConfig):
"""Configuration for the HILSerlRobotEnv environment."""
robot: Optional[RobotConfig] = None
teleop: Optional[TeleoperatorConfig] = None
wrapper: Optional[EnvTransformConfig] = None
fps: int = 10
name: str = "real_robot"
@@ -252,12 +255,13 @@ class HILEnvConfig(EnvConfig):
default_factory=lambda: {
"action": ACTION,
"observation.image": OBS_IMAGE,
"observation.state": OBS_ROBOT,
"observation.state": OBS_STATE,
}
)
################# args from hilserlrobotenv
reward_classifier_pretrained_path: Optional[str] = None
robot: Optional[RobotConfig] = None
robot_config: Optional[RobotConfig] = None
teleop_config: Optional[TeleoperatorConfig] = None
wrapper: Optional[EnvTransformConfig] = None
mode: str = None # Either "record", "replay", None
repo_id: Optional[str] = None

View File

@@ -1,3 +1,4 @@
from .kinematics import RobotKinematics
from lerobot.common.model.kinematics_utils import load_model, forward_kinematics, inverse_kinematics
__all__ = ["RobotKinematics"]
__all__ = ["RobotKinematics", "load_model", "forward_kinematics", "inverse_kinematics"]

View File

@@ -125,6 +125,14 @@ class RobotKinematics:
"shoulder": [0, 0, 0],
"base": [0, 0, 0.02],
},
"so101": {
"gripper": [0.33, 0.0, 0.285],
"wrist": [0.30, 0.0, 0.267],
"forearm": [0.25, 0.0, 0.266],
"humerus": [0.06, 0.0, 0.264],
"shoulder": [0.0, 0.0, 0.238],
"base": [0.0, 0.0, 0.12],
},
}
def __init__(self, robot_type="so100"):
@@ -308,10 +316,15 @@ class RobotKinematics:
def fk_humerus(self, robot_pos_deg):
"""Forward kinematics for the humerus frame."""
robot_pos_rad = robot_pos_deg / 180 * np.pi
theta_shoulder_pan = robot_pos_rad[0]
# NOTE: Negate shoulder lift angle for all robot types
theta_shoulder_lift = -robot_pos_rad[1]
return (
self.X_WoBo
@ screw_axis_to_transform(self.S_BS, robot_pos_rad[0])
@ screw_axis_to_transform(self.S_BH, robot_pos_rad[1])
@ screw_axis_to_transform(self.S_BS, theta_shoulder_pan)
@ screw_axis_to_transform(self.S_BH, theta_shoulder_lift)
@ self.X_HoHc
@ self.X_BH
)
@@ -319,11 +332,17 @@ class RobotKinematics:
def fk_forearm(self, robot_pos_deg):
"""Forward kinematics for the forearm frame."""
robot_pos_rad = robot_pos_deg / 180 * np.pi
theta_shoulder_pan = robot_pos_rad[0]
# NOTE: Negate shoulder lift angle for all robot types
theta_shoulder_lift = -robot_pos_rad[1]
theta_elbow_flex = robot_pos_rad[2]
return (
self.X_WoBo
@ screw_axis_to_transform(self.S_BS, robot_pos_rad[0])
@ screw_axis_to_transform(self.S_BH, robot_pos_rad[1])
@ screw_axis_to_transform(self.S_BF, robot_pos_rad[2])
@ screw_axis_to_transform(self.S_BS, theta_shoulder_pan)
@ screw_axis_to_transform(self.S_BH, theta_shoulder_lift)
@ screw_axis_to_transform(self.S_BF, theta_elbow_flex)
@ self.X_FoFc # spellchecker:disable-line
@ self.X_BF
)
@@ -331,12 +350,19 @@ class RobotKinematics:
def fk_wrist(self, robot_pos_deg):
"""Forward kinematics for the wrist frame."""
robot_pos_rad = robot_pos_deg / 180 * np.pi
theta_shoulder_pan = robot_pos_rad[0]
# NOTE: Negate shoulder lift angle for all robot types
theta_shoulder_lift = -robot_pos_rad[1]
theta_elbow_flex = robot_pos_rad[2]
theta_wrist_flex = robot_pos_rad[3]
return (
self.X_WoBo
@ screw_axis_to_transform(self.S_BS, robot_pos_rad[0])
@ screw_axis_to_transform(self.S_BH, robot_pos_rad[1])
@ screw_axis_to_transform(self.S_BF, robot_pos_rad[2])
@ screw_axis_to_transform(self.S_BR, robot_pos_rad[3])
@ screw_axis_to_transform(self.S_BS, theta_shoulder_pan)
@ screw_axis_to_transform(self.S_BH, theta_shoulder_lift)
@ screw_axis_to_transform(self.S_BF, theta_elbow_flex)
@ screw_axis_to_transform(self.S_BR, theta_wrist_flex)
@ self.X_RoRc
@ self.X_BR
@ self.wrist_X0
@@ -345,26 +371,42 @@ class RobotKinematics:
def fk_gripper(self, robot_pos_deg):
"""Forward kinematics for the gripper frame."""
robot_pos_rad = robot_pos_deg / 180 * np.pi
theta_shoulder_pan = robot_pos_rad[0]
# NOTE: Negate shoulder lift angle for all robot types
theta_shoulder_lift = -robot_pos_rad[1]
theta_elbow_flex = robot_pos_rad[2]
theta_wrist_flex = robot_pos_rad[3]
theta_wrist_roll = robot_pos_rad[4]
return (
self.X_WoBo
@ screw_axis_to_transform(self.S_BS, robot_pos_rad[0])
@ screw_axis_to_transform(self.S_BH, robot_pos_rad[1])
@ screw_axis_to_transform(self.S_BF, robot_pos_rad[2])
@ screw_axis_to_transform(self.S_BR, robot_pos_rad[3])
@ screw_axis_to_transform(self.S_BG, robot_pos_rad[4])
@ screw_axis_to_transform(self.S_BS, theta_shoulder_pan)
@ screw_axis_to_transform(self.S_BH, theta_shoulder_lift)
@ screw_axis_to_transform(self.S_BF, theta_elbow_flex)
@ screw_axis_to_transform(self.S_BR, theta_wrist_flex)
@ screw_axis_to_transform(self.S_BG, theta_wrist_roll)
@ self._fk_gripper_post
)
def fk_gripper_tip(self, robot_pos_deg):
"""Forward kinematics for the gripper tip frame."""
robot_pos_rad = robot_pos_deg / 180 * np.pi
theta_shoulder_pan = robot_pos_rad[0]
# Negate shoulder lift angle for all robot types
theta_shoulder_lift = -robot_pos_rad[1]
theta_elbow_flex = robot_pos_rad[2]
theta_wrist_flex = robot_pos_rad[3]
theta_wrist_roll = robot_pos_rad[4]
return (
self.X_WoBo
@ screw_axis_to_transform(self.S_BS, robot_pos_rad[0])
@ screw_axis_to_transform(self.S_BH, robot_pos_rad[1])
@ screw_axis_to_transform(self.S_BF, robot_pos_rad[2])
@ screw_axis_to_transform(self.S_BR, robot_pos_rad[3])
@ screw_axis_to_transform(self.S_BG, robot_pos_rad[4])
@ screw_axis_to_transform(self.S_BS, theta_shoulder_pan)
@ screw_axis_to_transform(self.S_BH, theta_shoulder_lift)
@ screw_axis_to_transform(self.S_BF, theta_elbow_flex)
@ screw_axis_to_transform(self.S_BR, theta_wrist_flex)
@ screw_axis_to_transform(self.S_BG, theta_wrist_roll)
@ self.X_GoGt
@ self.X_BoGo
@ self.gripper_X0
@@ -422,7 +464,7 @@ class RobotKinematics:
jac[:, el_ix] = Sdot
return jac
def ik(self, current_joint_state, desired_ee_pose, position_only=True, fk_func=None):
def ik(self, current_joint_pos, desired_ee_pose, position_only=True, fk_func=None):
"""Inverse kinematics using gradient descent.
Args:
@@ -438,6 +480,7 @@ class RobotKinematics:
fk_func = self.fk_gripper
# Do gradient descent.
current_joint_state = current_joint_pos.copy()
max_iterations = 5
learning_rate = 1
for _ in range(max_iterations):
@@ -535,7 +578,7 @@ if __name__ == "__main__":
# Run tests for all robot types
results = {}
for robot_type in ["koch", "so100", "moss"]:
for robot_type in ["koch", "so100", "moss", "so101"]:
results[robot_type] = run_test(robot_type)
# Print overall summary
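For reference, a minimal sketch of how the updated kinematics are meant to be called, given the new so101 measurements and the negated shoulder-lift convention above. This is illustrative only: the 6-element joint vector in degrees (gripper last) mirrors what send_action reads from the bus later in this diff, and the 1 cm target offset is an arbitrary placeholder.

```python
import numpy as np

from lerobot.common.model.kinematics import RobotKinematics

kin = RobotKinematics(robot_type="so101")
joints_deg = np.zeros(6, dtype=np.float32)      # current joint positions in degrees, gripper last
ee_pose = kin.fk_gripper_tip(joints_deg)        # 4x4 homogeneous transform of the gripper tip
target = ee_pose.copy()
target[:3, 3] += np.array([0.01, 0.0, 0.0])     # shift the target 1 cm along x (placeholder)
joints_sol = kin.ik(joints_deg, target, position_only=True, fk_func=kin.fk_gripper_tip)
```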

View File

@@ -35,6 +35,8 @@ from tqdm import tqdm
from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
from lerobot.common.utils.utils import enter_pressed, move_cursor_up
import math
NameOrID: TypeAlias = str | int
Value: TypeAlias = int | float
@@ -42,7 +44,6 @@ MAX_ID_RANGE = 252
logger = logging.getLogger(__name__)
def get_ctrl_table(model_ctrl_table: dict[str, dict], model: str) -> dict[str, tuple[int, int]]:
ctrl_table = model_ctrl_table.get(model)
if ctrl_table is None:
@@ -795,12 +796,12 @@ class MotorsBus(abc.ABC):
norm = ((bounded_val - min_) / (max_ - min_)) * 100
normalized_values[id_] = 100 - norm if drive_mode else norm
elif self.motors[motor].norm_mode is MotorNormMode.DEGREE:
homing_offset = self.calibration[motor].homing_offset
resolution = self.model_resolution_table[self.motors[motor].model]
if drive_mode:
val *= -1
val += homing_offset
normalized_values[id_] = (
val / (self.model_resolution_table[self.motors[motor].model] // 2) * 180
)
middle_pos = homing_offset + resolution // 2
normalized_values[id_] = ((val - middle_pos) / (resolution // 2)) * 180
else:
# TODO(aliberts): velocity and degree modes
raise NotImplementedError
@@ -830,12 +831,14 @@ class MotorsBus(abc.ABC):
unnormalized_values[id_] = int((bounded_val / 100) * (max_ - min_) + min_)
elif self.motors[motor].norm_mode is MotorNormMode.DEGREE:
homing_offset = self.calibration[motor].homing_offset
unnormalized_values[id_] = int(
val / 180 * (self.model_resolution_table[self.motors[motor].model] // 2)
)
unnormalized_values[id_] -= homing_offset
resolution = self.model_resolution_table[self.motors[motor].model]
middle_pos = homing_offset + resolution // 2
unnormalized_values[id_] = int((val / 180) * (resolution // 2)) + middle_pos
if drive_mode:
unnormalized_values[id_] *= -1
if unnormalized_values[id_] < 0:
breakpoint()
else:
# TODO(aliberts): degree mode
raise NotImplementedError
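A quick numeric sanity check of the reworked DEGREE mapping above. The encoder resolution and homing offset below are hypothetical placeholder values, not ones taken from this commit.

```python
resolution = 4096                               # hypothetical encoder resolution
homing_offset = 0                               # hypothetical calibration offset
middle_pos = homing_offset + resolution // 2    # 2048

raw = 3072                                                     # raw encoder reading
deg = ((raw - middle_pos) / (resolution // 2)) * 180           # -> 90.0 degrees
raw_back = int((deg / 180) * (resolution // 2)) + middle_pos   # -> 3072
assert raw_back == raw
```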

View File

@@ -41,9 +41,13 @@ class SO100FollowerEndEffectorConfig(RobotConfig):
cameras: dict[str, CameraConfig] = field(default_factory=dict)
# Default bounds for the end-effector position (in meters)
bounds: Dict[str, List[float]] = field(
end_effector_bounds: Dict[str, List[float]] = field(
default_factory=lambda: {
"min": [0.1, -0.3, 0.0], # min x, y, z
"max": [0.4, 0.3, 0.4], # max x, y, z
"min": [-1.0, -1.0, -0.0], # min x, y, z
"max": [1.0, 1.0, 1.0], # max x, y, z
}
)
max_gripper_pos: float = 50
urdf_path: str = "/Users/michel_aractingi/code/lerobot/so101_new_calib.urdf"

View File

@@ -59,13 +59,13 @@ class SO100FollowerEndEffector(SO100Follower):
self.config = config
# Initialize the kinematics module for the so100 robot
self.kinematics = RobotKinematics(robot_type="so100")
self.kinematics = RobotKinematics(robot_type="so101")
# Set the forward kinematics function
self.fk_function = self.kinematics.fk_gripper_tip
# Store the bounds for end-effector position
self.bounds = self.config.bounds
self.end_effector_bounds = self.config.end_effector_bounds
# Store the joint mins and maxs
self.joint_mins = None
@@ -115,13 +115,16 @@ class SO100FollowerEndEffector(SO100Follower):
)
action = np.zeros(4, dtype=np.float32)
self.bus.sync_write("Torque_Enable", 0)
# Read current joint positions
current_joint_pos = self.bus.sync_read("Present_Position")
# Convert dict to ordered list without gripper
joint_names = ["shoulder_pan", "shoulder_lift", "elbow_flex", "wrist_flex", "wrist_roll", "gripper"]
# Convert the joint positions from min-max to degrees
current_joint_pos = np.array([current_joint_pos[name] for name in joint_names])
print(current_joint_pos)
# Calculate current end-effector position using forward kinematics
current_ee_pos = self.fk_function(current_joint_pos)
@@ -132,11 +135,11 @@ class SO100FollowerEndEffector(SO100Follower):
# Add delta to position and clip to bounds
desired_ee_pos[:3, 3] = current_ee_pos[:3, 3] + action[:3]
if self.bounds is not None:
if self.end_effector_bounds is not None:
desired_ee_pos[:3, 3] = np.clip(
desired_ee_pos[:3, 3],
self.bounds["min"],
self.bounds["max"],
self.end_effector_bounds["min"],
self.end_effector_bounds["max"],
)
# Compute inverse kinematics to get joint positions
@@ -154,14 +157,10 @@ class SO100FollowerEndEffector(SO100Follower):
}
# Handle gripper separately if included in action
if "gripper.pos" in action:
joint_action["gripper.pos"] = action["gripper.pos"]
else:
# Keep current gripper position
joint_action["gripper.pos"] = current_joint_pos[-1]
import time
time.sleep(0.001)
joint_action["gripper.pos"] = np.clip(
current_joint_pos[-1] + (action[-1] - 1) * self.config.max_gripper_pos,
0,
self.config.max_gripper_pos,
)
# Send joint space action to parent class
return super().send_action(joint_action)
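To make the new gripper handling concrete, a small worked example of the delta-style mapping above, assuming the 0 = close, 1 = stay, 2 = open gripper convention used by the gamepad teleoperator later in this diff and the default max_gripper_pos = 50 from the config.

```python
import numpy as np

max_gripper_pos = 50.0   # default from SO100FollowerEndEffectorConfig
current_gripper = 20.0   # placeholder current gripper.pos reading

for cmd, label in [(0.0, "close"), (1.0, "stay"), (2.0, "open")]:
    target = np.clip(current_gripper + (cmd - 1.0) * max_gripper_pos, 0, max_gripper_pos)
    print(f"{label}: {target}")   # close: 0.0, stay: 20.0, open: 50.0
```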

View File

@@ -25,8 +25,8 @@ class GamepadTeleopConfig(TeleoperatorConfig):
# TODO(Steven): Consider setting in here the keys that we want to capture/listen
mock: bool = False
x_step_size: float = 0.05
y_step_size: float = 0.05
z_step_size: float = 0.05
x_step_size: float = 0.1
y_step_size: float = 0.1
z_step_size: float = 0.1
use_gripper: bool = True

View File

@@ -122,7 +122,8 @@ class GamepadTeleop(Teleoperator):
"delta_z": gamepad_action[2],
}
gripper_action = None
# Default gripper action is to stay
gripper_action = GripperAction.STAY.value
if self.config.use_gripper:
gripper_command = self.gamepad.gripper_command()
gripper_action = gripper_action_map[gripper_command]
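For downstream consumers such as the GamepadControlWrapper later in this diff, the action returned here is a dictionary roughly of the shape sketched below; the numbers are placeholders and the 0/1/2 gripper encoding is the assumed GripperAction convention.

```python
# Illustrative shape of GamepadTeleop.get_action() output (values are placeholders).
action = {
    "delta_x": 0.1,    # end-effector delta commands
    "delta_y": 0.0,
    "delta_z": -0.1,
    "gripper": 1.0,    # 0 = close, 1 = stay, 2 = open (assumed GripperAction values)
}
```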

View File

@@ -14,122 +14,93 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
"""
Simple script to find the joint and end-effector limits of a robot while teleoperating it.
Example:
```shell
python -m lerobot.scripts.server.find_joint_limits \
--robot.type=so100_follower \
--robot.port=/dev/tty.usbmodem58760431541 \
--robot.id=black \
--teleop.type=so100_leader \
--teleop.port=/dev/tty.usbmodem58760431551 \
--teleop.id=blue
```
"""
import time
import cv2
import numpy as np
import draccus
from lerobot.common.robot_devices.control_utils import is_headless
from lerobot.common.robot_devices.robots.configs import RobotConfig
from lerobot.common.robot_devices.robots.utils import make_robot_from_config
from lerobot.configs import parser
from lerobot.scripts.server.kinematics import RobotKinematics
from lerobot.common.robots import ( # noqa: F401
RobotConfig,
koch_follower,
make_robot_from_config,
so100_follower,
so100_follower_end_effector,
)
from lerobot.common.teleoperators import (
TeleoperatorConfig,
make_teleoperator_from_config,
)
from dataclasses import dataclass
from lerobot.common.teleoperators import gamepad, koch_leader, so100_leader # noqa: F401
from lerobot.common.model.kinematics import RobotKinematics
def find_joint_bounds(
robot,
control_time_s=30,
display_cameras=False,
):
if not robot.is_connected:
robot.connect()
@dataclass
class FindJointLimitsConfig:
teleop: TeleoperatorConfig
robot: RobotConfig
# Limit the maximum frames per second. By default, no limit.
fps: int | None = None
teleop_time_s: float | None = None
# Display all cameras on screen
display_data: bool = False
start_episode_t = time.perf_counter()
pos_list = []
while True:
observation, action = robot.teleop_step(record_data=True)
@draccus.wrap()
def find_joint_and_ee_bounds(cfg: FindJointLimitsConfig):
teleop = make_teleoperator_from_config(cfg.teleop)
robot = make_robot_from_config(cfg.robot)
# Wait for 5 seconds to stabilize the robot initial position
if time.perf_counter() - start_episode_t < 5:
continue
pos_list.append(robot.follower_arms["main"].read("Present_Position"))
if display_cameras and not is_headless():
image_keys = [key for key in observation if "image" in key]
for key in image_keys:
cv2.imshow(key, cv2.cvtColor(observation[key].numpy(), cv2.COLOR_RGB2BGR))
cv2.waitKey(1)
if time.perf_counter() - start_episode_t > control_time_s:
max = np.max(np.stack(pos_list), 0)
min = np.min(np.stack(pos_list), 0)
print(f"Max angle position per joint {max}")
print(f"Min angle position per joint {min}")
break
def find_ee_bounds(
robot,
control_time_s=30,
display_cameras=False,
):
if not robot.is_connected:
robot.connect()
teleop.connect()
robot.connect()
start_episode_t = time.perf_counter()
ee_list = []
pos_list = []
robot_type = cfg.robot.type.split("_")[0]
kinematics = RobotKinematics(robot_type)
control_time_s = 30
while True:
observation, action = robot.teleop_step(record_data=True)
action = teleop.get_action()
robot.send_action(action)
observation = robot.get_observation()
# Wait for 5 seconds to stabilize the robot initial position
if time.perf_counter() - start_episode_t < 5:
continue
kinematics = RobotKinematics(robot.robot_type)
joint_positions = robot.follower_arms["main"].read("Present_Position")
print(f"Joint positions: {joint_positions}")
joint_positions = robot.bus.sync_read("Present_Position")
joint_positions = np.array([joint_positions[key] for key in joint_positions.keys()])
ee_list.append(kinematics.fk_gripper_tip(joint_positions)[:3, 3])
if display_cameras and not is_headless():
image_keys = [key for key in observation if "image" in key]
for key in image_keys:
cv2.imshow(key, cv2.cvtColor(observation[key].numpy(), cv2.COLOR_RGB2BGR))
cv2.waitKey(1)
pos_list.append(joint_positions)
if time.perf_counter() - start_episode_t > control_time_s:
max = np.max(np.stack(ee_list), 0)
min = np.min(np.stack(ee_list), 0)
print(f"Max ee position {max}")
print(f"Min ee position {min}")
max_ee = np.max(np.stack(ee_list), 0)
min_ee = np.min(np.stack(ee_list), 0)
max_pos = np.max(np.stack(pos_list), 0)
min_pos = np.min(np.stack(pos_list), 0)
print(f"Max ee position {max_ee}")
print(f"Min ee position {min_ee}")
print(f"Max joint pos position {max_pos}")
print(f"Min joint pos position {min_pos}")
break
if __name__ == "__main__":
# Create argparse for script-specific arguments
parser = argparse.ArgumentParser(add_help=False) # Set add_help=False to avoid conflict
parser.add_argument(
"--mode",
type=str,
default="joint",
choices=["joint", "ee"],
help="Mode to run the script in. Can be 'joint' or 'ee'.",
)
parser.add_argument(
"--control-time-s",
type=int,
default=30,
help="Time step to use for control.",
)
parser.add_argument(
"--robot-type",
type=str,
default="so100",
help="Robot type (so100, koch, aloha, etc.)",
)
# Only parse known args, leaving robot config args for Hydra if used
args = parser.parse_args()
# Create robot with the appropriate config
robot_config = RobotConfig.get_choice_class(args.robot_type)(mock=False)
robot = make_robot_from_config(robot_config)
if args.mode == "joint":
find_joint_bounds(robot, args.control_time_s)
elif args.mode == "ee":
find_ee_bounds(robot, args.control_time_s)
if robot.is_connected:
robot.disconnect()
find_joint_and_ee_bounds()
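The printed end-effector extremes are presumably meant to be copied back into the robot config, e.g. into the end_effector_bounds field introduced earlier in this diff. The numbers below are placeholders, not output from this commit.

```python
# Placeholder values transcribed from the script's printout.
end_effector_bounds = {
    "min": [0.12, -0.25, 0.02],   # min x, y, z (meters)
    "max": [0.38, 0.25, 0.35],    # max x, y, z (meters)
}
```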

View File

@@ -28,19 +28,32 @@ import torchvision.transforms.functional as F # noqa: N812
from lerobot.common.envs.configs import EnvConfig
from lerobot.common.envs.utils import preprocess_observation
from lerobot.common.robot_devices.control_utils import (
busy_wait,
is_headless,
reset_follower_position,
)
from lerobot.common.robot_devices.robots.utils import make_robot_from_config
from lerobot.common.utils.robot_utils import busy_wait
from lerobot.common.robots import RobotConfig, make_robot_from_config, so100_follower_end_effector # noqa: F401
from lerobot.common.cameras import intel, opencv # noqa: F401
from lerobot.common.utils.utils import log_say
from lerobot.configs import parser
from lerobot.scripts.server.kinematics import RobotKinematics
from lerobot.common.model.kinematics import RobotKinematics
from lerobot.common.teleoperators.gamepad.teleop_gamepad import GamepadTeleop
from lerobot.common.teleoperators.gamepad.configuration_gamepad import GamepadTeleopConfig
from lerobot.common.teleoperators import make_teleoperator_from_config
from lerobot.common.teleoperators import gamepad #noqa: F401
logging.basicConfig(level=logging.INFO)
MAX_GRIPPER_COMMAND = 30
def reset_follower_position(robot_arm, target_position):
current_position_dict = robot_arm.bus.sync_read("Present_Position")
current_position = np.array([current_position_dict[name] for name in current_position_dict.keys()], dtype=np.float32)
trajectory = torch.from_numpy(
np.linspace(current_position, target_position, 50)
)  # NOTE: 50 steps is an arbitrary trajectory length
for pose in trajectory:
action_dict = {name: pose for name, pose in zip(current_position_dict.keys(), pose)}
robot_arm.bus.sync_write("Goal_Position", action_dict)
busy_wait(0.015)
class TorchBox(gym.spaces.Box):
"""
@@ -207,10 +220,16 @@ class RobotEnv(gym.Env):
self.current_step = 0
self.episode_data = None
self.current_joint_positions = self.robot.follower_arms["main"].read("Present_Position")
# Read initial joint positions using the bus
self.current_joint_positions = self._get_observation()
self._setup_spaces()
def _get_observation(self) -> np.ndarray:
"""Helper to convert a dictionary from bus.sync_read to an ordered numpy array."""
joint_positions_dict = self.robot.bus.sync_read("Present_Position")
return np.array([joint_positions_dict[name] for name in joint_positions_dict.keys()], dtype=np.float32)
def _setup_spaces(self):
"""
Dynamically configure the observation and action spaces based on the robot's capabilities.
@@ -223,7 +242,7 @@ class RobotEnv(gym.Env):
- The action space is defined as a Box space representing joint position commands. It is defined as relative (delta)
or absolute, based on the configuration.
"""
example_obs = self.robot.capture_observation()
example_obs = self._get_observation()
# Define observation spaces for images and other states.
image_keys = [key for key in example_obs if "image" in key]
@@ -241,7 +260,7 @@ class RobotEnv(gym.Env):
self.observation_space = gym.spaces.Dict(observation_spaces)
# Define the action space for joint positions along with setting an intervention flag.
action_dim = len(self.robot.follower_arms["main"].read("Present_Position"))
action_dim = len(self.robot.action_features["shape"])
bounds = {}
bounds["min"] = np.ones(action_dim) * -1000
bounds["max"] = np.ones(action_dim) * 1000
@@ -270,7 +289,7 @@ class RobotEnv(gym.Env):
super().reset(seed=seed, options=options)
# Capture the initial observation.
observation = self.robot.capture_observation()
observation = self._get_observation()
# Reset episode tracking variables.
self.current_step = 0
@@ -296,10 +315,10 @@ class RobotEnv(gym.Env):
- truncated (bool): True if the episode was truncated (e.g., time constraints).
- info (dict): Additional debugging information including intervention status.
"""
self.current_joint_positions = self.robot.follower_arms["main"].read("Present_Position")
self.current_joint_positions = self._get_observation()
self.robot.send_action(torch.from_numpy(action))
observation = self.robot.capture_observation()
observation = self._get_observation()
if self.display_cameras:
self.render()
@@ -324,7 +343,7 @@ class RobotEnv(gym.Env):
"""
import cv2
observation = self.robot.capture_observation()
observation = self._get_observation()
image_keys = [key for key in observation if "image" in key]
for key in image_keys:
@@ -447,11 +466,11 @@ class AddCurrentToObservation(gym.ObservationWrapper):
Returns:
The modified observation with current values.
"""
present_current = (
self.unwrapped.robot.follower_arms["main"].read("Present_Current").astype(np.float32)
)
present_current_dict = self.unwrapped.robot.bus.sync_read("Present_Current")
present_current_observation = torch.tensor([present_current_dict[name] for name in present_current_dict.keys()], dtype=torch.float32)
observation["observation.state"] = torch.cat(
[observation["observation.state"], torch.from_numpy(present_current)], dim=-1
[observation["observation.state"], present_current_observation], dim=-1
)
return observation
@@ -778,14 +797,14 @@ class ResetWrapper(gym.Wrapper):
start_time = time.perf_counter()
if self.reset_pose is not None:
log_say("Reset the environment.", play_sounds=True)
reset_follower_position(self.robot.follower_arms["main"], self.reset_pose)
reset_follower_position(self.unwrapped.robot, self.reset_pose)
log_say("Reset the environment done.", play_sounds=True)
if len(self.robot.leader_arms) > 0:
self.robot.leader_arms["main"].write("Torque_Enable", 1)
log_say("Reset the leader robot.", play_sounds=True)
reset_follower_position(self.robot.leader_arms["main"], self.reset_pose)
log_say("Reset the leader robot done.", play_sounds=True)
# if len(self.robot.leader_arms) > 0:
# self.robot.leader_arms["main"].write("Torque_Enable", 1)
# log_say("Reset the leader robot.", play_sounds=True)
# reset_follower_position(self.robot.leader_arms["main"], self.reset_pose)
# log_say("Reset the leader robot done.", play_sounds=True)
else:
log_say(
f"Manually reset the environment for {self.reset_time_s} seconds.",
@@ -890,7 +909,8 @@ class GripperPenaltyWrapper(gym.RewardWrapper):
Returns:
Tuple of (observation, reward, terminated, truncated, info) with penalty applied.
"""
self.last_gripper_state = self.unwrapped.robot.follower_arms["main"].read("Present_Position")[-1]
self.last_gripper_state = self.unwrapped.robot.bus.sync_read("Present_Position")["gripper"]
gripper_action = action[-1]
obs, reward, terminated, truncated, info = self.env.step(action)
gripper_penalty = self.reward(reward, gripper_action)
@@ -969,9 +989,11 @@ class GripperActionWrapper(gym.ActionWrapper):
np.sign(gripper_command) if abs(gripper_command) > self.quantization_threshold else 0.0
)
gripper_command = gripper_command * MAX_GRIPPER_COMMAND
gripper_state = self.unwrapped.robot.follower_arms["main"].read("Present_Position")[-1]
gripper_action = np.clip(gripper_state + gripper_command, 0, MAX_GRIPPER_COMMAND)
action[-1] = gripper_action.item()
gripper_state = self.unwrapped.robot.bus.sync_read("Present_Position")["gripper"]
gripper_action_value = np.clip(gripper_state + gripper_command, 0, MAX_GRIPPER_COMMAND)
action[-1] = gripper_action_value.item()
return action
def reset(self, **kwargs):
@@ -1056,7 +1078,8 @@ class EEActionWrapper(gym.ActionWrapper):
gripper_command = action[-1]
action = action[:-1]
current_joint_pos = self.unwrapped.robot.follower_arms["main"].read("Present_Position")
current_joint_pos = self.unwrapped._get_observation()
current_ee_pos = self.fk_function(current_joint_pos)
desired_ee_pos[:3, 3] = np.clip(
current_ee_pos[:3, 3] + action,
@@ -1118,7 +1141,8 @@ class EEObservationWrapper(gym.ObservationWrapper):
Returns:
Enhanced observation with end-effector pose information.
"""
current_joint_pos = self.unwrapped.robot.follower_arms["main"].read("Present_Position")
current_joint_pos = self.unwrapped._get_observation()
current_ee_pos = self.fk_function(current_joint_pos)
observation["observation.state"] = torch.cat(
[
@@ -1156,8 +1180,8 @@ class BaseLeaderControlWrapper(gym.Wrapper):
use_gripper: Whether to include gripper control.
"""
super().__init__(env)
self.robot_leader = env.unwrapped.robot.leader_arms["main"]
self.robot_follower = env.unwrapped.robot.follower_arms["main"]
self.robot_leader = env.unwrapped.teleop_device
self.robot_follower = env.unwrapped.robot
self.use_geared_leader_arm = use_geared_leader_arm
self.ee_action_space_params = ee_action_space_params
self.use_ee_action_space = ee_action_space_params is not None
@@ -1229,24 +1253,14 @@ class BaseLeaderControlWrapper(gym.Wrapper):
This method sets up keyboard event handling if not in headless mode.
"""
if is_headless():
logging.warning(
"Headless environment detected. On-screen cameras display and keyboard inputs will not be available."
)
return
try:
from pynput import keyboard
from pynput import keyboard
def on_press(key):
with self.event_lock:
self._handle_key_press(key, keyboard)
def on_press(key):
with self.event_lock:
self._handle_key_press(key, keyboard)
self.listener = keyboard.Listener(on_press=on_press)
self.listener.start()
except ImportError:
logging.warning("Could not import pynput. Keyboard interface will not be available.")
self.listener = None
self.listener = keyboard.Listener(on_press=on_press)
self.listener.start()
def _check_intervention(self):
"""
@@ -1273,8 +1287,11 @@ class BaseLeaderControlWrapper(gym.Wrapper):
self.robot_leader.write("Torque_Enable", 0)
self.leader_torque_enabled = False
leader_pos = self.robot_leader.read("Present_Position")
follower_pos = self.robot_follower.read("Present_Position")
leader_pos_dict = self.robot_leader.bus.sync_read("Present_Position")
follower_pos_dict = self.robot_follower.bus.sync_read("Present_Position")
leader_pos = np.array([leader_pos_dict[name] for name in leader_pos_dict.keys()], dtype=np.float32)
follower_pos = np.array([follower_pos_dict[name] for name in follower_pos_dict.keys()], dtype=np.float32)
# [:3, 3] Last column of the transformation matrix corresponds to the xyz translation
leader_ee = self.kinematics.fk_gripper_tip(leader_pos)[:3, 3]
@@ -1320,7 +1337,10 @@ class BaseLeaderControlWrapper(gym.Wrapper):
self.robot_leader.write("Torque_Enable", 1)
self.leader_torque_enabled = True
follower_pos = self.robot_follower.read("Present_Position")
follower_pos_dict = self.robot_follower.bus.sync_read("Present_Position")
follower_pos = np.array([follower_pos_dict[name] for name in follower_pos_dict.keys()], dtype=np.float32)
self.robot_leader.write("Goal_Position", follower_pos)
def step(self, action):
@@ -1385,7 +1405,6 @@ class BaseLeaderControlWrapper(gym.Wrapper):
self.listener.stop()
return self.env.close()
class GearedLeaderControlWrapper(BaseLeaderControlWrapper):
"""
Wrapper that enables manual intervention via keyboard.
@@ -1497,8 +1516,11 @@ class GearedLeaderAutomaticControlWrapper(BaseLeaderControlWrapper):
return False
# Get current positions
leader_positions = self.robot_leader.read("Present_Position")
follower_positions = self.robot_follower.read("Present_Position")
leader_positions_dict = self.robot_leader.bus.sync_read("Present_Position")
follower_positions_dict = self.robot_follower.bus.sync_read("Present_Position")
leader_positions = np.array([leader_positions_dict[name] for name in leader_positions_dict.keys()], dtype=np.float32)
follower_positions = np.array([follower_positions_dict[name] for name in follower_positions_dict.keys()], dtype=np.float32)
leader_ee = self.kinematics.fk_gripper_tip(leader_positions)[:3, 3]
follower_ee = self.kinematics.fk_gripper_tip(follower_positions)[:3, 3]
@@ -1568,52 +1590,35 @@ class GamepadControlWrapper(gym.Wrapper):
def __init__(
self,
env,
x_step_size=1.0,
y_step_size=1.0,
z_step_size=1.0,
use_gripper=False,
teleop_device, # Accepts an instantiated teleoperator
use_gripper=False, # This should align with teleop_device's config
auto_reset=False,
input_threshold=0.001,
):
"""
Initialize the gamepad controller wrapper.
Args:
env: The environment to wrap.
x_step_size: Base movement step size for X axis in meters.
y_step_size: Base movement step size for Y axis in meters.
z_step_size: Base movement step size for Z axis in meters.
use_gripper: Whether to include gripper control.
teleop_device: The instantiated teleoperation device (e.g., GamepadTeleop).
use_gripper: Whether to include gripper control (should match teleop_device.config.use_gripper).
auto_reset: Whether to auto reset the environment when episode ends.
input_threshold: Minimum movement delta to consider as active input.
"""
super().__init__(env)
# use HidApi for macos
if sys.platform == "darwin":
# NOTE: On macOS, pygame doesn't reliably detect input from some controllers, so we fall back to hidapi
from lerobot.scripts.server.end_effector_control_utils import GamepadControllerHID
self.teleop_device = teleop_device
# Ensure the teleop_device is connected if it has a connect method
if hasattr(self.teleop_device, 'connect') and not self.teleop_device.is_connected:
self.teleop_device.connect()
# self.controller attribute is removed
self.controller = GamepadControllerHID(
x_step_size=x_step_size,
y_step_size=y_step_size,
z_step_size=z_step_size,
)
else:
from lerobot.scripts.server.end_effector_control_utils import GamepadController
self.controller = GamepadController(
x_step_size=x_step_size,
y_step_size=y_step_size,
z_step_size=z_step_size,
)
self.auto_reset = auto_reset
self.use_gripper = use_gripper
self.input_threshold = input_threshold
self.controller.start()
# use_gripper from args should ideally match teleop_device.config.use_gripper
# For now, we use the one passed, but it can lead to inconsistency if not set correctly from config
self.use_gripper = use_gripper
logging.info("Gamepad control wrapper initialized")
print("Gamepad controls:")
logging.info("Gamepad control wrapper initialized with provided teleop_device.")
print("Gamepad controls (managed by the provided teleop_device - specific button mappings might vary):")
print(" Left analog stick: Move in X-Y plane")
print(" Right analog stick: Move in Z axis (up/down)")
print(" X/Square button: End episode (FAILURE)")
@@ -1628,42 +1633,43 @@ class GamepadControlWrapper(gym.Wrapper):
Returns:
Tuple containing:
- is_active: Whether gamepad input is active
- action: The action derived from gamepad input
- is_active: Whether gamepad input is active (from teleop_device.gamepad.should_intervene())
- action: The action derived from gamepad input (from teleop_device.get_action())
- terminate_episode: Whether episode termination was requested
- success: Whether episode success was signaled
- rerecord_episode: Whether episode rerecording was requested
"""
# Update the controller to get fresh inputs
self.controller.update()
if not hasattr(self.teleop_device, 'gamepad') or self.teleop_device.gamepad is None:
raise AttributeError("teleop_device does not have a 'gamepad' attribute or it is None. Expected for GamepadControlWrapper.")
# Get movement deltas from the controller
delta_x, delta_y, delta_z = self.controller.get_deltas()
intervention_is_active = self.controller.should_intervene()
# Create action from gamepad input
gamepad_action = np.array([delta_x, delta_y, delta_z], dtype=np.float32)
if self.use_gripper:
gripper_command = self.controller.gripper_command()
if gripper_command == "open":
gamepad_action = np.concatenate([gamepad_action, [2.0]])
elif gripper_command == "close":
gamepad_action = np.concatenate([gamepad_action, [0.0]])
else:
gamepad_action = np.concatenate([gamepad_action, [1.0]])
# Check episode ending buttons
# We'll rely on controller.get_episode_end_status() which returns "success", "failure", or None
episode_end_status = self.controller.get_episode_end_status()
# Get status flags from the underlying gamepad controller within the teleop_device
self.teleop_device.gamepad.update() # Ensure gamepad state is fresh
intervention_is_active = self.teleop_device.gamepad.should_intervene()
episode_end_status = self.teleop_device.gamepad.get_episode_end_status()
terminate_episode = episode_end_status is not None
success = episode_end_status == "success"
rerecord_episode = episode_end_status == "rerecord_episode"
# Get the action dictionary from the teleop_device
action_dict = self.teleop_device.get_action()
# Convert action_dict to numpy array based on expected structure
# Order: delta_x, delta_y, delta_z, gripper (if use_gripper)
action_list = [action_dict['delta_x'], action_dict['delta_y'], action_dict['delta_z']]
if self.use_gripper:
# GamepadTeleop returns gripper action as 0 (close), 1 (stay), 2 (open)
# This needs to be consistent with what EEActionWrapper expects if it's used downstream
# EEActionWrapper for gripper typically expects 0.0 (closed) to 2.0 (open)
# For now, we pass the direct value from GamepadTeleop, ensure downstream compatibility.
gripper_val = action_dict.get('gripper', 1.0) # Default to 1.0 (stay) if not present
action_list.append(float(gripper_val))
gamepad_action_np = np.array(action_list, dtype=np.float32)
return (
intervention_is_active,
gamepad_action,
gamepad_action_np,
terminate_episode,
success,
rerecord_episode,
@@ -1693,10 +1699,10 @@ class GamepadControlWrapper(gym.Wrapper):
logging.info(f"Episode manually ended: {'SUCCESS' if success else 'FAILURE'}")
# Only override the action if gamepad is active
action = gamepad_action if is_intervention else action
action_to_step = gamepad_action if is_intervention else action
# Step the environment
obs, reward, terminated, truncated, info = self.env.step(action)
obs, reward, terminated, truncated, info = self.env.step(action_to_step)
# Add episode ending if requested via gamepad
terminated = terminated or truncated or terminate_episode
@@ -1709,7 +1715,14 @@ class GamepadControlWrapper(gym.Wrapper):
action = torch.from_numpy(action)
info["is_intervention"] = is_intervention
# The original `BaseLeaderControlWrapper` puts `action_intervention` in info.
# For Gamepad, if intervention, `gamepad_action` is the intervention.
# If not intervention, policy's action is `action`.
# For consistency, let's store the *human's* action if intervention occurred.
info["action_intervention"] = action
if is_intervention:
info["action_intervention"] = gamepad_action
info["rerecord_episode"] = rerecord_episode
# If episode ended, reset the state
@@ -1731,9 +1744,8 @@ class GamepadControlWrapper(gym.Wrapper):
Returns:
Result of closing the wrapped environment.
"""
# Stop the controller
if hasattr(self, "controller"):
self.controller.stop()
if hasattr(self.teleop_device, 'disconnect'):
self.teleop_device.disconnect()
# Call the parent close method
return self.env.close()
@@ -1792,9 +1804,9 @@ class GymHilObservationProcessorWrapper(gym.ObservationWrapper):
###########################################################
def make_robot_env(cfg) -> gym.vector.VectorEnv:
def make_robot_env(cfg: EnvConfig) -> gym.Env:
"""
Factory function to create a vectorized robot environment.
Factory function to create a robot environment.
This function builds a robot environment with all necessary wrappers
based on the provided configuration.
@@ -1803,8 +1815,7 @@ def make_robot_env(cfg) -> gym.vector.VectorEnv:
cfg: Configuration object containing environment parameters.
Returns:
A vectorized gym environment with all necessary wrappers applied.
A gym environment with all necessary wrappers applied.
"""
if cfg.type == "hil":
import gym_hil # noqa: F401
@@ -1824,24 +1835,60 @@ def make_robot_env(cfg) -> gym.vector.VectorEnv:
env = TorchActionWrapper(env=env, device=cfg.device)
return env
if not hasattr(cfg, 'robot') or not hasattr(cfg, 'teleop'):
raise ValueError("Configuration for 'gym_manipulator' must be HILSerlRobotEnvConfig with robot and teleop.")
if cfg.robot is None:
raise ValueError("RobotConfig (cfg.robot) must be provided for gym_manipulator environment.")
robot = make_robot_from_config(cfg.robot)
# Instantiate teleoperation device
teleop_device = None
if cfg.teleop is not None:
# Ensure the config is of the correct type for GamepadTeleop if that's what make_teleoperator expects
# or if we are instantiating directly.
# For GamepadTeleop, its config is LerobotGamepadTeleopConfig.
if cfg.teleop.type == "gamepad":
if not isinstance(cfg.teleop, GamepadTeleopConfig):
# If the main teleop is just a base TeleoperatorConfig with type="gamepad",
# we might need to create the specific LerobotGamepadTeleopConfig here,
# or ensure the user provides the specific one in the main YAML config.
# For now, assume cfg.teleop IS a LerobotGamepadTeleopConfig instance.
raise ValueError("cfg.teleop must be an instance of LerobotGamepadTeleopConfig when type is 'gamepad'.")
# Directly instantiate GamepadTeleop, as make_teleoperator might expect a different setup or registry.
# Or, if make_teleoperator is robust, it could handle it.
# Given we know it's GamepadTeleop from common.teleoperators for this path:
teleop_device = GamepadTeleop(config=cfg.teleop)
else:
# For other types, rely on make_teleoperator if it's generic enough
try:
teleop_device = make_teleoperator_from_config(cfg.teleop)
except Exception as e:
raise NotImplementedError(f"Teleop device type '{cfg.teleop.type}' not supported or failed instantiation with make_teleoperator_from_config: {e}")
teleop_device.connect()
# Create base environment
env = RobotEnv(
robot=robot,
display_cameras=cfg.wrapper.display_cameras,
display_cameras=cfg.wrapper.display_cameras if cfg.wrapper else False,
)
# Add observation and image processing
if cfg.wrapper.add_joint_velocity_to_observation:
env = AddJointVelocityToObservation(env=env, fps=cfg.fps)
if cfg.wrapper.add_current_to_observation:
env = AddCurrentToObservation(env=env)
if cfg.wrapper.add_ee_pose_to_observation:
env = EEObservationWrapper(env=env, ee_pose_limits=cfg.wrapper.ee_action_space_params.bounds)
if cfg.wrapper:
if cfg.wrapper.add_joint_velocity_to_observation:
env = AddJointVelocityToObservation(env=env, fps=cfg.fps)
if cfg.wrapper.add_current_to_observation:
env = AddCurrentToObservation(env=env)
if cfg.wrapper.add_ee_pose_to_observation:
if cfg.wrapper.ee_action_space_params is None or cfg.wrapper.ee_action_space_params.bounds is None:
raise ValueError("EEActionSpaceConfig with bounds must be provided for EEObservationWrapper.")
env = EEObservationWrapper(env=env, ee_pose_limits=cfg.wrapper.ee_action_space_params.bounds)
env = ConvertToLeRobotObservation(env=env, device=cfg.device)
if cfg.wrapper.crop_params_dict is not None:
if cfg.wrapper and cfg.wrapper.crop_params_dict is not None:
env = ImageCropResizeWrapper(
env=env,
crop_params_dict=cfg.wrapper.crop_params_dict,
@@ -1852,49 +1899,56 @@ def make_robot_env(cfg) -> gym.vector.VectorEnv:
reward_classifier = init_reward_classifier(cfg)
if reward_classifier is not None:
env = RewardWrapper(env=env, reward_classifier=reward_classifier, device=cfg.device)
env = TimeLimitWrapper(env=env, control_time_s=cfg.wrapper.control_time_s, fps=cfg.fps)
if cfg.wrapper.use_gripper:
env = GripperActionWrapper(env=env, quantization_threshold=cfg.wrapper.gripper_quantization_threshold)
if cfg.wrapper.gripper_penalty is not None:
env = GripperPenaltyWrapper(
if cfg.wrapper:
env = TimeLimitWrapper(env=env, control_time_s=cfg.wrapper.control_time_s, fps=cfg.fps)
if cfg.wrapper.use_gripper:
env = GripperActionWrapper(env=env, quantization_threshold=cfg.wrapper.gripper_quantization_threshold)
if cfg.wrapper.gripper_penalty is not None:
env = GripperPenaltyWrapper(
env=env,
penalty=cfg.wrapper.gripper_penalty,
)
# env = EEActionWrapper(
# env=env,
# ee_action_space_params=cfg.wrapper.ee_action_space_params,
# use_gripper=cfg.wrapper.use_gripper,
# )
# Control mode specific wrappers
control_mode = cfg.wrapper.ee_action_space_params.control_mode
if control_mode == "gamepad":
if teleop_device is None:
raise ValueError("A teleop_device must be instantiated for gamepad control mode.")
if not isinstance(teleop_device, GamepadTeleop):
raise ValueError(f"teleop_device must be an instance of GamepadTeleop for gamepad control mode, got {type(teleop_device)}.")
env = GamepadControlWrapper(
env=env,
penalty=cfg.wrapper.gripper_penalty,
teleop_device=teleop_device,
use_gripper=cfg.wrapper.use_gripper,
)
env = EEActionWrapper(
env=env,
ee_action_space_params=cfg.wrapper.ee_action_space_params,
use_gripper=cfg.wrapper.use_gripper,
)
if cfg.wrapper.ee_action_space_params.control_mode == "gamepad":
env = GamepadControlWrapper(
elif control_mode == "leader":
env = GearedLeaderControlWrapper(
env=env,
ee_action_space_params=cfg.wrapper.ee_action_space_params,
use_gripper=cfg.wrapper.use_gripper,
)
elif control_mode == "leader_automatic":
env = GearedLeaderAutomaticControlWrapper(
env=env,
ee_action_space_params=cfg.wrapper.ee_action_space_params,
use_gripper=cfg.wrapper.use_gripper,
)
elif control_mode not in ["gamepad", "leader", "leader_automatic"]: # Ensure there's a fallback or error for unsupported control modes
raise ValueError(f"Invalid control mode: {control_mode}")
env = ResetWrapper(
env=env,
x_step_size=cfg.wrapper.ee_action_space_params.x_step_size,
y_step_size=cfg.wrapper.ee_action_space_params.y_step_size,
z_step_size=cfg.wrapper.ee_action_space_params.z_step_size,
use_gripper=cfg.wrapper.use_gripper,
reset_pose=cfg.wrapper.fixed_reset_joint_positions,
reset_time_s=cfg.wrapper.reset_time_s,
)
elif cfg.wrapper.ee_action_space_params.control_mode == "leader":
env = GearedLeaderControlWrapper(
env=env,
ee_action_space_params=cfg.wrapper.ee_action_space_params,
use_gripper=cfg.wrapper.use_gripper,
)
elif cfg.wrapper.ee_action_space_params.control_mode == "leader_automatic":
env = GearedLeaderAutomaticControlWrapper(
env=env,
ee_action_space_params=cfg.wrapper.ee_action_space_params,
use_gripper=cfg.wrapper.use_gripper,
)
else:
raise ValueError(f"Invalid control mode: {cfg.wrapper.ee_action_space_params.control_mode}")
env = ResetWrapper(
env=env,
reset_pose=cfg.wrapper.fixed_reset_joint_positions,
reset_time_s=cfg.wrapper.reset_time_s,
)
env = BatchCompatibleWrapper(env=env)
env = TorchActionWrapper(env=env, device=cfg.device)
@@ -2131,8 +2185,7 @@ def replay_episode(env, cfg):
@parser.wrap()
def main(cfg: EnvConfig):
"""
Main entry point for the robot environment script.
""" Main entry point for the robot environment script.
This function runs the robot environment in one of several modes
based on the provided configuration.
@@ -2168,35 +2221,38 @@ def main(cfg: EnvConfig):
env.reset()
# Initialize the smoothed action as a random sample.
smoothed_action = env.action_space.sample()
# # Initialize the smoothed action as a random sample.
# smoothed_action = env.action_space.sample()
# Smoothing coefficient (alpha) defines how much of the new random sample to mix in.
# A value close to 0 makes the trajectory very smooth (slow to change), while a value close to 1 is less smooth.
alpha = 1.0
# # Smoothing coefficient (alpha) defines how much of the new random sample to mix in.
# # A value close to 0 makes the trajectory very smooth (slow to change), while a value close to 1 is less smooth.
# alpha = 1.0
num_episode = 0
successes = []
while num_episode < 10:
start_loop_s = time.perf_counter()
# Sample a new random action from the robot's action space.
new_random_action = env.action_space.sample()
# Update the smoothed action using an exponential moving average.
smoothed_action = alpha * new_random_action + (1 - alpha) * smoothed_action
# num_episode = 0
# successes = []
# while num_episode < 10:
# start_loop_s = time.perf_counter()
# # Sample a new random action from the robot's action space.
# new_random_action = env.action_space.sample()
# # Update the smoothed action using an exponential moving average.
# smoothed_action = alpha * new_random_action + (1 - alpha) * smoothed_action
# Execute the step: wrap the NumPy action in a torch tensor.
obs, reward, terminated, truncated, info = env.step(smoothed_action)
if terminated or truncated:
successes.append(reward)
env.reset()
num_episode += 1
# # Execute the step: wrap the NumPy action in a torch tensor.
# obs, reward, terminated, truncated, info = env.step(smoothed_action)
# if terminated or truncated:
# successes.append(reward)
# env.reset()
# num_episode += 1
dt_s = time.perf_counter() - start_loop_s
busy_wait(1 / cfg.fps - dt_s)
# dt_s = time.perf_counter() - start_loop_s
# busy_wait(1 / cfg.fps - dt_s)
logging.info(f"Success after 20 steps {successes}")
logging.info(f"success rate {sum(successes) / len(successes)}")
# logging.info(f"Success after 20 steps {successes}")
# logging.info(f"success rate {sum(successes) / len(successes)}")
if __name__ == "__main__":
main()