refactory gamepad

This commit is contained in:
2025-07-06 14:28:38 +08:00
parent b04e6e0c7b
commit b3e9e11e11
6 changed files with 915 additions and 454 deletions

View File

@@ -7,7 +7,7 @@ import torch
import numpy as np
from dataclasses import dataclass, field, replace
from collections import deque
from lerobot.common.robot_devices.teleop.gamepad import HybridController
from lerobot.common.robot_devices.teleop.realman_single import HybridController
from lerobot.common.robot_devices.motors.utils import get_motor_names, make_motors_buses_from_configs
from lerobot.common.robot_devices.cameras.utils import make_cameras_from_configs
from lerobot.common.robot_devices.utils import RobotDeviceAlreadyConnectedError, RobotDeviceNotConnectedError

View File

@@ -9,7 +9,7 @@ import logging
from typing import Optional, Tuple, Dict
from dataclasses import dataclass, field, replace
from collections import deque
from lerobot.common.robot_devices.teleop.gamepad_dual import HybridController
from lerobot.common.robot_devices.teleop.realman_aloha_dual import HybridController
from lerobot.common.robot_devices.motors.utils import get_motor_names, make_motors_buses_from_configs
from lerobot.common.robot_devices.cameras.utils import make_cameras_from_configs
from lerobot.common.robot_devices.utils import RobotDeviceAlreadyConnectedError, RobotDeviceNotConnectedError
@@ -226,7 +226,7 @@ class RealmanDualRobot:
# 获取动作
action = self.teleop.get_action()
# 执行动作
self._execute_action(action, state)
# self._execute_action(action, state)
# 更新状态队列
self._update_state_queue()

View File

