Files
matagent/backend/utils.py
2025-01-18 17:53:58 +08:00

461 lines
15 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import requests
import json
import yaml
from typing_extensions import Annotated, List
from autogen.coding.func_with_reqs import with_requirements
# Load agent configuration from a YAML file.
def load_agent_configs(config_path):
    """Read the YAML file at *config_path* and return its parsed contents."""
    with open(config_path, 'r') as config_file:
        return yaml.safe_load(config_file)
# Termination predicate used by the agents to detect the end of a conversation.
def termination_msg(x):
    """Return True when *x* is a message dict whose content ends with
    "TERMINATE" (case-insensitive); anything that is not a dict is False."""
    if not isinstance(x, dict):
        return False
    content = str(x.get("content", ""))
    # Compare only the last 9 characters, uppercased, against the sentinel.
    return content[-9:].upper() == "TERMINATE"
@with_requirements(python_packages=["requests"], global_imports=["requests"])
def retrieval_from_knowledge_base(
    query: str,
    topk: int
) -> str:
    """
    Retrieve knowledge from the knowledge base for the given query and return the topk results.

    Parameters:
        query (str): The query for knowledge retrieval.
        topk (int): The number of top results to return.

    Returns:
        str: The retrieval results rendered as a string, or "NULL" when nothing
        was found. Returns None when the backend reports a 524 timeout, and a
        list containing an error dict when the response cannot be processed.
    """
    import ast

    url = 'http://127.0.0.1:7080/v1/chat-messages'
    # NOTE(review): hard-coded bearer token — consider loading it from config/env.
    headers = {
        'Authorization': 'Bearer app-uJgo3TQKcS1O9PMCDHko71Fp',
        'Content-Type': 'application/json'
    }
    data = {
        "inputs": {"topK": topk},
        "query": query,
        "response_mode": "blocking",
        "user": "tangger",
        "files": []
    }
    response = requests.post(url, headers=headers, json=data)
    if response.status_code == 524:
        # 524: upstream service timed out (e.g. the GPU container is down).
        print("Server is not responding. Please try again later. Maybe GPU was down in the container.")
        return None
    try:
        result = response.json()
    except ValueError:
        return [{"error": "Response is not in JSON format"}]
    useful_results = []
    try:
        # The backend returns "answer" as a Python-literal list string.
        # ast.literal_eval parses it safely; eval() would execute arbitrary
        # code coming back from the service.
        answer = ast.literal_eval(result.get("answer", "[]"))
        for item in answer:
            metadata = item.get("metadata", {})
            useful_info = {
                "id": metadata.get("document_id"),
                "title": item.get("title"),
                "content": item.get("content"),
                "metadata": None,
                "embedding": None,
                "score": metadata.get("score")
            }
            useful_results.append(useful_info)
    except Exception as e:
        return [{"error": f"Error processing result: {e}", "status": "TERMINATE"}]
    if not useful_results:
        useful_results = "NULL"
    return str(useful_results)
@with_requirements(python_packages=["graphrag", "graphrag_api"], global_imports=["graphrag", "graphrag_api"])
def retrieval_from_graphrag(query: str,) -> str:
    """Run a local GraphRAG search for *query* and return the answer with
    its source annotations stripped."""
    from graphrag_api.search import SearchRunner

    # Root dir points at the pre-built GraphRAG index for the PSK corpus.
    runner = SearchRunner(root_dir="/home/ubuntu/workplace/LYT/llm-agent/psk-graphrag")
    raw_answer = runner.run_local_search(query=query)
    return runner.remove_sources(raw_answer)
@with_requirements(python_packages=["requests"], global_imports=["requests", "urllib.parse"])
def web_searcher(query: str):
    """Search the web through the Jina s.jina.ai reader API and return the
    response body as a markdown string."""
    from urllib.parse import quote

    # The query becomes part of the URL path, so it must be percent-encoded.
    request_url = 'https://s.jina.ai/' + quote(query)
    request_headers = {
        'Authorization': 'Bearer jina_8fc99db105ed48d7ab6a76ecd117e8e9GkNoWHY3tbmIMYSwQ6GAgzmr6c7z',
        'X-Return-Format': 'markdown'
    }
    reply = requests.get(request_url, headers=request_headers)
    return str(reply.text)
