# Copyright (c) 2022-2026, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md).
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause

"""Script to play a checkpoint of an RL agent from RSL-RL."""

"""Launch Isaac Sim Simulator first."""

import argparse
import sys
import os
import torch
import torchvision
import gymnasium as gym
import time
from datetime import datetime

from isaaclab.app import AppLauncher

# local imports
import cli_args  # isort: skip

# add argparse arguments
parser = argparse.ArgumentParser(description="Play a checkpoint of an RL agent trained with RSL-RL.")
parser.add_argument("--video", action="store_true", default=False, help="Record videos during playback.")
parser.add_argument("--video_length", type=int, default=2000, help="Length of the recorded video (in steps).")
parser.add_argument(
    "--disable_fabric", action="store_true", default=False, help="Disable fabric and use USD I/O operations."
)
parser.add_argument("--num_envs", type=int, default=None, help="Number of environments to simulate.")
parser.add_argument("--task", type=str, default=None, help="Name of the task.")
parser.add_argument(
    "--agent", type=str, default="rsl_rl_cfg_entry_point", help="Name of the RL agent configuration entry point."
)
parser.add_argument("--seed", type=int, default=None, help="Seed used for the environment.")
parser.add_argument(
    "--use_pretrained_checkpoint",
    action="store_true",
    help="Use the pre-trained checkpoint from Nucleus.",
)
parser.add_argument("--real-time", action="store_true", default=False, help="Run in real-time, if possible.")
# append RSL-RL cli arguments
cli_args.add_rsl_rl_args(parser)
# append AppLauncher cli args
AppLauncher.add_app_launcher_args(parser)
# parse the arguments
args_cli, hydra_args = parser.parse_known_args()

# always enable cameras to record video
if args_cli.video:
    args_cli.enable_cameras = True

# clear out sys.argv for Hydra
sys.argv = [sys.argv[0]] + hydra_args

# launch omniverse app
app_launcher = AppLauncher(args_cli)
simulation_app = app_launcher.app

"""Rest everything follows."""

from rsl_rl.runners import DistillationRunner, OnPolicyRunner

from isaaclab.envs import (
    DirectMARLEnv,
    DirectMARLEnvCfg,
    DirectRLEnvCfg,
    ManagerBasedRLEnvCfg,
    multi_agent_to_single_agent,
)
from isaaclab.utils.assets import retrieve_file_path
from isaaclab.utils.dict import print_dict
from isaaclab_rl.rsl_rl import RslRlBaseRunnerCfg, RslRlVecEnvWrapper, export_policy_as_jit, export_policy_as_onnx
from isaaclab_rl.utils.pretrained_checkpoint import get_published_pretrained_checkpoint

import isaaclab_tasks  # noqa: F401
from isaaclab_tasks.utils import get_checkpoint_path
from isaaclab_tasks.utils.hydra import hydra_task_config

import mindbot.tasks  # noqa: F401
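# Example invocation (a sketch -- the task name below is a placeholder; use your
# own registered "-Play" task). The standard Isaac Lab launcher is assumed:
#
#   ./isaaclab.sh -p play.py --task <Your-Task-Play-v0> --num_envs 4 \
#       --video --video_length 600
#
# Unrecognized tokens are forwarded to Hydra via `hydra_args`, so nested config
# fields can be overridden inline (exact keys depend on your cfg classes), e.g.:
#
#   ./isaaclab.sh -p play.py --task <Your-Task-Play-v0> env.scene.num_envs=2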
# ==============================================================================
# [Update] Multi-camera, multi-environment recorder
# ==============================================================================
# 1. Make sure the following imports are present at the top of the file.
import imageio
import numpy as np

"""Use GPU compute to record video."""


class MultiCameraRecorder:
    """Records RGB frames from multiple scene cameras across selected environments."""

    def __init__(self, env, camera_names: list[str], env_indices: list[int], output_dir: str, fps: int = 30):
        self.env = env
        self.camera_names = camera_names
        self.env_indices = env_indices
        self.output_dir = output_dir
        self.fps = fps
        # structure: self.frames[camera_name][env_idx] = list of per-step tensors
        self.frames = {cam_name: {env_idx: [] for env_idx in env_indices} for cam_name in camera_names}
        os.makedirs(self.output_dir, exist_ok=True)
        # resolve camera handles from the scene
        self.cameras = {}
        for name in camera_names:
            if name in self.env.unwrapped.scene.keys():
                self.cameras[name] = self.env.unwrapped.scene[name]
                print(f"[INFO] Camera {name} linked.")
            else:
                print(f"[WARN] Camera '{name}' not found in scene! Skipping.")

    def record_step(self):
        """Clone the camera data while keeping it on the GPU."""
        for cam_name, camera_obj in self.cameras.items():
            # read the camera's latest RGB output (shape: [num_envs, H, W, C])
            rgb_data = camera_obj.data.output["rgb"]
            for env_idx in self.env_indices:
                # .clone() keeps the frame on the GPU -- fast, but watch VRAM usage
                self.frames[cam_name][env_idx].append(rgb_data[env_idx].clone())

    def save_videos(self, filename_suffix=""):
        print("[INFO] Saving videos from GPU to disk...")
        for cam_name, env_dict in self.frames.items():
            for env_idx, frame_list in env_dict.items():
                if not frame_list:
                    continue
                # stack frames into a (T, H, W, C) tensor
                video_tensor = torch.stack(frame_list)
                if video_tensor.shape[-1] == 4:  # RGBA -> RGB
                    video_tensor = video_tensor[..., :3]
                # move to CPU and ensure uint8, as required by write_video
                video_cpu = video_tensor.cpu()
                if video_cpu.dtype != torch.uint8:
                    if video_cpu.max() <= 1.01:  # assume [0, 1] float data
                        video_cpu = (video_cpu * 255).to(torch.uint8)
                    else:
                        video_cpu = video_cpu.to(torch.uint8)
                output_path = os.path.join(self.output_dir, f"{cam_name}_env{env_idx}_{filename_suffix}.mp4")
                # torchvision.io.write_video expects a (T, H, W, C) uint8 tensor
                torchvision.io.write_video(output_path, video_cpu, fps=self.fps)
                # [key] release GPU memory immediately after saving
                del video_tensor
                del video_cpu
                frame_list.clear()
                torch.cuda.empty_cache()


"""Use CPU compute to record video."""
# # 2. Alternative MultiCameraRecorder that stores frames on the CPU.
# class MultiCameraRecorder:
#     def __init__(self, env, camera_names: list[str], env_indices: list[int], output_dir: str, fps: int = 30):
#         self.env = env
#         self.camera_names = camera_names
#         self.env_indices = env_indices
#         self.output_dir = output_dir
#         self.fps = fps
#         self.frames = {cam_name: {env_idx: [] for env_idx in env_indices} for cam_name in camera_names}
#         os.makedirs(self.output_dir, exist_ok=True)
#         self.cameras = {}
#         for name in camera_names:
#             try:
#                 self.cameras[name] = self.env.unwrapped.scene[name]
#                 print(f"[INFO][MultiCameraRecorder] Found camera: {name}")
#             except KeyError:
#                 print(f"[WARN][MultiCameraRecorder] Camera '{name}' not found!")
#
#     def record_step(self):
#         """Call once per simulation step."""
#         for cam_name, camera_obj in self.cameras.items():
#             # [key change] make sure the data is synchronized before reading it;
#             # this avoids reading memory still being rendered (access violation)
#             rgb_data = camera_obj.data.output["rgb"]
#             for env_idx in self.env_indices:
#                 if env_idx >= rgb_data.shape[0]:
#                     continue
#                 # copy to a CPU numpy array; usually more stable than stacking
#                 # torchvision tensors
#                 frame = rgb_data[env_idx].clone().detach().cpu().numpy()
#                 self.frames[cam_name][env_idx].append(frame)
#
#     def save_videos(self, filename_suffix=""):
#         """Call after the loop finishes."""
#         print("[INFO][MultiCameraRecorder] Saving videos...")
#         for cam_name, env_dict in self.frames.items():
#             for env_idx, frame_list in env_dict.items():
#                 if not frame_list:
#                     continue
#                 print(f"  -> Saving {cam_name} (Env {env_idx})...")
#                 # normalize the format, then save with imageio
#                 processed_frames = []
#                 for img in frame_list:
#                     # [0, 1] -> [0, 255]
#                     if img.dtype != np.uint8:
#                         if img.max() <= 1.01:
#                             img = (img * 255).astype(np.uint8)
#                         else:
#                             img = img.astype(np.uint8)
#                     # drop the alpha channel
#                     if img.shape[-1] == 4:
#                         img = img[:, :, :3]
#                     processed_frames.append(img)
#                 fname = f"{cam_name}_env{env_idx}_{filename_suffix}.mp4"
#                 output_path = os.path.join(self.output_dir, fname)
#                 try:
#                     # write the video with imageio
#                     imageio.mimsave(output_path, processed_frames, fps=self.fps)
#                     print(f"       Saved: {output_path}")
#                 except Exception as e:
#                     print(f"       [ERROR] Failed to save {fname}: {e}")
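# A minimal usage sketch for MultiCameraRecorder (hypothetical names: a sensor
# called "head_camera" is assumed to exist in the scene cfg). Rough VRAM budget
# for the GPU recorder: one uint8 RGB frame costs H*W*3 bytes, so T steps of
# C cameras over E envs hold roughly T*C*E*H*W*3 bytes until save_videos() runs,
# e.g. 600 steps x 4 cameras x 2 envs at 640x480 is ~4.4 GB.
#
#   recorder = MultiCameraRecorder(
#       env=env, camera_names=["head_camera"], env_indices=[0],
#       output_dir="/tmp/cam_videos", fps=30,
#   )
#   for _ in range(600):
#       obs, _, dones, _ = env.step(policy(obs))  # advance sim so buffers refresh
#       recorder.record_step()
#   recorder.save_videos(filename_suffix="demo")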
# ==============================================================================


@hydra_task_config(args_cli.task, args_cli.agent)
def main(env_cfg: ManagerBasedRLEnvCfg | DirectRLEnvCfg | DirectMARLEnvCfg, agent_cfg: RslRlBaseRunnerCfg):
    """Play with RSL-RL agent."""
    # grab task name for checkpoint path
    task_name = args_cli.task.split(":")[-1]
    train_task_name = task_name.replace("-Play", "")
    # override configurations with non-hydra CLI arguments
    agent_cfg: RslRlBaseRunnerCfg = cli_args.update_rsl_rl_cfg(agent_cfg, args_cli)
    env_cfg.scene.num_envs = args_cli.num_envs if args_cli.num_envs is not None else env_cfg.scene.num_envs
    # set the environment seed
    env_cfg.seed = agent_cfg.seed
    env_cfg.sim.device = args_cli.device if args_cli.device is not None else env_cfg.sim.device

    # specify directory for logging experiments
    log_root_path = os.path.join("logs", "rsl_rl", agent_cfg.experiment_name)
    log_root_path = os.path.abspath(log_root_path)
    print(f"[INFO] Loading experiment from directory: {log_root_path}")
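    # The checkpoint resolution below assumes the usual rsl_rl layout
    # (illustrative; run directories are timestamped by the training script):
    #   logs/rsl_rl/<experiment_name>/<run_dir>/model_<iteration>.pt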
    if args_cli.use_pretrained_checkpoint:
        resume_path = get_published_pretrained_checkpoint("rsl_rl", train_task_name)
        if not resume_path:
            print("[INFO] Unfortunately a pre-trained checkpoint is currently unavailable for this task.")
            return
    elif args_cli.checkpoint:
        resume_path = retrieve_file_path(args_cli.checkpoint)
    else:
        resume_path = get_checkpoint_path(log_root_path, agent_cfg.load_run, agent_cfg.load_checkpoint)
    log_dir = os.path.dirname(resume_path)

    # set the log directory for the environment (works for all environment types)
    env_cfg.log_dir = log_dir

    # create isaac environment
    env = gym.make(args_cli.task, cfg=env_cfg, render_mode="rgb_array" if args_cli.video else None)

    # convert to single-agent instance if required by the RL algorithm
    if isinstance(env.unwrapped, DirectMARLEnv):
        env = multi_agent_to_single_agent(env)

    # wrap for video recording (standard Gym recording of the viewport)
    if args_cli.video:
        video_kwargs = {
            "video_folder": os.path.join(log_dir, "videos", "play"),
            "step_trigger": lambda step: step == 0,
            "video_length": args_cli.video_length,
            "disable_logger": True,
        }
        print("[INFO] Recording videos during playback.")
        print_dict(video_kwargs, nesting=4)
        env = gym.wrappers.RecordVideo(env, **video_kwargs)

    # wrap around environment for rsl-rl
    env = RslRlVecEnvWrapper(env, clip_actions=agent_cfg.clip_actions)

    print(f"[INFO]: Loading model checkpoint from: {resume_path}")
    # load previously trained model
    if agent_cfg.class_name == "OnPolicyRunner":
        runner = OnPolicyRunner(env, agent_cfg.to_dict(), log_dir=None, device=agent_cfg.device)
    elif agent_cfg.class_name == "DistillationRunner":
        runner = DistillationRunner(env, agent_cfg.to_dict(), log_dir=None, device=agent_cfg.device)
    else:
        raise ValueError(f"Unsupported runner class: {agent_cfg.class_name}")
    runner.load(resume_path)

    # obtain the trained policy for inference
    policy = runner.get_inference_policy(device=env.unwrapped.device)

    # extract the neural network module
    try:
        policy_nn = runner.alg.policy
    except AttributeError:
        policy_nn = runner.alg.actor_critic

    # extract the normalizer
    if hasattr(policy_nn, "actor_obs_normalizer"):
        normalizer = policy_nn.actor_obs_normalizer
    elif hasattr(policy_nn, "student_obs_normalizer"):
        normalizer = policy_nn.student_obs_normalizer
    else:
        normalizer = None

    # export policy to onnx/jit
    export_model_dir = os.path.join(os.path.dirname(resume_path), "exported")
    export_policy_as_jit(policy_nn, normalizer=normalizer, path=export_model_dir, filename="policy.pt")
    export_policy_as_onnx(policy_nn, normalizer=normalizer, path=export_model_dir, filename="policy.onnx")
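    # Optional sanity check of the exported ONNX policy (a sketch; assumes the
    # onnxruntime package is available, which this script does not require):
    #
    #   import numpy as np
    #   import onnxruntime as ort
    #   sess = ort.InferenceSession(os.path.join(export_model_dir, "policy.onnx"))
    #   inp = sess.get_inputs()[0]
    #   # replace dynamic dims (reported as strings) with a batch of 1
    #   shape = [d if isinstance(d, int) else 1 for d in inp.shape]
    #   actions = sess.run(None, {inp.name: np.zeros(shape, dtype=np.float32)})[0]
    #   print("[INFO] ONNX policy output shape:", actions.shape)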
    dt = env.unwrapped.step_dt

    # ==========================================================================
    # Initialize the multi-camera recorder
    # ==========================================================================
    recorder = None
    if args_cli.video:
        # 1. Cameras to record (names must match those defined in mindbot_env_cfg.py)
        target_cameras = [
            "left_hand_camera",
            "right_hand_camera",
            "head_camera",
            "chest_camera",
        ]
        # 2. Environment indices to record (e.g., [0] for only the first env).
        #    Note: recording many environments makes saving slow and memory-heavy.
        envs_to_record = [0, 1, 2, 3]
        # To record all environments (not recommended unless num_envs is small):
        # envs_to_record = list(range(env.unwrapped.num_envs))

        save_dir = os.path.join(log_dir, "robot_camera_recordings")
        recorder = MultiCameraRecorder(
            env=env,
            camera_names=target_cameras,
            env_indices=envs_to_record,
            output_dir=save_dir,
            fps=int(round(1.0 / dt)),
        )

    # reset environment
    obs = env.get_observations()
    timestep = 0
    print("[INFO] Starting simulation loop...")
    # simulate environment
    while simulation_app.is_running():
        start_time = time.time()
        # run everything in inference mode
        with torch.inference_mode():
            # agent stepping
            actions = policy(obs)
            # env stepping
            obs, _, dones, _ = env.step(actions)

            # ==================================================================
            # Record camera data every frame
            # ==================================================================
            if recorder:
                recorder.record_step()

            # reset recurrent states for episodes that have terminated
            policy_nn.reset(dones)

        if args_cli.video:
            timestep += 1
            # exit the play loop after recording one video
            if timestep == args_cli.video_length:
                # ==============================================================
                # Save all videos once the loop ends
                # ==============================================================
                if recorder:
                    timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
                    recorder.save_videos(filename_suffix=timestamp)
                break

        # time delay for real-time evaluation
        sleep_time = dt - (time.time() - start_time)
        if args_cli.real_time and sleep_time > 0:
            time.sleep(sleep_time)

    # close the simulator
    env.close()


if __name__ == "__main__":
    # run the main function
    main()
    # close sim app
    simulation_app.close()