diff --git a/lerobot/common/robot_devices/robots/realman.py b/lerobot/common/robot_devices/robots/realman.py index e8845037..2578179a 100644 --- a/lerobot/common/robot_devices/robots/realman.py +++ b/lerobot/common/robot_devices/robots/realman.py @@ -7,7 +7,7 @@ import torch import numpy as np from dataclasses import dataclass, field, replace from collections import deque -from lerobot.common.robot_devices.teleop.gamepad import HybridController +from lerobot.common.robot_devices.teleop.realman_single import HybridController from lerobot.common.robot_devices.motors.utils import get_motor_names, make_motors_buses_from_configs from lerobot.common.robot_devices.cameras.utils import make_cameras_from_configs from lerobot.common.robot_devices.utils import RobotDeviceAlreadyConnectedError, RobotDeviceNotConnectedError diff --git a/lerobot/common/robot_devices/robots/realman_dual.py b/lerobot/common/robot_devices/robots/realman_dual.py index ce0e29e3..3d7c3fba 100644 --- a/lerobot/common/robot_devices/robots/realman_dual.py +++ b/lerobot/common/robot_devices/robots/realman_dual.py @@ -9,7 +9,7 @@ import logging from typing import Optional, Tuple, Dict from dataclasses import dataclass, field, replace from collections import deque -from lerobot.common.robot_devices.teleop.gamepad_dual import HybridController +from lerobot.common.robot_devices.teleop.realman_aloha_dual import HybridController from lerobot.common.robot_devices.motors.utils import get_motor_names, make_motors_buses_from_configs from lerobot.common.robot_devices.cameras.utils import make_cameras_from_configs from lerobot.common.robot_devices.utils import RobotDeviceAlreadyConnectedError, RobotDeviceNotConnectedError @@ -226,7 +226,7 @@ class RealmanDualRobot: # 获取动作 action = self.teleop.get_action() # 执行动作 - self._execute_action(action, state) + # self._execute_action(action, state) # 更新状态队列 self._update_state_queue() diff --git a/lerobot/common/robot_devices/teleop/gamepad.py b/lerobot/common/robot_devices/teleop/gamepad.py index 330d3ab1..704b924a 100644 --- a/lerobot/common/robot_devices/teleop/gamepad.py +++ b/lerobot/common/robot_devices/teleop/gamepad.py @@ -1,466 +1,319 @@ import pygame import threading import time -import serial -import binascii import logging -import yaml -from typing import Dict -from Robotic_Arm.rm_robot_interface import * +import pyudev - -class ServoArm: - def __init__(self, config_file="config.yaml"): - """初始化机械臂的串口连接并发送初始数据。 - - Args: - config_file (str): 配置文件的路径。 - """ - self.config = self._load_config(config_file) - self.port = self.config["port"] - self.baudrate = self.config["baudrate"] - self.joint_hex_data = self.config["joint_hex_data"] - self.control_hex_data = self.config["control_hex_data"] - self.arm_axis = self.config.get("arm_axis", 7) - - try: - self.serial_conn = serial.Serial(self.port, self.baudrate, timeout=0) - self.bytes_to_send = binascii.unhexlify(self.joint_hex_data.replace(" ", "")) - self.serial_conn.write(self.bytes_to_send) - time.sleep(1) - self.connected = True - logging.info(f"串口连接成功: {self.port}") - except Exception as e: - logging.error(f"串口连接失败: {e}") - self.connected = False - - def _load_config(self, config_file): - """加载配置文件。 - - Args: - config_file (str): 配置文件的路径。 - - Returns: - dict: 配置文件内容。 - """ - try: - with open(config_file, "r") as file: - config = yaml.safe_load(file) - return config - except Exception as e: - logging.error(f"配置文件加载失败: {e}") - # 返回默认配置 - return { - "port": "/dev/ttyUSB0", - "baudrate": 460800, - "joint_hex_data": "55 AA 02 00 00 67", - "control_hex_data": "55 AA 08 00 00 B9", - "arm_axis": 6 - } - - def _bytes_to_signed_int(self, byte_data): - """将字节数据转换为有符号整数。 - - Args: - byte_data (bytes): 字节数据。 - - Returns: - int: 有符号整数。 - """ - return int.from_bytes(byte_data, byteorder="little", signed=True) - - def _parse_joint_data(self, hex_received): - """解析接收到的十六进制数据并提取关节数据。 - - Args: - hex_received (str): 接收到的十六进制字符串数据。 - - Returns: - dict: 解析后的关节数据。 - """ - logging.debug(f"hex_received: {hex_received}") - joints = {} - for i in range(self.arm_axis): - start = 14 + i * 10 - end = start + 8 - joint_hex = hex_received[start:end] - joint_byte_data = bytearray.fromhex(joint_hex) - joint_value = self._bytes_to_signed_int(joint_byte_data) / 10000.0 - joints[f"joint_{i+1}"] = joint_value - grasp_start = 14 + self.arm_axis*10 - grasp_hex = hex_received[grasp_start:grasp_start+8] - grasp_byte_data = bytearray.fromhex(grasp_hex) - # 夹爪进行归一化处理 - grasp_value = self._bytes_to_signed_int(grasp_byte_data)/1000 - - joints["grasp"] = grasp_value - return joints - - def _parse_controller_data(self, hex_received): - status = { - 'infrared': 0, - 'button': 0 - } - if len(hex_received) == 18: - status['infrared'] = self._bytes_to_signed_int(bytearray.fromhex(hex_received[12:14])) - status['button'] = self._bytes_to_signed_int(bytearray.fromhex(hex_received[14:16])) - # print(infrared) - return status - - def get_joint_actions(self): - """从串口读取数据并解析关节动作。 - - Returns: - dict: 包含关节数据的字典。 - """ - if not self.connected: - return {} - - try: - self.serial_conn.write(self.bytes_to_send) - time.sleep(0.02) - bytes_received = self.serial_conn.read(self.serial_conn.inWaiting()) - if len(bytes_received) == 0: - return {} - - hex_received = binascii.hexlify(bytes_received).decode("utf-8").upper() - actions = self._parse_joint_data(hex_received) - return actions - except Exception as e: - logging.error(f"读取串口数据错误: {e}") - return {} - - def get_controller_status(self): - bytes_to_send = binascii.unhexlify(self.control_hex_data.replace(" ", "")) - self.serial_conn.write(bytes_to_send) - time.sleep(0.02) - bytes_received = self.serial_conn.read(self.serial_conn.inWaiting()) - hex_received = binascii.hexlify(bytes_received).decode("utf-8").upper() - # print("control status:", hex_received) - status = self._parse_controller_data(hex_received) - return status - - def close(self): - """关闭串口连接""" - if self.connected and hasattr(self, 'serial_conn'): - self.serial_conn.close() - self.connected = False - logging.info("串口连接已关闭") - - -class HybridController: - def __init__(self, init_info): - # 初始化pygame和手柄 - pygame.init() - pygame.joystick.init() - - # 检查是否有连接的手柄 - if pygame.joystick.get_count() == 0: - raise Exception("未检测到手柄") - - # 初始化手柄 - self.joystick = pygame.joystick.Joystick(0) +class ControllerBase: + def __init__(self, joystick_index): + # 初始化手柄对象 + self.joystick = pygame.joystick.Joystick(joystick_index) self.joystick.init() - # 摇杆死区 - self.deadzone = 0.15 - # 控制模式: True为关节控制(主模式),False为末端控制 - self.joint_control_mode = True - # 精细控制模式 - self.fine_control_mode = False - - # 初始化末端姿态和关节角度 - self.init_joint = init_info['init_joint'] - self.init_pose = init_info.get('init_pose', [0]*6) - self.max_gripper = init_info['max_gripper'] - self.min_gripper = init_info['min_gripper'] - servo_config_file = init_info['servo_config_file'] - self.joint = self.init_joint.copy() - self.pose = self.init_pose.copy() - self.pose_speeds = [0.0] * 6 - self.joint_speeds = [0.0] * 6 - self.tozero = False - - # 主臂关节状态 - self.master_joint_actions = {} - self.master_controller_status = {} - self.use_master_arm = False - - # 末端位姿限制 - self.pose_limits = [ - (-0.800, 0.800), # X (m) - (-0.800, 0.800), # Y (m) - (-0.800, 0.800), # Z (m) - (-3.14, 3.14), # RX (rad) - (-3.14, 3.14), # RY (rad) - (-3.14, 3.14) # RZ (rad) - ] - - # 关节角度限制 (度) - self.joint_limits = [ - (-180, 180), # joint 1 - (-180, 180), # joint 2 - (-180, 180), # joint 3 - (-180, 180), # joint 4 - (-180, 180), # joint 5 - (-180, 180) # joint 6 - ] - - # 控制参数 - self.linear_step = 0.002 # 线性移动步长(m) - self.angular_step = 0.01 # 角度步长(rad) - - # 夹爪状态和速度 - self.gripper_speed = 10 - self.gripper = self.min_gripper - - # 初始化串口通信(主臂关节状态获取) - self.servo_arm = None - if servo_config_file: - try: - self.servo_arm = ServoArm(servo_config_file) - self.use_master_arm = True - logging.info("串口主臂连接成功,启用主从控制模式") - except Exception as e: - logging.error(f"串口主臂连接失败: {e}") - self.use_master_arm = False - - # 启动更新线程 - self.running = True - self.thread = threading.Thread(target=self.update_controller) - self.thread.start() - - print("混合控制器已启动") - print("主控制模式: 关节控制") - if self.use_master_arm: - print("主从控制: 启用") - print("Back按钮: 切换控制模式(关节/末端)") - print("L3按钮: 切换精细控制模式") - print("Start按钮: 重置到初始位置") - - def _apply_nonlinear_mapping(self, value): - """应用非线性映射以提高控制精度""" - sign = 1 if value >= 0 else -1 - return sign * (abs(value) ** 2) - - def _normalize_angle(self, angle): - """将角度归一化到[-π, π]范围内""" - import math - while angle > math.pi: - angle -= 2 * math.pi - while angle < -math.pi: - angle += 2 * math.pi - return angle - - def update_controller(self): - while self.running: - try: - pygame.event.pump() - except Exception as e: - print(f"控制器错误: {e}") - self.stop() - continue - - # 检查控制模式切换 (Back按钮) - if self.joystick.get_button(6): # Back按钮 - self.joint_control_mode = not self.joint_control_mode - mode_str = "关节控制" if self.joint_control_mode else "末端位姿控制" - print(f"切换到{mode_str}模式") - time.sleep(0.3) # 防止多次触发 - - # 检查精细控制模式切换 (L3按钮) - if self.joystick.get_button(10): # L3按钮 - self.fine_control_mode = not self.fine_control_mode - print(f"切换到{'精细' if self.fine_control_mode else '普通'}控制模式") - time.sleep(0.3) # 防止多次触发 - - # 检查重置按钮 (Start按钮) - if self.joystick.get_button(7): # Start按钮 - print("重置机械臂到初始位置...") - # print("init_joint", self.init_joint.copy()) - self.tozero = True - self.joint = self.init_joint.copy() - self.pose = self.init_pose.copy() - self.pose_speeds = [0.0] * 6 - self.joint_speeds = [0.0] * 6 - self.gripper_speed = 10 - self.gripper = self.min_gripper - print("机械臂已重置到初始位置") - time.sleep(0.3) # 防止多次触发 - # 从串口获取主臂关节状态 - if self.servo_arm and self.servo_arm.connected: - try: - self.master_joint_actions = self.servo_arm.get_joint_actions() - self.master_controller_status = self.servo_arm.get_controller_status() - if self.master_joint_actions: - logging.debug(f"主臂关节状态: {self.master_joint_actions}") - - except Exception as e: - logging.error(f"获取主臂状态错误: {e}") - self.master_joint_actions = {} - # print(self.master_joint_actions) - - # 根据控制模式更新相应的控制逻辑 - if self.joint_control_mode: - # 关节控制模式下,优先使用主臂数据,Xbox作为辅助 - self.update_joint_control() - else: - # 末端控制模式,使用Xbox控制 - self.update_end_pose() - time.sleep(0.02) - # print('gripper:', self.gripper) - - def update_joint_control(self): - """更新关节角度控制 - 优先使用主臂数据""" - if self.use_master_arm and self.master_joint_actions: - # 主从控制模式:直接使用主臂的关节角度 - try: - # 将主臂关节角度映射到从臂 - for i in range(6): # 假设只有6个关节需要控制 - joint_key = f"joint_{i+1}" - if joint_key in self.master_joint_actions: - # 直接使用主臂的关节角度(已经是度数) - self.joint[i] = self.master_joint_actions[joint_key] - - # 应用关节限制 - min_val, max_val = self.joint_limits[i] - self.joint[i] = max(min_val, min(max_val, self.joint[i])) - - # print(self.joint) - logging.debug(f"主臂关节映射到从臂: {self.joint[:6]}") - - except Exception as e: - logging.error(f"主臂数据映射错误: {e}") - - # 如果有主臂夹爪数据,使用主臂夹爪状态 - if self.use_master_arm and "grasp" in self.master_joint_actions: - self.gripper = self.master_joint_actions["grasp"] * 1000 - self.joint[-1] = self.gripper - - - def update_end_pose(self): - """更新末端位姿控制""" - # 根据控制模式调整步长 - current_linear_step = self.linear_step * (0.1 if self.fine_control_mode else 1.0) - current_angular_step = self.angular_step * (0.1 if self.fine_control_mode else 1.0) - - # 方向键控制XY - hat = self.joystick.get_hat(0) - hat_up = hat[1] == 1 # Y+ - hat_down = hat[1] == -1 # Y- - hat_left = hat[0] == -1 # X- - hat_right = hat[0] == 1 # X+ - - # 右摇杆控制Z - right_y_raw = -self.joystick.get_axis(4) - # 左摇杆控制RZ - left_y_raw = -self.joystick.get_axis(1) - - # 应用死区 - right_y = 0.0 if abs(right_y_raw) < self.deadzone else right_y_raw - left_y = 0.0 if abs(left_y_raw) < self.deadzone else left_y_raw - - # 计算各轴速度 - self.pose_speeds[1] = current_linear_step if hat_up else (-current_linear_step if hat_down else 0.0) # Y - self.pose_speeds[0] = -current_linear_step if hat_left else (current_linear_step if hat_right else 0.0) # X - - # 设置Z速度(右摇杆Y轴控制) - z_mapping = self._apply_nonlinear_mapping(right_y) - self.pose_speeds[2] = z_mapping * current_linear_step # Z - - # L1/R1控制RX旋转 - LB = self.joystick.get_button(4) # RX- - RB = self.joystick.get_button(5) # RX+ - self.pose_speeds[3] = (-current_angular_step if LB else (current_angular_step if RB else 0.0)) - - # △/□控制RY旋转 - triangle = self.joystick.get_button(2) # RY+ - square = self.joystick.get_button(3) # RY- - self.pose_speeds[4] = (current_angular_step if triangle else (-current_angular_step if square else 0.0)) - - # 左摇杆Y轴控制RZ旋转 - rz_mapping = self._apply_nonlinear_mapping(left_y) - self.pose_speeds[5] = rz_mapping * current_angular_step * 2 # RZ - - # 夹爪控制(圈/叉) - circle = self.joystick.get_button(1) # 夹爪开 - cross = self.joystick.get_button(0) # 夹爪关 - if circle: - self.gripper = min(self.max_gripper, self.gripper + self.gripper_speed) - elif cross: - self.gripper = max(self.min_gripper, self.gripper - self.gripper_speed) - - # 更新末端位姿 - for i in range(6): - self.pose[i] += self.pose_speeds[i] - - # 角度归一化处理 - for i in range(3, 6): - self.pose[i] = self._normalize_angle(self.pose[i]) - - def update_joint_state(self, joint): - self.joint = joint - # self.tozero = False - - def update_endpose_state(self, end_pose): - self.pose = end_pose - # self.tozero = False - - def update_tozero_state(self, tozero): - self.tozero = tozero - - - def get_action(self) -> Dict: - """获取当前控制命令""" - return { - 'control_mode': 'joint' if self.joint_control_mode else 'end_pose', - 'use_master_arm': self.use_master_arm, - 'master_joint_actions': self.master_joint_actions, - 'master_controller_status': self.master_controller_status, - 'end_pose': self.pose, - 'joint_angles': self.joint, - 'gripper': int(self.gripper), - 'tozero': self.tozero + # 存储所有控制器状态的字典 + self.states = { + 'buttons': [False] * self.joystick.get_numbuttons(), # 按钮状态 + 'axes': [0.0] * self.joystick.get_numaxes(), # 摇杆和轴状态 + 'hats': [(0, 0)] * self.joystick.get_numhats() # 舵状态 } - - def stop(self): - """停止控制器""" + + # deadzone + self.deadzone = 0.15 + # validzone + self.validzone = 0.05 + + # 线程控制标志 self.running = False - if self.thread.is_alive(): + + def start_polling(self): + """启动线程以轮询控制器状态""" + if not self.running: + self.running = True + self.thread = threading.Thread(target=self._poll_controller) + self.thread.start() + + def stop_polling(self): + """停止线程""" + if self.running: + self.running = False self.thread.join() - if self.servo_arm: - self.servo_arm.close() - pygame.quit() - print("混合控制器已退出") + + def _poll_controller(self): + """后台线程函数,用于轮询控制器状态""" + while self.running: + # 处理pygame事件 + pygame.event.pump() + + # 获取按钮状态 + for i in range(self.joystick.get_numbuttons()): + self.states['buttons'][i] = self.joystick.get_button(i) + + # 获取摇杆和轴状态(通常范围是 -1.0 到 1.0) + for i in range(self.joystick.get_numaxes()): + self.states['axes'][i] = self.joystick.get_axis(i) + + # 获取舵状态(通常返回一个元组 (x, y),值范围为 -1, 0, 1) + for i in range(self.joystick.get_numhats()): + self.states['hats'][i] = self.joystick.get_hat(i) + + # 控制轮询频率 + time.sleep(0.01) + + def get_raw_states(self): + """获取当前控制器状态""" + return self.states + +class FlightStick(ControllerBase): + def __init__(self, joystick_index): + super().__init__(joystick_index) + + def get_x_control_signal(self): + x = 0 + if self.states['axes'][0] > self.validzone: + x = 1 + elif self.states['axes'][0] < -self.validzone: + x = -1 + else: + x = 0 + return x - def reset(self): - """重置到初始状态""" - self.joint = self.init_joint.copy() - self.pose = self.init_pose.copy() - self.pose_speeds = [0.0] * 6 - self.joint_speeds = [0.0] * 6 - self.gripper_speed = 10 - self.gripper = self.min_gripper - print("已重置到初始状态") + def get_y_control_signal(self): + y = 0 + if self.states['axes'][1] > self.validzone: + y = 1 + elif self.states['axes'][1] < -self.validzone: + y = -1 + else: + y = 0 + return y + + def get_z_control_signal(self): + z = 0 + if self.states['buttons'][0]: + z = 1 + elif self.states['buttons'][1]: + z = -1 + else: + z = 0 + return z + + def get_gripper_control_signal(self): + gripper = 0 + if self.states['buttons'][2] == 1: + gripper = 1 + elif self.states['buttons'][3] == 1: + gripper = -1 + return gripper + + def get_vel_control_signal(self): + vel = self.deadzone + if self.states['axes'][2] > 0.8: + vel = self.deadzone * 2 + elif self.states['axes'][2] < -0.8: + vel = self.deadzone / 5 + else: + vel = self.deadzone + return vel + + def get_rx_control_signal(self): + rx = 0 + if self.states['hats'][0][0] == -1: + rx = -1 + elif self.states['hats'][0][0] == 1: + rx = 1 + else: + rx = 0 + return rx + + def get_ry_control_signal(self): + ry = 0 + if self.states['hats'][0][1] == 1: + ry = 1 + elif self.states['hats'][0][1] == -1: + ry = -1 + else: + ry = 0 + return ry + + def get_rz_control_signal(self): + rz = 0 + if self.states['axes'][3] < -self.validzone: + rz = -1 + elif self.states['axes'][3] > self.validzone: + rz = 1 + else: + rz = 0 + return rz + + def get_control_signal(self): + return{ + 'x': self.get_x_control_signal(), + 'y': self.get_y_control_signal(), + 'z': self.get_z_control_signal(), + 'rx': self.get_rx_control_signal(), + 'ry': self.get_ry_control_signal(), + 'rz': self.get_rz_control_signal(), + 'gripper': self.get_gripper_control_signal(), + 'vel': self.get_vel_control_signal() + } + + + +class XboxStick: + def __init__(self, joystick_index): + super().__init__(joystick_index) + + def get_x_control_signal(self): + """获取 X 轴控制信号""" + x = 0 + if self.states['hats'][0][0] == -1: + x = -1 + elif self.states['hats'][0][0] == 1: + x = 1 + return x + + def get_y_control_signal(self): + """获取 Y 轴控制信号""" + y = 0 + if self.states['hats'][0][1] == 1: + y = 1 + elif self.states['hats'][0][1] == -1: + y = -1 + return y + + def get_z_control_signal(self): + """获取 Z 轴控制信号""" + z = 0 + if self.states['axes'][4] > self.deadzone: # A 按钮 + z = 1 + elif self.states['axes'][4] < -self.deadzone: # B 按钮 + z = -1 + return z + + def get_vel_control_signal(self): + """获取速度控制信号""" + vel = self.deadzone + if self.states['axes'][2] > 0.8: # LT 按钮 + vel = self.deadzone * 2 + elif self.states['axes'][5] > 0.8: # RT 按钮 + vel = self.deadzone / 5 + return vel + + def get_gripper_control_signal(self): + gripper = 0 + if self.states['buttons'][0] == 1: + gripper = 1 + elif self.states['buttons'][1] == 1: + gripper = -1 + return gripper + + def get_rx_control_signal(self): + """获取 RX 轴控制信号""" + rx = 0 + if self.states['axes'][0] > self.deadzone: # 左舵 + rx = 1 + elif self.states['axes'][0] < -self.deadzone: # 右舵 + rx = -1 + return rx + + def get_ry_control_signal(self): + """获取 RY 轴控制信号""" + ry = 0 + if self.states['axes'][1] > self.deadzone: # 上舵 + ry = 1 + elif self.states['axes'][1] < -self.deadzone: # 下舵 + ry = -1 + return ry + + def get_rz_control_signal(self): + """获取 RZ 轴控制信号""" + rz = 0 + if self.states['buttons'][4] == 1: # 左摇杆 + rz = 1 + elif self.states['buttons'][5] == 1: # 右摇杆 + rz = -1 + return rz + + def get_control_signal(self): + """获取所有控制信号""" + return { + 'x': self.get_x_control_signal(), + 'y': self.get_y_control_signal(), + 'z': self.get_z_control_signal(), + 'rx': self.get_rx_control_signal(), + 'ry': self.get_ry_control_signal(), + 'rz': self.get_rz_control_signal(), + 'gripper': self.get_gripper_control_signal(), + 'vel': self.get_vel_control_signal() + } + + + +def get_usb_joystick_info(): + """获取所有 USB 游戏控制器的信息""" + context = pyudev.Context() + devices = [] + for device in context.list_devices(subsystem='input', ID_INPUT_JOYSTICK=1): + # 获取设备路径(如 /dev/input/js0) + device_path = device.device_node + if device_path: + # 获取 VID、PID 和 UUID(ID_SERIAL) + vid = device.get('ID_VENDOR_ID') + pid = device.get('ID_MODEL_ID') + uuid = device.get('ID_SERIAL_SHORT') # 设备的唯一标识符 + if vid and pid and uuid: + devices.append({ + 'path': device_path, + 'vid': int(vid, 16), # 转换为十六进制 + 'pid': int(pid, 16), # 转换为十六进制 + 'uuid': uuid # 设备的唯一标识符 + }) + return devices + + +def match_controller_index(joystick_info): + """将 USB 设备信息与 pygame 的控制器索引匹配""" + # 获取所有 pygame 控制器的设备路径 + pygame_joysticks = [] + for i in range(pygame.joystick.get_count()): + joystick = pygame.joystick.Joystick(i) + joystick.init() + pygame_joysticks.append({ + 'index': i, + 'name': joystick.get_name(), + 'guid': joystick.get_guid() + }) + # 匹配设备路径 + for usb_device in joystick_info: + for pygame_joystick in pygame_joysticks: + if usb_device['path'] in pygame_joystick['guid']: + usb_device['pygame_index'] = pygame_joystick['index'] + break + return joystick_info -# 使用示例 if __name__ == "__main__": - # 初始化睿尔曼机械臂 - arm = RoboticArm(rm_thread_mode_e.RM_TRIPLE_MODE_E) - # 创建机械臂连接 - handle = arm.rm_create_robot_arm("192.168.3.18", 8080) - print(f"机械臂连接ID: {handle.id}") - init_pose = arm.rm_get_current_arm_state()[1]['pose'] + # 初始化pygame + pygame.init() + pygame.joystick.init() + # 获取所有 USB 游戏控制器的信息 + usb_joysticks = get_usb_joystick_info() + matched_joysticks = match_controller_index(usb_joysticks) + print(usb_joysticks) + print(matched_joysticks) - with open('/home/maic/LYT/lerobot/lerobot/common/robot_devices/teleop/realman_mix.yaml', "r") as file: - config = yaml.safe_load(file) - config['init_pose'] = init_pose - arm_controller = HybridController(config) - try: - while True: - print(arm_controller.get_action()) - time.sleep(0.1) - except KeyboardInterrupt: - arm_controller.stop() \ No newline at end of file + + # stick = FlightStick() + # # stick = XboxStick() + # stick.start_polling() + + # try: + # while True: + # # 主程序可以继续执行其他任务 + # states = stick.get_raw_states() + # # states = stick.get_control_signal() + # print("当前状态:", states) + # time.sleep(1) + # except KeyboardInterrupt: + # stick.stop_polling() diff --git a/lerobot/common/robot_devices/teleop/gamepad_dual.py b/lerobot/common/robot_devices/teleop/realman_aloha_dual.py similarity index 96% rename from lerobot/common/robot_devices/teleop/gamepad_dual.py rename to lerobot/common/robot_devices/teleop/realman_aloha_dual.py index f0a4af40..6b61cdda 100644 --- a/lerobot/common/robot_devices/teleop/gamepad_dual.py +++ b/lerobot/common/robot_devices/teleop/realman_aloha_dual.py @@ -19,6 +19,7 @@ class ControllerConfig: max_gripper: int min_gripper: int config_file: str + end_controller: str = None class HybridController: @@ -37,7 +38,8 @@ class HybridController: init_pose=init_info.get('init_pose', [0]*12), max_gripper=init_info['max_gripper'], min_gripper=init_info['min_gripper'], - config_file=init_info['servo_config_file'] + config_file=init_info['servo_config_file'], + end_controller=init_info['end_controller'] ) def _initialize_master_arm(self): diff --git a/lerobot/common/robot_devices/teleop/realman_single.py b/lerobot/common/robot_devices/teleop/realman_single.py new file mode 100644 index 00000000..330d3ab1 --- /dev/null +++ b/lerobot/common/robot_devices/teleop/realman_single.py @@ -0,0 +1,466 @@ +import pygame +import threading +import time +import serial +import binascii +import logging +import yaml +from typing import Dict +from Robotic_Arm.rm_robot_interface import * + + + +class ServoArm: + def __init__(self, config_file="config.yaml"): + """初始化机械臂的串口连接并发送初始数据。 + + Args: + config_file (str): 配置文件的路径。 + """ + self.config = self._load_config(config_file) + self.port = self.config["port"] + self.baudrate = self.config["baudrate"] + self.joint_hex_data = self.config["joint_hex_data"] + self.control_hex_data = self.config["control_hex_data"] + self.arm_axis = self.config.get("arm_axis", 7) + + try: + self.serial_conn = serial.Serial(self.port, self.baudrate, timeout=0) + self.bytes_to_send = binascii.unhexlify(self.joint_hex_data.replace(" ", "")) + self.serial_conn.write(self.bytes_to_send) + time.sleep(1) + self.connected = True + logging.info(f"串口连接成功: {self.port}") + except Exception as e: + logging.error(f"串口连接失败: {e}") + self.connected = False + + def _load_config(self, config_file): + """加载配置文件。 + + Args: + config_file (str): 配置文件的路径。 + + Returns: + dict: 配置文件内容。 + """ + try: + with open(config_file, "r") as file: + config = yaml.safe_load(file) + return config + except Exception as e: + logging.error(f"配置文件加载失败: {e}") + # 返回默认配置 + return { + "port": "/dev/ttyUSB0", + "baudrate": 460800, + "joint_hex_data": "55 AA 02 00 00 67", + "control_hex_data": "55 AA 08 00 00 B9", + "arm_axis": 6 + } + + def _bytes_to_signed_int(self, byte_data): + """将字节数据转换为有符号整数。 + + Args: + byte_data (bytes): 字节数据。 + + Returns: + int: 有符号整数。 + """ + return int.from_bytes(byte_data, byteorder="little", signed=True) + + def _parse_joint_data(self, hex_received): + """解析接收到的十六进制数据并提取关节数据。 + + Args: + hex_received (str): 接收到的十六进制字符串数据。 + + Returns: + dict: 解析后的关节数据。 + """ + logging.debug(f"hex_received: {hex_received}") + joints = {} + for i in range(self.arm_axis): + start = 14 + i * 10 + end = start + 8 + joint_hex = hex_received[start:end] + joint_byte_data = bytearray.fromhex(joint_hex) + joint_value = self._bytes_to_signed_int(joint_byte_data) / 10000.0 + joints[f"joint_{i+1}"] = joint_value + grasp_start = 14 + self.arm_axis*10 + grasp_hex = hex_received[grasp_start:grasp_start+8] + grasp_byte_data = bytearray.fromhex(grasp_hex) + # 夹爪进行归一化处理 + grasp_value = self._bytes_to_signed_int(grasp_byte_data)/1000 + + joints["grasp"] = grasp_value + return joints + + def _parse_controller_data(self, hex_received): + status = { + 'infrared': 0, + 'button': 0 + } + if len(hex_received) == 18: + status['infrared'] = self._bytes_to_signed_int(bytearray.fromhex(hex_received[12:14])) + status['button'] = self._bytes_to_signed_int(bytearray.fromhex(hex_received[14:16])) + # print(infrared) + return status + + def get_joint_actions(self): + """从串口读取数据并解析关节动作。 + + Returns: + dict: 包含关节数据的字典。 + """ + if not self.connected: + return {} + + try: + self.serial_conn.write(self.bytes_to_send) + time.sleep(0.02) + bytes_received = self.serial_conn.read(self.serial_conn.inWaiting()) + if len(bytes_received) == 0: + return {} + + hex_received = binascii.hexlify(bytes_received).decode("utf-8").upper() + actions = self._parse_joint_data(hex_received) + return actions + except Exception as e: + logging.error(f"读取串口数据错误: {e}") + return {} + + def get_controller_status(self): + bytes_to_send = binascii.unhexlify(self.control_hex_data.replace(" ", "")) + self.serial_conn.write(bytes_to_send) + time.sleep(0.02) + bytes_received = self.serial_conn.read(self.serial_conn.inWaiting()) + hex_received = binascii.hexlify(bytes_received).decode("utf-8").upper() + # print("control status:", hex_received) + status = self._parse_controller_data(hex_received) + return status + + def close(self): + """关闭串口连接""" + if self.connected and hasattr(self, 'serial_conn'): + self.serial_conn.close() + self.connected = False + logging.info("串口连接已关闭") + + +class HybridController: + def __init__(self, init_info): + # 初始化pygame和手柄 + pygame.init() + pygame.joystick.init() + + # 检查是否有连接的手柄 + if pygame.joystick.get_count() == 0: + raise Exception("未检测到手柄") + + # 初始化手柄 + self.joystick = pygame.joystick.Joystick(0) + self.joystick.init() + # 摇杆死区 + self.deadzone = 0.15 + # 控制模式: True为关节控制(主模式),False为末端控制 + self.joint_control_mode = True + # 精细控制模式 + self.fine_control_mode = False + + # 初始化末端姿态和关节角度 + self.init_joint = init_info['init_joint'] + self.init_pose = init_info.get('init_pose', [0]*6) + self.max_gripper = init_info['max_gripper'] + self.min_gripper = init_info['min_gripper'] + servo_config_file = init_info['servo_config_file'] + self.joint = self.init_joint.copy() + self.pose = self.init_pose.copy() + self.pose_speeds = [0.0] * 6 + self.joint_speeds = [0.0] * 6 + self.tozero = False + + # 主臂关节状态 + self.master_joint_actions = {} + self.master_controller_status = {} + self.use_master_arm = False + + # 末端位姿限制 + self.pose_limits = [ + (-0.800, 0.800), # X (m) + (-0.800, 0.800), # Y (m) + (-0.800, 0.800), # Z (m) + (-3.14, 3.14), # RX (rad) + (-3.14, 3.14), # RY (rad) + (-3.14, 3.14) # RZ (rad) + ] + + # 关节角度限制 (度) + self.joint_limits = [ + (-180, 180), # joint 1 + (-180, 180), # joint 2 + (-180, 180), # joint 3 + (-180, 180), # joint 4 + (-180, 180), # joint 5 + (-180, 180) # joint 6 + ] + + # 控制参数 + self.linear_step = 0.002 # 线性移动步长(m) + self.angular_step = 0.01 # 角度步长(rad) + + # 夹爪状态和速度 + self.gripper_speed = 10 + self.gripper = self.min_gripper + + # 初始化串口通信(主臂关节状态获取) + self.servo_arm = None + if servo_config_file: + try: + self.servo_arm = ServoArm(servo_config_file) + self.use_master_arm = True + logging.info("串口主臂连接成功,启用主从控制模式") + except Exception as e: + logging.error(f"串口主臂连接失败: {e}") + self.use_master_arm = False + + # 启动更新线程 + self.running = True + self.thread = threading.Thread(target=self.update_controller) + self.thread.start() + + print("混合控制器已启动") + print("主控制模式: 关节控制") + if self.use_master_arm: + print("主从控制: 启用") + print("Back按钮: 切换控制模式(关节/末端)") + print("L3按钮: 切换精细控制模式") + print("Start按钮: 重置到初始位置") + + def _apply_nonlinear_mapping(self, value): + """应用非线性映射以提高控制精度""" + sign = 1 if value >= 0 else -1 + return sign * (abs(value) ** 2) + + def _normalize_angle(self, angle): + """将角度归一化到[-π, π]范围内""" + import math + while angle > math.pi: + angle -= 2 * math.pi + while angle < -math.pi: + angle += 2 * math.pi + return angle + + def update_controller(self): + while self.running: + try: + pygame.event.pump() + except Exception as e: + print(f"控制器错误: {e}") + self.stop() + continue + + # 检查控制模式切换 (Back按钮) + if self.joystick.get_button(6): # Back按钮 + self.joint_control_mode = not self.joint_control_mode + mode_str = "关节控制" if self.joint_control_mode else "末端位姿控制" + print(f"切换到{mode_str}模式") + time.sleep(0.3) # 防止多次触发 + + # 检查精细控制模式切换 (L3按钮) + if self.joystick.get_button(10): # L3按钮 + self.fine_control_mode = not self.fine_control_mode + print(f"切换到{'精细' if self.fine_control_mode else '普通'}控制模式") + time.sleep(0.3) # 防止多次触发 + + # 检查重置按钮 (Start按钮) + if self.joystick.get_button(7): # Start按钮 + print("重置机械臂到初始位置...") + # print("init_joint", self.init_joint.copy()) + self.tozero = True + self.joint = self.init_joint.copy() + self.pose = self.init_pose.copy() + self.pose_speeds = [0.0] * 6 + self.joint_speeds = [0.0] * 6 + self.gripper_speed = 10 + self.gripper = self.min_gripper + print("机械臂已重置到初始位置") + time.sleep(0.3) # 防止多次触发 + + # 从串口获取主臂关节状态 + if self.servo_arm and self.servo_arm.connected: + try: + self.master_joint_actions = self.servo_arm.get_joint_actions() + self.master_controller_status = self.servo_arm.get_controller_status() + if self.master_joint_actions: + logging.debug(f"主臂关节状态: {self.master_joint_actions}") + + except Exception as e: + logging.error(f"获取主臂状态错误: {e}") + self.master_joint_actions = {} + # print(self.master_joint_actions) + + # 根据控制模式更新相应的控制逻辑 + if self.joint_control_mode: + # 关节控制模式下,优先使用主臂数据,Xbox作为辅助 + self.update_joint_control() + else: + # 末端控制模式,使用Xbox控制 + self.update_end_pose() + time.sleep(0.02) + # print('gripper:', self.gripper) + + def update_joint_control(self): + """更新关节角度控制 - 优先使用主臂数据""" + if self.use_master_arm and self.master_joint_actions: + # 主从控制模式:直接使用主臂的关节角度 + try: + # 将主臂关节角度映射到从臂 + for i in range(6): # 假设只有6个关节需要控制 + joint_key = f"joint_{i+1}" + if joint_key in self.master_joint_actions: + # 直接使用主臂的关节角度(已经是度数) + self.joint[i] = self.master_joint_actions[joint_key] + + # 应用关节限制 + min_val, max_val = self.joint_limits[i] + self.joint[i] = max(min_val, min(max_val, self.joint[i])) + + # print(self.joint) + logging.debug(f"主臂关节映射到从臂: {self.joint[:6]}") + + except Exception as e: + logging.error(f"主臂数据映射错误: {e}") + + # 如果有主臂夹爪数据,使用主臂夹爪状态 + if self.use_master_arm and "grasp" in self.master_joint_actions: + self.gripper = self.master_joint_actions["grasp"] * 1000 + self.joint[-1] = self.gripper + + + def update_end_pose(self): + """更新末端位姿控制""" + # 根据控制模式调整步长 + current_linear_step = self.linear_step * (0.1 if self.fine_control_mode else 1.0) + current_angular_step = self.angular_step * (0.1 if self.fine_control_mode else 1.0) + + # 方向键控制XY + hat = self.joystick.get_hat(0) + hat_up = hat[1] == 1 # Y+ + hat_down = hat[1] == -1 # Y- + hat_left = hat[0] == -1 # X- + hat_right = hat[0] == 1 # X+ + + # 右摇杆控制Z + right_y_raw = -self.joystick.get_axis(4) + # 左摇杆控制RZ + left_y_raw = -self.joystick.get_axis(1) + + # 应用死区 + right_y = 0.0 if abs(right_y_raw) < self.deadzone else right_y_raw + left_y = 0.0 if abs(left_y_raw) < self.deadzone else left_y_raw + + # 计算各轴速度 + self.pose_speeds[1] = current_linear_step if hat_up else (-current_linear_step if hat_down else 0.0) # Y + self.pose_speeds[0] = -current_linear_step if hat_left else (current_linear_step if hat_right else 0.0) # X + + # 设置Z速度(右摇杆Y轴控制) + z_mapping = self._apply_nonlinear_mapping(right_y) + self.pose_speeds[2] = z_mapping * current_linear_step # Z + + # L1/R1控制RX旋转 + LB = self.joystick.get_button(4) # RX- + RB = self.joystick.get_button(5) # RX+ + self.pose_speeds[3] = (-current_angular_step if LB else (current_angular_step if RB else 0.0)) + + # △/□控制RY旋转 + triangle = self.joystick.get_button(2) # RY+ + square = self.joystick.get_button(3) # RY- + self.pose_speeds[4] = (current_angular_step if triangle else (-current_angular_step if square else 0.0)) + + # 左摇杆Y轴控制RZ旋转 + rz_mapping = self._apply_nonlinear_mapping(left_y) + self.pose_speeds[5] = rz_mapping * current_angular_step * 2 # RZ + + # 夹爪控制(圈/叉) + circle = self.joystick.get_button(1) # 夹爪开 + cross = self.joystick.get_button(0) # 夹爪关 + if circle: + self.gripper = min(self.max_gripper, self.gripper + self.gripper_speed) + elif cross: + self.gripper = max(self.min_gripper, self.gripper - self.gripper_speed) + + # 更新末端位姿 + for i in range(6): + self.pose[i] += self.pose_speeds[i] + + # 角度归一化处理 + for i in range(3, 6): + self.pose[i] = self._normalize_angle(self.pose[i]) + + def update_joint_state(self, joint): + self.joint = joint + # self.tozero = False + + def update_endpose_state(self, end_pose): + self.pose = end_pose + # self.tozero = False + + def update_tozero_state(self, tozero): + self.tozero = tozero + + + def get_action(self) -> Dict: + """获取当前控制命令""" + return { + 'control_mode': 'joint' if self.joint_control_mode else 'end_pose', + 'use_master_arm': self.use_master_arm, + 'master_joint_actions': self.master_joint_actions, + 'master_controller_status': self.master_controller_status, + 'end_pose': self.pose, + 'joint_angles': self.joint, + 'gripper': int(self.gripper), + 'tozero': self.tozero + } + + def stop(self): + """停止控制器""" + self.running = False + if self.thread.is_alive(): + self.thread.join() + if self.servo_arm: + self.servo_arm.close() + pygame.quit() + print("混合控制器已退出") + + def reset(self): + """重置到初始状态""" + self.joint = self.init_joint.copy() + self.pose = self.init_pose.copy() + self.pose_speeds = [0.0] * 6 + self.joint_speeds = [0.0] * 6 + self.gripper_speed = 10 + self.gripper = self.min_gripper + print("已重置到初始状态") + + +# 使用示例 +if __name__ == "__main__": + # 初始化睿尔曼机械臂 + arm = RoboticArm(rm_thread_mode_e.RM_TRIPLE_MODE_E) + # 创建机械臂连接 + handle = arm.rm_create_robot_arm("192.168.3.18", 8080) + print(f"机械臂连接ID: {handle.id}") + init_pose = arm.rm_get_current_arm_state()[1]['pose'] + + with open('/home/maic/LYT/lerobot/lerobot/common/robot_devices/teleop/realman_mix.yaml', "r") as file: + config = yaml.safe_load(file) + config['init_pose'] = init_pose + arm_controller = HybridController(config) + try: + while True: + print(arm_controller.get_action()) + time.sleep(0.1) + except KeyboardInterrupt: + arm_controller.stop() \ No newline at end of file diff --git a/realman_src/realman_flight.py b/realman_src/realman_flight.py new file mode 100644 index 00000000..a8b048ad --- /dev/null +++ b/realman_src/realman_flight.py @@ -0,0 +1,140 @@ +import pygame +import time +import sys + +class FlightStickDetector: + def __init__(self): + # 初始化pygame + pygame.init() + pygame.joystick.init() + + self.joysticks = [] + self.running = True + + def detect_joysticks(self): + """检测连接的手柄""" + joystick_count = pygame.joystick.get_count() + print(f"检测到 {joystick_count} 个手柄设备") + + if joystick_count == 0: + print("未检测到任何手柄设备!") + return False + + # 初始化所有检测到的手柄 + self.joysticks = [] + for i in range(joystick_count): + joystick = pygame.joystick.Joystick(i) + joystick.init() + self.joysticks.append(joystick) + + print(f"\n手柄 {i}:") + print(f" 名称: {joystick.get_name()}") + print(f" 轴数量: {joystick.get_numaxes()}") + print(f" 按钮数量: {joystick.get_numbuttons()}") + print(f" 帽子开关数量: {joystick.get_numhats()}") + + return True + + def get_joystick_data(self, joystick_id=0): + """获取指定手柄的数据""" + if joystick_id >= len(self.joysticks): + return None + + joystick = self.joysticks[joystick_id] + + # 获取轴数据 + axes = [] + for i in range(joystick.get_numaxes()): + axis_value = joystick.get_axis(i) + axes.append(round(axis_value, 3)) + + # 获取按钮数据 + buttons = [] + for i in range(joystick.get_numbuttons()): + button_pressed = joystick.get_button(i) + buttons.append(button_pressed) + + # 获取帽子开关数据 + hats = [] + for i in range(joystick.get_numhats()): + hat_value = joystick.get_hat(i) + hats.append(hat_value) + + return { + 'name': joystick.get_name(), + 'axes': axes, + 'buttons': buttons, + 'hats': hats + } + + def monitor_joystick(self, joystick_id=0, update_interval=0.1): + """实时监控手柄数据""" + print(f"\n开始监控手柄 {joystick_id} (按Ctrl+C停止)") + print("=" * 50) + + try: + while self.running: + pygame.event.pump() # 更新事件队列 + + data = self.get_joystick_data(joystick_id) + if data is None: + print("无法获取手柄数据") + break + + # 清屏并显示数据 + print("\033[H\033[J", end="") # ANSI清屏 + print(f"手柄: {data['name']}") + print("-" * 40) + + # 显示轴数据(通常用于飞行控制) + axis_names = ['X轴(副翼)', 'Y轴(升降舵)', 'Z轴(方向舵)', + '油门', '轴4', '轴5', '轴6', '轴7'] + + print("轴数据:") + for i, value in enumerate(data['axes']): + name = axis_names[i] if i < len(axis_names) else f'轴{i}' + # 创建进度条显示 + bar_length = 20 + bar_pos = int((value + 1) * bar_length / 2) + bar = ['─'] * bar_length + if 0 <= bar_pos < bar_length: + bar[bar_pos] = '█' + bar_str = ''.join(bar) + print(f" {name:12}: {value:6.3f} [{bar_str}]") + + # 显示按钮数据 + pressed_buttons = [i for i, pressed in enumerate(data['buttons']) if pressed] + if pressed_buttons: + print(f"\n按下的按钮: {pressed_buttons}") + + # 显示帽子开关数据 + if data['hats']: + print(f"帽子开关: {data['hats']}") + + time.sleep(update_interval) + + except KeyboardInterrupt: + print("\n\n监控已停止") + + def cleanup(self): + """清理资源""" + pygame.joystick.quit() + pygame.quit() + +# 使用示例 +def main(): + detector = FlightStickDetector() + + try: + # 检测手柄 + if detector.detect_joysticks(): + # 如果检测到手柄,开始监控第一个手柄 + detector.monitor_joystick(0) + else: + print("请连接飞行手柄后重试") + + finally: + detector.cleanup() + +if __name__ == "__main__": + main()