second commit
250
CrsLYL02025.py
Normal file
@@ -0,0 +1,250 @@
import json
import os
import numpy as np
import cv2
import time

def set_camera_resolution(cap, width, height):
    """
    Set the camera resolution.

    Parameters:
    - cap: cv2.VideoCapture object
    - width: target width
    - height: target height
    """
    # Set the camera width and height
    cap.set(cv2.CAP_PROP_FRAME_WIDTH, width)
    cap.set(cv2.CAP_PROP_FRAME_HEIGHT, height)

    # Read back the width and height actually applied
    actual_width = cap.get(cv2.CAP_PROP_FRAME_WIDTH)
    actual_height = cap.get(cv2.CAP_PROP_FRAME_HEIGHT)

    print(f"Requested resolution: width={width}, height={height}")
    print(f"Actual resolution: width={actual_width}, height={actual_height}")

def show_mask(mask, ax, random_color=False):
    if random_color:
        color = np.concatenate([np.random.random(3), np.array([0.6])], axis=0)
    else:
        color = np.array([30/255, 144/255, 255/255, 0.6])
    h, w = mask.shape[-2:]
    mask_image = mask.reshape(h, w, 1) * color.reshape(1, 1, -1)
    ax.imshow(mask_image)

def findchesslen(image):
    # Convert to grayscale
    gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)

    # Chessboard spec: number of inner corners (adjust to your actual board;
    # e.g. an 8x8 board has 7x7 inner corners)
    board_size = (3, 4)
    ret, corners = cv2.findChessboardCorners(gray, board_size, None)

    frameshow = image.copy()
    if corners is not None:
        cv2.drawChessboardCorners(frameshow, board_size, corners, ret)
    cv2.imshow('Chessboard Corners', frameshow)

    if ret:  # corners were found
        distances = []

        for row in range(board_size[1]):  # iterate over rows
            for col in range(board_size[0] - 1):  # distance to the next corner in the same row
                idx = row * board_size[0] + col
                p1 = corners[idx][0]
                p2 = corners[idx + 1][0]
                distance = np.linalg.norm(p1 - p2)
                distances.append(distance)
                print(f"Distance from corner {idx} to {idx+1}: {distance:.2f} px")

        for col in range(board_size[0]):  # iterate over columns
            for row in range(board_size[1] - 1):  # distance to the next corner in the same column
                idx = row * board_size[0] + col
                p1 = corners[idx][0]
                p2 = corners[idx + board_size[0]][0]
                distance = np.linalg.norm(p1 - p2)
                distances.append(distance)
                print(f"Distance from corner {idx} to {idx+board_size[0]}: {distance:.2f} px")

        # Average cell edge length in pixels
        avg_distance = np.mean(distances)
        return avg_distance
    else:
        return -1


def getseeds(image, dislen=89.01):
    frame = image
    # Convert the image from BGR to HSV
    hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)

    # Range of orange-red in HSV:
    # Hue (H): orange-red is roughly 0-20
    # Saturation (S): higher values mean purer color
    # Value (V): adjust as needed; a lower threshold keeps darker regions
    lower_orange_red = np.array([0, 180, 80])
    upper_orange_red = np.array([30, 255, 195])

    # Build the color mask
    mask = cv2.inRange(hsv, lower_orange_red, upper_orange_red)

    # Coordinates of matching pixels
    orange_red_points = cv2.findNonZero(mask)

    frameshow = frame.copy()
    # Blank single-channel mask with the same size as the frame
    mask = np.zeros_like(frame[:, :, 0])

    if orange_red_points is not None:
        for point in orange_red_points:
            x, y = point[0]
            # Mark the matching points on the display frame and on the mask
            cv2.circle(frameshow, (x, y), 1, (0, 255, 0), -1)  # green marker
            cv2.circle(mask, (x, y), 1, 255, -1)  # white marker

    # Find contours
    contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

    if contours:
        for max_contour in contours:
            # Pick the largest contour by area (disabled; every contour is processed)
            # max_contour = max(contours, key=cv2.contourArea)

            # Minimum-area (rotated) bounding rectangle
            rect = cv2.minAreaRect(max_contour)
            box = cv2.boxPoints(rect)
            box = np.int32(box)

            # Width and height of the bounding box
            width = int(rect[1][0])
            height = int(rect[1][1])

            # Bounding-box diagnostics
            # print(f"Bounding Box Center: ({rect[0][0]:.2f}, {rect[0][1]:.2f})")
            # print(f"Bounding Box Width: {width}")
            # print(f"Bounding Box Height: {height}")

            if dislen > 20 and rect[1][0] > 25 and rect[1][1] > 25:
                ww = rect[1][0] * 3.0 / dislen
                hh = rect[1][1] * 3.0 / dislen
                # Format with two decimal places
                ww_text = f"w=: {ww:.2f}"
                hh_text = f"h=: {hh:.2f}"
                # Top-left corner of the bounding box
                # x, y = np.min(box, axis=0)

                # Center of the bounding box
                x = int(np.mean(box[:, 0]))
                y = int(np.mean(box[:, 1]))
                print(x, y)
                print(rect[1][0])
                print(ww_text)

                # center_point = (center_x, center_y)

                # Font, scale, color and thickness for the overlay text
                font = cv2.FONT_HERSHEY_SIMPLEX
                font_scale = 2
                color = (0, 0, 255)  # BGR format, i.e. red
                thickness = 2

                # Draw the width text
                cv2.putText(frameshow, ww_text, (x, y - 10), font, font_scale, color, thickness)

                # Draw the height text
                cv2.putText(frameshow, hh_text, (x, y - 40), font, font_scale, color, thickness)

                # Draw the bounding box on the display frame
                cv2.drawContours(frameshow, [box], 0, (0, 0, 255), 2)

                # Show the result frame
                # cv2.imshow('Orange Red Points and Bounding Box', frameshow)

    # Show the result frame
    # cv2.imshow('Orange Red Points', frameshow)

    # Return the annotated frame (also when no bounding box qualified)
    return frameshow


# Open the USB camera
cap = cv2.VideoCapture(0)  # 0 is the default camera ID; adjust it if several cameras are attached
# Target resolution
target_width = 1920
target_height = 1080
set_camera_resolution(cap, target_width, target_height)

if not cap.isOpened():
    print("Cannot open camera")
else:
    last_save_time = time.time()
    interval = 10 * 20  # save interval in seconds (10*20 = 200 s)
    save_dir = "data"
    save_path_format = "frame_%m-%d_%H-%M.png"
    save_path_format_det = "frame_%m-%d_%H-%M_det.png"
    if not os.path.exists(save_dir):
        os.makedirs(save_dir)

    # Target size of the display windows
    display_width = 640
    display_height = 360

    while True:
        ret, frame = cap.read()
        ## height, width, channels = frame.shape[:3]  # frame height, width and channel count
        # print(f"Frame size: width={width}, height={height}, channels={channels if len(frame.shape) == 3 else 1}")
        if not ret:
            print("Cannot read frame")
            break
        if 0:  # calibration branch (disabled): measure the chessboard cell length
            # print(frame)
            dislen = findchesslen(frame)
            # dislen = 148.50
            print(f"cell={dislen}")
            cv2.waitKey(20)
            continue
        # continue
        # print(frame)
        # dislen = findchesslen(frame)

        dislen = 147.50
        # print(f"cell={dislen}")
        # frame_det = frame
        frame_det = getseeds(frame, dislen)

        rzframe = cv2.resize(frame, (display_width, display_height))
        cv2.imshow('orgimage', rzframe)

        rzframe_det = cv2.resize(frame_det, (display_width, display_height))
        cv2.imshow('detimage', rzframe_det)

        current_time = time.time()
        if current_time - last_save_time > interval:
            # Build the file names from the current time
            file_name = time.strftime(save_path_format, time.localtime(current_time))
            file_name_det = time.strftime(save_path_format_det, time.localtime(current_time))

            save_path = os.path.join(save_dir, file_name)
            save_path_det = os.path.join(save_dir, file_name_det)

            # Save the original frame
            cv2.imwrite(save_path, frame)
            print(f"Saved frame to {save_path}")

            # Save the detection result frame
            cv2.imwrite(save_path_det, frame_det)
            print(f"Saved detection frame to {save_path_det}")

            last_save_time = current_time  # update the last save time

        # Show the result frame
        # cv2.imshow('Chessboard Corners', frame)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

    cap.release()
    cv2.destroyAllWindows()
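# Note on the size overlay in getseeds above: `dislen` is the average chessboard
# cell edge in pixels (from findchesslen or the hard-coded 147.50), and the factor
# 3.0 appears to be the physical cell size, presumably in millimetres; that
# interpretation is an assumption, not stated in the code. A minimal sketch of the
# same pixel-to-unit conversion, with hypothetical names:

def pixels_to_units(pixel_len, cell_px, cell_size=3.0):
    """Convert a pixel length to physical units, given the chessboard cell edge
    in pixels (cell_px) and its physical size (cell_size). cell_size=3.0 mirrors
    the hard-coded factor in getseeds; whether it means 3 mm is an assumption."""
    return pixel_len * cell_size / cell_px

# Example with the values used in the script: a 400 px wide box and
# dislen = 147.50 px per cell gives about 8.14 units.
print(f"{pixels_to_units(400, 147.50):.2f}")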
292
defect_detection_server.py
Normal file
@@ -0,0 +1,292 @@
# Response format:
# {
#     "success": true,
#     "detection_data": {
#         "defect_count": 5,
#         "total_defect_area": 123.45,
#         "total_crystal_area": 5000.0,
#         "defect_score": 2.47,
#         "algorithm_used": 1
#     },
#     "images": {
#         "original_image": "base64_string",
#         "binary_defects": "base64_string",
#         "image_with_defects": "base64_string",
#         "segmented_crystal": "base64_string"
#     }
# }

import cv2
import numpy as np
import socket
import struct
import json
import base64
from io import BytesIO
from PIL import Image
from sklearn.mixture import GaussianMixture
import matplotlib.pyplot as plt

class DefectDetectionServer:
    def __init__(self, host='localhost', port=8888):
        self.host = host
        self.port = port
        self.socket = None

    def preprocess_image(self, image):
        gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        blurred = cv2.GaussianBlur(gray, (5, 5), 0)
        return blurred

    def segment_crystal(self, blurred_image):
        _, binary = cv2.threshold(blurred_image, 30, 255, cv2.THRESH_BINARY)
        contours, _ = cv2.findContours(binary, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        if len(contours) == 0:
            raise ValueError("No crystal detected in the image.")
        crystal_contour = max(contours, key=cv2.contourArea)
        mask = np.zeros_like(blurred_image)
        cv2.drawContours(mask, [crystal_contour], -1, 255, thickness=cv2.FILLED)
        segmented_crystal = cv2.bitwise_and(blurred_image, blurred_image, mask=mask)
        return segmented_crystal, crystal_contour, binary

    def detect_defects_GMM(self, segmented_crystal, crystal_contour):
        # Extract the region of interest (ROI) using the crystal contour
        mask = np.zeros_like(segmented_crystal)
        cv2.drawContours(mask, [crystal_contour], -1, 255, thickness=cv2.FILLED)
        roi = cv2.bitwise_and(segmented_crystal, segmented_crystal, mask=mask)

        # Flatten the ROI to a 1D array of pixel intensities
        pixel_intensities = roi.flatten()
        pixel_intensities = pixel_intensities[pixel_intensities > 0]  # Remove background pixels

        if len(pixel_intensities) < 10:
            raise ValueError("Not enough data points to fit Gaussian Mixture Model.")

        # Reshape for GMM
        X = pixel_intensities.reshape(-1, 1)

        # Fit a Gaussian Mixture Model with two components
        gmm = GaussianMixture(n_components=2, random_state=0).fit(X)

        # Get the means and covariances of the fitted Gaussians
        means = gmm.means_.flatten()
        covars = gmm.covariances_.flatten()

        # Determine which component corresponds to high brightness
        high_brightness_mean_index = np.argmax(means)
        high_brightness_mean = means[high_brightness_mean_index]
        high_brightness_covar = covars[high_brightness_mean_index]

        # Calculate the probability density function (PDF) values for each pixel intensity
        pdf_values = gmm.score_samples(X)

        # Set a threshold to identify high brightness regions
        threshold = np.percentile(pdf_values, 98)  # Adjust this threshold as needed

        # Identify high brightness pixels
        high_brightness_pixels = X[pdf_values >= threshold].flatten()

        # Find contours corresponding to high brightness regions
        _, binary_high_brightness = cv2.threshold(roi, int(high_brightness_mean), 255, cv2.THRESH_BINARY)
        contours, _ = cv2.findContours(binary_high_brightness, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

        defects = []
        for contour in contours:
            perimeter = cv2.arcLength(contour, True)
            if perimeter > 5:
                defects.append(contour)

        # Create a black image with the same shape as the original image
        binary_defects = np.zeros_like(segmented_crystal, dtype=np.uint8)

        # Draw high brightness regions on the binary defects image
        for y in range(segmented_crystal.shape[0]):
            for x in range(segmented_crystal.shape[1]):
                if mask[y, x] != 0 and segmented_crystal[y, x] >= high_brightness_mean:
                    binary_defects[y, x] = 255

        return defects, binary_defects

    def detect_defects(self, segmented_crystal, crystal_contour):
        edges = cv2.Canny(segmented_crystal, 30, 90)
        mask = np.zeros_like(edges)
        cv2.drawContours(mask, [crystal_contour], -1, 255, thickness=cv2.FILLED)
        inverted_mask = mask  # keep only edges inside the crystal region
        defects_edges = cv2.bitwise_and(edges, inverted_mask)
        contours, _ = cv2.findContours(defects_edges, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        defects = []
        for contour in contours:
            perimeter = cv2.arcLength(contour, True)
            if perimeter > 5:
                defects.append(contour)
        return defects, defects_edges

    def calculate_total_area(self, crystal_contour):
        total_area = cv2.contourArea(crystal_contour)
        return total_area

    def score_defects(self, defects, total_area):
        defect_area = sum(cv2.contourArea(defect) for defect in defects)
        score = (defect_area / total_area) * 100
        return score

    def image_to_base64(self, image):
        """Convert OpenCV image to base64 string"""
        _, buffer = cv2.imencode('.png', image)
        image_base64 = base64.b64encode(buffer).decode('utf-8')
        return image_base64

    def base64_to_image(self, base64_string):
        """Convert base64 string to OpenCV image"""
        image_data = base64.b64decode(base64_string)
        nparr = np.frombuffer(image_data, np.uint8)
        image = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
        return image

    def process_image(self, image, algorithm):
        """Process image and return results"""
        try:
            blurred_image = self.preprocess_image(image)
            segmented_crystal, crystal_contour, binary_image = self.segment_crystal(blurred_image)

            if algorithm == 1:
                defects, binary_defects = self.detect_defects(segmented_crystal, crystal_contour)
            elif algorithm == 2:
                defects, binary_defects = self.detect_defects_GMM(segmented_crystal, crystal_contour)
            else:
                raise ValueError("Invalid algorithm. Use 1 or 2.")

            total_area = self.calculate_total_area(crystal_contour)
            score = self.score_defects(defects, total_area)

            # Draw defects on original image
            image_with_defects = cv2.drawContours(image.copy(), defects, -1, (0, 255, 0), 2)

            # Prepare detection data
            detection_data = {
                'defect_count': len(defects),
                'total_defect_area': float(sum(cv2.contourArea(defect) for defect in defects)),
                'total_crystal_area': float(total_area),
                'defect_score': float(score),
                'algorithm_used': algorithm
            }

            # Convert images to base64
            original_image_b64 = self.image_to_base64(image)
            binary_defects_b64 = self.image_to_base64(binary_defects)
            image_with_defects_b64 = self.image_to_base64(image_with_defects)
            segmented_crystal_b64 = self.image_to_base64(segmented_crystal)

            result = {
                'success': True,
                'detection_data': detection_data,
                'images': {
                    'original_image': original_image_b64,
                    'binary_defects': binary_defects_b64,
                    'image_with_defects': image_with_defects_b64,
                    'segmented_crystal': segmented_crystal_b64
                }
            }

            return result

        except Exception as e:
            return {
                'success': False,
                'error': str(e)
            }

    def send_data(self, conn, data):
        """Send data with length prefix"""
        json_data = json.dumps(data)
        data_bytes = json_data.encode('utf-8')
        data_length = len(data_bytes)

        # Send length first (4 bytes)
        conn.sendall(struct.pack('!I', data_length))
        # Send data
        conn.sendall(data_bytes)

    def receive_data(self, conn):
        """Receive data with length prefix"""
        # Receive length first (4 bytes)
        length_data = b''
        while len(length_data) < 4:
            chunk = conn.recv(4 - len(length_data))
            if not chunk:
                return None
            length_data += chunk

        data_length = struct.unpack('!I', length_data)[0]

        # Receive data
        received_data = b''
        while len(received_data) < data_length:
            chunk = conn.recv(data_length - len(received_data))
            if not chunk:
                return None
            received_data += chunk

        return json.loads(received_data.decode('utf-8'))

    def handle_client(self, conn, addr):
        """Handle client connection"""
        print(f"Connected to {addr}")
        try:
            while True:
                # Receive request from client
                request = self.receive_data(conn)
                if not request:
                    break

                print(f"Received request from {addr}")

                # Extract image and algorithm from request
                image_b64 = request.get('image')
                algorithm = request.get('algorithm', 1)

                if not image_b64:
                    response = {'success': False, 'error': 'No image provided'}
                else:
                    # Convert base64 to image
                    image = self.base64_to_image(image_b64)
                    if image is None:
                        response = {'success': False, 'error': 'Invalid image data'}
                    else:
                        # Process image
                        response = self.process_image(image, algorithm)

                # Send response back to client
                self.send_data(conn, response)
                print(f"Sent response to {addr}")

        except Exception as e:
            print(f"Error handling client {addr}: {e}")
        finally:
            conn.close()
            print(f"Connection to {addr} closed")

    def start_server(self):
        """Start the socket server"""
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

        try:
            self.socket.bind((self.host, self.port))
            self.socket.listen(5)
            print(f"Server listening on {self.host}:{self.port}")

            while True:
                conn, addr = self.socket.accept()
                self.handle_client(conn, addr)

        except KeyboardInterrupt:
            print("\nServer shutting down...")
        except Exception as e:
            print(f"Server error: {e}")
        finally:
            if self.socket:
                self.socket.close()

if __name__ == "__main__":
    server = DefectDetectionServer(host='127.0.0.1', port=8888)
    server.start_server()
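# A minimal client sketch for the DefectDetectionServer protocol above: a 4-byte
# big-endian length prefix followed by UTF-8 JSON carrying a base64 'image' and an
# optional 'algorithm' field, as implemented in send_data/receive_data. The host,
# port, and 'sample.png' path are placeholder assumptions, not part of the commit.
import base64
import json
import socket
import struct

def send_json(sock, obj):
    payload = json.dumps(obj).encode('utf-8')
    sock.sendall(struct.pack('!I', len(payload)))  # length prefix
    sock.sendall(payload)

def recv_json(sock):
    def recv_exact(n):
        buf = b''
        while len(buf) < n:
            chunk = sock.recv(n - len(buf))
            if not chunk:
                raise ConnectionError("connection closed")
            buf += chunk
        return buf
    length = struct.unpack('!I', recv_exact(4))[0]
    return json.loads(recv_exact(length).decode('utf-8'))

with socket.create_connection(('127.0.0.1', 8888)) as s:
    with open('sample.png', 'rb') as f:  # placeholder image path
        image_b64 = base64.b64encode(f.read()).decode('utf-8')
    send_json(s, {'image': image_b64, 'algorithm': 1})
    reply = recv_json(s)
    print(reply['success'], reply.get('detection_data'))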
209
seed_detection_server.py
Normal file
@@ -0,0 +1,209 @@
import socket
import cv2
import numpy as np
import json
import struct
import time
import re

def getseeds(image, dislen=89.01):
    frame = image
    # Convert the image from BGR to HSV
    hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)

    # Range of orange-red in HSV:
    # Hue (H): orange-red is roughly 0-20
    # Saturation (S): higher values mean purer color
    # Value (V): adjust as needed; a lower threshold keeps darker regions
    lower_orange_red = np.array([0, 100, 80])
    upper_orange_red = np.array([50, 255, 255])

    # Build the color mask
    mask = cv2.inRange(hsv, lower_orange_red, upper_orange_red)

    # Coordinates of matching pixels
    orange_red_points = cv2.findNonZero(mask)

    frameshow = frame.copy()
    # Blank single-channel mask with the same size as the frame
    mask = np.zeros_like(frame[:, :, 0])

    if orange_red_points is not None:
        for point in orange_red_points:
            x, y = point[0]
            cv2.circle(frameshow, (x, y), 1, (0, 255, 0), -1)  # green marker
            cv2.circle(mask, (x, y), 1, 255, -1)  # white marker

    # Find contours
    contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

    # Default measurements in case no bounding box qualifies
    ww_text = "w=: 0.00"
    hh_text = "h=: 0.00"

    if contours:
        for max_contour in contours:
            # Minimum-area (rotated) bounding rectangle
            rect = cv2.minAreaRect(max_contour)
            box = cv2.boxPoints(rect)
            box = np.int32(box)

            # Width and height of the bounding box
            width = int(rect[1][0])
            height = int(rect[1][1])

            if dislen > 20 and rect[1][0] > 25 and rect[1][1] > 25:
                ww = rect[1][0] * 3.0 / dislen
                hh = rect[1][1] * 3.0 / dislen
                # Format with two decimal places
                ww_text = f"w=: {ww:.2f}"
                hh_text = f"h=: {hh:.2f}"

                # Center of the bounding box
                x = int(np.mean(box[:, 0]))
                y = int(np.mean(box[:, 1]))

                # Font, scale, color and thickness for the overlay text
                font = cv2.FONT_HERSHEY_SIMPLEX
                font_scale = 2
                color = (0, 0, 255)  # BGR format, i.e. red
                thickness = 2

                # Draw the width text
                cv2.putText(frameshow, ww_text, (x, y - 10), font, font_scale, color, thickness)

                # Draw the height text
                cv2.putText(frameshow, hh_text, (x, y - 40), font, font_scale, color, thickness)

                # Draw the bounding box on the display frame
                cv2.drawContours(frameshow, [box], 0, (0, 0, 255), 2)

    return ww_text, hh_text, frameshow


def extract_number_from_string(value_str):
    """Extract the numeric part of a string."""
    try:
        # If it is already a number, return it directly
        if isinstance(value_str, (int, float)):
            return float(value_str)

        # For strings, extract the number with a regular expression
        if isinstance(value_str, str):
            # Match a number (including decimals)
            match = re.search(r'\d+\.?\d*', value_str)
            if match:
                return float(match.group())
            else:
                print(f"Could not extract a number from '{value_str}'")
                return 0.0

        # Anything else maps to 0
        return 0.0
    except Exception as e:
        print(f"Error while extracting number: {e}")
        return 0.0

def receive_image(sock):
    """Receive image data from the socket."""
    try:
        # First receive the length of the image data
        raw_msglen = recvall(sock, 4)
        if not raw_msglen:
            return None
        msglen = struct.unpack('>I', raw_msglen)[0]
        print(f"About to receive image, size: {msglen} bytes")

        # Receive the image data
        raw_data = recvall(sock, msglen)
        if not raw_data:
            return None

        # Convert the byte buffer to a numpy array, then decode it as an image
        nparr = np.frombuffer(raw_data, np.uint8)
        img = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
        return img
    except Exception as e:
        print(f"Error while receiving image: {e}")
        return None

def send_image(sock, img, width, height):
    """Send image data over the socket."""
    try:
        # Encode the image as JPEG
        _, encoded_img = cv2.imencode('.jpg', img, [cv2.IMWRITE_JPEG_QUALITY, 80])
        data = encoded_img.tobytes()

        width_float = extract_number_from_string(width)
        height_float = extract_number_from_string(height)

        # Send the data length
        sock.sendall(struct.pack('>I', len(data)))
        # Send the width
        sock.sendall(struct.pack('>f', float(width_float)))
        # Send the height
        sock.sendall(struct.pack('>f', float(height_float)))
        # Send the image data
        sock.sendall(data)

        return True
    except Exception as e:
        print(f"Error while sending image: {e}")
        return False

def recvall(sock, n):
    """Helper that keeps reading until exactly n bytes have been received."""
    data = bytearray()
    while len(data) < n:
        packet = sock.recv(n - len(data))
        if not packet:
            return None
        data.extend(packet)
    return data

def main():
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

    # Bind IP and port
    HOST = '127.0.0.1'
    PORT = 8888
    server_socket.bind((HOST, PORT))
    server_socket.listen(1)

    print(f"Python image detection server started, listening on {HOST}:{PORT}")

    while True:
        try:
            # Wait for a client connection
            client_socket, addr = server_socket.accept()
            print(f"Client connected from: {addr}")

            while True:
                # Receive an image
                image = receive_image(client_socket)
                if image is None:
                    print("Client disconnected")
                    break

                print(f"Received image, shape: {image.shape}")

                # Run the detection
                dislen = 147.50  # fixed value; adjust as needed
                width, height, processed_image = getseeds(image, dislen)

                # Send back the processed result
                if not send_image(client_socket, processed_image, width, height):
                    print("Failed to send result image")
                    break

                print("Result image sent successfully")

        except Exception as e:
            print(f"Error while handling client connection: {e}")
        finally:
            client_socket.close()

if __name__ == "__main__":
    main()
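# A minimal client sketch for the seed_detection_server protocol above: send a
# 4-byte big-endian length followed by JPEG bytes; read back a length, two
# big-endian floats (width and height), then the annotated JPEG, mirroring
# receive_image/send_image. Host, port, and 'seed.jpg' are placeholder assumptions.
import socket
import struct
import cv2
import numpy as np

def recv_exact(sock, n):
    buf = b''
    while len(buf) < n:
        chunk = sock.recv(n - len(buf))
        if not chunk:
            raise ConnectionError("connection closed")
        buf += chunk
    return buf

with socket.create_connection(('127.0.0.1', 8888)) as s:
    img = cv2.imread('seed.jpg')  # placeholder image path
    ok, encoded = cv2.imencode('.jpg', img, [cv2.IMWRITE_JPEG_QUALITY, 80])
    data = encoded.tobytes()
    s.sendall(struct.pack('>I', len(data)))
    s.sendall(data)

    result_len = struct.unpack('>I', recv_exact(s, 4))[0]
    width = struct.unpack('>f', recv_exact(s, 4))[0]
    height = struct.unpack('>f', recv_exact(s, 4))[0]
    result = cv2.imdecode(np.frombuffer(recv_exact(s, result_len), np.uint8), cv2.IMREAD_COLOR)
    print(f"width={width:.2f}, height={height:.2f}, image shape={result.shape}")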
39
server.py
Normal file
@@ -0,0 +1,39 @@
import socket
import threading

# Simple echo server:
# with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
#     s.bind(("0.0.0.0", 1234))
#     s.listen()
#     c, addr = s.accept()
#     with c:
#         print(addr, "connected.")
#
#         while True:
#             data = c.recv(1024)
#             if not data:
#                 break
#             c.sendall(data)

# Use one thread per connection to handle concurrent clients:
# def handle_client(c, addr):
#     print(addr, "connected.")
#     while True:
#         data = c.recv(1024)
#         if not data:
#             break
#         c.sendall(data)

# with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
#     s.bind(("0.0.0.0", 1234))
#     s.listen()
#     while True:
#         c, addr = s.accept()
#         t = threading.Thread(target=handle_client, args=(c, addr))
#         t.start()

# Client
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
    s.connect(("192.168.124.4", 8888))
    s.sendall(b"Hello, Ross!")
    data = s.recv(1024)
    print("Received:", repr(data))
229
xiaci.py
Normal file
@@ -0,0 +1,229 @@
import cv2
import numpy as np
from tkinter import Tk, Button, filedialog, Label, messagebox, StringVar, OptionMenu
from PIL import Image, ImageTk
import os
from sklearn.mixture import GaussianMixture
import matplotlib.pyplot as plt

def preprocess_image(image_path):
    image = cv2.imread(image_path)
    if image is None:
        raise FileNotFoundError(f"Image not found or unable to read: {image_path}")
    gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    blurred = cv2.GaussianBlur(gray, (5, 5), 0)
    return blurred, image

def segment_crystal(blurred_image):
    _, binary = cv2.threshold(blurred_image, 30, 255, cv2.THRESH_BINARY)
    contours, _ = cv2.findContours(binary, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    if len(contours) == 0:
        raise ValueError("No crystal detected in the image.")
    crystal_contour = max(contours, key=cv2.contourArea)
    mask = np.zeros_like(blurred_image)
    cv2.drawContours(mask, [crystal_contour], -1, 255, thickness=cv2.FILLED)
    segmented_crystal = cv2.bitwise_and(blurred_image, blurred_image, mask=mask)
    return segmented_crystal, crystal_contour, binary

def detect_defects_GMM(segmented_crystal, crystal_contour):
    # Extract the region of interest (ROI) using the crystal contour
    mask = np.zeros_like(segmented_crystal)
    cv2.drawContours(mask, [crystal_contour], -1, 255, thickness=cv2.FILLED)
    roi = cv2.bitwise_and(segmented_crystal, segmented_crystal, mask=mask)

    # Flatten the ROI to a 1D array of pixel intensities
    pixel_intensities = roi.flatten()
    pixel_intensities = pixel_intensities[pixel_intensities > 0]  # Remove background pixels

    if len(pixel_intensities) < 10:
        raise ValueError("Not enough data points to fit Gaussian Mixture Model.")

    # Reshape for GMM
    X = pixel_intensities.reshape(-1, 1)

    # Fit a Gaussian Mixture Model with two components
    gmm = GaussianMixture(n_components=2, random_state=0).fit(X)

    # Get the means and covariances of the fitted Gaussians
    means = gmm.means_.flatten()
    covars = gmm.covariances_.flatten()

    # Determine which component corresponds to high brightness
    high_brightness_mean_index = np.argmax(means)
    high_brightness_mean = means[high_brightness_mean_index]
    high_brightness_covar = covars[high_brightness_mean_index]

    # Calculate the probability density function (PDF) values for each pixel intensity
    pdf_values = gmm.score_samples(X)

    # Set a threshold to identify high brightness regions
    threshold = np.percentile(pdf_values, 98)  # Adjust this threshold as needed

    # Identify high brightness pixels
    high_brightness_pixels = X[pdf_values >= threshold].flatten()

    # Find contours corresponding to high brightness regions
    _, binary_high_brightness = cv2.threshold(roi, int(high_brightness_mean), 255, cv2.THRESH_BINARY)
    contours, _ = cv2.findContours(binary_high_brightness, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

    defects = []
    for contour in contours:
        perimeter = cv2.arcLength(contour, True)
        if perimeter > 5:
            defects.append(contour)

    # Create a black image with the same shape as the original image
    binary_defects = np.zeros_like(segmented_crystal, dtype=np.uint8)

    # Draw high brightness regions on the binary defects image
    for y in range(segmented_crystal.shape[0]):
        for x in range(segmented_crystal.shape[1]):
            if mask[y, x] != 0 and segmented_crystal[y, x] >= high_brightness_mean:
                binary_defects[y, x] = 255

    # Generate a GMM plot image
    fig, ax = plt.subplots(figsize=(4.8, 3.6))
    ax.hist(pixel_intensities, bins=50, density=True, alpha=0.5, color='gray', edgecolor='black')

    x_vals = np.linspace(0, 255, 1000).reshape(-1, 1)
    log_prob = gmm.score_samples(x_vals)
    responsibilities = gmm.predict_proba(x_vals)

    # Convert the plot to an image
    fig.canvas.draw()
    gmm_plot_img = np.frombuffer(fig.canvas.tostring_rgb(), dtype=np.uint8)
    gmm_plot_img = gmm_plot_img.reshape(fig.canvas.get_width_height()[::-1] + (3,))
    plt.close(fig)

    cv2.imshow("gmm", gmm_plot_img)
    cv2.waitKey(10)

    return defects, binary_defects

def detect_defects(segmented_crystal, crystal_contour):
    edges = cv2.Canny(segmented_crystal, 30, 90)
    mask = np.zeros_like(edges)
    cv2.drawContours(mask, [crystal_contour], -1, 255, thickness=cv2.FILLED)
    inverted_mask = mask  # cv2.bitwise_not(mask)
    defects_edges = cv2.bitwise_and(edges, inverted_mask)
    contours, _ = cv2.findContours(defects_edges, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    defects = []
    for contour in contours:
        perimeter = cv2.arcLength(contour, True)
        if perimeter > 5:
            defects.append(contour)
    return defects, defects_edges

def calculate_total_area(crystal_contour):
    total_area = cv2.contourArea(crystal_contour)
    return total_area

def score_defects(defects, total_area):
    defect_area = sum(cv2.contourArea(defect) for defect in defects)
    score = (defect_area / total_area) * 100
    return score

def resize_image_for_display(image, width=480, height=360):
    return cv2.resize(image, (width, height))

def process_image():
    global original_image, binary_image, image_with_defects, segmented_crystal, crystal_contour, defects, total_area, score

    # Get the directory of the current script
    script_dir = os.path.dirname(os.path.abspath(__file__))

    # Open file dialog starting from the script's directory
    image_path = filedialog.askopenfilename(initialdir=script_dir, title="Select an Image",
                                            filetypes=(("JPEG files", "*.jpg *.jpeg"), ("PNG files", "*.png"), ("All files", "*.*")))

    if not image_path:
        return

    try:
        blurred_image, original_image = preprocess_image(image_path)
        segmented_crystal, crystal_contour, binary_image = segment_crystal(blurred_image)
        selected_algorithm = algorithm_var.get()
        if selected_algorithm == '1':
            defects, binary_image = detect_defects(segmented_crystal, crystal_contour)
        elif selected_algorithm == '2':
            defects, binary_image = detect_defects_GMM(segmented_crystal, crystal_contour)

        total_area = calculate_total_area(crystal_contour)
        score = score_defects(defects, total_area)

        result_text = (
            f"Detected {len(defects)} defects.\n"
            f"Total Defect Area: {sum(cv2.contourArea(defect) for defect in defects):.2f} pixels\n"
            f"Total Crystal Area: {total_area} pixels\n"
            f"Defect Score (% of Total Area): {score:.2f}%"
        )
        result_label.config(text=result_text)

        print(result_text)

        image_with_defects = cv2.drawContours(original_image.copy(), defects, -1, (0, 255, 0), 2)

        update_images()
    except Exception as e:
        messagebox.showerror("Error", str(e))

def update_images():
    images = [
        ("Original Image", resize_image_for_display(original_image)),
        ("Binary Defect Image", resize_image_for_display(binary_image)),
        ("Image with Defects", resize_image_for_display(image_with_defects)),
        ("Segmented Crystal", resize_image_for_display(segmented_crystal))
    ]

    for i, (label_text, img) in enumerate(images):
        label = labels[i]
        label.config(text=label_text)

        if len(img.shape) == 2:
            # Single-channel images (binary defects, segmented crystal) need GRAY2RGB
            img_rgb = cv2.cvtColor(img, cv2.COLOR_GRAY2RGB)
        else:
            img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        img_pil = Image.fromarray(img_rgb)
        img_tk = ImageTk.PhotoImage(img_pil)

        image_labels[i].config(image=img_tk)
        image_labels[i].image = img_tk

root = Tk()
root.title("Crystal Defect Detection")

original_image = None
binary_image = None
image_with_defects = None
segmented_crystal = None
crystal_contour = None
defects = []
total_area = 0
score = 0

labels = [Label(root, text=f"Image {i+1}") for i in range(4)]
image_labels = [Label(root) for i in range(4)]

# Add a label to display the detection results
result_label = Label(root, text="", justify='left')

button_open = Button(root, text="Open Image", command=process_image)

# Algorithm selection dropdown menu
algorithm_var = StringVar(value='1')  # Default to algorithm 1
algorithm_menu = OptionMenu(root, algorithm_var, '1', '2')
algorithm_menu.config(width=10)

# Arrange labels and images in a 2x2 grid
for i in range(4):
    row = i // 2
    col = i % 2
    labels[i].grid(row=row*2, column=col*2, padx=10, pady=5)
    image_labels[i].grid(row=row*2+1, column=col*2, padx=10, pady=5)

# Place the result label below the images
result_label.grid(row=4, column=0, columnspan=2, padx=10, pady=10)

button_open.grid(row=5, column=0, columnspan=2, padx=10, pady=10)
algorithm_menu.grid(row=5, column=1, columnspan=2, padx=30, pady=10)

root.mainloop()