import requests import json import yaml from typing_extensions import Annotated, List from autogen.coding.func_with_reqs import with_requirements # 读取YAML配置文件 def load_agent_configs(config_path): with open(config_path, 'r') as file: return yaml.safe_load(file) # Define the termination message function def termination_msg(x): return isinstance(x, dict) and "TERMINATE" == str(x.get("content", ""))[-9:].upper() @with_requirements(python_packages=["requests"], global_imports=["requests"]) def retrieval_from_knowledge_base( query: str, topk: int ) -> str: """ Retrieval for knowledge from the knowledge base based on the specified query and returns the topk results. Parameters: query (str): The query for knowledge retrieval. topk (int): The number of top results to return, default is 3. Returns: str: The result of the knowledge retrieval in JSON format. """ url = 'http://127.0.0.1:7080/v1/chat-messages' headers = { 'Authorization': f'Bearer app-uJgo3TQKcS1O9PMCDHko71Fp', 'Content-Type': 'application/json' } data = { "inputs": {"topK": topk}, "query": query, "response_mode": "blocking", "user": "tangger", "files": [] } response = requests.post(url, headers=headers, json=data) if response.status_code == 524: print("Server is not responding. Please try again later. Maybe GPU was down in the container.") return None try: result = response.json() except ValueError: return [{"error": "Response is not in JSON format"}] useful_results = [] try: answer = eval(result.get("answer", "[]")) for item in answer: metadata = item.get("metadata", {}) useful_info = { "id": metadata.get("document_id"), "title": item.get("title"), "content": item.get("content"), "metadata": None, "embedding": None, "score": metadata.get("score") } useful_results.append(useful_info) except Exception as e: return [{"error": f"Error processing result: {e}", "status": "TERMINATE"}] if useful_results == []: useful_results = "NULL" return str(useful_results) @with_requirements(python_packages=["graphrag", "graphrag_api"], global_imports=["graphrag", "graphrag_api"]) def retrieval_from_graphrag(query: str,) -> str: from graphrag_api.search import SearchRunner search_runner = SearchRunner(root_dir="/home/ubuntu/workplace/LYT/llm-agent/psk-graphrag") result = search_runner.remove_sources(search_runner.run_local_search(query=query)) return result @with_requirements(python_packages=["requests"], global_imports=["requests", "urllib.parse"]) def web_searcher(query: str): from urllib.parse import quote encoded_query = quote(query) url = 'https://s.jina.ai/' + encoded_query headers = { 'Authorization': 'Bearer jina_8fc99db105ed48d7ab6a76ecd117e8e9GkNoWHY3tbmIMYSwQ6GAgzmr6c7z', 'X-Return-Format': 'markdown' } response = requests.get(url, headers=headers) # print(response.text) return str(response.text) @with_requirements(python_packages=["requests", "mp-api"], global_imports=["requests", "mp-api"]) def mp_database(api_key: str, query: str): """ Query the Materials Project database using an API key and a material ID. Parameters: api_key (str): The API key used to access the Materials Project database. query (str): The material ID to query in the database. Returns: list: A list of document objects related to the material ID query. """ from mp_api.client import MPRester import requests.exceptions # Validate API key and query parameters if not isinstance(api_key, str) or not api_key.strip(): raise ValueError("The API key must be a non-empty string.") if not isinstance(query, str) or not query.strip(): raise ValueError("The query must be a non-empty string.") try: # Accessing the MPRester with provided API key with MPRester(api_key) as mpr: # Search for the material based on the query docs = mpr.materials.summary.search( material_ids=[query] ) # Check if any documents are returned if not docs: return f"No results found for the given material ID: {query}" return docs except requests.exceptions.HTTPError as http_err: return f"HTTP error occurred: {http_err}" except requests.exceptions.ConnectionError as conn_err: return f"Connection error occurred: {conn_err}" except requests.exceptions.Timeout as timeout_err: return f"Request timed out: {timeout_err}" except requests.exceptions.RequestException as req_err: return f"An error occurred while making the request: {req_err}" except Exception as e: return f"An unexpected error occurred: {e}" @with_requirements(python_packages=["requests"], global_imports=["requests"]) def oqdm_database(query: str): """ Query the OQMD (Open Quantum Materials Database) to retrieve information about a material composition. Parameters: query (str): The material composition to query in the database. Returns: dict: A dictionary containing information related to the queried material composition. """ import requests # Validate query parameter if not isinstance(query, str) or not query.strip(): raise ValueError("The query must be a non-empty string.") url = f"https://oqmd.org/materials/composition/{query}" try: # Make the request to the OQMD API response = requests.get(url) response.raise_for_status() # Parse the response data = response.json() # Check if data is available if not data: return f"No results found for the given material composition: {query}" return data except requests.exceptions.HTTPError as http_err: return f"HTTP error occurred: {http_err}" except requests.exceptions.ConnectionError as conn_err: return f"Connection error occurred: {conn_err}" except requests.exceptions.Timeout as timeout_err: return f"Request timed out: {timeout_err}" except requests.exceptions.RequestException as req_err: return f"An error occurred while making the request: {req_err}" except Exception as e: return f"An unexpected error occurred: {e}" @with_requirements(python_packages=["requests"], global_imports=["requests"]) def aflow_database(query: list[str]): """ Query the AFLOW database to retrieve information about materials based on a list of species. Parameters: query (list[str]): A list of chemical species to query in the AFLOW database. Returns: dict: A dictionary containing information related to the queried species. """ import requests # Validate query parameter if not isinstance(query, list) or not query: raise ValueError("The query must be a non-empty list of species.") if not all(isinstance(species, str) and species.strip() for species in query): raise ValueError("All species in the query must be non-empty strings.") query_str = ",".join(query) url = f"https://aflowlib.duke.edu/search/ui/API/aflux/?species({query_str}),$catalog(ICSD),$paging(1,1000)" try: # Make the request to the AFLOW API response = requests.get(url) response.raise_for_status() # Parse the response data = response.json() # Check if data is available if not data: return f"No results found for the given species: {query_str}" return data except requests.exceptions.HTTPError as http_err: return f"HTTP error occurred: {http_err}" except requests.exceptions.ConnectionError as conn_err: return f"Connection error occurred: {conn_err}" except requests.exceptions.Timeout as timeout_err: return f"Request timed out: {timeout_err}" except requests.exceptions.RequestException as req_err: return f"An error occurred while making the request: {req_err}" except Exception as e: return f"An unexpected error occurred: {e}" @with_requirements(python_packages=["requests"], global_imports=["socket, json, re"]) def send_instruction_to_robot(json_data: str): import socket import json import re # 去掉可能存在的 ```json 和 ``` 标记 json_data_cleaned = re.sub(r'```json|```', '', json_data).strip() try: # 尝试解析清理后的JSON数据 data = json.loads(json_data_cleaned) except json.JSONDecodeError as e: print(f"JSON解析错误: {e}") return # 创建UDP套接字 sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # 目标地址和端口 server_address = ('172.20.103.79', 10000) try: # 序列化为JSON字符串并编码为字节 json_bytes = json.dumps(data).encode('utf-8') # 发送数据 sock.sendto(json_bytes, server_address) print("指令发送成功") except Exception as e: print(f"发送数据时发生错误: {e}") finally: # 关闭套接字 sock.close() @with_requirements(python_packages=["pandas"], global_imports=["pandas", "os", "glob"]) def get_uv_latest_file(): import os import glob # UV数据缓存文件夹路径 (请将此路径修改为实际的文件夹路径) current_folder = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) folder_path = os.path.join(current_folder, 'data/UV/') # 查找文件夹中的所有 .wls 文件 uv_files = sorted(glob.glob(os.path.join(folder_path, '*.[Tt][Xx][Tt]'))) if not uv_files: res = f"ERROR: 缓存文件夹{current_folder}中没有找到任何UV文件" return res # 找到最新修改的文件 latest_file = uv_files[-1] res = f"找到最新的UV数据文件: {latest_file}" return res @with_requirements(python_packages=["pandas"], global_imports=["pandas", "os", "glob"]) def get_max_uv_wavelength_from_txt(latest_file_path: str): import pandas as pd import os # 文件检查 if not os.path.isfile(latest_file_path): res = "ERROR: 指定的文件不存在" return res # 打开并读取最新文件 with open(latest_file_path, 'r') as file: lines = file.readlines() # 找到数据开始的行号 data_start_index = -1 for i, line in enumerate(lines): if "Wavelength Scan Data Record" in line: data_start_index = i + 2 # 数据从该行的下两行开始 break if data_start_index == -1: res = "ERROR: 无法找到数据记录部分" return res # 解析数据并构建表格 data = [] for line in lines[data_start_index:]: parts = line.split() if len(parts) == 7: # 保证每行有7列数据 no, wavelength, abs_value, trans, energy, energy_100, energy_0 = parts try: data.append({ 'No': int(no), 'Wavelength(nm)': float(wavelength), 'Abs': float(abs_value), 'Trans(%T)': float(trans), 'Energy': float(energy), 'Energy(100%T)': float(energy_100), 'Energy(0%T)': float(energy_0) }) except ValueError: print(f"跳过无法解析的行: {line}") if not data: res = "ERROR: 未解析到任何有效数据" return res # 构建DataFrame df = pd.DataFrame(data) # 找到Abs值最大的行 max_abs_row = df.loc[df['Abs'].idxmax()] # 获取最大Abs值对应的波长 max_abs_wavelength = max_abs_row['Wavelength(nm)'] res = f"本次实验的UV波长为: {max_abs_wavelength} nm" print(res) return res @with_requirements(python_packages=["pandas"], global_imports=["pandas", "os", "glob"]) def get_pl_latest_file(): import os import glob # PL数据缓存文件夹路径 (请将此路径修改为实际的文件夹路径) current_folder = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) folder_path = os.path.join(current_folder, 'data/PL/') # 查找文件夹中的所有 .txt 或 .TXT 文件 pl_files = sorted(glob.glob(os.path.join(folder_path, '*.[Tt][Xx][Tt]'))) if not pl_files: res = f"ERROR: 缓存文件夹{current_folder}中没有找到任何PL文件" return res # 找到最新修改的文件 latest_file = pl_files[-1] res = f"找到最新的PL数据文件: {latest_file}" # print(res) return res @with_requirements(python_packages=["pandas"], global_imports=["pandas", "os", "glob"]) def get_max_pl_peak_from_txt(latest_file_path: str): import pandas as pd import os # 文件检查 if not os.path.isfile(latest_file_path): res = "ERROR: 指定的文件不存在" return res # 打开并读取最新文件 with open(latest_file_path, 'r') as file: lines = file.readlines() # 找到 'Data Points' 开始的行号 data_start_index = -1 for i, line in enumerate(lines): if "Data Points" in line: data_start_index = i + 1 # 数据从该行的下一行开始 break if data_start_index == -1: res = "ERROR: 无法找到数据记录部分" return res # 解析nm和Data数据 data = [] for line in lines[data_start_index:]: parts = line.split() if len(parts) == 2: # 每行应该有2列数据,nm 和 Data try: nm = float(parts[0]) data_value = float(parts[1]) data.append({'nm': nm, 'Data': data_value}) except ValueError: print(f"跳过无法解析的行: {line}") if not data: res = "ERROR: 未解析到任何有效数据" return res # 构建DataFrame df = pd.DataFrame(data) # 找到Data值最大的行 max_data_row = df.loc[df['Data'].idxmax()] # 获取最大Data值对应的nm max_data_nm = max_data_row['nm'] res = f"本次实验的PL峰位为: {max_data_nm} nm" print(res) return res if __name__ == "__main__": # res = web_searcher("how to Synthesis CsPbBr3 nanocubes at room temperature?") # print(res) # res = retrieval_from_knowledge_base("how to Synthesis CsPbBr3 nanocubes at room temperature?", 3) # print(res) # res = get_max_uv_wavelength_from_txt() # print(res) # res = get_max_pl_peak_from_txt() # print(res) # res = retrieval_from_graphrag("how to Synthesis CsPbBr3 nanocubes at room temperature?") # print(res) pass