optimize some codes
Some checks failed
Secret Leaks / trufflehog (push) Failing after 2m47s

This commit is contained in:
2025-07-13 16:24:28 +08:00
parent 7c1699898b
commit 3685542bf1
3 changed files with 35 additions and 18 deletions

View File

@@ -85,6 +85,7 @@ class RealmanDualMotorsBus:
"""初始化线程控制"""
self.left_slow_busy = False
self.right_slow_busy = False
self.gripper_busy = False
self._thread_lock = threading.Lock()
# 添加读取相关的线程控制
@@ -256,8 +257,21 @@ class RealmanDualMotorsBus:
self.right_rmarm.rm_movep_canfd(target_endpose[6:], False)
def write_dual_gripper(self, left_gripper: int, right_gripper: int):
self.left_rmarm.rm_set_gripper_position(left_gripper, False, 2)
self.right_rmarm.rm_set_gripper_position(right_gripper, False, 2)
try:
self.left_rmarm.rm_set_gripper_position(left_gripper, False, 2)
self.right_rmarm.rm_set_gripper_position(right_gripper, False, 2)
finally:
self.gripper_busy = False
def _execute_gripper_thread(self, left_gripper: int, right_gripper: int):
if not getattr(self, 'gripper_busy'):
setattr(self, 'gripper_busy', True)
threading.Thread(
target=self.write_dual_gripper,
args=(left_gripper, right_gripper),
daemon=True
).start()
def read_current_arm_joint_state(self):
return self.left_rmarm.rm_get_current_arm_state()[1]['joint'] + self.right_rmarm.rm_get_current_arm_state()[1]['joint']
@@ -315,6 +329,8 @@ class RealmanDualMotorsBus:
self.right_rmarm.rm_movej_canfd(right_joints, follow=False)
# self.right_rmarm.rm_movej_follow(right_joints)
# self.right_rmarm.rm_set_gripper_position(right_gripper, block=False, timeout=2)
self._execute_gripper_thread(left_gripper, right_gripper)
def read(self) -> Dict:
"""读取机械臂状态 - 直接从缓存获取"""

View File

@@ -105,11 +105,15 @@ class RealmanDualRobot:
return obs_dict, action_dict
def _update_state_queue(self):
# import pdb
# pdb.set_trace()
"""更新状态队列"""
current_state = list(self.arm.read()['joint'].values())
self.joint_queue.append(current_state)
current_state = self.arm.read()['joint']
current_state_lst = []
for data in current_state:
if "joint" in data:
current_state_lst.append(current_state[data] / 180)
elif "gripper" in data:
current_state_lst.append((current_state[data]-500)/500)
self.joint_queue.append(current_state_lst)
def _capture_images(self) -> Dict[str, torch.Tensor]:
"""捕获图像"""
@@ -228,16 +232,11 @@ class RealmanDualRobot:
state = self._read_robot_state()
# 获取动作
action = self.teleop.get_action(state)
# print(state['pose'])
# print(action['pose'])
# 执行动作
# import pdb
# pdb.set_trace()
self._execute_action(action, state)
# 更新状态队列
self._update_state_queue()
time.sleep(0.02)
time.sleep(0.019) # 50HZ
if record_data:
data = self._prepare_record_data()
return data

View File

@@ -144,9 +144,11 @@ class ControllerBase:
# validzone
self.validzone = 0.05
self.ratio = 1
self.gripper_vel = 50
self.gripper_vel = 100
self.rxyz_vel = 5
self.xyz_vel = 0.02
self.scale_up = 2
self.scale_down = 10
# 线程控制标志
self.running = False
@@ -228,9 +230,9 @@ class FlightStick(ControllerBase):
def get_ratio_control_signal(self):
ratio = self.ratio
if self.states['axes'][2] > 0.8:
ratio = self.ratio / 5
ratio = self.ratio / self.scale_down
elif self.states['axes'][2] < -0.8:
ratio = self.ratio * 2
ratio = self.ratio * self.scale_up
return ratio
def get_rx_control_signal(self):
@@ -316,9 +318,9 @@ class XboxStick(ControllerBase):
"""获取速度控制信号"""
ratio = self.ratio
if self.states['axes'][2] > 0.8: # LT 按钮
ratio = self.ratio * 2
ratio = self.ratio * self.scale_up
elif self.states['axes'][5] > 0.8: # RT 按钮
ratio = self.ratio / 5
ratio = self.ratio / self.scale_down
return ratio
def get_gripper_control_signal(self):