"""
Author: Yutang LI
Institution: SIAT-MIC
Contact: yt.li2@siat.ac.cn
"""

import datetime
import logging
import os
import shutil
import tempfile
from io import StringIO

import httpx
import pandas as pd
from bs4 import BeautifulSoup
from PIL import Image
from playwright.async_api import async_playwright

from utils import settings, handle_minio_upload

logger = logging.getLogger(__name__)


async def fetch_oqmd_data(composition: str) -> str:
    """Download the OQMD composition page for ``composition``.

    Args:
        composition: Composition string appended to the OQMD
            ``/materials/composition/`` URL.

    Returns:
        The response body as text.

    Raises:
        httpx.HTTPStatusError: ``raise_for_status()`` rejected the response.
        httpx.TimeoutException: The request exceeded the 30 second timeout.
        httpx.NetworkError: A network-level failure occurred.
        ValueError: The body was empty or shorter than 100 characters.
    """
    url = f"https://www.oqmd.org/materials/composition/{composition}"
    try:
        async with httpx.AsyncClient(timeout=30.0) as client:
            response = await client.get(url)
            response.raise_for_status()

            # Validate the payload: an empty or very short body is treated as
            # an invalid response rather than handed to the HTML parser.
            if not response.text or len(response.text) < 100:
                raise ValueError("Invalid response content from OQMD API")

            return response.text

    except httpx.HTTPStatusError as e:
        logger.error(f"OQMD API request failed: {str(e)}")
        raise
    except httpx.TimeoutException:
        logger.error("OQMD API request timed out")
        raise
    except httpx.NetworkError as e:
        logger.error(f"Network error occurred: {str(e)}")
        raise
    except ValueError as e:
        logger.error(f"Invalid response content: {str(e)}")
        raise


def parse_oqmd_html(html: str) -> tuple[list, str, list]:
    """Extract the summary text, the data table and the chart scripts.

    Args:
        html: HTML text of an OQMD composition page.

    Returns:
        A ``(basic_data, table_data, phase_data)`` tuple:

        * ``basic_data`` - the ``<h1>`` text followed by one string per
          ``<p>`` element; anchors are rendered as Markdown links that are
          prefixed with ``https://www.oqmd.org``.
        * ``table_data`` - the first ``<table>`` rendered as Markdown, or an
          empty string when the page contains no table.
        * ``phase_data`` - one ``{'type', 'content'}`` dict per ``<script>``
          whose inline source contains ``$(function()``.

    Raises:
        AttributeError: The page has no ``<h1>`` element (``soup.find``
            returns ``None`` in that case).
    """
    soup = BeautifulSoup(html, 'html.parser')

    # Basic data: the page heading first, then every paragraph.
    basic_data = [soup.find('h1').text.strip()]
    for paragraph in soup.find_all('p'):
        if not paragraph:
            continue
        parts = []
        for element in paragraph.contents:
            # Only anchors that actually carry an href become links; an
            # <a> without one is emitted as plain text instead of raising
            # KeyError.
            href = element.get('href') if element.name == 'a' else None
            if href is not None:
                url = "https://www.oqmd.org" + href
                parts.append(f"[{element.text.strip()}]({url})")
            else:
                parts.append(element.text.strip())
        basic_data.append(" ".join(parts).strip())

    # Table data: default to an empty string so that a page without a table
    # does not leave ``table_data`` unbound at the return statement.
    table_data = ""
    table = soup.find('table')
    if table:
        df = pd.read_html(StringIO(str(table)))[0]
        df = df.fillna('')
        df = df.replace([float('inf'), float('-inf')], '')
        table_data = df.to_markdown(index=False)

    # JavaScript data: keep only the inline jQuery-ready scripts.
    phase_data = [
        {
            'type': script.get('type', 'text/javascript'),
            'content': script.string.strip(),
        }
        for script in soup.find_all('script')
        if script.string and '$(function()' in script.string
    ]

    return basic_data, table_data, phase_data


