# Source file: MatBench/layer2/PGEE/code/step4_high_quality.py
# (507 lines, 20 KiB, Python)
#
# NOTE: the hosting page flagged this file as containing ambiguous Unicode
# characters, i.e. characters that might be confused with other characters.
# If that is intentional, the warning can be ignored.

import json
import random
from collections import defaultdict, Counter
from typing import List, Dict, Any, Tuple, Optional

import numpy as np
import pandas as pd
class QuestionFilterAndSelector:
    """Filter classified questions and sample a balanced evaluation set.

    The pipeline has two stages:

    1. ``filter_questions_by_quality`` drops malformed questions and
       questions below the configured difficulty floors.
    2. ``smart_sample_questions`` draws a per-question-type quota from the
       survivors, favouring higher difficulty and higher knowledge levels.

    Each question is a dict with at least the keys ``question``, ``answer``,
    ``question_type``, ``knowledge_level`` and ``difficulty``.
    """

    # Difficulty tiers are visited from hardest to easiest when sampling.
    _DIFFICULTY_PRIORITY = (5, 4, 3, 2, 1)

    # Knowledge levels are visited from most to least demanding, so the
    # harder levels get first claim on every quota.
    _LEVEL_PRIORITY = (
        'advanced_synthesis',
        'complex_analysis',
        'medium_application',
        'simple_application',
        'basic_concepts',
    )

    # Fraction of the *remaining* quota a level may claim; levels not listed
    # here fall back to ``_DEFAULT_LEVEL_SHARE``.
    _LEVEL_QUOTA_SHARE = {
        'advanced_synthesis': 0.4,
        'complex_analysis': 0.4,
        'medium_application': 0.3,
    }
    _DEFAULT_LEVEL_SHARE = 0.2

    def __init__(self, seed: Optional[int] = None):
        """Set up thresholds, target ratios and the random source.

        Args:
            seed: Optional seed for the private random generator. Passing a
                fixed value makes the sampled set reproducible; ``None``
                (the default) keeps the previous non-deterministic behaviour.
        """
        self._rng = random.Random(seed)

        # Minimum difficulty per question type (drops overly easy items).
        self.min_difficulty_thresholds = {
            "calculation": 2,       # no difficulty-1 basic calculations
            "short_answer": 2,      # no difficulty-1 pure recall
            "true_false": 2,        # no difficulty-1 basic concept checks
            "multiple_choice": 2    # no difficulty-1 trivial choices
        }
        # Minimum difficulty per knowledge level.
        self.knowledge_level_min_difficulty = {
            "basic_concepts": 2,       # must need understanding, not only memory
            "simple_application": 2,
            "medium_application": 2,
            "complex_analysis": 3,
            "advanced_synthesis": 4
        }
        # Target composition of the final question bank, as fractions.
        # NOTE(review): only "question_types" drives sampling below; the
        # level and difficulty ratios are converted to counts by
        # _calculate_target_counts but not consumed by the sampler.
        # NOTE(review): difficulty 1 has a 5% share here, yet every floor
        # above is >= 2, so no difficulty-1 question survives
        # filter_questions_by_quality.
        self.target_distribution = {
            "question_types": {
                "calculation": 0.25,
                "short_answer": 0.45,
                "true_false": 0.15,
                "multiple_choice": 0.15
            },
            "knowledge_levels": {
                "basic_concepts": 0.20,
                "simple_application": 0.25,
                "medium_application": 0.30,
                "complex_analysis": 0.20,
                "advanced_synthesis": 0.05
            },
            "difficulty_levels": {
                1: 0.05,
                2: 0.25,
                3: 0.35,
                4: 0.25,
                5: 0.10
            }
        }

    def filter_questions_by_quality(self, questions: List[Dict]) -> List[Dict]:
        """Step 1: keep only questions that meet the quality criteria.

        A question is kept when it is well-formed, its difficulty reaches
        both the floor for its question type and the floor for its knowledge
        level (unknown types/levels have a floor of 1), and it does not match
        a special exclusion rule.

        Args:
            questions: Candidate question dicts.

        Returns:
            A new list with the surviving questions, in input order.
        """
        kept = []
        for q in questions:
            if not self._is_valid_question(q):
                continue

            type_floor = self.min_difficulty_thresholds.get(
                q.get('question_type', ''), 1)
            level_floor = self.knowledge_level_min_difficulty.get(
                q.get('knowledge_level', ''), 1)
            # Both floors must hold, so the stricter one decides.
            if q.get('difficulty', 0) < max(type_floor, level_floor):
                continue

            if self._should_exclude_question(q):
                continue
            kept.append(q)
        return kept

    def _is_valid_question(self, q: Dict) -> bool:
        """Return True when the question has usable, well-typed fields.

        All required fields must be present and truthy, the text fields must
        be strings, the difficulty must be numeric, the stripped question
        must have at least 20 characters and the stripped answer at least 5.
        Wrongly typed fields make the question invalid instead of raising, so
        one malformed record cannot abort the whole run.

        NOTE(review): the 5-character minimum also rejects terse answers
        such as a single option letter; confirm that is intended for
        multiple-choice and true/false items.
        """
        required_fields = ('question', 'answer', 'question_type',
                           'knowledge_level', 'difficulty')
        if not all(q.get(field) for field in required_fields):
            return False

        text_fields = ('question', 'answer', 'question_type', 'knowledge_level')
        if not all(isinstance(q[field], str) for field in text_fields):
            return False

        # A non-numeric difficulty cannot be compared against the floors.
        if not isinstance(q['difficulty'], (int, float, np.integer, np.floating)):
            return False

        return len(q['question'].strip()) >= 20 and len(q['answer'].strip()) >= 5

    def _should_exclude_question(self, q: Dict) -> bool:
        """Return True for known low-value combinations.

        Every rule targets ``basic_concepts`` questions:

        * difficulty 1 (pure memorisation), for any question type;
        * multiple choice at difficulty 2 or below.
        """
        if q.get('knowledge_level', '') != 'basic_concepts':
            return False
        difficulty = q.get('difficulty', 0)
        if difficulty == 1:
            return True
        return q.get('question_type', '') == 'multiple_choice' and difficulty <= 2

    def smart_sample_questions(self, filtered_questions: List[Dict],
                               target_count: int = 2000) -> List[Dict]:
        """Step 2: sample a balanced set of about ``target_count`` questions.

        Args:
            filtered_questions: Output of ``filter_questions_by_quality``.
            target_count: Desired size of the evaluation set.

        Returns:
            The selected questions. Fewer than ``target_count`` are returned
            when a question type has fewer candidates than its quota,
            because the shortfall is not redistributed to other types.
        """
        grouped = self._group_questions_by_categories(filtered_questions)
        quotas = self._calculate_target_counts(target_count)
        # 1. Per-type stratified sampling.
        per_type = self._stratified_sample_by_type(grouped, quotas, target_count)
        # 2. Merge and, if needed, trim to the overall target.
        return self._balance_within_types(per_type, target_count)

    def _group_questions_by_categories(self, questions: List[Dict]) -> Dict:
        """Index the questions by type, level, difficulty and type pairs.

        Returns:
            A dict with the keys ``by_type``, ``by_level``,
            ``by_difficulty`` (each mapping a value to a list of questions)
            and ``by_type_level`` / ``by_type_difficulty`` (nested mappings
            keyed by question type first).
        """
        grouped = {
            'by_type': defaultdict(list),
            'by_level': defaultdict(list),
            'by_difficulty': defaultdict(list),
            'by_type_level': defaultdict(lambda: defaultdict(list)),
            'by_type_difficulty': defaultdict(lambda: defaultdict(list))
        }
        for q in questions:
            qtype = q['question_type']
            level = q['knowledge_level']
            difficulty = q['difficulty']
            grouped['by_type'][qtype].append(q)
            grouped['by_level'][level].append(q)
            grouped['by_difficulty'][difficulty].append(q)
            grouped['by_type_level'][qtype][level].append(q)
            grouped['by_type_difficulty'][qtype][difficulty].append(q)
        return grouped

    @staticmethod
    def _apportion(total: int, ratios: Dict) -> Dict:
        """Split ``total`` into integer counts proportional to ``ratios``.

        Each share is floored, then the units lost to flooring are handed
        out one at a time to the categories with the largest fractional
        remainder. This keeps the counts summing to the intended total
        instead of silently falling short.
        """
        exact = {key: total * ratio for key, ratio in ratios.items()}
        # The small epsilon absorbs float noise such as 299.99999999999997.
        counts = {key: int(value + 1e-9) for key, value in exact.items()}
        leftover = int(round(total * sum(ratios.values()))) - sum(counts.values())
        ranked = sorted(ratios, key=lambda key: exact[key] - counts[key],
                        reverse=True)
        for key in ranked[:max(leftover, 0)]:
            counts[key] += 1
        return counts

    def _calculate_target_counts(self, total_target: int) -> Dict:
        """Convert the target ratios into absolute counts per category.

        Returns:
            A dict with the keys ``by_type``, ``by_level`` and
            ``by_difficulty``, each mapping a category to its quota.
        """
        return {
            'by_type': self._apportion(
                total_target, self.target_distribution['question_types']),
            'by_level': self._apportion(
                total_target, self.target_distribution['knowledge_levels']),
            'by_difficulty': self._apportion(
                total_target, self.target_distribution['difficulty_levels']),
        }

    def _stratified_sample_by_type(self, grouped_questions: Dict,
                                   target_counts: Dict, total_target: int) -> Dict:
        """Pick each question type's quota from its own candidates.

        Args:
            grouped_questions: Output of ``_group_questions_by_categories``.
            target_counts: Output of ``_calculate_target_counts``; only the
                ``by_type`` quotas are read.
            total_target: Overall target size (unused, kept for
                interface compatibility).

        Returns:
            A dict mapping question type to the list selected for it.
        """
        selected_by_type = {}
        for qtype, quota in target_counts['by_type'].items():
            candidates = grouped_questions['by_type'].get(qtype, [])
            if len(candidates) <= quota:
                # Not enough candidates: take them all. Copy so later
                # changes to the result cannot alter the grouping index.
                selected_by_type[qtype] = list(candidates)
            else:
                selected_by_type[qtype] = self._smart_sample_within_type(
                    candidates, quota)
        return selected_by_type

    def _smart_sample_within_type(self, questions: List[Dict],
                                  target_count: int) -> List[Dict]:
        """Sample ``target_count`` questions of one type, hardest first.

        Difficulty tiers are visited from 5 down to 1. Each tier may claim
        40% (difficulty >= 4) or 30% (otherwise) of the quota still open,
        spread over knowledge levels by ``_distribute_across_levels``. Any
        remaining gap is filled with a uniform random draw from the
        questions not chosen yet.
        """
        by_difficulty = defaultdict(list)
        for q in questions:
            by_difficulty[q['difficulty']].append(q)

        selected = []
        remaining = target_count
        for difficulty in self._DIFFICULTY_PRIORITY:
            if remaining <= 0:
                break
            tier = by_difficulty.get(difficulty)
            if not tier:
                continue

            level_groups = defaultdict(list)
            for q in tier:
                level_groups[q['knowledge_level']].append(q)

            share = 0.4 if difficulty >= 4 else 0.3
            picked = self._distribute_across_levels(
                level_groups, int(remaining * share))
            selected.extend(picked)
            remaining -= len(picked)

        shortfall = target_count - len(selected)
        if shortfall > 0:
            # Track picks by identity: O(1) per lookup, and it does not
            # depend on comparing whole question dicts for equality.
            chosen_ids = {id(q) for q in selected}
            leftovers = [q for q in questions if id(q) not in chosen_ids]
            if leftovers:
                selected.extend(
                    self._rng.sample(leftovers, min(shortfall, len(leftovers))))
        return selected[:target_count]

    def _distribute_across_levels(self, level_groups: Dict,
                                  target_count: int) -> List[Dict]:
        """Spread ``target_count`` picks over knowledge levels.

        Levels are visited from most to least demanding. Each level gets a
        share of the quota still open (at least one question, at most what
        it has available), and that many questions are drawn at random
        without replacement. Levels outside the known priority list are
        ignored.

        Returns:
            The picked questions; at most ``target_count`` of them.
        """
        if not level_groups or target_count <= 0:
            return []

        selected = []
        remaining = target_count
        for level in self._LEVEL_PRIORITY:
            if remaining <= 0:
                break
            candidates = level_groups.get(level)
            if not candidates:
                continue
            share = self._LEVEL_QUOTA_SHARE.get(level, self._DEFAULT_LEVEL_SHARE)
            quota = min(len(candidates), max(1, int(remaining * share)))
            selected.extend(self._rng.sample(candidates, quota))
            remaining -= quota
        return selected

    def _balance_within_types(self, selected_by_type: Dict,
                              target_count: int) -> List[Dict]:
        """Merge the per-type selections and trim to ``target_count``.

        When the merged list is too long, the questions with the highest
        quality score are kept; ties keep their original relative order.
        """
        all_selected = []
        for questions in selected_by_type.values():
            all_selected.extend(questions)

        if len(all_selected) > target_count:
            ranked = sorted(all_selected, key=self._calculate_quality_score,
                            reverse=True)
            all_selected = ranked[:target_count]
        return all_selected

    def _calculate_quality_score(self, question: Dict) -> float:
        """Score a question; higher means more valuable for the benchmark.

        The score is the sum of ``difficulty * 2``, a knowledge-level
        weight (1-5), a question-type weight (1-2) and a length bonus of
        +2 for question text longer than 200 characters or +1 for text
        longer than 100 characters.
        """
        score = question.get('difficulty', 1) * 2.0

        level_weights = {
            'basic_concepts': 1.0,
            'simple_application': 2.0,
            'medium_application': 3.0,
            'complex_analysis': 4.0,
            'advanced_synthesis': 5.0
        }
        score += level_weights.get(question.get('knowledge_level', ''), 1.0)

        # Short-answer and calculation questions are valued most.
        type_weights = {
            'short_answer': 2.0,
            'calculation': 2.0,
            'multiple_choice': 1.5,
            'true_false': 1.0
        }
        score += type_weights.get(question.get('question_type', ''), 1.0)

        # Test the larger bound first; checking "> 100" first would also
        # match every text longer than 200 and make the +2 bonus unreachable.
        question_length = len(question.get('question', ''))
        if question_length > 200:
            score += 2.0
        elif question_length > 100:
            score += 1.0
        return score

    @staticmethod
    def _summarize_distribution(questions: List[Dict]) -> Dict:
        """Count questions by type, level and difficulty, with percentages.

        An empty list yields the same keys with zero/empty values, so
        callers can index the result unconditionally.
        """
        total = len(questions)
        difficulties = [q.get('difficulty', 0) for q in questions]
        dist = {
            'total': total,
            'by_type': Counter(q.get('question_type', '') for q in questions),
            'by_level': Counter(q.get('knowledge_level', '') for q in questions),
            'by_difficulty': Counter(difficulties),
            'avg_difficulty': float(np.mean(difficulties)) if difficulties else 0.0
        }
        # Percentages; the comprehension is empty (no division) when total is 0.
        for key in ('by_type', 'by_level', 'by_difficulty'):
            dist[key + '_pct'] = {
                k: v / total * 100 for k, v in dist[key].items()
            }
        return dist

    def analyze_selection_results(self, original_questions: List[Dict],
                                  selected_questions: List[Dict]) -> Dict:
        """Compare the original pool with the selected set.

        Returns:
            A dict with ``original`` and ``selected`` distribution summaries,
            ``selection_ratio`` (selected / original, 0 when the pool is
            empty) and ``difficulty_improvement`` (selected mean difficulty
            minus original mean difficulty; an empty side counts as 0.0).
        """
        original_dist = self._summarize_distribution(original_questions)
        selected_dist = self._summarize_distribution(selected_questions)
        if original_questions:
            selection_ratio = len(selected_questions) / len(original_questions)
        else:
            selection_ratio = 0
        return {
            'original': original_dist,
            'selected': selected_dist,
            'selection_ratio': selection_ratio,
            'difficulty_improvement': (selected_dist['avg_difficulty']
                                       - original_dist['avg_difficulty'])
        }

    def print_selection_report(self, analysis_results: Dict):
        """Print a console report from ``analyze_selection_results`` output.

        The report shows headline statistics followed by before/after
        percentage tables for question type and difficulty.
        """
        print("\n" + "="*60)
        print("题目筛选结果报告")
        print("="*60)

        original = analysis_results['original']
        selected = analysis_results['selected']

        print("\n📊 基本统计:")
        print(f"原始题目数: {original['total']}")
        print(f"筛选后题目数: {selected['total']}")
        print(f"筛选比例: {analysis_results['selection_ratio']:.1%}")
        print(f"平均难度提升: {analysis_results['difficulty_improvement']:.2f}")

        print("\n📈 题型分布对比:")
        print(f"{'题型':<15} {'原始':<10} {'筛选后':<10} {'变化':<10}")
        print("-" * 50)
        for qtype in ['calculation', 'short_answer', 'true_false', 'multiple_choice']:
            orig_pct = original['by_type_pct'].get(qtype, 0)
            sel_pct = selected['by_type_pct'].get(qtype, 0)
            change = sel_pct - orig_pct
            print(f"{qtype:<15} {orig_pct:>7.1f}% {sel_pct:>8.1f}% {change:>+7.1f}%")

        print("\n🎯 难度分布对比:")
        print(f"{'难度':<8} {'原始':<10} {'筛选后':<10} {'变化':<10}")
        print("-" * 40)
        for diff in range(1, 6):
            orig_pct = original['by_difficulty_pct'].get(diff, 0)
            sel_pct = selected['by_difficulty_pct'].get(diff, 0)
            change = sel_pct - orig_pct
            print(f"难度{diff:<3} {orig_pct:>7.1f}% {sel_pct:>8.1f}% {change:>+7.1f}%")
