""" Teleoperation Realman with a PS5 controller and """ import time import torch import numpy as np from dataclasses import dataclass, field, replace from collections import deque from lerobot.common.robot_devices.teleop.realman_single import HybridController from lerobot.common.robot_devices.motors.utils import get_motor_names, make_motors_buses_from_configs from lerobot.common.robot_devices.cameras.utils import make_cameras_from_configs from lerobot.common.robot_devices.utils import RobotDeviceAlreadyConnectedError, RobotDeviceNotConnectedError from lerobot.common.robot_devices.robots.configs import RealmanRobotConfig class RealmanRobot: def __init__(self, config: RealmanRobotConfig | None = None, **kwargs): if config is None: config = RealmanRobotConfig() # Overwrite config arguments using kwargs self.config = replace(config, **kwargs) self.robot_type = self.config.type self.inference_time = self.config.inference_time # if it is inference time # build cameras self.cameras = make_cameras_from_configs(self.config.cameras) # build realman motors self.piper_motors = make_motors_buses_from_configs(self.config.left_follower_arm) self.arm = self.piper_motors['main'] # build init teleop info self.init_info = { 'init_joint': self.arm.init_joint_position, 'init_pose': self.arm.init_pose, 'max_gripper': config.max_gripper, 'min_gripper': config.min_gripper, 'servo_config_file': config.servo_config_file } # build state-action cache self.joint_queue = deque(maxlen=2) self.last_endpose = self.arm.init_pose # build gamepad teleop if not self.inference_time: self.teleop = HybridController(self.init_info) else: self.teleop = None self.logs = {} self.is_connected = False @property def camera_features(self) -> dict: cam_ft = {} for cam_key, cam in self.cameras.items(): key = f"observation.images.{cam_key}" cam_ft[key] = { "shape": (cam.height, cam.width, cam.channels), "names": ["height", "width", "channels"], "info": None, } return cam_ft @property def motor_features(self) -> dict: action_names = get_motor_names(self.piper_motors) state_names = get_motor_names(self.piper_motors) return { "action": { "dtype": "float32", "shape": (len(action_names),), "names": action_names, }, "observation.state": { "dtype": "float32", "shape": (len(state_names),), "names": state_names, }, } @property def has_camera(self): return len(self.cameras) > 0 @property def num_cameras(self): return len(self.cameras) def connect(self) -> None: """Connect RealmanArm and cameras""" if self.is_connected: raise RobotDeviceAlreadyConnectedError( "RealmanArm is already connected. Do not run `robot.connect()` twice." ) # connect RealmanArm self.arm.connect(enable=True) print("RealmanArm conneted") # connect cameras for name in self.cameras: self.cameras[name].connect() self.is_connected = self.is_connected and self.cameras[name].is_connected print(f"camera {name} conneted") print("All connected") self.is_connected = True self.run_calibration() def disconnect(self) -> None: """move to home position, disenable piper and cameras""" # move piper to home position, disable if not self.inference_time: self.teleop.stop() # disconnect piper self.arm.safe_disconnect() print("RealmanArm disable after 5 seconds") time.sleep(5) self.arm.connect(enable=False) # disconnect cameras if len(self.cameras) > 0: for cam in self.cameras.values(): cam.disconnect() self.is_connected = False def run_calibration(self): """move piper to the home position""" if not self.is_connected: raise ConnectionError() self.arm.apply_calibration() if not self.inference_time: self.teleop.reset() def teleop_step( self, record_data=False ) -> None | tuple[dict[str, torch.Tensor], dict[str, torch.Tensor]]: if not self.is_connected: raise ConnectionError() if self.teleop is None and self.inference_time: self.teleop = HybridController(self.init_info) # read target pose state as before_read_t = time.perf_counter() state = self.arm.read() # read current joint position from robot action = self.teleop.get_action() # target joint position and pose end pos from gamepad self.logs["read_pos_dt_s"] = time.perf_counter() - before_read_t if action['control_mode'] == 'joint': # 关节控制模式(主模式) current_pose = self.arm.read_current_arm_endpose_state() self.teleop.update_endpose_state(current_pose) target_joints = action['joint_angles'][:-1] self.arm.write_gripper(action['gripper']) print(action['gripper']) if action['master_controller_status']['infrared'] == 1: if action['master_controller_status']['button'] == 1: self.arm.write_joint_canfd(target_joints) else: self.arm.write_joint_slow(target_joints) # do action before_write_t = time.perf_counter() self.joint_queue.append(list(self.arm.read().values())) self.logs["write_pos_dt_s"] = time.perf_counter() - before_write_t else: target_pose = list(action['end_pose']) # do action before_write_t = time.perf_counter() if self.last_endpose != target_pose: self.arm.write_endpose_canfd(target_pose) self.last_endpose = target_pose self.arm.write_gripper(action['gripper']) target_joints = self.arm.read_current_arm_joint_state() self.joint_queue.append(list(self.arm.read().values())) self.teleop.update_joint_state(target_joints) self.logs["write_pos_dt_s"] = time.perf_counter() - before_write_t if not record_data: return state = torch.as_tensor(list(self.joint_queue[0]), dtype=torch.float32) action = torch.as_tensor(list(self.joint_queue[-1]), dtype=torch.float32) # Capture images from cameras images = {} for name in self.cameras: before_camread_t = time.perf_counter() images[name] = self.cameras[name].async_read() images[name] = torch.from_numpy(images[name]) self.logs[f"read_camera_{name}_dt_s"] = self.cameras[name].logs["delta_timestamp_s"] self.logs[f"async_read_camera_{name}_dt_s"] = time.perf_counter() - before_camread_t # Populate output dictionnaries obs_dict, action_dict = {}, {} obs_dict["observation.state"] = state action_dict["action"] = action for name in self.cameras: obs_dict[f"observation.images.{name}"] = images[name] return obs_dict, action_dict def send_action(self, action: torch.Tensor) -> torch.Tensor: """Write the predicted actions from policy to the motors""" if not self.is_connected: raise RobotDeviceNotConnectedError( "Piper is not connected. You need to run `robot.connect()`." ) # send to motors, torch to list target_joints = action.tolist() len_joint = len(target_joints) - 1 target_joints = [target_joints[i]*180 for i in range(len_joint)] + [target_joints[-1]] target_joints[-1] = int(target_joints[-1]*500 + 500) self.arm.write(target_joints) return action def capture_observation(self) -> dict: """capture current images and joint positions""" if not self.is_connected: raise RobotDeviceNotConnectedError( "Piper is not connected. You need to run `robot.connect()`." ) # read current joint positions before_read_t = time.perf_counter() state = self.arm.read() # 6 joints + 1 gripper self.logs["read_pos_dt_s"] = time.perf_counter() - before_read_t state = torch.as_tensor(list(state.values()), dtype=torch.float32) # read images from cameras images = {} for name in self.cameras: before_camread_t = time.perf_counter() images[name] = self.cameras[name].async_read() images[name] = torch.from_numpy(images[name]) self.logs[f"read_camera_{name}_dt_s"] = self.cameras[name].logs["delta_timestamp_s"] self.logs[f"async_read_camera_{name}_dt_s"] = time.perf_counter() - before_camread_t # Populate output dictionnaries and format to pytorch obs_dict = {} obs_dict["observation.state"] = state for name in self.cameras: obs_dict[f"observation.images.{name}"] = images[name] return obs_dict def teleop_safety_stop(self): """ move to home position after record one episode """ self.run_calibration() def __del__(self): if self.is_connected: self.disconnect() if not self.inference_time: self.teleop.stop() if __name__ == '__main__': robot = RealmanRobot() robot.connect() # robot.run_calibration() while True: robot.teleop_step(record_data=True) robot.capture_observation() dummy_action = torch.Tensor([-0.40586111280653214, 0.5522833506266276, 0.4998166826036241, -0.3539944542778863, -0.524433347913954, 0.9064999898274739, 0.482]) robot.send_action(dummy_action) robot.disconnect() print('ok')