Compare commits

...

4 Commits

Author SHA1 Message Date
Remi Cadene
f996a13f70 Add test3 test4 2024-12-29 13:30:17 +01:00
Remi Cadene
743ebfa7c1 Cremaillaire HF 2024-12-24 11:33:55 +01:00
Remi Cadene
2c45660d77 WIP 2024-11-27 13:59:37 +01:00
Remi Cadene
9dd4414c6e fix autocalib moss 2024-10-26 10:43:52 +02:00
8 changed files with 1789 additions and 50 deletions

681
examples/test.py Normal file
View File

@@ -0,0 +1,681 @@
import threading
import time
from typing import Callable
import cv2
import numpy as np
# from qai_hub_models.models.mediapipe_hand.app import MediaPipeHandApp
# from qai_hub_models.models.mediapipe_hand.model import (
# MediaPipeHand,
# )
# from qai_hub_models.utils.image_processing import (
# app_to_net_image_inputs,
# )
from lerobot.common.robot_devices.motors.feetech import (
CalibrationMode,
FeetechMotorsBus,
)
LOWER_BOUND_LINEAR = -100
UPPER_BOUND_LINEAR = 200
import serial
class HomonculusGlove:
def __init__(self):
self.serial_port = "/dev/tty.usbmodem21401"
self.baud_rate = 115200
self.serial = serial.Serial(self.serial_port, self.baud_rate, timeout=1)
self.thread = threading.Thread(target=self.async_read)
self.thread.start()
self.last_d = {
"thumb_0": 100,
"thumb_1": 100,
"thumb_2": 100,
"thumb_3": 100,
"index_0": 100,
"index_1": 100,
"index_2": 100,
"middle_0": 100,
"middle_1": 100,
"middle_2": 100,
"ring_0": 100,
"ring_1": 100,
"ring_2": 100,
"pinky_0": 100,
"pinky_1": 100,
"pinky_2": 100,
"battery_voltage": 100,
}
self.calibration = None
@property
def joint_names(self):
return list(self.last_d.keys())
def read(self, motor_names: list[str] | None = None):
if motor_names is None:
motor_names = self.joint_names
values = np.array([self.last_d[k] for k in motor_names])
print(motor_names)
print(values)
if self.calibration is not None:
values = self.apply_calibration(values, motor_names)
print(values)
return values
def async_read(self):
while True:
if self.serial.in_waiting > 0:
self.serial.flush()
vals = self.serial.readline().decode("utf-8").strip()
vals = vals.split(" ")
if len(vals) != 17:
continue
vals = [int(val) for val in vals]
d = {
"thumb_0": vals[0],
"thumb_1": vals[1],
"thumb_2": vals[2],
"thumb_3": vals[3],
"index_0": vals[4],
"index_1": vals[5],
"index_2": vals[6],
"middle_0": vals[7],
"middle_1": vals[8],
"middle_2": vals[9],
"ring_0": vals[10],
"ring_1": vals[11],
"ring_2": vals[12],
"pinky_0": vals[13],
"pinky_1": vals[14],
"pinky_2": vals[15],
"battery_voltage": vals[16],
}
self.last_d = d
# print(d.values())
def run_calibration(self):
print("\nMove arm to open position")
input("Press Enter to continue...")
open_pos_list = []
for _ in range(300):
open_pos = self.read()
open_pos_list.append(open_pos)
time.sleep(0.01)
open_pos = np.array(open_pos_list)
max_open_pos = open_pos.max(axis=0)
min_open_pos = open_pos.min(axis=0)
print(f"{max_open_pos=}")
print(f"{min_open_pos=}")
print("\nMove arm to closed position")
input("Press Enter to continue...")
closed_pos_list = []
for _ in range(300):
closed_pos = self.read()
closed_pos_list.append(closed_pos)
time.sleep(0.01)
closed_pos = np.array(closed_pos_list)
max_closed_pos = closed_pos.max(axis=0)
closed_pos[closed_pos < 1000] = 60000
min_closed_pos = closed_pos.min(axis=0)
print(f"{max_closed_pos=}")
print(f"{min_closed_pos=}")
open_pos = np.array([max_open_pos, max_closed_pos]).max(axis=0)
closed_pos = np.array([min_open_pos, min_closed_pos]).min(axis=0)
# INVERTION
# INVERTION
# INVERTION
# INVERTION
# INVERTION
# INVERTION
# INVERTION
for i, jname in enumerate(self.joint_names):
if jname in ["thumb_0", "thumb_3", "index_2", "middle_2", "ring_2", "pinky_0", "pinky_2"]:
tmp_pos = open_pos[i]
open_pos[i] = closed_pos[i]
closed_pos[i] = tmp_pos
print()
print(f"{open_pos=}")
print(f"{closed_pos=}")
homing_offset = [0] * len(self.joint_names)
drive_mode = [0] * len(self.joint_names)
calib_modes = [CalibrationMode.LINEAR.name] * len(self.joint_names)
calib_dict = {
"homing_offset": homing_offset,
"drive_mode": drive_mode,
"start_pos": open_pos,
"end_pos": closed_pos,
"calib_mode": calib_modes,
"motor_names": self.joint_names,
}
# return calib_dict
self.set_calibration(calib_dict)
def set_calibration(self, calibration: dict[str, list]):
self.calibration = calibration
def apply_calibration(self, values: np.ndarray | list, motor_names: list[str] | None):
"""Convert from unsigned int32 joint position range [0, 2**32[ to the universal float32 nominal degree range ]-180.0, 180.0[ with
a "zero position" at 0 degree.
Note: We say "nominal degree range" since the motors can take values outside this range. For instance, 190 degrees, if the motor
rotate more than a half a turn from the zero position. However, most motors can't rotate more than 180 degrees and will stay in this range.
Joints values are original in [0, 2**32[ (unsigned int32). Each motor are expected to complete a full rotation
when given a goal position that is + or - their resolution. For instance, feetech xl330-m077 have a resolution of 4096, and
at any position in their original range, let's say the position 56734, they complete a full rotation clockwise by moving to 60830,
or anticlockwise by moving to 52638. The position in the original range is arbitrary and might change a lot between each motor.
To harmonize between motors of the same model, different robots, or even models of different brands, we propose to work
in the centered nominal degree range ]-180, 180[.
"""
if motor_names is None:
motor_names = self.motor_names
# Convert from unsigned int32 original range [0, 2**32] to signed float32 range
values = values.astype(np.float32)
for i, name in enumerate(motor_names):
calib_idx = self.calibration["motor_names"].index(name)
calib_mode = self.calibration["calib_mode"][calib_idx]
if CalibrationMode[calib_mode] == CalibrationMode.LINEAR:
start_pos = self.calibration["start_pos"][calib_idx]
end_pos = self.calibration["end_pos"][calib_idx]
# Rescale the present position to a nominal range [0, 100] %,
# useful for joints with linear motions like Aloha gripper
values[i] = (values[i] - start_pos) / (end_pos - start_pos) * 100
if (values[i] < LOWER_BOUND_LINEAR) or (values[i] > UPPER_BOUND_LINEAR):
if name == "pinky_1" and (values[i] < LOWER_BOUND_LINEAR):
values[i] = end_pos
else:
msg = (
f"Wrong motor position range detected for {name}. "
f"Expected to be in nominal range of [0, 100] % (a full linear translation), "
f"with a maximum range of [{LOWER_BOUND_LINEAR}, {UPPER_BOUND_LINEAR}] % to account for some imprecision during calibration, "
f"but present value is {values[i]} %. "
"This might be due to a cable connection issue creating an artificial jump in motor values. "
"You need to recalibrate by running: `python lerobot/scripts/control_robot.py calibrate`"
)
print(msg)
# raise JointOutOfRangeError(msg)
return values
# def revert_calibration(self, values: np.ndarray | list, motor_names: list[str] | None):
# """Inverse of `apply_calibration`."""
# if motor_names is None:
# motor_names = self.motor_names
# for i, name in enumerate(motor_names):
# calib_idx = self.calibration["motor_names"].index(name)
# calib_mode = self.calibration["calib_mode"][calib_idx]
# if CalibrationMode[calib_mode] == CalibrationMode.LINEAR:
# start_pos = self.calibration["start_pos"][calib_idx]
# end_pos = self.calibration["end_pos"][calib_idx]
# # Convert from nominal lnear range of [0, 100] % to
# # actual motor range of values which can be arbitrary.
# values[i] = values[i] / 100 * (end_pos - start_pos) + start_pos
# values = np.round(values).astype(np.int32)
# return values
class HopeJuniorRobot:
def __init__(self):
self.arm_bus = FeetechMotorsBus(
port="/dev/tty.usbmodem58760429571",
motors={
# "motor1": (2, "sts3250"),
# "motor2": (1, "scs0009"),
"shoulder_pitch": [1, "sts3250"],
"shoulder_yaw": [2, "sts3215"], # TODO: sts3250
"shoulder_roll": [3, "sts3215"], # TODO: sts3250
"elbow_flex": [4, "sts3250"],
"wrist_roll": [5, "sts3215"],
"wrist_yaw": [6, "sts3215"],
"wrist_pitch": [7, "sts3215"],
},
protocol_version=0,
)
self.hand_bus = FeetechMotorsBus(
port="/dev/tty.usbmodem585A0077581",
motors={
"thumb_basel_rotation": [30, "scs0009"],
"thumb_flexor": [27, "scs0009"],
"thumb_pinky_side": [26, "scs0009"],
"thumb_thumb_side": [28, "scs0009"],
"index_flexor": [25, "scs0009"],
"index_pinky_side": [31, "scs0009"],
"index_thumb_side": [32, "scs0009"],
"middle_flexor": [24, "scs0009"],
"middle_pinky_side": [33, "scs0009"],
"middle_thumb_side": [34, "scs0009"],
"ring_flexor": [21, "scs0009"],
"ring_pinky_side": [36, "scs0009"],
"ring_thumb_side": [35, "scs0009"],
"pinky_flexor": [23, "scs0009"],
"pinky_pinky_side": [38, "scs0009"],
"pinky_thumb_side": [37, "scs0009"],
},
protocol_version=1,
group_sync_read=False,
)
def get_hand_calibration(self):
homing_offset = [0] * len(self.hand_bus.motor_names)
drive_mode = [0] * len(self.hand_bus.motor_names)
start_pos = [
500,
900,
1000,
0,
100,
250,
750,
100,
400,
150,
100,
120,
980,
100,
950,
750,
]
end_pos = [
500 - 250,
900 - 300,
1000 - 550,
0 + 550,
1000,
250 + 700,
750 - 700,
1000,
400 + 700,
150 + 700,
1000,
120 + 700,
980 - 700,
1000,
950 - 700,
750 - 700,
]
calib_modes = [CalibrationMode.LINEAR.name] * len(self.hand_bus.motor_names)
calib_dict = {
"homing_offset": homing_offset,
"drive_mode": drive_mode,
"start_pos": start_pos,
"end_pos": end_pos,
"calib_mode": calib_modes,
"motor_names": self.hand_bus.motor_names,
}
return calib_dict
def connect(self):
self.arm_bus.connect()
self.hand_bus.connect()
ESCAPE_KEY_ID = 27
def capture_and_display_processed_frames(
frame_processor: Callable[[np.ndarray], np.ndarray],
window_display_name: str,
cap_device: int = 0,
) -> None:
"""
Capture frames from the given input camera device, run them through
the frame processor, and display the outputs in a window with the given name.
User should press Esc to exit.
Inputs:
frame_processor: Callable[[np.ndarray], np.ndarray]
Processes frames.
Input and output are numpy arrays of shape (H W C) with BGR channel layout and dtype uint8 / byte.
window_display_name: str
Name of the window used to display frames.
cap_device: int
Identifier for the camera to use to capture frames.
"""
cv2.namedWindow(window_display_name)
capture = cv2.VideoCapture(cap_device)
if not capture.isOpened():
raise ValueError("Unable to open video capture.")
frame_count = 0
has_frame, frame = capture.read()
while has_frame:
assert isinstance(frame, np.ndarray)
frame_count = frame_count + 1
# mirror frame
frame = np.ascontiguousarray(frame[:, ::-1, ::-1])
# process & show frame
processed_frame = frame_processor(frame)
cv2.imshow(window_display_name, processed_frame[:, :, ::-1])
has_frame, frame = capture.read()
key = cv2.waitKey(1)
if key == ESCAPE_KEY_ID:
break
capture.release()
def main():
robot = HopeJuniorRobot()
robot.connect()
# robot.hand_bus.calibration = None
# breakpoint()
# print(robot.arm_bus.read("Present_Position"))
robot.arm_bus.write("Torque_Enable", 1)
robot.arm_bus.write("Acceleration", 20)
robot.arm_bus.read("Acceleration")
calibration = robot.get_hand_calibration()
robot.hand_bus.write("Goal_Position", calibration["start_pos"])
# robot.hand_bus.write("Goal_Position", calibration["end_pos"][:4], robot.hand_bus.motor_names[:4])
robot.hand_bus.set_calibration(calibration)
lol = 1
# # print(motors_bus.write("Goal_Position", 500))
# print(robot.hand_bus.read("Present_Position"))
# # pos = hand_bus.read("Present_Position")
# # hand_bus.write("Goal_Position", pos[0]+20, hand_bus.motor_names[0])
# # hand_bus.write("Goal_Position", pos[i]+delta, hand_bus.motor_names[i])
# robot.hand_bus.read("Acceleration")
# robot.hand_bus.write("Acceleration", 10)
# sleep = 1
# # robot.hand_bus.write(
# # "Goal_Position", [glove.last_d['index_2']-1500,300,300], ["index_pinky_side", "index_flexor", "index_thumb_side"]
# # )
# #time.sleep(sleep)
# time.sleep(sleep)
# robot.hand_bus.write(
# "Goal_Position", [100, 100, 100], ["index_flexor", "index_pinky_side", "index_thumb_side"]
# )
# time.sleep(sleep)
# robot.hand_bus.write(
# "Goal_Position", [100, 0, 0], ["middle_flexor", "middle_pinky_side", "middle_thumb_side"]
# )
# time.sleep(sleep)
# robot.hand_bus.write(
# "Goal_Position", [200, 200, 0], ["ring_flexor", "ring_pinky_side", "ring_thumb_side"]
# )
# time.sleep(sleep)
# robot.hand_bus.write(
# "Goal_Position", [200, 100, 600], ["pinky_flexor", "pinky_pinky_side", "pinky_thumb_side"]
# )
# time.sleep(sleep)
# breakpoint()
glove = HomonculusGlove()
glove.run_calibration()
# while True:
# joint_names = ["index_1", "index_2"]
# joint_values = glove.read(joint_names)
# print(joint_values)
input()
while True:
joint_names = []
joint_names += ["thumb_0", "thumb_2", "thumb_3"]
joint_names += ["index_1", "index_2"]
joint_names += ["middle_1", "middle_2"]
joint_names += ["ring_1", "ring_2"]
joint_names += ["pinky_1", "pinky_2"]
joint_values = glove.read(joint_names)
joint_values = joint_values.round().astype(int)
joint_dict = {k: v for k, v in zip(joint_names, joint_values, strict=False)}
motor_values = []
motor_names = []
motor_names += ["thumb_basel_rotation", "thumb_flexor", "thumb_pinky_side", "thumb_thumb_side"]
motor_values += [
joint_dict["thumb_3"],
joint_dict["thumb_0"],
joint_dict["thumb_2"],
joint_dict["thumb_2"],
]
motor_names += ["index_flexor", "index_pinky_side", "index_thumb_side"]
motor_values += [joint_dict["index_2"], joint_dict["index_1"], joint_dict["index_1"]]
motor_names += ["middle_flexor", "middle_pinky_side", "middle_thumb_side"]
motor_values += [joint_dict["middle_2"], joint_dict["middle_1"], joint_dict["middle_1"]]
motor_names += ["ring_flexor", "ring_pinky_side", "ring_thumb_side"]
motor_values += [joint_dict["ring_2"], joint_dict["ring_1"], joint_dict["ring_1"]]
motor_names += ["pinky_flexor", "pinky_pinky_side", "pinky_thumb_side"]
motor_values += [joint_dict["pinky_2"], joint_dict["pinky_1"], joint_dict["pinky_1"]]
motor_values = np.array(motor_values)
motor_values = np.clip(motor_values, 0, 100)
robot.hand_bus.write("Goal_Position", motor_values, motor_names)
time.sleep(0.02)
while True:
# print(glove.read()['index_2']-1500)
glove_index_flexor = glove.read()["index_2"] - 1500
glove_index_subflexor = glove.read()["index_1"] - 1500
glove_index_side = glove.read()["index_0"] - 2100
vals = [glove_index_flexor, 1000 - (glove_index_subflexor), glove_index_subflexor]
keys = ["index_flexor", "index_pinky_side", "index_thumb_side"]
glove_middle_flexor = glove.read()["middle_2"] - 1500
glove_middle_subflexor = 1000 - (glove.read()["middle_1"] - 1700)
vals += [glove_middle_flexor, glove_middle_subflexor, glove_middle_subflexor - 200]
keys += ["middle_flexor", "middle_pinky_side", "middle_thumb_side"]
glove_ring_flexor = glove.read()["ring_2"] - 1300
print(glove_ring_flexor)
glove_ring_subflexor = glove.read()["ring_1"] - 1100
vals += [glove_ring_flexor, 1000 - glove_ring_subflexor, glove_ring_subflexor]
keys += ["ring_flexor", "ring_pinky_side", "ring_thumb_side"]
glove_pinky_flexor = glove.read()["pinky_2"] - 1500
glove_pinky_subflexor = glove.read()["pinky_1"] - 1300
vals += [300 + glove_pinky_flexor, max(1000 - glove_pinky_subflexor - 100, 0), glove_pinky_subflexor]
keys += ["pinky_flexor", "pinky_pinky_side", "pinky_thumb_side"]
robot.hand_bus.write("Goal_Position", vals, keys)
time.sleep(0.1)
time.sleep(3)
def move_arm(loop=10):
sleep = 1
for i in range(loop):
robot.arm_bus.write("Goal_Position", [1981, 2030, 2069, 2032, 1874, 1957, 1695])
time.sleep(sleep)
robot.arm_bus.write("Goal_Position", [1981, 2030, 2069, 2032, 1874, 1957, 1195])
time.sleep(sleep)
robot.arm_bus.write("Goal_Position", [1981, 2030, 2069, 2032, 1874, 1957, 2195])
time.sleep(sleep)
robot.arm_bus.write("Goal_Position", [1981, 2030, 2069, 2032, 1874, 1957, 1695])
time.sleep(sleep)
robot.arm_bus.write("Goal_Position", [1981, 2030, 2069, 2032, 1874, 1457, 1695])
time.sleep(sleep)
robot.arm_bus.write("Goal_Position", [1981, 2030, 2069, 2032, 1874, 2357, 1695])
time.sleep(sleep)
robot.arm_bus.write("Goal_Position", [1981, 2030, 2069, 2032, 1874, 1957, 1695])
time.sleep(sleep)
robot.arm_bus.write("Goal_Position", [1981, 2030, 2069, 2032, 974, 1957, 1695])
time.sleep(sleep)
robot.arm_bus.write("Goal_Position", [1981, 2030, 2069, 2032, 2674, 1957, 1695])
time.sleep(sleep + 2)
robot.arm_bus.write("Goal_Position", [1981, 2030, 2069, 2032, 1874, 1957, 1695])
time.sleep(sleep)
robot.arm_bus.write("Goal_Position", [1981, 2030, 2069, 1632, 1874, 1957, 1695])
time.sleep(sleep)
robot.arm_bus.write("Goal_Position", [1981, 2030, 1369, 1632, 1874, 1957, 1695])
time.sleep(sleep)
robot.arm_bus.write("Goal_Position", [1981, 2030, 2069, 2032, 1874, 1957, 1695])
time.sleep(sleep)
robot.arm_bus.write("Goal_Position", [1981, 1330, 2069, 2032, 1874, 1957, 1695])
time.sleep(sleep)
robot.arm_bus.write("Goal_Position", [1981, 2030, 2069, 2032, 1874, 1957, 1695])
time.sleep(sleep)
robot.arm_bus.write("Goal_Position", [2381, 2030, 2069, 2032, 1874, 1957, 1695])
time.sleep(sleep)
robot.arm_bus.write("Goal_Position", [1681, 2030, 2069, 2032, 1874, 1957, 1695])
time.sleep(sleep)
robot.arm_bus.write("Goal_Position", [1981, 2030, 2069, 2032, 1874, 1957, 1695])
time.sleep(sleep)
def move_hand(loop=10):
sleep = 0.5
for i in range(loop):
robot.hand_bus.write(
"Goal_Position",
[500, 1000, 0, 1000],
["thumb_basel_rotation", "thumb_flexor", "thumb_pinky_side", "thumb_thumb_side"],
)
time.sleep(sleep)
robot.hand_bus.write(
"Goal_Position", [100, 100, 100], ["index_flexor", "index_pinky_side", "index_thumb_side"]
)
time.sleep(sleep)
robot.hand_bus.write(
"Goal_Position", [100, 1000, 150], ["middle_flexor", "middle_pinky_side", "middle_thumb_side"]
)
time.sleep(sleep)
robot.hand_bus.write(
"Goal_Position", [200, 200, 0], ["ring_flexor", "ring_pinky_side", "ring_thumb_side"]
)
time.sleep(sleep)
robot.hand_bus.write(
"Goal_Position", [200, 100, 700], ["pinky_flexor", "pinky_pinky_side", "pinky_thumb_side"]
)
time.sleep(sleep)
robot.hand_bus.write(
"Goal_Position",
[500, 1000 - 250, 0 + 300, 1000 - 200],
["thumb_basel_rotation", "thumb_flexor", "thumb_pinky_side", "thumb_thumb_side"],
)
time.sleep(sleep)
robot.hand_bus.write(
"Goal_Position",
[100 + 450, 100 + 400, 100 + 400],
["index_flexor", "index_pinky_side", "index_thumb_side"],
)
time.sleep(sleep)
robot.hand_bus.write(
"Goal_Position",
[100 + 350, 1000 - 450, 150 + 450],
["middle_flexor", "middle_pinky_side", "middle_thumb_side"],
)
time.sleep(sleep)
robot.hand_bus.write(
"Goal_Position",
[200 + 650, 200 + 350, 0 + 350],
["ring_flexor", "ring_pinky_side", "ring_thumb_side"],
)
time.sleep(sleep)
robot.hand_bus.write(
"Goal_Position",
[200 + 450, 100 + 400, 700 - 400],
["pinky_flexor", "pinky_pinky_side", "pinky_thumb_side"],
)
time.sleep(sleep)
move_hand(3)
move_arm(1)
from concurrent.futures import ThreadPoolExecutor
with ThreadPoolExecutor() as executor:
executor.submit(move_arm)
executor.submit(move_hand)
# initial position
for i in range(3):
robot.hand_bus.write(
"Goal_Position", [500, 1000, 0, 1000, 100, 950, 100, 100, 1000, 150, 200, 200, 0, 200, 100, 700]
)
time.sleep(1)
# for i in range(3):
# robot.hand_bus.write("Goal_Position", [500, 1000-150, 0+250, 1000-150,
# 100+300, 950-250, 100+250,
# 100+200, 1000-300, 150+300,
# 200+500, 200+200, 0+200,
# 200+300, 100+200, 700-200])
# time.sleep(1)
# camera = 0
# score_threshold = 0.95
# iou_threshold = 0.3
# app = MediaPipeHandApp(MediaPipeHand.from_pretrained(), score_threshold, iou_threshold)
# def frame_processor(frame: np.ndarray) -> np.ndarray:
# # Input Prep
# NHWC_int_numpy_frames, NCHW_fp32_torch_frames = app_to_net_image_inputs(frame)
# # Run Bounding Box & Keypoint Detector
# batched_selected_boxes, batched_selected_keypoints = app._run_box_detector(NCHW_fp32_torch_frames)
# # The region of interest ( bounding box of 4 (x, y) corners).
# # list[torch.Tensor(shape=[Num Boxes, 4, 2])],
# # where 2 == (x, y)
# #
# # A list element will be None if there is no selected ROI.
# batched_roi_4corners = app._compute_object_roi(batched_selected_boxes, batched_selected_keypoints)
# # selected landmarks for the ROI (if any)
# # list[torch.Tensor(shape=[Num Selected Landmarks, K, 3])],
# # where K == number of landmark keypoints, 3 == (x, y, confidence)
# #
# # A list element will be None if there is no ROI.
# landmarks_out = app._run_landmark_detector(NHWC_int_numpy_frames, batched_roi_4corners)
# app._draw_predictions(
# NHWC_int_numpy_frames,
# batched_selected_boxes,
# batched_selected_keypoints,
# batched_roi_4corners,
# *landmarks_out,
# )
# return NHWC_int_numpy_frames[0]
# capture_and_display_processed_frames(frame_processor, "QAIHM Mediapipe Hand Demo", camera)
if __name__ == "__main__":
main()

