"""Generate answer options for materials-science exam questions with an LLM.

For every input question the script asks a chat-completion model to produce
either a true/false item or a four-option multiple-choice item, validates the
model output, retries on failure and finally falls back to simple rule-based
options. Questions are processed concurrently with a thread pool.
"""

import json
import logging
import os
import random
import re
import threading
import time
from collections import Counter
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, Dict, List

import openai
from tqdm import tqdm

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Prompt templates. They are filled with ``str.format``, so literal JSON braces
# are doubled.
_TRUE_FALSE_PROMPT_TEMPLATE = """
请为以下判断题生成完整的题目格式。

题目: {question}
正确答案: {correct_option}
原始题目: {original_question}

请按以下要求输出判断题:
1. 将题目转换为一个明确的陈述句
2. 要求学生判断该陈述的正确性
3. 提供标准的判断题格式

输出格式(严格按照JSON格式):
{{
    "question_type": "true_false",
    "statement": "需要判断的陈述句",
    "options": ["True", "False"],
    "correct_answer": "True/False",
    "explanation": "答案解释"
}}
"""

_MULTIPLE_CHOICE_PROMPT_TEMPLATE = """
你是一个材料科学专业的教育评估专家。请为以下题目生成高质量的选择题选项。

题目: {question}
正确答案: {correct_option}
原始题目: {original_question}
题目类型: {question_type}

**请分两步完成:**

**第一步:打草稿 - 生成10个候选干扰项**
请先分析题目,然后生成10个可能的干扰项。要求:
- 与正确答案在同一知识领域和格式
- 涵盖不同的错误类型(概念混淆、数值错误、逻辑错误、计算过程中间值等)
- 干扰项首先要把你自己都成功干扰,然后才能干扰学生,否则就没有意义了
- 包含至少7个高难度干扰项(需要深入理解才能排除)
- 包含1-2个中等难度干扰项
- 包含1-2个相对简单的干扰项

**第二步:精选最佳选项**
从10个候选项中选择3个最佳干扰项,要求:
- 至少2个是高难度干扰项(专业人士也可能犯错)
- 避免明显错误或不合理的选项(没有意义,这些学生都是清华北大的顶级学生,考察他们需要相当大的难度才行)
- 确保每个选项都有相当大的迷惑性(首先要能干扰你自己)

**输出格式(严格按照JSON格式):**
{{
    "draft_analysis": {{
        "question_analysis": "题目分析和知识点识别",
        "correct_answer_analysis": "正确答案的原理解释",
        "distractor_strategy": "干扰项设计策略"
    }},
    "candidate_distractors": [
        {{"option": "候选干扰项1", "difficulty": "high/medium/low", "reasoning": "设计理由"}},
        {{"option": "候选干扰项2", "difficulty": "high/medium/low", "reasoning": "设计理由"}},
        // ... 总共10个候选项
    ],
    "final_selection": {{
        "question_type": "multiple_choice",
        "options": {{
            "A": "选项A内容",
            "B": "选项B内容",
            "C": "选项C内容",
            "D": "选项D内容"
        }},
        "correct_answer": "A/B/C/D",
        "difficulty_distribution": {{
            "high_difficulty_count": 2,
            "medium_difficulty_count": 1,
            "selected_distractors_reasoning": "为什么选择这3个干扰项的详细说明"
        }},
        "explanation": "正确答案解释及其他选项错误原因分析"
    }}
}}

**重要要求:**
1. 确保至少2个干扰项具有高度迷惑性,即使是专业人士也需要仔细思考才能排除,最低限度的迷惑度是骗过你自己
2. 所有干扰项必须在学术上是合理的概念,不能是胡编乱造
3. 正确答案位置要随机分布,不要总是放在A选项、B选项、C选项或D选项
4. 每个干扰项都要有明确的设计理由和难度评估
"""

# A complete JSON string literal (used to keep repairs away from string content).
_JSON_STRING_RE = re.compile(r'"(?:\\.|[^"\\])*"')
# A comma directly followed (ignoring whitespace) by a closing brace/bracket.
_TRAILING_COMMA_RE = re.compile(r',\s*([}\]])')

# Marker stored under "generated_by" in every rule-based fallback result.
_FALLBACK_MARKER = "fallback_rules"

