Files
issacdataengine/workflows/simbox/core/skills/track.py
Tangger 03d9a5b909 fix: IS 4.5.0 -> 5.0.0 migration — USD metadata, DomeLight, scene reuse
- Fix USD metersPerUnit/upAxis for IS 5.0.0 (no longer auto-compensated)
- Batch fix all Aligned_obj.usd, table, and art USD files with backups
- Fix DomeLight rotation to Z-axis only (prevent tilted environment map)
- Fix scene reuse across episodes (arena_file caching, task clearing, prim guard)
- Add migration tools: scan_usd_metadata.py, fix_usd_metadata.py
- Add migration guide: migerate/migerate.md
- Add nvidia-curobo to .gitignore
- Fix sort_the_rubbish config: obj_0 -> obj_1 (obj_0 does not exist)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-03 11:10:39 +08:00

221 lines
8.9 KiB
Python

import numpy as np
from core.skills.base_skill import BaseSkill, register_skill
from core.utils.transformation_utils import get_orientation, perturb_orientation
from core.utils.usd_geom_utils import compute_bbox
from omegaconf import DictConfig
from omni.isaac.core.controllers import BaseController
from omni.isaac.core.objects.cylinder import VisualCylinder
from omni.isaac.core.robots.robot import Robot
from omni.isaac.core.tasks import BaseTask
from omni.isaac.core.utils.prims import get_prim_at_path
from omni.isaac.core.utils.transformations import (
get_relative_transform,
tf_matrix_from_pose,
)
from scipy.spatial.transform import Rotation as R
# pylint: disable=consider-using-generator,too-many-public-methods,unused-argument
@register_skill
class Track(BaseSkill):
def __init__(
self, robot: Robot, controller: BaseController, task: BaseTask, cfg: DictConfig, world, *args, **kwargs
):
super().__init__()
self.robot = robot
self.controller = controller
self.task = task
self.skill_cfg = cfg
self.world = world
self.frame = self.skill_cfg.get("frame", "robot")
self.robot_base_path = self.controller.robot_base_path
self.T_base_2_world = get_relative_transform(
get_prim_at_path(self.robot_base_path), get_prim_at_path(self.task.root_prim_path)
)
self.table_2_base = self.cal_table_2_base()
self.way_points = self.sample_waypoints()
self.last_target_trans = self.way_points[-1][0]
self.last_target_ori = get_orientation(None, self.way_points[-1][1])
if "left" in controller.robot_file:
self.robot_lr = "left"
self.visual_color = {
"x": np.array([1.0, 0.0, 0.0]),
"y": np.array([0.0, 1.0, 0.0]),
"z": np.array([0.0, 0.0, 1.0]),
}
elif "right" in controller.robot_file:
self.robot_lr = "right"
self.visual_color = {
"x": np.array([1.0, 1.0, 0.0]),
"y": np.array([0.0, 1.0, 1.0]),
"z": np.array([1.0, 0.0, 1.0]),
}
self.vs_cylinder_radius = 0.005
self.vs_cylinder_height = 0.13
self.T_tcp_2_ee = np.array(self.skill_cfg.get("T_tcp_2_ee", np.eye(4)))
# !!! keyposes should be generated after previous skill is done
self.manip_list = []
T_tcp_2_world = self.get_tcp_pose()
trans, ori = self.cal_axis(T_tcp_2_world)
for axis in ["x", "y", "z"]:
self.task.visuals[f"{axis}_{self.robot_lr}"] = VisualCylinder(
prim_path=f"/World/visual/{axis}_{self.robot_lr}",
radius=self.vs_cylinder_radius,
height=self.vs_cylinder_height,
translation=trans[axis],
orientation=ori[axis],
color=self.visual_color[f"{axis}"],
)
def simple_generate_manip_cmds(self):
manip_list = []
p_base_ee_cur, q_base_ee_cur = self.controller.get_ee_pose()
cmd = (p_base_ee_cur, q_base_ee_cur, "update_pose_cost_metric", {"hold_vec_weight": [0, 0, 0, 0, 0, 0]})
manip_list.append(cmd)
if self.frame == "robot":
for target in self.way_points:
cmd = (np.array(target[0]), get_orientation(None, target[1]), "open_gripper", {})
manip_list.append(cmd)
else:
raise NotImplementedError
self.manip_list = manip_list
def get_tcp_pose(self, frame: str = "world"):
if frame == "world":
p_base_ee, q_base_ee = self.controller.get_ee_pose()
T_ee_2_base = tf_matrix_from_pose(p_base_ee, q_base_ee)
T_tcp_2_world = self.T_base_2_world @ T_ee_2_base @ self.T_tcp_2_ee
return T_tcp_2_world
else:
raise NotImplementedError
def visualize_target(self, world):
if len(self.manip_list) > 0:
p_base_ee, q_base_ee, *_ = self.manip_list[0]
T_ee_2_base = tf_matrix_from_pose(p_base_ee, q_base_ee)
T_tcp_2_world = self.T_base_2_world @ T_ee_2_base @ self.T_tcp_2_ee
trans, ori = self.cal_axis(T_tcp_2_world)
for axis in ["x", "y", "z"]:
self.task.visuals[f"{axis}_{self.robot_lr}"].set_world_pose(trans[axis], ori[axis])
if self.curr_way_points_num > len(self.manip_list):
self.curr_way_points_num -= 1
for _ in range(40):
world.step(render=True)
# elif self.task.visuals["x"] and self.task.visuals["y"] and self.task.visuals["z"]:
# for axis in ['x', 'y', 'z']:
# delete_prim(f"/World/visual/{axis}_{self.robot_lr}")
# for i in range(40):
# world.step(render=True)
def cal_axis(self, T):
origin = T[:3, 3]
rotation_matrix = T[:3, :3]
trans = {}
ori = {}
axis_table = ["x", "y", "z"]
for i in range(3):
axis = rotation_matrix[:, i]
axis_start = origin
axis_end = origin + axis * self.vs_cylinder_height
trans[axis_table[i]] = self.calculate_cylinder_center(axis_start, axis_end)
ori[axis_table[i]] = self.calculate_orientation(axis)
return trans, ori
def calculate_cylinder_center(self, start_point, end_point):
return (start_point + end_point) / 2
def calculate_orientation(self, axis_vector):
default_z_axis = np.array([0, 0, 1])
target_axis = axis_vector / np.linalg.norm(axis_vector)
rotation = R.align_vectors([target_axis], [default_z_axis])[0]
return rotation.as_quat(scalar_first=True)
def cal_table_2_base(self):
tgt = self.task.fixtures["table"]
bbox_tgt = compute_bbox(tgt.prim)
print(f"[BBOX_DBG] table bbox min={list(bbox_tgt.min)}, max={list(bbox_tgt.max)}", flush=True)
print(f"[BBOX_DBG] T_base_2_world translation={self.T_base_2_world[:3, 3]}", flush=True)
table_center = (np.asarray(bbox_tgt.min) + np.asarray(bbox_tgt.max)) / 2
tgt_z_max = bbox_tgt.max[2]
table_center[2] = tgt_z_max
base_trans = self.T_base_2_world[:3, 3]
table_2_base = table_center - base_trans
table_2_base[0], table_2_base[1] = table_2_base[1], -table_2_base[0]
return table_2_base
def from_table_2_base(self, trans, ori):
return (np.array(trans) + self.table_2_base).tolist(), ori
def sample_waypoints(self):
way_points_num = self.skill_cfg.get("way_points_num", 1)
self.curr_way_points_num = way_points_num
way_points_trans = self.skill_cfg.get("way_points_trans", None)
way_points_ori = np.array(self.skill_cfg.get("way_points_ori", None))
trans_min = np.array(way_points_trans["min"])
trans_max = np.array(way_points_trans["max"])
way_points = []
for i in range(way_points_num):
while True:
print(f"... sampling waypoint {i} ...")
x = np.random.uniform(trans_min[0], trans_max[0])
y = np.random.uniform(trans_min[1], trans_max[1])
z = np.random.uniform(trans_min[2], trans_max[2])
trans = [x, y, z]
ori = perturb_orientation(way_points_ori, self.skill_cfg.get("max_noise_deg", 5))
trans, ori = self.from_table_2_base(trans, ori)
if self.controller.test_single_ik(trans, ori):
way_points.append([trans, ori.tolist()])
break
for p in way_points:
print(p)
return way_points
def is_feasible(self, th=5):
return self.controller.num_plan_failed <= th
def is_subtask_done(self, t_eps=1e-3, o_eps=5e-3):
assert len(self.manip_list) != 0
p_base_ee_cur, q_base_ee_cur = self.controller.get_ee_pose()
p_base_ee, q_base_ee, *_ = self.manip_list[0]
diff_trans = np.linalg.norm(p_base_ee_cur - p_base_ee)
diff_ori = 2 * np.arccos(min(abs(np.dot(q_base_ee_cur, q_base_ee)), 1.0))
pose_flag = np.logical_and(
diff_trans < t_eps,
diff_ori < o_eps,
)
self.plan_flag = self.controller.num_last_cmd > 10
return np.logical_or(pose_flag, self.plan_flag)
def is_done(self):
if len(self.manip_list) == 0:
return True
if self.is_subtask_done():
self.manip_list.pop(0)
return len(self.manip_list) == 0
def is_success(self, t_eps=5e-3, o_eps=0.087):
p_base_ee_cur, q_base_ee_cur = self.controller.get_ee_pose()
p_base_ee, q_base_ee = self.last_target_trans, self.last_target_ori
diff_trans = np.linalg.norm(p_base_ee_cur - p_base_ee)
diff_ori = 2 * np.arccos(min(abs(np.dot(q_base_ee_cur, q_base_ee)), 1.0))
pose_flag = np.logical_or(
diff_trans < t_eps,
diff_ori < o_eps,
)
return pose_flag