""" Teleoperation Realman with a PS5 controller and """ import time import torch import numpy as np from dataclasses import dataclass, field, replace from collections import deque from lerobot.common.robot_devices.teleop.gamepad import HybridController from lerobot.common.robot_devices.motors.utils import get_motor_names, make_motors_buses_from_configs from lerobot.common.robot_devices.cameras.utils import make_cameras_from_configs from lerobot.common.robot_devices.utils import RobotDeviceAlreadyConnectedError, RobotDeviceNotConnectedError from lerobot.common.robot_devices.robots.configs import RealmanRobotConfig class RealmanRobot: def __init__(self, config: RealmanRobotConfig | None = None, **kwargs): if config is None: config = RealmanRobotConfig() # Overwrite config arguments using kwargs self.config = replace(config, **kwargs) self.robot_type = self.config.type self.inference_time = self.config.inference_time # if it is inference time # build cameras self.cameras = make_cameras_from_configs(self.config.cameras) # build realman motors self.piper_motors = make_motors_buses_from_configs(self.config.left_follower_arm) self.arm = self.piper_motors['main'] self.arm.rmarm.rm_movej(self.arm.init_joint_position[:-1], 50, 0, 0, 1) time.sleep(2) ret = self.arm.rmarm.rm_get_current_arm_state() init_pose = ret[1]['pose'] # build init teleop info self.init_info = { 'init_joint': self.arm.init_joint_position, 'init_pose': init_pose, 'max_gripper': config.max_gripper, 'min_gripper': config.min_gripper, 'servo_config_file': config.servo_config_file } # build state-action cache self.joint_queue = deque(maxlen=2) # build gamepad teleop if not self.inference_time: self.teleop = HybridController(self.init_info) else: self.teleop = None self.logs = {} self.is_connected = False @property def camera_features(self) -> dict: cam_ft = {} for cam_key, cam in self.cameras.items(): key = f"observation.images.{cam_key}" cam_ft[key] = { "shape": (cam.height, cam.width, cam.channels), "names": ["height", "width", "channels"], "info": None, } return cam_ft @property def motor_features(self) -> dict: action_names = get_motor_names(self.piper_motors) state_names = get_motor_names(self.piper_motors) return { "action": { "dtype": "float32", "shape": (len(action_names),), "names": action_names, }, "observation.state": { "dtype": "float32", "shape": (len(state_names),), "names": state_names, }, } @property def has_camera(self): return len(self.cameras) > 0 @property def num_cameras(self): return len(self.cameras) def connect(self) -> None: """Connect RealmanArm and cameras""" if self.is_connected: raise RobotDeviceAlreadyConnectedError( "RealmanArm is already connected. Do not run `robot.connect()` twice." ) # connect RealmanArm self.arm.connect(enable=True) print("RealmanArm conneted") # connect cameras for name in self.cameras: self.cameras[name].connect() self.is_connected = self.is_connected and self.cameras[name].is_connected print(f"camera {name} conneted") print("All connected") self.is_connected = True self.run_calibration() def disconnect(self) -> None: """move to home position, disenable piper and cameras""" # move piper to home position, disable if not self.inference_time: self.teleop.stop() # disconnect piper self.arm.safe_disconnect() print("RealmanArm disable after 5 seconds") time.sleep(5) self.arm.connect(enable=False) # disconnect cameras if len(self.cameras) > 0: for cam in self.cameras.values(): cam.disconnect() self.is_connected = False def run_calibration(self): """move piper to the home position""" if not self.is_connected: raise ConnectionError() self.arm.apply_calibration() if not self.inference_time: self.teleop.reset() def teleop_step( self, record_data=False ) -> None | tuple[dict[str, torch.Tensor], dict[str, torch.Tensor]]: if not self.is_connected: raise ConnectionError() if self.teleop is None and self.inference_time: self.teleop = HybridController(self.init_info) # read target pose state as before_read_t = time.perf_counter() state = self.arm.read() # read current joint position from robot action = self.teleop.get_action() # target joint position and pose end pos from gamepad self.logs["read_pos_dt_s"] = time.perf_counter() - before_read_t if action['control_mode'] == 'joint': # 关节控制模式(主模式) ret = self.arm.rmarm.rm_get_current_arm_state() current_pose = ret[1]['pose'] self.teleop.update_endpose_state(current_pose) target_joints = action['joint_angles'] # do action before_write_t = time.perf_counter() self.joint_queue.append(list(self.arm.read().values())) self.logs["write_pos_dt_s"] = time.perf_counter() - before_write_t else: # 末端位姿控制模式 target_pose = [ action['end_pose']['X'], # X (m) action['end_pose']['Y'], # Y (m) action['end_pose']['Z'], # Z (m) action['end_pose']['RX'], # RX (rad) action['end_pose']['RY'], # RY (rad) action['end_pose']['RZ'] # RZ (rad) ] # do action before_write_t = time.perf_counter() # result = self.arm.rmarm.rm_movej_p(target_pose, 100, 0, 0, 0) self.arm.rmarm.rm_movep_follow(target_pose) self.arm.rmarm.rm_set_gripper_position(action['gripper'], False, 2) ret = self.arm.rmarm.rm_get_current_arm_state() target_joints = ret[1].get('joint', self.arm.init_joint_position) self.joint_queue.append(list(self.arm.read().values())) self.teleop.update_joint_state(target_joints) self.logs["write_pos_dt_s"] = time.perf_counter() - before_write_t # print('-'*80) # print('mode: ', action['control_mode']) # print('state: ', list(state.values())) # print('action: ', target_joints) # print('cache[0]: ', self.joint_queue[0]) # print('cache[-1]: ', self.joint_queue[-1]) # print('time: ', time.perf_counter() - before_write_t) # print('-'*80) # time.sleep(1) if not record_data: return state = torch.as_tensor(list(self.joint_queue[0]), dtype=torch.float32) action = torch.as_tensor(list(self.joint_queue[-1]), dtype=torch.float32) # Capture images from cameras images = {} for name in self.cameras: before_camread_t = time.perf_counter() images[name] = self.cameras[name].async_read() images[name] = torch.from_numpy(images[name]) self.logs[f"read_camera_{name}_dt_s"] = self.cameras[name].logs["delta_timestamp_s"] self.logs[f"async_read_camera_{name}_dt_s"] = time.perf_counter() - before_camread_t # Populate output dictionnaries obs_dict, action_dict = {}, {} obs_dict["observation.state"] = state action_dict["action"] = action for name in self.cameras: obs_dict[f"observation.images.{name}"] = images[name] return obs_dict, action_dict def send_action(self, action: torch.Tensor) -> torch.Tensor: """Write the predicted actions from policy to the motors""" if not self.is_connected: raise RobotDeviceNotConnectedError( "Piper is not connected. You need to run `robot.connect()`." ) # send to motors, torch to list target_joints = action.tolist() len_joint = len(target_joints) - 1 target_joints = [target_joints[i]*180 for i in range(len_joint)] + [target_joints[-1]] target_joints[-1] = int(target_joints[-1]*500 + 500) self.arm.write(target_joints) return action def capture_observation(self) -> dict: """capture current images and joint positions""" if not self.is_connected: raise RobotDeviceNotConnectedError( "Piper is not connected. You need to run `robot.connect()`." ) # read current joint positions before_read_t = time.perf_counter() state = self.arm.read() # 6 joints + 1 gripper self.logs["read_pos_dt_s"] = time.perf_counter() - before_read_t state = torch.as_tensor(list(state.values()), dtype=torch.float32) # read images from cameras images = {} for name in self.cameras: before_camread_t = time.perf_counter() images[name] = self.cameras[name].async_read() images[name] = torch.from_numpy(images[name]) self.logs[f"read_camera_{name}_dt_s"] = self.cameras[name].logs["delta_timestamp_s"] self.logs[f"async_read_camera_{name}_dt_s"] = time.perf_counter() - before_camread_t # Populate output dictionnaries and format to pytorch obs_dict = {} obs_dict["observation.state"] = state for name in self.cameras: obs_dict[f"observation.images.{name}"] = images[name] return obs_dict def teleop_safety_stop(self): """ move to home position after record one episode """ self.run_calibration() def __del__(self): if self.is_connected: self.disconnect() if not self.inference_time: self.teleop.stop() if __name__ == '__main__': robot = RealmanRobot() robot.connect() # robot.run_calibration() while True: robot.teleop_step(record_data=True) robot.capture_observation() dummy_action = torch.Tensor([-0.40586111280653214, 0.5522833506266276, 0.4998166826036241, -0.3539944542778863, -0.524433347913954, 0.9064999898274739, 0.482]) robot.send_action(dummy_action) robot.disconnect() print('ok')