238 lines
9.2 KiB
Python
238 lines
9.2 KiB
Python
import csv
|
|
import os
|
|
import yt_dlp
|
|
from docx import Document
|
|
import requests
|
|
from bs4 import BeautifulSoup
|
|
from PIL import Image
|
|
import pytesseract
|
|
from io import BytesIO
|
|
from docx import Document
|
|
import re
|
|
import markdownify
|
|
from markdownify import markdownify as md
|
|
|
|
def valid_xml_char_ordinal(c):
    """Tell whether the single character *c* lies in an accepted code point range.

    The accepted code points are tab, line feed, carriage return,
    U+0020..U+D7FF, U+E000..U+FFFD and U+10000..U+10FFFF, which is the
    ``Char`` production of XML 1.0. Other control characters, surrogates,
    U+FFFE and U+FFFF are rejected.

    Args:
        c: A string of length one.

    Returns:
        True if the character is in one of the accepted ranges, else False.
    """
    cp = ord(c)
    # The checks run from the range expected to be hit most often to the
    # rarest one, so ordinary text returns after the first comparison.
    if 0x20 <= cp <= 0xD7FF:
        return True
    if cp in (0x9, 0xA, 0xD):
        return True
    if 0xE000 <= cp <= 0xFFFD:
        return True
    return 0x10000 <= cp <= 0x10FFFF
|
|
|
|
def download_and_clean_youtube_subtitles(video_url, txt_filepath):
    """Download a video's English subtitles and save them as plain text.

    yt-dlp is configured to skip the media download and write only the
    English subtitle track, with automatic captions enabled as well. The
    resulting ``<stem>.en.vtt`` file is then filtered: WebVTT header lines,
    cue timing lines and blank lines are dropped, the remaining lines are
    stripped, repeated lines are collapsed (first occurrence wins, order is
    kept), and the result is written to ``txt_filepath`` one line per caption.

    Args:
        video_url: URL handed to yt-dlp.
        txt_filepath: Destination of the cleaned text. The subtitle file is
            written next to it, using this path without its extension as stem.

    Returns:
        None. When the subtitle file cannot be opened, or the text file cannot
        be written, a message is printed and no exception is raised.
    """
    # splitext() removes whatever extension is present; slicing off a fixed
    # four characters only worked for three-letter extensions such as ".txt".
    subtitles_path = os.path.splitext(txt_filepath)[0]

    ydl_opts = {
        'skip_download': True,
        'writesubtitles': True,
        'writeautomaticsub': True,  # also accept automatically generated captions
        'subtitleslangs': ['en'],
        'outtmpl': f'{subtitles_path}.%(ext)s',
        'quiet': True,
    }

    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        ydl.download([video_url])

    subtitle_file = f'{subtitles_path}.en.vtt'

    # Lines that carry no caption text: cue timings and the WebVTT header.
    # The dots in the timestamps are escaped so they only match a literal '.'.
    non_caption = re.compile(
        r'(\d{2}:\d{2}:\d{2}\.\d{3} --> \d{2}:\d{2}:\d{2}\.\d{3})'
        r'|(^WEBVTT)|(^Kind: captions)|(^Language: .*)'
    )

    try:
        with open(subtitle_file, 'r', encoding='utf-8') as file:
            captions = [
                line.strip()
                for line in file
                if line.strip() and not non_caption.match(line)
            ]

        # dict.fromkeys() de-duplicates while preserving first-seen order.
        unique_captions = list(dict.fromkeys(captions))

        with open(txt_filepath, 'w', encoding='utf-8') as f:
            f.writelines(caption + '\n' for caption in unique_captions)
    except IOError:
        print(f"Could not read file: {subtitle_file}")