# Vocabulary used to interpret a free-form true/false answer.
_TRUE_EXACT = frozenset({"true", "t", "yes", "正确", "是", "对", "√"})
_FALSE_EXACT = frozenset({"false", "f", "no", "错误", "否", "错", "×"})
_TRUE_WORDS = frozenset({"true", "yes", "correct"})
_FALSE_WORDS = frozenset({"false", "no", "incorrect"})
_TRUE_PHRASES = ("正确", "是", "对", "√")
# Negated forms come first so that e.g. "不正确" is not read as "正确".
_FALSE_PHRASES = ("不正确", "不对", "不是", "错误", "否", "错", "×")


class ChoiceOptionsGenerator:
    """Generate and validate answer options for exam questions via an LLM.

    One OpenAI-compatible client is created lazily per worker thread (see
    ``get_client``). ``generate_options`` is the main entry point.
    """

    def __init__(self, api_key: str, base_url: str, model_name: str, max_workers: int = 20):
        """Store connection settings.

        Args:
            api_key: API key passed to the OpenAI client.
            base_url: Base URL of the OpenAI-compatible endpoint.
            model_name: Model identifier used for chat completions.
            max_workers: Intended number of worker threads (stored only).
        """
        self.api_key = api_key
        self.base_url = base_url
        self.model_name = model_name
        self.max_workers = max_workers
        self.thread_local = threading.local()
        # Kept for callers that may rely on it; not used by the methods below.
        self.lock = threading.Lock()
        self.max_retries = 5  # maximum number of generation attempts

    def get_client(self):
        """Return the calling thread's OpenAI client, creating it on first use."""
        if not hasattr(self.thread_local, 'client'):
            self.thread_local.client = openai.OpenAI(
                api_key=self.api_key,
                base_url=self.base_url
            )
        return self.thread_local.client

    def create_options_prompt(self, question_data: Dict[str, Any]) -> str:
        """Build the user prompt for a question.

        ``question_type == "true_false"`` selects the true/false prompt; every
        other type gets the multiple-choice prompt.
        """
        choice_question = question_data.get("choice_question", "")
        correct_option = question_data.get("correct_option", "")
        original_question = question_data.get("question", "")
        question_type = question_data.get("question_type", "")

        if question_type == "true_false":
            return self._create_true_false_prompt(choice_question, correct_option, original_question)
        return self._create_multiple_choice_prompt(
            choice_question, correct_option, original_question, question_type
        )

    def _create_true_false_prompt(self, question: str, correct_option: str, original_question: str) -> str:
        """Return the prompt asking the model for a true/false item."""
        return _TRUE_FALSE_PROMPT_TEMPLATE.format(
            question=question,
            correct_option=correct_option,
            original_question=original_question,
        )

    def _create_multiple_choice_prompt(self, question: str, correct_option: str,
                                       original_question: str, question_type: str) -> str:
        """Return the two-step (draft, then select) multiple-choice prompt."""
        return _MULTIPLE_CHOICE_PROMPT_TEMPLATE.format(
            question=question,
            correct_option=correct_option,
            original_question=original_question,
            question_type=question_type,
        )

    def generate_options(self, question_data: Dict[str, Any]) -> Dict[str, Any]:
        """Generate options for one question, retrying up to ``max_retries`` times.

        Returns the first model result that passes validation. When every
        attempt fails or is rejected, returns rule-based fallback options
        (recognisable by ``"generated_by": "fallback_rules"``).
        """
        for attempt in range(self.max_retries):
            is_last_attempt = attempt >= self.max_retries - 1
            try:
                result = self._attempt_generate_options(question_data)
                if self._validate_options_quality(result, question_data):
                    return result
            except Exception as e:
                # Any API/parsing/validation error counts as a failed attempt.
                logging.error(f"第{attempt+1}次生成选项失败: {e}")
                if not is_last_attempt:
                    time.sleep(2)  # back off a little longer after a hard failure
                continue

            if not is_last_attempt:
                logging.warning(f"第{attempt+1}次生成的选项质量不佳,重试中...")
                time.sleep(1)  # short delay before retrying

        logging.error("所有重试都失败,使用备用选项生成")
        return self._create_fallback_options(question_data)

    def _attempt_generate_options(self, question_data: Dict[str, Any]) -> Dict[str, Any]:
        """Run one chat-completion request and parse its JSON payload.

        Returns the parsed object for true/false items, or its
        ``final_selection`` part for multiple-choice items when present.

        Raises:
            ValueError: If the model returns no text or no JSON can be found.
            json.JSONDecodeError: If the JSON cannot be parsed even after repair.
        """
        client = self.get_client()
        prompt = self.create_options_prompt(question_data)

        response = client.chat.completions.create(
            model=self.model_name,
            messages=[
                {
                    "role": "system",
                    "content": "你是一个材料科学专业的教育评估专家,具有丰富的出题经验。你特别擅长设计高质量的干扰项,能够创造出既合理又具有高度迷惑性的选项。请严格按照要求的JSON格式输出。"
                },
                {"role": "user", "content": prompt}
            ],
            temperature=0.8,  # higher temperature for more varied distractors
            max_tokens=1500,  # room for the draft analysis plus the final selection
            top_p=0.9
        )

        content = response.choices[0].message.content
        if not content:
            # ``content`` may be None; fail with a clear message instead of
            # an AttributeError on ``.strip()``.
            raise ValueError("模型返回了空响应")

        json_result = self._extract_json_from_response(content.strip())

        if json_result.get("question_type") == "true_false":
            return json_result
        if "final_selection" in json_result:
            return json_result["final_selection"]
        return json_result

    def _extract_json_from_response(self, response_text: str) -> Dict[str, Any]:
        """Extract and parse the outermost ``{...}`` span of ``response_text``.

        Raises:
            ValueError: If no brace-delimited span exists.
            json.JSONDecodeError: If parsing fails even after repair.
        """
        json_start = response_text.find('{')
        json_end = response_text.rfind('}') + 1

        if json_start == -1 or json_end <= json_start:
            raise ValueError("无法在响应中找到JSON格式内容")

        json_str = response_text[json_start:json_end]

        try:
            return json.loads(json_str)
        except json.JSONDecodeError:
            # Retry once after repairing common formatting mistakes.
            return json.loads(self._fix_common_json_errors(json_str))

    def _fix_common_json_errors(self, json_str: str) -> str:
        """Repair common JSON mistakes: ``//`` comments and trailing commas.

        Both repairs leave the content of string literals untouched, so values
        such as URLs (which contain ``//``) survive.
        """
        without_comments = self._strip_json_comments(json_str)
        return self._remove_trailing_commas(without_comments)

    @staticmethod
    def _strip_json_comments(json_str: str) -> str:
        """Remove ``//`` line comments that occur outside string literals."""
        kept = []
        in_string = False
        escaped = False
        i = 0
        length = len(json_str)
        while i < length:
            ch = json_str[i]
            if in_string:
                kept.append(ch)
                if escaped:
                    escaped = False
                elif ch == "\\":
                    escaped = True
                elif ch == '"':
                    in_string = False
                i += 1
            elif ch == '"':
                in_string = True
                kept.append(ch)
                i += 1
            elif json_str.startswith("//", i):
                # Skip to the end of the line but keep the newline itself.
                newline = json_str.find("\n", i)
                i = length if newline == -1 else newline
            else:
                kept.append(ch)
                i += 1
        return "".join(kept)

    @staticmethod
    def _remove_trailing_commas(json_str: str) -> str:
        """Drop commas that directly precede ``}`` or ``]`` outside strings."""
        pieces = []
        position = 0
        for match in _JSON_STRING_RE.finditer(json_str):
            pieces.append(_TRAILING_COMMA_RE.sub(r'\1', json_str[position:match.start()]))
            pieces.append(match.group(0))
            position = match.end()
        pieces.append(_TRAILING_COMMA_RE.sub(r'\1', json_str[position:]))
        return "".join(pieces)

    def _validate_options_quality(self, result: Dict[str, Any], original_data: Dict[str, Any]) -> bool:
        """Return True if ``result`` is a well-formed item of a known type."""
        if not result or not isinstance(result, dict):
            return False

        question_type = result.get("question_type", "")

        if question_type == "true_false":
            return self._validate_true_false_quality(result)
        if question_type == "multiple_choice":
            return self._validate_multiple_choice_quality(result, original_data)
        return False

    def _validate_true_false_quality(self, result: Dict[str, Any]) -> bool:
        """Check required fields, the True/False option pair and the answer."""
        required_fields = ["statement", "options", "correct_answer", "explanation"]
        if not all(field in result for field in required_fields):
            return False

        options = result.get("options", [])
        if not (len(options) == 2 and "True" in options and "False" in options):
            return False

        return result.get("correct_answer", "") in ["True", "False"]

    def _validate_multiple_choice_quality(self, result: Dict[str, Any], original_data: Dict[str, Any]) -> bool:
        """Check structure and plausibility of a multiple-choice item.

        Requires options A-D, a valid answer label, at least one option that
        contains / is contained in / is similar to the original correct answer
        (deliberately lenient), no option shorter than two characters and no
        duplicate options.
        """
        if not all(key in result for key in ["options", "correct_answer", "explanation"]):
            return False

        options = result.get("options", {})
        if not isinstance(options, dict):
            return False

        if len(options) != 4 or not all(label in options for label in ["A", "B", "C", "D"]):
            return False

        if result.get("correct_answer", "") not in ["A", "B", "C", "D"]:
            return False

        option_texts = [str(option).strip() for option in options.values()]

        # ``str()`` guards against numeric answers in the input data.
        original_correct = str(original_data.get("correct_option", "")).strip()
        if original_correct:
            original_lower = original_correct.lower()
            found_match = any(
                original_lower in text.lower()
                or text.lower() in original_lower
                or self._are_similar_answers(original_correct, text)
                for text in option_texts
            )
            if not found_match:
                logging.warning(f"未找到匹配的原始答案: {original_correct}")
                return False

        # Reject very short options.
        if any(len(text) < 2 for text in option_texts):
            return False

        # Reject duplicate options (case-insensitive).
        return len({text.lower() for text in option_texts}) == 4

    def _are_similar_answers(self, answer1: str, answer2: str) -> bool:
        """Return True if the answers share more than 60% of the smaller word set.

        Punctuation is removed and text lower-cased before whitespace splitting.
        """
        answer1_clean = re.sub(r'[^\w\s]', '', answer1.lower()).strip()
        answer2_clean = re.sub(r'[^\w\s]', '', answer2.lower()).strip()

        words1 = set(answer1_clean.split())
        words2 = set(answer2_clean.split())

        if not words1 or not words2:
            return False

        overlap = len(words1 & words2)
        similarity = overlap / min(len(words1), len(words2))
        return similarity > 0.6  # 60% similarity threshold

    def _create_fallback_options(self, question_data: Dict[str, Any]) -> Dict[str, Any]:
        """Build rule-based options when model generation is unavailable.

        The result always carries ``"generated_by": "fallback_rules"`` so that
        callers can tell it apart from model output.
        """
        question_type = question_data.get("question_type", "")
        correct_option = str(question_data.get("correct_option", ""))

        if question_type == "true_false":
            return {
                "question_type": "true_false",
                "statement": question_data.get("choice_question", ""),
                "options": ["True", "False"],
                "correct_answer": self._determine_true_false_answer(correct_option),
                "explanation": "基于题目分析的判断结果",
                "generated_by": _FALLBACK_MARKER
            }

        # Every other type becomes a multiple-choice item.
        distractors = self._pick_unique_distractors(correct_option)
        all_options = [correct_option] + distractors
        random.shuffle(all_options)

        # Locate the correct answer after shuffling.
        correct_index = all_options.index(correct_option)
        correct_label = ["A", "B", "C", "D"][correct_index]

        return {
            "question_type": "multiple_choice",
            "options": {
                "A": all_options[0],
                "B": all_options[1],
                "C": all_options[2],
                "D": all_options[3]
            },
            "correct_answer": correct_label,
            "explanation": "基于规则生成的备用选项",
            "generated_by": _FALLBACK_MARKER
        }

    def _pick_unique_distractors(self, correct_option: str) -> List[str]:
        """Return exactly three distractors, distinct from each other and the answer.

        Rule-based candidates are used first; generic placeholders fill gaps
        (e.g. a numeric answer of 0 yields three identical candidates).
        """
        candidates = list(self._generate_rule_based_distractors(correct_option))
        candidates.extend(f"Alternative option {k}" for k in range(1, 8))

        seen = {correct_option.strip().lower()}
        distractors: List[str] = []
        for candidate in candidates:
            key = candidate.strip().lower()
            if key in seen:
                continue
            seen.add(key)
            distractors.append(candidate)
            if len(distractors) == 3:
                break
        return distractors

    def _determine_true_false_answer(self, correct_option: str) -> str:
        """Map a free-form answer to ``"True"`` or ``"False"``.

        Order of checks: exact match (so "T"/"F" work), whole English words,
        then Chinese phrases with negated forms tested first. Defaults to
        ``"True"`` when nothing matches.
        """
        normalized = str(correct_option).strip().lower()

        if normalized in _FALSE_EXACT:
            return "False"
        if normalized in _TRUE_EXACT:
            return "True"

        # Whole-word matching avoids hits such as "no" inside "unknown".
        words = set(re.findall(r'[a-z]+', normalized))
        if words & _FALSE_WORDS:
            return "False"
        if words & _TRUE_WORDS:
            return "True"

        if any(phrase in normalized for phrase in _FALSE_PHRASES):
            return "False"
        if any(phrase in normalized for phrase in _TRUE_PHRASES):
            return "True"

        # Undetermined: default to True.
        return "True"

    def _generate_rule_based_distractors(self, correct_answer: str) -> List[str]:
        """Return up to three distractors chosen by the detected answer kind."""
        if self._is_numeric_answer(correct_answer):
            distractors = self._generate_numeric_distractors(correct_answer)
        elif self._is_structure_name(correct_answer):
            distractors = self._generate_structure_distractors(correct_answer)
        elif self._is_material_property(correct_answer):
            distractors = self._generate_property_distractors(correct_answer)
        else:
            # Generic placeholders.
            distractors = [
                "Alternative option 1",
                "Alternative option 2",
                "Alternative option 3"
            ]

        return distractors[:3]

    def _is_numeric_answer(self, answer: str) -> bool:
        """Return True if ``answer`` contains at least one digit sequence."""
        return bool(re.search(r'\d+\.?\d*', answer))

    def _is_structure_name(self, answer: str) -> bool:
        """Return True if ``answer`` mentions a crystal-structure keyword."""
        structure_keywords = ["cubic", "hexagonal", "tetragonal", "orthorhombic", "bcc", "fcc", "hcp"]
        lowered = answer.lower()
        return any(keyword in lowered for keyword in structure_keywords)

    def _is_material_property(self, answer: str) -> bool:
        """Return True if ``answer`` mentions a material-property keyword."""
        property_keywords = ["strength", "hardness", "ductility", "brittleness", "conductivity", "elastic"]
        lowered = answer.lower()
        return any(keyword in lowered for keyword in property_keywords)

    def _generate_numeric_distractors(self, correct_answer: str) -> List[str]:
        """Scale the first number in the answer by 0.5, 2 and 1.5.

        The remaining text of the answer is appended as the unit.
        """
        numbers = re.findall(r'\d+\.?\d*', correct_answer)
        if not numbers:
            return ["Option B", "Option C", "Option D"]

        base_num = float(numbers[0])
        unit = correct_answer.replace(numbers[0], "").strip()

        return [
            f"{base_num * factor:.2f} {unit}".strip()
            for factor in (0.5, 2, 1.5)
        ]

    def _generate_structure_distractors(self, correct_answer: str) -> List[str]:
        """Sample up to three structure names different from the answer."""
        all_structures = [
            "simple cubic", "body-centered cubic", "face-centered cubic",
            "hexagonal close-packed", "diamond cubic", "tetragonal",
            "orthorhombic", "monoclinic", "triclinic"
        ]

        distractors = [s for s in all_structures if s.lower() != correct_answer.lower()]
        return random.sample(distractors, min(3, len(distractors)))

    def _generate_property_distractors(self, correct_answer: str) -> List[str]:
        """Sample up to three property phrases different from the answer."""
        all_properties = [
            "high strength", "low strength", "high ductility", "brittleness",
            "high hardness", "low hardness", "high toughness", "low toughness",
            "high elasticity", "low elasticity", "high conductivity", "low conductivity"
        ]

        distractors = [p for p in all_properties if p.lower() != correct_answer.lower()]
        return random.sample(distractors, min(3, len(distractors)))


