Files
issacdataengine/workflows/simbox/solver/planner.py
2026-03-16 11:44:10 +00:00

988 lines
42 KiB
Python

# pylint: skip-file
# flake8: noqa
import json
import time
import carb
import IPython
import numpy as np
import solver.kpam.mp_terms as mp_terms
import solver.kpam.SE3_utils as SE3_utils
# The specification of optimization problem
import solver.kpam.term_spec as term_spec
import yaml
from colored import fg
from omni.isaac.core.utils.prims import (
create_prim,
get_prim_at_path,
is_prim_path_valid,
)
from omni.isaac.core.utils.transformations import (
get_relative_transform,
pose_from_tf_matrix,
tf_matrices_from_poses,
)
from omni.isaac.core.utils.xforms import get_world_pose
from pydrake.all import *
from scipy.spatial.transform import Rotation as R
from solver.kpam.mp_builder import OptimizationBuilderkPAM
from solver.kpam.optimization_problem import OptimizationProblemkPAM, solve_kpam
from solver.kpam.optimization_spec import OptimizationProblemSpecification
# import gensim2.env.solver.planner_utils as plannerutils
from solver.planner_utils import *
# Named convenience motions for string entries in post_actuation_motions:
# each maps to a [translation-axis, offset] pair applied in the world frame
# (see get_pose_from_translation_inworld); offsets are in stage units.
MOTION_DICT = {
    "move-up": ["translate_z", 0.06],
    "move-down": ["translate_z", -0.06],
    "move-left": ["translate_y", 0.08],
    "move-right": ["translate_y", -0.08],
    "move-forward": ["translate_x", 0.1],
    "move-backward": ["translate_x", -0.1],
}
class KPAMPlanner:
    """Keypoint-based (kpam) trajectory-optimization expert planner.

    Solves robotic manipulation tasks by optimizing end-effector keyframes
    against keypoint constraints, then fitting a joint-space trajectory.
    Includes task-specific pre/post-actuation motions. Adapted from:
    https://github.com/liruiw/Fleet-Tools/blob/master/core/expert/base_expert.py
    Mostly works with simple and kinematic tasks.
    """
def __init__(self, env, robot, object, cfg_path, obj_rot=0, controller=None, draw_points=False, stage=None):
    """Build a kpam planner for one task instance.

    Args:
        env: simulation environment (must expose ``current_time`` etc.).
        robot: robot wrapper; its name selects the end-effector link and DOF.
        object: manipulated object wrapper (prim paths, keypoints, scale, axes).
        cfg_path: path to the task's kpam YAML config, or a pre-loaded dict.
        obj_rot: object yaw used to rotate post-actuation x/y translations.
        controller: optional controller; selects left/right arm on dual-arm robots.
        draw_points: if True, draw keypoints with the Isaac debug-draw interface.
        stage: USD stage handle, used to read the object's joint prim.

    Raises:
        NotImplementedError: for robots other than franka / split_aloha / lift2.
    """
    self.env = env
    self.robot = robot
    self.object = object
    self.cfg_path = cfg_path
    self.controller = controller
    self.obj_rot = obj_rot
    self.draw_points = draw_points
    self.stage = stage
    self.plan_time = 0
    self.goal_joint = None
    self.kpam_success = False
    self.joint_plan_success = False
    if "franka" in self.robot.name:
        self.ee_name = "panda_hand"
        self.robot_dof = 9
    elif "split_aloha" in self.robot.name or "lift2" in self.robot.name:
        self.ee_name = "link6"
        self.robot_dof = 8
    else:
        raise NotImplementedError
    # canonical unit axes used to decode the object's axis annotations
    self.axis_dict = {
        "x": [1.0, 0.0, 0.0],
        "-x": [-1.0, 0.0, 0.0],
        "y": [0.0, 1.0, 0.0],
        "-y": [0.0, -1.0, 0.0],
        "z": [0.0, 0.0, 1.0],
        "-z": [0.0, 0.0, -1.0],
    }
    if self.draw_points:
        from omni.isaac.debug_draw import _debug_draw

        self.draw = _debug_draw.acquire_debug_draw_interface()
    if isinstance(cfg_path, dict):
        self.cfg = cfg_path
    else:
        self.cfg = yaml.load(open(cfg_path, "r", encoding="utf-8"), Loader=yaml.SafeLoader)
    if "actuation_time" in self.cfg:  # time cost to reach task goal pose
        # config provides explicit relative timings; shift them to absolute env time
        self.actuation_time = self.cfg["actuation_time"] + self.env.current_time
        self.pre_actuation_times = [t + self.env.current_time for t in self.cfg["pre_actuation_times"]]
        self.post_actuation_times = [t + self.env.current_time for t in self.cfg["post_actuation_times"]]
        self.pre_actuation_motions = self.cfg["pre_actuation_motions"]
        self.post_actuation_motions = self.cfg["post_actuation_motions"]
    elif "post_actuation_motions" in self.cfg:
        # no explicit timings: actuate at t=24, one keyframe every 4 time units
        self.actuation_time = 24
        self.pre_actuation_motions = self.cfg["pre_actuation_motions"]
        self.post_actuation_motions = self.cfg["post_actuation_motions"]
        self.pre_actuation_times = [24 - 4 * i for i in range(len(self.pre_actuation_motions), 0, -1)]
        self.post_actuation_times = [24 + 4 * (i + 1) for i in range(len(self.post_actuation_motions))]
    elif "pre_actuation_motions" in self.cfg:
        # pre-actuation only: same default timings, no post-actuation keyframes
        self.actuation_time = 24
        self.pre_actuation_motions = self.cfg["pre_actuation_motions"]
        self.post_actuation_motions = []
        self.pre_actuation_times = [24 - 4 * i for i in range(len(self.pre_actuation_motions), 0, -1)]
        self.post_actuation_times = []
    # bug fix: the original repeated an identical, unreachable
    # `elif "pre_actuation_motions" in self.cfg:` branch here (the condition
    # above always matches first); the dead branch has been removed.
    # NOTE(review): if the config contains none of the keys handled above, the
    # actuation_time / *_motions / *_times attributes are never set — confirm
    # every task config provides at least one of them.
    if "modify_actuation_motion" in self.cfg:
        self.modify_actuation_motion = self.cfg["modify_actuation_motion"]
    self.plant, self.fk_context = build_plant(
        robot_name=self.robot.name, init_qpos=self.robot.get_joint_positions()
    )
    self.reset_expert()
