- Add pick_test_tube task: USDC asset repackaging, grasp generation, task config - Add tools: usdc_to_obj.py, repackage_test_tube.py, fix_test_tube_materials.py - Add custom_task_guide.md: full Chinese documentation for creating custom tasks - Add crawled InternDataEngine online docs (23 pages) - Add grasp generation script (gen_tube_grasp.py) and pipeline config
32 KiB
Source: https://internrobotics.github.io/InternDataEngine-Docs/concepts/controllers.html
Controllers
Controllers are the core component responsible for robot arm motion planning in InternDataEngine. They provide **GPU-accelerated collision-free trajectory generation **using CuRobo, while simultaneously handling gripper actions.
Overview
Each controller manages motion planning for a single robot arm. For dual-arm robots (like Lift2, Genie1, Split Aloha), two controller instances are created - one for each arm.
The controller's primary responsibilities include:
- **Collision-Free Motion Planning **: Generate safe trajectories that avoid obstacles in the scene using CuRobo's GPU-accelerated motion generation
- **Inverse Kinematics **: Solve IK queries to find joint configurations for desired end-effector poses
- **Gripper Control **: Handle gripper open/close commands integrated with arm motion
- **Collision World Management **: Update and maintain the collision world from the simulation scene
CuRobo Configuration Files
Each robot arm requires a CuRobo configuration file that defines kinematics, collision geometry, and motion generation parameters:
- **YAML Configs **:
workflows/simbox/curobo/src/curobo/content/configs/robot/ - **URDF Files **:
workflows/simbox/curobo/src/curobo/content/assets/robot/
Available robot configs include:
r5a_left_arm.yml/r5a_right_arm.yml- ARX Lift-2 (R5a arms)piper100_left_arm.yml/piper100_right_arm.yml- AgiLEx Split AlohaG1_120s_left_arm_parallel_gripper.yml/G1_120s_right_arm_parallel_gripper.yml- Genie-1fr3_left_arm.yml- Franka FR3frankarobotiq_left_arm.yml- Franka with Robotiq 2F-85 gripper
Controller Architecture
All controllers inherit from TemplateController and customize behavior via method overrides.
TemplateController (base class)
├── FR3Controller
├── FrankaRobotiq85Controller
├── Genie1Controller
├── Lift2Controller
└── SplitAlohaController
1 2 3 4 5 6
Controller Wrappers
Controller wrappers (located in workflows/simbox/core/controllers/ ) provide a unified interface for motion planning. The base class TemplateController implements all core functionality, with subclasses overriding specific methods for robot-specific configurations.
Template Code Example: python
"""
Template Controller base class for robot motion planning.
Common functionality extracted from FR3, FrankaRobotiq85, Genie1, Lift2, SplitAloha.
Subclasses implement _get_default_ignore_substring() and _configure_joint_indices().
"""
import random
import time
from copy import deepcopy
from typing import List, Optional
import numpy as np
import torch
from core.utils.constants import CUROBO_BATCH_SIZE
from core.utils.plan_utils import (
filter_paths_by_position_error,
filter_paths_by_rotation_error,
sort_by_difference_js,
)
from curobo.cuda_robot_model.cuda_robot_model import CudaRobotModel
from curobo.geom.sdf.world import CollisionCheckerType
from curobo.geom.sphere_fit import SphereFitType
from curobo.geom.types import WorldConfig
from curobo.types.base import TensorDeviceType
from curobo.types.math import Pose
from curobo.types.robot import JointState, RobotConfig
from curobo.util.usd_helper import UsdHelper
from curobo.util_file import get_world_configs_path, join_path, load_yaml
from curobo.wrap.reacher.ik_solver import IKSolver, IKSolverConfig
from curobo.wrap.reacher.motion_gen import (
MotionGen,
MotionGenConfig,
MotionGenPlanConfig,
PoseCostMetric,
)
from omni.isaac.core import World
from omni.isaac.core.controllers import BaseController
from omni.isaac.core.tasks import BaseTask
from omni.isaac.core.utils.prims import get_prim_at_path
from omni.isaac.core.utils.transformations import (
get_relative_transform,
pose_from_tf_matrix,
)
from omni.isaac.core.utils.types import ArticulationAction
class TemplateController(BaseController):
"""Base controller for CuRobo-based motion planning. Supports single and batch planning."""
def __init__(
self,
name: str,
robot_file: str,
task: BaseTask,
world: World,
constrain_grasp_approach: bool = False,
collision_activation_distance: float = 0.03,
ignore_substring: Optional[List[str]] = None,
use_batch: bool = False,
**kwargs,
) -> None:
super().__init__(name=name)
self.name = name
self.world = world
self.task = task
self.robot = self.task.robots[name]
self.ignore_substring = self._get_default_ignore_substring()
if ignore_substring is not None:
self.ignore_substring = ignore_substring
self.ignore_substring.append(name)
self.use_batch = use_batch
self.constrain_grasp_approach = constrain_grasp_approach
self.collision_activation_distance = collision_activation_distance
self.usd_help = UsdHelper()
self.tensor_args = TensorDeviceType()
self.init_curobo = False
self.robot_file = robot_file
self.num_plan_failed = 0
self.raw_js_names = []
self.cmd_js_names = []
self.arm_indices = np.array([])
self.gripper_indices = np.array([])
self.reference_prim_path = None
self.lr_name = None
self._ee_trans = 0.0
self._ee_ori = 0.0
self._gripper_state = 1.0
self._gripper_joint_position = np.array([1.0])
self.idx_list = None
self._configure_joint_indices(robot_file)
self._load_robot(robot_file)
self._load_kin_model()
self._load_world()
self._init_motion_gen()
self.usd_help.load_stage(self.world.stage)
self.cmd_plan = None
self.cmd_idx = 0
self._step_idx = 0
self.num_last_cmd = 0
self.ds_ratio = 1
def _get_default_ignore_substring(self) -> List[str]:
return ["material", "Plane", "conveyor", "scene", "table"]
def _configure_joint_indices(self, robot_file: str) -> None:
raise NotImplementedError
def _load_robot(self, robot_file: str) -> None:
self.robot_cfg = load_yaml(robot_file)["robot_cfg"]
def _load_kin_model(self) -> None:
urdf_file = self.robot_cfg["kinematics"]["urdf_path"]
base_link = self.robot_cfg["kinematics"]["base_link"]
ee_link = self.robot_cfg["kinematics"]["ee_link"]
robot_cfg = RobotConfig.from_basic(urdf_file, base_link, ee_link, self.tensor_args)
self.kin_model = CudaRobotModel(robot_cfg.kinematics)
def _load_world(self, use_default: bool = True) -> None:
if use_default:
self.world_cfg = WorldConfig()
else:
world_cfg_table = WorldConfig.from_dict(
load_yaml(join_path(get_world_configs_path(), "collision_table.yml"))
)
self._world_cfg_table = world_cfg_table
self._world_cfg_table.cuboid[0].pose[2] -= 10.5
world_cfg1 = WorldConfig.from_dict(
load_yaml(join_path(get_world_configs_path(), "collision_table.yml"))
).get_mesh_world()
world_cfg1.mesh[0].name += "_mesh"
world_cfg1.mesh[0].pose[2] = -10.5
self.world_cfg = WorldConfig(cuboid=world_cfg_table.cuboid, mesh=world_cfg1.mesh)
def _get_motion_gen_collision_cache(self):
return {"obb": 700, "mesh": 700}
def _get_grasp_approach_linear_axis(self) -> int:
return 2
def _get_sort_path_weights(self) -> Optional[List[float]]:
return None
def _init_motion_gen(self) -> None:
pose_metric = None
if self.constrain_grasp_approach:
pose_metric = PoseCostMetric.create_grasp_approach_metric(
offset_position=0.1,
linear_axis=self._get_grasp_approach_linear_axis(),
)
if self.use_batch:
self.plan_config = MotionGenPlanConfig(
enable_graph=True,
enable_opt=True,
need_graph_success=True,
enable_graph_attempt=4,
max_attempts=4,
enable_finetune_trajopt=True,
parallel_finetune=True,
time_dilation_factor=1.0,
)
else:
self.plan_config = MotionGenPlanConfig(
enable_graph=False,
enable_graph_attempt=7,
max_attempts=10,
pose_cost_metric=pose_metric,
enable_finetune_trajopt=True,
time_dilation_factor=1.0,
)
motion_gen_config = MotionGenConfig.load_from_robot_config(
self.robot_cfg,
self.world_cfg,
self.tensor_args,
interpolation_dt=0.01,
collision_activation_distance=self.collision_activation_distance,
trajopt_tsteps=32,
collision_checker_type=CollisionCheckerType.MESH,
use_cuda_graph=True,
self_collision_check=True,
collision_cache=self._get_motion_gen_collision_cache(),
num_trajopt_seeds=12,
num_graph_seeds=12,
optimize_dt=True,
trajopt_dt=None,
trim_steps=None,
project_pose_to_goal_frame=False,
)
ik_config = IKSolverConfig.load_from_robot_config(
self.robot_cfg,
self.world_cfg,
rotation_threshold=0.05,
position_threshold=0.005,
num_seeds=20,
self_collision_check=True,
self_collision_opt=True,
tensor_args=self.tensor_args,
use_cuda_graph=True,
collision_checker_type=CollisionCheckerType.MESH,
collision_cache={"obb": 700, "mesh": 700},
)
self.ik_solver = IKSolver(ik_config)
self.motion_gen = MotionGen(motion_gen_config)
print("warming up..")
if self.use_batch:
self.motion_gen.warmup(parallel_finetune=True, batch=CUROBO_BATCH_SIZE)
else:
self.motion_gen.warmup(enable_graph=True, warmup_js_trajopt=False)
self.world_model = self.motion_gen.world_collision
self.motion_gen.clear_world_cache()
self.motion_gen.reset(reset_seed=False)
self.motion_gen.update_world(self.world_cfg)
def update_pose_cost_metric(self, hold_vec_weight: Optional[List[float]] = None) -> None:
if hold_vec_weight:
pose_cost_metric = PoseCostMetric(
hold_partial_pose=True,
hold_vec_weight=self.motion_gen.tensor_args.to_device(hold_vec_weight),
)
else:
pose_cost_metric = None
self.plan_config.pose_cost_metric = pose_cost_metric
def update(self) -> None:
obstacles = self.usd_help.get_obstacles_from_stage(
ignore_substring=self.ignore_substring, reference_prim_path=self.reference_prim_path
).get_collision_check_world()
if self.motion_gen is not None:
self.motion_gen.update_world(obstacles)
self.world_cfg = obstacles
def reset(self, ignore_substring: Optional[str] = None) -> None:
if ignore_substring:
self.ignore_substring = ignore_substring
self.update()
self.init_curobo = True
self.cmd_plan = None
self.cmd_idx = 0
self.num_plan_failed = 0
if self.lr_name == "left":
self._gripper_state = 1.0 if self.robot.left_gripper_state == 1.0 else -1.0
elif self.lr_name == "right":
self._gripper_state = 1.0 if self.robot.right_gripper_state == 1.0 else -1.0
if self.lr_name == "left":
self.robot_ee_path = self.robot.fl_ee_path
self.robot_base_path = self.robot.fl_base_path
else:
self.robot_ee_path = self.robot.fr_ee_path
self.robot_base_path = self.robot.fr_base_path
self.T_base_ee_init = get_relative_transform(
get_prim_at_path(self.robot_ee_path), get_prim_at_path(self.robot_base_path)
)
self.T_world_base_init = get_relative_transform(
get_prim_at_path(self.robot_base_path), get_prim_at_path(self.task.root_prim_path)
)
self.T_world_ee_init = self.T_world_base_init @ self.T_base_ee_init
self._ee_trans, self._ee_ori = self.get_ee_pose()
self._ee_trans = self.tensor_args.to_device(self._ee_trans)
self._ee_ori = self.tensor_args.to_device(self._ee_ori)
self.update_pose_cost_metric()
def update_specific(self, ignore_substring, reference_prim_path):
obstacles = self.usd_help.get_obstacles_from_stage(
ignore_substring=ignore_substring, reference_prim_path=reference_prim_path
).get_collision_check_world()
if self.motion_gen is not None:
self.motion_gen.update_world(obstacles)
self.world_cfg = obstacles
def plan(self, ee_translation_goal, ee_orientation_goal, sim_js: JointState, js_names: list):
if self.use_batch:
ik_goal = Pose(
position=self.tensor_args.to_device(ee_translation_goal.unsqueeze(0).expand(CUROBO_BATCH_SIZE, -1)),
quaternion=self.tensor_args.to_device(ee_orientation_goal.unsqueeze(0).expand(CUROBO_BATCH_SIZE, -1)),
batch=CUROBO_BATCH_SIZE,
)
cu_js = JointState(
position=self.tensor_args.to_device(np.tile((sim_js.positions)[np.newaxis, :], (CUROBO_BATCH_SIZE, 1))),
velocity=self.tensor_args.to_device(np.tile((sim_js.positions)[np.newaxis, :], (CUROBO_BATCH_SIZE, 1)))
* 0.0,
acceleration=self.tensor_args.to_device(
np.tile((sim_js.positions)[np.newaxis, :], (CUROBO_BATCH_SIZE, 1))
)
* 0.0,
jerk=self.tensor_args.to_device(np.tile((sim_js.positions)[np.newaxis, :], (CUROBO_BATCH_SIZE, 1)))
* 0.0,
joint_names=js_names,
)
cu_js = cu_js.get_ordered_joint_state(self.cmd_js_names)
return self.motion_gen.plan_batch(cu_js, ik_goal, self.plan_config.clone())
ik_goal = Pose(
position=self.tensor_args.to_device(ee_translation_goal),
quaternion=self.tensor_args.to_device(ee_orientation_goal),
)
cu_js = JointState(
position=self.tensor_args.to_device(sim_js.positions),
velocity=self.tensor_args.to_device(sim_js.velocities) * 0.0,
acceleration=self.tensor_args.to_device(sim_js.velocities) * 0.0,
jerk=self.tensor_args.to_device(sim_js.velocities) * 0.0,
joint_names=js_names,
)
cu_js = cu_js.get_ordered_joint_state(self.cmd_js_names)
return self.motion_gen.plan_single(cu_js.unsqueeze(0), ik_goal, self.plan_config.clone())
def forward(self, manip_cmd, eps=5e-3):
ee_trans, ee_ori = manip_cmd[0:2]
gripper_fn = manip_cmd[2]
params = manip_cmd[3]
assert hasattr(self, gripper_fn)
method = getattr(self, gripper_fn)
if gripper_fn in ["in_plane_rotation", "mobile_move", "dummy_forward"]:
return method(**params)
elif gripper_fn in ["update_pose_cost_metric", "update_specific"]:
method(**params)
return self.ee_forward(ee_trans, ee_ori, eps=eps, skip_plan=True)
else:
method(**params)
return self.ee_forward(ee_trans, ee_ori, eps)
def ee_forward(
self,
ee_trans: torch.Tensor | np.ndarray,
ee_ori: torch.Tensor | np.ndarray,
eps=1e-4,
skip_plan=False,
):
ee_trans = self.tensor_args.to_device(ee_trans)
ee_ori = self.tensor_args.to_device(ee_ori)
sim_js = self.robot.get_joints_state()
js_names = self.robot.dof_names
plan_flag = torch.logical_or(
torch.norm(self._ee_trans - ee_trans) > eps,
torch.norm(self._ee_ori - ee_ori) > eps,
)
if not skip_plan:
if plan_flag:
self.cmd_idx = 0
self._step_idx = 0
self.num_last_cmd = 0
result = self.plan(ee_trans, ee_ori, sim_js, js_names)
if self.use_batch:
if result.success.any():
self._ee_trans = ee_trans
self._ee_ori = ee_ori
paths = result.get_successful_paths()
position_filter_res = filter_paths_by_position_error(
paths, result.position_error[result.success]
)
rotation_filter_res = filter_paths_by_rotation_error(
paths, result.rotation_error[result.success]
)
filtered_paths = [
p for i, p in enumerate(paths) if position_filter_res[i] and rotation_filter_res[i]
]
if len(filtered_paths) == 0:
filtered_paths = paths
sort_weights = self._get_sort_path_weights()
weights_arg = self.tensor_args.to_device(sort_weights) if sort_weights is not None else None
sorted_indices = sort_by_difference_js(filtered_paths, weights=weights_arg)
cmd_plan = self.motion_gen.get_full_js(paths[sorted_indices[0]])
self.idx_list = list(range(len(self.raw_js_names)))
self.cmd_plan = cmd_plan.get_ordered_joint_state(self.raw_js_names)
self.num_plan_failed = 0
else:
print("Plan did not converge to a solution.")
self.num_plan_failed += 1
else:
succ = result.success.item()
if succ:
self._ee_trans = ee_trans
self._ee_ori = ee_ori
cmd_plan = result.get_interpolated_plan()
self.idx_list = list(range(len(self.raw_js_names)))
self.cmd_plan = cmd_plan.get_ordered_joint_state(self.raw_js_names)
self.num_plan_failed = 0
else:
print("Plan did not converge to a solution.")
self.num_plan_failed += 1
if self.cmd_plan and self._step_idx % 1 == 0:
cmd_state = self.cmd_plan[self.cmd_idx]
art_action = ArticulationAction(
cmd_state.position.cpu().numpy(),
cmd_state.velocity.cpu().numpy() * 0.0,
joint_indices=self.idx_list,
)
self.cmd_idx += self.ds_ratio
if self.cmd_idx >= len(self.cmd_plan):
self.cmd_idx = 0
self.cmd_plan = None
else:
self.num_last_cmd += 1
art_action = ArticulationAction(joint_positions=sim_js.positions[self.arm_indices])
else:
art_action = ArticulationAction(joint_positions=sim_js.positions[self.arm_indices])
self._step_idx += 1
arm_action = art_action.joint_positions
gripper_action = self.get_gripper_action()
joint_positions = np.concatenate([arm_action, gripper_action])
self._action = {
"joint_positions": joint_positions,
"joint_indices": np.concatenate([self.arm_indices, self.gripper_indices]),
"lr_name": self.lr_name,
"arm_action": arm_action,
"gripper_action": gripper_action,
}
return self._action
def get_gripper_action(self):
return np.clip(self._gripper_state * self._gripper_joint_position, 0.0, 0.04)
def get_ee_pose(self):
sim_js = self.robot.get_joints_state()
q_state = torch.tensor(sim_js.positions[self.arm_indices], **self.tensor_args.as_torch_dict()).reshape(1, -1)
ee_pose = self.kin_model.get_state(q_state)
return ee_pose.ee_position[0].cpu().numpy(), ee_pose.ee_quaternion[0].cpu().numpy()
def get_armbase_pose(self):
armbase_pose = get_relative_transform(
get_prim_at_path(self.robot_base_path), get_prim_at_path(self.task.root_prim_path)
)
return pose_from_tf_matrix(armbase_pose)
def forward_kinematic(self, q_state: np.ndarray):
q_state = q_state.reshape(1, -1)
q_state = self.tensor_args.to_device(q_state)
out = self.kin_model.get_state(q_state)
return out.ee_position[0].cpu().numpy(), out.ee_quaternion[0].cpu().numpy()
def close_gripper(self):
self._gripper_state = -1.0
def open_gripper(self):
self._gripper_state = 1.0
def attach_obj(self, obj_prim_path: str, link_name="attached_object"):
sim_js = self.robot.get_joints_state()
js_names = self.robot.dof_names
cu_js = JointState(
position=self.tensor_args.to_device(sim_js.positions),
velocity=self.tensor_args.to_device(sim_js.velocities) * 0.0,
acceleration=self.tensor_args.to_device(sim_js.velocities) * 0.0,
jerk=self.tensor_args.to_device(sim_js.velocities) * 0.0,
joint_names=js_names,
)
self.motion_gen.attach_objects_to_robot(
cu_js,
[obj_prim_path],
link_name=link_name,
sphere_fit_type=SphereFitType.VOXEL_VOLUME_SAMPLE_SURFACE,
world_objects_pose_offset=Pose.from_list([0, 0, 0.01, 1, 0, 0, 0], self.tensor_args),
)
def detach_obj(self):
self.motion_gen.detach_object_from_robot()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456
init(self, name, robot_file, task, world, constrain_grasp_approach, collision_activation_distance, ignore_substring, use_batch, **kwargs)
Initialize the controller, load robot configuration, and set up CuRobo motion generator.
Parameters:
- **name **( str ): Controller name, matching the robot name in task.
- **robot_file **( str ): Path to CuRobo robot YAML configuration file.
- **task **( BaseTask ): The task instance containing robots and scene.
- **world **( World ): Isaac Sim world instance.
- **constrain_grasp_approach **( bool , optional): Whether to add grasp approach constraint. Default is
False. - **collision_activation_distance **( float , optional): Distance threshold for collision activation. Default is
0.03. - **ignore_substring **( List[str] , optional): Substrings for objects to ignore in collision checking.
- **use_batch **( bool , optional): Enable batch planning mode for multiple goals. Default is
False. - ** **kwargs **: Additional keyword arguments.
reset(self, ignore_substring=None)
Reset controller state, update collision world, and initialize end-effector pose.
Parameters:
- **ignore_substring **( List[str] , optional): New collision filter substrings to use.
update_pose_cost_metric(self, hold_vec_weight=None)
Update the pose cost metric for constrained motion planning. This method is used to hold specific orientations or positions during robot motion, which is useful for tasks like keeping a container upright while moving.
Parameters:
- **hold_vec_weight **( List[float] , optional): A 6-element weight vector
[angular-x, angular-y, angular-z, linear-x, linear-y, linear-z]that controls constraint costs. Defaults toNone, which corresponds to[0, 0, 0, 0, 0, 0](no constraints). For example,[1, 1, 1, 0, 0, 0]holds the tool orientation fixed during motion.
Reference:
update_specific(self, ignore_substring, reference_prim_path)
Update collision world with specific ignore substrings and reference prim path. Used for fine-grained collision filtering during skill execution.
Parameters:
- **ignore_substring **( List[str] ): List of substrings for objects to ignore in collision checking.
- **reference_prim_path **( str ): Reference prim path for relative transform calculations.
plan(self, ee_translation_goal, ee_orientation_goal, sim_js, js_names)
Generate a collision-free trajectory from current joint state to target end-effector pose.
Parameters:
- **ee_translation_goal **( torch.Tensor | np.ndarray ): Target end-effector position
[x, y, z]. - **ee_orientation_goal **( torch.Tensor | np.ndarray ): Target end-effector orientation as quaternion
[w, x, y, z]. - **sim_js **( JointState ): Current joint state from simulation.
- **js_names **( list ): List of joint names in simulation order.
Returns:
- **MotionGenResult **: CuRobo planning result containing success status, trajectory, and errors.
forward(self, manip_cmd, eps=5e-3)
Execute a manipulation command. This is the main entry point called by skills to execute motions.
Parameters:
-
**manip_cmd **( tuple ): Command tuple
(ee_trans, ee_ori, gripper_fn, params): -
ee_trans: Target end-effector translation. -
ee_ori: Target end-effector orientation. -
gripper_fn: Name of gripper function to call (e.g.,"open_gripper","close_gripper"). -
params: Parameters for the gripper function. -
**eps **( float , optional): Position/orientation threshold to trigger new planning. Default is
5e-3.
Returns:
- **dict **: Action dictionary containing:
joint_positions: Full joint positions including gripper.joint_indices: Indices of joints to command.lr_name: Left/right arm identifier.arm_action: Arm joint positions only.gripper_action: Gripper joint positions only.
Example: Lift2Controller
The Lift2Controller demonstrates how to create a robot-specific controller by overriding key methods from TemplateController . This controller manages the ARX Lift-2 dual-arm robot with R5a arms.
python
"""Lift2 mobile manipulator controller – template-based."""
import numpy as np
from core.controllers.base_controller import register_controller
from core.controllers.template_controller import TemplateController
@register_controller
class Lift2Controller(TemplateController):
def _get_default_ignore_substring(self):
return ["material", "Plane", "conveyor", "scene", "table", "fluid"]
def _configure_joint_indices(self, robot_file: str) -> None:
self.raw_js_names = ["joint1", "joint2", "joint3", "joint4", "joint5", "joint6"]
if "left" in robot_file:
self.cmd_js_names = ["fl_joint1", "fl_joint2", "fl_joint3", "fl_joint4", "fl_joint5", "fl_joint6"]
self.arm_indices = np.array([10, 12, 14, 16, 18, 20])
self.gripper_indices = np.array([23])
self.reference_prim_path = self.task.robots[self.name].fl_base_path
self.lr_name = "left"
self._gripper_state = 1.0 if self.robot.left_gripper_state == 1.0 else -1.0
elif "right" in robot_file:
self.cmd_js_names = ["fr_joint1", "fr_joint2", "fr_joint3", "fr_joint4", "fr_joint5", "fr_joint6"]
self.arm_indices = np.array([9, 11, 13, 15, 17, 19])
self.gripper_indices = np.array([21])
self.reference_prim_path = self.task.robots[self.name].fr_base_path
self.lr_name = "right"
self._gripper_state = 1.0 if self.robot.right_gripper_state == 1.0 else -1.0
else:
raise NotImplementedError("robot_file must contain 'left' or 'right'")
self._gripper_joint_position = np.array([1.0])
def _get_grasp_approach_linear_axis(self) -> int:
"""Lift2 uses x-axis (0) for grasp approach."""
return 0
def get_gripper_action(self):
return np.clip(self._gripper_state * self._gripper_joint_position, 0.0, 0.1)
def forward(self, manip_cmd, eps=5e-3):
ee_trans, ee_ori = manip_cmd[0:2]
gripper_fn = manip_cmd[2]
params = manip_cmd[3]
gripper_vel = manip_cmd[4] if len(manip_cmd) > 4 else None
assert hasattr(self, gripper_fn)
method = getattr(self, gripper_fn)
if gripper_fn in ["in_plane_rotation", "mobile_move", "dummy_forward", "joint_ctrl"]:
return method(**params)
elif gripper_fn in ["update_pose_cost_metric", "update_specific"]:
method(**params)
return self.ee_forward(ee_trans, ee_ori, eps=eps, gripper_vel=gripper_vel, skip_plan=True)
else:
method(**params)
return self.ee_forward(ee_trans, ee_ori, eps=eps, gripper_vel=gripper_vel)
def ee_forward(
self,
ee_trans,
ee_ori,
eps=1e-4,
skip_plan=False,
gripper_vel=None,
):
super().ee_forward(ee_trans, ee_ori, eps=eps, skip_plan=skip_plan)
self._action["gripper_vel"] = gripper_vel
return self._action
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66
Overridden Methods
The following four methods must be overridden to create a robot-specific controller:
_get_default_ignore_substring(self)
Return a list of substrings for objects to ignore during collision checking. These typically include visual-only objects, scene elements, and robot-specific parts.
Returns:
- **List[str] **: List of substrings to filter out collision objects. For Lift2, this includes
"material","Plane","conveyor","scene","table", and"fluid".
_configure_joint_indices(self, robot_file)
Configure the mapping between CuRobo planner joint names and simulation joint names. This is critical for translating planned trajectories into simulation commands.
Parameters:
- **robot_file **( str ): Path to the CuRobo robot configuration file. Used to determine left/right arm for dual-arm robots.
Sets the following attributes:
- **raw_js_names **( List[str] ): Joint names in CuRobo planner order.
- **cmd_js_names **( List[str] ): Joint names in simulation order.
- **arm_indices **( np.ndarray ): Indices of arm joints in simulation.
- **gripper_indices **( np.ndarray ): Indices of gripper joints in simulation.
- **reference_prim_path **( str ): Path to robot base prim for collision reference.
- **lr_name **( str ): "left" or "right" for dual-arm robots.
_get_grasp_approach_linear_axis(self)
Return the axis used for constrained grasp approach motion. This defines which axis the end-effector should align with during approach.
Returns:
- **int **: Axis index (0=x, 1=y, 2=z). Default in
TemplateControlleris 2 (z-axis). Lift2 uses 0 (x-axis) due to its gripper orientation.
get_gripper_action(self)
Convert internal gripper state to actual gripper joint position command. Different robots have different gripper mechanisms and ranges.
Returns:
- **np.ndarray **: Gripper joint position(s). The value is clipped to the robot's valid gripper range. For Lift2, the range is
[0.0, 0.1].