Add robot_devices and control_robot script

This commit is contained in:
Remi Cadene
2024-07-02 21:09:00 +02:00
parent e410e5d711
commit 858d49fc04
11 changed files with 1588 additions and 23 deletions

View File

@@ -0,0 +1,249 @@
import argparse
from dataclasses import dataclass, replace
from pathlib import Path
from threading import Thread
import time
import cv2
import numpy as np
from lerobot.common.robot_devices.cameras.utils import save_color_image
def find_camera_indices(raise_when_empty=False, max_index_search_range=60):
camera_ids = []
for camera_idx in range(max_index_search_range):
camera = cv2.VideoCapture(camera_idx)
is_open = camera.isOpened()
camera.release()
if is_open:
print(f"Camera found at index {camera_idx}")
camera_ids.append(camera_idx)
if raise_when_empty and len(camera_ids) == 0:
raise OSError("Not a single camera was detected. Try re-plugging, or re-installing `opencv2`, or your camera driver, or make sure your camera is compatible with opencv2.")
return camera_ids
def benchmark_cameras(cameras, out_dir=None, save_images=False, num_warmup_frames=4):
if out_dir:
out_dir = Path(out_dir)
out_dir.mkdir(parents=True, exist_ok=True)
for _ in range(num_warmup_frames):
for camera in cameras:
try:
camera.capture_image()
time.sleep(0.01)
except OSError as e:
print(e)
while True:
now = time.time()
for camera in cameras:
color_image = camera.capture_image("bgr" if save_images else "rgb")
if save_images:
image_path = out_dir / f"camera_{camera.camera_index:02}.png"
print(f"Write to {image_path}")
save_color_image(color_image, image_path, write_shape=True)
dt_s = (time.time() - now)
dt_ms = dt_s * 1000
freq = 1 / dt_s
print(f"Latency (ms): {dt_ms:.2f}\tFrequency: {freq:.2f}")
if save_images:
break
if cv2.waitKey(1) & 0xFF == ord("q"):
break
@dataclass
class OpenCVCameraConfig:
"""
Example of tested options for Intel Real Sense D405:
```python
OpenCVCameraConfig(30, 640, 480)
OpenCVCameraConfig(60, 640, 480)
OpenCVCameraConfig(90, 640, 480)
OpenCVCameraConfig(30, 1280, 720)
```
"""
fps: int | None = None
width: int | None = None
height: int | None = None
color: str = "rgb"
class OpenCVCamera():
# TODO(rcadene): improve dosctring
"""
https://docs.opencv.org/4.x/d0/da7/videoio_overview.html
https://docs.opencv.org/4.x/d4/d15/group__videoio__flags__base.html#ga023786be1ee68a9105bf2e48c700294d
Example of uage:
```python
camera = OpenCVCamera(2)
color_image = camera.capture_image()
```
"""
def __init__(self, camera_index: int, config: OpenCVCameraConfig | None = None, **kwargs):
if config is None:
config = OpenCVCameraConfig()
# Overwrite config arguments using kwargs
config = replace(config, **kwargs)
self.camera_index = camera_index
self.fps = config.fps
self.width = config.width
self.height = config.height
self.color = config.color
if self.color not in ["rgb", "bgr"]:
raise ValueError(f"Expected color values are 'rgb' or 'bgr', but {self.color} is provided.")
if self.camera_index is None:
raise ValueError(f"`camera_index` is expected to be one of these available cameras {OpenCVCamera.AVAILABLE_CAMERAS_INDICES}, but {camera_index} is provided instead.")
self.camera = None
self.is_connected = False
self.t = Thread(target=self.capture_image_loop, args=())
self.t.daemon = True
self._color_image = None
def connect(self):
if self.is_connected:
raise ValueError(f"Camera {self.camera_index} is already connected.")
# First create a temporary camera trying to access `camera_index`,
# and verify it is a valid camera by calling `isOpened`.
tmp_camera = cv2.VideoCapture(self.camera_index)
is_camera_open = tmp_camera.isOpened()
# Release camera to make it accessible for `find_camera_indices`
del tmp_camera
# If the camera doesn't work, display the camera indices corresponding to
# valid cameras.
if not is_camera_open:
# Verify that the provided `camera_index` is valid before printing the traceback
if self.camera_index not in find_camera_indices():
raise ValueError(f"`camera_index` is expected to be one of these available cameras {OpenCVCamera.AVAILABLE_CAMERAS_INDICES}, but {self.camera_index} is provided instead.")
raise OSError(f"Can't access camera {self.camera_index}.")
# Secondly, create the camera that will be used downstream.
# Note: For some unknown reason, calling `isOpened` blocks the camera which then
# needs to be re-created.
self.camera = cv2.VideoCapture(self.camera_index)
if self.fps:
self.camera.set(cv2.CAP_PROP_FPS, self.fps)
if self.width:
self.camera.set(cv2.CAP_PROP_FRAME_WIDTH, self.width)
if self.height:
self.camera.set(cv2.CAP_PROP_FRAME_HEIGHT, self.height)
actual_fps = self.camera.get(cv2.CAP_PROP_FPS)
actual_width = self.camera.get(cv2.CAP_PROP_FRAME_WIDTH)
actual_height = self.camera.get(cv2.CAP_PROP_FRAME_HEIGHT)
if self.fps and self.fps != actual_fps:
raise OSError(f"Can't set {self.fps=} for camera {self.camera_index}. Actual value is {actual_fps}.")
if self.width and self.width != actual_width:
raise OSError(f"Can't set {self.width=} for camera {self.camera_index}. Actual value is {actual_width}.")
if self.height and self.height != actual_height:
raise OSError(f"Can't set {self.height=} for camera {self.camera_index}. Actual value is {actual_height}.")
self.is_connected = True
self.t.start()
def capture_image(self, temporary_color: str | None = None) -> np.ndarray:
if not self.is_connected:
self.connect()
ret, color_image = self.camera.read()
if not ret:
raise OSError(f"Can't capture color image from camera {self.camera_index}.")
if temporary_color is None:
requested_color = self.color
else:
requested_color = temporary_color
if requested_color not in ["rgb", "bgr"]:
raise ValueError(f"Expected color values are 'rgb' or 'bgr', but {requested_color} is provided.")
# OpenCV uses BGR format as default (blue, green red) for all operations, including displaying images.
# However, Deep Learning framework such as LeRobot uses RGB format as default to train neural networks,
# so we convert the image color from BGR to RGB.
if requested_color == "rgb":
color_image = cv2.cvtColor(color_image, cv2.COLOR_BGR2RGB)
return color_image
def capture_image_loop(self):
while True:
self._color_image = self.capture_image()
def read(self):
while self._color_image is None:
time.sleep(0.1)
return self._color_image
def disconnect(self):
if getattr(self, "camera", None):
self.camera.release()
def __del__(self):
self.disconnect()
def save_images_config(config: OpenCVCameraConfig, out_dir: Path):
cameras = []
print(f"Available camera indices: {OpenCVCamera.AVAILABLE_CAMERAS_INDICES}")
for camera_idx in OpenCVCamera.AVAILABLE_CAMERAS_INDICES:
camera = OpenCVCamera(camera_idx, config)
cameras.append(camera)
out_dir = out_dir.parent / f"{out_dir.name}_{config.width}x{config.height}_{config.fps}"
benchmark_cameras(cameras, out_dir, save_images=True)
def benchmark_config(config: OpenCVCameraConfig, camera_ids: list[int]):
cameras = [OpenCVCamera(idx, config) for idx in camera_ids]
benchmark_cameras(cameras)
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--mode", type=str, choices=["save_images", 'benchmark'], default="save_images")
parser.add_argument("--camera-ids", type=int, nargs="*", default=[16, 4, 22, 10])
parser.add_argument("--fps", type=int, default=30)
parser.add_argument("--width", type=str, default=640)
parser.add_argument("--height", type=str, default=480)
parser.add_argument("--out-dir", type=Path, default="outputs/benchmark_cameras/opencv/2024_06_22_1727")
args = parser.parse_args()
config = OpenCVCameraConfig(args.fps, args.width, args.height)
# config = OpenCVCameraConfig()
# config = OpenCVCameraConfig(60, 640, 480)
# config = OpenCVCameraConfig(90, 640, 480)
# config = OpenCVCameraConfig(30, 1280, 720)
if args.mode == "save_images":
save_images_config(config, args.out_dir)
elif args.mode == "benchmark":
benchmark_config(config, args.camera_ids)
else:
raise ValueError(args.mode)

