Merge remote-tracking branch 'origin/main'

This commit is contained in:
Timothyxxx
2024-01-26 00:15:16 +08:00
5 changed files with 167 additions and 44 deletions

View File

@@ -4,6 +4,7 @@ import os.path
import time
import traceback
import uuid
import tempfile
from typing import Any, Union, Optional
from typing import Dict, List
@@ -431,29 +432,68 @@ class SetupController:
for delete:
query(str): query pattern string to search files or folder in google drive to delete, please refer to
https://developers.google.com/drive/api/guides/search-files?hl=en about how to write query string.
By default, move all files/folders into trash (can be recovered).
trash(bool): whether to delete files permanently or move to trash. By default, trash=True, just move to trash.
TODO: other operations
trash(bool): whether to move the matched files/folders to the trash (True, recoverable) or delete them permanently (False). By default, trash=False, i.e. delete permanently.
for upload:
url(str): remote url to download the file from
path(List[str]): the path in the google drive to store the downloaded file, e.g. ['folder', 'subfolder', 'filename']; missing parent folders are created
"""
settings_file = config.get('settings_file', 'evaluation_examples/settings/googledrive/settings.yml')
gauth = GoogleAuth(settings_file=settings_file)
drive = GoogleDrive(gauth)
def mkdir_in_googledrive(paths: List[str]):
    """Ensure a chain of nested Google Drive folders exists and return the innermost folder id.

    The folders are resolved one level at a time starting from the Drive root:
    an existing, non-trashed folder with the requested title is reused (the
    first match wins if there are several), otherwise a new folder is created
    under the current parent. Uses the `drive` client from the enclosing scope.

    Args:
        paths: folder titles from outermost to innermost, e.g.
            ['folder', 'subfolder']. A single non-list value is treated as a
            one-element path.

    Returns:
        The id of the innermost folder, or 'root' when `paths` is empty.
    """
    def quote(value):
        # Drive query string literals are single-quoted; backslashes and single
        # quotes inside the value must be backslash-escaped, otherwise a title
        # such as "Valentine's Day" produces an invalid query.
        return "'" + str(value).replace('\\', '\\\\').replace("'", "\\'") + "'"

    if not isinstance(paths, list):
        paths = [paths]

    parent_id = 'root'
    for title in paths:
        q = (
            f"{quote(parent_id)} in parents and title = {quote(title)} "
            "and mimeType = 'application/vnd.google-apps.folder' and trashed = false"
        )
        matches = drive.ListFile({'q': q}).GetList()
        if matches:
            # Folder already exists at this level: descend into it.
            parent_id = matches[0]['id']
            continue

        # Not found: create it. Omitting 'parents' places the folder in the root.
        metadata = {'title': title, 'mimeType': 'application/vnd.google-apps.folder'}
        if parent_id != 'root':
            metadata['parents'] = [{'id': parent_id}]
        folder = drive.CreateFile(metadata)
        folder.Upload()
        parent_id = folder['id']
    return parent_id
for oid, operation in enumerate(config['operation']):
if operation == 'delete': # delete a specific file
# query pattern string, by default, remove all files/folders not in the trash to the trash
params = config['args'][oid]
q = params.get('query', 'trashed = false')
trash = params.get('trash', True)
filelist: GoogleDriveFileList = drive.ListFile({'q': q}).GetList()
for file in filelist:
q = params.get('query', '')
trash = params.get('trash', False)
q_file = f"( {q} ) and mimeType != 'application/vnd.google-apps.folder'" if q.strip() else "mimeType != 'application/vnd.google-apps.folder'"
filelist: GoogleDriveFileList = drive.ListFile({'q': q_file}).GetList()
q_folder = f"( {q} ) and mimeType = 'application/vnd.google-apps.folder'" if q.strip() else "mimeType = 'application/vnd.google-apps.folder'"
folderlist: GoogleDriveFileList = drive.ListFile({'q': q_folder}).GetList()
for file in filelist: # first delete file, then folder
file: GoogleDriveFile
# note that, if a folder is trashed/deleted, all files and folders in it will be trashed/deleted
# this is the same for UnTrash
if trash: file.Trash()
else: file.Delete()
for folder in folderlist:
folder: GoogleDriveFile
# note that, if a folder is trashed/deleted, all files and folders in it will be trashed/deleted
if trash: folder.Trash()
else: folder.Delete()
elif operation == 'mkdirs':
params = config['args'][oid]
mkdir_in_googledrive(params['path'])
elif operation == 'upload':
pass
params = config['args'][oid]
url = params['url']
with tempfile.NamedTemporaryFile(mode='wb', delete=False) as tmpf:
response = requests.get(url, stream=True)
response.raise_for_status()
for chunk in response.iter_content(chunk_size=8192):
if chunk:
tmpf.write(chunk)
tmpf.close()
paths = [params['path']] if params['path'] != list else params['path']
parent_id = mkdir_in_googledrive(paths[:-1])
parents = {} if parent_id == 'root' else {'parents': [{'id': parent_id}]}
file = drive.CreateFile({'title': paths[-1], **parents})
file.SetContentFile(tmpf.name)
file.Upload()
return
else:
raise ValueError('[ERROR]: not implemented clean type!')

View File

@@ -1,6 +1,6 @@
from .chrome import get_default_search_engine, get_cookie_data, get_bookmarks, get_open_tabs_info, get_pdf_from_url, \
get_shortcuts_on_desktop, get_history, get_enabled_experiments, get_chrome_language, get_chrome_font_size, \
get_profile_name, get_number_of_search_results, get_googledrive_file
get_profile_name, get_number_of_search_results, get_googledrive_file, get_active_tab_info
from .file import get_cloud_file, get_vm_file, get_cache_file
from .general import get_vm_command_line
from .impress import get_audio_in_slide

View File

@@ -466,44 +466,36 @@ def get_number_of_search_results(env, config: Dict[str, str]):
def get_googledrive_file(env, config: Dict[str, Any]) -> str:
""" Get the desired file from Google Drive based on config, return the downloaded local filepath.
To retrieve the file, we provide two options in config dict:
1. query: a list of queries to search the file, each query is a string that follows the format of Google Drive search query
2. path: a list of path to the file, 'folder/subfolder/filename' -> ['folder', 'subfolder', 'filename']
Return the downloaded filepath locally.
"""
settings_file = config.get('settings_file', 'evaluation_examples/settings/googledrive/settings.json')
auth = GoogleAuth(settings_file=settings_file)
drive = GoogleDrive(auth)
q = config['query']
filelist: GoogleDriveFileList = drive.ListFile({'q': q}).GetList()
if len(filelist) == 0: # target file not found
return None
file: GoogleDriveFile = filelist[0] # HACK: if multiple candidates, just download the first one
_path = os.path.join(env.cache_dir, config['dest'])
try:
file.GetContentFile(_path, mimetype=file.metadata['mimeType'])
except:
logger.info('[ERROR]: Failed to download the file from Google Drive')
return None
return _path
def get_googledrive_file(env, config: Dict[str, Any]) -> str:
""" Get the desired file from Google Drive based on config, return the downloaded local filepath.
"""
settings_file = config.get('settings_file', 'evaluation_examples/settings/googledrive/settings.json')
auth = GoogleAuth(settings_file=settings_file)
drive = GoogleDrive(auth)
q = config['query']
filelist: GoogleDriveFileList = drive.ListFile({'q': q}).GetList()
if len(filelist) == 0: # target file not found
return None
file: GoogleDriveFile = filelist[0] # HACK: if multiple candidates, just download the first one
_path = os.path.join(env.cache_dir, config['dest'])
if 'query' in config:
query = config['query']
if type(query) != list: query = [query]
else:
paths = config['path']
if type(paths) != list: paths = [paths]
query = [f"title = {fp} and mimeType = 'application/vnd.google-apps.folder' and trashed = false" if idx < len(paths) - 1
else f'title = {fp} and trashed = false' for idx, fp in enumerate(paths)]
parent_id = 'root'
try:
file.GetContentFile(_path, mimetype=file.metadata['mimeType'])
for q in query:
search = f'( {q} ) and "{parent_id}" in parents'
filelist: GoogleDriveFileList = drive.ListFile({'q': search}).GetList()
if len(filelist) == 0: # target file not found
return None
file: GoogleDriveFile = filelist[0] # HACK: if multiple candidates, just use the first one
parent_id = file['id']
file.GetContentFile(_path, mimetype=file['mimeType'])
except:
logger.info('[ERROR]: Failed to download the file from Google Drive')
return None