Environment is_used flag; OS domain fix (#219)

* Refactor evaluator structure in LibreOffice Writer example JSON to support multiple expected and result files, enhancing evaluation flexibility.

* Update instance type to t3.large and add VNC access URL logging for allocated VMs, enhancing remote access capabilities.

* Update instance type to t3.large and add VNC access URL logging for allocated VMs, enhancing remote access capabilities.

* Update time format in get_vm_file function to include hours, minutes, and seconds for more precise file naming with time suffix.

* More delay for 936321ce-5236-426a-9a20-e0e3c5dc536f; support one more potential solutions.

* Enhance SetupController with configurable retry limit and improved error handling for file opening requests. Introduce new function to compare unique training records, and update logging for better debugging. Adjust JSON examples for evaluation to support multiple expected and result files.

* Clean debug code

* Enhance DesktopEnv to track environment usage for optimized snapshot management. Introduce is_environment_used flag to determine if a snapshot revert is necessary based on provider type. Update setup and step methods to mark environment usage appropriately. Add new execute_with_verification method in SetupController for command execution with result verification, improving reliability. Change AWS instance type to m5.large for better performance and update AMI ID for compatibility. Update file opening logic in main.py to handle both file paths and application commands more effectively.

---------

Co-authored-by: yuanmengqi <yuanmengqi@mail.ustc.edu.cn>
This commit is contained in:
Tianbao Xie
2025-06-28 00:45:53 +08:00
committed by GitHub
parent 48ac57697a
commit 0cc93543a8
5 changed files with 224 additions and 21 deletions

View File

@@ -305,6 +305,57 @@ class SetupController:
if not terminates:
time.sleep(0.3)
def _execute_with_verification_setup(
self,
command: List[str],
verification: Dict[str, Any] = None,
max_wait_time: int = 10,
check_interval: float = 1.0,
shell: bool = False
):
"""Execute command with verification of results
Args:
command: Command to execute
verification: Dict with verification criteria:
- window_exists: Check if window with this name exists
- command_success: Execute this command and check if it succeeds
max_wait_time: Maximum time to wait for verification
check_interval: Time between verification checks
shell: Whether to use shell
"""
if not command:
raise Exception("Empty command to launch.")
verification = verification or {}
payload = json.dumps({
"command": command,
"shell": shell,
"verification": verification,
"max_wait_time": max_wait_time,
"check_interval": check_interval
})
headers = {"Content-Type": "application/json"}
try:
response = requests.post(self.http_server + "/setup" + "/execute_with_verification",
headers=headers, data=payload, timeout=max_wait_time + 10)
if response.status_code == 200:
result = response.json()
logger.info("Command executed and verified successfully: %s -> %s"
, " ".join(command) if isinstance(command, list) else command
, response.text
)
return result
else:
logger.error("Failed to execute with verification. Status code: %s", response.text)
raise Exception(f"Command verification failed: {response.text}")
except requests.exceptions.RequestException as e:
logger.error("An error occurred while trying to send the request: %s", e)
traceback.print_exc()
raise Exception(f"Request failed: {e}")
def _command_setup(self, command: List[str], **kwargs):
self._execute_setup(command, **kwargs)

View File

@@ -18,7 +18,7 @@ logger = logging.getLogger("desktopenv.env")
Metric = Callable[[Any, Any], float]
Getter = Callable[[gym.Env, Dict[str, Any]], Any]
MAX_RETRIES = 5
MAX_RETRIES = 5 # Maximum retries for environment setup
class DesktopEnv(gym.Env):
"""
@@ -72,6 +72,16 @@ class DesktopEnv(gym.Env):
self.os_type = os_type
# Track whether environment has been used (step/setup) to optimize snapshot revert
# docker, aws, gcp, azure are always unused as the emulator starts from a clean state
# vmware, virtualbox are always used as the emulator starts from a dirty state
if self.provider_name in {"docker", "aws", "gcp", "azure"}:
self.is_environment_used = False
elif self.provider_name in {"vmware", "virtualbox"}:
self.is_environment_used = True
else:
raise ValueError(f"Invalid provider name: {self.provider_name}")
# Initialize environment variables
if path_to_vm:
self.path_to_vm = os.path.abspath(os.path.expandvars(os.path.expanduser(path_to_vm))) \
@@ -190,11 +200,19 @@ class DesktopEnv(gym.Env):
logger.info("Using regular AWS provider.")
logger.info("Reverting to snapshot to {}...".format(self.snapshot_name))
self._revert_to_snapshot()
logger.info("Starting emulator...")
self._start_emulator()
logger.info("Emulator started.")
# Only revert to snapshot if environment has been used (step/setup)
# This optimization is especially important for cloud providers like AWS
# where unnecessary snapshot operations are costly and time-consuming
if self.is_environment_used:
logger.info("Environment has been used, reverting to snapshot {}...".format(self.snapshot_name))
self._revert_to_snapshot()
logger.info("Starting emulator...")
self._start_emulator()
logger.info("Emulator started.")
# Reset the usage flag after reverting
self.is_environment_used = False
else:
logger.info("Environment is clean, skipping snapshot revert (provider: {}).".format(self.provider_name))
if task_config is not None:
self._set_task_info(task_config)
@@ -202,6 +220,9 @@ class DesktopEnv(gym.Env):
logger.info("Setting up environment...")
success = self.setup_controller.setup(self.config)
if success:
# Mark environment as used when setup is successfully executed
if self.config: # Only mark as used if there were actual setup operations
self.is_environment_used = True
break
else:
logger.error(
@@ -300,6 +321,9 @@ class DesktopEnv(gym.Env):
def step(self, action, pause=2):
self._step_no += 1
self.action_history.append(action)
# Mark environment as used when step is called
self.is_environment_used = True
reward = 0 # todo: Define reward calculation for each example
done = False # todo: Define episode termination condition for each example
@@ -336,7 +360,11 @@ class DesktopEnv(gym.Env):
Evaluate whether the task is successfully completed.
"""
self.setup_controller.setup(self.evaluator.get("postconfig", []))
postconfig = self.evaluator.get("postconfig", [])
self.setup_controller.setup(postconfig)
# Mark environment as used if there were postconfig setup operations
if postconfig:
self.is_environment_used = True
if self.evaluator['func'] == "infeasible":
if len(self.action_history) > 0 and self.action_history[-1] == "FAIL":

View File

@@ -6,7 +6,8 @@ import logging
import dotenv
import signal
INSTANCE_TYPE = "t3.medium"
# Using m5.large for better storage I/O performance to match io2 capabilities
INSTANCE_TYPE = "m5.large"
# Load environment variables from .env file
dotenv.load_dotenv()

View File

@@ -117,6 +117,113 @@ def execute_command():
}), 500
@app.route('/setup/execute_with_verification', methods=['POST'])
@app.route('/execute_with_verification', methods=['POST'])
def execute_command_with_verification():
"""Execute command and verify the result based on provided verification criteria"""
data = request.json
shell = data.get('shell', False)
command = data.get('command', "" if shell else [])
verification = data.get('verification', {})
max_wait_time = data.get('max_wait_time', 10) # Maximum wait time in seconds
check_interval = data.get('check_interval', 1) # Check interval in seconds
if isinstance(command, str) and not shell:
command = shlex.split(command)
# Expand user directory
for i, arg in enumerate(command):
if arg.startswith("~/"):
command[i] = os.path.expanduser(arg)
# Execute the main command
try:
if platform_name == "Windows":
flags = subprocess.CREATE_NO_WINDOW
else:
flags = 0
result = subprocess.run(
command,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
shell=shell,
text=True,
timeout=120,
creationflags=flags,
)
# If no verification is needed, return immediately
if not verification:
return jsonify({
'status': 'success',
'output': result.stdout,
'error': result.stderr,
'returncode': result.returncode
})
# Wait and verify the result
import time
start_time = time.time()
while time.time() - start_time < max_wait_time:
verification_passed = True
# Check window existence if specified
if 'window_exists' in verification:
window_name = verification['window_exists']
try:
if platform_name == 'Linux':
wmctrl_result = subprocess.run(['wmctrl', '-l'],
capture_output=True, text=True, check=True)
if window_name.lower() not in wmctrl_result.stdout.lower():
verification_passed = False
elif platform_name in ['Windows', 'Darwin']:
import pygetwindow as gw
windows = gw.getWindowsWithTitle(window_name)
if not windows:
verification_passed = False
except Exception:
verification_passed = False
# Check command execution if specified
if 'command_success' in verification:
verify_cmd = verification['command_success']
try:
verify_result = subprocess.run(verify_cmd, shell=True,
capture_output=True, text=True, timeout=5)
if verify_result.returncode != 0:
verification_passed = False
except Exception:
verification_passed = False
if verification_passed:
return jsonify({
'status': 'success',
'output': result.stdout,
'error': result.stderr,
'returncode': result.returncode,
'verification': 'passed',
'wait_time': time.time() - start_time
})
time.sleep(check_interval)
# Verification failed
return jsonify({
'status': 'verification_failed',
'output': result.stdout,
'error': result.stderr,
'returncode': result.returncode,
'verification': 'failed',
'wait_time': max_wait_time
}), 500
except Exception as e:
return jsonify({
'status': 'error',
'message': str(e)
}), 500
def _get_machine_architecture() -> str:
""" Get the machine architecture, e.g., x86_64, arm64, aarch64, i386, etc.
"""
@@ -1129,20 +1236,36 @@ def open_file():
path_obj = Path(os.path.expandvars(os.path.expanduser(path)))
if not path_obj.exists():
return f"File not found: {path_obj}", 404
# Check if it's a file path that exists
is_file_path = path_obj.exists()
# If it's not a file path, treat it as an application name/command
if not is_file_path:
# Check if it's a valid command by trying to find it in PATH
import shutil
if not shutil.which(path):
return f"Application/file not found: {path}", 404
try:
if platform.system() == "Windows":
os.startfile(path_obj)
if is_file_path:
# Handle file opening
if platform.system() == "Windows":
os.startfile(path_obj)
else:
open_cmd: str = "open" if platform.system() == "Darwin" else "xdg-open"
subprocess.Popen([open_cmd, str(path_obj)])
file_name = path_obj.name
file_name_without_ext, _ = os.path.splitext(file_name)
else:
open_cmd: str = "open" if platform.system() == "Darwin" else "xdg-open"
subprocess.Popen([open_cmd, str(path_obj)])
# Handle application launching
if platform.system() == "Windows":
subprocess.Popen([path])
else:
subprocess.Popen([path])
file_name = path
file_name_without_ext = path
# Wait for the file to open
file_name = path_obj.name
# Some apps don't include the extension in the title
file_name_without_ext, _ = os.path.splitext(file_name)
# Wait for the file/application to open
start_time = time.time()
window_found = False
@@ -1365,7 +1488,7 @@ def end_recording():
error_output = ""
try:
# Send SIGINT for a graceful shutdown, allowing ffmpeg to finalize the file.
recording_process.send_signal(signal.SIGINT)
recording_process.send_signal(signal.SIGINT)
# Wait for ffmpeg to terminate. communicate() gets output and waits.
_, error_output = recording_process.communicate(timeout=15)
except subprocess.TimeoutExpired:
@@ -1373,7 +1496,7 @@ def end_recording():
recording_process.kill()
# After killing, communicate to get any remaining output.
_, error_output = recording_process.communicate()
recording_process = None
recording_process = None
return jsonify({
'status': 'error',
'message': f'Recording process was unresponsive and had to be killed. Stderr: {error_output}'

View File

@@ -50,7 +50,7 @@
"type": "rule",
"rules": {
"include": [
"54\n"
"54"
],
"exclude": []
}