View File

@@ -0,0 +1,48 @@
from pathlib import Path
import time
import cv2
from typing import Protocol
import numpy as np
def write_shape_on_image_inplace(image):
height, width = image.shape[:2]
text = f'Width: {width} Height: {height}'
# Define the font, scale, color, and thickness
font = cv2.FONT_HERSHEY_SIMPLEX
font_scale = 1
color = (255, 0, 0) # Blue in BGR
thickness = 2
position = (10, height - 10) # 10 pixels from the bottom-left corner
cv2.putText(image, text, position, font, font_scale, color, thickness)
def save_color_image(image, path, write_shape=False):
path = Path(path)
path.parent.mkdir(parents=True, exist_ok=True)
if write_shape:
write_shape_on_image_inplace(image)
cv2.imwrite(str(path), image)
def save_depth_image(depth, path, write_shape=False):
path = Path(path)
path.parent.mkdir(parents=True, exist_ok=True)
# Apply colormap on depth image (image must be converted to 8-bit per pixel first)
depth_image = cv2.applyColorMap(cv2.convertScaleAbs(depth, alpha=0.03), cv2.COLORMAP_JET)
if write_shape:
write_shape_on_image_inplace(depth_image)
cv2.imwrite(str(path), depth_image)
# Defines a camera type
class Camera(Protocol):
def connect(self): ...
def read(self, temporary_color: str | None = None) -> np.ndarray: ...
def disconnect(self): ...

