diff --git a/lerobot/common/teleoperators/homonculus/homonculus_arm.py b/lerobot/common/teleoperators/homonculus/homonculus_arm.py index 46712f1a..3657c0a0 100644 --- a/lerobot/common/teleoperators/homonculus/homonculus_arm.py +++ b/lerobot/common/teleoperators/homonculus/homonculus_arm.py @@ -15,71 +15,53 @@ # limitations under the License. import logging -import os -import pickle # nosec import threading import time -from collections import deque -from enum import Enum +from pprint import pformat -import numpy as np import serial from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError +from lerobot.common.motors.motors_bus import MotorCalibration, MotorNormMode +from lerobot.common.utils.utils import enter_pressed, move_cursor_up from ..teleoperator import Teleoperator from .config_homonculus import HomonculusArmConfig logger = logging.getLogger(__name__) -LOWER_BOUND_LINEAR = -100 -UPPER_BOUND_LINEAR = 200 - - -class CalibrationMode(Enum): - # Joints with rotational motions are expressed in degrees in nominal range of [-180, 180] - DEGREE = 0 - # Joints with linear motions (like gripper of Aloha) are expressed in nominal range of [0, 100] - LINEAR = 1 - class HomonculusArm(Teleoperator): + """ + Homonculus Arm designed by Hugging Face. + """ + config_class = HomonculusArmConfig name = "homonculus_arm" def __init__(self, config: HomonculusArmConfig): + super().__init__(config) self.config = config self.serial = serial.Serial(config.port, config.baud_rate, timeout=1) - self.buffer_size = config.buffer_size - self.joints = [ - "wrist_roll", - "wrist_pitch", - "wrist_yaw", - "elbow_flex", - "shoulder_roll", - "shoulder_yaw", - "shoulder_pitch", - ] - # Initialize a buffer (deque) for each joint - self.joints_buffer = {joint: deque(maxlen=self.buffer_size) for joint in self.joints} - - # Last read dictionary - self.last_d = dict.fromkeys(self.joints, 100) - - # For adaptive EMA, we store a "previous smoothed" state per joint - self.adaptive_ema_state = dict.fromkeys(self.joints) - self.kalman_state = {joint: {"x": None, "P": None} for joint in self.joints} - - self.calibration = None - self.thread = threading.Thread(target=self.async_read, daemon=True) + self.joints = { + "wrist_roll": MotorNormMode.RANGE_M100_100, + "wrist_pitch": MotorNormMode.RANGE_M100_100, + "wrist_yaw": MotorNormMode.RANGE_M100_100, + "elbow_flex": MotorNormMode.RANGE_M100_100, + "shoulder_roll": MotorNormMode.RANGE_M100_100, + "shoulder_yaw": MotorNormMode.RANGE_M100_100, + "shoulder_pitch": MotorNormMode.RANGE_M100_100, + } + self.thread = threading.Thread(target=self._async_read, daemon=True, name=f"{self} _async_read") + self._lock = threading.Lock() @property - def action_feature(self) -> dict: + def action_features(self) -> dict: return {f"{joint}.pos": float for joint in self.joints} @property - def feedback_feature(self) -> dict: + def feedback_features(self) -> dict: return {} @property @@ -90,23 +72,152 @@ class HomonculusArm(Teleoperator): if self.is_connected: raise DeviceAlreadyConnectedError(f"{self} already connected") - self.serial.open() + if not self.serial.is_open: + self.serial.open() self.thread.start() - self.configure() + time.sleep(1) # gives time for the thread to ramp up logger.info(f"{self} connected.") @property def is_calibrated(self) -> bool: - raise NotImplementedError # TODO + return self.calibration_fpath.is_file() def calibrate(self) -> None: - raise NotImplementedError # TODO + print( + "\nMove all joints through their entire range of motion." + "\nRecording positions. Press ENTER to stop..." + ) + range_mins, range_maxes = self._record_ranges_of_motion() + + self.calibration = {} + for id_, joint in enumerate(self.joints): + self.calibration[joint] = MotorCalibration( + id=id_, + drive_mode=0, + homing_offset=0, + range_min=range_mins[joint], + range_max=range_maxes[joint], + ) + + self._save_calibration() + print("Calibration saved to", self.calibration_fpath) + + def _record_ranges_of_motion( + self, joints: list[str] | None = None, display_values: bool = True + ) -> tuple[dict[str, int], dict[str, int]]: + """Interactively record the min/max encoder values of each joint. + + Move the joints while the method streams live positions. Press :kbd:`Enter` to finish. + + Args: + joints (list[str] | None, optional): Joints to record. Defaults to every joint (`None`). + display_values (bool, optional): When `True` (default) a live table is printed to the console. + + Raises: + TypeError: `joints` is not `None` or a list. + ValueError: any joint's recorded min and max are the same. + + Returns: + tuple[dict[str, int], dict[str, int]]: Two dictionaries *mins* and *maxes* with the extreme values + observed for each joint. + """ + if joints is None: + joints = list(self.joints) + elif not isinstance(joints, list): + raise TypeError(joints) + + start_positions = self._read(joints, normalize=False) + mins = start_positions.copy() + maxes = start_positions.copy() + while True: + positions = self._read(joints, normalize=False) + mins = {joint: min(positions[joint], min_) for joint, min_ in mins.items()} + maxes = {joint: max(positions[joint], max_) for joint, max_ in maxes.items()} + + if display_values: + print("\n-------------------------------------------") + print(f"{'NAME':<15} | {'MIN':>6} | {'POS':>6} | {'MAX':>6}") + for joint in joints: + print(f"{joint:<15} | {mins[joint]:>6} | {positions[joint]:>6} | {maxes[joint]:>6}") + + if enter_pressed(): + break + + if display_values: + # Move cursor up to overwrite the previous output + move_cursor_up(len(joints) + 3) + + same_min_max = [joint for joint in joints if mins[joint] == maxes[joint]] + if same_min_max: + raise ValueError(f"Some joints have the same min and max values:\n{pformat(same_min_max)}") + + return mins, maxes def configure(self) -> None: - raise NotImplementedError # TODO + pass + + def _normalize(self, values: dict[str, int], joints: list[str] | None = None): + if not self.calibration: + raise RuntimeError(f"{self} has no calibration registered.") + + normalized_values = {} + for joint, val in values.items(): + min_ = self.calibration[joint].range_min + max_ = self.calibration[joint].range_max + drive_mode = self.calibration[joint].drive_mode + bounded_val = min(max_, max(min_, val)) + + if self.joints[joint] is MotorNormMode.RANGE_M100_100: + norm = (((bounded_val - min_) / (max_ - min_)) * 200) - 100 + normalized_values[joint] = -norm if drive_mode else norm + elif self.joints[joint] is MotorNormMode.RANGE_0_100: + norm = ((bounded_val - min_) / (max_ - min_)) * 100 + normalized_values[joint] = 100 - norm if drive_mode else norm + + return normalized_values + + def _read(self, joints: list[str] | None = None, normalize: bool = True) -> dict[str, int | float]: + """ + Return the most recent (single) values from self.last_d, + optionally applying calibration. + """ + with self._lock: + state = self._state + + if joints is not None: + state = {k: v for k, v in state.items() if k in joints} + + if normalize: + state = self._normalize(state, joints) + + return state + + def _async_read(self): + """ + Continuously read from the serial buffer in its own thread and sends values to the main thread through + a queue. + """ + while True: + if self.serial.in_waiting > 0: + self.serial.flush() + raw_values = self.serial.readline().decode("utf-8").strip().split(" ") + if len(raw_values) != 21: # 16 raw + 5 angle values + continue + + try: + joint_angles = { + joint: int(pos) + for joint, pos in zip(self.joints, raw_values[:2] + raw_values[16:], strict=True) + } + except ValueError: + continue + + with self._lock: + self._state = joint_angles def get_action(self) -> dict[str, float]: - raise NotImplementedError # TODO + joint_positions = self._read() + return {f"{joint}.pos": pos for joint, pos in joint_positions.items()} def send_feedback(self, feedback: dict[str, float]) -> None: raise NotImplementedError @@ -115,307 +226,6 @@ class HomonculusArm(Teleoperator): if not self.is_connected: DeviceNotConnectedError(f"{self} is not connected.") - self.thread.join() + self.thread.join(timeout=0.5) self.serial.close() logger.info(f"{self} disconnected.") - - ### WIP below ### - - @property - def joint_names(self): - return list(self.last_d.keys()) - - def read(self, motor_names: list[str] | None = None): - """ - Return the most recent (single) values from self.last_d, - optionally applying calibration. - """ - if motor_names is None: - motor_names = self.joint_names - - # Get raw (last) values - values = np.array([self.last_d[k] for k in motor_names]) - - # print(motor_names) - print(values) - - # Apply calibration if available - if self.calibration is not None: - values = self.apply_calibration(values, motor_names) - print(values) - return values - - def read_running_average(self, motor_names: list[str] | None = None, linearize=False): - """ - Return the AVERAGE of the most recent self.buffer_size (or fewer, if not enough data) readings - for each joint, optionally applying calibration. - """ - if motor_names is None: - motor_names = self.joint_names - - # Gather averaged readings from buffers - smoothed_vals = [] - for name in motor_names: - buf = self.joint_buffer[name] - if len(buf) == 0: - # If no data has been read yet, fall back to last_d - smoothed_vals.append(self.last_d[name]) - else: - # Otherwise, average over the existing buffer - smoothed_vals.append(np.mean(buf)) - - smoothed_vals = np.array(smoothed_vals, dtype=np.float32) - - # Apply calibration if available - if self.calibration is not None: - if False: - for i, joint_name in enumerate(motor_names): - # Re-use the same raw_min / raw_max from the calibration - calib_idx = self.calibration["motor_names"].index(joint_name) - min_reading = self.calibration["start_pos"][calib_idx] - max_reading = self.calibration["end_pos"][calib_idx] - - b_value = smoothed_vals[i] - print(joint_name) - if joint_name == "elbow_flex": - print("elbow") - try: - smoothed_vals[i] = int( - min_reading - + (max_reading - min_reading) - * np.arcsin((b_value - min_reading) / (max_reading - min_reading)) - / (np.pi / 2) - ) - except Exception: - print("not working") - print(smoothed_vals) - print("not working") - smoothed_vals = self.apply_calibration(smoothed_vals, motor_names) - return smoothed_vals - - def read_kalman_filter( - self, process_noise: float = 1.0, measurement_noise: float = 100.0, motors: list[str] | None = None - ) -> np.ndarray: - """ - Return a Kalman-filtered reading for each requested joint. - - We store a separate Kalman filter (x, P) per joint. For each new measurement Z: - 1) Predict: - x_pred = x (assuming no motion model) - P_pred = P + Q - 2) Update: - K = P_pred / (P_pred + R) - x = x_pred + K * (Z - x_pred) - P = (1 - K) * P_pred - - Args: - process_noise (float, optional): Process noise (Q). Larger Q means the estimate can change more - freely. Defaults to 1.0. - measurement_noise (float, optional): Measurement noise (R). Larger R means we trust our sensor - less. Defaults to 100.0. - motors (list[str] | None, optional): If None, all joints are filtered. Defaults to None. - - Returns: - np.ndarray: Kalman-filtered positions. - """ - if motors is None: - motors = self.joint_names - - current_vals = np.array([self.last_d[name] for name in motors], dtype=np.float32) - filtered_vals = np.zeros_like(current_vals) - - for i, name in enumerate(motors): - # Retrieve the filter state for this joint - x = self.kalman_state[name]["x"] - p = self.kalman_state[name]["P"] - z = current_vals[i] - - # If this is the first reading, initialize - if x is None or p is None: - x = z - p = 1.0 # or some large initial uncertainty - - # 1) Predict step - x_pred = x # no velocity model, so x_pred = x - p_pred = p + process_noise - - # 2) Update step - kalman_gain = p_pred / (p_pred + measurement_noise) # Kalman gain - x_new = x_pred + kalman_gain * (z - x_pred) # new state estimate - p_new = (1 - kalman_gain) * p_pred # new covariance - - # Save back - self.kalman_state[name]["x"] = x_new - self.kalman_state[name]["P"] = p_new - - filtered_vals[i] = x_new - - if self.calibration is not None: - filtered_vals = self.apply_calibration(filtered_vals, motors) - - return filtered_vals - - def async_read(self): - """ - Continuously read from the serial buffer in its own thread, - store into `self.last_d` and also append to the rolling buffer (joint_buffer). - """ - while True: - if self.serial.in_waiting > 0: - self.serial.flush() - vals = self.serial.readline().decode("utf-8").strip() - vals = vals.split(" ") - - if len(vals) != 7: - continue - try: - vals = [int(val) for val in vals] # remove last digit - except ValueError: - self.serial.flush() - vals = self.serial.readline().decode("utf-8").strip() - vals = vals.split(" ") - vals = [int(val) for val in vals] - d = { - "wrist_roll": vals[0], - "wrist_yaw": vals[1], - "wrist_pitch": vals[2], - "elbow_flex": vals[3], - "shoulder_roll": vals[4], - "shoulder_yaw": vals[5], - "shoulder_pitch": vals[6], - } - - # Update the last_d dictionary - self.last_d = d - - # Also push these new values into the rolling buffers - for joint_name, joint_val in d.items(): - self.joint_buffer[joint_name].append(joint_val) - - # Optional: short sleep to avoid busy-loop - # time.sleep(0.001) - - def run_calibration(self, robot): - robot.arm_bus.write("Acceleration", 50) - n_joints = len(self.joint_names) - - max_open_all = np.zeros(n_joints, dtype=np.float32) - min_open_all = np.zeros(n_joints, dtype=np.float32) - max_closed_all = np.zeros(n_joints, dtype=np.float32) - min_closed_all = np.zeros(n_joints, dtype=np.float32) - - for i, jname in enumerate(self.joint_names): - print(f"\n--- Calibrating joint '{jname}' ---") - - joint_idx = robot.arm_calib_dict["motor_names"].index(jname) - open_val = robot.arm_calib_dict["start_pos"][joint_idx] - print(f"Commanding {jname} to OPEN position {open_val}...") - robot.arm_bus.write("Goal_Position", [open_val], [jname]) - - input("Physically verify or adjust the joint. Press Enter when ready to capture...") - - open_pos_list = [] - for _ in range(100): - all_joints_vals = self.read() # read entire arm - open_pos_list.append(all_joints_vals[i]) # store only this joint - time.sleep(0.01) - - # Convert to numpy and track min/max - open_array = np.array(open_pos_list, dtype=np.float32) - max_open_all[i] = open_array.max() - min_open_all[i] = open_array.min() - closed_val = robot.arm_calib_dict["end_pos"][joint_idx] - if jname == "elbow_flex": - closed_val = closed_val - 700 - closed_val = robot.arm_calib_dict["end_pos"][joint_idx] - print(f"Commanding {jname} to CLOSED position {closed_val}...") - robot.arm_bus.write("Goal_Position", [closed_val], [jname]) - - input("Physically verify or adjust the joint. Press Enter when ready to capture...") - - closed_pos_list = [] - for _ in range(100): - all_joints_vals = self.read() - closed_pos_list.append(all_joints_vals[i]) - time.sleep(0.01) - - closed_array = np.array(closed_pos_list, dtype=np.float32) - # Some thresholding for closed positions - # closed_array[closed_array < 1000] = 60000 - - max_closed_all[i] = closed_array.max() - min_closed_all[i] = closed_array.min() - - robot.arm_bus.write("Goal_Position", [int((closed_val + open_val) / 2)], [jname]) - - open_pos = np.maximum(max_open_all, max_closed_all) - closed_pos = np.minimum(min_open_all, min_closed_all) - - for i, jname in enumerate(self.joint_names): - if jname not in ["wrist_pitch", "shoulder_pitch"]: - # Swap open/closed for these joints - tmp_pos = open_pos[i] - open_pos[i] = closed_pos[i] - closed_pos[i] = tmp_pos - - # Debug prints - print("\nFinal open/closed arrays after any swaps/inversions:") - print(f"open_pos={open_pos}") - print(f"closed_pos={closed_pos}") - - homing_offset = [0] * n_joints - drive_mode = [0] * n_joints - calib_modes = [CalibrationMode.LINEAR.name] * n_joints - - calib_dict = { - "homing_offset": homing_offset, - "drive_mode": drive_mode, - "start_pos": open_pos, - "end_pos": closed_pos, - "calib_mode": calib_modes, - "motor_names": self.joint_names, - } - file_path = "examples/hopejr/settings/arm_calib.pkl" - - if not os.path.exists(file_path): - with open(file_path, "wb") as f: - pickle.dump(calib_dict, f) # TODO(aliberts): use json - print(f"Dictionary saved to {file_path}") - - self.set_calibration(calib_dict) - - def set_calibration(self, calibration: dict[str, list]): - self.calibration = calibration - - def apply_calibration(self, values: np.ndarray | list, motor_names: list[str] | None): - """ - Example calibration that linearly maps [start_pos, end_pos] to [0,100]. - Extend or modify for your needs. - """ - if motor_names is None: - motor_names = self.joint_names - - values = values.astype(np.float32) - - for i, name in enumerate(motor_names): - calib_idx = self.calibration["motor_names"].index(name) - calib_mode = self.calibration["calib_mode"][calib_idx] - - if CalibrationMode[calib_mode] == CalibrationMode.LINEAR: - start_pos = self.calibration["start_pos"][calib_idx] - end_pos = self.calibration["end_pos"][calib_idx] - - # Rescale the present position to [0, 100] - values[i] = (values[i] - start_pos) / (end_pos - start_pos) * 100 - - # Check boundaries - if (values[i] < LOWER_BOUND_LINEAR) or (values[i] > UPPER_BOUND_LINEAR): - # If you want to handle out-of-range differently: - # raise JointOutOfRangeError(msg) - msg = ( - f"Wrong motor position range detected for {name}. " - f"Value = {values[i]} %, expected within [{LOWER_BOUND_LINEAR}, {UPPER_BOUND_LINEAR}]" - ) - print(msg) - - return values