133
examples/test2.py Normal file
View File

@@ -0,0 +1,133 @@
#!/usr/bin/env python
#
# ********* Ping Example *********
#
#
# Available SCServo model on this example : All models using Protocol SCS
# This example is tested with a SCServo(STS/SMS/SCS), and an URT
# Be sure that SCServo(STS/SMS/SCS) properties are already set as %% ID : 1 / Baudnum : 6 (Baudrate : 1000000)
#
import os
if os.name == "nt":
import msvcrt
def getch():
return msvcrt.getch().decode()
else:
import sys
import termios
import tty
fd = sys.stdin.fileno()
old_settings = termios.tcgetattr(fd)
def getch():
try:
tty.setraw(sys.stdin.fileno())
ch = sys.stdin.read(1)
finally:
termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
return ch
from scservo_sdk import * # Uses SCServo SDK library
# Default setting
SCS_ID = 1 # SCServo ID : 1
BAUDRATE = 1000000 # SCServo default baudrate : 1000000
DEVICENAME = "/dev/tty.usbserial-2130" # Check which port is being used on your controller
# ex) Windows: "COM1" Linux: "/dev/ttyUSB0" Mac: "/dev/tty.usbserial-*"
protocol_end = 1 # SCServo bit end(STS/SMS=0, SCS=1)
# Initialize PortHandler instance
# Set the port path
# Get methods and members of PortHandlerLinux or PortHandlerWindows
portHandler = PortHandler(DEVICENAME)
# Initialize PacketHandler instance
# Get methods and members of Protocol
packetHandler = PacketHandler(protocol_end)
# Open port
if portHandler.openPort():
print("Succeeded to open the port")
else:
print("Failed to open the port")
print("Press any key to terminate...")
getch()
quit()
# Set port baudrate
if portHandler.setBaudRate(BAUDRATE):
print("Succeeded to change the baudrate")
else:
print("Failed to change the baudrate")
print("Press any key to terminate...")
getch()
quit()
# Try to ping the SCServo
# Get SCServo model number
scs_model_number, scs_comm_result, scs_error = packetHandler.ping(portHandler, SCS_ID)
if scs_comm_result != COMM_SUCCESS:
print("%s" % packetHandler.getTxRxResult(scs_comm_result))
elif scs_error != 0:
print("%s" % packetHandler.getRxPacketError(scs_error))
else:
print("[ID:%03d] ping Succeeded. SCServo model number : %d" % (SCS_ID, scs_model_number))
ADDR_SCS_PRESENT_POSITION = 56
scs_present_position, scs_comm_result, scs_error = packetHandler.read2ByteTxRx(
portHandler, SCS_ID, ADDR_SCS_PRESENT_POSITION
)
if scs_comm_result != COMM_SUCCESS:
print(packetHandler.getTxRxResult(scs_comm_result))
elif scs_error != 0:
print(packetHandler.getRxPacketError(scs_error))
breakpoint()
scs_present_position = SCS_LOWORD(scs_present_position)
# scs_present_speed = SCS_HIWORD(scs_present_position_speed)
# print("[ID:%03d] PresPos:%03d PresSpd:%03d" % (SCS_ID, scs_present_position, SCS_TOHOST(scs_present_speed, 15)))
print("[ID:%03d] PresPos:%03d" % (SCS_ID, scs_present_position))
groupSyncRead = GroupSyncRead(portHandler, packetHandler, ADDR_SCS_PRESENT_POSITION, 2)
scs_addparam_result = groupSyncRead.addParam(SCS_ID)
if scs_addparam_result != True:
print("[ID:%03d] groupSyncRead addparam failed" % SCS_ID)
quit()
# Syncread present position
scs_comm_result = groupSyncRead.txRxPacket()
if scs_comm_result != COMM_SUCCESS:
print("%s" % packetHandler.getTxRxResult(scs_comm_result))
# Check if groupsyncread data of SCServo#1 is available
scs_getdata_result = groupSyncRead.isAvailable(SCS_ID, ADDR_SCS_PRESENT_POSITION, 2)
if scs_getdata_result == True:
# Get SCServo#1 present position value
scs_present_position = groupSyncRead.getData(SCS_ID, ADDR_SCS_PRESENT_POSITION, 2)
else:
scs_present_position = 0
print("[ID:%03d] groupSyncRead getdata failed" % SCS_ID)
# # Check if groupsyncread data of SCServo#2 is available
# scs_getdata_result = groupSyncRead.isAvailable(SCS2_ID, ADDR_SCS_PRESENT_POSITION, 2)
# if scs_getdata_result == True:
# # Get SCServo#2 present position value
# scs2_present_position_speed = groupSyncRead.getData(SCS2_ID, ADDR_SCS_PRESENT_POSITION, 2)
# else:
# print("[ID:%03d] groupSyncRead getdata failed" % SCS2_ID)
scs_present_position = SCS_LOWORD(scs_present_position)
print("[ID:%03d] PresPos:%03d" % (SCS_ID, scs_present_position))
# Close port
portHandler.closePort()

