# File: sci-gui-agent-benchmark/experiment_screenshot_som.py
#
# 174 lines, 6.1 KiB, Python

import datetime
import json
import logging
import os
import sys
import threading
import time
from desktop_env.envs.desktop_env import DesktopEnv
from mm_agents.gpt_4v_agent import GPT4v_Agent
#  Logger Configs {{{ #
# Configure the root logger: every record (DEBUG and up) reaches the handlers
# below, and each handler then applies its own level / name filter.
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)

datetime_str: str = datetime.datetime.now().strftime("%Y%m%d@%H%M%S")

# logging.FileHandler opens its file right away and fails with
# FileNotFoundError when the parent directory is missing, so make sure the
# log directory exists before any handler is created.
os.makedirs("logs", exist_ok=True)

file_handler = logging.FileHandler(os.path.join("logs", "normal-{:}.log".format(datetime_str)), encoding="utf-8")
debug_handler = logging.FileHandler(os.path.join("logs", "debug-{:}.log".format(datetime_str)), encoding="utf-8")
stdout_handler = logging.StreamHandler(sys.stdout)
sdebug_handler = logging.FileHandler(os.path.join("logs", "sdebug-{:}.log".format(datetime_str)), encoding="utf-8")

# "normal" log file and stdout receive INFO and above; both debug files
# receive everything from DEBUG up.
file_handler.setLevel(logging.INFO)
debug_handler.setLevel(logging.DEBUG)
stdout_handler.setLevel(logging.INFO)
sdebug_handler.setLevel(logging.DEBUG)

# ANSI-coloured prefix: timestamp, level, module/line-process, then the message.
formatter = logging.Formatter(
    fmt="\x1b[1;33m[%(asctime)s \x1b[31m%(levelname)s \x1b[32m%(module)s/%(lineno)d-%(processName)s\x1b[1;33m] \x1b[0m%(message)s")
file_handler.setFormatter(formatter)
debug_handler.setFormatter(formatter)
stdout_handler.setFormatter(formatter)
sdebug_handler.setFormatter(formatter)

# stdout and the "sdebug" file only show records emitted by loggers in the
# "desktopenv" hierarchy; the other two files record every logger.
stdout_handler.addFilter(logging.Filter("desktopenv"))
sdebug_handler.addFilter(logging.Filter("desktopenv"))

logger.addHandler(file_handler)
logger.addHandler(debug_handler)
logger.addHandler(stdout_handler)
logger.addHandler(sdebug_handler)
#  }}} Logger Configs #

# From here on `logger` is this script's own logger; its name is inside the
# "desktopenv" hierarchy so its records pass the filters installed above.
logger = logging.getLogger("desktopenv.experiment")

