fix(lerobot/common/robot_devices): remove lint warnings/errors

This commit is contained in:
Steven Palma
2025-03-07 14:59:08 +01:00
parent 85214ec303
commit 652fedf69c
8 changed files with 81 additions and 58 deletions

View File

@@ -77,9 +77,9 @@ def save_image(img_array, serial_number, frame_index, images_dir):
path = images_dir / f"camera_{serial_number}_frame_{frame_index:06d}.png"
path.parent.mkdir(parents=True, exist_ok=True)
img.save(str(path), quality=100)
logging.info(f"Saved image: {path}")
logging.info("Saved image: %s", path)
except Exception as e:
logging.error(f"Failed to save image for camera {serial_number} frame {frame_index}: {e}")
logging.error("Failed to save image for camera %s frame %s: %s", serial_number, frame_index, e)
def save_images_from_cameras(
@@ -447,7 +447,7 @@ class IntelRealSenseCamera:
num_tries += 1
time.sleep(1 / self.fps)
if num_tries > self.fps and (self.thread.ident is None or not self.thread.is_alive()):
raise Exception(
raise TimeoutError(
"The thread responsible for `self.async_read()` took too much time to start. There might be an issue. Verify that `self.thread.start()` has been called."
)

View File

@@ -45,7 +45,7 @@ from lerobot.common.utils.utils import capture_timestamp_utc
MAX_OPENCV_INDEX = 60
def find_cameras(raise_when_empty=False, max_index_search_range=MAX_OPENCV_INDEX, mock=False) -> list[dict]:
def find_cameras(max_index_search_range=MAX_OPENCV_INDEX, mock=False) -> list[dict]:
cameras = []
if platform.system() == "Linux":
print("Linux detected. Finding available camera indices through scanning '/dev/video*' ports")

View File

@@ -169,7 +169,8 @@ def convert_degrees_to_steps(degrees: float | np.ndarray, models: str | list[str
return steps
def convert_to_bytes(value, bytes, mock=False):
# TODO(Steven): Similar function in feetch.py, should be moved to a common place.
def convert_to_bytes(value, byte, mock=False):
if mock:
return value
@@ -177,16 +178,16 @@ def convert_to_bytes(value, bytes, mock=False):
# Note: No need to convert back into unsigned int, since this byte preprocessing
# already handles it for us.
if bytes == 1:
if byte == 1:
data = [
dxl.DXL_LOBYTE(dxl.DXL_LOWORD(value)),
]
elif bytes == 2:
elif byte == 2:
data = [
dxl.DXL_LOBYTE(dxl.DXL_LOWORD(value)),
dxl.DXL_HIBYTE(dxl.DXL_LOWORD(value)),
]
elif bytes == 4:
elif byte == 4:
data = [
dxl.DXL_LOBYTE(dxl.DXL_LOWORD(value)),
dxl.DXL_HIBYTE(dxl.DXL_LOWORD(value)),
@@ -196,7 +197,7 @@ def convert_to_bytes(value, bytes, mock=False):
else:
raise NotImplementedError(
f"Value of the number of bytes to be sent is expected to be in [1, 2, 4], but "
f"{bytes} is provided instead."
f"{byte} is provided instead."
)
return data
@@ -228,9 +229,9 @@ def assert_same_address(model_ctrl_table, motor_models, data_name):
all_addr = []
all_bytes = []
for model in motor_models:
addr, bytes = model_ctrl_table[model][data_name]
addr, byte = model_ctrl_table[model][data_name]
all_addr.append(addr)
all_bytes.append(bytes)
all_bytes.append(byte)
if len(set(all_addr)) != 1:
raise NotImplementedError(
@@ -576,6 +577,8 @@ class DynamixelMotorsBus:
# (start_pos - values[i]) / resolution <= factor <= (end_pos - values[i]) / resolution
low_factor = (start_pos - values[i]) / resolution
upp_factor = (end_pos - values[i]) / resolution
else:
raise ValueError(f"Unknown calibration mode '{calib_mode}'.")
if not in_range:
# Get first integer between the two bounds
@@ -596,10 +599,15 @@ class DynamixelMotorsBus:
elif CalibrationMode[calib_mode] == CalibrationMode.LINEAR:
out_of_range_str = f"{LOWER_BOUND_LINEAR} < {calib_val} < {UPPER_BOUND_LINEAR} %"
in_range_str = f"{LOWER_BOUND_LINEAR} < {calib_val} < {UPPER_BOUND_LINEAR} %"
else:
raise ValueError(f"Unknown calibration mode '{calib_mode}'.")
logging.warning(
f"Auto-correct calibration of motor '{name}' by shifting value by {abs(factor)} full turns, "
f"from '{out_of_range_str}' to '{in_range_str}'."
"Auto-correct calibration of motor '%s' by shifting value by {abs(factor)} full turns, "
"from '%s' to '%s'.",
name,
out_of_range_str,
in_range_str,
)
# A full turn corresponds to 360 degrees but also to 4096 steps for a motor resolution of 4096.
@@ -656,8 +664,8 @@ class DynamixelMotorsBus:
motor_ids = [motor_ids]
assert_same_address(self.model_ctrl_table, self.motor_models, data_name)
addr, bytes = self.model_ctrl_table[motor_models[0]][data_name]
group = dxl.GroupSyncRead(self.port_handler, self.packet_handler, addr, bytes)
addr, byte = self.model_ctrl_table[motor_models[0]][data_name]
group = dxl.GroupSyncRead(self.port_handler, self.packet_handler, addr, byte)
for idx in motor_ids:
group.addParam(idx)
@@ -674,7 +682,7 @@ class DynamixelMotorsBus:
values = []
for idx in motor_ids:
value = group.getData(idx, addr, bytes)
value = group.getData(idx, addr, byte)
values.append(value)
if return_list:
@@ -709,13 +717,13 @@ class DynamixelMotorsBus:
models.append(model)
assert_same_address(self.model_ctrl_table, models, data_name)
addr, bytes = self.model_ctrl_table[model][data_name]
addr, byte = self.model_ctrl_table[model][data_name]
group_key = get_group_sync_key(data_name, motor_names)
if data_name not in self.group_readers:
# create new group reader
self.group_readers[group_key] = dxl.GroupSyncRead(
self.port_handler, self.packet_handler, addr, bytes
self.port_handler, self.packet_handler, addr, byte
)
for idx in motor_ids:
self.group_readers[group_key].addParam(idx)
@@ -733,7 +741,7 @@ class DynamixelMotorsBus:
values = []
for idx in motor_ids:
value = self.group_readers[group_key].getData(idx, addr, bytes)
value = self.group_readers[group_key].getData(idx, addr, byte)
values.append(value)
values = np.array(values)
@@ -767,10 +775,10 @@ class DynamixelMotorsBus:
values = [values]
assert_same_address(self.model_ctrl_table, motor_models, data_name)
addr, bytes = self.model_ctrl_table[motor_models[0]][data_name]
group = dxl.GroupSyncWrite(self.port_handler, self.packet_handler, addr, bytes)
addr, byte = self.model_ctrl_table[motor_models[0]][data_name]
group = dxl.GroupSyncWrite(self.port_handler, self.packet_handler, addr, byte)
for idx, value in zip(motor_ids, values, strict=True):
data = convert_to_bytes(value, bytes, self.mock)
data = convert_to_bytes(value, byte, self.mock)
group.addParam(idx, data)
for _ in range(num_retry):
@@ -821,17 +829,17 @@ class DynamixelMotorsBus:
values = values.tolist()
assert_same_address(self.model_ctrl_table, models, data_name)
addr, bytes = self.model_ctrl_table[model][data_name]
addr, byte = self.model_ctrl_table[model][data_name]
group_key = get_group_sync_key(data_name, motor_names)
init_group = data_name not in self.group_readers
if init_group:
self.group_writers[group_key] = dxl.GroupSyncWrite(
self.port_handler, self.packet_handler, addr, bytes
self.port_handler, self.packet_handler, addr, byte
)
for idx, value in zip(motor_ids, values, strict=True):
data = convert_to_bytes(value, bytes, self.mock)
data = convert_to_bytes(value, byte, self.mock)
if init_group:
self.group_writers[group_key].addParam(idx, data)
else:

View File

@@ -148,7 +148,7 @@ def convert_degrees_to_steps(degrees: float | np.ndarray, models: str | list[str
return steps
def convert_to_bytes(value, bytes, mock=False):
def convert_to_bytes(value, byte, mock=False):
if mock:
return value
@@ -156,16 +156,16 @@ def convert_to_bytes(value, bytes, mock=False):
# Note: No need to convert back into unsigned int, since this byte preprocessing
# already handles it for us.
if bytes == 1:
if byte == 1:
data = [
scs.SCS_LOBYTE(scs.SCS_LOWORD(value)),
]
elif bytes == 2:
elif byte == 2:
data = [
scs.SCS_LOBYTE(scs.SCS_LOWORD(value)),
scs.SCS_HIBYTE(scs.SCS_LOWORD(value)),
]
elif bytes == 4:
elif byte == 4:
data = [
scs.SCS_LOBYTE(scs.SCS_LOWORD(value)),
scs.SCS_HIBYTE(scs.SCS_LOWORD(value)),
@@ -175,7 +175,7 @@ def convert_to_bytes(value, bytes, mock=False):
else:
raise NotImplementedError(
f"Value of the number of bytes to be sent is expected to be in [1, 2, 4], but "
f"{bytes} is provided instead."
f"{byte} is provided instead."
)
return data
@@ -207,9 +207,9 @@ def assert_same_address(model_ctrl_table, motor_models, data_name):
all_addr = []
all_bytes = []
for model in motor_models:
addr, bytes = model_ctrl_table[model][data_name]
addr, byte = model_ctrl_table[model][data_name]
all_addr.append(addr)
all_bytes.append(bytes)
all_bytes.append(byte)
if len(set(all_addr)) != 1:
raise NotImplementedError(
@@ -557,6 +557,8 @@ class FeetechMotorsBus:
# (start_pos - values[i]) / resolution <= factor <= (end_pos - values[i]) / resolution
low_factor = (start_pos - values[i]) / resolution
upp_factor = (end_pos - values[i]) / resolution
else:
raise ValueError(f"Unknown calibration mode {calib_mode}")
if not in_range:
# Get first integer between the two bounds
@@ -577,10 +579,16 @@ class FeetechMotorsBus:
elif CalibrationMode[calib_mode] == CalibrationMode.LINEAR:
out_of_range_str = f"{LOWER_BOUND_LINEAR} < {calib_val} < {UPPER_BOUND_LINEAR} %"
in_range_str = f"{LOWER_BOUND_LINEAR} < {calib_val} < {UPPER_BOUND_LINEAR} %"
else:
raise ValueError(f"Unknown calibration mode {calib_mode}")
logging.warning(
f"Auto-correct calibration of motor '{name}' by shifting value by {abs(factor)} full turns, "
f"from '{out_of_range_str}' to '{in_range_str}'."
"Auto-correct calibration of motor '%s' by shifting value by %s full turns, "
"from '%s' to '%s'.",
name,
abs(factor),
out_of_range_str,
in_range_str,
)
# A full turn corresponds to 360 degrees but also to 4096 steps for a motor resolution of 4096.
@@ -674,8 +682,8 @@ class FeetechMotorsBus:
motor_ids = [motor_ids]
assert_same_address(self.model_ctrl_table, self.motor_models, data_name)
addr, bytes = self.model_ctrl_table[motor_models[0]][data_name]
group = scs.GroupSyncRead(self.port_handler, self.packet_handler, addr, bytes)
addr, byte = self.model_ctrl_table[motor_models[0]][data_name]
group = scs.GroupSyncRead(self.port_handler, self.packet_handler, addr, byte)
for idx in motor_ids:
group.addParam(idx)
@@ -692,7 +700,7 @@ class FeetechMotorsBus:
values = []
for idx in motor_ids:
value = group.getData(idx, addr, bytes)
value = group.getData(idx, addr, byte)
values.append(value)
if return_list:
@@ -727,7 +735,7 @@ class FeetechMotorsBus:
models.append(model)
assert_same_address(self.model_ctrl_table, models, data_name)
addr, bytes = self.model_ctrl_table[model][data_name]
addr, byte = self.model_ctrl_table[model][data_name]
group_key = get_group_sync_key(data_name, motor_names)
if data_name not in self.group_readers:
@@ -737,7 +745,7 @@ class FeetechMotorsBus:
# create new group reader
self.group_readers[group_key] = scs.GroupSyncRead(
self.port_handler, self.packet_handler, addr, bytes
self.port_handler, self.packet_handler, addr, byte
)
for idx in motor_ids:
self.group_readers[group_key].addParam(idx)
@@ -755,7 +763,7 @@ class FeetechMotorsBus:
values = []
for idx in motor_ids:
value = self.group_readers[group_key].getData(idx, addr, bytes)
value = self.group_readers[group_key].getData(idx, addr, byte)
values.append(value)
values = np.array(values)
@@ -792,10 +800,10 @@ class FeetechMotorsBus:
values = [values]
assert_same_address(self.model_ctrl_table, motor_models, data_name)
addr, bytes = self.model_ctrl_table[motor_models[0]][data_name]
group = scs.GroupSyncWrite(self.port_handler, self.packet_handler, addr, bytes)
addr, byte = self.model_ctrl_table[motor_models[0]][data_name]
group = scs.GroupSyncWrite(self.port_handler, self.packet_handler, addr, byte)
for idx, value in zip(motor_ids, values, strict=True):
data = convert_to_bytes(value, bytes, self.mock)
data = convert_to_bytes(value, byte, self.mock)
group.addParam(idx, data)
for _ in range(num_retry):
@@ -846,17 +854,17 @@ class FeetechMotorsBus:
values = values.tolist()
assert_same_address(self.model_ctrl_table, models, data_name)
addr, bytes = self.model_ctrl_table[model][data_name]
addr, byte = self.model_ctrl_table[model][data_name]
group_key = get_group_sync_key(data_name, motor_names)
init_group = data_name not in self.group_readers
if init_group:
self.group_writers[group_key] = scs.GroupSyncWrite(
self.port_handler, self.packet_handler, addr, bytes
self.port_handler, self.packet_handler, addr, byte
)
for idx, value in zip(motor_ids, values, strict=True):
data = convert_to_bytes(value, bytes, self.mock)
data = convert_to_bytes(value, byte, self.mock)
if init_group:
self.group_writers[group_key].addParam(idx, data)
else:

View File

@@ -95,6 +95,8 @@ def move_to_calibrate(
while_move_hook=None,
):
initial_pos = arm.read("Present_Position", motor_name)
p_present_pos = None
n_present_pos = None
if positive_first:
p_present_pos = move_until_block(
@@ -196,7 +198,7 @@ def run_arm_auto_calibration_so100(arm: MotorsBus, robot_type: str, arm_name: st
calib["wrist_flex"] = move_to_calibrate(arm, "wrist_flex")
calib["wrist_flex"] = apply_offset(calib["wrist_flex"], offset=80)
def in_between_move_hook():
def in_between_move_hook_elbow():
nonlocal arm, calib
time.sleep(2)
ef_pos = arm.read("Present_Position", "elbow_flex")
@@ -207,14 +209,14 @@ def run_arm_auto_calibration_so100(arm: MotorsBus, robot_type: str, arm_name: st
print("Calibrate elbow_flex")
calib["elbow_flex"] = move_to_calibrate(
arm, "elbow_flex", positive_first=False, in_between_move_hook=in_between_move_hook
arm, "elbow_flex", positive_first=False, in_between_move_hook=in_between_move_hook_elbow
)
calib["elbow_flex"] = apply_offset(calib["elbow_flex"], offset=80 - 1024)
arm.write("Goal_Position", calib["elbow_flex"]["zero_pos"] + 1024 + 512, "elbow_flex")
time.sleep(1)
def in_between_move_hook():
def in_between_move_hook_shoulder():
nonlocal arm, calib
arm.write("Goal_Position", calib["elbow_flex"]["zero_pos"], "elbow_flex")
@@ -224,7 +226,7 @@ def run_arm_auto_calibration_so100(arm: MotorsBus, robot_type: str, arm_name: st
"shoulder_lift",
invert_drive_mode=True,
positive_first=False,
in_between_move_hook=in_between_move_hook,
in_between_move_hook=in_between_move_hook_shoulder,
)
# add an 30 steps as offset to align with body
calib["shoulder_lift"] = apply_offset(calib["shoulder_lift"], offset=1024 - 50)

View File

@@ -67,14 +67,14 @@ def calibrate_follower_arm(motors_bus, calib_dir_str):
return
if calib_file.exists():
with open(calib_file) as f:
with open(calib_file, encoding="utf-8") as f:
calibration = json.load(f)
print(f"[INFO] Loaded calibration from {calib_file}")
else:
print("[INFO] Calibration file not found. Running manual calibration...")
calibration = run_arm_manual_calibration(motors_bus, "lekiwi", "follower_arm", "follower")
print(f"[INFO] Calibration complete. Saving to {calib_file}")
with open(calib_file, "w") as f:
with open(calib_file, "w", encoding="utf-8") as f:
json.dump(calibration, f)
try:
motors_bus.set_calibration(calibration)

View File

@@ -47,8 +47,10 @@ def ensure_safe_goal_position(
if not torch.allclose(goal_pos, safe_goal_pos):
logging.warning(
"Relative goal position magnitude had to be clamped to be safe.\n"
f" requested relative goal position target: {diff}\n"
f" clamped relative goal position target: {safe_diff}"
" requested relative goal position target: %s\n"
" clamped relative goal position target: %s",
diff,
safe_diff,
)
return safe_goal_pos
@@ -245,6 +247,8 @@ class ManipulatorRobot:
from lerobot.common.robot_devices.motors.dynamixel import TorqueMode
elif self.robot_type in ["so100", "moss", "lekiwi"]:
from lerobot.common.robot_devices.motors.feetech import TorqueMode
else:
raise NotImplementedError(f"Robot type {self.robot_type} is not supported")
# We assume that at connection time, arms are in a rest position, and torque can
# be safely disabled to run calibration and/or set robot preset configurations.
@@ -302,7 +306,7 @@ class ManipulatorRobot:
arm_calib_path = self.calibration_dir / f"{arm_id}.json"
if arm_calib_path.exists():
with open(arm_calib_path) as f:
with open(arm_calib_path, encoding="utf-8") as f:
calibration = json.load(f)
else:
# TODO(rcadene): display a warning in __init__ if calibration file not available
@@ -322,7 +326,7 @@ class ManipulatorRobot:
print(f"Calibration is done! Saving calibration file '{arm_calib_path}'")
arm_calib_path.parent.mkdir(parents=True, exist_ok=True)
with open(arm_calib_path, "w") as f:
with open(arm_calib_path, "w", encoding="utf-8") as f:
json.dump(calibration, f)
return calibration

View File

@@ -262,14 +262,14 @@ class MobileManipulator:
arm_calib_path = self.calibration_dir / f"{arm_id}.json"
if arm_calib_path.exists():
with open(arm_calib_path) as f:
with open(arm_calib_path, encoding="utf-8") as f:
calibration = json.load(f)
else:
print(f"Missing calibration file '{arm_calib_path}'")
calibration = run_arm_manual_calibration(arm, self.robot_type, name, arm_type)
print(f"Calibration is done! Saving calibration file '{arm_calib_path}'")
arm_calib_path.parent.mkdir(parents=True, exist_ok=True)
with open(arm_calib_path, "w") as f:
with open(arm_calib_path, "w", encoding="utf-8") as f:
json.dump(calibration, f)
return calibration
@@ -372,6 +372,7 @@ class MobileManipulator:
present_speed = self.last_present_speed
# TODO(Steven): [WARN] Plenty of general exceptions
except Exception as e:
print(f"[DEBUG] Error decoding video message: {e}")
# If decode fails, fall back to old data