Update viperx & widowx

This commit is contained in:
Simon Alibert
2025-04-03 18:34:08 +02:00
parent e393af2d88
commit f7672e14c7
4 changed files with 173 additions and 194 deletions

View File

@@ -22,17 +22,4 @@ from ..config import TeleoperatorConfig
@TeleoperatorConfig.register_subclass("widowx")
@dataclass
class WidowXTeleopConfig(TeleoperatorConfig):
port: str # Port to connect to the teloperator
mock: bool = False
# /!\ FOR SAFETY, READ THIS /!\
# `max_relative_target` limits the magnitude of the relative positional target vector for safety purposes.
# Set this to a positive scalar to have the same value for all motors, or a list that is the same length as
# the number of motors in your follower arms.
# For Aloha, for every goal position request, motor rotations are capped at 5 degrees by default.
# When you feel more confident with teleoperation or running the policy, you can extend
# this safety limit and even removing it by setting it to `null`.
# Also, everything is expected to work safely out-of-the-box, but we highly advise to
# first try to teleoperate the grippers only (by commenting out the rest of the motors in this yaml),
# then to gradually add more motors (by uncommenting), until you can teleoperate both arms fully
max_relative_target: int | None = 5
port: str # Port to connect to the arm

View File

@@ -14,22 +14,22 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import logging
import time
import numpy as np
from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
from lerobot.common.motors import Motor, MotorNormMode, TorqueMode
from lerobot.common.motors import Motor, MotorCalibration, MotorNormMode
from lerobot.common.motors.dynamixel import (
DriveMode,
DynamixelMotorsBus,
run_arm_calibration,
OperatingMode,
)
from ..teleoperator import Teleoperator
from .configuration_widowx import WidowXTeleopConfig
logger = logging.getLogger(__name__)
class WidowXTeleop(Teleoperator):
"""
@@ -42,8 +42,6 @@ class WidowXTeleop(Teleoperator):
def __init__(self, config: WidowXTeleopConfig):
super().__init__(config)
self.config = config
self.robot_type = config.type
self.arm = DynamixelMotorsBus(
port=self.config.port,
motors={
@@ -59,9 +57,6 @@ class WidowXTeleop(Teleoperator):
},
)
self.is_connected = False
self.logs = {}
@property
def action_feature(self) -> dict:
return {
@@ -74,84 +69,85 @@ class WidowXTeleop(Teleoperator):
def feedback_feature(self) -> dict:
return {}
def _set_shadow_motors(self):
"""
Set secondary/shadow ID for shoulder and elbow. These joints have two motors.
As a result, if only one of them is required to move to a certain position,
the other will follow. This is to avoid breaking the motors.
"""
shoulder_idx = self.config.shoulder[0]
self.arm.write("Secondary_ID", shoulder_idx, "shoulder_shadow")
elbow_idx = self.config.elbow[0]
self.arm.write("Secondary_ID", elbow_idx, "elbow_shadow")
@property
def is_connected(self) -> bool:
return self.arm.is_connected
def connect(self):
if self.is_connected:
raise DeviceAlreadyConnectedError(
"ManipulatorRobot is already connected. Do not run `robot.connect()` twice."
)
raise DeviceAlreadyConnectedError(f"{self} already connected")
logging.info("Connecting arm.")
self.arm.connect()
if not self.is_calibrated:
self.calibrate()
# We assume that at connection time, arm is in a rest position,
# and torque can be safely disabled to run calibration.
self.arm.write("Torque_Enable", TorqueMode.DISABLED.value)
self.calibrate()
self._set_shadow_motors()
logging.info("Activating torque.")
self.arm.write("Torque_Enable", TorqueMode.ENABLED.value)
# Check arm can be read
self.arm.read("Present_Position")
# Connect the cameras
for cam in self.cameras.values():
cam.connect()
self.is_connected = True
self.configure()
logger.info(f"{self} connected.")
def calibrate(self) -> None:
"""After calibration all motors function in human interpretable ranges.
Rotations are expressed in degrees in nominal range of [-180, 180],
and linear motions (like gripper of Aloha) in nominal range of [0, 100].
"""
if self.calibration_fpath.exists():
with open(self.calibration_fpath) as f:
calibration = json.load(f)
else:
# TODO(rcadene): display a warning in __init__ if calibration file not available
logging.info(f"Missing calibration file '{self.calibration_fpath}'")
calibration = run_arm_calibration(self.arm, self.robot_type, self.name, "leader")
raise NotImplementedError # TODO(aliberts): adapt code below (copied from koch)
logger.info(f"\nRunning calibration of {self}")
self.arm.disable_torque()
for name in self.arm.names:
self.arm.write("Operating_Mode", name, OperatingMode.EXTENDED_POSITION.value)
logging.info(f"Calibration is done! Saving calibration file '{self.calibration_fpath}'")
self.calibration_fpath.parent.mkdir(parents=True, exist_ok=True)
with open(self.calibration_fpath, "w") as f:
json.dump(calibration, f)
self.arm.write("Drive_Mode", "elbow_flex", DriveMode.INVERTED.value)
drive_modes = {name: 1 if name == "elbow_flex" else 0 for name in self.arm.names}
self.arm.set_calibration(calibration)
input("Move robot to the middle of its range of motion and press ENTER....")
homing_offsets = self.arm.set_half_turn_homings()
def get_action(self) -> np.ndarray:
"""The returned action does not have a batch dimension."""
# Read arm position
before_read_t = time.perf_counter()
full_turn_motors = ["shoulder_pan", "wrist_roll"]
unknown_range_motors = [name for name in self.arm.names if name not in full_turn_motors]
logger.info(
f"Move all joints except {full_turn_motors} sequentially through their "
"entire ranges of motion.\nRecording positions. Press ENTER to stop..."
)
range_mins, range_maxes = self.arm.record_ranges_of_motion(unknown_range_motors)
for name in full_turn_motors:
range_mins[name] = 0
range_maxes[name] = 4095
self.calibration = {}
for name, motor in self.arm.motors.items():
self.calibration[name] = MotorCalibration(
id=motor.id,
drive_mode=drive_modes[name],
homing_offset=homing_offsets[name],
range_min=range_mins[name],
range_max=range_maxes[name],
)
self.arm.write_calibration(self.calibration)
self._save_calibration()
logger.info(f"Calibration saved to {self.calibration_fpath}")
def configure(self) -> None:
self.arm.disable_torque()
self.arm.configure_motors()
# Set secondary/shadow ID for shoulder and elbow. These joints have two motors.
# As a result, if only one of them is required to move to a certain position,
# the other will follow. This is to avoid breaking the motors.
self.arm.write("Secondary_ID", "shoulder_shadow", 2)
self.arm.write("Secondary_ID", "elbow_shadow", 4)
def get_action(self) -> dict[str, float]:
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
start = time.perf_counter()
action = self.arm.read("Present_Position")
self.logs["read_pos_dt_s"] = time.perf_counter() - before_read_t
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read action: {dt_ms:.1f}ms")
return action
def send_feedback(self, feedback: np.ndarray) -> None:
# TODO(rcadene, aliberts): Implement force feedback
pass
def send_feedback(self, feedback: dict[str, float]) -> None:
raise NotImplementedError
def disconnect(self) -> None:
if not self.is_connected:
raise DeviceNotConnectedError(
"ManipulatorRobot is not connected. You need to run `robot.connect()` before disconnecting."
)
raise DeviceNotConnectedError(f"{self} is not connected.")
self.arm.disconnect()
self.is_connected = False
logger.info(f"{self} disconnected.")