This commit is contained in:
2025-01-04 19:56:33 +08:00
parent 34f1a581ab
commit 87c87aa328

View File

@@ -10,6 +10,7 @@ from autogen.agentchat.contrib.capabilities.teachability import Teachability
from autogen.agentchat.contrib.capabilities.vision_capability import VisionCapability
from autogen.agentchat.contrib.multimodal_conversable_agent import MultimodalConversableAgent
from pathlib import Path
import websockets
import autogen
import os
from .constant import config_list, STREAM, SILENT, WORK_DIR
@@ -32,6 +33,7 @@ from channels.generic.websocket import AsyncWebsocketConsumer
venv_context = create_virtual_env(WORK_DIR)
llm_config = {"config_list": config_list, "stream": True}
import threading
class ChatConsumer(AsyncWebsocketConsumer):
@@ -49,84 +51,81 @@ class ChatConsumer(AsyncWebsocketConsumer):
await self.handle_websocket_connection(text_data)
async def handle_websocket_connection(self, text_data):
# 启动独立的 WebSocket 服务器,使用异步线程避免阻塞
uri = "ws://localhost:8765" # 示例 URI可以替换为你的 URI
# 启动 WebSocket 服务器在后台线程中运行
await asyncio.to_thread(self.run_websocket_server, uri, text_data)
# 启动 WebSocket 服务器的同步部分在后台线程中
threading.Thread(target=self.run_websocket_server, args=(text_data,), daemon=True).start()
def run_websocket_server(self, uri, text_data):
def run_websocket_server(self, text_data):
"""
这是一个同步方法,负责启动 IOWebsockets 的服务器处理消息
使用 asyncio.to_thread() 将其异步化
启动 IOWebsocket 服务器处理消息并与客户端进行交互
"""
uri = "ws://localhost:8765" # 你要连接的 WebSocket 服务器 URI
# 运行 WebSocket 服务器并处理消息
print(f"Starting WebSocket server at {uri}")
with IOWebsockets.run_server_in_thread(on_connect=self.handle_message, port=8765) as uri:
print(f" - test_setup() with websocket server running on {uri}.", flush=True)
asyncio.run(self.connect_to_server(uri, text_data))
with ws_connect(uri) as websocket:
print(f" - Connected to server on {uri}", flush=True)
async def connect_to_server(self, uri, text_data):
# async with ws_connect(uri) as websocket:
async with websockets.connect(uri) as websocket:
print(f" - Connected to server on {uri}", flush=True)
print(" - Sending message to server.", flush=True)
print(text_data)
# json_data = json.loads(text_data)
# chat_id = json_data['chat_id']
# websocket.send(json_data['message'])
websocket.send(text_data)
print(" - Sending message to server.", flush=True)
print(text_data)
json_data = json.loads(json.loads(text_data))
chat_id = json_data['chat_id']
await websocket.send(json_data['message'])
import re
current_agent = "User"
while True:
message = websocket.recv()
message = message.decode("utf-8") if isinstance(message, bytes) else message
print(message, end="", flush=True)
import re
current_agent = "User"
while True:
message = await websocket.recv()
message = message.decode("utf-8") if isinstance(message, bytes) else message
print(message, end="", flush=True)
cleaned_string = re.sub(r'\x1b\[[0-?]*[ -/]*[@-~]', '', message)
# match = re.findall(r"(\w+)\s*\(to\s+(\w+)\)", cleaned_string)
# print(match)
# if len(match)==1:
# current_agent = match[0][0] if current_agent != match[0][0] else current_agent
# print(f"A: {match[0][0]}, B: {match[0][1]}")
if "Next speaker" in cleaned_string:
match = re.search(r"Next\s+speaker:\s+(\w+)", cleaned_string)
current_agent = match.group(1)
else:
if cleaned_string == "\n--------------------------------------------------------------------------------\n":
continue
if cleaned_string == "\n********************************************************************************\n":
continue
if cleaned_string == "Starting a new chat....\n":
continue
if cleaned_string == "\n>>>>>>>> USING AUTO REPLY...\n":
continue
match = re.findall(r"(\w+)\s*\(to\s+(\w+)\)", cleaned_string)
if len(match)==1:
continue
cleaned_string = re.sub(r'\x1b\[[0-?]*[ -/]*[@-~]', '', message)
cleaned_string = cleaned_string.replace('\n>>>>>>>> USING AUTO REPLY...', '').replace('\n>>>>>>>> ', '')
if "Next speaker: User" in cleaned_string:
print('wcnm')
if current_agent in ['Outer_Retrieval_Admin', 'Outer_Generate_Admin', 'Outer_Converter_Admin']:
current_agent = current_agent.replace('Outer_', '')
if current_agent in ['vector_code_executor']:
continue
if current_agent == 'User':
group_name = 'Planner'
if current_agent in ['vector_searcher','vector_code_executor', 'graphrag_searcher', 'graphrag_code_executor', 'web_searcher', 'web_summary', 'Outer_Retrieval_Admin']:
group_name = 'Retrieval'
if current_agent in ['structure_scientist','property_scientist', 'application_scientist', 'synthesis_scientist', 'scheme_critic', 'Outer_Generate_Admin']:
group_name = 'Generator'
if current_agent in ['scheme_converter','converter_critic', 'mergrid_ploter', 'scheme_code_writer', 'scheme_code_critic', 'Outer_Converter_Admin']:
group_name = 'Converter'
if current_agent in ['experiment_executor','expriment_code_writer', 'data_collector', 'collector_code_writer', 'Outer_Executor_Admin']:
group_name = 'Executor'
if current_agent in ['analysis_executor','analysis_pl_uv', 'analysis_picturer', 'Experiment_Optimizer', 'optimizer_critic', 'Outer_Analysis_Admin']:
group_name = 'Optimizer'
if "Next speaker" in cleaned_string:
match = re.search(r"Next\s+speaker:\s+(\w+)", cleaned_string)
current_agent = match.group(1)
else:
if cleaned_string == "\n--------------------------------------------------------------------------------\n":
continue
if cleaned_string == "\n********************************************************************************\n":
continue
if cleaned_string == "Starting a new chat....\n":
continue
if cleaned_string == "\n>>>>>>>> USING AUTO REPLY...\n":
continue
match = re.findall(r"(\w+)\s*\(to\s+(\w+)\)", cleaned_string)
if len(match)==1:
continue
content = {"group_name": group_name, "agent_name": current_agent.replace("_", " ").title(), "content": cleaned_string.replace('>>>>>>>>', '')}
# 通过 WebSocket 消费者发送消息
asyncio.run(self.send(text_data=json.dumps(content)))
if current_agent in ['Outer_Retrieval_Admin', 'Outer_Generate_Admin', 'Outer_Converter_Admin']:
current_agent = current_agent.replace('Outer_', '')
if current_agent in ['vector_code_executor']:
continue
if current_agent == 'User':
group_name = 'Planner'
if current_agent in ['vector_searcher','vector_code_executor', 'graphrag_searcher', 'graphrag_code_executor', 'web_searcher', 'web_summary', 'Outer_Retrieval_Admin']:
group_name = 'Retrieval'
if current_agent in ['structure_scientist','property_scientist', 'application_scientist', 'synthesis_scientist', 'scheme_critic', 'Outer_Generate_Admin']:
group_name = 'Generator'
if current_agent in ['scheme_converter','converter_critic', 'mergrid_ploter', 'scheme_code_writer', 'scheme_code_critic', 'Outer_Converter_Admin']:
group_name = 'Converter'
if current_agent in ['experiment_executor','expriment_code_writer', 'data_collector', 'collector_code_writer', 'Outer_Executor_Admin']:
group_name = 'Executor'
if current_agent in ['analysis_executor','analysis_pl_uv', 'analysis_picturer', 'Experiment_Optimizer', 'optimizer_critic', 'Outer_Analysis_Admin']:
group_name = 'Optimizer'
if 'TERMINATE' in cleaned_string or (group_name != 'Retrieval' and len(cleaned_string) > 100):
continue
content = {"group_name": group_name, "agent_name": current_agent.replace("_", " ").title(), "content": cleaned_string}
await self.send(text_data=json.dumps(content))
# if "TERMINATE" in message:
# print(" - Received TERMINATE message. Exiting.", flush=True)
# break
def handle_message(self, iostream: IOWebsockets):
@@ -136,7 +135,7 @@ class ChatConsumer(AsyncWebsocketConsumer):
user = autogen.UserProxyAgent(
name="User",
chat_messages=None,
human_input_mode="NEVER",
human_input_mode="ALWAYS",
code_execution_config={
"work_dir": WORK_DIR,
"use_docker": False,
@@ -178,7 +177,7 @@ class ChatConsumer(AsyncWebsocketConsumer):
optimizer],
messages=[],
speaker_selection_method=state_transition,
max_round=50,
max_round=60,
)
matagent_admin_name = "Planer"
@@ -226,6 +225,200 @@ class ChatConsumer(AsyncWebsocketConsumer):
cache=False
)
return iostream
# class ChatConsumer(AsyncWebsocketConsumer):
# async def connect(self):
# await self.accept()
# async def disconnect(self, close_code):
# pass
# async def receive(self, text_data):
# # 在这里处理接收到的消息
# print(f"Received message: {text_data}")
# # 异步启动 WebSocket 连接和处理过程
# await self.handle_websocket_connection(text_data)
# async def handle_websocket_connection(self, text_data):
# # 启动独立的 WebSocket 服务器,使用异步线程避免阻塞
# uri = "ws://localhost:8765" # 示例 URI可以替换为你的 URI
# # 启动 WebSocket 服务器在后台线程中运行
# await asyncio.to_thread(self.run_websocket_server, uri, text_data)
# def run_websocket_server(self, uri, text_data):
# """
# 这是一个同步方法,负责启动 IOWebsockets 的服务器并处理消息
# 使用 asyncio.to_thread() 将其异步化
# """
# with IOWebsockets.run_server_in_thread(on_connect=self.handle_message, port=8765) as uri:
# print(f" - test_setup() with websocket server running on {uri}.", flush=True)
# with ws_connect(uri) as websocket:
# print(f" - Connected to server on {uri}", flush=True)
# print(" - Sending message to server.", flush=True)
# print(text_data)
# json_data = json.loads(json.loads(text_data))
# chat_id = json_data['chat_id']
# websocket.send(json_data['message'])
# import re
# current_agent = "User"
# while True:
# message = websocket.recv()
# message = message.decode("utf-8") if isinstance(message, bytes) else message
# print(message, end="", flush=True)
# cleaned_string = re.sub(r'\x1b\[[0-?]*[ -/]*[@-~]', '', message)
# cleaned_string = cleaned_string.replace('\n>>>>>>>> USING AUTO REPLY...', '').replace('\n>>>>>>>> ', '')
# if "Next speaker" in cleaned_string:
# match = re.search(r"Next\s+speaker:\s+(\w+)", cleaned_string)
# current_agent = match.group(1)
# else:
# if cleaned_string == "\n--------------------------------------------------------------------------------\n":
# continue
# if cleaned_string == "\n********************************************************************************\n":
# continue
# if cleaned_string == "Starting a new chat....\n":
# continue
# if cleaned_string == "\n>>>>>>>> USING AUTO REPLY...\n":
# continue
# match = re.findall(r"(\w+)\s*\(to\s+(\w+)\)", cleaned_string)
# if len(match)==1:
# continue
# if current_agent in ['Outer_Retrieval_Admin', 'Outer_Generate_Admin', 'Outer_Converter_Admin']:
# current_agent = current_agent.replace('Outer_', '')
# if current_agent in ['vector_code_executor']:
# continue
# if current_agent == 'User':
# group_name = 'Planner'
# if current_agent in ['vector_searcher','vector_code_executor', 'graphrag_searcher', 'graphrag_code_executor', 'web_searcher', 'web_summary', 'Outer_Retrieval_Admin']:
# group_name = 'Retrieval'
# if current_agent in ['structure_scientist','property_scientist', 'application_scientist', 'synthesis_scientist', 'scheme_critic', 'Outer_Generate_Admin']:
# group_name = 'Generator'
# if current_agent in ['scheme_converter','converter_critic', 'mergrid_ploter', 'scheme_code_writer', 'scheme_code_critic', 'Outer_Converter_Admin']:
# group_name = 'Converter'
# if current_agent in ['experiment_executor','expriment_code_writer', 'data_collector', 'collector_code_writer', 'Outer_Executor_Admin']:
# group_name = 'Executor'
# if current_agent in ['analysis_executor','analysis_pl_uv', 'analysis_picturer', 'Experiment_Optimizer', 'optimizer_critic', 'Outer_Analysis_Admin']:
# group_name = 'Optimizer'
# if 'TERMINATE' in cleaned_string or (group_name != 'Retrieval' and len(cleaned_string) > 100):
# continue
# content = {"group_name": group_name, "agent_name": current_agent.replace("_", " ").title(), "content": cleaned_string}
# # 通过 WebSocket 消费者发送消息
# asyncio.run(self.send(text_data=json.dumps(content)))
# # if "TERMINATE" in message:
# # print(" - Received TERMINATE message. Exiting.", flush=True)
# # break
# def handle_message(self, iostream: IOWebsockets):
# initial_msg = iostream.input()
# agent_configs = load_agent_configs(
# os.path.join(os.path.dirname(os.path.abspath(__file__)), "config/plan_group.yaml"))
# user = autogen.UserProxyAgent(
# name="User",
# chat_messages=None,
# human_input_mode="NEVER",
# code_execution_config={
# "work_dir": WORK_DIR,
# "use_docker": False,
# },
# is_termination_msg=lambda x: x.get("content", "").find("TERMINATE") >= 0,
# description="User",
# )
# inner_retrieval_admin, outer_retrieval_agent = init_retrieval_group(WORK_DIR, venv_context)
# inner_generate_admin, outer_generate_agent = init_generate_group(outer_retrieval_agent, inner_retrieval_admin)
# inner_converter_admin, outer_converter_agent = init_converter_group()
# inner_executor_admin, outer_executor_agent = init_executor_group(WORK_DIR, venv_context)
# inner_analysis_admin, outer_analysis_agent, optimizer = init_optimize_group(WORK_DIR, venv_context)
# def state_transition(last_speaker, groupchat):
# messages = groupchat.messages
# if last_speaker is user:
# if len(messages) <= 1:
# return outer_generate_agent
# else:
# return "auto"
# elif last_speaker is outer_generate_agent:
# if "synthesis" in messages[-1]["content"].lower():
# return outer_converter_agent
# else:
# return user
# elif last_speaker is outer_converter_agent:
# return outer_executor_agent
# elif last_speaker is outer_executor_agent:
# return outer_analysis_agent
# elif last_speaker is outer_analysis_agent:
# return optimizer
# else:
# return user
# matagent_group = autogen.GroupChat(
# agents=[user, outer_generate_agent, outer_converter_agent, outer_executor_agent, outer_analysis_agent,
# optimizer],
# messages=[],
# speaker_selection_method=state_transition,
# max_round=50,
# )
# matagent_admin_name = "Planer"
# matagent_admin = autogen.GroupChatManager(
# name=matagent_admin_name,
# groupchat=matagent_group,
# # is_termination_msg=lambda x: x.get("content", "").find("TERMINATE") >= 0,
# llm_config=llm_config,
# system_message=agent_configs[matagent_admin_name]['system_message'],
# description=matagent_admin_name
# )
# outer_generate_agent.register_nested_chats(
# [
# {"recipient": inner_generate_admin, "max_turn": 1, "summary_method": "last_msg", "silent": SILENT},
# ],
# trigger=matagent_admin,
# #reply_func_from_nested_chats =
# )
# outer_converter_agent.register_nested_chats(
# [
# {"recipient": inner_converter_admin, "max_turn": 1, "summary_method": "last_msg", "silent": SILENT},
# ],
# trigger=matagent_admin,
# )
# outer_executor_agent.register_nested_chats(
# [
# {"recipient": inner_executor_admin, "max_turn": 1, "summary_method": "last_msg", "silent": SILENT},
# ],
# trigger=matagent_admin,
# )
# outer_analysis_agent.register_nested_chats(
# [
# {"recipient": inner_analysis_admin, "max_turn": 1, "summary_method": "last_msg", "silent": SILENT},
# ],
# trigger=matagent_admin,
# )
# user.initiate_chat(
# matagent_admin,
# message=initial_msg, # "如何在常温条件下制备CsPbBr3纳米立方体",
# cache=False
# )
# return iostream