Compare commits

2 Commits
main ... lyt1

6 changed files with 26555 additions and 0 deletions

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

9378
layer3/data/tmp.json Normal file

File diff suppressed because one or more lines are too long

View File

@@ -0,0 +1,282 @@
import json
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from openai import OpenAI
import re
from tqdm import tqdm
import os
from threading import Lock
class MaterialProductExtractor:
    """Extract the final material products of papers with an LLM.

    Every article is a dict that is read through its "Article Title" and
    "Abstract" keys. Each one is sent to an OpenAI-compatible chat-completions
    endpoint and the parsed answer is attached under the "Target" key.
    Articles are collected in two lists: ``results_with_target`` and
    ``results_without_target`` (the latter also receives failed articles,
    which additionally carry an "Error" key).
    """

    # Confidence used when the model omits the field or returns junk; it is
    # also the floor of the aggregated "extraction_confidence".
    _DEFAULT_CONFIDENCE = 0.5

    def __init__(self, base_url="", api_key="", model="gpt-3.5-turbo", max_workers=32):
        """Create the extractor.

        Args:
            base_url (str): Base URL of the OpenAI-compatible API.
            api_key (str): API key for that endpoint.
            model (str): Name of the chat model to call.
            max_workers (int): Number of worker threads used by
                ``process_articles``.
        """
        self.client = OpenAI(
            base_url=base_url,
            api_key=api_key
        )
        self.model = model
        self.max_workers = max_workers
        self.lock = Lock()
        # Accumulated results; they are never cleared, so a second call to
        # process_articles() on the same instance appends to them.
        self.results_with_target = []
        self.results_without_target = []

    def create_prompt(self, title, abstract):
        """Build the user prompt asking the model for the final product.

        Args:
            title (str): Article title.
            abstract (str): Article abstract.

        Returns:
            str: The formatted prompt (Chinese text with a JSON template).
        """
        # The template lines are kept flush left on purpose: everything inside
        # the triple quotes is sent to the model verbatim.
        return f"""
你是一个材料科学专家。请分析以下材料科学文献的标题和摘要,抽取该研究的最终产物。
标题: {title}
摘要: {abstract}
请仔细分析文献内容,识别研究中合成、制备或获得的最终材料产物。最终产物通常是:
1. 新合成的化合物或材料
2. 改性后的材料
3. 复合材料
4. 纳米材料
5. 薄膜、涂层等
请以JSON格式返回结果,格式如下:
{{
"has_target": true/false,
"target_materials": [
{{
"name": "材料名称",
"chemical_formula": "化学式(如果有)",
"description": "材料描述",
"confidence": 0.0-1.0
}}
],
"reasoning": "抽取理由的简要说明"
}}
如果没有明确的最终产物,请将has_target设为false,target_materials设为空列表。
请确保返回的是有效的JSON格式。
"""

    def extract_target_from_response(self, response_text):
        """Parse the JSON object contained in a raw model reply.

        The whole reply is tried first; if that is not a JSON object, the
        widest ``{...}`` span inside the reply is tried (this covers replies
        wrapped in prose or a Markdown code fence).

        Args:
            response_text (str | None): Raw reply text. ``None`` or any
                non-string value is treated as an unparseable reply.

        Returns:
            dict: The parsed object, or a default "no target" dict whose
            "reasoning" is "解析失败" when nothing usable was found.
        """
        if isinstance(response_text, str):
            candidates = [response_text]
            embedded = re.search(r'\{.*\}', response_text, re.DOTALL)
            if embedded:
                candidates.append(embedded.group())
            for candidate in candidates:
                try:
                    parsed = json.loads(candidate)
                except json.JSONDecodeError:
                    continue
                # Callers use .get() on the result, so only objects qualify;
                # a top-level list/number/string falls through.
                if isinstance(parsed, dict):
                    return parsed
        return {
            "has_target": False,
            "target_materials": [],
            "reasoning": "解析失败"
        }

    @staticmethod
    def _coerce_confidence(value, default=0.5):
        """Return ``value`` as a usable number, or ``default`` if it is not one."""
        if isinstance(value, bool):
            return default
        if isinstance(value, str):
            try:
                value = float(value)
            except ValueError:
                return default
        if isinstance(value, (int, float)) and value == value:  # rejects NaN
            return value
        return default

    def process_single_article(self, article_data):
        """Run the extraction for one article.

        Args:
            article_data (dict): Article record; "Article Title" and
                "Abstract" are read from it, the rest is passed through.

        Returns:
            tuple | None: ``(kind, record)`` where ``kind`` is
            "with_target", "without_target" or "error" and ``record`` is a
            copy of ``article_data`` with a "Target" key (plus "Error" for
            failures). ``None`` when both title and abstract are empty.
        """
        try:
            # `or ""` also covers JSON nulls stored under the keys.
            title = article_data.get("Article Title") or ""
            abstract = article_data.get("Abstract") or ""
            if not title and not abstract:
                return None

            response = self.client.chat.completions.create(
                model=self.model,
                messages=[
                    {"role": "system", "content": "你是一个专业的材料科学研究助手,擅长从文献中抽取材料信息。"},
                    {"role": "user", "content": self.create_prompt(title, abstract)}
                ],
                temperature=0.3,
                max_tokens=1000
            )
            response_text = response.choices[0].message.content
            extraction_result = self.extract_target_from_response(response_text)

            # The model output is untrusted: keep only well-formed material
            # entries so one malformed item does not discard the article.
            raw_materials = extraction_result.get("target_materials")
            if isinstance(raw_materials, list):
                materials = [m for m in raw_materials if isinstance(m, dict)]
            else:
                materials = []

            result = article_data.copy()
            if extraction_result.get("has_target", False) and materials:
                default = self._DEFAULT_CONFIDENCE
                confidences = [
                    self._coerce_confidence(m.get("confidence", default), default)
                    for m in materials
                ]
                result["Target"] = {
                    "materials": materials,
                    "reasoning": extraction_result.get("reasoning", ""),
                    "extraction_confidence": max(confidences + [default])
                }
                return ("with_target", result)

            result["Target"] = None
            return ("without_target", result)
        except Exception as e:
            # Best effort: a failed article is reported and kept (with the
            # error text) instead of aborting the whole batch.
            print(f"处理文献时出错: {e}")
            result = article_data.copy()
            result["Target"] = None
            result["Error"] = str(e)
            return ("error", result)

    def process_articles(self, input_file, output_with_target, output_without_target):
        """Process every article of a JSON file and write the two result files.

        Results are appended to ``results_with_target`` /
        ``results_without_target`` in input order. Each output file is only
        written when its result list is non-empty.

        Args:
            input_file (str): Path of the input JSON file (a list of
                article dicts).
            output_with_target (str): Output path for articles with a target.
            output_without_target (str): Output path for articles without a
                target (including articles whose processing failed).

        Raises:
            ValueError: If the top-level JSON value is not a list.
        """
        print("正在读取输入文件...")
        with open(input_file, 'r', encoding='utf-8') as f:
            articles = json.load(f)
        if not isinstance(articles, list):
            raise ValueError(
                f"输入文件的顶层结构必须是JSON列表,实际为 {type(articles).__name__}"
            )
        print(f"共读取到 {len(articles)} 篇文献")

        # index -> (kind, record); filled in completion order, read in input
        # order so the output files are reproducible between runs.
        outcomes = {}
        with tqdm(total=len(articles), desc="处理文献") as pbar:
            with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
                future_to_index = {
                    executor.submit(self.process_single_article, article): i
                    for i, article in enumerate(articles)
                }
                for future in as_completed(future_to_index):
                    try:
                        outcome = future.result()
                    except Exception as e:
                        print(f"处理结果时出错: {e}")
                    else:
                        if outcome:
                            outcomes[future_to_index[future]] = outcome
                    finally:
                        pbar.update(1)

        with self.lock:
            for index in sorted(outcomes):
                result_type, result_data = outcomes[index]
                if result_type == "with_target":
                    self.results_with_target.append(result_data)
                else:  # "without_target" or "error"
                    self.results_without_target.append(result_data)

        print(f"保存结果: {len(self.results_with_target)} 篇有目标产物, {len(self.results_without_target)} 篇无目标产物")
        if self.results_with_target:
            with open(output_with_target, 'w', encoding='utf-8') as f:
                json.dump(self.results_with_target, f, ensure_ascii=False, indent=2)
            print(f"有目标产物的文献已保存到: {output_with_target}")
        if self.results_without_target:
            with open(output_without_target, 'w', encoding='utf-8') as f:
                json.dump(self.results_without_target, f, ensure_ascii=False, indent=2)
            print(f"无目标产物的文献已保存到: {output_without_target}")
