Compare commits

...

2 Commits

Author SHA1 Message Date
7c1699898b realman xbox+flight controller
Some checks failed
Secret Leaks / trufflehog (push) Failing after 1m30s
2025-07-13 15:42:27 +08:00
b3e9e11e11 refactory gamepad 2025-07-06 14:28:38 +08:00
10 changed files with 1161 additions and 633 deletions

View File

@@ -88,7 +88,7 @@ class RealmanDualMotorsBus:
self._thread_lock = threading.Lock()
# 添加读取相关的线程控制
self._state_cache = {}
self._state_cache = {"joint": {}, "pose": {}}
self._cache_lock = threading.Lock()
self._keep_reading = True
@@ -97,21 +97,12 @@ class RealmanDualMotorsBus:
def _start_background_readers(self):
"""启动后台读取线程"""
# 左臂读取线程
# 读取线程
threading.Thread(
target=self._left_read_task,
target=self._read_task,
daemon=True,
name="left_arm_reader"
name="arm_reader"
).start()
# 右臂读取线程
threading.Thread(
target=self._right_read_task,
daemon=True,
name="right_arm_reader"
).start()
@property
def motor_names(self) -> list[str]:
@@ -134,27 +125,24 @@ class RealmanDualMotorsBus:
except Exception as e:
raise TimeoutError(f"操作超时: {e}")
def _left_read_task(self):
def _read_task(self):
"""左臂后台读取任务 - 模仿_left_slow_task的风格"""
while self._keep_reading:
try:
left_state = self._read_arm_state(self.left_rmarm, "left")
with self._cache_lock:
self._state_cache.update(left_state)
self._state_cache["joint"].update(left_state["joint"])
self._state_cache["pose"].update(left_state["pose"])
except Exception as e:
print(f"左臂读取失败: {e}")
time.sleep(0.005)
def _right_read_task(self):
"""右臂后台读取任务 - 模仿_right_slow_task的风格"""
while self._keep_reading:
try:
right_state = self._read_arm_state(self.right_rmarm, "right")
with self._cache_lock:
self._state_cache.update(right_state)
self._state_cache["joint"].update(right_state["joint"])
self._state_cache["pose"].update(right_state["pose"])
except Exception as e:
print(f"右臂读取失败: {e}")
time.sleep(0.005)
def _read_arm_state(self, arm: RoboticArm, prefix: str) -> dict:
"""读取单臂状态 - 保持原有逻辑"""
@@ -163,12 +151,23 @@ class RealmanDualMotorsBus:
joint_state = joint_msg['joint']
gripper_state = gripper_msg['actpos']
pose_state = joint_msg['pose']
state_dict = {}
for i in range(6):
state_dict[f"{prefix}_joint_{i+1}"] = joint_state[i] / 180
state_dict[f"{prefix}_gripper"] = (gripper_state - 500) / 500
return state_dict
joint_state_dict = {}
for i in range(len(joint_state)):
joint_state_dict[f"{prefix}_joint_{i+1}"] = joint_state[i]
joint_state_dict[f"{prefix}_gripper"] = gripper_state
pose_state_dict = {
f"{prefix}_x": pose_state[0],
f"{prefix}_y": pose_state[1],
f"{prefix}_z": pose_state[2],
f"{prefix}_rx": pose_state[3],
f"{prefix}_ry": pose_state[4],
f"{prefix}_rz": pose_state[5],
}
return {"joint": joint_state_dict, 'pose': pose_state_dict}
def _move_to_initial_position(self):
"""移动到初始位置"""
@@ -189,10 +188,6 @@ class RealmanDualMotorsBus:
if len(joints) != expected_count:
raise ValueError(f"关节数量不匹配: 期望 {expected_count}, 实际 {len(joints)}")
def _extract_joint_state(self, state: dict, arm_prefix: str) -> list:
"""提取关节状态"""
return [state[v] * 180 for v in state if f"{arm_prefix}" in v]
def _execute_slow_movement(self, arm: str, joint_data: list):
"""执行慢速运动"""
busy_flag = f"{arm}_slow_busy"
@@ -256,7 +251,7 @@ class RealmanDualMotorsBus:
self.right_rmarm.rm_movej_canfd(right_joint, False)
def write_endpose_canfd(self, target_endpose: list):
assert target_endpose == 12, "the length of target pose is not equal 12"
assert len(target_endpose) == 12, "the length of target pose is not equal 12"
self.left_rmarm.rm_movep_canfd(target_endpose[:6], False)
self.right_rmarm.rm_movep_canfd(target_endpose[6:], False)
@@ -314,28 +309,13 @@ class RealmanDualMotorsBus:
right_joints = target_joint[self.left_offset+1:-1]
right_gripper = target_joint[-1]
self.left_rmarm.rm_movej_follow(left_joints)
self.left_rmarm.rm_set_gripper_position(left_gripper, block=False, timeout=2)
self.right_rmarm.rm_movej_follow(right_joints)
self.right_rmarm.rm_set_gripper_position(right_gripper, block=False, timeout=2)
self.left_rmarm.rm_movej_canfd(left_joints, follow=False)
# self.left_rmarm.rm_movej_follow(left_joints)
# self.left_rmarm.rm_set_gripper_position(left_gripper, block=False, timeout=2)
self.right_rmarm.rm_movej_canfd(right_joints, follow=False)
# self.right_rmarm.rm_movej_follow(right_joints)
# self.right_rmarm.rm_set_gripper_position(right_gripper, block=False, timeout=2)
def write_action(self, action: dict, state: dict):
# 提取状态数据
follow_left_joint = self._extract_joint_state(state, "left_joint")
follow_right_joint = self._extract_joint_state(state, "right_joint")
# 提取动作数据
master_left_joint = action['left_joint_actions']
master_right_joint = action['right_joint_actions']
left_gripper = int(action['left_gripper_actions'] * 1000)
right_gripper = int(action['right_gripper_actions'] * 1000)
# 执行夹爪动作
self.write_dual_gripper(left_gripper, right_gripper)
# 执行关节动作
self._execute_arm_action('left', action, master_left_joint, follow_left_joint)
self._execute_arm_action('right', action, master_right_joint, follow_right_joint)
def read(self) -> Dict:
"""读取机械臂状态 - 直接从缓存获取"""
with self._cache_lock:

View File

