diff --git a/rm_move_usd.py b/rm_move_usd.py new file mode 100644 index 0000000..f625f84 --- /dev/null +++ b/rm_move_usd.py @@ -0,0 +1,287 @@ +import omni.kit.app +import omni.usd +import omni.timeline +import asyncio +import math +from pxr import Gf, UsdPhysics + +class DirectRM75Controller: + def __init__(self, robot_path="/World/RM75_6F"): + """直接通过USD接口控制RM75机器人""" + self.robot_path = robot_path + self.stage = None + self.joint_paths = [] + self.joint_prims = [] + self.joint_names = [] + + # 预定义姿势 (角度制) + self.poses = { + "零位": [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0], + "直立姿势": [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0], + "弯曲姿势": [0.0, 45.0, 30.0, 40.0, 0.0, 0.0, 0.0], + "伸展姿势": [30.0, -30.0, -45.0, 0.0, 20.0, 0.0, 0.0], + "工作姿势": [0.0, 60.0, -90.0, 60.0, 0.0, 30.0, 0.0], + } + + # 将角度制转换为弧度制 + for name, angles in self.poses.items(): + self.poses[name] = [math.radians(angle) for angle in angles] + + # 控制设置 + self.transition_steps = 60 + self.frames_per_step = 2 + + async def initialize(self): + """初始化控制器""" + try: + # 获取阶段 + self.stage = omni.usd.get_context().get_stage() + if not self.stage: + print("错误: 无法获取USD阶段") + return False + + # 检查机器人是否存在 + if not self.stage.GetPrimAtPath(self.robot_path): + print(f"错误: 找不到机器人路径 {self.robot_path}") + return False + + # 查找机器人关节 + if not await self.find_joints(): + return False + + # 确保物理模拟运行 + timeline = omni.timeline.get_timeline_interface() + if not timeline.is_playing(): + timeline.play() + + # 等待几帧以确保物理初始化 + for _ in range(10): + await self._wait_for_update() + + return True + except Exception as e: + print(f"初始化时出错: {e}") + import traceback + traceback.print_exc() + return False + + async def find_joints(self): + """查找机器人关节""" + try: + self.joint_paths = [] + self.joint_prims = [] + self.joint_names = [] + + # 根据截图中的结构,直接查找关节 + # 您的结构显示关节位于不同层级 + joint_locations = [ + # 从截图看,joint_1应该是root_joint + (f"{self.robot_path}/base_link/root_joint", "root_joint"), + (f"{self.robot_path}/Link_1/joint_2", "joint_2"), + (f"{self.robot_path}/Link_2/joint_3", "joint_3"), + # 其他关节路径需要检查 + (f"{self.robot_path}/Link_3", "Link_3"), + (f"{self.robot_path}/Link_4", "Link_4"), + (f"{self.robot_path}/Link_5", "Link_5"), + (f"{self.robot_path}/Link_6", "Link_6"), + (f"{self.robot_path}/Link7", "Link7") + ] + + # 查找关节 + for path, name in joint_locations: + if self.stage.GetPrimAtPath(path): + prim = self.stage.GetPrimAtPath(path) + + # 检查是否是PhysicsRevolute类型或具有驱动属性 + if "joint" in name.lower() and prim.GetAttribute("drive:angular:physics:targetPosition"): + self.joint_paths.append(path) + self.joint_prims.append(prim) + self.joint_names.append(name) + print(f"找到关节: {path} ({name})") + elif "Link" in name: + # 查找该Link下的joint + for child in prim.GetChildren(): + child_name = child.GetName() + if "joint" in child_name.lower() and child.GetAttribute("drive:angular:physics:targetPosition"): + child_path = str(child.GetPath()) + self.joint_paths.append(child_path) + self.joint_prims.append(child) + self.joint_names.append(child_name) + print(f"找到关节: {child_path} ({child_name})") + + if not self.joint_paths: + # 如果上面的方法失败,尝试全局搜索 + print("使用全局搜索查找关节...") + for prim in self.stage.Traverse(): + path = str(prim.GetPath()) + name = prim.GetName() + + if self.robot_path in path and "joint" in name.lower(): + # 检查是否是关节类型 + if prim.GetAttribute("drive:angular:physics:targetPosition"): + self.joint_paths.append(path) + self.joint_prims.append(prim) + self.joint_names.append(name) + print(f"找到关节: {path} ({name})") + + if len(self.joint_paths) == 0: + print("错误: 找不到任何关节") + return False + + print(f"总共找到 {len(self.joint_paths)} 个关节") + return True + except Exception as e: + print(f"查找关节时出错: {e}") + import traceback + traceback.print_exc() + return False + + async def _wait_for_update(self): + """等待帧更新""" + await omni.kit.app.get_app().next_update_async() + + async def get_joint_positions(self): + """获取当前关节位置""" + positions = [] + + for i, prim in enumerate(self.joint_prims): + try: + # 读取目标位置 + target_attr = prim.GetAttribute("drive:angular:physics:targetPosition") + if target_attr and target_attr.IsValid(): + pos = target_attr.Get() + positions.append(pos) + else: + # 读取当前物理位置 + state_attr = prim.GetAttribute("state:angular:physics:position") + if state_attr and state_attr.IsValid(): + pos = state_attr.Get() + positions.append(pos) + else: + # 默认位置 + positions.append(0.0) + print(f"警告: 无法获取关节 {self.joint_names[i]} 位置,使用默认值 0") + except Exception as e: + print(f"获取关节 {self.joint_names[i]} 位置时出错: {e}") + positions.append(0.0) + + return positions + + async def set_joint_positions(self, target_positions): + """设置关节位置""" + if len(target_positions) < len(self.joint_prims): + # 如果目标位置不足,使用默认值补充 + target_positions = list(target_positions) + [0.0] * (len(self.joint_prims) - len(target_positions)) + + # 获取关节限制 + joint_limits = [] + for prim in self.joint_prims: + try: + lower_attr = prim.GetAttribute("physics:lowerLimit") + upper_attr = prim.GetAttribute("physics:upperLimit") + + lower = -math.pi if not lower_attr or not lower_attr.IsValid() else lower_attr.Get() + upper = math.pi if not upper_attr or not upper_attr.IsValid() else upper_attr.Get() + + joint_limits.append((lower, upper)) + except: + joint_limits.append((-math.pi, math.pi)) + + # 设置关节位置 + for i, (prim, pos, (lower, upper)) in enumerate(zip(self.joint_prims, target_positions, joint_limits)): + try: + # 确保位置在限制范围内 + constrained_pos = max(lower, min(pos, upper)) + if constrained_pos != pos: + print(f"警告: 关节 {self.joint_names[i]} 位置 {math.degrees(pos):.1f}° 超出限制 [{math.degrees(lower):.1f}°, {math.degrees(upper):.1f}°],已调整") + + # 设置目标位置 + target_attr = prim.GetAttribute("drive:angular:physics:targetPosition") + if target_attr and target_attr.IsValid(): + current = target_attr.Get() + target_attr.Set(constrained_pos) + print(f"设置关节 {self.joint_names[i]} 位置: {math.degrees(constrained_pos):.1f}° (当前: {math.degrees(current):.1f}°)") + else: + print(f"警告: 无法设置关节 {self.joint_names[i]} 位置") + except Exception as e: + print(f"设置关节 {self.joint_names[i]} 位置时出错: {e}") + + async def move_to_pose(self, target_angles): + """平滑移动到目标姿势""" + try: + # 获取当前位置 + current_angles = await self.get_joint_positions() + + print(f"当前位置: [{', '.join([f'{math.degrees(a):.1f}°' for a in current_angles])}]") + print(f"目标位置: [{', '.join([f'{math.degrees(a):.1f}°' for a in target_angles])}]") + + print("开始平滑移动...") + + # 平滑过渡 + for step in range(1, self.transition_steps + 1): + fraction = step / self.transition_steps + interpolated_positions = [] + + for i, (current, target) in enumerate(zip(current_angles, target_angles)): + # 如果索引超出范围,使用默认值 + if i >= len(target_angles): + pos = current + else: + pos = current + fraction * (target - current) + + interpolated_positions.append(pos) + + # 设置插值位置 + await self.set_joint_positions(interpolated_positions) + + # 等待几帧 + for _ in range(self.frames_per_step): + await self._wait_for_update() + + print("移动完成") + return True + except Exception as e: + print(f"移动到姿势时出错: {e}") + import traceback + traceback.print_exc() + return False + + async def run_demo(self): + """运行演示程序""" + if not await self.initialize(): + print("初始化失败,无法运行演示") + return + + print("\n开始RM75机器人姿势演示...") + + # 保存初始位置 + initial_angles = await self.get_joint_positions() + + # 依次展示预定义姿势 + for pose_name, angles in self.poses.items(): + print(f"\n正在移动到 {pose_name}...") + success = await self.move_to_pose(angles) + + if success: + print(f"已达到 {pose_name}!") + else: + print(f"移动到 {pose_name} 失败") + + # 在此姿势停留一段时间 + print(f"停留在 {pose_name} 2秒...") + for _ in range(20): + await self._wait_for_update() + + # 返回初始姿势 + print("\n返回初始姿势...") + await self.move_to_pose(initial_angles) + + print("\n演示完成!") + +# 主函数 +async def main(): + controller = DirectRM75Controller("/World/RM75_6F") + await controller.run_demo() + +# 执行主函数 +asyncio.ensure_future(main()) \ No newline at end of file