diff --git a/lerobot/common/cameras/opencv/camera_opencv.py b/lerobot/common/cameras/opencv/camera_opencv.py index d1b08743..ddc6869c 100644 --- a/lerobot/common/cameras/opencv/camera_opencv.py +++ b/lerobot/common/cameras/opencv/camera_opencv.py @@ -18,14 +18,16 @@ This file contains utilities for recording frames from cameras. For more info lo import argparse import concurrent.futures +import logging import math import platform import shutil -import threading import time from pathlib import Path -from threading import Thread +from threading import Event, Thread +from typing import TypeAlias +import cv2 import numpy as np from PIL import Image @@ -36,8 +38,11 @@ from lerobot.common.utils.robot_utils import ( from lerobot.common.utils.utils import capture_timestamp_utc from ..camera import Camera +from ..utils import get_cv2_backend, get_cv2_rotation from .configuration_opencv import OpenCVCameraConfig +IndexOrPath: TypeAlias = int | Path + # The maximum opencv device index depends on your operating system. For instance, # if you have 3 cameras, they should be associated to index 0, 1, and 2. This is the case # on MacOS. However, on Ubuntu, the indices are different like 6, 16, 23. @@ -45,63 +50,7 @@ from .configuration_opencv import OpenCVCameraConfig # treat the same cameras as new devices. Thus we select a higher bound to search indices. MAX_OPENCV_INDEX = 60 - -def find_cameras(raise_when_empty=False, max_index_search_range=MAX_OPENCV_INDEX, mock=False) -> list[dict]: - cameras = [] - if platform.system() == "Linux": - print("Linux detected. Finding available camera indices through scanning '/dev/video*' ports") - possible_ports = [str(port) for port in Path("/dev").glob("video*")] - ports = _find_cameras(possible_ports, mock=mock) - for port in ports: - cameras.append( - { - "port": port, - "index": int(port.removeprefix("/dev/video")), - } - ) - else: - print( - "Mac or Windows detected. Finding available camera indices through " - f"scanning all indices from 0 to {MAX_OPENCV_INDEX}" - ) - possible_indices = range(max_index_search_range) - indices = _find_cameras(possible_indices, mock=mock) - for index in indices: - cameras.append( - { - "port": None, - "index": index, - } - ) - - return cameras - - -def _find_cameras( - possible_camera_ids: list[int | str], raise_when_empty=False, mock=False -) -> list[int | str]: - if mock: - import tests.cameras.mock_cv2 as cv2 - else: - import cv2 - - camera_ids = [] - for camera_idx in possible_camera_ids: - camera = cv2.VideoCapture(camera_idx) - is_open = camera.isOpened() - camera.release() - - if is_open: - print(f"Camera found at index {camera_idx}") - camera_ids.append(camera_idx) - - if raise_when_empty and len(camera_ids) == 0: - raise OSError( - "Not a single camera was detected. Try re-plugging, or re-installing `opencv2`, " - "or your camera driver, or make sure your camera is compatible with opencv2." - ) - - return camera_ids +logger = logging.getLogger(__name__) def is_valid_unix_path(path: str) -> bool: @@ -114,7 +63,7 @@ def get_camera_index_from_unix_port(port: Path) -> int: return int(str(port.resolve()).removeprefix("/dev/video")) -def save_image(img_array, camera_index, frame_index, images_dir): +def save_image(img_array: np.ndarray, camera_index: int, frame_index: int, images_dir: Path): img = Image.fromarray(img_array) path = images_dir / f"camera_{camera_index:02d}_frame_{frame_index:06d}.png" path.parent.mkdir(parents=True, exist_ok=True) @@ -123,29 +72,32 @@ def save_image(img_array, camera_index, frame_index, images_dir): def save_images_from_cameras( images_dir: Path, - camera_ids: list | None = None, - fps=None, - width=None, - height=None, - record_time_s=2, - mock=False, + camera_idx_or_paths: list[IndexOrPath] | None = None, + fps: int | None = None, + width: int | None = None, + height: int | None = None, + record_time_s: int = 2, ): """ Initializes all the cameras and saves images to the directory. Useful to visually identify the camera associated to a given camera index. """ - if camera_ids is None or len(camera_ids) == 0: - camera_infos = find_cameras(mock=mock) - camera_ids = [cam["index"] for cam in camera_infos] + if not camera_idx_or_paths: + camera_idx_or_paths = OpenCVCamera.find_cameras() + if len(camera_idx_or_paths) == 0: + raise RuntimeError( + "Not a single camera was detected. Try re-plugging, or re-installing `opencv-python`, " + "or your camera driver, or make sure your camera is compatible with opencv." + ) print("Connecting cameras") cameras = [] - for cam_idx in camera_ids: - config = OpenCVCameraConfig(camera_index=cam_idx, fps=fps, width=width, height=height, mock=mock) + for idx_or_path in camera_idx_or_paths: + config = OpenCVCameraConfig(index_or_path=idx_or_path, fps=fps, width=width, height=height) camera = OpenCVCamera(config) camera.connect() print( - f"OpenCVCamera({camera.camera_index}, fps={camera.fps}, width={camera.capture_width}, " + f"OpenCVCamera({camera.index_or_path}, fps={camera.fps}, width={camera.capture_width}, " f"height={camera.capture_height}, color_mode={camera.color_mode})" ) cameras.append(camera) @@ -212,7 +164,7 @@ class OpenCVCamera(Camera): ```python from lerobot.common.robot_devices.cameras.configs import OpenCVCameraConfig - config = OpenCVCameraConfig(camera_index=0) + config = OpenCVCameraConfig(index_or_path=0) camera = OpenCVCamera(config) camera.connect() color_image = camera.read() @@ -222,28 +174,16 @@ class OpenCVCamera(Camera): Example of changing default fps, width, height and color_mode: ```python - config = OpenCVCameraConfig(camera_index=0, fps=30, width=1280, height=720) - config = OpenCVCameraConfig(camera_index=0, fps=90, width=640, height=480) - config = OpenCVCameraConfig(camera_index=0, fps=90, width=640, height=480, color_mode="bgr") + config = OpenCVCameraConfig(index_or_path=0, fps=30, width=1280, height=720) + config = OpenCVCameraConfig(index_or_path=0, fps=90, width=640, height=480) + config = OpenCVCameraConfig(index_or_path=0, fps=90, width=640, height=480, color_mode="bgr") # Note: might error out open `camera.connect()` if these settings are not compatible with the camera ``` """ def __init__(self, config: OpenCVCameraConfig): self.config = config - self.camera_index = config.camera_index - self.port = None - - # Linux uses ports for connecting to cameras - if platform.system() == "Linux": - if isinstance(self.camera_index, int): - self.port = Path(f"/dev/video{self.camera_index}") - elif isinstance(self.camera_index, str) and is_valid_unix_path(self.camera_index): - self.port = Path(self.camera_index) - # Retrieve the camera index from a potentially symlinked path - self.camera_index = get_camera_index_from_unix_port(self.port) - else: - raise ValueError(f"Please check the provided camera_index: {self.camera_index}") + self.index_or_path = config.index_or_path # Store the raw (capture) resolution from the config. self.capture_width = config.width @@ -260,113 +200,97 @@ class OpenCVCamera(Camera): self.fps = config.fps self.channels = config.channels self.color_mode = config.color_mode - self.mock = config.mock - self.camera = None - self.is_connected = False - self.thread = None - self.stop_event = None + self.camera: cv2.VideoCapture | None = None + self.thread: Thread | None = None + self.stop_event: Event | None = None self.color_image = None self.logs = {} - if self.mock: - import tests.cameras.mock_cv2 as cv2 - else: - import cv2 + self.rotation = get_cv2_rotation(config.rotation) + self.backend = get_cv2_backend() - self.rotation = None - if config.rotation == -90: - self.rotation = cv2.ROTATE_90_COUNTERCLOCKWISE - elif config.rotation == 90: - self.rotation = cv2.ROTATE_90_CLOCKWISE - elif config.rotation == 180: - self.rotation = cv2.ROTATE_180 + def __str__(self) -> str: + return f"{self.__class__.__name__}({self.index_or_path})" + + @property + def is_connected(self) -> bool: + return self.camera.isOpened() if isinstance(self.camera, cv2.VideoCapture) else False def connect(self): if self.is_connected: - raise DeviceAlreadyConnectedError(f"OpenCVCamera({self.camera_index}) is already connected.") + raise DeviceAlreadyConnectedError(f"{self} is already connected.") - if self.mock: - import tests.cameras.mock_cv2 as cv2 - else: - import cv2 + # Use 1 thread to avoid blocking the main thread. Especially useful during data collection + # when other threads are used to save the images. + cv2.setNumThreads(1) - # Use 1 thread to avoid blocking the main thread. Especially useful during data collection - # when other threads are used to save the images. - cv2.setNumThreads(1) + self.camera = cv2.VideoCapture(self.index_or_path, self.backend) - backend = ( - cv2.CAP_V4L2 - if platform.system() == "Linux" - else cv2.CAP_DSHOW - if platform.system() == "Windows" - else cv2.CAP_AVFOUNDATION - if platform.system() == "Darwin" - else cv2.CAP_ANY - ) - - camera_idx = f"/dev/video{self.camera_index}" if platform.system() == "Linux" else self.camera_index - # First create a temporary camera trying to access `camera_index`, - # and verify it is a valid camera by calling `isOpened`. - tmp_camera = cv2.VideoCapture(camera_idx, backend) - is_camera_open = tmp_camera.isOpened() - # Release camera to make it accessible for `find_camera_indices` - tmp_camera.release() - del tmp_camera - - # If the camera doesn't work, display the camera indices corresponding to - # valid cameras. - if not is_camera_open: + # If the camera doesn't work, display the camera indices corresponding to valid cameras. + if not self.camera.isOpened(): + # Release camera to make it accessible for `find_camera_indices` + self.camera.release() # Verify that the provided `camera_index` is valid before printing the traceback - cameras_info = find_cameras() + cameras_info = self.find_cameras() available_cam_ids = [cam["index"] for cam in cameras_info] - if self.camera_index not in available_cam_ids: + if self.index_or_path not in available_cam_ids: raise ValueError( - f"`camera_index` is expected to be one of these available cameras {available_cam_ids}, but {self.camera_index} is provided instead. " - "To find the camera index you should use, run `python lerobot/common/robot_devices/cameras/opencv.py`." + f"`camera_index` is expected to be one of these available cameras {available_cam_ids}, " + f"but {self.index_or_path} is provided instead. To find the camera index you should use, " + "run `python lerobot/common/robot_devices/cameras/opencv.py`." ) - raise OSError(f"Can't access OpenCVCamera({camera_idx}).") - - # Secondly, create the camera that will be used downstream. - # Note: For some unknown reason, calling `isOpened` blocks the camera which then - # needs to be re-created. - self.camera = cv2.VideoCapture(camera_idx, backend) + raise ConnectionError(f"Can't access {self}.") if self.fps is not None: - self.camera.set(cv2.CAP_PROP_FPS, self.fps) + self._set_fps(self.fps) if self.capture_width is not None: - self.camera.set(cv2.CAP_PROP_FRAME_WIDTH, self.capture_width) + self._set_capture_width(self.capture_width) if self.capture_height is not None: - self.camera.set(cv2.CAP_PROP_FRAME_HEIGHT, self.capture_height) + self._set_capture_height(self.capture_height) + def _set_fps(self, fps: int) -> None: + self.camera.set(cv2.CAP_PROP_FPS, fps) actual_fps = self.camera.get(cv2.CAP_PROP_FPS) - actual_width = self.camera.get(cv2.CAP_PROP_FRAME_WIDTH) - actual_height = self.camera.get(cv2.CAP_PROP_FRAME_HEIGHT) - # Using `math.isclose` since actual fps can be a float (e.g. 29.9 instead of 30) - if self.fps is not None and not math.isclose(self.fps, actual_fps, rel_tol=1e-3): - # Using `OSError` since it's a broad that encompasses issues related to device communication - raise OSError( - f"Can't set {self.fps=} for OpenCVCamera({self.camera_index}). Actual value is {actual_fps}." - ) - if self.capture_width is not None and not math.isclose( - self.capture_width, actual_width, rel_tol=1e-3 - ): - raise OSError( - f"Can't set {self.capture_width=} for OpenCVCamera({self.camera_index}). Actual value is {actual_width}." - ) - if self.capture_height is not None and not math.isclose( - self.capture_height, actual_height, rel_tol=1e-3 - ): - raise OSError( - f"Can't set {self.capture_height=} for OpenCVCamera({self.camera_index}). Actual value is {actual_height}." - ) + if not math.isclose(fps, actual_fps, rel_tol=1e-3): + raise RuntimeError(f"Can't set {fps=} for {self}. Actual value is {actual_fps}.") - self.fps = round(actual_fps) - self.capture_width = round(actual_width) - self.capture_height = round(actual_height) - self.is_connected = True + def _set_capture_width(self, capture_width: int) -> None: + self.camera.set(cv2.CAP_PROP_FRAME_WIDTH, capture_width) + actual_width = self.camera.get(cv2.CAP_PROP_FRAME_WIDTH) + if not math.isclose(self.capture_width, actual_width, rel_tol=1e-3): + raise RuntimeError(f"Can't set {capture_width=} for {self}. Actual value is {actual_width}.") + + def _set_capture_height(self, capture_height: int) -> None: + self.camera.set(cv2.CAP_PROP_FRAME_HEIGHT, capture_height) + actual_height = self.camera.get(cv2.CAP_PROP_FRAME_HEIGHT) + if not math.isclose(self.capture_height, actual_height, rel_tol=1e-3): + raise RuntimeError(f"Can't set {capture_height=} for {self}. Actual value is {actual_height}.") + + @staticmethod + def find_cameras(max_index_search_range=MAX_OPENCV_INDEX) -> list[IndexOrPath]: + if platform.system() == "Linux": + print("Linux detected. Finding available camera indices through scanning '/dev/video*' ports") + possible_idx_or_paths = [str(port) for port in Path("/dev").glob("video*")] + else: + print( + f"{platform.system()} system detected. Finding available camera indices through " + f"scanning all indices from 0 to {MAX_OPENCV_INDEX}" + ) + possible_idx_or_paths = range(max_index_search_range) + + found_idx_or_paths = [] + for target in possible_idx_or_paths: + camera = cv2.VideoCapture(target) + is_open = camera.isOpened() + camera.release() + if is_open: + print(f"Camera found at {target}") + found_idx_or_paths.append(target) + + return found_idx_or_paths def read(self, temporary_color_mode: str | None = None) -> np.ndarray: """Read a frame from the camera returned in the format (height, width, channels) @@ -376,17 +300,23 @@ class OpenCVCamera(Camera): If you are reading data from other sensors, we advise to use `camera.async_read()` which is non blocking version of `camera.read()`. """ if not self.is_connected: - raise DeviceNotConnectedError( - f"OpenCVCamera({self.camera_index}) is not connected. Try running `camera.connect()` first." - ) + raise DeviceNotConnectedError(f"{self} is not connected.") - start_time = time.perf_counter() + start = time.perf_counter() ret, color_image = self.camera.read() - if not ret: - raise OSError(f"Can't capture color image from camera {self.camera_index}.") + raise RuntimeError(f"Can't capture color image from {self}.") + self.color_image = self._postprocess_image(color_image, temporary_color_mode) + + dt_ms = (time.perf_counter() - start) * 1e3 + logger.debug(f"{self} read: {dt_ms:.1f}ms") + + # log the utc time at which the image was received + self.logs["timestamp_utc"] = capture_timestamp_utc() + + def _postprocess_image(self, image, temporary_color_mode: str | None = None): requested_color_mode = self.color_mode if temporary_color_mode is None else temporary_color_mode if requested_color_mode not in ["rgb", "bgr"]: @@ -398,33 +328,21 @@ class OpenCVCamera(Camera): # However, Deep Learning framework such as LeRobot uses RGB format as default to train neural networks, # so we convert the image color from BGR to RGB. if requested_color_mode == "rgb": - if self.mock: - import tests.cameras.mock_cv2 as cv2 - else: - import cv2 - - color_image = cv2.cvtColor(color_image, cv2.COLOR_BGR2RGB) + color_image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) h, w, _ = color_image.shape if h != self.capture_height or w != self.capture_width: - raise OSError( - f"Can't capture color image with expected height and width ({self.height} x {self.width}). ({h} x {w}) returned instead." + raise RuntimeError( + f"Can't capture color image with expected height and width ({self.height} x {self.width}). " + f"({h} x {w}) returned instead." ) if self.rotation is not None: color_image = cv2.rotate(color_image, self.rotation) - # log the number of seconds it took to read the image - self.logs["delta_timestamp_s"] = time.perf_counter() - start_time - - # log the utc time at which the image was received - self.logs["timestamp_utc"] = capture_timestamp_utc() - - self.color_image = color_image - return color_image - def read_loop(self): + def _read_loop(self): while not self.stop_event.is_set(): try: self.color_image = self.read() @@ -433,13 +351,11 @@ class OpenCVCamera(Camera): def async_read(self): if not self.is_connected: - raise DeviceNotConnectedError( - f"OpenCVCamera({self.camera_index}) is not connected. Try running `camera.connect()` first." - ) + raise DeviceNotConnectedError(f"{self} is not connected.") if self.thread is None: - self.stop_event = threading.Event() - self.thread = Thread(target=self.read_loop, args=()) + self.stop_event = Event() + self.thread = Thread(target=self._read_loop, args=()) self.thread.daemon = True self.thread.start() @@ -455,9 +371,7 @@ class OpenCVCamera(Camera): def disconnect(self): if not self.is_connected: - raise DeviceNotConnectedError( - f"OpenCVCamera({self.camera_index}) is not connected. Try running `camera.connect()` first." - ) + raise DeviceNotConnectedError(f"{self} is not connected.") if self.thread is not None: self.stop_event.set() @@ -467,11 +381,6 @@ class OpenCVCamera(Camera): self.camera.release() self.camera = None - self.is_connected = False - - def __del__(self): - if getattr(self, "is_connected", False): - self.disconnect() if __name__ == "__main__": diff --git a/lerobot/common/cameras/utils.py b/lerobot/common/cameras/utils.py index f0b6ce5f..e77038dc 100644 --- a/lerobot/common/cameras/utils.py +++ b/lerobot/common/cameras/utils.py @@ -1,3 +1,5 @@ +import platform + from .camera import Camera from .configs import CameraConfig @@ -19,3 +21,23 @@ def make_cameras_from_configs(camera_configs: dict[str, CameraConfig]) -> dict[s raise ValueError(f"The motor type '{cfg.type}' is not valid.") return cameras + + +def get_cv2_rotation(rotation: int) -> int: + import cv2 + + return { + -90: cv2.ROTATE_90_COUNTERCLOCKWISE, + 90: cv2.ROTATE_90_CLOCKWISE, + 180: cv2.ROTATE_180, + }.get(rotation) + + +def get_cv2_backend() -> int: + import cv2 + + return { + "Linux": cv2.CAP_DSHOW, + "Windows": cv2.CAP_AVFOUNDATION, + "Darwin": cv2.CAP_ANY, + }.get(platform.system(), cv2.CAP_V4L2)