Files
matagent/_backend/main.py
2025-01-10 09:57:42 +08:00

117 lines
5.0 KiB
Python

import asyncio
from typing import Sequence
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.conditions import MaxMessageTermination, TextMentionTermination
from autogen_agentchat.messages import AgentEvent, ChatMessage, TextMessage, ToolCallExecutionEvent
from autogen_agentchat.teams import SelectorGroupChat, RoundRobinGroupChat
from autogen_agentchat.ui import Console
from autogen_ext.models.openai import OpenAIChatCompletionClient
from constant import MODEL, OPENAI_API_KEY, OPENAI_BASE_URL
from tools import retrieval_from_knowledge_base, search_from_oqmd_by_composition
model_client = OpenAIChatCompletionClient(
model=MODEL,
base_url=OPENAI_BASE_URL,
api_key=OPENAI_API_KEY,
model_info={
"vision": True,
"function_calling": True,
"json_output": True,
"family": "unknown",
},
)
def create_team() -> SelectorGroupChat:
planning_agent = AssistantAgent(
"PlanningAgent",
description="An agent for planning tasks, this agent should be the first to engage when given a new task.",
model_client=model_client,
system_message="""
You are a planning agent.
Your job is to break down complex search tasks into smaller, manageable subtasks.
Assign these subtasks to the appropriate team members; not all team members are required to participate in every task.
Your team members are:
Vector search agent: Searches for paper information in Vector database of knowledge base.
OQMD search agent: Searches for crystal structure and property information in OQMD database by composition.
You only plan and delegate tasks - you do not execute them yourself.
When assigning tasks, use this format:
1. <agent> : <task>
After all search tasks are complete, summarize the findings and end with "TERMINATE".
""",
)
vector_search_agent = AssistantAgent(
"VectorSearcher",
description="A vector search agent.",
tools=[retrieval_from_knowledge_base],
model_client=model_client,
system_message="""
You are a vector search agent.
Your only tool is retrieval_from_knowledge_base - use it to find information.
You make only one search call at a time.
Once you have the results, you never do calculations based on them.
""",
reflect_on_tool_use=False, # Set to True to have the model reflect on the tool use, set to False to return the tool call result directly.
)
oqmd_database_search_agent = AssistantAgent(
"OQMDDatabaseSearcher",
description="A database search agent.",
tools=[search_from_oqmd_by_composition],
model_client=model_client,
system_message="""
You are a database search agent of OQMD.
Your only tool is search_from_oqmd_by_composition - use it to find information.
You make only one search call at a time.
Once you have the results, you never do calculations based on them.
""",
reflect_on_tool_use=False, # Set to True to have the model reflect on the tool use, set to False to return the tool call result directly.
)
# The termination condition is a combination of text mention termination and max message termination.
text_mention_termination = TextMentionTermination("TERMINATE")
max_messages_termination = MaxMessageTermination(max_messages=25)
termination = text_mention_termination | max_messages_termination
# The selector function is a function that takes the current message thread of the group chat
# and returns the next speaker's name. If None is returned, the LLM-based selection method will be used.
def selector_func(messages: Sequence[AgentEvent | ChatMessage]) -> str | None:
if messages[-1].source != planning_agent.name:
return planning_agent.name # Always return to the planning agent after the other agents have spoken.
return None
team = SelectorGroupChat(
[planning_agent, vector_search_agent, oqmd_database_search_agent],
model_client=model_client, # Use a smaller model for the selector.
termination_condition=termination,
selector_func=selector_func,
)
return team
async def main(task: str = "") -> dict:
team = create_team()
result = {}
async for message in team.run_stream(task=task):
if isinstance(message, TextMessage):
print(f"----------------{message.source}----------------\n {message.content}")
result[message.source] = message.content
elif isinstance(message, ToolCallExecutionEvent):
print(f"----------------{message.source}----------------\n {message.content}")
result[message.source] = [content.content for content in message.content]
return result
# Example usage in another function
async def main_1():
result = await main("How to synthesis CsPbBr3 nanocubes at room temperature?")
# Now you can use result in main_1
print(result)
if __name__ == "__main__":
asyncio.run(main_1())