Mock OpenCVCamera

This commit is contained in:
Remi Cadene
2024-09-01 17:36:58 +02:00
committed by Remi Cadene
parent f17d9a2ba1
commit 96cc2433d6
3 changed files with 63 additions and 16 deletions

View File

@@ -16,9 +16,12 @@
import platform
from functools import wraps
import cv2
import numpy as np
import pytest
import torch
from lerobot import available_robots
from lerobot.common.utils.import_utils import is_package_available
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
@@ -28,6 +31,8 @@ DEFAULT_CONFIG_PATH = "lerobot/configs/default.yaml"
ROBOT_CONFIG_PATH_TEMPLATE = "lerobot/configs/robot/{robot}.yaml"
TEST_ROBOT_TYPES = available_robots + [f"mocked_{robot_type}" for robot_type in available_robots]
def require_x86_64_kernel(func):
"""
@@ -175,11 +180,58 @@ def require_robot(func):
robot_type = kwargs.get("robot_type")
if request is None:
raise ValueError("The 'request' fixture must be passed to the test function as a parameter.")
raise ValueError("The 'request' fixture must be an argument of the test function.")
# Run test with a monkeypatched version of the robot devices.
if robot_type.startswith("mocked_"):
kwargs["robot_type"] = robot_type.replace("mocked_", "")
monkeypatch = request.getfixturevalue("monkeypatch")
monkeypatch.setattr(cv2, "VideoCapture", MockVideoCapture)
# Run test with a real robot. Skip test if robot connection fails.
else:
# `is_robot_available` is defined in `tests/conftest.py`
if not request.getfixturevalue("is_robot_available"):
pytest.skip(f"A {robot_type} robot is not available.")
# The function `is_robot_available` is defined in `tests/conftest.py`
if not request.getfixturevalue("is_robot_available"):
pytest.skip(f"A {robot_type} robot is not available.")
return func(*args, **kwargs)
return wrapper
class MockVideoCapture(cv2.VideoCapture):
image = {
"480x640": np.random.randint(0, 256, size=(480, 640, 3), dtype=np.uint8),
"720x1280": np.random.randint(0, 256, size=(720, 1280, 3), dtype=np.uint8),
}
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._mock_dict = {
cv2.CAP_PROP_FPS: 30,
cv2.CAP_PROP_FRAME_WIDTH: 640,
cv2.CAP_PROP_FRAME_HEIGHT: 480,
}
def isOpened(self): # noqa: N802
return True
def set(self, propId: int, value: float) -> bool: # noqa: N803
self._mock_dict[propId] = value
return True
def get(self, propId: int) -> float: # noqa: N803
value = self._mock_dict[propId]
if value == 0:
if propId == cv2.CAP_PROP_FRAME_HEIGHT:
value = 480
elif propId == cv2.CAP_PROP_FRAME_WIDTH:
value = 640
return value
def read(self):
h = self.get(cv2.CAP_PROP_FRAME_HEIGHT)
w = self.get(cv2.CAP_PROP_FRAME_WIDTH)
ret = True
return ret, self.image[f"{h}x{w}"]