Compare commits

...

42 Commits

Author SHA1 Message Date
Simon Alibert
e578e0c2bb Merge branch 'user/aliberts/2025_02_25_refactor_robots' into user/aliberts/2025_04_03_add_hope_jr 2025-06-02 20:04:30 +02:00
Simon Alibert
e0d3179b72 Add control scripts 2025-05-27 16:54:14 +02:00
Simon Alibert
fbd636d6b0 More hacks 2025-05-27 16:53:35 +02:00
Simon Alibert
6b4d465595 Arm fixes and add config 2025-05-27 12:46:07 +02:00
Simon Alibert
dfa96b4258 Merge remote-tracking branch 'origin/user/aliberts/2025_02_25_refactor_robots' into user/aliberts/2025_04_03_add_hope_jr 2025-05-26 17:40:15 +02:00
Simon Alibert
d5ebac067a Left/Right sides + other fixes 2025-05-26 17:39:40 +02:00
Simon Alibert
c94d1c5745 Fix arm & glove 2025-05-23 17:31:56 +02:00
Simon Alibert
b623a15a16 Add timeout/event logic 2025-05-23 16:54:41 +02:00
Simon Alibert
325f5d72f8 Fix arm joints order 2025-05-23 16:02:03 +02:00
Simon Alibert
97e1860182 Merge remote-tracking branch 'origin/user/aliberts/2025_02_25_refactor_robots' into user/aliberts/2025_04_03_add_hope_jr 2025-05-23 14:48:46 +02:00
Simon Alibert
5f57289eac (WIP) Update arm 2025-05-23 12:32:28 +02:00
Simon Alibert
72d04ca3fe Cleanup glove 2025-05-23 12:31:51 +02:00
Simon Alibert
125d19df93 Implement arm calibration (hacks) 2025-05-22 17:56:31 +02:00
Simon Alibert
dafdbe4297 Remove pygame prompt 2025-05-22 17:52:44 +02:00
Simon Alibert
e62460f187 Merge remote-tracking branch 'origin/user/aliberts/2025_02_25_refactor_robots' into user/aliberts/2025_04_03_add_hope_jr 2025-05-22 16:03:41 +02:00
Simon Alibert
ef0134dc19 Fix glove to hand conversion 2025-05-22 16:00:07 +02:00
Simon Alibert
1742b2627b Merge remote-tracking branch 'origin/user/aliberts/2025_02_25_refactor_robots' into user/aliberts/2025_04_03_add_hope_jr 2025-05-22 11:35:09 +02:00
Simon Alibert
5504b5aee2 Fix joints translation 2025-05-22 11:20:38 +02:00
Simon Alibert
0f34b4b1bd Add drive_mode & norm_mode in glove calibration 2025-05-22 11:19:06 +02:00
Simon Alibert
1572e9b2aa (WIP) Add glove to hand joints translation 2025-05-21 16:18:51 +02:00
Simon Alibert
a3c671f7dd Anatomically precise joint names 2025-05-21 15:42:41 +02:00
Simon Alibert
f57015fbd9 Cleanup gui & add copyrights 2025-05-20 17:34:47 +02:00
Simon Alibert
df7b22de95 Fix feetech protocol 1 configure 2025-05-20 17:34:20 +02:00
Simon Alibert
a79f096750 Update calibration gui 2025-05-19 18:48:48 +02:00
Simon Alibert
e9d1b5588c Add setup_motors 2025-05-19 12:43:02 +02:00
Simon Alibert
a5eaec857d Merge remote-tracking branch 'origin/user/aliberts/2025_02_25_refactor_robots' into user/aliberts/2025_04_03_add_hope_jr 2025-05-19 11:24:57 +02:00
Simon Alibert
7aef264cf8 (WIP) add calibration gui 2025-05-19 11:06:47 +02:00
Simon Alibert
a2fbc54382 Complete docstring 2025-05-16 10:46:47 +02:00
Simon Alibert
9e8d726e8f Fix glove calibration 2025-05-15 18:39:18 +02:00
Simon Alibert
2a0ab40776 Update glove 2025-05-15 11:12:04 +02:00
Simon Alibert
5d322826c7 Merge remote-tracking branch 'origin/user/aliberts/2025_02_25_refactor_robots' into user/aliberts/2025_04_03_add_hope_jr 2025-05-13 18:50:06 +02:00
Simon Alibert
0df6a3fcef Update homonculus hand & arm 2025-05-13 18:49:30 +02:00
Simon Alibert
f9f3088060 Update hand & arm 2025-05-13 18:25:51 +02:00
Simon Alibert
30d3807afa Merge remote-tracking branch 'origin/user/aliberts/2025_02_25_refactor_robots' into user/aliberts/2025_04_03_add_hope_jr 2025-05-11 14:36:49 +02:00
Simon Alibert
98643b000e Fix hand & glove readings 2025-04-16 09:54:53 +02:00
Simon Alibert
c4b1cee615 Update branch & fix pre-commit errors 2025-04-15 13:01:34 +02:00
Simon Alibert
a0120a2446 Merge remote-tracking branch 'origin/user/aliberts/2025_02_25_refactor_robots' into user/aliberts/2025_04_03_add_hope_jr 2025-04-15 12:29:58 +02:00
pre-commit-ci[bot]
7c5813778b [pre-commit.ci] auto fixes from pre-commit.com hooks
for more information, see https://pre-commit.ci
2025-04-08 09:39:49 +00:00
pre-commit-ci[bot]
27feea019a [pre-commit.ci] auto fixes from pre-commit.com hooks
for more information, see https://pre-commit.ci
2025-04-08 11:39:10 +02:00
Simon Alibert
c81f293ae1 (WIP) Add homonculus arm & glove 2025-04-08 11:39:10 +02:00
Simon Alibert
c5ab08e175 Rename arm -> hand 2025-04-08 11:39:10 +02:00
Simon Alibert
58d2ac0c0b (WIP) Add Hope Jr 2025-04-08 11:39:09 +02:00
22 changed files with 1834 additions and 12 deletions

View File

