Compare commits

...

11 Commits

Author SHA1 Message Date
85ef0f289f 语音播报
Some checks failed
Secret Leaks / trufflehog (push) Failing after 17s
2025-08-12 11:52:35 +08:00
a87dce9e3f fix some bugs 2025-08-01 10:02:18 +08:00
3685542bf1 optimize some codes
Some checks failed
Secret Leaks / trufflehog (push) Failing after 2m47s
2025-07-13 16:24:28 +08:00
7c1699898b realman xbox+flight controller
Some checks failed
Secret Leaks / trufflehog (push) Failing after 1m30s
2025-07-13 15:42:27 +08:00
b3e9e11e11 refactory gamepad 2025-07-06 14:28:38 +08:00
b04e6e0c7b dual arm fisrt complete with master arm
Some checks failed
Secret Leaks / trufflehog (push) Failing after 1m30s
2025-07-03 19:44:10 +08:00
96804bc86c still mant bugs 2025-07-02 11:42:03 +08:00
ef45ea9649 single arm refactory
Some checks failed
Secret Leaks / trufflehog (push) Has been cancelled
2025-06-26 21:08:22 +08:00
bc351a0134 change pose control api to canfd
Some checks failed
Secret Leaks / trufflehog (push) Has been cancelled
2025-06-19 14:55:54 +08:00
68986f6fc0 update some readme
Some checks failed
Secret Leaks / trufflehog (push) Has been cancelled
2025-06-19 11:49:19 +08:00
2f124e34de redefine init joint pos
Some checks failed
Secret Leaks / trufflehog (push) Has been cancelled
2025-06-17 14:56:23 +08:00
18 changed files with 2746 additions and 549 deletions

View File

@@ -47,4 +47,16 @@ class RealmanMotorsBusConfig(MotorsBusConfig):
ip: str
port: int
motors: dict[str, tuple[int, str]]
init_joint: dict[str, list]
init_joint: dict[str, list]
@MotorsBusConfig.register_subclass("realman_dual")
@dataclass
class RealmanDualMotorsBusConfig(MotorsBusConfig):
left_ip: str
right_ip: str
left_port: int
right_port: int
motors: dict[str, tuple[int, str]]
init_joint: dict[str, list]
axis: dict[str, int]

View File

@@ -15,6 +15,10 @@ class RealmanMotorsBus:
self.motors = config.motors
self.init_joint_position = config.init_joint['joint'] # [6 joints + 1 gripper]
self.safe_disable_position = config.init_joint['joint']
self.rmarm.rm_movej(self.init_joint_position[:-1], 5, 0, 0, 1)
time.sleep(3)
ret = self.rmarm.rm_get_current_arm_state()
self.init_pose = ret[1]['pose']
@property
def motor_names(self) -> list[str]:
@@ -98,6 +102,18 @@ class RealmanMotorsBus:
self.rmarm.rm_movej_p(target_endpose, 50, 0, 0, 1)
self.rmarm.rm_set_gripper_position(gripper, block=False, timeout=2)
def write_joint_slow(self, target_joint: list):
self.rmarm.rm_movej(target_joint, 5, 0, 0, 0)
def write_joint_canfd(self, target_joint: list):
self.rmarm.rm_movej_canfd(target_joint, False)
def write_endpose_canfd(self, target_pose: list):
self.rmarm.rm_movep_canfd(target_pose, False)
def write_gripper(self, gripper: int):
self.rmarm.rm_set_gripper_position(gripper, False, 2)
def read(self) -> Dict:
"""
- 机械臂关节消息,单位1度;[-1, 1]
@@ -119,6 +135,12 @@ class RealmanMotorsBus:
"gripper": (gripper_state-500)/500
}
def read_current_arm_joint_state(self):
return self.rmarm.rm_get_current_arm_state()[1]['joint']
def read_current_arm_endpose_state(self):
return self.rmarm.rm_get_current_arm_state()[1]['pose']
def safe_disconnect(self):
"""
Move to safe disconnect position

View File

@@ -0,0 +1,351 @@
import time
import threading
from typing import Dict
from dataclasses import dataclass
from contextlib import contextmanager
from lerobot.common.robot_devices.motors.configs import RealmanDualMotorsBusConfig
from Robotic_Arm.rm_robot_interface import *
def compare_joint_difference(master_joints, follow_joints, threshold=30.0):
"""
比较主臂和从臂关节数据的差异
Args:
master_joints (list): 主臂关节数据 [joint1, joint2, ..., joint6]
follow_joints (list): 从臂关节数据 [joint1, joint2, ..., joint6]
threshold (float): 差异阈值默认5度
Returns:
bool: True表示差异在阈值内False表示超过阈值
"""
# 检查数据长度
if len(master_joints) != len(follow_joints):
return False
# 计算每个关节的绝对差异
for i in range(len(master_joints)):
diff = abs(master_joints[i] - follow_joints[i])
if diff > threshold:
return False
return True
@dataclass
class ArmState:
"""机械臂状态数据类"""
joint_positions: list
gripper_position: int
pose: list
class RealmanDualMotorsBus:
"""
对Realman SDK的二次封装
"""
def __init__(self, config: RealmanDualMotorsBusConfig):
self.config = config
# import pdb; pdb.set_trace()
self._initialize_arms()
self._initialize_parameters()
self._initialize_positions()
self._initialize_threading()
def _initialize_arms(self):
"""初始化机械臂连接"""
self.left_rmarm = RoboticArm(rm_thread_mode_e.RM_TRIPLE_MODE_E)
self.right_rmarm = RoboticArm(rm_thread_mode_e.RM_TRIPLE_MODE_E)
self.handle_left = self.left_rmarm.rm_create_robot_arm(
self.config.left_ip, self.config.left_port
)
self.handle_right = self.right_rmarm.rm_create_robot_arm(
self.config.right_ip, self.config.right_port
)
def _initialize_parameters(self):
"""初始化参数"""
self.motors = self.config.motors
self.axis = self.config.axis
self.joint_count = sum(self.axis.values())
self.left_offset = self.axis['left_joint']
def _initialize_positions(self):
"""初始化位置"""
self.init_joint_position = self.config.init_joint['joint']
self.safe_disable_position = self.config.init_joint['joint']
# 移动到初始位置
self._move_to_initial_position()
# 获取初始姿态
time.sleep(3)
self.init_pose = self._get_initial_pose()
def _initialize_threading(self):
"""初始化线程控制"""
self.left_slow_busy = False
self.right_slow_busy = False
self.gripper_busy = False
self._thread_lock = threading.Lock()
# 添加读取相关的线程控制
self._state_cache = {"joint": {}, "pose": {}}
self._cache_lock = threading.Lock()
self._keep_reading = True
# 启动后台读取线程
self._start_background_readers()
def _start_background_readers(self):
"""启动后台读取线程"""
# 读取线程
threading.Thread(
target=self._read_task,
daemon=True,
name="arm_reader"
).start()
@property
def motor_names(self) -> list[str]:
return list(self.motors.keys())
@property
def motor_models(self) -> list[str]:
return [model for _, model in self.motors.values()]
@property
def motor_indices(self) -> list[int]:
return [idx for idx, _ in self.motors.values()]
@contextmanager
def _timeout_context(self, timeout: float = 5.0):
"""超时上下文管理器"""
start_time = time.time()
try:
yield lambda: time.time() - start_time < timeout
except Exception as e:
raise TimeoutError(f"操作超时: {e}")
def _read_task(self):
"""左臂后台读取任务 - 模仿_left_slow_task的风格"""
while self._keep_reading:
try:
left_state = self._read_arm_state(self.left_rmarm, "left")
with self._cache_lock:
self._state_cache["joint"].update(left_state["joint"])
self._state_cache["pose"].update(left_state["pose"])
except Exception as e:
print(f"左臂读取失败: {e}")
try:
right_state = self._read_arm_state(self.right_rmarm, "right")
with self._cache_lock:
self._state_cache["joint"].update(right_state["joint"])
self._state_cache["pose"].update(right_state["pose"])
except Exception as e:
print(f"右臂读取失败: {e}")
def _read_arm_state(self, arm: RoboticArm, prefix: str) -> dict:
"""读取单臂状态 - 保持原有逻辑"""
joint_msg = arm.rm_get_current_arm_state()[1]
gripper_msg = arm.rm_get_gripper_state()[1]
joint_state = joint_msg['joint']
gripper_state = gripper_msg['actpos']
pose_state = joint_msg['pose']
joint_state_dict = {}
for i in range(len(joint_state)):
joint_state_dict[f"{prefix}_joint_{i+1}"] = joint_state[i]
joint_state_dict[f"{prefix}_gripper"] = gripper_state
pose_state_dict = {
f"{prefix}_x": pose_state[0],
f"{prefix}_y": pose_state[1],
f"{prefix}_z": pose_state[2],
f"{prefix}_rx": pose_state[3],
f"{prefix}_ry": pose_state[4],
f"{prefix}_rz": pose_state[5],
}
return {"joint": joint_state_dict, 'pose': pose_state_dict}
def _move_to_initial_position(self):
"""移动到初始位置"""
left_joints = self.init_joint_position[:self.left_offset]
right_joints = self.init_joint_position[self.left_offset+1:-1]
self.left_rmarm.rm_movej(left_joints, 5, 0, 0, 1)
self.right_rmarm.rm_movej(right_joints, 5, 0, 0, 1)
def _get_initial_pose(self) -> list:
"""获取初始姿态"""
left_ret = self.left_rmarm.rm_get_current_arm_state()
right_ret = self.right_rmarm.rm_get_current_arm_state()
return left_ret[1]['pose'] + right_ret[1]['pose']
def _validate_joint_count(self, joints: list, expected_count: int):
"""验证关节数量"""
if len(joints) != expected_count:
raise ValueError(f"关节数量不匹配: 期望 {expected_count}, 实际 {len(joints)}")
def _execute_slow_movement(self, arm: str, joint_data: list):
"""执行慢速运动"""
busy_flag = f"{arm}_slow_busy"
if not getattr(self, busy_flag):
setattr(self, busy_flag, True)
target_method = getattr(self, f"_{arm}_slow_task")
threading.Thread(
target=target_method,
args=(joint_data.copy(),),
daemon=True
).start()
def _left_slow_task(self, joint_data: list):
"""左臂慢速任务"""
try:
self.write_left_joint_slow(joint_data)
finally:
self.left_slow_busy = False
def _right_slow_task(self, joint_data: list):
"""右臂慢速任务"""
try:
self.write_right_joint_slow(joint_data)
finally:
self.right_slow_busy = False
def _execute_arm_action(self, arm: str, action: dict, master_joint: list, follow_joint: list):
"""执行单臂动作"""
controller_status = action['master_controller_status'][arm]
if controller_status['infrared'] == 1:
if compare_joint_difference(master_joint, follow_joint):
if arm == 'left':
self.write_left_joint_canfd(master_joint)
else:
self.write_right_joint_canfd(master_joint)
else:
self._execute_slow_movement(arm, master_joint)
def write_endpose(self, target_endpose: list):
assert target_endpose == 12, "the length of target pose is not equal 12"
self.left_rmarm.rm_movej_p(target_endpose[:6], 50, 0, 0, 1)
self.right_rmarm.rm_movej_p(target_endpose[6:], 50, 0, 0, 1)
def write_left_joint_slow(self, left_joint: list):
assert len(left_joint) == self.left_offset, "len of left master joint is not equal the count of left joint"
self.left_rmarm.rm_movej(left_joint, 5, 0, 0, 1)
def write_right_joint_slow(self, right_joint: list):
assert len(right_joint) == self.left_offset, "len of right master joint is not equal the count of right joint"
self.right_rmarm.rm_movej(right_joint, 5, 0, 0, 1)
def write_left_joint_canfd(self, left_joint: list):
assert len(left_joint) == self.left_offset, "len of left master joint is not equal the count of left joint"
self.left_rmarm.rm_movej_canfd(left_joint, False)
def write_right_joint_canfd(self, right_joint: list):
assert len(right_joint) == self.left_offset, "len of right master joint is not equal the count of right joint"
self.right_rmarm.rm_movej_canfd(right_joint, False)
def write_endpose_canfd(self, target_endpose: list):
assert len(target_endpose) == 12, "the length of target pose is not equal 12"
self.left_rmarm.rm_movep_canfd(target_endpose[:6], False)
self.right_rmarm.rm_movep_canfd(target_endpose[6:], False)
def write_dual_gripper(self, left_gripper: int, right_gripper: int):
try:
self.left_rmarm.rm_set_gripper_position(left_gripper, False, 2)
self.right_rmarm.rm_set_gripper_position(right_gripper, False, 2)
finally:
self.gripper_busy = False
def _execute_gripper_thread(self, left_gripper: int, right_gripper: int):
if not getattr(self, 'gripper_busy'):
setattr(self, 'gripper_busy', True)
threading.Thread(
target=self.write_dual_gripper,
args=(left_gripper, right_gripper),
daemon=True
).start()
def read_current_arm_joint_state(self):
return self.left_rmarm.rm_get_current_arm_state()[1]['joint'] + self.right_rmarm.rm_get_current_arm_state()[1]['joint']
def read_current_arm_endpose_state(self):
return self.left_rmarm.rm_get_current_arm_state()[1]['pose'] + self.right_rmarm.rm_get_current_arm_state()[1]['pose']
########################## lerobot function ##########################
def connect(self, enable: bool = True) -> bool:
"""使能机械臂并检测使能状态"""
with self._timeout_context() as is_timeout_valid:
while is_timeout_valid():
try:
if enable:
left_ret = self.left_rmarm.rm_get_current_arm_state()
right_ret = self.right_rmarm.rm_get_current_arm_state()
if left_ret[0] == 0 and right_ret[0] == 0:
print("机械臂使能成功")
return True
else:
RoboticArm.rm_destory()
print("机械臂断开连接")
return True
except Exception as e:
print(f"连接异常: {e}")
time.sleep(1)
print("连接超时")
return False
def set_calibration(self):
raise NotImplementedError
def revert_calibration(self):
raise NotImplementedError
def apply_calibration(self):
"""
移动到初始位置
"""
self.write(target_joint=self.init_joint_position)
def write(self, target_joint: list):
"""写入关节位置"""
self._validate_joint_count(target_joint, self.joint_count)
left_joints = target_joint[:self.left_offset]
left_gripper = target_joint[self.left_offset]
right_joints = target_joint[self.left_offset+1:-1]
right_gripper = target_joint[-1]
self.left_rmarm.rm_movej_canfd(left_joints, follow=False)
# self.left_rmarm.rm_movej_follow(left_joints)
# self.left_rmarm.rm_set_gripper_position(left_gripper, block=False, timeout=2)
self.right_rmarm.rm_movej_canfd(right_joints, follow=False)
# self.right_rmarm.rm_movej_follow(right_joints)
# self.right_rmarm.rm_set_gripper_position(right_gripper, block=False, timeout=2)
self._execute_gripper_thread(left_gripper, right_gripper)
def read(self) -> Dict:
"""读取机械臂状态 - 直接从缓存获取"""
with self._cache_lock:
return self._state_cache.copy()
def safe_disconnect(self):
"""安全断开连接"""
try:
self.write(target_joint=self.safe_disable_position)
time.sleep(2) # 等待移动完成
except Exception as e:
print(f"移动到安全位置失败: {e}")
finally:
RoboticArm.rm_destory()
########################## lerobot function ##########################

