提交代码
This commit is contained in:
@@ -1,5 +1,8 @@
|
||||
type: realman
|
||||
port: 192.168.3.18:8080
|
||||
gripper_range:
|
||||
- 10
|
||||
- 990
|
||||
motors:
|
||||
joint_1:
|
||||
id: 1
|
||||
|
||||
@@ -17,6 +17,7 @@ class RealmanMotorsBus(MotorsBus):
|
||||
self,
|
||||
port: str,
|
||||
motors:dict[str, Motor],
|
||||
gripper_range:list[int],
|
||||
joint: list,
|
||||
calibration: dict[str, MotorCalibration] | None = None,
|
||||
):
|
||||
@@ -25,6 +26,7 @@ class RealmanMotorsBus(MotorsBus):
|
||||
address = port.split(':')
|
||||
self.handle = self.rmarm.rm_create_robot_arm(address[0], int(address[1]))
|
||||
self.motors = motors
|
||||
self.gripper_range = gripper_range
|
||||
self.init_joint_position = joint
|
||||
self.safe_disable_position = joint
|
||||
self.rmarm.rm_movej(self.init_joint_position[:-1], 5, 0, 0, 1)
|
||||
@@ -105,7 +107,7 @@ class RealmanMotorsBus(MotorsBus):
|
||||
"""
|
||||
移动到初始位置
|
||||
"""
|
||||
self.write(target_joint=self.init_joint_position)
|
||||
self.write_arm(target_joint=self.init_joint_position)
|
||||
|
||||
def write(self, data_name: str, motor: str, value: Value, *, normalize: bool = True, num_retry: int = 0):
|
||||
return False
|
||||
@@ -176,6 +178,10 @@ class RealmanMotorsBus(MotorsBus):
|
||||
def sync_write(self,name,actionData: dict[str, Any]):
|
||||
if name =="Goal_Position":
|
||||
self.write_arm(target_joint=actionData)
|
||||
|
||||
def sync_read(self, data_name, motors = None, *, normalize = True, num_retry = 0):
|
||||
if data_name == "Present_Position":
|
||||
return self.read()
|
||||
|
||||
def _assert_protocol_is_compatible(self, instruction_name):
|
||||
return True
|
||||
@@ -185,14 +191,7 @@ class RealmanMotorsBus(MotorsBus):
|
||||
配置电机
|
||||
"""
|
||||
def configure_motors(self, return_delay_time=0, maximum_acceleration=254, acceleration=254) -> None:
|
||||
for motor in self.motors:
|
||||
# By default, Feetech motors have a 500µs delay response time (corresponding to a value of 250 on
|
||||
# the 'Return_Delay_Time' address). We ensure this is reduced to the minimum of 2µs (value of 0).
|
||||
self.write("Return_Delay_Time", motor, return_delay_time)
|
||||
# Set 'Maximum_Acceleration' to 254 to speedup acceleration and deceleration of the motors.
|
||||
if self.protocol_version == 0:
|
||||
self.write("Maximum_Acceleration", motor, maximum_acceleration)
|
||||
self.write("Acceleration", motor, acceleration)
|
||||
self.rmarm.rm_set_gripper_route(self.gripper_range[0],self.gripper_range[1])
|
||||
|
||||
def disable_torque(self, motors: int | str | list[str] | None = None, num_retry: int = 0):
|
||||
for motor in self._get_motors_list(motors):
|
||||
@@ -207,30 +206,32 @@ class RealmanMotorsBus(MotorsBus):
|
||||
|
||||
@property
|
||||
def is_calibrated(self) -> bool:
|
||||
motors_calibration = self.read_calibration()
|
||||
if set(motors_calibration) != set(self.calibration):
|
||||
return False
|
||||
return True
|
||||
# motors_calibration = self.read_calibration()
|
||||
# if set(motors_calibration) != set(self.calibration):
|
||||
# return False
|
||||
|
||||
same_ranges = all(
|
||||
self.calibration[motor].range_min == cal.range_min
|
||||
and self.calibration[motor].range_max == cal.range_max
|
||||
for motor, cal in motors_calibration.items()
|
||||
)
|
||||
if self.protocol_version == 1:
|
||||
return same_ranges
|
||||
# same_ranges = all(
|
||||
# self.calibration[motor].range_min == cal.range_min
|
||||
# and self.calibration[motor].range_max == cal.range_max
|
||||
# for motor, cal in motors_calibration.items()
|
||||
# )
|
||||
# if self.protocol_version == 1:
|
||||
# return same_ranges
|
||||
|
||||
same_offsets = all(
|
||||
self.calibration[motor].homing_offset == cal.homing_offset
|
||||
for motor, cal in motors_calibration.items()
|
||||
)
|
||||
return same_ranges and same_offsets
|
||||
# same_offsets = all(
|
||||
# self.calibration[motor].homing_offset == cal.homing_offset
|
||||
# for motor, cal in motors_calibration.items()
|
||||
# )
|
||||
# return same_ranges and same_offsets
|
||||
|
||||
def write_calibration(self, calibration_dict, cache = True):
|
||||
for motor, calibration in calibration_dict.items():
|
||||
self.rmarm.rm_set_joint_min_pos(self, motor, calibration.range_min)
|
||||
self.rmarm.rm_set_joint_max_pos(self, motor, calibration.range_max)
|
||||
if cache:
|
||||
self.calibration = calibration_dict
|
||||
pass
|
||||
# for motor, calibration in calibration_dict.items():
|
||||
# self.rmarm.rm_set_joint_min_pos(self, motor, calibration.range_min)
|
||||
# self.rmarm.rm_set_joint_max_pos(self, motor, calibration.range_max)
|
||||
# if cache:
|
||||
# self.calibration = calibration_dict
|
||||
|
||||
def _get_half_turn_homings(self, positions):
|
||||
offsets = {}
|
||||
|
||||
@@ -17,11 +17,11 @@ class RealmanRobot(Robot):
|
||||
def __init__(self, config: RealmanRobotConfig):
|
||||
super().__init__(config)
|
||||
self.config = config
|
||||
print(config)
|
||||
self.bus = RealmanMotorsBus(
|
||||
port=self.config.port,
|
||||
motors=config.motors,
|
||||
joint=config.joint,
|
||||
gripper_range=config.gripper_range,
|
||||
calibration=self.calibration,
|
||||
)
|
||||
self.cameras = make_cameras_from_configs(config.cameras)
|
||||
@@ -56,15 +56,15 @@ class RealmanRobot(Robot):
|
||||
raise DeviceAlreadyConnectedError(f"{self} already connected")
|
||||
|
||||
self.bus.connect()
|
||||
|
||||
for cam in self.cameras.values():
|
||||
cam.connect()
|
||||
|
||||
if not self.is_calibrated and calibrate:
|
||||
logger.info(
|
||||
"Mismatch between calibration values in the motor and the calibration file or no calibration file found"
|
||||
)
|
||||
self.calibrate()
|
||||
|
||||
for cam in self.cameras.values():
|
||||
cam.connect()
|
||||
|
||||
self.configure()
|
||||
logger.info(f"{self} connected.")
|
||||
|
||||
@@ -73,69 +73,18 @@ class RealmanRobot(Robot):
|
||||
return self.bus.is_calibrated
|
||||
|
||||
def calibrate(self) -> None:
|
||||
if self.calibration:
|
||||
# Calibration file exists, ask user whether to use it or run new calibration
|
||||
user_input = input(
|
||||
f"Press ENTER to use provided calibration file associated with the id {self.id}, or type 'c' and press ENTER to run calibration: "
|
||||
)
|
||||
if user_input.strip().lower() != "c":
|
||||
logger.info(f"Writing calibration file associated with the id {self.id} to the motors")
|
||||
self.bus.write_calibration(self.calibration)
|
||||
return
|
||||
|
||||
logger.info(f"\nRunning calibration of {self}")
|
||||
self.bus.disable_torque()
|
||||
for motor in self.bus.motors:
|
||||
self.bus.write("Operating_Mode", motor, OperatingMode.POSITION.value)
|
||||
|
||||
input(f"Move {self} to the middle of its range of motion and press ENTER....")
|
||||
homing_offsets = self.bus.set_half_turn_homings()
|
||||
|
||||
full_turn_motor = "wrist_roll"
|
||||
unknown_range_motors = [motor for motor in self.bus.motors if motor != full_turn_motor]
|
||||
print(
|
||||
f"Move all joints except '{full_turn_motor}' sequentially through their "
|
||||
"entire ranges of motion.\nRecording positions. Press ENTER to stop..."
|
||||
)
|
||||
range_mins, range_maxes = self.bus.record_ranges_of_motion(unknown_range_motors)
|
||||
range_mins[full_turn_motor] = 0
|
||||
range_maxes[full_turn_motor] = 4095
|
||||
|
||||
self.calibration = {}
|
||||
for motor, m in self.bus.motors.items():
|
||||
self.calibration[motor] = MotorCalibration(
|
||||
id=m.id,
|
||||
drive_mode=0,
|
||||
homing_offset=homing_offsets[motor],
|
||||
range_min=range_mins[motor],
|
||||
range_max=range_maxes[motor],
|
||||
)
|
||||
|
||||
self.bus.write_calibration(self.calibration)
|
||||
self._save_calibration()
|
||||
print("Calibration saved to", self.calibration_fpath)
|
||||
"""move piper to the home position"""
|
||||
if not self.is_connected:
|
||||
raise ConnectionError()
|
||||
|
||||
self.bus.apply_calibration()
|
||||
|
||||
def configure(self) -> None:
|
||||
with self.bus.torque_disabled():
|
||||
self.bus.configure_motors()
|
||||
for motor in self.bus.motors:
|
||||
self.bus.write("Operating_Mode", motor, OperatingMode.POSITION.value)
|
||||
# Set P_Coefficient to lower value to avoid shakiness (Default is 32)
|
||||
self.bus.write("P_Coefficient", motor, 16)
|
||||
# Set I_Coefficient and D_Coefficient to default value 0 and 32
|
||||
self.bus.write("I_Coefficient", motor, 0)
|
||||
self.bus.write("D_Coefficient", motor, 32)
|
||||
|
||||
if motor == "gripper":
|
||||
self.bus.write("Max_Torque_Limit", motor, 500) # 50% of max torque to avoid burnout
|
||||
self.bus.write("Protection_Current", motor, 250) # 50% of max current to avoid burnout
|
||||
self.bus.write("Overload_Torque", motor, 25) # 25% torque when overloaded
|
||||
|
||||
def setup_motors(self) -> None:
|
||||
for motor in reversed(self.bus.motors):
|
||||
input(f"Connect the controller board to the '{motor}' motor only and press enter.")
|
||||
self.bus.setup_motor(motor)
|
||||
print(f"'{motor}' motor id set to {self.bus.motors[motor].id}")
|
||||
pass
|
||||
|
||||
def get_observation(self) -> dict[str, Any]:
|
||||
if not self.is_connected:
|
||||
|
||||
@@ -9,6 +9,7 @@ from lerobot.robots.config import RobotConfig
|
||||
class RealmanRobotConfig(RobotConfig):
|
||||
# Port to connect to the arm
|
||||
port: str
|
||||
gripper_range: list[int] = [10, 990]
|
||||
disable_torque_on_disconnect: bool = True
|
||||
# cameras
|
||||
cameras: dict[str, RealSenseCameraConfig] = field(default_factory=dict)
|
||||
|
||||
Reference in New Issue
Block a user