View File

@@ -0,0 +1,405 @@
from copy import deepcopy
import enum
from typing import Union
import numpy as np
from dynamixel_sdk import PacketHandler, PortHandler, COMM_SUCCESS, GroupSyncRead, GroupSyncWrite
from dynamixel_sdk import DXL_HIBYTE, DXL_HIWORD, DXL_LOBYTE, DXL_LOWORD
PROTOCOL_VERSION = 2.0
BAUD_RATE = 1_000_000
TIMEOUT_MS = 1000
# https://emanual.robotis.com/docs/en/dxl/x/xl330-m077
# https://emanual.robotis.com/docs/en/dxl/x/xl330-m288
# https://emanual.robotis.com/docs/en/dxl/x/xl430-w250
# https://emanual.robotis.com/docs/en/dxl/x/xm430-w350
# https://emanual.robotis.com/docs/en/dxl/x/xm540-w270
# data_name: (address, size_byte)
X_SERIES_CONTROL_TABLE = {
"Model_Number": (0, 2),
"Model_Information": (2, 4),
"Firmware_Version": (6, 1),
"ID": (7, 1),
"Baud_Rate": (8, 1),
"Return_Delay_Time": (9, 1),
"Drive_Mode": (10, 1),
"Operating_Mode": (11, 1),
"Secondary_ID": (12, 1),
"Protocol_Type": (13, 1),
"Homing_Offset": (20, 4),
"Moving_Threshold": (24, 4),
"Temperature_Limit": (31, 1),
"Max_Voltage_Limit": (32, 2),
"Min_Voltage_Limit": (34, 2),
"PWM_Limit": (36, 2),
"Current_Limit": (38, 2),
"Acceleration_Limit": (40, 4),
"Velocity_Limit": (44, 4),
"Max_Position_Limit": (48, 4),
"Min_Position_Limit": (52, 4),
"Shutdown": (63, 1),
"Torque_Enable": (64, 1),
"LED": (65, 1),
"Status_Return_Level": (68, 1),
"Registered_Instruction": (69, 1),
"Hardware_Error_Status": (70, 1),
"Velocity_I_Gain": (76, 2),
"Velocity_P_Gain": (78, 2),
"Position_D_Gain": (80, 2),
"Position_I_Gain": (82, 2),
"Position_P_Gain": (84, 2),
"Feedforward_2nd_Gain": (88, 2),
"Feedforward_1st_Gain": (90, 2),
"Bus_Watchdog": (98, 1),
"Goal_PWM": (100, 2),
"Goal_Current": (102, 2),
"Goal_Velocity": (104, 4),
"Profile_Acceleration": (108, 4),
"Profile_Velocity": (112, 4),
"Goal_Position": (116, 4),
"Realtime_Tick": (120, 2),
"Moving": (122, 1),
"Moving_Status": (123, 1),
"Present_PWM": (124, 2),
"Present_Current": (126, 2),
"Present_Velocity": (128, 4),
"Present_Position": (132, 4),
"Velocity_Trajectory": (136, 4),
"Position_Trajectory": (140, 4),
"Present_Input_Voltage": (144, 2),
"Present_Temperature": (146, 1)
}
CALIBRATION_REQUIRED = ["Goal_Position", "Present_Position"]
CONVERT_UINT32_TO_INT32_REQUIRED = ["Goal_Position", "Present_Position"]
#CONVERT_POSITION_TO_ANGLE_REQUIRED = ["Goal_Position", "Present_Position"]
CONVERT_POSITION_TO_ANGLE_REQUIRED = []
MODEL_CONTROL_TABLE = {
"x_series": X_SERIES_CONTROL_TABLE,
"xl330-m077": X_SERIES_CONTROL_TABLE,
"xl330-m288": X_SERIES_CONTROL_TABLE,
"xl430-w250": X_SERIES_CONTROL_TABLE,
"xm430-w350": X_SERIES_CONTROL_TABLE,
"xm540-w270": X_SERIES_CONTROL_TABLE,
}
def uint32_to_int32(values: np.ndarray):
"""
Convert an unsigned 32-bit integer array to a signed 32-bit integer array.
"""
for i in range(len(values)):
if values[i] is not None and values[i] > 2147483647:
values[i] = values[i] - 4294967296
return values
def int32_to_uint32(values: np.ndarray):
"""
Convert a signed 32-bit integer array to an unsigned 32-bit integer array.
"""
for i in range(len(values)):
if values[i] is not None and values[i] < 0:
values[i] = values[i] + 4294967296
return values
def motor_position_to_angle(position: np.ndarray) -> np.ndarray:
"""
Convert from motor position in [-2048, 2048] to radian in [-pi, pi]
"""
return (position / 2048) * 3.14
def motor_angle_to_position(angle: np.ndarray) -> np.ndarray:
"""
Convert from radian in [-pi, pi] to motor position in [-2048, 2048]
"""
return ((angle / 3.14) * 2048).astype(np.int64)
# def pwm2vel(pwm: np.ndarray) -> np.ndarray:
# """
# :param pwm: numpy array of pwm/s joint velocities
# :return: numpy array of rad/s joint velocities
# """
# return pwm * 3.14 / 2048
# def vel2pwm(vel: np.ndarray) -> np.ndarray:
# """
# :param vel: numpy array of rad/s joint velocities
# :return: numpy array of pwm/s joint velocities
# """
# return (vel * 2048 / 3.14).astype(np.int64)
def get_group_sync_key(data_name, motor_names):
group_key = f"{data_name}_" + "_".join([name for name in motor_names])
return group_key
class TorqueMode(enum.Enum):
ENABLED = 1
DISABLED = 0
class OperatingMode(enum.Enum):
VELOCITY = 1
POSITION = 3
EXTENDED_POSITION = 4
CURRENT_CONTROLLED_POSITION = 5
PWM = 16
UNKNOWN = -1
class DriveMode(enum.Enum):
NON_INVERTED = 0
INVERTED = 1
class DynamixelMotorsBus:
def __init__(self, port: str, motors: dict[str, tuple[int, str]],
extra_model_control_table: dict[str, list[tuple]] | None = None):
self.port = port
self.motors = motors
self.model_ctrl_table = deepcopy(MODEL_CONTROL_TABLE)
if extra_model_control_table:
self.model_ctrl_table.update(extra_model_control_table)
self.port_handler = PortHandler(self.port)
self.packet_handler = PacketHandler(PROTOCOL_VERSION)
if not self.port_handler.openPort():
raise OSError(f"Failed to open port {self.port}")
self.port_handler.setBaudRate(BAUD_RATE)
self.port_handler.setPacketTimeoutMillis(TIMEOUT_MS)
self.group_readers = {}
self.group_writers = {}
self.calibration = None
@property
def motor_names(self) -> list[int]:
return list(self.motors.keys())
def set_calibration(self, calibration: dict[str, tuple[int, bool]]):
self.calibration = calibration
def apply_calibration(self, values: np.ndarray | list, motor_names: list[str] | None):
if not self.calibration:
return values
if motor_names is None:
motor_names = self.motor_names
for i, name in enumerate(motor_names):
homing_offset, drive_mode = self.calibration[name]
if values[i] is not None:
if drive_mode:
values[i] *= -1
values[i] += homing_offset
return values
def revert_calibration(self, values: np.ndarray | list, motor_names: list[str] | None):
if not self.calibration:
return values
if motor_names is None:
motor_names = self.motor_names
for i, name in enumerate(motor_names):
homing_offset, drive_mode = self.calibration[name]
if values[i] is not None:
values[i] -= homing_offset
if drive_mode:
values[i] *= -1
return values
def read(self, data_name, motor_names: list[str] | None = None):
if motor_names is None:
motor_names = self.motor_names
motor_ids = []
models = []
for name in motor_names:
motor_idx, model = self.motors[name]
motor_ids.append(motor_idx)
models.append(model)
# TODO(rcadene): assert all motors follow same address
addr, bytes = self.model_ctrl_table[model][data_name]
group_key = get_group_sync_key(data_name, motor_names)
if data_name not in self.group_readers:
# create new group reader
self.group_readers[group_key] = GroupSyncRead(self.port_handler, self.packet_handler, addr, bytes)
for idx in motor_ids:
self.group_readers[group_key].addParam(idx)
comm = self.group_readers[group_key].txRxPacket()
if comm != COMM_SUCCESS:
raise ConnectionError(
f"Read failed due to communication error on port {self.port} for group_key {group_key}: "
f"{self.packet_handler.getTxRxResult(comm)}"
)
values = []
for idx in motor_ids:
value = self.group_readers[group_key].getData(idx, addr, bytes)
values.append(value)
values = np.array(values)
# TODO(rcadene): explain why
if data_name in CONVERT_UINT32_TO_INT32_REQUIRED:
values = uint32_to_int32(values)
if data_name in CALIBRATION_REQUIRED:
values = self.apply_calibration(values, motor_names)
if data_name in CONVERT_POSITION_TO_ANGLE_REQUIRED:
values = motor_position_to_angle(values)
return values
def write(self, data_name, values: int | float | np.ndarray, motor_names: str | list[str] | None = None):
if motor_names is None:
motor_names = self.motor_names
if isinstance(motor_names, str):
motor_names = [motor_names]
motor_ids = []
models = []
for name in motor_names:
motor_idx, model = self.motors[name]
motor_ids.append(motor_idx)
models.append(model)
if isinstance(values, (int, float, np.integer)):
values = [int(values)] * len(motor_ids)
values = np.array(values)
if data_name in CONVERT_POSITION_TO_ANGLE_REQUIRED:
values = motor_angle_to_position(values)
if data_name in CALIBRATION_REQUIRED:
values = self.revert_calibration(values, motor_names)
# TODO(rcadene): why dont we do it?
# if data_name in CONVERT_INT32_TO_UINT32_REQUIRED:
# values = int32_to_uint32(values)
values = values.tolist()
# TODO(rcadene): assert all motors follow same address
addr, bytes = self.model_ctrl_table[model][data_name]
group_key = get_group_sync_key(data_name, motor_names)
init_group = data_name not in self.group_readers
if init_group:
self.group_writers[group_key] = GroupSyncWrite(self.port_handler, self.packet_handler, addr, bytes)
for idx, value in zip(motor_ids, values):
if bytes == 1:
data = [
DXL_LOBYTE(DXL_LOWORD(value)),
]
elif bytes == 2:
data = [
DXL_LOBYTE(DXL_LOWORD(value)),
DXL_HIBYTE(DXL_LOWORD(value)),
]
elif bytes == 4:
data = [
DXL_LOBYTE(DXL_LOWORD(value)),
DXL_HIBYTE(DXL_LOWORD(value)),
DXL_LOBYTE(DXL_HIWORD(value)),
DXL_HIBYTE(DXL_HIWORD(value)),
]
else:
raise NotImplementedError(
f"Value of the number of bytes to be sent is expected to be in [1, 2, 4], but "
f"{bytes} is provided instead.")
if init_group:
self.group_writers[group_key].addParam(idx, data)
else:
self.group_writers[group_key].changeParam(idx, data)
comm = self.group_writers[group_key].txPacket()
if comm != COMM_SUCCESS:
raise ConnectionError(
f"Write failed due to communication error on port {self.port} for group_key {group_key}: "
f"{self.packet_handler.getTxRxResult(comm)}"
)
# def read(self, data_name, motor_name: str):
# motor_idx, model = self.motors[motor_name]
# addr, bytes = self.model_ctrl_table[model][data_name]
# args = (self.port_handler, motor_idx, addr)
# if bytes == 1:
# value, comm, err = self.packet_handler.read1ByteTxRx(*args)
# elif bytes == 2:
# value, comm, err = self.packet_handler.read2ByteTxRx(*args)
# elif bytes == 4:
# value, comm, err = self.packet_handler.read4ByteTxRx(*args)
# else:
# raise NotImplementedError(
# f"Value of the number of bytes to be sent is expected to be in [1, 2, 4], but "
# f"{bytes} is provided instead.")
# if comm != COMM_SUCCESS:
# raise ConnectionError(
# f"Read failed due to communication error on port {self.port} for motor {motor_idx}: "
# f"{self.packet_handler.getTxRxResult(comm)}"
# )
# elif err != 0:
# raise ConnectionError(
# f"Read failed due to error {err} on port {self.port} for motor {motor_idx}: "
# f"{self.packet_handler.getTxRxResult(err)}"
# )
# if data_name in CALIBRATION_REQUIRED:
# value = self.apply_calibration([value], [motor_name])[0]
# return value
# def write(self, data_name, value, motor_name: str):
# if data_name in CALIBRATION_REQUIRED:
# value = self.revert_calibration([value], [motor_name])[0]
# motor_idx, model = self.motors[motor_name]
# addr, bytes = self.model_ctrl_table[model][data_name]
# args = (self.port_handler, motor_idx, addr, value)
# if bytes == 1:
# comm, err = self.packet_handler.write1ByteTxRx(*args)
# elif bytes == 2:
# comm, err = self.packet_handler.write2ByteTxRx(*args)
# elif bytes == 4:
# comm, err = self.packet_handler.write4ByteTxRx(*args)
# else:
# raise NotImplementedError(
# f"Value of the number of bytes to be sent is expected to be in [1, 2, 4], but {bytes} "
# f"is provided instead.")
# if comm != COMM_SUCCESS:
# raise ConnectionError(
# f"Write failed due to communication error on port {self.port} for motor {motor_idx}: "
# f"{self.packet_handler.getTxRxResult(comm)}"
# )
# elif err != 0:
# raise ConnectionError(
# f"Write failed due to error {err} on port {self.port} for motor {motor_idx}: "
# f"{self.packet_handler.getTxRxResult(err)}"
# )

