import threading import numpy as np import xrobotoolkit_sdk as xrt import time import sys from lerobot.teleoperators import Teleoperator from .config import XrConfig from scipy.spatial.transform import Rotation as R import meshcat.transformations as tf """ Xbox类用于检测和监控飞行手柄。 """ class Xr(Teleoperator): config_class = XrConfig name = "xr" def __init__(self, config: XrConfig): self.R_HEADSET_TO_WORLD = np.array( [ [-1, 0, 0], [0, 0, 1], [0, 1, 0], ] ) self.R_HEADSET_TO_WORLD2 = np.array( [ [1, 0, 0], [0, 1, 0], [0, 0, 1], ] ) self.config = config # 控制参数 self.linear_step = 0.0015 # 线性移动步长(m) self.current_linear_step = self.linear_step self.angular_step = 0.0003 # 角度步长(rad) - 从度转换为弧度 self.current_angular_step = self.angular_step # 摇杆死区 self.deadzone = 0.15 # 精细控制模式 self.fine_control_mode = False self.running = False self.data = { "left": { "pose": np.zeros(7), "init_pose":None, "trigger":0.0, "axis":[0.0]*2, "grip":0.0, "axis_button":0.0 }, "right": { "pose": np.zeros(7), "init_pose":None, "trigger":0.0, "grip":0.0, "axis":[0.0]*2, "axis_button":0.0 } } self.scale_factor = 0.2 self.x = 0.0 self.y = 0.0 self.a = 0.0 self.b = 0.0 @property def action_features(self): return { "dtype": "float32", "shape": (7,), "names": {"x.vel":0, "y.vel":1, "z.vel":2, "rx.vel.vel":3, "ry.vel":4, "rz.vel":5, "rw.vel":6, "gripper.vel":7}, } @property def feedback_features(self): return {} @property def is_connected(self): return self.joystick is not None def quaternion2euler(self,quaternion:list): r = R.from_quat(quaternion) euler = r.as_euler('xyz', degrees=True) return euler def update(self): while self.running: self.a = xrt.get_A_button() self.b = xrt.get_B_button() self.x = xrt.get_X_button() self.y = xrt.get_Y_button() self.data['left']['pose'] = xrt.get_left_controller_pose() self.data['left']['trigger'] = xrt.get_left_trigger() self.data['left']['grip'] = xrt.get_left_grip() self.data['left']['axis'] = xrt.get_left_axis() self.data['left']['axis_button'] = xrt.get_left_axis_click() self.data['right']['pose'] = xrt.get_right_controller_pose() self.data['right']['trigger'] = xrt.get_right_trigger() self.data['right']['grip'] = xrt.get_right_grip() self.data['right']['axis'] = xrt.get_right_axis() self.data['right']['axis_button'] = xrt.get_right_axis_click() time.sleep(0.01) def connect(self, calibrate = True): # 初始化vr xrt.init() # 设置超时时间(秒) timeout = 5 # 记录进入循环前的时间 start_time = time.time() elapsed_time_flag = False print("等待vr初始化完成") while True: elapsed_time = time.time() - start_time right_pose = xrt.get_right_controller_pose() left_pose = xrt.get_left_controller_pose() if not all(num == 0 for num in right_pose) and not all(num == 0 for num in left_pose): print(f"connect success") self.running = True threading.Thread(target=self.update).start() break # 检查是否超过超时时间 if elapsed_time > timeout: print("超时....") elapsed_time_flag = True break time.sleep(1) return not elapsed_time_flag @property def is_calibrated(self): return True def calibrate(self): pass def configure(self): pass def get_pose(self, prefix,pose_speeds): data = self.data[prefix] xr_pose = np.array(data['pose']) """Process the current XR controller pose.""" # Get position and orientation controller_xyz = np.array([xr_pose[0], xr_pose[1], xr_pose[2]]) controller_xyz = self.R_HEADSET_TO_WORLD @ controller_xyz controller_quat = R.from_quat(xr_pose[3:]) if data['trigger'] >0 or data['grip']>0: isLock = data['grip']>0 if data['init_pose'] is None: data['init_pose'] = controller_quat data['init_position'] = controller_xyz else: data['init_pose'] = None if data['init_pose'] is None: delta_xyz = np.zeros(3) delta_rot = np.array([0.0, 0.0, 0.0]) else: delta_xyz = (controller_xyz - data['init_position']) * self.scale_factor delta_rot = self.quat_diff_as_angle_axis(controller_quat,data['init_pose']) data['init_pose'] = controller_quat data['init_position'] = controller_xyz if isLock : delta_rot = [0.0,0.0,0.0] else: delta_rot *= self.scale_factor delta_rot = self.R_HEADSET_TO_WORLD2 @ delta_rot print(f"{prefix}:{delta_rot}") pose_speeds[f'{prefix}.x.vel'] = delta_xyz[0] pose_speeds[f'{prefix}.y.vel'] = delta_xyz[1] pose_speeds[f'{prefix}.z.vel'] = delta_xyz[2] pose_speeds[f'{prefix}.rx.vel'] = delta_rot[2] pose_speeds[f'{prefix}.ry.vel'] = delta_rot[1] pose_speeds[f'{prefix}.rz.vel'] = -delta_rot[0] # 设置夹爪开合速度 gripper_mapping = -self.apply_nonlinear_mapping(-data['axis'][0]) pose_speeds[f'{prefix}.gripper.vel'] = gripper_mapping * self.current_angular_step * 2 def quat_diff_as_angle_axis(self,q1: R, q2: R, eps: float = 1e-6) -> np.ndarray: delta_q = q2 * q1.inv() rotvec = delta_q.as_rotvec() angle = np.linalg.norm(rotvec) if angle < eps: return np.zeros(3, dtype=np.float64) return rotvec def get_action(self): pose_speeds = {} """更新末端位姿控制""" # 根据控制模式调整步长 if self.a: # A按钮 self.fine_control_mode = True self.scale_factor = 0.07 if self.b: # B按钮 self.fine_control_mode = False self.scale_factor = 0.2 self.get_pose('left',pose_speeds) self.get_pose('right',pose_speeds) return pose_speeds def apply_nonlinear_mapping(self, value): """应用非线性映射以提高控制精度""" # 保持符号,但对数值应用平方映射以提高精度 value = 0.0 if abs(value) < self.deadzone else value sign = 1 if value >= 0 else -1 return sign * (abs(value) ** 2) def send_feedback(self, feedback): pass def disconnect(self): self.running = False