forked from tangger/lerobot
Compare commits
42 Commits
user/miche
...
user/alibe
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e578e0c2bb | ||
|
|
e0d3179b72 | ||
|
|
fbd636d6b0 | ||
|
|
6b4d465595 | ||
|
|
dfa96b4258 | ||
|
|
d5ebac067a | ||
|
|
c94d1c5745 | ||
|
|
b623a15a16 | ||
|
|
325f5d72f8 | ||
|
|
97e1860182 | ||
|
|
5f57289eac | ||
|
|
72d04ca3fe | ||
|
|
125d19df93 | ||
|
|
dafdbe4297 | ||
|
|
e62460f187 | ||
|
|
ef0134dc19 | ||
|
|
1742b2627b | ||
|
|
5504b5aee2 | ||
|
|
0f34b4b1bd | ||
|
|
1572e9b2aa | ||
|
|
a3c671f7dd | ||
|
|
f57015fbd9 | ||
|
|
df7b22de95 | ||
|
|
a79f096750 | ||
|
|
e9d1b5588c | ||
|
|
a5eaec857d | ||
|
|
7aef264cf8 | ||
|
|
a2fbc54382 | ||
|
|
9e8d726e8f | ||
|
|
2a0ab40776 | ||
|
|
5d322826c7 | ||
|
|
0df6a3fcef | ||
|
|
f9f3088060 | ||
|
|
30d3807afa | ||
|
|
98643b000e | ||
|
|
c4b1cee615 | ||
|
|
a0120a2446 | ||
|
|
7c5813778b | ||
|
|
27feea019a | ||
|
|
c81f293ae1 | ||
|
|
c5ab08e175 | ||
|
|
58d2ac0c0b |
53
examples/hope_jr/teleop/left_arm_teleop.py
Normal file
53
examples/hope_jr/teleop/left_arm_teleop.py
Normal file
@@ -0,0 +1,53 @@
|
||||
import time
|
||||
import traceback
|
||||
|
||||
from lerobot.common.robots.hope_jr import HopeJrArm, HopeJrArmConfig
|
||||
from lerobot.common.teleoperators.homonculus import HomonculusArm, HomonculusArmConfig
|
||||
from lerobot.common.utils.utils import move_cursor_up
|
||||
|
||||
|
||||
def make_left_arm() -> tuple[HomonculusArm, HopeJrArm]:
|
||||
left_exo_arm_cfg = HomonculusArmConfig("/dev/tty.usbmodem2101", id="left")
|
||||
left_exo_arm = HomonculusArm(left_exo_arm_cfg)
|
||||
left_arm_cfg = HopeJrArmConfig("/dev/tty.usbserial-140", id="left")
|
||||
left_arm = HopeJrArm(left_arm_cfg)
|
||||
return left_exo_arm, left_arm
|
||||
|
||||
|
||||
def main():
|
||||
left_exo_arm, left_arm = make_left_arm()
|
||||
display_len = max(len(key) for key in left_exo_arm.action_features)
|
||||
left_exo_arm.connect()
|
||||
left_arm.connect()
|
||||
|
||||
try:
|
||||
while True:
|
||||
start = time.perf_counter()
|
||||
|
||||
left_arm_action = left_exo_arm.get_action()
|
||||
left_arm_action["shoulder_pitch.pos"] = -left_arm_action["shoulder_pitch.pos"]
|
||||
left_arm_action["wrist_yaw.pos"] = -left_arm_action["wrist_yaw.pos"]
|
||||
left_arm.send_action(left_arm_action)
|
||||
|
||||
time.sleep(0.01)
|
||||
loop_s = time.perf_counter() - start
|
||||
|
||||
print("\n" + "-" * (display_len + 10))
|
||||
print(f"{'NAME':<{display_len}} | {'ARM':>7}")
|
||||
for joint, val in left_arm_action.items():
|
||||
print(f"{joint:<{display_len}} | {val:>7.2f}")
|
||||
|
||||
print(f"\ntime: {loop_s * 1e3:.2f}ms ({1 / loop_s:.0f} Hz)")
|
||||
move_cursor_up(len(left_arm_action) + 5)
|
||||
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
except Exception:
|
||||
traceback.print_exc()
|
||||
finally:
|
||||
left_exo_arm.disconnect()
|
||||
left_arm.disconnect()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
51
examples/hope_jr/teleop/left_hand_teleop.py
Normal file
51
examples/hope_jr/teleop/left_hand_teleop.py
Normal file
@@ -0,0 +1,51 @@
|
||||
import time
|
||||
import traceback
|
||||
|
||||
from lerobot.common.robots.hope_jr import HopeJrHand, HopeJrHandConfig, homonculus_glove_to_hope_jr_hand
|
||||
from lerobot.common.teleoperators.homonculus import HomonculusGlove, HomonculusGloveConfig
|
||||
from lerobot.common.utils.utils import move_cursor_up
|
||||
|
||||
|
||||
def make_left_hand() -> tuple[HomonculusGlove, HopeJrHand]:
|
||||
left_glove_cfg = HomonculusGloveConfig("/dev/tty.usbmodem1101", id="left", side="left")
|
||||
left_glove = HomonculusGlove(left_glove_cfg)
|
||||
left_hand_cfg = HopeJrHandConfig("/dev/tty.usbmodem58760433641", id="left", side="left")
|
||||
left_hand = HopeJrHand(left_hand_cfg)
|
||||
return left_glove, left_hand
|
||||
|
||||
|
||||
def main():
|
||||
left_glove, left_hand = make_left_hand()
|
||||
left_glove.connect()
|
||||
left_hand.connect()
|
||||
display_len = max(len(key) for key in left_hand.action_features)
|
||||
|
||||
try:
|
||||
while True:
|
||||
start = time.perf_counter()
|
||||
|
||||
left_glove_action = left_glove.get_action()
|
||||
left_hand_action = homonculus_glove_to_hope_jr_hand(left_glove_action)
|
||||
left_hand.send_action(left_hand_action)
|
||||
|
||||
time.sleep(0.01)
|
||||
loop_s = time.perf_counter() - start
|
||||
|
||||
print(f"\n{'NAME':<{display_len}} | {'HAND':>7}")
|
||||
for joint, val in left_hand_action.items():
|
||||
print(f"{joint:<{display_len}} | {val:>7.2f}")
|
||||
|
||||
print(f"\ntime: {loop_s * 1e3:.2f}ms ({1 / loop_s:.0f} Hz)")
|
||||
move_cursor_up(len(left_hand_action) + 4)
|
||||
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
except Exception:
|
||||
traceback.print_exc()
|
||||
finally:
|
||||
left_glove.disconnect()
|
||||
left_hand.disconnect()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
53
examples/hope_jr/teleop/right_arm_teleop.py
Normal file
53
examples/hope_jr/teleop/right_arm_teleop.py
Normal file
@@ -0,0 +1,53 @@
|
||||
import time
|
||||
import traceback
|
||||
|
||||
from lerobot.common.robots.hope_jr import HopeJrArm, HopeJrArmConfig
|
||||
from lerobot.common.teleoperators.homonculus import HomonculusArm, HomonculusArmConfig
|
||||
from lerobot.common.utils.utils import move_cursor_up
|
||||
|
||||
|
||||
def make_right_arm() -> tuple[HomonculusArm, HopeJrArm]:
|
||||
right_exo_arm_cfg = HomonculusArmConfig("/dev/tty.usbmodem2101", id="right")
|
||||
right_exo_arm = HomonculusArm(right_exo_arm_cfg)
|
||||
right_arm_cfg = HopeJrArmConfig("/dev/tty.usbserial-140", id="right")
|
||||
right_arm = HopeJrArm(right_arm_cfg)
|
||||
return right_exo_arm, right_arm
|
||||
|
||||
|
||||
def main():
|
||||
right_exo_arm, right_arm = make_right_arm()
|
||||
display_len = max(len(key) for key in right_exo_arm.action_features)
|
||||
right_exo_arm.connect()
|
||||
right_arm.connect()
|
||||
|
||||
try:
|
||||
while True:
|
||||
start = time.perf_counter()
|
||||
|
||||
right_arm_action = right_exo_arm.get_action()
|
||||
right_arm_action["shoulder_pitch.pos"] = -right_arm_action["shoulder_pitch.pos"]
|
||||
right_arm_action["wrist_yaw.pos"] = -right_arm_action["wrist_yaw.pos"]
|
||||
right_arm.send_action(right_arm_action)
|
||||
|
||||
time.sleep(0.01)
|
||||
loop_s = time.perf_counter() - start
|
||||
|
||||
print("\n" + "-" * (display_len + 10))
|
||||
print(f"{'NAME':<{display_len}} | {'ARM':>7}")
|
||||
for joint, val in right_arm_action.items():
|
||||
print(f"{joint:<{display_len}} | {val:>7.2f}")
|
||||
|
||||
print(f"\ntime: {loop_s * 1e3:.2f}ms ({1 / loop_s:.0f} Hz)")
|
||||
move_cursor_up(len(right_arm_action) + 5)
|
||||
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
except Exception:
|
||||
traceback.print_exc()
|
||||
finally:
|
||||
right_exo_arm.disconnect()
|
||||
right_arm.disconnect()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
51
examples/hope_jr/teleop/right_hand_teleop.py
Normal file
51
examples/hope_jr/teleop/right_hand_teleop.py
Normal file
@@ -0,0 +1,51 @@
|
||||
import time
|
||||
import traceback
|
||||
|
||||
from lerobot.common.robots.hope_jr import HopeJrHand, HopeJrHandConfig, homonculus_glove_to_hope_jr_hand
|
||||
from lerobot.common.teleoperators.homonculus import HomonculusGlove, HomonculusGloveConfig
|
||||
from lerobot.common.utils.utils import move_cursor_up
|
||||
|
||||
|
||||
def make_right_hand() -> tuple[HomonculusGlove, HopeJrHand]:
|
||||
right_glove_cfg = HomonculusGloveConfig("/dev/tty.usbmodem2101", id="right", side="right")
|
||||
right_glove = HomonculusGlove(right_glove_cfg)
|
||||
right_hand_cfg = HopeJrHandConfig("/dev/tty.usbserial-140", id="right", side="right")
|
||||
right_hand = HopeJrHand(right_hand_cfg)
|
||||
return right_glove, right_hand
|
||||
|
||||
|
||||
def main():
|
||||
right_glove, right_hand = make_right_hand()
|
||||
right_glove.connect()
|
||||
right_hand.connect()
|
||||
display_len = max(len(key) for key in right_hand.action_features)
|
||||
|
||||
try:
|
||||
while True:
|
||||
start = time.perf_counter()
|
||||
|
||||
right_glove_action = right_glove.get_action()
|
||||
right_hand_action = homonculus_glove_to_hope_jr_hand(right_glove_action)
|
||||
right_hand.send_action(right_hand_action)
|
||||
|
||||
time.sleep(0.01)
|
||||
loop_s = time.perf_counter() - start
|
||||
|
||||
print(f"\n{'NAME':<{display_len}} | {'HAND':>7}")
|
||||
for joint, val in right_hand_action.items():
|
||||
print(f"{joint:<{display_len}} | {val:>7.2f}")
|
||||
|
||||
print(f"\ntime: {loop_s * 1e3:.2f}ms ({1 / loop_s:.0f} Hz)")
|
||||
move_cursor_up(len(right_hand_action) + 4)
|
||||
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
except Exception:
|
||||
traceback.print_exc()
|
||||
finally:
|
||||
right_glove.disconnect()
|
||||
right_hand.disconnect()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
36
examples/hope_jr/vis/arm.py
Normal file
36
examples/hope_jr/vis/arm.py
Normal file
@@ -0,0 +1,36 @@
|
||||
import time
|
||||
import traceback
|
||||
|
||||
from lerobot.common.robots.hope_jr import HopeJrArm, HopeJrArmConfig
|
||||
from lerobot.common.utils.utils import move_cursor_up
|
||||
|
||||
# cfg = HopeJrArmConfig("/dev/tty.usbserial-140", id="right")
|
||||
cfg = HopeJrArmConfig("/dev/tty.usbserial-140", id="left")
|
||||
arm = HopeJrArm(cfg)
|
||||
display_len = max(len(key) for key in arm.action_features)
|
||||
|
||||
arm.connect()
|
||||
# arm.calibrate()
|
||||
arm.bus.disable_torque()
|
||||
try:
|
||||
while True:
|
||||
start = time.perf_counter()
|
||||
raw_obs = arm.bus.sync_read("Present_Position", normalize=False)
|
||||
norm_obs = {arm.bus.motors[name].id: val for name, val in raw_obs.items()}
|
||||
norm_obs = arm.bus._normalize(norm_obs)
|
||||
norm_obs = {arm.bus._id_to_name(id_): val for id_, val in norm_obs.items()}
|
||||
loop_s = time.perf_counter() - start
|
||||
|
||||
print("\n----------------------------------")
|
||||
print(f"{'NAME':<15} | {'RAW':>7} | {'NORM':>7}")
|
||||
for motor in arm.bus.motors:
|
||||
print(f"{motor:<15} | {raw_obs[motor]:>7} | {norm_obs[motor]:>7.2f}")
|
||||
print(f"\ntime: {loop_s * 1e3:.2f}ms ({1 / loop_s:.0f} Hz)")
|
||||
move_cursor_up(len(arm.bus.motors) + 5)
|
||||
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
except Exception as e:
|
||||
traceback.print_exc(e)
|
||||
finally:
|
||||
arm.disconnect()
|
||||
33
examples/hope_jr/vis/exo.py
Normal file
33
examples/hope_jr/vis/exo.py
Normal file
@@ -0,0 +1,33 @@
|
||||
import time
|
||||
import traceback
|
||||
|
||||
from lerobot.common.teleoperators.homonculus import HomonculusArm, HomonculusArmConfig
|
||||
from lerobot.common.utils.utils import move_cursor_up
|
||||
|
||||
# cfg = HomonculusArmConfig("/dev/tty.usbmodem2101", id="left")
|
||||
cfg = HomonculusArmConfig("/dev/tty.usbmodem2101", id="right")
|
||||
arm = HomonculusArm(cfg)
|
||||
display_len = max(len(key) for key in arm.action_features)
|
||||
|
||||
arm.connect()
|
||||
# arm.calibrate()
|
||||
try:
|
||||
while True:
|
||||
start = time.perf_counter()
|
||||
raw_action = arm._read(normalize=False)
|
||||
norm_action = arm._normalize(raw_action)
|
||||
loop_s = time.perf_counter() - start
|
||||
|
||||
print("\n" + "-" * (display_len + 10))
|
||||
print(f"{'NAME':<{display_len}} | {'RAW':>7} | {'NORM':>7}")
|
||||
for joint in arm.joints:
|
||||
print(f"{joint:<{display_len}} | {raw_action[joint]:>7} | {norm_action[joint]:>7.2f}")
|
||||
print(f"\ntime: {loop_s * 1e3:.2f}ms ({1 / loop_s:.0f} Hz)")
|
||||
move_cursor_up(len(norm_action) + 5)
|
||||
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
except Exception:
|
||||
traceback.print_exc()
|
||||
finally:
|
||||
arm.disconnect()
|
||||
32
examples/hope_jr/vis/glove.py
Normal file
32
examples/hope_jr/vis/glove.py
Normal file
@@ -0,0 +1,32 @@
|
||||
import time
|
||||
import traceback
|
||||
|
||||
from lerobot.common.teleoperators.homonculus import HomonculusGlove, HomonculusGloveConfig
|
||||
from lerobot.common.utils.utils import move_cursor_up
|
||||
|
||||
config = HomonculusGloveConfig("/dev/tty.usbmodem2101", side="right", id="right")
|
||||
glove = HomonculusGlove(config)
|
||||
glove.connect()
|
||||
|
||||
display_len = max(len(key) for key in glove.action_features)
|
||||
glove.calibrate()
|
||||
try:
|
||||
while True:
|
||||
start = time.perf_counter()
|
||||
raw_action = glove._read(normalize=False)
|
||||
norm_action = glove._normalize(raw_action)
|
||||
loop_s = time.perf_counter() - start
|
||||
|
||||
print("\n" + "-" * (display_len + 10))
|
||||
print(f"{'NAME':<{display_len}} | {'RAW':>7} | {'NORM':>7}")
|
||||
for joint in glove.joints:
|
||||
print(f"{joint:<{display_len}} | {raw_action[joint]:>7} | {norm_action[joint]:>7.2f}")
|
||||
print(f"\ntime: {loop_s * 1e3:.2f}ms ({1 / loop_s:.0f} Hz)")
|
||||
move_cursor_up(len(norm_action) + 5)
|
||||
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
except Exception:
|
||||
traceback.print_exc()
|
||||
finally:
|
||||
glove.disconnect()
|
||||
8
examples/hope_jr/vis/hand.py
Normal file
8
examples/hope_jr/vis/hand.py
Normal file
@@ -0,0 +1,8 @@
|
||||
from lerobot.common.robots.hope_jr import HopeJrHand, HopeJrHandConfig
|
||||
|
||||
cfg = HopeJrHandConfig("/dev/tty.usbmodem58760433641", id="left", side="left")
|
||||
hand = HopeJrHand(cfg)
|
||||
|
||||
hand.connect()
|
||||
hand.calibrate()
|
||||
hand.disconnect()
|
||||
382
lerobot/common/motors/calibration_gui.py
Normal file
382
lerobot/common/motors/calibration_gui.py
Normal file
@@ -0,0 +1,382 @@
|
||||
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import math
|
||||
import os
|
||||
from dataclasses import dataclass
|
||||
|
||||
os.environ["PYGAME_HIDE_SUPPORT_PROMPT"] = "1"
|
||||
import pygame
|
||||
|
||||
from lerobot.common.motors import MotorCalibration, MotorsBus
|
||||
|
||||
BAR_LEN, BAR_THICKNESS = 450, 8
|
||||
HANDLE_R = 10
|
||||
BRACKET_W, BRACKET_H = 6, 14
|
||||
TRI_W, TRI_H = 12, 14
|
||||
|
||||
BTN_W, BTN_H = 60, 22
|
||||
SAVE_W, SAVE_H = 80, 28
|
||||
LOAD_W = 80
|
||||
DD_W, DD_H = 160, 28
|
||||
|
||||
TOP_GAP = 50
|
||||
PADDING_Y, TOP_OFFSET = 70, 60
|
||||
FONT_SIZE, FPS = 20, 60
|
||||
|
||||
BG_COLOR = (30, 30, 30)
|
||||
BAR_RED, BAR_GREEN = (200, 60, 60), (60, 200, 60)
|
||||
HANDLE_COLOR, TEXT_COLOR = (240, 240, 240), (250, 250, 250)
|
||||
TICK_COLOR = (250, 220, 40)
|
||||
BTN_COLOR, BTN_COLOR_HL = (80, 80, 80), (110, 110, 110)
|
||||
DD_COLOR, DD_COLOR_HL = (70, 70, 70), (100, 100, 100)
|
||||
|
||||
|
||||
def dist(a, b):
|
||||
return math.hypot(a[0] - b[0], a[1] - b[1])
|
||||
|
||||
|
||||
@dataclass
|
||||
class RangeValues:
|
||||
min_v: int
|
||||
pos_v: int
|
||||
max_v: int
|
||||
|
||||
|
||||
class RangeSlider:
|
||||
"""One motor = one slider row"""
|
||||
|
||||
def __init__(self, motor, idx, res, calibration, present, label_pad, base_y):
|
||||
self.motor = motor
|
||||
self.res = res
|
||||
self.x0 = 40 + label_pad
|
||||
self.x1 = self.x0 + BAR_LEN
|
||||
self.y = base_y + idx * PADDING_Y
|
||||
|
||||
self.min_v = calibration.range_min
|
||||
self.max_v = calibration.range_max
|
||||
self.pos_v = max(self.min_v, min(present, self.max_v))
|
||||
|
||||
self.min_x = self._pos_from_val(self.min_v)
|
||||
self.max_x = self._pos_from_val(self.max_v)
|
||||
self.pos_x = self._pos_from_val(self.pos_v)
|
||||
|
||||
self.min_btn = pygame.Rect(self.x0 - BTN_W - 6, self.y - BTN_H // 2, BTN_W, BTN_H)
|
||||
self.max_btn = pygame.Rect(self.x1 + 6, self.y - BTN_H // 2, BTN_W, BTN_H)
|
||||
|
||||
self.drag_min = self.drag_max = self.drag_pos = False
|
||||
self.tick_val = present
|
||||
self.font = pygame.font.Font(None, FONT_SIZE)
|
||||
|
||||
def _val_from_pos(self, x):
|
||||
return round((x - self.x0) / BAR_LEN * self.res)
|
||||
|
||||
def _pos_from_val(self, v):
|
||||
return self.x0 + (v / self.res) * BAR_LEN
|
||||
|
||||
def set_tick(self, v):
|
||||
self.tick_val = max(0, min(v, self.res))
|
||||
|
||||
def _triangle_hit(self, pos):
|
||||
tri_top = self.y - BAR_THICKNESS // 2 - 2
|
||||
return pygame.Rect(self.pos_x - TRI_W // 2, tri_top - TRI_H, TRI_W, TRI_H).collidepoint(pos)
|
||||
|
||||
def handle_event(self, e):
|
||||
if e.type == pygame.MOUSEBUTTONDOWN and e.button == 1:
|
||||
if self.min_btn.collidepoint(e.pos):
|
||||
self.min_x, self.min_v = self.pos_x, self.pos_v
|
||||
return
|
||||
if self.max_btn.collidepoint(e.pos):
|
||||
self.max_x, self.max_v = self.pos_x, self.pos_v
|
||||
return
|
||||
if dist(e.pos, (self.min_x, self.y)) <= HANDLE_R:
|
||||
self.drag_min = True
|
||||
elif dist(e.pos, (self.max_x, self.y)) <= HANDLE_R:
|
||||
self.drag_max = True
|
||||
elif self._triangle_hit(e.pos):
|
||||
self.drag_pos = True
|
||||
|
||||
elif e.type == pygame.MOUSEBUTTONUP and e.button == 1:
|
||||
self.drag_min = self.drag_max = self.drag_pos = False
|
||||
|
||||
elif e.type == pygame.MOUSEMOTION:
|
||||
x = e.pos[0]
|
||||
if self.drag_min:
|
||||
self.min_x = max(self.x0, min(x, self.pos_x))
|
||||
elif self.drag_max:
|
||||
self.max_x = min(self.x1, max(x, self.pos_x))
|
||||
elif self.drag_pos:
|
||||
self.pos_x = max(self.min_x, min(x, self.max_x))
|
||||
|
||||
self.min_v = self._val_from_pos(self.min_x)
|
||||
self.max_v = self._val_from_pos(self.max_x)
|
||||
self.pos_v = self._val_from_pos(self.pos_x)
|
||||
|
||||
def _draw_button(self, surf, rect, text):
|
||||
clr = BTN_COLOR_HL if rect.collidepoint(pygame.mouse.get_pos()) else BTN_COLOR
|
||||
pygame.draw.rect(surf, clr, rect, border_radius=4)
|
||||
t = self.font.render(text, True, TEXT_COLOR)
|
||||
surf.blit(t, (rect.centerx - t.get_width() // 2, rect.centery - t.get_height() // 2))
|
||||
|
||||
def draw(self, surf):
|
||||
# motor name above set-min button (right-aligned)
|
||||
name_surf = self.font.render(self.motor, True, TEXT_COLOR)
|
||||
surf.blit(
|
||||
name_surf,
|
||||
(self.min_btn.right - name_surf.get_width(), self.min_btn.y - name_surf.get_height() - 4),
|
||||
)
|
||||
|
||||
# bar + active section
|
||||
pygame.draw.rect(surf, BAR_RED, (self.x0, self.y - BAR_THICKNESS // 2, BAR_LEN, BAR_THICKNESS))
|
||||
pygame.draw.rect(
|
||||
surf, BAR_GREEN, (self.min_x, self.y - BAR_THICKNESS // 2, self.max_x - self.min_x, BAR_THICKNESS)
|
||||
)
|
||||
|
||||
# tick
|
||||
tick_x = self._pos_from_val(self.tick_val)
|
||||
pygame.draw.line(
|
||||
surf,
|
||||
TICK_COLOR,
|
||||
(tick_x, self.y - BAR_THICKNESS // 2 - 4),
|
||||
(tick_x, self.y + BAR_THICKNESS // 2 + 4),
|
||||
2,
|
||||
)
|
||||
|
||||
# brackets
|
||||
for x, sign in ((self.min_x, +1), (self.max_x, -1)):
|
||||
pygame.draw.line(
|
||||
surf, HANDLE_COLOR, (x, self.y - BRACKET_H // 2), (x, self.y + BRACKET_H // 2), 2
|
||||
)
|
||||
pygame.draw.line(
|
||||
surf,
|
||||
HANDLE_COLOR,
|
||||
(x, self.y - BRACKET_H // 2),
|
||||
(x + sign * BRACKET_W, self.y - BRACKET_H // 2),
|
||||
2,
|
||||
)
|
||||
pygame.draw.line(
|
||||
surf,
|
||||
HANDLE_COLOR,
|
||||
(x, self.y + BRACKET_H // 2),
|
||||
(x + sign * BRACKET_W, self.y + BRACKET_H // 2),
|
||||
2,
|
||||
)
|
||||
|
||||
# triangle ▼
|
||||
tri_top = self.y - BAR_THICKNESS // 2 - 2
|
||||
pygame.draw.polygon(
|
||||
surf,
|
||||
HANDLE_COLOR,
|
||||
[
|
||||
(self.pos_x, tri_top),
|
||||
(self.pos_x - TRI_W // 2, tri_top - TRI_H),
|
||||
(self.pos_x + TRI_W // 2, tri_top - TRI_H),
|
||||
],
|
||||
)
|
||||
|
||||
# numeric labels
|
||||
fh = self.font.get_height()
|
||||
pos_y = tri_top - TRI_H - 4 - fh
|
||||
txts = [
|
||||
(self.min_v, self.min_x, self.y - BRACKET_H // 2 - 4 - fh),
|
||||
(self.max_v, self.max_x, self.y - BRACKET_H // 2 - 4 - fh),
|
||||
(self.pos_v, self.pos_x, pos_y),
|
||||
]
|
||||
for v, x, y in txts:
|
||||
s = self.font.render(str(v), True, TEXT_COLOR)
|
||||
surf.blit(s, (x - s.get_width() // 2, y))
|
||||
|
||||
# buttons
|
||||
self._draw_button(surf, self.min_btn, "set min")
|
||||
self._draw_button(surf, self.max_btn, "set max")
|
||||
|
||||
# external
|
||||
def values(self) -> RangeValues:
|
||||
return RangeValues(self.min_v, self.pos_v, self.max_v)
|
||||
|
||||
|
||||
class RangeFinderGUI:
|
||||
def __init__(self, bus: MotorsBus, groups: dict[str, list[str]] | None = None):
|
||||
self.bus = bus
|
||||
self.groups = groups if groups is not None else {"all": list(bus.motors)}
|
||||
self.group_names = list(groups)
|
||||
self.current_group = self.group_names[0]
|
||||
|
||||
if not bus.is_connected:
|
||||
bus.connect()
|
||||
|
||||
self.calibration = bus.read_calibration()
|
||||
self.res_table = bus.model_resolution_table
|
||||
self.present_cache = {
|
||||
m: bus.read("Present_Position", m, normalize=False) for motors in groups.values() for m in motors
|
||||
}
|
||||
|
||||
pygame.init()
|
||||
self.font = pygame.font.Font(None, FONT_SIZE)
|
||||
|
||||
label_pad = max(self.font.size(m)[0] for ms in groups.values() for m in ms)
|
||||
self.label_pad = label_pad
|
||||
width = 40 + label_pad + BAR_LEN + 6 + BTN_W + 10 + SAVE_W + 10
|
||||
self.controls_bottom = 10 + SAVE_H
|
||||
self.base_y = self.controls_bottom + TOP_GAP
|
||||
height = self.base_y + PADDING_Y * len(groups[self.current_group]) + 40
|
||||
|
||||
self.screen = pygame.display.set_mode((width, height))
|
||||
pygame.display.set_caption("Motors range finder")
|
||||
|
||||
# ui rects
|
||||
self.save_btn = pygame.Rect(width - SAVE_W - 10, 10, SAVE_W, SAVE_H)
|
||||
self.load_btn = pygame.Rect(self.save_btn.left - LOAD_W - 10, 10, LOAD_W, SAVE_H)
|
||||
self.dd_btn = pygame.Rect(width // 2 - DD_W // 2, 10, DD_W, DD_H)
|
||||
self.dd_open = False # dropdown expanded?
|
||||
|
||||
self.clock = pygame.time.Clock()
|
||||
self._build_sliders()
|
||||
self._adjust_height()
|
||||
|
||||
def _adjust_height(self):
|
||||
motors = self.groups[self.current_group]
|
||||
new_h = self.base_y + PADDING_Y * len(motors) + 40
|
||||
if new_h != self.screen.get_height():
|
||||
w = self.screen.get_width()
|
||||
self.screen = pygame.display.set_mode((w, new_h))
|
||||
|
||||
def _build_sliders(self):
|
||||
self.sliders: list[RangeSlider] = []
|
||||
motors = self.groups[self.current_group]
|
||||
for i, m in enumerate(motors):
|
||||
self.sliders.append(
|
||||
RangeSlider(
|
||||
motor=m,
|
||||
idx=i,
|
||||
res=self.res_table[self.bus.motors[m].model] - 1,
|
||||
calibration=self.calibration[m],
|
||||
present=self.present_cache[m],
|
||||
label_pad=self.label_pad,
|
||||
base_y=self.base_y,
|
||||
)
|
||||
)
|
||||
|
||||
def _draw_dropdown(self):
|
||||
# collapsed box
|
||||
hover = self.dd_btn.collidepoint(pygame.mouse.get_pos())
|
||||
pygame.draw.rect(self.screen, DD_COLOR_HL if hover else DD_COLOR, self.dd_btn, border_radius=6)
|
||||
|
||||
txt = self.font.render(self.current_group, True, TEXT_COLOR)
|
||||
self.screen.blit(
|
||||
txt, (self.dd_btn.centerx - txt.get_width() // 2, self.dd_btn.centery - txt.get_height() // 2)
|
||||
)
|
||||
|
||||
tri_w, tri_h = 12, 6
|
||||
cx = self.dd_btn.right - 14
|
||||
cy = self.dd_btn.centery + 1
|
||||
pygame.draw.polygon(
|
||||
self.screen,
|
||||
TEXT_COLOR,
|
||||
[(cx - tri_w // 2, cy - tri_h // 2), (cx + tri_w // 2, cy - tri_h // 2), (cx, cy + tri_h // 2)],
|
||||
)
|
||||
|
||||
if not self.dd_open:
|
||||
return
|
||||
|
||||
# expanded list
|
||||
for i, name in enumerate(self.group_names):
|
||||
item_rect = pygame.Rect(self.dd_btn.left, self.dd_btn.bottom + i * DD_H, DD_W, DD_H)
|
||||
clr = DD_COLOR_HL if item_rect.collidepoint(pygame.mouse.get_pos()) else DD_COLOR
|
||||
pygame.draw.rect(self.screen, clr, item_rect)
|
||||
t = self.font.render(name, True, TEXT_COLOR)
|
||||
self.screen.blit(
|
||||
t, (item_rect.centerx - t.get_width() // 2, item_rect.centery - t.get_height() // 2)
|
||||
)
|
||||
|
||||
def _handle_dropdown_event(self, e):
|
||||
if e.type == pygame.MOUSEBUTTONDOWN and e.button == 1:
|
||||
if self.dd_btn.collidepoint(e.pos):
|
||||
self.dd_open = not self.dd_open
|
||||
return True
|
||||
if self.dd_open:
|
||||
for i, name in enumerate(self.group_names):
|
||||
item_rect = pygame.Rect(self.dd_btn.left, self.dd_btn.bottom + i * DD_H, DD_W, DD_H)
|
||||
if item_rect.collidepoint(e.pos):
|
||||
if name != self.current_group:
|
||||
self.current_group = name
|
||||
self._build_sliders()
|
||||
self._adjust_height()
|
||||
self.dd_open = False
|
||||
return True
|
||||
self.dd_open = False
|
||||
return False
|
||||
|
||||
def _save_current(self):
|
||||
for s in self.sliders:
|
||||
self.calibration[s.motor].range_min = s.min_v
|
||||
self.calibration[s.motor].range_max = s.max_v
|
||||
|
||||
with self.bus.torque_disabled():
|
||||
self.bus.write_calibration(self.calibration)
|
||||
|
||||
def _load_current(self):
|
||||
self.calibration = self.bus.read_calibration()
|
||||
for s in self.sliders:
|
||||
s.min_v = self.calibration[s.motor].range_min
|
||||
s.max_v = self.calibration[s.motor].range_max
|
||||
s.min_x = s._pos_from_val(s.min_v)
|
||||
s.max_x = s._pos_from_val(s.max_v)
|
||||
|
||||
def run(self) -> dict[str, MotorCalibration]:
|
||||
while True:
|
||||
for e in pygame.event.get():
|
||||
if e.type == pygame.QUIT:
|
||||
pygame.quit()
|
||||
return self.calibration
|
||||
|
||||
if self._handle_dropdown_event(e):
|
||||
continue
|
||||
|
||||
if e.type == pygame.MOUSEBUTTONDOWN and e.button == 1:
|
||||
if self.save_btn.collidepoint(e.pos):
|
||||
self._save_current()
|
||||
elif self.load_btn.collidepoint(e.pos):
|
||||
self._load_current()
|
||||
|
||||
for s in self.sliders:
|
||||
s.handle_event(e)
|
||||
|
||||
# live goal write while dragging
|
||||
for s in self.sliders:
|
||||
if s.drag_pos:
|
||||
self.bus.write("Goal_Position", s.motor, s.pos_v, normalize=False)
|
||||
|
||||
# tick update
|
||||
for s in self.sliders:
|
||||
pos = self.bus.read("Present_Position", s.motor, normalize=False)
|
||||
s.set_tick(pos)
|
||||
self.present_cache[s.motor] = pos
|
||||
|
||||
# ─ drawing
|
||||
self.screen.fill(BG_COLOR)
|
||||
for s in self.sliders:
|
||||
s.draw(self.screen)
|
||||
|
||||
self._draw_dropdown()
|
||||
|
||||
# load / save buttons
|
||||
for rect, text in ((self.load_btn, "LOAD"), (self.save_btn, "SAVE")):
|
||||
clr = BTN_COLOR_HL if rect.collidepoint(pygame.mouse.get_pos()) else BTN_COLOR
|
||||
pygame.draw.rect(self.screen, clr, rect, border_radius=6)
|
||||
t = self.font.render(text, True, TEXT_COLOR)
|
||||
self.screen.blit(t, (rect.centerx - t.get_width() // 2, rect.centery - t.get_height() // 2))
|
||||
|
||||
pygame.display.flip()
|
||||
self.clock.tick(FPS)
|
||||
@@ -191,13 +191,14 @@ class DynamixelMotorsBus(MotorsBus):
|
||||
|
||||
return calibration
|
||||
|
||||
def write_calibration(self, calibration_dict: dict[str, MotorCalibration]) -> None:
|
||||
def write_calibration(self, calibration_dict: dict[str, MotorCalibration], cache: bool = True) -> None:
|
||||
for motor, calibration in calibration_dict.items():
|
||||
self.write("Homing_Offset", motor, calibration.homing_offset)
|
||||
self.write("Min_Position_Limit", motor, calibration.range_min)
|
||||
self.write("Max_Position_Limit", motor, calibration.range_max)
|
||||
|
||||
self.calibration = calibration_dict
|
||||
if cache:
|
||||
self.calibration = calibration_dict
|
||||
|
||||
def disable_torque(self, motors: str | list[str] | None = None, num_retry: int = 0) -> None:
|
||||
for motor in self._get_motors_list(motors):
|
||||
|
||||
@@ -225,8 +225,8 @@ class FeetechMotorsBus(MotorsBus):
|
||||
# the 'Return_Delay_Time' address). We ensure this is reduced to the minimum of 2µs (value of 0).
|
||||
self.write("Return_Delay_Time", motor, 0)
|
||||
# Set 'Maximum_Acceleration' to 254 to speedup acceleration and deceleration of the motors.
|
||||
# Note: this address is not in the official STS3215 Memory Table
|
||||
self.write("Maximum_Acceleration", motor, 254)
|
||||
if self.protocol_version == 0:
|
||||
self.write("Maximum_Acceleration", motor, 254)
|
||||
self.write("Acceleration", motor, 254)
|
||||
|
||||
@property
|
||||
@@ -271,14 +271,15 @@ class FeetechMotorsBus(MotorsBus):
|
||||
|
||||
return calibration
|
||||
|
||||
def write_calibration(self, calibration_dict: dict[str, MotorCalibration]) -> None:
|
||||
def write_calibration(self, calibration_dict: dict[str, MotorCalibration], cache: bool = True) -> None:
|
||||
for motor, calibration in calibration_dict.items():
|
||||
if self.protocol_version == 0:
|
||||
self.write("Homing_Offset", motor, calibration.homing_offset)
|
||||
self.write("Min_Position_Limit", motor, calibration.range_min)
|
||||
self.write("Max_Position_Limit", motor, calibration.range_max)
|
||||
|
||||
self.calibration = calibration_dict
|
||||
if cache:
|
||||
self.calibration = calibration_dict
|
||||
|
||||
def _get_half_turn_homings(self, positions: dict[NameOrID, Value]) -> dict[NameOrID, Value]:
|
||||
"""
|
||||
|
||||
@@ -189,7 +189,7 @@ MODEL_RESOLUTION = {
|
||||
"scs_series": 1024,
|
||||
"sts3215": 4096,
|
||||
"sts3250": 4096,
|
||||
"sm8512bl": 65536,
|
||||
"sm8512bl": 4096, # 65536 (?)
|
||||
"scs0009": 1024,
|
||||
}
|
||||
|
||||
|
||||
@@ -588,7 +588,7 @@ class MotorsBus(abc.ABC):
|
||||
pass
|
||||
|
||||
@contextmanager
|
||||
def torque_disabled(self):
|
||||
def torque_disabled(self, motors: int | str | list[str] | None = None):
|
||||
"""Context-manager that guarantees torque is re-enabled.
|
||||
|
||||
This helper is useful to temporarily disable torque when configuring motors.
|
||||
@@ -598,11 +598,11 @@ class MotorsBus(abc.ABC):
|
||||
... # Safe operations here
|
||||
... pass
|
||||
"""
|
||||
self.disable_torque()
|
||||
self.disable_torque(motors)
|
||||
try:
|
||||
yield
|
||||
finally:
|
||||
self.enable_torque()
|
||||
self.enable_torque(motors)
|
||||
|
||||
def set_timeout(self, timeout_ms: int | None = None):
|
||||
"""Change the packet timeout used by the SDK.
|
||||
@@ -655,12 +655,13 @@ class MotorsBus(abc.ABC):
|
||||
pass
|
||||
|
||||
@abc.abstractmethod
|
||||
def write_calibration(self, calibration_dict: dict[str, MotorCalibration]) -> None:
|
||||
"""Write calibration parameters to the motors and cache them.
|
||||
def write_calibration(self, calibration_dict: dict[str, MotorCalibration], cache: bool = True) -> None:
|
||||
"""Write calibration parameters to the motors and optionally cache them.
|
||||
|
||||
Args:
|
||||
calibration_dict (dict[str, MotorCalibration]): Calibration obtained from
|
||||
:pymeth:`read_calibration` or crafted by the user.
|
||||
cache (bool, optional): Save the calibration to :pyattr:`calibration`. Defaults to True.
|
||||
"""
|
||||
pass
|
||||
|
||||
|
||||
4
lerobot/common/robots/hope_jr/__init__.py
Normal file
4
lerobot/common/robots/hope_jr/__init__.py
Normal file
@@ -0,0 +1,4 @@
|
||||
from .config_hope_jr import HopeJrArmConfig, HopeJrHandConfig
|
||||
from .hope_jr_arm import HopeJrArm
|
||||
from .hope_jr_hand import HopeJrHand
|
||||
from .joints_translation import homonculus_glove_to_hope_jr_hand
|
||||
52
lerobot/common/robots/hope_jr/config_hope_jr.py
Normal file
52
lerobot/common/robots/hope_jr/config_hope_jr.py
Normal file
@@ -0,0 +1,52 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from dataclasses import dataclass, field
|
||||
|
||||
from lerobot.common.cameras import CameraConfig
|
||||
|
||||
from ..config import RobotConfig
|
||||
|
||||
|
||||
@RobotConfig.register_subclass("hope_jr_hand")
|
||||
@dataclass
|
||||
class HopeJrHandConfig(RobotConfig):
|
||||
port: str # Port to connect to the hand
|
||||
side: str # "left" / "right"
|
||||
|
||||
disable_torque_on_disconnect: bool = True
|
||||
|
||||
cameras: dict[str, CameraConfig] = field(default_factory=dict)
|
||||
|
||||
def __post_init__(self):
|
||||
super().__post_init__()
|
||||
if self.side not in ["right", "left"]:
|
||||
raise ValueError(self.side)
|
||||
|
||||
|
||||
@RobotConfig.register_subclass("hope_jr_arm")
|
||||
@dataclass
|
||||
class HopeJrArmConfig(RobotConfig):
|
||||
port: str # Port to connect to the hand
|
||||
|
||||
disable_torque_on_disconnect: bool = True
|
||||
|
||||
# `max_relative_target` limits the magnitude of the relative positional target vector for safety purposes.
|
||||
# Set this to a positive scalar to have the same value for all motors, or a list that is the same length as
|
||||
# the number of motors in your follower arms.
|
||||
max_relative_target: int | None = None
|
||||
|
||||
cameras: dict[str, CameraConfig] = field(default_factory=dict)
|
||||
216
lerobot/common/robots/hope_jr/hope_jr_arm.py
Normal file
216
lerobot/common/robots/hope_jr/hope_jr_arm.py
Normal file
@@ -0,0 +1,216 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import logging
|
||||
import time
|
||||
from functools import cached_property
|
||||
from typing import Any
|
||||
|
||||
from lerobot.common.cameras.utils import make_cameras_from_configs
|
||||
from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
|
||||
from lerobot.common.motors import Motor, MotorCalibration, MotorNormMode
|
||||
from lerobot.common.motors.feetech import (
|
||||
FeetechMotorsBus,
|
||||
OperatingMode,
|
||||
)
|
||||
|
||||
from ..robot import Robot
|
||||
from ..utils import ensure_safe_goal_position
|
||||
from .config_hope_jr import HopeJrArmConfig
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class HopeJrArm(Robot):
|
||||
config_class = HopeJrArmConfig
|
||||
name = "hope_jr_arm"
|
||||
|
||||
def __init__(self, config: HopeJrArmConfig):
|
||||
super().__init__(config)
|
||||
self.config = config
|
||||
self.bus = FeetechMotorsBus(
|
||||
port=self.config.port,
|
||||
motors={
|
||||
"shoulder_pitch": Motor(2, "sm8512bl", MotorNormMode.RANGE_M100_100),
|
||||
"shoulder_yaw": Motor(3, "sts3250", MotorNormMode.RANGE_M100_100),
|
||||
"shoulder_roll": Motor(4, "sts3250", MotorNormMode.RANGE_M100_100),
|
||||
"elbow_flex": Motor(5, "sts3250", MotorNormMode.RANGE_M100_100),
|
||||
"wrist_roll": Motor(6, "sts3250", MotorNormMode.RANGE_M100_100),
|
||||
"wrist_yaw": Motor(7, "sts3250", MotorNormMode.RANGE_M100_100),
|
||||
"wrist_pitch": Motor(8, "sts3250", MotorNormMode.RANGE_M100_100),
|
||||
},
|
||||
# motors={
|
||||
# "shoulder_pitch": Motor(1, "sm8512bl", MotorNormMode.RANGE_M100_100),
|
||||
# "shoulder_yaw": Motor(2, "sts3250", MotorNormMode.RANGE_M100_100),
|
||||
# "shoulder_roll": Motor(3, "sts3250", MotorNormMode.RANGE_M100_100),
|
||||
# "elbow_flex": Motor(4, "sts3250", MotorNormMode.RANGE_M100_100),
|
||||
# "wrist_roll": Motor(5, "sts3250", MotorNormMode.RANGE_M100_100),
|
||||
# "wrist_yaw": Motor(6, "sts3250", MotorNormMode.RANGE_M100_100),
|
||||
# "wrist_pitch": Motor(7, "sts3250", MotorNormMode.RANGE_M100_100),
|
||||
# },
|
||||
calibration=self.calibration,
|
||||
)
|
||||
self.cameras = make_cameras_from_configs(config.cameras)
|
||||
|
||||
# HACK
|
||||
self.shoulder_pitch = "shoulder_pitch"
|
||||
self.other_motors = [m for m in self.bus.motors if m != "shoulder_pitch"]
|
||||
|
||||
@property
|
||||
def _motors_ft(self) -> dict[str, type]:
|
||||
return {f"{motor}.pos": float for motor in self.bus.motors}
|
||||
|
||||
@property
|
||||
def _cameras_ft(self) -> dict[str, tuple]:
|
||||
return {
|
||||
cam: (self.config.cameras[cam].height, self.config.cameras[cam].width, 3) for cam in self.cameras
|
||||
}
|
||||
|
||||
@cached_property
|
||||
def observation_features(self) -> dict[str, type | tuple]:
|
||||
return {**self._motors_ft, **self._cameras_ft}
|
||||
|
||||
@cached_property
|
||||
def action_features(self) -> dict[str, type]:
|
||||
return self._motors_ft
|
||||
|
||||
@property
|
||||
def is_connected(self) -> bool:
|
||||
# TODO(aliberts): add cam.is_connected for cam in self.cameras
|
||||
return self.bus.is_connected
|
||||
|
||||
def connect(self) -> None:
|
||||
"""
|
||||
We assume that at connection time, arm is in a rest position,
|
||||
and torque can be safely disabled to run calibration.
|
||||
"""
|
||||
if self.is_connected:
|
||||
raise DeviceAlreadyConnectedError(f"{self} already connected")
|
||||
|
||||
self.bus.connect(handshake=False)
|
||||
if not self.is_calibrated:
|
||||
self.calibrate()
|
||||
|
||||
# Connect the cameras
|
||||
for cam in self.cameras.values():
|
||||
cam.connect()
|
||||
|
||||
self.configure()
|
||||
logger.info(f"{self} connected.")
|
||||
|
||||
@property
|
||||
def is_calibrated(self) -> bool:
|
||||
return self.bus.is_calibrated
|
||||
|
||||
def calibrate(self) -> None:
|
||||
logger.info(f"\nRunning calibration of {self}")
|
||||
self.bus.disable_torque()
|
||||
for motor in self.bus.motors:
|
||||
self.bus.write("Operating_Mode", motor, OperatingMode.POSITION.value)
|
||||
|
||||
unknown_range_motors = self.other_motors
|
||||
|
||||
input("Move arm to the middle of its range of motion and press ENTER....")
|
||||
homing_offsets_full_turn = self.bus.set_half_turn_homings("shoulder_pitch")
|
||||
homing_offsets_unknown_range = self.bus.set_half_turn_homings(unknown_range_motors)
|
||||
homing_offsets = {**homing_offsets_full_turn, **homing_offsets_unknown_range}
|
||||
|
||||
logger.info(
|
||||
"Move all joints except 'shoulder_pitch' sequentially through their "
|
||||
"entire ranges of motion.\nRecording positions. Press ENTER to stop..."
|
||||
)
|
||||
range_mins, range_maxes = self.bus.record_ranges_of_motion(unknown_range_motors)
|
||||
range_mins["shoulder_pitch"] = 1024
|
||||
range_maxes["shoulder_pitch"] = 3071
|
||||
|
||||
range_mins["wrist_roll"] = 1500
|
||||
range_maxes["wrist_roll"] = 2500
|
||||
|
||||
self.calibration = {}
|
||||
for motor, m in self.bus.motors.items():
|
||||
self.calibration[motor] = MotorCalibration(
|
||||
id=m.id,
|
||||
drive_mode=0,
|
||||
homing_offset=homing_offsets[motor],
|
||||
range_min=range_mins[motor],
|
||||
range_max=range_maxes[motor],
|
||||
)
|
||||
|
||||
self.bus.write_calibration(self.calibration)
|
||||
self._save_calibration()
|
||||
print("Calibration saved to", self.calibration_fpath)
|
||||
|
||||
def configure(self) -> None:
|
||||
with self.bus.torque_disabled():
|
||||
for motor in self.bus.motors:
|
||||
self.bus.write("Return_Delay_Time", motor, 0)
|
||||
self.bus.write("Maximum_Acceleration", motor, 50)
|
||||
self.bus.write("Acceleration", motor, 50)
|
||||
|
||||
def setup_motors(self) -> None:
|
||||
for motor in reversed(self.bus.motors):
|
||||
input(f"Connect the controller board to the '{motor}' motor only and press enter.")
|
||||
self.bus.setup_motor(motor)
|
||||
print(f"'{motor}' motor id set to {self.bus.motors[motor].id}")
|
||||
|
||||
def get_observation(self) -> dict[str, Any]:
|
||||
if not self.is_connected:
|
||||
raise DeviceNotConnectedError(f"{self} is not connected.")
|
||||
|
||||
obs_dict = {}
|
||||
|
||||
# Read arm position
|
||||
start = time.perf_counter()
|
||||
obs_dict[self.shoulder_pitch] = self.bus.read("Present_Position", self.shoulder_pitch)
|
||||
obs_dict.update(self.bus.sync_read("Present_Position", self.other_motors))
|
||||
obs_dict = {f"{motor}.pos": val for motor, val in obs_dict.items()}
|
||||
dt_ms = (time.perf_counter() - start) * 1e3
|
||||
logger.debug(f"{self} read state: {dt_ms:.1f}ms")
|
||||
|
||||
# Capture images from cameras
|
||||
for cam_key, cam in self.cameras.items():
|
||||
start = time.perf_counter()
|
||||
obs_dict[cam_key] = cam.async_read()
|
||||
dt_ms = (time.perf_counter() - start) * 1e3
|
||||
logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms")
|
||||
|
||||
return obs_dict
|
||||
|
||||
def send_action(self, action: dict[str, Any]) -> dict[str, Any]:
|
||||
if not self.is_connected:
|
||||
raise DeviceNotConnectedError(f"{self} is not connected.")
|
||||
|
||||
goal_pos = {key.removesuffix(".pos"): val for key, val in action.items() if key.endswith(".pos")}
|
||||
|
||||
# Cap goal position when too far away from present position.
|
||||
# /!\ Slower fps expected due to reading from the follower.
|
||||
if self.config.max_relative_target is not None:
|
||||
present_pos = self.bus.sync_read("Present_Position")
|
||||
goal_present_pos = {key: (g_pos, present_pos[key]) for key, g_pos in goal_pos.items()}
|
||||
goal_pos = ensure_safe_goal_position(goal_present_pos, self.config.max_relative_target)
|
||||
|
||||
self.bus.sync_write("Goal_Position", goal_pos)
|
||||
return {f"{motor}.pos": val for motor, val in goal_pos.items()}
|
||||
|
||||
def disconnect(self):
|
||||
if not self.is_connected:
|
||||
raise DeviceNotConnectedError(f"{self} is not connected.")
|
||||
|
||||
self.bus.disconnect(self.config.disable_torque_on_disconnect)
|
||||
for cam in self.cameras.values():
|
||||
cam.disconnect()
|
||||
|
||||
logger.info(f"{self} disconnected.")
|
||||
204
lerobot/common/robots/hope_jr/hope_jr_hand.py
Normal file
204
lerobot/common/robots/hope_jr/hope_jr_hand.py
Normal file
@@ -0,0 +1,204 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import logging
|
||||
import time
|
||||
from functools import cached_property
|
||||
from typing import Any
|
||||
|
||||
from lerobot.common.cameras.utils import make_cameras_from_configs
|
||||
from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
|
||||
from lerobot.common.motors import Motor, MotorNormMode
|
||||
from lerobot.common.motors.calibration_gui import RangeFinderGUI
|
||||
from lerobot.common.motors.feetech import (
|
||||
FeetechMotorsBus,
|
||||
)
|
||||
|
||||
from ..robot import Robot
|
||||
from .config_hope_jr import HopeJrHandConfig
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
RIGHT_HAND_INVERSIONS = [
|
||||
"thumb_mcp",
|
||||
"thumb_dip",
|
||||
"index_ulnar_flexor",
|
||||
"middle_ulnar_flexor",
|
||||
"ring_ulnar_flexor",
|
||||
"ring_pip_dip",
|
||||
"pinky_ulnar_flexor",
|
||||
"pinky_pip_dip",
|
||||
]
|
||||
|
||||
LEFT_HAND_INVERSIONS = [
|
||||
"thumb_cmc",
|
||||
"thumb_mcp",
|
||||
"thumb_dip",
|
||||
"index_radial_flexor",
|
||||
"index_pip_dip",
|
||||
"middle_radial_flexor",
|
||||
"middle_pip_dip",
|
||||
"ring_radial_flexor",
|
||||
"ring_pip_dip",
|
||||
"pinky_radial_flexor",
|
||||
# "pinky_pip_dip",
|
||||
]
|
||||
|
||||
|
||||
class HopeJrHand(Robot):
|
||||
config_class = HopeJrHandConfig
|
||||
name = "hope_jr_hand"
|
||||
|
||||
def __init__(self, config: HopeJrHandConfig):
|
||||
super().__init__(config)
|
||||
self.config = config
|
||||
self.bus = FeetechMotorsBus(
|
||||
port=self.config.port,
|
||||
motors={
|
||||
# Thumb
|
||||
"thumb_cmc": Motor(1, "scs0009", MotorNormMode.RANGE_0_100),
|
||||
"thumb_mcp": Motor(2, "scs0009", MotorNormMode.RANGE_0_100),
|
||||
"thumb_pip": Motor(3, "scs0009", MotorNormMode.RANGE_0_100),
|
||||
"thumb_dip": Motor(4, "scs0009", MotorNormMode.RANGE_0_100),
|
||||
# Index
|
||||
"index_radial_flexor": Motor(5, "scs0009", MotorNormMode.RANGE_0_100),
|
||||
"index_ulnar_flexor": Motor(6, "scs0009", MotorNormMode.RANGE_0_100),
|
||||
"index_pip_dip": Motor(7, "scs0009", MotorNormMode.RANGE_0_100),
|
||||
# Middle
|
||||
"middle_radial_flexor": Motor(8, "scs0009", MotorNormMode.RANGE_0_100),
|
||||
"middle_ulnar_flexor": Motor(9, "scs0009", MotorNormMode.RANGE_0_100),
|
||||
"middle_pip_dip": Motor(10, "scs0009", MotorNormMode.RANGE_0_100),
|
||||
# Ring
|
||||
"ring_radial_flexor": Motor(11, "scs0009", MotorNormMode.RANGE_0_100),
|
||||
"ring_ulnar_flexor": Motor(12, "scs0009", MotorNormMode.RANGE_0_100),
|
||||
"ring_pip_dip": Motor(13, "scs0009", MotorNormMode.RANGE_0_100),
|
||||
# Pinky
|
||||
"pinky_radial_flexor": Motor(14, "scs0009", MotorNormMode.RANGE_0_100),
|
||||
"pinky_ulnar_flexor": Motor(15, "scs0009", MotorNormMode.RANGE_0_100),
|
||||
"pinky_pip_dip": Motor(16, "scs0009", MotorNormMode.RANGE_0_100),
|
||||
},
|
||||
calibration=self.calibration,
|
||||
protocol_version=1,
|
||||
)
|
||||
self.cameras = make_cameras_from_configs(config.cameras)
|
||||
self.inverted_motors = RIGHT_HAND_INVERSIONS if config.side == "right" else LEFT_HAND_INVERSIONS
|
||||
|
||||
@property
|
||||
def _motors_ft(self) -> dict[str, type]:
|
||||
return {f"{motor}.pos": float for motor in self.bus.motors}
|
||||
|
||||
@property
|
||||
def _cameras_ft(self) -> dict[str, tuple]:
|
||||
return {
|
||||
cam: (self.config.cameras[cam].height, self.config.cameras[cam].width, 3) for cam in self.cameras
|
||||
}
|
||||
|
||||
@cached_property
|
||||
def observation_features(self) -> dict[str, type | tuple]:
|
||||
return {**self._motors_ft, **self._cameras_ft}
|
||||
|
||||
@cached_property
|
||||
def action_features(self) -> dict[str, type]:
|
||||
return self._motors_ft
|
||||
|
||||
@property
|
||||
def is_connected(self) -> bool:
|
||||
# TODO(aliberts): add cam.is_connected for cam in self.cameras
|
||||
return self.bus.is_connected
|
||||
|
||||
def connect(self) -> None:
|
||||
if self.is_connected:
|
||||
raise DeviceAlreadyConnectedError(f"{self} already connected")
|
||||
|
||||
self.bus.connect()
|
||||
if not self.is_calibrated:
|
||||
self.calibrate()
|
||||
|
||||
# Connect the cameras
|
||||
for cam in self.cameras.values():
|
||||
cam.connect()
|
||||
|
||||
self.configure()
|
||||
logger.info(f"{self} connected.")
|
||||
|
||||
@property
|
||||
def is_calibrated(self) -> bool:
|
||||
return self.bus.is_calibrated
|
||||
|
||||
def calibrate(self) -> None:
|
||||
fingers = {}
|
||||
for finger in ["thumb", "index", "middle", "ring", "pinky"]:
|
||||
fingers[finger] = [motor for motor in self.bus.motors if motor.startswith(finger)]
|
||||
|
||||
self.calibration = RangeFinderGUI(self.bus, fingers).run()
|
||||
for motor in self.inverted_motors:
|
||||
self.calibration[motor].drive_mode = 1
|
||||
self._save_calibration()
|
||||
print("Calibration saved to", self.calibration_fpath)
|
||||
|
||||
def configure(self) -> None:
|
||||
with self.bus.torque_disabled():
|
||||
self.bus.configure_motors()
|
||||
# for motor in self.bus.motors:
|
||||
# self.bus.write("Return_Delay_Time", motor, 0)
|
||||
# self.bus.write("Acceleration_2", motor, 50)
|
||||
# self.bus.write("Acceleration", motor, 50)
|
||||
|
||||
def setup_motors(self) -> None:
|
||||
for motor in self.bus.motors:
|
||||
input(f"Connect the controller board to the '{motor}' motor only and press enter.")
|
||||
self.bus.setup_motor(motor)
|
||||
print(f"'{motor}' motor id set to {self.bus.motors[motor].id}")
|
||||
|
||||
def get_observation(self) -> dict[str, Any]:
|
||||
if not self.is_connected:
|
||||
raise DeviceNotConnectedError(f"{self} is not connected.")
|
||||
|
||||
obs_dict = {}
|
||||
|
||||
# Read hand position
|
||||
start = time.perf_counter()
|
||||
for motor in self.bus.motors:
|
||||
obs_dict[f"{motor}.pos"] = self.bus.read("Present_Position", motor, normalize=False)
|
||||
dt_ms = (time.perf_counter() - start) * 1e3
|
||||
logger.debug(f"{self} read state: {dt_ms:.1f}ms")
|
||||
|
||||
# Capture images from cameras
|
||||
for cam_key, cam in self.cameras.items():
|
||||
start = time.perf_counter()
|
||||
obs_dict[cam_key] = cam.async_read()
|
||||
dt_ms = (time.perf_counter() - start) * 1e3
|
||||
logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms")
|
||||
|
||||
return obs_dict
|
||||
|
||||
def send_action(self, action: dict[str, Any]) -> dict[str, Any]:
|
||||
if not self.is_connected:
|
||||
raise DeviceNotConnectedError(f"{self} is not connected.")
|
||||
|
||||
goal_pos = {key.removesuffix(".pos"): val for key, val in action.items() if key.endswith(".pos")}
|
||||
self.bus.sync_write("Goal_Position", goal_pos)
|
||||
return action
|
||||
|
||||
def disconnect(self):
|
||||
if not self.is_connected:
|
||||
raise DeviceNotConnectedError(f"{self} is not connected.")
|
||||
|
||||
self.bus.disconnect(self.config.disable_torque_on_disconnect)
|
||||
for cam in self.cameras.values():
|
||||
cam.disconnect()
|
||||
|
||||
logger.info(f"{self} disconnected.")
|
||||
53
lerobot/common/robots/hope_jr/joints_translation.py
Normal file
53
lerobot/common/robots/hope_jr/joints_translation.py
Normal file
@@ -0,0 +1,53 @@
|
||||
INDEX_SPLAY = 0.3
|
||||
MIDDLE_SPLAY = 0.3
|
||||
RING_SPLAY = 0.3
|
||||
PINKY_SPLAY = 0.5
|
||||
|
||||
|
||||
def get_ulnar_flexion(flexion: float, abduction: float, splay: float):
|
||||
# ulnar_component = max(-abduction, 0)
|
||||
ulnar_component = -abduction
|
||||
return ulnar_component * splay + flexion * (1 - splay)
|
||||
|
||||
|
||||
def get_radial_flexion(flexion: float, abduction: float, splay: float):
|
||||
radial_component = abduction
|
||||
# radial_component = max(abduction, 0)
|
||||
return radial_component * splay + flexion * (1 - splay)
|
||||
|
||||
|
||||
def homonculus_glove_to_hope_jr_hand(glove_action: dict[str, float]) -> dict[str, float]:
|
||||
return {
|
||||
"thumb_cmc.pos": glove_action["thumb_cmc.pos"],
|
||||
"thumb_mcp.pos": glove_action["thumb_mcp.pos"],
|
||||
"thumb_pip.pos": glove_action["thumb_pip.pos"],
|
||||
"thumb_dip.pos": glove_action["thumb_dip.pos"],
|
||||
"index_radial_flexor.pos": get_radial_flexion(
|
||||
glove_action["index_mcp_flexion.pos"], glove_action["index_mcp_abduction.pos"], INDEX_SPLAY
|
||||
),
|
||||
"index_ulnar_flexor.pos": get_ulnar_flexion(
|
||||
glove_action["index_mcp_flexion.pos"], glove_action["index_mcp_abduction.pos"], INDEX_SPLAY
|
||||
),
|
||||
"index_pip_dip.pos": glove_action["index_dip.pos"],
|
||||
"middle_radial_flexor.pos": get_radial_flexion(
|
||||
glove_action["middle_mcp_flexion.pos"], glove_action["middle_mcp_abduction.pos"], MIDDLE_SPLAY
|
||||
),
|
||||
"middle_ulnar_flexor.pos": get_ulnar_flexion(
|
||||
glove_action["middle_mcp_flexion.pos"], glove_action["middle_mcp_abduction.pos"], MIDDLE_SPLAY
|
||||
),
|
||||
"middle_pip_dip.pos": glove_action["middle_dip.pos"],
|
||||
"ring_radial_flexor.pos": get_radial_flexion(
|
||||
glove_action["ring_mcp_flexion.pos"], glove_action["ring_mcp_abduction.pos"], RING_SPLAY
|
||||
),
|
||||
"ring_ulnar_flexor.pos": get_ulnar_flexion(
|
||||
glove_action["ring_mcp_flexion.pos"], glove_action["ring_mcp_abduction.pos"], RING_SPLAY
|
||||
),
|
||||
"ring_pip_dip.pos": glove_action["ring_dip.pos"],
|
||||
"pinky_radial_flexor.pos": get_radial_flexion(
|
||||
glove_action["pinky_mcp_flexion.pos"], glove_action["pinky_mcp_abduction.pos"], PINKY_SPLAY
|
||||
),
|
||||
"pinky_ulnar_flexor.pos": get_ulnar_flexion(
|
||||
glove_action["pinky_mcp_flexion.pos"], glove_action["pinky_mcp_abduction.pos"], PINKY_SPLAY
|
||||
),
|
||||
"pinky_pip_dip.pos": glove_action["pinky_dip.pos"],
|
||||
}
|
||||
3
lerobot/common/teleoperators/homonculus/__init__.py
Normal file
3
lerobot/common/teleoperators/homonculus/__init__.py
Normal file
@@ -0,0 +1,3 @@
|
||||
from .config_homonculus import HomonculusArmConfig, HomonculusGloveConfig
|
||||
from .homonculus_arm import HomonculusArm
|
||||
from .homonculus_glove import HomonculusGlove
|
||||
38
lerobot/common/teleoperators/homonculus/config_homonculus.py
Normal file
38
lerobot/common/teleoperators/homonculus/config_homonculus.py
Normal file
@@ -0,0 +1,38 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from dataclasses import dataclass
|
||||
|
||||
from ..config import TeleoperatorConfig
|
||||
|
||||
|
||||
@TeleoperatorConfig.register_subclass("homonculus_glove")
|
||||
@dataclass
|
||||
class HomonculusGloveConfig(TeleoperatorConfig):
|
||||
port: str # Port to connect to the glove
|
||||
side: str # "left" / "right"
|
||||
baud_rate: int = 115_200
|
||||
|
||||
def __post_init__(self):
|
||||
if self.side not in ["right", "left"]:
|
||||
raise ValueError(self.side)
|
||||
|
||||
|
||||
@TeleoperatorConfig.register_subclass("homonculus_arm")
|
||||
@dataclass
|
||||
class HomonculusArmConfig(TeleoperatorConfig):
|
||||
port: str # Port to connect to the arm
|
||||
baud_rate: int = 115_200
|
||||
257
lerobot/common/teleoperators/homonculus/homonculus_arm.py
Normal file
257
lerobot/common/teleoperators/homonculus/homonculus_arm.py
Normal file
@@ -0,0 +1,257 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import logging
|
||||
import threading
|
||||
from pprint import pformat
|
||||
|
||||
import serial
|
||||
|
||||
from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
|
||||
from lerobot.common.motors.motors_bus import MotorCalibration, MotorNormMode
|
||||
from lerobot.common.utils.utils import enter_pressed, move_cursor_up
|
||||
|
||||
from ..teleoperator import Teleoperator
|
||||
from .config_homonculus import HomonculusArmConfig
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class HomonculusArm(Teleoperator):
|
||||
"""
|
||||
Homonculus Arm designed by Hugging Face.
|
||||
"""
|
||||
|
||||
config_class = HomonculusArmConfig
|
||||
name = "homonculus_arm"
|
||||
|
||||
def __init__(self, config: HomonculusArmConfig):
|
||||
super().__init__(config)
|
||||
self.config = config
|
||||
self.serial = serial.Serial(config.port, config.baud_rate, timeout=1)
|
||||
|
||||
self.joints = {
|
||||
"shoulder_pitch": MotorNormMode.RANGE_M100_100,
|
||||
"shoulder_yaw": MotorNormMode.RANGE_M100_100,
|
||||
"shoulder_roll": MotorNormMode.RANGE_M100_100,
|
||||
"elbow_flex": MotorNormMode.RANGE_M100_100,
|
||||
"wrist_roll": MotorNormMode.RANGE_M100_100,
|
||||
"wrist_yaw": MotorNormMode.RANGE_M100_100,
|
||||
"wrist_pitch": MotorNormMode.RANGE_M100_100,
|
||||
}
|
||||
|
||||
self._state: dict[str, float] | None = None
|
||||
self.new_state_event = threading.Event()
|
||||
self.thread = threading.Thread(target=self._read_loop, daemon=True, name=f"{self} _read_loop")
|
||||
self.lock = threading.Lock()
|
||||
|
||||
@property
|
||||
def action_features(self) -> dict:
|
||||
return {f"{joint}.pos": float for joint in self.joints}
|
||||
|
||||
@property
|
||||
def feedback_features(self) -> dict:
|
||||
return {}
|
||||
|
||||
@property
|
||||
def is_connected(self) -> bool:
|
||||
return self.thread.is_alive() and self.serial.is_open
|
||||
|
||||
def connect(self) -> None:
|
||||
if self.is_connected:
|
||||
raise DeviceAlreadyConnectedError(f"{self} already connected")
|
||||
|
||||
if not self.serial.is_open:
|
||||
self.serial.open()
|
||||
self.thread.start()
|
||||
|
||||
# wait for the thread to ramp up & 1st state to be ready
|
||||
if not self.new_state_event.wait(timeout=2):
|
||||
raise TimeoutError(f"{self}: Timed out waiting for state after 2s.")
|
||||
|
||||
if not self.is_calibrated:
|
||||
self.calibrate()
|
||||
|
||||
logger.info(f"{self} connected.")
|
||||
|
||||
@property
|
||||
def is_calibrated(self) -> bool:
|
||||
return self.calibration_fpath.is_file()
|
||||
|
||||
def calibrate(self) -> None:
|
||||
print(
|
||||
"\nMove all joints through their entire range of motion."
|
||||
"\nRecording positions. Press ENTER to stop..."
|
||||
)
|
||||
range_mins, range_maxes = self._record_ranges_of_motion()
|
||||
|
||||
self.calibration = {}
|
||||
for id_, joint in enumerate(self.joints):
|
||||
self.calibration[joint] = MotorCalibration(
|
||||
id=id_,
|
||||
drive_mode=0,
|
||||
homing_offset=0,
|
||||
range_min=range_mins[joint],
|
||||
range_max=range_maxes[joint],
|
||||
)
|
||||
|
||||
self._save_calibration()
|
||||
print("Calibration saved to", self.calibration_fpath)
|
||||
|
||||
def _record_ranges_of_motion(
|
||||
self, joints: list[str] | None = None, display_values: bool = True
|
||||
) -> tuple[dict[str, int], dict[str, int]]:
|
||||
"""Interactively record the min/max encoder values of each joint.
|
||||
|
||||
Move the joints while the method streams live positions. Press :kbd:`Enter` to finish.
|
||||
|
||||
Args:
|
||||
joints (list[str] | None, optional): Joints to record. Defaults to every joint (`None`).
|
||||
display_values (bool, optional): When `True` (default) a live table is printed to the console.
|
||||
|
||||
Raises:
|
||||
TypeError: `joints` is not `None` or a list.
|
||||
ValueError: any joint's recorded min and max are the same.
|
||||
|
||||
Returns:
|
||||
tuple[dict[str, int], dict[str, int]]: Two dictionaries *mins* and *maxes* with the extreme values
|
||||
observed for each joint.
|
||||
"""
|
||||
if joints is None:
|
||||
joints = list(self.joints)
|
||||
elif not isinstance(joints, list):
|
||||
raise TypeError(joints)
|
||||
|
||||
start_positions = self._read(joints, normalize=False)
|
||||
mins = start_positions.copy()
|
||||
maxes = start_positions.copy()
|
||||
while True:
|
||||
positions = self._read(joints, normalize=False)
|
||||
mins = {joint: min(positions[joint], min_) for joint, min_ in mins.items()}
|
||||
maxes = {joint: max(positions[joint], max_) for joint, max_ in maxes.items()}
|
||||
|
||||
if display_values:
|
||||
print("\n-------------------------------------------")
|
||||
print(f"{'NAME':<15} | {'MIN':>6} | {'POS':>6} | {'MAX':>6}")
|
||||
for joint in joints:
|
||||
print(f"{joint:<15} | {mins[joint]:>6} | {positions[joint]:>6} | {maxes[joint]:>6}")
|
||||
|
||||
if enter_pressed():
|
||||
break
|
||||
|
||||
if display_values:
|
||||
# Move cursor up to overwrite the previous output
|
||||
move_cursor_up(len(joints) + 3)
|
||||
|
||||
same_min_max = [joint for joint in joints if mins[joint] == maxes[joint]]
|
||||
if same_min_max:
|
||||
raise ValueError(f"Some joints have the same min and max values:\n{pformat(same_min_max)}")
|
||||
|
||||
return mins, maxes
|
||||
|
||||
def configure(self) -> None:
|
||||
pass
|
||||
|
||||
def _normalize(self, values: dict[str, int], joints: list[str] | None = None):
|
||||
if not self.calibration:
|
||||
raise RuntimeError(f"{self} has no calibration registered.")
|
||||
|
||||
normalized_values = {}
|
||||
for joint, val in values.items():
|
||||
min_ = self.calibration[joint].range_min
|
||||
max_ = self.calibration[joint].range_max
|
||||
drive_mode = self.calibration[joint].drive_mode
|
||||
bounded_val = min(max_, max(min_, val))
|
||||
|
||||
if self.joints[joint] is MotorNormMode.RANGE_M100_100:
|
||||
norm = (((bounded_val - min_) / (max_ - min_)) * 200) - 100
|
||||
normalized_values[joint] = -norm if drive_mode else norm
|
||||
elif self.joints[joint] is MotorNormMode.RANGE_0_100:
|
||||
norm = ((bounded_val - min_) / (max_ - min_)) * 100
|
||||
normalized_values[joint] = 100 - norm if drive_mode else norm
|
||||
|
||||
return normalized_values
|
||||
|
||||
def _read(
|
||||
self, joints: list[str] | None = None, normalize: bool = True, timeout: float = 1
|
||||
) -> dict[str, int | float]:
|
||||
"""
|
||||
Return the most recent (single) values from self.last_d,
|
||||
optionally applying calibration.
|
||||
"""
|
||||
if not self.new_state_event.wait(timeout=timeout):
|
||||
raise TimeoutError(f"{self}: Timed out waiting for state after {timeout}s.")
|
||||
|
||||
with self.lock:
|
||||
state = self._state
|
||||
|
||||
self.new_state_event.clear()
|
||||
|
||||
if state is None:
|
||||
raise RuntimeError(f"{self} Internal error: Event set but no state available.")
|
||||
|
||||
if joints is not None:
|
||||
state = {k: v for k, v in state.items() if k in joints}
|
||||
|
||||
if normalize:
|
||||
state = self._normalize(state, joints)
|
||||
|
||||
return state
|
||||
|
||||
def _read_loop(self):
|
||||
"""
|
||||
Continuously read from the serial buffer in its own thread and sends values to the main thread through
|
||||
a queue.
|
||||
"""
|
||||
while True:
|
||||
try:
|
||||
if self.serial.in_waiting > 0:
|
||||
self.serial.flush()
|
||||
raw_values = self.serial.readline().decode("utf-8").strip().split(" ")
|
||||
if len(raw_values) != 21: # 16 raw + 5 angle values
|
||||
continue
|
||||
|
||||
joint_angles = {
|
||||
"shoulder_pitch": int(raw_values[19]),
|
||||
"shoulder_yaw": int(raw_values[18]),
|
||||
"shoulder_roll": int(raw_values[20]),
|
||||
"elbow_flex": int(raw_values[17]),
|
||||
"wrist_roll": int(raw_values[16]),
|
||||
"wrist_yaw": int(raw_values[1]),
|
||||
"wrist_pitch": int(raw_values[0]),
|
||||
}
|
||||
|
||||
with self.lock:
|
||||
self._state = joint_angles
|
||||
self.new_state_event.set()
|
||||
|
||||
except Exception as e:
|
||||
logger.debug(f"Error reading frame in background thread for {self}: {e}")
|
||||
|
||||
def get_action(self) -> dict[str, float]:
|
||||
joint_positions = self._read()
|
||||
return {f"{joint}.pos": pos for joint, pos in joint_positions.items()}
|
||||
|
||||
def send_feedback(self, feedback: dict[str, float]) -> None:
|
||||
raise NotImplementedError
|
||||
|
||||
def disconnect(self) -> None:
|
||||
if not self.is_connected:
|
||||
DeviceNotConnectedError(f"{self} is not connected.")
|
||||
|
||||
self.thread.join(timeout=0.5)
|
||||
self.serial.close()
|
||||
logger.info(f"{self} disconnected.")
|
||||
293
lerobot/common/teleoperators/homonculus/homonculus_glove.py
Normal file
293
lerobot/common/teleoperators/homonculus/homonculus_glove.py
Normal file
@@ -0,0 +1,293 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import logging
|
||||
import threading
|
||||
from pprint import pformat
|
||||
|
||||
import serial
|
||||
|
||||
from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
|
||||
from lerobot.common.motors import MotorCalibration
|
||||
from lerobot.common.motors.motors_bus import MotorNormMode
|
||||
from lerobot.common.utils.utils import enter_pressed, move_cursor_up
|
||||
|
||||
from ..teleoperator import Teleoperator
|
||||
from .config_homonculus import HomonculusGloveConfig
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
LEFT_HAND_INVERSIONS = [
|
||||
"thumb_cmc",
|
||||
"index_dip",
|
||||
"middle_mcp_abduction",
|
||||
"middle_dip",
|
||||
"pinky_mcp_abduction",
|
||||
"pinky_dip",
|
||||
]
|
||||
|
||||
RIGHT_HAND_INVERSIONS = [
|
||||
"thumb_mcp",
|
||||
"thumb_cmc",
|
||||
"index_mcp_abduction",
|
||||
"index_dip",
|
||||
"middle_mcp_abduction",
|
||||
"middle_dip",
|
||||
"ring_mcp_abduction",
|
||||
"ring_mcp_flexion",
|
||||
"ring_dip",
|
||||
"pinky_mcp_abduction",
|
||||
]
|
||||
|
||||
|
||||
class HomonculusGlove(Teleoperator):
|
||||
"""
|
||||
Homonculus Glove designed by Hugging Face.
|
||||
"""
|
||||
|
||||
config_class = HomonculusGloveConfig
|
||||
name = "homonculus_glove"
|
||||
|
||||
def __init__(self, config: HomonculusGloveConfig):
|
||||
super().__init__(config)
|
||||
self.config = config
|
||||
self.serial = serial.Serial(config.port, config.baud_rate, timeout=1)
|
||||
|
||||
self.joints = {
|
||||
"thumb_cmc": MotorNormMode.RANGE_0_100,
|
||||
"thumb_mcp": MotorNormMode.RANGE_0_100,
|
||||
"thumb_pip": MotorNormMode.RANGE_0_100,
|
||||
"thumb_dip": MotorNormMode.RANGE_0_100,
|
||||
"index_mcp_abduction": MotorNormMode.RANGE_M100_100,
|
||||
"index_mcp_flexion": MotorNormMode.RANGE_0_100,
|
||||
"index_dip": MotorNormMode.RANGE_0_100,
|
||||
"middle_mcp_abduction": MotorNormMode.RANGE_M100_100,
|
||||
"middle_mcp_flexion": MotorNormMode.RANGE_0_100,
|
||||
"middle_dip": MotorNormMode.RANGE_0_100,
|
||||
"ring_mcp_abduction": MotorNormMode.RANGE_M100_100,
|
||||
"ring_mcp_flexion": MotorNormMode.RANGE_0_100,
|
||||
"ring_dip": MotorNormMode.RANGE_0_100,
|
||||
"pinky_mcp_abduction": MotorNormMode.RANGE_M100_100,
|
||||
"pinky_mcp_flexion": MotorNormMode.RANGE_0_100,
|
||||
"pinky_dip": MotorNormMode.RANGE_0_100,
|
||||
}
|
||||
self.inverted_joints = RIGHT_HAND_INVERSIONS if config.side == "right" else LEFT_HAND_INVERSIONS
|
||||
|
||||
self._state: dict[str, float] | None = None
|
||||
self.new_state_event = threading.Event()
|
||||
self.thread = threading.Thread(target=self._read_loop, daemon=True, name=f"{self} _read_loop")
|
||||
self.lock = threading.Lock()
|
||||
|
||||
@property
|
||||
def action_features(self) -> dict:
|
||||
return {f"{joint}.pos": float for joint in self.joints}
|
||||
|
||||
@property
|
||||
def feedback_features(self) -> dict:
|
||||
return {}
|
||||
|
||||
@property
|
||||
def is_connected(self) -> bool:
|
||||
return self.thread.is_alive() and self.serial.is_open
|
||||
|
||||
def connect(self) -> None:
|
||||
if self.is_connected:
|
||||
raise DeviceAlreadyConnectedError(f"{self} already connected")
|
||||
|
||||
if not self.serial.is_open:
|
||||
self.serial.open()
|
||||
self.thread.start()
|
||||
|
||||
# wait for the thread to ramp up & 1st state to be ready
|
||||
if not self.new_state_event.wait(timeout=2):
|
||||
raise TimeoutError(f"{self}: Timed out waiting for state after 2s.")
|
||||
|
||||
if not self.is_calibrated:
|
||||
self.calibrate()
|
||||
|
||||
logger.info(f"{self} connected.")
|
||||
|
||||
@property
|
||||
def is_calibrated(self) -> bool:
|
||||
return self.calibration_fpath.is_file()
|
||||
|
||||
def calibrate(self) -> None:
|
||||
range_mins, range_maxes = {}, {}
|
||||
for finger in ["thumb", "index", "middle", "ring", "pinky"]:
|
||||
print(
|
||||
f"\nMove {finger} through its entire range of motion."
|
||||
"\nRecording positions. Press ENTER to stop..."
|
||||
)
|
||||
finger_joints = [joint for joint in self.joints if joint.startswith(finger)]
|
||||
finger_mins, finger_maxes = self._record_ranges_of_motion(finger_joints)
|
||||
range_mins.update(finger_mins)
|
||||
range_maxes.update(finger_maxes)
|
||||
|
||||
self.calibration = {}
|
||||
for id_, joint in enumerate(self.joints):
|
||||
self.calibration[joint] = MotorCalibration(
|
||||
id=id_,
|
||||
drive_mode=1 if joint in self.inverted_joints else 0,
|
||||
homing_offset=0,
|
||||
range_min=range_mins[joint],
|
||||
range_max=range_maxes[joint],
|
||||
)
|
||||
|
||||
self._save_calibration()
|
||||
print("Calibration saved to", self.calibration_fpath)
|
||||
|
||||
def _record_ranges_of_motion(
|
||||
self, joints: list[str] | None = None, display_values: bool = True
|
||||
) -> tuple[dict[str, int], dict[str, int]]:
|
||||
"""Interactively record the min/max encoder values of each joint.
|
||||
|
||||
Move the fingers while the method streams live positions. Press :kbd:`Enter` to finish.
|
||||
|
||||
Args:
|
||||
joints (list[str] | None, optional): Joints to record. Defaults to every joint (`None`).
|
||||
display_values (bool, optional): When `True` (default) a live table is printed to the console.
|
||||
|
||||
Raises:
|
||||
TypeError: `joints` is not `None` or a list.
|
||||
ValueError: any joint's recorded min and max are the same.
|
||||
|
||||
Returns:
|
||||
tuple[dict[str, int], dict[str, int]]: Two dictionaries *mins* and *maxes* with the extreme values
|
||||
observed for each joint.
|
||||
"""
|
||||
if joints is None:
|
||||
joints = list(self.joints)
|
||||
elif not isinstance(joints, list):
|
||||
raise TypeError(joints)
|
||||
|
||||
display_len = max(len(key) for key in joints)
|
||||
|
||||
start_positions = self._read(joints, normalize=False)
|
||||
mins = start_positions.copy()
|
||||
maxes = start_positions.copy()
|
||||
while True:
|
||||
positions = self._read(joints, normalize=False)
|
||||
mins = {joint: min(positions[joint], min_) for joint, min_ in mins.items()}
|
||||
maxes = {joint: max(positions[joint], max_) for joint, max_ in maxes.items()}
|
||||
|
||||
if display_values:
|
||||
print("\n-------------------------------------------")
|
||||
print(f"{'NAME':<{display_len}} | {'MIN':>6} | {'POS':>6} | {'MAX':>6}")
|
||||
for joint in joints:
|
||||
print(
|
||||
f"{joint:<{display_len}} | {mins[joint]:>6} | {positions[joint]:>6} | {maxes[joint]:>6}"
|
||||
)
|
||||
|
||||
if enter_pressed():
|
||||
break
|
||||
|
||||
if display_values:
|
||||
# Move cursor up to overwrite the previous output
|
||||
move_cursor_up(len(joints) + 3)
|
||||
|
||||
same_min_max = [joint for joint in joints if mins[joint] == maxes[joint]]
|
||||
if same_min_max:
|
||||
raise ValueError(f"Some joints have the same min and max values:\n{pformat(same_min_max)}")
|
||||
|
||||
return mins, maxes
|
||||
|
||||
def configure(self) -> None:
|
||||
pass
|
||||
|
||||
def _normalize(self, values: dict[str, int], joints: list[str] | None = None):
|
||||
if not self.calibration:
|
||||
raise RuntimeError(f"{self} has no calibration registered.")
|
||||
|
||||
normalized_values = {}
|
||||
for joint, val in values.items():
|
||||
min_ = self.calibration[joint].range_min
|
||||
max_ = self.calibration[joint].range_max
|
||||
drive_mode = self.calibration[joint].drive_mode
|
||||
bounded_val = min(max_, max(min_, val))
|
||||
|
||||
if self.joints[joint] is MotorNormMode.RANGE_M100_100:
|
||||
norm = (((bounded_val - min_) / (max_ - min_)) * 200) - 100
|
||||
normalized_values[joint] = -norm if drive_mode else norm
|
||||
elif self.joints[joint] is MotorNormMode.RANGE_0_100:
|
||||
norm = ((bounded_val - min_) / (max_ - min_)) * 100
|
||||
normalized_values[joint] = 100 - norm if drive_mode else norm
|
||||
|
||||
return normalized_values
|
||||
|
||||
def _read(
|
||||
self, joints: list[str] | None = None, normalize: bool = True, timeout: float = 1
|
||||
) -> dict[str, int | float]:
|
||||
"""
|
||||
Return the most recent (single) values from self.last_d,
|
||||
optionally applying calibration.
|
||||
"""
|
||||
if not self.new_state_event.wait(timeout=timeout):
|
||||
raise TimeoutError(f"{self}: Timed out waiting for state after {timeout}s.")
|
||||
|
||||
with self.lock:
|
||||
state = self._state
|
||||
|
||||
self.new_state_event.clear()
|
||||
|
||||
if state is None:
|
||||
raise RuntimeError(f"{self} Internal error: Event set but no state available.")
|
||||
|
||||
if joints is not None:
|
||||
state = {k: v for k, v in state.items() if k in joints}
|
||||
|
||||
if normalize:
|
||||
state = self._normalize(state, joints)
|
||||
|
||||
return state
|
||||
|
||||
def _read_loop(self):
|
||||
"""
|
||||
Continuously read from the serial buffer in its own thread and sends values to the main thread through
|
||||
a queue.
|
||||
"""
|
||||
while True:
|
||||
try:
|
||||
if self.serial.in_waiting > 0:
|
||||
self.serial.flush()
|
||||
positions = self.serial.readline().decode("utf-8").strip().split(" ")
|
||||
if len(positions) != len(self.joints):
|
||||
continue
|
||||
|
||||
joint_positions = {
|
||||
joint: int(pos) for joint, pos in zip(self.joints, positions, strict=True)
|
||||
}
|
||||
|
||||
with self.lock:
|
||||
self._state = joint_positions
|
||||
self.new_state_event.set()
|
||||
|
||||
except Exception as e:
|
||||
logger.debug(f"Error reading frame in background thread for {self}: {e}")
|
||||
|
||||
def get_action(self) -> dict[str, float]:
|
||||
joint_positions = self._read()
|
||||
return {f"{joint}.pos": pos for joint, pos in joint_positions.items()}
|
||||
|
||||
def send_feedback(self, feedback: dict[str, float]) -> None:
|
||||
raise NotImplementedError
|
||||
|
||||
def disconnect(self) -> None:
|
||||
if not self.is_connected:
|
||||
DeviceNotConnectedError(f"{self} is not connected.")
|
||||
|
||||
self.thread.join(timeout=0.5)
|
||||
self.serial.close()
|
||||
logger.info(f"{self} disconnected.")
|
||||
Reference in New Issue
Block a user