删除.venv

重构为RL和IL两部分内容。
This commit is contained in:
2026-03-03 22:26:55 +08:00
parent 9325184f4d
commit b14448240d
475 changed files with 2876 additions and 87711 deletions

View File

@@ -0,0 +1,101 @@
# Copyright (c) 2022-2026, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md).
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
"""Script to an environment with random action agent."""
"""Launch Isaac Sim Simulator first."""
import argparse
import os
from isaaclab.app import AppLauncher
# add argparse arguments
parser = argparse.ArgumentParser(description="Random agent for Isaac Lab environments.")
parser.add_argument("--task", type=str, default=None, help="Name of the task.")
parser.add_argument("--output_dir", type=str, default=None, help="Path to the output directory.")
# append AppLauncher cli args
AppLauncher.add_app_launcher_args(parser)
# parse the arguments
args_cli = parser.parse_args()
args_cli.headless = True
# launch omniverse app
app_launcher = AppLauncher(args_cli)
simulation_app = app_launcher.app
"""Rest everything follows."""
import gymnasium as gym
import torch
import isaaclab_tasks # noqa: F401
from isaaclab_tasks.utils import parse_env_cfg
# PLACEHOLDER: Extension template (do not remove this comment)
def main():
"""Random actions agent with Isaac Lab environment."""
# create environment configuration
env_cfg = parse_env_cfg(args_cli.task, device=args_cli.device, num_envs=1, use_fabric=True)
# create environment
env = gym.make(args_cli.task, cfg=env_cfg)
# print info (this is vectorized environment)
print(f"[INFO]: Gym observation space: {env.observation_space}")
print(f"[INFO]: Gym action space: {env.action_space}")
# reset environment
env.reset()
outs = env.unwrapped.get_IO_descriptors
out_observations = outs["observations"]
out_actions = outs["actions"]
out_articulations = outs["articulations"]
out_scene = outs["scene"]
# Make a yaml file with the output
import yaml
name = args_cli.task.lower().replace("-", "_")
name = name.replace(" ", "_")
if not os.path.exists(args_cli.output_dir):
os.makedirs(args_cli.output_dir)
with open(os.path.join(args_cli.output_dir, f"{name}_IO_descriptors.yaml"), "w") as f:
print(f"[INFO]: Exporting IO descriptors to {os.path.join(args_cli.output_dir, f'{name}_IO_descriptors.yaml')}")
yaml.safe_dump(outs, f)
for k in out_actions:
print(f"--- Action term: {k['name']} ---")
k.pop("name")
for k1, v1 in k.items():
print(f"{k1}: {v1}")
for obs_group_name, obs_group in out_observations.items():
print(f"--- Obs group: {obs_group_name} ---")
for k in obs_group:
print(f"--- Obs term: {k['name']} ---")
k.pop("name")
for k1, v1 in k.items():
print(f"{k1}: {v1}")
for articulation_name, articulation_data in out_articulations.items():
print(f"--- Articulation: {articulation_name} ---")
for k1, v1 in articulation_data.items():
print(f"{k1}: {v1}")
for k1, v1 in out_scene.items():
print(f"{k1}: {v1}")
env.step(torch.zeros(env.action_space.shape, device=env.unwrapped.device))
env.close()
if __name__ == "__main__":
# run the main function
main()
# close sim app
simulation_app.close()

View File

@@ -0,0 +1,72 @@
# Copyright (c) 2022-2026, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md).
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
"""
Script to print all the available environments in Isaac Lab.
The script iterates over all registered environments and stores the details in a table.
It prints the name of the environment, the entry point and the config file.
All the environments are registered in the `isaaclab_tasks` extension. They start
with `Isaac` in their name.
"""
"""Launch Isaac Sim Simulator first."""
import argparse
from isaaclab.app import AppLauncher
# add argparse arguments
parser = argparse.ArgumentParser(description="List Isaac Lab environments.")
parser.add_argument("--keyword", type=str, default=None, help="Keyword to filter environments.")
# parse the arguments
args_cli = parser.parse_args()
# launch omniverse app
app_launcher = AppLauncher(headless=True)
simulation_app = app_launcher.app
"""Rest everything follows."""
import gymnasium as gym
from prettytable import PrettyTable
import isaaclab_tasks # noqa: F401
def main():
"""Print all environments registered in `isaaclab_tasks` extension."""
# print all the available environments
table = PrettyTable(["S. No.", "Task Name", "Entry Point", "Config"])
table.title = "Available Environments in Isaac Lab"
# set alignment of table columns
table.align["Task Name"] = "l"
table.align["Entry Point"] = "l"
table.align["Config"] = "l"
# count of environments
index = 0
# acquire all Isaac environments names
for task_spec in gym.registry.values():
if "Isaac" in task_spec.id and (args_cli.keyword is None or args_cli.keyword in task_spec.id):
# add details to table
table.add_row([index + 1, task_spec.id, task_spec.entry_point, task_spec.kwargs["env_cfg_entry_point"]])
# increment count
index += 1
print(table)
if __name__ == "__main__":
try:
# run the main function
main()
except Exception as e:
raise e
finally:
# close the app
simulation_app.close()

View File

@@ -0,0 +1,72 @@
# Copyright (c) 2022-2026, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md).
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
"""Script to an environment with random action agent."""
"""Launch Isaac Sim Simulator first."""
import argparse
from isaaclab.app import AppLauncher
# add argparse arguments
parser = argparse.ArgumentParser(description="Random agent for Isaac Lab environments.")
parser.add_argument(
"--disable_fabric", action="store_true", default=False, help="Disable fabric and use USD I/O operations."
)
parser.add_argument("--num_envs", type=int, default=None, help="Number of environments to simulate.")
parser.add_argument("--task", type=str, default=None, help="Name of the task.")
# append AppLauncher cli args
AppLauncher.add_app_launcher_args(parser)
# parse the arguments
args_cli = parser.parse_args()
# launch omniverse app
app_launcher = AppLauncher(args_cli)
simulation_app = app_launcher.app
"""Rest everything follows."""
import gymnasium as gym
import torch
import isaaclab_tasks # noqa: F401
from isaaclab_tasks.utils import parse_env_cfg
# PLACEHOLDER: Extension template (do not remove this comment)
def main():
"""Random actions agent with Isaac Lab environment."""
# create environment configuration
env_cfg = parse_env_cfg(
args_cli.task, device=args_cli.device, num_envs=args_cli.num_envs, use_fabric=not args_cli.disable_fabric
)
# create environment
env = gym.make(args_cli.task, cfg=env_cfg)
# print info (this is vectorized environment)
print(f"[INFO]: Gym observation space: {env.observation_space}")
print(f"[INFO]: Gym action space: {env.action_space}")
# reset environment
env.reset()
# simulate environment
while simulation_app.is_running():
# run everything in inference mode
with torch.inference_mode():
# sample actions from -1 to 1
actions = 2 * torch.rand(env.action_space.shape, device=env.unwrapped.device) - 1
# apply actions
env.step(actions)
# close the simulator
env.close()
if __name__ == "__main__":
# run the main function
main()
# close sim app
simulation_app.close()

