Files
MatBench/layer2/PGEE/code/step2_translate.py
2025-05-28 11:00:24 +08:00

137 lines
4.5 KiB
Python

"""
0. 将问题从xls提取为json
1. 将问题进行拆分
2. 翻译成英文
3. 去重
4. 使用大模型进行难度评估和筛选
"""
import json
import os
import queue
import re
import threading
import time
from concurrent.futures import ThreadPoolExecutor

from openai import OpenAI
# Serialises appends to the two shared result lists from worker threads.
result_lock = threading.Lock()
# Limits the number of in-flight API requests to 5, regardless of how many
# worker threads the executor runs.
api_semaphore = threading.Semaphore(5)
# Successfully translated items and items whose translation failed.
processed_data = []
error_items = []

# SECURITY: a live-looking API key is committed in this file. It is kept only
# as a fallback so existing runs behave exactly as before; it should be
# rotated and removed. Prefer supplying the credentials through the
# environment variables below, which take precedence when set.
API_KEY = os.environ.get(
    "PGEE_API_KEY", "sk-oYh3Xrhg8oDY2gW02c966f31C84449Ad86F9Cd9dF6E64a8d"
)
BASE_URL = os.environ.get("PGEE_BASE_URL", "https://vip.apiyi.com/v1")
MODEL_GPT = os.environ.get("PGEE_MODEL", "deepseek-chat")
client = OpenAI(api_key=API_KEY, base_url=BASE_URL)
def load_qa_data(file_path):
    """Read the UTF-8 JSON file at *file_path* and return the decoded object."""
    with open(file_path, 'r', encoding='utf-8') as source:
        return json.load(source)
def translate_qa_type(question,answer):
    """Ask the chat model to translate one Chinese QA pair into English.

    The prompt requests a JSON array holding a single object with
    "question" and "answer" keys. The reply is printed, cleaned with
    comfirm_json_string and decoded with json.loads.

    Returns:
        Whatever json.loads yields for the cleaned reply, or the string "2"
        when anything inside the request/parse step raises.
    """
    prompt = f"""
Please strictly translate the following Chinese questions and answers into English, and return the results according to the specified JSON format:
Question: {question}
Answer: {answer}
Translation requirements:
- Only translate the Chinese expressions, any additions or modifications to the content are prohibited
- Maintain all information points, expressions, and numerical values exactly as in the original text
- Keep professional terminology accurate
- Return plain text, do not use markdown format
Return the translation results according to the following JSON format:
[
{{
"question": "Translated English question",
"answer": "Translated English answer"
}}
]
"""
    system_message = {"role": "system", "content": "You are an expert translator with extensive knowledge of materials science, tasked with translating Chinese texts into highly accurate English, ensuring the correct usage of scientific terminology."}
    user_message = {"role": "user", "content": prompt}
    # The semaphore is held for the whole request so that at most a fixed
    # number of calls are in flight at once.
    with api_semaphore:
        try:
            completion = client.chat.completions.create(
                model=MODEL_GPT,
                messages=[system_message, user_message],
                stream=False,
            )
            raw_text = completion.choices[0].message.content.strip()
            print(raw_text)
            return json.loads(comfirm_json_string(raw_text))
        except Exception as e:
            # Covers request failures as well as JSON decoding errors; the
            # caller recognises failure by the non-list return value.
            print(f"API调用错误: {e}")
            return "2"
