chore(cameras): address review comments + make test pass again

This commit is contained in:
Steven Palma
2025-05-20 14:42:05 +02:00
parent 3a08eddeab
commit 41179d0996
13 changed files with 150 additions and 127 deletions

View File

@@ -22,7 +22,35 @@ from .configs import CameraConfig, ColorMode
class Camera(abc.ABC):
"""Base class for camera implementations.
Defines a standard interface for camera operations across different backends.
Subclasses must implement all abstract methods.
Manages basic camera properties (FPS, resolution) and core operations:
- Connection/disconnection
- Frame capture (sync/async)
Attributes:
fps (int | None): Configured frames per second
width (int | None): Frame width in pixels
height (int | None): Frame height in pixels
Example:
class MyCamera(Camera):
def __init__(self, config): ...
@property
def is_connected(self) -> bool: ...
def connect(self, warmup=True): ...
# Plus other required methods
"""
def __init__(self, config: CameraConfig):
"""Initialize the camera with the given configuration.
Args:
config: Camera configuration containing FPS and resolution.
"""
self.fps: int | None = config.fps
self.width: int | None = config.width
self.height: int | None = config.height
@@ -30,20 +58,50 @@ class Camera(abc.ABC):
@property
@abc.abstractmethod
def is_connected(self) -> bool:
"""Check if the camera is currently connected.
Returns:
bool: True if the camera is connected and ready to capture frames,
False otherwise.
"""
pass
@abc.abstractmethod
def connect(self, warmup: bool = True) -> None:
"""Establish connection to the camera.
Args:
warmup: If True (default), captures a warmup frame before returning.
"""
pass
@abc.abstractmethod
def read(self, color_mode: ColorMode | None = None) -> np.ndarray:
"""Capture and return a single frame from the camera.
Args:
color_mode: Desired color mode for the output frame. If None,
uses the camera's default color mode.
Returns:
np.ndarray: Captured frame as a numpy array.
"""
pass
@abc.abstractmethod
def async_read(self, timeout_ms: float = ...) -> np.ndarray:
"""Asynchronously capture and return a single frame from the camera.
Args:
timeout_ms: Maximum time to wait for a frame in milliseconds.
Defaults to implementation-specific timeout.
Returns:
np.ndarray: Captured frame as a numpy array.
"""
pass
@abc.abstractmethod
def disconnect(self) -> None:
"""Disconnect from the camera and release resources."""
pass

View File

