refactor(cameras): opencv camera init

This commit is contained in:
Simon Alibert
2025-04-21 16:14:06 +02:00
committed by Steven Palma
parent 8a6412b0db
commit 0b5b438f50
2 changed files with 139 additions and 208 deletions

View File

@@ -18,14 +18,16 @@ This file contains utilities for recording frames from cameras. For more info lo
import argparse
import concurrent.futures
import logging
import math
import platform
import shutil
import threading
import time
from pathlib import Path
from threading import Thread
from threading import Event, Thread
from typing import TypeAlias
import cv2
import numpy as np
from PIL import Image
@@ -36,8 +38,11 @@ from lerobot.common.utils.robot_utils import (
from lerobot.common.utils.utils import capture_timestamp_utc
from ..camera import Camera
from ..utils import get_cv2_backend, get_cv2_rotation
from .configuration_opencv import OpenCVCameraConfig
IndexOrPath: TypeAlias = int | Path
# The maximum opencv device index depends on your operating system. For instance,
# if you have 3 cameras, they should be associated to index 0, 1, and 2. This is the case
# on MacOS. However, on Ubuntu, the indices are different like 6, 16, 23.
@@ -45,63 +50,7 @@ from .configuration_opencv import OpenCVCameraConfig
# treat the same cameras as new devices. Thus we select a higher bound to search indices.
MAX_OPENCV_INDEX = 60
def find_cameras(raise_when_empty=False, max_index_search_range=MAX_OPENCV_INDEX, mock=False) -> list[dict]:
cameras = []
if platform.system() == "Linux":
print("Linux detected. Finding available camera indices through scanning '/dev/video*' ports")
possible_ports = [str(port) for port in Path("/dev").glob("video*")]
ports = _find_cameras(possible_ports, mock=mock)
for port in ports:
cameras.append(
{
"port": port,
"index": int(port.removeprefix("/dev/video")),
}
)
else:
print(
"Mac or Windows detected. Finding available camera indices through "
f"scanning all indices from 0 to {MAX_OPENCV_INDEX}"
)
possible_indices = range(max_index_search_range)
indices = _find_cameras(possible_indices, mock=mock)
for index in indices:
cameras.append(
{
"port": None,
"index": index,
}
)
return cameras
def _find_cameras(
possible_camera_ids: list[int | str], raise_when_empty=False, mock=False
) -> list[int | str]:
if mock:
import tests.cameras.mock_cv2 as cv2
else:
import cv2
camera_ids = []
for camera_idx in possible_camera_ids:
camera = cv2.VideoCapture(camera_idx)
is_open = camera.isOpened()
camera.release()
if is_open:
print(f"Camera found at index {camera_idx}")
camera_ids.append(camera_idx)
if raise_when_empty and len(camera_ids) == 0:
raise OSError(
"Not a single camera was detected. Try re-plugging, or re-installing `opencv2`, "
"or your camera driver, or make sure your camera is compatible with opencv2."
)
return camera_ids
logger = logging.getLogger(__name__)
def is_valid_unix_path(path: str) -> bool:
@@ -114,7 +63,7 @@ def get_camera_index_from_unix_port(port: Path) -> int:
return int(str(port.resolve()).removeprefix("/dev/video"))
def save_image(img_array, camera_index, frame_index, images_dir):
def save_image(img_array: np.ndarray, camera_index: int, frame_index: int, images_dir: Path):
img = Image.fromarray(img_array)
path = images_dir / f"camera_{camera_index:02d}_frame_{frame_index:06d}.png"
path.parent.mkdir(parents=True, exist_ok=True)
@@ -123,29 +72,32 @@ def save_image(img_array, camera_index, frame_index, images_dir):
def save_images_from_cameras(
images_dir: Path,
camera_ids: list | None = None,
fps=None,
width=None,
height=None,
record_time_s=2,
mock=False,
camera_idx_or_paths: list[IndexOrPath] | None = None,
fps: int | None = None,
width: int | None = None,
height: int | None = None,
record_time_s: int = 2,
):
"""
Initializes all the cameras and saves images to the directory. Useful to visually identify the camera
associated to a given camera index.
"""
if camera_ids is None or len(camera_ids) == 0:
camera_infos = find_cameras(mock=mock)
camera_ids = [cam["index"] for cam in camera_infos]
if not camera_idx_or_paths:
camera_idx_or_paths = OpenCVCamera.find_cameras()
if len(camera_idx_or_paths) == 0:
raise RuntimeError(
"Not a single camera was detected. Try re-plugging, or re-installing `opencv-python`, "
"or your camera driver, or make sure your camera is compatible with opencv."
)
print("Connecting cameras")
cameras = []
for cam_idx in camera_ids:
config = OpenCVCameraConfig(camera_index=cam_idx, fps=fps, width=width, height=height, mock=mock)
for idx_or_path in camera_idx_or_paths:
config = OpenCVCameraConfig(index_or_path=idx_or_path, fps=fps, width=width, height=height)
camera = OpenCVCamera(config)
camera.connect()
print(
f"OpenCVCamera({camera.camera_index}, fps={camera.fps}, width={camera.capture_width}, "
f"OpenCVCamera({camera.index_or_path}, fps={camera.fps}, width={camera.capture_width}, "
f"height={camera.capture_height}, color_mode={camera.color_mode})"
)
cameras.append(camera)
@@ -212,7 +164,7 @@ class OpenCVCamera(Camera):
```python
from lerobot.common.robot_devices.cameras.configs import OpenCVCameraConfig
config = OpenCVCameraConfig(camera_index=0)
config = OpenCVCameraConfig(index_or_path=0)
camera = OpenCVCamera(config)
camera.connect()
color_image = camera.read()
@@ -222,28 +174,16 @@ class OpenCVCamera(Camera):
Example of changing default fps, width, height and color_mode:
```python
config = OpenCVCameraConfig(camera_index=0, fps=30, width=1280, height=720)
config = OpenCVCameraConfig(camera_index=0, fps=90, width=640, height=480)
config = OpenCVCameraConfig(camera_index=0, fps=90, width=640, height=480, color_mode="bgr")
config = OpenCVCameraConfig(index_or_path=0, fps=30, width=1280, height=720)
config = OpenCVCameraConfig(index_or_path=0, fps=90, width=640, height=480)
config = OpenCVCameraConfig(index_or_path=0, fps=90, width=640, height=480, color_mode="bgr")
# Note: might error out open `camera.connect()` if these settings are not compatible with the camera
```
"""
def __init__(self, config: OpenCVCameraConfig):
self.config = config
self.camera_index = config.camera_index
self.port = None
# Linux uses ports for connecting to cameras
if platform.system() == "Linux":
if isinstance(self.camera_index, int):
self.port = Path(f"/dev/video{self.camera_index}")
elif isinstance(self.camera_index, str) and is_valid_unix_path(self.camera_index):
self.port = Path(self.camera_index)
# Retrieve the camera index from a potentially symlinked path
self.camera_index = get_camera_index_from_unix_port(self.port)
else:
raise ValueError(f"Please check the provided camera_index: {self.camera_index}")
self.index_or_path = config.index_or_path
# Store the raw (capture) resolution from the config.
self.capture_width = config.width
@@ -260,113 +200,97 @@ class OpenCVCamera(Camera):
self.fps = config.fps
self.channels = config.channels
self.color_mode = config.color_mode
self.mock = config.mock
self.camera = None
self.is_connected = False
self.thread = None
self.stop_event = None
self.camera: cv2.VideoCapture | None = None
self.thread: Thread | None = None
self.stop_event: Event | None = None
self.color_image = None
self.logs = {}
if self.mock:
import tests.cameras.mock_cv2 as cv2
else:
import cv2
self.rotation = get_cv2_rotation(config.rotation)
self.backend = get_cv2_backend()
self.rotation = None
if config.rotation == -90:
self.rotation = cv2.ROTATE_90_COUNTERCLOCKWISE
elif config.rotation == 90:
self.rotation = cv2.ROTATE_90_CLOCKWISE
elif config.rotation == 180:
self.rotation = cv2.ROTATE_180
def __str__(self) -> str:
return f"{self.__class__.__name__}({self.index_or_path})"
@property
def is_connected(self) -> bool:
return self.camera.isOpened() if isinstance(self.camera, cv2.VideoCapture) else False
def connect(self):
if self.is_connected:
raise DeviceAlreadyConnectedError(f"OpenCVCamera({self.camera_index}) is already connected.")
raise DeviceAlreadyConnectedError(f"{self} is already connected.")
if self.mock:
import tests.cameras.mock_cv2 as cv2
else:
import cv2
# Use 1 thread to avoid blocking the main thread. Especially useful during data collection
# when other threads are used to save the images.
cv2.setNumThreads(1)
# Use 1 thread to avoid blocking the main thread. Especially useful during data collection
# when other threads are used to save the images.
cv2.setNumThreads(1)
self.camera = cv2.VideoCapture(self.index_or_path, self.backend)
backend = (
cv2.CAP_V4L2
if platform.system() == "Linux"
else cv2.CAP_DSHOW
if platform.system() == "Windows"
else cv2.CAP_AVFOUNDATION
if platform.system() == "Darwin"
else cv2.CAP_ANY
)
camera_idx = f"/dev/video{self.camera_index}" if platform.system() == "Linux" else self.camera_index
# First create a temporary camera trying to access `camera_index`,
# and verify it is a valid camera by calling `isOpened`.
tmp_camera = cv2.VideoCapture(camera_idx, backend)
is_camera_open = tmp_camera.isOpened()
# Release camera to make it accessible for `find_camera_indices`
tmp_camera.release()
del tmp_camera
# If the camera doesn't work, display the camera indices corresponding to
# valid cameras.
if not is_camera_open:
# If the camera doesn't work, display the camera indices corresponding to valid cameras.
if not self.camera.isOpened():
# Release camera to make it accessible for `find_camera_indices`
self.camera.release()
# Verify that the provided `camera_index` is valid before printing the traceback
cameras_info = find_cameras()
cameras_info = self.find_cameras()
available_cam_ids = [cam["index"] for cam in cameras_info]
if self.camera_index not in available_cam_ids:
if self.index_or_path not in available_cam_ids:
raise ValueError(
f"`camera_index` is expected to be one of these available cameras {available_cam_ids}, but {self.camera_index} is provided instead. "
"To find the camera index you should use, run `python lerobot/common/robot_devices/cameras/opencv.py`."
f"`camera_index` is expected to be one of these available cameras {available_cam_ids}, "
f"but {self.index_or_path} is provided instead. To find the camera index you should use, "
"run `python lerobot/common/robot_devices/cameras/opencv.py`."
)
raise OSError(f"Can't access OpenCVCamera({camera_idx}).")
# Secondly, create the camera that will be used downstream.
# Note: For some unknown reason, calling `isOpened` blocks the camera which then
# needs to be re-created.
self.camera = cv2.VideoCapture(camera_idx, backend)
raise ConnectionError(f"Can't access {self}.")
if self.fps is not None:
self.camera.set(cv2.CAP_PROP_FPS, self.fps)
self._set_fps(self.fps)
if self.capture_width is not None:
self.camera.set(cv2.CAP_PROP_FRAME_WIDTH, self.capture_width)
self._set_capture_width(self.capture_width)
if self.capture_height is not None:
self.camera.set(cv2.CAP_PROP_FRAME_HEIGHT, self.capture_height)
self._set_capture_height(self.capture_height)
def _set_fps(self, fps: int) -> None:
self.camera.set(cv2.CAP_PROP_FPS, fps)
actual_fps = self.camera.get(cv2.CAP_PROP_FPS)
actual_width = self.camera.get(cv2.CAP_PROP_FRAME_WIDTH)
actual_height = self.camera.get(cv2.CAP_PROP_FRAME_HEIGHT)
# Using `math.isclose` since actual fps can be a float (e.g. 29.9 instead of 30)
if self.fps is not None and not math.isclose(self.fps, actual_fps, rel_tol=1e-3):
# Using `OSError` since it's a broad that encompasses issues related to device communication
raise OSError(
f"Can't set {self.fps=} for OpenCVCamera({self.camera_index}). Actual value is {actual_fps}."
)
if self.capture_width is not None and not math.isclose(
self.capture_width, actual_width, rel_tol=1e-3
):
raise OSError(
f"Can't set {self.capture_width=} for OpenCVCamera({self.camera_index}). Actual value is {actual_width}."
)
if self.capture_height is not None and not math.isclose(
self.capture_height, actual_height, rel_tol=1e-3
):
raise OSError(
f"Can't set {self.capture_height=} for OpenCVCamera({self.camera_index}). Actual value is {actual_height}."
)
if not math.isclose(fps, actual_fps, rel_tol=1e-3):
raise RuntimeError(f"Can't set {fps=} for {self}. Actual value is {actual_fps}.")
self.fps = round(actual_fps)
self.capture_width = round(actual_width)
self.capture_height = round(actual_height)
self.is_connected = True
def _set_capture_width(self, capture_width: int) -> None:
self.camera.set(cv2.CAP_PROP_FRAME_WIDTH, capture_width)
actual_width = self.camera.get(cv2.CAP_PROP_FRAME_WIDTH)
if not math.isclose(self.capture_width, actual_width, rel_tol=1e-3):
raise RuntimeError(f"Can't set {capture_width=} for {self}. Actual value is {actual_width}.")
def _set_capture_height(self, capture_height: int) -> None:
self.camera.set(cv2.CAP_PROP_FRAME_HEIGHT, capture_height)
actual_height = self.camera.get(cv2.CAP_PROP_FRAME_HEIGHT)
if not math.isclose(self.capture_height, actual_height, rel_tol=1e-3):
raise RuntimeError(f"Can't set {capture_height=} for {self}. Actual value is {actual_height}.")
@staticmethod
def find_cameras(max_index_search_range=MAX_OPENCV_INDEX) -> list[IndexOrPath]:
if platform.system() == "Linux":
print("Linux detected. Finding available camera indices through scanning '/dev/video*' ports")
possible_idx_or_paths = [str(port) for port in Path("/dev").glob("video*")]
else:
print(
f"{platform.system()} system detected. Finding available camera indices through "
f"scanning all indices from 0 to {MAX_OPENCV_INDEX}"
)
possible_idx_or_paths = range(max_index_search_range)
found_idx_or_paths = []
for target in possible_idx_or_paths:
camera = cv2.VideoCapture(target)
is_open = camera.isOpened()
camera.release()
if is_open:
print(f"Camera found at {target}")
found_idx_or_paths.append(target)
return found_idx_or_paths
def read(self, temporary_color_mode: str | None = None) -> np.ndarray:
"""Read a frame from the camera returned in the format (height, width, channels)
@@ -376,17 +300,23 @@ class OpenCVCamera(Camera):
If you are reading data from other sensors, we advise to use `camera.async_read()` which is non blocking version of `camera.read()`.
"""
if not self.is_connected:
raise DeviceNotConnectedError(
f"OpenCVCamera({self.camera_index}) is not connected. Try running `camera.connect()` first."
)
raise DeviceNotConnectedError(f"{self} is not connected.")
start_time = time.perf_counter()
start = time.perf_counter()
ret, color_image = self.camera.read()
if not ret:
raise OSError(f"Can't capture color image from camera {self.camera_index}.")
raise RuntimeError(f"Can't capture color image from {self}.")
self.color_image = self._postprocess_image(color_image, temporary_color_mode)
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read: {dt_ms:.1f}ms")
# log the utc time at which the image was received
self.logs["timestamp_utc"] = capture_timestamp_utc()
def _postprocess_image(self, image, temporary_color_mode: str | None = None):
requested_color_mode = self.color_mode if temporary_color_mode is None else temporary_color_mode
if requested_color_mode not in ["rgb", "bgr"]:
@@ -398,33 +328,21 @@ class OpenCVCamera(Camera):
# However, Deep Learning framework such as LeRobot uses RGB format as default to train neural networks,
# so we convert the image color from BGR to RGB.
if requested_color_mode == "rgb":
if self.mock:
import tests.cameras.mock_cv2 as cv2
else:
import cv2
color_image = cv2.cvtColor(color_image, cv2.COLOR_BGR2RGB)
color_image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
h, w, _ = color_image.shape
if h != self.capture_height or w != self.capture_width:
raise OSError(
f"Can't capture color image with expected height and width ({self.height} x {self.width}). ({h} x {w}) returned instead."
raise RuntimeError(
f"Can't capture color image with expected height and width ({self.height} x {self.width}). "
f"({h} x {w}) returned instead."
)
if self.rotation is not None:
color_image = cv2.rotate(color_image, self.rotation)
# log the number of seconds it took to read the image
self.logs["delta_timestamp_s"] = time.perf_counter() - start_time
# log the utc time at which the image was received
self.logs["timestamp_utc"] = capture_timestamp_utc()
self.color_image = color_image
return color_image
def read_loop(self):
def _read_loop(self):
while not self.stop_event.is_set():
try:
self.color_image = self.read()
@@ -433,13 +351,11 @@ class OpenCVCamera(Camera):
def async_read(self):
if not self.is_connected:
raise DeviceNotConnectedError(
f"OpenCVCamera({self.camera_index}) is not connected. Try running `camera.connect()` first."
)
raise DeviceNotConnectedError(f"{self} is not connected.")
if self.thread is None:
self.stop_event = threading.Event()
self.thread = Thread(target=self.read_loop, args=())
self.stop_event = Event()
self.thread = Thread(target=self._read_loop, args=())
self.thread.daemon = True
self.thread.start()
@@ -455,9 +371,7 @@ class OpenCVCamera(Camera):
def disconnect(self):
if not self.is_connected:
raise DeviceNotConnectedError(
f"OpenCVCamera({self.camera_index}) is not connected. Try running `camera.connect()` first."
)
raise DeviceNotConnectedError(f"{self} is not connected.")
if self.thread is not None:
self.stop_event.set()
@@ -467,11 +381,6 @@ class OpenCVCamera(Camera):
self.camera.release()
self.camera = None
self.is_connected = False
def __del__(self):
if getattr(self, "is_connected", False):
self.disconnect()
if __name__ == "__main__":

View File

@@ -1,3 +1,5 @@
import platform
from .camera import Camera
from .configs import CameraConfig
@@ -19,3 +21,23 @@ def make_cameras_from_configs(camera_configs: dict[str, CameraConfig]) -> dict[s
raise ValueError(f"The motor type '{cfg.type}' is not valid.")
return cameras
def get_cv2_rotation(rotation: int) -> int:
import cv2
return {
-90: cv2.ROTATE_90_COUNTERCLOCKWISE,
90: cv2.ROTATE_90_CLOCKWISE,
180: cv2.ROTATE_180,
}.get(rotation)
def get_cv2_backend() -> int:
import cv2
return {
"Linux": cv2.CAP_DSHOW,
"Windows": cv2.CAP_AVFOUNDATION,
"Darwin": cv2.CAP_ANY,
}.get(platform.system(), cv2.CAP_V4L2)