From 87c87aa32827e2ce3c5bc1a4fb9e842268c91058 Mon Sep 17 00:00:00 2001 From: Yutang Li Date: Sat, 4 Jan 2025 19:56:33 +0800 Subject: [PATCH] update --- backend/views.py | 331 +++++++++++++++++++++++++++++++++++++---------- 1 file changed, 262 insertions(+), 69 deletions(-) diff --git a/backend/views.py b/backend/views.py index 48ca2b9..9a9db8e 100644 --- a/backend/views.py +++ b/backend/views.py @@ -10,6 +10,7 @@ from autogen.agentchat.contrib.capabilities.teachability import Teachability from autogen.agentchat.contrib.capabilities.vision_capability import VisionCapability from autogen.agentchat.contrib.multimodal_conversable_agent import MultimodalConversableAgent from pathlib import Path +import websockets import autogen import os from .constant import config_list, STREAM, SILENT, WORK_DIR @@ -32,6 +33,7 @@ from channels.generic.websocket import AsyncWebsocketConsumer venv_context = create_virtual_env(WORK_DIR) llm_config = {"config_list": config_list, "stream": True} +import threading class ChatConsumer(AsyncWebsocketConsumer): @@ -49,84 +51,81 @@ class ChatConsumer(AsyncWebsocketConsumer): await self.handle_websocket_connection(text_data) async def handle_websocket_connection(self, text_data): - # 启动独立的 WebSocket 服务器,使用异步线程避免阻塞 - uri = "ws://localhost:8765" # 示例 URI,可以替换为你的 URI - - # 启动 WebSocket 服务器在后台线程中运行 - await asyncio.to_thread(self.run_websocket_server, uri, text_data) + # 启动 WebSocket 服务器的同步部分在后台线程中 + threading.Thread(target=self.run_websocket_server, args=(text_data,), daemon=True).start() - def run_websocket_server(self, uri, text_data): + def run_websocket_server(self, text_data): """ - 这是一个同步方法,负责启动 IOWebsockets 的服务器并处理消息 - 使用 asyncio.to_thread() 将其异步化 + 启动 IOWebsocket 服务器,处理消息并与客户端进行交互 """ + uri = "ws://localhost:8765" # 你要连接的 WebSocket 服务器 URI + # 运行 WebSocket 服务器并处理消息 + print(f"Starting WebSocket server at {uri}") with IOWebsockets.run_server_in_thread(on_connect=self.handle_message, port=8765) as uri: - print(f" - test_setup() with websocket server running on {uri}.", flush=True) + asyncio.run(self.connect_to_server(uri, text_data)) - with ws_connect(uri) as websocket: - print(f" - Connected to server on {uri}", flush=True) + async def connect_to_server(self, uri, text_data): + # async with ws_connect(uri) as websocket: + async with websockets.connect(uri) as websocket: + print(f" - Connected to server on {uri}", flush=True) - print(" - Sending message to server.", flush=True) - print(text_data) - # json_data = json.loads(text_data) - # chat_id = json_data['chat_id'] - # websocket.send(json_data['message']) - websocket.send(text_data) + print(" - Sending message to server.", flush=True) + print(text_data) + json_data = json.loads(json.loads(text_data)) + chat_id = json_data['chat_id'] + await websocket.send(json_data['message']) - import re - current_agent = "User" - while True: - message = websocket.recv() - message = message.decode("utf-8") if isinstance(message, bytes) else message - print(message, end="", flush=True) + import re + current_agent = "User" + while True: + message = await websocket.recv() + message = message.decode("utf-8") if isinstance(message, bytes) else message + print(message, end="", flush=True) - cleaned_string = re.sub(r'\x1b\[[0-?]*[ -/]*[@-~]', '', message) - # match = re.findall(r"(\w+)\s*\(to\s+(\w+)\)", cleaned_string) - # print(match) - # if len(match)==1: - # current_agent = match[0][0] if current_agent != match[0][0] else current_agent - # print(f"A: {match[0][0]}, B: {match[0][1]}") - if "Next speaker" in cleaned_string: - match = re.search(r"Next\s+speaker:\s+(\w+)", cleaned_string) - current_agent = match.group(1) - else: - if cleaned_string == "\n--------------------------------------------------------------------------------\n": - continue - if cleaned_string == "\n********************************************************************************\n": - continue - if cleaned_string == "Starting a new chat....\n": - continue - if cleaned_string == "\n>>>>>>>> USING AUTO REPLY...\n": - continue - match = re.findall(r"(\w+)\s*\(to\s+(\w+)\)", cleaned_string) - if len(match)==1: - continue + cleaned_string = re.sub(r'\x1b\[[0-?]*[ -/]*[@-~]', '', message) + cleaned_string = cleaned_string.replace('\n>>>>>>>> USING AUTO REPLY...', '').replace('\n>>>>>>>> ', '') + if "Next speaker: User" in cleaned_string: + print('wcnm') - if current_agent in ['Outer_Retrieval_Admin', 'Outer_Generate_Admin', 'Outer_Converter_Admin']: - current_agent = current_agent.replace('Outer_', '') - if current_agent in ['vector_code_executor']: - continue - - if current_agent == 'User': - group_name = 'Planner' - if current_agent in ['vector_searcher','vector_code_executor', 'graphrag_searcher', 'graphrag_code_executor', 'web_searcher', 'web_summary', 'Outer_Retrieval_Admin']: - group_name = 'Retrieval' - if current_agent in ['structure_scientist','property_scientist', 'application_scientist', 'synthesis_scientist', 'scheme_critic', 'Outer_Generate_Admin']: - group_name = 'Generator' - if current_agent in ['scheme_converter','converter_critic', 'mergrid_ploter', 'scheme_code_writer', 'scheme_code_critic', 'Outer_Converter_Admin']: - group_name = 'Converter' - if current_agent in ['experiment_executor','expriment_code_writer', 'data_collector', 'collector_code_writer', 'Outer_Executor_Admin']: - group_name = 'Executor' - if current_agent in ['analysis_executor','analysis_pl_uv', 'analysis_picturer', 'Experiment_Optimizer', 'optimizer_critic', 'Outer_Analysis_Admin']: - group_name = 'Optimizer' + if "Next speaker" in cleaned_string: + match = re.search(r"Next\s+speaker:\s+(\w+)", cleaned_string) + current_agent = match.group(1) + else: + if cleaned_string == "\n--------------------------------------------------------------------------------\n": + continue + if cleaned_string == "\n********************************************************************************\n": + continue + if cleaned_string == "Starting a new chat....\n": + continue + if cleaned_string == "\n>>>>>>>> USING AUTO REPLY...\n": + continue + match = re.findall(r"(\w+)\s*\(to\s+(\w+)\)", cleaned_string) + if len(match)==1: + continue - content = {"group_name": group_name, "agent_name": current_agent.replace("_", " ").title(), "content": cleaned_string.replace('>>>>>>>>', '')} - # 通过 WebSocket 消费者发送消息 - asyncio.run(self.send(text_data=json.dumps(content))) + if current_agent in ['Outer_Retrieval_Admin', 'Outer_Generate_Admin', 'Outer_Converter_Admin']: + current_agent = current_agent.replace('Outer_', '') + if current_agent in ['vector_code_executor']: + continue + + if current_agent == 'User': + group_name = 'Planner' + if current_agent in ['vector_searcher','vector_code_executor', 'graphrag_searcher', 'graphrag_code_executor', 'web_searcher', 'web_summary', 'Outer_Retrieval_Admin']: + group_name = 'Retrieval' + if current_agent in ['structure_scientist','property_scientist', 'application_scientist', 'synthesis_scientist', 'scheme_critic', 'Outer_Generate_Admin']: + group_name = 'Generator' + if current_agent in ['scheme_converter','converter_critic', 'mergrid_ploter', 'scheme_code_writer', 'scheme_code_critic', 'Outer_Converter_Admin']: + group_name = 'Converter' + if current_agent in ['experiment_executor','expriment_code_writer', 'data_collector', 'collector_code_writer', 'Outer_Executor_Admin']: + group_name = 'Executor' + if current_agent in ['analysis_executor','analysis_pl_uv', 'analysis_picturer', 'Experiment_Optimizer', 'optimizer_critic', 'Outer_Analysis_Admin']: + group_name = 'Optimizer' + if 'TERMINATE' in cleaned_string or (group_name != 'Retrieval' and len(cleaned_string) > 100): + continue + + content = {"group_name": group_name, "agent_name": current_agent.replace("_", " ").title(), "content": cleaned_string} + await self.send(text_data=json.dumps(content)) - # if "TERMINATE" in message: - # print(" - Received TERMINATE message. Exiting.", flush=True) - # break def handle_message(self, iostream: IOWebsockets): @@ -136,7 +135,7 @@ class ChatConsumer(AsyncWebsocketConsumer): user = autogen.UserProxyAgent( name="User", chat_messages=None, - human_input_mode="NEVER", + human_input_mode="ALWAYS", code_execution_config={ "work_dir": WORK_DIR, "use_docker": False, @@ -178,7 +177,7 @@ class ChatConsumer(AsyncWebsocketConsumer): optimizer], messages=[], speaker_selection_method=state_transition, - max_round=50, + max_round=60, ) matagent_admin_name = "Planer" @@ -226,6 +225,200 @@ class ChatConsumer(AsyncWebsocketConsumer): cache=False ) + return iostream + + + +# class ChatConsumer(AsyncWebsocketConsumer): +# async def connect(self): +# await self.accept() + +# async def disconnect(self, close_code): +# pass + +# async def receive(self, text_data): +# # 在这里处理接收到的消息 +# print(f"Received message: {text_data}") + +# # 异步启动 WebSocket 连接和处理过程 +# await self.handle_websocket_connection(text_data) + +# async def handle_websocket_connection(self, text_data): +# # 启动独立的 WebSocket 服务器,使用异步线程避免阻塞 +# uri = "ws://localhost:8765" # 示例 URI,可以替换为你的 URI + +# # 启动 WebSocket 服务器在后台线程中运行 +# await asyncio.to_thread(self.run_websocket_server, uri, text_data) + +# def run_websocket_server(self, uri, text_data): +# """ +# 这是一个同步方法,负责启动 IOWebsockets 的服务器并处理消息 +# 使用 asyncio.to_thread() 将其异步化 +# """ +# with IOWebsockets.run_server_in_thread(on_connect=self.handle_message, port=8765) as uri: +# print(f" - test_setup() with websocket server running on {uri}.", flush=True) + +# with ws_connect(uri) as websocket: +# print(f" - Connected to server on {uri}", flush=True) + +# print(" - Sending message to server.", flush=True) +# print(text_data) +# json_data = json.loads(json.loads(text_data)) +# chat_id = json_data['chat_id'] +# websocket.send(json_data['message']) + +# import re +# current_agent = "User" +# while True: +# message = websocket.recv() +# message = message.decode("utf-8") if isinstance(message, bytes) else message +# print(message, end="", flush=True) + +# cleaned_string = re.sub(r'\x1b\[[0-?]*[ -/]*[@-~]', '', message) +# cleaned_string = cleaned_string.replace('\n>>>>>>>> USING AUTO REPLY...', '').replace('\n>>>>>>>> ', '') +# if "Next speaker" in cleaned_string: +# match = re.search(r"Next\s+speaker:\s+(\w+)", cleaned_string) +# current_agent = match.group(1) +# else: +# if cleaned_string == "\n--------------------------------------------------------------------------------\n": +# continue +# if cleaned_string == "\n********************************************************************************\n": +# continue +# if cleaned_string == "Starting a new chat....\n": +# continue +# if cleaned_string == "\n>>>>>>>> USING AUTO REPLY...\n": +# continue +# match = re.findall(r"(\w+)\s*\(to\s+(\w+)\)", cleaned_string) +# if len(match)==1: +# continue + +# if current_agent in ['Outer_Retrieval_Admin', 'Outer_Generate_Admin', 'Outer_Converter_Admin']: +# current_agent = current_agent.replace('Outer_', '') +# if current_agent in ['vector_code_executor']: +# continue + +# if current_agent == 'User': +# group_name = 'Planner' +# if current_agent in ['vector_searcher','vector_code_executor', 'graphrag_searcher', 'graphrag_code_executor', 'web_searcher', 'web_summary', 'Outer_Retrieval_Admin']: +# group_name = 'Retrieval' +# if current_agent in ['structure_scientist','property_scientist', 'application_scientist', 'synthesis_scientist', 'scheme_critic', 'Outer_Generate_Admin']: +# group_name = 'Generator' +# if current_agent in ['scheme_converter','converter_critic', 'mergrid_ploter', 'scheme_code_writer', 'scheme_code_critic', 'Outer_Converter_Admin']: +# group_name = 'Converter' +# if current_agent in ['experiment_executor','expriment_code_writer', 'data_collector', 'collector_code_writer', 'Outer_Executor_Admin']: +# group_name = 'Executor' +# if current_agent in ['analysis_executor','analysis_pl_uv', 'analysis_picturer', 'Experiment_Optimizer', 'optimizer_critic', 'Outer_Analysis_Admin']: +# group_name = 'Optimizer' +# if 'TERMINATE' in cleaned_string or (group_name != 'Retrieval' and len(cleaned_string) > 100): +# continue + +# content = {"group_name": group_name, "agent_name": current_agent.replace("_", " ").title(), "content": cleaned_string} + +# # 通过 WebSocket 消费者发送消息 +# asyncio.run(self.send(text_data=json.dumps(content))) + +# # if "TERMINATE" in message: +# # print(" - Received TERMINATE message. Exiting.", flush=True) +# # break + + +# def handle_message(self, iostream: IOWebsockets): +# initial_msg = iostream.input() +# agent_configs = load_agent_configs( +# os.path.join(os.path.dirname(os.path.abspath(__file__)), "config/plan_group.yaml")) +# user = autogen.UserProxyAgent( +# name="User", +# chat_messages=None, +# human_input_mode="NEVER", +# code_execution_config={ +# "work_dir": WORK_DIR, +# "use_docker": False, +# }, +# is_termination_msg=lambda x: x.get("content", "").find("TERMINATE") >= 0, +# description="User", +# ) + +# inner_retrieval_admin, outer_retrieval_agent = init_retrieval_group(WORK_DIR, venv_context) +# inner_generate_admin, outer_generate_agent = init_generate_group(outer_retrieval_agent, inner_retrieval_admin) +# inner_converter_admin, outer_converter_agent = init_converter_group() +# inner_executor_admin, outer_executor_agent = init_executor_group(WORK_DIR, venv_context) +# inner_analysis_admin, outer_analysis_agent, optimizer = init_optimize_group(WORK_DIR, venv_context) + +# def state_transition(last_speaker, groupchat): +# messages = groupchat.messages + +# if last_speaker is user: +# if len(messages) <= 1: +# return outer_generate_agent +# else: +# return "auto" +# elif last_speaker is outer_generate_agent: +# if "synthesis" in messages[-1]["content"].lower(): +# return outer_converter_agent +# else: +# return user +# elif last_speaker is outer_converter_agent: +# return outer_executor_agent +# elif last_speaker is outer_executor_agent: +# return outer_analysis_agent +# elif last_speaker is outer_analysis_agent: +# return optimizer +# else: +# return user + +# matagent_group = autogen.GroupChat( +# agents=[user, outer_generate_agent, outer_converter_agent, outer_executor_agent, outer_analysis_agent, +# optimizer], +# messages=[], +# speaker_selection_method=state_transition, +# max_round=50, +# ) + +# matagent_admin_name = "Planer" +# matagent_admin = autogen.GroupChatManager( +# name=matagent_admin_name, +# groupchat=matagent_group, +# # is_termination_msg=lambda x: x.get("content", "").find("TERMINATE") >= 0, +# llm_config=llm_config, +# system_message=agent_configs[matagent_admin_name]['system_message'], +# description=matagent_admin_name +# ) + +# outer_generate_agent.register_nested_chats( +# [ +# {"recipient": inner_generate_admin, "max_turn": 1, "summary_method": "last_msg", "silent": SILENT}, +# ], +# trigger=matagent_admin, +# #reply_func_from_nested_chats = +# ) + +# outer_converter_agent.register_nested_chats( +# [ +# {"recipient": inner_converter_admin, "max_turn": 1, "summary_method": "last_msg", "silent": SILENT}, +# ], +# trigger=matagent_admin, +# ) + +# outer_executor_agent.register_nested_chats( +# [ +# {"recipient": inner_executor_admin, "max_turn": 1, "summary_method": "last_msg", "silent": SILENT}, +# ], +# trigger=matagent_admin, +# ) + +# outer_analysis_agent.register_nested_chats( +# [ +# {"recipient": inner_analysis_admin, "max_turn": 1, "summary_method": "last_msg", "silent": SILENT}, +# ], +# trigger=matagent_admin, +# ) + +# user.initiate_chat( +# matagent_admin, +# message=initial_msg, # "如何在常温条件下制备CsPbBr3纳米立方体", +# cache=False +# ) +# return iostream