Fix: camera code changes only (#1788)

This commit is contained in:
Jaisree25
2025-10-20 03:57:10 -07:00
committed by GitHub
parent a5ca206c49
commit c54cd529a2
10 changed files with 104 additions and 52 deletions

View File

@@ -17,7 +17,7 @@
import abc import abc
from typing import Any from typing import Any
import numpy as np from numpy.typing import NDArray # type: ignore # TODO: add type stubs for numpy.typing
from .configs import CameraConfig, ColorMode from .configs import CameraConfig, ColorMode
@@ -89,7 +89,7 @@ class Camera(abc.ABC):
pass pass
@abc.abstractmethod @abc.abstractmethod
def read(self, color_mode: ColorMode | None = None) -> np.ndarray: def read(self, color_mode: ColorMode | None = None) -> NDArray[Any]:
"""Capture and return a single frame from the camera. """Capture and return a single frame from the camera.
Args: Args:
@@ -102,7 +102,7 @@ class Camera(abc.ABC):
pass pass
@abc.abstractmethod @abc.abstractmethod
def async_read(self, timeout_ms: float = ...) -> np.ndarray: def async_read(self, timeout_ms: float = ...) -> NDArray[Any]:
"""Asynchronously capture and return a single frame from the camera. """Asynchronously capture and return a single frame from the camera.
Args: Args:

View File

@@ -18,7 +18,7 @@ import abc
from dataclasses import dataclass from dataclasses import dataclass
from enum import Enum from enum import Enum
import draccus import draccus # type: ignore # TODO: add type stubs for draccus
class ColorMode(str, Enum): class ColorMode(str, Enum):
@@ -34,11 +34,11 @@ class Cv2Rotation(int, Enum):
@dataclass(kw_only=True) @dataclass(kw_only=True)
class CameraConfig(draccus.ChoiceRegistry, abc.ABC): class CameraConfig(draccus.ChoiceRegistry, abc.ABC): # type: ignore # TODO: add type stubs for draccus
fps: int | None = None fps: int | None = None
width: int | None = None width: int | None = None
height: int | None = None height: int | None = None
@property @property
def type(self) -> str: def type(self) -> str:
return self.get_choice_name(self.__class__) return str(self.get_choice_name(self.__class__))

View File

@@ -14,3 +14,5 @@
from .camera_opencv import OpenCVCamera from .camera_opencv import OpenCVCamera
from .configuration_opencv import OpenCVCameraConfig from .configuration_opencv import OpenCVCameraConfig
__all__ = ["OpenCVCamera", "OpenCVCameraConfig"]

View File

@@ -25,11 +25,12 @@ from pathlib import Path
from threading import Event, Lock, Thread from threading import Event, Lock, Thread
from typing import Any from typing import Any
from numpy.typing import NDArray # type: ignore # TODO: add type stubs for numpy.typing
# Fix MSMF hardware transform compatibility for Windows before importing cv2 # Fix MSMF hardware transform compatibility for Windows before importing cv2
if platform.system() == "Windows" and "OPENCV_VIDEOIO_MSMF_ENABLE_HW_TRANSFORMS" not in os.environ: if platform.system() == "Windows" and "OPENCV_VIDEOIO_MSMF_ENABLE_HW_TRANSFORMS" not in os.environ:
os.environ["OPENCV_VIDEOIO_MSMF_ENABLE_HW_TRANSFORMS"] = "0" os.environ["OPENCV_VIDEOIO_MSMF_ENABLE_HW_TRANSFORMS"] = "0"
import cv2 import cv2 # type: ignore # TODO: add type stubs for OpenCV
import numpy as np
from lerobot.utils.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError from lerobot.utils.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
@@ -121,7 +122,7 @@ class OpenCVCamera(Camera):
self.thread: Thread | None = None self.thread: Thread | None = None
self.stop_event: Event | None = None self.stop_event: Event | None = None
self.frame_lock: Lock = Lock() self.frame_lock: Lock = Lock()
self.latest_frame: np.ndarray | None = None self.latest_frame: NDArray[Any] | None = None
self.new_frame_event: Event = Event() self.new_frame_event: Event = Event()
self.rotation: int | None = get_cv2_rotation(config.rotation) self.rotation: int | None = get_cv2_rotation(config.rotation)
@@ -140,7 +141,7 @@ class OpenCVCamera(Camera):
"""Checks if the camera is currently connected and opened.""" """Checks if the camera is currently connected and opened."""
return isinstance(self.videocapture, cv2.VideoCapture) and self.videocapture.isOpened() return isinstance(self.videocapture, cv2.VideoCapture) and self.videocapture.isOpened()
def connect(self, warmup: bool = True): def connect(self, warmup: bool = True) -> None:
""" """
Connects to the OpenCV camera specified in the configuration. Connects to the OpenCV camera specified in the configuration.
@@ -199,6 +200,9 @@ class OpenCVCamera(Camera):
if not self.is_connected: if not self.is_connected:
raise DeviceNotConnectedError(f"Cannot configure settings for {self} as it is not connected.") raise DeviceNotConnectedError(f"Cannot configure settings for {self} as it is not connected.")
if self.videocapture is None:
raise DeviceNotConnectedError(f"{self} videocapture is not initialized")
if self.fps is None: if self.fps is None:
self.fps = self.videocapture.get(cv2.CAP_PROP_FPS) self.fps = self.videocapture.get(cv2.CAP_PROP_FPS)
else: else:
@@ -219,6 +223,12 @@ class OpenCVCamera(Camera):
def _validate_fps(self) -> None: def _validate_fps(self) -> None:
"""Validates and sets the camera's frames per second (FPS).""" """Validates and sets the camera's frames per second (FPS)."""
if self.videocapture is None:
raise DeviceNotConnectedError(f"{self} videocapture is not initialized")
if self.fps is None:
raise ValueError(f"{self} FPS is not set")
success = self.videocapture.set(cv2.CAP_PROP_FPS, float(self.fps)) success = self.videocapture.set(cv2.CAP_PROP_FPS, float(self.fps))
actual_fps = self.videocapture.get(cv2.CAP_PROP_FPS) actual_fps = self.videocapture.get(cv2.CAP_PROP_FPS)
# Use math.isclose for robust float comparison # Use math.isclose for robust float comparison
@@ -228,6 +238,12 @@ class OpenCVCamera(Camera):
def _validate_width_and_height(self) -> None: def _validate_width_and_height(self) -> None:
"""Validates and sets the camera's frame capture width and height.""" """Validates and sets the camera's frame capture width and height."""
if self.videocapture is None:
raise DeviceNotConnectedError(f"{self} videocapture is not initialized")
if self.capture_width is None or self.capture_height is None:
raise ValueError(f"{self} capture_width or capture_height is not set")
width_success = self.videocapture.set(cv2.CAP_PROP_FRAME_WIDTH, float(self.capture_width)) width_success = self.videocapture.set(cv2.CAP_PROP_FRAME_WIDTH, float(self.capture_width))
height_success = self.videocapture.set(cv2.CAP_PROP_FRAME_HEIGHT, float(self.capture_height)) height_success = self.videocapture.set(cv2.CAP_PROP_FRAME_HEIGHT, float(self.capture_height))
@@ -262,7 +278,7 @@ class OpenCVCamera(Camera):
possible_paths = sorted(Path("/dev").glob("video*"), key=lambda p: p.name) possible_paths = sorted(Path("/dev").glob("video*"), key=lambda p: p.name)
targets_to_scan = [str(p) for p in possible_paths] targets_to_scan = [str(p) for p in possible_paths]
else: else:
targets_to_scan = list(range(MAX_OPENCV_INDEX)) targets_to_scan = [str(i) for i in range(MAX_OPENCV_INDEX)]
for target in targets_to_scan: for target in targets_to_scan:
camera = cv2.VideoCapture(target) camera = cv2.VideoCapture(target)
@@ -289,7 +305,7 @@ class OpenCVCamera(Camera):
return found_cameras_info return found_cameras_info
def read(self, color_mode: ColorMode | None = None) -> np.ndarray: def read(self, color_mode: ColorMode | None = None) -> NDArray[Any]:
""" """
Reads a single frame synchronously from the camera. Reads a single frame synchronously from the camera.
@@ -317,6 +333,9 @@ class OpenCVCamera(Camera):
start_time = time.perf_counter() start_time = time.perf_counter()
if self.videocapture is None:
raise DeviceNotConnectedError(f"{self} videocapture is not initialized")
ret, frame = self.videocapture.read() ret, frame = self.videocapture.read()
if not ret or frame is None: if not ret or frame is None:
@@ -329,7 +348,7 @@ class OpenCVCamera(Camera):
return processed_frame return processed_frame
def _postprocess_image(self, image: np.ndarray, color_mode: ColorMode | None = None) -> np.ndarray: def _postprocess_image(self, image: NDArray[Any], color_mode: ColorMode | None = None) -> NDArray[Any]:
""" """
Applies color conversion, dimension validation, and rotation to a raw frame. Applies color conversion, dimension validation, and rotation to a raw frame.
@@ -372,7 +391,7 @@ class OpenCVCamera(Camera):
return processed_image return processed_image
def _read_loop(self): def _read_loop(self) -> None:
""" """
Internal loop run by the background thread for asynchronous reading. Internal loop run by the background thread for asynchronous reading.
@@ -383,6 +402,9 @@ class OpenCVCamera(Camera):
Stops on DeviceNotConnectedError, logs other errors and continues. Stops on DeviceNotConnectedError, logs other errors and continues.
""" """
if self.stop_event is None:
raise RuntimeError(f"{self}: stop_event is not initialized before starting read loop.")
while not self.stop_event.is_set(): while not self.stop_event.is_set():
try: try:
color_image = self.read() color_image = self.read()
@@ -419,7 +441,7 @@ class OpenCVCamera(Camera):
self.thread = None self.thread = None
self.stop_event = None self.stop_event = None
def async_read(self, timeout_ms: float = 200) -> np.ndarray: def async_read(self, timeout_ms: float = 200) -> NDArray[Any]:
""" """
Reads the latest available frame asynchronously. Reads the latest available frame asynchronously.
@@ -462,7 +484,7 @@ class OpenCVCamera(Camera):
return frame return frame
def disconnect(self): def disconnect(self) -> None:
""" """
Disconnects from the camera and cleans up resources. Disconnects from the camera and cleans up resources.

View File

@@ -17,6 +17,8 @@ from pathlib import Path
from ..configs import CameraConfig, ColorMode, Cv2Rotation from ..configs import CameraConfig, ColorMode, Cv2Rotation
__all__ = ["OpenCVCameraConfig", "ColorMode", "Cv2Rotation"]
@CameraConfig.register_subclass("opencv") @CameraConfig.register_subclass("opencv")
@dataclass @dataclass
@@ -56,7 +58,7 @@ class OpenCVCameraConfig(CameraConfig):
rotation: Cv2Rotation = Cv2Rotation.NO_ROTATION rotation: Cv2Rotation = Cv2Rotation.NO_ROTATION
warmup_s: int = 1 warmup_s: int = 1
def __post_init__(self): def __post_init__(self) -> None:
if self.color_mode not in (ColorMode.RGB, ColorMode.BGR): if self.color_mode not in (ColorMode.RGB, ColorMode.BGR):
raise ValueError( raise ValueError(
f"`color_mode` is expected to be {ColorMode.RGB.value} or {ColorMode.BGR.value}, but {self.color_mode} is provided." f"`color_mode` is expected to be {ColorMode.RGB.value} or {ColorMode.BGR.value}, but {self.color_mode} is provided."

View File

@@ -16,6 +16,8 @@ from dataclasses import dataclass
from ..configs import CameraConfig, ColorMode from ..configs import CameraConfig, ColorMode
__all__ = ["CameraConfig", "ColorMode", "Reachy2CameraConfig"]
@CameraConfig.register_subclass("reachy2_camera") @CameraConfig.register_subclass("reachy2_camera")
@dataclass @dataclass
@@ -62,7 +64,7 @@ class Reachy2CameraConfig(CameraConfig):
port: int = 50065 port: int = 50065
# use_depth: bool = False # use_depth: bool = False
def __post_init__(self): def __post_init__(self) -> None:
if self.name not in ["teleop", "depth"]: if self.name not in ["teleop", "depth"]:
raise ValueError(f"`name` is expected to be 'teleop' or 'depth', but {self.name} is provided.") raise ValueError(f"`name` is expected to be 'teleop' or 'depth', but {self.name} is provided.")
if (self.name == "teleop" and self.image_type not in ["left", "right"]) or ( if (self.name == "teleop" and self.image_type not in ["left", "right"]) or (

View File

@@ -23,13 +23,17 @@ import time
from threading import Event, Lock, Thread from threading import Event, Lock, Thread
from typing import Any from typing import Any
from numpy.typing import NDArray # type: ignore # TODO: add type stubs for numpy.typing
# Fix MSMF hardware transform compatibility for Windows before importing cv2 # Fix MSMF hardware transform compatibility for Windows before importing cv2
if platform.system() == "Windows" and "OPENCV_VIDEOIO_MSMF_ENABLE_HW_TRANSFORMS" not in os.environ: if platform.system() == "Windows" and "OPENCV_VIDEOIO_MSMF_ENABLE_HW_TRANSFORMS" not in os.environ:
os.environ["OPENCV_VIDEOIO_MSMF_ENABLE_HW_TRANSFORMS"] = "0" os.environ["OPENCV_VIDEOIO_MSMF_ENABLE_HW_TRANSFORMS"] = "0"
import cv2 import cv2 # type: ignore # TODO: add type stubs for OpenCV
import numpy as np import numpy as np # type: ignore # TODO: add type stubs for numpy
from reachy2_sdk.media.camera import CameraView from reachy2_sdk.media.camera import CameraView # type: ignore # TODO: add type stubs for reachy2_sdk
from reachy2_sdk.media.camera_manager import CameraManager from reachy2_sdk.media.camera_manager import ( # type: ignore # TODO: add type stubs for reachy2_sdk
CameraManager,
)
from lerobot.utils.errors import DeviceNotConnectedError from lerobot.utils.errors import DeviceNotConnectedError
@@ -73,7 +77,7 @@ class Reachy2Camera(Camera):
self.thread: Thread | None = None self.thread: Thread | None = None
self.stop_event: Event | None = None self.stop_event: Event | None = None
self.frame_lock: Lock = Lock() self.frame_lock: Lock = Lock()
self.latest_frame: np.ndarray | None = None self.latest_frame: NDArray[Any] | None = None
self.new_frame_event: Event = Event() self.new_frame_event: Event = Event()
def __str__(self) -> str: def __str__(self) -> str:
@@ -83,13 +87,17 @@ class Reachy2Camera(Camera):
def is_connected(self) -> bool: def is_connected(self) -> bool:
"""Checks if the camera is currently connected and opened.""" """Checks if the camera is currently connected and opened."""
if self.config.name == "teleop": if self.config.name == "teleop":
return self.cam_manager._grpc_connected and self.cam_manager.teleop if self.cam_manager else False return bool(
self.cam_manager._grpc_connected and self.cam_manager.teleop if self.cam_manager else False
)
elif self.config.name == "depth": elif self.config.name == "depth":
return self.cam_manager._grpc_connected and self.cam_manager.depth if self.cam_manager else False return bool(
self.cam_manager._grpc_connected and self.cam_manager.depth if self.cam_manager else False
)
else: else:
raise ValueError(f"Invalid camera name '{self.config.name}'. Expected 'teleop' or 'depth'.") raise ValueError(f"Invalid camera name '{self.config.name}'. Expected 'teleop' or 'depth'.")
def connect(self, warmup: bool = True): def connect(self, warmup: bool = True) -> None:
""" """
Connects to the Reachy2 CameraManager as specified in the configuration. Connects to the Reachy2 CameraManager as specified in the configuration.
""" """
@@ -131,7 +139,7 @@ class Reachy2Camera(Camera):
camera_manager.disconnect() camera_manager.disconnect()
return initialized_cameras return initialized_cameras
def read(self, color_mode: ColorMode | None = None) -> np.ndarray: def read(self, color_mode: ColorMode | None = None) -> NDArray[Any]:
""" """
Reads a single frame synchronously from the camera. Reads a single frame synchronously from the camera.
@@ -152,7 +160,7 @@ class Reachy2Camera(Camera):
start_time = time.perf_counter() start_time = time.perf_counter()
frame = None frame: NDArray[Any] = np.empty((0, 0, 3), dtype=np.uint8)
if self.cam_manager is None: if self.cam_manager is None:
raise DeviceNotConnectedError(f"{self} is not connected.") raise DeviceNotConnectedError(f"{self} is not connected.")
@@ -179,7 +187,7 @@ class Reachy2Camera(Camera):
return frame return frame
def _read_loop(self): def _read_loop(self) -> None:
""" """
Internal loop run by the background thread for asynchronous reading. Internal loop run by the background thread for asynchronous reading.
@@ -190,6 +198,9 @@ class Reachy2Camera(Camera):
Stops on DeviceNotConnectedError, logs other errors and continues. Stops on DeviceNotConnectedError, logs other errors and continues.
""" """
if self.stop_event is None:
raise RuntimeError(f"{self}: stop_event is not initialized before starting read loop.")
while not self.stop_event.is_set(): while not self.stop_event.is_set():
try: try:
color_image = self.read() color_image = self.read()
@@ -226,7 +237,7 @@ class Reachy2Camera(Camera):
self.thread = None self.thread = None
self.stop_event = None self.stop_event = None
def async_read(self, timeout_ms: float = 200) -> np.ndarray: def async_read(self, timeout_ms: float = 200) -> NDArray[Any]:
""" """
Reads the latest available frame asynchronously. Reads the latest available frame asynchronously.
@@ -269,7 +280,7 @@ class Reachy2Camera(Camera):
return frame return frame
def disconnect(self): def disconnect(self) -> None:
""" """
Stops the background read thread (if running). Stops the background read thread (if running).

View File

@@ -21,11 +21,12 @@ import time
from threading import Event, Lock, Thread from threading import Event, Lock, Thread
from typing import Any from typing import Any
import cv2 import cv2 # type: ignore # TODO: add type stubs for OpenCV
import numpy as np import numpy as np # type: ignore # TODO: add type stubs for numpy
from numpy.typing import NDArray # type: ignore # TODO: add type stubs for numpy.typing
try: try:
import pyrealsense2 as rs import pyrealsense2 as rs # type: ignore # TODO: add type stubs for pyrealsense2
except Exception as e: except Exception as e:
logging.info(f"Could not import realsense: {e}") logging.info(f"Could not import realsense: {e}")
@@ -132,7 +133,7 @@ class RealSenseCamera(Camera):
self.thread: Thread | None = None self.thread: Thread | None = None
self.stop_event: Event | None = None self.stop_event: Event | None = None
self.frame_lock: Lock = Lock() self.frame_lock: Lock = Lock()
self.latest_frame: np.ndarray | None = None self.latest_frame: NDArray[Any] | None = None
self.new_frame_event: Event = Event() self.new_frame_event: Event = Event()
self.rotation: int | None = get_cv2_rotation(config.rotation) self.rotation: int | None = get_cv2_rotation(config.rotation)
@@ -150,7 +151,7 @@ class RealSenseCamera(Camera):
"""Checks if the camera pipeline is started and streams are active.""" """Checks if the camera pipeline is started and streams are active."""
return self.rs_pipeline is not None and self.rs_profile is not None return self.rs_pipeline is not None and self.rs_profile is not None
def connect(self, warmup: bool = True): def connect(self, warmup: bool = True) -> None:
""" """
Connects to the RealSense camera specified in the configuration. Connects to the RealSense camera specified in the configuration.
@@ -264,7 +265,7 @@ class RealSenseCamera(Camera):
serial_number = str(found_devices[0]["serial_number"]) serial_number = str(found_devices[0]["serial_number"])
return serial_number return serial_number
def _configure_rs_pipeline_config(self, rs_config): def _configure_rs_pipeline_config(self, rs_config: Any) -> None:
"""Creates and configures the RealSense pipeline configuration object.""" """Creates and configures the RealSense pipeline configuration object."""
rs.config.enable_device(rs_config, self.serial_number) rs.config.enable_device(rs_config, self.serial_number)
@@ -293,6 +294,9 @@ class RealSenseCamera(Camera):
if not self.is_connected: if not self.is_connected:
raise DeviceNotConnectedError(f"Cannot validate settings for {self} as it is not connected.") raise DeviceNotConnectedError(f"Cannot validate settings for {self} as it is not connected.")
if self.rs_profile is None:
raise RuntimeError(f"{self}: rs_profile must be initialized before use.")
stream = self.rs_profile.get_stream(rs.stream.color).as_video_stream_profile() stream = self.rs_profile.get_stream(rs.stream.color).as_video_stream_profile()
if self.fps is None: if self.fps is None:
@@ -308,7 +312,7 @@ class RealSenseCamera(Camera):
self.width, self.height = actual_width, actual_height self.width, self.height = actual_width, actual_height
self.capture_width, self.capture_height = actual_width, actual_height self.capture_width, self.capture_height = actual_width, actual_height
def read_depth(self, timeout_ms: int = 200) -> np.ndarray: def read_depth(self, timeout_ms: int = 200) -> NDArray[Any]:
""" """
Reads a single frame (depth) synchronously from the camera. Reads a single frame (depth) synchronously from the camera.
@@ -336,6 +340,9 @@ class RealSenseCamera(Camera):
start_time = time.perf_counter() start_time = time.perf_counter()
if self.rs_pipeline is None:
raise RuntimeError(f"{self}: rs_pipeline must be initialized before use.")
ret, frame = self.rs_pipeline.try_wait_for_frames(timeout_ms=timeout_ms) ret, frame = self.rs_pipeline.try_wait_for_frames(timeout_ms=timeout_ms)
if not ret or frame is None: if not ret or frame is None:
@@ -351,7 +358,7 @@ class RealSenseCamera(Camera):
return depth_map_processed return depth_map_processed
def read(self, color_mode: ColorMode | None = None, timeout_ms: int = 200) -> np.ndarray: def read(self, color_mode: ColorMode | None = None, timeout_ms: int = 200) -> NDArray[Any]:
""" """
Reads a single frame (color) synchronously from the camera. Reads a single frame (color) synchronously from the camera.
@@ -376,6 +383,9 @@ class RealSenseCamera(Camera):
start_time = time.perf_counter() start_time = time.perf_counter()
if self.rs_pipeline is None:
raise RuntimeError(f"{self}: rs_pipeline must be initialized before use.")
ret, frame = self.rs_pipeline.try_wait_for_frames(timeout_ms=timeout_ms) ret, frame = self.rs_pipeline.try_wait_for_frames(timeout_ms=timeout_ms)
if not ret or frame is None: if not ret or frame is None:
@@ -392,8 +402,8 @@ class RealSenseCamera(Camera):
return color_image_processed return color_image_processed
def _postprocess_image( def _postprocess_image(
self, image: np.ndarray, color_mode: ColorMode | None = None, depth_frame: bool = False self, image: NDArray[Any], color_mode: ColorMode | None = None, depth_frame: bool = False
) -> np.ndarray: ) -> NDArray[Any]:
""" """
Applies color conversion, dimension validation, and rotation to a raw color frame. Applies color conversion, dimension validation, and rotation to a raw color frame.
@@ -438,7 +448,7 @@ class RealSenseCamera(Camera):
return processed_image return processed_image
def _read_loop(self): def _read_loop(self) -> None:
""" """
Internal loop run by the background thread for asynchronous reading. Internal loop run by the background thread for asynchronous reading.
@@ -449,6 +459,9 @@ class RealSenseCamera(Camera):
Stops on DeviceNotConnectedError, logs other errors and continues. Stops on DeviceNotConnectedError, logs other errors and continues.
""" """
if self.stop_event is None:
raise RuntimeError(f"{self}: stop_event is not initialized before starting read loop.")
while not self.stop_event.is_set(): while not self.stop_event.is_set():
try: try:
color_image = self.read(timeout_ms=500) color_image = self.read(timeout_ms=500)
@@ -474,7 +487,7 @@ class RealSenseCamera(Camera):
self.thread.daemon = True self.thread.daemon = True
self.thread.start() self.thread.start()
def _stop_read_thread(self): def _stop_read_thread(self) -> None:
"""Signals the background read thread to stop and waits for it to join.""" """Signals the background read thread to stop and waits for it to join."""
if self.stop_event is not None: if self.stop_event is not None:
self.stop_event.set() self.stop_event.set()
@@ -486,7 +499,7 @@ class RealSenseCamera(Camera):
self.stop_event = None self.stop_event = None
# NOTE(Steven): Missing implementation for depth for now # NOTE(Steven): Missing implementation for depth for now
def async_read(self, timeout_ms: float = 200) -> np.ndarray: def async_read(self, timeout_ms: float = 200) -> NDArray[Any]:
""" """
Reads the latest available frame data (color) asynchronously. Reads the latest available frame data (color) asynchronously.
@@ -529,7 +542,7 @@ class RealSenseCamera(Camera):
return frame return frame
def disconnect(self): def disconnect(self) -> None:
""" """
Disconnects from the camera, stops the pipeline, and cleans up resources. Disconnects from the camera, stops the pipeline, and cleans up resources.

View File

@@ -59,7 +59,7 @@ class RealSenseCameraConfig(CameraConfig):
rotation: Cv2Rotation = Cv2Rotation.NO_ROTATION rotation: Cv2Rotation = Cv2Rotation.NO_ROTATION
warmup_s: int = 1 warmup_s: int = 1
def __post_init__(self): def __post_init__(self) -> None:
if self.color_mode not in (ColorMode.RGB, ColorMode.BGR): if self.color_mode not in (ColorMode.RGB, ColorMode.BGR):
raise ValueError( raise ValueError(
f"`color_mode` is expected to be {ColorMode.RGB.value} or {ColorMode.BGR.value}, but {self.color_mode} is provided." f"`color_mode` is expected to be {ColorMode.RGB.value} or {ColorMode.BGR.value}, but {self.color_mode} is provided."

View File

@@ -53,14 +53,14 @@ def make_cameras_from_configs(camera_configs: dict[str, CameraConfig]) -> dict[s
def get_cv2_rotation(rotation: Cv2Rotation) -> int | None: def get_cv2_rotation(rotation: Cv2Rotation) -> int | None:
import cv2 import cv2 # type: ignore # TODO: add type stubs for OpenCV
if rotation == Cv2Rotation.ROTATE_90: if rotation == Cv2Rotation.ROTATE_90:
return cv2.ROTATE_90_CLOCKWISE return int(cv2.ROTATE_90_CLOCKWISE)
elif rotation == Cv2Rotation.ROTATE_180: elif rotation == Cv2Rotation.ROTATE_180:
return cv2.ROTATE_180 return int(cv2.ROTATE_180)
elif rotation == Cv2Rotation.ROTATE_270: elif rotation == Cv2Rotation.ROTATE_270:
return cv2.ROTATE_90_COUNTERCLOCKWISE return int(cv2.ROTATE_90_COUNTERCLOCKWISE)
else: else:
return None return None
@@ -69,8 +69,8 @@ def get_cv2_backend() -> int:
import cv2 import cv2
if platform.system() == "Windows": if platform.system() == "Windows":
return cv2.CAP_MSMF # Use MSMF for Windows instead of AVFOUNDATION return int(cv2.CAP_MSMF) # Use MSMF for Windows instead of AVFOUNDATION
# elif platform.system() == "Darwin": # macOS # elif platform.system() == "Darwin": # macOS
# return cv2.CAP_AVFOUNDATION # return cv2.CAP_AVFOUNDATION
else: # Linux and others else: # Linux and others
return cv2.CAP_ANY return int(cv2.CAP_ANY)