判断所有的问题是否可以转换为单选和判断题;删掉xlsx文件

This commit is contained in:
lzy
2025-05-29 14:49:32 +08:00
parent a28774f6f0
commit 998c740df7
5 changed files with 3703 additions and 54974 deletions

View File

@@ -0,0 +1,501 @@
import json
import openai
from typing import Dict, Any, List
import time
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
from queue import Queue
from tqdm import tqdm
# 设置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
class QuestionConverter:
def __init__(self, api_key: str, base_url: str, model_name: str, max_workers: int = 20):
"""
初始化转换器
Args:
api_key: OpenAI API密钥
base_url: API基础URL
model_name: 模型名称
max_workers: 最大线程数
"""
self.api_key = api_key
self.base_url = base_url
self.model_name = model_name
self.max_workers = max_workers
# 线程锁用于保护共享资源
self.lock = threading.Lock()
self.processed_count = 0
self.total_count = 0
# 为每个线程创建独立的client
self.thread_local = threading.local()
# 进度条
self.progress_bar = None
def get_client(self):
"""
获取线程本地的OpenAI客户端
"""
if not hasattr(self.thread_local, 'client'):
self.thread_local.client = openai.OpenAI(
api_key=self.api_key,
base_url=self.base_url
)
return self.thread_local.client
def create_conversion_prompt(self, question_data: Dict[str, Any]) -> str:
"""
创建用于判断题目转换的提示词
Args:
question_data: 题目数据字典
Returns:
str: 格式化的提示词
"""
question_type = question_data.get("question_type", "")
question = question_data.get("question", "")
answer = question_data.get("answer", "")
prompt = f"""
请分析以下题目是否可以转换为单选题格式,并提取正确选项和转换后的题目。
题目类型: {question_data.get("question_type_name", "")} ({question_type})
题目: {question}
答案: {answer}
分析要求:
1. 判断题目是否可以转换为单选题格式(是/否)
2. 如果可以转换,请提取或识别正确选项内容
3. 如果可以转换,将原题目整理为适合单选题的题目表述(不要包含选项,只要题目部分)
4. 如果不能转换,说明原因
输出格式严格按照JSON格式
{{
"convertible": true/false,
"correct_option": "正确选项内容(如果可转换)",
"choice_question": "转换后的单选题题目(如果可转换,不要选项)",
"reason": "判断理由"
}}
注意:
- 选择题本身已经是单选题格式,标记为可转换,正确选项就是原答案,题目保持原样
- 计算题如果答案是确定的数值或选项,可以转换,需要将题目改写为适合选择的形式
- 简答题如果答案是标准术语或概念,可以转换,需要将题目改写为"下列哪个是..."的形式
- 判断题理论上都可以转换为"正确/错误"的单选题,题目保持原样
转换示例:
- 计算题 "计算2+3等于多少""2+3等于"
- 简答题 "什么是光合作用?""下列关于光合作用的描述,正确的是:"
- 判断题 "地球是圆的。""地球是圆的。"
"""
return prompt
def analyze_question(self, question_data: Dict[str, Any]) -> Dict[str, Any]:
"""
分析单个题目是否可以转换为单选题
Args:
question_data: 题目数据字典
Returns:
Dict: 包含转换结果的字典
"""
question_idx = question_data.get("idx", "N/A")
thread_id = threading.current_thread().ident
try:
question_type = question_data.get("question_type", "")
# 判断题特殊处理:全部标记为可转换
if question_type == "true_false":
result = {
"convertible": True,
"correct_option": question_data.get("answer", ""),
"choice_question": question_data.get("question", ""),
"reason": "判断题可以转换为正确/错误的单选题格式"
}
else:
# 其他题型调用AI分析
result = self._call_ai_analysis(question_data)
# 更新进度条
with self.lock:
self.processed_count += 1
if self.progress_bar:
self.progress_bar.update(1)
self.progress_bar.set_postfix({
'current': question_idx,
'thread': f'{thread_id % 10000}',
'convertible': result.get('convertible', False)
})
return result
except Exception as e:
logger.error(f"[线程-{thread_id}] 处理题目 {question_idx} 失败: {e}")
# 更新进度条(即使失败也要更新)
with self.lock:
self.processed_count += 1
if self.progress_bar:
self.progress_bar.update(1)
self.progress_bar.set_postfix({
'current': question_idx,
'thread': f'{thread_id % 10000}',
'status': 'ERROR'
})
return {
"convertible": False,
"correct_option": "",
"choice_question": "",
"reason": f"处理失败: {str(e)}"
}
def _call_ai_analysis(self, question_data: Dict[str, Any]) -> Dict[str, Any]:
"""
调用AI进行分析内部方法
"""
max_retries = 3
retry_delay = 1.0
for attempt in range(max_retries):
try:
client = self.get_client()
prompt = self.create_conversion_prompt(question_data)
response = client.chat.completions.create(
model=self.model_name,
messages=[
{
"role": "system",
"content": "你是一个教育评估专家专门分析题目格式转换的可行性。请严格按照要求的JSON格式输出结果。"
},
{"role": "user", "content": prompt}
],
temperature=0.1,
max_tokens=800 # 增加token数量以容纳转换后的题目
)
result_text = response.choices[0].message.content.strip()
# 尝试解析JSON结果
try:
# 提取JSON部分
json_start = result_text.find('{')
json_end = result_text.rfind('}') + 1
if json_start != -1 and json_end > json_start:
json_str = result_text[json_start:json_end]
result = json.loads(json_str)
else:
raise ValueError("无法找到有效的JSON格式")
return {
"convertible": result.get("convertible", False),
"correct_option": result.get("correct_option", ""),
"choice_question": result.get("choice_question", ""),
"reason": result.get("reason", "")
}
except (json.JSONDecodeError, ValueError) as e:
if attempt == max_retries - 1: # 最后一次尝试
logger.error(f"解析AI响应失败: {e}, 响应内容: {result_text}")
return {
"convertible": False,
"correct_option": "",
"choice_question": "",
"reason": f"AI响应解析失败: {str(e)}"
}
else:
logger.warning(f"解析失败,第{attempt+1}次重试...")
time.sleep(retry_delay)
continue
except Exception as e:
if attempt == max_retries - 1: # 最后一次尝试
logger.error(f"调用AI接口失败: {e}")
return {
"convertible": False,
"correct_option": "",
"choice_question": "",
"reason": f"API调用失败: {str(e)}"
}
else:
logger.warning(f"API调用失败{attempt+1}次重试: {e}")
time.sleep(retry_delay * (attempt + 1)) # 递增延迟
continue
def process_single_question(self, question: Dict[str, Any]) -> Dict[str, Any]:
"""
处理单个题目的包装函数,用于线程池
Args:
question: 题目数据字典
Returns:
Dict: 处理后的题目数据
"""
# 分析题目转换可行性
conversion_result = self.analyze_question(question)
# 保留原有字段并添加新字段
processed_question = question.copy()
processed_question["convertible"] = conversion_result["convertible"]
processed_question["correct_option"] = conversion_result["correct_option"]
processed_question["choice_question"] = conversion_result["choice_question"]
processed_question["conversion_reason"] = conversion_result["reason"]
return processed_question
def process_questions(self, questions: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""
使用多线程批量处理题目列表
Args:
questions: 题目列表
Returns:
List: 处理后的题目列表
"""
self.total_count = len(questions)
self.processed_count = 0
processed_questions = []
logger.info(f"开始使用 {self.max_workers} 个线程处理 {len(questions)} 道题目...")
# 创建进度条
with tqdm(
total=len(questions),
desc="处理题目",
ncols=100,
unit="",
bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}] {postfix}"
) as pbar:
self.progress_bar = pbar
# 使用线程池执行器
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
# 提交所有任务
future_to_question = {
executor.submit(self.process_single_question, question): question
for question in questions
}
# 收集结果,保持原有顺序
question_results = {}
# 处理完成的任务
for future in as_completed(future_to_question):
original_question = future_to_question[future]
question_idx = original_question.get("idx", "N/A")
try:
result = future.result()
question_results[question_idx] = result
except Exception as exc:
logger.error(f"题目 {question_idx} 处理异常: {exc}")
# 创建错误结果
error_result = original_question.copy()
error_result.update({
"convertible": False,
"correct_option": "",
"choice_question": "",
"conversion_reason": f"处理异常: {str(exc)}"
})
question_results[question_idx] = error_result
# 手动更新进度条(如果异常没有被正常处理)
with self.lock:
if self.processed_count < self.total_count:
remaining = self.total_count - pbar.n
if remaining > 0:
pbar.update(remaining)
# 确保进度条完成
if pbar.n < pbar.total:
pbar.update(pbar.total - pbar.n)
# 重置进度条引用
self.progress_bar = None
# 按原始顺序重新排列结果
for question in questions:
question_idx = question.get("idx", "N/A")
if question_idx in question_results:
processed_questions.append(question_results[question_idx])
else:
# 如果没有找到结果,创建默认错误结果
error_result = question.copy()
error_result.update({
"convertible": False,
"correct_option": "",
"choice_question": "",
"conversion_reason": "未找到处理结果"
})
processed_questions.append(error_result)
logger.info(f"多线程处理完成!总共处理了 {len(processed_questions)} 道题目")
return processed_questions
def save_results(self, processed_questions: List[Dict[str, Any]],
output_file: str):
"""
保存处理结果到JSON文件
Args:
processed_questions: 处理后的题目列表
output_file: 输出文件路径
"""
try:
# 保存文件的进度条
with tqdm(desc="保存文件", unit="", total=len(processed_questions)) as pbar:
with open(output_file, 'w', encoding='utf-8') as f:
json.dump(processed_questions, f, ensure_ascii=False, indent=2)
pbar.update(len(processed_questions))
logger.info(f"结果已保存到: {output_file}")
# 输出统计信息
total_questions = len(processed_questions)
convertible_count = sum(1 for q in processed_questions if q.get("convertible", False))
logger.info(f"处理完成统计:")
logger.info(f"总题目数: {total_questions}")
logger.info(f"可转换题目数: {convertible_count}")
logger.info(f"转换率: {convertible_count/total_questions*100:.1f}%")
# 按题型统计
type_stats = {}
for q in processed_questions:
q_type = q.get("question_type_name", "未知")
if q_type not in type_stats:
type_stats[q_type] = {"total": 0, "convertible": 0}
type_stats[q_type]["total"] += 1
if q.get("convertible", False):
type_stats[q_type]["convertible"] += 1
logger.info("各题型转换统计:")
for q_type, stats in type_stats.items():
rate = stats["convertible"] / stats["total"] * 100 if stats["total"] > 0 else 0
logger.info(f" {q_type}: {stats['convertible']}/{stats['total']} ({rate:.1f}%)")
# 输出一些转换示例
logger.info("\n转换示例:")
example_count = 0
for q in processed_questions:
if q.get("convertible", False) and example_count < 3:
logger.info(f" 原题目: {q.get('question', '')[:50]}...")
logger.info(f" 转换后: {q.get('choice_question', '')[:50]}...")
logger.info(f" 正确选项: {q.get('correct_option', '')}")
logger.info(f" 理由: {q.get('conversion_reason', '')}")
logger.info(" " + "-"*50)
example_count += 1
except Exception as e:
logger.error(f"保存文件失败: {e}")
def load_questions(input_file: str) -> List[Dict[str, Any]]:
"""
从JSON文件加载题目数据
Args:
input_file: 输入文件路径
Returns:
List: 题目列表
"""
try:
with tqdm(desc="加载文件", unit="B", unit_scale=True) as pbar:
with open(input_file, 'r', encoding='utf-8') as f:
questions = json.load(f)
pbar.update(1)
logger.info(f"成功加载 {len(questions)} 道题目")
return questions
except Exception as e:
logger.error(f"加载文件失败: {e}")
return []
def main():
"""
主函数 - 配置API信息并执行转换
"""
# ========== 配置区域 ==========
# 请在这里填入您的API配置信息
API_KEY = "sk-oYh3Xrhg8oDY2gW02c966f31C84449Ad86F9Cd9dF6E64a8d" # 填入您的OpenAI API Key
BASE_URL = "https://vip.apiyi.com/v1" # 填入API基础URL如 "https://api.openai.com/v1"
MODEL_NAME = "deepseek-chat" # 填入模型名称,如 "gpt-4o-mini", "gpt-3.5-turbo"等
# 文件路径配置
INPUT_FILE = "/home/ubuntu/50T/LYT/MatBench/layer2/PGEE/code/step4_enhanced_classified_questions.json" # 输入文件路径
OUTPUT_FILE = "/home/ubuntu/50T/LYT/MatBench/layer2/PGEE/code/step5_converted_questions.json" # 输出文件路径
# 多线程配置
MAX_WORKERS = 20 # 线程数根据API限制和系统性能调整
# ============================
# 检查必要的配置
if not all([API_KEY, BASE_URL, MODEL_NAME]):
logger.error("请在main函数中配置API_KEY, BASE_URL和MODEL_NAME!")
return
try:
print("🚀 开始题目转换分析...")
# 加载题目数据
questions = load_questions(INPUT_FILE)
if not questions:
logger.error("没有加载到有效的题目数据")
return
# 过滤掉判断题(根据需求,判断题单独处理)
target_questions = []
for q in questions:
q_type = q.get("question_type", "")
# 包含选择题、计算题、简答题,以及判断题(统一处理)
if q_type in ["multiple_choice", "calculation", "short_answer", "true_false"]:
target_questions.append(q)
# target_questions = target_questions[:200] # 调试用
logger.info(f"筛选出需要处理的题目: {len(target_questions)}")
# 初始化转换器
converter = QuestionConverter(
api_key=API_KEY,
base_url=BASE_URL,
model_name=MODEL_NAME,
max_workers=MAX_WORKERS
)
# 记录开始时间
start_time = time.time()
# 处理题目
processed_questions = converter.process_questions(target_questions)
# 记录结束时间并计算用时
end_time = time.time()
total_time = end_time - start_time
logger.info(f"处理耗时: {total_time:.2f} 秒 ({total_time/60:.2f} 分钟)")
logger.info(f"平均每题处理时间: {total_time/len(target_questions):.2f}")
# 保存结果
converter.save_results(processed_questions, OUTPUT_FILE)
print("✅ 题目转换分析完成!")
except Exception as e:
logger.error(f"程序执行失败: {e}")
if __name__ == "__main__":
main()