opencv.py

This commit is contained in:
Remi Cadene
2024-07-29 18:06:02 +02:00
parent b0f4f4f9a1
commit 1af22d685b

View File

@@ -50,6 +50,24 @@ def find_camera_indices(raise_when_empty=False, max_index_search_range=MAX_OPENC
return camera_ids
def find_camera_ports(raise_when_empty=False):
camera_ports = []
for port in Path("/dev").glob("video*"):
port = str(port)
camera = cv2.VideoCapture(port)
is_open = camera.isOpened()
camera.release()
if is_open:
print(f"Camera found at port {port}")
camera_ports.append(port)
if raise_when_empty and len(camera_ports) == 0:
raise OSError(
"Not a single camera was detected. Try re-plugging, or re-installing `opencv2`, or your camera driver, or make sure your camera is compatible with opencv2."
)
return camera_ports
def save_image(img_array, camera_index, frame_index, images_dir):
img = Image.fromarray(img_array)
@@ -59,11 +77,16 @@ def save_image(img_array, camera_index, frame_index, images_dir):
def save_images_from_cameras(
images_dir: Path, camera_ids=None, fps=None, width=None, height=None, record_time_s=2
images_dir: Path, camera_ids: list[int] | list[str] | None=None, fps=None, width=None, height=None, record_time_s=2
):
if camera_ids is None:
print("Finding available camera indices")
camera_ids = find_camera_indices()
print("Finding available camera indices through '/dev/video*' ports")
camera_ids = find_camera_ports()
if len(camera_ids) == 0:
print(f"Finding available camera indices through scanning all indices from 0 to {MAX_OPENCV_INDEX}")
camera_ids = find_camera_indices()
print("Connecting cameras")
cameras = []
@@ -76,7 +99,7 @@ def save_images_from_cameras(
cameras.append(camera)
images_dir = Path(
images_dir,
images_dir
)
if images_dir.exists():
shutil.rmtree(
@@ -99,7 +122,7 @@ def save_images_from_cameras(
executor.submit(
save_image,
image,
camera.camera_index,
camera.camera_index_to_int(),
frame_index,
images_dir,
)
@@ -200,6 +223,11 @@ class OpenCVCamera:
self.stop_event = None
self.color_image = None
self.logs = {}
def camera_index_to_int(self):
if isinstance(self.camera_index, int):
return self.camera_index
return int(self.camera_index.replace("/dev/video", ""))
def connect(self):
if self.is_connected:
@@ -216,7 +244,9 @@ class OpenCVCamera:
# valid cameras.
if not is_camera_open:
# Verify that the provided `camera_index` is valid before printing the traceback
available_cam_ids = find_camera_indices()
available_cam_ids = find_camera_ports()
if len(available_cam_ids) == 0:
available_cam_ids = find_camera_indices()
if self.camera_index not in available_cam_ids:
raise ValueError(
f"`camera_index` is expected to be one of these available cameras {available_cam_ids}, but {self.camera_index} is provided instead."
@@ -360,7 +390,7 @@ if __name__ == "__main__":
)
parser.add_argument(
"--camera-ids",
type=int,
type=str,
nargs="*",
default=None,
help="List of camera indices used to instantiate the `OpenCVCamera`. If not provided, find and use all available camera indices.",