Files
mars_toolkit/services/oqmd_service.py
2025-01-06 14:54:41 +08:00

192 lines
6.7 KiB
Python

"""
Author: Yutang LI
Institution: SIAT-MIC
Contact: yt.li2@siat.ac.cn
"""
import datetime
import logging
import os
import httpx
import pandas as pd
from bs4 import BeautifulSoup
from PIL import Image
from playwright.async_api import async_playwright
from io import StringIO
from utils import settings, handle_minio_upload
# Module-level logger named after this module (``__name__``); the functions
# below use it to report request, parsing and rendering failures.
logger = logging.getLogger(__name__)
async def fetch_oqmd_data(composition: str) -> str:
    """Download the OQMD composition page for ``composition``.

    Args:
        composition: Composition identifier interpolated verbatim into the
            ``/materials/composition/`` URL path.

    Returns:
        The response body as text.

    Raises:
        httpx.HTTPStatusError: The server answered with an error status.
        httpx.TimeoutException: The request exceeded the 30 second timeout.
        httpx.NetworkError: A network-level failure occurred.
        ValueError: The body is empty or shorter than 100 characters.

    Each of the errors above is logged and then re-raised unchanged.
    """
    page_url = f"https://www.oqmd.org/materials/composition/{composition}"
    try:
        async with httpx.AsyncClient(timeout=30.0) as session:
            reply = await session.get(page_url)
            reply.raise_for_status()
            body = reply.text
            # Accept only non-empty bodies of at least 100 characters.
            if body and len(body) >= 100:
                return body
            raise ValueError("Invalid response content from OQMD API")
    except httpx.HTTPStatusError as err:
        logger.error(f"OQMD API request failed: {str(err)}")
        raise
    except httpx.TimeoutException:
        logger.error("OQMD API request timed out")
        raise
    except httpx.NetworkError as err:
        logger.error(f"Network error occurred: {str(err)}")
        raise
    except ValueError as err:
        logger.error(f"Invalid response content: {str(err)}")
        raise
def parse_oqmd_html(html: str) -> tuple[list, str, list]:
    """Extract summary text, the first table and chart scripts from OQMD HTML.

    Args:
        html: HTML source of an OQMD page.

    Returns:
        A ``(basic_data, table_data, phase_data)`` tuple:

        * ``basic_data`` -- list of strings. The first entry is the stripped
          ``<h1>`` text (an empty string when the page has no ``<h1>``),
          followed by one entry per ``<p>`` element in which ``<a href=...>``
          children are rendered as Markdown links ``[text](url)`` with the
          ``https://www.oqmd.org`` prefix prepended to the ``href``.
        * ``table_data`` -- the first ``<table>`` rendered as a Markdown table
          (NaN and +/-inf cells blanked), or ``""`` when there is no table.
        * ``phase_data`` -- one ``{'type': ..., 'content': ...}`` dict for every
          ``<script>`` whose inline source contains ``$(function()``.
    """
    soup = BeautifulSoup(html, 'html.parser')

    # --- Basic data: heading followed by the paragraphs -------------------
    # find() returns None when nothing matches; keep the first slot so the
    # positions of the remaining entries do not shift.
    heading = soup.find('h1')
    basic_data = [heading.text.strip() if heading is not None else ""]

    for paragraph in soup.find_all('p'):
        pieces = []
        for node in paragraph.contents:
            label = node.text.strip()
            # Anchors without an href cannot form a link; emit their text only.
            href = node.get('href') if node.name == 'a' else None
            if href is not None:
                link = "https://www.oqmd.org" + href
                pieces.append(f"[{label}]({link})")
            else:
                pieces.append(label)
        basic_data.append(" ".join(pieces).strip())

    # --- Table data --------------------------------------------------------
    # Default to an empty string so the return statement never references an
    # unbound name when the page has no table.
    table_data = ""
    table = soup.find('table')
    if table is not None:
        df = pd.read_html(StringIO(str(table)))[0]
        df = df.fillna('').replace([float('inf'), float('-inf')], '')
        table_data = df.to_markdown(index=False)

    # --- Inline JavaScript that draws the charts ---------------------------
    phase_data = [
        {
            'type': script.get('type', 'text/javascript'),
            'content': script.string.strip(),
        }
        for script in soup.find_all('script')
        if script.string and '$(function()' in script.string
    ]

    return basic_data, table_data, phase_data
