diff --git a/lerobot/common/robot_devices/robots/realman_dual.py b/lerobot/common/robot_devices/robots/realman_dual.py index b0afdd51d..935ac254d 100644 --- a/lerobot/common/robot_devices/robots/realman_dual.py +++ b/lerobot/common/robot_devices/robots/realman_dual.py @@ -260,52 +260,6 @@ class RealmanDualRobot: if not self.inference_time: self.teleop.reset() - # def teleop_step(self, record_data=False) -> None | tuple[dict[str, torch.Tensor], dict[str, torch.Tensor]]: - # """遥操作步骤 - 保持原有数据记录逻辑,添加LLM交互""" - # if not self.is_connected: - # raise ConnectionError() - - # if self.teleop is None and self.inference_time: - # self.teleop = HybridController(self.init_info) - - # try: - # # 检查退出条件 - # if self.q_pressed: - # print("检测到Q键,任务终止...") - # speak_async("任务已终止") - # raise KeyboardInterrupt("用户请求退出") - - # # 执行基础遥操作 - # state = self._read_robot_state() - # action = self.teleop.get_action(state) - # self._execute_action(action, state) - - # # 更新状态队列 - # self._update_state_queue() - # time.sleep(0.019) # 50Hz - - # # 处理数据记录 - # if record_data: - # data = self._prepare_record_data() - # if data[0]: # 如果有有效数据 - # # 处理LLM交互请求 - # if self.w_pressed: - # self.w_pressed = False # 重置标志位 - # self._llm_triggered = True - # success = self._handle_llm_interaction(data[0]) - # if not success: - # print("LLM交互处理失败") - - # return data - - # return None - - # except KeyboardInterrupt: - # # 重新抛出键盘中断,让上层处理 - # raise - # except Exception as e: - # logging.error(f"遥操作步骤失败: {e}") - # return None def teleop_step(self, record_data=False) -> None | tuple[dict[str, torch.Tensor], dict[str, torch.Tensor]]: """遥操作步骤 - 保持原有数据记录逻辑,添加LLM交互""" if not self.is_connected: diff --git a/lerobot/common/robot_devices/robots/utils.py b/lerobot/common/robot_devices/robots/utils.py index c916ccccb..29654ecbb 100644 --- a/lerobot/common/robot_devices/robots/utils.py +++ b/lerobot/common/robot_devices/robots/utils.py @@ -1,162 +1,3 @@ -# # Copyright 2024 The HuggingFace Inc. team. All rights reserved. -# # -# # Licensed under the Apache License, Version 2.0 (the "License"); -# # you may not use this file except in compliance with the License. -# # You may obtain a copy of the License at -# # -# # http://www.apache.org/licenses/LICENSE-2.0 -# # -# # Unless required by applicable law or agreed to in writing, software -# # distributed under the License is distributed on an "AS IS" BASIS, -# # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# # See the License for the specific language governing permissions and -# # limitations under the License. - -# conversation_history = [] -# conversation_client = None - - -# from typing import Protocol, Dict - -# # Robot configuration imports -# from lerobot.common.robot_devices.robots.configs import ( -# AlohaRobotConfig, -# KochBimanualRobotConfig, -# KochRobotConfig, -# LeKiwiRobotConfig, -# ManipulatorRobotConfig, -# MossRobotConfig, -# RobotConfig, -# So100RobotConfig, -# So101RobotConfig, -# StretchRobotConfig, -# RealmanRobotConfig, -# RealmanDualRobotConfig -# ) - -# # Added library imports for LLM interaction -# from openai import OpenAI -# import base64 -# import os -# import cv2 -# import torch -# import numpy as np -# from pynput import keyboard -# import time -# import json -# from datetime import datetime - -# def get_arm_id(name, arm_type): -# """Returns the string identifier of a robot arm.""" -# return f"{name}_{arm_type}" - - -# class Robot(Protocol): -# robot_type: str -# features: dict -# cameras: Dict - -# def connect(self): ... -# def run_calibration(self): ... -# def teleop_step(self, record_data=False): ... -# def capture_observation(self) -> Dict: ... -# def send_action(self, action): ... -# def disconnect(self): ... - - -# def make_robot_config(robot_type: str, **kwargs) -> RobotConfig: -# # ... (此函数内容保持不变) ... -# if robot_type == "aloha": -# return AlohaRobotConfig(**kwargs) -# elif robot_type == "koch": -# return KochRobotConfig(**kwargs) -# elif robot_type == "koch_bimanual": -# return KochBimanualRobotConfig(**kwargs) -# elif robot_type == "moss": -# return MossRobotConfig(**kwargs) -# elif robot_type == "so100": -# return So100RobotConfig(**kwargs) -# elif robot_type == "so101": -# return So101RobotConfig(**kwargs) -# elif robot_type == "stretch": -# return StretchRobotConfig(**kwargs) -# elif robot_type == "lekiwi": -# return LeKiwiRobotConfig(**kwargs) -# elif robot_type == 'realman': -# return RealmanRobotConfig(**kwargs) -# elif robot_type == 'realman_dual': -# return RealmanDualRobotConfig(**kwargs) -# else: -# raise ValueError(f"Robot type '{robot_type}' is not available.") - - -# def make_robot_from_config(config: RobotConfig): -# # ... (此函数内容保持不变) ... -# if isinstance(config, ManipulatorRobotConfig): -# from lerobot.common.robot_devices.robots.manipulator import ManipulatorRobot -# return ManipulatorRobot(config) -# elif isinstance(config, LeKiwiRobotConfig): -# from lerobot.common.robot_devices.robots.mobile_manipulator import MobileManipulator -# return MobileManipulator(config) -# elif isinstance(config, RealmanRobotConfig): -# from lerobot.common.robot_devices.robots.realman import RealmanRobot -# return RealmanRobot(config) -# elif isinstance(config, RealmanDualRobotConfig): -# from lerobot.common.robot_devices.robots.realman_dual import RealmanDualRobot -# return RealmanDualRobot(config) -# else: -# from lerobot.common.robot_devices.robots.stretch import StretchRobot -# return StretchRobot(config) - - -# def make_robot(robot_type: str, **kwargs) -> Robot: -# config = make_robot_config(robot_type, **kwargs) -# return make_robot_from_config(config) - -# # -------------------- LLM 交互功能区 -------------------- - -# def encode_image_to_base64(image_tensor: torch.Tensor) -> str: -# """将PyTorch张量格式的图像编码为Base64字符串""" -# image_np = image_tensor.cpu().numpy().astype(np.uint8) -# image_bgr = cv2.cvtColor(image_np, cv2.COLOR_RGB2BGR) -# success, buffer = cv2.imencode(".jpg", image_bgr) -# if not success: -# raise ValueError("图像编码失败") -# return base64.b64encode(buffer).decode("utf-8") - -# # 在文件顶部添加全局变量来管理会话 -# conversation_history = [] -# conversation_client = None - -# def ask_llm(query: str, state: dict): - -# prompt = """ """ - -# api_key = os.getenv("OPENAI_API_KEY") -# base_url = os.getenv("OPENAI_BASE_URL") -# client = OpenAI(api_key=api_key, base_url=base_url) -# # keys = [key for key in state] -# # import pdb -# # pdb.set_trace() - - -# pass - - - - - - - - - - - - - - - - # Copyright 2024 The HuggingFace Inc. team. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License");