forked from tangger/lerobot
some bug still
This commit is contained in:
@@ -58,7 +58,7 @@ def log_control_info(robot: Robot, dt_s, episode_index=None, frame_index=None, f
|
|||||||
log_dt("dt", dt_s)
|
log_dt("dt", dt_s)
|
||||||
|
|
||||||
# TODO(aliberts): move robot-specific logs logic in robot.print_logs()
|
# TODO(aliberts): move robot-specific logs logic in robot.print_logs()
|
||||||
if not robot.robot_type.startswith("stretch"):
|
if not robot.robot_type.startswith(("stretch", "realman")):
|
||||||
for name in robot.leader_arms:
|
for name in robot.leader_arms:
|
||||||
key = f"read_leader_{name}_pos_dt_s"
|
key = f"read_leader_{name}_pos_dt_s"
|
||||||
if key in robot.logs:
|
if key in robot.logs:
|
||||||
|
|||||||
@@ -39,3 +39,12 @@ class FeetechMotorsBusConfig(MotorsBusConfig):
|
|||||||
port: str
|
port: str
|
||||||
motors: dict[str, tuple[int, str]]
|
motors: dict[str, tuple[int, str]]
|
||||||
mock: bool = False
|
mock: bool = False
|
||||||
|
|
||||||
|
|
||||||
|
@MotorsBusConfig.register_subclass("realman")
|
||||||
|
@dataclass
|
||||||
|
class RealmanMotorsBusConfig(MotorsBusConfig):
|
||||||
|
ip: str
|
||||||
|
port: int
|
||||||
|
motors: dict[str, tuple[int, str]]
|
||||||
|
init_joint: dict[str, list]
|
||||||
124
lerobot/common/robot_devices/motors/realman.py
Normal file
124
lerobot/common/robot_devices/motors/realman.py
Normal file
@@ -0,0 +1,124 @@
|
|||||||
|
import time
|
||||||
|
from typing import Dict
|
||||||
|
from lerobot.common.robot_devices.motors.configs import RealmanMotorsBusConfig
|
||||||
|
from Robotic_Arm.rm_robot_interface import *
|
||||||
|
|
||||||
|
|
||||||
|
class RealmanMotorsBus:
|
||||||
|
"""
|
||||||
|
对Realman SDK的二次封装
|
||||||
|
"""
|
||||||
|
def __init__(self,
|
||||||
|
config: RealmanMotorsBusConfig):
|
||||||
|
self.rmarm = RoboticArm(rm_thread_mode_e.RM_TRIPLE_MODE_E)
|
||||||
|
self.handle = self.rmarm.rm_create_robot_arm(config.ip, config.port)
|
||||||
|
self.motors = config.motors
|
||||||
|
self.init_joint_position = config.init_joint['joint'] # [6 joints + 1 gripper]
|
||||||
|
self.safe_disable_position = config.init_joint['joint']
|
||||||
|
|
||||||
|
@property
|
||||||
|
def motor_names(self) -> list[str]:
|
||||||
|
return list(self.motors.keys())
|
||||||
|
|
||||||
|
@property
|
||||||
|
def motor_models(self) -> list[str]:
|
||||||
|
return [model for _, model in self.motors.values()]
|
||||||
|
|
||||||
|
@property
|
||||||
|
def motor_indices(self) -> list[int]:
|
||||||
|
return [idx for idx, _ in self.motors.values()]
|
||||||
|
|
||||||
|
|
||||||
|
def connect(self, enable=True) -> bool:
|
||||||
|
'''
|
||||||
|
使能机械臂并检测使能状态,尝试5s,如果使能超时则退出程序
|
||||||
|
'''
|
||||||
|
enable_flag = False
|
||||||
|
loop_flag = False
|
||||||
|
# 设置超时时间(秒)
|
||||||
|
timeout = 5
|
||||||
|
# 记录进入循环前的时间
|
||||||
|
start_time = time.time()
|
||||||
|
elapsed_time_flag = False
|
||||||
|
|
||||||
|
while not loop_flag:
|
||||||
|
elapsed_time = time.time() - start_time
|
||||||
|
print("--------------------")
|
||||||
|
|
||||||
|
if enable:
|
||||||
|
# 获取机械臂状态
|
||||||
|
ret = self.rmarm.rm_get_current_arm_state()
|
||||||
|
if ret[0] == 0: # 成功获取状态
|
||||||
|
enable_flag = True
|
||||||
|
else:
|
||||||
|
enable_flag = False
|
||||||
|
# 断开所有连接,销毁线程
|
||||||
|
RoboticArm.rm_destory()
|
||||||
|
print("使能状态:", enable_flag)
|
||||||
|
print("--------------------")
|
||||||
|
if(enable_flag == enable):
|
||||||
|
loop_flag = True
|
||||||
|
enable_flag = True
|
||||||
|
else:
|
||||||
|
loop_flag = False
|
||||||
|
enable_flag = False
|
||||||
|
# 检查是否超过超时时间
|
||||||
|
if elapsed_time > timeout:
|
||||||
|
print("超时....")
|
||||||
|
elapsed_time_flag = True
|
||||||
|
enable_flag = True
|
||||||
|
break
|
||||||
|
time.sleep(1)
|
||||||
|
|
||||||
|
resp = enable_flag
|
||||||
|
print(f"Returning response: {resp}")
|
||||||
|
return resp
|
||||||
|
|
||||||
|
def motor_names(self):
|
||||||
|
return
|
||||||
|
|
||||||
|
def set_calibration(self):
|
||||||
|
return
|
||||||
|
|
||||||
|
def revert_calibration(self):
|
||||||
|
return
|
||||||
|
|
||||||
|
def apply_calibration(self):
|
||||||
|
"""
|
||||||
|
移动到初始位置
|
||||||
|
"""
|
||||||
|
self.write(target_joint=self.init_joint_position)
|
||||||
|
|
||||||
|
def write(self, target_joint:list):
|
||||||
|
self.rmarm.rm_movej(target_joint[:-1], 50, 0, 0, 1)
|
||||||
|
self.rmarm.rm_set_gripper_position(target_joint[-1], block=False, timeout=2)
|
||||||
|
|
||||||
|
|
||||||
|
def read(self) -> Dict:
|
||||||
|
"""
|
||||||
|
- 机械臂关节消息,单位1度;[-1, 1]
|
||||||
|
- 机械臂夹爪消息,[-1, 1]
|
||||||
|
"""
|
||||||
|
joint_msg = self.rmarm.rm_get_current_arm_state()[1]
|
||||||
|
joint_state = joint_msg['joint']
|
||||||
|
|
||||||
|
gripper_msg = self.rmarm.rm_get_gripper_state()[1]
|
||||||
|
gripper_state = gripper_msg['actpos']
|
||||||
|
|
||||||
|
return {
|
||||||
|
"joint_1": joint_state[0]/180,
|
||||||
|
"joint_2": joint_state[1]/180,
|
||||||
|
"joint_3": joint_state[2]/180,
|
||||||
|
"joint_4": joint_state[3]/180,
|
||||||
|
"joint_5": joint_state[4]/180,
|
||||||
|
"joint_6": joint_state[5]/180,
|
||||||
|
"gripper": (gripper_state-500)/500
|
||||||
|
}
|
||||||
|
|
||||||
|
def safe_disconnect(self):
|
||||||
|
"""
|
||||||
|
Move to safe disconnect position
|
||||||
|
"""
|
||||||
|
self.write(target_joint=self.safe_disable_position)
|
||||||
|
# 断开所有连接,销毁线程
|
||||||
|
RoboticArm.rm_destory()
|
||||||
@@ -44,6 +44,11 @@ def make_motors_buses_from_configs(motors_bus_configs: dict[str, MotorsBusConfig
|
|||||||
|
|
||||||
motors_buses[key] = FeetechMotorsBus(cfg)
|
motors_buses[key] = FeetechMotorsBus(cfg)
|
||||||
|
|
||||||
|
elif cfg.type == "realman":
|
||||||
|
from lerobot.common.robot_devices.motors.realman import RealmanMotorsBus
|
||||||
|
|
||||||
|
motors_buses[key] = RealmanMotorsBus(cfg)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
raise ValueError(f"The motor type '{cfg.type}' is not valid.")
|
raise ValueError(f"The motor type '{cfg.type}' is not valid.")
|
||||||
|
|
||||||
@@ -65,3 +70,7 @@ def make_motors_bus(motor_type: str, **kwargs) -> MotorsBus:
|
|||||||
|
|
||||||
else:
|
else:
|
||||||
raise ValueError(f"The motor type '{motor_type}' is not valid.")
|
raise ValueError(f"The motor type '{motor_type}' is not valid.")
|
||||||
|
|
||||||
|
|
||||||
|
def get_motor_names(arm: dict[str, MotorsBus]) -> list:
|
||||||
|
return [f"{arm}_{motor}" for arm, bus in arm.items() for motor in bus.motors]
|
||||||
@@ -27,6 +27,7 @@ from lerobot.common.robot_devices.motors.configs import (
|
|||||||
DynamixelMotorsBusConfig,
|
DynamixelMotorsBusConfig,
|
||||||
FeetechMotorsBusConfig,
|
FeetechMotorsBusConfig,
|
||||||
MotorsBusConfig,
|
MotorsBusConfig,
|
||||||
|
RealmanMotorsBusConfig
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@@ -674,3 +675,86 @@ class LeKiwiRobotConfig(RobotConfig):
|
|||||||
)
|
)
|
||||||
|
|
||||||
mock: bool = False
|
mock: bool = False
|
||||||
|
|
||||||
|
|
||||||
|
@RobotConfig.register_subclass("realman")
|
||||||
|
@dataclass
|
||||||
|
class RealmanRobotConfig(RobotConfig):
|
||||||
|
inference_time: bool = False
|
||||||
|
max_gripper: int = 990
|
||||||
|
min_gripper: int = 10
|
||||||
|
servo_config_file: str = "/home/maic/LYT/lerobot/realman_src/realman_aloha/shadow_rm_robot/config/servo_arm.yaml"
|
||||||
|
|
||||||
|
|
||||||
|
left_follower_arm: dict[str, MotorsBusConfig] = field(
|
||||||
|
default_factory=lambda: {
|
||||||
|
"main": RealmanMotorsBusConfig(
|
||||||
|
ip = "192.168.3.18",
|
||||||
|
port = 8080,
|
||||||
|
motors={
|
||||||
|
# name: (index, model)
|
||||||
|
"joint_1": [1, "realman"],
|
||||||
|
"joint_2": [2, "realman"],
|
||||||
|
"joint_3": [3, "realman"],
|
||||||
|
"joint_4": [4, "realman"],
|
||||||
|
"joint_5": [5, "realman"],
|
||||||
|
"joint_6": [6, "realman"],
|
||||||
|
"gripper": [7, "realman"],
|
||||||
|
},
|
||||||
|
init_joint = {'joint': [-90, 90, 90, 170, 90, -90, 1000]}
|
||||||
|
)
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
cameras: dict[str, CameraConfig] = field(
|
||||||
|
default_factory=lambda: {
|
||||||
|
"one": OpenCVCameraConfig(
|
||||||
|
camera_index=4,
|
||||||
|
fps=30,
|
||||||
|
width=640,
|
||||||
|
height=480,
|
||||||
|
),
|
||||||
|
# "two": IntelRealSenseCameraConfig(
|
||||||
|
# camera_index=2,
|
||||||
|
# fps=30,
|
||||||
|
# width=640,
|
||||||
|
# height=480,
|
||||||
|
# ),
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
# right_follower_arm: dict[str, MotorsBusConfig] = field(
|
||||||
|
# default_factory=lambda: {
|
||||||
|
# "main": RealmanMotorsBusConfig(
|
||||||
|
# ip = "192.168.3.19",
|
||||||
|
# port = 8080,
|
||||||
|
# motors={
|
||||||
|
# # name: (index, model)
|
||||||
|
# "joint_1": [1, "realman"],
|
||||||
|
# "joint_2": [2, "realman"],
|
||||||
|
# "joint_3": [3, "realman"],
|
||||||
|
# "joint_4": [4, "realman"],
|
||||||
|
# "joint_5": [5, "realman"],
|
||||||
|
# "joint_6": [6, "realman"],
|
||||||
|
# "gripper": (7, "realman"),
|
||||||
|
# },
|
||||||
|
# )
|
||||||
|
# }
|
||||||
|
# )
|
||||||
|
|
||||||
|
# cameras: dict[str, CameraConfig] = field(
|
||||||
|
# default_factory=lambda: {
|
||||||
|
# "one": OpenCVCameraConfig(
|
||||||
|
# camera_index=0,
|
||||||
|
# fps=30,
|
||||||
|
# width=640,
|
||||||
|
# height=480,
|
||||||
|
# ),
|
||||||
|
# "two": OpenCVCameraConfig(
|
||||||
|
# camera_index=2,
|
||||||
|
# fps=30,
|
||||||
|
# width=640,
|
||||||
|
# height=480,
|
||||||
|
# ),
|
||||||
|
# }
|
||||||
|
# )
|
||||||
|
|||||||
313
lerobot/common/robot_devices/robots/realman.py
Normal file
313
lerobot/common/robot_devices/robots/realman.py
Normal file
@@ -0,0 +1,313 @@
|
|||||||
|
"""
|
||||||
|
Teleoperation Realman with a PS5 controller and
|
||||||
|
"""
|
||||||
|
|
||||||
|
import time
|
||||||
|
import torch
|
||||||
|
import numpy as np
|
||||||
|
from dataclasses import dataclass, field, replace
|
||||||
|
from collections import deque
|
||||||
|
from lerobot.common.robot_devices.teleop.gamepad import HybridController
|
||||||
|
from lerobot.common.robot_devices.motors.utils import get_motor_names, make_motors_buses_from_configs
|
||||||
|
from lerobot.common.robot_devices.cameras.utils import make_cameras_from_configs
|
||||||
|
from lerobot.common.robot_devices.utils import RobotDeviceAlreadyConnectedError, RobotDeviceNotConnectedError
|
||||||
|
from lerobot.common.robot_devices.robots.configs import RealmanRobotConfig
|
||||||
|
|
||||||
|
class RealmanRobot:
|
||||||
|
def __init__(self, config: RealmanRobotConfig | None = None, **kwargs):
|
||||||
|
if config is None:
|
||||||
|
config = RealmanRobotConfig()
|
||||||
|
# Overwrite config arguments using kwargs
|
||||||
|
self.config = replace(config, **kwargs)
|
||||||
|
self.robot_type = self.config.type
|
||||||
|
self.inference_time = self.config.inference_time # if it is inference time
|
||||||
|
|
||||||
|
# build cameras
|
||||||
|
self.cameras = make_cameras_from_configs(self.config.cameras)
|
||||||
|
|
||||||
|
# build realman motors
|
||||||
|
self.piper_motors = make_motors_buses_from_configs(self.config.left_follower_arm)
|
||||||
|
self.arm = self.piper_motors['main']
|
||||||
|
self.arm.rmarm.rm_movej(self.arm.init_joint_position[:-1], 50, 0, 0, 0)
|
||||||
|
ret = self.arm.rmarm.rm_get_current_arm_state()
|
||||||
|
init_pose = ret[1]['pose']
|
||||||
|
|
||||||
|
|
||||||
|
# build state-action cache
|
||||||
|
self.joint_queue = deque(maxlen=2)
|
||||||
|
|
||||||
|
# build gamepad teleop
|
||||||
|
if not self.inference_time:
|
||||||
|
# build init teleop info
|
||||||
|
self.init_info = {
|
||||||
|
'init_joint': self.arm.init_joint_position,
|
||||||
|
'init_pose': init_pose,
|
||||||
|
'max_gripper': config.max_gripper,
|
||||||
|
'min_gripper': config.min_gripper,
|
||||||
|
'servo_config_file': config.servo_config_file
|
||||||
|
}
|
||||||
|
self.teleop = HybridController(self.init_info)
|
||||||
|
else:
|
||||||
|
self.teleop = None
|
||||||
|
|
||||||
|
self.logs = {}
|
||||||
|
self.is_connected = False
|
||||||
|
|
||||||
|
@property
|
||||||
|
def camera_features(self) -> dict:
|
||||||
|
cam_ft = {}
|
||||||
|
for cam_key, cam in self.cameras.items():
|
||||||
|
key = f"observation.images.{cam_key}"
|
||||||
|
cam_ft[key] = {
|
||||||
|
"shape": (cam.height, cam.width, cam.channels),
|
||||||
|
"names": ["height", "width", "channels"],
|
||||||
|
"info": None,
|
||||||
|
}
|
||||||
|
return cam_ft
|
||||||
|
|
||||||
|
|
||||||
|
@property
|
||||||
|
def motor_features(self) -> dict:
|
||||||
|
action_names = get_motor_names(self.piper_motors)
|
||||||
|
state_names = get_motor_names(self.piper_motors)
|
||||||
|
return {
|
||||||
|
"action": {
|
||||||
|
"dtype": "float32",
|
||||||
|
"shape": (len(action_names),),
|
||||||
|
"names": action_names,
|
||||||
|
},
|
||||||
|
"observation.state": {
|
||||||
|
"dtype": "float32",
|
||||||
|
"shape": (len(state_names),),
|
||||||
|
"names": state_names,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
@property
|
||||||
|
def has_camera(self):
|
||||||
|
return len(self.cameras) > 0
|
||||||
|
|
||||||
|
@property
|
||||||
|
def num_cameras(self):
|
||||||
|
return len(self.cameras)
|
||||||
|
|
||||||
|
|
||||||
|
def connect(self) -> None:
|
||||||
|
"""Connect RealmanArm and cameras"""
|
||||||
|
if self.is_connected:
|
||||||
|
raise RobotDeviceAlreadyConnectedError(
|
||||||
|
"RealmanArm is already connected. Do not run `robot.connect()` twice."
|
||||||
|
)
|
||||||
|
|
||||||
|
# connect RealmanArm
|
||||||
|
self.arm.connect(enable=True)
|
||||||
|
print("RealmanArm conneted")
|
||||||
|
|
||||||
|
# connect cameras
|
||||||
|
for name in self.cameras:
|
||||||
|
self.cameras[name].connect()
|
||||||
|
self.is_connected = self.is_connected and self.cameras[name].is_connected
|
||||||
|
print(f"camera {name} conneted")
|
||||||
|
|
||||||
|
print("All connected")
|
||||||
|
self.is_connected = True
|
||||||
|
|
||||||
|
self.run_calibration()
|
||||||
|
|
||||||
|
|
||||||
|
def disconnect(self) -> None:
|
||||||
|
"""move to home position, disenable piper and cameras"""
|
||||||
|
# move piper to home position, disable
|
||||||
|
if not self.inference_time:
|
||||||
|
self.teleop.stop()
|
||||||
|
|
||||||
|
# disconnect piper
|
||||||
|
self.arm.safe_disconnect()
|
||||||
|
print("RealmanArm disable after 5 seconds")
|
||||||
|
time.sleep(5)
|
||||||
|
self.arm.connect(enable=False)
|
||||||
|
|
||||||
|
# disconnect cameras
|
||||||
|
if len(self.cameras) > 0:
|
||||||
|
for cam in self.cameras.values():
|
||||||
|
cam.disconnect()
|
||||||
|
|
||||||
|
self.is_connected = False
|
||||||
|
|
||||||
|
|
||||||
|
def run_calibration(self):
|
||||||
|
"""move piper to the home position"""
|
||||||
|
if not self.is_connected:
|
||||||
|
raise ConnectionError()
|
||||||
|
|
||||||
|
self.arm.apply_calibration()
|
||||||
|
if not self.inference_time:
|
||||||
|
self.teleop.reset()
|
||||||
|
|
||||||
|
|
||||||
|
def teleop_step(
|
||||||
|
self, record_data=False
|
||||||
|
) -> None | tuple[dict[str, torch.Tensor], dict[str, torch.Tensor]]:
|
||||||
|
if not self.is_connected:
|
||||||
|
raise ConnectionError()
|
||||||
|
|
||||||
|
if self.teleop is None and self.inference_time:
|
||||||
|
self.teleop = HybridController(self.init_info)
|
||||||
|
|
||||||
|
# read target pose state as
|
||||||
|
before_read_t = time.perf_counter()
|
||||||
|
state = self.arm.read() # read current joint position from robot
|
||||||
|
action = self.teleop.get_action() # target joint position and pose end pos from gamepad
|
||||||
|
self.logs["read_pos_dt_s"] = time.perf_counter() - before_read_t
|
||||||
|
# print('gripper', action['gripper'])
|
||||||
|
|
||||||
|
if action['control_mode'] == 'joint':
|
||||||
|
# 关节控制模式(主模式)
|
||||||
|
ret = self.arm.rmarm.rm_get_current_arm_state()
|
||||||
|
current_pose = ret[1]['pose']
|
||||||
|
self.teleop.update_endpose_state(current_pose)
|
||||||
|
target_joints = action['joint_angles']
|
||||||
|
|
||||||
|
# do action
|
||||||
|
before_write_t = time.perf_counter()
|
||||||
|
self.joint_queue.append(list(self.arm.read().values()))
|
||||||
|
self.logs["write_pos_dt_s"] = time.perf_counter() - before_write_t
|
||||||
|
|
||||||
|
else:
|
||||||
|
# 末端位姿控制模式
|
||||||
|
target_pose = [
|
||||||
|
action['end_pose']['X'], # X (m)
|
||||||
|
action['end_pose']['Y'], # Y (m)
|
||||||
|
action['end_pose']['Z'], # Z (m)
|
||||||
|
action['end_pose']['RX'], # RX (rad)
|
||||||
|
action['end_pose']['RY'], # RY (rad)
|
||||||
|
action['end_pose']['RZ'] # RZ (rad)
|
||||||
|
]
|
||||||
|
|
||||||
|
# do action
|
||||||
|
before_write_t = time.perf_counter()
|
||||||
|
# 使用笛卡尔空间运动控制末端位姿
|
||||||
|
if action['tozero']:
|
||||||
|
self.arm.rmarm.rm_movej(self.arm.init_joint_position[:-1], 50, 0, 0, 0)
|
||||||
|
ret = self.arm.rmarm.rm_get_current_arm_state()
|
||||||
|
target_joints = ret[1].get('joint', self.arm.init_joint_position)
|
||||||
|
current_pose = ret[1]['pose']
|
||||||
|
self.teleop.update_endpose_state(current_pose)
|
||||||
|
self.teleop.update_joint_state(target_joints)
|
||||||
|
self.teleop.update_tozero_state(False)
|
||||||
|
else:
|
||||||
|
result = self.arm.rmarm.rm_movej_p(target_pose, 50, 0, 0, 0)
|
||||||
|
# 夹爪控制
|
||||||
|
self.arm.rmarm.rm_set_gripper_position(action['gripper'], False, 2)
|
||||||
|
ret = self.arm.rmarm.rm_get_current_arm_state()
|
||||||
|
target_joints = ret[1].get('joint', self.arm.init_joint_position)
|
||||||
|
self.joint_queue.append(list(self.arm.read().values()))
|
||||||
|
self.teleop.update_joint_state(target_joints)
|
||||||
|
self.logs["write_pos_dt_s"] = time.perf_counter() - before_write_t
|
||||||
|
|
||||||
|
print('-'*80)
|
||||||
|
print('mode: ', action['control_mode'])
|
||||||
|
print('state: ', list(state.values()))
|
||||||
|
print('action: ', target_joints)
|
||||||
|
print('cache[0]: ', self.joint_queue[0])
|
||||||
|
print('cache[-1]: ', self.joint_queue[-1])
|
||||||
|
print('-'*80)
|
||||||
|
# time.sleep(1)
|
||||||
|
if not record_data:
|
||||||
|
return
|
||||||
|
|
||||||
|
state = torch.as_tensor(list(self.joint_queue[0]), dtype=torch.float32)
|
||||||
|
action = torch.as_tensor(list(self.joint_queue[-1]), dtype=torch.float32)
|
||||||
|
|
||||||
|
# Capture images from cameras
|
||||||
|
images = {}
|
||||||
|
for name in self.cameras:
|
||||||
|
before_camread_t = time.perf_counter()
|
||||||
|
images[name] = self.cameras[name].async_read()
|
||||||
|
images[name] = torch.from_numpy(images[name])
|
||||||
|
self.logs[f"read_camera_{name}_dt_s"] = self.cameras[name].logs["delta_timestamp_s"]
|
||||||
|
self.logs[f"async_read_camera_{name}_dt_s"] = time.perf_counter() - before_camread_t
|
||||||
|
|
||||||
|
# Populate output dictionnaries
|
||||||
|
obs_dict, action_dict = {}, {}
|
||||||
|
obs_dict["observation.state"] = state
|
||||||
|
action_dict["action"] = action
|
||||||
|
for name in self.cameras:
|
||||||
|
obs_dict[f"observation.images.{name}"] = images[name]
|
||||||
|
|
||||||
|
return obs_dict, action_dict
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
def send_action(self, action: torch.Tensor) -> torch.Tensor:
|
||||||
|
"""Write the predicted actions from policy to the motors"""
|
||||||
|
if not self.is_connected:
|
||||||
|
raise RobotDeviceNotConnectedError(
|
||||||
|
"Piper is not connected. You need to run `robot.connect()`."
|
||||||
|
)
|
||||||
|
|
||||||
|
# send to motors, torch to list
|
||||||
|
target_joints = action.tolist()
|
||||||
|
len_joint = len(target_joints) - 1
|
||||||
|
target_joints = [target_joints[i]*180 for i in range(len_joint)] + [target_joints[-1]]
|
||||||
|
target_joints[-1] = int(target_joints[-1]*500 + 500)
|
||||||
|
self.arm.write(target_joints)
|
||||||
|
|
||||||
|
return action
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
def capture_observation(self) -> dict:
|
||||||
|
"""capture current images and joint positions"""
|
||||||
|
if not self.is_connected:
|
||||||
|
raise RobotDeviceNotConnectedError(
|
||||||
|
"Piper is not connected. You need to run `robot.connect()`."
|
||||||
|
)
|
||||||
|
|
||||||
|
# read current joint positions
|
||||||
|
before_read_t = time.perf_counter()
|
||||||
|
state = self.arm.read() # 6 joints + 1 gripper
|
||||||
|
self.logs["read_pos_dt_s"] = time.perf_counter() - before_read_t
|
||||||
|
|
||||||
|
state = torch.as_tensor(list(state.values()), dtype=torch.float32)
|
||||||
|
|
||||||
|
# read images from cameras
|
||||||
|
images = {}
|
||||||
|
for name in self.cameras:
|
||||||
|
before_camread_t = time.perf_counter()
|
||||||
|
images[name] = self.cameras[name].async_read()
|
||||||
|
images[name] = torch.from_numpy(images[name])
|
||||||
|
self.logs[f"read_camera_{name}_dt_s"] = self.cameras[name].logs["delta_timestamp_s"]
|
||||||
|
self.logs[f"async_read_camera_{name}_dt_s"] = time.perf_counter() - before_camread_t
|
||||||
|
|
||||||
|
# Populate output dictionnaries and format to pytorch
|
||||||
|
obs_dict = {}
|
||||||
|
obs_dict["observation.state"] = state
|
||||||
|
for name in self.cameras:
|
||||||
|
obs_dict[f"observation.images.{name}"] = images[name]
|
||||||
|
return obs_dict
|
||||||
|
|
||||||
|
def teleop_safety_stop(self):
|
||||||
|
""" move to home position after record one episode """
|
||||||
|
self.run_calibration()
|
||||||
|
|
||||||
|
|
||||||
|
def __del__(self):
|
||||||
|
if self.is_connected:
|
||||||
|
self.disconnect()
|
||||||
|
if not self.inference_time:
|
||||||
|
self.teleop.stop()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
robot = RealmanRobot()
|
||||||
|
robot.connect()
|
||||||
|
# robot.run_calibration()
|
||||||
|
while True:
|
||||||
|
robot.teleop_step(record_data=True)
|
||||||
|
|
||||||
|
robot.capture_observation()
|
||||||
|
dummy_action = torch.Tensor([-0.40586111280653214, 0.5522833506266276, 0.4998166826036241, -0.3539944542778863, -0.524433347913954, 0.9064999898274739, 0.482])
|
||||||
|
robot.send_action(dummy_action)
|
||||||
|
robot.disconnect()
|
||||||
|
print('ok')
|
||||||
@@ -25,6 +25,7 @@ from lerobot.common.robot_devices.robots.configs import (
|
|||||||
So100RobotConfig,
|
So100RobotConfig,
|
||||||
So101RobotConfig,
|
So101RobotConfig,
|
||||||
StretchRobotConfig,
|
StretchRobotConfig,
|
||||||
|
RealmanRobotConfig
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@@ -65,6 +66,9 @@ def make_robot_config(robot_type: str, **kwargs) -> RobotConfig:
|
|||||||
return StretchRobotConfig(**kwargs)
|
return StretchRobotConfig(**kwargs)
|
||||||
elif robot_type == "lekiwi":
|
elif robot_type == "lekiwi":
|
||||||
return LeKiwiRobotConfig(**kwargs)
|
return LeKiwiRobotConfig(**kwargs)
|
||||||
|
elif robot_type == 'realman':
|
||||||
|
return RealmanRobotConfig(**kwargs)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
raise ValueError(f"Robot type '{robot_type}' is not available.")
|
raise ValueError(f"Robot type '{robot_type}' is not available.")
|
||||||
|
|
||||||
@@ -78,6 +82,12 @@ def make_robot_from_config(config: RobotConfig):
|
|||||||
from lerobot.common.robot_devices.robots.mobile_manipulator import MobileManipulator
|
from lerobot.common.robot_devices.robots.mobile_manipulator import MobileManipulator
|
||||||
|
|
||||||
return MobileManipulator(config)
|
return MobileManipulator(config)
|
||||||
|
|
||||||
|
elif isinstance(config, RealmanRobotConfig):
|
||||||
|
from lerobot.common.robot_devices.robots.realman import RealmanRobot
|
||||||
|
|
||||||
|
return RealmanRobot(config)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
from lerobot.common.robot_devices.robots.stretch import StretchRobot
|
from lerobot.common.robot_devices.robots.stretch import StretchRobot
|
||||||
|
|
||||||
|
|||||||
466
lerobot/common/robot_devices/teleop/gamepad.py
Normal file
466
lerobot/common/robot_devices/teleop/gamepad.py
Normal file
@@ -0,0 +1,466 @@
|
|||||||
|
import pygame
|
||||||
|
import threading
|
||||||
|
import time
|
||||||
|
import serial
|
||||||
|
import binascii
|
||||||
|
import logging
|
||||||
|
import yaml
|
||||||
|
from typing import Dict
|
||||||
|
from Robotic_Arm.rm_robot_interface import *
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
class ServoArm:
|
||||||
|
def __init__(self, config_file="config.yaml"):
|
||||||
|
"""初始化机械臂的串口连接并发送初始数据。
|
||||||
|
|
||||||
|
Args:
|
||||||
|
config_file (str): 配置文件的路径。
|
||||||
|
"""
|
||||||
|
self.config = self._load_config(config_file)
|
||||||
|
self.port = self.config["port"]
|
||||||
|
self.baudrate = self.config["baudrate"]
|
||||||
|
self.hex_data = self.config["hex_data"]
|
||||||
|
self.arm_axis = self.config.get("arm_axis", 7)
|
||||||
|
|
||||||
|
try:
|
||||||
|
self.serial_conn = serial.Serial(self.port, self.baudrate, timeout=0)
|
||||||
|
self.bytes_to_send = binascii.unhexlify(self.hex_data.replace(" ", ""))
|
||||||
|
self.serial_conn.write(self.bytes_to_send)
|
||||||
|
time.sleep(1)
|
||||||
|
self.connected = True
|
||||||
|
logging.info(f"串口连接成功: {self.port}")
|
||||||
|
except Exception as e:
|
||||||
|
logging.error(f"串口连接失败: {e}")
|
||||||
|
self.connected = False
|
||||||
|
|
||||||
|
def _load_config(self, config_file):
|
||||||
|
"""加载配置文件。
|
||||||
|
|
||||||
|
Args:
|
||||||
|
config_file (str): 配置文件的路径。
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
dict: 配置文件内容。
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
with open(config_file, "r") as file:
|
||||||
|
config = yaml.safe_load(file)
|
||||||
|
return config
|
||||||
|
except Exception as e:
|
||||||
|
logging.error(f"配置文件加载失败: {e}")
|
||||||
|
# 返回默认配置
|
||||||
|
return {
|
||||||
|
"port": "/dev/ttyUSB0",
|
||||||
|
"baudrate": 460800,
|
||||||
|
"hex_data": "55 AA 02 00 00 67",
|
||||||
|
"arm_axis": 6
|
||||||
|
}
|
||||||
|
|
||||||
|
def _bytes_to_signed_int(self, byte_data):
|
||||||
|
"""将字节数据转换为有符号整数。
|
||||||
|
|
||||||
|
Args:
|
||||||
|
byte_data (bytes): 字节数据。
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
int: 有符号整数。
|
||||||
|
"""
|
||||||
|
return int.from_bytes(byte_data, byteorder="little", signed=True)
|
||||||
|
|
||||||
|
def _parse_joint_data(self, hex_received):
|
||||||
|
"""解析接收到的十六进制数据并提取关节数据。
|
||||||
|
|
||||||
|
Args:
|
||||||
|
hex_received (str): 接收到的十六进制字符串数据。
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
dict: 解析后的关节数据。
|
||||||
|
"""
|
||||||
|
logging.debug(f"hex_received: {hex_received}")
|
||||||
|
joints = {}
|
||||||
|
for i in range(self.arm_axis):
|
||||||
|
start = 14 + i * 10
|
||||||
|
end = start + 8
|
||||||
|
joint_hex = hex_received[start:end]
|
||||||
|
joint_byte_data = bytearray.fromhex(joint_hex)
|
||||||
|
joint_value = self._bytes_to_signed_int(joint_byte_data) / 10000.0
|
||||||
|
joints[f"joint_{i+1}"] = joint_value
|
||||||
|
grasp_start = 14 + self.arm_axis*10
|
||||||
|
grasp_hex = hex_received[grasp_start:grasp_start+8]
|
||||||
|
grasp_byte_data = bytearray.fromhex(grasp_hex)
|
||||||
|
# 夹爪进行归一化处理
|
||||||
|
grasp_value = self._bytes_to_signed_int(grasp_byte_data)/1000
|
||||||
|
|
||||||
|
joints["grasp"] = grasp_value
|
||||||
|
return joints
|
||||||
|
|
||||||
|
def get_joint_actions(self):
|
||||||
|
"""从串口读取数据并解析关节动作。
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
dict: 包含关节数据的字典。
|
||||||
|
"""
|
||||||
|
if not self.connected:
|
||||||
|
return {}
|
||||||
|
|
||||||
|
try:
|
||||||
|
self.serial_conn.write(self.bytes_to_send)
|
||||||
|
bytes_received = self.serial_conn.read(self.serial_conn.inWaiting())
|
||||||
|
if len(bytes_received) == 0:
|
||||||
|
return {}
|
||||||
|
|
||||||
|
hex_received = binascii.hexlify(bytes_received).decode("utf-8").upper()
|
||||||
|
actions = self._parse_joint_data(hex_received)
|
||||||
|
return actions
|
||||||
|
except Exception as e:
|
||||||
|
logging.error(f"读取串口数据错误: {e}")
|
||||||
|
return {}
|
||||||
|
|
||||||
|
def set_gripper_action(self, action):
|
||||||
|
"""设置夹爪动作。
|
||||||
|
Args:
|
||||||
|
action (int): 夹爪动作值。
|
||||||
|
"""
|
||||||
|
if not self.connected:
|
||||||
|
return
|
||||||
|
|
||||||
|
try:
|
||||||
|
action = int(action * 1000)
|
||||||
|
action_bytes = action.to_bytes(4, byteorder="little", signed=True)
|
||||||
|
self.bytes_to_send = self.bytes_to_send[:74] + action_bytes + self.bytes_to_send[78:]
|
||||||
|
except Exception as e:
|
||||||
|
logging.error(f"设置夹爪动作错误: {e}")
|
||||||
|
|
||||||
|
def close(self):
|
||||||
|
"""关闭串口连接"""
|
||||||
|
if self.connected and hasattr(self, 'serial_conn'):
|
||||||
|
self.serial_conn.close()
|
||||||
|
self.connected = False
|
||||||
|
logging.info("串口连接已关闭")
|
||||||
|
|
||||||
|
|
||||||
|
class HybridController:
|
||||||
|
def __init__(self, init_info):
|
||||||
|
# 初始化pygame和手柄
|
||||||
|
pygame.init()
|
||||||
|
pygame.joystick.init()
|
||||||
|
|
||||||
|
# 检查是否有连接的手柄
|
||||||
|
if pygame.joystick.get_count() == 0:
|
||||||
|
raise Exception("未检测到手柄")
|
||||||
|
|
||||||
|
# 初始化手柄
|
||||||
|
self.joystick = pygame.joystick.Joystick(0)
|
||||||
|
self.joystick.init()
|
||||||
|
# 摇杆死区
|
||||||
|
self.deadzone = 0.15
|
||||||
|
# 控制模式: True为关节控制(主模式),False为末端控制
|
||||||
|
self.joint_control_mode = True
|
||||||
|
# 精细控制模式
|
||||||
|
self.fine_control_mode = False
|
||||||
|
|
||||||
|
# 初始化末端姿态和关节角度
|
||||||
|
self.init_joint = init_info['init_joint']
|
||||||
|
self.init_pose = init_info.get('init_pose', [0]*6)
|
||||||
|
self.max_gripper = init_info['max_gripper']
|
||||||
|
self.min_gripper = init_info['min_gripper']
|
||||||
|
servo_config_file = init_info['servo_config_file']
|
||||||
|
self.joint = self.init_joint.copy()
|
||||||
|
self.pose = self.init_pose.copy()
|
||||||
|
self.pose_speeds = [0.0] * 6
|
||||||
|
self.joint_speeds = [0.0] * 6
|
||||||
|
self.tozero = False
|
||||||
|
|
||||||
|
# 主臂关节状态
|
||||||
|
self.master_joint_actions = {}
|
||||||
|
self.use_master_arm = False
|
||||||
|
|
||||||
|
# 末端位姿限制
|
||||||
|
self.pose_limits = [
|
||||||
|
(-0.800, 0.800), # X (m)
|
||||||
|
(-0.800, 0.800), # Y (m)
|
||||||
|
(-0.800, 0.800), # Z (m)
|
||||||
|
(-3.14, 3.14), # RX (rad)
|
||||||
|
(-3.14, 3.14), # RY (rad)
|
||||||
|
(-3.14, 3.14) # RZ (rad)
|
||||||
|
]
|
||||||
|
|
||||||
|
# 关节角度限制 (度)
|
||||||
|
self.joint_limits = [
|
||||||
|
(-180, 180), # joint 1
|
||||||
|
(-180, 180), # joint 2
|
||||||
|
(-180, 180), # joint 3
|
||||||
|
(-180, 180), # joint 4
|
||||||
|
(-180, 180), # joint 5
|
||||||
|
(-180, 180) # joint 6
|
||||||
|
]
|
||||||
|
|
||||||
|
# 控制参数
|
||||||
|
self.linear_step = 0.005 # 线性移动步长(m)
|
||||||
|
self.angular_step = 0.05 # 角度步长(rad)
|
||||||
|
|
||||||
|
# 夹爪状态和速度
|
||||||
|
self.gripper_speed = 10
|
||||||
|
self.gripper = self.min_gripper
|
||||||
|
|
||||||
|
# 初始化串口通信(主臂关节状态获取)
|
||||||
|
self.servo_arm = None
|
||||||
|
if servo_config_file:
|
||||||
|
try:
|
||||||
|
self.servo_arm = ServoArm(servo_config_file)
|
||||||
|
self.use_master_arm = True
|
||||||
|
logging.info("串口主臂连接成功,启用主从控制模式")
|
||||||
|
except Exception as e:
|
||||||
|
logging.error(f"串口主臂连接失败: {e}")
|
||||||
|
self.use_master_arm = False
|
||||||
|
|
||||||
|
# 启动更新线程
|
||||||
|
self.running = True
|
||||||
|
self.thread = threading.Thread(target=self.update_controller)
|
||||||
|
self.thread.start()
|
||||||
|
|
||||||
|
print("混合控制器已启动")
|
||||||
|
print("主控制模式: 关节控制")
|
||||||
|
if self.use_master_arm:
|
||||||
|
print("主从控制: 启用")
|
||||||
|
print("Back按钮: 切换控制模式(关节/末端)")
|
||||||
|
print("L3按钮: 切换精细控制模式")
|
||||||
|
print("Start按钮: 重置到初始位置")
|
||||||
|
|
||||||
|
def _apply_nonlinear_mapping(self, value):
|
||||||
|
"""应用非线性映射以提高控制精度"""
|
||||||
|
sign = 1 if value >= 0 else -1
|
||||||
|
return sign * (abs(value) ** 2)
|
||||||
|
|
||||||
|
def _normalize_angle(self, angle):
|
||||||
|
"""将角度归一化到[-π, π]范围内"""
|
||||||
|
import math
|
||||||
|
while angle > math.pi:
|
||||||
|
angle -= 2 * math.pi
|
||||||
|
while angle < -math.pi:
|
||||||
|
angle += 2 * math.pi
|
||||||
|
return angle
|
||||||
|
|
||||||
|
def update_controller(self):
|
||||||
|
while self.running:
|
||||||
|
try:
|
||||||
|
pygame.event.pump()
|
||||||
|
except Exception as e:
|
||||||
|
print(f"控制器错误: {e}")
|
||||||
|
self.stop()
|
||||||
|
continue
|
||||||
|
|
||||||
|
# 检查控制模式切换 (Back按钮)
|
||||||
|
if self.joystick.get_button(6): # Back按钮
|
||||||
|
self.joint_control_mode = not self.joint_control_mode
|
||||||
|
mode_str = "关节控制" if self.joint_control_mode else "末端位姿控制"
|
||||||
|
print(f"切换到{mode_str}模式")
|
||||||
|
time.sleep(0.3) # 防止多次触发
|
||||||
|
|
||||||
|
# 检查精细控制模式切换 (L3按钮)
|
||||||
|
if self.joystick.get_button(10): # L3按钮
|
||||||
|
self.fine_control_mode = not self.fine_control_mode
|
||||||
|
print(f"切换到{'精细' if self.fine_control_mode else '普通'}控制模式")
|
||||||
|
time.sleep(0.3) # 防止多次触发
|
||||||
|
|
||||||
|
# 检查重置按钮 (Start按钮)
|
||||||
|
if self.joystick.get_button(7): # Start按钮
|
||||||
|
print("重置机械臂到初始位置...")
|
||||||
|
# print("init_joint", self.init_joint.copy())
|
||||||
|
self.tozero = True
|
||||||
|
self.joint = self.init_joint.copy()
|
||||||
|
self.pose = self.init_pose.copy()
|
||||||
|
self.pose_speeds = [0.0] * 6
|
||||||
|
self.joint_speeds = [0.0] * 6
|
||||||
|
self.gripper_speed = 10
|
||||||
|
self.gripper = self.min_gripper
|
||||||
|
print("机械臂已重置到初始位置")
|
||||||
|
time.sleep(0.3) # 防止多次触发
|
||||||
|
|
||||||
|
# 从串口获取主臂关节状态
|
||||||
|
if self.servo_arm and self.servo_arm.connected:
|
||||||
|
try:
|
||||||
|
self.master_joint_actions = self.servo_arm.get_joint_actions()
|
||||||
|
if self.master_joint_actions:
|
||||||
|
logging.debug(f"主臂关节状态: {self.master_joint_actions}")
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logging.error(f"获取主臂状态错误: {e}")
|
||||||
|
self.master_joint_actions = {}
|
||||||
|
# print(self.master_joint_actions)
|
||||||
|
|
||||||
|
# 根据控制模式更新相应的控制逻辑
|
||||||
|
if self.joint_control_mode:
|
||||||
|
# 关节控制模式下,优先使用主臂数据,Xbox作为辅助
|
||||||
|
self.update_joint_control()
|
||||||
|
else:
|
||||||
|
# 末端控制模式,使用Xbox控制
|
||||||
|
self.update_end_pose()
|
||||||
|
time.sleep(0.02)
|
||||||
|
# print('gripper:', self.gripper)
|
||||||
|
|
||||||
|
def update_joint_control(self):
|
||||||
|
"""更新关节角度控制 - 优先使用主臂数据"""
|
||||||
|
if self.use_master_arm and self.master_joint_actions:
|
||||||
|
# 主从控制模式:直接使用主臂的关节角度
|
||||||
|
try:
|
||||||
|
# 将主臂关节角度映射到从臂
|
||||||
|
for i in range(6): # 假设只有6个关节需要控制
|
||||||
|
joint_key = f"joint_{i+1}"
|
||||||
|
if joint_key in self.master_joint_actions:
|
||||||
|
# 直接使用主臂的关节角度(已经是度数)
|
||||||
|
self.joint[i] = self.master_joint_actions[joint_key]
|
||||||
|
|
||||||
|
# 应用关节限制
|
||||||
|
min_val, max_val = self.joint_limits[i]
|
||||||
|
self.joint[i] = max(min_val, min(max_val, self.joint[i]))
|
||||||
|
|
||||||
|
# print(self.joint)
|
||||||
|
logging.debug(f"主臂关节映射到从臂: {self.joint[:6]}")
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logging.error(f"主臂数据映射错误: {e}")
|
||||||
|
|
||||||
|
# 如果有主臂夹爪数据,使用主臂夹爪状态
|
||||||
|
if self.use_master_arm and "grasp" in self.master_joint_actions:
|
||||||
|
self.gripper = self.master_joint_actions["grasp"] * 1000
|
||||||
|
self.joint[-1] = self.gripper
|
||||||
|
|
||||||
|
|
||||||
|
def update_end_pose(self):
|
||||||
|
"""更新末端位姿控制"""
|
||||||
|
# 根据控制模式调整步长
|
||||||
|
current_linear_step = self.linear_step * (0.1 if self.fine_control_mode else 1.0)
|
||||||
|
current_angular_step = self.angular_step * (0.1 if self.fine_control_mode else 1.0)
|
||||||
|
|
||||||
|
# 方向键控制XY
|
||||||
|
hat = self.joystick.get_hat(0)
|
||||||
|
hat_up = hat[1] == 1 # Y+
|
||||||
|
hat_down = hat[1] == -1 # Y-
|
||||||
|
hat_left = hat[0] == -1 # X-
|
||||||
|
hat_right = hat[0] == 1 # X+
|
||||||
|
|
||||||
|
# 右摇杆控制Z
|
||||||
|
right_y_raw = -self.joystick.get_axis(4)
|
||||||
|
# 左摇杆控制RZ
|
||||||
|
left_y_raw = -self.joystick.get_axis(1)
|
||||||
|
|
||||||
|
# 应用死区
|
||||||
|
right_y = 0.0 if abs(right_y_raw) < self.deadzone else right_y_raw
|
||||||
|
left_y = 0.0 if abs(left_y_raw) < self.deadzone else left_y_raw
|
||||||
|
|
||||||
|
# 计算各轴速度
|
||||||
|
self.pose_speeds[1] = current_linear_step if hat_up else (-current_linear_step if hat_down else 0.0) # Y
|
||||||
|
self.pose_speeds[0] = -current_linear_step if hat_left else (current_linear_step if hat_right else 0.0) # X
|
||||||
|
|
||||||
|
# 设置Z速度(右摇杆Y轴控制)
|
||||||
|
z_mapping = self._apply_nonlinear_mapping(right_y)
|
||||||
|
self.pose_speeds[2] = z_mapping * current_linear_step # Z
|
||||||
|
|
||||||
|
# L1/R1控制RX旋转
|
||||||
|
LB = self.joystick.get_button(4) # RX-
|
||||||
|
RB = self.joystick.get_button(5) # RX+
|
||||||
|
self.pose_speeds[3] = (-current_angular_step if LB else (current_angular_step if RB else 0.0))
|
||||||
|
|
||||||
|
# △/□控制RY旋转
|
||||||
|
triangle = self.joystick.get_button(2) # RY+
|
||||||
|
square = self.joystick.get_button(3) # RY-
|
||||||
|
self.pose_speeds[4] = (current_angular_step if triangle else (-current_angular_step if square else 0.0))
|
||||||
|
|
||||||
|
# 左摇杆Y轴控制RZ旋转
|
||||||
|
rz_mapping = self._apply_nonlinear_mapping(left_y)
|
||||||
|
self.pose_speeds[5] = rz_mapping * current_angular_step * 2 # RZ
|
||||||
|
|
||||||
|
# 夹爪控制(圈/叉)
|
||||||
|
circle = self.joystick.get_button(1) # 夹爪开
|
||||||
|
cross = self.joystick.get_button(0) # 夹爪关
|
||||||
|
if circle:
|
||||||
|
self.gripper = min(self.max_gripper, self.gripper + self.gripper_speed)
|
||||||
|
elif cross:
|
||||||
|
self.gripper = max(self.min_gripper, self.gripper - self.gripper_speed)
|
||||||
|
|
||||||
|
# 更新末端位姿
|
||||||
|
for i in range(6):
|
||||||
|
self.pose[i] += self.pose_speeds[i]
|
||||||
|
|
||||||
|
# 角度归一化处理
|
||||||
|
for i in range(3, 6):
|
||||||
|
self.pose[i] = self._normalize_angle(self.pose[i])
|
||||||
|
|
||||||
|
def update_joint_state(self, joint):
|
||||||
|
self.joint = joint
|
||||||
|
# self.tozero = False
|
||||||
|
|
||||||
|
def update_endpose_state(self, end_pose):
|
||||||
|
self.pose = end_pose
|
||||||
|
# self.tozero = False
|
||||||
|
|
||||||
|
def update_tozero_state(self, tozero):
|
||||||
|
self.tozero = tozero
|
||||||
|
|
||||||
|
# def update_state(self, state):
|
||||||
|
# """更新状态信息(从机械臂获取当前状态)"""
|
||||||
|
# self.pose = state['end_pose']
|
||||||
|
# self.joint = state['joint']
|
||||||
|
# self.gripper = state['gripper']
|
||||||
|
|
||||||
|
def get_action(self) -> Dict:
|
||||||
|
"""获取当前控制命令"""
|
||||||
|
return {
|
||||||
|
'control_mode': 'joint' if self.joint_control_mode else 'end_pose',
|
||||||
|
'use_master_arm': self.use_master_arm,
|
||||||
|
'master_joint_actions': self.master_joint_actions,
|
||||||
|
'end_pose': {
|
||||||
|
'X': self.pose[0],
|
||||||
|
'Y': self.pose[1],
|
||||||
|
'Z': self.pose[2],
|
||||||
|
'RX': self.pose[3],
|
||||||
|
'RY': self.pose[4],
|
||||||
|
'RZ': self.pose[5],
|
||||||
|
},
|
||||||
|
'joint_angles': self.joint,
|
||||||
|
'gripper': int(self.gripper),
|
||||||
|
'tozero': self.tozero
|
||||||
|
}
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
"""停止控制器"""
|
||||||
|
self.running = False
|
||||||
|
if self.thread.is_alive():
|
||||||
|
self.thread.join()
|
||||||
|
if self.servo_arm:
|
||||||
|
self.servo_arm.close()
|
||||||
|
pygame.quit()
|
||||||
|
print("混合控制器已退出")
|
||||||
|
|
||||||
|
def reset(self):
|
||||||
|
"""重置到初始状态"""
|
||||||
|
self.joint = self.init_joint.copy()
|
||||||
|
self.pose = self.init_pose.copy()
|
||||||
|
self.pose_speeds = [0.0] * 6
|
||||||
|
self.joint_speeds = [0.0] * 6
|
||||||
|
self.gripper_speed = 10
|
||||||
|
self.gripper = self.min_gripper
|
||||||
|
print("已重置到初始状态")
|
||||||
|
|
||||||
|
|
||||||
|
# 使用示例
|
||||||
|
if __name__ == "__main__":
|
||||||
|
# 初始化睿尔曼机械臂
|
||||||
|
arm = RoboticArm(rm_thread_mode_e.RM_TRIPLE_MODE_E)
|
||||||
|
# 创建机械臂连接
|
||||||
|
handle = arm.rm_create_robot_arm("192.168.3.18", 8080)
|
||||||
|
print(f"机械臂连接ID: {handle.id}")
|
||||||
|
init_pose = arm.rm_get_current_arm_state()[1]['pose']
|
||||||
|
|
||||||
|
with open('/home/maic/LYT/lerobot/lerobot/common/robot_devices/teleop/realman_mix.yaml', "r") as file:
|
||||||
|
config = yaml.safe_load(file)
|
||||||
|
config['init_pose'] = init_pose
|
||||||
|
arm_controller = HybridController(config)
|
||||||
|
try:
|
||||||
|
while True:
|
||||||
|
print(arm_controller.get_action())
|
||||||
|
time.sleep(0.1)
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
arm_controller.stop()
|
||||||
4
lerobot/common/robot_devices/teleop/realman_mix.yaml
Normal file
4
lerobot/common/robot_devices/teleop/realman_mix.yaml
Normal file
@@ -0,0 +1,4 @@
|
|||||||
|
init_joint: [-90, 90, 90, -90, -90, 90]
|
||||||
|
max_gripper: 990
|
||||||
|
min_gripper: 10
|
||||||
|
servo_config_file: "/home/maic/LYT/lerobot/lerobot/common/robot_devices/teleop/servo_arm.yaml"
|
||||||
5
lerobot/common/robot_devices/teleop/servo_arm.yaml
Normal file
5
lerobot/common/robot_devices/teleop/servo_arm.yaml
Normal file
@@ -0,0 +1,5 @@
|
|||||||
|
port: /dev/ttyUSB0
|
||||||
|
right_port: /dev/ttyUSB1
|
||||||
|
baudrate: 460800
|
||||||
|
hex_data: "55 AA 02 00 00 67"
|
||||||
|
arm_axis: 6
|
||||||
@@ -273,7 +273,6 @@ def record(
|
|||||||
|
|
||||||
# Load pretrained policy
|
# Load pretrained policy
|
||||||
policy = None if cfg.policy is None else make_policy(cfg.policy, ds_meta=dataset.meta)
|
policy = None if cfg.policy is None else make_policy(cfg.policy, ds_meta=dataset.meta)
|
||||||
|
|
||||||
if not robot.is_connected:
|
if not robot.is_connected:
|
||||||
robot.connect()
|
robot.connect()
|
||||||
|
|
||||||
@@ -290,6 +289,9 @@ def record(
|
|||||||
if has_method(robot, "teleop_safety_stop"):
|
if has_method(robot, "teleop_safety_stop"):
|
||||||
robot.teleop_safety_stop()
|
robot.teleop_safety_stop()
|
||||||
|
|
||||||
|
# import pdb
|
||||||
|
# pdb.set_trace()
|
||||||
|
|
||||||
recorded_episodes = 0
|
recorded_episodes = 0
|
||||||
while True:
|
while True:
|
||||||
if recorded_episodes >= cfg.num_episodes:
|
if recorded_episodes >= cfg.num_episodes:
|
||||||
|
|||||||
0
realman_src/realsense_test.py
Normal file
0
realman_src/realsense_test.py
Normal file
@@ -18,6 +18,8 @@ else:
|
|||||||
|
|
||||||
print("Left: ", robot.rm_get_current_arm_state())
|
print("Left: ", robot.rm_get_current_arm_state())
|
||||||
print("Left: ", robot.rm_get_arm_all_state())
|
print("Left: ", robot.rm_get_arm_all_state())
|
||||||
|
robot.rm_set_gripper_position(200, True, 2)
|
||||||
|
print("Gripper: ", robot.rm_get_gripper_state())
|
||||||
|
|
||||||
# 断开所有连接,销毁线程
|
# 断开所有连接,销毁线程
|
||||||
RoboticArm.rm_destory()
|
RoboticArm.rm_destory()
|
||||||
Reference in New Issue
Block a user