still mant bugs

This commit is contained in:
2025-07-02 11:42:03 +08:00
parent ef45ea9649
commit 96804bc86c
8 changed files with 948 additions and 20 deletions

View File

@@ -47,4 +47,16 @@ class RealmanMotorsBusConfig(MotorsBusConfig):
ip: str
port: int
motors: dict[str, tuple[int, str]]
init_joint: dict[str, list]
init_joint: dict[str, list]
@MotorsBusConfig.register_subclass("realman_dual")
@dataclass
class RealmanDualMotorsBusConfig(MotorsBusConfig):
left_ip: str
right_ip: str
left_port: int
right_port: int
motors: dict[str, tuple[int, str]]
init_joint: dict[str, list]
axis: dict[str, int]

View File

@@ -0,0 +1,187 @@
import time
from typing import Dict
from lerobot.common.robot_devices.motors.configs import RealmanDualMotorsBusConfig
from Robotic_Arm.rm_robot_interface import *
class RealmanDualMotorsBus:
"""
对Realman SDK的二次封装
"""
def __init__(self,
config: RealmanDualMotorsBusConfig):
self.left_rmarm = RoboticArm(rm_thread_mode_e.RM_TRIPLE_MODE_E)
self.right_rmarm = RoboticArm(rm_thread_mode_e.RM_TRIPLE_MODE_E)
self.handle_left = self.left_rmarm.rm_create_robot_arm(config.left_ip, config.left_port)
self.handle_right = self.right_rmarm.rm_create_robot_arm(config.right_ip, config.right_port)
self.motors = config.motors
self.axis = config.axis
self.joint_count = self.axis['left_joint']+self.axis['left_gripper'] + self.axis['right_joint'] + self.axis['right_gripper']
self.init_joint_position = config.init_joint['joint'] # [6 joints + 1 gripper + 6 joints + 1 gripper]
self.safe_disable_position = config.init_joint['joint']
self.left_offset = self.axis['left_joint']
self.left_rmarm.rm_movej(self.init_joint_position[:self.left_offset], 5, 0, 0, 1)
self.right_rmarm.rm_movej(self.init_joint_position[self.left_offset+1:-1], 5, 0, 0, 1)
time.sleep(3)
left_ret = self.left_rmarm.rm_get_current_arm_state()
right_ret = self.right_rmarm.rm_get_current_arm_state()
self.init_pose = left_ret[1]['pose'] + right_ret[1]['pose']
@property
def motor_names(self) -> list[str]:
return list(self.motors.keys())
@property
def motor_models(self) -> list[str]:
return [model for _, model in self.motors.values()]
@property
def motor_indices(self) -> list[int]:
return [idx for idx, _ in self.motors.values()]
def connect(self, enable=True) -> bool:
'''
使能机械臂并检测使能状态,尝试5s,如果使能超时则退出程序
'''
enable_flag = False
loop_flag = False
# 设置超时时间(秒)
timeout = 5
# 记录进入循环前的时间
start_time = time.time()
elapsed_time_flag = False
while not loop_flag:
elapsed_time = time.time() - start_time
print("--------------------")
if enable:
# 获取机械臂状态
left_ret = self.left_rmarm.rm_get_current_arm_state()
right_ret = self.right_rmarm.rm_get_current_arm_state()
if left_ret[0] == 0 and right_ret[0] == 0: # 成功获取状态
enable_flag = True
else:
enable_flag = False
# 断开所有连接,销毁线程
RoboticArm.rm_destory()
print("使能状态:", enable_flag)
print("--------------------")
if(enable_flag == enable):
loop_flag = True
enable_flag = True
else:
loop_flag = False
enable_flag = False
# 检查是否超过超时时间
if elapsed_time > timeout:
print("超时....")
elapsed_time_flag = True
enable_flag = True
break
time.sleep(1)
resp = enable_flag
print(f"Returning response: {resp}")
return resp
def motor_names(self):
return
def set_calibration(self):
return
def revert_calibration(self):
return
def apply_calibration(self):
"""
移动到初始位置
"""
self.write(target_joint=self.init_joint_position)
def write(self, target_joint: list):
assert len(target_joint) == self.joint_count, "len of target joint is not equal the count of joint"
self.left_rmarm.rm_movej_follow(target_joint[:self.left_offset])
self.left_rmarm.rm_set_gripper_position(target_joint[self.left_offset], block=False, timeout=2)
self.right_rmarm.rm_movej_follow(target_joint[self.left_offset+1:-1])
self.right_rmarm.rm_set_gripper_position(target_joint[-1], block=False, timeout=2)
def write_endpose(self, target_endpose: list):
assert target_endpose == 12, "the length of target pose is not equal 12"
self.left_rmarm.rm_movej_p(target_endpose[:6], 50, 0, 0, 1)
self.right_rmarm.rm_movej_p(target_endpose[6:], 50, 0, 0, 1)
def write_left_joint_slow(self, left_joint: list):
assert len(left_joint) == self.left_offset, "len of left master joint is not equal the count of left joint"
self.left_rmarm.rm_movej(left_joint, 5, 0, 0, 0)
# self.left_rmarm.rm_movej_follow(left_joint)
def write_right_joint_slow(self, right_joint: list):
assert len(right_joint) == self.left_offset, "len of right master joint is not equal the count of right joint"
self.right_rmarm.rm_movej(right_joint, 5, 0, 0, 0)
# self.right_rmarm.rm_movej_follow(right_joint)
def write_left_joint_canfd(self, left_joint: list):
assert len(left_joint) == self.left_offset, "len of left master joint is not equal the count of left joint"
self.left_rmarm.rm_movej_canfd(left_joint, False)
def write_right_joint_canfd(self, right_joint: list):
assert len(right_joint) == self.left_offset, "len of right master joint is not equal the count of right joint"
self.right_rmarm.rm_movej_canfd(right_joint, False)
def write_endpose_canfd(self, target_endpose: list):
assert target_endpose == 12, "the length of target pose is not equal 12"
self.left_rmarm.rm_movep_canfd(target_endpose[:6], False)
self.right_rmarm.rm_movep_canfd(target_endpose[6:], False)
def write_dual_gripper(self, left_gripper: int, right_gripper: int):
self.left_rmarm.rm_set_gripper_position(left_gripper, False, 2)
self.right_rmarm.rm_set_gripper_position(right_gripper, False, 2)
def read(self) -> Dict:
"""
- 机械臂关节消息,单位1度;[-1, 1]
- 机械臂夹爪消息,[-1, 1]
"""
left_joint_msg = self.left_rmarm.rm_get_current_arm_state()[1]
left_joint_state = left_joint_msg['joint']
right_joint_msg = self.right_rmarm.rm_get_current_arm_state()[1]
right_joint_state = right_joint_msg['joint']
left_gripper_msg = self.left_rmarm.rm_get_gripper_state()[1]
left_gripper_state = left_gripper_msg['actpos']
right_gripper_msg = self.right_rmarm.rm_get_gripper_state()[1]
right_gripper_state = right_gripper_msg['actpos']
return {
"left_joint_1": left_joint_state[0]/180,
"left_joint_2": left_joint_state[1]/180,
"left_joint_3": left_joint_state[2]/180,
"left_joint_4": left_joint_state[3]/180,
"left_joint_5": left_joint_state[4]/180,
"left_joint_6": left_joint_state[5]/180,
"left_gripper": (left_gripper_state-500)/500,
"right_joint_1": right_joint_state[0]/180,
"right_joint_2": right_joint_state[1]/180,
"right_joint_3": right_joint_state[2]/180,
"right_joint_4": right_joint_state[3]/180,
"right_joint_5": right_joint_state[4]/180,
"right_joint_6": right_joint_state[5]/180,
"right_gripper": (right_gripper_state-500)/500
}
def read_current_arm_joint_state(self):
return self.left_rmarm.rm_get_current_arm_state()[1]['joint'] + self.right_rmarm.rm_get_current_arm_state()[1]['joint']
def read_current_arm_endpose_state(self):
return self.left_rmarm.rm_get_current_arm_state()[1]['pose'] + self.right_rmarm.rm_get_current_arm_state()[1]['pose']
def safe_disconnect(self):
"""
Move to safe disconnect position
"""
self.write(target_joint=self.safe_disable_position)
# 断开所有连接,销毁线程
RoboticArm.rm_destory()