def comfirm_json_string(json_string):
    """Repair a model reply so that it can be passed to json.loads.

    (The misspelt name is kept because callers use it.)

    Steps, in order:
      1. Curly double quotes become straight double quotes.
      2. Every backslash is doubled so sequences that are not valid JSON
         escapes (e.g. LaTeX such as ``\\alpha``) survive decoding as literal
         text, except that an escaped quote ``\\"`` is kept as-is. The
         previous implementation tried to restore ``\\"`` with a regex whose
         replacement equalled its match, so escaped quotes were left doubled
         and produced invalid JSON.
      3. Newline and carriage-return characters are removed.
      4. A leading Markdown fence (```` ``` ```` optionally followed by
         ``json`` in any letter case) and any surrounding backticks or
         whitespace are removed.

    Args:
        json_string: Raw text returned by the model.

    Returns:
        The cleaned string. It is not guaranteed to be valid JSON.
    """
    # NOTE: this also rewrites curly quotes that occur inside string values,
    # which can unbalance the JSON; kept because it is the existing behaviour.
    cleaned = re.sub(r'[“”]', '"', json_string)
    # Double all backslashes, then undo the doubling in front of a quote so
    # that \" remains a valid JSON escape.
    cleaned = cleaned.replace('\\', '\\\\').replace('\\\\"', '\\"')
    cleaned = cleaned.replace("\n", "").replace("\r", "")
    # Remove the opening Markdown fence as a prefix (not as a character set,
    # which could also eat payload characters) and ignore the tag's case.
    cleaned = re.sub(r'^\s*```(?:json)?', '', cleaned, flags=re.IGNORECASE)
    # Drop the closing fence / stray backticks and surrounding whitespace.
    return cleaned.strip().strip('`').strip()
def process_item(item, index, total):
    """Translate one QA item and record it as a success or a failure.

    Args:
        item: Mapping with "idx", "question" and "answer" keys.
        index: Zero-based position of the item, used only for the progress
            message.
        total: Total number of items, used only for the progress message.

    Side effects:
        Appends a dict with "idx", "question" and "answer" to the module-level
        ``processed_data`` list when the translation has the expected shape,
        otherwise appends the original (untranslated) fields to
        ``error_items``. Both appends happen under ``result_lock``.
    """
    print(f"处理第 {index+1}/{total} 条数据...")
    question = item["question"]
    answer = item["answer"]
    data = translate_qa_type(question,answer)

    # The model reply is untrusted: it may be an empty list, a list of
    # non-dicts, or a dict missing a key. Indexing it blindly would raise in
    # the worker and abort the whole run, so validate the shape first and
    # treat anything unexpected as a failed item.
    translated = None
    if isinstance(data, list) and data:
        first = data[0]
        if isinstance(first, dict) and "question" in first and "answer" in first:
            translated = {
                "idx": item['idx'],
                "question": first["question"],
                "answer": first["answer"],
            }

    with result_lock:
        if translated is not None:
            processed_data.append(translated)
        else:
            error_items.append({
                "idx": item['idx'],
                "question": question,
                "answer": answer
            })
def save_processed_data(data, output_file):
    """Serialise *data* as 2-space-indented JSON into *output_file* (UTF-8).

    Non-ASCII characters are written as-is rather than as \\u escapes.
    """
    with open(output_file, mode='w', encoding='utf-8') as sink:
        json.dump(data, sink, indent=2, ensure_ascii=False)
def main(
    input_file="/home/ubuntu/50T/fsy/layer2/QA/single_select.json",
    output_file="/home/ubuntu/50T/fsy/layer2/QA/EN-single_select.json",
    error_file="/home/ubuntu/50T/fsy/error.json",
):
    """Translate every item of the input JSON file and save the results.

    Args:
        input_file: Path of the JSON file holding the items to translate.
        output_file: Path the translated items are written to.
        error_file: Path the failed items are written to; only written when
            at least one item failed.

    The defaults are the paths that were previously hard-coded, so calling
    ``main()`` without arguments behaves as before.
    """
    data = load_qa_data(input_file)
    total = len(data)
    with ThreadPoolExecutor(max_workers=10) as executor:
        submitted = []
        for i, item in enumerate(data):
            submitted.append((item, executor.submit(process_item, item, i, total)))
            # Pause one second after every 10 submissions to pace the
            # requests.
            if (i+1) % 10 == 0:
                time.sleep(1)
        for item, future in submitted:
            try:
                future.result()
            except Exception as exc:
                # Future.result() re-raises whatever the worker raised. One
                # bad item must not abort the run before the translations
                # completed so far are written, so log it, keep the raw item
                # with the other failures, and carry on.
                print(f"处理条目时发生异常: {exc}")
                with result_lock:
                    error_items.append(item)
    save_processed_data(processed_data, output_file)
    print(f"处理完成,已保存到 {output_file}")
    if error_items:
        save_processed_data(error_items, error_file)
        print(f"处理出错的条目已保存到 {error_file}")


if __name__ == "__main__":
    main()