import os

import autogen
from autogen.coding import LocalCommandLineCodeExecutor

from .constant import config_list, STREAM, SILENT, WORK_DIR, CACHE
from .utils import load_agent_configs, retrieval_from_knowledge_base, web_searcher, retrieval_from_graphrag

# Agent prompts are loaded from the YAML file located next to this module.
_CONFIG_PATH = os.path.join(
    os.path.dirname(os.path.abspath(__file__)),
    "config/retrieval_group.yaml",
)
agent_configs = load_agent_configs(_CONFIG_PATH)

llm_config = {"config_list": config_list, "stream": STREAM, "cache_seed": CACHE}


def init_retrieval_group(work_dir, venv_context):
    """Assemble the retrieval group chat and return its two entry agents.

    Three searcher/executor pairs are created (vector store, GraphRAG and
    web search), plus a web-result summarizer and an outer admin agent.
    All eight agents are placed in one ``autogen.GroupChat`` whose speaker
    order is decided by the nested ``state_transition`` function.

    Args:
        work_dir: Working directory handed to every command-line code
            executor and to the outer admin's ``code_execution_config``.
        venv_context: Virtual-environment context passed to every
            ``LocalCommandLineCodeExecutor`` as ``virtual_env_context``.

    Returns:
        A ``(inner_retrieval_admin, outer_retrieval_agent)`` tuple: the
        ``GroupChatManager`` that drives the group chat, and the
        ``ConversableAgent`` named ``"Outer_Retrieval_Admin"``.
    """

    def build_search_pair(searcher_name, executor_name, tool, reply_cap=None):
        """Create one (searcher, code executor) agent pair around ``tool``.

        ``reply_cap`` is forwarded as ``max_consecutive_auto_reply`` to both
        agents; when it is ``None`` the argument is omitted entirely so the
        library default applies.
        """
        extra = {} if reply_cap is None else {"max_consecutive_auto_reply": reply_cap}

        executor = LocalCommandLineCodeExecutor(
            work_dir=work_dir,
            virtual_env_context=venv_context,
            functions=[tool],
        )
        runner = autogen.UserProxyAgent(
            name=executor_name,
            description=executor_name,
            human_input_mode="NEVER",
            llm_config=False,
            code_execution_config={"executor": executor},
            **extra,
        )

        # The configured system prompt is extended with the executor's own
        # description of the functions it exposes.
        base_prompt = agent_configs[searcher_name]["system_message"]
        prompt = base_prompt + executor.format_functions_for_prompt()
        searcher = autogen.AssistantAgent(
            name=searcher_name,
            description=searcher_name,
            human_input_mode="NEVER",
            system_message=prompt,
            llm_config={"config_list": config_list},
            **extra,
        )
        return searcher, runner

    # Vector (knowledge base) retrieval.
    vector_searcher, vector_code_executor = build_search_pair(
        "vector_searcher",
        "vector_code_executor",
        retrieval_from_knowledge_base,
        reply_cap=1,
    )

    # GraphRAG (knowledge graph) retrieval.
    graphrag_searcher, graphrag_code_executor = build_search_pair(
        "graphrag_searcher",
        "graphrag_code_executor",
        retrieval_from_graphrag,
        reply_cap=1,
    )

    # Web search (with jina reader); this pair has no explicit reply cap.
    # The local is deliberately not called ``web_searcher``: that name is the
    # imported tool function used on the next lines.
    web_search_agent, web_code_executor = build_search_pair(
        "web_searcher",
        "web_code_executor",
        web_searcher,
    )

    # Condenses the raw web search output after the web executor has run.
    web_summary = autogen.AssistantAgent(
        name="web_summary",
        description="web_summary",
        human_input_mode="NEVER",
        system_message=(
            "You are a web summary agent. You are given a task and a web search result. "
            "You need to summarize the web search result to answer the user's request. "
            "DONOT ANSWER THE USER'S REQUEST DIRECTLY."
        ),
        llm_config={"config_list": config_list},
    )

    outer_retrieval_agent = autogen.ConversableAgent(
        name="Outer_Retrieval_Admin",
        description="Outer_Retrieval_Admin",
        human_input_mode="NEVER",
        code_execution_config={
            "work_dir": work_dir,
            "use_docker": False,
        },
    )

    # Unconditional hand-offs as (speaker that just finished, next speaker).
    # Compared by identity, exactly like an ``is`` chain.
    fixed_handoffs = (
        (outer_retrieval_agent, vector_searcher),
        # vector retrieval
        (vector_searcher, vector_code_executor),
        # knowledge-graph retrieval
        (graphrag_searcher, graphrag_code_executor),
        (graphrag_code_executor, outer_retrieval_agent),
        # web search path
        (web_search_agent, web_code_executor),
        (web_code_executor, web_summary),
        (web_summary, outer_retrieval_agent),
    )

    def state_transition(last_speaker, groupchat):
        """Choose the next speaker from whoever spoke last.

        Returns the next agent, the string ``'auto'`` when the vector
        executor's latest message contains ``"NULL"``, or ``None`` when
        ``last_speaker`` is not one of the known agents.
        """
        if last_speaker is vector_code_executor:
            latest = groupchat.messages[-1]["content"]
            # "NULL" in the executor output is treated as an empty vector
            # result; the original comment states the intent is to try a web
            # search next, but the code only defers to 'auto' selection.
            return 'auto' if "NULL" in latest else outer_retrieval_agent

        for speaker, successor in fixed_handoffs:
            if last_speaker is speaker:
                return successor
        return None

    retrieval_group = autogen.GroupChat(
        agents=[
            outer_retrieval_agent,
            vector_searcher,
            vector_code_executor,
            graphrag_searcher,
            graphrag_code_executor,
            web_search_agent,
            web_code_executor,
            web_summary,
        ],
        messages=[],
        speaker_selection_method=state_transition,
        allow_repeat_speaker=False,
        max_round=8,
    )

    inner_retrieval_admin = autogen.GroupChatManager(
        name="Inner_Retrieval_Admin",
        description="Inner_Retrieval_Admin",
        groupchat=retrieval_group,
        llm_config={"config_list": config_list},
        human_input_mode="NEVER",
    )

    return inner_retrieval_admin, outer_retrieval_agent