Files
backend/seed_detection_server.py
2025-07-01 18:06:39 +08:00

210 lines
6.8 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import socket
import cv2
import numpy as np
import json
import struct
import time
import re
def getseeds(image, dislen=89.01):
frame = image
# 将图像从BGR转换为HSV
hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
# 定义橘红色在HSV颜色空间中的范围
# 色调(H)橘红色大致在0-20之间
# 饱和度(S):通常较高的值表示更纯的颜色
# 亮度(V):根据具体需求调整,这里我们选择较低的阈值以包含较暗的部分
lower_orange_red = np.array([0, 100, 80])
upper_orange_red = np.array([50, 255, 255])
# 创建掩码
mask = cv2.inRange(hsv, lower_orange_red, upper_orange_red)
# 找到符合条件的像素坐标
orange_red_points = cv2.findNonZero(mask)
frameshow = frame.copy()
# 创建一个与原图大小相同的空白掩码
mask = np.zeros_like(frame[:, :, 0]) # 单通道掩码
if orange_red_points is not None:
for point in orange_red_points:
x, y = point[0]
cv2.circle(frameshow, (x, y), 1, (0, 255, 0), -1) # 使用绿色标记点
cv2.circle(mask, (x, y), 1, 255, -1) # 使用白色标记点
# 查找轮廓
contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
if contours:
for max_contour in contours:
# 计算最小边界矩形(旋转)
rect = cv2.minAreaRect(max_contour)
box = cv2.boxPoints(rect)
box = np.int32(box)
# 计算边界框的宽度和高度
width = int(rect[1][0])
height = int(rect[1][1])
if dislen > 20 and rect[1][0] > 25 and rect[1][1] > 25:
ww = rect[1][0] * 3.0 / dislen
hh = rect[1][1] * 3.0 / dislen
# 格式化输出保留两位有效数字
ww_text = f"w=: {ww:.2f}"
hh_text = f"h=: {hh:.2f}"
# 计算中心点
x = int(np.mean(box[:, 0]))
y = int(np.mean(box[:, 1]))
# 设置字体、缩放比例、颜色和厚度
font = cv2.FONT_HERSHEY_SIMPLEX
font_scale = 2
color = (0, 0, 255) # BGR颜色格式这里是蓝色
thickness = 2
# 显示宽度信息
cv2.putText(frameshow, ww_text, (x, y-10), font, font_scale, color, thickness)
# 显示高度信息
cv2.putText(frameshow, hh_text, (x, y - 40), font, font_scale, color, thickness)
# 在原图上绘制边界框
cv2.drawContours(frameshow, [box], 0, (0, 0, 255), 2)
try:
ww_text
hh_text
except NameError:
ww_text = "w=: 0.00"
hh_text = "h=: 0.00"
return ww_text,hh_text,frameshow
def extract_number_from_string(value_str):
"""从字符串中提取数字部分"""
try:
# 如果已经是数字类型,直接返回
if isinstance(value_str, (int, float)):
return float(value_str)
# 如果是字符串,使用正则表达式提取数字
if isinstance(value_str, str):
# 匹配数字(包括小数)
match = re.search(r'\d+\.?\d*', value_str)
if match:
return float(match.group())
else:
print(f"无法从字符串 '{value_str}' 中提取数字")
return 0.0
# 其他情况返回0
return 0.0
except Exception as e:
print(f"提取数字时出错: {e}")
return 0.0
def receive_image(sock):
"""从socket接收图像数据"""
try:
# 首先接收图像数据长度
raw_msglen = recvall(sock, 4)
if not raw_msglen:
return None
msglen = struct.unpack('>I', raw_msglen)[0]
print(f"准备接收图像,大小: {msglen} 字节")
# 接收图像数据
raw_data = recvall(sock, msglen)
if not raw_data:
return None
# 将字节数组转换为numpy数组然后解码为图像
nparr = np.frombuffer(raw_data, np.uint8)
img = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
return img
except Exception as e:
print(f"接收图像时发生错误: {e}")
return None
def send_image(sock, img, width, height):
"""通过socket发送图像数据"""
try:
# 将图像编码为jpg格式
_, encoded_img = cv2.imencode('.jpg', img, [cv2.IMWRITE_JPEG_QUALITY, 80])
data = encoded_img.tobytes()
width_float = extract_number_from_string(width)
height_float = extract_number_from_string(height)
# 发送数据长度
sock.sendall(struct.pack('>I', len(data)))
# 发送width
sock.sendall(struct.pack('>f', float(width_float)))
# 发送height
sock.sendall(struct.pack('>f', float(height_float)))
# 发送图像数据
sock.sendall(data)
return True
except Exception as e:
print(f"发送图像时发生错误: {e}")
return False
def recvall(sock, n):
"""辅助函数确保接收到n个字节的数据"""
data = bytearray()
while len(data) < n:
packet = sock.recv(n - len(data))
if not packet:
return None
data.extend(packet)
return data
def main():
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# 绑定IP和端口
HOST = '127.0.0.1'
PORT = 8888
server_socket.bind((HOST, PORT))
server_socket.listen(1)
print(f"Python图像检测服务器启动监听 {HOST}:{PORT}")
while True:
try:
# 等待客户端连接
client_socket, addr = server_socket.accept()
print(f"客户端连接来自: {addr}")
while True:
# 接收图像
image = receive_image(client_socket)
if image is None:
print("客户端断开连接")
break
print(f"接收到图像,大小: {image.shape}")
# 进行图像检测处理
dislen = 147.50 # 固定值,也可以根据需要调整
width, height, processed_image = getseeds(image, dislen)
# 发送处理结果
if not send_image(client_socket, processed_image, width, height):
print("发送结果图像失败")
break
print("结果图像发送成功")
except Exception as e:
print(f"处理客户端连接时发生错误: {e}")
finally:
client_socket.close()
if __name__ == "__main__":
main()