Compare commits

..

1 Commits

Author SHA1 Message Date
Steven Palma
cb10f97ccc test(cameras): add opencv camera patch tests suite 2025-04-16 23:25:40 +02:00
3 changed files with 342 additions and 495 deletions

View File

@@ -1,305 +0,0 @@
# ruff: noqa: N802,N803
import abc
from typing import Optional, Tuple
import numpy as np
# --- Interface Definition ---
class IVideoCapture(abc.ABC):
"""Interface for the cv2.VideoCapture class."""
@abc.abstractmethod
def __init__(self, index: int | str, backend: Optional[int] = None):
pass
@abc.abstractmethod
def isOpened(self) -> bool:
pass
@abc.abstractmethod
def release(self) -> None:
pass
@abc.abstractmethod
def set(self, propId: int, value: float) -> bool:
pass
@abc.abstractmethod
def get(self, propId: int) -> float:
pass
@abc.abstractmethod
def read(self) -> Tuple[bool, Optional[np.ndarray]]:
pass
class IOpenCVSDK(abc.ABC):
"""Interface defining the contract for OpenCV SDK interactions."""
# --- Constants ---
CAP_PROP_FPS: int
CAP_PROP_FRAME_WIDTH: int
CAP_PROP_FRAME_HEIGHT: int
COLOR_BGR2RGB: int
ROTATE_90_COUNTERCLOCKWISE: int
ROTATE_90_CLOCKWISE: int
ROTATE_180: int
CAP_V4L2: int
CAP_DSHOW: int
CAP_AVFOUNDATION: int
CAP_ANY: int
# --- Inner Class Type Hint ---
VideoCapture: type[IVideoCapture]
# --- Methods ---
@abc.abstractmethod
def setNumThreads(self, nthreads: int) -> None:
pass
@abc.abstractmethod
def cvtColor(self, src: np.ndarray, code: int) -> np.ndarray:
pass
@abc.abstractmethod
def rotate(self, src: np.ndarray, rotateCode: int) -> np.ndarray:
pass
# --- Real SDK Adapter ---
class OpenCVSDKAdapter(IOpenCVSDK):
"""Adapts the real cv2 library to the IOpenCVSDK interface."""
_cv2 = None
def __init__(self):
try:
import cv2
OpenCVSDKAdapter._cv2 = cv2
except ImportError as e:
raise ImportError(
"OpenCV (cv2) is not installed. Please install it to use the real camera."
) from e
# --- Constants ---
self.CAP_PROP_FPS = self._cv2.CAP_PROP_FPS
self.CAP_PROP_FRAME_WIDTH = self._cv2.CAP_PROP_FRAME_WIDTH
self.CAP_PROP_FRAME_HEIGHT = self._cv2.CAP_PROP_FRAME_HEIGHT
self.COLOR_BGR2RGB = self._cv2.COLOR_BGR2RGB
self.ROTATE_90_COUNTERCLOCKWISE = self._cv2.ROTATE_90_COUNTERCLOCKWISE
self.ROTATE_90_CLOCKWISE = self._cv2.ROTATE_90_CLOCKWISE
self.ROTATE_180 = self._cv2.ROTATE_180
self.CAP_V4L2 = self._cv2.CAP_V4L2
self.CAP_DSHOW = self._cv2.CAP_DSHOW
self.CAP_AVFOUNDATION = self._cv2.CAP_AVFOUNDATION
self.CAP_ANY = self._cv2.CAP_ANY
# --- Inner Class Implementation ---
class RealVideoCapture(IVideoCapture):
def __init__(self, index: int | str, backend: Optional[int] = None):
self._cap = OpenCVSDKAdapter._cv2.VideoCapture(index, backend)
def isOpened(self) -> bool:
return self._cap.isOpened()
def release(self) -> None:
self._cap.release()
def set(self, propId: int, value: float) -> bool:
return self._cap.set(propId, value)
def get(self, propId: int) -> float:
return self._cap.get(propId)
def read(self) -> Tuple[bool, Optional[np.ndarray]]:
return self._cap.read()
def __del__(self):
if hasattr(self, "_cap") and self._cap and self._cap.isOpened():
self._cap.release()
self.VideoCapture = RealVideoCapture
# --- Methods ---
def setNumThreads(self, nthreads: int) -> None:
self._cv2.setNumThreads(nthreads)
def cvtColor(self, src: np.ndarray, code: int) -> np.ndarray:
return self._cv2.cvtColor(src, code)
def rotate(self, src: np.ndarray, rotateCode: int) -> np.ndarray:
return self._cv2.rotate(src, rotateCode)
# Emulates the cheap USB camera
VALID_INDICES = {0, 1, 2, "/dev/video0", "/dev/video1", "/dev/video2"}
DEFAULT_FPS = 30.0
DEFAULT_WIDTH = 1280
DEFAULT_HEIGHT = 720
# --- Fake SDK Adapter ---
class FakeOpenCVSDKAdapter(IOpenCVSDK):
"""Implements the IOpenCVSDK interface with fake behavior for testing."""
# --- Constants ---
CAP_PROP_FPS = DEFAULT_FPS
CAP_PROP_FRAME_WIDTH = DEFAULT_WIDTH
CAP_PROP_FRAME_HEIGHT = DEFAULT_HEIGHT
COLOR_BGR2RGB = 99
ROTATE_90_COUNTERCLOCKWISE = -90
ROTATE_90_CLOCKWISE = 90
ROTATE_180 = 180
CAP_V4L2 = 91
CAP_DSHOW = 92
CAP_AVFOUNDATION = 93
CAP_ANY = 90
_cameras_opened: dict[int | str, bool] = {}
_camera_properties: dict[tuple[int | str, int], float] = {}
_simulated_image: np.ndarray = np.random.randint(
0, 256, (DEFAULT_HEIGHT, DEFAULT_WIDTH, 3), dtype=np.uint8
)
_simulated_fps: float = DEFAULT_FPS
_image_read_count: int = 0
_fail_read_after: Optional[int] = None # Simulate read failure
@classmethod
def init_configure_fake(
cls,
simulated_image: Optional[np.ndarray] = None,
simulated_fps: Optional[float] = None,
fail_read_after: Optional[int] = None,
):
if simulated_image is not None:
cls._simulated_image = simulated_image
if simulated_fps is not None:
cls._simulated_fps = simulated_fps
cls._fail_read_after = fail_read_after
cls._image_read_count = 0
cls._cameras_opened = {}
cls._camera_properties = {}
@classmethod
def configure_fake_simulated_image(cls, simulated_image: Optional[np.ndarray] = None):
if simulated_image is not None:
cls._simulated_image = simulated_image
@classmethod
def configure_fail_read_after(cls, fail_read_after: Optional[int] = None):
cls._fail_read_after = fail_read_after
@classmethod
def configure_fake_simulated_fps(cls, simulated_fps: Optional[float] = None):
if simulated_fps is not None:
cls._simulated_fps = simulated_fps
# --- Inner Class Implementation ---
class FakeVideoCapture(IVideoCapture):
def __init__(self, index: int | str, backend: Optional[int] = None):
self.index = index
self.backend = backend
valid_indices = VALID_INDICES
if self.index in valid_indices:
FakeOpenCVSDKAdapter._cameras_opened[self.index] = True
print(f"[FAKE SDK] Opened camera {self.index}")
# Set some default fake properties
FakeOpenCVSDKAdapter._camera_properties[(self.index, FakeOpenCVSDKAdapter.CAP_PROP_FPS)] = (
DEFAULT_FPS
)
FakeOpenCVSDKAdapter._camera_properties[
(self.index, FakeOpenCVSDKAdapter.CAP_PROP_FRAME_WIDTH)
] = float(FakeOpenCVSDKAdapter._simulated_image.shape[1])
FakeOpenCVSDKAdapter._camera_properties[
(self.index, FakeOpenCVSDKAdapter.CAP_PROP_FRAME_HEIGHT)
] = float(FakeOpenCVSDKAdapter._simulated_image.shape[0])
else:
FakeOpenCVSDKAdapter._cameras_opened[self.index] = False
print(f"[FAKE SDK] Failed to open camera {self.index}")
def isOpened(self) -> bool:
return FakeOpenCVSDKAdapter._cameras_opened.get(self.index, False)
def release(self) -> None:
if self.index in FakeOpenCVSDKAdapter._cameras_opened:
FakeOpenCVSDKAdapter._cameras_opened[self.index] = False
print(f"[FAKE SDK] Released camera {self.index}")
# Clear properties on release
props_to_remove = [k for k in FakeOpenCVSDKAdapter._camera_properties if k[0] == self.index]
for k in props_to_remove:
del FakeOpenCVSDKAdapter._camera_properties[k]
def set(self, propId: int, value: float) -> bool:
if not self.isOpened():
return False
print(
f"[FAKE SDK] Ignoring set property {propId} = {value} for camera {self.index} to preserve state."
)
# FakeOpenCVSDKAdapter._camera_properties[(self.index, propId)] = value
# Simulate failure for specific unrealistic settings if needed
return True
def get(self, propId: int) -> float:
if not self.isOpened():
return 0.0 # Or raise error? Mimic cv2 behavior
val = FakeOpenCVSDKAdapter._camera_properties.get((self.index, propId))
print(f"[FAKE SDK] Get property {propId} for camera {self.index} -> {val}")
return val
def read(self) -> Tuple[bool, Optional[np.ndarray]]:
if not self.isOpened():
print(f"[FAKE SDK] Read failed: Camera {self.index} not open.")
return False, None
FakeOpenCVSDKAdapter._image_read_count += 1
if (
FakeOpenCVSDKAdapter._fail_read_after is not None
and FakeOpenCVSDKAdapter._image_read_count > FakeOpenCVSDKAdapter._fail_read_after
):
print(
f"[FAKE SDK] Simulated read failure for camera {self.index} after {FakeOpenCVSDKAdapter._fail_read_after} reads."
)
return False, None
print(
f"[FAKE SDK] Read image from camera {self.index} (read #{FakeOpenCVSDKAdapter._image_read_count})"
)
# Return a copy to prevent modification issues if the caller changes it
return True, FakeOpenCVSDKAdapter._simulated_image.copy()
def __del__(self):
# Ensure cleanup if garbage collected
self.release()
VideoCapture = FakeVideoCapture # Assign inner class
# --- Methods ---
def setNumThreads(self, nthreads: int) -> None:
print(f"[FAKE SDK] setNumThreads({nthreads}) called.")
# No actual behavior needed in fake
def cvtColor(self, src: np.ndarray, code: int) -> np.ndarray:
print(f"[FAKE SDK] cvtColor called with code {code}.")
# Just return the source image, or simulate channel swap if needed
if code == self.COLOR_BGR2RGB and src.shape[2] == 3:
print("[FAKE SDK] Simulating BGR -> RGB conversion.")
return src[..., ::-1]
return src.copy()
def rotate(self, src: np.ndarray, rotateCode: int) -> np.ndarray:
print(f"[FAKE SDK] rotate called with code {rotateCode}.")
if rotateCode == self.ROTATE_90_COUNTERCLOCKWISE:
print("[FAKE SDK] Simulating 90 degree counter-clockwise rotation.")
rotated_img = np.rot90(np.rot90(np.rot90(src.copy())))
return rotated_img
elif rotateCode == self.ROTATE_90_CLOCKWISE:
print("[FAKE SDK] Simulating 90 degree clockwise rotation.")
rotated_img = np.rot90(src.copy())
return rotated_img
elif rotateCode == self.ROTATE_180:
print("[FAKE SDK] Simulating 180 degree rotation.")
rotated_img = np.rot90(np.rot90(src.copy()))
return rotated_img
return src.copy()

