Compare commits
4 Commits
main
...
user/rcade
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f996a13f70 | ||
|
|
743ebfa7c1 | ||
|
|
2c45660d77 | ||
|
|
9dd4414c6e |
681
examples/test.py
Normal file
681
examples/test.py
Normal file
@@ -0,0 +1,681 @@
|
||||
import threading
|
||||
import time
|
||||
from typing import Callable
|
||||
|
||||
import cv2
|
||||
import numpy as np
|
||||
|
||||
# from qai_hub_models.models.mediapipe_hand.app import MediaPipeHandApp
|
||||
# from qai_hub_models.models.mediapipe_hand.model import (
|
||||
# MediaPipeHand,
|
||||
# )
|
||||
# from qai_hub_models.utils.image_processing import (
|
||||
# app_to_net_image_inputs,
|
||||
# )
|
||||
from lerobot.common.robot_devices.motors.feetech import (
|
||||
CalibrationMode,
|
||||
FeetechMotorsBus,
|
||||
)
|
||||
|
||||
LOWER_BOUND_LINEAR = -100
|
||||
UPPER_BOUND_LINEAR = 200
|
||||
|
||||
import serial
|
||||
|
||||
|
||||
class HomonculusGlove:
|
||||
def __init__(self):
|
||||
self.serial_port = "/dev/tty.usbmodem21401"
|
||||
self.baud_rate = 115200
|
||||
self.serial = serial.Serial(self.serial_port, self.baud_rate, timeout=1)
|
||||
self.thread = threading.Thread(target=self.async_read)
|
||||
self.thread.start()
|
||||
self.last_d = {
|
||||
"thumb_0": 100,
|
||||
"thumb_1": 100,
|
||||
"thumb_2": 100,
|
||||
"thumb_3": 100,
|
||||
"index_0": 100,
|
||||
"index_1": 100,
|
||||
"index_2": 100,
|
||||
"middle_0": 100,
|
||||
"middle_1": 100,
|
||||
"middle_2": 100,
|
||||
"ring_0": 100,
|
||||
"ring_1": 100,
|
||||
"ring_2": 100,
|
||||
"pinky_0": 100,
|
||||
"pinky_1": 100,
|
||||
"pinky_2": 100,
|
||||
"battery_voltage": 100,
|
||||
}
|
||||
self.calibration = None
|
||||
|
||||
@property
|
||||
def joint_names(self):
|
||||
return list(self.last_d.keys())
|
||||
|
||||
def read(self, motor_names: list[str] | None = None):
|
||||
if motor_names is None:
|
||||
motor_names = self.joint_names
|
||||
|
||||
values = np.array([self.last_d[k] for k in motor_names])
|
||||
|
||||
print(motor_names)
|
||||
print(values)
|
||||
|
||||
if self.calibration is not None:
|
||||
values = self.apply_calibration(values, motor_names)
|
||||
print(values)
|
||||
return values
|
||||
|
||||
def async_read(self):
|
||||
while True:
|
||||
if self.serial.in_waiting > 0:
|
||||
self.serial.flush()
|
||||
vals = self.serial.readline().decode("utf-8").strip()
|
||||
vals = vals.split(" ")
|
||||
if len(vals) != 17:
|
||||
continue
|
||||
vals = [int(val) for val in vals]
|
||||
|
||||
d = {
|
||||
"thumb_0": vals[0],
|
||||
"thumb_1": vals[1],
|
||||
"thumb_2": vals[2],
|
||||
"thumb_3": vals[3],
|
||||
"index_0": vals[4],
|
||||
"index_1": vals[5],
|
||||
"index_2": vals[6],
|
||||
"middle_0": vals[7],
|
||||
"middle_1": vals[8],
|
||||
"middle_2": vals[9],
|
||||
"ring_0": vals[10],
|
||||
"ring_1": vals[11],
|
||||
"ring_2": vals[12],
|
||||
"pinky_0": vals[13],
|
||||
"pinky_1": vals[14],
|
||||
"pinky_2": vals[15],
|
||||
"battery_voltage": vals[16],
|
||||
}
|
||||
self.last_d = d
|
||||
# print(d.values())
|
||||
|
||||
def run_calibration(self):
|
||||
print("\nMove arm to open position")
|
||||
input("Press Enter to continue...")
|
||||
open_pos_list = []
|
||||
for _ in range(300):
|
||||
open_pos = self.read()
|
||||
open_pos_list.append(open_pos)
|
||||
time.sleep(0.01)
|
||||
open_pos = np.array(open_pos_list)
|
||||
max_open_pos = open_pos.max(axis=0)
|
||||
min_open_pos = open_pos.min(axis=0)
|
||||
|
||||
print(f"{max_open_pos=}")
|
||||
print(f"{min_open_pos=}")
|
||||
|
||||
print("\nMove arm to closed position")
|
||||
input("Press Enter to continue...")
|
||||
closed_pos_list = []
|
||||
for _ in range(300):
|
||||
closed_pos = self.read()
|
||||
closed_pos_list.append(closed_pos)
|
||||
time.sleep(0.01)
|
||||
closed_pos = np.array(closed_pos_list)
|
||||
max_closed_pos = closed_pos.max(axis=0)
|
||||
closed_pos[closed_pos < 1000] = 60000
|
||||
min_closed_pos = closed_pos.min(axis=0)
|
||||
|
||||
print(f"{max_closed_pos=}")
|
||||
print(f"{min_closed_pos=}")
|
||||
|
||||
open_pos = np.array([max_open_pos, max_closed_pos]).max(axis=0)
|
||||
closed_pos = np.array([min_open_pos, min_closed_pos]).min(axis=0)
|
||||
|
||||
# INVERTION
|
||||
# INVERTION
|
||||
# INVERTION
|
||||
# INVERTION
|
||||
# INVERTION
|
||||
# INVERTION
|
||||
# INVERTION
|
||||
for i, jname in enumerate(self.joint_names):
|
||||
if jname in ["thumb_0", "thumb_3", "index_2", "middle_2", "ring_2", "pinky_0", "pinky_2"]:
|
||||
tmp_pos = open_pos[i]
|
||||
open_pos[i] = closed_pos[i]
|
||||
closed_pos[i] = tmp_pos
|
||||
|
||||
print()
|
||||
print(f"{open_pos=}")
|
||||
print(f"{closed_pos=}")
|
||||
|
||||
homing_offset = [0] * len(self.joint_names)
|
||||
drive_mode = [0] * len(self.joint_names)
|
||||
calib_modes = [CalibrationMode.LINEAR.name] * len(self.joint_names)
|
||||
|
||||
calib_dict = {
|
||||
"homing_offset": homing_offset,
|
||||
"drive_mode": drive_mode,
|
||||
"start_pos": open_pos,
|
||||
"end_pos": closed_pos,
|
||||
"calib_mode": calib_modes,
|
||||
"motor_names": self.joint_names,
|
||||
}
|
||||
# return calib_dict
|
||||
self.set_calibration(calib_dict)
|
||||
|
||||
def set_calibration(self, calibration: dict[str, list]):
|
||||
self.calibration = calibration
|
||||
|
||||
def apply_calibration(self, values: np.ndarray | list, motor_names: list[str] | None):
|
||||
"""Convert from unsigned int32 joint position range [0, 2**32[ to the universal float32 nominal degree range ]-180.0, 180.0[ with
|
||||
a "zero position" at 0 degree.
|
||||
|
||||
Note: We say "nominal degree range" since the motors can take values outside this range. For instance, 190 degrees, if the motor
|
||||
rotate more than a half a turn from the zero position. However, most motors can't rotate more than 180 degrees and will stay in this range.
|
||||
|
||||
Joints values are original in [0, 2**32[ (unsigned int32). Each motor are expected to complete a full rotation
|
||||
when given a goal position that is + or - their resolution. For instance, feetech xl330-m077 have a resolution of 4096, and
|
||||
at any position in their original range, let's say the position 56734, they complete a full rotation clockwise by moving to 60830,
|
||||
or anticlockwise by moving to 52638. The position in the original range is arbitrary and might change a lot between each motor.
|
||||
To harmonize between motors of the same model, different robots, or even models of different brands, we propose to work
|
||||
in the centered nominal degree range ]-180, 180[.
|
||||
"""
|
||||
if motor_names is None:
|
||||
motor_names = self.motor_names
|
||||
|
||||
# Convert from unsigned int32 original range [0, 2**32] to signed float32 range
|
||||
values = values.astype(np.float32)
|
||||
|
||||
for i, name in enumerate(motor_names):
|
||||
calib_idx = self.calibration["motor_names"].index(name)
|
||||
calib_mode = self.calibration["calib_mode"][calib_idx]
|
||||
|
||||
if CalibrationMode[calib_mode] == CalibrationMode.LINEAR:
|
||||
start_pos = self.calibration["start_pos"][calib_idx]
|
||||
end_pos = self.calibration["end_pos"][calib_idx]
|
||||
|
||||
# Rescale the present position to a nominal range [0, 100] %,
|
||||
# useful for joints with linear motions like Aloha gripper
|
||||
values[i] = (values[i] - start_pos) / (end_pos - start_pos) * 100
|
||||
|
||||
if (values[i] < LOWER_BOUND_LINEAR) or (values[i] > UPPER_BOUND_LINEAR):
|
||||
if name == "pinky_1" and (values[i] < LOWER_BOUND_LINEAR):
|
||||
values[i] = end_pos
|
||||
else:
|
||||
msg = (
|
||||
f"Wrong motor position range detected for {name}. "
|
||||
f"Expected to be in nominal range of [0, 100] % (a full linear translation), "
|
||||
f"with a maximum range of [{LOWER_BOUND_LINEAR}, {UPPER_BOUND_LINEAR}] % to account for some imprecision during calibration, "
|
||||
f"but present value is {values[i]} %. "
|
||||
"This might be due to a cable connection issue creating an artificial jump in motor values. "
|
||||
"You need to recalibrate by running: `python lerobot/scripts/control_robot.py calibrate`"
|
||||
)
|
||||
print(msg)
|
||||
# raise JointOutOfRangeError(msg)
|
||||
|
||||
return values
|
||||
|
||||
# def revert_calibration(self, values: np.ndarray | list, motor_names: list[str] | None):
|
||||
# """Inverse of `apply_calibration`."""
|
||||
# if motor_names is None:
|
||||
# motor_names = self.motor_names
|
||||
|
||||
# for i, name in enumerate(motor_names):
|
||||
# calib_idx = self.calibration["motor_names"].index(name)
|
||||
# calib_mode = self.calibration["calib_mode"][calib_idx]
|
||||
|
||||
# if CalibrationMode[calib_mode] == CalibrationMode.LINEAR:
|
||||
# start_pos = self.calibration["start_pos"][calib_idx]
|
||||
# end_pos = self.calibration["end_pos"][calib_idx]
|
||||
|
||||
# # Convert from nominal lnear range of [0, 100] % to
|
||||
# # actual motor range of values which can be arbitrary.
|
||||
# values[i] = values[i] / 100 * (end_pos - start_pos) + start_pos
|
||||
|
||||
# values = np.round(values).astype(np.int32)
|
||||
# return values
|
||||
|
||||
|
||||
class HopeJuniorRobot:
|
||||
def __init__(self):
|
||||
self.arm_bus = FeetechMotorsBus(
|
||||
port="/dev/tty.usbmodem58760429571",
|
||||
motors={
|
||||
# "motor1": (2, "sts3250"),
|
||||
# "motor2": (1, "scs0009"),
|
||||
"shoulder_pitch": [1, "sts3250"],
|
||||
"shoulder_yaw": [2, "sts3215"], # TODO: sts3250
|
||||
"shoulder_roll": [3, "sts3215"], # TODO: sts3250
|
||||
"elbow_flex": [4, "sts3250"],
|
||||
"wrist_roll": [5, "sts3215"],
|
||||
"wrist_yaw": [6, "sts3215"],
|
||||
"wrist_pitch": [7, "sts3215"],
|
||||
},
|
||||
protocol_version=0,
|
||||
)
|
||||
self.hand_bus = FeetechMotorsBus(
|
||||
port="/dev/tty.usbmodem585A0077581",
|
||||
motors={
|
||||
"thumb_basel_rotation": [30, "scs0009"],
|
||||
"thumb_flexor": [27, "scs0009"],
|
||||
"thumb_pinky_side": [26, "scs0009"],
|
||||
"thumb_thumb_side": [28, "scs0009"],
|
||||
"index_flexor": [25, "scs0009"],
|
||||
"index_pinky_side": [31, "scs0009"],
|
||||
"index_thumb_side": [32, "scs0009"],
|
||||
"middle_flexor": [24, "scs0009"],
|
||||
"middle_pinky_side": [33, "scs0009"],
|
||||
"middle_thumb_side": [34, "scs0009"],
|
||||
"ring_flexor": [21, "scs0009"],
|
||||
"ring_pinky_side": [36, "scs0009"],
|
||||
"ring_thumb_side": [35, "scs0009"],
|
||||
"pinky_flexor": [23, "scs0009"],
|
||||
"pinky_pinky_side": [38, "scs0009"],
|
||||
"pinky_thumb_side": [37, "scs0009"],
|
||||
},
|
||||
protocol_version=1,
|
||||
group_sync_read=False,
|
||||
)
|
||||
|
||||
def get_hand_calibration(self):
|
||||
homing_offset = [0] * len(self.hand_bus.motor_names)
|
||||
drive_mode = [0] * len(self.hand_bus.motor_names)
|
||||
|
||||
start_pos = [
|
||||
500,
|
||||
900,
|
||||
1000,
|
||||
0,
|
||||
100,
|
||||
250,
|
||||
750,
|
||||
100,
|
||||
400,
|
||||
150,
|
||||
100,
|
||||
120,
|
||||
980,
|
||||
100,
|
||||
950,
|
||||
750,
|
||||
]
|
||||
|
||||
end_pos = [
|
||||
500 - 250,
|
||||
900 - 300,
|
||||
1000 - 550,
|
||||
0 + 550,
|
||||
1000,
|
||||
250 + 700,
|
||||
750 - 700,
|
||||
1000,
|
||||
400 + 700,
|
||||
150 + 700,
|
||||
1000,
|
||||
120 + 700,
|
||||
980 - 700,
|
||||
1000,
|
||||
950 - 700,
|
||||
750 - 700,
|
||||
]
|
||||
|
||||
calib_modes = [CalibrationMode.LINEAR.name] * len(self.hand_bus.motor_names)
|
||||
|
||||
calib_dict = {
|
||||
"homing_offset": homing_offset,
|
||||
"drive_mode": drive_mode,
|
||||
"start_pos": start_pos,
|
||||
"end_pos": end_pos,
|
||||
"calib_mode": calib_modes,
|
||||
"motor_names": self.hand_bus.motor_names,
|
||||
}
|
||||
return calib_dict
|
||||
|
||||
def connect(self):
|
||||
self.arm_bus.connect()
|
||||
self.hand_bus.connect()
|
||||
|
||||
|
||||
ESCAPE_KEY_ID = 27
|
||||
|
||||
|
||||
def capture_and_display_processed_frames(
|
||||
frame_processor: Callable[[np.ndarray], np.ndarray],
|
||||
window_display_name: str,
|
||||
cap_device: int = 0,
|
||||
) -> None:
|
||||
"""
|
||||
Capture frames from the given input camera device, run them through
|
||||
the frame processor, and display the outputs in a window with the given name.
|
||||
|
||||
User should press Esc to exit.
|
||||
|
||||
Inputs:
|
||||
frame_processor: Callable[[np.ndarray], np.ndarray]
|
||||
Processes frames.
|
||||
Input and output are numpy arrays of shape (H W C) with BGR channel layout and dtype uint8 / byte.
|
||||
window_display_name: str
|
||||
Name of the window used to display frames.
|
||||
cap_device: int
|
||||
Identifier for the camera to use to capture frames.
|
||||
"""
|
||||
cv2.namedWindow(window_display_name)
|
||||
capture = cv2.VideoCapture(cap_device)
|
||||
if not capture.isOpened():
|
||||
raise ValueError("Unable to open video capture.")
|
||||
|
||||
frame_count = 0
|
||||
has_frame, frame = capture.read()
|
||||
while has_frame:
|
||||
assert isinstance(frame, np.ndarray)
|
||||
|
||||
frame_count = frame_count + 1
|
||||
# mirror frame
|
||||
frame = np.ascontiguousarray(frame[:, ::-1, ::-1])
|
||||
|
||||
# process & show frame
|
||||
processed_frame = frame_processor(frame)
|
||||
cv2.imshow(window_display_name, processed_frame[:, :, ::-1])
|
||||
|
||||
has_frame, frame = capture.read()
|
||||
key = cv2.waitKey(1)
|
||||
if key == ESCAPE_KEY_ID:
|
||||
break
|
||||
|
||||
capture.release()
|
||||
|
||||
|
||||
def main():
|
||||
robot = HopeJuniorRobot()
|
||||
robot.connect()
|
||||
|
||||
# robot.hand_bus.calibration = None
|
||||
|
||||
# breakpoint()
|
||||
# print(robot.arm_bus.read("Present_Position"))
|
||||
robot.arm_bus.write("Torque_Enable", 1)
|
||||
robot.arm_bus.write("Acceleration", 20)
|
||||
robot.arm_bus.read("Acceleration")
|
||||
|
||||
calibration = robot.get_hand_calibration()
|
||||
robot.hand_bus.write("Goal_Position", calibration["start_pos"])
|
||||
# robot.hand_bus.write("Goal_Position", calibration["end_pos"][:4], robot.hand_bus.motor_names[:4])
|
||||
robot.hand_bus.set_calibration(calibration)
|
||||
lol = 1
|
||||
|
||||
# # print(motors_bus.write("Goal_Position", 500))
|
||||
# print(robot.hand_bus.read("Present_Position"))
|
||||
# # pos = hand_bus.read("Present_Position")
|
||||
# # hand_bus.write("Goal_Position", pos[0]+20, hand_bus.motor_names[0])
|
||||
# # hand_bus.write("Goal_Position", pos[i]+delta, hand_bus.motor_names[i])
|
||||
# robot.hand_bus.read("Acceleration")
|
||||
# robot.hand_bus.write("Acceleration", 10)
|
||||
|
||||
# sleep = 1
|
||||
# # robot.hand_bus.write(
|
||||
# # "Goal_Position", [glove.last_d['index_2']-1500,300,300], ["index_pinky_side", "index_flexor", "index_thumb_side"]
|
||||
# # )
|
||||
# #time.sleep(sleep)
|
||||
# time.sleep(sleep)
|
||||
# robot.hand_bus.write(
|
||||
# "Goal_Position", [100, 100, 100], ["index_flexor", "index_pinky_side", "index_thumb_side"]
|
||||
# )
|
||||
# time.sleep(sleep)
|
||||
# robot.hand_bus.write(
|
||||
# "Goal_Position", [100, 0, 0], ["middle_flexor", "middle_pinky_side", "middle_thumb_side"]
|
||||
# )
|
||||
# time.sleep(sleep)
|
||||
# robot.hand_bus.write(
|
||||
# "Goal_Position", [200, 200, 0], ["ring_flexor", "ring_pinky_side", "ring_thumb_side"]
|
||||
# )
|
||||
# time.sleep(sleep)
|
||||
# robot.hand_bus.write(
|
||||
# "Goal_Position", [200, 100, 600], ["pinky_flexor", "pinky_pinky_side", "pinky_thumb_side"]
|
||||
# )
|
||||
# time.sleep(sleep)
|
||||
|
||||
# breakpoint()
|
||||
|
||||
glove = HomonculusGlove()
|
||||
glove.run_calibration()
|
||||
# while True:
|
||||
# joint_names = ["index_1", "index_2"]
|
||||
# joint_values = glove.read(joint_names)
|
||||
# print(joint_values)
|
||||
|
||||
input()
|
||||
while True:
|
||||
joint_names = []
|
||||
joint_names += ["thumb_0", "thumb_2", "thumb_3"]
|
||||
joint_names += ["index_1", "index_2"]
|
||||
joint_names += ["middle_1", "middle_2"]
|
||||
joint_names += ["ring_1", "ring_2"]
|
||||
joint_names += ["pinky_1", "pinky_2"]
|
||||
joint_values = glove.read(joint_names)
|
||||
joint_values = joint_values.round().astype(int)
|
||||
joint_dict = {k: v for k, v in zip(joint_names, joint_values, strict=False)}
|
||||
|
||||
motor_values = []
|
||||
motor_names = []
|
||||
motor_names += ["thumb_basel_rotation", "thumb_flexor", "thumb_pinky_side", "thumb_thumb_side"]
|
||||
motor_values += [
|
||||
joint_dict["thumb_3"],
|
||||
joint_dict["thumb_0"],
|
||||
joint_dict["thumb_2"],
|
||||
joint_dict["thumb_2"],
|
||||
]
|
||||
motor_names += ["index_flexor", "index_pinky_side", "index_thumb_side"]
|
||||
motor_values += [joint_dict["index_2"], joint_dict["index_1"], joint_dict["index_1"]]
|
||||
motor_names += ["middle_flexor", "middle_pinky_side", "middle_thumb_side"]
|
||||
motor_values += [joint_dict["middle_2"], joint_dict["middle_1"], joint_dict["middle_1"]]
|
||||
motor_names += ["ring_flexor", "ring_pinky_side", "ring_thumb_side"]
|
||||
motor_values += [joint_dict["ring_2"], joint_dict["ring_1"], joint_dict["ring_1"]]
|
||||
motor_names += ["pinky_flexor", "pinky_pinky_side", "pinky_thumb_side"]
|
||||
|
||||
motor_values += [joint_dict["pinky_2"], joint_dict["pinky_1"], joint_dict["pinky_1"]]
|
||||
|
||||
motor_values = np.array(motor_values)
|
||||
motor_values = np.clip(motor_values, 0, 100)
|
||||
|
||||
robot.hand_bus.write("Goal_Position", motor_values, motor_names)
|
||||
time.sleep(0.02)
|
||||
|
||||
while True:
|
||||
# print(glove.read()['index_2']-1500)
|
||||
glove_index_flexor = glove.read()["index_2"] - 1500
|
||||
glove_index_subflexor = glove.read()["index_1"] - 1500
|
||||
glove_index_side = glove.read()["index_0"] - 2100
|
||||
|
||||
vals = [glove_index_flexor, 1000 - (glove_index_subflexor), glove_index_subflexor]
|
||||
|
||||
keys = ["index_flexor", "index_pinky_side", "index_thumb_side"]
|
||||
|
||||
glove_middle_flexor = glove.read()["middle_2"] - 1500
|
||||
glove_middle_subflexor = 1000 - (glove.read()["middle_1"] - 1700)
|
||||
vals += [glove_middle_flexor, glove_middle_subflexor, glove_middle_subflexor - 200]
|
||||
keys += ["middle_flexor", "middle_pinky_side", "middle_thumb_side"]
|
||||
|
||||
glove_ring_flexor = glove.read()["ring_2"] - 1300
|
||||
print(glove_ring_flexor)
|
||||
glove_ring_subflexor = glove.read()["ring_1"] - 1100
|
||||
|
||||
vals += [glove_ring_flexor, 1000 - glove_ring_subflexor, glove_ring_subflexor]
|
||||
keys += ["ring_flexor", "ring_pinky_side", "ring_thumb_side"]
|
||||
|
||||
glove_pinky_flexor = glove.read()["pinky_2"] - 1500
|
||||
glove_pinky_subflexor = glove.read()["pinky_1"] - 1300
|
||||
vals += [300 + glove_pinky_flexor, max(1000 - glove_pinky_subflexor - 100, 0), glove_pinky_subflexor]
|
||||
keys += ["pinky_flexor", "pinky_pinky_side", "pinky_thumb_side"]
|
||||
|
||||
robot.hand_bus.write("Goal_Position", vals, keys)
|
||||
time.sleep(0.1)
|
||||
|
||||
time.sleep(3)
|
||||
|
||||
def move_arm(loop=10):
|
||||
sleep = 1
|
||||
for i in range(loop):
|
||||
robot.arm_bus.write("Goal_Position", [1981, 2030, 2069, 2032, 1874, 1957, 1695])
|
||||
time.sleep(sleep)
|
||||
robot.arm_bus.write("Goal_Position", [1981, 2030, 2069, 2032, 1874, 1957, 1195])
|
||||
time.sleep(sleep)
|
||||
robot.arm_bus.write("Goal_Position", [1981, 2030, 2069, 2032, 1874, 1957, 2195])
|
||||
time.sleep(sleep)
|
||||
robot.arm_bus.write("Goal_Position", [1981, 2030, 2069, 2032, 1874, 1957, 1695])
|
||||
time.sleep(sleep)
|
||||
robot.arm_bus.write("Goal_Position", [1981, 2030, 2069, 2032, 1874, 1457, 1695])
|
||||
time.sleep(sleep)
|
||||
robot.arm_bus.write("Goal_Position", [1981, 2030, 2069, 2032, 1874, 2357, 1695])
|
||||
time.sleep(sleep)
|
||||
robot.arm_bus.write("Goal_Position", [1981, 2030, 2069, 2032, 1874, 1957, 1695])
|
||||
time.sleep(sleep)
|
||||
robot.arm_bus.write("Goal_Position", [1981, 2030, 2069, 2032, 974, 1957, 1695])
|
||||
time.sleep(sleep)
|
||||
robot.arm_bus.write("Goal_Position", [1981, 2030, 2069, 2032, 2674, 1957, 1695])
|
||||
time.sleep(sleep + 2)
|
||||
robot.arm_bus.write("Goal_Position", [1981, 2030, 2069, 2032, 1874, 1957, 1695])
|
||||
time.sleep(sleep)
|
||||
robot.arm_bus.write("Goal_Position", [1981, 2030, 2069, 1632, 1874, 1957, 1695])
|
||||
time.sleep(sleep)
|
||||
robot.arm_bus.write("Goal_Position", [1981, 2030, 1369, 1632, 1874, 1957, 1695])
|
||||
time.sleep(sleep)
|
||||
robot.arm_bus.write("Goal_Position", [1981, 2030, 2069, 2032, 1874, 1957, 1695])
|
||||
time.sleep(sleep)
|
||||
robot.arm_bus.write("Goal_Position", [1981, 1330, 2069, 2032, 1874, 1957, 1695])
|
||||
time.sleep(sleep)
|
||||
robot.arm_bus.write("Goal_Position", [1981, 2030, 2069, 2032, 1874, 1957, 1695])
|
||||
time.sleep(sleep)
|
||||
robot.arm_bus.write("Goal_Position", [2381, 2030, 2069, 2032, 1874, 1957, 1695])
|
||||
time.sleep(sleep)
|
||||
robot.arm_bus.write("Goal_Position", [1681, 2030, 2069, 2032, 1874, 1957, 1695])
|
||||
time.sleep(sleep)
|
||||
robot.arm_bus.write("Goal_Position", [1981, 2030, 2069, 2032, 1874, 1957, 1695])
|
||||
time.sleep(sleep)
|
||||
|
||||
def move_hand(loop=10):
|
||||
sleep = 0.5
|
||||
for i in range(loop):
|
||||
robot.hand_bus.write(
|
||||
"Goal_Position",
|
||||
[500, 1000, 0, 1000],
|
||||
["thumb_basel_rotation", "thumb_flexor", "thumb_pinky_side", "thumb_thumb_side"],
|
||||
)
|
||||
time.sleep(sleep)
|
||||
robot.hand_bus.write(
|
||||
"Goal_Position", [100, 100, 100], ["index_flexor", "index_pinky_side", "index_thumb_side"]
|
||||
)
|
||||
time.sleep(sleep)
|
||||
robot.hand_bus.write(
|
||||
"Goal_Position", [100, 1000, 150], ["middle_flexor", "middle_pinky_side", "middle_thumb_side"]
|
||||
)
|
||||
time.sleep(sleep)
|
||||
robot.hand_bus.write(
|
||||
"Goal_Position", [200, 200, 0], ["ring_flexor", "ring_pinky_side", "ring_thumb_side"]
|
||||
)
|
||||
time.sleep(sleep)
|
||||
robot.hand_bus.write(
|
||||
"Goal_Position", [200, 100, 700], ["pinky_flexor", "pinky_pinky_side", "pinky_thumb_side"]
|
||||
)
|
||||
time.sleep(sleep)
|
||||
|
||||
robot.hand_bus.write(
|
||||
"Goal_Position",
|
||||
[500, 1000 - 250, 0 + 300, 1000 - 200],
|
||||
["thumb_basel_rotation", "thumb_flexor", "thumb_pinky_side", "thumb_thumb_side"],
|
||||
)
|
||||
time.sleep(sleep)
|
||||
robot.hand_bus.write(
|
||||
"Goal_Position",
|
||||
[100 + 450, 100 + 400, 100 + 400],
|
||||
["index_flexor", "index_pinky_side", "index_thumb_side"],
|
||||
)
|
||||
time.sleep(sleep)
|
||||
robot.hand_bus.write(
|
||||
"Goal_Position",
|
||||
[100 + 350, 1000 - 450, 150 + 450],
|
||||
["middle_flexor", "middle_pinky_side", "middle_thumb_side"],
|
||||
)
|
||||
time.sleep(sleep)
|
||||
robot.hand_bus.write(
|
||||
"Goal_Position",
|
||||
[200 + 650, 200 + 350, 0 + 350],
|
||||
["ring_flexor", "ring_pinky_side", "ring_thumb_side"],
|
||||
)
|
||||
time.sleep(sleep)
|
||||
robot.hand_bus.write(
|
||||
"Goal_Position",
|
||||
[200 + 450, 100 + 400, 700 - 400],
|
||||
["pinky_flexor", "pinky_pinky_side", "pinky_thumb_side"],
|
||||
)
|
||||
time.sleep(sleep)
|
||||
|
||||
move_hand(3)
|
||||
|
||||
move_arm(1)
|
||||
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
|
||||
with ThreadPoolExecutor() as executor:
|
||||
executor.submit(move_arm)
|
||||
executor.submit(move_hand)
|
||||
|
||||
# initial position
|
||||
for i in range(3):
|
||||
robot.hand_bus.write(
|
||||
"Goal_Position", [500, 1000, 0, 1000, 100, 950, 100, 100, 1000, 150, 200, 200, 0, 200, 100, 700]
|
||||
)
|
||||
time.sleep(1)
|
||||
|
||||
# for i in range(3):
|
||||
# robot.hand_bus.write("Goal_Position", [500, 1000-150, 0+250, 1000-150,
|
||||
# 100+300, 950-250, 100+250,
|
||||
# 100+200, 1000-300, 150+300,
|
||||
# 200+500, 200+200, 0+200,
|
||||
# 200+300, 100+200, 700-200])
|
||||
# time.sleep(1)
|
||||
|
||||
# camera = 0
|
||||
# score_threshold = 0.95
|
||||
# iou_threshold = 0.3
|
||||
|
||||
# app = MediaPipeHandApp(MediaPipeHand.from_pretrained(), score_threshold, iou_threshold)
|
||||
|
||||
# def frame_processor(frame: np.ndarray) -> np.ndarray:
|
||||
# # Input Prep
|
||||
# NHWC_int_numpy_frames, NCHW_fp32_torch_frames = app_to_net_image_inputs(frame)
|
||||
|
||||
# # Run Bounding Box & Keypoint Detector
|
||||
# batched_selected_boxes, batched_selected_keypoints = app._run_box_detector(NCHW_fp32_torch_frames)
|
||||
|
||||
# # The region of interest ( bounding box of 4 (x, y) corners).
|
||||
# # list[torch.Tensor(shape=[Num Boxes, 4, 2])],
|
||||
# # where 2 == (x, y)
|
||||
# #
|
||||
# # A list element will be None if there is no selected ROI.
|
||||
# batched_roi_4corners = app._compute_object_roi(batched_selected_boxes, batched_selected_keypoints)
|
||||
|
||||
# # selected landmarks for the ROI (if any)
|
||||
# # list[torch.Tensor(shape=[Num Selected Landmarks, K, 3])],
|
||||
# # where K == number of landmark keypoints, 3 == (x, y, confidence)
|
||||
# #
|
||||
# # A list element will be None if there is no ROI.
|
||||
# landmarks_out = app._run_landmark_detector(NHWC_int_numpy_frames, batched_roi_4corners)
|
||||
|
||||
# app._draw_predictions(
|
||||
# NHWC_int_numpy_frames,
|
||||
# batched_selected_boxes,
|
||||
# batched_selected_keypoints,
|
||||
# batched_roi_4corners,
|
||||
# *landmarks_out,
|
||||
# )
|
||||
|
||||
# return NHWC_int_numpy_frames[0]
|
||||
|
||||
# capture_and_display_processed_frames(frame_processor, "QAIHM Mediapipe Hand Demo", camera)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
133
examples/test2.py
Normal file
133
examples/test2.py
Normal file
@@ -0,0 +1,133 @@
|
||||
#!/usr/bin/env python
|
||||
#
|
||||
# ********* Ping Example *********
|
||||
#
|
||||
#
|
||||
# Available SCServo model on this example : All models using Protocol SCS
|
||||
# This example is tested with a SCServo(STS/SMS/SCS), and an URT
|
||||
# Be sure that SCServo(STS/SMS/SCS) properties are already set as %% ID : 1 / Baudnum : 6 (Baudrate : 1000000)
|
||||
#
|
||||
|
||||
import os
|
||||
|
||||
if os.name == "nt":
|
||||
import msvcrt
|
||||
|
||||
def getch():
|
||||
return msvcrt.getch().decode()
|
||||
else:
|
||||
import sys
|
||||
import termios
|
||||
import tty
|
||||
|
||||
fd = sys.stdin.fileno()
|
||||
old_settings = termios.tcgetattr(fd)
|
||||
|
||||
def getch():
|
||||
try:
|
||||
tty.setraw(sys.stdin.fileno())
|
||||
ch = sys.stdin.read(1)
|
||||
finally:
|
||||
termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
|
||||
return ch
|
||||
|
||||
|
||||
from scservo_sdk import * # Uses SCServo SDK library
|
||||
|
||||
# Default setting
|
||||
SCS_ID = 1 # SCServo ID : 1
|
||||
BAUDRATE = 1000000 # SCServo default baudrate : 1000000
|
||||
DEVICENAME = "/dev/tty.usbserial-2130" # Check which port is being used on your controller
|
||||
# ex) Windows: "COM1" Linux: "/dev/ttyUSB0" Mac: "/dev/tty.usbserial-*"
|
||||
|
||||
protocol_end = 1 # SCServo bit end(STS/SMS=0, SCS=1)
|
||||
|
||||
# Initialize PortHandler instance
|
||||
# Set the port path
|
||||
# Get methods and members of PortHandlerLinux or PortHandlerWindows
|
||||
portHandler = PortHandler(DEVICENAME)
|
||||
|
||||
# Initialize PacketHandler instance
|
||||
# Get methods and members of Protocol
|
||||
packetHandler = PacketHandler(protocol_end)
|
||||
|
||||
# Open port
|
||||
if portHandler.openPort():
|
||||
print("Succeeded to open the port")
|
||||
else:
|
||||
print("Failed to open the port")
|
||||
print("Press any key to terminate...")
|
||||
getch()
|
||||
quit()
|
||||
|
||||
|
||||
# Set port baudrate
|
||||
if portHandler.setBaudRate(BAUDRATE):
|
||||
print("Succeeded to change the baudrate")
|
||||
else:
|
||||
print("Failed to change the baudrate")
|
||||
print("Press any key to terminate...")
|
||||
getch()
|
||||
quit()
|
||||
|
||||
# Try to ping the SCServo
|
||||
# Get SCServo model number
|
||||
scs_model_number, scs_comm_result, scs_error = packetHandler.ping(portHandler, SCS_ID)
|
||||
if scs_comm_result != COMM_SUCCESS:
|
||||
print("%s" % packetHandler.getTxRxResult(scs_comm_result))
|
||||
elif scs_error != 0:
|
||||
print("%s" % packetHandler.getRxPacketError(scs_error))
|
||||
else:
|
||||
print("[ID:%03d] ping Succeeded. SCServo model number : %d" % (SCS_ID, scs_model_number))
|
||||
|
||||
|
||||
ADDR_SCS_PRESENT_POSITION = 56
|
||||
scs_present_position, scs_comm_result, scs_error = packetHandler.read2ByteTxRx(
|
||||
portHandler, SCS_ID, ADDR_SCS_PRESENT_POSITION
|
||||
)
|
||||
if scs_comm_result != COMM_SUCCESS:
|
||||
print(packetHandler.getTxRxResult(scs_comm_result))
|
||||
elif scs_error != 0:
|
||||
print(packetHandler.getRxPacketError(scs_error))
|
||||
|
||||
breakpoint()
|
||||
scs_present_position = SCS_LOWORD(scs_present_position)
|
||||
# scs_present_speed = SCS_HIWORD(scs_present_position_speed)
|
||||
# print("[ID:%03d] PresPos:%03d PresSpd:%03d" % (SCS_ID, scs_present_position, SCS_TOHOST(scs_present_speed, 15)))
|
||||
print("[ID:%03d] PresPos:%03d" % (SCS_ID, scs_present_position))
|
||||
|
||||
groupSyncRead = GroupSyncRead(portHandler, packetHandler, ADDR_SCS_PRESENT_POSITION, 2)
|
||||
|
||||
scs_addparam_result = groupSyncRead.addParam(SCS_ID)
|
||||
if scs_addparam_result != True:
|
||||
print("[ID:%03d] groupSyncRead addparam failed" % SCS_ID)
|
||||
quit()
|
||||
|
||||
# Syncread present position
|
||||
scs_comm_result = groupSyncRead.txRxPacket()
|
||||
if scs_comm_result != COMM_SUCCESS:
|
||||
print("%s" % packetHandler.getTxRxResult(scs_comm_result))
|
||||
|
||||
# Check if groupsyncread data of SCServo#1 is available
|
||||
scs_getdata_result = groupSyncRead.isAvailable(SCS_ID, ADDR_SCS_PRESENT_POSITION, 2)
|
||||
if scs_getdata_result == True:
|
||||
# Get SCServo#1 present position value
|
||||
scs_present_position = groupSyncRead.getData(SCS_ID, ADDR_SCS_PRESENT_POSITION, 2)
|
||||
else:
|
||||
scs_present_position = 0
|
||||
print("[ID:%03d] groupSyncRead getdata failed" % SCS_ID)
|
||||
|
||||
# # Check if groupsyncread data of SCServo#2 is available
|
||||
# scs_getdata_result = groupSyncRead.isAvailable(SCS2_ID, ADDR_SCS_PRESENT_POSITION, 2)
|
||||
# if scs_getdata_result == True:
|
||||
# # Get SCServo#2 present position value
|
||||
# scs2_present_position_speed = groupSyncRead.getData(SCS2_ID, ADDR_SCS_PRESENT_POSITION, 2)
|
||||
# else:
|
||||
# print("[ID:%03d] groupSyncRead getdata failed" % SCS2_ID)
|
||||
|
||||
scs_present_position = SCS_LOWORD(scs_present_position)
|
||||
print("[ID:%03d] PresPos:%03d" % (SCS_ID, scs_present_position))
|
||||
|
||||
|
||||
# Close port
|
||||
portHandler.closePort()
|
||||
45
examples/test3.py
Normal file
45
examples/test3.py
Normal file
@@ -0,0 +1,45 @@
|
||||
import serial
|
||||
|
||||
|
||||
class HomonculusGlove:
|
||||
def __init__(self):
|
||||
self.serial_port = "/dev/tty.usbmodem1101"
|
||||
self.baud_rate = 115200
|
||||
self.serial = serial.Serial(self.serial_port, self.baud_rate, timeout=1)
|
||||
|
||||
def read(self):
|
||||
while True:
|
||||
if self.serial.in_waiting > 0:
|
||||
vals = self.serial.readline().decode("utf-8").strip()
|
||||
vals = vals.split(" ")
|
||||
vals = [int(val) for val in vals]
|
||||
|
||||
d = {
|
||||
"thumb_0": vals[0],
|
||||
"thumb_1": vals[1],
|
||||
"thumb_2": vals[2],
|
||||
"thumb_3": vals[3],
|
||||
"index_0": vals[4],
|
||||
"index_1": vals[5],
|
||||
"index_2": vals[6],
|
||||
"middle_0": vals[7],
|
||||
"middle_1": vals[8],
|
||||
"middle_2": vals[9],
|
||||
"ring_0": vals[10],
|
||||
"ring_1": vals[11],
|
||||
"ring_2": vals[12],
|
||||
"pinky_0": vals[13],
|
||||
"pinky_1": vals[14],
|
||||
"pinky_2": vals[15],
|
||||
}
|
||||
return d
|
||||
|
||||
# if ser.in_waiting > 0:
|
||||
# line = ser.readline().decode('utf-8').strip()
|
||||
# print(line)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
glove = HomonculusGlove()
|
||||
d = glove.read()
|
||||
lol = 1
|
||||
693
examples/test4.py
Normal file
693
examples/test4.py
Normal file
@@ -0,0 +1,693 @@
|
||||
import threading
|
||||
import time
|
||||
from typing import Callable
|
||||
|
||||
import cv2
|
||||
import numpy as np
|
||||
|
||||
# from qai_hub_models.models.mediapipe_hand.app import MediaPipeHandApp
|
||||
# from qai_hub_models.models.mediapipe_hand.model import (
|
||||
# MediaPipeHand,
|
||||
# )
|
||||
# from qai_hub_models.utils.image_processing import (
|
||||
# app_to_net_image_inputs,
|
||||
# )
|
||||
from lerobot.common.robot_devices.motors.feetech import (
|
||||
CalibrationMode,
|
||||
FeetechMotorsBus,
|
||||
)
|
||||
|
||||
LOWER_BOUND_LINEAR = -100
|
||||
UPPER_BOUND_LINEAR = 200
|
||||
|
||||
import serial
|
||||
|
||||
|
||||
class HomonculusGlove:
|
||||
def __init__(self):
|
||||
self.serial_port = "/dev/tty.usbmodem1401"
|
||||
self.baud_rate = 115200
|
||||
self.serial = serial.Serial(self.serial_port, self.baud_rate, timeout=1)
|
||||
self.thread = threading.Thread(target=self.async_read)
|
||||
self.thread.start()
|
||||
self.last_d = {
|
||||
"thumb_0": 100,
|
||||
"thumb_1": 100,
|
||||
"thumb_2": 100,
|
||||
"thumb_3": 100,
|
||||
"index_0": 100,
|
||||
"index_1": 100,
|
||||
"index_2": 100,
|
||||
"middle_0": 100,
|
||||
"middle_1": 100,
|
||||
"middle_2": 100,
|
||||
"ring_0": 100,
|
||||
"ring_1": 100,
|
||||
"ring_2": 100,
|
||||
"pinky_0": 100,
|
||||
"pinky_1": 100,
|
||||
"pinky_2": 100,
|
||||
"battery_voltage": 100,
|
||||
}
|
||||
self.calibration = None
|
||||
|
||||
@property
|
||||
def joint_names(self):
|
||||
return list(self.last_d.keys())
|
||||
|
||||
def read(self, motor_names: list[str] | None = None):
|
||||
if motor_names is None:
|
||||
motor_names = self.joint_names
|
||||
|
||||
values = np.array([self.last_d[k] for k in motor_names])
|
||||
|
||||
print(motor_names)
|
||||
print(values)
|
||||
|
||||
if self.calibration is not None:
|
||||
values = self.apply_calibration(values, motor_names)
|
||||
print(values)
|
||||
return values
|
||||
|
||||
def async_read(self):
|
||||
while True:
|
||||
if self.serial.in_waiting > 0:
|
||||
self.serial.flush()
|
||||
vals = self.serial.readline().decode("utf-8").strip()
|
||||
vals = vals.split(" ")
|
||||
if len(vals) != 17:
|
||||
continue
|
||||
vals = [int(val) for val in vals]
|
||||
|
||||
d = {
|
||||
"thumb_0": vals[0],
|
||||
"thumb_1": vals[1],
|
||||
"thumb_2": vals[2],
|
||||
"thumb_3": vals[3],
|
||||
"index_0": vals[4],
|
||||
"index_1": vals[5],
|
||||
"index_2": vals[6],
|
||||
"middle_0": vals[7],
|
||||
"middle_1": vals[8],
|
||||
"middle_2": vals[9],
|
||||
"ring_0": vals[10],
|
||||
"ring_1": vals[11],
|
||||
"ring_2": vals[12],
|
||||
"pinky_0": vals[13],
|
||||
"pinky_1": vals[14],
|
||||
"pinky_2": vals[15],
|
||||
"battery_voltage": vals[16],
|
||||
}
|
||||
self.last_d = d
|
||||
# print(d.values())
|
||||
|
||||
def run_calibration(self):
|
||||
print("\nMove arm to open position")
|
||||
input("Press Enter to continue...")
|
||||
open_pos_list = []
|
||||
for _ in range(300):
|
||||
open_pos = self.read()
|
||||
open_pos_list.append(open_pos)
|
||||
time.sleep(0.01)
|
||||
open_pos = np.array(open_pos_list)
|
||||
max_open_pos = open_pos.max(axis=0)
|
||||
min_open_pos = open_pos.min(axis=0)
|
||||
|
||||
print(f"{max_open_pos=}")
|
||||
print(f"{min_open_pos=}")
|
||||
|
||||
print("\nMove arm to closed position")
|
||||
input("Press Enter to continue...")
|
||||
closed_pos_list = []
|
||||
for _ in range(300):
|
||||
closed_pos = self.read()
|
||||
closed_pos_list.append(closed_pos)
|
||||
time.sleep(0.01)
|
||||
closed_pos = np.array(closed_pos_list)
|
||||
max_closed_pos = closed_pos.max(axis=0)
|
||||
closed_pos[closed_pos < 1000] = 60000
|
||||
min_closed_pos = closed_pos.min(axis=0)
|
||||
|
||||
print(f"{max_closed_pos=}")
|
||||
print(f"{min_closed_pos=}")
|
||||
|
||||
open_pos = np.array([max_open_pos, max_closed_pos]).max(axis=0)
|
||||
closed_pos = np.array([min_open_pos, min_closed_pos]).min(axis=0)
|
||||
|
||||
# INVERTION
|
||||
# INVERTION
|
||||
# INVERTION
|
||||
# INVERTION
|
||||
# INVERTION
|
||||
# INVERTION
|
||||
# INVERTION
|
||||
for i, jname in enumerate(self.joint_names):
|
||||
if jname in [
|
||||
"thumb_0",
|
||||
"thumb_3",
|
||||
"index_2",
|
||||
"middle_2",
|
||||
"ring_2",
|
||||
"pinky_0",
|
||||
"pinky_2",
|
||||
"index_0",
|
||||
]:
|
||||
tmp_pos = open_pos[i]
|
||||
open_pos[i] = closed_pos[i]
|
||||
closed_pos[i] = tmp_pos
|
||||
|
||||
print()
|
||||
print(f"{open_pos=}")
|
||||
print(f"{closed_pos=}")
|
||||
|
||||
homing_offset = [0] * len(self.joint_names)
|
||||
drive_mode = [0] * len(self.joint_names)
|
||||
calib_modes = [CalibrationMode.LINEAR.name] * len(self.joint_names)
|
||||
|
||||
calib_dict = {
|
||||
"homing_offset": homing_offset,
|
||||
"drive_mode": drive_mode,
|
||||
"start_pos": open_pos,
|
||||
"end_pos": closed_pos,
|
||||
"calib_mode": calib_modes,
|
||||
"motor_names": self.joint_names,
|
||||
}
|
||||
# return calib_dict
|
||||
self.set_calibration(calib_dict)
|
||||
|
||||
def set_calibration(self, calibration: dict[str, list]):
|
||||
self.calibration = calibration
|
||||
|
||||
def apply_calibration(self, values: np.ndarray | list, motor_names: list[str] | None):
|
||||
"""Convert from unsigned int32 joint position range [0, 2**32[ to the universal float32 nominal degree range ]-180.0, 180.0[ with
|
||||
a "zero position" at 0 degree.
|
||||
|
||||
Note: We say "nominal degree range" since the motors can take values outside this range. For instance, 190 degrees, if the motor
|
||||
rotate more than a half a turn from the zero position. However, most motors can't rotate more than 180 degrees and will stay in this range.
|
||||
|
||||
Joints values are original in [0, 2**32[ (unsigned int32). Each motor are expected to complete a full rotation
|
||||
when given a goal position that is + or - their resolution. For instance, feetech xl330-m077 have a resolution of 4096, and
|
||||
at any position in their original range, let's say the position 56734, they complete a full rotation clockwise by moving to 60830,
|
||||
or anticlockwise by moving to 52638. The position in the original range is arbitrary and might change a lot between each motor.
|
||||
To harmonize between motors of the same model, different robots, or even models of different brands, we propose to work
|
||||
in the centered nominal degree range ]-180, 180[.
|
||||
"""
|
||||
if motor_names is None:
|
||||
motor_names = self.motor_names
|
||||
|
||||
# Convert from unsigned int32 original range [0, 2**32] to signed float32 range
|
||||
values = values.astype(np.float32)
|
||||
|
||||
for i, name in enumerate(motor_names):
|
||||
calib_idx = self.calibration["motor_names"].index(name)
|
||||
calib_mode = self.calibration["calib_mode"][calib_idx]
|
||||
|
||||
if CalibrationMode[calib_mode] == CalibrationMode.LINEAR:
|
||||
start_pos = self.calibration["start_pos"][calib_idx]
|
||||
end_pos = self.calibration["end_pos"][calib_idx]
|
||||
|
||||
# Rescale the present position to a nominal range [0, 100] %,
|
||||
# useful for joints with linear motions like Aloha gripper
|
||||
values[i] = (values[i] - start_pos) / (end_pos - start_pos) * 100
|
||||
|
||||
if (values[i] < LOWER_BOUND_LINEAR) or (values[i] > UPPER_BOUND_LINEAR):
|
||||
if name == "pinky_1" and (values[i] < LOWER_BOUND_LINEAR):
|
||||
values[i] = end_pos
|
||||
else:
|
||||
msg = (
|
||||
f"Wrong motor position range detected for {name}. "
|
||||
f"Expected to be in nominal range of [0, 100] % (a full linear translation), "
|
||||
f"with a maximum range of [{LOWER_BOUND_LINEAR}, {UPPER_BOUND_LINEAR}] % to account for some imprecision during calibration, "
|
||||
f"but present value is {values[i]} %. "
|
||||
"This might be due to a cable connection issue creating an artificial jump in motor values. "
|
||||
"You need to recalibrate by running: `python lerobot/scripts/control_robot.py calibrate`"
|
||||
)
|
||||
print(msg)
|
||||
# raise JointOutOfRangeError(msg)
|
||||
|
||||
return values
|
||||
|
||||
# def revert_calibration(self, values: np.ndarray | list, motor_names: list[str] | None):
|
||||
# """Inverse of `apply_calibration`."""
|
||||
# if motor_names is None:
|
||||
# motor_names = self.motor_names
|
||||
|
||||
# for i, name in enumerate(motor_names):
|
||||
# calib_idx = self.calibration["motor_names"].index(name)
|
||||
# calib_mode = self.calibration["calib_mode"][calib_idx]
|
||||
|
||||
# if CalibrationMode[calib_mode] == CalibrationMode.LINEAR:
|
||||
# start_pos = self.calibration["start_pos"][calib_idx]
|
||||
# end_pos = self.calibration["end_pos"][calib_idx]
|
||||
|
||||
# # Convert from nominal lnear range of [0, 100] % to
|
||||
# # actual motor range of values which can be arbitrary.
|
||||
# values[i] = values[i] / 100 * (end_pos - start_pos) + start_pos
|
||||
|
||||
# values = np.round(values).astype(np.int32)
|
||||
# return values
|
||||
|
||||
|
||||
class HopeJuniorRobot:
|
||||
def __init__(self):
|
||||
self.arm_bus = FeetechMotorsBus(
|
||||
port="/dev/tty.usbmodem58760429571",
|
||||
motors={
|
||||
# "motor1": (2, "sts3250"),
|
||||
# "motor2": (1, "scs0009"),
|
||||
"shoulder_pitch": [1, "sts3250"],
|
||||
"shoulder_yaw": [2, "sts3215"], # TODO: sts3250
|
||||
"shoulder_roll": [3, "sts3215"], # TODO: sts3250
|
||||
"elbow_flex": [4, "sts3250"],
|
||||
"wrist_roll": [5, "sts3215"],
|
||||
"wrist_yaw": [6, "sts3215"],
|
||||
"wrist_pitch": [7, "sts3215"],
|
||||
},
|
||||
protocol_version=0,
|
||||
)
|
||||
self.hand_bus = FeetechMotorsBus(
|
||||
port="/dev/tty.usbmodem585A0077581",
|
||||
motors={
|
||||
"thumb_basel_rotation": [30, "scs0009"],
|
||||
"thumb_flexor": [27, "scs0009"],
|
||||
"thumb_pinky_side": [26, "scs0009"],
|
||||
"thumb_thumb_side": [28, "scs0009"],
|
||||
"index_flexor": [25, "scs0009"],
|
||||
"index_pinky_side": [31, "scs0009"],
|
||||
"index_thumb_side": [32, "scs0009"],
|
||||
"middle_flexor": [24, "scs0009"],
|
||||
"middle_pinky_side": [33, "scs0009"],
|
||||
"middle_thumb_side": [34, "scs0009"],
|
||||
"ring_flexor": [21, "scs0009"],
|
||||
"ring_pinky_side": [36, "scs0009"],
|
||||
"ring_thumb_side": [35, "scs0009"],
|
||||
"pinky_flexor": [23, "scs0009"],
|
||||
"pinky_pinky_side": [38, "scs0009"],
|
||||
"pinky_thumb_side": [37, "scs0009"],
|
||||
},
|
||||
protocol_version=1,
|
||||
group_sync_read=False,
|
||||
)
|
||||
|
||||
def get_hand_calibration(self):
|
||||
homing_offset = [0] * len(self.hand_bus.motor_names)
|
||||
drive_mode = [0] * len(self.hand_bus.motor_names)
|
||||
|
||||
start_pos = [
|
||||
500,
|
||||
900,
|
||||
1000,
|
||||
0,
|
||||
100,
|
||||
250,
|
||||
750,
|
||||
100,
|
||||
400,
|
||||
150,
|
||||
100,
|
||||
120,
|
||||
980,
|
||||
100,
|
||||
950,
|
||||
750,
|
||||
]
|
||||
|
||||
end_pos = [
|
||||
500 - 250,
|
||||
900 - 300,
|
||||
1000 - 550,
|
||||
0 + 550,
|
||||
1000,
|
||||
start_pos[5] + 500,
|
||||
start_pos[6] - 500,
|
||||
1000,
|
||||
400 + 700,
|
||||
150 + 700,
|
||||
1000,
|
||||
120 + 700,
|
||||
980 - 700,
|
||||
1000,
|
||||
950 - 700,
|
||||
750 - 700,
|
||||
]
|
||||
|
||||
calib_modes = [CalibrationMode.LINEAR.name] * len(self.hand_bus.motor_names)
|
||||
|
||||
calib_dict = {
|
||||
"homing_offset": homing_offset,
|
||||
"drive_mode": drive_mode,
|
||||
"start_pos": start_pos,
|
||||
"end_pos": end_pos,
|
||||
"calib_mode": calib_modes,
|
||||
"motor_names": self.hand_bus.motor_names,
|
||||
}
|
||||
return calib_dict
|
||||
|
||||
def connect(self):
|
||||
self.arm_bus.connect()
|
||||
self.hand_bus.connect()
|
||||
|
||||
|
||||
ESCAPE_KEY_ID = 27
|
||||
|
||||
|
||||
def capture_and_display_processed_frames(
|
||||
frame_processor: Callable[[np.ndarray], np.ndarray],
|
||||
window_display_name: str,
|
||||
cap_device: int = 0,
|
||||
) -> None:
|
||||
"""
|
||||
Capture frames from the given input camera device, run them through
|
||||
the frame processor, and display the outputs in a window with the given name.
|
||||
|
||||
User should press Esc to exit.
|
||||
|
||||
Inputs:
|
||||
frame_processor: Callable[[np.ndarray], np.ndarray]
|
||||
Processes frames.
|
||||
Input and output are numpy arrays of shape (H W C) with BGR channel layout and dtype uint8 / byte.
|
||||
window_display_name: str
|
||||
Name of the window used to display frames.
|
||||
cap_device: int
|
||||
Identifier for the camera to use to capture frames.
|
||||
"""
|
||||
cv2.namedWindow(window_display_name)
|
||||
capture = cv2.VideoCapture(cap_device)
|
||||
if not capture.isOpened():
|
||||
raise ValueError("Unable to open video capture.")
|
||||
|
||||
frame_count = 0
|
||||
has_frame, frame = capture.read()
|
||||
while has_frame:
|
||||
assert isinstance(frame, np.ndarray)
|
||||
|
||||
frame_count = frame_count + 1
|
||||
# mirror frame
|
||||
frame = np.ascontiguousarray(frame[:, ::-1, ::-1])
|
||||
|
||||
# process & show frame
|
||||
processed_frame = frame_processor(frame)
|
||||
cv2.imshow(window_display_name, processed_frame[:, :, ::-1])
|
||||
|
||||
has_frame, frame = capture.read()
|
||||
key = cv2.waitKey(1)
|
||||
if key == ESCAPE_KEY_ID:
|
||||
break
|
||||
|
||||
capture.release()
|
||||
|
||||
|
||||
def main():
|
||||
robot = HopeJuniorRobot()
|
||||
robot.connect()
|
||||
|
||||
# robot.hand_bus.calibration = None
|
||||
|
||||
# breakpoint()
|
||||
# print(robot.arm_bus.read("Present_Position"))
|
||||
robot.arm_bus.write("Torque_Enable", 1)
|
||||
robot.arm_bus.write("Acceleration", 20)
|
||||
robot.arm_bus.read("Acceleration")
|
||||
|
||||
calibration = robot.get_hand_calibration()
|
||||
robot.hand_bus.write("Goal_Position", calibration["start_pos"])
|
||||
# robot.hand_bus.write("Goal_Position", calibration["end_pos"][:4], robot.hand_bus.motor_names[:4])
|
||||
robot.hand_bus.set_calibration(calibration)
|
||||
lol = 1
|
||||
|
||||
# # print(motors_bus.write("Goal_Position", 500))
|
||||
# print(robot.hand_bus.read("Present_Position"))
|
||||
# # pos = hand_bus.read("Present_Position")
|
||||
# # hand_bus.write("Goal_Position", pos[0]+20, hand_bus.motor_names[0])
|
||||
# # hand_bus.write("Goal_Position", pos[i]+delta, hand_bus.motor_names[i])
|
||||
# robot.hand_bus.read("Acceleration")
|
||||
# robot.hand_bus.write("Acceleration", 10)
|
||||
|
||||
# sleep = 1
|
||||
# # robot.hand_bus.write(
|
||||
# # "Goal_Position", [glove.last_d['index_2']-1500,300,300], ["index_pinky_side", "index_flexor", "index_thumb_side"]
|
||||
# # )
|
||||
# #time.sleep(sleep)
|
||||
# time.sleep(sleep)
|
||||
# robot.hand_bus.write(
|
||||
# "Goal_Position", [100, 100, 100], ["index_flexor", "index_pinky_side", "index_thumb_side"]
|
||||
# )
|
||||
# time.sleep(sleep)
|
||||
# robot.hand_bus.write(
|
||||
# "Goal_Position", [100, 0, 0], ["middle_flexor", "middle_pinky_side", "middle_thumb_side"]
|
||||
# )
|
||||
# time.sleep(sleep)
|
||||
# robot.hand_bus.write(
|
||||
# "Goal_Position", [200, 200, 0], ["ring_flexor", "ring_pinky_side", "ring_thumb_side"]
|
||||
# )
|
||||
# time.sleep(sleep)
|
||||
# robot.hand_bus.write(
|
||||
# "Goal_Position", [200, 100, 600], ["pinky_flexor", "pinky_pinky_side", "pinky_thumb_side"]
|
||||
# )
|
||||
# time.sleep(sleep)
|
||||
|
||||
# breakpoint()
|
||||
|
||||
glove = HomonculusGlove()
|
||||
glove.run_calibration()
|
||||
# while True:
|
||||
# joint_names = ["index_1", "index_2"]
|
||||
# joint_values = glove.read(joint_names)
|
||||
# print(joint_values)
|
||||
|
||||
input()
|
||||
while True:
|
||||
joint_names = []
|
||||
# joint_names += ["thumb_0", "thumb_2", "thumb_3"]
|
||||
joint_names += ["index_0", "index_1"]
|
||||
# joint_names += ["middle_1", "middle_2"]
|
||||
# joint_names += ["ring_1", "ring_2"]
|
||||
# joint_names += ["pinky_0", "pinky_2"]
|
||||
joint_values = glove.read(joint_names)
|
||||
joint_values = joint_values.round().astype(int)
|
||||
joint_dict = {k: v for k, v in zip(joint_names, joint_values, strict=False)}
|
||||
|
||||
motor_values = []
|
||||
motor_names = []
|
||||
# motor_names += ["thumb_basel_rotation", "thumb_flexor", "thumb_pinky_side", "thumb_thumb_side"]
|
||||
# motor_values += [joint_dict["thumb_3"], joint_dict["thumb_0"], joint_dict["thumb_2"], joint_dict["thumb_2"]]
|
||||
motor_names += ["index_pinky_side", "index_thumb_side"]
|
||||
# if joint_dict["index_0"] -2100 > 0:
|
||||
splayamount = 0.5
|
||||
motor_values += [
|
||||
(100 - joint_dict["index_0"]) * splayamount + joint_dict["index_1"] * (1 - splayamount),
|
||||
(joint_dict["index_0"]) * splayamount + joint_dict["index_1"] * (1 - splayamount),
|
||||
]
|
||||
# else:
|
||||
# motor_values += [100-joint_dict["index_0"], joint_dict["index_0"]]
|
||||
|
||||
# motor_names += ["middle_flexor", "middle_pinky_side", "middle_thumb_side"]
|
||||
# motor_values += [joint_dict["middle_2"], joint_dict["middle_1"], joint_dict["middle_1"]]
|
||||
# motor_names += ["ring_flexor", "ring_pinky_side", "ring_thumb_side"]
|
||||
# motor_values += [joint_dict["ring_2"], joint_dict["ring_1"], joint_dict["ring_1"]]
|
||||
# motor_names += ["pinky_flexor", "pinky_pinky_side", "pinky_thumb_side"]
|
||||
|
||||
# motor_values += [joint_dict["pinky_2"], joint_dict["pinky_0"], joint_dict["pinky_0"]]
|
||||
|
||||
motor_values = np.array(motor_values)
|
||||
motor_values = np.clip(motor_values, 0, 100)
|
||||
|
||||
robot.hand_bus.write("Goal_Position", motor_values, motor_names)
|
||||
time.sleep(0.02)
|
||||
|
||||
while True:
|
||||
# print(glove.read()['index_2']-1500)
|
||||
glove_index_flexor = glove.read()["index_2"] - 1500
|
||||
glove_index_subflexor = glove.read()["index_1"] - 1500
|
||||
glove_index_side = glove.read()["index_0"] - 2100
|
||||
|
||||
vals = [glove_index_flexor, 1000 - (glove_index_subflexor), glove_index_subflexor]
|
||||
|
||||
keys = ["index_flexor", "index_pinky_side", "index_thumb_side"]
|
||||
|
||||
glove_middle_flexor = glove.read()["middle_2"] - 1500
|
||||
glove_middle_subflexor = 1000 - (glove.read()["middle_1"] - 1700)
|
||||
vals += [glove_middle_flexor, glove_middle_subflexor, glove_middle_subflexor - 200]
|
||||
keys += ["middle_flexor", "middle_pinky_side", "middle_thumb_side"]
|
||||
|
||||
glove_ring_flexor = glove.read()["ring_2"] - 1300
|
||||
print(glove_ring_flexor)
|
||||
glove_ring_subflexor = glove.read()["ring_1"] - 1100
|
||||
|
||||
vals += [glove_ring_flexor, 1000 - glove_ring_subflexor, glove_ring_subflexor]
|
||||
keys += ["ring_flexor", "ring_pinky_side", "ring_thumb_side"]
|
||||
|
||||
glove_pinky_flexor = glove.read()["pinky_2"] - 1500
|
||||
glove_pinky_subflexor = glove.read()["pinky_1"] - 1300
|
||||
vals += [300 + glove_pinky_flexor, max(1000 - glove_pinky_subflexor - 100, 0), glove_pinky_subflexor]
|
||||
keys += ["pinky_flexor", "pinky_pinky_side", "pinky_thumb_side"]
|
||||
|
||||
robot.hand_bus.write("Goal_Position", vals, keys)
|
||||
time.sleep(0.1)
|
||||
|
||||
time.sleep(3)
|
||||
|
||||
def move_arm(loop=10):
|
||||
sleep = 1
|
||||
for i in range(loop):
|
||||
robot.arm_bus.write("Goal_Position", [1981, 2030, 2069, 2032, 1874, 1957, 1695])
|
||||
time.sleep(sleep)
|
||||
robot.arm_bus.write("Goal_Position", [1981, 2030, 2069, 2032, 1874, 1957, 1195])
|
||||
time.sleep(sleep)
|
||||
robot.arm_bus.write("Goal_Position", [1981, 2030, 2069, 2032, 1874, 1957, 2195])
|
||||
time.sleep(sleep)
|
||||
robot.arm_bus.write("Goal_Position", [1981, 2030, 2069, 2032, 1874, 1957, 1695])
|
||||
time.sleep(sleep)
|
||||
robot.arm_bus.write("Goal_Position", [1981, 2030, 2069, 2032, 1874, 1457, 1695])
|
||||
time.sleep(sleep)
|
||||
robot.arm_bus.write("Goal_Position", [1981, 2030, 2069, 2032, 1874, 2357, 1695])
|
||||
time.sleep(sleep)
|
||||
robot.arm_bus.write("Goal_Position", [1981, 2030, 2069, 2032, 1874, 1957, 1695])
|
||||
time.sleep(sleep)
|
||||
robot.arm_bus.write("Goal_Position", [1981, 2030, 2069, 2032, 974, 1957, 1695])
|
||||
time.sleep(sleep)
|
||||
robot.arm_bus.write("Goal_Position", [1981, 2030, 2069, 2032, 2674, 1957, 1695])
|
||||
time.sleep(sleep + 2)
|
||||
robot.arm_bus.write("Goal_Position", [1981, 2030, 2069, 2032, 1874, 1957, 1695])
|
||||
time.sleep(sleep)
|
||||
robot.arm_bus.write("Goal_Position", [1981, 2030, 2069, 1632, 1874, 1957, 1695])
|
||||
time.sleep(sleep)
|
||||
robot.arm_bus.write("Goal_Position", [1981, 2030, 1369, 1632, 1874, 1957, 1695])
|
||||
time.sleep(sleep)
|
||||
robot.arm_bus.write("Goal_Position", [1981, 2030, 2069, 2032, 1874, 1957, 1695])
|
||||
time.sleep(sleep)
|
||||
robot.arm_bus.write("Goal_Position", [1981, 1330, 2069, 2032, 1874, 1957, 1695])
|
||||
time.sleep(sleep)
|
||||
robot.arm_bus.write("Goal_Position", [1981, 2030, 2069, 2032, 1874, 1957, 1695])
|
||||
time.sleep(sleep)
|
||||
robot.arm_bus.write("Goal_Position", [2381, 2030, 2069, 2032, 1874, 1957, 1695])
|
||||
time.sleep(sleep)
|
||||
robot.arm_bus.write("Goal_Position", [1681, 2030, 2069, 2032, 1874, 1957, 1695])
|
||||
time.sleep(sleep)
|
||||
robot.arm_bus.write("Goal_Position", [1981, 2030, 2069, 2032, 1874, 1957, 1695])
|
||||
time.sleep(sleep)
|
||||
|
||||
def move_hand(loop=10):
|
||||
sleep = 0.5
|
||||
for i in range(loop):
|
||||
robot.hand_bus.write(
|
||||
"Goal_Position",
|
||||
[500, 1000, 0, 1000],
|
||||
["thumb_basel_rotation", "thumb_flexor", "thumb_pinky_side", "thumb_thumb_side"],
|
||||
)
|
||||
time.sleep(sleep)
|
||||
robot.hand_bus.write(
|
||||
"Goal_Position", [100, 100, 100], ["index_flexor", "index_pinky_side", "index_thumb_side"]
|
||||
)
|
||||
time.sleep(sleep)
|
||||
robot.hand_bus.write(
|
||||
"Goal_Position", [100, 1000, 150], ["middle_flexor", "middle_pinky_side", "middle_thumb_side"]
|
||||
)
|
||||
time.sleep(sleep)
|
||||
robot.hand_bus.write(
|
||||
"Goal_Position", [200, 200, 0], ["ring_flexor", "ring_pinky_side", "ring_thumb_side"]
|
||||
)
|
||||
time.sleep(sleep)
|
||||
robot.hand_bus.write(
|
||||
"Goal_Position", [200, 100, 700], ["pinky_flexor", "pinky_pinky_side", "pinky_thumb_side"]
|
||||
)
|
||||
time.sleep(sleep)
|
||||
|
||||
robot.hand_bus.write(
|
||||
"Goal_Position",
|
||||
[500, 1000 - 250, 0 + 300, 1000 - 200],
|
||||
["thumb_basel_rotation", "thumb_flexor", "thumb_pinky_side", "thumb_thumb_side"],
|
||||
)
|
||||
time.sleep(sleep)
|
||||
robot.hand_bus.write(
|
||||
"Goal_Position",
|
||||
[100 + 450, 100 + 400, 100 + 400],
|
||||
["index_flexor", "index_pinky_side", "index_thumb_side"],
|
||||
)
|
||||
time.sleep(sleep)
|
||||
robot.hand_bus.write(
|
||||
"Goal_Position",
|
||||
[100 + 350, 1000 - 450, 150 + 450],
|
||||
["middle_flexor", "middle_pinky_side", "middle_thumb_side"],
|
||||
)
|
||||
time.sleep(sleep)
|
||||
robot.hand_bus.write(
|
||||
"Goal_Position",
|
||||
[200 + 650, 200 + 350, 0 + 350],
|
||||
["ring_flexor", "ring_pinky_side", "ring_thumb_side"],
|
||||
)
|
||||
time.sleep(sleep)
|
||||
robot.hand_bus.write(
|
||||
"Goal_Position",
|
||||
[200 + 450, 100 + 400, 700 - 400],
|
||||
["pinky_flexor", "pinky_pinky_side", "pinky_thumb_side"],
|
||||
)
|
||||
time.sleep(sleep)
|
||||
|
||||
move_hand(3)
|
||||
|
||||
move_arm(1)
|
||||
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
|
||||
with ThreadPoolExecutor() as executor:
|
||||
executor.submit(move_arm)
|
||||
executor.submit(move_hand)
|
||||
|
||||
# initial position
|
||||
for i in range(3):
|
||||
robot.hand_bus.write(
|
||||
"Goal_Position", [500, 1000, 0, 1000, 100, 950, 100, 100, 1000, 150, 200, 200, 0, 200, 100, 700]
|
||||
)
|
||||
time.sleep(1)
|
||||
|
||||
# for i in range(3):
|
||||
# robot.hand_bus.write("Goal_Position", [500, 1000-150, 0+250, 1000-150,
|
||||
# 100+300, 950-250, 100+250,
|
||||
# 100+200, 1000-300, 150+300,
|
||||
# 200+500, 200+200, 0+200,
|
||||
# 200+300, 100+200, 700-200])
|
||||
# time.sleep(1)
|
||||
|
||||
# camera = 0
|
||||
# score_threshold = 0.95
|
||||
# iou_threshold = 0.3
|
||||
|
||||
# app = MediaPipeHandApp(MediaPipeHand.from_pretrained(), score_threshold, iou_threshold)
|
||||
|
||||
# def frame_processor(frame: np.ndarray) -> np.ndarray:
|
||||
# # Input Prep
|
||||
# NHWC_int_numpy_frames, NCHW_fp32_torch_frames = app_to_net_image_inputs(frame)
|
||||
|
||||
# # Run Bounding Box & Keypoint Detector
|
||||
# batched_selected_boxes, batched_selected_keypoints = app._run_box_detector(NCHW_fp32_torch_frames)
|
||||
|
||||
# # The region of interest ( bounding box of 4 (x, y) corners).
|
||||
# # list[torch.Tensor(shape=[Num Boxes, 4, 2])],
|
||||
# # where 2 == (x, y)
|
||||
# #
|
||||
# # A list element will be None if there is no selected ROI.
|
||||
# batched_roi_4corners = app._compute_object_roi(batched_selected_boxes, batched_selected_keypoints)
|
||||
|
||||
# # selected landmarks for the ROI (if any)
|
||||
# # list[torch.Tensor(shape=[Num Selected Landmarks, K, 3])],
|
||||
# # where K == number of landmark keypoints, 3 == (x, y, confidence)
|
||||
# #
|
||||
# # A list element will be None if there is no ROI.
|
||||
# landmarks_out = app._run_landmark_detector(NHWC_int_numpy_frames, batched_roi_4corners)
|
||||
|
||||
# app._draw_predictions(
|
||||
# NHWC_int_numpy_frames,
|
||||
# batched_selected_boxes,
|
||||
# batched_selected_keypoints,
|
||||
# batched_roi_4corners,
|
||||
# *landmarks_out,
|
||||
# )
|
||||
|
||||
# return NHWC_int_numpy_frames[0]
|
||||
|
||||
# capture_and_display_processed_frames(frame_processor, "QAIHM Mediapipe Hand Demo", camera)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -37,7 +37,7 @@ HALF_TURN_DEGREE = 180
|
||||
# See this link for STS3215 Memory Table:
|
||||
# https://docs.google.com/spreadsheets/d/1GVs7W1VS1PqdhA1nW-abeyAHhTUxKUdR/edit?usp=sharing&ouid=116566590112741600240&rtpof=true&sd=true
|
||||
# data_name: (address, size_byte)
|
||||
SCS_SERIES_CONTROL_TABLE = {
|
||||
STS_SERIES_CONTROL_TABLE = {
|
||||
"Model": (3, 2),
|
||||
"ID": (5, 1),
|
||||
"Baud_Rate": (6, 1),
|
||||
@@ -87,6 +87,70 @@ SCS_SERIES_CONTROL_TABLE = {
|
||||
"Maximum_Acceleration": (85, 2),
|
||||
}
|
||||
|
||||
SCS_SERIES_CONTROL_TABLE = {
|
||||
"Model": (3, 2),
|
||||
"ID": (5, 1),
|
||||
"Baud_Rate": (6, 1),
|
||||
"Return_Delay": (7, 1),
|
||||
"Response_Status_Level": (8, 1),
|
||||
"Min_Angle_Limit": (9, 2),
|
||||
"Max_Angle_Limit": (11, 2),
|
||||
"Max_Temperature_Limit": (13, 1),
|
||||
"Max_Voltage_Limit": (14, 1),
|
||||
"Min_Voltage_Limit": (15, 1),
|
||||
"Max_Torque_Limit": (16, 2),
|
||||
"Phase": (18, 1),
|
||||
"Unloading_Condition": (19, 1),
|
||||
"LED_Alarm_Condition": (20, 1),
|
||||
"P_Coefficient": (21, 1),
|
||||
"D_Coefficient": (22, 1),
|
||||
"I_Coefficient": (23, 1),
|
||||
"Minimum_Startup_Force": (24, 2),
|
||||
"CW_Dead_Zone": (26, 1),
|
||||
"CCW_Dead_Zone": (27, 1),
|
||||
# "Protection_Current": (28, 2),
|
||||
# "Angular_Resolution": (30, 1),
|
||||
# "Offset": (31, 2),
|
||||
# "Mode": (33, 1),
|
||||
"Protective_Torque": (37, 1),
|
||||
"Protection_Time": (38, 1),
|
||||
"Torque_Enable": (40, 1),
|
||||
"Acceleration": (41, 1),
|
||||
"Goal_Position": (42, 2),
|
||||
"Running_Time": (44, 2),
|
||||
"Goal_Speed": (46, 2),
|
||||
"Lock": (48, 1),
|
||||
"Present_Position": (56, 2),
|
||||
"Present_Speed": (58, 2),
|
||||
"Present_Load": (60, 2),
|
||||
"Present_Voltage": (62, 1),
|
||||
"Present_Temperature": (63, 1),
|
||||
"Sync_Write_Flag": (64, 1),
|
||||
"Status": (65, 1),
|
||||
"Moving": (66, 1),
|
||||
# "Overload_Torque": (36, 1),
|
||||
# "Speed_closed_loop_P_proportional_coefficient": (37, 1),
|
||||
# "Over_Current_Protection_Time": (38, 1),
|
||||
# "Velocity_closed_loop_I_integral_coefficient": (39, 1),
|
||||
# "Acceleration": (41, 1),
|
||||
# "Goal_Time": (44, 2),
|
||||
# "Torque_Limit": (48, 2),
|
||||
# "Present_Current": (69, 2),
|
||||
# # Not in the Memory Table
|
||||
# "Maximum_Acceleration": (85, 2),
|
||||
}
|
||||
|
||||
STS_SERIES_BAUDRATE_TABLE = {
|
||||
0: 1_000_000,
|
||||
1: 500_000,
|
||||
2: 250_000,
|
||||
3: 128_000,
|
||||
4: 115_200,
|
||||
5: 57_600,
|
||||
6: 38_400,
|
||||
7: 19_200,
|
||||
}
|
||||
|
||||
SCS_SERIES_BAUDRATE_TABLE = {
|
||||
0: 1_000_000,
|
||||
1: 500_000,
|
||||
@@ -103,22 +167,31 @@ CONVERT_UINT32_TO_INT32_REQUIRED = ["Goal_Position", "Present_Position"]
|
||||
|
||||
|
||||
MODEL_CONTROL_TABLE = {
|
||||
"sts_series": STS_SERIES_CONTROL_TABLE,
|
||||
"scs_series": SCS_SERIES_CONTROL_TABLE,
|
||||
"sts3215": SCS_SERIES_CONTROL_TABLE,
|
||||
"sts3215": STS_SERIES_CONTROL_TABLE,
|
||||
"sts3250": STS_SERIES_CONTROL_TABLE,
|
||||
"scs0009": SCS_SERIES_CONTROL_TABLE,
|
||||
}
|
||||
|
||||
MODEL_RESOLUTION = {
|
||||
"scs_series": 4096,
|
||||
"sts_series": 4096,
|
||||
"scs_series": 1024,
|
||||
"sts3215": 4096,
|
||||
"sts3250": 4096,
|
||||
"scs0009": 1024,
|
||||
}
|
||||
|
||||
MODEL_BAUDRATE_TABLE = {
|
||||
"sts_series": STS_SERIES_BAUDRATE_TABLE,
|
||||
"scs_series": SCS_SERIES_BAUDRATE_TABLE,
|
||||
"sts3215": SCS_SERIES_BAUDRATE_TABLE,
|
||||
"sts3215": STS_SERIES_BAUDRATE_TABLE,
|
||||
"sts3250": STS_SERIES_BAUDRATE_TABLE,
|
||||
"scs0009": SCS_SERIES_BAUDRATE_TABLE,
|
||||
}
|
||||
|
||||
# High number of retries is needed for feetech compared to dynamixel motors.
|
||||
NUM_READ_RETRY = 20
|
||||
NUM_READ_RETRY = 50
|
||||
NUM_WRITE_RETRY = 20
|
||||
|
||||
|
||||
@@ -273,12 +346,18 @@ class FeetechMotorsBus:
|
||||
self,
|
||||
port: str,
|
||||
motors: dict[str, tuple[int, str]],
|
||||
group_sync_read: bool = True,
|
||||
group_sync_write: bool = True,
|
||||
protocol_version: int = 0,
|
||||
extra_model_control_table: dict[str, list[tuple]] | None = None,
|
||||
extra_model_resolution: dict[str, int] | None = None,
|
||||
mock=False,
|
||||
):
|
||||
self.port = port
|
||||
self.motors = motors
|
||||
self.group_sync_read = group_sync_read
|
||||
self.group_sync_write = group_sync_write
|
||||
self.protocol_version = protocol_version
|
||||
self.mock = mock
|
||||
|
||||
self.model_ctrl_table = deepcopy(MODEL_CONTROL_TABLE)
|
||||
@@ -311,7 +390,7 @@ class FeetechMotorsBus:
|
||||
import scservo_sdk as scs
|
||||
|
||||
self.port_handler = scs.PortHandler(self.port)
|
||||
self.packet_handler = scs.PacketHandler(PROTOCOL_VERSION)
|
||||
self.packet_handler = scs.PacketHandler(self.protocol_version)
|
||||
|
||||
try:
|
||||
if not self.port_handler.openPort():
|
||||
@@ -335,7 +414,7 @@ class FeetechMotorsBus:
|
||||
import scservo_sdk as scs
|
||||
|
||||
self.port_handler = scs.PortHandler(self.port)
|
||||
self.packet_handler = scs.PacketHandler(PROTOCOL_VERSION)
|
||||
self.packet_handler = scs.PacketHandler(self.protocol_version)
|
||||
|
||||
if not self.port_handler.openPort():
|
||||
raise OSError(f"Failed to open port '{self.port}'.")
|
||||
@@ -661,6 +740,8 @@ class FeetechMotorsBus:
|
||||
else:
|
||||
import scservo_sdk as scs
|
||||
|
||||
scs.SCS_SETEND(self.protocol_version)
|
||||
|
||||
return_list = True
|
||||
if not isinstance(motor_ids, list):
|
||||
return_list = False
|
||||
@@ -699,6 +780,8 @@ class FeetechMotorsBus:
|
||||
else:
|
||||
import scservo_sdk as scs
|
||||
|
||||
scs.SCS_SETEND(self.protocol_version)
|
||||
|
||||
if not self.is_connected:
|
||||
raise RobotDeviceNotConnectedError(
|
||||
f"FeetechMotorsBus({self.port}) is not connected. You need to run `motors_bus.connect()`."
|
||||
@@ -721,31 +804,51 @@ class FeetechMotorsBus:
|
||||
|
||||
assert_same_address(self.model_ctrl_table, models, data_name)
|
||||
addr, bytes = self.model_ctrl_table[model][data_name]
|
||||
group_key = get_group_sync_key(data_name, motor_names)
|
||||
|
||||
if data_name not in self.group_readers:
|
||||
# create new group reader
|
||||
self.group_readers[group_key] = scs.GroupSyncRead(
|
||||
self.port_handler, self.packet_handler, addr, bytes
|
||||
)
|
||||
if self.group_sync_read:
|
||||
group_key = get_group_sync_key(data_name, motor_names)
|
||||
|
||||
if data_name not in self.group_readers:
|
||||
# create new group reader
|
||||
self.group_readers[group_key] = scs.GroupSyncRead(
|
||||
self.port_handler, self.packet_handler, addr, bytes
|
||||
)
|
||||
for idx in motor_ids:
|
||||
self.group_readers[group_key].addParam(idx)
|
||||
|
||||
for _ in range(NUM_READ_RETRY):
|
||||
comm = self.group_readers[group_key].txRxPacket()
|
||||
if comm == scs.COMM_SUCCESS:
|
||||
break
|
||||
|
||||
if comm != scs.COMM_SUCCESS:
|
||||
raise ConnectionError(
|
||||
f"Read failed due to communication error on port {self.port} for group_key {group_key}: "
|
||||
f"{self.packet_handler.getTxRxResult(comm)}"
|
||||
)
|
||||
|
||||
values = []
|
||||
for idx in motor_ids:
|
||||
self.group_readers[group_key].addParam(idx)
|
||||
value = self.group_readers[group_key].getData(idx, addr, bytes)
|
||||
values.append(value)
|
||||
else:
|
||||
values = []
|
||||
for idx in motor_ids:
|
||||
if bytes == 1:
|
||||
value, comm, error = self.packet_handler.read1ByteTxRx(self.port_handler, idx, addr)
|
||||
elif bytes == 2:
|
||||
value, comm, error = self.packet_handler.read2ByteTxRx(self.port_handler, idx, addr)
|
||||
elif bytes == 4:
|
||||
value, comm, error = self.packet_handler.read4ByteTxRx(self.port_handler, idx, addr)
|
||||
else:
|
||||
raise ValueError(bytes)
|
||||
|
||||
for _ in range(NUM_READ_RETRY):
|
||||
comm = self.group_readers[group_key].txRxPacket()
|
||||
if comm == scs.COMM_SUCCESS:
|
||||
break
|
||||
if comm != scs.COMM_SUCCESS:
|
||||
raise ConnectionError(self.packet_handler.getTxRxResult(comm))
|
||||
elif error != 0:
|
||||
raise ConnectionError(self.packet_handler.getRxPacketError(error))
|
||||
|
||||
if comm != scs.COMM_SUCCESS:
|
||||
raise ConnectionError(
|
||||
f"Read failed due to communication error on port {self.port} for group_key {group_key}: "
|
||||
f"{self.packet_handler.getTxRxResult(comm)}"
|
||||
)
|
||||
|
||||
values = []
|
||||
for idx in motor_ids:
|
||||
value = self.group_readers[group_key].getData(idx, addr, bytes)
|
||||
values.append(value)
|
||||
values.append(value)
|
||||
|
||||
values = np.array(values)
|
||||
|
||||
@@ -775,6 +878,8 @@ class FeetechMotorsBus:
|
||||
else:
|
||||
import scservo_sdk as scs
|
||||
|
||||
scs.SCS_SETEND(self.protocol_version)
|
||||
|
||||
if not isinstance(motor_ids, list):
|
||||
motor_ids = [motor_ids]
|
||||
if not isinstance(values, list):
|
||||
@@ -811,6 +916,8 @@ class FeetechMotorsBus:
|
||||
else:
|
||||
import scservo_sdk as scs
|
||||
|
||||
scs.SCS_SETEND(self.protocol_version)
|
||||
|
||||
if motor_names is None:
|
||||
motor_names = self.motor_names
|
||||
|
||||
@@ -836,27 +943,31 @@ class FeetechMotorsBus:
|
||||
|
||||
assert_same_address(self.model_ctrl_table, models, data_name)
|
||||
addr, bytes = self.model_ctrl_table[model][data_name]
|
||||
group_key = get_group_sync_key(data_name, motor_names)
|
||||
|
||||
init_group = data_name not in self.group_readers
|
||||
if init_group:
|
||||
self.group_writers[group_key] = scs.GroupSyncWrite(
|
||||
self.port_handler, self.packet_handler, addr, bytes
|
||||
)
|
||||
if self.group_sync_write:
|
||||
group_key = get_group_sync_key(data_name, motor_names)
|
||||
|
||||
for idx, value in zip(motor_ids, values, strict=True):
|
||||
data = convert_to_bytes(value, bytes, self.mock)
|
||||
init_group = data_name not in self.group_readers
|
||||
if init_group:
|
||||
self.group_writers[group_key].addParam(idx, data)
|
||||
else:
|
||||
self.group_writers[group_key].changeParam(idx, data)
|
||||
self.group_writers[group_key] = scs.GroupSyncWrite(
|
||||
self.port_handler, self.packet_handler, addr, bytes
|
||||
)
|
||||
|
||||
comm = self.group_writers[group_key].txPacket()
|
||||
if comm != scs.COMM_SUCCESS:
|
||||
raise ConnectionError(
|
||||
f"Write failed due to communication error on port {self.port} for group_key {group_key}: "
|
||||
f"{self.packet_handler.getTxRxResult(comm)}"
|
||||
)
|
||||
for idx, value in zip(motor_ids, values, strict=True):
|
||||
data = convert_to_bytes(value, bytes, self.mock)
|
||||
if init_group:
|
||||
self.group_writers[group_key].addParam(idx, data)
|
||||
else:
|
||||
self.group_writers[group_key].changeParam(idx, data)
|
||||
|
||||
comm = self.group_writers[group_key].txPacket()
|
||||
if comm != scs.COMM_SUCCESS:
|
||||
raise ConnectionError(
|
||||
f"Write failed due to communication error on port {self.port} for group_key {group_key}: "
|
||||
f"{self.packet_handler.getTxRxResult(comm)}"
|
||||
)
|
||||
else:
|
||||
raise NotImplementedError()
|
||||
|
||||
# log the number of seconds it took to write the data to the motors
|
||||
delta_ts_name = get_log_name("delta_timestamp_s", "write", data_name, motor_names)
|
||||
|
||||
@@ -306,16 +306,16 @@ def run_arm_auto_calibration_moss(arm: MotorsBus, robot_type: str, arm_name: str
|
||||
calib = {}
|
||||
|
||||
print("Calibrate shoulder_pan")
|
||||
calib["shoulder_pan"] = move_to_calibrate(arm, "shoulder_pan", load_threshold=350, count_threshold=200)
|
||||
calib["shoulder_pan"] = move_to_calibrate(arm, "shoulder_pan")
|
||||
arm.write("Goal_Position", calib["shoulder_pan"]["zero_pos"], "shoulder_pan")
|
||||
time.sleep(1)
|
||||
|
||||
print("Calibrate gripper")
|
||||
calib["gripper"] = move_to_calibrate(arm, "gripper", invert_drive_mode=True, count_threshold=200)
|
||||
calib["gripper"] = move_to_calibrate(arm, "gripper", invert_drive_mode=True)
|
||||
time.sleep(1)
|
||||
|
||||
print("Calibrate wrist_flex")
|
||||
calib["wrist_flex"] = move_to_calibrate(arm, "wrist_flex", invert_drive_mode=True, count_threshold=200)
|
||||
calib["wrist_flex"] = move_to_calibrate(arm, "wrist_flex", invert_drive_mode=True)
|
||||
calib["wrist_flex"] = apply_offset(calib["wrist_flex"], offset=-210 + 1024)
|
||||
|
||||
wr_pos = arm.read("Present_Position", "wrist_roll")
|
||||
@@ -329,7 +329,7 @@ def run_arm_auto_calibration_moss(arm: MotorsBus, robot_type: str, arm_name: str
|
||||
time.sleep(1)
|
||||
|
||||
print("Calibrate wrist_roll")
|
||||
calib["wrist_roll"] = move_to_calibrate(arm, "wrist_roll", invert_drive_mode=True, count_threshold=200)
|
||||
calib["wrist_roll"] = move_to_calibrate(arm, "wrist_roll", invert_drive_mode=True)
|
||||
calib["wrist_roll"] = apply_offset(calib["wrist_roll"], offset=790)
|
||||
|
||||
arm.write("Goal_Position", calib["wrist_roll"]["zero_pos"] - 1024, "wrist_roll")
|
||||
@@ -348,7 +348,6 @@ def run_arm_auto_calibration_moss(arm: MotorsBus, robot_type: str, arm_name: str
|
||||
arm,
|
||||
"elbow_flex",
|
||||
invert_drive_mode=True,
|
||||
count_threshold=200,
|
||||
in_between_move_hook=in_between_move_elbow_flex_hook,
|
||||
)
|
||||
arm.write("Goal_Position", calib["wrist_flex"]["zero_pos"] - 1024, "wrist_flex")
|
||||
|
||||
73
lerobot/configs/robot/hopejr.yaml
Normal file
73
lerobot/configs/robot/hopejr.yaml
Normal file
@@ -0,0 +1,73 @@
|
||||
# [SO-100 robot arm](https://github.com/TheRobotStudio/SO-ARM100)
|
||||
|
||||
# Requires installing extras packages
|
||||
# With pip: `pip install -e ".[feetech]"`
|
||||
# With poetry: `poetry install --sync --extras "feetech"`
|
||||
|
||||
# See [tutorial](https://github.com/huggingface/lerobot/blob/main/examples/10_use_so100.md)
|
||||
|
||||
_target_: lerobot.common.robot_devices.robots.manipulator.ManipulatorRobot
|
||||
robot_type: hopejr
|
||||
calibration_dir: .cache/calibration/hopejr
|
||||
|
||||
# `max_relative_target` limits the magnitude of the relative positional target vector for safety purposes.
|
||||
# Set this to a positive scalar to have the same value for all motors, or a list that is the same length as
|
||||
# the number of motors in your follower arms.
|
||||
max_relative_target: null
|
||||
|
||||
# leader_arms:
|
||||
# main:
|
||||
# _target_: lerobot.common.robot_devices.motors.feetech.FeetechMotorsBus
|
||||
# port: /dev/tty.usbmodem585A0077581
|
||||
# motors:
|
||||
# # name: (index, model)
|
||||
# shoulder_pan: [1, "sts3215"]
|
||||
# shoulder_lift: [2, "sts3215"]
|
||||
# elbow_flex: [3, "sts3215"]
|
||||
# wrist_flex: [4, "sts3215"]
|
||||
# wrist_roll: [5, "sts3215"]
|
||||
# gripper: [6, "sts3215"]
|
||||
|
||||
follower_arms:
|
||||
main:
|
||||
_target_: lerobot.common.robot_devices.motors.feetech.FeetechMotorsBus
|
||||
port: /dev/tty.usbserial-2130
|
||||
motors:
|
||||
# name: (index, model)
|
||||
shoulder_pitch: [1, "sts3250"]
|
||||
shoulder_yaw: [2, "sts3215"] # TODO: sts3250
|
||||
shoulder_roll: [3, "sts3215"] # TODO: sts3250
|
||||
elbow_flex: [4, "sts3250"]
|
||||
wrist_roll: [5, "sts3215"]
|
||||
wrist_yaw: [6, "sts3215"]
|
||||
wrist_pitch: [7, "sts3215"]
|
||||
thumb_basel_rotation: [30, "scs0009"]
|
||||
thumb_flexion: [27, "scs0009"]
|
||||
thumb_pinky_side: [26, "scs0009"]
|
||||
thumb_thumb_side: [28, "scs0009"]
|
||||
index_flexor: [25, "scs0009"]
|
||||
index_pinky_side: [31, "scs0009"]
|
||||
index_thumb_side: [32, "scs0009"]
|
||||
middle_flexor: [24, "scs0009"]
|
||||
middle_pinky_side: [33, "scs0009"]
|
||||
middle_thumb_side: [34, "scs0009"]
|
||||
ring_flexor: [21, "scs0009"]
|
||||
ring_pinky_side: [36, "scs0009"]
|
||||
ring_thumb_side: [35, "scs0009"]
|
||||
pinky_flexor: [23, "scs0009"]
|
||||
pinky_pinky_side: [38, "scs0009"]
|
||||
pinky_thumb_side: [37, "scs0009"]
|
||||
|
||||
cameras:
|
||||
laptop:
|
||||
_target_: lerobot.common.robot_devices.cameras.opencv.OpenCVCamera
|
||||
camera_index: 0
|
||||
fps: 30
|
||||
width: 640
|
||||
height: 480
|
||||
phone:
|
||||
_target_: lerobot.common.robot_devices.cameras.opencv.OpenCVCamera
|
||||
camera_index: 1
|
||||
fps: 30
|
||||
width: 640
|
||||
height: 480
|
||||
@@ -18,6 +18,10 @@ def convert_to_bytes(value, bytes):
|
||||
return value
|
||||
|
||||
|
||||
def SCS_SETEND(protocol_version):
|
||||
del protocol_version
|
||||
|
||||
|
||||
def get_default_motor_values(motor_index):
|
||||
return {
|
||||
# Key (int) are from SCS_SERIES_CONTROL_TABLE
|
||||
|
||||
Reference in New Issue
Block a user