import json
import logging
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from queue import Queue
from typing import Any, Dict, List, Optional

import openai
from tqdm import tqdm

# Configure logging once for the whole script.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)


class QuestionConverter:
    """Decide whether questions can be recast as single-choice questions.

    True/false questions are marked convertible directly; every other
    question type is sent to a chat-completion model whose JSON answer is
    parsed into ``convertible`` / ``correct_option`` / ``choice_question`` /
    ``reason`` fields.  Questions are processed concurrently in a thread pool.
    """

    def __init__(self, api_key: str, base_url: str, model_name: str, max_workers: int = 20):
        """Initialise the converter.

        Args:
            api_key: API key passed to the OpenAI client.
            base_url: Base URL of the API endpoint.
            model_name: Name of the chat model to call.
            max_workers: Maximum number of worker threads.
        """
        self.api_key = api_key
        self.base_url = base_url
        self.model_name = model_name
        self.max_workers = max_workers

        # Lock protecting the shared counters and the progress bar.
        self.lock = threading.Lock()
        self.processed_count = 0
        self.total_count = 0

        # Each worker thread lazily creates and caches its own client here.
        self.thread_local = threading.local()

        # Progress bar; only set while process_questions() is running.
        self.progress_bar: Optional[tqdm] = None

    @staticmethod
    def _failure_result(reason: str) -> Dict[str, Any]:
        """Build the "not convertible" result dict carrying *reason*."""
        return {
            "convertible": False,
            "correct_option": "",
            "choice_question": "",
            "reason": reason
        }

    def _report_progress(self, postfix: Dict[str, Any]) -> None:
        """Count one processed question and advance the progress bar, if any."""
        with self.lock:
            self.processed_count += 1
            if self.progress_bar:
                self.progress_bar.update(1)
                self.progress_bar.set_postfix(postfix)

    def get_client(self):
        """Return the OpenAI client of the calling thread, creating it on first use."""
        if not hasattr(self.thread_local, 'client'):
            self.thread_local.client = openai.OpenAI(
                api_key=self.api_key,
                base_url=self.base_url
            )
        return self.thread_local.client

    def create_conversion_prompt(self, question_data: Dict[str, Any]) -> str:
        """Build the prompt asking the model whether a question is convertible.

        Args:
            question_data: Question dict; the keys ``question_type``,
                ``question_type_name``, ``question`` and ``answer`` are read,
                each defaulting to an empty string when missing.

        Returns:
            The formatted prompt text.
        """
        question_type = question_data.get("question_type", "")
        question_type_name = question_data.get("question_type_name", "")
        question = question_data.get("question", "")
        answer = question_data.get("answer", "")

        # The prompt text starts at column 0 on purpose: everything inside the
        # triple-quoted literal, including leading whitespace, is sent verbatim.
        prompt = f"""
请分析以下题目是否可以转换为单选题格式,并提取正确选项和转换后的题目。

题目类型: {question_type_name} ({question_type})
题目: {question}
答案: {answer}

分析要求:
1. 判断题目是否可以转换为单选题格式(是/否)
2. 如果可以转换,请提取或识别正确选项内容
3. 如果可以转换,将原题目整理为适合单选题的题目表述(不要包含选项,只要题目部分)
4. 如果不能转换,说明原因

输出格式(严格按照JSON格式):
{{
    "convertible": true/false,
    "correct_option": "正确选项内容(如果可转换)",
    "choice_question": "转换后的单选题题目(如果可转换,不要选项)",
    "reason": "判断理由"
}}

注意:
- 选择题本身已经是单选题格式,标记为可转换,正确选项就是原答案,题目保持原样
- 计算题如果答案是确定的数值或选项,可以转换,需要将题目改写为适合选择的形式
- 简答题如果答案是标准术语或概念,可以转换,需要将题目改写为"下列哪个是..."的形式
- 判断题理论上都可以转换为"正确/错误"的单选题,题目保持原样

转换示例:
- 计算题 "计算2+3等于多少?" → "2+3等于:"
- 简答题 "什么是光合作用?" → "下列关于光合作用的描述,正确的是:"
- 判断题 "地球是圆的。" → "地球是圆的。"
"""
        return prompt

    def analyze_question(self, question_data: Dict[str, Any]) -> Dict[str, Any]:
        """Analyse one question and report progress.

        Args:
            question_data: Question dict.

        Returns:
            Dict with the keys ``convertible``, ``correct_option``,
            ``choice_question`` and ``reason``.  Any exception raised while
            analysing is logged and turned into a "not convertible" result.
        """
        question_idx = question_data.get("idx", "N/A")
        thread_id = threading.current_thread().ident
        postfix: Dict[str, Any] = {
            'current': question_idx,
            'thread': f'{thread_id % 10000}'
        }

        try:
            if question_data.get("question_type", "") == "true_false":
                # True/false questions are always convertible; no model call needed.
                result = {
                    "convertible": True,
                    "correct_option": question_data.get("answer", ""),
                    "choice_question": question_data.get("question", ""),
                    "reason": "判断题可以转换为正确/错误的单选题格式"
                }
            else:
                # Every other question type is analysed by the model.
                result = self._call_ai_analysis(question_data)
            postfix['convertible'] = result.get('convertible', False)
        except Exception as e:
            logger.error(f"[线程-{thread_id}] 处理题目 {question_idx} 失败: {e}")
            result = self._failure_result(f"处理失败: {str(e)}")
            postfix['status'] = 'ERROR'

        # The bar advances exactly once per question, on success and on failure.
        self._report_progress(postfix)
        return result

    def _call_ai_analysis(self, question_data: Dict[str, Any]) -> Dict[str, Any]:
        """Ask the model to analyse one question, retrying on failure.

        Up to three attempts are made.  An unparsable answer is retried after
        a fixed delay, an API error after a delay that grows with the attempt
        number.  When the last attempt fails, a "not convertible" result
        describing the failure is returned instead of raising.
        """
        max_retries = 3
        retry_delay = 1.0

        for attempt in range(max_retries):
            is_last_attempt = attempt == max_retries - 1
            try:
                client = self.get_client()
                prompt = self.create_conversion_prompt(question_data)

                response = client.chat.completions.create(
                    model=self.model_name,
                    messages=[
                        {
                            "role": "system",
                            "content": "你是一个教育评估专家,专门分析题目格式转换的可行性。请严格按照要求的JSON格式输出结果。"
                        },
                        {"role": "user", "content": prompt}
                    ],
                    temperature=0.1,
                    max_tokens=800  # large enough to hold the rewritten question
                )

                result_text = response.choices[0].message.content.strip()

                try:
                    # The model may wrap the JSON in extra text: take the span
                    # from the first '{' to the last '}'.
                    json_start = result_text.find('{')
                    json_end = result_text.rfind('}') + 1
                    if json_start == -1 or json_end <= json_start:
                        raise ValueError("无法找到有效的JSON格式")
                    result = json.loads(result_text[json_start:json_end])

                    return {
                        "convertible": result.get("convertible", False),
                        "correct_option": result.get("correct_option", ""),
                        "choice_question": result.get("choice_question", ""),
                        "reason": result.get("reason", "")
                    }
                except (json.JSONDecodeError, ValueError) as e:
                    if is_last_attempt:
                        logger.error(f"解析AI响应失败: {e}, 响应内容: {result_text}")
                        return self._failure_result(f"AI响应解析失败: {str(e)}")
                    logger.warning(f"解析失败,第{attempt+1}次重试...")
                    time.sleep(retry_delay)
                    continue

            except Exception as e:
                if is_last_attempt:
                    logger.error(f"调用AI接口失败: {e}")
                    return self._failure_result(f"API调用失败: {str(e)}")
                logger.warning(f"API调用失败,第{attempt+1}次重试: {e}")
                time.sleep(retry_delay * (attempt + 1))  # linearly increasing back-off
                continue

    def process_single_question(self, question: Dict[str, Any]) -> Dict[str, Any]:
        """Analyse one question and merge the verdict into a copy of it.

        This is the unit of work submitted to the thread pool.

        Args:
            question: Question dict.

        Returns:
            Shallow copy of *question* with ``convertible``,
            ``correct_option``, ``choice_question`` and ``conversion_reason``
            added.
        """
        conversion_result = self.analyze_question(question)

        # Keep every original field and add the new ones.
        processed_question = question.copy()
        processed_question["convertible"] = conversion_result["convertible"]
        processed_question["correct_option"] = conversion_result["correct_option"]
        processed_question["choice_question"] = conversion_result["choice_question"]
        processed_question["conversion_reason"] = conversion_result["reason"]
        return processed_question

    def process_questions(self, questions: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Process a list of questions concurrently.

        Args:
            questions: Question dicts.

        Returns:
            One processed dict per input question, in the input order.
        """
        self.total_count = len(questions)
        self.processed_count = 0

        logger.info(f"开始使用 {self.max_workers} 个线程处理 {len(questions)} 道题目...")

        # Results are stored by list position rather than by the question's
        # "idx" field, so questions with a missing or duplicate "idx" cannot
        # overwrite each other.
        results: List[Optional[Dict[str, Any]]] = [None] * len(questions)

        with tqdm(
            total=len(questions),
            desc="处理题目",
            ncols=100,
            unit="题",
            bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}] {postfix}"
        ) as pbar:
            self.progress_bar = pbar
            try:
                with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
                    future_to_position = {
                        executor.submit(self.process_single_question, question): position
                        for position, question in enumerate(questions)
                    }

                    for future in as_completed(future_to_position):
                        position = future_to_position[future]
                        try:
                            results[position] = future.result()
                        except Exception as exc:
                            original_question = questions[position]
                            question_idx = original_question.get("idx", "N/A")
                            logger.error(f"题目 {question_idx} 处理异常: {exc}")
                            error_result = original_question.copy()
                            error_result.update({
                                "convertible": False,
                                "correct_option": "",
                                "choice_question": "",
                                "conversion_reason": f"处理异常: {str(exc)}"
                            })
                            results[position] = error_result

                # All workers have finished here; top the bar up for any task
                # that failed before it could report its own progress.
                if pbar.n < pbar.total:
                    pbar.update(pbar.total - pbar.n)
            finally:
                # Never leave a reference to a closed bar behind.
                self.progress_bar = None

        processed_questions = [result for result in results if result is not None]
        logger.info(f"多线程处理完成!总共处理了 {len(processed_questions)} 道题目")
        return processed_questions

    def save_results(self, processed_questions: List[Dict[str, Any]], output_file: str):
        """Write the processed questions to a JSON file and log statistics.

        A failure to write is logged, not raised; statistics are logged only
        after a successful write.

        Args:
            processed_questions: Processed question dicts.
            output_file: Path of the output file.
        """
        try:
            with tqdm(desc="保存文件", unit="题", total=len(processed_questions)) as pbar:
                with open(output_file, 'w', encoding='utf-8') as f:
                    json.dump(processed_questions, f, ensure_ascii=False, indent=2)
                pbar.update(len(processed_questions))
        except Exception as e:
            logger.error(f"保存文件失败: {e}")
            return

        logger.info(f"结果已保存到: {output_file}")
        self._log_statistics(processed_questions)

    @staticmethod
    def _log_statistics(processed_questions: List[Dict[str, Any]]) -> None:
        """Log overall and per-type conversion rates plus up to three examples."""
        total_questions = len(processed_questions)
        convertible_count = sum(1 for q in processed_questions if q.get("convertible", False))
        # An empty result list has no meaningful rate; report 0 instead of dividing by zero.
        overall_rate = convertible_count / total_questions * 100 if total_questions else 0.0

        logger.info("处理完成统计:")
        logger.info(f"总题目数: {total_questions}")
        logger.info(f"可转换题目数: {convertible_count}")
        logger.info(f"转换率: {overall_rate:.1f}%")

        # Per question type.
        type_stats: Dict[str, Dict[str, int]] = {}
        for q in processed_questions:
            stats = type_stats.setdefault(
                q.get("question_type_name", "未知"), {"total": 0, "convertible": 0}
            )
            stats["total"] += 1
            if q.get("convertible", False):
                stats["convertible"] += 1

        logger.info("各题型转换统计:")
        for q_type, stats in type_stats.items():
            rate = stats["convertible"] / stats["total"] * 100 if stats["total"] > 0 else 0
            logger.info(f"  {q_type}: {stats['convertible']}/{stats['total']} ({rate:.1f}%)")

        # Show the first three convertible questions as examples.
        logger.info("\n转换示例:")
        example_count = 0
        for q in processed_questions:
            if example_count >= 3:
                break
            if not q.get("convertible", False):
                continue
            logger.info(f"  原题目: {q.get('question', '')[:50]}...")
            logger.info(f"  转换后: {q.get('choice_question', '')[:50]}...")
            logger.info(f"  正确选项: {q.get('correct_option', '')}")
            logger.info(f"  理由: {q.get('conversion_reason', '')}")
            logger.info("  " + "-"*50)
            example_count += 1


def load_questions(input_file: str) -> List[Dict[str, Any]]:
    """Load the question list from a JSON file.

    Args:
        input_file: Path of the input file.

    Returns:
        The list of questions, or an empty list when the file cannot be read
        or parsed, or when its top-level value is not a list.
    """
    try:
        with tqdm(desc="加载文件", unit="B", unit_scale=True) as pbar:
            with open(input_file, 'r', encoding='utf-8') as f:
                questions = json.load(f)
            pbar.update(1)
    except Exception as e:
        logger.error(f"加载文件失败: {e}")
        return []

    if not isinstance(questions, list):
        # Callers iterate over question dicts; any other top-level value is unusable.
        logger.error(f"加载文件失败: 文件内容不是题目列表 ({type(questions).__name__})")
        return []

    logger.info(f"成功加载 {len(questions)} 道题目")
    return questions


def main():
    """Configure the API settings and run the conversion end to end."""
    # ========== Configuration ==========
    # NOTE(review): a live-looking credential is hard-coded below. It is kept
    # so the script behaves as before, but it should be rotated and supplied
    # from outside the source file (environment variable, secrets store).
    API_KEY = "sk-oYh3Xrhg8oDY2gW02c966f31C84449Ad86F9Cd9dF6E64a8d"  # API key
    BASE_URL = "https://vip.apiyi.com/v1"  # API base URL, e.g. "https://api.openai.com/v1"
    MODEL_NAME = "deepseek-chat"  # model name, e.g. "gpt-4o-mini", "gpt-3.5-turbo"

    # File paths
    INPUT_FILE = "/home/ubuntu/50T/LYT/MatBench/layer2/PGEE/code/step4_enhanced_classified_questions.json"  # input file
    OUTPUT_FILE = "/home/ubuntu/50T/LYT/MatBench/layer2/PGEE/code/step5_converted_questions.json"  # output file

    # Thread pool size; tune to the API rate limits and the machine.
    MAX_WORKERS = 20
    # ===================================

    # Check the required configuration.
    if not all([API_KEY, BASE_URL, MODEL_NAME]):
        logger.error("请在main函数中配置API_KEY, BASE_URL和MODEL_NAME!")
        return

    try:
        print("🚀 开始题目转换分析...")

        questions = load_questions(INPUT_FILE)
        if not questions:
            logger.error("没有加载到有效的题目数据")
            return

        # Keep the four supported question types: multiple choice,
        # calculation, short answer and true/false (all handled together).
        target_types = {"multiple_choice", "calculation", "short_answer", "true_false"}
        target_questions = [q for q in questions if q.get("question_type", "") in target_types]

        # target_questions = target_questions[:200]  # for debugging
        logger.info(f"筛选出需要处理的题目: {len(target_questions)} 道")

        if not target_questions:
            # Nothing to do; also avoids dividing by zero in the timing summary.
            logger.warning("没有符合条件的题目需要处理")
            return

        converter = QuestionConverter(
            api_key=API_KEY,
            base_url=BASE_URL,
            model_name=MODEL_NAME,
            max_workers=MAX_WORKERS
        )

        start_time = time.time()
        processed_questions = converter.process_questions(target_questions)
        total_time = time.time() - start_time

        logger.info(f"处理耗时: {total_time:.2f} 秒 ({total_time/60:.2f} 分钟)")
        logger.info(f"平均每题处理时间: {total_time/len(target_questions):.2f} 秒")

        converter.save_results(processed_questions, OUTPUT_FILE)

        print("✅ 题目转换分析完成!")

    except Exception as e:
        logger.error(f"程序执行失败: {e}")


if __name__ == "__main__":
    main()