View File

@@ -25,6 +25,7 @@ import threading
import time
from pathlib import Path
import cv2
import numpy as np
from PIL import Image
@@ -35,7 +36,6 @@ from lerobot.common.utils.robot_utils import (
from lerobot.common.utils.utils import capture_timestamp_utc
from ..camera import Camera
from ..interface_camera_sdk import IOpenCVSDK, OpenCVSDKAdapter
from .configuration_opencv import OpenCVCameraConfig
# The maximum opencv device index depends on your operating system. For instance,
@@ -46,17 +46,12 @@ from .configuration_opencv import OpenCVCameraConfig
MAX_OPENCV_INDEX = 60
def find_cameras(
raise_when_empty=False, max_index_search_range=MAX_OPENCV_INDEX, cv2_sdk: IOpenCVSDK = None
) -> list[dict]:
if cv2_sdk is None:
cv2_sdk = OpenCVSDKAdapter()
def find_cameras(raise_when_empty=False, max_index_search_range=MAX_OPENCV_INDEX) -> list[dict]:
cameras = []
if platform.system() == "Linux":
print("Linux detected. Finding available camera indices through scanning '/dev/video*' ports")
possible_ports = [str(port) for port in Path("/dev").glob("video*")]
ports = _find_cameras(possible_ports, cv2_sdk=cv2_sdk)
ports = _find_cameras(possible_ports)
for port in ports:
cameras.append(
{
@@ -70,7 +65,7 @@ def find_cameras(
f"scanning all indices from 0 to {MAX_OPENCV_INDEX}"
)
possible_indices = range(max_index_search_range)
indices = _find_cameras(possible_indices, cv2_sdk=cv2_sdk)
indices = _find_cameras(possible_indices)
for index in indices:
cameras.append(
{
@@ -82,15 +77,10 @@ def find_cameras(
return cameras
def _find_cameras(
possible_camera_ids: list[int | str], raise_when_empty=False, cv2_sdk: IOpenCVSDK = None
) -> list[int | str]:
if cv2_sdk is None:
cv2_sdk = OpenCVSDKAdapter()
def _find_cameras(possible_camera_ids: list[int | str], raise_when_empty=False) -> list[int | str]:
camera_ids = []
for camera_idx in possible_camera_ids:
camera = cv2_sdk.VideoCapture(camera_idx)
camera = cv2.VideoCapture(camera_idx)
is_open = camera.isOpened()
camera.release()
@@ -131,24 +121,20 @@ def save_images_from_cameras(
width=None,
height=None,
record_time_s=2,
cv2_sdk: IOpenCVSDK = None,
):
"""
Initializes all the cameras and saves images to the directory. Useful to visually identify the camera
associated to a given camera index.
"""
if cv2_sdk is None:
cv2_sdk = OpenCVSDKAdapter()
if camera_ids is None or len(camera_ids) == 0:
camera_infos = find_cameras(cv2_sdk=cv2_sdk)
camera_infos = find_cameras()
camera_ids = [cam["index"] for cam in camera_infos]
print("Connecting cameras")
cameras = []
for cam_idx in camera_ids:
config = OpenCVCameraConfig(camera_index=cam_idx, fps=fps, width=width, height=height)
camera = OpenCVCamera(config, cv2_sdk=cv2_sdk)
camera = OpenCVCamera(config)
camera.connect()
print(
f"OpenCVCamera({camera.camera_index}, fps={camera.fps}, width={camera.capture_width}, "
@@ -235,16 +221,11 @@ class OpenCVCamera(Camera):
```
"""
def __init__(self, config: OpenCVCameraConfig, cv2_sdk: IOpenCVSDK = None):
def __init__(self, config: OpenCVCameraConfig):
self.config = config
self.camera_index = config.camera_index
self.port = None
if cv2_sdk is None:
cv2_sdk = OpenCVSDKAdapter()
self.cv2_sdk = cv2_sdk
# Linux uses ports for connecting to cameras
if platform.system() == "Linux":
if isinstance(self.camera_index, int):
@@ -281,36 +262,34 @@ class OpenCVCamera(Camera):
self.rotation = None
if config.rotation == -90:
self.rotation = cv2_sdk.ROTATE_90_COUNTERCLOCKWISE
self.rotation = cv2.ROTATE_90_COUNTERCLOCKWISE
elif config.rotation == 90:
self.rotation = cv2_sdk.ROTATE_90_CLOCKWISE
self.rotation = cv2.ROTATE_90_CLOCKWISE
elif config.rotation == 180:
self.rotation = cv2_sdk.ROTATE_180
self.rotation = cv2.ROTATE_180
def connect(self):
if self.is_connected:
raise DeviceAlreadyConnectedError(f"OpenCVCamera({self.camera_index}) is already connected.")
cv2_sdk = self.cv2_sdk
# Use 1 thread to avoid blocking the main thread. Especially useful during data collection
# when other threads are used to save the images.
cv2_sdk.setNumThreads(1)
cv2.setNumThreads(1)
backend = (
cv2_sdk.CAP_V4L2
cv2.CAP_V4L2
if platform.system() == "Linux"
else cv2_sdk.CAP_DSHOW
else cv2.CAP_DSHOW
if platform.system() == "Windows"
else cv2_sdk.CAP_AVFOUNDATION
else cv2.CAP_AVFOUNDATION
if platform.system() == "Darwin"
else cv2_sdk.CAP_ANY
else cv2.CAP_ANY
)
camera_idx = f"/dev/video{self.camera_index}" if platform.system() == "Linux" else self.camera_index
# First create a temporary camera trying to access `camera_index`,
# and verify it is a valid camera by calling `isOpened`.
tmp_camera = cv2_sdk.VideoCapture(camera_idx, backend)
tmp_camera = cv2.VideoCapture(camera_idx, backend)
is_camera_open = tmp_camera.isOpened()
# Release camera to make it accessible for `find_camera_indices`
tmp_camera.release()
@@ -320,7 +299,7 @@ class OpenCVCamera(Camera):
# valid cameras.
if not is_camera_open:
# Verify that the provided `camera_index` is valid before printing the traceback
cameras_info = find_cameras(cv2_sdk=cv2_sdk)
cameras_info = find_cameras()
available_cam_ids = [cam["index"] for cam in cameras_info]
if self.camera_index not in available_cam_ids:
raise ValueError(
@@ -333,18 +312,18 @@ class OpenCVCamera(Camera):
# Secondly, create the camera that will be used downstream.
# Note: For some unknown reason, calling `isOpened` blocks the camera which then
# needs to be re-created.
self.camera = cv2_sdk.VideoCapture(camera_idx, backend)
self.camera = cv2.VideoCapture(camera_idx, backend)
if self.fps is not None:
self.camera.set(cv2_sdk.CAP_PROP_FPS, self.fps)
self.camera.set(cv2.CAP_PROP_FPS, self.fps)
if self.capture_width is not None:
self.camera.set(cv2_sdk.CAP_PROP_FRAME_WIDTH, self.capture_width)
self.camera.set(cv2.CAP_PROP_FRAME_WIDTH, self.capture_width)
if self.capture_height is not None:
self.camera.set(cv2_sdk.CAP_PROP_FRAME_HEIGHT, self.capture_height)
self.camera.set(cv2.CAP_PROP_FRAME_HEIGHT, self.capture_height)
actual_fps = self.camera.get(cv2_sdk.CAP_PROP_FPS)
actual_width = self.camera.get(cv2_sdk.CAP_PROP_FRAME_WIDTH)
actual_height = self.camera.get(cv2_sdk.CAP_PROP_FRAME_HEIGHT)
actual_fps = self.camera.get(cv2.CAP_PROP_FPS)
actual_width = self.camera.get(cv2.CAP_PROP_FRAME_WIDTH)
actual_height = self.camera.get(cv2.CAP_PROP_FRAME_HEIGHT)
# Using `math.isclose` since actual fps can be a float (e.g. 29.9 instead of 30)
if self.fps is not None and not math.isclose(self.fps, actual_fps, rel_tol=1e-3):
@@ -382,8 +361,6 @@ class OpenCVCamera(Camera):
f"OpenCVCamera({self.camera_index}) is not connected. Try running `camera.connect()` first."
)
cv2_sdk = self.cv2_sdk
start_time = time.perf_counter()
ret, color_image = self.camera.read()
@@ -402,7 +379,7 @@ class OpenCVCamera(Camera):
# However, Deep Learning framework such as LeRobot uses RGB format as default to train neural networks,
# so we convert the image color from BGR to RGB.
if requested_color_mode == "rgb":
color_image = cv2_sdk.cvtColor(color_image, cv2_sdk.COLOR_BGR2RGB)
color_image = cv2.cvtColor(color_image, cv2.COLOR_BGR2RGB)
h, w, _ = color_image.shape
if h != self.capture_height or w != self.capture_width:
@@ -411,7 +388,7 @@ class OpenCVCamera(Camera):
)
if self.rotation is not None:
color_image = cv2_sdk.rotate(color_image, self.rotation)
color_image = cv2.rotate(color_image, self.rotation)
# log the number of seconds it took to read the image
self.logs["delta_timestamp_s"] = time.perf_counter() - start_time

View File

@@ -1,3 +1,5 @@
#!/usr/bin/env python
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -11,31 +13,38 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import platform
# Note: The current test approach uses mock/patch per management decision.
# Future maintainers may consider Dependency Injection for improved testability.
# As it currently stands, it is brittle, requires complex mocks and discourages refactoring.
# Everytime that we change the implementation code, we might need to change the tests.
import unittest
from pathlib import Path
from unittest.mock import MagicMock, patch
from unittest.mock import MagicMock, call, patch
import numpy as np
import pytest
from lerobot.common.cameras.interface_camera_sdk import FakeOpenCVSDKAdapter, IVideoCapture, OpenCVSDKAdapter
from lerobot.common.cameras.opencv import OpenCVCamera, OpenCVCameraConfig
from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
# We might need to mock these
from lerobot.common.errors import (
DeviceAlreadyConnectedError,
DeviceNotConnectedError,
)
MODULE_PATH = "lerobot.common.cameras.opencv.camera_opencv"
# Define constants that might be used by mocks
MOCK_CV2_CAP_PROP_FPS = FakeOpenCVSDKAdapter.CAP_PROP_FPS
MOCK_CV2_CAP_PROP_FRAME_WIDTH = FakeOpenCVSDKAdapter.CAP_PROP_FRAME_WIDTH
MOCK_CV2_CAP_PROP_FRAME_HEIGHT = FakeOpenCVSDKAdapter.CAP_PROP_FRAME_HEIGHT
# TODO(Steven): Consider a CLI argument to set this
# Emulates the cheap USB camera in index 0
LEROBOT_USE_REAL_OPENCV_CAMERA_TESTS = (
os.getenv("LEROBOT_USE_REAL_OPENCV_CAMERA_TESTS", "False").lower() == "true"
)
MOCK_CV2_CAP_PROP_FPS = 30
MOCK_CV2_CAP_PROP_FRAME_WIDTH = 1280
MOCK_CV2_CAP_PROP_FRAME_HEIGHT = 720
MOCK_CV2_ROTATE_90_CLOCKWISE = 90
MOCK_CV2_ROTATE_180 = 180
MOCK_CV2_ROTATE_90_COUNTERCLOCKWISE = -90
MOCK_CV2_COLOR_BGR2RGB = 99
MOCK_CV2_CAP_V4L2 = 91
MOCK_CV2_CAP_DSHOW = 92
MOCK_CV2_CAP_AVFOUNDATION = 93
MOCK_CV2_CAP_ANY = 90
# Helper function to create a realistic-looking dummy image
@@ -48,7 +57,7 @@ def create_dummy_image(height, width, channels=3):
class TestOpenCVCamera(unittest.TestCase):
def setUp(self):
# Emulates the cheap USB camera in index 0
# Default config used in many tests
self.default_config = OpenCVCameraConfig(
camera_index=0,
width=MOCK_CV2_CAP_PROP_FRAME_WIDTH,
@@ -57,27 +66,44 @@ class TestOpenCVCamera(unittest.TestCase):
color_mode="rgb",
rotation=None,
)
# TODO(Steven): Consider calling this only once, instead of in every test
if LEROBOT_USE_REAL_OPENCV_CAMERA_TESTS:
# Use the real OpenCV SDK
self.test_sdk = OpenCVSDKAdapter()
# self.addCleanup()
else:
# Create a default dummy image based on config
self.dummy_bgr_image = create_dummy_image(self.default_config.height, self.default_config.width)
self.dummy_rgb_image = self.dummy_bgr_image[..., ::-1] # Simple BGR -> RGB simulation
FakeOpenCVSDKAdapter.init_configure_fake(simulated_image=self.dummy_bgr_image)
self.test_sdk = FakeOpenCVSDKAdapter()
# self.addCleanup()
# Create a default dummy image based on config
self.dummy_bgr_image = create_dummy_image(self.default_config.height, self.default_config.width)
self.dummy_rgb_image = self.dummy_bgr_image[..., ::-1] # Simple BGR -> RGB simulation
self.patch_dependencies()
# --- Mock Setup ---
def patch_dependencies(self):
# Mock the cv2 module itself
patcher_cv2 = patch(f"{MODULE_PATH}.cv2", autospec=True)
self.mock_cv2 = patcher_cv2.start()
self.addCleanup(patcher_cv2.stop)
# Mock the VideoCapture class returned by cv2.VideoCapture
self.mock_capture_instance = MagicMock()
self.mock_cv2.VideoCapture.return_value = self.mock_capture_instance
# Assign mock constants (important for comparisons and calls)
self.mock_cv2.CAP_PROP_FPS = MOCK_CV2_CAP_PROP_FPS
self.mock_cv2.CAP_PROP_FRAME_WIDTH = MOCK_CV2_CAP_PROP_FRAME_WIDTH
self.mock_cv2.CAP_PROP_FRAME_HEIGHT = MOCK_CV2_CAP_PROP_FRAME_HEIGHT
self.mock_cv2.ROTATE_90_CLOCKWISE = MOCK_CV2_ROTATE_90_CLOCKWISE
self.mock_cv2.ROTATE_180 = MOCK_CV2_ROTATE_180
self.mock_cv2.ROTATE_90_COUNTERCLOCKWISE = MOCK_CV2_ROTATE_90_COUNTERCLOCKWISE
self.mock_cv2.COLOR_BGR2RGB = MOCK_CV2_COLOR_BGR2RGB
self.mock_cv2.CAP_V4L2 = MOCK_CV2_CAP_V4L2
self.mock_cv2.CAP_DSHOW = MOCK_CV2_CAP_DSHOW
self.mock_cv2.CAP_AVFOUNDATION = MOCK_CV2_CAP_AVFOUNDATION
self.mock_cv2.CAP_ANY = MOCK_CV2_CAP_ANY
patcher_platform = patch(f"{MODULE_PATH}.platform.system")
self.mock_platform_system = patcher_platform.start()
self.addCleanup(patcher_platform.stop)
self.mock_platform_system.return_value = "Darwin" # Default to macOS
patcher_is_valid_unix_path = patch(f"{MODULE_PATH}.is_valid_unix_path")
self.mock_is_valid_unix_path = patcher_is_valid_unix_path.start()
self.addCleanup(patcher_is_valid_unix_path.stop)
mock_is_valid_unix_path = patch(f"{MODULE_PATH}.is_valid_unix_path")
self.mock_is_valid_unix_path = mock_is_valid_unix_path.start()
self.addCleanup(mock_is_valid_unix_path.stop)
self.mock_is_valid_unix_path.return_value = True
patcher_time_sleep = patch(f"{MODULE_PATH}.time.sleep", return_value=None)
@@ -98,9 +124,18 @@ class TestOpenCVCamera(unittest.TestCase):
self.mock_event_class.return_value = self.mock_event_instance
self.mock_event_instance.is_set.return_value = False # Default to not set
# --- Tests __init__ ---
# Mock internal utility functions if they interact with system/cv2
patcher_find_cameras = patch(f"{MODULE_PATH}.find_cameras")
self.mock_find_cameras = patcher_find_cameras.start()
self.addCleanup(patcher_find_cameras.stop)
patcher_get_index = patch(f"{MODULE_PATH}.get_camera_index_from_unix_port")
self.mock_get_camera_index_from_unix_port = patcher_get_index.start()
self.addCleanup(patcher_get_index.stop)
# --- Test __init__ ---
def test_init_defaults(self):
cam = OpenCVCamera(self.default_config, cv2_sdk=self.test_sdk)
cam = OpenCVCamera(self.default_config)
self.assertEqual(cam.camera_index, 0)
self.assertEqual(cam.capture_width, MOCK_CV2_CAP_PROP_FRAME_WIDTH)
@@ -116,94 +151,129 @@ class TestOpenCVCamera(unittest.TestCase):
def test_init_with_rotation_90(self):
config = self.default_config
config.rotation = 90
cam = OpenCVCamera(config, cv2_sdk=self.test_sdk)
cam = OpenCVCamera(config)
self.assertEqual(cam.width, MOCK_CV2_CAP_PROP_FRAME_HEIGHT) # Swapped
self.assertEqual(cam.height, MOCK_CV2_CAP_PROP_FRAME_WIDTH) # Swapped
self.assertEqual(cam.rotation, self.test_sdk.ROTATE_90_CLOCKWISE)
self.assertEqual(cam.rotation, MOCK_CV2_ROTATE_90_CLOCKWISE)
def test_init_with_rotation_minus_90(self):
config = self.default_config
config.rotation = -90
cam = OpenCVCamera(config, cv2_sdk=self.test_sdk)
cam = OpenCVCamera(config)
self.assertEqual(cam.width, MOCK_CV2_CAP_PROP_FRAME_HEIGHT) # Swapped
self.assertEqual(cam.height, MOCK_CV2_CAP_PROP_FRAME_WIDTH) # Swapped
self.assertEqual(cam.rotation, self.test_sdk.ROTATE_90_COUNTERCLOCKWISE)
self.assertEqual(cam.rotation, MOCK_CV2_ROTATE_90_COUNTERCLOCKWISE)
def test_init_with_rotation_180(self):
config = self.default_config
config.rotation = 180
cam = OpenCVCamera(config, cv2_sdk=self.test_sdk)
self.assertEqual(cam.width, MOCK_CV2_CAP_PROP_FRAME_WIDTH) # Swapped
self.assertEqual(cam.height, MOCK_CV2_CAP_PROP_FRAME_HEIGHT) # Swapped
self.assertEqual(cam.rotation, self.test_sdk.ROTATE_180)
cam = OpenCVCamera(config)
self.assertEqual(cam.width, MOCK_CV2_CAP_PROP_FRAME_WIDTH) # Not swapped
self.assertEqual(cam.height, MOCK_CV2_CAP_PROP_FRAME_HEIGHT) # Not swapped
self.assertEqual(cam.rotation, MOCK_CV2_ROTATE_180)
@pytest.mark.skipif(
LEROBOT_USE_REAL_OPENCV_CAMERA_TESTS and platform.system() != "Linux",
reason="Not valid for real camera for other platform than Linux",
)
def test_init_linux_with_index(self):
self.mock_platform_system.return_value = "Linux"
cam = OpenCVCamera(self.default_config, cv2_sdk=self.test_sdk)
cam = OpenCVCamera(self.default_config)
self.assertEqual(cam.camera_index, 0)
self.assertIsInstance(cam.port, Path)
self.assertEqual(str(cam.port), "/dev/video0")
@pytest.mark.skipif(
LEROBOT_USE_REAL_OPENCV_CAMERA_TESTS and platform.system() != "Linux",
reason="Not valid for real camera for other platform than Linux",
)
def test_init_linux_with_valid_path(self):
self.mock_platform_system.return_value = "Linux"
self.mock_is_valid_unix_path.return_value = True
self.mock_get_camera_index_from_unix_port.return_value = 2
config = self.default_config
config.camera_index = "/dev/video2"
cam = OpenCVCamera(config, cv2_sdk=self.test_sdk)
cam = OpenCVCamera(config)
self.assertIsInstance(cam.port, Path)
self.assertEqual(str(cam.port), "/dev/video2")
self.assertEqual(cam.camera_index, 2)
self.mock_is_valid_unix_path.assert_called_once_with("/dev/video2")
@pytest.mark.skipif(
LEROBOT_USE_REAL_OPENCV_CAMERA_TESTS and platform.system() != "Linux",
reason="Not valid for real camera for other platform than Linux",
)
def test_init_linux_with_invalid_path(self):
self.mock_platform_system.return_value = "Linux"
self.mock_is_valid_unix_path.return_value = False
config = self.default_config
config.camera_index = "[*?:[/invalid/path"
with self.assertRaisesRegex(ValueError, "Please check the provided camera_index"):
OpenCVCamera(config, cv2_sdk=self.test_sdk)
OpenCVCamera(config)
# --- Test connect ---
def test_connect_success(self):
cam = OpenCVCamera(self.default_config, cv2_sdk=self.test_sdk)
cam = OpenCVCamera(self.default_config)
# Mock the temporary camera check
mock_tmp_capture = MagicMock()
mock_tmp_capture.isOpened.return_value = True
# First call to VideoCapture is the temporary one
# Second call is the real one
self.mock_cv2.VideoCapture.side_effect = [mock_tmp_capture, self.mock_capture_instance]
# Mock the actual camera setup
self.mock_capture_instance.get.side_effect = [
self.default_config.fps,
self.default_config.width,
self.default_config.height,
]
cam.connect()
# Check temporary camera interactions
self.assertEqual(self.mock_cv2.VideoCapture.call_count, 2)
self.mock_cv2.VideoCapture.assert_any_call(
self.default_config.camera_index, MOCK_CV2_CAP_AVFOUNDATION
)
mock_tmp_capture.isOpened.assert_called_once()
mock_tmp_capture.release.assert_called_once()
# Check actual camera interactions
self.mock_capture_instance.set.assert_has_calls(
[
call(MOCK_CV2_CAP_PROP_FPS, self.default_config.fps),
call(MOCK_CV2_CAP_PROP_FRAME_WIDTH, self.default_config.width),
call(MOCK_CV2_CAP_PROP_FRAME_HEIGHT, self.default_config.height),
]
)
self.mock_capture_instance.get.assert_has_calls(
[
call(MOCK_CV2_CAP_PROP_FPS),
call(MOCK_CV2_CAP_PROP_FRAME_WIDTH),
call(MOCK_CV2_CAP_PROP_FRAME_HEIGHT),
]
)
self.assertTrue(cam.is_connected)
self.assertIsNotNone(cam.camera)
self.assertIsInstance(cam.camera, IVideoCapture)
self.mock_cv2.setNumThreads.assert_called_once_with(1)
# Check stored actual values (after rounding)
self.assertEqual(cam.fps, self.default_config.fps)
self.assertEqual(cam.capture_width, self.default_config.width)
self.assertEqual(cam.capture_height, self.default_config.height)
@pytest.mark.skipif(
LEROBOT_USE_REAL_OPENCV_CAMERA_TESTS and platform.system() != "Linux",
reason="Not valid for real camera for other platform than Linux",
)
def test_connect_success_linux(self):
self.mock_platform_system.return_value = "Linux"
cam = OpenCVCamera(self.default_config, cv2_sdk=self.test_sdk)
cam = OpenCVCamera(self.default_config)
mock_tmp_capture = MagicMock()
mock_tmp_capture.isOpened.return_value = True
self.mock_cv2.VideoCapture.side_effect = [mock_tmp_capture, self.mock_capture_instance]
self.mock_capture_instance.get.side_effect = [
MOCK_CV2_CAP_PROP_FPS,
MOCK_CV2_CAP_PROP_FRAME_WIDTH,
MOCK_CV2_CAP_PROP_FRAME_HEIGHT,
] # Floats are ok
cam.connect()
self.assertTrue(cam.is_connected)
self.assertIsNotNone(cam.camera)
self.assertIsInstance(cam.camera, IVideoCapture)
# self.assertIn('/dev/video0', self.test_sdk._cameras_opened)
# self.assertEqual(self.test_sdk.FakeVideoCapture.backend, self.test_sdk.CAP_V4L2)
self.assertEqual(self.mock_cv2.VideoCapture.call_count, 2)
# Check it uses the port path and correct backend
self.mock_cv2.VideoCapture.assert_any_call("/dev/video0", MOCK_CV2_CAP_V4L2)
def test_connect_already_connected(self):
cam = OpenCVCamera(self.default_config, cv2_sdk=self.test_sdk)
cam = OpenCVCamera(self.default_config)
# Simulate already connected state
cam.is_connected = True
with self.assertRaisesRegex(DeviceAlreadyConnectedError, "already connected"):
@@ -212,19 +282,45 @@ class TestOpenCVCamera(unittest.TestCase):
def test_connect_camera_not_opened_invalid_index(self):
config = self.default_config
config.camera_index = 99 # Invalid index in fakeSDK != (0,1,2)
cam = OpenCVCamera(config, cv2_sdk=self.test_sdk)
config.camera_index = 99
cam = OpenCVCamera(config)
# Mock temporary check failing
mock_tmp_capture = MagicMock()
mock_tmp_capture.isOpened.return_value = False
self.mock_cv2.VideoCapture.return_value = mock_tmp_capture # Only tmp is created
# Mock find_cameras returning *other* indices
self.mock_find_cameras.return_value = [
{"index": 0, "port": None},
{"index": 1, "port": None},
{"index": 2, "port": None},
]
with self.assertRaisesRegex(
ValueError, "expected to be one of these available cameras \\[0, 1, 2\\], but 99 is provided"
):
cam.connect()
self.assertFalse(cam.is_connected)
self.mock_find_cameras.assert_called_once()
mock_tmp_capture.release.assert_called_once() # Should still release tmp capture
def test_connect_fps_mismatch(self):
config = self.default_config
config.fps = 999 # Different FPS
cam = OpenCVCamera(config, cv2_sdk=self.test_sdk)
config.fps = 999
cam = OpenCVCamera(config)
mock_tmp_capture = MagicMock()
mock_tmp_capture.isOpened.return_value = True
self.mock_cv2.VideoCapture.side_effect = [mock_tmp_capture, self.mock_capture_instance]
# Mock get to return a different FPS
self.mock_capture_instance.get.side_effect = [
MOCK_CV2_CAP_PROP_FPS, # Different FPS
self.default_config.width,
self.default_config.height,
]
with self.assertRaisesRegex(
OSError, f"Can't set self.fps=999 .* Actual value is {MOCK_CV2_CAP_PROP_FPS}"
@@ -235,10 +331,20 @@ class TestOpenCVCamera(unittest.TestCase):
def test_connect_width_mismatch(self):
config = self.default_config
config.width = 9999
cam = OpenCVCamera(config, cv2_sdk=self.test_sdk)
cam = OpenCVCamera(config)
mock_tmp_capture = MagicMock()
mock_tmp_capture.isOpened.return_value = True
self.mock_cv2.VideoCapture.side_effect = [mock_tmp_capture, self.mock_capture_instance]
self.mock_capture_instance.get.side_effect = [
self.default_config.fps,
MOCK_CV2_CAP_PROP_FRAME_WIDTH, # Different width
self.default_config.height,
]
with self.assertRaisesRegex(
OSError, f"Can't set self.capture_width=9999.* Actual value is {MOCK_CV2_CAP_PROP_FRAME_WIDTH}"
OSError, f"Can't set self.capture_width=9999 .* Actual value is {MOCK_CV2_CAP_PROP_FRAME_WIDTH}"
):
cam.connect()
self.assertFalse(cam.is_connected)
@@ -246,87 +352,123 @@ class TestOpenCVCamera(unittest.TestCase):
def test_connect_height_mismatch(self):
config = self.default_config
config.height = 9999
cam = OpenCVCamera(config, cv2_sdk=self.test_sdk)
cam = OpenCVCamera(config)
mock_tmp_capture = MagicMock()
mock_tmp_capture.isOpened.return_value = True
self.mock_cv2.VideoCapture.side_effect = [mock_tmp_capture, self.mock_capture_instance]
self.mock_capture_instance.get.side_effect = [
self.default_config.fps,
self.default_config.width,
MOCK_CV2_CAP_PROP_FRAME_HEIGHT, # Different height
]
with self.assertRaisesRegex(
OSError, f"Can't set self.capture_height=9999.* Actual value is {MOCK_CV2_CAP_PROP_FRAME_HEIGHT}"
OSError, f"Can't set self.capture_height=9999 .* Actual value is {MOCK_CV2_CAP_PROP_FRAME_HEIGHT}"
):
cam.connect()
self.assertFalse(cam.is_connected)
# --- Test read ---
def test_read_not_connected(self):
cam = OpenCVCamera(self.default_config, cv2_sdk=self.test_sdk)
cam = OpenCVCamera(self.default_config)
with self.assertRaisesRegex(DeviceNotConnectedError, "not connected"):
cam.read()
def test_read_success_rgb(self):
cam = OpenCVCamera(self.default_config, cv2_sdk=self.test_sdk)
cam.connect()
cam = OpenCVCamera(self.default_config)
cam.camera = self.mock_capture_instance # Simulate connection
cam.is_connected = True
cam.capture_width = MOCK_CV2_CAP_PROP_FRAME_WIDTH # Set dimensions as if connect succeeded
cam.capture_height = MOCK_CV2_CAP_PROP_FRAME_HEIGHT
self.mock_capture_instance.read.return_value = (True, self.dummy_bgr_image)
self.mock_cv2.cvtColor.return_value = self.dummy_rgb_image # Mock conversion
img = cam.read()
if isinstance(self.test_sdk, FakeOpenCVSDKAdapter):
# When using fake SDK, verify exact match with dummy image
np.testing.assert_array_equal(img, self.dummy_rgb_image)
else:
# When using real camera, verify basic properties of the captured image
self.assertIsInstance(img, np.ndarray)
self.mock_capture_instance.read.assert_called_once()
self.mock_cv2.cvtColor.assert_called_once_with(self.dummy_bgr_image, MOCK_CV2_COLOR_BGR2RGB)
self.mock_cv2.rotate.assert_not_called()
np.testing.assert_array_equal(img, self.dummy_rgb_image)
def test_read_success_bgr(self):
config = self.default_config
config.color_mode = "bgr"
cam = OpenCVCamera(config, cv2_sdk=self.test_sdk)
cam.connect()
config.color_mode = "bgr" # Request BGR
cam = OpenCVCamera(config)
cam.camera = self.mock_capture_instance
cam.is_connected = True
cam.capture_width = MOCK_CV2_CAP_PROP_FRAME_WIDTH
cam.capture_height = MOCK_CV2_CAP_PROP_FRAME_HEIGHT
self.mock_capture_instance.read.return_value = (True, self.dummy_bgr_image)
img = cam.read()
if isinstance(self.test_sdk, FakeOpenCVSDKAdapter):
# When using fake SDK, verify exact match with dummy image
np.testing.assert_array_equal(img, self.dummy_bgr_image)
else:
# When using real camera, verify basic properties of the captured image
self.assertIsInstance(img, np.ndarray)
self.mock_capture_instance.read.assert_called_once()
self.mock_cv2.cvtColor.assert_not_called() # Should not convert
np.testing.assert_array_equal(img, self.dummy_bgr_image) # Expect BGR
def test_read_success_with_rotation(self):
config = self.default_config
config.rotation = 90
cam = OpenCVCamera(config, cv2_sdk=self.test_sdk) # Sets cam.rotation internally
cam.connect()
cam = OpenCVCamera(config)
cam.camera = self.mock_capture_instance
cam.is_connected = True
cam.capture_width = MOCK_CV2_CAP_PROP_FRAME_WIDTH # Original capture dimensions
cam.capture_height = MOCK_CV2_CAP_PROP_FRAME_HEIGHT
self.mock_capture_instance.read.return_value = (True, self.dummy_bgr_image)
self.mock_cv2.cvtColor.return_value = self.dummy_rgb_image
rotated_image = np.rot90(self.dummy_rgb_image) # Simulate rotation
self.mock_cv2.rotate.return_value = rotated_image # Mock rotation result
img = cam.read()
if isinstance(self.test_sdk, FakeOpenCVSDKAdapter):
# When using fake SDK, verify exact match with dummy image
rotated_image = np.rot90(self.dummy_rgb_image) # Simulate rotation
np.testing.assert_array_equal(img, rotated_image)
else:
# When using real camera, verify basic properties of the captured image
self.assertEqual(img.shape, (self.default_config.width, self.default_config.height, 3))
@pytest.mark.skipif(LEROBOT_USE_REAL_OPENCV_CAMERA_TESTS, reason="Not valid for real camera")
self.mock_capture_instance.read.assert_called_once()
self.mock_cv2.cvtColor.assert_called_once_with(self.dummy_bgr_image, MOCK_CV2_COLOR_BGR2RGB)
self.mock_cv2.rotate.assert_called_once_with(self.dummy_rgb_image, MOCK_CV2_ROTATE_90_CLOCKWISE)
np.testing.assert_array_equal(img, rotated_image)
def test_read_capture_fails(self):
FakeOpenCVSDKAdapter.configure_fail_read_after(fail_read_after=0) # Simulate read failure
cam = OpenCVCamera(self.default_config, cv2_sdk=self.test_sdk)
cam.connect()
cam = OpenCVCamera(self.default_config)
cam.camera = self.mock_capture_instance
cam.is_connected = True
self.mock_capture_instance.read.return_value = (False, None) # Simulate read failure
with self.assertRaisesRegex(OSError, f"Can't capture color image from camera {cam.camera_index}"):
cam.read()
def test_read_invalid_temporary_color_mode(self):
cam = OpenCVCamera(self.default_config, cv2_sdk=self.test_sdk)
cam.connect()
cam = OpenCVCamera(self.default_config)
cam.camera = self.mock_capture_instance
cam.is_connected = True
cam.capture_width = MOCK_CV2_CAP_PROP_FRAME_WIDTH
cam.capture_height = MOCK_CV2_CAP_PROP_FRAME_HEIGHT
self.mock_capture_instance.read.return_value = (
True,
self.dummy_bgr_image,
) # Need read to succeed initially
with self.assertRaisesRegex(
ValueError, "Expected color values are 'rgb' or 'bgr', but xyz is provided"
):
cam.read(temporary_color_mode="xyz")
@pytest.mark.skipif(LEROBOT_USE_REAL_OPENCV_CAMERA_TESTS, reason="Not valid for real camera")
def test_read_dimension_mismatch(self):
wrong_dim_image = create_dummy_image(240, 320)
cam = OpenCVCamera(self.default_config, cv2_sdk=self.test_sdk)
cam.connect()
FakeOpenCVSDKAdapter.configure_fake_simulated_image(
simulated_image=wrong_dim_image
) # Different dimensions
wrong_dim_image = create_dummy_image(240, 320) # Different dimensions
cam = OpenCVCamera(self.default_config)
cam.camera = self.mock_capture_instance
cam.is_connected = True
cam.capture_width = MOCK_CV2_CAP_PROP_FRAME_WIDTH # Expected dims
cam.capture_height = MOCK_CV2_CAP_PROP_FRAME_HEIGHT
self.mock_capture_instance.read.return_value = (True, wrong_dim_image)
self.mock_cv2.cvtColor.return_value = wrong_dim_image
with self.assertRaisesRegex(
OSError,
@@ -334,18 +476,24 @@ class TestOpenCVCamera(unittest.TestCase):
):
cam.read()
# --- Async Read Tests ---
# --- Test async_read ---
def test_async_read_not_connected(self):
cam = OpenCVCamera(self.default_config, cv2_sdk=self.test_sdk)
cam = OpenCVCamera(self.default_config)
with self.assertRaisesRegex(DeviceNotConnectedError, "not connected"):
cam.async_read()
# TODO(Steven): This is dirty, but at least we don't have to manually mock the read
def test_async_read_starts_thread_and_returns_image(
self,
):
cam = OpenCVCamera(self.default_config, cv2_sdk=self.test_sdk)
cam.connect()
# TODO(Steven): Not sure about this one
@patch.object(OpenCVCamera, "read", autospec=True) # Mock the instance's read method
def test_async_read_starts_thread_and_returns_image(self, mock_instance_read):
cam = OpenCVCamera(self.default_config)
cam.is_connected = True # Simulate connection
# TODO(Steven): This is dirty
def mock_read_effect(self_cam_instance):
self_cam_instance.color_image = self.dummy_rgb_image
return self.dummy_rgb_image
mock_instance_read.side_effect = mock_read_effect
def mock_start():
cam.read_loop()
@@ -364,18 +512,25 @@ class TestOpenCVCamera(unittest.TestCase):
self.assertTrue(self.mock_thread_instance.daemon)
self.assertIsNotNone(cam.thread)
self.assertIsNotNone(cam.stop_event)
mock_instance_read.assert_called_once_with(cam)
if isinstance(self.test_sdk, FakeOpenCVSDKAdapter):
# When using fake SDK, verify exact match with dummy image
np.testing.assert_array_equal(img, self.dummy_rgb_image)
else:
# When using real camera, verify basic properties of the captured image
self.assertIsInstance(img, np.ndarray)
cam.is_connected = False # To avoid warning when running the destructor
def test_async_read_timeout(self):
# Assert the image returned by async_read is the one set by the mocked read
np.testing.assert_array_equal(img, self.dummy_rgb_image)
@patch.object(OpenCVCamera, "read", autospec=True)
def test_async_read_timeout(self, mock_instance_read):
config = self.default_config
cam = OpenCVCamera(config, cv2_sdk=self.test_sdk)
cam.connect()
cam = OpenCVCamera(config)
cam.is_connected = True
# Mock 'read' so it *never* sets color_image
def mock_read_effect(self_cam_instance):
self_cam_instance.color_image = None
return None
mock_instance_read.side_effect = mock_read_effect
with self.assertRaisesRegex(TimeoutError, "Timed out waiting for async_read"):
cam.async_read()
@@ -388,15 +543,18 @@ class TestOpenCVCamera(unittest.TestCase):
self.assertIsNotNone(cam.thread)
self.assertIsNotNone(cam.stop_event)
# --- Disconnection Tests ---
cam.is_connected = False # To avoid warning when running the destructor
# --- Test disconnect ---
def test_disconnect_not_connected(self):
cam = OpenCVCamera(self.default_config, cv2_sdk=self.test_sdk)
cam = OpenCVCamera(self.default_config)
with self.assertRaisesRegex(DeviceNotConnectedError, "not connected"):
cam.disconnect()
def test_disconnect_no_thread(self):
cam = OpenCVCamera(self.default_config, cv2_sdk=self.test_sdk)
cam.connect()
cam = OpenCVCamera(self.default_config)
cam.camera = self.mock_capture_instance
cam.is_connected = True
cam.disconnect()
@@ -405,9 +563,22 @@ class TestOpenCVCamera(unittest.TestCase):
self.assertIsNone(cam.thread)
self.assertIsNone(cam.stop_event)
def test_disconnect_with_thread(self):
cam = OpenCVCamera(self.default_config, cv2_sdk=self.test_sdk)
cam.connect()
self.mock_capture_instance.release.assert_called_once()
self.mock_event_instance.set.assert_not_called()
self.mock_thread_instance.join.assert_not_called()
@patch.object(OpenCVCamera, "read", autospec=True)
def test_disconnect_with_thread(self, mock_instance_read):
cam = OpenCVCamera(self.default_config)
cam.camera = self.mock_capture_instance
cam.is_connected = True
# TODO(Steven): This is dirty
def mock_read_effect(self_cam_instance):
self_cam_instance.color_image = self.dummy_rgb_image
return self.dummy_rgb_image
mock_instance_read.side_effect = mock_read_effect
def mock_start():
cam.read_loop()
@@ -417,6 +588,7 @@ class TestOpenCVCamera(unittest.TestCase):
# Make Event.is_set return False initially, then True after disconnect/stop
self.mock_event_instance.is_set.side_effect = [False, True]
# Start the thread
_img = cam.async_read()
self.assertIsNotNone(cam.thread)
@@ -428,6 +600,9 @@ class TestOpenCVCamera(unittest.TestCase):
self.mock_event_instance.set.assert_called_once()
self.mock_thread_instance.join.assert_called_once()
# Check camera release
self.mock_capture_instance.release.assert_called_once()
# Check state reset
self.assertIsNone(cam.camera)
self.assertFalse(cam.is_connected)