格式转换

This commit is contained in:
lzy
2025-05-29 20:18:57 +08:00
parent 1156bfdd7c
commit 6c87af5614
14 changed files with 11996 additions and 13 deletions

View File

@@ -0,0 +1,616 @@
import json
import openai
from typing import Dict, Any, List
import time
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
from tqdm import tqdm
import random
import re
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
class ChoiceOptionsGenerator:
def __init__(self, api_key: str, base_url: str, model_name: str, max_workers: int = 20):
self.api_key = api_key
self.base_url = base_url
self.model_name = model_name
self.max_workers = max_workers
self.thread_local = threading.local()
self.lock = threading.Lock()
self.max_retries = 5 # 最大重试次数
def get_client(self):
if not hasattr(self.thread_local, 'client'):
self.thread_local.client = openai.OpenAI(
api_key=self.api_key,
base_url=self.base_url
)
return self.thread_local.client
def create_options_prompt(self, question_data: Dict[str, Any]) -> str:
"""创建生成选项的提示词"""
choice_question = question_data.get("choice_question", "")
correct_option = question_data.get("correct_option", "")
original_question = question_data.get("question", "")
question_type = question_data.get("question_type", "")
# 根据question_type判断题目类型
if question_type == "true_false":
return self._create_true_false_prompt(choice_question, correct_option, original_question)
else:
# 其他所有类型都生成选择题
return self._create_multiple_choice_prompt(choice_question, correct_option, original_question, question_type)
def _create_true_false_prompt(self, question: str, correct_option: str, original_question: str) -> str:
"""创建判断题的提示词"""
return f"""
请为以下判断题生成完整的题目格式。
题目: {question}
正确答案: {correct_option}
原始题目: {original_question}
请按以下要求输出判断题:
1. 将题目转换为一个明确的陈述句
2. 要求学生判断该陈述的正确性
3. 提供标准的判断题格式
输出格式严格按照JSON格式
{{
"question_type": "true_false",
"statement": "需要判断的陈述句",
"options": ["True", "False"],
"correct_answer": "True/False",
"explanation": "答案解释"
}}
"""
def _create_multiple_choice_prompt(self, question: str, correct_option: str, original_question: str, question_type: str) -> str:
"""创建选择题的提示词 - 采用两步法"""
return f"""
你是一个材料科学专业的教育评估专家。请为以下题目生成高质量的选择题选项。
题目: {question}
正确答案: {correct_option}
原始题目: {original_question}
题目类型: {question_type}
**请分两步完成:**
**第一步:打草稿 - 生成10个候选干扰项**
请先分析题目然后生成10个可能的干扰项。要求
- 与正确答案在同一知识领域和格式
- 涵盖不同的错误类型(概念混淆、数值错误、逻辑错误、计算过程中间值等)
- 干扰项首先要把你自己都成功干扰,然后才能干扰学生,否则就没有意义了
- 包含至少7个高难度干扰项需要深入理解才能排除
- 包含1-2个中等难度干扰项
- 包含1-2个相对简单的干扰项
**第二步:精选最佳选项**
从10个候选项中选择3个最佳干扰项要求
- 至少2个是高难度干扰项专业人士也可能犯错
- 避免明显错误或不合理的选项(没有意义,这些学生都是清华北大的顶级学生,考察他们需要相当大的难度才行)
- 确保每个选项都有相当大的迷惑性(首先要能干扰你自己)
**输出格式严格按照JSON格式**
{{
"draft_analysis": {{
"question_analysis": "题目分析和知识点识别",
"correct_answer_analysis": "正确答案的原理解释",
"distractor_strategy": "干扰项设计策略"
}},
"candidate_distractors": [
{{"option": "候选干扰项1", "difficulty": "high/medium/low", "reasoning": "设计理由"}},
{{"option": "候选干扰项2", "difficulty": "high/medium/low", "reasoning": "设计理由"}},
// ... 总共10个候选项
],
"final_selection": {{
"question_type": "multiple_choice",
"options": {{
"A": "选项A内容",
"B": "选项B内容",
"C": "选项C内容",
"D": "选项D内容"
}},
"correct_answer": "A/B/C/D",
"difficulty_distribution": {{
"high_difficulty_count": 2,
"medium_difficulty_count": 1,
"selected_distractors_reasoning": "为什么选择这3个干扰项的详细说明"
}},
"explanation": "正确答案解释及其他选项错误原因分析"
}}
}}
**重要要求:**
1. 确保至少2个干扰项具有高度迷惑性即使是专业人士也需要仔细思考才能排除最低限度的迷惑度是骗过你自己
2. 所有干扰项必须在学术上是合理的概念,不能是胡编乱造
3. 正确答案位置要随机分布不要总是放在A选项、B选项、C选项或D选项
4. 每个干扰项都要有明确的设计理由和难度评估
"""
def generate_options(self, question_data: Dict[str, Any]) -> Dict[str, Any]:
"""为单个题目生成选项,带重试机制"""
for attempt in range(self.max_retries):
try:
result = self._attempt_generate_options(question_data)
# 验证结果质量
if self._validate_options_quality(result, question_data):
return result
else:
if attempt < self.max_retries - 1:
logging.warning(f"{attempt+1}次生成的选项质量不佳,重试中...")
time.sleep(1) # 短暂延迟后重试
continue
except Exception as e:
logging.error(f"{attempt+1}次生成选项失败: {e}")
if attempt < self.max_retries - 1:
time.sleep(2) # 失败后延迟重试
continue
# 所有重试都失败,返回备用选项
logging.error("所有重试都失败,使用备用选项生成")
return self._create_fallback_options(question_data)
def _attempt_generate_options(self, question_data: Dict[str, Any]) -> Dict[str, Any]:
"""单次尝试生成选项"""
client = self.get_client()
prompt = self.create_options_prompt(question_data)
response = client.chat.completions.create(
model=self.model_name,
messages=[
{
"role": "system",
"content": "你是一个材料科学专业的教育评估专家具有丰富的出题经验。你特别擅长设计高质量的干扰项能够创造出既合理又具有高度迷惑性的选项。请严格按照要求的JSON格式输出。"
},
{"role": "user", "content": prompt}
],
temperature=0.8, # 提高温度增加创造性
max_tokens=1500, # 增加token数以支持更详细的分析
top_p=0.9
)
result_text = response.choices[0].message.content.strip()
# 解析JSON结果
json_result = self._extract_json_from_response(result_text)
# 如果是判断题,直接返回
if json_result.get("question_type") == "true_false":
return json_result
# 如果是选择题提取final_selection部分
if "final_selection" in json_result:
return json_result["final_selection"]
else:
return json_result
def _extract_json_from_response(self, response_text: str) -> Dict[str, Any]:
"""从响应文本中提取JSON"""
# 寻找JSON内容
json_start = response_text.find('{')
json_end = response_text.rfind('}') + 1
if json_start == -1 or json_end <= json_start:
raise ValueError("无法在响应中找到JSON格式内容")
json_str = response_text[json_start:json_end]
# 尝试解析JSON
try:
return json.loads(json_str)
except json.JSONDecodeError as e:
# 尝试修复常见的JSON错误
json_str = self._fix_common_json_errors(json_str)
return json.loads(json_str)
def _fix_common_json_errors(self, json_str: str) -> str:
"""修复常见的JSON格式错误"""
# 移除注释
json_str = re.sub(r'//.*', '', json_str)
# 修复尾随逗号
json_str = re.sub(r',\s*}', '}', json_str)
json_str = re.sub(r',\s*]', ']', json_str)
# 确保字符串被正确引用
# 这里可以添加更多修复逻辑
return json_str
def _validate_options_quality(self, result: Dict[str, Any], original_data: Dict[str, Any]) -> bool:
"""验证生成选项的质量"""
if not result:
return False
question_type = result.get("question_type", "")
if question_type == "true_false":
return self._validate_true_false_quality(result)
elif question_type == "multiple_choice":
return self._validate_multiple_choice_quality(result, original_data)
return False
def _validate_true_false_quality(self, result: Dict[str, Any]) -> bool:
"""验证判断题质量"""
required_fields = ["statement", "options", "correct_answer", "explanation"]
# 检查必需字段
if not all(field in result for field in required_fields):
return False
# 检查选项是否为True/False
options = result.get("options", [])
if not (len(options) == 2 and "True" in options and "False" in options):
return False
# 检查正确答案是否有效
correct_answer = result.get("correct_answer", "")
if correct_answer not in ["True", "False"]:
return False
return True
def _validate_multiple_choice_quality(self, result: Dict[str, Any], original_data: Dict[str, Any]) -> bool:
"""验证选择题质量"""
# 检查基本结构
if not all(key in result for key in ["options", "correct_answer", "explanation"]):
return False
options = result.get("options", {})
# 检查是否有4个选项
if len(options) != 4 or not all(label in options for label in ["A", "B", "C", "D"]):
return False
# 检查正确答案是否有效
correct_answer = result.get("correct_answer", "")
if correct_answer not in ["A", "B", "C", "D"]:
return False
# 检查是否包含原始正确答案(放宽检查条件)
original_correct = original_data.get("correct_option", "").strip()
if original_correct:
# 检查是否有选项包含或相似于原始正确答案
found_match = False
for option in options.values():
option_str = str(option).strip()
# 检查完全包含或高度相似
if (original_correct.lower() in option_str.lower() or
option_str.lower() in original_correct.lower() or
self._are_similar_answers(original_correct, option_str)):
found_match = True
break
if not found_match:
logging.warning(f"未找到匹配的原始答案: {original_correct}")
return False
# 检查选项长度(避免过短的选项)
if any(len(str(option).strip()) < 2 for option in options.values()):
return False
# 检查选项是否有重复
option_values = [str(option).strip().lower() for option in options.values()]
if len(set(option_values)) != 4:
return False
return True
def _are_similar_answers(self, answer1: str, answer2: str) -> bool:
"""检查两个答案是否相似"""
# 简单的相似度检查,可以根据需要扩展
answer1_clean = re.sub(r'[^\w\s]', '', answer1.lower()).strip()
answer2_clean = re.sub(r'[^\w\s]', '', answer2.lower()).strip()
# 检查关键词重叠
words1 = set(answer1_clean.split())
words2 = set(answer2_clean.split())
if len(words1) == 0 or len(words2) == 0:
return False
overlap = len(words1.intersection(words2))
similarity = overlap / min(len(words1), len(words2))
return similarity > 0.6 # 60%相似度阈值
def _create_fallback_options(self, question_data: Dict[str, Any]) -> Dict[str, Any]:
"""当AI生成失败时的备用选项生成"""
question_type = question_data.get("question_type", "")
correct_option = question_data.get("correct_option", "")
# 根据question_type生成相应的备用选项
if question_type == "true_false":
return {
"question_type": "true_false",
"statement": question_data.get("choice_question", ""),
"options": ["True", "False"],
"correct_answer": self._determine_true_false_answer(correct_option),
"explanation": "基于题目分析的判断结果"
}
else:
# 其他类型都生成选择题
distractors = self._generate_rule_based_distractors(correct_option)
all_options = [correct_option] + distractors
random.shuffle(all_options)
# 找到正确答案的位置
correct_index = all_options.index(correct_option)
correct_label = ["A", "B", "C", "D"][correct_index]
return {
"question_type": "multiple_choice",
"options": {
"A": all_options[0],
"B": all_options[1],
"C": all_options[2],
"D": all_options[3]
},
"correct_answer": correct_label,
"explanation": "基于规则生成的备用选项",
"generated_by": "fallback_rules"
}
def _determine_true_false_answer(self, correct_option: str) -> str:
"""确定判断题的正确答案"""
true_indicators = ["true", "正确", "", "", "T", "", "yes"]
false_indicators = ["false", "错误", "", "", "F", "×", "no"]
correct_lower = correct_option.lower().strip()
# 检查是否包含True相关的词汇
if any(indicator in correct_lower for indicator in true_indicators):
return "True"
# 检查是否包含False相关的词汇
elif any(indicator in correct_lower for indicator in false_indicators):
return "False"
else:
# 如果无法确定默认返回True
return "True"
def _generate_rule_based_distractors(self, correct_answer: str) -> List[str]:
"""基于规则生成干扰项"""
distractors = []
# 尝试识别答案类型并生成相应的干扰项
if self._is_numeric_answer(correct_answer):
distractors = self._generate_numeric_distractors(correct_answer)
elif self._is_structure_name(correct_answer):
distractors = self._generate_structure_distractors(correct_answer)
elif self._is_material_property(correct_answer):
distractors = self._generate_property_distractors(correct_answer)
else:
# 通用干扰项
distractors = [
f"Alternative option 1",
f"Alternative option 2",
f"Alternative option 3"
]
# 确保返回3个干扰项
return distractors[:3]
def _is_numeric_answer(self, answer: str) -> bool:
"""检查答案是否为数值型"""
return bool(re.search(r'\d+\.?\d*', answer))
def _is_structure_name(self, answer: str) -> bool:
"""检查答案是否为结构名称"""
structure_keywords = ["cubic", "hexagonal", "tetragonal", "orthorhombic", "bcc", "fcc", "hcp"]
return any(keyword in answer.lower() for keyword in structure_keywords)
def _is_material_property(self, answer: str) -> bool:
"""检查答案是否为材料属性"""
property_keywords = ["strength", "hardness", "ductility", "brittleness", "conductivity", "elastic"]
return any(keyword in answer.lower() for keyword in property_keywords)
def _generate_numeric_distractors(self, correct_answer: str) -> List[str]:
"""生成数值型干扰项"""
# 提取数值
numbers = re.findall(r'\d+\.?\d*', correct_answer)
if not numbers:
return ["Option B", "Option C", "Option D"]
base_num = float(numbers[0])
unit = correct_answer.replace(numbers[0], "").strip()
distractors = [
f"{base_num * 0.5:.2f} {unit}".strip(),
f"{base_num * 2:.2f} {unit}".strip(),
f"{base_num * 1.5:.2f} {unit}".strip()
]
return distractors
def _generate_structure_distractors(self, correct_answer: str) -> List[str]:
"""生成结构名称型干扰项"""
all_structures = [
"simple cubic", "body-centered cubic", "face-centered cubic",
"hexagonal close-packed", "diamond cubic", "tetragonal",
"orthorhombic", "monoclinic", "triclinic"
]
distractors = [s for s in all_structures if s.lower() != correct_answer.lower()]
return random.sample(distractors, min(3, len(distractors)))
def _generate_property_distractors(self, correct_answer: str) -> List[str]:
"""生成材料属性型干扰项"""
all_properties = [
"high strength", "low strength", "high ductility", "brittleness",
"high hardness", "low hardness", "high toughness", "low toughness",
"high elasticity", "low elasticity", "high conductivity", "low conductivity"
]
distractors = [p for p in all_properties if p.lower() != correct_answer.lower()]
return random.sample(distractors, min(3, len(distractors)))
def process_single_question(generator, question, question_index):
"""处理单个题目的函数"""
try:
# 生成选项
options_data = generator.generate_options(question)
# 合并到原题目数据
complete_question = question.copy()
complete_question["generated_options"] = options_data
complete_question["generation_status"] = "success"
complete_question["question_index"] = question_index # 保持原始顺序
return complete_question
except Exception as e:
logging.error(f"{question_index+1}题处理失败: {e}")
# 添加失败标记
failed_question = question.copy()
failed_question["generated_options"] = generator._create_fallback_options(question)
failed_question["generation_status"] = "failed"
failed_question["error_message"] = str(e)
failed_question["question_index"] = question_index
return failed_question
def main():
# 配置信息
API_KEY = "sk-oYh3Xrhg8oDY2gW02c966f31C84449Ad86F9Cd9dF6E64a8d"
BASE_URL = "https://vip.apiyi.com/v1"
MODEL_NAME = "deepseek-chat"
# MODEL_NAME = "claude-sonnet-4-20250514"
MAX_WORKERS = 20 # 线程数
INPUT_FILE = "/home/ubuntu/50T/LYT/MatBench/layer2/PGEE/code/stepx_filtered_high_quality_questions.json"
OUTPUT_FILE = "/home/ubuntu/50T/LYT/MatBench/layer2/PGEE/code/stepy_complete_choice_questions.json"
# 加载数据
print("正在加载数据...")
with open(INPUT_FILE, 'r', encoding='utf-8') as f:
questions = json.load(f)
import random
random.shuffle(questions) # 打乱题目顺序,增加多样性
questions = questions[:200] # 调试期间只处理前200道题目
print(f"加载了 {len(questions)} 道题目")
# 统计题目类型分布
type_counts = {}
for q in questions:
qtype = q.get("question_type", "unknown")
type_counts[qtype] = type_counts.get(qtype, 0) + 1
print("题目类型分布:")
for qtype, count in type_counts.items():
print(f" {qtype}: {count}")
# 初始化生成器
generator = ChoiceOptionsGenerator(API_KEY, BASE_URL, MODEL_NAME, MAX_WORKERS)
# 多线程处理题目
complete_questions = []
processed_count = 0
# 使用ThreadPoolExecutor进行并发处理
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
# 提交所有任务
future_to_question = {
executor.submit(process_single_question, generator, question, i): (question, i)
for i, question in enumerate(questions)
}
# 使用tqdm显示进度
with tqdm(total=len(questions), desc="生成选项") as pbar:
# 收集结果
temp_results = []
for future in as_completed(future_to_question):
try:
result = future.result()
temp_results.append(result)
processed_count += 1
pbar.update(1)
# # 每处理100题保存一次中间结果
# if processed_count % 100 == 0:
# # 按原始顺序排序临时结果
# temp_results_sorted = sorted(temp_results, key=lambda x: x.get("question_index", 0))
# temp_file = OUTPUT_FILE.replace('.json', f'_temp_{processed_count}.json')
# with open(temp_file, 'w', encoding='utf-8') as f:
# json.dump(temp_results_sorted, f, ensure_ascii=False, indent=2)
# print(f"\n已保存中间结果到 {temp_file}")
except Exception as e:
logging.error(f"处理结果时发生错误: {e}")
original_question, question_index = future_to_question[future]
# 创建失败结果
failed_result = original_question.copy()
failed_result["generated_options"] = generator._create_fallback_options(original_question)
failed_result["generation_status"] = "processing_failed"
failed_result["error_message"] = str(e)
failed_result["question_index"] = question_index
temp_results.append(failed_result)
processed_count += 1
pbar.update(1)
# 按原始顺序排序结果
complete_questions = sorted(temp_results, key=lambda x: x.get("question_index", 0))
# 移除临时的索引字段
for question in complete_questions:
if "question_index" in question:
del question["question_index"]
# 保存最终结果
with open(OUTPUT_FILE, 'w', encoding='utf-8') as f:
json.dump(complete_questions, f, ensure_ascii=False, indent=2)
# 统计结果
success_count = sum(1 for q in complete_questions if q.get("generation_status") == "success")
failed_count = len(complete_questions) - success_count
print(f"\n完成!总共处理了 {len(complete_questions)} 道题目")
print(f"成功生成: {success_count}")
print(f"使用备用方案: {failed_count}")
print(f"成功率: {success_count/len(complete_questions)*100:.2f}%")
print(f"结果已保存到: {OUTPUT_FILE}")
# 按题目类型统计结果
type_success = {}
type_total = {}
for q in complete_questions:
qtype = q.get("question_type", "unknown")
type_total[qtype] = type_total.get(qtype, 0) + 1
if q.get("generation_status") == "success":
type_success[qtype] = type_success.get(qtype, 0) + 1
print("\n各题型处理结果:")
for qtype in type_total:
success = type_success.get(qtype, 0)
total = type_total[qtype]
success_rate = success / total * 100 if total > 0 else 0
print(f" {qtype}: {success}/{total} ({success_rate:.1f}%)")
# 详细的失败统计
if failed_count > 0:
failure_reasons = {}
for q in complete_questions:
if q.get("generation_status") != "success":
reason = q.get("error_message", "未知错误")
failure_reasons[reason] = failure_reasons.get(reason, 0) + 1
print("\n失败原因统计:")
for reason, count in failure_reasons.items():
print(f" {reason}: {count} 道题")
if __name__ == "__main__":
main()