forked from tangger/lerobot
Fix glove calibration
This commit is contained in:
@@ -18,11 +18,13 @@ import logging
|
|||||||
import threading
|
import threading
|
||||||
import time
|
import time
|
||||||
from enum import Enum
|
from enum import Enum
|
||||||
|
from pprint import pformat
|
||||||
|
|
||||||
import serial
|
import serial
|
||||||
|
|
||||||
from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
|
from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
|
||||||
from lerobot.common.motors import MotorCalibration
|
from lerobot.common.motors import MotorCalibration
|
||||||
|
from lerobot.common.utils.utils import enter_pressed, move_cursor_up
|
||||||
|
|
||||||
from ..teleoperator import Teleoperator
|
from ..teleoperator import Teleoperator
|
||||||
from .config_homonculus import HomonculusGloveConfig
|
from .config_homonculus import HomonculusGloveConfig
|
||||||
@@ -71,7 +73,7 @@ class HomonculusGlove(Teleoperator):
|
|||||||
"pinky_1",
|
"pinky_1",
|
||||||
"pinky_2",
|
"pinky_2",
|
||||||
]
|
]
|
||||||
self._state = dict.fromkeys(self.joints, 100)
|
# self._state = dict.fromkeys(self.joints, 100)
|
||||||
self.thread = threading.Thread(target=self._async_read, daemon=True, name=f"{self} _async_read")
|
self.thread = threading.Thread(target=self._async_read, daemon=True, name=f"{self} _async_read")
|
||||||
self._lock = threading.Lock()
|
self._lock = threading.Lock()
|
||||||
|
|
||||||
@@ -94,6 +96,7 @@ class HomonculusGlove(Teleoperator):
|
|||||||
if not self.serial.is_open:
|
if not self.serial.is_open:
|
||||||
self.serial.open()
|
self.serial.open()
|
||||||
self.thread.start()
|
self.thread.start()
|
||||||
|
time.sleep(0.5) # gives time for the thread to ramp up
|
||||||
logger.info(f"{self} connected.")
|
logger.info(f"{self} connected.")
|
||||||
|
|
||||||
@property
|
@property
|
||||||
@@ -101,49 +104,30 @@ class HomonculusGlove(Teleoperator):
|
|||||||
return self.calibration_fpath.is_file()
|
return self.calibration_fpath.is_file()
|
||||||
|
|
||||||
def calibrate(self) -> None:
|
def calibrate(self) -> None:
|
||||||
input("\nMove hand to open position. Press Enter to continue...")
|
range_mins, range_maxes = {}, {}
|
||||||
open_pos_start = self._read(normalize=False)
|
for finger in ["thumb", "index", "middle", "ring", "pinky"]:
|
||||||
open_pos_mins = open_pos_start.copy()
|
print(
|
||||||
open_pos_maxes = open_pos_start.copy()
|
f"\nMove {finger} through its entire range of motion."
|
||||||
for _ in range(100):
|
"\nRecording positions. Press ENTER to stop..."
|
||||||
positions = self._read(normalize=False)
|
)
|
||||||
open_pos_mins = {joint: min(positions[joint], min_) for joint, min_ in open_pos_mins.items()}
|
finger_joints = [joint for joint in self.joints if joint.startswith(finger)]
|
||||||
open_pos_maxes = {joint: max(positions[joint], max_) for joint, max_ in open_pos_maxes.items()}
|
finger_mins, finger_maxes = self._record_ranges_of_motion(finger_joints)
|
||||||
time.sleep(0.01)
|
range_mins.update(finger_mins)
|
||||||
|
range_maxes.update(finger_maxes)
|
||||||
|
|
||||||
print(f"{open_pos_mins=}")
|
|
||||||
print(f"{open_pos_maxes=}")
|
|
||||||
|
|
||||||
print("\nMove hand to closed position")
|
|
||||||
input("Press Enter to continue...")
|
|
||||||
closed_pos_start = self._read(normalize=False)
|
|
||||||
closed_pos_mins = closed_pos_start.copy()
|
|
||||||
closed_pos_maxes = closed_pos_start.copy()
|
|
||||||
for _ in range(100):
|
|
||||||
positions = self._read(normalize=False)
|
|
||||||
closed_pos_mins = {joint: min(positions[joint], min_) for joint, min_ in closed_pos_mins.items()}
|
|
||||||
closed_pos_maxes = {
|
|
||||||
joint: max(positions[joint], max_) for joint, max_ in closed_pos_maxes.items()
|
|
||||||
}
|
|
||||||
time.sleep(0.01)
|
|
||||||
|
|
||||||
open_pos = {joint: min(open_pos_maxes[joint], closed_pos_maxes[joint]) for joint in self.joints}
|
|
||||||
closed_pos = {joint: min(closed_pos_maxes[joint], closed_pos_maxes[joint]) for joint in self.joints}
|
|
||||||
|
|
||||||
# Due to magnet placement on the following joints, we need to inverse their min/max
|
|
||||||
inverted_joints = [
|
inverted_joints = [
|
||||||
"thumb_0",
|
"thumb_0",
|
||||||
"thumb_3",
|
"thumb_3",
|
||||||
|
"index_0",
|
||||||
"index_2",
|
"index_2",
|
||||||
"middle_2",
|
"middle_2",
|
||||||
"ring_2",
|
"ring_2",
|
||||||
"pinky_2",
|
"pinky_2",
|
||||||
"index_0",
|
|
||||||
]
|
]
|
||||||
for joint in inverted_joints:
|
# for joint in inverted_joints:
|
||||||
tmp_pos = open_pos[joint]
|
# tmp_pos = range_mins[joint]
|
||||||
open_pos[joint] = closed_pos[joint]
|
# range_mins[joint] = range_maxes[joint]
|
||||||
closed_pos[joint] = tmp_pos
|
# range_maxes[joint] = tmp_pos
|
||||||
|
|
||||||
self.calibration = {}
|
self.calibration = {}
|
||||||
for id_, joint in enumerate(self.joints):
|
for id_, joint in enumerate(self.joints):
|
||||||
@@ -151,17 +135,68 @@ class HomonculusGlove(Teleoperator):
|
|||||||
id=id_,
|
id=id_,
|
||||||
drive_mode=1 if joint in inverted_joints else 0,
|
drive_mode=1 if joint in inverted_joints else 0,
|
||||||
homing_offset=0,
|
homing_offset=0,
|
||||||
range_min=open_pos[joint],
|
range_min=range_mins[joint],
|
||||||
range_max=closed_pos[joint],
|
range_max=range_maxes[joint],
|
||||||
)
|
)
|
||||||
|
|
||||||
self._save_calibration()
|
self._save_calibration()
|
||||||
print("Calibration saved to", self.calibration_fpath)
|
print("Calibration saved to", self.calibration_fpath)
|
||||||
|
|
||||||
|
def _record_ranges_of_motion(
|
||||||
|
self, joints: list[str] | None = None, display_values: bool = True
|
||||||
|
) -> tuple[dict[str, int], dict[str, int]]:
|
||||||
|
"""Interactively record the min/max encoder values of each joint.
|
||||||
|
|
||||||
|
Move the fingers while the method streams live positions. Press :kbd:`Enter` to finish.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
joints (list[str] | None, optional): Joints to record. Defaults to every joint (`None`).
|
||||||
|
display_values (bool, optional): When `True` (default) a live table is printed to the console.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
TypeError: _description_
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
tuple[dict[str, int], dict[str, int]]: Two dictionaries *mins* and *maxes* with the extreme values
|
||||||
|
observed for each joint.
|
||||||
|
"""
|
||||||
|
if joints is None:
|
||||||
|
joints = list(self.joints)
|
||||||
|
elif not isinstance(joints, list):
|
||||||
|
raise TypeError(joints)
|
||||||
|
|
||||||
|
start_positions = self._read(joints, normalize=False)
|
||||||
|
mins = start_positions.copy()
|
||||||
|
maxes = start_positions.copy()
|
||||||
|
while True:
|
||||||
|
positions = self._read(joints, normalize=False)
|
||||||
|
mins = {joint: min(positions[joint], min_) for joint, min_ in mins.items()}
|
||||||
|
maxes = {joint: max(positions[joint], max_) for joint, max_ in maxes.items()}
|
||||||
|
|
||||||
|
if display_values:
|
||||||
|
print("\n-------------------------------------------")
|
||||||
|
print(f"{'NAME':<15} | {'MIN':>6} | {'POS':>6} | {'MAX':>6}")
|
||||||
|
for joint in joints:
|
||||||
|
print(f"{joint:<15} | {mins[joint]:>6} | {positions[joint]:>6} | {maxes[joint]:>6}")
|
||||||
|
|
||||||
|
if enter_pressed():
|
||||||
|
break
|
||||||
|
|
||||||
|
if display_values:
|
||||||
|
# Move cursor up to overwrite the previous output
|
||||||
|
move_cursor_up(len(joints) + 3)
|
||||||
|
|
||||||
|
same_min_max = [joint for joint in joints if mins[joint] == maxes[joint]]
|
||||||
|
if same_min_max:
|
||||||
|
raise ValueError(f"Some joints have the same min and max values:\n{pformat(same_min_max)}")
|
||||||
|
|
||||||
|
# TODO(Steven, aliberts): add check to ensure mins and maxes are different
|
||||||
|
return mins, maxes
|
||||||
|
|
||||||
def configure(self) -> None:
|
def configure(self) -> None:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def _normalize(self, values: dict[str, int], joints: list[str] | None):
|
def _normalize(self, values: dict[str, int], joints: list[str] | None = None):
|
||||||
"""Convert from unsigned int32 joint position range [0, 2**32[ to the universal float32 nominal degree range ]-180.0, 180.0[ with
|
"""Convert from unsigned int32 joint position range [0, 2**32[ to the universal float32 nominal degree range ]-180.0, 180.0[ with
|
||||||
a "zero position" at 0 degree.
|
a "zero position" at 0 degree.
|
||||||
|
|
||||||
@@ -243,6 +278,6 @@ class HomonculusGlove(Teleoperator):
|
|||||||
if not self.is_connected:
|
if not self.is_connected:
|
||||||
DeviceNotConnectedError(f"{self} is not connected.")
|
DeviceNotConnectedError(f"{self} is not connected.")
|
||||||
|
|
||||||
self.thread.join()
|
self.thread.join(timeout=0.5)
|
||||||
self.serial.close()
|
self.serial.close()
|
||||||
logger.info(f"{self} disconnected.")
|
logger.info(f"{self} disconnected.")
|
||||||
|
|||||||
Reference in New Issue
Block a user