Fix thread lock in AWS, VirtualBox, and VMware (#49)

* Initailize aws support

* Add README for the VM server

* Refactor OSWorld for supporting more cloud services.

* Initialize vmware and aws implementation v1, waiting for verification

* Initlize files for azure, gcp and virtualbox support

* Debug on the VMware provider

* Fix on aws interface mapping

* Fix instance type

* Refactor

* Clean

* Add Azure provider

* hk region; debug

* Fix lock

* Remove print

* Remove key_name requirements when allocating aws vm

* Clean README

* Fix reset

* Fix bugs

* Add VirtualBox and Azure providers

* Add VirtualBox OVF link

* Raise exception on macOS host

* Init RAEDME for VBox

* Update VirtualBox VM download link

* Update requirements and setup.py; Improve robustness on Windows

* Fix network adapter

* Go through on Windows machine

* Add default adapter option

* Fix minor error

* Change resolution before creating snapshot

* Fix small error

* Change default provider option

* Fix thread lock

* Refactor for more smooth VMware support

---------

Co-authored-by: Timothyxxx <384084775@qq.com>
Co-authored-by: XinyuanWangCS <xywang626@gmail.com>
Co-authored-by: Tianbao Xie <47296835+Timothyxxx@users.noreply.github.com>
This commit is contained in:
HappySix
2024-06-20 23:54:50 +08:00
committed by GitHub
parent 16c3defe20
commit df70b11cf6
5 changed files with 171 additions and 63 deletions

View File

@@ -3,7 +3,6 @@ from filelock import FileLock
import boto3
import psutil
import logging
from multiprocessing import Manager
from desktop_env.providers.base import VMManager
@@ -67,9 +66,6 @@ def _allocate_vm(region=DEFAULT_REGION):
class AWSVMManager(VMManager):
manager = Manager()
check_and_clean_event = manager.Event()
def __init__(self, registry_path=REGISTRY_PATH):
self.registry_path = registry_path
self.lock = FileLock(".aws_lck", timeout=60)
@@ -240,24 +236,27 @@ class AWSVMManager(VMManager):
def get_vm_path(self, region=DEFAULT_REGION):
with self.lock:
if not AWSVMManager.check_and_clean_event.is_set():
AWSVMManager.check_and_clean_event.set()
if not AWSVMManager.checked_and_cleaned:
AWSVMManager.checked_and_cleaned = True
self._check_and_clean()
allocation_needed = False
with self.lock:
free_vms_paths = self._list_free_vms(region)
if len(free_vms_paths) == 0:
# No free virtual machine available, generate a new one
if len(free_vms_paths) == 0:
# No free virtual machine available, generate a new one
allocation_needed = True
else:
# Choose the first free virtual machine
chosen_vm_path = free_vms_paths[0][0]
self._occupy_vm(chosen_vm_path, os.getpid(), region)
return chosen_vm_path
if allocation_needed:
logger.info("No free virtual machine available. Generating a new one, which would take a while...☕")
new_vm_path = _allocate_vm(region)
with self.lock:
self._add_vm(new_vm_path, region)
self._occupy_vm(new_vm_path, os.getpid(), region)
return new_vm_path
else:
# Choose the first free virtual machine
chosen_vm_path = free_vms_paths[0][0]
with self.lock:
self._occupy_vm(chosen_vm_path, os.getpid(), region)
return chosen_vm_path

View File

@@ -45,6 +45,7 @@ class Provider(ABC):
class VMManager(ABC):
checked_and_cleaned = False
@abstractmethod
def initialize_registry(self, **kwargs):

View File

@@ -300,7 +300,7 @@ def _install_vm(vm_name, vms_dir, downloaded_file_name, original_vm_name="Ubuntu
class VirtualBoxVMManager(VMManager):
def __init__(self, registry_path=REGISTRY_PATH):
self.registry_path = registry_path
self.lock = FileLock(LOCK_FILE_NAME, timeout=10)
self.lock = FileLock(LOCK_FILE_NAME, timeout=60)
self.initialize_registry()
def initialize_registry(self):
@@ -309,7 +309,14 @@ class VirtualBoxVMManager(VMManager):
with open(self.registry_path, 'w') as file:
file.write('')
def add_vm(self, vm_path, region=None):
def add_vm(self, vm_path, lock_needed=True):
if lock_needed:
with self.lock:
self._add_vm(vm_path)
else:
self._add_vm(vm_path)
def _add_vm(self, vm_path, region=None):
assert region in [None, 'local'], "For VirtualBox provider, the region should be neither None or 'local'."
with self.lock:
with open(self.registry_path, 'r') as file:
@@ -318,7 +325,14 @@ class VirtualBoxVMManager(VMManager):
with open(self.registry_path, 'w') as file:
file.writelines(new_lines)
def occupy_vm(self, vm_path, pid, region=None):
def occupy_vm(self, vm_path, pid, lock_needed=True):
if lock_needed:
with self.lock:
self._occupy_vm(vm_path, pid)
else:
self._occupy_vm(vm_path, pid)
def _occupy_vm(self, vm_path, pid, region=None):
assert region in [None, 'local'], "For VirtualBox provider, the region should be neither None or 'local'."
with self.lock:
new_lines = []
@@ -333,7 +347,24 @@ class VirtualBoxVMManager(VMManager):
with open(self.registry_path, 'w') as file:
file.writelines(new_lines)
def check_and_clean(self, vms_dir):
def delete_vm(self, vm_path, lock_needed=True):
if lock_needed:
with self.lock:
self._delete_vm(vm_path)
else:
self._delete_vm(vm_path)
def _delete_vm(self, vm_path):
raise NotImplementedError
def check_and_clean(self, vms_dir, lock_needed=True):
if lock_needed:
with self.lock:
self._check_and_clean(vms_dir)
else:
self._check_and_clean(vms_dir)
def _check_and_clean(self, vms_dir):
with self.lock: # Lock when cleaning up the registry and vms_dir
# Check and clean on the running vms, detect the released ones and mark then as 'free'
active_pids = {p.pid for p in psutil.process_iter()}
@@ -379,7 +410,14 @@ class VirtualBoxVMManager(VMManager):
if flag:
shutil.rmtree(os.path.join(vms_dir, vm_name))
def list_free_vms(self):
def list_free_vms(self, lock_needed=True):
if lock_needed:
with self.lock:
return self._list_free_vms()
else:
return self._list_free_vms()
def _list_free_vms(self):
with self.lock: # Lock when reading the registry
free_vms = []
with open(self.registry_path, 'r') as file:
@@ -391,20 +429,30 @@ class VirtualBoxVMManager(VMManager):
return free_vms
def get_vm_path(self, region=None):
self.check_and_clean(vms_dir=VMS_DIR)
free_vms_paths = self.list_free_vms()
if len(free_vms_paths) == 0:
# No free virtual machine available, generate a new one
with self.lock:
if not VirtualBoxVMManager.checked_and_cleaned:
VirtualBoxVMManager.checked_and_cleaned = True
self._check_and_clean(vms_dir=VMS_DIR)
allocation_needed = False
with self.lock:
free_vms_paths = self._list_free_vms()
if len(free_vms_paths) == 0:
# No free virtual machine available, generate a new one
allocation_needed = True
else:
# Choose the first free virtual machine
chosen_vm_path = free_vms_paths[0][0]
self._occupy_vm(chosen_vm_path, os.getpid())
return chosen_vm_path
if allocation_needed:
logger.info("No free virtual machine available. Generating a new one, which would take a while...☕")
new_vm_name = generate_new_vm_name(vms_dir=VMS_DIR)
new_vm_path = _install_vm(new_vm_name, vms_dir=VMS_DIR,
downloaded_file_name=DOWNLOADED_FILE_NAME,
bridged_adapter_name=region)
self.add_vm(new_vm_path)
self.occupy_vm(new_vm_path, os.getpid())
downloaded_file_name=DOWNLOADED_FILE_NAME,
bridged_adapter_name=region)
with self.lock:
self._add_vm(new_vm_path)
self._occupy_vm(new_vm_path, os.getpid())
return new_vm_path
else:
# Choose the first free virtual machine
chosen_vm_path = free_vms_paths[0][0]
self.occupy_vm(chosen_vm_path, os.getpid())
return chosen_vm_path

View File

@@ -274,7 +274,7 @@ def _install_vm(vm_name, vms_dir, downloaded_file_name, original_vm_name="Ubuntu
class VMwareVMManager(VMManager):
def __init__(self, registry_path=REGISTRY_PATH):
self.registry_path = registry_path
self.lock = FileLock(LOCK_FILE_NAME, timeout=10)
self.lock = FileLock(LOCK_FILE_NAME, timeout=60)
self.initialize_registry()
def initialize_registry(self):
@@ -283,7 +283,14 @@ class VMwareVMManager(VMManager):
with open(self.registry_path, 'w') as file:
file.write('')
def add_vm(self, vm_path, region=None):
def add_vm(self, vm_path, lock_needed=True):
if lock_needed:
with self.lock:
self._add_vm(vm_path)
else:
self._add_vm(vm_path)
def _add_vm(self, vm_path, region=None):
assert region in [None, 'local'], "For VMware provider, the region should be neither None or 'local'."
with self.lock:
with open(self.registry_path, 'r') as file:
@@ -292,7 +299,14 @@ class VMwareVMManager(VMManager):
with open(self.registry_path, 'w') as file:
file.writelines(new_lines)
def occupy_vm(self, vm_path, pid, region=None):
def occupy_vm(self, vm_path, pid, lock_needed=True):
if lock_needed:
with self.lock:
self._occupy_vm(vm_path, pid)
else:
self._occupy_vm(vm_path, pid)
def _occupy_vm(self, vm_path, pid, region=None):
assert region in [None, 'local'], "For VMware provider, the region should be neither None or 'local'."
with self.lock:
new_lines = []
@@ -307,7 +321,24 @@ class VMwareVMManager(VMManager):
with open(self.registry_path, 'w') as file:
file.writelines(new_lines)
def check_and_clean(self, vms_dir):
def delete_vm(self, vm_path, lock_needed=True):
if lock_needed:
with self.lock:
self._delete_vm(vm_path)
else:
self._delete_vm(vm_path)
def _delete_vm(self, vm_path):
raise NotImplementedError
def check_and_clean(self, vms_dir, lock_needed=True):
if lock_needed:
with self.lock:
self._check_and_clean(vms_dir)
else:
self._check_and_clean(vms_dir)
def _check_and_clean(self, vms_dir):
with self.lock: # Lock when cleaning up the registry and vms_dir
# Check and clean on the running vms, detect the released ones and mark then as 'free'
active_pids = {p.pid for p in psutil.process_iter()}
@@ -353,7 +384,14 @@ class VMwareVMManager(VMManager):
if flag:
shutil.rmtree(os.path.join(vms_dir, vm_name))
def list_free_vms(self):
def list_free_vms(self, lock_needed=True):
if lock_needed:
with self.lock:
return self._list_free_vms()
else:
return self._list_free_vms()
def _list_free_vms(self):
with self.lock: # Lock when reading the registry
free_vms = []
with open(self.registry_path, 'r') as file:
@@ -365,20 +403,29 @@ class VMwareVMManager(VMManager):
return free_vms
def get_vm_path(self, region=None):
assert region in [None, 'local'], "For VMware provider, the region should be neither None or 'local'."
self.check_and_clean(vms_dir=VMS_DIR)
free_vms_paths = self.list_free_vms()
if len(free_vms_paths) == 0:
# No free virtual machine available, generate a new one
with self.lock:
if not VMwareVMManager.checked_and_cleaned:
VMwareVMManager.checked_and_cleaned = True
self._check_and_clean(vms_dir=VMS_DIR)
allocation_needed = False
with self.lock:
free_vms_paths = self._list_free_vms()
if len(free_vms_paths) == 0:
# No free virtual machine available, generate a new one
allocation_needed = True
else:
# Choose the first free virtual machine
chosen_vm_path = free_vms_paths[0][0]
self._occupy_vm(chosen_vm_path, os.getpid())
return chosen_vm_path
if allocation_needed:
logger.info("No free virtual machine available. Generating a new one, which would take a while...☕")
new_vm_name = generate_new_vm_name(vms_dir=VMS_DIR)
new_vm_path = _install_vm(new_vm_name, vms_dir=VMS_DIR,
downloaded_file_name=DOWNLOADED_FILE_NAME)
self.add_vm(new_vm_path)
self.occupy_vm(new_vm_path, os.getpid())
downloaded_file_name=DOWNLOADED_FILE_NAME)
with self.lock:
self._add_vm(new_vm_path)
self._occupy_vm(new_vm_path, os.getpid())
return new_vm_path
else:
# Choose the first free virtual machine
chosen_vm_path = free_vms_paths[0][0]
self.occupy_vm(chosen_vm_path, os.getpid())
return chosen_vm_path

View File

@@ -1,8 +1,9 @@
import logging
import os
import platform
import subprocess
import time
import os
from desktop_env.providers.base import Provider
logger = logging.getLogger("desktopenv.providers.vmware.VMwareProvider")
@@ -28,12 +29,20 @@ def get_vmrun_type(return_list=False):
class VMwareProvider(Provider):
@staticmethod
def _execute_command(command: list):
result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=60, text=True,
encoding="utf-8")
if result.returncode != 0:
raise Exception("\033[91m" + result.stdout + result.stderr + "\033[0m")
return result.stdout.strip()
def _execute_command(command: list, return_output=False):
process = subprocess.Popen(
command,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
encoding="utf-8"
)
if return_output:
output = process.communicate()[0].strip()
return output
else:
return None
def start_emulator(self, path_to_vm: str, headless: bool):
print("Starting VMware VM...")
@@ -51,9 +60,10 @@ class VMwareProvider(Provider):
break
else:
logger.info("Starting VM...")
VMwareProvider._execute_command(["vmrun"] + get_vmrun_type(return_list=True) + ["start", path_to_vm]) if not headless else \
VMwareProvider._execute_command(
["vmrun"] + get_vmrun_type(return_list=True) + ["start", path_to_vm, "nogui"])
_command = ["vmrun"] + get_vmrun_type(return_list=True) + ["start", path_to_vm]
if headless:
_command.append("nogui")
VMwareProvider._execute_command(_command)
time.sleep(WAIT_TIME)
except subprocess.CalledProcessError as e:
@@ -64,7 +74,8 @@ class VMwareProvider(Provider):
while True:
try:
output = VMwareProvider._execute_command(
["vmrun"] + get_vmrun_type(return_list=True) + ["getGuestIPAddress", path_to_vm, "-wait"]
["vmrun"] + get_vmrun_type(return_list=True) + ["getGuestIPAddress", path_to_vm, "-wait"],
return_output=True
)
logger.info(f"VMware VM IP address: {output}")
return output
@@ -75,12 +86,14 @@ class VMwareProvider(Provider):
def save_state(self, path_to_vm: str, snapshot_name: str):
logger.info("Saving VMware VM state...")
VMwareProvider._execute_command(["vmrun"] + get_vmrun_type(return_list=True) + ["snapshot", path_to_vm, snapshot_name])
VMwareProvider._execute_command(
["vmrun"] + get_vmrun_type(return_list=True) + ["snapshot", path_to_vm, snapshot_name])
time.sleep(WAIT_TIME) # Wait for the VM to save
def revert_to_snapshot(self, path_to_vm: str, snapshot_name: str):
logger.info(f"Reverting VMware VM to snapshot: {snapshot_name}...")
VMwareProvider._execute_command(["vmrun"] + get_vmrun_type(return_list=True) + ["revertToSnapshot", path_to_vm, snapshot_name])
VMwareProvider._execute_command(
["vmrun"] + get_vmrun_type(return_list=True) + ["revertToSnapshot", path_to_vm, snapshot_name])
time.sleep(WAIT_TIME) # Wait for the VM to revert
return path_to_vm