def setup(self):
# load keypoints
self.solved_ik_times = []
self.joint_traj_waypoints = []
def reset_expert(self):
"""reinitialize expert state"""
self.joint_space_traj = None
self.task_space_traj = None
self.plan_succeeded = False
self.plan_time = 0.0
self.setup()
def check_plan_empty(self):
"""check if already have a plan"""
return self.joint_space_traj is None
def get_pose_from_translation(self, translation, pre_pose):
    """Compose pre_pose with a pure translation expressed in pre_pose's frame.

    pre_pose maps robot base -> ee; the translation defines ee -> target, so
    the product maps robot base -> target.
    """
    offset = np.eye(4)
    offset[:3, 3] = np.asarray(translation)
    return pre_pose @ offset
def get_pose_from_translation_inworld(self, translation, pre_pose):
    """Shift pre_pose's position by a translation given in the world frame."""
    shifted = pre_pose.copy()
    shifted[:3, 3] = shifted[:3, 3] + np.asarray(translation)
    return shifted
def get_pose_from_rotation(self, rotation, pre_pose):
    """Rotate pre_pose about the object link's annotated rotation axis.

    The rotation is applied in the object-link frame (expressed in the robot
    base frame): pre_pose is conjugated by the base->link transform.
    """
    euler_vec = np.array(self.get_object_link0_rot_axis()) * rotation
    rot_h = np.eye(4)
    rot_h[:3, :3] = R.from_euler("xyz", euler_vec, degrees=False).as_matrix()
    # self.base_pose: world -> robot base; get_world2link_pose(): world -> link
    base2link = se3_inverse(self.base_pose).dot(self.get_world2link_pose()).reshape(4, 4)
    return base2link @ rot_h @ se3_inverse(base2link) @ pre_pose