|
|
|
|
# scrape a webpage and perform OCR on images
|
|
def scrape_and_ocr_forum(url, doc):
    """Copy a page's text, then the OCR text of its images, into ``doc``.

    The page at ``url`` is fetched and parsed. Every ``h1``, ``h2``, ``h3``,
    ``p`` and ``li`` element is added to ``doc`` as one paragraph. After that,
    each ``<img>`` whose ``src`` starts with ``http`` and ends in ``.svg`` or
    ``.png`` (excluding one known logo URL) is downloaded and passed to
    Tesseract. Non-blank OCR output is appended as a paragraph after removing
    the characters that ``valid_xml_char_ordinal`` rejects.

    Images that cannot be downloaded or decoded are reported on stdout and
    skipped, so a single bad image does not abort the whole page.

    Args:
        url: Address of the page to scrape.
        doc: Object providing ``add_paragraph(text)``; presumably a
            python-docx ``Document`` -- confirm against the caller.
    """
    # Imported here because the module only binds ``Image`` from PIL.
    from PIL import UnidentifiedImageError

    response = requests.get(url)
    soup = BeautifulSoup(response.content, 'html.parser')

    for element in soup.find_all(['h1', 'h2', 'h3', 'p', 'li']):
        doc.add_paragraph(element.get_text())

    for image in soup.find_all('img'):
        image_url = image.get('src')
        if not image_url or not image_url.startswith('http'):
            continue
        if not image_url.endswith(('.svg', '.png')):
            continue
        if 'neveragain.allstatics.com/2019/assets/icon/logo' in image_url:
            continue

        # Only the download and the format detection are guarded; anything
        # Tesseract raises afterwards still propagates to the caller.
        try:
            img_response = requests.get(image_url, stream=True)
            img = Image.open(BytesIO(img_response.content))
        except (requests.RequestException, UnidentifiedImageError) as exc:
            print(f"Skipping image {image_url}: {exc}")
            continue

        ocr_text = pytesseract.image_to_string(img)

        # strip() also rejects output made only of newlines or form feeds,
        # which would otherwise end up as an empty paragraph.
        if ocr_text.strip():
            cleaned_string = ''.join(c for c in ocr_text if valid_xml_char_ordinal(c))
            doc.add_paragraph(cleaned_string)
|
|
|
|
def superuser_to_markdown(url, doc_filepath):
    """Save a question-and-answers page as Markdown, plus OCR text of its images.

    The page at ``url`` is fetched and parsed. The first ``<h1>`` supplies the
    title, the ``div`` with id ``question`` supplies the question and every
    ``div`` with class ``answer`` supplies an answer. For each post, the
    ``div`` with class ``s-prose js-post-body`` is converted with markdownify
    using ATX headings. Then every ``<img>`` inside the question and the
    answers whose ``src`` (or ``data-src``) starts with ``http`` is downloaded
    and passed to Tesseract; non-blank results are appended as fenced code
    blocks. The combined text is written to ``doc_filepath`` as UTF-8.

    A missing title, question or post body is left out instead of raising
    ``AttributeError``. Images that cannot be downloaded or decoded are
    reported on stdout and skipped.

    Args:
        url: Address of the question page.
        doc_filepath: Path of the Markdown file to write.
    """
    # Imported here because the module only binds ``Image`` from PIL.
    from PIL import UnidentifiedImageError

    response = requests.get(url)
    soup = BeautifulSoup(response.content, 'html.parser')

    post_body = {'class': 's-prose js-post-body'}
    parts = []

    title_tag = soup.find('h1')
    question = soup.find('div', {'id': 'question'})
    answers = soup.find_all('div', {'class': 'answer'})

    # Question: heading (when the page has one) followed by the body.
    if question is not None:
        question_body = question.find('div', post_body)
        if question_body is not None:
            if title_tag is not None:
                parts.append(f"# {title_tag.get_text(strip=True)}\n\n")
            parts.append(
                markdownify.markdownify(question_body.prettify(), heading_style="ATX") + "\n\n"
            )

    # Answers, in page order.
    for answer in answers:
        answer_body = answer.find('div', post_body)
        if answer_body is None:
            continue
        parts.append(
            markdownify.markdownify(answer_body.prettify(), heading_style="ATX") + "\n\n"
        )

    # OCR the images of the question first, then those of each answer.
    posts = ([question] if question is not None else []) + list(answers)
    for post in posts:
        for img_tag in post.find_all('img'):
            # Fall back to data-src, which lazily loaded images may use.
            image_src = img_tag.get('src') or img_tag.get('data-src')
            if not image_src or not image_src.startswith('http'):
                continue
            try:
                img_response = requests.get(image_src, stream=True)
                img = Image.open(BytesIO(img_response.content))
            except (requests.RequestException, UnidentifiedImageError) as exc:
                print(f"Skipping image {image_src}: {exc}")
                continue
            ocr_text = pytesseract.image_to_string(img).strip()
            if ocr_text:
                parts.append("```\n" + ocr_text + "\n```\n\n")

    with open(doc_filepath, 'w', encoding='utf-8') as f:
        f.write("".join(parts))
