Compare commits

...

24 Commits

Author SHA1 Message Date
Steven Palma
9adbd245e5 fix(cameras): correct validate_width_height logic 2025-05-15 16:30:18 +02:00
Steven Palma
859a369b29 chore(docs): adress notes + add docs in camera code 2025-05-15 11:08:53 +02:00
Steven Palma
cca647307b fix(tests): kill thread when camera async_read tests fail 2025-05-14 14:14:55 +02:00
Steven Palma
dae5f7c74d chore(tests): explicit cameras artefacts in gitattributes 2025-05-14 14:14:51 +02:00
Steven Palma
d6d8f29b5c fix(cameras): correct imports for camera config in scripts 2025-05-14 14:14:48 +02:00
Steven Palma
27bb7c4d71 chore(cameras): remove compressed files + filename better managed in opencv camera tests + add camera artefacts in lfs 2025-05-14 14:14:44 +02:00
Steven Palma
2d86812b97 refactor(cameras): width, fps and height is mandatory to have a value in robot config 2025-05-14 14:14:41 +02:00
Steven Palma
57c2181ed2 refactor(cameras): add read_depth() for realsense + new compressed bag 2025-05-14 14:14:36 +02:00
Steven Palma
81c49cecd0 [skip ci] refactor(cameras): add warmup read + images different size testing opencv + compressed test artefacts 2025-05-14 14:14:30 +02:00
Steven Palma
4675b3cd02 refactor(cameras): add warm-up, fix defaul args, remove width and height from find_cameras utils 2025-05-14 14:14:06 +02:00
Steven Palma
dbce247ec1 refactor(cameras): homogeneous depth processing in realsense camera 2025-05-14 14:14:02 +02:00
Steven Palma
904bc618ee refactor(cameras): fps, width and height are optional at camera level, these 3 are now moved to the camera base class, the width and height specified in the config is now the one output by read() methods 2025-05-14 14:13:59 +02:00
Steven Palma
ddd8fd325b refactor(cameras): improvements utils functionalities v0.2 2025-05-14 14:13:55 +02:00
Steven Palma
7f34e1af9c refactor(cameras): improvements utils functionalities v0.1 2025-05-14 14:13:52 +02:00
Steven Palma
3416036e34 chore(cameras): set timeout to 0 in tests 2025-05-14 14:13:48 +02:00
Steven Palma
2af8edcf74 chore(cameras): delete unused files 2025-05-14 14:13:44 +02:00
Steven Palma
b089c6db3a test(cameras): add minimal realsense test 2025-05-14 14:13:41 +02:00
Steven Palma
15b5d28f45 refactor(cameras): improvements realsense cam v0.1 2025-05-14 14:13:37 +02:00
Steven Palma
35c4b01752 test(cameras): add minimal opencv test 2025-05-14 14:13:33 +02:00
Steven Palma
6348f0f418 refactor(cameras): improvements opencv cam v0.1 2025-05-14 14:13:30 +02:00
Simon Alibert
720a6374ba chore(dependencies): add pyrealsense2 for macos + cleanup init camera modules 2025-05-14 14:13:26 +02:00
Simon Alibert
3297c7e802 refactor(cameras): realsense camera init 2025-05-14 14:13:23 +02:00
Simon Alibert
0b5b438f50 refactor(cameras): opencv camera init 2025-05-14 14:13:20 +02:00
Simon Alibert
8a6412b0db refactor(cameras): init abc class + config 2025-05-14 14:13:16 +02:00
29 changed files with 2091 additions and 1445 deletions

3
.gitattributes vendored
View File

