Improved the takeover logic in the case of leader_automatic control_mode in gym_manipulator.py

This commit is contained in:
Michel Aractingi
2025-05-12 17:47:13 +02:00
committed by AdilZouitine
parent aa793cbd4a
commit 7694d03dee

View File

@@ -1180,9 +1180,9 @@ class BaseLeaderControlWrapper(gym.Wrapper):
# With higher gains, it would be dangerous and difficult to modify the leader's pose while torque is enabled
# Default value for P_coeff is 32
self.robot_leader.write("Torque_Enable", 1)
self.robot_leader.write("P_Coefficient", 4)
self.robot_leader.write("P_Coefficient", 16)
self.robot_leader.write("I_Coefficient", 0)
self.robot_leader.write("D_Coefficient", 4)
self.robot_leader.write("D_Coefficient", 16)
self._init_keyboard_listener()
@@ -1451,9 +1451,9 @@ class GearedLeaderAutomaticControlWrapper(BaseLeaderControlWrapper):
env,
ee_action_space_params=None,
use_gripper=False,
intervention_threshold=1.7,
release_threshold=0.01,
queue_size=10,
intervention_threshold=0.005,
release_threshold=5e-6,
queue_size=3,
):
"""
Initialize the automatic intervention wrapper.
@@ -1474,18 +1474,19 @@ class GearedLeaderAutomaticControlWrapper(BaseLeaderControlWrapper):
self.queue_size = queue_size # Number of error measurements to keep
# Error tracking variables
self.error_queue = deque(maxlen=self.queue_size)
self.error_over_time_queue = deque(maxlen=self.queue_size)
self.previous_error = 0.0
self.ee_error_queue = deque(maxlen=self.queue_size)
self.ee_error_over_time_queue = deque(maxlen=self.queue_size)
self.previous_ee_error = 0.0
self.previous_ee_error_over_time_over_time = 0.0
self.is_intervention_active = False
self.start_time = time.perf_counter()
def _check_intervention(self):
"""
Determine if intervention should occur based on leader-follower error.
Determine if intervention should occur based on the rate of change of leader-follower error in end_effector space.
This method monitors the error rate between leader and follower positions
and automatically triggers intervention when the error rate exceeds
This method monitors the rate of change of leader-follower error in end_effector space
and automatically triggers intervention when the rate of change exceeds
the intervention threshold, releasing when it falls below the release threshold.
Returns:
@@ -1499,35 +1500,41 @@ class GearedLeaderAutomaticControlWrapper(BaseLeaderControlWrapper):
leader_positions = self.robot_leader.read("Present_Position")
follower_positions = self.robot_follower.read("Present_Position")
# Calculate error and error rate
error = np.linalg.norm(leader_positions - follower_positions)
error_over_time = np.abs(error - self.previous_error)
leader_ee = self.kinematics.fk_gripper_tip(leader_positions)[:3, 3]
follower_ee = self.kinematics.fk_gripper_tip(follower_positions)[:3, 3]
# Add to queue for running average
self.error_queue.append(error)
self.error_over_time_queue.append(error_over_time)
ee_error = np.linalg.norm(leader_ee - follower_ee)
ee_error_over_time = np.abs(ee_error - self.previous_ee_error)
# Update previous error
self.previous_error = error
self.ee_error_queue.append(ee_error)
self.ee_error_over_time_queue.append(ee_error_over_time)
# Calculate averages if we have enough data
if len(self.error_over_time_queue) >= self.queue_size:
avg_error_over_time = np.mean(self.error_over_time_queue)
if len(self.ee_error_over_time_queue) >= self.queue_size:
if self.previous_ee_error_over_time_over_time is None:
# Initialize the previous error over time
self.previous_ee_error_over_time_over_time = self.ee_error_over_time_queue[-1]
else:
# Calculate the rate of change of error over time
avg_error_over_time = np.abs(
self.ee_error_over_time_queue[-1] - self.previous_ee_error_over_time_over_time
)
self.previous_ee_error_over_time_over_time = self.ee_error_over_time_queue[-1]
# Debug info
if self.is_intervention_active:
logging.debug(f"Error rate during intervention: {avg_error_over_time:.4f}")
# Determine if intervention should start or stop based on the thresholds set in the constructor
if not self.is_intervention_active and avg_error_over_time > self.intervention_threshold:
# Transition to intervention mode
self.is_intervention_active = True
logging.info(f"Starting automatic intervention: error rate {avg_error_over_time:.4f}")
self.ee_error_over_time_queue.clear()
self.previous_ee_error_over_time_over_time = None
# Determine if intervention should start or stop
if not self.is_intervention_active and avg_error_over_time > self.intervention_threshold:
# Transition to intervention mode
self.is_intervention_active = True
logging.info(f"Starting automatic intervention: error rate {avg_error_over_time:.4f}")
elif self.is_intervention_active and avg_error_over_time < self.release_threshold:
# End intervention mode
self.is_intervention_active = False
logging.info(f"Ending automatic intervention: error rate {avg_error_over_time:.4f}")
elif self.is_intervention_active and avg_error_over_time < self.release_threshold:
# End intervention mode
self.is_intervention_active = False
logging.info(f"Ending automatic intervention: error rate {avg_error_over_time:.4f}")
self.ee_error_over_time_queue.clear()
self.previous_ee_error_over_time_over_time = None
return self.is_intervention_active
@@ -1541,11 +1548,12 @@ class GearedLeaderAutomaticControlWrapper(BaseLeaderControlWrapper):
Returns:
The initial observation and info.
"""
self.error_queue.clear()
self.error_over_time_queue.clear()
self.previous_error = 0.0
self.is_intervention_active = False
self.start_time = time.perf_counter()
self.ee_error_queue.clear()
self.ee_error_over_time_queue.clear()
self.previous_ee_error = 0.0
self.old_avg_error_over_time = 0.0
return super().reset(**kwargs)