forked from tangger/lerobot
fix(hil-serl): drain queue on get_last_item_from_queue (#1524)
* fix(hil-serl): drain queue on get_last_item_from_queue * parametrize queue tests * revert changes for Darwin * revert parametrize queue tests * add test_get_last_item_multiple_items_with_torch_queue * update test_get_last_item_multiple_items_with_torch_queue * update test_get_last_item_multiple_items_with_torch_queue
This commit is contained in:
@@ -14,6 +14,8 @@
|
|||||||
# See the License for the specific language governing permissions and
|
# See the License for the specific language governing permissions and
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
|
import platform
|
||||||
|
from contextlib import suppress
|
||||||
from queue import Empty
|
from queue import Empty
|
||||||
from typing import Any
|
from typing import Any
|
||||||
|
|
||||||
@@ -30,10 +32,21 @@ def get_last_item_from_queue(queue: Queue, block=True, timeout: float = 0.1) ->
|
|||||||
item = None
|
item = None
|
||||||
|
|
||||||
# Drain queue and keep only the most recent parameters
|
# Drain queue and keep only the most recent parameters
|
||||||
try:
|
if platform.system() == "Darwin":
|
||||||
while True:
|
# On Mac, avoid using `qsize` due to unreliable implementation.
|
||||||
|
# There is a comment on `qsize` code in the Python source:
|
||||||
|
# Raises NotImplementedError on Mac OSX because of broken sem_getvalue()
|
||||||
|
try:
|
||||||
|
while True:
|
||||||
|
item = queue.get_nowait()
|
||||||
|
except Empty:
|
||||||
|
pass
|
||||||
|
|
||||||
|
return item
|
||||||
|
|
||||||
|
# Details about using qsize in https://github.com/huggingface/lerobot/issues/1523
|
||||||
|
while queue.qsize() > 0:
|
||||||
|
with suppress(Empty):
|
||||||
item = queue.get_nowait()
|
item = queue.get_nowait()
|
||||||
except Empty:
|
|
||||||
pass
|
|
||||||
|
|
||||||
return item
|
return item
|
||||||
|
|||||||
@@ -18,6 +18,8 @@ import threading
|
|||||||
import time
|
import time
|
||||||
from queue import Queue
|
from queue import Queue
|
||||||
|
|
||||||
|
from torch.multiprocessing import Queue as TorchMPQueue
|
||||||
|
|
||||||
from lerobot.utils.queue import get_last_item_from_queue
|
from lerobot.utils.queue import get_last_item_from_queue
|
||||||
|
|
||||||
|
|
||||||
@@ -46,6 +48,20 @@ def test_get_last_item_multiple_items():
|
|||||||
assert queue.empty()
|
assert queue.empty()
|
||||||
|
|
||||||
|
|
||||||
|
def test_get_last_item_multiple_items_with_torch_queue():
|
||||||
|
"""Test getting the last item when queue has multiple items."""
|
||||||
|
queue = TorchMPQueue()
|
||||||
|
items = ["first", "second", "third", "fourth", "last"]
|
||||||
|
|
||||||
|
for item in items:
|
||||||
|
queue.put(item)
|
||||||
|
|
||||||
|
result = get_last_item_from_queue(queue)
|
||||||
|
|
||||||
|
assert result == "last"
|
||||||
|
assert queue.empty()
|
||||||
|
|
||||||
|
|
||||||
def test_get_last_item_different_types():
|
def test_get_last_item_different_types():
|
||||||
"""Test with different data types in the queue."""
|
"""Test with different data types in the queue."""
|
||||||
queue = Queue()
|
queue = Queue()
|
||||||
|
|||||||
Reference in New Issue
Block a user