update get source to docs; process youtube,stackoverflow,superuser,ubuntu and normal web respectively

This commit is contained in:
tsuky_chen
2024-01-22 05:07:12 +08:00
parent 613a2199ed
commit ec23be3ecb
2 changed files with 465 additions and 324 deletions

View File

@@ -1,6 +1,6 @@
import csv
import os
import yt_dlp as youtube_dl
import yt_dlp
from docx import Document
import requests
from bs4 import BeautifulSoup
@@ -9,81 +9,226 @@ import pytesseract
from io import BytesIO
from docx import Document
import re
import markdownify
from markdownify import markdownify as md
# convert .vtt file to .docx file
def vtt_to_docx(vtt_filepath, docx_filepath):
doc = Document()
# open .vtt file
with open(vtt_filepath, 'r', encoding='utf-8') as file:
lines = file.readlines()
# apply regex to each line to check if it is a timecode
vtt_text_pattern = re.compile(r'^\d{2}:\d{2}:\d{2}.\d{3} --> \d{2}:\d{2}:\d{2}.\d{3}')
# deal with each line
for line in lines:
# if it is a timecode, skip it
if vtt_text_pattern.match(line) or 'WEBVTT' in line:
continue
# else, add it to the document
if line.strip():
doc.add_paragraph(line.strip())
def valid_xml_char_ordinal(c):
codepoint = ord(c)
# conditions ordered by presumed frequency
return (
0x20 <= codepoint <= 0xD7FF or
codepoint in (0x9, 0xA, 0xD) or
0xE000 <= codepoint <= 0xFFFD or
0x10000 <= codepoint <= 0x10FFFF
)
doc.save(docx_filepath)
# download youtube subtitles and convert them to .docx file
def download_youtube_subtitles(video_url, doc_filename):
def download_and_clean_youtube_subtitles(video_url, txt_filepath):
# set up youtube-dl options to download the subtitles
subtitles_path = txt_filepath[0:-4]
ydl_opts = {
'skip_download': True,
'writeautomaticsub': True,
'subtitleslangs': ['en'],
'outtmpl': f'{doc_filename}.%(ext)s',
'writesubtitles': True,
'writeautomaticsub': True, # if no subtitles are available, try to generate them
'subtitleslangs': ['en'],
'outtmpl': f'{subtitles_path}.%(ext)s',
'quiet': True,
}
with youtube_dl.YoutubeDL(ydl_opts) as ydl:
ydl.download([video_url])
# call vtt_to_docx function to convert .vtt file to .docx file
vtt_to_docx(f'/content/{doc_filename}.en.vtt', f'/content/{doc_filename}.docx')
# scrape and OCR a forum
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
# download the subtitles
ydl.download([video_url])
subtitle_file = f'{subtitles_path}.en.vtt'
# read the subtitle file
subtitles = []
try:
with open(subtitle_file, 'r', encoding='utf-8') as file:
lines = file.readlines()
# define a pattern to match the time line
pattern = re.compile(r'(\d{2}:\d{2}:\d{2}.\d{3} --> \d{2}:\d{2}:\d{2}.\d{3})|(^WEBVTT)|(^Kind: captions)|(^Language: .*)')
# clean the subtitles
for line in lines:
# if this line is a time line or it is blank , skip it
if pattern.match(line) or line.strip() == '':
continue
# add this subtitle line to subtitles list, remove the trailing spaces and line change
subtitles.append(line.strip())
# remove duplicated subtitles
subtitles = list(dict.fromkeys(subtitles))
# save the subtitles as a txt file
with open(txt_filepath, 'w', encoding='utf-8') as f:
for line in subtitles:
if line:
f.write(line + '\n')
except IOError:
print(f"Could not read file: {subtitle_file}")
# scrape a webpage and perform OCR on images
def scrape_and_ocr_forum(url, doc):
response = requests.get(url)
soup = BeautifulSoup(response.content, 'html.parser')
text_elements = soup.find_all(['h1', 'h2', 'h3', 'p', 'li'])
for element in text_elements:
doc.add_paragraph(element.get_text())
image_elements = soup.find_all('img')
for image in image_elements:
if 'src' not in image.attrs:
continue
image_url = image['src']
if image_url.startswith('http'):
if not image_url.endswith('.svg') and not image_url.endswith('.png'):
continue
if 'neveragain.allstatics.com/2019/assets/icon/logo' in image_url:
continue
img_response = requests.get(image_url, stream=True)
img = Image.open(BytesIO(img_response.content))
ocr_text = pytesseract.image_to_string(img)
if not ocr_text:
doc.add_paragraph(ocr_text)
# process a url
def process_url(url, doc_id):
doc_filepath = f"{doc_id}.docx"
if ocr_text != ' ' and ocr_text != '':
cleaned_string = ''.join(c for c in ocr_text if valid_xml_char_ordinal(c))
doc.add_paragraph(cleaned_string)
def superuser_to_markdown(url, doc_filepath):
response = requests.get(url)
soup = BeautifulSoup(response.content, 'html.parser')
# set up the markdown document
markdown_content = ""
# get the question title and body
question_title = soup.find('h1').get_text(strip=True)
question = soup.find('div', {'id': 'question'})
if question:
question_body = question.find('div', {'class': 's-prose js-post-body'}).prettify()
markdown_content += f"# {question_title}\n\n" + markdownify.markdownify(question_body, heading_style="ATX") + "\n\n"
# get all answers
answers = soup.find_all('div', {'class': 'answer'})
for answer in answers:
answer_body = answer.find('div', {'class': 's-prose js-post-body'}).prettify()
markdown_content += markdownify.markdownify(answer_body, heading_style="ATX") + "\n\n"
# deal with images and perform OCR
all_img_tags = question.find_all('img') + [img for answer in answers for img in answer.find_all('img')]
for img_tag in all_img_tags:
image_src = img_tag.get('src') or img_tag.get('data-src') # Superuser uses lazy loading
if image_src and image_src.startswith('http'):
img_response = requests.get(image_src, stream=True)
img = Image.open(BytesIO(img_response.content))
ocr_text = pytesseract.image_to_string(img)
if ocr_text.strip(): # if the OCR result is not empty, add it to the markdown content
markdown_content += "```\n" + ocr_text.strip() + "\n```\n\n"
with open(doc_filepath, 'w', encoding='utf-8') as f:
f.write(markdown_content)
def stack_overflow_to_markdown(url, doc_filepath):
response = requests.get(url)
soup = BeautifulSoup(response.content, 'html.parser')
# set up the markdown document
markdown_content = ""
# get the question title and body
question = soup.find('div', {'id': 'question'})
question_title = soup.find('h1').get_text(strip=True)
if question:
question_body = question.find('div', {'class': 's-prose js-post-body'}).prettify()
markdown_content += f"# {question_title}\n\n" + markdownify.markdownify(question_body, heading_style="ATX") + "\n\n"
# get all answers
answers = soup.find_all('div', {'class': 'answer'})
for answer in answers:
answer_body = answer.find('div', {'class': 's-prose js-post-body'}).prettify()
markdown_content += markdownify.markdownify(answer_body, heading_style="ATX") + "\n\n"
# deal with images and perform OCR
all_img_tags = soup.find_all('img')
for img_tag in all_img_tags:
image_url = img_tag['src']
if image_url.startswith('http') and (image_url.endswith('.svg') or image_url.endswith('.png')): # 确保图片URL有效
img_response = requests.get(image_url, stream=True)
img = Image.open(BytesIO(img_response.content))
ocr_text = pytesseract.image_to_string(img)
if ocr_text.strip():
markdown_content += "```\n" + ocr_text.strip() + "\n```\n\n"
with open(doc_filepath, 'w', encoding='utf-8') as f:
f.write(markdown_content)
def scrape_webpage_to_markdown(url, doc_filepath):
response = requests.get(url)
soup = BeautifulSoup(response.content, 'html.parser')
articles = soup.find_all('article') or soup.find_all('main') or soup.find_all('div', {'class': 'lia-message-body-content'})
if not articles:
return
markdown_content = ''
# scrape the webpage and perform OCR on images
for article in articles:
for child in article.recursiveChildGenerator():
# if this is an image, perform OCR
if child.name == 'img':
img_url = child.get('src')
if not img_url.startswith(('http:', 'https:')):
img_url = '{}{}'.format(url, img_url)
if not img_url.endswith('.svg') and not img_url.endswith('.png'):
continue
if 'neveragain.allstatics.com/2019/assets/icon/logo' in img_url:
continue
img_response = requests.get(img_url, stream=True)
img = Image.open(BytesIO(img_response.content))
ocr_text = pytesseract.image_to_string(img)
if ocr_text.strip():
markdown_content += '\n```plaintext\n{}\n```\n'.format(ocr_text.strip())
continue
# Not an image, so continue recursively calling function
if child.name is None:
continue
html_str = str(child)
markdown_content += md(html_str) + '\n\n'
with open(doc_filepath, 'w', encoding='utf-8') as f:
f.write(markdown_content)
# process a URL and save the file
def process_url(url, doc_id, app):
doc_filepath = f"/content/drive/MyDrive/SourceDoc/{doc_id}_{app}.md"
txt_filepath = f"/content/drive/MyDrive/SourceDoc/{doc_id}_{app}.txt"
doc = Document()
if 'youtube.com' in url or 'youtu.be' in url:
download_youtube_subtitles(url, doc_id)
else:
scrape_and_ocr_forum(url, doc)
doc.save(doc_filepath)
# read csv file and process each row
csv_filepath = './Get_Source_Doc - Sheet1.csv'
if 'youtube.com' in url or 'youtu.be' in url:
download_and_clean_youtube_subtitles(url, txt_filepath)
elif 'superuser.com' in url:
superuser_to_markdown(url, doc_filepath)
elif 'stackoverflow.com' in url:
stack_overflow_to_markdown(url, doc_filepath)
else:
scrape_webpage_to_markdown(url, doc_filepath)
# read the CSV file and process each URL
csv_filepath = './Get_Source_Doc - Sheet1.csv'
with open(csv_filepath, 'r', newline='', encoding='utf-8') as csvfile:
reader = csv.DictReader(csvfile)
cnt = 55
for row in reader:
process_url(row['Source'], row['id'])
if cnt>0:
cnt -= 1
continue
process_url(row['Source'], row['id'], row['InvolvedApp'])
print(row)