diff --git a/lerobot/common/teleoperators/homonculus/homonculus_glove.py b/lerobot/common/teleoperators/homonculus/homonculus_glove.py index ece34bf2..1e47b568 100644 --- a/lerobot/common/teleoperators/homonculus/homonculus_glove.py +++ b/lerobot/common/teleoperators/homonculus/homonculus_glove.py @@ -18,11 +18,13 @@ import logging import threading import time from enum import Enum +from pprint import pformat import serial from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError from lerobot.common.motors import MotorCalibration +from lerobot.common.utils.utils import enter_pressed, move_cursor_up from ..teleoperator import Teleoperator from .config_homonculus import HomonculusGloveConfig @@ -71,7 +73,7 @@ class HomonculusGlove(Teleoperator): "pinky_1", "pinky_2", ] - self._state = dict.fromkeys(self.joints, 100) + # self._state = dict.fromkeys(self.joints, 100) self.thread = threading.Thread(target=self._async_read, daemon=True, name=f"{self} _async_read") self._lock = threading.Lock() @@ -94,6 +96,7 @@ class HomonculusGlove(Teleoperator): if not self.serial.is_open: self.serial.open() self.thread.start() + time.sleep(0.5) # gives time for the thread to ramp up logger.info(f"{self} connected.") @property @@ -101,49 +104,30 @@ class HomonculusGlove(Teleoperator): return self.calibration_fpath.is_file() def calibrate(self) -> None: - input("\nMove hand to open position. Press Enter to continue...") - open_pos_start = self._read(normalize=False) - open_pos_mins = open_pos_start.copy() - open_pos_maxes = open_pos_start.copy() - for _ in range(100): - positions = self._read(normalize=False) - open_pos_mins = {joint: min(positions[joint], min_) for joint, min_ in open_pos_mins.items()} - open_pos_maxes = {joint: max(positions[joint], max_) for joint, max_ in open_pos_maxes.items()} - time.sleep(0.01) + range_mins, range_maxes = {}, {} + for finger in ["thumb", "index", "middle", "ring", "pinky"]: + print( + f"\nMove {finger} through its entire range of motion." + "\nRecording positions. Press ENTER to stop..." + ) + finger_joints = [joint for joint in self.joints if joint.startswith(finger)] + finger_mins, finger_maxes = self._record_ranges_of_motion(finger_joints) + range_mins.update(finger_mins) + range_maxes.update(finger_maxes) - print(f"{open_pos_mins=}") - print(f"{open_pos_maxes=}") - - print("\nMove hand to closed position") - input("Press Enter to continue...") - closed_pos_start = self._read(normalize=False) - closed_pos_mins = closed_pos_start.copy() - closed_pos_maxes = closed_pos_start.copy() - for _ in range(100): - positions = self._read(normalize=False) - closed_pos_mins = {joint: min(positions[joint], min_) for joint, min_ in closed_pos_mins.items()} - closed_pos_maxes = { - joint: max(positions[joint], max_) for joint, max_ in closed_pos_maxes.items() - } - time.sleep(0.01) - - open_pos = {joint: min(open_pos_maxes[joint], closed_pos_maxes[joint]) for joint in self.joints} - closed_pos = {joint: min(closed_pos_maxes[joint], closed_pos_maxes[joint]) for joint in self.joints} - - # Due to magnet placement on the following joints, we need to inverse their min/max inverted_joints = [ "thumb_0", "thumb_3", + "index_0", "index_2", "middle_2", "ring_2", "pinky_2", - "index_0", ] - for joint in inverted_joints: - tmp_pos = open_pos[joint] - open_pos[joint] = closed_pos[joint] - closed_pos[joint] = tmp_pos + # for joint in inverted_joints: + # tmp_pos = range_mins[joint] + # range_mins[joint] = range_maxes[joint] + # range_maxes[joint] = tmp_pos self.calibration = {} for id_, joint in enumerate(self.joints): @@ -151,17 +135,68 @@ class HomonculusGlove(Teleoperator): id=id_, drive_mode=1 if joint in inverted_joints else 0, homing_offset=0, - range_min=open_pos[joint], - range_max=closed_pos[joint], + range_min=range_mins[joint], + range_max=range_maxes[joint], ) self._save_calibration() print("Calibration saved to", self.calibration_fpath) + def _record_ranges_of_motion( + self, joints: list[str] | None = None, display_values: bool = True + ) -> tuple[dict[str, int], dict[str, int]]: + """Interactively record the min/max encoder values of each joint. + + Move the fingers while the method streams live positions. Press :kbd:`Enter` to finish. + + Args: + joints (list[str] | None, optional): Joints to record. Defaults to every joint (`None`). + display_values (bool, optional): When `True` (default) a live table is printed to the console. + + Raises: + TypeError: _description_ + + Returns: + tuple[dict[str, int], dict[str, int]]: Two dictionaries *mins* and *maxes* with the extreme values + observed for each joint. + """ + if joints is None: + joints = list(self.joints) + elif not isinstance(joints, list): + raise TypeError(joints) + + start_positions = self._read(joints, normalize=False) + mins = start_positions.copy() + maxes = start_positions.copy() + while True: + positions = self._read(joints, normalize=False) + mins = {joint: min(positions[joint], min_) for joint, min_ in mins.items()} + maxes = {joint: max(positions[joint], max_) for joint, max_ in maxes.items()} + + if display_values: + print("\n-------------------------------------------") + print(f"{'NAME':<15} | {'MIN':>6} | {'POS':>6} | {'MAX':>6}") + for joint in joints: + print(f"{joint:<15} | {mins[joint]:>6} | {positions[joint]:>6} | {maxes[joint]:>6}") + + if enter_pressed(): + break + + if display_values: + # Move cursor up to overwrite the previous output + move_cursor_up(len(joints) + 3) + + same_min_max = [joint for joint in joints if mins[joint] == maxes[joint]] + if same_min_max: + raise ValueError(f"Some joints have the same min and max values:\n{pformat(same_min_max)}") + + # TODO(Steven, aliberts): add check to ensure mins and maxes are different + return mins, maxes + def configure(self) -> None: pass - def _normalize(self, values: dict[str, int], joints: list[str] | None): + def _normalize(self, values: dict[str, int], joints: list[str] | None = None): """Convert from unsigned int32 joint position range [0, 2**32[ to the universal float32 nominal degree range ]-180.0, 180.0[ with a "zero position" at 0 degree. @@ -243,6 +278,6 @@ class HomonculusGlove(Teleoperator): if not self.is_connected: DeviceNotConnectedError(f"{self} is not connected.") - self.thread.join() + self.thread.join(timeout=0.5) self.serial.close() logger.info(f"{self} disconnected.")