diff --git a/lerobot/common/robot_devices/motors/realman_dual.py b/lerobot/common/robot_devices/motors/realman_dual.py index 2d2f04d9..a83d7590 100644 --- a/lerobot/common/robot_devices/motors/realman_dual.py +++ b/lerobot/common/robot_devices/motors/realman_dual.py @@ -88,7 +88,7 @@ class RealmanDualMotorsBus: self._thread_lock = threading.Lock() # 添加读取相关的线程控制 - self._state_cache = {} + self._state_cache = {"joint": {}, "pose": {}} self._cache_lock = threading.Lock() self._keep_reading = True @@ -97,21 +97,12 @@ class RealmanDualMotorsBus: def _start_background_readers(self): """启动后台读取线程""" - # 左臂读取线程 + # 读取线程 threading.Thread( - target=self._left_read_task, + target=self._read_task, daemon=True, - name="left_arm_reader" + name="arm_reader" ).start() - - # 右臂读取线程 - threading.Thread( - target=self._right_read_task, - daemon=True, - name="right_arm_reader" - ).start() - - @property def motor_names(self) -> list[str]: @@ -134,27 +125,24 @@ class RealmanDualMotorsBus: except Exception as e: raise TimeoutError(f"操作超时: {e}") - def _left_read_task(self): + def _read_task(self): """左臂后台读取任务 - 模仿_left_slow_task的风格""" while self._keep_reading: try: left_state = self._read_arm_state(self.left_rmarm, "left") with self._cache_lock: - self._state_cache.update(left_state) + self._state_cache["joint"].update(left_state["joint"]) + self._state_cache["pose"].update(left_state["pose"]) except Exception as e: print(f"左臂读取失败: {e}") - time.sleep(0.005) - def _right_read_task(self): - """右臂后台读取任务 - 模仿_right_slow_task的风格""" - while self._keep_reading: try: right_state = self._read_arm_state(self.right_rmarm, "right") with self._cache_lock: - self._state_cache.update(right_state) + self._state_cache["joint"].update(right_state["joint"]) + self._state_cache["pose"].update(right_state["pose"]) except Exception as e: print(f"右臂读取失败: {e}") - time.sleep(0.005) def _read_arm_state(self, arm: RoboticArm, prefix: str) -> dict: """读取单臂状态 - 保持原有逻辑""" @@ -163,12 +151,23 @@ class RealmanDualMotorsBus: joint_state = joint_msg['joint'] gripper_state = gripper_msg['actpos'] + pose_state = joint_msg['pose'] - state_dict = {} - for i in range(6): - state_dict[f"{prefix}_joint_{i+1}"] = joint_state[i] / 180 - state_dict[f"{prefix}_gripper"] = (gripper_state - 500) / 500 - return state_dict + joint_state_dict = {} + for i in range(len(joint_state)): + joint_state_dict[f"{prefix}_joint_{i+1}"] = joint_state[i] + joint_state_dict[f"{prefix}_gripper"] = gripper_state + + pose_state_dict = { + f"{prefix}_x": pose_state[0], + f"{prefix}_y": pose_state[1], + f"{prefix}_z": pose_state[2], + f"{prefix}_rx": pose_state[3], + f"{prefix}_ry": pose_state[4], + f"{prefix}_rz": pose_state[5], + } + + return {"joint": joint_state_dict, 'pose': pose_state_dict} def _move_to_initial_position(self): """移动到初始位置""" @@ -189,10 +188,6 @@ class RealmanDualMotorsBus: if len(joints) != expected_count: raise ValueError(f"关节数量不匹配: 期望 {expected_count}, 实际 {len(joints)}") - def _extract_joint_state(self, state: dict, arm_prefix: str) -> list: - """提取关节状态""" - return [state[v] * 180 for v in state if f"{arm_prefix}" in v] - def _execute_slow_movement(self, arm: str, joint_data: list): """执行慢速运动""" busy_flag = f"{arm}_slow_busy" @@ -256,7 +251,7 @@ class RealmanDualMotorsBus: self.right_rmarm.rm_movej_canfd(right_joint, False) def write_endpose_canfd(self, target_endpose: list): - assert target_endpose == 12, "the length of target pose is not equal 12" + assert len(target_endpose) == 12, "the length of target pose is not equal 12" self.left_rmarm.rm_movep_canfd(target_endpose[:6], False) self.right_rmarm.rm_movep_canfd(target_endpose[6:], False) @@ -314,28 +309,13 @@ class RealmanDualMotorsBus: right_joints = target_joint[self.left_offset+1:-1] right_gripper = target_joint[-1] - self.left_rmarm.rm_movej_follow(left_joints) - self.left_rmarm.rm_set_gripper_position(left_gripper, block=False, timeout=2) - self.right_rmarm.rm_movej_follow(right_joints) - self.right_rmarm.rm_set_gripper_position(right_gripper, block=False, timeout=2) + self.left_rmarm.rm_movej_canfd(left_joints, follow=False) + # self.left_rmarm.rm_movej_follow(left_joints) + # self.left_rmarm.rm_set_gripper_position(left_gripper, block=False, timeout=2) + self.right_rmarm.rm_movej_canfd(right_joints, follow=False) + # self.right_rmarm.rm_movej_follow(right_joints) + # self.right_rmarm.rm_set_gripper_position(right_gripper, block=False, timeout=2) - def write_action(self, action: dict, state: dict): - # 提取状态数据 - follow_left_joint = self._extract_joint_state(state, "left_joint") - follow_right_joint = self._extract_joint_state(state, "right_joint") - - # 提取动作数据 - master_left_joint = action['left_joint_actions'] - master_right_joint = action['right_joint_actions'] - left_gripper = int(action['left_gripper_actions'] * 1000) - right_gripper = int(action['right_gripper_actions'] * 1000) - # 执行夹爪动作 - self.write_dual_gripper(left_gripper, right_gripper) - - # 执行关节动作 - self._execute_arm_action('left', action, master_left_joint, follow_left_joint) - self._execute_arm_action('right', action, master_right_joint, follow_right_joint) - def read(self) -> Dict: """读取机械臂状态 - 直接从缓存获取""" with self._cache_lock: diff --git a/lerobot/common/robot_devices/robots/configs.py b/lerobot/common/robot_devices/robots/configs.py index 74e6f853..9cb5b7f3 100644 --- a/lerobot/common/robot_devices/robots/configs.py +++ b/lerobot/common/robot_devices/robots/configs.py @@ -755,7 +755,8 @@ class RealmanDualRobotConfig(RobotConfig): max_gripper: int = 990 min_gripper: int = 10 servo_config_file: str = "/home/maic/LYT/lerobot/lerobot/common/robot_devices/teleop/servo_dual.yaml" - + left_end_control_guid: str = '0300b14bff1100003708000010010000' + right_end_control_guid: str = '0300509d5e040000120b000009050000' follower_arm: dict[str, MotorsBusConfig] = field( default_factory=lambda: { diff --git a/lerobot/common/robot_devices/robots/realman_dual.py b/lerobot/common/robot_devices/robots/realman_dual.py index 3d7c3fba..dd50f3dc 100644 --- a/lerobot/common/robot_devices/robots/realman_dual.py +++ b/lerobot/common/robot_devices/robots/realman_dual.py @@ -33,15 +33,6 @@ class RealmanDualRobot: self.piper_motors = make_motors_buses_from_configs(self.config.follower_arm) self.arm = self.piper_motors['main'] - # build init teleop info - self.init_info = { - 'init_joint': self.arm.init_joint_position, - 'init_pose': self.arm.init_pose, - 'max_gripper': config.max_gripper, - 'min_gripper': config.min_gripper, - 'servo_config_file': config.servo_config_file - } - # 初始化遥操作 self._initialize_teleop() # init state @@ -54,7 +45,8 @@ class RealmanDualRobot: 'init_pose': self.arm.init_pose, 'max_gripper': self.config.max_gripper, 'min_gripper': self.config.min_gripper, - 'servo_config_file': self.config.servo_config_file + 'servo_config_file': self.config.servo_config_file, + 'end_control_info': {'left': self.config.left_end_control_guid , 'right': self.config.right_end_control_guid} } if not self.inference_time: @@ -72,17 +64,26 @@ class RealmanDualRobot: def _read_robot_state(self) -> dict: """读取机器人状态""" before_read_t = time.perf_counter() - state = self.arm.read() + from copy import deepcopy + state = deepcopy(self.arm.read()) self.logs["read_pos_dt_s"] = time.perf_counter() - before_read_t return state def _execute_action(self, action: dict, state: dict): """执行动作""" before_write_t = time.perf_counter() - + if action['control_mode'] == 'joint': - self.arm.write_action(action, state) - # 可以添加其他控制模式的处理 + # self.arm.write_action(action, state) + pass + else: + if list(action['pose'].values()) != list(state['pose'].values()): + pose = list(action['pose'].values()) + self.arm.write_endpose_canfd(pose) + + elif list(action['joint'].values()) != list(state['joint'].values()): + target_joint = list(action['joint'].values()) + self.arm.write(target_joint) self.logs["write_pos_dt_s"] = time.perf_counter() - before_write_t @@ -104,8 +105,10 @@ class RealmanDualRobot: return obs_dict, action_dict def _update_state_queue(self): + # import pdb + # pdb.set_trace() """更新状态队列""" - current_state = list(self.arm.read().values()) + current_state = list(self.arm.read()['joint'].values()) self.joint_queue.append(current_state) def _capture_images(self) -> Dict[str, torch.Tensor]: @@ -224,12 +227,17 @@ class RealmanDualRobot: # 读取当前状态 state = self._read_robot_state() # 获取动作 - action = self.teleop.get_action() + action = self.teleop.get_action(state) + # print(state['pose']) + # print(action['pose']) # 执行动作 - # self._execute_action(action, state) + # import pdb + # pdb.set_trace() + self._execute_action(action, state) # 更新状态队列 self._update_state_queue() + time.sleep(0.02) if record_data: data = self._prepare_record_data() return data diff --git a/lerobot/common/robot_devices/teleop/find_gamepad.py b/lerobot/common/robot_devices/teleop/find_gamepad.py new file mode 100644 index 00000000..771dab8b --- /dev/null +++ b/lerobot/common/robot_devices/teleop/find_gamepad.py @@ -0,0 +1,18 @@ +import pygame + +def find_controller_index(): + # 获取所有 pygame 控制器的设备路径 + pygame_joysticks = {} + for i in range(pygame.joystick.get_count()): + joystick = pygame.joystick.Joystick(i) + joystick.init() + pygame_joysticks[joystick.get_guid()] = { + 'index': i, + 'device_name': joystick.get_name() + } + + return pygame_joysticks + + +if __name__ == '__main__': + print(find_controller_index()) \ No newline at end of file diff --git a/lerobot/common/robot_devices/teleop/gamepad.py b/lerobot/common/robot_devices/teleop/gamepad.py index 704b924a..1a6a5a8a 100644 --- a/lerobot/common/robot_devices/teleop/gamepad.py +++ b/lerobot/common/robot_devices/teleop/gamepad.py @@ -2,14 +2,135 @@ import pygame import threading import time import logging -import pyudev +from typing import Dict +from dataclasses import dataclass +from .find_gamepad import find_controller_index +from .servo_server import ServoArmServer + + +class RealmanAlohaMaster: + def __init__(self, config): + self.config = config + self._initialize_master_arm() + + def _initialize_master_arm(self): + """初始化主控臂""" + try: + self.master_dual_arm = ServoArmServer(self.config.config_file) + except Exception as e: + logging.error(f"初始化主控臂失败: {e}") + raise + + def get_action(self) -> Dict: + """获取控制动作""" + try: + master_joint_actions = self.master_dual_arm.get_joint_data() + return self._format_action(master_joint_actions) + except Exception as e: + logging.error(f"获取动作失败: {e}") + + def _format_action(self, master_joint_actions: dict) -> dict: + """格式化动作数据""" + master_controller_status = { + 'left': master_joint_actions['left_controller_status'], + 'right': master_joint_actions['right_controller_status'] + } + + return { + 'control_mode': 'joint', + 'master_joint_actions': master_joint_actions['dual_joint_actions'], + 'left_joint_actions': master_joint_actions['left_joint_actions'][:-1], + 'right_joint_actions': master_joint_actions['right_joint_actions'][:-1], + 'left_gripper_actions': master_joint_actions['left_joint_actions'][-1], # 修复bug + 'right_gripper_actions': master_joint_actions['right_joint_actions'][-1], + 'master_controller_status': master_controller_status + } + + def stop(self): + """停止控制器""" + try: + if hasattr(self, 'master_dual_arm') and self.master_dual_arm: + self.master_dual_arm.shutdown() + print("混合控制器已退出") + except Exception as e: + logging.error(f"停止控制器失败: {e}") + + + +class DummyEndposeMaster: + def __init__(self, config): + # 初始化pygame + pygame.init() + pygame.joystick.init() + # 获取所有 USB 游戏控制器的信息 + self.joysticks = find_controller_index() + print(self.joysticks) + self.control_info = config.end_control_info + left_stick = self._init_stick('left') + right_stick = self._init_stick('right') + self.controllers = [left_stick, right_stick] + + def _init_stick(self, arm_name:str): + stick_info = {} + stick_info['index'] = self.joysticks[self.control_info[arm_name]]['index'] + stick_info['guid'] = self.control_info[arm_name] + stick_info['name'] = f'{arm_name}' + device_name = self.joysticks[self.control_info[arm_name]]['device_name'] + stick = XboxStick(stick_info) if "Xbox" in device_name else FlightStick(stick_info) + stick.start_polling() + return stick + + def get_action(self, state) -> Dict: + from copy import deepcopy + + new_state = deepcopy(state) + gamepad_action = {} + xyz = [] + rxryrz = [] + gripper = [] + """获取控制动作""" + try: + for i, controller in enumerate(self.controllers): + # states = controller.get_raw_states() + gamepad_action.update(controller.get_control_signal(controller.name)) + xyz += [f"{controller.name}_x", f"{controller.name}_y", f"{controller.name}_z"] + rxryrz += [f"{controller.name}_joint_4", f"{controller.name}_joint_5", f"{controller.name}_joint_6"] + gripper += [f"{controller.name}_gripper"] + + for name in xyz: + new_state['pose'][name] += (gamepad_action[name] * gamepad_action['xyz_vel'] * gamepad_action[name.split('_')[0]+'_ratio']) + + for name in gripper: + new_state['joint'][name] += int(gamepad_action[name] * gamepad_action['gripper_vel'] * gamepad_action[name.split('_')[0]+'_ratio']) + new_state['joint'][name] = min(990, max(0, new_state['joint'][name])) + + for name in rxryrz: + new_state['joint'][name] += (gamepad_action[name] * gamepad_action['rxyz_vel'] * gamepad_action[name.split('_')[0]+'_ratio']) + + new_state['control_mode'] = 'endpose' + return new_state + + except Exception as e: + logging.error(f"获取动作失败: {e}") + + def stop(self): + """停止控制器""" + try: + # 停止轮询线程 + for controller in self.controllers: + controller.stop_polling() + except Exception as e: + logging.error(f"停止控制器失败: {e}") + class ControllerBase: - def __init__(self, joystick_index): + def __init__(self, joystick_info: dict): # 初始化手柄对象 - self.joystick = pygame.joystick.Joystick(joystick_index) + self.joystick = pygame.joystick.Joystick(joystick_info['index']) self.joystick.init() + self.name = joystick_info['name'] + self.guid = joystick_info['guid'] # 存储所有控制器状态的字典 self.states = { @@ -22,6 +143,10 @@ class ControllerBase: self.deadzone = 0.15 # validzone self.validzone = 0.05 + self.ratio = 1 + self.gripper_vel = 50 + self.rxyz_vel = 5 + self.xyz_vel = 0.02 # 线程控制标志 self.running = False @@ -65,8 +190,8 @@ class ControllerBase: return self.states class FlightStick(ControllerBase): - def __init__(self, joystick_index): - super().__init__(joystick_index) + def __init__(self, joystick_info): + super().__init__(joystick_info) def get_x_control_signal(self): x = 0 @@ -74,18 +199,14 @@ class FlightStick(ControllerBase): x = 1 elif self.states['axes'][0] < -self.validzone: x = -1 - else: - x = 0 return x def get_y_control_signal(self): y = 0 if self.states['axes'][1] > self.validzone: - y = 1 - elif self.states['axes'][1] < -self.validzone: y = -1 - else: - y = 0 + elif self.states['axes'][1] < -self.validzone: + y = 1 return y def get_z_control_signal(self): @@ -94,8 +215,6 @@ class FlightStick(ControllerBase): z = 1 elif self.states['buttons'][1]: z = -1 - else: - z = 0 return z def get_gripper_control_signal(self): @@ -106,22 +225,20 @@ class FlightStick(ControllerBase): gripper = -1 return gripper - def get_vel_control_signal(self): - vel = self.deadzone + def get_ratio_control_signal(self): + ratio = self.ratio if self.states['axes'][2] > 0.8: - vel = self.deadzone * 2 + ratio = self.ratio / 5 elif self.states['axes'][2] < -0.8: - vel = self.deadzone / 5 - else: - vel = self.deadzone - return vel + ratio = self.ratio * 2 + return ratio def get_rx_control_signal(self): rx = 0 if self.states['hats'][0][0] == -1: - rx = -1 - elif self.states['hats'][0][0] == 1: rx = 1 + elif self.states['hats'][0][0] == 1: + rx = -1 else: rx = 0 return rx @@ -129,9 +246,9 @@ class FlightStick(ControllerBase): def get_ry_control_signal(self): ry = 0 if self.states['hats'][0][1] == 1: - ry = 1 - elif self.states['hats'][0][1] == -1: ry = -1 + elif self.states['hats'][0][1] == -1: + ry = 1 else: ry = 0 return ry @@ -146,59 +263,63 @@ class FlightStick(ControllerBase): rz = 0 return rz - def get_control_signal(self): - return{ - 'x': self.get_x_control_signal(), - 'y': self.get_y_control_signal(), - 'z': self.get_z_control_signal(), - 'rx': self.get_rx_control_signal(), - 'ry': self.get_ry_control_signal(), - 'rz': self.get_rz_control_signal(), - 'gripper': self.get_gripper_control_signal(), - 'vel': self.get_vel_control_signal() + def get_control_signal(self, prefix: str = ""): + """获取所有控制信号""" + return { + f'{prefix}_x': self.get_x_control_signal(), + f'{prefix}_y': self.get_y_control_signal(), + f'{prefix}_z': self.get_z_control_signal(), + f'{prefix}_joint_4': self.get_rx_control_signal(), + f'{prefix}_joint_5': self.get_ry_control_signal(), + f'{prefix}_joint_6': self.get_rz_control_signal(), + f'{prefix}_gripper': self.get_gripper_control_signal(), + f'{prefix}_ratio': self.get_ratio_control_signal(), + 'gripper_vel': self.gripper_vel, + 'rxyz_vel': self.rxyz_vel, + 'xyz_vel': self.xyz_vel } -class XboxStick: - def __init__(self, joystick_index): - super().__init__(joystick_index) +class XboxStick(ControllerBase): + def __init__(self, joystick_info: dict): + super().__init__(joystick_info) def get_x_control_signal(self): """获取 X 轴控制信号""" x = 0 if self.states['hats'][0][0] == -1: - x = -1 - elif self.states['hats'][0][0] == 1: x = 1 + elif self.states['hats'][0][0] == 1: + x = -1 return x def get_y_control_signal(self): """获取 Y 轴控制信号""" y = 0 if self.states['hats'][0][1] == 1: - y = 1 - elif self.states['hats'][0][1] == -1: y = -1 + elif self.states['hats'][0][1] == -1: + y = 1 return y def get_z_control_signal(self): """获取 Z 轴控制信号""" z = 0 if self.states['axes'][4] > self.deadzone: # A 按钮 - z = 1 - elif self.states['axes'][4] < -self.deadzone: # B 按钮 z = -1 + elif self.states['axes'][4] < -self.deadzone: # B 按钮 + z = 1 return z - def get_vel_control_signal(self): + def get_ratio_control_signal(self): """获取速度控制信号""" - vel = self.deadzone + ratio = self.ratio if self.states['axes'][2] > 0.8: # LT 按钮 - vel = self.deadzone * 2 + ratio = self.ratio * 2 elif self.states['axes'][5] > 0.8: # RT 按钮 - vel = self.deadzone / 5 - return vel + ratio = self.ratio / 5 + return ratio def get_gripper_control_signal(self): gripper = 0 @@ -212,9 +333,9 @@ class XboxStick: """获取 RX 轴控制信号""" rx = 0 if self.states['axes'][0] > self.deadzone: # 左舵 - rx = 1 - elif self.states['axes'][0] < -self.deadzone: # 右舵 rx = -1 + elif self.states['axes'][0] < -self.deadzone: # 右舵 + rx = 1 return rx def get_ry_control_signal(self): @@ -235,85 +356,64 @@ class XboxStick: rz = -1 return rz - def get_control_signal(self): + def get_control_signal(self, prefix: str = ""): """获取所有控制信号""" return { - 'x': self.get_x_control_signal(), - 'y': self.get_y_control_signal(), - 'z': self.get_z_control_signal(), - 'rx': self.get_rx_control_signal(), - 'ry': self.get_ry_control_signal(), - 'rz': self.get_rz_control_signal(), - 'gripper': self.get_gripper_control_signal(), - 'vel': self.get_vel_control_signal() + f'{prefix}_x': self.get_x_control_signal(), + f'{prefix}_y': self.get_y_control_signal(), + f'{prefix}_z': self.get_z_control_signal(), + f'{prefix}_joint_4': self.get_rx_control_signal(), + f'{prefix}_joint_5': self.get_ry_control_signal(), + f'{prefix}_joint_6': self.get_rz_control_signal(), + f'{prefix}_gripper': self.get_gripper_control_signal(), + f'{prefix}_ratio': self.get_ratio_control_signal(), + 'gripper_vel': self.gripper_vel, + 'rxyz_vel': self.rxyz_vel, + 'xyz_vel': self.xyz_vel } + + +@dataclass +class ControllerConfig: + """控制器配置""" + init_joint: list + init_pose: list + max_gripper: int + min_gripper: int + config_file: str + end_control_info: dict +def parse_init_info(init_info: dict) -> ControllerConfig: + """解析初始化信息""" + return ControllerConfig( + init_joint=init_info['init_joint'], + init_pose=init_info.get('init_pose', [0]*12), + max_gripper=init_info['max_gripper'], + min_gripper=init_info['min_gripper'], + config_file=init_info['servo_config_file'], + end_control_info=init_info['end_control_info'] + ) -def get_usb_joystick_info(): - """获取所有 USB 游戏控制器的信息""" - context = pyudev.Context() - devices = [] - for device in context.list_devices(subsystem='input', ID_INPUT_JOYSTICK=1): - # 获取设备路径(如 /dev/input/js0) - device_path = device.device_node - if device_path: - # 获取 VID、PID 和 UUID(ID_SERIAL) - vid = device.get('ID_VENDOR_ID') - pid = device.get('ID_MODEL_ID') - uuid = device.get('ID_SERIAL_SHORT') # 设备的唯一标识符 - if vid and pid and uuid: - devices.append({ - 'path': device_path, - 'vid': int(vid, 16), # 转换为十六进制 - 'pid': int(pid, 16), # 转换为十六进制 - 'uuid': uuid # 设备的唯一标识符 - }) - return devices - - -def match_controller_index(joystick_info): - """将 USB 设备信息与 pygame 的控制器索引匹配""" - # 获取所有 pygame 控制器的设备路径 - pygame_joysticks = [] - for i in range(pygame.joystick.get_count()): - joystick = pygame.joystick.Joystick(i) - joystick.init() - pygame_joysticks.append({ - 'index': i, - 'name': joystick.get_name(), - 'guid': joystick.get_guid() - }) - # 匹配设备路径 - for usb_device in joystick_info: - for pygame_joystick in pygame_joysticks: - if usb_device['path'] in pygame_joystick['guid']: - usb_device['pygame_index'] = pygame_joystick['index'] - break - return joystick_info if __name__ == "__main__": - # 初始化pygame - pygame.init() - pygame.joystick.init() - # 获取所有 USB 游戏控制器的信息 - usb_joysticks = get_usb_joystick_info() - matched_joysticks = match_controller_index(usb_joysticks) - print(usb_joysticks) - print(matched_joysticks) - - - # stick = FlightStick() - # # stick = XboxStick() - # stick.start_polling() - - # try: - # while True: - # # 主程序可以继续执行其他任务 - # states = stick.get_raw_states() - # # states = stick.get_control_signal() - # print("当前状态:", states) - # time.sleep(1) - # except KeyboardInterrupt: - # stick.stop_polling() + config = { + 'init_joint': {'joint': [-170, 90, 0, 90, 120, 0, 10, 170, 90, 0, -90, 120, 0, 10]}, + 'init_pose': {}, + 'max_gripper': {}, + 'min_gripper': {}, + 'servo_config_file': {}, + 'end_control_info': {'left': "0300b14bff1100003708000010010000" , 'right': '0300509d5e040000120b000009050000'} + } + config = parse_init_info(config) + endpose_arm = DummyEndposeMaster(config) + while True: + gamepad_action = {} + xyz = [] + for i, controller in enumerate(endpose_arm.controllers): + # states = controller.get_raw_states() + gamepad_action.update(controller.get_control_signal(controller.name)) + xyz += [f"{controller.name}_x", f"{controller.name}_y", f"{controller.name}_z"] + time.sleep(1) + print(gamepad_action) \ No newline at end of file diff --git a/lerobot/common/robot_devices/teleop/realman_aloha_dual.py b/lerobot/common/robot_devices/teleop/realman_aloha_dual.py index 6b61cdda..1bb43728 100644 --- a/lerobot/common/robot_devices/teleop/realman_aloha_dual.py +++ b/lerobot/common/robot_devices/teleop/realman_aloha_dual.py @@ -1,14 +1,8 @@ -import pygame -import threading import time -import serial -import binascii import logging -import yaml from typing import Dict from dataclasses import dataclass -from Robotic_Arm.rm_robot_interface import * -from .servo_server import ServoArmServer +from .gamepad import RealmanAlohaMaster, DummyEndposeMaster @dataclass @@ -19,7 +13,7 @@ class ControllerConfig: max_gripper: int min_gripper: int config_file: str - end_controller: str = None + end_control_info: dict class HybridController: @@ -27,9 +21,9 @@ class HybridController: self.config = self._parse_init_info(init_info) self.joint = self.config.init_joint.copy() self.pose = self.config.init_pose.copy() - self.joint_control_mode = True - - self._initialize_master_arm() + + self.joint_arm = RealmanAlohaMaster(self.config) + self.endpose_arm = DummyEndposeMaster(self.config) def _parse_init_info(self, init_info: dict) -> ControllerConfig: """解析初始化信息""" @@ -39,57 +33,21 @@ class HybridController: max_gripper=init_info['max_gripper'], min_gripper=init_info['min_gripper'], config_file=init_info['servo_config_file'], - end_controller=init_info['end_controller'] + end_control_info=init_info['end_control_info'] ) - def _initialize_master_arm(self): - """初始化主控臂""" - try: - self.master_dual_arm = ServoArmServer(self.config.config_file) - except Exception as e: - logging.error(f"初始化主控臂失败: {e}") - raise - - def get_action(self) -> Dict: + def get_action(self, state) -> Dict: """获取控制动作""" try: - master_joint_actions = self.master_dual_arm.get_joint_data() - return self._format_action(master_joint_actions) + endpose_action = self.endpose_arm.get_action(state) + return endpose_action + # return self.joint_arm.get_action() + except Exception as e: logging.error(f"获取动作失败: {e}") - def _format_action(self, master_joint_actions: dict) -> dict: - """格式化动作数据""" - master_controller_status = { - 'left': master_joint_actions['left_controller_status'], - 'right': master_joint_actions['right_controller_status'] - } - - return { - 'control_mode': 'joint' if self.joint_control_mode else 'end_pose', - 'master_joint_actions': master_joint_actions['dual_joint_actions'], - 'left_joint_actions': master_joint_actions['left_joint_actions'][:-1], - 'right_joint_actions': master_joint_actions['right_joint_actions'][:-1], - 'left_gripper_actions': master_joint_actions['left_joint_actions'][-1], # 修复bug - 'right_gripper_actions': master_joint_actions['right_joint_actions'][-1], - 'master_controller_status': master_controller_status, - 'end_pose': self.pose - } - - def switch_control_mode(self): - """切换控制模式""" - self.joint_control_mode = not self.joint_control_mode - mode = "关节" if self.joint_control_mode else "末端" - print(f"切换到{mode}控制模式") - def stop(self): - """停止控制器""" - try: - if hasattr(self, 'master_dual_arm') and self.master_dual_arm: - self.master_dual_arm.shutdown() - print("混合控制器已退出") - except Exception as e: - logging.error(f"停止控制器失败: {e}") + self.joint_arm.stop() def reset(self): """重置控制器""" @@ -97,6 +55,7 @@ class HybridController: self.pose = self.config.init_pose.copy() self.joint_control_mode = True + # 使用示例 if __name__ == "__main__": init_info = { @@ -104,7 +63,8 @@ if __name__ == "__main__": 'init_pose': [[-0.0305, 0.125938, 0.13153, 3.141, 0.698, -1.57, -0.030486, -0.11487, 0.144707, 3.141, 0.698, 1.57]], 'max_gripper': 990, 'min_gripper': 10, - 'servo_config_file': '/home/maic/LYT/lerobot/lerobot/common/robot_devices/teleop/servo_dual.yaml' + 'servo_config_file': '/home/maic/LYT/lerobot/lerobot/common/robot_devices/teleop/servo_dual.yaml', + 'end_control_info': {'left': '0300b14bff1100003708000010010000', 'right': '030003f05e0400008e02000010010000'} } arm_controller = HybridController(init_info) time.sleep(1)