format
This commit is contained in:
@@ -1,9 +1,8 @@
|
||||
import pickle
|
||||
import time
|
||||
from dataclasses import dataclass, field, replace
|
||||
from pathlib import Path
|
||||
import time
|
||||
|
||||
import einops
|
||||
import numpy as np
|
||||
import torch
|
||||
|
||||
@@ -145,15 +144,17 @@ def reset_arm(arm: MotorsBus):
|
||||
|
||||
|
||||
def run_arm_calibration(arm: MotorsBus, name: str, arm_type: str):
|
||||
""" Example of usage:
|
||||
```python
|
||||
run_arm_calibration(arm, "left", "follower")
|
||||
```
|
||||
"""Example of usage:
|
||||
```python
|
||||
run_arm_calibration(arm, "left", "follower")
|
||||
```
|
||||
"""
|
||||
reset_arm(arm)
|
||||
|
||||
# TODO(rcadene): document what position 1 mean
|
||||
print(f"Please move the '{name} {arm_type}' arm to the horizontal position (gripper fully closed, see {URL_HORIZONTAL_POSITION[arm_type]})")
|
||||
print(
|
||||
f"Please move the '{name} {arm_type}' arm to the horizontal position (gripper fully closed, see {URL_HORIZONTAL_POSITION[arm_type]})"
|
||||
)
|
||||
input("Press Enter to continue...")
|
||||
|
||||
horizontal_homing_offset = compute_homing_offset(
|
||||
@@ -161,7 +162,9 @@ def run_arm_calibration(arm: MotorsBus, name: str, arm_type: str):
|
||||
)
|
||||
|
||||
# TODO(rcadene): document what position 2 mean
|
||||
print(f"Please move the '{name} {arm_type}' arm to the 90 degree position (gripper fully open, see {URL_90_DEGREE_POSITION[arm_type]})")
|
||||
print(
|
||||
f"Please move the '{name} {arm_type}' arm to the 90 degree position (gripper fully open, see {URL_90_DEGREE_POSITION[arm_type]})"
|
||||
)
|
||||
input("Press Enter to continue...")
|
||||
|
||||
drive_mode = compute_drive_mode(arm, horizontal_homing_offset)
|
||||
@@ -197,14 +200,11 @@ class KochRobotConfig:
|
||||
"""
|
||||
|
||||
# Define all components of the robot
|
||||
leader_arms: dict[str, MotorsBus] = field(
|
||||
default_factory=lambda: {}
|
||||
)
|
||||
follower_arms: dict[str, MotorsBus] = field(
|
||||
default_factory=lambda: {}
|
||||
)
|
||||
leader_arms: dict[str, MotorsBus] = field(default_factory=lambda: {})
|
||||
follower_arms: dict[str, MotorsBus] = field(default_factory=lambda: {})
|
||||
cameras: dict[str, Camera] = field(default_factory=lambda: {})
|
||||
|
||||
|
||||
class KochRobot:
|
||||
"""Tau Robotics: https://tau-robotics.com
|
||||
|
||||
@@ -267,7 +267,7 @@ class KochRobot:
|
||||
"macbookpro": OpenCVCamera(camera_index=0, fps=30, width=640, height=480),
|
||||
"iphone": OpenCVCamera(camera_index=1, fps=30, width=640, height=480),
|
||||
}
|
||||
|
||||
|
||||
# Assumes leader and follower arms have been instantiated already (see first example)
|
||||
robot = KochRobot(leader_arms, follower_arms, cameras)
|
||||
robot.connect()
|
||||
@@ -318,10 +318,14 @@ class KochRobot:
|
||||
|
||||
def connect(self):
|
||||
if self.is_connected:
|
||||
raise RobotDeviceAlreadyConnectedError(f"KochRobot is already connected. Do not run `robot.connect()` twice.")
|
||||
raise RobotDeviceAlreadyConnectedError(
|
||||
"KochRobot is already connected. Do not run `robot.connect()` twice."
|
||||
)
|
||||
|
||||
if not self.leader_arms and not self.follower_arms and not self.cameras:
|
||||
raise ValueError("KochRobot doesn't have any device to connect. See example of usage in docstring of the class.")
|
||||
raise ValueError(
|
||||
"KochRobot doesn't have any device to connect. See example of usage in docstring of the class."
|
||||
)
|
||||
|
||||
# Connect the arms
|
||||
for name in self.follower_arms:
|
||||
@@ -391,7 +395,9 @@ class KochRobot:
|
||||
self, record_data=False
|
||||
) -> None | tuple[dict[str, torch.Tensor], dict[str, torch.Tensor]]:
|
||||
if not self.is_connected:
|
||||
raise RobotDeviceNotConnectedError(f"KochRobot is not connected. You need to run `robot.connect()`.")
|
||||
raise RobotDeviceNotConnectedError(
|
||||
"KochRobot is not connected. You need to run `robot.connect()`."
|
||||
)
|
||||
|
||||
# Prepare to assign the positions of the leader to the follower
|
||||
leader_pos = {}
|
||||
@@ -455,7 +461,9 @@ class KochRobot:
|
||||
def capture_observation(self):
|
||||
"""The returned observations do not have a batch dimension."""
|
||||
if not self.is_connected:
|
||||
raise RobotDeviceNotConnectedError(f"KochRobot is not connected. You need to run `robot.connect()`.")
|
||||
raise RobotDeviceNotConnectedError(
|
||||
"KochRobot is not connected. You need to run `robot.connect()`."
|
||||
)
|
||||
|
||||
# Read follower position
|
||||
follower_pos = {}
|
||||
@@ -488,8 +496,10 @@ class KochRobot:
|
||||
def send_action(self, action: torch.Tensor):
|
||||
"""The provided action is expected to be a vector."""
|
||||
if not self.is_connected:
|
||||
raise RobotDeviceNotConnectedError(f"KochRobot is not connected. You need to run `robot.connect()`.")
|
||||
|
||||
raise RobotDeviceNotConnectedError(
|
||||
"KochRobot is not connected. You need to run `robot.connect()`."
|
||||
)
|
||||
|
||||
from_idx = 0
|
||||
to_idx = 0
|
||||
follower_goal_pos = {}
|
||||
@@ -504,7 +514,9 @@ class KochRobot:
|
||||
|
||||
def disconnect(self):
|
||||
if not self.is_connected:
|
||||
raise RobotDeviceNotConnectedError(f"KochRobot is not connected. You need to run `robot.connect()` before disconnecting.")
|
||||
raise RobotDeviceNotConnectedError(
|
||||
"KochRobot is not connected. You need to run `robot.connect()` before disconnecting."
|
||||
)
|
||||
|
||||
for name in self.follower_arms:
|
||||
self.follower_arms[name].disconnect()
|
||||
@@ -514,7 +526,7 @@ class KochRobot:
|
||||
|
||||
for name in self.cameras:
|
||||
self.cameras[name].disconnect()
|
||||
|
||||
|
||||
self.is_connected = False
|
||||
|
||||
def __del__(self):
|
||||
|
||||
Reference in New Issue
Block a user