def process_single_question(generator, question, question_index):
    """Generate options for one question and return an annotated copy of it.

    The copy gains ``generated_options``, ``generation_status`` and
    ``question_index`` (used to restore input order). The status is
    ``"success"`` for validated model output, ``"fallback"`` when the generator
    had to use rule-based options, and ``"failed"`` when an exception escaped.
    """
    try:
        options_data = generator.generate_options(question)

        complete_question = question.copy()
        complete_question["generated_options"] = options_data
        # generate_options() does not raise when it falls back, so the
        # fallback marker is the only way to tell the two outcomes apart.
        if isinstance(options_data, dict) and options_data.get("generated_by") == _FALLBACK_MARKER:
            complete_question["generation_status"] = "fallback"
            complete_question["error_message"] = "所有重试都失败,已使用备用选项"
        else:
            complete_question["generation_status"] = "success"
        complete_question["question_index"] = question_index  # keeps the original order

        return complete_question

    except Exception as e:
        logging.error(f"第{question_index+1}题处理失败: {e}")

        failed_question = question.copy()
        failed_question["generated_options"] = generator._create_fallback_options(question)
        failed_question["generation_status"] = "failed"
        failed_question["error_message"] = str(e)
        failed_question["question_index"] = question_index

        return failed_question


def main():
    """Load questions, generate options concurrently, save and summarise."""
    # The API key is a secret and must not live in source control.
    API_KEY = os.environ.get("OPENAI_API_KEY", "").strip()
    if not API_KEY:
        raise SystemExit("未设置环境变量 OPENAI_API_KEY,无法调用模型接口")

    BASE_URL = "https://vip.apiyi.com/v1"
    MODEL_NAME = "deepseek-chat"
    # MODEL_NAME = "claude-sonnet-4-20250514"
    MAX_WORKERS = 20  # number of worker threads

    INPUT_FILE = "/home/ubuntu/50T/LYT/MatBench/layer2/PGEE/code/stepx_filtered_high_quality_questions.json"
    OUTPUT_FILE = "/home/ubuntu/50T/LYT/MatBench/layer2/PGEE/code/stepy_complete_choice_questions.json"

    print("正在加载数据...")
    with open(INPUT_FILE, 'r', encoding='utf-8') as f:
        questions = json.load(f)

    random.shuffle(questions)  # shuffle for variety
    questions = questions[:200]  # only the first 200 questions while debugging
    print(f"加载了 {len(questions)} 道题目")

    # Distribution of question types in the input.
    type_counts = Counter(q.get("question_type", "unknown") for q in questions)
    print("题目类型分布:")
    for qtype, count in type_counts.items():
        print(f"  {qtype}: {count} 道")

    generator = ChoiceOptionsGenerator(API_KEY, BASE_URL, MODEL_NAME, MAX_WORKERS)

    results = []
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        future_to_question = {
            executor.submit(process_single_question, generator, question, i): (question, i)
            for i, question in enumerate(questions)
        }

        with tqdm(total=len(questions), desc="生成选项") as pbar:
            for future in as_completed(future_to_question):
                try:
                    result = future.result()
                except Exception as e:
                    logging.error(f"处理结果时发生错误: {e}")
                    original_question, question_index = future_to_question[future]

                    result = original_question.copy()
                    result["generated_options"] = generator._create_fallback_options(original_question)
                    result["generation_status"] = "processing_failed"
                    result["error_message"] = str(e)
                    result["question_index"] = question_index

                results.append(result)
                pbar.update(1)

    # Restore the submission order, then drop the temporary index field.
    complete_questions = sorted(results, key=lambda x: x.get("question_index", 0))
    for question in complete_questions:
        question.pop("question_index", None)

    with open(OUTPUT_FILE, 'w', encoding='utf-8') as f:
        json.dump(complete_questions, f, ensure_ascii=False, indent=2)

    total_count = len(complete_questions)
    success_count = sum(1 for q in complete_questions if q.get("generation_status") == "success")
    failed_count = total_count - success_count
    # Guard against an empty input file.
    overall_rate = success_count / total_count * 100 if total_count else 0.0

    print(f"\n完成!总共处理了 {total_count} 道题目")
    print(f"成功生成: {success_count} 道")
    print(f"使用备用方案: {failed_count} 道")
    print(f"成功率: {overall_rate:.2f}%")
    print(f"结果已保存到: {OUTPUT_FILE}")

    # Per-type results.
    type_total = Counter(q.get("question_type", "unknown") for q in complete_questions)
    type_success = Counter(
        q.get("question_type", "unknown")
        for q in complete_questions
        if q.get("generation_status") == "success"
    )

    print("\n各题型处理结果:")
    for qtype, total in type_total.items():
        success = type_success.get(qtype, 0)
        success_rate = success / total * 100 if total > 0 else 0
        print(f"  {qtype}: {success}/{total} ({success_rate:.1f}%)")

    # Breakdown of failure reasons.
    if failed_count > 0:
        failure_reasons = Counter(
            q.get("error_message", "未知错误")
            for q in complete_questions
            if q.get("generation_status") != "success"
        )

        print("\n失败原因统计:")
        for reason, count in failure_reasons.items():
            print(f"  {reason}: {count} 道题")


if __name__ == "__main__":
    main()