diff --git a/lerobot/common/robot_devices/cameras/opencv.py b/lerobot/common/robot_devices/cameras/opencv.py index a27fec18c..661fae6c0 100644 --- a/lerobot/common/robot_devices/cameras/opencv.py +++ b/lerobot/common/robot_devices/cameras/opencv.py @@ -219,6 +219,14 @@ class OpenCVCamera: self.camera = None self.is_connected = False self.thread = None + # Using Lock to avoid race condition and segfault when multiple threads + # access the same camera. This is not one of our use case, but we add this + # for safety if users want to use this class with threads. As a result, threads + # will go sequentially through the code logic protected by a lock, instead of + # in parallel. Also, we use Recursive Lock to avoid deadlock by allowing each + # thread to acquire the lock multiple times. + # TODO(rcadene, aliberts): Add RLock on every robot devices where it makes sense? + self.lock = threading.RLock() self.stop_event = None self.color_image = None self.logs = {} @@ -227,21 +235,18 @@ class OpenCVCamera: if self.is_connected: raise RobotDeviceAlreadyConnectedError(f"OpenCVCamera({self.camera_index}) is already connected.") - # First create a temporary camera trying to access `camera_index`, - # and verify it is a valid camera by calling `isOpened`. + camera_idx = f"/dev/video{self.camera_index}" if platform.system() == "Linux" else self.camera_index + with self.lock: + # First create a temporary camera trying to access `camera_index`, + # and verify it is a valid camera by calling `isOpened`. + tmp_camera = cv2.VideoCapture(camera_idx) + is_camera_open = tmp_camera.isOpened() + # Release camera to make it accessible for `find_camera_indices` + tmp_camera.release() + del tmp_camera - if platform.system() == "Linux": - # Linux uses ports for connecting to cameras - tmp_camera = cv2.VideoCapture(f"/dev/video{self.camera_index}") - else: - tmp_camera = cv2.VideoCapture(self.camera_index) - - is_camera_open = tmp_camera.isOpened() - # Release camera to make it accessible for `find_camera_indices` - del tmp_camera - - # If the camera doesn't work, display the camera indices corresponding to - # valid cameras. + # If the camera doesn't work, display the camera indices corresponding to + # valid cameras. if not is_camera_open: # Verify that the provided `camera_index` is valid before printing the traceback available_cam_ids = find_camera_indices() @@ -251,47 +256,45 @@ class OpenCVCamera: "To find the camera index you should use, run `python lerobot/common/robot_devices/cameras/opencv.py`." ) - raise OSError(f"Can't access OpenCVCamera({self.camera_index}).") + raise OSError(f"Can't access OpenCVCamera({camera_idx}).") # Secondly, create the camera that will be used downstream. # Note: For some unknown reason, calling `isOpened` blocks the camera which then # needs to be re-created. - if platform.system() == "Linux": - self.camera = cv2.VideoCapture(f"/dev/video{self.camera_index}") - else: - self.camera = cv2.VideoCapture(self.camera_index) + with self.lock: + self.camera = cv2.VideoCapture(camera_idx) - if self.fps is not None: - self.camera.set(cv2.CAP_PROP_FPS, self.fps) - if self.width is not None: - self.camera.set(cv2.CAP_PROP_FRAME_WIDTH, self.width) - if self.height is not None: - self.camera.set(cv2.CAP_PROP_FRAME_HEIGHT, self.height) + if self.fps is not None: + self.camera.set(cv2.CAP_PROP_FPS, self.fps) + if self.width is not None: + self.camera.set(cv2.CAP_PROP_FRAME_WIDTH, self.width) + if self.height is not None: + self.camera.set(cv2.CAP_PROP_FRAME_HEIGHT, self.height) - actual_fps = self.camera.get(cv2.CAP_PROP_FPS) - actual_width = self.camera.get(cv2.CAP_PROP_FRAME_WIDTH) - actual_height = self.camera.get(cv2.CAP_PROP_FRAME_HEIGHT) + actual_fps = self.camera.get(cv2.CAP_PROP_FPS) + actual_width = self.camera.get(cv2.CAP_PROP_FRAME_WIDTH) + actual_height = self.camera.get(cv2.CAP_PROP_FRAME_HEIGHT) - # Using `math.isclose` since actual fps can be a float (e.g. 29.9 instead of 30) - if self.fps is not None and not math.isclose(self.fps, actual_fps, rel_tol=1e-3): - # Using `OSError` since it's a broad that encompasses issues related to device communication - raise OSError( - f"Can't set {self.fps=} for OpenCVCamera({self.camera_index}). Actual value is {actual_fps}." - ) - if self.width is not None and self.width != actual_width: - raise OSError( - f"Can't set {self.width=} for OpenCVCamera({self.camera_index}). Actual value is {actual_width}." - ) - if self.height is not None and self.height != actual_height: - raise OSError( - f"Can't set {self.height=} for OpenCVCamera({self.camera_index}). Actual value is {actual_height}." - ) + # Using `math.isclose` since actual fps can be a float (e.g. 29.9 instead of 30) + if self.fps is not None and not math.isclose(self.fps, actual_fps, rel_tol=1e-3): + # Using `OSError` since it's a broad that encompasses issues related to device communication + raise OSError( + f"Can't set {self.fps=} for OpenCVCamera({self.camera_index}). Actual value is {actual_fps}." + ) + if self.width is not None and self.width != actual_width: + raise OSError( + f"Can't set {self.width=} for OpenCVCamera({self.camera_index}). Actual value is {actual_width}." + ) + if self.height is not None and self.height != actual_height: + raise OSError( + f"Can't set {self.height=} for OpenCVCamera({self.camera_index}). Actual value is {actual_height}." + ) - self.fps = actual_fps - self.width = actual_width - self.height = actual_height + self.fps = int(actual_fps) + self.width = int(actual_width) + self.height = int(actual_height) - self.is_connected = True + self.is_connected = True def read(self, temporary_color_mode: str | None = None) -> np.ndarray: """Read a frame from the camera returned in the format (height, width, channels) @@ -307,7 +310,9 @@ class OpenCVCamera: start_time = time.perf_counter() - ret, color_image = self.camera.read() + with self.lock: + ret, color_image = self.camera.read() + if not ret: raise OSError(f"Can't capture color image from camera {self.camera_index}.") @@ -336,11 +341,17 @@ class OpenCVCamera: # log the utc time at which the image was received self.logs["timestamp_utc"] = capture_timestamp_utc() + with self.lock: + self.color_image = color_image + return color_image def read_loop(self): while not self.stop_event.is_set(): - self.color_image = self.read() + try: + self.color_image = self.read() + except Exception as e: + print(f"Error reading in thread: {e}") def async_read(self): if not self.is_connected: @@ -349,21 +360,26 @@ class OpenCVCamera: ) if self.thread is None: - self.stop_event = threading.Event() - self.thread = Thread(target=self.read_loop, args=()) - self.thread.daemon = True - self.thread.start() + with self.lock: + self.stop_event = threading.Event() + self.thread = Thread(target=self.read_loop, args=()) + self.thread.daemon = True + self.thread.start() num_tries = 0 - while self.color_image is None: - num_tries += 1 - time.sleep(1 / self.fps) - if num_tries > self.fps and (self.thread.ident is None or not self.thread.is_alive()): - raise Exception( - "The thread responsible for `self.async_read()` took too much time to start. There might be an issue. Verify that `self.thread.start()` has been called." - ) + while True: + with self.lock: + if self.color_image is not None: + return self.color_image - return self.color_image + time.sleep(1 / self.fps) + num_tries += 1 + if num_tries > self.fps * 2: + raise TimeoutError("Timed out waiting for async_read() to start.") + # if num_tries > self.fps and (self.thread.ident is None or not self.thread.is_alive()): + # raise Exception( + # "The thread responsible for `self.async_read()` took too much time to start. There might be an issue. Verify that `self.thread.start()` has been called." + # ) def disconnect(self): if not self.is_connected: @@ -371,21 +387,22 @@ class OpenCVCamera: f"OpenCVCamera({self.camera_index}) is not connected. Try running `camera.connect()` first." ) - if self.thread is not None and self.thread.is_alive(): - # wait for the thread to finish - self.stop_event.set() - self.thread.join() + if self.thread is not None: + with self.lock: + self.stop_event.set() + self.thread.join() # wait for the thread to finish self.thread = None self.stop_event = None - self.camera.release() - self.camera = None - - self.is_connected = False + with self.lock: + self.camera.release() + self.camera = None + self.is_connected = False def __del__(self): - if getattr(self, "is_connected", False): - self.disconnect() + with self.lock: + if getattr(self, "is_connected", False): + self.disconnect() if __name__ == "__main__": diff --git a/tests/mock_opencv.py b/tests/mock_opencv.py index c54ae285d..8a89c0b5c 100644 --- a/tests/mock_opencv.py +++ b/tests/mock_opencv.py @@ -1,28 +1,35 @@ +from functools import cache + import cv2 import numpy as np -class MockVideoCapture(cv2.VideoCapture): - image = { - "480x640": np.random.randint(0, 256, size=(480, 640, 3), dtype=np.uint8), - "720x1280": np.random.randint(0, 256, size=(720, 1280, 3), dtype=np.uint8), - } +@cache +def _generate_image(width: int, height: int): + return np.random.randint(0, 256, size=(height, width, 3), dtype=np.uint8) + +class MockVideoCapture: def __init__(self, *args, **kwargs): self._mock_dict = { cv2.CAP_PROP_FPS: 30, cv2.CAP_PROP_FRAME_WIDTH: 640, cv2.CAP_PROP_FRAME_HEIGHT: 480, } + self._is_opened = True def isOpened(self): # noqa: N802 - return True + return self._is_opened def set(self, propId: int, value: float) -> bool: # noqa: N803 + if not self._is_opened: + raise RuntimeError("Camera is not opened") self._mock_dict[propId] = value return True def get(self, propId: int) -> float: # noqa: N803 + if not self._is_opened: + raise RuntimeError("Camera is not opened") value = self._mock_dict[propId] if value == 0: if propId == cv2.CAP_PROP_FRAME_HEIGHT: @@ -32,10 +39,16 @@ class MockVideoCapture(cv2.VideoCapture): return value def read(self): + if not self._is_opened: + raise RuntimeError("Camera is not opened") h = self.get(cv2.CAP_PROP_FRAME_HEIGHT) w = self.get(cv2.CAP_PROP_FRAME_WIDTH) ret = True - return ret, self.image[f"{h}x{w}"] + return ret, _generate_image(width=w, height=h) def release(self): - pass + self._is_opened = False + + def __del__(self): + if self._is_opened: + self.release() diff --git a/tests/test_cameras.py b/tests/test_cameras.py index 1ddbbd076..a5d30f6ca 100644 --- a/tests/test_cameras.py +++ b/tests/test_cameras.py @@ -116,7 +116,9 @@ def test_camera(request, camera_type, mock): compute_max_pixel_difference(color_image, async_color_image), ) # TODO(rcadene): properly set `rtol` - assert np.allclose(color_image, async_color_image, rtol=1e-5, atol=MAX_PIXEL_DIFFERENCE), error_msg + np.testing.assert_allclose( + color_image, async_color_image, rtol=1e-5, atol=MAX_PIXEL_DIFFERENCE, err_msg=error_msg + ) # Test disconnecting camera.disconnect() @@ -133,7 +135,9 @@ def test_camera(request, camera_type, mock): camera.connect() assert camera.color_mode == "bgr" bgr_color_image = camera.read() - assert np.allclose(color_image, bgr_color_image[:, :, [2, 1, 0]], rtol=1e-5, atol=MAX_PIXEL_DIFFERENCE) + np.testing.assert_allclose( + color_image, bgr_color_image[:, :, [2, 1, 0]], rtol=1e-5, atol=MAX_PIXEL_DIFFERENCE, err_msg=error_msg + ) del camera # TODO(rcadene): Add a test for a camera that doesnt support fps=60 and raises an OSError