Files
sci-gui-agent-benchmark/scripts/guest_scripts/agent_server.py
2026-01-12 18:30:12 +08:00

183 lines
6.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# 运行在 Windows 虚拟机内部
from flask import Flask, request, send_file
import pyautogui
import io
import os
import subprocess
import ctypes
import time
app = Flask(__name__)
# 获取Windows DPI缩放比例
def get_dpi_scale():
"""获取Windows的DPI缩放比例"""
try:
# 获取主显示器的DPI缩放比例
scale_factor = ctypes.windll.shcore.GetScaleFactorForDevice(0) / 100.0
return scale_factor
except:
# 如果获取失败默认返回1.0(无缩放)
return 1.0
# 获取实际屏幕分辨率
def get_screen_size():
"""获取实际屏幕分辨率(物理像素)"""
try:
user32 = ctypes.windll.user32
width = user32.GetSystemMetrics(0) # SM_CXSCREEN
height = user32.GetSystemMetrics(1) # SM_CYSCREEN
return width, height
except:
# 如果获取失败,使用 pyautogui 的方法
return pyautogui.size()
DPI_SCALE = get_dpi_scale()
SCREEN_WIDTH, SCREEN_HEIGHT = get_screen_size()
print(f"检测到DPI缩放比例: {DPI_SCALE}")
print(f"实际屏幕分辨率: {SCREEN_WIDTH} x {SCREEN_HEIGHT}")
# 获取截图分辨率(用于坐标转换)
def get_screenshot_size():
"""获取截图的实际分辨率"""
img = pyautogui.screenshot()
return img.size[0], img.size[1]
SCREENSHOT_WIDTH, SCREENSHOT_HEIGHT = get_screenshot_size()
print(f"截图分辨率: {SCREENSHOT_WIDTH} x {SCREENSHOT_HEIGHT}")
# 1. 获取屏幕截图
@app.route('/screenshot', methods=['GET'])
def screenshot():
img = pyautogui.screenshot()
img_io = io.BytesIO()
img.save(img_io, 'PNG')
img_io.seek(0)
return send_file(img_io, mimetype='image/png')
# 获取分辨率信息(用于调试)
@app.route('/screen_info', methods=['GET'])
def screen_info():
"""返回屏幕和截图的分辨率信息,用于调试坐标转换"""
screenshot_w, screenshot_h = get_screenshot_size()
return {
"screen_width": SCREEN_WIDTH,
"screen_height": SCREEN_HEIGHT,
"screenshot_width": screenshot_w,
"screenshot_height": screenshot_h,
"dpi_scale": DPI_SCALE,
"scale_ratio_x": SCREEN_WIDTH / screenshot_w if screenshot_w > 0 else 1.0,
"scale_ratio_y": SCREEN_HEIGHT / screenshot_h if screenshot_h > 0 else 1.0
}
# 2. 执行动作
@app.route('/action', methods=['POST'])
def action():
data = request.json
try:
if data['type'] == 'click':
# 获取当前截图分辨率(可能每次不同)
screenshot_w, screenshot_h = get_screenshot_size()
# 从截图坐标转换为实际屏幕坐标
# 如果截图分辨率和屏幕分辨率不同,需要按比例缩放
x = data['x']
y = data['y']
# 计算缩放比例
scale_x = SCREEN_WIDTH / screenshot_w if screenshot_w > 0 else 1.0
scale_y = SCREEN_HEIGHT / screenshot_h if screenshot_h > 0 else 1.0
# 应用缩放
actual_x = int(x * scale_x)
actual_y = int(y * scale_y)
print(f"收到坐标: ({x}, {y}) -> 转换后: ({actual_x}, {actual_y}) [缩放比例: {scale_x:.2f}, {scale_y:.2f}]")
pyautogui.click(x=actual_x, y=actual_y)
elif data['type'] == 'type':
pyautogui.write(data['text'])
elif data['type'] == 'hotkey':
pyautogui.hotkey(*data['keys']) # 例如 ['ctrl', 's']
return {"status": "success"}
except Exception as e:
return {"status": "error", "msg": str(e)}
# 获取当前鼠标位置 (用于Host录制辅助)
@app.route('/mouse_pos', methods=['GET'])
def mouse_pos():
"""获取虚拟机当前鼠标位置"""
try:
x, y = pyautogui.position()
return {
"status": "success",
"x": int(x),
"y": int(y),
"timestamp": time.time()
}
except Exception as e:
return {"status": "error", "msg": str(e)}, 500
# 3. [关键!] 初始化环境
@app.route('/reset', methods=['POST'])
def reset():
# 这里可以写简单的逻辑:
# 1. 杀死 Jade 进程
os.system("taskkill /f /im jade.exe")
# 2. 这里的"重置"比快照弱,但对于 M1 调试更方便
# 如果必须用快照,需要在 Step 3 的 Mac 端调用 vmrun
return {"status": "reset_done"}
# 4. 列出桌面文件(用于调试)
@app.route('/list_desktop', methods=['GET'])
def list_desktop():
"""列出桌面上的文件"""
try:
desktop = os.path.expanduser(r"~\Desktop")
if os.path.exists(desktop):
files = os.listdir(desktop)
return {"status": "success", "files": files, "desktop_path": desktop}
else:
return {"status": "error", "msg": "Desktop path not found"}
except Exception as e:
return {"status": "error", "msg": str(e)}
# 5. 下载桌面文件(备用文件收集方式)
@app.route('/download/<filename>', methods=['GET'])
def download_file(filename):
"""
从桌面下载文件
用作vmrun文件传输的备用方案
"""
try:
desktop = os.path.expanduser(r"~\Desktop")
filepath = os.path.join(desktop, filename)
if not os.path.exists(filepath):
return {"status": "error", "msg": f"File not found: {filename}"}, 404
return send_file(filepath, as_attachment=True, download_name=filename)
except Exception as e:
return {"status": "error", "msg": str(e)}, 500
if __name__ == '__main__':
# 监听 0.0.0.0 允许外部访问
print("\n" + "=" * 60)
print("JADE Agent Server 启动")
print("=" * 60)
print(f"监听地址: 0.0.0.0:5000")
print(f"屏幕分辨率: {SCREEN_WIDTH}x{SCREEN_HEIGHT}")
print(f"截图分辨率: {SCREENSHOT_WIDTH}x{SCREENSHOT_HEIGHT}")
print(f"DPI缩放: {DPI_SCALE}")
print("=" * 60)
print("\n可用接口:")
print(" GET /screenshot - 获取屏幕截图")
print(" GET /screen_info - 获取屏幕信息")
print(" POST /action - 执行动作")
print(" POST /reset - 重置环境")
print(" GET /list_desktop - 列出桌面文件")
print(" GET /download/<file> - 下载桌面文件")
print("=" * 60 + "\n")
app.run(host='0.0.0.0', port=5000)