pre-commit nits

Michel Aractingi
2025-05-24 00:20:15 +02:00
committed by AdilZouitine
parent 2f62e5496e
commit e044208534
5 changed files with 111 additions and 76 deletions

View File

@@ -1,4 +1,5 @@
from lerobot.common.model.kinematics_utils import forward_kinematics, inverse_kinematics, load_model
from .kinematics import RobotKinematics
from lerobot.common.model.kinematics_utils import load_model, forward_kinematics, inverse_kinematics
__all__ = ["RobotKinematics", "load_model", "forward_kinematics", "inverse_kinematics"]

View File

@@ -35,8 +35,6 @@ from tqdm import tqdm
from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
from lerobot.common.utils.utils import enter_pressed, move_cursor_up
import math
NameOrID: TypeAlias = str | int
Value: TypeAlias = int | float
@@ -44,6 +42,7 @@ MAX_ID_RANGE = 252
logger = logging.getLogger(__name__)
def get_ctrl_table(model_ctrl_table: dict[str, dict], model: str) -> dict[str, tuple[int, int]]:
ctrl_table = model_ctrl_table.get(model)
if ctrl_table is None:
@@ -833,7 +832,7 @@ class MotorsBus(abc.ABC):
homing_offset = self.calibration[motor].homing_offset
resolution = self.model_resolution_table[self.motors[motor].model]
middle_pos = homing_offset + resolution // 2
unnormalized_values[id_] = int((val / 180) * resolution//2) + middle_pos
unnormalized_values[id_] = int((val / 180) * resolution // 2) + middle_pos
if drive_mode:
unnormalized_values[id_] *= -1
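
The only change in this hunk is ruff spacing around //, but the conversion it touches deserves a note: a degree value in [-180, 180] is scaled so that 180° spans half the encoder resolution, then offset by the calibrated middle position. A standalone sketch, assuming a 4096-tick encoder (typical for the Feetech servos used here):

def degrees_to_ticks(val_deg: float, homing_offset: int, resolution: int = 4096) -> int:
    """Map a degree reading in [-180, 180] to raw encoder ticks."""
    middle_pos = homing_offset + resolution // 2  # calibrated center of travel
    return int((val_deg / 180) * (resolution // 2)) + middle_pos

# 90 degrees is a quarter turn: half of the half-resolution span.
assert degrees_to_ticks(90, homing_offset=0) == 1024 + 2048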

View File

@@ -15,11 +15,12 @@
# limitations under the License.
import logging
import time
from typing import Any, Dict
import numpy as np
import time
from lerobot.common.cameras import make_cameras_from_configs
from lerobot.common.errors import DeviceNotConnectedError
from lerobot.common.model.kinematics import RobotKinematics
from lerobot.common.motors import Motor, MotorNormMode
@@ -27,7 +28,6 @@ from lerobot.common.motors.feetech import FeetechMotorsBus
from ..so100_follower import SO100Follower
from .config_so100_follower_end_effector import SO100FollowerEndEffectorConfig
from lerobot.common.cameras import make_cameras_from_configs
logger = logging.getLogger(__name__)
@@ -123,7 +123,6 @@ class SO100FollowerEndEffector(SO100Follower):
# Read current joint positions
current_joint_pos = self.bus.sync_read("Present_Position")
# Convert dict to ordered list without gripper
joint_names = ["shoulder_pan", "shoulder_lift", "elbow_flex", "wrist_flex", "wrist_roll", "gripper"]
# Convert the joint positions from min-max to degrees
@@ -168,7 +167,6 @@ class SO100FollowerEndEffector(SO100Follower):
)
# Send joint space action to parent class
return super().send_action(joint_action)
def get_observation(self) -> dict[str, Any]:
if not self.is_connected:
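
Several files in this commit repeat the same idiom: bus.sync_read("Present_Position") returns a {motor_name: value} dict that is flattened into a fixed-order float32 vector before any kinematics. A small helper sketch capturing that pattern (hypothetical, not part of this commit):

import numpy as np

JOINT_NAMES = ["shoulder_pan", "shoulder_lift", "elbow_flex", "wrist_flex", "wrist_roll", "gripper"]

def joints_dict_to_array(pos_dict: dict[str, float], names: list[str] = JOINT_NAMES) -> np.ndarray:
    """Flatten a sync_read dict into a fixed-order float32 joint vector."""
    return np.array([pos_dict[name] for name in names], dtype=np.float32)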

View File

@@ -31,10 +31,12 @@ python -m lerobot.scripts.server.find_joint_limits \
"""
import time
from dataclasses import dataclass
import numpy as np
import draccus
import numpy as np
from lerobot.common.model.kinematics import RobotKinematics
from lerobot.common.robots import ( # noqa: F401
RobotConfig,
koch_follower,
@@ -42,15 +44,14 @@ from lerobot.common.robots import ( # noqa: F401
so100_follower,
so100_follower_end_effector,
)
from lerobot.common.teleoperators import (
from lerobot.common.teleoperators import ( # noqa: F401
TeleoperatorConfig,
gamepad,
koch_leader,
make_teleoperator_from_config,
so100_leader,
)
from dataclasses import dataclass
from lerobot.common.teleoperators import gamepad, koch_leader, so100_leader # noqa: F401
from lerobot.common.model.kinematics import RobotKinematics
@dataclass
class FindJointLimitsConfig:
@@ -62,6 +63,7 @@ class FindJointLimitsConfig:
# Display all cameras on screen
display_data: bool = False
@draccus.wrap()
def find_joint_and_ee_bounds(cfg: FindJointLimitsConfig):
teleop = make_teleoperator_from_config(cfg.teleop)
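
The import reshuffle above leaves the entry-point pattern intact: draccus parses CLI flags into the dataclass annotated on the wrapped function. A minimal sketch of that pattern with an illustrative config (only display_data appears in the diff):

from dataclasses import dataclass

import draccus

@dataclass
class ExampleConfig:
    display_data: bool = False

@draccus.wrap()
def main(cfg: ExampleConfig):
    # draccus fills cfg from the command line, e.g. `python script.py --display_data true`
    print(cfg)

if __name__ == "__main__":
    main()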

View File

@@ -15,7 +15,6 @@
# limitations under the License.
import logging
import sys
import time
from collections import deque
from threading import Lock
@@ -26,35 +25,43 @@ import numpy as np
import torch
import torchvision.transforms.functional as F # noqa: N812
from lerobot.common.cameras import intel, opencv # noqa: F401
from lerobot.common.envs.configs import EnvConfig
from lerobot.common.envs.utils import preprocess_observation
from lerobot.common.utils.robot_utils import busy_wait
from lerobot.common.robots import RobotConfig, make_robot_from_config, so100_follower_end_effector # noqa: F401
from lerobot.common.cameras import intel, opencv # noqa: F401
from lerobot.common.model.kinematics import RobotKinematics
from lerobot.common.robots import ( # noqa: F401
RobotConfig,
make_robot_from_config,
so100_follower_end_effector,
)
from lerobot.common.teleoperators import (
gamepad, # noqa: F401
make_teleoperator_from_config,
)
from lerobot.common.teleoperators.gamepad.configuration_gamepad import GamepadTeleopConfig
from lerobot.common.teleoperators.gamepad.teleop_gamepad import GamepadTeleop
from lerobot.common.utils.robot_utils import busy_wait
from lerobot.common.utils.utils import log_say
from lerobot.configs import parser
from lerobot.common.model.kinematics import RobotKinematics
from lerobot.common.teleoperators.gamepad.teleop_gamepad import GamepadTeleop
from lerobot.common.teleoperators.gamepad.configuration_gamepad import GamepadTeleopConfig
from lerobot.common.teleoperators import make_teleoperator_from_config
from lerobot.common.teleoperators import gamepad #noqa: F401
logging.basicConfig(level=logging.INFO)
MAX_GRIPPER_COMMAND = 30
def reset_follower_position(robot_arm, target_position):
current_position_dict = robot_arm.bus.sync_read("Present_Position")
current_position = np.array([current_position_dict[name] for name in current_position_dict.keys()], dtype=np.float32)
current_position = np.array(
[current_position_dict[name] for name in current_position_dict.keys()], dtype=np.float32
)
trajectory = torch.from_numpy(
np.linspace(current_position, target_position, 50)
) # NOTE: 50 is just an arbitrary number of interpolation steps
for pose in trajectory:
action_dict = {name: pose for name, pose in zip(current_position_dict.keys(), pose)}
action_dict = {name: pose for name, pose in zip(current_position_dict.keys(), pose, strict=False)}
robot_arm.bus.sync_write("Goal_Position", action_dict)
busy_wait(0.015)
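
The reformatted helper above interpolates linearly from the current pose to the target in 50 steps with a 15 ms busy-wait between writes, so a reset sweep takes roughly 0.75 s regardless of distance. A quick sketch of that arithmetic (poses are placeholders):

import numpy as np

steps, dt = 50, 0.015
current = np.zeros(6, dtype=np.float32)  # placeholder start pose
target = np.full(6, 90.0, dtype=np.float32)  # placeholder target pose
trajectory = np.linspace(current, target, steps)  # (50, 6) waypoints, endpoints included
print(f"sweep ~{steps * dt:.2f}s, per-step delta {(target - current)[0] / (steps - 1):.2f} deg")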
class TorchBox(gym.spaces.Box):
"""
A version of gym.spaces.Box that handles PyTorch tensors.
@@ -226,14 +233,13 @@ class RobotEnv(gym.Env):
# Read initial joint positions using the bus
self.current_joint_positions = self._get_observation()["agent_pos"]
self._setup_spaces()
def _get_observation(self) -> np.ndarray:
"""Helper to convert a dictionary from bus.sync_read to an ordered numpy array."""
obs_dict = self.robot.get_observation()
joint_positions = np.array([obs_dict[name] for name in self._joint_names], dtype=np.float32)
images = {key: obs_dict[key] for key in self._image_keys}
return {"agent_pos": joint_positions, "pixels": images}
@@ -257,7 +263,9 @@ class RobotEnv(gym.Env):
if "pixels" in example_obs:
prefix = "observation.images" if len(example_obs["pixels"]) > 1 else "observation.image"
observation_spaces = {
f"{prefix}.{key}": gym.spaces.Box(low=0, high=255, shape=example_obs["pixels"][key].shape, dtype=np.uint8)
f"{prefix}.{key}": gym.spaces.Box(
low=0, high=255, shape=example_obs["pixels"][key].shape, dtype=np.uint8
)
for key in example_obs["pixels"]
}
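
For reference, the comprehension above yields one Box per camera, keyed by prefix: observation.image with a single camera, observation.images otherwise. A sketch of the resulting pixel space, assuming gymnasium imported as gym (as in the surrounding module), one camera named "front", and an illustrative frame shape:

import numpy as np
import gymnasium as gym

pixel_spaces = {
    "observation.image.front": gym.spaces.Box(low=0, high=255, shape=(480, 640, 3), dtype=np.uint8),
}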
@@ -476,7 +484,9 @@ class AddCurrentToObservation(gym.ObservationWrapper):
The modified observation with current values.
"""
present_current_observation = self.unwrapped._get_observation()["agent_pos"]
observation["agent_pos"] = np.concatenate([observation["agent_pos"], present_current_observation], axis=-1)
observation["agent_pos"] = np.concatenate(
[observation["agent_pos"], present_current_observation], axis=-1
)
return observation
@@ -746,7 +756,10 @@ class ConvertToLeRobotObservation(gym.ObservationWrapper):
The processed observation with normalized images and proper tensor formats.
"""
observation = preprocess_observation(observation)
observation = {key: observation[key].to(self.device, non_blocking=self.device.type == "cuda") for key in observation}
observation = {
key: observation[key].to(self.device, non_blocking=self.device.type == "cuda")
for key in observation
}
return observation
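
The reflowed transfer above requests non_blocking only for CUDA targets: non-blocking copies are a no-op on CPU, and they only truly overlap with compute when the host tensor sits in pinned memory. A minimal sketch:

import torch

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
x = torch.randn(3, 224, 224)
if device.type == "cuda":
    x = x.pin_memory()  # pinned memory enables genuinely asynchronous host-to-device copies
y = x.to(device, non_blocking=device.type == "cuda")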
@@ -986,7 +999,7 @@ class GripperActionWrapper(gym.ActionWrapper):
np.sign(gripper_command) if abs(gripper_command) > self.quantization_threshold else 0.0
)
gripper_command = gripper_command * MAX_GRIPPER_COMMAND
gripper_state = self.unwrapped.robot.bus.sync_read("Present_Position")["gripper"]
gripper_action_value = np.clip(gripper_state + gripper_command, 0, MAX_GRIPPER_COMMAND)
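
Putting the hunk together: small commands are zeroed by the quantization threshold, surviving commands are snapped to a full-scale step of ±MAX_GRIPPER_COMMAND, and the result is applied relative to the current gripper position, clipped to the valid range. A standalone sketch (threshold value illustrative):

import numpy as np

MAX_GRIPPER_COMMAND = 30

def gripper_target(raw_command: float, present_position: float, threshold: float = 0.2) -> float:
    cmd = np.sign(raw_command) if abs(raw_command) > threshold else 0.0  # deadband small commands
    cmd *= MAX_GRIPPER_COMMAND  # full-scale step
    return float(np.clip(present_position + cmd, 0, MAX_GRIPPER_COMMAND))  # absolute target in range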
@@ -1288,7 +1301,9 @@ class BaseLeaderControlWrapper(gym.Wrapper):
follower_pos_dict = self.robot_follower.bus.sync_read("Present_Position")
leader_pos = np.array([leader_pos_dict[name] for name in leader_pos_dict.keys()], dtype=np.float32)
follower_pos = np.array([follower_pos_dict[name] for name in follower_pos_dict.keys()], dtype=np.float32)
follower_pos = np.array(
[follower_pos_dict[name] for name in follower_pos_dict.keys()], dtype=np.float32
)
# [:3, 3] Last column of the transformation matrix corresponds to the xyz translation
leader_ee = self.kinematics.fk_gripper_tip(leader_pos)[:3, 3]
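
The [:3, 3] indexing works because fk_gripper_tip returns a 4x4 homogeneous transform: the top-left 3x3 block is rotation and the last column holds the xyz translation. A sketch:

import numpy as np

T = np.eye(4)  # stand-in for kinematics.fk_gripper_tip(...)
T[:3, 3] = [0.1, 0.0, 0.2]  # translation set for illustration
xyz = T[:3, 3]  # end-effector position, as in leader_ee above
R = T[:3, :3]  # rotation block, unused in this diff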
@@ -1334,9 +1349,10 @@ class BaseLeaderControlWrapper(gym.Wrapper):
self.robot_leader.write("Torque_Enable", 1)
self.leader_torque_enabled = True
follower_pos_dict = self.robot_follower.bus.sync_read("Present_Position")
follower_pos = np.array([follower_pos_dict[name] for name in follower_pos_dict.keys()], dtype=np.float32)
follower_pos = np.array(
[follower_pos_dict[name] for name in follower_pos_dict.keys()], dtype=np.float32
)
self.robot_leader.write("Goal_Position", follower_pos)
@@ -1402,6 +1418,7 @@ class BaseLeaderControlWrapper(gym.Wrapper):
self.listener.stop()
return self.env.close()
class GearedLeaderControlWrapper(BaseLeaderControlWrapper):
"""
Wrapper that enables manual intervention via keyboard.
@@ -1516,8 +1533,12 @@ class GearedLeaderAutomaticControlWrapper(BaseLeaderControlWrapper):
leader_positions_dict = self.robot_leader.bus.sync_read("Present_Position")
follower_positions_dict = self.robot_follower.bus.sync_read("Present_Position")
leader_positions = np.array([leader_positions_dict[name] for name in leader_positions_dict.keys()], dtype=np.float32)
follower_positions = np.array([follower_positions_dict[name] for name in follower_positions_dict.keys()], dtype=np.float32)
leader_positions = np.array(
[leader_positions_dict[name] for name in leader_positions_dict.keys()], dtype=np.float32
)
follower_positions = np.array(
[follower_positions_dict[name] for name in follower_positions_dict.keys()], dtype=np.float32
)
leader_ee = self.kinematics.fk_gripper_tip(leader_positions)[:3, 3]
follower_ee = self.kinematics.fk_gripper_tip(follower_positions)[:3, 3]
@@ -1587,8 +1608,8 @@ class GamepadControlWrapper(gym.Wrapper):
def __init__(
self,
env,
teleop_device, # Accepts an instantiated teleoperator
use_gripper=False, # This should align with teleop_device's config
teleop_device, # Accepts an instantiated teleoperator
use_gripper=False, # This should align with teleop_device's config
auto_reset=False,
):
"""
@@ -1604,18 +1625,20 @@ class GamepadControlWrapper(gym.Wrapper):
self.teleop_device = teleop_device
# Ensure the teleop_device is connected if it has a connect method
if hasattr(self.teleop_device, 'connect') and not self.teleop_device.is_connected:
if hasattr(self.teleop_device, "connect") and not self.teleop_device.is_connected:
self.teleop_device.connect()
# self.controller attribute is removed
self.auto_reset = auto_reset
# use_gripper from args should ideally match teleop_device.config.use_gripper
# For now, we use the one passed, but it can lead to inconsistency if not set correctly from config
self.use_gripper = use_gripper
self.use_gripper = use_gripper
logging.info("Gamepad control wrapper initialized with provided teleop_device.")
print("Gamepad controls (managed by the provided teleop_device - specific button mappings might vary):")
print(
"Gamepad controls (managed by the provided teleop_device - specific button mappings might vary):"
)
print(" Left analog stick: Move in X-Y plane")
print(" Right analog stick: Move in Z axis (up/down)")
print(" X/Square button: End episode (FAILURE)")
@@ -1636,32 +1659,34 @@ class GamepadControlWrapper(gym.Wrapper):
- success: Whether episode success was signaled
- rerecord_episode: Whether episode rerecording was requested
"""
if not hasattr(self.teleop_device, 'gamepad') or self.teleop_device.gamepad is None:
raise AttributeError("teleop_device does not have a 'gamepad' attribute or it is None. Expected for GamepadControlWrapper.")
if not hasattr(self.teleop_device, "gamepad") or self.teleop_device.gamepad is None:
raise AttributeError(
"teleop_device does not have a 'gamepad' attribute or it is None. Expected for GamepadControlWrapper."
)
# Get status flags from the underlying gamepad controller within the teleop_device
self.teleop_device.gamepad.update() # Ensure gamepad state is fresh
self.teleop_device.gamepad.update() # Ensure gamepad state is fresh
intervention_is_active = self.teleop_device.gamepad.should_intervene()
episode_end_status = self.teleop_device.gamepad.get_episode_end_status()
terminate_episode = episode_end_status is not None
success = episode_end_status == "success"
rerecord_episode = episode_end_status == "rerecord_episode"
# Get the action dictionary from the teleop_device
action_dict = self.teleop_device.get_action()
action_dict = self.teleop_device.get_action()
# Convert action_dict to numpy array based on expected structure
# Order: delta_x, delta_y, delta_z, gripper (if use_gripper)
action_list = [action_dict['delta_x'], action_dict['delta_y'], action_dict['delta_z']]
action_list = [action_dict["delta_x"], action_dict["delta_y"], action_dict["delta_z"]]
if self.use_gripper:
# GamepadTeleop returns gripper action as 0 (close), 1 (stay), 2 (open)
# This needs to be consistent with what EEActionWrapper expects if it's used downstream
# EEActionWrapper for gripper typically expects 0.0 (closed) to 2.0 (open)
# For now, we pass the direct value from GamepadTeleop, ensure downstream compatibility.
gripper_val = action_dict.get('gripper', 1.0) # Default to 1.0 (stay) if not present
action_list.append(float(gripper_val))
gripper_val = action_dict.get("gripper", 1.0) # Default to 1.0 (stay) if not present
action_list.append(float(gripper_val))
gamepad_action_np = np.array(action_list, dtype=np.float32)
return (
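
The quote and spacing fixes above leave the packing order unchanged: delta_x, delta_y, delta_z, then an optional gripper value where GamepadTeleop's 0/1/2 mean close/stay/open. A standalone sketch of that packing:

import numpy as np

def pack_gamepad_action(action: dict, use_gripper: bool) -> np.ndarray:
    vals = [action["delta_x"], action["delta_y"], action["delta_z"]]
    if use_gripper:
        vals.append(float(action.get("gripper", 1.0)))  # 1.0 = "stay" default
    return np.array(vals, dtype=np.float32)

print(pack_gamepad_action({"delta_x": 0.0, "delta_y": 0.1, "delta_z": 0.0, "gripper": 2.0}, use_gripper=True))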
@@ -1741,7 +1766,7 @@ class GamepadControlWrapper(gym.Wrapper):
Returns:
Result of closing the wrapped environment.
"""
if hasattr(self.teleop_device, 'disconnect'):
if hasattr(self.teleop_device, "disconnect"):
self.teleop_device.disconnect()
# Call the parent close method
@@ -1832,8 +1857,10 @@ def make_robot_env(cfg: EnvConfig) -> gym.Env:
env = TorchActionWrapper(env=env, device=cfg.device)
return env
if not hasattr(cfg, 'robot') or not hasattr(cfg, 'teleop'):
raise ValueError("Configuration for 'gym_manipulator' must be HILSerlRobotEnvConfig with robot and teleop.")
if not hasattr(cfg, "robot") or not hasattr(cfg, "teleop"):
raise ValueError(
"Configuration for 'gym_manipulator' must be HILSerlRobotEnvConfig with robot and teleop."
)
if cfg.robot is None:
raise ValueError("RobotConfig (cfg.robot) must be provided for gym_manipulator environment.")
@@ -1847,11 +1874,13 @@ def make_robot_env(cfg: EnvConfig) -> gym.Env:
# For GamepadTeleop, its config is LerobotGamepadTeleopConfig.
if cfg.teleop.type == "gamepad":
if not isinstance(cfg.teleop, GamepadTeleopConfig):
# If the main teleop is just a base TeleoperatorConfig with type="gamepad",
# we might need to create the specific LerobotGamepadTeleopConfig here,
# or ensure the user provides the specific one in the main YAML config.
# For now, assume cfg.teleop IS a LerobotGamepadTeleopConfig instance.
raise ValueError("cfg.teleop must be an instance of LerobotGamepadTeleopConfig when type is 'gamepad'.")
# If the main teleop is just a base TeleoperatorConfig with type="gamepad",
# we might need to create the specific LerobotGamepadTeleopConfig here,
# or ensure the user provides the specific one in the main YAML config.
# For now, assume cfg.teleop IS a LerobotGamepadTeleopConfig instance.
raise ValueError(
"cfg.teleop must be an instance of LerobotGamepadTeleopConfig when type is 'gamepad'."
)
# Directly instantiate GamepadTeleop, as make_teleoperator might expect a different setup or registry.
# Or, if make_teleoperator is robust, it could handle it.
# Given we know it's GamepadTeleop from common.teleoperators for this path:
@@ -1861,9 +1890,10 @@ def make_robot_env(cfg: EnvConfig) -> gym.Env:
try:
teleop_device = make_teleoperator_from_config(cfg.teleop)
except Exception as e:
raise NotImplementedError(f"Teleop device type '{cfg.teleop.type}' not supported or failed instantiation with make_teleoperator_from_config: {e}")
raise NotImplementedError(
f"Teleop device type '{cfg.teleop.type}' not supported or failed instantiation with make_teleoperator_from_config: {e}"
)
teleop_device.connect()
# Create base environment
@@ -1894,11 +1924,13 @@ def make_robot_env(cfg: EnvConfig) -> gym.Env:
reward_classifier = init_reward_classifier(cfg)
if reward_classifier is not None:
env = RewardWrapper(env=env, reward_classifier=reward_classifier, device=cfg.device)
if cfg.wrapper:
env = TimeLimitWrapper(env=env, control_time_s=cfg.wrapper.control_time_s, fps=cfg.fps)
if cfg.wrapper.use_gripper:
env = GripperActionWrapper(env=env, quantization_threshold=cfg.wrapper.gripper_quantization_threshold)
env = GripperActionWrapper(
env=env, quantization_threshold=cfg.wrapper.gripper_quantization_threshold
)
if cfg.wrapper.gripper_penalty is not None:
env = GripperPenaltyWrapper(
env=env,
@@ -1915,13 +1947,15 @@ def make_robot_env(cfg: EnvConfig) -> gym.Env:
control_mode = cfg.wrapper.control_mode
if control_mode == "gamepad":
if teleop_device is None:
raise ValueError("A teleop_device must be instantiated for gamepad control mode.")
raise ValueError("A teleop_device must be instantiated for gamepad control mode.")
if not isinstance(teleop_device, GamepadTeleop):
raise ValueError(f"teleop_device must be an instance of GamepadTeleop for gamepad control mode, got {type(teleop_device)}.")
raise ValueError(
f"teleop_device must be an instance of GamepadTeleop for gamepad control mode, got {type(teleop_device)}."
)
env = GamepadControlWrapper(
env=env,
teleop_device=teleop_device,
use_gripper=cfg.wrapper.use_gripper,
use_gripper=cfg.wrapper.use_gripper,
)
elif control_mode == "leader":
env = GearedLeaderControlWrapper(
@@ -1935,9 +1969,13 @@ def make_robot_env(cfg: EnvConfig) -> gym.Env:
ee_action_space_params=cfg.wrapper.ee_action_space_params,
use_gripper=cfg.wrapper.use_gripper,
)
elif control_mode not in ["gamepad", "leader", "leader_automatic"]: # Ensure there's a fallback or error for unsupported control modes
raise ValueError(f"Invalid control mode: {control_mode}")
elif control_mode not in [
"gamepad",
"leader",
"leader_automatic",
]: # Ensure there's a fallback or error for unsupported control modes
raise ValueError(f"Invalid control mode: {control_mode}")
env = ResetWrapper(
env=env,
reset_pose=cfg.wrapper.fixed_reset_joint_positions,
@@ -2180,7 +2218,7 @@ def replay_episode(env, cfg):
@parser.wrap()
def main(cfg: EnvConfig):
""" Main entry point for the robot environment script.
"""Main entry point for the robot environment script.
This function runs the robot environment in one of several modes
based on the provided configuration.
@@ -2248,6 +2286,3 @@ def main(cfg: EnvConfig):
if __name__ == "__main__":
main()