From 6348f0f418feb481cc48aa3202dadc4293346ab7 Mon Sep 17 00:00:00 2001 From: Steven Palma Date: Mon, 28 Apr 2025 15:05:41 +0200 Subject: [PATCH] refactor(cameras): improvements opencv cam v0.1 --- lerobot/common/cameras/camera.py | 26 +- lerobot/common/cameras/configs.py | 27 +- .../common/cameras/opencv/camera_opencv.py | 725 ++++++++++-------- .../cameras/opencv/configuration_opencv.py | 23 +- lerobot/common/cameras/utils.py | 162 +++- lerobot/common/utils/control_utils.py | 1 + lerobot/find_cameras.py | 1 + tests/cameras/test_opencv.py | 13 + 8 files changed, 658 insertions(+), 320 deletions(-) create mode 100644 lerobot/find_cameras.py create mode 100644 tests/cameras/test_opencv.py diff --git a/lerobot/common/cameras/camera.py b/lerobot/common/cameras/camera.py index a5198b99e..1814c4607 100644 --- a/lerobot/common/cameras/camera.py +++ b/lerobot/common/cameras/camera.py @@ -1,10 +1,30 @@ +#!/usr/bin/env python + +# Copyright 2024 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + import abc import numpy as np +from .configs import ColorMode + +# NOTE(Steven): Consider something like configure() if makes sense for both cameras class Camera(abc.ABC): - @abc.abstractproperty + @property + @abc.abstractmethod def is_connected(self) -> bool: pass @@ -13,11 +33,11 @@ class Camera(abc.ABC): pass @abc.abstractmethod - def read(self, temporary_color: str | None = None) -> np.ndarray: + def read(self, color_mode: ColorMode | None = None) -> np.ndarray: pass @abc.abstractmethod - def async_read(self) -> np.ndarray: + def async_read(self, timeout_ms: float = 2000) -> np.ndarray: pass @abc.abstractmethod diff --git a/lerobot/common/cameras/configs.py b/lerobot/common/cameras/configs.py index 15fe4334f..3ad767755 100644 --- a/lerobot/common/cameras/configs.py +++ b/lerobot/common/cameras/configs.py @@ -1,3 +1,19 @@ +#!/usr/bin/env python + +# Copyright 2024 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + import abc from dataclasses import dataclass from enum import Enum @@ -6,8 +22,15 @@ import draccus class ColorMode(Enum): - RGB = 0 - BGR = 1 + RGB = "rgb" + BGR = "bgr" + + +class Cv2Rotation(Enum): + NO_ROTATION = 0 + ROTATE_90 = 90 + ROTATE_180 = 180 + ROTATE_270 = -90 @dataclass diff --git a/lerobot/common/cameras/opencv/camera_opencv.py b/lerobot/common/cameras/opencv/camera_opencv.py index ddc6869ce..824c5ca35 100644 --- a/lerobot/common/cameras/opencv/camera_opencv.py +++ b/lerobot/common/cameras/opencv/camera_opencv.py @@ -13,35 +13,27 @@ # limitations under the License. """ -This file contains utilities for recording frames from cameras. For more info look at `OpenCVCamera` docstring. +Provides the OpenCVCamera class for capturing frames from cameras using OpenCV. """ -import argparse -import concurrent.futures +import contextlib import logging import math import platform -import shutil +import queue import time from pathlib import Path from threading import Event, Thread -from typing import TypeAlias import cv2 import numpy as np -from PIL import Image from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError -from lerobot.common.utils.robot_utils import ( - busy_wait, -) from lerobot.common.utils.utils import capture_timestamp_utc from ..camera import Camera -from ..utils import get_cv2_backend, get_cv2_rotation -from .configuration_opencv import OpenCVCameraConfig - -IndexOrPath: TypeAlias = int | Path +from ..utils import IndexOrPath, get_cv2_backend, get_cv2_rotation +from .configuration_opencv import ColorMode, OpenCVCameraConfig # The maximum opencv device index depends on your operating system. For instance, # if you have 3 cameras, they should be associated to index 0, 1, and 2. This is the case @@ -53,376 +45,505 @@ MAX_OPENCV_INDEX = 60 logger = logging.getLogger(__name__) -def is_valid_unix_path(path: str) -> bool: - """Note: if 'path' points to a symlink, this will return True only if the target exists""" - p = Path(path) - return p.is_absolute() and p.exists() - - -def get_camera_index_from_unix_port(port: Path) -> int: - return int(str(port.resolve()).removeprefix("/dev/video")) - - -def save_image(img_array: np.ndarray, camera_index: int, frame_index: int, images_dir: Path): - img = Image.fromarray(img_array) - path = images_dir / f"camera_{camera_index:02d}_frame_{frame_index:06d}.png" - path.parent.mkdir(parents=True, exist_ok=True) - img.save(str(path), quality=100) - - -def save_images_from_cameras( - images_dir: Path, - camera_idx_or_paths: list[IndexOrPath] | None = None, - fps: int | None = None, - width: int | None = None, - height: int | None = None, - record_time_s: int = 2, -): - """ - Initializes all the cameras and saves images to the directory. Useful to visually identify the camera - associated to a given camera index. - """ - if not camera_idx_or_paths: - camera_idx_or_paths = OpenCVCamera.find_cameras() - if len(camera_idx_or_paths) == 0: - raise RuntimeError( - "Not a single camera was detected. Try re-plugging, or re-installing `opencv-python`, " - "or your camera driver, or make sure your camera is compatible with opencv." - ) - - print("Connecting cameras") - cameras = [] - for idx_or_path in camera_idx_or_paths: - config = OpenCVCameraConfig(index_or_path=idx_or_path, fps=fps, width=width, height=height) - camera = OpenCVCamera(config) - camera.connect() - print( - f"OpenCVCamera({camera.index_or_path}, fps={camera.fps}, width={camera.capture_width}, " - f"height={camera.capture_height}, color_mode={camera.color_mode})" - ) - cameras.append(camera) - - images_dir = Path(images_dir) - if images_dir.exists(): - shutil.rmtree( - images_dir, - ) - images_dir.mkdir(parents=True, exist_ok=True) - - print(f"Saving images to {images_dir}") - frame_index = 0 - start_time = time.perf_counter() - with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor: - while True: - now = time.perf_counter() - - for camera in cameras: - # If we use async_read when fps is None, the loop will go full speed, and we will endup - # saving the same images from the cameras multiple times until the RAM/disk is full. - image = camera.read() if fps is None else camera.async_read() - - executor.submit( - save_image, - image, - camera.camera_index, - frame_index, - images_dir, - ) - - if fps is not None: - dt_s = time.perf_counter() - now - busy_wait(1 / fps - dt_s) - - print(f"Frame: {frame_index:04d}\tLatency (ms): {(time.perf_counter() - now) * 1000:.2f}") - - if time.perf_counter() - start_time > record_time_s: - break - - frame_index += 1 - - print(f"Images have been saved to {images_dir}") - - class OpenCVCamera(Camera): """ - The OpenCVCamera class allows to efficiently record images from cameras. It relies on opencv2 to communicate - with the cameras. Most cameras are compatible. For more info, see the [Video I/O with OpenCV Overview](https://docs.opencv.org/4.x/d0/da7/videoio_overview.html). + Manages camera interactions using OpenCV for efficient frame recording. - An OpenCVCamera instance requires a camera index (e.g. `OpenCVCamera(camera_index=0)`). When you only have one camera - like a webcam of a laptop, the camera index is expected to be 0, but it might also be very different, and the camera index - might change if you reboot your computer or re-plug your camera. This behavior depends on your operation system. + This class provides a high-level interface to connect to, configure, and read + frames from cameras compatible with OpenCV's VideoCapture. It supports both + synchronous and asynchronous frame reading. - To find the camera indices of your cameras, you can run our utility script that will be save a few frames for each camera: + An OpenCVCamera instance requires a camera index (e.g., 0) or a device path + (e.g., '/dev/video0' on Linux). Camera indices can be unstable across reboots + or port changes, especially on Linux. Use the provided utility script to find + available camera indices or paths: ```bash - python lerobot/common/robot_devices/cameras/opencv.py --images-dir outputs/images_from_opencv_cameras + NOTE(Steven): Point to future util ``` - When an OpenCVCamera is instantiated, if no specific config is provided, the default fps, width, height and color_mode - of the given camera will be used. + The camera's default settings (FPS, resolution, color mode) are used unless + overridden in the configuration. - Example of usage: - ```python - from lerobot.common.robot_devices.cameras.configs import OpenCVCameraConfig + Args: + config (OpenCVCameraConfig): Configuration object containing settings like + camera index/path, desired FPS, width, height, color mode, and rotation. - config = OpenCVCameraConfig(index_or_path=0) - camera = OpenCVCamera(config) - camera.connect() - color_image = camera.read() - # when done using the camera, consider disconnecting - camera.disconnect() - ``` + Example: + ```python + from lerobot.common.cameras.opencv import OpenCVCamera + from lerobot.common.cameras.configuration_opencv import OpenCVCameraConfig, ColorMode - Example of changing default fps, width, height and color_mode: - ```python - config = OpenCVCameraConfig(index_or_path=0, fps=30, width=1280, height=720) - config = OpenCVCameraConfig(index_or_path=0, fps=90, width=640, height=480) - config = OpenCVCameraConfig(index_or_path=0, fps=90, width=640, height=480, color_mode="bgr") - # Note: might error out open `camera.connect()` if these settings are not compatible with the camera - ``` + # Basic usage with camera index 0 + config = OpenCVCameraConfig(index_or_path=0) + camera = OpenCVCamera(config) + try: + camera.connect() + print(f"Connected to {camera}") + color_image = camera.read() # Synchronous read + print(f"Read frame shape: {color_image.shape}") + async_image = camera.async_read() # Asynchronous read + print(f"Async read frame shape: {async_image.shape}") + except Exception as e: + print(f"An error occurred: {e}") + finally: + camera.disconnect() + print(f"Disconnected from {camera}") + + # Example with custom settings + custom_config = OpenCVCameraConfig( + index_or_path='/dev/video0', # Or use an index + fps=30, + width=1280, + height=720, + color_mode=ColorMode.RGB, + rotation=90 + ) + custom_camera = OpenCVCamera(custom_config) + # ... connect, read, disconnect ... + ``` """ def __init__(self, config: OpenCVCameraConfig): + """ + Initializes the OpenCVCamera instance. + + Args: + config: The configuration settings for the camera. + """ self.config = config - self.index_or_path = config.index_or_path + self.index_or_path: IndexOrPath = config.index_or_path - # Store the raw (capture) resolution from the config. - self.capture_width = config.width - self.capture_height = config.height + self.capture_width: int | None = config.width + self.capture_height: int | None = config.height - # If rotated by ±90, swap width and height. if config.rotation in [-90, 90]: - self.width = config.height - self.height = config.width + self.width: int | None = config.height + self.height: int | None = config.width else: - self.width = config.width - self.height = config.height + self.width: int | None = config.width + self.height: int | None = config.height - self.fps = config.fps - self.channels = config.channels - self.color_mode = config.color_mode + self.fps: int | None = config.fps + self.channels: int = config.channels + self.color_mode: ColorMode = config.color_mode - self.camera: cv2.VideoCapture | None = None + self.videocapture_camera: cv2.VideoCapture | None = None self.thread: Thread | None = None self.stop_event: Event | None = None - self.color_image = None - self.logs = {} + self.frame_queue: queue.Queue = queue.Queue(maxsize=1) - self.rotation = get_cv2_rotation(config.rotation) - self.backend = get_cv2_backend() + self.logs: dict = {} # NOTE(Steven): Might be removed in the future + + self.rotation: int | None = get_cv2_rotation(config.rotation) + self.backend: int = get_cv2_backend() def __str__(self) -> str: + """Returns a string representation of the camera instance.""" return f"{self.__class__.__name__}({self.index_or_path})" @property def is_connected(self) -> bool: - return self.camera.isOpened() if isinstance(self.camera, cv2.VideoCapture) else False + """Checks if the camera is currently connected and opened.""" + return isinstance(self.videocapture_camera, cv2.VideoCapture) and self.videocapture_camera.isOpened() + + def _scan_available_cameras_and_raise(self, index_or_path: IndexOrPath) -> None: + """ + Scans for available cameras and raises an error if the specified + camera cannot be found or accessed. + + Args: + index_or_path: The camera identifier that failed to connect. + + Raises: + ValueError: If the specified `index_or_path` is not found among the + list of detected, usable cameras. + ConnectionError: If the camera connection fails (this specific method + is called within the connection failure context). + """ + cameras_info = self.find_cameras() + available_cam_ids = [cam["index"] for cam in cameras_info] + if index_or_path not in available_cam_ids: + raise ValueError( + f"Camera '{index_or_path}' not found or not accessible among available cameras: {available_cam_ids}. " + "Ensure the camera is properly connected and drivers are installed. " + "Run 'NOTE(Steven): Add script path' to list detected cameras." + ) + raise ConnectionError( + f"Failed to connect to camera {self}. Even though it might be listed, it could not be opened." + ) + + # NOTE(Steven): Moving it to a different function for now. To be evaluated later if it is worth it and if it makes sense to have it as an abstract method + def _configure_capture_settings(self, fps: int | None, width: int | None, height: int | None) -> None: + """ + Applies the specified FPS, width, and height settings to the connected camera. + + This method attempts to set the camera properties via OpenCV. It checks if + the camera successfully applied the settings and raises an error if not. + + Args: + fps: The desired frames per second. If None, the setting is skipped. + width: The desired capture width. If None, the setting is skipped. + height: The desired capture height. If None, the setting is skipped. + + Raises: + RuntimeError: If the camera fails to set any of the specified properties + to the requested value. + DeviceNotConnectedError: If the camera is not connected when attempting + to configure settings. + """ + if not self.is_connected: + raise DeviceNotConnectedError(f"Cannot configure settings for {self} as it is not connected.") + + if fps is not None: + self._set_fps(fps) + if width is not None: + self._set_capture_width(width) + if height is not None: + self._set_capture_height(height) def connect(self): + """ + Connects to the camera device specified in the configuration. + + Initializes the OpenCV VideoCapture object, sets desired camera properties + (FPS, width, height), and performs initial checks. + + Raises: + DeviceAlreadyConnectedError: If the camera is already connected. + ValueError: If the specified camera index/path is not found or accessible. + ConnectionError: If the camera is found but fails to open. + RuntimeError: If the camera opens but fails to apply requested FPS/resolution settings. + """ if self.is_connected: raise DeviceAlreadyConnectedError(f"{self} is already connected.") - # Use 1 thread to avoid blocking the main thread. Especially useful during data collection - # when other threads are used to save the images. + # Use 1 thread for OpenCV operations to avoid potential conflicts or + # blocking in multi-threaded applications, especially during data collection. cv2.setNumThreads(1) - self.camera = cv2.VideoCapture(self.index_or_path, self.backend) + logger.debug(f"Attempting to connect to camera {self.index_or_path} using backend {self.backend}...") + self.videocapture_camera = cv2.VideoCapture(self.index_or_path, self.backend) - # If the camera doesn't work, display the camera indices corresponding to valid cameras. - if not self.camera.isOpened(): - # Release camera to make it accessible for `find_camera_indices` - self.camera.release() - # Verify that the provided `camera_index` is valid before printing the traceback - cameras_info = self.find_cameras() - available_cam_ids = [cam["index"] for cam in cameras_info] - if self.index_or_path not in available_cam_ids: - raise ValueError( - f"`camera_index` is expected to be one of these available cameras {available_cam_ids}, " - f"but {self.index_or_path} is provided instead. To find the camera index you should use, " - "run `python lerobot/common/robot_devices/cameras/opencv.py`." - ) + if not self.videocapture_camera.isOpened(): + logger.error(f"Failed to open camera {self.index_or_path}.") + self.videocapture_camera.release() + self.videocapture_camera = None + self._scan_available_cameras_and_raise(self.index_or_path) # This raises an exception everytime - raise ConnectionError(f"Can't access {self}.") - - if self.fps is not None: - self._set_fps(self.fps) - if self.capture_width is not None: - self._set_capture_width(self.capture_width) - if self.capture_height is not None: - self._set_capture_height(self.capture_height) + logger.debug(f"Successfully opened camera {self.index_or_path}. Applying configuration...") + self._configure_capture_settings(self.fps, self.capture_width, self.capture_height) + logger.debug(f"Camera {self.index_or_path} connected and configured successfully.") def _set_fps(self, fps: int) -> None: - self.camera.set(cv2.CAP_PROP_FPS, fps) - actual_fps = self.camera.get(cv2.CAP_PROP_FPS) - # Using `math.isclose` since actual fps can be a float (e.g. 29.9 instead of 30) - if not math.isclose(fps, actual_fps, rel_tol=1e-3): - raise RuntimeError(f"Can't set {fps=} for {self}. Actual value is {actual_fps}.") + """Sets the camera's frames per second (FPS).""" + success = self.videocapture_camera.set(cv2.CAP_PROP_FPS, float(fps)) + actual_fps = self.videocapture_camera.get(cv2.CAP_PROP_FPS) + # Use math.isclose for robust float comparison + if not success or not math.isclose(fps, actual_fps, rel_tol=1e-3): + logger.warning( + f"Requested FPS {fps} for {self}, but camera reported {actual_fps} (set success: {success}). " + "This might be due to camera limitations." + ) + raise RuntimeError( + f"Failed to set requested FPS {fps} for {self}. Actual value reported: {actual_fps}." + ) + logger.debug(f"FPS set to {actual_fps} for {self}.") def _set_capture_width(self, capture_width: int) -> None: - self.camera.set(cv2.CAP_PROP_FRAME_WIDTH, capture_width) - actual_width = self.camera.get(cv2.CAP_PROP_FRAME_WIDTH) - if not math.isclose(self.capture_width, actual_width, rel_tol=1e-3): - raise RuntimeError(f"Can't set {capture_width=} for {self}. Actual value is {actual_width}.") + """Sets the camera's frame capture width.""" + success = self.videocapture_camera.set(cv2.CAP_PROP_FRAME_WIDTH, float(capture_width)) + actual_width = round(self.videocapture_camera.get(cv2.CAP_PROP_FRAME_WIDTH)) + if not success or self.capture_width != actual_width: + logger.warning( + f"Requested capture width {capture_width} for {self}, but camera reported {actual_width} (set success: {success})." + ) + raise RuntimeError( + f"Failed to set requested capture width {capture_width} for {self}. Actual value: {actual_width}." + ) + logger.debug(f"Capture width set to {actual_width} for {self}.") def _set_capture_height(self, capture_height: int) -> None: - self.camera.set(cv2.CAP_PROP_FRAME_HEIGHT, capture_height) - actual_height = self.camera.get(cv2.CAP_PROP_FRAME_HEIGHT) - if not math.isclose(self.capture_height, actual_height, rel_tol=1e-3): - raise RuntimeError(f"Can't set {capture_height=} for {self}. Actual value is {actual_height}.") + """Sets the camera's frame capture height.""" + success = self.videocapture_camera.set(cv2.CAP_PROP_FRAME_HEIGHT, float(capture_height)) + actual_height = round(self.videocapture_camera.get(cv2.CAP_PROP_FRAME_HEIGHT)) + if not success or self.capture_height != actual_height: + logger.warning( + f"Requested capture height {capture_height} for {self}, but camera reported {actual_height} (set success: {success})." + ) + raise RuntimeError( + f"Failed to set requested capture height {capture_height} for {self}. Actual value: {actual_height}." + ) + logger.debug(f"Capture height set to {actual_height} for {self}.") @staticmethod def find_cameras(max_index_search_range=MAX_OPENCV_INDEX) -> list[IndexOrPath]: + """ + Detects available cameras connected to the system. + + On Linux, it scans '/dev/video*' paths. On other systems (like macOS, Windows), + it checks indices from 0 up to `max_index_search_range`. + + Args: + max_index_search_range (int): The maximum index to check on non-Linux systems. + + Returns: + list[dict]: A list of dictionaries, where each dictionary contains information + about a found camera, primarily its 'index' (which can be an + integer index or a string path). + + Note: + This method temporarily opens each potential camera to check availability, + which might briefly interfere with other applications using cameras. + """ + if platform.system() == "Linux": - print("Linux detected. Finding available camera indices through scanning '/dev/video*' ports") - possible_idx_or_paths = [str(port) for port in Path("/dev").glob("video*")] + logger.info("Linux detected. Scanning '/dev/video*' device paths...") + possible_paths = sorted(Path("/dev").glob("video*"), key=lambda p: p.name) + targets_to_scan = [str(p) for p in possible_paths] + logger.debug(f"Found potential paths: {targets_to_scan}") else: - print( - f"{platform.system()} system detected. Finding available camera indices through " - f"scanning all indices from 0 to {MAX_OPENCV_INDEX}" + logger.info( + f"{platform.system()} system detected. Scanning indices from 0 to {max_index_search_range}..." ) - possible_idx_or_paths = range(max_index_search_range) + targets_to_scan = list(range(max_index_search_range)) - found_idx_or_paths = [] - for target in possible_idx_or_paths: - camera = cv2.VideoCapture(target) - is_open = camera.isOpened() - camera.release() - if is_open: - print(f"Camera found at {target}") - found_idx_or_paths.append(target) + found_cameras = [] + for target in targets_to_scan: + camera = cv2.VideoCapture(target, get_cv2_backend()) + if camera.isOpened(): + logger.debug(f"Camera found and accessible at '{target}'") + found_cameras.append(target) + camera.release() - return found_idx_or_paths + if not found_cameras: + logger.warning("No OpenCV cameras detected.") + else: + logger.info(f"Detected accessible cameras: {found_cameras}") - def read(self, temporary_color_mode: str | None = None) -> np.ndarray: - """Read a frame from the camera returned in the format (height, width, channels) - (e.g. 480 x 640 x 3), contrarily to the pytorch format which is channel first. + return found_cameras - Note: Reading a frame is done every `camera.fps` times per second, and it is blocking. - If you are reading data from other sensors, we advise to use `camera.async_read()` which is non blocking version of `camera.read()`. + def read(self, color_mode: ColorMode | None = None) -> np.ndarray: + """ + Reads a single frame synchronously from the camera. + + This is a blocking call. It waits for the next available frame from the + camera hardware via OpenCV. + + Args: + color_mode (Optional[ColorMode]): If specified, overrides the default + color mode (`self.color_mode`) for this read operation (e.g., + request RGB even if default is BGR). + + Returns: + np.ndarray: The captured frame as a NumPy array in the format + (height, width, channels), using the specified or default + color mode and applying any configured rotation. + + Raises: + DeviceNotConnectedError: If the camera is not connected. + RuntimeError: If reading the frame from the camera fails or if the + received frame dimensions don't match expectations before rotation. + ValueError: If an invalid `color_mode` is requested. """ if not self.is_connected: raise DeviceNotConnectedError(f"{self} is not connected.") - start = time.perf_counter() + start_time = time.perf_counter() - ret, color_image = self.camera.read() - if not ret: - raise RuntimeError(f"Can't capture color image from {self}.") + ret, frame = self.videocapture_camera.read() - self.color_image = self._postprocess_image(color_image, temporary_color_mode) - - dt_ms = (time.perf_counter() - start) * 1e3 - logger.debug(f"{self} read: {dt_ms:.1f}ms") - - # log the utc time at which the image was received - self.logs["timestamp_utc"] = capture_timestamp_utc() - - def _postprocess_image(self, image, temporary_color_mode: str | None = None): - requested_color_mode = self.color_mode if temporary_color_mode is None else temporary_color_mode - - if requested_color_mode not in ["rgb", "bgr"]: - raise ValueError( - f"Expected color values are 'rgb' or 'bgr', but {requested_color_mode} is provided." + if not ret or frame is None: + raise RuntimeError( + f"Failed to capture frame from {self}. '.read()' returned status={ret} and frame is None." ) - # OpenCV uses BGR format as default (blue, green, red) for all operations, including displaying images. - # However, Deep Learning framework such as LeRobot uses RGB format as default to train neural networks, - # so we convert the image color from BGR to RGB. - if requested_color_mode == "rgb": - color_image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) + # Post-process the frame (color conversion, dimension check, rotation) + processed_frame = self._postprocess_image(frame, color_mode) + + read_duration_ms = (time.perf_counter() - start_time) * 1e3 + logger.debug(f"{self} synchronous read took: {read_duration_ms:.1f}ms") + + self.logs["timestamp_utc"] = capture_timestamp_utc() + return processed_frame + + def _postprocess_image(self, image: np.ndarray, color_mode: ColorMode | None = None) -> np.ndarray: + """ + Applies color conversion, dimension validation, and rotation to a raw frame. + + Args: + image (np.ndarray): The raw image frame (expected BGR format from OpenCV). + color_mode (Optional[ColorMode]): The target color mode (RGB or BGR). If None, + uses the instance's default `self.color_mode`. + + Returns: + np.ndarray: The processed image frame. + + Raises: + ValueError: If the requested `color_mode` is invalid. + RuntimeError: If the raw frame dimensions do not match the configured + `capture_width` and `capture_height`. + """ + requested_color_mode = self.color_mode if color_mode is None else color_mode + + if requested_color_mode not in (ColorMode.RGB, ColorMode.BGR): + raise ValueError( + f"Invalid requested color mode '{requested_color_mode}'. Expected {ColorMode.RGB} or {ColorMode.BGR}." + ) + + h, w, c = image.shape - h, w, _ = color_image.shape if h != self.capture_height or w != self.capture_width: raise RuntimeError( - f"Can't capture color image with expected height and width ({self.height} x {self.width}). " - f"({h} x {w}) returned instead." + f"Captured frame dimensions ({h}x{w}) do not match configured capture dimensions ({self.capture_height}x{self.capture_width}) for {self}." + ) + if c != self.channels: + logger.warning( + f"Captured frame channels ({c}) do not match configured channels ({self.channels}) for {self}." ) - if self.rotation is not None: - color_image = cv2.rotate(color_image, self.rotation) + processed_image = image + if requested_color_mode == ColorMode.RGB: + processed_image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) + logger.debug(f"Converted frame from BGR to RGB for {self}.") - return color_image + if self.rotation is not None: + processed_image = cv2.rotate(processed_image, self.rotation) + logger.debug(f"Rotated frame by {self.config.rotation} degrees for {self}.") + + return processed_image def _read_loop(self): + """ + Internal loop run by the background thread for asynchronous reading. + + Continuously reads frames from the camera using the synchronous `read()` + method and places the latest frame into the `frame_queue`. It overwrites + any previous frame in the queue to ensure only the most recent one is available. + """ + logger.debug(f"Starting read loop thread for {self}.") while not self.stop_event.is_set(): try: - self.color_image = self.read() - except Exception as e: - print(f"Error reading in thread: {e}") + color_image = self.read() - def async_read(self): + with contextlib.suppress(queue.Empty): + _ = self.frame_queue.get_nowait() + self.frame_queue.put(color_image) + logger.debug(f"Frame placed in queue for {self}.") + + except DeviceNotConnectedError: + logger.error(f"Read loop for {self} stopped: Camera disconnected.") + break + except Exception as e: + logger.warning(f"Error reading frame in background thread for {self}: {e}") + + logger.debug(f"Stopping read loop thread for {self}.") + + def _ensure_read_thread_running(self): + """Starts or restarts the background read thread if it's not running.""" + logger.debug(f"Read thread for {self} is not running. Starting...") + if self.thread is not None: + self.thread.join(timeout=0.1) + if self.stop_event is not None: + self.stop_event.set() + + self.stop_event = Event() + self.thread = Thread( + target=self._read_loop, args=(), name=f"OpenCVCameraReadLoop-{self}-{self.index_or_path}" + ) + self.thread.daemon = True + self.thread.start() + logger.debug(f"Read thread started for {self}.") + + def async_read(self, timeout_ms: float = 2000) -> np.ndarray: + """ + Reads the latest available frame asynchronously. + + This method retrieves the most recent frame captured by the background + read thread. It does not block waiting for the camera hardware directly, + only waits for a frame to appear in the internal queue up to the specified + timeout. + + Args: + timeout_ms (float): Maximum time in milliseconds to wait for a frame + to become available in the queue. Defaults to 2000ms (2 seconds). + + Returns: + np.ndarray: The latest captured frame as a NumPy array in the format + (height, width, channels), processed according to configuration. + + Raises: + DeviceNotConnectedError: If the camera is not connected. + TimeoutError: If no frame becomes available within the specified timeout. + RuntimeError: If an unexpected error occurs while retrieving from the queue. + """ if not self.is_connected: raise DeviceNotConnectedError(f"{self} is not connected.") - if self.thread is None: - self.stop_event = Event() - self.thread = Thread(target=self._read_loop, args=()) - self.thread.daemon = True - self.thread.start() + if self.thread is None or not self.thread.is_alive(): + # Ensure the background reading thread is operational + self._ensure_read_thread_running() - num_tries = 0 - while True: - if self.color_image is not None: - return self.color_image + try: + # Get the latest frame from the queue, waiting up to the timeout + return self.frame_queue.get(timeout=timeout_ms / 1000.0) + except queue.Empty as e: + thread_alive = self.thread is not None and self.thread.is_alive() + logger.error( + f"Timeout waiting for frame from {self} queue after {timeout_ms}ms. " + f"(Read thread alive: {thread_alive})" + ) + raise TimeoutError( + f"Timed out waiting for frame from camera {self.index_or_path} after {timeout_ms} ms. " + f"Read thread alive: {thread_alive}." + ) from e + except Exception as e: + logger.exception(f"Unexpected error getting frame from queue for {self}: {e}") + raise RuntimeError(f"Error getting frame from queue for camera {self.index_or_path}: {e}") from e - time.sleep(1 / self.fps) - num_tries += 1 - if num_tries > self.fps * 2: - raise TimeoutError("Timed out waiting for async_read() to start.") + def _shutdown_read_thread(self): + """Cleans the background read thread if it's running.""" + if self.stop_event is not None: + logger.debug(f"Signaling stop event for read thread of {self}.") + self.stop_event.set() + else: + logger.warning(f"Stop event not found for thread of {self}, cannot signal.") + + logger.debug(f"Waiting for read thread of {self} to join...") + self.thread.join(timeout=2.0) + if self.thread.is_alive(): + logger.warning(f"Read thread for {self} did not terminate gracefully after 2 seconds.") + else: + logger.debug(f"Read thread for {self} joined successfully.") + self.thread = None + self.stop_event = None def disconnect(self): - if not self.is_connected: - raise DeviceNotConnectedError(f"{self} is not connected.") + """ + Disconnects from the camera and cleans up resources. + + Stops the background read thread (if running) and releases the OpenCV + VideoCapture object. + + Raises: + DeviceNotConnectedError: If the camera is already disconnected. + """ + if not self.is_connected and self.thread is None: + raise DeviceNotConnectedError( + f"Attempted to disconnect {self}, but it appears already disconnected." + ) + + logger.debug(f"Disconnecting from camera {self.index_or_path}...") if self.thread is not None: - self.stop_event.set() - self.thread.join() # wait for the thread to finish - self.thread = None - self.stop_event = None + self._shutdown_read_thread() - self.camera.release() - self.camera = None + if self.videocapture_camera is not None: + logger.debug(f"Releasing OpenCV VideoCapture object for {self}.") + self.videocapture_camera.release() + self.videocapture_camera = None + else: + logger.debug( + f"No OpenCV VideoCapture object to release for {self} (was already None or failed connection)." + ) - -if __name__ == "__main__": - parser = argparse.ArgumentParser( - description="Save a few frames using `OpenCVCamera` for all cameras connected to the computer, or a selected subset." - ) - parser.add_argument( - "--camera-ids", - type=int, - nargs="*", - default=None, - help="List of camera indices used to instantiate the `OpenCVCamera`. If not provided, find and use all available camera indices.", - ) - parser.add_argument( - "--fps", - type=int, - default=None, - help="Set the number of frames recorded per seconds for all cameras. If not provided, use the default fps of each camera.", - ) - parser.add_argument( - "--width", - type=int, - default=None, - help="Set the width for all cameras. If not provided, use the default width of each camera.", - ) - parser.add_argument( - "--height", - type=int, - default=None, - help="Set the height for all cameras. If not provided, use the default height of each camera.", - ) - parser.add_argument( - "--images-dir", - type=Path, - default="outputs/images_from_opencv_cameras", - help="Set directory to save a few frames for each camera.", - ) - parser.add_argument( - "--record-time-s", - type=float, - default=4.0, - help="Set the number of seconds used to record the frames. By default, 2 seconds.", - ) - args = parser.parse_args() - save_images_from_cameras(**vars(args)) + logger.debug(f"Camera {self.index_or_path} disconnected successfully.") diff --git a/lerobot/common/cameras/opencv/configuration_opencv.py b/lerobot/common/cameras/opencv/configuration_opencv.py index d28552503..354e807cf 100644 --- a/lerobot/common/cameras/opencv/configuration_opencv.py +++ b/lerobot/common/cameras/opencv/configuration_opencv.py @@ -1,7 +1,7 @@ from dataclasses import dataclass from pathlib import Path -from ..configs import CameraConfig, ColorMode +from ..configs import CameraConfig, ColorMode, Cv2Rotation @CameraConfig.register_subclass("opencv") @@ -23,17 +23,24 @@ class OpenCVCameraConfig(CameraConfig): width: int | None = None height: int | None = None color_mode: ColorMode = ColorMode.RGB - channels: int = 3 - rotation: int | None = None + channels: int = 3 # NOTE(Steven): Why is this a config? + rotation: Cv2Rotation = Cv2Rotation.NO_ROTATION def __post_init__(self): - if self.color_mode not in [ColorMode.RGB, ColorMode.BGR]: + if self.color_mode not in (ColorMode.RGB, ColorMode.BGR): raise ValueError( - f"`color_mode` is expected to be 'rgb' or 'bgr', but {self.color_mode} is provided." + f"`color_mode` is expected to be {ColorMode.RGB.value} or {ColorMode.BGR.value}, but {self.color_mode} is provided." + ) + + if self.rotation not in ( + Cv2Rotation.NO_ROTATION, + Cv2Rotation.ROTATE_90, + Cv2Rotation.ROTATE_180, + Cv2Rotation.ROTATE_270, + ): + raise ValueError( + f"`rotation` is expected to be in {(Cv2Rotation.NO_ROTATION, Cv2Rotation.ROTATE_90, Cv2Rotation.ROTATE_180, Cv2Rotation.ROTATE_270)}, but {self.rotation} is provided." ) if self.channels != 3: raise NotImplementedError(f"Unsupported number of channels: {self.channels}") - - if self.rotation not in [-90, None, 90, 180]: - raise ValueError(f"`rotation` must be in [-90, None, 90, 180] (got {self.rotation})") diff --git a/lerobot/common/cameras/utils.py b/lerobot/common/cameras/utils.py index e77038dcd..1968c0a35 100644 --- a/lerobot/common/cameras/utils.py +++ b/lerobot/common/cameras/utils.py @@ -1,7 +1,30 @@ +#!/usr/bin/env python + +# Copyright 2024 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + import platform +from pathlib import Path +from typing import TypeAlias + +import numpy as np +from PIL import Image from .camera import Camera -from .configs import CameraConfig +from .configs import CameraConfig, Cv2Rotation + +IndexOrPath: TypeAlias = int | Path def make_cameras_from_configs(camera_configs: dict[str, CameraConfig]) -> dict[str, Camera]: @@ -23,13 +46,14 @@ def make_cameras_from_configs(camera_configs: dict[str, CameraConfig]) -> dict[s return cameras -def get_cv2_rotation(rotation: int) -> int: +def get_cv2_rotation(rotation: Cv2Rotation) -> int: import cv2 return { - -90: cv2.ROTATE_90_COUNTERCLOCKWISE, - 90: cv2.ROTATE_90_CLOCKWISE, - 180: cv2.ROTATE_180, + Cv2Rotation.ROTATE_270: cv2.ROTATE_90_COUNTERCLOCKWISE, + Cv2Rotation.ROTATE_90: cv2.ROTATE_90_CLOCKWISE, + Cv2Rotation.ROTATE_180: cv2.ROTATE_180, + Cv2Rotation.NO_ROTATION: 0, }.get(rotation) @@ -41,3 +65,131 @@ def get_cv2_backend() -> int: "Windows": cv2.CAP_AVFOUNDATION, "Darwin": cv2.CAP_ANY, }.get(platform.system(), cv2.CAP_V4L2) + + +def save_image(img_array: np.ndarray, camera_index: int, frame_index: int, images_dir: Path): + img = Image.fromarray(img_array) + path = images_dir / f"camera_{camera_index:02d}_frame_{frame_index:06d}.png" + path.parent.mkdir(parents=True, exist_ok=True) + img.save(str(path), quality=100) + + +# NOTE(Steven): This should be use with both cameras implementations +# def save_images_from_cameras( +# images_dir: Path, +# camera_idx_or_paths: list[IndexOrPath] | None = None, +# fps: int | None = None, +# width: int | None = None, +# height: int | None = None, +# record_time_s: int = 2, +# ): +# """ +# Initializes all the cameras and saves images to the directory. Useful to visually identify the camera +# associated to a given camera index. +# """ +# if not camera_idx_or_paths: +# camera_idx_or_paths = OpenCVCamera.find_cameras() +# if len(camera_idx_or_paths) == 0: +# raise RuntimeError( +# "Not a single camera was detected. Try re-plugging, or re-installing `opencv-python`, " +# "or your camera driver, or make sure your camera is compatible with opencv." +# ) + +# print("Connecting cameras") +# cameras = [] +# for idx_or_path in camera_idx_or_paths: +# config = OpenCVCameraConfig(index_or_path=idx_or_path, fps=fps, width=width, height=height) +# camera = OpenCVCamera(config) +# camera.connect() +# print( +# f"OpenCVCamera({camera.index_or_path}, fps={camera.fps}, width={camera.capture_width}, " +# f"height={camera.capture_height}, color_mode={camera.color_mode})" +# ) +# cameras.append(camera) + +# images_dir = Path(images_dir) +# if images_dir.exists(): +# shutil.rmtree( +# images_dir, +# ) +# images_dir.mkdir(parents=True, exist_ok=True) + +# print(f"Saving images to {images_dir}") +# frame_index = 0 +# start_time = time.perf_counter() +# with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor: +# while True: +# now = time.perf_counter() + +# for camera in cameras: +# # If we use async_read when fps is None, the loop will go full speed, and we will endup +# # saving the same images from the cameras multiple times until the RAM/disk is full. +# image = camera.read() if fps is None else camera.async_read() + +# executor.submit( +# save_image, +# image, +# camera.camera_index, +# frame_index, +# images_dir, +# ) + +# if fps is not None: +# dt_s = time.perf_counter() - now +# busy_wait(1 / fps - dt_s) + +# print(f"Frame: {frame_index:04d}\tLatency (ms): {(time.perf_counter() - now) * 1000:.2f}") + +# if time.perf_counter() - start_time > record_time_s: +# break + +# frame_index += 1 + +# print(f"Images have been saved to {images_dir}") +# # NOTE(Steven): Cameras don't get disconnected + + +# # NOTE(Steven): Update this to be valid for both cameras type +# if __name__ == "__main__": +# parser = argparse.ArgumentParser( +# description="Save a few frames using `OpenCVCamera` for all cameras connected to the computer, or a selected subset." +# ) +# parser.add_argument( +# "--camera-ids", +# type=int, +# nargs="*", +# default=None, +# help="List of camera indices used to instantiate the `OpenCVCamera`. If not provided, find and use all available camera indices.", +# ) +# parser.add_argument( +# "--fps", +# type=int, +# default=None, +# help="Set the number of frames recorded per seconds for all cameras. If not provided, use the default fps of each camera.", +# ) +# parser.add_argument( +# "--width", +# type=str, +# default=None, +# help="Set the width for all cameras. If not provided, use the default width of each camera.", +# ) +# parser.add_argument( +# "--height", +# type=str, +# default=None, +# help="Set the height for all cameras. If not provided, use the default height of each camera.", +# ) +# parser.add_argument( +# "--images-dir", +# type=Path, +# default="outputs/images_from_opencv_cameras", +# help="Set directory to save a few frames for each camera.", +# ) +# parser.add_argument( +# "--record-time-s", +# type=float, +# default=4.0, +# help="Set the number of seconds used to record the frames. By default, 2 seconds.", +# ) +# args = parser.parse_args() +# save_images_from_cameras(**vars(args)) diff --git a/lerobot/common/utils/control_utils.py b/lerobot/common/utils/control_utils.py index 3ac792a5d..730144f33 100644 --- a/lerobot/common/utils/control_utils.py +++ b/lerobot/common/utils/control_utils.py @@ -38,6 +38,7 @@ from lerobot.common.utils.robot_utils import busy_wait from lerobot.common.utils.utils import get_safe_torch_device, has_method +# NOTE(Steven): Consider integrating this in camera class def log_control_info(robot: Robot, dt_s, episode_index=None, frame_index=None, fps=None): log_items = [] if episode_index is not None: diff --git a/lerobot/find_cameras.py b/lerobot/find_cameras.py new file mode 100644 index 000000000..9a0014dec --- /dev/null +++ b/lerobot/find_cameras.py @@ -0,0 +1 @@ +# NOTE(Steven): Place here the code for save_img (valid for both imnplementations:: python -m lerobot.find_cameras) diff --git a/tests/cameras/test_opencv.py b/tests/cameras/test_opencv.py new file mode 100644 index 000000000..5791019b8 --- /dev/null +++ b/tests/cameras/test_opencv.py @@ -0,0 +1,13 @@ +# from lerobot.common.cameras.opencv import OpenCVCamera, OpenCVCameraConfig + + +# class TestCameras(unittest.TestCase): +# # NOTE(Steven): Independent test func +# def test_connect(): +# config = OpenCVCameraConfig(camera_id=0) +# camera = OpenCVCamera(config) +# camera.connect() +# assert camera.is_connected + + +# # 1. Refactor -> opencv , write some test , think about realsese, iterate