(WIP) Update arm
This commit is contained in:
@@ -15,71 +15,53 @@
|
|||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
import os
|
|
||||||
import pickle # nosec
|
|
||||||
import threading
|
import threading
|
||||||
import time
|
import time
|
||||||
from collections import deque
|
from pprint import pformat
|
||||||
from enum import Enum
|
|
||||||
|
|
||||||
import numpy as np
|
|
||||||
import serial
|
import serial
|
||||||
|
|
||||||
from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
|
from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
|
||||||
|
from lerobot.common.motors.motors_bus import MotorCalibration, MotorNormMode
|
||||||
|
from lerobot.common.utils.utils import enter_pressed, move_cursor_up
|
||||||
|
|
||||||
from ..teleoperator import Teleoperator
|
from ..teleoperator import Teleoperator
|
||||||
from .config_homonculus import HomonculusArmConfig
|
from .config_homonculus import HomonculusArmConfig
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
LOWER_BOUND_LINEAR = -100
|
|
||||||
UPPER_BOUND_LINEAR = 200
|
|
||||||
|
|
||||||
|
|
||||||
class CalibrationMode(Enum):
|
|
||||||
# Joints with rotational motions are expressed in degrees in nominal range of [-180, 180]
|
|
||||||
DEGREE = 0
|
|
||||||
# Joints with linear motions (like gripper of Aloha) are expressed in nominal range of [0, 100]
|
|
||||||
LINEAR = 1
|
|
||||||
|
|
||||||
|
|
||||||
class HomonculusArm(Teleoperator):
|
class HomonculusArm(Teleoperator):
|
||||||
|
"""
|
||||||
|
Homonculus Arm designed by Hugging Face.
|
||||||
|
"""
|
||||||
|
|
||||||
config_class = HomonculusArmConfig
|
config_class = HomonculusArmConfig
|
||||||
name = "homonculus_arm"
|
name = "homonculus_arm"
|
||||||
|
|
||||||
def __init__(self, config: HomonculusArmConfig):
|
def __init__(self, config: HomonculusArmConfig):
|
||||||
|
super().__init__(config)
|
||||||
self.config = config
|
self.config = config
|
||||||
self.serial = serial.Serial(config.port, config.baud_rate, timeout=1)
|
self.serial = serial.Serial(config.port, config.baud_rate, timeout=1)
|
||||||
self.buffer_size = config.buffer_size
|
|
||||||
|
|
||||||
self.joints = [
|
self.joints = {
|
||||||
"wrist_roll",
|
"wrist_roll": MotorNormMode.RANGE_M100_100,
|
||||||
"wrist_pitch",
|
"wrist_pitch": MotorNormMode.RANGE_M100_100,
|
||||||
"wrist_yaw",
|
"wrist_yaw": MotorNormMode.RANGE_M100_100,
|
||||||
"elbow_flex",
|
"elbow_flex": MotorNormMode.RANGE_M100_100,
|
||||||
"shoulder_roll",
|
"shoulder_roll": MotorNormMode.RANGE_M100_100,
|
||||||
"shoulder_yaw",
|
"shoulder_yaw": MotorNormMode.RANGE_M100_100,
|
||||||
"shoulder_pitch",
|
"shoulder_pitch": MotorNormMode.RANGE_M100_100,
|
||||||
]
|
}
|
||||||
# Initialize a buffer (deque) for each joint
|
self.thread = threading.Thread(target=self._async_read, daemon=True, name=f"{self} _async_read")
|
||||||
self.joints_buffer = {joint: deque(maxlen=self.buffer_size) for joint in self.joints}
|
self._lock = threading.Lock()
|
||||||
|
|
||||||
# Last read dictionary
|
|
||||||
self.last_d = dict.fromkeys(self.joints, 100)
|
|
||||||
|
|
||||||
# For adaptive EMA, we store a "previous smoothed" state per joint
|
|
||||||
self.adaptive_ema_state = dict.fromkeys(self.joints)
|
|
||||||
self.kalman_state = {joint: {"x": None, "P": None} for joint in self.joints}
|
|
||||||
|
|
||||||
self.calibration = None
|
|
||||||
self.thread = threading.Thread(target=self.async_read, daemon=True)
|
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def action_feature(self) -> dict:
|
def action_features(self) -> dict:
|
||||||
return {f"{joint}.pos": float for joint in self.joints}
|
return {f"{joint}.pos": float for joint in self.joints}
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def feedback_feature(self) -> dict:
|
def feedback_features(self) -> dict:
|
||||||
return {}
|
return {}
|
||||||
|
|
||||||
@property
|
@property
|
||||||
@@ -90,23 +72,152 @@ class HomonculusArm(Teleoperator):
|
|||||||
if self.is_connected:
|
if self.is_connected:
|
||||||
raise DeviceAlreadyConnectedError(f"{self} already connected")
|
raise DeviceAlreadyConnectedError(f"{self} already connected")
|
||||||
|
|
||||||
self.serial.open()
|
if not self.serial.is_open:
|
||||||
|
self.serial.open()
|
||||||
self.thread.start()
|
self.thread.start()
|
||||||
self.configure()
|
time.sleep(1) # gives time for the thread to ramp up
|
||||||
logger.info(f"{self} connected.")
|
logger.info(f"{self} connected.")
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def is_calibrated(self) -> bool:
|
def is_calibrated(self) -> bool:
|
||||||
raise NotImplementedError # TODO
|
return self.calibration_fpath.is_file()
|
||||||
|
|
||||||
def calibrate(self) -> None:
|
def calibrate(self) -> None:
|
||||||
raise NotImplementedError # TODO
|
print(
|
||||||
|
"\nMove all joints through their entire range of motion."
|
||||||
|
"\nRecording positions. Press ENTER to stop..."
|
||||||
|
)
|
||||||
|
range_mins, range_maxes = self._record_ranges_of_motion()
|
||||||
|
|
||||||
|
self.calibration = {}
|
||||||
|
for id_, joint in enumerate(self.joints):
|
||||||
|
self.calibration[joint] = MotorCalibration(
|
||||||
|
id=id_,
|
||||||
|
drive_mode=0,
|
||||||
|
homing_offset=0,
|
||||||
|
range_min=range_mins[joint],
|
||||||
|
range_max=range_maxes[joint],
|
||||||
|
)
|
||||||
|
|
||||||
|
self._save_calibration()
|
||||||
|
print("Calibration saved to", self.calibration_fpath)
|
||||||
|
|
||||||
|
def _record_ranges_of_motion(
|
||||||
|
self, joints: list[str] | None = None, display_values: bool = True
|
||||||
|
) -> tuple[dict[str, int], dict[str, int]]:
|
||||||
|
"""Interactively record the min/max encoder values of each joint.
|
||||||
|
|
||||||
|
Move the joints while the method streams live positions. Press :kbd:`Enter` to finish.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
joints (list[str] | None, optional): Joints to record. Defaults to every joint (`None`).
|
||||||
|
display_values (bool, optional): When `True` (default) a live table is printed to the console.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
TypeError: `joints` is not `None` or a list.
|
||||||
|
ValueError: any joint's recorded min and max are the same.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
tuple[dict[str, int], dict[str, int]]: Two dictionaries *mins* and *maxes* with the extreme values
|
||||||
|
observed for each joint.
|
||||||
|
"""
|
||||||
|
if joints is None:
|
||||||
|
joints = list(self.joints)
|
||||||
|
elif not isinstance(joints, list):
|
||||||
|
raise TypeError(joints)
|
||||||
|
|
||||||
|
start_positions = self._read(joints, normalize=False)
|
||||||
|
mins = start_positions.copy()
|
||||||
|
maxes = start_positions.copy()
|
||||||
|
while True:
|
||||||
|
positions = self._read(joints, normalize=False)
|
||||||
|
mins = {joint: min(positions[joint], min_) for joint, min_ in mins.items()}
|
||||||
|
maxes = {joint: max(positions[joint], max_) for joint, max_ in maxes.items()}
|
||||||
|
|
||||||
|
if display_values:
|
||||||
|
print("\n-------------------------------------------")
|
||||||
|
print(f"{'NAME':<15} | {'MIN':>6} | {'POS':>6} | {'MAX':>6}")
|
||||||
|
for joint in joints:
|
||||||
|
print(f"{joint:<15} | {mins[joint]:>6} | {positions[joint]:>6} | {maxes[joint]:>6}")
|
||||||
|
|
||||||
|
if enter_pressed():
|
||||||
|
break
|
||||||
|
|
||||||
|
if display_values:
|
||||||
|
# Move cursor up to overwrite the previous output
|
||||||
|
move_cursor_up(len(joints) + 3)
|
||||||
|
|
||||||
|
same_min_max = [joint for joint in joints if mins[joint] == maxes[joint]]
|
||||||
|
if same_min_max:
|
||||||
|
raise ValueError(f"Some joints have the same min and max values:\n{pformat(same_min_max)}")
|
||||||
|
|
||||||
|
return mins, maxes
|
||||||
|
|
||||||
def configure(self) -> None:
|
def configure(self) -> None:
|
||||||
raise NotImplementedError # TODO
|
pass
|
||||||
|
|
||||||
|
def _normalize(self, values: dict[str, int], joints: list[str] | None = None):
|
||||||
|
if not self.calibration:
|
||||||
|
raise RuntimeError(f"{self} has no calibration registered.")
|
||||||
|
|
||||||
|
normalized_values = {}
|
||||||
|
for joint, val in values.items():
|
||||||
|
min_ = self.calibration[joint].range_min
|
||||||
|
max_ = self.calibration[joint].range_max
|
||||||
|
drive_mode = self.calibration[joint].drive_mode
|
||||||
|
bounded_val = min(max_, max(min_, val))
|
||||||
|
|
||||||
|
if self.joints[joint] is MotorNormMode.RANGE_M100_100:
|
||||||
|
norm = (((bounded_val - min_) / (max_ - min_)) * 200) - 100
|
||||||
|
normalized_values[joint] = -norm if drive_mode else norm
|
||||||
|
elif self.joints[joint] is MotorNormMode.RANGE_0_100:
|
||||||
|
norm = ((bounded_val - min_) / (max_ - min_)) * 100
|
||||||
|
normalized_values[joint] = 100 - norm if drive_mode else norm
|
||||||
|
|
||||||
|
return normalized_values
|
||||||
|
|
||||||
|
def _read(self, joints: list[str] | None = None, normalize: bool = True) -> dict[str, int | float]:
|
||||||
|
"""
|
||||||
|
Return the most recent (single) values from self.last_d,
|
||||||
|
optionally applying calibration.
|
||||||
|
"""
|
||||||
|
with self._lock:
|
||||||
|
state = self._state
|
||||||
|
|
||||||
|
if joints is not None:
|
||||||
|
state = {k: v for k, v in state.items() if k in joints}
|
||||||
|
|
||||||
|
if normalize:
|
||||||
|
state = self._normalize(state, joints)
|
||||||
|
|
||||||
|
return state
|
||||||
|
|
||||||
|
def _async_read(self):
|
||||||
|
"""
|
||||||
|
Continuously read from the serial buffer in its own thread and sends values to the main thread through
|
||||||
|
a queue.
|
||||||
|
"""
|
||||||
|
while True:
|
||||||
|
if self.serial.in_waiting > 0:
|
||||||
|
self.serial.flush()
|
||||||
|
raw_values = self.serial.readline().decode("utf-8").strip().split(" ")
|
||||||
|
if len(raw_values) != 21: # 16 raw + 5 angle values
|
||||||
|
continue
|
||||||
|
|
||||||
|
try:
|
||||||
|
joint_angles = {
|
||||||
|
joint: int(pos)
|
||||||
|
for joint, pos in zip(self.joints, raw_values[:2] + raw_values[16:], strict=True)
|
||||||
|
}
|
||||||
|
except ValueError:
|
||||||
|
continue
|
||||||
|
|
||||||
|
with self._lock:
|
||||||
|
self._state = joint_angles
|
||||||
|
|
||||||
def get_action(self) -> dict[str, float]:
|
def get_action(self) -> dict[str, float]:
|
||||||
raise NotImplementedError # TODO
|
joint_positions = self._read()
|
||||||
|
return {f"{joint}.pos": pos for joint, pos in joint_positions.items()}
|
||||||
|
|
||||||
def send_feedback(self, feedback: dict[str, float]) -> None:
|
def send_feedback(self, feedback: dict[str, float]) -> None:
|
||||||
raise NotImplementedError
|
raise NotImplementedError
|
||||||
@@ -115,307 +226,6 @@ class HomonculusArm(Teleoperator):
|
|||||||
if not self.is_connected:
|
if not self.is_connected:
|
||||||
DeviceNotConnectedError(f"{self} is not connected.")
|
DeviceNotConnectedError(f"{self} is not connected.")
|
||||||
|
|
||||||
self.thread.join()
|
self.thread.join(timeout=0.5)
|
||||||
self.serial.close()
|
self.serial.close()
|
||||||
logger.info(f"{self} disconnected.")
|
logger.info(f"{self} disconnected.")
|
||||||
|
|
||||||
### WIP below ###
|
|
||||||
|
|
||||||
@property
|
|
||||||
def joint_names(self):
|
|
||||||
return list(self.last_d.keys())
|
|
||||||
|
|
||||||
def read(self, motor_names: list[str] | None = None):
|
|
||||||
"""
|
|
||||||
Return the most recent (single) values from self.last_d,
|
|
||||||
optionally applying calibration.
|
|
||||||
"""
|
|
||||||
if motor_names is None:
|
|
||||||
motor_names = self.joint_names
|
|
||||||
|
|
||||||
# Get raw (last) values
|
|
||||||
values = np.array([self.last_d[k] for k in motor_names])
|
|
||||||
|
|
||||||
# print(motor_names)
|
|
||||||
print(values)
|
|
||||||
|
|
||||||
# Apply calibration if available
|
|
||||||
if self.calibration is not None:
|
|
||||||
values = self.apply_calibration(values, motor_names)
|
|
||||||
print(values)
|
|
||||||
return values
|
|
||||||
|
|
||||||
def read_running_average(self, motor_names: list[str] | None = None, linearize=False):
|
|
||||||
"""
|
|
||||||
Return the AVERAGE of the most recent self.buffer_size (or fewer, if not enough data) readings
|
|
||||||
for each joint, optionally applying calibration.
|
|
||||||
"""
|
|
||||||
if motor_names is None:
|
|
||||||
motor_names = self.joint_names
|
|
||||||
|
|
||||||
# Gather averaged readings from buffers
|
|
||||||
smoothed_vals = []
|
|
||||||
for name in motor_names:
|
|
||||||
buf = self.joint_buffer[name]
|
|
||||||
if len(buf) == 0:
|
|
||||||
# If no data has been read yet, fall back to last_d
|
|
||||||
smoothed_vals.append(self.last_d[name])
|
|
||||||
else:
|
|
||||||
# Otherwise, average over the existing buffer
|
|
||||||
smoothed_vals.append(np.mean(buf))
|
|
||||||
|
|
||||||
smoothed_vals = np.array(smoothed_vals, dtype=np.float32)
|
|
||||||
|
|
||||||
# Apply calibration if available
|
|
||||||
if self.calibration is not None:
|
|
||||||
if False:
|
|
||||||
for i, joint_name in enumerate(motor_names):
|
|
||||||
# Re-use the same raw_min / raw_max from the calibration
|
|
||||||
calib_idx = self.calibration["motor_names"].index(joint_name)
|
|
||||||
min_reading = self.calibration["start_pos"][calib_idx]
|
|
||||||
max_reading = self.calibration["end_pos"][calib_idx]
|
|
||||||
|
|
||||||
b_value = smoothed_vals[i]
|
|
||||||
print(joint_name)
|
|
||||||
if joint_name == "elbow_flex":
|
|
||||||
print("elbow")
|
|
||||||
try:
|
|
||||||
smoothed_vals[i] = int(
|
|
||||||
min_reading
|
|
||||||
+ (max_reading - min_reading)
|
|
||||||
* np.arcsin((b_value - min_reading) / (max_reading - min_reading))
|
|
||||||
/ (np.pi / 2)
|
|
||||||
)
|
|
||||||
except Exception:
|
|
||||||
print("not working")
|
|
||||||
print(smoothed_vals)
|
|
||||||
print("not working")
|
|
||||||
smoothed_vals = self.apply_calibration(smoothed_vals, motor_names)
|
|
||||||
return smoothed_vals
|
|
||||||
|
|
||||||
def read_kalman_filter(
|
|
||||||
self, process_noise: float = 1.0, measurement_noise: float = 100.0, motors: list[str] | None = None
|
|
||||||
) -> np.ndarray:
|
|
||||||
"""
|
|
||||||
Return a Kalman-filtered reading for each requested joint.
|
|
||||||
|
|
||||||
We store a separate Kalman filter (x, P) per joint. For each new measurement Z:
|
|
||||||
1) Predict:
|
|
||||||
x_pred = x (assuming no motion model)
|
|
||||||
P_pred = P + Q
|
|
||||||
2) Update:
|
|
||||||
K = P_pred / (P_pred + R)
|
|
||||||
x = x_pred + K * (Z - x_pred)
|
|
||||||
P = (1 - K) * P_pred
|
|
||||||
|
|
||||||
Args:
|
|
||||||
process_noise (float, optional): Process noise (Q). Larger Q means the estimate can change more
|
|
||||||
freely. Defaults to 1.0.
|
|
||||||
measurement_noise (float, optional): Measurement noise (R). Larger R means we trust our sensor
|
|
||||||
less. Defaults to 100.0.
|
|
||||||
motors (list[str] | None, optional): If None, all joints are filtered. Defaults to None.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
np.ndarray: Kalman-filtered positions.
|
|
||||||
"""
|
|
||||||
if motors is None:
|
|
||||||
motors = self.joint_names
|
|
||||||
|
|
||||||
current_vals = np.array([self.last_d[name] for name in motors], dtype=np.float32)
|
|
||||||
filtered_vals = np.zeros_like(current_vals)
|
|
||||||
|
|
||||||
for i, name in enumerate(motors):
|
|
||||||
# Retrieve the filter state for this joint
|
|
||||||
x = self.kalman_state[name]["x"]
|
|
||||||
p = self.kalman_state[name]["P"]
|
|
||||||
z = current_vals[i]
|
|
||||||
|
|
||||||
# If this is the first reading, initialize
|
|
||||||
if x is None or p is None:
|
|
||||||
x = z
|
|
||||||
p = 1.0 # or some large initial uncertainty
|
|
||||||
|
|
||||||
# 1) Predict step
|
|
||||||
x_pred = x # no velocity model, so x_pred = x
|
|
||||||
p_pred = p + process_noise
|
|
||||||
|
|
||||||
# 2) Update step
|
|
||||||
kalman_gain = p_pred / (p_pred + measurement_noise) # Kalman gain
|
|
||||||
x_new = x_pred + kalman_gain * (z - x_pred) # new state estimate
|
|
||||||
p_new = (1 - kalman_gain) * p_pred # new covariance
|
|
||||||
|
|
||||||
# Save back
|
|
||||||
self.kalman_state[name]["x"] = x_new
|
|
||||||
self.kalman_state[name]["P"] = p_new
|
|
||||||
|
|
||||||
filtered_vals[i] = x_new
|
|
||||||
|
|
||||||
if self.calibration is not None:
|
|
||||||
filtered_vals = self.apply_calibration(filtered_vals, motors)
|
|
||||||
|
|
||||||
return filtered_vals
|
|
||||||
|
|
||||||
def async_read(self):
|
|
||||||
"""
|
|
||||||
Continuously read from the serial buffer in its own thread,
|
|
||||||
store into `self.last_d` and also append to the rolling buffer (joint_buffer).
|
|
||||||
"""
|
|
||||||
while True:
|
|
||||||
if self.serial.in_waiting > 0:
|
|
||||||
self.serial.flush()
|
|
||||||
vals = self.serial.readline().decode("utf-8").strip()
|
|
||||||
vals = vals.split(" ")
|
|
||||||
|
|
||||||
if len(vals) != 7:
|
|
||||||
continue
|
|
||||||
try:
|
|
||||||
vals = [int(val) for val in vals] # remove last digit
|
|
||||||
except ValueError:
|
|
||||||
self.serial.flush()
|
|
||||||
vals = self.serial.readline().decode("utf-8").strip()
|
|
||||||
vals = vals.split(" ")
|
|
||||||
vals = [int(val) for val in vals]
|
|
||||||
d = {
|
|
||||||
"wrist_roll": vals[0],
|
|
||||||
"wrist_yaw": vals[1],
|
|
||||||
"wrist_pitch": vals[2],
|
|
||||||
"elbow_flex": vals[3],
|
|
||||||
"shoulder_roll": vals[4],
|
|
||||||
"shoulder_yaw": vals[5],
|
|
||||||
"shoulder_pitch": vals[6],
|
|
||||||
}
|
|
||||||
|
|
||||||
# Update the last_d dictionary
|
|
||||||
self.last_d = d
|
|
||||||
|
|
||||||
# Also push these new values into the rolling buffers
|
|
||||||
for joint_name, joint_val in d.items():
|
|
||||||
self.joint_buffer[joint_name].append(joint_val)
|
|
||||||
|
|
||||||
# Optional: short sleep to avoid busy-loop
|
|
||||||
# time.sleep(0.001)
|
|
||||||
|
|
||||||
def run_calibration(self, robot):
|
|
||||||
robot.arm_bus.write("Acceleration", 50)
|
|
||||||
n_joints = len(self.joint_names)
|
|
||||||
|
|
||||||
max_open_all = np.zeros(n_joints, dtype=np.float32)
|
|
||||||
min_open_all = np.zeros(n_joints, dtype=np.float32)
|
|
||||||
max_closed_all = np.zeros(n_joints, dtype=np.float32)
|
|
||||||
min_closed_all = np.zeros(n_joints, dtype=np.float32)
|
|
||||||
|
|
||||||
for i, jname in enumerate(self.joint_names):
|
|
||||||
print(f"\n--- Calibrating joint '{jname}' ---")
|
|
||||||
|
|
||||||
joint_idx = robot.arm_calib_dict["motor_names"].index(jname)
|
|
||||||
open_val = robot.arm_calib_dict["start_pos"][joint_idx]
|
|
||||||
print(f"Commanding {jname} to OPEN position {open_val}...")
|
|
||||||
robot.arm_bus.write("Goal_Position", [open_val], [jname])
|
|
||||||
|
|
||||||
input("Physically verify or adjust the joint. Press Enter when ready to capture...")
|
|
||||||
|
|
||||||
open_pos_list = []
|
|
||||||
for _ in range(100):
|
|
||||||
all_joints_vals = self.read() # read entire arm
|
|
||||||
open_pos_list.append(all_joints_vals[i]) # store only this joint
|
|
||||||
time.sleep(0.01)
|
|
||||||
|
|
||||||
# Convert to numpy and track min/max
|
|
||||||
open_array = np.array(open_pos_list, dtype=np.float32)
|
|
||||||
max_open_all[i] = open_array.max()
|
|
||||||
min_open_all[i] = open_array.min()
|
|
||||||
closed_val = robot.arm_calib_dict["end_pos"][joint_idx]
|
|
||||||
if jname == "elbow_flex":
|
|
||||||
closed_val = closed_val - 700
|
|
||||||
closed_val = robot.arm_calib_dict["end_pos"][joint_idx]
|
|
||||||
print(f"Commanding {jname} to CLOSED position {closed_val}...")
|
|
||||||
robot.arm_bus.write("Goal_Position", [closed_val], [jname])
|
|
||||||
|
|
||||||
input("Physically verify or adjust the joint. Press Enter when ready to capture...")
|
|
||||||
|
|
||||||
closed_pos_list = []
|
|
||||||
for _ in range(100):
|
|
||||||
all_joints_vals = self.read()
|
|
||||||
closed_pos_list.append(all_joints_vals[i])
|
|
||||||
time.sleep(0.01)
|
|
||||||
|
|
||||||
closed_array = np.array(closed_pos_list, dtype=np.float32)
|
|
||||||
# Some thresholding for closed positions
|
|
||||||
# closed_array[closed_array < 1000] = 60000
|
|
||||||
|
|
||||||
max_closed_all[i] = closed_array.max()
|
|
||||||
min_closed_all[i] = closed_array.min()
|
|
||||||
|
|
||||||
robot.arm_bus.write("Goal_Position", [int((closed_val + open_val) / 2)], [jname])
|
|
||||||
|
|
||||||
open_pos = np.maximum(max_open_all, max_closed_all)
|
|
||||||
closed_pos = np.minimum(min_open_all, min_closed_all)
|
|
||||||
|
|
||||||
for i, jname in enumerate(self.joint_names):
|
|
||||||
if jname not in ["wrist_pitch", "shoulder_pitch"]:
|
|
||||||
# Swap open/closed for these joints
|
|
||||||
tmp_pos = open_pos[i]
|
|
||||||
open_pos[i] = closed_pos[i]
|
|
||||||
closed_pos[i] = tmp_pos
|
|
||||||
|
|
||||||
# Debug prints
|
|
||||||
print("\nFinal open/closed arrays after any swaps/inversions:")
|
|
||||||
print(f"open_pos={open_pos}")
|
|
||||||
print(f"closed_pos={closed_pos}")
|
|
||||||
|
|
||||||
homing_offset = [0] * n_joints
|
|
||||||
drive_mode = [0] * n_joints
|
|
||||||
calib_modes = [CalibrationMode.LINEAR.name] * n_joints
|
|
||||||
|
|
||||||
calib_dict = {
|
|
||||||
"homing_offset": homing_offset,
|
|
||||||
"drive_mode": drive_mode,
|
|
||||||
"start_pos": open_pos,
|
|
||||||
"end_pos": closed_pos,
|
|
||||||
"calib_mode": calib_modes,
|
|
||||||
"motor_names": self.joint_names,
|
|
||||||
}
|
|
||||||
file_path = "examples/hopejr/settings/arm_calib.pkl"
|
|
||||||
|
|
||||||
if not os.path.exists(file_path):
|
|
||||||
with open(file_path, "wb") as f:
|
|
||||||
pickle.dump(calib_dict, f) # TODO(aliberts): use json
|
|
||||||
print(f"Dictionary saved to {file_path}")
|
|
||||||
|
|
||||||
self.set_calibration(calib_dict)
|
|
||||||
|
|
||||||
def set_calibration(self, calibration: dict[str, list]):
|
|
||||||
self.calibration = calibration
|
|
||||||
|
|
||||||
def apply_calibration(self, values: np.ndarray | list, motor_names: list[str] | None):
|
|
||||||
"""
|
|
||||||
Example calibration that linearly maps [start_pos, end_pos] to [0,100].
|
|
||||||
Extend or modify for your needs.
|
|
||||||
"""
|
|
||||||
if motor_names is None:
|
|
||||||
motor_names = self.joint_names
|
|
||||||
|
|
||||||
values = values.astype(np.float32)
|
|
||||||
|
|
||||||
for i, name in enumerate(motor_names):
|
|
||||||
calib_idx = self.calibration["motor_names"].index(name)
|
|
||||||
calib_mode = self.calibration["calib_mode"][calib_idx]
|
|
||||||
|
|
||||||
if CalibrationMode[calib_mode] == CalibrationMode.LINEAR:
|
|
||||||
start_pos = self.calibration["start_pos"][calib_idx]
|
|
||||||
end_pos = self.calibration["end_pos"][calib_idx]
|
|
||||||
|
|
||||||
# Rescale the present position to [0, 100]
|
|
||||||
values[i] = (values[i] - start_pos) / (end_pos - start_pos) * 100
|
|
||||||
|
|
||||||
# Check boundaries
|
|
||||||
if (values[i] < LOWER_BOUND_LINEAR) or (values[i] > UPPER_BOUND_LINEAR):
|
|
||||||
# If you want to handle out-of-range differently:
|
|
||||||
# raise JointOutOfRangeError(msg)
|
|
||||||
msg = (
|
|
||||||
f"Wrong motor position range detected for {name}. "
|
|
||||||
f"Value = {values[i]} %, expected within [{LOWER_BOUND_LINEAR}, {UPPER_BOUND_LINEAR}]"
|
|
||||||
)
|
|
||||||
print(msg)
|
|
||||||
|
|
||||||
return values
|
|
||||||
|
|||||||
Reference in New Issue
Block a user