@@ -755,7 +755,8 @@ class RealmanDualRobotConfig(RobotConfig):
max_gripper: int = 990
min_gripper: int = 10
servo_config_file: str = "/home/maic/LYT/lerobot/lerobot/common/robot_devices/teleop/servo_dual.yaml"
left_end_control_guid: str = '0300b14bff1100003708000010010000'
right_end_control_guid: str = '0300509d5e040000120b000009050000'
follower_arm: dict[str, MotorsBusConfig] = field(
default_factory=lambda: {

View File

@@ -7,7 +7,7 @@ import torch
import numpy as np
from dataclasses import dataclass, field, replace
from collections import deque
from lerobot.common.robot_devices.teleop.gamepad import HybridController
from lerobot.common.robot_devices.teleop.realman_single import HybridController
from lerobot.common.robot_devices.motors.utils import get_motor_names, make_motors_buses_from_configs
from lerobot.common.robot_devices.cameras.utils import make_cameras_from_configs
from lerobot.common.robot_devices.utils import RobotDeviceAlreadyConnectedError, RobotDeviceNotConnectedError

View File

@@ -9,7 +9,7 @@ import logging
from typing import Optional, Tuple, Dict
from dataclasses import dataclass, field, replace
from collections import deque
from lerobot.common.robot_devices.teleop.gamepad_dual import HybridController
from lerobot.common.robot_devices.teleop.realman_aloha_dual import HybridController
from lerobot.common.robot_devices.motors.utils import get_motor_names, make_motors_buses_from_configs
from lerobot.common.robot_devices.cameras.utils import make_cameras_from_configs
from lerobot.common.robot_devices.utils import RobotDeviceAlreadyConnectedError, RobotDeviceNotConnectedError
@@ -33,15 +33,6 @@ class RealmanDualRobot:
self.piper_motors = make_motors_buses_from_configs(self.config.follower_arm)
self.arm = self.piper_motors['main']
# build init teleop info
self.init_info = {
'init_joint': self.arm.init_joint_position,
'init_pose': self.arm.init_pose,
'max_gripper': config.max_gripper,
'min_gripper': config.min_gripper,
'servo_config_file': config.servo_config_file
}
# 初始化遥操作
self._initialize_teleop()
# init state
@@ -54,7 +45,8 @@ class RealmanDualRobot:
'init_pose': self.arm.init_pose,
'max_gripper': self.config.max_gripper,
'min_gripper': self.config.min_gripper,
'servo_config_file': self.config.servo_config_file
'servo_config_file': self.config.servo_config_file,
'end_control_info': {'left': self.config.left_end_control_guid , 'right': self.config.right_end_control_guid}
}
if not self.inference_time:
@@ -72,17 +64,26 @@ class RealmanDualRobot:
def _read_robot_state(self) -> dict:
"""读取机器人状态"""
before_read_t = time.perf_counter()
state = self.arm.read()
from copy import deepcopy
state = deepcopy(self.arm.read())
self.logs["read_pos_dt_s"] = time.perf_counter() - before_read_t
return state
def _execute_action(self, action: dict, state: dict):
"""执行动作"""
before_write_t = time.perf_counter()
if action['control_mode'] == 'joint':
self.arm.write_action(action, state)
# 可以添加其他控制模式的处理
# self.arm.write_action(action, state)
pass
else:
if list(action['pose'].values()) != list(state['pose'].values()):
pose = list(action['pose'].values())
self.arm.write_endpose_canfd(pose)
elif list(action['joint'].values()) != list(state['joint'].values()):
target_joint = list(action['joint'].values())
self.arm.write(target_joint)
self.logs["write_pos_dt_s"] = time.perf_counter() - before_write_t
@@ -104,8 +105,10 @@ class RealmanDualRobot:
return obs_dict, action_dict
def _update_state_queue(self):
# import pdb
# pdb.set_trace()
"""更新状态队列"""
current_state = list(self.arm.read().values())
current_state = list(self.arm.read()['joint'].values())
self.joint_queue.append(current_state)
def _capture_images(self) -> Dict[str, torch.Tensor]:
@@ -224,12 +227,17 @@ class RealmanDualRobot:
# 读取当前状态
state = self._read_robot_state()
# 获取动作
action = self.teleop.get_action()
action = self.teleop.get_action(state)
# print(state['pose'])
# print(action['pose'])
# 执行动作
# import pdb
# pdb.set_trace()
self._execute_action(action, state)
# 更新状态队列
self._update_state_queue()
time.sleep(0.02)
if record_data:
data = self._prepare_record_data()
return data

View File

@@ -0,0 +1,18 @@
import pygame
def find_controller_index():
# 获取所有 pygame 控制器的设备路径
pygame_joysticks = {}
for i in range(pygame.joystick.get_count()):
joystick = pygame.joystick.Joystick(i)
joystick.init()
pygame_joysticks[joystick.get_guid()] = {
'index': i,
'device_name': joystick.get_name()
}
return pygame_joysticks
if __name__ == '__main__':
print(find_controller_index())

View File

@@ -1,466 +1,419 @@
import pygame
import threading
import time
import serial
import binascii
import logging
import yaml
from typing import Dict
from Robotic_Arm.rm_robot_interface import *
from dataclasses import dataclass
from .find_gamepad import find_controller_index
from .servo_server import ServoArmServer
class ServoArm:
def __init__(self, config_file="config.yaml"):
"""初始化机械臂的串口连接并发送初始数据。
Args:
config_file (str): 配置文件的路径。
"""
self.config = self._load_config(config_file)
self.port = self.config["port"]
self.baudrate = self.config["baudrate"]
self.joint_hex_data = self.config["joint_hex_data"]
self.control_hex_data = self.config["control_hex_data"]
self.arm_axis = self.config.get("arm_axis", 7)
class RealmanAlohaMaster:
def __init__(self, config):
self.config = config
self._initialize_master_arm()
def _initialize_master_arm(self):
"""初始化主控臂"""
try:
self.serial_conn = serial.Serial(self.port, self.baudrate, timeout=0)
self.bytes_to_send = binascii.unhexlify(self.joint_hex_data.replace(" ", ""))
self.serial_conn.write(self.bytes_to_send)
time.sleep(1)
self.connected = True
logging.info(f"串口连接成功: {self.port}")
self.master_dual_arm = ServoArmServer(self.config.config_file)
except Exception as e:
logging.error(f"串口连接失败: {e}")
self.connected = False
logging.error(f"初始化主控臂失败: {e}")
raise
def _load_config(self, config_file):
"""加载配置文件。
Args:
config_file (str): 配置文件的路径。
Returns:
dict: 配置文件内容。
"""
try:
with open(config_file, "r") as file:
config = yaml.safe_load(file)
return config
except Exception as e:
logging.error(f"配置文件加载失败: {e}")
# 返回默认配置
return {
"port": "/dev/ttyUSB0",
"baudrate": 460800,
"joint_hex_data": "55 AA 02 00 00 67",
"control_hex_data": "55 AA 08 00 00 B9",
"arm_axis": 6
}
def _bytes_to_signed_int(self, byte_data):
"""将字节数据转换为有符号整数。
Args:
byte_data (bytes): 字节数据。
Returns:
int: 有符号整数。
"""
return int.from_bytes(byte_data, byteorder="little", signed=True)
def _parse_joint_data(self, hex_received):
"""解析接收到的十六进制数据并提取关节数据。
Args:
hex_received (str): 接收到的十六进制字符串数据。
Returns:
dict: 解析后的关节数据。
"""
logging.debug(f"hex_received: {hex_received}")
joints = {}
for i in range(self.arm_axis):
start = 14 + i * 10
end = start + 8
joint_hex = hex_received[start:end]
joint_byte_data = bytearray.fromhex(joint_hex)
joint_value = self._bytes_to_signed_int(joint_byte_data) / 10000.0
joints[f"joint_{i+1}"] = joint_value
grasp_start = 14 + self.arm_axis*10
grasp_hex = hex_received[grasp_start:grasp_start+8]
grasp_byte_data = bytearray.fromhex(grasp_hex)
# 夹爪进行归一化处理
grasp_value = self._bytes_to_signed_int(grasp_byte_data)/1000
joints["grasp"] = grasp_value
return joints
def _parse_controller_data(self, hex_received):
status = {
'infrared': 0,
'button': 0
}
if len(hex_received) == 18:
status['infrared'] = self._bytes_to_signed_int(bytearray.fromhex(hex_received[12:14]))
status['button'] = self._bytes_to_signed_int(bytearray.fromhex(hex_received[14:16]))
# print(infrared)
return status
def get_joint_actions(self):
"""从串口读取数据并解析关节动作。
Returns:
dict: 包含关节数据的字典。
"""
if not self.connected:
return {}
try:
self.serial_conn.write(self.bytes_to_send)
time.sleep(0.02)
bytes_received = self.serial_conn.read(self.serial_conn.inWaiting())
if len(bytes_received) == 0:
return {}
hex_received = binascii.hexlify(bytes_received).decode("utf-8").upper()
actions = self._parse_joint_data(hex_received)
return actions
except Exception as e:
logging.error(f"读取串口数据错误: {e}")
return {}
def get_controller_status(self):
bytes_to_send = binascii.unhexlify(self.control_hex_data.replace(" ", ""))
self.serial_conn.write(bytes_to_send)
time.sleep(0.02)
bytes_received = self.serial_conn.read(self.serial_conn.inWaiting())
hex_received = binascii.hexlify(bytes_received).decode("utf-8").upper()
# print("control status:", hex_received)
status = self._parse_controller_data(hex_received)
return status
def close(self):
"""关闭串口连接"""
if self.connected and hasattr(self, 'serial_conn'):
self.serial_conn.close()
self.connected = False
logging.info("串口连接已关闭")
class HybridController:
def __init__(self, init_info):
# 初始化pygame和手柄
pygame.init()
pygame.joystick.init()
# 检查是否有连接的手柄
if pygame.joystick.get_count() == 0:
raise Exception("未检测到手柄")
# 初始化手柄
self.joystick = pygame.joystick.Joystick(0)
self.joystick.init()
# 摇杆死区
self.deadzone = 0.15
# 控制模式: True为关节控制主模式False为末端控制
self.joint_control_mode = True
# 精细控制模式
self.fine_control_mode = False
# 初始化末端姿态和关节角度
self.init_joint = init_info['init_joint']
self.init_pose = init_info.get('init_pose', [0]*6)
self.max_gripper = init_info['max_gripper']
self.min_gripper = init_info['min_gripper']
servo_config_file = init_info['servo_config_file']
self.joint = self.init_joint.copy()
self.pose = self.init_pose.copy()
self.pose_speeds = [0.0] * 6
self.joint_speeds = [0.0] * 6
self.tozero = False
# 主臂关节状态
self.master_joint_actions = {}
self.master_controller_status = {}
self.use_master_arm = False
# 末端位姿限制
self.pose_limits = [
(-0.800, 0.800), # X (m)
(-0.800, 0.800), # Y (m)
(-0.800, 0.800), # Z (m)
(-3.14, 3.14), # RX (rad)
(-3.14, 3.14), # RY (rad)
(-3.14, 3.14) # RZ (rad)
]
# 关节角度限制 (度)
self.joint_limits = [
(-180, 180), # joint 1
(-180, 180), # joint 2
(-180, 180), # joint 3
(-180, 180), # joint 4
(-180, 180), # joint 5
(-180, 180) # joint 6
]
# 控制参数
self.linear_step = 0.002 # 线性移动步长(m)
self.angular_step = 0.01 # 角度步长(rad)
# 夹爪状态和速度
self.gripper_speed = 10
self.gripper = self.min_gripper
# 初始化串口通信(主臂关节状态获取)
self.servo_arm = None
if servo_config_file:
try:
self.servo_arm = ServoArm(servo_config_file)
self.use_master_arm = True
logging.info("串口主臂连接成功,启用主从控制模式")
except Exception as e:
logging.error(f"串口主臂连接失败: {e}")
self.use_master_arm = False
# 启动更新线程
self.running = True
self.thread = threading.Thread(target=self.update_controller)
self.thread.start()
print("混合控制器已启动")
print("主控制模式: 关节控制")
if self.use_master_arm:
print("主从控制: 启用")
print("Back按钮: 切换控制模式(关节/末端)")
print("L3按钮: 切换精细控制模式")
print("Start按钮: 重置到初始位置")
def _apply_nonlinear_mapping(self, value):
"""应用非线性映射以提高控制精度"""
sign = 1 if value >= 0 else -1
return sign * (abs(value) ** 2)
def _normalize_angle(self, angle):
"""将角度归一化到[-π, π]范围内"""
import math
while angle > math.pi:
angle -= 2 * math.pi
while angle < -math.pi:
angle += 2 * math.pi
return angle
def update_controller(self):
while self.running:
try:
pygame.event.pump()
except Exception as e:
print(f"控制器错误: {e}")
self.stop()
continue
# 检查控制模式切换 (Back按钮)
if self.joystick.get_button(6): # Back按钮
self.joint_control_mode = not self.joint_control_mode
mode_str = "关节控制" if self.joint_control_mode else "末端位姿控制"
print(f"切换到{mode_str}模式")
time.sleep(0.3) # 防止多次触发
# 检查精细控制模式切换 (L3按钮)
if self.joystick.get_button(10): # L3按钮
self.fine_control_mode = not self.fine_control_mode
print(f"切换到{'精细' if self.fine_control_mode else '普通'}控制模式")
time.sleep(0.3) # 防止多次触发
# 检查重置按钮 (Start按钮)
if self.joystick.get_button(7): # Start按钮
print("重置机械臂到初始位置...")
# print("init_joint", self.init_joint.copy())
self.tozero = True
self.joint = self.init_joint.copy()
self.pose = self.init_pose.copy()
self.pose_speeds = [0.0] * 6
self.joint_speeds = [0.0] * 6
self.gripper_speed = 10
self.gripper = self.min_gripper
print("机械臂已重置到初始位置")
time.sleep(0.3) # 防止多次触发
# 从串口获取主臂关节状态
if self.servo_arm and self.servo_arm.connected:
try:
self.master_joint_actions = self.servo_arm.get_joint_actions()
self.master_controller_status = self.servo_arm.get_controller_status()
if self.master_joint_actions:
logging.debug(f"主臂关节状态: {self.master_joint_actions}")
except Exception as e:
logging.error(f"获取主臂状态错误: {e}")
self.master_joint_actions = {}
# print(self.master_joint_actions)
# 根据控制模式更新相应的控制逻辑
if self.joint_control_mode:
# 关节控制模式下优先使用主臂数据Xbox作为辅助
self.update_joint_control()
else:
# 末端控制模式使用Xbox控制
self.update_end_pose()
time.sleep(0.02)
# print('gripper:', self.gripper)
def update_joint_control(self):
"""更新关节角度控制 - 优先使用主臂数据"""
if self.use_master_arm and self.master_joint_actions:
# 主从控制模式:直接使用主臂的关节角度
try:
# 将主臂关节角度映射到从臂
for i in range(6): # 假设只有6个关节需要控制
joint_key = f"joint_{i+1}"
if joint_key in self.master_joint_actions:
# 直接使用主臂的关节角度(已经是度数)
self.joint[i] = self.master_joint_actions[joint_key]
# 应用关节限制
min_val, max_val = self.joint_limits[i]
self.joint[i] = max(min_val, min(max_val, self.joint[i]))
# print(self.joint)
logging.debug(f"主臂关节映射到从臂: {self.joint[:6]}")
except Exception as e:
logging.error(f"主臂数据映射错误: {e}")
# 如果有主臂夹爪数据,使用主臂夹爪状态
if self.use_master_arm and "grasp" in self.master_joint_actions:
self.gripper = self.master_joint_actions["grasp"] * 1000
self.joint[-1] = self.gripper
def update_end_pose(self):
"""更新末端位姿控制"""
# 根据控制模式调整步长
current_linear_step = self.linear_step * (0.1 if self.fine_control_mode else 1.0)
current_angular_step = self.angular_step * (0.1 if self.fine_control_mode else 1.0)
# 方向键控制XY
hat = self.joystick.get_hat(0)
hat_up = hat[1] == 1 # Y+
hat_down = hat[1] == -1 # Y-
hat_left = hat[0] == -1 # X-
hat_right = hat[0] == 1 # X+
# 右摇杆控制Z
right_y_raw = -self.joystick.get_axis(4)
# 左摇杆控制RZ
left_y_raw = -self.joystick.get_axis(1)
# 应用死区
right_y = 0.0 if abs(right_y_raw) < self.deadzone else right_y_raw
left_y = 0.0 if abs(left_y_raw) < self.deadzone else left_y_raw
# 计算各轴速度
self.pose_speeds[1] = current_linear_step if hat_up else (-current_linear_step if hat_down else 0.0) # Y
self.pose_speeds[0] = -current_linear_step if hat_left else (current_linear_step if hat_right else 0.0) # X
# 设置Z速度右摇杆Y轴控制
z_mapping = self._apply_nonlinear_mapping(right_y)
self.pose_speeds[2] = z_mapping * current_linear_step # Z
# L1/R1控制RX旋转
LB = self.joystick.get_button(4) # RX-
RB = self.joystick.get_button(5) # RX+
self.pose_speeds[3] = (-current_angular_step if LB else (current_angular_step if RB else 0.0))
# △/□控制RY旋转
triangle = self.joystick.get_button(2) # RY+
square = self.joystick.get_button(3) # RY-
self.pose_speeds[4] = (current_angular_step if triangle else (-current_angular_step if square else 0.0))
# 左摇杆Y轴控制RZ旋转
rz_mapping = self._apply_nonlinear_mapping(left_y)
self.pose_speeds[5] = rz_mapping * current_angular_step * 2 # RZ
# 夹爪控制(圈/叉)
circle = self.joystick.get_button(1) # 夹爪开
cross = self.joystick.get_button(0) # 夹爪关
if circle:
self.gripper = min(self.max_gripper, self.gripper + self.gripper_speed)
elif cross:
self.gripper = max(self.min_gripper, self.gripper - self.gripper_speed)
# 更新末端位姿
for i in range(6):
self.pose[i] += self.pose_speeds[i]
# 角度归一化处理
for i in range(3, 6):
self.pose[i] = self._normalize_angle(self.pose[i])
def update_joint_state(self, joint):
self.joint = joint
# self.tozero = False
def update_endpose_state(self, end_pose):
self.pose = end_pose
# self.tozero = False
def update_tozero_state(self, tozero):
self.tozero = tozero
def get_action(self) -> Dict:
"""获取当前控制命令"""
return {
'control_mode': 'joint' if self.joint_control_mode else 'end_pose',
'use_master_arm': self.use_master_arm,
'master_joint_actions': self.master_joint_actions,
'master_controller_status': self.master_controller_status,
'end_pose': self.pose,
'joint_angles': self.joint,
'gripper': int(self.gripper),
'tozero': self.tozero
"""获取控制动作"""
try:
master_joint_actions = self.master_dual_arm.get_joint_data()
return self._format_action(master_joint_actions)
except Exception as e:
logging.error(f"获取动作失败: {e}")
def _format_action(self, master_joint_actions: dict) -> dict:
"""格式化动作数据"""
master_controller_status = {
'left': master_joint_actions['left_controller_status'],
'right': master_joint_actions['right_controller_status']
}
return {
'control_mode': 'joint',
'master_joint_actions': master_joint_actions['dual_joint_actions'],
'left_joint_actions': master_joint_actions['left_joint_actions'][:-1],
'right_joint_actions': master_joint_actions['right_joint_actions'][:-1],
'left_gripper_actions': master_joint_actions['left_joint_actions'][-1], # 修复bug
'right_gripper_actions': master_joint_actions['right_joint_actions'][-1],
'master_controller_status': master_controller_status
}
def stop(self):
"""停止控制器"""
self.running = False
if self.thread.is_alive():
self.thread.join()
if self.servo_arm:
self.servo_arm.close()
pygame.quit()
print("混合控制器已退出")
try:
if hasattr(self, 'master_dual_arm') and self.master_dual_arm:
self.master_dual_arm.shutdown()
print("混合控制器已退出")
except Exception as e:
logging.error(f"停止控制器失败: {e}")
class DummyEndposeMaster:
def __init__(self, config):
# 初始化pygame
pygame.init()
pygame.joystick.init()
# 获取所有 USB 游戏控制器的信息
self.joysticks = find_controller_index()
print(self.joysticks)
self.control_info = config.end_control_info
left_stick = self._init_stick('left')
right_stick = self._init_stick('right')
self.controllers = [left_stick, right_stick]
def _init_stick(self, arm_name:str):
stick_info = {}
stick_info['index'] = self.joysticks[self.control_info[arm_name]]['index']
stick_info['guid'] = self.control_info[arm_name]
stick_info['name'] = f'{arm_name}'
device_name = self.joysticks[self.control_info[arm_name]]['device_name']
stick = XboxStick(stick_info) if "Xbox" in device_name else FlightStick(stick_info)
stick.start_polling()
return stick
def reset(self):
"""重置到初始状态"""
self.joint = self.init_joint.copy()
self.pose = self.init_pose.copy()
self.pose_speeds = [0.0] * 6
self.joint_speeds = [0.0] * 6
self.gripper_speed = 10
self.gripper = self.min_gripper
print("已重置到初始状态")
def get_action(self, state) -> Dict:
from copy import deepcopy
new_state = deepcopy(state)
gamepad_action = {}
xyz = []
rxryrz = []
gripper = []
"""获取控制动作"""
try:
for i, controller in enumerate(self.controllers):
# states = controller.get_raw_states()
gamepad_action.update(controller.get_control_signal(controller.name))
xyz += [f"{controller.name}_x", f"{controller.name}_y", f"{controller.name}_z"]
rxryrz += [f"{controller.name}_joint_4", f"{controller.name}_joint_5", f"{controller.name}_joint_6"]
gripper += [f"{controller.name}_gripper"]
for name in xyz:
new_state['pose'][name] += (gamepad_action[name] * gamepad_action['xyz_vel'] * gamepad_action[name.split('_')[0]+'_ratio'])
for name in gripper:
new_state['joint'][name] += int(gamepad_action[name] * gamepad_action['gripper_vel'] * gamepad_action[name.split('_')[0]+'_ratio'])
new_state['joint'][name] = min(990, max(0, new_state['joint'][name]))
for name in rxryrz:
new_state['joint'][name] += (gamepad_action[name] * gamepad_action['rxyz_vel'] * gamepad_action[name.split('_')[0]+'_ratio'])
new_state['control_mode'] = 'endpose'
return new_state
except Exception as e:
logging.error(f"获取动作失败: {e}")
def stop(self):
"""停止控制器"""
try:
# 停止轮询线程
for controller in self.controllers:
controller.stop_polling()
except Exception as e:
logging.error(f"停止控制器失败: {e}")
class ControllerBase:
def __init__(self, joystick_info: dict):
# 初始化手柄对象
self.joystick = pygame.joystick.Joystick(joystick_info['index'])
self.joystick.init()
self.name = joystick_info['name']
self.guid = joystick_info['guid']
# 存储所有控制器状态的字典
self.states = {
'buttons': [False] * self.joystick.get_numbuttons(), # 按钮状态
'axes': [0.0] * self.joystick.get_numaxes(), # 摇杆和轴状态
'hats': [(0, 0)] * self.joystick.get_numhats() # 舵状态
}
# deadzone
self.deadzone = 0.15
# validzone
self.validzone = 0.05
self.ratio = 1
self.gripper_vel = 50
self.rxyz_vel = 5
self.xyz_vel = 0.02
# 线程控制标志
self.running = False
def start_polling(self):
"""启动线程以轮询控制器状态"""
if not self.running:
self.running = True
self.thread = threading.Thread(target=self._poll_controller)
self.thread.start()
def stop_polling(self):
"""停止线程"""
if self.running:
self.running = False
self.thread.join()
def _poll_controller(self):
"""后台线程函数,用于轮询控制器状态"""
while self.running:
# 处理pygame事件
pygame.event.pump()
# 获取按钮状态
for i in range(self.joystick.get_numbuttons()):
self.states['buttons'][i] = self.joystick.get_button(i)
# 获取摇杆和轴状态(通常范围是 -1.0 到 1.0
for i in range(self.joystick.get_numaxes()):
self.states['axes'][i] = self.joystick.get_axis(i)
# 获取舵状态(通常返回一个元组 (x, y),值范围为 -1, 0, 1
for i in range(self.joystick.get_numhats()):
self.states['hats'][i] = self.joystick.get_hat(i)
# 控制轮询频率
time.sleep(0.01)
def get_raw_states(self):
"""获取当前控制器状态"""
return self.states
class FlightStick(ControllerBase):
def __init__(self, joystick_info):
super().__init__(joystick_info)
def get_x_control_signal(self):
x = 0
if self.states['axes'][0] > self.validzone:
x = 1
elif self.states['axes'][0] < -self.validzone:
x = -1
return x
def get_y_control_signal(self):
y = 0
if self.states['axes'][1] > self.validzone:
y = -1
elif self.states['axes'][1] < -self.validzone:
y = 1
return y
def get_z_control_signal(self):
z = 0
if self.states['buttons'][0]:
z = 1
elif self.states['buttons'][1]:
z = -1
return z
def get_gripper_control_signal(self):
gripper = 0
if self.states['buttons'][2] == 1:
gripper = 1
elif self.states['buttons'][3] == 1:
gripper = -1
return gripper
def get_ratio_control_signal(self):
ratio = self.ratio
if self.states['axes'][2] > 0.8:
ratio = self.ratio / 5
elif self.states['axes'][2] < -0.8:
ratio = self.ratio * 2
return ratio
def get_rx_control_signal(self):
rx = 0
if self.states['hats'][0][0] == -1:
rx = 1
elif self.states['hats'][0][0] == 1:
rx = -1
else:
rx = 0
return rx
def get_ry_control_signal(self):
ry = 0
if self.states['hats'][0][1] == 1:
ry = -1
elif self.states['hats'][0][1] == -1:
ry = 1
else:
ry = 0
return ry
def get_rz_control_signal(self):
rz = 0
if self.states['axes'][3] < -self.validzone:
rz = -1
elif self.states['axes'][3] > self.validzone:
rz = 1
else:
rz = 0
return rz
def get_control_signal(self, prefix: str = ""):
"""获取所有控制信号"""
return {
f'{prefix}_x': self.get_x_control_signal(),
f'{prefix}_y': self.get_y_control_signal(),
f'{prefix}_z': self.get_z_control_signal(),
f'{prefix}_joint_4': self.get_rx_control_signal(),
f'{prefix}_joint_5': self.get_ry_control_signal(),
f'{prefix}_joint_6': self.get_rz_control_signal(),
f'{prefix}_gripper': self.get_gripper_control_signal(),
f'{prefix}_ratio': self.get_ratio_control_signal(),
'gripper_vel': self.gripper_vel,
'rxyz_vel': self.rxyz_vel,
'xyz_vel': self.xyz_vel
}
class XboxStick(ControllerBase):
def __init__(self, joystick_info: dict):
super().__init__(joystick_info)
def get_x_control_signal(self):
"""获取 X 轴控制信号"""
x = 0
if self.states['hats'][0][0] == -1:
x = 1
elif self.states['hats'][0][0] == 1:
x = -1
return x
def get_y_control_signal(self):
"""获取 Y 轴控制信号"""
y = 0
if self.states['hats'][0][1] == 1:
y = -1
elif self.states['hats'][0][1] == -1:
y = 1
return y
def get_z_control_signal(self):
"""获取 Z 轴控制信号"""
z = 0
if self.states['axes'][4] > self.deadzone: # A 按钮
z = -1
elif self.states['axes'][4] < -self.deadzone: # B 按钮
z = 1
return z
def get_ratio_control_signal(self):
"""获取速度控制信号"""
ratio = self.ratio
if self.states['axes'][2] > 0.8: # LT 按钮
ratio = self.ratio * 2
elif self.states['axes'][5] > 0.8: # RT 按钮
ratio = self.ratio / 5
return ratio
def get_gripper_control_signal(self):
gripper = 0
if self.states['buttons'][0] == 1:
gripper = 1
elif self.states['buttons'][1] == 1:
gripper = -1
return gripper
def get_rx_control_signal(self):
"""获取 RX 轴控制信号"""
rx = 0
if self.states['axes'][0] > self.deadzone: # 左舵
rx = -1
elif self.states['axes'][0] < -self.deadzone: # 右舵
rx = 1
return rx
def get_ry_control_signal(self):
"""获取 RY 轴控制信号"""
ry = 0
if self.states['axes'][1] > self.deadzone: # 上舵
ry = 1
elif self.states['axes'][1] < -self.deadzone: # 下舵
ry = -1
return ry
def get_rz_control_signal(self):
"""获取 RZ 轴控制信号"""
rz = 0
if self.states['buttons'][4] == 1: # 左摇杆
rz = 1
elif self.states['buttons'][5] == 1: # 右摇杆
rz = -1
return rz
def get_control_signal(self, prefix: str = ""):
"""获取所有控制信号"""
return {
f'{prefix}_x': self.get_x_control_signal(),
f'{prefix}_y': self.get_y_control_signal(),
f'{prefix}_z': self.get_z_control_signal(),
f'{prefix}_joint_4': self.get_rx_control_signal(),
f'{prefix}_joint_5': self.get_ry_control_signal(),
f'{prefix}_joint_6': self.get_rz_control_signal(),
f'{prefix}_gripper': self.get_gripper_control_signal(),
f'{prefix}_ratio': self.get_ratio_control_signal(),
'gripper_vel': self.gripper_vel,
'rxyz_vel': self.rxyz_vel,
'xyz_vel': self.xyz_vel
}
@dataclass
class ControllerConfig:
"""控制器配置"""
init_joint: list
init_pose: list
max_gripper: int
min_gripper: int
config_file: str
end_control_info: dict
def parse_init_info(init_info: dict) -> ControllerConfig:
"""解析初始化信息"""
return ControllerConfig(
init_joint=init_info['init_joint'],
init_pose=init_info.get('init_pose', [0]*12),
max_gripper=init_info['max_gripper'],
min_gripper=init_info['min_gripper'],
config_file=init_info['servo_config_file'],
end_control_info=init_info['end_control_info']
)
# 使用示例
if __name__ == "__main__":
# 初始化睿尔曼机械臂
arm = RoboticArm(rm_thread_mode_e.RM_TRIPLE_MODE_E)
# 创建机械臂连接
handle = arm.rm_create_robot_arm("192.168.3.18", 8080)
print(f"机械臂连接ID: {handle.id}")
init_pose = arm.rm_get_current_arm_state()[1]['pose']
with open('/home/maic/LYT/lerobot/lerobot/common/robot_devices/teleop/realman_mix.yaml', "r") as file:
config = yaml.safe_load(file)
config['init_pose'] = init_pose
arm_controller = HybridController(config)
try:
while True:
print(arm_controller.get_action())
time.sleep(0.1)
except KeyboardInterrupt:
arm_controller.stop()
config = {
'init_joint': {'joint': [-170, 90, 0, 90, 120, 0, 10, 170, 90, 0, -90, 120, 0, 10]},
'init_pose': {},
'max_gripper': {},
'min_gripper': {},
'servo_config_file': {},
'end_control_info': {'left': "0300b14bff1100003708000010010000" , 'right': '0300509d5e040000120b000009050000'}
}
config = parse_init_info(config)
endpose_arm = DummyEndposeMaster(config)
while True:
gamepad_action = {}
xyz = []
for i, controller in enumerate(endpose_arm.controllers):
# states = controller.get_raw_states()
gamepad_action.update(controller.get_control_signal(controller.name))
xyz += [f"{controller.name}_x", f"{controller.name}_y", f"{controller.name}_z"]
time.sleep(1)
print(gamepad_action)

View File

@@ -1,114 +0,0 @@
import pygame
import threading
import time
import serial
import binascii
import logging
import yaml
from typing import Dict
from dataclasses import dataclass
from Robotic_Arm.rm_robot_interface import *
from .servo_server import ServoArmServer
@dataclass
class ControllerConfig:
"""控制器配置"""
init_joint: list
init_pose: list
max_gripper: int
min_gripper: int
config_file: str
class HybridController:
def __init__(self, init_info):
self.config = self._parse_init_info(init_info)
self.joint = self.config.init_joint.copy()
self.pose = self.config.init_pose.copy()
self.joint_control_mode = True
self._initialize_master_arm()
def _parse_init_info(self, init_info: dict) -> ControllerConfig:
"""解析初始化信息"""
return ControllerConfig(
init_joint=init_info['init_joint'],
init_pose=init_info.get('init_pose', [0]*12),
max_gripper=init_info['max_gripper'],
min_gripper=init_info['min_gripper'],
config_file=init_info['servo_config_file']
)
def _initialize_master_arm(self):
"""初始化主控臂"""
try:
self.master_dual_arm = ServoArmServer(self.config.config_file)
except Exception as e:
logging.error(f"初始化主控臂失败: {e}")
raise
def get_action(self) -> Dict:
"""获取控制动作"""
try:
master_joint_actions = self.master_dual_arm.get_joint_data()
return self._format_action(master_joint_actions)
except Exception as e:
logging.error(f"获取动作失败: {e}")
def _format_action(self, master_joint_actions: dict) -> dict:
"""格式化动作数据"""
master_controller_status = {
'left': master_joint_actions['left_controller_status'],
'right': master_joint_actions['right_controller_status']
}
return {
'control_mode': 'joint' if self.joint_control_mode else 'end_pose',
'master_joint_actions': master_joint_actions['dual_joint_actions'],
'left_joint_actions': master_joint_actions['left_joint_actions'][:-1],
'right_joint_actions': master_joint_actions['right_joint_actions'][:-1],
'left_gripper_actions': master_joint_actions['left_joint_actions'][-1], # 修复bug
'right_gripper_actions': master_joint_actions['right_joint_actions'][-1],
'master_controller_status': master_controller_status,
'end_pose': self.pose
}
def switch_control_mode(self):
"""切换控制模式"""
self.joint_control_mode = not self.joint_control_mode
mode = "关节" if self.joint_control_mode else "末端"
print(f"切换到{mode}控制模式")
def stop(self):
"""停止控制器"""
try:
if hasattr(self, 'master_dual_arm') and self.master_dual_arm:
self.master_dual_arm.shutdown()
print("混合控制器已退出")
except Exception as e:
logging.error(f"停止控制器失败: {e}")
def reset(self):
"""重置控制器"""
self.joint = self.config.init_joint.copy()
self.pose = self.config.init_pose.copy()
self.joint_control_mode = True
# 使用示例
if __name__ == "__main__":
init_info = {
'init_joint': [-175, 90, 90, 45, 90, -90, 10, 175, 90, 90, -45, 90, 90, 10],
'init_pose': [[-0.0305, 0.125938, 0.13153, 3.141, 0.698, -1.57, -0.030486, -0.11487, 0.144707, 3.141, 0.698, 1.57]],
'max_gripper': 990,
'min_gripper': 10,
'servo_config_file': '/home/maic/LYT/lerobot/lerobot/common/robot_devices/teleop/servo_dual.yaml'
}
arm_controller = HybridController(init_info)
time.sleep(1)
try:
while True:
print(arm_controller.get_action())
time.sleep(1)
except KeyboardInterrupt:
arm_controller.stop()

View File

@@ -0,0 +1,76 @@
import time
import logging
from typing import Dict
from dataclasses import dataclass
from .gamepad import RealmanAlohaMaster, DummyEndposeMaster
@dataclass
class ControllerConfig:
"""控制器配置"""
init_joint: list
init_pose: list
max_gripper: int
min_gripper: int
config_file: str
end_control_info: dict
class HybridController:
def __init__(self, init_info):
self.config = self._parse_init_info(init_info)
self.joint = self.config.init_joint.copy()
self.pose = self.config.init_pose.copy()
self.joint_arm = RealmanAlohaMaster(self.config)
self.endpose_arm = DummyEndposeMaster(self.config)
def _parse_init_info(self, init_info: dict) -> ControllerConfig:
"""解析初始化信息"""
return ControllerConfig(
init_joint=init_info['init_joint'],
init_pose=init_info.get('init_pose', [0]*12),
max_gripper=init_info['max_gripper'],
min_gripper=init_info['min_gripper'],
config_file=init_info['servo_config_file'],
end_control_info=init_info['end_control_info']
)
def get_action(self, state) -> Dict:
"""获取控制动作"""
try:
endpose_action = self.endpose_arm.get_action(state)
return endpose_action
# return self.joint_arm.get_action()
except Exception as e:
logging.error(f"获取动作失败: {e}")
def stop(self):
self.joint_arm.stop()
def reset(self):
"""重置控制器"""
self.joint = self.config.init_joint.copy()
self.pose = self.config.init_pose.copy()
self.joint_control_mode = True
# 使用示例
if __name__ == "__main__":
init_info = {
'init_joint': [-175, 90, 90, 45, 90, -90, 10, 175, 90, 90, -45, 90, 90, 10],
'init_pose': [[-0.0305, 0.125938, 0.13153, 3.141, 0.698, -1.57, -0.030486, -0.11487, 0.144707, 3.141, 0.698, 1.57]],
'max_gripper': 990,
'min_gripper': 10,
'servo_config_file': '/home/maic/LYT/lerobot/lerobot/common/robot_devices/teleop/servo_dual.yaml',
'end_control_info': {'left': '0300b14bff1100003708000010010000', 'right': '030003f05e0400008e02000010010000'}
}
arm_controller = HybridController(init_info)
time.sleep(1)
try:
while True:
print(arm_controller.get_action())
time.sleep(1)
except KeyboardInterrupt:
arm_controller.stop()

View File

@@ -0,0 +1,466 @@
import pygame
import threading
import time
import serial
import binascii
import logging
import yaml
from typing import Dict
from Robotic_Arm.rm_robot_interface import *
class ServoArm:
def __init__(self, config_file="config.yaml"):
"""初始化机械臂的串口连接并发送初始数据。
Args:
config_file (str): 配置文件的路径。
"""
self.config = self._load_config(config_file)
self.port = self.config["port"]
self.baudrate = self.config["baudrate"]
self.joint_hex_data = self.config["joint_hex_data"]
self.control_hex_data = self.config["control_hex_data"]
self.arm_axis = self.config.get("arm_axis", 7)
try:
self.serial_conn = serial.Serial(self.port, self.baudrate, timeout=0)
self.bytes_to_send = binascii.unhexlify(self.joint_hex_data.replace(" ", ""))
self.serial_conn.write(self.bytes_to_send)
time.sleep(1)
self.connected = True
logging.info(f"串口连接成功: {self.port}")
except Exception as e:
logging.error(f"串口连接失败: {e}")
self.connected = False
def _load_config(self, config_file):
"""加载配置文件。
Args:
config_file (str): 配置文件的路径。
Returns:
dict: 配置文件内容。
"""
try:
with open(config_file, "r") as file:
config = yaml.safe_load(file)
return config
except Exception as e:
logging.error(f"配置文件加载失败: {e}")
# 返回默认配置
return {
"port": "/dev/ttyUSB0",
"baudrate": 460800,
"joint_hex_data": "55 AA 02 00 00 67",
"control_hex_data": "55 AA 08 00 00 B9",
"arm_axis": 6
}
def _bytes_to_signed_int(self, byte_data):
"""将字节数据转换为有符号整数。
Args:
byte_data (bytes): 字节数据。
Returns:
int: 有符号整数。
"""
return int.from_bytes(byte_data, byteorder="little", signed=True)
def _parse_joint_data(self, hex_received):
"""解析接收到的十六进制数据并提取关节数据。
Args:
hex_received (str): 接收到的十六进制字符串数据。
Returns:
dict: 解析后的关节数据。
"""
logging.debug(f"hex_received: {hex_received}")
joints = {}
for i in range(self.arm_axis):
start = 14 + i * 10
end = start + 8
joint_hex = hex_received[start:end]
joint_byte_data = bytearray.fromhex(joint_hex)
joint_value = self._bytes_to_signed_int(joint_byte_data) / 10000.0
joints[f"joint_{i+1}"] = joint_value
grasp_start = 14 + self.arm_axis*10
grasp_hex = hex_received[grasp_start:grasp_start+8]
grasp_byte_data = bytearray.fromhex(grasp_hex)
# 夹爪进行归一化处理
grasp_value = self._bytes_to_signed_int(grasp_byte_data)/1000
joints["grasp"] = grasp_value
return joints
def _parse_controller_data(self, hex_received):
status = {
'infrared': 0,
'button': 0
}
if len(hex_received) == 18:
status['infrared'] = self._bytes_to_signed_int(bytearray.fromhex(hex_received[12:14]))
status['button'] = self._bytes_to_signed_int(bytearray.fromhex(hex_received[14:16]))
# print(infrared)
return status
def get_joint_actions(self):
"""从串口读取数据并解析关节动作。
Returns:
dict: 包含关节数据的字典。
"""
if not self.connected:
return {}
try:
self.serial_conn.write(self.bytes_to_send)
time.sleep(0.02)
bytes_received = self.serial_conn.read(self.serial_conn.inWaiting())
if len(bytes_received) == 0:
return {}
hex_received = binascii.hexlify(bytes_received).decode("utf-8").upper()
actions = self._parse_joint_data(hex_received)
return actions
except Exception as e:
logging.error(f"读取串口数据错误: {e}")
return {}
def get_controller_status(self):
bytes_to_send = binascii.unhexlify(self.control_hex_data.replace(" ", ""))
self.serial_conn.write(bytes_to_send)
time.sleep(0.02)
bytes_received = self.serial_conn.read(self.serial_conn.inWaiting())
hex_received = binascii.hexlify(bytes_received).decode("utf-8").upper()
# print("control status:", hex_received)
status = self._parse_controller_data(hex_received)
return status
def close(self):
"""关闭串口连接"""
if self.connected and hasattr(self, 'serial_conn'):
self.serial_conn.close()
self.connected = False
logging.info("串口连接已关闭")
class HybridController:
def __init__(self, init_info):
# 初始化pygame和手柄
pygame.init()
pygame.joystick.init()
# 检查是否有连接的手柄
if pygame.joystick.get_count() == 0:
raise Exception("未检测到手柄")
# 初始化手柄
self.joystick = pygame.joystick.Joystick(0)
self.joystick.init()
# 摇杆死区
self.deadzone = 0.15
# 控制模式: True为关节控制主模式False为末端控制
self.joint_control_mode = True
# 精细控制模式
self.fine_control_mode = False
# 初始化末端姿态和关节角度
self.init_joint = init_info['init_joint']
self.init_pose = init_info.get('init_pose', [0]*6)
self.max_gripper = init_info['max_gripper']
self.min_gripper = init_info['min_gripper']
servo_config_file = init_info['servo_config_file']
self.joint = self.init_joint.copy()
self.pose = self.init_pose.copy()
self.pose_speeds = [0.0] * 6
self.joint_speeds = [0.0] * 6
self.tozero = False
# 主臂关节状态
self.master_joint_actions = {}
self.master_controller_status = {}
self.use_master_arm = False
# 末端位姿限制
self.pose_limits = [
(-0.800, 0.800), # X (m)
(-0.800, 0.800), # Y (m)
(-0.800, 0.800), # Z (m)
(-3.14, 3.14), # RX (rad)
(-3.14, 3.14), # RY (rad)
(-3.14, 3.14) # RZ (rad)
]
# 关节角度限制 (度)
self.joint_limits = [
(-180, 180), # joint 1
(-180, 180), # joint 2
(-180, 180), # joint 3
(-180, 180), # joint 4
(-180, 180), # joint 5
(-180, 180) # joint 6
]
# 控制参数
self.linear_step = 0.002 # 线性移动步长(m)
self.angular_step = 0.01 # 角度步长(rad)
# 夹爪状态和速度
self.gripper_speed = 10
self.gripper = self.min_gripper
# 初始化串口通信(主臂关节状态获取)
self.servo_arm = None
if servo_config_file:
try:
self.servo_arm = ServoArm(servo_config_file)
self.use_master_arm = True
logging.info("串口主臂连接成功,启用主从控制模式")
except Exception as e:
logging.error(f"串口主臂连接失败: {e}")
self.use_master_arm = False
# 启动更新线程
self.running = True
self.thread = threading.Thread(target=self.update_controller)
self.thread.start()
print("混合控制器已启动")
print("主控制模式: 关节控制")
if self.use_master_arm:
print("主从控制: 启用")
print("Back按钮: 切换控制模式(关节/末端)")
print("L3按钮: 切换精细控制模式")
print("Start按钮: 重置到初始位置")
def _apply_nonlinear_mapping(self, value):
"""应用非线性映射以提高控制精度"""
sign = 1 if value >= 0 else -1
return sign * (abs(value) ** 2)
def _normalize_angle(self, angle):
"""将角度归一化到[-π, π]范围内"""
import math
while angle > math.pi:
angle -= 2 * math.pi
while angle < -math.pi:
angle += 2 * math.pi
return angle
def update_controller(self):
while self.running:
try:
pygame.event.pump()
except Exception as e:
print(f"控制器错误: {e}")
self.stop()
continue
# 检查控制模式切换 (Back按钮)
if self.joystick.get_button(6): # Back按钮
self.joint_control_mode = not self.joint_control_mode
mode_str = "关节控制" if self.joint_control_mode else "末端位姿控制"
print(f"切换到{mode_str}模式")
time.sleep(0.3) # 防止多次触发
# 检查精细控制模式切换 (L3按钮)
if self.joystick.get_button(10): # L3按钮
self.fine_control_mode = not self.fine_control_mode
print(f"切换到{'精细' if self.fine_control_mode else '普通'}控制模式")
time.sleep(0.3) # 防止多次触发
# 检查重置按钮 (Start按钮)
if self.joystick.get_button(7): # Start按钮
print("重置机械臂到初始位置...")
# print("init_joint", self.init_joint.copy())
self.tozero = True
self.joint = self.init_joint.copy()
self.pose = self.init_pose.copy()
self.pose_speeds = [0.0] * 6
self.joint_speeds = [0.0] * 6
self.gripper_speed = 10
self.gripper = self.min_gripper
print("机械臂已重置到初始位置")
time.sleep(0.3) # 防止多次触发
# 从串口获取主臂关节状态
if self.servo_arm and self.servo_arm.connected:
try:
self.master_joint_actions = self.servo_arm.get_joint_actions()
self.master_controller_status = self.servo_arm.get_controller_status()
if self.master_joint_actions:
logging.debug(f"主臂关节状态: {self.master_joint_actions}")
except Exception as e:
logging.error(f"获取主臂状态错误: {e}")
self.master_joint_actions = {}
# print(self.master_joint_actions)
# 根据控制模式更新相应的控制逻辑
if self.joint_control_mode:
# 关节控制模式下优先使用主臂数据Xbox作为辅助
self.update_joint_control()
else:
# 末端控制模式使用Xbox控制
self.update_end_pose()
time.sleep(0.02)
# print('gripper:', self.gripper)
def update_joint_control(self):
"""更新关节角度控制 - 优先使用主臂数据"""
if self.use_master_arm and self.master_joint_actions:
# 主从控制模式:直接使用主臂的关节角度
try:
# 将主臂关节角度映射到从臂
for i in range(6): # 假设只有6个关节需要控制
joint_key = f"joint_{i+1}"
if joint_key in self.master_joint_actions:
# 直接使用主臂的关节角度(已经是度数)
self.joint[i] = self.master_joint_actions[joint_key]
# 应用关节限制
min_val, max_val = self.joint_limits[i]
self.joint[i] = max(min_val, min(max_val, self.joint[i]))
# print(self.joint)
logging.debug(f"主臂关节映射到从臂: {self.joint[:6]}")
except Exception as e:
logging.error(f"主臂数据映射错误: {e}")
# 如果有主臂夹爪数据,使用主臂夹爪状态
if self.use_master_arm and "grasp" in self.master_joint_actions:
self.gripper = self.master_joint_actions["grasp"] * 1000
self.joint[-1] = self.gripper
def update_end_pose(self):
"""更新末端位姿控制"""
# 根据控制模式调整步长
current_linear_step = self.linear_step * (0.1 if self.fine_control_mode else 1.0)
current_angular_step = self.angular_step * (0.1 if self.fine_control_mode else 1.0)
# 方向键控制XY
hat = self.joystick.get_hat(0)
hat_up = hat[1] == 1 # Y+
hat_down = hat[1] == -1 # Y-
hat_left = hat[0] == -1 # X-
hat_right = hat[0] == 1 # X+
# 右摇杆控制Z
right_y_raw = -self.joystick.get_axis(4)
# 左摇杆控制RZ
left_y_raw = -self.joystick.get_axis(1)
# 应用死区
right_y = 0.0 if abs(right_y_raw) < self.deadzone else right_y_raw
left_y = 0.0 if abs(left_y_raw) < self.deadzone else left_y_raw
# 计算各轴速度
self.pose_speeds[1] = current_linear_step if hat_up else (-current_linear_step if hat_down else 0.0) # Y
self.pose_speeds[0] = -current_linear_step if hat_left else (current_linear_step if hat_right else 0.0) # X
# 设置Z速度右摇杆Y轴控制
z_mapping = self._apply_nonlinear_mapping(right_y)
self.pose_speeds[2] = z_mapping * current_linear_step # Z
# L1/R1控制RX旋转
LB = self.joystick.get_button(4) # RX-
RB = self.joystick.get_button(5) # RX+
self.pose_speeds[3] = (-current_angular_step if LB else (current_angular_step if RB else 0.0))
# △/□控制RY旋转
triangle = self.joystick.get_button(2) # RY+
square = self.joystick.get_button(3) # RY-
self.pose_speeds[4] = (current_angular_step if triangle else (-current_angular_step if square else 0.0))
# 左摇杆Y轴控制RZ旋转
rz_mapping = self._apply_nonlinear_mapping(left_y)
self.pose_speeds[5] = rz_mapping * current_angular_step * 2 # RZ
# 夹爪控制(圈/叉)
circle = self.joystick.get_button(1) # 夹爪开
cross = self.joystick.get_button(0) # 夹爪关
if circle:
self.gripper = min(self.max_gripper, self.gripper + self.gripper_speed)
elif cross:
self.gripper = max(self.min_gripper, self.gripper - self.gripper_speed)
# 更新末端位姿
for i in range(6):
self.pose[i] += self.pose_speeds[i]
# 角度归一化处理
for i in range(3, 6):
self.pose[i] = self._normalize_angle(self.pose[i])
def update_joint_state(self, joint):
self.joint = joint
# self.tozero = False
def update_endpose_state(self, end_pose):
self.pose = end_pose
# self.tozero = False
def update_tozero_state(self, tozero):
self.tozero = tozero
def get_action(self) -> Dict:
"""获取当前控制命令"""
return {
'control_mode': 'joint' if self.joint_control_mode else 'end_pose',
'use_master_arm': self.use_master_arm,
'master_joint_actions': self.master_joint_actions,
'master_controller_status': self.master_controller_status,
'end_pose': self.pose,
'joint_angles': self.joint,
'gripper': int(self.gripper),
'tozero': self.tozero
}
def stop(self):
"""停止控制器"""
self.running = False
if self.thread.is_alive():
self.thread.join()
if self.servo_arm:
self.servo_arm.close()
pygame.quit()
print("混合控制器已退出")
def reset(self):
"""重置到初始状态"""
self.joint = self.init_joint.copy()
self.pose = self.init_pose.copy()
self.pose_speeds = [0.0] * 6
self.joint_speeds = [0.0] * 6
self.gripper_speed = 10
self.gripper = self.min_gripper
print("已重置到初始状态")
# 使用示例
if __name__ == "__main__":
# 初始化睿尔曼机械臂
arm = RoboticArm(rm_thread_mode_e.RM_TRIPLE_MODE_E)
# 创建机械臂连接
handle = arm.rm_create_robot_arm("192.168.3.18", 8080)
print(f"机械臂连接ID: {handle.id}")
init_pose = arm.rm_get_current_arm_state()[1]['pose']
with open('/home/maic/LYT/lerobot/lerobot/common/robot_devices/teleop/realman_mix.yaml', "r") as file:
config = yaml.safe_load(file)
config['init_pose'] = init_pose
arm_controller = HybridController(config)
try:
while True:
print(arm_controller.get_action())
time.sleep(0.1)
except KeyboardInterrupt:
arm_controller.stop()

View File

@@ -0,0 +1,140 @@
import pygame
import time
import sys
class FlightStickDetector:
def __init__(self):
# 初始化pygame
pygame.init()
pygame.joystick.init()
self.joysticks = []
self.running = True
def detect_joysticks(self):
"""检测连接的手柄"""
joystick_count = pygame.joystick.get_count()
print(f"检测到 {joystick_count} 个手柄设备")
if joystick_count == 0:
print("未检测到任何手柄设备!")
return False
# 初始化所有检测到的手柄
self.joysticks = []
for i in range(joystick_count):
joystick = pygame.joystick.Joystick(i)
joystick.init()
self.joysticks.append(joystick)
print(f"\n手柄 {i}:")
print(f" 名称: {joystick.get_name()}")
print(f" 轴数量: {joystick.get_numaxes()}")
print(f" 按钮数量: {joystick.get_numbuttons()}")
print(f" 帽子开关数量: {joystick.get_numhats()}")
return True
def get_joystick_data(self, joystick_id=0):
"""获取指定手柄的数据"""
if joystick_id >= len(self.joysticks):
return None
joystick = self.joysticks[joystick_id]
# 获取轴数据
axes = []
for i in range(joystick.get_numaxes()):
axis_value = joystick.get_axis(i)
axes.append(round(axis_value, 3))
# 获取按钮数据
buttons = []
for i in range(joystick.get_numbuttons()):
button_pressed = joystick.get_button(i)
buttons.append(button_pressed)
# 获取帽子开关数据
hats = []
for i in range(joystick.get_numhats()):
hat_value = joystick.get_hat(i)
hats.append(hat_value)
return {
'name': joystick.get_name(),
'axes': axes,
'buttons': buttons,
'hats': hats
}
def monitor_joystick(self, joystick_id=0, update_interval=0.1):
"""实时监控手柄数据"""
print(f"\n开始监控手柄 {joystick_id} (按Ctrl+C停止)")
print("=" * 50)
try:
while self.running:
pygame.event.pump() # 更新事件队列
data = self.get_joystick_data(joystick_id)
if data is None:
print("无法获取手柄数据")
break
# 清屏并显示数据
print("\033[H\033[J", end="") # ANSI清屏
print(f"手柄: {data['name']}")
print("-" * 40)
# 显示轴数据(通常用于飞行控制)
axis_names = ['X轴(副翼)', 'Y轴(升降舵)', 'Z轴(方向舵)',
'油门', '轴4', '轴5', '轴6', '轴7']
print("轴数据:")
for i, value in enumerate(data['axes']):
name = axis_names[i] if i < len(axis_names) else f'{i}'
# 创建进度条显示
bar_length = 20
bar_pos = int((value + 1) * bar_length / 2)
bar = [''] * bar_length
if 0 <= bar_pos < bar_length:
bar[bar_pos] = ''
bar_str = ''.join(bar)
print(f" {name:12}: {value:6.3f} [{bar_str}]")
# 显示按钮数据
pressed_buttons = [i for i, pressed in enumerate(data['buttons']) if pressed]
if pressed_buttons:
print(f"\n按下的按钮: {pressed_buttons}")
# 显示帽子开关数据
if data['hats']:
print(f"帽子开关: {data['hats']}")
time.sleep(update_interval)
except KeyboardInterrupt:
print("\n\n监控已停止")
def cleanup(self):
"""清理资源"""
pygame.joystick.quit()
pygame.quit()
# 使用示例
def main():
detector = FlightStickDetector()
try:
# 检测手柄
if detector.detect_joysticks():
# 如果检测到手柄,开始监控第一个手柄
detector.monitor_joystick(0)
else:
print("请连接飞行手柄后重试")
finally:
detector.cleanup()
if __name__ == "__main__":
main()