[pre-commit.ci] auto fixes from pre-commit.com hooks
for more information, see https://pre-commit.ci
This commit is contained in:
committed by
Michel Aractingi
parent
bb69cb3c8c
commit
85fe8a3f4e
@@ -24,9 +24,7 @@ from lerobot.common.robot_devices.motors.dynamixel import (
|
||||
)
|
||||
from lerobot.common.robot_devices.motors.utils import MotorsBus
|
||||
|
||||
URL_TEMPLATE = (
|
||||
"https://raw.githubusercontent.com/huggingface/lerobot/main/media/{robot}/{arm}_{position}.webp"
|
||||
)
|
||||
URL_TEMPLATE = "https://raw.githubusercontent.com/huggingface/lerobot/main/media/{robot}/{arm}_{position}.webp"
|
||||
|
||||
# The following positions are provided in nominal degree range ]-180, +180[
|
||||
# For more info on these constants, see comments in the code where they get used.
|
||||
@@ -37,7 +35,9 @@ ROTATED_POSITION_DEGREE = 90
|
||||
def assert_drive_mode(drive_mode):
|
||||
# `drive_mode` is in [0,1] with 0 means original rotation direction for the motor, and 1 means inverted.
|
||||
if not np.all(np.isin(drive_mode, [0, 1])):
|
||||
raise ValueError(f"`drive_mode` contains values other than 0 or 1: ({drive_mode})")
|
||||
raise ValueError(
|
||||
f"`drive_mode` contains values other than 0 or 1: ({drive_mode})"
|
||||
)
|
||||
|
||||
|
||||
def apply_drive_mode(position, drive_mode):
|
||||
@@ -78,12 +78,16 @@ def run_arm_calibration(arm: MotorsBus, robot_type: str, arm_name: str, arm_type
|
||||
```
|
||||
"""
|
||||
if (arm.read("Torque_Enable") != TorqueMode.DISABLED.value).any():
|
||||
raise ValueError("To run calibration, the torque must be disabled on all motors.")
|
||||
raise ValueError(
|
||||
"To run calibration, the torque must be disabled on all motors."
|
||||
)
|
||||
|
||||
print(f"\nRunning calibration of {robot_type} {arm_name} {arm_type}...")
|
||||
|
||||
print("\nMove arm to zero position")
|
||||
print("See: " + URL_TEMPLATE.format(robot=robot_type, arm=arm_type, position="zero"))
|
||||
print(
|
||||
"See: " + URL_TEMPLATE.format(robot=robot_type, arm=arm_type, position="zero")
|
||||
)
|
||||
input("Press Enter to continue...")
|
||||
|
||||
# We arbitrarily chose our zero target position to be a straight horizontal position with gripper upwards and closed.
|
||||
@@ -104,10 +108,15 @@ def run_arm_calibration(arm: MotorsBus, robot_type: str, arm_name: str, arm_type
|
||||
# corresponds to opening the gripper. When the rotation direction is ambiguous, we arbitrarily rotate clockwise from the point of view
|
||||
# of the previous motor in the kinetic chain.
|
||||
print("\nMove arm to rotated target position")
|
||||
print("See: " + URL_TEMPLATE.format(robot=robot_type, arm=arm_type, position="rotated"))
|
||||
print(
|
||||
"See: "
|
||||
+ URL_TEMPLATE.format(robot=robot_type, arm=arm_type, position="rotated")
|
||||
)
|
||||
input("Press Enter to continue...")
|
||||
|
||||
rotated_target_pos = convert_degrees_to_steps(ROTATED_POSITION_DEGREE, arm.motor_models)
|
||||
rotated_target_pos = convert_degrees_to_steps(
|
||||
ROTATED_POSITION_DEGREE, arm.motor_models
|
||||
)
|
||||
|
||||
# Find drive mode by rotating each motor by a quarter of a turn.
|
||||
# Drive mode indicates if the motor rotation direction should be inverted (=1) or not (=0).
|
||||
@@ -116,11 +125,15 @@ def run_arm_calibration(arm: MotorsBus, robot_type: str, arm_name: str, arm_type
|
||||
|
||||
# Re-compute homing offset to take into account drive mode
|
||||
rotated_drived_pos = apply_drive_mode(rotated_pos, drive_mode)
|
||||
rotated_nearest_pos = compute_nearest_rounded_position(rotated_drived_pos, arm.motor_models)
|
||||
rotated_nearest_pos = compute_nearest_rounded_position(
|
||||
rotated_drived_pos, arm.motor_models
|
||||
)
|
||||
homing_offset = rotated_target_pos - rotated_nearest_pos
|
||||
|
||||
print("\nMove arm to rest position")
|
||||
print("See: " + URL_TEMPLATE.format(robot=robot_type, arm=arm_type, position="rest"))
|
||||
print(
|
||||
"See: " + URL_TEMPLATE.format(robot=robot_type, arm=arm_type, position="rest")
|
||||
)
|
||||
input("Press Enter to continue...")
|
||||
print()
|
||||
|
||||
|
||||
@@ -26,9 +26,7 @@ from lerobot.common.robot_devices.motors.feetech import (
|
||||
)
|
||||
from lerobot.common.robot_devices.motors.utils import MotorsBus
|
||||
|
||||
URL_TEMPLATE = (
|
||||
"https://raw.githubusercontent.com/huggingface/lerobot/main/media/{robot}/{arm}_{position}.webp"
|
||||
)
|
||||
URL_TEMPLATE = "https://raw.githubusercontent.com/huggingface/lerobot/main/media/{robot}/{arm}_{position}.webp"
|
||||
|
||||
# The following positions are provided in nominal degree range ]-180, +180[
|
||||
# For more info on these constants, see comments in the code where they get used.
|
||||
@@ -39,7 +37,9 @@ ROTATED_POSITION_DEGREE = 90
|
||||
def assert_drive_mode(drive_mode):
|
||||
# `drive_mode` is in [0,1] with 0 means original rotation direction for the motor, and 1 means inverted.
|
||||
if not np.all(np.isin(drive_mode, [0, 1])):
|
||||
raise ValueError(f"`drive_mode` contains values other than 0 or 1: ({drive_mode})")
|
||||
raise ValueError(
|
||||
f"`drive_mode` contains values other than 0 or 1: ({drive_mode})"
|
||||
)
|
||||
|
||||
|
||||
def apply_drive_mode(position, drive_mode):
|
||||
@@ -140,7 +140,9 @@ def apply_offset(calib, offset):
|
||||
return calib
|
||||
|
||||
|
||||
def run_arm_auto_calibration(arm: MotorsBus, robot_type: str, arm_name: str, arm_type: str):
|
||||
def run_arm_auto_calibration(
|
||||
arm: MotorsBus, robot_type: str, arm_name: str, arm_type: str
|
||||
):
|
||||
if robot_type == "so100":
|
||||
return run_arm_auto_calibration_so100(arm, robot_type, arm_name, arm_type)
|
||||
elif robot_type == "moss":
|
||||
@@ -149,18 +151,27 @@ def run_arm_auto_calibration(arm: MotorsBus, robot_type: str, arm_name: str, arm
|
||||
raise ValueError(robot_type)
|
||||
|
||||
|
||||
def run_arm_auto_calibration_so100(arm: MotorsBus, robot_type: str, arm_name: str, arm_type: str):
|
||||
def run_arm_auto_calibration_so100(
|
||||
arm: MotorsBus, robot_type: str, arm_name: str, arm_type: str
|
||||
):
|
||||
"""All the offsets and magic numbers are hand tuned, and are unique to SO-100 follower arms"""
|
||||
if (arm.read("Torque_Enable") != TorqueMode.DISABLED.value).any():
|
||||
raise ValueError("To run calibration, the torque must be disabled on all motors.")
|
||||
raise ValueError(
|
||||
"To run calibration, the torque must be disabled on all motors."
|
||||
)
|
||||
|
||||
if not (robot_type == "so100" and arm_type == "follower"):
|
||||
raise NotImplementedError("Auto calibration only supports the follower of so100 arms for now.")
|
||||
raise NotImplementedError(
|
||||
"Auto calibration only supports the follower of so100 arms for now."
|
||||
)
|
||||
|
||||
print(f"\nRunning calibration of {robot_type} {arm_name} {arm_type}...")
|
||||
|
||||
print("\nMove arm to initial position")
|
||||
print("See: " + URL_TEMPLATE.format(robot=robot_type, arm=arm_type, position="initial"))
|
||||
print(
|
||||
"See: "
|
||||
+ URL_TEMPLATE.format(robot=robot_type, arm=arm_type, position="initial")
|
||||
)
|
||||
input("Press Enter to continue...")
|
||||
|
||||
# Lower the acceleration of the motors (in [0,254])
|
||||
@@ -207,11 +218,16 @@ def run_arm_auto_calibration_so100(arm: MotorsBus, robot_type: str, arm_name: st
|
||||
|
||||
print("Calibrate elbow_flex")
|
||||
calib["elbow_flex"] = move_to_calibrate(
|
||||
arm, "elbow_flex", positive_first=False, in_between_move_hook=in_between_move_hook
|
||||
arm,
|
||||
"elbow_flex",
|
||||
positive_first=False,
|
||||
in_between_move_hook=in_between_move_hook,
|
||||
)
|
||||
calib["elbow_flex"] = apply_offset(calib["elbow_flex"], offset=80 - 1024)
|
||||
|
||||
arm.write("Goal_Position", calib["elbow_flex"]["zero_pos"] + 1024 + 512, "elbow_flex")
|
||||
arm.write(
|
||||
"Goal_Position", calib["elbow_flex"]["zero_pos"] + 1024 + 512, "elbow_flex"
|
||||
)
|
||||
time.sleep(1)
|
||||
|
||||
def in_between_move_hook():
|
||||
@@ -239,18 +255,30 @@ def run_arm_auto_calibration_so100(arm: MotorsBus, robot_type: str, arm_name: st
|
||||
}
|
||||
arm.write("Goal_Position", list(positions.values()), list(positions.keys()))
|
||||
|
||||
arm.write("Goal_Position", round(calib["shoulder_lift"]["zero_pos"] - 1600), "shoulder_lift")
|
||||
arm.write(
|
||||
"Goal_Position",
|
||||
round(calib["shoulder_lift"]["zero_pos"] - 1600),
|
||||
"shoulder_lift",
|
||||
)
|
||||
time.sleep(2)
|
||||
arm.write("Goal_Position", round(calib["elbow_flex"]["zero_pos"] + 1700), "elbow_flex")
|
||||
arm.write(
|
||||
"Goal_Position", round(calib["elbow_flex"]["zero_pos"] + 1700), "elbow_flex"
|
||||
)
|
||||
time.sleep(2)
|
||||
arm.write("Goal_Position", round(calib["wrist_flex"]["zero_pos"] + 800), "wrist_flex")
|
||||
arm.write(
|
||||
"Goal_Position", round(calib["wrist_flex"]["zero_pos"] + 800), "wrist_flex"
|
||||
)
|
||||
time.sleep(2)
|
||||
arm.write("Goal_Position", round(calib["gripper"]["end_pos"]), "gripper")
|
||||
time.sleep(2)
|
||||
|
||||
print("Calibrate wrist_roll")
|
||||
calib["wrist_roll"] = move_to_calibrate(
|
||||
arm, "wrist_roll", invert_drive_mode=True, positive_first=False, while_move_hook=while_move_hook
|
||||
arm,
|
||||
"wrist_roll",
|
||||
invert_drive_mode=True,
|
||||
positive_first=False,
|
||||
while_move_hook=while_move_hook,
|
||||
)
|
||||
|
||||
arm.write("Goal_Position", calib["wrist_roll"]["zero_pos"], "wrist_roll")
|
||||
@@ -260,7 +288,9 @@ def run_arm_auto_calibration_so100(arm: MotorsBus, robot_type: str, arm_name: st
|
||||
arm.write("Goal_Position", calib["wrist_flex"]["zero_pos"], "wrist_flex")
|
||||
time.sleep(1)
|
||||
arm.write("Goal_Position", calib["elbow_flex"]["zero_pos"] + 2048, "elbow_flex")
|
||||
arm.write("Goal_Position", calib["shoulder_lift"]["zero_pos"] - 2048, "shoulder_lift")
|
||||
arm.write(
|
||||
"Goal_Position", calib["shoulder_lift"]["zero_pos"] - 2048, "shoulder_lift"
|
||||
)
|
||||
time.sleep(1)
|
||||
arm.write("Goal_Position", calib["shoulder_pan"]["zero_pos"], "shoulder_pan")
|
||||
time.sleep(1)
|
||||
@@ -289,18 +319,27 @@ def run_arm_auto_calibration_so100(arm: MotorsBus, robot_type: str, arm_name: st
|
||||
return calib_dict
|
||||
|
||||
|
||||
def run_arm_auto_calibration_moss(arm: MotorsBus, robot_type: str, arm_name: str, arm_type: str):
|
||||
def run_arm_auto_calibration_moss(
|
||||
arm: MotorsBus, robot_type: str, arm_name: str, arm_type: str
|
||||
):
|
||||
"""All the offsets and magic numbers are hand tuned, and are unique to SO-100 follower arms"""
|
||||
if (arm.read("Torque_Enable") != TorqueMode.DISABLED.value).any():
|
||||
raise ValueError("To run calibration, the torque must be disabled on all motors.")
|
||||
raise ValueError(
|
||||
"To run calibration, the torque must be disabled on all motors."
|
||||
)
|
||||
|
||||
if not (robot_type == "moss" and arm_type == "follower"):
|
||||
raise NotImplementedError("Auto calibration only supports the follower of moss arms for now.")
|
||||
raise NotImplementedError(
|
||||
"Auto calibration only supports the follower of moss arms for now."
|
||||
)
|
||||
|
||||
print(f"\nRunning calibration of {robot_type} {arm_name} {arm_type}...")
|
||||
|
||||
print("\nMove arm to initial position")
|
||||
print("See: " + URL_TEMPLATE.format(robot=robot_type, arm=arm_type, position="initial"))
|
||||
print(
|
||||
"See: "
|
||||
+ URL_TEMPLATE.format(robot=robot_type, arm=arm_type, position="initial")
|
||||
)
|
||||
input("Press Enter to continue...")
|
||||
|
||||
# Lower the acceleration of the motors (in [0,254])
|
||||
@@ -384,8 +423,12 @@ def run_arm_auto_calibration_moss(arm: MotorsBus, robot_type: str, arm_name: str
|
||||
|
||||
arm.write("Goal_Position", calib["wrist_flex"]["zero_pos"] - 1024, "wrist_flex")
|
||||
time.sleep(1)
|
||||
arm.write("Goal_Position", calib["shoulder_lift"]["zero_pos"] + 2048, "shoulder_lift")
|
||||
arm.write("Goal_Position", calib["elbow_flex"]["zero_pos"] - 1024 - 400, "elbow_flex")
|
||||
arm.write(
|
||||
"Goal_Position", calib["shoulder_lift"]["zero_pos"] + 2048, "shoulder_lift"
|
||||
)
|
||||
arm.write(
|
||||
"Goal_Position", calib["elbow_flex"]["zero_pos"] - 1024 - 400, "elbow_flex"
|
||||
)
|
||||
time.sleep(2)
|
||||
|
||||
calib_modes = []
|
||||
@@ -412,7 +455,9 @@ def run_arm_auto_calibration_moss(arm: MotorsBus, robot_type: str, arm_name: str
|
||||
return calib_dict
|
||||
|
||||
|
||||
def run_arm_manual_calibration(arm: MotorsBus, robot_type: str, arm_name: str, arm_type: str):
|
||||
def run_arm_manual_calibration(
|
||||
arm: MotorsBus, robot_type: str, arm_name: str, arm_type: str
|
||||
):
|
||||
"""This function ensures that a neural network trained on data collected on a given robot
|
||||
can work on another robot. For instance before calibration, setting a same goal position
|
||||
for each motor of two different robots will get two very different positions. But after calibration,
|
||||
@@ -435,12 +480,16 @@ def run_arm_manual_calibration(arm: MotorsBus, robot_type: str, arm_name: str, a
|
||||
```
|
||||
"""
|
||||
if (arm.read("Torque_Enable") != TorqueMode.DISABLED.value).any():
|
||||
raise ValueError("To run calibration, the torque must be disabled on all motors.")
|
||||
raise ValueError(
|
||||
"To run calibration, the torque must be disabled on all motors."
|
||||
)
|
||||
|
||||
print(f"\nRunning calibration of {robot_type} {arm_name} {arm_type}...")
|
||||
|
||||
print("\nMove arm to zero position")
|
||||
print("See: " + URL_TEMPLATE.format(robot=robot_type, arm=arm_type, position="zero"))
|
||||
print(
|
||||
"See: " + URL_TEMPLATE.format(robot=robot_type, arm=arm_type, position="zero")
|
||||
)
|
||||
input("Press Enter to continue...")
|
||||
|
||||
# We arbitrarily chose our zero target position to be a straight horizontal position with gripper upwards and closed.
|
||||
@@ -460,10 +509,15 @@ def run_arm_manual_calibration(arm: MotorsBus, robot_type: str, arm_name: str, a
|
||||
# corresponds to opening the gripper. When the rotation direction is ambiguous, we arbitrarily rotate clockwise from the point of view
|
||||
# of the previous motor in the kinetic chain.
|
||||
print("\nMove arm to rotated target position")
|
||||
print("See: " + URL_TEMPLATE.format(robot=robot_type, arm=arm_type, position="rotated"))
|
||||
print(
|
||||
"See: "
|
||||
+ URL_TEMPLATE.format(robot=robot_type, arm=arm_type, position="rotated")
|
||||
)
|
||||
input("Press Enter to continue...")
|
||||
|
||||
rotated_target_pos = convert_degrees_to_steps(ROTATED_POSITION_DEGREE, arm.motor_models)
|
||||
rotated_target_pos = convert_degrees_to_steps(
|
||||
ROTATED_POSITION_DEGREE, arm.motor_models
|
||||
)
|
||||
|
||||
# Find drive mode by rotating each motor by a quarter of a turn.
|
||||
# Drive mode indicates if the motor rotation direction should be inverted (=1) or not (=0).
|
||||
@@ -475,7 +529,9 @@ def run_arm_manual_calibration(arm: MotorsBus, robot_type: str, arm_name: str, a
|
||||
homing_offset = rotated_target_pos - rotated_drived_pos
|
||||
|
||||
print("\nMove arm to rest position")
|
||||
print("See: " + URL_TEMPLATE.format(robot=robot_type, arm=arm_type, position="rest"))
|
||||
print(
|
||||
"See: " + URL_TEMPLATE.format(robot=robot_type, arm=arm_type, position="rest")
|
||||
)
|
||||
input("Press Enter to continue...")
|
||||
print()
|
||||
|
||||
|
||||
@@ -31,11 +31,16 @@ from lerobot.common.robot_devices.cameras.utils import make_cameras_from_configs
|
||||
from lerobot.common.robot_devices.motors.utils import MotorsBus, make_motors_buses_from_configs
|
||||
from lerobot.common.robot_devices.robots.configs import ManipulatorRobotConfig
|
||||
from lerobot.common.robot_devices.robots.utils import get_arm_id
|
||||
from lerobot.common.robot_devices.utils import RobotDeviceAlreadyConnectedError, RobotDeviceNotConnectedError
|
||||
from lerobot.common.robot_devices.utils import (
|
||||
RobotDeviceAlreadyConnectedError,
|
||||
RobotDeviceNotConnectedError,
|
||||
)
|
||||
|
||||
|
||||
def ensure_safe_goal_position(
|
||||
goal_pos: torch.Tensor, present_pos: torch.Tensor, max_relative_target: float | list[float]
|
||||
goal_pos: torch.Tensor,
|
||||
present_pos: torch.Tensor,
|
||||
max_relative_target: float | list[float],
|
||||
):
|
||||
# Cap relative action target magnitude for safety.
|
||||
diff = goal_pos - present_pos
|
||||
@@ -277,7 +282,9 @@ class ManipulatorRobot:
|
||||
# to squeeze the gripper and have it spring back to an open position on its own.
|
||||
for name in self.leader_arms:
|
||||
self.leader_arms[name].write("Torque_Enable", 1, "gripper")
|
||||
self.leader_arms[name].write("Goal_Position", self.config.gripper_open_degree, "gripper")
|
||||
self.leader_arms[name].write(
|
||||
"Goal_Position", self.config.gripper_open_degree, "gripper"
|
||||
)
|
||||
|
||||
# Check both arms can be read
|
||||
for name in self.follower_arms:
|
||||
@@ -309,18 +316,26 @@ class ManipulatorRobot:
|
||||
print(f"Missing calibration file '{arm_calib_path}'")
|
||||
|
||||
if self.robot_type in ["koch", "koch_bimanual", "aloha"]:
|
||||
from lerobot.common.robot_devices.robots.dynamixel_calibration import run_arm_calibration
|
||||
from lerobot.common.robot_devices.robots.dynamixel_calibration import (
|
||||
run_arm_calibration,
|
||||
)
|
||||
|
||||
calibration = run_arm_calibration(arm, self.robot_type, name, arm_type)
|
||||
calibration = run_arm_calibration(
|
||||
arm, self.robot_type, name, arm_type
|
||||
)
|
||||
|
||||
elif self.robot_type in ["so100", "moss", "lekiwi"]:
|
||||
from lerobot.common.robot_devices.robots.feetech_calibration import (
|
||||
run_arm_manual_calibration,
|
||||
)
|
||||
|
||||
calibration = run_arm_manual_calibration(arm, self.robot_type, name, arm_type)
|
||||
calibration = run_arm_manual_calibration(
|
||||
arm, self.robot_type, name, arm_type
|
||||
)
|
||||
|
||||
print(f"Calibration is done! Saving calibration file '{arm_calib_path}'")
|
||||
print(
|
||||
f"Calibration is done! Saving calibration file '{arm_calib_path}'"
|
||||
)
|
||||
arm_calib_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
with open(arm_calib_path, "w") as f:
|
||||
json.dump(calibration, f)
|
||||
@@ -339,13 +354,17 @@ class ManipulatorRobot:
|
||||
from lerobot.common.robot_devices.motors.dynamixel import TorqueMode
|
||||
|
||||
if (arm.read("Torque_Enable") != TorqueMode.DISABLED.value).any():
|
||||
raise ValueError("To run set robot preset, the torque must be disabled on all motors.")
|
||||
raise ValueError(
|
||||
"To run set robot preset, the torque must be disabled on all motors."
|
||||
)
|
||||
|
||||
# Use 'extended position mode' for all motors except gripper, because in joint mode the servos can't
|
||||
# rotate more than 360 degrees (from 0 to 4095) And some mistake can happen while assembling the arm,
|
||||
# you could end up with a servo with a position 0 or 4095 at a crucial point See [
|
||||
# https://emanual.robotis.com/docs/en/dxl/x/x_series/#operating-mode11]
|
||||
all_motors_except_gripper = [name for name in arm.motor_names if name != "gripper"]
|
||||
all_motors_except_gripper = [
|
||||
name for name in arm.motor_names if name != "gripper"
|
||||
]
|
||||
if len(all_motors_except_gripper) > 0:
|
||||
# 4 corresponds to Extended Position on Koch motors
|
||||
arm.write("Operating_Mode", 4, all_motors_except_gripper)
|
||||
@@ -374,7 +393,9 @@ class ManipulatorRobot:
|
||||
# Enable torque on the gripper of the leader arms, and move it to 45 degrees,
|
||||
# so that we can use it as a trigger to close the gripper of the follower arms.
|
||||
self.leader_arms[name].write("Torque_Enable", 1, "gripper")
|
||||
self.leader_arms[name].write("Goal_Position", self.config.gripper_open_degree, "gripper")
|
||||
self.leader_arms[name].write(
|
||||
"Goal_Position", self.config.gripper_open_degree, "gripper"
|
||||
)
|
||||
|
||||
def set_aloha_robot_preset(self):
|
||||
def set_shadow_(arm):
|
||||
@@ -404,11 +425,15 @@ class ManipulatorRobot:
|
||||
# you could end up with a servo with a position 0 or 4095 at a crucial point See [
|
||||
# https://emanual.robotis.com/docs/en/dxl/x/x_series/#operating-mode11]
|
||||
all_motors_except_gripper = [
|
||||
name for name in self.follower_arms[name].motor_names if name != "gripper"
|
||||
name
|
||||
for name in self.follower_arms[name].motor_names
|
||||
if name != "gripper"
|
||||
]
|
||||
if len(all_motors_except_gripper) > 0:
|
||||
# 4 corresponds to Extended Position on Aloha motors
|
||||
self.follower_arms[name].write("Operating_Mode", 4, all_motors_except_gripper)
|
||||
self.follower_arms[name].write(
|
||||
"Operating_Mode", 4, all_motors_except_gripper
|
||||
)
|
||||
|
||||
# Use 'position control current based' for follower gripper to be limited by the limit of the current.
|
||||
# It can grasp an object without forcing too much even tho,
|
||||
@@ -456,7 +481,9 @@ class ManipulatorRobot:
|
||||
before_lread_t = time.perf_counter()
|
||||
leader_pos[name] = self.leader_arms[name].read("Present_Position")
|
||||
leader_pos[name] = torch.from_numpy(leader_pos[name])
|
||||
self.logs[f"read_leader_{name}_pos_dt_s"] = time.perf_counter() - before_lread_t
|
||||
self.logs[f"read_leader_{name}_pos_dt_s"] = (
|
||||
time.perf_counter() - before_lread_t
|
||||
)
|
||||
|
||||
# Send goal position to the follower
|
||||
follower_goal_pos = {}
|
||||
@@ -477,14 +504,18 @@ class ManipulatorRobot:
|
||||
if self.config.max_relative_target is not None:
|
||||
present_pos = self.follower_arms[name].read("Present_Position")
|
||||
present_pos = torch.from_numpy(present_pos)
|
||||
goal_pos = ensure_safe_goal_position(goal_pos, present_pos, self.config.max_relative_target)
|
||||
goal_pos = ensure_safe_goal_position(
|
||||
goal_pos, present_pos, self.config.max_relative_target
|
||||
)
|
||||
|
||||
# Used when record_data=True
|
||||
follower_goal_pos[name] = goal_pos
|
||||
|
||||
goal_pos = goal_pos.numpy().astype(np.float32)
|
||||
self.follower_arms[name].write("Goal_Position", goal_pos)
|
||||
self.logs[f"write_follower_{name}_goal_pos_dt_s"] = time.perf_counter() - before_fwrite_t
|
||||
self.logs[f"write_follower_{name}_goal_pos_dt_s"] = (
|
||||
time.perf_counter() - before_fwrite_t
|
||||
)
|
||||
|
||||
# Early exit when recording data is not requested
|
||||
if not record_data:
|
||||
@@ -497,7 +528,9 @@ class ManipulatorRobot:
|
||||
before_fread_t = time.perf_counter()
|
||||
follower_pos[name] = self.follower_arms[name].read("Present_Position")
|
||||
follower_pos[name] = torch.from_numpy(follower_pos[name])
|
||||
self.logs[f"read_follower_{name}_pos_dt_s"] = time.perf_counter() - before_fread_t
|
||||
self.logs[f"read_follower_{name}_pos_dt_s"] = (
|
||||
time.perf_counter() - before_fread_t
|
||||
)
|
||||
|
||||
# Create state by concatenating follower current position
|
||||
state = []
|
||||
@@ -519,8 +552,12 @@ class ManipulatorRobot:
|
||||
before_camread_t = time.perf_counter()
|
||||
images[name] = self.cameras[name].async_read()
|
||||
images[name] = torch.from_numpy(images[name])
|
||||
self.logs[f"read_camera_{name}_dt_s"] = self.cameras[name].logs["delta_timestamp_s"]
|
||||
self.logs[f"async_read_camera_{name}_dt_s"] = time.perf_counter() - before_camread_t
|
||||
self.logs[f"read_camera_{name}_dt_s"] = self.cameras[name].logs[
|
||||
"delta_timestamp_s"
|
||||
]
|
||||
self.logs[f"async_read_camera_{name}_dt_s"] = (
|
||||
time.perf_counter() - before_camread_t
|
||||
)
|
||||
|
||||
# Populate output dictionaries
|
||||
obs_dict, action_dict = {}, {}
|
||||
@@ -544,7 +581,9 @@ class ManipulatorRobot:
|
||||
before_fread_t = time.perf_counter()
|
||||
follower_pos[name] = self.follower_arms[name].read("Present_Position")
|
||||
follower_pos[name] = torch.from_numpy(follower_pos[name])
|
||||
self.logs[f"read_follower_{name}_pos_dt_s"] = time.perf_counter() - before_fread_t
|
||||
self.logs[f"read_follower_{name}_pos_dt_s"] = (
|
||||
time.perf_counter() - before_fread_t
|
||||
)
|
||||
|
||||
# Create state by concatenating follower current position
|
||||
state = []
|
||||
@@ -559,8 +598,12 @@ class ManipulatorRobot:
|
||||
before_camread_t = time.perf_counter()
|
||||
images[name] = self.cameras[name].async_read()
|
||||
images[name] = torch.from_numpy(images[name])
|
||||
self.logs[f"read_camera_{name}_dt_s"] = self.cameras[name].logs["delta_timestamp_s"]
|
||||
self.logs[f"async_read_camera_{name}_dt_s"] = time.perf_counter() - before_camread_t
|
||||
self.logs[f"read_camera_{name}_dt_s"] = self.cameras[name].logs[
|
||||
"delta_timestamp_s"
|
||||
]
|
||||
self.logs[f"async_read_camera_{name}_dt_s"] = (
|
||||
time.perf_counter() - before_camread_t
|
||||
)
|
||||
|
||||
# Populate output dictionaries and format to pytorch
|
||||
obs_dict = {}
|
||||
@@ -606,7 +649,9 @@ class ManipulatorRobot:
|
||||
if self.config.max_relative_target is not None:
|
||||
present_pos = self.follower_arms[name].read("Present_Position")
|
||||
present_pos = torch.from_numpy(present_pos)
|
||||
goal_pos = ensure_safe_goal_position(goal_pos, present_pos, self.config.max_relative_target)
|
||||
goal_pos = ensure_safe_goal_position(
|
||||
goal_pos, present_pos, self.config.max_relative_target
|
||||
)
|
||||
|
||||
# Save tensor to concat and return
|
||||
action_sent.append(goal_pos)
|
||||
|
||||
@@ -52,7 +52,9 @@ class StretchRobot(StretchAPI):
|
||||
def connect(self) -> None:
|
||||
self.is_connected = self.startup()
|
||||
if not self.is_connected:
|
||||
print("Another process is already using Stretch. Try running 'stretch_free_robot_process.py'")
|
||||
print(
|
||||
"Another process is already using Stretch. Try running 'stretch_free_robot_process.py'"
|
||||
)
|
||||
raise ConnectionError()
|
||||
|
||||
for name in self.cameras:
|
||||
@@ -60,7 +62,9 @@ class StretchRobot(StretchAPI):
|
||||
self.is_connected = self.is_connected and self.cameras[name].is_connected
|
||||
|
||||
if not self.is_connected:
|
||||
print("Could not connect to the cameras, check that all cameras are plugged-in.")
|
||||
print(
|
||||
"Could not connect to the cameras, check that all cameras are plugged-in."
|
||||
)
|
||||
raise ConnectionError()
|
||||
|
||||
self.run_calibration()
|
||||
@@ -105,8 +109,12 @@ class StretchRobot(StretchAPI):
|
||||
before_camread_t = time.perf_counter()
|
||||
images[name] = self.cameras[name].async_read()
|
||||
images[name] = torch.from_numpy(images[name])
|
||||
self.logs[f"read_camera_{name}_dt_s"] = self.cameras[name].logs["delta_timestamp_s"]
|
||||
self.logs[f"async_read_camera_{name}_dt_s"] = time.perf_counter() - before_camread_t
|
||||
self.logs[f"read_camera_{name}_dt_s"] = self.cameras[name].logs[
|
||||
"delta_timestamp_s"
|
||||
]
|
||||
self.logs[f"async_read_camera_{name}_dt_s"] = (
|
||||
time.perf_counter() - before_camread_t
|
||||
)
|
||||
|
||||
# Populate output dictionaries
|
||||
obs_dict, action_dict = {}, {}
|
||||
@@ -150,8 +158,12 @@ class StretchRobot(StretchAPI):
|
||||
before_camread_t = time.perf_counter()
|
||||
images[name] = self.cameras[name].async_read()
|
||||
images[name] = torch.from_numpy(images[name])
|
||||
self.logs[f"read_camera_{name}_dt_s"] = self.cameras[name].logs["delta_timestamp_s"]
|
||||
self.logs[f"async_read_camera_{name}_dt_s"] = time.perf_counter() - before_camread_t
|
||||
self.logs[f"read_camera_{name}_dt_s"] = self.cameras[name].logs[
|
||||
"delta_timestamp_s"
|
||||
]
|
||||
self.logs[f"async_read_camera_{name}_dt_s"] = (
|
||||
time.perf_counter() - before_camread_t
|
||||
)
|
||||
|
||||
# Populate output dictionaries
|
||||
obs_dict = {}
|
||||
|
||||
Reference in New Issue
Block a user