refactor(cameras): improvements utils functionalities v0.1

This commit is contained in:
Steven Palma
2025-05-07 18:28:25 +02:00
parent a26cfe34b0
commit 06e02f6d17
6 changed files with 378 additions and 390 deletions

View File

@@ -154,9 +154,8 @@ class RealSenseCamera(Camera):
"""Checks if the camera pipeline is started and streams are active."""
return self.rs_pipeline is not None and self.rs_profile is not None
# NOTE(Steven): Make it a class method and an util for calling it in utils.py (don't raise)
@staticmethod
def find_cameras(raise_when_empty: bool = True) -> List[Dict[str, str]]:
def find_cameras(raise_when_empty: bool = True) -> List[Dict[str, Union[str, int, float]]]:
"""
Detects available Intel RealSense cameras connected to the system.
@@ -164,38 +163,47 @@ class RealSenseCamera(Camera):
raise_when_empty (bool): If True, raises an OSError if no cameras are found.
Returns:
list[dict]: A list of dictionaries, where each dictionary contains
'serial_number' and 'name' of a found camera.
List[Dict[str, Union[str, int, float]]]: A list of dictionaries,
where each dictionary contains 'type', 'serial_number', 'name',
firmware version, USB type, and other available specs.
Raises:
OSError: If `raise_when_empty` is True and no cameras are detected.
OSError: If `raise_when_empty` is True and no cameras are detected,
or if pyrealsense2 is not installed.
"""
cameras_info = []
found_cameras_info = []
context = rs.context()
devices = context.query_devices()
if not devices:
logger.warning("No RealSense devices detected.")
if raise_when_empty:
raise OSError(
"No RealSense devices detected. Ensure cameras are connected, "
"drivers (`librealsense`) and wrapper (`pyrealsense2`) are installed, "
"and firmware is up-to-date."
"library (`pyrealsense2`) is installed, and firmware is up-to-date."
)
return []
for device in devices:
serial = device.get_info(rs.camera_info.serial_number)
name = device.get_info(rs.camera_info.name)
cameras_info.append({"serial_number": serial, "name": name})
logger.debug(f"Found RealSense camera: Name='{name}', SN='{serial}'")
camera_info = {
"type": "RealSense",
"serial_number": device.get_info(rs.camera_info.serial_number),
"firmware_version": device.get_info(rs.camera_info.firmware_version),
"usb_type_descriptor": device.get_info(rs.camera_info.usb_type_descriptor),
"physical_port": device.get_info(rs.camera_info.physical_port),
"product_id": device.get_info(rs.camera_info.product_id),
"product_line": device.get_info(rs.camera_info.product_line),
"name": device.get_info(rs.camera_info.name),
}
found_cameras_info.append(camera_info)
logger.debug(f"Found RealSense camera: {camera_info}")
logger.info(f"Detected RealSense cameras: {cameras_info}")
return cameras_info
logger.info(f"Detected RealSense cameras: {[cam['serial_number'] for cam in found_cameras_info]}")
return found_cameras_info
def _find_serial_number_from_name(self, name: str) -> str:
"""Finds the serial number for a given unique camera name."""
camera_infos = self.find_cameras(raise_when_empty=True) # Raises if none found
found_devices = [cam for cam in camera_infos if cam["name"] == name]
camera_infos = self.find_cameras(raise_when_empty=True)
found_devices = [cam for cam in camera_infos if str(cam["name"]) == name]
if not found_devices:
available_names = [cam["name"] for cam in camera_infos]
@@ -210,7 +218,7 @@ class RealSenseCamera(Camera):
f"Please use a unique serial number instead. Found SNs: {serial_numbers}"
)
serial_number = found_devices[0]["serial_number"]
serial_number = str(found_devices[0]["serial_number"])
logger.info(f"Found serial number '{serial_number}' for camera name '{name}'.")
return serial_number
@@ -265,7 +273,6 @@ class RealSenseCamera(Camera):
self._validate_capture_width()
self._validate_capture_height()
# Validate Depth Stream if enabled
if self.use_depth:
try:
depth_stream = self.rs_profile.get_stream(rs.stream.depth).as_video_stream_profile()
@@ -329,8 +336,9 @@ class RealSenseCamera(Camera):
self.rs_profile = None
self.rs_pipeline = None
raise ConnectionError(
f"Failed to open camera {self.serial_number}. Run NOTE(Steven): Add right file to scan for available cameras."
) from e # NOTE(Steven): Run find_cameras: available_sns = [cam["serial_number"] for cam in self.find_cameras(raise_when_empty=False)]
f"Failed to open RealSense camera {self.serial_number}. Error: {e}. "
f"Run 'python -m find_cameras list-cameras' for details."
) from e
logger.debug(f"Validating stream configuration for {self.serial_number}...")
self._validate_capture_settings()
@@ -362,14 +370,13 @@ class RealSenseCamera(Camera):
"""Validates and sets the internal capture width based on actual stream width."""
color_stream = self.rs_profile.get_stream(rs.stream.color).as_video_stream_profile()
actual_width = color_stream.width()
actual_width = int(round(color_stream.width()))
if self.capture_width is None:
self.capture_width = actual_width
logger.info(f"Capture width not specified, using camera default: {self.capture_width} pixels.")
return
actual_width = round(actual_width)
if self.capture_width != actual_width:
logger.warning(
f"Requested capture width {self.capture_width} for {self}, but camera reported {actual_width}."
@@ -383,14 +390,13 @@ class RealSenseCamera(Camera):
"""Validates and sets the internal capture height based on actual stream height."""
color_stream = self.rs_profile.get_stream(rs.stream.color).as_video_stream_profile()
actual_height = color_stream.height()
actual_height = int(round(color_stream.height()))
if self.capture_height is None:
self.capture_height = actual_height
logger.info(f"Capture height not specified, using camera default: {self.capture_height} pixels.")
return
actual_height = round(actual_height)
if self.capture_height != actual_height:
logger.warning(
f"Requested capture height {self.capture_height} for {self}, but camera reported {actual_height}."
@@ -531,7 +537,7 @@ class RealSenseCamera(Camera):
logger.debug(f"Starting read loop thread for {self}.")
while not self.stop_event.is_set():
try:
frame_data = self.read()
frame_data = self.read(timeout_ms=500)
with contextlib.suppress(queue.Empty):
_ = self.frame_queue.get_nowait()
@@ -548,7 +554,7 @@ class RealSenseCamera(Camera):
def _ensure_read_thread_running(self):
"""Starts or restarts the background read thread if it's not running."""
if self.thread is not None:
if self.thread is not None and self.thread.is_alive():
self.thread.join(timeout=0.1)
if self.stop_event is not None:
self.stop_event.set()
@@ -615,18 +621,14 @@ class RealSenseCamera(Camera):
if self.stop_event is not None:
logger.debug(f"Signaling stop event for read thread of {self}.")
self.stop_event.set()
else:
logger.warning(f"Read thread exists for {self}, but stop event is None. Cannot signal.")
if self.thread.is_alive():
if self.thread is not None and self.thread.is_alive():
logger.debug(f"Waiting for read thread of {self} to join...")
self.thread.join(timeout=2.0)
if self.thread.is_alive():
logger.warning(f"Read thread for {self} did not terminate gracefully after 2 seconds.")
else:
logger.debug(f"Read thread for {self} joined successfully.")
else:
logger.debug(f"Read thread for {self} was already stopped.")
self.thread = None
self.stop_event = None

View File

@@ -65,7 +65,6 @@ class RealSenseCameraConfig(CameraConfig):
if self.channels != 3:
raise NotImplementedError(f"Unsupported number of channels: {self.channels}")
# bool is stronger than is None, since it works with empty strings
if bool(self.name) and bool(self.serial_number):
raise ValueError(
f"One of them must be set: name or serial_number, but {self.name=} and {self.serial_number=} provided."

View File

@@ -24,6 +24,7 @@ import queue
import time
from pathlib import Path
from threading import Event, Thread
from typing import Dict, List, Union
import cv2
import numpy as np
@@ -142,34 +143,6 @@ class OpenCVCamera(Camera):
"""Checks if the camera is currently connected and opened."""
return isinstance(self.videocapture_camera, cv2.VideoCapture) and self.videocapture_camera.isOpened()
# NOTE(Steven): Make it a class method and an util for calling it in utils.py (don't raise)
def _scan_available_cameras_and_raise(self, index_or_path: IndexOrPath) -> None:
"""
Scans for available cameras and raises an error if the specified
camera cannot be found or accessed.
Args:
index_or_path: The camera identifier that failed to connect.
Raises:
ValueError: If the specified `index_or_path` is not found among the
list of detected, usable cameras.
ConnectionError: If the camera connection fails (this specific method
is called within the connection failure context).
"""
cameras_info = self.find_cameras()
available_cam_ids = [cam["index"] for cam in cameras_info]
if index_or_path not in available_cam_ids:
raise ValueError(
f"Camera '{index_or_path}' not found or not accessible among available cameras: {available_cam_ids}. "
"Ensure the camera is properly connected and drivers are installed. "
"Run 'NOTE(Steven): Add script path' to list detected cameras."
)
raise ConnectionError(
f"Failed to connect to camera {self}. Even though it might be listed, it could not be opened."
)
# NOTE(Steven): Moving it to a different function for now. To be evaluated later if it is worth it and if it makes sense to have it as an abstract method
def _configure_capture_settings(self) -> None:
"""
Applies the specified FPS, width, and height settings to the connected camera.
@@ -228,8 +201,9 @@ class OpenCVCamera(Camera):
self.videocapture_camera.release()
self.videocapture_camera = None
raise ConnectionError(
f"Failed to open camera {self.index_or_path}. Run NOTE(Steven): Add right file to scan for available cameras."
) # NOTE(Steven): Run this _scan_available_cameras_and_raise
f"Failed to open OpenCV camera {self.index_or_path}."
f"Run 'python -m find_cameras list-cameras' for details."
)
logger.debug(f"Successfully opened camera {self.index_or_path}. Applying configuration...")
self._configure_capture_settings()
@@ -259,13 +233,14 @@ class OpenCVCamera(Camera):
def _validate_capture_width(self) -> None:
"""Validates and sets the camera's frame capture width."""
actual_width = int(round(self.videocapture_camera.get(cv2.CAP_PROP_FRAME_WIDTH)))
if self.capture_width is None:
self.capture_width = self.videocapture_camera.get(cv2.CAP_PROP_FRAME_WIDTH)
self.capture_width = actual_width
logger.info(f"Capture width set to camera default: {self.capture_width}.")
return
success = self.videocapture_camera.set(cv2.CAP_PROP_FRAME_WIDTH, float(self.capture_width))
actual_width = round(self.videocapture_camera.get(cv2.CAP_PROP_FRAME_WIDTH))
if not success or self.capture_width != actual_width:
logger.warning(
f"Requested capture width {self.capture_width} for {self}, but camera reported {actual_width} (set success: {success})."
@@ -278,13 +253,14 @@ class OpenCVCamera(Camera):
def _validate_capture_height(self) -> None:
"""Validates and sets the camera's frame capture height."""
actual_height = int(round(self.videocapture_camera.get(cv2.CAP_PROP_FRAME_HEIGHT)))
if self.capture_height is None:
self.capture_height = self.videocapture_camera.get(cv2.CAP_PROP_FRAME_HEIGHT)
self.capture_height = actual_height
logger.info(f"Capture height set to camera default: {self.capture_height}.")
return
success = self.videocapture_camera.set(cv2.CAP_PROP_FRAME_HEIGHT, float(self.capture_height))
actual_height = round(self.videocapture_camera.get(cv2.CAP_PROP_FRAME_HEIGHT))
if not success or self.capture_height != actual_height:
logger.warning(
f"Requested capture height {self.capture_height} for {self}, but camera reported {actual_height} (set success: {success})."
@@ -295,25 +271,25 @@ class OpenCVCamera(Camera):
logger.debug(f"Capture height set to {actual_height} for {self}.")
@staticmethod
def find_cameras(max_index_search_range=MAX_OPENCV_INDEX) -> list[IndexOrPath]:
def find_cameras(
max_index_search_range=MAX_OPENCV_INDEX, raise_when_empty: bool = True
) -> List[Dict[str, Union[str, int, float]]]:
"""
Detects available cameras connected to the system.
Detects available OpenCV cameras connected to the system.
On Linux, it scans '/dev/video*' paths. On other systems (like macOS, Windows),
it checks indices from 0 up to `max_index_search_range`.
Args:
max_index_search_range (int): The maximum index to check on non-Linux systems.
raise_when_empty (bool): If True, raises an OSError if no cameras are found.
Returns:
list[dict]: A list of dictionaries, where each dictionary contains information
about a found camera, primarily its 'index' (which can be an
integer index or a string path).
Note:
This method temporarily opens each potential camera to check availability,
which might briefly interfere with other applications using cameras.
List[Dict[str, Union[str, int, float]]]: A list of dictionaries,
where each dictionary contains 'type', 'id' (port index or path),
'default_width', 'default_height', and 'default_fps'.
"""
found_cameras_info = []
if platform.system() == "Linux":
logger.info("Linux detected. Scanning '/dev/video*' device paths...")
@@ -326,20 +302,32 @@ class OpenCVCamera(Camera):
)
targets_to_scan = list(range(max_index_search_range))
found_cameras = []
for target in targets_to_scan:
camera = cv2.VideoCapture(target, get_cv2_backend())
if camera.isOpened():
logger.debug(f"Camera found and accessible at '{target}'")
found_cameras.append(target)
default_width = int(camera.get(cv2.CAP_PROP_FRAME_WIDTH))
default_height = int(camera.get(cv2.CAP_PROP_FRAME_HEIGHT))
default_fps = camera.get(cv2.CAP_PROP_FPS)
camera_info = {
"type": "OpenCV",
"id": target,
"default_width": default_width,
"default_height": default_height,
"default_fps": default_fps,
"backend_api": camera.getBackendName(),
}
found_cameras_info.append(camera_info)
logger.debug(f"Found OpenCV camera:: {camera_info}")
camera.release()
if not found_cameras:
logger.warning("No OpenCV cameras detected.")
else:
logger.info(f"Detected accessible cameras: {found_cameras}")
if not found_cameras_info:
logger.warning("No OpenCV devices detected.")
if raise_when_empty:
raise OSError("No OpenCV devices detected. Ensure cameras are connected.")
return found_cameras
logger.info(f"Detected OpenCV cameras: {[cam['id'] for cam in found_cameras_info]}")
return found_cameras_info
def read(self, color_mode: ColorMode | None = None) -> np.ndarray:
"""
@@ -460,7 +448,7 @@ class OpenCVCamera(Camera):
def _ensure_read_thread_running(self):
"""Starts or restarts the background read thread if it's not running."""
if self.thread is not None:
if self.thread is not None and self.thread.is_alive():
self.thread.join(timeout=0.1)
if self.stop_event is not None:
self.stop_event.set()
@@ -522,18 +510,14 @@ class OpenCVCamera(Camera):
if self.stop_event is not None:
logger.debug(f"Signaling stop event for read thread of {self}.")
self.stop_event.set()
else:
logger.warning(f"Stop event not found for thread of {self}, cannot signal.")
if self.thread.is_alive():
if self.thread is not None and self.thread.is_alive():
logger.debug(f"Waiting for read thread of {self} to join...")
self.thread.join(timeout=2.0)
if self.thread.is_alive():
logger.warning(f"Read thread for {self} did not terminate gracefully after 2 seconds.")
else:
logger.debug(f"Read thread for {self} joined successfully.")
else:
logger.debug(f"Read thread for {self} was already stopped.")
self.thread = None
self.stop_event = None

View File

@@ -71,301 +71,3 @@ def save_image(img_array: np.ndarray, camera_index: int, frame_index: int, image
path = images_dir / f"camera_{camera_index:02d}_frame_{frame_index:06d}.png"
path.parent.mkdir(parents=True, exist_ok=True)
img.save(str(path), quality=100)
# NOTE(Steven): This should be use with both cameras implementations
# def save_images_from_cameras(
# images_dir: Path,
# camera_idx_or_paths: list[IndexOrPath] | None = None,
# fps: int | None = None,
# width: int | None = None,
# height: int | None = None,
# record_time_s: int = 2,
# ):
# """
# Initializes all the cameras and saves images to the directory. Useful to visually identify the camera
# associated to a given camera index.
# """
# if not camera_idx_or_paths:
# camera_idx_or_paths = OpenCVCamera.find_cameras()
# if len(camera_idx_or_paths) == 0:
# raise RuntimeError(
# "Not a single camera was detected. Try re-plugging, or re-installing `opencv-python`, "
# "or your camera driver, or make sure your camera is compatible with opencv."
# )
# print("Connecting cameras")
# cameras = []
# for idx_or_path in camera_idx_or_paths:
# config = OpenCVCameraConfig(index_or_path=idx_or_path, fps=fps, width=width, height=height)
# camera = OpenCVCamera(config)
# camera.connect()
# print(
# f"OpenCVCamera({camera.index_or_path}, fps={camera.fps}, width={camera.capture_width}, "
# f"height={camera.capture_height}, color_mode={camera.color_mode})"
# )
# cameras.append(camera)
# images_dir = Path(images_dir)
# if images_dir.exists():
# shutil.rmtree(
# images_dir,
# )
# images_dir.mkdir(parents=True, exist_ok=True)
# print(f"Saving images to {images_dir}")
# frame_index = 0
# start_time = time.perf_counter()
# with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
# while True:
# now = time.perf_counter()
# for camera in cameras:
# # If we use async_read when fps is None, the loop will go full speed, and we will endup
# # saving the same images from the cameras multiple times until the RAM/disk is full.
# image = camera.read() if fps is None else camera.async_read()
# executor.submit(
# save_image,
# image,
# camera.camera_index,
# frame_index,
# images_dir,
# )
# if fps is not None:
# dt_s = time.perf_counter() - now
# busy_wait(1 / fps - dt_s)
# print(f"Frame: {frame_index:04d}\tLatency (ms): {(time.perf_counter() - now) * 1000:.2f}")
# if time.perf_counter() - start_time > record_time_s:
# break
# frame_index += 1
# print(f"Images have been saved to {images_dir}")
# # NOTE(Steven): Cameras don't get disconnected
# # NOTE(Steven): Update this to be valid for both cameras type
# if __name__ == "__main__":
# parser = argparse.ArgumentParser(
# description="Save a few frames using `OpenCVCamera` for all cameras connected to the computer, or a selected subset."
# )
# parser.add_argument(
# "--camera-ids",
# type=int,
# nargs="*",
# default=None,
# help="List of camera indices used to instantiate the `OpenCVCamera`. If not provided, find and use all available camera indices.",
# )
# parser.add_argument(
# "--fps",
# type=int,
# default=None,
# help="Set the number of frames recorded per seconds for all cameras. If not provided, use the default fps of each camera.",
# )
# parser.add_argument(
# "--width",
# type=str,
# default=None,
# help="Set the width for all cameras. If not provided, use the default width of each camera.",
# )
# parser.add_argument(
# "--height",
# type=str,
# default=None,
# help="Set the height for all cameras. If not provided, use the default height of each camera.",
# )
# parser.add_argument(
# "--images-dir",
# type=Path,
# default="outputs/images_from_opencv_cameras",
# help="Set directory to save a few frames for each camera.",
# )
# parser.add_argument(
# "--record-time-s",
# type=float,
# default=4.0,
# help="Set the number of seconds used to record the frames. By default, 2 seconds.",
# )
# args = parser.parse_args()
# save_images_from_cameras(**vars(args))
### Realsense
# def find_realsense_cameras(raise_when_empty: bool = True) -> list[dict]:
# """
# Find the names and the serial numbers of the Intel RealSense cameras
# connected to the computer.
# """
# import pyrealsense2 as rs
# cameras = []
# for device in rs.context().query_devices():
# serial_number = int(device.get_info(rs.camera_info(SERIAL_NUMBER_INDEX)))
# name = device.get_info(rs.camera_info.name)
# cameras.append(
# {
# "serial_number": serial_number,
# "name": name,
# }
# )
# if raise_when_empty and len(cameras) == 0:
# raise OSError(
# "Not a single camera was detected. Try re-plugging, or re-installing `librealsense` and its python wrapper `pyrealsense2`, or updating the firmware."
# )
# return cameras
# def save_image(img_array, serial_number, frame_index, images_dir):
# try:
# img = Image.fromarray(img_array)
# path = images_dir / f"camera_{serial_number}_frame_{frame_index:06d}.png"
# path.parent.mkdir(parents=True, exist_ok=True)
# img.save(str(path), quality=100)
# logging.info(f"Saved image: {path}")
# except Exception as e:
# logging.error(f"Failed to save image for camera {serial_number} frame {frame_index}: {e}")
# def save_images_from_cameras(
# images_dir: Path,
# serial_numbers: list[int] | None = None,
# fps=None,
# width=None,
# height=None,
# record_time_s=2,
# ):
# """
# Initializes all the cameras and saves images to the directory. Useful to visually identify the camera
# associated to a given serial number.
# """
# if serial_numbers is None or len(serial_numbers) == 0:
# camera_infos = find_realsense_cameras()
# serial_numbers = [cam["serial_number"] for cam in camera_infos]
# import cv2
# print("Connecting cameras")
# cameras = []
# for cam_sn in serial_numbers:
# print(f"{cam_sn=}")
# config = RealSenseCameraConfig(serial_number=cam_sn, fps=fps, width=width, height=height)
# camera = RealSenseCamera(config)
# camera.connect()
# print(
# f"RealSenseCamera({camera.serial_number}, fps={camera.fps}, width={camera.capture_width}, height={camera.capture_height}, color_mode={camera.color_mode})"
# )
# cameras.append(camera)
# images_dir = Path(images_dir)
# if images_dir.exists():
# shutil.rmtree(
# images_dir,
# )
# images_dir.mkdir(parents=True, exist_ok=True)
# print(f"Saving images to {images_dir}")
# frame_index = 0
# start_time = time.perf_counter()
# try:
# with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
# while True:
# now = time.perf_counter()
# for camera in cameras:
# # If we use async_read when fps is None, the loop will go full speed, and we will end up
# # saving the same images from the cameras multiple times until the RAM/disk is full.
# image = camera.read() if fps is None else camera.async_read()
# if image is None:
# print("No Frame")
# bgr_converted_image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
# executor.submit(
# save_image,
# bgr_converted_image,
# camera.serial_number,
# frame_index,
# images_dir,
# )
# if fps is not None:
# dt_s = time.perf_counter() - now
# busy_wait(1 / fps - dt_s)
# if time.perf_counter() - start_time > record_time_s:
# break
# print(f"Frame: {frame_index:04d}\tLatency (ms): {(time.perf_counter() - now) * 1000:.2f}")
# frame_index += 1
# finally:
# print(f"Images have been saved to {images_dir}")
# for camera in cameras:
# camera.disconnect()
# def find_serial_number_from_name(name):
# camera_infos = find_realsense_cameras()
# camera_names = [cam["name"] for cam in camera_infos]
# this_name_count = Counter(camera_names)[name]
# if this_name_count > 1:
# # TODO(aliberts): Test this with multiple identical cameras (Aloha)
# raise ValueError(
# f"Multiple {name} cameras have been detected. Please use their serial number to instantiate them."
# )
# name_to_serial_dict = {cam["name"]: cam["serial_number"] for cam in camera_infos}
# cam_sn = name_to_serial_dict[name]
# return cam_sn
# if __name__ == "__main__":
# parser = argparse.ArgumentParser(
# description="Save a few frames using `RealSenseCamera` for all cameras connected to the computer, or a selected subset."
# )
# parser.add_argument(
# "--serial-numbers",
# type=int,
# nargs="*",
# default=None,
# help="List of serial numbers used to instantiate the `RealSenseCamera`. If not provided, find and use all available camera indices.",
# )
# parser.add_argument(
# "--fps",
# type=int,
# default=30,
# help="Set the number of frames recorded per seconds for all cameras. If not provided, use the default fps of each camera.",
# )
# parser.add_argument(
# "--width",
# type=str,
# default=640,
# help="Set the width for all cameras. If not provided, use the default width of each camera.",
# )
# parser.add_argument(
# "--height",
# type=str,
# default=480,
# help="Set the height for all cameras. If not provided, use the default height of each camera.",
# )
# parser.add_argument(
# "--images-dir",
# type=Path,
# default="outputs/images_from_intelrealsense_cameras",
# help="Set directory to save a few frames for each camera.",
# )
# parser.add_argument(
# "--record-time-s",
# type=float,
# default=2.0,
# help="Set the number of seconds used to record the frames. By default, 2 seconds.",
# )
# args = parser.parse_args()
# save_images_from_cameras(**vars(args))

View File

@@ -1 +1,302 @@
# NOTE(Steven): Place here the code for save_img (valid for both imnplementations:: python -m lerobot.find_cameras)
#!/usr/bin/env python
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import concurrent.futures
import logging
import shutil
import time
from pathlib import Path
from typing import Dict, List, Union
import numpy as np
from PIL import Image
from lerobot.common.cameras.configs import ColorMode
from lerobot.common.cameras.intel.camera_realsense import RealSenseCamera
from lerobot.common.cameras.intel.configuration_realsense import RealSenseCameraConfig
from lerobot.common.cameras.opencv.camera_opencv import OpenCVCamera
from lerobot.common.cameras.opencv.configuration_opencv import OpenCVCameraConfig
logger = logging.getLogger(__name__)
def find_all_opencv_cameras() -> List[Dict[str, Union[str, int, float, List[str], None]]]:
"""
Finds all available OpenCV cameras plugged into the system.
Returns:
A list of all available OpenCV cameras with their metadata.
"""
all_opencv_cameras_info: List[Dict[str, Union[str, int, float, List[str], None]]] = []
logger.info("Searching for OpenCV cameras...")
try:
opencv_cameras = OpenCVCamera.find_cameras(raise_when_empty=False)
for cam_info in opencv_cameras:
cam_info.setdefault("name", f"OpenCV Camera @ {cam_info['id']}")
all_opencv_cameras_info.append(cam_info)
logger.info(f"Found {len(opencv_cameras)} OpenCV cameras.")
except Exception as e:
logger.error(f"Error finding OpenCV cameras: {e}")
return all_opencv_cameras_info
def find_all_realsense_cameras() -> List[Dict[str, Union[str, int, float, List[str], None]]]:
"""
Finds all available RealSense cameras plugged into the system.
Returns:
A list of all available RealSense cameras with their metadata.
"""
all_realsense_cameras_info: List[Dict[str, Union[str, int, float, List[str], None]]] = []
logger.info("Searching for RealSense cameras...")
try:
realsense_cameras = RealSenseCamera.find_cameras(raise_when_empty=False)
for cam_info in realsense_cameras:
all_realsense_cameras_info.append(cam_info)
logger.info(f"Found {len(realsense_cameras)} RealSense cameras.")
except ImportError:
logger.warning("Skipping RealSense camera search: pyrealsense2 library not found or not importable.")
except Exception as e:
logger.error(f"Error finding RealSense cameras: {e}")
return all_realsense_cameras_info
def find_all_cameras() -> List[Dict[str, Union[str, int, float, List[str], None]]]:
"""
Finds all available cameras (OpenCV and RealSense) plugged into the system.
Returns:
A unified list of all available cameras with their metadata.
"""
all_opencv_cameras_info = find_all_opencv_cameras()
all_realsense_cameras_info = find_all_realsense_cameras()
all_cameras_info = all_opencv_cameras_info + all_realsense_cameras_info
if not all_cameras_info:
logger.warning("No cameras (OpenCV or RealSense) were detected.")
else:
print("\n--- Detected Cameras ---")
for i, cam_info in enumerate(all_cameras_info):
print(f"Camera #{i + 1}:")
for key, value in cam_info.items():
print(f" {key.replace('_', ' ').capitalize()}: {value}")
print("-" * 20)
return all_cameras_info
def save_image(
img_array: np.ndarray,
camera_identifier: Union[str, int],
images_dir: Path,
camera_type: str,
):
"""
Saves a single image to disk using Pillow. Handles color conversion if necessary.
"""
try:
img = Image.fromarray(img_array, mode="RGB")
safe_identifier = str(camera_identifier).replace("/", "_").replace("\\", "_")
filename_prefix = f"{camera_type.lower()}_{safe_identifier}"
filename = f"{filename_prefix}.png"
path = images_dir / filename
path.parent.mkdir(parents=True, exist_ok=True)
img.save(str(path))
logger.info(f"Saved image: {path}")
except Exception as e:
logger.error(f"Failed to save image for camera {camera_identifier} (type {camera_type}): {e}")
def save_images_from_all_cameras(
output_dir: Union[str, Path],
width: int = 640,
height: int = 480,
record_time_s: int = 2,
):
"""
Connects to all detected cameras and saves a few images from each.
Args:
output_dir: Directory to save images.
width: Target width.
height: Target height.
record_time_s: Duration in seconds to record images.
"""
output_dir = Path(output_dir)
if output_dir.exists():
logger.info(f"Output directory {output_dir} exists. Removing previous content.")
shutil.rmtree(output_dir)
output_dir.mkdir(parents=True, exist_ok=True)
logger.info(f"Saving images to {output_dir}")
all_camera_metadata = find_all_cameras()
if not all_camera_metadata:
logger.warning("No cameras detected. Cannot save images.")
return
cameras_to_use = []
for cam_meta in all_camera_metadata:
cam_type = cam_meta.get("type")
cam_id = cam_meta.get("id")
instance = None
try:
if cam_type == "OpenCV":
cv_config = OpenCVCameraConfig(
index_or_path=cam_id, color_mode=ColorMode.RGB, width=width, height=height, fps=30
)
instance = OpenCVCamera(cv_config)
elif cam_type == "RealSense":
rs_config = RealSenseCameraConfig(
serial_number=str(cam_id), width=width, height=height, fps=30
)
instance = RealSenseCamera(rs_config)
else:
logger.warning(f"Unknown camera type: {cam_type} for ID {cam_id}. Skipping.")
continue
if instance:
logger.info(f"Connecting to {cam_type} camera: {cam_id}...")
instance.connect()
cameras_to_use.append({"instance": instance, "meta": cam_meta})
except Exception as e:
logger.error(f"Failed to connect or configure {cam_type} camera {cam_id}: {e}")
if instance and instance.is_connected:
instance.disconnect()
if not cameras_to_use:
logger.warning("No cameras could be connected. Aborting image save.")
return
logger.info(f"Starting image capture for {record_time_s} seconds from {len(cameras_to_use)} cameras.")
frame_index = 0
start_time = time.perf_counter()
with concurrent.futures.ThreadPoolExecutor(max_workers=len(cameras_to_use) * 2) as executor:
try:
while time.perf_counter() - start_time < record_time_s:
futures = []
for cam_dict in cameras_to_use:
cam = cam_dict["instance"]
meta = cam_dict["meta"]
cam_type_str = str(meta.get("type", "unknown"))
cam_id_str = str(meta.get("id", "unknown"))
try:
image_data = cam.read()
if image_data is None:
logger.warning(
f"No frame received from {cam_type_str} camera {cam_id_str} for frame {frame_index}."
)
continue
futures.append(
executor.submit(
save_image,
image_data,
cam_id_str,
output_dir,
cam_type_str,
)
)
except TimeoutError:
logger.warning(
f"Timeout reading from {cam_type_str} camera {cam_id_str} for frame {frame_index}."
)
except Exception as e:
logger.error(f"Error reading from {cam_type_str} camera {cam_id_str}: {e}")
concurrent.futures.wait(futures)
except KeyboardInterrupt:
logger.info("Capture interrupted by user.")
finally:
print("\nFinalizing image saving...")
executor.shutdown(wait=True)
logger.info(f"Disconnecting {len(cameras_to_use)} cameras...")
for cam_dict in cameras_to_use:
try:
if cam_dict["instance"] and cam_dict["instance"].is_connected:
cam_dict["instance"].disconnect()
except Exception as e:
logger.error(f"Error disconnecting camera {cam_dict['meta'].get('id')}: {e}")
logger.info(f"Image capture finished. Images saved to {output_dir}")
# NOTE(Steven): Add CLI for finding-cameras of just one type
# NOTE(Steven): Check why opencv detects realsense cameras
# NOTE(Steven): Check why saving cameras is buggy
# NOTE(Steven): Check how to deal with different resolutions macos
# NOTE(Steven): Ditch width height resolutions in favor of defaults
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="Unified camera utility script for listing cameras and capturing images."
)
subparsers = parser.add_subparsers(dest="command", required=True, help="Available commands")
# List cameras command
list_parser = subparsers.add_parser(
"list-cameras", help="Shows all connected cameras (OpenCV and RealSense)"
)
list_parser.set_defaults(func=lambda args: find_all_cameras())
# Capture images command
capture_parser = subparsers.add_parser("capture-images", help="Saves images from all detected cameras")
capture_parser.add_argument(
"--output-dir",
type=Path,
default="outputs/captured_images",
help="Directory to save images. Default: outputs/captured_images",
)
capture_parser.add_argument(
"--width",
type=int,
default=1920,
help="Set the capture width for all cameras. If not provided, uses camera defaults.",
)
capture_parser.add_argument(
"--height",
type=int,
default=1080,
help="Set the capture height for all cameras. If not provided, uses camera defaults.",
)
capture_parser.add_argument(
"--record-time-s",
type=float,
default=10.0,
help="Set the number of seconds to record frames. Default: 2.0 seconds.",
)
capture_parser.set_defaults(
func=lambda args: save_images_from_all_cameras(
output_dir=args.output_dir,
width=args.width,
height=args.height,
record_time_s=args.record_time_s,
)
)
args = parser.parse_args()
args.func(args)

View File

@@ -86,7 +86,7 @@ dynamixel = ["dynamixel-sdk>=3.7.31"]
feetech = ["feetech-servo-sdk>=1.0.0"]
intelrealsense = [
"pyrealsense2>=2.55.1.6486 ; sys_platform != 'darwin'",
"pyrealsense2-macosx>=2.54 ; sys_platform == 'darwin'",
"pyrealsense2-macosx>=2.54 ; sys_platform == 'darwin'", #NOTE(Steven): Check previous version for sudo issue
]
pi0 = ["transformers>=4.48.0"]
pusht = ["gym-pusht>=0.1.5 ; python_version < '4.0'"]