async def _screenshot_element(page, selector: str, path: str, padding: int = 40) -> None:
    """Save a clipped screenshot of the element matching ``selector`` to ``path``.

    The clip starts at the element's top-left corner and extends ``padding``
    pixels beyond its width and height.

    Raises:
        RuntimeError: The element has no bounding box (Playwright returns
            ``None`` when the element is not visible).
    """
    box = await page.locator(selector).bounding_box()
    if box is None:
        raise RuntimeError(f"Chart element {selector} is not visible; cannot take screenshot")
    await page.screenshot(
        path=path,
        clip={
            'x': box['x'],
            'y': box['y'],
            'width': box['width'] + padding,
            'height': box['height'] + padding,
        },
    )


def _stitch_side_by_side(left_path: str, right_path: str, output_path: str) -> None:
    """Paste two images next to each other on an RGB canvas and save the result."""
    # The context managers release the file handles so the inputs can be
    # deleted afterwards.
    with Image.open(left_path) as left, Image.open(right_path) as right:
        canvas = Image.new(
            'RGB',
            (left.width + right.width, max(left.height, right.height)),
        )
        canvas.paste(left, (0, 0))
        canvas.paste(right, (left.width, 0))
        canvas.save(output_path)


async def render_and_save_charts(script_data: list) -> str:
    """Render the chart scripts in headless Chromium and upload a combined image.

    The ``#placeholder`` and ``#phasediagram`` elements are captured
    separately, stitched side by side (phase diagram on the left) and the
    result is passed to ``handle_minio_upload``.

    Args:
        script_data: Script dicts as produced by ``parse_oqmd_html``; the
            ``'content'`` of the first two entries is used.

    Returns:
        The value returned by ``handle_minio_upload`` for the combined image.

    Raises:
        IndexError: ``script_data`` holds fewer than two entries.
        RuntimeError: A chart element is not visible, or the images could
            not be combined.
        Exception: Any other rendering or upload failure is logged and
            re-raised unchanged.
    """
    work_dir = None
    composite_path = None
    try:
        # Build the HTML document that hosts the JavaScript.
        # NOTE(review): the template text visible here contains no
        # ``{placeholder_content}`` / ``{phasediagram_content}`` fields and no
        # markup for the ``#placeholder`` / ``#phasediagram`` elements that are
        # captured below - confirm the template is intact.
        html_content = """ Phase Diagram
"""
        # Resolved before the browser is launched so that malformed input
        # fails without starting Chromium.
        html_content = html_content.format(
            placeholder_content=script_data[0]['content'],
            phasediagram_content=script_data[1]['content'])

        # Screenshots go into a private directory so that concurrent calls
        # cannot overwrite each other's intermediate files.
        work_dir = tempfile.mkdtemp(prefix="oqmd_charts_")
        placeholder_path = os.path.join(work_dir, "placeholder.png")
        phasediagram_path = os.path.join(work_dir, "phasediagram.png")

        async with async_playwright() as p:
            browser = await p.chromium.launch(headless=True)
            try:
                page = await browser.new_page()
                await page.set_content(html_content)
                # Give the scripts time to draw before capturing.
                await page.wait_for_timeout(5000)

                # Capture the two charts separately.
                await _screenshot_element(page, '#placeholder', placeholder_path)
                await _screenshot_element(page, '#phasediagram', phasediagram_path)
            finally:
                # Close the browser exactly once, while Playwright is still
                # running; a failure here must not mask the original error.
                try:
                    await browser.close()
                except Exception as e:
                    logger.warning(f"Failed to close browser: {str(e)}")

        timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
        file_name = f"oqmd_phase_diagram_{timestamp}.png"
        # Registered before writing so a partially written file is removed too.
        composite_path = file_name

        # Stitch the images together.
        try:
            _stitch_side_by_side(phasediagram_path, placeholder_path, file_name)
        except Exception as e:
            logger.error(f"Failed to process images: {str(e)}")
            raise RuntimeError(f"Image processing failed: {str(e)}") from e

        # Upload to MinIO.
        url = handle_minio_upload(file_name, file_name)
        return url

    except Exception as e:
        logger.error(f"Failed to render and save charts: {str(e)}")
        raise
    finally:
        # Clean up temporary files.
        if work_dir is not None:
            shutil.rmtree(work_dir, ignore_errors=True)
        if composite_path is not None:
            try:
                if os.path.exists(composite_path):
                    os.remove(composite_path)
            except Exception as e:
                logger.warning(f"Failed to remove temporary file {composite_path}: {str(e)}")