Files
sci-gui-agent-benchmark/desktop_env/providers/aws/manager.py
2025-07-11 12:15:03 +00:00

278 lines
11 KiB
Python

import os
from filelock import FileLock
import boto3
import psutil
import logging
import dotenv
import signal
INSTANCE_TYPE = "t3.xlarge"
# Load environment variables from .env file
dotenv.load_dotenv()
# Ensure the AWS region is set in the environment
if not os.getenv('AWS_REGION'):
raise EnvironmentError("AWS_REGION must be set in the environment variables.")
# Ensure the AWS subnet and security group IDs are set in the environment
if not os.getenv('AWS_SUBNET_ID') or not os.getenv('AWS_SECURITY_GROUP_ID'):
raise EnvironmentError("AWS_SUBNET_ID and AWS_SECURITY_GROUP_ID must be set in the environment variables.")
from desktop_env.providers.base import VMManager
# Import proxy-related modules only when needed
try:
from desktop_env.providers.aws.proxy_pool import get_global_proxy_pool, init_proxy_pool
PROXY_SUPPORT_AVAILABLE = True
except ImportError:
PROXY_SUPPORT_AVAILABLE = False
logger = logging.getLogger("desktopenv.providers.aws.AWSVMManager")
logger.setLevel(logging.INFO)
DEFAULT_REGION = "us-east-1"
# todo: Add doc for the configuration of image, security group and network interface
# todo: public the AMI images
IMAGE_ID_MAP = {
"us-east-1": {
(1920, 1080): "ami-09138bff939f82bd8"
},
"ap-east-1": {
(1920, 1080): "ami-0c092a5b8be4116f5"
}
}
def _allocate_vm(region=DEFAULT_REGION, screen_size=(1920, 1080)):
if region not in IMAGE_ID_MAP:
raise ValueError(f"Region {region} is not supported. Supported regions are: {list(IMAGE_ID_MAP.keys())}")
if screen_size not in IMAGE_ID_MAP[region]:
raise ValueError(f"Screen size {screen_size} not supported for region {region}. Supported: {list(IMAGE_ID_MAP[region].keys())}")
ami_id = IMAGE_ID_MAP[region][screen_size]
ec2_client = boto3.client('ec2', region_name=region)
instance_id = None
original_sigint_handler = signal.getsignal(signal.SIGINT)
original_sigterm_handler = signal.getsignal(signal.SIGTERM)
def signal_handler(sig, frame):
if instance_id:
signal_name = "SIGINT" if sig == signal.SIGINT else "SIGTERM"
logger.warning(f"Received {signal_name} signal, terminating instance {instance_id}...")
try:
ec2_client.terminate_instances(InstanceIds=[instance_id])
logger.info(f"Successfully terminated instance {instance_id} after {signal_name}.")
except Exception as cleanup_error:
logger.error(f"Failed to terminate instance {instance_id} after {signal_name}: {str(cleanup_error)}")
# Restore original signal handlers
signal.signal(signal.SIGINT, original_sigint_handler)
signal.signal(signal.SIGTERM, original_sigterm_handler)
# Raise appropriate exception based on signal type
if sig == signal.SIGINT:
raise KeyboardInterrupt
else:
# For SIGTERM, exit gracefully
import sys
sys.exit(0)
try:
# Set up signal handlers for both SIGINT and SIGTERM
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
if not os.getenv('AWS_SECURITY_GROUP_ID'):
raise ValueError("AWS_SECURITY_GROUP_ID is not set in the environment variables.")
if not os.getenv('AWS_SUBNET_ID'):
raise ValueError("AWS_SUBNET_ID is not set in the environment variables.")
run_instances_params = {
"MaxCount": 1,
"MinCount": 1,
"ImageId": ami_id,
"InstanceType": INSTANCE_TYPE,
"EbsOptimized": True,
"NetworkInterfaces": [
{
"SubnetId": os.getenv('AWS_SUBNET_ID'),
"AssociatePublicIpAddress": True,
"DeviceIndex": 0,
"Groups": [
os.getenv('AWS_SECURITY_GROUP_ID')
]
}
],
"BlockDeviceMappings": [
{
"DeviceName": "/dev/sda1",
"Ebs": {
# "VolumeInitializationRate": 300
"VolumeSize": 30, # Size in GB
"VolumeType": "gp3", # General Purpose SSD
"Throughput": 1000,
"Iops": 4000 # Adjust IOPS as needed
}
}
]
}
response = ec2_client.run_instances(**run_instances_params)
instance_id = response['Instances'][0]['InstanceId']
waiter = ec2_client.get_waiter('instance_running')
logger.info(f"Waiting for instance {instance_id} to be running...")
waiter.wait(InstanceIds=[instance_id])
logger.info(f"Instance {instance_id} is ready.")
# 获取并显示VNC访问地址
try:
instance_details = ec2_client.describe_instances(InstanceIds=[instance_id])
instance = instance_details['Reservations'][0]['Instances'][0]
public_ip = instance.get('PublicIpAddress', '')
if public_ip:
vnc_url = f"http://{public_ip}:5910/vnc.html"
logger.info("="*80)
logger.info(f"🖥️ VNC Web Access URL: {vnc_url}")
logger.info(f"📡 Public IP: {public_ip}")
logger.info(f"🆔 Instance ID: {instance_id}")
logger.info("="*80)
print(f"\n🌐 VNC访问地址: {vnc_url}")
print(f"📍 请在浏览器中打开上述地址进行远程桌面访问\n")
except Exception as e:
logger.warning(f"Failed to get VNC address for instance {instance_id}: {e}")
except KeyboardInterrupt:
logger.warning("VM allocation interrupted by user (SIGINT).")
if instance_id:
logger.info(f"Terminating instance {instance_id} due to interruption.")
ec2_client.terminate_instances(InstanceIds=[instance_id])
raise
except Exception as e:
logger.error(f"Failed to allocate VM: {e}", exc_info=True)
if instance_id:
logger.info(f"Terminating instance {instance_id} due to an error.")
ec2_client.terminate_instances(InstanceIds=[instance_id])
raise
finally:
# Restore original signal handlers
signal.signal(signal.SIGINT, original_sigint_handler)
signal.signal(signal.SIGTERM, original_sigterm_handler)
return instance_id
def _allocate_vm_with_proxy(region=DEFAULT_REGION, proxy_config_file=None):
"""Allocate a VM with proxy configuration"""
if not PROXY_SUPPORT_AVAILABLE:
logger.warning("Proxy support not available, falling back to regular VM allocation")
return _allocate_vm(region)
from desktop_env.providers.aws.provider_with_proxy import AWSProviderWithProxy
# Initialize proxy pool if needed
if proxy_config_file:
init_proxy_pool(proxy_config_file)
# Get current proxy
proxy_pool = get_global_proxy_pool()
current_proxy = proxy_pool.get_next_proxy()
if current_proxy:
logger.info(f"Allocating VM with proxy: {current_proxy.host}:{current_proxy.port}")
# Create provider instance
provider = AWSProviderWithProxy(region=region, proxy_config_file=proxy_config_file)
# Create new instance
instance_id = provider.create_instance_with_proxy(
image_id=IMAGE_ID_MAP[region],
instance_type=INSTANCE_TYPE,
security_groups=[os.getenv('AWS_SECURITY_GROUP_ID')],
subnet_id=os.getenv('AWS_SUBNET_ID')
)
try:
ec2_client = boto3.client('ec2', region_name=region)
instance_details = ec2_client.describe_instances(InstanceIds=[instance_id])
instance = instance_details['Reservations'][0]['Instances'][0]
public_ip = instance.get('PublicIpAddress', '')
if public_ip:
vnc_url = f"http://{public_ip}:5910/vnc.html"
logger.info("="*80)
logger.info(f"🖥️ VNC Web Access URL: {vnc_url}")
logger.info(f"📡 Public IP: {public_ip}")
logger.info(f"🆔 Instance ID: {instance_id}")
if current_proxy:
logger.info(f"🌐 Proxy: {current_proxy.host}:{current_proxy.port}")
logger.info("="*80)
print(f"\n🌐 VNC Web Access URL: {vnc_url}")
if current_proxy:
print(f"🔄 Current Proxy: {current_proxy.host}:{current_proxy.port}")
print(f"📍 Please open the above address in the browser for remote desktop access\n")
except Exception as e:
logger.warning(f"Failed to get VNC address for proxy instance {instance_id}: {e}")
return instance_id
class AWSVMManager(VMManager):
"""
AWS VM Manager for managing virtual machines on AWS.
AWS does not need to maintain a registry of VMs, as it can dynamically allocate and deallocate VMs.
This class supports both regular VM allocation and proxy-enabled VM allocation.
"""
def __init__(self, proxy_config_file=None, **kwargs):
self.proxy_config_file = proxy_config_file
# self.lock = FileLock(".aws_lck", timeout=60)
self.initialize_registry()
# Initialize proxy pool if proxy configuration is provided
if proxy_config_file and PROXY_SUPPORT_AVAILABLE:
init_proxy_pool(proxy_config_file)
logger.info(f"Proxy pool initialized with config: {proxy_config_file}")
def initialize_registry(self, **kwargs):
pass
def add_vm(self, vm_path, region=DEFAULT_REGION, lock_needed=True, **kwargs):
pass
def _add_vm(self, vm_path, region=DEFAULT_REGION):
pass
def delete_vm(self, vm_path, region=DEFAULT_REGION, lock_needed=True, **kwargs):
pass
def _delete_vm(self, vm_path, region=DEFAULT_REGION):
pass
def occupy_vm(self, vm_path, pid, region=DEFAULT_REGION, lock_needed=True, **kwargs):
pass
def _occupy_vm(self, vm_path, pid, region=DEFAULT_REGION):
pass
def check_and_clean(self, lock_needed=True, **kwargs):
pass
def _check_and_clean(self):
pass
def list_free_vms(self, region=DEFAULT_REGION, lock_needed=True, **kwargs):
pass
def _list_free_vms(self, region=DEFAULT_REGION):
pass
def get_vm_path(self, region=DEFAULT_REGION, **kwargs):
if self.proxy_config_file:
logger.info("Allocating a new VM with proxy configuration in region: {}".format(region))
new_vm_path = _allocate_vm_with_proxy(region, self.proxy_config_file)
else:
logger.info("Allocating a new VM in region: {}".format(region))
new_vm_path = _allocate_vm(region)
return new_vm_path