@@ -0,0 +1,53 @@
import time
import traceback
from lerobot.common.robots.hope_jr import HopeJrArm, HopeJrArmConfig
from lerobot.common.teleoperators.homonculus import HomonculusArm, HomonculusArmConfig
from lerobot.common.utils.utils import move_cursor_up
def make_left_arm() -> tuple[HomonculusArm, HopeJrArm]:
left_exo_arm_cfg = HomonculusArmConfig("/dev/tty.usbmodem2101", id="left")
left_exo_arm = HomonculusArm(left_exo_arm_cfg)
left_arm_cfg = HopeJrArmConfig("/dev/tty.usbserial-140", id="left")
left_arm = HopeJrArm(left_arm_cfg)
return left_exo_arm, left_arm
def main():
left_exo_arm, left_arm = make_left_arm()
display_len = max(len(key) for key in left_exo_arm.action_features)
left_exo_arm.connect()
left_arm.connect()
try:
while True:
start = time.perf_counter()
left_arm_action = left_exo_arm.get_action()
left_arm_action["shoulder_pitch.pos"] = -left_arm_action["shoulder_pitch.pos"]
left_arm_action["wrist_yaw.pos"] = -left_arm_action["wrist_yaw.pos"]
left_arm.send_action(left_arm_action)
time.sleep(0.01)
loop_s = time.perf_counter() - start
print("\n" + "-" * (display_len + 10))
print(f"{'NAME':<{display_len}} | {'ARM':>7}")
for joint, val in left_arm_action.items():
print(f"{joint:<{display_len}} | {val:>7.2f}")
print(f"\ntime: {loop_s * 1e3:.2f}ms ({1 / loop_s:.0f} Hz)")
move_cursor_up(len(left_arm_action) + 5)
except KeyboardInterrupt:
pass
except Exception:
traceback.print_exc()
finally:
left_exo_arm.disconnect()
left_arm.disconnect()
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,51 @@
import time
import traceback
from lerobot.common.robots.hope_jr import HopeJrHand, HopeJrHandConfig, homonculus_glove_to_hope_jr_hand
from lerobot.common.teleoperators.homonculus import HomonculusGlove, HomonculusGloveConfig
from lerobot.common.utils.utils import move_cursor_up
def make_left_hand() -> tuple[HomonculusGlove, HopeJrHand]:
left_glove_cfg = HomonculusGloveConfig("/dev/tty.usbmodem1101", id="left", side="left")
left_glove = HomonculusGlove(left_glove_cfg)
left_hand_cfg = HopeJrHandConfig("/dev/tty.usbmodem58760433641", id="left", side="left")
left_hand = HopeJrHand(left_hand_cfg)
return left_glove, left_hand
def main():
left_glove, left_hand = make_left_hand()
left_glove.connect()
left_hand.connect()
display_len = max(len(key) for key in left_hand.action_features)
try:
while True:
start = time.perf_counter()
left_glove_action = left_glove.get_action()
left_hand_action = homonculus_glove_to_hope_jr_hand(left_glove_action)
left_hand.send_action(left_hand_action)
time.sleep(0.01)
loop_s = time.perf_counter() - start
print(f"\n{'NAME':<{display_len}} | {'HAND':>7}")
for joint, val in left_hand_action.items():
print(f"{joint:<{display_len}} | {val:>7.2f}")
print(f"\ntime: {loop_s * 1e3:.2f}ms ({1 / loop_s:.0f} Hz)")
move_cursor_up(len(left_hand_action) + 4)
except KeyboardInterrupt:
pass
except Exception:
traceback.print_exc()
finally:
left_glove.disconnect()
left_hand.disconnect()
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,53 @@
import time
import traceback
from lerobot.common.robots.hope_jr import HopeJrArm, HopeJrArmConfig
from lerobot.common.teleoperators.homonculus import HomonculusArm, HomonculusArmConfig
from lerobot.common.utils.utils import move_cursor_up
def make_right_arm() -> tuple[HomonculusArm, HopeJrArm]:
right_exo_arm_cfg = HomonculusArmConfig("/dev/tty.usbmodem2101", id="right")
right_exo_arm = HomonculusArm(right_exo_arm_cfg)
right_arm_cfg = HopeJrArmConfig("/dev/tty.usbserial-140", id="right")
right_arm = HopeJrArm(right_arm_cfg)
return right_exo_arm, right_arm
def main():
right_exo_arm, right_arm = make_right_arm()
display_len = max(len(key) for key in right_exo_arm.action_features)
right_exo_arm.connect()
right_arm.connect()
try:
while True:
start = time.perf_counter()
right_arm_action = right_exo_arm.get_action()
right_arm_action["shoulder_pitch.pos"] = -right_arm_action["shoulder_pitch.pos"]
right_arm_action["wrist_yaw.pos"] = -right_arm_action["wrist_yaw.pos"]
right_arm.send_action(right_arm_action)
time.sleep(0.01)
loop_s = time.perf_counter() - start
print("\n" + "-" * (display_len + 10))
print(f"{'NAME':<{display_len}} | {'ARM':>7}")
for joint, val in right_arm_action.items():
print(f"{joint:<{display_len}} | {val:>7.2f}")
print(f"\ntime: {loop_s * 1e3:.2f}ms ({1 / loop_s:.0f} Hz)")
move_cursor_up(len(right_arm_action) + 5)
except KeyboardInterrupt:
pass
except Exception:
traceback.print_exc()
finally:
right_exo_arm.disconnect()
right_arm.disconnect()
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,51 @@
import time
import traceback
from lerobot.common.robots.hope_jr import HopeJrHand, HopeJrHandConfig, homonculus_glove_to_hope_jr_hand
from lerobot.common.teleoperators.homonculus import HomonculusGlove, HomonculusGloveConfig
from lerobot.common.utils.utils import move_cursor_up
def make_right_hand() -> tuple[HomonculusGlove, HopeJrHand]:
right_glove_cfg = HomonculusGloveConfig("/dev/tty.usbmodem2101", id="right", side="right")
right_glove = HomonculusGlove(right_glove_cfg)
right_hand_cfg = HopeJrHandConfig("/dev/tty.usbserial-140", id="right", side="right")
right_hand = HopeJrHand(right_hand_cfg)
return right_glove, right_hand
def main():
right_glove, right_hand = make_right_hand()
right_glove.connect()
right_hand.connect()
display_len = max(len(key) for key in right_hand.action_features)
try:
while True:
start = time.perf_counter()
right_glove_action = right_glove.get_action()
right_hand_action = homonculus_glove_to_hope_jr_hand(right_glove_action)
right_hand.send_action(right_hand_action)
time.sleep(0.01)
loop_s = time.perf_counter() - start
print(f"\n{'NAME':<{display_len}} | {'HAND':>7}")
for joint, val in right_hand_action.items():
print(f"{joint:<{display_len}} | {val:>7.2f}")
print(f"\ntime: {loop_s * 1e3:.2f}ms ({1 / loop_s:.0f} Hz)")
move_cursor_up(len(right_hand_action) + 4)
except KeyboardInterrupt:
pass
except Exception:
traceback.print_exc()
finally:
right_glove.disconnect()
right_hand.disconnect()
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,36 @@
import time
import traceback
from lerobot.common.robots.hope_jr import HopeJrArm, HopeJrArmConfig
from lerobot.common.utils.utils import move_cursor_up
# cfg = HopeJrArmConfig("/dev/tty.usbserial-140", id="right")
cfg = HopeJrArmConfig("/dev/tty.usbserial-140", id="left")
arm = HopeJrArm(cfg)
display_len = max(len(key) for key in arm.action_features)
arm.connect()
# arm.calibrate()
arm.bus.disable_torque()
try:
while True:
start = time.perf_counter()
raw_obs = arm.bus.sync_read("Present_Position", normalize=False)
norm_obs = {arm.bus.motors[name].id: val for name, val in raw_obs.items()}
norm_obs = arm.bus._normalize(norm_obs)
norm_obs = {arm.bus._id_to_name(id_): val for id_, val in norm_obs.items()}
loop_s = time.perf_counter() - start
print("\n----------------------------------")
print(f"{'NAME':<15} | {'RAW':>7} | {'NORM':>7}")
for motor in arm.bus.motors:
print(f"{motor:<15} | {raw_obs[motor]:>7} | {norm_obs[motor]:>7.2f}")
print(f"\ntime: {loop_s * 1e3:.2f}ms ({1 / loop_s:.0f} Hz)")
move_cursor_up(len(arm.bus.motors) + 5)
except KeyboardInterrupt:
pass
except Exception as e:
traceback.print_exc(e)
finally:
arm.disconnect()

View File

@@ -0,0 +1,33 @@
import time
import traceback
from lerobot.common.teleoperators.homonculus import HomonculusArm, HomonculusArmConfig
from lerobot.common.utils.utils import move_cursor_up
# cfg = HomonculusArmConfig("/dev/tty.usbmodem2101", id="left")
cfg = HomonculusArmConfig("/dev/tty.usbmodem2101", id="right")
arm = HomonculusArm(cfg)
display_len = max(len(key) for key in arm.action_features)
arm.connect()
# arm.calibrate()
try:
while True:
start = time.perf_counter()
raw_action = arm._read(normalize=False)
norm_action = arm._normalize(raw_action)
loop_s = time.perf_counter() - start
print("\n" + "-" * (display_len + 10))
print(f"{'NAME':<{display_len}} | {'RAW':>7} | {'NORM':>7}")
for joint in arm.joints:
print(f"{joint:<{display_len}} | {raw_action[joint]:>7} | {norm_action[joint]:>7.2f}")
print(f"\ntime: {loop_s * 1e3:.2f}ms ({1 / loop_s:.0f} Hz)")
move_cursor_up(len(norm_action) + 5)
except KeyboardInterrupt:
pass
except Exception:
traceback.print_exc()
finally:
arm.disconnect()

View File

@@ -0,0 +1,32 @@
import time
import traceback
from lerobot.common.teleoperators.homonculus import HomonculusGlove, HomonculusGloveConfig
from lerobot.common.utils.utils import move_cursor_up
config = HomonculusGloveConfig("/dev/tty.usbmodem2101", side="right", id="right")
glove = HomonculusGlove(config)
glove.connect()
display_len = max(len(key) for key in glove.action_features)
glove.calibrate()
try:
while True:
start = time.perf_counter()
raw_action = glove._read(normalize=False)
norm_action = glove._normalize(raw_action)
loop_s = time.perf_counter() - start
print("\n" + "-" * (display_len + 10))
print(f"{'NAME':<{display_len}} | {'RAW':>7} | {'NORM':>7}")
for joint in glove.joints:
print(f"{joint:<{display_len}} | {raw_action[joint]:>7} | {norm_action[joint]:>7.2f}")
print(f"\ntime: {loop_s * 1e3:.2f}ms ({1 / loop_s:.0f} Hz)")
move_cursor_up(len(norm_action) + 5)
except KeyboardInterrupt:
pass
except Exception:
traceback.print_exc()
finally:
glove.disconnect()

View File

@@ -0,0 +1,8 @@
from lerobot.common.robots.hope_jr import HopeJrHand, HopeJrHandConfig
cfg = HopeJrHandConfig("/dev/tty.usbmodem58760433641", id="left", side="left")
hand = HopeJrHand(cfg)
hand.connect()
hand.calibrate()
hand.disconnect()