# NOTE: the pip package is "mp-api" but the importable module is "mp_api" —
# "import mp-api" is a SyntaxError, so global_imports must use the module name.
@with_requirements(python_packages=["requests", "mp-api"], global_imports=["requests", "mp_api"])
def mp_database(api_key: str, query: str):
    """
    Query the Materials Project database for a material by its material ID.

    Parameters:
        api_key (str): The API key used to access the Materials Project database.
        query (str): The material ID to query (e.g. "mp-149").

    Returns:
        list | str: A list of summary documents for the material, or a
        human-readable "no results" / error message.

    Raises:
        ValueError: If `api_key` or `query` is not a non-empty string.
    """
    from mp_api.client import MPRester
    import requests.exceptions

    # Validate inputs before opening a client session.
    if not isinstance(api_key, str) or not api_key.strip():
        raise ValueError("The API key must be a non-empty string.")
    if not isinstance(query, str) or not query.strip():
        raise ValueError("The query must be a non-empty string.")

    try:
        # MPRester is a context manager; it closes its HTTP session on exit.
        with MPRester(api_key) as mpr:
            docs = mpr.materials.summary.search(
                material_ids=[query]
            )
            if not docs:
                return f"No results found for the given material ID: {query}"
            return docs
    except requests.exceptions.HTTPError as http_err:
        return f"HTTP error occurred: {http_err}"
    except requests.exceptions.ConnectionError as conn_err:
        return f"Connection error occurred: {conn_err}"
    except requests.exceptions.Timeout as timeout_err:
        return f"Request timed out: {timeout_err}"
    except requests.exceptions.RequestException as req_err:
        return f"An error occurred while making the request: {req_err}"
    except Exception as e:
        return f"An unexpected error occurred: {e}"
@with_requirements(python_packages=["requests"], global_imports=["requests"])
def oqdm_database(query: str):
    """
    Query the OQMD (Open Quantum Materials Database) for a material composition.

    Parameters:
        query (str): The material composition to query (e.g. "Fe2O3").

    Returns:
        dict | str: The parsed JSON payload on success, otherwise a
        human-readable "no results" / error message.

    Raises:
        ValueError: If `query` is not a non-empty string.
    """
    import requests

    # Validate the query before touching the network.
    if not isinstance(query, str) or not query.strip():
        raise ValueError("The query must be a non-empty string.")

    url = f"https://oqmd.org/materials/composition/{query}"
    try:
        # A timeout keeps the call from hanging forever when OQMD is
        # unreachable; the Timeout handler below reports it to the caller.
        response = requests.get(url, timeout=30)
        response.raise_for_status()
        # Parse the response body as JSON.
        data = response.json()
        if not data:
            return f"No results found for the given material composition: {query}"
        return data
    except requests.exceptions.HTTPError as http_err:
        return f"HTTP error occurred: {http_err}"
    except requests.exceptions.ConnectionError as conn_err:
        return f"Connection error occurred: {conn_err}"
    except requests.exceptions.Timeout as timeout_err:
        return f"Request timed out: {timeout_err}"
    except requests.exceptions.RequestException as req_err:
        return f"An error occurred while making the request: {req_err}"
    except Exception as e:
        return f"An unexpected error occurred: {e}"
@with_requirements(python_packages=["requests"], global_imports=["requests"])
def aflow_database(query: list[str]):
    """
    Query the AFLOW database for materials containing the given chemical species.

    Parameters:
        query (list[str]): Chemical species to search for, e.g. ["Cs", "Pb", "Br"].

    Returns:
        dict | str: The parsed JSON payload from the AFLUX API, or a
        human-readable "no results" / error message.

    Raises:
        ValueError: If `query` is not a non-empty list of non-empty strings.
    """
    import requests

    # Validate the species list before building the request URL.
    if not isinstance(query, list) or not query:
        raise ValueError("The query must be a non-empty list of species.")
    if not all(isinstance(species, str) and species.strip() for species in query):
        raise ValueError("All species in the query must be non-empty strings.")

    query_str = ",".join(query)
    # AFLUX matchbook: restrict to the ICSD catalog, first page of 1000 hits.
    url = f"https://aflowlib.duke.edu/search/ui/API/aflux/?species({query_str}),$catalog(ICSD),$paging(1,1000)"
    try:
        # A timeout keeps the call from hanging indefinitely when AFLOW is
        # slow; the Timeout handler below reports it to the caller.
        response = requests.get(url, timeout=30)
        response.raise_for_status()
        data = response.json()
        if not data:
            return f"No results found for the given species: {query_str}"
        return data
    except requests.exceptions.HTTPError as http_err:
        return f"HTTP error occurred: {http_err}"
    except requests.exceptions.ConnectionError as conn_err:
        return f"Connection error occurred: {conn_err}"
    except requests.exceptions.Timeout as timeout_err:
        return f"Request timed out: {timeout_err}"
    except requests.exceptions.RequestException as req_err:
        return f"An error occurred while making the request: {req_err}"
    except Exception as e:
        return f"An unexpected error occurred: {e}"
