Fix unit tests

This commit is contained in:
Remi Cadene
2024-09-25 14:11:28 +02:00
parent 6377d2a96c
commit bded8cbbe9
4 changed files with 31 additions and 10 deletions

View File

@@ -242,7 +242,8 @@ def is_headless():
########################################################################################
def calibrate(robot: Robot, arms: list[str] | None):
def get_available_arms(robot):
# TODO(rcadene): moves this function in manipulator class?
available_arms = []
for name in robot.follower_arms:
arm_id = get_arm_id(name, "follower")
@@ -250,9 +251,12 @@ def calibrate(robot: Robot, arms: list[str] | None):
for name in robot.leader_arms:
arm_id = get_arm_id(name, "leader")
available_arms.append(arm_id)
return available_arms
def calibrate(robot: Robot, arms: list[str] | None):
available_arms = get_available_arms(robot)
unknown_arms = [arm_id for arm_id in arms if arm_id not in available_arms]
available_arms_str = " ".join(available_arms)
unknown_arms_str = " ".join(unknown_arms)
@@ -445,6 +449,7 @@ def record(
# Using `with` to exist smoothly if an execption is raised.
futures = []
num_image_writers = num_image_writers_per_camera * len(robot.cameras)
num_image_writers = max(num_image_writers, 1)
with concurrent.futures.ThreadPoolExecutor(max_workers=num_image_writers) as executor:
# Start recording all episodes
while episode_index < num_episodes: