Compare commits

..

1 Commits

Author SHA1 Message Date
ef45ea9649 single arm refactory
Some checks failed
Secret Leaks / trufflehog (push) Has been cancelled
2025-06-26 21:08:22 +08:00
8 changed files with 45 additions and 312 deletions

View File

@@ -15,6 +15,10 @@ class RealmanMotorsBus:
self.motors = config.motors
self.init_joint_position = config.init_joint['joint'] # [6 joints + 1 gripper]
self.safe_disable_position = config.init_joint['joint']
self.rmarm.rm_movej(self.init_joint_position[:-1], 5, 0, 0, 1)
time.sleep(3)
ret = self.rmarm.rm_get_current_arm_state()
self.init_pose = ret[1]['pose']
@property
def motor_names(self) -> list[str]:
@@ -98,6 +102,18 @@ class RealmanMotorsBus:
self.rmarm.rm_movej_p(target_endpose, 50, 0, 0, 1)
self.rmarm.rm_set_gripper_position(gripper, block=False, timeout=2)
def write_joint_slow(self, target_joint: list):
self.rmarm.rm_movej(target_joint, 5, 0, 0, 0)
def write_joint_canfd(self, target_joint: list):
self.rmarm.rm_movej_canfd(target_joint, False)
def write_endpose_canfd(self, target_pose: list):
self.rmarm.rm_movep_canfd(target_pose, False)
def write_gripper(self, gripper: int):
self.rmarm.rm_set_gripper_position(gripper, False, 2)
def read(self) -> Dict:
"""
- 机械臂关节消息,单位1度;[-1, 1]
@@ -119,6 +135,12 @@ class RealmanMotorsBus:
"gripper": (gripper_state-500)/500
}
def read_current_arm_joint_state(self):
return self.rmarm.rm_get_current_arm_state()[1]['joint']
def read_current_arm_endpose_state(self):
return self.rmarm.rm_get_current_arm_state()[1]['pose']
def safe_disconnect(self):
"""
Move to safe disconnect position

View File