View File

@@ -49,6 +49,10 @@ def make_motors_buses_from_configs(motors_bus_configs: dict[str, MotorsBusConfig
motors_buses[key] = RealmanMotorsBus(cfg)
elif cfg.type == "realman_dual":
from lerobot.common.robot_devices.motors.realman_dual import RealmanDualMotorsBus
motors_buses[key] = RealmanDualMotorsBus(cfg)
else:
raise ValueError(f"The motor type '{cfg.type}' is not valid.")

View File

@@ -27,7 +27,8 @@ from lerobot.common.robot_devices.motors.configs import (
DynamixelMotorsBusConfig,
FeetechMotorsBusConfig,
MotorsBusConfig,
RealmanMotorsBusConfig
RealmanMotorsBusConfig,
RealmanDualMotorsBusConfig
)
@@ -683,7 +684,7 @@ class RealmanRobotConfig(RobotConfig):
inference_time: bool = False
max_gripper: int = 990
min_gripper: int = 10
servo_config_file: str = "/home/maic/LYT/lerobot/realman_src/realman_aloha/shadow_rm_robot/config/servo_arm.yaml"
servo_config_file: str = "/home/maic/LYT/lerobot/lerobot/common/robot_devices/teleop/servo_arm.yaml"
left_follower_arm: dict[str, MotorsBusConfig] = field(
@@ -701,7 +702,7 @@ class RealmanRobotConfig(RobotConfig):
"joint_6": [6, "realman"],
"gripper": [7, "realman"],
},
init_joint = {'joint': [-90, 90, 90, -90, -90, 90, 1000]}
init_joint = {'joint': [-90, 90, 90, 90, 90, -90, 10]}
)
}
)
@@ -714,6 +715,81 @@ class RealmanRobotConfig(RobotConfig):
# width=640,
# height=480,
# ),
"left": IntelRealSenseCameraConfig(
serial_number="153122077516",
fps=30,
width=640,
height=480,
use_depth=False
),
# "right": IntelRealSenseCameraConfig(
# serial_number="405622075165",
# fps=30,
# width=640,
# height=480,
# use_depth=False
# ),
"front": IntelRealSenseCameraConfig(
serial_number="145422072751",
fps=30,
width=640,
height=480,
use_depth=False
),
"high": IntelRealSenseCameraConfig(
serial_number="145422072193",
fps=30,
width=640,
height=480,
use_depth=False
),
}
)
@RobotConfig.register_subclass("realman_dual")
@dataclass
class RealmanDualRobotConfig(RobotConfig):
inference_time: bool = False
max_gripper: int = 990
min_gripper: int = 10
servo_config_file: str = "/home/maic/LYT/lerobot/lerobot/common/robot_devices/teleop/servo_dual.yaml"
left_end_control_guid: str = '0300b14bff1100003708000010010000'
right_end_control_guid: str = '030003f05e0400008e02000010010000'
follower_arm: dict[str, MotorsBusConfig] = field(
default_factory=lambda: {
"main": RealmanDualMotorsBusConfig(
axis= {'left_joint': 6, 'left_gripper': 1, 'right_joint': 6, 'right_gripper': 1},
left_ip = "192.168.3.18",
left_port = 8080,
right_ip = "192.168.3.19",
right_port = 8080,
init_joint = {'joint': [-170, 90, 0, 90, 120, 0, 10, 170, 90, 0, -90, 120, 0, 10]},
motors={
# name: (index, model)
"left_joint_1": [1, "realman"],
"left_joint_2": [2, "realman"],
"left_joint_3": [3, "realman"],
"left_joint_4": [4, "realman"],
"left_joint_5": [5, "realman"],
"left_joint_6": [6, "realman"],
"left_gripper": [7, "realman"],
"right_joint_1": [8, "realman"],
"right_joint_2": [9, "realman"],
"right_joint_3": [10, "realman"],
"right_joint_4": [11, "realman"],
"right_joint_5": [12, "realman"],
"right_joint_6": [13, "realman"],
"right_gripper": [14, "realman"]
},
)
}
)
cameras: dict[str, CameraConfig] = field(
default_factory=lambda: {
"left": IntelRealSenseCameraConfig(
serial_number="153122077516",
fps=30,
@@ -743,23 +819,4 @@ class RealmanRobotConfig(RobotConfig):
use_depth=False
),
}
)
# right_follower_arm: dict[str, MotorsBusConfig] = field(
# default_factory=lambda: {
# "main": RealmanMotorsBusConfig(
# ip = "192.168.3.19",
# port = 8080,
# motors={
# # name: (index, model)
# "joint_1": [1, "realman"],
# "joint_2": [2, "realman"],
# "joint_3": [3, "realman"],
# "joint_4": [4, "realman"],
# "joint_5": [5, "realman"],
# "joint_6": [6, "realman"],
# "gripper": (7, "realman"),
# },
# )
# }
# )
)

View File

