Add support for Stretch (hello-robot) (#409)

Co-authored-by: Remi <remi.cadene@huggingface.co>
Co-authored-by: Remi Cadene <re.cadene@gmail.com>
This commit is contained in:
Simon Alibert
2024-10-04 18:56:42 +02:00
committed by GitHub
parent 26f97cfd17
commit 1a343c3591
20 changed files with 5052 additions and 1652 deletions

View File

@@ -159,30 +159,25 @@ def convert_to_bytes(value, bytes, mock=False):
if mock:
return value
from dynamixel_sdk import (
DXL_HIBYTE,
DXL_HIWORD,
DXL_LOBYTE,
DXL_LOWORD,
)
import dynamixel_sdk as dxl
# Note: No need to convert back into unsigned int, since this byte preprocessing
# already handles it for us.
if bytes == 1:
data = [
DXL_LOBYTE(DXL_LOWORD(value)),
dxl.DXL_LOBYTE(dxl.DXL_LOWORD(value)),
]
elif bytes == 2:
data = [
DXL_LOBYTE(DXL_LOWORD(value)),
DXL_HIBYTE(DXL_LOWORD(value)),
dxl.DXL_LOBYTE(dxl.DXL_LOWORD(value)),
dxl.DXL_HIBYTE(dxl.DXL_LOWORD(value)),
]
elif bytes == 4:
data = [
DXL_LOBYTE(DXL_LOWORD(value)),
DXL_HIBYTE(DXL_LOWORD(value)),
DXL_LOBYTE(DXL_HIWORD(value)),
DXL_HIBYTE(DXL_HIWORD(value)),
dxl.DXL_LOBYTE(dxl.DXL_LOWORD(value)),
dxl.DXL_HIBYTE(dxl.DXL_LOWORD(value)),
dxl.DXL_LOBYTE(dxl.DXL_HIWORD(value)),
dxl.DXL_HIBYTE(dxl.DXL_HIWORD(value)),
]
else:
raise NotImplementedError(
@@ -361,12 +356,12 @@ class DynamixelMotorsBus:
)
if self.mock:
from tests.mock_dynamixel_sdk import PacketHandler, PortHandler
import tests.mock_dynamixel_sdk as dxl
else:
from dynamixel_sdk import PacketHandler, PortHandler
import dynamixel_sdk as dxl
self.port_handler = PortHandler(self.port)
self.packet_handler = PacketHandler(PROTOCOL_VERSION)
self.port_handler = dxl.PortHandler(self.port)
self.packet_handler = dxl.PacketHandler(PROTOCOL_VERSION)
try:
if not self.port_handler.openPort():
@@ -399,12 +394,12 @@ class DynamixelMotorsBus:
def reconnect(self):
if self.mock:
from tests.mock_dynamixel_sdk import PacketHandler, PortHandler
import tests.mock_dynamixel_sdk as dxl
else:
from dynamixel_sdk import PacketHandler, PortHandler
import dynamixel_sdk as dxl
self.port_handler = PortHandler(self.port)
self.packet_handler = PacketHandler(PROTOCOL_VERSION)
self.port_handler = dxl.PortHandler(self.port)
self.packet_handler = dxl.PacketHandler(PROTOCOL_VERSION)
if not self.port_handler.openPort():
raise OSError(f"Failed to open port '{self.port}'.")
@@ -795,9 +790,9 @@ class DynamixelMotorsBus:
def _read_with_motor_ids(self, motor_models, motor_ids, data_name):
if self.mock:
from tests.mock_dynamixel_sdk import COMM_SUCCESS, GroupSyncRead
import tests.mock_dynamixel_sdk as dxl
else:
from dynamixel_sdk import COMM_SUCCESS, GroupSyncRead
import dynamixel_sdk as dxl
return_list = True
if not isinstance(motor_ids, list):
@@ -806,12 +801,12 @@ class DynamixelMotorsBus:
assert_same_address(self.model_ctrl_table, self.motor_models, data_name)
addr, bytes = self.model_ctrl_table[motor_models[0]][data_name]
group = GroupSyncRead(self.port_handler, self.packet_handler, addr, bytes)
group = dxl.GroupSyncRead(self.port_handler, self.packet_handler, addr, bytes)
for idx in motor_ids:
group.addParam(idx)
comm = group.txRxPacket()
if comm != COMM_SUCCESS:
if comm != dxl.COMM_SUCCESS:
raise ConnectionError(
f"Read failed due to communication error on port {self.port_handler.port_name} for indices {motor_ids}: "
f"{self.packet_handler.getTxRxResult(comm)}"
@@ -836,9 +831,9 @@ class DynamixelMotorsBus:
start_time = time.perf_counter()
if self.mock:
from tests.mock_dynamixel_sdk import COMM_SUCCESS, GroupSyncRead
import tests.mock_dynamixel_sdk as dxl
else:
from dynamixel_sdk import COMM_SUCCESS, GroupSyncRead
import dynamixel_sdk as dxl
if motor_names is None:
motor_names = self.motor_names
@@ -859,16 +854,18 @@ class DynamixelMotorsBus:
if data_name not in self.group_readers:
# create new group reader
self.group_readers[group_key] = GroupSyncRead(self.port_handler, self.packet_handler, addr, bytes)
self.group_readers[group_key] = dxl.GroupSyncRead(
self.port_handler, self.packet_handler, addr, bytes
)
for idx in motor_ids:
self.group_readers[group_key].addParam(idx)
for _ in range(NUM_READ_RETRY):
comm = self.group_readers[group_key].txRxPacket()
if comm == COMM_SUCCESS:
if comm == dxl.COMM_SUCCESS:
break
if comm != COMM_SUCCESS:
if comm != dxl.COMM_SUCCESS:
raise ConnectionError(
f"Read failed due to communication error on port {self.port} for group_key {group_key}: "
f"{self.packet_handler.getTxRxResult(comm)}"
@@ -900,9 +897,9 @@ class DynamixelMotorsBus:
def _write_with_motor_ids(self, motor_models, motor_ids, data_name, values):
if self.mock:
from tests.mock_dynamixel_sdk import COMM_SUCCESS, GroupSyncWrite
import tests.mock_dynamixel_sdk as dxl
else:
from dynamixel_sdk import COMM_SUCCESS, GroupSyncWrite
import dynamixel_sdk as dxl
if not isinstance(motor_ids, list):
motor_ids = [motor_ids]
@@ -911,13 +908,13 @@ class DynamixelMotorsBus:
assert_same_address(self.model_ctrl_table, motor_models, data_name)
addr, bytes = self.model_ctrl_table[motor_models[0]][data_name]
group = GroupSyncWrite(self.port_handler, self.packet_handler, addr, bytes)
group = dxl.GroupSyncWrite(self.port_handler, self.packet_handler, addr, bytes)
for idx, value in zip(motor_ids, values, strict=True):
data = convert_to_bytes(value, bytes, self.mock)
group.addParam(idx, data)
comm = group.txPacket()
if comm != COMM_SUCCESS:
if comm != dxl.COMM_SUCCESS:
raise ConnectionError(
f"Write failed due to communication error on port {self.port_handler.port_name} for indices {motor_ids}: "
f"{self.packet_handler.getTxRxResult(comm)}"
@@ -932,9 +929,9 @@ class DynamixelMotorsBus:
start_time = time.perf_counter()
if self.mock:
from tests.mock_dynamixel_sdk import COMM_SUCCESS, GroupSyncWrite
import tests.mock_dynamixel_sdk as dxl
else:
from dynamixel_sdk import COMM_SUCCESS, GroupSyncWrite
import dynamixel_sdk as dxl
if motor_names is None:
motor_names = self.motor_names
@@ -965,7 +962,7 @@ class DynamixelMotorsBus:
init_group = data_name not in self.group_readers
if init_group:
self.group_writers[group_key] = GroupSyncWrite(
self.group_writers[group_key] = dxl.GroupSyncWrite(
self.port_handler, self.packet_handler, addr, bytes
)
@@ -977,7 +974,7 @@ class DynamixelMotorsBus:
self.group_writers[group_key].changeParam(idx, data)
comm = self.group_writers[group_key].txPacket()
if comm != COMM_SUCCESS:
if comm != dxl.COMM_SUCCESS:
raise ConnectionError(
f"Write failed due to communication error on port {self.port} for group_key {group_key}: "
f"{self.packet_handler.getTxRxResult(comm)}"