#!/usr/bin/env python # Copyright 2025 The HuggingFace Inc. team. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import logging import time from typing import Any from lerobot.common.cameras.utils import make_cameras_from_configs from lerobot.common.constants import OBS_IMAGES, OBS_STATE from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError from lerobot.common.motors import Motor, MotorCalibration, MotorNormMode from lerobot.common.motors.feetech import ( FeetechMotorsBus, OperatingMode, ) from ..robot import Robot from ..utils import ensure_safe_goal_position from .config_hope_jr import HopeJrArmConfig logger = logging.getLogger(__name__) class HopeJrArm(Robot): config_class = HopeJrArmConfig name = "hope_jr_arm" def __init__(self, config: HopeJrArmConfig): super().__init__(config) self.config = config self.arm = FeetechMotorsBus( port=self.config.port, motors={ "shoulder_pitch": Motor(1, "sm8512bl", MotorNormMode.RANGE_M100_100), "shoulder_yaw": Motor(2, "sts3250", MotorNormMode.RANGE_M100_100), "shoulder_roll": Motor(3, "sts3250", MotorNormMode.RANGE_M100_100), "elbow_flex": Motor(4, "sts3250", MotorNormMode.RANGE_M100_100), "wrist_roll": Motor(5, "sts3215", MotorNormMode.RANGE_M100_100), "wrist_yaw": Motor(6, "sts3215", MotorNormMode.RANGE_M100_100), "wrist_pitch": Motor(7, "sts3215", MotorNormMode.RANGE_M100_100), }, ) self.cameras = make_cameras_from_configs(config.cameras) @property def state_feature(self) -> dict: return { "dtype": "float32", "shape": (len(self.arm),), "names": {"motors": list(self.arm.motors)}, } @property def action_feature(self) -> dict: return self.state_feature @property def camera_features(self) -> dict[str, dict]: cam_ft = {} for cam_key, cam in self.cameras.items(): cam_ft[cam_key] = { "shape": (cam.height, cam.width, cam.channels), "names": ["height", "width", "channels"], "info": None, } return cam_ft @property def is_connected(self) -> bool: # TODO(aliberts): add cam.is_connected for cam in self.cameras return self.arm.is_connected def connect(self) -> None: """ We assume that at connection time, arm is in a rest position, and torque can be safely disabled to run calibration. """ if self.is_connected: raise DeviceAlreadyConnectedError(f"{self} already connected") self.arm.connect() if not self.is_calibrated: self.calibrate() # Connect the cameras for cam in self.cameras.values(): cam.connect() self.configure() logger.info(f"{self} connected.") @property def is_calibrated(self) -> bool: return self.arm.is_calibrated def calibrate(self) -> None: raise NotImplementedError # TODO(aliberts): adapt code below (copied from koch) logger.info(f"\nRunning calibration of {self}") self.arm.disable_torque() for name in self.arm.names: self.arm.write("Operating_Mode", name, OperatingMode.POSITION.value) input("Move robot to the middle of its range of motion and press ENTER....") homing_offsets = self.arm.set_half_turn_homings() full_turn_motor = "wrist_roll" unknown_range_motors = [name for name in self.arm.names if name != full_turn_motor] logger.info( f"Move all joints except '{full_turn_motor}' sequentially through their " "entire ranges of motion.\nRecording positions. Press ENTER to stop..." ) range_mins, range_maxes = self.arm.record_ranges_of_motion(unknown_range_motors) range_mins[full_turn_motor] = 0 range_maxes[full_turn_motor] = 4095 self.calibration = {} for name, motor in self.arm.motors.items(): self.calibration[name] = MotorCalibration( id=motor.id, drive_mode=0, homing_offset=homing_offsets[name], range_min=range_mins[name], range_max=range_maxes[name], ) self.arm.write_calibration(self.calibration) self._save_calibration() print("Calibration saved to", self.calibration_fpath) def configure(self) -> None: self.arm.disable_torque() self.arm.configure_motors() # TODO self.arm.enable_torque() def get_observation(self) -> dict[str, Any]: if not self.is_connected: raise DeviceNotConnectedError(f"{self} is not connected.") obs_dict = {} # Read arm position start = time.perf_counter() obs_dict[OBS_STATE] = self.arm.sync_read("Present_Position") dt_ms = (time.perf_counter() - start) * 1e3 logger.debug(f"{self} read state: {dt_ms:.1f}ms") # Capture images from cameras for cam_key, cam in self.cameras.items(): start = time.perf_counter() obs_dict[f"{OBS_IMAGES}.{cam_key}"] = cam.async_read() dt_ms = (time.perf_counter() - start) * 1e3 logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms") return obs_dict def send_action(self, action: dict[str, Any]) -> dict[str, Any]: if not self.is_connected: raise DeviceNotConnectedError(f"{self} is not connected.") goal_pos = action # Cap goal position when too far away from present position. # /!\ Slower fps expected due to reading from the follower. if self.config.max_relative_target is not None: present_pos = self.arm.sync_read("Present_Position") goal_present_pos = {key: (g_pos, present_pos[key]) for key, g_pos in goal_pos.items()} goal_pos = ensure_safe_goal_position(goal_present_pos, self.config.max_relative_target) self.arm.sync_write("Goal_Position", goal_pos) return goal_pos def disconnect(self): if not self.is_connected: raise DeviceNotConnectedError(f"{self} is not connected.") self.arm.disconnect(self.config.disable_torque_on_disconnect) for cam in self.cameras.values(): cam.disconnect() logger.info(f"{self} disconnected.")