From 1752309c77c720591a70ffb0dbca8c879ab3d01c Mon Sep 17 00:00:00 2001
From: yutang
Date: Wed, 20 Aug 2025 21:45:22 +0800
Subject: [PATCH] Voice playback changes
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../common/robot_devices/robots/configs.py    |   2 +-
 .../robot_devices/robots/llm_prompt.yaml      |  25 ++
 .../robot_devices/robots/realman_dual.py      | 157 +++----
 lerobot/common/robot_devices/robots/utils.py  | 337 +++++-------------
 .../common/robot_devices/teleop/gamepad.py    |   2 +-
 realman.md                                    |  16 +-
 6 files changed, 174 insertions(+), 365 deletions(-)
 create mode 100644 lerobot/common/robot_devices/robots/llm_prompt.yaml

diff --git a/lerobot/common/robot_devices/robots/configs.py b/lerobot/common/robot_devices/robots/configs.py
index b7e573fb..8b834653 100644
--- a/lerobot/common/robot_devices/robots/configs.py
+++ b/lerobot/common/robot_devices/robots/configs.py
@@ -756,7 +756,7 @@ class RealmanDualRobotConfig(RobotConfig):
     min_gripper: int = 10
     servo_config_file: str = "/home/maic/LYT/lerobot/lerobot/common/robot_devices/teleop/servo_dual.yaml"
     left_end_control_guid: str = '0300b14bff1100003708000010010000'
-    right_end_control_guid: str = '030003f05e0400008e02000010010000'
+    right_end_control_guid: str = '0300e639bc2000007f50000011010000'
 
     follower_arm: dict[str, MotorsBusConfig] = field(
         default_factory=lambda: {
diff --git a/lerobot/common/robot_devices/robots/llm_prompt.yaml b/lerobot/common/robot_devices/robots/llm_prompt.yaml
new file mode 100644
index 00000000..f6c9cbdd
--- /dev/null
+++ b/lerobot/common/robot_devices/robots/llm_prompt.yaml
@@ -0,0 +1,25 @@
+system_prompt: |
+  你是一个聪明的双臂机器人助手:
+  **机器人配置:**
+  - 左臂:配备夹爪,可以抓取物体
+  - 右臂:没有夹爪,主要用于辅助定位和观察
+  - 四个摄像头:high camera(俯视角)、front camera(正视角)、left camera(左臂视角)、right camera(右臂视角)
+  **任务目标:** {query}
+  **工作流程:**
+  1. 我向你展示当前4个摄像头的画面
+  2. 你分析场景,给出下一步具体操作指令
+  3. 我执行你的指令后,再次更新画面
+  4. 重复此过程直到完成任务
+  **重要要求:**
+  - 每次只给出ONE STEP最关键的操作指令
+  - 指令要具体明确,便于执行(如"将左臂夹爪移动到试管正上方5cm处,准备下降抓取")
+  - 当视角不清晰时,要求调整摄像头位置
+  - 左臂负责抓取,右臂负责辅助观察和定位
+  - 给出的指令必须可执行,避免模糊描述
+  **输出格式要求:**
+  - 使用纯文本输出,不要使用任何Markdown格式符号
+  - 不要使用星号、井号、下划线等格式化符号
+  - 直接给出简洁分析和具体操作指令
+  - 输出内容适合语音播报
+  **输出格式:**
+  简洁分析 + 具体操作指令
\ No newline at end of file
diff --git a/lerobot/common/robot_devices/robots/realman_dual.py b/lerobot/common/robot_devices/robots/realman_dual.py
index cec45022..b3b90dda 100644
--- a/lerobot/common/robot_devices/robots/realman_dual.py
+++ b/lerobot/common/robot_devices/robots/realman_dual.py
@@ -1,7 +1,7 @@
+
 """
 Teleoperation Realman with a PS5 controller and LLM interaction
 """
-
 import time
 import torch
 import numpy as np
@@ -11,7 +11,6 @@ from dataclasses import dataclass, field, replace
 from collections import deque
 import signal
 import sys
-
 from lerobot.common.robot_devices.teleop.realman_aloha_dual import HybridController
 from lerobot.common.robot_devices.motors.utils import get_motor_names, make_motors_buses_from_configs
 from lerobot.common.robot_devices.cameras.utils import make_cameras_from_configs
@@ -54,6 +53,7 @@ class RealmanDualRobot:
         self._shutdown_flag = False
         self._llm_triggered = False
 
+
     def _initialize_teleop(self):
         """初始化遥操作控制器"""
         self.init_info = {
@@ -95,11 +95,9 @@ class RealmanDualRobot:
                 print("检测到W键按下")
             elif key_char == 'q':
                 self.q_pressed = True
-                print("检测到Q键按下")
-
+                print("检测到Q键按下")
         self.keyboard_listener = create_keyboard_listener(on_key_press)
-        success = start_keyboard_listener(self.keyboard_listener)
-
+        success = start_keyboard_listener(self.keyboard_listener)
         if success:
             print("键盘监听器启动成功 (W键:调用LLM, Q键:退出)")
         else:
@@ -115,8 +113,7 @@ class RealmanDualRobot:
 
     def _execute_action(self, action: dict, state: dict):
         """执行机器人动作"""
-        before_write_t = time.perf_counter()
-
+        before_write_t = time.perf_counter()
         if action['control_mode'] == 'joint':
             pass
         else:
@@ -125,21 +122,17 @@ class RealmanDualRobot:
                 self.arm.write_endpose_canfd(pose)
             elif list(action['joint'].values()) != list(state['joint'].values()):
                 target_joint = list(action['joint'].values())
-                self.arm.write(target_joint)
-
+                self.arm.write(target_joint)
         self.logs["write_pos_dt_s"] = time.perf_counter() - before_write_t
 
     def _prepare_record_data(self) -> Tuple[Dict, Dict]:
         """准备记录数据 - 保持原有逻辑"""
         if len(self.joint_queue) < 2:
-            return {}, {}
-
+            return {}, {}
         state = torch.as_tensor(list(self.joint_queue[0]), dtype=torch.float32)
-        action = torch.as_tensor(list(self.joint_queue[-1]), dtype=torch.float32)
-
+        action = torch.as_tensor(list(self.joint_queue[-1]), dtype=torch.float32)
         # 捕获图像
-        images = self._capture_images()
-
+        images = self._capture_images()
         # 构建输出字典
         obs_dict = {
             "observation.state": state,
@@ -170,24 +163,23 @@ class RealmanDualRobot:
             self.logs[f"async_read_camera_{name}_dt_s"] = time.perf_counter() - before_camread_t
         return images
 
-    def _handle_llm_interaction(self, obs_dict: Dict) -> bool:
-        """处理LLM交互逻辑"""
-        print("[W键已按下] 正在准备数据并调用LLM...")
+    # def _handle_llm_interaction(self, obs_dict: Dict) -> bool:
+    #     """处理LLM交互逻辑"""
+    #     print("[W键已按下] 正在准备数据并调用LLM...")
 
-        # 提取图像数据
-        camera_images = {}
-        for key, value in obs_dict.items():
-            if key.startswith("observation.images."):
-                camera_name = key.replace("observation.images.", "")
-                camera_images[camera_name] = value.cpu().numpy()
+    #     # 提取图像数据
+    #     camera_images = {}
+    #     for key, value in obs_dict.items():
+    #         if key.startswith("observation.images."):
+    #             camera_name = key.replace("observation.images.", "")
+    #             camera_images[camera_name] = value.cpu().numpy()
 
-        # 使用utils中的函数处理LLM交互
-        success, response = handle_llm_interaction_with_images(
-            "将超声仪左下角试管架上的蓝色试管移动到超声仪中",
-            camera_images
-        )
-
-        return success
+    #     # 使用utils中的函数处理LLM交互
+    #     success, response = handle_llm_interaction_with_images(
+    #         "将超声仪左下角试管架上的蓝色试管移动到超声仪中",
+    #         camera_images
+    #     )
+    #     return success
 
     def connect(self) -> None:
         """连接机器人和摄像头"""
         if self.is_connected:
             raise RobotDeviceAlreadyConnectedError(
                 "RealmanArm is already connected. Do not run `robot.connect()` twice."
             )
-
-        # 连接机械臂
+        # 连接机械臂
         self.arm.connect(enable=True)
         print("RealmanArm conneted")
 
@@ -247,8 +238,7 @@ class RealmanDualRobot:
                     print(f"断开相机 {name} 时出错: {e}")
 
             self.is_connected = False
-            print("所有设备已断开连接")
-
+            print("所有设备已断开连接")
         except Exception as e:
             print(f"断开连接时发生错误: {e}")
 
@@ -260,52 +250,6 @@ class RealmanDualRobot:
         if not self.inference_time:
             self.teleop.reset()
 
-    # def teleop_step(self, record_data=False) -> None | tuple[dict[str, torch.Tensor], dict[str, torch.Tensor]]:
-    #     """遥操作步骤 - 保持原有数据记录逻辑,添加LLM交互"""
-    #     if not self.is_connected:
-    #         raise ConnectionError()
-
-    #     if self.teleop is None and self.inference_time:
-    #         self.teleop = HybridController(self.init_info)
-
-    #     try:
-    #         # 检查退出条件
-    #         if self.q_pressed:
-    #             print("检测到Q键,任务终止...")
-    #             speak_async("任务已终止")
-    #             raise KeyboardInterrupt("用户请求退出")
-
-    #         # 执行基础遥操作
-    #         state = self._read_robot_state()
-    #         action = self.teleop.get_action(state)
-    #         self._execute_action(action, state)
-
-    #         # 更新状态队列
-    #         self._update_state_queue()
-    #         time.sleep(0.019)  # 50Hz
-
-    #         # 处理数据记录
-    #         if record_data:
-    #             data = self._prepare_record_data()
-    #             if data[0]:  # 如果有有效数据
-    #                 # 处理LLM交互请求
-    #                 if self.w_pressed:
-    #                     self.w_pressed = False  # 重置标志位
-    #                     self._llm_triggered = True
-    #                     success = self._handle_llm_interaction(data[0])
-    #                     if not success:
-    #                         print("LLM交互处理失败")
-
-    #                 return data
-
-    #         return None
-
-    #     except KeyboardInterrupt:
-    #         # 重新抛出键盘中断,让上层处理
-    #         raise
-    #     except Exception as e:
-    #         logging.error(f"遥操作步骤失败: {e}")
-    #         return None
     def teleop_step(self, record_data=False) -> None | tuple[dict[str, torch.Tensor], dict[str, torch.Tensor]]:
         """遥操作步骤 - 保持原有数据记录逻辑,添加LLM交互"""
         if not self.is_connected:
@@ -336,9 +280,19 @@ class RealmanDualRobot:
 
                 # 处理LLM交互请求(只有在有有效数据时才处理)
                 if data[0] and self.w_pressed:  # 如果有有效数据且W键被按下
-                    self.w_pressed = False  # 重置标志位
-                    self._llm_triggered = True
-                    success = self._handle_llm_interaction(data[0])
+                    self.w_pressed = False
+                    camera_images = {
+                        name.replace("observation.images.", ""): img.cpu().numpy()
+                        for name, img in data[0].items()
+                        if name.startswith("observation.images.")
+                    }
+                    success, resp = handle_llm_interaction_with_images(
+                        "将超声仪左下角试管架上的蓝色试管移动到超声仪中",
+                        camera_images
+                    )
+                    # self.w_pressed = False  # 重置标志位
+                    # self._llm_triggered = True
+                    # success = self._handle_llm_interaction(data[0])
                     if not success:
                         print("LLM交互处理失败")
 
@@ -352,16 +306,13 @@ class RealmanDualRobot:
 
                 # 捕获当前图像
                 images = self._capture_images()
-
                 obs_dict = {
                     "observation.state": state_tensor,
                     **{f"observation.images.{name}": img for name, img in images.items()}
                 }
                 action_dict = {"action": state_tensor}  # 使用相同的状态作为动作
-                data = (obs_dict, action_dict)
-
-                return data
-
+                data = (obs_dict, action_dict)
+                return data
             return None
 
         except KeyboardInterrupt:
@@ -380,15 +331,13 @@ class RealmanDualRobot:
             except:
                 # 如果连图像都无法捕获,创建空图像
                 for camera_name in self.cameras.keys():
-                    images[camera_name] = torch.zeros((480, 640, 3), dtype=torch.uint8)
-
+                    images[camera_name] = torch.zeros((480, 640, 3), dtype=torch.uint8)
             obs_dict = {
                 "observation.state": state_tensor,
                 **{f"observation.images.{name}": img for name, img in images.items()}
             }
             action_dict = {"action": state_tensor}
-            return obs_dict, action_dict
-
+            return obs_dict, action_dict
         return None
 
     def send_action(self, action: torch.Tensor) -> torch.Tensor:
@@ -402,8 +351,7 @@ class RealmanDualRobot:
         len_joint = len(target_joints) - 1
         target_joints = [target_joints[i]*180 for i in range(len_joint)] + [target_joints[-1]]
         target_joints[-1] = int(target_joints[-1]*500 + 500)
-        self.arm.write(target_joints)
-
+        self.arm.write(target_joints)
         return action
 
     def capture_observation(self) -> dict:
@@ -411,14 +359,12 @@ class RealmanDualRobot:
         if not self.is_connected:
             raise RobotDeviceNotConnectedError(
                 "Piper is not connected. You need to run `robot.connect()`."
-            )
-
+            )
         before_read_t = time.perf_counter()
         state = self.arm.read()
         self.logs["read_pos_dt_s"] = time.perf_counter() - before_read_t
-        state = torch.as_tensor(list(state.values()), dtype=torch.float32)
-
+        state = torch.as_tensor(list(state.values()), dtype=torch.float32)
         # 读取图像
         images = {}
         for name in self.cameras:
@@ -426,8 +372,7 @@ class RealmanDualRobot:
             images[name] = self.cameras[name].async_read()
             images[name] = torch.from_numpy(images[name])
             self.logs[f"read_camera_{name}_dt_s"] = self.cameras[name].logs["delta_timestamp_s"]
-            self.logs[f"async_read_camera_{name}_dt_s"] = time.perf_counter() - before_camread_t
-
+            self.logs[f"async_read_camera_{name}_dt_s"] = time.perf_counter() - before_camread_t
         # 构建观测字典
         obs_dict = {}
         obs_dict["observation.state"] = state
@@ -438,7 +383,6 @@ class RealmanDualRobot:
     def teleop_safety_stop(self):
         """遥操作安全停止"""
         self.run_calibration()
-
     @property
     def camera_features(self) -> dict:
         """获取摄像头特征"""
@@ -486,13 +430,11 @@ class RealmanDualRobot:
         except:
             pass
 
-
 def signal_handler(signum, frame):
     """信号处理器"""
     print("\n收到中断信号,正在安全退出...")
     sys.exit(0)
 
-
 if __name__ == '__main__':
     signal.signal(signal.SIGINT, signal_handler)
     robot = None
@@ -513,8 +455,7 @@ if __name__ == '__main__':
             result = robot.teleop_step(record_data=True)
             if result is None:
                 continue
-            # 这里可以处理记录的数据
-
+            # 这里可以处理记录的数据
     except KeyboardInterrupt:
         print("\n收到键盘中断信号")
     except Exception as e:
@@ -525,4 +466,4 @@ if __name__ == '__main__':
             robot.disconnect()
         except:
             pass
-    print("程序已完全退出")
\ No newline at end of file
+    print("程序已完全退出")
diff --git a/lerobot/common/robot_devices/robots/utils.py b/lerobot/common/robot_devices/robots/utils.py
index 29654ecb..62643ffc 100644
--- a/lerobot/common/robot_devices/robots/utils.py
+++ b/lerobot/common/robot_devices/robots/utils.py
@@ -13,6 +13,20 @@
 # limitations under the License.
 from typing import Protocol, Dict
 # Robot configuration imports
+import os
+import re
+import json
+import base64
+import yaml
+import threading
+from io import BytesIO
+from typing import Protocol, Dict
+import numpy as np
+from openai import OpenAI
+from PIL import Image
+from pynput import keyboard
+import torch
+# Robot config imports
 from lerobot.common.robot_devices.robots.configs import (
     AlohaRobotConfig,
     KochBimanualRobotConfig,
@@ -27,28 +41,23 @@ from lerobot.common.robot_devices.robots.configs import (
     RealmanRobotConfig,
     RealmanDualRobotConfig
 )
-# Added library imports for LLM interaction
-import os
-import base64
-from io import BytesIO
-import json
-from openai import OpenAI
-from PIL import Image
-import numpy as np
-import torch
-def get_arm_id(name, arm_type):
-    """Returns the string identifier of a robot arm."""
-    return f"{name}_{arm_type}"
+
+
 class Robot(Protocol):
     robot_type: str
     features: dict
-    cameras: Dict
+    cameras: Dict
     def connect(self): ...
     def run_calibration(self): ...
    def teleop_step(self, record_data=False): ...
     def capture_observation(self) -> Dict: ...
     def send_action(self, action): ...
     def disconnect(self): ...
+
+
+def get_arm_id(name, arm_type) -> str:
+    """返回机器人手臂ID"""
+    return f"{name}_{arm_type}"
+
 def make_robot_config(robot_type: str, **kwargs) -> RobotConfig:
     if robot_type == "aloha":
         return AlohaRobotConfig(**kwargs)
@@ -72,7 +81,9 @@ def make_robot_config(robot_type: str, **kwargs) -> RobotConfig:
         return RealmanDualRobotConfig(**kwargs)
     else:
         raise ValueError(f"Robot type '{robot_type}' is not available.")
+
 def make_robot_from_config(config: RobotConfig):
+    """根据配置生成机器人实例"""
     if isinstance(config, ManipulatorRobotConfig):
         from lerobot.common.robot_devices.robots.manipulator import ManipulatorRobot
         return ManipulatorRobot(config)
@@ -88,227 +99,55 @@ def make_robot_from_config(config: RobotConfig):
     else:
         from lerobot.common.robot_devices.robots.stretch import StretchRobot
         return StretchRobot(config)
+
 def make_robot(robot_type: str, **kwargs) -> Robot:
+    """指定类型构造机器人"""
     config = make_robot_config(robot_type, **kwargs)
     return make_robot_from_config(config)
-# 全局变量,用于管理会话历史和API客户端
-conversation_history = []
-conversation_client = None
-def reset_conversation_history():
-    """清空对话历史,开始一个新任务。"""
-    global conversation_history
-    conversation_history = []
-    print("对话历史已重置。")
+
+def numpy_to_base64(img_array: np.ndarray) -> str:
+    """numpy数组转Base64 PNG图像"""
+    if img_array.dtype != np.uint8:
+        if img_array.max() <= 1.0:
+            img_array = (img_array * 255).astype(np.uint8)
+        else:
+            img_array = img_array.astype(np.uint8)
+    buffered = BytesIO()
+    Image.fromarray(img_array).save(buffered, format="PNG")
+    return base64.b64encode(buffered.getvalue()).decode('utf-8')
 
 def extract_json_from_response(response: str) -> dict:
-    """
-    从LLM响应中提取JSON格式的指令 - 从realman_dual.py移动过来
-
-    Args:
-        response: LLM的原始响应文本
-
-    Returns:
-        dict: 解析后的JSON指令字典
-    """
+    """解析LLM返回的结构化指令"""
     try:
-        # 尝试直接解析整个响应
         return json.loads(response)
     except:
-        # 尝试从响应中查找JSON部分
-        import re
-        json_pattern = r'\{[^{}]*\}'
-        matches = re.findall(json_pattern, response)
-
-        for match in matches:
-            try:
-                return json.loads(match)
-            except:
-                continue
-
-        # 如果无法解析JSON,返回默认格式
         return {
-            "action": "unknown",
-            "description": response,
-            "status": "parse_error"
-        }
-
-def ask_llm(query: str, state: dict):
-    """
-    向大型语言模型发送查询,并获取下一步操作指令。
-    优化版本:只保留最新的4张图片,避免Token超限
-    """
-    global conversation_client
-    global conversation_history
-
-    # 将NumPy数组转换为Base64编码的图片
-    def numpy_to_base64(img_array):
-        if img_array.dtype != np.uint8:
-            if img_array.max() <= 1.0:
-                img_array = (img_array * 255).astype(np.uint8)
-            else:
-                img_array = img_array.astype(np.uint8)
-        image = Image.fromarray(img_array)
-        buffered = BytesIO()
-        image.save(buffered, format="PNG")
-        img_base64 = base64.b64encode(buffered.getvalue()).decode('utf-8')
-        return img_base64
-
-    # 初始化API客户端
-    api_key = "sk-H5FY8bQn6ZwBCja56280C3A4C8824017A4CdB683Dc990e35"
-    base_url = "https://api.apiyi.com/v1"
-
-    # if not api_key:
-    #     raise ValueError("API Key为空!请直接在代码中填写。")
-
-    if conversation_client is None:
-        conversation_client = OpenAI(api_key=api_key, base_url=base_url)
-
-    # 仅在对话历史为空时添加系统提示
-    if not conversation_history:
-        system_prompt = f"""
-你是一个聪明的双臂机器人助手:
-**机器人配置:**
-- 左臂:配备夹爪,可以抓取物体
-- 右臂:没有夹爪,主要用于辅助定位和观察
-- 四个摄像头:high camera(俯视角)、front camera(正视角)、left camera(左臂视角)、right camera(右臂视角)
-**任务目标:** {query}
-**工作流程:**
-1. 我向你展示当前4个摄像头的画面
-2. 你分析场景,给出下一步具体操作指令
-3. 我执行你的指令后,再次更新画面
-4. 重复w此过程直到完成任务
-**重要要求:**
-- 每次只给出ONE STEP最关键的操作指令
-- 指令要具体明确,便于执行(如"将左臂夹爪移动到试管正上方5cm处,准备下降抓取")
-- 当视角不清晰时,要求调整摄像头位置
-- 左臂负责抓取,右臂负责辅助观察和定位
-- 给出的指令必须可执行,避免模糊描述
-**输出格式要求:**
-- 使用纯文本输出,不要使用任何Markdown格式符号
-- 不要使用星号、井号、下划线等格式化符号
-- 直接给出简洁分析和具体操作指令
-- 输出内容适合语音播报
-**输出格式:**
-简洁分析 + 具体操作指令
-    """
-        conversation_history.append({"role": "system", "content": system_prompt})
-
-    # 如果对话历史中有图片消息,只保留最近的一轮对话(系统消息 + 最后一轮用户-助手对话)
-    if len(conversation_history) > 3:  # 系统消息 + 用户消息 + 助手回复
-        # 保留系统消息
-        system_msg = conversation_history[0]
-        # 只保留最后一轮对话(最后的用户消息和助手回复)
-        recent_messages = conversation_history[-2:]  # 最后2条消息
-        conversation_history = [system_msg] + recent_messages
-        print("清理对话历史,避免Token超限")
-
-    # 构建本次用户输入的消息内容
-    user_content = [{"type": "text", "text": "这是当前的4个摄像头画面,请根据任务目标分析场景并给出下一步具体操作指令。"}]
-
-    # 添加图片,按照指定顺序
-    camera_order = ['high', 'front', 'left', 'right']
-    for camera_name in camera_order:
-        if camera_name in state:
-            img_base64 = numpy_to_base64(state[camera_name])
-            user_content.append({
-                "type": "image_url",
-                "image_url": {
-                    "url": f"data:image/png;base64,{img_base64}",
-                    "detail": "high"  # 使用高清晰度分析
-                }
-            })
-
-    # 将当前用户输入添加到对话历史中
-    conversation_history.append({"role": "user", "content": user_content})
-
-    try:
-        completion = conversation_client.chat.completions.create(
-            model="claude-sonnet-4-20250514",
-            messages=conversation_history,
-            max_tokens=800,
-            temperature=0.3
-        )
-
-        response_message = completion.choices[0].message
-        response_content = response_message.content
-
-        # 将模型的响应添加到历史中
-        conversation_history.append(response_message)
-
-        print(f"机器人响应: {response_content}")
-        return response_content
-
-    except Exception as e:
-        error_msg = str(e)
-        print(f"调用LLM时出错: {error_msg}")
-
-        # Token超限时的特殊处理
-        if "token_limit_exceeded" in error_msg or "context_length_exceeded" in error_msg:
-            print("检测到Token超限,清空对话历史重新开始...")
-            # 清空除系统消息外的所有历史
-            if conversation_history:
-                conversation_history = [conversation_history[0]]  # 只保留系统消息
-            # 移除刚刚添加的用户消息,准备重试
-            if conversation_history and len(conversation_history) > 1:
-                conversation_history.pop()
-            return "由于对话历史过长,已重置对话。请再次按Enter键继续。"
-
-        # 其他错误时也要清理最后的用户输入
-        if conversation_history and conversation_history[-1]["role"] == "user":
-            conversation_history.pop()
-        return None
-
-def extract_json_from_response(response: str) -> dict:
-    """
-    从LLM响应中提取结构化指令 - 优化版本
-    """
-    try:
-        # 尝试直接解析整个响应
-        return json.loads(response)
-    except:
-        # 如果无法解析JSON,创建结构化的响应
-        return {
-            "action": "move_arm",  # 默认动作类型
+            "action": "move_arm",
             "description": response.strip(),
-            "arm": "left",  # 默认使用左臂
+            "arm": "left",
             "target": "unknown",
             "status": "ready"
         }
-
-from pynput import keyboard
-import threading
-
 def create_keyboard_listener(on_key_press_callback):
-    """
-    创建键盘监听器
-
-    Args:
-        on_key_press_callback: 按键回调函数,接收参数 (key_char)
-
-    Returns:
-        keyboard.Listener: 键盘监听器对象
-    """
+    """创建键盘监听器"""
     def on_press(key):
         try:
             if hasattr(key, 'char') and key.char:
                 on_key_press_callback(key.char.lower())
         except AttributeError:
             pass
-
     return keyboard.Listener(on_press=on_press)
 
 def start_keyboard_listener(listener):
-    """启动键盘监听器"""
     try:
         listener.start()
         return True
     except Exception as e:
         print(f"键盘监听器启动失败: {e}")
         return False
-
+
 def stop_keyboard_listener(listener):
-    """停止键盘监听器"""
     try:
         if listener and listener.is_alive():
             listener.stop()
@@ -316,71 +155,61 @@ def stop_keyboard_listener(listener):
     except Exception:
         pass
     return False
-
+
 def speak_async(text: str):
     """异步语音播报"""
     from lerobot.common.utils.utils import say
-
     def speak_thread():
         try:
-            print(f"开始语音播报: {text}")
             say(text, blocking=True)
-            print("语音播报完成")
         except Exception as e:
             print(f"语音播报失败: {e}")
-
     thread = threading.Thread(target=speak_thread, daemon=True)
     thread.start()
+
+
+def ask_llm(query: str, state: dict, history=None):
+    """调用LLM进行场景分析"""
+
+    api_key = os.getenv("ASK_LLM_API_KEY")
+    base_url = os.getenv("ASK_LLM_BASE_URL")
+    model_name = os.getenv("ASK_LLM_MODEL")
+    if not (api_key and base_url and model_name):
+        raise EnvironmentError("请设置 ASK_LLM_API_KEY, ASK_LLM_BASE_URL, ASK_LLM_MODEL 环境变量")
+    prompt_path = os.path.join(os.path.dirname(__file__), "llm_prompt.yaml")
+    with open(prompt_path, "r", encoding="utf-8") as f:
+        prompts = yaml.safe_load(f)
+    system_prompt = prompts["system_prompt"].replace("{query}", query)
+    if history is None:
+        history = []
+    if not history:
+        history.append({"role": "system", "content": system_prompt})
+    if len(history) > 3:
+        history = [history[0]] + history[-2:]  # 保留系统提示和最后一轮
+    user_content = [{"type": "text", "text": "以下是当前4个摄像头画面,请分析并给出一步操作指令"}]
+    for name in ["high", "front", "left", "right"]:
+        if name in state:
+            img_base64 = numpy_to_base64(state[name])
+            user_content.append({
+                "type": "image_url",
+                "image_url": {"url": f"data:image/png;base64,{img_base64}", "detail": "high"}
+            })
+    history.append({"role": "user", "content": user_content})
+    client = OpenAI(api_key=api_key, base_url=base_url)
+    completion = client.chat.completions.create(model=model_name, messages=history, max_tokens=800, temperature=0.3)
+    resp_msg = completion.choices[0].message
+    history.append(resp_msg)
+    return resp_msg.content
 
 def handle_llm_interaction_with_images(query: str, camera_images: dict):
-    """
-    处理LLM交互,接收相机图像字典
-
-    Args:
-        query: 用户查询文本
-        camera_images: 相机图像字典 {camera_name: numpy_array}
-
-    Returns:
-        tuple: (success: bool, response: str)
-    """
+    """处理LLM交互"""
     if len(camera_images) != 4:
-        print(f"警告: 期望4个相机,实际获取到{len(camera_images)}个")
-        return False, "相机数量不足"
-
-    print("图像提取成功,正在向LLM发送请求...")
-
+        return False, "摄像头数量不足"
     try:
         response = ask_llm(query, camera_images)
-        if response:
-            if "已重置对话" in response:
-                print("对话历史已重置,请再次按键继续")
-                return True, response
-
-            instruction = extract_json_from_response(response)
-            print("\n下一步指令:")
-            print(f"操作类型: {instruction.get('action', 'unknown')}")
-            print(f"详细描述: {instruction.get('description', '无描述')}")
-
-            # 准备语音播报内容
-            if "description" in instruction:
-                desc = instruction['description']
-                if len(desc) > 100:
-                    sentences = desc.split('。')
-                    key_instruction = sentences[-2] if len(sentences) > 1 else desc[:50]
-                else:
-                    key_instruction = desc
-                speech_text = f"下一步操作:{key_instruction}"
-                print(f"即将播报: {speech_text}")
-                speak_async(speech_text)
-
-            return True, response
-        else:
-            print("LLM请求失败,可能是网络问题")
-            speak_async("LLM请求失败")
-            return False, "LLM请求失败"
-
+        instruction = extract_json_from_response(response)
+        if desc := instruction.get("description"):
+            speak_async(f"下一步操作:{desc}")
+        return True, response
     except Exception as e:
-        print(f"LLM交互出错: {e}")
-        print("建议:1.检查网络连接 2.检查API密钥 3.稍后重试")
         speak_async("LLM交互失败")
-        return False, f"LLM交互出错: {e}"
\ No newline at end of file
+        return False, str(e)
diff --git a/lerobot/common/robot_devices/teleop/gamepad.py b/lerobot/common/robot_devices/teleop/gamepad.py
index b40e3098..f62930b0 100644
--- a/lerobot/common/robot_devices/teleop/gamepad.py
+++ b/lerobot/common/robot_devices/teleop/gamepad.py
@@ -406,7 +406,7 @@ if __name__ == "__main__":
         'max_gripper': {},
         'min_gripper': {},
         'servo_config_file': {},
-        'end_control_info': {'left': "0300b14bff1100003708000010010000" , 'right': '0300509d5e040000120b000009050000'}
+        'end_control_info': {'left': "0300b14bff1100003708000010010000" , 'right': '0300e639bc2000007f50000011010000'}
     }
     config = parse_init_info(config)
     endpose_arm = DummyEndposeMaster(config)
diff --git a/realman.md b/realman.md
index 28bda1fc..ff3615d2 100644
--- a/realman.md
+++ b/realman.md
@@ -151,4 +151,18 @@ python lerobot/scripts/control_robot.py \
 --control.push_to_hub=false \
 --control.policy.path=outputs/train/smolvla_move_the_bottle_into_ultrasonic_device_with_realman_single/checkpoints/160000/pretrained_model \
 --control.display_data=true
-```
\ No newline at end of file
+```
+
+```bash
+export ASK_LLM_BASE_URL=https://api.apiyi.com/v1
+export ASK_LLM_API_KEY=<your-api-key>
+export ASK_LLM_MODEL=claude-sonnet-4-20250514
+
+python lerobot/scripts/control_robot.py \
+--robot.type=realman_dual \
+--robot.inference_time=false \
+--control.type=teleoperate \
+--control.display_data=true \
+--robot.left_end_control_guid="0300b14bff1100003708000010010000" \
+--robot.right_end_control_guid="0300e639bc2000007f50000011010000"
+```
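
To exercise the new environment-driven LLM path without a robot attached, a minimal smoke-test sketch (not part of the patch) follows. It assumes the patched lerobot tree is importable and that the three ASK_LLM_* variables are exported as documented in realman.md; the zeroed frames are stand-ins for real camera captures, so the model's reply will not be meaningful.

```python
# Standalone smoke test for the new LLM utilities (sketch; not part of the patch).
import os

import numpy as np

from lerobot.common.robot_devices.robots.utils import handle_llm_interaction_with_images

# ask_llm() reads these at call time and raises EnvironmentError if any is missing.
for key in ("ASK_LLM_API_KEY", "ASK_LLM_BASE_URL", "ASK_LLM_MODEL"):
    if not os.getenv(key):
        raise SystemExit(f"export {key} first (see realman.md)")

# handle_llm_interaction_with_images() requires exactly 4 views; ask_llm() only
# attaches the ones named high/front/left/right. Black frames stand in for captures.
camera_images = {
    name: np.zeros((480, 640, 3), dtype=np.uint8)
    for name in ("high", "front", "left", "right")
}

success, response = handle_llm_interaction_with_images(
    "将超声仪左下角试管架上的蓝色试管移动到超声仪中",  # same task query used in teleop_step()
    camera_images,
)
print("success:", success)
print("response:", response)
```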