@@ -1,466 +1,319 @@
import pygame
import threading
import time
import serial
import binascii
import logging
import yaml
from typing import Dict
from Robotic_Arm.rm_robot_interface import *
import pyudev
class ServoArm:
def __init__(self, config_file="config.yaml"):
"""初始化机械臂的串口连接并发送初始数据。
Args:
config_file (str): 配置文件的路径。
"""
self.config = self._load_config(config_file)
self.port = self.config["port"]
self.baudrate = self.config["baudrate"]
self.joint_hex_data = self.config["joint_hex_data"]
self.control_hex_data = self.config["control_hex_data"]
self.arm_axis = self.config.get("arm_axis", 7)
try:
self.serial_conn = serial.Serial(self.port, self.baudrate, timeout=0)
self.bytes_to_send = binascii.unhexlify(self.joint_hex_data.replace(" ", ""))
self.serial_conn.write(self.bytes_to_send)
time.sleep(1)
self.connected = True
logging.info(f"串口连接成功: {self.port}")
except Exception as e:
logging.error(f"串口连接失败: {e}")
self.connected = False
def _load_config(self, config_file):
"""加载配置文件。
Args:
config_file (str): 配置文件的路径。
Returns:
dict: 配置文件内容。
"""
try:
with open(config_file, "r") as file:
config = yaml.safe_load(file)
return config
except Exception as e:
logging.error(f"配置文件加载失败: {e}")
# 返回默认配置
return {
"port": "/dev/ttyUSB0",
"baudrate": 460800,
"joint_hex_data": "55 AA 02 00 00 67",
"control_hex_data": "55 AA 08 00 00 B9",
"arm_axis": 6
}
def _bytes_to_signed_int(self, byte_data):
"""将字节数据转换为有符号整数。
Args:
byte_data (bytes): 字节数据。
Returns:
int: 有符号整数。
"""
return int.from_bytes(byte_data, byteorder="little", signed=True)
def _parse_joint_data(self, hex_received):
"""解析接收到的十六进制数据并提取关节数据。
Args:
hex_received (str): 接收到的十六进制字符串数据。
Returns:
dict: 解析后的关节数据。
"""
logging.debug(f"hex_received: {hex_received}")
joints = {}
for i in range(self.arm_axis):
start = 14 + i * 10
end = start + 8
joint_hex = hex_received[start:end]
joint_byte_data = bytearray.fromhex(joint_hex)
joint_value = self._bytes_to_signed_int(joint_byte_data) / 10000.0
joints[f"joint_{i+1}"] = joint_value
grasp_start = 14 + self.arm_axis*10
grasp_hex = hex_received[grasp_start:grasp_start+8]
grasp_byte_data = bytearray.fromhex(grasp_hex)
# 夹爪进行归一化处理
grasp_value = self._bytes_to_signed_int(grasp_byte_data)/1000
joints["grasp"] = grasp_value
return joints
def _parse_controller_data(self, hex_received):
status = {
'infrared': 0,
'button': 0
}
if len(hex_received) == 18:
status['infrared'] = self._bytes_to_signed_int(bytearray.fromhex(hex_received[12:14]))
status['button'] = self._bytes_to_signed_int(bytearray.fromhex(hex_received[14:16]))
# print(infrared)
return status
def get_joint_actions(self):
"""从串口读取数据并解析关节动作。
Returns:
dict: 包含关节数据的字典。
"""
if not self.connected:
return {}
try:
self.serial_conn.write(self.bytes_to_send)
time.sleep(0.02)
bytes_received = self.serial_conn.read(self.serial_conn.inWaiting())
if len(bytes_received) == 0:
return {}
hex_received = binascii.hexlify(bytes_received).decode("utf-8").upper()
actions = self._parse_joint_data(hex_received)
return actions
except Exception as e:
logging.error(f"读取串口数据错误: {e}")
return {}
def get_controller_status(self):
bytes_to_send = binascii.unhexlify(self.control_hex_data.replace(" ", ""))
self.serial_conn.write(bytes_to_send)
time.sleep(0.02)
bytes_received = self.serial_conn.read(self.serial_conn.inWaiting())
hex_received = binascii.hexlify(bytes_received).decode("utf-8").upper()
# print("control status:", hex_received)
status = self._parse_controller_data(hex_received)
return status
def close(self):
"""关闭串口连接"""
if self.connected and hasattr(self, 'serial_conn'):
self.serial_conn.close()
self.connected = False
logging.info("串口连接已关闭")
class HybridController:
def __init__(self, init_info):
# 初始化pygame和手柄
pygame.init()
pygame.joystick.init()
# 检查是否有连接的手柄
if pygame.joystick.get_count() == 0:
raise Exception("未检测到手柄")
# 初始化手柄
self.joystick = pygame.joystick.Joystick(0)
class ControllerBase:
def __init__(self, joystick_index):
# 初始化手柄对象
self.joystick = pygame.joystick.Joystick(joystick_index)
self.joystick.init()
# 摇杆死区
self.deadzone = 0.15
# 控制模式: True为关节控制主模式False为末端控制
self.joint_control_mode = True
# 精细控制模式
self.fine_control_mode = False
# 初始化末端姿态和关节角度
self.init_joint = init_info['init_joint']
self.init_pose = init_info.get('init_pose', [0]*6)
self.max_gripper = init_info['max_gripper']
self.min_gripper = init_info['min_gripper']
servo_config_file = init_info['servo_config_file']
self.joint = self.init_joint.copy()
self.pose = self.init_pose.copy()
self.pose_speeds = [0.0] * 6
self.joint_speeds = [0.0] * 6
self.tozero = False
# 主臂关节状态
self.master_joint_actions = {}
self.master_controller_status = {}
self.use_master_arm = False
# 末端位姿限制
self.pose_limits = [
(-0.800, 0.800), # X (m)
(-0.800, 0.800), # Y (m)
(-0.800, 0.800), # Z (m)
(-3.14, 3.14), # RX (rad)
(-3.14, 3.14), # RY (rad)
(-3.14, 3.14) # RZ (rad)
]
# 关节角度限制 (度)
self.joint_limits = [
(-180, 180), # joint 1
(-180, 180), # joint 2
(-180, 180), # joint 3
(-180, 180), # joint 4
(-180, 180), # joint 5
(-180, 180) # joint 6
]
# 控制参数
self.linear_step = 0.002 # 线性移动步长(m)
self.angular_step = 0.01 # 角度步长(rad)
# 夹爪状态和速度
self.gripper_speed = 10
self.gripper = self.min_gripper
# 初始化串口通信(主臂关节状态获取)
self.servo_arm = None
if servo_config_file:
try:
self.servo_arm = ServoArm(servo_config_file)
self.use_master_arm = True
logging.info("串口主臂连接成功,启用主从控制模式")
except Exception as e:
logging.error(f"串口主臂连接失败: {e}")
self.use_master_arm = False
# 启动更新线程
self.running = True
self.thread = threading.Thread(target=self.update_controller)
self.thread.start()
print("混合控制器已启动")
print("主控制模式: 关节控制")
if self.use_master_arm:
print("主从控制: 启用")
print("Back按钮: 切换控制模式(关节/末端)")
print("L3按钮: 切换精细控制模式")
print("Start按钮: 重置到初始位置")
def _apply_nonlinear_mapping(self, value):
"""应用非线性映射以提高控制精度"""
sign = 1 if value >= 0 else -1
return sign * (abs(value) ** 2)
def _normalize_angle(self, angle):
"""将角度归一化到[-π, π]范围内"""
import math
while angle > math.pi:
angle -= 2 * math.pi
while angle < -math.pi:
angle += 2 * math.pi
return angle
def update_controller(self):
while self.running:
try:
pygame.event.pump()
except Exception as e:
print(f"控制器错误: {e}")
self.stop()
continue
# 检查控制模式切换 (Back按钮)
if self.joystick.get_button(6): # Back按钮
self.joint_control_mode = not self.joint_control_mode
mode_str = "关节控制" if self.joint_control_mode else "末端位姿控制"
print(f"切换到{mode_str}模式")
time.sleep(0.3) # 防止多次触发
# 检查精细控制模式切换 (L3按钮)
if self.joystick.get_button(10): # L3按钮
self.fine_control_mode = not self.fine_control_mode
print(f"切换到{'精细' if self.fine_control_mode else '普通'}控制模式")
time.sleep(0.3) # 防止多次触发
# 检查重置按钮 (Start按钮)
if self.joystick.get_button(7): # Start按钮
print("重置机械臂到初始位置...")
# print("init_joint", self.init_joint.copy())
self.tozero = True
self.joint = self.init_joint.copy()
self.pose = self.init_pose.copy()
self.pose_speeds = [0.0] * 6
self.joint_speeds = [0.0] * 6
self.gripper_speed = 10
self.gripper = self.min_gripper
print("机械臂已重置到初始位置")
time.sleep(0.3) # 防止多次触发
# 从串口获取主臂关节状态
if self.servo_arm and self.servo_arm.connected:
try:
self.master_joint_actions = self.servo_arm.get_joint_actions()
self.master_controller_status = self.servo_arm.get_controller_status()
if self.master_joint_actions:
logging.debug(f"主臂关节状态: {self.master_joint_actions}")
except Exception as e:
logging.error(f"获取主臂状态错误: {e}")
self.master_joint_actions = {}
# print(self.master_joint_actions)
# 根据控制模式更新相应的控制逻辑
if self.joint_control_mode:
# 关节控制模式下优先使用主臂数据Xbox作为辅助
self.update_joint_control()
else:
# 末端控制模式使用Xbox控制
self.update_end_pose()
time.sleep(0.02)
# print('gripper:', self.gripper)
def update_joint_control(self):
"""更新关节角度控制 - 优先使用主臂数据"""
if self.use_master_arm and self.master_joint_actions:
# 主从控制模式:直接使用主臂的关节角度
try:
# 将主臂关节角度映射到从臂
for i in range(6): # 假设只有6个关节需要控制
joint_key = f"joint_{i+1}"
if joint_key in self.master_joint_actions:
# 直接使用主臂的关节角度(已经是度数)
self.joint[i] = self.master_joint_actions[joint_key]
# 应用关节限制
min_val, max_val = self.joint_limits[i]
self.joint[i] = max(min_val, min(max_val, self.joint[i]))
# print(self.joint)
logging.debug(f"主臂关节映射到从臂: {self.joint[:6]}")
except Exception as e:
logging.error(f"主臂数据映射错误: {e}")
# 如果有主臂夹爪数据,使用主臂夹爪状态
if self.use_master_arm and "grasp" in self.master_joint_actions:
self.gripper = self.master_joint_actions["grasp"] * 1000
self.joint[-1] = self.gripper
def update_end_pose(self):
"""更新末端位姿控制"""
# 根据控制模式调整步长
current_linear_step = self.linear_step * (0.1 if self.fine_control_mode else 1.0)
current_angular_step = self.angular_step * (0.1 if self.fine_control_mode else 1.0)
# 方向键控制XY
hat = self.joystick.get_hat(0)
hat_up = hat[1] == 1 # Y+
hat_down = hat[1] == -1 # Y-
hat_left = hat[0] == -1 # X-
hat_right = hat[0] == 1 # X+
# 右摇杆控制Z
right_y_raw = -self.joystick.get_axis(4)
# 左摇杆控制RZ
left_y_raw = -self.joystick.get_axis(1)
# 应用死区
right_y = 0.0 if abs(right_y_raw) < self.deadzone else right_y_raw
left_y = 0.0 if abs(left_y_raw) < self.deadzone else left_y_raw
# 计算各轴速度
self.pose_speeds[1] = current_linear_step if hat_up else (-current_linear_step if hat_down else 0.0) # Y
self.pose_speeds[0] = -current_linear_step if hat_left else (current_linear_step if hat_right else 0.0) # X
# 设置Z速度右摇杆Y轴控制
z_mapping = self._apply_nonlinear_mapping(right_y)
self.pose_speeds[2] = z_mapping * current_linear_step # Z
# L1/R1控制RX旋转
LB = self.joystick.get_button(4) # RX-
RB = self.joystick.get_button(5) # RX+
self.pose_speeds[3] = (-current_angular_step if LB else (current_angular_step if RB else 0.0))
# △/□控制RY旋转
triangle = self.joystick.get_button(2) # RY+
square = self.joystick.get_button(3) # RY-
self.pose_speeds[4] = (current_angular_step if triangle else (-current_angular_step if square else 0.0))
# 左摇杆Y轴控制RZ旋转
rz_mapping = self._apply_nonlinear_mapping(left_y)
self.pose_speeds[5] = rz_mapping * current_angular_step * 2 # RZ
# 夹爪控制(圈/叉)
circle = self.joystick.get_button(1) # 夹爪开
cross = self.joystick.get_button(0) # 夹爪关
if circle:
self.gripper = min(self.max_gripper, self.gripper + self.gripper_speed)
elif cross:
self.gripper = max(self.min_gripper, self.gripper - self.gripper_speed)
# 更新末端位姿
for i in range(6):
self.pose[i] += self.pose_speeds[i]
# 角度归一化处理
for i in range(3, 6):
self.pose[i] = self._normalize_angle(self.pose[i])
def update_joint_state(self, joint):
self.joint = joint
# self.tozero = False
def update_endpose_state(self, end_pose):
self.pose = end_pose
# self.tozero = False
def update_tozero_state(self, tozero):
self.tozero = tozero
def get_action(self) -> Dict:
"""获取当前控制命令"""
return {
'control_mode': 'joint' if self.joint_control_mode else 'end_pose',
'use_master_arm': self.use_master_arm,
'master_joint_actions': self.master_joint_actions,
'master_controller_status': self.master_controller_status,
'end_pose': self.pose,
'joint_angles': self.joint,
'gripper': int(self.gripper),
'tozero': self.tozero
# 存储所有控制器状态的字典
self.states = {
'buttons': [False] * self.joystick.get_numbuttons(), # 按钮状态
'axes': [0.0] * self.joystick.get_numaxes(), # 摇杆和轴状态
'hats': [(0, 0)] * self.joystick.get_numhats() # 舵状态
}
def stop(self):
"""停止控制器"""
# deadzone
self.deadzone = 0.15
# validzone
self.validzone = 0.05
# 线程控制标志
self.running = False
if self.thread.is_alive():
def start_polling(self):
"""启动线程以轮询控制器状态"""
if not self.running:
self.running = True
self.thread = threading.Thread(target=self._poll_controller)
self.thread.start()
def stop_polling(self):
"""停止线程"""
if self.running:
self.running = False
self.thread.join()
if self.servo_arm:
self.servo_arm.close()
pygame.quit()
print("混合控制器已退出")
def _poll_controller(self):
"""后台线程函数,用于轮询控制器状态"""
while self.running:
# 处理pygame事件
pygame.event.pump()
# 获取按钮状态
for i in range(self.joystick.get_numbuttons()):
self.states['buttons'][i] = self.joystick.get_button(i)
# 获取摇杆和轴状态(通常范围是 -1.0 到 1.0
for i in range(self.joystick.get_numaxes()):
self.states['axes'][i] = self.joystick.get_axis(i)
# 获取舵状态(通常返回一个元组 (x, y),值范围为 -1, 0, 1
for i in range(self.joystick.get_numhats()):
self.states['hats'][i] = self.joystick.get_hat(i)
# 控制轮询频率
time.sleep(0.01)
def get_raw_states(self):
"""获取当前控制器状态"""
return self.states
class FlightStick(ControllerBase):
def __init__(self, joystick_index):
super().__init__(joystick_index)
def get_x_control_signal(self):
x = 0
if self.states['axes'][0] > self.validzone:
x = 1
elif self.states['axes'][0] < -self.validzone:
x = -1
else:
x = 0
return x
def reset(self):
"""重置到初始状态"""
self.joint = self.init_joint.copy()
self.pose = self.init_pose.copy()
self.pose_speeds = [0.0] * 6
self.joint_speeds = [0.0] * 6
self.gripper_speed = 10
self.gripper = self.min_gripper
print("已重置到初始状态")
def get_y_control_signal(self):
y = 0
if self.states['axes'][1] > self.validzone:
y = 1
elif self.states['axes'][1] < -self.validzone:
y = -1
else:
y = 0
return y
def get_z_control_signal(self):
z = 0
if self.states['buttons'][0]:
z = 1
elif self.states['buttons'][1]:
z = -1
else:
z = 0
return z
def get_gripper_control_signal(self):
gripper = 0
if self.states['buttons'][2] == 1:
gripper = 1
elif self.states['buttons'][3] == 1:
gripper = -1
return gripper
def get_vel_control_signal(self):
vel = self.deadzone
if self.states['axes'][2] > 0.8:
vel = self.deadzone * 2
elif self.states['axes'][2] < -0.8:
vel = self.deadzone / 5
else:
vel = self.deadzone
return vel
def get_rx_control_signal(self):
rx = 0
if self.states['hats'][0][0] == -1:
rx = -1
elif self.states['hats'][0][0] == 1:
rx = 1
else:
rx = 0
return rx
def get_ry_control_signal(self):
ry = 0
if self.states['hats'][0][1] == 1:
ry = 1
elif self.states['hats'][0][1] == -1:
ry = -1
else:
ry = 0
return ry
def get_rz_control_signal(self):
rz = 0
if self.states['axes'][3] < -self.validzone:
rz = -1
elif self.states['axes'][3] > self.validzone:
rz = 1
else:
rz = 0
return rz
def get_control_signal(self):
return{
'x': self.get_x_control_signal(),
'y': self.get_y_control_signal(),
'z': self.get_z_control_signal(),
'rx': self.get_rx_control_signal(),
'ry': self.get_ry_control_signal(),
'rz': self.get_rz_control_signal(),
'gripper': self.get_gripper_control_signal(),
'vel': self.get_vel_control_signal()
}
class XboxStick:
def __init__(self, joystick_index):
super().__init__(joystick_index)
def get_x_control_signal(self):
"""获取 X 轴控制信号"""
x = 0
if self.states['hats'][0][0] == -1:
x = -1
elif self.states['hats'][0][0] == 1:
x = 1
return x
def get_y_control_signal(self):
"""获取 Y 轴控制信号"""
y = 0
if self.states['hats'][0][1] == 1:
y = 1
elif self.states['hats'][0][1] == -1:
y = -1
return y
def get_z_control_signal(self):
"""获取 Z 轴控制信号"""
z = 0
if self.states['axes'][4] > self.deadzone: # A 按钮
z = 1
elif self.states['axes'][4] < -self.deadzone: # B 按钮
z = -1
return z
def get_vel_control_signal(self):
"""获取速度控制信号"""
vel = self.deadzone
if self.states['axes'][2] > 0.8: # LT 按钮
vel = self.deadzone * 2
elif self.states['axes'][5] > 0.8: # RT 按钮
vel = self.deadzone / 5
return vel
def get_gripper_control_signal(self):
gripper = 0
if self.states['buttons'][0] == 1:
gripper = 1
elif self.states['buttons'][1] == 1:
gripper = -1
return gripper
def get_rx_control_signal(self):
"""获取 RX 轴控制信号"""
rx = 0
if self.states['axes'][0] > self.deadzone: # 左舵
rx = 1
elif self.states['axes'][0] < -self.deadzone: # 右舵
rx = -1
return rx
def get_ry_control_signal(self):
"""获取 RY 轴控制信号"""
ry = 0
if self.states['axes'][1] > self.deadzone: # 上舵
ry = 1
elif self.states['axes'][1] < -self.deadzone: # 下舵
ry = -1
return ry
def get_rz_control_signal(self):
"""获取 RZ 轴控制信号"""
rz = 0
if self.states['buttons'][4] == 1: # 左摇杆
rz = 1
elif self.states['buttons'][5] == 1: # 右摇杆
rz = -1
return rz
def get_control_signal(self):
"""获取所有控制信号"""
return {
'x': self.get_x_control_signal(),
'y': self.get_y_control_signal(),
'z': self.get_z_control_signal(),
'rx': self.get_rx_control_signal(),
'ry': self.get_ry_control_signal(),
'rz': self.get_rz_control_signal(),
'gripper': self.get_gripper_control_signal(),
'vel': self.get_vel_control_signal()
}
def get_usb_joystick_info():
"""获取所有 USB 游戏控制器的信息"""
context = pyudev.Context()
devices = []
for device in context.list_devices(subsystem='input', ID_INPUT_JOYSTICK=1):
# 获取设备路径(如 /dev/input/js0
device_path = device.device_node
if device_path:
# 获取 VID、PID 和 UUIDID_SERIAL
vid = device.get('ID_VENDOR_ID')
pid = device.get('ID_MODEL_ID')
uuid = device.get('ID_SERIAL_SHORT') # 设备的唯一标识符
if vid and pid and uuid:
devices.append({
'path': device_path,
'vid': int(vid, 16), # 转换为十六进制
'pid': int(pid, 16), # 转换为十六进制
'uuid': uuid # 设备的唯一标识符
})
return devices
def match_controller_index(joystick_info):
"""将 USB 设备信息与 pygame 的控制器索引匹配"""
# 获取所有 pygame 控制器的设备路径
pygame_joysticks = []
for i in range(pygame.joystick.get_count()):
joystick = pygame.joystick.Joystick(i)
joystick.init()
pygame_joysticks.append({
'index': i,
'name': joystick.get_name(),
'guid': joystick.get_guid()
})
# 匹配设备路径
for usb_device in joystick_info:
for pygame_joystick in pygame_joysticks:
if usb_device['path'] in pygame_joystick['guid']:
usb_device['pygame_index'] = pygame_joystick['index']
break
return joystick_info
# 使用示例
if __name__ == "__main__":
# 初始化睿尔曼机械臂
arm = RoboticArm(rm_thread_mode_e.RM_TRIPLE_MODE_E)
# 创建机械臂连接
handle = arm.rm_create_robot_arm("192.168.3.18", 8080)
print(f"机械臂连接ID: {handle.id}")
init_pose = arm.rm_get_current_arm_state()[1]['pose']
# 初始化pygame
pygame.init()
pygame.joystick.init()
# 获取所有 USB 游戏控制器的信息
usb_joysticks = get_usb_joystick_info()
matched_joysticks = match_controller_index(usb_joysticks)
print(usb_joysticks)
print(matched_joysticks)
with open('/home/maic/LYT/lerobot/lerobot/common/robot_devices/teleop/realman_mix.yaml', "r") as file:
config = yaml.safe_load(file)
config['init_pose'] = init_pose
arm_controller = HybridController(config)
try:
while True:
print(arm_controller.get_action())
time.sleep(0.1)
except KeyboardInterrupt:
arm_controller.stop()
# stick = FlightStick()
# # stick = XboxStick()
# stick.start_polling()
# try:
# while True:
# # 主程序可以继续执行其他任务
# states = stick.get_raw_states()
# # states = stick.get_control_signal()
# print("当前状态:", states)
# time.sleep(1)
# except KeyboardInterrupt:
# stick.stop_polling()

View File

@@ -19,6 +19,7 @@ class ControllerConfig:
max_gripper: int
min_gripper: int
config_file: str
end_controller: str = None
class HybridController:
@@ -37,7 +38,8 @@ class HybridController:
init_pose=init_info.get('init_pose', [0]*12),
max_gripper=init_info['max_gripper'],
min_gripper=init_info['min_gripper'],
config_file=init_info['servo_config_file']
config_file=init_info['servo_config_file'],
end_controller=init_info['end_controller']
)
def _initialize_master_arm(self):

