From 2a0ab407765d981833fde34278a0360a8884f34c Mon Sep 17 00:00:00 2001 From: Simon Alibert Date: Thu, 15 May 2025 11:12:04 +0200 Subject: [PATCH] Update glove --- .../homonculus/config_homonculus.py | 2 - .../homonculus/homonculus_glove.py | 323 +++++++----------- 2 files changed, 123 insertions(+), 202 deletions(-) diff --git a/lerobot/common/teleoperators/homonculus/config_homonculus.py b/lerobot/common/teleoperators/homonculus/config_homonculus.py index 55c0e1ed..be511325 100644 --- a/lerobot/common/teleoperators/homonculus/config_homonculus.py +++ b/lerobot/common/teleoperators/homonculus/config_homonculus.py @@ -24,7 +24,6 @@ from ..config import TeleoperatorConfig class HomonculusGloveConfig(TeleoperatorConfig): port: str # Port to connect to the glove baud_rate: int = 115_200 - buffer_size: int = 10 # Number of past values to keep in memory @TeleoperatorConfig.register_subclass("homonculus_arm") @@ -32,4 +31,3 @@ class HomonculusGloveConfig(TeleoperatorConfig): class HomonculusArmConfig(TeleoperatorConfig): port: str # Port to connect to the arm baud_rate: int = 115_200 - buffer_size: int = 10 # Number of past values to keep in memory diff --git a/lerobot/common/teleoperators/homonculus/homonculus_glove.py b/lerobot/common/teleoperators/homonculus/homonculus_glove.py index 4af7a57e..ece34bf2 100644 --- a/lerobot/common/teleoperators/homonculus/homonculus_glove.py +++ b/lerobot/common/teleoperators/homonculus/homonculus_glove.py @@ -15,17 +15,14 @@ # limitations under the License. import logging -import os -import pickle # nosec import threading import time -from collections import deque from enum import Enum -import numpy as np import serial from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError +from lerobot.common.motors import MotorCalibration from ..teleoperator import Teleoperator from .config_homonculus import HomonculusGloveConfig @@ -44,6 +41,10 @@ class CalibrationMode(Enum): class HomonculusGlove(Teleoperator): + """ + HomonculusGlove designed by Hugging Face. + """ + config_class = HomonculusGloveConfig name = "homonculus_glove" @@ -51,7 +52,6 @@ class HomonculusGlove(Teleoperator): super().__init__(config) self.config = config self.serial = serial.Serial(config.port, config.baud_rate, timeout=1) - self.buffer_size = config.buffer_size self.joints = [ "thumb_0", @@ -70,22 +70,17 @@ class HomonculusGlove(Teleoperator): "pinky_0", "pinky_1", "pinky_2", - "battery_voltage", # TODO(aliberts): Should this be in joints? ] - # Initialize a buffer (deque) for each joint - self.joints_buffer = {joint: deque(maxlen=self.buffer_size) for joint in self.joints} - # Last read dictionary - self.last_d = dict.fromkeys(self.joints, 100) - - self.calibration = None - self.thread = threading.Thread(target=self.async_read, daemon=True) + self._state = dict.fromkeys(self.joints, 100) + self.thread = threading.Thread(target=self._async_read, daemon=True, name=f"{self} _async_read") + self._lock = threading.Lock() @property - def action_feature(self) -> dict: + def action_features(self) -> dict: return {f"{joint}.pos": float for joint in self.joints} @property - def feedback_feature(self) -> dict: + def feedback_features(self) -> dict: return {} @property @@ -99,183 +94,74 @@ class HomonculusGlove(Teleoperator): if not self.serial.is_open: self.serial.open() self.thread.start() - self.configure() logger.info(f"{self} connected.") @property def is_calibrated(self) -> bool: - raise NotImplementedError # TODO + return self.calibration_fpath.is_file() def calibrate(self) -> None: - raise NotImplementedError # TODO - - def configure(self) -> None: - raise NotImplementedError # TODO - - def get_action(self) -> dict[str, float]: - raise NotImplementedError # TODO - - def send_feedback(self, feedback: dict[str, float]) -> None: - raise NotImplementedError - - def disconnect(self) -> None: - if not self.is_connected: - DeviceNotConnectedError(f"{self} is not connected.") - - self.thread.join() - self.serial.close() - logger.info(f"{self} disconnected.") - - ### WIP below ### - - @property - def joint_names(self): - return list(self.last_d.keys()) - - def read(self, motor_names: list[str] | None = None): - """ - Return the most recent (single) values from self.last_d, - optionally applying calibration. - """ - if motor_names is None: - motor_names = self.joint_names - - # Get raw (last) values - values = np.array([self.last_d[k] for k in motor_names]) - - # Apply calibration if available - if self.calibration is not None: - values = self.apply_calibration(values, motor_names) - print(values) - return values - - def read_running_average(self, motor_names: list[str] | None = None, linearize=False): - """ - Return the AVERAGE of the most recent self.buffer_size (or fewer, if not enough data) readings - for each joint, optionally applying calibration. - """ - if motor_names is None: - motor_names = self.joint_names - - # Gather averaged readings from buffers - smoothed_vals = [] - for name in motor_names: - buf = self.joints_buffer[name] - if len(buf) == 0: - # If no data has been read yet, fall back to last_d - smoothed_vals.append(self.last_d[name]) - else: - # Otherwise, average over the existing buffer - smoothed_vals.append(np.mean(buf)) - - smoothed_vals = np.array(smoothed_vals, dtype=np.float32) - - # Apply calibration if available - if self.calibration is not None: - smoothed_vals = self.apply_calibration(smoothed_vals, motor_names) - - return smoothed_vals - - def async_read(self): - """ - Continuously read from the serial buffer in its own thread, - store into `self.last_d` and also append to the rolling buffer (joint_buffer). - """ - while True: - if self.serial.in_waiting > 0: - self.serial.flush() - vals = self.serial.readline().decode("utf-8").strip() - vals = vals.split(" ") - if len(vals) != 17: - continue - - self.last_d = { - joint: int(val) for joint, val in zip(self.joints, self.joint_names, strict=True) - } - - # Also push these new values into the rolling buffers - for joint, val in self.last_d.items(): - self.joints_buffer[joint].append(val) - - def run_calibration(self): - print("\nMove hand to open position") - input("Press Enter to continue...") - open_pos_list = [] + input("\nMove hand to open position. Press Enter to continue...") + open_pos_start = self._read(normalize=False) + open_pos_mins = open_pos_start.copy() + open_pos_maxes = open_pos_start.copy() for _ in range(100): - open_pos = self.read() - open_pos_list.append(open_pos) + positions = self._read(normalize=False) + open_pos_mins = {joint: min(positions[joint], min_) for joint, min_ in open_pos_mins.items()} + open_pos_maxes = {joint: max(positions[joint], max_) for joint, max_ in open_pos_maxes.items()} time.sleep(0.01) - open_pos = np.array(open_pos_list) - max_open_pos = open_pos.max(axis=0) - min_open_pos = open_pos.min(axis=0) - print(f"{max_open_pos=}") - print(f"{min_open_pos=}") + print(f"{open_pos_mins=}") + print(f"{open_pos_maxes=}") print("\nMove hand to closed position") input("Press Enter to continue...") - closed_pos_list = [] + closed_pos_start = self._read(normalize=False) + closed_pos_mins = closed_pos_start.copy() + closed_pos_maxes = closed_pos_start.copy() for _ in range(100): - closed_pos = self.read() - closed_pos_list.append(closed_pos) + positions = self._read(normalize=False) + closed_pos_mins = {joint: min(positions[joint], min_) for joint, min_ in closed_pos_mins.items()} + closed_pos_maxes = { + joint: max(positions[joint], max_) for joint, max_ in closed_pos_maxes.items() + } time.sleep(0.01) - closed_pos = np.array(closed_pos_list) - max_closed_pos = closed_pos.max(axis=0) - closed_pos[closed_pos < 1000] = 60000 - min_closed_pos = closed_pos.min(axis=0) - print(f"{max_closed_pos=}") - print(f"{min_closed_pos=}") + open_pos = {joint: min(open_pos_maxes[joint], closed_pos_maxes[joint]) for joint in self.joints} + closed_pos = {joint: min(closed_pos_maxes[joint], closed_pos_maxes[joint]) for joint in self.joints} - open_pos = np.array([max_open_pos, max_closed_pos]).max(axis=0) - closed_pos = np.array([min_open_pos, min_closed_pos]).min(axis=0) + # Due to magnet placement on the following joints, we need to inverse their min/max + inverted_joints = [ + "thumb_0", + "thumb_3", + "index_2", + "middle_2", + "ring_2", + "pinky_2", + "index_0", + ] + for joint in inverted_joints: + tmp_pos = open_pos[joint] + open_pos[joint] = closed_pos[joint] + closed_pos[joint] = tmp_pos - # INVERSION - for i, jname in enumerate(self.joint_names): - if jname in [ - "thumb_0", - "thumb_3", - "index_2", - "middle_2", - "ring_2", - "pinky_2", - "index_0", - ]: - tmp_pos = open_pos[i] - open_pos[i] = closed_pos[i] - closed_pos[i] = tmp_pos + self.calibration = {} + for id_, joint in enumerate(self.joints): + self.calibration[joint] = MotorCalibration( + id=id_, + drive_mode=1 if joint in inverted_joints else 0, + homing_offset=0, + range_min=open_pos[joint], + range_max=closed_pos[joint], + ) - print() - print(f"{open_pos=}") - print(f"{closed_pos=}") + self._save_calibration() + print("Calibration saved to", self.calibration_fpath) - homing_offset = [0] * len(self.joint_names) - drive_mode = [0] * len(self.joint_names) - calib_modes = [CalibrationMode.LINEAR.name] * len(self.joint_names) + def configure(self) -> None: + pass - calib_dict = { - "homing_offset": homing_offset, - "drive_mode": drive_mode, - "start_pos": open_pos, - "end_pos": closed_pos, - "calib_mode": calib_modes, - "motor_names": self.joint_names, - } - - file_path = "examples/hopejr/settings/hand_calib.pkl" - - if not os.path.exists(file_path): - with open(file_path, "wb") as f: - pickle.dump(calib_dict, f) # TODO(aliberts): use json - print(f"Dictionary saved to {file_path}") - - # return calib_dict - self.set_calibration(calib_dict) - - def set_calibration(self, calibration: dict[str, list]): - self.calibration = calibration - - def apply_calibration(self, values: np.ndarray | list, motor_names: list[str] | None): + def _normalize(self, values: dict[str, int], joints: list[str] | None): """Convert from unsigned int32 joint position range [0, 2**32[ to the universal float32 nominal degree range ]-180.0, 180.0[ with a "zero position" at 0 degree. @@ -289,37 +175,74 @@ class HomonculusGlove(Teleoperator): To harmonize between motors of the same model, different robots, or even models of different brands, we propose to work in the centered nominal degree range ]-180, 180[. """ - if motor_names is None: - motor_names = self.motor_names + # if joints is None: + # joints = self.motor_names - # Convert from unsigned int32 original range [0, 2**32] to signed float32 range - values = values.astype(np.float32) + # # Convert from unsigned int32 original range [0, 2**32] to signed float32 range + # values = values.astype(np.float32) - for i, name in enumerate(motor_names): - calib_idx = self.calibration["motor_names"].index(name) - calib_mode = self.calibration["calib_mode"][calib_idx] + if not self.calibration: + raise RuntimeError(f"{self} has no calibration registered.") - if CalibrationMode[calib_mode] == CalibrationMode.LINEAR: - start_pos = self.calibration["start_pos"][calib_idx] - end_pos = self.calibration["end_pos"][calib_idx] + normalized_values = {} + for joint, val in values.items(): + min_ = self.calibration[joint].range_min + max_ = self.calibration[joint].range_max + bounded_val = min(max_, max(min_, val)) - # Rescale the present position to a nominal range [0, 100] %, - # useful for joints with linear motions like Aloha gripper - values[i] = (values[i] - start_pos) / (end_pos - start_pos) * 100 + normalized_values[joint] = ((bounded_val - min_) / (max_ - min_)) * 100 - if (values[i] < LOWER_BOUND_LINEAR) or (values[i] > UPPER_BOUND_LINEAR): - if name == "pinky_1" and (values[i] < LOWER_BOUND_LINEAR): - values[i] = end_pos - else: - msg = ( - f"Wrong motor position range detected for {name}. " - f"Expected to be in nominal range of [0, 100] % (a full linear translation), " - f"with a maximum range of [{LOWER_BOUND_LINEAR}, {UPPER_BOUND_LINEAR}] % to account for some imprecision during calibration, " - f"but present value is {values[i]} %. " - "This might be due to a cable connection issue creating an artificial jump in motor values. " - "You need to recalibrate by running: `python lerobot/scripts/control_robot.py calibrate`" - ) - print(msg) - # raise JointOutOfRangeError(msg) + return normalized_values - return values + def _read(self, joints: list[str] | None = None, normalize: bool = True) -> dict[str, int | float]: + """ + Return the most recent (single) values from self.last_d, + optionally applying calibration. + """ + with self._lock: + state = self._state + + if joints is not None: + state = {k: v for k, v in state.items() if k in joints} + + if normalize: + state = self._normalize(state, joints) + + return state + + def _async_read(self): + """ + Continuously read from the serial buffer in its own thread and sends values to the main thread through + a queue. + """ + while True: + if self.serial.in_waiting > 0: + self.serial.flush() + positions = self.serial.readline().decode("utf-8").strip().split(" ") + if len(positions) != len(self.joints): + continue + + try: + joint_positions = { + joint: int(pos) for joint, pos in zip(self.joints, positions, strict=True) + } + except ValueError: + continue + + with self._lock: + self._state = joint_positions + + def get_action(self) -> dict[str, float]: + joint_positions = self._read() + return {f"{joint}.pos": pos for joint, pos in joint_positions.items()} + + def send_feedback(self, feedback: dict[str, float]) -> None: + raise NotImplementedError + + def disconnect(self) -> None: + if not self.is_connected: + DeviceNotConnectedError(f"{self} is not connected.") + + self.thread.join() + self.serial.close() + logger.info(f"{self} disconnected.")