|
|
|
|
|
|
def stack_overflow_to_markdown(url, doc_filepath):
    """Save a question-and-answers page as Markdown, plus OCR text of page images.

    The page at ``url`` is fetched and parsed. The first ``<h1>`` supplies the
    title, the ``div`` with id ``question`` supplies the question and every
    ``div`` with class ``answer`` supplies an answer. For each post, the
    ``div`` with class ``s-prose js-post-body`` is converted with markdownify
    using ATX headings. Then every ``<img>`` anywhere on the page whose
    ``src`` starts with ``http`` and ends in ``.svg`` or ``.png`` is
    downloaded and passed to Tesseract; non-blank results are appended as
    fenced code blocks. The combined text is written to ``doc_filepath`` as
    UTF-8.

    A missing title, question, post body or ``src`` attribute is skipped
    instead of raising. Images that cannot be downloaded or decoded are
    reported on stdout and skipped.

    Args:
        url: Address of the question page.
        doc_filepath: Path of the Markdown file to write.
    """
    # Imported here because the module only binds ``Image`` from PIL.
    from PIL import UnidentifiedImageError

    response = requests.get(url)
    soup = BeautifulSoup(response.content, 'html.parser')

    post_body = {'class': 's-prose js-post-body'}
    parts = []

    question = soup.find('div', {'id': 'question'})
    title_tag = soup.find('h1')

    # Question: heading (when the page has one) followed by the body.
    if question is not None:
        question_body = question.find('div', post_body)
        if question_body is not None:
            if title_tag is not None:
                parts.append(f"# {title_tag.get_text(strip=True)}\n\n")
            parts.append(
                markdownify.markdownify(question_body.prettify(), heading_style="ATX") + "\n\n"
            )

    # Answers, in page order.
    for answer in soup.find_all('div', {'class': 'answer'}):
        answer_body = answer.find('div', post_body)
        if answer_body is None:
            continue
        parts.append(
            markdownify.markdownify(answer_body.prettify(), heading_style="ATX") + "\n\n"
        )

    # OCR every eligible image found anywhere on the page.
    for img_tag in soup.find_all('img'):
        # .get() because an <img> without a src attribute would make
        # img_tag['src'] raise KeyError.
        image_url = img_tag.get('src')
        # Make sure the image URL is valid before fetching it.
        if not image_url or not image_url.startswith('http'):
            continue
        if not image_url.endswith(('.svg', '.png')):
            continue
        try:
            img_response = requests.get(image_url, stream=True)
            img = Image.open(BytesIO(img_response.content))
        except (requests.RequestException, UnidentifiedImageError) as exc:
            print(f"Skipping image {image_url}: {exc}")
            continue
        ocr_text = pytesseract.image_to_string(img).strip()
        if ocr_text:
            parts.append("```\n" + ocr_text + "\n```\n\n")

    with open(doc_filepath, 'w', encoding='utf-8') as f:
        f.write("".join(parts))
