chore(cameras): move if check out of _validate functions

This commit is contained in:
Steven Palma
2025-05-20 17:24:51 +02:00
parent 99ea24d6a3
commit 39f908e9db
3 changed files with 50 additions and 47 deletions

View File

@@ -200,16 +200,29 @@ class OpenCVCamera(Camera):
if not self.is_connected:
raise DeviceNotConnectedError(f"Cannot configure settings for {self} as it is not connected.")
self._validate_fps()
self._validate_width_and_height()
def _validate_fps(self) -> None:
"""Validates and sets the camera's frames per second (FPS)."""
if self.fps is None:
self.fps = self.videocapture.get(cv2.CAP_PROP_FPS)
logger.info(f"FPS set to camera default: {self.fps}.")
return
else:
self._validate_fps()
default_width = int(round(self.videocapture.get(cv2.CAP_PROP_FRAME_WIDTH)))
default_height = int(round(self.videocapture.get(cv2.CAP_PROP_FRAME_HEIGHT)))
if self.width is None or self.height is None:
if self.rotation in [cv2.ROTATE_90_CLOCKWISE, cv2.ROTATE_90_COUNTERCLOCKWISE]:
self.width, self.height = default_height, default_width
self.prerotated_width, self.prerotated_height = default_width, default_height
else:
self.width, self.height = default_width, default_height
self.prerotated_width, self.prerotated_height = default_width, default_height
logger.info(f"Capture width set to camera default: {self.width}.")
logger.info(f"Capture height set to camera default: {self.height}.")
else:
self._validate_width_and_height()
def _validate_fps(self) -> None:
"""Validates and sets the camera's frames per second (FPS)."""
success = self.videocapture.set(cv2.CAP_PROP_FPS, float(self.fps))
actual_fps = self.videocapture.get(cv2.CAP_PROP_FPS)
@@ -227,20 +240,6 @@ class OpenCVCamera(Camera):
def _validate_width_and_height(self) -> None:
"""Validates and sets the camera's frame capture width and height."""
default_width = int(round(self.videocapture.get(cv2.CAP_PROP_FRAME_WIDTH)))
default_height = int(round(self.videocapture.get(cv2.CAP_PROP_FRAME_HEIGHT)))
if self.width is None or self.height is None:
if self.rotation in [cv2.ROTATE_90_CLOCKWISE, cv2.ROTATE_90_COUNTERCLOCKWISE]:
self.width, self.height = default_height, default_width
self.prerotated_width, self.prerotated_height = default_width, default_height
else:
self.width, self.height = default_width, default_height
self.prerotated_width, self.prerotated_height = default_width, default_height
logger.info(f"Capture width set to camera default: {self.width}.")
logger.info(f"Capture height set to camera default: {self.height}.")
return
success = self.videocapture.set(cv2.CAP_PROP_FRAME_WIDTH, float(self.prerotated_width))
actual_width = int(round(self.videocapture.get(cv2.CAP_PROP_FRAME_WIDTH)))
if not success or self.prerotated_width != actual_width:

View File

@@ -314,32 +314,12 @@ class RealSenseCamera(Camera):
if not self.is_connected:
raise DeviceNotConnectedError(f"Cannot validate settings for {self} as it is not connected.")
self._validate_fps(self.rs_profile.get_stream(rs.stream.color).as_video_stream_profile())
self._validate_width_and_height(self.rs_profile.get_stream(rs.stream.color).as_video_stream_profile())
if self.use_depth:
self._validate_fps(self.rs_profile.get_stream(rs.stream.depth).as_video_stream_profile())
self._validate_width_and_height(
self.rs_profile.get_stream(rs.stream.depth).as_video_stream_profile()
)
def _validate_fps(self, stream: rs.video_stream_profile) -> None:
"""Validates and sets the internal FPS based on actual stream FPS."""
actual_fps = stream.fps()
stream = self.rs_profile.get_stream(rs.stream.color).as_video_stream_profile()
if self.fps is None:
self.fps = actual_fps
return
self.fps = stream.fps()
else:
self._validate_fps(stream)
# Use math.isclose for robust float comparison
if not math.isclose(self.fps, actual_fps, rel_tol=1e-3):
raise RuntimeError(
f"Failed to set requested FPS {self.fps} for {self}. Actual value reported: {actual_fps}."
)
logger.debug(f"FPS set to {actual_fps} for {self}.")
def _validate_width_and_height(self, stream) -> None:
"""Validates and sets the internal capture width and height based on actual stream width."""
actual_width = int(round(stream.width()))
actual_height = int(round(stream.height()))
@@ -352,7 +332,29 @@ class RealSenseCamera(Camera):
self.prerotated_width, self.prerotated_height = actual_width, actual_height
logger.info(f"Capture width set to camera default: {self.width}.")
logger.info(f"Capture height set to camera default: {self.height}.")
return
else:
self._validate_width_and_height(stream)
if self.use_depth:
stream = self.rs_profile.get_stream(rs.stream.depth).as_video_stream_profile()
self._validate_fps(stream)
self._validate_width_and_height(stream)
def _validate_fps(self, stream: rs.video_stream_profile) -> None:
"""Validates and sets the internal FPS based on actual stream FPS."""
actual_fps = stream.fps()
# Use math.isclose for robust float comparison
if not math.isclose(self.fps, actual_fps, rel_tol=1e-3):
raise RuntimeError(
f"Failed to set requested FPS {self.fps} for {self}. Actual value reported: {actual_fps}."
)
logger.debug(f"FPS set to {actual_fps} for {self}.")
def _validate_width_and_height(self, stream) -> None:
"""Validates and sets the internal capture width and height based on actual stream width."""
actual_width = int(round(stream.width()))
actual_height = int(round(stream.height()))
if self.prerotated_width != actual_width:
logger.warning(

View File

@@ -154,7 +154,9 @@ def test_async_read_timeout(mock_enable_device):
try:
with pytest.raises(TimeoutError):
camera.async_read(timeout_ms=0)
camera.async_read(
timeout_ms=0
) # NOTE(Steven): This is flaky as sdometimes we actually get a frame
finally:
if camera.is_connected:
camera.disconnect()