realman xbox+flight controller
Some checks failed
Secret Leaks / trufflehog (push) Failing after 1m30s

This commit is contained in:
2025-07-13 15:42:27 +08:00
parent b3e9e11e11
commit 7c1699898b
6 changed files with 315 additions and 248 deletions

View File

@@ -88,7 +88,7 @@ class RealmanDualMotorsBus:
self._thread_lock = threading.Lock()
# 添加读取相关的线程控制
self._state_cache = {}
self._state_cache = {"joint": {}, "pose": {}}
self._cache_lock = threading.Lock()
self._keep_reading = True
@@ -97,21 +97,12 @@ class RealmanDualMotorsBus:
def _start_background_readers(self):
"""启动后台读取线程"""
# 左臂读取线程
# 读取线程
threading.Thread(
target=self._left_read_task,
target=self._read_task,
daemon=True,
name="left_arm_reader"
name="arm_reader"
).start()
# 右臂读取线程
threading.Thread(
target=self._right_read_task,
daemon=True,
name="right_arm_reader"
).start()
@property
def motor_names(self) -> list[str]:
@@ -134,27 +125,24 @@ class RealmanDualMotorsBus:
except Exception as e:
raise TimeoutError(f"操作超时: {e}")
def _left_read_task(self):
def _read_task(self):
"""左臂后台读取任务 - 模仿_left_slow_task的风格"""
while self._keep_reading:
try:
left_state = self._read_arm_state(self.left_rmarm, "left")
with self._cache_lock:
self._state_cache.update(left_state)
self._state_cache["joint"].update(left_state["joint"])
self._state_cache["pose"].update(left_state["pose"])
except Exception as e:
print(f"左臂读取失败: {e}")
time.sleep(0.005)
def _right_read_task(self):
"""右臂后台读取任务 - 模仿_right_slow_task的风格"""
while self._keep_reading:
try:
right_state = self._read_arm_state(self.right_rmarm, "right")
with self._cache_lock:
self._state_cache.update(right_state)
self._state_cache["joint"].update(right_state["joint"])
self._state_cache["pose"].update(right_state["pose"])
except Exception as e:
print(f"右臂读取失败: {e}")
time.sleep(0.005)
def _read_arm_state(self, arm: RoboticArm, prefix: str) -> dict:
"""读取单臂状态 - 保持原有逻辑"""
@@ -163,12 +151,23 @@ class RealmanDualMotorsBus:
joint_state = joint_msg['joint']
gripper_state = gripper_msg['actpos']
pose_state = joint_msg['pose']
state_dict = {}
for i in range(6):
state_dict[f"{prefix}_joint_{i+1}"] = joint_state[i] / 180
state_dict[f"{prefix}_gripper"] = (gripper_state - 500) / 500
return state_dict
joint_state_dict = {}
for i in range(len(joint_state)):
joint_state_dict[f"{prefix}_joint_{i+1}"] = joint_state[i]
joint_state_dict[f"{prefix}_gripper"] = gripper_state
pose_state_dict = {
f"{prefix}_x": pose_state[0],
f"{prefix}_y": pose_state[1],
f"{prefix}_z": pose_state[2],
f"{prefix}_rx": pose_state[3],
f"{prefix}_ry": pose_state[4],
f"{prefix}_rz": pose_state[5],
}
return {"joint": joint_state_dict, 'pose': pose_state_dict}
def _move_to_initial_position(self):
"""移动到初始位置"""
@@ -189,10 +188,6 @@ class RealmanDualMotorsBus:
if len(joints) != expected_count:
raise ValueError(f"关节数量不匹配: 期望 {expected_count}, 实际 {len(joints)}")
def _extract_joint_state(self, state: dict, arm_prefix: str) -> list:
"""提取关节状态"""
return [state[v] * 180 for v in state if f"{arm_prefix}" in v]
def _execute_slow_movement(self, arm: str, joint_data: list):
"""执行慢速运动"""
busy_flag = f"{arm}_slow_busy"
@@ -256,7 +251,7 @@ class RealmanDualMotorsBus:
self.right_rmarm.rm_movej_canfd(right_joint, False)
def write_endpose_canfd(self, target_endpose: list):
assert target_endpose == 12, "the length of target pose is not equal 12"
assert len(target_endpose) == 12, "the length of target pose is not equal 12"
self.left_rmarm.rm_movep_canfd(target_endpose[:6], False)
self.right_rmarm.rm_movep_canfd(target_endpose[6:], False)
@@ -314,28 +309,13 @@ class RealmanDualMotorsBus:
right_joints = target_joint[self.left_offset+1:-1]
right_gripper = target_joint[-1]
self.left_rmarm.rm_movej_follow(left_joints)
self.left_rmarm.rm_set_gripper_position(left_gripper, block=False, timeout=2)
self.right_rmarm.rm_movej_follow(right_joints)
self.right_rmarm.rm_set_gripper_position(right_gripper, block=False, timeout=2)
self.left_rmarm.rm_movej_canfd(left_joints, follow=False)
# self.left_rmarm.rm_movej_follow(left_joints)
# self.left_rmarm.rm_set_gripper_position(left_gripper, block=False, timeout=2)
self.right_rmarm.rm_movej_canfd(right_joints, follow=False)
# self.right_rmarm.rm_movej_follow(right_joints)
# self.right_rmarm.rm_set_gripper_position(right_gripper, block=False, timeout=2)
def write_action(self, action: dict, state: dict):
# 提取状态数据
follow_left_joint = self._extract_joint_state(state, "left_joint")
follow_right_joint = self._extract_joint_state(state, "right_joint")
# 提取动作数据
master_left_joint = action['left_joint_actions']
master_right_joint = action['right_joint_actions']
left_gripper = int(action['left_gripper_actions'] * 1000)
right_gripper = int(action['right_gripper_actions'] * 1000)
# 执行夹爪动作
self.write_dual_gripper(left_gripper, right_gripper)
# 执行关节动作
self._execute_arm_action('left', action, master_left_joint, follow_left_joint)
self._execute_arm_action('right', action, master_right_joint, follow_right_joint)
def read(self) -> Dict:
"""读取机械臂状态 - 直接从缓存获取"""
with self._cache_lock:

