import omni.kit.app import omni.usd import omni.timeline import asyncio import math from pxr import Gf, UsdPhysics class DirectRM75Controller: def __init__(self, robot_path="/World/RM75_6F"): """直接通过USD接口控制RM75机器人""" self.robot_path = robot_path self.stage = None self.joint_paths = [] self.joint_prims = [] self.joint_names = [] # 预定义姿势 (角度制) self.poses = { "零位": [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0], "直立姿势": [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0], "弯曲姿势": [0.0, 45.0, 30.0, 40.0, 0.0, 0.0, 0.0], "伸展姿势": [30.0, -30.0, -45.0, 0.0, 20.0, 0.0, 0.0], "工作姿势": [0.0, 60.0, -90.0, 60.0, 0.0, 30.0, 0.0], } # 将角度制转换为弧度制 for name, angles in self.poses.items(): self.poses[name] = [math.radians(angle) for angle in angles] # 控制设置 self.transition_steps = 60 self.frames_per_step = 2 async def initialize(self): """初始化控制器""" try: # 获取阶段 self.stage = omni.usd.get_context().get_stage() if not self.stage: print("错误: 无法获取USD阶段") return False # 检查机器人是否存在 if not self.stage.GetPrimAtPath(self.robot_path): print(f"错误: 找不到机器人路径 {self.robot_path}") return False # 查找机器人关节 if not await self.find_joints(): return False # 确保物理模拟运行 timeline = omni.timeline.get_timeline_interface() if not timeline.is_playing(): timeline.play() # 等待几帧以确保物理初始化 for _ in range(10): await self._wait_for_update() return True except Exception as e: print(f"初始化时出错: {e}") import traceback traceback.print_exc() return False async def find_joints(self): """查找机器人关节""" try: self.joint_paths = [] self.joint_prims = [] self.joint_names = [] # 根据截图中的结构,直接查找关节 # 您的结构显示关节位于不同层级 joint_locations = [ # 从截图看,joint_1应该是root_joint (f"{self.robot_path}/base_link/root_joint", "root_joint"), (f"{self.robot_path}/Link_1/joint_2", "joint_2"), (f"{self.robot_path}/Link_2/joint_3", "joint_3"), # 其他关节路径需要检查 (f"{self.robot_path}/Link_3", "Link_3"), (f"{self.robot_path}/Link_4", "Link_4"), (f"{self.robot_path}/Link_5", "Link_5"), (f"{self.robot_path}/Link_6", "Link_6"), (f"{self.robot_path}/Link7", "Link7") ] # 查找关节 for path, name in joint_locations: if self.stage.GetPrimAtPath(path): prim = self.stage.GetPrimAtPath(path) # 检查是否是PhysicsRevolute类型或具有驱动属性 if "joint" in name.lower() and prim.GetAttribute("drive:angular:physics:targetPosition"): self.joint_paths.append(path) self.joint_prims.append(prim) self.joint_names.append(name) print(f"找到关节: {path} ({name})") elif "Link" in name: # 查找该Link下的joint for child in prim.GetChildren(): child_name = child.GetName() if "joint" in child_name.lower() and child.GetAttribute("drive:angular:physics:targetPosition"): child_path = str(child.GetPath()) self.joint_paths.append(child_path) self.joint_prims.append(child) self.joint_names.append(child_name) print(f"找到关节: {child_path} ({child_name})") if not self.joint_paths: # 如果上面的方法失败,尝试全局搜索 print("使用全局搜索查找关节...") for prim in self.stage.Traverse(): path = str(prim.GetPath()) name = prim.GetName() if self.robot_path in path and "joint" in name.lower(): # 检查是否是关节类型 if prim.GetAttribute("drive:angular:physics:targetPosition"): self.joint_paths.append(path) self.joint_prims.append(prim) self.joint_names.append(name) print(f"找到关节: {path} ({name})") if len(self.joint_paths) == 0: print("错误: 找不到任何关节") return False print(f"总共找到 {len(self.joint_paths)} 个关节") return True except Exception as e: print(f"查找关节时出错: {e}") import traceback traceback.print_exc() return False async def _wait_for_update(self): """等待帧更新""" await omni.kit.app.get_app().next_update_async() async def get_joint_positions(self): """获取当前关节位置""" positions = [] for i, prim in enumerate(self.joint_prims): try: # 读取目标位置 target_attr = prim.GetAttribute("drive:angular:physics:targetPosition") if target_attr and target_attr.IsValid(): pos = target_attr.Get() positions.append(pos) else: # 读取当前物理位置 state_attr = prim.GetAttribute("state:angular:physics:position") if state_attr and state_attr.IsValid(): pos = state_attr.Get() positions.append(pos) else: # 默认位置 positions.append(0.0) print(f"警告: 无法获取关节 {self.joint_names[i]} 位置,使用默认值 0") except Exception as e: print(f"获取关节 {self.joint_names[i]} 位置时出错: {e}") positions.append(0.0) return positions async def set_joint_positions(self, target_positions): """设置关节位置""" if len(target_positions) < len(self.joint_prims): # 如果目标位置不足,使用默认值补充 target_positions = list(target_positions) + [0.0] * (len(self.joint_prims) - len(target_positions)) # 获取关节限制 joint_limits = [] for prim in self.joint_prims: try: lower_attr = prim.GetAttribute("physics:lowerLimit") upper_attr = prim.GetAttribute("physics:upperLimit") lower = -math.pi if not lower_attr or not lower_attr.IsValid() else lower_attr.Get() upper = math.pi if not upper_attr or not upper_attr.IsValid() else upper_attr.Get() joint_limits.append((lower, upper)) except: joint_limits.append((-math.pi, math.pi)) # 设置关节位置 for i, (prim, pos, (lower, upper)) in enumerate(zip(self.joint_prims, target_positions, joint_limits)): try: # 确保位置在限制范围内 constrained_pos = max(lower, min(pos, upper)) if constrained_pos != pos: print(f"警告: 关节 {self.joint_names[i]} 位置 {math.degrees(pos):.1f}° 超出限制 [{math.degrees(lower):.1f}°, {math.degrees(upper):.1f}°],已调整") # 设置目标位置 target_attr = prim.GetAttribute("drive:angular:physics:targetPosition") if target_attr and target_attr.IsValid(): current = target_attr.Get() target_attr.Set(constrained_pos) print(f"设置关节 {self.joint_names[i]} 位置: {math.degrees(constrained_pos):.1f}° (当前: {math.degrees(current):.1f}°)") else: print(f"警告: 无法设置关节 {self.joint_names[i]} 位置") except Exception as e: print(f"设置关节 {self.joint_names[i]} 位置时出错: {e}") async def move_to_pose(self, target_angles): """平滑移动到目标姿势""" try: # 获取当前位置 current_angles = await self.get_joint_positions() print(f"当前位置: [{', '.join([f'{math.degrees(a):.1f}°' for a in current_angles])}]") print(f"目标位置: [{', '.join([f'{math.degrees(a):.1f}°' for a in target_angles])}]") print("开始平滑移动...") # 平滑过渡 for step in range(1, self.transition_steps + 1): fraction = step / self.transition_steps interpolated_positions = [] for i, (current, target) in enumerate(zip(current_angles, target_angles)): # 如果索引超出范围,使用默认值 if i >= len(target_angles): pos = current else: pos = current + fraction * (target - current) interpolated_positions.append(pos) # 设置插值位置 await self.set_joint_positions(interpolated_positions) # 等待几帧 for _ in range(self.frames_per_step): await self._wait_for_update() print("移动完成") return True except Exception as e: print(f"移动到姿势时出错: {e}") import traceback traceback.print_exc() return False async def run_demo(self): """运行演示程序""" if not await self.initialize(): print("初始化失败,无法运行演示") return print("\n开始RM75机器人姿势演示...") # 保存初始位置 initial_angles = await self.get_joint_positions() # 依次展示预定义姿势 for pose_name, angles in self.poses.items(): print(f"\n正在移动到 {pose_name}...") success = await self.move_to_pose(angles) if success: print(f"已达到 {pose_name}!") else: print(f"移动到 {pose_name} 失败") # 在此姿势停留一段时间 print(f"停留在 {pose_name} 2秒...") for _ in range(20): await self._wait_for_update() # 返回初始姿势 print("\n返回初始姿势...") await self.move_to_pose(initial_angles) print("\n演示完成!") # 主函数 async def main(): controller = DirectRM75Controller("/World/RM75_6F") await controller.run_demo() # 执行主函数 asyncio.ensure_future(main())