语音播报修改
Some checks failed
Secret Leaks / trufflehog (push) Failing after 18s

This commit is contained in:
2025-08-20 21:45:22 +08:00
parent 63a8d74733
commit 1752309c77
6 changed files with 174 additions and 365 deletions

View File

@@ -756,7 +756,7 @@ class RealmanDualRobotConfig(RobotConfig):
min_gripper: int = 10
servo_config_file: str = "/home/maic/LYT/lerobot/lerobot/common/robot_devices/teleop/servo_dual.yaml"
left_end_control_guid: str = '0300b14bff1100003708000010010000'
right_end_control_guid: str = '030003f05e0400008e02000010010000'
right_end_control_guid: str = '0300e639bc2000007f50000011010000'
follower_arm: dict[str, MotorsBusConfig] = field(
default_factory=lambda: {

View File

@@ -0,0 +1,25 @@
system_prompt: |
你是一个聪明的双臂机器人助手:
**机器人配置:**
- 左臂:配备夹爪,可以抓取物体
- 右臂:没有夹爪,主要用于辅助定位和观察
- 四个摄像头high camera(俯视角)、front camera(正视角)、left camera(左臂视角)、right camera(右臂视角)
**任务目标:** {query}
**工作流程:**
1. 我向你展示当前4个摄像头的画面
2. 你分析场景,给出下一步具体操作指令
3. 我执行你的指令后,再次更新画面
4. 重复此过程直到完成任务
**重要要求:**
- 每次只给出ONE STEP最关键的操作指令
- 指令要具体明确,便于执行(如"将左臂夹爪移动到试管正上方5cm处准备下降抓取"
- 当视角不清晰时,要求调整摄像头位置
- 左臂负责抓取,右臂负责辅助观察和定位
- 给出的指令必须可执行,避免模糊描述
**输出格式要求:**
- 使用纯文本输出不要使用任何Markdown格式符号
- 不要使用星号、井号、下划线等格式化符号
- 直接给出简洁分析和具体操作指令
- 输出内容适合语音播报
**输出格式:**
简洁分析 + 具体操作指令

View File

@@ -1,7 +1,7 @@
"""
Teleoperation Realman with a PS5 controller and LLM interaction
"""
import time
import torch
import numpy as np
@@ -11,7 +11,6 @@ from dataclasses import dataclass, field, replace
from collections import deque
import signal
import sys
from lerobot.common.robot_devices.teleop.realman_aloha_dual import HybridController
from lerobot.common.robot_devices.motors.utils import get_motor_names, make_motors_buses_from_configs
from lerobot.common.robot_devices.cameras.utils import make_cameras_from_configs
@@ -54,6 +53,7 @@ class RealmanDualRobot:
self._shutdown_flag = False
self._llm_triggered = False
def _initialize_teleop(self):
"""初始化遥操作控制器"""
self.init_info = {
@@ -95,11 +95,9 @@ class RealmanDualRobot:
print("检测到W键按下")
elif key_char == 'q':
self.q_pressed = True
print("检测到Q键按下")
print("检测到Q键按下")
self.keyboard_listener = create_keyboard_listener(on_key_press)
success = start_keyboard_listener(self.keyboard_listener)
success = start_keyboard_listener(self.keyboard_listener)
if success:
print("键盘监听器启动成功 (W键:调用LLM, Q键:退出)")
else:
@@ -115,8 +113,7 @@ class RealmanDualRobot:
def _execute_action(self, action: dict, state: dict):
"""执行机器人动作"""
before_write_t = time.perf_counter()
before_write_t = time.perf_counter()
if action['control_mode'] == 'joint':
pass
else:
@@ -125,21 +122,17 @@ class RealmanDualRobot:
self.arm.write_endpose_canfd(pose)
elif list(action['joint'].values()) != list(state['joint'].values()):
target_joint = list(action['joint'].values())
self.arm.write(target_joint)
self.arm.write(target_joint)
self.logs["write_pos_dt_s"] = time.perf_counter() - before_write_t
def _prepare_record_data(self) -> Tuple[Dict, Dict]:
"""准备记录数据 - 保持原有逻辑"""
if len(self.joint_queue) < 2:
return {}, {}
return {}, {}
state = torch.as_tensor(list(self.joint_queue[0]), dtype=torch.float32)
action = torch.as_tensor(list(self.joint_queue[-1]), dtype=torch.float32)
action = torch.as_tensor(list(self.joint_queue[-1]), dtype=torch.float32)
# 捕获图像
images = self._capture_images()
images = self._capture_images()
# 构建输出字典
obs_dict = {
"observation.state": state,
@@ -170,24 +163,23 @@ class RealmanDualRobot:
self.logs[f"async_read_camera_{name}_dt_s"] = time.perf_counter() - before_camread_t
return images
def _handle_llm_interaction(self, obs_dict: Dict) -> bool:
"""处理LLM交互逻辑"""
print("[W键已按下] 正在准备数据并调用LLM...")
# def _handle_llm_interaction(self, obs_dict: Dict) -> bool:
# """处理LLM交互逻辑"""
# print("[W键已按下] 正在准备数据并调用LLM...")
# 提取图像数据
camera_images = {}
for key, value in obs_dict.items():
if key.startswith("observation.images."):
camera_name = key.replace("observation.images.", "")
camera_images[camera_name] = value.cpu().numpy()
# # 提取图像数据
# camera_images = {}
# for key, value in obs_dict.items():
# if key.startswith("observation.images."):
# camera_name = key.replace("observation.images.", "")
# camera_images[camera_name] = value.cpu().numpy()
# 使用utils中的函数处理LLM交互
success, response = handle_llm_interaction_with_images(
"将超声仪左下角试管架上的蓝色试管移动到超声仪中",
camera_images
)
return success
# # 使用utils中的函数处理LLM交互
# success, response = handle_llm_interaction_with_images(
# "将超声仪左下角试管架上的蓝色试管移动到超声仪中",
# camera_images
# )
# return success
def connect(self) -> None:
"""连接机器人和摄像头"""
@@ -195,8 +187,7 @@ class RealmanDualRobot:
raise RobotDeviceAlreadyConnectedError(
"RealmanArm is already connected. Do not run `robot.connect()` twice."
)
# 连接机械臂
# 连接机械臂
self.arm.connect(enable=True)
print("RealmanArm conneted")
@@ -247,8 +238,7 @@ class RealmanDualRobot:
print(f"断开相机 {name} 时出错: {e}")
self.is_connected = False
print("所有设备已断开连接")
print("所有设备已断开连接")
except Exception as e:
print(f"断开连接时发生错误: {e}")
@@ -260,52 +250,6 @@ class RealmanDualRobot:
if not self.inference_time:
self.teleop.reset()
# def teleop_step(self, record_data=False) -> None | tuple[dict[str, torch.Tensor], dict[str, torch.Tensor]]:
# """遥操作步骤 - 保持原有数据记录逻辑添加LLM交互"""
# if not self.is_connected:
# raise ConnectionError()
# if self.teleop is None and self.inference_time:
# self.teleop = HybridController(self.init_info)
# try:
# # 检查退出条件
# if self.q_pressed:
# print("检测到Q键任务终止...")
# speak_async("任务已终止")
# raise KeyboardInterrupt("用户请求退出")
# # 执行基础遥操作
# state = self._read_robot_state()
# action = self.teleop.get_action(state)
# self._execute_action(action, state)
# # 更新状态队列
# self._update_state_queue()
# time.sleep(0.019) # 50Hz
# # 处理数据记录
# if record_data:
# data = self._prepare_record_data()
# if data[0]: # 如果有有效数据
# # 处理LLM交互请求
# if self.w_pressed:
# self.w_pressed = False # 重置标志位
# self._llm_triggered = True
# success = self._handle_llm_interaction(data[0])
# if not success:
# print("LLM交互处理失败")
# return data
# return None
# except KeyboardInterrupt:
# # 重新抛出键盘中断,让上层处理
# raise
# except Exception as e:
# logging.error(f"遥操作步骤失败: {e}")
# return None
def teleop_step(self, record_data=False) -> None | tuple[dict[str, torch.Tensor], dict[str, torch.Tensor]]:
"""遥操作步骤 - 保持原有数据记录逻辑添加LLM交互"""
if not self.is_connected:
@@ -336,9 +280,19 @@ class RealmanDualRobot:
# 处理LLM交互请求只有在有有效数据时才处理
if data[0] and self.w_pressed: # 如果有有效数据且W键被按下
self.w_pressed = False # 重置标志位
self._llm_triggered = True
success = self._handle_llm_interaction(data[0])
self.w_pressed = False
camera_images = {
name.replace("observation.images.", ""): img.cpu().numpy()
for name, img in data[0].items()
if name.startswith("observation.images.")
}
success, resp = handle_llm_interaction_with_images(
"将超声仪左下角试管架上的蓝色试管移动到超声仪中",
camera_images
)
# self.w_pressed = False # 重置标志位
# self._llm_triggered = True
# success = self._handle_llm_interaction(data[0])
if not success:
print("LLM交互处理失败")
@@ -352,16 +306,13 @@ class RealmanDualRobot:
# 捕获当前图像
images = self._capture_images()
obs_dict = {
"observation.state": state_tensor,
**{f"observation.images.{name}": img for name, img in images.items()}
}
action_dict = {"action": state_tensor} # 使用相同的状态作为动作
data = (obs_dict, action_dict)
return data
data = (obs_dict, action_dict)
return data
return None
except KeyboardInterrupt:
@@ -380,15 +331,13 @@ class RealmanDualRobot:
except:
# 如果连图像都无法捕获,创建空图像
for camera_name in self.cameras.keys():
images[camera_name] = torch.zeros((480, 640, 3), dtype=torch.uint8)
images[camera_name] = torch.zeros((480, 640, 3), dtype=torch.uint8)
obs_dict = {
"observation.state": state_tensor,
**{f"observation.images.{name}": img for name, img in images.items()}
}
action_dict = {"action": state_tensor}
return obs_dict, action_dict
return obs_dict, action_dict
return None
def send_action(self, action: torch.Tensor) -> torch.Tensor:
@@ -402,8 +351,7 @@ class RealmanDualRobot:
len_joint = len(target_joints) - 1
target_joints = [target_joints[i]*180 for i in range(len_joint)] + [target_joints[-1]]
target_joints[-1] = int(target_joints[-1]*500 + 500)
self.arm.write(target_joints)
self.arm.write(target_joints)
return action
def capture_observation(self) -> dict:
@@ -411,14 +359,12 @@ class RealmanDualRobot:
if not self.is_connected:
raise RobotDeviceNotConnectedError(
"Piper is not connected. You need to run `robot.connect()`."
)
)
before_read_t = time.perf_counter()
state = self.arm.read()
self.logs["read_pos_dt_s"] = time.perf_counter() - before_read_t
state = torch.as_tensor(list(state.values()), dtype=torch.float32)
state = torch.as_tensor(list(state.values()), dtype=torch.float32)
# 读取图像
images = {}
for name in self.cameras:
@@ -426,8 +372,7 @@ class RealmanDualRobot:
images[name] = self.cameras[name].async_read()
images[name] = torch.from_numpy(images[name])
self.logs[f"read_camera_{name}_dt_s"] = self.cameras[name].logs["delta_timestamp_s"]
self.logs[f"async_read_camera_{name}_dt_s"] = time.perf_counter() - before_camread_t
self.logs[f"async_read_camera_{name}_dt_s"] = time.perf_counter() - before_camread_t
# 构建观测字典
obs_dict = {}
obs_dict["observation.state"] = state
@@ -438,7 +383,6 @@ class RealmanDualRobot:
def teleop_safety_stop(self):
"""遥操作安全停止"""
self.run_calibration()
@property
def camera_features(self) -> dict:
"""获取摄像头特征"""
@@ -486,13 +430,11 @@ class RealmanDualRobot:
except:
pass
def signal_handler(signum, frame):
"""信号处理器"""
print("\n收到中断信号,正在安全退出...")
sys.exit(0)
if __name__ == '__main__':
signal.signal(signal.SIGINT, signal_handler)
robot = None
@@ -513,8 +455,7 @@ if __name__ == '__main__':
result = robot.teleop_step(record_data=True)
if result is None:
continue
# 这里可以处理记录的数据
# 这里可以处理记录的数据
except KeyboardInterrupt:
print("\n收到键盘中断信号")
except Exception as e:
@@ -525,4 +466,4 @@ if __name__ == '__main__':
robot.disconnect()
except:
pass
print("程序已完全退出")
print("程序已完全退出")

View File

@@ -13,6 +13,20 @@
# limitations under the License.
from typing import Protocol, Dict
# Robot configuration imports
import os
import re
import json
import base64
import yaml
import threading
from io import BytesIO
from typing import Protocol, Dict
import numpy as np
from openai import OpenAI
from PIL import Image
from pynput import keyboard
import torch
# Robot config imports
from lerobot.common.robot_devices.robots.configs import (
AlohaRobotConfig,
KochBimanualRobotConfig,
@@ -27,28 +41,23 @@ from lerobot.common.robot_devices.robots.configs import (
RealmanRobotConfig,
RealmanDualRobotConfig
)
# Added library imports for LLM interaction
import os
import base64
from io import BytesIO
import json
from openai import OpenAI
from PIL import Image
import numpy as np
import torch
def get_arm_id(name, arm_type):
"""Returns the string identifier of a robot arm."""
return f"{name}_{arm_type}"
class Robot(Protocol):
robot_type: str
features: dict
cameras: Dict
cameras: Dict
def connect(self): ...
def run_calibration(self): ...
def teleop_step(self, record_data=False): ...
def capture_observation(self) -> Dict: ...
def send_action(self, action): ...
def disconnect(self): ...
def get_arm_id(name, arm_type) -> str:
"""返回机器人手臂ID"""
return f"{name}_{arm_type}"
def make_robot_config(robot_type: str, **kwargs) -> RobotConfig:
if robot_type == "aloha":
return AlohaRobotConfig(**kwargs)
@@ -72,7 +81,9 @@ def make_robot_config(robot_type: str, **kwargs) -> RobotConfig:
return RealmanDualRobotConfig(**kwargs)
else:
raise ValueError(f"Robot type '{robot_type}' is not available.")
def make_robot_from_config(config: RobotConfig):
"""根据配置生成机器人实例"""
if isinstance(config, ManipulatorRobotConfig):
from lerobot.common.robot_devices.robots.manipulator import ManipulatorRobot
return ManipulatorRobot(config)
@@ -88,227 +99,55 @@ def make_robot_from_config(config: RobotConfig):
else:
from lerobot.common.robot_devices.robots.stretch import StretchRobot
return StretchRobot(config)
def make_robot(robot_type: str, **kwargs) -> Robot:
"""指定类型构造机器人"""
config = make_robot_config(robot_type, **kwargs)
return make_robot_from_config(config)
# 全局变量用于管理会话历史和API客户端
conversation_history = []
conversation_client = None
def reset_conversation_history():
"""清空对话历史,开始一个新任务。"""
global conversation_history
conversation_history = []
print("对话历史已重置。")
def numpy_to_base64(img_array: np.ndarray) -> str:
"""numpy数组转Base64 PNG图像"""
if img_array.dtype != np.uint8:
if img_array.max() <= 1.0:
img_array = (img_array * 255).astype(np.uint8)
else:
img_array = img_array.astype(np.uint8)
buffered = BytesIO()
Image.fromarray(img_array).save(buffered, format="PNG")
return base64.b64encode(buffered.getvalue()).decode('utf-8')
def extract_json_from_response(response: str) -> dict:
"""
从LLM响应中提取JSON格式的指令 - 从realman_dual.py移动过来
Args:
response: LLM的原始响应文本
Returns:
dict: 解析后的JSON指令字典
"""
"""解析LLM返回的结构化指令"""
try:
# 尝试直接解析整个响应
return json.loads(response)
except:
# 尝试从响应中查找JSON部分
import re
json_pattern = r'\{[^{}]*\}'
matches = re.findall(json_pattern, response)
for match in matches:
try:
return json.loads(match)
except:
continue
# 如果无法解析JSON返回默认格式
return {
"action": "unknown",
"description": response,
"status": "parse_error"
}
def ask_llm(query: str, state: dict):
"""
向大型语言模型发送查询,并获取下一步操作指令。
优化版本只保留最新的4张图片避免Token超限
"""
global conversation_client
global conversation_history
# 将NumPy数组转换为Base64编码的图片
def numpy_to_base64(img_array):
if img_array.dtype != np.uint8:
if img_array.max() <= 1.0:
img_array = (img_array * 255).astype(np.uint8)
else:
img_array = img_array.astype(np.uint8)
image = Image.fromarray(img_array)
buffered = BytesIO()
image.save(buffered, format="PNG")
img_base64 = base64.b64encode(buffered.getvalue()).decode('utf-8')
return img_base64
# 初始化API客户端
api_key = "sk-H5FY8bQn6ZwBCja56280C3A4C8824017A4CdB683Dc990e35"
base_url = "https://api.apiyi.com/v1"
# if not api_key:
# raise ValueError("API Key为空请直接在代码中填写。")
if conversation_client is None:
conversation_client = OpenAI(api_key=api_key, base_url=base_url)
# 仅在对话历史为空时添加系统提示
if not conversation_history:
system_prompt = f"""
你是一个聪明的双臂机器人助手:
**机器人配置:**
- 左臂:配备夹爪,可以抓取物体
- 右臂:没有夹爪,主要用于辅助定位和观察
- 四个摄像头high camera(俯视角)、front camera(正视角)、left camera(左臂视角)、right camera(右臂视角)
**任务目标:** {query}
**工作流程:**
1. 我向你展示当前4个摄像头的画面
2. 你分析场景,给出下一步具体操作指令
3. 我执行你的指令后,再次更新画面
4. 重复w此过程直到完成任务
**重要要求:**
- 每次只给出ONE STEP最关键的操作指令
- 指令要具体明确,便于执行(如"将左臂夹爪移动到试管正上方5cm处准备下降抓取"
- 当视角不清晰时,要求调整摄像头位置
- 左臂负责抓取,右臂负责辅助观察和定位
- 给出的指令必须可执行,避免模糊描述
**输出格式要求:**
- 使用纯文本输出不要使用任何Markdown格式符号
- 不要使用星号、井号、下划线等格式化符号
- 直接给出简洁分析和具体操作指令
- 输出内容适合语音播报
**输出格式:**
简洁分析 + 具体操作指令
"""
conversation_history.append({"role": "system", "content": system_prompt})
# 如果对话历史中有图片消息,只保留最近的一轮对话(系统消息 + 最后一轮用户-助手对话)
if len(conversation_history) > 3: # 系统消息 + 用户消息 + 助手回复
# 保留系统消息
system_msg = conversation_history[0]
# 只保留最后一轮对话(最后的用户消息和助手回复)
recent_messages = conversation_history[-2:] # 最后2条消息
conversation_history = [system_msg] + recent_messages
print("清理对话历史避免Token超限")
# 构建本次用户输入的消息内容
user_content = [{"type": "text", "text": "这是当前的4个摄像头画面请根据任务目标分析场景并给出下一步具体操作指令。"}]
# 添加图片,按照指定顺序
camera_order = ['high', 'front', 'left', 'right']
for camera_name in camera_order:
if camera_name in state:
img_base64 = numpy_to_base64(state[camera_name])
user_content.append({
"type": "image_url",
"image_url": {
"url": f"data:image/png;base64,{img_base64}",
"detail": "high" # 使用高清晰度分析
}
})
# 将当前用户输入添加到对话历史中
conversation_history.append({"role": "user", "content": user_content})
try:
completion = conversation_client.chat.completions.create(
model="claude-sonnet-4-20250514",
messages=conversation_history,
max_tokens=800,
temperature=0.3
)
response_message = completion.choices[0].message
response_content = response_message.content
# 将模型的响应添加到历史中
conversation_history.append(response_message)
print(f"机器人响应: {response_content}")
return response_content
except Exception as e:
error_msg = str(e)
print(f"调用LLM时出错: {error_msg}")
# Token超限时的特殊处理
if "token_limit_exceeded" in error_msg or "context_length_exceeded" in error_msg:
print("检测到Token超限清空对话历史重新开始...")
# 清空除系统消息外的所有历史
if conversation_history:
conversation_history = [conversation_history[0]] # 只保留系统消息
# 移除刚刚添加的用户消息,准备重试
if conversation_history and len(conversation_history) > 1:
conversation_history.pop()
return "由于对话历史过长已重置对话。请再次按Enter键继续。"
# 其他错误时也要清理最后的用户输入
if conversation_history and conversation_history[-1]["role"] == "user":
conversation_history.pop()
return None
def extract_json_from_response(response: str) -> dict:
"""
从LLM响应中提取结构化指令 - 优化版本
"""
try:
# 尝试直接解析整个响应
return json.loads(response)
except:
# 如果无法解析JSON创建结构化的响应
return {
"action": "move_arm", # 默认动作类型
"action": "move_arm",
"description": response.strip(),
"arm": "left", # 默认使用左臂
"arm": "left",
"target": "unknown",
"status": "ready"
}
from pynput import keyboard
import threading
def create_keyboard_listener(on_key_press_callback):
"""
创建键盘监听器
Args:
on_key_press_callback: 按键回调函数,接收参数 (key_char)
Returns:
keyboard.Listener: 键盘监听器对象
"""
"""创建键盘监听器"""
def on_press(key):
try:
if hasattr(key, 'char') and key.char:
on_key_press_callback(key.char.lower())
except AttributeError:
pass
return keyboard.Listener(on_press=on_press)
def start_keyboard_listener(listener):
"""启动键盘监听器"""
try:
listener.start()
return True
except Exception as e:
print(f"键盘监听器启动失败: {e}")
return False
def stop_keyboard_listener(listener):
"""停止键盘监听器"""
try:
if listener and listener.is_alive():
listener.stop()
@@ -316,71 +155,61 @@ def stop_keyboard_listener(listener):
except Exception:
pass
return False
def speak_async(text: str):
"""异步语音播报"""
from lerobot.common.utils.utils import say
def speak_thread():
try:
print(f"开始语音播报: {text}")
say(text, blocking=True)
print("语音播报完成")
except Exception as e:
print(f"语音播报失败: {e}")
thread = threading.Thread(target=speak_thread, daemon=True)
thread.start()
def ask_llm(query: str, state: dict, history=None):
"""调用LLM进行场景分析"""
api_key = os.getenv("ASK_LLM_API_KEY")
base_url = os.getenv("ASK_LLM_BASE_URL")
model_name = os.getenv("ASK_LLM_MODEL")
if not (api_key and base_url and model_name):
raise EnvironmentError("请设置 ASK_LLM_API_KEY, ASK_LLM_BASE_URL, ASK_LLM_MODEL 环境变量")
prompt_path = os.path.join(os.path.dirname(__file__), "llm_prompt.yaml")
with open(prompt_path, "r", encoding="utf-8") as f:
prompts = yaml.safe_load(f)
system_prompt = prompts["system_prompt"].replace("{query}", query)
if history is None:
history = []
if not history:
history.append({"role": "system", "content": system_prompt})
if len(history) > 3:
history = [history[0]] + history[-2:] # 保留系统提示和最后一轮
user_content = [{"type": "text", "text": "以下是当前4个摄像头画面请分析并给出一步操作指令"}]
for name in ["high", "front", "left", "right"]:
if name in state:
img_base64 = numpy_to_base64(state[name])
user_content.append({
"type": "image_url",
"image_url": {"url": f"data:image/png;base64,{img_base64}", "detail": "high"}
})
history.append({"role": "user", "content": user_content})
client = OpenAI(api_key=api_key, base_url=base_url)
completion = client.chat.completions.create(model=model_name, messages=history, max_tokens=800, temperature=0.3)
resp_msg = completion.choices[0].message
history.append(resp_msg)
return resp_msg.content
def handle_llm_interaction_with_images(query: str, camera_images: dict):
"""
处理LLM交互接收相机图像字典
Args:
query: 用户查询文本
camera_images: 相机图像字典 {camera_name: numpy_array}
Returns:
tuple: (success: bool, response: str)
"""
"""处理LLM交互"""
if len(camera_images) != 4:
print(f"警告: 期望4个相机实际获取到{len(camera_images)}")
return False, "相机数量不足"
print("图像提取成功正在向LLM发送请求...")
return False, "摄像头数量不足"
try:
response = ask_llm(query, camera_images)
if response:
if "已重置对话" in response:
print("对话历史已重置,请再次按键继续")
return True, response
instruction = extract_json_from_response(response)
print("\n下一步指令:")
print(f"操作类型: {instruction.get('action', 'unknown')}")
print(f"详细描述: {instruction.get('description', '无描述')}")
# 准备语音播报内容
if "description" in instruction:
desc = instruction['description']
if len(desc) > 100:
sentences = desc.split('')
key_instruction = sentences[-2] if len(sentences) > 1 else desc[:50]
else:
key_instruction = desc
speech_text = f"下一步操作:{key_instruction}"
print(f"即将播报: {speech_text}")
speak_async(speech_text)
return True, response
else:
print("LLM请求失败可能是网络问题")
speak_async("LLM请求失败")
return False, "LLM请求失败"
instruction = extract_json_from_response(response)
if desc := instruction.get("description"):
speak_async(f"下一步操作:{desc}")
return True, response
except Exception as e:
print(f"LLM交互出错: {e}")
print("建议1.检查网络连接 2.检查API密钥 3.稍后重试")
speak_async("LLM交互失败")
return False, f"LLM交互出错: {e}"
return False, str(e)

View File

@@ -406,7 +406,7 @@ if __name__ == "__main__":
'max_gripper': {},
'min_gripper': {},
'servo_config_file': {},
'end_control_info': {'left': "0300b14bff1100003708000010010000" , 'right': '0300509d5e040000120b000009050000'}
'end_control_info': {'left': "0300b14bff1100003708000010010000" , 'right': '0300e639bc2000007f50000011010000'}
}
config = parse_init_info(config)
endpose_arm = DummyEndposeMaster(config)

View File

@@ -151,4 +151,18 @@ python lerobot/scripts/control_robot.py \
--control.push_to_hub=false \
--control.policy.path=outputs/train/smolvla_move_the_bottle_into_ultrasonic_device_with_realman_single/checkpoints/160000/pretrained_model \
--control.display_data=true
```
```
```bash
export ASK_LLM_BASE_URL=https://api.apiyi.com/v1
export ASK_LLM_API_KEY=sk-H5FY8bQn6ZwBCja56280C3A4C8824017A4CdB683Dc990e35
export ASK_LLM_MODEL=claude-sonnet-4-20250514
python lerobot/scripts/control_robot.py \
--robot.type=realman_dual \
--robot.inference_time=false \
--control.type=teleoperate \
--control.display_data=true \
--robot.left_end_control_guid="0300b14bff1100003708000010010000" \
--robot.right_end_control_guid="0300e639bc2000007f50000011010000"
```