feat: Implement task monitoring web application
This commit is contained in:
189
monitor/main.py
Normal file
189
monitor/main.py
Normal file
@@ -0,0 +1,189 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
import os
|
||||
import json
|
||||
import time
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from flask import Flask, render_template_string, jsonify, send_file, request, render_template
|
||||
from dotenv import load_dotenv
|
||||
|
||||
# Load environment variables from .env file
|
||||
load_dotenv()
|
||||
|
||||
app = Flask(__name__)
|
||||
|
||||
# Load configuration from environment variables
|
||||
TASK_CONFIG_PATH = os.getenv("TASK_CONFIG_PATH", "evaluation_examples/test_small.json")
|
||||
EXAMPLES_BASE_PATH = os.getenv("EXAMPLES_BASE_PATH", "evaluation_examples/examples")
|
||||
RESULTS_BASE_PATH = os.getenv("RESULTS_BASE_PATH", "results_operator_aws/pyautogui/screenshot/computer-use-preview")
|
||||
MAX_STEPS = int(os.getenv("MAX_STEPS", "50"))
|
||||
|
||||
def load_task_list():
|
||||
with open(TASK_CONFIG_PATH, 'r') as f:
|
||||
return json.load(f)
|
||||
|
||||
def get_task_info(task_type, task_id):
|
||||
task_file = os.path.join(EXAMPLES_BASE_PATH, task_type, f"{task_id}.json")
|
||||
if os.path.exists(task_file):
|
||||
with open(task_file, 'r') as f:
|
||||
return json.load(f)
|
||||
return None
|
||||
|
||||
def get_task_status(task_type, task_id):
|
||||
result_dir = os.path.join(RESULTS_BASE_PATH, task_type, task_id)
|
||||
|
||||
if not os.path.exists(result_dir):
|
||||
return {
|
||||
"status": "Not Started",
|
||||
"progress": 0,
|
||||
"total_steps": 0,
|
||||
"last_update": None
|
||||
}
|
||||
|
||||
traj_file = os.path.join(result_dir, "traj.jsonl")
|
||||
log_file = os.path.join(result_dir, "runtime.log")
|
||||
result_file = os.path.join(result_dir, "result.txt")
|
||||
|
||||
if not os.path.exists(traj_file):
|
||||
return {
|
||||
"status": "Preparing",
|
||||
"progress": 0,
|
||||
"total_steps": 0,
|
||||
"last_update": datetime.fromtimestamp(os.path.getmtime(result_dir)).strftime("%Y-%m-%d %H:%M:%S")
|
||||
}
|
||||
|
||||
# read trajectory file
|
||||
steps = []
|
||||
with open(traj_file, 'r') as f:
|
||||
for line in f:
|
||||
if line.strip():
|
||||
steps.append(json.loads(line))
|
||||
|
||||
if not steps:
|
||||
return {
|
||||
"status": "Initializing",
|
||||
"progress": 0,
|
||||
"total_steps": 0,
|
||||
"last_update": datetime.fromtimestamp(os.path.getmtime(traj_file)).strftime("%Y-%m-%d %H:%M:%S")
|
||||
}
|
||||
|
||||
last_step = steps[-1]
|
||||
|
||||
# check if the task is done
|
||||
if last_step.get("done", False):
|
||||
status = "Done"
|
||||
elif last_step.get("Error", False):
|
||||
status = "Error"
|
||||
else:
|
||||
status = "Running"
|
||||
|
||||
# get last action timestamp
|
||||
try:
|
||||
last_update = datetime.strptime(last_step["action_timestamp"], "%Y%m%d@%H%M%S").strftime("%Y-%m-%d %H:%M:%S")
|
||||
except KeyError:
|
||||
last_update = "None"
|
||||
|
||||
result_content = "Task not completed"
|
||||
if status == "Done":
|
||||
if os.path.exists(result_file):
|
||||
with open(result_file, 'r') as f:
|
||||
result_content = f.read().strip()
|
||||
else:
|
||||
result_content = "Result file not found"
|
||||
|
||||
return {
|
||||
"status": status,
|
||||
"progress": len(steps),
|
||||
"max_steps": MAX_STEPS,
|
||||
"last_update": last_update,
|
||||
"steps": steps,
|
||||
"result": result_content
|
||||
}
|
||||
|
||||
def get_all_tasks_status():
|
||||
task_list = load_task_list()
|
||||
result = {}
|
||||
|
||||
for task_type, task_ids in task_list.items():
|
||||
result[task_type] = []
|
||||
for task_id in task_ids:
|
||||
task_info = get_task_info(task_type, task_id)
|
||||
task_status = get_task_status(task_type, task_id)
|
||||
|
||||
if task_info:
|
||||
result[task_type].append({
|
||||
"id": task_id,
|
||||
"instruction": task_info.get("instruction", "No instruction provided"),
|
||||
"status": task_status
|
||||
})
|
||||
else:
|
||||
result[task_type].append({
|
||||
"id": task_id,
|
||||
"instruction": "No task info available",
|
||||
"status": task_status
|
||||
})
|
||||
|
||||
return result
|
||||
|
||||
@app.route('/')
|
||||
def index():
|
||||
return render_template("index.html")
|
||||
|
||||
@app.route('/task/<task_type>/<task_id>')
|
||||
def task_detail(task_type, task_id):
|
||||
task_info = get_task_info(task_type, task_id)
|
||||
task_status = get_task_status(task_type, task_id)
|
||||
|
||||
if not task_info:
|
||||
return "Task not found", 404
|
||||
|
||||
return render_template("task_detail.html",
|
||||
task_id=task_id,
|
||||
task_type=task_type,
|
||||
task_info=task_info,
|
||||
task_status=task_status)
|
||||
|
||||
@app.route('/api/tasks')
|
||||
def api_tasks():
|
||||
"""Task status API"""
|
||||
return jsonify(get_all_tasks_status())
|
||||
|
||||
@app.route('/task/<task_type>/<task_id>/screenshot/<path:filename>')
|
||||
def task_screenshot(task_type, task_id, filename):
|
||||
"""Get task screenshot"""
|
||||
screenshot_path = os.path.join(RESULTS_BASE_PATH, task_type, task_id, filename)
|
||||
if os.path.exists(screenshot_path):
|
||||
return send_file(screenshot_path, mimetype='image/png')
|
||||
else:
|
||||
return "Screenshot does not exist", 404
|
||||
|
||||
@app.route('/api/task/<task_type>/<task_id>')
|
||||
def api_task_detail(task_type, task_id):
|
||||
"""Task detail API"""
|
||||
task_info = get_task_info(task_type, task_id)
|
||||
task_status = get_task_status(task_type, task_id)
|
||||
|
||||
if not task_info:
|
||||
return jsonify({"error": "Task does not exist"}), 404
|
||||
|
||||
return jsonify({
|
||||
"info": task_info,
|
||||
"status": task_status
|
||||
})
|
||||
|
||||
if __name__ == '__main__':
|
||||
# Check if necessary directories exist
|
||||
if not os.path.exists(TASK_CONFIG_PATH):
|
||||
print(f"Warning: Task config file does not exist: {TASK_CONFIG_PATH}")
|
||||
|
||||
if not os.path.exists(EXAMPLES_BASE_PATH):
|
||||
print(f"Warning: Task examples directory does not exist: {EXAMPLES_BASE_PATH}")
|
||||
|
||||
# Start web service
|
||||
host = os.getenv("FLASK_HOST", "0.0.0.0")
|
||||
port = int(os.getenv("FLASK_PORT", "8080"))
|
||||
debug = os.getenv("FLASK_DEBUG", "false").lower() == "true"
|
||||
|
||||
app.run(host=host, port=port, debug=debug)
|
||||
Reference in New Issue
Block a user