def generate_actuation_poses(self):
    # get key pose constraints during trajectory
    """Build the pre/post-actuation keyframe poses around the solved goal pose.

    Pre-actuation motions are chained backwards from the goal pose (then
    reversed); post-actuation motions are chained forwards.  Populates
    self.pre_actuation_poses / self.post_actuation_poses and the combined
    self.sample_times / self.traj_keyframes consumed by the trajectory
    solver.  Example (hammering): (1) reach above the screw, (2) hammer it
    by moving downward.
    """
    self.pre_actuation_poses = []
    self.post_actuation_poses = []
    curr_pose = self.task_goal_hand_pose
    for motion in self.pre_actuation_motions:
        mode = motion[0]
        value = motion[1]
        assert mode in ["translate_x", "translate_y", "translate_z", "rotate"]
        assert type(value) == float
        if mode == "rotate":
            curr_pose = self.get_pose_from_rotation(value, curr_pose)
            self.pre_actuation_poses.append(curr_pose)
        else:
            # translations are expressed in the current end-effector frame
            value_vec = [0, 0, 0]
            if mode == "translate_x":
                value_vec[0] = value
            elif mode == "translate_y":
                value_vec[1] = value
            elif mode == "translate_z":
                value_vec[2] = value
            curr_pose = self.get_pose_from_translation(value_vec, curr_pose)
            self.pre_actuation_poses.append(curr_pose)
    self.pre_actuation_poses.reverse()
    curr_pose = self.task_goal_hand_pose  # pose in robot base frame
    for motion in self.post_actuation_motions:
        if type(motion) == list:
            mode = motion[0]
            value = motion[1]
            if mode == "rotate":
                curr_pose = self.get_pose_from_rotation(value, curr_pose)
                self.post_actuation_poses.append(curr_pose)
            else:
                # x/y translations are rotated into the object's yaw (obj_rot)
                value_vec = [0, 0, 0]
                if mode == "translate_x":
                    value_vec[0] = value * np.cos(self.obj_rot)
                    value_vec[1] = value * np.sin(self.obj_rot)
                elif mode == "translate_y":
                    value_vec[0] = -value * np.sin(self.obj_rot)
                    value_vec[1] = value * np.cos(self.obj_rot)
                elif mode == "translate_z":
                    value_vec[2] = value
                curr_pose = self.get_pose_from_translation(value_vec, curr_pose)
                self.post_actuation_poses.append(curr_pose)
        elif type(motion) == str:
            # named motions from MOTION_DICT are applied in the world frame
            mode = MOTION_DICT[motion][0]
            value = MOTION_DICT[motion][1]
            value_vec = [0, 0, 0]
            if mode == "translate_x":
                value_vec[0] = value
            elif mode == "translate_y":
                value_vec[1] = value
            elif mode == "translate_z":
                value_vec[2] = value
            curr_pose = self.get_pose_from_translation_inworld(value_vec, curr_pose)
            self.post_actuation_poses.append(curr_pose)
    self.sample_times = (
        [self.env.current_time] + self.pre_actuation_times + [self.actuation_time] + self.post_actuation_times
    )
    self.traj_keyframes = (
        [self.ee_pose.reshape(4, 4)]
        + self.pre_actuation_poses
        + [self.task_goal_hand_pose]
        + self.post_actuation_poses
    )
    # ========================= visualize keypoints =========================
    # base2world=get_relative_transform(get_prim_at_path(self.robot.base_path), get_prim_at_path(self.robot.root_prim_path))
    # for traj_keyframe in self.traj_keyframes:
    #     draw.draw_points(
    #         [(base2world @ np.append(traj_keyframe[:3,3],1))[:3]],
    #         [(0,0,0,1)],  # black
    #         [7]
    #     )
    # ========================= visualize keypoints =========================
# debug start: KPAMPlanner, refactor all self.task.xxx() calls into planner methods
def get_world2link_pose(self):
    """Return the world-frame pose of the object's joint frame (body0 side).

    Reads the revolute joint prim's physics:localPos0 / localRot0 (the joint
    frame expressed in its parent body0), scales the local offset by the
    object scale, and composes with body0's world pose.
    """
    revolute_prim = self.stage.GetPrimAtPath(self.object.object_joint_path)
    body0_path = str(revolute_prim.GetRelationship("physics:body0").GetTargets()[0])  # base
    body0_prim = self.stage.GetPrimAtPath(body0_path)  # NOTE(review): unused
    local_pos = revolute_prim.GetAttribute("physics:localPos0").Get()
    local_rot = revolute_prim.GetAttribute("physics:localRot0").Get()  # wxyz
    local_wxyz = np.array(
        [local_rot.GetReal(), local_rot.GetImaginary()[0], local_rot.GetImaginary()[1], local_rot.GetImaginary()[2]]
    )
    base2joint_pose = np.eye(4)
    base2joint_pose[:3, 3] = np.array([local_pos[0], local_pos[1], local_pos[2]])
    base2joint_pose[:3, :3] = R.from_quat(local_wxyz, scalar_first=True).as_matrix()
    base2joint_pose[:3, 3] = base2joint_pose[:3, 3] * self.object.object_scale[:3]  # PL: we need to scale it first
    world2joint_pose = self.get_transform_matrix_by_prim_path(body0_path) @ base2joint_pose
    return world2joint_pose
def get_tool_keypoints(self):
if "left_arm" in self.controller.robot_file:
self.robot.hand_path = self.robot.fl_hand_path
self.robot.ee_path = self.robot.fl_ee_path
self.robot.gripper_keypoints = self.robot.fl_gripper_keypoints
self.robot.base_path = self.robot.fl_base_path
elif "right_arm" in self.controller.robot_file:
self.robot.hand_path = self.robot.fr_hand_path
self.robot.ee_path = self.robot.fr_ee_path
self.robot.gripper_keypoints = self.robot.fr_gripper_keypoints
self.robot.base_path = self.robot.fr_base_path
transform_matrix = self.get_transform_matrix_by_prim_path(self.robot.hand_path)
tool_head = transform_matrix @ self.robot.gripper_keypoints["tool_head"]
tool_tail = transform_matrix @ self.robot.gripper_keypoints["tool_tail"]
tool_side = transform_matrix @ self.robot.gripper_keypoints["tool_side"]
tool_keypoints = {}
tool_keypoints["tool_head"] = tool_head[:3]
tool_keypoints["tool_tail"] = tool_tail[:3]
tool_keypoints["tool_side"] = tool_side[:3]
return tool_keypoints
def get_transform_matrix_by_prim_path(self, path):
    """Return the 4x4 world transform of the prim at `path`.

    The prim's orientation quaternion is wxyz (scalar first).
    """
    position, orientation = get_world_pose(path)
    tf = np.eye(4)
    tf[:3, :3] = R.from_quat(orientation, scalar_first=True).as_matrix()
    tf[:3, 3] = position
    return tf
