[pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci
This commit is contained in:
pre-commit-ci[bot]
2025-03-24 13:16:38 +00:00
committed by Michel Aractingi
parent cdcf346061
commit 1c8daf11fd
95 changed files with 1592 additions and 491 deletions

View File

@@ -69,9 +69,13 @@ class ManipulatorRobotConfig(RobotConfig):
if not cam.mock:
cam.mock = True
if self.max_relative_target is not None and isinstance(self.max_relative_target, Sequence):
if self.max_relative_target is not None and isinstance(
self.max_relative_target, Sequence
):
for name in self.follower_arms:
if len(self.follower_arms[name].motors) != len(self.max_relative_target):
if len(self.follower_arms[name].motors) != len(
self.max_relative_target
):
raise ValueError(
f"len(max_relative_target)={len(self.max_relative_target)} but the follower arm with name {name} has "
f"{len(self.follower_arms[name].motors)} motors. Please make sure that the "

View File

@@ -42,7 +42,9 @@ def run_camera_capture(cameras, images_lock, latest_images_dict, stop_event):
local_dict = {}
for name, cam in cameras.items():
frame = cam.async_read()
ret, buffer = cv2.imencode(".jpg", frame, [int(cv2.IMWRITE_JPEG_QUALITY), 90])
ret, buffer = cv2.imencode(
".jpg", frame, [int(cv2.IMWRITE_JPEG_QUALITY), 90]
)
if ret:
local_dict[name] = base64.b64encode(buffer).decode("utf-8")
else:
@@ -61,7 +63,9 @@ def calibrate_follower_arm(motors_bus, calib_dir_str):
calib_dir.mkdir(parents=True, exist_ok=True)
calib_file = calib_dir / "main_follower.json"
try:
from lerobot.common.robot_devices.robots.feetech_calibration import run_arm_manual_calibration
from lerobot.common.robot_devices.robots.feetech_calibration import (
run_arm_manual_calibration,
)
except ImportError:
print("[WARNING] Calibration function not available. Skipping calibration.")
return
@@ -72,7 +76,9 @@ def calibrate_follower_arm(motors_bus, calib_dir_str):
print(f"[INFO] Loaded calibration from {calib_file}")
else:
print("[INFO] Calibration file not found. Running manual calibration...")
calibration = run_arm_manual_calibration(motors_bus, "lekiwi", "follower_arm", "follower")
calibration = run_arm_manual_calibration(
motors_bus, "lekiwi", "follower_arm", "follower"
)
print(f"[INFO] Calibration complete. Saving to {calib_file}")
with open(calib_file, "w") as f:
json.dump(calibration, f)
@@ -116,7 +122,14 @@ def run_lekiwi(robot_config):
robot = LeKiwi(motors_bus)
# Define the expected arm motor IDs.
arm_motor_ids = ["shoulder_pan", "shoulder_lift", "elbow_flex", "wrist_flex", "wrist_roll", "gripper"]
arm_motor_ids = [
"shoulder_pan",
"shoulder_lift",
"elbow_flex",
"wrist_flex",
"wrist_roll",
"gripper",
]
# Disable torque for each arm motor.
for motor in arm_motor_ids:
@@ -130,7 +143,9 @@ def run_lekiwi(robot_config):
images_lock = threading.Lock()
stop_event = threading.Event()
cam_thread = threading.Thread(
target=run_camera_capture, args=(cameras, images_lock, latest_images_dict, stop_event), daemon=True
target=run_camera_capture,
args=(cameras, images_lock, latest_images_dict, stop_event),
daemon=True,
)
cam_thread.start()
@@ -159,7 +174,9 @@ def run_lekiwi(robot_config):
f"[WARNING] Received {len(arm_positions)} arm positions, expected {len(arm_motor_ids)}"
)
else:
for motor, pos in zip(arm_motor_ids, arm_positions, strict=False):
for motor, pos in zip(
arm_motor_ids, arm_positions, strict=False
):
motors_bus.write("Goal_Position", pos, motor)
# Process wheel (base) commands.
if "raw_velocity" in data:
@@ -190,7 +207,9 @@ def run_lekiwi(robot_config):
try:
pos = motors_bus.read("Present_Position", motor)
# Convert the position to a float (or use as is if already numeric).
follower_arm_state.append(float(pos) if not isinstance(pos, (int, float)) else pos)
follower_arm_state.append(
float(pos) if not isinstance(pos, (int, float)) else pos
)
except Exception as e:
print(f"[ERROR] Reading motor {motor} failed: {e}")

View File

@@ -28,7 +28,10 @@ import numpy as np
import torch
from lerobot.common.robot_devices.cameras.utils import make_cameras_from_configs
from lerobot.common.robot_devices.motors.utils import MotorsBus, make_motors_buses_from_configs
from lerobot.common.robot_devices.motors.utils import (
MotorsBus,
make_motors_buses_from_configs,
)
from lerobot.common.robot_devices.robots.configs import ManipulatorRobotConfig
from lerobot.common.robot_devices.robots.utils import get_arm_id
from lerobot.common.robot_devices.utils import (

View File

@@ -25,9 +25,14 @@ import zmq
from lerobot.common.robot_devices.cameras.utils import make_cameras_from_configs
from lerobot.common.robot_devices.motors.feetech import TorqueMode
from lerobot.common.robot_devices.motors.utils import MotorsBus, make_motors_buses_from_configs
from lerobot.common.robot_devices.motors.utils import (
MotorsBus,
make_motors_buses_from_configs,
)
from lerobot.common.robot_devices.robots.configs import LeKiwiRobotConfig
from lerobot.common.robot_devices.robots.feetech_calibration import run_arm_manual_calibration
from lerobot.common.robot_devices.robots.feetech_calibration import (
run_arm_manual_calibration,
)
from lerobot.common.robot_devices.robots.utils import get_arm_id
from lerobot.common.robot_devices.utils import RobotDeviceNotConnectedError
@@ -266,7 +271,9 @@ class MobileManipulator:
calibration = json.load(f)
else:
print(f"Missing calibration file '{arm_calib_path}'")
calibration = run_arm_manual_calibration(arm, self.robot_type, name, arm_type)
calibration = run_arm_manual_calibration(
arm, self.robot_type, name, arm_type
)
print(f"Calibration is done! Saving calibration file '{arm_calib_path}'")
arm_calib_path.parent.mkdir(parents=True, exist_ok=True)
with open(arm_calib_path, "w") as f:
@@ -296,7 +303,9 @@ class MobileManipulator:
bus.write("Torque_Enable", 0, motor_id)
# Then filter out wheels
arm_only_dict = {k: v for k, v in bus.motors.items() if not k.startswith("wheel_")}
arm_only_dict = {
k: v for k, v in bus.motors.items() if not k.startswith("wheel_")
}
if not arm_only_dict:
continue
@@ -324,7 +333,11 @@ class MobileManipulator:
socks = dict(poller.poll(15))
if self.video_socket not in socks or socks[self.video_socket] != zmq.POLLIN:
# No new data arrived → reuse ALL old data
return (self.last_frames, self.last_present_speed, self.last_remote_arm_state)
return (
self.last_frames,
self.last_present_speed,
self.last_remote_arm_state,
)
# Drain all messages, keep only the last
last_msg = None
@@ -337,7 +350,11 @@ class MobileManipulator:
if not last_msg:
# No new message → also reuse old
return (self.last_frames, self.last_present_speed, self.last_remote_arm_state)
return (
self.last_frames,
self.last_present_speed,
self.last_remote_arm_state,
)
# Decode only the final message
try:
@@ -360,7 +377,9 @@ class MobileManipulator:
if new_arm_state is not None and frames is not None:
self.last_frames = frames
remote_arm_state_tensor = torch.tensor(new_arm_state, dtype=torch.float32)
remote_arm_state_tensor = torch.tensor(
new_arm_state, dtype=torch.float32
)
self.last_remote_arm_state = remote_arm_state_tensor
present_speed = new_speed
@@ -375,14 +394,21 @@ class MobileManipulator:
except Exception as e:
print(f"[DEBUG] Error decoding video message: {e}")
# If decode fails, fall back to old data
return (self.last_frames, self.last_present_speed, self.last_remote_arm_state)
return (
self.last_frames,
self.last_present_speed,
self.last_remote_arm_state,
)
return frames, present_speed, remote_arm_state_tensor
def _process_present_speed(self, present_speed: dict) -> torch.Tensor:
state_tensor = torch.zeros(3, dtype=torch.int32)
if present_speed:
decoded = {key: MobileManipulator.raw_to_degps(value) for key, value in present_speed.items()}
decoded = {
key: MobileManipulator.raw_to_degps(value)
for key, value in present_speed.items()
}
if "1" in decoded:
state_tensor[0] = decoded["1"]
if "2" in decoded:
@@ -395,7 +421,9 @@ class MobileManipulator:
self, record_data: bool = False
) -> None | tuple[dict[str, torch.Tensor], dict[str, torch.Tensor]]:
if not self.is_connected:
raise RobotDeviceNotConnectedError("MobileManipulator is not connected. Run `connect()` first.")
raise RobotDeviceNotConnectedError(
"MobileManipulator is not connected. Run `connect()` first."
)
speed_setting = self.speed_levels[self.speed_index]
xy_speed = speed_setting["xy"] # e.g. 0.1, 0.25, or 0.4
@@ -461,9 +489,15 @@ class MobileManipulator:
body_state = self.wheel_raw_to_body(present_speed)
body_state_mm = (body_state[0] * 1000.0, body_state[1] * 1000.0, body_state[2]) # Convert x,y to mm/s
body_state_mm = (
body_state[0] * 1000.0,
body_state[1] * 1000.0,
body_state[2],
) # Convert x,y to mm/s
wheel_state_tensor = torch.tensor(body_state_mm, dtype=torch.float32)
combined_state_tensor = torch.cat((remote_arm_state_tensor, wheel_state_tensor), dim=0)
combined_state_tensor = torch.cat(
(remote_arm_state_tensor, wheel_state_tensor), dim=0
)
obs_dict = {"observation.state": combined_state_tensor}
@@ -620,7 +654,11 @@ class MobileManipulator:
# Convert each wheels angular speed (deg/s) to a raw integer.
wheel_raw = [MobileManipulator.degps_to_raw(deg) for deg in wheel_degps]
return {"left_wheel": wheel_raw[0], "back_wheel": wheel_raw[1], "right_wheel": wheel_raw[2]}
return {
"left_wheel": wheel_raw[0],
"back_wheel": wheel_raw[1],
"right_wheel": wheel_raw[2],
}
def wheel_raw_to_body(
self, wheel_raw: dict, wheel_radius: float = 0.05, base_radius: float = 0.125

View File

@@ -72,7 +72,9 @@ def make_robot_from_config(config: RobotConfig):
return ManipulatorRobot(config)
elif isinstance(config, LeKiwiRobotConfig):
from lerobot.common.robot_devices.robots.mobile_manipulator import MobileManipulator
from lerobot.common.robot_devices.robots.mobile_manipulator import (
MobileManipulator,
)
return MobileManipulator(config)
else: