|
|
|
|
@@ -13,35 +13,27 @@
|
|
|
|
|
# limitations under the License.
|
|
|
|
|
|
|
|
|
|
"""
|
|
|
|
|
This file contains utilities for recording frames from cameras. For more info look at `OpenCVCamera` docstring.
|
|
|
|
|
Provides the OpenCVCamera class for capturing frames from cameras using OpenCV.
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
import argparse
|
|
|
|
|
import concurrent.futures
|
|
|
|
|
import contextlib
|
|
|
|
|
import logging
|
|
|
|
|
import math
|
|
|
|
|
import platform
|
|
|
|
|
import shutil
|
|
|
|
|
import queue
|
|
|
|
|
import time
|
|
|
|
|
from pathlib import Path
|
|
|
|
|
from threading import Event, Thread
|
|
|
|
|
from typing import TypeAlias
|
|
|
|
|
|
|
|
|
|
import cv2
|
|
|
|
|
import numpy as np
|
|
|
|
|
from PIL import Image
|
|
|
|
|
|
|
|
|
|
from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
|
|
|
|
|
from lerobot.common.utils.robot_utils import (
|
|
|
|
|
busy_wait,
|
|
|
|
|
)
|
|
|
|
|
from lerobot.common.utils.utils import capture_timestamp_utc
|
|
|
|
|
|
|
|
|
|
from ..camera import Camera
|
|
|
|
|
from ..utils import get_cv2_backend, get_cv2_rotation
|
|
|
|
|
from .configuration_opencv import OpenCVCameraConfig
|
|
|
|
|
|
|
|
|
|
IndexOrPath: TypeAlias = int | Path
|
|
|
|
|
from ..utils import IndexOrPath, get_cv2_backend, get_cv2_rotation
|
|
|
|
|
from .configuration_opencv import ColorMode, OpenCVCameraConfig
|
|
|
|
|
|
|
|
|
|
# The maximum opencv device index depends on your operating system. For instance,
|
|
|
|
|
# if you have 3 cameras, they should be associated to index 0, 1, and 2. This is the case
|
|
|
|
|
@@ -53,376 +45,505 @@ MAX_OPENCV_INDEX = 60
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def is_valid_unix_path(path: str) -> bool:
|
|
|
|
|
"""Note: if 'path' points to a symlink, this will return True only if the target exists"""
|
|
|
|
|
p = Path(path)
|
|
|
|
|
return p.is_absolute() and p.exists()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_camera_index_from_unix_port(port: Path) -> int:
|
|
|
|
|
return int(str(port.resolve()).removeprefix("/dev/video"))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def save_image(img_array: np.ndarray, camera_index: int, frame_index: int, images_dir: Path):
|
|
|
|
|
img = Image.fromarray(img_array)
|
|
|
|
|
path = images_dir / f"camera_{camera_index:02d}_frame_{frame_index:06d}.png"
|
|
|
|
|
path.parent.mkdir(parents=True, exist_ok=True)
|
|
|
|
|
img.save(str(path), quality=100)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def save_images_from_cameras(
|
|
|
|
|
images_dir: Path,
|
|
|
|
|
camera_idx_or_paths: list[IndexOrPath] | None = None,
|
|
|
|
|
fps: int | None = None,
|
|
|
|
|
width: int | None = None,
|
|
|
|
|
height: int | None = None,
|
|
|
|
|
record_time_s: int = 2,
|
|
|
|
|
):
|
|
|
|
|
"""
|
|
|
|
|
Initializes all the cameras and saves images to the directory. Useful to visually identify the camera
|
|
|
|
|
associated to a given camera index.
|
|
|
|
|
"""
|
|
|
|
|
if not camera_idx_or_paths:
|
|
|
|
|
camera_idx_or_paths = OpenCVCamera.find_cameras()
|
|
|
|
|
if len(camera_idx_or_paths) == 0:
|
|
|
|
|
raise RuntimeError(
|
|
|
|
|
"Not a single camera was detected. Try re-plugging, or re-installing `opencv-python`, "
|
|
|
|
|
"or your camera driver, or make sure your camera is compatible with opencv."
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
print("Connecting cameras")
|
|
|
|
|
cameras = []
|
|
|
|
|
for idx_or_path in camera_idx_or_paths:
|
|
|
|
|
config = OpenCVCameraConfig(index_or_path=idx_or_path, fps=fps, width=width, height=height)
|
|
|
|
|
camera = OpenCVCamera(config)
|
|
|
|
|
camera.connect()
|
|
|
|
|
print(
|
|
|
|
|
f"OpenCVCamera({camera.index_or_path}, fps={camera.fps}, width={camera.capture_width}, "
|
|
|
|
|
f"height={camera.capture_height}, color_mode={camera.color_mode})"
|
|
|
|
|
)
|
|
|
|
|
cameras.append(camera)
|
|
|
|
|
|
|
|
|
|
images_dir = Path(images_dir)
|
|
|
|
|
if images_dir.exists():
|
|
|
|
|
shutil.rmtree(
|
|
|
|
|
images_dir,
|
|
|
|
|
)
|
|
|
|
|
images_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
|
|
|
|
|
|
print(f"Saving images to {images_dir}")
|
|
|
|
|
frame_index = 0
|
|
|
|
|
start_time = time.perf_counter()
|
|
|
|
|
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
|
|
|
|
|
while True:
|
|
|
|
|
now = time.perf_counter()
|
|
|
|
|
|
|
|
|
|
for camera in cameras:
|
|
|
|
|
# If we use async_read when fps is None, the loop will go full speed, and we will endup
|
|
|
|
|
# saving the same images from the cameras multiple times until the RAM/disk is full.
|
|
|
|
|
image = camera.read() if fps is None else camera.async_read()
|
|
|
|
|
|
|
|
|
|
executor.submit(
|
|
|
|
|
save_image,
|
|
|
|
|
image,
|
|
|
|
|
camera.camera_index,
|
|
|
|
|
frame_index,
|
|
|
|
|
images_dir,
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
if fps is not None:
|
|
|
|
|
dt_s = time.perf_counter() - now
|
|
|
|
|
busy_wait(1 / fps - dt_s)
|
|
|
|
|
|
|
|
|
|
print(f"Frame: {frame_index:04d}\tLatency (ms): {(time.perf_counter() - now) * 1000:.2f}")
|
|
|
|
|
|
|
|
|
|
if time.perf_counter() - start_time > record_time_s:
|
|
|
|
|
break
|
|
|
|
|
|
|
|
|
|
frame_index += 1
|
|
|
|
|
|
|
|
|
|
print(f"Images have been saved to {images_dir}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class OpenCVCamera(Camera):
|
|
|
|
|
"""
|
|
|
|
|
The OpenCVCamera class allows to efficiently record images from cameras. It relies on opencv2 to communicate
|
|
|
|
|
with the cameras. Most cameras are compatible. For more info, see the [Video I/O with OpenCV Overview](https://docs.opencv.org/4.x/d0/da7/videoio_overview.html).
|
|
|
|
|
Manages camera interactions using OpenCV for efficient frame recording.
|
|
|
|
|
|
|
|
|
|
An OpenCVCamera instance requires a camera index (e.g. `OpenCVCamera(camera_index=0)`). When you only have one camera
|
|
|
|
|
like a webcam of a laptop, the camera index is expected to be 0, but it might also be very different, and the camera index
|
|
|
|
|
might change if you reboot your computer or re-plug your camera. This behavior depends on your operation system.
|
|
|
|
|
This class provides a high-level interface to connect to, configure, and read
|
|
|
|
|
frames from cameras compatible with OpenCV's VideoCapture. It supports both
|
|
|
|
|
synchronous and asynchronous frame reading.
|
|
|
|
|
|
|
|
|
|
To find the camera indices of your cameras, you can run our utility script that will be save a few frames for each camera:
|
|
|
|
|
An OpenCVCamera instance requires a camera index (e.g., 0) or a device path
|
|
|
|
|
(e.g., '/dev/video0' on Linux). Camera indices can be unstable across reboots
|
|
|
|
|
or port changes, especially on Linux. Use the provided utility script to find
|
|
|
|
|
available camera indices or paths:
|
|
|
|
|
```bash
|
|
|
|
|
python lerobot/common/robot_devices/cameras/opencv.py --images-dir outputs/images_from_opencv_cameras
|
|
|
|
|
NOTE(Steven): Point to future util
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
When an OpenCVCamera is instantiated, if no specific config is provided, the default fps, width, height and color_mode
|
|
|
|
|
of the given camera will be used.
|
|
|
|
|
The camera's default settings (FPS, resolution, color mode) are used unless
|
|
|
|
|
overridden in the configuration.
|
|
|
|
|
|
|
|
|
|
Example of usage:
|
|
|
|
|
```python
|
|
|
|
|
from lerobot.common.robot_devices.cameras.configs import OpenCVCameraConfig
|
|
|
|
|
Args:
|
|
|
|
|
config (OpenCVCameraConfig): Configuration object containing settings like
|
|
|
|
|
camera index/path, desired FPS, width, height, color mode, and rotation.
|
|
|
|
|
|
|
|
|
|
config = OpenCVCameraConfig(index_or_path=0)
|
|
|
|
|
camera = OpenCVCamera(config)
|
|
|
|
|
camera.connect()
|
|
|
|
|
color_image = camera.read()
|
|
|
|
|
# when done using the camera, consider disconnecting
|
|
|
|
|
camera.disconnect()
|
|
|
|
|
```
|
|
|
|
|
Example:
|
|
|
|
|
```python
|
|
|
|
|
from lerobot.common.cameras.opencv import OpenCVCamera
|
|
|
|
|
from lerobot.common.cameras.configuration_opencv import OpenCVCameraConfig, ColorMode
|
|
|
|
|
|
|
|
|
|
Example of changing default fps, width, height and color_mode:
|
|
|
|
|
```python
|
|
|
|
|
config = OpenCVCameraConfig(index_or_path=0, fps=30, width=1280, height=720)
|
|
|
|
|
config = OpenCVCameraConfig(index_or_path=0, fps=90, width=640, height=480)
|
|
|
|
|
config = OpenCVCameraConfig(index_or_path=0, fps=90, width=640, height=480, color_mode="bgr")
|
|
|
|
|
# Note: might error out open `camera.connect()` if these settings are not compatible with the camera
|
|
|
|
|
```
|
|
|
|
|
# Basic usage with camera index 0
|
|
|
|
|
config = OpenCVCameraConfig(index_or_path=0)
|
|
|
|
|
camera = OpenCVCamera(config)
|
|
|
|
|
try:
|
|
|
|
|
camera.connect()
|
|
|
|
|
print(f"Connected to {camera}")
|
|
|
|
|
color_image = camera.read() # Synchronous read
|
|
|
|
|
print(f"Read frame shape: {color_image.shape}")
|
|
|
|
|
async_image = camera.async_read() # Asynchronous read
|
|
|
|
|
print(f"Async read frame shape: {async_image.shape}")
|
|
|
|
|
except Exception as e:
|
|
|
|
|
print(f"An error occurred: {e}")
|
|
|
|
|
finally:
|
|
|
|
|
camera.disconnect()
|
|
|
|
|
print(f"Disconnected from {camera}")
|
|
|
|
|
|
|
|
|
|
# Example with custom settings
|
|
|
|
|
custom_config = OpenCVCameraConfig(
|
|
|
|
|
index_or_path='/dev/video0', # Or use an index
|
|
|
|
|
fps=30,
|
|
|
|
|
width=1280,
|
|
|
|
|
height=720,
|
|
|
|
|
color_mode=ColorMode.RGB,
|
|
|
|
|
rotation=90
|
|
|
|
|
)
|
|
|
|
|
custom_camera = OpenCVCamera(custom_config)
|
|
|
|
|
# ... connect, read, disconnect ...
|
|
|
|
|
```
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
def __init__(self, config: OpenCVCameraConfig):
|
|
|
|
|
"""
|
|
|
|
|
Initializes the OpenCVCamera instance.
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
config: The configuration settings for the camera.
|
|
|
|
|
"""
|
|
|
|
|
self.config = config
|
|
|
|
|
self.index_or_path = config.index_or_path
|
|
|
|
|
self.index_or_path: IndexOrPath = config.index_or_path
|
|
|
|
|
|
|
|
|
|
# Store the raw (capture) resolution from the config.
|
|
|
|
|
self.capture_width = config.width
|
|
|
|
|
self.capture_height = config.height
|
|
|
|
|
self.capture_width: int | None = config.width
|
|
|
|
|
self.capture_height: int | None = config.height
|
|
|
|
|
|
|
|
|
|
# If rotated by ±90, swap width and height.
|
|
|
|
|
if config.rotation in [-90, 90]:
|
|
|
|
|
self.width = config.height
|
|
|
|
|
self.height = config.width
|
|
|
|
|
self.width: int | None = config.height
|
|
|
|
|
self.height: int | None = config.width
|
|
|
|
|
else:
|
|
|
|
|
self.width = config.width
|
|
|
|
|
self.height = config.height
|
|
|
|
|
self.width: int | None = config.width
|
|
|
|
|
self.height: int | None = config.height
|
|
|
|
|
|
|
|
|
|
self.fps = config.fps
|
|
|
|
|
self.channels = config.channels
|
|
|
|
|
self.color_mode = config.color_mode
|
|
|
|
|
self.fps: int | None = config.fps
|
|
|
|
|
self.channels: int = config.channels
|
|
|
|
|
self.color_mode: ColorMode = config.color_mode
|
|
|
|
|
|
|
|
|
|
self.camera: cv2.VideoCapture | None = None
|
|
|
|
|
self.videocapture_camera: cv2.VideoCapture | None = None
|
|
|
|
|
self.thread: Thread | None = None
|
|
|
|
|
self.stop_event: Event | None = None
|
|
|
|
|
self.color_image = None
|
|
|
|
|
self.logs = {}
|
|
|
|
|
self.frame_queue: queue.Queue = queue.Queue(maxsize=1)
|
|
|
|
|
|
|
|
|
|
self.rotation = get_cv2_rotation(config.rotation)
|
|
|
|
|
self.backend = get_cv2_backend()
|
|
|
|
|
self.logs: dict = {} # NOTE(Steven): Might be removed in the future
|
|
|
|
|
|
|
|
|
|
self.rotation: int | None = get_cv2_rotation(config.rotation)
|
|
|
|
|
self.backend: int = get_cv2_backend()
|
|
|
|
|
|
|
|
|
|
def __str__(self) -> str:
|
|
|
|
|
"""Returns a string representation of the camera instance."""
|
|
|
|
|
return f"{self.__class__.__name__}({self.index_or_path})"
|
|
|
|
|
|
|
|
|
|
@property
|
|
|
|
|
def is_connected(self) -> bool:
|
|
|
|
|
return self.camera.isOpened() if isinstance(self.camera, cv2.VideoCapture) else False
|
|
|
|
|
"""Checks if the camera is currently connected and opened."""
|
|
|
|
|
return isinstance(self.videocapture_camera, cv2.VideoCapture) and self.videocapture_camera.isOpened()
|
|
|
|
|
|
|
|
|
|
def _scan_available_cameras_and_raise(self, index_or_path: IndexOrPath) -> None:
|
|
|
|
|
"""
|
|
|
|
|
Scans for available cameras and raises an error if the specified
|
|
|
|
|
camera cannot be found or accessed.
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
index_or_path: The camera identifier that failed to connect.
|
|
|
|
|
|
|
|
|
|
Raises:
|
|
|
|
|
ValueError: If the specified `index_or_path` is not found among the
|
|
|
|
|
list of detected, usable cameras.
|
|
|
|
|
ConnectionError: If the camera connection fails (this specific method
|
|
|
|
|
is called within the connection failure context).
|
|
|
|
|
"""
|
|
|
|
|
cameras_info = self.find_cameras()
|
|
|
|
|
available_cam_ids = [cam["index"] for cam in cameras_info]
|
|
|
|
|
if index_or_path not in available_cam_ids:
|
|
|
|
|
raise ValueError(
|
|
|
|
|
f"Camera '{index_or_path}' not found or not accessible among available cameras: {available_cam_ids}. "
|
|
|
|
|
"Ensure the camera is properly connected and drivers are installed. "
|
|
|
|
|
"Run 'NOTE(Steven): Add script path' to list detected cameras."
|
|
|
|
|
)
|
|
|
|
|
raise ConnectionError(
|
|
|
|
|
f"Failed to connect to camera {self}. Even though it might be listed, it could not be opened."
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
# NOTE(Steven): Moving it to a different function for now. To be evaluated later if it is worth it and if it makes sense to have it as an abstract method
|
|
|
|
|
def _configure_capture_settings(self, fps: int | None, width: int | None, height: int | None) -> None:
|
|
|
|
|
"""
|
|
|
|
|
Applies the specified FPS, width, and height settings to the connected camera.
|
|
|
|
|
|
|
|
|
|
This method attempts to set the camera properties via OpenCV. It checks if
|
|
|
|
|
the camera successfully applied the settings and raises an error if not.
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
fps: The desired frames per second. If None, the setting is skipped.
|
|
|
|
|
width: The desired capture width. If None, the setting is skipped.
|
|
|
|
|
height: The desired capture height. If None, the setting is skipped.
|
|
|
|
|
|
|
|
|
|
Raises:
|
|
|
|
|
RuntimeError: If the camera fails to set any of the specified properties
|
|
|
|
|
to the requested value.
|
|
|
|
|
DeviceNotConnectedError: If the camera is not connected when attempting
|
|
|
|
|
to configure settings.
|
|
|
|
|
"""
|
|
|
|
|
if not self.is_connected:
|
|
|
|
|
raise DeviceNotConnectedError(f"Cannot configure settings for {self} as it is not connected.")
|
|
|
|
|
|
|
|
|
|
if fps is not None:
|
|
|
|
|
self._set_fps(fps)
|
|
|
|
|
if width is not None:
|
|
|
|
|
self._set_capture_width(width)
|
|
|
|
|
if height is not None:
|
|
|
|
|
self._set_capture_height(height)
|
|
|
|
|
|
|
|
|
|
def connect(self):
|
|
|
|
|
"""
|
|
|
|
|
Connects to the camera device specified in the configuration.
|
|
|
|
|
|
|
|
|
|
Initializes the OpenCV VideoCapture object, sets desired camera properties
|
|
|
|
|
(FPS, width, height), and performs initial checks.
|
|
|
|
|
|
|
|
|
|
Raises:
|
|
|
|
|
DeviceAlreadyConnectedError: If the camera is already connected.
|
|
|
|
|
ValueError: If the specified camera index/path is not found or accessible.
|
|
|
|
|
ConnectionError: If the camera is found but fails to open.
|
|
|
|
|
RuntimeError: If the camera opens but fails to apply requested FPS/resolution settings.
|
|
|
|
|
"""
|
|
|
|
|
if self.is_connected:
|
|
|
|
|
raise DeviceAlreadyConnectedError(f"{self} is already connected.")
|
|
|
|
|
|
|
|
|
|
# Use 1 thread to avoid blocking the main thread. Especially useful during data collection
|
|
|
|
|
# when other threads are used to save the images.
|
|
|
|
|
# Use 1 thread for OpenCV operations to avoid potential conflicts or
|
|
|
|
|
# blocking in multi-threaded applications, especially during data collection.
|
|
|
|
|
cv2.setNumThreads(1)
|
|
|
|
|
|
|
|
|
|
self.camera = cv2.VideoCapture(self.index_or_path, self.backend)
|
|
|
|
|
logger.debug(f"Attempting to connect to camera {self.index_or_path} using backend {self.backend}...")
|
|
|
|
|
self.videocapture_camera = cv2.VideoCapture(self.index_or_path, self.backend)
|
|
|
|
|
|
|
|
|
|
# If the camera doesn't work, display the camera indices corresponding to valid cameras.
|
|
|
|
|
if not self.camera.isOpened():
|
|
|
|
|
# Release camera to make it accessible for `find_camera_indices`
|
|
|
|
|
self.camera.release()
|
|
|
|
|
# Verify that the provided `camera_index` is valid before printing the traceback
|
|
|
|
|
cameras_info = self.find_cameras()
|
|
|
|
|
available_cam_ids = [cam["index"] for cam in cameras_info]
|
|
|
|
|
if self.index_or_path not in available_cam_ids:
|
|
|
|
|
raise ValueError(
|
|
|
|
|
f"`camera_index` is expected to be one of these available cameras {available_cam_ids}, "
|
|
|
|
|
f"but {self.index_or_path} is provided instead. To find the camera index you should use, "
|
|
|
|
|
"run `python lerobot/common/robot_devices/cameras/opencv.py`."
|
|
|
|
|
)
|
|
|
|
|
if not self.videocapture_camera.isOpened():
|
|
|
|
|
logger.error(f"Failed to open camera {self.index_or_path}.")
|
|
|
|
|
self.videocapture_camera.release()
|
|
|
|
|
self.videocapture_camera = None
|
|
|
|
|
self._scan_available_cameras_and_raise(self.index_or_path) # This raises an exception everytime
|
|
|
|
|
|
|
|
|
|
raise ConnectionError(f"Can't access {self}.")
|
|
|
|
|
|
|
|
|
|
if self.fps is not None:
|
|
|
|
|
self._set_fps(self.fps)
|
|
|
|
|
if self.capture_width is not None:
|
|
|
|
|
self._set_capture_width(self.capture_width)
|
|
|
|
|
if self.capture_height is not None:
|
|
|
|
|
self._set_capture_height(self.capture_height)
|
|
|
|
|
logger.debug(f"Successfully opened camera {self.index_or_path}. Applying configuration...")
|
|
|
|
|
self._configure_capture_settings(self.fps, self.capture_width, self.capture_height)
|
|
|
|
|
logger.debug(f"Camera {self.index_or_path} connected and configured successfully.")
|
|
|
|
|
|
|
|
|
|
def _set_fps(self, fps: int) -> None:
|
|
|
|
|
self.camera.set(cv2.CAP_PROP_FPS, fps)
|
|
|
|
|
actual_fps = self.camera.get(cv2.CAP_PROP_FPS)
|
|
|
|
|
# Using `math.isclose` since actual fps can be a float (e.g. 29.9 instead of 30)
|
|
|
|
|
if not math.isclose(fps, actual_fps, rel_tol=1e-3):
|
|
|
|
|
raise RuntimeError(f"Can't set {fps=} for {self}. Actual value is {actual_fps}.")
|
|
|
|
|
"""Sets the camera's frames per second (FPS)."""
|
|
|
|
|
success = self.videocapture_camera.set(cv2.CAP_PROP_FPS, float(fps))
|
|
|
|
|
actual_fps = self.videocapture_camera.get(cv2.CAP_PROP_FPS)
|
|
|
|
|
# Use math.isclose for robust float comparison
|
|
|
|
|
if not success or not math.isclose(fps, actual_fps, rel_tol=1e-3):
|
|
|
|
|
logger.warning(
|
|
|
|
|
f"Requested FPS {fps} for {self}, but camera reported {actual_fps} (set success: {success}). "
|
|
|
|
|
"This might be due to camera limitations."
|
|
|
|
|
)
|
|
|
|
|
raise RuntimeError(
|
|
|
|
|
f"Failed to set requested FPS {fps} for {self}. Actual value reported: {actual_fps}."
|
|
|
|
|
)
|
|
|
|
|
logger.debug(f"FPS set to {actual_fps} for {self}.")
|
|
|
|
|
|
|
|
|
|
def _set_capture_width(self, capture_width: int) -> None:
|
|
|
|
|
self.camera.set(cv2.CAP_PROP_FRAME_WIDTH, capture_width)
|
|
|
|
|
actual_width = self.camera.get(cv2.CAP_PROP_FRAME_WIDTH)
|
|
|
|
|
if not math.isclose(self.capture_width, actual_width, rel_tol=1e-3):
|
|
|
|
|
raise RuntimeError(f"Can't set {capture_width=} for {self}. Actual value is {actual_width}.")
|
|
|
|
|
"""Sets the camera's frame capture width."""
|
|
|
|
|
success = self.videocapture_camera.set(cv2.CAP_PROP_FRAME_WIDTH, float(capture_width))
|
|
|
|
|
actual_width = round(self.videocapture_camera.get(cv2.CAP_PROP_FRAME_WIDTH))
|
|
|
|
|
if not success or self.capture_width != actual_width:
|
|
|
|
|
logger.warning(
|
|
|
|
|
f"Requested capture width {capture_width} for {self}, but camera reported {actual_width} (set success: {success})."
|
|
|
|
|
)
|
|
|
|
|
raise RuntimeError(
|
|
|
|
|
f"Failed to set requested capture width {capture_width} for {self}. Actual value: {actual_width}."
|
|
|
|
|
)
|
|
|
|
|
logger.debug(f"Capture width set to {actual_width} for {self}.")
|
|
|
|
|
|
|
|
|
|
def _set_capture_height(self, capture_height: int) -> None:
|
|
|
|
|
self.camera.set(cv2.CAP_PROP_FRAME_HEIGHT, capture_height)
|
|
|
|
|
actual_height = self.camera.get(cv2.CAP_PROP_FRAME_HEIGHT)
|
|
|
|
|
if not math.isclose(self.capture_height, actual_height, rel_tol=1e-3):
|
|
|
|
|
raise RuntimeError(f"Can't set {capture_height=} for {self}. Actual value is {actual_height}.")
|
|
|
|
|
"""Sets the camera's frame capture height."""
|
|
|
|
|
success = self.videocapture_camera.set(cv2.CAP_PROP_FRAME_HEIGHT, float(capture_height))
|
|
|
|
|
actual_height = round(self.videocapture_camera.get(cv2.CAP_PROP_FRAME_HEIGHT))
|
|
|
|
|
if not success or self.capture_height != actual_height:
|
|
|
|
|
logger.warning(
|
|
|
|
|
f"Requested capture height {capture_height} for {self}, but camera reported {actual_height} (set success: {success})."
|
|
|
|
|
)
|
|
|
|
|
raise RuntimeError(
|
|
|
|
|
f"Failed to set requested capture height {capture_height} for {self}. Actual value: {actual_height}."
|
|
|
|
|
)
|
|
|
|
|
logger.debug(f"Capture height set to {actual_height} for {self}.")
|
|
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
|
def find_cameras(max_index_search_range=MAX_OPENCV_INDEX) -> list[IndexOrPath]:
|
|
|
|
|
"""
|
|
|
|
|
Detects available cameras connected to the system.
|
|
|
|
|
|
|
|
|
|
On Linux, it scans '/dev/video*' paths. On other systems (like macOS, Windows),
|
|
|
|
|
it checks indices from 0 up to `max_index_search_range`.
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
max_index_search_range (int): The maximum index to check on non-Linux systems.
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
list[dict]: A list of dictionaries, where each dictionary contains information
|
|
|
|
|
about a found camera, primarily its 'index' (which can be an
|
|
|
|
|
integer index or a string path).
|
|
|
|
|
|
|
|
|
|
Note:
|
|
|
|
|
This method temporarily opens each potential camera to check availability,
|
|
|
|
|
which might briefly interfere with other applications using cameras.
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
if platform.system() == "Linux":
|
|
|
|
|
print("Linux detected. Finding available camera indices through scanning '/dev/video*' ports")
|
|
|
|
|
possible_idx_or_paths = [str(port) for port in Path("/dev").glob("video*")]
|
|
|
|
|
logger.info("Linux detected. Scanning '/dev/video*' device paths...")
|
|
|
|
|
possible_paths = sorted(Path("/dev").glob("video*"), key=lambda p: p.name)
|
|
|
|
|
targets_to_scan = [str(p) for p in possible_paths]
|
|
|
|
|
logger.debug(f"Found potential paths: {targets_to_scan}")
|
|
|
|
|
else:
|
|
|
|
|
print(
|
|
|
|
|
f"{platform.system()} system detected. Finding available camera indices through "
|
|
|
|
|
f"scanning all indices from 0 to {MAX_OPENCV_INDEX}"
|
|
|
|
|
logger.info(
|
|
|
|
|
f"{platform.system()} system detected. Scanning indices from 0 to {max_index_search_range}..."
|
|
|
|
|
)
|
|
|
|
|
possible_idx_or_paths = range(max_index_search_range)
|
|
|
|
|
targets_to_scan = list(range(max_index_search_range))
|
|
|
|
|
|
|
|
|
|
found_idx_or_paths = []
|
|
|
|
|
for target in possible_idx_or_paths:
|
|
|
|
|
camera = cv2.VideoCapture(target)
|
|
|
|
|
is_open = camera.isOpened()
|
|
|
|
|
camera.release()
|
|
|
|
|
if is_open:
|
|
|
|
|
print(f"Camera found at {target}")
|
|
|
|
|
found_idx_or_paths.append(target)
|
|
|
|
|
found_cameras = []
|
|
|
|
|
for target in targets_to_scan:
|
|
|
|
|
camera = cv2.VideoCapture(target, get_cv2_backend())
|
|
|
|
|
if camera.isOpened():
|
|
|
|
|
logger.debug(f"Camera found and accessible at '{target}'")
|
|
|
|
|
found_cameras.append(target)
|
|
|
|
|
camera.release()
|
|
|
|
|
|
|
|
|
|
return found_idx_or_paths
|
|
|
|
|
if not found_cameras:
|
|
|
|
|
logger.warning("No OpenCV cameras detected.")
|
|
|
|
|
else:
|
|
|
|
|
logger.info(f"Detected accessible cameras: {found_cameras}")
|
|
|
|
|
|
|
|
|
|
def read(self, temporary_color_mode: str | None = None) -> np.ndarray:
|
|
|
|
|
"""Read a frame from the camera returned in the format (height, width, channels)
|
|
|
|
|
(e.g. 480 x 640 x 3), contrarily to the pytorch format which is channel first.
|
|
|
|
|
return found_cameras
|
|
|
|
|
|
|
|
|
|
Note: Reading a frame is done every `camera.fps` times per second, and it is blocking.
|
|
|
|
|
If you are reading data from other sensors, we advise to use `camera.async_read()` which is non blocking version of `camera.read()`.
|
|
|
|
|
def read(self, color_mode: ColorMode | None = None) -> np.ndarray:
|
|
|
|
|
"""
|
|
|
|
|
Reads a single frame synchronously from the camera.
|
|
|
|
|
|
|
|
|
|
This is a blocking call. It waits for the next available frame from the
|
|
|
|
|
camera hardware via OpenCV.
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
color_mode (Optional[ColorMode]): If specified, overrides the default
|
|
|
|
|
color mode (`self.color_mode`) for this read operation (e.g.,
|
|
|
|
|
request RGB even if default is BGR).
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
np.ndarray: The captured frame as a NumPy array in the format
|
|
|
|
|
(height, width, channels), using the specified or default
|
|
|
|
|
color mode and applying any configured rotation.
|
|
|
|
|
|
|
|
|
|
Raises:
|
|
|
|
|
DeviceNotConnectedError: If the camera is not connected.
|
|
|
|
|
RuntimeError: If reading the frame from the camera fails or if the
|
|
|
|
|
received frame dimensions don't match expectations before rotation.
|
|
|
|
|
ValueError: If an invalid `color_mode` is requested.
|
|
|
|
|
"""
|
|
|
|
|
if not self.is_connected:
|
|
|
|
|
raise DeviceNotConnectedError(f"{self} is not connected.")
|
|
|
|
|
|
|
|
|
|
start = time.perf_counter()
|
|
|
|
|
start_time = time.perf_counter()
|
|
|
|
|
|
|
|
|
|
ret, color_image = self.camera.read()
|
|
|
|
|
if not ret:
|
|
|
|
|
raise RuntimeError(f"Can't capture color image from {self}.")
|
|
|
|
|
ret, frame = self.videocapture_camera.read()
|
|
|
|
|
|
|
|
|
|
self.color_image = self._postprocess_image(color_image, temporary_color_mode)
|
|
|
|
|
|
|
|
|
|
dt_ms = (time.perf_counter() - start) * 1e3
|
|
|
|
|
logger.debug(f"{self} read: {dt_ms:.1f}ms")
|
|
|
|
|
|
|
|
|
|
# log the utc time at which the image was received
|
|
|
|
|
self.logs["timestamp_utc"] = capture_timestamp_utc()
|
|
|
|
|
|
|
|
|
|
def _postprocess_image(self, image, temporary_color_mode: str | None = None):
|
|
|
|
|
requested_color_mode = self.color_mode if temporary_color_mode is None else temporary_color_mode
|
|
|
|
|
|
|
|
|
|
if requested_color_mode not in ["rgb", "bgr"]:
|
|
|
|
|
raise ValueError(
|
|
|
|
|
f"Expected color values are 'rgb' or 'bgr', but {requested_color_mode} is provided."
|
|
|
|
|
if not ret or frame is None:
|
|
|
|
|
raise RuntimeError(
|
|
|
|
|
f"Failed to capture frame from {self}. '.read()' returned status={ret} and frame is None."
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
# OpenCV uses BGR format as default (blue, green, red) for all operations, including displaying images.
|
|
|
|
|
# However, Deep Learning framework such as LeRobot uses RGB format as default to train neural networks,
|
|
|
|
|
# so we convert the image color from BGR to RGB.
|
|
|
|
|
if requested_color_mode == "rgb":
|
|
|
|
|
color_image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
|
|
|
|
|
# Post-process the frame (color conversion, dimension check, rotation)
|
|
|
|
|
processed_frame = self._postprocess_image(frame, color_mode)
|
|
|
|
|
|
|
|
|
|
read_duration_ms = (time.perf_counter() - start_time) * 1e3
|
|
|
|
|
logger.debug(f"{self} synchronous read took: {read_duration_ms:.1f}ms")
|
|
|
|
|
|
|
|
|
|
self.logs["timestamp_utc"] = capture_timestamp_utc()
|
|
|
|
|
return processed_frame
|
|
|
|
|
|
|
|
|
|
def _postprocess_image(self, image: np.ndarray, color_mode: ColorMode | None = None) -> np.ndarray:
|
|
|
|
|
"""
|
|
|
|
|
Applies color conversion, dimension validation, and rotation to a raw frame.
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
image (np.ndarray): The raw image frame (expected BGR format from OpenCV).
|
|
|
|
|
color_mode (Optional[ColorMode]): The target color mode (RGB or BGR). If None,
|
|
|
|
|
uses the instance's default `self.color_mode`.
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
np.ndarray: The processed image frame.
|
|
|
|
|
|
|
|
|
|
Raises:
|
|
|
|
|
ValueError: If the requested `color_mode` is invalid.
|
|
|
|
|
RuntimeError: If the raw frame dimensions do not match the configured
|
|
|
|
|
`capture_width` and `capture_height`.
|
|
|
|
|
"""
|
|
|
|
|
requested_color_mode = self.color_mode if color_mode is None else color_mode
|
|
|
|
|
|
|
|
|
|
if requested_color_mode not in (ColorMode.RGB, ColorMode.BGR):
|
|
|
|
|
raise ValueError(
|
|
|
|
|
f"Invalid requested color mode '{requested_color_mode}'. Expected {ColorMode.RGB} or {ColorMode.BGR}."
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
h, w, c = image.shape
|
|
|
|
|
|
|
|
|
|
h, w, _ = color_image.shape
|
|
|
|
|
if h != self.capture_height or w != self.capture_width:
|
|
|
|
|
raise RuntimeError(
|
|
|
|
|
f"Can't capture color image with expected height and width ({self.height} x {self.width}). "
|
|
|
|
|
f"({h} x {w}) returned instead."
|
|
|
|
|
f"Captured frame dimensions ({h}x{w}) do not match configured capture dimensions ({self.capture_height}x{self.capture_width}) for {self}."
|
|
|
|
|
)
|
|
|
|
|
if c != self.channels:
|
|
|
|
|
logger.warning(
|
|
|
|
|
f"Captured frame channels ({c}) do not match configured channels ({self.channels}) for {self}."
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
if self.rotation is not None:
|
|
|
|
|
color_image = cv2.rotate(color_image, self.rotation)
|
|
|
|
|
processed_image = image
|
|
|
|
|
if requested_color_mode == ColorMode.RGB:
|
|
|
|
|
processed_image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
|
|
|
|
|
logger.debug(f"Converted frame from BGR to RGB for {self}.")
|
|
|
|
|
|
|
|
|
|
return color_image
|
|
|
|
|
if self.rotation is not None:
|
|
|
|
|
processed_image = cv2.rotate(processed_image, self.rotation)
|
|
|
|
|
logger.debug(f"Rotated frame by {self.config.rotation} degrees for {self}.")
|
|
|
|
|
|
|
|
|
|
return processed_image
|
|
|
|
|
|
|
|
|
|
def _read_loop(self):
|
|
|
|
|
"""
|
|
|
|
|
Internal loop run by the background thread for asynchronous reading.
|
|
|
|
|
|
|
|
|
|
Continuously reads frames from the camera using the synchronous `read()`
|
|
|
|
|
method and places the latest frame into the `frame_queue`. It overwrites
|
|
|
|
|
any previous frame in the queue to ensure only the most recent one is available.
|
|
|
|
|
"""
|
|
|
|
|
logger.debug(f"Starting read loop thread for {self}.")
|
|
|
|
|
while not self.stop_event.is_set():
|
|
|
|
|
try:
|
|
|
|
|
self.color_image = self.read()
|
|
|
|
|
except Exception as e:
|
|
|
|
|
print(f"Error reading in thread: {e}")
|
|
|
|
|
color_image = self.read()
|
|
|
|
|
|
|
|
|
|
def async_read(self):
|
|
|
|
|
with contextlib.suppress(queue.Empty):
|
|
|
|
|
_ = self.frame_queue.get_nowait()
|
|
|
|
|
self.frame_queue.put(color_image)
|
|
|
|
|
logger.debug(f"Frame placed in queue for {self}.")
|
|
|
|
|
|
|
|
|
|
except DeviceNotConnectedError:
|
|
|
|
|
logger.error(f"Read loop for {self} stopped: Camera disconnected.")
|
|
|
|
|
break
|
|
|
|
|
except Exception as e:
|
|
|
|
|
logger.warning(f"Error reading frame in background thread for {self}: {e}")
|
|
|
|
|
|
|
|
|
|
logger.debug(f"Stopping read loop thread for {self}.")
|
|
|
|
|
|
|
|
|
|
def _ensure_read_thread_running(self):
|
|
|
|
|
"""Starts or restarts the background read thread if it's not running."""
|
|
|
|
|
logger.debug(f"Read thread for {self} is not running. Starting...")
|
|
|
|
|
if self.thread is not None:
|
|
|
|
|
self.thread.join(timeout=0.1)
|
|
|
|
|
if self.stop_event is not None:
|
|
|
|
|
self.stop_event.set()
|
|
|
|
|
|
|
|
|
|
self.stop_event = Event()
|
|
|
|
|
self.thread = Thread(
|
|
|
|
|
target=self._read_loop, args=(), name=f"OpenCVCameraReadLoop-{self}-{self.index_or_path}"
|
|
|
|
|
)
|
|
|
|
|
self.thread.daemon = True
|
|
|
|
|
self.thread.start()
|
|
|
|
|
logger.debug(f"Read thread started for {self}.")
|
|
|
|
|
|
|
|
|
|
def async_read(self, timeout_ms: float = 2000) -> np.ndarray:
|
|
|
|
|
"""
|
|
|
|
|
Reads the latest available frame asynchronously.
|
|
|
|
|
|
|
|
|
|
This method retrieves the most recent frame captured by the background
|
|
|
|
|
read thread. It does not block waiting for the camera hardware directly,
|
|
|
|
|
only waits for a frame to appear in the internal queue up to the specified
|
|
|
|
|
timeout.
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
timeout_ms (float): Maximum time in milliseconds to wait for a frame
|
|
|
|
|
to become available in the queue. Defaults to 2000ms (2 seconds).
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
np.ndarray: The latest captured frame as a NumPy array in the format
|
|
|
|
|
(height, width, channels), processed according to configuration.
|
|
|
|
|
|
|
|
|
|
Raises:
|
|
|
|
|
DeviceNotConnectedError: If the camera is not connected.
|
|
|
|
|
TimeoutError: If no frame becomes available within the specified timeout.
|
|
|
|
|
RuntimeError: If an unexpected error occurs while retrieving from the queue.
|
|
|
|
|
"""
|
|
|
|
|
if not self.is_connected:
|
|
|
|
|
raise DeviceNotConnectedError(f"{self} is not connected.")
|
|
|
|
|
|
|
|
|
|
if self.thread is None:
|
|
|
|
|
self.stop_event = Event()
|
|
|
|
|
self.thread = Thread(target=self._read_loop, args=())
|
|
|
|
|
self.thread.daemon = True
|
|
|
|
|
self.thread.start()
|
|
|
|
|
if self.thread is None or not self.thread.is_alive():
|
|
|
|
|
# Ensure the background reading thread is operational
|
|
|
|
|
self._ensure_read_thread_running()
|
|
|
|
|
|
|
|
|
|
num_tries = 0
|
|
|
|
|
while True:
|
|
|
|
|
if self.color_image is not None:
|
|
|
|
|
return self.color_image
|
|
|
|
|
try:
|
|
|
|
|
# Get the latest frame from the queue, waiting up to the timeout
|
|
|
|
|
return self.frame_queue.get(timeout=timeout_ms / 1000.0)
|
|
|
|
|
except queue.Empty as e:
|
|
|
|
|
thread_alive = self.thread is not None and self.thread.is_alive()
|
|
|
|
|
logger.error(
|
|
|
|
|
f"Timeout waiting for frame from {self} queue after {timeout_ms}ms. "
|
|
|
|
|
f"(Read thread alive: {thread_alive})"
|
|
|
|
|
)
|
|
|
|
|
raise TimeoutError(
|
|
|
|
|
f"Timed out waiting for frame from camera {self.index_or_path} after {timeout_ms} ms. "
|
|
|
|
|
f"Read thread alive: {thread_alive}."
|
|
|
|
|
) from e
|
|
|
|
|
except Exception as e:
|
|
|
|
|
logger.exception(f"Unexpected error getting frame from queue for {self}: {e}")
|
|
|
|
|
raise RuntimeError(f"Error getting frame from queue for camera {self.index_or_path}: {e}") from e
|
|
|
|
|
|
|
|
|
|
time.sleep(1 / self.fps)
|
|
|
|
|
num_tries += 1
|
|
|
|
|
if num_tries > self.fps * 2:
|
|
|
|
|
raise TimeoutError("Timed out waiting for async_read() to start.")
|
|
|
|
|
def _shutdown_read_thread(self):
|
|
|
|
|
"""Cleans the background read thread if it's running."""
|
|
|
|
|
if self.stop_event is not None:
|
|
|
|
|
logger.debug(f"Signaling stop event for read thread of {self}.")
|
|
|
|
|
self.stop_event.set()
|
|
|
|
|
else:
|
|
|
|
|
logger.warning(f"Stop event not found for thread of {self}, cannot signal.")
|
|
|
|
|
|
|
|
|
|
logger.debug(f"Waiting for read thread of {self} to join...")
|
|
|
|
|
self.thread.join(timeout=2.0)
|
|
|
|
|
if self.thread.is_alive():
|
|
|
|
|
logger.warning(f"Read thread for {self} did not terminate gracefully after 2 seconds.")
|
|
|
|
|
else:
|
|
|
|
|
logger.debug(f"Read thread for {self} joined successfully.")
|
|
|
|
|
self.thread = None
|
|
|
|
|
self.stop_event = None
|
|
|
|
|
|
|
|
|
|
def disconnect(self):
|
|
|
|
|
if not self.is_connected:
|
|
|
|
|
raise DeviceNotConnectedError(f"{self} is not connected.")
|
|
|
|
|
"""
|
|
|
|
|
Disconnects from the camera and cleans up resources.
|
|
|
|
|
|
|
|
|
|
Stops the background read thread (if running) and releases the OpenCV
|
|
|
|
|
VideoCapture object.
|
|
|
|
|
|
|
|
|
|
Raises:
|
|
|
|
|
DeviceNotConnectedError: If the camera is already disconnected.
|
|
|
|
|
"""
|
|
|
|
|
if not self.is_connected and self.thread is None:
|
|
|
|
|
raise DeviceNotConnectedError(
|
|
|
|
|
f"Attempted to disconnect {self}, but it appears already disconnected."
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
logger.debug(f"Disconnecting from camera {self.index_or_path}...")
|
|
|
|
|
|
|
|
|
|
if self.thread is not None:
|
|
|
|
|
self.stop_event.set()
|
|
|
|
|
self.thread.join() # wait for the thread to finish
|
|
|
|
|
self.thread = None
|
|
|
|
|
self.stop_event = None
|
|
|
|
|
self._shutdown_read_thread()
|
|
|
|
|
|
|
|
|
|
self.camera.release()
|
|
|
|
|
self.camera = None
|
|
|
|
|
if self.videocapture_camera is not None:
|
|
|
|
|
logger.debug(f"Releasing OpenCV VideoCapture object for {self}.")
|
|
|
|
|
self.videocapture_camera.release()
|
|
|
|
|
self.videocapture_camera = None
|
|
|
|
|
else:
|
|
|
|
|
logger.debug(
|
|
|
|
|
f"No OpenCV VideoCapture object to release for {self} (was already None or failed connection)."
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
|
|
parser = argparse.ArgumentParser(
|
|
|
|
|
description="Save a few frames using `OpenCVCamera` for all cameras connected to the computer, or a selected subset."
|
|
|
|
|
)
|
|
|
|
|
parser.add_argument(
|
|
|
|
|
"--camera-ids",
|
|
|
|
|
type=int,
|
|
|
|
|
nargs="*",
|
|
|
|
|
default=None,
|
|
|
|
|
help="List of camera indices used to instantiate the `OpenCVCamera`. If not provided, find and use all available camera indices.",
|
|
|
|
|
)
|
|
|
|
|
parser.add_argument(
|
|
|
|
|
"--fps",
|
|
|
|
|
type=int,
|
|
|
|
|
default=None,
|
|
|
|
|
help="Set the number of frames recorded per seconds for all cameras. If not provided, use the default fps of each camera.",
|
|
|
|
|
)
|
|
|
|
|
parser.add_argument(
|
|
|
|
|
"--width",
|
|
|
|
|
type=int,
|
|
|
|
|
default=None,
|
|
|
|
|
help="Set the width for all cameras. If not provided, use the default width of each camera.",
|
|
|
|
|
)
|
|
|
|
|
parser.add_argument(
|
|
|
|
|
"--height",
|
|
|
|
|
type=int,
|
|
|
|
|
default=None,
|
|
|
|
|
help="Set the height for all cameras. If not provided, use the default height of each camera.",
|
|
|
|
|
)
|
|
|
|
|
parser.add_argument(
|
|
|
|
|
"--images-dir",
|
|
|
|
|
type=Path,
|
|
|
|
|
default="outputs/images_from_opencv_cameras",
|
|
|
|
|
help="Set directory to save a few frames for each camera.",
|
|
|
|
|
)
|
|
|
|
|
parser.add_argument(
|
|
|
|
|
"--record-time-s",
|
|
|
|
|
type=float,
|
|
|
|
|
default=4.0,
|
|
|
|
|
help="Set the number of seconds used to record the frames. By default, 2 seconds.",
|
|
|
|
|
)
|
|
|
|
|
args = parser.parse_args()
|
|
|
|
|
save_images_from_cameras(**vars(args))
|
|
|
|
|
logger.debug(f"Camera {self.index_or_path} disconnected successfully.")
|
|
|
|
|
|