Files
sci-gui-agent-benchmark/resouce_collection/Source2Doc/source.py
2024-01-23 22:02:09 +08:00

294 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import csv
import os
import io
import fitz
import yt_dlp
from docx import Document
import requests
from bs4 import BeautifulSoup
from PIL import Image
import pytesseract
from io import BytesIO
from docx import Document
import re
import markdownify
from markdownify import markdownify as md
def download_pdf(url):
response = requests.get(url)
response.raise_for_status() # 确保请求是成功的
return io.BytesIO(response.content)
def pdf_to_markdown(pdf_stream, markdown_path):
document = fitz.open(stream=pdf_stream, filetype="pdf")
markdown_content = ""
for page_number in range(len(document)):
page = document[page_number]
text = page.get_text()
markdown_content += text + "\n\n"
# 提取图片并添加到 Markdown 文件。图片被保存在同一目录下
image_list = page.get_images(full=True)
if image_list:
markdown_content += f"### Page {page_number + 1} Images\n"
for img_index, image in enumerate(image_list, start=1):
# 提取图片
xref = image[0]
base64_image = document.extract_image(xref)
image_bytes = base64_image["image"]
# 写入图片到磁盘
image_filename = f"output_image_page_{page_number + 1}_{img_index}.png"
image_abs_path = os.path.join(os.path.dirname(markdown_path), image_filename)
with open(image_abs_path, "wb") as image_file:
image_file.write(image_bytes)
# 在 Markdown 文件中添加图片引用
markdown_content += f"![Page {page_number + 1} Image {img_index}]({image_filename})\n\n"
with open(markdown_path, "w", encoding="utf-8") as md_file:
md_file.write(markdown_content)
document.close()
def valid_xml_char_ordinal(c):
codepoint = ord(c)
# conditions ordered by presumed frequency
return (
0x20 <= codepoint <= 0xD7FF or
codepoint in (0x9, 0xA, 0xD) or
0xE000 <= codepoint <= 0xFFFD or
0x10000 <= codepoint <= 0x10FFFF
)
def download_and_clean_youtube_subtitles(video_url, txt_filepath):
# 设置yt-dlp库的选项来下载字幕
subtitles_path = txt_filepath[0:-4]
ydl_opts = {
'skip_download': True,
'writesubtitles': True,
'writeautomaticsub': True, # 如果视频没有字幕,尝试下载自动生成的字幕
'subtitleslangs': ['en'], # 下载英文字幕
'outtmpl': f'{subtitles_path}.%(ext)s', # 确保保存到可写目录
'quiet': True,
}
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
# 获取视频信息,下载字幕文件
ydl.download([video_url])
subtitle_file = f'{subtitles_path}.en.vtt'
# 读取下载的字幕文件
subtitles = []
try:
with open(subtitle_file, 'r', encoding='utf-8') as file:
# 读取所有行
lines = file.readlines()
# 正则表达式匹配时间戳和其他不相关的标记
pattern = re.compile(r'(\d+:\d\d:\d\d.\d+ --> \d+:\d\d:\d\d.\d+)|(\s*<[^>]+>)')
# 去除时间戳和HTML标签等只保留字幕文本
lines = [re.sub(pattern, '', line).strip() for line in lines if line.strip() and not pattern.match(line)]
# 清洗字幕
for line in lines:
# 如果这是一个时间线或者其他不需要的信息,跳过它
if pattern.match(line) or line.strip() == '':
continue
# 添加到字幕列表,同时去除愈加和前导空白符
subtitles.append(line.strip())
# 去除可能的重复行
subtitles = list(dict.fromkeys(subtitles))
# 保存至txt文件
with open(txt_filepath, 'w', encoding='utf-8') as f:
for line in subtitles:
if line: # 避免写入空行
f.write(line + '\n')
except IOError:
print(f"Could not read file: {subtitle_file}")
# 爬取论坛内容对图片进行OCR处理并保存为.docx文件
def scrape_and_ocr_forum(url, doc):
response = requests.get(url)
soup = BeautifulSoup(response.content, 'html.parser')
text_elements = soup.find_all(['h1', 'h2', 'h3', 'p', 'li'])
for element in text_elements:
doc.add_paragraph(element.get_text())
image_elements = soup.find_all('img')
for image in image_elements:
if 'src' not in image.attrs:
continue
image_url = image['src']
if image_url.startswith('http'):
if not image_url.endswith('.svg') and not image_url.endswith('.png'):
continue
if 'neveragain.allstatics.com/2019/assets/icon/logo' in image_url:
continue
img_response = requests.get(image_url, stream=True)
img = Image.open(BytesIO(img_response.content))
ocr_text = pytesseract.image_to_string(img)
if ocr_text != ' ' and ocr_text != '':
cleaned_string = ''.join(c for c in ocr_text if valid_xml_char_ordinal(c))
doc.add_paragraph(cleaned_string)
def superuser_to_markdown(url, doc_filepath):
response = requests.get(url)
soup = BeautifulSoup(response.content, 'html.parser')
# 创建Markdown文件的内容
markdown_content = ""
# 获取问题标题和内容
question_title = soup.find('h1').get_text(strip=True)
question = soup.find('div', {'id': 'question'})
if question:
question_body = question.find('div', {'class': 's-prose js-post-body'}).prettify()
markdown_content += f"# {question_title}\n\n" + markdownify.markdownify(question_body, heading_style="ATX") + "\n\n"
# 获取所有回答
answers = soup.find_all('div', {'class': 'answer'})
for answer in answers:
answer_body = answer.find('div', {'class': 's-prose js-post-body'}).prettify()
markdown_content += markdownify.markdownify(answer_body, heading_style="ATX") + "\n\n"
# 处理图片并执行OCR
all_img_tags = question.find_all('img') + [img for answer in answers for img in answer.find_all('img')]
for img_tag in all_img_tags:
image_src = img_tag.get('src') or img_tag.get('data-src') # Superuser使用延迟加载的图片
if image_src and image_src.startswith('http'):
img_response = requests.get(image_src, stream=True)
img = Image.open(BytesIO(img_response.content))
ocr_text = pytesseract.image_to_string(img)
if ocr_text.strip(): # 如果OCR结果非空则添加到Markdown内容中
markdown_content += "```\n" + ocr_text.strip() + "\n```\n\n"
# 将Markdown内容写入文件
with open(doc_filepath, 'w', encoding='utf-8') as f:
f.write(markdown_content)
def stack_overflow_to_markdown(url, doc_filepath):
response = requests.get(url)
soup = BeautifulSoup(response.content, 'html.parser')
# 创建Markdown文件的内容
markdown_content = ""
# 获取问题标题和内容
question = soup.find('div', {'id': 'question'})
question_title = soup.find('h1').get_text(strip=True)
if question:
question_body = question.find('div', {'class': 's-prose js-post-body'}).prettify()
markdown_content += f"# {question_title}\n\n" + markdownify.markdownify(question_body, heading_style="ATX") + "\n\n"
# 获取所有回答
answers = soup.find_all('div', {'class': 'answer'})
for answer in answers:
answer_body = answer.find('div', {'class': 's-prose js-post-body'}).prettify()
markdown_content += markdownify.markdownify(answer_body, heading_style="ATX") + "\n\n"
# 处理图片并执行OCR
all_img_tags = soup.find_all('img')
for img_tag in all_img_tags:
image_url = img_tag['src']
if image_url.startswith('http') and (image_url.endswith('.svg') or image_url.endswith('.png')): # 确保图片URL有效
img_response = requests.get(image_url, stream=True)
img = Image.open(BytesIO(img_response.content))
ocr_text = pytesseract.image_to_string(img)
if ocr_text.strip(): # 如果OCR结果非空则添加到Markdown内容中
markdown_content += "```\n" + ocr_text.strip() + "\n```\n\n"
# 将Markdown内容写入文件
with open(doc_filepath, 'w', encoding='utf-8') as f:
f.write(markdown_content)
def scrape_webpage_to_markdown(url, doc_filepath):
response = requests.get(url)
soup = BeautifulSoup(response.content, 'html.parser')
# 假设文章内容在 HTML 的 'article' 标签中,根据实际页面结构调整
articles = soup.find_all('article') or soup.find_all('main') or soup.find_all('div', {'id':'steps'}, {'class':'section_text'}) # 或其他包含主要内容的HTML标签
if not articles:
articles = soup.find_all('div', {'class': 'lia-message-body-content'})
markdown_content = ''
# 抓取所有图文信息
for article in articles:
for child in article.recursiveChildGenerator():
# 如果是图片则进行OCR
if child.name == 'img':
img_url = child.get('src')
if not img_url:
continue
if not img_url.startswith(('http:', 'https:')):
img_url = '{}{}'.format(url, img_url)
if not img_url.endswith('.svg') and not img_url.endswith('.png'):
continue
if 'neveragain.allstatics.com/2019/assets/icon/logo' in img_url:
continue
print(img_url)
try:
img_response = requests.get(img_url, stream=True)
img = Image.open(BytesIO(img_response.content))
ocr_text = pytesseract.image_to_string(img)
if ocr_text.strip():
markdown_content += '\n```plaintext\n{}\n```\n'.format(ocr_text.strip())
continue
except PIL.UnidentifiedImageError:
print("unidentified image")
# 不是标签可能是NavigableString或其他
if child.name is None:
continue
# 抓取标签并转换为Markdown
html_str = str(child)
markdown_content += md(html_str) + '\n\n'
# 写入markdown文件
with open(doc_filepath, 'w', encoding='utf-8') as f:
f.write(markdown_content)
# 处理单个URL
def process_url(url, doc_id, app):
doc_filepath = f"/content/drive/MyDrive/SourceDoc/{doc_id}_{app}.md"
txt_filepath = f"/content/drive/MyDrive/SourceDoc/{doc_id}_{app}.txt"
doc = Document()
if 'youtube.com' in url or 'youtu.be' in url:
download_and_clean_youtube_subtitles(url, txt_filepath)
elif url.endswith('.pdf'):
pdf_stream = download_pdf(url)
pdf_to_markdown(pdf_stream, doc_filepath)
elif 'superuser.com' in url or 'askubuntu.com' in url:
superuser_to_markdown(url, doc_filepath)
elif 'stackoverflow.com' in url:
stack_overflow_to_markdown(url, doc_filepath)
else:
scrape_webpage_to_markdown(url, doc_filepath)
# 读取CSV文件中的数据并执行对应操作
csv_filepath = '/content/Get_Source_Doc - Sheet1.csv' # 更新为你的CSV文件实际路径
with open(csv_filepath, 'r', newline='', encoding='utf-8') as csvfile:
reader = csv.DictReader(csvfile)
cnt = 176
for row in reader:
if cnt>0:
cnt -= 1
continue
process_url(row['Source'], row['id'], row['InvolvedApp'])
print(row)