语音播报

This commit is contained in:
2025-08-11 21:17:48 +08:00
parent 951b633970
commit 06a21b1886
2 changed files with 0 additions and 205 deletions

View File

@@ -260,52 +260,6 @@ class RealmanDualRobot:
if not self.inference_time:
self.teleop.reset()
# def teleop_step(self, record_data=False) -> None | tuple[dict[str, torch.Tensor], dict[str, torch.Tensor]]:
# """遥操作步骤 - 保持原有数据记录逻辑添加LLM交互"""
# if not self.is_connected:
# raise ConnectionError()
# if self.teleop is None and self.inference_time:
# self.teleop = HybridController(self.init_info)
# try:
# # 检查退出条件
# if self.q_pressed:
# print("检测到Q键任务终止...")
# speak_async("任务已终止")
# raise KeyboardInterrupt("用户请求退出")
# # 执行基础遥操作
# state = self._read_robot_state()
# action = self.teleop.get_action(state)
# self._execute_action(action, state)
# # 更新状态队列
# self._update_state_queue()
# time.sleep(0.019) # 50Hz
# # 处理数据记录
# if record_data:
# data = self._prepare_record_data()
# if data[0]: # 如果有有效数据
# # 处理LLM交互请求
# if self.w_pressed:
# self.w_pressed = False # 重置标志位
# self._llm_triggered = True
# success = self._handle_llm_interaction(data[0])
# if not success:
# print("LLM交互处理失败")
# return data
# return None
# except KeyboardInterrupt:
# # 重新抛出键盘中断,让上层处理
# raise
# except Exception as e:
# logging.error(f"遥操作步骤失败: {e}")
# return None
def teleop_step(self, record_data=False) -> None | tuple[dict[str, torch.Tensor], dict[str, torch.Tensor]]:
"""遥操作步骤 - 保持原有数据记录逻辑添加LLM交互"""
if not self.is_connected:

View File

@@ -1,162 +1,3 @@
# # Copyright 2024 The HuggingFace Inc. team. All rights reserved.
# #
# # Licensed under the Apache License, Version 2.0 (the "License");
# # you may not use this file except in compliance with the License.
# # You may obtain a copy of the License at
# #
# # http://www.apache.org/licenses/LICENSE-2.0
# #
# # Unless required by applicable law or agreed to in writing, software
# # distributed under the License is distributed on an "AS IS" BASIS,
# # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# # See the License for the specific language governing permissions and
# # limitations under the License.
# conversation_history = []
# conversation_client = None
# from typing import Protocol, Dict
# # Robot configuration imports
# from lerobot.common.robot_devices.robots.configs import (
# AlohaRobotConfig,
# KochBimanualRobotConfig,
# KochRobotConfig,
# LeKiwiRobotConfig,
# ManipulatorRobotConfig,
# MossRobotConfig,
# RobotConfig,
# So100RobotConfig,
# So101RobotConfig,
# StretchRobotConfig,
# RealmanRobotConfig,
# RealmanDualRobotConfig
# )
# # Added library imports for LLM interaction
# from openai import OpenAI
# import base64
# import os
# import cv2
# import torch
# import numpy as np
# from pynput import keyboard
# import time
# import json
# from datetime import datetime
# def get_arm_id(name, arm_type):
# """Returns the string identifier of a robot arm."""
# return f"{name}_{arm_type}"
# class Robot(Protocol):
# robot_type: str
# features: dict
# cameras: Dict
# def connect(self): ...
# def run_calibration(self): ...
# def teleop_step(self, record_data=False): ...
# def capture_observation(self) -> Dict: ...
# def send_action(self, action): ...
# def disconnect(self): ...
# def make_robot_config(robot_type: str, **kwargs) -> RobotConfig:
# # ... (此函数内容保持不变) ...
# if robot_type == "aloha":
# return AlohaRobotConfig(**kwargs)
# elif robot_type == "koch":
# return KochRobotConfig(**kwargs)
# elif robot_type == "koch_bimanual":
# return KochBimanualRobotConfig(**kwargs)
# elif robot_type == "moss":
# return MossRobotConfig(**kwargs)
# elif robot_type == "so100":
# return So100RobotConfig(**kwargs)
# elif robot_type == "so101":
# return So101RobotConfig(**kwargs)
# elif robot_type == "stretch":
# return StretchRobotConfig(**kwargs)
# elif robot_type == "lekiwi":
# return LeKiwiRobotConfig(**kwargs)
# elif robot_type == 'realman':
# return RealmanRobotConfig(**kwargs)
# elif robot_type == 'realman_dual':
# return RealmanDualRobotConfig(**kwargs)
# else:
# raise ValueError(f"Robot type '{robot_type}' is not available.")
# def make_robot_from_config(config: RobotConfig):
# # ... (此函数内容保持不变) ...
# if isinstance(config, ManipulatorRobotConfig):
# from lerobot.common.robot_devices.robots.manipulator import ManipulatorRobot
# return ManipulatorRobot(config)
# elif isinstance(config, LeKiwiRobotConfig):
# from lerobot.common.robot_devices.robots.mobile_manipulator import MobileManipulator
# return MobileManipulator(config)
# elif isinstance(config, RealmanRobotConfig):
# from lerobot.common.robot_devices.robots.realman import RealmanRobot
# return RealmanRobot(config)
# elif isinstance(config, RealmanDualRobotConfig):
# from lerobot.common.robot_devices.robots.realman_dual import RealmanDualRobot
# return RealmanDualRobot(config)
# else:
# from lerobot.common.robot_devices.robots.stretch import StretchRobot
# return StretchRobot(config)
# def make_robot(robot_type: str, **kwargs) -> Robot:
# config = make_robot_config(robot_type, **kwargs)
# return make_robot_from_config(config)
# # -------------------- LLM 交互功能区 --------------------
# def encode_image_to_base64(image_tensor: torch.Tensor) -> str:
# """将PyTorch张量格式的图像编码为Base64字符串"""
# image_np = image_tensor.cpu().numpy().astype(np.uint8)
# image_bgr = cv2.cvtColor(image_np, cv2.COLOR_RGB2BGR)
# success, buffer = cv2.imencode(".jpg", image_bgr)
# if not success:
# raise ValueError("图像编码失败")
# return base64.b64encode(buffer).decode("utf-8")
# # 在文件顶部添加全局变量来管理会话
# conversation_history = []
# conversation_client = None
# def ask_llm(query: str, state: dict):
# prompt = """ """
# api_key = os.getenv("OPENAI_API_KEY")
# base_url = os.getenv("OPENAI_BASE_URL")
# client = OpenAI(api_key=api_key, base_url=base_url)
# # keys = [key for key in state]
# # import pdb
# # pdb.set_trace()
# pass
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");