feat: Add Aliyun provider support for desktop environment (#304)
* Add support for Aliyun as a provider
* feat: enhance Aliyun provider support
  - Added Aliyun as a new provider in the desktop environment.
  - Updated the environment configuration guidelines for Aliyun, including prerequisites and environment variables.
  - Implemented instance allocation and management functions for Aliyun ECS, including signal handling for graceful termination.
  - Improved logging and error handling during instance creation and status checks.
  - Adjusted the provider's methods to utilize the new instance management functions.
This commit is contained in:
289
desktop_env/providers/aliyun/manager.py
Normal file
289
desktop_env/providers/aliyun/manager.py
Normal file
@@ -0,0 +1,289 @@
|
||||
import os
|
||||
import logging
|
||||
import dotenv
|
||||
import time
|
||||
import signal
|
||||
import requests
|
||||
|
||||
from alibabacloud_ecs20140526.client import Client as ECSClient
|
||||
from alibabacloud_tea_openapi import models as open_api_models
|
||||
from alibabacloud_ecs20140526 import models as ecs_models
|
||||
from alibabacloud_tea_util.client import Client as UtilClient
|
||||
from desktop_env.providers.base import VMManager
|
||||
|
||||
|
||||
# Merge settings from a local .env file (if one exists) into the process
# environment before anything below reads it.
dotenv.load_dotenv()

# Every variable listed here must be present and non-empty; otherwise the
# module refuses to import and names the first missing variable.
for env_name in (
    "ALIYUN_REGION",
    "ALIYUN_VSWITCH_ID",
    "ALIYUN_SECURITY_GROUP_ID",
    "ALIYUN_IMAGE_ID",
    "ALIYUN_ACCESS_KEY_ID",
    "ALIYUN_ACCESS_KEY_SECRET",
    "ALIYUN_INSTANCE_TYPE",
):
    if os.getenv(env_name):
        continue
    raise EnvironmentError(f"{env_name} must be set in the environment variables.")

logger = logging.getLogger("desktopenv.providers.aliyun.AliyunVMManager")
logger.setLevel(logging.INFO)

# Snapshot of the validated settings, taken once at import time.
ALIYUN_REGION = os.environ["ALIYUN_REGION"]
ALIYUN_VSWITCH_ID = os.environ["ALIYUN_VSWITCH_ID"]
ALIYUN_SECURITY_GROUP_ID = os.environ["ALIYUN_SECURITY_GROUP_ID"]
ALIYUN_IMAGE_ID = os.environ["ALIYUN_IMAGE_ID"]
ALIYUN_ACCESS_KEY_ID = os.environ["ALIYUN_ACCESS_KEY_ID"]
ALIYUN_ACCESS_KEY_SECRET = os.environ["ALIYUN_ACCESS_KEY_SECRET"]
ALIYUN_INSTANCE_TYPE = os.environ["ALIYUN_INSTANCE_TYPE"]

# Polling parameters shared by the wait helpers in this module:
# seconds slept between polls, and the default number of polls.
WAIT_DELAY = 20
MAX_ATTEMPTS = 15
|
||||
|
||||
|
||||
def _delete_instance_best_effort(client, instance_id, failure_prefix):
    """Force-delete one ECS instance without ever raising.

    Args:
        client: ECS client used to send the delete request.
        instance_id: ID of the instance to delete.
        failure_prefix: Text placed before the error detail when the
            deletion fails and is logged.

    Returns:
        True if the delete call completed without raising, False otherwise.
    """
    try:
        delete_request = ecs_models.DeleteInstancesRequest(
            region_id=ALIYUN_REGION,
            # DeleteInstances takes a repeated ``InstanceId.N`` parameter,
            # which the SDK request model exposes as the list field
            # ``instance_id`` (unlike DescribeInstances, whose ``instance_ids``
            # is a JSON-encoded string).
            instance_id=[instance_id],
            force=True,
        )
        client.delete_instances(delete_request)
        return True
    except Exception as cleanup_error:
        logger.error(f"{failure_prefix}: {str(cleanup_error)}")
        return False


def _allocate_vm(screen_size=(1920, 1080)):
    """
    Allocate a new Aliyun ECS instance and wait until it is running.

    While the allocation is in progress, temporary SIGINT/SIGTERM handlers
    delete a half-created instance before the process is interrupted. The
    previous handlers are restored before returning or raising.

    Args:
        screen_size: Requested screen size; only (1920, 1080) is accepted.

    Returns:
        The ID of the newly created instance.

    Raises:
        AssertionError: If ``screen_size`` is not (1920, 1080).
        RuntimeError: If the create call returns no instance ID.
        KeyboardInterrupt: If SIGINT arrives during allocation.
        SystemExit: If SIGTERM arrives during allocation (exit code 0).
        Exception: Any other error from the SDK or from waiting for the
            instance is logged and re-raised after a best-effort cleanup.
    """
    import sys
    import threading

    assert screen_size == (1920, 1080), "Only 1920x1080 screen size is supported"

    config = open_api_models.Config(
        access_key_id=ALIYUN_ACCESS_KEY_ID,
        access_key_secret=ALIYUN_ACCESS_KEY_SECRET,
        region_id=ALIYUN_REGION,
    )
    client = ECSClient(config)
    instance_id = None
    original_sigint_handler = signal.getsignal(signal.SIGINT)
    original_sigterm_handler = signal.getsignal(signal.SIGTERM)
    # signal.signal() raises ValueError outside the main thread, so handlers
    # are only installed (and later restored) when running on it.
    on_main_thread = threading.current_thread() is threading.main_thread()

    def restore_signal_handlers():
        if not on_main_thread:
            return
        for signum, previous in (
            (signal.SIGINT, original_sigint_handler),
            (signal.SIGTERM, original_sigterm_handler),
        ):
            # getsignal() returns None for handlers not installed from
            # Python; those cannot be passed back to signal.signal().
            if previous is not None:
                signal.signal(signum, previous)

    def signal_handler(sig, frame):
        nonlocal instance_id
        if instance_id:
            signal_name = "SIGINT" if sig == signal.SIGINT else "SIGTERM"
            logger.warning(
                f"Received {signal_name} signal, terminating instance {instance_id}..."
            )
            if _delete_instance_best_effort(
                client,
                instance_id,
                f"Failed to terminate instance {instance_id} after {signal_name}",
            ):
                logger.info(
                    f"Successfully terminated instance {instance_id} after {signal_name}."
                )
                # Forget the ID so the KeyboardInterrupt branch below does
                # not try to delete the same instance a second time.
                instance_id = None

        # Restore original signal handlers
        restore_signal_handlers()

        # Raise appropriate exception based on signal type
        if sig == signal.SIGINT:
            raise KeyboardInterrupt
        # For SIGTERM, exit gracefully
        sys.exit(0)

    try:
        # Set up signal handlers for both SIGINT and SIGTERM
        if on_main_thread:
            signal.signal(signal.SIGINT, signal_handler)
            signal.signal(signal.SIGTERM, signal_handler)

        logger.info(
            f"Creating new ECS instance in region {ALIYUN_REGION} with image {ALIYUN_IMAGE_ID}"
        )

        # Create instance request
        request = ecs_models.RunInstancesRequest(
            region_id=ALIYUN_REGION,
            image_id=ALIYUN_IMAGE_ID,
            instance_type=ALIYUN_INSTANCE_TYPE,
            security_group_id=ALIYUN_SECURITY_GROUP_ID,
            v_switch_id=ALIYUN_VSWITCH_ID,
            instance_name=f"OSWorld-Desktop-{int(time.time())}",
            description="OSWorld Desktop Environment Instance",
            internet_max_bandwidth_out=10,
            internet_charge_type="PayByTraffic",
            instance_charge_type="PostPaid",
            system_disk=ecs_models.RunInstancesRequestSystemDisk(
                size="50",
                category="cloud_essd",
            ),
            deletion_protection=False,
        )

        # Create the instance
        response = client.run_instances(request)
        instance_ids = response.body.instance_id_sets.instance_id_set

        if not instance_ids:
            raise RuntimeError(
                "Failed to create ECS instance - no instance ID returned"
            )

        instance_id = instance_ids[0]
        logger.info(f"ECS instance {instance_id} created successfully")

        # Wait for the instance to be running
        logger.info(f"Waiting for instance {instance_id} to be running...")
        _wait_for_instance_running(client, instance_id)

        logger.info(f"Instance {instance_id} is now running and ready")

    except KeyboardInterrupt:
        logger.warning("VM allocation interrupted by user (SIGINT).")
        # instance_id is already cleared if the signal handler deleted it.
        if instance_id:
            logger.info(f"Terminating instance {instance_id} due to interruption.")
            _delete_instance_best_effort(
                client, instance_id, f"Failed to cleanup instance {instance_id}"
            )
        raise
    except Exception as e:
        logger.error(f"Failed to allocate ECS instance: {str(e)}")
        if instance_id:
            logger.info(f"Terminating instance {instance_id} due to an error.")
            _delete_instance_best_effort(
                client, instance_id, f"Failed to cleanup instance {instance_id}"
            )
        raise
    finally:
        # Restore original signal handlers
        restore_signal_handlers()

    return instance_id
|
||||
|
||||
|
||||
def _wait_for_instance_running(
    client: ECSClient, instance_id: str, max_attempts: int = MAX_ATTEMPTS
):
    """Poll an ECS instance until it reports the ``Running`` status.

    Each poll describes the instance. If it is ``Stopped`` or ``Stopping``,
    a start request is issued. Errors raised while polling are logged as
    warnings and the poll is retried. ``WAIT_DELAY`` seconds are slept
    between polls.

    Args:
        client: ECS client used for the describe/start calls.
        instance_id: ID of the instance to wait for.
        max_attempts: Maximum number of polls before giving up.

    Raises:
        TimeoutError: If the instance is not ``Running`` after
            ``max_attempts`` polls.
    """
    for _ in range(max_attempts):
        try:
            describe_req = ecs_models.DescribeInstancesRequest(
                region_id=ALIYUN_REGION,
                instance_ids=UtilClient.to_jsonstring([instance_id]),
            )
            matches = client.describe_instances(describe_req).body.instances.instance

            if matches:
                status = matches[0].status
                logger.info(f"Instance {instance_id} status: {status}")

                if status == "Running":
                    return
                if status in ("Stopped", "Stopping"):
                    client.start_instance(
                        ecs_models.StartInstanceRequest(instance_id=instance_id)
                    )
                    logger.info(f"Started instance {instance_id}")
        except Exception as e:
            logger.warning(f"Error checking instance status: {e}")

        # Reached after every poll that did not return, success or error.
        time.sleep(WAIT_DELAY)

    raise TimeoutError(
        f"Instance {instance_id} did not reach Running state within {max_attempts * WAIT_DELAY} seconds"
    )
|
||||
|
||||
|
||||
def _wait_until_server_ready(public_ip: str):
    """Wait until the HTTP server on port 5000 of ``public_ip`` answers.

    The server counts as ready once ``GET /`` returns HTTP 404.
    NOTE(review): presumably the in-VM server has no route for ``/``, so a
    404 proves it is up -- confirm against the server implementation.

    Up to ``MAX_ATTEMPTS`` probes are made, ``WAIT_DELAY`` seconds apart.

    Args:
        public_ip: Address of the machine to probe.

    Raises:
        TimeoutError: If no probe returned 404 within ``MAX_ATTEMPTS`` tries.
    """
    for _ in range(MAX_ATTEMPTS):
        try:
            logger.info(f"Checking server status on {public_ip}...")
            response = requests.get(f"http://{public_ip}:5000/", timeout=2)
            if response.status_code == 404:
                logger.info(f"Server {public_ip} is ready")
                return
            logger.info(
                f"Server {public_ip} answered with status {response.status_code}; not ready yet"
            )
        except Exception as probe_error:
            # Connection failures are expected while the machine boots.
            logger.debug(f"Server {public_ip} not reachable yet: {probe_error}")

        # Sleep after every unsuccessful probe, not only after exceptions,
        # so unexpected status codes do not burn all attempts immediately.
        time.sleep(WAIT_DELAY)

    raise TimeoutError(
        f"Server {public_ip} did not respond within {MAX_ATTEMPTS * WAIT_DELAY} seconds"
    )
|
||||
|
||||
|
||||
class AliyunVMManager(VMManager):
    """
    VM manager backed by Aliyun ECS.

    Instances are created on demand, so no local VM registry is maintained.
    Every registry-related method is therefore a no-op that returns None;
    only ``get_vm_path`` does real work.
    """

    def __init__(self, **kwargs):
        self.initialize_registry()

    def initialize_registry(self, **kwargs):
        """No-op: there is no registry to initialize."""

    def add_vm(self, vm_path, lock_needed=True, **kwargs):
        """No-op: VMs are not tracked in a registry."""

    def _add_vm(self, vm_path):
        """No-op: VMs are not tracked in a registry."""

    def delete_vm(self, vm_path, lock_needed=True, **kwargs):
        """No-op: VMs are not tracked in a registry."""

    def _delete_vm(self, vm_path):
        """No-op: VMs are not tracked in a registry."""

    def occupy_vm(self, vm_path, pid, lock_needed=True, **kwargs):
        """No-op: VMs are not tracked in a registry."""

    def _occupy_vm(self, vm_path, pid):
        """No-op: VMs are not tracked in a registry."""

    def check_and_clean(self, lock_needed=True, **kwargs):
        """No-op: there is no registry to clean."""

    def _check_and_clean(self):
        """No-op: there is no registry to clean."""

    def list_free_vms(self, lock_needed=True, **kwargs):
        """No-op: no pool of free VMs is kept; returns None."""

    def _list_free_vms(self):
        """No-op: no pool of free VMs is kept; returns None."""

    def get_vm_path(self, screen_size=(1920, 1080), **kwargs):
        """Allocate a fresh ECS instance and return its instance ID.

        Allocation failures are logged and re-raised unchanged.
        """
        logger.info(
            f"Allocating new ECS instance in region {ALIYUN_REGION} with screen size {screen_size}"
        )

        try:
            new_instance_id = _allocate_vm(screen_size)
        except Exception as e:
            logger.error(f"Failed to allocate instance: {str(e)}")
            raise
        else:
            logger.info(f"Successfully allocated instance {new_instance_id}")
            return new_instance_id
|
||||
Reference in New Issue
Block a user