From 492c910e944165ce9079aeba3456b2ba070f8d15 Mon Sep 17 00:00:00 2001 From: Timothyxxx Date: Mon, 18 Aug 2025 17:57:31 +0000 Subject: [PATCH] Refactor AWS scheduler role handling in scheduler_utils.py - Improved error handling and logging for role resolution and creation. - Added checks to ensure the trust policy allows for AWS EventBridge Scheduler to assume the role. - Implemented retry logic for scheduling EC2 termination to handle IAM eventual consistency. - Maintained existing code logic while enhancing robustness and clarity in role management. --- desktop_env/providers/aws/scheduler_utils.py | 146 ++++++++++++------- 1 file changed, 96 insertions(+), 50 deletions(-) diff --git a/desktop_env/providers/aws/scheduler_utils.py b/desktop_env/providers/aws/scheduler_utils.py index 0471157..63ec7a1 100644 --- a/desktop_env/providers/aws/scheduler_utils.py +++ b/desktop_env/providers/aws/scheduler_utils.py @@ -20,15 +20,12 @@ def _resolve_scheduler_role_arn(logger) -> str: derived_arn = f"arn:aws:iam::{account_id}:role/{role_name}" iam = boto3.client('iam') try: - iam.get_role(RoleName=role_name) - logger.info(f"Derived AWS_SCHEDULER_ROLE_ARN={derived_arn} from role name '{role_name}'") - return derived_arn - except ClientError as e: + role = iam.get_role(RoleName=role_name)["Role"] + except ClientError: auto_create = os.getenv('AWS_AUTO_CREATE_SCHEDULER_ROLE', 'true').lower() == 'true' if not auto_create: - logger.warning(f"Scheduler role '{role_name}' not found and auto-create disabled: {e}") + logger.warning(f"Scheduler role '{role_name}' not found and auto-create disabled.") return '' - # Attempt to create role try: trust_policy = { "Version": "2012-10-17", @@ -40,33 +37,68 @@ def _resolve_scheduler_role_arn(logger) -> str: } ] } - iam.create_role( - RoleName=role_name, - AssumeRolePolicyDocument=json.dumps(trust_policy) - ) - # Attach minimal inline policy - inline_policy = { - "Version": "2012-10-17", - "Statement": [ - { - "Effect": "Allow", - "Action": ["ec2:TerminateInstances", "ec2:DescribeInstances"], - "Resource": "*" - } - ] - } - iam.put_role_policy( - RoleName=role_name, - PolicyName=f"{role_name}-inline", - PolicyDocument=json.dumps(inline_policy) - ) - # Small wait for IAM propagation - time.sleep(3) - logger.info(f"Auto-created scheduler role '{role_name}'. Using {derived_arn}") - return derived_arn + iam.create_role(RoleName=role_name, AssumeRolePolicyDocument=json.dumps(trust_policy)) + role = iam.get_role(RoleName=role_name)["Role"] except ClientError as ce: - logger.warning(f"Failed to auto-create scheduler role '{role_name}': {ce}") - return '' + # If another process created it, fetch again + try: + role = iam.get_role(RoleName=role_name)["Role"] + except ClientError: + logger.warning(f"Failed to auto-create scheduler role '{role_name}': {ce}") + return '' + + # Ensure trust policy allows scheduler.amazonaws.com + assume_doc = role.get("AssumeRolePolicyDocument", {}) + principal_ok = False + try: + for stmt in assume_doc.get("Statement", []): + principal = stmt.get("Principal", {}) + svc = principal.get("Service") + if isinstance(svc, str) and svc == "scheduler.amazonaws.com": + principal_ok = True + break + if isinstance(svc, list) and "scheduler.amazonaws.com" in svc: + principal_ok = True + break + except Exception: + principal_ok = False + if not principal_ok: + trust_policy = { + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Principal": {"Service": "scheduler.amazonaws.com"}, + "Action": "sts:AssumeRole" + } + ] + } + iam.update_assume_role_policy(RoleName=role_name, PolicyDocument=json.dumps(trust_policy)) + + # Ensure minimal inline policy exists + inline_name = f"{role_name}-inline" + need_policy = False + try: + iam.get_role_policy(RoleName=role_name, PolicyName=inline_name) + except ClientError: + need_policy = True + if need_policy: + inline_policy = { + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Action": ["ec2:TerminateInstances", "ec2:DescribeInstances"], + "Resource": "*" + } + ] + } + iam.put_role_policy(RoleName=role_name, PolicyName=inline_name, PolicyDocument=json.dumps(inline_policy)) + + # Wait for IAM propagation + time.sleep(8) + logger.info(f"Derived AWS_SCHEDULER_ROLE_ARN={derived_arn} from role name '{role_name}'") + return derived_arn except Exception as e: logger.warning(f"Failed to resolve Scheduler Role ARN: {e}") return '' @@ -81,27 +113,41 @@ def schedule_instance_termination(region: str, instance_id: str, ttl_seconds: in scheduler_client = boto3.client('scheduler', region_name=region) schedule_name = f"osworld-ttl-{instance_id}-{int(time.time())}" eta_scheduler = datetime.now(timezone.utc) + timedelta(seconds=ttl_seconds) - # EventBridge Scheduler expects RFC3339 without trailing Z for 'at()' when region-local is fine schedule_expression = f"at({eta_scheduler.strftime('%Y-%m-%dT%H:%M:%S')})" target_arn = "arn:aws:scheduler:::aws-sdk:ec2:terminateInstances" input_payload = '{"InstanceIds":["' + instance_id + '"]}' - scheduler_client.create_schedule( - Name=schedule_name, - ScheduleExpression=schedule_expression, - FlexibleTimeWindow={"Mode": "OFF"}, - ActionAfterCompletion='DELETE', - Target={ - "Arn": target_arn, - "RoleArn": role_arn, - "Input": input_payload - }, - State='ENABLED', - Description=f"OSWorld TTL terminate for {instance_id}" - ) - - logger.info( - f"Scheduled EC2 termination via EventBridge Scheduler: name={schedule_name}, when={eta_scheduler.isoformat()} (UTC)" - ) + # Retry to tolerate IAM eventual consistency + last_err = None + for attempt in range(1, 7): # ~ up to ~60s + try: + scheduler_client.create_schedule( + Name=schedule_name, + ScheduleExpression=schedule_expression, + FlexibleTimeWindow={"Mode": "OFF"}, + ActionAfterCompletion='DELETE', + Target={ + "Arn": target_arn, + "RoleArn": role_arn, + "Input": input_payload + }, + State='ENABLED', + Description=f"OSWorld TTL terminate for {instance_id}" + ) + logger.info(f"Scheduled EC2 termination via EventBridge Scheduler: name={schedule_name}, when={eta_scheduler.isoformat()} (UTC)") + last_err = None + break + except ClientError as e: + last_err = e + code = e.response.get('Error', {}).get('Code') + msg = e.response.get('Error', {}).get('Message', '') + if code == 'ValidationException' and 'must allow AWS EventBridge Scheduler to assume the role' in msg: + time.sleep(10) + continue + else: + raise + if last_err is not None: + # If we exhausted retries, re-raise to surface warning upstream + raise last_err