View File

@@ -49,6 +49,10 @@ def make_motors_buses_from_configs(motors_bus_configs: dict[str, MotorsBusConfig
motors_buses[key] = RealmanMotorsBus(cfg)
elif cfg.type == "realman_dual":
from lerobot.common.robot_devices.motors.realman_dual import RealmanDualMotorsBus
motors_buses[key] = RealmanDualMotorsBus(cfg)
else:
raise ValueError(f"The motor type '{cfg.type}' is not valid.")

View File

@@ -27,7 +27,8 @@ from lerobot.common.robot_devices.motors.configs import (
DynamixelMotorsBusConfig,
FeetechMotorsBusConfig,
MotorsBusConfig,
RealmanMotorsBusConfig
RealmanMotorsBusConfig,
RealmanDualMotorsBusConfig
)
@@ -745,21 +746,76 @@ class RealmanRobotConfig(RobotConfig):
}
)
# right_follower_arm: dict[str, MotorsBusConfig] = field(
# default_factory=lambda: {
# "main": RealmanMotorsBusConfig(
# ip = "192.168.3.19",
# port = 8080,
# motors={
# # name: (index, model)
# "joint_1": [1, "realman"],
# "joint_2": [2, "realman"],
# "joint_3": [3, "realman"],
# "joint_4": [4, "realman"],
# "joint_5": [5, "realman"],
# "joint_6": [6, "realman"],
# "gripper": (7, "realman"),
# },
# )
# }
# )
@RobotConfig.register_subclass("realman_dual")
@dataclass
class RealmanDualRobotConfig(RobotConfig):
inference_time: bool = False
max_gripper: int = 990
min_gripper: int = 10
servo_config_file: str = "/home/maic/LYT/lerobot/lerobot/common/robot_devices/teleop/servo_dual.yaml"
follower_arm: dict[str, MotorsBusConfig] = field(
default_factory=lambda: {
"main": RealmanDualMotorsBusConfig(
axis= {'left_joint': 6, 'left_gripper': 1, 'right_joint': 6, 'right_gripper': 1},
left_ip = "192.168.3.18",
left_port = 8080,
right_ip = "192.168.3.19",
right_port = 8080,
init_joint = {'joint': [-175, 90, 90, 45, 90, -90, 10, 175, 90, 90, -45, 90, 90, 10]},
motors={
# name: (index, model)
"left_joint_1": [1, "realman"],
"left_joint_2": [2, "realman"],
"left_joint_3": [3, "realman"],
"left_joint_4": [4, "realman"],
"left_joint_5": [5, "realman"],
"left_joint_6": [6, "realman"],
"left_gripper": [7, "realman"],
"right_joint_1": [8, "realman"],
"right_joint_2": [9, "realman"],
"right_joint_3": [10, "realman"],
"right_joint_4": [11, "realman"],
"right_joint_5": [12, "realman"],
"right_joint_6": [13, "realman"],
"right_gripper": [14, "realman"]
},
)
}
)
cameras: dict[str, CameraConfig] = field(
default_factory=lambda: {
"left": IntelRealSenseCameraConfig(
serial_number="153122077516",
fps=30,
width=640,
height=480,
use_depth=False
),
"right": IntelRealSenseCameraConfig(
serial_number="405622075165",
fps=30,
width=640,
height=480,
use_depth=False
),
"front": IntelRealSenseCameraConfig(
serial_number="145422072751",
fps=30,
width=640,
height=480,
use_depth=False
),
"high": IntelRealSenseCameraConfig(
serial_number="145422072193",
fps=30,
width=640,
height=480,
use_depth=False
),
}
)