@@ -29,7 +29,6 @@ import numpy as np
import pyrealsense2 as rs
from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
from lerobot.common.utils.utils import capture_timestamp_utc
from ..camera import Camera
from ..configs import ColorMode
@@ -131,7 +130,6 @@ class RealSenseCamera(Camera):
raise ValueError("RealSenseCameraConfig must provide either 'serial_number' or 'name'.")
self.fps = config.fps
self.channels = config.channels
self.color_mode = config.color_mode
self.use_depth = config.use_depth
@@ -213,7 +211,7 @@ class RealSenseCamera(Camera):
def _find_serial_number_from_name(self, name: str) -> str:
"""Finds the serial number for a given unique camera name."""
camera_infos = self.find_cameras(raise_when_empty=True)
camera_infos = self.find_cameras()
found_devices = [cam for cam in camera_infos if str(cam["name"]) == name]
if not found_devices:
@@ -304,7 +302,7 @@ class RealSenseCamera(Camera):
raise DeviceAlreadyConnectedError(f"{self} is already connected.")
self.rs_pipeline = rs.pipeline()
rs_config = self._configure_realsense_settings()
rs_config = self._make_rs_pipeline_config()
try:
self.rs_profile = self.rs_pipeline.start(rs_config)
@@ -313,7 +311,7 @@ class RealSenseCamera(Camera):
self.rs_profile = None
self.rs_pipeline = None
raise ConnectionError(
f"Failed to open {self} camera. Run 'python -m find_cameras list-cameras' for details."
f"Failed to open {self} camera. Run 'python -m find_cameras' for details about the available cameras in your system."
) from e
logger.debug(f"Validating stream configuration for {self}...")
@@ -416,7 +414,6 @@ class RealSenseCamera(Camera):
read_duration_ms = (time.perf_counter() - start_time) * 1e3
logger.debug(f"{self} synchronous read took: {read_duration_ms:.1f}ms")
self.logs["timestamp_utc"] = capture_timestamp_utc()
return depth_map_processed
def read(self, color_mode: ColorMode | None = None, timeout_ms: int = 100) -> np.ndarray:
@@ -461,7 +458,6 @@ class RealSenseCamera(Camera):
read_duration_ms = (time.perf_counter() - start_time) * 1e3
logger.debug(f"{self} synchronous read took: {read_duration_ms:.1f}ms")
self.logs["timestamp_utc"] = capture_timestamp_utc()
return color_image_processed
def _postprocess_image(
@@ -492,11 +488,7 @@ class RealSenseCamera(Camera):
if depth_frame:
h, w = image.shape
else:
h, w, c = image.shape
if c != self.channels:
logger.warning(
f"Captured frame channels ({c}) do not match configured channels ({self.channels}) for {self}."
)
h, w, _c = image.shape
if h != self.prerotated_height or w != self.prerotated_width:
raise RuntimeError(
@@ -580,7 +572,7 @@ class RealSenseCamera(Camera):
raise DeviceNotConnectedError(f"{self} is not connected.")
if self.thread is None or not self.thread.is_alive():
self._ensure_read_thread_running()
self._start_read_thread()
try:
return self.frame_queue.get(timeout=timeout_ms / 1000.0)
@@ -600,7 +592,7 @@ class RealSenseCamera(Camera):
f"Error getting frame data from queue for camera {self.serial_number}: {e}"
) from e
def _shutdown_read_thread(self):
def _stop_read_thread(self):
"""Signals the background read thread to stop and waits for it to join."""
if self.stop_event is not None:
logger.debug(f"Signaling stop event for read thread of {self}.")
@@ -633,7 +625,7 @@ class RealSenseCamera(Camera):
)
if self.thread is not None:
self._shutdown_read_thread()
self._stop_read_thread()
if self.rs_pipeline is not None:
logger.debug(f"Stopping RealSense pipeline object for {self}.")

View File

