import requests def retrieval_from_knowledge_base( query: str, topk: int ) -> str: """ Retrieval for knowledge from the knowledge base based on the specified query and returns the topk results. Parameters: query (str): The query for knowledge retrieval. topk (int): The number of top results to return, default is 3. Returns: str: The result of the knowledge retrieval in JSON format. """ url = 'http://100.85.52.31:7080/v1/chat-messages' headers = { 'Authorization': f'Bearer app-uJgo3TQKcS1O9PMCDHko71Fp', 'Content-Type': 'application/json' } data = { "inputs": {"topK": topk}, "query": query, "response_mode": "blocking", "user": "tangger", "files": [] } response = requests.post(url, headers=headers, json=data) if response.status_code == 524: print("Server is not responding. Please try again later. Maybe GPU was down in the container.") return None try: result = response.json() except ValueError: return [{"error": "Response is not in JSON format"}] useful_results = [] try: answer = eval(result.get("answer", "[]")) for item in answer: metadata = item.get("metadata", {}) useful_info = { "id": metadata.get("document_id"), "title": item.get("title"), "content": item.get("content"), "metadata": None, "embedding": None, "score": metadata.get("score") } useful_results.append(useful_info) except Exception as e: return [{"error": f"Error processing result: {e}", "status": "TERMINATE"}] if useful_results == []: useful_results = "NULL" return str(useful_results) def search_from_oqmd_by_composition(composition: str) -> str: """ Search materials from OQMD database by chemical composition. Args: composition (str): Chemical composition string. Example: "CsPbBr3" Returns: str: JSON string containing material data or error message """ # 构建请求参数 param = { 'composition': composition } tool_endpoint = "http://100.84.94.73:8020" try: # 发送请求到/oqmd/search路由 response = requests.get( tool_endpoint + "/oqmd/search", params=param ) response.raise_for_status() return str(response.json()['data']) except requests.exceptions.RequestException as e: return f"Error: {str(e)}" def scheme_convert_to_json(): return """ 转换合成方案时,必须严格遵守如下预定义的JSON格式,每个JSON结构的字段必须填充完整,即使有些字段留空: ```json { "workflow": [ { "step_id": , "description": "", "actions": [ { "action_id": , "action_type": "", "materials": [{"name": "", "amount": , "unit": ""}], "containers": [{"type": "", "capacity": "", "unit": "", "additional_parameters": {"material_of_construction": "", "shape": ""}}], "equipment": [{"name": "", "parameters": {"": , "": }}], "output": "" } ], "dependencies": [""], "step_output": "" } ... ] } ``` ### JSON结构字段说明 1. workflow 类型: 数组; 说明: 包含所有步骤的列表; 限制: 每个步骤都是一个对象,且顺序重要。 2. step_id 类型: 整数; 说明: 步骤的唯一标识符,用于区分不同的步骤; 限制: 必须唯一,不能重复。 3. description 类型: 字符串; 说明: 对步骤的简要描述,说明步骤的目的或内容。限制: 描述应清晰简洁,避免冗长。 4. actions 类型: 数组; 说明: 包含该步骤中所有动作的列表。限制: 每个动作都是一个对象,且顺序可能影响执行顺序。 5. action_id 类型: 字符串; 说明: 动作的唯一标识符,用于区分不同的动作。限制: 在同一步骤内必须唯一。 6. action_type 类型: 字符串; 说明: 动作的类型,例如 "add_material", "ultrasonicate", "centrifuge"。限制: 必须是预定义的类型之一。 7. materials 类型: 数组; 说明: 使用的材料列表,每个材料包含名称、数量和单位。限制: 每个材料对象必须包含 "name", "amount", 和 "unit" 字段。 8. containers 类型: 数组; 说明: 使用的容器列表,每个容器包含类型、容量、单位和附加参数。限制: 每个容器对象必须包含 "type", "capacity", 和 "unit" 字段,"additional_parameters" 为可选。 9. equipment 类型: 数组; 说明: 使用的设备列表,每个设备包含名称和参数。限制: 每个设备对象必须包含 "name" 字段,"parameters" 为可选,根据设备需要填充。 10. output 类型: 字符串; 说明: 动作的输出标识符,用于后续步骤的输入。限制: 标识符应唯一且有意义。 11. dependencies 类型: 数组; 说明: 依赖的前一步骤的 "step_id" 列表。限制: 每个依赖项必须是有效的 "step_id"。 12. step_output 类型: 字符串; 说明: 步骤的输出标识符,用于后续步骤的输入。限制: 标识符应唯一且有意义。 """ def send_instruction_to_robot_platform(): """从S3获取最新的json文件并返回其URL链接 Returns: str: 最新json文件的预签名URL """ import boto3 from botocore.exceptions import NoCredentialsError # S3配置 endpoint_url = "http://100.85.52.31:9000" or "https://s3-api.siat-mic.com" aws_access_key_id = "9bUtQL1Gpo9JB6o3pSGr" aws_secret_access_key = "1Qug5H73R3kP8boIHvdVcFtcb1jU9GRWnlmMpx0g" bucket_name = "temp" try: # 创建S3客户端 s3 = boto3.client( 's3', endpoint_url=endpoint_url, aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key ) # 列出bucket中的所有json文件 response = s3.list_objects_v2( Bucket=bucket_name, Prefix='', Delimiter='/' ) # 过滤出json文件并按最后修改时间排序 json_files = [ obj for obj in response.get('Contents', []) if obj['Key'].endswith('.json') ] json_files.sort(key=lambda x: x['LastModified'], reverse=True) if not json_files: return "No JSON files found in S3 bucket" # 获取最新文件 latest_file = json_files[0] # 生成预签名URL url = s3.generate_presigned_url( 'get_object', Params={ 'Bucket': bucket_name, 'Key': latest_file['Key'] }, ExpiresIn=3600 # URL有效期1小时 ) # 将内部URL转换为外部可访问URL external_url = url.replace("http://100.85.52.31:9000", "https://s3-api.siat-mic.com") # 发送URL到FastAPI服务器 try: response = requests.post( "http://localhost:8030/receive", json={"url": external_url} ) response.raise_for_status() return external_url except requests.exceptions.RequestException as e: return f"Error sending URL to server: {str(e)}" except NoCredentialsError: return "Credentials not available" except Exception as e: return f"Error: {str(e)}" def upload_to_s3(json_data: str): import json import re import subprocess import sys import tempfile import datetime def install_boto3(): try: # 检查 boto3 是否已安装 import boto3 print("boto3 已安装。") except ImportError: # 如果未安装,动态安装 boto3 print("正在安装 boto3...") subprocess.check_call([sys.executable, "-m", "pip", "install", "boto3"]) print("boto3 安装完成。") def handle_minio_upload(file_path: str, file_name: str) -> str: """统一处理MinIO上传""" import boto3 try: client = boto3.client( 's3', endpoint_url="http://100.85.52.31:9000" or "https://s3-api.siat-mic.com", aws_access_key_id="9bUtQL1Gpo9JB6o3pSGr", aws_secret_access_key="1Qug5H73R3kP8boIHvdVcFtcb1jU9GRWnlmMpx0g" ) client.upload_file(file_path, "temp", file_name, ExtraArgs={"ACL": "private"}) # 生成预签名 URL url = client.generate_presigned_url( 'get_object', Params={'Bucket': "temp", 'Key': file_name}, ExpiresIn=3600 ) return url.replace("http://100.85.52.31:9000" or "", "https://s3-api.siat-mic.com") except Exception as e: print(e) return e install_boto3() # 去掉可能存在的 ```json 和 ``` 标记 json_data_cleaned = re.sub(r'```json|```', '', json_data).strip() try: # 尝试解析清理后的JSON数据 data = json.loads(json_data_cleaned) # print("解析后的JSON数据:", data) with tempfile.NamedTemporaryFile(mode='w', delete=False) as temp_file: try: json.dump(data, temp_file, indent=4, ensure_ascii=False) temp_file.flush() # 确保数据写入文件 timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S") file_name = f"robot_expriment_results_{timestamp}.json" url = handle_minio_upload(temp_file.name, file_name) print(f"文件上传成功, 唯一链接为{url}, 请将该URL记下来并传递给机器人平台。") except Exception as e: print(f"写入临时文件或上传文件时出错: {e}") raise # 重新抛出异常以便上层调用者处理 except json.JSONDecodeError as e: print(f"JSON解析错误: {e}") raise # 重新抛出异常以便上层调用者处理 def get_latest_exp_log(): def get_uv_latest_file(): import os import glob # UV数据缓存文件夹路径 (请将此路径修改为实际的文件夹路径) current_folder = os.path.dirname(os.path.abspath(__file__)) folder_path = os.path.join(current_folder, 'data/UV/') # 查找文件夹中的所有 .wls 文件 uv_files = sorted(glob.glob(os.path.join(folder_path, '*.[Tt][Xx][Tt]'))) if not uv_files: res = f"ERROR: 缓存文件夹{current_folder}中没有找到任何UV文件" return res # 找到最新修改的文件 latest_file = uv_files[-1] res = f"找到最新的UV数据文件: {latest_file}" return res def get_pl_latest_file(): import os import glob current_folder = os.path.dirname(os.path.abspath(__file__)) folder_path = os.path.join(current_folder, 'data/PL/') # 查找文件夹中的所有 .txt 或 .TXT 文件 pl_files = sorted(glob.glob(os.path.join(folder_path, '*.[Tt][Xx][Tt]'))) if not pl_files: res = f"ERROR: 缓存文件夹{current_folder}中没有找到任何PL文件" return res # 找到最新修改的文件 latest_file = pl_files[-1] res = f"找到最新的PL数据文件: {latest_file}" # print(res) return res pl_latest = get_pl_latest_file() uv_latest = get_uv_latest_file() return pl_latest + "\n" + uv_latest def default_func(): return "Approved. Proceed as planned!" def sendScheme2RobotPlatform(): import requests url = "http://100.122.132.69:50000/sendScheme2RobotPlatform" data = {"status": "ok"} try: response = requests.post(url, json=data) response.raise_for_status() return response.json() except requests.exceptions.RequestException as e: print(f"Error sending scheme to robot platform: {e}") return None if __name__ == "__main__": print(sendScheme2RobotPlatform())