diff --git a/README.md b/README.md index 153a3a21..ff7a9238 100644 --- a/README.md +++ b/README.md @@ -22,6 +22,29 @@ +

+

+ Build Your Own HopeJR Robot!

+

+ +
+ HopeJR robot + +

Meet HopeJR – A humanoid robot arm and hand for dexterous manipulation!

+

Control it with exoskeletons and gloves for precise hand movements.

+

Perfect for advanced manipulation tasks! 🤖

+ +

+ See the full HopeJR tutorial here.

+
+ +
+

Build Your Own SO-101 Robot!

diff --git a/docs/source/_toctree.yml b/docs/source/_toctree.yml index ea80e825..83777a3c 100644 --- a/docs/source/_toctree.yml +++ b/docs/source/_toctree.yml @@ -23,6 +23,8 @@ title: Finetune SmolVLA title: "Policies" - sections: + - local: hope_jr + title: Hope Jr - local: so101 title: SO-101 - local: so100 diff --git a/docs/source/hope_jr.mdx b/docs/source/hope_jr.mdx new file mode 120000 index 00000000..40242263 --- /dev/null +++ b/docs/source/hope_jr.mdx @@ -0,0 +1 @@ +../../src/lerobot/robots/hope_jr/hope_jr.mdx \ No newline at end of file diff --git a/media/hope_jr/hopejr.png b/media/hope_jr/hopejr.png new file mode 100644 index 00000000..4186547a Binary files /dev/null and b/media/hope_jr/hopejr.png differ diff --git a/pyproject.toml b/pyproject.toml index 9fc84d90..408e3b77 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -86,6 +86,7 @@ dora = [ dynamixel = ["dynamixel-sdk>=3.7.31"] feetech = ["feetech-servo-sdk>=1.0.0"] gamepad = ["pygame>=2.5.1", "hidapi>=0.14.0"] +hopejr = ["feetech-servo-sdk>=1.0.0", "pygame>=2.5.1"] kinematics = ["placo>=0.9.6"] intelrealsense = [ "pyrealsense2>=2.55.1.6486 ; sys_platform != 'darwin'", diff --git a/src/lerobot/calibrate.py b/src/lerobot/calibrate.py index 37a9d5bd..1e8bf475 100644 --- a/src/lerobot/calibrate.py +++ b/src/lerobot/calibrate.py @@ -36,6 +36,7 @@ from lerobot.cameras.realsense.configuration_realsense import RealSenseCameraCon from lerobot.robots import ( # noqa: F401 Robot, RobotConfig, + hope_jr, koch_follower, lekiwi, make_robot_from_config, @@ -45,6 +46,7 @@ from lerobot.robots import ( # noqa: F401 from lerobot.teleoperators import ( # noqa: F401 Teleoperator, TeleoperatorConfig, + homunculus, koch_leader, make_teleoperator_from_config, so100_leader, diff --git a/src/lerobot/motors/calibration_gui.py b/src/lerobot/motors/calibration_gui.py new file mode 100644 index 00000000..9832a163 --- /dev/null +++ b/src/lerobot/motors/calibration_gui.py @@ -0,0 +1,401 @@ +# Copyright 2025 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import math +import os +from dataclasses import dataclass + +os.environ["PYGAME_HIDE_SUPPORT_PROMPT"] = "1" + +from lerobot.motors import MotorCalibration, MotorsBus + +BAR_LEN, BAR_THICKNESS = 450, 8 +HANDLE_R = 10 +BRACKET_W, BRACKET_H = 6, 14 +TRI_W, TRI_H = 12, 14 + +BTN_W, BTN_H = 60, 22 +SAVE_W, SAVE_H = 80, 28 +LOAD_W = 80 +DD_W, DD_H = 160, 28 + +TOP_GAP = 50 +PADDING_Y, TOP_OFFSET = 70, 60 +FONT_SIZE, FPS = 20, 60 + +BG_COLOR = (30, 30, 30) +BAR_RED, BAR_GREEN = (200, 60, 60), (60, 200, 60) +HANDLE_COLOR, TEXT_COLOR = (240, 240, 240), (250, 250, 250) +TICK_COLOR = (250, 220, 40) +BTN_COLOR, BTN_COLOR_HL = (80, 80, 80), (110, 110, 110) +DD_COLOR, DD_COLOR_HL = (70, 70, 70), (100, 100, 100) + + +def dist(a, b): + return math.hypot(a[0] - b[0], a[1] - b[1]) + + +@dataclass +class RangeValues: + min_v: int + pos_v: int + max_v: int + + +class RangeSlider: + """One motor = one slider row""" + + def __init__(self, motor, idx, res, calibration, present, label_pad, base_y): + import pygame + + self.motor = motor + self.res = res + self.x0 = 40 + label_pad + self.x1 = self.x0 + BAR_LEN + self.y = base_y + idx * PADDING_Y + + self.min_v = calibration.range_min + self.max_v = calibration.range_max + self.pos_v = max(self.min_v, min(present, self.max_v)) + + self.min_x = self._pos_from_val(self.min_v) + self.max_x = self._pos_from_val(self.max_v) + self.pos_x = self._pos_from_val(self.pos_v) + + self.min_btn = pygame.Rect(self.x0 - BTN_W - 6, self.y - BTN_H // 2, BTN_W, BTN_H) + self.max_btn = pygame.Rect(self.x1 + 6, self.y - BTN_H // 2, BTN_W, BTN_H) + + self.drag_min = self.drag_max = self.drag_pos = False + self.tick_val = present + self.font = pygame.font.Font(None, FONT_SIZE) + + def _val_from_pos(self, x): + return round((x - self.x0) / BAR_LEN * self.res) + + def _pos_from_val(self, v): + return self.x0 + (v / self.res) * BAR_LEN + + def set_tick(self, v): + self.tick_val = max(0, min(v, self.res)) + + def _triangle_hit(self, pos): + import pygame + + tri_top = self.y - BAR_THICKNESS // 2 - 2 + return pygame.Rect(self.pos_x - TRI_W // 2, tri_top - TRI_H, TRI_W, TRI_H).collidepoint(pos) + + def handle_event(self, e): + import pygame + + if e.type == pygame.MOUSEBUTTONDOWN and e.button == 1: + if self.min_btn.collidepoint(e.pos): + self.min_x, self.min_v = self.pos_x, self.pos_v + return + if self.max_btn.collidepoint(e.pos): + self.max_x, self.max_v = self.pos_x, self.pos_v + return + if dist(e.pos, (self.min_x, self.y)) <= HANDLE_R: + self.drag_min = True + elif dist(e.pos, (self.max_x, self.y)) <= HANDLE_R: + self.drag_max = True + elif self._triangle_hit(e.pos): + self.drag_pos = True + + elif e.type == pygame.MOUSEBUTTONUP and e.button == 1: + self.drag_min = self.drag_max = self.drag_pos = False + + elif e.type == pygame.MOUSEMOTION: + x = e.pos[0] + if self.drag_min: + self.min_x = max(self.x0, min(x, self.pos_x)) + elif self.drag_max: + self.max_x = min(self.x1, max(x, self.pos_x)) + elif self.drag_pos: + self.pos_x = max(self.min_x, min(x, self.max_x)) + + self.min_v = self._val_from_pos(self.min_x) + self.max_v = self._val_from_pos(self.max_x) + self.pos_v = self._val_from_pos(self.pos_x) + + def _draw_button(self, surf, rect, text): + import pygame + + clr = BTN_COLOR_HL if rect.collidepoint(pygame.mouse.get_pos()) else BTN_COLOR + pygame.draw.rect(surf, clr, rect, border_radius=4) + t = self.font.render(text, True, TEXT_COLOR) + surf.blit(t, (rect.centerx - t.get_width() // 2, rect.centery - t.get_height() // 2)) + + def draw(self, surf): + import pygame + + # motor name above set-min button (right-aligned) + name_surf = self.font.render(self.motor, True, TEXT_COLOR) + surf.blit( + name_surf, + (self.min_btn.right - name_surf.get_width(), self.min_btn.y - name_surf.get_height() - 4), + ) + + # bar + active section + pygame.draw.rect(surf, BAR_RED, (self.x0, self.y - BAR_THICKNESS // 2, BAR_LEN, BAR_THICKNESS)) + pygame.draw.rect( + surf, BAR_GREEN, (self.min_x, self.y - BAR_THICKNESS // 2, self.max_x - self.min_x, BAR_THICKNESS) + ) + + # tick + tick_x = self._pos_from_val(self.tick_val) + pygame.draw.line( + surf, + TICK_COLOR, + (tick_x, self.y - BAR_THICKNESS // 2 - 4), + (tick_x, self.y + BAR_THICKNESS // 2 + 4), + 2, + ) + + # brackets + for x, sign in ((self.min_x, +1), (self.max_x, -1)): + pygame.draw.line( + surf, HANDLE_COLOR, (x, self.y - BRACKET_H // 2), (x, self.y + BRACKET_H // 2), 2 + ) + pygame.draw.line( + surf, + HANDLE_COLOR, + (x, self.y - BRACKET_H // 2), + (x + sign * BRACKET_W, self.y - BRACKET_H // 2), + 2, + ) + pygame.draw.line( + surf, + HANDLE_COLOR, + (x, self.y + BRACKET_H // 2), + (x + sign * BRACKET_W, self.y + BRACKET_H // 2), + 2, + ) + + # triangle ▼ + tri_top = self.y - BAR_THICKNESS // 2 - 2 + pygame.draw.polygon( + surf, + HANDLE_COLOR, + [ + (self.pos_x, tri_top), + (self.pos_x - TRI_W // 2, tri_top - TRI_H), + (self.pos_x + TRI_W // 2, tri_top - TRI_H), + ], + ) + + # numeric labels + fh = self.font.get_height() + pos_y = tri_top - TRI_H - 4 - fh + txts = [ + (self.min_v, self.min_x, self.y - BRACKET_H // 2 - 4 - fh), + (self.max_v, self.max_x, self.y - BRACKET_H // 2 - 4 - fh), + (self.pos_v, self.pos_x, pos_y), + ] + for v, x, y in txts: + s = self.font.render(str(v), True, TEXT_COLOR) + surf.blit(s, (x - s.get_width() // 2, y)) + + # buttons + self._draw_button(surf, self.min_btn, "set min") + self._draw_button(surf, self.max_btn, "set max") + + # external + def values(self) -> RangeValues: + return RangeValues(self.min_v, self.pos_v, self.max_v) + + +class RangeFinderGUI: + def __init__(self, bus: MotorsBus, groups: dict[str, list[str]] | None = None): + import pygame + + self.bus = bus + self.groups = groups if groups is not None else {"all": list(bus.motors)} + self.group_names = list(groups) + self.current_group = self.group_names[0] + + if not bus.is_connected: + bus.connect() + + self.calibration = bus.read_calibration() + self.res_table = bus.model_resolution_table + self.present_cache = { + m: bus.read("Present_Position", m, normalize=False) for motors in groups.values() for m in motors + } + + pygame.init() + self.font = pygame.font.Font(None, FONT_SIZE) + + label_pad = max(self.font.size(m)[0] for ms in groups.values() for m in ms) + self.label_pad = label_pad + width = 40 + label_pad + BAR_LEN + 6 + BTN_W + 10 + SAVE_W + 10 + self.controls_bottom = 10 + SAVE_H + self.base_y = self.controls_bottom + TOP_GAP + height = self.base_y + PADDING_Y * len(groups[self.current_group]) + 40 + + self.screen = pygame.display.set_mode((width, height)) + pygame.display.set_caption("Motors range finder") + + # ui rects + self.save_btn = pygame.Rect(width - SAVE_W - 10, 10, SAVE_W, SAVE_H) + self.load_btn = pygame.Rect(self.save_btn.left - LOAD_W - 10, 10, LOAD_W, SAVE_H) + self.dd_btn = pygame.Rect(width // 2 - DD_W // 2, 10, DD_W, DD_H) + self.dd_open = False # dropdown expanded? + + self.clock = pygame.time.Clock() + self._build_sliders() + self._adjust_height() + + def _adjust_height(self): + import pygame + + motors = self.groups[self.current_group] + new_h = self.base_y + PADDING_Y * len(motors) + 40 + if new_h != self.screen.get_height(): + w = self.screen.get_width() + self.screen = pygame.display.set_mode((w, new_h)) + + def _build_sliders(self): + self.sliders: list[RangeSlider] = [] + motors = self.groups[self.current_group] + for i, m in enumerate(motors): + self.sliders.append( + RangeSlider( + motor=m, + idx=i, + res=self.res_table[self.bus.motors[m].model] - 1, + calibration=self.calibration[m], + present=self.present_cache[m], + label_pad=self.label_pad, + base_y=self.base_y, + ) + ) + + def _draw_dropdown(self): + import pygame + + # collapsed box + hover = self.dd_btn.collidepoint(pygame.mouse.get_pos()) + pygame.draw.rect(self.screen, DD_COLOR_HL if hover else DD_COLOR, self.dd_btn, border_radius=6) + + txt = self.font.render(self.current_group, True, TEXT_COLOR) + self.screen.blit( + txt, (self.dd_btn.centerx - txt.get_width() // 2, self.dd_btn.centery - txt.get_height() // 2) + ) + + tri_w, tri_h = 12, 6 + cx = self.dd_btn.right - 14 + cy = self.dd_btn.centery + 1 + pygame.draw.polygon( + self.screen, + TEXT_COLOR, + [(cx - tri_w // 2, cy - tri_h // 2), (cx + tri_w // 2, cy - tri_h // 2), (cx, cy + tri_h // 2)], + ) + + if not self.dd_open: + return + + # expanded list + for i, name in enumerate(self.group_names): + item_rect = pygame.Rect(self.dd_btn.left, self.dd_btn.bottom + i * DD_H, DD_W, DD_H) + clr = DD_COLOR_HL if item_rect.collidepoint(pygame.mouse.get_pos()) else DD_COLOR + pygame.draw.rect(self.screen, clr, item_rect) + t = self.font.render(name, True, TEXT_COLOR) + self.screen.blit( + t, (item_rect.centerx - t.get_width() // 2, item_rect.centery - t.get_height() // 2) + ) + + def _handle_dropdown_event(self, e): + import pygame + + if e.type == pygame.MOUSEBUTTONDOWN and e.button == 1: + if self.dd_btn.collidepoint(e.pos): + self.dd_open = not self.dd_open + return True + if self.dd_open: + for i, name in enumerate(self.group_names): + item_rect = pygame.Rect(self.dd_btn.left, self.dd_btn.bottom + i * DD_H, DD_W, DD_H) + if item_rect.collidepoint(e.pos): + if name != self.current_group: + self.current_group = name + self._build_sliders() + self._adjust_height() + self.dd_open = False + return True + self.dd_open = False + return False + + def _save_current(self): + for s in self.sliders: + self.calibration[s.motor].range_min = s.min_v + self.calibration[s.motor].range_max = s.max_v + + with self.bus.torque_disabled(): + self.bus.write_calibration(self.calibration) + + def _load_current(self): + self.calibration = self.bus.read_calibration() + for s in self.sliders: + s.min_v = self.calibration[s.motor].range_min + s.max_v = self.calibration[s.motor].range_max + s.min_x = s._pos_from_val(s.min_v) + s.max_x = s._pos_from_val(s.max_v) + + def run(self) -> dict[str, MotorCalibration]: + import pygame + + while True: + for e in pygame.event.get(): + if e.type == pygame.QUIT: + pygame.quit() + return self.calibration + + if self._handle_dropdown_event(e): + continue + + if e.type == pygame.MOUSEBUTTONDOWN and e.button == 1: + if self.save_btn.collidepoint(e.pos): + self._save_current() + elif self.load_btn.collidepoint(e.pos): + self._load_current() + + for s in self.sliders: + s.handle_event(e) + + # live goal write while dragging + for s in self.sliders: + if s.drag_pos: + self.bus.write("Goal_Position", s.motor, s.pos_v, normalize=False) + + # tick update + for s in self.sliders: + pos = self.bus.read("Present_Position", s.motor, normalize=False) + s.set_tick(pos) + self.present_cache[s.motor] = pos + + # ─ drawing + self.screen.fill(BG_COLOR) + for s in self.sliders: + s.draw(self.screen) + + self._draw_dropdown() + + # load / save buttons + for rect, text in ((self.load_btn, "LOAD"), (self.save_btn, "SAVE")): + clr = BTN_COLOR_HL if rect.collidepoint(pygame.mouse.get_pos()) else BTN_COLOR + pygame.draw.rect(self.screen, clr, rect, border_radius=6) + t = self.font.render(text, True, TEXT_COLOR) + self.screen.blit(t, (rect.centerx - t.get_width() // 2, rect.centery - t.get_height() // 2)) + + pygame.display.flip() + self.clock.tick(FPS) diff --git a/src/lerobot/motors/dynamixel/dynamixel.py b/src/lerobot/motors/dynamixel/dynamixel.py index d4f41643..1113ec0f 100644 --- a/src/lerobot/motors/dynamixel/dynamixel.py +++ b/src/lerobot/motors/dynamixel/dynamixel.py @@ -162,11 +162,11 @@ class DynamixelMotorsBus(MotorsBus): raise RuntimeError(f"Motor '{motor}' (model '{model}') was not found. Make sure it is connected.") - def configure_motors(self) -> None: + def configure_motors(self, return_delay_time=0) -> None: # By default, Dynamixel motors have a 500µs delay response time (corresponding to a value of 250 on # the 'Return_Delay_Time' address). We ensure this is reduced to the minimum of 2µs (value of 0). for motor in self.motors: - self.write("Return_Delay_Time", motor, 0) + self.write("Return_Delay_Time", motor, return_delay_time) @property def is_calibrated(self) -> bool: @@ -190,13 +190,14 @@ class DynamixelMotorsBus(MotorsBus): return calibration - def write_calibration(self, calibration_dict: dict[str, MotorCalibration]) -> None: + def write_calibration(self, calibration_dict: dict[str, MotorCalibration], cache: bool = True) -> None: for motor, calibration in calibration_dict.items(): self.write("Homing_Offset", motor, calibration.homing_offset) self.write("Min_Position_Limit", motor, calibration.range_min) self.write("Max_Position_Limit", motor, calibration.range_max) - self.calibration = calibration_dict + if cache: + self.calibration = calibration_dict def disable_torque(self, motors: str | list[str] | None = None, num_retry: int = 0) -> None: for motor in self._get_motors_list(motors): diff --git a/src/lerobot/motors/feetech/feetech.py b/src/lerobot/motors/feetech/feetech.py index 7edf869a..88d45ba3 100644 --- a/src/lerobot/motors/feetech/feetech.py +++ b/src/lerobot/motors/feetech/feetech.py @@ -219,15 +219,15 @@ class FeetechMotorsBus(MotorsBus): raise RuntimeError(f"Motor '{motor}' (model '{model}') was not found. Make sure it is connected.") - def configure_motors(self) -> None: + def configure_motors(self, return_delay_time=0, maximum_acceleration=254, acceleration=254) -> None: for motor in self.motors: # By default, Feetech motors have a 500µs delay response time (corresponding to a value of 250 on # the 'Return_Delay_Time' address). We ensure this is reduced to the minimum of 2µs (value of 0). - self.write("Return_Delay_Time", motor, 0) + self.write("Return_Delay_Time", motor, return_delay_time) # Set 'Maximum_Acceleration' to 254 to speedup acceleration and deceleration of the motors. - # Note: this address is not in the official STS3215 Memory Table - self.write("Maximum_Acceleration", motor, 254) - self.write("Acceleration", motor, 254) + if self.protocol_version == 0: + self.write("Maximum_Acceleration", motor, maximum_acceleration) + self.write("Acceleration", motor, acceleration) @property def is_calibrated(self) -> bool: @@ -270,14 +270,15 @@ class FeetechMotorsBus(MotorsBus): return calibration - def write_calibration(self, calibration_dict: dict[str, MotorCalibration]) -> None: + def write_calibration(self, calibration_dict: dict[str, MotorCalibration], cache: bool = True) -> None: for motor, calibration in calibration_dict.items(): if self.protocol_version == 0: self.write("Homing_Offset", motor, calibration.homing_offset) self.write("Min_Position_Limit", motor, calibration.range_min) self.write("Max_Position_Limit", motor, calibration.range_max) - self.calibration = calibration_dict + if cache: + self.calibration = calibration_dict def _get_half_turn_homings(self, positions: dict[NameOrID, Value]) -> dict[NameOrID, Value]: """ diff --git a/src/lerobot/motors/feetech/tables.py b/src/lerobot/motors/feetech/tables.py index 0a2f2659..48814957 100644 --- a/src/lerobot/motors/feetech/tables.py +++ b/src/lerobot/motors/feetech/tables.py @@ -189,7 +189,7 @@ MODEL_RESOLUTION = { "scs_series": 1024, "sts3215": 4096, "sts3250": 4096, - "sm8512bl": 65536, + "sm8512bl": 4096, "scs0009": 1024, } diff --git a/src/lerobot/motors/motors_bus.py b/src/lerobot/motors/motors_bus.py index 7386bfb1..26522c7c 100644 --- a/src/lerobot/motors/motors_bus.py +++ b/src/lerobot/motors/motors_bus.py @@ -586,7 +586,7 @@ class MotorsBus(abc.ABC): pass @contextmanager - def torque_disabled(self): + def torque_disabled(self, motors: int | str | list[str] | None = None): """Context-manager that guarantees torque is re-enabled. This helper is useful to temporarily disable torque when configuring motors. @@ -596,11 +596,11 @@ class MotorsBus(abc.ABC): ... # Safe operations here ... pass """ - self.disable_torque() + self.disable_torque(motors) try: yield finally: - self.enable_torque() + self.enable_torque(motors) def set_timeout(self, timeout_ms: int | None = None): """Change the packet timeout used by the SDK. @@ -653,12 +653,13 @@ class MotorsBus(abc.ABC): pass @abc.abstractmethod - def write_calibration(self, calibration_dict: dict[str, MotorCalibration]) -> None: - """Write calibration parameters to the motors and cache them. + def write_calibration(self, calibration_dict: dict[str, MotorCalibration], cache: bool = True) -> None: + """Write calibration parameters to the motors and optionally cache them. Args: calibration_dict (dict[str, MotorCalibration]): Calibration obtained from :pymeth:`read_calibration` or crafted by the user. + cache (bool, optional): Save the calibration to :pyattr:`calibration`. Defaults to True. """ pass diff --git a/src/lerobot/record.py b/src/lerobot/record.py index 635bdf1e..9fc0dc7e 100644 --- a/src/lerobot/record.py +++ b/src/lerobot/record.py @@ -57,6 +57,7 @@ from lerobot.policies.pretrained import PreTrainedPolicy from lerobot.robots import ( # noqa: F401 Robot, RobotConfig, + hope_jr, koch_follower, make_robot_from_config, so100_follower, @@ -65,6 +66,7 @@ from lerobot.robots import ( # noqa: F401 from lerobot.teleoperators import ( # noqa: F401 Teleoperator, TeleoperatorConfig, + homunculus, koch_leader, make_teleoperator_from_config, so100_leader, diff --git a/src/lerobot/replay.py b/src/lerobot/replay.py index ef20c28e..c51c55ce 100644 --- a/src/lerobot/replay.py +++ b/src/lerobot/replay.py @@ -39,6 +39,7 @@ from lerobot.datasets.lerobot_dataset import LeRobotDataset from lerobot.robots import ( # noqa: F401 Robot, RobotConfig, + hope_jr, koch_follower, make_robot_from_config, so100_follower, diff --git a/src/lerobot/robots/hope_jr/__init__.py b/src/lerobot/robots/hope_jr/__init__.py new file mode 100644 index 00000000..324e3c8e --- /dev/null +++ b/src/lerobot/robots/hope_jr/__init__.py @@ -0,0 +1,3 @@ +from .config_hope_jr import HopeJrArmConfig, HopeJrHandConfig +from .hope_jr_arm import HopeJrArm +from .hope_jr_hand import HopeJrHand diff --git a/src/lerobot/robots/hope_jr/config_hope_jr.py b/src/lerobot/robots/hope_jr/config_hope_jr.py new file mode 100644 index 00000000..747e98e0 --- /dev/null +++ b/src/lerobot/robots/hope_jr/config_hope_jr.py @@ -0,0 +1,51 @@ +#!/usr/bin/env python + +# Copyright 2025 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from dataclasses import dataclass, field + +from lerobot.cameras import CameraConfig + +from ..config import RobotConfig + + +@RobotConfig.register_subclass("hope_jr_hand") +@dataclass +class HopeJrHandConfig(RobotConfig): + port: str # Port to connect to the hand + side: str # "left" / "right" + + disable_torque_on_disconnect: bool = True + + cameras: dict[str, CameraConfig] = field(default_factory=dict) + + def __post_init__(self): + super().__post_init__() + if self.side not in ["right", "left"]: + raise ValueError(self.side) + + +@RobotConfig.register_subclass("hope_jr_arm") +@dataclass +class HopeJrArmConfig(RobotConfig): + port: str # Port to connect to the hand + disable_torque_on_disconnect: bool = True + + # `max_relative_target` limits the magnitude of the relative positional target vector for safety purposes. + # Set this to a positive scalar to have the same value for all motors, or a list that is the same length as + # the number of motors in your follower arms. + max_relative_target: int | None = None + + cameras: dict[str, CameraConfig] = field(default_factory=dict) diff --git a/src/lerobot/robots/hope_jr/hope_jr.mdx b/src/lerobot/robots/hope_jr/hope_jr.mdx new file mode 100644 index 00000000..2f9ec9d8 --- /dev/null +++ b/src/lerobot/robots/hope_jr/hope_jr.mdx @@ -0,0 +1,268 @@ +# HopeJR + +## Prerequisites + +- [Hardware Setup](https://github.com/TheRobotStudio/HOPEJr) + +## Install LeRobot + +Follow the [installation instructions](https://github.com/huggingface/lerobot#installation) to install LeRobot. + +Install LeRobot with HopeJR dependencies: +```bash +pip install -e ".[hopejr]" +``` + +## Device Configuration + +Before starting calibration and operation, you need to identify the USB ports for each HopeJR component. Run this script to find the USB ports for the arm, hand, glove, and exoskeleton: + +```bash +python -m lerobot.find_port +``` + +This will display the available USB ports and their associated devices. Make note of the port paths (e.g., `/dev/tty.usbmodem58760433331`, `/dev/tty.usbmodem11301`) as you'll need to specify them in the `--robot.port` and `--teleop.port` parameters when recording data, replaying episodes, or running teleoperation scripts. + +## Step 1: Calibration + +Before performing teleoperation, HopeJR's limbs need to be calibrated. Calibration files will be saved in `~/.cache/huggingface/lerobot/calibration` + +### 1.1 Calibrate Robot Hand + +```bash +python -m lerobot.calibrate \ + --robot.type=hope_jr_hand \ + --robot.port=/dev/tty.usbmodem58760432281 \ + --robot.id=blue \ + --robot.side=right +``` + +When running the calibration script, a calibration GUI will pop up. Finger joints are named as follows: + +**Thumb**: +- **CMC**: base joint connecting thumb to hand +- **MCP**: knuckle joint +- **PIP**: first finger joint +- **DIP** : fingertip joint + +**Index, Middle, Ring, and Pinky fingers**: +- **Radial flexor**: Moves base of finger towards the thumb +- **Ulnar flexor**: Moves base of finger towards the pinky +- **PIP/DIP**: Flexes the distal and proximal phalanx of the finger + +Each one of these will need to be calibrated individually via the GUI. + Note that ulnar and radial flexors should have ranges of the same size (but with different offsets) in order to get symmetric movement. + +

+ Setting boundaries in the hand calibration GUI + +

+ +Use the calibration interface to set the range boundaries for each joint as shown above. + +

+ Saving calibration values + +

+ +Once you have set the appropriate boundaries for all joints, click "Save" to save the calibration values to the motors. + +### 1.2 Calibrate Teleoperator Glove + +```bash +python -m lerobot.calibrate \ + --teleop.type=homunculus_glove \ + --teleop.port=/dev/tty.usbmodem11201 \ + --teleop.id=red \ + --teleop.side=right +``` + +Move each finger through its full range of motion, starting from the thumb. + +``` +Move thumb through its entire range of motion. +Recording positions. Press ENTER to stop... + +------------------------------------------- +NAME | MIN | POS | MAX +thumb_cmc | 1790 | 1831 | 1853 +thumb_mcp | 1497 | 1514 | 1528 +thumb_pip | 1466 | 1496 | 1515 +thumb_dip | 1463 | 1484 | 1514 +``` + +Continue with each finger: + +``` +Move middle through its entire range of motion. +Recording positions. Press ENTER to stop... + +------------------------------------------- +NAME | MIN | POS | MAX +middle_mcp_abduction | 1598 | 1718 | 1820 +middle_mcp_flexion | 1512 | 1658 | 2136 +middle_dip | 1484 | 1500 | 1547 +``` + +Once calibration is complete, the system will save the calibration to `/Users/your_username/.cache/huggingface/lerobot/calibration/teleoperators/homunculus_glove/red.json` + +### 1.3 Calibrate Robot Arm + +```bash +python -m lerobot.calibrate \ + --robot.type=hope_jr_arm \ + --robot.port=/dev/tty.usbserial-1110 \ + --robot.id=white +``` + +This will open a calibration GUI where you can set the range limits for each motor. The arm motions are organized as follows: +- **Shoulder**: pitch, yaw, and roll +- **Elbow**: flex +- **Wrist**: pitch, yaw, and roll + +

+ Setting boundaries in the arm calibration GUI + +

+ +Use the calibration interface to set the range boundaries for each joint. Move each joint through its full range of motion and adjust the minimum and maximum values accordingly. Once you have set the appropriate boundaries for all joints, save the calibration. + +### 1.4 Calibrate Teleoperator Exoskeleton + +```bash +python -m lerobot.calibrate \ + --teleop.type=homunculus_arm \ + --teleop.port=/dev/tty.usbmodem11201 \ + --teleop.id=black +``` + +The exoskeleton allows one to control the robot arm. During calibration, you'll be prompted to move all joints through their full range of motion: + +``` +Move all joints through their entire range of motion. +Recording positions. Press ENTER to stop... + +------------------------------------------- +------------------------------------------- +NAME | MIN | POS | MAX +shoulder_pitch | 586 | 736 | 895 +shoulder_yaw | 1257 | 1374 | 1390 +shoulder_roll | 449 | 1034 | 2564 +elbow_flex | 3023 | 3117 | 3134 +wrist_roll | 3073 | 3096 | 3147 +wrist_yaw | 2143 | 2171 | 2185 +wrist_pitch | 1975 | 1993 | 2074 +Calibration saved to /Users/your_username/.cache/huggingface/lerobot/calibration/teleoperators/homunculus_arm/black.json +``` + +## Step 2: Teleoperation + +Due to global variable conflicts in the Feetech middleware, teleoperation for arm and hand must run in separate shell sessions: + +### Hand +```bash +python -m lerobot.teleoperate \ + --robot.type=hope_jr_hand \ + --robot.port=/dev/tty.usbmodem58760432281 \ + --robot.id=blue \ + --robot.side=right \ + --teleop.type=homunculus_glove \ + --teleop.port=/dev/tty.usbmodem11201 \ + --teleop.id=red \ + --teleop.side=right \ + --display_data=true \ + --fps=30 +``` + +### Arm +```bash +python -m lerobot.teleoperate \ + --robot.type=hope_jr_arm \ + --robot.port=/dev/tty.usbserial-1110 \ + --robot.id=white \ + --teleop.type=homunculus_arm \ + --teleop.port=/dev/tty.usbmodem11201 \ + --teleop.id=black \ + --display_data=true \ + --fps=30 +``` + +## Step 3: Record, Replay, Train + +Record, Replay and Train with Hope-JR is still experimental. + +### Record + +This step records the dataset, which can be seen as an example [here](https://huggingface.co/datasets/nepyope/hand_record_test_with_video_data/settings). + +```bash +python -m lerobot.record \ + --robot.type=hope_jr_hand \ + --robot.port=/dev/tty.usbmodem58760432281 \ + --robot.id=right \ + --robot.side=right \ + --robot.cameras='{"main": {"type": "opencv", "index_or_path": 0, "width": 640, "height": 480, "fps": 30}}' \ + --teleop.type=homunculus_glove \ + --teleop.port=/dev/tty.usbmodem1201 \ + --teleop.id=right \ + --teleop.side=right \ + --dataset.repo_id=nepyope/hand_record_test_with_video_data \ + --dataset.single_task="Hand recording test with video data" \ + --dataset.num_episodes=1 \ + --dataset.episode_time_s=5 \ + --dataset.push_to_hub=true \ + --dataset.private=true \ + --display_data=true +``` + +### Replay + +```bash +python -m lerobot.replay \ + --robot.type=hope_jr_hand \ + --robot.port=/dev/tty.usbmodem58760432281 \ + --robot.id=right \ + --robot.side=right \ + --dataset.repo_id=nepyope/hand_record_test_with_camera \ + --dataset.episode=0 +``` + +### Train + +```bash +python -m lerobot.scripts.train \ + --dataset.repo_id=nepyope/hand_record_test_with_video_data \ + --policy.type=act \ + --output_dir=outputs/train/hopejr_hand \ + --job_name=hopejr \ + --policy.device=mps \ + --wandb.enable=true \ + --policy.repo_id=nepyope/hand_test_policy +``` + +### Evaluate + +This training run can be viewed as an example [here](https://wandb.ai/tino/lerobot/runs/rp0k8zvw?nw=nwusertino). + +```bash +python -m lerobot.record \ + --robot.type=hope_jr_hand \ + --robot.port=/dev/tty.usbmodem58760432281 \ + --robot.id=right \ + --robot.side=right \ + --robot.cameras='{"main": {"type": "opencv", "index_or_path": 0, "width": 640, "height": 480, "fps": 30}}' \ + --display_data=false \ + --dataset.repo_id=nepyope/eval_hopejr \ + --dataset.single_task="Evaluate hopejr hand policy" \ + --dataset.num_episodes=10 \ + --policy.path=outputs/train/hopejr_hand/checkpoints/last/pretrained_model +``` diff --git a/src/lerobot/robots/hope_jr/hope_jr_arm.py b/src/lerobot/robots/hope_jr/hope_jr_arm.py new file mode 100644 index 00000000..0e3a615a --- /dev/null +++ b/src/lerobot/robots/hope_jr/hope_jr_arm.py @@ -0,0 +1,176 @@ +#!/usr/bin/env python + +# Copyright 2025 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import time +from functools import cached_property +from typing import Any + +from lerobot.cameras.utils import make_cameras_from_configs +from lerobot.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError +from lerobot.motors import Motor, MotorNormMode +from lerobot.motors.calibration_gui import RangeFinderGUI +from lerobot.motors.feetech import ( + FeetechMotorsBus, +) + +from ..robot import Robot +from ..utils import ensure_safe_goal_position +from .config_hope_jr import HopeJrArmConfig + +logger = logging.getLogger(__name__) + + +class HopeJrArm(Robot): + config_class = HopeJrArmConfig + name = "hope_jr_arm" + + def __init__(self, config: HopeJrArmConfig): + super().__init__(config) + self.config = config + self.bus = FeetechMotorsBus( + port=self.config.port, + motors={ + "shoulder_pitch": Motor(1, "sm8512bl", MotorNormMode.RANGE_M100_100), + "shoulder_yaw": Motor(2, "sts3250", MotorNormMode.RANGE_M100_100), + "shoulder_roll": Motor(3, "sts3250", MotorNormMode.RANGE_M100_100), + "elbow_flex": Motor(4, "sts3250", MotorNormMode.RANGE_M100_100), + "wrist_roll": Motor(5, "sts3250", MotorNormMode.RANGE_M100_100), + "wrist_yaw": Motor(6, "sts3250", MotorNormMode.RANGE_M100_100), + "wrist_pitch": Motor(7, "sts3250", MotorNormMode.RANGE_M100_100), + }, + calibration=self.calibration, + ) + self.cameras = make_cameras_from_configs(config.cameras) + + # HACK + self.shoulder_pitch = "shoulder_pitch" + self.other_motors = [m for m in self.bus.motors if m != "shoulder_pitch"] + + @property + def _motors_ft(self) -> dict[str, type]: + return {f"{motor}.pos": float for motor in self.bus.motors} + + @property + def _cameras_ft(self) -> dict[str, tuple]: + return { + cam: (self.config.cameras[cam].height, self.config.cameras[cam].width, 3) for cam in self.cameras + } + + @cached_property + def observation_features(self) -> dict[str, type | tuple]: + return {**self._motors_ft, **self._cameras_ft} + + @cached_property + def action_features(self) -> dict[str, type]: + return self._motors_ft + + @property + def is_connected(self) -> bool: + return self.bus.is_connected and all(cam.is_connected for cam in self.cameras.values()) + + def connect(self, calibrate: bool = True) -> None: + """ + We assume that at connection time, arm is in a rest position, + and torque can be safely disabled to run calibration. + """ + if self.is_connected: + raise DeviceAlreadyConnectedError(f"{self} already connected") + + self.bus.connect(handshake=False) + if not self.is_calibrated and calibrate: + self.calibrate() + + # Connect the cameras + for cam in self.cameras.values(): + cam.connect() + + self.configure() + logger.info(f"{self} connected.") + + @property + def is_calibrated(self) -> bool: + return self.bus.is_calibrated + + def calibrate(self, limb_name: str = None) -> None: + groups = { + "all": list(self.bus.motors.keys()), + "shoulder": ["shoulder_pitch", "shoulder_yaw", "shoulder_roll"], + "elbow": ["elbow_flex"], + "wrist": ["wrist_roll", "wrist_yaw", "wrist_pitch"], + } + + self.calibration = RangeFinderGUI(self.bus, groups).run() + self._save_calibration() + print("Calibration saved to", self.calibration_fpath) + + def configure(self) -> None: + with self.bus.torque_disabled(): + self.bus.configure_motors(maximum_acceleration=30, acceleration=30) + + def setup_motors(self) -> None: + # TODO: add docstring + for motor in reversed(self.bus.motors): + input(f"Connect the controller board to the '{motor}' motor only and press enter.") + self.bus.setup_motor(motor) + print(f"'{motor}' motor id set to {self.bus.motors[motor].id}") + + def get_observation(self) -> dict[str, Any]: + if not self.is_connected: + raise DeviceNotConnectedError(f"{self} is not connected.") + + # Read arm position + start = time.perf_counter() + obs_dict = self.bus.sync_read("Present_Position", self.other_motors) + obs_dict[self.shoulder_pitch] = self.bus.read("Present_Position", self.shoulder_pitch) + obs_dict = {f"{motor}.pos": val for motor, val in obs_dict.items()} + dt_ms = (time.perf_counter() - start) * 1e3 + logger.debug(f"{self} read state: {dt_ms:.1f}ms") + + # Capture images from cameras + for cam_key, cam in self.cameras.items(): + start = time.perf_counter() + obs_dict[cam_key] = cam.async_read() + dt_ms = (time.perf_counter() - start) * 1e3 + logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms") + + return obs_dict + + def send_action(self, action: dict[str, Any]) -> dict[str, Any]: + if not self.is_connected: + raise DeviceNotConnectedError(f"{self} is not connected.") + + goal_pos = {key.removesuffix(".pos"): val for key, val in action.items() if key.endswith(".pos")} + + # Cap goal position when too far away from present position. + # /!\ Slower fps expected due to reading from the follower. + if self.config.max_relative_target is not None: + present_pos = self.bus.sync_read("Present_Position") + goal_present_pos = {key: (g_pos, present_pos[key]) for key, g_pos in goal_pos.items()} + goal_pos = ensure_safe_goal_position(goal_present_pos, self.config.max_relative_target) + + self.bus.sync_write("Goal_Position", goal_pos) + return {f"{motor}.pos": val for motor, val in goal_pos.items()} + + def disconnect(self): + if not self.is_connected: + raise DeviceNotConnectedError(f"{self} is not connected.") + + self.bus.disconnect(self.config.disable_torque_on_disconnect) + for cam in self.cameras.values(): + cam.disconnect() + + logger.info(f"{self} disconnected.") diff --git a/src/lerobot/robots/hope_jr/hope_jr_hand.py b/src/lerobot/robots/hope_jr/hope_jr_hand.py new file mode 100644 index 00000000..8dc100e0 --- /dev/null +++ b/src/lerobot/robots/hope_jr/hope_jr_hand.py @@ -0,0 +1,200 @@ +#!/usr/bin/env python + +# Copyright 2025 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import time +from functools import cached_property +from typing import Any + +from lerobot.cameras.utils import make_cameras_from_configs +from lerobot.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError +from lerobot.motors import Motor, MotorNormMode +from lerobot.motors.calibration_gui import RangeFinderGUI +from lerobot.motors.feetech import ( + FeetechMotorsBus, +) + +from ..robot import Robot +from .config_hope_jr import HopeJrHandConfig + +logger = logging.getLogger(__name__) + +RIGHT_HAND_INVERSIONS = [ + "thumb_mcp", + "thumb_dip", + "index_ulnar_flexor", + "middle_ulnar_flexor", + "ring_ulnar_flexor", + "ring_pip_dip", + "pinky_ulnar_flexor", + "pinky_pip_dip", +] + +LEFT_HAND_INVERSIONS = [ + "thumb_cmc", + "thumb_mcp", + "thumb_dip", + "index_radial_flexor", + "index_pip_dip", + "middle_radial_flexor", + "middle_pip_dip", + "ring_radial_flexor", + "ring_pip_dip", + "pinky_radial_flexor", + # "pinky_pip_dip", +] + + +class HopeJrHand(Robot): + config_class = HopeJrHandConfig + name = "hope_jr_hand" + + def __init__(self, config: HopeJrHandConfig): + super().__init__(config) + self.config = config + self.bus = FeetechMotorsBus( + port=self.config.port, + motors={ + # Thumb + "thumb_cmc": Motor(1, "scs0009", MotorNormMode.RANGE_0_100), + "thumb_mcp": Motor(2, "scs0009", MotorNormMode.RANGE_0_100), + "thumb_pip": Motor(3, "scs0009", MotorNormMode.RANGE_0_100), + "thumb_dip": Motor(4, "scs0009", MotorNormMode.RANGE_0_100), + # Index + "index_radial_flexor": Motor(5, "scs0009", MotorNormMode.RANGE_0_100), + "index_ulnar_flexor": Motor(6, "scs0009", MotorNormMode.RANGE_0_100), + "index_pip_dip": Motor(7, "scs0009", MotorNormMode.RANGE_0_100), + # Middle + "middle_radial_flexor": Motor(8, "scs0009", MotorNormMode.RANGE_0_100), + "middle_ulnar_flexor": Motor(9, "scs0009", MotorNormMode.RANGE_0_100), + "middle_pip_dip": Motor(10, "scs0009", MotorNormMode.RANGE_0_100), + # Ring + "ring_radial_flexor": Motor(11, "scs0009", MotorNormMode.RANGE_0_100), + "ring_ulnar_flexor": Motor(12, "scs0009", MotorNormMode.RANGE_0_100), + "ring_pip_dip": Motor(13, "scs0009", MotorNormMode.RANGE_0_100), + # Pinky + "pinky_radial_flexor": Motor(14, "scs0009", MotorNormMode.RANGE_0_100), + "pinky_ulnar_flexor": Motor(15, "scs0009", MotorNormMode.RANGE_0_100), + "pinky_pip_dip": Motor(16, "scs0009", MotorNormMode.RANGE_0_100), + }, + calibration=self.calibration, + protocol_version=1, + ) + self.cameras = make_cameras_from_configs(config.cameras) + self.inverted_motors = RIGHT_HAND_INVERSIONS if config.side == "right" else LEFT_HAND_INVERSIONS + + @property + def _motors_ft(self) -> dict[str, type]: + return {f"{motor}.pos": float for motor in self.bus.motors} + + @property + def _cameras_ft(self) -> dict[str, tuple]: + return { + cam: (self.config.cameras[cam].height, self.config.cameras[cam].width, 3) for cam in self.cameras + } + + @cached_property + def observation_features(self) -> dict[str, type | tuple]: + return {**self._motors_ft, **self._cameras_ft} + + @cached_property + def action_features(self) -> dict[str, type]: + return self._motors_ft + + @property + def is_connected(self) -> bool: + return self.bus.is_connected and all(cam.is_connected for cam in self.cameras.values()) + + def connect(self, calibrate: bool = True) -> None: + if self.is_connected: + raise DeviceAlreadyConnectedError(f"{self} already connected") + + self.bus.connect() + if not self.is_calibrated and calibrate: + self.calibrate() + + # Connect the cameras + for cam in self.cameras.values(): + cam.connect() + + self.configure() + logger.info(f"{self} connected.") + + @property + def is_calibrated(self) -> bool: + return self.bus.is_calibrated + + def calibrate(self) -> None: + fingers = {} + for finger in ["thumb", "index", "middle", "ring", "pinky"]: + fingers[finger] = [motor for motor in self.bus.motors if motor.startswith(finger)] + + self.calibration = RangeFinderGUI(self.bus, fingers).run() + for motor in self.inverted_motors: + self.calibration[motor].drive_mode = 1 + self._save_calibration() + print("Calibration saved to", self.calibration_fpath) + + def configure(self) -> None: + with self.bus.torque_disabled(): + self.bus.configure_motors() + + def setup_motors(self) -> None: + # TODO: add docstring + for motor in self.bus.motors: + input(f"Connect the controller board to the '{motor}' motor only and press enter.") + self.bus.setup_motor(motor) + print(f"'{motor}' motor id set to {self.bus.motors[motor].id}") + + def get_observation(self) -> dict[str, Any]: + if not self.is_connected: + raise DeviceNotConnectedError(f"{self} is not connected.") + + obs_dict = {} + + # Read hand position + start = time.perf_counter() + for motor in self.bus.motors: + obs_dict[f"{motor}.pos"] = self.bus.read("Present_Position", motor) + dt_ms = (time.perf_counter() - start) * 1e3 + logger.debug(f"{self} read state: {dt_ms:.1f}ms") + + # Capture images from cameras + for cam_key, cam in self.cameras.items(): + start = time.perf_counter() + obs_dict[cam_key] = cam.async_read() + dt_ms = (time.perf_counter() - start) * 1e3 + logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms") + + return obs_dict + + def send_action(self, action: dict[str, Any]) -> dict[str, Any]: + if not self.is_connected: + raise DeviceNotConnectedError(f"{self} is not connected.") + + goal_pos = {key.removesuffix(".pos"): val for key, val in action.items() if key.endswith(".pos")} + self.bus.sync_write("Goal_Position", goal_pos) + return action + + def disconnect(self): + if not self.is_connected: + raise DeviceNotConnectedError(f"{self} is not connected.") + + self.bus.disconnect(self.config.disable_torque_on_disconnect) + for cam in self.cameras.values(): + cam.disconnect() + + logger.info(f"{self} disconnected.") diff --git a/src/lerobot/robots/utils.py b/src/lerobot/robots/utils.py index 435303c6..911d4046 100644 --- a/src/lerobot/robots/utils.py +++ b/src/lerobot/robots/utils.py @@ -49,6 +49,14 @@ def make_robot_from_config(config: RobotConfig) -> Robot: from .viperx import ViperX return ViperX(config) + elif config.type == "hope_jr_hand": + from .hope_jr import HopeJrHand + + return HopeJrHand(config) + elif config.type == "hope_jr_arm": + from .hope_jr import HopeJrArm + + return HopeJrArm(config) elif config.type == "mock_robot": from tests.mocks.mock_robot import MockRobot diff --git a/src/lerobot/teleoperate.py b/src/lerobot/teleoperate.py index e2819345..168f898c 100644 --- a/src/lerobot/teleoperate.py +++ b/src/lerobot/teleoperate.py @@ -43,6 +43,7 @@ from lerobot.cameras.realsense.configuration_realsense import RealSenseCameraCon from lerobot.robots import ( # noqa: F401 Robot, RobotConfig, + hope_jr, koch_follower, make_robot_from_config, so100_follower, @@ -52,6 +53,7 @@ from lerobot.teleoperators import ( # noqa: F401 Teleoperator, TeleoperatorConfig, gamepad, + homunculus, koch_leader, make_teleoperator_from_config, so100_leader, diff --git a/src/lerobot/teleoperators/homunculus/__init__.py b/src/lerobot/teleoperators/homunculus/__init__.py new file mode 100644 index 00000000..04b5c0f2 --- /dev/null +++ b/src/lerobot/teleoperators/homunculus/__init__.py @@ -0,0 +1,4 @@ +from .config_homunculus import HomunculusArmConfig, HomunculusGloveConfig +from .homunculus_arm import HomunculusArm +from .homunculus_glove import HomunculusGlove +from .joints_translation import homunculus_glove_to_hope_jr_hand diff --git a/src/lerobot/teleoperators/homunculus/config_homunculus.py b/src/lerobot/teleoperators/homunculus/config_homunculus.py new file mode 100644 index 00000000..da465215 --- /dev/null +++ b/src/lerobot/teleoperators/homunculus/config_homunculus.py @@ -0,0 +1,38 @@ +#!/usr/bin/env python + +# Copyright 2025 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from dataclasses import dataclass + +from ..config import TeleoperatorConfig + + +@TeleoperatorConfig.register_subclass("homunculus_glove") +@dataclass +class HomunculusGloveConfig(TeleoperatorConfig): + port: str # Port to connect to the glove + side: str # "left" / "right" + baud_rate: int = 115_200 + + def __post_init__(self): + if self.side not in ["right", "left"]: + raise ValueError(self.side) + + +@TeleoperatorConfig.register_subclass("homunculus_arm") +@dataclass +class HomunculusArmConfig(TeleoperatorConfig): + port: str # Port to connect to the arm + baud_rate: int = 115_200 diff --git a/src/lerobot/teleoperators/homunculus/homunculus_arm.py b/src/lerobot/teleoperators/homunculus/homunculus_arm.py new file mode 100644 index 00000000..dfce0c88 --- /dev/null +++ b/src/lerobot/teleoperators/homunculus/homunculus_arm.py @@ -0,0 +1,310 @@ +#!/usr/bin/env python + +# Copyright 2025 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import threading +from collections import deque +from pprint import pformat +from typing import Deque, Dict, Optional + +import serial + +from lerobot.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError +from lerobot.motors.motors_bus import MotorCalibration, MotorNormMode +from lerobot.utils.utils import enter_pressed, move_cursor_up + +from ..teleoperator import Teleoperator +from .config_homunculus import HomunculusArmConfig + +logger = logging.getLogger(__name__) + + +class HomunculusArm(Teleoperator): + """ + Homunculus Arm designed by Hugging Face. + """ + + config_class = HomunculusArmConfig + name = "homunculus_arm" + + def __init__(self, config: HomunculusArmConfig): + super().__init__(config) + self.config = config + self.serial = serial.Serial(config.port, config.baud_rate, timeout=1) + self.serial_lock = threading.Lock() + + self.joints = { + "shoulder_pitch": MotorNormMode.RANGE_M100_100, + "shoulder_yaw": MotorNormMode.RANGE_M100_100, + "shoulder_roll": MotorNormMode.RANGE_M100_100, + "elbow_flex": MotorNormMode.RANGE_M100_100, + "wrist_roll": MotorNormMode.RANGE_M100_100, + "wrist_yaw": MotorNormMode.RANGE_M100_100, + "wrist_pitch": MotorNormMode.RANGE_M100_100, + } + n = 50 + # EMA parameters --------------------------------------------------- + self.n: int = n + self.alpha: float = 2 / (n + 1) + # one deque *per joint* so we can inspect raw history if needed + self._buffers: Dict[str, Deque[int]] = { + joint: deque(maxlen=n) + for joint in ( + "shoulder_pitch", + "shoulder_yaw", + "shoulder_roll", + "elbow_flex", + "wrist_roll", + "wrist_yaw", + "wrist_pitch", + ) + } + # running EMA value per joint – lazily initialised on first read + self._ema: Dict[str, Optional[float]] = dict.fromkeys(self._buffers) + + self._state: dict[str, float] | None = None + self.new_state_event = threading.Event() + self.stop_event = threading.Event() + self.thread = threading.Thread(target=self._read_loop, daemon=True, name=f"{self} _read_loop") + self.state_lock = threading.Lock() + + @property + def action_features(self) -> dict: + return {f"{joint}.pos": float for joint in self.joints} + + @property + def feedback_features(self) -> dict: + return {} + + @property + def is_connected(self) -> bool: + with self.serial_lock: + return self.serial.is_open and self.thread.is_alive() + + def connect(self, calibrate: bool = True) -> None: + if self.is_connected: + raise DeviceAlreadyConnectedError(f"{self} already connected") + + if not self.serial.is_open: + self.serial.open() + self.thread.start() + + # wait for the thread to ramp up & 1st state to be ready + if not self.new_state_event.wait(timeout=2): + raise TimeoutError(f"{self}: Timed out waiting for state after 2s.") + + if not self.is_calibrated and calibrate: + self.calibrate() + + logger.info(f"{self} connected.") + + @property + def is_calibrated(self) -> bool: + return self.calibration_fpath.is_file() + + def calibrate(self) -> None: + print( + "\nMove all joints through their entire range of motion." + "\nRecording positions. Press ENTER to stop..." + ) + range_mins, range_maxes = self._record_ranges_of_motion() + + self.calibration = {} + for id_, joint in enumerate(self.joints): + self.calibration[joint] = MotorCalibration( + id=id_, + drive_mode=0, + homing_offset=0, + range_min=range_mins[joint], + range_max=range_maxes[joint], + ) + + self._save_calibration() + print("Calibration saved to", self.calibration_fpath) + + # TODO(Steven): This function is copy/paste from the `HomunculusGlove` class. Consider moving it to an utility to reduce duplicated code. + def _record_ranges_of_motion( + self, joints: list[str] | None = None, display_values: bool = True + ) -> tuple[dict[str, int], dict[str, int]]: + """Interactively record the min/max encoder values of each joint. + + Move the joints while the method streams live positions. Press :kbd:`Enter` to finish. + + Args: + joints (list[str] | None, optional): Joints to record. Defaults to every joint (`None`). + display_values (bool, optional): When `True` (default) a live table is printed to the console. + + Raises: + TypeError: `joints` is not `None` or a list. + ValueError: any joint's recorded min and max are the same. + + Returns: + tuple[dict[str, int], dict[str, int]]: Two dictionaries *mins* and *maxes* with the extreme values + observed for each joint. + """ + if joints is None: + joints = list(self.joints) + elif not isinstance(joints, list): + raise TypeError(joints) + + display_len = max(len(key) for key in joints) + + start_positions = self._read(joints, normalize=False) + mins = start_positions.copy() + maxes = start_positions.copy() + + user_pressed_enter = False + while not user_pressed_enter: + positions = self._read(joints, normalize=False) + mins = {joint: int(min(positions[joint], min_)) for joint, min_ in mins.items()} + maxes = {joint: int(max(positions[joint], max_)) for joint, max_ in maxes.items()} + + if display_values: + print("\n-------------------------------------------") + print(f"{'NAME':<{display_len}} | {'MIN':>6} | {'POS':>6} | {'MAX':>6}") + for joint in joints: + print( + f"{joint:<{display_len}} | {mins[joint]:>6} | {positions[joint]:>6} | {maxes[joint]:>6}" + ) + + if enter_pressed(): + user_pressed_enter = True + + if display_values and not user_pressed_enter: + # Move cursor up to overwrite the previous output + move_cursor_up(len(joints) + 3) + + same_min_max = [joint for joint in joints if mins[joint] == maxes[joint]] + if same_min_max: + raise ValueError(f"Some joints have the same min and max values:\n{pformat(same_min_max)}") + + return mins, maxes + + def configure(self) -> None: + pass + + # TODO(Steven): This function is copy/paste from the `HomunculusGlove` class. Consider moving it to an utility to reduce duplicated code. + def _normalize(self, values: dict[str, int]) -> dict[str, float]: + if not self.calibration: + raise RuntimeError(f"{self} has no calibration registered.") + + normalized_values = {} + for joint, val in values.items(): + min_ = self.calibration[joint].range_min + max_ = self.calibration[joint].range_max + drive_mode = self.calibration[joint].drive_mode + bounded_val = min(max_, max(min_, val)) + + if self.joints[joint] is MotorNormMode.RANGE_M100_100: + norm = (((bounded_val - min_) / (max_ - min_)) * 200) - 100 + normalized_values[joint] = -norm if drive_mode else norm + elif self.joints[joint] is MotorNormMode.RANGE_0_100: + norm = ((bounded_val - min_) / (max_ - min_)) * 100 + normalized_values[joint] = 100 - norm if drive_mode else norm + + return normalized_values + + def _apply_ema(self, raw: Dict[str, int]) -> Dict[str, float]: + """Update buffers & running EMA values; return smoothed dict.""" + smoothed: Dict[str, float] = {} + for joint, value in raw.items(): + # maintain raw history + self._buffers[joint].append(value) + + # initialise on first run + if self._ema[joint] is None: + self._ema[joint] = float(value) + else: + self._ema[joint] = self.alpha * value + (1 - self.alpha) * self._ema[joint] + + smoothed[joint] = self._ema[joint] + return smoothed + + def _read( + self, joints: list[str] | None = None, normalize: bool = True, timeout: float = 1 + ) -> dict[str, int | float]: + """ + Return the most recent (single) values from self.last_d, + optionally applying calibration. + """ + if not self.new_state_event.wait(timeout=timeout): + raise TimeoutError(f"{self}: Timed out waiting for state after {timeout}s.") + + with self.state_lock: + state = self._state + + self.new_state_event.clear() + + if state is None: + raise RuntimeError(f"{self} Internal error: Event set but no state available.") + + if joints is not None: + state = {k: v for k, v in state.items() if k in joints} + + if normalize: + state = self._normalize(state) + + state = self._apply_ema(state) + + return state + + def _read_loop(self): + """ + Continuously read from the serial buffer in its own thread and sends values to the main thread through + a queue. + """ + while not self.stop_event.is_set(): + try: + raw_values = None + with self.serial_lock: + if self.serial.in_waiting > 0: + self.serial.flush() + raw_values = self.serial.readline().decode("utf-8").strip().split(" ") + if raw_values is None or len(raw_values) != 21: # 16 raw + 5 angle values + continue + + joint_angles = { + "shoulder_pitch": int(raw_values[19]), + "shoulder_yaw": int(raw_values[18]), + "shoulder_roll": int(raw_values[20]), + "elbow_flex": int(raw_values[17]), + "wrist_roll": int(raw_values[16]), + "wrist_yaw": int(raw_values[1]), + "wrist_pitch": int(raw_values[0]), + } + + with self.state_lock: + self._state = joint_angles + self.new_state_event.set() + + except Exception as e: + logger.debug(f"Error reading frame in background thread for {self}: {e}") + + def get_action(self) -> dict[str, float]: + joint_positions = self._read() + return {f"{joint}.pos": pos for joint, pos in joint_positions.items()} + + def send_feedback(self, feedback: dict[str, float]) -> None: + raise NotImplementedError + + def disconnect(self) -> None: + if not self.is_connected: + DeviceNotConnectedError(f"{self} is not connected.") + + self.stop_event.set() + self.thread.join(timeout=1) + self.serial.close() + logger.info(f"{self} disconnected.") diff --git a/src/lerobot/teleoperators/homunculus/homunculus_glove.py b/src/lerobot/teleoperators/homunculus/homunculus_glove.py new file mode 100644 index 00000000..d367a2a7 --- /dev/null +++ b/src/lerobot/teleoperators/homunculus/homunculus_glove.py @@ -0,0 +1,338 @@ +#!/usr/bin/env python + +# Copyright 2025 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import threading +from collections import deque +from pprint import pformat +from typing import Deque, Dict, Optional + +import serial + +from lerobot.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError +from lerobot.motors import MotorCalibration +from lerobot.motors.motors_bus import MotorNormMode +from lerobot.teleoperators.homunculus.joints_translation import homunculus_glove_to_hope_jr_hand +from lerobot.utils.utils import enter_pressed, move_cursor_up + +from ..teleoperator import Teleoperator +from .config_homunculus import HomunculusGloveConfig + +logger = logging.getLogger(__name__) + +LEFT_HAND_INVERSIONS = [ + "thumb_cmc", + "index_dip", + "middle_mcp_abduction", + "middle_dip", + "pinky_mcp_abduction", + "pinky_dip", +] + +RIGHT_HAND_INVERSIONS = [ + "thumb_mcp", + "thumb_cmc", + "thumb_pip", + "thumb_dip", + "index_mcp_abduction", + # "index_dip", + "middle_mcp_abduction", + # "middle_dip", + "ring_mcp_abduction", + "ring_mcp_flexion", + # "ring_dip", + "pinky_mcp_abduction", +] + + +class HomunculusGlove(Teleoperator): + """ + Homunculus Glove designed by NepYope & Hugging Face. + """ + + config_class = HomunculusGloveConfig + name = "homunculus_glove" + + def __init__(self, config: HomunculusGloveConfig): + super().__init__(config) + self.config = config + self.serial = serial.Serial(config.port, config.baud_rate, timeout=1) + self.serial_lock = threading.Lock() + + self.joints = { + "thumb_cmc": MotorNormMode.RANGE_0_100, + "thumb_mcp": MotorNormMode.RANGE_0_100, + "thumb_pip": MotorNormMode.RANGE_0_100, + "thumb_dip": MotorNormMode.RANGE_0_100, + "index_mcp_abduction": MotorNormMode.RANGE_M100_100, + "index_mcp_flexion": MotorNormMode.RANGE_0_100, + "index_dip": MotorNormMode.RANGE_0_100, + "middle_mcp_abduction": MotorNormMode.RANGE_M100_100, + "middle_mcp_flexion": MotorNormMode.RANGE_0_100, + "middle_dip": MotorNormMode.RANGE_0_100, + "ring_mcp_abduction": MotorNormMode.RANGE_M100_100, + "ring_mcp_flexion": MotorNormMode.RANGE_0_100, + "ring_dip": MotorNormMode.RANGE_0_100, + "pinky_mcp_abduction": MotorNormMode.RANGE_M100_100, + "pinky_mcp_flexion": MotorNormMode.RANGE_0_100, + "pinky_dip": MotorNormMode.RANGE_0_100, + } + self.inverted_joints = RIGHT_HAND_INVERSIONS if config.side == "right" else LEFT_HAND_INVERSIONS + + n = 10 + # EMA parameters --------------------------------------------------- + self.n: int = n + self.alpha: float = 2 / (n + 1) + # one deque *per joint* so we can inspect raw history if needed + self._buffers: Dict[str, Deque[int]] = {joint: deque(maxlen=n) for joint in self.joints} + # running EMA value per joint – lazily initialised on first read + self._ema: Dict[str, Optional[float]] = dict.fromkeys(self._buffers) + + self._state: dict[str, float] | None = None + self.new_state_event = threading.Event() + self.stop_event = threading.Event() + self.thread = threading.Thread(target=self._read_loop, daemon=True, name=f"{self} _read_loop") + self.state_lock = threading.Lock() + + @property + def action_features(self) -> dict: + return {f"{joint}.pos": float for joint in self.joints} + + @property + def feedback_features(self) -> dict: + return {} + + @property + def is_connected(self) -> bool: + with self.serial_lock: + return self.serial.is_open and self.thread.is_alive() + + def connect(self, calibrate: bool = True) -> None: + if self.is_connected: + raise DeviceAlreadyConnectedError(f"{self} already connected") + + if not self.serial.is_open: + self.serial.open() + self.thread.start() + + # wait for the thread to ramp up & 1st state to be ready + if not self.new_state_event.wait(timeout=2): + raise TimeoutError(f"{self}: Timed out waiting for state after 2s.") + + if not self.is_calibrated and calibrate: + self.calibrate() + + logger.info(f"{self} connected.") + + @property + def is_calibrated(self) -> bool: + return self.calibration_fpath.is_file() + + def calibrate(self) -> None: + range_mins, range_maxes = {}, {} + for finger in ["thumb", "index", "middle", "ring", "pinky"]: + print( + f"\nMove {finger} through its entire range of motion." + "\nRecording positions. Press ENTER to stop..." + ) + finger_joints = [joint for joint in self.joints if joint.startswith(finger)] + finger_mins, finger_maxes = self._record_ranges_of_motion(finger_joints) + range_mins.update(finger_mins) + range_maxes.update(finger_maxes) + + self.calibration = {} + for id_, joint in enumerate(self.joints): + self.calibration[joint] = MotorCalibration( + id=id_, + drive_mode=1 if joint in self.inverted_joints else 0, + homing_offset=0, + range_min=range_mins[joint], + range_max=range_maxes[joint], + ) + + self._save_calibration() + print("Calibration saved to", self.calibration_fpath) + + # TODO(Steven): This function is copy/paste from the `HomunculusArm` class. Consider moving it to an utility to reduce duplicated code. + def _record_ranges_of_motion( + self, joints: list[str] | None = None, display_values: bool = True + ) -> tuple[dict[str, int], dict[str, int]]: + """Interactively record the min/max encoder values of each joint. + + Move the joints while the method streams live positions. Press :kbd:`Enter` to finish. + + Args: + joints (list[str] | None, optional): Joints to record. Defaults to every joint (`None`). + display_values (bool, optional): When `True` (default) a live table is printed to the console. + + Raises: + TypeError: `joints` is not `None` or a list. + ValueError: any joint's recorded min and max are the same. + + Returns: + tuple[dict[str, int], dict[str, int]]: Two dictionaries *mins* and *maxes* with the extreme values + observed for each joint. + """ + if joints is None: + joints = list(self.joints) + elif not isinstance(joints, list): + raise TypeError(joints) + + display_len = max(len(key) for key in joints) + + start_positions = self._read(joints, normalize=False) + mins = start_positions.copy() + maxes = start_positions.copy() + + user_pressed_enter = False + while not user_pressed_enter: + positions = self._read(joints, normalize=False) + mins = {joint: int(min(positions[joint], min_)) for joint, min_ in mins.items()} + maxes = {joint: int(max(positions[joint], max_)) for joint, max_ in maxes.items()} + + if display_values: + print("\n-------------------------------------------") + print(f"{'NAME':<{display_len}} | {'MIN':>6} | {'POS':>6} | {'MAX':>6}") + for joint in joints: + print( + f"{joint:<{display_len}} | {mins[joint]:>6} | {positions[joint]:>6} | {maxes[joint]:>6}" + ) + + if enter_pressed(): + user_pressed_enter = True + + if display_values and not user_pressed_enter: + # Move cursor up to overwrite the previous output + move_cursor_up(len(joints) + 3) + + same_min_max = [joint for joint in joints if mins[joint] == maxes[joint]] + if same_min_max: + raise ValueError(f"Some joints have the same min and max values:\n{pformat(same_min_max)}") + + return mins, maxes + + def configure(self) -> None: + pass + + # TODO(Steven): This function is copy/paste from the `HomunculusArm` class. Consider moving it to an utility to reduce duplicated code. + def _normalize(self, values: dict[str, int]) -> dict[str, float]: + if not self.calibration: + raise RuntimeError(f"{self} has no calibration registered.") + + normalized_values = {} + for joint, val in values.items(): + min_ = self.calibration[joint].range_min + max_ = self.calibration[joint].range_max + drive_mode = self.calibration[joint].drive_mode + bounded_val = min(max_, max(min_, val)) + + if self.joints[joint] is MotorNormMode.RANGE_M100_100: + norm = (((bounded_val - min_) / (max_ - min_)) * 200) - 100 + normalized_values[joint] = -norm if drive_mode else norm + elif self.joints[joint] is MotorNormMode.RANGE_0_100: + norm = ((bounded_val - min_) / (max_ - min_)) * 100 + normalized_values[joint] = 100 - norm if drive_mode else norm + + return normalized_values + + def _apply_ema(self, raw: Dict[str, int]) -> Dict[str, int]: + """Update buffers & running EMA values; return smoothed dict as integers.""" + smoothed: Dict[str, int] = {} + for joint, value in raw.items(): + # maintain raw history + self._buffers[joint].append(value) + + # initialise on first run + if self._ema[joint] is None: + self._ema[joint] = float(value) + else: + self._ema[joint] = self.alpha * value + (1 - self.alpha) * self._ema[joint] + + # Convert back to int for compatibility with normalization + smoothed[joint] = int(round(self._ema[joint])) + return smoothed + + def _read( + self, joints: list[str] | None = None, normalize: bool = True, timeout: float = 1 + ) -> dict[str, int | float]: + """ + Return the most recent (single) values from self.last_d, + optionally applying calibration. + """ + if not self.new_state_event.wait(timeout=timeout): + raise TimeoutError(f"{self}: Timed out waiting for state after {timeout}s.") + + with self.state_lock: + state = self._state + + self.new_state_event.clear() + + if state is None: + raise RuntimeError(f"{self} Internal error: Event set but no state available.") + + if joints is not None: + state = {k: v for k, v in state.items() if k in joints} + + # Apply EMA smoothing to raw values first + state = self._apply_ema(state) + + # Then normalize if requested + if normalize: + state = self._normalize(state) + + return state + + def _read_loop(self): + """ + Continuously read from the serial buffer in its own thread and sends values to the main thread through + a queue. + """ + while not self.stop_event.is_set(): + try: + positions = None + with self.serial_lock: + if self.serial.in_waiting > 0: + self.serial.flush() + positions = self.serial.readline().decode("utf-8").strip().split(" ") + if positions is None or len(positions) != len(self.joints): + continue + + joint_positions = {joint: int(pos) for joint, pos in zip(self.joints, positions, strict=True)} + + with self.state_lock: + self._state = joint_positions + self.new_state_event.set() + + except Exception as e: + logger.debug(f"Error reading frame in background thread for {self}: {e}") + + def get_action(self) -> dict[str, float]: + joint_positions = self._read() + return homunculus_glove_to_hope_jr_hand( + {f"{joint}.pos": pos for joint, pos in joint_positions.items()} + ) + + def send_feedback(self, feedback: dict[str, float]) -> None: + raise NotImplementedError + + def disconnect(self) -> None: + if not self.is_connected: + DeviceNotConnectedError(f"{self} is not connected.") + + self.stop_event.set() + self.thread.join(timeout=1) + self.serial.close() + logger.info(f"{self} disconnected.") diff --git a/src/lerobot/teleoperators/homunculus/joints_translation.py b/src/lerobot/teleoperators/homunculus/joints_translation.py new file mode 100644 index 00000000..f14f7b3e --- /dev/null +++ b/src/lerobot/teleoperators/homunculus/joints_translation.py @@ -0,0 +1,63 @@ +# Copyright 2025 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +INDEX_SPLAY = 0.3 +MIDDLE_SPLAY = 0.3 +RING_SPLAY = 0.3 +PINKY_SPLAY = 0.5 + + +def get_ulnar_flexion(flexion: float, abduction: float, splay: float): + return -abduction * splay + flexion * (1 - splay) + + +def get_radial_flexion(flexion: float, abduction: float, splay: float): + return abduction * splay + flexion * (1 - splay) + + +def homunculus_glove_to_hope_jr_hand(glove_action: dict[str, float]) -> dict[str, float]: + return { + "thumb_cmc.pos": glove_action["thumb_cmc.pos"], + "thumb_mcp.pos": glove_action["thumb_mcp.pos"], + "thumb_pip.pos": glove_action["thumb_pip.pos"], + "thumb_dip.pos": glove_action["thumb_dip.pos"], + "index_radial_flexor.pos": get_radial_flexion( + glove_action["index_mcp_flexion.pos"], glove_action["index_mcp_abduction.pos"], INDEX_SPLAY + ), + "index_ulnar_flexor.pos": get_ulnar_flexion( + glove_action["index_mcp_flexion.pos"], glove_action["index_mcp_abduction.pos"], INDEX_SPLAY + ), + "index_pip_dip.pos": glove_action["index_dip.pos"], + "middle_radial_flexor.pos": get_radial_flexion( + glove_action["middle_mcp_flexion.pos"], glove_action["middle_mcp_abduction.pos"], MIDDLE_SPLAY + ), + "middle_ulnar_flexor.pos": get_ulnar_flexion( + glove_action["middle_mcp_flexion.pos"], glove_action["middle_mcp_abduction.pos"], MIDDLE_SPLAY + ), + "middle_pip_dip.pos": glove_action["middle_dip.pos"], + "ring_radial_flexor.pos": get_radial_flexion( + glove_action["ring_mcp_flexion.pos"], glove_action["ring_mcp_abduction.pos"], RING_SPLAY + ), + "ring_ulnar_flexor.pos": get_ulnar_flexion( + glove_action["ring_mcp_flexion.pos"], glove_action["ring_mcp_abduction.pos"], RING_SPLAY + ), + "ring_pip_dip.pos": glove_action["ring_dip.pos"], + "pinky_radial_flexor.pos": get_radial_flexion( + glove_action["pinky_mcp_flexion.pos"], glove_action["pinky_mcp_abduction.pos"], PINKY_SPLAY + ), + "pinky_ulnar_flexor.pos": get_ulnar_flexion( + glove_action["pinky_mcp_flexion.pos"], glove_action["pinky_mcp_abduction.pos"], PINKY_SPLAY + ), + "pinky_pip_dip.pos": glove_action["pinky_dip.pos"], + } diff --git a/src/lerobot/teleoperators/utils.py b/src/lerobot/teleoperators/utils.py index b49addc1..8a667fd4 100644 --- a/src/lerobot/teleoperators/utils.py +++ b/src/lerobot/teleoperators/utils.py @@ -53,5 +53,13 @@ def make_teleoperator_from_config(config: TeleoperatorConfig) -> Teleoperator: from .keyboard.teleop_keyboard import KeyboardEndEffectorTeleop return KeyboardEndEffectorTeleop(config) + elif config.type == "homunculus_glove": + from .homunculus import HomunculusGlove + + return HomunculusGlove(config) + elif config.type == "homunculus_arm": + from .homunculus import HomunculusArm + + return HomunculusArm(config) else: raise ValueError(config.type)