View File

@@ -0,0 +1,306 @@
"""
Teleoperation Realman with a PS5 controller and
"""
import time
import torch
import numpy as np
from dataclasses import dataclass, field, replace
from collections import deque
from lerobot.common.robot_devices.teleop.gamepad_dual import HybridController
from lerobot.common.robot_devices.motors.utils import get_motor_names, make_motors_buses_from_configs
from lerobot.common.robot_devices.cameras.utils import make_cameras_from_configs
from lerobot.common.robot_devices.utils import RobotDeviceAlreadyConnectedError, RobotDeviceNotConnectedError
from lerobot.common.robot_devices.robots.configs import RealmanDualRobotConfig
class RealmanDualRobot:
def __init__(self, config: RealmanDualRobotConfig | None = None, **kwargs):
if config is None:
config = RealmanDualRobotConfig()
# Overwrite config arguments using kwargs
self.config = replace(config, **kwargs)
self.robot_type = self.config.type
self.inference_time = self.config.inference_time # if it is inference time
# build cameras
self.cameras = make_cameras_from_configs(self.config.cameras)
# build realman motors
self.piper_motors = make_motors_buses_from_configs(self.config.follower_arm)
self.arm = self.piper_motors['main']
# build init teleop info
self.init_info = {
'init_joint': self.arm.init_joint_position,
'init_pose': self.arm.init_pose,
'max_gripper': config.max_gripper,
'min_gripper': config.min_gripper,
'servo_config_file': config.servo_config_file
}
# build state-action cache
self.joint_queue = deque(maxlen=2)
self.last_endpose = self.arm.init_pose
# build gamepad teleop
if not self.inference_time:
self.teleop = HybridController(self.init_info)
else:
self.teleop = None
self.logs = {}
self.is_connected = False
@property
def camera_features(self) -> dict:
cam_ft = {}
for cam_key, cam in self.cameras.items():
key = f"observation.images.{cam_key}"
cam_ft[key] = {
"shape": (cam.height, cam.width, cam.channels),
"names": ["height", "width", "channels"],
"info": None,
}
return cam_ft
@property
def motor_features(self) -> dict:
action_names = get_motor_names(self.piper_motors)
state_names = get_motor_names(self.piper_motors)
return {
"action": {
"dtype": "float32",
"shape": (len(action_names),),
"names": action_names,
},
"observation.state": {
"dtype": "float32",
"shape": (len(state_names),),
"names": state_names,
},
}
@property
def has_camera(self):
return len(self.cameras) > 0
@property
def num_cameras(self):
return len(self.cameras)
def connect(self) -> None:
"""Connect RealmanArm and cameras"""
if self.is_connected:
raise RobotDeviceAlreadyConnectedError(
"RealmanArm is already connected. Do not run `robot.connect()` twice."
)
# connect RealmanArm
self.arm.connect(enable=True)
print("RealmanArm conneted")
# connect cameras
for name in self.cameras:
self.cameras[name].connect()
self.is_connected = self.is_connected and self.cameras[name].is_connected
print(f"camera {name} conneted")
print("All connected")
self.is_connected = True
self.run_calibration()
def disconnect(self) -> None:
"""move to home position, disenable piper and cameras"""
# move piper to home position, disable
if not self.inference_time:
self.teleop.stop()
# disconnect piper
self.arm.safe_disconnect()
print("RealmanArm disable after 5 seconds")
time.sleep(5)
self.arm.connect(enable=False)
# disconnect cameras
if len(self.cameras) > 0:
for cam in self.cameras.values():
cam.disconnect()
self.is_connected = False
def run_calibration(self):
"""move piper to the home position"""
if not self.is_connected:
raise ConnectionError()
self.arm.apply_calibration()
if not self.inference_time:
self.teleop.reset()
def teleop_step(
self, record_data=False
) -> None | tuple[dict[str, torch.Tensor], dict[str, torch.Tensor]]:
if not self.is_connected:
raise ConnectionError()
if self.teleop is None and self.inference_time:
self.teleop = HybridController(self.init_info)
# read target pose state as
before_read_t = time.perf_counter()
state = self.arm.read() # read current joint position from robot
action = self.teleop.get_action() # target joint position and pose end pos from gamepad
self.logs["read_pos_dt_s"] = time.perf_counter() - before_read_t
if action['control_mode'] == 'joint':
# 关节控制模式(主模式)
# do action
before_write_t = time.perf_counter()
left_joint = action['master_joint_actions'][:self.arm.left_offset]
# left_joint = [v*180 for v in action['master_joint_actions'][:self.arm.left_offset]]
left_gripper = int(action['master_joint_actions'][self.arm.left_offset] * 1000)
right_joint = action['master_joint_actions'][self.arm.left_offset+1:-1]
# right_joint = [v*180 for v in action['master_joint_actions'][self.arm.left_offset+1:-1]]
right_gripper = int(action['master_joint_actions'][-1] * 1000)
self.arm.write_dual_gripper(left_gripper, right_gripper)
# print(left_gripper, right_gripper)
if action['master_controller_status']['left']['infrared'] == 1:
if action['master_controller_status']['left']['button'] == 1:
self.arm.write_left_joint_canfd(left_joint)
else:
self.arm.write_left_joint_slow(left_joint)
print(action['master_controller_status']['left'], action['master_controller_status']['right'])
print('left', left_joint)
print(state)
if action['master_controller_status']['right']['infrared'] == 1:
if action['master_controller_status']['right']['button'] == 1:
self.arm.write_right_joint_canfd(right_joint)
print('right', left_joint)
else:
self.arm.write_right_joint_slow(right_joint)
print('right', left_joint)
self.joint_queue.append(list(self.arm.read().values()))
self.logs["write_pos_dt_s"] = time.perf_counter() - before_write_t
# else:
# target_pose = list(action['end_pose'])
# # do action
# before_write_t = time.perf_counter()
# if self.last_endpose != target_pose:
# self.arm.write_endpose_canfd(target_pose)
# self.last_endpose = target_pose
# self.arm.write_gripper(action['gripper'])
# target_joints = self.arm.read_current_arm_joint_state()
# self.joint_queue.append(list(self.arm.read().values()))
# self.teleop.update_joint_state(target_joints)
# self.logs["write_pos_dt_s"] = time.perf_counter() - before_write_t
if not record_data:
return
state = torch.as_tensor(list(self.joint_queue[0]), dtype=torch.float32)
action = torch.as_tensor(list(self.joint_queue[-1]), dtype=torch.float32)
# Capture images from cameras
images = {}
for name in self.cameras:
before_camread_t = time.perf_counter()
images[name] = self.cameras[name].async_read()
images[name] = torch.from_numpy(images[name])
self.logs[f"read_camera_{name}_dt_s"] = self.cameras[name].logs["delta_timestamp_s"]
self.logs[f"async_read_camera_{name}_dt_s"] = time.perf_counter() - before_camread_t
# Populate output dictionnaries
obs_dict, action_dict = {}, {}
obs_dict["observation.state"] = state
action_dict["action"] = action
for name in self.cameras:
obs_dict[f"observation.images.{name}"] = images[name]
return obs_dict, action_dict
def send_action(self, action: torch.Tensor) -> torch.Tensor:
"""Write the predicted actions from policy to the motors"""
if not self.is_connected:
raise RobotDeviceNotConnectedError(
"Piper is not connected. You need to run `robot.connect()`."
)
# send to motors, torch to list
target_joints = action.tolist()
len_joint = len(target_joints) - 1
target_joints = [target_joints[i]*180 for i in range(len_joint)] + [target_joints[-1]]
target_joints[-1] = int(target_joints[-1]*500 + 500)
self.arm.write(target_joints)
return action
def capture_observation(self) -> dict:
"""capture current images and joint positions"""
if not self.is_connected:
raise RobotDeviceNotConnectedError(
"Piper is not connected. You need to run `robot.connect()`."
)
# read current joint positions
before_read_t = time.perf_counter()
state = self.arm.read() # 6 joints + 1 gripper
self.logs["read_pos_dt_s"] = time.perf_counter() - before_read_t
state = torch.as_tensor(list(state.values()), dtype=torch.float32)
# read images from cameras
images = {}
for name in self.cameras:
before_camread_t = time.perf_counter()
images[name] = self.cameras[name].async_read()
images[name] = torch.from_numpy(images[name])
self.logs[f"read_camera_{name}_dt_s"] = self.cameras[name].logs["delta_timestamp_s"]
self.logs[f"async_read_camera_{name}_dt_s"] = time.perf_counter() - before_camread_t
# Populate output dictionnaries and format to pytorch
obs_dict = {}
obs_dict["observation.state"] = state
for name in self.cameras:
obs_dict[f"observation.images.{name}"] = images[name]
return obs_dict
def teleop_safety_stop(self):
""" move to home position after record one episode """
self.run_calibration()
def __del__(self):
if self.is_connected:
self.disconnect()
if not self.inference_time:
self.teleop.stop()
if __name__ == '__main__':
robot = RealmanDualRobot()
robot.connect()
# robot.run_calibration()
while True:
robot.teleop_step(record_data=True)
# robot.capture_observation()
# dummy_action = torch.Tensor([-0.40586111280653214, 0.5522833506266276, 0.4998166826036241, -0.3539944542778863, -0.524433347913954, 0.9064999898274739, 0.482])
# robot.send_action(dummy_action)
# robot.disconnect()
# print('ok')

