- Added logging for file retrieval and error handling in file.py, improving robustness during file operations. - Implemented checks for file existence and parsing errors in general.py, enhancing reliability in JSON/YAML processing. - Improved table comparison logic in table.py with detailed error logging for sheet loading and cell value reading. - Enhanced metrics evaluation in slides.py with additional checks for paragraph and run counts, ensuring thorough comparison. - Updated utils.py to include file existence checks and detailed error logging during cell value reading.
155 lines · 5.3 KiB · Python
import os
|
|
import logging
|
|
from typing import Dict, List, Set
|
|
from typing import Optional, Any, Union
|
|
from datetime import datetime
|
|
import requests
|
|
import pandas as pd
|
|
|
|
logger = logging.getLogger("desktopenv.getter.file")
|
|
|
|
|
|
def get_content_from_vm_file(env, config: Dict[str, Any]) -> Any:
    """
    Fetch a file from the VM and return its parsed content.

    Config:
        path (str): absolute path on the VM to fetch
        file_type (str): format of the file; only 'xlsx' is supported
        file_content (str): which part to extract; only 'last_row' is supported

    Returns:
        For 'xlsx'/'last_row': the last row of the first sheet as a list of
        strings.

    Raises:
        FileNotFoundError: if the file could not be retrieved from the VM.
        NotImplementedError: for an unsupported file_type or file_content.
    """
    path = config["path"]
    file_path = get_vm_file(env, {"path": path, "dest": os.path.basename(path)})
    # get_vm_file returns None when the VM fetch or the local write failed;
    # fail loudly instead of handing None to pandas.
    if file_path is None:
        raise FileNotFoundError(f"Could not retrieve file from VM: {path}")

    file_type, file_content = config['file_type'], config['file_content']
    if file_type == 'xlsx':
        if file_content == 'last_row':
            df = pd.read_excel(file_path)
            last_row = df.iloc[-1]
            # Stringify so the metric compares uniform types.
            return last_row.astype(str).tolist()
        # Previously this raised with a misleading "file type" message.
        raise NotImplementedError(f"File content {file_content} not supported")
    # Previously an unsupported file_type silently returned None.
    raise NotImplementedError(f"File type {file_type} not supported")
|
|
|
|
|
|
def get_cloud_file(env, config: Dict[str, Any]) -> Union[str, List[str]]:
    """
    Download one or more files into the environment cache directory.

    Config:
        path (str|List[str]): the url to download from
        dest (str|List[str])): file name of the downloaded file
        multi (bool) : optional. if path and dest are lists providing
            information of multiple files. defaults to False
        gives (List[int]): optional. defaults to [0]. which files are directly
            returned to the metric. if len==1, str is returned; else, list is
            returned.

    Returns:
        The cache path(s) of the files selected by `gives`; a single str when
        exactly one path is selected, otherwise a list.

    Raises:
        requests.HTTPError: if any download responds with an error status.
    """
    if not config.get("multi", False):
        paths: List[str] = [config["path"]]
        dests: List[str] = [config["dest"]]
    else:
        paths: List[str] = config["path"]
        dests: List[str] = config["dest"]

    cache_paths: List[str] = []
    gives: Set[int] = set(config.get("gives", [0]))

    for i, (url, dest) in enumerate(zip(paths, dests)):
        _path = os.path.join(env.cache_dir, dest)
        if i in gives:
            cache_paths.append(_path)

        # Already downloaded on a previous run: reuse the cached copy.
        if os.path.exists(_path):
            continue

        # Stream the body in chunks; the context manager releases the HTTP
        # connection even if a local write fails (the bare response object
        # previously leaked the connection on streamed downloads).
        with requests.get(url, stream=True) as response:
            response.raise_for_status()
            with open(_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:
                        f.write(chunk)

    return cache_paths[0] if len(cache_paths) == 1 else cache_paths
|
|
|
|
|
|
def get_vm_file(env, config: Dict[str, Any]) -> Union[Optional[str], List[Optional[str]]]:
    """
    Fetch one or more files from the VM into the local cache directory.

    Config:
        path (str): absolute path on the VM to fetch
        dest (str): file name of the downloaded file
        multi (bool) : optional. if path and dest are lists providing
            information of multiple files. defaults to False
        gives (List[int]): optional. defaults to [0]. which files are directly
            returned to the metric. if len==1, str is returned; else, list is
            returned.
        only support for single file now:
        time_suffix(bool): optional. defaults to False. if True, append the current time in required format.
        time_format(str): optional. defaults to "%Y%m%d_%H%M%S". format of the time suffix.

    Returns:
        The cache path(s) selected by `gives`, with None substituted for any
        file that failed to fetch or write. A single value when exactly one
        path was selected, otherwise a list.
    """
    time_format = "%Y%m%d_%H%M%S"
    if not config.get("multi", False):
        paths: List[str] = [config["path"]]
        dests: List[str] = [config["dest"]]
        # time_suffix only applies in single-file mode (see docstring).
        if config.get("time_suffix", False):
            time_format = config.get("time_format", time_format)
            # Insert time before file extension.
            dests = [f"{os.path.splitext(d)[0]}_{datetime.now().strftime(time_format)}{os.path.splitext(d)[1]}" for d in dests]
    else:
        paths: List[str] = config["path"]
        dests: List[str] = config["dest"]

    # Local cache paths (or None placeholders) for the indices in `gives`.
    cache_paths: List[str] = []

    gives: Set[int] = set(config.get("gives", [0]))

    for i, (p, d) in enumerate(zip(paths, dests)):
        _path = os.path.join(env.cache_dir, d)

        try:
            # Try to get file from VM
            file = env.controller.get_file(p)
            if file is None:
                # Fetch failed: record None for this slot (if requested)
                # and move on to the next file rather than aborting.
                logger.warning(f"Failed to get file from VM: {p}")
                if i in gives:
                    cache_paths.append(None)
                continue

            # Record the destination path optimistically; the write-failure
            # handlers below back-patch this entry to None if saving fails.
            if i in gives:
                cache_paths.append(_path)

            # Write file with robust error handling
            try:
                # Ensure cache directory exists
                os.makedirs(env.cache_dir, exist_ok=True)

                with open(_path, "wb") as f:
                    f.write(file)
                logger.info(f"Successfully saved file: {_path} ({len(file)} bytes)")

            except IOError as e:
                logger.error(f"IO error writing file {_path}: {e}")
                if i in gives:
                    cache_paths[-1] = None  # Replace the path we just added with None
            except Exception as e:
                logger.error(f"Unexpected error writing file {_path}: {e}")
                if i in gives:
                    cache_paths[-1] = None

        except Exception as e:
            # Covers failures inside env.controller.get_file itself; nothing
            # was appended for this index yet, so append None directly.
            logger.error(f"Error processing file {p}: {e}")
            if i in gives:
                cache_paths.append(None)

    return cache_paths[0] if len(cache_paths)==1 else cache_paths
|
|
|
|
|
|
def get_cache_file(env, config: Dict[str, str]) -> str:
    """
    Resolve a path relative to the environment cache directory.

    Config:
        path (str): relative path in cache dir

    Returns:
        The absolute path of the cached file.

    Raises:
        FileNotFoundError: if the file does not exist in the cache dir.
    """
    _path = os.path.join(env.cache_dir, config["path"])
    # Raise explicitly instead of `assert`: asserts are stripped under
    # `python -O`, which would silently return a non-existent path.
    if not os.path.exists(_path):
        raise FileNotFoundError(f"Cached file not found: {_path}")
    return _path
|