添加xbox手柄和飞行控制
This commit is contained in:
6
main.py
Normal file
6
main.py
Normal file
@@ -0,0 +1,6 @@
|
||||
def main():
|
||||
print("Hello from lerobot!")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
11
pyproject.toml
Normal file
11
pyproject.toml
Normal file
@@ -0,0 +1,11 @@
|
||||
[project]
|
||||
name = "robot-client"
|
||||
version = "0.1.0"
|
||||
description = "Add your description here"
|
||||
readme = "README.md"
|
||||
requires-python = ">=3.12"
|
||||
dependencies = [
|
||||
"lerobot>=0.4.2",
|
||||
"Robotic_Arm==1.1.3",
|
||||
"pygame==2.6.1",
|
||||
]
|
||||
18
src/robots/realman/config.py
Normal file
18
src/robots/realman/config.py
Normal file
@@ -0,0 +1,18 @@
|
||||
from dataclasses import dataclass, field
|
||||
from lerobot.cameras.configs import CameraConfig
|
||||
from lerobot.cameras.realsense.configuration_realsense import RealSenseCameraConfig
|
||||
from lerobot.motors.motors_bus import Motor, MotorCalibration
|
||||
from lerobot.robots.config import RobotConfig
|
||||
|
||||
@RobotConfig.register_subclass("realman")
|
||||
@dataclass
|
||||
class RealmanRobotConfig(RobotConfig):
|
||||
# Port to connect to the arm
|
||||
port: str
|
||||
gripper_range: list[int] = [10, 990]
|
||||
disable_torque_on_disconnect: bool = True
|
||||
# cameras
|
||||
cameras: dict[str, RealSenseCameraConfig] = field(default_factory=dict)
|
||||
joint: list=field(default_factory=list)
|
||||
motors: dict[str, Motor] = field(default_factory=dict)
|
||||
calibration: dict[str, MotorCalibration] | None = None
|
||||
295
src/robots/realman/motors_bus.py
Normal file
295
src/robots/realman/motors_bus.py
Normal file
@@ -0,0 +1,295 @@
|
||||
from lerobot import Motor, MotorCalibration, MotorsBus, Value
|
||||
import time
|
||||
from typing import Any, Dict
|
||||
from Robotic_Arm.rm_robot_interface import *
|
||||
#动作执行成功
|
||||
ACTION_SUCCESS = 0
|
||||
|
||||
class RealmanMotorsBus(MotorsBus):
|
||||
model_number_table={}
|
||||
model_ctrl_table={
|
||||
"realman":{}
|
||||
}
|
||||
"""
|
||||
对Realman SDK的二次封装
|
||||
"""
|
||||
def __init__(
|
||||
self,
|
||||
port: str,
|
||||
motors:dict[str, Motor],
|
||||
gripper_range:list[int],
|
||||
joint: list,
|
||||
calibration: dict[str, MotorCalibration] | None = None,
|
||||
):
|
||||
super().__init__(port,motors, calibration)
|
||||
self.rmarm = RoboticArm(rm_thread_mode_e.RM_TRIPLE_MODE_E)
|
||||
address = port.split(':')
|
||||
self.handle = self.rmarm.rm_create_robot_arm(address[0], int(address[1]))
|
||||
self.motors = motors
|
||||
self.gripper_range = gripper_range
|
||||
self.init_joint_position = joint
|
||||
self.safe_disable_position = joint
|
||||
self.rmarm.rm_movej(self.init_joint_position[:-1], 5, 0, 0, 1)
|
||||
time.sleep(3)
|
||||
ret = self.rmarm.rm_get_current_arm_state()
|
||||
self.init_pose = ret[1]['pose']
|
||||
|
||||
@property
|
||||
def motor_names(self) -> list[str]:
|
||||
return list(self.motors.keys())
|
||||
|
||||
@property
|
||||
def motor_models(self) -> list[str]:
|
||||
return [model for _, model in self.motors.values()]
|
||||
|
||||
@property
|
||||
def motor_indices(self) -> list[int]:
|
||||
return [idx for idx, _ in self.motors.values()]
|
||||
|
||||
|
||||
def connect(self, enable=True) -> bool:
|
||||
'''
|
||||
使能机械臂并检测使能状态,尝试5s,如果使能超时则退出程序
|
||||
'''
|
||||
enable_flag = False
|
||||
loop_flag = False
|
||||
# 设置超时时间(秒)
|
||||
timeout = 5
|
||||
# 记录进入循环前的时间
|
||||
start_time = time.time()
|
||||
elapsed_time_flag = False
|
||||
|
||||
while not loop_flag:
|
||||
elapsed_time = time.time() - start_time
|
||||
print("--------------------")
|
||||
|
||||
if enable:
|
||||
# 获取机械臂状态
|
||||
ret = self.rmarm.rm_get_current_arm_state()
|
||||
if ret[0] == 0: # 成功获取状态
|
||||
enable_flag = True
|
||||
else:
|
||||
enable_flag = False
|
||||
# 断开所有连接,销毁线程
|
||||
RoboticArm.rm_destory()
|
||||
print("使能状态:", enable_flag)
|
||||
print("--------------------")
|
||||
if(enable_flag == enable):
|
||||
loop_flag = True
|
||||
enable_flag = True
|
||||
else:
|
||||
loop_flag = False
|
||||
enable_flag = False
|
||||
# 检查是否超过超时时间
|
||||
if elapsed_time > timeout:
|
||||
print("超时....")
|
||||
elapsed_time_flag = True
|
||||
enable_flag = True
|
||||
break
|
||||
time.sleep(1)
|
||||
|
||||
resp = enable_flag
|
||||
print(f"Returning response: {resp}")
|
||||
return resp
|
||||
|
||||
def motor_names(self):
|
||||
return
|
||||
|
||||
def read_calibration(self):
|
||||
return
|
||||
def set_calibration(self):
|
||||
return
|
||||
|
||||
def revert_calibration(self):
|
||||
return
|
||||
|
||||
def apply_calibration(self):
|
||||
"""
|
||||
移动到初始位置
|
||||
"""
|
||||
self.write_arm(target_joint=self.init_joint_position)
|
||||
|
||||
def write(self, data_name: str, motor: str, value: Value, *, normalize: bool = True, num_retry: int = 0):
|
||||
return False
|
||||
|
||||
def write_arm(self, target_joint: list):
|
||||
ret = self.rmarm.rm_movej_follow(target_joint[:-1])
|
||||
if ret != 0:
|
||||
print("movej error:", ret[1])
|
||||
return ret
|
||||
|
||||
ret = self.rmarm.rm_set_gripper_position(target_joint[-1], block=False, timeout=2)
|
||||
return ret
|
||||
|
||||
def write_endpose(self, target_endpose: list, gripper: int):
|
||||
self.rmarm.rm_movej_p(target_endpose, 50, 0, 0, 1)
|
||||
self.rmarm.rm_set_gripper_position(gripper, block=False, timeout=2)
|
||||
|
||||
def write_joint_slow(self, target_joint: list):
|
||||
self.rmarm.rm_movej(target_joint, 5, 0, 0, 0)
|
||||
|
||||
def write_joint_canfd(self, target_joint: list):
|
||||
self.rmarm.rm_movej_canfd(target_joint, False)
|
||||
|
||||
def write_endpose_canfd(self, target_pose: list):
|
||||
self.rmarm.rm_movep_canfd(target_pose, False)
|
||||
|
||||
def write_gripper(self, gripper: int):
|
||||
self.rmarm.rm_set_gripper_position(gripper, False, 2)
|
||||
|
||||
def read(self) -> Dict:
|
||||
"""
|
||||
- 机械臂关节消息,单位1度;[-1, 1]
|
||||
- 机械臂夹爪消息,[-1, 1]
|
||||
"""
|
||||
joint_msg = self.rmarm.rm_get_current_arm_state()[1]
|
||||
joint_state = joint_msg['joint']
|
||||
|
||||
gripper_msg = self.rmarm.rm_get_gripper_state()[1]
|
||||
gripper_state = gripper_msg['actpos']
|
||||
|
||||
return {
|
||||
"joint_1": joint_state[0]/180,
|
||||
"joint_2": joint_state[1]/180,
|
||||
"joint_3": joint_state[2]/180,
|
||||
"joint_4": joint_state[3]/180,
|
||||
"joint_5": joint_state[4]/180,
|
||||
"joint_6": joint_state[5]/180,
|
||||
"gripper": (gripper_state-500)/500
|
||||
}
|
||||
|
||||
def read_current_arm_joint_state(self):
|
||||
return self.rmarm.rm_get_current_arm_state()[1]['joint']
|
||||
|
||||
def read_current_arm_endpose_state(self):
|
||||
return self.rmarm.rm_get_current_arm_state()[1]['pose']
|
||||
|
||||
def safe_disconnect(self):
|
||||
"""
|
||||
Move to safe disconnect position
|
||||
"""
|
||||
self.write_arm(target_joint=self.safe_disable_position)
|
||||
# 断开所有连接,销毁线程
|
||||
RoboticArm.rm_destory()
|
||||
|
||||
"""
|
||||
向机械臂写入动作
|
||||
"""
|
||||
def sync_write(self,name,actionData: dict[str, Any]):
|
||||
if name =="Goal_Position":
|
||||
self.write_arm(target_joint=actionData)
|
||||
|
||||
def sync_read(self, data_name, motors = None, *, normalize = True, num_retry = 0):
|
||||
if data_name == "Present_Position":
|
||||
return self.read()
|
||||
|
||||
def _assert_protocol_is_compatible(self, instruction_name):
|
||||
return True
|
||||
|
||||
|
||||
"""
|
||||
配置电机
|
||||
"""
|
||||
def configure_motors(self, return_delay_time=0, maximum_acceleration=254, acceleration=254) -> None:
|
||||
self.rmarm.rm_set_gripper_route(self.gripper_range[0],self.gripper_range[1])
|
||||
|
||||
def disable_torque(self, motors: int | str | list[str] | None = None, num_retry: int = 0):
|
||||
for motor in self._get_motors_list(motors):
|
||||
self.rmarm.rm_set_joint_en_state(motor,0)
|
||||
|
||||
def _disable_torque(self, motor: int, model: str, num_retry: int = 0):
|
||||
self.rmarm.rm_set_joint_en_state(motor,0)
|
||||
|
||||
def enable_torque(self, motors = None, num_retry = 0):
|
||||
for motor in self._get_motors_list(motors):
|
||||
self.rmarm.rm_set_joint_en_state(motor,1)
|
||||
|
||||
@property
|
||||
def is_calibrated(self) -> bool:
|
||||
return True
|
||||
# motors_calibration = self.read_calibration()
|
||||
# if set(motors_calibration) != set(self.calibration):
|
||||
# return False
|
||||
|
||||
# same_ranges = all(
|
||||
# self.calibration[motor].range_min == cal.range_min
|
||||
# and self.calibration[motor].range_max == cal.range_max
|
||||
# for motor, cal in motors_calibration.items()
|
||||
# )
|
||||
# if self.protocol_version == 1:
|
||||
# return same_ranges
|
||||
|
||||
# same_offsets = all(
|
||||
# self.calibration[motor].homing_offset == cal.homing_offset
|
||||
# for motor, cal in motors_calibration.items()
|
||||
# )
|
||||
# return same_ranges and same_offsets
|
||||
|
||||
def write_calibration(self, calibration_dict, cache = True):
|
||||
pass
|
||||
# for motor, calibration in calibration_dict.items():
|
||||
# self.rmarm.rm_set_joint_min_pos(self, motor, calibration.range_min)
|
||||
# self.rmarm.rm_set_joint_max_pos(self, motor, calibration.range_max)
|
||||
# if cache:
|
||||
# self.calibration = calibration_dict
|
||||
|
||||
def _get_half_turn_homings(self, positions):
|
||||
offsets = {}
|
||||
for motor_id, current_pos in positions.items():
|
||||
# 确保输入是角度值
|
||||
current_deg = self._ensure_angle_degrees(current_pos, motor_id)
|
||||
# 核心逻辑:让当前位置变成180°
|
||||
offset = self._calculate_angle_offset(current_deg, motor_id)
|
||||
offsets[motor_id] = offset
|
||||
return offsets
|
||||
|
||||
def _calculate_angle_offset(self, current_deg, motor_id):
|
||||
"""计算角度偏移量"""
|
||||
|
||||
# 获取关节角度范围(从配置或SDK)
|
||||
# min_angle, max_angle = self._get_joint_limits(motor_id)
|
||||
|
||||
# 对于单圈关节,我们想要的目标是范围中点
|
||||
# 但set_half_turn_homings的语义是"变成半圈(180°)"
|
||||
TARGET = 180.0 # 固定目标:180°
|
||||
|
||||
# 计算基本偏移
|
||||
offset = TARGET - current_deg
|
||||
|
||||
# 单圈关节处理:偏移应该在[-180, 180)范围内
|
||||
if motor_id <= 6: # 单圈关节
|
||||
# 优化偏移到最短路径
|
||||
while offset >= 180.0:
|
||||
offset -= 360.0
|
||||
while offset < -180.0:
|
||||
offset += 360.0
|
||||
else: # 多圈关节(关节7)
|
||||
# 多圈关节:只需要考虑在当前圈内的偏移
|
||||
current_in_first_turn = current_deg % 360.0
|
||||
offset = TARGET - current_in_first_turn
|
||||
|
||||
# 同样优化到最短路径
|
||||
if offset >= 180.0:
|
||||
offset -= 360.0
|
||||
elif offset < -180.0:
|
||||
offset += 360.0
|
||||
|
||||
# 保留RealMan精度
|
||||
return round(offset, 3)
|
||||
|
||||
def _find_single_motor(self, motor: str, initial_baudrate: int | None = None) -> tuple[int, int]:
|
||||
return 0,self.motors[motor].id
|
||||
|
||||
def _encode_sign(self, data_name, ids_values):
|
||||
raise NotImplementedError
|
||||
|
||||
def _decode_sign(self, data_name, ids_values):
|
||||
raise NotImplementedError
|
||||
|
||||
def _split_into_byte_chunks(self, value, length):
|
||||
raise NotImplementedError
|
||||
|
||||
def broadcast_ping(self, num_retry = 0, raise_on_error = False):
|
||||
raise NotImplementedError
|
||||
def _handshake(self) -> None:
|
||||
pass
|
||||
145
src/robots/realman/robot.py
Normal file
145
src/robots/realman/robot.py
Normal file
@@ -0,0 +1,145 @@
|
||||
import logging
|
||||
from functools import cached_property
|
||||
import time
|
||||
from typing import Any
|
||||
from lerobot.cameras.utils import make_cameras_from_configs
|
||||
from .motors_bus import RealmanMotorsBus
|
||||
from .config import RealmanRobotConfig
|
||||
from lerobot.robots.robot import Robot
|
||||
from lerobot.robots.utils import ensure_safe_goal_position
|
||||
from lerobot.utils.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
class RealmanRobot(Robot):
|
||||
config_class = RealmanRobotConfig
|
||||
name = "realman"
|
||||
def __init__(self, config: RealmanRobotConfig):
|
||||
super().__init__(config)
|
||||
self.config = config
|
||||
self.bus = RealmanMotorsBus(
|
||||
port=self.config.port,
|
||||
motors=config.motors,
|
||||
joint=config.joint,
|
||||
gripper_range=config.gripper_range,
|
||||
calibration=self.calibration,
|
||||
)
|
||||
self.cameras = make_cameras_from_configs(config.cameras)
|
||||
@property
|
||||
def _motors_ft(self) -> dict[str, type]:
|
||||
return {f"{motor}.pos": float for motor in self.bus.motors}
|
||||
|
||||
@property
|
||||
def _cameras_ft(self) -> dict[str, tuple]:
|
||||
return {
|
||||
cam: (self.config.cameras[cam].height, self.config.cameras[cam].width, 3) for cam in self.cameras
|
||||
}
|
||||
|
||||
@cached_property
|
||||
def observation_features(self) -> dict[str, type | tuple]:
|
||||
return {**self._motors_ft, **self._cameras_ft}
|
||||
|
||||
@cached_property
|
||||
def action_features(self) -> dict[str, type]:
|
||||
return self._motors_ft
|
||||
|
||||
@property
|
||||
def is_connected(self) -> bool:
|
||||
return self.bus.is_connected and all(cam.is_connected for cam in self.cameras.values())
|
||||
|
||||
def connect(self, calibrate: bool = True) -> None:
|
||||
"""
|
||||
We assume that at connection time, arm is in a rest position,
|
||||
and torque can be safely disabled to run calibration.
|
||||
"""
|
||||
if self.is_connected:
|
||||
raise DeviceAlreadyConnectedError(f"{self} already connected")
|
||||
|
||||
self.bus.connect()
|
||||
|
||||
for cam in self.cameras.values():
|
||||
cam.connect()
|
||||
|
||||
if not self.is_calibrated and calibrate:
|
||||
logger.info(
|
||||
"Mismatch between calibration values in the motor and the calibration file or no calibration file found"
|
||||
)
|
||||
self.calibrate()
|
||||
self.configure()
|
||||
logger.info(f"{self} connected.")
|
||||
|
||||
@property
|
||||
def is_calibrated(self) -> bool:
|
||||
return self.bus.is_calibrated
|
||||
|
||||
def calibrate(self) -> None:
|
||||
"""move piper to the home position"""
|
||||
if not self.is_connected:
|
||||
raise ConnectionError()
|
||||
|
||||
self.bus.apply_calibration()
|
||||
|
||||
def configure(self) -> None:
|
||||
with self.bus.torque_disabled():
|
||||
self.bus.configure_motors()
|
||||
|
||||
def setup_motors(self) -> None:
|
||||
pass
|
||||
|
||||
def get_observation(self) -> dict[str, Any]:
|
||||
if not self.is_connected:
|
||||
raise DeviceNotConnectedError(f"{self} is not connected.")
|
||||
|
||||
# Read arm position
|
||||
start = time.perf_counter()
|
||||
obs_dict = self.bus.sync_read("Present_Position")
|
||||
obs_dict = {f"{motor}.pos": val for motor, val in obs_dict.items()}
|
||||
dt_ms = (time.perf_counter() - start) * 1e3
|
||||
logger.debug(f"{self} read state: {dt_ms:.1f}ms")
|
||||
|
||||
# Capture images from cameras
|
||||
for cam_key, cam in self.cameras.items():
|
||||
start = time.perf_counter()
|
||||
obs_dict[cam_key] = cam.async_read()
|
||||
dt_ms = (time.perf_counter() - start) * 1e3
|
||||
logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms")
|
||||
|
||||
return obs_dict
|
||||
|
||||
def send_action(self, action: dict[str, Any]) -> dict[str, Any]:
|
||||
"""Command arm to move to a target joint configuration.
|
||||
|
||||
The relative action magnitude may be clipped depending on the configuration parameter
|
||||
`max_relative_target`. In this case, the action sent differs from original action.
|
||||
Thus, this function always returns the action actually sent.
|
||||
|
||||
Raises:
|
||||
RobotDeviceNotConnectedError: if robot is not connected.
|
||||
|
||||
Returns:
|
||||
the action sent to the motors, potentially clipped.
|
||||
"""
|
||||
if not self.is_connected:
|
||||
raise DeviceNotConnectedError(f"{self} is not connected.")
|
||||
|
||||
goal_pos = {key.removesuffix(".pos"): val for key, val in action.items() if key.endswith(".pos")}
|
||||
|
||||
# Cap goal position when too far away from present position.
|
||||
# /!\ Slower fps expected due to reading from the follower.
|
||||
if self.config.max_relative_target is not None:
|
||||
present_pos = self.bus.sync_read("Present_Position")
|
||||
goal_present_pos = {key: (g_pos, present_pos[key]) for key, g_pos in goal_pos.items()}
|
||||
goal_pos = ensure_safe_goal_position(goal_present_pos, self.config.max_relative_target)
|
||||
|
||||
# Send goal position to the arm
|
||||
self.bus.sync_write("Goal_Position", goal_pos)
|
||||
return {f"{motor}.pos": val for motor, val in goal_pos.items()}
|
||||
|
||||
def disconnect(self):
|
||||
if not self.is_connected:
|
||||
raise DeviceNotConnectedError(f"{self} is not connected.")
|
||||
|
||||
self.bus.disconnect(self.config.disable_torque_on_disconnect)
|
||||
for cam in self.cameras.values():
|
||||
cam.disconnect()
|
||||
|
||||
logger.info(f"{self} disconnected.")
|
||||
0
src/teleoperators/__init__.py
Normal file
0
src/teleoperators/__init__.py
Normal file
2
src/teleoperators/flight_stick/__init__.py
Normal file
2
src/teleoperators/flight_stick/__init__.py
Normal file
@@ -0,0 +1,2 @@
|
||||
from .flight_stick import FlightStick,FlightStickConfig
|
||||
__all__ = ["FlightStick", "FlightStickConfig"]
|
||||
22
src/teleoperators/flight_stick/config.py
Normal file
22
src/teleoperators/flight_stick/config.py
Normal file
@@ -0,0 +1,22 @@
|
||||
from dataclasses import dataclass
|
||||
from lerobot.teleoperators import TeleoperatorConfig
|
||||
@dataclass
|
||||
@TeleoperatorConfig.register_subclass("flight_stick")
|
||||
class FlightStickConfig(TeleoperatorConfig):
|
||||
#控制器索引
|
||||
index: int = 0
|
||||
#定义启动按钮
|
||||
start_button: str = "button_7"
|
||||
#定义精细控制模式切换按钮
|
||||
fine_control_button: str = "button_10"
|
||||
#定义夹爪开按钮
|
||||
gripper_open_button: str = "button_1"
|
||||
#定义夹爪关按钮
|
||||
gripper_close_button: str = "button_0"
|
||||
#定义方向帽子键
|
||||
hat_button: int = 0
|
||||
#RX旋转映射
|
||||
RX_MAP: str = "button_4_5"
|
||||
#RY旋转映射
|
||||
RX_MAP: str = "button_2_3"
|
||||
|
||||
60
src/teleoperators/flight_stick/flight_stick.py
Normal file
60
src/teleoperators/flight_stick/flight_stick.py
Normal file
@@ -0,0 +1,60 @@
|
||||
import pygame
|
||||
from .config import FlightStickConfig
|
||||
from ..xbox import Xbox
|
||||
"""
|
||||
FlightStick类用于检测和监控飞行手柄。
|
||||
"""
|
||||
class FlightStick(Xbox):
|
||||
config_class = FlightStickConfig
|
||||
name = "flight_stick"
|
||||
|
||||
def get_action(self):
|
||||
pygame.event.pump()
|
||||
pose_speeds = [0.0] * 6
|
||||
"""更新末端位姿控制"""
|
||||
# 根据控制模式调整步长
|
||||
current_linear_step = self.linear_step * (0.1 if self.fine_control_mode else 1.0)
|
||||
current_angular_step = self.angular_step * (0.1 if self.fine_control_mode else 1.0)
|
||||
|
||||
# print(f"步长设置 - 线性: {current_linear_step}, 角度: {current_angular_step}")
|
||||
print(f"精细控制模式: {self.fine_control_mode}")
|
||||
# print(f"{self.joystick.get_axis(0)},{self.joystick.get_axis(1)},{self.joystick.get_axis(2)},{self.joystick.get_axis(3)}")
|
||||
# 方向键控制XY
|
||||
hat = self.joystick.get_hat(0)
|
||||
hat_up = hat[1] == 1 # Y+
|
||||
hat_down = hat[1] == -1 # Y-
|
||||
hat_left = hat[0] == -1 # X-
|
||||
hat_right = hat[0] == 1 # X+
|
||||
# 计算各轴速度
|
||||
pose_speeds[0] = -current_linear_step if hat_left else (current_linear_step if hat_right else 0.0) # X
|
||||
pose_speeds[1] = current_linear_step if hat_up else (-current_linear_step if hat_down else 0.0) # Y
|
||||
|
||||
# print(f"方向键状态: up={hat_up}, down={hat_down}, left={hat_left}, right={hat_right}")
|
||||
# 油门控制Z轴
|
||||
gas_axis = -self.joystick.get_axis(2)
|
||||
|
||||
# 设置Z速度
|
||||
z_mapping = self.apply_nonlinear_mapping(gas_axis)
|
||||
# print(f"Z轴非线性映射: {gas_axis} -> {z_mapping}")
|
||||
pose_speeds[2] = z_mapping * current_linear_step # Z
|
||||
|
||||
|
||||
# 主摇杆X轴控制RX旋转
|
||||
# 设置RX旋转速度
|
||||
rx_mapping = self.apply_nonlinear_mapping(-self.joystick.get_axis(1))
|
||||
# print(f"RX轴非线性映射: {x_axis} -> {rx_mapping}")
|
||||
pose_speeds[3] = rx_mapping * current_angular_step * 2 # RX
|
||||
|
||||
# 主摇杆Y轴控制RY旋转
|
||||
# 设置RY旋转速度
|
||||
ry_mapping = -self.apply_nonlinear_mapping(-self.joystick.get_axis(0))
|
||||
# print(f"RY轴非线性映射: {y_axis} -> {ry_mapping}")
|
||||
pose_speeds[4] = ry_mapping * current_angular_step * 2 # RY
|
||||
|
||||
# 主摇杆左右旋转轴 控制RZ旋转
|
||||
# 设置RZ旋转速度
|
||||
rz_mapping = self.apply_nonlinear_mapping(self.joystick.get_axis(3))
|
||||
# print(f"RZ轴非线性映射: {z_axis} -> {rz_mapping}")
|
||||
pose_speeds[5] = rz_mapping * current_angular_step * 2 # RZ
|
||||
|
||||
return pose_speeds
|
||||
2
src/teleoperators/xbox/__init__.py
Normal file
2
src/teleoperators/xbox/__init__.py
Normal file
@@ -0,0 +1,2 @@
|
||||
from .xbox import Xbox,XboxConfig
|
||||
__all__ = ["Xbox", "XboxConfig"]
|
||||
7
src/teleoperators/xbox/config.py
Normal file
7
src/teleoperators/xbox/config.py
Normal file
@@ -0,0 +1,7 @@
|
||||
from dataclasses import dataclass
|
||||
from lerobot.teleoperators import TeleoperatorConfig
|
||||
@dataclass
|
||||
@TeleoperatorConfig.register_subclass("xbox")
|
||||
class XboxConfig(TeleoperatorConfig):
|
||||
#控制器索引
|
||||
index: int = 0
|
||||
140
src/teleoperators/xbox/xbox.py
Normal file
140
src/teleoperators/xbox/xbox.py
Normal file
@@ -0,0 +1,140 @@
|
||||
import pygame
|
||||
import time
|
||||
import sys
|
||||
from lerobot.teleoperators import Teleoperator
|
||||
from .config import XboxConfig
|
||||
"""
|
||||
Xbox类用于检测和监控飞行手柄。
|
||||
"""
|
||||
class Xbox(Teleoperator):
|
||||
config_class = XboxConfig
|
||||
name = "flight_stick"
|
||||
def __init__(self, config: XboxConfig):
|
||||
self.config = config
|
||||
# 控制参数
|
||||
self.linear_step = 0.0015 # 线性移动步长(m)
|
||||
self.angular_step = 0.001 # 角度步长(rad) - 从度转换为弧度
|
||||
# 摇杆死区
|
||||
self.deadzone = 0.15
|
||||
|
||||
# 精细控制模式
|
||||
self.fine_control_mode = False
|
||||
self.reset()
|
||||
def reset(self):
|
||||
self.joystick = None
|
||||
@property
|
||||
def action_features(self):
|
||||
return {
|
||||
"dtype": "float32",
|
||||
"shape": (len(self.pose.keys()),),
|
||||
"names": self.pose.keys(),
|
||||
}
|
||||
|
||||
@property
|
||||
def feedback_features(self):
|
||||
return {}
|
||||
|
||||
@property
|
||||
def is_connected(self):
|
||||
return self.joystick is not None
|
||||
|
||||
def connect(self, calibrate = True):
|
||||
# 初始化pygame
|
||||
pygame.init()
|
||||
pygame.joystick.init()
|
||||
"""检测连接的手柄"""
|
||||
joystick_count = pygame.joystick.get_count()
|
||||
print(f"检测到 {joystick_count} 个手柄设备")
|
||||
|
||||
if joystick_count == 0:
|
||||
print("未检测到任何手柄设备!")
|
||||
return False
|
||||
|
||||
if joystick_count<self.config.index+1:
|
||||
print(f"未检测到配置中指定的索引设备!")
|
||||
return False
|
||||
# 初始化所有检测到的手柄
|
||||
self.joystick = pygame.joystick.Joystick(self.config.index)
|
||||
print(f"已连接设备:{self.joystick.get_name()}")
|
||||
|
||||
@property
|
||||
def is_calibrated(self):
|
||||
return True
|
||||
|
||||
def calibrate(self):
|
||||
pass
|
||||
|
||||
def configure(self):
|
||||
pass
|
||||
|
||||
def get_action(self):
|
||||
pygame.event.pump()
|
||||
pose_speeds = [0.0] * 6
|
||||
"""更新末端位姿控制"""
|
||||
# 根据控制模式调整步长
|
||||
current_linear_step = self.linear_step * (0.1 if self.fine_control_mode else 1.0)
|
||||
current_angular_step = self.angular_step * (0.1 if self.fine_control_mode else 1.0)
|
||||
if self.joystick.get_button(0): # A按钮
|
||||
self.fine_control_mode = True
|
||||
if self.joystick.get_button(1): # B按钮
|
||||
self.fine_control_mode = False
|
||||
|
||||
# print(f"步长设置 - 线性: {current_linear_step}, 角度: {current_angular_step}")
|
||||
print(f"精细控制模式: {self.fine_control_mode}")
|
||||
|
||||
# 方向键控制XY
|
||||
hat = self.joystick.get_hat(0)
|
||||
hat_up = hat[1] == 1 # Y+
|
||||
hat_down = hat[1] == -1 # Y-
|
||||
hat_left = hat[0] == -1 # X-
|
||||
hat_right = hat[0] == 1 # X+
|
||||
# print(f"方向键状态: up={hat_up}, down={hat_down}, left={hat_left}, right={hat_right}")
|
||||
# 计算各轴速度
|
||||
pose_speeds[0] = -current_linear_step if hat_left else (current_linear_step if hat_right else 0.0) # X
|
||||
pose_speeds[1] = current_linear_step if hat_up else (-current_linear_step if hat_down else 0.0) # Y
|
||||
|
||||
# 右摇杆X轴
|
||||
z_mapping = self.apply_nonlinear_mapping(self.joystick.get_axis(1))
|
||||
print(f"Z轴非线性映射: {self.joystick.get_axis(1)} -> {z_mapping}")
|
||||
pose_speeds[2] = z_mapping * current_angular_step * 2 # RY
|
||||
|
||||
#左摇杆X轴控制RX旋转
|
||||
# 设置RX旋转速度
|
||||
rx_mapping = self.apply_nonlinear_mapping(self.joystick.get_axis(2))
|
||||
# print(f"RX轴非线性映射: {x_axis} -> {rx_mapping}")
|
||||
pose_speeds[3] = rx_mapping * current_angular_step * 2 # RX
|
||||
|
||||
# 左摇杆Y轴控制RY旋转
|
||||
# 设置RY旋转速度
|
||||
ry_mapping = -self.apply_nonlinear_mapping(-self.joystick.get_axis(3))
|
||||
# print(f"RY轴非线性映射: {y_axis} -> {ry_mapping}")
|
||||
pose_speeds[4] = ry_mapping * current_angular_step * 2 # RY
|
||||
|
||||
# 左右扳机控制RZ旋转
|
||||
# 设置RZ旋转速度
|
||||
right_tiger = ((self.joystick.get_axis(5)+1)/2)
|
||||
left_tiger = ((self.joystick.get_axis(4)+1)/2)
|
||||
print(f"左右扳机: {left_tiger} {right_tiger}")
|
||||
rz_mapping = self.apply_nonlinear_mapping(left_tiger - right_tiger)
|
||||
# print(f"RZ轴非线性映射: {z_axis} -> {rz_mapping}")
|
||||
pose_speeds[5] = rz_mapping * current_angular_step * 2 # RZ
|
||||
|
||||
return pose_speeds
|
||||
|
||||
def apply_nonlinear_mapping(self, value):
|
||||
"""应用非线性映射以提高控制精度"""
|
||||
# 保持符号,但对数值应用平方映射以提高精度
|
||||
value = 0.0 if abs(value) < self.deadzone else value
|
||||
sign = 1 if value >= 0 else -1
|
||||
return sign * (abs(value) ** 2)
|
||||
def normalize_value(self, v1,v2):
|
||||
"""把两个轴归一化到一个轴上"""
|
||||
return v1-v2
|
||||
|
||||
def send_feedback(self, feedback):
|
||||
pass
|
||||
|
||||
def disconnect(self):
|
||||
self.reset()
|
||||
pygame.joystick.quit()
|
||||
pygame.quit()
|
||||
29
src/test.py
Normal file
29
src/test.py
Normal file
@@ -0,0 +1,29 @@
|
||||
import time
|
||||
from teleoperators.flight_stick import FlightStick,FlightStickConfig
|
||||
from teleoperators.xbox import Xbox,XboxConfig
|
||||
|
||||
def FlightStickTest():
|
||||
config = FlightStickConfig()
|
||||
config.index = 0
|
||||
flightStick = FlightStick(config)
|
||||
flightStick.connect()
|
||||
while True:
|
||||
data = flightStick.get_action()
|
||||
print(data)
|
||||
time.sleep(1)
|
||||
"""
|
||||
xbox手柄测试
|
||||
"""
|
||||
def XboxTest():
|
||||
config = XboxConfig()
|
||||
config.index = 0
|
||||
xbox = Xbox(config)
|
||||
xbox.connect()
|
||||
while True:
|
||||
data = xbox.get_action()
|
||||
print(data)
|
||||
time.sleep(1)
|
||||
|
||||
if __name__ == '__main__':
|
||||
XboxTest()
|
||||
|
||||
Reference in New Issue
Block a user