chore(cameras): move connect under is_connected

This commit is contained in:
Steven Palma
2025-05-20 17:09:16 +02:00
parent efd2849184
commit 99ea24d6a3
2 changed files with 62 additions and 62 deletions

View File

@@ -141,30 +141,6 @@ class OpenCVCamera(Camera):
"""Checks if the camera is currently connected and opened."""
return isinstance(self.videocapture, cv2.VideoCapture) and self.videocapture.isOpened()
def _configure_capture_settings(self) -> None:
"""
Applies the specified FPS, width, and height settings to the connected camera.
This method attempts to set the camera properties via OpenCV. It checks if
the camera successfully applied the settings and raises an error if not.
Args:
fps: The desired frames per second. If None, the setting is skipped.
width: The desired capture width. If None, the setting is skipped.
height: The desired capture height. If None, the setting is skipped.
Raises:
RuntimeError: If the camera fails to set any of the specified properties
to the requested value.
DeviceNotConnectedError: If the camera is not connected when attempting
to configure settings.
"""
if not self.is_connected:
raise DeviceNotConnectedError(f"Cannot configure settings for {self} as it is not connected.")
self._validate_fps()
self._validate_width_and_height()
def connect(self, warmup: bool = True):
"""
Connects to the OpenCV camera specified in the configuration.
@@ -203,6 +179,30 @@ class OpenCVCamera(Camera):
logger.debug(f"Camera {self.index_or_path} connected and configured successfully.")
def _configure_capture_settings(self) -> None:
"""
Applies the specified FPS, width, and height settings to the connected camera.
This method attempts to set the camera properties via OpenCV. It checks if
the camera successfully applied the settings and raises an error if not.
Args:
fps: The desired frames per second. If None, the setting is skipped.
width: The desired capture width. If None, the setting is skipped.
height: The desired capture height. If None, the setting is skipped.
Raises:
RuntimeError: If the camera fails to set any of the specified properties
to the requested value.
DeviceNotConnectedError: If the camera is not connected when attempting
to configure settings.
"""
if not self.is_connected:
raise DeviceNotConnectedError(f"Cannot configure settings for {self} as it is not connected.")
self._validate_fps()
self._validate_width_and_height()
def _validate_fps(self) -> None:
"""Validates and sets the camera's frames per second (FPS)."""

View File

@@ -156,6 +156,44 @@ class RealSenseCamera(Camera):
"""Checks if the camera pipeline is started and streams are active."""
return self.rs_pipeline is not None and self.rs_profile is not None
def connect(self, warmup: bool = True):
"""
Connects to the RealSense camera specified in the configuration.
Initializes the RealSense pipeline, configures the required streams (color
and optionally depth), starts the pipeline, and validates the actual stream settings.
Raises:
DeviceAlreadyConnectedError: If the camera is already connected.
ValueError: If the configuration is invalid (e.g., missing serial/name, name not unique).
ConnectionError: If the camera is found but fails to start the pipeline or no RealSense devices are detected at all.
RuntimeError: If the pipeline starts but fails to apply requested settings.
"""
if self.is_connected:
raise DeviceAlreadyConnectedError(f"{self} is already connected.")
self.rs_pipeline = rs.pipeline()
rs_config = self._make_rs_pipeline_config()
try:
self.rs_profile = self.rs_pipeline.start(rs_config)
logger.debug(f"Successfully started pipeline for camera {self.serial_number}.")
except RuntimeError as e:
self.rs_profile = None
self.rs_pipeline = None
raise ConnectionError(
f"Failed to open {self} camera. Run 'python -m find_cameras' for details about the available cameras in your system."
) from e
logger.debug(f"Validating stream configuration for {self}...")
self._validate_capture_settings()
if warmup:
logger.debug(f"Reading a warm-up frame for {self}...")
self.read() # NOTE(Steven): For now we just read one frame, we could also loop for X frames/secs
logger.info(f"{self} connected.")
@staticmethod
def find_cameras() -> List[Dict[str, Any]]:
"""
@@ -285,44 +323,6 @@ class RealSenseCamera(Camera):
self.rs_profile.get_stream(rs.stream.depth).as_video_stream_profile()
)
def connect(self, warmup: bool = True):
"""
Connects to the RealSense camera specified in the configuration.
Initializes the RealSense pipeline, configures the required streams (color
and optionally depth), starts the pipeline, and validates the actual stream settings.
Raises:
DeviceAlreadyConnectedError: If the camera is already connected.
ValueError: If the configuration is invalid (e.g., missing serial/name, name not unique).
ConnectionError: If the camera is found but fails to start the pipeline or no RealSense devices are detected at all.
RuntimeError: If the pipeline starts but fails to apply requested settings.
"""
if self.is_connected:
raise DeviceAlreadyConnectedError(f"{self} is already connected.")
self.rs_pipeline = rs.pipeline()
rs_config = self._make_rs_pipeline_config()
try:
self.rs_profile = self.rs_pipeline.start(rs_config)
logger.debug(f"Successfully started pipeline for camera {self.serial_number}.")
except RuntimeError as e:
self.rs_profile = None
self.rs_pipeline = None
raise ConnectionError(
f"Failed to open {self} camera. Run 'python -m find_cameras' for details about the available cameras in your system."
) from e
logger.debug(f"Validating stream configuration for {self}...")
self._validate_capture_settings()
if warmup:
logger.debug(f"Reading a warm-up frame for {self}...")
self.read() # NOTE(Steven): For now we just read one frame, we could also loop for X frames/secs
logger.info(f"{self} connected.")
def _validate_fps(self, stream: rs.video_stream_profile) -> None:
"""Validates and sets the internal FPS based on actual stream FPS."""
actual_fps = stream.fps()