From 7e9f955b4023887439eba681f14bbb81b40c1357 Mon Sep 17 00:00:00 2001 From: Juan Pizarro Date: Thu, 17 Jul 2025 17:01:48 +0200 Subject: [PATCH] fix(hil-serl): drain queue on get_last_item_from_queue (#1524) * fix(hil-serl): drain queue on get_last_item_from_queue * parametrize queue tests * revert changes for Darwin * revert parametrize queue tests * add test_get_last_item_multiple_items_with_torch_queue * update test_get_last_item_multiple_items_with_torch_queue * update test_get_last_item_multiple_items_with_torch_queue --- src/lerobot/utils/queue.py | 21 +++++++++++++++++---- tests/utils/test_queue.py | 16 ++++++++++++++++ 2 files changed, 33 insertions(+), 4 deletions(-) diff --git a/src/lerobot/utils/queue.py b/src/lerobot/utils/queue.py index ceb30e2b..864d798a 100644 --- a/src/lerobot/utils/queue.py +++ b/src/lerobot/utils/queue.py @@ -14,6 +14,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +import platform +from contextlib import suppress from queue import Empty from typing import Any @@ -30,10 +32,21 @@ def get_last_item_from_queue(queue: Queue, block=True, timeout: float = 0.1) -> item = None # Drain queue and keep only the most recent parameters - try: - while True: + if platform.system() == "Darwin": + # On Mac, avoid using `qsize` due to unreliable implementation. + # There is a comment on `qsize` code in the Python source: + # Raises NotImplementedError on Mac OSX because of broken sem_getvalue() + try: + while True: + item = queue.get_nowait() + except Empty: + pass + + return item + + # Details about using qsize in https://github.com/huggingface/lerobot/issues/1523 + while queue.qsize() > 0: + with suppress(Empty): item = queue.get_nowait() - except Empty: - pass return item diff --git a/tests/utils/test_queue.py b/tests/utils/test_queue.py index 0a0d2177..6e42acdb 100644 --- a/tests/utils/test_queue.py +++ b/tests/utils/test_queue.py @@ -18,6 +18,8 @@ import threading import time from queue import Queue +from torch.multiprocessing import Queue as TorchMPQueue + from lerobot.utils.queue import get_last_item_from_queue @@ -46,6 +48,20 @@ def test_get_last_item_multiple_items(): assert queue.empty() +def test_get_last_item_multiple_items_with_torch_queue(): + """Test getting the last item when queue has multiple items.""" + queue = TorchMPQueue() + items = ["first", "second", "third", "fourth", "last"] + + for item in items: + queue.put(item) + + result = get_last_item_from_queue(queue) + + assert result == "last" + assert queue.empty() + + def test_get_last_item_different_types(): """Test with different data types in the queue.""" queue = Queue()