merge but remove refactor of save_camera_images

This commit is contained in:
Remi Cadene
2024-10-16 18:10:01 +02:00
55 changed files with 7644 additions and 2545 deletions

View File

@@ -2,27 +2,27 @@
This file contains utilities for recording frames from cameras. For more info look at `OpenCVCamera` docstring.
"""
import argparse
import concurrent.futures
import math
import platform
import shutil
import threading
import time
from dataclasses import dataclass, replace
from pathlib import Path
from threading import Thread
import cv2
import numpy as np
from PIL import Image
from lerobot.common.robot_devices.utils import (
RobotDeviceAlreadyConnectedError,
RobotDeviceNotConnectedError,
busy_wait,
)
from lerobot.common.utils.utils import capture_timestamp_utc
# Use 1 thread to avoid blocking the main thread. Especially useful during data collection
# when other threads are used to save the images.
cv2.setNumThreads(1)
# The maximum opencv device index depends on your operating system. For instance,
# if you have 3 cameras, they should be associated to index 0, 1, and 2. This is the case
# on MacOS. However, on Ubuntu, the indices are different like 6, 16, 23.
@@ -31,20 +31,44 @@ cv2.setNumThreads(1)
MAX_OPENCV_INDEX = 60
def find_camera_indices(raise_when_empty=False, max_index_search_range=MAX_OPENCV_INDEX):
def find_cameras(raise_when_empty=False, max_index_search_range=MAX_OPENCV_INDEX, mock=False) -> list[dict]:
cameras = []
if platform.system() == "Linux":
# Linux uses camera ports
print("Linux detected. Finding available camera indices through scanning '/dev/video*' ports")
possible_camera_ids = []
for port in Path("/dev").glob("video*"):
camera_idx = int(str(port).replace("/dev/video", ""))
possible_camera_ids.append(camera_idx)
possible_ports = [str(port) for port in Path("/dev").glob("video*")]
ports = _find_cameras(possible_ports, mock=mock)
for port in ports:
cameras.append(
{
"port": port,
"index": int(port.removeprefix("/dev/video")),
}
)
else:
print(
"Mac or Windows detected. Finding available camera indices through "
f"scanning all indices from 0 to {MAX_OPENCV_INDEX}"
)
possible_camera_ids = range(max_index_search_range)
possible_indices = range(max_index_search_range)
indices = _find_cameras(possible_indices, mock=mock)
for index in indices:
cameras.append(
{
"port": None,
"index": index,
}
)
return cameras
def _find_cameras(
possible_camera_ids: list[int | str], raise_when_empty=False, mock=False
) -> list[int | str]:
if mock:
import tests.mock_cv2 as cv2
else:
import cv2
camera_ids = []
for camera_idx in possible_camera_ids:
@@ -65,6 +89,92 @@ def find_camera_indices(raise_when_empty=False, max_index_search_range=MAX_OPENC
return camera_ids
def is_valid_unix_path(path: str) -> bool:
"""Note: if 'path' points to a symlink, this will return True only if the target exists"""
p = Path(path)
return p.is_absolute() and p.exists()
def get_camera_index_from_unix_port(port: Path) -> int:
return int(str(port.resolve()).removeprefix("/dev/video"))
def save_image(img_array, camera_index, frame_index, images_dir):
img = Image.fromarray(img_array)
path = images_dir / f"camera_{camera_index:02d}_frame_{frame_index:06d}.png"
path.parent.mkdir(parents=True, exist_ok=True)
img.save(str(path), quality=100)
def save_images_from_cameras(
images_dir: Path,
camera_ids: list | None = None,
fps=None,
width=None,
height=None,
record_time_s=2,
mock=False,
):
"""
Initializes all the cameras and saves images to the directory. Useful to visually identify the camera
associated to a given camera index.
"""
if camera_ids is None or len(camera_ids) == 0:
camera_infos = find_cameras(mock=mock)
camera_ids = [cam["index"] for cam in camera_infos]
print("Connecting cameras")
cameras = []
for cam_idx in camera_ids:
camera = OpenCVCamera(cam_idx, fps=fps, width=width, height=height, mock=mock)
camera.connect()
print(
f"OpenCVCamera({camera.camera_index}, fps={camera.fps}, width={camera.width}, "
f"height={camera.height}, color_mode={camera.color_mode})"
)
cameras.append(camera)
images_dir = Path(images_dir)
if images_dir.exists():
shutil.rmtree(
images_dir,
)
images_dir.mkdir(parents=True, exist_ok=True)
print(f"Saving images to {images_dir}")
frame_index = 0
start_time = time.perf_counter()
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
while True:
now = time.perf_counter()
for camera in cameras:
# If we use async_read when fps is None, the loop will go full speed, and we will endup
# saving the same images from the cameras multiple times until the RAM/disk is full.
image = camera.read() if fps is None else camera.async_read()
executor.submit(
save_image,
image,
camera.camera_index,
frame_index,
images_dir,
)
if fps is not None:
dt_s = time.perf_counter() - now
busy_wait(1 / fps - dt_s)
print(f"Frame: {frame_index:04d}\tLatency (ms): {(time.perf_counter() - now) * 1000:.2f}")
if time.perf_counter() - start_time > record_time_s:
break
frame_index += 1
print(f"Images have been saved to {images_dir}")
@dataclass
class OpenCVCameraConfig:
"""
@@ -82,6 +192,8 @@ class OpenCVCameraConfig:
width: int | None = None
height: int | None = None
color_mode: str = "rgb"
rotation: int | None = None
mock: bool = False
def __post_init__(self):
if self.color_mode not in ["rgb", "bgr"]:
@@ -89,6 +201,9 @@ class OpenCVCameraConfig:
f"`color_mode` is expected to be 'rgb' or 'bgr', but {self.color_mode} is provided."
)
if self.rotation not in [-90, None, 90, 180]:
raise ValueError(f"`rotation` must be in [-90, None, 90, 180] (got {self.rotation})")
class OpenCVCamera:
"""
@@ -131,7 +246,7 @@ class OpenCVCamera:
```
"""
def __init__(self, camera_index: int, config: OpenCVCameraConfig | None = None, **kwargs):
def __init__(self, camera_index: int | str, config: OpenCVCameraConfig | None = None, **kwargs):
if config is None:
config = OpenCVCameraConfig()
@@ -139,10 +254,24 @@ class OpenCVCamera:
config = replace(config, **kwargs)
self.camera_index = camera_index
self.port = None
# Linux uses ports for connecting to cameras
if platform.system() == "Linux":
if isinstance(self.camera_index, int):
self.port = Path(f"/dev/video{self.camera_index}")
elif isinstance(self.camera_index, str) and is_valid_unix_path(self.camera_index):
self.port = Path(self.camera_index)
# Retrieve the camera index from a potentially symlinked path
self.camera_index = get_camera_index_from_unix_port(self.port)
else:
raise ValueError(f"Please check the provided camera_index: {camera_index}")
self.fps = config.fps
self.width = config.width
self.height = config.height
self.color_mode = config.color_mode
self.mock = config.mock
self.camera = None
self.is_connected = False
@@ -151,43 +280,60 @@ class OpenCVCamera:
self.color_image = None
self.logs = {}
if self.mock:
import tests.mock_cv2 as cv2
else:
import cv2
# TODO(aliberts): Do we keep original width/height or do we define them after rotation?
self.rotation = None
if config.rotation == -90:
self.rotation = cv2.ROTATE_90_COUNTERCLOCKWISE
elif config.rotation == 90:
self.rotation = cv2.ROTATE_90_CLOCKWISE
elif config.rotation == 180:
self.rotation = cv2.ROTATE_180
def connect(self):
if self.is_connected:
raise RobotDeviceAlreadyConnectedError(f"OpenCVCamera({self.camera_index}) is already connected.")
if self.mock:
import tests.mock_cv2 as cv2
else:
import cv2
# Use 1 thread to avoid blocking the main thread. Especially useful during data collection
# when other threads are used to save the images.
cv2.setNumThreads(1)
camera_idx = f"/dev/video{self.camera_index}" if platform.system() == "Linux" else self.camera_index
# First create a temporary camera trying to access `camera_index`,
# and verify it is a valid camera by calling `isOpened`.
if platform.system() == "Linux":
# Linux uses ports for connecting to cameras
tmp_camera = cv2.VideoCapture(f"/dev/video{self.camera_index}")
else:
tmp_camera = cv2.VideoCapture(self.camera_index)
tmp_camera = cv2.VideoCapture(camera_idx)
is_camera_open = tmp_camera.isOpened()
# Release camera to make it accessible for `find_camera_indices`
tmp_camera.release()
del tmp_camera
# If the camera doesn't work, display the camera indices corresponding to
# valid cameras.
if not is_camera_open:
# Verify that the provided `camera_index` is valid before printing the traceback
available_cam_ids = find_camera_indices()
cameras_info = find_cameras()
available_cam_ids = [cam["index"] for cam in cameras_info]
if self.camera_index not in available_cam_ids:
raise ValueError(
f"`camera_index` is expected to be one of these available cameras {available_cam_ids}, but {self.camera_index} is provided instead. "
"To find the camera index you should use, run `python lerobot/scripts/save_images_from_cameras.py --driver opencv`."
)
raise OSError(f"Can't access OpenCVCamera({self.camera_index}).")
raise OSError(f"Can't access OpenCVCamera({camera_idx}).")
# Secondly, create the camera that will be used downstream.
# Note: For some unknown reason, calling `isOpened` blocks the camera which then
# needs to be re-created.
if platform.system() == "Linux":
self.camera = cv2.VideoCapture(f"/dev/video{self.camera_index}")
else:
self.camera = cv2.VideoCapture(self.camera_index)
self.camera = cv2.VideoCapture(camera_idx)
if self.fps is not None:
self.camera.set(cv2.CAP_PROP_FPS, self.fps)
@@ -200,22 +346,24 @@ class OpenCVCamera:
actual_width = self.camera.get(cv2.CAP_PROP_FRAME_WIDTH)
actual_height = self.camera.get(cv2.CAP_PROP_FRAME_HEIGHT)
# Using `math.isclose` since actual fps can be a float (e.g. 29.9 instead of 30)
if self.fps is not None and not math.isclose(self.fps, actual_fps, rel_tol=1e-3):
# Using `OSError` since it's a broad that encompasses issues related to device communication
raise OSError(
f"Can't set {self.fps=} for OpenCVCamera({self.camera_index}). Actual value is {actual_fps}."
)
if self.width is not None and self.width != actual_width:
if self.width is not None and not math.isclose(self.width, actual_width, rel_tol=1e-3):
raise OSError(
f"Can't set {self.width=} for OpenCVCamera({self.camera_index}). Actual value is {actual_width}."
)
if self.height is not None and self.height != actual_height:
if self.height is not None and not math.isclose(self.height, actual_height, rel_tol=1e-3):
raise OSError(
f"Can't set {self.height=} for OpenCVCamera({self.camera_index}). Actual value is {actual_height}."
)
self.fps = actual_fps
self.width = actual_width
self.height = actual_height
self.fps = round(actual_fps)
self.width = round(actual_width)
self.height = round(actual_height)
self.is_connected = True
@@ -234,6 +382,7 @@ class OpenCVCamera:
start_time = time.perf_counter()
ret, color_image = self.camera.read()
if not ret:
raise OSError(f"Can't capture color image from camera {self.camera_index}.")
@@ -248,6 +397,11 @@ class OpenCVCamera:
# However, Deep Learning framework such as LeRobot uses RGB format as default to train neural networks,
# so we convert the image color from BGR to RGB.
if requested_color_mode == "rgb":
if self.mock:
import tests.mock_cv2 as cv2
else:
import cv2
color_image = cv2.cvtColor(color_image, cv2.COLOR_BGR2RGB)
h, w, _ = color_image.shape
@@ -256,17 +410,25 @@ class OpenCVCamera:
f"Can't capture color image with expected height and width ({self.height} x {self.width}). ({h} x {w}) returned instead."
)
if self.rotation is not None:
color_image = cv2.rotate(color_image, self.rotation)
# log the number of seconds it took to read the image
self.logs["delta_timestamp_s"] = time.perf_counter() - start_time
# log the utc time at which the image was received
self.logs["timestamp_utc"] = capture_timestamp_utc()
self.color_image = color_image
return color_image
def read_loop(self):
while self.stop_event is None or not self.stop_event.is_set():
self.color_image = self.read()
while not self.stop_event.is_set():
try:
self.color_image = self.read()
except Exception as e:
print(f"Error reading in thread: {e}")
def async_read(self):
if not self.is_connected:
@@ -281,15 +443,14 @@ class OpenCVCamera:
self.thread.start()
num_tries = 0
while self.color_image is None:
num_tries += 1
time.sleep(1 / self.fps)
if num_tries > self.fps and (self.thread.ident is None or not self.thread.is_alive()):
raise Exception(
"The thread responsible for `self.async_read()` took too much time to start. There might be an issue. Verify that `self.thread.start()` has been called."
)
while True:
if self.color_image is not None:
return self.color_image
return self.color_image
time.sleep(1 / self.fps)
num_tries += 1
if num_tries > self.fps * 2:
raise TimeoutError("Timed out waiting for async_read() to start.")
def disconnect(self):
if not self.is_connected:
@@ -297,18 +458,61 @@ class OpenCVCamera:
f"OpenCVCamera({self.camera_index}) is not connected. Try running `camera.connect()` first."
)
if self.thread is not None and self.thread.is_alive():
# wait for the thread to finish
if self.thread is not None:
self.stop_event.set()
self.thread.join()
self.thread.join() # wait for the thread to finish
self.thread = None
self.stop_event = None
self.camera.release()
self.camera = None
self.is_connected = False
def __del__(self):
if getattr(self, "is_connected", False):
self.disconnect()
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="Save a few frames using `OpenCVCamera` for all cameras connected to the computer, or a selected subset."
)
parser.add_argument(
"--camera-ids",
type=int,
nargs="*",
default=None,
help="List of camera indices used to instantiate the `OpenCVCamera`. If not provided, find and use all available camera indices.",
)
parser.add_argument(
"--fps",
type=int,
default=None,
help="Set the number of frames recorded per seconds for all cameras. If not provided, use the default fps of each camera.",
)
parser.add_argument(
"--width",
type=str,
default=None,
help="Set the width for all cameras. If not provided, use the default width of each camera.",
)
parser.add_argument(
"--height",
type=str,
default=None,
help="Set the height for all cameras. If not provided, use the default height of each camera.",
)
parser.add_argument(
"--images-dir",
type=Path,
default="outputs/images_from_opencv_cameras",
help="Set directory to save a few frames for each camera.",
)
parser.add_argument(
"--record-time-s",
type=float,
default=4.0,
help="Set the number of seconds used to record the frames. By default, 2 seconds.",
)
args = parser.parse_args()
save_images_from_cameras(**vars(args))