Improve control robot ; Add process to configure motor indices (#326)

Co-authored-by: Simon Alibert <alibert.sim@gmail.com>
Co-authored-by: jess-moss <jess.moss@dextrousrobotics.com>
Co-authored-by: Marina Barannikov <marina.barannikov@huggingface.co>
Co-authored-by: Alexander Soare <alexander.soare159@gmail.com>
This commit is contained in:
Remi
2024-08-15 18:11:33 +02:00
committed by GitHub
parent 8c4643687c
commit bbe9057225
35 changed files with 2085 additions and 476 deletions

View File

@@ -1,46 +1,7 @@
def make_robot(name):
if name == "koch":
# TODO(rcadene): Add configurable robot from command line and yaml config
# TODO(rcadene): Add example with and without cameras
from lerobot.common.robot_devices.cameras.opencv import OpenCVCamera
from lerobot.common.robot_devices.motors.dynamixel import DynamixelMotorsBus
from lerobot.common.robot_devices.robots.koch import KochRobot
import hydra
from omegaconf import DictConfig
robot = KochRobot(
leader_arms={
"main": DynamixelMotorsBus(
port="/dev/tty.usbmodem575E0031751",
motors={
# name: (index, model)
"shoulder_pan": (1, "xl330-m077"),
"shoulder_lift": (2, "xl330-m077"),
"elbow_flex": (3, "xl330-m077"),
"wrist_flex": (4, "xl330-m077"),
"wrist_roll": (5, "xl330-m077"),
"gripper": (6, "xl330-m077"),
},
),
},
follower_arms={
"main": DynamixelMotorsBus(
port="/dev/tty.usbmodem575E0032081",
motors={
# name: (index, model)
"shoulder_pan": (1, "xl430-w250"),
"shoulder_lift": (2, "xl430-w250"),
"elbow_flex": (3, "xl330-m288"),
"wrist_flex": (4, "xl330-m288"),
"wrist_roll": (5, "xl330-m288"),
"gripper": (6, "xl330-m288"),
},
),
},
cameras={
"laptop": OpenCVCamera(0, fps=30, width=640, height=480),
"phone": OpenCVCamera(1, fps=30, width=640, height=480),
},
)
else:
raise ValueError(f"Robot '{name}' not found.")
def make_robot(cfg: DictConfig):
robot = hydra.utils.instantiate(cfg)
return robot

View File

@@ -8,122 +8,43 @@ import torch
from lerobot.common.robot_devices.cameras.utils import Camera
from lerobot.common.robot_devices.motors.dynamixel import (
DriveMode,
DynamixelMotorsBus,
OperatingMode,
TorqueMode,
convert_degrees_to_steps,
)
from lerobot.common.robot_devices.motors.utils import MotorsBus
from lerobot.common.robot_devices.utils import RobotDeviceAlreadyConnectedError, RobotDeviceNotConnectedError
URL_HORIZONTAL_POSITION = {
"follower": "https://raw.githubusercontent.com/huggingface/lerobot/main/media/koch/follower_horizontal.png",
"leader": "https://raw.githubusercontent.com/huggingface/lerobot/main/media/koch/leader_horizontal.png",
}
URL_90_DEGREE_POSITION = {
"follower": "https://raw.githubusercontent.com/huggingface/lerobot/main/media/koch/follower_90_degree.png",
"leader": "https://raw.githubusercontent.com/huggingface/lerobot/main/media/koch/leader_90_degree.png",
}
########################################################################
# Calibration logic
########################################################################
TARGET_HORIZONTAL_POSITION = np.array([0, -1024, 1024, 0, -1024, 0])
TARGET_90_DEGREE_POSITION = np.array([1024, 0, 0, 1024, 0, -1024])
GRIPPER_OPEN = np.array([-400])
URL_TEMPLATE = (
"https://raw.githubusercontent.com/huggingface/lerobot/main/media/{robot}/{arm}_{position}.webp"
)
# In nominal degree range ]-180, +180[
ZERO_POSITION_DEGREE = 0
ROTATED_POSITION_DEGREE = 90
GRIPPER_OPEN_DEGREE = 35.156
def apply_homing_offset(values: np.array, homing_offset: np.array) -> np.array:
for i in range(len(values)):
if values[i] is not None:
values[i] += homing_offset[i]
return values
def assert_drive_mode(drive_mode):
# `drive_mode` is in [0,1] with 0 means original rotation direction for the motor, and 1 means inverted.
if not np.all(np.isin(drive_mode, [0, 1])):
raise ValueError(f"`drive_mode` contains values other than 0 or 1: ({drive_mode})")
def apply_drive_mode(values: np.array, drive_mode: np.array) -> np.array:
for i in range(len(values)):
if values[i] is not None and drive_mode[i]:
values[i] = -values[i]
return values
def apply_drive_mode(position, drive_mode):
assert_drive_mode(drive_mode)
# Convert `drive_mode` from [0, 1] with 0 indicates original rotation direction and 1 inverted,
# to [-1, 1] with 1 indicates original rotation direction and -1 inverted.
signed_drive_mode = -(drive_mode * 2 - 1)
position *= signed_drive_mode
return position
def apply_calibration(values: np.array, homing_offset: np.array, drive_mode: np.array) -> np.array:
values = apply_drive_mode(values, drive_mode)
values = apply_homing_offset(values, homing_offset)
return values
def revert_calibration(values: np.array, homing_offset: np.array, drive_mode: np.array) -> np.array:
"""
Transform working position into real position for the robot.
"""
values = apply_homing_offset(
values,
np.array([-homing_offset if homing_offset is not None else None for homing_offset in homing_offset]),
)
values = apply_drive_mode(values, drive_mode)
return values
def revert_appropriate_positions(positions: np.array, drive_mode: list[bool]) -> np.array:
for i, revert in enumerate(drive_mode):
if not revert and positions[i] is not None:
positions[i] = -positions[i]
return positions
def compute_corrections(positions: np.array, drive_mode: list[bool], target_position: np.array) -> np.array:
correction = revert_appropriate_positions(positions, drive_mode)
for i in range(len(positions)):
if correction[i] is not None:
if drive_mode[i]:
correction[i] -= target_position[i]
else:
correction[i] += target_position[i]
return correction
def compute_nearest_rounded_positions(positions: np.array) -> np.array:
return np.array(
[
round(positions[i] / 1024) * 1024 if positions[i] is not None else None
for i in range(len(positions))
]
)
def compute_homing_offset(
arm: DynamixelMotorsBus, drive_mode: list[bool], target_position: np.array
) -> np.array:
# Get the present positions of the servos
present_positions = apply_calibration(
arm.read("Present_Position"), np.array([0, 0, 0, 0, 0, 0]), drive_mode
)
nearest_positions = compute_nearest_rounded_positions(present_positions)
correction = compute_corrections(nearest_positions, drive_mode, target_position)
return correction
def compute_drive_mode(arm: DynamixelMotorsBus, offset: np.array):
# Get current positions
present_positions = apply_calibration(
arm.read("Present_Position"), offset, np.array([False, False, False, False, False, False])
)
nearest_positions = compute_nearest_rounded_positions(present_positions)
# construct 'drive_mode' list comparing nearest_positions and TARGET_90_DEGREE_POSITION
drive_mode = []
for i in range(len(nearest_positions)):
drive_mode.append(nearest_positions[i] != TARGET_90_DEGREE_POSITION[i])
return drive_mode
def reset_arm(arm: MotorsBus):
def reset_torque_mode(arm: MotorsBus):
# To be configured, all servos must be in "torque disable" mode
arm.write("Torque_Enable", TorqueMode.DISABLED.value)
@@ -132,55 +53,95 @@ def reset_arm(arm: MotorsBus):
# you could end up with a servo with a position 0 or 4095 at a crucial point See [
# https://emanual.robotis.com/docs/en/dxl/x/x_series/#operating-mode11]
all_motors_except_gripper = [name for name in arm.motor_names if name != "gripper"]
arm.write("Operating_Mode", OperatingMode.EXTENDED_POSITION.value, all_motors_except_gripper)
if len(all_motors_except_gripper) > 0:
arm.write("Operating_Mode", OperatingMode.EXTENDED_POSITION.value, all_motors_except_gripper)
# TODO(rcadene): why?
# Use 'position control current based' for gripper
# Use 'position control current based' for gripper to be limited by the limit of the current.
# For the follower gripper, it means it can grasp an object without forcing too much even tho,
# it's goal position is a complete grasp (both gripper fingers are ordered to join and reach a touch).
# For the leader gripper, it means we can use it as a physical trigger, since we can force with our finger
# to make it move, and it will move back to its original target position when we release the force.
arm.write("Operating_Mode", OperatingMode.CURRENT_CONTROLLED_POSITION.value, "gripper")
# Make sure the native calibration (homing offset abd drive mode) is disabled, since we use our own calibration layer to be more generic
arm.write("Homing_Offset", 0)
arm.write("Drive_Mode", DriveMode.NON_INVERTED.value)
def run_arm_calibration(arm: MotorsBus, name: str, arm_type: str):
"""Example of usage:
"""This function ensures that a neural network trained on data collected on a given robot
can work on another robot. For instance before calibration, setting a same goal position
for each motor of two different robots will get two very different positions. But after calibration,
the two robots will move to the same position.To this end, this function computes the homing offset
and the drive mode for each motor of a given robot.
Homing offset is used to shift the motor position to a ]-2048, +2048[ nominal range (when the motor uses 2048 steps
to complete a half a turn). This range is set around an arbitrary "zero position" corresponding to all motor positions
being 0. During the calibration process, you will need to manually move the robot to this "zero position".
Drive mode is used to invert the rotation direction of the motor. This is useful when some motors have been assembled
in the opposite orientation for some robots. During the calibration process, you will need to manually move the robot
to the "rotated position".
After calibration, the homing offsets and drive modes are stored in a cache.
Example of usage:
```python
run_arm_calibration(arm, "left", "follower")
```
"""
reset_arm(arm)
reset_torque_mode(arm)
# TODO(rcadene): document what position 1 mean
print(
f"Please move the '{name} {arm_type}' arm to the horizontal position (gripper fully closed, see {URL_HORIZONTAL_POSITION[arm_type]})"
)
print(f"\nRunning calibration of {name} {arm_type}...")
print("\nMove arm to zero position")
print("See: " + URL_TEMPLATE.format(robot="koch", arm=arm_type, position="zero"))
input("Press Enter to continue...")
horizontal_homing_offset = compute_homing_offset(
arm, [False, False, False, False, False, False], TARGET_HORIZONTAL_POSITION
)
# We arbitrarely choosed our zero target position to be a straight horizontal position with gripper upwards and closed.
# It is easy to identify and all motors are in a "quarter turn" position. Once calibration is done, this position will
# corresponds to every motor angle being 0. If you set all 0 as Goal Position, the arm will move in this position.
zero_position = convert_degrees_to_steps(ZERO_POSITION_DEGREE, arm.motor_models)
# TODO(rcadene): document what position 2 mean
print(
f"Please move the '{name} {arm_type}' arm to the 90 degree position (gripper fully open, see {URL_90_DEGREE_POSITION[arm_type]})"
)
def _compute_nearest_rounded_position(position, models):
# TODO(rcadene): Rework this function since some motors cant physically rotate a quarter turn
# (e.g. the gripper of Aloha arms can only rotate ~50 degree)
quarter_turn_degree = 90
quarter_turn = convert_degrees_to_steps(quarter_turn_degree, models)
nearest_pos = np.round(position.astype(float) / quarter_turn) * quarter_turn
return nearest_pos.astype(position.dtype)
# Compute homing offset so that `present_position + homing_offset ~= target_position`.
position = arm.read("Present_Position")
position = _compute_nearest_rounded_position(position, arm.motor_models)
homing_offset = zero_position - position
print("\nMove arm to rotated target position")
print("See: " + URL_TEMPLATE.format(robot="koch", arm=arm_type, position="rotated"))
input("Press Enter to continue...")
drive_mode = compute_drive_mode(arm, horizontal_homing_offset)
homing_offset = compute_homing_offset(arm, drive_mode, TARGET_90_DEGREE_POSITION)
# The rotated target position corresponds to a rotation of a quarter turn from the zero position.
# This allows to identify the rotation direction of each motor.
# For instance, if the motor rotates 90 degree, and its value is -90 after applying the homing offset, then we know its rotation direction
# is inverted. However, for the calibration being successful, we need everyone to follow the same target position.
# Sometimes, there is only one possible rotation direction. For instance, if the gripper is closed, there is only one direction which
# corresponds to opening the gripper. When the rotation direction is ambiguous, we arbitrarely rotate clockwise from the point of view
# of the previous motor in the kinetic chain.
rotated_position = convert_degrees_to_steps(ROTATED_POSITION_DEGREE, arm.motor_models)
# Invert offset for all drive_mode servos
for i in range(len(drive_mode)):
if drive_mode[i]:
homing_offset[i] = -homing_offset[i]
# Find drive mode by rotating each motor by a quarter of a turn.
# Drive mode indicates if the motor rotation direction should be inverted (=1) or not (=0).
position = arm.read("Present_Position")
position += homing_offset
position = _compute_nearest_rounded_position(position, arm.motor_models)
drive_mode = (position != rotated_position).astype(np.int32)
print("Calibration is done!")
# Re-compute homing offset to take into account drive mode
position = arm.read("Present_Position")
position = apply_drive_mode(position, drive_mode)
position = _compute_nearest_rounded_position(position, arm.motor_models)
homing_offset = rotated_position - position
print("=====================================")
print(" HOMING_OFFSET: ", " ".join([str(i) for i in homing_offset]))
print(" DRIVE_MODE: ", " ".join([str(i) for i in drive_mode]))
print("=====================================")
print("\nMove arm to rest position")
print("See: " + URL_TEMPLATE.format(robot="koch", arm=arm_type, position="rest"))
input("Press Enter to continue...")
print()
return homing_offset, drive_mode
@@ -207,7 +168,12 @@ class KochRobotConfig:
class KochRobot:
# TODO(rcadene): Implement force feedback
"""Tau Robotics: https://tau-robotics.com
"""This class allows to control any Koch robot of various number of motors.
A few versions are available:
- [Koch v1.0](https://github.com/AlexanderKoch-Koch/low_cost_robot), with and without the wrist-to-elbow expansion, which was developed
by Alexander Koch from [Tau Robotics](https://tau-robotics.com): [Github for sourcing and assembly](
- [Koch v1.1])https://github.com/jess-moss/koch-v1-1), which was developed by Jess Moss.
Example of highest frequency teleoperation without camera:
```python
@@ -261,12 +227,12 @@ class KochRobot:
Example of highest frequency data collection with cameras:
```python
# Defines how to communicate with 2 cameras connected to the computer.
# Here, the webcam of the mackbookpro and the iphone (connected in USB to the macbookpro)
# Here, the webcam of the laptop and the phone (connected in USB to the laptop)
# can be reached respectively using the camera indices 0 and 1. These indices can be
# arbitrary. See the documentation of `OpenCVCamera` to find your own camera indices.
cameras = {
"macbookpro": OpenCVCamera(camera_index=0, fps=30, width=640, height=480),
"iphone": OpenCVCamera(camera_index=1, fps=30, width=640, height=480),
"laptop": OpenCVCamera(camera_index=0, fps=30, width=640, height=480),
"phone": OpenCVCamera(camera_index=1, fps=30, width=640, height=480),
}
# Assumes leader and follower arms have been instantiated already (see first example)
@@ -330,23 +296,27 @@ class KochRobot:
# Connect the arms
for name in self.follower_arms:
print(f"Connecting {name} follower arm.")
self.follower_arms[name].connect()
print(f"Connecting {name} leader arm.")
self.leader_arms[name].connect()
# Reset the arms and load or run calibration
if self.calibration_path.exists():
# Reset all arms before setting calibration
for name in self.follower_arms:
reset_arm(self.follower_arms[name])
reset_torque_mode(self.follower_arms[name])
for name in self.leader_arms:
reset_arm(self.leader_arms[name])
reset_torque_mode(self.leader_arms[name])
with open(self.calibration_path, "rb") as f:
calibration = pickle.load(f)
else:
print(f"Missing calibration file '{self.calibration_path}'. Starting calibration precedure.")
# Run calibration process which begins by reseting all arms
calibration = self.run_calibration()
print(f"Calibration is done! Saving calibration file '{self.calibration_path}'")
self.calibration_path.parent.mkdir(parents=True, exist_ok=True)
with open(self.calibration_path, "wb") as f:
pickle.dump(calibration, f)
@@ -366,13 +336,14 @@ class KochRobot:
# Enable torque on all motors of the follower arms
for name in self.follower_arms:
print(f"Activating torque on {name} follower arm.")
self.follower_arms[name].write("Torque_Enable", 1)
# Enable torque on the gripper of the leader arms, and move it to 45 degrees,
# so that we can use it as a trigger to close the gripper of the follower arms.
for name in self.leader_arms:
self.leader_arms[name].write("Torque_Enable", 1, "gripper")
self.leader_arms[name].write("Goal_Position", GRIPPER_OPEN, "gripper")
self.leader_arms[name].write("Goal_Position", GRIPPER_OPEN_DEGREE, "gripper")
# Connect the cameras
for name in self.cameras:
@@ -407,12 +378,12 @@ class KochRobot:
"KochRobot is not connected. You need to run `robot.connect()`."
)
# Prepare to assign the positions of the leader to the follower
# Prepare to assign the position of the leader to the follower
leader_pos = {}
for name in self.leader_arms:
now = time.perf_counter()
before_lread_t = time.perf_counter()
leader_pos[name] = self.leader_arms[name].read("Present_Position")
self.logs[f"read_leader_{name}_pos_dt_s"] = time.perf_counter() - now
self.logs[f"read_leader_{name}_pos_dt_s"] = time.perf_counter() - before_lread_t
follower_goal_pos = {}
for name in self.leader_arms:
@@ -420,9 +391,9 @@ class KochRobot:
# Send action
for name in self.follower_arms:
now = time.perf_counter()
before_fwrite_t = time.perf_counter()
self.follower_arms[name].write("Goal_Position", follower_goal_pos[name])
self.logs[f"write_follower_{name}_goal_pos_dt_s"] = time.perf_counter() - now
self.logs[f"write_follower_{name}_goal_pos_dt_s"] = time.perf_counter() - before_fwrite_t
# Early exit when recording data is not requested
if not record_data:
@@ -432,9 +403,9 @@ class KochRobot:
# Read follower position
follower_pos = {}
for name in self.follower_arms:
now = time.perf_counter()
before_fread_t = time.perf_counter()
follower_pos[name] = self.follower_arms[name].read("Present_Position")
self.logs[f"read_follower_{name}_pos_dt_s"] = time.perf_counter() - now
self.logs[f"read_follower_{name}_pos_dt_s"] = time.perf_counter() - before_fread_t
# Create state by concatenating follower current position
state = []
@@ -453,10 +424,10 @@ class KochRobot:
# Capture images from cameras
images = {}
for name in self.cameras:
now = time.perf_counter()
before_camread_t = time.perf_counter()
images[name] = self.cameras[name].async_read()
self.logs[f"read_camera_{name}_dt_s"] = self.cameras[name].logs["delta_timestamp_s"]
self.logs[f"async_read_camera_{name}_dt_s"] = time.perf_counter() - now
self.logs[f"async_read_camera_{name}_dt_s"] = time.perf_counter() - before_camread_t
# Populate output dictionnaries and format to pytorch
obs_dict, action_dict = {}, {}
@@ -477,9 +448,9 @@ class KochRobot:
# Read follower position
follower_pos = {}
for name in self.follower_arms:
now = time.perf_counter()
before_fread_t = time.perf_counter()
follower_pos[name] = self.follower_arms[name].read("Present_Position")
self.logs[f"read_follower_{name}_pos_dt_s"] = time.perf_counter() - now
self.logs[f"read_follower_{name}_pos_dt_s"] = time.perf_counter() - before_fread_t
# Create state by concatenating follower current position
state = []
@@ -491,20 +462,16 @@ class KochRobot:
# Capture images from cameras
images = {}
for name in self.cameras:
now = time.perf_counter()
before_camread_t = time.perf_counter()
images[name] = self.cameras[name].async_read()
self.logs[f"read_camera_{name}_dt_s"] = self.cameras[name].logs["delta_timestamp_s"]
self.logs[f"async_read_camera_{name}_dt_s"] = time.perf_counter() - now
self.logs[f"async_read_camera_{name}_dt_s"] = time.perf_counter() - before_camread_t
# Populate output dictionnaries and format to pytorch
obs_dict = {}
obs_dict["observation.state"] = torch.from_numpy(state)
for name in self.cameras:
# Convert to pytorch format: channel first and float32 in [0,1]
img = torch.from_numpy(images[name])
img = img.type(torch.float32) / 255
img = img.permute(2, 0, 1).contiguous()
obs_dict[f"observation.images.{name}"] = img
obs_dict[f"observation.images.{name}"] = torch.from_numpy(images[name])
return obs_dict
def send_action(self, action: torch.Tensor):