Files
sci-gui-agent-benchmark/desktop_env/providers/aws/manager.py
Tianbao Xie 30138c5db1 VLC fix (#224)
* Enhance SetupController with improved logging and error handling during setup and file upload processes. Update instance type to t3.xlarge and AMI ID for AWS configuration. Add download progress logging and exception handling for better debugging.

* Enhance VLC status evaluation by adding multiple paths for file and URL information extraction, improving robustness against varying VLC XML structures. Implement detailed logging for better debugging and error handling in case of mismatches or missing data. Update example JSON for VLC evaluation to use a valid HLS stream URL.

* Improve audio comparison robustness in VLC evaluator by adding error handling for audio file loading and extraction. Implement detailed logging for empty or corrupt files, and normalize DTW distance calculation for more accurate similarity scoring. Remove deprecated audio fingerprint comparison function.

---------

Co-authored-by: yuanmengqi <yuanmengqi@mail.ustc.edu.cn>
2025-06-29 20:18:44 +08:00

271 lines
10 KiB
Python

import os
from filelock import FileLock
import boto3
import psutil
import logging
import dotenv
import signal
# EC2 instance type used for every desktop-environment VM launched by this module.
INSTANCE_TYPE = "t3.xlarge"

# Populate os.environ from a local .env file (if present) before validating below.
dotenv.load_dotenv()

# Fail fast at import time when mandatory AWS settings are missing.
# Empty values are treated exactly like unset ones.
if not os.getenv('AWS_REGION'):
    raise EnvironmentError("AWS_REGION must be set in the environment variables.")

if not all(os.getenv(var_name) for var_name in ('AWS_SUBNET_ID', 'AWS_SECURITY_GROUP_ID')):
    raise EnvironmentError("AWS_SUBNET_ID and AWS_SECURITY_GROUP_ID must be set in the environment variables.")
from desktop_env.providers.base import VMManager
# Import proxy-related modules only when needed
try:
from desktop_env.providers.aws.proxy_pool import get_global_proxy_pool, init_proxy_pool
PROXY_SUPPORT_AVAILABLE = True
except ImportError:
PROXY_SUPPORT_AVAILABLE = False
logger = logging.getLogger("desktopenv.providers.aws.AWSVMManager")
logger.setLevel(logging.INFO)

# Region used by the allocators when the caller does not pass one.
DEFAULT_REGION = "us-east-1"

# TODO: Document how the image, security group and network interface must be configured.
# TODO: Publish the AMI images.
# Desktop-environment AMI ID per region. A region missing from this table
# cannot be used by the allocators in this module.
IMAGE_ID_MAP = dict(
    [
        ("us-east-1", "ami-0cae20d2680c939d4"),
        ("ap-east-1", "ami-0c092a5b8be4116f5"),
    ]
)
def _allocate_vm(region=DEFAULT_REGION):
    """Launch one EC2 instance for the desktop environment and wait until it runs.

    The instance uses the AMI registered for ``region`` in ``IMAGE_ID_MAP`` and
    the ``INSTANCE_TYPE`` instance type. It gets a public IP on the subnet and
    security group named by ``AWS_SUBNET_ID`` / ``AWS_SECURITY_GROUP_ID``, plus a
    30 GB gp3 volume mapped to /dev/sda1. Once it is running, its VNC web URL is
    logged and printed.

    While allocation is in progress, SIGINT and SIGTERM are intercepted so that
    an already-launched instance is terminated instead of leaked. This happens
    only on the main thread, where Python permits installing signal handlers.
    The previous handlers are restored before the function returns or raises.

    Args:
        region: AWS region to launch in; must be a key of ``IMAGE_ID_MAP``.

    Returns:
        The EC2 instance ID of the newly launched instance.

    Raises:
        ValueError: If ``region`` is unsupported or the subnet / security group
            environment variables are missing.
        KeyboardInterrupt: If SIGINT arrives during allocation.
        SystemExit: If SIGTERM arrives during allocation (exit status 0).
        Exception: Any error from the EC2 calls or the waiter. The instance, if
            already created, is terminated before the error is re-raised.
    """
    import sys
    import threading

    if region not in IMAGE_ID_MAP:
        raise ValueError(f"Region {region} is not supported. Supported regions are: {list(IMAGE_ID_MAP.keys())}")

    ec2_client = boto3.client('ec2', region_name=region)
    instance_id = None

    # signal.signal() only works on the main thread and raises ValueError
    # elsewhere. Previously that aborted the allocation, and the restore in
    # `finally` raised again. Off the main thread we run without the hooks.
    install_handlers = threading.current_thread() is threading.main_thread()
    original_sigint_handler = signal.getsignal(signal.SIGINT)
    original_sigterm_handler = signal.getsignal(signal.SIGTERM)

    def _restore_signal_handlers():
        # Put back whatever handlers were active before this call.
        for signum, handler in (
            (signal.SIGINT, original_sigint_handler),
            (signal.SIGTERM, original_sigterm_handler),
        ):
            # getsignal() returns None for handlers not installed from Python;
            # such a value cannot be passed back to signal.signal().
            if handler is not None:
                signal.signal(signum, handler)

    def _terminate_instance(reason):
        # Best-effort cleanup of a launched instance. A failure here is logged
        # but must not replace the exception that triggered the cleanup.
        if not instance_id:
            return
        logger.info(f"Terminating instance {instance_id} due to {reason}.")
        try:
            ec2_client.terminate_instances(InstanceIds=[instance_id])
        except Exception as cleanup_error:
            logger.error(f"Failed to terminate instance {instance_id} due to {reason}: {cleanup_error}")

    def signal_handler(sig, frame):
        # The handler only converts the signal into an exception. The matching
        # `except` clause below terminates the instance once the stack unwinds.
        # This avoids AWS calls inside the handler and the double termination
        # the SIGINT path used to perform.
        signal_name = "SIGINT" if sig == signal.SIGINT else "SIGTERM"
        if instance_id:
            logger.warning(f"Received {signal_name} signal, terminating instance {instance_id}...")
        if sig == signal.SIGINT:
            raise KeyboardInterrupt
        # For SIGTERM, exit with status 0.
        sys.exit(0)

    try:
        if install_handlers:
            signal.signal(signal.SIGINT, signal_handler)
            signal.signal(signal.SIGTERM, signal_handler)

        # Re-checked here because the environment may have changed since the
        # module-level validation ran at import time.
        if not os.getenv('AWS_SECURITY_GROUP_ID'):
            raise ValueError("AWS_SECURITY_GROUP_ID is not set in the environment variables.")
        if not os.getenv('AWS_SUBNET_ID'):
            raise ValueError("AWS_SUBNET_ID is not set in the environment variables.")

        run_instances_params = {
            "MaxCount": 1,
            "MinCount": 1,
            "ImageId": IMAGE_ID_MAP[region],
            "InstanceType": INSTANCE_TYPE,
            "EbsOptimized": True,
            "NetworkInterfaces": [
                {
                    "SubnetId": os.getenv('AWS_SUBNET_ID'),
                    "AssociatePublicIpAddress": True,
                    "DeviceIndex": 0,
                    "Groups": [
                        os.getenv('AWS_SECURITY_GROUP_ID')
                    ]
                }
            ],
            "BlockDeviceMappings": [
                {
                    "DeviceName": "/dev/sda1",
                    "Ebs": {
                        # "VolumeInitializationRate": 300
                        "VolumeSize": 30,  # Size in GB
                        "VolumeType": "gp3",  # General Purpose SSD
                        "Throughput": 1000,
                        "Iops": 4000  # Adjust IOPS as needed
                    }
                }
            ]
        }

        response = ec2_client.run_instances(**run_instances_params)
        instance_id = response['Instances'][0]['InstanceId']

        waiter = ec2_client.get_waiter('instance_running')
        logger.info(f"Waiting for instance {instance_id} to be running...")
        waiter.wait(InstanceIds=[instance_id])
        logger.info(f"Instance {instance_id} is ready.")

        # Look up the public IP and show the VNC web access URL.
        # Failure here is non-fatal: the instance is already usable.
        try:
            instance_details = ec2_client.describe_instances(InstanceIds=[instance_id])
            instance = instance_details['Reservations'][0]['Instances'][0]
            public_ip = instance.get('PublicIpAddress', '')
            if public_ip:
                vnc_url = f"http://{public_ip}:5910/vnc.html"
                logger.info("="*80)
                logger.info(f"🖥️ VNC Web Access URL: {vnc_url}")
                logger.info(f"📡 Public IP: {public_ip}")
                logger.info(f"🆔 Instance ID: {instance_id}")
                logger.info("="*80)
                # Console output (Chinese): "VNC access URL" and
                # "open the address above in a browser for remote desktop access".
                print(f"\n🌐 VNC访问地址: {vnc_url}")
                print(f"📍 请在浏览器中打开上述地址进行远程桌面访问\n")
        except Exception as e:
            logger.warning(f"Failed to get VNC address for instance {instance_id}: {e}")
    except KeyboardInterrupt:
        logger.warning("VM allocation interrupted by user (SIGINT).")
        _terminate_instance("interruption")
        raise
    except SystemExit:
        logger.warning("VM allocation interrupted by SIGTERM.")
        _terminate_instance("SIGTERM")
        raise
    except Exception as e:
        logger.error(f"Failed to allocate VM: {e}", exc_info=True)
        _terminate_instance("an error")
        raise
    finally:
        if install_handlers:
            _restore_signal_handlers()
    return instance_id
def _allocate_vm_with_proxy(region=DEFAULT_REGION, proxy_config_file=None):
    """Allocate a VM with proxy configuration.

    Falls back to ``_allocate_vm`` when the proxy modules could not be imported.
    Otherwise, the proxy pool is (re)initialized when ``proxy_config_file`` is
    given. The instance is then created through ``AWSProviderWithProxy``, and
    its VNC web URL is logged and printed on a best-effort basis.

    Args:
        region: AWS region to launch in; must be a key of ``IMAGE_ID_MAP``.
        proxy_config_file: Optional proxy configuration path, passed to both
            ``init_proxy_pool`` and ``AWSProviderWithProxy``.

    Returns:
        The instance ID returned by ``create_instance_with_proxy``, or by
        ``_allocate_vm`` in the fallback case.

    Raises:
        ValueError: If ``region`` is not supported.
    """
    if not PROXY_SUPPORT_AVAILABLE:
        logger.warning("Proxy support not available, falling back to regular VM allocation")
        return _allocate_vm(region)

    # Validate before touching the proxy pool or building the provider. This
    # matches _allocate_vm's error; otherwise an unsupported region surfaced
    # only later, as a bare KeyError on IMAGE_ID_MAP[region].
    if region not in IMAGE_ID_MAP:
        raise ValueError(f"Region {region} is not supported. Supported regions are: {list(IMAGE_ID_MAP.keys())}")

    from desktop_env.providers.aws.provider_with_proxy import AWSProviderWithProxy

    # NOTE(review): AWSVMManager.__init__ may already have initialized the pool
    # with the same file; confirm init_proxy_pool is safe to call again on
    # every allocation (e.g. does not reset rotation state).
    if proxy_config_file:
        init_proxy_pool(proxy_config_file)

    # Take the next proxy from the global pool, used here for logging.
    # NOTE(review): whether AWSProviderWithProxy routes through this same proxy
    # is not visible from this module — confirm.
    proxy_pool = get_global_proxy_pool()
    current_proxy = proxy_pool.get_next_proxy()
    if current_proxy:
        logger.info(f"Allocating VM with proxy: {current_proxy.host}:{current_proxy.port}")

    provider = AWSProviderWithProxy(region=region, proxy_config_file=proxy_config_file)
    instance_id = provider.create_instance_with_proxy(
        image_id=IMAGE_ID_MAP[region],
        instance_type=INSTANCE_TYPE,
        security_groups=[os.getenv('AWS_SECURITY_GROUP_ID')],
        subnet_id=os.getenv('AWS_SUBNET_ID')
    )

    # Best-effort: show the VNC web URL. This presumably relies on
    # create_instance_with_proxy having waited for a public IP (TODO confirm);
    # if none is assigned yet, nothing is printed.
    try:
        ec2_client = boto3.client('ec2', region_name=region)
        instance_details = ec2_client.describe_instances(InstanceIds=[instance_id])
        instance = instance_details['Reservations'][0]['Instances'][0]
        public_ip = instance.get('PublicIpAddress', '')
        if public_ip:
            vnc_url = f"http://{public_ip}:5910/vnc.html"
            logger.info("="*80)
            logger.info(f"🖥️ VNC Web Access URL: {vnc_url}")
            logger.info(f"📡 Public IP: {public_ip}")
            logger.info(f"🆔 Instance ID: {instance_id}")
            if current_proxy:
                logger.info(f"🌐 Proxy: {current_proxy.host}:{current_proxy.port}")
            logger.info("="*80)
            print(f"\n🌐 VNC Web Access URL: {vnc_url}")
            if current_proxy:
                print(f"🔄 Current Proxy: {current_proxy.host}:{current_proxy.port}")
            print(f"📍 Please open the above address in the browser for remote desktop access\n")
    except Exception as e:
        logger.warning(f"Failed to get VNC address for proxy instance {instance_id}: {e}")
    return instance_id
class AWSVMManager(VMManager):
    """
    VM manager backed by AWS EC2.

    No registry of machines is kept: every call to ``get_vm_path`` launches a
    fresh EC2 instance, so the registry-related hooks of ``VMManager`` are
    no-ops here. When a proxy configuration file is supplied, instances are
    launched through the proxy-aware allocator instead of the plain one.
    """

    def __init__(self, proxy_config_file=None, **kwargs):
        """
        Args:
            proxy_config_file: Optional proxy pool configuration path. When given
                and the proxy modules are importable, the global proxy pool is
                initialized right away.
            **kwargs: Ignored.
        """
        self.proxy_config_file = proxy_config_file
        self.initialize_registry()

        proxy_requested = bool(proxy_config_file)
        if not (proxy_requested and PROXY_SUPPORT_AVAILABLE):
            return
        init_proxy_pool(proxy_config_file)
        logger.info(f"Proxy pool initialized with config: {proxy_config_file}")

    def initialize_registry(self, **kwargs):
        """No-op: EC2 instances are not tracked in a local registry."""

    def add_vm(self, vm_path, region=DEFAULT_REGION, lock_needed=True, **kwargs):
        """No-op: instances are created on demand by ``get_vm_path``."""

    def _add_vm(self, vm_path, region=DEFAULT_REGION):
        """No-op."""

    def delete_vm(self, vm_path, region=DEFAULT_REGION, lock_needed=True, **kwargs):
        """No-op: there is no registry entry to remove."""

    def _delete_vm(self, vm_path, region=DEFAULT_REGION):
        """No-op."""

    def occupy_vm(self, vm_path, pid, region=DEFAULT_REGION, lock_needed=True, **kwargs):
        """No-op: each instance belongs to the caller that requested it."""

    def _occupy_vm(self, vm_path, pid, region=DEFAULT_REGION):
        """No-op."""

    def check_and_clean(self, lock_needed=True, **kwargs):
        """No-op: nothing is tracked, so there is nothing to clean."""

    def _check_and_clean(self):
        """No-op."""

    def list_free_vms(self, region=DEFAULT_REGION, lock_needed=True, **kwargs):
        """No-op: free instances are not pooled; always returns ``None``."""

    def _list_free_vms(self, region=DEFAULT_REGION):
        """No-op."""

    def get_vm_path(self, region=DEFAULT_REGION, **kwargs):
        """Launch a new EC2 instance in ``region`` and return its instance ID.

        Uses the proxy-aware allocator when a proxy configuration file was
        given to the constructor, and the plain allocator otherwise.
        """
        if not self.proxy_config_file:
            logger.info(f"Allocating a new VM in region: {region}")
            return _allocate_vm(region)
        logger.info(f"Allocating a new VM with proxy configuration in region: {region}")
        return _allocate_vm_with_proxy(region, self.proxy_config_file)