View File

@@ -0,0 +1,9 @@
from typing import Protocol
class MotorsBus(Protocol):
def motor_names(self): ...
def set_calibration(self): ...
def apply_calibration(self): ...
def revert_calibration(self): ...
def read(self): ...
def write(self): ...

View File

@@ -0,0 +1,46 @@
def make_robot(name):
if name == "koch":
from lerobot.common.robot_devices.robots.koch import KochRobot
from lerobot.common.robot_devices.motors.dynamixel import DynamixelMotorsBus
from lerobot.common.robot_devices.cameras.opencv import OpenCVCamera
robot = KochRobot(
leader_arms={
"main": DynamixelMotorsBus(
port="/dev/tty.usbmodem575E0031751",
motors={
# name: (index, model)
"shoulder_pan": (1, "xl330-m077"),
"shoulder_lift": (2, "xl330-m077"),
"elbow_flex": (3, "xl330-m077"),
"wrist_flex": (4, "xl330-m077"),
"wrist_roll": (5, "xl330-m077"),
"gripper": (6, "xl330-m077"),
},
),
},
follower_arms={
"main": DynamixelMotorsBus(
port="/dev/tty.usbmodem575E0032081",
motors={
# name: (index, model)
"shoulder_pan": (1, "xl430-w250"),
"shoulder_lift": (2, "xl430-w250"),
"elbow_flex": (3, "xl330-m288"),
"wrist_flex": (4, "xl330-m288"),
"wrist_roll": (5, "xl330-m288"),
"gripper": (6, "xl330-m288"),
},
),
},
cameras={
"main": OpenCVCamera(1, fps=30, width=640, height=480),
}
)
else:
raise ValueError(f"Robot '{name}' not found.")
return robot

