This commit is contained in:
2025-08-11 13:03:26 +08:00
parent 523c4907d3
commit 43ab2fa3d6

287
rm_move_usd.py Normal file
View File

@@ -0,0 +1,287 @@
import omni.kit.app
import omni.usd
import omni.timeline
import asyncio
import math
from pxr import Gf, UsdPhysics
class DirectRM75Controller:
def __init__(self, robot_path="/World/RM75_6F"):
"""直接通过USD接口控制RM75机器人"""
self.robot_path = robot_path
self.stage = None
self.joint_paths = []
self.joint_prims = []
self.joint_names = []
# 预定义姿势 (角度制)
self.poses = {
"零位": [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0],
"直立姿势": [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0],
"弯曲姿势": [0.0, 45.0, 30.0, 40.0, 0.0, 0.0, 0.0],
"伸展姿势": [30.0, -30.0, -45.0, 0.0, 20.0, 0.0, 0.0],
"工作姿势": [0.0, 60.0, -90.0, 60.0, 0.0, 30.0, 0.0],
}
# 将角度制转换为弧度制
for name, angles in self.poses.items():
self.poses[name] = [math.radians(angle) for angle in angles]
# 控制设置
self.transition_steps = 60
self.frames_per_step = 2
async def initialize(self):
"""初始化控制器"""
try:
# 获取阶段
self.stage = omni.usd.get_context().get_stage()
if not self.stage:
print("错误: 无法获取USD阶段")
return False
# 检查机器人是否存在
if not self.stage.GetPrimAtPath(self.robot_path):
print(f"错误: 找不到机器人路径 {self.robot_path}")
return False
# 查找机器人关节
if not await self.find_joints():
return False
# 确保物理模拟运行
timeline = omni.timeline.get_timeline_interface()
if not timeline.is_playing():
timeline.play()
# 等待几帧以确保物理初始化
for _ in range(10):
await self._wait_for_update()
return True
except Exception as e:
print(f"初始化时出错: {e}")
import traceback
traceback.print_exc()
return False
async def find_joints(self):
"""查找机器人关节"""
try:
self.joint_paths = []
self.joint_prims = []
self.joint_names = []
# 根据截图中的结构,直接查找关节
# 您的结构显示关节位于不同层级
joint_locations = [
# 从截图看joint_1应该是root_joint
(f"{self.robot_path}/base_link/root_joint", "root_joint"),
(f"{self.robot_path}/Link_1/joint_2", "joint_2"),
(f"{self.robot_path}/Link_2/joint_3", "joint_3"),
# 其他关节路径需要检查
(f"{self.robot_path}/Link_3", "Link_3"),
(f"{self.robot_path}/Link_4", "Link_4"),
(f"{self.robot_path}/Link_5", "Link_5"),
(f"{self.robot_path}/Link_6", "Link_6"),
(f"{self.robot_path}/Link7", "Link7")
]
# 查找关节
for path, name in joint_locations:
if self.stage.GetPrimAtPath(path):
prim = self.stage.GetPrimAtPath(path)
# 检查是否是PhysicsRevolute类型或具有驱动属性
if "joint" in name.lower() and prim.GetAttribute("drive:angular:physics:targetPosition"):
self.joint_paths.append(path)
self.joint_prims.append(prim)
self.joint_names.append(name)
print(f"找到关节: {path} ({name})")
elif "Link" in name:
# 查找该Link下的joint
for child in prim.GetChildren():
child_name = child.GetName()
if "joint" in child_name.lower() and child.GetAttribute("drive:angular:physics:targetPosition"):
child_path = str(child.GetPath())
self.joint_paths.append(child_path)
self.joint_prims.append(child)
self.joint_names.append(child_name)
print(f"找到关节: {child_path} ({child_name})")
if not self.joint_paths:
# 如果上面的方法失败,尝试全局搜索
print("使用全局搜索查找关节...")
for prim in self.stage.Traverse():
path = str(prim.GetPath())
name = prim.GetName()
if self.robot_path in path and "joint" in name.lower():
# 检查是否是关节类型
if prim.GetAttribute("drive:angular:physics:targetPosition"):
self.joint_paths.append(path)
self.joint_prims.append(prim)
self.joint_names.append(name)
print(f"找到关节: {path} ({name})")
if len(self.joint_paths) == 0:
print("错误: 找不到任何关节")
return False
print(f"总共找到 {len(self.joint_paths)} 个关节")
return True
except Exception as e:
print(f"查找关节时出错: {e}")
import traceback
traceback.print_exc()
return False
async def _wait_for_update(self):
"""等待帧更新"""
await omni.kit.app.get_app().next_update_async()
async def get_joint_positions(self):
"""获取当前关节位置"""
positions = []
for i, prim in enumerate(self.joint_prims):
try:
# 读取目标位置
target_attr = prim.GetAttribute("drive:angular:physics:targetPosition")
if target_attr and target_attr.IsValid():
pos = target_attr.Get()
positions.append(pos)
else:
# 读取当前物理位置
state_attr = prim.GetAttribute("state:angular:physics:position")
if state_attr and state_attr.IsValid():
pos = state_attr.Get()
positions.append(pos)
else:
# 默认位置
positions.append(0.0)
print(f"警告: 无法获取关节 {self.joint_names[i]} 位置,使用默认值 0")
except Exception as e:
print(f"获取关节 {self.joint_names[i]} 位置时出错: {e}")
positions.append(0.0)
return positions
async def set_joint_positions(self, target_positions):
"""设置关节位置"""
if len(target_positions) < len(self.joint_prims):
# 如果目标位置不足,使用默认值补充
target_positions = list(target_positions) + [0.0] * (len(self.joint_prims) - len(target_positions))
# 获取关节限制
joint_limits = []
for prim in self.joint_prims:
try:
lower_attr = prim.GetAttribute("physics:lowerLimit")
upper_attr = prim.GetAttribute("physics:upperLimit")
lower = -math.pi if not lower_attr or not lower_attr.IsValid() else lower_attr.Get()
upper = math.pi if not upper_attr or not upper_attr.IsValid() else upper_attr.Get()
joint_limits.append((lower, upper))
except:
joint_limits.append((-math.pi, math.pi))
# 设置关节位置
for i, (prim, pos, (lower, upper)) in enumerate(zip(self.joint_prims, target_positions, joint_limits)):
try:
# 确保位置在限制范围内
constrained_pos = max(lower, min(pos, upper))
if constrained_pos != pos:
print(f"警告: 关节 {self.joint_names[i]} 位置 {math.degrees(pos):.1f}° 超出限制 [{math.degrees(lower):.1f}°, {math.degrees(upper):.1f}°],已调整")
# 设置目标位置
target_attr = prim.GetAttribute("drive:angular:physics:targetPosition")
if target_attr and target_attr.IsValid():
current = target_attr.Get()
target_attr.Set(constrained_pos)
print(f"设置关节 {self.joint_names[i]} 位置: {math.degrees(constrained_pos):.1f}° (当前: {math.degrees(current):.1f}°)")
else:
print(f"警告: 无法设置关节 {self.joint_names[i]} 位置")
except Exception as e:
print(f"设置关节 {self.joint_names[i]} 位置时出错: {e}")
async def move_to_pose(self, target_angles):
"""平滑移动到目标姿势"""
try:
# 获取当前位置
current_angles = await self.get_joint_positions()
print(f"当前位置: [{', '.join([f'{math.degrees(a):.1f}°' for a in current_angles])}]")
print(f"目标位置: [{', '.join([f'{math.degrees(a):.1f}°' for a in target_angles])}]")
print("开始平滑移动...")
# 平滑过渡
for step in range(1, self.transition_steps + 1):
fraction = step / self.transition_steps
interpolated_positions = []
for i, (current, target) in enumerate(zip(current_angles, target_angles)):
# 如果索引超出范围,使用默认值
if i >= len(target_angles):
pos = current
else:
pos = current + fraction * (target - current)
interpolated_positions.append(pos)
# 设置插值位置
await self.set_joint_positions(interpolated_positions)
# 等待几帧
for _ in range(self.frames_per_step):
await self._wait_for_update()
print("移动完成")
return True
except Exception as e:
print(f"移动到姿势时出错: {e}")
import traceback
traceback.print_exc()
return False
async def run_demo(self):
"""运行演示程序"""
if not await self.initialize():
print("初始化失败,无法运行演示")
return
print("\n开始RM75机器人姿势演示...")
# 保存初始位置
initial_angles = await self.get_joint_positions()
# 依次展示预定义姿势
for pose_name, angles in self.poses.items():
print(f"\n正在移动到 {pose_name}...")
success = await self.move_to_pose(angles)
if success:
print(f"已达到 {pose_name}!")
else:
print(f"移动到 {pose_name} 失败")
# 在此姿势停留一段时间
print(f"停留在 {pose_name} 2秒...")
for _ in range(20):
await self._wait_for_update()
# 返回初始姿势
print("\n返回初始姿势...")
await self.move_to_pose(initial_angles)
print("\n演示完成!")
# 主函数
async def main():
controller = DirectRM75Controller("/World/RM75_6F")
await controller.run_demo()
# 执行主函数
asyncio.ensure_future(main())