def get_object_keypoints(self):
link2world_transform_matrix = self.get_transform_matrix_by_prim_path(self.object.object_link_path)
base2world_transform_matrix = self.get_transform_matrix_by_prim_path(self.object.object_base_path)
# convert rotation matrix to Euler angles
link2world_euler_angles = R.from_matrix(link2world_transform_matrix[:3, :3]).as_euler("xyz", degrees=False)
base2world_euler_angles = R.from_matrix(base2world_transform_matrix[:3, :3]).as_euler("xyz", degrees=False)
print(f"link2world_euler_angles: {link2world_euler_angles}")
print(f"base2world_euler_angles: {base2world_euler_angles}")
self.object_keypoints = self.object.object_keypoints
obejct_keypoints = {}
for key, value in self.object_keypoints.items():
if key == "base_contact_point":
tranformed_keypoints = base2world_transform_matrix @ (value * self.object.object_scale)
obejct_keypoints[key] = tranformed_keypoints[:3]
print(f"base_contact_point: {obejct_keypoints[key]}")
elif key == "base_object_contact_point":
tranformed_keypoints = base2world_transform_matrix @ (value * self.object.object_scale)
obejct_keypoints[key] = tranformed_keypoints[:3]
print(f"base_object_contact_point: {obejct_keypoints[key]}")
elif key == "link_contact_point":
tranformed_keypoints = link2world_transform_matrix @ (value * self.object.object_scale)
obejct_keypoints[key] = tranformed_keypoints[:3]
print(f"link_contact_point: {obejct_keypoints[key]}")
elif key == "articulated_object_head":
tranformed_keypoints = link2world_transform_matrix @ (value * self.object.object_scale)
obejct_keypoints[key] = tranformed_keypoints[:3]
print(f"articulated_object_head: {obejct_keypoints[key]}")
elif key == "articulated_object_tail":
tranformed_keypoints = link2world_transform_matrix @ (value * self.object.object_scale)
obejct_keypoints[key] = tranformed_keypoints[:3]
print(f"articulated_object_tail: {obejct_keypoints[key]}")
return obejct_keypoints
def get_robot_default_state(self):
transform_matrix = self.get_transform_matrix_by_prim_path(self.robot.base_path)
return transform_matrix
def get_robot_ee_pose(self):
return self.get_transform_matrix_by_prim_path(self.robot.ee_path)
def get_robot_tool_pose(self):
return self.get_transform_matrix_by_prim_path(self.robot.hand_path)
def get_object_link_pose(self):
# return world to object link transform matrix
transform_matrix = self.get_transform_matrix_by_prim_path(self.object.object_link_path)
return transform_matrix
def get_object_link0_contact_axis(self):
    """Unit axis along which the tool contacts link0, decoded via axis_dict."""
    return self.axis_dict[self.object.object_link0_contact_axis]

def get_object_link0_move_axis(self):
    """Unit axis along which link0 moves, decoded via axis_dict."""
    return self.axis_dict[self.object.object_link0_move_axis]

def get_object_link0_vertical_axis(self):
    """Cross product of the move and contact axes (perpendicular to both)."""
    return np.cross(self.get_object_link0_move_axis(), self.get_object_link0_contact_axis())

def get_object_link0_rot_axis(self):
    """Unit axis about which link0 rotates, decoded via axis_dict."""
    return self.axis_dict[self.object.object_link0_rot_axis]
