All tests passing except test_control_robot.py

This commit is contained in:
Remi Cadene
2024-07-09 22:53:39 +02:00
parent a0432f1608
commit 798373e7bf
14 changed files with 493 additions and 168 deletions

View File

@@ -14,15 +14,21 @@ from lerobot.common.robot_devices.motors.dynamixel import (
TorqueMode,
)
from lerobot.common.robot_devices.motors.utils import MotorsBus
from lerobot.common.robot_devices.utils import RobotDeviceAlreadyConnectedError, RobotDeviceNotConnectedError
URL_HORIZONTAL_POSITION = {
"follower": "https://raw.githubusercontent.com/huggingface/lerobot/main/media/koch/follower_horizontal.png",
"leader": "https://raw.githubusercontent.com/huggingface/lerobot/main/media/koch/leader_horizontal.png",
}
URL_90_DEGREE_POSITION = {
"follower": "https://raw.githubusercontent.com/huggingface/lerobot/main/media/koch/follower_90_degree.png",
"leader": "https://raw.githubusercontent.com/huggingface/lerobot/main/media/koch/leader_90_degree.png",
}
########################################################################
# Calibration logic
########################################################################
# TARGET_HORIZONTAL_POSITION = motor_position_to_angle(np.array([0, -1024, 1024, 0, -1024, 0]))
# TARGET_90_DEGREE_POSITION = motor_position_to_angle(np.array([1024, 0, 0, 1024, 0, -1024]))
# GRIPPER_OPEN = motor_position_to_angle(np.array([-400]))
TARGET_HORIZONTAL_POSITION = np.array([0, -1024, 1024, 0, -1024, 0])
TARGET_90_DEGREE_POSITION = np.array([1024, 0, 0, 1024, 0, -1024])
GRIPPER_OPEN = np.array([-400])
@@ -137,11 +143,16 @@ def reset_arm(arm: MotorsBus):
arm.write("Drive_Mode", DriveMode.NON_INVERTED.value)
def run_arm_calibration(arm: MotorsBus, name: str):
def run_arm_calibration(arm: MotorsBus, name: str, arm_type: str):
""" Example of usage:
```python
run_arm_calibration(arm, "left", "follower")
```
"""
reset_arm(arm)
# TODO(rcadene): document what position 1 mean
print(f"Please move the '{name}' arm to the horizontal position (gripper fully closed)")
print(f"Please move the '{name} {arm_type}' arm to the horizontal position (gripper fully closed, see {URL_HORIZONTAL_POSITION[arm_type]})")
input("Press Enter to continue...")
horizontal_homing_offset = compute_homing_offset(
@@ -149,7 +160,7 @@ def run_arm_calibration(arm: MotorsBus, name: str):
)
# TODO(rcadene): document what position 2 mean
print(f"Please move the '{name}' arm to the 90 degree position (gripper fully open)")
print(f"Please move the '{name} {arm_type}' arm to the 90 degree position (gripper fully open, see {URL_90_DEGREE_POSITION[arm_type]})")
input("Press Enter to continue...")
drive_mode = compute_drive_mode(arm, horizontal_homing_offset)
@@ -184,42 +195,15 @@ class KochRobotConfig:
```
"""
# Define all the components of the robot
# Define all components of the robot
leader_arms: dict[str, MotorsBus] = field(
default_factory=lambda: {
"main": DynamixelMotorsBus(
port="/dev/tty.usbmodem575E0031751",
motors={
# name: (index, model)
"shoulder_pan": (1, "xl330-m077"),
"shoulder_lift": (2, "xl330-m077"),
"elbow_flex": (3, "xl330-m077"),
"wrist_flex": (4, "xl330-m077"),
"wrist_roll": (5, "xl330-m077"),
"gripper": (6, "xl330-m077"),
},
),
}
default_factory=lambda: {}
)
follower_arms: dict[str, MotorsBus] = field(
default_factory=lambda: {
"main": DynamixelMotorsBus(
port="/dev/tty.usbmodem575E0032081",
motors={
# name: (index, model)
"shoulder_pan": (1, "xl430-w250"),
"shoulder_lift": (2, "xl430-w250"),
"elbow_flex": (3, "xl330-m288"),
"wrist_flex": (4, "xl330-m288"),
"wrist_roll": (5, "xl330-m288"),
"gripper": (6, "xl330-m288"),
},
),
}
default_factory=lambda: {}
)
cameras: dict[str, Camera] = field(default_factory=lambda: {})
class KochRobot:
"""Tau Robotics: https://tau-robotics.com
@@ -306,6 +290,11 @@ class KochRobot:
# Orders the robot to move
robot.send_action(action)
```
Example of disconnecting which is not mandatory since we disconnect when the object is deleted:
```python
robot.disconnect()
```
"""
def __init__(
@@ -328,59 +317,68 @@ class KochRobot:
def connect(self):
if self.is_connected:
raise ValueError(f"KochRobot is already connected.")
raise RobotDeviceAlreadyConnectedError(f"KochRobot is already connected. Do not run `robot.connect()` twice.")
if not self.leader_arms and not self.follower_arms and not self.cameras:
raise ValueError("KochRobot doesn't have any device to connect. See example of usage in docstring of the class.")
# Connect the arms
for name in self.follower_arms:
self.follower_arms[name].connect()
self.leader_arms[name].connect()
# Reset the arms and load or run calibration
if self.calibration_path.exists():
# Reset all arms before setting calibration
for name in self.follower_arms:
reset_arm(self.follower_arms[name])
for name in self.leader_arms:
reset_arm(self.leader_arms[name])
with open(self.calibration_path, "rb") as f:
calibration = pickle.load(f)
else:
# Run calibration process which begins by reseting all arms
calibration = self.run_calibration()
self.calibration_path.parent.mkdir(parents=True, exist_ok=True)
with open(self.calibration_path, "wb") as f:
pickle.dump(calibration, f)
# Set calibration
for name in self.follower_arms:
self.follower_arms[name].set_calibration(calibration[f"follower_{name}"])
self.follower_arms[name].write("Torque_Enable", 1)
for name in self.leader_arms:
self.leader_arms[name].set_calibration(calibration[f"leader_{name}"])
# TODO(rcadene): add comments
self.leader_arms[name].write("Goal_Position", GRIPPER_OPEN, "gripper")
self.leader_arms[name].write("Torque_Enable", 1, "gripper")
# Enable torque on all motors of the follower arms
for name in self.follower_arms:
self.follower_arms[name].write("Torque_Enable", 1)
# Enable torque on the gripper of the leader arms, and move it to 45 degrees,
# so that we can use it as a trigger to close the gripper of the follower arms.
for name in self.leader_arms:
self.leader_arms[name].write("Torque_Enable", 1, "gripper")
self.leader_arms[name].write("Goal_Position", GRIPPER_OPEN, "gripper")
# Connect the cameras
for name in self.cameras:
self.cameras[name].connect()
self.is_connected = True
def run_calibration(self):
if not self.is_connected:
raise ValueError(f"KochRobot is not connected. You need to run `robot.connect()`.")
calibration = {}
for name in self.follower_arms:
homing_offset, drive_mode = run_arm_calibration(self.follower_arms[name], f"{name} follower")
homing_offset, drive_mode = run_arm_calibration(self.follower_arms[name], name, "follower")
calibration[f"follower_{name}"] = {}
for idx, motor_name in enumerate(self.follower_arms[name].motor_names):
calibration[f"follower_{name}"][motor_name] = (homing_offset[idx], drive_mode[idx])
for name in self.leader_arms:
homing_offset, drive_mode = run_arm_calibration(self.leader_arms[name], f"{name} leader")
homing_offset, drive_mode = run_arm_calibration(self.leader_arms[name], name, "leader")
calibration[f"leader_{name}"] = {}
for idx, motor_name in enumerate(self.leader_arms[name].motor_names):
@@ -392,7 +390,7 @@ class KochRobot:
self, record_data=False
) -> None | tuple[dict[str, torch.Tensor], dict[str, torch.Tensor]]:
if not self.is_connected:
raise ValueError(f"KochRobot is not connected. You need to run `robot.connect()`.")
raise RobotDeviceNotConnectedError(f"KochRobot is not connected. You need to run `robot.connect()`.")
# Prepare to assign the positions of the leader to the follower
leader_pos = {}
@@ -455,7 +453,7 @@ class KochRobot:
def capture_observation(self):
if not self.is_connected:
raise ValueError(f"KochRobot is not connected. You need to run `robot.connect()`.")
raise RobotDeviceNotConnectedError(f"KochRobot is not connected. You need to run `robot.connect()`.")
# Read follower position
follower_pos = {}
@@ -481,9 +479,9 @@ class KochRobot:
obs_dict[f"observation.images.{name}"] = torch.from_numpy(images[name])
return obs_dict
def send_action(self, action):
def send_action(self, action: torch.Tensor):
if not self.is_connected:
raise ValueError(f"KochRobot is not connected. You need to run `robot.connect()`.")
raise RobotDeviceNotConnectedError(f"KochRobot is not connected. You need to run `robot.connect()`.")
from_idx = 0
to_idx = 0
@@ -496,3 +494,22 @@ class KochRobot:
for name in self.follower_arms:
self.follower_arms[name].write("Goal_Position", follower_goal_pos[name].astype(np.int32))
def disconnect(self):
if not self.is_connected:
raise RobotDeviceNotConnectedError(f"KochRobot is not connected. You need to run `robot.connect()` before disconnecting.")
for name in self.follower_arms:
self.follower_arms[name].disconnect()
for name in self.leader_arms:
self.leader_arms[name].disconnect()
for name in self.cameras:
self.cameras[name].disconnect()
self.is_connected = False
def __del__(self):
if getattr(self, "is_connected", False):
self.disconnect()