aws_communication_success
This commit is contained in:
@@ -23,10 +23,10 @@ class DesktopEnv(gym.Env):
|
||||
"""
|
||||
DesktopEnv with OpenAI Gym interface. It provides a desktop environment for setting and evaluating desktop automation tasks.
|
||||
"""
|
||||
|
||||
#TODO:provider_name: str = "vmware",
|
||||
def __init__(
|
||||
self,
|
||||
provider_name: str = "vmware",
|
||||
provider_name: str = "aws",
|
||||
region: str = None,
|
||||
path_to_vm: str = None,
|
||||
snapshot_name: str = "init_state",
|
||||
@@ -55,7 +55,7 @@ class DesktopEnv(gym.Env):
|
||||
# Initialize VM manager and vitualization provider
|
||||
self.region = region
|
||||
|
||||
# Default
|
||||
# Default TODO:
|
||||
self.server_port = 5000
|
||||
self.chromium_port = 9222
|
||||
self.vnc_port = 8006
|
||||
@@ -69,7 +69,7 @@ class DesktopEnv(gym.Env):
|
||||
self.path_to_vm = os.path.abspath(os.path.expandvars(os.path.expanduser(path_to_vm))) \
|
||||
if provider_name in {"vmware", "virtualbox"} else path_to_vm
|
||||
else:
|
||||
self.path_to_vm = self.manager.get_vm_path(self.os_type, region)
|
||||
self.path_to_vm = self.manager.get_vm_path(region) # self.os_type,
|
||||
|
||||
self.snapshot_name = snapshot_name
|
||||
self.cache_dir_base: str = cache_dir
|
||||
@@ -114,7 +114,8 @@ class DesktopEnv(gym.Env):
|
||||
# due to the fact it could be changed when implemented by cloud services
|
||||
path_to_vm = self.provider.revert_to_snapshot(self.path_to_vm, self.snapshot_name)
|
||||
if path_to_vm and not path_to_vm == self.path_to_vm:
|
||||
# path_to_vm has to be a new path
|
||||
# path_to_vm has to be a new path
|
||||
|
||||
self.manager.delete_vm(self.path_to_vm, self.region)
|
||||
self.manager.add_vm(path_to_vm, self.region)
|
||||
self.manager.occupy_vm(path_to_vm, os.getpid(), self.region)
|
||||
|
||||
@@ -21,14 +21,15 @@ IMAGE_ID_MAP = {
|
||||
|
||||
INSTANCE_TYPE = "t3.medium"
|
||||
|
||||
# sg-0342574803206ee9c subnet-037edfff66c2eb894
|
||||
NETWORK_INTERFACE_MAP = {
|
||||
"us-east-1": [
|
||||
{
|
||||
"SubnetId": "subnet-037edfff66c2eb894",
|
||||
"SubnetId": "subnet-0a4b0c5b8f6066712",
|
||||
"AssociatePublicIpAddress": True,
|
||||
"DeviceIndex": 0,
|
||||
"Groups": [
|
||||
"sg-0342574803206ee9c"
|
||||
"sg-08a53433e9b4abde6"
|
||||
]
|
||||
}
|
||||
],
|
||||
@@ -240,19 +241,20 @@ class AWSVMManager(VMManager):
|
||||
AWSVMManager.checked_and_cleaned = True
|
||||
self._check_and_clean()
|
||||
|
||||
allocation_needed = False
|
||||
with self.lock:
|
||||
free_vms_paths = self._list_free_vms(region)
|
||||
# allocation_needed = False
|
||||
# with self.lock:
|
||||
# free_vms_paths = self._list_free_vms(region)
|
||||
|
||||
if len(free_vms_paths) == 0:
|
||||
# No free virtual machine available, generate a new one
|
||||
allocation_needed = True
|
||||
else:
|
||||
# Choose the first free virtual machine
|
||||
chosen_vm_path = free_vms_paths[0][0]
|
||||
self._occupy_vm(chosen_vm_path, os.getpid(), region)
|
||||
return chosen_vm_path
|
||||
# if len(free_vms_paths) == 0:
|
||||
# # No free virtual machine available, generate a new one
|
||||
# allocation_needed = True
|
||||
# else:
|
||||
# # Choose the first free virtual machine
|
||||
# chosen_vm_path = free_vms_paths[0][0]
|
||||
# self._occupy_vm(chosen_vm_path, os.getpid(), region)
|
||||
# return chosen_vm_path
|
||||
|
||||
allocation_needed = True
|
||||
if allocation_needed:
|
||||
logger.info("No free virtual machine available. Generating a new one, which would take a while...☕")
|
||||
new_vm_path = _allocate_vm(region)
|
||||
|
||||
@@ -4,6 +4,9 @@ from botocore.exceptions import ClientError
|
||||
import logging
|
||||
|
||||
from desktop_env.providers.base import Provider
|
||||
from datetime import datetime
|
||||
import time
|
||||
|
||||
|
||||
logger = logging.getLogger("desktopenv.providers.aws.AWSProvider")
|
||||
logger.setLevel(logging.INFO)
|
||||
@@ -14,24 +17,60 @@ MAX_ATTEMPTS = 10
|
||||
|
||||
class AWSProvider(Provider):
|
||||
|
||||
def start_emulator(self, path_to_vm: str, headless: bool):
|
||||
# def start_emulator(self, path_to_vm: str, headless: bool, os_type: str):
|
||||
# logger.info("Starting AWS VM...")
|
||||
# ec2_client = boto3.client('ec2', region_name=self.region)
|
||||
|
||||
# try:
|
||||
# # Start the instance
|
||||
# ec2_client.start_instances(InstanceIds=[path_to_vm])
|
||||
# logger.info(f"Instance {path_to_vm} is starting...")
|
||||
|
||||
# # Wait for the instance to be in the 'running' state
|
||||
# waiter = ec2_client.get_waiter('instance_running')
|
||||
# waiter.wait(InstanceIds=[path_to_vm], WaiterConfig={'Delay': WAIT_DELAY, 'MaxAttempts': MAX_ATTEMPTS})
|
||||
# logger.info(f"Instance {path_to_vm} is now running.")
|
||||
|
||||
# except ClientError as e:
|
||||
# logger.error(f"Failed to start the AWS VM {path_to_vm}: {str(e)}")
|
||||
# raise
|
||||
|
||||
def start_emulator(self, path_to_vm: str, headless: bool, os_type: str):
|
||||
logger.info("Starting AWS VM...")
|
||||
ec2_client = boto3.client('ec2', region_name=self.region)
|
||||
|
||||
try:
|
||||
# Start the instance
|
||||
ec2_client.start_instances(InstanceIds=[path_to_vm])
|
||||
logger.info(f"Instance {path_to_vm} is starting...")
|
||||
# Check the current state of the instance
|
||||
response = ec2_client.describe_instances(InstanceIds=[path_to_vm])
|
||||
state = response['Reservations'][0]['Instances'][0]['State']['Name']
|
||||
logger.info(f"Instance {path_to_vm} current state: {state}")
|
||||
|
||||
# Wait for the instance to be in the 'running' state
|
||||
waiter = ec2_client.get_waiter('instance_running')
|
||||
waiter.wait(InstanceIds=[path_to_vm], WaiterConfig={'Delay': WAIT_DELAY, 'MaxAttempts': MAX_ATTEMPTS})
|
||||
logger.info(f"Instance {path_to_vm} is now running.")
|
||||
if state == 'running':
|
||||
# If the instance is already running, skip starting it
|
||||
logger.info(f"Instance {path_to_vm} is already running. Skipping start.")
|
||||
return
|
||||
|
||||
if state == 'stopped':
|
||||
# Start the instance if it's currently stopped
|
||||
ec2_client.start_instances(InstanceIds=[path_to_vm])
|
||||
logger.info(f"Instance {path_to_vm} is starting...")
|
||||
|
||||
# Wait until the instance reaches 'running' state
|
||||
waiter = ec2_client.get_waiter('instance_running')
|
||||
waiter.wait(
|
||||
InstanceIds=[path_to_vm],
|
||||
WaiterConfig={'Delay': WAIT_DELAY, 'MaxAttempts': MAX_ATTEMPTS}
|
||||
)
|
||||
logger.info(f"Instance {path_to_vm} is now running.")
|
||||
else:
|
||||
# For all other states (terminated, pending, etc.), log a warning
|
||||
logger.warning(f"Instance {path_to_vm} is in state '{state}' and cannot be started.")
|
||||
|
||||
except ClientError as e:
|
||||
logger.error(f"Failed to start the AWS VM {path_to_vm}: {str(e)}")
|
||||
raise
|
||||
|
||||
|
||||
def get_ip_address(self, path_to_vm: str) -> str:
|
||||
logger.info("Getting AWS VM IP address...")
|
||||
ec2_client = boto3.client('ec2', region_name=self.region)
|
||||
@@ -71,21 +110,46 @@ class AWSProvider(Provider):
|
||||
security_groups = [sg['GroupId'] for sg in instance['SecurityGroups']]
|
||||
subnet_id = instance['SubnetId']
|
||||
instance_type = instance['InstanceType']
|
||||
instance_snapshot = instance_details['Reservations'][0]['Instances'][0]['ImageId']
|
||||
|
||||
# Step 2: Terminate the old instance
|
||||
# TODO:Step 2: Terminate the old instance
|
||||
ec2_client.terminate_instances(InstanceIds=[path_to_vm])
|
||||
logger.info(f"Old instance {path_to_vm} has been terminated.")
|
||||
|
||||
# Step 3: Launch a new instance from the snapshot
|
||||
logger.info(f"Launching a new instance from snapshot {snapshot_name}...")
|
||||
logger.info(f"Launching a new instance from snapshot {instance_snapshot}...")
|
||||
|
||||
run_instances_params = {
|
||||
"MaxCount": 1,
|
||||
"MinCount": 1,
|
||||
"ImageId": snapshot_name,
|
||||
"InstanceType": instance_type,
|
||||
"EbsOptimized": True,
|
||||
"NetworkInterfaces": [
|
||||
# run_instances_params = {
|
||||
# "MaxCount": 1,
|
||||
# "MinCount": 1,
|
||||
# "ImageId": instance_snapshot,
|
||||
# "InstanceType": instance_type,
|
||||
# "EbsOptimized": True,
|
||||
# "NetworkInterfaces": [
|
||||
# {
|
||||
# "SubnetId": subnet_id,
|
||||
# "AssociatePublicIpAddress": True,
|
||||
# "DeviceIndex": 0,
|
||||
# "Groups": security_groups
|
||||
# }
|
||||
# ],
|
||||
# "BlockDeviceMappings":[
|
||||
# {
|
||||
# "Ebs": {
|
||||
# "VolumeSize": 30,
|
||||
# "VolumeType": "gp3"
|
||||
# },
|
||||
# },
|
||||
# ],
|
||||
# }
|
||||
|
||||
new_instance = ec2_client.run_instances(
|
||||
MaxCount = 1,
|
||||
MinCount = 1,
|
||||
ImageId = instance_snapshot,
|
||||
InstanceType = instance_type,
|
||||
EbsOptimized = True,
|
||||
NetworkInterfaces = [
|
||||
{
|
||||
"SubnetId": subnet_id,
|
||||
"AssociatePublicIpAddress": True,
|
||||
@@ -93,13 +157,13 @@ class AWSProvider(Provider):
|
||||
"Groups": security_groups
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
new_instance = ec2_client.run_instances(**run_instances_params)
|
||||
)
|
||||
new_instance_id = new_instance['Instances'][0]['InstanceId']
|
||||
logger.info(f"New instance {new_instance_id} launched from snapshot {snapshot_name}.")
|
||||
logger.info(f"Waiting for instance {new_instance_id} to be running...")
|
||||
ec2_client.get_waiter('instance_running').wait(InstanceIds=[new_instance_id])
|
||||
# waite 60 seconds for the instance to be ready
|
||||
time.sleep(60)
|
||||
logger.info(f"Instance {new_instance_id} is ready.")
|
||||
|
||||
return new_instance_id
|
||||
@@ -108,6 +172,121 @@ class AWSProvider(Provider):
|
||||
logger.error(f"Failed to revert to snapshot {snapshot_name} for the instance {path_to_vm}: {str(e)}")
|
||||
raise
|
||||
|
||||
|
||||
# # Step 1: Retrieve the original instance details
|
||||
# instance_details = ec2_client.describe_instances(InstanceIds=[path_to_vm])
|
||||
# instance = instance_details['Reservations'][0]['Instances'][0]
|
||||
# security_groups = [sg['GroupId'] for sg in instance['SecurityGroups']]
|
||||
# #subnet_id = instance['SubnetId']
|
||||
# #TODO:instance_type = instance['InstanceType']
|
||||
# instance_type = 't3.large'
|
||||
# instance_snapshot = instance_details['Reservations'][0]['Instances'][0]['ImageId']
|
||||
|
||||
# # TODO:Step 2: Terminate the old instance
|
||||
# if not path_to_vm == 'i-00017dfb534d22011':
|
||||
# ec2_client.terminate_instances(InstanceIds=[path_to_vm])
|
||||
# logger.info(f"Old instance {path_to_vm} has been terminated.")
|
||||
|
||||
# # Step 3: Launch a new instance from the snapshot
|
||||
# logger.info(f"Launching a new instance from snapshot {instance_snapshot}...")
|
||||
|
||||
# timestamp_str = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
|
||||
# instance_name = "/dev/sda1"
|
||||
|
||||
# new_instance = ec2_client.run_instances(
|
||||
# BlockDeviceMappings = [
|
||||
# {
|
||||
# "Ebs": {
|
||||
# "VolumeSize": 30,
|
||||
# "VolumeType": "gp3"
|
||||
# },
|
||||
# 'DeviceName':instance_name,
|
||||
# },
|
||||
# ],
|
||||
# MaxCount = 1,
|
||||
# MinCount = 1,
|
||||
# ImageId = instance_snapshot,
|
||||
# InstanceType = instance_type,
|
||||
# EbsOptimized = True,
|
||||
# NetworkInterfaces = [
|
||||
# {
|
||||
# "AssociatePublicIpAddress": True,
|
||||
# "DeviceIndex": 0,
|
||||
# "Groups": security_groups
|
||||
# }
|
||||
# ]
|
||||
# )
|
||||
# '''NetworkInterfaces = [
|
||||
# {
|
||||
# "SubnetId": subnet_id,
|
||||
# "AssociatePublicIpAddress": True,
|
||||
# "DeviceIndex": 0,
|
||||
# "Groups": security_groups
|
||||
# }
|
||||
# ]'''
|
||||
# new_instance_id = new_instance['Instances'][0]['InstanceId']
|
||||
# logger.info(f"New instance {new_instance_id} launched from snapshot {snapshot_name}.")
|
||||
# logger.info(f"Waiting for instance {new_instance_id} to be running...")
|
||||
# ec2_client.get_waiter('instance_running').wait(InstanceIds=[new_instance_id])
|
||||
# logger.info(f"Instance {new_instance_id} is ready.")
|
||||
|
||||
# # # Step 4: set inbound rules
|
||||
# # # TODO: get host sg automatically
|
||||
# # host = ec2_client.describe_instances(InstanceIds=['i-027eab0d007b62793'])
|
||||
# # host_sg_id = host['Reservations'][0]['Instances'][0]['SecurityGroups'][0]['GroupId']
|
||||
# # vm_sg_id = new_instance['Instances'][0]['SecurityGroups'][0]['GroupId']
|
||||
|
||||
# # # add inbound rules to the host security group
|
||||
# # try:
|
||||
# # host.authorize_security_group_ingress(
|
||||
# # GroupId= host_sg_id,
|
||||
# # IpPermissions=[
|
||||
# # {
|
||||
# # "IpProtocol": "tcp",
|
||||
# # "FromPort": 5000,
|
||||
# # "ToPort": 5000,
|
||||
# # "UserIdGroupPairs": [
|
||||
# # {
|
||||
# # "GroupId": vm_sg_id
|
||||
# # }
|
||||
# # ]
|
||||
# # }
|
||||
# # ]
|
||||
# # )
|
||||
# # print(f"Port 5000 opened on {host_sg_id} for {vm_sg_id}")
|
||||
# # except ClientError as e:
|
||||
# # if "InvalidPermission.Duplicate" in str(e):
|
||||
# # print(f"Rule already exists on {host_sg_id}")
|
||||
# # else:
|
||||
# # print(f"Error updating {host_sg_id}: {e}")
|
||||
|
||||
# # # add inbound rules to the new instance security group
|
||||
# # try:
|
||||
# # new_instance.authorize_security_group_ingress(
|
||||
# # GroupId= new_instance_id,
|
||||
# # IpPermissions=[
|
||||
# # {
|
||||
# # "IpProtocol": "tcp",
|
||||
# # "FromPort": 6000,
|
||||
# # "ToPort": 6000,
|
||||
# # "UserIdGroupPairs": [
|
||||
# # {
|
||||
# # "GroupId": host_sg_id
|
||||
# # }
|
||||
# # ]
|
||||
# # }
|
||||
# # ]
|
||||
# # )
|
||||
# # print(f"Port 6000 opened on {new_instance_id} for {host_sg_id}")
|
||||
# # except ClientError as e:
|
||||
# # if "InvalidPermission.Duplicate" in str(e):
|
||||
# # print(f"Rule already exists on {new_instance_id}")
|
||||
# # else:
|
||||
# # print(f"Error updating {new_instance_id}: {e}")
|
||||
|
||||
# return new_instance_id
|
||||
|
||||
|
||||
def stop_emulator(self, path_to_vm, region=None):
|
||||
logger.info(f"Stopping AWS VM {path_to_vm}...")
|
||||
ec2_client = boto3.client('ec2', region_name=self.region)
|
||||
|
||||
Reference in New Issue
Block a user