# debug end: KPAMPlanner, refactor all self.task.xxx() calls into planner methods
def get_env_info(self):
    # get current end effector pose, joint angles, object poses, and keypoints from the environment
    """Snapshot all planner inputs from the simulator into instance state.

    Populates keypoints (world / robot-base / hand frames), the robot base,
    ee, tool and object poses, joint positions, and timing info used by the
    kpam solve.  Optionally draws the keypoints for debugging.
    """
    self.tool_keypoint_in_world = self.get_tool_keypoints()  # robot keypoints in world frame
    # e.g. {'tool_head': array([...]), 'tool_tail': array([...]), 'tool_side': array([...])}
    self.object_keypoint_in_world = self.get_object_keypoints()  # object keypoints in world frame
    # e.g. {"articulated_object_head": array([-0.01, 0.45, 0.07])}
    if self.draw_points:
        # ========================= visualize keypoints =========================
        list1 = []
        list2 = []
        for key, item in self.tool_keypoint_in_world.items():
            list1.append(item)
        for key, item in self.object_keypoint_in_world.items():
            list2.append(item)
        self.draw.draw_points(
            list1,
            [(1, 0, 0, 1), (0, 1, 0, 1), (0, 0, 1, 1)],  # red head, green tail, blue side -> robot keypoints
            [5] * len(list1),
        )
        self.draw.draw_points(
            list2, [(1.0, 0.0, 1.0, 1)] * len(list2), [5] * len(list2)  # purple -> object keypoints
        )
        # ======================================================================
    self.dt = self.env._rendering_dt  # simulator dt
    self.time = self.env.current_time  # current time
    self.base_pose = self.get_robot_default_state()  # robot base pose in world frame
    # NOTE(review): for split_aloha / lift2 the joint state is hard-coded to a
    # home posture rather than read from the robot — confirm this is intended.
    if "split_aloha" in self.robot.name:
        self.joint_positions = np.array([0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.05, -0.05])
    elif "lift2" in self.robot.name:
        self.joint_positions = np.array([0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.04, 0.04])
    elif "franka" in self.robot.name:
        self.joint_positions = self.robot.get_joint_positions()
    else:
        raise NotImplementedError
    self.curr_tool_keypoints = self.compute_tool_keypoints_inbase()  # robot keypoints in robot base frame
    self.curr_object_keypoints = self.compute_object_keypoints_inbase()  # object keypoints in robot base frame
    self.ee_pose = self.compute_hand_pose_inbase()  # robot base to end effector transform matrix
    self.tool_pose = self.compute_tool_pose_inbase()  # robot base to gripper transform matrix
    self.object_pose = self.compute_object_pose_inbase()  # robot base to object link_0 transform matrix
    self.tool_keypoints_in_hand = self.compute_tool_keypoints_inhand()  # robot keypoints in ee frame
    self.tool_rel_pose = self.compute_tool_inhand()  # robot gripper pose in ee frame
def compute_hand_pose_inbase(self):
    """End-effector pose expressed in the robot base frame (4x4)."""
    return se3_inverse(self.base_pose).dot(self.get_robot_ee_pose()).reshape(4, 4)

def compute_tool_pose_inbase(self):
    """Gripper (hand prim) pose expressed in the robot base frame (4x4)."""
    return se3_inverse(self.base_pose).dot(self.get_robot_tool_pose()).reshape(4, 4)

def compute_object_pose_inbase(self):
    """Object link pose expressed in the robot base frame (4x4)."""
    return se3_inverse(self.base_pose).dot(self.get_object_link_pose()).reshape(4, 4)
def compute_tool_keypoints_inbase(self):
    """Map the world-frame tool keypoints into the robot base frame."""
    inv_base = se3_inverse(self.base_pose)
    return {
        name: inv_base.dot(np.array([pt[0], pt[1], pt[2], 1]))[:3]
        for name, pt in self.tool_keypoint_in_world.items()
    }

def compute_object_keypoints_inbase(self):
    """Map the world-frame object keypoints into the robot base frame."""
    inv_base = se3_inverse(self.base_pose)
    return {
        name: inv_base.dot(np.array([pt[0], pt[1], pt[2], 1]))[:3]
        for name, pt in self.object_keypoint_in_world.items()
    }

def compute_tool_keypoints_inhand(self):
    """Map the base-frame tool keypoints into the end-effector frame."""
    inv_ee = se3_inverse(self.ee_pose)
    return {
        name: inv_ee.dot(np.array([pt[0], pt[1], pt[2], 1]))[:3]
        for name, pt in self.curr_tool_keypoints.items()
    }

def compute_tool_inhand(self):
    """Gripper pose relative to the end-effector frame."""
    return se3_inverse(self.ee_pose).dot(self.tool_pose)
def add_random_to_keypose(self, transform_matrix, orientation_range, position_range):
    """Perturb a 4x4 pose with uniform noise expressed in its local frame.

    Args:
        transform_matrix (np.ndarray): 4x4 pose in the robot base frame.
        orientation_range (dict): per-axis rotation bounds in degrees, keys
            "x_min"/"x_max"/"y_min"/"y_max"/"z_min"/"z_max".
        position_range (dict): per-axis translation bounds, same keys.

    Returns:
        np.ndarray: new 4x4 pose with the random perturbation applied.
    """
    rot = transform_matrix[:3, :3]
    pos = transform_matrix[:3, 3]
    # sample one uniform value per axis (rotation first, then translation,
    # preserving the original RNG call order)
    rand_euler = [
        np.random.uniform(orientation_range[axis + "_min"], orientation_range[axis + "_max"])
        for axis in "xyz"
    ]
    rand_shift = [
        np.random.uniform(position_range[axis + "_min"], position_range[axis + "_max"])
        for axis in "xyz"
    ]
    # right-multiplying applies the random rotation in the pose's local frame
    noisy_rot = rot @ R.from_euler("xyz", rand_euler, degrees=True).as_matrix()
    # express the local-frame offset in the base frame before adding it
    noisy_pos = pos + rot @ np.array(rand_shift)
    perturbed = np.eye(4)
    perturbed[:3, :3] = noisy_rot
    perturbed[:3, 3] = noisy_pos
    return perturbed