|
|
|
|
def scrape_webpage_to_markdown(url, doc_filepath):
    """Convert the main content of a web page to Markdown, with OCR for images.

    The page at ``url`` is fetched and parsed. The content roots are all
    ``<article>`` elements, or failing that all ``<main>`` elements, or
    failing that all ``div`` elements with class
    ``lia-message-body-content``. When none exist the function returns
    without writing a file.

    Every descendant of each root is visited in document order:

    * ``<img>`` elements whose resolved URL ends in ``.svg`` or ``.png``
      (excluding one known logo URL) are downloaded and passed to Tesseract.
      Non-blank OCR output is emitted as a ``plaintext`` fenced block in
      place of the image. Images without ``src`` and images with another
      extension are skipped.
    * Images that fail to download or decode, or yield no text, are handled
      like any other tag.
    * Every other tag is converted with markdownify; text nodes are skipped.

    The result is written to ``doc_filepath`` as UTF-8.

    Args:
        url: Address of the page; also the base for relative image URLs.
        doc_filepath: Path of the Markdown file to write.
    """
    from urllib.parse import urljoin

    # ``PIL`` itself is never bound at module level (only ``PIL.Image`` is),
    # so the exception class has to be imported explicitly.
    from PIL import UnidentifiedImageError

    response = requests.get(url)
    soup = BeautifulSoup(response.content, 'html.parser')

    articles = (
        soup.find_all('article')
        or soup.find_all('main')
        or soup.find_all('div', {'class': 'lia-message-body-content'})
    )
    if not articles:
        return

    parts = []

    for article in articles:
        # NOTE(review): every descendant tag is converted on its own, so the
        # content of nested tags appears once per enclosing tag in the
        # output. Left as is to keep the output layout; confirm whether this
        # repetition is intended.
        for child in article.descendants:
            if child.name == 'img':
                img_url = child.get('src')
                if not img_url:
                    continue
                if not img_url.startswith(('http:', 'https:')):
                    # Resolve relative and protocol-relative references
                    # against the page URL instead of concatenating strings.
                    img_url = urljoin(url, img_url)
                if not img_url.endswith(('.svg', '.png')):
                    continue
                if 'neveragain.allstatics.com/2019/assets/icon/logo' in img_url:
                    continue
                try:
                    img_response = requests.get(img_url, stream=True)
                    img = Image.open(BytesIO(img_response.content))
                    ocr_text = pytesseract.image_to_string(img)
                except UnidentifiedImageError:
                    print("unidentified image")
                except requests.RequestException as exc:
                    print(f"could not download image {img_url}: {exc}")
                else:
                    if ocr_text.strip():
                        parts.append('\n```plaintext\n{}\n```\n'.format(ocr_text.strip()))
                        continue

            # Text nodes have no tag name; their text is already covered by
            # the conversion of the tag that contains them.
            if child.name is None:
                continue

            parts.append(md(str(child)) + '\n\n')

    with open(doc_filepath, 'w', encoding='utf-8') as f:
        f.write(''.join(parts))
|
|
|
|
|
|
# process a URL and save the file
|
|
def process_url(url, doc_id, app, output_dir="/content/drive/MyDrive/SourceDoc"):
    """Fetch one source URL and store its content under ``output_dir``.

    The handler is chosen by substring match on ``url``:

    * ``youtube.com`` / ``youtu.be``: subtitles are saved to
      ``<doc_id>_<app>.txt``.
    * ``superuser.com``: the page is saved to ``<doc_id>_<app>.md``.
    * ``stackoverflow.com``: the page is saved to ``<doc_id>_<app>.md``.
    * anything else: generic page scraping into ``<doc_id>_<app>.md``.

    Args:
        url: Address of the source to process.
        doc_id: Identifier used as the first part of the output file name.
        app: Application name used as the second part of the file name.
        output_dir: Directory that receives the output file. Defaults to the
            location that was previously hard-coded.
    """
    output_stem = os.path.join(output_dir, f"{doc_id}_{app}")
    doc_filepath = f"{output_stem}.md"
    txt_filepath = f"{output_stem}.txt"

    if 'youtube.com' in url or 'youtu.be' in url:
        download_and_clean_youtube_subtitles(url, txt_filepath)
    elif 'superuser.com' in url:
        superuser_to_markdown(url, doc_filepath)
    elif 'stackoverflow.com' in url:
        stack_overflow_to_markdown(url, doc_filepath)
    else:
        scrape_webpage_to_markdown(url, doc_filepath)
|
|
|
|
# Walk the CSV export and hand every row after the first 55 to process_url().
csv_filepath = './Get_Source_Doc - Sheet1.csv'
with open(csv_filepath, 'r', newline='', encoding='utf-8') as csvfile:
    for row_index, row in enumerate(csv.DictReader(csvfile)):
        # The first 55 data rows are skipped -- presumably they were handled
        # in an earlier run; confirm before changing the number.
        if row_index < 55:
            continue
        process_url(row['Source'], row['id'], row['InvolvedApp'])
        # Echo the row so progress is visible while the batch runs.
        print(row)