Update glove

This commit is contained in:
Simon Alibert
2025-05-15 11:12:04 +02:00
parent 5d322826c7
commit 2a0ab40776
2 changed files with 123 additions and 202 deletions

View File

@@ -24,7 +24,6 @@ from ..config import TeleoperatorConfig
class HomonculusGloveConfig(TeleoperatorConfig): class HomonculusGloveConfig(TeleoperatorConfig):
port: str # Port to connect to the glove port: str # Port to connect to the glove
baud_rate: int = 115_200 baud_rate: int = 115_200
buffer_size: int = 10 # Number of past values to keep in memory
@TeleoperatorConfig.register_subclass("homonculus_arm") @TeleoperatorConfig.register_subclass("homonculus_arm")
@@ -32,4 +31,3 @@ class HomonculusGloveConfig(TeleoperatorConfig):
class HomonculusArmConfig(TeleoperatorConfig): class HomonculusArmConfig(TeleoperatorConfig):
port: str # Port to connect to the arm port: str # Port to connect to the arm
baud_rate: int = 115_200 baud_rate: int = 115_200
buffer_size: int = 10 # Number of past values to keep in memory

View File

@@ -15,17 +15,14 @@
# limitations under the License. # limitations under the License.
import logging import logging
import os
import pickle # nosec
import threading import threading
import time import time
from collections import deque
from enum import Enum from enum import Enum
import numpy as np
import serial import serial
from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
from lerobot.common.motors import MotorCalibration
from ..teleoperator import Teleoperator from ..teleoperator import Teleoperator
from .config_homonculus import HomonculusGloveConfig from .config_homonculus import HomonculusGloveConfig
@@ -44,6 +41,10 @@ class CalibrationMode(Enum):
class HomonculusGlove(Teleoperator): class HomonculusGlove(Teleoperator):
"""
HomonculusGlove designed by Hugging Face.
"""
config_class = HomonculusGloveConfig config_class = HomonculusGloveConfig
name = "homonculus_glove" name = "homonculus_glove"
@@ -51,7 +52,6 @@ class HomonculusGlove(Teleoperator):
super().__init__(config) super().__init__(config)
self.config = config self.config = config
self.serial = serial.Serial(config.port, config.baud_rate, timeout=1) self.serial = serial.Serial(config.port, config.baud_rate, timeout=1)
self.buffer_size = config.buffer_size
self.joints = [ self.joints = [
"thumb_0", "thumb_0",
@@ -70,22 +70,17 @@ class HomonculusGlove(Teleoperator):
"pinky_0", "pinky_0",
"pinky_1", "pinky_1",
"pinky_2", "pinky_2",
"battery_voltage", # TODO(aliberts): Should this be in joints?
] ]
# Initialize a buffer (deque) for each joint self._state = dict.fromkeys(self.joints, 100)
self.joints_buffer = {joint: deque(maxlen=self.buffer_size) for joint in self.joints} self.thread = threading.Thread(target=self._async_read, daemon=True, name=f"{self} _async_read")
# Last read dictionary self._lock = threading.Lock()
self.last_d = dict.fromkeys(self.joints, 100)
self.calibration = None
self.thread = threading.Thread(target=self.async_read, daemon=True)
@property @property
def action_feature(self) -> dict: def action_features(self) -> dict:
return {f"{joint}.pos": float for joint in self.joints} return {f"{joint}.pos": float for joint in self.joints}
@property @property
def feedback_feature(self) -> dict: def feedback_features(self) -> dict:
return {} return {}
@property @property
@@ -99,183 +94,74 @@ class HomonculusGlove(Teleoperator):
if not self.serial.is_open: if not self.serial.is_open:
self.serial.open() self.serial.open()
self.thread.start() self.thread.start()
self.configure()
logger.info(f"{self} connected.") logger.info(f"{self} connected.")
@property @property
def is_calibrated(self) -> bool: def is_calibrated(self) -> bool:
raise NotImplementedError # TODO return self.calibration_fpath.is_file()
def calibrate(self) -> None: def calibrate(self) -> None:
raise NotImplementedError # TODO input("\nMove hand to open position. Press Enter to continue...")
open_pos_start = self._read(normalize=False)
def configure(self) -> None: open_pos_mins = open_pos_start.copy()
raise NotImplementedError # TODO open_pos_maxes = open_pos_start.copy()
def get_action(self) -> dict[str, float]:
raise NotImplementedError # TODO
def send_feedback(self, feedback: dict[str, float]) -> None:
raise NotImplementedError
def disconnect(self) -> None:
if not self.is_connected:
DeviceNotConnectedError(f"{self} is not connected.")
self.thread.join()
self.serial.close()
logger.info(f"{self} disconnected.")
### WIP below ###
@property
def joint_names(self):
return list(self.last_d.keys())
def read(self, motor_names: list[str] | None = None):
"""
Return the most recent (single) values from self.last_d,
optionally applying calibration.
"""
if motor_names is None:
motor_names = self.joint_names
# Get raw (last) values
values = np.array([self.last_d[k] for k in motor_names])
# Apply calibration if available
if self.calibration is not None:
values = self.apply_calibration(values, motor_names)
print(values)
return values
def read_running_average(self, motor_names: list[str] | None = None, linearize=False):
"""
Return the AVERAGE of the most recent self.buffer_size (or fewer, if not enough data) readings
for each joint, optionally applying calibration.
"""
if motor_names is None:
motor_names = self.joint_names
# Gather averaged readings from buffers
smoothed_vals = []
for name in motor_names:
buf = self.joints_buffer[name]
if len(buf) == 0:
# If no data has been read yet, fall back to last_d
smoothed_vals.append(self.last_d[name])
else:
# Otherwise, average over the existing buffer
smoothed_vals.append(np.mean(buf))
smoothed_vals = np.array(smoothed_vals, dtype=np.float32)
# Apply calibration if available
if self.calibration is not None:
smoothed_vals = self.apply_calibration(smoothed_vals, motor_names)
return smoothed_vals
def async_read(self):
"""
Continuously read from the serial buffer in its own thread,
store into `self.last_d` and also append to the rolling buffer (joint_buffer).
"""
while True:
if self.serial.in_waiting > 0:
self.serial.flush()
vals = self.serial.readline().decode("utf-8").strip()
vals = vals.split(" ")
if len(vals) != 17:
continue
self.last_d = {
joint: int(val) for joint, val in zip(self.joints, self.joint_names, strict=True)
}
# Also push these new values into the rolling buffers
for joint, val in self.last_d.items():
self.joints_buffer[joint].append(val)
def run_calibration(self):
print("\nMove hand to open position")
input("Press Enter to continue...")
open_pos_list = []
for _ in range(100): for _ in range(100):
open_pos = self.read() positions = self._read(normalize=False)
open_pos_list.append(open_pos) open_pos_mins = {joint: min(positions[joint], min_) for joint, min_ in open_pos_mins.items()}
open_pos_maxes = {joint: max(positions[joint], max_) for joint, max_ in open_pos_maxes.items()}
time.sleep(0.01) time.sleep(0.01)
open_pos = np.array(open_pos_list)
max_open_pos = open_pos.max(axis=0)
min_open_pos = open_pos.min(axis=0)
print(f"{max_open_pos=}") print(f"{open_pos_mins=}")
print(f"{min_open_pos=}") print(f"{open_pos_maxes=}")
print("\nMove hand to closed position") print("\nMove hand to closed position")
input("Press Enter to continue...") input("Press Enter to continue...")
closed_pos_list = [] closed_pos_start = self._read(normalize=False)
closed_pos_mins = closed_pos_start.copy()
closed_pos_maxes = closed_pos_start.copy()
for _ in range(100): for _ in range(100):
closed_pos = self.read() positions = self._read(normalize=False)
closed_pos_list.append(closed_pos) closed_pos_mins = {joint: min(positions[joint], min_) for joint, min_ in closed_pos_mins.items()}
closed_pos_maxes = {
joint: max(positions[joint], max_) for joint, max_ in closed_pos_maxes.items()
}
time.sleep(0.01) time.sleep(0.01)
closed_pos = np.array(closed_pos_list)
max_closed_pos = closed_pos.max(axis=0)
closed_pos[closed_pos < 1000] = 60000
min_closed_pos = closed_pos.min(axis=0)
print(f"{max_closed_pos=}") open_pos = {joint: min(open_pos_maxes[joint], closed_pos_maxes[joint]) for joint in self.joints}
print(f"{min_closed_pos=}") closed_pos = {joint: min(closed_pos_maxes[joint], closed_pos_maxes[joint]) for joint in self.joints}
open_pos = np.array([max_open_pos, max_closed_pos]).max(axis=0) # Due to magnet placement on the following joints, we need to inverse their min/max
closed_pos = np.array([min_open_pos, min_closed_pos]).min(axis=0) inverted_joints = [
"thumb_0",
"thumb_3",
"index_2",
"middle_2",
"ring_2",
"pinky_2",
"index_0",
]
for joint in inverted_joints:
tmp_pos = open_pos[joint]
open_pos[joint] = closed_pos[joint]
closed_pos[joint] = tmp_pos
# INVERSION self.calibration = {}
for i, jname in enumerate(self.joint_names): for id_, joint in enumerate(self.joints):
if jname in [ self.calibration[joint] = MotorCalibration(
"thumb_0", id=id_,
"thumb_3", drive_mode=1 if joint in inverted_joints else 0,
"index_2", homing_offset=0,
"middle_2", range_min=open_pos[joint],
"ring_2", range_max=closed_pos[joint],
"pinky_2", )
"index_0",
]:
tmp_pos = open_pos[i]
open_pos[i] = closed_pos[i]
closed_pos[i] = tmp_pos
print() self._save_calibration()
print(f"{open_pos=}") print("Calibration saved to", self.calibration_fpath)
print(f"{closed_pos=}")
homing_offset = [0] * len(self.joint_names) def configure(self) -> None:
drive_mode = [0] * len(self.joint_names) pass
calib_modes = [CalibrationMode.LINEAR.name] * len(self.joint_names)
calib_dict = { def _normalize(self, values: dict[str, int], joints: list[str] | None):
"homing_offset": homing_offset,
"drive_mode": drive_mode,
"start_pos": open_pos,
"end_pos": closed_pos,
"calib_mode": calib_modes,
"motor_names": self.joint_names,
}
file_path = "examples/hopejr/settings/hand_calib.pkl"
if not os.path.exists(file_path):
with open(file_path, "wb") as f:
pickle.dump(calib_dict, f) # TODO(aliberts): use json
print(f"Dictionary saved to {file_path}")
# return calib_dict
self.set_calibration(calib_dict)
def set_calibration(self, calibration: dict[str, list]):
self.calibration = calibration
def apply_calibration(self, values: np.ndarray | list, motor_names: list[str] | None):
"""Convert from unsigned int32 joint position range [0, 2**32[ to the universal float32 nominal degree range ]-180.0, 180.0[ with """Convert from unsigned int32 joint position range [0, 2**32[ to the universal float32 nominal degree range ]-180.0, 180.0[ with
a "zero position" at 0 degree. a "zero position" at 0 degree.
@@ -289,37 +175,74 @@ class HomonculusGlove(Teleoperator):
To harmonize between motors of the same model, different robots, or even models of different brands, we propose to work To harmonize between motors of the same model, different robots, or even models of different brands, we propose to work
in the centered nominal degree range ]-180, 180[. in the centered nominal degree range ]-180, 180[.
""" """
if motor_names is None: # if joints is None:
motor_names = self.motor_names # joints = self.motor_names
# Convert from unsigned int32 original range [0, 2**32] to signed float32 range # # Convert from unsigned int32 original range [0, 2**32] to signed float32 range
values = values.astype(np.float32) # values = values.astype(np.float32)
for i, name in enumerate(motor_names): if not self.calibration:
calib_idx = self.calibration["motor_names"].index(name) raise RuntimeError(f"{self} has no calibration registered.")
calib_mode = self.calibration["calib_mode"][calib_idx]
if CalibrationMode[calib_mode] == CalibrationMode.LINEAR: normalized_values = {}
start_pos = self.calibration["start_pos"][calib_idx] for joint, val in values.items():
end_pos = self.calibration["end_pos"][calib_idx] min_ = self.calibration[joint].range_min
max_ = self.calibration[joint].range_max
bounded_val = min(max_, max(min_, val))
# Rescale the present position to a nominal range [0, 100] %, normalized_values[joint] = ((bounded_val - min_) / (max_ - min_)) * 100
# useful for joints with linear motions like Aloha gripper
values[i] = (values[i] - start_pos) / (end_pos - start_pos) * 100
if (values[i] < LOWER_BOUND_LINEAR) or (values[i] > UPPER_BOUND_LINEAR): return normalized_values
if name == "pinky_1" and (values[i] < LOWER_BOUND_LINEAR):
values[i] = end_pos
else:
msg = (
f"Wrong motor position range detected for {name}. "
f"Expected to be in nominal range of [0, 100] % (a full linear translation), "
f"with a maximum range of [{LOWER_BOUND_LINEAR}, {UPPER_BOUND_LINEAR}] % to account for some imprecision during calibration, "
f"but present value is {values[i]} %. "
"This might be due to a cable connection issue creating an artificial jump in motor values. "
"You need to recalibrate by running: `python lerobot/scripts/control_robot.py calibrate`"
)
print(msg)
# raise JointOutOfRangeError(msg)
return values def _read(self, joints: list[str] | None = None, normalize: bool = True) -> dict[str, int | float]:
"""
Return the most recent (single) values from self.last_d,
optionally applying calibration.
"""
with self._lock:
state = self._state
if joints is not None:
state = {k: v for k, v in state.items() if k in joints}
if normalize:
state = self._normalize(state, joints)
return state
def _async_read(self):
"""
Continuously read from the serial buffer in its own thread and sends values to the main thread through
a queue.
"""
while True:
if self.serial.in_waiting > 0:
self.serial.flush()
positions = self.serial.readline().decode("utf-8").strip().split(" ")
if len(positions) != len(self.joints):
continue
try:
joint_positions = {
joint: int(pos) for joint, pos in zip(self.joints, positions, strict=True)
}
except ValueError:
continue
with self._lock:
self._state = joint_positions
def get_action(self) -> dict[str, float]:
joint_positions = self._read()
return {f"{joint}.pos": pos for joint, pos in joint_positions.items()}
def send_feedback(self, feedback: dict[str, float]) -> None:
raise NotImplementedError
def disconnect(self) -> None:
if not self.is_connected:
DeviceNotConnectedError(f"{self} is not connected.")
self.thread.join()
self.serial.close()
logger.info(f"{self} disconnected.")