Apply suggestions from code review camera_realsense.py

Co-authored-by: Simon Alibert <75076266+aliberts@users.noreply.github.com>
This commit is contained in:
Steven Palma
2025-05-20 13:40:45 +02:00
committed by GitHub
parent 295b96c539
commit 1f2cfd3828

View File

@@ -68,25 +68,23 @@ class RealSenseCamera(Camera):
Example:
```python
from lerobot.common.cameras.intel.camera_realsense import RealSenseCamera
from lerobot.common.cameras.intel.configuration_realsense import RealSenseCameraConfig
from lerobot.common.cameras.configs import ColorMode
from lerobot.common.cameras.intel import RealSenseCamera, RealSenseCameraConfig
from lerobot.common.cameras import ColorMode
# Basic usage with serial number
config = RealSenseCameraConfig(serial_number="1234567890") # Replace with actual SN
camera = RealSenseCamera(config)
try:
camera.connect()
print(f"Connected to {camera}")
color_image = camera.read() # Synchronous read (color only)
print(f"Read frame shape: {color_image.shape}")
async_image = camera.async_read() # Asynchronous read
print(f"Async read frame shape: {async_image.shape}")
except Exception as e:
print(f"An error occurred: {e}")
finally:
camera.disconnect()
print(f"Disconnected from {camera}")
camera.connect()
# Read 1 frame synchronously
color_image = camera.read()
print(color_image.shape)
# Read 1 frame asynchronously
async_image = camera.async_read()
# When done, properly disconnect the camera using
camera.disconnect()
# Example with depth capture and custom settings
custom_config = RealSenseCameraConfig(
@@ -132,10 +130,10 @@ class RealSenseCamera(Camera):
else:
raise ValueError("RealSenseCameraConfig must provide either 'serial_number' or 'name'.")
self.fps: int | None = config.fps
self.channels: int = config.channels
self.color_mode: ColorMode = config.color_mode
self.use_depth: bool = config.use_depth
self.fps = config.fps
self.channels = config.channels
self.color_mode = config.color_mode
self.use_depth = config.use_depth
self.rs_pipeline: rs.pipeline | None = None
self.rs_profile: rs.pipeline_profile | None = None
@@ -144,7 +142,6 @@ class RealSenseCamera(Camera):
self.stop_event: Event | None = None
self.frame_queue: queue.Queue = queue.Queue(maxsize=1)
self.logs: dict = {} # For timestamping or other metadata
self.rotation: int | None = get_cv2_rotation(config.rotation)
@@ -155,7 +152,6 @@ class RealSenseCamera(Camera):
self.prerotated_width, self.prerotated_height = self.width, self.height
def __str__(self) -> str:
"""Returns a string representation of the camera instance."""
return f"{self.__class__.__name__}({self.serial_number})"
@property
@@ -164,34 +160,23 @@ class RealSenseCamera(Camera):
return self.rs_pipeline is not None and self.rs_profile is not None
@staticmethod
def find_cameras(raise_when_empty: bool = True) -> List[Dict[str, Any]]:
def find_cameras() -> List[Dict[str, Any]]:
"""
Detects available Intel RealSense cameras connected to the system.
Args:
raise_when_empty (bool): If True, raises an OSError if no cameras are found.
Returns:
List[Dict[str, Any]]: A list of dictionaries,
where each dictionary contains 'type', 'id' (serial number), 'name',
firmware version, USB type, and other available specs, and the default profile properties (width, height, fps, format).
Raises:
OSError: If `raise_when_empty` is True and no cameras are detected,
or if pyrealsense2 is not installed.
OSError: If pyrealsense2 is not installed.
ImportError: If pyrealsense2 is not installed.
"""
found_cameras_info = []
context = rs.context()
devices = context.query_devices()
if not devices:
logger.warning("No RealSense devices detected.")
if raise_when_empty:
raise OSError(
"No RealSense devices detected. Ensure cameras are connected, "
"library (`pyrealsense2`) is installed, and firmware is up-to-date."
)
for device in devices:
camera_info = {
@@ -250,7 +235,7 @@ class RealSenseCamera(Camera):
logger.info(f"Found serial number '{serial_number}' for camera name '{name}'.")
return serial_number
def _configure_realsense_settings(self) -> rs.config:
def _make_rs_pipeline_config(self) -> rs.config:
"""Creates and configures the RealSense pipeline configuration object."""
rs_config = rs.config()
rs.config.enable_device(rs_config, self.serial_number)
@@ -304,7 +289,7 @@ class RealSenseCamera(Camera):
self.rs_profile.get_stream(rs.stream.depth).as_video_stream_profile()
)
def connect(self, do_warmup_read: bool = True):
def connect(self, warmup: bool = True):
"""
Connects to the RealSense camera specified in the configuration.
@@ -314,14 +299,12 @@ class RealSenseCamera(Camera):
Raises:
DeviceAlreadyConnectedError: If the camera is already connected.
ValueError: If the configuration is invalid (e.g., missing serial/name, name not unique).
ConnectionError: If the camera is found but fails to start the pipeline.
ConnectionError: If the camera is found but fails to start the pipeline or no RealSense devices are detected at all.
RuntimeError: If the pipeline starts but fails to apply requested settings.
OSError: If no RealSense devices are detected at all.
"""
if self.is_connected:
raise DeviceAlreadyConnectedError(f"{self} is already connected.")
logger.debug(f"Attempting to connect to camera {self.serial_number}...")
self.rs_pipeline = rs.pipeline()
rs_config = self._configure_realsense_settings()
@@ -332,34 +315,29 @@ class RealSenseCamera(Camera):
self.rs_profile = None
self.rs_pipeline = None
raise ConnectionError(
f"Failed to open RealSense camera {self.serial_number}. Error: {e}. "
f"Failed to open {self} camera. "
f"Run 'python -m find_cameras list-cameras' for details."
) from e
logger.debug(f"Validating stream configuration for {self.serial_number}...")
logger.debug(f"Validating stream configuration for {self}...")
self._validate_capture_settings()
if do_warmup_read:
logger.debug(f"Reading a warm-up frame for {self.serial_number}...")
if warmup:
logger.debug(f"Reading a warm-up frame for {self}...")
self.read() # NOTE(Steven): For now we just read one frame, we could also loop for X frames/secs
logger.info(f"Camera {self.serial_number} connected and configured successfully.")
logger.info(f"{self} connected.")
def _validate_fps(self, stream) -> None:
def _validate_fps(self, stream: rs.video_stream_profile) -> None:
"""Validates and sets the internal FPS based on actual stream FPS."""
actual_fps = stream.fps()
if self.fps is None:
self.fps = actual_fps
logger.info(f"FPS not specified, using camera default: {self.fps} FPS.")
return
# Use math.isclose for robust float comparison
if not math.isclose(self.fps, actual_fps, rel_tol=1e-3):
logger.warning(
f"Requested FPS {self.fps} for {self}, but camera reported {actual_fps}. "
"This might be due to camera limitations."
)
raise RuntimeError(
f"Failed to set requested FPS {self.fps} for {self}. Actual value reported: {actual_fps}."
)
@@ -399,7 +377,7 @@ class RealSenseCamera(Camera):
)
logger.debug(f"Capture height set to {actual_height} for {self}.")
def read_depth(self, timeout_ms: int = 5000) -> np.ndarray:
def read_depth(self, timeout_ms: int = 100) -> np.ndarray:
"""
Reads a single frame (depth) synchronously from the camera.
@@ -407,7 +385,7 @@ class RealSenseCamera(Camera):
from the camera hardware via the RealSense pipeline.
Args:
timeout_ms (int): Maximum time in milliseconds to wait for a frame. Defaults to 5000ms.
timeout_ms (int): Maximum time in milliseconds to wait for a frame. Defaults to 100ms.
Returns:
np.ndarray: The depth map as a NumPy array (height, width)
@@ -428,13 +406,11 @@ class RealSenseCamera(Camera):
start_time = time.perf_counter()
ret, frame = self.rs_pipeline.try_wait_for_frames(
timeout_ms=timeout_ms
) # NOTE(Steven): This read has a timeout
ret, frame = self.rs_pipeline.try_wait_for_frames(timeout_ms=timeout_ms)
if not ret or frame is None:
raise RuntimeError(
f"Failed to capture frame from {self}. '.read_depth()' returned status={ret} and frame is None."
f"{self} failed to capture frame. Returned status='{ret}'."
)
depth_frame = frame.get_depth_frame()
@@ -448,7 +424,7 @@ class RealSenseCamera(Camera):
self.logs["timestamp_utc"] = capture_timestamp_utc()
return depth_map_processed
def read(self, color_mode: ColorMode | None = None, timeout_ms: int = 5000) -> np.ndarray:
def read(self, color_mode: ColorMode | None = None, timeout_ms: int = 100) -> np.ndarray:
"""
Reads a single frame (color) synchronously from the camera.
@@ -456,7 +432,7 @@ class RealSenseCamera(Camera):
from the camera hardware via the RealSense pipeline.
Args:
timeout_ms (int): Maximum time in milliseconds to wait for a frame. Defaults to 5000ms.
timeout_ms (int): Maximum time in milliseconds to wait for a frame. Defaults to 100ms.
Returns:
np.ndarray: The captured color frame as a NumPy array
@@ -569,7 +545,7 @@ class RealSenseCamera(Camera):
logger.debug(f"Stopping read loop thread for {self}.")
def _ensure_read_thread_running(self):
def _start_read_thread(self) -> None:
"""Starts or restarts the background read thread if it's not running."""
if self.thread is not None and self.thread.is_alive():
self.thread.join(timeout=0.1)
@@ -578,14 +554,14 @@ class RealSenseCamera(Camera):
self.stop_event = Event()
self.thread = Thread(
target=self._read_loop, args=(), name=f"RealSenseReadLoop-{self}-{self.serial_number}"
target=self._read_loop, args=(), name=f"{self}_read_loop"
)
self.thread.daemon = True
self.thread.start()
logger.debug(f"Read thread started for {self}.")
# NOTE(Steven): Missing implementation for depth for now
def async_read(self, timeout_ms: float = 2000) -> np.ndarray:
def async_read(self, timeout_ms: float = 100) -> np.ndarray:
"""
Reads the latest available frame data (color or color+depth) asynchronously.
@@ -596,7 +572,7 @@ class RealSenseCamera(Camera):
Args:
timeout_ms (float): Maximum time in milliseconds to wait for a frame
to become available in the queue. Defaults to 2000ms (2 seconds).
to become available in the queue. Defaults to 100ms (0.1 seconds).
Returns:
np.ndarray:
@@ -663,8 +639,6 @@ class RealSenseCamera(Camera):
f"Attempted to disconnect {self}, but it appears already disconnected."
)
logger.debug(f"Disconnecting from camera {self.serial_number}...")
if self.thread is not None:
self._shutdown_read_thread()
@@ -674,4 +648,4 @@ class RealSenseCamera(Camera):
self.rs_pipeline = None
self.rs_profile = None
logger.info(f"Camera {self.serial_number} disconnected successfully.")
logger.info(f"{self} disconnected.")