import json
import logging
import os
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, Dict, List

import openai
from tqdm import tqdm

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)


class QuestionPerplexityAnalyzer:
    """Analyzes multiple-choice questions for information-completeness problems
    ("perplexity") by calling an OpenAI-compatible chat API from a thread pool.

    Each worker thread gets its own API client (stored in thread-local storage);
    shared progress counters are guarded by a lock.
    """

    def __init__(self, api_key: str, base_url: str, model_name: str, max_workers: int = 20):
        """
        Initialize the question-completeness analyzer.

        Args:
            api_key: OpenAI API key.
            base_url: API base URL.
            model_name: Model name.
            max_workers: Maximum number of worker threads.
        """
        self.api_key = api_key
        self.base_url = base_url
        self.model_name = model_name
        self.max_workers = max_workers
        # Lock protecting the shared progress counters and the progress bar.
        self.lock = threading.Lock()
        self.processed_count = 0
        self.total_count = 0
        # Thread-local storage so each worker thread owns an independent client.
        self.thread_local = threading.local()
        # Shared tqdm progress bar (set only while process_questions is running).
        self.progress_bar = None

    def get_client(self):
        """Return (lazily creating) the OpenAI client for the current thread."""
        if not hasattr(self.thread_local, 'client'):
            self.thread_local.client = openai.OpenAI(
                api_key=self.api_key,
                base_url=self.base_url
            )
        return self.thread_local.client

    def create_perplexity_prompt(self, question_data: Dict[str, Any]) -> str:
        """
        Build the prompt used to judge a question's completeness.

        Args:
            question_data: Question record; the fields below are read with
                empty-string fallbacks, so missing keys are tolerated.

        Returns:
            str: The formatted prompt.
        """
        choice_question = question_data.get("choice_question", "")
        correct_option = question_data.get("correct_option", "")
        original_question = question_data.get("question", "")
        extra_info_question = question_data.get("reasoning", "")

        # NOTE: the analysis-requirement list previously numbered two items "5.";
        # the final item is now correctly numbered "6.".
        prompt = f"""
请分析以下选择题是否存在信息不完整的问题,这些问题会让材料科学硕士研究生做题者感到困惑。

转换后的选择题题目:
{choice_question}

正确选项:
{correct_option}

原始题目:
{original_question}

题型、考察知识点与难度:{extra_info_question}

需要检测的困惑类型(不包括材料科学专业学生应该掌握、熟悉、了解的知识点):
1. **指代不明确**: 题目中提到"两种类型"、"这些物质"、"上述材料"等,但没有明确说明具体是什么
2. **缺少关键信息**: 题目中缺少必要的数据、条件或背景信息(不包括材料科学领域学生应该记忆、熟悉、掌握的知识,这正是考点而不是信息缺失)
3. **上下文依赖**: 题目依赖于图表、前文或其他未提供的信息
4. **条件不足**: 解题所需的条件或参数不完整

分析要求:
1. 判断题目是否存在上述困惑问题
2. 如果存在,识别具体的困惑类型和原因
3. 评估困惑程度(轻微、中等、严重)
4. 有一些题目的困惑目的是为了考察学生的能力,不能认为是困惑
5. 仔细识别缺失的信息,有一些题目的缺失信息正是考察学生是否熟悉材料科学专业知识点而故意设计的,不能认为是困惑
6. 凡是不影响考察学生能力的题目都不认为是困惑

输出格式(严格按照JSON格式):
{{
    "has_perplexity": true/false,
    "perplexity_type": "困惑类型(如果存在)",
    "perplexity_level": "困惑程度: mild/moderate/severe",
    "perplexity_reason": "具体的困惑原因说明",
    "missing_info": "缺少的关键信息"
}}

示例分析:
- "Which of the two types of glass has higher viscosity?" → 缺少具体的玻璃类型信息
- "根据上图,计算该材料的密度" → 缺少图表信息
- "这种方法的优势是什么?" → 没有明确指明是哪种方法
"""
        return prompt

    def analyze_question_perplexity(self, question_data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Analyze the completeness of a single question.

        Args:
            question_data: Question record.

        Returns:
            Dict: Analysis result with keys has_perplexity, perplexity_type,
            perplexity_level, perplexity_reason, missing_info. Never raises:
            any failure is converted into an "analysis_error" result.
        """
        question_idx = question_data.get("idx", "N/A")
        thread_id = threading.current_thread().ident
        try:
            # A record without a converted multiple-choice question cannot be
            # analyzed; report it as non-perplexing with a sentinel type.
            if not question_data.get("choice_question"):
                result = {
                    "has_perplexity": False,
                    "perplexity_type": "no_choice_question",
                    "perplexity_level": "none",
                    "perplexity_reason": "没有转换后的选择题题目",
                    "missing_info": ""
                }
            else:
                # Delegate to the AI analysis (with retries).
                result = self._call_ai_analysis(question_data)

            # Update shared progress state under the lock.
            with self.lock:
                self.processed_count += 1
                if self.progress_bar:
                    self.progress_bar.update(1)
                    self.progress_bar.set_postfix({
                        'current': question_idx,
                        'thread': f'{thread_id % 10000}',
                        'perplexity': result.get('has_perplexity', False)
                    })
            return result

        except Exception as e:
            logger.error(f"[线程-{thread_id}] 分析题目 {question_idx} 失败: {e}")
            # Advance the progress bar even on failure so it can complete.
            with self.lock:
                self.processed_count += 1
                if self.progress_bar:
                    self.progress_bar.update(1)
                    self.progress_bar.set_postfix({
                        'current': question_idx,
                        'thread': f'{thread_id % 10000}',
                        'status': 'ERROR'
                    })
            return {
                "has_perplexity": True,
                "perplexity_type": "analysis_error",
                "perplexity_level": "unknown",
                "perplexity_reason": f"分析失败: {str(e)}",
                "missing_info": ""
            }

    def _call_ai_analysis(self, question_data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Call the AI for analysis (internal helper).

        Retries up to 3 times on API or JSON-parsing failures, with a fixed
        delay for parse errors and an increasing delay for API errors. On
        final failure returns a "parsing_error" / "api_error" result instead
        of raising.
        """
        max_retries = 3
        retry_delay = 1.0

        for attempt in range(max_retries):
            try:
                client = self.get_client()
                prompt = self.create_perplexity_prompt(question_data)

                response = client.chat.completions.create(
                    model=self.model_name,
                    messages=[
                        {
                            "role": "system",
                            "content": "你是一个教育评估专家,专门分析题目的完整性和清晰度。请仔细分析题目是否存在信息不完整的问题,严格按照要求的JSON格式输出结果。"
                        },
                        {"role": "user", "content": prompt}
                    ],
                    temperature=0.1,
                    max_tokens=600
                )

                result_text = response.choices[0].message.content.strip()

                # Try to parse the JSON payload out of the model's reply.
                try:
                    # Extract the outermost {...} span; the model may wrap the
                    # JSON in prose or code fences.
                    json_start = result_text.find('{')
                    json_end = result_text.rfind('}') + 1
                    if json_start != -1 and json_end > json_start:
                        json_str = result_text[json_start:json_end]
                        result = json.loads(json_str)
                    else:
                        raise ValueError("无法找到有效的JSON格式")

                    # Normalize the result: only keep the expected keys, with
                    # safe defaults for anything the model omitted.
                    return {
                        "has_perplexity": result.get("has_perplexity", False),
                        "perplexity_type": result.get("perplexity_type", ""),
                        "perplexity_level": result.get("perplexity_level", "none"),
                        "perplexity_reason": result.get("perplexity_reason", ""),
                        "missing_info": result.get("missing_info", "")
                    }

                except (json.JSONDecodeError, ValueError) as e:
                    if attempt == max_retries - 1:  # last attempt
                        logger.error(f"解析AI响应失败: {e}, 响应内容: {result_text}")
                        return {
                            "has_perplexity": True,
                            "perplexity_type": "parsing_error",
                            "perplexity_level": "unknown",
                            "perplexity_reason": f"AI响应解析失败: {str(e)}",
                            "missing_info": ""
                        }
                    else:
                        logger.warning(f"解析失败,第{attempt+1}次重试...")
                        time.sleep(retry_delay)
                        continue

            except Exception as e:
                if attempt == max_retries - 1:  # last attempt
                    logger.error(f"调用AI接口失败: {e}")
                    return {
                        "has_perplexity": True,
                        "perplexity_type": "api_error",
                        "perplexity_level": "unknown",
                        "perplexity_reason": f"API调用失败: {str(e)}",
                        "missing_info": ""
                    }
                else:
                    logger.warning(f"API调用失败,第{attempt+1}次重试: {e}")
                    time.sleep(retry_delay * (attempt + 1))  # increasing back-off
                    continue

    def process_single_question(self, question: Dict[str, Any]) -> Dict[str, Any]:
        """
        Wrapper for processing one question; submitted to the thread pool.

        Args:
            question: Question record.

        Returns:
            Dict: A copy of the question with a "perplexity" field added.
        """
        # Analyze completeness of this question.
        perplexity_result = self.analyze_question_perplexity(question)

        # Preserve the original fields and attach the analysis result.
        processed_question = question.copy()
        processed_question["perplexity"] = {
            "has_perplexity": perplexity_result["has_perplexity"],
            "perplexity_type": perplexity_result["perplexity_type"],
            "perplexity_level": perplexity_result["perplexity_level"],
            "perplexity_reason": perplexity_result["perplexity_reason"],
            "missing_info": perplexity_result["missing_info"]
        }
        return processed_question

    def process_questions(self, questions: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """
        Process a list of questions concurrently with a thread pool.

        Results are keyed by the question's POSITION in the input list (not by
        its "idx" field — missing or duplicate "idx" values would otherwise
        silently collapse multiple results onto one key) and returned in the
        original input order.

        Args:
            questions: List of question records.

        Returns:
            List: Processed questions, one per input question, in input order.
        """
        self.total_count = len(questions)
        self.processed_count = 0
        processed_questions = []

        logger.info(f"开始使用 {self.max_workers} 个线程分析 {len(questions)} 道题目的完整性...")

        # Create the shared progress bar.
        with tqdm(
            total=len(questions),
            desc="分析题目完整性",
            ncols=100,
            unit="题",
            bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}] {postfix}"
        ) as pbar:
            self.progress_bar = pbar

            with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
                # Submit every question, remembering its position so results
                # can be restored to input order even without unique "idx".
                future_to_meta = {
                    executor.submit(self.process_single_question, question): (pos, question)
                    for pos, question in enumerate(questions)
                }

                # Collect results as they complete, keyed by position.
                results_by_pos = {}

                for future in as_completed(future_to_meta):
                    pos, original_question = future_to_meta[future]
                    question_idx = original_question.get("idx", "N/A")
                    try:
                        results_by_pos[pos] = future.result()
                    except Exception as exc:
                        logger.error(f"题目 {question_idx} 处理异常: {exc}")
                        # Build an error placeholder so the output stays
                        # one-to-one with the input.
                        error_result = original_question.copy()
                        error_result["perplexity"] = {
                            "has_perplexity": True,
                            "perplexity_type": "processing_error",
                            "perplexity_level": "unknown",
                            "perplexity_reason": f"处理异常: {str(exc)}",
                            "missing_info": ""
                        }
                        results_by_pos[pos] = error_result

            # Make sure the bar reaches 100% even if some updates were missed.
            if pbar.n < pbar.total:
                pbar.update(pbar.total - pbar.n)

            # Drop the shared reference once processing is done.
            self.progress_bar = None

        # Re-assemble results in the original input order.
        for pos, question in enumerate(questions):
            if pos in results_by_pos:
                processed_questions.append(results_by_pos[pos])
            else:
                # Defensive fallback: no result recorded for this position.
                error_result = question.copy()
                error_result["perplexity"] = {
                    "has_perplexity": True,
                    "perplexity_type": "missing_result",
                    "perplexity_level": "unknown",
                    "perplexity_reason": "未找到处理结果",
                    "missing_info": ""
                }
                processed_questions.append(error_result)

        logger.info(f"多线程处理完成!总共处理了 {len(processed_questions)} 道题目")
        return processed_questions

    def save_results(self, processed_questions: List[Dict[str, Any]], output_file: str):
        """
        Save processed questions to a JSON file and log summary statistics.

        Args:
            processed_questions: Processed question records.
            output_file: Output file path.
        """
        try:
            with open(output_file, 'w', encoding='utf-8') as f:
                json.dump(processed_questions, f, ensure_ascii=False, indent=2)

            logger.info(f"结果已保存到: {output_file}")

            # Summary statistics.
            total_questions = len(processed_questions)
            perplexity_count = sum(
                1 for q in processed_questions
                if q.get("perplexity", {}).get("has_perplexity", False)
            )

            logger.info(f"完整性分析统计:")
            logger.info(f"总题目数: {total_questions}")
            logger.info(f"存在困惑问题的题目数: {perplexity_count}")
            # Guard against division by zero for an empty result list.
            if total_questions > 0:
                logger.info(f"困惑率: {perplexity_count/total_questions*100:.1f}%")

            # Breakdown by perplexity type and level.
            type_stats = {}
            level_stats = {"mild": 0, "moderate": 0, "severe": 0, "unknown": 0, "none": 0}

            for q in processed_questions:
                perplexity_info = q.get("perplexity", {})
                if perplexity_info.get("has_perplexity", False):
                    p_type = perplexity_info.get("perplexity_type", "unknown")
                    p_level = perplexity_info.get("perplexity_level", "unknown")
                    type_stats[p_type] = type_stats.get(p_type, 0) + 1
                    level_stats[p_level] = level_stats.get(p_level, 0) + 1

            logger.info("困惑类型统计:")
            for p_type, count in type_stats.items():
                logger.info(f"  {p_type}: {count}")

            logger.info("困惑程度统计:")
            for level, count in level_stats.items():
                if count > 0:
                    logger.info(f"  {level}: {count}")

            # Log up to three examples of problematic questions.
            logger.info("\n困惑题目示例:")
            example_count = 0
            for q in processed_questions:
                if example_count >= 3:
                    break
                perplexity_info = q.get("perplexity", {})
                if perplexity_info.get("has_perplexity", False):
                    logger.info(f"  题目: {q.get('choice_question', '')[:80]}...")
                    logger.info(f"  困惑类型: {perplexity_info.get('perplexity_type', '')}")
                    logger.info(f"  困惑程度: {perplexity_info.get('perplexity_level', '')}")
                    logger.info(f"  困惑原因: {perplexity_info.get('perplexity_reason', '')}")
                    logger.info(f"  缺少信息: {perplexity_info.get('missing_info', '')}")
                    logger.info("  " + "-"*50)
                    example_count += 1

        except Exception as e:
            logger.error(f"保存文件失败: {e}")