def parse_constraints(self):
"""
Parse constraints from YAML config file into list of constraint dictionaries
Returns:
list: List of constraint dictionaries
"""
constraint_dicts = []
# read constraint_list from configuration
for constraint in self.cfg["constraint_list"]:
# directly copy all fields without predefining a schema
constraint_dict = {k: v for k, v in constraint.items()}
if "target_axis" in constraint_dict and constraint_dict["target_axis"] == "link0_contact_axis":
vec = self.get_object_link0_contact_axis()
constraint_dict["target_axis"] = vec
elif "target_axis" in constraint_dict and constraint_dict["target_axis"] == "object_link0_move_axis":
vec = self.get_object_link0_move_axis()
constraint_dict["target_axis"] = vec
elif "target_axis" in constraint_dict and constraint_dict["target_axis"] == "object_link0_contact_axis":
vec = self.get_object_link0_contact_axis()
constraint_dict["target_axis"] = vec
elif "target_axis" in constraint_dict and constraint_dict["target_axis"] == "object_link0_vertical_axis":
vec = self.get_object_link0_vertical_axis()
constraint_dict["target_axis"] = vec
constraint_dicts.append(constraint_dict)
return constraint_dicts
# tool use related
def solve_actuation_joint(self, generate_traj=True):
    # get target pose
    """Solve the kpam IK problem for the actuation (goal) joint configuration.

    Tries the current joints plus several random anchor seeds, keeps the
    solution closest to the current joints, and stores it in self.goal_joint.
    On success the goal hand pose is computed by forward kinematics, randomly
    perturbed per the config's keypose_random_range, and cached in
    self.task_goal_hand_pose / self.task_goal_tool_pose.  self.kpam_success
    records whether any seed produced a solution.
    """
    # constraint_dicts example:
    # [{'keypoint_name': 'tool_tail',
    #   'target_keypoint_name': 'articulated_object_head',
    #   'tolerance': 0.0001},
    #  {'axis_from_keypoint_name': 'tool_head',
    #   'axis_to_keypoint_name': 'tool_side',
    #   'target_axis': [0.0, -1.0, ...], 'target_axis_frame': 'object',
    #   'target_inner_product': 1, 'tolerance': 0.01}, ...]
    constraint_dicts = self.parse_constraints()
    # need to parse the kpam config file and create a kpam problem
    indexes = np.random.randint(len(anchor_seeds), size=(8,))
    # seed the solver with the current joints plus 8 random anchor postures
    random_seeds = [self.joint_positions.copy()[: self.robot_dof]] + [
        anchor_seeds[idx][: self.robot_dof] for idx in indexes
    ]
    solutions = []
    for seed in random_seeds:
        res = solve_ik_kpam(
            get_relative_transform(
                get_prim_at_path(self.object.object_link_path), get_prim_at_path(self.robot.base_path)
            ),
            constraint_dicts,
            self.plant.GetFrameByName(self.ee_name),
            self.tool_keypoints_in_hand,
            self.curr_object_keypoints,
            RigidTransform(self.ee_pose.reshape(4, 4)),
            seed.reshape(-1, 1),
            # NOTE(review): hard-coded 9 (franka DOF) here and below, while
            # seeds above use self.robot_dof — confirm for 8-DOF robots.
            self.joint_positions.copy()[:9],
            rot_tol=0.01,
            timeout=True,
            consider_collision=False,
            contact_plane_normal=self.object.contact_plane_normal,
        )
        if res is not None:
            solutions.append(res.get_x_val()[:9])
    if len(solutions) == 0:
        # raise ValueError("empty solution in kpam, target pose is unavailable")
        # fall back to the current joints and mark the solve as failed
        self.goal_joint = self.joint_positions[:9].copy()
        self.kpam_success = False
    else:
        self.kpam_success = True
        solutions = np.array(solutions)
        joint_positions = self.joint_positions[:9]
        # select the best solution from solutions as res:
        # the one closest (L2 in joint space) to the current joints
        dist_to_init_joints = np.linalg.norm(solutions - joint_positions.copy(), axis=-1)
        res = solutions[np.argmin(dist_to_init_joints)]
        self.goal_joint = res
        self.plant.SetPositions(self.fk_context, res)
        # forward kinematics of the solved joints gives the goal hand pose
        self.task_goal_hand_pose = self.plant.EvalBodyPoseInWorld(
            self.fk_context, self.plant.GetBodyByName(self.ee_name)
        )
        self.task_goal_hand_pose = np.array(self.task_goal_hand_pose.GetAsMatrix4())
        ### add random to keypose ###
        orientation_range = self.cfg["keypose_random_range"]["orientation"]
        position_range = self.cfg["keypose_random_range"]["position"]
        self.task_goal_hand_pose = self.add_random_to_keypose(
            self.task_goal_hand_pose, orientation_range, position_range
        )
        ### add random to keypose ###
        self.task_goal_tool_pose = self.task_goal_hand_pose @ self.tool_rel_pose
        self.plan_time = self.env.current_time
