Feat/monitor cache (#267)

* feat&style: add task status configuration and clear cache functionality; enhance UI styles

* feat&refactor: enhance current configuration API and improve cache clearing logic

* refactor&style: simplify task status update logic and improve page refresh mechanism

* refactor&feat: streamline default configuration retrieval and enhance cache initialization logic

* feat&refactor: add caching to default configuration retrieval and streamline task status logic

* feat&style: add collapsible section for additional model parameters and enhance styling for config items

* refactor&style: remove floating action button and clean up related styles
This commit is contained in:
Zilong Zhou
2025-07-18 01:58:20 +08:00
committed by GitHub
parent e70cf0bd93
commit 66694c663d
5 changed files with 450 additions and 293 deletions

View File

@@ -5,10 +5,8 @@ from functools import cache
import os
import json
import time
import subprocess
from datetime import datetime
from pathlib import Path
from flask import Flask, render_template_string, jsonify, send_file, request, render_template
from flask import Flask, jsonify, send_file, request, render_template
from dotenv import load_dotenv
@@ -36,15 +34,11 @@ else:
EXAMPLES_BASE_PATH = os.getenv("EXAMPLES_BASE_PATH", "../evaluation_examples/examples")
RESULTS_BASE_PATH = os.getenv("RESULTS_BASE_PATH", "../results")
ACTION_SPACE=os.getenv("ACTION_SPACE", "pyautogui")
OBSERVATION_TYPE=os.getenv("OBSERVATION_TYPE", "screenshot")
MODEL_NAME=os.getenv("MODEL_NAME", "computer-use-preview")
MAX_STEPS = int(os.getenv("MAX_STEPS", "150"))
def initialize_default_config():
"""Initialize default configuration from the first available config in results directory"""
global ACTION_SPACE, OBSERVATION_TYPE, MODEL_NAME, RESULTS_PATH, MAX_STEPS
@cache
def get_default_config():
"""Get the first available configuration from results directory"""
if os.path.exists(RESULTS_BASE_PATH):
try:
# Scan for the first available configuration
@@ -57,34 +51,38 @@ def initialize_default_config():
for model_name in os.listdir(obs_path):
model_path = os.path.join(obs_path, model_name)
if os.path.isdir(model_path):
# Use the first available configuration as default
ACTION_SPACE = action_space
OBSERVATION_TYPE = obs_type
MODEL_NAME = model_name
RESULTS_PATH = model_path
# Read max_steps from args.json if available
# Get max_steps from args.json if available
model_args = get_model_args(action_space, obs_type, model_name)
max_steps = MAX_STEPS
if model_args and 'max_steps' in model_args:
MAX_STEPS = model_args['max_steps']
max_steps = model_args['max_steps']
print(f"Initialized default config: {ACTION_SPACE}/{OBSERVATION_TYPE}/{MODEL_NAME} (max_steps: {MAX_STEPS})")
return
print(f"Found default config: {action_space}/{obs_type}/{model_name} (max_steps: {max_steps})")
return {
'action_space': action_space,
'observation_type': obs_type,
'model_name': model_name,
'max_steps': max_steps
}
except Exception as e:
print(f"Error scanning results directory for default config: {e}")
# Fallback to original environment-based path if no configs found
RESULTS_PATH = os.path.join(RESULTS_BASE_PATH, ACTION_SPACE, OBSERVATION_TYPE, MODEL_NAME)
print(f"Using fallback config from environment: {ACTION_SPACE}/{OBSERVATION_TYPE}/{MODEL_NAME} (max_steps: {MAX_STEPS})")
# Fallback to environment-based config if no configs found
fallback_config = {
'action_space': os.getenv("ACTION_SPACE", "pyautogui"),
'observation_type': os.getenv("OBSERVATION_TYPE", "screenshot"),
'model_name': os.getenv("MODEL_NAME", "computer-use-preview"),
'max_steps': MAX_STEPS
}
print(f"Using fallback config from environment: {fallback_config['action_space']}/{fallback_config['observation_type']}/{fallback_config['model_name']} (max_steps: {fallback_config['max_steps']})")
return fallback_config
# Initialize default configuration
initialize_default_config()
RESULTS_PATH = os.path.join(RESULTS_BASE_PATH, ACTION_SPACE, OBSERVATION_TYPE, MODEL_NAME)
if RESULTS_PATH not in TASK_STATUS_CACHE:
# Initialize cache for this results path
TASK_STATUS_CACHE[RESULTS_PATH] = {}
def ensure_cache_initialized(action_space, observation_type, model_name):
    """Guarantee that TASK_STATUS_CACHE has a bucket for this configuration.

    The bucket key is the results path built from RESULTS_BASE_PATH and the
    three configuration components. An existing bucket is left untouched; a
    missing one is created as an empty dict.

    Returns:
        The results path used as the cache key.
    """
    bucket_key = os.path.join(RESULTS_BASE_PATH, action_space, observation_type, model_name)
    # setdefault only inserts when the key is absent, so cached entries survive.
    TASK_STATUS_CACHE.setdefault(bucket_key, {})
    return bucket_key
@cache
def load_task_list():
@@ -99,8 +97,16 @@ def get_task_info(task_type, task_id):
return json.load(f)
return None
def get_task_status(task_type, task_id):
result_dir = os.path.join(RESULTS_PATH, task_type, task_id)
def get_task_status_with_config(task_type, task_id, action_space, observation_type, model_name):
results_path = os.path.join(RESULTS_BASE_PATH, action_space, observation_type, model_name)
max_steps = MAX_STEPS
# Get max_steps from args.json if available
model_args = get_model_args(action_space, observation_type, model_name)
if model_args and 'max_steps' in model_args:
max_steps = model_args['max_steps']
result_dir = os.path.join(results_path, task_type, task_id)
if not os.path.exists(result_dir):
return {
@@ -152,7 +158,7 @@ def get_task_status(task_type, task_id):
log_content = f.readlines()
last_response = None
for i, line in enumerate(log_content):
for line in log_content:
# Extract agent responses for each step
if "Responses: [" in line:
response_text = line.split("Responses: [")[1].strip()
@@ -192,7 +198,7 @@ def get_task_status(task_type, task_id):
status = "Done (Message Exit)"
elif log_data.get("exit_condition") and "thought_exit: True" in log_data.get("exit_condition", ""):
status = "Done (Thought Exit)"
elif len(steps) >= MAX_STEPS:
elif len(steps) >= max_steps:
status = "Done (Max Steps)"
else:
status = "Running"
@@ -214,25 +220,41 @@ def get_task_status(task_type, task_id):
return {
"status": status,
"progress": len(steps),
"max_steps": MAX_STEPS,
"max_steps": max_steps,
"last_update": last_update,
"steps": steps,
"log_data": log_data,
"result": result_content
}
def get_task_status_brief(task_type, task_id):
def get_task_status(task_type, task_id):
    """Return the detailed status of a task under the default configuration.

    Legacy wrapper: new code should call get_task_status_with_config directly.
    This function looks up the default configuration and forwards its
    action space, observation type and model name to that function.
    """
    cfg = get_default_config()
    config_args = tuple(cfg[key] for key in ('action_space', 'observation_type', 'model_name'))
    return get_task_status_with_config(task_type, task_id, *config_args)
def get_task_status_brief_with_config(task_type, task_id, action_space, observation_type, model_name):
"""
Get brief status info for a task, without detailed step data, for fast homepage loading.
"""
# Generate cache key based on task type and ID
cache_key = f"{task_type}_{task_id}"
results_path = os.path.join(RESULTS_BASE_PATH, action_space, observation_type, model_name)
max_steps = MAX_STEPS
# Get max_steps from args.json if available
model_args = get_model_args(action_space, observation_type, model_name)
if model_args and 'max_steps' in model_args:
max_steps = model_args['max_steps']
# Generate cache key based on task type, ID, and config
cache_key = f"{task_type}_{task_id}_{action_space}_{observation_type}_{model_name}"
# Check if the status is already cached
current_time = time.time()
last_cache_time = None
if cache_key in TASK_STATUS_CACHE[RESULTS_PATH]:
cached_status, cached_time = TASK_STATUS_CACHE[RESULTS_PATH][cache_key]
if results_path in TASK_STATUS_CACHE and cache_key in TASK_STATUS_CACHE[results_path]:
cached_status, cached_time = TASK_STATUS_CACHE[results_path][cache_key]
last_cache_time = cached_time
# If cached status is "Done", check if it's within the stability period
if cached_status["status"].startswith("Done"):
@@ -247,13 +269,13 @@ def get_task_status_brief(task_type, task_id):
# For non-Done status (like Error), just return from cache
return cached_status
result_dir = os.path.join(RESULTS_PATH, task_type, task_id)
result_dir = os.path.join(results_path, task_type, task_id)
if not os.path.exists(result_dir):
return {
"status": "Not Started",
"progress": 0,
"max_steps": MAX_STEPS,
"max_steps": max_steps,
"last_update": None
}
@@ -265,7 +287,7 @@ def get_task_status_brief(task_type, task_id):
return {
"status": "Preparing",
"progress": 0,
"max_steps": MAX_STEPS,
"max_steps": max_steps,
"last_update": datetime.fromtimestamp(os.path.getmtime(result_dir)).strftime("%Y-%m-%d %H:%M:%S")
}
@@ -296,7 +318,7 @@ def get_task_status_brief(task_type, task_id):
return {
"status": "Initializing",
"progress": 0,
"max_steps": MAX_STEPS,
"max_steps": max_steps,
"last_update": datetime.fromtimestamp(os.path.getmtime(traj_file)).strftime("%Y-%m-%d %H:%M:%S")
}
@@ -311,7 +333,7 @@ def get_task_status_brief(task_type, task_id):
status = "Error"
# If step count reaches max, consider as done
if step_count >= MAX_STEPS:
if step_count >= max_steps:
status = "Done (Max Steps)"
# Quickly check exit condition in log file (only last few lines)
@@ -329,7 +351,7 @@ def get_task_status_brief(task_type, task_id):
pass
# If step count reaches max again (double check)
if step_count >= MAX_STEPS:
if step_count >= max_steps:
status = "Done (Max Steps)"
# Get last update time
@@ -352,18 +374,34 @@ def get_task_status_brief(task_type, task_id):
status_dict = {
"status": status,
"progress": step_count,
"max_steps": MAX_STEPS,
"max_steps": max_steps,
"last_update": last_update,
"result": result_content
}
# Initialize cache for this results path if it doesn't exist
if results_path not in TASK_STATUS_CACHE:
TASK_STATUS_CACHE[results_path] = {}
# Cache the status if it is done or error
if status.startswith("Done") or status == "Error":
current_time = last_cache_time if last_cache_time else current_time
TASK_STATUS_CACHE[RESULTS_PATH][cache_key] = (status_dict, current_time)
TASK_STATUS_CACHE[results_path][cache_key] = (status_dict, current_time)
return status_dict
def get_task_status_brief(task_type, task_id):
    """
    Return the brief status of a task under the default configuration
    (no detailed step data, intended for fast homepage loading).

    Legacy wrapper: new code should call get_task_status_brief_with_config
    directly. This function only resolves the default configuration and
    delegates.
    """
    cfg = get_default_config()
    config_args = tuple(cfg[key] for key in ('action_space', 'observation_type', 'model_name'))
    return get_task_status_brief_with_config(task_type, task_id, *config_args)
def get_all_tasks_status():
task_list = load_task_list()
result = {}
@@ -389,6 +427,59 @@ def get_all_tasks_status():
return result
def get_all_tasks_status_with_config(action_space, observation_type, model_name):
    """Collect the detailed status of every known task for one configuration.

    Returns:
        A dict mapping each task type from load_task_list() to a list of
        entries with the keys "id", "instruction" and "status". The
        instruction comes from the task info when it is available and is a
        placeholder string otherwise.
    """
    statuses_by_type = {}

    for task_type, task_ids in load_task_list().items():
        entries = statuses_by_type[task_type] = []
        for task_id in task_ids:
            info = get_task_info(task_type, task_id)
            status = get_task_status_with_config(task_type, task_id, action_space, observation_type, model_name)

            # Only the instruction text depends on whether task info was found.
            if info:
                instruction = info.get("instruction", "No instruction provided")
            else:
                instruction = "No task info available"

            entries.append({
                "id": task_id,
                "instruction": instruction,
                "status": status
            })

    return statuses_by_type
def get_all_tasks_status_brief_with_config(action_space, observation_type, model_name):
    """
    Collect the brief status (no detailed step data) of every known task for
    one configuration, for fast homepage loading.

    Returns:
        A dict mapping each task type from load_task_list() to a list of
        entries with the keys "id", "instruction" and "status".
    """
    statuses_by_type = {}

    for task_type, task_ids in load_task_list().items():
        entries = statuses_by_type[task_type] = []
        for task_id in task_ids:
            info = get_task_info(task_type, task_id)
            status = get_task_status_brief_with_config(task_type, task_id, action_space, observation_type, model_name)

            # Only the instruction text depends on whether task info was found.
            if info:
                instruction = info.get("instruction", "No instruction provided")
            else:
                instruction = "No task info available"

            entries.append({
                "id": task_id,
                "instruction": instruction,
                "status": status
            })

    return statuses_by_type
def get_all_tasks_status_brief():
"""
Get brief status info for all tasks, without detailed step data, for fast homepage loading.
@@ -423,8 +514,14 @@ def index():
@app.route('/task/<task_type>/<task_id>')
def task_detail(task_type, task_id):
# Get config from URL parameters
default_config = get_default_config()
action_space = request.args.get('action_space', default_config['action_space'])
observation_type = request.args.get('observation_type', default_config['observation_type'])
model_name = request.args.get('model_name', default_config['model_name'])
task_info = get_task_info(task_type, task_id)
task_status = get_task_status(task_type, task_id)
task_status = get_task_status_with_config(task_type, task_id, action_space, observation_type, model_name)
if not task_info:
return "Task not found", 404
@@ -433,22 +530,44 @@ def task_detail(task_type, task_id):
task_id=task_id,
task_type=task_type,
task_info=task_info,
task_status=task_status)
task_status=task_status,
action_space=action_space,
observation_type=observation_type,
model_name=model_name)
@app.route('/api/tasks')
def api_tasks():
"""Task status API"""
return jsonify(get_all_tasks_status())
# Get config from URL parameters
default_config = get_default_config()
action_space = request.args.get('action_space', default_config['action_space'])
observation_type = request.args.get('observation_type', default_config['observation_type'])
model_name = request.args.get('model_name', default_config['model_name'])
return jsonify(get_all_tasks_status_with_config(action_space, observation_type, model_name))
@app.route('/api/tasks/brief')
def api_tasks_brief():
"""Return brief status info for all tasks, without detailed step data, for fast homepage loading."""
return jsonify(get_all_tasks_status_brief())
# Get config from URL parameters
default_config = get_default_config()
action_space = request.args.get('action_space', default_config['action_space'])
observation_type = request.args.get('observation_type', default_config['observation_type'])
model_name = request.args.get('model_name', default_config['model_name'])
return jsonify(get_all_tasks_status_brief_with_config(action_space, observation_type, model_name))
@app.route('/task/<task_type>/<task_id>/screenshot/<path:filename>')
def task_screenshot(task_type, task_id, filename):
"""Get task screenshot"""
screenshot_path = os.path.join(RESULTS_PATH, task_type, task_id, filename)
# Get config from URL parameters
default_config = get_default_config()
action_space = request.args.get('action_space', default_config['action_space'])
observation_type = request.args.get('observation_type', default_config['observation_type'])
model_name = request.args.get('model_name', default_config['model_name'])
results_path = os.path.join(RESULTS_BASE_PATH, action_space, observation_type, model_name)
screenshot_path = os.path.join(results_path, task_type, task_id, filename)
if os.path.exists(screenshot_path):
return send_file(screenshot_path, mimetype='image/png')
else:
@@ -457,7 +576,14 @@ def task_screenshot(task_type, task_id, filename):
@app.route('/task/<task_type>/<task_id>/recording')
def task_recording(task_type, task_id):
"""Get task recording video"""
recording_path = os.path.join(RESULTS_PATH, task_type, task_id, "recording.mp4")
# Get config from URL parameters
default_config = get_default_config()
action_space = request.args.get('action_space', default_config['action_space'])
observation_type = request.args.get('observation_type', default_config['observation_type'])
model_name = request.args.get('model_name', default_config['model_name'])
results_path = os.path.join(RESULTS_BASE_PATH, action_space, observation_type, model_name)
recording_path = os.path.join(results_path, task_type, task_id, "recording.mp4")
if os.path.exists(recording_path):
response = send_file(recording_path, mimetype='video/mp4')
# Add headers to improve mobile compatibility
@@ -471,8 +597,14 @@ def task_recording(task_type, task_id):
@app.route('/api/task/<task_type>/<task_id>')
def api_task_detail(task_type, task_id):
"""Task detail API"""
# Get config from URL parameters
default_config = get_default_config()
action_space = request.args.get('action_space', default_config['action_space'])
observation_type = request.args.get('observation_type', default_config['observation_type'])
model_name = request.args.get('model_name', default_config['model_name'])
task_info = get_task_info(task_type, task_id)
task_status = get_task_status(task_type, task_id)
task_status = get_task_status_with_config(task_type, task_id, action_space, observation_type, model_name)
if not task_info:
return jsonify({"error": "Task does not exist"}), 404
@@ -488,9 +620,9 @@ def api_config():
config_info = {
"task_config_path": TASK_CONFIG_PATH,
"results_base_path": RESULTS_BASE_PATH,
"action_space": ACTION_SPACE,
"observation_type": OBSERVATION_TYPE,
"model_name": MODEL_NAME,
"action_space": get_default_config()['action_space'],
"observation_type": get_default_config()['observation_type'],
"model_name": get_default_config()['model_name'],
"max_steps": MAX_STEPS,
"examples_base_path": EXAMPLES_BASE_PATH
}
@@ -529,16 +661,27 @@ def api_available_configs():
@app.route('/api/current-config')
def api_current_config():
"""Get current configuration including args.json data"""
# Get config from URL parameters or use defaults
default_config = get_default_config()
action_space = request.args.get('action_space', default_config['action_space'])
observation_type = request.args.get('observation_type', default_config['observation_type'])
model_name = request.args.get('model_name', default_config['model_name'])
# Get max_steps from args.json if available
model_args = get_model_args(action_space, observation_type, model_name)
max_steps = MAX_STEPS
if model_args and 'max_steps' in model_args:
max_steps = model_args['max_steps']
config = {
"action_space": ACTION_SPACE,
"observation_type": OBSERVATION_TYPE,
"model_name": MODEL_NAME,
"max_steps": MAX_STEPS,
"results_path": RESULTS_PATH
"action_space": action_space,
"observation_type": observation_type,
"model_name": model_name,
"max_steps": max_steps,
"results_path": os.path.join(RESULTS_BASE_PATH, action_space, observation_type, model_name)
}
# Add model args from args.json
model_args = get_model_args(ACTION_SPACE, OBSERVATION_TYPE, MODEL_NAME)
if model_args:
config["model_args"] = model_args
else:
@@ -546,39 +689,6 @@ def api_current_config():
return jsonify(config)
@app.route('/api/set-config', methods=['POST'])
def api_set_config():
"""Set current configuration"""
global ACTION_SPACE, OBSERVATION_TYPE, MODEL_NAME, RESULTS_PATH, MAX_STEPS
data = request.get_json()
if not data:
return jsonify({"error": "No data provided"}), 400
# Update global variables
ACTION_SPACE = data.get('action_space', ACTION_SPACE)
OBSERVATION_TYPE = data.get('observation_type', OBSERVATION_TYPE)
MODEL_NAME = data.get('model_name', MODEL_NAME)
# Update results path
RESULTS_PATH = os.path.join(RESULTS_BASE_PATH, ACTION_SPACE, OBSERVATION_TYPE, MODEL_NAME)
# Update max_steps from args.json if available
model_args = get_model_args(ACTION_SPACE, OBSERVATION_TYPE, MODEL_NAME)
if model_args and 'max_steps' in model_args:
MAX_STEPS = model_args['max_steps']
if RESULTS_PATH not in TASK_STATUS_CACHE:
# Initialize cache for this results path
TASK_STATUS_CACHE[RESULTS_PATH] = {}
return jsonify({
"action_space": ACTION_SPACE,
"observation_type": OBSERVATION_TYPE,
"model_name": MODEL_NAME,
"max_steps": MAX_STEPS,
"results_path": RESULTS_PATH
})
def get_model_args(action_space, observation_type, model_name):
"""Get model arguments from args.json file"""
@@ -591,6 +701,28 @@ def get_model_args(action_space, observation_type, model_name):
print(f"Error reading args.json: {e}")
return None
@app.route('/api/clear-cache', methods=['POST'])
def api_clear_cache():
    """Empty the task status cache bucket of a single configuration.

    The configuration is read from the ``action_space``, ``observation_type``
    and ``model_name`` query parameters; each missing parameter falls back to
    the corresponding value from get_default_config(). Buckets belonging to
    other configurations are not touched.

    Returns:
        A JSON object whose "message" says whether a bucket was cleared or
        none existed for that configuration.
    """
    defaults = get_default_config()
    action_space, observation_type, model_name = (
        request.args.get(key, defaults[key])
        for key in ('action_space', 'observation_type', 'model_name')
    )

    bucket_key = os.path.join(RESULTS_BASE_PATH, action_space, observation_type, model_name)

    if bucket_key not in TASK_STATUS_CACHE:
        return jsonify({"message": f"No cache found for configuration: {action_space}/{observation_type}/{model_name}"})

    # Clear in place so the (now empty) bucket stays registered for this path.
    TASK_STATUS_CACHE[bucket_key].clear()
    return jsonify({"message": f"Cache cleared for configuration: {action_space}/{observation_type}/{model_name}"})
if __name__ == '__main__':
# Check if necessary directories exist
if not os.path.exists(TASK_CONFIG_PATH):