View File

@@ -0,0 +1,319 @@
# Copyright (c) 2022-2026, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md).
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
"""
Script to run an environment with a pick and lift state machine.
The state machine is implemented in the kernel function `infer_state_machine`.
It uses the `warp` library to run the state machine in parallel on the GPU.
.. code-block:: bash
./isaaclab.sh -p scripts/environments/state_machine/lift_cube_sm.py --num_envs 32
"""
"""Launch Omniverse Toolkit first."""
import argparse
from isaaclab.app import AppLauncher
# add argparse arguments
parser = argparse.ArgumentParser(description="Pick and lift state machine for lift environments.")
parser.add_argument(
"--disable_fabric", action="store_true", default=False, help="Disable fabric and use USD I/O operations."
)
parser.add_argument("--num_envs", type=int, default=None, help="Number of environments to simulate.")
# append AppLauncher cli args
AppLauncher.add_app_launcher_args(parser)
# parse the arguments
args_cli = parser.parse_args()
# launch omniverse app
app_launcher = AppLauncher(headless=args_cli.headless)
simulation_app = app_launcher.app
"""Rest everything else."""
from collections.abc import Sequence
import gymnasium as gym
import torch
import warp as wp
from isaaclab.assets.rigid_object.rigid_object_data import RigidObjectData
import isaaclab_tasks # noqa: F401
from isaaclab_tasks.manager_based.manipulation.lift.lift_env_cfg import LiftEnvCfg
from isaaclab_tasks.utils.parse_cfg import parse_env_cfg
# initialize warp
wp.init()
class GripperState:
"""States for the gripper."""
OPEN = wp.constant(1.0)
CLOSE = wp.constant(-1.0)
class PickSmState:
"""States for the pick state machine."""
REST = wp.constant(0)
APPROACH_ABOVE_OBJECT = wp.constant(1)
APPROACH_OBJECT = wp.constant(2)
GRASP_OBJECT = wp.constant(3)
LIFT_OBJECT = wp.constant(4)
class PickSmWaitTime:
"""Additional wait times (in s) for states for before switching."""
REST = wp.constant(0.2)
APPROACH_ABOVE_OBJECT = wp.constant(0.5)
APPROACH_OBJECT = wp.constant(0.6)
GRASP_OBJECT = wp.constant(0.3)
LIFT_OBJECT = wp.constant(1.0)
@wp.func
def distance_below_threshold(current_pos: wp.vec3, desired_pos: wp.vec3, threshold: float) -> bool:
return wp.length(current_pos - desired_pos) < threshold
@wp.kernel
def infer_state_machine(
dt: wp.array(dtype=float),
sm_state: wp.array(dtype=int),
sm_wait_time: wp.array(dtype=float),
ee_pose: wp.array(dtype=wp.transform),
object_pose: wp.array(dtype=wp.transform),
des_object_pose: wp.array(dtype=wp.transform),
des_ee_pose: wp.array(dtype=wp.transform),
gripper_state: wp.array(dtype=float),
offset: wp.array(dtype=wp.transform),
position_threshold: float,
):
# retrieve thread id
tid = wp.tid()
# retrieve state machine state
state = sm_state[tid]
# decide next state
if state == PickSmState.REST:
des_ee_pose[tid] = ee_pose[tid]
gripper_state[tid] = GripperState.OPEN
# wait for a while
if sm_wait_time[tid] >= PickSmWaitTime.REST:
# move to next state and reset wait time
sm_state[tid] = PickSmState.APPROACH_ABOVE_OBJECT
sm_wait_time[tid] = 0.0
elif state == PickSmState.APPROACH_ABOVE_OBJECT:
des_ee_pose[tid] = wp.transform_multiply(offset[tid], object_pose[tid])
gripper_state[tid] = GripperState.OPEN
if distance_below_threshold(
wp.transform_get_translation(ee_pose[tid]),
wp.transform_get_translation(des_ee_pose[tid]),
position_threshold,
):
# wait for a while
if sm_wait_time[tid] >= PickSmWaitTime.APPROACH_OBJECT:
# move to next state and reset wait time
sm_state[tid] = PickSmState.APPROACH_OBJECT
sm_wait_time[tid] = 0.0
elif state == PickSmState.APPROACH_OBJECT:
des_ee_pose[tid] = object_pose[tid]
gripper_state[tid] = GripperState.OPEN
if distance_below_threshold(
wp.transform_get_translation(ee_pose[tid]),
wp.transform_get_translation(des_ee_pose[tid]),
position_threshold,
):
if sm_wait_time[tid] >= PickSmWaitTime.APPROACH_OBJECT:
# move to next state and reset wait time
sm_state[tid] = PickSmState.GRASP_OBJECT
sm_wait_time[tid] = 0.0
elif state == PickSmState.GRASP_OBJECT:
des_ee_pose[tid] = object_pose[tid]
gripper_state[tid] = GripperState.CLOSE
# wait for a while
if sm_wait_time[tid] >= PickSmWaitTime.GRASP_OBJECT:
# move to next state and reset wait time
sm_state[tid] = PickSmState.LIFT_OBJECT
sm_wait_time[tid] = 0.0
elif state == PickSmState.LIFT_OBJECT:
des_ee_pose[tid] = des_object_pose[tid]
gripper_state[tid] = GripperState.CLOSE
if distance_below_threshold(
wp.transform_get_translation(ee_pose[tid]),
wp.transform_get_translation(des_ee_pose[tid]),
position_threshold,
):
# wait for a while
if sm_wait_time[tid] >= PickSmWaitTime.LIFT_OBJECT:
# move to next state and reset wait time
sm_state[tid] = PickSmState.LIFT_OBJECT
sm_wait_time[tid] = 0.0
# increment wait time
sm_wait_time[tid] = sm_wait_time[tid] + dt[tid]
class PickAndLiftSm:
"""A simple state machine in a robot's task space to pick and lift an object.
The state machine is implemented as a warp kernel. It takes in the current state of
the robot's end-effector and the object, and outputs the desired state of the robot's
end-effector and the gripper. The state machine is implemented as a finite state
machine with the following states:
1. REST: The robot is at rest.
2. APPROACH_ABOVE_OBJECT: The robot moves above the object.
3. APPROACH_OBJECT: The robot moves to the object.
4. GRASP_OBJECT: The robot grasps the object.
5. LIFT_OBJECT: The robot lifts the object to the desired pose. This is the final state.
"""
def __init__(self, dt: float, num_envs: int, device: torch.device | str = "cpu", position_threshold=0.01):
"""Initialize the state machine.
Args:
dt: The environment time step.
num_envs: The number of environments to simulate.
device: The device to run the state machine on.
"""
# save parameters
self.dt = float(dt)
self.num_envs = num_envs
self.device = device
self.position_threshold = position_threshold
# initialize state machine
self.sm_dt = torch.full((self.num_envs,), self.dt, device=self.device)
self.sm_state = torch.full((self.num_envs,), 0, dtype=torch.int32, device=self.device)
self.sm_wait_time = torch.zeros((self.num_envs,), device=self.device)
# desired state
self.des_ee_pose = torch.zeros((self.num_envs, 7), device=self.device)
self.des_gripper_state = torch.full((self.num_envs,), 0.0, device=self.device)
# approach above object offset
self.offset = torch.zeros((self.num_envs, 7), device=self.device)
self.offset[:, 2] = 0.1
self.offset[:, -1] = 1.0 # warp expects quaternion as (x, y, z, w)
# convert to warp
self.sm_dt_wp = wp.from_torch(self.sm_dt, wp.float32)
self.sm_state_wp = wp.from_torch(self.sm_state, wp.int32)
self.sm_wait_time_wp = wp.from_torch(self.sm_wait_time, wp.float32)
self.des_ee_pose_wp = wp.from_torch(self.des_ee_pose, wp.transform)
self.des_gripper_state_wp = wp.from_torch(self.des_gripper_state, wp.float32)
self.offset_wp = wp.from_torch(self.offset, wp.transform)
def reset_idx(self, env_ids: Sequence[int] = None):
"""Reset the state machine."""
if env_ids is None:
env_ids = slice(None)
self.sm_state[env_ids] = 0
self.sm_wait_time[env_ids] = 0.0
def compute(self, ee_pose: torch.Tensor, object_pose: torch.Tensor, des_object_pose: torch.Tensor) -> torch.Tensor:
"""Compute the desired state of the robot's end-effector and the gripper."""
# convert all transformations from (w, x, y, z) to (x, y, z, w)
ee_pose = ee_pose[:, [0, 1, 2, 4, 5, 6, 3]]
object_pose = object_pose[:, [0, 1, 2, 4, 5, 6, 3]]
des_object_pose = des_object_pose[:, [0, 1, 2, 4, 5, 6, 3]]
# convert to warp
ee_pose_wp = wp.from_torch(ee_pose.contiguous(), wp.transform)
object_pose_wp = wp.from_torch(object_pose.contiguous(), wp.transform)
des_object_pose_wp = wp.from_torch(des_object_pose.contiguous(), wp.transform)
# run state machine
wp.launch(
kernel=infer_state_machine,
dim=self.num_envs,
inputs=[
self.sm_dt_wp,
self.sm_state_wp,
self.sm_wait_time_wp,
ee_pose_wp,
object_pose_wp,
des_object_pose_wp,
self.des_ee_pose_wp,
self.des_gripper_state_wp,
self.offset_wp,
self.position_threshold,
],
device=self.device,
)
# convert transformations back to (w, x, y, z)
des_ee_pose = self.des_ee_pose[:, [0, 1, 2, 6, 3, 4, 5]]
# convert to torch
return torch.cat([des_ee_pose, self.des_gripper_state.unsqueeze(-1)], dim=-1)
def main():
# parse configuration
env_cfg: LiftEnvCfg = parse_env_cfg(
"Isaac-Lift-Cube-Franka-IK-Abs-v0",
device=args_cli.device,
num_envs=args_cli.num_envs,
use_fabric=not args_cli.disable_fabric,
)
# create environment
env = gym.make("Isaac-Lift-Cube-Franka-IK-Abs-v0", cfg=env_cfg)
# reset environment at start
env.reset()
# create action buffers (position + quaternion)
actions = torch.zeros(env.unwrapped.action_space.shape, device=env.unwrapped.device)
actions[:, 3] = 1.0
# desired object orientation (we only do position control of object)
desired_orientation = torch.zeros((env.unwrapped.num_envs, 4), device=env.unwrapped.device)
desired_orientation[:, 1] = 1.0
# create state machine
pick_sm = PickAndLiftSm(
env_cfg.sim.dt * env_cfg.decimation, env.unwrapped.num_envs, env.unwrapped.device, position_threshold=0.01
)
while simulation_app.is_running():
# run everything in inference mode
with torch.inference_mode():
# step environment
dones = env.step(actions)[-2]
# observations
# -- end-effector frame
ee_frame_sensor = env.unwrapped.scene["ee_frame"]
tcp_rest_position = ee_frame_sensor.data.target_pos_w[..., 0, :].clone() - env.unwrapped.scene.env_origins
tcp_rest_orientation = ee_frame_sensor.data.target_quat_w[..., 0, :].clone()
# -- object frame
object_data: RigidObjectData = env.unwrapped.scene["object"].data
object_position = object_data.root_pos_w - env.unwrapped.scene.env_origins
# -- target object frame
desired_position = env.unwrapped.command_manager.get_command("object_pose")[..., :3]
# advance state machine
actions = pick_sm.compute(
torch.cat([tcp_rest_position, tcp_rest_orientation], dim=-1),
torch.cat([object_position, desired_orientation], dim=-1),
torch.cat([desired_position, desired_orientation], dim=-1),
)
# reset state machine
if dones.any():
pick_sm.reset_idx(dones.nonzero(as_tuple=False).squeeze(-1))
# close the environment
env.close()
if __name__ == "__main__":
# run the main function
main()
# close sim app
simulation_app.close()