def get_task_traj_from_joint_traj(self):
    """Run forward kinematics along the joint trajectory to build the
    task-space (end-effector pose) trajectory.

    Densely samples self.joint_space_traj, evaluates the ee body pose at each
    sample, and stores a linear PiecewisePose in self.task_space_traj.
    """
    self.pose_traj = []
    ik_times = dense_sample_traj_times(self.sample_times, self.actuation_time)
    self.dense_ik_times = ik_times
    for traj_time in ik_times:
        set_joints = self.joint_space_traj.value(traj_time)
        self.plant.SetPositions(self.fk_context, set_joints)
        pose = self.plant.EvalBodyPoseInWorld(self.fk_context, self.plant.GetBodyByName(self.ee_name))
        self.pose_traj.append(pose.GetAsMatrix4())
    self.task_space_traj = PiecewisePose.MakeLinear(ik_times, [RigidTransform(p) for p in self.pose_traj])
def modify_actuation_joint(self):
curr_pose = self.task_goal_hand_pose
motion = self.modify_actuation_motion
mode = motion[0]
value = motion[1]
assert mode in ["translate_x", "translate_y", "translate_z", "rotate"]
assert type(value) == float
if mode == "rotate":
curr_pose = self.get_pose_from_rotation(value, curr_pose)
self.pre_actuation_poses.append(curr_pose)
else:
value_vec = [0, 0, 0]
if mode == "translate_x":
value_vec[0] = value
elif mode == "translate_y":
value_vec[1] = value
elif mode == "translate_z":
value_vec[2] = value
curr_pose = self.get_pose_from_translation(value_vec, curr_pose)
self.task_goal_hand_pose = curr_pose # update
return
def solve_postactuation_traj(self):
    """Generate the pre/post-actuation keyframe poses and sample times.

    Thin wrapper around generate_actuation_poses(); the actual trajectory
    fitting happens later in solve_joint_traj().
    """
    self.generate_actuation_poses()
def save_traj(self, traj):
    """Dump a trajectory to 'traj.txt' as comma-separated rows."""
    np.savetxt("traj.txt", np.asarray(traj), delimiter=",")
def solve_joint_traj(self, densify=True):
    """Solve IK along the keyframe poses and fit a joint-space trajectory.

    A first-order hold between the current joints and the kpam goal joint is
    used as the initial guess; the full trajectory is then solved with
    standoff keyposes and smoothed with a shape-preserving cubic.

    Args:
        densify: if True, densely sample waypoint times and also build the
            task-space trajectory via forward kinematics.

    Raises:
        ValueError: if the endpoint trajectory could not be solved.
    """
    keyposes = self.traj_keyframes
    keytimes = self.sample_times
    self.joint_traj_waypoints = [self.joint_positions.copy()]
    # directly interpolate between current pose and target pose as an initial guess
    self.joint_space_traj = PiecewisePolynomial.FirstOrderHold(
        [self.env.current_time, self.actuation_time],
        np.array([self.joint_positions.copy(), np.array(self.goal_joint, dtype=np.float32)]).T,
    )
    print("self.env.current_time: ", self.env.current_time, " self.actuation_time: ", self.actuation_time)
    print("self.joint_space_traj: ", self.joint_space_traj)
    if densify:
        self.dense_traj_times = dense_sample_traj_times(self.sample_times, self.actuation_time)
    else:
        self.dense_traj_times = self.sample_times
    print("solve traj endpoint")
    # solve the standoff and the remaining poses using the goal as seed
    res = solve_ik_traj_with_standoff(
        [self.ee_pose.reshape(4, 4), self.task_goal_hand_pose],
        np.array([self.joint_positions.copy(), self.goal_joint]).T,
        q_traj=self.joint_space_traj,
        waypoint_times=self.dense_traj_times,
        keyposes=keyposes,
        keytimes=keytimes,
    )
    if res is None:
        # bug fix: the original raised first and then had unreachable cleanup
        # lines; set the failure flags before raising so callers that catch
        # the error see a consistent state.
        self.joint_plan_success = False
        self.env.need_termination = True
        raise ValueError("endpoint trajectory not solved!")
    print("endpoint trajectory solved!")
    # use the joint trajectory to build the task trajectory
    self.joint_plan_success = True
    # NOTE(review): 9 is the franka DOF count — confirm for other robots
    self.joint_traj_waypoints = list(res.get_x_val().reshape(-1, 9))
    self.joint_space_traj = PiecewisePolynomial.CubicShapePreserving(
        self.dense_traj_times, np.array(self.joint_traj_waypoints).T
    )
    # bug fix: the original called get_task_traj_from_joint_traj() twice on
    # success (once inside the branch and once after); it now runs once.
    if densify:
        self.get_task_traj_from_joint_traj()