# HTML page that hosts the two chart containers; the two ``{...}`` fields are
# filled with the JavaScript snippets that draw into them.
_CHART_PAGE_TEMPLATE = """
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<script src="https://cdnjs.cloudflare.com/ajax/libs/jquery/3.6.0/jquery.min.js"></script>
<script src="https://cdn.jsdelivr.net/npm/jquery.flot@0.8.3/jquery.flot.js"></script>
<title>Phase Diagram</title>
</head>
<body>
<div class="diagram">
<div id="placeholder" width="200" height="400" style="direction: ltr; position: absolute; left: 550px; top: 0px; width: 200px; height: 400px;"></div>
<script>
{placeholder_content}
</script>
<div id="phasediagram" width="500" height="400" style="direction: ltr; position: absolute; left: 0px; top: 0px; width: 500px; height: 400px;"></div>
<script>
{phasediagram_content}
</script>
</div>
</body>
</html>
"""


async def _screenshot_element(page, selector: str, path: str, padding: int = 40) -> None:
    """Save a screenshot of the element matching ``selector`` to ``path``.

    The clip rectangle is the element's bounding box enlarged by ``padding``
    pixels in width and height.
    """
    box = await page.locator(selector).bounding_box()
    if box is None:
        # bounding_box() yields None for elements that are not visible.
        raise RuntimeError(f"Chart element {selector} has no bounding box")
    await page.screenshot(
        path=path,
        clip={
            'x': box['x'],
            'y': box['y'],
            'width': box['width'] + padding,
            'height': box['height'] + padding,
        },
    )


def _stitch_side_by_side(left_path: str, right_path: str, output_path: str) -> None:
    """Paste two images next to each other (top-aligned) and save the result."""
    # Context managers release the underlying file handles so the inputs can
    # be deleted afterwards.
    with Image.open(left_path) as left, Image.open(right_path) as right:
        canvas = Image.new(
            'RGB',
            (left.width + right.width, max(left.height, right.height)),
        )
        canvas.paste(left, (0, 0))
        canvas.paste(right, (left.width, 0))
        canvas.save(output_path)


async def render_and_save_charts(script_data: list) -> str:
    """Render the two OQMD chart scripts, stitch the screenshots and upload.

    Args:
        script_data: List of dicts with a ``'content'`` key holding inline
            JavaScript. Entry 0 is injected after the ``#placeholder``
            container and entry 1 after the ``#phasediagram`` container.

    Returns:
        Whatever ``handle_minio_upload`` returns for the combined image
        (presumably the object's URL -- confirm against ``utils``).

    Raises:
        IndexError: ``script_data`` has fewer than two entries.
        RuntimeError: A chart element has no bounding box, or the screenshots
            could not be combined into one image.
        Exception: Any other error from Playwright or the upload; every
            failure is logged and re-raised.
    """
    import tempfile  # local import: only this function needs it

    combined_file = None
    try:
        if len(script_data) < 2:
            raise IndexError(
                f"Expected at least 2 chart scripts, got {len(script_data)}"
            )
        page_html = _CHART_PAGE_TEMPLATE.format(
            placeholder_content=script_data[0]['content'],
            phasediagram_content=script_data[1]['content'],
        )

        # Intermediate screenshots live in a per-call directory so concurrent
        # calls cannot overwrite each other's files, and so they are removed
        # even when a later step fails.
        with tempfile.TemporaryDirectory(prefix="oqmd_charts_") as workdir:
            placeholder_png = os.path.join(workdir, "placeholder.png")
            phasediagram_png = os.path.join(workdir, "phasediagram.png")

            async with async_playwright() as p:
                browser = await p.chromium.launch(headless=True)
                try:
                    page = await browser.new_page()
                    await page.set_content(page_html)
                    # Fixed delay giving the injected scripts time to draw.
                    await page.wait_for_timeout(5000)
                    await _screenshot_element(page, '#placeholder', placeholder_png)
                    await _screenshot_element(page, '#phasediagram', phasediagram_png)
                finally:
                    # Close while the Playwright connection is still alive;
                    # a failure here must not mask the primary error.
                    try:
                        await browser.close()
                    except Exception as e:
                        logger.warning(f"Failed to close browser: {str(e)}")

            # Microseconds keep names distinct for calls within one second.
            timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S%f")
            combined_file = f"oqmd_phase_diagram_{timestamp}.png"
            try:
                # Phase diagram on the left, placeholder chart on the right.
                _stitch_side_by_side(phasediagram_png, placeholder_png, combined_file)
            except Exception as e:
                logger.error(f"Failed to process images: {str(e)}")
                raise RuntimeError(f"Image processing failed: {str(e)}") from e

        # Upload to MinIO. The same name is passed for both arguments
        # (presumably local path and object name -- confirm against utils).
        return handle_minio_upload(combined_file, combined_file)
    except Exception as e:
        logger.error(f"Failed to render and save charts: {str(e)}")
        raise
    finally:
        # Remove the combined image from the working directory (best effort).
        if combined_file is not None:
            try:
                os.remove(combined_file)
            except FileNotFoundError:
                pass
            except OSError as e:
                logger.warning(
                    f"Failed to remove temporary file {combined_file}: {str(e)}"
                )