commit ffda98edbccf308d9d52b1aa9e09b2d4b5a5758d Author: yutang Date: Fri Jun 27 21:45:44 2025 +0800 first update diff --git a/src/__pycache__/realman_shadow_arm.cpython-310.pyc b/src/__pycache__/realman_shadow_arm.cpython-310.pyc new file mode 100644 index 0000000..bcfcf55 Binary files /dev/null and b/src/__pycache__/realman_shadow_arm.cpython-310.pyc differ diff --git a/src/client.py b/src/client.py new file mode 100644 index 0000000..aca2bd3 --- /dev/null +++ b/src/client.py @@ -0,0 +1,126 @@ +import zerorpc +import time +import logging +import json + +logging.basicConfig( + level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" +) + +class ServoArmClient: + def __init__(self, server_address="tcp://localhost:4242"): + """初始化客户端连接""" + self.server_address = server_address + self.client = zerorpc.Client() + self.client.connect(server_address) + logging.info(f"连接到服务器: {server_address}") + + def test_connection(self): + """测试连接""" + try: + response = self.client.ping() + logging.info(f"连接测试: {response}") + return True + except Exception as e: + logging.error(f"连接测试失败: {e}") + return False + + def get_joint_data(self): + """获取所有关节数据""" + try: + data = self.client.get_joint_data() + return data + except Exception as e: + logging.error(f"获取关节数据失败: {e}") + return None + + def get_left_joint_actions(self): + """获取左机械臂数据""" + try: + data = self.client.get_left_joint_actions() + return data + except Exception as e: + logging.error(f"获取左机械臂数据失败: {e}") + return None + + def get_right_joint_actions(self): + """获取右机械臂数据""" + try: + data = self.client.get_right_joint_actions() + return data + except Exception as e: + logging.error(f"获取右机械臂数据失败: {e}") + return None + + def get_controller_status(self, arm='left'): + """获取控制器状态""" + try: + status = self.client.get_controller_status(arm) + return status + except Exception as e: + logging.error(f"获取控制器状态失败: {e}") + return None + + def get_connection_status(self): + """获取连接状态""" + try: + status = self.client.get_connection_status() + return status + except Exception as e: + logging.error(f"获取连接状态失败: {e}") + return None + + def close(self): + """关闭客户端连接""" + self.client.close() + logging.info("客户端连接已关闭") + + +def main(): + # 创建客户端 + client = ServoArmClient() + + # 测试连接 + if not client.test_connection(): + logging.error("无法连接到服务器") + return + + # 获取连接状态 + status = client.get_connection_status() + if status: + logging.info(f"连接状态: {json.dumps(status, indent=2)}") + + try: + # 持续获取数据 + logging.info("开始获取机械臂数据,按Ctrl+C停止...") + + while True: + print("\n" + "="*50) + + # 获取所有数据 + joint_data = client.get_joint_data() + if joint_data: + print(f"时间戳: {joint_data['timestamp']}") + print(f"左机械臂数据: {json.dumps(joint_data['left_joint_actions'], indent=2)}") + print(f"右机械臂数据: {json.dumps(joint_data['right_joint_actions'], indent=2)}") + + # 获取控制器状态 + left_controller = client.get_controller_status('left') + right_controller = client.get_controller_status('right') + + if left_controller: + print(f"左控制器状态: {json.dumps(left_controller, indent=2)}") + if right_controller: + print(f"右控制器状态: {json.dumps(right_controller, indent=2)}") + + # time.sleep(1) # 1Hz显示频率 + + except KeyboardInterrupt: + logging.info("收到中断信号,正在关闭客户端...") + finally: + client.close() + + + +if __name__ == "__main__": + main() diff --git a/src/realman_shadow_arm.py b/src/realman_shadow_arm.py new file mode 100644 index 0000000..0142348 --- /dev/null +++ b/src/realman_shadow_arm.py @@ -0,0 +1,161 @@ +import threading +import time +import serial +import binascii +import logging +import yaml +from typing import Dict + +logging.basicConfig( + level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" +) + +class ServoArm: + def __init__(self, config_file="config.yaml", port_name="left_port"): + """初始化机械臂的串口连接并发送初始数据。 + + Args: + config_file (str): 配置文件的路径。 + """ + self.config = self._load_config(config_file) + self.port = self.config[port_name] + self.baudrate = self.config["baudrate"] + self.joint_hex_data = self.config["joint_hex_data"] + self.control_hex_data = self.config["control_hex_data"] + self.arm_axis = self.config.get("arm_axis", 7) + + try: + self.serial_conn = serial.Serial(self.port, self.baudrate, timeout=0) + self.bytes_to_send = binascii.unhexlify(self.joint_hex_data.replace(" ", "")) + self.serial_conn.write(self.bytes_to_send) + time.sleep(1) + self.connected = True + logging.info(f"串口连接成功: {self.port}") + except Exception as e: + logging.error(f"串口连接失败: {e}") + self.connected = False + + def _load_config(self, config_file): + """加载配置文件。 + + Args: + config_file (str): 配置文件的路径。 + + Returns: + dict: 配置文件内容。 + """ + try: + with open(config_file, "r") as file: + config = yaml.safe_load(file) + return config + except Exception as e: + logging.error(f"配置文件加载失败: {e}") + # 返回默认配置 + return { + "port": "/dev/ttyUSB0", + "baudrate": 460800, + "joint_hex_data": "55 AA 02 00 00 67", + "control_hex_data": "55 AA 08 00 00 B9", + "arm_axis": 6 + } + + def _bytes_to_signed_int(self, byte_data): + """将字节数据转换为有符号整数。 + + Args: + byte_data (bytes): 字节数据。 + + Returns: + int: 有符号整数。 + """ + return int.from_bytes(byte_data, byteorder="little", signed=True) + + def _parse_joint_data(self, hex_received): + """解析接收到的十六进制数据并提取关节数据。 + + Args: + hex_received (str): 接收到的十六进制字符串数据。 + + Returns: + dict: 解析后的关节数据。 + """ + logging.debug(f"hex_received: {hex_received}") + joints = {} + for i in range(self.arm_axis): + start = 14 + i * 10 + end = start + 8 + joint_hex = hex_received[start:end] + joint_byte_data = bytearray.fromhex(joint_hex) + joint_value = self._bytes_to_signed_int(joint_byte_data) / 10000.0 + joints[f"joint_{i+1}"] = joint_value + grasp_start = 14 + self.arm_axis*10 + grasp_hex = hex_received[grasp_start:grasp_start+8] + grasp_byte_data = bytearray.fromhex(grasp_hex) + # 夹爪进行归一化处理 + grasp_value = self._bytes_to_signed_int(grasp_byte_data)/1000 + + joints["grasp"] = grasp_value + return joints + + def _parse_controller_data(self, hex_received): + status = { + 'infrared': 0, + 'button': 0 + } + if len(hex_received) == 18: + status['infrared'] = self._bytes_to_signed_int(bytearray.fromhex(hex_received[12:14])) + status['button'] = self._bytes_to_signed_int(bytearray.fromhex(hex_received[14:16])) + # print(infrared) + return status + + def get_joint_actions(self): + """从串口读取数据并解析关节动作。 + + Returns: + dict: 包含关节数据的字典。 + """ + if not self.connected: + return {} + + try: + self.serial_conn.write(self.bytes_to_send) + time.sleep(0.02) + bytes_received = self.serial_conn.read(self.serial_conn.inWaiting()) + if len(bytes_received) == 0: + return {} + + hex_received = binascii.hexlify(bytes_received).decode("utf-8").upper() + actions = self._parse_joint_data(hex_received) + return actions + except Exception as e: + logging.error(f"读取串口数据错误: {e}") + return {} + + def get_controller_status(self): + bytes_to_send = binascii.unhexlify(self.control_hex_data.replace(" ", "")) + self.serial_conn.write(bytes_to_send) + time.sleep(0.02) + bytes_received = self.serial_conn.read(self.serial_conn.inWaiting()) + hex_received = binascii.hexlify(bytes_received).decode("utf-8").upper() + # print("control status:", hex_received) + status = self._parse_controller_data(hex_received) + return status + + def close(self): + """关闭串口连接""" + if self.connected and hasattr(self, 'serial_conn'): + self.serial_conn.close() + self.connected = False + logging.info("串口连接已关闭") + + +if __name__ == "__main__": + left_servo_arm = ServoArm("/home/maic/LYT/issacsim_zerorpc/src/servo_arm.yaml", "left_port") + right_servo_arm = ServoArm("/home/maic/LYT/issacsim_zerorpc/src/servo_arm.yaml", "right_port") + while True: + left_joint_actions = left_servo_arm.get_joint_actions() + right_joint_actions = right_servo_arm.get_joint_actions() + # print(joint_actions) + logging.info(left_joint_actions) + logging.info(right_joint_actions) + time.sleep(1) \ No newline at end of file diff --git a/src/servo_arm.yaml b/src/servo_arm.yaml new file mode 100644 index 0000000..e90e019 --- /dev/null +++ b/src/servo_arm.yaml @@ -0,0 +1,6 @@ +left_port: /dev/ttyUSB0 +right_port: /dev/ttyUSB1 +baudrate: 460800 +joint_hex_data: "55 AA 02 00 00 67" +control_hex_data: "55 AA 08 00 00 B9" +arm_axis: 6 diff --git a/src/zerorpc_server.py b/src/zerorpc_server.py new file mode 100644 index 0000000..960bc61 --- /dev/null +++ b/src/zerorpc_server.py @@ -0,0 +1,157 @@ +import threading +import time +import logging +import zerorpc +from realman_shadow_arm import ServoArm + +logging.basicConfig( + level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" +) + + +class ServoArmServer: + def __init__(self, config_file="/home/maic/LYT/issacsim_zerorpc/src/servo_arm.yaml"): + """初始化服务器,创建左右机械臂实例""" + self.config_file = config_file + self.left_servo_arm = None + self.right_servo_arm = None + self.running = False + self.data_lock = threading.Lock() + self.latest_data = { + 'left_joint_actions': {}, + 'right_joint_actions': {}, + 'timestamp': time.time() + } + + # 初始化机械臂 + self._initialize_arms() + # 启动数据采集线程 + self._start_data_collection() + + def _initialize_arms(self): + """初始化左右机械臂""" + try: + self.left_servo_arm = ServoArm(self.config_file, "left_port") + logging.info("左机械臂初始化成功") + except Exception as e: + logging.error(f"左机械臂初始化失败: {e}") + + try: + self.right_servo_arm = ServoArm(self.config_file, "right_port") + logging.info("右机械臂初始化成功") + except Exception as e: + logging.error(f"右机械臂初始化失败: {e}") + + def _start_data_collection(self): + """启动数据采集线程""" + self.running = True + self.data_thread = threading.Thread(target=self._data_collection_loop) + self.data_thread.daemon = True + self.data_thread.start() + logging.info("数据采集线程已启动") + + def _data_collection_loop(self): + """数据采集循环""" + while self.running: + try: + left_actions = {} + right_actions = {} + + # 获取左机械臂数据 + if self.left_servo_arm and self.left_servo_arm.connected: + left_actions = self.left_servo_arm.get_joint_actions() + + # 获取右机械臂数据 + if self.right_servo_arm and self.right_servo_arm.connected: + right_actions = self.right_servo_arm.get_joint_actions() + + # 更新最新数据 + with self.data_lock: + self.latest_data = { + 'left_joint_actions': left_actions, + 'right_joint_actions': right_actions, + 'timestamp': time.time() + } + + time.sleep(0.02) # 50Hz采集频率 + + except Exception as e: + logging.error(f"数据采集错误: {e}") + time.sleep(0.1) + + # ZeroRPC 服务方法 + def get_joint_data(self): + """获取最新的关节数据""" + with self.data_lock: + return self.latest_data.copy() + + def get_left_joint_actions(self): + """获取左机械臂关节数据""" + with self.data_lock: + return { + 'data': self.latest_data['left_joint_actions'], + 'timestamp': self.latest_data['timestamp'] + } + + def get_right_joint_actions(self): + """获取右机械臂关节数据""" + with self.data_lock: + return { + 'data': self.latest_data['right_joint_actions'], + 'timestamp': self.latest_data['timestamp'] + } + + def get_controller_status(self, arm='left'): + """获取控制器状态""" + try: + if arm == 'left' and self.left_servo_arm: + return self.left_servo_arm.get_controller_status() + elif arm == 'right' and self.right_servo_arm: + return self.right_servo_arm.get_controller_status() + else: + return {'error': f'Invalid arm: {arm}'} + except Exception as e: + logging.error(f"获取控制器状态错误: {e}") + return {'error': str(e)} + + def get_connection_status(self): + """获取连接状态""" + return { + 'left_connected': self.left_servo_arm.connected if self.left_servo_arm else False, + 'right_connected': self.right_servo_arm.connected if self.right_servo_arm else False, + 'server_running': self.running + } + + def ping(self): + """测试连接""" + return "pong" + + def shutdown(self): + """关闭服务器""" + logging.info("正在关闭服务器...") + self.running = False + + if self.left_servo_arm: + self.left_servo_arm.close() + if self.right_servo_arm: + self.right_servo_arm.close() + + return "Server shutdown" + +def main(): + server = ServoArmServer() + + # 创建ZeroRPC服务器 + s = zerorpc.Server(server) + s.bind("tcp://0.0.0.0:4242") + logging.info("ServoArm ZeroRPC服务器启动在 tcp://0.0.0.0:4242") + + try: + s.run() + except KeyboardInterrupt: + logging.info("收到中断信号,正在关闭服务器...") + server.shutdown() + s.close() + +if __name__ == "__main__": + main() \ No newline at end of file