# mindbot/scripts/rsl_rl/play.py

# Copyright (c) 2022-2026, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md).
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
"""Script to play a checkpoint if an RL agent from RSL-RL."""
"""Launch Isaac Sim Simulator first."""
import argparse
import sys
import os
import torch
import torchvision
import gymnasium as gym
import time
from datetime import datetime
from isaaclab.app import AppLauncher
# local imports
import cli_args # isort: skip
# add argparse arguments
parser = argparse.ArgumentParser(description="Play a checkpoint of an RL agent from RSL-RL.")
parser.add_argument("--video", action="store_true", default=False, help="Record videos during playback.")
parser.add_argument("--video_length", type=int, default=2000, help="Length of the recorded video (in steps).")
parser.add_argument(
"--disable_fabric", action="store_true", default=False, help="Disable fabric and use USD I/O operations."
)
parser.add_argument("--num_envs", type=int, default=None, help="Number of environments to simulate.")
parser.add_argument("--task", type=str, default=None, help="Name of the task.")
parser.add_argument(
"--agent", type=str, default="rsl_rl_cfg_entry_point", help="Name of the RL agent configuration entry point."
)
parser.add_argument("--seed", type=int, default=None, help="Seed used for the environment")
parser.add_argument(
"--use_pretrained_checkpoint",
action="store_true",
help="Use the pre-trained checkpoint from Nucleus.",
)
parser.add_argument("--real-time", action="store_true", default=False, help="Run in real-time, if possible.")
# append RSL-RL cli arguments
cli_args.add_rsl_rl_args(parser)
# append AppLauncher cli args
AppLauncher.add_app_launcher_args(parser)
# parse the arguments
args_cli, hydra_args = parser.parse_known_args()
# always enable cameras to record video
if args_cli.video:
args_cli.enable_cameras = True
# clear out sys.argv for Hydra
sys.argv = [sys.argv[0]] + hydra_args
# launch omniverse app
app_launcher = AppLauncher(args_cli)
simulation_app = app_launcher.app
"""Rest everything follows."""
from rsl_rl.runners import DistillationRunner, OnPolicyRunner
from isaaclab.envs import (
DirectMARLEnv,
DirectMARLEnvCfg,
DirectRLEnvCfg,
ManagerBasedRLEnvCfg,
multi_agent_to_single_agent,
)
from isaaclab.utils.assets import retrieve_file_path
from isaaclab.utils.dict import print_dict
from isaaclab_rl.rsl_rl import RslRlBaseRunnerCfg, RslRlVecEnvWrapper, export_policy_as_jit, export_policy_as_onnx
from isaaclab_rl.utils.pretrained_checkpoint import get_published_pretrained_checkpoint
import isaaclab_tasks # noqa: F401
from isaaclab_tasks.utils import get_checkpoint_path
from isaaclab_tasks.utils.hydra import hydra_task_config
import mindbot.tasks # noqa: F401
# ==============================================================================
# [Update] Multi-camera, multi-environment recording classes
# ==============================================================================
# 1. Make sure the following imports are present at the top of the file.
import imageio
import numpy as np

"""Use GPU compute to record video (disabled alternative, kept for reference)."""
# class MultiCameraRecorder:
#     def __init__(self, env, camera_names: list[str], env_indices: list[int], output_dir: str, fps: int = 30):
#         self.env = env
#         self.camera_names = camera_names
#         self.env_indices = env_indices
#         self.output_dir = output_dir
#         self.fps = fps
#         self.frames = {cam_name: {env_idx: [] for env_idx in env_indices} for cam_name in camera_names}
#         os.makedirs(self.output_dir, exist_ok=True)
#         self.cameras = {}
#         for name in camera_names:
#             if name in self.env.unwrapped.scene.keys():
#                 self.cameras[name] = self.env.unwrapped.scene[name]
#                 print(f"[INFO] Camera {name} linked.")
#
#     def record_step(self):
#         """Clone the frame data but keep it on the GPU."""
#         for cam_name, camera_obj in self.cameras.items():
#             # force a sync before reading the data (prevents losing the backend buffer)
#             rgb_data = camera_obj.data.output["rgb"]
#             for env_idx in self.env_indices:
#                 # .clone() keeps the frame on the GPU, but watch the VRAM usage
#                 self.frames[cam_name][env_idx].append(rgb_data[env_idx].clone())
#
#     def save_videos(self, filename_suffix=""):
#         print("[INFO] Saving videos from GPU to disk...")
#         for cam_name, env_dict in self.frames.items():
#             for env_idx, frame_list in env_dict.items():
#                 if not frame_list:
#                     continue
#                 # stack into a (T, H, W, C) tensor
#                 video_tensor = torch.stack(frame_list)
#                 if video_tensor.shape[-1] == 4:  # RGBA -> RGB
#                     video_tensor = video_tensor[..., :3]
#                 # move to the CPU and save
#                 video_cpu = video_tensor.cpu()
#                 output_path = os.path.join(self.output_dir, f"{cam_name}_env{env_idx}_{filename_suffix}.mp4")
#                 # torchvision.io.write_video expects (T, H, W, C) uint8 frames
#                 torchvision.io.write_video(output_path, video_cpu, fps=self.fps)
#                 # [key] free the GPU memory immediately after each video is written
#                 del video_tensor
#                 del video_cpu
#                 frame_list.clear()
#                 torch.cuda.empty_cache()
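# Trade-off between the two variants: the GPU version above avoids a
# device-to-host copy on every step but accumulates frames in VRAM, while the
# CPU version below pays the per-step copy and keeps GPU memory bounded. The
# CPU/imageio path is the one enabled here, as it tends to be the more stable
# of the two.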
"""use cpu compute to record video"""
# # 2. 修改 MultiCameraRecorder 类
class MultiCameraRecorder:
    def __init__(self, env, camera_names: list[str], env_indices: list[int], output_dir: str, fps: int = 30):
        self.env = env
        self.camera_names = camera_names
        self.env_indices = env_indices
        self.output_dir = output_dir
        self.fps = fps
        self.frames = {cam_name: {env_idx: [] for env_idx in env_indices} for cam_name in camera_names}
        os.makedirs(self.output_dir, exist_ok=True)
        self.cameras = {}
        for name in camera_names:
            try:
                self.cameras[name] = self.env.unwrapped.scene[name]
                print(f"[INFO][MultiCameraRecorder] Found camera: {name}")
            except KeyError:
                print(f"[WARN][MultiCameraRecorder] Camera '{name}' not found!")

    def record_step(self):
        """Call once per simulation step."""
        for cam_name, camera_obj in self.cameras.items():
            # [key change] copy each frame off the device right away; holding
            # references into buffers the renderer is still writing can cause
            # an access violation
            rgb_data = camera_obj.data.output["rgb"]
            for env_idx in self.env_indices:
                if env_idx >= rgb_data.shape[0]:
                    continue
                # convert to a numpy array on the CPU; this is usually more
                # robust than stacking GPU tensors for torchvision
                frame = rgb_data[env_idx].clone().detach().cpu().numpy()
                self.frames[cam_name][env_idx].append(frame)

    def save_videos(self, filename_suffix=""):
        """Call once after the simulation loop has finished."""
        print("[INFO][MultiCameraRecorder] Saving videos...")
        for cam_name, env_dict in self.frames.items():
            for env_idx, frame_list in env_dict.items():
                if not frame_list:
                    continue
                print(f" -> Saving {cam_name} (Env {env_idx})...")
                # normalize the frame format before handing it to imageio
                processed_frames = []
                for img in frame_list:
                    # rescale [0, 1] floats to [0, 255] uint8
                    if img.dtype != np.uint8:
                        if img.max() <= 1.01:
                            img = (img * 255).astype(np.uint8)
                        else:
                            img = img.astype(np.uint8)
                    # drop the alpha channel
                    if img.shape[-1] == 4:
                        img = img[:, :, :3]
                    processed_frames.append(img)
                fname = f"{cam_name}_env{env_idx}_{filename_suffix}.mp4"
                output_path = os.path.join(self.output_dir, fname)
                try:
                    # write the video with imageio
                    imageio.mimsave(output_path, processed_frames, fps=self.fps)
                    print(f" Saved: {output_path}")
                except Exception as e:
                    print(f" [ERROR] Failed to save {fname}: {e}")
# ==============================================================================
@hydra_task_config(args_cli.task, args_cli.agent)
def main(env_cfg: ManagerBasedRLEnvCfg | DirectRLEnvCfg | DirectMARLEnvCfg, agent_cfg: RslRlBaseRunnerCfg):
"""Play with RSL-RL agent."""
# grab task name for checkpoint path
task_name = args_cli.task.split(":")[-1]
train_task_name = task_name.replace("-Play", "")
# override configurations with non-hydra CLI arguments
agent_cfg: RslRlBaseRunnerCfg = cli_args.update_rsl_rl_cfg(agent_cfg, args_cli)
env_cfg.scene.num_envs = args_cli.num_envs if args_cli.num_envs is not None else env_cfg.scene.num_envs
# set the environment seed
env_cfg.seed = agent_cfg.seed
env_cfg.sim.device = args_cli.device if args_cli.device is not None else env_cfg.sim.device
# specify directory for logging experiments
log_root_path = os.path.join("logs", "rsl_rl", agent_cfg.experiment_name)
log_root_path = os.path.abspath(log_root_path)
print(f"[INFO] Loading experiment from directory: {log_root_path}")
if args_cli.use_pretrained_checkpoint:
resume_path = get_published_pretrained_checkpoint("rsl_rl", train_task_name)
if not resume_path:
print("[INFO] Unfortunately a pre-trained checkpoint is currently unavailable for this task.")
return
elif args_cli.checkpoint:
resume_path = retrieve_file_path(args_cli.checkpoint)
else:
resume_path = get_checkpoint_path(log_root_path, agent_cfg.load_run, agent_cfg.load_checkpoint)
log_dir = os.path.dirname(resume_path)
# set the log directory for the environment (works for all environment types)
env_cfg.log_dir = log_dir
# create isaac environment
env = gym.make(args_cli.task, cfg=env_cfg, render_mode="rgb_array" if args_cli.video else None)
# convert to single-agent instance if required by the RL algorithm
if isinstance(env.unwrapped, DirectMARLEnv):
env = multi_agent_to_single_agent(env)
    # wrap for video recording (standard Gym recording of the viewport, not the robot cameras)
if args_cli.video:
video_kwargs = {
"video_folder": os.path.join(log_dir, "videos", "play"),
"step_trigger": lambda step: step == 0,
"video_length": args_cli.video_length,
"disable_logger": True,
}
print("[INFO] Recording videos during training.")
print_dict(video_kwargs, nesting=4)
env = gym.wrappers.RecordVideo(env, **video_kwargs)
# wrap around environment for rsl-rl
env = RslRlVecEnvWrapper(env, clip_actions=agent_cfg.clip_actions)
print(f"[INFO]: Loading model checkpoint from: {resume_path}")
# load previously trained model
if agent_cfg.class_name == "OnPolicyRunner":
runner = OnPolicyRunner(env, agent_cfg.to_dict(), log_dir=None, device=agent_cfg.device)
elif agent_cfg.class_name == "DistillationRunner":
runner = DistillationRunner(env, agent_cfg.to_dict(), log_dir=None, device=agent_cfg.device)
else:
raise ValueError(f"Unsupported runner class: {agent_cfg.class_name}")
runner.load(resume_path)
# obtain the trained policy for inference
policy = runner.get_inference_policy(device=env.unwrapped.device)
    # extract the neural network module
    # we do this in a try-except to maintain backwards compatibility
    try:
        # rsl-rl version 2.3 onwards
        policy_nn = runner.alg.policy
    except AttributeError:
        # rsl-rl version 2.2 and below
        policy_nn = runner.alg.actor_critic
# extract the normalizer
if hasattr(policy_nn, "actor_obs_normalizer"):
normalizer = policy_nn.actor_obs_normalizer
elif hasattr(policy_nn, "student_obs_normalizer"):
normalizer = policy_nn.student_obs_normalizer
else:
normalizer = None
# export policy to onnx/jit
export_model_dir = os.path.join(os.path.dirname(resume_path), "exported")
export_policy_as_jit(policy_nn, normalizer=normalizer, path=export_model_dir, filename="policy.pt")
export_policy_as_onnx(policy_nn, normalizer=normalizer, path=export_model_dir, filename="policy.onnx")
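    # The exported files can be consumed outside Isaac Lab. A minimal sketch
    # using onnxruntime (an assumption: requires `pip install onnxruntime`; the
    # input name and whether the observation normalizer is baked into the graph
    # depend on the exporter, so inspect the model to confirm):
    #   import onnxruntime as ort
    #   session = ort.InferenceSession(os.path.join(export_model_dir, "policy.onnx"))
    #   input_name = session.get_inputs()[0].name
    #   action = session.run(None, {input_name: obs_np})[0]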
dt = env.unwrapped.step_dt
    # ==========================================================================
    # Initialize the multi-camera recorder
    # ==========================================================================
    recorder = None
    if args_cli.video:
        # 1. Cameras to record (names must match those defined in mindbot_env_cfg.py).
        target_cameras = [
            "left_hand_camera",
            "right_hand_camera",
            "head_camera",
            "chest_camera",
        ]
        # 2. Environment indices to record: [0] records only the first
        # environment, [0, 1] the first two, and so on.
        # Note: recording many environments makes writing the videos very slow
        # and memory-hungry.
        envs_to_record = [0, 1, 2, 3]
        # To record all environments (not recommended unless num_envs is small):
        # envs_to_record = list(range(env.unwrapped.num_envs))
        save_dir = os.path.join(log_dir, "robot_camera_recordings")
        recorder = MultiCameraRecorder(
            env=env,
            camera_names=target_cameras,
            env_indices=envs_to_record,
            output_dir=save_dir,
            fps=int(1 / dt),
        )
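        # Note: step_dt is the period of one environment control step, so
        # int(1 / dt) approximates a real-time playback rate for the saved videos.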
# reset environment
obs = env.get_observations()
timestep = 0
print("[INFO] Starting simulation loop...")
# simulate environment
while simulation_app.is_running():
start_time = time.time()
# run everything in inference mode
with torch.inference_mode():
# agent stepping
actions = policy(obs)
# env stepping
obs, _, dones, _ = env.step(actions)
            # ==================================================================
            # Record camera data every frame
            # ==================================================================
            if recorder:
                recorder.record_step()
# reset recurrent states for episodes that have terminated
policy_nn.reset(dones)
if args_cli.video:
timestep += 1
# Exit the play loop after recording one video
if timestep == args_cli.video_length:
                # ==============================================================
                # Save all recorded videos when the loop ends
                # ==============================================================
                if recorder:
                    timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
                    recorder.save_videos(filename_suffix=timestamp)
break
# time delay for real-time evaluation
sleep_time = dt - (time.time() - start_time)
if args_cli.real_time and sleep_time > 0:
time.sleep(sleep_time)
# close the simulator
env.close()
if __name__ == "__main__":
# run the main function
main()
# close sim app
simulation_app.close()