def load_questions(input_file: str) -> List[Dict[str, Any]]:
    """
    Load question data from a JSON file.

    Args:
        input_file: Input file path.

    Returns:
        List: Question records; an empty list on any load failure.
    """
    try:
        with open(input_file, 'r', encoding='utf-8') as f:
            questions = json.load(f)
        logger.info(f"成功加载 {len(questions)} 道题目")
        return questions
    except Exception as e:
        logger.error(f"加载文件失败: {e}")
        return []


def main():
    """
    Entry point — configure the API and run the analysis pipeline.
    """
    # ========== Configuration ==========
    # SECURITY: a live API key was previously hardcoded here and committed to
    # source control — that key must be considered leaked and rotated.
    # Supply the key via the OPENAI_API_KEY environment variable instead.
    API_KEY = os.environ.get("OPENAI_API_KEY", "")
    BASE_URL = "https://vip.apiyi.com/v1"  # API base URL
    MODEL_NAME = "deepseek-chat"  # model name

    # File paths
    INPUT_FILE = "/home/ubuntu/50T/LYT/MatBench/layer2/PGEE/code/step5_converted_questions.json"
    OUTPUT_FILE = "/home/ubuntu/50T/LYT/MatBench/layer2/PGEE/code/step6_perplexity_analyzed_questions.json"

    # Threading: tune to the API's rate limits and local resources.
    MAX_WORKERS = 20
    # ===================================

    # Validate required configuration.
    if not all([API_KEY, BASE_URL, MODEL_NAME]):
        logger.error("请在main函数中配置API_KEY, BASE_URL和MODEL_NAME!")
        return

    try:
        print("🔍 开始题目完整性分析...")

        # Load question data.
        questions = load_questions(INPUT_FILE)
        if not questions:
            logger.error("没有加载到有效的题目数据")
            return

        # target_questions = questions[:200]  # for debugging
        target_questions = questions  # use all questions
        logger.info(f"筛选出需要分析的题目: {len(target_questions)} 道")

        # Build the analyzer.
        analyzer = QuestionPerplexityAnalyzer(
            api_key=API_KEY,
            base_url=BASE_URL,
            model_name=MODEL_NAME,
            max_workers=MAX_WORKERS
        )

        # Time the processing run.
        start_time = time.time()
        processed_questions = analyzer.process_questions(target_questions)
        end_time = time.time()
        total_time = end_time - start_time

        logger.info(f"处理耗时: {total_time:.2f} 秒 ({total_time/60:.2f} 分钟)")
        logger.info(f"平均每题处理时间: {total_time/len(target_questions):.2f} 秒")

        # Save results.
        analyzer.save_results(processed_questions, OUTPUT_FILE)

        print("✅ 题目完整性分析完成!")

    except Exception as e:
        logger.error(f"程序执行失败: {e}")


if __name__ == "__main__":
    main()