@@ -7,12 +7,13 @@ import torch
import numpy as np
from dataclasses import dataclass, field, replace
from collections import deque
from lerobot.common.robot_devices.teleop.gamepad import HybridController
from lerobot.common.robot_devices.teleop.realman_single import HybridController
from lerobot.common.robot_devices.motors.utils import get_motor_names, make_motors_buses_from_configs
from lerobot.common.robot_devices.cameras.utils import make_cameras_from_configs
from lerobot.common.robot_devices.utils import RobotDeviceAlreadyConnectedError, RobotDeviceNotConnectedError
from lerobot.common.robot_devices.robots.configs import RealmanRobotConfig
class RealmanRobot:
def __init__(self, config: RealmanRobotConfig | None = None, **kwargs):
if config is None:
@@ -28,14 +29,11 @@ class RealmanRobot:
# build realman motors
self.piper_motors = make_motors_buses_from_configs(self.config.left_follower_arm)
self.arm = self.piper_motors['main']
self.arm.rmarm.rm_movej(self.arm.init_joint_position[:-1], 50, 0, 0, 1)
time.sleep(2)
ret = self.arm.rmarm.rm_get_current_arm_state()
init_pose = ret[1]['pose']
# build init teleop info
self.init_info = {
'init_joint': self.arm.init_joint_position,
'init_pose': init_pose,
'init_pose': self.arm.init_pose,
'max_gripper': config.max_gripper,
'min_gripper': config.min_gripper,
'servo_config_file': config.servo_config_file
@@ -43,6 +41,7 @@ class RealmanRobot:
# build state-action cache
self.joint_queue = deque(maxlen=2)
self.last_endpose = self.arm.init_pose
# build gamepad teleop
if not self.inference_time:
@@ -61,7 +60,7 @@ class RealmanRobot:
cam_ft[key] = {
"shape": (cam.height, cam.width, cam.channels),
"names": ["height", "width", "channels"],
"info": None,
"info": None,
}
return cam_ft
@@ -162,49 +161,37 @@ class RealmanRobot:
if action['control_mode'] == 'joint':
# 关节控制模式(主模式)
ret = self.arm.rmarm.rm_get_current_arm_state()
current_pose = ret[1]['pose']
current_pose = self.arm.read_current_arm_endpose_state()
self.teleop.update_endpose_state(current_pose)
target_joints = action['joint_angles']
target_joints = action['joint_angles'][:-1]
self.arm.write_gripper(action['gripper'])
print(action['gripper'])
if action['master_controller_status']['infrared'] == 1:
if action['master_controller_status']['button'] == 1:
self.arm.write_joint_canfd(target_joints)
else:
self.arm.write_joint_slow(target_joints)
# do action
before_write_t = time.perf_counter()
self.joint_queue.append(list(self.arm.read().values()))
self.logs["write_pos_dt_s"] = time.perf_counter() - before_write_t
else:
# 末端位姿控制模式
target_pose = [
action['end_pose']['X'], # X (m)
action['end_pose']['Y'], # Y (m)
action['end_pose']['Z'], # Z (m)
action['end_pose']['RX'], # RX (rad)
action['end_pose']['RY'], # RY (rad)
action['end_pose']['RZ'] # RZ (rad)
]
target_pose = list(action['end_pose'])
# do action
before_write_t = time.perf_counter()
if self.last_endpose != target_pose:
self.arm.write_endpose_canfd(target_pose)
self.last_endpose = target_pose
self.arm.write_gripper(action['gripper'])
# result = self.arm.rmarm.rm_movej_p(target_pose, 100, 0, 0, 0)
self.arm.rmarm.rm_movep_follow(target_pose)
self.arm.rmarm.rm_set_gripper_position(action['gripper'], False, 2)
ret = self.arm.rmarm.rm_get_current_arm_state()
target_joints = ret[1].get('joint', self.arm.init_joint_position)
target_joints = self.arm.read_current_arm_joint_state()
self.joint_queue.append(list(self.arm.read().values()))
self.teleop.update_joint_state(target_joints)
self.logs["write_pos_dt_s"] = time.perf_counter() - before_write_t
# print('-'*80)
# print('mode: ', action['control_mode'])
# print('state: ', list(state.values()))
# print('action: ', target_joints)
# print('cache[0]: ', self.joint_queue[0])
# print('cache[-1]: ', self.joint_queue[-1])
# print('time: ', time.perf_counter() - before_write_t)
# print('-'*80)
# time.sleep(1)
if not record_data:
return

View File

@@ -0,0 +1,456 @@
"""
Teleoperation Realman with a PS5 controller and LLM interaction
"""
import time
import torch
import numpy as np
import logging
from typing import Optional, Tuple, Dict
from dataclasses import dataclass, field, replace
from collections import deque
import signal
import sys
from lerobot.common.robot_devices.teleop.realman_aloha_dual import HybridController
from lerobot.common.robot_devices.motors.utils import get_motor_names, make_motors_buses_from_configs
from lerobot.common.robot_devices.cameras.utils import make_cameras_from_configs
from lerobot.common.robot_devices.utils import RobotDeviceAlreadyConnectedError, RobotDeviceNotConnectedError
from lerobot.common.robot_devices.robots.configs import RealmanDualRobotConfig
from lerobot.common.robot_devices.robots.utils import (
ask_llm,
extract_json_from_response,
create_keyboard_listener,
start_keyboard_listener,
stop_keyboard_listener,
speak_async,
handle_llm_interaction_with_images
)
class RealmanDualRobot:
"""RealmanDual机器人控制类支持双臂操作和LLM交互"""
def __init__(self, config: RealmanDualRobotConfig | None = None, **kwargs):
if config is None:
config = RealmanDualRobotConfig()
# 配置初始化
self.config = replace(config, **kwargs)
self.robot_type = self.config.type
self.inference_time = self.config.inference_time
# 硬件初始化
self.cameras = make_cameras_from_configs(self.config.cameras)
self.piper_motors = make_motors_buses_from_configs(self.config.follower_arm)
self.arm = self.piper_motors['main']
# 控制系统初始化
self._initialize_teleop()
self._initialize_state()
self._initialize_keyboard_interface()
# 状态标志
self._shutdown_flag = False
self._llm_triggered = False
def _initialize_teleop(self):
"""初始化遥操作控制器"""
self.init_info = {
'init_joint': self.arm.init_joint_position,
'init_pose': self.arm.init_pose,
'max_gripper': self.config.max_gripper,
'min_gripper': self.config.min_gripper,
'servo_config_file': self.config.servo_config_file,
'end_control_info': {
'left': self.config.left_end_control_guid,
'right': self.config.right_end_control_guid
}
}
if not self.inference_time:
self.teleop = HybridController(self.init_info)
else:
self.teleop = None
def _initialize_state(self):
"""初始化状态管理"""
self.joint_queue = deque(maxlen=2)
self.last_endpose = self.arm.init_pose
self.logs = {}
self.is_connected = False
def _initialize_keyboard_interface(self):
"""初始化键盘交互接口"""
self.w_pressed = False # W键触发LLM
self.q_pressed = False # Q键退出
self.keyboard_listener = None
self._start_keyboard_listener()
def _start_keyboard_listener(self):
"""启动键盘监听器"""
def on_key_press(key_char):
if key_char == 'w':
self.w_pressed = True
print("检测到W键按下")
elif key_char == 'q':
self.q_pressed = True
print("检测到Q键按下")
self.keyboard_listener = create_keyboard_listener(on_key_press)
success = start_keyboard_listener(self.keyboard_listener)
if success:
print("键盘监听器启动成功 (W键:调用LLM, Q键:退出)")
else:
print("键盘监听器启动失败")
def _read_robot_state(self) -> dict:
"""读取机器人状态"""
before_read_t = time.perf_counter()
from copy import deepcopy
state = deepcopy(self.arm.read())
self.logs["read_pos_dt_s"] = time.perf_counter() - before_read_t
return state
def _execute_action(self, action: dict, state: dict):
"""执行机器人动作"""
before_write_t = time.perf_counter()
if action['control_mode'] == 'joint':
pass
else:
if list(action['pose'].values()) != list(state['pose'].values()):
pose = list(action['pose'].values())
self.arm.write_endpose_canfd(pose)
elif list(action['joint'].values()) != list(state['joint'].values()):
target_joint = list(action['joint'].values())
self.arm.write(target_joint)
self.logs["write_pos_dt_s"] = time.perf_counter() - before_write_t
def _prepare_record_data(self) -> Tuple[Dict, Dict]:
"""准备记录数据 - 保持原有逻辑"""
if len(self.joint_queue) < 2:
return {}, {}
state = torch.as_tensor(list(self.joint_queue[0]), dtype=torch.float32)
action = torch.as_tensor(list(self.joint_queue[-1]), dtype=torch.float32)
# 捕获图像
images = self._capture_images()
# 构建输出字典
obs_dict = {
"observation.state": state,
**{f"observation.images.{name}": img for name, img in images.items()}
}
action_dict = {"action": action}
return obs_dict, action_dict
def _update_state_queue(self):
"""更新状态队列"""
current_state = self.arm.read()['joint']
current_state_lst = []
for data in current_state:
if "joint" in data:
current_state_lst.append(current_state[data] / 180)
elif "gripper" in data:
current_state_lst.append((current_state[data]-500)/500)
self.joint_queue.append(current_state_lst)
def _capture_images(self) -> Dict[str, torch.Tensor]:
"""捕获摄像头图像"""
images = {}
for name, camera in self.cameras.items():
before_camread_t = time.perf_counter()
image = camera.async_read()
images[name] = torch.from_numpy(image)
self.logs[f"read_camera_{name}_dt_s"] = camera.logs["delta_timestamp_s"]
self.logs[f"async_read_camera_{name}_dt_s"] = time.perf_counter() - before_camread_t
return images
def _handle_llm_interaction(self, obs_dict: Dict) -> bool:
"""处理LLM交互逻辑"""
print("[W键已按下] 正在准备数据并调用LLM...")
# 提取图像数据
camera_images = {}
for key, value in obs_dict.items():
if key.startswith("observation.images."):
camera_name = key.replace("observation.images.", "")
camera_images[camera_name] = value.cpu().numpy()
# 使用utils中的函数处理LLM交互
success, response = handle_llm_interaction_with_images(
"将超声仪左下角试管架上的蓝色试管移动到超声仪中",
camera_images
)
return success
def connect(self) -> None:
"""连接机器人和摄像头"""
if self.is_connected:
raise RobotDeviceAlreadyConnectedError(
"RealmanArm is already connected. Do not run `robot.connect()` twice."
)
# 连接机械臂
self.arm.connect(enable=True)
print("RealmanArm conneted")
# 连接摄像头
for name in self.cameras:
self.cameras[name].connect()
self.is_connected = self.is_connected and self.cameras[name].is_connected
print(f"camera {name} conneted")
print("All connected")
self.is_connected = True
self.run_calibration()
def disconnect(self) -> None:
"""断开连接"""
if self._shutdown_flag:
return
self._shutdown_flag = True
try:
# 停止键盘监听器
stop_keyboard_listener(self.keyboard_listener)
print("键盘监听器已停止")
# 停止遥操作控制器
if hasattr(self, 'teleop') and self.teleop and not self.inference_time:
self.teleop.stop()
print("遥操作控制器已停止")
# 断开机械臂连接
if hasattr(self, 'arm'):
try:
self.arm.safe_disconnect()
print("RealmanArm 安全断开连接")
time.sleep(2)
self.arm.connect(enable=False)
print("RealmanArm 已禁用")
except Exception as e:
print(f"断开机械臂连接时出错: {e}")
# 断开摄像头连接
if len(self.cameras) > 0:
for name, cam in self.cameras.items():
try:
cam.disconnect()
print(f"Camera {name} 已断开连接")
except Exception as e:
print(f"断开相机 {name} 时出错: {e}")
self.is_connected = False
print("所有设备已断开连接")
except Exception as e:
print(f"断开连接时发生错误: {e}")
def run_calibration(self):
"""运行标定"""
if not self.is_connected:
raise ConnectionError()
self.arm.apply_calibration()
if not self.inference_time:
self.teleop.reset()
def teleop_step(self, record_data=False) -> None | tuple[dict[str, torch.Tensor], dict[str, torch.Tensor]]:
"""遥操作步骤 - 保持原有数据记录逻辑添加LLM交互"""
if not self.is_connected:
raise ConnectionError()
if self.teleop is None and self.inference_time:
self.teleop = HybridController(self.init_info)
try:
# 检查退出条件
if self.q_pressed:
print("检测到Q键任务终止...")
speak_async("任务已终止")
raise KeyboardInterrupt("用户请求退出")
# 执行基础遥操作
state = self._read_robot_state()
action = self.teleop.get_action(state)
self._execute_action(action, state)
# 更新状态队列
self._update_state_queue()
time.sleep(0.019) # 50Hz
# 处理数据记录
if record_data:
data = self._prepare_record_data()
# 处理LLM交互请求只有在有有效数据时才处理
if data[0] and self.w_pressed: # 如果有有效数据且W键被按下
self.w_pressed = False # 重置标志位
self._llm_triggered = True
success = self._handle_llm_interaction(data[0])
if not success:
print("LLM交互处理失败")
# 如果没有有效数据,创建默认数据
if not data[0]:
# 创建默认的观测数据
if len(self.joint_queue) > 0:
state_tensor = torch.as_tensor(list(self.joint_queue[-1]), dtype=torch.float32)
else:
state_tensor = torch.zeros(14, dtype=torch.float32) # 根据你的机器人调整维度
# 捕获当前图像
images = self._capture_images()
obs_dict = {
"observation.state": state_tensor,
**{f"observation.images.{name}": img for name, img in images.items()}
}
action_dict = {"action": state_tensor} # 使用相同的状态作为动作
data = (obs_dict, action_dict)
return data
return None
except KeyboardInterrupt:
# 重新抛出键盘中断,让上层处理
raise
except Exception as e:
logging.error(f"遥操作步骤失败: {e}")
# 即使出错在record_data=True时也要返回有效数据
if record_data:
# 创建紧急默认数据
state_tensor = torch.zeros(14, dtype=torch.float32) # 根据你的机器人调整维度
images = {}
try:
images = self._capture_images()
except:
# 如果连图像都无法捕获,创建空图像
for camera_name in self.cameras.keys():
images[camera_name] = torch.zeros((480, 640, 3), dtype=torch.uint8)
obs_dict = {
"observation.state": state_tensor,
**{f"observation.images.{name}": img for name, img in images.items()}
}
action_dict = {"action": state_tensor}
return obs_dict, action_dict
return None
def send_action(self, action: torch.Tensor) -> torch.Tensor:
"""发送动作到机器人"""
if not self.is_connected:
raise RobotDeviceNotConnectedError(
"Piper is not connected. You need to run `robot.connect()`."
)
target_joints = action.tolist()
len_joint = len(target_joints) - 1
target_joints = [target_joints[i]*180 for i in range(len_joint)] + [target_joints[-1]]
target_joints[-1] = int(target_joints[-1]*500 + 500)
self.arm.write(target_joints)
return action
def capture_observation(self) -> dict:
"""捕获当前观测"""
if not self.is_connected:
raise RobotDeviceNotConnectedError(
"Piper is not connected. You need to run `robot.connect()`."
)
before_read_t = time.perf_counter()
state = self.arm.read()
self.logs["read_pos_dt_s"] = time.perf_counter() - before_read_t
state = torch.as_tensor(list(state.values()), dtype=torch.float32)
# 读取图像
images = {}
for name in self.cameras:
before_camread_t = time.perf_counter()
images[name] = self.cameras[name].async_read()
images[name] = torch.from_numpy(images[name])
self.logs[f"read_camera_{name}_dt_s"] = self.cameras[name].logs["delta_timestamp_s"]
self.logs[f"async_read_camera_{name}_dt_s"] = time.perf_counter() - before_camread_t
# 构建观测字典
obs_dict = {}
obs_dict["observation.state"] = state
for name in self.cameras:
obs_dict[f"observation.images.{name}"] = images[name]
return obs_dict
def teleop_safety_stop(self):
"""遥操作安全停止"""
self.run_calibration()
@property
def camera_features(self) -> dict:
"""获取摄像头特征"""
cam_ft = {}
for cam_key, cam in self.cameras.items():
key = f"observation.images.{cam_key}"
cam_ft[key] = {
"shape": (cam.height, cam.width, cam.channels),
"names": ["height", "width", "channels"],
"info": None,
}
return cam_ft
@property
def motor_features(self) -> dict:
"""获取电机特征"""
action_names = get_motor_names(self.piper_motors)
state_names = get_motor_names(self.piper_motors)
return {
"action": {
"dtype": "float32",
"shape": (len(action_names),),
"names": action_names,
},
"observation.state": {
"dtype": "float32",
"shape": (len(state_names),),
"names": state_names,
},
}
@property
def has_camera(self):
return len(self.cameras) > 0
@property
def num_cameras(self):
return len(self.cameras)
def __del__(self):
"""析构函数"""
try:
if not self._shutdown_flag:
self.disconnect()
except:
pass
def signal_handler(signum, frame):
"""信号处理器"""
print("\n收到中断信号,正在安全退出...")
sys.exit(0)
if __name__ == '__main__':
signal.signal(signal.SIGINT, signal_handler)
robot = None
try:
robot = RealmanDualRobot()
robot.connect()
print("RealmanDual 机器人控制已启动")
print("操作说明:")
print(" - 使用手柄进行遥操作控制")
print(" - 按 W 键调用LLM分析当前场景")
print(" - 按 Q 键:安全退出程序")
print(" - Ctrl+C强制退出")
print("\n等待操作...")
while True:
result = robot.teleop_step(record_data=True)
if result is None:
continue
# 这里可以处理记录的数据
except KeyboardInterrupt:
print("\n收到键盘中断信号")
except Exception as e:
print(f"程序运行出错: {e}")
finally:
if robot:
try:
robot.disconnect()
except:
pass
print("程序已完全退出")

View File

@@ -11,9 +11,8 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Protocol
from typing import Protocol, Dict
# Robot configuration imports
from lerobot.common.robot_devices.robots.configs import (
AlohaRobotConfig,
KochBimanualRobotConfig,
@@ -25,30 +24,31 @@ from lerobot.common.robot_devices.robots.configs import (
So100RobotConfig,
So101RobotConfig,
StretchRobotConfig,
RealmanRobotConfig
RealmanRobotConfig,
RealmanDualRobotConfig
)
# Added library imports for LLM interaction
import os
import base64
from io import BytesIO
import json
from openai import OpenAI
from PIL import Image
import numpy as np
import torch
def get_arm_id(name, arm_type):
"""Returns the string identifier of a robot arm. For instance, for a bimanual manipulator
like Aloha, it could be left_follower, right_follower, left_leader, or right_leader.
"""
"""Returns the string identifier of a robot arm."""
return f"{name}_{arm_type}"
class Robot(Protocol):
# TODO(rcadene, aliberts): Add unit test checking the protocol is implemented in the corresponding classes
robot_type: str
features: dict
cameras: Dict
def connect(self): ...
def run_calibration(self): ...
def teleop_step(self, record_data=False): ...
def capture_observation(self): ...
def capture_observation(self) -> Dict: ...
def send_action(self, action): ...
def disconnect(self): ...
def make_robot_config(robot_type: str, **kwargs) -> RobotConfig:
if robot_type == "aloha":
return AlohaRobotConfig(**kwargs)
@@ -68,32 +68,319 @@ def make_robot_config(robot_type: str, **kwargs) -> RobotConfig:
return LeKiwiRobotConfig(**kwargs)
elif robot_type == 'realman':
return RealmanRobotConfig(**kwargs)
elif robot_type == 'realman_dual':
return RealmanDualRobotConfig(**kwargs)
else:
raise ValueError(f"Robot type '{robot_type}' is not available.")
def make_robot_from_config(config: RobotConfig):
if isinstance(config, ManipulatorRobotConfig):
from lerobot.common.robot_devices.robots.manipulator import ManipulatorRobot
return ManipulatorRobot(config)
elif isinstance(config, LeKiwiRobotConfig):
from lerobot.common.robot_devices.robots.mobile_manipulator import MobileManipulator
return MobileManipulator(config)
elif isinstance(config, RealmanRobotConfig):
from lerobot.common.robot_devices.robots.realman import RealmanRobot
return RealmanRobot(config)
elif isinstance(config, RealmanDualRobotConfig):
from lerobot.common.robot_devices.robots.realman_dual import RealmanDualRobot
return RealmanDualRobot(config)
else:
from lerobot.common.robot_devices.robots.stretch import StretchRobot
return StretchRobot(config)
def make_robot(robot_type: str, **kwargs) -> Robot:
config = make_robot_config(robot_type, **kwargs)
return make_robot_from_config(config)
# 全局变量用于管理会话历史和API客户端
conversation_history = []
conversation_client = None
def reset_conversation_history():
"""清空对话历史,开始一个新任务。"""
global conversation_history
conversation_history = []
print("对话历史已重置。")
def extract_json_from_response(response: str) -> dict:
"""
从LLM响应中提取JSON格式的指令 - 从realman_dual.py移动过来
Args:
response: LLM的原始响应文本
Returns:
dict: 解析后的JSON指令字典
"""
try:
# 尝试直接解析整个响应
return json.loads(response)
except:
# 尝试从响应中查找JSON部分
import re
json_pattern = r'\{[^{}]*\}'
matches = re.findall(json_pattern, response)
for match in matches:
try:
return json.loads(match)
except:
continue
# 如果无法解析JSON返回默认格式
return {
"action": "unknown",
"description": response,
"status": "parse_error"
}
def ask_llm(query: str, state: dict):
"""
向大型语言模型发送查询,并获取下一步操作指令。
优化版本只保留最新的4张图片避免Token超限
"""
global conversation_client
global conversation_history
# 将NumPy数组转换为Base64编码的图片
def numpy_to_base64(img_array):
if img_array.dtype != np.uint8:
if img_array.max() <= 1.0:
img_array = (img_array * 255).astype(np.uint8)
else:
img_array = img_array.astype(np.uint8)
image = Image.fromarray(img_array)
buffered = BytesIO()
image.save(buffered, format="PNG")
img_base64 = base64.b64encode(buffered.getvalue()).decode('utf-8')
return img_base64
# 初始化API客户端
api_key = "sk-H5FY8bQn6ZwBCja56280C3A4C8824017A4CdB683Dc990e35"
base_url = "https://api.apiyi.com/v1"
# if not api_key:
# raise ValueError("API Key为空请直接在代码中填写。")
if conversation_client is None:
conversation_client = OpenAI(api_key=api_key, base_url=base_url)
# 仅在对话历史为空时添加系统提示
if not conversation_history:
system_prompt = f"""
你是一个聪明的双臂机器人助手:
**机器人配置:**
- 左臂:配备夹爪,可以抓取物体
- 右臂:没有夹爪,主要用于辅助定位和观察
- 四个摄像头high camera(俯视角)、front camera(正视角)、left camera(左臂视角)、right camera(右臂视角)
**任务目标:** {query}
**工作流程:**
1. 我向你展示当前4个摄像头的画面
2. 你分析场景,给出下一步具体操作指令
3. 我执行你的指令后,再次更新画面
4. 重复w此过程直到完成任务
**重要要求:**
- 每次只给出ONE STEP最关键的操作指令
- 指令要具体明确,便于执行(如"将左臂夹爪移动到试管正上方5cm处准备下降抓取"
- 当视角不清晰时,要求调整摄像头位置
- 左臂负责抓取,右臂负责辅助观察和定位
- 给出的指令必须可执行,避免模糊描述
**输出格式要求:**
- 使用纯文本输出不要使用任何Markdown格式符号
- 不要使用星号、井号、下划线等格式化符号
- 直接给出简洁分析和具体操作指令
- 输出内容适合语音播报
**输出格式:**
简洁分析 + 具体操作指令
"""
conversation_history.append({"role": "system", "content": system_prompt})
# 如果对话历史中有图片消息,只保留最近的一轮对话(系统消息 + 最后一轮用户-助手对话)
if len(conversation_history) > 3: # 系统消息 + 用户消息 + 助手回复
# 保留系统消息
system_msg = conversation_history[0]
# 只保留最后一轮对话(最后的用户消息和助手回复)
recent_messages = conversation_history[-2:] # 最后2条消息
conversation_history = [system_msg] + recent_messages
print("清理对话历史避免Token超限")
# 构建本次用户输入的消息内容
user_content = [{"type": "text", "text": "这是当前的4个摄像头画面请根据任务目标分析场景并给出下一步具体操作指令。"}]
# 添加图片,按照指定顺序
camera_order = ['high', 'front', 'left', 'right']
for camera_name in camera_order:
if camera_name in state:
img_base64 = numpy_to_base64(state[camera_name])
user_content.append({
"type": "image_url",
"image_url": {
"url": f"data:image/png;base64,{img_base64}",
"detail": "high" # 使用高清晰度分析
}
})
# 将当前用户输入添加到对话历史中
conversation_history.append({"role": "user", "content": user_content})
try:
completion = conversation_client.chat.completions.create(
model="claude-sonnet-4-20250514",
messages=conversation_history,
max_tokens=800,
temperature=0.3
)
response_message = completion.choices[0].message
response_content = response_message.content
# 将模型的响应添加到历史中
conversation_history.append(response_message)
print(f"机器人响应: {response_content}")
return response_content
except Exception as e:
error_msg = str(e)
print(f"调用LLM时出错: {error_msg}")
# Token超限时的特殊处理
if "token_limit_exceeded" in error_msg or "context_length_exceeded" in error_msg:
print("检测到Token超限清空对话历史重新开始...")
# 清空除系统消息外的所有历史
if conversation_history:
conversation_history = [conversation_history[0]] # 只保留系统消息
# 移除刚刚添加的用户消息,准备重试
if conversation_history and len(conversation_history) > 1:
conversation_history.pop()
return "由于对话历史过长已重置对话。请再次按Enter键继续。"
# 其他错误时也要清理最后的用户输入
if conversation_history and conversation_history[-1]["role"] == "user":
conversation_history.pop()
return None
def extract_json_from_response(response: str) -> dict:
"""
从LLM响应中提取结构化指令 - 优化版本
"""
try:
# 尝试直接解析整个响应
return json.loads(response)
except:
# 如果无法解析JSON创建结构化的响应
return {
"action": "move_arm", # 默认动作类型
"description": response.strip(),
"arm": "left", # 默认使用左臂
"target": "unknown",
"status": "ready"
}
from pynput import keyboard
import threading
def create_keyboard_listener(on_key_press_callback):
"""
创建键盘监听器
Args:
on_key_press_callback: 按键回调函数,接收参数 (key_char)
Returns:
keyboard.Listener: 键盘监听器对象
"""
def on_press(key):
try:
if hasattr(key, 'char') and key.char:
on_key_press_callback(key.char.lower())
except AttributeError:
pass
return keyboard.Listener(on_press=on_press)
def start_keyboard_listener(listener):
"""启动键盘监听器"""
try:
listener.start()
return True
except Exception as e:
print(f"键盘监听器启动失败: {e}")
return False
def stop_keyboard_listener(listener):
"""停止键盘监听器"""
try:
if listener and listener.is_alive():
listener.stop()
return True
except Exception:
pass
return False
def speak_async(text: str):
"""异步语音播报"""
from lerobot.common.utils.utils import say
def speak_thread():
try:
print(f"开始语音播报: {text}")
say(text, blocking=True)
print("语音播报完成")
except Exception as e:
print(f"语音播报失败: {e}")
thread = threading.Thread(target=speak_thread, daemon=True)
thread.start()
def handle_llm_interaction_with_images(query: str, camera_images: dict):
"""
处理LLM交互接收相机图像字典
Args:
query: 用户查询文本
camera_images: 相机图像字典 {camera_name: numpy_array}
Returns:
tuple: (success: bool, response: str)
"""
if len(camera_images) != 4:
print(f"警告: 期望4个相机实际获取到{len(camera_images)}")
return False, "相机数量不足"
print("图像提取成功正在向LLM发送请求...")
try:
response = ask_llm(query, camera_images)
if response:
if "已重置对话" in response:
print("对话历史已重置,请再次按键继续")
return True, response
instruction = extract_json_from_response(response)
print("\n下一步指令:")
print(f"操作类型: {instruction.get('action', 'unknown')}")
print(f"详细描述: {instruction.get('description', '无描述')}")
# 准备语音播报内容
if "description" in instruction:
desc = instruction['description']
if len(desc) > 100:
sentences = desc.split('')
key_instruction = sentences[-2] if len(sentences) > 1 else desc[:50]
else:
key_instruction = desc
speech_text = f"下一步操作:{key_instruction}"
print(f"即将播报: {speech_text}")
speak_async(speech_text)
return True, response
else:
print("LLM请求失败可能是网络问题")
speak_async("LLM请求失败")
return False, "LLM请求失败"
except Exception as e:
print(f"LLM交互出错: {e}")
print("建议1.检查网络连接 2.检查API密钥 3.稍后重试")
speak_async("LLM交互失败")
return False, f"LLM交互出错: {e}"

View File

@@ -0,0 +1,18 @@
import pygame
def find_controller_index():
# 获取所有 pygame 控制器的设备路径
pygame_joysticks = {}
for i in range(pygame.joystick.get_count()):
joystick = pygame.joystick.Joystick(i)
joystick.init()
pygame_joysticks[joystick.get_guid()] = {
'index': i,
'device_name': joystick.get_name()
}
return pygame_joysticks
if __name__ == '__main__':
print(find_controller_index())

View File

@@ -1,461 +1,421 @@
import pygame
import threading
import time
import serial
import binascii
import logging
import yaml
from typing import Dict
from Robotic_Arm.rm_robot_interface import *
from dataclasses import dataclass
from lerobot.common.robot_devices.teleop.find_gamepad import find_controller_index
from lerobot.common.robot_devices.teleop.servo_server import ServoArmServer
class ServoArm:
def __init__(self, config_file="config.yaml"):
"""初始化机械臂的串口连接并发送初始数据。
Args:
config_file (str): 配置文件的路径。
"""
self.config = self._load_config(config_file)
self.port = self.config["port"]
self.baudrate = self.config["baudrate"]
self.hex_data = self.config["hex_data"]
self.arm_axis = self.config.get("arm_axis", 7)
class RealmanAlohaMaster:
def __init__(self, config):
self.config = config
self._initialize_master_arm()
def _initialize_master_arm(self):
"""初始化主控臂"""
try:
self.serial_conn = serial.Serial(self.port, self.baudrate, timeout=0)
self.bytes_to_send = binascii.unhexlify(self.hex_data.replace(" ", ""))
self.serial_conn.write(self.bytes_to_send)
time.sleep(1)
self.connected = True
logging.info(f"串口连接成功: {self.port}")
self.master_dual_arm = ServoArmServer(self.config.config_file)
except Exception as e:
logging.error(f"串口连接失败: {e}")
self.connected = False
logging.error(f"初始化主控臂失败: {e}")
raise
def _load_config(self, config_file):
"""加载配置文件。
Args:
config_file (str): 配置文件的路径。
Returns:
dict: 配置文件内容。
"""
try:
with open(config_file, "r") as file:
config = yaml.safe_load(file)
return config
except Exception as e:
logging.error(f"配置文件加载失败: {e}")
# 返回默认配置
return {
"port": "/dev/ttyUSB0",
"baudrate": 460800,
"hex_data": "55 AA 02 00 00 67",
"arm_axis": 6
}
def _bytes_to_signed_int(self, byte_data):
"""将字节数据转换为有符号整数。
Args:
byte_data (bytes): 字节数据。
Returns:
int: 有符号整数。
"""
return int.from_bytes(byte_data, byteorder="little", signed=True)
def _parse_joint_data(self, hex_received):
"""解析接收到的十六进制数据并提取关节数据。
Args:
hex_received (str): 接收到的十六进制字符串数据。
Returns:
dict: 解析后的关节数据。
"""
logging.debug(f"hex_received: {hex_received}")
joints = {}
for i in range(self.arm_axis):
start = 14 + i * 10
end = start + 8
joint_hex = hex_received[start:end]
joint_byte_data = bytearray.fromhex(joint_hex)
joint_value = self._bytes_to_signed_int(joint_byte_data) / 10000.0
joints[f"joint_{i+1}"] = joint_value
grasp_start = 14 + self.arm_axis*10
grasp_hex = hex_received[grasp_start:grasp_start+8]
grasp_byte_data = bytearray.fromhex(grasp_hex)
# 夹爪进行归一化处理
grasp_value = self._bytes_to_signed_int(grasp_byte_data)/1000
joints["grasp"] = grasp_value
return joints
def get_joint_actions(self):
"""从串口读取数据并解析关节动作。
Returns:
dict: 包含关节数据的字典。
"""
if not self.connected:
return {}
try:
self.serial_conn.write(self.bytes_to_send)
bytes_received = self.serial_conn.read(self.serial_conn.inWaiting())
if len(bytes_received) == 0:
return {}
hex_received = binascii.hexlify(bytes_received).decode("utf-8").upper()
actions = self._parse_joint_data(hex_received)
return actions
except Exception as e:
logging.error(f"读取串口数据错误: {e}")
return {}
def set_gripper_action(self, action):
"""设置夹爪动作。
Args:
action (int): 夹爪动作值。
"""
if not self.connected:
return
try:
action = int(action * 1000)
action_bytes = action.to_bytes(4, byteorder="little", signed=True)
self.bytes_to_send = self.bytes_to_send[:74] + action_bytes + self.bytes_to_send[78:]
except Exception as e:
logging.error(f"设置夹爪动作错误: {e}")
def close(self):
"""关闭串口连接"""
if self.connected and hasattr(self, 'serial_conn'):
self.serial_conn.close()
self.connected = False
logging.info("串口连接已关闭")
class HybridController:
def __init__(self, init_info):
# 初始化pygame和手柄
pygame.init()
pygame.joystick.init()
# 检查是否有连接的手柄
if pygame.joystick.get_count() == 0:
raise Exception("未检测到手柄")
# 初始化手柄
self.joystick = pygame.joystick.Joystick(0)
self.joystick.init()
# 摇杆死区
self.deadzone = 0.15
# 控制模式: True为关节控制主模式False为末端控制
self.joint_control_mode = True
# 精细控制模式
self.fine_control_mode = False
# 初始化末端姿态和关节角度
self.init_joint = init_info['init_joint']
self.init_pose = init_info.get('init_pose', [0]*6)
self.max_gripper = init_info['max_gripper']
self.min_gripper = init_info['min_gripper']
servo_config_file = init_info['servo_config_file']
self.joint = self.init_joint.copy()
self.pose = self.init_pose.copy()
self.pose_speeds = [0.0] * 6
self.joint_speeds = [0.0] * 6
self.tozero = False
# 主臂关节状态
self.master_joint_actions = {}
self.use_master_arm = False
# 末端位姿限制
self.pose_limits = [
(-0.800, 0.800), # X (m)
(-0.800, 0.800), # Y (m)
(-0.800, 0.800), # Z (m)
(-3.14, 3.14), # RX (rad)
(-3.14, 3.14), # RY (rad)
(-3.14, 3.14) # RZ (rad)
]
# 关节角度限制 (度)
self.joint_limits = [
(-180, 180), # joint 1
(-180, 180), # joint 2
(-180, 180), # joint 3
(-180, 180), # joint 4
(-180, 180), # joint 5
(-180, 180) # joint 6
]
# 控制参数
self.linear_step = 0.002 # 线性移动步长(m)
self.angular_step = 0.01 # 角度步长(rad)
# 夹爪状态和速度
self.gripper_speed = 10
self.gripper = self.min_gripper
# 初始化串口通信(主臂关节状态获取)
self.servo_arm = None
if servo_config_file:
try:
self.servo_arm = ServoArm(servo_config_file)
self.use_master_arm = True
logging.info("串口主臂连接成功,启用主从控制模式")
except Exception as e:
logging.error(f"串口主臂连接失败: {e}")
self.use_master_arm = False
# 启动更新线程
self.running = True
self.thread = threading.Thread(target=self.update_controller)
self.thread.start()
print("混合控制器已启动")
print("主控制模式: 关节控制")
if self.use_master_arm:
print("主从控制: 启用")
print("Back按钮: 切换控制模式(关节/末端)")
print("L3按钮: 切换精细控制模式")
print("Start按钮: 重置到初始位置")
def _apply_nonlinear_mapping(self, value):
"""应用非线性映射以提高控制精度"""
sign = 1 if value >= 0 else -1
return sign * (abs(value) ** 2)
def _normalize_angle(self, angle):
"""将角度归一化到[-π, π]范围内"""
import math
while angle > math.pi:
angle -= 2 * math.pi
while angle < -math.pi:
angle += 2 * math.pi
return angle
def update_controller(self):
while self.running:
try:
pygame.event.pump()
except Exception as e:
print(f"控制器错误: {e}")
self.stop()
continue
# 检查控制模式切换 (Back按钮)
if self.joystick.get_button(6): # Back按钮
self.joint_control_mode = not self.joint_control_mode
mode_str = "关节控制" if self.joint_control_mode else "末端位姿控制"
print(f"切换到{mode_str}模式")
time.sleep(0.3) # 防止多次触发
# 检查精细控制模式切换 (L3按钮)
if self.joystick.get_button(10): # L3按钮
self.fine_control_mode = not self.fine_control_mode
print(f"切换到{'精细' if self.fine_control_mode else '普通'}控制模式")
time.sleep(0.3) # 防止多次触发
# 检查重置按钮 (Start按钮)
if self.joystick.get_button(7): # Start按钮
print("重置机械臂到初始位置...")
# print("init_joint", self.init_joint.copy())
self.tozero = True
self.joint = self.init_joint.copy()
self.pose = self.init_pose.copy()
self.pose_speeds = [0.0] * 6
self.joint_speeds = [0.0] * 6
self.gripper_speed = 10
self.gripper = self.min_gripper
print("机械臂已重置到初始位置")
time.sleep(0.3) # 防止多次触发
# 从串口获取主臂关节状态
if self.servo_arm and self.servo_arm.connected:
try:
self.master_joint_actions = self.servo_arm.get_joint_actions()
if self.master_joint_actions:
logging.debug(f"主臂关节状态: {self.master_joint_actions}")
except Exception as e:
logging.error(f"获取主臂状态错误: {e}")
self.master_joint_actions = {}
# print(self.master_joint_actions)
# 根据控制模式更新相应的控制逻辑
if self.joint_control_mode:
# 关节控制模式下优先使用主臂数据Xbox作为辅助
self.update_joint_control()
else:
# 末端控制模式使用Xbox控制
self.update_end_pose()
time.sleep(0.02)
# print('gripper:', self.gripper)
def update_joint_control(self):
"""更新关节角度控制 - 优先使用主臂数据"""
if self.use_master_arm and self.master_joint_actions:
# 主从控制模式:直接使用主臂的关节角度
try:
# 将主臂关节角度映射到从臂
for i in range(6): # 假设只有6个关节需要控制
joint_key = f"joint_{i+1}"
if joint_key in self.master_joint_actions:
# 直接使用主臂的关节角度(已经是度数)
self.joint[i] = self.master_joint_actions[joint_key]
# 应用关节限制
min_val, max_val = self.joint_limits[i]
self.joint[i] = max(min_val, min(max_val, self.joint[i]))
# print(self.joint)
logging.debug(f"主臂关节映射到从臂: {self.joint[:6]}")
except Exception as e:
logging.error(f"主臂数据映射错误: {e}")
# 如果有主臂夹爪数据,使用主臂夹爪状态
if self.use_master_arm and "grasp" in self.master_joint_actions:
self.gripper = self.master_joint_actions["grasp"] * 1000
self.joint[-1] = self.gripper
def update_end_pose(self):
"""更新末端位姿控制"""
# 根据控制模式调整步长
current_linear_step = self.linear_step * (0.1 if self.fine_control_mode else 1.0)
current_angular_step = self.angular_step * (0.1 if self.fine_control_mode else 1.0)
# 方向键控制XY
hat = self.joystick.get_hat(0)
hat_up = hat[1] == 1 # Y+
hat_down = hat[1] == -1 # Y-
hat_left = hat[0] == -1 # X-
hat_right = hat[0] == 1 # X+
# 右摇杆控制Z
right_y_raw = -self.joystick.get_axis(4)
# 左摇杆控制RZ
left_y_raw = -self.joystick.get_axis(1)
# 应用死区
right_y = 0.0 if abs(right_y_raw) < self.deadzone else right_y_raw
left_y = 0.0 if abs(left_y_raw) < self.deadzone else left_y_raw
# 计算各轴速度
self.pose_speeds[1] = current_linear_step if hat_up else (-current_linear_step if hat_down else 0.0) # Y
self.pose_speeds[0] = -current_linear_step if hat_left else (current_linear_step if hat_right else 0.0) # X
# 设置Z速度右摇杆Y轴控制
z_mapping = self._apply_nonlinear_mapping(right_y)
self.pose_speeds[2] = z_mapping * current_linear_step # Z
# L1/R1控制RX旋转
LB = self.joystick.get_button(4) # RX-
RB = self.joystick.get_button(5) # RX+
self.pose_speeds[3] = (-current_angular_step if LB else (current_angular_step if RB else 0.0))
# △/□控制RY旋转
triangle = self.joystick.get_button(2) # RY+
square = self.joystick.get_button(3) # RY-
self.pose_speeds[4] = (current_angular_step if triangle else (-current_angular_step if square else 0.0))
# 左摇杆Y轴控制RZ旋转
rz_mapping = self._apply_nonlinear_mapping(left_y)
self.pose_speeds[5] = rz_mapping * current_angular_step * 2 # RZ
# 夹爪控制(圈/叉)
circle = self.joystick.get_button(1) # 夹爪开
cross = self.joystick.get_button(0) # 夹爪关
if circle:
self.gripper = min(self.max_gripper, self.gripper + self.gripper_speed)
elif cross:
self.gripper = max(self.min_gripper, self.gripper - self.gripper_speed)
# 更新末端位姿
for i in range(6):
self.pose[i] += self.pose_speeds[i]
# 角度归一化处理
for i in range(3, 6):
self.pose[i] = self._normalize_angle(self.pose[i])
def update_joint_state(self, joint):
self.joint = joint
# self.tozero = False
def update_endpose_state(self, end_pose):
self.pose = end_pose
# self.tozero = False
def update_tozero_state(self, tozero):
self.tozero = tozero
def get_action(self) -> Dict:
"""获取当前控制命令"""
return {
'control_mode': 'joint' if self.joint_control_mode else 'end_pose',
'use_master_arm': self.use_master_arm,
'master_joint_actions': self.master_joint_actions,
'end_pose': {
'X': self.pose[0],
'Y': self.pose[1],
'Z': self.pose[2],
'RX': self.pose[3],
'RY': self.pose[4],
'RZ': self.pose[5],
},
'joint_angles': self.joint,
'gripper': int(self.gripper),
'tozero': self.tozero
"""获取控制动作"""
try:
master_joint_actions = self.master_dual_arm.get_joint_data()
return self._format_action(master_joint_actions)
except Exception as e:
logging.error(f"获取动作失败: {e}")
def _format_action(self, master_joint_actions: dict) -> dict:
"""格式化动作数据"""
master_controller_status = {
'left': master_joint_actions['left_controller_status'],
'right': master_joint_actions['right_controller_status']
}
return {
'control_mode': 'joint',
'master_joint_actions': master_joint_actions['dual_joint_actions'],
'left_joint_actions': master_joint_actions['left_joint_actions'][:-1],
'right_joint_actions': master_joint_actions['right_joint_actions'][:-1],
'left_gripper_actions': master_joint_actions['left_joint_actions'][-1], # 修复bug
'right_gripper_actions': master_joint_actions['right_joint_actions'][-1],
'master_controller_status': master_controller_status
}
def stop(self):
"""停止控制器"""
self.running = False
if self.thread.is_alive():
self.thread.join()
if self.servo_arm:
self.servo_arm.close()
pygame.quit()
print("混合控制器已退出")
try:
if hasattr(self, 'master_dual_arm') and self.master_dual_arm:
self.master_dual_arm.shutdown()
print("混合控制器已退出")
except Exception as e:
logging.error(f"停止控制器失败: {e}")
class DummyEndposeMaster:
def __init__(self, config):
# 初始化pygame
pygame.init()
pygame.joystick.init()
# 获取所有 USB 游戏控制器的信息
self.joysticks = find_controller_index()
print(self.joysticks)
self.control_info = config.end_control_info
left_stick = self._init_stick('left')
right_stick = self._init_stick('right')
self.controllers = [left_stick, right_stick]
def _init_stick(self, arm_name:str):
stick_info = {}
stick_info['index'] = self.joysticks[self.control_info[arm_name]]['index']
stick_info['guid'] = self.control_info[arm_name]
stick_info['name'] = f'{arm_name}'
device_name = self.joysticks[self.control_info[arm_name]]['device_name']
stick = XboxStick(stick_info) if "Xbox" in device_name else FlightStick(stick_info)
stick.start_polling()
return stick
def reset(self):
"""重置到初始状态"""
self.joint = self.init_joint.copy()
self.pose = self.init_pose.copy()
self.pose_speeds = [0.0] * 6
self.joint_speeds = [0.0] * 6
self.gripper_speed = 10
self.gripper = self.min_gripper
print("已重置到初始状态")
def get_action(self, state) -> Dict:
from copy import deepcopy
new_state = deepcopy(state)
gamepad_action = {}
xyz = []
rxryrz = []
gripper = []
"""获取控制动作"""
try:
for i, controller in enumerate(self.controllers):
# states = controller.get_raw_states()
gamepad_action.update(controller.get_control_signal(controller.name))
xyz += [f"{controller.name}_x", f"{controller.name}_y", f"{controller.name}_z"]
rxryrz += [f"{controller.name}_joint_4", f"{controller.name}_joint_5", f"{controller.name}_joint_6"]
gripper += [f"{controller.name}_gripper"]
for name in xyz:
new_state['pose'][name] += (gamepad_action[name] * gamepad_action['xyz_vel'] * gamepad_action[name.split('_')[0]+'_ratio'])
for name in gripper:
new_state['joint'][name] += int(gamepad_action[name] * gamepad_action['gripper_vel'] * gamepad_action[name.split('_')[0]+'_ratio'])
new_state['joint'][name] = min(990, max(0, new_state['joint'][name]))
for name in rxryrz:
new_state['joint'][name] += (gamepad_action[name] * gamepad_action['rxyz_vel'] * gamepad_action[name.split('_')[0]+'_ratio'])
new_state['control_mode'] = 'endpose'
return new_state
except Exception as e:
logging.error(f"获取动作失败: {e}")
def stop(self):
"""停止控制器"""
try:
# 停止轮询线程
for controller in self.controllers:
controller.stop_polling()
except Exception as e:
logging.error(f"停止控制器失败: {e}")
class ControllerBase:
def __init__(self, joystick_info: dict):
# 初始化手柄对象
self.joystick = pygame.joystick.Joystick(joystick_info['index'])
self.joystick.init()
self.name = joystick_info['name']
self.guid = joystick_info['guid']
# 存储所有控制器状态的字典
self.states = {
'buttons': [False] * self.joystick.get_numbuttons(), # 按钮状态
'axes': [0.0] * self.joystick.get_numaxes(), # 摇杆和轴状态
'hats': [(0, 0)] * self.joystick.get_numhats() # 舵状态
}
# deadzone
self.deadzone = 0.15
# validzone
self.validzone = 0.05
self.ratio = 1
self.gripper_vel = 100
self.rxyz_vel = 5
self.xyz_vel = 0.02
self.scale_up = 2
self.scale_down = 10
# 线程控制标志
self.running = False
def start_polling(self):
"""启动线程以轮询控制器状态"""
if not self.running:
self.running = True
self.thread = threading.Thread(target=self._poll_controller)
self.thread.start()
def stop_polling(self):
"""停止线程"""
if self.running:
self.running = False
self.thread.join()
def _poll_controller(self):
"""后台线程函数,用于轮询控制器状态"""
while self.running:
# 处理pygame事件
pygame.event.pump()
# 获取按钮状态
for i in range(self.joystick.get_numbuttons()):
self.states['buttons'][i] = self.joystick.get_button(i)
# 获取摇杆和轴状态(通常范围是 -1.0 到 1.0
for i in range(self.joystick.get_numaxes()):
self.states['axes'][i] = self.joystick.get_axis(i)
# 获取舵状态(通常返回一个元组 (x, y),值范围为 -1, 0, 1
for i in range(self.joystick.get_numhats()):
self.states['hats'][i] = self.joystick.get_hat(i)
# 控制轮询频率
time.sleep(0.01)
def get_raw_states(self):
"""获取当前控制器状态"""
return self.states
class FlightStick(ControllerBase):
def __init__(self, joystick_info):
super().__init__(joystick_info)
def get_x_control_signal(self):
x = 0
if self.states['axes'][0] > self.validzone:
x = 1
elif self.states['axes'][0] < -self.validzone:
x = -1
return x
def get_y_control_signal(self):
y = 0
if self.states['axes'][1] > self.validzone:
y = -1
elif self.states['axes'][1] < -self.validzone:
y = 1
return y
def get_z_control_signal(self):
z = 0
if self.states['buttons'][0]:
z = 1
elif self.states['buttons'][1]:
z = -1
return z
def get_gripper_control_signal(self):
gripper = 0
if self.states['buttons'][2] == 1:
gripper = 1
elif self.states['buttons'][3] == 1:
gripper = -1
return gripper
def get_ratio_control_signal(self):
ratio = self.ratio
if self.states['axes'][2] > 0.8:
ratio = self.ratio / self.scale_down
elif self.states['axes'][2] < -0.8:
ratio = self.ratio * self.scale_up
return ratio
def get_rx_control_signal(self):
rx = 0
if self.states['hats'][0][0] == -1:
rx = 1
elif self.states['hats'][0][0] == 1:
rx = -1
else:
rx = 0
return rx
def get_ry_control_signal(self):
ry = 0
if self.states['hats'][0][1] == 1:
ry = -1
elif self.states['hats'][0][1] == -1:
ry = 1
else:
ry = 0
return ry
def get_rz_control_signal(self):
rz = 0
if self.states['axes'][3] < -self.validzone:
rz = -1
elif self.states['axes'][3] > self.validzone:
rz = 1
else:
rz = 0
return rz
def get_control_signal(self, prefix: str = ""):
"""获取所有控制信号"""
return {
f'{prefix}_x': self.get_x_control_signal(),
f'{prefix}_y': self.get_y_control_signal(),
f'{prefix}_z': self.get_z_control_signal(),
f'{prefix}_joint_4': self.get_rx_control_signal(),
f'{prefix}_joint_5': self.get_ry_control_signal(),
f'{prefix}_joint_6': self.get_rz_control_signal(),
f'{prefix}_gripper': self.get_gripper_control_signal(),
f'{prefix}_ratio': self.get_ratio_control_signal(),
'gripper_vel': self.gripper_vel,
'rxyz_vel': self.rxyz_vel,
'xyz_vel': self.xyz_vel
}
class XboxStick(ControllerBase):
def __init__(self, joystick_info: dict):
super().__init__(joystick_info)
def get_x_control_signal(self):
"""获取 X 轴控制信号"""
x = 0
if self.states['hats'][0][0] == -1:
x = 1
elif self.states['hats'][0][0] == 1:
x = -1
return x
def get_y_control_signal(self):
"""获取 Y 轴控制信号"""
y = 0
if self.states['hats'][0][1] == 1:
y = -1
elif self.states['hats'][0][1] == -1:
y = 1
return y
def get_z_control_signal(self):
"""获取 Z 轴控制信号"""
z = 0
if self.states['axes'][4] > self.deadzone: # A 按钮
z = -1
elif self.states['axes'][4] < -self.deadzone: # B 按钮
z = 1
return z
def get_ratio_control_signal(self):
"""获取速度控制信号"""
ratio = self.ratio
if self.states['axes'][2] > 0.8: # LT 按钮
ratio = self.ratio * self.scale_up
elif self.states['axes'][5] > 0.8: # RT 按钮
ratio = self.ratio / self.scale_down
return ratio
def get_gripper_control_signal(self):
gripper = 0
if self.states['buttons'][0] == 1:
gripper = 1
elif self.states['buttons'][1] == 1:
gripper = -1
return gripper
def get_rx_control_signal(self):
"""获取 RX 轴控制信号"""
rx = 0
if self.states['axes'][0] > self.deadzone: # 左舵
rx = -1
elif self.states['axes'][0] < -self.deadzone: # 右舵
rx = 1
return rx
def get_ry_control_signal(self):
"""获取 RY 轴控制信号"""
ry = 0
if self.states['axes'][1] > self.deadzone: # 上舵
ry = 1
elif self.states['axes'][1] < -self.deadzone: # 下舵
ry = -1
return ry
def get_rz_control_signal(self):
"""获取 RZ 轴控制信号"""
rz = 0
if self.states['buttons'][4] == 1: # 左摇杆
rz = 1
elif self.states['buttons'][5] == 1: # 右摇杆
rz = -1
return rz
def get_control_signal(self, prefix: str = ""):
"""获取所有控制信号"""
return {
f'{prefix}_x': self.get_x_control_signal(),
f'{prefix}_y': self.get_y_control_signal(),
f'{prefix}_z': self.get_z_control_signal(),
f'{prefix}_joint_4': self.get_rx_control_signal(),
f'{prefix}_joint_5': self.get_ry_control_signal(),
f'{prefix}_joint_6': self.get_rz_control_signal(),
f'{prefix}_gripper': self.get_gripper_control_signal(),
f'{prefix}_ratio': self.get_ratio_control_signal(),
'gripper_vel': self.gripper_vel,
'rxyz_vel': self.rxyz_vel,
'xyz_vel': self.xyz_vel
}
@dataclass
class ControllerConfig:
"""控制器配置"""
init_joint: list
init_pose: list
max_gripper: int
min_gripper: int
config_file: str
end_control_info: dict
def parse_init_info(init_info: dict) -> ControllerConfig:
"""解析初始化信息"""
return ControllerConfig(
init_joint=init_info['init_joint'],
init_pose=init_info.get('init_pose', [0]*12),
max_gripper=init_info['max_gripper'],
min_gripper=init_info['min_gripper'],
config_file=init_info['servo_config_file'],
end_control_info=init_info['end_control_info']
)
# 使用示例
if __name__ == "__main__":
# 初始化睿尔曼机械臂
arm = RoboticArm(rm_thread_mode_e.RM_TRIPLE_MODE_E)
# 创建机械臂连接
handle = arm.rm_create_robot_arm("192.168.3.18", 8080)
print(f"机械臂连接ID: {handle.id}")
init_pose = arm.rm_get_current_arm_state()[1]['pose']
with open('/home/maic/LYT/lerobot/lerobot/common/robot_devices/teleop/realman_mix.yaml', "r") as file:
config = yaml.safe_load(file)
config['init_pose'] = init_pose
arm_controller = HybridController(config)
try:
while True:
print(arm_controller.get_action())
time.sleep(0.1)
except KeyboardInterrupt:
arm_controller.stop()
config = {
'init_joint': {'joint': [-170, 90, 0, 90, 120, 0, 10, 170, 90, 0, -90, 120, 0, 10]},
'init_pose': {},
'max_gripper': {},
'min_gripper': {},
'servo_config_file': {},
'end_control_info': {'left': "0300b14bff1100003708000010010000" , 'right': '0300509d5e040000120b000009050000'}
}
config = parse_init_info(config)
endpose_arm = DummyEndposeMaster(config)
while True:
gamepad_action = {}
xyz = []
for i, controller in enumerate(endpose_arm.controllers):
# states = controller.get_raw_states()
gamepad_action.update(controller.get_control_signal(controller.name))
xyz += [f"{controller.name}_x", f"{controller.name}_y", f"{controller.name}_z"]
time.sleep(1)
print(gamepad_action)

View File

@@ -0,0 +1,76 @@
import time
import logging
from typing import Dict
from dataclasses import dataclass
from lerobot.common.robot_devices.teleop.gamepad import RealmanAlohaMaster, DummyEndposeMaster
@dataclass
class ControllerConfig:
"""控制器配置"""
init_joint: list
init_pose: list
max_gripper: int
min_gripper: int
config_file: str
end_control_info: dict
class HybridController:
def __init__(self, init_info):
self.config = self._parse_init_info(init_info)
self.joint = self.config.init_joint.copy()
self.pose = self.config.init_pose.copy()
self.joint_arm = RealmanAlohaMaster(self.config)
self.endpose_arm = DummyEndposeMaster(self.config)
def _parse_init_info(self, init_info: dict) -> ControllerConfig:
"""解析初始化信息"""
return ControllerConfig(
init_joint=init_info['init_joint'],
init_pose=init_info.get('init_pose', [0]*12),
max_gripper=init_info['max_gripper'],
min_gripper=init_info['min_gripper'],
config_file=init_info['servo_config_file'],
end_control_info=init_info['end_control_info']
)
def get_action(self, state) -> Dict:
"""获取控制动作"""
try:
endpose_action = self.endpose_arm.get_action(state)
return endpose_action
# return self.joint_arm.get_action()
except Exception as e:
logging.error(f"获取动作失败: {e}")
def stop(self):
self.joint_arm.stop()
def reset(self):
"""重置控制器"""
self.joint = self.config.init_joint.copy()
self.pose = self.config.init_pose.copy()
self.joint_control_mode = True
# 使用示例
if __name__ == "__main__":
init_info = {
'init_joint': [-175, 90, 90, 45, 90, -90, 10, 175, 90, 90, -45, 90, 90, 10],
'init_pose': [[-0.0305, 0.125938, 0.13153, 3.141, 0.698, -1.57, -0.030486, -0.11487, 0.144707, 3.141, 0.698, 1.57]],
'max_gripper': 990,
'min_gripper': 10,
'servo_config_file': '/home/maic/LYT/lerobot/lerobot/common/robot_devices/teleop/servo_dual.yaml',
'end_control_info': {'left': '0300b14bff1100003708000010010000', 'right': '030003f05e0400008e02000010010000'}
}
arm_controller = HybridController(init_info)
time.sleep(1)
try:
while True:
print(arm_controller.get_action())
time.sleep(1)
except KeyboardInterrupt:
arm_controller.stop()

View File

@@ -0,0 +1,466 @@
import pygame
import threading
import time
import serial
import binascii
import logging
import yaml
from typing import Dict
from Robotic_Arm.rm_robot_interface import *
class ServoArm:
def __init__(self, config_file="config.yaml"):
"""初始化机械臂的串口连接并发送初始数据。
Args:
config_file (str): 配置文件的路径。
"""
self.config = self._load_config(config_file)
self.port = self.config["port"]
self.baudrate = self.config["baudrate"]
self.joint_hex_data = self.config["joint_hex_data"]
self.control_hex_data = self.config["control_hex_data"]
self.arm_axis = self.config.get("arm_axis", 7)
try:
self.serial_conn = serial.Serial(self.port, self.baudrate, timeout=0)
self.bytes_to_send = binascii.unhexlify(self.joint_hex_data.replace(" ", ""))
self.serial_conn.write(self.bytes_to_send)
time.sleep(1)
self.connected = True
logging.info(f"串口连接成功: {self.port}")
except Exception as e:
logging.error(f"串口连接失败: {e}")
self.connected = False
def _load_config(self, config_file):
"""加载配置文件。
Args:
config_file (str): 配置文件的路径。
Returns:
dict: 配置文件内容。
"""
try:
with open(config_file, "r") as file:
config = yaml.safe_load(file)
return config
except Exception as e:
logging.error(f"配置文件加载失败: {e}")
# 返回默认配置
return {
"port": "/dev/ttyUSB0",
"baudrate": 460800,
"joint_hex_data": "55 AA 02 00 00 67",
"control_hex_data": "55 AA 08 00 00 B9",
"arm_axis": 6
}
def _bytes_to_signed_int(self, byte_data):
"""将字节数据转换为有符号整数。
Args:
byte_data (bytes): 字节数据。
Returns:
int: 有符号整数。
"""
return int.from_bytes(byte_data, byteorder="little", signed=True)
def _parse_joint_data(self, hex_received):
"""解析接收到的十六进制数据并提取关节数据。
Args:
hex_received (str): 接收到的十六进制字符串数据。
Returns:
dict: 解析后的关节数据。
"""
logging.debug(f"hex_received: {hex_received}")
joints = {}
for i in range(self.arm_axis):
start = 14 + i * 10
end = start + 8
joint_hex = hex_received[start:end]
joint_byte_data = bytearray.fromhex(joint_hex)
joint_value = self._bytes_to_signed_int(joint_byte_data) / 10000.0
joints[f"joint_{i+1}"] = joint_value
grasp_start = 14 + self.arm_axis*10
grasp_hex = hex_received[grasp_start:grasp_start+8]
grasp_byte_data = bytearray.fromhex(grasp_hex)
# 夹爪进行归一化处理
grasp_value = self._bytes_to_signed_int(grasp_byte_data)/1000
joints["grasp"] = grasp_value
return joints
def _parse_controller_data(self, hex_received):
status = {
'infrared': 0,
'button': 0
}
if len(hex_received) == 18:
status['infrared'] = self._bytes_to_signed_int(bytearray.fromhex(hex_received[12:14]))
status['button'] = self._bytes_to_signed_int(bytearray.fromhex(hex_received[14:16]))
# print(infrared)
return status
def get_joint_actions(self):
"""从串口读取数据并解析关节动作。
Returns:
dict: 包含关节数据的字典。
"""
if not self.connected:
return {}
try:
self.serial_conn.write(self.bytes_to_send)
time.sleep(0.02)
bytes_received = self.serial_conn.read(self.serial_conn.inWaiting())
if len(bytes_received) == 0:
return {}
hex_received = binascii.hexlify(bytes_received).decode("utf-8").upper()
actions = self._parse_joint_data(hex_received)
return actions
except Exception as e:
logging.error(f"读取串口数据错误: {e}")
return {}
def get_controller_status(self):
bytes_to_send = binascii.unhexlify(self.control_hex_data.replace(" ", ""))
self.serial_conn.write(bytes_to_send)
time.sleep(0.02)
bytes_received = self.serial_conn.read(self.serial_conn.inWaiting())
hex_received = binascii.hexlify(bytes_received).decode("utf-8").upper()
# print("control status:", hex_received)
status = self._parse_controller_data(hex_received)
return status
def close(self):
"""关闭串口连接"""
if self.connected and hasattr(self, 'serial_conn'):
self.serial_conn.close()
self.connected = False
logging.info("串口连接已关闭")
class HybridController:
def __init__(self, init_info):
# 初始化pygame和手柄
pygame.init()
pygame.joystick.init()
# 检查是否有连接的手柄
if pygame.joystick.get_count() == 0:
raise Exception("未检测到手柄")
# 初始化手柄
self.joystick = pygame.joystick.Joystick(0)
self.joystick.init()
# 摇杆死区
self.deadzone = 0.15
# 控制模式: True为关节控制主模式False为末端控制
self.joint_control_mode = True
# 精细控制模式
self.fine_control_mode = False
# 初始化末端姿态和关节角度
self.init_joint = init_info['init_joint']
self.init_pose = init_info.get('init_pose', [0]*6)
self.max_gripper = init_info['max_gripper']
self.min_gripper = init_info['min_gripper']
servo_config_file = init_info['servo_config_file']
self.joint = self.init_joint.copy()
self.pose = self.init_pose.copy()
self.pose_speeds = [0.0] * 6
self.joint_speeds = [0.0] * 6
self.tozero = False
# 主臂关节状态
self.master_joint_actions = {}
self.master_controller_status = {}
self.use_master_arm = False
# 末端位姿限制
self.pose_limits = [
(-0.800, 0.800), # X (m)
(-0.800, 0.800), # Y (m)
(-0.800, 0.800), # Z (m)
(-3.14, 3.14), # RX (rad)
(-3.14, 3.14), # RY (rad)
(-3.14, 3.14) # RZ (rad)
]
# 关节角度限制 (度)
self.joint_limits = [
(-180, 180), # joint 1
(-180, 180), # joint 2
(-180, 180), # joint 3
(-180, 180), # joint 4
(-180, 180), # joint 5
(-180, 180) # joint 6
]
# 控制参数
self.linear_step = 0.002 # 线性移动步长(m)
self.angular_step = 0.01 # 角度步长(rad)
# 夹爪状态和速度
self.gripper_speed = 10
self.gripper = self.min_gripper
# 初始化串口通信(主臂关节状态获取)
self.servo_arm = None
if servo_config_file:
try:
self.servo_arm = ServoArm(servo_config_file)
self.use_master_arm = True
logging.info("串口主臂连接成功,启用主从控制模式")
except Exception as e:
logging.error(f"串口主臂连接失败: {e}")
self.use_master_arm = False
# 启动更新线程
self.running = True
self.thread = threading.Thread(target=self.update_controller)
self.thread.start()
print("混合控制器已启动")
print("主控制模式: 关节控制")
if self.use_master_arm:
print("主从控制: 启用")
print("Back按钮: 切换控制模式(关节/末端)")
print("L3按钮: 切换精细控制模式")
print("Start按钮: 重置到初始位置")
def _apply_nonlinear_mapping(self, value):
"""应用非线性映射以提高控制精度"""
sign = 1 if value >= 0 else -1
return sign * (abs(value) ** 2)
def _normalize_angle(self, angle):
"""将角度归一化到[-π, π]范围内"""
import math
while angle > math.pi:
angle -= 2 * math.pi
while angle < -math.pi:
angle += 2 * math.pi
return angle
def update_controller(self):
while self.running:
try:
pygame.event.pump()
except Exception as e:
print(f"控制器错误: {e}")
self.stop()
continue
# 检查控制模式切换 (Back按钮)
if self.joystick.get_button(6): # Back按钮
self.joint_control_mode = not self.joint_control_mode
mode_str = "关节控制" if self.joint_control_mode else "末端位姿控制"
print(f"切换到{mode_str}模式")
time.sleep(0.3) # 防止多次触发
# 检查精细控制模式切换 (L3按钮)
if self.joystick.get_button(10): # L3按钮
self.fine_control_mode = not self.fine_control_mode
print(f"切换到{'精细' if self.fine_control_mode else '普通'}控制模式")
time.sleep(0.3) # 防止多次触发
# 检查重置按钮 (Start按钮)
if self.joystick.get_button(7): # Start按钮
print("重置机械臂到初始位置...")
# print("init_joint", self.init_joint.copy())
self.tozero = True
self.joint = self.init_joint.copy()
self.pose = self.init_pose.copy()
self.pose_speeds = [0.0] * 6
self.joint_speeds = [0.0] * 6
self.gripper_speed = 10
self.gripper = self.min_gripper
print("机械臂已重置到初始位置")
time.sleep(0.3) # 防止多次触发
# 从串口获取主臂关节状态
if self.servo_arm and self.servo_arm.connected:
try:
self.master_joint_actions = self.servo_arm.get_joint_actions()
self.master_controller_status = self.servo_arm.get_controller_status()
if self.master_joint_actions:
logging.debug(f"主臂关节状态: {self.master_joint_actions}")
except Exception as e:
logging.error(f"获取主臂状态错误: {e}")
self.master_joint_actions = {}
# print(self.master_joint_actions)
# 根据控制模式更新相应的控制逻辑
if self.joint_control_mode:
# 关节控制模式下优先使用主臂数据Xbox作为辅助
self.update_joint_control()
else:
# 末端控制模式使用Xbox控制
self.update_end_pose()
time.sleep(0.02)
# print('gripper:', self.gripper)
def update_joint_control(self):
"""更新关节角度控制 - 优先使用主臂数据"""
if self.use_master_arm and self.master_joint_actions:
# 主从控制模式:直接使用主臂的关节角度
try:
# 将主臂关节角度映射到从臂
for i in range(6): # 假设只有6个关节需要控制
joint_key = f"joint_{i+1}"
if joint_key in self.master_joint_actions:
# 直接使用主臂的关节角度(已经是度数)
self.joint[i] = self.master_joint_actions[joint_key]
# 应用关节限制
min_val, max_val = self.joint_limits[i]
self.joint[i] = max(min_val, min(max_val, self.joint[i]))
# print(self.joint)
logging.debug(f"主臂关节映射到从臂: {self.joint[:6]}")
except Exception as e:
logging.error(f"主臂数据映射错误: {e}")
# 如果有主臂夹爪数据,使用主臂夹爪状态
if self.use_master_arm and "grasp" in self.master_joint_actions:
self.gripper = self.master_joint_actions["grasp"] * 1000
self.joint[-1] = self.gripper
def update_end_pose(self):
"""更新末端位姿控制"""
# 根据控制模式调整步长
current_linear_step = self.linear_step * (0.1 if self.fine_control_mode else 1.0)
current_angular_step = self.angular_step * (0.1 if self.fine_control_mode else 1.0)
# 方向键控制XY
hat = self.joystick.get_hat(0)
hat_up = hat[1] == 1 # Y+
hat_down = hat[1] == -1 # Y-
hat_left = hat[0] == -1 # X-
hat_right = hat[0] == 1 # X+
# 右摇杆控制Z
right_y_raw = -self.joystick.get_axis(4)
# 左摇杆控制RZ
left_y_raw = -self.joystick.get_axis(1)
# 应用死区
right_y = 0.0 if abs(right_y_raw) < self.deadzone else right_y_raw
left_y = 0.0 if abs(left_y_raw) < self.deadzone else left_y_raw
# 计算各轴速度
self.pose_speeds[1] = current_linear_step if hat_up else (-current_linear_step if hat_down else 0.0) # Y
self.pose_speeds[0] = -current_linear_step if hat_left else (current_linear_step if hat_right else 0.0) # X
# 设置Z速度右摇杆Y轴控制
z_mapping = self._apply_nonlinear_mapping(right_y)
self.pose_speeds[2] = z_mapping * current_linear_step # Z
# L1/R1控制RX旋转
LB = self.joystick.get_button(4) # RX-
RB = self.joystick.get_button(5) # RX+
self.pose_speeds[3] = (-current_angular_step if LB else (current_angular_step if RB else 0.0))
# △/□控制RY旋转
triangle = self.joystick.get_button(2) # RY+
square = self.joystick.get_button(3) # RY-
self.pose_speeds[4] = (current_angular_step if triangle else (-current_angular_step if square else 0.0))
# 左摇杆Y轴控制RZ旋转
rz_mapping = self._apply_nonlinear_mapping(left_y)
self.pose_speeds[5] = rz_mapping * current_angular_step * 2 # RZ
# 夹爪控制(圈/叉)
circle = self.joystick.get_button(1) # 夹爪开
cross = self.joystick.get_button(0) # 夹爪关
if circle:
self.gripper = min(self.max_gripper, self.gripper + self.gripper_speed)
elif cross:
self.gripper = max(self.min_gripper, self.gripper - self.gripper_speed)
# 更新末端位姿
for i in range(6):
self.pose[i] += self.pose_speeds[i]
# 角度归一化处理
for i in range(3, 6):
self.pose[i] = self._normalize_angle(self.pose[i])
def update_joint_state(self, joint):
self.joint = joint
# self.tozero = False
def update_endpose_state(self, end_pose):
self.pose = end_pose
# self.tozero = False
def update_tozero_state(self, tozero):
self.tozero = tozero
def get_action(self) -> Dict:
"""获取当前控制命令"""
return {
'control_mode': 'joint' if self.joint_control_mode else 'end_pose',
'use_master_arm': self.use_master_arm,
'master_joint_actions': self.master_joint_actions,
'master_controller_status': self.master_controller_status,
'end_pose': self.pose,
'joint_angles': self.joint,
'gripper': int(self.gripper),
'tozero': self.tozero
}
def stop(self):
"""停止控制器"""
self.running = False
if self.thread.is_alive():
self.thread.join()
if self.servo_arm:
self.servo_arm.close()
pygame.quit()
print("混合控制器已退出")
def reset(self):
"""重置到初始状态"""
self.joint = self.init_joint.copy()
self.pose = self.init_pose.copy()
self.pose_speeds = [0.0] * 6
self.joint_speeds = [0.0] * 6
self.gripper_speed = 10
self.gripper = self.min_gripper
print("已重置到初始状态")
# 使用示例
if __name__ == "__main__":
# 初始化睿尔曼机械臂
arm = RoboticArm(rm_thread_mode_e.RM_TRIPLE_MODE_E)
# 创建机械臂连接
handle = arm.rm_create_robot_arm("192.168.3.18", 8080)
print(f"机械臂连接ID: {handle.id}")
init_pose = arm.rm_get_current_arm_state()[1]['pose']
with open('/home/maic/LYT/lerobot/lerobot/common/robot_devices/teleop/realman_mix.yaml', "r") as file:
config = yaml.safe_load(file)
config['init_pose'] = init_pose
arm_controller = HybridController(config)
try:
while True:
print(arm_controller.get_action())
time.sleep(0.1)
except KeyboardInterrupt:
arm_controller.stop()

View File

@@ -1,5 +1,6 @@
port: /dev/ttyUSB0
right_port: /dev/ttyUSB1
baudrate: 460800
hex_data: "55 AA 02 00 00 67"
joint_hex_data: "55 AA 02 00 00 67"
control_hex_data: "55 AA 08 00 00 B9"
arm_axis: 6

View File

@@ -0,0 +1,6 @@
left_port: /dev/ttyUSB0
right_port: /dev/ttyUSB1
baudrate: 460800
joint_hex_data: "55 AA 02 00 00 67"
control_hex_data: "55 AA 08 00 00 B9"
arm_axis: 6

View File

@@ -0,0 +1,321 @@
import threading
import time
import serial
import binascii
import logging
import yaml
from typing import Dict
# logging.basicConfig(
# level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
# )
class ServoArmServer:
def __init__(self, config_file="servo_dual.yaml"):
"""初始化服务器,创建左右机械臂实例"""
self.config_file = config_file
self.left_servo_arm = None
self.right_servo_arm = None
self.running = False
self.data_lock = threading.Lock()
self.latest_data = {
'left_joint_actions': {},
'right_joint_actions': {},
'left_controller_status': {},
'right_controller_status': {},
'timestamp': time.time()
}
# 初始化机械臂
self._initialize_arms()
# 启动数据采集线程
self._start_data_collection()
def _initialize_arms(self):
"""初始化左右机械臂"""
try:
self.left_servo_arm = ServoArm(self.config_file, "left_port")
logging.info("左master机械臂初始化成功")
except Exception as e:
logging.error(f"左master机械臂初始化失败: {e}")
try:
self.right_servo_arm = ServoArm(self.config_file, "right_port")
logging.info("右master机械臂初始化成功")
except Exception as e:
logging.error(f"右master机械臂初始化失败: {e}")
def _start_data_collection(self):
"""启动数据采集线程"""
self.running = True
# 创建左臂数据采集线程
self.left_data_thread = threading.Thread(target=self._left_arm_data_loop)
self.left_data_thread.daemon = True
self.left_data_thread.start()
# 创建右臂数据采集线程
self.right_data_thread = threading.Thread(target=self._right_arm_data_loop)
self.right_data_thread.daemon = True
self.right_data_thread.start()
logging.info("左右机械臂数据采集线程已启动")
def _left_arm_data_loop(self):
"""左机械臂数据采集循环"""
while self.running:
try:
left_actions = {}
left_controller_status = {}
# 获取左机械臂数据
if self.left_servo_arm and self.left_servo_arm.connected:
left_actions = self.left_servo_arm.get_joint_actions()
left_controller_status = self.left_servo_arm.get_controller_status()
if self._check_val_safety(left_actions) == False:
time.sleep(0.02)
continue
# 更新左机械臂数据
with self.data_lock:
self.latest_data['left_joint_actions'] = [left_actions[k] for k in left_actions]
self.latest_data['left_controller_status'] = left_controller_status
# 更新dual_joint_actions
if self.latest_data['right_joint_actions']:
self.latest_data['dual_joint_actions'] = self.latest_data['left_joint_actions'] + self.latest_data['right_joint_actions']
self.latest_data['timestamp'] = time.time()
time.sleep(0.02) # 50Hz采集频率
except Exception as e:
logging.error(f"左机械臂数据采集错误: {e}")
time.sleep(0.1)
def _right_arm_data_loop(self):
"""右机械臂数据采集循环"""
while self.running:
try:
right_actions = {}
right_controller_status = {}
# 获取右机械臂数据
if self.right_servo_arm and self.right_servo_arm.connected:
right_actions = self.right_servo_arm.get_joint_actions()
right_controller_status = self.right_servo_arm.get_controller_status()
if self._check_val_safety(right_actions) == False:
time.sleep(0.02)
continue
# 更新右机械臂数据
with self.data_lock:
self.latest_data['right_joint_actions'] = [right_actions[k] for k in right_actions]
self.latest_data['right_controller_status'] = right_controller_status
# 更新dual_joint_actions
if self.latest_data['left_joint_actions']:
self.latest_data['dual_joint_actions'] = self.latest_data['left_joint_actions'] + self.latest_data['right_joint_actions']
self.latest_data['timestamp'] = time.time()
time.sleep(0.02) # 50Hz采集频率
except Exception as e:
logging.error(f"右机械臂数据采集错误: {e}")
time.sleep(0.1)
def _check_val_safety(self, data: dict):
data = [data[k] for k in data]
ret = True
if len(data) != self.left_servo_arm.arm_axis + 1:
ret = False
for v in data:
if v > 180 or v < -180:
ret = False
return ret
# ZeroRPC 服务方法
def get_joint_data(self):
"""获取最新的关节数据"""
with self.data_lock:
return self.latest_data.copy()
def get_left_joint_actions(self):
"""获取左机械臂关节数据和控制器状态"""
with self.data_lock:
return {
'data': self.latest_data['left_joint_actions'],
'controller_status': self.latest_data['left_controller_status'],
'timestamp': self.latest_data['timestamp']
}
def get_right_joint_actions(self):
"""获取右机械臂关节数据和控制器状态"""
with self.data_lock:
return {
'data': self.latest_data['right_joint_actions'],
'controller_status': self.latest_data['right_controller_status'],
'timestamp': self.latest_data['timestamp']
}
def get_connection_status(self):
"""获取连接状态"""
return {
'left_connected': self.left_servo_arm.connected if self.left_servo_arm else False,
'right_connected': self.right_servo_arm.connected if self.right_servo_arm else False,
'server_running': self.running
}
def ping(self):
"""测试连接"""
return "pong"
def shutdown(self):
"""关闭服务器"""
logging.info("正在关闭服务器...")
self.running = False
if self.left_servo_arm:
self.left_servo_arm.close()
if self.right_servo_arm:
self.right_servo_arm.close()
return "Server shutdown"
class ServoArm:
def __init__(self, config_file="config.yaml", port_name="left_port"):
"""初始化机械臂的串口连接并发送初始数据。
Args:
config_file (str): 配置文件的路径。
"""
self.config = self._load_config(config_file)
self.port = self.config[port_name]
self.baudrate = self.config["baudrate"]
self.joint_hex_data = self.config["joint_hex_data"]
self.control_hex_data = self.config["control_hex_data"]
self.arm_axis = self.config.get("arm_axis", 7)
try:
self.serial_conn = serial.Serial(self.port, self.baudrate, timeout=0)
self.bytes_to_send = binascii.unhexlify(self.joint_hex_data.replace(" ", ""))
self.serial_conn.write(self.bytes_to_send)
time.sleep(1)
self.connected = True
logging.info(f"串口连接成功: {self.port}")
except Exception as e:
logging.error(f"串口连接失败: {e}")
self.connected = False
def _load_config(self, config_file):
"""加载配置文件。
Args:
config_file (str): 配置文件的路径。
Returns:
dict: 配置文件内容。
"""
try:
with open(config_file, "r") as file:
config = yaml.safe_load(file)
return config
except Exception as e:
logging.error(f"配置文件加载失败: {e}")
# 返回默认配置
return {
"port": "/dev/ttyUSB0",
"baudrate": 460800,
"joint_hex_data": "55 AA 02 00 00 67",
"control_hex_data": "55 AA 08 00 00 B9",
"arm_axis": 6
}
def _bytes_to_signed_int(self, byte_data):
"""将字节数据转换为有符号整数。
Args:
byte_data (bytes): 字节数据。
Returns:
int: 有符号整数。
"""
return int.from_bytes(byte_data, byteorder="little", signed=True)
def _parse_joint_data(self, hex_received):
"""解析接收到的十六进制数据并提取关节数据。
Args:
hex_received (str): 接收到的十六进制字符串数据。
Returns:
dict: 解析后的关节数据。
"""
logging.debug(f"hex_received: {hex_received}")
joints = {}
for i in range(self.arm_axis):
start = 14 + i * 10
end = start + 8
joint_hex = hex_received[start:end]
joint_byte_data = bytearray.fromhex(joint_hex)
joint_value = self._bytes_to_signed_int(joint_byte_data) / 10000.0
joints[f"joint_{i+1}"] = joint_value #/ 180
grasp_start = 14 + self.arm_axis*10
grasp_hex = hex_received[grasp_start:grasp_start+8]
grasp_byte_data = bytearray.fromhex(grasp_hex)
# 夹爪进行归一化处理
grasp_value = self._bytes_to_signed_int(grasp_byte_data)/1000
joints["grasp"] = grasp_value
return joints
def _parse_controller_data(self, hex_received):
status = {
'infrared': 0,
'button': 0
}
if len(hex_received) == 18:
status['infrared'] = self._bytes_to_signed_int(bytearray.fromhex(hex_received[12:14]))
status['button'] = self._bytes_to_signed_int(bytearray.fromhex(hex_received[14:16]))
# print(infrared)
return status
def get_joint_actions(self):
"""从串口读取数据并解析关节动作。
Returns:
dict: 包含关节数据的字典。
"""
if not self.connected:
return {}
try:
self.serial_conn.write(self.bytes_to_send)
time.sleep(0.025)
bytes_received = self.serial_conn.read(self.serial_conn.inWaiting())
if len(bytes_received) == 0:
return {}
hex_received = binascii.hexlify(bytes_received).decode("utf-8").upper()
actions = self._parse_joint_data(hex_received)
return actions
except Exception as e:
logging.error(f"读取串口数据错误: {e}")
return {}
def get_controller_status(self):
bytes_to_send = binascii.unhexlify(self.control_hex_data.replace(" ", ""))
self.serial_conn.write(bytes_to_send)
time.sleep(0.025)
bytes_received = self.serial_conn.read(self.serial_conn.inWaiting())
hex_received = binascii.hexlify(bytes_received).decode("utf-8").upper()
# print("control status:", hex_received)
status = self._parse_controller_data(hex_received)
return status
def close(self):
"""关闭串口连接"""
if self.connected and hasattr(self, 'serial_conn'):
self.serial_conn.close()
self.connected = False
logging.info("串口连接已关闭")

View File

@@ -175,9 +175,13 @@ def say(text, blocking=False):
cmd = ["say", text]
elif system == "Linux":
cmd = ["spd-say", text]
if blocking:
cmd.append("--wait")
# cmd = ["spd-say", text]
cmd = ["edge-playback", "-t", text]
######################################## remove #####################################
# if blocking:
# cmd.append("--wait")
######################################## remove #####################################
elif system == "Windows":
cmd = [

View File

@@ -8,6 +8,8 @@ conda activate lerobot
Install 🤗 LeRobot:
```bash
pip install -e . -i https://pypi.tuna.tsinghua.edu.cn/simple
pip install edge-tts
sudo apt install mpv -y
# pip uninstall numpy
# pip install numpy==1.26.0
@@ -31,16 +33,14 @@ pip install pygame
见lerobot_piper_tutorial/1. 🤗 LeRobot新增机械臂的一般流程.pdf
# Teleoperate
```bash
cd piper_scripts/
bash can_activate.sh can0 1000000
cd ..
```python
python lerobot/scripts/control_robot.py \
--robot.type=piper \
--robot.inference_time=false \
--control.type=teleoperate
--robot.type=realman_dual \
--robot.inference_time=false \
--control.type=teleoperate \
--control.display_data=true
```
display_data=true turn on run.io else turn off.
# Record
Set dataset root path
@@ -51,18 +51,19 @@ echo $HF_USER
```bash
python lerobot/scripts/control_robot.py \
--robot.type=piper \
--robot.type=realman \
--robot.inference_time=false \
--control.type=record \
--control.fps=30 \
--control.fps=15 \
--control.single_task="move" \
--control.repo_id=${HF_USER}/test \
--control.repo_id=maic/test \
--control.num_episodes=2 \
--control.warmup_time_s=2 \
--control.episode_time_s=10 \
--control.reset_time_s=10 \
--control.play_sounds=true \
--control.push_to_hub=false
--control.push_to_hub=false \
--control.display_data=true
```
Press right arrow -> at any time during episode recording to early stop and go to resetting. Same during resetting, to early stop and to go to the next episode recording.
@@ -104,22 +105,50 @@ python lerobot/scripts/train.py \
--wandb.enable=true
```
# FT smolvla
python lerobot/scripts/train.py \
--dataset.repo_id=maic/move_the_bottle_into_ultrasonic_device_with_realman_single \
--policy.path=lerobot/smolvla_base \
--output_dir=outputs/train/smolvla_move_the_bottle_into_ultrasonic_device_with_realman_single \
--job_name=smolvla_move_the_bottle_into_ultrasonic_device_with_realman_single \
--policy.device=cuda \
--wandb.enable=false \
--steps=200000 \
--batch_size=16
# Inference
还是使用control_robot.py中的record loop配置 **--robot.inference_time=true** 可以将手柄移出。
```bash
python lerobot/scripts/control_robot.py \
--robot.type=piper \
--robot.type=realman \
--robot.inference_time=true \
--control.type=record \
--control.fps=30 \
--control.single_task="move" \
--control.repo_id=$USER/eval_act_jack \
--control.single_task="move the bottle into ultrasonic device with realman single" \
--control.repo_id=maic/move_the_bottle_into_ultrasonic_device_with_realman_single \
--control.num_episodes=1 \
--control.warmup_time_s=2 \
--control.episode_time_s=30 \
--control.reset_time_s=10 \
--control.push_to_hub=false \
--control.policy.path=outputs/train/act_koch_pick_place_lego/checkpoints/latest/pretrained_model
--control.policy.path=outputs/train/act_move_the_bottle_into_ultrasonic_device_with_realman_single/checkpoints/100000/pretrained_model
```
```bash
python lerobot/scripts/control_robot.py \
--robot.type=realman \
--robot.inference_time=true \
--control.type=record \
--control.fps=30 \
--control.single_task="move the bottle into ultrasonic device with realman single" \
--control.repo_id=maic/eval_smolvla_move_the_bottle_into_ultrasonic_device_with_realman_single \
--control.num_episodes=1 \
--control.warmup_time_s=2 \
--control.episode_time_s=60 \
--control.reset_time_s=10 \
--control.push_to_hub=false \
--control.policy.path=outputs/train/smolvla_move_the_bottle_into_ultrasonic_device_with_realman_single/checkpoints/160000/pretrained_model \
--control.display_data=true
```

View File

@@ -0,0 +1,140 @@
import pygame
import time
import sys
class FlightStickDetector:
def __init__(self):
# 初始化pygame
pygame.init()
pygame.joystick.init()
self.joysticks = []
self.running = True
def detect_joysticks(self):
"""检测连接的手柄"""
joystick_count = pygame.joystick.get_count()
print(f"检测到 {joystick_count} 个手柄设备")
if joystick_count == 0:
print("未检测到任何手柄设备!")
return False
# 初始化所有检测到的手柄
self.joysticks = []
for i in range(joystick_count):
joystick = pygame.joystick.Joystick(i)
joystick.init()
self.joysticks.append(joystick)
print(f"\n手柄 {i}:")
print(f" 名称: {joystick.get_name()}")
print(f" 轴数量: {joystick.get_numaxes()}")
print(f" 按钮数量: {joystick.get_numbuttons()}")
print(f" 帽子开关数量: {joystick.get_numhats()}")
return True
def get_joystick_data(self, joystick_id=0):
"""获取指定手柄的数据"""
if joystick_id >= len(self.joysticks):
return None
joystick = self.joysticks[joystick_id]
# 获取轴数据
axes = []
for i in range(joystick.get_numaxes()):
axis_value = joystick.get_axis(i)
axes.append(round(axis_value, 3))
# 获取按钮数据
buttons = []
for i in range(joystick.get_numbuttons()):
button_pressed = joystick.get_button(i)
buttons.append(button_pressed)
# 获取帽子开关数据
hats = []
for i in range(joystick.get_numhats()):
hat_value = joystick.get_hat(i)
hats.append(hat_value)
return {
'name': joystick.get_name(),
'axes': axes,
'buttons': buttons,
'hats': hats
}
def monitor_joystick(self, joystick_id=0, update_interval=0.1):
"""实时监控手柄数据"""
print(f"\n开始监控手柄 {joystick_id} (按Ctrl+C停止)")
print("=" * 50)
try:
while self.running:
pygame.event.pump() # 更新事件队列
data = self.get_joystick_data(joystick_id)
if data is None:
print("无法获取手柄数据")
break
# 清屏并显示数据
print("\033[H\033[J", end="") # ANSI清屏
print(f"手柄: {data['name']}")
print("-" * 40)
# 显示轴数据(通常用于飞行控制)
axis_names = ['X轴(副翼)', 'Y轴(升降舵)', 'Z轴(方向舵)',
'油门', '轴4', '轴5', '轴6', '轴7']
print("轴数据:")
for i, value in enumerate(data['axes']):
name = axis_names[i] if i < len(axis_names) else f'{i}'
# 创建进度条显示
bar_length = 20
bar_pos = int((value + 1) * bar_length / 2)
bar = [''] * bar_length
if 0 <= bar_pos < bar_length:
bar[bar_pos] = ''
bar_str = ''.join(bar)
print(f" {name:12}: {value:6.3f} [{bar_str}]")
# 显示按钮数据
pressed_buttons = [i for i, pressed in enumerate(data['buttons']) if pressed]
if pressed_buttons:
print(f"\n按下的按钮: {pressed_buttons}")
# 显示帽子开关数据
if data['hats']:
print(f"帽子开关: {data['hats']}")
time.sleep(update_interval)
except KeyboardInterrupt:
print("\n\n监控已停止")
def cleanup(self):
"""清理资源"""
pygame.joystick.quit()
pygame.quit()
# 使用示例
def main():
detector = FlightStickDetector()
try:
# 检测手柄
if detector.detect_joysticks():
# 如果检测到手柄,开始监控第一个手柄
detector.monitor_joystick(0)
else:
print("请连接飞行手柄后重试")
finally:
detector.cleanup()
if __name__ == "__main__":
main()