def main_filter_questions(input_file: Optional[str] = None,
                          output_file: Optional[str] = None,
                          analysis_file: Optional[str] = None,
                          target_count: int = 2000) -> None:
    """Entry point: filter the classified questions and save the result.

    Loads the classified questions, applies the quality filter, samples the
    evaluation set, writes it as JSON, prints a console report and exports
    an Excel comparison workbook.

    Args:
        input_file: JSON file with the classified questions. Defaults to the
            project's original hard-coded location.
        output_file: Destination JSON file for the selected questions.
            Defaults to the project's original hard-coded location.
        analysis_file: Destination ``.xlsx`` file for the analysis workbook.
            Defaults to the project's original hard-coded location.
        target_count: Desired number of questions in the evaluation set.
    """
    if input_file is None:
        input_file = "/home/ubuntu/50T/LYT/MatBench/layer2/PGEE/code/step4_enhanced_classified_questions.json"
    if output_file is None:
        output_file = "/home/ubuntu/50T/LYT/MatBench/layer2/PGEE/code/step4_filtered_high_quality_questions.json"
    if analysis_file is None:
        analysis_file = "/home/ubuntu/50T/LYT/MatBench/layer2/PGEE/code/step4_selection_analysis.xlsx"

    # Load the classified questions.
    print("正在加载已分类的题目...")
    with open(input_file, 'r', encoding='utf-8') as f:
        all_questions = json.load(f)
    print(f"加载了 {len(all_questions)} 道题目")

    selector = QuestionFilterAndSelector()

    # Step 1: quality filter.
    print("\n第一步:按质量标准过滤题目...")
    filtered_questions = selector.filter_questions_by_quality(all_questions)
    print(f"质量过滤后剩余: {len(filtered_questions)} 道题目")

    # Step 2: balanced sampling.
    print("\n第二步:智能抽样构建评测集...")
    selected_questions = selector.smart_sample_questions(filtered_questions, target_count)
    print(f"最终选择: {len(selected_questions)} 道题目")

    # Persist the selection before any reporting, so a reporting failure
    # cannot lose the result.
    print(f"\n保存筛选结果到: {output_file}")
    with open(output_file, 'w', encoding='utf-8') as f:
        json.dump(selected_questions, f, ensure_ascii=False, indent=2)

    print("\n分析筛选结果...")
    analysis_results = selector.analyze_selection_results(all_questions, selected_questions)
    selector.print_selection_report(analysis_results)

    # Detailed Excel export; optional because it needs openpyxl.
    try:
        df_original = pd.DataFrame(all_questions)
        df_selected = pd.DataFrame(selected_questions)
        with pd.ExcelWriter(analysis_file, engine='openpyxl') as writer:
            df_selected.to_excel(writer, sheet_name='筛选结果', index=False)

            # Before/after share of every category, per dimension.
            comparison_data = []
            for metric in ['question_type', 'knowledge_level', 'difficulty']:
                # A frame built from an empty list has no columns; skip the
                # metric instead of failing with KeyError.
                if metric not in df_original.columns or metric not in df_selected.columns:
                    continue
                orig_dist = df_original[metric].value_counts(normalize=True) * 100
                sel_dist = df_selected[metric].value_counts(normalize=True) * 100
                for category in set(orig_dist.index) | set(sel_dist.index):
                    orig_share = orig_dist.get(category, 0)
                    sel_share = sel_dist.get(category, 0)
                    comparison_data.append({
                        '维度': metric,
                        '类别': category,
                        '原始占比': orig_share,
                        '筛选后占比': sel_share,
                        '变化': sel_share - orig_share
                    })
            pd.DataFrame(comparison_data).to_excel(writer, sheet_name='分布对比', index=False)
        print(f"详细分析已保存到: {analysis_file}")
    except ImportError:
        print("提示: 安装pandas和openpyxl可生成详细分析报告")

    print("\n✅ 筛选完成!")
    print(f"🎯 最终评测集: {len(selected_questions)} 道高质量题目")
    print(f"📈 平均难度提升: {analysis_results['difficulty_improvement']:.2f}")
    print(f"💾 结果文件: {output_file}")
if __name__ == "__main__":
    # Run the filtering pipeline only when executed as a script, so that
    # importing this module has no side effects.
    main_filter_questions()