101 lines
3.1 KiB
Python
101 lines
3.1 KiB
Python
import os
|
|
from typing import Dict, List, Set
|
|
from typing import Optional, Any, Union
|
|
|
|
import requests
|
|
|
|
|
|
def get_cloud_file(env, config: Dict[str, Any]) -> Union[str, List[str]]:
    """
    Download one or more files into env.cache_dir, skipping any file whose
    destination already exists there.

    Config:
        path (str|List[str]): the url(s) to download from
        dest (str|List[str]): file name(s) of the downloaded file(s),
          joined onto env.cache_dir
        multi (bool): optional. if path and dest are lists providing
          information of multiple files. defaults to False
        gives (List[int]): optional. defaults to [0]. which files are
          directly returned to the metric. if len==1, str is returned;
          else, list is returned.

    Returns:
        The cache path of the single selected file, or a list of cache paths
        when the number of selected files is not exactly one.

    Raises:
        Whatever requests.get / Response.raise_for_status raise on a failed
        download, and OSError if the cache file cannot be written.
    """
    if not config.get("multi", False):
        paths: List[str] = [config["path"]]
        dests: List[str] = [config["dest"]]
    else:
        paths = config["path"]
        dests = config["dest"]
    cache_paths: List[str] = []

    gives: Set[int] = set(config.get("gives", [0]))

    for i, (url, dest) in enumerate(zip(paths, dests)):
        _path = os.path.join(env.cache_dir, dest)
        if i in gives:
            cache_paths.append(_path)

        # An existing file is treated as a valid cache hit.
        if os.path.exists(_path):
            continue

        # Download to a sibling ".part" file and move it into place only once
        # the transfer has finished. Otherwise an interrupted download would
        # leave a truncated file that the existence check above would accept
        # as a complete cache entry on the next call.
        partial_path = _path + ".part"
        try:
            # The context manager releases the streamed connection even when
            # the status check or the write fails.
            with requests.get(url, stream=True) as response:
                response.raise_for_status()
                with open(partial_path, "wb") as f:
                    for chunk in response.iter_content(chunk_size=8192):
                        if chunk:
                            f.write(chunk)
            os.replace(partial_path, _path)
        finally:
            # After a successful os.replace the partial file is gone, so this
            # only removes leftovers from a failed download.
            if os.path.exists(partial_path):
                os.remove(partial_path)

    return cache_paths[0] if len(cache_paths) == 1 else cache_paths
|
|
|
|
|
|
def get_vm_file(env, config: Dict[str, Any]) -> Union[Optional[str], List[Optional[str]]]:
    """
    Fetch one or more files from the VM via env.controller.get_file and
    store them under env.cache_dir.

    Config:
        path (str|List[str]): path(s) on the VM to fetch
        dest (str|List[str]): file name(s) of the downloaded file(s),
          joined onto env.cache_dir
        multi (bool): optional. if path and dest are lists providing
          information of multiple files. defaults to False
        gives (List[int]): optional. defaults to [0]. which files are
          directly returned to the metric. if len==1, a single value is
          returned; else, list is returned.

    Returns:
        For each selected file, its cache path, or None when the controller
        returned None for it. A single value is returned when exactly one
        file is selected; otherwise a list.
    """
    is_multi = config.get("multi", False)
    vm_paths: List[str] = config["path"] if is_multi else [config["path"]]
    file_names: List[str] = config["dest"] if is_multi else [config["dest"]]

    results: List[Optional[str]] = []
    wanted: Set[int] = set(config.get("gives", [0]))

    for index, (vm_path, file_name) in enumerate(zip(vm_paths, file_names)):
        local_path = os.path.join(env.cache_dir, file_name)
        # Written in binary mode below, so presumably bytes; None means the
        # controller could not provide the file.
        content = env.controller.get_file(vm_path)

        if content is not None:
            with open(local_path, "wb") as out:
                out.write(content)

        # A missing file is reported as None instead of raising, so the
        # caller can decide how to handle it.
        if index in wanted:
            results.append(None if content is None else local_path)

    if len(results) == 1:
        return results[0]
    return results
|
|
|
|
|
|
def get_cache_file(env, config: Dict[str, str]) -> str:
    """
    Return the full path of a file that must already exist in the cache dir.

    Config:
        path (str): relative path in cache dir

    Returns:
        config["path"] joined onto env.cache_dir.

    Raises:
        AssertionError: if no file exists at the resulting path.
    """
    _path = os.path.join(env.cache_dir, config["path"])
    # Raised explicitly rather than via an `assert` statement so the check
    # still runs under `python -O`; the exception type is kept for callers.
    if not os.path.exists(_path):
        raise AssertionError("Cache file does not exist: {:}".format(_path))
    return _path
|