sci-gui-agent-benchmark/mm_agents/uipath/agent.py
Commit f59cf00cae by alexandruilie7, 2025-09-24: Add ui agent (#343)
* add uipath agent
* readme update


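"""UiPath computer-use agent: a two-stage predict loop.

The ActionPlanner proposes the next high-level PlanAction from the task,
screenshot, and step history; the GrounderClient then resolves the planner's
natural-language target descriptions into screen coordinates for
pointer-based actions.
"""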
import json

from mm_agents.uipath.types_utils import (
    ComputerUseAction,
    ComputerUseStep,
    SupportedActions,
    PlanActionType,
    PlanAction,
    key_maps,
    ExecutionState,
    State,
)
import mm_agents.uipath.utils as utils
from mm_agents.uipath.action_planner import ActionPlanner, PlannerOutput
from mm_agents.uipath.grounder_client import GrounderClient


class UiPathComputerUseV1:
    """Agent that pairs an action planner with a UI grounding client."""

    def __init__(self):
        self.planner = ActionPlanner()
        self.executor = GrounderClient()

    async def predict_request(
        self, request_body: dict, model_name: str
    ) -> dict:
        state = State(
            task=request_body["userTask"],
            image_base64=request_body["image"],
            previous_steps=request_body.get("previousSteps", []),
        )
        execution_state = ExecutionState(model_name=model_name, execution_info={})
        output = await self.predict(state, execution_state)
        return output
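    # Example request_body shape, inferred from the keys read above (the exact
    # schema is defined by the calling service, so treat this as illustrative):
    #   {
    #       "userTask": "Rename the open document to report.docx",
    #       "image": "<base64-encoded screenshot>",
    #       "previousSteps": [],  # optional; step history from earlier responses
    #   }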

    def process_grounding(
        self,
        plan_action: PlanAction,
        grounding_result: utils.GroundingOutput,
        x: int,
        y: int,
    ) -> ComputerUseAction:
        """Convert a grounded PlanAction into a concrete ComputerUseAction."""
        match plan_action.action_type:
            case PlanActionType.Scroll:
                # Guess the scroll direction if it is missing from the plan output.
                if "direction" not in plan_action.parameters:
                    if "scroll up" in plan_action.description.lower():
                        scroll_direction = "up"
                    else:
                        scroll_direction = "down"
                else:
                    scroll_direction = plan_action.parameters["direction"]
                action = ComputerUseAction(
                    name=SupportedActions.Scroll,
                    description=plan_action.description,
                    parameters={"position": [x, y], "direction": scroll_direction},
                )
                if "distance" in plan_action.parameters:
                    # Translate the scroll distance into an (x, y) offset whose
                    # sign encodes the direction.
                    match scroll_direction:
                        case "up":
                            action.parameters["offset"] = [
                                0,
                                plan_action.parameters["distance"],
                            ]
                        case "down":
                            action.parameters["offset"] = [
                                0,
                                -plan_action.parameters["distance"],
                            ]
                        case "left":
                            action.parameters["offset"] = [
                                plan_action.parameters["distance"],
                                0,
                            ]
                        case "right":
                            action.parameters["offset"] = [
                                -plan_action.parameters["distance"],
                                0,
                            ]
            case PlanActionType.Drag:
                assert grounding_result.end_position is not None, (
                    "End position must be provided for drag action"
                )
                x_end, y_end = grounding_result.end_position
                action = ComputerUseAction(
                    name=SupportedActions.Drag,
                    description=plan_action.description,
                    parameters={
                        "path": [
                            {"x": x, "y": y},
                            {"x": x_end, "y": y_end},
                        ]
                    },
                )
            case _:
                # Click-like actions: map the plan action types onto the
                # supported action names and click modifiers.
                action_name = plan_action.action_type
                parameters = {"position": [x, y]}
                if plan_action.action_type == PlanActionType.DoubleClick:
                    action_name = SupportedActions.Click
                    parameters["click_type"] = "double"
                elif plan_action.action_type == PlanActionType.RightClick:
                    action_name = SupportedActions.Click
                    parameters["button"] = "right"
                elif plan_action.action_type == PlanActionType.MouseMove:
                    action_name = SupportedActions.MouseMove  # different names
                assert action_name in [
                    SupportedActions.Click,
                    SupportedActions.MouseMove,
                ]
                action = ComputerUseAction(
                    name=action_name,
                    description=plan_action.description,
                    parameters=parameters,
                )
        return action
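    # For instance, a plan action of type DoubleClick grounded at (412, 305)
    # comes back as (coordinates made up for illustration):
    #   ComputerUseAction(name=SupportedActions.Click,
    #                     description=plan_action.description,
    #                     parameters={"position": [412, 305], "click_type": "double"})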

    async def predict(
        self, state: State, execution_state: ExecutionState
    ) -> dict:
        """Plan the next step, ground it if needed, and package the response."""
        planner_output: PlannerOutput = self.planner.predict(state, execution_state)
        plan_action = planner_output.plan_action
        action: ComputerUseAction | None = None
        step: ComputerUseStep | None = None
        match plan_action.action_type:
            case PlanActionType.KeyPress:
                # Normalize the planner's space-separated key string into the
                # executor's key names.
                keys = plan_action.parameters["key"].split(" ")
                keys = [key.strip() for key in keys]
                keys = [key_maps.get(key, key) for key in keys]
                action = ComputerUseAction(
                    name=SupportedActions.KeyPress,
                    description=plan_action.description,
                    parameters={"keys": keys},
                )
            case PlanActionType.Wait:
                action = ComputerUseAction(
                    name=SupportedActions.Wait,
                    description=plan_action.description,
                    parameters={},
                )
            case PlanActionType.ExtractData:
                # Return a step with no action, just to store the extracted data.
                step = ComputerUseStep(
                    description=plan_action.description,
                    actions=[],
                    additional_parameters={
                        "extracted_data": plan_action.parameters,
                    },
                    thought=planner_output.thought,
                )
            case PlanActionType.Finish:
                action = ComputerUseAction(
                    name=SupportedActions.Finish,
                    description=plan_action.description,
                    parameters=plan_action.parameters,
                )
            case (
                PlanActionType.Click
                | PlanActionType.MouseMove
                | PlanActionType.Scroll
                | PlanActionType.Drag
                | PlanActionType.DoubleClick
                | PlanActionType.RightClick
            ):
                # Pointer actions need screen coordinates, so ask the grounder
                # to locate the target; drags are grounded twice, once for the
                # start element and once for the end element.
                if plan_action.action_type != PlanActionType.Drag:
                    grounding_result = await self.executor.predict(
                        state.image_base64,
                        plan_action.description,
                        action=plan_action.action_type,
                    )
                else:
                    grounding_result = await self.executor.predict(
                        state.image_base64,
                        plan_action.parameters["start_description"],
                        action=plan_action.action_type,
                    )
                    grounding_result_end = await self.executor.predict(
                        state.image_base64,
                        plan_action.parameters["end_description"],
                        action=plan_action.action_type,
                    )
                    grounding_result.end_position = grounding_result_end.position
                x, y = grounding_result.position
                action = self.process_grounding(plan_action, grounding_result, x, y)
            case PlanActionType.Type:
                action = ComputerUseAction(
                    name=SupportedActions.TypeInto,
                    description=plan_action.description,
                    parameters={"value": plan_action.parameters["text"]},
                )
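        # ExtractData builds its step above without an action; every other
        # branch yields a single action that is wrapped into a step here.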
        if step is None:
            assert action is not None
            step = ComputerUseStep(
                description=plan_action.description,
                actions=[action],
                additional_parameters={},
                thought=planner_output.thought,
            )
        # Save additional data for history.
        assert step.additional_parameters is not None
        step.additional_parameters["thought"] = planner_output.thought
        step.additional_parameters["review"] = planner_output.review
        step.additional_parameters.update(planner_output.additional_sections)
        step.additional_parameters["plan_action"] = json.dumps(plan_action.to_dict())
        history_image = state.image_base64
        previous_steps_parameters = {
            # Caps on the chat history replayed to the planner on the next call.
            "max_chat_history_messages": 1000,
            "max_chat_history_images": self.planner.number_history_steps_with_images,
            "image": history_image,
        }
        agent_response = {
            "step": step.to_response_dict(),
            "previous_steps_parameters": previous_steps_parameters,
        }
        return agent_response
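

# A minimal driver sketch, assuming an asyncio entry point and a base64-encoded
# screenshot; the file name and model name are placeholders, and the surrounding
# harness would normally supply "previousSteps" from earlier responses:
#
#     import asyncio
#     import base64
#
#     async def main():
#         agent = UiPathComputerUseV1()
#         request_body = {
#             "userTask": "Open the Settings app",
#             "image": base64.b64encode(open("screen.png", "rb").read()).decode(),
#             "previousSteps": [],
#         }
#         response = await agent.predict_request(request_body, "<planner-model>")
#         print(response["step"])
#
#     asyncio.run(main())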