@@ -11,10 +11,11 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
*.memmap filter=lfs diff=lfs merge=lfs -text
*.stl filter=lfs diff=lfs merge=lfs -text
*.safetensors filter=lfs diff=lfs merge=lfs -text
*.mp4 filter=lfs diff=lfs merge=lfs -text
*.arrow filter=lfs diff=lfs merge=lfs -text
*.json !text !filter !merge !diff
tests/artifacts/cameras/*.png filter=lfs diff=lfs merge=lfs -text
tests/artifacts/cameras/*.bag filter=lfs diff=lfs merge=lfs -text

View File

@@ -31,7 +31,6 @@ from pprint import pformat
import draccus
from lerobot.common.cameras import intel, opencv # noqa: F401
from lerobot.common.robots import ( # noqa: F401
Robot,
RobotConfig,

View File

@@ -1,3 +1,17 @@
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from .camera import Camera
from .configs import CameraConfig
from .utils import make_cameras_from_configs

View File

@@ -1,25 +1,49 @@
#!/usr/bin/env python
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
import numpy as np
from .configs import CameraConfig, ColorMode
class Camera(abc.ABC):
def __init__(self, config: CameraConfig):
self.fps: int | None = config.fps
self.width: int | None = config.width
self.height: int | None = config.height
@property
@abc.abstractmethod
def connect(self):
def is_connected(self) -> bool:
pass
@abc.abstractmethod
def read(self, temporary_color: str | None = None) -> np.ndarray:
def connect(self, do_warmup_read: bool = True) -> None:
pass
@abc.abstractmethod
def async_read(self) -> np.ndarray:
def read(self, color_mode: ColorMode | None = None) -> np.ndarray:
pass
@abc.abstractmethod
def disconnect(self):
def async_read(self, timeout_ms: float = 2000) -> np.ndarray:
pass
def __del__(self):
if getattr(self, "is_connected", False):
self.disconnect()
@abc.abstractmethod
def disconnect(self) -> None:
pass

View File

@@ -1,11 +1,44 @@
#!/usr/bin/env python
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
from dataclasses import dataclass
from enum import Enum
import draccus
@dataclass
class ColorMode(Enum):
RGB = "rgb"
BGR = "bgr"
class Cv2Rotation(Enum):
NO_ROTATION = 0
ROTATE_90 = 90
ROTATE_180 = 180
ROTATE_270 = -90
@dataclass(kw_only=True)
class CameraConfig(draccus.ChoiceRegistry, abc.ABC):
fps: int | None = None
width: int | None = None
height: int | None = None
@property
def type(self) -> str:
return self.get_choice_name(self.__class__)

View File

@@ -1,4 +1,16 @@
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from .camera_realsense import RealSenseCamera
from .configuration_realsense import RealSenseCameraConfig
__all__ = ["RealSenseCamera", "RealSenseCameraConfig"]

File diff suppressed because it is too large Load Diff

View File

@@ -14,58 +14,74 @@
from dataclasses import dataclass
from ..configs import CameraConfig
from ..configs import CameraConfig, ColorMode, Cv2Rotation
@CameraConfig.register_subclass("intelrealsense")
@dataclass
class RealSenseCameraConfig(CameraConfig):
"""
Example of tested options for Intel Real Sense D405:
"""Configuration class for Intel RealSense cameras.
This class provides specialized configuration options for Intel RealSense cameras,
including support for depth sensing and device identification via serial number or name.
Example configurations for Intel RealSense D405:
```python
RealSenseCameraConfig(128422271347, 30, 640, 480)
RealSenseCameraConfig(128422271347, 60, 640, 480)
RealSenseCameraConfig(128422271347, 90, 640, 480)
RealSenseCameraConfig(128422271347, 30, 1280, 720)
RealSenseCameraConfig(128422271347, 30, 640, 480, use_depth=True)
RealSenseCameraConfig(128422271347, 30, 640, 480, rotation=90)
# Basic configurations
RealSenseCameraConfig(128422271347, 30, 1280, 720) # 1280x720 @ 30FPS
RealSenseCameraConfig(128422271347, 60, 640, 480) # 640x480 @ 60FPS
# Advanced configurations
RealSenseCameraConfig(128422271347, 30, 640, 480, use_depth=True) # With depth sensing
RealSenseCameraConfig(128422271347, 30, 640, 480, rotation=Cv2Rotation.ROTATE_90) # With 90° rotation
```
Attributes:
fps: Requested frames per second for the color stream.
width: Requested frame width in pixels for the color stream.
height: Requested frame height in pixels for the color stream.
name: Optional human-readable name to identify the camera.
serial_number: Optional unique serial number to identify the camera.
Either name or serial_number must be provided.
color_mode: Color mode for image output (RGB or BGR). Defaults to RGB.
channels: Number of color channels (currently only 3 is supported).
use_depth: Whether to enable depth stream. Defaults to False.
rotation: Image rotation setting (0°, 90°, 180°, or 270°). Defaults to no rotation.
Note:
- Either name or serial_number must be specified, but not both.
- Depth stream configuration (if enabled) will use the same FPS as the color stream.
- The actual resolution and FPS may be adjusted by the camera to the nearest supported mode.
- Only 3-channel color output (RGB/BGR) is currently supported.
"""
name: str | None = None
serial_number: int | None = None
fps: int | None = None
width: int | None = None
height: int | None = None
color_mode: str = "rgb"
channels: int | None = None
color_mode: ColorMode = ColorMode.RGB
channels: int | None = 3
use_depth: bool = False
force_hardware_reset: bool = True
rotation: int | None = None
mock: bool = False
rotation: Cv2Rotation = Cv2Rotation.NO_ROTATION # NOTE(Steven): Check if draccus can parse to an enum
def __post_init__(self):
# bool is stronger than is None, since it works with empty strings
if self.color_mode not in (ColorMode.RGB, ColorMode.BGR):
raise ValueError(
f"`color_mode` is expected to be {ColorMode.RGB.value} or {ColorMode.BGR.value}, but {self.color_mode} is provided."
)
if self.rotation not in (
Cv2Rotation.NO_ROTATION,
Cv2Rotation.ROTATE_90,
Cv2Rotation.ROTATE_180,
Cv2Rotation.ROTATE_270,
):
raise ValueError(
f"`rotation` is expected to be in {(Cv2Rotation.NO_ROTATION, Cv2Rotation.ROTATE_90, Cv2Rotation.ROTATE_180, Cv2Rotation.ROTATE_270)}, but {self.rotation} is provided."
)
if self.channels != 3:
raise NotImplementedError(f"Unsupported number of channels: {self.channels}")
if bool(self.name) and bool(self.serial_number):
raise ValueError(
f"One of them must be set: name or serial_number, but {self.name=} and {self.serial_number=} provided."
)
if self.color_mode not in ["rgb", "bgr"]:
raise ValueError(
f"`color_mode` is expected to be 'rgb' or 'bgr', but {self.color_mode} is provided."
)
self.channels = 3
at_least_one_is_not_none = self.fps is not None or self.width is not None or self.height is not None
at_least_one_is_none = self.fps is None or self.width is None or self.height is None
if at_least_one_is_not_none and at_least_one_is_none:
raise ValueError(
"For `fps`, `width` and `height`, either all of them need to be set, or none of them, "
f"but {self.fps=}, {self.width=}, {self.height=} were provided."
)
if self.rotation not in [-90, None, 90, 180]:
raise ValueError(f"`rotation` must be in [-90, None, 90, 180] (got {self.rotation})")

View File

@@ -1,4 +1,16 @@
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from .camera_opencv import OpenCVCamera
from .configuration_opencv import OpenCVCameraConfig
__all__ = ["OpenCVCamera", "OpenCVCameraConfig"]

View File

@@ -13,507 +13,543 @@
# limitations under the License.
"""
This file contains utilities for recording frames from cameras. For more info look at `OpenCVCamera` docstring.
Provides the OpenCVCamera class for capturing frames from cameras using OpenCV.
"""
import argparse
import concurrent.futures
import contextlib
import logging
import math
import platform
import shutil
import threading
import queue
import time
from pathlib import Path
from threading import Thread
from threading import Event, Thread
from typing import Any, Dict, List
import cv2
import numpy as np
from PIL import Image
from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
from lerobot.common.utils.robot_utils import (
busy_wait,
)
from lerobot.common.utils.utils import capture_timestamp_utc
from ..camera import Camera
from .configuration_opencv import OpenCVCameraConfig
from ..utils import IndexOrPath, get_cv2_backend, get_cv2_rotation
from .configuration_opencv import ColorMode, OpenCVCameraConfig
# The maximum opencv device index depends on your operating system. For instance,
# NOTE(Steven): The maximum opencv device index depends on your operating system. For instance,
# if you have 3 cameras, they should be associated to index 0, 1, and 2. This is the case
# on MacOS. However, on Ubuntu, the indices are different like 6, 16, 23.
# When you change the USB port or reboot the computer, the operating system might
# treat the same cameras as new devices. Thus we select a higher bound to search indices.
MAX_OPENCV_INDEX = 60
def find_cameras(raise_when_empty=False, max_index_search_range=MAX_OPENCV_INDEX, mock=False) -> list[dict]:
cameras = []
if platform.system() == "Linux":
print("Linux detected. Finding available camera indices through scanning '/dev/video*' ports")
possible_ports = [str(port) for port in Path("/dev").glob("video*")]
ports = _find_cameras(possible_ports, mock=mock)
for port in ports:
cameras.append(
{
"port": port,
"index": int(port.removeprefix("/dev/video")),
}
)
else:
print(
"Mac or Windows detected. Finding available camera indices through "
f"scanning all indices from 0 to {MAX_OPENCV_INDEX}"
)
possible_indices = range(max_index_search_range)
indices = _find_cameras(possible_indices, mock=mock)
for index in indices:
cameras.append(
{
"port": None,
"index": index,
}
)
return cameras
def _find_cameras(
possible_camera_ids: list[int | str], raise_when_empty=False, mock=False
) -> list[int | str]:
if mock:
import tests.cameras.mock_cv2 as cv2
else:
import cv2
camera_ids = []
for camera_idx in possible_camera_ids:
camera = cv2.VideoCapture(camera_idx)
is_open = camera.isOpened()
camera.release()
if is_open:
print(f"Camera found at index {camera_idx}")
camera_ids.append(camera_idx)
if raise_when_empty and len(camera_ids) == 0:
raise OSError(
"Not a single camera was detected. Try re-plugging, or re-installing `opencv2`, "
"or your camera driver, or make sure your camera is compatible with opencv2."
)
return camera_ids
def is_valid_unix_path(path: str) -> bool:
"""Note: if 'path' points to a symlink, this will return True only if the target exists"""
p = Path(path)
return p.is_absolute() and p.exists()
def get_camera_index_from_unix_port(port: Path) -> int:
return int(str(port.resolve()).removeprefix("/dev/video"))
def save_image(img_array, camera_index, frame_index, images_dir):
img = Image.fromarray(img_array)
path = images_dir / f"camera_{camera_index:02d}_frame_{frame_index:06d}.png"
path.parent.mkdir(parents=True, exist_ok=True)
img.save(str(path), quality=100)
def save_images_from_cameras(
images_dir: Path,
camera_ids: list | None = None,
fps=None,
width=None,
height=None,
record_time_s=2,
mock=False,
):
"""
Initializes all the cameras and saves images to the directory. Useful to visually identify the camera
associated to a given camera index.
"""
if camera_ids is None or len(camera_ids) == 0:
camera_infos = find_cameras(mock=mock)
camera_ids = [cam["index"] for cam in camera_infos]
print("Connecting cameras")
cameras = []
for cam_idx in camera_ids:
config = OpenCVCameraConfig(camera_index=cam_idx, fps=fps, width=width, height=height, mock=mock)
camera = OpenCVCamera(config)
camera.connect()
print(
f"OpenCVCamera({camera.camera_index}, fps={camera.fps}, width={camera.capture_width}, "
f"height={camera.capture_height}, color_mode={camera.color_mode})"
)
cameras.append(camera)
images_dir = Path(images_dir)
if images_dir.exists():
shutil.rmtree(
images_dir,
)
images_dir.mkdir(parents=True, exist_ok=True)
print(f"Saving images to {images_dir}")
frame_index = 0
start_time = time.perf_counter()
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
while True:
now = time.perf_counter()
for camera in cameras:
# If we use async_read when fps is None, the loop will go full speed, and we will endup
# saving the same images from the cameras multiple times until the RAM/disk is full.
image = camera.read() if fps is None else camera.async_read()
executor.submit(
save_image,
image,
camera.camera_index,
frame_index,
images_dir,
)
if fps is not None:
dt_s = time.perf_counter() - now
busy_wait(1 / fps - dt_s)
print(f"Frame: {frame_index:04d}\tLatency (ms): {(time.perf_counter() - now) * 1000:.2f}")
if time.perf_counter() - start_time > record_time_s:
break
frame_index += 1
print(f"Images have been saved to {images_dir}")
logger = logging.getLogger(__name__)
class OpenCVCamera(Camera):
"""
The OpenCVCamera class allows to efficiently record images from cameras. It relies on opencv2 to communicate
with the cameras. Most cameras are compatible. For more info, see the [Video I/O with OpenCV Overview](https://docs.opencv.org/4.x/d0/da7/videoio_overview.html).
Manages camera interactions using OpenCV for efficient frame recording.
An OpenCVCamera instance requires a camera index (e.g. `OpenCVCamera(camera_index=0)`). When you only have one camera
like a webcam of a laptop, the camera index is expected to be 0, but it might also be very different, and the camera index
might change if you reboot your computer or re-plug your camera. This behavior depends on your operation system.
This class provides a high-level interface to connect to, configure, and read
frames from cameras compatible with OpenCV's VideoCapture. It supports both
synchronous and asynchronous frame reading.
To find the camera indices of your cameras, you can run our utility script that will be save a few frames for each camera:
An OpenCVCamera instance requires a camera index (e.g., 0) or a device path
(e.g., '/dev/video0' on Linux). Camera indices can be unstable across reboots
or port changes, especially on Linux. Use the provided utility script to find
available camera indices or paths:
```bash
python lerobot/common/robot_devices/cameras/opencv.py --images-dir outputs/images_from_opencv_cameras
python -m lerobot.find_cameras
```
When an OpenCVCamera is instantiated, if no specific config is provided, the default fps, width, height and color_mode
of the given camera will be used.
The camera's default settings (FPS, resolution, color mode) are used unless
overridden in the configuration.
Example of usage:
```python
from lerobot.common.robot_devices.cameras.configs import OpenCVCameraConfig
Args:
config (OpenCVCameraConfig): Configuration object containing settings like
camera index/path, desired FPS, width, height, color mode, and rotation.
config = OpenCVCameraConfig(camera_index=0)
camera = OpenCVCamera(config)
camera.connect()
color_image = camera.read()
# when done using the camera, consider disconnecting
camera.disconnect()
```
Example:
```python
from lerobot.common.cameras.opencv import OpenCVCamera
from lerobot.common.cameras.configuration_opencv import OpenCVCameraConfig, ColorMode
Example of changing default fps, width, height and color_mode:
```python
config = OpenCVCameraConfig(camera_index=0, fps=30, width=1280, height=720)
config = OpenCVCameraConfig(camera_index=0, fps=90, width=640, height=480)
config = OpenCVCameraConfig(camera_index=0, fps=90, width=640, height=480, color_mode="bgr")
# Note: might error out open `camera.connect()` if these settings are not compatible with the camera
```
# Basic usage with camera index 0
config = OpenCVCameraConfig(index_or_path=0)
camera = OpenCVCamera(config)
try:
camera.connect()
print(f"Connected to {camera}")
color_image = camera.read() # Synchronous read
print(f"Read frame shape: {color_image.shape}")
async_image = camera.async_read() # Asynchronous read
print(f"Async read frame shape: {async_image.shape}")
except Exception as e:
print(f"An error occurred: {e}")
finally:
camera.disconnect()
print(f"Disconnected from {camera}")
# Example with custom settings
custom_config = OpenCVCameraConfig(
index_or_path='/dev/video0', # Or use an index
fps=30,
width=1280,
height=720,
color_mode=ColorMode.RGB,
rotation=90
)
custom_camera = OpenCVCamera(custom_config)
# ... connect, read, disconnect ...
```
"""
def __init__(self, config: OpenCVCameraConfig):
"""
Initializes the OpenCVCamera instance.
Args:
config: The configuration settings for the camera.
"""
super().__init__(config)
self.config = config
self.camera_index = config.camera_index
self.port = None
self.index_or_path: IndexOrPath = config.index_or_path
# Linux uses ports for connecting to cameras
if platform.system() == "Linux":
if isinstance(self.camera_index, int):
self.port = Path(f"/dev/video{self.camera_index}")
elif isinstance(self.camera_index, str) and is_valid_unix_path(self.camera_index):
self.port = Path(self.camera_index)
# Retrieve the camera index from a potentially symlinked path
self.camera_index = get_camera_index_from_unix_port(self.port)
self.fps: int | None = config.fps
self.channels: int = config.channels
self.color_mode: ColorMode = config.color_mode
self.videocapture_camera: cv2.VideoCapture | None = None
self.thread: Thread | None = None
self.stop_event: Event | None = None
self.frame_queue: queue.Queue = queue.Queue(maxsize=1)
self.logs: dict = {} # NOTE(Steven): Might be removed in the future
self.rotation: int | None = get_cv2_rotation(config.rotation)
self.backend: int = get_cv2_backend() # NOTE(Steven): If we specify backend the opencv open fails
if self.height and self.width:
if self.rotation in [cv2.ROTATE_90_CLOCKWISE, cv2.ROTATE_90_COUNTERCLOCKWISE]:
self.prerotated_width, self.prerotated_height = self.height, self.width
else:
raise ValueError(f"Please check the provided camera_index: {self.camera_index}")
self.prerotated_width, self.prerotated_height = self.width, self.height
# Store the raw (capture) resolution from the config.
self.capture_width = config.width
self.capture_height = config.height
def __str__(self) -> str:
"""Returns a string representation of the camera instance."""
return f"{self.__class__.__name__}({self.index_or_path})"
# If rotated by ±90, swap width and height.
if config.rotation in [-90, 90]:
self.width = config.height
self.height = config.width
else:
self.width = config.width
self.height = config.height
@property
def is_connected(self) -> bool:
"""Checks if the camera is currently connected and opened."""
return isinstance(self.videocapture_camera, cv2.VideoCapture) and self.videocapture_camera.isOpened()
self.fps = config.fps
self.channels = config.channels
self.color_mode = config.color_mode
self.mock = config.mock
def _configure_capture_settings(self) -> None:
"""
Applies the specified FPS, width, and height settings to the connected camera.
self.camera = None
self.is_connected = False
self.thread = None
self.stop_event = None
self.color_image = None
self.logs = {}
This method attempts to set the camera properties via OpenCV. It checks if
the camera successfully applied the settings and raises an error if not.
if self.mock:
import tests.cameras.mock_cv2 as cv2
else:
import cv2
Args:
fps: The desired frames per second. If None, the setting is skipped.
width: The desired capture width. If None, the setting is skipped.
height: The desired capture height. If None, the setting is skipped.
self.rotation = None
if config.rotation == -90:
self.rotation = cv2.ROTATE_90_COUNTERCLOCKWISE
elif config.rotation == 90:
self.rotation = cv2.ROTATE_90_CLOCKWISE
elif config.rotation == 180:
self.rotation = cv2.ROTATE_180
def connect(self):
if self.is_connected:
raise DeviceAlreadyConnectedError(f"OpenCVCamera({self.camera_index}) is already connected.")
if self.mock:
import tests.cameras.mock_cv2 as cv2
else:
import cv2
# Use 1 thread to avoid blocking the main thread. Especially useful during data collection
# when other threads are used to save the images.
cv2.setNumThreads(1)
backend = (
cv2.CAP_V4L2
if platform.system() == "Linux"
else cv2.CAP_DSHOW
if platform.system() == "Windows"
else cv2.CAP_AVFOUNDATION
if platform.system() == "Darwin"
else cv2.CAP_ANY
)
camera_idx = f"/dev/video{self.camera_index}" if platform.system() == "Linux" else self.camera_index
# First create a temporary camera trying to access `camera_index`,
# and verify it is a valid camera by calling `isOpened`.
tmp_camera = cv2.VideoCapture(camera_idx, backend)
is_camera_open = tmp_camera.isOpened()
# Release camera to make it accessible for `find_camera_indices`
tmp_camera.release()
del tmp_camera
# If the camera doesn't work, display the camera indices corresponding to
# valid cameras.
if not is_camera_open:
# Verify that the provided `camera_index` is valid before printing the traceback
cameras_info = find_cameras()
available_cam_ids = [cam["index"] for cam in cameras_info]
if self.camera_index not in available_cam_ids:
raise ValueError(
f"`camera_index` is expected to be one of these available cameras {available_cam_ids}, but {self.camera_index} is provided instead. "
"To find the camera index you should use, run `python lerobot/common/robot_devices/cameras/opencv.py`."
)
raise OSError(f"Can't access OpenCVCamera({camera_idx}).")
# Secondly, create the camera that will be used downstream.
# Note: For some unknown reason, calling `isOpened` blocks the camera which then
# needs to be re-created.
self.camera = cv2.VideoCapture(camera_idx, backend)
if self.fps is not None:
self.camera.set(cv2.CAP_PROP_FPS, self.fps)
if self.capture_width is not None:
self.camera.set(cv2.CAP_PROP_FRAME_WIDTH, self.capture_width)
if self.capture_height is not None:
self.camera.set(cv2.CAP_PROP_FRAME_HEIGHT, self.capture_height)
actual_fps = self.camera.get(cv2.CAP_PROP_FPS)
actual_width = self.camera.get(cv2.CAP_PROP_FRAME_WIDTH)
actual_height = self.camera.get(cv2.CAP_PROP_FRAME_HEIGHT)
# Using `math.isclose` since actual fps can be a float (e.g. 29.9 instead of 30)
if self.fps is not None and not math.isclose(self.fps, actual_fps, rel_tol=1e-3):
# Using `OSError` since it's a broad that encompasses issues related to device communication
raise OSError(
f"Can't set {self.fps=} for OpenCVCamera({self.camera_index}). Actual value is {actual_fps}."
)
if self.capture_width is not None and not math.isclose(
self.capture_width, actual_width, rel_tol=1e-3
):
raise OSError(
f"Can't set {self.capture_width=} for OpenCVCamera({self.camera_index}). Actual value is {actual_width}."
)
if self.capture_height is not None and not math.isclose(
self.capture_height, actual_height, rel_tol=1e-3
):
raise OSError(
f"Can't set {self.capture_height=} for OpenCVCamera({self.camera_index}). Actual value is {actual_height}."
)
self.fps = round(actual_fps)
self.capture_width = round(actual_width)
self.capture_height = round(actual_height)
self.is_connected = True
def read(self, temporary_color_mode: str | None = None) -> np.ndarray:
"""Read a frame from the camera returned in the format (height, width, channels)
(e.g. 480 x 640 x 3), contrarily to the pytorch format which is channel first.
Note: Reading a frame is done every `camera.fps` times per second, and it is blocking.
If you are reading data from other sensors, we advise to use `camera.async_read()` which is non blocking version of `camera.read()`.
Raises:
RuntimeError: If the camera fails to set any of the specified properties
to the requested value.
DeviceNotConnectedError: If the camera is not connected when attempting
to configure settings.
"""
if not self.is_connected:
raise DeviceNotConnectedError(
f"OpenCVCamera({self.camera_index}) is not connected. Try running `camera.connect()` first."
raise DeviceNotConnectedError(f"Cannot configure settings for {self} as it is not connected.")
self._validate_fps()
self._validate_width_and_height()
def connect(self, do_warmup_read: bool = True):
"""
Connects to the OpenCV camera specified in the configuration.
Initializes the OpenCV VideoCapture object, sets desired camera properties
(FPS, width, height), and performs initial checks.
Raises:
DeviceAlreadyConnectedError: If the camera is already connected.
ValueError: If the specified camera index/path is not found or accessible.
ConnectionError: If the camera is found but fails to open.
RuntimeError: If the camera opens but fails to apply requested FPS/resolution settings.
"""
if self.is_connected:
raise DeviceAlreadyConnectedError(f"{self} is already connected.")
# Use 1 thread for OpenCV operations to avoid potential conflicts or
# blocking in multi-threaded applications, especially during data collection.
cv2.setNumThreads(1)
logger.debug(f"Attempting to connect to camera {self.index_or_path} using backend {self.backend}...")
self.videocapture_camera = cv2.VideoCapture(self.index_or_path)
if not self.videocapture_camera.isOpened():
self.videocapture_camera.release()
self.videocapture_camera = None
raise ConnectionError(
f"Failed to open OpenCV camera {self.index_or_path}."
f"Run 'python -m find_cameras list-cameras' for details."
)
logger.debug(f"Successfully opened camera {self.index_or_path}. Applying configuration...")
self._configure_capture_settings()
if do_warmup_read:
logger.debug(f"Reading a warm-up frame for {self.index_or_path}...")
self.read() # NOTE(Steven): For now we just read one frame, we could also loop for X frames/secs
logger.debug(f"Camera {self.index_or_path} connected and configured successfully.")
def _validate_fps(self) -> None:
"""Validates and sets the camera's frames per second (FPS)."""
if self.fps is None:
self.fps = self.videocapture_camera.get(cv2.CAP_PROP_FPS)
logger.info(f"FPS set to camera default: {self.fps}.")
return
success = self.videocapture_camera.set(cv2.CAP_PROP_FPS, float(self.fps))
actual_fps = self.videocapture_camera.get(cv2.CAP_PROP_FPS)
# Use math.isclose for robust float comparison
if not success or not math.isclose(self.fps, actual_fps, rel_tol=1e-3):
logger.warning(
f"Requested FPS {self.fps} for {self}, but camera reported {actual_fps} (set success: {success}). "
"This might be due to camera limitations."
)
raise RuntimeError(
f"Failed to set requested FPS {self.fps} for {self}. Actual value reported: {actual_fps}."
)
logger.debug(f"FPS set to {actual_fps} for {self}.")
def _validate_width_and_height(self) -> None:
"""Validates and sets the camera's frame capture width and height."""
default_width = int(round(self.videocapture_camera.get(cv2.CAP_PROP_FRAME_WIDTH)))
default_height = int(round(self.videocapture_camera.get(cv2.CAP_PROP_FRAME_HEIGHT)))
if self.width is None or self.height is None:
if self.rotation in [cv2.ROTATE_90_CLOCKWISE, cv2.ROTATE_90_COUNTERCLOCKWISE]:
self.width, self.height = default_height, default_width
self.prerotated_width, self.prerotated_height = default_width, default_height
else:
self.width, self.height = default_width, default_height
self.prerotated_width, self.prerotated_height = default_width, default_height
logger.info(f"Capture width set to camera default: {self.width}.")
logger.info(f"Capture height set to camera default: {self.height}.")
return
success = self.videocapture_camera.set(cv2.CAP_PROP_FRAME_WIDTH, float(self.prerotated_width))
actual_width = int(round(self.videocapture_camera.get(cv2.CAP_PROP_FRAME_WIDTH)))
if not success or self.prerotated_width != actual_width:
logger.warning(
f"Requested capture width {self.prerotated_width} for {self}, but camera reported {actual_width} (set success: {success})."
)
raise RuntimeError(
f"Failed to set requested capture width {self.prerotated_width} for {self}. Actual value: {actual_width}."
)
logger.debug(f"Capture width set to {actual_width} for {self}.")
success = self.videocapture_camera.set(cv2.CAP_PROP_FRAME_HEIGHT, float(self.prerotated_height))
actual_height = int(round(self.videocapture_camera.get(cv2.CAP_PROP_FRAME_HEIGHT)))
if not success or self.prerotated_height != actual_height:
logger.warning(
f"Requested capture height {self.prerotated_height} for {self}, but camera reported {actual_height} (set success: {success})."
)
raise RuntimeError(
f"Failed to set requested capture height {self.prerotated_height} for {self}. Actual value: {actual_height}."
)
logger.debug(f"Capture height set to {actual_height} for {self}.")
@staticmethod
def find_cameras(
max_index_search_range=MAX_OPENCV_INDEX, raise_when_empty: bool = True
) -> List[Dict[str, Any]]:
"""
Detects available OpenCV cameras connected to the system.
On Linux, it scans '/dev/video*' paths. On other systems (like macOS, Windows),
it checks indices from 0 up to `max_index_search_range`.
Args:
max_index_search_range (int): The maximum index to check on non-Linux systems.
raise_when_empty (bool): If True, raises an OSError if no cameras are found.
Returns:
List[Dict[str, Any]]: A list of dictionaries,
where each dictionary contains 'type', 'id' (port index or path),
and the default profile properties (width, height, fps, format).
"""
found_cameras_info = []
if platform.system() == "Linux":
logger.info("Linux detected. Scanning '/dev/video*' device paths...")
possible_paths = sorted(Path("/dev").glob("video*"), key=lambda p: p.name)
targets_to_scan = [str(p) for p in possible_paths]
logger.debug(f"Found potential paths: {targets_to_scan}")
else:
logger.info(
f"{platform.system()} system detected. Scanning indices from 0 to {max_index_search_range}..."
)
targets_to_scan = list(range(max_index_search_range))
for target in targets_to_scan:
camera = cv2.VideoCapture(target)
if camera.isOpened():
default_width = int(camera.get(cv2.CAP_PROP_FRAME_WIDTH))
default_height = int(camera.get(cv2.CAP_PROP_FRAME_HEIGHT))
default_fps = camera.get(cv2.CAP_PROP_FPS)
default_format = camera.get(cv2.CAP_PROP_FORMAT)
camera_info = {
"name": f"OpenCV Camera @ {target}",
"type": "OpenCV",
"id": target,
"backend_api": camera.getBackendName(),
"default_stream_profile": {
"format": default_format,
"width": default_width,
"height": default_height,
"fps": default_fps,
},
}
found_cameras_info.append(camera_info)
logger.debug(f"Found OpenCV camera:: {camera_info}")
camera.release()
if not found_cameras_info:
logger.warning("No OpenCV devices detected.")
if raise_when_empty:
raise OSError("No OpenCV devices detected. Ensure cameras are connected.")
logger.info(f"Detected OpenCV cameras: {[cam['id'] for cam in found_cameras_info]}")
return found_cameras_info
def read(self, color_mode: ColorMode | None = None) -> np.ndarray:
"""
Reads a single frame synchronously from the camera.
This is a blocking call. It waits for the next available frame from the
camera hardware via OpenCV.
Args:
color_mode (Optional[ColorMode]): If specified, overrides the default
color mode (`self.color_mode`) for this read operation (e.g.,
request RGB even if default is BGR).
Returns:
np.ndarray: The captured frame as a NumPy array in the format
(height, width, channels), using the specified or default
color mode and applying any configured rotation.
Raises:
DeviceNotConnectedError: If the camera is not connected.
RuntimeError: If reading the frame from the camera fails or if the
received frame dimensions don't match expectations before rotation.
ValueError: If an invalid `color_mode` is requested.
"""
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
start_time = time.perf_counter()
ret, color_image = self.camera.read()
# NOTE(Steven): Are we okay with this blocking an undefined amount of time?
ret, frame = self.videocapture_camera.read()
if not ret:
raise OSError(f"Can't capture color image from camera {self.camera_index}.")
requested_color_mode = self.color_mode if temporary_color_mode is None else temporary_color_mode
if requested_color_mode not in ["rgb", "bgr"]:
raise ValueError(
f"Expected color values are 'rgb' or 'bgr', but {requested_color_mode} is provided."
if not ret or frame is None:
raise RuntimeError(
f"Failed to capture frame from {self}. '.read()' returned status={ret} and frame is None."
)
# OpenCV uses BGR format as default (blue, green, red) for all operations, including displaying images.
# However, Deep Learning framework such as LeRobot uses RGB format as default to train neural networks,
# so we convert the image color from BGR to RGB.
if requested_color_mode == "rgb":
if self.mock:
import tests.cameras.mock_cv2 as cv2
else:
import cv2
# Post-process the frame (color conversion, dimension check, rotation)
processed_frame = self._postprocess_image(frame, color_mode)
color_image = cv2.cvtColor(color_image, cv2.COLOR_BGR2RGB)
read_duration_ms = (time.perf_counter() - start_time) * 1e3
logger.debug(f"{self} synchronous read took: {read_duration_ms:.1f}ms")
h, w, _ = color_image.shape
if h != self.capture_height or w != self.capture_width:
raise OSError(
f"Can't capture color image with expected height and width ({self.height} x {self.width}). ({h} x {w}) returned instead."
)
if self.rotation is not None:
color_image = cv2.rotate(color_image, self.rotation)
# log the number of seconds it took to read the image
self.logs["delta_timestamp_s"] = time.perf_counter() - start_time
# log the utc time at which the image was received
self.logs["timestamp_utc"] = capture_timestamp_utc()
return processed_frame
self.color_image = color_image
def _postprocess_image(self, image: np.ndarray, color_mode: ColorMode | None = None) -> np.ndarray:
"""
Applies color conversion, dimension validation, and rotation to a raw frame.
return color_image
Args:
image (np.ndarray): The raw image frame (expected BGR format from OpenCV).
color_mode (Optional[ColorMode]): The target color mode (RGB or BGR). If None,
uses the instance's default `self.color_mode`.
def read_loop(self):
Returns:
np.ndarray: The processed image frame.
Raises:
ValueError: If the requested `color_mode` is invalid.
RuntimeError: If the raw frame dimensions do not match the configured
`width` and `height`.
"""
requested_color_mode = self.color_mode if color_mode is None else color_mode
if requested_color_mode not in (ColorMode.RGB, ColorMode.BGR):
raise ValueError(
f"Invalid requested color mode '{requested_color_mode}'. Expected {ColorMode.RGB} or {ColorMode.BGR}."
)
h, w, c = image.shape
if h != self.prerotated_height or w != self.prerotated_width:
raise RuntimeError(
f"Captured frame dimensions ({h}x{w}) do not match configured capture dimensions ({self.prerotated_height}x{self.prerotated_width}) for {self}."
)
if c != self.channels:
logger.warning(
f"Captured frame channels ({c}) do not match configured channels ({self.channels}) for {self}."
)
processed_image = image
if requested_color_mode == ColorMode.RGB:
processed_image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
logger.debug(f"Converted frame from BGR to RGB for {self}.")
if self.rotation in [cv2.ROTATE_90_CLOCKWISE, cv2.ROTATE_90_COUNTERCLOCKWISE]:
processed_image = cv2.rotate(processed_image, self.rotation)
logger.debug(f"Rotated frame by {self.config.rotation} degrees for {self}.")
return processed_image
def _read_loop(self):
"""
Internal loop run by the background thread for asynchronous reading.
Continuously reads frames from the camera using the synchronous `read()`
method and places the latest frame into the `frame_queue`. It overwrites
any previous frame in the queue.
"""
logger.debug(f"Starting read loop thread for {self}.")
while not self.stop_event.is_set():
try:
self.color_image = self.read()
color_image = self.read()
with contextlib.suppress(queue.Empty):
_ = self.frame_queue.get_nowait()
self.frame_queue.put(color_image)
logger.debug(f"Frame placed in queue for {self}.")
except DeviceNotConnectedError:
logger.error(f"Read loop for {self} stopped: Camera disconnected.")
break
except Exception as e:
print(f"Error reading in thread: {e}")
logger.warning(f"Error reading frame in background thread for {self}: {e}")
def async_read(self):
logger.debug(f"Stopping read loop thread for {self}.")
def _ensure_read_thread_running(self):
"""Starts or restarts the background read thread if it's not running."""
if self.thread is not None and self.thread.is_alive():
self.thread.join(timeout=0.1)
if self.stop_event is not None:
self.stop_event.set()
self.stop_event = Event()
self.thread = Thread(
target=self._read_loop, args=(), name=f"OpenCVCameraReadLoop-{self}-{self.index_or_path}"
)
self.thread.daemon = True
self.thread.start()
logger.debug(f"Read thread started for {self}.")
def async_read(self, timeout_ms: float = 2000) -> np.ndarray:
"""
Reads the latest available frame asynchronously.
This method retrieves the most recent frame captured by the background
read thread. It does not block waiting for the camera hardware directly,
only waits for a frame to appear in the internal queue up to the specified
timeout.
Args:
timeout_ms (float): Maximum time in milliseconds to wait for a frame
to become available in the queue. Defaults to 2000ms (2 seconds).
Returns:
np.ndarray: The latest captured frame as a NumPy array in the format
(height, width, channels), processed according to configuration.
Raises:
DeviceNotConnectedError: If the camera is not connected.
TimeoutError: If no frame becomes available within the specified timeout.
RuntimeError: If an unexpected error occurs while retrieving from the queue.
"""
if not self.is_connected:
raise DeviceNotConnectedError(
f"OpenCVCamera({self.camera_index}) is not connected. Try running `camera.connect()` first."
raise DeviceNotConnectedError(f"{self} is not connected.")
if self.thread is None or not self.thread.is_alive():
self._ensure_read_thread_running()
try:
return self.frame_queue.get(timeout=timeout_ms / 1000.0)
except queue.Empty as e:
thread_alive = self.thread is not None and self.thread.is_alive()
logger.error(
f"Timeout waiting for frame from {self} queue after {timeout_ms}ms. "
f"(Read thread alive: {thread_alive})"
)
raise TimeoutError(
f"Timed out waiting for frame from camera {self.index_or_path} after {timeout_ms} ms. "
f"Read thread alive: {thread_alive}."
) from e
except Exception as e:
logger.exception(f"Unexpected error getting frame from queue for {self}: {e}")
raise RuntimeError(f"Error getting frame from queue for camera {self.index_or_path}: {e}") from e
if self.thread is None:
self.stop_event = threading.Event()
self.thread = Thread(target=self.read_loop, args=())
self.thread.daemon = True
self.thread.start()
def _shutdown_read_thread(self):
"""Signals the background read thread to stop and waits for it to join."""
if self.stop_event is not None:
logger.debug(f"Signaling stop event for read thread of {self}.")
self.stop_event.set()
num_tries = 0
while True:
if self.color_image is not None:
return self.color_image
if self.thread is not None and self.thread.is_alive():
logger.debug(f"Waiting for read thread of {self} to join...")
self.thread.join(timeout=2.0)
if self.thread.is_alive():
logger.warning(f"Read thread for {self} did not terminate gracefully after 2 seconds.")
else:
logger.debug(f"Read thread for {self} joined successfully.")
time.sleep(1 / self.fps)
num_tries += 1
if num_tries > self.fps * 2:
raise TimeoutError("Timed out waiting for async_read() to start.")
self.thread = None
self.stop_event = None
def disconnect(self):
if not self.is_connected:
"""
Disconnects from the camera and cleans up resources.
Stops the background read thread (if running) and releases the OpenCV
VideoCapture object.
Raises:
DeviceNotConnectedError: If the camera is already disconnected.
"""
if not self.is_connected and self.thread is None:
raise DeviceNotConnectedError(
f"OpenCVCamera({self.camera_index}) is not connected. Try running `camera.connect()` first."
f"Attempted to disconnect {self}, but it appears already disconnected."
)
logger.debug(f"Disconnecting from camera {self.index_or_path}...")
if self.thread is not None:
self.stop_event.set()
self.thread.join() # wait for the thread to finish
self.thread = None
self.stop_event = None
self._shutdown_read_thread()
self.camera.release()
self.camera = None
self.is_connected = False
if self.videocapture_camera is not None:
logger.debug(f"Releasing OpenCV VideoCapture object for {self}.")
self.videocapture_camera.release()
self.videocapture_camera = None
def __del__(self):
if getattr(self, "is_connected", False):
self.disconnect()
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="Save a few frames using `OpenCVCamera` for all cameras connected to the computer, or a selected subset."
)
parser.add_argument(
"--camera-ids",
type=int,
nargs="*",
default=None,
help="List of camera indices used to instantiate the `OpenCVCamera`. If not provided, find and use all available camera indices.",
)
parser.add_argument(
"--fps",
type=int,
default=None,
help="Set the number of frames recorded per seconds for all cameras. If not provided, use the default fps of each camera.",
)
parser.add_argument(
"--width",
type=int,
default=None,
help="Set the width for all cameras. If not provided, use the default width of each camera.",
)
parser.add_argument(
"--height",
type=int,
default=None,
help="Set the height for all cameras. If not provided, use the default height of each camera.",
)
parser.add_argument(
"--images-dir",
type=Path,
default="outputs/images_from_opencv_cameras",
help="Set directory to save a few frames for each camera.",
)
parser.add_argument(
"--record-time-s",
type=float,
default=4.0,
help="Set the number of seconds used to record the frames. By default, 2 seconds.",
)
args = parser.parse_args()
save_images_from_cameras(**vars(args))
logger.info(f"Camera {self.index_or_path} disconnected successfully.")

View File

@@ -1,38 +1,76 @@
from dataclasses import dataclass
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ..configs import CameraConfig
from dataclasses import dataclass
from pathlib import Path
from ..configs import CameraConfig, ColorMode, Cv2Rotation
@CameraConfig.register_subclass("opencv")
@dataclass
class OpenCVCameraConfig(CameraConfig):
"""
Example of tested options for Intel Real Sense D405:
"""Configuration class for OpenCV-based camera devices or video files.
This class provides configuration options for cameras accessed through OpenCV,
supporting both physical camera devices and video files. It includes settings
for resolution, frame rate, color mode, and image rotation.
Example configurations:
```python
OpenCVCameraConfig(0, 30, 640, 480)
OpenCVCameraConfig(0, 60, 640, 480)
OpenCVCameraConfig(0, 90, 640, 480)
OpenCVCameraConfig(0, 30, 1280, 720)
# Basic configurations
OpenCVCameraConfig(0, 30, 1280, 720) # 1280x720 @ 30FPS
OpenCVCameraConfig(/dev/video4, 60, 640, 480) # 640x480 @ 60FPS
# Advanced configurations
OpenCVCameraConfig(128422271347, 30, 640, 480, rotation=Cv2Rotation.ROTATE_90) # With 90° rotation
```
Attributes:
index_or_path: Either an integer representing the camera device index,
or a Path object pointing to a video file.
fps: Requested frames per second for the color stream.
width: Requested frame width in pixels for the color stream.
height: Requested frame height in pixels for the color stream.
color_mode: Color mode for image output (RGB or BGR). Defaults to RGB.
channels: Number of color channels (currently only 3 is supported).
rotation: Image rotation setting (0°, 90°, 180°, or 270°). Defaults to no rotation.
Note:
- Only 3-channel color output (RGB/BGR) is currently supported.
"""
camera_index: int
fps: int | None = None
width: int | None = None
height: int | None = None
color_mode: str = "rgb"
channels: int | None = None
rotation: int | None = None
mock: bool = False
index_or_path: int | Path
color_mode: ColorMode = ColorMode.RGB
channels: int = 3 # NOTE(Steven): Why is this a config?
rotation: Cv2Rotation = Cv2Rotation.NO_ROTATION
def __post_init__(self):
if self.color_mode not in ["rgb", "bgr"]:
if self.color_mode not in (ColorMode.RGB, ColorMode.BGR):
raise ValueError(
f"`color_mode` is expected to be 'rgb' or 'bgr', but {self.color_mode} is provided."
f"`color_mode` is expected to be {ColorMode.RGB.value} or {ColorMode.BGR.value}, but {self.color_mode} is provided."
)
self.channels = 3
if self.rotation not in (
Cv2Rotation.NO_ROTATION,
Cv2Rotation.ROTATE_90,
Cv2Rotation.ROTATE_180,
Cv2Rotation.ROTATE_270,
):
raise ValueError(
f"`rotation` is expected to be in {(Cv2Rotation.NO_ROTATION, Cv2Rotation.ROTATE_90, Cv2Rotation.ROTATE_180, Cv2Rotation.ROTATE_270)}, but {self.rotation} is provided."
)
if self.rotation not in [-90, None, 90, 180]:
raise ValueError(f"`rotation` must be in [-90, None, 90, 180] (got {self.rotation})")
if self.channels != 3:
raise NotImplementedError(f"Unsupported number of channels: {self.channels}")

View File

@@ -1,5 +1,30 @@
#!/usr/bin/env python
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import platform
from pathlib import Path
from typing import TypeAlias
import numpy as np
from PIL import Image
from .camera import Camera
from .configs import CameraConfig
from .configs import CameraConfig, Cv2Rotation
IndexOrPath: TypeAlias = int | Path
def make_cameras_from_configs(camera_configs: dict[str, CameraConfig]) -> dict[str, Camera]:
@@ -19,3 +44,30 @@ def make_cameras_from_configs(camera_configs: dict[str, CameraConfig]) -> dict[s
raise ValueError(f"The motor type '{cfg.type}' is not valid.")
return cameras
def get_cv2_rotation(rotation: Cv2Rotation) -> int:
import cv2
return {
Cv2Rotation.ROTATE_270: cv2.ROTATE_90_COUNTERCLOCKWISE,
Cv2Rotation.ROTATE_90: cv2.ROTATE_90_CLOCKWISE,
Cv2Rotation.ROTATE_180: cv2.ROTATE_180,
}.get(rotation)
def get_cv2_backend() -> int:
import cv2
return {
"Linux": cv2.CAP_DSHOW,
"Windows": cv2.CAP_AVFOUNDATION,
"Darwin": cv2.CAP_ANY,
}.get(platform.system(), cv2.CAP_V4L2)
def save_image(img_array: np.ndarray, camera_index: int, frame_index: int, images_dir: Path):
img = Image.fromarray(img_array)
path = images_dir / f"camera_{camera_index:02d}_frame_{frame_index:06d}.png"
path.parent.mkdir(parents=True, exist_ok=True)
img.save(str(path), quality=100)

View File

@@ -26,6 +26,17 @@ class RobotConfig(draccus.ChoiceRegistry, abc.ABC):
# Directory to store calibration file
calibration_dir: Path | None = None
def __post_init__(self):
if hasattr(self, "cameras"):
cameras = self.cameras
if cameras:
for cam_name, cam_config in cameras.items():
for attr in ["width", "height", "fps"]:
if getattr(cam_config, attr) is None:
raise ValueError(
f"Camera config for '{cam_name}' has None value for required attribute '{attr}'"
)
@property
def type(self) -> str:
return self.get_choice_name(self.__class__)

View File

@@ -33,7 +33,7 @@ class Stretch3RobotConfig(RobotConfig):
cameras: dict[str, CameraConfig] = field(
default_factory=lambda: {
"navigation": OpenCVCameraConfig(
camera_index="/dev/hello-nav-head-camera",
index_or_path="/dev/hello-nav-head-camera",
fps=10,
width=1280,
height=720,

View File

@@ -38,6 +38,7 @@ from lerobot.common.utils.robot_utils import busy_wait
from lerobot.common.utils.utils import get_safe_torch_device, has_method
# NOTE(Steven): Consider integrating this in camera class
def log_control_info(robot: Robot, dt_s, episode_index=None, frame_index=None, fps=None):
log_items = []
if episode_index is not None:

357
lerobot/find_cameras.py Normal file
View File

@@ -0,0 +1,357 @@
#!/usr/bin/env python
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import concurrent.futures
import logging
import shutil
import time
from pathlib import Path
from typing import Any, Dict, List
import numpy as np
from PIL import Image
from lerobot.common.cameras.configs import ColorMode
from lerobot.common.cameras.intel.camera_realsense import RealSenseCamera
from lerobot.common.cameras.intel.configuration_realsense import RealSenseCameraConfig
from lerobot.common.cameras.opencv.camera_opencv import OpenCVCamera
from lerobot.common.cameras.opencv.configuration_opencv import OpenCVCameraConfig
logger = logging.getLogger(__name__)
# logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(module)s - %(message)s")
def find_all_opencv_cameras() -> List[Dict[str, Any]]:
"""
Finds all available OpenCV cameras plugged into the system.
Returns:
A list of all available OpenCV cameras with their metadata.
"""
all_opencv_cameras_info: List[Dict[str, Any]] = []
logger.info("Searching for OpenCV cameras...")
try:
opencv_cameras = OpenCVCamera.find_cameras(raise_when_empty=False)
for cam_info in opencv_cameras:
all_opencv_cameras_info.append(cam_info)
logger.info(f"Found {len(opencv_cameras)} OpenCV cameras.")
except Exception as e:
logger.error(f"Error finding OpenCV cameras: {e}")
return all_opencv_cameras_info
def find_all_realsense_cameras() -> List[Dict[str, Any]]:
"""
Finds all available RealSense cameras plugged into the system.
Returns:
A list of all available RealSense cameras with their metadata.
"""
all_realsense_cameras_info: List[Dict[str, Any]] = []
logger.info("Searching for RealSense cameras...")
try:
realsense_cameras = RealSenseCamera.find_cameras(raise_when_empty=False)
for cam_info in realsense_cameras:
all_realsense_cameras_info.append(cam_info)
logger.info(f"Found {len(realsense_cameras)} RealSense cameras.")
except ImportError:
logger.warning("Skipping RealSense camera search: pyrealsense2 library not found or not importable.")
except Exception as e:
logger.error(f"Error finding RealSense cameras: {e}")
return all_realsense_cameras_info
def find_and_print_cameras(camera_type_filter: str | None = None) -> List[Dict[str, Any]]:
"""
Finds available cameras based on an optional filter and prints their information.
Args:
camera_type_filter: Optional string to filter cameras ("realsense" or "opencv").
If None, lists all cameras.
Returns:
A list of all available cameras matching the filter, with their metadata.
"""
all_cameras_info: List[Dict[str, Any]] = []
if camera_type_filter:
camera_type_filter = camera_type_filter.lower()
if camera_type_filter is None or camera_type_filter == "opencv":
all_cameras_info.extend(find_all_opencv_cameras())
if camera_type_filter is None or camera_type_filter == "realsense":
all_cameras_info.extend(find_all_realsense_cameras())
if not all_cameras_info:
if camera_type_filter:
logger.warning(f"No {camera_type_filter} cameras were detected.")
else:
logger.warning("No cameras (OpenCV or RealSense) were detected.")
else:
print("\n--- Detected Cameras ---")
for i, cam_info in enumerate(all_cameras_info):
print(f"Camera #{i}:")
for key, value in cam_info.items():
if key == "default_stream_profile" and isinstance(value, dict):
print(f" {key.replace('_', ' ').capitalize()}:")
for sub_key, sub_value in value.items():
print(f" {sub_key.capitalize()}: {sub_value}")
else:
print(f" {key.replace('_', ' ').capitalize()}: {value}")
print("-" * 20)
return all_cameras_info
def save_image(
img_array: np.ndarray,
camera_identifier: str | int,
images_dir: Path,
camera_type: str,
):
"""
Saves a single image to disk using Pillow. Handles color conversion if necessary.
"""
try:
img = Image.fromarray(img_array, mode="RGB")
safe_identifier = str(camera_identifier).replace("/", "_").replace("\\", "_")
filename_prefix = f"{camera_type.lower()}_{safe_identifier}"
filename = f"{filename_prefix}.png"
path = images_dir / filename
path.parent.mkdir(parents=True, exist_ok=True)
img.save(str(path))
logger.info(f"Saved image: {path}")
except Exception as e:
logger.error(f"Failed to save image for camera {camera_identifier} (type {camera_type}): {e}")
def initialize_output_directory(output_dir: str | Path) -> Path:
"""Initialize and clean the output directory."""
output_dir = Path(output_dir)
if output_dir.exists():
logger.info(f"Output directory {output_dir} exists. Removing previous content.")
shutil.rmtree(output_dir)
output_dir.mkdir(parents=True, exist_ok=True)
logger.info(f"Saving images to {output_dir}")
return output_dir
def create_camera_instance(cam_meta: Dict[str, Any]) -> Dict[str, Any] | None:
"""Create and connect to a camera instance based on metadata."""
cam_type = cam_meta.get("type")
cam_id = cam_meta.get("id")
instance = None
logger.info(f"Preparing {cam_type} ID {cam_id} with default profile")
try:
if cam_type == "OpenCV":
cv_config = OpenCVCameraConfig(
index_or_path=cam_id,
color_mode=ColorMode.RGB,
)
instance = OpenCVCamera(cv_config)
elif cam_type == "RealSense":
rs_config = RealSenseCameraConfig(
serial_number=str(cam_id),
color_mode=ColorMode.RGB,
)
instance = RealSenseCamera(rs_config)
else:
logger.warning(f"Unknown camera type: {cam_type} for ID {cam_id}. Skipping.")
return None
if instance:
logger.info(f"Connecting to {cam_type} camera: {cam_id}...")
instance.connect(do_warmup_read=False)
return {"instance": instance, "meta": cam_meta}
except Exception as e:
logger.error(f"Failed to connect or configure {cam_type} camera {cam_id}: {e}")
if instance and instance.is_connected:
instance.disconnect()
return None
def process_camera_image(
cam_dict: Dict[str, Any], output_dir: Path, current_time: float
) -> concurrent.futures.Future | None:
"""Capture and process an image from a single camera."""
cam = cam_dict["instance"]
meta = cam_dict["meta"]
cam_type_str = str(meta.get("type", "unknown"))
cam_id_str = str(meta.get("id", "unknown"))
try:
image_data = cam.read()
return save_image(
image_data,
cam_id_str,
output_dir,
cam_type_str,
)
except TimeoutError:
logger.warning(
f"Timeout reading from {cam_type_str} camera {cam_id_str} at time {current_time:.2f}s."
)
except Exception as e:
logger.error(f"Error reading from {cam_type_str} camera {cam_id_str}: {e}")
return None
def cleanup_cameras(cameras_to_use: List[Dict[str, Any]]):
"""Disconnect all cameras."""
logger.info(f"Disconnecting {len(cameras_to_use)} cameras...")
for cam_dict in cameras_to_use:
try:
if cam_dict["instance"] and cam_dict["instance"].is_connected:
cam_dict["instance"].disconnect()
except Exception as e:
logger.error(f"Error disconnecting camera {cam_dict['meta'].get('id')}: {e}")
def save_images_from_all_cameras(
output_dir: str | Path,
record_time_s: float = 2.0,
camera_type_filter: str | None = None,
):
"""
Connects to detected cameras (optionally filtered by type) and saves images from each.
Uses default stream profiles for width, height, and FPS.
Args:
output_dir: Directory to save images.
record_time_s: Duration in seconds to record images.
camera_type_filter: Optional string to filter cameras ("realsense" or "opencv").
If None, uses all detected cameras.
"""
output_dir = initialize_output_directory(output_dir)
all_camera_metadata = find_and_print_cameras(camera_type_filter=camera_type_filter)
if not all_camera_metadata:
logger.warning("No cameras detected matching the criteria. Cannot save images.")
return
cameras_to_use = []
for cam_meta in all_camera_metadata:
camera_instance = create_camera_instance(cam_meta)
if camera_instance:
cameras_to_use.append(camera_instance)
if not cameras_to_use:
logger.warning("No cameras could be connected. Aborting image save.")
return
logger.info(f"Starting image capture for {record_time_s} seconds from {len(cameras_to_use)} cameras.")
start_time = time.perf_counter()
with concurrent.futures.ThreadPoolExecutor(max_workers=len(cameras_to_use) * 2) as executor:
try:
while time.perf_counter() - start_time < record_time_s:
futures = []
current_capture_time = time.perf_counter()
for cam_dict in cameras_to_use:
future = process_camera_image(cam_dict, output_dir, current_capture_time)
if future:
futures.append(future)
if futures:
concurrent.futures.wait(futures)
except KeyboardInterrupt:
logger.info("Capture interrupted by user.")
finally:
print("\nFinalizing image saving...")
executor.shutdown(wait=True)
cleanup_cameras(cameras_to_use)
logger.info(f"Image capture finished. Images saved to {output_dir}")
# NOTE(Steven):
# * realsense identified as opencv -> consistent in linux, we can even capture images
# * opencv mac cams reporting different fps at init, not an issue as we don't enforce fps here
# * opencv not opening in linux if we specify a backend
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="Unified camera utility script for listing cameras and capturing images."
)
subparsers = parser.add_subparsers(dest="command", help="Available commands")
# List cameras command
list_parser = subparsers.add_parser(
"list-cameras", help="Shows connected cameras. Optionally filter by type (realsense or opencv)."
)
list_parser.add_argument(
"camera_type",
type=str,
nargs="?",
default=None,
choices=["realsense", "opencv"],
help="Specify camera type to list (e.g., 'realsense', 'opencv'). Lists all if omitted.",
)
list_parser.set_defaults(func=lambda args: find_and_print_cameras(args.camera_type))
# Capture images command
capture_parser = subparsers.add_parser(
"capture-images",
help="Saves images from detected cameras (optionally filtered by type) using their default stream profiles.",
)
capture_parser.add_argument(
"camera_type",
type=str,
nargs="?",
default=None,
choices=["realsense", "opencv"],
help="Specify camera type to capture from (e.g., 'realsense', 'opencv'). Captures from all if omitted.",
)
capture_parser.add_argument(
"--output-dir",
type=Path,
default="outputs/captured_images",
help="Directory to save images. Default: outputs/captured_images",
)
capture_parser.add_argument(
"--record-time-s",
type=float,
default=6.0,
help="Time duration to attempt capturing frames. Default: 6 seconds.",
)
capture_parser.set_defaults(
func=lambda args: save_images_from_all_cameras(
output_dir=args.output_dir,
record_time_s=args.record_time_s,
camera_type_filter=args.camera_type,
)
)
args = parser.parse_args()
if args.command is None:
default_output_dir = capture_parser.get_default("output_dir")
default_record_time_s = capture_parser.get_default("record_time_s")
save_images_from_all_cameras(
output_dir=default_output_dir,
record_time_s=default_record_time_s,
camera_type_filter=None,
)
else:
args.func(args)

View File

@@ -44,8 +44,6 @@ import rerun as rr
from lerobot.common.cameras import ( # noqa: F401
CameraConfig, # noqa: F401
intel,
opencv,
)
from lerobot.common.datasets.image_writer import safe_stop_image_writer
from lerobot.common.datasets.lerobot_dataset import LeRobotDataset

View File

@@ -38,7 +38,6 @@ import draccus
import numpy as np
import rerun as rr
from lerobot.common.cameras import intel, opencv # noqa: F401
from lerobot.common.robots import ( # noqa: F401
Robot,
RobotConfig,

View File

@@ -84,7 +84,10 @@ dora = [
]
dynamixel = ["dynamixel-sdk>=3.7.31"]
feetech = ["feetech-servo-sdk>=1.0.0"]
intelrealsense = ["pyrealsense2>=2.55.1.6486 ; sys_platform != 'darwin'"]
intelrealsense = [
"pyrealsense2>=2.55.1.6486 ; sys_platform != 'darwin'",
"pyrealsense2-macosx>=2.54 ; sys_platform == 'darwin'", #NOTE(Steven): Check previous version for sudo issue
]
pi0 = ["transformers>=4.48.0"]
pusht = ["gym-pusht>=0.1.5 ; python_version < '4.0'"]
stretch = [

View File

@@ -0,0 +1,3 @@
version https://git-lfs.github.com/spec/v1
oid sha256:f79d14daafb1c0cf2fec5d46ee8029a73fe357402fdd31a7cd4a4794d7319a7c
size 260367

View File

@@ -0,0 +1,3 @@
version https://git-lfs.github.com/spec/v1
oid sha256:b8840fb643afe903191248703b1f95a57faf5812ecd9978ac502ee939646fdb2
size 121115

View File

@@ -0,0 +1,3 @@
version https://git-lfs.github.com/spec/v1
oid sha256:7e11af87616b83c1cdb30330e951b91e86b51c64a1326e1ba5b4a3fbcdec1a11
size 55698

View File

@@ -0,0 +1,3 @@
version https://git-lfs.github.com/spec/v1
oid sha256:9dc9df05797dc0e7b92edc845caab2e4c37c3cfcabb4ee6339c67212b5baba3b
size 38023

View File

@@ -0,0 +1,3 @@
version https://git-lfs.github.com/spec/v1
oid sha256:a8d6e64d6cb0e02c94ae125630ee758055bd2e695772c0463a30d63ddc6c5e17
size 3520862

View File

@@ -1,101 +0,0 @@
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from functools import cache
import numpy as np
CAP_V4L2 = 200
CAP_DSHOW = 700
CAP_AVFOUNDATION = 1200
CAP_ANY = -1
CAP_PROP_FPS = 5
CAP_PROP_FRAME_WIDTH = 3
CAP_PROP_FRAME_HEIGHT = 4
COLOR_RGB2BGR = 4
COLOR_BGR2RGB = 4
ROTATE_90_COUNTERCLOCKWISE = 2
ROTATE_90_CLOCKWISE = 0
ROTATE_180 = 1
@cache
def _generate_image(width: int, height: int):
return np.random.randint(0, 256, size=(height, width, 3), dtype=np.uint8)
def cvtColor(color_image, color_conversion): # noqa: N802
if color_conversion in [COLOR_RGB2BGR, COLOR_BGR2RGB]:
return color_image[:, :, [2, 1, 0]]
else:
raise NotImplementedError(color_conversion)
def rotate(color_image, rotation):
if rotation is None:
return color_image
elif rotation == ROTATE_90_CLOCKWISE:
return np.rot90(color_image, k=1)
elif rotation == ROTATE_180:
return np.rot90(color_image, k=2)
elif rotation == ROTATE_90_COUNTERCLOCKWISE:
return np.rot90(color_image, k=3)
else:
raise NotImplementedError(rotation)
class VideoCapture:
def __init__(self, *args, **kwargs):
self._mock_dict = {
CAP_PROP_FPS: 30,
CAP_PROP_FRAME_WIDTH: 640,
CAP_PROP_FRAME_HEIGHT: 480,
}
self._is_opened = True
def isOpened(self): # noqa: N802
return self._is_opened
def set(self, propId: int, value: float) -> bool: # noqa: N803
if not self._is_opened:
raise RuntimeError("Camera is not opened")
self._mock_dict[propId] = value
return True
def get(self, propId: int) -> float: # noqa: N803
if not self._is_opened:
raise RuntimeError("Camera is not opened")
value = self._mock_dict[propId]
if value == 0:
if propId == CAP_PROP_FRAME_HEIGHT:
value = 480
elif propId == CAP_PROP_FRAME_WIDTH:
value = 640
return value
def read(self):
if not self._is_opened:
raise RuntimeError("Camera is not opened")
h = self.get(CAP_PROP_FRAME_HEIGHT)
w = self.get(CAP_PROP_FRAME_WIDTH)
ret = True
return ret, _generate_image(width=w, height=h)
def release(self):
self._is_opened = False
def __del__(self):
if self._is_opened:
self.release()

View File

@@ -1,148 +0,0 @@
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import enum
import numpy as np
class stream(enum.Enum): # noqa: N801
color = 0
depth = 1
class format(enum.Enum): # noqa: N801
rgb8 = 0
z16 = 1
class config: # noqa: N801
def enable_device(self, device_id: str):
self.device_enabled = device_id
def enable_stream(self, stream_type: stream, width=None, height=None, color_format=None, fps=None):
self.stream_type = stream_type
# Overwrite default values when possible
self.width = 848 if width is None else width
self.height = 480 if height is None else height
self.color_format = format.rgb8 if color_format is None else color_format
self.fps = 30 if fps is None else fps
class RSColorProfile:
def __init__(self, config):
self.config = config
def fps(self):
return self.config.fps
def width(self):
return self.config.width
def height(self):
return self.config.height
class RSColorStream:
def __init__(self, config):
self.config = config
def as_video_stream_profile(self):
return RSColorProfile(self.config)
class RSProfile:
def __init__(self, config):
self.config = config
def get_stream(self, color_format):
del color_format # unused
return RSColorStream(self.config)
class pipeline: # noqa: N801
def __init__(self):
self.started = False
self.config = None
def start(self, config):
self.started = True
self.config = config
return RSProfile(self.config)
def stop(self):
if not self.started:
raise RuntimeError("You need to start the camera before stop.")
self.started = False
self.config = None
def wait_for_frames(self, timeout_ms=50000):
del timeout_ms # unused
return RSFrames(self.config)
class RSFrames:
def __init__(self, config):
self.config = config
def get_color_frame(self):
return RSColorFrame(self.config)
def get_depth_frame(self):
return RSDepthFrame(self.config)
class RSColorFrame:
def __init__(self, config):
self.config = config
def get_data(self):
data = np.ones((self.config.height, self.config.width, 3), dtype=np.uint8)
# Create a difference between rgb and bgr
data[:, :, 0] = 2
return data
class RSDepthFrame:
def __init__(self, config):
self.config = config
def get_data(self):
return np.ones((self.config.height, self.config.width), dtype=np.uint16)
class RSDevice:
def __init__(self):
pass
def get_info(self, camera_info) -> str:
del camera_info # unused
# return fake serial number
return "123456789"
class context: # noqa: N801
def __init__(self):
pass
def query_devices(self):
return [RSDevice()]
class camera_info: # noqa: N801
# fake name
name = "Intel RealSense D435I"
def __init__(self, serial_number):
del serial_number
pass

View File

@@ -1,252 +0,0 @@
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Tests for physical cameras and their mocked versions.
If the physical camera is not connected to the computer, or not working,
the test will be skipped.
Example of running a specific test:
```bash
pytest -sx tests/test_cameras.py::test_camera
```
Example of running test on a real camera connected to the computer:
```bash
pytest -sx 'tests/test_cameras.py::test_camera[opencv-False]'
pytest -sx 'tests/test_cameras.py::test_camera[intelrealsense-False]'
```
Example of running test on a mocked version of the camera:
```bash
pytest -sx 'tests/test_cameras.py::test_camera[opencv-True]'
pytest -sx 'tests/test_cameras.py::test_camera[intelrealsense-True]'
```
"""
import numpy as np
import pytest
from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
from tests.utils import TEST_CAMERA_TYPES, make_camera, require_camera
# Maximum absolute difference between two consecutive images recorded by a camera.
# This value differs with respect to the camera.
MAX_PIXEL_DIFFERENCE = 25
def compute_max_pixel_difference(first_image, second_image):
return np.abs(first_image.astype(float) - second_image.astype(float)).max()
@pytest.mark.parametrize("camera_type, mock", TEST_CAMERA_TYPES)
@require_camera
def test_camera(request, camera_type, mock):
"""Test assumes that `camera.read()` returns the same image when called multiple times in a row.
So the environment should not change (you shouldnt be in front of the camera) and the camera should not be moving.
Warning: The tests worked for a macbookpro camera, but I am getting assertion error (`np.allclose(color_image, async_color_image)`)
for my iphone camera and my LG monitor camera.
"""
# TODO(rcadene): measure fps in nightly?
# TODO(rcadene): test logs
if camera_type == "opencv" and not mock:
pytest.skip("TODO(rcadene): fix test for opencv physical camera")
camera_kwargs = {"camera_type": camera_type, "mock": mock}
# Test instantiating
camera = make_camera(**camera_kwargs)
# Test reading, async reading, disconnecting before connecting raises an error
with pytest.raises(DeviceNotConnectedError):
camera.read()
with pytest.raises(DeviceNotConnectedError):
camera.async_read()
with pytest.raises(DeviceNotConnectedError):
camera.disconnect()
# Test deleting the object without connecting first
del camera
# Test connecting
camera = make_camera(**camera_kwargs)
camera.connect()
assert camera.is_connected
assert camera.fps is not None
assert camera.capture_width is not None
assert camera.capture_height is not None
# Test connecting twice raises an error
with pytest.raises(DeviceAlreadyConnectedError):
camera.connect()
# Test reading from the camera
color_image = camera.read()
assert isinstance(color_image, np.ndarray)
assert color_image.ndim == 3
h, w, c = color_image.shape
assert c == 3
assert w > h
# Test read and async_read outputs similar images
# ...warming up as the first frames can be black
for _ in range(30):
camera.read()
color_image = camera.read()
async_color_image = camera.async_read()
error_msg = (
"max_pixel_difference between read() and async_read()",
compute_max_pixel_difference(color_image, async_color_image),
)
# TODO(rcadene): properly set `rtol`
np.testing.assert_allclose(
color_image, async_color_image, rtol=1e-5, atol=MAX_PIXEL_DIFFERENCE, err_msg=error_msg
)
# Test disconnecting
camera.disconnect()
assert camera.camera is None
assert camera.thread is None
# Test disconnecting with `__del__`
camera = make_camera(**camera_kwargs)
camera.connect()
del camera
# Test acquiring a bgr image
camera = make_camera(**camera_kwargs, color_mode="bgr")
camera.connect()
assert camera.color_mode == "bgr"
bgr_color_image = camera.read()
np.testing.assert_allclose(
color_image, bgr_color_image[:, :, [2, 1, 0]], rtol=1e-5, atol=MAX_PIXEL_DIFFERENCE, err_msg=error_msg
)
del camera
# Test acquiring a rotated image
camera = make_camera(**camera_kwargs)
camera.connect()
ori_color_image = camera.read()
del camera
for rotation in [None, 90, 180, -90]:
camera = make_camera(**camera_kwargs, rotation=rotation)
camera.connect()
if mock:
import tests.cameras.mock_cv2 as cv2
else:
import cv2
if rotation is None:
manual_rot_img = ori_color_image
assert camera.rotation is None
elif rotation == 90:
manual_rot_img = np.rot90(color_image, k=1)
assert camera.rotation == cv2.ROTATE_90_CLOCKWISE
elif rotation == 180:
manual_rot_img = np.rot90(color_image, k=2)
assert camera.rotation == cv2.ROTATE_180
elif rotation == -90:
manual_rot_img = np.rot90(color_image, k=3)
assert camera.rotation == cv2.ROTATE_90_COUNTERCLOCKWISE
rot_color_image = camera.read()
np.testing.assert_allclose(
rot_color_image, manual_rot_img, rtol=1e-5, atol=MAX_PIXEL_DIFFERENCE, err_msg=error_msg
)
del camera
# TODO(rcadene): Add a test for a camera that doesnt support fps=60 and raises an OSError
# TODO(rcadene): Add a test for a camera that supports fps=60
# Test width and height can be set
camera = make_camera(**camera_kwargs, fps=30, width=1280, height=720)
camera.connect()
assert camera.fps == 30
assert camera.width == 1280
assert camera.height == 720
color_image = camera.read()
h, w, c = color_image.shape
assert h == 720
assert w == 1280
assert c == 3
del camera
# Test not supported width and height raise an error
camera = make_camera(**camera_kwargs, fps=30, width=0, height=0)
with pytest.raises(OSError):
camera.connect()
del camera
@pytest.mark.parametrize("camera_type, mock", TEST_CAMERA_TYPES)
@require_camera
def test_save_images_from_cameras(tmp_path, request, camera_type, mock):
# TODO(rcadene): refactor
if camera_type == "opencv":
from lerobot.common.cameras.opencv.camera_opencv import save_images_from_cameras
elif camera_type == "intelrealsense":
from lerobot.common.cameras.intel.camera_realsense import save_images_from_cameras
# Small `record_time_s` to speedup unit tests
save_images_from_cameras(tmp_path, record_time_s=0.02, mock=mock)
@pytest.mark.parametrize("camera_type, mock", TEST_CAMERA_TYPES)
@require_camera
def test_camera_rotation(request, camera_type, mock):
config_kwargs = {"camera_type": camera_type, "mock": mock, "width": 640, "height": 480, "fps": 30}
# No rotation.
camera = make_camera(**config_kwargs, rotation=None)
camera.connect()
assert camera.capture_width == 640
assert camera.capture_height == 480
assert camera.width == 640
assert camera.height == 480
no_rot_img = camera.read()
h, w, c = no_rot_img.shape
assert h == 480 and w == 640 and c == 3
camera.disconnect()
# Rotation = 90 (clockwise).
camera = make_camera(**config_kwargs, rotation=90)
camera.connect()
# With a 90° rotation, we expect the metadata dimensions to be swapped.
assert camera.capture_width == 640
assert camera.capture_height == 480
assert camera.width == 480
assert camera.height == 640
import cv2
assert camera.rotation == cv2.ROTATE_90_CLOCKWISE
rot_img = camera.read()
h, w, c = rot_img.shape
assert h == 640 and w == 480 and c == 3
camera.disconnect()
# Rotation = 180.
camera = make_camera(**config_kwargs, rotation=None)
camera.connect()
assert camera.capture_width == 640
assert camera.capture_height == 480
assert camera.width == 640
assert camera.height == 480
no_rot_img = camera.read()
h, w, c = no_rot_img.shape
assert h == 480 and w == 640 and c == 3
camera.disconnect()

View File

@@ -0,0 +1,190 @@
#!/usr/bin/env python
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Example of running a specific test:
# ```bash
# pytest tests/cameras/test_opencv.py::test_connect
# ```
import os
import numpy as np
import pytest
from lerobot.common.cameras.configs import Cv2Rotation
from lerobot.common.cameras.opencv import OpenCVCamera, OpenCVCameraConfig
from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
# NOTE(Steven): more tests + assertions?
TEST_ARTIFACTS_DIR = os.path.join(os.path.dirname(os.path.dirname(__file__)), "artifacts", "cameras")
DEFAULT_PNG_FILE_PATH = os.path.join(TEST_ARTIFACTS_DIR, "fakecam_sd_160x120.png")
TEST_IMAGE_PATHS = [
os.path.join(TEST_ARTIFACTS_DIR, "fakecam_sd_160x120.png"),
os.path.join(TEST_ARTIFACTS_DIR, "fakecam_hd_320x180.png"),
os.path.join(TEST_ARTIFACTS_DIR, "fakecam_fullhd_480x270.png"),
os.path.join(TEST_ARTIFACTS_DIR, "fakecam_square_128x128.png"),
]
def test_base_class_implementation():
config = OpenCVCameraConfig(index_or_path=0)
_ = OpenCVCamera(config)
def test_connect():
config = OpenCVCameraConfig(index_or_path=DEFAULT_PNG_FILE_PATH)
camera = OpenCVCamera(config)
camera.connect(do_warmup_read=False)
assert camera.is_connected
def test_connect_already_connected():
config = OpenCVCameraConfig(index_or_path=DEFAULT_PNG_FILE_PATH)
camera = OpenCVCamera(config)
camera.connect(do_warmup_read=False)
with pytest.raises(DeviceAlreadyConnectedError):
camera.connect(do_warmup_read=False)
def test_connect_invalid_camera_path():
config = OpenCVCameraConfig(index_or_path="nonexistent/camera.png")
camera = OpenCVCamera(config)
with pytest.raises(ConnectionError):
camera.connect(do_warmup_read=False)
def test_invalid_width_connect():
config = OpenCVCameraConfig(
index_or_path=DEFAULT_PNG_FILE_PATH,
width=99999, # Invalid width to trigger error
height=480,
)
camera = OpenCVCamera(config)
with pytest.raises(RuntimeError):
camera.connect(do_warmup_read=False)
@pytest.mark.parametrize("index_or_path", TEST_IMAGE_PATHS)
def test_read(index_or_path):
config = OpenCVCameraConfig(index_or_path=index_or_path)
camera = OpenCVCamera(config)
camera.connect(do_warmup_read=False)
img = camera.read()
assert isinstance(img, np.ndarray)
def test_read_before_connect():
config = OpenCVCameraConfig(index_or_path=DEFAULT_PNG_FILE_PATH)
camera = OpenCVCamera(config)
with pytest.raises(DeviceNotConnectedError):
_ = camera.read()
def test_disconnect():
config = OpenCVCameraConfig(index_or_path=DEFAULT_PNG_FILE_PATH)
camera = OpenCVCamera(config)
camera.connect(do_warmup_read=False)
camera.disconnect()
assert not camera.is_connected
def test_disconnect_before_connect():
config = OpenCVCameraConfig(index_or_path=DEFAULT_PNG_FILE_PATH)
camera = OpenCVCamera(config)
with pytest.raises(DeviceNotConnectedError):
_ = camera.disconnect()
@pytest.mark.parametrize("index_or_path", TEST_IMAGE_PATHS)
def test_async_read(index_or_path):
config = OpenCVCameraConfig(index_or_path=index_or_path)
camera = OpenCVCamera(config)
camera.connect(do_warmup_read=False)
try:
img = camera.async_read()
assert camera.thread is not None
assert camera.thread.is_alive()
assert isinstance(img, np.ndarray)
finally:
if camera.is_connected:
camera.disconnect() # To stop/join the thread. Otherwise get warnings when the test ends
def test_async_read_timeout():
config = OpenCVCameraConfig(index_or_path=DEFAULT_PNG_FILE_PATH)
camera = OpenCVCamera(config)
camera.connect(do_warmup_read=False)
try:
with pytest.raises(TimeoutError):
camera.async_read(timeout_ms=0)
finally:
if camera.is_connected:
camera.disconnect()
def test_async_read_before_connect():
config = OpenCVCameraConfig(index_or_path=DEFAULT_PNG_FILE_PATH)
camera = OpenCVCamera(config)
with pytest.raises(DeviceNotConnectedError):
_ = camera.async_read()
@pytest.mark.parametrize("index_or_path", TEST_IMAGE_PATHS)
@pytest.mark.parametrize(
"rotation",
[
Cv2Rotation.NO_ROTATION,
Cv2Rotation.ROTATE_90,
Cv2Rotation.ROTATE_180,
Cv2Rotation.ROTATE_270,
],
)
def test_all_rotations(rotation, index_or_path):
filename = os.path.basename(index_or_path)
dimensions = filename.split("_")[-1].split(".")[0] # Assumes filenames format (_wxh.png)
original_width, original_height = map(int, dimensions.split("x"))
config = OpenCVCameraConfig(index_or_path=index_or_path, rotation=rotation)
camera = OpenCVCamera(config)
camera.connect(do_warmup_read=False)
img = camera.read()
assert isinstance(img, np.ndarray)
if rotation in (Cv2Rotation.ROTATE_90, Cv2Rotation.ROTATE_270):
assert camera.width == original_height
assert camera.height == original_width
assert img.shape[:2] == (original_width, original_height)
else:
assert camera.width == original_width
assert camera.height == original_height
assert img.shape[:2] == (original_height, original_width)

View File

@@ -0,0 +1,199 @@
#!/usr/bin/env python
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Example of running a specific test:
# ```bash
# pytest tests/cameras/test_opencv.py::test_connect
# ```
import os
from unittest.mock import patch
import numpy as np
import pytest
from lerobot.common.cameras.configs import Cv2Rotation
from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
try:
from lerobot.common.cameras.intel import RealSenseCamera, RealSenseCameraConfig
except (ImportError, ModuleNotFoundError):
pytest.skip("pyrealsense2 not available", allow_module_level=True)
TEST_ARTIFACTS_DIR = os.path.join(os.path.dirname(os.path.dirname(__file__)), "artifacts", "cameras")
BAG_FILE_PATH = os.path.join(TEST_ARTIFACTS_DIR, "test_rs.bag")
# NOTE(Steven): Missing tests for depth
# NOTE(Steven): Takes 20sec, the patch being the biggest bottleneck
# NOTE(Steven): more tests + assertions?
if not os.path.exists(BAG_FILE_PATH):
print(f"Warning: Bag file not found at {BAG_FILE_PATH}. Some tests might fail or be skipped.")
def mock_rs_config_enable_device_from_file(rs_config_instance, sn):
if not os.path.exists(BAG_FILE_PATH):
raise FileNotFoundError(f"Test bag file not found: {BAG_FILE_PATH}")
return rs_config_instance.enable_device_from_file(BAG_FILE_PATH, repeat_playback=True)
def mock_rs_config_enable_device_bad_file(rs_config_instance, sn):
return rs_config_instance.enable_device_from_file("non_existent_file.bag", repeat_playback=True)
def test_base_class_implementation():
config = RealSenseCameraConfig(serial_number=42)
_ = RealSenseCamera(config)
@patch("pyrealsense2.config.enable_device", side_effect=mock_rs_config_enable_device_from_file)
def test_connect(mock_enable_device):
config = RealSenseCameraConfig(serial_number=42)
camera = RealSenseCamera(config)
camera.connect(do_warmup_read=False)
assert camera.is_connected
@patch("pyrealsense2.config.enable_device", side_effect=mock_rs_config_enable_device_from_file)
def test_connect_already_connected(mock_enable_device):
config = RealSenseCameraConfig(serial_number=42)
camera = RealSenseCamera(config)
camera.connect(do_warmup_read=False)
with pytest.raises(DeviceAlreadyConnectedError):
camera.connect(do_warmup_read=False)
@patch("pyrealsense2.config.enable_device", side_effect=mock_rs_config_enable_device_bad_file)
def test_connect_invalid_camera_path(mock_enable_device):
config = RealSenseCameraConfig(serial_number=42)
camera = RealSenseCamera(config)
with pytest.raises(ConnectionError):
camera.connect(do_warmup_read=False)
@patch("pyrealsense2.config.enable_device", side_effect=mock_rs_config_enable_device_from_file)
def test_invalid_width_connect(mock_enable_device):
config = RealSenseCameraConfig(serial_number=42, width=99999, height=480, fps=30)
camera = RealSenseCamera(config)
with pytest.raises(ConnectionError):
camera.connect(do_warmup_read=False)
@patch("pyrealsense2.config.enable_device", side_effect=mock_rs_config_enable_device_from_file)
def test_read(mock_enable_device):
config = RealSenseCameraConfig(serial_number=42, width=640, height=480, fps=30)
camera = RealSenseCamera(config)
camera.connect(do_warmup_read=False)
img = camera.read()
assert isinstance(img, np.ndarray)
def test_read_before_connect():
config = RealSenseCameraConfig(serial_number=42)
camera = RealSenseCamera(config)
with pytest.raises(DeviceNotConnectedError):
_ = camera.read()
@patch("pyrealsense2.config.enable_device", side_effect=mock_rs_config_enable_device_from_file)
def test_disconnect(mock_enable_device):
config = RealSenseCameraConfig(serial_number=42)
camera = RealSenseCamera(config)
camera.connect(do_warmup_read=False)
camera.disconnect()
assert not camera.is_connected
def test_disconnect_before_connect():
config = RealSenseCameraConfig(serial_number=42)
camera = RealSenseCamera(config)
with pytest.raises(DeviceNotConnectedError):
camera.disconnect()
@patch("pyrealsense2.config.enable_device", side_effect=mock_rs_config_enable_device_from_file)
def test_async_read(mock_enable_device):
config = RealSenseCameraConfig(serial_number=42, width=640, height=480, fps=30)
camera = RealSenseCamera(config)
camera.connect(do_warmup_read=False)
try:
img = camera.async_read()
assert camera.thread is not None
assert camera.thread.is_alive()
assert isinstance(img, np.ndarray)
finally:
if camera.is_connected:
camera.disconnect() # To stop/join the thread. Otherwise get warnings when the test ends
@patch("pyrealsense2.config.enable_device", side_effect=mock_rs_config_enable_device_from_file)
def test_async_read_timeout(mock_enable_device):
config = RealSenseCameraConfig(serial_number=42, width=640, height=480, fps=30)
camera = RealSenseCamera(config)
camera.connect(do_warmup_read=False)
try:
with pytest.raises(TimeoutError):
camera.async_read(timeout_ms=0)
finally:
if camera.is_connected:
camera.disconnect()
def test_async_read_before_connect():
config = RealSenseCameraConfig(serial_number=42)
camera = RealSenseCamera(config)
with pytest.raises(DeviceNotConnectedError):
_ = camera.async_read()
@pytest.mark.parametrize(
"rotation",
[
Cv2Rotation.NO_ROTATION,
Cv2Rotation.ROTATE_90,
Cv2Rotation.ROTATE_180,
Cv2Rotation.ROTATE_270,
],
)
@patch("pyrealsense2.config.enable_device", side_effect=mock_rs_config_enable_device_from_file)
def test_all_rotations(mock_enable_device, rotation):
config = RealSenseCameraConfig(serial_number=42, rotation=rotation)
camera = RealSenseCamera(config)
camera.connect(do_warmup_read=False)
img = camera.read()
assert isinstance(img, np.ndarray)
if rotation in (Cv2Rotation.ROTATE_90, Cv2Rotation.ROTATE_270):
assert camera.width == 480
assert camera.height == 640
assert img.shape[:2] == (640, 480)
else:
assert camera.width == 640
assert camera.height == 480
assert img.shape[:2] == (480, 640)