View File

@@ -0,0 +1,341 @@
# Copyright (c) 2022-2026, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md).
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
"""
Script to demonstrate lifting a deformable object with a robotic arm.
The state machine is implemented in the kernel function `infer_state_machine`.
It uses the `warp` library to run the state machine in parallel on the GPU.
.. code-block:: bash
./isaaclab.sh -p scripts/environments/state_machine/lift_teddy_bear.py
"""
"""Launch Omniverse Toolkit first."""
import argparse
from isaaclab.app import AppLauncher
# add argparse arguments
parser = argparse.ArgumentParser(description="Pick and lift a teddy bear with a robotic arm.")
parser.add_argument("--num_envs", type=int, default=1, help="Number of environments to simulate.")
# append AppLauncher cli args
AppLauncher.add_app_launcher_args(parser)
# parse the arguments
args_cli = parser.parse_args()
# launch omniverse app
app_launcher = AppLauncher(headless=args_cli.headless)
simulation_app = app_launcher.app
# disable metrics assembler due to scene graph instancing
from isaacsim.core.utils.extensions import disable_extension
disable_extension("omni.usd.metrics.assembler.ui")
"""Rest everything else."""
from collections.abc import Sequence
import gymnasium as gym
import torch
import warp as wp
from isaaclab.assets.rigid_object.rigid_object_data import RigidObjectData
import isaaclab_tasks # noqa: F401
from isaaclab_tasks.manager_based.manipulation.lift.lift_env_cfg import LiftEnvCfg
from isaaclab_tasks.utils.parse_cfg import parse_env_cfg
# initialize warp
wp.init()
class GripperState:
"""States for the gripper."""
OPEN = wp.constant(1.0)
CLOSE = wp.constant(-1.0)
class PickSmState:
"""States for the pick state machine."""
REST = wp.constant(0)
APPROACH_ABOVE_OBJECT = wp.constant(1)
APPROACH_OBJECT = wp.constant(2)
GRASP_OBJECT = wp.constant(3)
LIFT_OBJECT = wp.constant(4)
OPEN_GRIPPER = wp.constant(5)
class PickSmWaitTime:
"""Additional wait times (in s) for states for before switching."""
REST = wp.constant(0.2)
APPROACH_ABOVE_OBJECT = wp.constant(0.5)
APPROACH_OBJECT = wp.constant(0.6)
GRASP_OBJECT = wp.constant(0.6)
LIFT_OBJECT = wp.constant(1.0)
OPEN_GRIPPER = wp.constant(0.0)
@wp.func
def distance_below_threshold(current_pos: wp.vec3, desired_pos: wp.vec3, threshold: float) -> bool:
return wp.length(current_pos - desired_pos) < threshold
@wp.kernel
def infer_state_machine(
dt: wp.array(dtype=float),
sm_state: wp.array(dtype=int),
sm_wait_time: wp.array(dtype=float),
ee_pose: wp.array(dtype=wp.transform),
object_pose: wp.array(dtype=wp.transform),
des_object_pose: wp.array(dtype=wp.transform),
des_ee_pose: wp.array(dtype=wp.transform),
gripper_state: wp.array(dtype=float),
offset: wp.array(dtype=wp.transform),
position_threshold: float,
):
# retrieve thread id
tid = wp.tid()
# retrieve state machine state
state = sm_state[tid]
# decide next state
if state == PickSmState.REST:
des_ee_pose[tid] = ee_pose[tid]
gripper_state[tid] = GripperState.OPEN
# wait for a while
if sm_wait_time[tid] >= PickSmWaitTime.REST:
# move to next state and reset wait time
sm_state[tid] = PickSmState.APPROACH_ABOVE_OBJECT
sm_wait_time[tid] = 0.0
elif state == PickSmState.APPROACH_ABOVE_OBJECT:
des_ee_pose[tid] = wp.transform_multiply(offset[tid], object_pose[tid])
gripper_state[tid] = GripperState.OPEN
if distance_below_threshold(
wp.transform_get_translation(ee_pose[tid]),
wp.transform_get_translation(des_ee_pose[tid]),
position_threshold,
):
# wait for a while
if sm_wait_time[tid] >= PickSmWaitTime.APPROACH_OBJECT:
# move to next state and reset wait time
sm_state[tid] = PickSmState.APPROACH_OBJECT
sm_wait_time[tid] = 0.0
elif state == PickSmState.APPROACH_OBJECT:
des_ee_pose[tid] = object_pose[tid]
gripper_state[tid] = GripperState.OPEN
if distance_below_threshold(
wp.transform_get_translation(ee_pose[tid]),
wp.transform_get_translation(des_ee_pose[tid]),
position_threshold,
):
# wait for a while
if sm_wait_time[tid] >= PickSmWaitTime.APPROACH_OBJECT:
# move to next state and reset wait time
sm_state[tid] = PickSmState.GRASP_OBJECT
sm_wait_time[tid] = 0.0
elif state == PickSmState.GRASP_OBJECT:
des_ee_pose[tid] = object_pose[tid]
gripper_state[tid] = GripperState.CLOSE
# wait for a while
if sm_wait_time[tid] >= PickSmWaitTime.GRASP_OBJECT:
# move to next state and reset wait time
sm_state[tid] = PickSmState.LIFT_OBJECT
sm_wait_time[tid] = 0.0
elif state == PickSmState.LIFT_OBJECT:
des_ee_pose[tid] = des_object_pose[tid]
gripper_state[tid] = GripperState.CLOSE
if distance_below_threshold(
wp.transform_get_translation(ee_pose[tid]),
wp.transform_get_translation(des_ee_pose[tid]),
position_threshold,
):
# wait for a while
if sm_wait_time[tid] >= PickSmWaitTime.LIFT_OBJECT:
# move to next state and reset wait time
sm_state[tid] = PickSmState.OPEN_GRIPPER
sm_wait_time[tid] = 0.0
elif state == PickSmState.OPEN_GRIPPER:
# des_ee_pose[tid] = object_pose[tid]
gripper_state[tid] = GripperState.OPEN
# wait for a while
if sm_wait_time[tid] >= PickSmWaitTime.OPEN_GRIPPER:
# move to next state and reset wait time
sm_state[tid] = PickSmState.OPEN_GRIPPER
sm_wait_time[tid] = 0.0
# increment wait time
sm_wait_time[tid] = sm_wait_time[tid] + dt[tid]
class PickAndLiftSm:
"""A simple state machine in a robot's task space to pick and lift an object.
The state machine is implemented as a warp kernel. It takes in the current state of
the robot's end-effector and the object, and outputs the desired state of the robot's
end-effector and the gripper. The state machine is implemented as a finite state
machine with the following states:
1. REST: The robot is at rest.
2. APPROACH_ABOVE_OBJECT: The robot moves above the object.
3. APPROACH_OBJECT: The robot moves to the object.
4. GRASP_OBJECT: The robot grasps the object.
5. LIFT_OBJECT: The robot lifts the object to the desired pose. This is the final state.
"""
def __init__(self, dt: float, num_envs: int, device: torch.device | str = "cpu", position_threshold=0.01):
"""Initialize the state machine.
Args:
dt: The environment time step.
num_envs: The number of environments to simulate.
device: The device to run the state machine on.
"""
# save parameters
self.dt = float(dt)
self.num_envs = num_envs
self.device = device
self.position_threshold = position_threshold
# initialize state machine
self.sm_dt = torch.full((self.num_envs,), self.dt, device=self.device)
self.sm_state = torch.full((self.num_envs,), 0, dtype=torch.int32, device=self.device)
self.sm_wait_time = torch.zeros((self.num_envs,), device=self.device)
# desired state
self.des_ee_pose = torch.zeros((self.num_envs, 7), device=self.device)
self.des_gripper_state = torch.full((self.num_envs,), 0.0, device=self.device)
# approach above object offset
self.offset = torch.zeros((self.num_envs, 7), device=self.device)
self.offset[:, 2] = 0.2
self.offset[:, -1] = 1.0 # warp expects quaternion as (x, y, z, w)
# convert to warp
self.sm_dt_wp = wp.from_torch(self.sm_dt, wp.float32)
self.sm_state_wp = wp.from_torch(self.sm_state, wp.int32)
self.sm_wait_time_wp = wp.from_torch(self.sm_wait_time, wp.float32)
self.des_ee_pose_wp = wp.from_torch(self.des_ee_pose, wp.transform)
self.des_gripper_state_wp = wp.from_torch(self.des_gripper_state, wp.float32)
self.offset_wp = wp.from_torch(self.offset, wp.transform)
def reset_idx(self, env_ids: Sequence[int] = None):
"""Reset the state machine."""
if env_ids is None:
env_ids = slice(None)
self.sm_state[env_ids] = 0
self.sm_wait_time[env_ids] = 0.0
def compute(self, ee_pose: torch.Tensor, object_pose: torch.Tensor, des_object_pose: torch.Tensor):
"""Compute the desired state of the robot's end-effector and the gripper."""
# convert all transformations from (w, x, y, z) to (x, y, z, w)
ee_pose = ee_pose[:, [0, 1, 2, 4, 5, 6, 3]]
object_pose = object_pose[:, [0, 1, 2, 4, 5, 6, 3]]
des_object_pose = des_object_pose[:, [0, 1, 2, 4, 5, 6, 3]]
# convert to warp
ee_pose_wp = wp.from_torch(ee_pose.contiguous(), wp.transform)
object_pose_wp = wp.from_torch(object_pose.contiguous(), wp.transform)
des_object_pose_wp = wp.from_torch(des_object_pose.contiguous(), wp.transform)
# run state machine
wp.launch(
kernel=infer_state_machine,
dim=self.num_envs,
inputs=[
self.sm_dt_wp,
self.sm_state_wp,
self.sm_wait_time_wp,
ee_pose_wp,
object_pose_wp,
des_object_pose_wp,
self.des_ee_pose_wp,
self.des_gripper_state_wp,
self.offset_wp,
self.position_threshold,
],
device=self.device,
)
# convert transformations back to (w, x, y, z)
des_ee_pose = self.des_ee_pose[:, [0, 1, 2, 6, 3, 4, 5]]
# convert to torch
return torch.cat([des_ee_pose, self.des_gripper_state.unsqueeze(-1)], dim=-1)
def main():
# parse configuration
env_cfg: LiftEnvCfg = parse_env_cfg(
"Isaac-Lift-Teddy-Bear-Franka-IK-Abs-v0",
device=args_cli.device,
num_envs=args_cli.num_envs,
)
env_cfg.viewer.eye = (2.1, 1.0, 1.3)
# create environment
env = gym.make("Isaac-Lift-Teddy-Bear-Franka-IK-Abs-v0", cfg=env_cfg)
# reset environment at start
env.reset()
# create action buffers (position + quaternion)
actions = torch.zeros(env.unwrapped.action_space.shape, device=env.unwrapped.device)
actions[:, 3] = 1.0
# desired rotation after grasping
desired_orientation = torch.zeros((env.unwrapped.num_envs, 4), device=env.unwrapped.device)
desired_orientation[:, 1] = 1.0
object_grasp_orientation = torch.zeros((env.unwrapped.num_envs, 4), device=env.unwrapped.device)
# z-axis pointing down and 45 degrees rotation
object_grasp_orientation[:, 1] = 0.9238795
object_grasp_orientation[:, 2] = -0.3826834
object_local_grasp_position = torch.tensor([0.02, -0.08, 0.0], device=env.unwrapped.device)
# create state machine
pick_sm = PickAndLiftSm(env_cfg.sim.dt * env_cfg.decimation, env.unwrapped.num_envs, env.unwrapped.device)
while simulation_app.is_running():
# run everything in inference mode
with torch.inference_mode():
# step environment
dones = env.step(actions)[-2]
# observations
# -- end-effector frame
ee_frame_sensor = env.unwrapped.scene["ee_frame"]
tcp_rest_position = ee_frame_sensor.data.target_pos_w[..., 0, :].clone() - env.unwrapped.scene.env_origins
tcp_rest_orientation = ee_frame_sensor.data.target_quat_w[..., 0, :].clone()
# -- object frame
object_data: RigidObjectData = env.unwrapped.scene["object"].data
object_position = object_data.root_pos_w - env.unwrapped.scene.env_origins
object_position += object_local_grasp_position
# -- target object frame
desired_position = env.unwrapped.command_manager.get_command("object_pose")[..., :3]
# advance state machine
actions = pick_sm.compute(
torch.cat([tcp_rest_position, tcp_rest_orientation], dim=-1),
torch.cat([object_position, object_grasp_orientation], dim=-1),
torch.cat([desired_position, desired_orientation], dim=-1),
)
# reset state machine
if dones.any():
pick_sm.reset_idx(dones.nonzero(as_tuple=False).squeeze(-1))
# close the environment
env.close()
if __name__ == "__main__":
# run the main function
main()
# close sim app
simulation_app.close()

