210 lines
6.8 KiB
Python
210 lines
6.8 KiB
Python
import socket
|
||
import cv2
|
||
import numpy as np
|
||
import json
|
||
import struct
|
||
import time
|
||
import re
|
||
def getseeds(image, dislen=89.01):
|
||
frame = image
|
||
# 将图像从BGR转换为HSV
|
||
hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
|
||
|
||
# 定义橘红色在HSV颜色空间中的范围
|
||
# 色调(H):橘红色大致在0-20之间
|
||
# 饱和度(S):通常较高的值表示更纯的颜色
|
||
# 亮度(V):根据具体需求调整,这里我们选择较低的阈值以包含较暗的部分
|
||
lower_orange_red = np.array([0, 100, 80])
|
||
upper_orange_red = np.array([50, 255, 255])
|
||
|
||
# 创建掩码
|
||
mask = cv2.inRange(hsv, lower_orange_red, upper_orange_red)
|
||
|
||
# 找到符合条件的像素坐标
|
||
orange_red_points = cv2.findNonZero(mask)
|
||
|
||
frameshow = frame.copy()
|
||
# 创建一个与原图大小相同的空白掩码
|
||
mask = np.zeros_like(frame[:, :, 0]) # 单通道掩码
|
||
|
||
if orange_red_points is not None:
|
||
for point in orange_red_points:
|
||
x, y = point[0]
|
||
cv2.circle(frameshow, (x, y), 1, (0, 255, 0), -1) # 使用绿色标记点
|
||
cv2.circle(mask, (x, y), 1, 255, -1) # 使用白色标记点
|
||
|
||
# 查找轮廓
|
||
contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
|
||
|
||
if contours:
|
||
for max_contour in contours:
|
||
# 计算最小边界矩形(旋转)
|
||
rect = cv2.minAreaRect(max_contour)
|
||
box = cv2.boxPoints(rect)
|
||
box = np.int32(box)
|
||
|
||
# 计算边界框的宽度和高度
|
||
width = int(rect[1][0])
|
||
height = int(rect[1][1])
|
||
|
||
if dislen > 20 and rect[1][0] > 25 and rect[1][1] > 25:
|
||
ww = rect[1][0] * 3.0 / dislen
|
||
hh = rect[1][1] * 3.0 / dislen
|
||
# 格式化输出保留两位有效数字
|
||
ww_text = f"w=: {ww:.2f}"
|
||
hh_text = f"h=: {hh:.2f}"
|
||
|
||
# 计算中心点
|
||
x = int(np.mean(box[:, 0]))
|
||
y = int(np.mean(box[:, 1]))
|
||
|
||
# 设置字体、缩放比例、颜色和厚度
|
||
font = cv2.FONT_HERSHEY_SIMPLEX
|
||
font_scale = 2
|
||
color = (0, 0, 255) # BGR颜色格式,这里是蓝色
|
||
thickness = 2
|
||
|
||
# 显示宽度信息
|
||
cv2.putText(frameshow, ww_text, (x, y-10), font, font_scale, color, thickness)
|
||
|
||
# 显示高度信息
|
||
cv2.putText(frameshow, hh_text, (x, y - 40), font, font_scale, color, thickness)
|
||
|
||
# 在原图上绘制边界框
|
||
cv2.drawContours(frameshow, [box], 0, (0, 0, 255), 2)
|
||
|
||
try:
|
||
ww_text
|
||
hh_text
|
||
except NameError:
|
||
ww_text = "w=: 0.00"
|
||
hh_text = "h=: 0.00"
|
||
|
||
return ww_text,hh_text,frameshow
|
||
|
||
def extract_number_from_string(value_str):
|
||
"""从字符串中提取数字部分"""
|
||
try:
|
||
# 如果已经是数字类型,直接返回
|
||
if isinstance(value_str, (int, float)):
|
||
return float(value_str)
|
||
|
||
# 如果是字符串,使用正则表达式提取数字
|
||
if isinstance(value_str, str):
|
||
# 匹配数字(包括小数)
|
||
match = re.search(r'\d+\.?\d*', value_str)
|
||
if match:
|
||
return float(match.group())
|
||
else:
|
||
print(f"无法从字符串 '{value_str}' 中提取数字")
|
||
return 0.0
|
||
|
||
# 其他情况返回0
|
||
return 0.0
|
||
except Exception as e:
|
||
print(f"提取数字时出错: {e}")
|
||
return 0.0
|
||
|
||
def receive_image(sock):
|
||
"""从socket接收图像数据"""
|
||
try:
|
||
# 首先接收图像数据长度
|
||
raw_msglen = recvall(sock, 4)
|
||
if not raw_msglen:
|
||
return None
|
||
msglen = struct.unpack('>I', raw_msglen)[0]
|
||
print(f"准备接收图像,大小: {msglen} 字节")
|
||
|
||
# 接收图像数据
|
||
raw_data = recvall(sock, msglen)
|
||
if not raw_data:
|
||
return None
|
||
|
||
# 将字节数组转换为numpy数组,然后解码为图像
|
||
nparr = np.frombuffer(raw_data, np.uint8)
|
||
img = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
|
||
return img
|
||
except Exception as e:
|
||
print(f"接收图像时发生错误: {e}")
|
||
return None
|
||
|
||
def send_image(sock, img, width, height):
|
||
"""通过socket发送图像数据"""
|
||
try:
|
||
# 将图像编码为jpg格式
|
||
_, encoded_img = cv2.imencode('.jpg', img, [cv2.IMWRITE_JPEG_QUALITY, 80])
|
||
data = encoded_img.tobytes()
|
||
|
||
width_float = extract_number_from_string(width)
|
||
height_float = extract_number_from_string(height)
|
||
|
||
# 发送数据长度
|
||
sock.sendall(struct.pack('>I', len(data)))
|
||
# 发送width
|
||
sock.sendall(struct.pack('>f', float(width_float)))
|
||
# 发送height
|
||
sock.sendall(struct.pack('>f', float(height_float)))
|
||
# 发送图像数据
|
||
sock.sendall(data)
|
||
|
||
return True
|
||
except Exception as e:
|
||
print(f"发送图像时发生错误: {e}")
|
||
return False
|
||
|
||
def recvall(sock, n):
|
||
"""辅助函数,确保接收到n个字节的数据"""
|
||
data = bytearray()
|
||
while len(data) < n:
|
||
packet = sock.recv(n - len(data))
|
||
if not packet:
|
||
return None
|
||
data.extend(packet)
|
||
return data
|
||
|
||
def main():
|
||
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||
|
||
# 绑定IP和端口
|
||
HOST = '127.0.0.1'
|
||
PORT = 8888
|
||
server_socket.bind((HOST, PORT))
|
||
server_socket.listen(1)
|
||
|
||
print(f"Python图像检测服务器启动,监听 {HOST}:{PORT}")
|
||
|
||
while True:
|
||
try:
|
||
# 等待客户端连接
|
||
client_socket, addr = server_socket.accept()
|
||
print(f"客户端连接来自: {addr}")
|
||
|
||
while True:
|
||
# 接收图像
|
||
image = receive_image(client_socket)
|
||
if image is None:
|
||
print("客户端断开连接")
|
||
break
|
||
|
||
print(f"接收到图像,大小: {image.shape}")
|
||
|
||
# 进行图像检测处理
|
||
dislen = 147.50 # 固定值,也可以根据需要调整
|
||
width, height, processed_image = getseeds(image, dislen)
|
||
|
||
# 发送处理结果
|
||
if not send_image(client_socket, processed_image, width, height):
|
||
print("发送结果图像失败")
|
||
break
|
||
|
||
print("结果图像发送成功")
|
||
|
||
except Exception as e:
|
||
print(f"处理客户端连接时发生错误: {e}")
|
||
finally:
|
||
client_socket.close()
|
||
|
||
if __name__ == "__main__":
|
||
main()
|