View File

@@ -0,0 +1,63 @@
import pygame
import threading
import time
import serial
import binascii
import logging
import yaml
from typing import Dict
from Robotic_Arm.rm_robot_interface import *
from .servo_server import ServoArmServer
class HybridController:
def __init__(self, init_info):
self.init_joint = init_info['init_joint']
self.init_pose = init_info.get('init_pose', [0]*12)
self.max_gripper = init_info['max_gripper']
self.min_gripper = init_info['min_gripper']
self.config_file = init_info['servo_config_file']
self.joint = self.init_joint.copy()
self.pose = self.init_pose.copy()
self.master_dual_arm = ServoArmServer(self.config_file)
self.joint_control_mode = True
def get_action(self) -> Dict:
master_controller_status = {}
master_controller_status['left'] = self.master_dual_arm.get_controller_status(arm='left')
master_controller_status['right'] = self.master_dual_arm.get_controller_status(arm='left')
return {
'control_mode': 'joint' if self.joint_control_mode else 'end_pose',
'master_joint_actions': self.master_dual_arm.get_joint_data()['dual_joint_actions'],
'master_controller_status': master_controller_status,
'end_pose': self.pose
}
def stop(self):
if self.master_dual_arm:
self.master_dual_arm.shutdown()
print("混合控制器已退出")
def reset(self):
pass
# 使用示例
if __name__ == "__main__":
init_info = {
'init_joint': [-175, 90, 90, 45, 90, -90, 10, 175, 90, 90, -45, 90, 90, 10],
'init_pose': [[-0.0305, 0.125938, 0.13153, 3.141, 0.698, -1.57, -0.030486, -0.11487, 0.144707, 3.141, 0.698, 1.57]],
'max_gripper': 990,
'min_gripper': 10,
'servo_config_file': '/home/maic/LYT/lerobot/lerobot/common/robot_devices/teleop/servo_dual.yaml'
}
arm_controller = HybridController(init_info)
time.sleep(1)
try:
while True:
print(arm_controller.get_action())
time.sleep(1)
except KeyboardInterrupt:
arm_controller.stop()