View File

@@ -0,0 +1,333 @@
# Copyright (c) 2022-2026, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md).
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
"""
Script to run an environment with a cabinet opening state machine.
The state machine is implemented in the kernel function `infer_state_machine`.
It uses the `warp` library to run the state machine in parallel on the GPU.
.. code-block:: bash
./isaaclab.sh -p scripts/environments/state_machine/open_cabinet_sm.py --num_envs 32
"""
"""Launch Omniverse Toolkit first."""
import argparse
from isaaclab.app import AppLauncher
# add argparse arguments
parser = argparse.ArgumentParser(description="Pick and lift state machine for cabinet environments.")
parser.add_argument(
"--disable_fabric", action="store_true", default=False, help="Disable fabric and use USD I/O operations."
)
parser.add_argument("--num_envs", type=int, default=None, help="Number of environments to simulate.")
# append AppLauncher cli args
AppLauncher.add_app_launcher_args(parser)
# parse the arguments
args_cli = parser.parse_args()
# launch omniverse app
app_launcher = AppLauncher(headless=args_cli.headless)
simulation_app = app_launcher.app
"""Rest everything else."""
from collections.abc import Sequence
import gymnasium as gym
import torch
import warp as wp
from isaaclab.sensors import FrameTransformer
import isaaclab_tasks # noqa: F401
from isaaclab_tasks.manager_based.manipulation.cabinet.cabinet_env_cfg import CabinetEnvCfg
from isaaclab_tasks.utils.parse_cfg import parse_env_cfg
# initialize warp
wp.init()
class GripperState:
"""States for the gripper."""
OPEN = wp.constant(1.0)
CLOSE = wp.constant(-1.0)
class OpenDrawerSmState:
"""States for the cabinet drawer opening state machine."""
REST = wp.constant(0)
APPROACH_INFRONT_HANDLE = wp.constant(1)
APPROACH_HANDLE = wp.constant(2)
GRASP_HANDLE = wp.constant(3)
OPEN_DRAWER = wp.constant(4)
RELEASE_HANDLE = wp.constant(5)
class OpenDrawerSmWaitTime:
"""Additional wait times (in s) for states for before switching."""
REST = wp.constant(0.5)
APPROACH_INFRONT_HANDLE = wp.constant(1.25)
APPROACH_HANDLE = wp.constant(1.0)
GRASP_HANDLE = wp.constant(1.0)
OPEN_DRAWER = wp.constant(3.0)
RELEASE_HANDLE = wp.constant(0.2)
@wp.func
def distance_below_threshold(current_pos: wp.vec3, desired_pos: wp.vec3, threshold: float) -> bool:
return wp.length(current_pos - desired_pos) < threshold
@wp.kernel
def infer_state_machine(
dt: wp.array(dtype=float),
sm_state: wp.array(dtype=int),
sm_wait_time: wp.array(dtype=float),
ee_pose: wp.array(dtype=wp.transform),
handle_pose: wp.array(dtype=wp.transform),
des_ee_pose: wp.array(dtype=wp.transform),
gripper_state: wp.array(dtype=float),
handle_approach_offset: wp.array(dtype=wp.transform),
handle_grasp_offset: wp.array(dtype=wp.transform),
drawer_opening_rate: wp.array(dtype=wp.transform),
position_threshold: float,
):
# retrieve thread id
tid = wp.tid()
# retrieve state machine state
state = sm_state[tid]
# decide next state
if state == OpenDrawerSmState.REST:
des_ee_pose[tid] = ee_pose[tid]
gripper_state[tid] = GripperState.OPEN
# wait for a while
if sm_wait_time[tid] >= OpenDrawerSmWaitTime.REST:
# move to next state and reset wait time
sm_state[tid] = OpenDrawerSmState.APPROACH_INFRONT_HANDLE
sm_wait_time[tid] = 0.0
elif state == OpenDrawerSmState.APPROACH_INFRONT_HANDLE:
des_ee_pose[tid] = wp.transform_multiply(handle_approach_offset[tid], handle_pose[tid])
gripper_state[tid] = GripperState.OPEN
if distance_below_threshold(
wp.transform_get_translation(ee_pose[tid]),
wp.transform_get_translation(des_ee_pose[tid]),
position_threshold,
):
# wait for a while
if sm_wait_time[tid] >= OpenDrawerSmWaitTime.APPROACH_INFRONT_HANDLE:
# move to next state and reset wait time
sm_state[tid] = OpenDrawerSmState.APPROACH_HANDLE
sm_wait_time[tid] = 0.0
elif state == OpenDrawerSmState.APPROACH_HANDLE:
des_ee_pose[tid] = handle_pose[tid]
gripper_state[tid] = GripperState.OPEN
if distance_below_threshold(
wp.transform_get_translation(ee_pose[tid]),
wp.transform_get_translation(des_ee_pose[tid]),
position_threshold,
):
# wait for a while
if sm_wait_time[tid] >= OpenDrawerSmWaitTime.APPROACH_HANDLE:
# move to next state and reset wait time
sm_state[tid] = OpenDrawerSmState.GRASP_HANDLE
sm_wait_time[tid] = 0.0
elif state == OpenDrawerSmState.GRASP_HANDLE:
des_ee_pose[tid] = wp.transform_multiply(handle_grasp_offset[tid], handle_pose[tid])
gripper_state[tid] = GripperState.CLOSE
# wait for a while
if sm_wait_time[tid] >= OpenDrawerSmWaitTime.GRASP_HANDLE:
# move to next state and reset wait time
sm_state[tid] = OpenDrawerSmState.OPEN_DRAWER
sm_wait_time[tid] = 0.0
elif state == OpenDrawerSmState.OPEN_DRAWER:
des_ee_pose[tid] = wp.transform_multiply(drawer_opening_rate[tid], handle_pose[tid])
gripper_state[tid] = GripperState.CLOSE
# wait for a while
if sm_wait_time[tid] >= OpenDrawerSmWaitTime.OPEN_DRAWER:
# move to next state and reset wait time
sm_state[tid] = OpenDrawerSmState.RELEASE_HANDLE
sm_wait_time[tid] = 0.0
elif state == OpenDrawerSmState.RELEASE_HANDLE:
des_ee_pose[tid] = ee_pose[tid]
gripper_state[tid] = GripperState.CLOSE
# wait for a while
if sm_wait_time[tid] >= OpenDrawerSmWaitTime.RELEASE_HANDLE:
# move to next state and reset wait time
sm_state[tid] = OpenDrawerSmState.RELEASE_HANDLE
sm_wait_time[tid] = 0.0
# increment wait time
sm_wait_time[tid] = sm_wait_time[tid] + dt[tid]
class OpenDrawerSm:
"""A simple state machine in a robot's task space to open a drawer in the cabinet.
The state machine is implemented as a warp kernel. It takes in the current state of
the robot's end-effector and the object, and outputs the desired state of the robot's
end-effector and the gripper. The state machine is implemented as a finite state
machine with the following states:
1. REST: The robot is at rest.
2. APPROACH_HANDLE: The robot moves towards the handle of the drawer.
3. GRASP_HANDLE: The robot grasps the handle of the drawer.
4. OPEN_DRAWER: The robot opens the drawer.
5. RELEASE_HANDLE: The robot releases the handle of the drawer. This is the final state.
"""
def __init__(self, dt: float, num_envs: int, device: torch.device | str = "cpu", position_threshold=0.01):
"""Initialize the state machine.
Args:
dt: The environment time step.
num_envs: The number of environments to simulate.
device: The device to run the state machine on.
"""
# save parameters
self.dt = float(dt)
self.num_envs = num_envs
self.device = device
self.position_threshold = position_threshold
# initialize state machine
self.sm_dt = torch.full((self.num_envs,), self.dt, device=self.device)
self.sm_state = torch.full((self.num_envs,), 0, dtype=torch.int32, device=self.device)
self.sm_wait_time = torch.zeros((self.num_envs,), device=self.device)
# desired state
self.des_ee_pose = torch.zeros((self.num_envs, 7), device=self.device)
self.des_gripper_state = torch.full((self.num_envs,), 0.0, device=self.device)
# approach in front of the handle
self.handle_approach_offset = torch.zeros((self.num_envs, 7), device=self.device)
self.handle_approach_offset[:, 0] = -0.1
self.handle_approach_offset[:, -1] = 1.0 # warp expects quaternion as (x, y, z, w)
# handle grasp offset
self.handle_grasp_offset = torch.zeros((self.num_envs, 7), device=self.device)
self.handle_grasp_offset[:, 0] = 0.025
self.handle_grasp_offset[:, -1] = 1.0 # warp expects quaternion as (x, y, z, w)
# drawer opening rate
self.drawer_opening_rate = torch.zeros((self.num_envs, 7), device=self.device)
self.drawer_opening_rate[:, 0] = -0.015
self.drawer_opening_rate[:, -1] = 1.0 # warp expects quaternion as (x, y, z, w)
# convert to warp
self.sm_dt_wp = wp.from_torch(self.sm_dt, wp.float32)
self.sm_state_wp = wp.from_torch(self.sm_state, wp.int32)
self.sm_wait_time_wp = wp.from_torch(self.sm_wait_time, wp.float32)
self.des_ee_pose_wp = wp.from_torch(self.des_ee_pose, wp.transform)
self.des_gripper_state_wp = wp.from_torch(self.des_gripper_state, wp.float32)
self.handle_approach_offset_wp = wp.from_torch(self.handle_approach_offset, wp.transform)
self.handle_grasp_offset_wp = wp.from_torch(self.handle_grasp_offset, wp.transform)
self.drawer_opening_rate_wp = wp.from_torch(self.drawer_opening_rate, wp.transform)
def reset_idx(self, env_ids: Sequence[int] | None = None):
"""Reset the state machine."""
if env_ids is None:
env_ids = slice(None)
# reset state machine
self.sm_state[env_ids] = 0
self.sm_wait_time[env_ids] = 0.0
def compute(self, ee_pose: torch.Tensor, handle_pose: torch.Tensor):
"""Compute the desired state of the robot's end-effector and the gripper."""
# convert all transformations from (w, x, y, z) to (x, y, z, w)
ee_pose = ee_pose[:, [0, 1, 2, 4, 5, 6, 3]]
handle_pose = handle_pose[:, [0, 1, 2, 4, 5, 6, 3]]
# convert to warp
ee_pose_wp = wp.from_torch(ee_pose.contiguous(), wp.transform)
handle_pose_wp = wp.from_torch(handle_pose.contiguous(), wp.transform)
# run state machine
wp.launch(
kernel=infer_state_machine,
dim=self.num_envs,
inputs=[
self.sm_dt_wp,
self.sm_state_wp,
self.sm_wait_time_wp,
ee_pose_wp,
handle_pose_wp,
self.des_ee_pose_wp,
self.des_gripper_state_wp,
self.handle_approach_offset_wp,
self.handle_grasp_offset_wp,
self.drawer_opening_rate_wp,
self.position_threshold,
],
device=self.device,
)
# convert transformations back to (w, x, y, z)
des_ee_pose = self.des_ee_pose[:, [0, 1, 2, 6, 3, 4, 5]]
# convert to torch
return torch.cat([des_ee_pose, self.des_gripper_state.unsqueeze(-1)], dim=-1)
def main():
# parse configuration
env_cfg: CabinetEnvCfg = parse_env_cfg(
"Isaac-Open-Drawer-Franka-IK-Abs-v0",
device=args_cli.device,
num_envs=args_cli.num_envs,
use_fabric=not args_cli.disable_fabric,
)
# create environment
env = gym.make("Isaac-Open-Drawer-Franka-IK-Abs-v0", cfg=env_cfg)
# reset environment at start
env.reset()
# create action buffers (position + quaternion)
actions = torch.zeros(env.unwrapped.action_space.shape, device=env.unwrapped.device)
actions[:, 3] = 1.0
# desired object orientation (we only do position control of object)
desired_orientation = torch.zeros((env.unwrapped.num_envs, 4), device=env.unwrapped.device)
desired_orientation[:, 1] = 1.0
# create state machine
open_sm = OpenDrawerSm(env_cfg.sim.dt * env_cfg.decimation, env.unwrapped.num_envs, env.unwrapped.device)
while simulation_app.is_running():
# run everything in inference mode
with torch.inference_mode():
# step environment
dones = env.step(actions)[-2]
# observations
# -- end-effector frame
ee_frame_tf: FrameTransformer = env.unwrapped.scene["ee_frame"]
tcp_rest_position = ee_frame_tf.data.target_pos_w[..., 0, :].clone() - env.unwrapped.scene.env_origins
tcp_rest_orientation = ee_frame_tf.data.target_quat_w[..., 0, :].clone()
# -- handle frame
cabinet_frame_tf: FrameTransformer = env.unwrapped.scene["cabinet_frame"]
cabinet_position = cabinet_frame_tf.data.target_pos_w[..., 0, :].clone() - env.unwrapped.scene.env_origins
cabinet_orientation = cabinet_frame_tf.data.target_quat_w[..., 0, :].clone()
# advance state machine
actions = open_sm.compute(
torch.cat([tcp_rest_position, tcp_rest_orientation], dim=-1),
torch.cat([cabinet_position, cabinet_orientation], dim=-1),
)
# reset state machine
if dones.any():
open_sm.reset_idx(dones.nonzero(as_tuple=False).squeeze(-1))
# close the environment
env.close()
if __name__ == "__main__":
# run the main execution
main()
# close sim app
simulation_app.close()

