forked from tangger/lerobot
fix teleop
This commit is contained in:
192
examples/test_torque/hopejr.py
Normal file
192
examples/test_torque/hopejr.py
Normal file
@@ -0,0 +1,192 @@
|
||||
import threading
|
||||
import time
|
||||
from typing import Callable
|
||||
|
||||
import cv2
|
||||
import numpy as np
|
||||
import serial
|
||||
|
||||
from lerobot.common.robot_devices.motors.feetech import (
|
||||
CalibrationMode,
|
||||
FeetechMotorsBus,
|
||||
)
|
||||
|
||||
LOWER_BOUND_LINEAR = -100
|
||||
UPPER_BOUND_LINEAR = 200
|
||||
|
||||
ESCAPE_KEY_ID = 27
|
||||
|
||||
|
||||
class HopeJuniorRobot:
|
||||
def __init__(self):
|
||||
self.arm_bus = FeetechMotorsBus(
|
||||
port="/dev/ttyACM1",
|
||||
motors={
|
||||
# "motor1": (2, "sts3250"),
|
||||
# "motor2": (1, "scs0009"),
|
||||
#"shoulder_pitch": [1, "sts3250"],
|
||||
#"shoulder_yaw": [2, "sts3215"], # TODO: sts3250
|
||||
#"shoulder_roll": [3, "sts3215"], # TODO: sts3250
|
||||
#"elbow_flex": [4, "sts3250"],
|
||||
#"wrist_roll": [5, "sts3215"],
|
||||
#"wrist_yaw": [6, "sts3215"],
|
||||
"wrist_pitch": [7, "sts3215"],
|
||||
},
|
||||
protocol_version=0,
|
||||
)
|
||||
self.hand_bus = FeetechMotorsBus(
|
||||
port="/dev/ttyACM1",
|
||||
motors={
|
||||
"thumb_basel_rotation": [30, "scs0009"],
|
||||
"thumb_flexor": [27, "scs0009"],
|
||||
"thumb_pinky_side": [26, "scs0009"],
|
||||
"thumb_thumb_side": [28, "scs0009"],
|
||||
"index_flexor": [25, "scs0009"],
|
||||
"index_pinky_side": [31, "scs0009"],
|
||||
"index_thumb_side": [32, "scs0009"],
|
||||
"middle_flexor": [24, "scs0009"],
|
||||
"middle_pinky_side": [33, "scs0009"],
|
||||
"middle_thumb_side": [34, "scs0009"],
|
||||
"ring_flexor": [21, "scs0009"],
|
||||
"ring_pinky_side": [36, "scs0009"],
|
||||
"ring_thumb_side": [35, "scs0009"],
|
||||
"pinky_flexor": [23, "scs0009"],
|
||||
"pinky_pinky_side": [38, "scs0009"],
|
||||
"pinky_thumb_side": [37, "scs0009"],
|
||||
},
|
||||
protocol_version=1,
|
||||
group_sync_read=False,
|
||||
)
|
||||
|
||||
def get_hand_calibration(self):
|
||||
"""
|
||||
Returns a dictionary containing calibration settings for each motor
|
||||
on the hand bus.
|
||||
"""
|
||||
homing_offset = [0] * len(self.hand_bus.motor_names)
|
||||
drive_mode = [0] * len(self.hand_bus.motor_names)
|
||||
|
||||
start_pos = [
|
||||
500, 900, 0, 1000, 100, 250, 750, 100, 400, 150, 100, 120, 980, 100, 950, 750,
|
||||
]
|
||||
|
||||
end_pos = [
|
||||
start_pos[0] - 400, # 500 - 400 = 100
|
||||
start_pos[1] - 300, # 900 - 300 = 600
|
||||
start_pos[2] + 550, # 0 + 550 = 550
|
||||
start_pos[3] - 550, # 1000 - 550 = 450
|
||||
start_pos[4] + 900, # 100 + 900 = 1000
|
||||
start_pos[5] + 500, # 250 + 500 = 750
|
||||
start_pos[6] - 500, # 750 - 500 = 250
|
||||
start_pos[7] + 900, # 100 + 900 = 1000
|
||||
start_pos[8] + 700, # 400 + 700 = 1100
|
||||
start_pos[9] + 700, # 150 + 700 = 850
|
||||
start_pos[10] + 900, # 100 + 900 = 1000
|
||||
start_pos[11] + 700, # 120 + 700 = 820
|
||||
start_pos[12] - 700, # 980 - 700 = 280
|
||||
start_pos[13] + 900, # 100 + 900 = 1000
|
||||
start_pos[14] - 700, # 950 - 700 = 250
|
||||
start_pos[15] - 700, # 750 - 700 = 50
|
||||
]
|
||||
|
||||
calib_modes = [CalibrationMode.LINEAR.name] * len(self.hand_bus.motor_names)
|
||||
|
||||
calib_dict = {
|
||||
"homing_offset": homing_offset,
|
||||
"drive_mode": drive_mode,
|
||||
"start_pos": start_pos,
|
||||
"end_pos": end_pos,
|
||||
"calib_mode": calib_modes,
|
||||
"motor_names": self.hand_bus.motor_names,
|
||||
}
|
||||
return calib_dict
|
||||
|
||||
def get_arm_calibration(self):
|
||||
"""
|
||||
Returns a dictionary containing calibration settings for each motor
|
||||
on the arm bus.
|
||||
"""
|
||||
homing_offset = [0] * len(self.arm_bus.motor_names)
|
||||
drive_mode = [0] * len(self.arm_bus.motor_names)
|
||||
|
||||
# Example placeholders
|
||||
start_pos = [
|
||||
600, # shoulder_up
|
||||
1500, # shoulder_forward
|
||||
1300, # shoulder_yaw
|
||||
1000, # bend_elbow
|
||||
1600, # wrist_roll
|
||||
1700, # wrist_yaw
|
||||
600, # wrist_pitch
|
||||
]
|
||||
|
||||
end_pos = [
|
||||
2300, # shoulder_up
|
||||
2300, # shoulder_forward
|
||||
2800, # shoulder_yaw
|
||||
2500, # bend_elbow
|
||||
2800, # wrist_roll
|
||||
2200, # wrist_yaw
|
||||
1700, # wrist_pitch
|
||||
]
|
||||
|
||||
calib_modes = [CalibrationMode.LINEAR.name] * len(self.arm_bus.motor_names)
|
||||
|
||||
calib_dict = {
|
||||
"homing_offset": homing_offset,
|
||||
"drive_mode": drive_mode,
|
||||
"start_pos": start_pos,
|
||||
"end_pos": end_pos,
|
||||
"calib_mode": calib_modes,
|
||||
"motor_names": self.arm_bus.motor_names,
|
||||
}
|
||||
return calib_dict
|
||||
|
||||
def connect(self):
|
||||
"""Connect to the Feetech buses."""
|
||||
self.arm_bus.connect()
|
||||
# self.hand_bus.connect()
|
||||
|
||||
|
||||
def capture_and_display_processed_frames(
|
||||
frame_processor: Callable[[np.ndarray], np.ndarray],
|
||||
window_display_name: str,
|
||||
cap_device: int = 0,
|
||||
) -> None:
|
||||
"""
|
||||
Capture frames from the given input camera device, run them through
|
||||
the frame processor, and display the outputs in a window with the given name.
|
||||
|
||||
User should press Esc to exit.
|
||||
|
||||
Inputs:
|
||||
frame_processor: Callable[[np.ndarray], np.ndarray]
|
||||
Processes frames.
|
||||
Input and output are numpy arrays of shape (H W C) with BGR channel layout and dtype uint8 / byte.
|
||||
window_display_name: str
|
||||
Name of the window used to display frames.
|
||||
cap_device: int
|
||||
Identifier for the camera to use to capture frames.
|
||||
"""
|
||||
cv2.namedWindow(window_display_name)
|
||||
capture = cv2.VideoCapture(cap_device)
|
||||
if not capture.isOpened():
|
||||
raise ValueError("Unable to open video capture.")
|
||||
|
||||
frame_count = 0
|
||||
has_frame, frame = capture.read()
|
||||
while has_frame:
|
||||
frame_count = frame_count + 1
|
||||
# Mirror frame horizontally and flip color for demonstration
|
||||
frame = np.ascontiguousarray(frame[:, ::-1, ::-1])
|
||||
|
||||
# process & show frame
|
||||
processed_frame = frame_processor(frame)
|
||||
cv2.imshow(window_display_name, processed_frame[:, :, ::-1])
|
||||
|
||||
has_frame, frame = capture.read()
|
||||
key = cv2.waitKey(1)
|
||||
if key == ESCAPE_KEY_ID:
|
||||
break
|
||||
|
||||
capture.release()
|
||||
Reference in New Issue
Block a user