View File

@@ -0,0 +1,466 @@
import pygame
import threading
import time
import serial
import binascii
import logging
import yaml
from typing import Dict
from Robotic_Arm.rm_robot_interface import *
class ServoArm:
def __init__(self, config_file="config.yaml"):
"""初始化机械臂的串口连接并发送初始数据。
Args:
config_file (str): 配置文件的路径。
"""
self.config = self._load_config(config_file)
self.port = self.config["port"]
self.baudrate = self.config["baudrate"]
self.joint_hex_data = self.config["joint_hex_data"]
self.control_hex_data = self.config["control_hex_data"]
self.arm_axis = self.config.get("arm_axis", 7)
try:
self.serial_conn = serial.Serial(self.port, self.baudrate, timeout=0)
self.bytes_to_send = binascii.unhexlify(self.joint_hex_data.replace(" ", ""))
self.serial_conn.write(self.bytes_to_send)
time.sleep(1)
self.connected = True
logging.info(f"串口连接成功: {self.port}")
except Exception as e:
logging.error(f"串口连接失败: {e}")
self.connected = False
def _load_config(self, config_file):
"""加载配置文件。
Args:
config_file (str): 配置文件的路径。
Returns:
dict: 配置文件内容。
"""
try:
with open(config_file, "r") as file:
config = yaml.safe_load(file)
return config
except Exception as e:
logging.error(f"配置文件加载失败: {e}")
# 返回默认配置
return {
"port": "/dev/ttyUSB0",
"baudrate": 460800,
"joint_hex_data": "55 AA 02 00 00 67",
"control_hex_data": "55 AA 08 00 00 B9",
"arm_axis": 6
}
def _bytes_to_signed_int(self, byte_data):
"""将字节数据转换为有符号整数。
Args:
byte_data (bytes): 字节数据。
Returns:
int: 有符号整数。
"""
return int.from_bytes(byte_data, byteorder="little", signed=True)
def _parse_joint_data(self, hex_received):
"""解析接收到的十六进制数据并提取关节数据。
Args:
hex_received (str): 接收到的十六进制字符串数据。
Returns:
dict: 解析后的关节数据。
"""
logging.debug(f"hex_received: {hex_received}")
joints = {}
for i in range(self.arm_axis):
start = 14 + i * 10
end = start + 8
joint_hex = hex_received[start:end]
joint_byte_data = bytearray.fromhex(joint_hex)
joint_value = self._bytes_to_signed_int(joint_byte_data) / 10000.0
joints[f"joint_{i+1}"] = joint_value
grasp_start = 14 + self.arm_axis*10
grasp_hex = hex_received[grasp_start:grasp_start+8]
grasp_byte_data = bytearray.fromhex(grasp_hex)
# 夹爪进行归一化处理
grasp_value = self._bytes_to_signed_int(grasp_byte_data)/1000
joints["grasp"] = grasp_value
return joints
def _parse_controller_data(self, hex_received):
status = {
'infrared': 0,
'button': 0
}
if len(hex_received) == 18:
status['infrared'] = self._bytes_to_signed_int(bytearray.fromhex(hex_received[12:14]))
status['button'] = self._bytes_to_signed_int(bytearray.fromhex(hex_received[14:16]))
# print(infrared)
return status
def get_joint_actions(self):
"""从串口读取数据并解析关节动作。
Returns:
dict: 包含关节数据的字典。
"""
if not self.connected:
return {}
try:
self.serial_conn.write(self.bytes_to_send)
time.sleep(0.02)
bytes_received = self.serial_conn.read(self.serial_conn.inWaiting())
if len(bytes_received) == 0:
return {}
hex_received = binascii.hexlify(bytes_received).decode("utf-8").upper()
actions = self._parse_joint_data(hex_received)
return actions
except Exception as e:
logging.error(f"读取串口数据错误: {e}")
return {}
def get_controller_status(self):
bytes_to_send = binascii.unhexlify(self.control_hex_data.replace(" ", ""))
self.serial_conn.write(bytes_to_send)
time.sleep(0.02)
bytes_received = self.serial_conn.read(self.serial_conn.inWaiting())
hex_received = binascii.hexlify(bytes_received).decode("utf-8").upper()
# print("control status:", hex_received)
status = self._parse_controller_data(hex_received)
return status
def close(self):
"""关闭串口连接"""
if self.connected and hasattr(self, 'serial_conn'):
self.serial_conn.close()
self.connected = False
logging.info("串口连接已关闭")
class HybridController:
def __init__(self, init_info):
# 初始化pygame和手柄
pygame.init()
pygame.joystick.init()
# 检查是否有连接的手柄
if pygame.joystick.get_count() == 0:
raise Exception("未检测到手柄")
# 初始化手柄
self.joystick = pygame.joystick.Joystick(0)
self.joystick.init()
# 摇杆死区
self.deadzone = 0.15
# 控制模式: True为关节控制主模式False为末端控制
self.joint_control_mode = True
# 精细控制模式
self.fine_control_mode = False
# 初始化末端姿态和关节角度
self.init_joint = init_info['init_joint']
self.init_pose = init_info.get('init_pose', [0]*6)
self.max_gripper = init_info['max_gripper']
self.min_gripper = init_info['min_gripper']
servo_config_file = init_info['servo_config_file']
self.joint = self.init_joint.copy()
self.pose = self.init_pose.copy()
self.pose_speeds = [0.0] * 6
self.joint_speeds = [0.0] * 6
self.tozero = False
# 主臂关节状态
self.master_joint_actions = {}
self.master_controller_status = {}
self.use_master_arm = False
# 末端位姿限制
self.pose_limits = [
(-0.800, 0.800), # X (m)
(-0.800, 0.800), # Y (m)
(-0.800, 0.800), # Z (m)
(-3.14, 3.14), # RX (rad)
(-3.14, 3.14), # RY (rad)
(-3.14, 3.14) # RZ (rad)
]
# 关节角度限制 (度)
self.joint_limits = [
(-180, 180), # joint 1
(-180, 180), # joint 2
(-180, 180), # joint 3
(-180, 180), # joint 4
(-180, 180), # joint 5
(-180, 180) # joint 6
]
# 控制参数
self.linear_step = 0.002 # 线性移动步长(m)
self.angular_step = 0.01 # 角度步长(rad)
# 夹爪状态和速度
self.gripper_speed = 10
self.gripper = self.min_gripper
# 初始化串口通信(主臂关节状态获取)
self.servo_arm = None
if servo_config_file:
try:
self.servo_arm = ServoArm(servo_config_file)
self.use_master_arm = True
logging.info("串口主臂连接成功,启用主从控制模式")
except Exception as e:
logging.error(f"串口主臂连接失败: {e}")
self.use_master_arm = False
# 启动更新线程
self.running = True
self.thread = threading.Thread(target=self.update_controller)
self.thread.start()
print("混合控制器已启动")
print("主控制模式: 关节控制")
if self.use_master_arm:
print("主从控制: 启用")
print("Back按钮: 切换控制模式(关节/末端)")
print("L3按钮: 切换精细控制模式")
print("Start按钮: 重置到初始位置")
def _apply_nonlinear_mapping(self, value):
"""应用非线性映射以提高控制精度"""
sign = 1 if value >= 0 else -1
return sign * (abs(value) ** 2)
def _normalize_angle(self, angle):
"""将角度归一化到[-π, π]范围内"""
import math
while angle > math.pi:
angle -= 2 * math.pi
while angle < -math.pi:
angle += 2 * math.pi
return angle
def update_controller(self):
while self.running:
try:
pygame.event.pump()
except Exception as e:
print(f"控制器错误: {e}")
self.stop()
continue
# 检查控制模式切换 (Back按钮)
if self.joystick.get_button(6): # Back按钮
self.joint_control_mode = not self.joint_control_mode
mode_str = "关节控制" if self.joint_control_mode else "末端位姿控制"
print(f"切换到{mode_str}模式")
time.sleep(0.3) # 防止多次触发
# 检查精细控制模式切换 (L3按钮)
if self.joystick.get_button(10): # L3按钮
self.fine_control_mode = not self.fine_control_mode
print(f"切换到{'精细' if self.fine_control_mode else '普通'}控制模式")
time.sleep(0.3) # 防止多次触发
# 检查重置按钮 (Start按钮)
if self.joystick.get_button(7): # Start按钮
print("重置机械臂到初始位置...")
# print("init_joint", self.init_joint.copy())
self.tozero = True
self.joint = self.init_joint.copy()
self.pose = self.init_pose.copy()
self.pose_speeds = [0.0] * 6
self.joint_speeds = [0.0] * 6
self.gripper_speed = 10
self.gripper = self.min_gripper
print("机械臂已重置到初始位置")
time.sleep(0.3) # 防止多次触发
# 从串口获取主臂关节状态
if self.servo_arm and self.servo_arm.connected:
try:
self.master_joint_actions = self.servo_arm.get_joint_actions()
self.master_controller_status = self.servo_arm.get_controller_status()
if self.master_joint_actions:
logging.debug(f"主臂关节状态: {self.master_joint_actions}")
except Exception as e:
logging.error(f"获取主臂状态错误: {e}")
self.master_joint_actions = {}
# print(self.master_joint_actions)
# 根据控制模式更新相应的控制逻辑
if self.joint_control_mode:
# 关节控制模式下优先使用主臂数据Xbox作为辅助
self.update_joint_control()
else:
# 末端控制模式使用Xbox控制
self.update_end_pose()
time.sleep(0.02)
# print('gripper:', self.gripper)
def update_joint_control(self):
"""更新关节角度控制 - 优先使用主臂数据"""
if self.use_master_arm and self.master_joint_actions:
# 主从控制模式:直接使用主臂的关节角度
try:
# 将主臂关节角度映射到从臂
for i in range(6): # 假设只有6个关节需要控制
joint_key = f"joint_{i+1}"
if joint_key in self.master_joint_actions:
# 直接使用主臂的关节角度(已经是度数)
self.joint[i] = self.master_joint_actions[joint_key]
# 应用关节限制
min_val, max_val = self.joint_limits[i]
self.joint[i] = max(min_val, min(max_val, self.joint[i]))
# print(self.joint)
logging.debug(f"主臂关节映射到从臂: {self.joint[:6]}")
except Exception as e:
logging.error(f"主臂数据映射错误: {e}")
# 如果有主臂夹爪数据,使用主臂夹爪状态
if self.use_master_arm and "grasp" in self.master_joint_actions:
self.gripper = self.master_joint_actions["grasp"] * 1000
self.joint[-1] = self.gripper
def update_end_pose(self):
"""更新末端位姿控制"""
# 根据控制模式调整步长
current_linear_step = self.linear_step * (0.1 if self.fine_control_mode else 1.0)
current_angular_step = self.angular_step * (0.1 if self.fine_control_mode else 1.0)
# 方向键控制XY
hat = self.joystick.get_hat(0)
hat_up = hat[1] == 1 # Y+
hat_down = hat[1] == -1 # Y-
hat_left = hat[0] == -1 # X-
hat_right = hat[0] == 1 # X+
# 右摇杆控制Z
right_y_raw = -self.joystick.get_axis(4)
# 左摇杆控制RZ
left_y_raw = -self.joystick.get_axis(1)
# 应用死区
right_y = 0.0 if abs(right_y_raw) < self.deadzone else right_y_raw
left_y = 0.0 if abs(left_y_raw) < self.deadzone else left_y_raw
# 计算各轴速度
self.pose_speeds[1] = current_linear_step if hat_up else (-current_linear_step if hat_down else 0.0) # Y
self.pose_speeds[0] = -current_linear_step if hat_left else (current_linear_step if hat_right else 0.0) # X
# 设置Z速度右摇杆Y轴控制
z_mapping = self._apply_nonlinear_mapping(right_y)
self.pose_speeds[2] = z_mapping * current_linear_step # Z
# L1/R1控制RX旋转
LB = self.joystick.get_button(4) # RX-
RB = self.joystick.get_button(5) # RX+
self.pose_speeds[3] = (-current_angular_step if LB else (current_angular_step if RB else 0.0))
# △/□控制RY旋转
triangle = self.joystick.get_button(2) # RY+
square = self.joystick.get_button(3) # RY-
self.pose_speeds[4] = (current_angular_step if triangle else (-current_angular_step if square else 0.0))
# 左摇杆Y轴控制RZ旋转
rz_mapping = self._apply_nonlinear_mapping(left_y)
self.pose_speeds[5] = rz_mapping * current_angular_step * 2 # RZ
# 夹爪控制(圈/叉)
circle = self.joystick.get_button(1) # 夹爪开
cross = self.joystick.get_button(0) # 夹爪关
if circle:
self.gripper = min(self.max_gripper, self.gripper + self.gripper_speed)
elif cross:
self.gripper = max(self.min_gripper, self.gripper - self.gripper_speed)
# 更新末端位姿
for i in range(6):
self.pose[i] += self.pose_speeds[i]
# 角度归一化处理
for i in range(3, 6):
self.pose[i] = self._normalize_angle(self.pose[i])
def update_joint_state(self, joint):
self.joint = joint
# self.tozero = False
def update_endpose_state(self, end_pose):
self.pose = end_pose
# self.tozero = False
def update_tozero_state(self, tozero):
self.tozero = tozero
def get_action(self) -> Dict:
"""获取当前控制命令"""
return {
'control_mode': 'joint' if self.joint_control_mode else 'end_pose',
'use_master_arm': self.use_master_arm,
'master_joint_actions': self.master_joint_actions,
'master_controller_status': self.master_controller_status,
'end_pose': self.pose,
'joint_angles': self.joint,
'gripper': int(self.gripper),
'tozero': self.tozero
}
def stop(self):
"""停止控制器"""
self.running = False
if self.thread.is_alive():
self.thread.join()
if self.servo_arm:
self.servo_arm.close()
pygame.quit()
print("混合控制器已退出")
def reset(self):
"""重置到初始状态"""
self.joint = self.init_joint.copy()
self.pose = self.init_pose.copy()
self.pose_speeds = [0.0] * 6
self.joint_speeds = [0.0] * 6
self.gripper_speed = 10
self.gripper = self.min_gripper
print("已重置到初始状态")
# 使用示例
if __name__ == "__main__":
# 初始化睿尔曼机械臂
arm = RoboticArm(rm_thread_mode_e.RM_TRIPLE_MODE_E)
# 创建机械臂连接
handle = arm.rm_create_robot_arm("192.168.3.18", 8080)
print(f"机械臂连接ID: {handle.id}")
init_pose = arm.rm_get_current_arm_state()[1]['pose']
with open('/home/maic/LYT/lerobot/lerobot/common/robot_devices/teleop/realman_mix.yaml', "r") as file:
config = yaml.safe_load(file)
config['init_pose'] = init_pose
arm_controller = HybridController(config)
try:
while True:
print(arm_controller.get_action())
time.sleep(0.1)
except KeyboardInterrupt:
arm_controller.stop()

