feat&fix: add signal handling for VM allocation and improve cleanup on termination
This commit is contained in:
@@ -4,6 +4,7 @@ import boto3
|
|||||||
import psutil
|
import psutil
|
||||||
import logging
|
import logging
|
||||||
import dotenv
|
import dotenv
|
||||||
|
import signal
|
||||||
# Load environment variables from .env file
|
# Load environment variables from .env file
|
||||||
dotenv.load_dotenv()
|
dotenv.load_dotenv()
|
||||||
|
|
||||||
@@ -57,22 +58,62 @@ def _allocate_vm(region=DEFAULT_REGION):
|
|||||||
}
|
}
|
||||||
|
|
||||||
ec2_client = boto3.client('ec2', region_name=region)
|
ec2_client = boto3.client('ec2', region_name=region)
|
||||||
|
instance_id = None
|
||||||
|
original_sigint_handler = signal.getsignal(signal.SIGINT)
|
||||||
|
original_sigterm_handler = signal.getsignal(signal.SIGTERM)
|
||||||
|
|
||||||
|
def signal_handler(sig, frame):
    """Terminate the pending EC2 instance, then propagate the signal.

    Installed for SIGINT and SIGTERM while the allocation is in progress.
    Reads ``instance_id``, ``ec2_client``, ``logger`` and the two saved
    ``original_*_handler`` values from the surrounding scope.

    Args:
        sig: Number of the signal being handled.
        frame: Interrupted stack frame (unused).

    Raises:
        KeyboardInterrupt: When ``sig`` is SIGINT.
        SystemExit: With status 0 for any other signal (SIGTERM).
    """
    if instance_id:
        signal_name = "SIGINT" if sig == signal.SIGINT else "SIGTERM"
        logger.warning(f"Received {signal_name} signal, terminating instance {instance_id}...")
        try:
            ec2_client.terminate_instances(InstanceIds=[instance_id])
            logger.info(f"Successfully terminated instance {instance_id} after {signal_name}.")
        except Exception as cleanup_error:
            # Best effort: a failed cleanup is logged, and the signal is
            # still propagated below.
            logger.error(f"Failed to terminate instance {instance_id} after {signal_name}: {str(cleanup_error)}")

    # Restore original signal handlers.
    # signal.getsignal() returns None when the previous handler was not
    # installed from Python, and signal.signal() rejects None with a
    # TypeError. Skip those entries so the TypeError cannot replace the
    # KeyboardInterrupt / SystemExit raised below.
    saved_handlers = (
        (signal.SIGINT, original_sigint_handler),
        (signal.SIGTERM, original_sigterm_handler),
    )
    for signum, previous_handler in saved_handlers:
        if previous_handler is not None:
            signal.signal(signum, previous_handler)

    # Raise appropriate exception based on signal type
    if sig == signal.SIGINT:
        raise KeyboardInterrupt
    else:
        # For SIGTERM, exit gracefully
        import sys
        sys.exit(0)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
# Set up signal handlers for both SIGINT and SIGTERM
|
||||||
|
signal.signal(signal.SIGINT, signal_handler)
|
||||||
|
signal.signal(signal.SIGTERM, signal_handler)
|
||||||
|
|
||||||
response = ec2_client.run_instances(**run_instances_params)
|
response = ec2_client.run_instances(**run_instances_params)
|
||||||
instance_id = response['Instances'][0]['InstanceId']
|
instance_id = response['Instances'][0]['InstanceId']
|
||||||
logger.info(f"Waiting for instance {instance_id} to be running...")
|
logger.info(f"Waiting for instance {instance_id} to be running...")
|
||||||
ec2_client.get_waiter('instance_running').wait(InstanceIds=[instance_id])
|
ec2_client.get_waiter('instance_running').wait(InstanceIds=[instance_id])
|
||||||
logger.info(f"Instance {instance_id} is ready.")
|
logger.info(f"Instance {instance_id} is ready.")
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
logger.warning("VM allocation interrupted by user (SIGINT).")
|
||||||
|
raise
|
||||||
|
except SystemExit:
|
||||||
|
logger.warning("VM allocation terminated by parent process (SIGTERM).")
|
||||||
|
raise
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Failed to allocate VM in region {region}: {str(e)}")
|
logger.error(f"Failed to allocate VM in region {region}: {str(e)}")
|
||||||
# try to clean up any resources that were created
|
# try to clean up any resources that were created
|
||||||
try:
|
try:
|
||||||
if 'InstanceId' in response['Instances'][0]:
|
if instance_id:
|
||||||
ec2_client.terminate_instances(InstanceIds=[instance_id])
|
ec2_client.terminate_instances(InstanceIds=[instance_id])
|
||||||
logger.info(f"Terminated instance {instance_id} due to allocation failure.")
|
logger.info(f"Terminated instance {instance_id} due to allocation failure.")
|
||||||
except Exception as cleanup_error:
|
except Exception as cleanup_error:
|
||||||
logger.error(f"May fail to clean up instance {instance_id}: {str(cleanup_error)}")
|
logger.error(f"May fail to clean up instance {instance_id}: {str(cleanup_error)}")
|
||||||
raise
|
raise
|
||||||
|
finally:
|
||||||
|
# Restore original signal handlers
|
||||||
|
signal.signal(signal.SIGINT, original_sigint_handler)
|
||||||
|
signal.signal(signal.SIGTERM, original_sigterm_handler)
|
||||||
|
|
||||||
return instance_id
|
return instance_id
|
||||||
|
|
||||||
|
|||||||
@@ -161,7 +161,6 @@ def process_signal_handler(signum, frame, env_idx):
|
|||||||
|
|
||||||
# Get the active_environments from the caller's frame
|
# Get the active_environments from the caller's frame
|
||||||
local_vars = frame.f_locals
|
local_vars = frame.f_locals
|
||||||
print(f"Local variables in process {env_idx + 1}: {local_vars}")
|
|
||||||
active_environments = local_vars.get('active_environments', [])
|
active_environments = local_vars.get('active_environments', [])
|
||||||
|
|
||||||
# Close environment in the current process context
|
# Close environment in the current process context
|
||||||
|
|||||||
Reference in New Issue
Block a user