import socket import cv2 import numpy as np import json import struct import time import re def getseeds(image, dislen=89.01): frame = image # 将图像从BGR转换为HSV hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV) # 定义橘红色在HSV颜色空间中的范围 # 色调(H):橘红色大致在0-20之间 # 饱和度(S):通常较高的值表示更纯的颜色 # 亮度(V):根据具体需求调整,这里我们选择较低的阈值以包含较暗的部分 lower_orange_red = np.array([0, 100, 80]) upper_orange_red = np.array([50, 255, 255]) # 创建掩码 mask = cv2.inRange(hsv, lower_orange_red, upper_orange_red) # 找到符合条件的像素坐标 orange_red_points = cv2.findNonZero(mask) frameshow = frame.copy() # 创建一个与原图大小相同的空白掩码 mask = np.zeros_like(frame[:, :, 0]) # 单通道掩码 if orange_red_points is not None: for point in orange_red_points: x, y = point[0] cv2.circle(frameshow, (x, y), 1, (0, 255, 0), -1) # 使用绿色标记点 cv2.circle(mask, (x, y), 1, 255, -1) # 使用白色标记点 # 查找轮廓 contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) if contours: for max_contour in contours: # 计算最小边界矩形(旋转) rect = cv2.minAreaRect(max_contour) box = cv2.boxPoints(rect) box = np.int32(box) # 计算边界框的宽度和高度 width = int(rect[1][0]) height = int(rect[1][1]) if dislen > 20 and rect[1][0] > 25 and rect[1][1] > 25: ww = rect[1][0] * 3.0 / dislen hh = rect[1][1] * 3.0 / dislen # 格式化输出保留两位有效数字 ww_text = f"w=: {ww:.2f}" hh_text = f"h=: {hh:.2f}" # 计算中心点 x = int(np.mean(box[:, 0])) y = int(np.mean(box[:, 1])) # 设置字体、缩放比例、颜色和厚度 font = cv2.FONT_HERSHEY_SIMPLEX font_scale = 2 color = (0, 0, 255) # BGR颜色格式,这里是蓝色 thickness = 2 # 显示宽度信息 cv2.putText(frameshow, ww_text, (x, y-10), font, font_scale, color, thickness) # 显示高度信息 cv2.putText(frameshow, hh_text, (x, y - 40), font, font_scale, color, thickness) # 在原图上绘制边界框 cv2.drawContours(frameshow, [box], 0, (0, 0, 255), 2) try: ww_text hh_text except NameError: ww_text = "w=: 0.00" hh_text = "h=: 0.00" return ww_text,hh_text,frameshow def extract_number_from_string(value_str): """从字符串中提取数字部分""" try: # 如果已经是数字类型,直接返回 if isinstance(value_str, (int, float)): return float(value_str) # 如果是字符串,使用正则表达式提取数字 if isinstance(value_str, str): # 匹配数字(包括小数) match = re.search(r'\d+\.?\d*', value_str) if match: return float(match.group()) else: print(f"无法从字符串 '{value_str}' 中提取数字") return 0.0 # 其他情况返回0 return 0.0 except Exception as e: print(f"提取数字时出错: {e}") return 0.0 def receive_image(sock): """从socket接收图像数据""" try: # 首先接收图像数据长度 raw_msglen = recvall(sock, 4) if not raw_msglen: return None msglen = struct.unpack('>I', raw_msglen)[0] print(f"准备接收图像,大小: {msglen} 字节") # 接收图像数据 raw_data = recvall(sock, msglen) if not raw_data: return None # 将字节数组转换为numpy数组,然后解码为图像 nparr = np.frombuffer(raw_data, np.uint8) img = cv2.imdecode(nparr, cv2.IMREAD_COLOR) return img except Exception as e: print(f"接收图像时发生错误: {e}") return None def send_image(sock, img, width, height): """通过socket发送图像数据""" try: # 将图像编码为jpg格式 _, encoded_img = cv2.imencode('.jpg', img, [cv2.IMWRITE_JPEG_QUALITY, 80]) data = encoded_img.tobytes() width_float = extract_number_from_string(width) height_float = extract_number_from_string(height) # 发送数据长度 sock.sendall(struct.pack('>I', len(data))) # 发送width sock.sendall(struct.pack('>f', float(width_float))) # 发送height sock.sendall(struct.pack('>f', float(height_float))) # 发送图像数据 sock.sendall(data) return True except Exception as e: print(f"发送图像时发生错误: {e}") return False def recvall(sock, n): """辅助函数,确保接收到n个字节的数据""" data = bytearray() while len(data) < n: packet = sock.recv(n - len(data)) if not packet: return None data.extend(packet) return data def main(): server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) # 绑定IP和端口 HOST = '127.0.0.1' PORT = 8888 server_socket.bind((HOST, PORT)) server_socket.listen(1) print(f"Python图像检测服务器启动,监听 {HOST}:{PORT}") while True: try: # 等待客户端连接 client_socket, addr = server_socket.accept() print(f"客户端连接来自: {addr}") while True: # 接收图像 image = receive_image(client_socket) if image is None: print("客户端断开连接") break print(f"接收到图像,大小: {image.shape}") # 进行图像检测处理 dislen = 147.50 # 固定值,也可以根据需要调整 width, height, processed_image = getseeds(image, dislen) # 发送处理结果 if not send_image(client_socket, processed_image, width, height): print("发送结果图像失败") break print("结果图像发送成功") except Exception as e: print(f"处理客户端连接时发生错误: {e}") finally: client_socket.close() if __name__ == "__main__": main()