45
examples/test3.py Normal file
View File

@@ -0,0 +1,45 @@
import serial
class HomonculusGlove:
def __init__(self):
self.serial_port = "/dev/tty.usbmodem1101"
self.baud_rate = 115200
self.serial = serial.Serial(self.serial_port, self.baud_rate, timeout=1)
def read(self):
while True:
if self.serial.in_waiting > 0:
vals = self.serial.readline().decode("utf-8").strip()
vals = vals.split(" ")
vals = [int(val) for val in vals]
d = {
"thumb_0": vals[0],
"thumb_1": vals[1],
"thumb_2": vals[2],
"thumb_3": vals[3],
"index_0": vals[4],
"index_1": vals[5],
"index_2": vals[6],
"middle_0": vals[7],
"middle_1": vals[8],
"middle_2": vals[9],
"ring_0": vals[10],
"ring_1": vals[11],
"ring_2": vals[12],
"pinky_0": vals[13],
"pinky_1": vals[14],
"pinky_2": vals[15],
}
return d
# if ser.in_waiting > 0:
# line = ser.readline().decode('utf-8').strip()
# print(line)
if __name__ == "__main__":
glove = HomonculusGlove()
d = glove.read()
lol = 1

693
examples/test4.py Normal file
View File

@@ -0,0 +1,693 @@
import threading
import time
from typing import Callable
import cv2
import numpy as np
# from qai_hub_models.models.mediapipe_hand.app import MediaPipeHandApp
# from qai_hub_models.models.mediapipe_hand.model import (
# MediaPipeHand,
# )
# from qai_hub_models.utils.image_processing import (
# app_to_net_image_inputs,
# )
from lerobot.common.robot_devices.motors.feetech import (
CalibrationMode,
FeetechMotorsBus,
)
LOWER_BOUND_LINEAR = -100
UPPER_BOUND_LINEAR = 200
import serial
class HomonculusGlove:
def __init__(self):
self.serial_port = "/dev/tty.usbmodem1401"
self.baud_rate = 115200
self.serial = serial.Serial(self.serial_port, self.baud_rate, timeout=1)
self.thread = threading.Thread(target=self.async_read)
self.thread.start()
self.last_d = {
"thumb_0": 100,
"thumb_1": 100,
"thumb_2": 100,
"thumb_3": 100,
"index_0": 100,
"index_1": 100,
"index_2": 100,
"middle_0": 100,
"middle_1": 100,
"middle_2": 100,
"ring_0": 100,
"ring_1": 100,
"ring_2": 100,
"pinky_0": 100,
"pinky_1": 100,
"pinky_2": 100,
"battery_voltage": 100,
}
self.calibration = None
@property
def joint_names(self):
return list(self.last_d.keys())
def read(self, motor_names: list[str] | None = None):
if motor_names is None:
motor_names = self.joint_names
values = np.array([self.last_d[k] for k in motor_names])
print(motor_names)
print(values)
if self.calibration is not None:
values = self.apply_calibration(values, motor_names)
print(values)
return values
def async_read(self):
while True:
if self.serial.in_waiting > 0:
self.serial.flush()
vals = self.serial.readline().decode("utf-8").strip()
vals = vals.split(" ")
if len(vals) != 17:
continue
vals = [int(val) for val in vals]
d = {
"thumb_0": vals[0],
"thumb_1": vals[1],
"thumb_2": vals[2],
"thumb_3": vals[3],
"index_0": vals[4],
"index_1": vals[5],
"index_2": vals[6],
"middle_0": vals[7],
"middle_1": vals[8],
"middle_2": vals[9],
"ring_0": vals[10],
"ring_1": vals[11],
"ring_2": vals[12],
"pinky_0": vals[13],
"pinky_1": vals[14],
"pinky_2": vals[15],
"battery_voltage": vals[16],
}
self.last_d = d
# print(d.values())
def run_calibration(self):
print("\nMove arm to open position")
input("Press Enter to continue...")
open_pos_list = []
for _ in range(300):
open_pos = self.read()
open_pos_list.append(open_pos)
time.sleep(0.01)
open_pos = np.array(open_pos_list)
max_open_pos = open_pos.max(axis=0)
min_open_pos = open_pos.min(axis=0)
print(f"{max_open_pos=}")
print(f"{min_open_pos=}")
print("\nMove arm to closed position")
input("Press Enter to continue...")
closed_pos_list = []
for _ in range(300):
closed_pos = self.read()
closed_pos_list.append(closed_pos)
time.sleep(0.01)
closed_pos = np.array(closed_pos_list)
max_closed_pos = closed_pos.max(axis=0)
closed_pos[closed_pos < 1000] = 60000
min_closed_pos = closed_pos.min(axis=0)
print(f"{max_closed_pos=}")
print(f"{min_closed_pos=}")
open_pos = np.array([max_open_pos, max_closed_pos]).max(axis=0)
closed_pos = np.array([min_open_pos, min_closed_pos]).min(axis=0)
# INVERTION
# INVERTION
# INVERTION
# INVERTION
# INVERTION
# INVERTION
# INVERTION
for i, jname in enumerate(self.joint_names):
if jname in [
"thumb_0",
"thumb_3",
"index_2",
"middle_2",
"ring_2",
"pinky_0",
"pinky_2",
"index_0",
]:
tmp_pos = open_pos[i]
open_pos[i] = closed_pos[i]
closed_pos[i] = tmp_pos
print()
print(f"{open_pos=}")
print(f"{closed_pos=}")
homing_offset = [0] * len(self.joint_names)
drive_mode = [0] * len(self.joint_names)
calib_modes = [CalibrationMode.LINEAR.name] * len(self.joint_names)
calib_dict = {
"homing_offset": homing_offset,
"drive_mode": drive_mode,
"start_pos": open_pos,
"end_pos": closed_pos,
"calib_mode": calib_modes,
"motor_names": self.joint_names,
}
# return calib_dict
self.set_calibration(calib_dict)
def set_calibration(self, calibration: dict[str, list]):
self.calibration = calibration
def apply_calibration(self, values: np.ndarray | list, motor_names: list[str] | None):
"""Convert from unsigned int32 joint position range [0, 2**32[ to the universal float32 nominal degree range ]-180.0, 180.0[ with
a "zero position" at 0 degree.
Note: We say "nominal degree range" since the motors can take values outside this range. For instance, 190 degrees, if the motor
rotate more than a half a turn from the zero position. However, most motors can't rotate more than 180 degrees and will stay in this range.
Joints values are original in [0, 2**32[ (unsigned int32). Each motor are expected to complete a full rotation
when given a goal position that is + or - their resolution. For instance, feetech xl330-m077 have a resolution of 4096, and
at any position in their original range, let's say the position 56734, they complete a full rotation clockwise by moving to 60830,
or anticlockwise by moving to 52638. The position in the original range is arbitrary and might change a lot between each motor.
To harmonize between motors of the same model, different robots, or even models of different brands, we propose to work
in the centered nominal degree range ]-180, 180[.
"""
if motor_names is None:
motor_names = self.motor_names
# Convert from unsigned int32 original range [0, 2**32] to signed float32 range
values = values.astype(np.float32)
for i, name in enumerate(motor_names):
calib_idx = self.calibration["motor_names"].index(name)
calib_mode = self.calibration["calib_mode"][calib_idx]
if CalibrationMode[calib_mode] == CalibrationMode.LINEAR:
start_pos = self.calibration["start_pos"][calib_idx]
end_pos = self.calibration["end_pos"][calib_idx]
# Rescale the present position to a nominal range [0, 100] %,
# useful for joints with linear motions like Aloha gripper
values[i] = (values[i] - start_pos) / (end_pos - start_pos) * 100
if (values[i] < LOWER_BOUND_LINEAR) or (values[i] > UPPER_BOUND_LINEAR):
if name == "pinky_1" and (values[i] < LOWER_BOUND_LINEAR):
values[i] = end_pos
else:
msg = (
f"Wrong motor position range detected for {name}. "
f"Expected to be in nominal range of [0, 100] % (a full linear translation), "
f"with a maximum range of [{LOWER_BOUND_LINEAR}, {UPPER_BOUND_LINEAR}] % to account for some imprecision during calibration, "
f"but present value is {values[i]} %. "
"This might be due to a cable connection issue creating an artificial jump in motor values. "
"You need to recalibrate by running: `python lerobot/scripts/control_robot.py calibrate`"
)
print(msg)
# raise JointOutOfRangeError(msg)
return values
# def revert_calibration(self, values: np.ndarray | list, motor_names: list[str] | None):
# """Inverse of `apply_calibration`."""
# if motor_names is None:
# motor_names = self.motor_names
# for i, name in enumerate(motor_names):
# calib_idx = self.calibration["motor_names"].index(name)
# calib_mode = self.calibration["calib_mode"][calib_idx]
# if CalibrationMode[calib_mode] == CalibrationMode.LINEAR:
# start_pos = self.calibration["start_pos"][calib_idx]
# end_pos = self.calibration["end_pos"][calib_idx]
# # Convert from nominal lnear range of [0, 100] % to
# # actual motor range of values which can be arbitrary.
# values[i] = values[i] / 100 * (end_pos - start_pos) + start_pos
# values = np.round(values).astype(np.int32)
# return values
class HopeJuniorRobot:
def __init__(self):
self.arm_bus = FeetechMotorsBus(
port="/dev/tty.usbmodem58760429571",
motors={
# "motor1": (2, "sts3250"),
# "motor2": (1, "scs0009"),
"shoulder_pitch": [1, "sts3250"],
"shoulder_yaw": [2, "sts3215"], # TODO: sts3250
"shoulder_roll": [3, "sts3215"], # TODO: sts3250
"elbow_flex": [4, "sts3250"],
"wrist_roll": [5, "sts3215"],
"wrist_yaw": [6, "sts3215"],
"wrist_pitch": [7, "sts3215"],
},
protocol_version=0,
)
self.hand_bus = FeetechMotorsBus(
port="/dev/tty.usbmodem585A0077581",
motors={
"thumb_basel_rotation": [30, "scs0009"],
"thumb_flexor": [27, "scs0009"],
"thumb_pinky_side": [26, "scs0009"],
"thumb_thumb_side": [28, "scs0009"],
"index_flexor": [25, "scs0009"],
"index_pinky_side": [31, "scs0009"],
"index_thumb_side": [32, "scs0009"],
"middle_flexor": [24, "scs0009"],
"middle_pinky_side": [33, "scs0009"],
"middle_thumb_side": [34, "scs0009"],
"ring_flexor": [21, "scs0009"],
"ring_pinky_side": [36, "scs0009"],
"ring_thumb_side": [35, "scs0009"],
"pinky_flexor": [23, "scs0009"],
"pinky_pinky_side": [38, "scs0009"],
"pinky_thumb_side": [37, "scs0009"],
},
protocol_version=1,
group_sync_read=False,
)
def get_hand_calibration(self):
homing_offset = [0] * len(self.hand_bus.motor_names)
drive_mode = [0] * len(self.hand_bus.motor_names)
start_pos = [
500,
900,
1000,
0,
100,
250,
750,
100,
400,
150,
100,
120,
980,
100,
950,
750,
]
end_pos = [
500 - 250,
900 - 300,
1000 - 550,
0 + 550,
1000,
start_pos[5] + 500,
start_pos[6] - 500,
1000,
400 + 700,
150 + 700,
1000,
120 + 700,
980 - 700,
1000,
950 - 700,
750 - 700,
]
calib_modes = [CalibrationMode.LINEAR.name] * len(self.hand_bus.motor_names)
calib_dict = {
"homing_offset": homing_offset,
"drive_mode": drive_mode,
"start_pos": start_pos,
"end_pos": end_pos,
"calib_mode": calib_modes,
"motor_names": self.hand_bus.motor_names,
}
return calib_dict
def connect(self):
self.arm_bus.connect()
self.hand_bus.connect()
ESCAPE_KEY_ID = 27
def capture_and_display_processed_frames(
frame_processor: Callable[[np.ndarray], np.ndarray],
window_display_name: str,
cap_device: int = 0,
) -> None:
"""
Capture frames from the given input camera device, run them through
the frame processor, and display the outputs in a window with the given name.
User should press Esc to exit.
Inputs:
frame_processor: Callable[[np.ndarray], np.ndarray]
Processes frames.
Input and output are numpy arrays of shape (H W C) with BGR channel layout and dtype uint8 / byte.
window_display_name: str
Name of the window used to display frames.
cap_device: int
Identifier for the camera to use to capture frames.
"""
cv2.namedWindow(window_display_name)
capture = cv2.VideoCapture(cap_device)
if not capture.isOpened():
raise ValueError("Unable to open video capture.")
frame_count = 0
has_frame, frame = capture.read()
while has_frame:
assert isinstance(frame, np.ndarray)
frame_count = frame_count + 1
# mirror frame
frame = np.ascontiguousarray(frame[:, ::-1, ::-1])
# process & show frame
processed_frame = frame_processor(frame)
cv2.imshow(window_display_name, processed_frame[:, :, ::-1])
has_frame, frame = capture.read()
key = cv2.waitKey(1)
if key == ESCAPE_KEY_ID:
break
capture.release()
def main():
robot = HopeJuniorRobot()
robot.connect()
# robot.hand_bus.calibration = None
# breakpoint()
# print(robot.arm_bus.read("Present_Position"))
robot.arm_bus.write("Torque_Enable", 1)
robot.arm_bus.write("Acceleration", 20)
robot.arm_bus.read("Acceleration")
calibration = robot.get_hand_calibration()
robot.hand_bus.write("Goal_Position", calibration["start_pos"])
# robot.hand_bus.write("Goal_Position", calibration["end_pos"][:4], robot.hand_bus.motor_names[:4])
robot.hand_bus.set_calibration(calibration)
lol = 1
# # print(motors_bus.write("Goal_Position", 500))
# print(robot.hand_bus.read("Present_Position"))
# # pos = hand_bus.read("Present_Position")
# # hand_bus.write("Goal_Position", pos[0]+20, hand_bus.motor_names[0])
# # hand_bus.write("Goal_Position", pos[i]+delta, hand_bus.motor_names[i])
# robot.hand_bus.read("Acceleration")
# robot.hand_bus.write("Acceleration", 10)
# sleep = 1
# # robot.hand_bus.write(
# # "Goal_Position", [glove.last_d['index_2']-1500,300,300], ["index_pinky_side", "index_flexor", "index_thumb_side"]
# # )
# #time.sleep(sleep)
# time.sleep(sleep)
# robot.hand_bus.write(
# "Goal_Position", [100, 100, 100], ["index_flexor", "index_pinky_side", "index_thumb_side"]
# )
# time.sleep(sleep)
# robot.hand_bus.write(
# "Goal_Position", [100, 0, 0], ["middle_flexor", "middle_pinky_side", "middle_thumb_side"]
# )
# time.sleep(sleep)
# robot.hand_bus.write(
# "Goal_Position", [200, 200, 0], ["ring_flexor", "ring_pinky_side", "ring_thumb_side"]
# )
# time.sleep(sleep)
# robot.hand_bus.write(
# "Goal_Position", [200, 100, 600], ["pinky_flexor", "pinky_pinky_side", "pinky_thumb_side"]
# )
# time.sleep(sleep)
# breakpoint()
glove = HomonculusGlove()
glove.run_calibration()
# while True:
# joint_names = ["index_1", "index_2"]
# joint_values = glove.read(joint_names)
# print(joint_values)
input()
while True:
joint_names = []
# joint_names += ["thumb_0", "thumb_2", "thumb_3"]
joint_names += ["index_0", "index_1"]
# joint_names += ["middle_1", "middle_2"]
# joint_names += ["ring_1", "ring_2"]
# joint_names += ["pinky_0", "pinky_2"]
joint_values = glove.read(joint_names)
joint_values = joint_values.round().astype(int)
joint_dict = {k: v for k, v in zip(joint_names, joint_values, strict=False)}
motor_values = []
motor_names = []
# motor_names += ["thumb_basel_rotation", "thumb_flexor", "thumb_pinky_side", "thumb_thumb_side"]
# motor_values += [joint_dict["thumb_3"], joint_dict["thumb_0"], joint_dict["thumb_2"], joint_dict["thumb_2"]]
motor_names += ["index_pinky_side", "index_thumb_side"]
# if joint_dict["index_0"] -2100 > 0:
splayamount = 0.5
motor_values += [
(100 - joint_dict["index_0"]) * splayamount + joint_dict["index_1"] * (1 - splayamount),
(joint_dict["index_0"]) * splayamount + joint_dict["index_1"] * (1 - splayamount),
]
# else:
# motor_values += [100-joint_dict["index_0"], joint_dict["index_0"]]
# motor_names += ["middle_flexor", "middle_pinky_side", "middle_thumb_side"]
# motor_values += [joint_dict["middle_2"], joint_dict["middle_1"], joint_dict["middle_1"]]
# motor_names += ["ring_flexor", "ring_pinky_side", "ring_thumb_side"]
# motor_values += [joint_dict["ring_2"], joint_dict["ring_1"], joint_dict["ring_1"]]
# motor_names += ["pinky_flexor", "pinky_pinky_side", "pinky_thumb_side"]
# motor_values += [joint_dict["pinky_2"], joint_dict["pinky_0"], joint_dict["pinky_0"]]
motor_values = np.array(motor_values)
motor_values = np.clip(motor_values, 0, 100)
robot.hand_bus.write("Goal_Position", motor_values, motor_names)
time.sleep(0.02)
while True:
# print(glove.read()['index_2']-1500)
glove_index_flexor = glove.read()["index_2"] - 1500
glove_index_subflexor = glove.read()["index_1"] - 1500
glove_index_side = glove.read()["index_0"] - 2100
vals = [glove_index_flexor, 1000 - (glove_index_subflexor), glove_index_subflexor]
keys = ["index_flexor", "index_pinky_side", "index_thumb_side"]
glove_middle_flexor = glove.read()["middle_2"] - 1500
glove_middle_subflexor = 1000 - (glove.read()["middle_1"] - 1700)
vals += [glove_middle_flexor, glove_middle_subflexor, glove_middle_subflexor - 200]
keys += ["middle_flexor", "middle_pinky_side", "middle_thumb_side"]
glove_ring_flexor = glove.read()["ring_2"] - 1300
print(glove_ring_flexor)
glove_ring_subflexor = glove.read()["ring_1"] - 1100
vals += [glove_ring_flexor, 1000 - glove_ring_subflexor, glove_ring_subflexor]
keys += ["ring_flexor", "ring_pinky_side", "ring_thumb_side"]
glove_pinky_flexor = glove.read()["pinky_2"] - 1500
glove_pinky_subflexor = glove.read()["pinky_1"] - 1300
vals += [300 + glove_pinky_flexor, max(1000 - glove_pinky_subflexor - 100, 0), glove_pinky_subflexor]
keys += ["pinky_flexor", "pinky_pinky_side", "pinky_thumb_side"]
robot.hand_bus.write("Goal_Position", vals, keys)
time.sleep(0.1)
time.sleep(3)
def move_arm(loop=10):
sleep = 1
for i in range(loop):
robot.arm_bus.write("Goal_Position", [1981, 2030, 2069, 2032, 1874, 1957, 1695])
time.sleep(sleep)
robot.arm_bus.write("Goal_Position", [1981, 2030, 2069, 2032, 1874, 1957, 1195])
time.sleep(sleep)
robot.arm_bus.write("Goal_Position", [1981, 2030, 2069, 2032, 1874, 1957, 2195])
time.sleep(sleep)
robot.arm_bus.write("Goal_Position", [1981, 2030, 2069, 2032, 1874, 1957, 1695])
time.sleep(sleep)
robot.arm_bus.write("Goal_Position", [1981, 2030, 2069, 2032, 1874, 1457, 1695])
time.sleep(sleep)
robot.arm_bus.write("Goal_Position", [1981, 2030, 2069, 2032, 1874, 2357, 1695])
time.sleep(sleep)
robot.arm_bus.write("Goal_Position", [1981, 2030, 2069, 2032, 1874, 1957, 1695])
time.sleep(sleep)
robot.arm_bus.write("Goal_Position", [1981, 2030, 2069, 2032, 974, 1957, 1695])
time.sleep(sleep)
robot.arm_bus.write("Goal_Position", [1981, 2030, 2069, 2032, 2674, 1957, 1695])
time.sleep(sleep + 2)
robot.arm_bus.write("Goal_Position", [1981, 2030, 2069, 2032, 1874, 1957, 1695])
time.sleep(sleep)
robot.arm_bus.write("Goal_Position", [1981, 2030, 2069, 1632, 1874, 1957, 1695])
time.sleep(sleep)
robot.arm_bus.write("Goal_Position", [1981, 2030, 1369, 1632, 1874, 1957, 1695])
time.sleep(sleep)
robot.arm_bus.write("Goal_Position", [1981, 2030, 2069, 2032, 1874, 1957, 1695])
time.sleep(sleep)
robot.arm_bus.write("Goal_Position", [1981, 1330, 2069, 2032, 1874, 1957, 1695])
time.sleep(sleep)
robot.arm_bus.write("Goal_Position", [1981, 2030, 2069, 2032, 1874, 1957, 1695])
time.sleep(sleep)
robot.arm_bus.write("Goal_Position", [2381, 2030, 2069, 2032, 1874, 1957, 1695])
time.sleep(sleep)
robot.arm_bus.write("Goal_Position", [1681, 2030, 2069, 2032, 1874, 1957, 1695])
time.sleep(sleep)
robot.arm_bus.write("Goal_Position", [1981, 2030, 2069, 2032, 1874, 1957, 1695])
time.sleep(sleep)
def move_hand(loop=10):
sleep = 0.5
for i in range(loop):
robot.hand_bus.write(
"Goal_Position",
[500, 1000, 0, 1000],
["thumb_basel_rotation", "thumb_flexor", "thumb_pinky_side", "thumb_thumb_side"],
)
time.sleep(sleep)
robot.hand_bus.write(
"Goal_Position", [100, 100, 100], ["index_flexor", "index_pinky_side", "index_thumb_side"]
)
time.sleep(sleep)
robot.hand_bus.write(
"Goal_Position", [100, 1000, 150], ["middle_flexor", "middle_pinky_side", "middle_thumb_side"]
)
time.sleep(sleep)
robot.hand_bus.write(
"Goal_Position", [200, 200, 0], ["ring_flexor", "ring_pinky_side", "ring_thumb_side"]
)
time.sleep(sleep)
robot.hand_bus.write(
"Goal_Position", [200, 100, 700], ["pinky_flexor", "pinky_pinky_side", "pinky_thumb_side"]
)
time.sleep(sleep)
robot.hand_bus.write(
"Goal_Position",
[500, 1000 - 250, 0 + 300, 1000 - 200],
["thumb_basel_rotation", "thumb_flexor", "thumb_pinky_side", "thumb_thumb_side"],
)
time.sleep(sleep)
robot.hand_bus.write(
"Goal_Position",
[100 + 450, 100 + 400, 100 + 400],
["index_flexor", "index_pinky_side", "index_thumb_side"],
)
time.sleep(sleep)
robot.hand_bus.write(
"Goal_Position",
[100 + 350, 1000 - 450, 150 + 450],
["middle_flexor", "middle_pinky_side", "middle_thumb_side"],
)
time.sleep(sleep)
robot.hand_bus.write(
"Goal_Position",
[200 + 650, 200 + 350, 0 + 350],
["ring_flexor", "ring_pinky_side", "ring_thumb_side"],
)
time.sleep(sleep)
robot.hand_bus.write(
"Goal_Position",
[200 + 450, 100 + 400, 700 - 400],
["pinky_flexor", "pinky_pinky_side", "pinky_thumb_side"],
)
time.sleep(sleep)
move_hand(3)
move_arm(1)
from concurrent.futures import ThreadPoolExecutor
with ThreadPoolExecutor() as executor:
executor.submit(move_arm)
executor.submit(move_hand)
# initial position
for i in range(3):
robot.hand_bus.write(
"Goal_Position", [500, 1000, 0, 1000, 100, 950, 100, 100, 1000, 150, 200, 200, 0, 200, 100, 700]
)
time.sleep(1)
# for i in range(3):
# robot.hand_bus.write("Goal_Position", [500, 1000-150, 0+250, 1000-150,
# 100+300, 950-250, 100+250,
# 100+200, 1000-300, 150+300,
# 200+500, 200+200, 0+200,
# 200+300, 100+200, 700-200])
# time.sleep(1)
# camera = 0
# score_threshold = 0.95
# iou_threshold = 0.3
# app = MediaPipeHandApp(MediaPipeHand.from_pretrained(), score_threshold, iou_threshold)
# def frame_processor(frame: np.ndarray) -> np.ndarray:
# # Input Prep
# NHWC_int_numpy_frames, NCHW_fp32_torch_frames = app_to_net_image_inputs(frame)
# # Run Bounding Box & Keypoint Detector
# batched_selected_boxes, batched_selected_keypoints = app._run_box_detector(NCHW_fp32_torch_frames)
# # The region of interest ( bounding box of 4 (x, y) corners).
# # list[torch.Tensor(shape=[Num Boxes, 4, 2])],
# # where 2 == (x, y)
# #
# # A list element will be None if there is no selected ROI.
# batched_roi_4corners = app._compute_object_roi(batched_selected_boxes, batched_selected_keypoints)
# # selected landmarks for the ROI (if any)
# # list[torch.Tensor(shape=[Num Selected Landmarks, K, 3])],
# # where K == number of landmark keypoints, 3 == (x, y, confidence)
# #
# # A list element will be None if there is no ROI.
# landmarks_out = app._run_landmark_detector(NHWC_int_numpy_frames, batched_roi_4corners)
# app._draw_predictions(
# NHWC_int_numpy_frames,
# batched_selected_boxes,
# batched_selected_keypoints,
# batched_roi_4corners,
# *landmarks_out,
# )
# return NHWC_int_numpy_frames[0]
# capture_and_display_processed_frames(frame_processor, "QAIHM Mediapipe Hand Demo", camera)
if __name__ == "__main__":
main()