View File

@@ -0,0 +1,283 @@
# Copyright (c) 2022-2026, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md).
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
"""Script to run teleoperation with Isaac Lab manipulation environments.
Supports multiple input devices (e.g., keyboard, spacemouse, gamepad) and devices
configured within the environment (including OpenXR-based hand tracking or motion
controllers)."""
"""Launch Isaac Sim Simulator first."""
import argparse
from collections.abc import Callable
from isaaclab.app import AppLauncher
# add argparse arguments
parser = argparse.ArgumentParser(description="Teleoperation for Isaac Lab environments.")
parser.add_argument("--num_envs", type=int, default=1, help="Number of environments to simulate.")
parser.add_argument(
"--teleop_device",
type=str,
default="keyboard",
help=(
"Teleop device. Set here (legacy) or via the environment config. If using the environment config, pass the"
" device key/name defined under 'teleop_devices' (it can be a custom name, not necessarily 'handtracking')."
" Built-ins: keyboard, spacemouse, gamepad. Not all tasks support all built-ins."
),
)
parser.add_argument("--task", type=str, default=None, help="Name of the task.")
parser.add_argument("--sensitivity", type=float, default=1.0, help="Sensitivity factor.")
parser.add_argument(
"--enable_pinocchio",
action="store_true",
default=False,
help="Enable Pinocchio.",
)
# append AppLauncher cli args
AppLauncher.add_app_launcher_args(parser)
# parse the arguments
args_cli = parser.parse_args()
app_launcher_args = vars(args_cli)
if args_cli.enable_pinocchio:
# Import pinocchio before AppLauncher to force the use of the version installed by IsaacLab and
# not the one installed by Isaac Sim pinocchio is required by the Pink IK controllers and the
# GR1T2 retargeter
import pinocchio # noqa: F401
if "handtracking" in args_cli.teleop_device.lower():
app_launcher_args["xr"] = True
# launch omniverse app
app_launcher = AppLauncher(app_launcher_args)
simulation_app = app_launcher.app
"""Rest everything follows."""
import logging
import gymnasium as gym
import torch
from isaaclab.devices import Se3Gamepad, Se3GamepadCfg, Se3Keyboard, Se3KeyboardCfg, Se3SpaceMouse, Se3SpaceMouseCfg
from isaaclab.devices.openxr import remove_camera_configs
from isaaclab.devices.teleop_device_factory import create_teleop_device
from isaaclab.envs import ManagerBasedRLEnvCfg
from isaaclab.managers import TerminationTermCfg as DoneTerm
import isaaclab_tasks # noqa: F401
import mindbot.tasks # noqa: F401
from isaaclab_tasks.manager_based.manipulation.lift import mdp
from isaaclab_tasks.utils import parse_env_cfg
if args_cli.enable_pinocchio:
import isaaclab_tasks.manager_based.locomanipulation.pick_place # noqa: F401
import isaaclab_tasks.manager_based.manipulation.pick_place # noqa: F401
# import logger
logger = logging.getLogger(__name__)
def main() -> None:
"""
Run teleoperation with an Isaac Lab manipulation environment.
Creates the environment, sets up teleoperation interfaces and callbacks,
and runs the main simulation loop until the application is closed.
Returns:
None
"""
# parse configuration
env_cfg = parse_env_cfg(args_cli.task, device=args_cli.device, num_envs=args_cli.num_envs)
env_cfg.env_name = args_cli.task
if not isinstance(env_cfg, ManagerBasedRLEnvCfg):
raise ValueError(
"Teleoperation is only supported for ManagerBasedRLEnv environments. "
f"Received environment config type: {type(env_cfg).__name__}"
)
# modify configuration
env_cfg.terminations.time_out = None
if "Lift" in args_cli.task:
# set the resampling time range to large number to avoid resampling
env_cfg.commands.object_pose.resampling_time_range = (1.0e9, 1.0e9)
# add termination condition for reaching the goal otherwise the environment won't reset
env_cfg.terminations.object_reached_goal = DoneTerm(func=mdp.object_reached_goal)
if args_cli.xr:
env_cfg = remove_camera_configs(env_cfg)
env_cfg.sim.render.antialiasing_mode = "DLSS"
try:
# create environment
env = gym.make(args_cli.task, cfg=env_cfg).unwrapped
# check environment name (for reach , we don't allow the gripper)
if "Reach" in args_cli.task:
logger.warning(
f"The environment '{args_cli.task}' does not support gripper control. The device command will be"
" ignored."
)
except Exception as e:
logger.error(f"Failed to create environment: {e}")
simulation_app.close()
return
# Flags for controlling teleoperation flow
should_reset_recording_instance = False
teleoperation_active = True
# Callback handlers
def reset_recording_instance() -> None:
"""
Reset the environment to its initial state.
Sets a flag to reset the environment on the next simulation step.
Returns:
None
"""
nonlocal should_reset_recording_instance
should_reset_recording_instance = True
print("Reset triggered - Environment will reset on next step")
def start_teleoperation() -> None:
"""
Activate teleoperation control of the robot.
Enables the application of teleoperation commands to the environment.
Returns:
None
"""
nonlocal teleoperation_active
teleoperation_active = True
print("Teleoperation activated")
def stop_teleoperation() -> None:
"""
Deactivate teleoperation control of the robot.
Disables the application of teleoperation commands to the environment.
Returns:
None
"""
nonlocal teleoperation_active
teleoperation_active = False
print("Teleoperation deactivated")
# Create device config if not already in env_cfg
teleoperation_callbacks: dict[str, Callable[[], None]] = {
"R": reset_recording_instance,
"START": start_teleoperation,
"STOP": stop_teleoperation,
"RESET": reset_recording_instance,
}
# For hand tracking devices, add additional callbacks
if args_cli.xr:
# Default to inactive for hand tracking
teleoperation_active = False
else:
# Always active for other devices
teleoperation_active = True
# Create teleop device from config if present, otherwise create manually
teleop_interface = None
try:
if hasattr(env_cfg, "teleop_devices") and args_cli.teleop_device in env_cfg.teleop_devices.devices:
teleop_interface = create_teleop_device(
args_cli.teleop_device, env_cfg.teleop_devices.devices, teleoperation_callbacks
)
else:
logger.warning(
f"No teleop device '{args_cli.teleop_device}' found in environment config. Creating default."
)
# Create fallback teleop device
sensitivity = args_cli.sensitivity
if args_cli.teleop_device.lower() == "keyboard":
teleop_interface = Se3Keyboard(
Se3KeyboardCfg(pos_sensitivity=0.05 * sensitivity, rot_sensitivity=0.05 * sensitivity)
)
elif args_cli.teleop_device.lower() == "spacemouse":
teleop_interface = Se3SpaceMouse(
Se3SpaceMouseCfg(pos_sensitivity=0.05 * sensitivity, rot_sensitivity=0.05 * sensitivity)
)
elif args_cli.teleop_device.lower() == "gamepad":
teleop_interface = Se3Gamepad(
Se3GamepadCfg(pos_sensitivity=0.1 * sensitivity, rot_sensitivity=0.1 * sensitivity)
)
else:
logger.error(f"Unsupported teleop device: {args_cli.teleop_device}")
logger.error("Configure the teleop device in the environment config.")
env.close()
simulation_app.close()
return
# Add callbacks to fallback device
for key, callback in teleoperation_callbacks.items():
try:
teleop_interface.add_callback(key, callback)
except (ValueError, TypeError) as e:
logger.warning(f"Failed to add callback for key {key}: {e}")
except Exception as e:
logger.error(f"Failed to create teleop device: {e}")
env.close()
simulation_app.close()
return
if teleop_interface is None:
logger.error("Failed to create teleop interface")
env.close()
simulation_app.close()
return
print(f"Using teleop device: {teleop_interface}")
# reset environment
env.reset()
teleop_interface.reset()
print("Teleoperation started. Press 'R' to reset the environment.")
# simulate environment
while simulation_app.is_running():
try:
# run everything in inference mode
with torch.inference_mode():
# get device command
action = teleop_interface.advance()
# Only apply teleop commands when active
if teleoperation_active:
# process actions
actions = action.repeat(env.num_envs, 1)
# apply actions
env.step(actions)
else:
env.sim.render()
if should_reset_recording_instance:
env.reset()
teleop_interface.reset()
should_reset_recording_instance = False
print("Environment reset complete")
except Exception as e:
logger.error(f"Error during simulation step: {e}")
break
# close the simulator
env.close()
print("Environment closed")
if __name__ == "__main__":
# run the main function
main()
# close sim app
simulation_app.close()

