Refactor AWS scheduler role handling in scheduler_utils.py
- Improved error handling and logging for role resolution and creation. - Added checks to ensure the trust policy allows for AWS EventBridge Scheduler to assume the role. - Implemented retry logic for scheduling EC2 termination to handle IAM eventual consistency. - Maintained existing code logic while enhancing robustness and clarity in role management.
This commit is contained in:
@@ -20,15 +20,12 @@ def _resolve_scheduler_role_arn(logger) -> str:
|
|||||||
derived_arn = f"arn:aws:iam::{account_id}:role/{role_name}"
|
derived_arn = f"arn:aws:iam::{account_id}:role/{role_name}"
|
||||||
iam = boto3.client('iam')
|
iam = boto3.client('iam')
|
||||||
try:
|
try:
|
||||||
iam.get_role(RoleName=role_name)
|
role = iam.get_role(RoleName=role_name)["Role"]
|
||||||
logger.info(f"Derived AWS_SCHEDULER_ROLE_ARN={derived_arn} from role name '{role_name}'")
|
except ClientError:
|
||||||
return derived_arn
|
|
||||||
except ClientError as e:
|
|
||||||
auto_create = os.getenv('AWS_AUTO_CREATE_SCHEDULER_ROLE', 'true').lower() == 'true'
|
auto_create = os.getenv('AWS_AUTO_CREATE_SCHEDULER_ROLE', 'true').lower() == 'true'
|
||||||
if not auto_create:
|
if not auto_create:
|
||||||
logger.warning(f"Scheduler role '{role_name}' not found and auto-create disabled: {e}")
|
logger.warning(f"Scheduler role '{role_name}' not found and auto-create disabled.")
|
||||||
return ''
|
return ''
|
||||||
# Attempt to create role
|
|
||||||
try:
|
try:
|
||||||
trust_policy = {
|
trust_policy = {
|
||||||
"Version": "2012-10-17",
|
"Version": "2012-10-17",
|
||||||
@@ -40,33 +37,68 @@ def _resolve_scheduler_role_arn(logger) -> str:
|
|||||||
}
|
}
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
iam.create_role(
|
iam.create_role(RoleName=role_name, AssumeRolePolicyDocument=json.dumps(trust_policy))
|
||||||
RoleName=role_name,
|
role = iam.get_role(RoleName=role_name)["Role"]
|
||||||
AssumeRolePolicyDocument=json.dumps(trust_policy)
|
|
||||||
)
|
|
||||||
# Attach minimal inline policy
|
|
||||||
inline_policy = {
|
|
||||||
"Version": "2012-10-17",
|
|
||||||
"Statement": [
|
|
||||||
{
|
|
||||||
"Effect": "Allow",
|
|
||||||
"Action": ["ec2:TerminateInstances", "ec2:DescribeInstances"],
|
|
||||||
"Resource": "*"
|
|
||||||
}
|
|
||||||
]
|
|
||||||
}
|
|
||||||
iam.put_role_policy(
|
|
||||||
RoleName=role_name,
|
|
||||||
PolicyName=f"{role_name}-inline",
|
|
||||||
PolicyDocument=json.dumps(inline_policy)
|
|
||||||
)
|
|
||||||
# Small wait for IAM propagation
|
|
||||||
time.sleep(3)
|
|
||||||
logger.info(f"Auto-created scheduler role '{role_name}'. Using {derived_arn}")
|
|
||||||
return derived_arn
|
|
||||||
except ClientError as ce:
|
except ClientError as ce:
|
||||||
logger.warning(f"Failed to auto-create scheduler role '{role_name}': {ce}")
|
# If another process created it, fetch again
|
||||||
return ''
|
try:
|
||||||
|
role = iam.get_role(RoleName=role_name)["Role"]
|
||||||
|
except ClientError:
|
||||||
|
logger.warning(f"Failed to auto-create scheduler role '{role_name}': {ce}")
|
||||||
|
return ''
|
||||||
|
|
||||||
|
# Ensure trust policy allows scheduler.amazonaws.com
|
||||||
|
assume_doc = role.get("AssumeRolePolicyDocument", {})
|
||||||
|
principal_ok = False
|
||||||
|
try:
|
||||||
|
for stmt in assume_doc.get("Statement", []):
|
||||||
|
principal = stmt.get("Principal", {})
|
||||||
|
svc = principal.get("Service")
|
||||||
|
if isinstance(svc, str) and svc == "scheduler.amazonaws.com":
|
||||||
|
principal_ok = True
|
||||||
|
break
|
||||||
|
if isinstance(svc, list) and "scheduler.amazonaws.com" in svc:
|
||||||
|
principal_ok = True
|
||||||
|
break
|
||||||
|
except Exception:
|
||||||
|
principal_ok = False
|
||||||
|
if not principal_ok:
|
||||||
|
trust_policy = {
|
||||||
|
"Version": "2012-10-17",
|
||||||
|
"Statement": [
|
||||||
|
{
|
||||||
|
"Effect": "Allow",
|
||||||
|
"Principal": {"Service": "scheduler.amazonaws.com"},
|
||||||
|
"Action": "sts:AssumeRole"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
iam.update_assume_role_policy(RoleName=role_name, PolicyDocument=json.dumps(trust_policy))
|
||||||
|
|
||||||
|
# Ensure minimal inline policy exists
|
||||||
|
inline_name = f"{role_name}-inline"
|
||||||
|
need_policy = False
|
||||||
|
try:
|
||||||
|
iam.get_role_policy(RoleName=role_name, PolicyName=inline_name)
|
||||||
|
except ClientError:
|
||||||
|
need_policy = True
|
||||||
|
if need_policy:
|
||||||
|
inline_policy = {
|
||||||
|
"Version": "2012-10-17",
|
||||||
|
"Statement": [
|
||||||
|
{
|
||||||
|
"Effect": "Allow",
|
||||||
|
"Action": ["ec2:TerminateInstances", "ec2:DescribeInstances"],
|
||||||
|
"Resource": "*"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
iam.put_role_policy(RoleName=role_name, PolicyName=inline_name, PolicyDocument=json.dumps(inline_policy))
|
||||||
|
|
||||||
|
# Wait for IAM propagation
|
||||||
|
time.sleep(8)
|
||||||
|
logger.info(f"Derived AWS_SCHEDULER_ROLE_ARN={derived_arn} from role name '{role_name}'")
|
||||||
|
return derived_arn
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.warning(f"Failed to resolve Scheduler Role ARN: {e}")
|
logger.warning(f"Failed to resolve Scheduler Role ARN: {e}")
|
||||||
return ''
|
return ''
|
||||||
@@ -81,27 +113,41 @@ def schedule_instance_termination(region: str, instance_id: str, ttl_seconds: in
|
|||||||
scheduler_client = boto3.client('scheduler', region_name=region)
|
scheduler_client = boto3.client('scheduler', region_name=region)
|
||||||
schedule_name = f"osworld-ttl-{instance_id}-{int(time.time())}"
|
schedule_name = f"osworld-ttl-{instance_id}-{int(time.time())}"
|
||||||
eta_scheduler = datetime.now(timezone.utc) + timedelta(seconds=ttl_seconds)
|
eta_scheduler = datetime.now(timezone.utc) + timedelta(seconds=ttl_seconds)
|
||||||
# EventBridge Scheduler expects RFC3339 without trailing Z for 'at()' when region-local is fine
|
|
||||||
schedule_expression = f"at({eta_scheduler.strftime('%Y-%m-%dT%H:%M:%S')})"
|
schedule_expression = f"at({eta_scheduler.strftime('%Y-%m-%dT%H:%M:%S')})"
|
||||||
target_arn = "arn:aws:scheduler:::aws-sdk:ec2:terminateInstances"
|
target_arn = "arn:aws:scheduler:::aws-sdk:ec2:terminateInstances"
|
||||||
input_payload = '{"InstanceIds":["' + instance_id + '"]}'
|
input_payload = '{"InstanceIds":["' + instance_id + '"]}'
|
||||||
|
|
||||||
scheduler_client.create_schedule(
|
# Retry to tolerate IAM eventual consistency
|
||||||
Name=schedule_name,
|
last_err = None
|
||||||
ScheduleExpression=schedule_expression,
|
for attempt in range(1, 7): # ~ up to ~60s
|
||||||
FlexibleTimeWindow={"Mode": "OFF"},
|
try:
|
||||||
ActionAfterCompletion='DELETE',
|
scheduler_client.create_schedule(
|
||||||
Target={
|
Name=schedule_name,
|
||||||
"Arn": target_arn,
|
ScheduleExpression=schedule_expression,
|
||||||
"RoleArn": role_arn,
|
FlexibleTimeWindow={"Mode": "OFF"},
|
||||||
"Input": input_payload
|
ActionAfterCompletion='DELETE',
|
||||||
},
|
Target={
|
||||||
State='ENABLED',
|
"Arn": target_arn,
|
||||||
Description=f"OSWorld TTL terminate for {instance_id}"
|
"RoleArn": role_arn,
|
||||||
)
|
"Input": input_payload
|
||||||
|
},
|
||||||
logger.info(
|
State='ENABLED',
|
||||||
f"Scheduled EC2 termination via EventBridge Scheduler: name={schedule_name}, when={eta_scheduler.isoformat()} (UTC)"
|
Description=f"OSWorld TTL terminate for {instance_id}"
|
||||||
)
|
)
|
||||||
|
logger.info(f"Scheduled EC2 termination via EventBridge Scheduler: name={schedule_name}, when={eta_scheduler.isoformat()} (UTC)")
|
||||||
|
last_err = None
|
||||||
|
break
|
||||||
|
except ClientError as e:
|
||||||
|
last_err = e
|
||||||
|
code = e.response.get('Error', {}).get('Code')
|
||||||
|
msg = e.response.get('Error', {}).get('Message', '')
|
||||||
|
if code == 'ValidationException' and 'must allow AWS EventBridge Scheduler to assume the role' in msg:
|
||||||
|
time.sleep(10)
|
||||||
|
continue
|
||||||
|
else:
|
||||||
|
raise
|
||||||
|
if last_err is not None:
|
||||||
|
# If we exhausted retries, re-raise to surface warning upstream
|
||||||
|
raise last_err
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user