View File

@@ -37,7 +37,7 @@ HALF_TURN_DEGREE = 180
# See this link for STS3215 Memory Table:
# https://docs.google.com/spreadsheets/d/1GVs7W1VS1PqdhA1nW-abeyAHhTUxKUdR/edit?usp=sharing&ouid=116566590112741600240&rtpof=true&sd=true
# data_name: (address, size_byte)
SCS_SERIES_CONTROL_TABLE = {
STS_SERIES_CONTROL_TABLE = {
"Model": (3, 2),
"ID": (5, 1),
"Baud_Rate": (6, 1),
@@ -87,6 +87,70 @@ SCS_SERIES_CONTROL_TABLE = {
"Maximum_Acceleration": (85, 2),
}
SCS_SERIES_CONTROL_TABLE = {
"Model": (3, 2),
"ID": (5, 1),
"Baud_Rate": (6, 1),
"Return_Delay": (7, 1),
"Response_Status_Level": (8, 1),
"Min_Angle_Limit": (9, 2),
"Max_Angle_Limit": (11, 2),
"Max_Temperature_Limit": (13, 1),
"Max_Voltage_Limit": (14, 1),
"Min_Voltage_Limit": (15, 1),
"Max_Torque_Limit": (16, 2),
"Phase": (18, 1),
"Unloading_Condition": (19, 1),
"LED_Alarm_Condition": (20, 1),
"P_Coefficient": (21, 1),
"D_Coefficient": (22, 1),
"I_Coefficient": (23, 1),
"Minimum_Startup_Force": (24, 2),
"CW_Dead_Zone": (26, 1),
"CCW_Dead_Zone": (27, 1),
# "Protection_Current": (28, 2),
# "Angular_Resolution": (30, 1),
# "Offset": (31, 2),
# "Mode": (33, 1),
"Protective_Torque": (37, 1),
"Protection_Time": (38, 1),
"Torque_Enable": (40, 1),
"Acceleration": (41, 1),
"Goal_Position": (42, 2),
"Running_Time": (44, 2),
"Goal_Speed": (46, 2),
"Lock": (48, 1),
"Present_Position": (56, 2),
"Present_Speed": (58, 2),
"Present_Load": (60, 2),
"Present_Voltage": (62, 1),
"Present_Temperature": (63, 1),
"Sync_Write_Flag": (64, 1),
"Status": (65, 1),
"Moving": (66, 1),
# "Overload_Torque": (36, 1),
# "Speed_closed_loop_P_proportional_coefficient": (37, 1),
# "Over_Current_Protection_Time": (38, 1),
# "Velocity_closed_loop_I_integral_coefficient": (39, 1),
# "Acceleration": (41, 1),
# "Goal_Time": (44, 2),
# "Torque_Limit": (48, 2),
# "Present_Current": (69, 2),
# # Not in the Memory Table
# "Maximum_Acceleration": (85, 2),
}
STS_SERIES_BAUDRATE_TABLE = {
0: 1_000_000,
1: 500_000,
2: 250_000,
3: 128_000,
4: 115_200,
5: 57_600,
6: 38_400,
7: 19_200,
}
SCS_SERIES_BAUDRATE_TABLE = {
0: 1_000_000,
1: 500_000,
@@ -103,22 +167,31 @@ CONVERT_UINT32_TO_INT32_REQUIRED = ["Goal_Position", "Present_Position"]
MODEL_CONTROL_TABLE = {
"sts_series": STS_SERIES_CONTROL_TABLE,
"scs_series": SCS_SERIES_CONTROL_TABLE,
"sts3215": SCS_SERIES_CONTROL_TABLE,
"sts3215": STS_SERIES_CONTROL_TABLE,
"sts3250": STS_SERIES_CONTROL_TABLE,
"scs0009": SCS_SERIES_CONTROL_TABLE,
}
MODEL_RESOLUTION = {
"scs_series": 4096,
"sts_series": 4096,
"scs_series": 1024,
"sts3215": 4096,
"sts3250": 4096,
"scs0009": 1024,
}
MODEL_BAUDRATE_TABLE = {
"sts_series": STS_SERIES_BAUDRATE_TABLE,
"scs_series": SCS_SERIES_BAUDRATE_TABLE,
"sts3215": SCS_SERIES_BAUDRATE_TABLE,
"sts3215": STS_SERIES_BAUDRATE_TABLE,
"sts3250": STS_SERIES_BAUDRATE_TABLE,
"scs0009": SCS_SERIES_BAUDRATE_TABLE,
}
# High number of retries is needed for feetech compared to dynamixel motors.
NUM_READ_RETRY = 20
NUM_READ_RETRY = 50
NUM_WRITE_RETRY = 20
@@ -273,12 +346,18 @@ class FeetechMotorsBus:
self,
port: str,
motors: dict[str, tuple[int, str]],
group_sync_read: bool = True,
group_sync_write: bool = True,
protocol_version: int = 0,
extra_model_control_table: dict[str, list[tuple]] | None = None,
extra_model_resolution: dict[str, int] | None = None,
mock=False,
):
self.port = port
self.motors = motors
self.group_sync_read = group_sync_read
self.group_sync_write = group_sync_write
self.protocol_version = protocol_version
self.mock = mock
self.model_ctrl_table = deepcopy(MODEL_CONTROL_TABLE)
@@ -311,7 +390,7 @@ class FeetechMotorsBus:
import scservo_sdk as scs
self.port_handler = scs.PortHandler(self.port)
self.packet_handler = scs.PacketHandler(PROTOCOL_VERSION)
self.packet_handler = scs.PacketHandler(self.protocol_version)
try:
if not self.port_handler.openPort():
@@ -335,7 +414,7 @@ class FeetechMotorsBus:
import scservo_sdk as scs
self.port_handler = scs.PortHandler(self.port)
self.packet_handler = scs.PacketHandler(PROTOCOL_VERSION)
self.packet_handler = scs.PacketHandler(self.protocol_version)
if not self.port_handler.openPort():
raise OSError(f"Failed to open port '{self.port}'.")
@@ -661,6 +740,8 @@ class FeetechMotorsBus:
else:
import scservo_sdk as scs
scs.SCS_SETEND(self.protocol_version)
return_list = True
if not isinstance(motor_ids, list):
return_list = False
@@ -699,6 +780,8 @@ class FeetechMotorsBus:
else:
import scservo_sdk as scs
scs.SCS_SETEND(self.protocol_version)
if not self.is_connected:
raise RobotDeviceNotConnectedError(
f"FeetechMotorsBus({self.port}) is not connected. You need to run `motors_bus.connect()`."
@@ -721,31 +804,51 @@ class FeetechMotorsBus:
assert_same_address(self.model_ctrl_table, models, data_name)
addr, bytes = self.model_ctrl_table[model][data_name]
group_key = get_group_sync_key(data_name, motor_names)
if data_name not in self.group_readers:
# create new group reader
self.group_readers[group_key] = scs.GroupSyncRead(
self.port_handler, self.packet_handler, addr, bytes
)
if self.group_sync_read:
group_key = get_group_sync_key(data_name, motor_names)
if data_name not in self.group_readers:
# create new group reader
self.group_readers[group_key] = scs.GroupSyncRead(
self.port_handler, self.packet_handler, addr, bytes
)
for idx in motor_ids:
self.group_readers[group_key].addParam(idx)
for _ in range(NUM_READ_RETRY):
comm = self.group_readers[group_key].txRxPacket()
if comm == scs.COMM_SUCCESS:
break
if comm != scs.COMM_SUCCESS:
raise ConnectionError(
f"Read failed due to communication error on port {self.port} for group_key {group_key}: "
f"{self.packet_handler.getTxRxResult(comm)}"
)
values = []
for idx in motor_ids:
self.group_readers[group_key].addParam(idx)
value = self.group_readers[group_key].getData(idx, addr, bytes)
values.append(value)
else:
values = []
for idx in motor_ids:
if bytes == 1:
value, comm, error = self.packet_handler.read1ByteTxRx(self.port_handler, idx, addr)
elif bytes == 2:
value, comm, error = self.packet_handler.read2ByteTxRx(self.port_handler, idx, addr)
elif bytes == 4:
value, comm, error = self.packet_handler.read4ByteTxRx(self.port_handler, idx, addr)
else:
raise ValueError(bytes)
for _ in range(NUM_READ_RETRY):
comm = self.group_readers[group_key].txRxPacket()
if comm == scs.COMM_SUCCESS:
break
if comm != scs.COMM_SUCCESS:
raise ConnectionError(self.packet_handler.getTxRxResult(comm))
elif error != 0:
raise ConnectionError(self.packet_handler.getRxPacketError(error))
if comm != scs.COMM_SUCCESS:
raise ConnectionError(
f"Read failed due to communication error on port {self.port} for group_key {group_key}: "
f"{self.packet_handler.getTxRxResult(comm)}"
)
values = []
for idx in motor_ids:
value = self.group_readers[group_key].getData(idx, addr, bytes)
values.append(value)
values.append(value)
values = np.array(values)
@@ -775,6 +878,8 @@ class FeetechMotorsBus:
else:
import scservo_sdk as scs
scs.SCS_SETEND(self.protocol_version)
if not isinstance(motor_ids, list):
motor_ids = [motor_ids]
if not isinstance(values, list):
@@ -811,6 +916,8 @@ class FeetechMotorsBus:
else:
import scservo_sdk as scs
scs.SCS_SETEND(self.protocol_version)
if motor_names is None:
motor_names = self.motor_names
@@ -836,27 +943,31 @@ class FeetechMotorsBus:
assert_same_address(self.model_ctrl_table, models, data_name)
addr, bytes = self.model_ctrl_table[model][data_name]
group_key = get_group_sync_key(data_name, motor_names)
init_group = data_name not in self.group_readers
if init_group:
self.group_writers[group_key] = scs.GroupSyncWrite(
self.port_handler, self.packet_handler, addr, bytes
)
if self.group_sync_write:
group_key = get_group_sync_key(data_name, motor_names)
for idx, value in zip(motor_ids, values, strict=True):
data = convert_to_bytes(value, bytes, self.mock)
init_group = data_name not in self.group_readers
if init_group:
self.group_writers[group_key].addParam(idx, data)
else:
self.group_writers[group_key].changeParam(idx, data)
self.group_writers[group_key] = scs.GroupSyncWrite(
self.port_handler, self.packet_handler, addr, bytes
)
comm = self.group_writers[group_key].txPacket()
if comm != scs.COMM_SUCCESS:
raise ConnectionError(
f"Write failed due to communication error on port {self.port} for group_key {group_key}: "
f"{self.packet_handler.getTxRxResult(comm)}"
)
for idx, value in zip(motor_ids, values, strict=True):
data = convert_to_bytes(value, bytes, self.mock)
if init_group:
self.group_writers[group_key].addParam(idx, data)
else:
self.group_writers[group_key].changeParam(idx, data)
comm = self.group_writers[group_key].txPacket()
if comm != scs.COMM_SUCCESS:
raise ConnectionError(
f"Write failed due to communication error on port {self.port} for group_key {group_key}: "
f"{self.packet_handler.getTxRxResult(comm)}"
)
else:
raise NotImplementedError()
# log the number of seconds it took to write the data to the motors
delta_ts_name = get_log_name("delta_timestamp_s", "write", data_name, motor_names)

View File

@@ -306,16 +306,16 @@ def run_arm_auto_calibration_moss(arm: MotorsBus, robot_type: str, arm_name: str
calib = {}
print("Calibrate shoulder_pan")
calib["shoulder_pan"] = move_to_calibrate(arm, "shoulder_pan", load_threshold=350, count_threshold=200)
calib["shoulder_pan"] = move_to_calibrate(arm, "shoulder_pan")
arm.write("Goal_Position", calib["shoulder_pan"]["zero_pos"], "shoulder_pan")
time.sleep(1)
print("Calibrate gripper")
calib["gripper"] = move_to_calibrate(arm, "gripper", invert_drive_mode=True, count_threshold=200)
calib["gripper"] = move_to_calibrate(arm, "gripper", invert_drive_mode=True)
time.sleep(1)
print("Calibrate wrist_flex")
calib["wrist_flex"] = move_to_calibrate(arm, "wrist_flex", invert_drive_mode=True, count_threshold=200)
calib["wrist_flex"] = move_to_calibrate(arm, "wrist_flex", invert_drive_mode=True)
calib["wrist_flex"] = apply_offset(calib["wrist_flex"], offset=-210 + 1024)
wr_pos = arm.read("Present_Position", "wrist_roll")
@@ -329,7 +329,7 @@ def run_arm_auto_calibration_moss(arm: MotorsBus, robot_type: str, arm_name: str
time.sleep(1)
print("Calibrate wrist_roll")
calib["wrist_roll"] = move_to_calibrate(arm, "wrist_roll", invert_drive_mode=True, count_threshold=200)
calib["wrist_roll"] = move_to_calibrate(arm, "wrist_roll", invert_drive_mode=True)
calib["wrist_roll"] = apply_offset(calib["wrist_roll"], offset=790)
arm.write("Goal_Position", calib["wrist_roll"]["zero_pos"] - 1024, "wrist_roll")
@@ -348,7 +348,6 @@ def run_arm_auto_calibration_moss(arm: MotorsBus, robot_type: str, arm_name: str
arm,
"elbow_flex",
invert_drive_mode=True,
count_threshold=200,
in_between_move_hook=in_between_move_elbow_flex_hook,
)
arm.write("Goal_Position", calib["wrist_flex"]["zero_pos"] - 1024, "wrist_flex")

View File

@@ -0,0 +1,73 @@
# [SO-100 robot arm](https://github.com/TheRobotStudio/SO-ARM100)
# Requires installing extras packages
# With pip: `pip install -e ".[feetech]"`
# With poetry: `poetry install --sync --extras "feetech"`
# See [tutorial](https://github.com/huggingface/lerobot/blob/main/examples/10_use_so100.md)
_target_: lerobot.common.robot_devices.robots.manipulator.ManipulatorRobot
robot_type: hopejr
calibration_dir: .cache/calibration/hopejr
# `max_relative_target` limits the magnitude of the relative positional target vector for safety purposes.
# Set this to a positive scalar to have the same value for all motors, or a list that is the same length as
# the number of motors in your follower arms.
max_relative_target: null
# leader_arms:
# main:
# _target_: lerobot.common.robot_devices.motors.feetech.FeetechMotorsBus
# port: /dev/tty.usbmodem585A0077581
# motors:
# # name: (index, model)
# shoulder_pan: [1, "sts3215"]
# shoulder_lift: [2, "sts3215"]
# elbow_flex: [3, "sts3215"]
# wrist_flex: [4, "sts3215"]
# wrist_roll: [5, "sts3215"]
# gripper: [6, "sts3215"]
follower_arms:
main:
_target_: lerobot.common.robot_devices.motors.feetech.FeetechMotorsBus
port: /dev/tty.usbserial-2130
motors:
# name: (index, model)
shoulder_pitch: [1, "sts3250"]
shoulder_yaw: [2, "sts3215"] # TODO: sts3250
shoulder_roll: [3, "sts3215"] # TODO: sts3250
elbow_flex: [4, "sts3250"]
wrist_roll: [5, "sts3215"]
wrist_yaw: [6, "sts3215"]
wrist_pitch: [7, "sts3215"]
thumb_basel_rotation: [30, "scs0009"]
thumb_flexion: [27, "scs0009"]
thumb_pinky_side: [26, "scs0009"]
thumb_thumb_side: [28, "scs0009"]
index_flexor: [25, "scs0009"]
index_pinky_side: [31, "scs0009"]
index_thumb_side: [32, "scs0009"]
middle_flexor: [24, "scs0009"]
middle_pinky_side: [33, "scs0009"]
middle_thumb_side: [34, "scs0009"]
ring_flexor: [21, "scs0009"]
ring_pinky_side: [36, "scs0009"]
ring_thumb_side: [35, "scs0009"]
pinky_flexor: [23, "scs0009"]
pinky_pinky_side: [38, "scs0009"]
pinky_thumb_side: [37, "scs0009"]
cameras:
laptop:
_target_: lerobot.common.robot_devices.cameras.opencv.OpenCVCamera
camera_index: 0
fps: 30
width: 640
height: 480
phone:
_target_: lerobot.common.robot_devices.cameras.opencv.OpenCVCamera
camera_index: 1
fps: 30
width: 640
height: 480

View File

@@ -18,6 +18,10 @@ def convert_to_bytes(value, bytes):
return value
def SCS_SETEND(protocol_version):
del protocol_version
def get_default_motor_values(motor_index):
return {
# Key (int) are from SCS_SERIES_CONTROL_TABLE