Files
issacdataengine/nimbus/utils/utils.py
2026-03-16 11:44:10 +00:00

183 lines
6.4 KiB
Python

import functools
import os
import re
import sys
import time
from typing import Tuple, Type, Union
from nimbus.components.data.observation import Observations
from nimbus.components.data.scene import Scene
from nimbus.components.data.sequence import Sequence
def init_env():
sys.path.append("./")
sys.path.append("./data_engine")
sys.path.append("workflows/simbox")
def unpack_iter_data(data: tuple):
assert len(data) <= 3, "not support yet"
scene = None
seq = None
obs = None
for item in data:
if isinstance(item, Scene):
scene = item
elif isinstance(item, Sequence):
seq = item
elif isinstance(item, Observations):
obs = item
return scene, seq, obs
def consume_stage(stage_input):
if hasattr(stage_input, "Args"):
consume_iterators(stage_input.Args)
for value in stage_input.Args:
if hasattr(value, "__del__"):
value.__del__() # pylint: disable=C2801
if hasattr(stage_input, "Kwargs"):
if stage_input.Kwargs is not None:
for value in stage_input.Kwargs.values():
consume_iterators(value)
if hasattr(value, "__del__"):
value.__del__() # pylint: disable=C2801
# prevent isaac sim close pipe worker in advance
def pipe_consume_stage(stage_input):
if hasattr(stage_input, "Args"):
consume_iterators(stage_input.Args)
if hasattr(stage_input, "Kwargs"):
if stage_input.Kwargs is not None:
for value in stage_input.Kwargs.values():
consume_iterators(value)
def consume_iterators(obj):
# from pdb import set_trace; set_trace()
if isinstance(obj, (str, bytes)):
return obj
if isinstance(obj, dict):
return {key: consume_iterators(value) for key, value in obj.items()}
if isinstance(obj, list):
return [consume_iterators(item) for item in obj]
if isinstance(obj, tuple):
return tuple(consume_iterators(item) for item in obj)
if hasattr(obj, "__iter__"):
for item in obj:
consume_iterators(item)
return obj
def scene_names_postprocess(scene_names: list) -> list:
"""
Distributes a list of scene names (folders) among multiple workers in a distributed environment.
This function is designed to work with Deep Learning Container (DLC) environments, where worker
information is extracted from environment variables. It assigns a subset of the input scene names
to the current worker based on its rank and the total number of workers, using a round-robin strategy.
If not running in a DLC environment, all scene names are assigned to a single worker.
Args:
scene_names (list): List of scene names (typically folder names) to be distributed.
Returns:
list: The subset of scene names assigned to the current worker.
Raises:
PermissionError: If there is a permission issue accessing the input directory.
RuntimeError: For any other errors encountered during processing.
Notes:
- The function expects certain environment variables (e.g., POD_NAME, WORLD_SIZE) to be set
in DLC environments.
- If multiple workers are present, the input list is sorted before distribution to ensure
consistent assignment across workers.
"""
def _get_dlc_worker_info():
"""Extract worker rank and world size from DLC environment variables."""
pod_name = os.environ.get("POD_NAME")
if pod_name:
# Match worker-N or master-N patterns
match = re.search(r"dlc.*?-(worker|master)-(\d+)$", pod_name)
if match:
node_type, node_id = match.groups()
world_size = int(os.environ.get("WORLD_SIZE", "1"))
if node_type == "worker":
rank = int(node_id)
else: # master node
rank = world_size - 1
return rank, world_size
# Default for non-DLC environment
return 0, 1
def _distribute_folders(all_folders, rank, world_size):
"""Distribute folders among workers using round-robin strategy."""
if not all_folders:
return []
# Only sort when there are multiple workers to ensure consistency
if world_size > 1:
all_folders.sort()
# Distribute using slicing: worker i gets folders at indices i, i+world_size, ...
return all_folders[rank::world_size]
try:
# Get all subfolders
all_subfolders = scene_names
if not all_subfolders:
print(f"Warning: No scene found in {scene_names}")
return []
# Get worker identity and distribute folders
rank, world_size = _get_dlc_worker_info()
assigned_folders = _distribute_folders(all_subfolders, rank, world_size)
print(
f"DLC Worker {rank}/{world_size}: Assigned {len(assigned_folders)} out of "
f"{len(all_subfolders)} total folders"
)
return assigned_folders
except PermissionError:
raise PermissionError(f"No permission to access directory: {scene_names}")
except Exception as e:
raise RuntimeError(f"Error reading input directory {scene_names}: {e}")
def retry_on_exception(
max_retries: int = 3, retry_exceptions: Union[bool, Tuple[Type[Exception], ...]] = True, delay: float = 1.0
):
def decorator(func):
@functools.wraps(func)
def wrapper(self, *args, **kwargs):
last_exception = None
for attempt in range(max_retries + 1):
try:
if attempt > 0:
print(f"Retry attempt {attempt}/{max_retries} for {func.__name__}")
return func(self, *args, **kwargs)
except Exception as e:
last_exception = e
should_retry = False
if retry_exceptions is True:
should_retry = True
elif isinstance(retry_exceptions, (tuple, list)):
should_retry = isinstance(e, retry_exceptions)
if should_retry and attempt < max_retries:
print(f"Error in {func.__name__}: {e}. Retrying in {delay} seconds...")
time.sleep(delay)
else:
raise
if last_exception:
raise last_exception
return wrapper
return decorator