Compare commits
4 Commits
main
...
endposectr
| Author | SHA1 | Date | |
|---|---|---|---|
| 2bcbddbfb6 | |||
| ac85a0d17e | |||
| 5bf422ca6f | |||
| 676ab74521 |
@@ -125,3 +125,4 @@ python lerobot/scripts/control_robot.py \
|
|||||||
--control.policy.path=outputs/train/act_koch_pick_place_lego/checkpoints/latest/pretrained_model
|
--control.policy.path=outputs/train/act_koch_pick_place_lego/checkpoints/latest/pretrained_model
|
||||||
```
|
```
|
||||||
|
|
||||||
|
export LD_LIBRARY_PATH=/home/ubuntu/miniconda3/envs/piper_lerobot/lib:$LD_LIBRARY_PATH
|
||||||
@@ -121,36 +121,39 @@ def init_keyboard_listener():
|
|||||||
events["exit_early"] = False
|
events["exit_early"] = False
|
||||||
events["rerecord_episode"] = False
|
events["rerecord_episode"] = False
|
||||||
events["stop_recording"] = False
|
events["stop_recording"] = False
|
||||||
|
events["confirm_save"] = False
|
||||||
|
events["discard_episode"] = False
|
||||||
if is_headless():
|
if is_headless():
|
||||||
logging.warning(
|
logging.warning(
|
||||||
"Headless environment detected. On-screen cameras display and keyboard inputs will not be available."
|
"Headless environment detected. On-screen cameras display and keyboard inputs will not be available."
|
||||||
)
|
)
|
||||||
listener = None
|
listener = None
|
||||||
return listener, events
|
return listener, events
|
||||||
|
|
||||||
# Only import pynput if not in a headless environment
|
# Only import pynput if not in a headless environment
|
||||||
from pynput import keyboard
|
from pynput import keyboard
|
||||||
|
|
||||||
def on_press(key):
|
def on_press(key):
|
||||||
try:
|
try:
|
||||||
if key == keyboard.Key.right:
|
if key == keyboard.Key.right:
|
||||||
print("Right arrow key pressed. Exiting loop...")
|
print("Right arrow key pressed. Exiting current recording...")
|
||||||
events["exit_early"] = True
|
events["exit_early"] = True
|
||||||
elif key == keyboard.Key.left:
|
elif key == keyboard.Key.left:
|
||||||
print("Left arrow key pressed. Exiting loop and rerecord the last episode...")
|
print("Left arrow key pressed. Interrupting and preparing to rerecord...")
|
||||||
events["rerecord_episode"] = True
|
events["rerecord_episode"] = True
|
||||||
events["exit_early"] = True
|
events["exit_early"] = True
|
||||||
elif key == keyboard.Key.esc:
|
elif key == keyboard.Key.esc:
|
||||||
print("Escape key pressed. Stopping data recording...")
|
print("Escape key pressed. Stopping data recording session...")
|
||||||
events["stop_recording"] = True
|
events["stop_recording"] = True
|
||||||
events["exit_early"] = True
|
events["exit_early"] = True
|
||||||
|
elif key == keyboard.Key.enter:
|
||||||
|
print("Enter key pressed. Confirming and saving current episode...")
|
||||||
|
events["confirm_save"] = True
|
||||||
|
elif key == keyboard.Key.backspace:
|
||||||
|
print("Back key pressed. Discarding completed episode...")
|
||||||
|
events["discard_episode"] = True
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"Error handling key press: {e}")
|
print(f"Error handling key press: {e}")
|
||||||
|
|
||||||
listener = keyboard.Listener(on_press=on_press)
|
listener = keyboard.Listener(on_press=on_press)
|
||||||
listener.start()
|
listener.start()
|
||||||
|
|
||||||
return listener, events
|
return listener, events
|
||||||
|
|
||||||
|
|
||||||
@@ -257,10 +260,11 @@ def control_loop(
|
|||||||
dataset.add_frame(frame)
|
dataset.add_frame(frame)
|
||||||
|
|
||||||
if display_cameras and not is_headless():
|
if display_cameras and not is_headless():
|
||||||
image_keys = [key for key in observation if "image" in key]
|
# image_keys = [key for key in observation if "image" in key]
|
||||||
for key in image_keys:
|
# for key in image_keys:
|
||||||
cv2.imshow(key, cv2.cvtColor(observation[key].numpy(), cv2.COLOR_RGB2BGR))
|
# cv2.imshow(key, cv2.cvtColor(observation[key].numpy(), cv2.COLOR_RGB2BGR))
|
||||||
cv2.waitKey(1)
|
# cv2.waitKey(1)
|
||||||
|
display_observations_combined(observation)
|
||||||
|
|
||||||
if fps is not None:
|
if fps is not None:
|
||||||
dt_s = time.perf_counter() - start_loop_t
|
dt_s = time.perf_counter() - start_loop_t
|
||||||
@@ -274,6 +278,107 @@ def control_loop(
|
|||||||
events["exit_early"] = False
|
events["exit_early"] = False
|
||||||
break
|
break
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
|
def display_observations_combined(observation, display_cameras=True):
|
||||||
|
"""将摄像头画面组合在一个窗口中显示,优化以防止抖动"""
|
||||||
|
if display_cameras and not is_headless():
|
||||||
|
image_keys = [key for key in observation if "image" in key]
|
||||||
|
if not image_keys:
|
||||||
|
return
|
||||||
|
|
||||||
|
# 获取所有图像并转换为BGR格式
|
||||||
|
images = []
|
||||||
|
for key in image_keys:
|
||||||
|
img = observation[key].numpy()
|
||||||
|
# 确保每个图像尺寸为640×480
|
||||||
|
if img.shape[0] != 480 or img.shape[1] != 640:
|
||||||
|
img = cv2.resize(img, (640, 480))
|
||||||
|
img = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)
|
||||||
|
# 添加图像标题
|
||||||
|
cv2.putText(img, key, (10, 30),
|
||||||
|
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
|
||||||
|
images.append(img)
|
||||||
|
|
||||||
|
# 根据实际摄像头数量确定布局
|
||||||
|
num_cameras = len(images)
|
||||||
|
|
||||||
|
# 为固定数量的摄像头定义固定布局
|
||||||
|
margin = 10 # 图像之间的间距
|
||||||
|
|
||||||
|
if num_cameras == 1:
|
||||||
|
# 单个摄像头 - 直接显示
|
||||||
|
grid_image = images[0]
|
||||||
|
elif num_cameras == 2:
|
||||||
|
# 两个摄像头 - 水平排列
|
||||||
|
grid_width = 2 * 640 + margin
|
||||||
|
grid_height = 480
|
||||||
|
grid_image = np.zeros((grid_height, grid_width, 3), dtype=np.uint8)
|
||||||
|
|
||||||
|
# 放置两个图像
|
||||||
|
grid_image[0:480, 0:640] = images[0]
|
||||||
|
grid_image[0:480, 640+margin:] = images[1]
|
||||||
|
elif num_cameras == 3:
|
||||||
|
# 三个摄像头 - 上面一行2个,下面一个
|
||||||
|
grid_width = 2 * 640 + margin
|
||||||
|
grid_height = 2 * 480 + margin
|
||||||
|
grid_image = np.zeros((grid_height, grid_width, 3), dtype=np.uint8)
|
||||||
|
|
||||||
|
# 放置三个图像
|
||||||
|
grid_image[0:480, 0:640] = images[0]
|
||||||
|
grid_image[0:480, 640+margin:] = images[1]
|
||||||
|
grid_image[480+margin:, (grid_width-640)//2:(grid_width+640)//2] = images[2] # 居中放置
|
||||||
|
else:
|
||||||
|
# 四个或更多摄像头 - 2×2网格
|
||||||
|
grid_cols = 2
|
||||||
|
grid_rows = 2
|
||||||
|
grid_width = grid_cols * 640 + (grid_cols - 1) * margin
|
||||||
|
grid_height = grid_rows * 480 + (grid_rows - 1) * margin
|
||||||
|
grid_image = np.zeros((grid_height, grid_width, 3), dtype=np.uint8)
|
||||||
|
|
||||||
|
# 最多显示4个
|
||||||
|
for i, img in enumerate(images[:4]):
|
||||||
|
row = i // grid_cols
|
||||||
|
col = i % grid_cols
|
||||||
|
|
||||||
|
y_start = row * (480 + margin)
|
||||||
|
y_end = y_start + 480
|
||||||
|
x_start = col * (640 + margin)
|
||||||
|
x_end = x_start + 640
|
||||||
|
|
||||||
|
grid_image[y_start:y_end, x_start:x_end] = img
|
||||||
|
|
||||||
|
# 创建一个固定名称的窗口
|
||||||
|
window_name = "Camera Views"
|
||||||
|
|
||||||
|
# 使用 getWindowProperty 检查窗口是否已经存在
|
||||||
|
try:
|
||||||
|
# 尝试获取窗口属性 - 如果窗口不存在会抛出异常
|
||||||
|
window_exists = cv2.getWindowProperty(window_name, cv2.WND_PROP_VISIBLE) >= 0
|
||||||
|
except:
|
||||||
|
window_exists = False
|
||||||
|
|
||||||
|
# 如果窗口不存在,创建并定位它
|
||||||
|
if not window_exists:
|
||||||
|
cv2.namedWindow(window_name, cv2.WINDOW_NORMAL)
|
||||||
|
|
||||||
|
# 设置窗口位置和大小(只需要在创建时设置一次)
|
||||||
|
screen_width = 1920 # 您的屏幕宽度
|
||||||
|
screen_height = 1080 # 您的屏幕高度
|
||||||
|
|
||||||
|
# 根据实际图像计算合适的显示尺寸
|
||||||
|
scale_factor = min(screen_width / grid_width, screen_height / grid_height) * 0.95
|
||||||
|
display_width = int(grid_width * scale_factor)
|
||||||
|
display_height = int(grid_height * scale_factor)
|
||||||
|
|
||||||
|
cv2.resizeWindow(window_name, display_width, display_height)
|
||||||
|
cv2.moveWindow(window_name, (screen_width - display_width) // 2,
|
||||||
|
(screen_height - display_height) // 2) # 居中显示
|
||||||
|
|
||||||
|
# 显示图像
|
||||||
|
cv2.imshow(window_name, grid_image)
|
||||||
|
cv2.waitKey(1)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
def reset_environment(robot, events, reset_time_s, fps):
|
def reset_environment(robot, events, reset_time_s, fps):
|
||||||
# TODO(rcadene): refactor warmup_record and reset_environment
|
# TODO(rcadene): refactor warmup_record and reset_environment
|
||||||
|
|||||||
@@ -91,9 +91,9 @@ class PiperMotorsBus:
|
|||||||
"""
|
"""
|
||||||
移动到初始位置
|
移动到初始位置
|
||||||
"""
|
"""
|
||||||
self.write(target_joint=self.init_joint_position)
|
self.write("joints", target_joint=self.init_joint_position)
|
||||||
|
|
||||||
def write(self, target_joint:list):
|
def write(self, control_mode:str, target_joint:list):
|
||||||
"""
|
"""
|
||||||
Joint control
|
Joint control
|
||||||
- target joint: in radians
|
- target joint: in radians
|
||||||
@@ -105,21 +105,49 @@ class PiperMotorsBus:
|
|||||||
joint_6 (float): 关节6角度 -90000~90000 / 57324.840764
|
joint_6 (float): 关节6角度 -90000~90000 / 57324.840764
|
||||||
gripper_range: 夹爪角度 0~0.08
|
gripper_range: 夹爪角度 0~0.08
|
||||||
"""
|
"""
|
||||||
joint_0 = round(target_joint[0]*self.joint_factor)
|
if control_mode == "end_pose":
|
||||||
joint_1 = round(target_joint[1]*self.joint_factor)
|
# 末端位姿控制
|
||||||
joint_2 = round(target_joint[2]*self.joint_factor)
|
factor = 1000
|
||||||
joint_3 = round(target_joint[3]*self.joint_factor)
|
X = round(target_joint[0]*factor*factor)
|
||||||
joint_4 = round(target_joint[4]*self.joint_factor)
|
Y = round(target_joint[1]*factor*factor)
|
||||||
joint_5 = round(target_joint[5]*self.joint_factor)
|
Z = round(target_joint[2]*factor*factor)
|
||||||
|
RX = round(target_joint[3]*factor)
|
||||||
|
RY = round(target_joint[4]*factor)
|
||||||
|
RZ = round(target_joint[5]*factor)
|
||||||
|
|
||||||
|
self.piper.MotionCtrl_2(0x01, 0x00, 100, 0x00)
|
||||||
|
self.piper.EndPoseCtrl(X, Y, Z, RX, RY, RZ)
|
||||||
|
else:
|
||||||
|
# 关节控制
|
||||||
|
joint_0 = round(target_joint[0]*self.joint_factor)
|
||||||
|
joint_1 = round(target_joint[1]*self.joint_factor)
|
||||||
|
joint_2 = round(target_joint[2]*self.joint_factor)
|
||||||
|
joint_3 = round(target_joint[3]*self.joint_factor)
|
||||||
|
joint_4 = round(target_joint[4]*self.joint_factor)
|
||||||
|
joint_5 = round(target_joint[5]*self.joint_factor)
|
||||||
|
|
||||||
|
self.piper.MotionCtrl_2(0x01, 0x01, 100, 0x00) # joint control # 0x00 标准模式
|
||||||
|
# self.piper.MotionCtrl_2(0x01, 0x01, 10, 0xAD) # joint control # mit模式
|
||||||
|
self.piper.JointCtrl(joint_0, joint_1, joint_2, joint_3, joint_4, joint_5)
|
||||||
|
|
||||||
|
# 夹爪控制
|
||||||
gripper_range = round(target_joint[6]*1000*1000)
|
gripper_range = round(target_joint[6]*1000*1000)
|
||||||
|
|
||||||
self.piper.MotionCtrl_2(0x01, 0x01, 100, 0x00) # joint control # 0x00 标准模式
|
|
||||||
# self.piper.MotionCtrl_2(0x01, 0x01, 10, 0xAD) # joint control # mit模式
|
|
||||||
|
|
||||||
self.piper.JointCtrl(joint_0, joint_1, joint_2, joint_3, joint_4, joint_5)
|
|
||||||
self.piper.GripperCtrl(abs(gripper_range), 1000, 0x01, 0) # 单位 0.001°
|
self.piper.GripperCtrl(abs(gripper_range), 1000, 0x01, 0) # 单位 0.001°
|
||||||
|
|
||||||
|
|
||||||
|
# def write_endpose(self, target_pose: list):
|
||||||
|
# X = round(target_pose[0] * 1000 * 1000) # m -> mm -> 0.001mm
|
||||||
|
# Y = round(target_pose[1] * 1000 * 1000)
|
||||||
|
# Z = round(target_pose[2] * 1000 * 1000)
|
||||||
|
# RX = round(target_pose[3] * self.joint_factor) # rad -> 0.001deg
|
||||||
|
# RY = round(target_pose[4] * self.joint_factor)
|
||||||
|
# RZ = round(target_pose[5] * self.joint_factor)
|
||||||
|
# gripper_range = round(target_pose[6]*1000*1000)
|
||||||
|
|
||||||
|
# self.piper.EndPoseCtrl(X, Y, Z, RX, RY, RZ)
|
||||||
|
# self.piper.GripperCtrl(abs(gripper_range), 1000, 0x01, 0) # 单位 0.001°
|
||||||
|
|
||||||
|
|
||||||
def read(self) -> Dict:
|
def read(self) -> Dict:
|
||||||
"""
|
"""
|
||||||
- 机械臂关节消息,单位0.001度
|
- 机械臂关节消息,单位0.001度
|
||||||
@@ -132,17 +160,17 @@ class PiperMotorsBus:
|
|||||||
gripper_state = gripper_msg.gripper_state
|
gripper_state = gripper_msg.gripper_state
|
||||||
|
|
||||||
return {
|
return {
|
||||||
"joint_1": joint_state.joint_1,
|
"joint_1": joint_state.joint_1 / self.joint_factor,
|
||||||
"joint_2": joint_state.joint_2,
|
"joint_2": joint_state.joint_2 / self.joint_factor,
|
||||||
"joint_3": joint_state.joint_3,
|
"joint_3": joint_state.joint_3 / self.joint_factor,
|
||||||
"joint_4": joint_state.joint_4,
|
"joint_4": joint_state.joint_4 / self.joint_factor,
|
||||||
"joint_5": joint_state.joint_5,
|
"joint_5": joint_state.joint_5 / self.joint_factor,
|
||||||
"joint_6": joint_state.joint_6,
|
"joint_6": joint_state.joint_6 / self.joint_factor,
|
||||||
"gripper": gripper_state.grippers_angle
|
"gripper": gripper_state.grippers_angle / 1000 / 1000
|
||||||
}
|
}
|
||||||
|
|
||||||
def safe_disconnect(self):
|
def safe_disconnect(self):
|
||||||
"""
|
"""
|
||||||
Move to safe disconnect position
|
Move to safe disconnect position
|
||||||
"""
|
"""
|
||||||
self.write(target_joint=self.safe_disable_position)
|
self.write("joints", target_joint=self.safe_disable_position)
|
||||||
@@ -7,7 +7,7 @@ import torch
|
|||||||
import numpy as np
|
import numpy as np
|
||||||
from dataclasses import dataclass, field, replace
|
from dataclasses import dataclass, field, replace
|
||||||
|
|
||||||
from lerobot.common.robot_devices.teleop.gamepad import SixAxisArmController
|
from lerobot.common.robot_devices.teleop.gamepad import SixAxisArmController, UnifiedArmController
|
||||||
from lerobot.common.robot_devices.motors.utils import get_motor_names, make_motors_buses_from_configs
|
from lerobot.common.robot_devices.motors.utils import get_motor_names, make_motors_buses_from_configs
|
||||||
from lerobot.common.robot_devices.cameras.utils import make_cameras_from_configs
|
from lerobot.common.robot_devices.cameras.utils import make_cameras_from_configs
|
||||||
from lerobot.common.robot_devices.utils import RobotDeviceAlreadyConnectedError, RobotDeviceNotConnectedError
|
from lerobot.common.robot_devices.utils import RobotDeviceAlreadyConnectedError, RobotDeviceNotConnectedError
|
||||||
@@ -31,7 +31,8 @@ class PiperRobot:
|
|||||||
|
|
||||||
# build gamepad teleop
|
# build gamepad teleop
|
||||||
if not self.inference_time:
|
if not self.inference_time:
|
||||||
self.teleop = SixAxisArmController()
|
# self.teleop = SixAxisArmController()
|
||||||
|
self.teleop = UnifiedArmController()
|
||||||
else:
|
else:
|
||||||
self.teleop = None
|
self.teleop = None
|
||||||
|
|
||||||
@@ -138,24 +139,31 @@ class PiperRobot:
|
|||||||
raise ConnectionError()
|
raise ConnectionError()
|
||||||
|
|
||||||
if self.teleop is None and self.inference_time:
|
if self.teleop is None and self.inference_time:
|
||||||
self.teleop = SixAxisArmController()
|
# self.teleop = SixAxisArmController()
|
||||||
|
self.teleop = UnifiedArmController()
|
||||||
|
|
||||||
# read target pose state as
|
# read target pose state as
|
||||||
before_read_t = time.perf_counter()
|
before_read_t = time.perf_counter()
|
||||||
state = self.arm.read() # read current joint position from robot
|
state = self.arm.read() # read current joint position from robot
|
||||||
action = self.teleop.get_action() # target joint position from gamepad
|
action = self.teleop.get_action() # target joint position/ end pose from gamepad
|
||||||
|
control_mode = self.teleop.get_control_mode()
|
||||||
self.logs["read_pos_dt_s"] = time.perf_counter() - before_read_t
|
self.logs["read_pos_dt_s"] = time.perf_counter() - before_read_t
|
||||||
|
|
||||||
# do action
|
# do action
|
||||||
before_write_t = time.perf_counter()
|
before_write_t = time.perf_counter()
|
||||||
target_joints = list(action.values())
|
target_joints = list(action.values())
|
||||||
self.arm.write(target_joints)
|
self.arm.write(control_mode, target_joints) # execute joint pos / end pose
|
||||||
|
# update teleop state
|
||||||
|
new_end_pose = self.arm.piper.GetArmEndPoseMsgs().end_pose
|
||||||
|
new_joint_state = self.arm.piper.GetArmJointMsgs().joint_state
|
||||||
|
self.teleop.update_state(control_mode, new_end_pose, new_joint_state) # new arm state update teleop state
|
||||||
self.logs["write_pos_dt_s"] = time.perf_counter() - before_write_t
|
self.logs["write_pos_dt_s"] = time.perf_counter() - before_write_t
|
||||||
|
|
||||||
if not record_data:
|
if not record_data:
|
||||||
return
|
return
|
||||||
|
|
||||||
state = torch.as_tensor(list(state.values()), dtype=torch.float32)
|
state = torch.as_tensor(list(state.values()), dtype=torch.float32)
|
||||||
|
action = self.teleop.get_joints() # 取最新的关节状态, 取joints,不用前面的action是因为action同时包含endpose或joints
|
||||||
action = torch.as_tensor(list(action.values()), dtype=torch.float32)
|
action = torch.as_tensor(list(action.values()), dtype=torch.float32)
|
||||||
|
|
||||||
# Capture images from cameras
|
# Capture images from cameras
|
||||||
@@ -176,8 +184,6 @@ class PiperRobot:
|
|||||||
|
|
||||||
return obs_dict, action_dict
|
return obs_dict, action_dict
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
def send_action(self, action: torch.Tensor) -> torch.Tensor:
|
def send_action(self, action: torch.Tensor) -> torch.Tensor:
|
||||||
"""Write the predicted actions from policy to the motors"""
|
"""Write the predicted actions from policy to the motors"""
|
||||||
if not self.is_connected:
|
if not self.is_connected:
|
||||||
@@ -191,8 +197,6 @@ class PiperRobot:
|
|||||||
|
|
||||||
return action
|
return action
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
def capture_observation(self) -> dict:
|
def capture_observation(self) -> dict:
|
||||||
"""capture current images and joint positions"""
|
"""capture current images and joint positions"""
|
||||||
if not self.is_connected:
|
if not self.is_connected:
|
||||||
|
|||||||
@@ -130,12 +130,432 @@ class SixAxisArmController:
|
|||||||
self.speeds = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0] # 6个关节的速度
|
self.speeds = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0] # 6个关节的速度
|
||||||
self.gripper_speed = 0.0 # 夹爪速度
|
self.gripper_speed = 0.0 # 夹爪速度
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
class UnifiedArmController:
|
||||||
|
def __init__(self):
|
||||||
|
# 初始化pygame和手柄
|
||||||
|
pygame.init()
|
||||||
|
pygame.joystick.init()
|
||||||
|
|
||||||
|
# 检查是否有连接的手柄
|
||||||
|
if pygame.joystick.get_count() == 0:
|
||||||
|
raise Exception("未检测到手柄")
|
||||||
|
|
||||||
|
# 初始化手柄
|
||||||
|
self.joystick = pygame.joystick.Joystick(0)
|
||||||
|
self.joystick.init()
|
||||||
|
|
||||||
|
# 控制模式标志 - True为末端位姿控制,False为关节控制
|
||||||
|
self.end_pose_mode = False
|
||||||
|
|
||||||
|
# 摇杆死区
|
||||||
|
self.deadzone = 0.15
|
||||||
|
|
||||||
|
# 精细控制模式
|
||||||
|
self.fine_control_mode = False
|
||||||
|
|
||||||
|
# 初始化关节状态
|
||||||
|
self.joints = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0] # 6个关节
|
||||||
|
self.joint_speeds = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0] # 6个关节的速度
|
||||||
|
|
||||||
|
# 关节弧度限制
|
||||||
|
self.joint_limits = [
|
||||||
|
(-92000 / 57324.840764, 92000 / 57324.840764), # joint1
|
||||||
|
( 0 / 57324.840764, 120000 / 57324.840764), # joint2
|
||||||
|
(-80000 / 57324.840764, 0 / 57324.840764), # joint3
|
||||||
|
(-90000 / 57324.840764, 90000 / 57324.840764), # joint4
|
||||||
|
(-65000 / 57324.840764, 65000 / 57324.840764), # joint5
|
||||||
|
(-90000 / 57324.840764, 90000 / 57324.840764) # joint6
|
||||||
|
]
|
||||||
|
|
||||||
|
# 初始化末端姿态 [X, Y, Z, RX, RY, RZ]
|
||||||
|
self.pose = [0.056127, 0, 0.213266, 0, 84.999, 0]
|
||||||
|
self.pose_speeds = [0.0] * 6
|
||||||
|
|
||||||
|
# 末端位姿限制
|
||||||
|
self.pose_limits = [
|
||||||
|
(-0.6, 0.6), # X (m)
|
||||||
|
(-0.6, 0.6), # Y (m)
|
||||||
|
(0.05, 0.6), # Z (m) - 设置最小高度防止碰撞
|
||||||
|
(-180, 180), # RX (deg)
|
||||||
|
(-180, 180), # RY (deg)
|
||||||
|
(-180, 180) # RZ (deg)
|
||||||
|
]
|
||||||
|
|
||||||
|
# 控制参数
|
||||||
|
self.linear_step = 0.0015 # 线性移动步长(m)
|
||||||
|
self.angular_step = 0.05 # 角度步长(deg)
|
||||||
|
self.joint_step = 0.015 # 关节步长(rad)
|
||||||
|
|
||||||
|
# 夹爪状态和速度
|
||||||
|
self.gripper = 0.0
|
||||||
|
self.gripper_speed = 0.0
|
||||||
|
|
||||||
|
# 启动更新线程
|
||||||
|
self.running = True
|
||||||
|
self.thread = threading.Thread(target=self.update_controller)
|
||||||
|
self.thread.start()
|
||||||
|
|
||||||
|
print("机械臂统一控制器已启动")
|
||||||
|
print("按下OPTIONS(Start)切换控制模式")
|
||||||
|
print("当前模式:", "末端位姿控制" if self.end_pose_mode else "关节控制")
|
||||||
|
|
||||||
|
def _apply_nonlinear_mapping(self, value):
|
||||||
|
"""应用非线性映射以提高控制精度"""
|
||||||
|
# 保持符号,但对数值应用平方映射以提高精度
|
||||||
|
sign = 1 if value >= 0 else -1
|
||||||
|
return sign * (abs(value) ** 2)
|
||||||
|
|
||||||
|
def _normalize_angle(self, angle):
|
||||||
|
"""将角度归一化到[-180, 180]范围内"""
|
||||||
|
while angle > 180:
|
||||||
|
angle -= 360
|
||||||
|
while angle < -180:
|
||||||
|
angle += 360
|
||||||
|
return angle
|
||||||
|
|
||||||
|
def update_controller(self):
|
||||||
|
while self.running:
|
||||||
|
try:
|
||||||
|
pygame.event.pump()
|
||||||
|
except Exception as e:
|
||||||
|
print(f"控制器错误: {e}")
|
||||||
|
self.stop()
|
||||||
|
continue
|
||||||
|
|
||||||
|
# 检查控制模式切换 (使用左摇杆按钮)
|
||||||
|
if self.joystick.get_button(9): # 左摇杆按钮
|
||||||
|
self.end_pose_mode = not self.end_pose_mode
|
||||||
|
print(f"切换到{'末端位姿控制' if self.end_pose_mode else '关节控制'}模式")
|
||||||
|
time.sleep(0.3) # 防止多次触发
|
||||||
|
|
||||||
|
# 检查精细控制模式切换 (使用L3按钮)
|
||||||
|
if self.joystick.get_button(10): # L3按钮
|
||||||
|
self.fine_control_mode = not self.fine_control_mode
|
||||||
|
print(f"切换到{'精细' if self.fine_control_mode else '普通'}控制模式")
|
||||||
|
time.sleep(0.3) # 防止多次触发
|
||||||
|
|
||||||
|
# 检查重置按钮 (7号按钮,通常是Start按钮)
|
||||||
|
if self.joystick.get_button(7): # Start按钮
|
||||||
|
print("重置机械臂到0位...")
|
||||||
|
# 重置关节和位姿
|
||||||
|
self.joints = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
|
||||||
|
self.joint_speeds = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
|
||||||
|
self.gripper = 0.0
|
||||||
|
self.gripper_speed = 0.0
|
||||||
|
# 临时切换到关节控制模式
|
||||||
|
self.end_pose_mode = False
|
||||||
|
print("机械臂已重置到0位")
|
||||||
|
time.sleep(0.3) # 防止多次触发
|
||||||
|
|
||||||
|
# 根据控制模式获取输入并更新状态
|
||||||
|
if self.end_pose_mode:
|
||||||
|
self.update_end_pose()
|
||||||
|
else:
|
||||||
|
self.update_joints()
|
||||||
|
|
||||||
|
# 夹爪控制(圈/叉)- 两种模式下都保持一致
|
||||||
|
circle = self.joystick.get_button(1) # 夹爪开
|
||||||
|
cross = self.joystick.get_button(0) # 夹爪关
|
||||||
|
self.gripper_speed = 0.01 if circle else (-0.01 if cross else 0.0)
|
||||||
|
|
||||||
|
# 更新夹爪
|
||||||
|
self.gripper += self.gripper_speed
|
||||||
|
self.gripper = max(0.0, min(0.08, self.gripper))
|
||||||
|
|
||||||
|
time.sleep(0.02)
|
||||||
|
|
||||||
|
def update_end_pose(self):
|
||||||
|
"""更新末端位姿控制"""
|
||||||
|
# 根据控制模式调整步长
|
||||||
|
current_linear_step = self.linear_step * (0.1 if self.fine_control_mode else 1.0)
|
||||||
|
current_angular_step = self.angular_step * (0.1 if self.fine_control_mode else 1.0)
|
||||||
|
|
||||||
|
# 方向键控制XY
|
||||||
|
hat = self.joystick.get_hat(0)
|
||||||
|
hat_up = hat[1] == 1 # Y+
|
||||||
|
hat_down = hat[1] == -1 # Y-
|
||||||
|
hat_left = hat[0] == -1 # X-
|
||||||
|
hat_right = hat[0] == 1 # X+
|
||||||
|
|
||||||
|
# 右摇杆控制Z
|
||||||
|
right_y = -self.joystick.get_axis(4) # Z控制,取反使向上为正
|
||||||
|
|
||||||
|
# 左摇杆控制RZ
|
||||||
|
left_y = -self.joystick.get_axis(1) # RZ控制,取反使向上为正
|
||||||
|
|
||||||
|
# 应用死区
|
||||||
|
right_y = 0.0 if abs(right_y) < self.deadzone else right_y
|
||||||
|
left_y = 0.0 if abs(left_y) < self.deadzone else left_y
|
||||||
|
|
||||||
|
self.pose_speeds[0] = current_linear_step if hat_up else (-current_linear_step if hat_down else 0.0) # X
|
||||||
|
self.pose_speeds[1] = current_linear_step if hat_left else (-current_linear_step if hat_right else 0.0) # Y
|
||||||
|
|
||||||
|
# 设置Z速度(右摇杆Y轴控制)
|
||||||
|
self.pose_speeds[2] = self._apply_nonlinear_mapping(right_y) * current_linear_step # Z
|
||||||
|
|
||||||
|
# L1/R1控制RX旋转
|
||||||
|
LB = self.joystick.get_button(4) # RX-
|
||||||
|
RB = self.joystick.get_button(5) # RX+
|
||||||
|
self.pose_speeds[3] = (-current_angular_step if LB else (current_angular_step if RB else 0.0))
|
||||||
|
|
||||||
|
# △/□控制RY旋转
|
||||||
|
triangle = self.joystick.get_button(2) # RY+
|
||||||
|
square = self.joystick.get_button(3) # RY-
|
||||||
|
self.pose_speeds[4] = (current_angular_step if triangle else (-current_angular_step if square else 0.0))
|
||||||
|
|
||||||
|
# 左摇杆Y轴控制RZ旋转
|
||||||
|
self.pose_speeds[5] = self._apply_nonlinear_mapping(left_y) * current_angular_step * 2 # RZ,增加系数使旋转更明显
|
||||||
|
|
||||||
|
# 更新末端位姿
|
||||||
|
for i in range(6):
|
||||||
|
self.pose[i] += self.pose_speeds[i]
|
||||||
|
|
||||||
|
# 位置限制
|
||||||
|
for i in range(3):
|
||||||
|
min_val, max_val = self.pose_limits[i]
|
||||||
|
self.pose[i] = max(min_val, min(max_val, self.pose[i]))
|
||||||
|
|
||||||
|
# 角度归一化处理
|
||||||
|
for i in range(3, 6):
|
||||||
|
self.pose[i] = self._normalize_angle(self.pose[i])
|
||||||
|
|
||||||
|
def update_joints(self):
|
||||||
|
"""更新关节控制"""
|
||||||
|
# 根据控制模式调整步长
|
||||||
|
current_joint_step = self.joint_step * (0.1 if self.fine_control_mode else 1.0)
|
||||||
|
|
||||||
|
# 使用类似于末端位姿控制的映射,但直接控制关节
|
||||||
|
|
||||||
|
# 左摇杆控制关节1和2 (类似于末端位姿控制中的X和Y)
|
||||||
|
left_x = -self.joystick.get_axis(0) # 左摇杆x轴
|
||||||
|
left_y = -self.joystick.get_axis(1) # 左摇杆y轴
|
||||||
|
|
||||||
|
# 应用死区
|
||||||
|
left_x = 0.0 if abs(left_x) < self.deadzone else left_x
|
||||||
|
left_y = 0.0 if abs(left_y) < self.deadzone else left_y
|
||||||
|
|
||||||
|
# 右摇杆控制关节3和4
|
||||||
|
right_x = self.joystick.get_axis(3) # 右摇杆x轴
|
||||||
|
right_y = self.joystick.get_axis(4) # 右摇杆y轴
|
||||||
|
|
||||||
|
# 应用死区
|
||||||
|
right_x = 0.0 if abs(right_x) < self.deadzone else right_x
|
||||||
|
right_y = 0.0 if abs(right_y) < self.deadzone else right_y
|
||||||
|
|
||||||
|
# 方向键控制关节5和6
|
||||||
|
hat = self.joystick.get_hat(0)
|
||||||
|
up = hat[1] == 1
|
||||||
|
down = hat[1] == -1
|
||||||
|
left = hat[0] == -1
|
||||||
|
right = hat[0] == 1
|
||||||
|
|
||||||
|
# 映射输入到关节速度
|
||||||
|
self.joint_speeds[0] = left_x * current_joint_step # joint1速度
|
||||||
|
self.joint_speeds[1] = left_y * current_joint_step # joint2速度
|
||||||
|
self.joint_speeds[2] = right_y * current_joint_step # joint3速度
|
||||||
|
self.joint_speeds[3] = right_x * current_joint_step # joint4速度
|
||||||
|
self.joint_speeds[4] = -current_joint_step if up else (current_joint_step if down else 0.0) # joint5速度
|
||||||
|
self.joint_speeds[5] = current_joint_step if right else (-current_joint_step if left else 0.0) # joint6速度
|
||||||
|
|
||||||
|
# 积分速度到关节位置
|
||||||
|
for i in range(6):
|
||||||
|
self.joints[i] += self.joint_speeds[i]
|
||||||
|
|
||||||
|
# 关节范围保护
|
||||||
|
for i in range(6):
|
||||||
|
min_val, max_val = self.joint_limits[i]
|
||||||
|
self.joints[i] = max(min_val, min(max_val, self.joints[i]))
|
||||||
|
|
||||||
|
def update_state(self, ctrl_mode, end_pose, joint_state):
|
||||||
|
if ctrl_mode == 'end_pose':
|
||||||
|
_joint_state = [0] * 6
|
||||||
|
_joint_state[0] = joint_state.joint_1 / 57324.840764
|
||||||
|
_joint_state[1] = joint_state.joint_2 / 57324.840764
|
||||||
|
_joint_state[2] = joint_state.joint_3 / 57324.840764
|
||||||
|
_joint_state[3] = joint_state.joint_4 / 57324.840764
|
||||||
|
_joint_state[4] = joint_state.joint_5 / 57324.840764
|
||||||
|
_joint_state[5] = joint_state.joint_6 / 57324.840764
|
||||||
|
self.joints = _joint_state
|
||||||
|
else:
|
||||||
|
_end_pose = [0] * 6
|
||||||
|
_end_pose[0] = end_pose.X_axis / 1000 / 1000
|
||||||
|
_end_pose[1] = end_pose.Y_axis / 1000 / 1000
|
||||||
|
_end_pose[2] = end_pose.Z_axis / 1000 / 1000
|
||||||
|
_end_pose[3] = end_pose.RX_axis / 1000
|
||||||
|
_end_pose[4] = end_pose.RY_axis / 1000
|
||||||
|
_end_pose[5] = end_pose.RZ_axis / 1000
|
||||||
|
self.pose = _end_pose
|
||||||
|
|
||||||
|
|
||||||
|
def get_action(self) -> Dict:
|
||||||
|
"""获取当前控制命令"""
|
||||||
|
if self.end_pose_mode:
|
||||||
|
# 返回末端位姿
|
||||||
|
return {
|
||||||
|
'X': self.pose[0],
|
||||||
|
'Y': self.pose[1],
|
||||||
|
'Z': self.pose[2],
|
||||||
|
'RX': self.pose[3],
|
||||||
|
'RY': self.pose[4],
|
||||||
|
'RZ': self.pose[5],
|
||||||
|
'gripper': self.gripper
|
||||||
|
}
|
||||||
|
else:
|
||||||
|
# 返回关节角度
|
||||||
|
return {
|
||||||
|
'joint0': self.joints[0],
|
||||||
|
'joint1': self.joints[1],
|
||||||
|
'joint2': self.joints[2],
|
||||||
|
'joint3': self.joints[3],
|
||||||
|
'joint4': self.joints[4],
|
||||||
|
'joint5': self.joints[5],
|
||||||
|
'gripper': self.gripper
|
||||||
|
}
|
||||||
|
|
||||||
|
def get_joints(self) -> Dict:
|
||||||
|
return {
|
||||||
|
'joint0': self.joints[0],
|
||||||
|
'joint1': self.joints[1],
|
||||||
|
'joint2': self.joints[2],
|
||||||
|
'joint3': self.joints[3],
|
||||||
|
'joint4': self.joints[4],
|
||||||
|
'joint5': self.joints[5],
|
||||||
|
'gripper': self.gripper
|
||||||
|
}
|
||||||
|
|
||||||
|
def get_control_mode(self):
|
||||||
|
"""返回当前控制模式"""
|
||||||
|
return "end_pose" if self.end_pose_mode else "joints"
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
"""停止控制器"""
|
||||||
|
self.running = False
|
||||||
|
if self.thread.is_alive():
|
||||||
|
self.thread.join()
|
||||||
|
pygame.quit()
|
||||||
|
print("控制器已退出")
|
||||||
|
|
||||||
|
def reset(self):
|
||||||
|
"""重置到初始状态"""
|
||||||
|
self.joints = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
|
||||||
|
self.joint_speeds = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
|
||||||
|
self.pose = [0.056127, 0, 0.213266, 0, 84.999, 0]
|
||||||
|
self.pose_speeds = [0.0] * 6
|
||||||
|
self.gripper = 0.0
|
||||||
|
self.gripper_speed = 0.0
|
||||||
|
print("已重置到初始状态")
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
# 使用示例
|
# 使用示例
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
arm_controller = SixAxisArmController()
|
# arm_controller = SixAxisArmController()
|
||||||
try:
|
# try:
|
||||||
while True:
|
# while True:
|
||||||
print(arm_controller.get_action())
|
# print(arm_controller.get_action())
|
||||||
time.sleep(0.1)
|
# time.sleep(0.1)
|
||||||
except KeyboardInterrupt:
|
# except KeyboardInterrupt:
|
||||||
arm_controller.stop()
|
# arm_controller.stop()
|
||||||
|
|
||||||
|
from piper_sdk import *
|
||||||
|
|
||||||
|
def enable_fun(piper:C_PiperInterface_V2):
|
||||||
|
'''
|
||||||
|
使能机械臂并检测使能状态,尝试5s,如果使能超时则退出程序
|
||||||
|
'''
|
||||||
|
enable_flag = False
|
||||||
|
# 设置超时时间(秒)
|
||||||
|
timeout = 5
|
||||||
|
# 记录进入循环前的时间
|
||||||
|
start_time = time.time()
|
||||||
|
elapsed_time_flag = False
|
||||||
|
while not (enable_flag):
|
||||||
|
elapsed_time = time.time() - start_time
|
||||||
|
print("--------------------")
|
||||||
|
enable_flag = piper.GetArmLowSpdInfoMsgs().motor_1.foc_status.driver_enable_status and \
|
||||||
|
piper.GetArmLowSpdInfoMsgs().motor_2.foc_status.driver_enable_status and \
|
||||||
|
piper.GetArmLowSpdInfoMsgs().motor_3.foc_status.driver_enable_status and \
|
||||||
|
piper.GetArmLowSpdInfoMsgs().motor_4.foc_status.driver_enable_status and \
|
||||||
|
piper.GetArmLowSpdInfoMsgs().motor_5.foc_status.driver_enable_status and \
|
||||||
|
piper.GetArmLowSpdInfoMsgs().motor_6.foc_status.driver_enable_status
|
||||||
|
print("使能状态:",enable_flag)
|
||||||
|
piper.EnableArm(7)
|
||||||
|
piper.GripperCtrl(0,1000,0x01, 0)
|
||||||
|
print("--------------------")
|
||||||
|
# 检查是否超过超时时间
|
||||||
|
if elapsed_time > timeout:
|
||||||
|
print("超时....")
|
||||||
|
elapsed_time_flag = True
|
||||||
|
enable_flag = True
|
||||||
|
break
|
||||||
|
time.sleep(1)
|
||||||
|
pass
|
||||||
|
if(elapsed_time_flag):
|
||||||
|
print("程序自动使能超时,退出程序")
|
||||||
|
exit(0)
|
||||||
|
|
||||||
|
piper = C_PiperInterface_V2("can0")
|
||||||
|
piper.ConnectPort()
|
||||||
|
piper.EnableArm(7)
|
||||||
|
enable_fun(piper=piper)
|
||||||
|
piper.GripperCtrl(0,1000,0x01, 0)
|
||||||
|
|
||||||
|
teleop = UnifiedArmController()
|
||||||
|
factor = 1000
|
||||||
|
|
||||||
|
while True:
|
||||||
|
# 获取当前控制命令
|
||||||
|
action = teleop.get_action()
|
||||||
|
control_mode = teleop.get_control_mode()
|
||||||
|
|
||||||
|
if control_mode == "end_pose":
|
||||||
|
# 末端位姿控制
|
||||||
|
position = list(action.values())
|
||||||
|
X = round(position[0]*factor*factor)
|
||||||
|
Y = round(position[1]*factor*factor)
|
||||||
|
Z = round(position[2]*factor*factor)
|
||||||
|
RX = round(position[3]*factor)
|
||||||
|
RY = round(position[4]*factor)
|
||||||
|
RZ = round(position[5]*factor)
|
||||||
|
joint_6 = round(position[6]*factor*factor)
|
||||||
|
|
||||||
|
piper.MotionCtrl_2(0x01, 0x00, 100, 0x00)
|
||||||
|
piper.EndPoseCtrl(X, Y, Z, RX, RY, RZ)
|
||||||
|
piper.GripperCtrl(abs(joint_6), 1000, 0x01, 0)
|
||||||
|
|
||||||
|
new_end_pose = piper.GetArmEndPoseMsgs().end_pose
|
||||||
|
new_joint_state = piper.GetArmJointMsgs().joint_state
|
||||||
|
teleop.update_state(control_mode, new_end_pose, new_joint_state)
|
||||||
|
print("控制模式: 末端控制")
|
||||||
|
print("末端位置", new_end_pose)
|
||||||
|
print("关节位置:", new_joint_state)
|
||||||
|
|
||||||
|
|
||||||
|
else:
|
||||||
|
# 关节控制
|
||||||
|
joints = list(action.values())
|
||||||
|
# 将关节角度转换为适合发送的格式
|
||||||
|
joint0 = round(joints[0] * 57324.840764) # 转换为机械臂期望的单位
|
||||||
|
joint1 = round(joints[1] * 57324.840764)
|
||||||
|
joint2 = round(joints[2] * 57324.840764)
|
||||||
|
joint3 = round(joints[3] * 57324.840764)
|
||||||
|
joint4 = round(joints[4] * 57324.840764)
|
||||||
|
joint5 = round(joints[5] * 57324.840764)
|
||||||
|
gripper = round(joints[6] * 1000 * 1000)
|
||||||
|
|
||||||
|
# 发送关节控制命令
|
||||||
|
piper.MotionCtrl_2(0x01, 0x01, 100, 0x00)
|
||||||
|
piper.JointCtrl(joint0, joint1, joint2, joint3, joint4, joint5)
|
||||||
|
piper.GripperCtrl(abs(gripper), 1000, 0x01, 0)
|
||||||
|
|
||||||
|
new_end_pose = piper.GetArmEndPoseMsgs().end_pose
|
||||||
|
new_joint_state = piper.GetArmJointMsgs().joint_state
|
||||||
|
teleop.update_state(control_mode, new_end_pose, new_joint_state)
|
||||||
|
|
||||||
|
print("控制模式: 关节控制")
|
||||||
|
print("末端位置", new_end_pose)
|
||||||
|
print("关节位置:", new_joint_state)
|
||||||
|
|
||||||
|
time.sleep(0.1)
|
||||||
|
|||||||
@@ -291,6 +291,28 @@ def record(
|
|||||||
single_task=cfg.single_task,
|
single_task=cfg.single_task,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# 录制完成后等待用户确认是否保存
|
||||||
|
if not events["stop_recording"] and not events["rerecord_episode"]:
|
||||||
|
log_say("Episode recorded. Press Enter to save, Backspace to discard, Esc to end session", cfg.play_sounds)
|
||||||
|
print("Confirmation controls: Enter to save, Backspace to discard, Esc to end recording session")
|
||||||
|
|
||||||
|
# 等待用户确认是否保存该段录制
|
||||||
|
waiting_for_confirmation = True
|
||||||
|
events["confirm_save"] = False
|
||||||
|
events["discard_episode"] = False
|
||||||
|
|
||||||
|
while waiting_for_confirmation and not events["stop_recording"]:
|
||||||
|
time.sleep(0.1)
|
||||||
|
if events["confirm_save"]:
|
||||||
|
log_say("Saving episode", cfg.play_sounds)
|
||||||
|
waiting_for_confirmation = False
|
||||||
|
elif events["discard_episode"]:
|
||||||
|
log_say("Discarding episode", cfg.play_sounds)
|
||||||
|
events["rerecord_episode"] = True
|
||||||
|
waiting_for_confirmation = False
|
||||||
|
elif events["stop_recording"]:
|
||||||
|
waiting_for_confirmation = False
|
||||||
|
|
||||||
# Execute a few seconds without recording to give time to manually reset the environment
|
# Execute a few seconds without recording to give time to manually reset the environment
|
||||||
# Current code logic doesn't allow to teleoperate during this time.
|
# Current code logic doesn't allow to teleoperate during this time.
|
||||||
# TODO(rcadene): add an option to enable teleoperation during reset
|
# TODO(rcadene): add an option to enable teleoperation during reset
|
||||||
|
|||||||
300
piper_scripts/piper_demo_endpose.py
Normal file
300
piper_scripts/piper_demo_endpose.py
Normal file
@@ -0,0 +1,300 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
# -*-coding:utf8-*-
|
||||||
|
# 注意demo无法直接运行,需要pip安装sdk后才能运行
|
||||||
|
from typing import (
|
||||||
|
Optional,
|
||||||
|
)
|
||||||
|
import time
|
||||||
|
from piper_sdk import *
|
||||||
|
|
||||||
|
import pygame
|
||||||
|
import threading
|
||||||
|
import time
|
||||||
|
from typing import Dict
|
||||||
|
|
||||||
|
|
||||||
|
def enable_fun(piper:C_PiperInterface_V2):
|
||||||
|
'''
|
||||||
|
使能机械臂并检测使能状态,尝试5s,如果使能超时则退出程序
|
||||||
|
'''
|
||||||
|
enable_flag = False
|
||||||
|
# 设置超时时间(秒)
|
||||||
|
timeout = 5
|
||||||
|
# 记录进入循环前的时间
|
||||||
|
start_time = time.time()
|
||||||
|
elapsed_time_flag = False
|
||||||
|
while not (enable_flag):
|
||||||
|
elapsed_time = time.time() - start_time
|
||||||
|
print("--------------------")
|
||||||
|
enable_flag = piper.GetArmLowSpdInfoMsgs().motor_1.foc_status.driver_enable_status and \
|
||||||
|
piper.GetArmLowSpdInfoMsgs().motor_2.foc_status.driver_enable_status and \
|
||||||
|
piper.GetArmLowSpdInfoMsgs().motor_3.foc_status.driver_enable_status and \
|
||||||
|
piper.GetArmLowSpdInfoMsgs().motor_4.foc_status.driver_enable_status and \
|
||||||
|
piper.GetArmLowSpdInfoMsgs().motor_5.foc_status.driver_enable_status and \
|
||||||
|
piper.GetArmLowSpdInfoMsgs().motor_6.foc_status.driver_enable_status
|
||||||
|
print("使能状态:",enable_flag)
|
||||||
|
piper.EnableArm(7)
|
||||||
|
piper.GripperCtrl(0,1000,0x01, 0)
|
||||||
|
print("--------------------")
|
||||||
|
# 检查是否超过超时时间
|
||||||
|
if elapsed_time > timeout:
|
||||||
|
print("超时....")
|
||||||
|
elapsed_time_flag = True
|
||||||
|
enable_flag = True
|
||||||
|
break
|
||||||
|
time.sleep(1)
|
||||||
|
pass
|
||||||
|
if(elapsed_time_flag):
|
||||||
|
print("程序自动使能超时,退出程序")
|
||||||
|
exit(0)
|
||||||
|
|
||||||
|
|
||||||
|
class EndPoseController:
|
||||||
|
def __init__(self):
|
||||||
|
pygame.init()
|
||||||
|
pygame.joystick.init()
|
||||||
|
if pygame.joystick.get_count() == 0:
|
||||||
|
raise Exception("未检测到手柄")
|
||||||
|
self.joystick = pygame.joystick.Joystick(0)
|
||||||
|
self.joystick.init()
|
||||||
|
|
||||||
|
# 末端初始姿态 [X, Y, Z, RX, RY, RZ]
|
||||||
|
# {'X': 56127, 'Y': 0, 'Z': 213266, 'RX': 0, 'RY': 84999, 'RZ': 0, 'gripper': -70} # 右臂
|
||||||
|
self.pose = [0.056127, 0, 0.213266, 0, 84.999, 0]
|
||||||
|
self.speeds = [0.0] * 6
|
||||||
|
|
||||||
|
# 夹爪状态和速度
|
||||||
|
self.gripper = 0.0
|
||||||
|
self.gripper_speed = 0.0
|
||||||
|
|
||||||
|
# 末端位姿限制
|
||||||
|
self.pose_limits = [
|
||||||
|
(-0.6, 0.6), # X (m)
|
||||||
|
(-0.6, 0.6), # Y (m)
|
||||||
|
(0.05, 0.6), # Z (m) - 设置最小高度防止碰撞
|
||||||
|
(-180, 180), # RX (deg)
|
||||||
|
(-180, 180), # RY (deg)
|
||||||
|
(-180, 180) # RZ (deg)
|
||||||
|
]
|
||||||
|
|
||||||
|
# 控制参数
|
||||||
|
self.linear_step = 0.003 # 线性移动步长(m),降低以提高精度
|
||||||
|
self.angular_step = 0.1 # 角度步长(deg),提高以便更好控制
|
||||||
|
|
||||||
|
# 摇杆死区
|
||||||
|
self.deadzone = 0.15
|
||||||
|
|
||||||
|
# 控制模式
|
||||||
|
self.fine_control_mode = False # 精细控制模式
|
||||||
|
|
||||||
|
self.running = True
|
||||||
|
self.thread = threading.Thread(target=self.update_pose)
|
||||||
|
self.thread.start()
|
||||||
|
|
||||||
|
print("末端位姿控制器已启动")
|
||||||
|
print("使用方向键上下左右控制XY,右摇杆控制Z")
|
||||||
|
print("L1/R1控制RX,△/□控制RY,右摇杆Y轴控制RZ")
|
||||||
|
print("按下L3(左摇杆按下)切换精细/普通控制模式")
|
||||||
|
|
||||||
|
def update_pose(self):
|
||||||
|
while self.running:
|
||||||
|
try:
|
||||||
|
pygame.event.pump()
|
||||||
|
except Exception as e:
|
||||||
|
print(f"控制器错误: {e}")
|
||||||
|
self.stop()
|
||||||
|
continue
|
||||||
|
|
||||||
|
# 检查精细控制模式切换
|
||||||
|
if self.joystick.get_button(10): # L3按钮
|
||||||
|
self.fine_control_mode = not self.fine_control_mode
|
||||||
|
print(f"切换到{'精细' if self.fine_control_mode else '普通'}控制模式")
|
||||||
|
time.sleep(0.3) # 防止多次触发
|
||||||
|
|
||||||
|
# 根据控制模式调整步长
|
||||||
|
current_linear_step = self.linear_step * (0.1 if self.fine_control_mode else 1.0)
|
||||||
|
current_angular_step = self.angular_step * (0.1 if self.fine_control_mode else 1.0)
|
||||||
|
|
||||||
|
# 方向键控制XY
|
||||||
|
hat = self.joystick.get_hat(0)
|
||||||
|
hat_up = hat[1] == 1 # Y+
|
||||||
|
hat_down = hat[1] == -1 # Y-
|
||||||
|
hat_left = hat[0] == -1 # X-
|
||||||
|
hat_right = hat[0] == 1 # X+
|
||||||
|
|
||||||
|
# 右摇杆控制Z和RZ
|
||||||
|
right_y = -self.joystick.get_axis(4) # Z控制,取反使向上为正
|
||||||
|
left_y = -self.joystick.get_axis(1) # RZ控制,取反使向上为正
|
||||||
|
|
||||||
|
# 应用死区
|
||||||
|
right_y = 0.0 if abs(right_y) < self.deadzone else right_y
|
||||||
|
left_y = 0.0 if abs(left_y) < self.deadzone else left_y
|
||||||
|
|
||||||
|
# 设置XY速度(方向键控制)
|
||||||
|
# self.speeds[0] = current_linear_step if hat_right else (-current_linear_step if hat_left else 0.0) # X
|
||||||
|
# self.speeds[1] = current_linear_step if hat_up else (-current_linear_step if hat_down else 0.0) # Y
|
||||||
|
|
||||||
|
self.speeds[0] = current_linear_step if hat_up else (-current_linear_step if hat_down else 0.0) # X
|
||||||
|
self.speeds[1] = current_linear_step if hat_left else (-current_linear_step if hat_right else 0.0) # Y
|
||||||
|
|
||||||
|
# 设置Z速度(右摇杆Y轴控制)
|
||||||
|
self.speeds[2] = self._apply_nonlinear_mapping(right_y) * current_linear_step # Z
|
||||||
|
|
||||||
|
# L1/R1控制RX旋转
|
||||||
|
LB = self.joystick.get_button(4) # RX-
|
||||||
|
RB = self.joystick.get_button(5) # RX+
|
||||||
|
self.speeds[3] = (-current_angular_step if LB else (current_angular_step if RB else 0.0))
|
||||||
|
|
||||||
|
# △/□控制RY旋转
|
||||||
|
triangle = self.joystick.get_button(2) # RY+
|
||||||
|
square = self.joystick.get_button(3) # RY-
|
||||||
|
self.speeds[4] = (current_angular_step if triangle else (-current_angular_step if square else 0.0))
|
||||||
|
|
||||||
|
# 左摇杆Y轴控制RZ旋转
|
||||||
|
self.speeds[5] = self._apply_nonlinear_mapping(left_y) * current_angular_step * 2 # RZ,增加系数使旋转更明显
|
||||||
|
|
||||||
|
# 夹爪控制(圈/叉)
|
||||||
|
circle = self.joystick.get_button(1) # 夹爪开
|
||||||
|
cross = self.joystick.get_button(0) # 夹爪关
|
||||||
|
self.gripper_speed = 0.01 if circle else (-0.01 if cross else 0.0)
|
||||||
|
|
||||||
|
# 更新末端位姿
|
||||||
|
for i in range(6):
|
||||||
|
self.pose[i] += self.speeds[i]
|
||||||
|
|
||||||
|
# 位置限制
|
||||||
|
for i in range(3):
|
||||||
|
min_val, max_val = self.pose_limits[i]
|
||||||
|
self.pose[i] = max(min_val, min(max_val, self.pose[i]))
|
||||||
|
|
||||||
|
# 角度归一化处理
|
||||||
|
for i in range(3, 6):
|
||||||
|
self.pose[i] = self._normalize_angle(self.pose[i])
|
||||||
|
|
||||||
|
# 更新夹爪
|
||||||
|
self.gripper += self.gripper_speed
|
||||||
|
self.gripper = max(0.0, min(0.08, self.gripper))
|
||||||
|
|
||||||
|
time.sleep(0.02)
|
||||||
|
|
||||||
|
def _normalize_angle(self, angle):
|
||||||
|
"""将角度归一化到[-180, 180]范围内"""
|
||||||
|
while angle > 180:
|
||||||
|
angle -= 360
|
||||||
|
while angle < -180:
|
||||||
|
angle += 360
|
||||||
|
return angle
|
||||||
|
|
||||||
|
def _apply_nonlinear_mapping(self, value):
|
||||||
|
"""应用非线性映射以提高控制精度"""
|
||||||
|
# 保持符号,但对数值应用平方映射以提高精度
|
||||||
|
sign = 1 if value >= 0 else -1
|
||||||
|
return sign * (abs(value) ** 2)
|
||||||
|
|
||||||
|
def get_action(self) -> Dict:
|
||||||
|
"""获取当前末端位姿和夹爪状态"""
|
||||||
|
return {
|
||||||
|
'X': self.pose[0]*1000,
|
||||||
|
'Y': self.pose[1]*1000,
|
||||||
|
'Z': self.pose[2]*1000,
|
||||||
|
'RX': self.pose[3],
|
||||||
|
'RY': self.pose[4],
|
||||||
|
'RZ': self.pose[5],
|
||||||
|
'gripper': self.gripper*1000
|
||||||
|
}
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
"""停止控制器"""
|
||||||
|
self.running = False
|
||||||
|
if self.thread.is_alive():
|
||||||
|
self.thread.join()
|
||||||
|
pygame.quit()
|
||||||
|
print("末端位姿控制器已退出")
|
||||||
|
|
||||||
|
def reset(self):
|
||||||
|
"""重置到初始位姿"""
|
||||||
|
self.pose = [0.056127, 0, 0.213266, 0, 84.999, 0] # 使用您提供的初始位姿
|
||||||
|
self.speeds = [0.0] * 6
|
||||||
|
self.gripper = 0.0
|
||||||
|
self.gripper_speed = 0.0
|
||||||
|
print("已重置到初始位姿")
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
piper = C_PiperInterface_V2("can0")
|
||||||
|
piper.ConnectPort()
|
||||||
|
piper.EnableArm(7)
|
||||||
|
enable_fun(piper=piper)
|
||||||
|
piper.GripperCtrl(0,1000,0x01, 0)
|
||||||
|
factor = 1000
|
||||||
|
position = [
|
||||||
|
55.0, \
|
||||||
|
0.0, \
|
||||||
|
206.0, \
|
||||||
|
0, \
|
||||||
|
85.0, \
|
||||||
|
0, \
|
||||||
|
0]
|
||||||
|
# position = [0.0, \
|
||||||
|
# 0.0, \
|
||||||
|
# 80.0, \
|
||||||
|
# 0, \
|
||||||
|
# 203.386, \
|
||||||
|
# 0, \
|
||||||
|
# 0.8]
|
||||||
|
count = 0
|
||||||
|
teleop = EndPoseController()
|
||||||
|
while True:
|
||||||
|
print(piper.GetArmEndPoseMsgs())
|
||||||
|
# print(piper.GetArmStatus())
|
||||||
|
import time
|
||||||
|
# count = count + 1
|
||||||
|
# # print(count)
|
||||||
|
# if(count == 0):
|
||||||
|
# print("1-----------")
|
||||||
|
# position = [
|
||||||
|
# 55.0, \
|
||||||
|
# 0.0, \
|
||||||
|
# 206.0, \
|
||||||
|
# 0, \
|
||||||
|
# 85.0, \
|
||||||
|
# 0, \
|
||||||
|
# 0]
|
||||||
|
# elif(count == 200):
|
||||||
|
# print("2-----------")
|
||||||
|
# position = [
|
||||||
|
# 55.0, \
|
||||||
|
# 0.0, \
|
||||||
|
# 260.0, \
|
||||||
|
# 0, \
|
||||||
|
# 85.0, \
|
||||||
|
# 0, \
|
||||||
|
# 0]
|
||||||
|
# elif(count == 400):
|
||||||
|
# print("1-----------")
|
||||||
|
# position = [
|
||||||
|
# 55.0, \
|
||||||
|
# 0.0, \
|
||||||
|
# 280.0, \
|
||||||
|
# 0, \
|
||||||
|
# 85.0, \
|
||||||
|
# 0, \
|
||||||
|
# 0]
|
||||||
|
# count = 0
|
||||||
|
position = teleop.get_action()
|
||||||
|
position = list(position.values())
|
||||||
|
X = round(position[0]*factor)
|
||||||
|
Y = round(position[1]*factor)
|
||||||
|
Z = round(position[2]*factor)
|
||||||
|
RX = round(position[3]*factor)
|
||||||
|
RY = round(position[4]*factor)
|
||||||
|
RZ = round(position[5]*factor)
|
||||||
|
joint_6 = round(position[6]*factor)
|
||||||
|
print(X,Y,Z,RX,RY,RZ, joint_6)
|
||||||
|
# piper.MotionCtrl_1()
|
||||||
|
piper.MotionCtrl_2(0x01, 0x00, 100, 0x00)
|
||||||
|
piper.EndPoseCtrl(X,Y,Z,RX,RY,RZ)
|
||||||
|
piper.GripperCtrl(abs(joint_6), 1000, 0x01, 0)
|
||||||
|
|
||||||
|
time.sleep(0.1)
|
||||||
|
pass
|
||||||
415
piper_scripts/piper_demo_endpose_joint.py
Normal file
415
piper_scripts/piper_demo_endpose_joint.py
Normal file
@@ -0,0 +1,415 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
# -*-coding:utf8-*-
|
||||||
|
from typing import Optional
|
||||||
|
import time
|
||||||
|
from piper_sdk import *
|
||||||
|
import pygame
|
||||||
|
import threading
|
||||||
|
from typing import Dict
|
||||||
|
|
||||||
|
|
||||||
|
def enable_fun(piper:C_PiperInterface_V2):
|
||||||
|
'''
|
||||||
|
使能机械臂并检测使能状态,尝试5s,如果使能超时则退出程序
|
||||||
|
'''
|
||||||
|
enable_flag = False
|
||||||
|
# 设置超时时间(秒)
|
||||||
|
timeout = 5
|
||||||
|
# 记录进入循环前的时间
|
||||||
|
start_time = time.time()
|
||||||
|
elapsed_time_flag = False
|
||||||
|
while not (enable_flag):
|
||||||
|
elapsed_time = time.time() - start_time
|
||||||
|
print("--------------------")
|
||||||
|
enable_flag = piper.GetArmLowSpdInfoMsgs().motor_1.foc_status.driver_enable_status and \
|
||||||
|
piper.GetArmLowSpdInfoMsgs().motor_2.foc_status.driver_enable_status and \
|
||||||
|
piper.GetArmLowSpdInfoMsgs().motor_3.foc_status.driver_enable_status and \
|
||||||
|
piper.GetArmLowSpdInfoMsgs().motor_4.foc_status.driver_enable_status and \
|
||||||
|
piper.GetArmLowSpdInfoMsgs().motor_5.foc_status.driver_enable_status and \
|
||||||
|
piper.GetArmLowSpdInfoMsgs().motor_6.foc_status.driver_enable_status
|
||||||
|
print("使能状态:",enable_flag)
|
||||||
|
piper.EnableArm(7)
|
||||||
|
piper.GripperCtrl(0,1000,0x01, 0)
|
||||||
|
print("--------------------")
|
||||||
|
# 检查是否超过超时时间
|
||||||
|
if elapsed_time > timeout:
|
||||||
|
print("超时....")
|
||||||
|
elapsed_time_flag = True
|
||||||
|
enable_flag = True
|
||||||
|
break
|
||||||
|
time.sleep(1)
|
||||||
|
pass
|
||||||
|
if(elapsed_time_flag):
|
||||||
|
print("程序自动使能超时,退出程序")
|
||||||
|
exit(0)
|
||||||
|
|
||||||
|
|
||||||
|
class UnifiedArmController:
|
||||||
|
def __init__(self):
|
||||||
|
# 初始化pygame和手柄
|
||||||
|
pygame.init()
|
||||||
|
pygame.joystick.init()
|
||||||
|
|
||||||
|
# 检查是否有连接的手柄
|
||||||
|
if pygame.joystick.get_count() == 0:
|
||||||
|
raise Exception("未检测到手柄")
|
||||||
|
|
||||||
|
# 初始化手柄
|
||||||
|
self.joystick = pygame.joystick.Joystick(0)
|
||||||
|
self.joystick.init()
|
||||||
|
|
||||||
|
# 控制模式标志 - True为末端位姿控制,False为关节控制
|
||||||
|
self.end_pose_mode = False
|
||||||
|
|
||||||
|
# 摇杆死区
|
||||||
|
self.deadzone = 0.15
|
||||||
|
|
||||||
|
# 精细控制模式
|
||||||
|
self.fine_control_mode = False
|
||||||
|
|
||||||
|
# 初始化关节状态
|
||||||
|
self.joints = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0] # 6个关节
|
||||||
|
self.joint_speeds = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0] # 6个关节的速度
|
||||||
|
|
||||||
|
# 关节弧度限制
|
||||||
|
self.joint_limits = [
|
||||||
|
(-92000 / 57324.840764, 92000 / 57324.840764), # joint1
|
||||||
|
( 0 / 57324.840764, 120000 / 57324.840764), # joint2
|
||||||
|
(-80000 / 57324.840764, 0 / 57324.840764), # joint3
|
||||||
|
(-90000 / 57324.840764, 90000 / 57324.840764), # joint4
|
||||||
|
(-65000 / 57324.840764, 65000 / 57324.840764), # joint5
|
||||||
|
(-90000 / 57324.840764, 90000 / 57324.840764) # joint6
|
||||||
|
]
|
||||||
|
|
||||||
|
# 初始化末端姿态 [X, Y, Z, RX, RY, RZ]
|
||||||
|
self.pose = [0.056127, 0, 0.213266, 0, 84.999, 0]
|
||||||
|
self.pose_speeds = [0.0] * 6
|
||||||
|
|
||||||
|
# 末端位姿限制
|
||||||
|
self.pose_limits = [
|
||||||
|
(-0.6, 0.6), # X (m)
|
||||||
|
(-0.6, 0.6), # Y (m)
|
||||||
|
(0.05, 0.6), # Z (m) - 设置最小高度防止碰撞
|
||||||
|
(-180, 180), # RX (deg)
|
||||||
|
(-180, 180), # RY (deg)
|
||||||
|
(-180, 180) # RZ (deg)
|
||||||
|
]
|
||||||
|
|
||||||
|
# 控制参数
|
||||||
|
self.linear_step = 0.0015 # 线性移动步长(m)
|
||||||
|
self.angular_step = 0.05 # 角度步长(deg)
|
||||||
|
self.joint_step = 0.015 # 关节步长(rad)
|
||||||
|
|
||||||
|
# 夹爪状态和速度
|
||||||
|
self.gripper = 0.0
|
||||||
|
self.gripper_speed = 0.0
|
||||||
|
|
||||||
|
# 启动更新线程
|
||||||
|
self.running = True
|
||||||
|
self.thread = threading.Thread(target=self.update_controller)
|
||||||
|
self.thread.start()
|
||||||
|
|
||||||
|
print("机械臂统一控制器已启动")
|
||||||
|
print("按下OPTIONS(Start)切换控制模式")
|
||||||
|
print("当前模式:", "末端位姿控制" if self.end_pose_mode else "关节控制")
|
||||||
|
|
||||||
|
def _apply_nonlinear_mapping(self, value):
|
||||||
|
"""应用非线性映射以提高控制精度"""
|
||||||
|
# 保持符号,但对数值应用平方映射以提高精度
|
||||||
|
sign = 1 if value >= 0 else -1
|
||||||
|
return sign * (abs(value) ** 2)
|
||||||
|
|
||||||
|
def _normalize_angle(self, angle):
|
||||||
|
"""将角度归一化到[-180, 180]范围内"""
|
||||||
|
while angle > 180:
|
||||||
|
angle -= 360
|
||||||
|
while angle < -180:
|
||||||
|
angle += 360
|
||||||
|
return angle
|
||||||
|
|
||||||
|
def update_controller(self):
|
||||||
|
while self.running:
|
||||||
|
try:
|
||||||
|
pygame.event.pump()
|
||||||
|
except Exception as e:
|
||||||
|
print(f"控制器错误: {e}")
|
||||||
|
self.stop()
|
||||||
|
continue
|
||||||
|
|
||||||
|
# 检查控制模式切换 (使用左摇杆按钮)
|
||||||
|
if self.joystick.get_button(9): # 左摇杆按钮
|
||||||
|
self.end_pose_mode = not self.end_pose_mode
|
||||||
|
print(f"切换到{'末端位姿控制' if self.end_pose_mode else '关节控制'}模式")
|
||||||
|
time.sleep(0.3) # 防止多次触发
|
||||||
|
|
||||||
|
# 检查精细控制模式切换 (使用L3按钮)
|
||||||
|
if self.joystick.get_button(10): # L3按钮
|
||||||
|
self.fine_control_mode = not self.fine_control_mode
|
||||||
|
print(f"切换到{'精细' if self.fine_control_mode else '普通'}控制模式")
|
||||||
|
time.sleep(0.3) # 防止多次触发
|
||||||
|
|
||||||
|
# 检查重置按钮 (7号按钮,通常是Start按钮)
|
||||||
|
if self.joystick.get_button(7): # Start按钮
|
||||||
|
print("重置机械臂到0位...")
|
||||||
|
# 重置关节和位姿
|
||||||
|
self.joints = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
|
||||||
|
self.joint_speeds = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
|
||||||
|
self.gripper = 0.0
|
||||||
|
self.gripper_speed = 0.0
|
||||||
|
# 临时切换到关节控制模式
|
||||||
|
self.end_pose_mode = False
|
||||||
|
print("机械臂已重置到0位")
|
||||||
|
time.sleep(0.3) # 防止多次触发
|
||||||
|
|
||||||
|
# 根据控制模式获取输入并更新状态
|
||||||
|
if self.end_pose_mode:
|
||||||
|
self.update_end_pose()
|
||||||
|
else:
|
||||||
|
self.update_joints()
|
||||||
|
|
||||||
|
# 夹爪控制(圈/叉)- 两种模式下都保持一致
|
||||||
|
circle = self.joystick.get_button(1) # 夹爪开
|
||||||
|
cross = self.joystick.get_button(0) # 夹爪关
|
||||||
|
self.gripper_speed = 0.01 if circle else (-0.01 if cross else 0.0)
|
||||||
|
|
||||||
|
# 更新夹爪
|
||||||
|
self.gripper += self.gripper_speed
|
||||||
|
self.gripper = max(0.0, min(0.08, self.gripper))
|
||||||
|
|
||||||
|
time.sleep(0.02)
|
||||||
|
|
||||||
|
def update_end_pose(self):
|
||||||
|
"""更新末端位姿控制"""
|
||||||
|
# 根据控制模式调整步长
|
||||||
|
current_linear_step = self.linear_step * (0.1 if self.fine_control_mode else 1.0)
|
||||||
|
current_angular_step = self.angular_step * (0.1 if self.fine_control_mode else 1.0)
|
||||||
|
|
||||||
|
# 方向键控制XY
|
||||||
|
hat = self.joystick.get_hat(0)
|
||||||
|
hat_up = hat[1] == 1 # Y+
|
||||||
|
hat_down = hat[1] == -1 # Y-
|
||||||
|
hat_left = hat[0] == -1 # X-
|
||||||
|
hat_right = hat[0] == 1 # X+
|
||||||
|
|
||||||
|
# 右摇杆控制Z
|
||||||
|
right_y = -self.joystick.get_axis(4) # Z控制,取反使向上为正
|
||||||
|
|
||||||
|
# 左摇杆控制RZ
|
||||||
|
left_y = -self.joystick.get_axis(1) # RZ控制,取反使向上为正
|
||||||
|
|
||||||
|
# 应用死区
|
||||||
|
right_y = 0.0 if abs(right_y) < self.deadzone else right_y
|
||||||
|
left_y = 0.0 if abs(left_y) < self.deadzone else left_y
|
||||||
|
|
||||||
|
self.pose_speeds[0] = current_linear_step if hat_up else (-current_linear_step if hat_down else 0.0) # X
|
||||||
|
self.pose_speeds[1] = current_linear_step if hat_left else (-current_linear_step if hat_right else 0.0) # Y
|
||||||
|
|
||||||
|
# 设置Z速度(右摇杆Y轴控制)
|
||||||
|
self.pose_speeds[2] = self._apply_nonlinear_mapping(right_y) * current_linear_step # Z
|
||||||
|
|
||||||
|
# L1/R1控制RX旋转
|
||||||
|
LB = self.joystick.get_button(4) # RX-
|
||||||
|
RB = self.joystick.get_button(5) # RX+
|
||||||
|
self.pose_speeds[3] = (-current_angular_step if LB else (current_angular_step if RB else 0.0))
|
||||||
|
|
||||||
|
# △/□控制RY旋转
|
||||||
|
triangle = self.joystick.get_button(2) # RY+
|
||||||
|
square = self.joystick.get_button(3) # RY-
|
||||||
|
self.pose_speeds[4] = (current_angular_step if triangle else (-current_angular_step if square else 0.0))
|
||||||
|
|
||||||
|
# 左摇杆Y轴控制RZ旋转
|
||||||
|
self.pose_speeds[5] = self._apply_nonlinear_mapping(left_y) * current_angular_step * 2 # RZ,增加系数使旋转更明显
|
||||||
|
|
||||||
|
# 更新末端位姿
|
||||||
|
for i in range(6):
|
||||||
|
self.pose[i] += self.pose_speeds[i]
|
||||||
|
|
||||||
|
# 位置限制
|
||||||
|
for i in range(3):
|
||||||
|
min_val, max_val = self.pose_limits[i]
|
||||||
|
self.pose[i] = max(min_val, min(max_val, self.pose[i]))
|
||||||
|
|
||||||
|
# 角度归一化处理
|
||||||
|
for i in range(3, 6):
|
||||||
|
self.pose[i] = self._normalize_angle(self.pose[i])
|
||||||
|
|
||||||
|
def update_joints(self):
|
||||||
|
"""更新关节控制"""
|
||||||
|
# 根据控制模式调整步长
|
||||||
|
current_joint_step = self.joint_step * (0.1 if self.fine_control_mode else 1.0)
|
||||||
|
|
||||||
|
# 使用类似于末端位姿控制的映射,但直接控制关节
|
||||||
|
|
||||||
|
# 左摇杆控制关节1和2 (类似于末端位姿控制中的X和Y)
|
||||||
|
left_x = -self.joystick.get_axis(0) # 左摇杆x轴
|
||||||
|
left_y = -self.joystick.get_axis(1) # 左摇杆y轴
|
||||||
|
|
||||||
|
# 应用死区
|
||||||
|
left_x = 0.0 if abs(left_x) < self.deadzone else left_x
|
||||||
|
left_y = 0.0 if abs(left_y) < self.deadzone else left_y
|
||||||
|
|
||||||
|
# 右摇杆控制关节3和4
|
||||||
|
right_x = self.joystick.get_axis(3) # 右摇杆x轴
|
||||||
|
right_y = self.joystick.get_axis(4) # 右摇杆y轴
|
||||||
|
|
||||||
|
# 应用死区
|
||||||
|
right_x = 0.0 if abs(right_x) < self.deadzone else right_x
|
||||||
|
right_y = 0.0 if abs(right_y) < self.deadzone else right_y
|
||||||
|
|
||||||
|
# 方向键控制关节5和6
|
||||||
|
hat = self.joystick.get_hat(0)
|
||||||
|
up = hat[1] == 1
|
||||||
|
down = hat[1] == -1
|
||||||
|
left = hat[0] == -1
|
||||||
|
right = hat[0] == 1
|
||||||
|
|
||||||
|
# 映射输入到关节速度
|
||||||
|
self.joint_speeds[0] = left_x * current_joint_step # joint1速度
|
||||||
|
self.joint_speeds[1] = left_y * current_joint_step # joint2速度
|
||||||
|
self.joint_speeds[2] = right_y * current_joint_step # joint3速度
|
||||||
|
self.joint_speeds[3] = right_x * current_joint_step # joint4速度
|
||||||
|
self.joint_speeds[4] = -current_joint_step if up else (current_joint_step if down else 0.0) # joint5速度
|
||||||
|
self.joint_speeds[5] = current_joint_step if right else (-current_joint_step if left else 0.0) # joint6速度
|
||||||
|
|
||||||
|
# 积分速度到关节位置
|
||||||
|
for i in range(6):
|
||||||
|
self.joints[i] += self.joint_speeds[i]
|
||||||
|
|
||||||
|
# 关节范围保护
|
||||||
|
for i in range(6):
|
||||||
|
min_val, max_val = self.joint_limits[i]
|
||||||
|
self.joints[i] = max(min_val, min(max_val, self.joints[i]))
|
||||||
|
|
||||||
|
def update_state(self, ctrl_mode, end_pose, joint_state):
|
||||||
|
if ctrl_mode == 'end_pose':
|
||||||
|
_joint_state = [0] * 6
|
||||||
|
_joint_state[0] = joint_state.joint_1 / 57324.840764
|
||||||
|
_joint_state[1] = joint_state.joint_2 / 57324.840764
|
||||||
|
_joint_state[2] = joint_state.joint_3 / 57324.840764
|
||||||
|
_joint_state[3] = joint_state.joint_4 / 57324.840764
|
||||||
|
_joint_state[4] = joint_state.joint_5 / 57324.840764
|
||||||
|
_joint_state[5] = joint_state.joint_6 / 57324.840764
|
||||||
|
self.joints = _joint_state
|
||||||
|
else:
|
||||||
|
_end_pose = [0] * 6
|
||||||
|
_end_pose[0] = end_pose.X_axis / 1000 / 1000
|
||||||
|
_end_pose[1] = end_pose.Y_axis / 1000 / 1000
|
||||||
|
_end_pose[2] = end_pose.Z_axis / 1000 / 1000
|
||||||
|
_end_pose[3] = end_pose.RX_axis / 1000
|
||||||
|
_end_pose[4] = end_pose.RY_axis / 1000
|
||||||
|
_end_pose[5] = end_pose.RZ_axis / 1000
|
||||||
|
self.pose = _end_pose
|
||||||
|
|
||||||
|
|
||||||
|
def get_action(self) -> Dict:
|
||||||
|
"""获取当前控制命令"""
|
||||||
|
if self.end_pose_mode:
|
||||||
|
# 返回末端位姿
|
||||||
|
return {
|
||||||
|
'X': self.pose[0],
|
||||||
|
'Y': self.pose[1],
|
||||||
|
'Z': self.pose[2],
|
||||||
|
'RX': self.pose[3],
|
||||||
|
'RY': self.pose[4],
|
||||||
|
'RZ': self.pose[5],
|
||||||
|
'gripper': self.gripper
|
||||||
|
}
|
||||||
|
else:
|
||||||
|
# 返回关节角度
|
||||||
|
return {
|
||||||
|
'joint0': self.joints[0],
|
||||||
|
'joint1': self.joints[1],
|
||||||
|
'joint2': self.joints[2],
|
||||||
|
'joint3': self.joints[3],
|
||||||
|
'joint4': self.joints[4],
|
||||||
|
'joint5': self.joints[5],
|
||||||
|
'gripper': self.gripper
|
||||||
|
}
|
||||||
|
|
||||||
|
def get_control_mode(self):
|
||||||
|
"""返回当前控制模式"""
|
||||||
|
return "end_pose" if self.end_pose_mode else "joints"
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
"""停止控制器"""
|
||||||
|
self.running = False
|
||||||
|
if self.thread.is_alive():
|
||||||
|
self.thread.join()
|
||||||
|
pygame.quit()
|
||||||
|
print("控制器已退出")
|
||||||
|
|
||||||
|
def reset(self):
|
||||||
|
"""重置到初始状态"""
|
||||||
|
self.joints = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
|
||||||
|
self.joint_speeds = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
|
||||||
|
self.pose = [0.056127, 0, 0.213266, 0, 84.999, 0]
|
||||||
|
self.pose_speeds = [0.0] * 6
|
||||||
|
self.gripper = 0.0
|
||||||
|
self.gripper_speed = 0.0
|
||||||
|
print("已重置到初始状态")
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
piper = C_PiperInterface_V2("can0")
|
||||||
|
piper.ConnectPort()
|
||||||
|
piper.EnableArm(7)
|
||||||
|
enable_fun(piper=piper)
|
||||||
|
piper.GripperCtrl(0,1000,0x01, 0)
|
||||||
|
|
||||||
|
teleop = UnifiedArmController()
|
||||||
|
factor = 1000
|
||||||
|
|
||||||
|
while True:
|
||||||
|
# 获取当前控制命令
|
||||||
|
action = teleop.get_action()
|
||||||
|
control_mode = teleop.get_control_mode()
|
||||||
|
|
||||||
|
if control_mode == "end_pose":
|
||||||
|
# 末端位姿控制
|
||||||
|
position = list(action.values())
|
||||||
|
X = round(position[0]*factor*factor)
|
||||||
|
Y = round(position[1]*factor*factor)
|
||||||
|
Z = round(position[2]*factor*factor)
|
||||||
|
RX = round(position[3]*factor)
|
||||||
|
RY = round(position[4]*factor)
|
||||||
|
RZ = round(position[5]*factor)
|
||||||
|
joint_6 = round(position[6]*factor*factor)
|
||||||
|
|
||||||
|
piper.MotionCtrl_2(0x01, 0x00, 100, 0x00)
|
||||||
|
piper.EndPoseCtrl(X, Y, Z, RX, RY, RZ)
|
||||||
|
piper.GripperCtrl(abs(joint_6), 1000, 0x01, 0)
|
||||||
|
|
||||||
|
new_end_pose = piper.GetArmEndPoseMsgs().end_pose
|
||||||
|
new_joint_state = piper.GetArmJointMsgs().joint_state
|
||||||
|
teleop.update_state(control_mode, new_end_pose, new_joint_state)
|
||||||
|
print("控制模式: 末端控制")
|
||||||
|
print("末端位置", new_end_pose)
|
||||||
|
print("关节位置:", new_joint_state)
|
||||||
|
|
||||||
|
|
||||||
|
else:
|
||||||
|
# 关节控制
|
||||||
|
joints = list(action.values())
|
||||||
|
# 将关节角度转换为适合发送的格式
|
||||||
|
joint0 = round(joints[0] * 57324.840764) # 转换为机械臂期望的单位
|
||||||
|
joint1 = round(joints[1] * 57324.840764)
|
||||||
|
joint2 = round(joints[2] * 57324.840764)
|
||||||
|
joint3 = round(joints[3] * 57324.840764)
|
||||||
|
joint4 = round(joints[4] * 57324.840764)
|
||||||
|
joint5 = round(joints[5] * 57324.840764)
|
||||||
|
gripper = round(joints[6] * 1000 * 1000)
|
||||||
|
|
||||||
|
# 发送关节控制命令
|
||||||
|
piper.MotionCtrl_2(0x01, 0x01, 100, 0x00)
|
||||||
|
piper.JointCtrl(joint0, joint1, joint2, joint3, joint4, joint5)
|
||||||
|
piper.GripperCtrl(abs(gripper), 1000, 0x01, 0)
|
||||||
|
|
||||||
|
new_end_pose = piper.GetArmEndPoseMsgs().end_pose
|
||||||
|
new_joint_state = piper.GetArmJointMsgs().joint_state
|
||||||
|
teleop.update_state(control_mode, new_end_pose, new_joint_state)
|
||||||
|
|
||||||
|
print("控制模式: 关节控制")
|
||||||
|
print("末端位置", new_end_pose)
|
||||||
|
print("关节位置:", new_joint_state)
|
||||||
|
|
||||||
|
time.sleep(0.1)
|
||||||
Reference in New Issue
Block a user