View File

@@ -0,0 +1,140 @@
import pygame
import time
import sys
class FlightStickDetector:
def __init__(self):
# 初始化pygame
pygame.init()
pygame.joystick.init()
self.joysticks = []
self.running = True
def detect_joysticks(self):
"""检测连接的手柄"""
joystick_count = pygame.joystick.get_count()
print(f"检测到 {joystick_count} 个手柄设备")
if joystick_count == 0:
print("未检测到任何手柄设备!")
return False
# 初始化所有检测到的手柄
self.joysticks = []
for i in range(joystick_count):
joystick = pygame.joystick.Joystick(i)
joystick.init()
self.joysticks.append(joystick)
print(f"\n手柄 {i}:")
print(f" 名称: {joystick.get_name()}")
print(f" 轴数量: {joystick.get_numaxes()}")
print(f" 按钮数量: {joystick.get_numbuttons()}")
print(f" 帽子开关数量: {joystick.get_numhats()}")
return True
def get_joystick_data(self, joystick_id=0):
"""获取指定手柄的数据"""
if joystick_id >= len(self.joysticks):
return None
joystick = self.joysticks[joystick_id]
# 获取轴数据
axes = []
for i in range(joystick.get_numaxes()):
axis_value = joystick.get_axis(i)
axes.append(round(axis_value, 3))
# 获取按钮数据
buttons = []
for i in range(joystick.get_numbuttons()):
button_pressed = joystick.get_button(i)
buttons.append(button_pressed)
# 获取帽子开关数据
hats = []
for i in range(joystick.get_numhats()):
hat_value = joystick.get_hat(i)
hats.append(hat_value)
return {
'name': joystick.get_name(),
'axes': axes,
'buttons': buttons,
'hats': hats
}
def monitor_joystick(self, joystick_id=0, update_interval=0.1):
"""实时监控手柄数据"""
print(f"\n开始监控手柄 {joystick_id} (按Ctrl+C停止)")
print("=" * 50)
try:
while self.running:
pygame.event.pump() # 更新事件队列
data = self.get_joystick_data(joystick_id)
if data is None:
print("无法获取手柄数据")
break
# 清屏并显示数据
print("\033[H\033[J", end="") # ANSI清屏
print(f"手柄: {data['name']}")
print("-" * 40)
# 显示轴数据(通常用于飞行控制)
axis_names = ['X轴(副翼)', 'Y轴(升降舵)', 'Z轴(方向舵)',
'油门', '轴4', '轴5', '轴6', '轴7']
print("轴数据:")
for i, value in enumerate(data['axes']):
name = axis_names[i] if i < len(axis_names) else f'{i}'
# 创建进度条显示
bar_length = 20
bar_pos = int((value + 1) * bar_length / 2)
bar = [''] * bar_length
if 0 <= bar_pos < bar_length:
bar[bar_pos] = ''
bar_str = ''.join(bar)
print(f" {name:12}: {value:6.3f} [{bar_str}]")
# 显示按钮数据
pressed_buttons = [i for i, pressed in enumerate(data['buttons']) if pressed]
if pressed_buttons:
print(f"\n按下的按钮: {pressed_buttons}")
# 显示帽子开关数据
if data['hats']:
print(f"帽子开关: {data['hats']}")
time.sleep(update_interval)
except KeyboardInterrupt:
print("\n\n监控已停止")
def cleanup(self):
"""清理资源"""
pygame.joystick.quit()
pygame.quit()
# 使用示例
def main():
detector = FlightStickDetector()
try:
# 检测手柄
if detector.detect_joysticks():
# 如果检测到手柄,开始监控第一个手柄
detector.monitor_joystick(0)
else:
print("请连接飞行手柄后重试")
finally:
detector.cleanup()
if __name__ == "__main__":
main()