From 7694d03dee36459a16c85167112a4820686a7f1d Mon Sep 17 00:00:00 2001 From: Michel Aractingi Date: Mon, 12 May 2025 17:47:13 +0200 Subject: [PATCH] Improved the takeover logic in the case of `leader_automatic` control_mode in `gym_manipulator.py` --- lerobot/scripts/server/gym_manipulator.py | 82 +++++++++++++---------- 1 file changed, 45 insertions(+), 37 deletions(-) diff --git a/lerobot/scripts/server/gym_manipulator.py b/lerobot/scripts/server/gym_manipulator.py index 2b221d71..ed2f3a45 100644 --- a/lerobot/scripts/server/gym_manipulator.py +++ b/lerobot/scripts/server/gym_manipulator.py @@ -1180,9 +1180,9 @@ class BaseLeaderControlWrapper(gym.Wrapper): # With higher gains, it would be dangerous and difficult to modify the leader's pose while torque is enabled # Default value for P_coeff is 32 self.robot_leader.write("Torque_Enable", 1) - self.robot_leader.write("P_Coefficient", 4) + self.robot_leader.write("P_Coefficient", 16) self.robot_leader.write("I_Coefficient", 0) - self.robot_leader.write("D_Coefficient", 4) + self.robot_leader.write("D_Coefficient", 16) self._init_keyboard_listener() @@ -1451,9 +1451,9 @@ class GearedLeaderAutomaticControlWrapper(BaseLeaderControlWrapper): env, ee_action_space_params=None, use_gripper=False, - intervention_threshold=1.7, - release_threshold=0.01, - queue_size=10, + intervention_threshold=0.005, + release_threshold=5e-6, + queue_size=3, ): """ Initialize the automatic intervention wrapper. @@ -1474,18 +1474,19 @@ class GearedLeaderAutomaticControlWrapper(BaseLeaderControlWrapper): self.queue_size = queue_size # Number of error measurements to keep # Error tracking variables - self.error_queue = deque(maxlen=self.queue_size) - self.error_over_time_queue = deque(maxlen=self.queue_size) - self.previous_error = 0.0 + self.ee_error_queue = deque(maxlen=self.queue_size) + self.ee_error_over_time_queue = deque(maxlen=self.queue_size) + self.previous_ee_error = 0.0 + self.previous_ee_error_over_time_over_time = 0.0 self.is_intervention_active = False self.start_time = time.perf_counter() def _check_intervention(self): """ - Determine if intervention should occur based on leader-follower error. + Determine if intervention should occur based on the rate of change of leader-follower error in end_effector space. - This method monitors the error rate between leader and follower positions - and automatically triggers intervention when the error rate exceeds + This method monitors the rate of change of leader-follower error in end_effector space + and automatically triggers intervention when the rate of change exceeds the intervention threshold, releasing when it falls below the release threshold. Returns: @@ -1499,35 +1500,41 @@ class GearedLeaderAutomaticControlWrapper(BaseLeaderControlWrapper): leader_positions = self.robot_leader.read("Present_Position") follower_positions = self.robot_follower.read("Present_Position") - # Calculate error and error rate - error = np.linalg.norm(leader_positions - follower_positions) - error_over_time = np.abs(error - self.previous_error) + leader_ee = self.kinematics.fk_gripper_tip(leader_positions)[:3, 3] + follower_ee = self.kinematics.fk_gripper_tip(follower_positions)[:3, 3] - # Add to queue for running average - self.error_queue.append(error) - self.error_over_time_queue.append(error_over_time) + ee_error = np.linalg.norm(leader_ee - follower_ee) + ee_error_over_time = np.abs(ee_error - self.previous_ee_error) - # Update previous error - self.previous_error = error + self.ee_error_queue.append(ee_error) + self.ee_error_over_time_queue.append(ee_error_over_time) # Calculate averages if we have enough data - if len(self.error_over_time_queue) >= self.queue_size: - avg_error_over_time = np.mean(self.error_over_time_queue) + if len(self.ee_error_over_time_queue) >= self.queue_size: + if self.previous_ee_error_over_time_over_time is None: + # Initialize the previous error over time + self.previous_ee_error_over_time_over_time = self.ee_error_over_time_queue[-1] + else: + # Calculate the rate of change of error over time + avg_error_over_time = np.abs( + self.ee_error_over_time_queue[-1] - self.previous_ee_error_over_time_over_time + ) + self.previous_ee_error_over_time_over_time = self.ee_error_over_time_queue[-1] - # Debug info - if self.is_intervention_active: - logging.debug(f"Error rate during intervention: {avg_error_over_time:.4f}") + # Determine if intervention should start or stop based on the thresholds set in the constructor + if not self.is_intervention_active and avg_error_over_time > self.intervention_threshold: + # Transition to intervention mode + self.is_intervention_active = True + logging.info(f"Starting automatic intervention: error rate {avg_error_over_time:.4f}") + self.ee_error_over_time_queue.clear() + self.previous_ee_error_over_time_over_time = None - # Determine if intervention should start or stop - if not self.is_intervention_active and avg_error_over_time > self.intervention_threshold: - # Transition to intervention mode - self.is_intervention_active = True - logging.info(f"Starting automatic intervention: error rate {avg_error_over_time:.4f}") - - elif self.is_intervention_active and avg_error_over_time < self.release_threshold: - # End intervention mode - self.is_intervention_active = False - logging.info(f"Ending automatic intervention: error rate {avg_error_over_time:.4f}") + elif self.is_intervention_active and avg_error_over_time < self.release_threshold: + # End intervention mode + self.is_intervention_active = False + logging.info(f"Ending automatic intervention: error rate {avg_error_over_time:.4f}") + self.ee_error_over_time_queue.clear() + self.previous_ee_error_over_time_over_time = None return self.is_intervention_active @@ -1541,11 +1548,12 @@ class GearedLeaderAutomaticControlWrapper(BaseLeaderControlWrapper): Returns: The initial observation and info. """ - self.error_queue.clear() - self.error_over_time_queue.clear() - self.previous_error = 0.0 self.is_intervention_active = False self.start_time = time.perf_counter() + self.ee_error_queue.clear() + self.ee_error_over_time_queue.clear() + self.previous_ee_error = 0.0 + self.old_avg_error_over_time = 0.0 return super().reset(**kwargs)