Files
sci-gui-agent-benchmark/mm_agents/os_symphony/utils/loop_detection.py
2025-12-23 14:30:44 +08:00

217 lines
7.1 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import io
import math
import numpy as np
from typing import List, Tuple, Dict, Any, Optional
from PIL import Image, ImageDraw, ImageFont
from rapidfuzz import fuzz
import logging
from mm_agents.os_symphony.agents.memoryer_agent import StepBehavior
logger = logging.getLogger("desktopenv.loop_detection")
def _are_actions_similar(
action1: Dict[str, Any],
action2: Dict[str, Any],
image_width: int,
image_height: int,
relative_coord_threshold: float,
fuzzy_text_threshold: float,
) -> bool:
"""
[Internal Auxiliary] Determine if two actions are similar based on detailed rules.
Args:
action1: The first action.
action2: The second action.
image_width: The width of the screenshot.
image_height: The height of the screenshot.
relative_coord_threshold: A relative distance threshold for coordinate comparison.
fuzzy_text_threshold: A similarity threshold (0-100) for fuzzy text matching.
Returns:
Return True if the actions are similar, otherwise return False.
"""
# ensure same action
if action1.get("function") != action2.get("function"):
return False
func = action1.get("function")
args1 = action1.get("args", {})
args2 = action2.get("args", {})
diagonal = math.sqrt(image_width**2 + image_height**2)
abs_coord_thresh = relative_coord_threshold * diagonal
def are_coords_close(x1, y1, x2, y2):
if None in [x1, y1, x2, y2]: return False
distance = math.sqrt((x1 - x2)**2 + (y1 - y2)**2)
return distance < abs_coord_thresh
if func == "click":
return (
are_coords_close(args1.get("x"), args1.get("y"), args2.get("x"), args2.get("y")) and
args1.get("button") == args2.get("button") and
args1.get("clicks") == args2.get("clicks")
)
elif func == "open":
return args1.get("name") == args2.get("name")
elif func == "type":
if args1.get("x") and args1.get("y") and args2.get("x") and args2.get("y"):
return (
are_coords_close(args1.get("x"), args1.get("y"), args2.get("x"), args2.get("y")) and
args1.get("text") == args2.get("text")
)
else:
return args1.get("text") == args2.get("text")
elif func == "drag":
return (
are_coords_close(args1.get("x1"), args1.get("y1"), args2.get("x1"), args2.get("y1")) and
are_coords_close(args1.get("x2"), args1.get("y2"), args2.get("x2"), args2.get("y2"))
)
elif func == "set_cell_values":
return args1.get("text") == args2.get("text")
elif func == "scroll":
clicks1 = args1.get("clicks", 0)
clicks2 = args2.get("clicks", 0)
if (clicks1 == 0 and clicks2 != 0) or (clicks1 != 0 and clicks2 == 0):
same_direction = False
else:
same_direction = math.copysign(1, clicks1) == math.copysign(1, clicks2)
return (
are_coords_close(args1.get("x"), args1.get("y"), args2.get("x"), args2.get("y")) and
same_direction and
args1.get("shift") == args2.get("shift")
)
elif func == "key":
return args1.get("keys") == args2.get("keys")
elif func == "wait":
return True
elif func in ["call_code_agent", "call_search_agent"]:
query1 = args1.get("query", "")
query2 = args2.get("query", "")
# use Levenshtein distance to calculate fuzzy similarity
query_similarity = fuzz.token_set_ratio(query1, query2)
# print(f'query_sim: {query_similarity}')
return (
query_similarity >= fuzzy_text_threshold and
args1.get("result") == args2.get("result")
)
else:
return False
def _are_steps_similar_optimized(
    step1: StepBehavior,
    step2: StepBehavior,
    idx1: int,
    idx2: int,
    full_trajectory: List[StepBehavior],
    phash_threshold: int,
    ssim_threshold: float,
    # Parameters required for the action comparison
    image_width: int,
    image_height: int,
    relative_coord_threshold: float,
    fuzzy_text_threshold: float,
) -> bool:
    """
    [Internal Auxiliary] Use pre-calculated data to quickly decide whether two steps are similar.

    The checks run in this order and the first failing one ends the comparison:

    1. Both steps must have a ``phash``; the difference ``step1.phash - step2.phash``
       must not exceed ``phash_threshold``.
    2. The SSIM score stored on the later step (``ssim_list`` indexed by the
       earlier step's position) must be at least ``ssim_threshold``.
    3. The two ``action_dict`` entries must be similar per ``_are_actions_similar``.

    Returns:
        True only if every check passes, otherwise False.
    """
    hash_missing = step1.phash is None or step2.phash is None
    if hash_missing or (step1.phash - step2.phash) > phash_threshold:
        return False

    # The later step holds the SSIM scores against all earlier steps.
    older_idx, newer_idx = sorted((idx1, idx2))
    if full_trajectory[newer_idx].ssim_list[older_idx] < ssim_threshold:
        return False

    return bool(
        _are_actions_similar(
            step1.action_dict,
            step2.action_dict,
            image_width=image_width,
            image_height=image_height,
            relative_coord_threshold=relative_coord_threshold,
            fuzzy_text_threshold=fuzzy_text_threshold,
        )
    )
