Add Hope Jr (#935)

* Fix imports

* Add feetech write tests

* Nit

* Add autoclosing fixture

* Assert ping stub called

* Add CalibrationMode

* Add Motor in dxl robots

* Simplify split_int_bytes

* Rename read/write -> sync_read/write, refactor, add write

* Rename tests

* Refactor dxl tests by functionality

* Add dxl write test

* Refactor _is_comm_success

* Refactor feetech tests by functionality

* Add feetech write test

* Simplify _is_comm_success & _is_error

* Move mock_serial patch to dedicated file

* Remove test skips & fix docstrings

* Nit

* Add dxl operating modes

* Add is_connected in robots and teleops

* Update Koch

* Add feetech operating modes

* Caps dxl OperatingMode

* Update ensure_safe_goal_position

* Update so100

* Privatize methods & renames

* Fix dict

* Add _configure_motors & move ping methods

* Return models (str) with pings

* Implement feetech broadcast ping

* Add raw_values option

* Rename idx -> id_

* Improve errors

* Fix feetech ping tests

* Ensure motors exist at connection time

* Update tests

* Add test_motors_bus

* Move DriveMode & TorqueMode

* Update Koch imports

* Update so100 imports

* Fix visualize_motors_bus

* Fix imports

* Add calibration

* Rename idx -> id_

* Rename idx -> id_

* (WIP) _async_read

* Add new calibration method for robot refactor (#896)

Co-authored-by: Simon Alibert <simon.alibert@huggingface.co>

* Remove deprecated scripts

* Rename CalibrationMode -> MotorNormMode

* Fix calibration functions

* Remove todo

* Add scan_port utility

* Add calibration utilities

* Move encoding functions to encoding_utils

* Add test_encoding_utils

* Rename test

* Add more calibration utilities

* Format baudrate tables

* Implement SO-100 leader calibration

* Implement SO-100 follower calibration

* Implement Koch calibration

* Add test_scan_port (TODO)

* Fix calibration

* Hack feetech firmware bug

* Update tests

* Update Koch & SO-100

* Improve format

* Rename SO-100 classes

* Rename Koch classes

* Add calibration tests

* Remove old calibration tests

* Revert feetech hack and monkeypatch instead

* Simplify motors mocks

* Add is_calibrated test

* Update viperx & widowx

* Rename viperx & widowx

* Remove old calibration

* feat(teleop): thread-safe keyboard teleop implementation (#869)

Co-authored-by: Simon Alibert <75076266+aliberts@users.noreply.github.com>

* Add support for feetech scs series + various fixes

* Update dynamixel with motors bus & tables changes

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* (WIP) Add Hope Jr

* Rename arm -> hand

* (WIP) Add homonculus arm & glove

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Add Feetech protocol version

* Implement read

* Use constants from sdks

* (nit) move write

* Fix broadcast ping type hint

* Add protocol 1 broadcast ping

* Refactor & add _serialize_data

* Add feetech sm8512bl

* Make feetech broadcast ping faster in protocol 1

* Cleanup

* Add support for feetech protocol 1 to _split_into_byte_chunks

* Fix unormalize

* Remove test_motors_bus fixtures

* Add more segmented tests (base motor bus & feetech), add feetech protocol 1 support

* Add more segmented tests (dynamixel)

* Refactor tests

* Add handshake, fix feetech _read_firmware_version

* Fix tests

* Motors config & disconnect fixes

* Add torque_disabled context

* Update branch & fix pre-commit errors

* Fix hand & glove readings

* Update feetech tables

* Move read/write_calibration implementations

* Add setup_motor

* Fix calibration msg display

* Fix setup_motor & add it to robots

* Fix _find_single_motor

* Remove deprecated configure_motor

* Remove deprecated dynamixel_calibration

* Remove names

* Remove deprecated import

* refactor/lekiwi robot (#863)

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: Simon Alibert <simon.alibert@huggingface.co>
Co-authored-by: Simon Alibert <75076266+aliberts@users.noreply.github.com>

* fix(teleoperators): use property is_connected (#1075)

* Remove deprecated manipulator

* Update robot features & naming

* Update teleop features & naming

* Add make_teleoperator_from_config

* Rename find_port

* Fix config parsing

* Remove app script

* Add setup_motors

* Add teleoperate

* Add record

* Add replay

* Fix test_datasets

* Add mock robot & teleop

* Add new test_control_robot

* Add test_record_and_resume

* Remove deprecated scripts & tests

* Add calibrate

* Add docstrings

* Fix tests (no-extras install)

* Add SO101

* Remove pynput from optional deps

* Rename example 7

* Remove unecessary id

* Add MotorsBus docstrings

* Rename arm -> bus

* Remove Moss arm

* Fix setup_motors & calibrate configs

* Fix test_calibrate

* Add copyrights

* Update hand & arm

* Update homonculus hand & arm

* Fix dxl _find_single_motor

* Update glove

* Add setup_motors for lekiwi

* Fix glove calibration

* Complete docstring

* Add check for same min and max during calibration

* Move MockMotorsBus

* Add so100_follower tests

* (WIP) add calibration gui

* Fix test

* Add setup_motors

* Update calibration gui

* Remove old .cache folder

* Replace deprecated abc.abstractproperty

* Fix feetech protocol 1 configure

* Cleanup gui & add copyrights

* Anatomically precise joint names

* (WIP) Add glove to hand joints translation

* Move make_robot_config

* Add drive_mode & norm_mode in glove calibration

* Fix joints translation

* Fix normalization drive_mode

* nit

* Fix glove to hand conversion

* Adapt feetech calibration

* Remove pygame prompt

* Implement arm calibration (hacks)

* Better MotorsBus error messages

* Update feetech read_calibration

* Fix feetech test_is_calibrated

* Cleanup glove

* (WIP) Update arm

* Add changes from #1117

* refactor(cameras): cameras implementations + tests improvements (#1108)

Co-authored-by: Simon Alibert <simon.alibert@huggingface.co>
Co-authored-by: Simon Alibert <75076266+aliberts@users.noreply.github.com>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>

* Fix arm joints order

* Add timeout/event logic

* Fix arm & glove

* Fix predict_action from record

* fix(cameras): update docstring + handle sn when starts with 0 + update timeouts to more reasonable value (#1154)

* fix(scripts): parser instead of draccus in record + add __get_path_fields__() to RecordConfig (#1155)

* Left/Right sides + other fixes

* Arm fixes and add config

* More hacks

* Add control scripts

* Fix merge errors

* push changes to calibration, teleop and docs

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Move readme to docs

* update readme

Signed-off-by: Martino Russi <77496684+nepyope@users.noreply.github.com>

* Add files via upload

Signed-off-by: Martino Russi <77496684+nepyope@users.noreply.github.com>

* Update image sources

* Symlink doc

* Compress image

* Move image

* Update docs link

* fix docs

* simplify teleop scripts

* fix variable names

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Address code review

* add EMA to glove

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* integrate teleoperation for hand

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* update docs

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* import hopejr/homunculus in teleoperate

* update docs for teleoperate, record, replay, train and inference

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* chore(hopejr): address comments

* chore(hopejr): address coments 2

* chore(docs): update teleoperation instructions for the hand/glove

* fix(hopejr): calibration int + update docs

---------

Signed-off-by: Martino Russi <77496684+nepyope@users.noreply.github.com>
Signed-off-by: Simon Alibert <75076266+aliberts@users.noreply.github.com>
Co-authored-by: Pepijn <138571049+pkooij@users.noreply.github.com>
Co-authored-by: Steven Palma <imstevenpmwork@ieee.org>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: nepyope <nopyeps@gmail.com>
Co-authored-by: Martino Russi <77496684+nepyope@users.noreply.github.com>
Co-authored-by: Steven Palma <steven.palma@huggingface.co>
This commit is contained in:
Simon Alibert
2025-07-08 15:47:11 +02:00
committed by GitHub
parent a5e0aae13a
commit 039de254ea
26 changed files with 1922 additions and 17 deletions

View File

@@ -22,6 +22,29 @@
</div>
<h2 align="center">
<p><a href="https://huggingface.co/docs/lerobot/hope_jr">
Build Your Own HopeJR Robot!</a></p>
</h2>
<div align="center">
<img
src="media/hope_jr/hopejr.png?raw=true"
alt="HopeJR robot"
title="HopeJR robot"
style="width: 60%;"
/>
<p><strong>Meet HopeJR A humanoid robot arm and hand for dexterous manipulation!</strong></p>
<p>Control it with exoskeletons and gloves for precise hand movements.</p>
<p>Perfect for advanced manipulation tasks! 🤖</p>
<p><a href="https://huggingface.co/docs/lerobot/hope_jr">
See the full HopeJR tutorial here.</a></p>
</div>
<br/>
<h2 align="center">
<p><a href="https://huggingface.co/docs/lerobot/so101">
Build Your Own SO-101 Robot!</a></p>

View File

@@ -23,6 +23,8 @@
title: Finetune SmolVLA
title: "Policies"
- sections:
- local: hope_jr
title: Hope Jr
- local: so101
title: SO-101
- local: so100

1
docs/source/hope_jr.mdx Symbolic link
View File

@@ -0,0 +1 @@
../../src/lerobot/robots/hope_jr/hope_jr.mdx

BIN
media/hope_jr/hopejr.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 72 KiB

View File

@@ -86,6 +86,7 @@ dora = [
dynamixel = ["dynamixel-sdk>=3.7.31"]
feetech = ["feetech-servo-sdk>=1.0.0"]
gamepad = ["pygame>=2.5.1", "hidapi>=0.14.0"]
hopejr = ["feetech-servo-sdk>=1.0.0", "pygame>=2.5.1"]
kinematics = ["placo>=0.9.6"]
intelrealsense = [
"pyrealsense2>=2.55.1.6486 ; sys_platform != 'darwin'",

View File

@@ -36,6 +36,7 @@ from lerobot.cameras.realsense.configuration_realsense import RealSenseCameraCon
from lerobot.robots import ( # noqa: F401
Robot,
RobotConfig,
hope_jr,
koch_follower,
lekiwi,
make_robot_from_config,
@@ -45,6 +46,7 @@ from lerobot.robots import ( # noqa: F401
from lerobot.teleoperators import ( # noqa: F401
Teleoperator,
TeleoperatorConfig,
homunculus,
koch_leader,
make_teleoperator_from_config,
so100_leader,

View File

@@ -0,0 +1,401 @@
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import math
import os
from dataclasses import dataclass
os.environ["PYGAME_HIDE_SUPPORT_PROMPT"] = "1"
from lerobot.motors import MotorCalibration, MotorsBus
BAR_LEN, BAR_THICKNESS = 450, 8
HANDLE_R = 10
BRACKET_W, BRACKET_H = 6, 14
TRI_W, TRI_H = 12, 14
BTN_W, BTN_H = 60, 22
SAVE_W, SAVE_H = 80, 28
LOAD_W = 80
DD_W, DD_H = 160, 28
TOP_GAP = 50
PADDING_Y, TOP_OFFSET = 70, 60
FONT_SIZE, FPS = 20, 60
BG_COLOR = (30, 30, 30)
BAR_RED, BAR_GREEN = (200, 60, 60), (60, 200, 60)
HANDLE_COLOR, TEXT_COLOR = (240, 240, 240), (250, 250, 250)
TICK_COLOR = (250, 220, 40)
BTN_COLOR, BTN_COLOR_HL = (80, 80, 80), (110, 110, 110)
DD_COLOR, DD_COLOR_HL = (70, 70, 70), (100, 100, 100)
def dist(a, b):
return math.hypot(a[0] - b[0], a[1] - b[1])
@dataclass
class RangeValues:
min_v: int
pos_v: int
max_v: int
class RangeSlider:
"""One motor = one slider row"""
def __init__(self, motor, idx, res, calibration, present, label_pad, base_y):
import pygame
self.motor = motor
self.res = res
self.x0 = 40 + label_pad
self.x1 = self.x0 + BAR_LEN
self.y = base_y + idx * PADDING_Y
self.min_v = calibration.range_min
self.max_v = calibration.range_max
self.pos_v = max(self.min_v, min(present, self.max_v))
self.min_x = self._pos_from_val(self.min_v)
self.max_x = self._pos_from_val(self.max_v)
self.pos_x = self._pos_from_val(self.pos_v)
self.min_btn = pygame.Rect(self.x0 - BTN_W - 6, self.y - BTN_H // 2, BTN_W, BTN_H)
self.max_btn = pygame.Rect(self.x1 + 6, self.y - BTN_H // 2, BTN_W, BTN_H)
self.drag_min = self.drag_max = self.drag_pos = False
self.tick_val = present
self.font = pygame.font.Font(None, FONT_SIZE)
def _val_from_pos(self, x):
return round((x - self.x0) / BAR_LEN * self.res)
def _pos_from_val(self, v):
return self.x0 + (v / self.res) * BAR_LEN
def set_tick(self, v):
self.tick_val = max(0, min(v, self.res))
def _triangle_hit(self, pos):
import pygame
tri_top = self.y - BAR_THICKNESS // 2 - 2
return pygame.Rect(self.pos_x - TRI_W // 2, tri_top - TRI_H, TRI_W, TRI_H).collidepoint(pos)
def handle_event(self, e):
import pygame
if e.type == pygame.MOUSEBUTTONDOWN and e.button == 1:
if self.min_btn.collidepoint(e.pos):
self.min_x, self.min_v = self.pos_x, self.pos_v
return
if self.max_btn.collidepoint(e.pos):
self.max_x, self.max_v = self.pos_x, self.pos_v
return
if dist(e.pos, (self.min_x, self.y)) <= HANDLE_R:
self.drag_min = True
elif dist(e.pos, (self.max_x, self.y)) <= HANDLE_R:
self.drag_max = True
elif self._triangle_hit(e.pos):
self.drag_pos = True
elif e.type == pygame.MOUSEBUTTONUP and e.button == 1:
self.drag_min = self.drag_max = self.drag_pos = False
elif e.type == pygame.MOUSEMOTION:
x = e.pos[0]
if self.drag_min:
self.min_x = max(self.x0, min(x, self.pos_x))
elif self.drag_max:
self.max_x = min(self.x1, max(x, self.pos_x))
elif self.drag_pos:
self.pos_x = max(self.min_x, min(x, self.max_x))
self.min_v = self._val_from_pos(self.min_x)
self.max_v = self._val_from_pos(self.max_x)
self.pos_v = self._val_from_pos(self.pos_x)
def _draw_button(self, surf, rect, text):
import pygame
clr = BTN_COLOR_HL if rect.collidepoint(pygame.mouse.get_pos()) else BTN_COLOR
pygame.draw.rect(surf, clr, rect, border_radius=4)
t = self.font.render(text, True, TEXT_COLOR)
surf.blit(t, (rect.centerx - t.get_width() // 2, rect.centery - t.get_height() // 2))
def draw(self, surf):
import pygame
# motor name above set-min button (right-aligned)
name_surf = self.font.render(self.motor, True, TEXT_COLOR)
surf.blit(
name_surf,
(self.min_btn.right - name_surf.get_width(), self.min_btn.y - name_surf.get_height() - 4),
)
# bar + active section
pygame.draw.rect(surf, BAR_RED, (self.x0, self.y - BAR_THICKNESS // 2, BAR_LEN, BAR_THICKNESS))
pygame.draw.rect(
surf, BAR_GREEN, (self.min_x, self.y - BAR_THICKNESS // 2, self.max_x - self.min_x, BAR_THICKNESS)
)
# tick
tick_x = self._pos_from_val(self.tick_val)
pygame.draw.line(
surf,
TICK_COLOR,
(tick_x, self.y - BAR_THICKNESS // 2 - 4),
(tick_x, self.y + BAR_THICKNESS // 2 + 4),
2,
)
# brackets
for x, sign in ((self.min_x, +1), (self.max_x, -1)):
pygame.draw.line(
surf, HANDLE_COLOR, (x, self.y - BRACKET_H // 2), (x, self.y + BRACKET_H // 2), 2
)
pygame.draw.line(
surf,
HANDLE_COLOR,
(x, self.y - BRACKET_H // 2),
(x + sign * BRACKET_W, self.y - BRACKET_H // 2),
2,
)
pygame.draw.line(
surf,
HANDLE_COLOR,
(x, self.y + BRACKET_H // 2),
(x + sign * BRACKET_W, self.y + BRACKET_H // 2),
2,
)
# triangle ▼
tri_top = self.y - BAR_THICKNESS // 2 - 2
pygame.draw.polygon(
surf,
HANDLE_COLOR,
[
(self.pos_x, tri_top),
(self.pos_x - TRI_W // 2, tri_top - TRI_H),
(self.pos_x + TRI_W // 2, tri_top - TRI_H),
],
)
# numeric labels
fh = self.font.get_height()
pos_y = tri_top - TRI_H - 4 - fh
txts = [
(self.min_v, self.min_x, self.y - BRACKET_H // 2 - 4 - fh),
(self.max_v, self.max_x, self.y - BRACKET_H // 2 - 4 - fh),
(self.pos_v, self.pos_x, pos_y),
]
for v, x, y in txts:
s = self.font.render(str(v), True, TEXT_COLOR)
surf.blit(s, (x - s.get_width() // 2, y))
# buttons
self._draw_button(surf, self.min_btn, "set min")
self._draw_button(surf, self.max_btn, "set max")
# external
def values(self) -> RangeValues:
return RangeValues(self.min_v, self.pos_v, self.max_v)
class RangeFinderGUI:
def __init__(self, bus: MotorsBus, groups: dict[str, list[str]] | None = None):
import pygame
self.bus = bus
self.groups = groups if groups is not None else {"all": list(bus.motors)}
self.group_names = list(groups)
self.current_group = self.group_names[0]
if not bus.is_connected:
bus.connect()
self.calibration = bus.read_calibration()
self.res_table = bus.model_resolution_table
self.present_cache = {
m: bus.read("Present_Position", m, normalize=False) for motors in groups.values() for m in motors
}
pygame.init()
self.font = pygame.font.Font(None, FONT_SIZE)
label_pad = max(self.font.size(m)[0] for ms in groups.values() for m in ms)
self.label_pad = label_pad
width = 40 + label_pad + BAR_LEN + 6 + BTN_W + 10 + SAVE_W + 10
self.controls_bottom = 10 + SAVE_H
self.base_y = self.controls_bottom + TOP_GAP
height = self.base_y + PADDING_Y * len(groups[self.current_group]) + 40
self.screen = pygame.display.set_mode((width, height))
pygame.display.set_caption("Motors range finder")
# ui rects
self.save_btn = pygame.Rect(width - SAVE_W - 10, 10, SAVE_W, SAVE_H)
self.load_btn = pygame.Rect(self.save_btn.left - LOAD_W - 10, 10, LOAD_W, SAVE_H)
self.dd_btn = pygame.Rect(width // 2 - DD_W // 2, 10, DD_W, DD_H)
self.dd_open = False # dropdown expanded?
self.clock = pygame.time.Clock()
self._build_sliders()
self._adjust_height()
def _adjust_height(self):
import pygame
motors = self.groups[self.current_group]
new_h = self.base_y + PADDING_Y * len(motors) + 40
if new_h != self.screen.get_height():
w = self.screen.get_width()
self.screen = pygame.display.set_mode((w, new_h))
def _build_sliders(self):
self.sliders: list[RangeSlider] = []
motors = self.groups[self.current_group]
for i, m in enumerate(motors):
self.sliders.append(
RangeSlider(
motor=m,
idx=i,
res=self.res_table[self.bus.motors[m].model] - 1,
calibration=self.calibration[m],
present=self.present_cache[m],
label_pad=self.label_pad,
base_y=self.base_y,
)
)
def _draw_dropdown(self):
import pygame
# collapsed box
hover = self.dd_btn.collidepoint(pygame.mouse.get_pos())
pygame.draw.rect(self.screen, DD_COLOR_HL if hover else DD_COLOR, self.dd_btn, border_radius=6)
txt = self.font.render(self.current_group, True, TEXT_COLOR)
self.screen.blit(
txt, (self.dd_btn.centerx - txt.get_width() // 2, self.dd_btn.centery - txt.get_height() // 2)
)
tri_w, tri_h = 12, 6
cx = self.dd_btn.right - 14
cy = self.dd_btn.centery + 1
pygame.draw.polygon(
self.screen,
TEXT_COLOR,
[(cx - tri_w // 2, cy - tri_h // 2), (cx + tri_w // 2, cy - tri_h // 2), (cx, cy + tri_h // 2)],
)
if not self.dd_open:
return
# expanded list
for i, name in enumerate(self.group_names):
item_rect = pygame.Rect(self.dd_btn.left, self.dd_btn.bottom + i * DD_H, DD_W, DD_H)
clr = DD_COLOR_HL if item_rect.collidepoint(pygame.mouse.get_pos()) else DD_COLOR
pygame.draw.rect(self.screen, clr, item_rect)
t = self.font.render(name, True, TEXT_COLOR)
self.screen.blit(
t, (item_rect.centerx - t.get_width() // 2, item_rect.centery - t.get_height() // 2)
)
def _handle_dropdown_event(self, e):
import pygame
if e.type == pygame.MOUSEBUTTONDOWN and e.button == 1:
if self.dd_btn.collidepoint(e.pos):
self.dd_open = not self.dd_open
return True
if self.dd_open:
for i, name in enumerate(self.group_names):
item_rect = pygame.Rect(self.dd_btn.left, self.dd_btn.bottom + i * DD_H, DD_W, DD_H)
if item_rect.collidepoint(e.pos):
if name != self.current_group:
self.current_group = name
self._build_sliders()
self._adjust_height()
self.dd_open = False
return True
self.dd_open = False
return False
def _save_current(self):
for s in self.sliders:
self.calibration[s.motor].range_min = s.min_v
self.calibration[s.motor].range_max = s.max_v
with self.bus.torque_disabled():
self.bus.write_calibration(self.calibration)
def _load_current(self):
self.calibration = self.bus.read_calibration()
for s in self.sliders:
s.min_v = self.calibration[s.motor].range_min
s.max_v = self.calibration[s.motor].range_max
s.min_x = s._pos_from_val(s.min_v)
s.max_x = s._pos_from_val(s.max_v)
def run(self) -> dict[str, MotorCalibration]:
import pygame
while True:
for e in pygame.event.get():
if e.type == pygame.QUIT:
pygame.quit()
return self.calibration
if self._handle_dropdown_event(e):
continue
if e.type == pygame.MOUSEBUTTONDOWN and e.button == 1:
if self.save_btn.collidepoint(e.pos):
self._save_current()
elif self.load_btn.collidepoint(e.pos):
self._load_current()
for s in self.sliders:
s.handle_event(e)
# live goal write while dragging
for s in self.sliders:
if s.drag_pos:
self.bus.write("Goal_Position", s.motor, s.pos_v, normalize=False)
# tick update
for s in self.sliders:
pos = self.bus.read("Present_Position", s.motor, normalize=False)
s.set_tick(pos)
self.present_cache[s.motor] = pos
# ─ drawing
self.screen.fill(BG_COLOR)
for s in self.sliders:
s.draw(self.screen)
self._draw_dropdown()
# load / save buttons
for rect, text in ((self.load_btn, "LOAD"), (self.save_btn, "SAVE")):
clr = BTN_COLOR_HL if rect.collidepoint(pygame.mouse.get_pos()) else BTN_COLOR
pygame.draw.rect(self.screen, clr, rect, border_radius=6)
t = self.font.render(text, True, TEXT_COLOR)
self.screen.blit(t, (rect.centerx - t.get_width() // 2, rect.centery - t.get_height() // 2))
pygame.display.flip()
self.clock.tick(FPS)

View File

@@ -162,11 +162,11 @@ class DynamixelMotorsBus(MotorsBus):
raise RuntimeError(f"Motor '{motor}' (model '{model}') was not found. Make sure it is connected.")
def configure_motors(self) -> None:
def configure_motors(self, return_delay_time=0) -> None:
# By default, Dynamixel motors have a 500µs delay response time (corresponding to a value of 250 on
# the 'Return_Delay_Time' address). We ensure this is reduced to the minimum of 2µs (value of 0).
for motor in self.motors:
self.write("Return_Delay_Time", motor, 0)
self.write("Return_Delay_Time", motor, return_delay_time)
@property
def is_calibrated(self) -> bool:
@@ -190,13 +190,14 @@ class DynamixelMotorsBus(MotorsBus):
return calibration
def write_calibration(self, calibration_dict: dict[str, MotorCalibration]) -> None:
def write_calibration(self, calibration_dict: dict[str, MotorCalibration], cache: bool = True) -> None:
for motor, calibration in calibration_dict.items():
self.write("Homing_Offset", motor, calibration.homing_offset)
self.write("Min_Position_Limit", motor, calibration.range_min)
self.write("Max_Position_Limit", motor, calibration.range_max)
self.calibration = calibration_dict
if cache:
self.calibration = calibration_dict
def disable_torque(self, motors: str | list[str] | None = None, num_retry: int = 0) -> None:
for motor in self._get_motors_list(motors):

View File

@@ -219,15 +219,15 @@ class FeetechMotorsBus(MotorsBus):
raise RuntimeError(f"Motor '{motor}' (model '{model}') was not found. Make sure it is connected.")
def configure_motors(self) -> None:
def configure_motors(self, return_delay_time=0, maximum_acceleration=254, acceleration=254) -> None:
for motor in self.motors:
# By default, Feetech motors have a 500µs delay response time (corresponding to a value of 250 on
# the 'Return_Delay_Time' address). We ensure this is reduced to the minimum of 2µs (value of 0).
self.write("Return_Delay_Time", motor, 0)
self.write("Return_Delay_Time", motor, return_delay_time)
# Set 'Maximum_Acceleration' to 254 to speedup acceleration and deceleration of the motors.
# Note: this address is not in the official STS3215 Memory Table
self.write("Maximum_Acceleration", motor, 254)
self.write("Acceleration", motor, 254)
if self.protocol_version == 0:
self.write("Maximum_Acceleration", motor, maximum_acceleration)
self.write("Acceleration", motor, acceleration)
@property
def is_calibrated(self) -> bool:
@@ -270,14 +270,15 @@ class FeetechMotorsBus(MotorsBus):
return calibration
def write_calibration(self, calibration_dict: dict[str, MotorCalibration]) -> None:
def write_calibration(self, calibration_dict: dict[str, MotorCalibration], cache: bool = True) -> None:
for motor, calibration in calibration_dict.items():
if self.protocol_version == 0:
self.write("Homing_Offset", motor, calibration.homing_offset)
self.write("Min_Position_Limit", motor, calibration.range_min)
self.write("Max_Position_Limit", motor, calibration.range_max)
self.calibration = calibration_dict
if cache:
self.calibration = calibration_dict
def _get_half_turn_homings(self, positions: dict[NameOrID, Value]) -> dict[NameOrID, Value]:
"""

View File

@@ -189,7 +189,7 @@ MODEL_RESOLUTION = {
"scs_series": 1024,
"sts3215": 4096,
"sts3250": 4096,
"sm8512bl": 65536,
"sm8512bl": 4096,
"scs0009": 1024,
}

View File

@@ -586,7 +586,7 @@ class MotorsBus(abc.ABC):
pass
@contextmanager
def torque_disabled(self):
def torque_disabled(self, motors: int | str | list[str] | None = None):
"""Context-manager that guarantees torque is re-enabled.
This helper is useful to temporarily disable torque when configuring motors.
@@ -596,11 +596,11 @@ class MotorsBus(abc.ABC):
... # Safe operations here
... pass
"""
self.disable_torque()
self.disable_torque(motors)
try:
yield
finally:
self.enable_torque()
self.enable_torque(motors)
def set_timeout(self, timeout_ms: int | None = None):
"""Change the packet timeout used by the SDK.
@@ -653,12 +653,13 @@ class MotorsBus(abc.ABC):
pass
@abc.abstractmethod
def write_calibration(self, calibration_dict: dict[str, MotorCalibration]) -> None:
"""Write calibration parameters to the motors and cache them.
def write_calibration(self, calibration_dict: dict[str, MotorCalibration], cache: bool = True) -> None:
"""Write calibration parameters to the motors and optionally cache them.
Args:
calibration_dict (dict[str, MotorCalibration]): Calibration obtained from
:pymeth:`read_calibration` or crafted by the user.
cache (bool, optional): Save the calibration to :pyattr:`calibration`. Defaults to True.
"""
pass

View File

@@ -57,6 +57,7 @@ from lerobot.policies.pretrained import PreTrainedPolicy
from lerobot.robots import ( # noqa: F401
Robot,
RobotConfig,
hope_jr,
koch_follower,
make_robot_from_config,
so100_follower,
@@ -65,6 +66,7 @@ from lerobot.robots import ( # noqa: F401
from lerobot.teleoperators import ( # noqa: F401
Teleoperator,
TeleoperatorConfig,
homunculus,
koch_leader,
make_teleoperator_from_config,
so100_leader,

View File

@@ -39,6 +39,7 @@ from lerobot.datasets.lerobot_dataset import LeRobotDataset
from lerobot.robots import ( # noqa: F401
Robot,
RobotConfig,
hope_jr,
koch_follower,
make_robot_from_config,
so100_follower,

View File

@@ -0,0 +1,3 @@
from .config_hope_jr import HopeJrArmConfig, HopeJrHandConfig
from .hope_jr_arm import HopeJrArm
from .hope_jr_hand import HopeJrHand

View File

@@ -0,0 +1,51 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from dataclasses import dataclass, field
from lerobot.cameras import CameraConfig
from ..config import RobotConfig
@RobotConfig.register_subclass("hope_jr_hand")
@dataclass
class HopeJrHandConfig(RobotConfig):
port: str # Port to connect to the hand
side: str # "left" / "right"
disable_torque_on_disconnect: bool = True
cameras: dict[str, CameraConfig] = field(default_factory=dict)
def __post_init__(self):
super().__post_init__()
if self.side not in ["right", "left"]:
raise ValueError(self.side)
@RobotConfig.register_subclass("hope_jr_arm")
@dataclass
class HopeJrArmConfig(RobotConfig):
port: str # Port to connect to the hand
disable_torque_on_disconnect: bool = True
# `max_relative_target` limits the magnitude of the relative positional target vector for safety purposes.
# Set this to a positive scalar to have the same value for all motors, or a list that is the same length as
# the number of motors in your follower arms.
max_relative_target: int | None = None
cameras: dict[str, CameraConfig] = field(default_factory=dict)

View File

@@ -0,0 +1,268 @@
# HopeJR
## Prerequisites
- [Hardware Setup](https://github.com/TheRobotStudio/HOPEJr)
## Install LeRobot
Follow the [installation instructions](https://github.com/huggingface/lerobot#installation) to install LeRobot.
Install LeRobot with HopeJR dependencies:
```bash
pip install -e ".[hopejr]"
```
## Device Configuration
Before starting calibration and operation, you need to identify the USB ports for each HopeJR component. Run this script to find the USB ports for the arm, hand, glove, and exoskeleton:
```bash
python -m lerobot.find_port
```
This will display the available USB ports and their associated devices. Make note of the port paths (e.g., `/dev/tty.usbmodem58760433331`, `/dev/tty.usbmodem11301`) as you'll need to specify them in the `--robot.port` and `--teleop.port` parameters when recording data, replaying episodes, or running teleoperation scripts.
## Step 1: Calibration
Before performing teleoperation, HopeJR's limbs need to be calibrated. Calibration files will be saved in `~/.cache/huggingface/lerobot/calibration`
### 1.1 Calibrate Robot Hand
```bash
python -m lerobot.calibrate \
--robot.type=hope_jr_hand \
--robot.port=/dev/tty.usbmodem58760432281 \
--robot.id=blue \
--robot.side=right
```
When running the calibration script, a calibration GUI will pop up. Finger joints are named as follows:
**Thumb**:
- **CMC**: base joint connecting thumb to hand
- **MCP**: knuckle joint
- **PIP**: first finger joint
- **DIP** : fingertip joint
**Index, Middle, Ring, and Pinky fingers**:
- **Radial flexor**: Moves base of finger towards the thumb
- **Ulnar flexor**: Moves base of finger towards the pinky
- **PIP/DIP**: Flexes the distal and proximal phalanx of the finger
Each one of these will need to be calibrated individually via the GUI.
Note that ulnar and radial flexors should have ranges of the same size (but with different offsets) in order to get symmetric movement.
<p align="center">
<img src="https://huggingface.co/datasets/huggingface/documentation-images/resolve/main/lerobot/calibration_gui_1.png"
alt="Setting boundaries in the hand calibration GUI"
title="Setting boundaries in the hand calibration GUI"
width="100%">
</img>
</p>
Use the calibration interface to set the range boundaries for each joint as shown above.
<p align="center">
<img src="https://huggingface.co/datasets/huggingface/documentation-images/resolve/main/lerobot/calibration_gui_2.png"
alt="Saving calibration values"
title="Saving calibration values"
width="100%">
</img>
</p>
Once you have set the appropriate boundaries for all joints, click "Save" to save the calibration values to the motors.
### 1.2 Calibrate Teleoperator Glove
```bash
python -m lerobot.calibrate \
--teleop.type=homunculus_glove \
--teleop.port=/dev/tty.usbmodem11201 \
--teleop.id=red \
--teleop.side=right
```
Move each finger through its full range of motion, starting from the thumb.
```
Move thumb through its entire range of motion.
Recording positions. Press ENTER to stop...
-------------------------------------------
NAME | MIN | POS | MAX
thumb_cmc | 1790 | 1831 | 1853
thumb_mcp | 1497 | 1514 | 1528
thumb_pip | 1466 | 1496 | 1515
thumb_dip | 1463 | 1484 | 1514
```
Continue with each finger:
```
Move middle through its entire range of motion.
Recording positions. Press ENTER to stop...
-------------------------------------------
NAME | MIN | POS | MAX
middle_mcp_abduction | 1598 | 1718 | 1820
middle_mcp_flexion | 1512 | 1658 | 2136
middle_dip | 1484 | 1500 | 1547
```
Once calibration is complete, the system will save the calibration to `/Users/your_username/.cache/huggingface/lerobot/calibration/teleoperators/homunculus_glove/red.json`
### 1.3 Calibrate Robot Arm
```bash
python -m lerobot.calibrate \
--robot.type=hope_jr_arm \
--robot.port=/dev/tty.usbserial-1110 \
--robot.id=white
```
This will open a calibration GUI where you can set the range limits for each motor. The arm motions are organized as follows:
- **Shoulder**: pitch, yaw, and roll
- **Elbow**: flex
- **Wrist**: pitch, yaw, and roll
<p align="center">
<img src="https://huggingface.co/datasets/huggingface/documentation-images/resolve/main/lerobot/calibration_gui_2.png"
alt="Setting boundaries in the arm calibration GUI"
title="Setting boundaries in the arm calibration GUI"
width="100%">
</img>
</p>
Use the calibration interface to set the range boundaries for each joint. Move each joint through its full range of motion and adjust the minimum and maximum values accordingly. Once you have set the appropriate boundaries for all joints, save the calibration.
### 1.4 Calibrate Teleoperator Exoskeleton
```bash
python -m lerobot.calibrate \
--teleop.type=homunculus_arm \
--teleop.port=/dev/tty.usbmodem11201 \
--teleop.id=black
```
The exoskeleton allows one to control the robot arm. During calibration, you'll be prompted to move all joints through their full range of motion:
```
Move all joints through their entire range of motion.
Recording positions. Press ENTER to stop...
-------------------------------------------
-------------------------------------------
NAME | MIN | POS | MAX
shoulder_pitch | 586 | 736 | 895
shoulder_yaw | 1257 | 1374 | 1390
shoulder_roll | 449 | 1034 | 2564
elbow_flex | 3023 | 3117 | 3134
wrist_roll | 3073 | 3096 | 3147
wrist_yaw | 2143 | 2171 | 2185
wrist_pitch | 1975 | 1993 | 2074
Calibration saved to /Users/your_username/.cache/huggingface/lerobot/calibration/teleoperators/homunculus_arm/black.json
```
## Step 2: Teleoperation
Due to global variable conflicts in the Feetech middleware, teleoperation for arm and hand must run in separate shell sessions:
### Hand
```bash
python -m lerobot.teleoperate \
--robot.type=hope_jr_hand \
--robot.port=/dev/tty.usbmodem58760432281 \
--robot.id=blue \
--robot.side=right \
--teleop.type=homunculus_glove \
--teleop.port=/dev/tty.usbmodem11201 \
--teleop.id=red \
--teleop.side=right \
--display_data=true \
--fps=30
```
### Arm
```bash
python -m lerobot.teleoperate \
--robot.type=hope_jr_arm \
--robot.port=/dev/tty.usbserial-1110 \
--robot.id=white \
--teleop.type=homunculus_arm \
--teleop.port=/dev/tty.usbmodem11201 \
--teleop.id=black \
--display_data=true \
--fps=30
```
## Step 3: Record, Replay, Train
Record, Replay and Train with Hope-JR is still experimental.
### Record
This step records the dataset, which can be seen as an example [here](https://huggingface.co/datasets/nepyope/hand_record_test_with_video_data/settings).
```bash
python -m lerobot.record \
--robot.type=hope_jr_hand \
--robot.port=/dev/tty.usbmodem58760432281 \
--robot.id=right \
--robot.side=right \
--robot.cameras='{"main": {"type": "opencv", "index_or_path": 0, "width": 640, "height": 480, "fps": 30}}' \
--teleop.type=homunculus_glove \
--teleop.port=/dev/tty.usbmodem1201 \
--teleop.id=right \
--teleop.side=right \
--dataset.repo_id=nepyope/hand_record_test_with_video_data \
--dataset.single_task="Hand recording test with video data" \
--dataset.num_episodes=1 \
--dataset.episode_time_s=5 \
--dataset.push_to_hub=true \
--dataset.private=true \
--display_data=true
```
### Replay
```bash
python -m lerobot.replay \
--robot.type=hope_jr_hand \
--robot.port=/dev/tty.usbmodem58760432281 \
--robot.id=right \
--robot.side=right \
--dataset.repo_id=nepyope/hand_record_test_with_camera \
--dataset.episode=0
```
### Train
```bash
python -m lerobot.scripts.train \
--dataset.repo_id=nepyope/hand_record_test_with_video_data \
--policy.type=act \
--output_dir=outputs/train/hopejr_hand \
--job_name=hopejr \
--policy.device=mps \
--wandb.enable=true \
--policy.repo_id=nepyope/hand_test_policy
```
### Evaluate
This training run can be viewed as an example [here](https://wandb.ai/tino/lerobot/runs/rp0k8zvw?nw=nwusertino).
```bash
python -m lerobot.record \
--robot.type=hope_jr_hand \
--robot.port=/dev/tty.usbmodem58760432281 \
--robot.id=right \
--robot.side=right \
--robot.cameras='{"main": {"type": "opencv", "index_or_path": 0, "width": 640, "height": 480, "fps": 30}}' \
--display_data=false \
--dataset.repo_id=nepyope/eval_hopejr \
--dataset.single_task="Evaluate hopejr hand policy" \
--dataset.num_episodes=10 \
--policy.path=outputs/train/hopejr_hand/checkpoints/last/pretrained_model
```

View File

@@ -0,0 +1,176 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import time
from functools import cached_property
from typing import Any
from lerobot.cameras.utils import make_cameras_from_configs
from lerobot.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
from lerobot.motors import Motor, MotorNormMode
from lerobot.motors.calibration_gui import RangeFinderGUI
from lerobot.motors.feetech import (
FeetechMotorsBus,
)
from ..robot import Robot
from ..utils import ensure_safe_goal_position
from .config_hope_jr import HopeJrArmConfig
logger = logging.getLogger(__name__)
class HopeJrArm(Robot):
config_class = HopeJrArmConfig
name = "hope_jr_arm"
def __init__(self, config: HopeJrArmConfig):
super().__init__(config)
self.config = config
self.bus = FeetechMotorsBus(
port=self.config.port,
motors={
"shoulder_pitch": Motor(1, "sm8512bl", MotorNormMode.RANGE_M100_100),
"shoulder_yaw": Motor(2, "sts3250", MotorNormMode.RANGE_M100_100),
"shoulder_roll": Motor(3, "sts3250", MotorNormMode.RANGE_M100_100),
"elbow_flex": Motor(4, "sts3250", MotorNormMode.RANGE_M100_100),
"wrist_roll": Motor(5, "sts3250", MotorNormMode.RANGE_M100_100),
"wrist_yaw": Motor(6, "sts3250", MotorNormMode.RANGE_M100_100),
"wrist_pitch": Motor(7, "sts3250", MotorNormMode.RANGE_M100_100),
},
calibration=self.calibration,
)
self.cameras = make_cameras_from_configs(config.cameras)
# HACK
self.shoulder_pitch = "shoulder_pitch"
self.other_motors = [m for m in self.bus.motors if m != "shoulder_pitch"]
@property
def _motors_ft(self) -> dict[str, type]:
return {f"{motor}.pos": float for motor in self.bus.motors}
@property
def _cameras_ft(self) -> dict[str, tuple]:
return {
cam: (self.config.cameras[cam].height, self.config.cameras[cam].width, 3) for cam in self.cameras
}
@cached_property
def observation_features(self) -> dict[str, type | tuple]:
return {**self._motors_ft, **self._cameras_ft}
@cached_property
def action_features(self) -> dict[str, type]:
return self._motors_ft
@property
def is_connected(self) -> bool:
return self.bus.is_connected and all(cam.is_connected for cam in self.cameras.values())
def connect(self, calibrate: bool = True) -> None:
"""
We assume that at connection time, arm is in a rest position,
and torque can be safely disabled to run calibration.
"""
if self.is_connected:
raise DeviceAlreadyConnectedError(f"{self} already connected")
self.bus.connect(handshake=False)
if not self.is_calibrated and calibrate:
self.calibrate()
# Connect the cameras
for cam in self.cameras.values():
cam.connect()
self.configure()
logger.info(f"{self} connected.")
@property
def is_calibrated(self) -> bool:
return self.bus.is_calibrated
def calibrate(self, limb_name: str = None) -> None:
groups = {
"all": list(self.bus.motors.keys()),
"shoulder": ["shoulder_pitch", "shoulder_yaw", "shoulder_roll"],
"elbow": ["elbow_flex"],
"wrist": ["wrist_roll", "wrist_yaw", "wrist_pitch"],
}
self.calibration = RangeFinderGUI(self.bus, groups).run()
self._save_calibration()
print("Calibration saved to", self.calibration_fpath)
def configure(self) -> None:
with self.bus.torque_disabled():
self.bus.configure_motors(maximum_acceleration=30, acceleration=30)
def setup_motors(self) -> None:
# TODO: add docstring
for motor in reversed(self.bus.motors):
input(f"Connect the controller board to the '{motor}' motor only and press enter.")
self.bus.setup_motor(motor)
print(f"'{motor}' motor id set to {self.bus.motors[motor].id}")
def get_observation(self) -> dict[str, Any]:
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
# Read arm position
start = time.perf_counter()
obs_dict = self.bus.sync_read("Present_Position", self.other_motors)
obs_dict[self.shoulder_pitch] = self.bus.read("Present_Position", self.shoulder_pitch)
obs_dict = {f"{motor}.pos": val for motor, val in obs_dict.items()}
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read state: {dt_ms:.1f}ms")
# Capture images from cameras
for cam_key, cam in self.cameras.items():
start = time.perf_counter()
obs_dict[cam_key] = cam.async_read()
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms")
return obs_dict
def send_action(self, action: dict[str, Any]) -> dict[str, Any]:
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
goal_pos = {key.removesuffix(".pos"): val for key, val in action.items() if key.endswith(".pos")}
# Cap goal position when too far away from present position.
# /!\ Slower fps expected due to reading from the follower.
if self.config.max_relative_target is not None:
present_pos = self.bus.sync_read("Present_Position")
goal_present_pos = {key: (g_pos, present_pos[key]) for key, g_pos in goal_pos.items()}
goal_pos = ensure_safe_goal_position(goal_present_pos, self.config.max_relative_target)
self.bus.sync_write("Goal_Position", goal_pos)
return {f"{motor}.pos": val for motor, val in goal_pos.items()}
def disconnect(self):
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
self.bus.disconnect(self.config.disable_torque_on_disconnect)
for cam in self.cameras.values():
cam.disconnect()
logger.info(f"{self} disconnected.")

View File

@@ -0,0 +1,200 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import time
from functools import cached_property
from typing import Any
from lerobot.cameras.utils import make_cameras_from_configs
from lerobot.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
from lerobot.motors import Motor, MotorNormMode
from lerobot.motors.calibration_gui import RangeFinderGUI
from lerobot.motors.feetech import (
FeetechMotorsBus,
)
from ..robot import Robot
from .config_hope_jr import HopeJrHandConfig
logger = logging.getLogger(__name__)
RIGHT_HAND_INVERSIONS = [
"thumb_mcp",
"thumb_dip",
"index_ulnar_flexor",
"middle_ulnar_flexor",
"ring_ulnar_flexor",
"ring_pip_dip",
"pinky_ulnar_flexor",
"pinky_pip_dip",
]
LEFT_HAND_INVERSIONS = [
"thumb_cmc",
"thumb_mcp",
"thumb_dip",
"index_radial_flexor",
"index_pip_dip",
"middle_radial_flexor",
"middle_pip_dip",
"ring_radial_flexor",
"ring_pip_dip",
"pinky_radial_flexor",
# "pinky_pip_dip",
]
class HopeJrHand(Robot):
config_class = HopeJrHandConfig
name = "hope_jr_hand"
def __init__(self, config: HopeJrHandConfig):
super().__init__(config)
self.config = config
self.bus = FeetechMotorsBus(
port=self.config.port,
motors={
# Thumb
"thumb_cmc": Motor(1, "scs0009", MotorNormMode.RANGE_0_100),
"thumb_mcp": Motor(2, "scs0009", MotorNormMode.RANGE_0_100),
"thumb_pip": Motor(3, "scs0009", MotorNormMode.RANGE_0_100),
"thumb_dip": Motor(4, "scs0009", MotorNormMode.RANGE_0_100),
# Index
"index_radial_flexor": Motor(5, "scs0009", MotorNormMode.RANGE_0_100),
"index_ulnar_flexor": Motor(6, "scs0009", MotorNormMode.RANGE_0_100),
"index_pip_dip": Motor(7, "scs0009", MotorNormMode.RANGE_0_100),
# Middle
"middle_radial_flexor": Motor(8, "scs0009", MotorNormMode.RANGE_0_100),
"middle_ulnar_flexor": Motor(9, "scs0009", MotorNormMode.RANGE_0_100),
"middle_pip_dip": Motor(10, "scs0009", MotorNormMode.RANGE_0_100),
# Ring
"ring_radial_flexor": Motor(11, "scs0009", MotorNormMode.RANGE_0_100),
"ring_ulnar_flexor": Motor(12, "scs0009", MotorNormMode.RANGE_0_100),
"ring_pip_dip": Motor(13, "scs0009", MotorNormMode.RANGE_0_100),
# Pinky
"pinky_radial_flexor": Motor(14, "scs0009", MotorNormMode.RANGE_0_100),
"pinky_ulnar_flexor": Motor(15, "scs0009", MotorNormMode.RANGE_0_100),
"pinky_pip_dip": Motor(16, "scs0009", MotorNormMode.RANGE_0_100),
},
calibration=self.calibration,
protocol_version=1,
)
self.cameras = make_cameras_from_configs(config.cameras)
self.inverted_motors = RIGHT_HAND_INVERSIONS if config.side == "right" else LEFT_HAND_INVERSIONS
@property
def _motors_ft(self) -> dict[str, type]:
return {f"{motor}.pos": float for motor in self.bus.motors}
@property
def _cameras_ft(self) -> dict[str, tuple]:
return {
cam: (self.config.cameras[cam].height, self.config.cameras[cam].width, 3) for cam in self.cameras
}
@cached_property
def observation_features(self) -> dict[str, type | tuple]:
return {**self._motors_ft, **self._cameras_ft}
@cached_property
def action_features(self) -> dict[str, type]:
return self._motors_ft
@property
def is_connected(self) -> bool:
return self.bus.is_connected and all(cam.is_connected for cam in self.cameras.values())
def connect(self, calibrate: bool = True) -> None:
if self.is_connected:
raise DeviceAlreadyConnectedError(f"{self} already connected")
self.bus.connect()
if not self.is_calibrated and calibrate:
self.calibrate()
# Connect the cameras
for cam in self.cameras.values():
cam.connect()
self.configure()
logger.info(f"{self} connected.")
@property
def is_calibrated(self) -> bool:
return self.bus.is_calibrated
def calibrate(self) -> None:
fingers = {}
for finger in ["thumb", "index", "middle", "ring", "pinky"]:
fingers[finger] = [motor for motor in self.bus.motors if motor.startswith(finger)]
self.calibration = RangeFinderGUI(self.bus, fingers).run()
for motor in self.inverted_motors:
self.calibration[motor].drive_mode = 1
self._save_calibration()
print("Calibration saved to", self.calibration_fpath)
def configure(self) -> None:
with self.bus.torque_disabled():
self.bus.configure_motors()
def setup_motors(self) -> None:
# TODO: add docstring
for motor in self.bus.motors:
input(f"Connect the controller board to the '{motor}' motor only and press enter.")
self.bus.setup_motor(motor)
print(f"'{motor}' motor id set to {self.bus.motors[motor].id}")
def get_observation(self) -> dict[str, Any]:
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
obs_dict = {}
# Read hand position
start = time.perf_counter()
for motor in self.bus.motors:
obs_dict[f"{motor}.pos"] = self.bus.read("Present_Position", motor)
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read state: {dt_ms:.1f}ms")
# Capture images from cameras
for cam_key, cam in self.cameras.items():
start = time.perf_counter()
obs_dict[cam_key] = cam.async_read()
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms")
return obs_dict
def send_action(self, action: dict[str, Any]) -> dict[str, Any]:
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
goal_pos = {key.removesuffix(".pos"): val for key, val in action.items() if key.endswith(".pos")}
self.bus.sync_write("Goal_Position", goal_pos)
return action
def disconnect(self):
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
self.bus.disconnect(self.config.disable_torque_on_disconnect)
for cam in self.cameras.values():
cam.disconnect()
logger.info(f"{self} disconnected.")

View File

@@ -49,6 +49,14 @@ def make_robot_from_config(config: RobotConfig) -> Robot:
from .viperx import ViperX
return ViperX(config)
elif config.type == "hope_jr_hand":
from .hope_jr import HopeJrHand
return HopeJrHand(config)
elif config.type == "hope_jr_arm":
from .hope_jr import HopeJrArm
return HopeJrArm(config)
elif config.type == "mock_robot":
from tests.mocks.mock_robot import MockRobot

View File

@@ -43,6 +43,7 @@ from lerobot.cameras.realsense.configuration_realsense import RealSenseCameraCon
from lerobot.robots import ( # noqa: F401
Robot,
RobotConfig,
hope_jr,
koch_follower,
make_robot_from_config,
so100_follower,
@@ -52,6 +53,7 @@ from lerobot.teleoperators import ( # noqa: F401
Teleoperator,
TeleoperatorConfig,
gamepad,
homunculus,
koch_leader,
make_teleoperator_from_config,
so100_leader,

View File

@@ -0,0 +1,4 @@
from .config_homunculus import HomunculusArmConfig, HomunculusGloveConfig
from .homunculus_arm import HomunculusArm
from .homunculus_glove import HomunculusGlove
from .joints_translation import homunculus_glove_to_hope_jr_hand

View File

@@ -0,0 +1,38 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from dataclasses import dataclass
from ..config import TeleoperatorConfig
@TeleoperatorConfig.register_subclass("homunculus_glove")
@dataclass
class HomunculusGloveConfig(TeleoperatorConfig):
port: str # Port to connect to the glove
side: str # "left" / "right"
baud_rate: int = 115_200
def __post_init__(self):
if self.side not in ["right", "left"]:
raise ValueError(self.side)
@TeleoperatorConfig.register_subclass("homunculus_arm")
@dataclass
class HomunculusArmConfig(TeleoperatorConfig):
port: str # Port to connect to the arm
baud_rate: int = 115_200

View File

@@ -0,0 +1,310 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import threading
from collections import deque
from pprint import pformat
from typing import Deque, Dict, Optional
import serial
from lerobot.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
from lerobot.motors.motors_bus import MotorCalibration, MotorNormMode
from lerobot.utils.utils import enter_pressed, move_cursor_up
from ..teleoperator import Teleoperator
from .config_homunculus import HomunculusArmConfig
logger = logging.getLogger(__name__)
class HomunculusArm(Teleoperator):
"""
Homunculus Arm designed by Hugging Face.
"""
config_class = HomunculusArmConfig
name = "homunculus_arm"
def __init__(self, config: HomunculusArmConfig):
super().__init__(config)
self.config = config
self.serial = serial.Serial(config.port, config.baud_rate, timeout=1)
self.serial_lock = threading.Lock()
self.joints = {
"shoulder_pitch": MotorNormMode.RANGE_M100_100,
"shoulder_yaw": MotorNormMode.RANGE_M100_100,
"shoulder_roll": MotorNormMode.RANGE_M100_100,
"elbow_flex": MotorNormMode.RANGE_M100_100,
"wrist_roll": MotorNormMode.RANGE_M100_100,
"wrist_yaw": MotorNormMode.RANGE_M100_100,
"wrist_pitch": MotorNormMode.RANGE_M100_100,
}
n = 50
# EMA parameters ---------------------------------------------------
self.n: int = n
self.alpha: float = 2 / (n + 1)
# one deque *per joint* so we can inspect raw history if needed
self._buffers: Dict[str, Deque[int]] = {
joint: deque(maxlen=n)
for joint in (
"shoulder_pitch",
"shoulder_yaw",
"shoulder_roll",
"elbow_flex",
"wrist_roll",
"wrist_yaw",
"wrist_pitch",
)
}
# running EMA value per joint lazily initialised on first read
self._ema: Dict[str, Optional[float]] = dict.fromkeys(self._buffers)
self._state: dict[str, float] | None = None
self.new_state_event = threading.Event()
self.stop_event = threading.Event()
self.thread = threading.Thread(target=self._read_loop, daemon=True, name=f"{self} _read_loop")
self.state_lock = threading.Lock()
@property
def action_features(self) -> dict:
return {f"{joint}.pos": float for joint in self.joints}
@property
def feedback_features(self) -> dict:
return {}
@property
def is_connected(self) -> bool:
with self.serial_lock:
return self.serial.is_open and self.thread.is_alive()
def connect(self, calibrate: bool = True) -> None:
if self.is_connected:
raise DeviceAlreadyConnectedError(f"{self} already connected")
if not self.serial.is_open:
self.serial.open()
self.thread.start()
# wait for the thread to ramp up & 1st state to be ready
if not self.new_state_event.wait(timeout=2):
raise TimeoutError(f"{self}: Timed out waiting for state after 2s.")
if not self.is_calibrated and calibrate:
self.calibrate()
logger.info(f"{self} connected.")
@property
def is_calibrated(self) -> bool:
return self.calibration_fpath.is_file()
def calibrate(self) -> None:
print(
"\nMove all joints through their entire range of motion."
"\nRecording positions. Press ENTER to stop..."
)
range_mins, range_maxes = self._record_ranges_of_motion()
self.calibration = {}
for id_, joint in enumerate(self.joints):
self.calibration[joint] = MotorCalibration(
id=id_,
drive_mode=0,
homing_offset=0,
range_min=range_mins[joint],
range_max=range_maxes[joint],
)
self._save_calibration()
print("Calibration saved to", self.calibration_fpath)
# TODO(Steven): This function is copy/paste from the `HomunculusGlove` class. Consider moving it to an utility to reduce duplicated code.
def _record_ranges_of_motion(
self, joints: list[str] | None = None, display_values: bool = True
) -> tuple[dict[str, int], dict[str, int]]:
"""Interactively record the min/max encoder values of each joint.
Move the joints while the method streams live positions. Press :kbd:`Enter` to finish.
Args:
joints (list[str] | None, optional): Joints to record. Defaults to every joint (`None`).
display_values (bool, optional): When `True` (default) a live table is printed to the console.
Raises:
TypeError: `joints` is not `None` or a list.
ValueError: any joint's recorded min and max are the same.
Returns:
tuple[dict[str, int], dict[str, int]]: Two dictionaries *mins* and *maxes* with the extreme values
observed for each joint.
"""
if joints is None:
joints = list(self.joints)
elif not isinstance(joints, list):
raise TypeError(joints)
display_len = max(len(key) for key in joints)
start_positions = self._read(joints, normalize=False)
mins = start_positions.copy()
maxes = start_positions.copy()
user_pressed_enter = False
while not user_pressed_enter:
positions = self._read(joints, normalize=False)
mins = {joint: int(min(positions[joint], min_)) for joint, min_ in mins.items()}
maxes = {joint: int(max(positions[joint], max_)) for joint, max_ in maxes.items()}
if display_values:
print("\n-------------------------------------------")
print(f"{'NAME':<{display_len}} | {'MIN':>6} | {'POS':>6} | {'MAX':>6}")
for joint in joints:
print(
f"{joint:<{display_len}} | {mins[joint]:>6} | {positions[joint]:>6} | {maxes[joint]:>6}"
)
if enter_pressed():
user_pressed_enter = True
if display_values and not user_pressed_enter:
# Move cursor up to overwrite the previous output
move_cursor_up(len(joints) + 3)
same_min_max = [joint for joint in joints if mins[joint] == maxes[joint]]
if same_min_max:
raise ValueError(f"Some joints have the same min and max values:\n{pformat(same_min_max)}")
return mins, maxes
def configure(self) -> None:
pass
# TODO(Steven): This function is copy/paste from the `HomunculusGlove` class. Consider moving it to an utility to reduce duplicated code.
def _normalize(self, values: dict[str, int]) -> dict[str, float]:
if not self.calibration:
raise RuntimeError(f"{self} has no calibration registered.")
normalized_values = {}
for joint, val in values.items():
min_ = self.calibration[joint].range_min
max_ = self.calibration[joint].range_max
drive_mode = self.calibration[joint].drive_mode
bounded_val = min(max_, max(min_, val))
if self.joints[joint] is MotorNormMode.RANGE_M100_100:
norm = (((bounded_val - min_) / (max_ - min_)) * 200) - 100
normalized_values[joint] = -norm if drive_mode else norm
elif self.joints[joint] is MotorNormMode.RANGE_0_100:
norm = ((bounded_val - min_) / (max_ - min_)) * 100
normalized_values[joint] = 100 - norm if drive_mode else norm
return normalized_values
def _apply_ema(self, raw: Dict[str, int]) -> Dict[str, float]:
"""Update buffers & running EMA values; return smoothed dict."""
smoothed: Dict[str, float] = {}
for joint, value in raw.items():
# maintain raw history
self._buffers[joint].append(value)
# initialise on first run
if self._ema[joint] is None:
self._ema[joint] = float(value)
else:
self._ema[joint] = self.alpha * value + (1 - self.alpha) * self._ema[joint]
smoothed[joint] = self._ema[joint]
return smoothed
def _read(
self, joints: list[str] | None = None, normalize: bool = True, timeout: float = 1
) -> dict[str, int | float]:
"""
Return the most recent (single) values from self.last_d,
optionally applying calibration.
"""
if not self.new_state_event.wait(timeout=timeout):
raise TimeoutError(f"{self}: Timed out waiting for state after {timeout}s.")
with self.state_lock:
state = self._state
self.new_state_event.clear()
if state is None:
raise RuntimeError(f"{self} Internal error: Event set but no state available.")
if joints is not None:
state = {k: v for k, v in state.items() if k in joints}
if normalize:
state = self._normalize(state)
state = self._apply_ema(state)
return state
def _read_loop(self):
"""
Continuously read from the serial buffer in its own thread and sends values to the main thread through
a queue.
"""
while not self.stop_event.is_set():
try:
raw_values = None
with self.serial_lock:
if self.serial.in_waiting > 0:
self.serial.flush()
raw_values = self.serial.readline().decode("utf-8").strip().split(" ")
if raw_values is None or len(raw_values) != 21: # 16 raw + 5 angle values
continue
joint_angles = {
"shoulder_pitch": int(raw_values[19]),
"shoulder_yaw": int(raw_values[18]),
"shoulder_roll": int(raw_values[20]),
"elbow_flex": int(raw_values[17]),
"wrist_roll": int(raw_values[16]),
"wrist_yaw": int(raw_values[1]),
"wrist_pitch": int(raw_values[0]),
}
with self.state_lock:
self._state = joint_angles
self.new_state_event.set()
except Exception as e:
logger.debug(f"Error reading frame in background thread for {self}: {e}")
def get_action(self) -> dict[str, float]:
joint_positions = self._read()
return {f"{joint}.pos": pos for joint, pos in joint_positions.items()}
def send_feedback(self, feedback: dict[str, float]) -> None:
raise NotImplementedError
def disconnect(self) -> None:
if not self.is_connected:
DeviceNotConnectedError(f"{self} is not connected.")
self.stop_event.set()
self.thread.join(timeout=1)
self.serial.close()
logger.info(f"{self} disconnected.")

View File

@@ -0,0 +1,338 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import threading
from collections import deque
from pprint import pformat
from typing import Deque, Dict, Optional
import serial
from lerobot.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
from lerobot.motors import MotorCalibration
from lerobot.motors.motors_bus import MotorNormMode
from lerobot.teleoperators.homunculus.joints_translation import homunculus_glove_to_hope_jr_hand
from lerobot.utils.utils import enter_pressed, move_cursor_up
from ..teleoperator import Teleoperator
from .config_homunculus import HomunculusGloveConfig
logger = logging.getLogger(__name__)
LEFT_HAND_INVERSIONS = [
"thumb_cmc",
"index_dip",
"middle_mcp_abduction",
"middle_dip",
"pinky_mcp_abduction",
"pinky_dip",
]
RIGHT_HAND_INVERSIONS = [
"thumb_mcp",
"thumb_cmc",
"thumb_pip",
"thumb_dip",
"index_mcp_abduction",
# "index_dip",
"middle_mcp_abduction",
# "middle_dip",
"ring_mcp_abduction",
"ring_mcp_flexion",
# "ring_dip",
"pinky_mcp_abduction",
]
class HomunculusGlove(Teleoperator):
"""
Homunculus Glove designed by NepYope & Hugging Face.
"""
config_class = HomunculusGloveConfig
name = "homunculus_glove"
def __init__(self, config: HomunculusGloveConfig):
super().__init__(config)
self.config = config
self.serial = serial.Serial(config.port, config.baud_rate, timeout=1)
self.serial_lock = threading.Lock()
self.joints = {
"thumb_cmc": MotorNormMode.RANGE_0_100,
"thumb_mcp": MotorNormMode.RANGE_0_100,
"thumb_pip": MotorNormMode.RANGE_0_100,
"thumb_dip": MotorNormMode.RANGE_0_100,
"index_mcp_abduction": MotorNormMode.RANGE_M100_100,
"index_mcp_flexion": MotorNormMode.RANGE_0_100,
"index_dip": MotorNormMode.RANGE_0_100,
"middle_mcp_abduction": MotorNormMode.RANGE_M100_100,
"middle_mcp_flexion": MotorNormMode.RANGE_0_100,
"middle_dip": MotorNormMode.RANGE_0_100,
"ring_mcp_abduction": MotorNormMode.RANGE_M100_100,
"ring_mcp_flexion": MotorNormMode.RANGE_0_100,
"ring_dip": MotorNormMode.RANGE_0_100,
"pinky_mcp_abduction": MotorNormMode.RANGE_M100_100,
"pinky_mcp_flexion": MotorNormMode.RANGE_0_100,
"pinky_dip": MotorNormMode.RANGE_0_100,
}
self.inverted_joints = RIGHT_HAND_INVERSIONS if config.side == "right" else LEFT_HAND_INVERSIONS
n = 10
# EMA parameters ---------------------------------------------------
self.n: int = n
self.alpha: float = 2 / (n + 1)
# one deque *per joint* so we can inspect raw history if needed
self._buffers: Dict[str, Deque[int]] = {joint: deque(maxlen=n) for joint in self.joints}
# running EMA value per joint lazily initialised on first read
self._ema: Dict[str, Optional[float]] = dict.fromkeys(self._buffers)
self._state: dict[str, float] | None = None
self.new_state_event = threading.Event()
self.stop_event = threading.Event()
self.thread = threading.Thread(target=self._read_loop, daemon=True, name=f"{self} _read_loop")
self.state_lock = threading.Lock()
@property
def action_features(self) -> dict:
return {f"{joint}.pos": float for joint in self.joints}
@property
def feedback_features(self) -> dict:
return {}
@property
def is_connected(self) -> bool:
with self.serial_lock:
return self.serial.is_open and self.thread.is_alive()
def connect(self, calibrate: bool = True) -> None:
if self.is_connected:
raise DeviceAlreadyConnectedError(f"{self} already connected")
if not self.serial.is_open:
self.serial.open()
self.thread.start()
# wait for the thread to ramp up & 1st state to be ready
if not self.new_state_event.wait(timeout=2):
raise TimeoutError(f"{self}: Timed out waiting for state after 2s.")
if not self.is_calibrated and calibrate:
self.calibrate()
logger.info(f"{self} connected.")
@property
def is_calibrated(self) -> bool:
return self.calibration_fpath.is_file()
def calibrate(self) -> None:
range_mins, range_maxes = {}, {}
for finger in ["thumb", "index", "middle", "ring", "pinky"]:
print(
f"\nMove {finger} through its entire range of motion."
"\nRecording positions. Press ENTER to stop..."
)
finger_joints = [joint for joint in self.joints if joint.startswith(finger)]
finger_mins, finger_maxes = self._record_ranges_of_motion(finger_joints)
range_mins.update(finger_mins)
range_maxes.update(finger_maxes)
self.calibration = {}
for id_, joint in enumerate(self.joints):
self.calibration[joint] = MotorCalibration(
id=id_,
drive_mode=1 if joint in self.inverted_joints else 0,
homing_offset=0,
range_min=range_mins[joint],
range_max=range_maxes[joint],
)
self._save_calibration()
print("Calibration saved to", self.calibration_fpath)
# TODO(Steven): This function is copy/paste from the `HomunculusArm` class. Consider moving it to an utility to reduce duplicated code.
def _record_ranges_of_motion(
self, joints: list[str] | None = None, display_values: bool = True
) -> tuple[dict[str, int], dict[str, int]]:
"""Interactively record the min/max encoder values of each joint.
Move the joints while the method streams live positions. Press :kbd:`Enter` to finish.
Args:
joints (list[str] | None, optional): Joints to record. Defaults to every joint (`None`).
display_values (bool, optional): When `True` (default) a live table is printed to the console.
Raises:
TypeError: `joints` is not `None` or a list.
ValueError: any joint's recorded min and max are the same.
Returns:
tuple[dict[str, int], dict[str, int]]: Two dictionaries *mins* and *maxes* with the extreme values
observed for each joint.
"""
if joints is None:
joints = list(self.joints)
elif not isinstance(joints, list):
raise TypeError(joints)
display_len = max(len(key) for key in joints)
start_positions = self._read(joints, normalize=False)
mins = start_positions.copy()
maxes = start_positions.copy()
user_pressed_enter = False
while not user_pressed_enter:
positions = self._read(joints, normalize=False)
mins = {joint: int(min(positions[joint], min_)) for joint, min_ in mins.items()}
maxes = {joint: int(max(positions[joint], max_)) for joint, max_ in maxes.items()}
if display_values:
print("\n-------------------------------------------")
print(f"{'NAME':<{display_len}} | {'MIN':>6} | {'POS':>6} | {'MAX':>6}")
for joint in joints:
print(
f"{joint:<{display_len}} | {mins[joint]:>6} | {positions[joint]:>6} | {maxes[joint]:>6}"
)
if enter_pressed():
user_pressed_enter = True
if display_values and not user_pressed_enter:
# Move cursor up to overwrite the previous output
move_cursor_up(len(joints) + 3)
same_min_max = [joint for joint in joints if mins[joint] == maxes[joint]]
if same_min_max:
raise ValueError(f"Some joints have the same min and max values:\n{pformat(same_min_max)}")
return mins, maxes
def configure(self) -> None:
pass
# TODO(Steven): This function is copy/paste from the `HomunculusArm` class. Consider moving it to an utility to reduce duplicated code.
def _normalize(self, values: dict[str, int]) -> dict[str, float]:
if not self.calibration:
raise RuntimeError(f"{self} has no calibration registered.")
normalized_values = {}
for joint, val in values.items():
min_ = self.calibration[joint].range_min
max_ = self.calibration[joint].range_max
drive_mode = self.calibration[joint].drive_mode
bounded_val = min(max_, max(min_, val))
if self.joints[joint] is MotorNormMode.RANGE_M100_100:
norm = (((bounded_val - min_) / (max_ - min_)) * 200) - 100
normalized_values[joint] = -norm if drive_mode else norm
elif self.joints[joint] is MotorNormMode.RANGE_0_100:
norm = ((bounded_val - min_) / (max_ - min_)) * 100
normalized_values[joint] = 100 - norm if drive_mode else norm
return normalized_values
def _apply_ema(self, raw: Dict[str, int]) -> Dict[str, int]:
"""Update buffers & running EMA values; return smoothed dict as integers."""
smoothed: Dict[str, int] = {}
for joint, value in raw.items():
# maintain raw history
self._buffers[joint].append(value)
# initialise on first run
if self._ema[joint] is None:
self._ema[joint] = float(value)
else:
self._ema[joint] = self.alpha * value + (1 - self.alpha) * self._ema[joint]
# Convert back to int for compatibility with normalization
smoothed[joint] = int(round(self._ema[joint]))
return smoothed
def _read(
self, joints: list[str] | None = None, normalize: bool = True, timeout: float = 1
) -> dict[str, int | float]:
"""
Return the most recent (single) values from self.last_d,
optionally applying calibration.
"""
if not self.new_state_event.wait(timeout=timeout):
raise TimeoutError(f"{self}: Timed out waiting for state after {timeout}s.")
with self.state_lock:
state = self._state
self.new_state_event.clear()
if state is None:
raise RuntimeError(f"{self} Internal error: Event set but no state available.")
if joints is not None:
state = {k: v for k, v in state.items() if k in joints}
# Apply EMA smoothing to raw values first
state = self._apply_ema(state)
# Then normalize if requested
if normalize:
state = self._normalize(state)
return state
def _read_loop(self):
"""
Continuously read from the serial buffer in its own thread and sends values to the main thread through
a queue.
"""
while not self.stop_event.is_set():
try:
positions = None
with self.serial_lock:
if self.serial.in_waiting > 0:
self.serial.flush()
positions = self.serial.readline().decode("utf-8").strip().split(" ")
if positions is None or len(positions) != len(self.joints):
continue
joint_positions = {joint: int(pos) for joint, pos in zip(self.joints, positions, strict=True)}
with self.state_lock:
self._state = joint_positions
self.new_state_event.set()
except Exception as e:
logger.debug(f"Error reading frame in background thread for {self}: {e}")
def get_action(self) -> dict[str, float]:
joint_positions = self._read()
return homunculus_glove_to_hope_jr_hand(
{f"{joint}.pos": pos for joint, pos in joint_positions.items()}
)
def send_feedback(self, feedback: dict[str, float]) -> None:
raise NotImplementedError
def disconnect(self) -> None:
if not self.is_connected:
DeviceNotConnectedError(f"{self} is not connected.")
self.stop_event.set()
self.thread.join(timeout=1)
self.serial.close()
logger.info(f"{self} disconnected.")

View File

@@ -0,0 +1,63 @@
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
INDEX_SPLAY = 0.3
MIDDLE_SPLAY = 0.3
RING_SPLAY = 0.3
PINKY_SPLAY = 0.5
def get_ulnar_flexion(flexion: float, abduction: float, splay: float):
return -abduction * splay + flexion * (1 - splay)
def get_radial_flexion(flexion: float, abduction: float, splay: float):
return abduction * splay + flexion * (1 - splay)
def homunculus_glove_to_hope_jr_hand(glove_action: dict[str, float]) -> dict[str, float]:
return {
"thumb_cmc.pos": glove_action["thumb_cmc.pos"],
"thumb_mcp.pos": glove_action["thumb_mcp.pos"],
"thumb_pip.pos": glove_action["thumb_pip.pos"],
"thumb_dip.pos": glove_action["thumb_dip.pos"],
"index_radial_flexor.pos": get_radial_flexion(
glove_action["index_mcp_flexion.pos"], glove_action["index_mcp_abduction.pos"], INDEX_SPLAY
),
"index_ulnar_flexor.pos": get_ulnar_flexion(
glove_action["index_mcp_flexion.pos"], glove_action["index_mcp_abduction.pos"], INDEX_SPLAY
),
"index_pip_dip.pos": glove_action["index_dip.pos"],
"middle_radial_flexor.pos": get_radial_flexion(
glove_action["middle_mcp_flexion.pos"], glove_action["middle_mcp_abduction.pos"], MIDDLE_SPLAY
),
"middle_ulnar_flexor.pos": get_ulnar_flexion(
glove_action["middle_mcp_flexion.pos"], glove_action["middle_mcp_abduction.pos"], MIDDLE_SPLAY
),
"middle_pip_dip.pos": glove_action["middle_dip.pos"],
"ring_radial_flexor.pos": get_radial_flexion(
glove_action["ring_mcp_flexion.pos"], glove_action["ring_mcp_abduction.pos"], RING_SPLAY
),
"ring_ulnar_flexor.pos": get_ulnar_flexion(
glove_action["ring_mcp_flexion.pos"], glove_action["ring_mcp_abduction.pos"], RING_SPLAY
),
"ring_pip_dip.pos": glove_action["ring_dip.pos"],
"pinky_radial_flexor.pos": get_radial_flexion(
glove_action["pinky_mcp_flexion.pos"], glove_action["pinky_mcp_abduction.pos"], PINKY_SPLAY
),
"pinky_ulnar_flexor.pos": get_ulnar_flexion(
glove_action["pinky_mcp_flexion.pos"], glove_action["pinky_mcp_abduction.pos"], PINKY_SPLAY
),
"pinky_pip_dip.pos": glove_action["pinky_dip.pos"],
}

View File

@@ -53,5 +53,13 @@ def make_teleoperator_from_config(config: TeleoperatorConfig) -> Teleoperator:
from .keyboard.teleop_keyboard import KeyboardEndEffectorTeleop
return KeyboardEndEffectorTeleop(config)
elif config.type == "homunculus_glove":
from .homunculus import HomunculusGlove
return HomunculusGlove(config)
elif config.type == "homunculus_arm":
from .homunculus import HomunculusArm
return HomunculusArm(config)
else:
raise ValueError(config.type)