# global_imports was a single comma-joined string ("socket, json, re"); each
# entry should be one importable module name.
@with_requirements(python_packages=["requests"], global_imports=["socket", "json", "re"])
def send_instruction_to_robot(json_data: str):
    """
    Send a JSON instruction payload to the robot controller over UDP.

    Parameters:
        json_data (str): A JSON document, optionally wrapped in markdown
            ```json fences (as typically produced by LLM output).

    Returns:
        None. Success and errors are reported via print().
    """
    import socket
    import json
    import re

    # Strip the markdown ```json / ``` fences that may wrap the payload.
    json_data_cleaned = re.sub(r'```json|```', '', json_data).strip()
    try:
        # Parse the cleaned text; refuse to send anything that is not valid JSON.
        data = json.loads(json_data_cleaned)
    except json.JSONDecodeError as e:
        print(f"JSON解析错误: {e}")
        return

    # UDP socket to the robot controller endpoint.
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    server_address = ('172.20.103.79', 10000)
    try:
        # Re-serialize (normalizes whitespace) and send as UTF-8 bytes.
        json_bytes = json.dumps(data).encode('utf-8')
        sock.sendto(json_bytes, server_address)
        print("指令发送成功")
    except Exception as e:
        print(f"发送数据时发生错误: {e}")
    finally:
        # Always release the socket, even if sending failed.
        sock.close()
@with_requirements(python_packages=["pandas"], global_imports=["pandas", "os", "glob"])
def get_uv_latest_file():
    """Locate the most recently modified UV data (*.txt) file in the data/UV cache folder.

    Returns:
        str: A message containing the path of the newest UV file, or an
        ERROR message when the folder contains no UV files.
    """
    import os
    import glob

    # UV data cache folder: <repo root>/data/UV/ relative to this file.
    current_folder = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    folder_path = os.path.join(current_folder, 'data/UV/')
    # Collect all .txt files (case-insensitive extension).
    uv_files = glob.glob(os.path.join(folder_path, '*.[Tt][Xx][Tt]'))
    if not uv_files:
        # Report the folder actually searched (was the parent dir before).
        res = f"ERROR: 缓存文件夹{folder_path}中没有找到任何UV文件"
        return res
    # Pick the most recently *modified* file, as the messages promise —
    # sorting by filename can return an older file when names are not
    # timestamp-ordered.
    latest_file = max(uv_files, key=os.path.getmtime)
    res = f"找到最新的UV数据文件: {latest_file}"
    return res
@with_requirements(python_packages=["pandas"], global_imports=["pandas", "os", "glob"])
def get_max_uv_wavelength_from_txt(latest_file_path: str):
    """Parse a UV spectrometer .txt export and report the wavelength at the
    absorbance maximum.

    Parameters:
        latest_file_path (str): Path to the UV data file to parse.

    Returns:
        str: A message with the peak wavelength in nm, or an ERROR message
        when the file or its data section cannot be read.
    """
    import pandas as pd
    import os

    # Guard: the file must exist before we try to read it.
    if not os.path.isfile(latest_file_path):
        return "ERROR: 指定的文件不存在"

    with open(latest_file_path, 'r') as fh:
        all_lines = fh.readlines()

    # The measurement table starts two lines below this marker line.
    start = next((idx + 2 for idx, text in enumerate(all_lines)
                  if "Wavelength Scan Data Record" in text), -1)
    if start == -1:
        return "ERROR: 无法找到数据记录部分"

    column_names = ('No', 'Wavelength(nm)', 'Abs', 'Trans(%T)',
                    'Energy', 'Energy(100%T)', 'Energy(0%T)')
    records = []
    for text in all_lines[start:]:
        fields = text.split()
        if len(fields) != 7:  # every data row has exactly 7 columns
            continue
        try:
            values = (int(fields[0]),) + tuple(float(v) for v in fields[1:])
        except ValueError:
            print(f"跳过无法解析的行: {text}")
            continue
        records.append(dict(zip(column_names, values)))

    if not records:
        return "ERROR: 未解析到任何有效数据"

    # Locate the row with the largest absorbance and report its wavelength.
    frame = pd.DataFrame(records)
    peak_row = frame.loc[frame['Abs'].idxmax()]
    res = f"本次实验的UV波长为: {peak_row['Wavelength(nm)']} nm"
    print(res)
    return res