View File

@@ -0,0 +1,382 @@
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import math
import os
from dataclasses import dataclass
os.environ["PYGAME_HIDE_SUPPORT_PROMPT"] = "1"
import pygame
from lerobot.common.motors import MotorCalibration, MotorsBus
BAR_LEN, BAR_THICKNESS = 450, 8
HANDLE_R = 10
BRACKET_W, BRACKET_H = 6, 14
TRI_W, TRI_H = 12, 14
BTN_W, BTN_H = 60, 22
SAVE_W, SAVE_H = 80, 28
LOAD_W = 80
DD_W, DD_H = 160, 28
TOP_GAP = 50
PADDING_Y, TOP_OFFSET = 70, 60
FONT_SIZE, FPS = 20, 60
BG_COLOR = (30, 30, 30)
BAR_RED, BAR_GREEN = (200, 60, 60), (60, 200, 60)
HANDLE_COLOR, TEXT_COLOR = (240, 240, 240), (250, 250, 250)
TICK_COLOR = (250, 220, 40)
BTN_COLOR, BTN_COLOR_HL = (80, 80, 80), (110, 110, 110)
DD_COLOR, DD_COLOR_HL = (70, 70, 70), (100, 100, 100)
def dist(a, b):
return math.hypot(a[0] - b[0], a[1] - b[1])
@dataclass
class RangeValues:
min_v: int
pos_v: int
max_v: int
class RangeSlider:
"""One motor = one slider row"""
def __init__(self, motor, idx, res, calibration, present, label_pad, base_y):
self.motor = motor
self.res = res
self.x0 = 40 + label_pad
self.x1 = self.x0 + BAR_LEN
self.y = base_y + idx * PADDING_Y
self.min_v = calibration.range_min
self.max_v = calibration.range_max
self.pos_v = max(self.min_v, min(present, self.max_v))
self.min_x = self._pos_from_val(self.min_v)
self.max_x = self._pos_from_val(self.max_v)
self.pos_x = self._pos_from_val(self.pos_v)
self.min_btn = pygame.Rect(self.x0 - BTN_W - 6, self.y - BTN_H // 2, BTN_W, BTN_H)
self.max_btn = pygame.Rect(self.x1 + 6, self.y - BTN_H // 2, BTN_W, BTN_H)
self.drag_min = self.drag_max = self.drag_pos = False
self.tick_val = present
self.font = pygame.font.Font(None, FONT_SIZE)
def _val_from_pos(self, x):
return round((x - self.x0) / BAR_LEN * self.res)
def _pos_from_val(self, v):
return self.x0 + (v / self.res) * BAR_LEN
def set_tick(self, v):
self.tick_val = max(0, min(v, self.res))
def _triangle_hit(self, pos):
tri_top = self.y - BAR_THICKNESS // 2 - 2
return pygame.Rect(self.pos_x - TRI_W // 2, tri_top - TRI_H, TRI_W, TRI_H).collidepoint(pos)
def handle_event(self, e):
if e.type == pygame.MOUSEBUTTONDOWN and e.button == 1:
if self.min_btn.collidepoint(e.pos):
self.min_x, self.min_v = self.pos_x, self.pos_v
return
if self.max_btn.collidepoint(e.pos):
self.max_x, self.max_v = self.pos_x, self.pos_v
return
if dist(e.pos, (self.min_x, self.y)) <= HANDLE_R:
self.drag_min = True
elif dist(e.pos, (self.max_x, self.y)) <= HANDLE_R:
self.drag_max = True
elif self._triangle_hit(e.pos):
self.drag_pos = True
elif e.type == pygame.MOUSEBUTTONUP and e.button == 1:
self.drag_min = self.drag_max = self.drag_pos = False
elif e.type == pygame.MOUSEMOTION:
x = e.pos[0]
if self.drag_min:
self.min_x = max(self.x0, min(x, self.pos_x))
elif self.drag_max:
self.max_x = min(self.x1, max(x, self.pos_x))
elif self.drag_pos:
self.pos_x = max(self.min_x, min(x, self.max_x))
self.min_v = self._val_from_pos(self.min_x)
self.max_v = self._val_from_pos(self.max_x)
self.pos_v = self._val_from_pos(self.pos_x)
def _draw_button(self, surf, rect, text):
clr = BTN_COLOR_HL if rect.collidepoint(pygame.mouse.get_pos()) else BTN_COLOR
pygame.draw.rect(surf, clr, rect, border_radius=4)
t = self.font.render(text, True, TEXT_COLOR)
surf.blit(t, (rect.centerx - t.get_width() // 2, rect.centery - t.get_height() // 2))
def draw(self, surf):
# motor name above set-min button (right-aligned)
name_surf = self.font.render(self.motor, True, TEXT_COLOR)
surf.blit(
name_surf,
(self.min_btn.right - name_surf.get_width(), self.min_btn.y - name_surf.get_height() - 4),
)
# bar + active section
pygame.draw.rect(surf, BAR_RED, (self.x0, self.y - BAR_THICKNESS // 2, BAR_LEN, BAR_THICKNESS))
pygame.draw.rect(
surf, BAR_GREEN, (self.min_x, self.y - BAR_THICKNESS // 2, self.max_x - self.min_x, BAR_THICKNESS)
)
# tick
tick_x = self._pos_from_val(self.tick_val)
pygame.draw.line(
surf,
TICK_COLOR,
(tick_x, self.y - BAR_THICKNESS // 2 - 4),
(tick_x, self.y + BAR_THICKNESS // 2 + 4),
2,
)
# brackets
for x, sign in ((self.min_x, +1), (self.max_x, -1)):
pygame.draw.line(
surf, HANDLE_COLOR, (x, self.y - BRACKET_H // 2), (x, self.y + BRACKET_H // 2), 2
)
pygame.draw.line(
surf,
HANDLE_COLOR,
(x, self.y - BRACKET_H // 2),
(x + sign * BRACKET_W, self.y - BRACKET_H // 2),
2,
)
pygame.draw.line(
surf,
HANDLE_COLOR,
(x, self.y + BRACKET_H // 2),
(x + sign * BRACKET_W, self.y + BRACKET_H // 2),
2,
)
# triangle ▼
tri_top = self.y - BAR_THICKNESS // 2 - 2
pygame.draw.polygon(
surf,
HANDLE_COLOR,
[
(self.pos_x, tri_top),
(self.pos_x - TRI_W // 2, tri_top - TRI_H),
(self.pos_x + TRI_W // 2, tri_top - TRI_H),
],
)
# numeric labels
fh = self.font.get_height()
pos_y = tri_top - TRI_H - 4 - fh
txts = [
(self.min_v, self.min_x, self.y - BRACKET_H // 2 - 4 - fh),
(self.max_v, self.max_x, self.y - BRACKET_H // 2 - 4 - fh),
(self.pos_v, self.pos_x, pos_y),
]
for v, x, y in txts:
s = self.font.render(str(v), True, TEXT_COLOR)
surf.blit(s, (x - s.get_width() // 2, y))
# buttons
self._draw_button(surf, self.min_btn, "set min")
self._draw_button(surf, self.max_btn, "set max")
# external
def values(self) -> RangeValues:
return RangeValues(self.min_v, self.pos_v, self.max_v)
class RangeFinderGUI:
def __init__(self, bus: MotorsBus, groups: dict[str, list[str]] | None = None):
self.bus = bus
self.groups = groups if groups is not None else {"all": list(bus.motors)}
self.group_names = list(groups)
self.current_group = self.group_names[0]
if not bus.is_connected:
bus.connect()
self.calibration = bus.read_calibration()
self.res_table = bus.model_resolution_table
self.present_cache = {
m: bus.read("Present_Position", m, normalize=False) for motors in groups.values() for m in motors
}
pygame.init()
self.font = pygame.font.Font(None, FONT_SIZE)
label_pad = max(self.font.size(m)[0] for ms in groups.values() for m in ms)
self.label_pad = label_pad
width = 40 + label_pad + BAR_LEN + 6 + BTN_W + 10 + SAVE_W + 10
self.controls_bottom = 10 + SAVE_H
self.base_y = self.controls_bottom + TOP_GAP
height = self.base_y + PADDING_Y * len(groups[self.current_group]) + 40
self.screen = pygame.display.set_mode((width, height))
pygame.display.set_caption("Motors range finder")
# ui rects
self.save_btn = pygame.Rect(width - SAVE_W - 10, 10, SAVE_W, SAVE_H)
self.load_btn = pygame.Rect(self.save_btn.left - LOAD_W - 10, 10, LOAD_W, SAVE_H)
self.dd_btn = pygame.Rect(width // 2 - DD_W // 2, 10, DD_W, DD_H)
self.dd_open = False # dropdown expanded?
self.clock = pygame.time.Clock()
self._build_sliders()
self._adjust_height()
def _adjust_height(self):
motors = self.groups[self.current_group]
new_h = self.base_y + PADDING_Y * len(motors) + 40
if new_h != self.screen.get_height():
w = self.screen.get_width()
self.screen = pygame.display.set_mode((w, new_h))
def _build_sliders(self):
self.sliders: list[RangeSlider] = []
motors = self.groups[self.current_group]
for i, m in enumerate(motors):
self.sliders.append(
RangeSlider(
motor=m,
idx=i,
res=self.res_table[self.bus.motors[m].model] - 1,
calibration=self.calibration[m],
present=self.present_cache[m],
label_pad=self.label_pad,
base_y=self.base_y,
)
)
def _draw_dropdown(self):
# collapsed box
hover = self.dd_btn.collidepoint(pygame.mouse.get_pos())
pygame.draw.rect(self.screen, DD_COLOR_HL if hover else DD_COLOR, self.dd_btn, border_radius=6)
txt = self.font.render(self.current_group, True, TEXT_COLOR)
self.screen.blit(
txt, (self.dd_btn.centerx - txt.get_width() // 2, self.dd_btn.centery - txt.get_height() // 2)
)
tri_w, tri_h = 12, 6
cx = self.dd_btn.right - 14
cy = self.dd_btn.centery + 1
pygame.draw.polygon(
self.screen,
TEXT_COLOR,
[(cx - tri_w // 2, cy - tri_h // 2), (cx + tri_w // 2, cy - tri_h // 2), (cx, cy + tri_h // 2)],
)
if not self.dd_open:
return
# expanded list
for i, name in enumerate(self.group_names):
item_rect = pygame.Rect(self.dd_btn.left, self.dd_btn.bottom + i * DD_H, DD_W, DD_H)
clr = DD_COLOR_HL if item_rect.collidepoint(pygame.mouse.get_pos()) else DD_COLOR
pygame.draw.rect(self.screen, clr, item_rect)
t = self.font.render(name, True, TEXT_COLOR)
self.screen.blit(
t, (item_rect.centerx - t.get_width() // 2, item_rect.centery - t.get_height() // 2)
)
def _handle_dropdown_event(self, e):
if e.type == pygame.MOUSEBUTTONDOWN and e.button == 1:
if self.dd_btn.collidepoint(e.pos):
self.dd_open = not self.dd_open
return True
if self.dd_open:
for i, name in enumerate(self.group_names):
item_rect = pygame.Rect(self.dd_btn.left, self.dd_btn.bottom + i * DD_H, DD_W, DD_H)
if item_rect.collidepoint(e.pos):
if name != self.current_group:
self.current_group = name
self._build_sliders()
self._adjust_height()
self.dd_open = False
return True
self.dd_open = False
return False
def _save_current(self):
for s in self.sliders:
self.calibration[s.motor].range_min = s.min_v
self.calibration[s.motor].range_max = s.max_v
with self.bus.torque_disabled():
self.bus.write_calibration(self.calibration)
def _load_current(self):
self.calibration = self.bus.read_calibration()
for s in self.sliders:
s.min_v = self.calibration[s.motor].range_min
s.max_v = self.calibration[s.motor].range_max
s.min_x = s._pos_from_val(s.min_v)
s.max_x = s._pos_from_val(s.max_v)
def run(self) -> dict[str, MotorCalibration]:
while True:
for e in pygame.event.get():
if e.type == pygame.QUIT:
pygame.quit()
return self.calibration
if self._handle_dropdown_event(e):
continue
if e.type == pygame.MOUSEBUTTONDOWN and e.button == 1:
if self.save_btn.collidepoint(e.pos):
self._save_current()
elif self.load_btn.collidepoint(e.pos):
self._load_current()
for s in self.sliders:
s.handle_event(e)
# live goal write while dragging
for s in self.sliders:
if s.drag_pos:
self.bus.write("Goal_Position", s.motor, s.pos_v, normalize=False)
# tick update
for s in self.sliders:
pos = self.bus.read("Present_Position", s.motor, normalize=False)
s.set_tick(pos)
self.present_cache[s.motor] = pos
# ─ drawing
self.screen.fill(BG_COLOR)
for s in self.sliders:
s.draw(self.screen)
self._draw_dropdown()
# load / save buttons
for rect, text in ((self.load_btn, "LOAD"), (self.save_btn, "SAVE")):
clr = BTN_COLOR_HL if rect.collidepoint(pygame.mouse.get_pos()) else BTN_COLOR
pygame.draw.rect(self.screen, clr, rect, border_radius=6)
t = self.font.render(text, True, TEXT_COLOR)
self.screen.blit(t, (rect.centerx - t.get_width() // 2, rect.centery - t.get_height() // 2))
pygame.display.flip()
self.clock.tick(FPS)

View File

@@ -191,13 +191,14 @@ class DynamixelMotorsBus(MotorsBus):
return calibration
def write_calibration(self, calibration_dict: dict[str, MotorCalibration]) -> None:
def write_calibration(self, calibration_dict: dict[str, MotorCalibration], cache: bool = True) -> None:
for motor, calibration in calibration_dict.items():
self.write("Homing_Offset", motor, calibration.homing_offset)
self.write("Min_Position_Limit", motor, calibration.range_min)
self.write("Max_Position_Limit", motor, calibration.range_max)
self.calibration = calibration_dict
if cache:
self.calibration = calibration_dict
def disable_torque(self, motors: str | list[str] | None = None, num_retry: int = 0) -> None:
for motor in self._get_motors_list(motors):

View File

@@ -225,8 +225,8 @@ class FeetechMotorsBus(MotorsBus):
# the 'Return_Delay_Time' address). We ensure this is reduced to the minimum of 2µs (value of 0).
self.write("Return_Delay_Time", motor, 0)
# Set 'Maximum_Acceleration' to 254 to speedup acceleration and deceleration of the motors.
# Note: this address is not in the official STS3215 Memory Table
self.write("Maximum_Acceleration", motor, 254)
if self.protocol_version == 0:
self.write("Maximum_Acceleration", motor, 254)
self.write("Acceleration", motor, 254)
@property
@@ -271,14 +271,15 @@ class FeetechMotorsBus(MotorsBus):
return calibration
def write_calibration(self, calibration_dict: dict[str, MotorCalibration]) -> None:
def write_calibration(self, calibration_dict: dict[str, MotorCalibration], cache: bool = True) -> None:
for motor, calibration in calibration_dict.items():
if self.protocol_version == 0:
self.write("Homing_Offset", motor, calibration.homing_offset)
self.write("Min_Position_Limit", motor, calibration.range_min)
self.write("Max_Position_Limit", motor, calibration.range_max)
self.calibration = calibration_dict
if cache:
self.calibration = calibration_dict
def _get_half_turn_homings(self, positions: dict[NameOrID, Value]) -> dict[NameOrID, Value]:
"""

View File

@@ -189,7 +189,7 @@ MODEL_RESOLUTION = {
"scs_series": 1024,
"sts3215": 4096,
"sts3250": 4096,
"sm8512bl": 65536,
"sm8512bl": 4096, # 65536 (?)
"scs0009": 1024,
}

View File

@@ -588,7 +588,7 @@ class MotorsBus(abc.ABC):
pass
@contextmanager
def torque_disabled(self):
def torque_disabled(self, motors: int | str | list[str] | None = None):
"""Context-manager that guarantees torque is re-enabled.
This helper is useful to temporarily disable torque when configuring motors.
@@ -598,11 +598,11 @@ class MotorsBus(abc.ABC):
... # Safe operations here
... pass
"""
self.disable_torque()
self.disable_torque(motors)
try:
yield
finally:
self.enable_torque()
self.enable_torque(motors)
def set_timeout(self, timeout_ms: int | None = None):
"""Change the packet timeout used by the SDK.
@@ -655,12 +655,13 @@ class MotorsBus(abc.ABC):
pass
@abc.abstractmethod
def write_calibration(self, calibration_dict: dict[str, MotorCalibration]) -> None:
"""Write calibration parameters to the motors and cache them.
def write_calibration(self, calibration_dict: dict[str, MotorCalibration], cache: bool = True) -> None:
"""Write calibration parameters to the motors and optionally cache them.
Args:
calibration_dict (dict[str, MotorCalibration]): Calibration obtained from
:pymeth:`read_calibration` or crafted by the user.
cache (bool, optional): Save the calibration to :pyattr:`calibration`. Defaults to True.
"""
pass

View File

@@ -0,0 +1,4 @@
from .config_hope_jr import HopeJrArmConfig, HopeJrHandConfig
from .hope_jr_arm import HopeJrArm
from .hope_jr_hand import HopeJrHand
from .joints_translation import homonculus_glove_to_hope_jr_hand

View File

@@ -0,0 +1,52 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from dataclasses import dataclass, field
from lerobot.common.cameras import CameraConfig
from ..config import RobotConfig
@RobotConfig.register_subclass("hope_jr_hand")
@dataclass
class HopeJrHandConfig(RobotConfig):
port: str # Port to connect to the hand
side: str # "left" / "right"
disable_torque_on_disconnect: bool = True
cameras: dict[str, CameraConfig] = field(default_factory=dict)
def __post_init__(self):
super().__post_init__()
if self.side not in ["right", "left"]:
raise ValueError(self.side)
@RobotConfig.register_subclass("hope_jr_arm")
@dataclass
class HopeJrArmConfig(RobotConfig):
port: str # Port to connect to the hand
disable_torque_on_disconnect: bool = True
# `max_relative_target` limits the magnitude of the relative positional target vector for safety purposes.
# Set this to a positive scalar to have the same value for all motors, or a list that is the same length as
# the number of motors in your follower arms.
max_relative_target: int | None = None
cameras: dict[str, CameraConfig] = field(default_factory=dict)

View File

@@ -0,0 +1,216 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import time
from functools import cached_property
from typing import Any
from lerobot.common.cameras.utils import make_cameras_from_configs
from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
from lerobot.common.motors import Motor, MotorCalibration, MotorNormMode
from lerobot.common.motors.feetech import (
FeetechMotorsBus,
OperatingMode,
)
from ..robot import Robot
from ..utils import ensure_safe_goal_position
from .config_hope_jr import HopeJrArmConfig
logger = logging.getLogger(__name__)
class HopeJrArm(Robot):
config_class = HopeJrArmConfig
name = "hope_jr_arm"
def __init__(self, config: HopeJrArmConfig):
super().__init__(config)
self.config = config
self.bus = FeetechMotorsBus(
port=self.config.port,
motors={
"shoulder_pitch": Motor(2, "sm8512bl", MotorNormMode.RANGE_M100_100),
"shoulder_yaw": Motor(3, "sts3250", MotorNormMode.RANGE_M100_100),
"shoulder_roll": Motor(4, "sts3250", MotorNormMode.RANGE_M100_100),
"elbow_flex": Motor(5, "sts3250", MotorNormMode.RANGE_M100_100),
"wrist_roll": Motor(6, "sts3250", MotorNormMode.RANGE_M100_100),
"wrist_yaw": Motor(7, "sts3250", MotorNormMode.RANGE_M100_100),
"wrist_pitch": Motor(8, "sts3250", MotorNormMode.RANGE_M100_100),
},
# motors={
# "shoulder_pitch": Motor(1, "sm8512bl", MotorNormMode.RANGE_M100_100),
# "shoulder_yaw": Motor(2, "sts3250", MotorNormMode.RANGE_M100_100),
# "shoulder_roll": Motor(3, "sts3250", MotorNormMode.RANGE_M100_100),
# "elbow_flex": Motor(4, "sts3250", MotorNormMode.RANGE_M100_100),
# "wrist_roll": Motor(5, "sts3250", MotorNormMode.RANGE_M100_100),
# "wrist_yaw": Motor(6, "sts3250", MotorNormMode.RANGE_M100_100),
# "wrist_pitch": Motor(7, "sts3250", MotorNormMode.RANGE_M100_100),
# },
calibration=self.calibration,
)
self.cameras = make_cameras_from_configs(config.cameras)
# HACK
self.shoulder_pitch = "shoulder_pitch"
self.other_motors = [m for m in self.bus.motors if m != "shoulder_pitch"]
@property
def _motors_ft(self) -> dict[str, type]:
return {f"{motor}.pos": float for motor in self.bus.motors}
@property
def _cameras_ft(self) -> dict[str, tuple]:
return {
cam: (self.config.cameras[cam].height, self.config.cameras[cam].width, 3) for cam in self.cameras
}
@cached_property
def observation_features(self) -> dict[str, type | tuple]:
return {**self._motors_ft, **self._cameras_ft}
@cached_property
def action_features(self) -> dict[str, type]:
return self._motors_ft
@property
def is_connected(self) -> bool:
# TODO(aliberts): add cam.is_connected for cam in self.cameras
return self.bus.is_connected
def connect(self) -> None:
"""
We assume that at connection time, arm is in a rest position,
and torque can be safely disabled to run calibration.
"""
if self.is_connected:
raise DeviceAlreadyConnectedError(f"{self} already connected")
self.bus.connect(handshake=False)
if not self.is_calibrated:
self.calibrate()
# Connect the cameras
for cam in self.cameras.values():
cam.connect()
self.configure()
logger.info(f"{self} connected.")
@property
def is_calibrated(self) -> bool:
return self.bus.is_calibrated
def calibrate(self) -> None:
logger.info(f"\nRunning calibration of {self}")
self.bus.disable_torque()
for motor in self.bus.motors:
self.bus.write("Operating_Mode", motor, OperatingMode.POSITION.value)
unknown_range_motors = self.other_motors
input("Move arm to the middle of its range of motion and press ENTER....")
homing_offsets_full_turn = self.bus.set_half_turn_homings("shoulder_pitch")
homing_offsets_unknown_range = self.bus.set_half_turn_homings(unknown_range_motors)
homing_offsets = {**homing_offsets_full_turn, **homing_offsets_unknown_range}
logger.info(
"Move all joints except 'shoulder_pitch' sequentially through their "
"entire ranges of motion.\nRecording positions. Press ENTER to stop..."
)
range_mins, range_maxes = self.bus.record_ranges_of_motion(unknown_range_motors)
range_mins["shoulder_pitch"] = 1024
range_maxes["shoulder_pitch"] = 3071
range_mins["wrist_roll"] = 1500
range_maxes["wrist_roll"] = 2500
self.calibration = {}
for motor, m in self.bus.motors.items():
self.calibration[motor] = MotorCalibration(
id=m.id,
drive_mode=0,
homing_offset=homing_offsets[motor],
range_min=range_mins[motor],
range_max=range_maxes[motor],
)
self.bus.write_calibration(self.calibration)
self._save_calibration()
print("Calibration saved to", self.calibration_fpath)
def configure(self) -> None:
with self.bus.torque_disabled():
for motor in self.bus.motors:
self.bus.write("Return_Delay_Time", motor, 0)
self.bus.write("Maximum_Acceleration", motor, 50)
self.bus.write("Acceleration", motor, 50)
def setup_motors(self) -> None:
for motor in reversed(self.bus.motors):
input(f"Connect the controller board to the '{motor}' motor only and press enter.")
self.bus.setup_motor(motor)
print(f"'{motor}' motor id set to {self.bus.motors[motor].id}")
def get_observation(self) -> dict[str, Any]:
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
obs_dict = {}
# Read arm position
start = time.perf_counter()
obs_dict[self.shoulder_pitch] = self.bus.read("Present_Position", self.shoulder_pitch)
obs_dict.update(self.bus.sync_read("Present_Position", self.other_motors))
obs_dict = {f"{motor}.pos": val for motor, val in obs_dict.items()}
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read state: {dt_ms:.1f}ms")
# Capture images from cameras
for cam_key, cam in self.cameras.items():
start = time.perf_counter()
obs_dict[cam_key] = cam.async_read()
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms")
return obs_dict
def send_action(self, action: dict[str, Any]) -> dict[str, Any]:
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
goal_pos = {key.removesuffix(".pos"): val for key, val in action.items() if key.endswith(".pos")}
# Cap goal position when too far away from present position.
# /!\ Slower fps expected due to reading from the follower.
if self.config.max_relative_target is not None:
present_pos = self.bus.sync_read("Present_Position")
goal_present_pos = {key: (g_pos, present_pos[key]) for key, g_pos in goal_pos.items()}
goal_pos = ensure_safe_goal_position(goal_present_pos, self.config.max_relative_target)
self.bus.sync_write("Goal_Position", goal_pos)
return {f"{motor}.pos": val for motor, val in goal_pos.items()}
def disconnect(self):
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
self.bus.disconnect(self.config.disable_torque_on_disconnect)
for cam in self.cameras.values():
cam.disconnect()
logger.info(f"{self} disconnected.")

View File

@@ -0,0 +1,204 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import time
from functools import cached_property
from typing import Any
from lerobot.common.cameras.utils import make_cameras_from_configs
from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
from lerobot.common.motors import Motor, MotorNormMode
from lerobot.common.motors.calibration_gui import RangeFinderGUI
from lerobot.common.motors.feetech import (
FeetechMotorsBus,
)
from ..robot import Robot
from .config_hope_jr import HopeJrHandConfig
logger = logging.getLogger(__name__)
RIGHT_HAND_INVERSIONS = [
"thumb_mcp",
"thumb_dip",
"index_ulnar_flexor",
"middle_ulnar_flexor",
"ring_ulnar_flexor",
"ring_pip_dip",
"pinky_ulnar_flexor",
"pinky_pip_dip",
]
LEFT_HAND_INVERSIONS = [
"thumb_cmc",
"thumb_mcp",
"thumb_dip",
"index_radial_flexor",
"index_pip_dip",
"middle_radial_flexor",
"middle_pip_dip",
"ring_radial_flexor",
"ring_pip_dip",
"pinky_radial_flexor",
# "pinky_pip_dip",
]
class HopeJrHand(Robot):
config_class = HopeJrHandConfig
name = "hope_jr_hand"
def __init__(self, config: HopeJrHandConfig):
super().__init__(config)
self.config = config
self.bus = FeetechMotorsBus(
port=self.config.port,
motors={
# Thumb
"thumb_cmc": Motor(1, "scs0009", MotorNormMode.RANGE_0_100),
"thumb_mcp": Motor(2, "scs0009", MotorNormMode.RANGE_0_100),
"thumb_pip": Motor(3, "scs0009", MotorNormMode.RANGE_0_100),
"thumb_dip": Motor(4, "scs0009", MotorNormMode.RANGE_0_100),
# Index
"index_radial_flexor": Motor(5, "scs0009", MotorNormMode.RANGE_0_100),
"index_ulnar_flexor": Motor(6, "scs0009", MotorNormMode.RANGE_0_100),
"index_pip_dip": Motor(7, "scs0009", MotorNormMode.RANGE_0_100),
# Middle
"middle_radial_flexor": Motor(8, "scs0009", MotorNormMode.RANGE_0_100),
"middle_ulnar_flexor": Motor(9, "scs0009", MotorNormMode.RANGE_0_100),
"middle_pip_dip": Motor(10, "scs0009", MotorNormMode.RANGE_0_100),
# Ring
"ring_radial_flexor": Motor(11, "scs0009", MotorNormMode.RANGE_0_100),
"ring_ulnar_flexor": Motor(12, "scs0009", MotorNormMode.RANGE_0_100),
"ring_pip_dip": Motor(13, "scs0009", MotorNormMode.RANGE_0_100),
# Pinky
"pinky_radial_flexor": Motor(14, "scs0009", MotorNormMode.RANGE_0_100),
"pinky_ulnar_flexor": Motor(15, "scs0009", MotorNormMode.RANGE_0_100),
"pinky_pip_dip": Motor(16, "scs0009", MotorNormMode.RANGE_0_100),
},
calibration=self.calibration,
protocol_version=1,
)
self.cameras = make_cameras_from_configs(config.cameras)
self.inverted_motors = RIGHT_HAND_INVERSIONS if config.side == "right" else LEFT_HAND_INVERSIONS
@property
def _motors_ft(self) -> dict[str, type]:
return {f"{motor}.pos": float for motor in self.bus.motors}
@property
def _cameras_ft(self) -> dict[str, tuple]:
return {
cam: (self.config.cameras[cam].height, self.config.cameras[cam].width, 3) for cam in self.cameras
}
@cached_property
def observation_features(self) -> dict[str, type | tuple]:
return {**self._motors_ft, **self._cameras_ft}
@cached_property
def action_features(self) -> dict[str, type]:
return self._motors_ft
@property
def is_connected(self) -> bool:
# TODO(aliberts): add cam.is_connected for cam in self.cameras
return self.bus.is_connected
def connect(self) -> None:
if self.is_connected:
raise DeviceAlreadyConnectedError(f"{self} already connected")
self.bus.connect()
if not self.is_calibrated:
self.calibrate()
# Connect the cameras
for cam in self.cameras.values():
cam.connect()
self.configure()
logger.info(f"{self} connected.")
@property
def is_calibrated(self) -> bool:
return self.bus.is_calibrated
def calibrate(self) -> None:
fingers = {}
for finger in ["thumb", "index", "middle", "ring", "pinky"]:
fingers[finger] = [motor for motor in self.bus.motors if motor.startswith(finger)]
self.calibration = RangeFinderGUI(self.bus, fingers).run()
for motor in self.inverted_motors:
self.calibration[motor].drive_mode = 1
self._save_calibration()
print("Calibration saved to", self.calibration_fpath)
def configure(self) -> None:
with self.bus.torque_disabled():
self.bus.configure_motors()
# for motor in self.bus.motors:
# self.bus.write("Return_Delay_Time", motor, 0)
# self.bus.write("Acceleration_2", motor, 50)
# self.bus.write("Acceleration", motor, 50)
def setup_motors(self) -> None:
for motor in self.bus.motors:
input(f"Connect the controller board to the '{motor}' motor only and press enter.")
self.bus.setup_motor(motor)
print(f"'{motor}' motor id set to {self.bus.motors[motor].id}")
def get_observation(self) -> dict[str, Any]:
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
obs_dict = {}
# Read hand position
start = time.perf_counter()
for motor in self.bus.motors:
obs_dict[f"{motor}.pos"] = self.bus.read("Present_Position", motor, normalize=False)
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read state: {dt_ms:.1f}ms")
# Capture images from cameras
for cam_key, cam in self.cameras.items():
start = time.perf_counter()
obs_dict[cam_key] = cam.async_read()
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms")
return obs_dict
def send_action(self, action: dict[str, Any]) -> dict[str, Any]:
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
goal_pos = {key.removesuffix(".pos"): val for key, val in action.items() if key.endswith(".pos")}
self.bus.sync_write("Goal_Position", goal_pos)
return action
def disconnect(self):
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
self.bus.disconnect(self.config.disable_torque_on_disconnect)
for cam in self.cameras.values():
cam.disconnect()
logger.info(f"{self} disconnected.")

View File

@@ -0,0 +1,53 @@
INDEX_SPLAY = 0.3
MIDDLE_SPLAY = 0.3
RING_SPLAY = 0.3
PINKY_SPLAY = 0.5
def get_ulnar_flexion(flexion: float, abduction: float, splay: float):
# ulnar_component = max(-abduction, 0)
ulnar_component = -abduction
return ulnar_component * splay + flexion * (1 - splay)
def get_radial_flexion(flexion: float, abduction: float, splay: float):
radial_component = abduction
# radial_component = max(abduction, 0)
return radial_component * splay + flexion * (1 - splay)
def homonculus_glove_to_hope_jr_hand(glove_action: dict[str, float]) -> dict[str, float]:
return {
"thumb_cmc.pos": glove_action["thumb_cmc.pos"],
"thumb_mcp.pos": glove_action["thumb_mcp.pos"],
"thumb_pip.pos": glove_action["thumb_pip.pos"],
"thumb_dip.pos": glove_action["thumb_dip.pos"],
"index_radial_flexor.pos": get_radial_flexion(
glove_action["index_mcp_flexion.pos"], glove_action["index_mcp_abduction.pos"], INDEX_SPLAY
),
"index_ulnar_flexor.pos": get_ulnar_flexion(
glove_action["index_mcp_flexion.pos"], glove_action["index_mcp_abduction.pos"], INDEX_SPLAY
),
"index_pip_dip.pos": glove_action["index_dip.pos"],
"middle_radial_flexor.pos": get_radial_flexion(
glove_action["middle_mcp_flexion.pos"], glove_action["middle_mcp_abduction.pos"], MIDDLE_SPLAY
),
"middle_ulnar_flexor.pos": get_ulnar_flexion(
glove_action["middle_mcp_flexion.pos"], glove_action["middle_mcp_abduction.pos"], MIDDLE_SPLAY
),
"middle_pip_dip.pos": glove_action["middle_dip.pos"],
"ring_radial_flexor.pos": get_radial_flexion(
glove_action["ring_mcp_flexion.pos"], glove_action["ring_mcp_abduction.pos"], RING_SPLAY
),
"ring_ulnar_flexor.pos": get_ulnar_flexion(
glove_action["ring_mcp_flexion.pos"], glove_action["ring_mcp_abduction.pos"], RING_SPLAY
),
"ring_pip_dip.pos": glove_action["ring_dip.pos"],
"pinky_radial_flexor.pos": get_radial_flexion(
glove_action["pinky_mcp_flexion.pos"], glove_action["pinky_mcp_abduction.pos"], PINKY_SPLAY
),
"pinky_ulnar_flexor.pos": get_ulnar_flexion(
glove_action["pinky_mcp_flexion.pos"], glove_action["pinky_mcp_abduction.pos"], PINKY_SPLAY
),
"pinky_pip_dip.pos": glove_action["pinky_dip.pos"],
}

View File

@@ -0,0 +1,3 @@
from .config_homonculus import HomonculusArmConfig, HomonculusGloveConfig
from .homonculus_arm import HomonculusArm
from .homonculus_glove import HomonculusGlove

View File

@@ -0,0 +1,38 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from dataclasses import dataclass
from ..config import TeleoperatorConfig
@TeleoperatorConfig.register_subclass("homonculus_glove")
@dataclass
class HomonculusGloveConfig(TeleoperatorConfig):
port: str # Port to connect to the glove
side: str # "left" / "right"
baud_rate: int = 115_200
def __post_init__(self):
if self.side not in ["right", "left"]:
raise ValueError(self.side)
@TeleoperatorConfig.register_subclass("homonculus_arm")
@dataclass
class HomonculusArmConfig(TeleoperatorConfig):
port: str # Port to connect to the arm
baud_rate: int = 115_200

View File

@@ -0,0 +1,257 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import threading
from pprint import pformat
import serial
from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
from lerobot.common.motors.motors_bus import MotorCalibration, MotorNormMode
from lerobot.common.utils.utils import enter_pressed, move_cursor_up
from ..teleoperator import Teleoperator
from .config_homonculus import HomonculusArmConfig
logger = logging.getLogger(__name__)
class HomonculusArm(Teleoperator):
"""
Homonculus Arm designed by Hugging Face.
"""
config_class = HomonculusArmConfig
name = "homonculus_arm"
def __init__(self, config: HomonculusArmConfig):
super().__init__(config)
self.config = config
self.serial = serial.Serial(config.port, config.baud_rate, timeout=1)
self.joints = {
"shoulder_pitch": MotorNormMode.RANGE_M100_100,
"shoulder_yaw": MotorNormMode.RANGE_M100_100,
"shoulder_roll": MotorNormMode.RANGE_M100_100,
"elbow_flex": MotorNormMode.RANGE_M100_100,
"wrist_roll": MotorNormMode.RANGE_M100_100,
"wrist_yaw": MotorNormMode.RANGE_M100_100,
"wrist_pitch": MotorNormMode.RANGE_M100_100,
}
self._state: dict[str, float] | None = None
self.new_state_event = threading.Event()
self.thread = threading.Thread(target=self._read_loop, daemon=True, name=f"{self} _read_loop")
self.lock = threading.Lock()
@property
def action_features(self) -> dict:
return {f"{joint}.pos": float for joint in self.joints}
@property
def feedback_features(self) -> dict:
return {}
@property
def is_connected(self) -> bool:
return self.thread.is_alive() and self.serial.is_open
def connect(self) -> None:
if self.is_connected:
raise DeviceAlreadyConnectedError(f"{self} already connected")
if not self.serial.is_open:
self.serial.open()
self.thread.start()
# wait for the thread to ramp up & 1st state to be ready
if not self.new_state_event.wait(timeout=2):
raise TimeoutError(f"{self}: Timed out waiting for state after 2s.")
if not self.is_calibrated:
self.calibrate()
logger.info(f"{self} connected.")
@property
def is_calibrated(self) -> bool:
return self.calibration_fpath.is_file()
def calibrate(self) -> None:
print(
"\nMove all joints through their entire range of motion."
"\nRecording positions. Press ENTER to stop..."
)
range_mins, range_maxes = self._record_ranges_of_motion()
self.calibration = {}
for id_, joint in enumerate(self.joints):
self.calibration[joint] = MotorCalibration(
id=id_,
drive_mode=0,
homing_offset=0,
range_min=range_mins[joint],
range_max=range_maxes[joint],
)
self._save_calibration()
print("Calibration saved to", self.calibration_fpath)
def _record_ranges_of_motion(
self, joints: list[str] | None = None, display_values: bool = True
) -> tuple[dict[str, int], dict[str, int]]:
"""Interactively record the min/max encoder values of each joint.
Move the joints while the method streams live positions. Press :kbd:`Enter` to finish.
Args:
joints (list[str] | None, optional): Joints to record. Defaults to every joint (`None`).
display_values (bool, optional): When `True` (default) a live table is printed to the console.
Raises:
TypeError: `joints` is not `None` or a list.
ValueError: any joint's recorded min and max are the same.
Returns:
tuple[dict[str, int], dict[str, int]]: Two dictionaries *mins* and *maxes* with the extreme values
observed for each joint.
"""
if joints is None:
joints = list(self.joints)
elif not isinstance(joints, list):
raise TypeError(joints)
start_positions = self._read(joints, normalize=False)
mins = start_positions.copy()
maxes = start_positions.copy()
while True:
positions = self._read(joints, normalize=False)
mins = {joint: min(positions[joint], min_) for joint, min_ in mins.items()}
maxes = {joint: max(positions[joint], max_) for joint, max_ in maxes.items()}
if display_values:
print("\n-------------------------------------------")
print(f"{'NAME':<15} | {'MIN':>6} | {'POS':>6} | {'MAX':>6}")
for joint in joints:
print(f"{joint:<15} | {mins[joint]:>6} | {positions[joint]:>6} | {maxes[joint]:>6}")
if enter_pressed():
break
if display_values:
# Move cursor up to overwrite the previous output
move_cursor_up(len(joints) + 3)
same_min_max = [joint for joint in joints if mins[joint] == maxes[joint]]
if same_min_max:
raise ValueError(f"Some joints have the same min and max values:\n{pformat(same_min_max)}")
return mins, maxes
def configure(self) -> None:
pass
def _normalize(self, values: dict[str, int], joints: list[str] | None = None):
if not self.calibration:
raise RuntimeError(f"{self} has no calibration registered.")
normalized_values = {}
for joint, val in values.items():
min_ = self.calibration[joint].range_min
max_ = self.calibration[joint].range_max
drive_mode = self.calibration[joint].drive_mode
bounded_val = min(max_, max(min_, val))
if self.joints[joint] is MotorNormMode.RANGE_M100_100:
norm = (((bounded_val - min_) / (max_ - min_)) * 200) - 100
normalized_values[joint] = -norm if drive_mode else norm
elif self.joints[joint] is MotorNormMode.RANGE_0_100:
norm = ((bounded_val - min_) / (max_ - min_)) * 100
normalized_values[joint] = 100 - norm if drive_mode else norm
return normalized_values
def _read(
self, joints: list[str] | None = None, normalize: bool = True, timeout: float = 1
) -> dict[str, int | float]:
"""
Return the most recent (single) values from self.last_d,
optionally applying calibration.
"""
if not self.new_state_event.wait(timeout=timeout):
raise TimeoutError(f"{self}: Timed out waiting for state after {timeout}s.")
with self.lock:
state = self._state
self.new_state_event.clear()
if state is None:
raise RuntimeError(f"{self} Internal error: Event set but no state available.")
if joints is not None:
state = {k: v for k, v in state.items() if k in joints}
if normalize:
state = self._normalize(state, joints)
return state
def _read_loop(self):
"""
Continuously read from the serial buffer in its own thread and sends values to the main thread through
a queue.
"""
while True:
try:
if self.serial.in_waiting > 0:
self.serial.flush()
raw_values = self.serial.readline().decode("utf-8").strip().split(" ")
if len(raw_values) != 21: # 16 raw + 5 angle values
continue
joint_angles = {
"shoulder_pitch": int(raw_values[19]),
"shoulder_yaw": int(raw_values[18]),
"shoulder_roll": int(raw_values[20]),
"elbow_flex": int(raw_values[17]),
"wrist_roll": int(raw_values[16]),
"wrist_yaw": int(raw_values[1]),
"wrist_pitch": int(raw_values[0]),
}
with self.lock:
self._state = joint_angles
self.new_state_event.set()
except Exception as e:
logger.debug(f"Error reading frame in background thread for {self}: {e}")
def get_action(self) -> dict[str, float]:
joint_positions = self._read()
return {f"{joint}.pos": pos for joint, pos in joint_positions.items()}
def send_feedback(self, feedback: dict[str, float]) -> None:
raise NotImplementedError
def disconnect(self) -> None:
if not self.is_connected:
DeviceNotConnectedError(f"{self} is not connected.")
self.thread.join(timeout=0.5)
self.serial.close()
logger.info(f"{self} disconnected.")

View File

@@ -0,0 +1,293 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import threading
from pprint import pformat
import serial
from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
from lerobot.common.motors import MotorCalibration
from lerobot.common.motors.motors_bus import MotorNormMode
from lerobot.common.utils.utils import enter_pressed, move_cursor_up
from ..teleoperator import Teleoperator
from .config_homonculus import HomonculusGloveConfig
logger = logging.getLogger(__name__)
LEFT_HAND_INVERSIONS = [
"thumb_cmc",
"index_dip",
"middle_mcp_abduction",
"middle_dip",
"pinky_mcp_abduction",
"pinky_dip",
]
RIGHT_HAND_INVERSIONS = [
"thumb_mcp",
"thumb_cmc",
"index_mcp_abduction",
"index_dip",
"middle_mcp_abduction",
"middle_dip",
"ring_mcp_abduction",
"ring_mcp_flexion",
"ring_dip",
"pinky_mcp_abduction",
]
class HomonculusGlove(Teleoperator):
"""
Homonculus Glove designed by Hugging Face.
"""
config_class = HomonculusGloveConfig
name = "homonculus_glove"
def __init__(self, config: HomonculusGloveConfig):
super().__init__(config)
self.config = config
self.serial = serial.Serial(config.port, config.baud_rate, timeout=1)
self.joints = {
"thumb_cmc": MotorNormMode.RANGE_0_100,
"thumb_mcp": MotorNormMode.RANGE_0_100,
"thumb_pip": MotorNormMode.RANGE_0_100,
"thumb_dip": MotorNormMode.RANGE_0_100,
"index_mcp_abduction": MotorNormMode.RANGE_M100_100,
"index_mcp_flexion": MotorNormMode.RANGE_0_100,
"index_dip": MotorNormMode.RANGE_0_100,
"middle_mcp_abduction": MotorNormMode.RANGE_M100_100,
"middle_mcp_flexion": MotorNormMode.RANGE_0_100,
"middle_dip": MotorNormMode.RANGE_0_100,
"ring_mcp_abduction": MotorNormMode.RANGE_M100_100,
"ring_mcp_flexion": MotorNormMode.RANGE_0_100,
"ring_dip": MotorNormMode.RANGE_0_100,
"pinky_mcp_abduction": MotorNormMode.RANGE_M100_100,
"pinky_mcp_flexion": MotorNormMode.RANGE_0_100,
"pinky_dip": MotorNormMode.RANGE_0_100,
}
self.inverted_joints = RIGHT_HAND_INVERSIONS if config.side == "right" else LEFT_HAND_INVERSIONS
self._state: dict[str, float] | None = None
self.new_state_event = threading.Event()
self.thread = threading.Thread(target=self._read_loop, daemon=True, name=f"{self} _read_loop")
self.lock = threading.Lock()
@property
def action_features(self) -> dict:
return {f"{joint}.pos": float for joint in self.joints}
@property
def feedback_features(self) -> dict:
return {}
@property
def is_connected(self) -> bool:
return self.thread.is_alive() and self.serial.is_open
def connect(self) -> None:
if self.is_connected:
raise DeviceAlreadyConnectedError(f"{self} already connected")
if not self.serial.is_open:
self.serial.open()
self.thread.start()
# wait for the thread to ramp up & 1st state to be ready
if not self.new_state_event.wait(timeout=2):
raise TimeoutError(f"{self}: Timed out waiting for state after 2s.")
if not self.is_calibrated:
self.calibrate()
logger.info(f"{self} connected.")
@property
def is_calibrated(self) -> bool:
return self.calibration_fpath.is_file()
def calibrate(self) -> None:
range_mins, range_maxes = {}, {}
for finger in ["thumb", "index", "middle", "ring", "pinky"]:
print(
f"\nMove {finger} through its entire range of motion."
"\nRecording positions. Press ENTER to stop..."
)
finger_joints = [joint for joint in self.joints if joint.startswith(finger)]
finger_mins, finger_maxes = self._record_ranges_of_motion(finger_joints)
range_mins.update(finger_mins)
range_maxes.update(finger_maxes)
self.calibration = {}
for id_, joint in enumerate(self.joints):
self.calibration[joint] = MotorCalibration(
id=id_,
drive_mode=1 if joint in self.inverted_joints else 0,
homing_offset=0,
range_min=range_mins[joint],
range_max=range_maxes[joint],
)
self._save_calibration()
print("Calibration saved to", self.calibration_fpath)
def _record_ranges_of_motion(
self, joints: list[str] | None = None, display_values: bool = True
) -> tuple[dict[str, int], dict[str, int]]:
"""Interactively record the min/max encoder values of each joint.
Move the fingers while the method streams live positions. Press :kbd:`Enter` to finish.
Args:
joints (list[str] | None, optional): Joints to record. Defaults to every joint (`None`).
display_values (bool, optional): When `True` (default) a live table is printed to the console.
Raises:
TypeError: `joints` is not `None` or a list.
ValueError: any joint's recorded min and max are the same.
Returns:
tuple[dict[str, int], dict[str, int]]: Two dictionaries *mins* and *maxes* with the extreme values
observed for each joint.
"""
if joints is None:
joints = list(self.joints)
elif not isinstance(joints, list):
raise TypeError(joints)
display_len = max(len(key) for key in joints)
start_positions = self._read(joints, normalize=False)
mins = start_positions.copy()
maxes = start_positions.copy()
while True:
positions = self._read(joints, normalize=False)
mins = {joint: min(positions[joint], min_) for joint, min_ in mins.items()}
maxes = {joint: max(positions[joint], max_) for joint, max_ in maxes.items()}
if display_values:
print("\n-------------------------------------------")
print(f"{'NAME':<{display_len}} | {'MIN':>6} | {'POS':>6} | {'MAX':>6}")
for joint in joints:
print(
f"{joint:<{display_len}} | {mins[joint]:>6} | {positions[joint]:>6} | {maxes[joint]:>6}"
)
if enter_pressed():
break
if display_values:
# Move cursor up to overwrite the previous output
move_cursor_up(len(joints) + 3)
same_min_max = [joint for joint in joints if mins[joint] == maxes[joint]]
if same_min_max:
raise ValueError(f"Some joints have the same min and max values:\n{pformat(same_min_max)}")
return mins, maxes
def configure(self) -> None:
pass
def _normalize(self, values: dict[str, int], joints: list[str] | None = None):
if not self.calibration:
raise RuntimeError(f"{self} has no calibration registered.")
normalized_values = {}
for joint, val in values.items():
min_ = self.calibration[joint].range_min
max_ = self.calibration[joint].range_max
drive_mode = self.calibration[joint].drive_mode
bounded_val = min(max_, max(min_, val))
if self.joints[joint] is MotorNormMode.RANGE_M100_100:
norm = (((bounded_val - min_) / (max_ - min_)) * 200) - 100
normalized_values[joint] = -norm if drive_mode else norm
elif self.joints[joint] is MotorNormMode.RANGE_0_100:
norm = ((bounded_val - min_) / (max_ - min_)) * 100
normalized_values[joint] = 100 - norm if drive_mode else norm
return normalized_values
def _read(
self, joints: list[str] | None = None, normalize: bool = True, timeout: float = 1
) -> dict[str, int | float]:
"""
Return the most recent (single) values from self.last_d,
optionally applying calibration.
"""
if not self.new_state_event.wait(timeout=timeout):
raise TimeoutError(f"{self}: Timed out waiting for state after {timeout}s.")
with self.lock:
state = self._state
self.new_state_event.clear()
if state is None:
raise RuntimeError(f"{self} Internal error: Event set but no state available.")
if joints is not None:
state = {k: v for k, v in state.items() if k in joints}
if normalize:
state = self._normalize(state, joints)
return state
def _read_loop(self):
"""
Continuously read from the serial buffer in its own thread and sends values to the main thread through
a queue.
"""
while True:
try:
if self.serial.in_waiting > 0:
self.serial.flush()
positions = self.serial.readline().decode("utf-8").strip().split(" ")
if len(positions) != len(self.joints):
continue
joint_positions = {
joint: int(pos) for joint, pos in zip(self.joints, positions, strict=True)
}
with self.lock:
self._state = joint_positions
self.new_state_event.set()
except Exception as e:
logger.debug(f"Error reading frame in background thread for {self}: {e}")
def get_action(self) -> dict[str, float]:
joint_positions = self._read()
return {f"{joint}.pos": pos for joint, pos in joint_positions.items()}
def send_feedback(self, feedback: dict[str, float]) -> None:
raise NotImplementedError
def disconnect(self) -> None:
if not self.is_connected:
DeviceNotConnectedError(f"{self} is not connected.")
self.thread.join(timeout=0.5)
self.serial.close()
logger.info(f"{self} disconnected.")