# { # "success": true, # "detection_data": { # "defect_count": 5, # "total_defect_area": 123.45, # "total_crystal_area": 5000.0, # "defect_score": 2.47, # "algorithm_used": 1 # }, # "images": { # "original_image": "base64_string", # "binary_defects": "base64_string", # "image_with_defects": "base64_string", # "segmented_crystal": "base64_string" # } # } import cv2 import numpy as np import socket import struct import json import base64 from io import BytesIO from PIL import Image from sklearn.mixture import GaussianMixture import matplotlib.pyplot as plt class DefectDetectionServer: def __init__(self, host='localhost', port=8888): self.host = host self.port = port self.socket = None def preprocess_image(self, image): gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) blurred = cv2.GaussianBlur(gray, (5, 5), 0) return blurred def segment_crystal(self, blurred_image): _, binary = cv2.threshold(blurred_image, 30, 255, cv2.THRESH_BINARY) contours, _ = cv2.findContours(binary, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) if len(contours) == 0: raise ValueError("No crystal detected in the image.") crystal_contour = max(contours, key=cv2.contourArea) mask = np.zeros_like(blurred_image) cv2.drawContours(mask, [crystal_contour], -1, 255, thickness=cv2.FILLED) segmented_crystal = cv2.bitwise_and(blurred_image, blurred_image, mask=mask) return segmented_crystal, crystal_contour, binary def detect_defects_GMM(self, segmented_crystal, crystal_contour): # Extract the region of interest (ROI) using the crystal contour mask = np.zeros_like(segmented_crystal) cv2.drawContours(mask, [crystal_contour], -1, 255, thickness=cv2.FILLED) roi = cv2.bitwise_and(segmented_crystal, segmented_crystal, mask=mask) # Flatten the ROI to a 1D array of pixel intensities pixel_intensities = roi.flatten() pixel_intensities = pixel_intensities[pixel_intensities > 0] # Remove background pixels if len(pixel_intensities) < 10: raise ValueError("Not enough data points to fit Gaussian Mixture Model.") # Reshape for GMM X = pixel_intensities.reshape(-1, 1) # Fit a Gaussian Mixture Model with two components gmm = GaussianMixture(n_components=2, random_state=0).fit(X) # Get the means and covariances of the fitted Gaussians means = gmm.means_.flatten() covars = gmm.covariances_.flatten() # Determine which component corresponds to high brightness high_brightness_mean_index = np.argmax(means) high_brightness_mean = means[high_brightness_mean_index] high_brightness_covar = covars[high_brightness_mean_index] # Calculate the probability density function (PDF) values for each pixel intensity pdf_values = gmm.score_samples(X) # Set a threshold to identify high brightness regions threshold = np.percentile(pdf_values, 98) # Adjust this threshold as needed # Identify high brightness pixels high_brightness_pixels = X[pdf_values >= threshold].flatten() # Find contours corresponding to high brightness regions _, binary_high_brightness = cv2.threshold(roi, int(high_brightness_mean), 255, cv2.THRESH_BINARY) contours, _ = cv2.findContours(binary_high_brightness, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) defects = [] for contour in contours: perimeter = cv2.arcLength(contour, True) if perimeter > 5: defects.append(contour) # Create a black image with the same shape as the original image binary_defects = np.zeros_like(segmented_crystal, dtype=np.uint8) # Draw high brightness regions on the binary defects image for y in range(segmented_crystal.shape[0]): for x in range(segmented_crystal.shape[1]): if mask[y, x] != 0 and segmented_crystal[y, x] >= high_brightness_mean: binary_defects[y, x] = 255 return defects, binary_defects def detect_defects(self, segmented_crystal, crystal_contour): edges = cv2.Canny(segmented_crystal, 30, 90) mask = np.zeros_like(edges) cv2.drawContours(mask, [crystal_contour], -1, 255, thickness=cv2.FILLED) inverted_mask = mask defects_edges = cv2.bitwise_and(edges, inverted_mask) contours, _ = cv2.findContours(defects_edges, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) defects = [] for contour in contours: perimeter = cv2.arcLength(contour, True) if perimeter > 5: defects.append(contour) return defects, defects_edges def calculate_total_area(self, crystal_contour): total_area = cv2.contourArea(crystal_contour) return total_area def score_defects(self, defects, total_area): defect_area = sum(cv2.contourArea(defect) for defect in defects) score = (defect_area / total_area) * 100 return score def image_to_base64(self, image): """Convert OpenCV image to base64 string""" _, buffer = cv2.imencode('.png', image) image_base64 = base64.b64encode(buffer).decode('utf-8') return image_base64 def base64_to_image(self, base64_string): """Convert base64 string to OpenCV image""" image_data = base64.b64decode(base64_string) nparr = np.frombuffer(image_data, np.uint8) image = cv2.imdecode(nparr, cv2.IMREAD_COLOR) return image def process_image(self, image, algorithm): """Process image and return results""" try: blurred_image = self.preprocess_image(image) segmented_crystal, crystal_contour, binary_image = self.segment_crystal(blurred_image) if algorithm == 1: defects, binary_defects = self.detect_defects(segmented_crystal, crystal_contour) elif algorithm == 2: defects, binary_defects = self.detect_defects_GMM(segmented_crystal, crystal_contour) else: raise ValueError("Invalid algorithm. Use 1 or 2.") total_area = self.calculate_total_area(crystal_contour) score = self.score_defects(defects, total_area) # Draw defects on original image image_with_defects = cv2.drawContours(image.copy(), defects, -1, (0, 255, 0), 2) # Prepare detection data detection_data = { 'defect_count': len(defects), 'total_defect_area': float(sum(cv2.contourArea(defect) for defect in defects)), 'total_crystal_area': float(total_area), 'defect_score': float(score), 'algorithm_used': algorithm } # Convert images to base64 original_image_b64 = self.image_to_base64(image) binary_defects_b64 = self.image_to_base64(binary_defects) image_with_defects_b64 = self.image_to_base64(image_with_defects) segmented_crystal_b64 = self.image_to_base64(segmented_crystal) result = { 'success': True, 'detection_data': detection_data, 'images': { 'original_image': original_image_b64, 'binary_defects': binary_defects_b64, 'image_with_defects': image_with_defects_b64, 'segmented_crystal': segmented_crystal_b64 } } return result except Exception as e: return { 'success': False, 'error': str(e) } def send_data(self, conn, data): """Send data with length prefix""" json_data = json.dumps(data) data_bytes = json_data.encode('utf-8') data_length = len(data_bytes) # Send length first (4 bytes) conn.sendall(struct.pack('!I', data_length)) # Send data conn.sendall(data_bytes) def receive_data(self, conn): """Receive data with length prefix""" # Receive length first (4 bytes) length_data = b'' while len(length_data) < 4: chunk = conn.recv(4 - len(length_data)) if not chunk: return None length_data += chunk data_length = struct.unpack('!I', length_data)[0] # Receive data received_data = b'' while len(received_data) < data_length: chunk = conn.recv(data_length - len(received_data)) if not chunk: return None received_data += chunk return json.loads(received_data.decode('utf-8')) def handle_client(self, conn, addr): """Handle client connection""" print(f"Connected to {addr}") try: while True: # Receive request from client request = self.receive_data(conn) if not request: break print(f"Received request from {addr}") # Extract image and algorithm from request image_b64 = request.get('image') algorithm = request.get('algorithm', 1) if not image_b64: response = {'success': False, 'error': 'No image provided'} else: # Convert base64 to image image = self.base64_to_image(image_b64) if image is None: response = {'success': False, 'error': 'Invalid image data'} else: # Process image response = self.process_image(image, algorithm) # Send response back to client self.send_data(conn, response) print(f"Sent response to {addr}") except Exception as e: print(f"Error handling client {addr}: {e}") finally: conn.close() print(f"Connection to {addr} closed") def start_server(self): """Start the socket server""" self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) try: self.socket.bind((self.host, self.port)) self.socket.listen(5) print(f"Server listening on {self.host}:{self.port}") while True: conn, addr = self.socket.accept() self.handle_client(conn, addr) except KeyboardInterrupt: print("\nServer shutting down...") except Exception as e: print(f"Server error: {e}") finally: if self.socket: self.socket.close() if __name__ == "__main__": server = DefectDetectionServer(host='172.0.01', port=8888) server.start_server()