def detect_loop(
    full_trajectory: List[StepBehavior],
    image_width: int = 1920,
    image_height: int = 1080,
    N: int = 3,
    phash_threshold: int = 1,
    ssim_threshold: float = 0.99,
    relative_coord_threshold: float = 0.02,
    fuzzy_text_threshold: float = 85.0,
) -> Tuple[bool, Optional[Dict[str, List[int]]]]:
    """
    Detect whether the last ``N`` steps repeat an earlier run of ``N`` steps.

    The final ``N`` steps of the trajectory are compared, position by position,
    against every earlier non-overlapping window of ``N`` steps, starting with
    the most recent window and moving backwards. The first window in which all
    ``N`` step pairs are similar is reported as a loop.

    Args:
        full_trajectory (List[StepBehavior]): Full history including the current step.
        image_width (int): Width of the screenshot.
        image_height (int): Height of the screenshot.
        N (int): Number of steps in the candidate loop (sequence length).
        phash_threshold (int): Hamming distance threshold for pHash similarity. Recommended: 0-2.
        ssim_threshold (float): SSIM similarity threshold for image comparison. Recommended: 0.95-0.99.
        relative_coord_threshold (float): Relative threshold for coordinate similarity. Recommended: 0.01-0.05.
        fuzzy_text_threshold (float): Fuzzy text matching similarity threshold (0-100) for agent queries.

    Returns:
        A tuple (is_loop_detected, loop_info):
        - is_loop_detected (bool): Whether a loop is detected.
        - loop_info (Dict | None): If a loop is detected, a dict whose
          ``"match_sequence_indices"`` entry lists the indices of the earlier
          matching window; otherwise None.
    """
    total_steps = len(full_trajectory)
    # Need a valid window length and room for two non-overlapping windows.
    if not isinstance(N, int) or N <= 0 or total_steps < 2 * N:
        return False, None

    tail_start = total_steps - N  # first index of the most recent N steps

    # Candidate windows start anywhere in [0, tail_start - N]; newest first.
    for window_start in reversed(range(tail_start - N + 1)):
        window_matches = all(
            _are_steps_similar_optimized(
                full_trajectory[window_start + offset],
                full_trajectory[tail_start + offset],
                window_start + offset,
                tail_start + offset,
                full_trajectory,
                phash_threshold,
                ssim_threshold,
                image_width,
                image_height,
                relative_coord_threshold,
                fuzzy_text_threshold,
            )
            for offset in range(N)
        )
        if window_matches:
            return True, {
                "match_sequence_indices": list(range(window_start, window_start + N))
            }

    return False, None