Compare commits

...

2 Commits

Author SHA1 Message Date
1752309c77 Voice announcement changes
Some checks failed
Secret Leaks / trufflehog (push) Failing after 18s
2025-08-20 21:45:22 +08:00
63a8d74733 Voice announcements 2025-08-11 20:41:02 +08:00
7 changed files with 499 additions and 240 deletions

View File

@@ -756,7 +756,7 @@ class RealmanDualRobotConfig(RobotConfig):
min_gripper: int = 10
servo_config_file: str = "/home/maic/LYT/lerobot/lerobot/common/robot_devices/teleop/servo_dual.yaml"
left_end_control_guid: str = '0300b14bff1100003708000010010000'
-right_end_control_guid: str = '030003f05e0400008e02000010010000'
+right_end_control_guid: str = '0300e639bc2000007f50000011010000'
follower_arm: dict[str, MotorsBusConfig] = field(
default_factory=lambda: {

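The `left_end_control_guid` / `right_end_control_guid` values are SDL-style 32-hex-digit joystick GUIDs. A minimal sketch for discovering the GUID of an attached controller, assuming a pygame/SDL joystick backend — the snippet is illustrative and not part of this diff:

```python
import pygame

pygame.init()
pygame.joystick.init()
for i in range(pygame.joystick.get_count()):
    js = pygame.joystick.Joystick(i)
    # get_guid() returns the 32-hex-character SDL GUID used in the config above
    print(i, js.get_name(), js.get_guid())
```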
View File

@@ -0,0 +1,25 @@
system_prompt: |
You are an intelligent dual-arm robot assistant:
**Robot configuration:**
- Left arm: equipped with a gripper, can grasp objects
- Right arm: no gripper, mainly used for auxiliary positioning and observation
- Four cameras: high camera (top-down view), front camera (front view), left camera (left-arm view), right camera (right-arm view)
**Task goal:** {query}
**Workflow:**
1. I show you the current views from the 4 cameras
2. You analyze the scene and give the next concrete operating instruction
3. After I execute your instruction, I update the views again
4. Repeat this process until the task is complete
**Key requirements:**
- Give only the single most critical operating instruction (ONE STEP) each time
- Instructions must be specific and executable (e.g. "Move the left gripper to 5 cm directly above the test tube, ready to descend and grasp")
- When a view is unclear, ask for the camera position to be adjusted
- The left arm grasps; the right arm assists with observation and positioning
- Instructions must be executable; avoid vague descriptions
**Output format requirements:**
- Output plain text only; do not use any Markdown formatting symbols
- Do not use asterisks, hash signs, underscores, or other formatting symbols
- Directly give a concise analysis and a concrete operating instruction
- The output should be suitable for text-to-speech playback
**Output format:**
Concise analysis + concrete operating instruction

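This template is consumed by `ask_llm` in `utils.py` (later in this diff): the YAML is loaded, `{query}` is substituted, and the result seeds the conversation history. A minimal sketch of that flow, with a hypothetical example task string:

```python
import os
import yaml

# Mirrors ask_llm below; llm_prompt.yaml lives next to utils.py
prompt_path = os.path.join(os.path.dirname(__file__), "llm_prompt.yaml")
with open(prompt_path, "r", encoding="utf-8") as f:
    prompts = yaml.safe_load(f)

query = "Move the blue test tube into the ultrasound device"  # example task
system_prompt = prompts["system_prompt"].replace("{query}", query)
history = [{"role": "system", "content": system_prompt}]
```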
View File

@@ -1,7 +1,7 @@
"""
Teleoperation Realman with a PS5 controller and
"""
"""
Teleoperation Realman with a PS5 controller and LLM interaction
"""
import time
import torch
import numpy as np
@@ -9,58 +9,100 @@ import logging
from typing import Optional, Tuple, Dict
from dataclasses import dataclass, field, replace
from collections import deque
import signal
import sys
from lerobot.common.robot_devices.teleop.realman_aloha_dual import HybridController
from lerobot.common.robot_devices.motors.utils import get_motor_names, make_motors_buses_from_configs
from lerobot.common.robot_devices.cameras.utils import make_cameras_from_configs
from lerobot.common.robot_devices.utils import RobotDeviceAlreadyConnectedError, RobotDeviceNotConnectedError
from lerobot.common.robot_devices.robots.configs import RealmanDualRobotConfig
from lerobot.common.robot_devices.robots.utils import ask_llm
from lerobot.common.robot_devices.robots.utils import (
ask_llm,
extract_json_from_response,
create_keyboard_listener,
start_keyboard_listener,
stop_keyboard_listener,
speak_async,
handle_llm_interaction_with_images
)
class RealmanDualRobot:
"""RealmanDual机器人控制类支持双臂操作和LLM交互"""
def __init__(self, config: RealmanDualRobotConfig | None = None, **kwargs):
if config is None:
config = RealmanDualRobotConfig()
-# Overwrite config arguments using kwargs
+# Configuration initialization
self.config = replace(config, **kwargs)
self.robot_type = self.config.type
-self.inference_time = self.config.inference_time # if it is inference time
+self.inference_time = self.config.inference_time
-# build cameras
+# Hardware initialization
self.cameras = make_cameras_from_configs(self.config.cameras)
# build realman motors
self.piper_motors = make_motors_buses_from_configs(self.config.follower_arm)
self.arm = self.piper_motors['main']
-# Initialize teleoperation
+# Control system initialization
self._initialize_teleop()
# init state
self._initialize_state()
self._initialize_keyboard_interface()
# Status flags
self._shutdown_flag = False
self._llm_triggered = False
def _initialize_teleop(self):
"""初始化遥操作"""
"""初始化遥操作控制器"""
self.init_info = {
'init_joint': self.arm.init_joint_position,
'init_pose': self.arm.init_pose,
'max_gripper': self.config.max_gripper,
'min_gripper': self.config.min_gripper,
'servo_config_file': self.config.servo_config_file,
-'end_control_info': {'left': self.config.left_end_control_guid, 'right': self.config.right_end_control_guid}
+'end_control_info': {
+'left': self.config.left_end_control_guid,
+'right': self.config.right_end_control_guid
+}
}
if not self.inference_time:
self.teleop = HybridController(self.init_info)
else:
self.teleop = None
def _initialize_state(self):
"""初始化状态"""
"""初始化状态管理"""
self.joint_queue = deque(maxlen=2)
self.last_endpose = self.arm.init_pose
self.logs = {}
self.is_connected = False
def _initialize_keyboard_interface(self):
"""初始化键盘交互接口"""
self.w_pressed = False # W键触发LLM
self.q_pressed = False # Q键退出
self.keyboard_listener = None
self._start_keyboard_listener()
def _start_keyboard_listener(self):
"""启动键盘监听器"""
def on_key_press(key_char):
if key_char == 'w':
self.w_pressed = True
print("检测到W键按下")
elif key_char == 'q':
self.q_pressed = True
print("检测到Q键按下")
self.keyboard_listener = create_keyboard_listener(on_key_press)
success = start_keyboard_listener(self.keyboard_listener)
if success:
print("键盘监听器启动成功 (W键:调用LLM, Q键:退出)")
else:
print("键盘监听器启动失败")
def _read_robot_state(self) -> dict:
"""读取机器人状态"""
before_read_t = time.perf_counter()
@@ -70,32 +112,27 @@ class RealmanDualRobot:
return state
def _execute_action(self, action: dict, state: dict):
"""执行动作"""
before_write_t = time.perf_counter()
"""执行机器人动作"""
before_write_t = time.perf_counter()
if action['control_mode'] == 'joint':
# self.arm.write_action(action, state)
pass
else:
if list(action['pose'].values()) != list(state['pose'].values()):
pose = list(action['pose'].values())
self.arm.write_endpose_canfd(pose)
elif list(action['joint'].values()) != list(state['joint'].values()):
target_joint = list(action['joint'].values())
self.arm.write(target_joint)
self.logs["write_pos_dt_s"] = time.perf_counter() - before_write_t
def _prepare_record_data(self) -> Tuple[Dict, Dict]:
"""准备记录数据"""
"""准备记录数据 - 保持原有逻辑"""
if len(self.joint_queue) < 2:
return {}, {}
return {}, {}
state = torch.as_tensor(list(self.joint_queue[0]), dtype=torch.float32)
action = torch.as_tensor(list(self.joint_queue[-1]), dtype=torch.float32)
action = torch.as_tensor(list(self.joint_queue[-1]), dtype=torch.float32)
# 捕获图像
images = self._capture_images()
images = self._capture_images()
# 构建输出字典
obs_dict = {
"observation.state": state,
@@ -103,7 +140,7 @@ class RealmanDualRobot:
}
action_dict = {"action": action}
return obs_dict, action_dict
def _update_state_queue(self):
"""更新状态队列"""
current_state = self.arm.read()['joint']
@@ -114,21 +151,241 @@ class RealmanDualRobot:
elif "gripper" in data:
current_state_lst.append((current_state[data]-500)/500)
self.joint_queue.append(current_state_lst)
def _capture_images(self) -> Dict[str, torch.Tensor]:
"""捕获图像"""
"""捕获摄像头图像"""
images = {}
for name, camera in self.cameras.items():
before_camread_t = time.perf_counter()
image = camera.async_read()
images[name] = torch.from_numpy(image)
self.logs[f"read_camera_{name}_dt_s"] = camera.logs["delta_timestamp_s"]
self.logs[f"async_read_camera_{name}_dt_s"] = time.perf_counter() - before_camread_t
return images
# def _handle_llm_interaction(self, obs_dict: Dict) -> bool:
# """Handle the LLM interaction logic"""
# print("[W pressed] Preparing data and calling the LLM...")
# # Extract the image data
# camera_images = {}
# for key, value in obs_dict.items():
# if key.startswith("observation.images."):
# camera_name = key.replace("observation.images.", "")
# camera_images[camera_name] = value.cpu().numpy()
# # Use the helper from utils to handle the LLM interaction
# success, response = handle_llm_interaction_with_images(
# "Move the blue test tube from the rack at the lower left of the ultrasound device into the ultrasound device",
# camera_images
# )
# return success
def connect(self) -> None:
"""连接机器人和摄像头"""
if self.is_connected:
raise RobotDeviceAlreadyConnectedError(
"RealmanArm is already connected. Do not run `robot.connect()` twice."
)
# Connect the arm
self.arm.connect(enable=True)
print("RealmanArm connected")
# Connect the cameras
for name in self.cameras:
self.cameras[name].connect()
self.is_connected = self.is_connected and self.cameras[name].is_connected
print(f"camera {name} connected")
print("All connected")
self.is_connected = True
self.run_calibration()
def disconnect(self) -> None:
"""断开连接"""
if self._shutdown_flag:
return
self._shutdown_flag = True
try:
# Stop the keyboard listener
stop_keyboard_listener(self.keyboard_listener)
print("Keyboard listener stopped")
# Stop the teleoperation controller
if hasattr(self, 'teleop') and self.teleop and not self.inference_time:
self.teleop.stop()
print("Teleoperation controller stopped")
# Disconnect the arm
if hasattr(self, 'arm'):
try:
self.arm.safe_disconnect()
print("RealmanArm safely disconnected")
time.sleep(2)
self.arm.connect(enable=False)
print("RealmanArm disabled")
except Exception as e:
print(f"Error while disconnecting the arm: {e}")
# Disconnect the cameras
if len(self.cameras) > 0:
for name, cam in self.cameras.items():
try:
cam.disconnect()
print(f"Camera {name} disconnected")
except Exception as e:
print(f"Error while disconnecting camera {name}: {e}")
self.is_connected = False
print("All devices disconnected")
except Exception as e:
print(f"Error during disconnect: {e}")
def run_calibration(self):
"""运行标定"""
if not self.is_connected:
raise ConnectionError()
self.arm.apply_calibration()
if not self.inference_time:
self.teleop.reset()
def teleop_step(self, record_data=False) -> None | tuple[dict[str, torch.Tensor], dict[str, torch.Tensor]]:
"""遥操作步骤 - 保持原有数据记录逻辑添加LLM交互"""
if not self.is_connected:
raise ConnectionError()
if self.teleop is None and self.inference_time:
self.teleop = HybridController(self.init_info)
try:
# Check the exit condition
if self.q_pressed:
print("Q key detected, terminating task...")
speak_async("Task terminated")
raise KeyboardInterrupt("User requested exit")
# Perform basic teleoperation
state = self._read_robot_state()
action = self.teleop.get_action(state)
self._execute_action(action, state)
# Update the state queue
self._update_state_queue()
time.sleep(0.019) # ~50 Hz
# Handle data recording
if record_data:
data = self._prepare_record_data()
# Handle an LLM interaction request (only when valid data is available)
if data[0] and self.w_pressed: # valid data present and W key pressed
self.w_pressed = False
camera_images = {
name.replace("observation.images.", ""): img.cpu().numpy()
for name, img in data[0].items()
if name.startswith("observation.images.")
}
success, resp = handle_llm_interaction_with_images(
"将超声仪左下角试管架上的蓝色试管移动到超声仪中",
camera_images
)
# self.w_pressed = False # reset the flag
# self._llm_triggered = True
# success = self._handle_llm_interaction(data[0])
if not success:
print("LLM交互处理失败")
# 如果没有有效数据,创建默认数据
if not data[0]:
# 创建默认的观测数据
if len(self.joint_queue) > 0:
state_tensor = torch.as_tensor(list(self.joint_queue[-1]), dtype=torch.float32)
else:
state_tensor = torch.zeros(14, dtype=torch.float32) # adjust the dimension to your robot
# Capture the current images
images = self._capture_images()
obs_dict = {
"observation.state": state_tensor,
**{f"observation.images.{name}": img for name, img in images.items()}
}
action_dict = {"action": state_tensor} # 使用相同的状态作为动作
data = (obs_dict, action_dict)
return data
return None
except KeyboardInterrupt:
# Re-raise the KeyboardInterrupt so the caller can handle it
raise
except Exception as e:
logging.error(f"遥操作步骤失败: {e}")
# 即使出错在record_data=True时也要返回有效数据
if record_data:
# 创建紧急默认数据
state_tensor = torch.zeros(14, dtype=torch.float32) # 根据你的机器人调整维度
images = {}
try:
images = self._capture_images()
except Exception:
# If even image capture fails, create empty images
for camera_name in self.cameras.keys():
images[camera_name] = torch.zeros((480, 640, 3), dtype=torch.uint8)
obs_dict = {
"observation.state": state_tensor,
**{f"observation.images.{name}": img for name, img in images.items()}
}
action_dict = {"action": state_tensor}
return obs_dict, action_dict
return None
def send_action(self, action: torch.Tensor) -> torch.Tensor:
"""发送动作到机器人"""
if not self.is_connected:
raise RobotDeviceNotConnectedError(
"Piper is not connected. You need to run `robot.connect()`."
)
target_joints = action.tolist()
len_joint = len(target_joints) - 1
target_joints = [target_joints[i]*180 for i in range(len_joint)] + [target_joints[-1]]
target_joints[-1] = int(target_joints[-1]*500 + 500)
self.arm.write(target_joints)
return action
def capture_observation(self) -> dict:
"""捕获当前观测"""
if not self.is_connected:
raise RobotDeviceNotConnectedError(
"Piper is not connected. You need to run `robot.connect()`."
)
before_read_t = time.perf_counter()
state = self.arm.read()
self.logs["read_pos_dt_s"] = time.perf_counter() - before_read_t
state = torch.as_tensor(list(state.values()), dtype=torch.float32)
# Read images
images = {}
for name in self.cameras:
before_camread_t = time.perf_counter()
images[name] = self.cameras[name].async_read()
images[name] = torch.from_numpy(images[name])
self.logs[f"read_camera_{name}_dt_s"] = self.cameras[name].logs["delta_timestamp_s"]
self.logs[f"async_read_camera_{name}_dt_s"] = time.perf_counter() - before_camread_t
# Build the observation dictionary
obs_dict = {}
obs_dict["observation.state"] = state
for name in self.cameras:
obs_dict[f"observation.images.{name}"] = images[name]
return obs_dict
def teleop_safety_stop(self):
"""遥操作安全停止"""
self.run_calibration()
@property
def camera_features(self) -> dict:
"""获取摄像头特征"""
cam_ft = {}
for cam_key, cam in self.cameras.items():
key = f"observation.images.{cam_key}"
@@ -139,9 +396,9 @@ class RealmanDualRobot:
}
return cam_ft
@property
def motor_features(self) -> dict:
"""获取电机特征"""
action_names = get_motor_names(self.piper_motors)
state_names = get_motor_names(self.piper_motors)
return {
@@ -156,7 +413,7 @@ class RealmanDualRobot:
"names": state_names,
},
}
@property
def has_camera(self):
return len(self.cameras) > 0
@@ -165,158 +422,48 @@ class RealmanDualRobot:
def num_cameras(self):
return len(self.cameras)
def connect(self) -> None:
"""Connect RealmanArm and cameras"""
if self.is_connected:
raise RobotDeviceAlreadyConnectedError(
"RealmanArm is already connected. Do not run `robot.connect()` twice."
)
# connect RealmanArm
self.arm.connect(enable=True)
print("RealmanArm conneted")
# connect cameras
for name in self.cameras:
self.cameras[name].connect()
self.is_connected = self.is_connected and self.cameras[name].is_connected
print(f"camera {name} conneted")
print("All connected")
self.is_connected = True
self.run_calibration()
def disconnect(self) -> None:
"""move to home position, disenable piper and cameras"""
# move piper to home position, disable
if not self.inference_time:
self.teleop.stop()
# disconnect piper
self.arm.safe_disconnect()
print("RealmanArm disable after 5 seconds")
time.sleep(5)
self.arm.connect(enable=False)
# disconnect cameras
if len(self.cameras) > 0:
for cam in self.cameras.values():
cam.disconnect()
self.is_connected = False
def run_calibration(self):
"""move piper to the home position"""
if not self.is_connected:
raise ConnectionError()
self.arm.apply_calibration()
if not self.inference_time:
self.teleop.reset()
def teleop_step(
self, record_data=False
) -> None | tuple[dict[str, torch.Tensor], dict[str, torch.Tensor]]:
if not self.is_connected:
raise ConnectionError()
if self.teleop is None and self.inference_time:
self.teleop = HybridController(self.init_info)
try:
# Read the current state
state = self._read_robot_state()
# Get the action
action = self.teleop.get_action(state)
self._execute_action(action, state)
# Update the state queue
self._update_state_queue()
time.sleep(0.019) # ~50 Hz
if record_data:
data = self._prepare_record_data()
if data[0]:
# # ask_llm("将超声仪左下角试管架上的试管移动到超声仪中", data[0])
pass
return data
except Exception as e:
logging.error(f"遥操作步骤失败: {e}")
return None
def send_action(self, action: torch.Tensor) -> torch.Tensor:
"""Write the predicted actions from policy to the motors"""
if not self.is_connected:
raise RobotDeviceNotConnectedError(
"Piper is not connected. You need to run `robot.connect()`."
)
# send to motors, torch to list
target_joints = action.tolist()
len_joint = len(target_joints) - 1
target_joints = [target_joints[i]*180 for i in range(len_joint)] + [target_joints[-1]]
target_joints[-1] = int(target_joints[-1]*500 + 500)
self.arm.write(target_joints)
return action
def capture_observation(self) -> dict:
"""capture current images and joint positions"""
if not self.is_connected:
raise RobotDeviceNotConnectedError(
"Piper is not connected. You need to run `robot.connect()`."
)
# read current joint positions
before_read_t = time.perf_counter()
state = self.arm.read() # 6 joints + 1 gripper
self.logs["read_pos_dt_s"] = time.perf_counter() - before_read_t
state = torch.as_tensor(list(state.values()), dtype=torch.float32)
# read images from cameras
images = {}
for name in self.cameras:
before_camread_t = time.perf_counter()
images[name] = self.cameras[name].async_read()
images[name] = torch.from_numpy(images[name])
self.logs[f"read_camera_{name}_dt_s"] = self.cameras[name].logs["delta_timestamp_s"]
self.logs[f"async_read_camera_{name}_dt_s"] = time.perf_counter() - before_camread_t
# Populate output dictionaries and format to pytorch
obs_dict = {}
obs_dict["observation.state"] = state
for name in self.cameras:
obs_dict[f"observation.images.{name}"] = images[name]
return obs_dict
def teleop_safety_stop(self):
""" move to home position after record one episode """
self.run_calibration()
def __del__(self):
-if self.is_connected:
-self.disconnect()
-if not self.inference_time:
-self.teleop.stop()
+"""Destructor"""
+try:
+if not self._shutdown_flag:
+self.disconnect()
+except Exception:
+pass
def signal_handler(signum, frame):
"""信号处理器"""
print("\n收到中断信号,正在安全退出...")
sys.exit(0)
if __name__ == '__main__':
-robot = RealmanDualRobot()
-robot.connect()
-# robot.run_calibration()
-while True:
-robot.teleop_step(record_data=True)
+signal.signal(signal.SIGINT, signal_handler)
+robot = None
-# robot.capture_observation()
-# dummy_action = torch.Tensor([-0.40586111280653214, 0.5522833506266276, 0.4998166826036241, -0.3539944542778863, -0.524433347913954, 0.9064999898274739, 0.482])
-# robot.send_action(dummy_action)
-# robot.disconnect()
-# print('ok')
+try:
+robot = RealmanDualRobot()
+robot.connect()
+print("RealmanDual robot control started")
+print("Instructions:")
+print(" - Use the gamepad for teleoperation")
+print(" - Press W: ask the LLM to analyze the current scene")
+print(" - Press Q: exit the program safely")
+print(" - Ctrl+C: force quit")
+print("\nWaiting for input...")
+while True:
+result = robot.teleop_step(record_data=True)
+if result is None:
+continue
+# Recorded data can be processed here
+except KeyboardInterrupt:
+print("\nKeyboard interrupt received")
+except Exception as e:
+print(f"Program error: {e}")
+finally:
+if robot:
+try:
+robot.disconnect()
+except Exception:
+pass
+print("Program exited cleanly")

View File

@@ -11,14 +11,22 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
conversation_history = []
conversation_client = None
from typing import Protocol, Dict
# Robot configuration imports
import os
import re
import json
import base64
import yaml
import threading
from io import BytesIO
from typing import Protocol, Dict
import numpy as np
import cv2 # still used by encode_image_to_base64 below
from openai import OpenAI
from PIL import Image
from pynput import keyboard
import torch
# Robot config imports
from lerobot.common.robot_devices.robots.configs import (
AlohaRobotConfig,
KochBimanualRobotConfig,
@@ -34,28 +42,10 @@ from lerobot.common.robot_devices.robots.configs import (
RealmanDualRobotConfig
)
# Added library imports for LLM interaction
from openai import OpenAI
import base64
import os
import cv2
import torch
import numpy as np
from pynput import keyboard
import time
import json
from datetime import datetime
def get_arm_id(name, arm_type):
"""Returns the string identifier of a robot arm."""
return f"{name}_{arm_type}"
class Robot(Protocol):
robot_type: str
features: dict
cameras: Dict
def connect(self): ...
def run_calibration(self): ...
def teleop_step(self, record_data=False): ...
@@ -64,8 +54,11 @@ class Robot(Protocol):
def disconnect(self): ...
def get_arm_id(name, arm_type) -> str:
"""返回机器人手臂ID"""
return f"{name}_{arm_type}"
def make_robot_config(robot_type: str, **kwargs) -> RobotConfig:
# ... (function body unchanged) ...
if robot_type == "aloha":
return AlohaRobotConfig(**kwargs)
elif robot_type == "koch":
@@ -89,9 +82,8 @@ def make_robot_config(robot_type: str, **kwargs) -> RobotConfig:
else:
raise ValueError(f"Robot type '{robot_type}' is not available.")
def make_robot_from_config(config: RobotConfig):
# ... (function body unchanged) ...
"""Create a robot instance from its config"""
if isinstance(config, ManipulatorRobotConfig):
from lerobot.common.robot_devices.robots.manipulator import ManipulatorRobot
return ManipulatorRobot(config)
@@ -108,36 +100,116 @@ def make_robot_from_config(config: RobotConfig):
from lerobot.common.robot_devices.robots.stretch import StretchRobot
return StretchRobot(config)
def make_robot(robot_type: str, **kwargs) -> Robot:
"""指定类型构造机器人"""
config = make_robot_config(robot_type, **kwargs)
return make_robot_from_config(config)
# -------------------- LLM interaction helpers --------------------
def numpy_to_base64(img_array: np.ndarray) -> str:
"""numpy数组转Base64 PNG图像"""
if img_array.dtype != np.uint8:
if img_array.max() <= 1.0:
img_array = (img_array * 255).astype(np.uint8)
else:
img_array = img_array.astype(np.uint8)
buffered = BytesIO()
Image.fromarray(img_array).save(buffered, format="PNG")
return base64.b64encode(buffered.getvalue()).decode('utf-8')
def encode_image_to_base64(image_tensor: torch.Tensor) -> str:
"""将PyTorch张量格式的图像编码为Base64字符串"""
image_np = image_tensor.cpu().numpy().astype(np.uint8)
image_bgr = cv2.cvtColor(image_np, cv2.COLOR_RGB2BGR)
success, buffer = cv2.imencode(".jpg", image_bgr)
if not success:
raise ValueError("图像编码失败")
return base64.b64encode(buffer).decode("utf-8")
def extract_json_from_response(response: str) -> dict:
"""解析LLM返回的结构化指令"""
try:
return json.loads(response)
except Exception:
return {
"action": "move_arm",
"description": response.strip(),
"arm": "left",
"target": "unknown",
"status": "ready"
}
# Add global variables at the top of the file to manage the session
conversation_history = []
conversation_client = None
def create_keyboard_listener(on_key_press_callback):
"""创建键盘监听器"""
def on_press(key):
try:
if hasattr(key, 'char') and key.char:
on_key_press_callback(key.char.lower())
except AttributeError:
pass
return keyboard.Listener(on_press=on_press)
-def ask_llm(query: str, state: dict):
-prompt = """ """
-api_key = os.getenv("OPENAI_API_KEY")
-base_url = os.getenv("OPENAI_BASE_URL")
def start_keyboard_listener(listener):
try:
listener.start()
return True
except Exception as e:
print(f"键盘监听器启动失败: {e}")
return False
def stop_keyboard_listener(listener):
try:
if listener and listener.is_alive():
listener.stop()
return True
except Exception:
pass
return False
def speak_async(text: str):
"""异步语音播报"""
from lerobot.common.utils.utils import say
def speak_thread():
try:
say(text, blocking=True)
except Exception as e:
print(f"语音播报失败: {e}")
thread = threading.Thread(target=speak_thread, daemon=True)
thread.start()
def ask_llm(query: str, state: dict, history=None):
"""调用LLM进行场景分析"""
api_key = os.getenv("ASK_LLM_API_KEY")
base_url = os.getenv("ASK_LLM_BASE_URL")
model_name = os.getenv("ASK_LLM_MODEL")
if not (api_key and base_url and model_name):
raise EnvironmentError("请设置 ASK_LLM_API_KEY, ASK_LLM_BASE_URL, ASK_LLM_MODEL 环境变量")
prompt_path = os.path.join(os.path.dirname(__file__), "llm_prompt.yaml")
with open(prompt_path, "r", encoding="utf-8") as f:
prompts = yaml.safe_load(f)
system_prompt = prompts["system_prompt"].replace("{query}", query)
if history is None:
history = []
if not history:
history.append({"role": "system", "content": system_prompt})
if len(history) > 3:
history = [history[0]] + history[-2:] # keep the system prompt and the last round
user_content = [{"type": "text", "text": "以下是当前4个摄像头画面请分析并给出一步操作指令"}]
for name in ["high", "front", "left", "right"]:
if name in state:
img_base64 = numpy_to_base64(state[name])
user_content.append({
"type": "image_url",
"image_url": {"url": f"data:image/png;base64,{img_base64}", "detail": "high"}
})
history.append({"role": "user", "content": user_content})
client = OpenAI(api_key=api_key, base_url=base_url)
completion = client.chat.completions.create(model=model_name, messages=history, max_tokens=800, temperature=0.3)
resp_msg = completion.choices[0].message
history.append(resp_msg)
return resp_msg.content
def handle_llm_interaction_with_images(query: str, camera_images: dict):
"""处理LLM交互"""
if len(camera_images) != 4:
return False, "摄像头数量不足"
try:
response = ask_llm(query, camera_images)
instruction = extract_json_from_response(response)
if desc := instruction.get("description"):
speak_async(f"下一步操作:{desc}")
return True, response
except Exception as e:
speak_async("LLM交互失败")
return False, str(e)

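A hedged smoke-test sketch for the helpers above, using synthetic frames in place of real cameras. Note that `ask_llm` only encodes the keys `high`, `front`, `left`, `right`, and `handle_llm_interaction_with_images` requires exactly four images; the environment variables from the README section below must be set, and the call performs a real network request:

```python
import numpy as np
from lerobot.common.robot_devices.robots.utils import handle_llm_interaction_with_images

# Four synthetic 480x640 RGB frames keyed by the camera names ask_llm expects
frames = {
    name: np.random.randint(0, 255, (480, 640, 3), dtype=np.uint8)
    for name in ["high", "front", "left", "right"]
}
success, response = handle_llm_interaction_with_images(
    "Move the blue test tube from the rack into the ultrasound device", frames
)
print(success, response)
```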
View File

@@ -406,7 +406,7 @@ if __name__ == "__main__":
'max_gripper': {},
'min_gripper': {},
'servo_config_file': {},
-'end_control_info': {'left': "0300b14bff1100003708000010010000", 'right': '0300509d5e040000120b000009050000'}
+'end_control_info': {'left': "0300b14bff1100003708000010010000", 'right': '0300e639bc2000007f50000011010000'}
}
config = parse_init_info(config)
endpose_arm = DummyEndposeMaster(config)

View File

@@ -177,8 +177,9 @@ def say(text, blocking=False):
elif system == "Linux":
# cmd = ["spd-say", text]
cmd = ["edge-playback", "-t", text]
-if blocking:
-cmd.append("--wait")
+################## Change: comment out this block ###########################
+# if blocking:
+# cmd.append("--wait")
elif system == "Windows":
cmd = [

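Note the consequence of this change: with `--wait` commented out, `say(text, blocking=True)` no longer blocks on Linux, so `speak_async` (which calls `say(..., blocking=True)` from a daemon thread) effectively fires and forgets. If blocking playback is needed again, a sketch of one alternative — assuming `edge-playback` is on the PATH — is to wait on the subprocess itself rather than pass a flag:

```python
import platform
import subprocess

def say_blocking(text: str) -> None:
    """Speak `text` and return only after playback finishes (Linux sketch)."""
    if platform.system() == "Linux":
        # subprocess.run blocks until the edge-playback process exits
        subprocess.run(["edge-playback", "-t", text], check=False)
```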
View File

@@ -151,4 +151,18 @@ python lerobot/scripts/control_robot.py \
--control.push_to_hub=false \
--control.policy.path=outputs/train/smolvla_move_the_bottle_into_ultrasonic_device_with_realman_single/checkpoints/160000/pretrained_model \
--control.display_data=true
```
```bash
export ASK_LLM_BASE_URL=https://api.apiyi.com/v1
export ASK_LLM_API_KEY=<your-api-key>
export ASK_LLM_MODEL=claude-sonnet-4-20250514
python lerobot/scripts/control_robot.py \
--robot.type=realman_dual \
--robot.inference_time=false \
--control.type=teleoperate \
--control.display_data=true \
--robot.left_end_control_guid="0300b14bff1100003708000010010000" \
--robot.right_end_control_guid="0300e639bc2000007f50000011010000"
```