#!/usr/bin/env python3 # -*-coding:utf8-*- from typing import Optional import time import yaml import serial import logging import binascii import numpy as np from Robotic_Arm.rm_robot_interface import * import pygame import threading from typing import Dict # 配置日志记录 logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" ) def enable_fun(arm: RoboticArm): ''' 使能机械臂并检测使能状态,尝试5s,如果使能超时则退出程序 ''' enable_flag = False # 设置超时时间(秒) timeout = 5 # 记录进入循环前的时间 start_time = time.time() elapsed_time_flag = False while not enable_flag: elapsed_time = time.time() - start_time print("--------------------") # 获取机械臂状态 ret = arm.rm_get_current_arm_state() if ret[0] == 0: # 成功获取状态 enable_flag = True print("使能状态:", enable_flag) print("--------------------") # 检查是否超过超时时间 if elapsed_time > timeout: print("超时....") elapsed_time_flag = True enable_flag = True break time.sleep(1) if elapsed_time_flag: print("程序自动使能超时,退出程序") exit(0) class ServoArm: def __init__(self, config_file="config.yaml"): """初始化机械臂的串口连接并发送初始数据。 Args: config_file (str): 配置文件的路径。 """ self.config = self._load_config(config_file) self.port = self.config["port"] self.baudrate = self.config["baudrate"] self.hex_data = self.config["hex_data"] self.arm_axis = self.config.get("arm_axis", 7) try: self.serial_conn = serial.Serial(self.port, self.baudrate, timeout=0) self.bytes_to_send = binascii.unhexlify(self.hex_data.replace(" ", "")) self.serial_conn.write(self.bytes_to_send) time.sleep(1) self.connected = True logging.info(f"串口连接成功: {self.port}") except Exception as e: logging.error(f"串口连接失败: {e}") self.connected = False def _load_config(self, config_file): """加载配置文件。 Args: config_file (str): 配置文件的路径。 Returns: dict: 配置文件内容。 """ try: with open(config_file, "r") as file: config = yaml.safe_load(file) return config except Exception as e: logging.error(f"配置文件加载失败: {e}") # 返回默认配置 return { "port": "/dev/ttyUSB0", "baudrate": 460800, "hex_data": "55 AA 02 00 00 67", "arm_axis": 6 } def _bytes_to_signed_int(self, byte_data): """将字节数据转换为有符号整数。 Args: byte_data (bytes): 字节数据。 Returns: int: 有符号整数。 """ return int.from_bytes(byte_data, byteorder="little", signed=True) def _parse_joint_data(self, hex_received): """解析接收到的十六进制数据并提取关节数据。 Args: hex_received (str): 接收到的十六进制字符串数据。 Returns: dict: 解析后的关节数据。 """ logging.debug(f"hex_received: {hex_received}") joints = {} for i in range(self.arm_axis): start = 14 + i * 10 end = start + 8 joint_hex = hex_received[start:end] joint_byte_data = bytearray.fromhex(joint_hex) joint_value = self._bytes_to_signed_int(joint_byte_data) / 10000.0 joints[f"joint_{i+1}"] = joint_value grasp_start = 14 + self.arm_axis*10 grasp_hex = hex_received[grasp_start:grasp_start+8] grasp_byte_data = bytearray.fromhex(grasp_hex) # 夹爪进行归一化处理 grasp_value = self._bytes_to_signed_int(grasp_byte_data)/1000 joints["grasp"] = grasp_value return joints def get_joint_actions(self): """从串口读取数据并解析关节动作。 Returns: dict: 包含关节数据的字典。 """ if not self.connected: return {} try: self.serial_conn.write(self.bytes_to_send) bytes_received = self.serial_conn.read(self.serial_conn.inWaiting()) if len(bytes_received) == 0: return {} hex_received = binascii.hexlify(bytes_received).decode("utf-8").upper() actions = self._parse_joint_data(hex_received) return actions except Exception as e: logging.error(f"读取串口数据错误: {e}") return {} def set_gripper_action(self, action): """设置夹爪动作。 Args: action (int): 夹爪动作值。 """ if not self.connected: return try: action = int(action * 1000) action_bytes = action.to_bytes(4, byteorder="little", signed=True) self.bytes_to_send = self.bytes_to_send[:74] + action_bytes + self.bytes_to_send[78:] except Exception as e: logging.error(f"设置夹爪动作错误: {e}") def close(self): """关闭串口连接""" if self.connected and hasattr(self, 'serial_conn'): self.serial_conn.close() self.connected = False logging.info("串口连接已关闭") class HybridController: def __init__(self, init_info): # 初始化pygame和手柄 pygame.init() pygame.joystick.init() # 检查是否有连接的手柄 if pygame.joystick.get_count() == 0: raise Exception("未检测到手柄") # 初始化手柄 self.joystick = pygame.joystick.Joystick(0) self.joystick.init() # 摇杆死区 self.deadzone = 0.15 # 控制模式: True为关节控制(主模式),False为末端控制 self.joint_control_mode = True # 精细控制模式 self.fine_control_mode = False # 初始化末端姿态和关节角度 self.init_joint = init_info['init_joint'] self.init_pose = init_info['init_pose'] self.max_gripper = init_info['max_gripper'] self.min_gripper = init_info['min_gripper'] servo_config_file = init_info['servo_config_file'] self.joint = self.init_joint.copy() self.pose = self.init_pose.copy() self.pose_speeds = [0.0] * 6 self.joint_speeds = [0.0] * 6 # 主臂关节状态 self.master_joint_actions = {} self.use_master_arm = False # 末端位姿限制 self.pose_limits = [ (-0.850, 0.850), # X (m) (-0.850, 0.850), # Y (m) (0.850, 0.850), # Z (m) (-3.14, 3.14), # RX (rad) (-3.14, 3.14), # RY (rad) (-3.14, 3.14) # RZ (rad) ] # 关节角度限制 (度) self.joint_limits = [ (-180, 180), # joint 1 (-180, 180), # joint 2 (-180, 180), # joint 3 (-180, 180), # joint 4 (-180, 180), # joint 5 (-180, 180) # joint 6 ] # 控制参数 self.linear_step = 0.005 # 线性移动步长(m) self.angular_step = 0.05 # 角度步长(rad) # 夹爪状态和速度 self.gripper_speed = 10 self.gripper = self.min_gripper # 初始化串口通信(主臂关节状态获取) self.servo_arm = None if servo_config_file: try: self.servo_arm = ServoArm(servo_config_file) self.use_master_arm = True logging.info("串口主臂连接成功,启用主从控制模式") except Exception as e: logging.error(f"串口主臂连接失败: {e}") self.use_master_arm = False # 启动更新线程 self.running = True self.thread = threading.Thread(target=self.update_controller) self.thread.start() print("混合控制器已启动") print("主控制模式: 关节控制") if self.use_master_arm: print("主从控制: 启用") print("Back按钮: 切换控制模式(关节/末端)") print("L3按钮: 切换精细控制模式") print("Start按钮: 重置到初始位置") def _apply_nonlinear_mapping(self, value): """应用非线性映射以提高控制精度""" sign = 1 if value >= 0 else -1 return sign * (abs(value) ** 2) def _normalize_angle(self, angle): """将角度归一化到[-π, π]范围内""" import math while angle > math.pi: angle -= 2 * math.pi while angle < -math.pi: angle += 2 * math.pi return angle def update_controller(self): while self.running: try: pygame.event.pump() except Exception as e: print(f"控制器错误: {e}") self.stop() continue # 检查控制模式切换 (Back按钮) if self.joystick.get_button(6): # Back按钮 self.joint_control_mode = not self.joint_control_mode mode_str = "关节控制" if self.joint_control_mode else "末端位姿控制" print(f"切换到{mode_str}模式") time.sleep(0.3) # 防止多次触发 # 检查精细控制模式切换 (L3按钮) if self.joystick.get_button(10): # L3按钮 self.fine_control_mode = not self.fine_control_mode print(f"切换到{'精细' if self.fine_control_mode else '普通'}控制模式") time.sleep(0.3) # 防止多次触发 # 检查重置按钮 (Start按钮) if self.joystick.get_button(7): # Start按钮 print("重置机械臂到初始位置...") self.joint = self.init_joint.copy() self.pose = self.init_pose.copy() self.pose_speeds = [0.0] * 6 self.joint_speeds = [0.0] * 6 self.gripper_speed = 10 self.gripper = self.min_gripper print("机械臂已重置到初始位置") time.sleep(0.3) # 防止多次触发 # 从串口获取主臂关节状态 if self.servo_arm and self.servo_arm.connected: try: self.master_joint_actions = self.servo_arm.get_joint_actions() if self.master_joint_actions: logging.debug(f"主臂关节状态: {self.master_joint_actions}") except Exception as e: logging.error(f"获取主臂状态错误: {e}") self.master_joint_actions = {} # print(self.master_joint_actions) # 根据控制模式更新相应的控制逻辑 if self.joint_control_mode: # 关节控制模式下,优先使用主臂数据,Xbox作为辅助 self.update_joint_control() else: # 末端控制模式,使用Xbox控制 self.update_end_pose() time.sleep(0.02) # print('gripper:', self.gripper) def update_joint_control(self): """更新关节角度控制 - 优先使用主臂数据""" if self.use_master_arm and self.master_joint_actions: # 主从控制模式:直接使用主臂的关节角度 try: # 将主臂关节角度映射到从臂 for i in range(6): # 假设只有6个关节需要控制 joint_key = f"joint_{i+1}" if joint_key in self.master_joint_actions: # 直接使用主臂的关节角度(已经是度数) self.joint[i] = self.master_joint_actions[joint_key] # 应用关节限制 min_val, max_val = self.joint_limits[i] self.joint[i] = max(min_val, min(max_val, self.joint[i])) # print(self.joint) logging.debug(f"主臂关节映射到从臂: {self.joint[:6]}") except Exception as e: logging.error(f"主臂数据映射错误: {e}") # 如果有主臂夹爪数据,使用主臂夹爪状态 if self.use_master_arm and "grasp" in self.master_joint_actions: self.gripper = self.master_joint_actions["grasp"] * 1000 def update_end_pose(self): """更新末端位姿控制""" # 根据控制模式调整步长 current_linear_step = self.linear_step * (0.1 if self.fine_control_mode else 1.0) current_angular_step = self.angular_step * (0.1 if self.fine_control_mode else 1.0) # 方向键控制XY hat = self.joystick.get_hat(0) hat_up = hat[1] == 1 # Y+ hat_down = hat[1] == -1 # Y- hat_left = hat[0] == -1 # X- hat_right = hat[0] == 1 # X+ # 右摇杆控制Z right_y_raw = -self.joystick.get_axis(4) # 左摇杆控制RZ left_y_raw = -self.joystick.get_axis(1) # 应用死区 right_y = 0.0 if abs(right_y_raw) < self.deadzone else right_y_raw left_y = 0.0 if abs(left_y_raw) < self.deadzone else left_y_raw # 计算各轴速度 self.pose_speeds[1] = current_linear_step if hat_up else (-current_linear_step if hat_down else 0.0) # Y self.pose_speeds[0] = -current_linear_step if hat_left else (current_linear_step if hat_right else 0.0) # X # 设置Z速度(右摇杆Y轴控制) z_mapping = self._apply_nonlinear_mapping(right_y) self.pose_speeds[2] = z_mapping * current_linear_step # Z # L1/R1控制RX旋转 LB = self.joystick.get_button(4) # RX- RB = self.joystick.get_button(5) # RX+ self.pose_speeds[3] = (-current_angular_step if LB else (current_angular_step if RB else 0.0)) # △/□控制RY旋转 triangle = self.joystick.get_button(2) # RY+ square = self.joystick.get_button(3) # RY- self.pose_speeds[4] = (current_angular_step if triangle else (-current_angular_step if square else 0.0)) # 左摇杆Y轴控制RZ旋转 rz_mapping = self._apply_nonlinear_mapping(left_y) self.pose_speeds[5] = rz_mapping * current_angular_step * 2 # RZ # 夹爪控制(圈/叉) circle = self.joystick.get_button(1) # 夹爪开 cross = self.joystick.get_button(0) # 夹爪关 if circle: self.gripper = min(self.max_gripper, self.gripper + self.gripper_speed) elif cross: self.gripper = max(self.min_gripper, self.gripper - self.gripper_speed) # 更新末端位姿 for i in range(6): self.pose[i] += self.pose_speeds[i] # 角度归一化处理 for i in range(3, 6): self.pose[i] = self._normalize_angle(self.pose[i]) def update_joint_state(self, joint): self.joint = joint def update_endpose_state(self, end_pose): self.pose = end_pose def update_state(self, state): """更新状态信息(从机械臂获取当前状态)""" self.pose = state['end_pose'] self.joint = state['joint'] self.gripper = state['gripper'] def get_action(self) -> Dict: """获取当前控制命令""" return { 'control_mode': 'joint' if self.joint_control_mode else 'end_pose', 'use_master_arm': self.use_master_arm, 'master_joint_actions': self.master_joint_actions, 'end_pose': { 'X': self.pose[0], 'Y': self.pose[1], 'Z': self.pose[2], 'RX': self.pose[3], 'RY': self.pose[4], 'RZ': self.pose[5], }, 'joint_angles': self.joint, 'gripper': int(self.gripper) } def stop(self): """停止控制器""" self.running = False if self.thread.is_alive(): self.thread.join() if self.servo_arm: self.servo_arm.close() pygame.quit() print("混合控制器已退出") def reset(self): """重置到初始状态""" self.joint = self.init_joint.copy() self.pose = self.init_pose.copy() self.pose_speeds = [0.0] * 6 self.joint_speeds = [0.0] * 6 self.gripper_speed = 10 self.gripper = self.min_gripper print("已重置到初始状态") if __name__ == "__main__": # 初始化睿尔曼机械臂 arm = RoboticArm(rm_thread_mode_e.RM_TRIPLE_MODE_E) # 创建机械臂连接 handle = arm.rm_create_robot_arm("192.168.3.18", 8080) print(f"机械臂连接ID: {handle.id}") # 使能机械臂 enable_fun(arm=arm) # 初始关节角度和位姿 init_joint = [-90, 90, 90, -90, -90, 90] max_gripper = 990 min_gripper = 10 arm.rm_movej(init_joint, 50, 0, 0, 1) arm.rm_set_gripper_position(max_gripper, True, 2) arm.rm_set_gripper_position(min_gripper, True, 2) init_pose = arm.rm_get_current_arm_state()[1]['pose'] init_info = { 'init_pose': init_pose, 'init_joint': init_joint, 'max_gripper': max_gripper, 'min_gripper': min_gripper, 'servo_config_file': "/home/maic/LYT/lerobot/realman_src/realman_aloha/shadow_rm_robot/config/servo_arm.yaml" } try: teleop = HybridController(init_info) except Exception as e: print(f"初始化失败: {e}") exit(1) try: while True: # 获取当前控制命令 action = teleop.get_action() if action['control_mode'] == 'joint': # 关节控制模式(主模式) target_joints = action['joint_angles'] ret = arm.rm_get_current_arm_state() current_pose = ret[1].get('pose', init_pose) teleop.update_endpose_state(current_pose) else: # 末端位姿控制模式 target_pose = [ action['end_pose']['X'], # X (m) action['end_pose']['Y'], # Y (m) action['end_pose']['Z'], # Z (m) action['end_pose']['RX'], # RX (rad) action['end_pose']['RY'], # RY (rad) action['end_pose']['RZ'] # RZ (rad) ] # 使用笛卡尔空间运动控制末端位姿 result = arm.rm_movej_p(target_pose, 50, 0, 0, 0) ret = arm.rm_get_current_arm_state() current_joint = ret[1].get('joint', init_joint) teleop.update_joint_state(current_joint) # 夹爪控制 arm.rm_set_gripper_position(action['gripper'], True, 2) # 获取当前机械臂状态 ret = arm.rm_get_current_arm_state() if ret[0] == 0: # 成功获取状态 current_pose = ret[1].get('pose', [0]*6) current_joint = ret[1].get('joint', [0]*6) print(f"当前位姿: {current_pose}") print(f"当前关节: {current_joint[:6]}") # 只显示前6个关节 else: print(f"获取机械臂状态失败,错误码: {ret[0]}") print("-" * 80) # time.sleep(0.1) except KeyboardInterrupt: print("程序被用户中断") finally: # 清理资源 teleop.stop() arm.rm_delete_robot_arm() print("程序退出完成")