add multi-app example, fix googledrive functions
This commit is contained in:
@@ -4,6 +4,7 @@ import os.path
|
||||
import time
|
||||
import traceback
|
||||
import uuid
|
||||
import tempfile
|
||||
from typing import Any, Union, Optional
|
||||
from typing import Dict, List
|
||||
|
||||
@@ -431,29 +432,68 @@ class SetupController:
|
||||
for delete:
|
||||
query(str): query pattern string to search files or folder in google drive to delete, please refer to
|
||||
https://developers.google.com/drive/api/guides/search-files?hl=en about how to write query string.
|
||||
By default, move all files/folders into trash (can be recovered).
|
||||
trash(bool): whether to delete files permanently or move to trash. By default, trash=True, just move to trash.
|
||||
TODO: other operations
|
||||
trash(bool): whether to delete files permanently or move to trash. By default, trash=false, completely delete it.
|
||||
for upload:
|
||||
path(str): remote url to download file
|
||||
dest(List[str]): the path in the google drive to store the downloaded file
|
||||
"""
|
||||
settings_file = config.get('settings_file', 'evaluation_examples/settings/googledrive/settings.yml')
|
||||
gauth = GoogleAuth(settings_file=settings_file)
|
||||
drive = GoogleDrive(gauth)
|
||||
|
||||
def mkdir_in_googledrive(paths: List[str]):
|
||||
paths = [paths] if type(paths) != list else paths
|
||||
parent_id = 'root'
|
||||
for p in paths:
|
||||
q = f'"{parent_id}" in parents and title = "{p}" and mimeType = "application/vnd.google-apps.folder" and trashed = false'
|
||||
folder = drive.ListFile({'q': q}).GetList()
|
||||
if len(folder) == 0: # not exists, create it
|
||||
parents = {} if parent_id == 'root' else {'parents': [{'id': parent_id}]}
|
||||
file = drive.CreateFile({'title': p, 'mimeType':'application/vnd.google-apps.folder', **parents})
|
||||
file.Upload()
|
||||
parent_id = file['id']
|
||||
else: parent_id = folder[0]['id']
|
||||
return parent_id
|
||||
|
||||
for oid, operation in enumerate(config['operation']):
|
||||
if operation == 'delete': # delete a specific file
|
||||
# query pattern string, by default, remove all files/folders not in the trash to the trash
|
||||
params = config['args'][oid]
|
||||
q = params.get('query', 'trashed = false')
|
||||
trash = params.get('trash', True)
|
||||
filelist: GoogleDriveFileList = drive.ListFile({'q': q}).GetList()
|
||||
for file in filelist:
|
||||
q = params.get('query', '')
|
||||
trash = params.get('trash', False)
|
||||
q_file = f"( {q} ) and mimeType != 'application/vnd.google-apps.folder'" if q.strip() else "mimeType != 'application/vnd.google-apps.folder'"
|
||||
filelist: GoogleDriveFileList = drive.ListFile({'q': q_file}).GetList()
|
||||
q_folder = f"( {q} ) and mimeType = 'application/vnd.google-apps.folder'" if q.strip() else "mimeType = 'application/vnd.google-apps.folder'"
|
||||
folderlist: GoogleDriveFileList = drive.ListFile({'q': q_folder}).GetList()
|
||||
for file in filelist: # first delete file, then folder
|
||||
file: GoogleDriveFile
|
||||
# note that, if a folder is trashed/deleted, all files and folders in it will be trashed/deleted
|
||||
# this is the same for UnTrash
|
||||
if trash: file.Trash()
|
||||
else: file.Delete()
|
||||
for folder in folderlist:
|
||||
folder: GoogleDriveFile
|
||||
# note that, if a folder is trashed/deleted, all files and folders in it will be trashed/deleted
|
||||
if trash: folder.Trash()
|
||||
else: folder.Delete()
|
||||
elif operation == 'mkdirs':
|
||||
params = config['args'][oid]
|
||||
mkdir_in_googledrive(params['path'])
|
||||
elif operation == 'upload':
|
||||
pass
|
||||
params = config['args'][oid]
|
||||
url = params['url']
|
||||
with tempfile.NamedTemporaryFile(mode='wb', delete=False) as tmpf:
|
||||
response = requests.get(url, stream=True)
|
||||
response.raise_for_status()
|
||||
for chunk in response.iter_content(chunk_size=8192):
|
||||
if chunk:
|
||||
tmpf.write(chunk)
|
||||
tmpf.close()
|
||||
paths = [params['path']] if params['path'] != list else params['path']
|
||||
parent_id = mkdir_in_googledrive(paths[:-1])
|
||||
parents = {} if parent_id == 'root' else {'parents': [{'id': parent_id}]}
|
||||
file = drive.CreateFile({'title': paths[-1], **parents})
|
||||
file.SetContentFile(tmpf.name)
|
||||
file.Upload()
|
||||
return
|
||||
else:
|
||||
raise ValueError('[ERROR]: not implemented clean type!')
|
||||
|
||||
|
||||
Reference in New Issue
Block a user