def main():
    """Run the extraction pipeline with settings taken from the environment.

    Environment variables (all optional except the API key):
        OPENAI_API_KEY: API key for the endpoint. Required; it must never be
            committed to source control.
        OPENAI_BASE_URL: API base URL (default "https://vip.apiyi.com/v1").
        MATBENCH_MODEL: Model name (default "deepseek-chat").
        MATBENCH_INPUT_FILE, MATBENCH_OUTPUT_WITH_TARGET,
        MATBENCH_OUTPUT_WITHOUT_TARGET: Override the default data paths.

    Returns:
        None. Problems are reported on stdout.
    """
    # SECURITY: an API key used to be hard-coded here. It has been removed;
    # any key that was ever committed should be revoked and rotated.
    base_url = os.environ.get("OPENAI_BASE_URL", "https://vip.apiyi.com/v1")
    api_key = os.environ.get("OPENAI_API_KEY", "")
    model = os.environ.get("MATBENCH_MODEL", "deepseek-chat")
    thread_count = 32

    input_file = os.environ.get(
        "MATBENCH_INPUT_FILE",
        "/home/ubuntu/50T/LYT/MatBench/layer3/data/raw_small_data.json",
    )
    output_with_target = os.environ.get(
        "MATBENCH_OUTPUT_WITH_TARGET",
        "/home/ubuntu/50T/LYT/MatBench/layer3/data/step1_materials_with_target.json",
    )
    output_without_target = os.environ.get(
        "MATBENCH_OUTPUT_WITHOUT_TARGET",
        "/home/ubuntu/50T/LYT/MatBench/layer3/data/step1_materials_without_target.json",
    )

    if not os.path.exists(input_file):
        print(f"错误: 输入文件不存在 - {input_file}")
        return

    if not base_url or not api_key:
        print("警告: 请先配置BASE_URL和API_KEY")
        print("请设置环境变量OPENAI_API_KEY(以及可选的OPENAI_BASE_URL)")
        return

    extractor = MaterialProductExtractor(
        base_url=base_url,
        api_key=api_key,
        model=model,
        max_workers=thread_count
    )

    try:
        extractor.process_articles(input_file, output_with_target, output_without_target)
        print("处理完成!")
        with_target = len(extractor.results_with_target)
        without_target = len(extractor.results_without_target)
        print("\n统计信息:")
        print(f"- 检测到目标产物的文献: {with_target}")
        print(f"- 未检测到目标产物的文献: {without_target}")
        print(f"- 总处理文献: {with_target + without_target}")
    except Exception as e:
        # Top-level boundary: report the failure instead of a traceback.
        print(f"处理过程中出错: {e}")


if __name__ == "__main__":
    main()