Files
test-xh/rm_move_usd.py
2025-08-11 13:03:26 +08:00

287 lines
12 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import omni.kit.app
import omni.usd
import omni.timeline
import asyncio
import math
from pxr import Gf, UsdPhysics
class DirectRM75Controller:
def __init__(self, robot_path="/World/RM75_6F"):
"""直接通过USD接口控制RM75机器人"""
self.robot_path = robot_path
self.stage = None
self.joint_paths = []
self.joint_prims = []
self.joint_names = []
# 预定义姿势 (角度制)
self.poses = {
"零位": [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0],
"直立姿势": [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0],
"弯曲姿势": [0.0, 45.0, 30.0, 40.0, 0.0, 0.0, 0.0],
"伸展姿势": [30.0, -30.0, -45.0, 0.0, 20.0, 0.0, 0.0],
"工作姿势": [0.0, 60.0, -90.0, 60.0, 0.0, 30.0, 0.0],
}
# 将角度制转换为弧度制
for name, angles in self.poses.items():
self.poses[name] = [math.radians(angle) for angle in angles]
# 控制设置
self.transition_steps = 60
self.frames_per_step = 2
async def initialize(self):
"""初始化控制器"""
try:
# 获取阶段
self.stage = omni.usd.get_context().get_stage()
if not self.stage:
print("错误: 无法获取USD阶段")
return False
# 检查机器人是否存在
if not self.stage.GetPrimAtPath(self.robot_path):
print(f"错误: 找不到机器人路径 {self.robot_path}")
return False
# 查找机器人关节
if not await self.find_joints():
return False
# 确保物理模拟运行
timeline = omni.timeline.get_timeline_interface()
if not timeline.is_playing():
timeline.play()
# 等待几帧以确保物理初始化
for _ in range(10):
await self._wait_for_update()
return True
except Exception as e:
print(f"初始化时出错: {e}")
import traceback
traceback.print_exc()
return False
async def find_joints(self):
"""查找机器人关节"""
try:
self.joint_paths = []
self.joint_prims = []
self.joint_names = []
# 根据截图中的结构,直接查找关节
# 您的结构显示关节位于不同层级
joint_locations = [
# 从截图看joint_1应该是root_joint
(f"{self.robot_path}/base_link/root_joint", "root_joint"),
(f"{self.robot_path}/Link_1/joint_2", "joint_2"),
(f"{self.robot_path}/Link_2/joint_3", "joint_3"),
# 其他关节路径需要检查
(f"{self.robot_path}/Link_3", "Link_3"),
(f"{self.robot_path}/Link_4", "Link_4"),
(f"{self.robot_path}/Link_5", "Link_5"),
(f"{self.robot_path}/Link_6", "Link_6"),
(f"{self.robot_path}/Link7", "Link7")
]
# 查找关节
for path, name in joint_locations:
if self.stage.GetPrimAtPath(path):
prim = self.stage.GetPrimAtPath(path)
# 检查是否是PhysicsRevolute类型或具有驱动属性
if "joint" in name.lower() and prim.GetAttribute("drive:angular:physics:targetPosition"):
self.joint_paths.append(path)
self.joint_prims.append(prim)
self.joint_names.append(name)
print(f"找到关节: {path} ({name})")
elif "Link" in name:
# 查找该Link下的joint
for child in prim.GetChildren():
child_name = child.GetName()
if "joint" in child_name.lower() and child.GetAttribute("drive:angular:physics:targetPosition"):
child_path = str(child.GetPath())
self.joint_paths.append(child_path)
self.joint_prims.append(child)
self.joint_names.append(child_name)
print(f"找到关节: {child_path} ({child_name})")
if not self.joint_paths:
# 如果上面的方法失败,尝试全局搜索
print("使用全局搜索查找关节...")
for prim in self.stage.Traverse():
path = str(prim.GetPath())
name = prim.GetName()
if self.robot_path in path and "joint" in name.lower():
# 检查是否是关节类型
if prim.GetAttribute("drive:angular:physics:targetPosition"):
self.joint_paths.append(path)
self.joint_prims.append(prim)
self.joint_names.append(name)
print(f"找到关节: {path} ({name})")
if len(self.joint_paths) == 0:
print("错误: 找不到任何关节")
return False
print(f"总共找到 {len(self.joint_paths)} 个关节")
return True
except Exception as e:
print(f"查找关节时出错: {e}")
import traceback
traceback.print_exc()
return False
async def _wait_for_update(self):
"""等待帧更新"""
await omni.kit.app.get_app().next_update_async()
async def get_joint_positions(self):
"""获取当前关节位置"""
positions = []
for i, prim in enumerate(self.joint_prims):
try:
# 读取目标位置
target_attr = prim.GetAttribute("drive:angular:physics:targetPosition")
if target_attr and target_attr.IsValid():
pos = target_attr.Get()
positions.append(pos)
else:
# 读取当前物理位置
state_attr = prim.GetAttribute("state:angular:physics:position")
if state_attr and state_attr.IsValid():
pos = state_attr.Get()
positions.append(pos)
else:
# 默认位置
positions.append(0.0)
print(f"警告: 无法获取关节 {self.joint_names[i]} 位置,使用默认值 0")
except Exception as e:
print(f"获取关节 {self.joint_names[i]} 位置时出错: {e}")
positions.append(0.0)
return positions
async def set_joint_positions(self, target_positions):
"""设置关节位置"""
if len(target_positions) < len(self.joint_prims):
# 如果目标位置不足,使用默认值补充
target_positions = list(target_positions) + [0.0] * (len(self.joint_prims) - len(target_positions))
# 获取关节限制
joint_limits = []
for prim in self.joint_prims:
try:
lower_attr = prim.GetAttribute("physics:lowerLimit")
upper_attr = prim.GetAttribute("physics:upperLimit")
lower = -math.pi if not lower_attr or not lower_attr.IsValid() else lower_attr.Get()
upper = math.pi if not upper_attr or not upper_attr.IsValid() else upper_attr.Get()
joint_limits.append((lower, upper))
except:
joint_limits.append((-math.pi, math.pi))
# 设置关节位置
for i, (prim, pos, (lower, upper)) in enumerate(zip(self.joint_prims, target_positions, joint_limits)):
try:
# 确保位置在限制范围内
constrained_pos = max(lower, min(pos, upper))
if constrained_pos != pos:
print(f"警告: 关节 {self.joint_names[i]} 位置 {math.degrees(pos):.1f}° 超出限制 [{math.degrees(lower):.1f}°, {math.degrees(upper):.1f}°],已调整")
# 设置目标位置
target_attr = prim.GetAttribute("drive:angular:physics:targetPosition")
if target_attr and target_attr.IsValid():
current = target_attr.Get()
target_attr.Set(constrained_pos)
print(f"设置关节 {self.joint_names[i]} 位置: {math.degrees(constrained_pos):.1f}° (当前: {math.degrees(current):.1f}°)")
else:
print(f"警告: 无法设置关节 {self.joint_names[i]} 位置")
except Exception as e:
print(f"设置关节 {self.joint_names[i]} 位置时出错: {e}")
async def move_to_pose(self, target_angles):
"""平滑移动到目标姿势"""
try:
# 获取当前位置
current_angles = await self.get_joint_positions()
print(f"当前位置: [{', '.join([f'{math.degrees(a):.1f}°' for a in current_angles])}]")
print(f"目标位置: [{', '.join([f'{math.degrees(a):.1f}°' for a in target_angles])}]")
print("开始平滑移动...")
# 平滑过渡
for step in range(1, self.transition_steps + 1):
fraction = step / self.transition_steps
interpolated_positions = []
for i, (current, target) in enumerate(zip(current_angles, target_angles)):
# 如果索引超出范围,使用默认值
if i >= len(target_angles):
pos = current
else:
pos = current + fraction * (target - current)
interpolated_positions.append(pos)
# 设置插值位置
await self.set_joint_positions(interpolated_positions)
# 等待几帧
for _ in range(self.frames_per_step):
await self._wait_for_update()
print("移动完成")
return True
except Exception as e:
print(f"移动到姿势时出错: {e}")
import traceback
traceback.print_exc()
return False
async def run_demo(self):
"""运行演示程序"""
if not await self.initialize():
print("初始化失败,无法运行演示")
return
print("\n开始RM75机器人姿势演示...")
# 保存初始位置
initial_angles = await self.get_joint_positions()
# 依次展示预定义姿势
for pose_name, angles in self.poses.items():
print(f"\n正在移动到 {pose_name}...")
success = await self.move_to_pose(angles)
if success:
print(f"已达到 {pose_name}!")
else:
print(f"移动到 {pose_name} 失败")
# 在此姿势停留一段时间
print(f"停留在 {pose_name} 2秒...")
for _ in range(20):
await self._wait_for_update()
# 返回初始姿势
print("\n返回初始姿势...")
await self.move_to_pose(initial_angles)
print("\n演示完成!")
# 主函数
async def main():
controller = DirectRM75Controller("/World/RM75_6F")
await controller.run_demo()
# 执行主函数
asyncio.ensure_future(main())