refactor(cameras): realsense camera init

This commit is contained in:
Simon Alibert
2025-04-21 16:14:51 +02:00
committed by Steven Palma
parent 0003fa928a
commit 656c382cac
2 changed files with 30 additions and 50 deletions

View File

@@ -38,20 +38,18 @@ from lerobot.common.utils.robot_utils import (
from lerobot.common.utils.utils import capture_timestamp_utc
from ..camera import Camera
from ..utils import get_cv2_rotation
from .configuration_realsense import RealSenseCameraConfig
SERIAL_NUMBER_INDEX = 1
def find_cameras(raise_when_empty=True, mock=False) -> list[dict]:
def find_realsense_cameras(raise_when_empty: bool = True) -> list[dict]:
"""
Find the names and the serial numbers of the Intel RealSense cameras
connected to the computer.
"""
if mock:
import tests.cameras.mock_pyrealsense2 as rs
else:
import pyrealsense2 as rs
import pyrealsense2 as rs
cameras = []
for device in rs.context().query_devices():
@@ -90,26 +88,22 @@ def save_images_from_cameras(
width=None,
height=None,
record_time_s=2,
mock=False,
):
"""
Initializes all the cameras and saves images to the directory. Useful to visually identify the camera
associated to a given serial number.
"""
if serial_numbers is None or len(serial_numbers) == 0:
camera_infos = find_cameras(mock=mock)
camera_infos = find_realsense_cameras()
serial_numbers = [cam["serial_number"] for cam in camera_infos]
if mock:
import tests.cameras.mock_cv2 as cv2
else:
import cv2
import cv2
print("Connecting cameras")
cameras = []
for cam_sn in serial_numbers:
print(f"{cam_sn=}")
config = RealSenseCameraConfig(serial_number=cam_sn, fps=fps, width=width, height=height, mock=mock)
config = RealSenseCameraConfig(serial_number=cam_sn, fps=fps, width=width, height=height)
camera = RealSenseCamera(config)
camera.connect()
print(
@@ -165,6 +159,22 @@ def save_images_from_cameras(
camera.disconnect()
def find_serial_number_from_name(name):
camera_infos = find_realsense_cameras()
camera_names = [cam["name"] for cam in camera_infos]
this_name_count = Counter(camera_names)[name]
if this_name_count > 1:
# TODO(aliberts): Test this with multiple identical cameras (Aloha)
raise ValueError(
f"Multiple {name} cameras have been detected. Please use their serial number to instantiate them."
)
name_to_serial_dict = {cam["name"]: cam["serial_number"] for cam in camera_infos}
cam_sn = name_to_serial_dict[name]
return cam_sn
class RealSenseCamera(Camera):
"""
The RealSenseCamera class is similar to OpenCVCamera class but adds additional features for Intel Real Sense cameras:
@@ -220,7 +230,7 @@ class RealSenseCamera(Camera):
):
self.config = config
if config.name is not None:
self.serial_number = self.find_serial_number_from_name(config.name)
self.serial_number = find_serial_number_from_name(config.name)
else:
self.serial_number = config.serial_number
@@ -241,7 +251,6 @@ class RealSenseCamera(Camera):
self.color_mode = config.color_mode
self.use_depth = config.use_depth
self.force_hardware_reset = config.force_hardware_reset
self.mock = config.mock
self.camera = None
self.is_connected = False
@@ -251,42 +260,17 @@ class RealSenseCamera(Camera):
self.depth_map = None
self.logs = {}
if self.mock:
import tests.cameras.mock_cv2 as cv2
else:
import cv2
self.rotation = get_cv2_rotation(config.rotation)
self.rotation = None
if config.rotation == -90:
self.rotation = cv2.ROTATE_90_COUNTERCLOCKWISE
elif config.rotation == 90:
self.rotation = cv2.ROTATE_90_CLOCKWISE
elif config.rotation == 180:
self.rotation = cv2.ROTATE_180
def find_serial_number_from_name(self, name):
camera_infos = find_cameras()
camera_names = [cam["name"] for cam in camera_infos]
this_name_count = Counter(camera_names)[name]
if this_name_count > 1:
# TODO(aliberts): Test this with multiple identical cameras (Aloha)
raise ValueError(
f"Multiple {name} cameras have been detected. Please use their serial number to instantiate them."
)
name_to_serial_dict = {cam["name"]: cam["serial_number"] for cam in camera_infos}
cam_sn = name_to_serial_dict[name]
return cam_sn
@property
def is_connected(self) -> bool:
pass
def connect(self):
if self.is_connected:
raise DeviceAlreadyConnectedError(f"RealSenseCamera({self.serial_number}) is already connected.")
if self.mock:
import tests.cameras.mock_pyrealsense2 as rs
else:
import pyrealsense2 as rs
import pyrealsense2 as rs
config = rs.config()
config.enable_device(str(self.serial_number))
@@ -319,7 +303,7 @@ class RealSenseCamera(Camera):
# valid cameras.
if not is_camera_open:
# Verify that the provided `serial_number` is valid before printing the traceback
camera_infos = find_cameras()
camera_infos = find_realsense_cameras()
serial_numbers = [cam["serial_number"] for cam in camera_infos]
if self.serial_number not in serial_numbers:
raise ValueError(
@@ -371,10 +355,7 @@ class RealSenseCamera(Camera):
f"RealSenseCamera({self.serial_number}) is not connected. Try running `camera.connect()` first."
)
if self.mock:
import tests.cameras.mock_cv2 as cv2
else:
import cv2
import cv2
start_time = time.perf_counter()

View File

@@ -43,7 +43,6 @@ class RealSenseCameraConfig(CameraConfig):
use_depth: bool = False
force_hardware_reset: bool = True
rotation: int | None = None
mock: bool = False
def __post_init__(self):
# bool is stronger than is None, since it works with empty strings