#!/usr/bin/env python # -*- coding: utf-8 -*- import time import yaml import serial import logging import binascii import numpy as np # 配置日志记录 logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" ) class ServoArm: def __init__(self, config_file="config.yaml"): """初始化机械臂的串口连接并发送初始数据。 Args: config_file (str): 配置文件的路径。 """ self.config = self._load_config(config_file) self.port = self.config["port"] self.baudrate = self.config["baudrate"] self.joint_hex_data = self.config["joint_hex_data"] self.control_hex_data = self.config["control_hex_data"] self.arm_axis = self.config.get("arm_axis", 7) self.serial_conn = serial.Serial(self.port, self.baudrate, timeout=0) self.bytes_to_send = binascii.unhexlify(self.joint_hex_data.replace(" ", "")) self.serial_conn.write(self.bytes_to_send) time.sleep(1) def _load_config(self, config_file): """加载配置文件。 Args: config_file (str): 配置文件的路径。 Returns: dict: 配置文件内容。 """ with open(config_file, "r") as file: config = yaml.safe_load(file) return config def _bytes_to_signed_int(self, byte_data): """将字节数据转换为有符号整数。 Args: byte_data (bytes): 字节数据。 Returns: int: 有符号整数。 """ return int.from_bytes(byte_data, byteorder="little", signed=True) def _parse_joint_data(self, hex_received): """解析接收到的十六进制数据并提取关节数据。 Args: hex_received (str): 接收到的十六进制字符串数据。 Returns: dict: 解析后的关节数据。 """ logging.debug(f"hex_received: {hex_received}") joints = {} for i in range(self.arm_axis): start = 14 + i * 10 end = start + 8 joint_hex = hex_received[start:end] joint_byte_data = bytearray.fromhex(joint_hex) joint_value = self._bytes_to_signed_int(joint_byte_data) / 10000.0 joints[f"joint_{i+1}"] = joint_value grasp_start = 14 + self.arm_axis*10 grasp_hex = hex_received[grasp_start:grasp_start+8] grasp_byte_data = bytearray.fromhex(grasp_hex) # 夹爪进行归一化处理 grasp_value = self._bytes_to_signed_int(grasp_byte_data)/1000 # print(grasp_value) joints["grasp"] = grasp_value return joints def _parse_controller_data(self, hex_received): status = { 'infrared': 0, 'button': 0 } if len(hex_received) == 18: status['infrared'] = self._bytes_to_signed_int(bytearray.fromhex(hex_received[12:14])) status['button'] = self._bytes_to_signed_int(bytearray.fromhex(hex_received[14:16])) # print(infrared) return status def get_joint_actions(self): """从串口读取数据并解析关节动作。 Returns: dict: 包含关节数据的字典。 """ self.serial_conn.write(self.bytes_to_send) time.sleep(0.02) bytes_received = self.serial_conn.read(self.serial_conn.inWaiting()) hex_received = binascii.hexlify(bytes_received).decode("utf-8").upper() # print("joints:", hex_received) actions = self._parse_joint_data(hex_received) return actions def get_controller_status(self): bytes_to_send = binascii.unhexlify(self.control_hex_data.replace(" ", "")) self.serial_conn.write(bytes_to_send) time.sleep(0.02) bytes_received = self.serial_conn.read(self.serial_conn.inWaiting()) hex_received = binascii.hexlify(bytes_received).decode("utf-8").upper() # print("control status:", hex_received) status = self._parse_controller_data(hex_received) return status def set_gripper_action(self, action): """设置夹爪动作。 Args: action (int): 夹爪动作值。 """ action = int(action * 1000) action_bytes = action.to_bytes(4, byteorder="little", signed=True) self.bytes_to_send = self.bytes_to_send[:74] + action_bytes + self.bytes_to_send[78:] if __name__ == "__main__": servo_arm = ServoArm("/home/maic/LYT/lerobot/realman_src/servo_arm.yaml") while True: joint_actions = servo_arm.get_joint_actions() logging.info(joint_actions) # time.sleep(0.02) status = servo_arm.get_controller_status() logging.info(status) time.sleep(0.5)