View File

@@ -0,0 +1,6 @@
left_port: /dev/ttyUSB0
right_port: /dev/ttyUSB1
baudrate: 460800
joint_hex_data: "55 AA 02 00 00 67"
control_hex_data: "55 AA 08 00 00 B9"
arm_axis: 6

View File

@@ -0,0 +1,294 @@
import threading
import time
import serial
import binascii
import logging
import yaml
from typing import Dict
# logging.basicConfig(
# level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
# )
class ServoArmServer:
def __init__(self, config_file="servo_dual.yaml"):
"""初始化服务器,创建左右机械臂实例"""
self.config_file = config_file
self.left_servo_arm = None
self.right_servo_arm = None
self.running = False
self.data_lock = threading.Lock()
self.latest_data = {
'left_joint_actions': {},
'right_joint_actions': {},
'timestamp': time.time()
}
# 初始化机械臂
self._initialize_arms()
# 启动数据采集线程
self._start_data_collection()
def _initialize_arms(self):
"""初始化左右机械臂"""
try:
self.left_servo_arm = ServoArm(self.config_file, "left_port")
logging.info("左master机械臂初始化成功")
except Exception as e:
logging.error(f"左master机械臂初始化失败: {e}")
try:
self.right_servo_arm = ServoArm(self.config_file, "right_port")
logging.info("右master机械臂初始化成功")
except Exception as e:
logging.error(f"右master机械臂初始化失败: {e}")
def _start_data_collection(self):
"""启动数据采集线程"""
self.running = True
self.data_thread = threading.Thread(target=self._data_collection_loop)
self.data_thread.daemon = True
self.data_thread.start()
logging.info("数据采集线程已启动")
def _data_collection_loop(self):
"""数据采集循环"""
while self.running:
try:
left_actions = {}
right_actions = {}
# 获取左机械臂数据
if self.left_servo_arm and self.left_servo_arm.connected:
left_actions = self.left_servo_arm.get_joint_actions()
# 获取右机械臂数据
if self.right_servo_arm and self.right_servo_arm.connected:
right_actions = self.right_servo_arm.get_joint_actions()
if self._check_val_safety(left_actions) == False or self._check_val_safety(right_actions) == False:
continue
# 更新最新数据
with self.data_lock:
self.latest_data = {
'dual_joint_actions': [left_actions[k] for k in left_actions] + [right_actions[k] for k in right_actions],
'left_joint_actions': left_actions,
'right_joint_actions': right_actions,
'timestamp': time.time()
}
time.sleep(0.02) # 50Hz采集频率
except Exception as e:
logging.error(f"数据采集错误: {e}")
time.sleep(0.1)
def _check_val_safety(self, data: dict):
data = [data[k] for k in data]
ret = True
if len(data) != self.left_servo_arm.arm_axis + 1:
ret = False
for v in data:
if v > 180 or v < -180:
ret = False
return ret
# ZeroRPC 服务方法
def get_joint_data(self):
"""获取最新的关节数据"""
with self.data_lock:
return self.latest_data.copy()
def get_left_joint_actions(self):
"""获取左机械臂关节数据"""
with self.data_lock:
return {
'data': self.latest_data['left_joint_actions'],
'timestamp': self.latest_data['timestamp']
}
def get_right_joint_actions(self):
"""获取右机械臂关节数据"""
with self.data_lock:
return {
'data': self.latest_data['right_joint_actions'],
'timestamp': self.latest_data['timestamp']
}
def get_controller_status(self, arm='left'):
"""获取控制器状态"""
try:
if arm == 'left' and self.left_servo_arm:
return self.left_servo_arm.get_controller_status()
elif arm == 'right' and self.right_servo_arm:
return self.right_servo_arm.get_controller_status()
else:
return {'error': f'Invalid arm: {arm}'}
except Exception as e:
logging.error(f"获取控制器状态错误: {e}")
return {'error': str(e)}
def get_connection_status(self):
"""获取连接状态"""
return {
'left_connected': self.left_servo_arm.connected if self.left_servo_arm else False,
'right_connected': self.right_servo_arm.connected if self.right_servo_arm else False,
'server_running': self.running
}
def ping(self):
"""测试连接"""
return "pong"
def shutdown(self):
"""关闭服务器"""
logging.info("正在关闭服务器...")
self.running = False
if self.left_servo_arm:
self.left_servo_arm.close()
if self.right_servo_arm:
self.right_servo_arm.close()
return "Server shutdown"
class ServoArm:
def __init__(self, config_file="config.yaml", port_name="left_port"):
"""初始化机械臂的串口连接并发送初始数据。
Args:
config_file (str): 配置文件的路径。
"""
self.config = self._load_config(config_file)
self.port = self.config[port_name]
self.baudrate = self.config["baudrate"]
self.joint_hex_data = self.config["joint_hex_data"]
self.control_hex_data = self.config["control_hex_data"]
self.arm_axis = self.config.get("arm_axis", 7)
try:
self.serial_conn = serial.Serial(self.port, self.baudrate, timeout=0)
self.bytes_to_send = binascii.unhexlify(self.joint_hex_data.replace(" ", ""))
self.serial_conn.write(self.bytes_to_send)
time.sleep(1)
self.connected = True
logging.info(f"串口连接成功: {self.port}")
except Exception as e:
logging.error(f"串口连接失败: {e}")
self.connected = False
def _load_config(self, config_file):
"""加载配置文件。
Args:
config_file (str): 配置文件的路径。
Returns:
dict: 配置文件内容。
"""
try:
with open(config_file, "r") as file:
config = yaml.safe_load(file)
return config
except Exception as e:
logging.error(f"配置文件加载失败: {e}")
# 返回默认配置
return {
"port": "/dev/ttyUSB0",
"baudrate": 460800,
"joint_hex_data": "55 AA 02 00 00 67",
"control_hex_data": "55 AA 08 00 00 B9",
"arm_axis": 6
}
def _bytes_to_signed_int(self, byte_data):
"""将字节数据转换为有符号整数。
Args:
byte_data (bytes): 字节数据。
Returns:
int: 有符号整数。
"""
return int.from_bytes(byte_data, byteorder="little", signed=True)
def _parse_joint_data(self, hex_received):
"""解析接收到的十六进制数据并提取关节数据。
Args:
hex_received (str): 接收到的十六进制字符串数据。
Returns:
dict: 解析后的关节数据。
"""
logging.debug(f"hex_received: {hex_received}")
joints = {}
for i in range(self.arm_axis):
start = 14 + i * 10
end = start + 8
joint_hex = hex_received[start:end]
joint_byte_data = bytearray.fromhex(joint_hex)
joint_value = self._bytes_to_signed_int(joint_byte_data) / 10000.0
joints[f"joint_{i+1}"] = joint_value #/ 180
grasp_start = 14 + self.arm_axis*10
grasp_hex = hex_received[grasp_start:grasp_start+8]
grasp_byte_data = bytearray.fromhex(grasp_hex)
# 夹爪进行归一化处理
grasp_value = self._bytes_to_signed_int(grasp_byte_data)/1000
joints["grasp"] = grasp_value
return joints
def _parse_controller_data(self, hex_received):
status = {
'infrared': 0,
'button': 0
}
if len(hex_received) == 18:
status['infrared'] = self._bytes_to_signed_int(bytearray.fromhex(hex_received[12:14]))
status['button'] = self._bytes_to_signed_int(bytearray.fromhex(hex_received[14:16]))
# print(infrared)
return status
def get_joint_actions(self):
"""从串口读取数据并解析关节动作。
Returns:
dict: 包含关节数据的字典。
"""
if not self.connected:
return {}
try:
self.serial_conn.write(self.bytes_to_send)
time.sleep(0.025)
bytes_received = self.serial_conn.read(self.serial_conn.inWaiting())
if len(bytes_received) == 0:
return {}
hex_received = binascii.hexlify(bytes_received).decode("utf-8").upper()
actions = self._parse_joint_data(hex_received)
return actions
except Exception as e:
logging.error(f"读取串口数据错误: {e}")
return {}
def get_controller_status(self):
bytes_to_send = binascii.unhexlify(self.control_hex_data.replace(" ", ""))
self.serial_conn.write(bytes_to_send)
time.sleep(0.025)
bytes_received = self.serial_conn.read(self.serial_conn.inWaiting())
hex_received = binascii.hexlify(bytes_received).decode("utf-8").upper()
# print("control status:", hex_received)
status = self._parse_controller_data(hex_received)
return status
def close(self):
"""关闭串口连接"""
if self.connected and hasattr(self, 'serial_conn'):
self.serial_conn.close()
self.connected = False
logging.info("串口连接已关闭")