diff --git a/lerobot/common/cameras/intel/camera_realsense.py b/lerobot/common/cameras/intel/camera_realsense.py index cd41286f2..f7657e7d0 100644 --- a/lerobot/common/cameras/intel/camera_realsense.py +++ b/lerobot/common/cameras/intel/camera_realsense.py @@ -38,20 +38,18 @@ from lerobot.common.utils.robot_utils import ( from lerobot.common.utils.utils import capture_timestamp_utc from ..camera import Camera +from ..utils import get_cv2_rotation from .configuration_realsense import RealSenseCameraConfig SERIAL_NUMBER_INDEX = 1 -def find_cameras(raise_when_empty=True, mock=False) -> list[dict]: +def find_realsense_cameras(raise_when_empty: bool = True) -> list[dict]: """ Find the names and the serial numbers of the Intel RealSense cameras connected to the computer. """ - if mock: - import tests.cameras.mock_pyrealsense2 as rs - else: - import pyrealsense2 as rs + import pyrealsense2 as rs cameras = [] for device in rs.context().query_devices(): @@ -90,26 +88,22 @@ def save_images_from_cameras( width=None, height=None, record_time_s=2, - mock=False, ): """ Initializes all the cameras and saves images to the directory. Useful to visually identify the camera associated to a given serial number. """ if serial_numbers is None or len(serial_numbers) == 0: - camera_infos = find_cameras(mock=mock) + camera_infos = find_realsense_cameras() serial_numbers = [cam["serial_number"] for cam in camera_infos] - if mock: - import tests.cameras.mock_cv2 as cv2 - else: - import cv2 + import cv2 print("Connecting cameras") cameras = [] for cam_sn in serial_numbers: print(f"{cam_sn=}") - config = RealSenseCameraConfig(serial_number=cam_sn, fps=fps, width=width, height=height, mock=mock) + config = RealSenseCameraConfig(serial_number=cam_sn, fps=fps, width=width, height=height) camera = RealSenseCamera(config) camera.connect() print( @@ -165,6 +159,22 @@ def save_images_from_cameras( camera.disconnect() +def find_serial_number_from_name(name): + camera_infos = find_realsense_cameras() + camera_names = [cam["name"] for cam in camera_infos] + this_name_count = Counter(camera_names)[name] + if this_name_count > 1: + # TODO(aliberts): Test this with multiple identical cameras (Aloha) + raise ValueError( + f"Multiple {name} cameras have been detected. Please use their serial number to instantiate them." + ) + + name_to_serial_dict = {cam["name"]: cam["serial_number"] for cam in camera_infos} + cam_sn = name_to_serial_dict[name] + + return cam_sn + + class RealSenseCamera(Camera): """ The RealSenseCamera class is similar to OpenCVCamera class but adds additional features for Intel Real Sense cameras: @@ -220,7 +230,7 @@ class RealSenseCamera(Camera): ): self.config = config if config.name is not None: - self.serial_number = self.find_serial_number_from_name(config.name) + self.serial_number = find_serial_number_from_name(config.name) else: self.serial_number = config.serial_number @@ -241,7 +251,6 @@ class RealSenseCamera(Camera): self.color_mode = config.color_mode self.use_depth = config.use_depth self.force_hardware_reset = config.force_hardware_reset - self.mock = config.mock self.camera = None self.is_connected = False @@ -251,42 +260,17 @@ class RealSenseCamera(Camera): self.depth_map = None self.logs = {} - if self.mock: - import tests.cameras.mock_cv2 as cv2 - else: - import cv2 + self.rotation = get_cv2_rotation(config.rotation) - self.rotation = None - if config.rotation == -90: - self.rotation = cv2.ROTATE_90_COUNTERCLOCKWISE - elif config.rotation == 90: - self.rotation = cv2.ROTATE_90_CLOCKWISE - elif config.rotation == 180: - self.rotation = cv2.ROTATE_180 - - def find_serial_number_from_name(self, name): - camera_infos = find_cameras() - camera_names = [cam["name"] for cam in camera_infos] - this_name_count = Counter(camera_names)[name] - if this_name_count > 1: - # TODO(aliberts): Test this with multiple identical cameras (Aloha) - raise ValueError( - f"Multiple {name} cameras have been detected. Please use their serial number to instantiate them." - ) - - name_to_serial_dict = {cam["name"]: cam["serial_number"] for cam in camera_infos} - cam_sn = name_to_serial_dict[name] - - return cam_sn + @property + def is_connected(self) -> bool: + pass def connect(self): if self.is_connected: raise DeviceAlreadyConnectedError(f"RealSenseCamera({self.serial_number}) is already connected.") - if self.mock: - import tests.cameras.mock_pyrealsense2 as rs - else: - import pyrealsense2 as rs + import pyrealsense2 as rs config = rs.config() config.enable_device(str(self.serial_number)) @@ -319,7 +303,7 @@ class RealSenseCamera(Camera): # valid cameras. if not is_camera_open: # Verify that the provided `serial_number` is valid before printing the traceback - camera_infos = find_cameras() + camera_infos = find_realsense_cameras() serial_numbers = [cam["serial_number"] for cam in camera_infos] if self.serial_number not in serial_numbers: raise ValueError( @@ -371,10 +355,7 @@ class RealSenseCamera(Camera): f"RealSenseCamera({self.serial_number}) is not connected. Try running `camera.connect()` first." ) - if self.mock: - import tests.cameras.mock_cv2 as cv2 - else: - import cv2 + import cv2 start_time = time.perf_counter() diff --git a/lerobot/common/cameras/intel/configuration_realsense.py b/lerobot/common/cameras/intel/configuration_realsense.py index 66bb1b4f1..e61d80572 100644 --- a/lerobot/common/cameras/intel/configuration_realsense.py +++ b/lerobot/common/cameras/intel/configuration_realsense.py @@ -43,7 +43,6 @@ class RealSenseCameraConfig(CameraConfig): use_depth: bool = False force_hardware_reset: bool = True rotation: int | None = None - mock: bool = False def __post_init__(self): # bool is stronger than is None, since it works with empty strings