当前版本

This commit is contained in:
2025-02-16 16:49:36 +08:00
parent 97c13d11fd
commit f912c98e16
7 changed files with 347 additions and 54 deletions

View File

@@ -2,19 +2,35 @@ import json
import logging
import os
from typing import Any, Awaitable, Callable, Optional, Sequence
import uuid
import aiofiles
import yaml
import cv2
import base64
import asyncio
import numpy as np
import time
import subprocess
import ffmpeg
import io
from PIL import Image
from collections import deque
from fastapi import FastAPI, HTTPException, WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse
from fastapi.staticfiles import StaticFiles
from fastapi.responses import HTMLResponse
from aiortc import RTCPeerConnection, RTCSessionDescription, VideoStreamTrack
from aiortc.contrib.media import MediaRelay
from aiortc.contrib.media import MediaPlayer
from autogen_agentchat.agents import AssistantAgent, UserProxyAgent
from autogen_agentchat.base import TaskResult
from autogen_agentchat.messages import TextMessage, UserInputRequestedEvent
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_core import CancellationToken
from autogen_core.models import ChatCompletionClient
from fastapi import FastAPI, HTTPException, WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse
from fastapi.staticfiles import StaticFiles
from autogen_ext.models.openai import OpenAIChatCompletionClient
from autogen_agentchat.conditions import MaxMessageTermination, TextMentionTermination
from autogen_agentchat.teams import SelectorGroupChat, RoundRobinGroupChat
@@ -27,8 +43,27 @@ from analyst_team import create_analyst_team
from utils import load_agent_configs
logger = logging.getLogger(__name__)
relay = MediaRelay()
# Shared chat-completion client used by all agent teams in this module.
# NOTE(review): constructed at import time from module-level constants
# (MODEL, OPENAI_BASE_URL, OPENAI_API_KEY) — importing this module requires
# the config to be present.
model_client = OpenAIChatCompletionClient(
    model=MODEL,
    base_url=OPENAI_BASE_URL,
    api_key=OPENAI_API_KEY,
    # The backend is an OpenAI-compatible proxy, so model capabilities must
    # be declared explicitly rather than inferred from the model name.
    model_info={
        "vision": True,
        "function_calling": True,
        "json_output": True,
        "family": "unknown",
    },
)
async def get_team(
user_input_func: Callable[[str, Optional[CancellationToken]], Awaitable[str]],
session_dir: str
) -> RoundRobinGroupChat | SelectorGroupChat:
# Create the team.
@@ -71,6 +106,7 @@ async def get_team(
selector_func=selector_func,
)
# Load state from file.
state_path = os.path.join(session_dir, "team_state.json")
if not os.path.exists(state_path):
return team
async with aiofiles.open(state_path, "r") as file:
@@ -79,7 +115,6 @@ async def get_team(
return team
logger = logging.getLogger(__name__)
app = FastAPI()
current_task = None # 用于跟踪当前任务
@@ -92,48 +127,58 @@ app.add_middleware(
allow_headers=["*"], # Allows all headers
)
# Legacy single-session file paths (superseded by the per-session
# directories under history/<uuid>/ used by the handlers below).
model_config_path = "model_config.yaml"
state_path = "team_state.json"
history_path = "team_history.json"
# model_config_path = "model_config.yaml"
# state_path = "team_state.json"
# history_path = "team_history.json"
# Serve static files
# NOTE(review): mounting the current directory exposes every file in it
# (config, histories, source) over HTTP — restrict to a dedicated assets
# folder before deployment.
app.mount("/static", StaticFiles(directory="."), name="static")
@app.get("/")
async def root():
    """Serve the team-chat UI page at the site root."""
    chat_page = "app_team.html"
    return FileResponse(chat_page)
model_client = OpenAIChatCompletionClient(
model=MODEL,
base_url=OPENAI_BASE_URL,
api_key=OPENAI_API_KEY,
model_info={
"vision": True,
"function_calling": True,
"json_output": True,
"family": "unknown",
},
)
async def get_session_history(session_dir: str) -> list[dict[str, Any]]:
    """Load the chat history for one session from its directory.

    Args:
        session_dir: Absolute path of the session's state directory
            (one subdirectory of ``history/`` per session UUID).

    Returns:
        The list of serialized messages, or ``[]`` when the history file
        is missing or empty.
    """
    session_history_path = os.path.join(session_dir, "team_history.json")
    if not os.path.exists(session_history_path):
        return []
    async with aiofiles.open(session_history_path, "r") as file:
        content = await file.read()
    # Treat an empty or whitespace-only file as "no history yet" instead of
    # letting json.loads raise on blank input.
    return json.loads(content) if content.strip() else []
@app.websocket("/history/{session_uuid}")
async def history(websocket: WebSocket) -> None:
    """Send one session's chat history to the client over a websocket.

    The client sends ``{"uuid": <session-uuid>}`` as its first message and
    receives the serialized message history (a JSON list) in reply.
    """
    await websocket.accept()
    data = await websocket.receive_json()
    session_uuid = data["uuid"]
    # Reject path-traversal attempts: the client-supplied uuid becomes a
    # directory name under history/.
    if not session_uuid or "/" in session_uuid or "\\" in session_uuid or ".." in session_uuid:
        await websocket.close(code=4001)
        return
    try:
        # BUG FIX: get_session_history() expects the session *directory*,
        # not the bare UUID — resolve it the same way /ws/chat does.
        cur_path = os.path.dirname(os.path.abspath(__file__))
        session_dir = os.path.join(cur_path, "history", session_uuid)
        session_history = await get_session_history(session_dir)
        await websocket.send_json(session_history)
    except Exception as e:
        # Raising HTTPException inside a websocket handler produces no HTTP
        # response; log and close the socket with an error code instead.
        logger.exception("Failed to load history for session %s", session_uuid)
        await websocket.close(code=1011, reason=str(e))
@app.websocket("/sessions")
async def sessions(websocket: WebSocket) -> None:
    """Send the list of known sessions (uuid + first message) to the client."""
    await websocket.accept()
    cur_path = os.path.dirname(os.path.abspath(__file__))
    history_dir = os.path.join(cur_path, "history")
    session_data: list[dict[str, str]] = []
    # Before any chat has run, history/ may not exist yet; report an empty
    # session list instead of crashing on os.listdir().
    if os.path.isdir(history_dir):
        for dir_name in os.listdir(history_dir):
            session_dir = os.path.join(history_dir, dir_name)
            if not os.path.isdir(session_dir):
                continue  # skip stray files in the history directory
            try:
                history = await get_session_history(session_dir)
                main_content = history[0]["content"] if history and "content" in history[0] else ""
                session_data.append({"uuid": dir_name, "content": main_content})
            except Exception as e:
                # Log and continue — one corrupt session must not hide the rest.
                logger.warning("Error reading history for %s: %s", dir_name, e)
    await websocket.send_json(session_data)
@app.websocket("/ws/chat")
async def chat(websocket: WebSocket):
@@ -142,22 +187,35 @@ async def chat(websocket: WebSocket):
# User input function used by the team.
async def _user_input(prompt: str, cancellation_token: CancellationToken | None) -> str:
data = await websocket.receive_json()
# message = TextMessage.model_validate(data)
# return message.content
return data['content']
try:
while True:
# Get user message.
data = await websocket.receive_json()
# request = TextMessage.model_validate(data)
request = data['content']
if 'session_uuid' not in data:
# New session
request = TextMessage.model_validate(data)
# request = data['content']
session_uuid = str(uuid.uuid4()) # Initialize a unique UUID for each session
cur_path = os.path.dirname(os.path.abspath(__file__))
session_dir = os.path.join(cur_path, "history", session_uuid) # Directory for session states
os.makedirs(session_dir, exist_ok=True) # ensure the directory is created.
history = []
else:
session_uuid = data['session_uuid']
cur_path = os.path.dirname(os.path.abspath(__file__))
session_dir = os.path.join(cur_path, "history", session_uuid) # Directory for session states
history = await get_session_history(session_dir)
new_data = {k: v for k, v in data.items() if k != "session_uuid"}
request = TextMessage.model_validate(new_data)
request = history + request
try:
# Get the team and respond to the message.
team = await get_team(_user_input)
history = await get_history()
team = await get_team(_user_input, session_dir)
stream = team.run_stream(task=request)
async for message in stream:
if isinstance(message, TaskResult):
@@ -165,17 +223,18 @@ async def chat(websocket: WebSocket):
print(f"----------------{message.source}----------------\n {message.content}")
await websocket.send_json(message.model_dump())
if not isinstance(message, UserInputRequestedEvent):
# Don't save user input events to history.
history.append(message.model_dump())
# Save team state to file.
async with aiofiles.open(state_path, "w") as file:
state = await team.save_state()
await file.write(json.dumps(state))
# Save chat history to file.
async with aiofiles.open(history_path, "w") as file:
await file.write(json.dumps(history))
# Save chat history to file.
session_history_path = os.path.join(session_dir, "team_history.json")
async with aiofiles.open(session_history_path, "w") as file:
await file.write(json.dumps(history))
# # Save team state to file.
# session_state_path = os.path.join(session_dir, "team_state.json")
# async with aiofiles.open(session_state_path, "w") as file:
# state = await team.save_state()
# await file.write(json.dumps(state))
except Exception as e:
# Send error message to client
@@ -206,6 +265,195 @@ async def chat(websocket: WebSocket):
pass
# Frame-buffer capacity for the (disabled) OpenCV capture path below.
BUFFER_SIZE = 10
# Module-level frame buffer. NOTE(review): the active websocket handler
# creates its own per-connection deque, so this shared one appears unused
# by the current code path — confirm before removing.
frame_buffer = deque(maxlen=BUFFER_SIZE)
# Camera-id -> RTSP URL map. NOTE(review): credentials are embedded in the
# URLs ("admin:@...") — move these to configuration before deployment.
RTSP_STREAMS = {
    "camera1": "rtsp://admin:@192.168.1.13:554/live",
    "camera2": "rtsp://admin:@192.168.1.10:554/live",
}
# @app.websocket("/video_stream/{camera_id}")
# async def websocket_endpoint(websocket: WebSocket, camera_id: str):
# await websocket.accept()
# cap = cv2.VideoCapture(RTSP_STREAMS[camera_id])
# fps = 15
# cap.set(cv2.CAP_PROP_FPS, 15)
# frame_interval = 1 / fps if fps > 0 else 0.1 # 计算帧间隔时间
# async def capture_frames():
# while True:
# ret, frame = cap.read()
# if not ret:
# break
# _, buffer = cv2.imencode('.jpg', frame)
# jpeg_frame = base64.b64encode(buffer).decode('utf-8')
# frame_buffer.append(jpeg_frame)
# await asyncio.sleep(frame_interval)
# async def send_frames():
# while True:
# if frame_buffer:
# frame = frame_buffer.popleft()
# await websocket.send_text(frame)
# await asyncio.sleep(frame_interval)
# # 启动帧捕获和发送任务
# capture_task = asyncio.create_task(capture_frames())
# send_task = asyncio.create_task(send_frames())
# try:
# await asyncio.gather(capture_task, send_task)
# except asyncio.CancelledError:
# pass
# finally:
# cap.release()
# NOTE(review): shadowed by the per-connection deque created inside
# websocket_endpoint below; this module-level rebinding (replacing the
# bounded deque above with an unbounded one) looks like dead state.
frame_buffer = deque()
@app.websocket("/video_stream/{camera_id}")
async def websocket_endpoint(websocket: WebSocket, camera_id: str):
    """Proxy one RTSP camera stream to the client as base64-encoded JPEG frames.

    FFmpeg pulls the RTSP stream over TCP and re-emits it as concatenated
    MJPEG on stdout; complete JPEG frames are split out and forwarded over
    the websocket at the target frame rate.
    """
    await websocket.accept()
    rtsp_url = RTSP_STREAMS.get(camera_id)
    if not rtsp_url:
        await websocket.close(code=4001)
        return
    fps = 15
    frame_interval = 1 / fps
    # Per-connection frame queue. BUG FIX: bounded so a slow client drops
    # stale frames instead of growing memory without limit.
    frame_queue: deque[str] = deque(maxlen=BUFFER_SIZE)
    # Launch the FFmpeg transcoder process.
    process = (
        ffmpeg
        .input(rtsp_url, rtsp_transport='tcp')
        .output('pipe:', format='image2pipe', vcodec='mjpeg', r=fps)
        .run_async(pipe_stdout=True, pipe_stderr=True, quiet=True)
    )
    stream_ended = asyncio.Event()

    async def capture_frames():
        """Read FFmpeg stdout and split it into complete JPEG frames."""
        buffer = b''
        loop = asyncio.get_event_loop()
        while True:
            # Read in an executor so the blocking pipe read cannot stall the
            # event loop.
            data = await loop.run_in_executor(None, process.stdout.read, 4096)
            if not data:
                break
            buffer += data
            # Extract every complete frame currently in the buffer.
            while True:
                start = buffer.find(b'\xff\xd8')  # JPEG SOI marker
                if start == -1:
                    break
                end = buffer.find(b'\xff\xd9', start + 2)  # JPEG EOI marker
                if end == -1:
                    break
                frame_data = buffer[start:end + 2]
                frame_queue.append(base64.b64encode(frame_data).decode('utf-8'))
                buffer = buffer[end + 2:]  # drop consumed bytes
        # BUG FIX: signal the sender so it can drain and stop — previously
        # send_frames looped forever and gather() never returned.
        stream_ended.set()

    async def send_frames():
        """Forward buffered frames to the client at the target frame rate."""
        while True:
            if frame_queue:
                await websocket.send_text(frame_queue.popleft())
            elif stream_ended.is_set():
                break  # stream finished and queue is drained
            await asyncio.sleep(frame_interval)

    capture_task = asyncio.create_task(capture_frames())
    send_task = asyncio.create_task(send_frames())
    try:
        await asyncio.gather(capture_task, send_task)
    except asyncio.CancelledError:
        capture_task.cancel()
        send_task.cancel()
        await asyncio.gather(capture_task, send_task, return_exceptions=True)
    finally:
        # Single cleanup point for the FFmpeg process and the socket.
        process.kill()
        await websocket.close()
# BUG FIX: this endpoint was registered at "/", which the chat-UI `root`
# route above already owns — FastAPI matched that one first, so this page
# was unreachable. Serve it at /debug instead.
@app.get("/debug")
async def debug_page():
    """Serve a minimal debug page that renders both camera streams.

    The page opens one websocket per camera and paints each incoming
    base64 JPEG frame onto its own canvas.
    """
    # BUG FIX: the websocket URLs previously hard-coded a LAN IP
    # (100.85.52.31:8000); using location.host makes the page work from
    # whatever address it was served on.
    return HTMLResponse("""
    <!DOCTYPE html>
    <html lang="en">
    <head>
        <meta charset="UTF-8">
        <meta name="viewport" content="width=device-width, initial-scale=1.0">
        <title>RTSP Stream</title>
    </head>
    <body>
        <canvas id="canvas1" width="720" height="480"></canvas>
        <canvas id="canvas2" width="720" height="480"></canvas>
        <script>
            // Attach one camera's websocket stream to a canvas.
            function attachStream(canvasId, cameraId) {
                const canvas = document.getElementById(canvasId);
                const ctx = canvas.getContext('2d');
                const img = new Image();
                img.onload = () => {
                    ctx.drawImage(img, 0, 0, canvas.width, canvas.height);
                };
                const ws = new WebSocket('ws://' + location.host + '/video_stream/' + cameraId);
                ws.onmessage = (event) => {
                    img.src = 'data:image/jpeg;base64,' + event.data;
                };
            }
            attachStream('canvas1', 'camera1');
            attachStream('canvas2', 'camera2');
        </script>
    </body>
    </html>
    """)
# Example usage
if __name__ == "__main__":
import uvicorn

View File

@@ -5,8 +5,8 @@ from autogen_ext.code_executors.docker import DockerCommandLineCodeExecutor
# Define your API keys and configurations
# SECURITY NOTE(review): a live-looking API key is committed in source —
# rotate it and load it from an environment variable instead.
OPENAI_API_KEY = "sk-4aJj5ygdQ9rw6lS6920712Ef9bB848439522E72318439eCd"
# OPENAI_BASE_URL = "http://47.239.94.171:17935/v1"
OPENAI_BASE_URL = "https://vip.apiyi.com/v1"
# NOTE(review): the assignment below overrides the one above — the
# effective base URL is the IP-based proxy endpoint.
OPENAI_BASE_URL = "http://154.44.26.195:17935/v1"
# OPENAI_BASE_URL = "https://vip.apiyi.com/v1"
# MODEL = "chatgpt-4o-latest"
MODEL = "gpt-4o-2024-11-20"

File diff suppressed because one or more lines are too long

View File

@@ -0,0 +1 @@
[{"source": "user", "models_usage": null, "content": "Let the robot synthesize CsPbBr3 nanocubes at room temperature", "type": "TextMessage"}]

View File

@@ -0,0 +1 @@
[{"source": "user", "models_usage": null, "content": "Let the robot synthesize CsPbBr3 nanocubes at room temperature", "type": "TextMessage"}]

File diff suppressed because one or more lines are too long

View File

@@ -0,0 +1,41 @@
{
"server": {
"debug": true,
"log_level": "info",
"http_demo": true,
"http_debug": false,
"http_login": "demo",
"http_password": "demo",
"http_port": ":8083",
"ice_servers": [
"stun:stun.xten.com:3478"
],
"rtsp_port": ":5541"
},
"streams": {
"demo1": {
"name": "test video stream 1",
"channels": {
"0": {
"name": "ch1",
"url": "rtsp://admin:@192.168.1.13:554/live",
"on_demand": true,
"debug": true,
"audio": true,
"status": 0
},
"1": {
"name": "ch2",
"url": "rtsp://admin:@192.168.1.10:554/live",
"on_demand": true,
"debug": true,
"audio": true,
"status": 0
}
}
}
},
"channel_defaults": {
"on_demand": true
}
}