@@ -44,7 +44,6 @@ class RealSenseCameraConfig(CameraConfig):
serial_number: Optional unique serial number to identify the camera.
Either name or serial_number must be provided.
color_mode: Color mode for image output (RGB or BGR). Defaults to RGB.
channels: Number of color channels (currently only 3 is supported).
use_depth: Whether to enable depth stream. Defaults to False.
rotation: Image rotation setting (0°, 90°, 180°, or 270°). Defaults to no rotation.
@@ -58,7 +57,6 @@ class RealSenseCameraConfig(CameraConfig):
name: str | None = None
serial_number: int | None = None
color_mode: ColorMode = ColorMode.RGB
channels: int | None = 3
use_depth: bool = False
rotation: Cv2Rotation = Cv2Rotation.NO_ROTATION # NOTE(Steven): Check if draccus can parse to an enum
@@ -78,9 +76,6 @@ class RealSenseCameraConfig(CameraConfig):
f"`rotation` is expected to be in {(Cv2Rotation.NO_ROTATION, Cv2Rotation.ROTATE_90, Cv2Rotation.ROTATE_180, Cv2Rotation.ROTATE_270)}, but {self.rotation} is provided."
)
if self.channels != 3:
raise NotImplementedError(f"Unsupported number of channels: {self.channels}")
if bool(self.name) and bool(self.serial_number):
raise ValueError(
f"One of them must be set: name or serial_number, but {self.name=} and {self.serial_number=} provided."

View File

@@ -30,7 +30,6 @@ import cv2
import numpy as np
from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
from lerobot.common.utils.utils import capture_timestamp_utc
from ..camera import Camera
from ..utils import get_cv2_backend, get_cv2_rotation
@@ -117,7 +116,6 @@ class OpenCVCamera(Camera):
self.index_or_path = config.index_or_path
self.fps = config.fps
self.channels = config.channels
self.color_mode = config.color_mode
self.videocapture: cv2.VideoCapture | None = None
@@ -141,7 +139,7 @@ class OpenCVCamera(Camera):
@property
def is_connected(self) -> bool:
"""Checks if the camera is currently connected and opened."""
return isinstance(self.videocapture_camera, cv2.VideoCapture) and self.videocapture_camera.isOpened()
return isinstance(self.videocapture, cv2.VideoCapture) and self.videocapture.isOpened()
def _configure_capture_settings(self) -> None:
"""
@@ -187,19 +185,19 @@ class OpenCVCamera(Camera):
# blocking in multi-threaded applications, especially during data collection.
cv2.setNumThreads(1)
self.videocapture_camera = cv2.VideoCapture(self.index_or_path)
self.videocapture = cv2.VideoCapture(self.index_or_path)
if not self.videocapture_camera.isOpened():
self.videocapture_camera.release()
self.videocapture_camera = None
if not self.videocapture.isOpened():
self.videocapture.release()
self.videocapture = None
raise ConnectionError(
f"Failed to open OpenCV camera {self.index_or_path}."
f"Run 'python -m find_cameras list-cameras' for details."
f"Run 'python -m find_cameras Run 'python -m find_cameras' for details about the available cameras in your system."
)
self._configure_capture_settings()
if do_warmup_read:
if warmup:
logger.debug(f"Reading a warm-up frame for {self.index_or_path}...")
self.read() # NOTE(Steven): For now we just read one frame, we could also loop for X frames/secs
@@ -209,12 +207,12 @@ class OpenCVCamera(Camera):
"""Validates and sets the camera's frames per second (FPS)."""
if self.fps is None:
self.fps = self.videocapture_camera.get(cv2.CAP_PROP_FPS)
self.fps = self.videocapture.get(cv2.CAP_PROP_FPS)
logger.info(f"FPS set to camera default: {self.fps}.")
return
success = self.videocapture_camera.set(cv2.CAP_PROP_FPS, float(self.fps))
actual_fps = self.videocapture_camera.get(cv2.CAP_PROP_FPS)
success = self.videocapture.set(cv2.CAP_PROP_FPS, float(self.fps))
actual_fps = self.videocapture.get(cv2.CAP_PROP_FPS)
# Use math.isclose for robust float comparison
if not success or not math.isclose(self.fps, actual_fps, rel_tol=1e-3):
logger.warning(
@@ -229,8 +227,8 @@ class OpenCVCamera(Camera):
def _validate_width_and_height(self) -> None:
"""Validates and sets the camera's frame capture width and height."""
default_width = int(round(self.videocapture_camera.get(cv2.CAP_PROP_FRAME_WIDTH)))
default_height = int(round(self.videocapture_camera.get(cv2.CAP_PROP_FRAME_HEIGHT)))
default_width = int(round(self.videocapture.get(cv2.CAP_PROP_FRAME_WIDTH)))
default_height = int(round(self.videocapture.get(cv2.CAP_PROP_FRAME_HEIGHT)))
if self.width is None or self.height is None:
if self.rotation in [cv2.ROTATE_90_CLOCKWISE, cv2.ROTATE_90_COUNTERCLOCKWISE]:
@@ -243,8 +241,8 @@ class OpenCVCamera(Camera):
logger.info(f"Capture height set to camera default: {self.height}.")
return
success = self.videocapture_camera.set(cv2.CAP_PROP_FRAME_WIDTH, float(self.prerotated_width))
actual_width = int(round(self.videocapture_camera.get(cv2.CAP_PROP_FRAME_WIDTH)))
success = self.videocapture.set(cv2.CAP_PROP_FRAME_WIDTH, float(self.prerotated_width))
actual_width = int(round(self.videocapture.get(cv2.CAP_PROP_FRAME_WIDTH)))
if not success or self.prerotated_width != actual_width:
logger.warning(
f"Requested capture width {self.prerotated_width} for {self}, but camera reported {actual_width} (set success: {success})."
@@ -254,8 +252,8 @@ class OpenCVCamera(Camera):
)
logger.debug(f"Capture width set to {actual_width} for {self}.")
success = self.videocapture_camera.set(cv2.CAP_PROP_FRAME_HEIGHT, float(self.prerotated_height))
actual_height = int(round(self.videocapture_camera.get(cv2.CAP_PROP_FRAME_HEIGHT)))
success = self.videocapture.set(cv2.CAP_PROP_FRAME_HEIGHT, float(self.prerotated_height))
actual_height = int(round(self.videocapture.get(cv2.CAP_PROP_FRAME_HEIGHT)))
if not success or self.prerotated_height != actual_height:
logger.warning(
f"Requested capture height {self.prerotated_height} for {self}, but camera reported {actual_height} (set success: {success})."
@@ -275,7 +273,6 @@ class OpenCVCamera(Camera):
Args:
max_index_search_range (int): The maximum index to check on non-Linux systems.
raise_when_empty (bool): If True, raises an OSError if no cameras are found.
Returns:
List[Dict[str, Any]]: A list of dictionaries,
@@ -321,8 +318,6 @@ class OpenCVCamera(Camera):
if not found_cameras_info:
logger.warning("No OpenCV devices detected.")
if raise_when_empty:
raise OSError("No OpenCV devices detected. Ensure cameras are connected.")
logger.info(f"Detected OpenCV cameras: {[cam['id'] for cam in found_cameras_info]}")
return found_cameras_info
@@ -356,7 +351,7 @@ class OpenCVCamera(Camera):
start_time = time.perf_counter()
# NOTE(Steven): Are we okay with this blocking an undefined amount of time?
ret, frame = self.videocapture_camera.read()
ret, frame = self.videocapture.read()
if not ret or frame is None:
raise RuntimeError(
@@ -369,7 +364,6 @@ class OpenCVCamera(Camera):
read_duration_ms = (time.perf_counter() - start_time) * 1e3
logger.debug(f"{self} synchronous read took: {read_duration_ms:.1f}ms")
self.logs["timestamp_utc"] = capture_timestamp_utc()
return processed_frame
def _postprocess_image(self, image: np.ndarray, color_mode: ColorMode | None = None) -> np.ndarray:
@@ -402,10 +396,6 @@ class OpenCVCamera(Camera):
raise RuntimeError(
f"Captured frame dimensions ({h}x{w}) do not match configured capture dimensions ({self.prerotated_height}x{self.prerotated_width}) for {self}."
)
if c != self.channels:
logger.warning(
f"Captured frame channels ({c}) do not match configured channels ({self.channels}) for {self}."
)
processed_image = image
if requested_color_mode == ColorMode.RGB:
@@ -485,7 +475,7 @@ class OpenCVCamera(Camera):
raise DeviceNotConnectedError(f"{self} is not connected.")
if self.thread is None or not self.thread.is_alive():
self._ensure_read_thread_running()
self._start_read_thread()
try:
return self.frame_queue.get(timeout=timeout_ms / 1000.0)
@@ -534,10 +524,10 @@ class OpenCVCamera(Camera):
raise DeviceNotConnectedError(f"{self} not connected.")
if self.thread is not None:
self._shutdown_read_thread()
self._stop_read_thread()
if self.videocapture_camera is not None:
self.videocapture_camera.release()
self.videocapture_camera = None
if self.videocapture is not None:
self.videocapture.release()
self.videocapture = None
logger.info(f"{self} disconnected.")

View File

@@ -44,7 +44,6 @@ class OpenCVCameraConfig(CameraConfig):
width: Requested frame width in pixels for the color stream.
height: Requested frame height in pixels for the color stream.
color_mode: Color mode for image output (RGB or BGR). Defaults to RGB.
channels: Number of color channels (currently only 3 is supported).
rotation: Image rotation setting (0°, 90°, 180°, or 270°). Defaults to no rotation.
Note:
@@ -70,6 +69,3 @@ class OpenCVCameraConfig(CameraConfig):
raise ValueError(
f"`rotation` is expected to be in {(Cv2Rotation.NO_ROTATION, Cv2Rotation.ROTATE_90, Cv2Rotation.ROTATE_180, Cv2Rotation.ROTATE_270)}, but {self.rotation} is provided."
)
if self.channels != 3:
raise NotImplementedError(f"Unsupported number of channels: {self.channels}")

View File

@@ -14,6 +14,16 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Helper to find the camera devices available in your system.
Example:
```shell
python -m lerobot.find_cameras
```
"""
import argparse
import concurrent.futures
import logging
@@ -44,7 +54,7 @@ def find_all_opencv_cameras() -> List[Dict[str, Any]]:
all_opencv_cameras_info: List[Dict[str, Any]] = []
logger.info("Searching for OpenCV cameras...")
try:
opencv_cameras = OpenCVCamera.find_cameras(raise_when_empty=False)
opencv_cameras = OpenCVCamera.find_cameras()
for cam_info in opencv_cameras:
all_opencv_cameras_info.append(cam_info)
logger.info(f"Found {len(opencv_cameras)} OpenCV cameras.")
@@ -64,7 +74,7 @@ def find_all_realsense_cameras() -> List[Dict[str, Any]]:
all_realsense_cameras_info: List[Dict[str, Any]] = []
logger.info("Searching for RealSense cameras...")
try:
realsense_cameras = RealSenseCamera.find_cameras(raise_when_empty=False)
realsense_cameras = RealSenseCamera.find_cameras()
for cam_info in realsense_cameras:
all_realsense_cameras_info.append(cam_info)
logger.info(f"Found {len(realsense_cameras)} RealSense cameras.")
@@ -179,7 +189,7 @@ def create_camera_instance(cam_meta: Dict[str, Any]) -> Dict[str, Any] | None:
if instance:
logger.info(f"Connecting to {cam_type} camera: {cam_id}...")
instance.connect(do_warmup_read=False)
instance.connect(warmup=False)
return {"instance": instance, "meta": cam_meta}
except Exception as e:
logger.error(f"Failed to connect or configure {cam_type} camera {cam_id}: {e}")
@@ -292,28 +302,8 @@ if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="Unified camera utility script for listing cameras and capturing images."
)
subparsers = parser.add_subparsers(dest="command", help="Available commands")
# List cameras command
list_parser = subparsers.add_parser(
"list-cameras", help="Shows connected cameras. Optionally filter by type (realsense or opencv)."
)
list_parser.add_argument(
"camera_type",
type=str,
nargs="?",
default=None,
choices=["realsense", "opencv"],
help="Specify camera type to list (e.g., 'realsense', 'opencv'). Lists all if omitted.",
)
list_parser.set_defaults(func=lambda args: find_and_print_cameras(args.camera_type))
# Capture images command
capture_parser = subparsers.add_parser(
"capture-images",
help="Saves images from detected cameras (optionally filtered by type) using their default stream profiles.",
)
capture_parser.add_argument(
parser.add_argument(
"camera_type",
type=str,
nargs="?",
@@ -321,19 +311,19 @@ if __name__ == "__main__":
choices=["realsense", "opencv"],
help="Specify camera type to capture from (e.g., 'realsense', 'opencv'). Captures from all if omitted.",
)
capture_parser.add_argument(
parser.add_argument(
"--output-dir",
type=Path,
default="outputs/captured_images",
help="Directory to save images. Default: outputs/captured_images",
)
capture_parser.add_argument(
parser.add_argument(
"--record-time-s",
type=float,
default=6.0,
help="Time duration to attempt capturing frames. Default: 6 seconds.",
)
capture_parser.set_defaults(
parser.set_defaults(
func=lambda args: save_images_from_all_cameras(
output_dir=args.output_dir,
record_time_s=args.record_time_s,
@@ -343,14 +333,4 @@ if __name__ == "__main__":
args = parser.parse_args()
if args.command is None:
default_output_dir = capture_parser.get_default("output_dir")
default_record_time_s = capture_parser.get_default("record_time_s")
save_images_from_all_cameras(
output_dir=default_output_dir,
record_time_s=default_record_time_s,
camera_type_filter=None,
)
else:
args.func(args)
args.func(args)

View File

@@ -86,7 +86,7 @@ dynamixel = ["dynamixel-sdk>=3.7.31"]
feetech = ["feetech-servo-sdk>=1.0.0"]
intelrealsense = [
"pyrealsense2>=2.55.1.6486 ; sys_platform != 'darwin'",
"pyrealsense2-macosx>=2.54 ; sys_platform == 'darwin'", #NOTE(Steven): Check previous version for sudo issue
"pyrealsense2-macosx>=2.54 ; sys_platform == 'darwin'",
]
pi0 = ["transformers>=4.48.0"]
pusht = ["gym-pusht>=0.1.5 ; python_version < '4.0'"]

View File

@@ -0,0 +1,3 @@
version https://git-lfs.github.com/spec/v1
oid sha256:9dc9df05797dc0e7b92edc845caab2e4c37c3cfcabb4ee6339c67212b5baba3b
size 38023

View File

@@ -0,0 +1,3 @@
version https://git-lfs.github.com/spec/v1
oid sha256:7e11af87616b83c1cdb30330e951b91e86b51c64a1326e1ba5b4a3fbcdec1a11
size 55698

View File

@@ -0,0 +1,3 @@
version https://git-lfs.github.com/spec/v1
oid sha256:b8840fb643afe903191248703b1f95a57faf5812ecd9978ac502ee939646fdb2
size 121115

View File

@@ -0,0 +1,3 @@
version https://git-lfs.github.com/spec/v1
oid sha256:f79d14daafb1c0cf2fec5d46ee8029a73fe357402fdd31a7cd4a4794d7319a7c
size 260367

View File

@@ -19,7 +19,7 @@
# pytest tests/cameras/test_opencv.py::test_connect
# ```
import os
from pathlib import Path
import numpy as np
import pytest
@@ -29,13 +29,13 @@ from lerobot.common.cameras.opencv import OpenCVCamera, OpenCVCameraConfig
from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
# NOTE(Steven): more tests + assertions?
TEST_ARTIFACTS_DIR = os.path.join(os.path.dirname(os.path.dirname(__file__)), "artifacts", "cameras")
DEFAULT_PNG_FILE_PATH = os.path.join(TEST_ARTIFACTS_DIR, "fakecam_sd_160x120.png")
TEST_ARTIFACTS_DIR = Path(__file__).parent.parent / "artifacts" / "cameras"
DEFAULT_PNG_FILE_PATH = TEST_ARTIFACTS_DIR / "fakecam_sd_160x120.png"
TEST_IMAGE_PATHS = [
os.path.join(TEST_ARTIFACTS_DIR, "fakecam_sd_160x120.png"),
os.path.join(TEST_ARTIFACTS_DIR, "fakecam_hd_320x180.png"),
os.path.join(TEST_ARTIFACTS_DIR, "fakecam_fullhd_480x270.png"),
os.path.join(TEST_ARTIFACTS_DIR, "fakecam_square_128x128.png"),
TEST_ARTIFACTS_DIR / "image_160x120.png",
TEST_ARTIFACTS_DIR / "image_320x180.png",
TEST_ARTIFACTS_DIR / "image_480x270.png",
TEST_ARTIFACTS_DIR / "image_128x128.png",
]
@@ -50,7 +50,7 @@ def test_connect():
config = OpenCVCameraConfig(index_or_path=DEFAULT_PNG_FILE_PATH)
camera = OpenCVCamera(config)
camera.connect(do_warmup_read=False)
camera.connect(warmup=False)
assert camera.is_connected
@@ -58,10 +58,10 @@ def test_connect():
def test_connect_already_connected():
config = OpenCVCameraConfig(index_or_path=DEFAULT_PNG_FILE_PATH)
camera = OpenCVCamera(config)
camera.connect(do_warmup_read=False)
camera.connect(warmup=False)
with pytest.raises(DeviceAlreadyConnectedError):
camera.connect(do_warmup_read=False)
camera.connect(warmup=False)
def test_connect_invalid_camera_path():
@@ -69,7 +69,7 @@ def test_connect_invalid_camera_path():
camera = OpenCVCamera(config)
with pytest.raises(ConnectionError):
camera.connect(do_warmup_read=False)
camera.connect(warmup=False)
def test_invalid_width_connect():
@@ -81,14 +81,14 @@ def test_invalid_width_connect():
camera = OpenCVCamera(config)
with pytest.raises(RuntimeError):
camera.connect(do_warmup_read=False)
camera.connect(warmup=False)
@pytest.mark.parametrize("index_or_path", TEST_IMAGE_PATHS)
def test_read(index_or_path):
config = OpenCVCameraConfig(index_or_path=index_or_path)
camera = OpenCVCamera(config)
camera.connect(do_warmup_read=False)
camera.connect(warmup=False)
img = camera.read()
@@ -106,7 +106,7 @@ def test_read_before_connect():
def test_disconnect():
config = OpenCVCameraConfig(index_or_path=DEFAULT_PNG_FILE_PATH)
camera = OpenCVCamera(config)
camera.connect(do_warmup_read=False)
camera.connect(warmup=False)
camera.disconnect()
@@ -125,7 +125,7 @@ def test_disconnect_before_connect():
def test_async_read(index_or_path):
config = OpenCVCameraConfig(index_or_path=index_or_path)
camera = OpenCVCamera(config)
camera.connect(do_warmup_read=False)
camera.connect(warmup=False)
try:
img = camera.async_read()
@@ -141,7 +141,7 @@ def test_async_read(index_or_path):
def test_async_read_timeout():
config = OpenCVCameraConfig(index_or_path=DEFAULT_PNG_FILE_PATH)
camera = OpenCVCamera(config)
camera.connect(do_warmup_read=False)
camera.connect(warmup=False)
try:
with pytest.raises(TimeoutError):
@@ -170,13 +170,13 @@ def test_async_read_before_connect():
],
)
def test_rotation(rotation, index_or_path):
filename = os.path.basename(index_or_path)
filename = Path(index_or_path).name
dimensions = filename.split("_")[-1].split(".")[0] # Assumes filenames format (_wxh.png)
original_width, original_height = map(int, dimensions.split("x"))
config = OpenCVCameraConfig(index_or_path=index_or_path, rotation=rotation)
camera = OpenCVCamera(config)
camera.connect(do_warmup_read=False)
camera.connect(warmup=False)
img = camera.read()
assert isinstance(img, np.ndarray)

View File

@@ -19,7 +19,7 @@
# pytest tests/cameras/test_opencv.py::test_connect
# ```
import os
from pathlib import Path
from unittest.mock import patch
import numpy as np
@@ -33,8 +33,8 @@ try:
except (ImportError, ModuleNotFoundError):
pytest.skip("pyrealsense2 not available", allow_module_level=True)
TEST_ARTIFACTS_DIR = os.path.join(os.path.dirname(os.path.dirname(__file__)), "artifacts", "cameras")
BAG_FILE_PATH = os.path.join(TEST_ARTIFACTS_DIR, "test_rs.bag")
TEST_ARTIFACTS_DIR = Path(__file__).parent.parent / "artifacts" / "cameras"
BAG_FILE_PATH = TEST_ARTIFACTS_DIR / "test_rs.bag"
# NOTE(Steven): Missing tests for depth
# NOTE(Steven): Takes 20sec, the patch being the biggest bottleneck
@@ -42,7 +42,7 @@ BAG_FILE_PATH = os.path.join(TEST_ARTIFACTS_DIR, "test_rs.bag")
def mock_rs_config_enable_device_from_file(rs_config_instance, sn):
return rs_config_instance.enable_device_from_file(BAG_FILE_PATH, repeat_playback=True)
return rs_config_instance.enable_device_from_file(str(BAG_FILE_PATH), repeat_playback=True)
def mock_rs_config_enable_device_bad_file(rs_config_instance, sn):
@@ -60,7 +60,7 @@ def test_connect(mock_enable_device):
config = RealSenseCameraConfig(serial_number=42)
camera = RealSenseCamera(config)
camera.connect(do_warmup_read=False)
camera.connect(warmup=False)
assert camera.is_connected
@@ -68,10 +68,10 @@ def test_connect(mock_enable_device):
def test_connect_already_connected(mock_enable_device):
config = RealSenseCameraConfig(serial_number=42)
camera = RealSenseCamera(config)
camera.connect(do_warmup_read=False)
camera.connect(warmup=False)
with pytest.raises(DeviceAlreadyConnectedError):
camera.connect(do_warmup_read=False)
camera.connect(warmup=False)
@patch("pyrealsense2.config.enable_device", side_effect=mock_rs_config_enable_device_bad_file)
@@ -80,7 +80,7 @@ def test_connect_invalid_camera_path(mock_enable_device):
camera = RealSenseCamera(config)
with pytest.raises(ConnectionError):
camera.connect(do_warmup_read=False)
camera.connect(warmup=False)
@patch("pyrealsense2.config.enable_device", side_effect=mock_rs_config_enable_device_from_file)
@@ -89,14 +89,14 @@ def test_invalid_width_connect(mock_enable_device):
camera = RealSenseCamera(config)
with pytest.raises(ConnectionError):
camera.connect(do_warmup_read=False)
camera.connect(warmup=False)
@patch("pyrealsense2.config.enable_device", side_effect=mock_rs_config_enable_device_from_file)
def test_read(mock_enable_device):
config = RealSenseCameraConfig(serial_number=42, width=640, height=480, fps=30)
camera = RealSenseCamera(config)
camera.connect(do_warmup_read=False)
camera.connect(warmup=False)
img = camera.read()
assert isinstance(img, np.ndarray)
@@ -114,7 +114,7 @@ def test_read_before_connect():
def test_disconnect(mock_enable_device):
config = RealSenseCameraConfig(serial_number=42)
camera = RealSenseCamera(config)
camera.connect(do_warmup_read=False)
camera.connect(warmup=False)
camera.disconnect()
@@ -133,7 +133,7 @@ def test_disconnect_before_connect():
def test_async_read(mock_enable_device):
config = RealSenseCameraConfig(serial_number=42, width=640, height=480, fps=30)
camera = RealSenseCamera(config)
camera.connect(do_warmup_read=False)
camera.connect(warmup=False)
try:
img = camera.async_read()
@@ -150,7 +150,7 @@ def test_async_read(mock_enable_device):
def test_async_read_timeout(mock_enable_device):
config = RealSenseCameraConfig(serial_number=42, width=640, height=480, fps=30)
camera = RealSenseCamera(config)
camera.connect(do_warmup_read=False)
camera.connect(warmup=False)
try:
with pytest.raises(TimeoutError):
@@ -181,7 +181,7 @@ def test_async_read_before_connect():
def test_rotation(mock_enable_device, rotation):
config = RealSenseCameraConfig(serial_number=42, rotation=rotation)
camera = RealSenseCamera(config)
camera.connect(do_warmup_read=False)
camera.connect(warmup=False)
img = camera.read()
assert isinstance(img, np.ndarray)