add dynamic import for cv2 and pyrealsense2

This commit is contained in:
Remi Cadene
2024-09-09 19:32:35 +02:00
parent 2469c99053
commit 44b8394365
4 changed files with 66 additions and 43 deletions

View File

@@ -347,7 +347,7 @@ class IntelRealSenseCamera:
return color_image return color_image
def read_loop(self): def read_loop(self):
while self.stop_event is None or not self.stop_event.is_set(): while not self.stop_event.is_set():
if self.use_depth: if self.use_depth:
self.color_image, self.depth_map = self.read() self.color_image, self.depth_map = self.read()
else: else:

41
tests/mock_opencv.py Normal file
View File

@@ -0,0 +1,41 @@
import cv2
import numpy as np
class MockVideoCapture(cv2.VideoCapture):
image = {
"480x640": np.random.randint(0, 256, size=(480, 640, 3), dtype=np.uint8),
"720x1280": np.random.randint(0, 256, size=(720, 1280, 3), dtype=np.uint8),
}
def __init__(self, *args, **kwargs):
self._mock_dict = {
cv2.CAP_PROP_FPS: 30,
cv2.CAP_PROP_FRAME_WIDTH: 640,
cv2.CAP_PROP_FRAME_HEIGHT: 480,
}
def isOpened(self): # noqa: N802
return True
def set(self, propId: int, value: float) -> bool: # noqa: N803
self._mock_dict[propId] = value
return True
def get(self, propId: int) -> float: # noqa: N803
value = self._mock_dict[propId]
if value == 0:
if propId == cv2.CAP_PROP_FRAME_HEIGHT:
value = 480
elif propId == cv2.CAP_PROP_FRAME_WIDTH:
value = 640
return value
def read(self):
h = self.get(cv2.CAP_PROP_FRAME_HEIGHT)
w = self.get(cv2.CAP_PROP_FRAME_WIDTH)
ret = True
return ret, self.image[f"{h}x{w}"]
def release(self):
pass

View File

@@ -5,6 +5,11 @@ Example usage:
```bash ```bash
pytest -sx tests/test_cameras.py::test_camera pytest -sx tests/test_cameras.py::test_camera
``` ```
Example of running test on mocked_koch:
```bash
python -m pytest -sx -k "mocked_koch" tests/test_cameras.py::test_camera
```
""" """
import numpy as np import numpy as np

View File

@@ -14,10 +14,9 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import platform import platform
import traceback
from functools import wraps from functools import wraps
import cv2
import numpy as np
import pytest import pytest
import torch import torch
@@ -187,7 +186,24 @@ def require_robot(func):
kwargs["robot_type"] = robot_type.replace("mocked_", "") kwargs["robot_type"] = robot_type.replace("mocked_", "")
monkeypatch = request.getfixturevalue("monkeypatch") monkeypatch = request.getfixturevalue("monkeypatch")
monkeypatch.setattr(cv2, "VideoCapture", MockVideoCapture)
try:
import cv2
from tests.mock_opencv import MockVideoCapture
monkeypatch.setattr(cv2, "VideoCapture", MockVideoCapture)
except ImportError:
traceback.print_exc()
try:
import pyrealsense2 as rs
# TODO(rcadene):
mock_pipeline = None
monkeypatch.setattr(rs, "pipeline", mock_pipeline)
except ImportError:
traceback.print_exc()
# Run test with a real robot. Skip test if robot connection fails. # Run test with a real robot. Skip test if robot connection fails.
else: else:
@@ -198,42 +214,3 @@ def require_robot(func):
return func(*args, **kwargs) return func(*args, **kwargs)
return wrapper return wrapper
class MockVideoCapture(cv2.VideoCapture):
image = {
"480x640": np.random.randint(0, 256, size=(480, 640, 3), dtype=np.uint8),
"720x1280": np.random.randint(0, 256, size=(720, 1280, 3), dtype=np.uint8),
}
def __init__(self, *args, **kwargs):
self._mock_dict = {
cv2.CAP_PROP_FPS: 30,
cv2.CAP_PROP_FRAME_WIDTH: 640,
cv2.CAP_PROP_FRAME_HEIGHT: 480,
}
def isOpened(self): # noqa: N802
return True
def set(self, propId: int, value: float) -> bool: # noqa: N803
self._mock_dict[propId] = value
return True
def get(self, propId: int) -> float: # noqa: N803
value = self._mock_dict[propId]
if value == 0:
if propId == cv2.CAP_PROP_FRAME_HEIGHT:
value = 480
elif propId == cv2.CAP_PROP_FRAME_WIDTH:
value = 640
return value
def read(self):
h = self.get(cv2.CAP_PROP_FRAME_HEIGHT)
w = self.get(cv2.CAP_PROP_FRAME_WIDTH)
ret = True
return ret, self.image[f"{h}x{w}"]
def release(self):
pass