Rename record replay; Remove run_policy; Add koch.yaml; Move to degree
This commit is contained in:
@@ -98,6 +98,15 @@ MODEL_CONTROL_TABLE = {
|
||||
"xm540-w270": X_SERIES_CONTROL_TABLE,
|
||||
}
|
||||
|
||||
MODEL_RESOLUTION = {
|
||||
"x_series": 4096,
|
||||
"xl330-m077": 4096,
|
||||
"xl330-m288": 4096,
|
||||
"xl430-w250": 4096,
|
||||
"xm430-w350": 4096,
|
||||
"xm540-w270": 4096,
|
||||
}
|
||||
|
||||
NUM_READ_RETRY = 10
|
||||
|
||||
|
||||
@@ -233,6 +242,7 @@ class DynamixelMotorsBus:
|
||||
port: str,
|
||||
motors: dict[str, tuple[int, str]],
|
||||
extra_model_control_table: dict[str, list[tuple]] | None = None,
|
||||
extra_model_resolution: dict[str, int] | None = None,
|
||||
):
|
||||
self.port = port
|
||||
self.motors = motors
|
||||
@@ -241,6 +251,10 @@ class DynamixelMotorsBus:
|
||||
if extra_model_control_table:
|
||||
self.model_ctrl_table.update(extra_model_control_table)
|
||||
|
||||
self.model_resolution = deepcopy(MODEL_RESOLUTION)
|
||||
if extra_model_resolution:
|
||||
self.model_resolution.update(extra_model_resolution)
|
||||
|
||||
self.port_handler = None
|
||||
self.packet_handler = None
|
||||
self.calibration = None
|
||||
@@ -281,36 +295,80 @@ class DynamixelMotorsBus:
|
||||
self.calibration = calibration
|
||||
|
||||
def apply_calibration(self, values: np.ndarray | list, motor_names: list[str] | None):
|
||||
if not self.calibration:
|
||||
return values
|
||||
"""Convert from unsigned int32 joint position range [0, 2**32[ to the universal float32 centered degree range [-180.0, 180.0[
|
||||
|
||||
Joints values are original in [0, 2**32[ (unsigned int32). Each motor are expected to complete a full rotation
|
||||
when given a goal position that is + or - their resolution. For instance, dynamixel xl330-m077 have a resolution of 4096, and
|
||||
at any position in their original range, let's say the position 56734, they complete a full rotation clockwise by moving to 60830,
|
||||
or anticlockwise by moving to 42638. The position in the original range is arbitrary and might change a lot between each motor.
|
||||
To harmonize between motors of the same model, different robots, or even models of different brands, we propose to work
|
||||
in the centered degree range [-180, 180[. This function first applies the pre-computed calibration to convert
|
||||
from [0, 2**32[ to [-2048, 2048[, then divide by 2048.
|
||||
"""
|
||||
if motor_names is None:
|
||||
motor_names = self.motor_names
|
||||
|
||||
# Convert from unsigned int32 original range [0, 2**32[ to centered signed int32 range [-2**31, 2**31[
|
||||
values = values.astype(np.int32)
|
||||
|
||||
for i, name in enumerate(motor_names):
|
||||
homing_offset, drive_mode = self.calibration[name]
|
||||
|
||||
if values[i] is not None:
|
||||
if drive_mode:
|
||||
values[i] *= -1
|
||||
values[i] += homing_offset
|
||||
# Update direction of rotation of the motor to match between leader and follower. In fact, the motor of the leader for a given joint
|
||||
# can be assembled in an opposite direction in term of rotation than the motor of the follower on the same joint.
|
||||
if drive_mode:
|
||||
values[i] *= -1
|
||||
|
||||
# Convert from range [-2**31, 2**31[ to centered resolution range [-resolution, resolution[ (e.g. [-2048, 2048[)
|
||||
values[i] += homing_offset
|
||||
|
||||
# Convert from range [-resolution, resolution[ to the universal float32 centered degree range [-180, 180[
|
||||
values = values.astype(np.float32)
|
||||
for i, name in enumerate(motor_names):
|
||||
_, model = self.motors[name]
|
||||
resolution = self.model_resolution[model]
|
||||
values[i] = values[i] / (resolution // 2) * 180
|
||||
|
||||
if (values < -180).any() or (values >= 180).any():
|
||||
raise ValueError(
|
||||
f"At least one of the motor has a joint value outside of its centered degree range of [-180, 180[."
|
||||
'This "jump of range" can be caused by a hardware issue, or you might have unexpectedly completed a full rotation of the motor '
|
||||
"during manipulation or transportation of your robot. Try to recalibrate all motors by setting a different "
|
||||
"`calibration_path` during the instatiation of your robot. "
|
||||
f"The values and motors: {values} {motor_names}"
|
||||
)
|
||||
|
||||
return values
|
||||
|
||||
def revert_calibration(self, values: np.ndarray | list, motor_names: list[str] | None):
|
||||
if not self.calibration:
|
||||
return values
|
||||
|
||||
if motor_names is None:
|
||||
motor_names = self.motor_names
|
||||
|
||||
if (values < -180).any() or (values >= 180).any():
|
||||
raise ValueError(
|
||||
f"At least one of the motor has a joint value outside of its centered degree range of [-180, 180[. "
|
||||
f"The values and motors: {values} {motor_names}"
|
||||
)
|
||||
|
||||
# Convert from the universal float32 centered degree range [-180, 180[ to centered resolution range [-resolution, resolution[
|
||||
for i, name in enumerate(motor_names):
|
||||
_, model = self.motors[name]
|
||||
resolution = self.model_resolution[model]
|
||||
|
||||
values[i] = values[i] / 180 * (resolution // 2)
|
||||
|
||||
values = np.round(values).astype(np.int32)
|
||||
|
||||
# Convert from range [-resolution, resolution[ to centered signed int32 range [-2**31, 2**31[
|
||||
for i, name in enumerate(motor_names):
|
||||
homing_offset, drive_mode = self.calibration[name]
|
||||
values[i] -= homing_offset
|
||||
|
||||
if values[i] is not None:
|
||||
values[i] -= homing_offset
|
||||
if drive_mode:
|
||||
values[i] *= -1
|
||||
# Update direction of rotation of the motor that was matching between leader and follower to their original direction.
|
||||
# In fact, the motor of the leader for a given joint can be assembled in an opposite direction in term of rotation
|
||||
# than the motor of the follower on the same joint.
|
||||
if drive_mode:
|
||||
values[i] *= -1
|
||||
|
||||
return values
|
||||
|
||||
@@ -367,7 +425,7 @@ class DynamixelMotorsBus:
|
||||
if data_name in CONVERT_UINT32_TO_INT32_REQUIRED:
|
||||
values = values.astype(np.int32)
|
||||
|
||||
if data_name in CALIBRATION_REQUIRED:
|
||||
if data_name in CALIBRATION_REQUIRED and self.calibration is not None:
|
||||
values = self.apply_calibration(values, motor_names)
|
||||
|
||||
# log the number of seconds it took to read the data from the motors
|
||||
@@ -406,7 +464,7 @@ class DynamixelMotorsBus:
|
||||
motor_ids.append(motor_idx)
|
||||
models.append(model)
|
||||
|
||||
if data_name in CALIBRATION_REQUIRED:
|
||||
if data_name in CALIBRATION_REQUIRED and self.calibration is not None:
|
||||
values = self.revert_calibration(values, motor_names)
|
||||
|
||||
values = values.tolist()
|
||||
|
||||
@@ -1,46 +1,7 @@
|
||||
def make_robot(name):
|
||||
if name == "koch":
|
||||
# TODO(rcadene): Add configurable robot from command line and yaml config
|
||||
# TODO(rcadene): Add example with and without cameras
|
||||
from lerobot.common.robot_devices.cameras.opencv import OpenCVCamera
|
||||
from lerobot.common.robot_devices.motors.dynamixel import DynamixelMotorsBus
|
||||
from lerobot.common.robot_devices.robots.koch import KochRobot
|
||||
import hydra
|
||||
from omegaconf import DictConfig
|
||||
|
||||
robot = KochRobot(
|
||||
leader_arms={
|
||||
"main": DynamixelMotorsBus(
|
||||
port="/dev/tty.usbmodem575E0031751",
|
||||
motors={
|
||||
# name: (index, model)
|
||||
"shoulder_pan": (1, "xl330-m077"),
|
||||
"shoulder_lift": (2, "xl330-m077"),
|
||||
"elbow_flex": (3, "xl330-m077"),
|
||||
"wrist_flex": (4, "xl330-m077"),
|
||||
"wrist_roll": (5, "xl330-m077"),
|
||||
"gripper": (6, "xl330-m077"),
|
||||
},
|
||||
),
|
||||
},
|
||||
follower_arms={
|
||||
"main": DynamixelMotorsBus(
|
||||
port="/dev/tty.usbmodem575E0032081",
|
||||
motors={
|
||||
# name: (index, model)
|
||||
"shoulder_pan": (1, "xl430-w250"),
|
||||
"shoulder_lift": (2, "xl430-w250"),
|
||||
"elbow_flex": (3, "xl330-m288"),
|
||||
"wrist_flex": (4, "xl330-m288"),
|
||||
"wrist_roll": (5, "xl330-m288"),
|
||||
"gripper": (6, "xl330-m288"),
|
||||
},
|
||||
),
|
||||
},
|
||||
cameras={
|
||||
"laptop": OpenCVCamera(0, fps=30, width=640, height=480),
|
||||
"phone": OpenCVCamera(1, fps=30, width=640, height=480),
|
||||
},
|
||||
)
|
||||
else:
|
||||
raise ValueError(f"Robot '{name}' not found.")
|
||||
|
||||
def make_robot(cfg: DictConfig):
|
||||
robot = hydra.utils.instantiate(cfg)
|
||||
return robot
|
||||
|
||||
@@ -29,9 +29,12 @@ URL_90_DEGREE_POSITION = {
|
||||
# Calibration logic
|
||||
########################################################################
|
||||
|
||||
# In range ]-2048, 2048[
|
||||
TARGET_HORIZONTAL_POSITION = np.array([0, -1024, 1024, 0, -1024, 0])
|
||||
TARGET_90_DEGREE_POSITION = np.array([1024, 0, 0, 1024, 0, -1024])
|
||||
GRIPPER_OPEN = np.array([-400])
|
||||
|
||||
# In range ]-180, 180[
|
||||
GRIPPER_OPEN = np.array([-35.156])
|
||||
|
||||
|
||||
def apply_homing_offset(values: np.array, homing_offset: np.array) -> np.array:
|
||||
@@ -500,11 +503,7 @@ class KochRobot:
|
||||
obs_dict = {}
|
||||
obs_dict["observation.state"] = torch.from_numpy(state)
|
||||
for name in self.cameras:
|
||||
# Convert to pytorch format: channel first and float32 in [0,1]
|
||||
img = torch.from_numpy(images[name])
|
||||
img = img.type(torch.float32) / 255
|
||||
img = img.permute(2, 0, 1).contiguous()
|
||||
obs_dict[f"observation.images.{name}"] = img
|
||||
obs_dict[f"observation.images.{name}"] = torch.from_numpy(images[name])
|
||||
return obs_dict
|
||||
|
||||
def send_action(self, action: torch.Tensor):
|
||||
|
||||
Reference in New Issue
Block a user