View File

@@ -0,0 +1,72 @@
# Copyright (c) 2022-2026, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md).
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
"""Script to run an environment with zero action agent."""
"""Launch Isaac Sim Simulator first."""
import argparse
from isaaclab.app import AppLauncher
# add argparse arguments
parser = argparse.ArgumentParser(description="Zero agent for Isaac Lab environments.")
parser.add_argument(
"--disable_fabric", action="store_true", default=False, help="Disable fabric and use USD I/O operations."
)
parser.add_argument("--num_envs", type=int, default=None, help="Number of environments to simulate.")
parser.add_argument("--task", type=str, default=None, help="Name of the task.")
# append AppLauncher cli args
AppLauncher.add_app_launcher_args(parser)
# parse the arguments
args_cli = parser.parse_args()
# launch omniverse app
app_launcher = AppLauncher(args_cli)
simulation_app = app_launcher.app
"""Rest everything follows."""
import gymnasium as gym
import torch
import isaaclab_tasks # noqa: F401
from isaaclab_tasks.utils import parse_env_cfg
# PLACEHOLDER: Extension template (do not remove this comment)
def main():
"""Zero actions agent with Isaac Lab environment."""
# parse configuration
env_cfg = parse_env_cfg(
args_cli.task, device=args_cli.device, num_envs=args_cli.num_envs, use_fabric=not args_cli.disable_fabric
)
# create environment
env = gym.make(args_cli.task, cfg=env_cfg)
# print info (this is vectorized environment)
print(f"[INFO]: Gym observation space: {env.observation_space}")
print(f"[INFO]: Gym action space: {env.action_space}")
# reset environment
env.reset()
# simulate environment
while simulation_app.is_running():
# run everything in inference mode
with torch.inference_mode():
# compute zero actions
actions = torch.zeros(env.action_space.shape, device=env.unwrapped.device)
# apply actions
env.step(actions)
# close the simulator
env.close()
if __name__ == "__main__":
# run the main function
main()
# close sim app
simulation_app.close()

View File

@@ -52,7 +52,7 @@ def main():
index = 0
# acquire all Isaac environments names
for task_spec in gym.registry.values():
if "Template-" in task_spec.id and (args_cli.keyword is None or args_cli.keyword in task_spec.id):
if ("Template-" in task_spec.id or "Isaac-" in task_spec.id) and (args_cli.keyword is None or args_cli.keyword in task_spec.id):
# add details to table
table.add_row([index + 1, task_spec.id, task_spec.entry_point, task_spec.kwargs["env_cfg_entry_point"]])
# increment count