修改名称

This commit is contained in:
2025-03-02 15:06:35 +08:00
parent 24882543a9
commit 329f5c8310
126 changed files with 126 additions and 11322 deletions

636
backend/tools.py Executable file
View File

@@ -0,0 +1,636 @@
import requests
def vector_retrieval_from_knowledge_base(
query: str,
topk: int = 3
) -> str:
"""
Retrieval for knowledge from the knowledge base based on the specified query and returns the topk results.
Parameters:
query (str): The query for knowledge retrieval.
topk (int): The number of top results to return, default is 3.
Returns:
str: The result of the knowledge retrieval in JSON format.
"""
url = 'http://100.85.52.31:7080/v1/chat-messages'
# retrieval_from_knowledge_base
headers = {
'Authorization': f'Bearer app-uJgo3TQKcS1O9PMCDHko71Fp',
'Content-Type': 'application/json'
}
data = {
"inputs": {"topK": topk},
"query": query,
"response_mode": "blocking",
"user": "tangger",
"files": []
}
response = requests.post(url, headers=headers, json=data)
if response.status_code == 524:
print("Server is not responding. Please try again later. Maybe GPU was down in the container.")
return None
try:
result = response.json()
except ValueError:
return [{"error": "Response is not in JSON format"}]
useful_results = []
try:
answer = eval(result.get("answer", "[]"))
for item in answer:
metadata = item.get("metadata", {})
useful_info = {
"id": metadata.get("document_id"),
"title": item.get("title"),
"content": item.get("content"),
"metadata": None,
"embedding": None,
"score": metadata.get("score")
}
useful_results.append(useful_info)
except Exception as e:
return [{"error": f"Error processing result: {e}", "status": "TERMINATE"}]
if useful_results == []:
useful_results = "NULL"
return str(useful_results)
def hybird_retrieval_from_knowledge_base(
query: str,
topk: int = 3
) -> str:
"""
Retrieval for knowledge from the knowledge base based on the specified query and returns the topk results.
Parameters:
query (str): The query for knowledge retrieval.
topk (int): The number of top results to return, default is 3.
Returns:
str: The result of the knowledge retrieval in JSON format.
"""
url = 'http://100.85.52.31:7080/v1/chat-messages'
# retrieval_from_knowledge_base
# headers = {
# 'Authorization': f'Bearer app-uJgo3TQKcS1O9PMCDHko71Fp',
# 'Content-Type': 'application/json'
# }
# retrieval_from_knowledge_base_new
headers = {
'Authorization': f'Bearer app-PP7g4pIQORsXwha4SBvSEqBs',
'Content-Type': 'application/json'
}
data = {
"inputs": {"topK": topk},
"query": query,
"response_mode": "blocking",
"user": "tangger",
"files": []
}
response = requests.post(url, headers=headers, json=data)
if response.status_code == 524:
print("Server is not responding. Please try again later. Maybe GPU was down in the container.")
return None
try:
result = response.json()
except ValueError:
return [{"error": "Response is not in JSON format"}]
useful_results = []
try:
answer = eval(result.get("answer", "[]"))
for item in answer:
metadata = item.get("metadata", {})
useful_info = {
"id": metadata.get("document_id"),
"title": item.get("title"),
"content": item.get("content"),
"metadata": None,
"embedding": None,
"score": metadata.get("score")
}
useful_results.append(useful_info)
except Exception as e:
return [{"error": f"Error processing result: {e}", "status": "TERMINATE"}]
if useful_results == []:
useful_results = "NULL"
return str(useful_results)
def search_from_oqmd_by_composition(composition: str) -> str:
"""
Search materials from OQMD database by chemical composition.
Args:
composition (str): Chemical composition string. Example: "CsPbBr3"
Returns:
str: JSON string containing material data or error message
"""
# 构建请求参数
param = {
'composition': composition
}
tool_endpoint = "http://100.84.94.73:8020"
try:
# 发送请求到/oqmd/search路由
response = requests.get(
tool_endpoint + "/oqmd/search",
params=param
)
response.raise_for_status()
return str(response.json()['data'])
except requests.exceptions.RequestException as e:
return f"Error: {str(e)}"
def scheme_convert_to_json():
return """
转换合成方案时必须严格遵守如下预定义的JSON格式每个JSON结构的字段必须填充完整即使有些字段留空
```json
{
"TaskId": "",
"ExperimentName": "",
"Materials": [
{ "MaterialId": "", "Name": "", "Formula": "", "Amount": "", "Unit": "", "Purity": "", "State": ""}
// ...可在此处添加更多material对象
],
"Containers": [{
"ContainerId": "", "Name": "", "Capacity": "", "Unit": "", "MaterialOfConstruction": "", "Shape": "", "HeatResistant": "", "PressureRating": "",
// ...可在此处添加更多container对象
],
"Equipments": [{
"EquipmentId": "", "Name": "",
"Parameters": {
// 具体设备参数(例如 rpm 范围, 温度范围等)
},
}
// ...可在此处添加更多equipment对象
],
"RobotWorkflow": [{
"Stepid": "", "Description": "",
"Actions": [{
"ActionType": "", // limited robot action: "pick_container""place_container""pick_container_with_material""place_container_into_equipment""remove_container_from_equipment"
"ContainerId": "",
"MaterialId": "",
"EquipmentId": "",
}
// ...可在此处添加更多子动作
],
"Dependencies": [
// 若需要依赖之前的若干 step_id可列在这里如 ["1", "2"]
],
"StepOutput": {
"ContainerId": "",
"Contents": [
{
"MaterialId": "",
"Amount": "",
"Unit": ""
}
// ...可在此处列出执行完本步骤后容器中的产物或状态
]
}
}
// ...可在此处添加更多step对象
]
}
```
### JSON结构主要字段说明
1. TaskId 类型: 字符串 说明: 任务的唯一标识符,用于区分不同的任务 限制: 必须唯一,不能重复
2. Materials 类型: 数组 说明: 使用的材料列表每个材料包含ID、名称、数量和单位
3. Containers 类型: 数组 说明: 使用的容器列表每个容器包含ID、类型、容量、单位和可选的附加参数
限制: a.数组中的每个对象必须包含以下字段a.1 "Type": 容器类型(如烧杯、锥形瓶、离心管等) a.2 "Capacity": 容器的容量 a.3 "Unit": 容量的单位
4. Equipments 类型: 数组 说明: 使用的设备列表每个设备包含ID、名称和可选的参数 限制: 数组中的每个对象必须包含 "Name" 字段 "Parameters" 为可选,可根据设备实际需求进行配置(如搅拌速度、超声功率、温度范围等)
5. Workflow 类型: 数组 说明: 包含所有步骤的列表 限制:每个步骤都是一个对象 顺序重要(一般按步骤顺序依次执行)
6. StepId 类型: 整数 说明: 步骤的唯一标识符,用于区分不同的步骤 限制: 必须唯一,不能重复
7. Actions 类型: 数组 说明: 包含该步骤中所有动作的列表 限制: a.每个动作都是一个对象 b.动作在数组中的顺序通常会影响执行顺序
8. ActionId 类型: 字符串 说明: 动作的唯一标识符,用于区分同一步骤内的不同动作 限制: 在同一步骤内必须唯一
9. ActionType 类型: 字符串 说明: 动作的类型,但此处特别强调仅限于机械臂可执行的动作 限制:
a.必须是以下预定义类型之一(对应机械臂操作):
a.1 "pick_container" (拿容器)
a.2 "place_container" (放容器)
a.3 "pick_container_with_material" (拿容器接材料/把材料加到容器里时的动作)
a.4 "place_container_into_equipment" (将容器放进某设备)
a.5 "remove_container_from_equipment" (从设备中取出容器)
b. 诸如“搅拌”、“超声”、“离心”等动作不在此列,它们属于设备自身的潜在动作,不在机械臂的动作范围内
10. Dependencies 类型: 数组 说明: 依赖的前一步骤的 step_id 列表 限制: 每个依赖项必须是有效的 step_id 当本步骤需要等待前面若干步骤完成后再执行时,可通过此字段进行控制
11. StepOutput 类型: 字符串 说明: 步骤的输出标识符,用于后续步骤的输入或引用 限制: 标识符应唯一且有意义 可用来表示该步骤总体产出或容器中的新溶液名等
"""
# def send_instruction_to_robot_platform():
"""从S3获取最新的json文件并返回其URL链接
Returns:
str: 最新json文件的预签名URL
"""
import boto3
from botocore.exceptions import NoCredentialsError
# S3配置
endpoint_url = "http://100.85.52.31:9000" or "https://s3-api.siat-mic.com"
aws_access_key_id = "9bUtQL1Gpo9JB6o3pSGr"
aws_secret_access_key = "1Qug5H73R3kP8boIHvdVcFtcb1jU9GRWnlmMpx0g"
bucket_name = "temp"
try:
# 创建S3客户端
s3 = boto3.client(
's3',
endpoint_url=endpoint_url,
aws_access_key_id=aws_access_key_id,
aws_secret_access_key=aws_secret_access_key
)
# 列出bucket中的所有json文件
response = s3.list_objects_v2(
Bucket=bucket_name,
Prefix='',
Delimiter='/'
)
# 过滤出json文件并按最后修改时间排序
json_files = [
obj for obj in response.get('Contents', [])
if obj['Key'].endswith('.json')
]
json_files.sort(key=lambda x: x['LastModified'], reverse=True)
if not json_files:
return "No JSON files found in S3 bucket"
# 获取最新文件
latest_file = json_files[0]
# 生成预签名URL
url = s3.generate_presigned_url(
'get_object',
Params={
'Bucket': bucket_name,
'Key': latest_file['Key']
},
ExpiresIn=3600 # URL有效期1小时
)
# 将内部URL转换为外部可访问URL
external_url = url.replace("http://100.85.52.31:9000", "https://s3-api.siat-mic.com")
# 发送URL到FastAPI服务器
try:
response = requests.post(
"http://localhost:8030/receive",
json={"url": external_url}
)
response.raise_for_status()
return external_url
except requests.exceptions.RequestException as e:
return f"Error sending URL to server: {str(e)}"
except NoCredentialsError:
return "Credentials not available"
except Exception as e:
return f"Error: {str(e)}"
def upload_to_s3(json_data: str):
import json
import re
import subprocess
import sys
import tempfile
import datetime
def install_boto3():
try:
# 检查 boto3 是否已安装
import boto3
print("boto3 已安装。")
except ImportError:
# 如果未安装,动态安装 boto3
print("正在安装 boto3...")
subprocess.check_call([sys.executable, "-m", "pip", "install", "boto3"])
print("boto3 安装完成。")
def handle_minio_upload(file_path: str, file_name: str) -> str:
"""统一处理MinIO上传"""
import boto3
try:
client = boto3.client(
's3',
endpoint_url="http://100.85.52.31:9000", # or "https://s3-api.siat-mic.com",
aws_access_key_id="9bUtQL1Gpo9JB6o3pSGr",
aws_secret_access_key="1Qug5H73R3kP8boIHvdVcFtcb1jU9GRWnlmMpx0g"
)
client.upload_file(file_path, "temp", file_name, ExtraArgs={"ACL": "private"})
# 生成预签名 URL
url = client.generate_presigned_url(
'get_object',
Params={'Bucket': "temp", 'Key': file_name},
ExpiresIn=36000
)
# return url.replace("http://100.85.52.31:9000" or "", "https://s3-api.siat-mic.com")
return url
except Exception as e:
# print(e)
return f"Error: {str(e)}, Request human/user intervention."
install_boto3()
# 去掉可能存在的 ```json 和 ``` 标记
json_data_cleaned = re.sub(r'```json|```', '', json_data).strip()
try:
# 尝试解析清理后的JSON数据
data = json.loads(json_data_cleaned)
# 取得task id
task_id = data['TaskId']
# print("解析后的JSON数据:", data)
with tempfile.NamedTemporaryFile(mode='w', delete=False) as temp_file:
try:
json.dump(data, temp_file, indent=4, ensure_ascii=False)
temp_file.flush() # 确保数据写入文件
file_name = f"robotExprimentScheme_{task_id}.json"
url = handle_minio_upload(temp_file.name, file_name)
return f"JSON Scheme has been uploaded to S3 storage. The unique URL is: {url}, please pass it to the robot platform."
except Exception as e:
# print(f"写入临时文件或上传文件时出错: {e}")
return f"Error: {str(e)}, Request human/user intervention."
except json.JSONDecodeError as e:
# print(f"JSON解析错误: {e}")
return f"Error: {str(e)}, Request human/user intervention."
def get_latest_exp_log():
def get_uv_latest_file():
import os
import glob
# UV数据缓存文件夹路径 (请将此路径修改为实际的文件夹路径)
current_folder = os.path.dirname(os.path.abspath(__file__))
folder_path = os.path.join(current_folder, 'data/UV/')
# 查找文件夹中的所有 .wls 文件
uv_files = sorted(glob.glob(os.path.join(folder_path, '*.[Tt][Xx][Tt]')))
if not uv_files:
res = f"ERROR: 缓存文件夹{current_folder}中没有找到任何UV文件"
return res
# 找到最新修改的文件
latest_file = uv_files[-1]
res = f"找到最新的UV数据文件: {latest_file}"
return res
def get_pl_latest_file():
import os
import glob
current_folder = os.path.dirname(os.path.abspath(__file__))
folder_path = os.path.join(current_folder, 'data/PL/')
# 查找文件夹中的所有 .txt 或 .TXT 文件
pl_files = sorted(glob.glob(os.path.join(folder_path, '*.[Tt][Xx][Tt]')))
if not pl_files:
res = f"ERROR: 缓存文件夹{current_folder}中没有找到任何PL文件"
return res
# 找到最新修改的文件
latest_file = pl_files[-1]
res = f"找到最新的PL数据文件: {latest_file}"
return res
pl_latest = get_pl_latest_file()
uv_latest = get_uv_latest_file()
return pl_latest + "\n" + uv_latest
def read_data():
get_latest_exp_log()
def default_func():
return "Approved. Proceed as planned!"
def generate_task_id():
import datetime
# 获取当前时间
now = datetime.datetime.now()
# 格式化时间为字符串
formatted_time = now.strftime("%Y%m%d%H%M%S")
# 生成任务ID
task_id = f"task_{formatted_time}"
return task_id
def sendScheme2RobotWorkstation(task_id: str, scheme_url: str):
def mol2mg(formula: str, source_unit: str, target_unit: str, value: float):
"""
将mol转换为mg
Args:
formula: 化学式如CsPb
source_unit: 源单位 (mol或mmol)
target_unit: 目标单位 (mg)
value: 要转换的值
"""
import requests
from periodictable import formula as chem_formula
# 检查单位是否有效
if source_unit.lower() not in ['mol', 'mmol'] or target_unit.lower() != 'mg':
return {"status": "error", "message": "Invalid units. Only mol/mmol to mg conversion supported"}
try:
# 解析化学式并计算摩尔质量
compound = chem_formula(formula)
molar_mass = compound.mass # 获取化合物摩尔质量 (g/mol)
# 转换计算
if source_unit.lower() == 'mol':
mg_value = value * molar_mass * 1000 # mol -> g -> mg
elif source_unit.lower() == 'mmol':
mg_value = value * molar_mass # mmol -> g -> mg
return {
"status": "success",
"formula": formula,
"value": round(mg_value, 4), # 保留4位小数
"unit": "mg"
}
except ValueError as e:
return {"status": "error", "message": f"Invalid chemical formula: {formula}. Error: {str(e)}"}
# 首先检查task_id是否和scheme_url是否匹配
if task_id not in scheme_url:
return {"status": "error", "message": "task_id and scheme_url do not match, Request human/user intervention."}
# 读取scheme_url的内容
import requests
try:
response = requests.get(scheme_url)
response.raise_for_status()
scheme_content = response.text
# 读取scheme_content的内容为JSON
import json
scheme_data = json.loads(scheme_content)
robot_scheme = {}
robot_scheme['TaskId'] = scheme_data['TaskId']
robot_scheme['ExperimentName'] = scheme_data['ExperimentName']
materials = []
for mat in scheme_data['Materials']:
materials.append({
"MaterialId": mat['MaterialId'],
"Name": mat['Name'],
"Amount": mat['Amount'] if "mol" not in mat['Unit'] else mol2mg(mat['Formula'], mat['Unit'], 'mg', float(mat['Amount']))["value"],
"Unit": mat['Unit'] if "mol" not in mat['Unit'] else "mg",
"Purity": mat['Purity'],
"State": mat['State']
})
robot_scheme['Materials'] = materials
robot_scheme['Containers'] = scheme_data['Containers']
robot_scheme['Equipments'] = scheme_data['Equipments']
robot_scheme['RobotWorkflow'] = scheme_data['RobotWorkflow']
# print(scheme_data)
except requests.exceptions.RequestException as e:
return {"status": "error", "message": f"Error reading scheme_url: {e}"}
import requests
url = "http://100.122.132.69:50000/sendScheme2RobotPlatform"
# url = "http://localhost:50000/sendScheme2RobotWorkstation"
data = {"status": "ok"}
try:
response = requests.post(url, json=robot_scheme)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
print(f"Error sending scheme to robot workstation: {e}")
return None
def sendScheme2MobileRobot(task_id: str, scheme_url: str):
def mol2mg(formula: str, source_unit: str, target_unit: str, value: float):
"""
将mol转换为mg
Args:
formula: 化学式如CsPb
source_unit: 源单位 (mol或mmol)
target_unit: 目标单位 (mg)
value: 要转换的值
"""
import requests
from periodictable import formula as chem_formula
# 检查单位是否有效
if source_unit.lower() not in ['mol', 'mmol'] or target_unit.lower() != 'mg':
return {"status": "error", "message": "Invalid units. Only mol/mmol to mg conversion supported"}
try:
# 解析化学式并计算摩尔质量
compound = chem_formula(formula)
molar_mass = compound.mass # 获取化合物摩尔质量 (g/mol)
# 转换计算
if source_unit.lower() == 'mol':
mg_value = value * molar_mass * 1000 # mol -> g -> mg
elif source_unit.lower() == 'mmol':
mg_value = value * molar_mass # mmol -> g -> mg
return {
"status": "success",
"formula": formula,
"value": round(mg_value, 4), # 保留4位小数
"unit": "mg"
}
except ValueError as e:
return {"status": "error", "message": f"Invalid chemical formula: {formula}. Error: {str(e)}"}
# 首先检查task_id是否和scheme_url是否匹配
if task_id not in scheme_url:
return {"status": "error", "message": "task_id and scheme_url do not match, Request human/user intervention."}
# 读取scheme_url的内容
import requests
try:
response = requests.get(scheme_url)
response.raise_for_status()
scheme_content = response.text
# 读取scheme_content的内容为JSON
import json
scheme_data = json.loads(scheme_content)
robot_scheme = {}
robot_scheme['TaskId'] = scheme_data['TaskId']
robot_scheme['ExperimentName'] = scheme_data['ExperimentName']
robot_scheme['Containers'] = scheme_data['Containers']
robot_scheme['Equipments'] = scheme_data['Equipments']
# print(scheme_data)
except requests.exceptions.RequestException as e:
return {"status": "error", "message": f"Error reading scheme_url: {e}"}
import requests
url = "http://100.122.132.69:50000/sendScheme2RobotPlatform"
# url = "http://localhost:50000/sendScheme2MobileRobot"
data = {"status": "ok"}
try:
response = requests.post(url, json=robot_scheme)
response.raise_for_status()
return str(response.json()) + "\n" + "task_id: " + task_id + "\n" + "scheme_url: " + scheme_url + "\n"
except requests.exceptions.RequestException as e:
print(f"Error sending scheme to robot workstation: {e}")
return None
def readPLdata():
def list_to_markdown_table(data):
# 拆分表头和数据行
headers = data[0].split("\t") # 假设列之间用制表符(\t分隔
rows = [line.split("\t") for line in data[1:]]
# 创建表头部分
header_row = "| " + " | ".join(headers) + " |"
separator_row = "| " + " | ".join(["---"] * len(headers)) + " |"
# 创建数据行部分
data_rows = []
for row in rows:
data_rows.append("| " + " | ".join(row) + " |")
# 合并所有部分
markdown_table = "\n".join([header_row, separator_row] + data_rows)
return markdown_table
import requests
url = "http://100.122.132.69:50000/getPLdata"
try:
response = requests.get(url)
response.raise_for_status()
scheme_content = response.text
table = scheme_content.split("Data Points\r\n")[0].split("Peaks\r\n")[-1].split("\r\n")
table = [t for t in table if t.strip()]
md_table = list_to_markdown_table(table)
max_pl = table[1].split('\t')[2]
return md_table + "\n" + f"The max PL value in this expriment is {max_pl}"
except Exception as e:
print(e)
if __name__ == "__main__":
# print(sendScheme2RobotWorkstation(task_id="20250225173342", scheme_url="http://100.85.52.31:9000/temp/robotExprimentScheme_task_20250225173342.json?AWSAccessKeyId=9bUtQL1Gpo9JB6o3pSGr&Signature=bkfes1Nv%2BVzwiUj05p7uHk%2B0P9g%3D&Expires=1740512067"))
# print(get_latest_exp_log())
print(readPLdata())