From 9c0cbebf9a7fdf60f1945e219c1dc46abf537a4c Mon Sep 17 00:00:00 2001 From: adlsdztony Date: Sun, 1 Jun 2025 08:31:47 +0000 Subject: [PATCH] refactor: simplify AWS VM management by removing unused methods and improving logging --- desktop_env/providers/aws/manager.py | 189 +++----------------------- desktop_env/providers/aws/provider.py | 145 +------------------- evaluation_examples/test_small.json | 29 +++- 3 files changed, 52 insertions(+), 311 deletions(-) diff --git a/desktop_env/providers/aws/manager.py b/desktop_env/providers/aws/manager.py index 6e8d6de..618b8af 100644 --- a/desktop_env/providers/aws/manager.py +++ b/desktop_env/providers/aws/manager.py @@ -66,198 +66,51 @@ def _allocate_vm(region=DEFAULT_REGION): class AWSVMManager(VMManager): + """ + AWS VM Manager for managing virtual machines on AWS. + + AWS does not need to maintain a registry of VMs, as it can dynamically allocate and deallocate VMs. + This class remains the interface of VMManager for compatibility with other components. + """ def __init__(self, registry_path=REGISTRY_PATH): self.registry_path = registry_path - self.lock = FileLock(".aws_lck", timeout=60) + # self.lock = FileLock(".aws_lck", timeout=60) self.initialize_registry() def initialize_registry(self, **kwargs): - with self.lock: # Locking during initialization - if not os.path.exists(self.registry_path): - with open(self.registry_path, 'w') as file: - file.write('') + pass def add_vm(self, vm_path, region=DEFAULT_REGION, lock_needed=True, **kwargs): - if lock_needed: - with self.lock: - self._add_vm(vm_path, region) - else: - self._add_vm(vm_path, region) + pass def _add_vm(self, vm_path, region=DEFAULT_REGION): - with open(self.registry_path, 'r') as file: - lines = file.readlines() - vm_path_at_vm_region = "{}@{}".format(vm_path, region) - new_lines = lines + [f'{vm_path_at_vm_region}|free\n'] - with open(self.registry_path, 'w') as file: - file.writelines(new_lines) + pass def delete_vm(self, vm_path, region=DEFAULT_REGION, lock_needed=True, **kwargs): - if lock_needed: - with self.lock: - self._delete_vm(vm_path, region) - else: - self._delete_vm(vm_path, region) + pass def _delete_vm(self, vm_path, region=DEFAULT_REGION): - new_lines = [] - with open(self.registry_path, 'r') as file: - lines = file.readlines() - for line in lines: - vm_path_at_vm_region, pid_str = line.strip().split('|') - if vm_path_at_vm_region == "{}@{}".format(vm_path, region): - continue - else: - new_lines.append(line) - with open(self.registry_path, 'w') as file: - file.writelines(new_lines) + pass def occupy_vm(self, vm_path, pid, region=DEFAULT_REGION, lock_needed=True, **kwargs): - if lock_needed: - with self.lock: - self._occupy_vm(vm_path, pid, region) - else: - self._occupy_vm(vm_path, pid, region) + pass def _occupy_vm(self, vm_path, pid, region=DEFAULT_REGION): - new_lines = [] - with open(self.registry_path, 'r') as file: - lines = file.readlines() - for line in lines: - registered_vm_path, _ = line.strip().split('|') - if registered_vm_path == "{}@{}".format(vm_path, region): - new_lines.append(f'{registered_vm_path}|{pid}\n') - else: - new_lines.append(line) - with open(self.registry_path, 'w') as file: - file.writelines(new_lines) + pass def check_and_clean(self, lock_needed=True, **kwargs): - if lock_needed: - with self.lock: - self._check_and_clean() - else: - self._check_and_clean() + pass def _check_and_clean(self): - # Get active PIDs - active_pids = {p.pid for p in psutil.process_iter()} - - new_lines = [] - vm_path_at_vm_regions = {} - - with open(self.registry_path, 'r') as file: - lines = file.readlines() - - # Collect all VM paths and their regions - for line in lines: - vm_path_at_vm_region, pid_str = line.strip().split('|') - vm_path, vm_region = vm_path_at_vm_region.split("@") - if vm_region not in vm_path_at_vm_regions: - vm_path_at_vm_regions[vm_region] = [] - vm_path_at_vm_regions[vm_region].append((vm_path_at_vm_region, pid_str)) - - # Process each region - for region, vm_info_list in vm_path_at_vm_regions.items(): - ec2_client = boto3.client('ec2', region_name=region) - instance_ids = [vm_info[0].split('@')[0] for vm_info in vm_info_list] - - # Batch describe instances - try: - response = ec2_client.describe_instances(InstanceIds=instance_ids) - reservations = response.get('Reservations', []) - - terminated_ids = set() - stopped_ids = set() - active_ids = set() - - # Collect states of all instances - for reservation in reservations: - for instance in reservation.get('Instances', []): - instance_id = instance.get('InstanceId') - instance_state = instance['State']['Name'] - if instance_state in ['terminated', 'shutting-down']: - terminated_ids.add(instance_id) - elif instance_state == 'stopped': - stopped_ids.add(instance_id) - else: - active_ids.add(instance_id) - - # Write results back to file - for vm_path_at_vm_region, pid_str in vm_info_list: - vm_path = vm_path_at_vm_region.split('@')[0] - - if vm_path in terminated_ids: - logger.info(f"VM {vm_path} not found or terminated, releasing it.") - continue - elif vm_path in stopped_ids: - logger.info(f"VM {vm_path} stopped, mark it as free") - new_lines.append(f'{vm_path}@{region}|free\n') - continue - - if pid_str == "free": - new_lines.append(f'{vm_path}@{region}|{pid_str}\n') - elif int(pid_str) in active_pids: - new_lines.append(f'{vm_path}@{region}|{pid_str}\n') - else: - new_lines.append(f'{vm_path}@{region}|free\n') - - except ec2_client.exceptions.ClientError as e: - if 'InvalidInstanceID.NotFound' in str(e): - logger.info(f"VM not found, releasing instances in region {region}.") - continue - - # Writing updated lines back to the registry file - with open(self.registry_path, 'w') as file: - file.writelines(new_lines) - - # We won't check and clean on the files on aws and delete the unregistered ones - # Since this can lead to unexpected delete on other server - # PLease do monitor the instances to avoid additional cost + pass def list_free_vms(self, region=DEFAULT_REGION, lock_needed=True, **kwargs): - if lock_needed: - with self.lock: - return self._list_free_vms(region) - else: - return self._list_free_vms(region) + pass def _list_free_vms(self, region=DEFAULT_REGION): - free_vms = [] - with open(self.registry_path, 'r') as file: - lines = file.readlines() - for line in lines: - vm_path_at_vm_region, pid_str = line.strip().split('|') - vm_path, vm_region = vm_path_at_vm_region.split("@") - if pid_str == "free" and vm_region == region: - free_vms.append((vm_path, pid_str)) - - return free_vms + pass def get_vm_path(self, region=DEFAULT_REGION, **kwargs): - with self.lock: - if not AWSVMManager.checked_and_cleaned: - AWSVMManager.checked_and_cleaned = True - self._check_and_clean() - - # allocation_needed = False - # with self.lock: - # free_vms_paths = self._list_free_vms(region) - - # if len(free_vms_paths) == 0: - # # No free virtual machine available, generate a new one - # allocation_needed = True - # else: - # # Choose the first free virtual machine - # chosen_vm_path = free_vms_paths[0][0] - # self._occupy_vm(chosen_vm_path, os.getpid(), region) - # return chosen_vm_path - - allocation_needed = True - if allocation_needed: - logger.info("No free virtual machine available. Generating a new one, which would take a while...☕") - new_vm_path = _allocate_vm(region) - with self.lock: - self._add_vm(new_vm_path, region) - self._occupy_vm(new_vm_path, os.getpid(), region) - return new_vm_path + logger.info("Allocating a new VM in region: {}".format(region)) + new_vm_path = _allocate_vm(region) + return new_vm_path diff --git a/desktop_env/providers/aws/provider.py b/desktop_env/providers/aws/provider.py index 7e9f708..882710e 100644 --- a/desktop_env/providers/aws/provider.py +++ b/desktop_env/providers/aws/provider.py @@ -95,36 +95,13 @@ class AWSProvider(Provider): instance_type = instance['InstanceType'] instance_snapshot = instance_details['Reservations'][0]['Instances'][0]['ImageId'] - # TODO:Step 2: Terminate the old instance + # Step 2: Terminate the old instance ec2_client.terminate_instances(InstanceIds=[path_to_vm]) logger.info(f"Old instance {path_to_vm} has been terminated.") # Step 3: Launch a new instance from the snapshot logger.info(f"Launching a new instance from snapshot {instance_snapshot}...") - # run_instances_params = { - # "MaxCount": 1, - # "MinCount": 1, - # "ImageId": instance_snapshot, - # "InstanceType": instance_type, - # "EbsOptimized": True, - # "NetworkInterfaces": [ - # { - # "SubnetId": subnet_id, - # "AssociatePublicIpAddress": True, - # "DeviceIndex": 0, - # "Groups": security_groups - # } - # ], - # "BlockDeviceMappings":[ - # { - # "Ebs": { - # "VolumeSize": 30, - # "VolumeType": "gp3" - # }, - # }, - # ], - # } new_instance = ec2_client.run_instances( MaxCount = 1, @@ -155,129 +132,13 @@ class AWSProvider(Provider): raise - # # Step 1: Retrieve the original instance details - # instance_details = ec2_client.describe_instances(InstanceIds=[path_to_vm]) - # instance = instance_details['Reservations'][0]['Instances'][0] - # security_groups = [sg['GroupId'] for sg in instance['SecurityGroups']] - # #subnet_id = instance['SubnetId'] - # #TODO:instance_type = instance['InstanceType'] - # instance_type = 't3.large' - # instance_snapshot = instance_details['Reservations'][0]['Instances'][0]['ImageId'] - - # # TODO:Step 2: Terminate the old instance - # if not path_to_vm == 'i-00017dfb534d22011': - # ec2_client.terminate_instances(InstanceIds=[path_to_vm]) - # logger.info(f"Old instance {path_to_vm} has been terminated.") - - # # Step 3: Launch a new instance from the snapshot - # logger.info(f"Launching a new instance from snapshot {instance_snapshot}...") - - # timestamp_str = datetime.now().strftime("%Y-%m-%d_%H-%M-%S") - # instance_name = "/dev/sda1" - - # new_instance = ec2_client.run_instances( - # BlockDeviceMappings = [ - # { - # "Ebs": { - # "VolumeSize": 30, - # "VolumeType": "gp3" - # }, - # 'DeviceName':instance_name, - # }, - # ], - # MaxCount = 1, - # MinCount = 1, - # ImageId = instance_snapshot, - # InstanceType = instance_type, - # EbsOptimized = True, - # NetworkInterfaces = [ - # { - # "AssociatePublicIpAddress": True, - # "DeviceIndex": 0, - # "Groups": security_groups - # } - # ] - # ) - # '''NetworkInterfaces = [ - # { - # "SubnetId": subnet_id, - # "AssociatePublicIpAddress": True, - # "DeviceIndex": 0, - # "Groups": security_groups - # } - # ]''' - # new_instance_id = new_instance['Instances'][0]['InstanceId'] - # logger.info(f"New instance {new_instance_id} launched from snapshot {snapshot_name}.") - # logger.info(f"Waiting for instance {new_instance_id} to be running...") - # ec2_client.get_waiter('instance_running').wait(InstanceIds=[new_instance_id]) - # logger.info(f"Instance {new_instance_id} is ready.") - - # # # Step 4: set inbound rules - # # # TODO: get host sg automatically - # # host = ec2_client.describe_instances(InstanceIds=['i-027eab0d007b62793']) - # # host_sg_id = host['Reservations'][0]['Instances'][0]['SecurityGroups'][0]['GroupId'] - # # vm_sg_id = new_instance['Instances'][0]['SecurityGroups'][0]['GroupId'] - - # # # add inbound rules to the host security group - # # try: - # # host.authorize_security_group_ingress( - # # GroupId= host_sg_id, - # # IpPermissions=[ - # # { - # # "IpProtocol": "tcp", - # # "FromPort": 5000, - # # "ToPort": 5000, - # # "UserIdGroupPairs": [ - # # { - # # "GroupId": vm_sg_id - # # } - # # ] - # # } - # # ] - # # ) - # # print(f"Port 5000 opened on {host_sg_id} for {vm_sg_id}") - # # except ClientError as e: - # # if "InvalidPermission.Duplicate" in str(e): - # # print(f"Rule already exists on {host_sg_id}") - # # else: - # # print(f"Error updating {host_sg_id}: {e}") - - # # # add inbound rules to the new instance security group - # # try: - # # new_instance.authorize_security_group_ingress( - # # GroupId= new_instance_id, - # # IpPermissions=[ - # # { - # # "IpProtocol": "tcp", - # # "FromPort": 6000, - # # "ToPort": 6000, - # # "UserIdGroupPairs": [ - # # { - # # "GroupId": host_sg_id - # # } - # # ] - # # } - # # ] - # # ) - # # print(f"Port 6000 opened on {new_instance_id} for {host_sg_id}") - # # except ClientError as e: - # # if "InvalidPermission.Duplicate" in str(e): - # # print(f"Rule already exists on {new_instance_id}") - # # else: - # # print(f"Error updating {new_instance_id}: {e}") - - # return new_instance_id - - def stop_emulator(self, path_to_vm, region=None): logger.info(f"Stopping AWS VM {path_to_vm}...") ec2_client = boto3.client('ec2', region_name=self.region) try: - ec2_client.stop_instances(InstanceIds=[path_to_vm]) - waiter = ec2_client.get_waiter('instance_stopped') - waiter.wait(InstanceIds=[path_to_vm], WaiterConfig={'Delay': WAIT_DELAY, 'MaxAttempts': MAX_ATTEMPTS}) - logger.info(f"Instance {path_to_vm} has been stopped.") + ec2_client.terminate_instances(InstanceIds=[path_to_vm]) + logger.info(f"Instance {path_to_vm} has been terminated.") except ClientError as e: logger.error(f"Failed to stop the AWS VM {path_to_vm}: {str(e)}") raise diff --git a/evaluation_examples/test_small.json b/evaluation_examples/test_small.json index 3e0f127..dbf95d3 100644 --- a/evaluation_examples/test_small.json +++ b/evaluation_examples/test_small.json @@ -29,6 +29,33 @@ "46407397-a7d5-4c6b-92c6-dbe038b1457b", "4e9f0faf-2ecc-4ae8-a804-28c9a75d1ddc", "510f64c8-9bcc-4be1-8d30-638705850618", - "897e3b53-5d4d-444b-85cb-2cdc8a97d903" + "897e3b53-5d4d-444b-85cb-2cdc8a97d903", + "c867c42d-a52d-4a24-8ae3-f75d256b5618", + "74d5859f-ed66-4d3e-aa0e-93d7a592ce41", + "b5062e3e-641c-4e3a-907b-ac864d2e7652", + "48d05431-6cd5-4e76-82eb-12b60d823f7d", + "eb303e01-261e-4972-8c07-c9b4e7a4922a", + "d1acdb87-bb67-4f30-84aa-990e56a09c92", + "deec51c9-3b1e-4b9e-993c-4776f20e8bb2", + "8e116af7-7db7-4e35-a68b-b0939c066c78", + "716a6079-22da-47f1-ba73-c9d58f986a38", + "2373b66a-092d-44cb-bfd7-82e86e7a3b4d" + ], + "os": [ + "5ea617a3-0e86-4ba6-aab2-dac9aa2e8d57", + "5812b315-e7bd-4265-b51f-863c02174c28" + ], + "thunderbird": [ + "dfac9ee8-9bc4-4cdc-b465-4a4bfcd2f397", + "15c3b339-88f7-4a86-ab16-e71c58dcb01e" + ], + "vlc": [ + "59f21cfb-0120-4326-b255-a5b827b38967", + "8f080098-ddb1-424c-b438-4e96e5e4786e" + ], + "vs_code": [ + "0ed39f63-6049-43d4-ba4d-5fa2fe04a951", + "53ad5833-3455-407b-bbc6-45b4c79ab8fb", + "276cc624-87ea-4f08-ab93-f770e3790175" ] } \ No newline at end of file