@with_requirements(python_packages=["pandas"], global_imports=["pandas", "os", "glob"])
def get_pl_latest_file():
    """Locate the most recently modified PL data (*.txt) file in the data/PL cache folder.

    Returns:
        str: A message containing the path of the newest PL file, or an
        ERROR message when the folder contains no PL files.
    """
    import os
    import glob

    # PL data cache folder: <repo root>/data/PL/ relative to this file.
    current_folder = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    folder_path = os.path.join(current_folder, 'data/PL/')
    # Collect all .txt files (case-insensitive extension).
    pl_files = glob.glob(os.path.join(folder_path, '*.[Tt][Xx][Tt]'))
    if not pl_files:
        # Report the folder actually searched (was the parent dir before).
        res = f"ERROR: 缓存文件夹{folder_path}中没有找到任何PL文件"
        return res
    # Pick the most recently *modified* file, as the messages promise —
    # sorting by filename can return an older file when names are not
    # timestamp-ordered.
    latest_file = max(pl_files, key=os.path.getmtime)
    res = f"找到最新的PL数据文件: {latest_file}"
    return res
@with_requirements(python_packages=["pandas"], global_imports=["pandas", "os", "glob"])
def get_max_pl_peak_from_txt(latest_file_path: str):
    """Parse a PL spectrometer .txt export and report the emission-peak
    wavelength.

    Parameters:
        latest_file_path (str): Path to the PL data file to parse.

    Returns:
        str: A message with the PL peak position in nm, or an ERROR message
        when the file or its data section cannot be read.
    """
    import pandas as pd
    import os

    # Guard: the file must exist before we try to read it.
    if not os.path.isfile(latest_file_path):
        return "ERROR: 指定的文件不存在"

    with open(latest_file_path, 'r') as fh:
        all_lines = fh.readlines()

    # Data rows begin on the line immediately after the "Data Points" marker.
    start = next((idx + 1 for idx, text in enumerate(all_lines)
                  if "Data Points" in text), -1)
    if start == -1:
        return "ERROR: 无法找到数据记录部分"

    records = []
    for text in all_lines[start:]:
        fields = text.split()
        if len(fields) != 2:  # each row is "<nm> <intensity>"
            continue
        try:
            records.append({'nm': float(fields[0]), 'Data': float(fields[1])})
        except ValueError:
            print(f"跳过无法解析的行: {text}")

    if not records:
        return "ERROR: 未解析到任何有效数据"

    # Locate the row with the largest intensity and report its wavelength.
    frame = pd.DataFrame(records)
    peak_row = frame.loc[frame['Data'].idxmax()]
    res = f"本次实验的PL峰位为: {peak_row['nm']} nm"
    print(res)
    return res
if __name__ == "__main__":
    # Manual smoke tests — uncomment one of the calls below to exercise a
    # single tool in isolation (several hit live services, so they are kept
    # disabled by default).
    # res = web_searcher("how to Synthesis CsPbBr3 nanocubes at room temperature?")
    # print(res)
    # res = retrieval_from_knowledge_base("how to Synthesis CsPbBr3 nanocubes at room temperature?", 3)
    # print(res)
    # res = get_max_uv_wavelength_from_txt()
    # print(res)
    # res = get_max_pl_peak_from_txt()
    # print(res)
    # res = retrieval_from_graphrag("how to Synthesis CsPbBr3 nanocubes at room temperature?")
    # print(res)
    pass