diff --git a/lerobot/common/robot_devices/robots/realman_dual.py b/lerobot/common/robot_devices/robots/realman_dual.py index 109dfc25..cec45022 100644 --- a/lerobot/common/robot_devices/robots/realman_dual.py +++ b/lerobot/common/robot_devices/robots/realman_dual.py @@ -1,5 +1,5 @@ """ - Teleoperation Realman with a PS5 controller and + Teleoperation Realman with a PS5 controller and LLM interaction """ import time @@ -9,58 +9,102 @@ import logging from typing import Optional, Tuple, Dict from dataclasses import dataclass, field, replace from collections import deque +import signal +import sys + from lerobot.common.robot_devices.teleop.realman_aloha_dual import HybridController from lerobot.common.robot_devices.motors.utils import get_motor_names, make_motors_buses_from_configs from lerobot.common.robot_devices.cameras.utils import make_cameras_from_configs from lerobot.common.robot_devices.utils import RobotDeviceAlreadyConnectedError, RobotDeviceNotConnectedError from lerobot.common.robot_devices.robots.configs import RealmanDualRobotConfig -from lerobot.common.robot_devices.robots.utils import ask_llm - +from lerobot.common.robot_devices.robots.utils import ( + ask_llm, + extract_json_from_response, + create_keyboard_listener, + start_keyboard_listener, + stop_keyboard_listener, + speak_async, + handle_llm_interaction_with_images +) class RealmanDualRobot: + """RealmanDual机器人控制类,支持双臂操作和LLM交互""" + def __init__(self, config: RealmanDualRobotConfig | None = None, **kwargs): if config is None: config = RealmanDualRobotConfig() - # Overwrite config arguments using kwargs + + # 配置初始化 self.config = replace(config, **kwargs) self.robot_type = self.config.type - self.inference_time = self.config.inference_time # if it is inference time + self.inference_time = self.config.inference_time - # build cameras + # 硬件初始化 self.cameras = make_cameras_from_configs(self.config.cameras) - # build realman motors self.piper_motors = make_motors_buses_from_configs(self.config.follower_arm) self.arm = self.piper_motors['main'] - # 初始化遥操作 + # 控制系统初始化 self._initialize_teleop() - # init state self._initialize_state() + self._initialize_keyboard_interface() + + # 状态标志 + self._shutdown_flag = False + self._llm_triggered = False def _initialize_teleop(self): - """初始化遥操作""" + """初始化遥操作控制器""" self.init_info = { 'init_joint': self.arm.init_joint_position, 'init_pose': self.arm.init_pose, 'max_gripper': self.config.max_gripper, 'min_gripper': self.config.min_gripper, 'servo_config_file': self.config.servo_config_file, - 'end_control_info': {'left': self.config.left_end_control_guid , 'right': self.config.right_end_control_guid} + 'end_control_info': { + 'left': self.config.left_end_control_guid, + 'right': self.config.right_end_control_guid + } } if not self.inference_time: self.teleop = HybridController(self.init_info) else: self.teleop = None - + def _initialize_state(self): - """初始化状态""" + """初始化状态管理""" self.joint_queue = deque(maxlen=2) self.last_endpose = self.arm.init_pose self.logs = {} self.is_connected = False + def _initialize_keyboard_interface(self): + """初始化键盘交互接口""" + self.w_pressed = False # W键触发LLM + self.q_pressed = False # Q键退出 + self.keyboard_listener = None + self._start_keyboard_listener() + + def _start_keyboard_listener(self): + """启动键盘监听器""" + def on_key_press(key_char): + if key_char == 'w': + self.w_pressed = True + print("检测到W键按下") + elif key_char == 'q': + self.q_pressed = True + print("检测到Q键按下") + + self.keyboard_listener = create_keyboard_listener(on_key_press) + success = start_keyboard_listener(self.keyboard_listener) + + if success: + print("键盘监听器启动成功 (W键:调用LLM, Q键:退出)") + else: + print("键盘监听器启动失败") + def _read_robot_state(self) -> dict: """读取机器人状态""" before_read_t = time.perf_counter() @@ -70,17 +114,15 @@ class RealmanDualRobot: return state def _execute_action(self, action: dict, state: dict): - """执行动作""" + """执行机器人动作""" before_write_t = time.perf_counter() - + if action['control_mode'] == 'joint': - # self.arm.write_action(action, state) pass else: if list(action['pose'].values()) != list(state['pose'].values()): pose = list(action['pose'].values()) self.arm.write_endpose_canfd(pose) - elif list(action['joint'].values()) != list(state['joint'].values()): target_joint = list(action['joint'].values()) self.arm.write(target_joint) @@ -88,14 +130,16 @@ class RealmanDualRobot: self.logs["write_pos_dt_s"] = time.perf_counter() - before_write_t def _prepare_record_data(self) -> Tuple[Dict, Dict]: - """准备记录数据""" + """准备记录数据 - 保持原有逻辑""" if len(self.joint_queue) < 2: return {}, {} - + state = torch.as_tensor(list(self.joint_queue[0]), dtype=torch.float32) action = torch.as_tensor(list(self.joint_queue[-1]), dtype=torch.float32) + # 捕获图像 images = self._capture_images() + # 构建输出字典 obs_dict = { "observation.state": state, @@ -103,7 +147,7 @@ class RealmanDualRobot: } action_dict = {"action": action} return obs_dict, action_dict - + def _update_state_queue(self): """更新状态队列""" current_state = self.arm.read()['joint'] @@ -114,21 +158,290 @@ class RealmanDualRobot: elif "gripper" in data: current_state_lst.append((current_state[data]-500)/500) self.joint_queue.append(current_state_lst) - + def _capture_images(self) -> Dict[str, torch.Tensor]: - """捕获图像""" + """捕获摄像头图像""" images = {} for name, camera in self.cameras.items(): before_camread_t = time.perf_counter() image = camera.async_read() images[name] = torch.from_numpy(image) - self.logs[f"read_camera_{name}_dt_s"] = camera.logs["delta_timestamp_s"] self.logs[f"async_read_camera_{name}_dt_s"] = time.perf_counter() - before_camread_t return images + def _handle_llm_interaction(self, obs_dict: Dict) -> bool: + """处理LLM交互逻辑""" + print("[W键已按下] 正在准备数据并调用LLM...") + + # 提取图像数据 + camera_images = {} + for key, value in obs_dict.items(): + if key.startswith("observation.images."): + camera_name = key.replace("observation.images.", "") + camera_images[camera_name] = value.cpu().numpy() + + # 使用utils中的函数处理LLM交互 + success, response = handle_llm_interaction_with_images( + "将超声仪左下角试管架上的蓝色试管移动到超声仪中", + camera_images + ) + + return success + + def connect(self) -> None: + """连接机器人和摄像头""" + if self.is_connected: + raise RobotDeviceAlreadyConnectedError( + "RealmanArm is already connected. Do not run `robot.connect()` twice." + ) + + # 连接机械臂 + self.arm.connect(enable=True) + print("RealmanArm conneted") + + # 连接摄像头 + for name in self.cameras: + self.cameras[name].connect() + self.is_connected = self.is_connected and self.cameras[name].is_connected + print(f"camera {name} conneted") + + print("All connected") + self.is_connected = True + self.run_calibration() + + def disconnect(self) -> None: + """断开连接""" + if self._shutdown_flag: + return + self._shutdown_flag = True + + try: + # 停止键盘监听器 + stop_keyboard_listener(self.keyboard_listener) + print("键盘监听器已停止") + + # 停止遥操作控制器 + if hasattr(self, 'teleop') and self.teleop and not self.inference_time: + self.teleop.stop() + print("遥操作控制器已停止") + + # 断开机械臂连接 + if hasattr(self, 'arm'): + try: + self.arm.safe_disconnect() + print("RealmanArm 安全断开连接") + time.sleep(2) + self.arm.connect(enable=False) + print("RealmanArm 已禁用") + except Exception as e: + print(f"断开机械臂连接时出错: {e}") + + # 断开摄像头连接 + if len(self.cameras) > 0: + for name, cam in self.cameras.items(): + try: + cam.disconnect() + print(f"Camera {name} 已断开连接") + except Exception as e: + print(f"断开相机 {name} 时出错: {e}") + + self.is_connected = False + print("所有设备已断开连接") + + except Exception as e: + print(f"断开连接时发生错误: {e}") + + def run_calibration(self): + """运行标定""" + if not self.is_connected: + raise ConnectionError() + self.arm.apply_calibration() + if not self.inference_time: + self.teleop.reset() + + # def teleop_step(self, record_data=False) -> None | tuple[dict[str, torch.Tensor], dict[str, torch.Tensor]]: + # """遥操作步骤 - 保持原有数据记录逻辑,添加LLM交互""" + # if not self.is_connected: + # raise ConnectionError() + + # if self.teleop is None and self.inference_time: + # self.teleop = HybridController(self.init_info) + + # try: + # # 检查退出条件 + # if self.q_pressed: + # print("检测到Q键,任务终止...") + # speak_async("任务已终止") + # raise KeyboardInterrupt("用户请求退出") + + # # 执行基础遥操作 + # state = self._read_robot_state() + # action = self.teleop.get_action(state) + # self._execute_action(action, state) + + # # 更新状态队列 + # self._update_state_queue() + # time.sleep(0.019) # 50Hz + + # # 处理数据记录 + # if record_data: + # data = self._prepare_record_data() + # if data[0]: # 如果有有效数据 + # # 处理LLM交互请求 + # if self.w_pressed: + # self.w_pressed = False # 重置标志位 + # self._llm_triggered = True + # success = self._handle_llm_interaction(data[0]) + # if not success: + # print("LLM交互处理失败") + + # return data + + # return None + + # except KeyboardInterrupt: + # # 重新抛出键盘中断,让上层处理 + # raise + # except Exception as e: + # logging.error(f"遥操作步骤失败: {e}") + # return None + def teleop_step(self, record_data=False) -> None | tuple[dict[str, torch.Tensor], dict[str, torch.Tensor]]: + """遥操作步骤 - 保持原有数据记录逻辑,添加LLM交互""" + if not self.is_connected: + raise ConnectionError() + + if self.teleop is None and self.inference_time: + self.teleop = HybridController(self.init_info) + + try: + # 检查退出条件 + if self.q_pressed: + print("检测到Q键,任务终止...") + speak_async("任务已终止") + raise KeyboardInterrupt("用户请求退出") + + # 执行基础遥操作 + state = self._read_robot_state() + action = self.teleop.get_action(state) + self._execute_action(action, state) + + # 更新状态队列 + self._update_state_queue() + time.sleep(0.019) # 50Hz + + # 处理数据记录 + if record_data: + data = self._prepare_record_data() + + # 处理LLM交互请求(只有在有有效数据时才处理) + if data[0] and self.w_pressed: # 如果有有效数据且W键被按下 + self.w_pressed = False # 重置标志位 + self._llm_triggered = True + success = self._handle_llm_interaction(data[0]) + if not success: + print("LLM交互处理失败") + + # 如果没有有效数据,创建默认数据 + if not data[0]: + # 创建默认的观测数据 + if len(self.joint_queue) > 0: + state_tensor = torch.as_tensor(list(self.joint_queue[-1]), dtype=torch.float32) + else: + state_tensor = torch.zeros(14, dtype=torch.float32) # 根据你的机器人调整维度 + + # 捕获当前图像 + images = self._capture_images() + + obs_dict = { + "observation.state": state_tensor, + **{f"observation.images.{name}": img for name, img in images.items()} + } + action_dict = {"action": state_tensor} # 使用相同的状态作为动作 + data = (obs_dict, action_dict) + + return data + + return None + + except KeyboardInterrupt: + # 重新抛出键盘中断,让上层处理 + raise + except Exception as e: + logging.error(f"遥操作步骤失败: {e}") + + # 即使出错,在record_data=True时也要返回有效数据 + if record_data: + # 创建紧急默认数据 + state_tensor = torch.zeros(14, dtype=torch.float32) # 根据你的机器人调整维度 + images = {} + try: + images = self._capture_images() + except: + # 如果连图像都无法捕获,创建空图像 + for camera_name in self.cameras.keys(): + images[camera_name] = torch.zeros((480, 640, 3), dtype=torch.uint8) + + obs_dict = { + "observation.state": state_tensor, + **{f"observation.images.{name}": img for name, img in images.items()} + } + action_dict = {"action": state_tensor} + return obs_dict, action_dict + + return None + + def send_action(self, action: torch.Tensor) -> torch.Tensor: + """发送动作到机器人""" + if not self.is_connected: + raise RobotDeviceNotConnectedError( + "Piper is not connected. You need to run `robot.connect()`." + ) + + target_joints = action.tolist() + len_joint = len(target_joints) - 1 + target_joints = [target_joints[i]*180 for i in range(len_joint)] + [target_joints[-1]] + target_joints[-1] = int(target_joints[-1]*500 + 500) + self.arm.write(target_joints) + + return action + + def capture_observation(self) -> dict: + """捕获当前观测""" + if not self.is_connected: + raise RobotDeviceNotConnectedError( + "Piper is not connected. You need to run `robot.connect()`." + ) + + before_read_t = time.perf_counter() + state = self.arm.read() + self.logs["read_pos_dt_s"] = time.perf_counter() - before_read_t + + state = torch.as_tensor(list(state.values()), dtype=torch.float32) + + # 读取图像 + images = {} + for name in self.cameras: + before_camread_t = time.perf_counter() + images[name] = self.cameras[name].async_read() + images[name] = torch.from_numpy(images[name]) + self.logs[f"read_camera_{name}_dt_s"] = self.cameras[name].logs["delta_timestamp_s"] + self.logs[f"async_read_camera_{name}_dt_s"] = time.perf_counter() - before_camread_t + + # 构建观测字典 + obs_dict = {} + obs_dict["observation.state"] = state + for name in self.cameras: + obs_dict[f"observation.images.{name}"] = images[name] + return obs_dict + + def teleop_safety_stop(self): + """遥操作安全停止""" + self.run_calibration() + @property def camera_features(self) -> dict: + """获取摄像头特征""" cam_ft = {} for cam_key, cam in self.cameras.items(): key = f"observation.images.{cam_key}" @@ -139,9 +452,9 @@ class RealmanDualRobot: } return cam_ft - @property def motor_features(self) -> dict: + """获取电机特征""" action_names = get_motor_names(self.piper_motors) state_names = get_motor_names(self.piper_motors) return { @@ -156,7 +469,7 @@ class RealmanDualRobot: "names": state_names, }, } - + @property def has_camera(self): return len(self.cameras) > 0 @@ -165,158 +478,51 @@ class RealmanDualRobot: def num_cameras(self): return len(self.cameras) - - def connect(self) -> None: - """Connect RealmanArm and cameras""" - if self.is_connected: - raise RobotDeviceAlreadyConnectedError( - "RealmanArm is already connected. Do not run `robot.connect()` twice." - ) - - # connect RealmanArm - self.arm.connect(enable=True) - print("RealmanArm conneted") - - # connect cameras - for name in self.cameras: - self.cameras[name].connect() - self.is_connected = self.is_connected and self.cameras[name].is_connected - print(f"camera {name} conneted") - - print("All connected") - self.is_connected = True - - self.run_calibration() - - - def disconnect(self) -> None: - """move to home position, disenable piper and cameras""" - # move piper to home position, disable - if not self.inference_time: - self.teleop.stop() - - # disconnect piper - self.arm.safe_disconnect() - print("RealmanArm disable after 5 seconds") - time.sleep(5) - self.arm.connect(enable=False) - - # disconnect cameras - if len(self.cameras) > 0: - for cam in self.cameras.values(): - cam.disconnect() - - self.is_connected = False - - - def run_calibration(self): - """move piper to the home position""" - if not self.is_connected: - raise ConnectionError() - - self.arm.apply_calibration() - if not self.inference_time: - self.teleop.reset() - - def teleop_step( - self, record_data=False - ) -> None | tuple[dict[str, torch.Tensor], dict[str, torch.Tensor]]: - if not self.is_connected: - raise ConnectionError() - - if self.teleop is None and self.inference_time: - self.teleop = HybridController(self.init_info) - - try: - # 读取当前状态 - state = self._read_robot_state() - # 获取动作 - action = self.teleop.get_action(state) - self._execute_action(action, state) - # 更新状态队列 - self._update_state_queue() - time.sleep(0.019) # 50HZ - - if record_data: - data = self._prepare_record_data() - if data[0]: - # # ask_llm("将超声仪左下角试管架上的试管移动到超声仪中", data[0]) - pass - return data - - except Exception as e: - logging.error(f"遥操作步骤失败: {e}") - return None - - - def send_action(self, action: torch.Tensor) -> torch.Tensor: - """Write the predicted actions from policy to the motors""" - if not self.is_connected: - raise RobotDeviceNotConnectedError( - "Piper is not connected. You need to run `robot.connect()`." - ) - - # send to motors, torch to list - target_joints = action.tolist() - len_joint = len(target_joints) - 1 - target_joints = [target_joints[i]*180 for i in range(len_joint)] + [target_joints[-1]] - target_joints[-1] = int(target_joints[-1]*500 + 500) - self.arm.write(target_joints) - - return action - - - def capture_observation(self) -> dict: - """capture current images and joint positions""" - if not self.is_connected: - raise RobotDeviceNotConnectedError( - "Piper is not connected. You need to run `robot.connect()`." - ) - - # read current joint positions - before_read_t = time.perf_counter() - state = self.arm.read() # 6 joints + 1 gripper - self.logs["read_pos_dt_s"] = time.perf_counter() - before_read_t - - state = torch.as_tensor(list(state.values()), dtype=torch.float32) - - # read images from cameras - images = {} - for name in self.cameras: - before_camread_t = time.perf_counter() - images[name] = self.cameras[name].async_read() - images[name] = torch.from_numpy(images[name]) - self.logs[f"read_camera_{name}_dt_s"] = self.cameras[name].logs["delta_timestamp_s"] - self.logs[f"async_read_camera_{name}_dt_s"] = time.perf_counter() - before_camread_t - - # Populate output dictionnaries and format to pytorch - obs_dict = {} - obs_dict["observation.state"] = state - for name in self.cameras: - obs_dict[f"observation.images.{name}"] = images[name] - return obs_dict - - def teleop_safety_stop(self): - """ move to home position after record one episode """ - self.run_calibration() - - def __del__(self): - if self.is_connected: - self.disconnect() - if not self.inference_time: - self.teleop.stop() + """析构函数""" + try: + if not self._shutdown_flag: + self.disconnect() + except: + pass + + +def signal_handler(signum, frame): + """信号处理器""" + print("\n收到中断信号,正在安全退出...") + sys.exit(0) if __name__ == '__main__': - robot = RealmanDualRobot() - robot.connect() - # robot.run_calibration() - while True: - robot.teleop_step(record_data=True) + signal.signal(signal.SIGINT, signal_handler) + robot = None - # robot.capture_observation() - # dummy_action = torch.Tensor([-0.40586111280653214, 0.5522833506266276, 0.4998166826036241, -0.3539944542778863, -0.524433347913954, 0.9064999898274739, 0.482]) - # robot.send_action(dummy_action) - # robot.disconnect() - # print('ok') \ No newline at end of file + try: + robot = RealmanDualRobot() + robot.connect() + + print("RealmanDual 机器人控制已启动") + print("操作说明:") + print(" - 使用手柄进行遥操作控制") + print(" - 按 W 键:调用LLM分析当前场景") + print(" - 按 Q 键:安全退出程序") + print(" - Ctrl+C:强制退出") + print("\n等待操作...") + + while True: + result = robot.teleop_step(record_data=True) + if result is None: + continue + # 这里可以处理记录的数据 + + except KeyboardInterrupt: + print("\n收到键盘中断信号") + except Exception as e: + print(f"程序运行出错: {e}") + finally: + if robot: + try: + robot.disconnect() + except: + pass + print("程序已完全退出") \ No newline at end of file diff --git a/lerobot/common/robot_devices/robots/utils.py b/lerobot/common/robot_devices/robots/utils.py index 1f740fb4..29654ecb 100644 --- a/lerobot/common/robot_devices/robots/utils.py +++ b/lerobot/common/robot_devices/robots/utils.py @@ -11,13 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - -conversation_history = [] -conversation_client = None - - from typing import Protocol, Dict - # Robot configuration imports from lerobot.common.robot_devices.robots.configs import ( AlohaRobotConfig, @@ -33,39 +27,29 @@ from lerobot.common.robot_devices.robots.configs import ( RealmanRobotConfig, RealmanDualRobotConfig ) - # Added library imports for LLM interaction -from openai import OpenAI -import base64 import os -import cv2 -import torch +import base64 +from io import BytesIO +import json +from openai import OpenAI +from PIL import Image import numpy as np -from pynput import keyboard -import time -import json -from datetime import datetime - +import torch def get_arm_id(name, arm_type): """Returns the string identifier of a robot arm.""" return f"{name}_{arm_type}" - - class Robot(Protocol): robot_type: str features: dict cameras: Dict - def connect(self): ... def run_calibration(self): ... def teleop_step(self, record_data=False): ... def capture_observation(self) -> Dict: ... def send_action(self, action): ... def disconnect(self): ... - - def make_robot_config(robot_type: str, **kwargs) -> RobotConfig: - # ... (此函数内容保持不变) ... if robot_type == "aloha": return AlohaRobotConfig(**kwargs) elif robot_type == "koch": @@ -88,10 +72,7 @@ def make_robot_config(robot_type: str, **kwargs) -> RobotConfig: return RealmanDualRobotConfig(**kwargs) else: raise ValueError(f"Robot type '{robot_type}' is not available.") - - def make_robot_from_config(config: RobotConfig): - # ... (此函数内容保持不变) ... if isinstance(config, ManipulatorRobotConfig): from lerobot.common.robot_devices.robots.manipulator import ManipulatorRobot return ManipulatorRobot(config) @@ -107,37 +88,299 @@ def make_robot_from_config(config: RobotConfig): else: from lerobot.common.robot_devices.robots.stretch import StretchRobot return StretchRobot(config) - - def make_robot(robot_type: str, **kwargs) -> Robot: config = make_robot_config(robot_type, **kwargs) return make_robot_from_config(config) - -# -------------------- LLM 交互功能区 -------------------- - -def encode_image_to_base64(image_tensor: torch.Tensor) -> str: - """将PyTorch张量格式的图像编码为Base64字符串""" - image_np = image_tensor.cpu().numpy().astype(np.uint8) - image_bgr = cv2.cvtColor(image_np, cv2.COLOR_RGB2BGR) - success, buffer = cv2.imencode(".jpg", image_bgr) - if not success: - raise ValueError("图像编码失败") - return base64.b64encode(buffer).decode("utf-8") - -# 在文件顶部添加全局变量来管理会话 +# 全局变量,用于管理会话历史和API客户端 conversation_history = [] conversation_client = None +def reset_conversation_history(): + """清空对话历史,开始一个新任务。""" + global conversation_history + conversation_history = [] + print("对话历史已重置。") + +def extract_json_from_response(response: str) -> dict: + """ + 从LLM响应中提取JSON格式的指令 - 从realman_dual.py移动过来 + + Args: + response: LLM的原始响应文本 + + Returns: + dict: 解析后的JSON指令字典 + """ + try: + # 尝试直接解析整个响应 + return json.loads(response) + except: + # 尝试从响应中查找JSON部分 + import re + json_pattern = r'\{[^{}]*\}' + matches = re.findall(json_pattern, response) + + for match in matches: + try: + return json.loads(match) + except: + continue + + # 如果无法解析JSON,返回默认格式 + return { + "action": "unknown", + "description": response, + "status": "parse_error" + } def ask_llm(query: str, state: dict): - - prompt = """ """ + """ + 向大型语言模型发送查询,并获取下一步操作指令。 + 优化版本:只保留最新的4张图片,避免Token超限 + """ + global conversation_client + global conversation_history - api_key = os.getenv("OPENAI_API_KEY") - base_url = os.getenv("OPENAI_BASE_URL") - client = OpenAI(api_key=api_key, base_url=base_url) - # keys = [key for key in state] - # import pdb - # pdb.set_trace() + # 将NumPy数组转换为Base64编码的图片 + def numpy_to_base64(img_array): + if img_array.dtype != np.uint8: + if img_array.max() <= 1.0: + img_array = (img_array * 255).astype(np.uint8) + else: + img_array = img_array.astype(np.uint8) + image = Image.fromarray(img_array) + buffered = BytesIO() + image.save(buffered, format="PNG") + img_base64 = base64.b64encode(buffered.getvalue()).decode('utf-8') + return img_base64 + + # 初始化API客户端 + api_key = "sk-H5FY8bQn6ZwBCja56280C3A4C8824017A4CdB683Dc990e35" + base_url = "https://api.apiyi.com/v1" + + # if not api_key: + # raise ValueError("API Key为空!请直接在代码中填写。") + + if conversation_client is None: + conversation_client = OpenAI(api_key=api_key, base_url=base_url) + + # 仅在对话历史为空时添加系统提示 + if not conversation_history: + system_prompt = f""" +你是一个聪明的双臂机器人助手: +**机器人配置:** +- 左臂:配备夹爪,可以抓取物体 +- 右臂:没有夹爪,主要用于辅助定位和观察 +- 四个摄像头:high camera(俯视角)、front camera(正视角)、left camera(左臂视角)、right camera(右臂视角) +**任务目标:** {query} +**工作流程:** +1. 我向你展示当前4个摄像头的画面 +2. 你分析场景,给出下一步具体操作指令 +3. 我执行你的指令后,再次更新画面 +4. 重复w此过程直到完成任务 +**重要要求:** +- 每次只给出ONE STEP最关键的操作指令 +- 指令要具体明确,便于执行(如"将左臂夹爪移动到试管正上方5cm处,准备下降抓取") +- 当视角不清晰时,要求调整摄像头位置 +- 左臂负责抓取,右臂负责辅助观察和定位 +- 给出的指令必须可执行,避免模糊描述 +**输出格式要求:** +- 使用纯文本输出,不要使用任何Markdown格式符号 +- 不要使用星号、井号、下划线等格式化符号 +- 直接给出简洁分析和具体操作指令 +- 输出内容适合语音播报 +**输出格式:** +简洁分析 + 具体操作指令 + """ + conversation_history.append({"role": "system", "content": system_prompt}) + + # 如果对话历史中有图片消息,只保留最近的一轮对话(系统消息 + 最后一轮用户-助手对话) + if len(conversation_history) > 3: # 系统消息 + 用户消息 + 助手回复 + # 保留系统消息 + system_msg = conversation_history[0] + # 只保留最后一轮对话(最后的用户消息和助手回复) + recent_messages = conversation_history[-2:] # 最后2条消息 + conversation_history = [system_msg] + recent_messages + print("清理对话历史,避免Token超限") + + # 构建本次用户输入的消息内容 + user_content = [{"type": "text", "text": "这是当前的4个摄像头画面,请根据任务目标分析场景并给出下一步具体操作指令。"}] + + # 添加图片,按照指定顺序 + camera_order = ['high', 'front', 'left', 'right'] + for camera_name in camera_order: + if camera_name in state: + img_base64 = numpy_to_base64(state[camera_name]) + user_content.append({ + "type": "image_url", + "image_url": { + "url": f"data:image/png;base64,{img_base64}", + "detail": "high" # 使用高清晰度分析 + } + }) + + # 将当前用户输入添加到对话历史中 + conversation_history.append({"role": "user", "content": user_content}) + + try: + completion = conversation_client.chat.completions.create( + model="claude-sonnet-4-20250514", + messages=conversation_history, + max_tokens=800, + temperature=0.3 + ) + + response_message = completion.choices[0].message + response_content = response_message.content + + # 将模型的响应添加到历史中 + conversation_history.append(response_message) + + print(f"机器人响应: {response_content}") + return response_content + + except Exception as e: + error_msg = str(e) + print(f"调用LLM时出错: {error_msg}") + + # Token超限时的特殊处理 + if "token_limit_exceeded" in error_msg or "context_length_exceeded" in error_msg: + print("检测到Token超限,清空对话历史重新开始...") + # 清空除系统消息外的所有历史 + if conversation_history: + conversation_history = [conversation_history[0]] # 只保留系统消息 + # 移除刚刚添加的用户消息,准备重试 + if conversation_history and len(conversation_history) > 1: + conversation_history.pop() + return "由于对话历史过长,已重置对话。请再次按Enter键继续。" + + # 其他错误时也要清理最后的用户输入 + if conversation_history and conversation_history[-1]["role"] == "user": + conversation_history.pop() + return None + +def extract_json_from_response(response: str) -> dict: + """ + 从LLM响应中提取结构化指令 - 优化版本 + """ + try: + # 尝试直接解析整个响应 + return json.loads(response) + except: + # 如果无法解析JSON,创建结构化的响应 + return { + "action": "move_arm", # 默认动作类型 + "description": response.strip(), + "arm": "left", # 默认使用左臂 + "target": "unknown", + "status": "ready" + } - pass \ No newline at end of file +from pynput import keyboard +import threading + +def create_keyboard_listener(on_key_press_callback): + """ + 创建键盘监听器 + + Args: + on_key_press_callback: 按键回调函数,接收参数 (key_char) + + Returns: + keyboard.Listener: 键盘监听器对象 + """ + def on_press(key): + try: + if hasattr(key, 'char') and key.char: + on_key_press_callback(key.char.lower()) + except AttributeError: + pass + + return keyboard.Listener(on_press=on_press) + +def start_keyboard_listener(listener): + """启动键盘监听器""" + try: + listener.start() + return True + except Exception as e: + print(f"键盘监听器启动失败: {e}") + return False + +def stop_keyboard_listener(listener): + """停止键盘监听器""" + try: + if listener and listener.is_alive(): + listener.stop() + return True + except Exception: + pass + return False + +def speak_async(text: str): + """异步语音播报""" + from lerobot.common.utils.utils import say + + def speak_thread(): + try: + print(f"开始语音播报: {text}") + say(text, blocking=True) + print("语音播报完成") + except Exception as e: + print(f"语音播报失败: {e}") + + thread = threading.Thread(target=speak_thread, daemon=True) + thread.start() + +def handle_llm_interaction_with_images(query: str, camera_images: dict): + """ + 处理LLM交互,接收相机图像字典 + + Args: + query: 用户查询文本 + camera_images: 相机图像字典 {camera_name: numpy_array} + + Returns: + tuple: (success: bool, response: str) + """ + if len(camera_images) != 4: + print(f"警告: 期望4个相机,实际获取到{len(camera_images)}个") + return False, "相机数量不足" + + print("图像提取成功,正在向LLM发送请求...") + + try: + response = ask_llm(query, camera_images) + if response: + if "已重置对话" in response: + print("对话历史已重置,请再次按键继续") + return True, response + + instruction = extract_json_from_response(response) + print("\n下一步指令:") + print(f"操作类型: {instruction.get('action', 'unknown')}") + print(f"详细描述: {instruction.get('description', '无描述')}") + + # 准备语音播报内容 + if "description" in instruction: + desc = instruction['description'] + if len(desc) > 100: + sentences = desc.split('。') + key_instruction = sentences[-2] if len(sentences) > 1 else desc[:50] + else: + key_instruction = desc + speech_text = f"下一步操作:{key_instruction}" + print(f"即将播报: {speech_text}") + speak_async(speech_text) + + return True, response + else: + print("LLM请求失败,可能是网络问题") + speak_async("LLM请求失败") + return False, "LLM请求失败" + + except Exception as e: + print(f"LLM交互出错: {e}") + print("建议:1.检查网络连接 2.检查API密钥 3.稍后重试") + speak_async("LLM交互失败") + return False, f"LLM交互出错: {e}" \ No newline at end of file diff --git a/lerobot/common/utils/utils.py b/lerobot/common/utils/utils.py index b72fde50..da280014 100644 --- a/lerobot/common/utils/utils.py +++ b/lerobot/common/utils/utils.py @@ -177,8 +177,9 @@ def say(text, blocking=False): elif system == "Linux": # cmd = ["spd-say", text] cmd = ["edge-playback", "-t", text] - if blocking: - cmd.append("--wait") +################## 修改:注释这部分 ########################### + # if blocking: + # cmd.append("--wait") elif system == "Windows": cmd = [