dual arm fisrt complete with master arm
Some checks failed
Secret Leaks / trufflehog (push) Failing after 1m30s

This commit is contained in:
2025-07-03 19:44:10 +08:00
parent 96804bc86c
commit b04e6e0c7b
6 changed files with 517 additions and 260 deletions

View File

@@ -1,31 +1,117 @@
import time
import threading
from typing import Dict
from dataclasses import dataclass
from contextlib import contextmanager
from lerobot.common.robot_devices.motors.configs import RealmanDualMotorsBusConfig
from Robotic_Arm.rm_robot_interface import *
def compare_joint_difference(master_joints, follow_joints, threshold=30.0):
"""
比较主臂和从臂关节数据的差异
Args:
master_joints (list): 主臂关节数据 [joint1, joint2, ..., joint6]
follow_joints (list): 从臂关节数据 [joint1, joint2, ..., joint6]
threshold (float): 差异阈值默认5度
Returns:
bool: True表示差异在阈值内False表示超过阈值
"""
# 检查数据长度
if len(master_joints) != len(follow_joints):
return False
# 计算每个关节的绝对差异
for i in range(len(master_joints)):
diff = abs(master_joints[i] - follow_joints[i])
if diff > threshold:
return False
return True
@dataclass
class ArmState:
"""机械臂状态数据类"""
joint_positions: list
gripper_position: int
pose: list
class RealmanDualMotorsBus:
"""
对Realman SDK的二次封装
"""
def __init__(self,
config: RealmanDualMotorsBusConfig):
def __init__(self, config: RealmanDualMotorsBusConfig):
self.config = config
self._initialize_arms()
self._initialize_parameters()
self._initialize_positions()
self._initialize_threading()
def _initialize_arms(self):
"""初始化机械臂连接"""
self.left_rmarm = RoboticArm(rm_thread_mode_e.RM_TRIPLE_MODE_E)
self.right_rmarm = RoboticArm(rm_thread_mode_e.RM_TRIPLE_MODE_E)
self.handle_left = self.left_rmarm.rm_create_robot_arm(config.left_ip, config.left_port)
self.handle_right = self.right_rmarm.rm_create_robot_arm(config.right_ip, config.right_port)
self.motors = config.motors
self.axis = config.axis
self.joint_count = self.axis['left_joint']+self.axis['left_gripper'] + self.axis['right_joint'] + self.axis['right_gripper']
self.init_joint_position = config.init_joint['joint'] # [6 joints + 1 gripper + 6 joints + 1 gripper]
self.safe_disable_position = config.init_joint['joint']
self.handle_left = self.left_rmarm.rm_create_robot_arm(
self.config.left_ip, self.config.left_port
)
self.handle_right = self.right_rmarm.rm_create_robot_arm(
self.config.right_ip, self.config.right_port
)
def _initialize_parameters(self):
"""初始化参数"""
self.motors = self.config.motors
self.axis = self.config.axis
self.joint_count = sum(self.axis.values())
self.left_offset = self.axis['left_joint']
self.left_rmarm.rm_movej(self.init_joint_position[:self.left_offset], 5, 0, 0, 1)
self.right_rmarm.rm_movej(self.init_joint_position[self.left_offset+1:-1], 5, 0, 0, 1)
def _initialize_positions(self):
"""初始化位置"""
self.init_joint_position = self.config.init_joint['joint']
self.safe_disable_position = self.config.init_joint['joint']
# 移动到初始位置
self._move_to_initial_position()
# 获取初始姿态
time.sleep(3)
left_ret = self.left_rmarm.rm_get_current_arm_state()
right_ret = self.right_rmarm.rm_get_current_arm_state()
self.init_pose = left_ret[1]['pose'] + right_ret[1]['pose']
self.init_pose = self._get_initial_pose()
def _initialize_threading(self):
"""初始化线程控制"""
self.left_slow_busy = False
self.right_slow_busy = False
self._thread_lock = threading.Lock()
# 添加读取相关的线程控制
self._state_cache = {}
self._cache_lock = threading.Lock()
self._keep_reading = True
# 启动后台读取线程
self._start_background_readers()
def _start_background_readers(self):
"""启动后台读取线程"""
# 左臂读取线程
threading.Thread(
target=self._left_read_task,
daemon=True,
name="left_arm_reader"
).start()
# 右臂读取线程
threading.Thread(
target=self._right_read_task,
daemon=True,
name="right_arm_reader"
).start()
@property
def motor_names(self) -> list[str]:
@@ -39,75 +125,115 @@ class RealmanDualMotorsBus:
def motor_indices(self) -> list[int]:
return [idx for idx, _ in self.motors.values()]
def connect(self, enable=True) -> bool:
'''
使能机械臂并检测使能状态,尝试5s,如果使能超时则退出程序
'''
enable_flag = False
loop_flag = False
# 设置超时时间(秒)
timeout = 5
# 记录进入循环前的时间
@contextmanager
def _timeout_context(self, timeout: float = 5.0):
"""超时上下文管理器"""
start_time = time.time()
elapsed_time_flag = False
try:
yield lambda: time.time() - start_time < timeout
except Exception as e:
raise TimeoutError(f"操作超时: {e}")
def _left_read_task(self):
"""左臂后台读取任务 - 模仿_left_slow_task的风格"""
while self._keep_reading:
try:
left_state = self._read_arm_state(self.left_rmarm, "left")
with self._cache_lock:
self._state_cache.update(left_state)
except Exception as e:
print(f"左臂读取失败: {e}")
time.sleep(0.005)
def _right_read_task(self):
"""右臂后台读取任务 - 模仿_right_slow_task的风格"""
while self._keep_reading:
try:
right_state = self._read_arm_state(self.right_rmarm, "right")
with self._cache_lock:
self._state_cache.update(right_state)
except Exception as e:
print(f"右臂读取失败: {e}")
time.sleep(0.005)
def _read_arm_state(self, arm: RoboticArm, prefix: str) -> dict:
"""读取单臂状态 - 保持原有逻辑"""
joint_msg = arm.rm_get_current_arm_state()[1]
gripper_msg = arm.rm_get_gripper_state()[1]
while not loop_flag:
elapsed_time = time.time() - start_time
print("--------------------")
joint_state = joint_msg['joint']
gripper_state = gripper_msg['actpos']
state_dict = {}
for i in range(6):
state_dict[f"{prefix}_joint_{i+1}"] = joint_state[i] / 180
state_dict[f"{prefix}_gripper"] = (gripper_state - 500) / 500
return state_dict
def _move_to_initial_position(self):
"""移动到初始位置"""
left_joints = self.init_joint_position[:self.left_offset]
right_joints = self.init_joint_position[self.left_offset+1:-1]
self.left_rmarm.rm_movej(left_joints, 5, 0, 0, 1)
self.right_rmarm.rm_movej(right_joints, 5, 0, 0, 1)
def _get_initial_pose(self) -> list:
"""获取初始姿态"""
left_ret = self.left_rmarm.rm_get_current_arm_state()
right_ret = self.right_rmarm.rm_get_current_arm_state()
return left_ret[1]['pose'] + right_ret[1]['pose']
def _validate_joint_count(self, joints: list, expected_count: int):
"""验证关节数量"""
if len(joints) != expected_count:
raise ValueError(f"关节数量不匹配: 期望 {expected_count}, 实际 {len(joints)}")
def _extract_joint_state(self, state: dict, arm_prefix: str) -> list:
"""提取关节状态"""
return [state[v] * 180 for v in state if f"{arm_prefix}" in v]
def _execute_slow_movement(self, arm: str, joint_data: list):
"""执行慢速运动"""
busy_flag = f"{arm}_slow_busy"
if not getattr(self, busy_flag):
setattr(self, busy_flag, True)
if enable:
# 获取机械臂状态
left_ret = self.left_rmarm.rm_get_current_arm_state()
right_ret = self.right_rmarm.rm_get_current_arm_state()
if left_ret[0] == 0 and right_ret[0] == 0: # 成功获取状态
enable_flag = True
target_method = getattr(self, f"_{arm}_slow_task")
threading.Thread(
target=target_method,
args=(joint_data.copy(),),
daemon=True
).start()
def _left_slow_task(self, joint_data: list):
"""左臂慢速任务"""
try:
self.write_left_joint_slow(joint_data)
finally:
self.left_slow_busy = False
def _right_slow_task(self, joint_data: list):
"""右臂慢速任务"""
try:
self.write_right_joint_slow(joint_data)
finally:
self.right_slow_busy = False
def _execute_arm_action(self, arm: str, action: dict, master_joint: list, follow_joint: list):
"""执行单臂动作"""
controller_status = action['master_controller_status'][arm]
if controller_status['infrared'] == 1:
if compare_joint_difference(master_joint, follow_joint):
if arm == 'left':
self.write_left_joint_canfd(master_joint)
else:
self.write_right_joint_canfd(master_joint)
else:
enable_flag = False
# 断开所有连接,销毁线程
RoboticArm.rm_destory()
print("使能状态:", enable_flag)
print("--------------------")
if(enable_flag == enable):
loop_flag = True
enable_flag = True
else:
loop_flag = False
enable_flag = False
# 检查是否超过超时时间
if elapsed_time > timeout:
print("超时....")
elapsed_time_flag = True
enable_flag = True
break
time.sleep(1)
self._execute_slow_movement(arm, master_joint)
resp = enable_flag
print(f"Returning response: {resp}")
return resp
def motor_names(self):
return
def set_calibration(self):
return
def revert_calibration(self):
return
def apply_calibration(self):
"""
移动到初始位置
"""
self.write(target_joint=self.init_joint_position)
def write(self, target_joint: list):
assert len(target_joint) == self.joint_count, "len of target joint is not equal the count of joint"
self.left_rmarm.rm_movej_follow(target_joint[:self.left_offset])
self.left_rmarm.rm_set_gripper_position(target_joint[self.left_offset], block=False, timeout=2)
self.right_rmarm.rm_movej_follow(target_joint[self.left_offset+1:-1])
self.right_rmarm.rm_set_gripper_position(target_joint[-1], block=False, timeout=2)
def write_endpose(self, target_endpose: list):
assert target_endpose == 12, "the length of target pose is not equal 12"
self.left_rmarm.rm_movej_p(target_endpose[:6], 50, 0, 0, 1)
@@ -115,18 +241,16 @@ class RealmanDualMotorsBus:
def write_left_joint_slow(self, left_joint: list):
assert len(left_joint) == self.left_offset, "len of left master joint is not equal the count of left joint"
self.left_rmarm.rm_movej(left_joint, 5, 0, 0, 0)
# self.left_rmarm.rm_movej_follow(left_joint)
self.left_rmarm.rm_movej(left_joint, 5, 0, 0, 1)
def write_right_joint_slow(self, right_joint: list):
assert len(right_joint) == self.left_offset, "len of right master joint is not equal the count of right joint"
self.right_rmarm.rm_movej(right_joint, 5, 0, 0, 0)
# self.right_rmarm.rm_movej_follow(right_joint)
self.right_rmarm.rm_movej(right_joint, 5, 0, 0, 1)
def write_left_joint_canfd(self, left_joint: list):
assert len(left_joint) == self.left_offset, "len of left master joint is not equal the count of left joint"
self.left_rmarm.rm_movej_canfd(left_joint, False)
def write_right_joint_canfd(self, right_joint: list):
assert len(right_joint) == self.left_offset, "len of right master joint is not equal the count of right joint"
self.right_rmarm.rm_movej_canfd(right_joint, False)
@@ -140,48 +264,91 @@ class RealmanDualMotorsBus:
self.left_rmarm.rm_set_gripper_position(left_gripper, False, 2)
self.right_rmarm.rm_set_gripper_position(right_gripper, False, 2)
def read(self) -> Dict:
"""
- 机械臂关节消息,单位1度;[-1, 1]
- 机械臂夹爪消息,[-1, 1]
"""
left_joint_msg = self.left_rmarm.rm_get_current_arm_state()[1]
left_joint_state = left_joint_msg['joint']
right_joint_msg = self.right_rmarm.rm_get_current_arm_state()[1]
right_joint_state = right_joint_msg['joint']
left_gripper_msg = self.left_rmarm.rm_get_gripper_state()[1]
left_gripper_state = left_gripper_msg['actpos']
right_gripper_msg = self.right_rmarm.rm_get_gripper_state()[1]
right_gripper_state = right_gripper_msg['actpos']
return {
"left_joint_1": left_joint_state[0]/180,
"left_joint_2": left_joint_state[1]/180,
"left_joint_3": left_joint_state[2]/180,
"left_joint_4": left_joint_state[3]/180,
"left_joint_5": left_joint_state[4]/180,
"left_joint_6": left_joint_state[5]/180,
"left_gripper": (left_gripper_state-500)/500,
"right_joint_1": right_joint_state[0]/180,
"right_joint_2": right_joint_state[1]/180,
"right_joint_3": right_joint_state[2]/180,
"right_joint_4": right_joint_state[3]/180,
"right_joint_5": right_joint_state[4]/180,
"right_joint_6": right_joint_state[5]/180,
"right_gripper": (right_gripper_state-500)/500
}
def read_current_arm_joint_state(self):
return self.left_rmarm.rm_get_current_arm_state()[1]['joint'] + self.right_rmarm.rm_get_current_arm_state()[1]['joint']
def read_current_arm_endpose_state(self):
return self.left_rmarm.rm_get_current_arm_state()[1]['pose'] + self.right_rmarm.rm_get_current_arm_state()[1]['pose']
########################## lerobot function ##########################
def safe_disconnect(self):
"""
Move to safe disconnect position
def connect(self, enable: bool = True) -> bool:
"""使能机械臂并检测使能状态"""
with self._timeout_context() as is_timeout_valid:
while is_timeout_valid():
try:
if enable:
left_ret = self.left_rmarm.rm_get_current_arm_state()
right_ret = self.right_rmarm.rm_get_current_arm_state()
if left_ret[0] == 0 and right_ret[0] == 0:
print("机械臂使能成功")
return True
else:
RoboticArm.rm_destory()
print("机械臂断开连接")
return True
except Exception as e:
print(f"连接异常: {e}")
time.sleep(1)
print("连接超时")
return False
def set_calibration(self):
raise NotImplementedError
def revert_calibration(self):
raise NotImplementedError
def apply_calibration(self):
"""
self.write(target_joint=self.safe_disable_position)
# 断开所有连接,销毁线程
RoboticArm.rm_destory()
移动到初始位置
"""
self.write(target_joint=self.init_joint_position)
def write(self, target_joint: list):
"""写入关节位置"""
self._validate_joint_count(target_joint, self.joint_count)
left_joints = target_joint[:self.left_offset]
left_gripper = target_joint[self.left_offset]
right_joints = target_joint[self.left_offset+1:-1]
right_gripper = target_joint[-1]
self.left_rmarm.rm_movej_follow(left_joints)
self.left_rmarm.rm_set_gripper_position(left_gripper, block=False, timeout=2)
self.right_rmarm.rm_movej_follow(right_joints)
self.right_rmarm.rm_set_gripper_position(right_gripper, block=False, timeout=2)
def write_action(self, action: dict, state: dict):
# 提取状态数据
follow_left_joint = self._extract_joint_state(state, "left_joint")
follow_right_joint = self._extract_joint_state(state, "right_joint")
# 提取动作数据
master_left_joint = action['left_joint_actions']
master_right_joint = action['right_joint_actions']
left_gripper = int(action['left_gripper_actions'] * 1000)
right_gripper = int(action['right_gripper_actions'] * 1000)
# 执行夹爪动作
self.write_dual_gripper(left_gripper, right_gripper)
# 执行关节动作
self._execute_arm_action('left', action, master_left_joint, follow_left_joint)
self._execute_arm_action('right', action, master_right_joint, follow_right_joint)
def read(self) -> Dict:
"""读取机械臂状态 - 直接从缓存获取"""
with self._cache_lock:
return self._state_cache.copy()
def safe_disconnect(self):
"""安全断开连接"""
try:
self.write(target_joint=self.safe_disable_position)
time.sleep(2) # 等待移动完成
except Exception as e:
print(f"移动到安全位置失败: {e}")
finally:
RoboticArm.rm_destory()
########################## lerobot function ##########################

View File

@@ -765,7 +765,7 @@ class RealmanDualRobotConfig(RobotConfig):
left_port = 8080,
right_ip = "192.168.3.19",
right_port = 8080,
init_joint = {'joint': [-175, 90, 90, 45, 90, -90, 10, 175, 90, 90, -45, 90, 90, 10]},
init_joint = {'joint': [-170, 90, 0, 90, 120, 0, 10, 170, 90, 0, -90, 120, 0, 10]},
motors={
# name: (index, model)
"left_joint_1": [1, "realman"],

View File

@@ -5,6 +5,8 @@
import time
import torch
import numpy as np
import logging
from typing import Optional, Tuple, Dict
from dataclasses import dataclass, field, replace
from collections import deque
from lerobot.common.robot_devices.teleop.gamepad_dual import HybridController
@@ -14,6 +16,7 @@ from lerobot.common.robot_devices.utils import RobotDeviceAlreadyConnectedError,
from lerobot.common.robot_devices.robots.configs import RealmanDualRobotConfig
class RealmanDualRobot:
def __init__(self, config: RealmanDualRobotConfig | None = None, **kwargs):
if config is None:
@@ -39,19 +42,84 @@ class RealmanDualRobot:
'servo_config_file': config.servo_config_file
}
# build state-action cache
self.joint_queue = deque(maxlen=2)
self.last_endpose = self.arm.init_pose
# 初始化遥操作
self._initialize_teleop()
# init state
self._initialize_state()
# build gamepad teleop
def _initialize_teleop(self):
"""初始化遥操作"""
self.init_info = {
'init_joint': self.arm.init_joint_position,
'init_pose': self.arm.init_pose,
'max_gripper': self.config.max_gripper,
'min_gripper': self.config.min_gripper,
'servo_config_file': self.config.servo_config_file
}
if not self.inference_time:
self.teleop = HybridController(self.init_info)
else:
self.teleop = None
def _initialize_state(self):
"""初始化状态"""
self.joint_queue = deque(maxlen=2)
self.last_endpose = self.arm.init_pose
self.logs = {}
self.is_connected = False
def _read_robot_state(self) -> dict:
"""读取机器人状态"""
before_read_t = time.perf_counter()
state = self.arm.read()
self.logs["read_pos_dt_s"] = time.perf_counter() - before_read_t
return state
def _execute_action(self, action: dict, state: dict):
"""执行动作"""
before_write_t = time.perf_counter()
if action['control_mode'] == 'joint':
self.arm.write_action(action, state)
# 可以添加其他控制模式的处理
self.logs["write_pos_dt_s"] = time.perf_counter() - before_write_t
def _prepare_record_data(self) -> Tuple[Dict, Dict]:
"""准备记录数据"""
if len(self.joint_queue) < 2:
return {}, {}
state = torch.as_tensor(list(self.joint_queue[0]), dtype=torch.float32)
action = torch.as_tensor(list(self.joint_queue[-1]), dtype=torch.float32)
# 捕获图像
images = self._capture_images()
# 构建输出字典
obs_dict = {
"observation.state": state,
**{f"observation.images.{name}": img for name, img in images.items()}
}
action_dict = {"action": action}
return obs_dict, action_dict
def _update_state_queue(self):
"""更新状态队列"""
current_state = list(self.arm.read().values())
self.joint_queue.append(current_state)
def _capture_images(self) -> Dict[str, torch.Tensor]:
"""捕获图像"""
images = {}
for name, camera in self.cameras.items():
before_camread_t = time.perf_counter()
image = camera.async_read()
images[name] = torch.from_numpy(image)
self.logs[f"read_camera_{name}_dt_s"] = camera.logs["delta_timestamp_s"]
self.logs[f"async_read_camera_{name}_dt_s"] = time.perf_counter() - before_camread_t
return images
@property
def camera_features(self) -> dict:
cam_ft = {}
@@ -143,7 +211,6 @@ class RealmanDualRobot:
if not self.inference_time:
self.teleop.reset()
def teleop_step(
self, record_data=False
) -> None | tuple[dict[str, torch.Tensor], dict[str, torch.Tensor]]:
@@ -153,83 +220,23 @@ class RealmanDualRobot:
if self.teleop is None and self.inference_time:
self.teleop = HybridController(self.init_info)
# read target pose state as
before_read_t = time.perf_counter()
state = self.arm.read() # read current joint position from robot
action = self.teleop.get_action() # target joint position and pose end pos from gamepad
self.logs["read_pos_dt_s"] = time.perf_counter() - before_read_t
if action['control_mode'] == 'joint':
# 关节控制模式(主模式)
# do action
before_write_t = time.perf_counter()
left_joint = action['master_joint_actions'][:self.arm.left_offset]
# left_joint = [v*180 for v in action['master_joint_actions'][:self.arm.left_offset]]
left_gripper = int(action['master_joint_actions'][self.arm.left_offset] * 1000)
right_joint = action['master_joint_actions'][self.arm.left_offset+1:-1]
# right_joint = [v*180 for v in action['master_joint_actions'][self.arm.left_offset+1:-1]]
right_gripper = int(action['master_joint_actions'][-1] * 1000)
self.arm.write_dual_gripper(left_gripper, right_gripper)
# print(left_gripper, right_gripper)
if action['master_controller_status']['left']['infrared'] == 1:
if action['master_controller_status']['left']['button'] == 1:
self.arm.write_left_joint_canfd(left_joint)
else:
self.arm.write_left_joint_slow(left_joint)
print(action['master_controller_status']['left'], action['master_controller_status']['right'])
print('left', left_joint)
print(state)
try:
# 读取当前状态
state = self._read_robot_state()
# 获取动作
action = self.teleop.get_action()
# 执行动作
self._execute_action(action, state)
# 更新状态队列
self._update_state_queue()
if action['master_controller_status']['right']['infrared'] == 1:
if action['master_controller_status']['right']['button'] == 1:
self.arm.write_right_joint_canfd(right_joint)
print('right', left_joint)
else:
self.arm.write_right_joint_slow(right_joint)
print('right', left_joint)
self.joint_queue.append(list(self.arm.read().values()))
self.logs["write_pos_dt_s"] = time.perf_counter() - before_write_t
# else:
# target_pose = list(action['end_pose'])
# # do action
# before_write_t = time.perf_counter()
# if self.last_endpose != target_pose:
# self.arm.write_endpose_canfd(target_pose)
# self.last_endpose = target_pose
# self.arm.write_gripper(action['gripper'])
if record_data:
data = self._prepare_record_data()
return data
# target_joints = self.arm.read_current_arm_joint_state()
# self.joint_queue.append(list(self.arm.read().values()))
# self.teleop.update_joint_state(target_joints)
# self.logs["write_pos_dt_s"] = time.perf_counter() - before_write_t
if not record_data:
return
state = torch.as_tensor(list(self.joint_queue[0]), dtype=torch.float32)
action = torch.as_tensor(list(self.joint_queue[-1]), dtype=torch.float32)
# Capture images from cameras
images = {}
for name in self.cameras:
before_camread_t = time.perf_counter()
images[name] = self.cameras[name].async_read()
images[name] = torch.from_numpy(images[name])
self.logs[f"read_camera_{name}_dt_s"] = self.cameras[name].logs["delta_timestamp_s"]
self.logs[f"async_read_camera_{name}_dt_s"] = time.perf_counter() - before_camread_t
# Populate output dictionnaries
obs_dict, action_dict = {}, {}
obs_dict["observation.state"] = state
action_dict["action"] = action
for name in self.cameras:
obs_dict[f"observation.images.{name}"] = images[name]
return obs_dict, action_dict
except Exception as e:
logging.error(f"遥操作步骤失败: {e}")
return None
def send_action(self, action: torch.Tensor) -> torch.Tensor:
@@ -249,7 +256,6 @@ class RealmanDualRobot:
return action
def capture_observation(self) -> dict:
"""capture current images and joint positions"""
if not self.is_connected:

View File

@@ -25,7 +25,8 @@ from lerobot.common.robot_devices.robots.configs import (
So100RobotConfig,
So101RobotConfig,
StretchRobotConfig,
RealmanRobotConfig
RealmanRobotConfig,
RealmanDualRobotConfig
)
@@ -87,6 +88,11 @@ def make_robot_from_config(config: RobotConfig):
from lerobot.common.robot_devices.robots.realman import RealmanRobot
return RealmanRobot(config)
elif isinstance(config, RealmanDualRobotConfig):
from lerobot.common.robot_devices.robots.realman_dual import RealmanDualRobot
return RealmanDualRobot(config)
else:
from lerobot.common.robot_devices.robots.stretch import StretchRobot

View File

@@ -6,43 +6,94 @@ import binascii
import logging
import yaml
from typing import Dict
from dataclasses import dataclass
from Robotic_Arm.rm_robot_interface import *
from .servo_server import ServoArmServer
@dataclass
class ControllerConfig:
"""控制器配置"""
init_joint: list
init_pose: list
max_gripper: int
min_gripper: int
config_file: str
class HybridController:
def __init__(self, init_info):
self.init_joint = init_info['init_joint']
self.init_pose = init_info.get('init_pose', [0]*12)
self.max_gripper = init_info['max_gripper']
self.min_gripper = init_info['min_gripper']
self.config_file = init_info['servo_config_file']
self.joint = self.init_joint.copy()
self.pose = self.init_pose.copy()
self.master_dual_arm = ServoArmServer(self.config_file)
self.config = self._parse_init_info(init_info)
self.joint = self.config.init_joint.copy()
self.pose = self.config.init_pose.copy()
self.joint_control_mode = True
self._initialize_master_arm()
def _parse_init_info(self, init_info: dict) -> ControllerConfig:
"""解析初始化信息"""
return ControllerConfig(
init_joint=init_info['init_joint'],
init_pose=init_info.get('init_pose', [0]*12),
max_gripper=init_info['max_gripper'],
min_gripper=init_info['min_gripper'],
config_file=init_info['servo_config_file']
)
def _initialize_master_arm(self):
"""初始化主控臂"""
try:
self.master_dual_arm = ServoArmServer(self.config.config_file)
except Exception as e:
logging.error(f"初始化主控臂失败: {e}")
raise
def get_action(self) -> Dict:
master_controller_status = {}
master_controller_status['left'] = self.master_dual_arm.get_controller_status(arm='left')
master_controller_status['right'] = self.master_dual_arm.get_controller_status(arm='left')
"""获取控制动作"""
try:
master_joint_actions = self.master_dual_arm.get_joint_data()
return self._format_action(master_joint_actions)
except Exception as e:
logging.error(f"获取动作失败: {e}")
def _format_action(self, master_joint_actions: dict) -> dict:
"""格式化动作数据"""
master_controller_status = {
'left': master_joint_actions['left_controller_status'],
'right': master_joint_actions['right_controller_status']
}
return {
'control_mode': 'joint' if self.joint_control_mode else 'end_pose',
'master_joint_actions': self.master_dual_arm.get_joint_data()['dual_joint_actions'],
'master_joint_actions': master_joint_actions['dual_joint_actions'],
'left_joint_actions': master_joint_actions['left_joint_actions'][:-1],
'right_joint_actions': master_joint_actions['right_joint_actions'][:-1],
'left_gripper_actions': master_joint_actions['left_joint_actions'][-1], # 修复bug
'right_gripper_actions': master_joint_actions['right_joint_actions'][-1],
'master_controller_status': master_controller_status,
'end_pose': self.pose
}
def switch_control_mode(self):
"""切换控制模式"""
self.joint_control_mode = not self.joint_control_mode
mode = "关节" if self.joint_control_mode else "末端"
print(f"切换到{mode}控制模式")
def stop(self):
if self.master_dual_arm:
self.master_dual_arm.shutdown()
print("混合控制器已退出")
"""停止控制器"""
try:
if hasattr(self, 'master_dual_arm') and self.master_dual_arm:
self.master_dual_arm.shutdown()
print("混合控制器已退出")
except Exception as e:
logging.error(f"停止控制器失败: {e}")
def reset(self):
pass
"""重置控制器"""
self.joint = self.config.init_joint.copy()
self.pose = self.config.init_pose.copy()
self.joint_control_mode = True
# 使用示例
if __name__ == "__main__":

View File

@@ -22,6 +22,8 @@ class ServoArmServer:
self.latest_data = {
'left_joint_actions': {},
'right_joint_actions': {},
'left_controller_status': {},
'right_controller_status': {},
'timestamp': time.time()
}
@@ -30,6 +32,7 @@ class ServoArmServer:
# 启动数据采集线程
self._start_data_collection()
def _initialize_arms(self):
"""初始化左右机械臂"""
try:
@@ -47,42 +50,77 @@ class ServoArmServer:
def _start_data_collection(self):
"""启动数据采集线程"""
self.running = True
self.data_thread = threading.Thread(target=self._data_collection_loop)
self.data_thread.daemon = True
self.data_thread.start()
logging.info("数据采集线程已启动")
# 创建左臂数据采集线程
self.left_data_thread = threading.Thread(target=self._left_arm_data_loop)
self.left_data_thread.daemon = True
self.left_data_thread.start()
# 创建右臂数据采集线程
self.right_data_thread = threading.Thread(target=self._right_arm_data_loop)
self.right_data_thread.daemon = True
self.right_data_thread.start()
logging.info("左右机械臂数据采集线程已启动")
def _data_collection_loop(self):
"""数据采集循环"""
def _left_arm_data_loop(self):
"""左机械臂数据采集循环"""
while self.running:
try:
left_actions = {}
right_actions = {}
left_controller_status = {}
# 获取左机械臂数据
if self.left_servo_arm and self.left_servo_arm.connected:
left_actions = self.left_servo_arm.get_joint_actions()
left_controller_status = self.left_servo_arm.get_controller_status()
# 获取右机械臂数据
if self.right_servo_arm and self.right_servo_arm.connected:
right_actions = self.right_servo_arm.get_joint_actions()
if self._check_val_safety(left_actions) == False or self._check_val_safety(right_actions) == False:
if self._check_val_safety(left_actions) == False:
time.sleep(0.02)
continue
# 更新最新数据
# 更新左机械臂数据
with self.data_lock:
self.latest_data = {
'dual_joint_actions': [left_actions[k] for k in left_actions] + [right_actions[k] for k in right_actions],
'left_joint_actions': left_actions,
'right_joint_actions': right_actions,
'timestamp': time.time()
}
self.latest_data['left_joint_actions'] = [left_actions[k] for k in left_actions]
self.latest_data['left_controller_status'] = left_controller_status
# 更新dual_joint_actions
if self.latest_data['right_joint_actions']:
self.latest_data['dual_joint_actions'] = self.latest_data['left_joint_actions'] + self.latest_data['right_joint_actions']
self.latest_data['timestamp'] = time.time()
time.sleep(0.02) # 50Hz采集频率
except Exception as e:
logging.error(f"数据采集错误: {e}")
logging.error(f"左机械臂数据采集错误: {e}")
time.sleep(0.1)
def _right_arm_data_loop(self):
"""右机械臂数据采集循环"""
while self.running:
try:
right_actions = {}
right_controller_status = {}
# 获取右机械臂数据
if self.right_servo_arm and self.right_servo_arm.connected:
right_actions = self.right_servo_arm.get_joint_actions()
right_controller_status = self.right_servo_arm.get_controller_status()
if self._check_val_safety(right_actions) == False:
time.sleep(0.02)
continue
# 更新右机械臂数据
with self.data_lock:
self.latest_data['right_joint_actions'] = [right_actions[k] for k in right_actions]
self.latest_data['right_controller_status'] = right_controller_status
# 更新dual_joint_actions
if self.latest_data['left_joint_actions']:
self.latest_data['dual_joint_actions'] = self.latest_data['left_joint_actions'] + self.latest_data['right_joint_actions']
self.latest_data['timestamp'] = time.time()
time.sleep(0.02) # 50Hz采集频率
except Exception as e:
logging.error(f"右机械臂数据采集错误: {e}")
time.sleep(0.1)
def _check_val_safety(self, data: dict):
@@ -102,34 +140,23 @@ class ServoArmServer:
return self.latest_data.copy()
def get_left_joint_actions(self):
"""获取左机械臂关节数据"""
"""获取左机械臂关节数据和控制器状态"""
with self.data_lock:
return {
'data': self.latest_data['left_joint_actions'],
'controller_status': self.latest_data['left_controller_status'],
'timestamp': self.latest_data['timestamp']
}
def get_right_joint_actions(self):
"""获取右机械臂关节数据"""
"""获取右机械臂关节数据和控制器状态"""
with self.data_lock:
return {
'data': self.latest_data['right_joint_actions'],
'controller_status': self.latest_data['right_controller_status'],
'timestamp': self.latest_data['timestamp']
}
def get_controller_status(self, arm='left'):
"""获取控制器状态"""
try:
if arm == 'left' and self.left_servo_arm:
return self.left_servo_arm.get_controller_status()
elif arm == 'right' and self.right_servo_arm:
return self.right_servo_arm.get_controller_status()
else:
return {'error': f'Invalid arm: {arm}'}
except Exception as e:
logging.error(f"获取控制器状态错误: {e}")
return {'error': str(e)}
def get_connection_status(self):
"""获取连接状态"""
return {