def get_joint_action(self):
"""get the joint space action"""
if self.check_plan_empty():
print("no joint trajectory")
return self.env.reset()
# lookahead
return self.joint_space_traj.value(self.env.current_time + self.env.env_dt).reshape(-1)
def get_pose_action(self, traj_eff_pose):
    # transform matrix to action(3+3+1)
    """Convert a 4x4 end-effector pose into a 7-dim action.

    Layout: 3 translation + 3 euler angles + 1 gripper channel (hard-coded
    open, value 1).
    """
    pose_part = pack_pose(traj_eff_pose, rot_type="euler")
    return np.concatenate([pose_part, [1]])
def get_actuation_joint(self):
if self.goal_joint is not None:
return self.goal_joint
if self.check_plan_empty():
self.solve_actuation_joint()
return self.goal_joint
raise ValueError("no actuation joint")
def get_keypose(self):
    """Solve the kpam goal and return (traj_keyframes, sample_times).

    Returns empty lists for both when the kpam IK solve failed.
    """
    self.get_env_info()
    if self.check_plan_empty():
        s = time.time()  # NOTE(review): timing value is currently unused
        self.solve_actuation_joint()
        if self.kpam_success:
            if "modify_actuation_motion" in self.cfg:
                self.modify_actuation_joint()  # PL: for the rotate knob task, we add this feature to enable goal hand pose modification
            self.solve_postactuation_traj()
        else:
            self.traj_keyframes = []
            self.sample_times = []
    return self.traj_keyframes, self.sample_times
def get_action(self, mode="pose"):
    """Return the task-space action (xyz + euler + gripper) for the next step.

    Lazily solves the full plan (kpam goal, actuation keyframes, joint
    trajectory) on the first call, then samples the task-space trajectory
    one simulator step ahead.

    Args:
        mode: unused; kept for interface compatibility.
    """
    self.get_env_info()
    if self.check_plan_empty():
        s = time.time()
        self.solve_actuation_joint()
        self.solve_postactuation_traj()
        self.solve_joint_traj()
        print("env time: {:.3f} plan generation time: {:.3f}".format(self.env.current_time, time.time() - s))
    # get the task-space action from the kpam expert joint trajectory
    traj_eff_pose = self.task_space_traj.value(self.time + self.dt)  # get transform matrix by current time
    pose_action = self.get_pose_action(traj_eff_pose)  # transform matrix to action(3+3+1)
    return pose_action
def get_actuation_qpos(self):
    """Refresh env state, solve the kpam goal joint, and report the result.

    Returns:
        tuple: (goal_joint, kpam_success flag).
    """
    self.get_env_info()
    self.solve_actuation_joint()
    return self.goal_joint, self.kpam_success
def get_sparse_traj_qpos(self):
    """Solve the full plan without densifying and return its joint waypoints.

    Returns:
        tuple: (joint_traj_waypoints, joint_plan_success flag).
    """
    self.get_env_info()
    self.solve_actuation_joint()
    self.solve_postactuation_traj()
    self.solve_joint_traj(densify=False)
    return self.joint_traj_waypoints, self.joint_plan_success
# Standalone smoke test: launch Isaac Sim, build the CloseMicrowave task, and
# query the planner's keyposes.
if __name__ == "__main__":
    from isaacsim import SimulationApp

    simulation_app = SimulationApp({"headless": False})
    import numpy as np

    # from omni.isaac.franka.controllers.pick_place_controller import PickPlaceController
    from gensim_testing_v2.tasks.close_microwave import CloseMicrowave
    from omni.isaac.core import World
    from omni.isaac.core.utils.types import ArticulationAction

    # from solver.planner import KPAMPlanner
    my_world = World(stage_units_in_meters=1.0)
    my_task = CloseMicrowave()
    my_world.add_task(my_task)
    my_world.reset()
    task_params = my_task.get_params()
    my_franka = my_world.scene.get_object(task_params["robot_name"]["value"])
    # NOTE(review): KPAMPlanner.__init__ requires (env, robot, object, cfg_path)
    # but this call supplies only env and cfg_path, so it would raise a
    # TypeError as written — confirm the intended robot/object arguments
    # (my_franka is fetched above but never passed).
    expert_planner = KPAMPlanner(
        my_world,
        cfg_path="gensim_testing_v2/solver/kpam/config/CloseMicrowave.yaml",
    )
    expert_planner.get_keypose()
    # print("action:",action)
    # 3d pose + 3d euler + 1d gripper openness
    # action: [ 0.38964379  0.00477079  0.45819783  3.09401237 -0.79489916  0.07860673  1.        ]