refactor: simplify AWS VM management by removing unused methods and improving logging

This commit is contained in:
adlsdztony
2025-06-01 08:31:47 +00:00
parent e48bd6b059
commit 9c0cbebf9a
3 changed files with 52 additions and 311 deletions

View File

@@ -66,198 +66,51 @@ def _allocate_vm(region=DEFAULT_REGION):
class AWSVMManager(VMManager):
"""
AWS VM Manager for managing virtual machines on AWS.
AWS does not need to maintain a registry of VMs, as it can dynamically allocate and deallocate VMs.
This class remains the interface of VMManager for compatibility with other components.
"""
def __init__(self, registry_path=REGISTRY_PATH):
self.registry_path = registry_path
self.lock = FileLock(".aws_lck", timeout=60)
# self.lock = FileLock(".aws_lck", timeout=60)
self.initialize_registry()
def initialize_registry(self, **kwargs):
with self.lock: # Locking during initialization
if not os.path.exists(self.registry_path):
with open(self.registry_path, 'w') as file:
file.write('')
pass
def add_vm(self, vm_path, region=DEFAULT_REGION, lock_needed=True, **kwargs):
if lock_needed:
with self.lock:
self._add_vm(vm_path, region)
else:
self._add_vm(vm_path, region)
pass
def _add_vm(self, vm_path, region=DEFAULT_REGION):
with open(self.registry_path, 'r') as file:
lines = file.readlines()
vm_path_at_vm_region = "{}@{}".format(vm_path, region)
new_lines = lines + [f'{vm_path_at_vm_region}|free\n']
with open(self.registry_path, 'w') as file:
file.writelines(new_lines)
pass
def delete_vm(self, vm_path, region=DEFAULT_REGION, lock_needed=True, **kwargs):
if lock_needed:
with self.lock:
self._delete_vm(vm_path, region)
else:
self._delete_vm(vm_path, region)
pass
def _delete_vm(self, vm_path, region=DEFAULT_REGION):
new_lines = []
with open(self.registry_path, 'r') as file:
lines = file.readlines()
for line in lines:
vm_path_at_vm_region, pid_str = line.strip().split('|')
if vm_path_at_vm_region == "{}@{}".format(vm_path, region):
continue
else:
new_lines.append(line)
with open(self.registry_path, 'w') as file:
file.writelines(new_lines)
pass
def occupy_vm(self, vm_path, pid, region=DEFAULT_REGION, lock_needed=True, **kwargs):
if lock_needed:
with self.lock:
self._occupy_vm(vm_path, pid, region)
else:
self._occupy_vm(vm_path, pid, region)
pass
def _occupy_vm(self, vm_path, pid, region=DEFAULT_REGION):
new_lines = []
with open(self.registry_path, 'r') as file:
lines = file.readlines()
for line in lines:
registered_vm_path, _ = line.strip().split('|')
if registered_vm_path == "{}@{}".format(vm_path, region):
new_lines.append(f'{registered_vm_path}|{pid}\n')
else:
new_lines.append(line)
with open(self.registry_path, 'w') as file:
file.writelines(new_lines)
pass
def check_and_clean(self, lock_needed=True, **kwargs):
if lock_needed:
with self.lock:
self._check_and_clean()
else:
self._check_and_clean()
pass
def _check_and_clean(self):
# Get active PIDs
active_pids = {p.pid for p in psutil.process_iter()}
new_lines = []
vm_path_at_vm_regions = {}
with open(self.registry_path, 'r') as file:
lines = file.readlines()
# Collect all VM paths and their regions
for line in lines:
vm_path_at_vm_region, pid_str = line.strip().split('|')
vm_path, vm_region = vm_path_at_vm_region.split("@")
if vm_region not in vm_path_at_vm_regions:
vm_path_at_vm_regions[vm_region] = []
vm_path_at_vm_regions[vm_region].append((vm_path_at_vm_region, pid_str))
# Process each region
for region, vm_info_list in vm_path_at_vm_regions.items():
ec2_client = boto3.client('ec2', region_name=region)
instance_ids = [vm_info[0].split('@')[0] for vm_info in vm_info_list]
# Batch describe instances
try:
response = ec2_client.describe_instances(InstanceIds=instance_ids)
reservations = response.get('Reservations', [])
terminated_ids = set()
stopped_ids = set()
active_ids = set()
# Collect states of all instances
for reservation in reservations:
for instance in reservation.get('Instances', []):
instance_id = instance.get('InstanceId')
instance_state = instance['State']['Name']
if instance_state in ['terminated', 'shutting-down']:
terminated_ids.add(instance_id)
elif instance_state == 'stopped':
stopped_ids.add(instance_id)
else:
active_ids.add(instance_id)
# Write results back to file
for vm_path_at_vm_region, pid_str in vm_info_list:
vm_path = vm_path_at_vm_region.split('@')[0]
if vm_path in terminated_ids:
logger.info(f"VM {vm_path} not found or terminated, releasing it.")
continue
elif vm_path in stopped_ids:
logger.info(f"VM {vm_path} stopped, mark it as free")
new_lines.append(f'{vm_path}@{region}|free\n')
continue
if pid_str == "free":
new_lines.append(f'{vm_path}@{region}|{pid_str}\n')
elif int(pid_str) in active_pids:
new_lines.append(f'{vm_path}@{region}|{pid_str}\n')
else:
new_lines.append(f'{vm_path}@{region}|free\n')
except ec2_client.exceptions.ClientError as e:
if 'InvalidInstanceID.NotFound' in str(e):
logger.info(f"VM not found, releasing instances in region {region}.")
continue
# Writing updated lines back to the registry file
with open(self.registry_path, 'w') as file:
file.writelines(new_lines)
# We won't check and clean on the files on aws and delete the unregistered ones
# Since this can lead to unexpected delete on other server
# PLease do monitor the instances to avoid additional cost
pass
def list_free_vms(self, region=DEFAULT_REGION, lock_needed=True, **kwargs):
if lock_needed:
with self.lock:
return self._list_free_vms(region)
else:
return self._list_free_vms(region)
pass
def _list_free_vms(self, region=DEFAULT_REGION):
free_vms = []
with open(self.registry_path, 'r') as file:
lines = file.readlines()
for line in lines:
vm_path_at_vm_region, pid_str = line.strip().split('|')
vm_path, vm_region = vm_path_at_vm_region.split("@")
if pid_str == "free" and vm_region == region:
free_vms.append((vm_path, pid_str))
return free_vms
pass
def get_vm_path(self, region=DEFAULT_REGION, **kwargs):
with self.lock:
if not AWSVMManager.checked_and_cleaned:
AWSVMManager.checked_and_cleaned = True
self._check_and_clean()
# allocation_needed = False
# with self.lock:
# free_vms_paths = self._list_free_vms(region)
# if len(free_vms_paths) == 0:
# # No free virtual machine available, generate a new one
# allocation_needed = True
# else:
# # Choose the first free virtual machine
# chosen_vm_path = free_vms_paths[0][0]
# self._occupy_vm(chosen_vm_path, os.getpid(), region)
# return chosen_vm_path
allocation_needed = True
if allocation_needed:
logger.info("No free virtual machine available. Generating a new one, which would take a while...☕")
new_vm_path = _allocate_vm(region)
with self.lock:
self._add_vm(new_vm_path, region)
self._occupy_vm(new_vm_path, os.getpid(), region)
return new_vm_path
logger.info("Allocating a new VM in region: {}".format(region))
new_vm_path = _allocate_vm(region)
return new_vm_path