Skip tests if alexander koch robot is not available

This commit is contained in:
Remi Cadene
2024-07-10 19:41:57 +02:00
parent c2388c59be
commit 95de4b7454
6 changed files with 55 additions and 10 deletions

View File

@@ -13,8 +13,25 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import pytest
from .utils import DEVICE
def pytest_collection_finish():
print(f"\nTesting with {DEVICE=}")
@pytest.fixture(scope="session")
def is_koch_available():
try:
from lerobot.common.robot_devices.robots.factory import make_robot
robot = make_robot("koch")
robot.connect()
del robot
return True
except Exception as e:
print("An alexander koch robot is not available.")
print(e)
return False

View File

@@ -3,6 +3,7 @@ import pytest
from lerobot.common.robot_devices.cameras.opencv import OpenCVCamera, save_images_from_cameras
from lerobot.common.robot_devices.utils import RobotDeviceAlreadyConnectedError, RobotDeviceNotConnectedError
from tests.utils import require_koch
CAMERA_INDEX = 2
# Maximum absolute difference between two consecutive images recored by a camera.
@@ -14,7 +15,8 @@ def compute_max_pixel_difference(first_image, second_image):
return np.abs(first_image.astype(float) - second_image.astype(float)).max()
def test_camera():
@require_koch
def test_camera(request):
"""Test assumes that `camera.read()` returns the same image when called multiple times in a row.
So the environment should not change (you shouldnt be in front of the camera) and the camera should not be moving.
@@ -118,5 +120,6 @@ def test_camera():
del camera
def test_save_images_from_cameras(tmpdir):
@require_koch
def test_save_images_from_cameras(tmpdir, request):
save_images_from_cameras(tmpdir, record_time_s=1)

View File

@@ -4,10 +4,11 @@ from lerobot.common.policies.factory import make_policy
from lerobot.common.robot_devices.robots.factory import make_robot
from lerobot.common.utils.utils import init_hydra_config
from lerobot.scripts.control_robot import record_dataset, replay_episode, run_policy, teleoperate
from tests.utils import DEFAULT_CONFIG_PATH, DEVICE
from tests.utils import DEFAULT_CONFIG_PATH, DEVICE, require_koch
def test_teleoperate():
@require_koch
def test_teleoperate(request):
robot = make_robot("koch")
teleoperate(robot, teleop_time_s=1)
teleoperate(robot, fps=30, teleop_time_s=1)
@@ -15,13 +16,13 @@ def test_teleoperate():
del robot
def test_record_dataset_and_replay_episode_and_run_policy(tmpdir):
@require_koch
def test_record_dataset_and_replay_episode_and_run_policy(tmpdir, request):
robot_name = "koch"
env_name = "koch_real"
policy_name = "act_koch_real"
# root = Path(tmpdir)
root = Path("tmp/data")
root = Path(tmpdir)
repo_id = "lerobot/debug"
robot = make_robot(robot_name)

View File

@@ -5,9 +5,11 @@ import pytest
from lerobot.common.robot_devices.motors.dynamixel import DynamixelMotorsBus, find_port
from lerobot.common.robot_devices.utils import RobotDeviceAlreadyConnectedError, RobotDeviceNotConnectedError
from tests.utils import require_koch
def test_motors_bus():
@require_koch
def test_motors_bus(request):
# TODO(rcadene): measure fps in nightly?
# TODO(rcadene): test logs
# TODO(rcadene): test calibration
@@ -83,5 +85,6 @@ def test_motors_bus():
assert (new_values == values).all()
def test_find_port():
@require_koch
def test_find_port(request):
find_port()

View File

@@ -7,9 +7,11 @@ import torch
from lerobot.common.robot_devices.robots.factory import make_robot
from lerobot.common.robot_devices.robots.koch import KochRobot
from lerobot.common.robot_devices.utils import RobotDeviceAlreadyConnectedError, RobotDeviceNotConnectedError
from tests.utils import require_koch
def test_robot(tmpdir):
@require_koch
def test_robot(tmpdir, request):
# TODO(rcadene): measure fps in nightly?
# TODO(rcadene): test logs
# TODO(rcadene): add compatibility with other robots

View File

@@ -147,3 +147,22 @@ def require_package(package_name):
return wrapper
return decorator
def require_koch(func):
"""
Decorator that skips the test if an alexander koch robot is not available
"""
@wraps(func)
def wrapper(*args, **kwargs):
# Access the pytest request context to get the is_koch_available fixture
request = kwargs.get("request")
if request is None:
raise ValueError("The 'request' fixture must be passed to the test function as a parameter.")
if not request.getfixturevalue("is_koch_available"):
pytest.skip("An alexander koch robot is not available.")
return func(*args, **kwargs)
return wrapper