import csv
import io
import os
import re

import fitz
import markdownify
import pytesseract
import requests
import yt_dlp
from bs4 import BeautifulSoup
from docx import Document
from io import BytesIO
from markdownify import markdownify as md
from PIL import Image, UnidentifiedImageError

def download_pdf(url):
    response = requests.get(url)
    response.raise_for_status()  # Make sure the request succeeded
    return io.BytesIO(response.content)

def pdf_to_markdown(pdf_stream, markdown_path):
    document = fitz.open(stream=pdf_stream, filetype="pdf")
    markdown_content = ""

    for page_number in range(len(document)):
        page = document[page_number]
        text = page.get_text()
        markdown_content += text + "\n\n"

        # Extract the page's images and reference them in the Markdown file.
        # Images are saved in the same directory as the Markdown file.
        image_list = page.get_images(full=True)
        if image_list:
            markdown_content += f"### Page {page_number + 1} Images\n"

            for img_index, image in enumerate(image_list, start=1):
                # Extract the image bytes
                xref = image[0]
                image_info = document.extract_image(xref)
                image_bytes = image_info["image"]

                # Write the image to disk
                image_filename = f"output_image_page_{page_number + 1}_{img_index}.png"
                image_abs_path = os.path.join(os.path.dirname(markdown_path), image_filename)
                with open(image_abs_path, "wb") as image_file:
                    image_file.write(image_bytes)

                # Add an image reference to the Markdown content
                markdown_content += f"![Page {page_number + 1} image {img_index}]({image_filename})\n\n"

    with open(markdown_path, "w", encoding="utf-8") as md_file:
        md_file.write(markdown_content)

    document.close()

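# Example usage (sketch): the URL and output path below are placeholders,
# not values from the original script.
#
#     pdf_stream = download_pdf("https://example.com/manual.pdf")
#     pdf_to_markdown(pdf_stream, "/content/drive/MyDrive/SourceDoc/manual.md")
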
def valid_xml_char_ordinal(c):
    codepoint = ord(c)
    # conditions ordered by presumed frequency
    return (
        0x20 <= codepoint <= 0xD7FF or
        codepoint in (0x9, 0xA, 0xD) or
        0xE000 <= codepoint <= 0xFFFD or
        0x10000 <= codepoint <= 0x10FFFF
    )

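# Usage note (sketch): OCR output is filtered through this predicate before it
# is added to a python-docx Document, since lxml rejects strings containing
# XML-incompatible control characters. The sample string is illustrative only.
#
#     cleaned = ''.join(c for c in "text\x00with\x0bcontrol chars" if valid_xml_char_ordinal(c))
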
def download_and_clean_youtube_subtitles(video_url, txt_filepath):
    # Configure yt-dlp to download subtitles only
    subtitles_path = txt_filepath[0:-4]
    ydl_opts = {
        'skip_download': True,
        'writesubtitles': True,
        'writeautomaticsub': True,  # Fall back to auto-generated subtitles if the video has none
        'subtitleslangs': ['en'],  # Download English subtitles
        'outtmpl': f'{subtitles_path}.%(ext)s',  # Make sure output goes to a writable directory
        'quiet': True,
    }

    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        # Fetch the video info and download the subtitle file
        ydl.download([video_url])
        subtitle_file = f'{subtitles_path}.en.vtt'

    # Read the downloaded subtitle file
    subtitles = []
    try:
        with open(subtitle_file, 'r', encoding='utf-8') as file:
            # Read all lines
            lines = file.readlines()

            # Regular expression matching timestamps and other irrelevant markup
            pattern = re.compile(r'(\d+:\d\d:\d\d\.\d+ --> \d+:\d\d:\d\d\.\d+)|(\s*<[^>]+>)')

            # Drop timestamps, HTML-like tags, etc., keeping only the subtitle text
            lines = [re.sub(pattern, '', line).strip() for line in lines if line.strip() and not pattern.match(line)]

            # Clean the subtitles
            for line in lines:
                # Skip timing lines and any other unwanted information
                if pattern.match(line) or line.strip() == '':
                    continue
                # Add to the subtitle list, stripping leading and trailing whitespace
                subtitles.append(line.strip())

        # Remove possible duplicate lines
        subtitles = list(dict.fromkeys(subtitles))

        # Save to the .txt file
        with open(txt_filepath, 'w', encoding='utf-8') as f:
            for line in subtitles:
                if line:  # Avoid writing empty lines
                    f.write(line + '\n')

    except IOError:
        print(f"Could not read file: {subtitle_file}")

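# Example usage (sketch): the video URL and output path are placeholders,
# not values from the original script.
#
#     download_and_clean_youtube_subtitles(
#         "https://www.youtube.com/watch?v=XXXXXXXXXXX",
#         "/content/drive/MyDrive/SourceDoc/1_example.txt",
#     )
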
# Scrape forum content, run OCR on its images, and append everything to a .docx document
def scrape_and_ocr_forum(url, doc):
    response = requests.get(url)
    soup = BeautifulSoup(response.content, 'html.parser')

    text_elements = soup.find_all(['h1', 'h2', 'h3', 'p', 'li'])
    for element in text_elements:
        doc.add_paragraph(element.get_text())

    image_elements = soup.find_all('img')
    for image in image_elements:
        if 'src' not in image.attrs:
            continue
        image_url = image['src']
        if image_url.startswith('http'):
            if not image_url.endswith('.svg') and not image_url.endswith('.png'):
                continue
            if 'neveragain.allstatics.com/2019/assets/icon/logo' in image_url:
                continue
            img_response = requests.get(image_url, stream=True)
            img = Image.open(BytesIO(img_response.content))
            ocr_text = pytesseract.image_to_string(img)

            if ocr_text != ' ' and ocr_text != '':
                cleaned_string = ''.join(c for c in ocr_text if valid_xml_char_ordinal(c))
                doc.add_paragraph(cleaned_string)

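# Example usage (sketch): this scraper fills an existing python-docx Document;
# the URL and save path are placeholders, not values from the original script.
#
#     doc = Document()
#     scrape_and_ocr_forum("https://example-forum.com/thread/123", doc)
#     doc.save("/content/drive/MyDrive/SourceDoc/1_example.docx")
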
def superuser_to_markdown(url, doc_filepath):
    response = requests.get(url)
    soup = BeautifulSoup(response.content, 'html.parser')

    # Build the Markdown file content
    markdown_content = ""

    # Grab the question title and body
    question_title = soup.find('h1').get_text(strip=True)
    question = soup.find('div', {'id': 'question'})
    if question:
        question_body = question.find('div', {'class': 's-prose js-post-body'}).prettify()
        markdown_content += f"# {question_title}\n\n" + markdownify.markdownify(question_body, heading_style="ATX") + "\n\n"

    # Grab all answers
    answers = soup.find_all('div', {'class': 'answer'})
    for answer in answers:
        answer_body = answer.find('div', {'class': 's-prose js-post-body'}).prettify()
        markdown_content += markdownify.markdownify(answer_body, heading_style="ATX") + "\n\n"

    # Collect images from the question and answers and run OCR on them
    question_imgs = question.find_all('img') if question else []
    all_img_tags = question_imgs + [img for answer in answers for img in answer.find_all('img')]
    for img_tag in all_img_tags:
        image_src = img_tag.get('src') or img_tag.get('data-src')  # Superuser lazy-loads some images
        if image_src and image_src.startswith('http'):
            img_response = requests.get(image_src, stream=True)
            img = Image.open(BytesIO(img_response.content))
            ocr_text = pytesseract.image_to_string(img)
            if ocr_text.strip():  # Append non-empty OCR results to the Markdown content
                markdown_content += "```\n" + ocr_text.strip() + "\n```\n\n"

    # Write the Markdown content to the file
    with open(doc_filepath, 'w', encoding='utf-8') as f:
        f.write(markdown_content)

def stack_overflow_to_markdown(url, doc_filepath):
    response = requests.get(url)
    soup = BeautifulSoup(response.content, 'html.parser')

    # Build the Markdown file content
    markdown_content = ""

    # Grab the question title and body
    question = soup.find('div', {'id': 'question'})
    question_title = soup.find('h1').get_text(strip=True)
    if question:
        question_body = question.find('div', {'class': 's-prose js-post-body'}).prettify()
        markdown_content += f"# {question_title}\n\n" + markdownify.markdownify(question_body, heading_style="ATX") + "\n\n"

    # Grab all answers
    answers = soup.find_all('div', {'class': 'answer'})
    for answer in answers:
        answer_body = answer.find('div', {'class': 's-prose js-post-body'}).prettify()
        markdown_content += markdownify.markdownify(answer_body, heading_style="ATX") + "\n\n"

    # Collect images and run OCR on them
    all_img_tags = soup.find_all('img')
    for img_tag in all_img_tags:
        image_url = img_tag.get('src', '')
        if image_url.startswith('http') and (image_url.endswith('.svg') or image_url.endswith('.png')):  # Make sure the image URL is usable
            img_response = requests.get(image_url, stream=True)
            img = Image.open(BytesIO(img_response.content))
            ocr_text = pytesseract.image_to_string(img)
            if ocr_text.strip():  # Append non-empty OCR results to the Markdown content
                markdown_content += "```\n" + ocr_text.strip() + "\n```\n\n"

    # Write the Markdown content to the file
    with open(doc_filepath, 'w', encoding='utf-8') as f:
        f.write(markdown_content)

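# Example usage (sketch): both Q&A scrapers share the same signature; the URLs
# and output paths below are placeholders, not values from the original script.
#
#     superuser_to_markdown("https://superuser.com/questions/000000/example", "/tmp/su.md")
#     stack_overflow_to_markdown("https://stackoverflow.com/questions/000000/example", "/tmp/so.md")
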
def scrape_webpage_to_markdown(url, doc_filepath):
    response = requests.get(url)
    soup = BeautifulSoup(response.content, 'html.parser')

    # Assume the article content lives in an 'article' tag; adjust for the actual page structure
    articles = (soup.find_all('article')
                or soup.find_all('main')
                or soup.find_all('div', {'id': 'steps'})
                or soup.find_all('div', {'class': 'section_text'}))  # or other tags holding the main content

    if not articles:
        articles = soup.find_all('div', {'class': 'lia-message-body-content'})

    markdown_content = ''

    # Walk every descendant element, collecting both text and images
    for article in articles:
        for child in article.descendants:
            # If the element is an image, run OCR on it
            if child.name == 'img':
                img_url = child.get('src')
                if not img_url:
                    continue
                if not img_url.startswith(('http:', 'https:')):
                    img_url = '{}{}'.format(url, img_url)
                if not img_url.endswith('.svg') and not img_url.endswith('.png'):
                    continue
                if 'neveragain.allstatics.com/2019/assets/icon/logo' in img_url:
                    continue
                print(img_url)
                try:
                    img_response = requests.get(img_url, stream=True)
                    img = Image.open(BytesIO(img_response.content))
                    ocr_text = pytesseract.image_to_string(img)
                    if ocr_text.strip():
                        markdown_content += '\n```plaintext\n{}\n```\n'.format(ocr_text.strip())
                    continue
                except UnidentifiedImageError:
                    print("unidentified image")
            # Not a tag; probably a NavigableString or similar
            if child.name is None:
                continue
            # Convert the tag to Markdown
            html_str = str(child)
            markdown_content += md(html_str) + '\n\n'

    # Write the Markdown file
    with open(doc_filepath, 'w', encoding='utf-8') as f:
        f.write(markdown_content)

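# Example usage (sketch): the URL and output path are placeholders,
# not values from the original script.
#
#     scrape_webpage_to_markdown("https://example.com/some-guide", "/tmp/page.md")
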
# Handle a single URL
def process_url(url, doc_id, app):
    doc_filepath = f"/content/drive/MyDrive/SourceDoc/{doc_id}_{app}.md"
    txt_filepath = f"/content/drive/MyDrive/SourceDoc/{doc_id}_{app}.txt"
    doc = Document()

    if 'youtube.com' in url or 'youtu.be' in url:
        download_and_clean_youtube_subtitles(url, txt_filepath)
    elif url.endswith('.pdf'):
        pdf_stream = download_pdf(url)
        pdf_to_markdown(pdf_stream, doc_filepath)
    elif 'superuser.com' in url or 'askubuntu.com' in url:
        superuser_to_markdown(url, doc_filepath)
    elif 'stackoverflow.com' in url:
        stack_overflow_to_markdown(url, doc_filepath)
    else:
        scrape_webpage_to_markdown(url, doc_filepath)

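# Example usage (sketch): the arguments mirror the CSV columns read below;
# the values here are placeholders, not data from the original script.
#
#     process_url("https://stackoverflow.com/questions/000000/example", "42", "ExampleApp")
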
# Read rows from the CSV file and run the matching handler for each URL
csv_filepath = '/content/Get_Source_Doc - Sheet1.csv'  # Update to the actual path of your CSV file
with open(csv_filepath, 'r', newline='', encoding='utf-8') as csvfile:
    reader = csv.DictReader(csvfile)
    cnt = 176  # Number of leading rows to skip
    for row in reader:
        if cnt > 0:
            cnt -= 1
            continue
        process_url(row['Source'], row['id'], row['InvolvedApp'])
        print(row)
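
# Expected CSV layout (sketch, inferred from the column names used above;
# the sample row is illustrative only):
#
#     id,Source,InvolvedApp
#     1,https://example.com/how-to-guide,ExampleApp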