# Path to the virtual machine definition handed to DesktopEnv as `path_to_vm`.
# NOTE(review): machine-specific absolute Windows path - adjust for your setup.
PATH_TO_VM = r"C:\Users\tianbaox\Documents\Virtual Machines\Ubuntu\Ubuntu.vmx"
def run_one_example(example, agent, max_steps=10, example_trajectory_dir="exp_trajectory", recording=True):
    """Run one example in a desktop environment and record its trajectory.

    A ``DesktopEnv`` is created for ``example`` and reset.  ``agent.predict`` is
    then asked for actions until the environment reports ``done`` or
    ``max_steps`` prediction rounds have been made.  After each executed action
    the screenshot referenced by the observation is copied into
    ``example_trajectory_dir`` and one JSON line describing the step is appended
    to ``trajectory.json`` in that directory.  Finally ``env.evaluate()`` is
    called and its result is appended as a last JSON line.

    Args:
        example: Task configuration passed to ``DesktopEnv`` as ``task_config``.
        agent: Object providing ``action_space`` and ``predict(observation)``;
            ``predict`` must return an iterable of actions.
        max_steps: Maximum number of ``agent.predict`` rounds.
        example_trajectory_dir: Directory receiving the screenshots,
            ``trajectory.json`` and ``recording.mp4``.  It is not created here
            and must already exist.
        recording: When true, a recording is started before the first step and
            ended (saved as ``recording.mp4``) after the last one.
    """
    trajectory_recording_path = os.path.join(example_trajectory_dir, "trajectory.json")

    env = DesktopEnv(
        path_to_vm=PATH_TO_VM,
        action_space=agent.action_space,
        task_config=example
    )
    # reset the environment to certain snapshot
    observation = env.reset()
    done = False
    step_num = 0

    if recording:
        # send a request to the server to start recording
        env.controller.start_recording()

    while not done and step_num < max_steps:
        actions = agent.predict(observation)
        step_num += 1
        for action in actions:
            # Capture the timestamp before executing the action
            action_timestamp = datetime.datetime.now().strftime("%Y%m%d@%H%M%S")
            logger.info("Step %d: %s", step_num, action)

            observation, reward, done, info = env.step(action)

            logger.info("Reward: %.2f", reward)
            logger.info("Done: %s", done)
            logger.info("Info: %s", info)

            # Save screenshot and trajectory information.  The source is read
            # before the destination is opened so that a failed read does not
            # leave an empty .png behind.
            screenshot_file = f"step_{step_num}_{action_timestamp}.png"
            with open(observation['screenshot'], "rb") as src:
                screenshot = src.read()
            with open(os.path.join(example_trajectory_dir, screenshot_file), "wb") as dst:
                dst.write(screenshot)

            with open(trajectory_recording_path, "a") as f:
                f.write(json.dumps({
                    "step_num": step_num,
                    "action_timestamp": action_timestamp,
                    "action": action,
                    "reward": reward,
                    "done": done,
                    "info": info,
                    "screenshot_file": screenshot_file
                }))
                f.write("\n")

            if done:
                logger.info("The episode is done.")
                break

    # Only end a recording that was actually started.
    if recording:
        def stop_recording():
            try:
                env.controller.end_recording(os.path.join(example_trajectory_dir, "recording.mp4"))
            except Exception as e:
                # Best effort: a failed recording must not abort the evaluation.
                logger.error("An error occurred while stopping the recording: %s", e)

        # Pass the function object itself.  Writing `target=stop_recording()`
        # would run it synchronously on this thread and hand `None` to Thread,
        # so the timeout below could never take effect.
        # daemon=True: Python threads cannot be killed, so a hung request is
        # simply abandoned and will not keep the interpreter alive at exit.
        timeout = 60  # seconds
        recording_thread = threading.Thread(target=stop_recording, daemon=True)
        recording_thread.start()
        # join() with a timeout waits until the thread finishes or the timeout
        # elapses, whichever comes first.
        recording_thread.join(timeout=timeout)
        if recording_thread.is_alive():
            logger.warning(
                "Timeout reached after %d seconds while stopping the recording; continuing without it.",
                timeout,
            )

    result = env.evaluate()
    logger.info("Result: %.2f", result)

    with open(trajectory_recording_path, "a") as f:
        f.write(json.dumps({
            "result": result
        }))
        f.write("\n")

    # NOTE(review): env.close() is disabled, so the environment is NOT actually
    # closed here even though the message below says so - confirm intent.
    # env.close()
    logger.info("Environment closed.")
def main(example_class, example_id):
    """Load one example definition and run it with the GPT-4V "som" agent.

    The example is read from
    ``evaluation_examples/examples/<example_class>/<example_id>.json``, its
    ``snapshot`` entry is overridden with ``"exp_chrome"``, and the run is
    executed with at most 15 steps.  Output goes to
    ``exp_trajectory/som/<example_class>/<model>/<example_id>``, which is
    created if necessary.

    Args:
        example_class: Sub-directory of ``evaluation_examples/examples``.
        example_id: File name (without ``.json``) of the example to run.
    """
    action_space = "pyautogui"
    gpt4_model = "gpt-4-vision-preview"
    # Only referenced by the commented-out Gemini alternative below.
    gemini_model = "gemini-pro-vision"

    config_path = f"evaluation_examples/examples/{example_class}/{example_id}.json"
    with open(config_path, "r", encoding="utf-8") as config_file:
        example = json.load(config_file)
    example["snapshot"] = "exp_chrome"

    agent = GPT4v_Agent(
        api_key=os.environ.get("OPENAI_API_KEY"),
        model=gpt4_model,
        instruction=example['instruction'],
        action_space=action_space,
        exp="som",
    )
    # Gemini alternative:
    # api_key = os.environ.get("GENAI_API_KEY")
    # agent = GeminiPro_Agent(api_key=api_key, model=gemini_model, instruction=example['instruction'], action_space=action_space)

    example_trajectory_dir = os.path.join("exp_trajectory", "som", example_class, gpt4_model, example_id)
    # example_trajectory_dir = os.path.join("exp_trajectory", "som", example_class, gemini_model, example_id)
    os.makedirs(example_trajectory_dir, exist_ok=True)

    run_one_example(example, agent, 15, example_trajectory_dir)
if __name__ == '__main__':
    # Example ids to run; the list is empty here, so nothing is executed
    # until ids are filled in.
    xx_list = []

    # Every id is looked up in the "xx" example class.
    for example_id in xx_list:
        main("xx", example_id)