Files
lerobot/tests/utils.py
2024-09-26 14:35:17 +02:00

385 lines
13 KiB
Python

#!/usr/bin/env python
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import platform
import traceback
from functools import wraps
import pytest
import torch
from lerobot import available_cameras, available_motors, available_robots
from lerobot.common.robot_devices.cameras.utils import Camera
from lerobot.common.robot_devices.motors.utils import MotorsBus
from lerobot.common.robot_devices.robots.factory import make_robot as make_robot_from_cfg
from lerobot.common.robot_devices.robots.utils import Robot
from lerobot.common.utils.import_utils import is_package_available
from lerobot.common.utils.utils import init_hydra_config
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
# Pass this as the first argument to init_hydra_config.
DEFAULT_CONFIG_PATH = "lerobot/configs/default.yaml"
ROBOT_CONFIG_PATH_TEMPLATE = "lerobot/configs/robot/{robot}.yaml"
TEST_ROBOT_TYPES = []
for robot_type in available_robots:
TEST_ROBOT_TYPES += [(robot_type, True), (robot_type, False)]
TEST_CAMERA_TYPES = []
for camera_type in available_cameras:
TEST_CAMERA_TYPES += [(camera_type, True), (camera_type, False)]
TEST_MOTOR_TYPES = []
for motor_type in available_motors:
TEST_MOTOR_TYPES += [(motor_type, True), (motor_type, False)]
# Camera indices used for connecting physical cameras
OPENCV_CAMERA_INDEX = int(os.environ.get("LEROBOT_TEST_OPENCV_CAMERA_INDEX", 0))
INTELREALSENSE_CAMERA_INDEX = int(os.environ.get("LEROBOT_TEST_INTELREALSENSE_CAMERA_INDEX", 128422271614))
DYNAMIXEL_PORT = "/dev/tty.usbmodem575E0032081"
DYNAMIXEL_MOTORS = {
"shoulder_pan": [1, "xl430-w250"],
"shoulder_lift": [2, "xl430-w250"],
"elbow_flex": [3, "xl330-m288"],
"wrist_flex": [4, "xl330-m288"],
"wrist_roll": [5, "xl330-m288"],
"gripper": [6, "xl330-m288"],
}
def require_x86_64_kernel(func):
"""
Decorator that skips the test if plateform device is not an x86_64 cpu.
"""
from functools import wraps
@wraps(func)
def wrapper(*args, **kwargs):
if platform.machine() != "x86_64":
pytest.skip("requires x86_64 plateform")
return func(*args, **kwargs)
return wrapper
def require_cpu(func):
"""
Decorator that skips the test if device is not cpu.
"""
from functools import wraps
@wraps(func)
def wrapper(*args, **kwargs):
if DEVICE != "cpu":
pytest.skip("requires cpu")
return func(*args, **kwargs)
return wrapper
def require_cuda(func):
"""
Decorator that skips the test if cuda is not available.
"""
from functools import wraps
@wraps(func)
def wrapper(*args, **kwargs):
if not torch.cuda.is_available():
pytest.skip("requires cuda")
return func(*args, **kwargs)
return wrapper
def require_env(func):
"""
Decorator that skips the test if the required environment package is not installed.
As it need 'env_name' in args, it also checks whether it is provided as an argument.
If 'env_name' is None, this check is skipped.
"""
@wraps(func)
def wrapper(*args, **kwargs):
# Determine if 'env_name' is provided and extract its value
arg_names = func.__code__.co_varnames[: func.__code__.co_argcount]
if "env_name" in arg_names:
# Get the index of 'env_name' and retrieve the value from args
index = arg_names.index("env_name")
env_name = args[index] if len(args) > index else kwargs.get("env_name")
else:
raise ValueError("Function does not have 'env_name' as an argument.")
# Perform the package check
package_name = f"gym_{env_name}"
if env_name is not None and not is_package_available(package_name):
pytest.skip(f"gym-{env_name} not installed")
return func(*args, **kwargs)
return wrapper
def require_package_arg(func):
"""
Decorator that skips the test if the required package is not installed.
This is similar to `require_env` but more general in that it can check any package (not just environments).
As it need 'required_packages' in args, it also checks whether it is provided as an argument.
If 'required_packages' is None, this check is skipped.
"""
@wraps(func)
def wrapper(*args, **kwargs):
# Determine if 'required_packages' is provided and extract its value
arg_names = func.__code__.co_varnames[: func.__code__.co_argcount]
if "required_packages" in arg_names:
# Get the index of 'required_packages' and retrieve the value from args
index = arg_names.index("required_packages")
required_packages = args[index] if len(args) > index else kwargs.get("required_packages")
else:
raise ValueError("Function does not have 'required_packages' as an argument.")
if required_packages is None:
return func(*args, **kwargs)
# Perform the package check
for package in required_packages:
if not is_package_available(package):
pytest.skip(f"{package} not installed")
return func(*args, **kwargs)
return wrapper
def require_package(package_name):
"""
Decorator that skips the test if the specified package is not installed.
"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
if not is_package_available(package_name):
pytest.skip(f"{package_name} not installed")
return func(*args, **kwargs)
return wrapper
return decorator
def mock_robot_or_skip_test_when_not_available(monkeypatch, robot_type, mock):
if mock:
# Run test with a monkeypatched version of the robot devices.
# TODO(rcadene): redesign mocking to not have this hardcoded logic
if robot_type in ["koch", "koch_bimanual"]:
camera_type = "opencv"
elif robot_type == "aloha":
camera_type = "intelrealsense"
else:
camera_type = "all"
mock_cameras(monkeypatch, camera_type)
mock_motors(monkeypatch)
# To run calibration without user input
monkeypatch.setattr("builtins.input", mock_input)
elif not is_robot_available(robot_type):
# Run test with a real robot. Skip test if robot connection fails.
pytest.skip(f"A {robot_type} robot is not available.")
def mock_camera_or_skip_test_when_not_available(monkeypatch, camera_type, mock):
if camera_type not in available_cameras:
raise ValueError(
f"The camera type '{camera_type}' is not valid. Expected one of these '{available_cameras}"
)
if mock:
# Run test with a monkeypatched version of the cameras
mock_cameras(monkeypatch, camera_type)
elif not is_camera_available(camera_type):
# Run test with a real camera. Skip test if camera connection fails
pytest.skip(f"A {camera_type} camera is not available.")
def mock_motor_or_skip_test_when_not_available(monkeypatch, motor_type, mock):
if motor_type not in available_motors:
raise ValueError(
f"The motor type '{motor_type}' is not valid. Expected one of these '{available_motors}"
)
if mock:
# Run test with a monkeypatched version of the motors
mock_motors(monkeypatch)
elif not is_motor_available(motor_type):
# Run test with a real motor. Skip test if motor connection fails
pytest.skip(f"A {motor_type} motor is not available.")
def mock_input(text=None):
if text is not None:
print(text)
def mock_cameras(monkeypatch, camera_type="all"):
# TODO(rcadene): Redesign the mocking tests
if camera_type in ["opencv", "all"]:
try:
import cv2
from tests.mock_opencv import MockVideoCapture
monkeypatch.setattr(cv2, "VideoCapture", MockVideoCapture)
except ImportError:
traceback.print_exc()
pytest.skip("To avoid skipping tests mocking opencv cameras, run `pip install opencv-python`.")
if camera_type in ["intelrealsense", "all"]:
try:
import pyrealsense2 as rs
from tests.mock_intelrealsense import (
MockConfig,
MockContext,
MockFormat,
MockPipeline,
MockStream,
)
monkeypatch.setattr(rs, "config", MockConfig)
monkeypatch.setattr(rs, "pipeline", MockPipeline)
monkeypatch.setattr(rs, "stream", MockStream)
monkeypatch.setattr(rs, "format", MockFormat)
monkeypatch.setattr(rs, "context", MockContext)
except ImportError:
traceback.print_exc()
pytest.skip(
"To avoid skipping tests mocking intelrealsense cameras, run `pip install pyrealsense2`."
)
def mock_motors(monkeypatch):
# TODO(rcadene): Redesign the mocking tests
try:
import dynamixel_sdk
from tests.mock_dynamixel import (
MockGroupSyncRead,
MockGroupSyncWrite,
MockPacketHandler,
MockPortHandler,
mock_convert_to_bytes,
)
monkeypatch.setattr(dynamixel_sdk, "GroupSyncRead", MockGroupSyncRead)
monkeypatch.setattr(dynamixel_sdk, "GroupSyncWrite", MockGroupSyncWrite)
monkeypatch.setattr(dynamixel_sdk, "PacketHandler", MockPacketHandler)
monkeypatch.setattr(dynamixel_sdk, "PortHandler", MockPortHandler)
# Import dynamixel AFTER mocking dynamixel_sdk to use mocked classes
from lerobot.common.robot_devices.motors import dynamixel
# TODO(rcadene): remove need to mock `convert_to_bytes` by implemented the inverse transform
# `convert_bytes_to_value`
monkeypatch.setattr(dynamixel, "convert_to_bytes", mock_convert_to_bytes)
except ImportError:
traceback.print_exc()
pytest.skip("To avoid skipping tests mocking dynamixel motors, run `pip install dynamixel-sdk`.")
def make_robot(robot_type: str, overrides: list[str] | None = None) -> Robot:
config_path = ROBOT_CONFIG_PATH_TEMPLATE.format(robot=robot_type)
robot_cfg = init_hydra_config(config_path, overrides)
robot = make_robot_from_cfg(robot_cfg)
return robot
def make_camera(camera_type, **kwargs) -> Camera:
if camera_type == "opencv":
from lerobot.common.robot_devices.cameras.opencv import OpenCVCamera
camera_index = kwargs.pop("camera_index", OPENCV_CAMERA_INDEX)
return OpenCVCamera(camera_index, **kwargs)
elif camera_type == "intelrealsense":
from lerobot.common.robot_devices.cameras.intelrealsense import IntelRealSenseCamera
camera_index = kwargs.pop("camera_index", INTELREALSENSE_CAMERA_INDEX)
return IntelRealSenseCamera(camera_index, **kwargs)
else:
raise ValueError(f"The camera type '{camera_type}' is not valid.")
def make_motors_bus(motor_type: str, **kwargs) -> MotorsBus:
if motor_type == "dynamixel":
from lerobot.common.robot_devices.motors.dynamixel import DynamixelMotorsBus
port = kwargs.pop("port", DYNAMIXEL_PORT)
motors = kwargs.pop("motors", DYNAMIXEL_MOTORS)
return DynamixelMotorsBus(port, motors, **kwargs)
else:
raise ValueError(f"The motor type '{motor_type}' is not valid.")
def is_robot_available(robot_type):
try:
from lerobot.common.robot_devices.robots.factory import make_robot
config_path = ROBOT_CONFIG_PATH_TEMPLATE.format(robot=robot_type)
robot_cfg = init_hydra_config(config_path)
robot = make_robot(robot_cfg)
print("DEBUG", robot.leader_arms)
print("DEBUG", robot.cameras)
robot.connect()
del robot
return True
except Exception:
traceback.print_exc()
print(f"\nA {robot_type} robot is not available.")
return False
def is_camera_available(camera_type):
try:
camera = make_camera(camera_type)
camera.connect()
del camera
return True
except Exception:
traceback.print_exc()
print(f"\nA {camera_type} camera is not available.")
return False
def is_motor_available(motor_type):
try:
motors_bus = make_motors_bus(motor_type)
motors_bus.connect()
del motors_bus
return True
except Exception:
traceback.print_exc()
print(f"\nA {motor_type} motor is not available.")
return False