View File

@@ -755,7 +755,8 @@ class RealmanDualRobotConfig(RobotConfig):
max_gripper: int = 990
min_gripper: int = 10
servo_config_file: str = "/home/maic/LYT/lerobot/lerobot/common/robot_devices/teleop/servo_dual.yaml"
left_end_control_guid: str = '0300b14bff1100003708000010010000'
right_end_control_guid: str = '0300509d5e040000120b000009050000'
follower_arm: dict[str, MotorsBusConfig] = field(
default_factory=lambda: {

View File

@@ -33,15 +33,6 @@ class RealmanDualRobot:
self.piper_motors = make_motors_buses_from_configs(self.config.follower_arm)
self.arm = self.piper_motors['main']
# build init teleop info
self.init_info = {
'init_joint': self.arm.init_joint_position,
'init_pose': self.arm.init_pose,
'max_gripper': config.max_gripper,
'min_gripper': config.min_gripper,
'servo_config_file': config.servo_config_file
}
# 初始化遥操作
self._initialize_teleop()
# init state
@@ -54,7 +45,8 @@ class RealmanDualRobot:
'init_pose': self.arm.init_pose,
'max_gripper': self.config.max_gripper,
'min_gripper': self.config.min_gripper,
'servo_config_file': self.config.servo_config_file
'servo_config_file': self.config.servo_config_file,
'end_control_info': {'left': self.config.left_end_control_guid , 'right': self.config.right_end_control_guid}
}
if not self.inference_time:
@@ -72,17 +64,26 @@ class RealmanDualRobot:
def _read_robot_state(self) -> dict:
"""读取机器人状态"""
before_read_t = time.perf_counter()
state = self.arm.read()
from copy import deepcopy
state = deepcopy(self.arm.read())
self.logs["read_pos_dt_s"] = time.perf_counter() - before_read_t
return state
def _execute_action(self, action: dict, state: dict):
"""执行动作"""
before_write_t = time.perf_counter()
if action['control_mode'] == 'joint':
self.arm.write_action(action, state)
# 可以添加其他控制模式的处理
# self.arm.write_action(action, state)
pass
else:
if list(action['pose'].values()) != list(state['pose'].values()):
pose = list(action['pose'].values())
self.arm.write_endpose_canfd(pose)
elif list(action['joint'].values()) != list(state['joint'].values()):
target_joint = list(action['joint'].values())
self.arm.write(target_joint)
self.logs["write_pos_dt_s"] = time.perf_counter() - before_write_t
@@ -104,8 +105,10 @@ class RealmanDualRobot:
return obs_dict, action_dict
def _update_state_queue(self):
# import pdb
# pdb.set_trace()
"""更新状态队列"""
current_state = list(self.arm.read().values())
current_state = list(self.arm.read()['joint'].values())
self.joint_queue.append(current_state)
def _capture_images(self) -> Dict[str, torch.Tensor]:
@@ -224,12 +227,17 @@ class RealmanDualRobot:
# 读取当前状态
state = self._read_robot_state()
# 获取动作
action = self.teleop.get_action()
action = self.teleop.get_action(state)
# print(state['pose'])
# print(action['pose'])
# 执行动作
# self._execute_action(action, state)
# import pdb
# pdb.set_trace()
self._execute_action(action, state)
# 更新状态队列
self._update_state_queue()
time.sleep(0.02)
if record_data:
data = self._prepare_record_data()
return data

View File

@@ -0,0 +1,18 @@
import pygame
def find_controller_index():
# 获取所有 pygame 控制器的设备路径
pygame_joysticks = {}
for i in range(pygame.joystick.get_count()):
joystick = pygame.joystick.Joystick(i)
joystick.init()
pygame_joysticks[joystick.get_guid()] = {
'index': i,
'device_name': joystick.get_name()
}
return pygame_joysticks
if __name__ == '__main__':
print(find_controller_index())

View File

@@ -2,14 +2,135 @@ import pygame
import threading
import time
import logging
import pyudev
from typing import Dict
from dataclasses import dataclass
from .find_gamepad import find_controller_index
from .servo_server import ServoArmServer
class RealmanAlohaMaster:
def __init__(self, config):
self.config = config
self._initialize_master_arm()
def _initialize_master_arm(self):
"""初始化主控臂"""
try:
self.master_dual_arm = ServoArmServer(self.config.config_file)
except Exception as e:
logging.error(f"初始化主控臂失败: {e}")
raise
def get_action(self) -> Dict:
"""获取控制动作"""
try:
master_joint_actions = self.master_dual_arm.get_joint_data()
return self._format_action(master_joint_actions)
except Exception as e:
logging.error(f"获取动作失败: {e}")
def _format_action(self, master_joint_actions: dict) -> dict:
"""格式化动作数据"""
master_controller_status = {
'left': master_joint_actions['left_controller_status'],
'right': master_joint_actions['right_controller_status']
}
return {
'control_mode': 'joint',
'master_joint_actions': master_joint_actions['dual_joint_actions'],
'left_joint_actions': master_joint_actions['left_joint_actions'][:-1],
'right_joint_actions': master_joint_actions['right_joint_actions'][:-1],
'left_gripper_actions': master_joint_actions['left_joint_actions'][-1], # 修复bug
'right_gripper_actions': master_joint_actions['right_joint_actions'][-1],
'master_controller_status': master_controller_status
}
def stop(self):
"""停止控制器"""
try:
if hasattr(self, 'master_dual_arm') and self.master_dual_arm:
self.master_dual_arm.shutdown()
print("混合控制器已退出")
except Exception as e:
logging.error(f"停止控制器失败: {e}")
class DummyEndposeMaster:
def __init__(self, config):
# 初始化pygame
pygame.init()
pygame.joystick.init()
# 获取所有 USB 游戏控制器的信息
self.joysticks = find_controller_index()
print(self.joysticks)
self.control_info = config.end_control_info
left_stick = self._init_stick('left')
right_stick = self._init_stick('right')
self.controllers = [left_stick, right_stick]
def _init_stick(self, arm_name:str):
stick_info = {}
stick_info['index'] = self.joysticks[self.control_info[arm_name]]['index']
stick_info['guid'] = self.control_info[arm_name]
stick_info['name'] = f'{arm_name}'
device_name = self.joysticks[self.control_info[arm_name]]['device_name']
stick = XboxStick(stick_info) if "Xbox" in device_name else FlightStick(stick_info)
stick.start_polling()
return stick
def get_action(self, state) -> Dict:
from copy import deepcopy
new_state = deepcopy(state)
gamepad_action = {}
xyz = []
rxryrz = []
gripper = []
"""获取控制动作"""
try:
for i, controller in enumerate(self.controllers):
# states = controller.get_raw_states()
gamepad_action.update(controller.get_control_signal(controller.name))
xyz += [f"{controller.name}_x", f"{controller.name}_y", f"{controller.name}_z"]
rxryrz += [f"{controller.name}_joint_4", f"{controller.name}_joint_5", f"{controller.name}_joint_6"]
gripper += [f"{controller.name}_gripper"]
for name in xyz:
new_state['pose'][name] += (gamepad_action[name] * gamepad_action['xyz_vel'] * gamepad_action[name.split('_')[0]+'_ratio'])
for name in gripper:
new_state['joint'][name] += int(gamepad_action[name] * gamepad_action['gripper_vel'] * gamepad_action[name.split('_')[0]+'_ratio'])
new_state['joint'][name] = min(990, max(0, new_state['joint'][name]))
for name in rxryrz:
new_state['joint'][name] += (gamepad_action[name] * gamepad_action['rxyz_vel'] * gamepad_action[name.split('_')[0]+'_ratio'])
new_state['control_mode'] = 'endpose'
return new_state
except Exception as e:
logging.error(f"获取动作失败: {e}")
def stop(self):
"""停止控制器"""
try:
# 停止轮询线程
for controller in self.controllers:
controller.stop_polling()
except Exception as e:
logging.error(f"停止控制器失败: {e}")
class ControllerBase:
def __init__(self, joystick_index):
def __init__(self, joystick_info: dict):
# 初始化手柄对象
self.joystick = pygame.joystick.Joystick(joystick_index)
self.joystick = pygame.joystick.Joystick(joystick_info['index'])
self.joystick.init()
self.name = joystick_info['name']
self.guid = joystick_info['guid']
# 存储所有控制器状态的字典
self.states = {
@@ -22,6 +143,10 @@ class ControllerBase:
self.deadzone = 0.15
# validzone
self.validzone = 0.05
self.ratio = 1
self.gripper_vel = 50
self.rxyz_vel = 5
self.xyz_vel = 0.02
# 线程控制标志
self.running = False
@@ -65,8 +190,8 @@ class ControllerBase:
return self.states
class FlightStick(ControllerBase):
def __init__(self, joystick_index):
super().__init__(joystick_index)
def __init__(self, joystick_info):
super().__init__(joystick_info)
def get_x_control_signal(self):
x = 0
@@ -74,18 +199,14 @@ class FlightStick(ControllerBase):
x = 1
elif self.states['axes'][0] < -self.validzone:
x = -1
else:
x = 0
return x
def get_y_control_signal(self):
y = 0
if self.states['axes'][1] > self.validzone:
y = 1
elif self.states['axes'][1] < -self.validzone:
y = -1
else:
y = 0
elif self.states['axes'][1] < -self.validzone:
y = 1
return y
def get_z_control_signal(self):
@@ -94,8 +215,6 @@ class FlightStick(ControllerBase):
z = 1
elif self.states['buttons'][1]:
z = -1
else:
z = 0
return z
def get_gripper_control_signal(self):
@@ -106,22 +225,20 @@ class FlightStick(ControllerBase):
gripper = -1
return gripper
def get_vel_control_signal(self):
vel = self.deadzone
def get_ratio_control_signal(self):
ratio = self.ratio
if self.states['axes'][2] > 0.8:
vel = self.deadzone * 2
ratio = self.ratio / 5
elif self.states['axes'][2] < -0.8:
vel = self.deadzone / 5
else:
vel = self.deadzone
return vel
ratio = self.ratio * 2
return ratio
def get_rx_control_signal(self):
rx = 0
if self.states['hats'][0][0] == -1:
rx = -1
elif self.states['hats'][0][0] == 1:
rx = 1
elif self.states['hats'][0][0] == 1:
rx = -1
else:
rx = 0
return rx
@@ -129,9 +246,9 @@ class FlightStick(ControllerBase):
def get_ry_control_signal(self):
ry = 0
if self.states['hats'][0][1] == 1:
ry = 1
elif self.states['hats'][0][1] == -1:
ry = -1
elif self.states['hats'][0][1] == -1:
ry = 1
else:
ry = 0
return ry
@@ -146,59 +263,63 @@ class FlightStick(ControllerBase):
rz = 0
return rz
def get_control_signal(self):
return{
'x': self.get_x_control_signal(),
'y': self.get_y_control_signal(),
'z': self.get_z_control_signal(),
'rx': self.get_rx_control_signal(),
'ry': self.get_ry_control_signal(),
'rz': self.get_rz_control_signal(),
'gripper': self.get_gripper_control_signal(),
'vel': self.get_vel_control_signal()
def get_control_signal(self, prefix: str = ""):
"""获取所有控制信号"""
return {
f'{prefix}_x': self.get_x_control_signal(),
f'{prefix}_y': self.get_y_control_signal(),
f'{prefix}_z': self.get_z_control_signal(),
f'{prefix}_joint_4': self.get_rx_control_signal(),
f'{prefix}_joint_5': self.get_ry_control_signal(),
f'{prefix}_joint_6': self.get_rz_control_signal(),
f'{prefix}_gripper': self.get_gripper_control_signal(),
f'{prefix}_ratio': self.get_ratio_control_signal(),
'gripper_vel': self.gripper_vel,
'rxyz_vel': self.rxyz_vel,
'xyz_vel': self.xyz_vel
}
class XboxStick:
def __init__(self, joystick_index):
super().__init__(joystick_index)
class XboxStick(ControllerBase):
def __init__(self, joystick_info: dict):
super().__init__(joystick_info)
def get_x_control_signal(self):
"""获取 X 轴控制信号"""
x = 0
if self.states['hats'][0][0] == -1:
x = -1
elif self.states['hats'][0][0] == 1:
x = 1
elif self.states['hats'][0][0] == 1:
x = -1
return x
def get_y_control_signal(self):
"""获取 Y 轴控制信号"""
y = 0
if self.states['hats'][0][1] == 1:
y = 1
elif self.states['hats'][0][1] == -1:
y = -1
elif self.states['hats'][0][1] == -1:
y = 1
return y
def get_z_control_signal(self):
"""获取 Z 轴控制信号"""
z = 0
if self.states['axes'][4] > self.deadzone: # A 按钮
z = 1
elif self.states['axes'][4] < -self.deadzone: # B 按钮
z = -1
elif self.states['axes'][4] < -self.deadzone: # B 按钮
z = 1
return z
def get_vel_control_signal(self):
def get_ratio_control_signal(self):
"""获取速度控制信号"""
vel = self.deadzone
ratio = self.ratio
if self.states['axes'][2] > 0.8: # LT 按钮
vel = self.deadzone * 2
ratio = self.ratio * 2
elif self.states['axes'][5] > 0.8: # RT 按钮
vel = self.deadzone / 5
return vel
ratio = self.ratio / 5
return ratio
def get_gripper_control_signal(self):
gripper = 0
@@ -212,9 +333,9 @@ class XboxStick:
"""获取 RX 轴控制信号"""
rx = 0
if self.states['axes'][0] > self.deadzone: # 左舵
rx = 1
elif self.states['axes'][0] < -self.deadzone: # 右舵
rx = -1
elif self.states['axes'][0] < -self.deadzone: # 右舵
rx = 1
return rx
def get_ry_control_signal(self):
@@ -235,85 +356,64 @@ class XboxStick:
rz = -1
return rz
def get_control_signal(self):
def get_control_signal(self, prefix: str = ""):
"""获取所有控制信号"""
return {
'x': self.get_x_control_signal(),
'y': self.get_y_control_signal(),
'z': self.get_z_control_signal(),
'rx': self.get_rx_control_signal(),
'ry': self.get_ry_control_signal(),
'rz': self.get_rz_control_signal(),
'gripper': self.get_gripper_control_signal(),
'vel': self.get_vel_control_signal()
f'{prefix}_x': self.get_x_control_signal(),
f'{prefix}_y': self.get_y_control_signal(),
f'{prefix}_z': self.get_z_control_signal(),
f'{prefix}_joint_4': self.get_rx_control_signal(),
f'{prefix}_joint_5': self.get_ry_control_signal(),
f'{prefix}_joint_6': self.get_rz_control_signal(),
f'{prefix}_gripper': self.get_gripper_control_signal(),
f'{prefix}_ratio': self.get_ratio_control_signal(),
'gripper_vel': self.gripper_vel,
'rxyz_vel': self.rxyz_vel,
'xyz_vel': self.xyz_vel
}
@dataclass
class ControllerConfig:
"""控制器配置"""
init_joint: list
init_pose: list
max_gripper: int
min_gripper: int
config_file: str
end_control_info: dict
def parse_init_info(init_info: dict) -> ControllerConfig:
"""解析初始化信息"""
return ControllerConfig(
init_joint=init_info['init_joint'],
init_pose=init_info.get('init_pose', [0]*12),
max_gripper=init_info['max_gripper'],
min_gripper=init_info['min_gripper'],
config_file=init_info['servo_config_file'],
end_control_info=init_info['end_control_info']
)
def get_usb_joystick_info():
"""获取所有 USB 游戏控制器的信息"""
context = pyudev.Context()
devices = []
for device in context.list_devices(subsystem='input', ID_INPUT_JOYSTICK=1):
# 获取设备路径(如 /dev/input/js0
device_path = device.device_node
if device_path:
# 获取 VID、PID 和 UUIDID_SERIAL
vid = device.get('ID_VENDOR_ID')
pid = device.get('ID_MODEL_ID')
uuid = device.get('ID_SERIAL_SHORT') # 设备的唯一标识符
if vid and pid and uuid:
devices.append({
'path': device_path,
'vid': int(vid, 16), # 转换为十六进制
'pid': int(pid, 16), # 转换为十六进制
'uuid': uuid # 设备的唯一标识符
})
return devices
def match_controller_index(joystick_info):
"""将 USB 设备信息与 pygame 的控制器索引匹配"""
# 获取所有 pygame 控制器的设备路径
pygame_joysticks = []
for i in range(pygame.joystick.get_count()):
joystick = pygame.joystick.Joystick(i)
joystick.init()
pygame_joysticks.append({
'index': i,
'name': joystick.get_name(),
'guid': joystick.get_guid()
})
# 匹配设备路径
for usb_device in joystick_info:
for pygame_joystick in pygame_joysticks:
if usb_device['path'] in pygame_joystick['guid']:
usb_device['pygame_index'] = pygame_joystick['index']
break
return joystick_info
if __name__ == "__main__":
# 初始化pygame
pygame.init()
pygame.joystick.init()
# 获取所有 USB 游戏控制器的信息
usb_joysticks = get_usb_joystick_info()
matched_joysticks = match_controller_index(usb_joysticks)
print(usb_joysticks)
print(matched_joysticks)
# stick = FlightStick()
# # stick = XboxStick()
# stick.start_polling()
# try:
# while True:
# # 主程序可以继续执行其他任务
# states = stick.get_raw_states()
# # states = stick.get_control_signal()
# print("当前状态:", states)
# time.sleep(1)
# except KeyboardInterrupt:
# stick.stop_polling()
config = {
'init_joint': {'joint': [-170, 90, 0, 90, 120, 0, 10, 170, 90, 0, -90, 120, 0, 10]},
'init_pose': {},
'max_gripper': {},
'min_gripper': {},
'servo_config_file': {},
'end_control_info': {'left': "0300b14bff1100003708000010010000" , 'right': '0300509d5e040000120b000009050000'}
}
config = parse_init_info(config)
endpose_arm = DummyEndposeMaster(config)
while True:
gamepad_action = {}
xyz = []
for i, controller in enumerate(endpose_arm.controllers):
# states = controller.get_raw_states()
gamepad_action.update(controller.get_control_signal(controller.name))
xyz += [f"{controller.name}_x", f"{controller.name}_y", f"{controller.name}_z"]
time.sleep(1)
print(gamepad_action)

View File

@@ -1,14 +1,8 @@
import pygame
import threading
import time
import serial
import binascii
import logging
import yaml
from typing import Dict
from dataclasses import dataclass
from Robotic_Arm.rm_robot_interface import *
from .servo_server import ServoArmServer
from .gamepad import RealmanAlohaMaster, DummyEndposeMaster
@dataclass
@@ -19,7 +13,7 @@ class ControllerConfig:
max_gripper: int
min_gripper: int
config_file: str
end_controller: str = None
end_control_info: dict
class HybridController:
@@ -27,9 +21,9 @@ class HybridController:
self.config = self._parse_init_info(init_info)
self.joint = self.config.init_joint.copy()
self.pose = self.config.init_pose.copy()
self.joint_control_mode = True
self._initialize_master_arm()
self.joint_arm = RealmanAlohaMaster(self.config)
self.endpose_arm = DummyEndposeMaster(self.config)
def _parse_init_info(self, init_info: dict) -> ControllerConfig:
"""解析初始化信息"""
@@ -39,57 +33,21 @@ class HybridController:
max_gripper=init_info['max_gripper'],
min_gripper=init_info['min_gripper'],
config_file=init_info['servo_config_file'],
end_controller=init_info['end_controller']
end_control_info=init_info['end_control_info']
)
def _initialize_master_arm(self):
"""初始化主控臂"""
try:
self.master_dual_arm = ServoArmServer(self.config.config_file)
except Exception as e:
logging.error(f"初始化主控臂失败: {e}")
raise
def get_action(self) -> Dict:
def get_action(self, state) -> Dict:
"""获取控制动作"""
try:
master_joint_actions = self.master_dual_arm.get_joint_data()
return self._format_action(master_joint_actions)
endpose_action = self.endpose_arm.get_action(state)
return endpose_action
# return self.joint_arm.get_action()
except Exception as e:
logging.error(f"获取动作失败: {e}")
def _format_action(self, master_joint_actions: dict) -> dict:
"""格式化动作数据"""
master_controller_status = {
'left': master_joint_actions['left_controller_status'],
'right': master_joint_actions['right_controller_status']
}
return {
'control_mode': 'joint' if self.joint_control_mode else 'end_pose',
'master_joint_actions': master_joint_actions['dual_joint_actions'],
'left_joint_actions': master_joint_actions['left_joint_actions'][:-1],
'right_joint_actions': master_joint_actions['right_joint_actions'][:-1],
'left_gripper_actions': master_joint_actions['left_joint_actions'][-1], # 修复bug
'right_gripper_actions': master_joint_actions['right_joint_actions'][-1],
'master_controller_status': master_controller_status,
'end_pose': self.pose
}
def switch_control_mode(self):
"""切换控制模式"""
self.joint_control_mode = not self.joint_control_mode
mode = "关节" if self.joint_control_mode else "末端"
print(f"切换到{mode}控制模式")
def stop(self):
"""停止控制器"""
try:
if hasattr(self, 'master_dual_arm') and self.master_dual_arm:
self.master_dual_arm.shutdown()
print("混合控制器已退出")
except Exception as e:
logging.error(f"停止控制器失败: {e}")
self.joint_arm.stop()
def reset(self):
"""重置控制器"""
@@ -97,6 +55,7 @@ class HybridController:
self.pose = self.config.init_pose.copy()
self.joint_control_mode = True
# 使用示例
if __name__ == "__main__":
init_info = {
@@ -104,7 +63,8 @@ if __name__ == "__main__":
'init_pose': [[-0.0305, 0.125938, 0.13153, 3.141, 0.698, -1.57, -0.030486, -0.11487, 0.144707, 3.141, 0.698, 1.57]],
'max_gripper': 990,
'min_gripper': 10,
'servo_config_file': '/home/maic/LYT/lerobot/lerobot/common/robot_devices/teleop/servo_dual.yaml'
'servo_config_file': '/home/maic/LYT/lerobot/lerobot/common/robot_devices/teleop/servo_dual.yaml',
'end_control_info': {'left': '0300b14bff1100003708000010010000', 'right': '030003f05e0400008e02000010010000'}
}
arm_controller = HybridController(init_info)
time.sleep(1)