Record初步实现
This commit is contained in:
@@ -5,11 +5,60 @@ from lerobot.motors import Motor, MotorCalibration, MotorsBus
|
||||
#动作执行成功
|
||||
ACTION_SUCCESS = 0
|
||||
|
||||
class RoboticArmMock:
|
||||
def __init__(self,*args):
|
||||
pass
|
||||
|
||||
def rm_create_robot_arm(self, ip, port):
|
||||
print(f"connnect {ip}:{port}")
|
||||
return {"id":1}
|
||||
|
||||
def rm_movej(self,*args):
|
||||
print(f"rm_movej:{args}")
|
||||
|
||||
def rm_movej_follow(self,*args):
|
||||
print(f"rm_movej_follow:{args}")
|
||||
|
||||
def rm_get_current_arm_state(self):
|
||||
print("get_current_arm_state")
|
||||
return ACTION_SUCCESS, {"pose":[0.0,0.0,0.0,0.0,0.0,0.0],"joint":[0.0,0.0,0.0,0.0,0.0,0.0]}
|
||||
|
||||
def rm_movej_p(self,*args):
|
||||
print(f"rm_movej_p:{args}")
|
||||
|
||||
|
||||
def rm_set_joint_en_state(self,*args):
|
||||
print(f"rm_set_joint_en_state:{args}")
|
||||
|
||||
def rm_set_gripper_route(self,*args):
|
||||
print(f"rm_set_gripper_route:{args}")
|
||||
|
||||
def rm_set_gripper_position(self,*args):
|
||||
print(f"rm_set_gripper_position:{args}")
|
||||
|
||||
def rm_get_gripper_state(self):
|
||||
print("get_gripper_state")
|
||||
return ACTION_SUCCESS, {"actpos":0.0}
|
||||
|
||||
def rm_set_gripper_release(self,*args):
|
||||
print(f"rm_set_gripper_release:{args}")
|
||||
def rm_set_gripper_pick_on(self,*args):
|
||||
print(f"rm_set_gripper_pick_on:{args}")
|
||||
|
||||
def rm_movev_canfd(self,*args):
|
||||
print(f"rm_movev_canfd:{args}")
|
||||
|
||||
def rm_destory(self):
|
||||
print("destory")
|
||||
|
||||
|
||||
class RealmanMotorsBus(MotorsBus):
|
||||
model_number_table={}
|
||||
model_ctrl_table={
|
||||
"realman":{}
|
||||
}
|
||||
ip: str
|
||||
port: int
|
||||
"""
|
||||
对Realman SDK的二次封装
|
||||
"""
|
||||
@@ -19,20 +68,23 @@ class RealmanMotorsBus(MotorsBus):
|
||||
motors:dict[str, Motor],
|
||||
gripper_range:list[int],
|
||||
joint: list,
|
||||
mock: bool = False,
|
||||
calibration: dict[str, MotorCalibration] | None = None,
|
||||
):
|
||||
super().__init__(port,motors, calibration)
|
||||
self.rmarm = RoboticArm(rm_thread_mode_e.RM_TRIPLE_MODE_E)
|
||||
if mock:
|
||||
self.rmarm = RoboticArmMock()
|
||||
else:
|
||||
self.rmarm = RoboticArm(rm_thread_mode_e.RM_TRIPLE_MODE_E)
|
||||
address = port.split(':')
|
||||
self.handle = self.rmarm.rm_create_robot_arm(address[0], int(address[1]))
|
||||
self.ip = address[0]
|
||||
self.port = int(address[1])
|
||||
self.motors = motors
|
||||
self.gripper_range = gripper_range
|
||||
self.init_joint_position = joint
|
||||
self.safe_disable_position = joint
|
||||
self.rmarm.rm_movej(self.init_joint_position[:-1], 5, 0, 0, 1)
|
||||
time.sleep(3)
|
||||
ret = self.rmarm.rm_get_current_arm_state()
|
||||
self.init_pose = ret[1]['pose']
|
||||
self.handle = None
|
||||
self.init_pose = None
|
||||
|
||||
@property
|
||||
def motor_names(self) -> list[str]:
|
||||
@@ -87,7 +139,12 @@ class RealmanMotorsBus(MotorsBus):
|
||||
enable_flag = True
|
||||
break
|
||||
time.sleep(1)
|
||||
|
||||
|
||||
self.handle = self.rmarm.rm_create_robot_arm(self.ip, self.port)
|
||||
self.rmarm.rm_movej(self.init_joint_position[:-1], 5, 0, 0, 1)
|
||||
time.sleep(3)
|
||||
ret = self.rmarm.rm_get_current_arm_state()
|
||||
self.init_pose = ret[1]['pose']
|
||||
resp = enable_flag
|
||||
print(f"Returning response: {resp}")
|
||||
return resp
|
||||
@@ -120,22 +177,6 @@ class RealmanMotorsBus(MotorsBus):
|
||||
|
||||
ret = self.rmarm.rm_set_gripper_position(target_joint[-1], block=False, timeout=2)
|
||||
return ret
|
||||
|
||||
def write_endpose(self, target_endpose: list, gripper: int):
|
||||
self.rmarm.rm_movej_p(target_endpose, 50, 0, 0, 1)
|
||||
self.rmarm.rm_set_gripper_position(gripper, block=False, timeout=2)
|
||||
|
||||
def write_joint_slow(self, target_joint: list):
|
||||
self.rmarm.rm_movej(target_joint, 5, 0, 0, 0)
|
||||
|
||||
def write_joint_canfd(self, target_joint: list):
|
||||
self.rmarm.rm_movej_canfd(target_joint, False)
|
||||
|
||||
def write_endpose_canfd(self, target_pose: list):
|
||||
self.rmarm.rm_movep_canfd(target_pose, False)
|
||||
|
||||
def write_gripper(self, gripper: int):
|
||||
self.rmarm.rm_set_gripper_position(gripper, False, 2)
|
||||
|
||||
def read(self) -> Dict:
|
||||
"""
|
||||
@@ -163,6 +204,9 @@ class RealmanMotorsBus(MotorsBus):
|
||||
|
||||
def read_current_arm_endpose_state(self):
|
||||
return self.rmarm.rm_get_current_arm_state()[1]['pose']
|
||||
|
||||
def disconnect(self, disable_torque = True):
|
||||
self.safe_disconnect()
|
||||
|
||||
def safe_disconnect(self):
|
||||
"""
|
||||
@@ -178,6 +222,13 @@ class RealmanMotorsBus(MotorsBus):
|
||||
def sync_write(self,name,actionData: dict[str, Any]):
|
||||
if name =="Goal_Position":
|
||||
self.write_arm(target_joint=actionData)
|
||||
elif name == "Goal_Velocity":#速度透传模式
|
||||
self.rmarm.rm_movev_canfd(list(actionData.values())[:6])
|
||||
gripper = actionData['gripper']
|
||||
if gripper > 0 :
|
||||
self.rmarm.rm_set_gripper_release(gripper*1000, False)
|
||||
else:
|
||||
self.rmarm.rm_set_gripper_pick_on(gripper*1000, gripper*1000, False)
|
||||
|
||||
def sync_read(self, data_name, motors = None, *, normalize = True, num_retry = 0):
|
||||
if data_name == "Present_Position":
|
||||
|
||||
Reference in New Issue
Block a user