refactor(cameras): improvements utils functionalities v0.2

This commit is contained in:
Steven Palma
2025-05-09 13:56:59 +02:00
parent 7f34e1af9c
commit ddd8fd325b
4 changed files with 227 additions and 152 deletions

View File

@@ -22,7 +22,7 @@ import math
import queue
import time
from threading import Event, Thread
from typing import Dict, List, Tuple, Union
from typing import Any, Dict, List, Tuple, Union
import cv2
import numpy as np
@@ -155,7 +155,7 @@ class RealSenseCamera(Camera):
return self.rs_pipeline is not None and self.rs_profile is not None
@staticmethod
def find_cameras(raise_when_empty: bool = True) -> List[Dict[str, Union[str, int, float]]]:
def find_cameras(raise_when_empty: bool = True) -> List[Dict[str, Any]]:
"""
Detects available Intel RealSense cameras connected to the system.
@@ -163,13 +163,14 @@ class RealSenseCamera(Camera):
raise_when_empty (bool): If True, raises an OSError if no cameras are found.
Returns:
List[Dict[str, Union[str, int, float]]]: A list of dictionaries,
where each dictionary contains 'type', 'serial_number', 'name',
firmware version, USB type, and other available specs.
List[Dict[str, Any]]: A list of dictionaries,
where each dictionary contains 'type', 'id' (serial number), 'name',
firmware version, USB type, and other available specs, and the default profile properties (width, height, fps, format).
Raises:
OSError: If `raise_when_empty` is True and no cameras are detected,
or if pyrealsense2 is not installed.
ImportError: If pyrealsense2 is not installed.
"""
found_cameras_info = []
context = rs.context()
@@ -185,19 +186,37 @@ class RealSenseCamera(Camera):
for device in devices:
camera_info = {
"name": device.get_info(rs.camera_info.name),
"type": "RealSense",
"serial_number": device.get_info(rs.camera_info.serial_number),
"id": device.get_info(rs.camera_info.serial_number),
"firmware_version": device.get_info(rs.camera_info.firmware_version),
"usb_type_descriptor": device.get_info(rs.camera_info.usb_type_descriptor),
"physical_port": device.get_info(rs.camera_info.physical_port),
"product_id": device.get_info(rs.camera_info.product_id),
"product_line": device.get_info(rs.camera_info.product_line),
"name": device.get_info(rs.camera_info.name),
}
# Get stream profiles for each sensor
sensors = device.query_sensors()
for sensor in sensors:
profiles = sensor.get_stream_profiles()
for profile in profiles:
if profile.is_video_stream_profile() and profile.is_default():
vprofile = profile.as_video_stream_profile()
stream_info = {
"stream_type": vprofile.stream_name(),
"format": vprofile.format().name,
"width": vprofile.width(),
"height": vprofile.height(),
"fps": vprofile.fps(),
}
camera_info["default_stream_profile"] = stream_info
found_cameras_info.append(camera_info)
logger.debug(f"Found RealSense camera: {camera_info}")
logger.info(f"Detected RealSense cameras: {[cam['serial_number'] for cam in found_cameras_info]}")
logger.info(f"Detected RealSense cameras: {[cam['id'] for cam in found_cameras_info]}")
return found_cameras_info
def _find_serial_number_from_name(self, name: str) -> str:

View File

@@ -69,11 +69,3 @@ class RealSenseCameraConfig(CameraConfig):
raise ValueError(
f"One of them must be set: name or serial_number, but {self.name=} and {self.serial_number=} provided."
)
at_least_one_is_not_none = self.fps is not None or self.width is not None or self.height is not None
at_least_one_is_none = self.fps is None or self.width is None or self.height is None
if at_least_one_is_not_none and at_least_one_is_none:
raise ValueError(
"For `fps`, `width` and `height`, either all of them need to be set, or none of them, "
f"but {self.fps=}, {self.width=}, {self.height=} were provided."
)

View File

@@ -24,7 +24,7 @@ import queue
import time
from pathlib import Path
from threading import Event, Thread
from typing import Dict, List, Union
from typing import Any, Dict, List
import cv2
import numpy as np
@@ -59,7 +59,7 @@ class OpenCVCamera(Camera):
or port changes, especially on Linux. Use the provided utility script to find
available camera indices or paths:
```bash
NOTE(Steven): Point to future util
python -m lerobot.find_cameras
```
The camera's default settings (FPS, resolution, color mode) are used unless
@@ -132,7 +132,7 @@ class OpenCVCamera(Camera):
self.logs: dict = {} # NOTE(Steven): Might be removed in the future
self.rotation: int | None = get_cv2_rotation(config.rotation)
self.backend: int = get_cv2_backend()
self.backend: int = get_cv2_backend() # NOTE(Steven): If I specify backend the opencv open fails
def __str__(self) -> str:
"""Returns a string representation of the camera instance."""
@@ -195,7 +195,7 @@ class OpenCVCamera(Camera):
cv2.setNumThreads(1)
logger.debug(f"Attempting to connect to camera {self.index_or_path} using backend {self.backend}...")
self.videocapture_camera = cv2.VideoCapture(self.index_or_path, self.backend)
self.videocapture_camera = cv2.VideoCapture(self.index_or_path)
if not self.videocapture_camera.isOpened():
self.videocapture_camera.release()
@@ -273,7 +273,7 @@ class OpenCVCamera(Camera):
@staticmethod
def find_cameras(
max_index_search_range=MAX_OPENCV_INDEX, raise_when_empty: bool = True
) -> List[Dict[str, Union[str, int, float]]]:
) -> List[Dict[str, Any]]:
"""
Detects available OpenCV cameras connected to the system.
@@ -285,9 +285,9 @@ class OpenCVCamera(Camera):
raise_when_empty (bool): If True, raises an OSError if no cameras are found.
Returns:
List[Dict[str, Union[str, int, float]]]: A list of dictionaries,
List[Dict[str, Any]]: A list of dictionaries,
where each dictionary contains 'type', 'id' (port index or path),
'default_width', 'default_height', and 'default_fps'.
and the default profile properties (width, height, fps, format).
"""
found_cameras_info = []
@@ -303,20 +303,25 @@ class OpenCVCamera(Camera):
targets_to_scan = list(range(max_index_search_range))
for target in targets_to_scan:
camera = cv2.VideoCapture(target, get_cv2_backend())
camera = cv2.VideoCapture(target)
if camera.isOpened():
default_width = int(camera.get(cv2.CAP_PROP_FRAME_WIDTH))
default_height = int(camera.get(cv2.CAP_PROP_FRAME_HEIGHT))
default_fps = camera.get(cv2.CAP_PROP_FPS)
default_format = camera.get(cv2.CAP_PROP_FORMAT)
camera_info = {
"name": f"OpenCV Camera @ {target}",
"type": "OpenCV",
"id": target,
"default_width": default_width,
"default_height": default_height,
"default_fps": default_fps,
"backend_api": camera.getBackendName(),
"default_stream_profile": {
"format": default_format,
"width": default_width,
"height": default_height,
"fps": default_fps,
},
}
found_cameras_info.append(camera_info)
logger.debug(f"Found OpenCV camera:: {camera_info}")
camera.release()