View File

@@ -0,0 +1,365 @@
import copy
from dataclasses import dataclass, field, replace
from pathlib import Path
import pickle
import numpy as np
import torch
from lerobot.common.robot_devices.cameras.opencv import OpenCVCamera
from lerobot.common.robot_devices.cameras.utils import Camera
from lerobot.common.robot_devices.motors.dynamixel import DriveMode, DynamixelMotorsBus, OperatingMode, TorqueMode, motor_position_to_angle
from lerobot.common.robot_devices.motors.utils import MotorsBus
########################################################################
# Calibration logic
########################################################################
# TARGET_HORIZONTAL_POSITION = motor_position_to_angle(np.array([0, -1024, 1024, 0, -1024, 0]))
# TARGET_90_DEGREE_POSITION = motor_position_to_angle(np.array([1024, 0, 0, 1024, 0, -1024]))
# GRIPPER_OPEN = motor_position_to_angle(np.array([-400]))
TARGET_HORIZONTAL_POSITION = np.array([0, -1024, 1024, 0, -1024, 0])
TARGET_90_DEGREE_POSITION = np.array([1024, 0, 0, 1024, 0, -1024])
GRIPPER_OPEN = np.array([-400])
def apply_homing_offset(values: np.array, homing_offset: np.array) -> np.array:
for i in range(len(values)):
if values[i] is not None:
values[i] += homing_offset[i]
return values
def apply_drive_mode(values: np.array, drive_mode: np.array) -> np.array:
for i in range(len(values)):
if values[i] is not None and drive_mode[i]:
values[i] = -values[i]
return values
def apply_calibration(values: np.array, homing_offset: np.array, drive_mode: np.array) -> np.array:
values = apply_drive_mode(values, drive_mode)
values = apply_homing_offset(values, homing_offset)
return values
def revert_calibration(values: np.array, homing_offset: np.array, drive_mode: np.array) -> np.array:
"""
Transform working position into real position for the robot.
"""
values = apply_homing_offset(values, np.array([
-homing_offset if homing_offset is not None else None for homing_offset in homing_offset
]))
values = apply_drive_mode(values, drive_mode)
return values
def revert_appropriate_positions(positions: np.array, drive_mode: list[bool]) -> np.array:
for i, revert in enumerate(drive_mode):
if not revert and positions[i] is not None:
positions[i] = -positions[i]
return positions
def compute_corrections(positions: np.array, drive_mode: list[bool], target_position: np.array) -> np.array:
correction = revert_appropriate_positions(positions, drive_mode)
for i in range(len(positions)):
if correction[i] is not None:
if drive_mode[i]:
correction[i] -= target_position[i]
else:
correction[i] += target_position[i]
return correction
def compute_nearest_rounded_positions(positions: np.array) -> np.array:
return np.array(
[round(positions[i] / 1024) * 1024 if positions[i] is not None else None for i in range(len(positions))])
def compute_homing_offset(arm: DynamixelMotorsBus, drive_mode: list[bool], target_position: np.array) -> np.array:
# Get the present positions of the servos
present_positions = apply_calibration(
arm.read("Present_Position"),
np.array([0, 0, 0, 0, 0, 0]),
drive_mode)
nearest_positions = compute_nearest_rounded_positions(present_positions)
correction = compute_corrections(nearest_positions, drive_mode, target_position)
return correction
def compute_drive_mode(arm: DynamixelMotorsBus, offset: np.array):
# Get current positions
present_positions = apply_calibration(
arm.read("Present_Position"),
offset,
np.array([False, False, False, False, False, False]))
nearest_positions = compute_nearest_rounded_positions(present_positions)
# construct 'drive_mode' list comparing nearest_positions and TARGET_90_DEGREE_POSITION
drive_mode = []
for i in range(len(nearest_positions)):
drive_mode.append(nearest_positions[i] != TARGET_90_DEGREE_POSITION[i])
return drive_mode
def reset_arm(arm: MotorsBus):
# To be configured, all servos must be in "torque disable" mode
arm.write("Torque_Enable", TorqueMode.DISABLED.value)
# Use 'extended position mode' for all motors except gripper, because in joint mode the servos can't
# rotate more than 360 degrees (from 0 to 4095) And some mistake can happen while assembling the arm,
# you could end up with a servo with a position 0 or 4095 at a crucial point See [
# https://emanual.robotis.com/docs/en/dxl/x/x_series/#operating-mode11]
all_motors_except_gripper = [name for name in arm.motor_names if name != "gripper"]
arm.write("Operating_Mode", OperatingMode.EXTENDED_POSITION.value, all_motors_except_gripper)
# TODO(rcadene): why?
# Use 'position control current based' for gripper
arm.write("Operating_Mode", OperatingMode.CURRENT_CONTROLLED_POSITION.value, "gripper")
# Make sure the native calibration (homing offset abd drive mode) is disabled, since we use our own calibration layer to be more generic
arm.write("Homing_Offset", 0)
arm.write("Drive_Mode", DriveMode.NON_INVERTED.value)
def run_arm_calibration(arm: MotorsBus, name: str):
reset_arm(arm)
# TODO(rcadene): document what position 1 mean
print(f"Please move the '{name}' arm to the horizontal position (gripper fully closed)")
input("Press Enter to continue...")
horizontal_homing_offset = compute_homing_offset(arm, [False, False, False, False, False, False], TARGET_HORIZONTAL_POSITION)
# TODO(rcadene): document what position 2 mean
print(f"Please move the '{name}' arm to the 90 degree position (gripper fully open)")
input("Press Enter to continue...")
drive_mode = compute_drive_mode(arm, horizontal_homing_offset)
homing_offset = compute_homing_offset(arm, drive_mode, TARGET_90_DEGREE_POSITION)
# Invert offset for all drive_mode servos
for i in range(len(drive_mode)):
if drive_mode[i]:
homing_offset[i] = -homing_offset[i]
print("Calibration is done!")
print("=====================================")
print(" HOMING_OFFSET: ", " ".join([str(i) for i in homing_offset]))
print(" DRIVE_MODE: ", " ".join([str(i) for i in drive_mode]))
print("=====================================")
return homing_offset, drive_mode
########################################################################
# Alexander Koch robot arm
########################################################################
@dataclass
class KochRobotConfig:
"""
Example of usage:
```python
KochRobotConfig()
```
"""
# Define all the components of the robot
leader_arms: dict[str, MotorsBus] = field(
default_factory=lambda: {
"main": DynamixelMotorsBus(
port="/dev/tty.usbmodem575E0031751",
motors={
# name: (index, model)
"shoulder_pan": (1, "xl330-m077"),
"shoulder_lift": (2, "xl330-m077"),
"elbow_flex": (3, "xl330-m077"),
"wrist_flex": (4, "xl330-m077"),
"wrist_roll": (5, "xl330-m077"),
"gripper": (6, "xl330-m077"),
},
),
}
)
follower_arms: dict[str, MotorsBus] = field(
default_factory=lambda: {
"main": DynamixelMotorsBus(
port="/dev/tty.usbmodem575E0032081",
motors={
# name: (index, model)
"shoulder_pan": (1, "xl430-w250"),
"shoulder_lift": (2, "xl430-w250"),
"elbow_flex": (3, "xl330-m288"),
"wrist_flex": (4, "xl330-m288"),
"wrist_roll": (5, "xl330-m288"),
"gripper": (6, "xl330-m288"),
},
),
}
)
cameras: dict[str, Camera] = field(
default_factory=lambda: {}
)
class KochRobot():
""" Tau Robotics: https://tau-robotics.com
Example of usage:
```python
robot = KochRobot()
```
"""
def __init__(self, config: KochRobotConfig | None = None, calibration_path: Path = ".cache/calibration/koch.pkl", **kwargs):
if config is None:
config = KochRobotConfig()
# Overwrite config arguments using kwargs
self.config = replace(config, **kwargs)
self.calibration_path = Path(calibration_path)
self.leader_arms = self.config.leader_arms
self.follower_arms = self.config.follower_arms
self.cameras = self.config.cameras
def init_teleop(self):
if self.calibration_path.exists():
# Reset all arms before setting calibration
for name in self.follower_arms:
reset_arm(self.follower_arms[name])
for name in self.leader_arms:
reset_arm(self.leader_arms[name])
with open(self.calibration_path, 'rb') as f:
calibration = pickle.load(f)
else:
calibration = self.run_calibration()
self.calibration_path.parent.mkdir(parents=True, exist_ok=True)
with open(self.calibration_path, 'wb') as f:
pickle.dump(calibration, f)
for name in self.follower_arms:
self.follower_arms[name].set_calibration(calibration[f"follower_{name}"])
self.follower_arms[name].write("Torque_Enable", 1)
for name in self.leader_arms:
self.leader_arms[name].set_calibration(calibration[f"leader_{name}"])
# TODO(rcadene): add comments
self.leader_arms[name].write("Goal_Position", GRIPPER_OPEN, "gripper")
self.leader_arms[name].write("Torque_Enable", 1, "gripper")
for name in self.cameras:
self.cameras[name].connect()
def run_calibration(self):
calibration = {}
for name in self.follower_arms:
homing_offset, drive_mode = run_arm_calibration(self.follower_arms[name], f"{name} follower")
calibration[f"follower_{name}"] = {}
for idx, motor_name in enumerate(self.follower_arms[name].motor_names):
calibration[f"follower_{name}"][motor_name] = (homing_offset[idx], drive_mode[idx])
for name in self.leader_arms:
homing_offset, drive_mode = run_arm_calibration(self.leader_arms[name], f"{name} leader")
calibration[f"leader_{name}"] = {}
for idx, motor_name in enumerate(self.leader_arms[name].motor_names):
calibration[f"leader_{name}"][motor_name] = (homing_offset[idx], drive_mode[idx])
return calibration
def teleop_step(self, record_data=False) -> None | tuple[dict[str, torch.Tensor], dict[str, torch.Tensor]]:
# Prepare to assign the positions of the leader to the follower
leader_pos = {}
for name in self.leader_arms:
leader_pos[name] = self.leader_arms[name].read("Present_Position")
follower_goal_pos = {}
for name in self.leader_arms:
follower_goal_pos[name] = leader_pos[name]
# Send action
for name in self.follower_arms:
self.follower_arms[name].write("Goal_Position", follower_goal_pos[name])
# Early exit when recording data is not requested
if not record_data:
return
# Read follower position
follower_pos = {}
for name in self.follower_arms:
follower_pos[name] = self.follower_arms[name].read("Present_Position")
# Create state by concatenating follower current position
state = []
for name in self.follower_arms:
if name in follower_pos:
state.append(follower_pos[name])
state = np.concatenate(state)
# Create action by concatenating follower goal position
action = []
for name in self.follower_arms:
if name in follower_goal_pos:
action.append(follower_goal_pos[name])
action = np.concatenate(action)
# Capture images from cameras
images = {}
for name in self.cameras:
images[name] = self.cameras[name].read()
# Populate output dictionnaries and format to pytorch
obs_dict, action_dict = {}, {}
obs_dict["observation.state"] = torch.from_numpy(state)
action_dict["action"] = torch.from_numpy(action)
for name in self.cameras:
obs_dict[f"observation.images.{name}"] = torch.from_numpy(images[name])
return obs_dict, action_dict
def capture_observation(self):
# Read follower position
follower_pos = {}
for name in self.follower_arms:
follower_pos[name] = self.follower_arms[name].read("Present_Position")
# Create state by concatenating follower current position
state = []
for name in self.follower_arms:
if name in follower_pos:
state.append(follower_pos[name])
state = np.concatenate(state)
# Capture images from cameras
images = {}
for name in self.cameras:
images[name] = self.cameras[name].read()
# Populate output dictionnaries and format to pytorch
obs_dict = {}
obs_dict["observation.state"] = torch.from_numpy(state)
for name in self.cameras:
obs_dict[f"observation.images.{name}"] = torch.from_numpy(images[name])
return obs_dict
def send_action(self, action):
from_idx = 0
to_idx = 0
follower_goal_pos = {}
for name in self.follower_arms:
if name in self.follower_arms:
to_idx += len(self.follower_arms[name].motor_names)
follower_goal_pos[name] = action[from_idx:to_idx].numpy()
from_idx = to_idx
for name in self.follower_arms:
self.follower_arms[name].write("Goal_Position", follower_goal_pos[name].astype(np.int32))

View File

@@ -0,0 +1,8 @@
from typing import Protocol
class Robot(Protocol):
def init_teleop(self): ...
def run_calibration(self): ...
def teleop_step(self, record_data=False): ...
def capture_observation(self): ...
def send_action(self, action): ...