@@ -683,7 +683,6 @@ class RealmanRobotConfig(RobotConfig):
inference_time: bool = False
max_gripper: int = 990
min_gripper: int = 10
axis: int = 6
servo_config_file: str = "/home/maic/LYT/lerobot/lerobot/common/robot_devices/teleop/servo_arm.yaml"
@@ -707,26 +706,6 @@ class RealmanRobotConfig(RobotConfig):
}
)
right_follower_arm: dict[str, MotorsBusConfig] = field(
default_factory=lambda: {
"main": RealmanMotorsBusConfig(
ip = "192.168.3.19",
port = 8080,
motors={
# name: (index, model)
"joint_1": [1, "realman"],
"joint_2": [2, "realman"],
"joint_3": [3, "realman"],
"joint_4": [4, "realman"],
"joint_5": [5, "realman"],
"joint_6": [6, "realman"],
"gripper": [7, "realman"],
},
init_joint = {'joint': [-90, 90, 90, 90, 90, -90, 10]}
)
}
)
cameras: dict[str, CameraConfig] = field(
default_factory=lambda: {
# "one": OpenCVCameraConfig(
@@ -742,13 +721,13 @@ class RealmanRobotConfig(RobotConfig):
height=480,
use_depth=False
),
"right": IntelRealSenseCameraConfig(
serial_number="405622075165",
fps=30,
width=640,
height=480,
use_depth=False
),
# "right": IntelRealSenseCameraConfig(
# serial_number="405622075165",
# fps=30,
# width=640,
# height=480,
# use_depth=False
# ),
"front": IntelRealSenseCameraConfig(
serial_number="145422072751",
fps=30,

View File

@@ -14,45 +14,6 @@ from lerobot.common.robot_devices.utils import RobotDeviceAlreadyConnectedError,
from lerobot.common.robot_devices.robots.configs import RealmanRobotConfig
def smooth_movep_canfd(arm_controller, current_pose, target_pose, fps=100):
"""
可变帧率版本
Args:
fps: 目标帧率,决定每次移动的时长
"""
total_duration = 1.0 / fps
control_dt = 0.002 # 2ms控制周期
num_points = max(1, int(total_duration / control_dt))
print(f"{fps}fps移动: {total_duration*1000:.1f}ms, {num_points}个插值点")
# 生成轨迹
trajectory = []
for i in range(num_points):
t = i / (num_points - 1) if num_points > 1 else 1.0
s = 6 * t**5 - 15 * t**4 + 10 * t**3 # S曲线
pose = []
for axis in range(6):
value = current_pose[axis] + s * (target_pose[axis] - current_pose[axis])
pose.append(value)
trajectory.append(pose)
# 执行
start_time = time.perf_counter()
for i, pose in enumerate(trajectory):
target_time = start_time + i * control_dt
current_time = time.perf_counter()
sleep_time = target_time - current_time
# if sleep_time > 0:
# time.sleep(sleep_time)
arm_controller.rmarm.rm_movep_canfd(pose, True)
return True
class RealmanRobot:
def __init__(self, config: RealmanRobotConfig | None = None, **kwargs):
if config is None:
@@ -68,14 +29,11 @@ class RealmanRobot:
# build realman motors
self.piper_motors = make_motors_buses_from_configs(self.config.left_follower_arm)
self.arm = self.piper_motors['main']
self.arm.rmarm.rm_movej(self.arm.init_joint_position[:-1], 50, 0, 0, 1)
time.sleep(2)
ret = self.arm.rmarm.rm_get_current_arm_state()
init_pose = ret[1]['pose']
# build init teleop info
self.init_info = {
'init_joint': self.arm.init_joint_position,
'init_pose': init_pose,
'init_pose': self.arm.init_pose,
'max_gripper': config.max_gripper,
'min_gripper': config.min_gripper,
'servo_config_file': config.servo_config_file
@@ -83,7 +41,7 @@ class RealmanRobot:
# build state-action cache
self.joint_queue = deque(maxlen=2)
self.last_endpose = init_pose
self.last_endpose = self.arm.init_pose
# build gamepad teleop
if not self.inference_time:
@@ -102,7 +60,7 @@ class RealmanRobot:
cam_ft[key] = {
"shape": (cam.height, cam.width, cam.channels),
"names": ["height", "width", "channels"],
"info": None,
"info": None,
}
return cam_ft
@@ -203,15 +161,17 @@ class RealmanRobot:
if action['control_mode'] == 'joint':
# 关节控制模式(主模式)
ret = self.arm.rmarm.rm_get_current_arm_state()
current_pose = ret[1]['pose']
current_pose = self.arm.read_current_arm_endpose_state()
self.teleop.update_endpose_state(current_pose)
target_joints = action['joint_angles'][:-1]
self.arm.write_gripper(action['gripper'])
print(action['gripper'])
if action['master_controller_status']['infrared'] == 1:
if action['master_controller_status']['button'] == 1:
self.arm.rmarm.rm_movej_canfd(target_joints, False)
self.arm.write_joint_canfd(target_joints)
else:
self.arm.rmarm.rm_movej(target_joints, 5, 0, 0, 0)
self.arm.write_joint_slow(target_joints)
# do action
before_write_t = time.perf_counter()
@@ -219,43 +179,19 @@ class RealmanRobot:
self.logs["write_pos_dt_s"] = time.perf_counter() - before_write_t
else:
# 末端位姿控制模式
# target_pose = [
# action['end_pose']['X'], # X (m)
# action['end_pose']['Y'], # Y (m)
# action['end_pose']['Z'], # Z (m)
# action['end_pose']['RX'], # RX (rad)
# action['end_pose']['RY'], # RY (rad)
# action['end_pose']['RZ'] # RZ (rad)
# ]
target_pose = action['end_pose']
target_pose = list(action['end_pose'])
# do action
before_write_t = time.perf_counter()
# result = self.arm.rmarm.rm_movej_p(target_pose, 100, 0, 0, 0)
if self.last_endpose != target_pose:
# self.arm.rmarm.rm_movep_follow(target_pose)
# self.arm.rmarm.rm_movep_canfd(target_pose, True)
self.arm.rmarm.rm_movep_canfd(target_pose, False)
self.arm.write_endpose_canfd(target_pose)
self.last_endpose = target_pose
self.arm.rmarm.rm_set_gripper_position(action['gripper'], False, 2)
self.arm.write_gripper(action['gripper'])
ret = self.arm.rmarm.rm_get_current_arm_state()
target_joints = ret[1].get('joint', self.arm.init_joint_position)
target_joints = self.arm.read_current_arm_joint_state()
self.joint_queue.append(list(self.arm.read().values()))
self.teleop.update_joint_state(target_joints)
self.logs["write_pos_dt_s"] = time.perf_counter() - before_write_t
# print('-'*80)
# print('mode: ', action['control_mode'])
# print('state: ', list(state.values()))
# print('action: ', target_joints)
# print('cache[0]: ', self.joint_queue[0])
# print('cache[-1]: ', self.joint_queue[-1])
# print('time: ', time.perf_counter() - before_write_t)
# print('-'*80)
# time.sleep(1)
if not record_data:
return

View File

@@ -335,7 +335,7 @@ class HybridController:
# 如果有主臂夹爪数据,使用主臂夹爪状态
if self.use_master_arm and "grasp" in self.master_joint_actions:
self.gripper = self.master_joint_actions["grasp"]# * 1000
self.gripper = self.master_joint_actions["grasp"] * 1000
self.joint[-1] = self.gripper

View File

@@ -39,7 +39,7 @@ bash can_activate.sh can0 1000000
cd ..
python lerobot/scripts/control_robot.py \
--robot.type=realman \
--robot.type=piper \
--robot.inference_time=false \
--control.type=teleoperate
```

View File

@@ -1,58 +0,0 @@
def crc8_maxim(data):
"""
使用CRC-8/MAXIM算法计算CRC8校验值
多项式X8+X5+X4+1=0x31
Args:
data: 要校验的字节数据
Returns:
计算出的CRC8校验值
"""
# CRC8表与您提供的表相同
CRC8TAB = [
0x00, 0x31, 0x62, 0x53, 0xC4, 0xF5, 0xA6, 0x97,
0xB9, 0x88, 0xDB, 0xEA, 0x7D, 0x4C, 0x1F, 0x2E,
0x43, 0x72, 0x21, 0x10, 0x87, 0xB6, 0xE5, 0xD4,
0xFA, 0xCB, 0x98, 0xA9, 0x3E, 0x0F, 0x5C, 0x6D,
0x86, 0xB7, 0xE4, 0xD5, 0x42, 0x73, 0x20, 0x11,
0x3F, 0x0E, 0x5D, 0x6C, 0xFB, 0xCA, 0x99, 0xA8,
0xC5, 0xF4, 0xA7, 0x96, 0x01, 0x30, 0x63, 0x52,
0x7C, 0x4D, 0x1E, 0x2F, 0xB8, 0x89, 0xDA, 0xEB,
0x3D, 0x0C, 0x5F, 0x6E, 0xF9, 0xC8, 0x9B, 0xAA,
0x84, 0xB5, 0xE6, 0xD7, 0x40, 0x71, 0x22, 0x13,
0x7E, 0x4F, 0x1C, 0x2D, 0xBA, 0x8B, 0xD8, 0xE9,
0xC7, 0xF6, 0xA5, 0x94, 0x03, 0x32, 0x61, 0x50,
0xBB, 0x8A, 0xD9, 0xE8, 0x7F, 0x4E, 0x1D, 0x2C,
0x02, 0x33, 0x60, 0x51, 0xC6, 0xF7, 0xA4, 0x95,
0xF8, 0xC9, 0x9A, 0xAB, 0x3C, 0x0D, 0x5E, 0x6F,
0x41, 0x70, 0x23, 0x12, 0x85, 0xB4, 0xE7, 0xD6,
0x7A, 0x4B, 0x18, 0x29, 0xBE, 0x8F, 0xDC, 0xED,
0xC3, 0xF2, 0xA1, 0x90, 0x07, 0x36, 0x65, 0x54,
0x39, 0x08, 0x5B, 0x6A, 0xFD, 0xCC, 0x9F, 0xAE,
0x80, 0xB1, 0xE2, 0xD3, 0x44, 0x75, 0x26, 0x17,
0xFC, 0xCD, 0x9E, 0xAF, 0x38, 0x09, 0x5A, 0x6B,
0x45, 0x74, 0x27, 0x16, 0x81, 0xB0, 0xE3, 0xD2,
0xBF, 0x8E, 0xDD, 0xEC, 0x7B, 0x4A, 0x19, 0x28,
0x06, 0x37, 0x64, 0x55, 0xC2, 0xF3, 0xA0, 0x91,
0x47, 0x76, 0x25, 0x14, 0x83, 0xB2, 0xE1, 0xD0,
0xFE, 0xCF, 0x9C, 0xAD, 0x3A, 0x0B, 0x58, 0x69,
0x04, 0x35, 0x66, 0x57, 0xC0, 0xF1, 0xA2, 0x93,
0xBD, 0x8C, 0xDF, 0xFE, 0x79, 0x48, 0x1B, 0x2A,
0xC1, 0xF0, 0xA3, 0x92, 0x05, 0x34, 0x67, 0x56,
0x78, 0x49, 0x1A, 0x2B, 0xBC, 0x8D, 0xDE, 0xEF,
0x82, 0xB3, 0xE0, 0xD1, 0x46, 0x77, 0x24, 0x15,
0x3B, 0x0A, 0x59, 0x68, 0xFF, 0xCE, 0x9D, 0xAC
]
crc = 0
for byte in data:
crc = CRC8TAB[crc ^ byte]
return crc
# 数据帧55 AA 02 00 00 67
data = bytes([0x55, 0xAA, 0x08, 0x00, 0x02])
crc = crc8_maxim(data)
print(f"计算出的CRC8校验值: 0x{crc:02X}")

View File

@@ -1,6 +0,0 @@
port: /dev/ttyUSB0
right_port: /dev/ttyUSB1
baudrate: 460800
joint_hex_data: "55 AA 02 00 00 67"
control_hex_data: "55 AA 08 00 00 B9"
arm_axis: 6

View File

@@ -1,140 +0,0 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import time
import yaml
import serial
import logging
import binascii
import numpy as np
# 配置日志记录
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
class ServoArm:
def __init__(self, config_file="config.yaml"):
"""初始化机械臂的串口连接并发送初始数据。
Args:
config_file (str): 配置文件的路径。
"""
self.config = self._load_config(config_file)
self.port = self.config["port"]
self.baudrate = self.config["baudrate"]
self.joint_hex_data = self.config["joint_hex_data"]
self.control_hex_data = self.config["control_hex_data"]
self.arm_axis = self.config.get("arm_axis", 7)
self.serial_conn = serial.Serial(self.port, self.baudrate, timeout=0)
self.bytes_to_send = binascii.unhexlify(self.joint_hex_data.replace(" ", ""))
self.serial_conn.write(self.bytes_to_send)
time.sleep(1)
def _load_config(self, config_file):
"""加载配置文件。
Args:
config_file (str): 配置文件的路径。
Returns:
dict: 配置文件内容。
"""
with open(config_file, "r") as file:
config = yaml.safe_load(file)
return config
def _bytes_to_signed_int(self, byte_data):
"""将字节数据转换为有符号整数。
Args:
byte_data (bytes): 字节数据。
Returns:
int: 有符号整数。
"""
return int.from_bytes(byte_data, byteorder="little", signed=True)
def _parse_joint_data(self, hex_received):
"""解析接收到的十六进制数据并提取关节数据。
Args:
hex_received (str): 接收到的十六进制字符串数据。
Returns:
dict: 解析后的关节数据。
"""
logging.debug(f"hex_received: {hex_received}")
joints = {}
for i in range(self.arm_axis):
start = 14 + i * 10
end = start + 8
joint_hex = hex_received[start:end]
joint_byte_data = bytearray.fromhex(joint_hex)
joint_value = self._bytes_to_signed_int(joint_byte_data) / 10000.0
joints[f"joint_{i+1}"] = joint_value
grasp_start = 14 + self.arm_axis*10
grasp_hex = hex_received[grasp_start:grasp_start+8]
grasp_byte_data = bytearray.fromhex(grasp_hex)
# 夹爪进行归一化处理
grasp_value = self._bytes_to_signed_int(grasp_byte_data)/1000
# print(grasp_value)
joints["grasp"] = grasp_value
return joints
def _parse_controller_data(self, hex_received):
status = {
'infrared': 0,
'button': 0
}
if len(hex_received) == 18:
status['infrared'] = self._bytes_to_signed_int(bytearray.fromhex(hex_received[12:14]))
status['button'] = self._bytes_to_signed_int(bytearray.fromhex(hex_received[14:16]))
# print(infrared)
return status
def get_joint_actions(self):
"""从串口读取数据并解析关节动作。
Returns:
dict: 包含关节数据的字典。
"""
self.serial_conn.write(self.bytes_to_send)
time.sleep(0.02)
bytes_received = self.serial_conn.read(self.serial_conn.inWaiting())
hex_received = binascii.hexlify(bytes_received).decode("utf-8").upper()
# print("joints:", hex_received)
actions = self._parse_joint_data(hex_received)
return actions
def get_controller_status(self):
bytes_to_send = binascii.unhexlify(self.control_hex_data.replace(" ", ""))
self.serial_conn.write(bytes_to_send)
time.sleep(0.02)
bytes_received = self.serial_conn.read(self.serial_conn.inWaiting())
hex_received = binascii.hexlify(bytes_received).decode("utf-8").upper()
# print("control status:", hex_received)
status = self._parse_controller_data(hex_received)
return status
def set_gripper_action(self, action):
"""设置夹爪动作。
Args:
action (int): 夹爪动作值。
"""
action = int(action * 1000)
action_bytes = action.to_bytes(4, byteorder="little", signed=True)
self.bytes_to_send = self.bytes_to_send[:74] + action_bytes + self.bytes_to_send[78:]
if __name__ == "__main__":
servo_arm = ServoArm("/home/maic/LYT/lerobot/realman_src/servo_arm.yaml")
while True:
joint_actions = servo_arm.get_joint_actions()
logging.info(joint_actions)
# time.sleep(0.02)
status = servo_arm.get_controller_status()
logging.info(status)
time.sleep(0.5)