""" 0. 将问题从xls提取为json 1. 将问题进行拆分 2. 翻译成英文 3. 去重 4. 使用大模型进行难度评估和筛选 """ import json import time import threading import queue from concurrent.futures import ThreadPoolExecutor from openai import OpenAI import re result_lock = threading.Lock() api_semaphore = threading.Semaphore(5) processed_data =[] error_items = [] API_KEY="sk-oYh3Xrhg8oDY2gW02c966f31C84449Ad86F9Cd9dF6E64a8d" BASE_URL="https://vip.apiyi.com/v1" MODEL_GPT ="deepseek-chat" client = OpenAI(api_key=API_KEY,base_url=BASE_URL) def load_qa_data(file_path): with open(file_path, 'r', encoding='utf-8') as f: data = json.load(f) return data def translate_qa_type(question,answer): prompt = f""" Please strictly translate the following Chinese questions and answers into English, and return the results according to the specified JSON format: Question: {question} Answer: {answer} Translation requirements: - Only translate the Chinese expressions, any additions or modifications to the content are prohibited - Maintain all information points, expressions, and numerical values exactly as in the original text - Keep professional terminology accurate - Return plain text, do not use markdown format Return the translation results according to the following JSON format: [ {{ "question": "Translated English question", "answer": "Translated English answer" }} ] """ with api_semaphore: try: response = client.chat.completions.create( model = MODEL_GPT, messages=[ {"role": "system", "content": "You are an expert translator with extensive knowledge of materials science, tasked with translating Chinese texts into highly accurate English, ensuring the correct usage of scientific terminology."}, {"role": "user", "content": prompt} ], stream=False ) result = response.choices[0].message.content.strip() print(result) process_result = comfirm_json_string(result) return json.loads(process_result) except Exception as e: print(f"API调用错误: {e}") return "2" def comfirm_json_string(json_string): json_string = re.sub(r'[“”]', '"', json_string) json_string = re.sub(r'\\', r'\\\\', json_string) json_string = re.sub(r'\\"', r'\"', json_string) json_string = json_string.replace("\n", "").replace("\r", "") # 去掉 Markdown 的语法包裹 if json_string.startswith("```json"): json_string = json_string.strip("`json\n") json_string = json_string.strip('`\n') return json_string def process_item(item, index, total): print(f"处理第 {index+1}/{total} 条数据...") question = item["question"] answer = item["answer"] data = translate_qa_type(question,answer) with result_lock: if isinstance(data, list): processed_data.append({ "idx": item['idx'], "question": data[0]["question"], "answer": data[0]["answer"] }) else: error_items.append({ "idx": item['idx'], "question": question, "answer": answer }) def save_processed_data(data, output_file): with open(output_file, 'w', encoding='utf-8') as f: json.dump(data, f, ensure_ascii=False, indent=2) def main(): input_file = "/home/ubuntu/50T/fsy/layer2/QA/single_select.json" output_file = "/home/ubuntu/50T/fsy/layer2/QA/EN-single_select.json" error_file = "/home/ubuntu/50T/fsy/error.json" data = load_qa_data(input_file) total = len(data) with ThreadPoolExecutor(max_workers=10) as executor: futures = [] for i, item in enumerate(data): future = executor.submit(process_item, item, i, total) futures.append(future) if (i+1) % 10 == 0: time.sleep(1) for future in futures: future.result() save_processed_data(processed_data, output_file) print(f"处理完成,已保存到 {output_file}") if error_items: save_processed_data(error_items, error_file) print(f"处理出错的条目已保存到 {error_file}") if __name__ == "__main__": main()