Spaces:
Running
Running
# import os | |
# from pathlib import Path | |
# from typing import List, Union | |
# from PIL import Image | |
# import ezdxf.units | |
# import numpy as np | |
# import torch | |
# from torchvision import transforms | |
# from ultralytics import YOLOWorld, YOLO | |
# from ultralytics.engine.results import Results | |
# from ultralytics.utils.plotting import save_one_box | |
# from transformers import AutoModelForImageSegmentation | |
# import cv2 | |
# import ezdxf | |
# import gradio as gr | |
# import gc | |
# from scalingtestupdated import calculate_scaling_factor | |
# from scipy.interpolate import splprep, splev | |
# from scipy.ndimage import gaussian_filter1d | |
# import json | |
# import time | |
# import signal | |
# from shapely.ops import unary_union | |
# from shapely.geometry import MultiPolygon, GeometryCollection, Polygon, Point | |
# from u2netp import U2NETP # Add U2NETP import | |
# import logging | |
# import shutil | |
# # Initialize logging | |
# logging.basicConfig(level=logging.INFO) | |
# logger = logging.getLogger(__name__) | |
# # Create cache directory for models | |
# CACHE_DIR = os.path.join(os.path.dirname(__file__), ".cache") | |
# os.makedirs(CACHE_DIR, exist_ok=True) | |
# # Custom Exception Classes | |
# class TimeoutReachedError(Exception): | |
# pass | |
# class BoundaryOverlapError(Exception): | |
# pass | |
# class TextOverlapError(Exception): | |
# pass | |
# class ReferenceBoxNotDetectedError(Exception): | |
# """Raised when the Reference coin cannot be detected in the image""" | |
# pass | |
# class FingerCutOverlapError(Exception): | |
# """Raised when finger cuts overlap with existing geometry""" | |
# def __init__(self, message="There was an overlap with fingercuts... Please try again to generate dxf."): | |
# super().__init__(message) | |
# # Global model initialization | |
# print("Loading models...") | |
# start_time = time.time() | |
# # Load YOLO reference model | |
# reference_model_path = os.path.join("", "best1.pt") | |
# if not os.path.exists(reference_model_path): | |
# shutil.copy("best1.pt", reference_model_path) | |
# reference_detector_global = YOLO(reference_model_path) | |
# # Load U2NETP model | |
# u2net_model_path = os.path.join(CACHE_DIR, "u2netp.pth") | |
# if not os.path.exists(u2net_model_path): | |
# shutil.copy("u2netp.pth", u2net_model_path) | |
# u2net_global = U2NETP(3, 1) | |
# u2net_global.load_state_dict(torch.load(u2net_model_path, map_location="cpu")) | |
# # Load BiRefNet model | |
# birefnet = AutoModelForImageSegmentation.from_pretrained( | |
# "zhengpeng7/BiRefNet", trust_remote_code=True, cache_dir=CACHE_DIR | |
# ) | |
# device = "cpu" | |
# torch.set_float32_matmul_precision(["high", "highest"][0]) | |
# # Move models to device | |
# u2net_global.to(device) | |
# u2net_global.eval() | |
# birefnet.to(device) | |
# birefnet.eval() | |
# # Define transforms | |
# transform_image = transforms.Compose([ | |
# transforms.Resize((1024, 1024)), | |
# transforms.ToTensor(), | |
# transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]), | |
# ]) | |
# # Language translations dictionary remains unchanged | |
# TRANSLATIONS = { | |
# "english": { | |
# "input_image": "Input Image", | |
# "offset_value": "Offset value", | |
# "offset_unit": "Offset unit (mm/in)", | |
# "enable_finger": "Enable Finger Clearance", | |
# "edge_radius": "Edge rounding radius (mm)", | |
# "output_image": "Output Image", | |
# "outlines": "Outlines of Objects", | |
# "dxf_file": "DXF file", | |
# "mask": "Mask", | |
# "enable_radius": "Enable Edge Rounding", | |
# "radius_disabled": "Rounding Disabled", | |
# "scaling_factor": "Scaling Factor(mm)", | |
# "scaling_placeholder": "Every pixel is equal to mentioned number in millimeters", | |
# "language_selector": "Select Language", | |
# }, | |
# "dutch": { | |
# "input_image": "Invoer Afbeelding", | |
# "offset_value": "Offset waarde", | |
# "offset_unit": "Offset unit (mm/inch)", | |
# "enable_finger": "Finger Clearance inschakelen", | |
# "edge_radius": "Ronding radius rand (mm)", | |
# "output_image": "Uitvoer Afbeelding", | |
# "outlines": "Contouren van Objecten", | |
# "dxf_file": "DXF bestand", | |
# "mask": "Masker", | |
# "enable_radius": "Ronding inschakelen", | |
# "radius_disabled": "Ronding uitgeschakeld", | |
# "scaling_factor": "Schalingsfactor(mm)", | |
# "scaling_placeholder": "Elke pixel is gelijk aan genoemd aantal in millimeters", | |
# "language_selector": "Selecteer Taal", | |
# } | |
# } | |
# def remove_bg_u2netp(image: np.ndarray) -> np.ndarray: | |
# """Remove background using U2NETP model specifically for reference objects""" | |
# try: | |
# image_pil = Image.fromarray(image) | |
# transform_u2netp = transforms.Compose([ | |
# transforms.Resize((320, 320)), | |
# transforms.ToTensor(), | |
# transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]), | |
# ]) | |
# input_tensor = transform_u2netp(image_pil).unsqueeze(0).to(device) | |
# with torch.no_grad(): | |
# outputs = u2net_global(input_tensor) | |
# pred = outputs[0] | |
# pred = (pred - pred.min()) / (pred.max() - pred.min() + 1e-8) | |
# pred_np = pred.squeeze().cpu().numpy() | |
# pred_np = cv2.resize(pred_np, (image_pil.width, image_pil.height)) | |
# pred_np = (pred_np * 255).astype(np.uint8) | |
# return pred_np | |
# except Exception as e: | |
# logger.error(f"Error in U2NETP background removal: {e}") | |
# raise | |
# def remove_bg(image: np.ndarray) -> np.ndarray: | |
# """Remove background using BiRefNet model for main objects""" | |
# try: | |
# image = Image.fromarray(image) | |
# input_images = transform_image(image).unsqueeze(0).to(device) | |
# with torch.no_grad(): | |
# preds = birefnet(input_images)[-1].sigmoid().cpu() | |
# pred = preds[0].squeeze() | |
# pred_pil: Image = transforms.ToPILImage()(pred) | |
# scale_ratio = 1024 / max(image.size) | |
# scaled_size = (int(image.size[0] * scale_ratio), int(image.size[1] * scale_ratio)) | |
# return np.array(pred_pil.resize(scaled_size)) | |
# except Exception as e: | |
# logger.error(f"Error in BiRefNet background removal: {e}") | |
# raise | |
# def resize_img(img: np.ndarray, resize_dim): | |
# return np.array(Image.fromarray(img).resize(resize_dim)) | |
# def make_square(img: np.ndarray): | |
# """Make the image square by padding""" | |
# height, width = img.shape[:2] | |
# max_dim = max(height, width) | |
# pad_height = (max_dim - height) // 2 | |
# pad_width = (max_dim - width) // 2 | |
# pad_height_extra = max_dim - height - 2 * pad_height | |
# pad_width_extra = max_dim - width - 2 * pad_width | |
# if len(img.shape) == 3: # Color image | |
# padded = np.pad( | |
# img, | |
# ( | |
# (pad_height, pad_height + pad_height_extra), | |
# (pad_width, pad_width + pad_width_extra), | |
# (0, 0), | |
# ), | |
# mode="edge", | |
# ) | |
# else: # Grayscale image | |
# padded = np.pad( | |
# img, | |
# ( | |
# (pad_height, pad_height + pad_height_extra), | |
# (pad_width, pad_width + pad_width_extra), | |
# ), | |
# mode="edge", | |
# ) | |
# return padded | |
# def detect_reference_square(img) -> tuple: | |
# """Detect reference square in the image and ignore other coins""" | |
# try: | |
# res = reference_detector_global.predict(img, conf=0.75) | |
# if not res or len(res) == 0 or len(res[0].boxes) == 0: | |
# raise ReferenceBoxNotDetectedError("Unable to detect the reference coin in the image.") | |
# # Get all detected boxes | |
# boxes = res[0].cpu().boxes.xyxy | |
# # Find the largest box (most likely the reference coin) | |
# largest_box = None | |
# max_area = 0 | |
# for box in boxes: | |
# x_min, y_min, x_max, y_max = box | |
# area = (x_max - x_min) * (y_max - y_min) | |
# if area > max_area: | |
# max_area = area | |
# largest_box = box | |
# return ( | |
# save_one_box(largest_box.unsqueeze(0), img, save=False), | |
# largest_box | |
# ) | |
# except Exception as e: | |
# if not isinstance(e, ReferenceBoxNotDetectedError): | |
# logger.error(f"Error in reference square detection: {e}") | |
# raise ReferenceBoxNotDetectedError("Error detecting reference coin. Please try again with a clearer image.") | |
# raise | |
# def exclude_scaling_box( | |
# image: np.ndarray, | |
# bbox: np.ndarray, | |
# orig_size: tuple, | |
# processed_size: tuple, | |
# expansion_factor: float = 1.2, | |
# ) -> np.ndarray: | |
# x_min, y_min, x_max, y_max = map(int, bbox) | |
# scale_x = processed_size[1] / orig_size[1] | |
# scale_y = processed_size[0] / orig_size[0] | |
# x_min = int(x_min * scale_x) | |
# x_max = int(x_max * scale_x) | |
# y_min = int(y_min * scale_y) | |
# y_max = int(y_max * scale_y) | |
# box_width = x_max - x_min | |
# box_height = y_max - y_min | |
# expanded_x_min = max(0, int(x_min - (expansion_factor - 1) * box_width / 2)) | |
# expanded_x_max = min( | |
# image.shape[1], int(x_max + (expansion_factor - 1) * box_width / 2) | |
# ) | |
# expanded_y_min = max(0, int(y_min - (expansion_factor - 1) * box_height / 2)) | |
# expanded_y_max = min( | |
# image.shape[0], int(y_max + (expansion_factor - 1) * box_height / 2) | |
# ) | |
# image[expanded_y_min:expanded_y_max, expanded_x_min:expanded_x_max] = 0 | |
# return image | |
# def resample_contour(contour, edge_radius_px: int = 0): | |
# """Resample contour with radius-aware smoothing and periodic handling.""" | |
# logger.info(f"Starting resample_contour with contour of shape {contour.shape}") | |
# num_points = 1500 | |
# sigma = max(2, int(edge_radius_px) // 4) # Adjust sigma based on radius | |
# if len(contour) < 4: # Need at least 4 points for spline with periodic condition | |
# error_msg = f"Contour must have at least 4 points, but has {len(contour)} points." | |
# logger.error(error_msg) | |
# raise ValueError(error_msg) | |
# try: | |
# contour = contour[:, 0, :] | |
# logger.debug(f"Reshaped contour to shape {contour.shape}") | |
# # Ensure contour is closed by making start and end points the same | |
# if not np.array_equal(contour[0], contour[-1]): | |
# contour = np.vstack([contour, contour[0]]) | |
# # Create periodic spline representation | |
# tck, u = splprep(contour.T, u=None, s=0, per=True) | |
# # Evaluate spline at evenly spaced points | |
# u_new = np.linspace(u.min(), u.max(), num_points) | |
# x_new, y_new = splev(u_new, tck, der=0) | |
# # Apply Gaussian smoothing with wrap-around | |
# if sigma > 0: | |
# x_new = gaussian_filter1d(x_new, sigma=sigma, mode='wrap') | |
# y_new = gaussian_filter1d(y_new, sigma=sigma, mode='wrap') | |
# # Re-close the contour after smoothing | |
# x_new[-1] = x_new[0] | |
# y_new[-1] = y_new[0] | |
# result = np.array([x_new, y_new]).T | |
# logger.info(f"Completed resample_contour with result shape {result.shape}") | |
# return result | |
# except Exception as e: | |
# logger.error(f"Error in resample_contour: {e}") | |
# raise | |
# # def save_dxf_spline(inflated_contours, scaling_factor, height, finger_clearance=False): | |
# # doc = ezdxf.new(units=ezdxf.units.MM) | |
# # doc.header["$INSUNITS"] = ezdxf.units.MM | |
# # msp = doc.modelspace() | |
# # final_polygons_inch = [] | |
# # finger_centers = [] | |
# # original_polygons = [] | |
# # for contour in inflated_contours: | |
# # try: | |
# # # Removed the second parameter since it was causing the error | |
# # resampled_contour = resample_contour(contour) | |
# # points_inch = [(x * scaling_factor, (height - y) * scaling_factor) | |
# # for x, y in resampled_contour] | |
# # if len(points_inch) < 3: | |
# # continue | |
# # tool_polygon = build_tool_polygon(points_inch) | |
# # original_polygons.append(tool_polygon) | |
# # if finger_clearance: | |
# # try: | |
# # tool_polygon, center = place_finger_cut_adjusted( | |
# # tool_polygon, points_inch, finger_centers, final_polygons_inch | |
# # ) | |
# # except FingerCutOverlapError: | |
# # tool_polygon = original_polygons[-1] | |
# # exterior_coords = polygon_to_exterior_coords(tool_polygon) | |
# # if len(exterior_coords) < 3: | |
# # continue | |
# # msp.add_spline(exterior_coords, degree=3, dxfattribs={"layer": "TOOLS"}) | |
# # final_polygons_inch.append(tool_polygon) | |
# # except ValueError as e: | |
# # logger.warning(f"Skipping contour: {e}") | |
# # dxf_filepath = os.path.join("./outputs", "out.dxf") | |
# # doc.saveas(dxf_filepath) | |
# # return dxf_filepath, final_polygons_inch, original_polygons | |
# def save_dxf_spline(inflated_contours, scaling_factor, height, finger_clearance=False): | |
# doc = ezdxf.new(units=ezdxf.units.MM) | |
# doc.header["$INSUNITS"] = ezdxf.units.MM | |
# msp = doc.modelspace() | |
# final_polygons_inch = [] | |
# finger_centers = [] | |
# original_polygons = [] | |
# # Scale correction factor based on your analysis | |
# scale_correction = 1.079 | |
# for contour in inflated_contours: | |
# try: | |
# resampled_contour = resample_contour(contour) | |
# points_inch = [(x * scaling_factor, (height - y) * scaling_factor) | |
# for x, y in resampled_contour] | |
# if len(points_inch) < 3: | |
# continue | |
# tool_polygon = build_tool_polygon(points_inch) | |
# original_polygons.append(tool_polygon) | |
# if finger_clearance: | |
# try: | |
# tool_polygon, center = place_finger_cut_adjusted( | |
# tool_polygon, points_inch, finger_centers, final_polygons_inch | |
# ) | |
# except FingerCutOverlapError: | |
# tool_polygon = original_polygons[-1] | |
# exterior_coords = polygon_to_exterior_coords(tool_polygon) | |
# if len(exterior_coords) < 3: | |
# continue | |
# # Apply scale correction AFTER finger cuts and polygon adjustments | |
# corrected_coords = [(x * scale_correction, y * scale_correction) for x, y in exterior_coords] | |
# msp.add_spline(corrected_coords, degree=3, dxfattribs={"layer": "TOOLS"}) | |
# final_polygons_inch.append(tool_polygon) | |
# except ValueError as e: | |
# logger.warning(f"Skipping contour: {e}") | |
# dxf_filepath = os.path.join("./outputs", "out.dxf") | |
# doc.saveas(dxf_filepath) | |
# return dxf_filepath, final_polygons_inch, original_polygons | |
# def build_tool_polygon(points_inch): | |
# return Polygon(points_inch) | |
# def polygon_to_exterior_coords(poly): | |
# logger.info(f"Starting polygon_to_exterior_coords with input geometry type: {poly.geom_type}") | |
# try: | |
# # 1) If it's a GeometryCollection or MultiPolygon, fuse everything into one shape | |
# if poly.geom_type == "GeometryCollection" or poly.geom_type == "MultiPolygon": | |
# logger.debug(f"Performing unary_union on {poly.geom_type}") | |
# unified = unary_union(poly) | |
# if unified.is_empty: | |
# logger.warning("unary_union produced an empty geometry; returning empty list") | |
# return [] | |
# # If union still yields multiple disjoint pieces, pick the largest Polygon | |
# if unified.geom_type == "GeometryCollection" or unified.geom_type == "MultiPolygon": | |
# largest = None | |
# max_area = 0.0 | |
# for g in getattr(unified, "geoms", []): | |
# if hasattr(g, "area") and g.area > max_area and hasattr(g, "exterior"): | |
# max_area = g.area | |
# largest = g | |
# if largest is None: | |
# logger.warning("No valid Polygon found in unified geometry; returning empty list") | |
# return [] | |
# poly = largest | |
# else: | |
# # Now unified should be a single Polygon or LinearRing | |
# poly = unified | |
# # 2) At this point, we must have a single Polygon (or something with an exterior) | |
# if not hasattr(poly, "exterior") or poly.exterior is None: | |
# logger.warning("Input geometry has no exterior ring; returning empty list") | |
# return [] | |
# raw_coords = list(poly.exterior.coords) | |
# total = len(raw_coords) | |
# logger.info(f"Extracted {total} raw exterior coordinates") | |
# if total == 0: | |
# return [] | |
# # 3) Subsample coordinates to at most 100 points (evenly spaced) | |
# max_pts = 100 | |
# if total > max_pts: | |
# step = total // max_pts | |
# sampled = [raw_coords[i] for i in range(0, total, step)] | |
# # Ensure we include the last point to close the loop | |
# if sampled[-1] != raw_coords[-1]: | |
# sampled.append(raw_coords[-1]) | |
# logger.info(f"Downsampled perimeter from {total} to {len(sampled)} points") | |
# return sampled | |
# else: | |
# return raw_coords | |
# except Exception as e: | |
# logger.error(f"Error in polygon_to_exterior_coords: {e}") | |
# return [] | |
# def place_finger_cut_adjusted( | |
# tool_polygon: Polygon, | |
# points_inch: list, | |
# existing_centers: list, | |
# all_polygons: list, | |
# circle_diameter: float = 25.4, | |
# min_gap: float = 0.5, | |
# max_attempts: int = 100 | |
# ) -> (Polygon, tuple): | |
# logger.info(f"Starting place_finger_cut_adjusted with {len(points_inch)} input points") | |
# from shapely.geometry import Point | |
# import numpy as np | |
# import time | |
# import random | |
# # Fallback: if we run out of time or attempts, place in the "middle" of the outline | |
# def fallback_solution(): | |
# logger.warning("Using fallback approach for finger cut placement") | |
# # Pick the midpoint of the original outline as a last-resort center | |
# fallback_center = points_inch[len(points_inch) // 2] | |
# r = circle_diameter / 2.0 | |
# fallback_circle = Point(fallback_center).buffer(r, resolution=32) | |
# try: | |
# union_poly = tool_polygon.union(fallback_circle) | |
# except Exception as e: | |
# logger.warning(f"Fallback union failed ({e}); trying buffer-union fallback") | |
# union_poly = tool_polygon.buffer(0).union(fallback_circle.buffer(0)) | |
# existing_centers.append(fallback_center) | |
# logger.info(f"Fallback finger cut placed at {fallback_center}") | |
# return union_poly, fallback_center | |
# # Precompute values | |
# r = circle_diameter / 2.0 | |
# needed_center_dist = circle_diameter + min_gap | |
# # 1) Get perimeter coordinates of this polygon | |
# raw_perimeter = polygon_to_exterior_coords(tool_polygon) | |
# if not raw_perimeter: | |
# logger.warning("No valid exterior coords found; using fallback immediately") | |
# return fallback_solution() | |
# # 2) Possibly subsample to at most 100 perimeter points | |
# if len(raw_perimeter) > 100: | |
# step = len(raw_perimeter) // 100 | |
# perimeter_coords = raw_perimeter[::step] | |
# logger.info(f"Subsampled perimeter from {len(raw_perimeter)} to {len(perimeter_coords)} points") | |
# else: | |
# perimeter_coords = raw_perimeter[:] | |
# # 3) Randomize the order to avoid bias | |
# indices = list(range(len(perimeter_coords))) | |
# random.shuffle(indices) | |
# logger.debug(f"Shuffled perimeter indices for candidate order") | |
# # 4) Non-blocking timeout setup | |
# start_time = time.time() | |
# timeout_secs = 5.0 # leave ~0.1s margin | |
# attempts = 0 | |
# try: | |
# while attempts < max_attempts: | |
# # 5) Abort if we're running out of time | |
# if time.time() - start_time > timeout_secs - 0.1: | |
# logger.warning(f"Approaching timeout after {attempts} attempts") | |
# return fallback_solution() | |
# # 6) For each shuffled perimeter point, try small offsets | |
# for idx in indices: | |
# # Check timeout inside the loop as well | |
# if time.time() - start_time > timeout_secs - 0.05: | |
# logger.warning("Timeout during candidate-point loop") | |
# return fallback_solution() | |
# cx, cy = perimeter_coords[idx] | |
# # Try five small offsets: (0,0), (±min_gap/2, 0), (0, ±min_gap/2) | |
# for dx, dy in [(0, 0), (-min_gap/2, 0), (min_gap/2, 0), (0, -min_gap/2), (0, min_gap/2)]: | |
# candidate_center = (cx + dx, cy + dy) | |
# # 6a) Check distance to existing finger centers | |
# too_close_finger = any( | |
# np.hypot(candidate_center[0] - ex, candidate_center[1] - ey) | |
# < needed_center_dist | |
# for (ex, ey) in existing_centers | |
# ) | |
# if too_close_finger: | |
# continue | |
# # 6b) Build candidate circle with reduced resolution for speed | |
# candidate_circle = Point(candidate_center).buffer(r, resolution=32) | |
# # 6c) Must overlap ≥30% with this polygon | |
# try: | |
# inter_area = tool_polygon.intersection(candidate_circle).area | |
# except Exception: | |
# continue | |
# if inter_area < 0.3 * candidate_circle.area: | |
# continue | |
# # 6d) Must not intersect or even "touch" any other polygon (buffered by min_gap) | |
# invalid = False | |
# for other_poly in all_polygons: | |
# if other_poly.equals(tool_polygon): | |
# # Don't compare against itself | |
# continue | |
# # Buffer the other polygon by min_gap to enforce a strict clearance | |
# if other_poly.buffer(min_gap).intersects(candidate_circle) or \ | |
# other_poly.buffer(min_gap).touches(candidate_circle): | |
# invalid = True | |
# break | |
# if invalid: | |
# continue | |
# # 6e) Candidate passes all tests → union and return | |
# try: | |
# union_poly = tool_polygon.union(candidate_circle) | |
# # If union is a MultiPolygon (more than one piece), reject | |
# if union_poly.geom_type == "MultiPolygon" and len(union_poly.geoms) > 1: | |
# continue | |
# # If union didn't change anything (no real cut), reject | |
# if union_poly.equals(tool_polygon): | |
# continue | |
# except Exception: | |
# continue | |
# existing_centers.append(candidate_center) | |
# logger.info(f"Finger cut placed successfully at {candidate_center} after {attempts} attempts") | |
# return union_poly, candidate_center | |
# attempts += 1 | |
# # If we've done half the attempts and we're near timeout, bail out | |
# if attempts >= (max_attempts // 2) and (time.time() - start_time) > timeout_secs * 0.8: | |
# logger.warning(f"Approaching timeout (attempt {attempts})") | |
# return fallback_solution() | |
# logger.debug(f"Completed iteration {attempts}/{max_attempts}") | |
# # If we exit loop without finding a valid spot | |
# logger.warning(f"No valid spot after {max_attempts} attempts, using fallback") | |
# return fallback_solution() | |
# except Exception as e: | |
# logger.error(f"Error in place_finger_cut_adjusted: {e}") | |
# return fallback_solution() | |
# def extract_outlines(binary_image: np.ndarray) -> tuple: | |
# contours, _ = cv2.findContours( | |
# binary_image, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE | |
# ) | |
# outline_image = np.full_like(binary_image, 255) # White background | |
# return outline_image, contours | |
# def round_edges(mask: np.ndarray, radius_mm: float, scaling_factor: float) -> np.ndarray: | |
# """Rounds mask edges using contour smoothing.""" | |
# if radius_mm <= 0 or scaling_factor <= 0: | |
# return mask | |
# radius_px = max(1, int(radius_mm / scaling_factor)) # Ensure min 1px | |
# # Handle small objects | |
# if np.count_nonzero(mask) < 500: # Small object threshold | |
# return cv2.dilate(cv2.erode(mask, np.ones((3,3))), np.ones((3,3))) | |
# # Existing contour processing with improvements: | |
# contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE) | |
# # NEW: Filter small contours | |
# contours = [c for c in contours if cv2.contourArea(c) > 100] | |
# smoothed_contours = [] | |
# for contour in contours: | |
# try: | |
# # Resample with radius-based smoothing | |
# resampled = resample_contour(contour, radius_px) | |
# resampled = resampled.astype(np.int32).reshape((-1, 1, 2)) | |
# smoothed_contours.append(resampled) | |
# except Exception as e: | |
# logger.warning(f"Error smoothing contour: {e}") | |
# smoothed_contours.append(contour) # Fallback to original contour | |
# # Draw smoothed contours | |
# rounded = np.zeros_like(mask) | |
# cv2.drawContours(rounded, smoothed_contours, -1, 255, thickness=cv2.FILLED) | |
# return rounded | |
# def predict_og(image, offset, offset_unit, edge_radius, finger_clearance=False): | |
# print(f"DEBUG: Image shape: {image.shape}, dtype: {image.dtype}, range: {image.min()}-{image.max()}") | |
# coin_size_mm = 20.0 | |
# if offset_unit == "inches": | |
# offset *= 25.4 | |
# if edge_radius is None or edge_radius == 0: | |
# edge_radius = 0.0001 | |
# if offset < 0: | |
# raise gr.Error("Offset Value Can't be negative") | |
# try: | |
# reference_obj_img, scaling_box_coords = detect_reference_square(image) | |
# except ReferenceBoxNotDetectedError as e: | |
# return ( | |
# None, | |
# None, | |
# None, | |
# None, | |
# f"Error: {str(e)}" | |
# ) | |
# except Exception as e: | |
# raise gr.Error(f"Error processing image: {str(e)}") | |
# reference_obj_img = make_square(reference_obj_img) | |
# # Use U2NETP for reference object background removal | |
# reference_square_mask = remove_bg_u2netp(reference_obj_img) | |
# reference_square_mask = resize_img(reference_square_mask, reference_obj_img.shape[:2][::-1]) | |
# try: | |
# scaling_factor = calculate_scaling_factor( | |
# target_image=reference_square_mask, | |
# reference_obj_size_mm=coin_size_mm, | |
# feature_detector="ORB", | |
# ) | |
# except Exception as e: | |
# scaling_factor = None | |
# logger.warning(f"Error calculating scaling factor: {e}") | |
# if not scaling_factor: | |
# ref_size_px = (reference_square_mask.shape[0] + reference_square_mask.shape[1]) / 2 | |
# scaling_factor = 20.0 / ref_size_px | |
# logger.info(f"Fallback scaling: {scaling_factor:.4f} mm/px using 20mm reference") | |
# # Use BiRefNet for main object background removal | |
# orig_size = image.shape[:2] | |
# objects_mask = remove_bg(image) | |
# processed_size = objects_mask.shape[:2] | |
# # REMOVE ALL COINS from mask: | |
# res = reference_detector_global.predict(image, conf=0.05) | |
# boxes = res[0].cpu().boxes.xyxy if res and len(res) > 0 else [] | |
# for box in boxes: | |
# objects_mask = exclude_scaling_box( | |
# objects_mask, | |
# box, | |
# orig_size, | |
# processed_size, | |
# expansion_factor=1.2, | |
# ) | |
# objects_mask = resize_img(objects_mask, (image.shape[1], image.shape[0])) | |
# # offset_pixels = (float(offset) / scaling_factor) * 2 + 1 if scaling_factor else 1 | |
# # dilated_mask = cv2.dilate(objects_mask, np.ones((int(offset_pixels), int(offset_pixels)), np.uint8)) | |
# # Image.fromarray(dilated_mask).save("./outputs/scaled_mask_original.jpg") | |
# # dilated_mask_orig = dilated_mask.copy() | |
# # #if edge_radius > 0: | |
# # # Use morphological rounding instead of contour-based | |
# # rounded_mask = round_edges(objects_mask, edge_radius, scaling_factor) | |
# # #else: | |
# # #rounded_mask = objects_mask.copy() | |
# # # Apply dilation AFTER rounding | |
# # offset_pixels = (float(offset) / scaling_factor) * 2 + 1 if scaling_factor else 1 | |
# # kernel = np.ones((int(offset_pixels), int(offset_pixels)), np.uint8) | |
# # dilated_mask = cv2.dilate(rounded_mask, kernel) | |
# # Apply edge rounding first | |
# rounded_mask = round_edges(objects_mask, edge_radius, scaling_factor) | |
# # Apply dilation AFTER rounding | |
# offset_pixels = (float(offset) / scaling_factor) * 2 + 1 if scaling_factor else 1 | |
# kernel = np.ones((int(offset_pixels), int(offset_pixels)), np.uint8) | |
# final_dilated_mask = cv2.dilate(rounded_mask, kernel) | |
# # Save for debugging | |
# Image.fromarray(final_dilated_mask).save("./outputs/scaled_mask_original.jpg") | |
# outlines, contours = extract_outlines(final_dilated_mask) | |
# try: | |
# dxf, finger_polygons, original_polygons = save_dxf_spline( | |
# contours, | |
# scaling_factor, | |
# processed_size[0], | |
# finger_clearance=(finger_clearance == "On") | |
# ) | |
# except FingerCutOverlapError as e: | |
# raise gr.Error(str(e)) | |
# shrunked_img_contours = image.copy() | |
# if finger_clearance == "On": | |
# outlines = np.full_like(final_dilated_mask, 255) | |
# for poly in finger_polygons: | |
# try: | |
# coords = np.array([ | |
# (int(x / scaling_factor), int(processed_size[0] - y / scaling_factor)) | |
# for x, y in poly.exterior.coords | |
# ], np.int32).reshape((-1, 1, 2)) | |
# cv2.drawContours(shrunked_img_contours, [coords], -1, 0, thickness=2) | |
# cv2.drawContours(outlines, [coords], -1, 0, thickness=2) | |
# except Exception as e: | |
# logger.warning(f"Failed to draw finger cut: {e}") | |
# continue | |
# else: | |
# outlines = np.full_like(final_dilated_mask, 255) | |
# cv2.drawContours(shrunked_img_contours, contours, -1, 0, thickness=2) | |
# cv2.drawContours(outlines, contours, -1, 0, thickness=2) | |
# return ( | |
# shrunked_img_contours, | |
# outlines, | |
# dxf, | |
# final_dilated_mask, | |
# f"{scaling_factor:.4f}") | |
# def predict_simple(image): | |
# """ | |
# Only image in → returns (annotated, outlines, dxf, mask). | |
# Uses offset=0 mm, no fillet, no finger-cut. | |
# """ | |
# ann, outlines, dxf_path, mask, _ = predict_og( | |
# image, | |
# offset=0, | |
# offset_unit="mm", | |
# edge_radius=0, | |
# finger_clearance="Off", | |
# ) | |
# return ann, outlines, dxf_path, mask | |
# def predict_middle(image, enable_fillet, fillet_value_mm): | |
# """ | |
# image + (On/Off) fillet toggle + fillet radius → returns (annotated, outlines, dxf, mask). | |
# Uses offset=0 mm, finger-cut off. | |
# """ | |
# radius = fillet_value_mm if enable_fillet == "On" else 0 | |
# ann, outlines, dxf_path, mask, _ = predict_og( | |
# image, | |
# offset=0, | |
# offset_unit="mm", | |
# edge_radius=radius, | |
# finger_clearance="Off", | |
# ) | |
# return ann, outlines, dxf_path, mask | |
# def predict_full(image, enable_fillet, fillet_value_mm, enable_finger_cut): | |
# """ | |
# image + fillet toggle/value + finger-cut toggle → returns (annotated, outlines, dxf, mask). | |
# Uses offset=0 mm. | |
# """ | |
# radius = fillet_value_mm if enable_fillet == "On" else 0 | |
# finger_flag = "On" if enable_finger_cut == "On" else "Off" | |
# ann, outlines, dxf_path, mask, _ = predict_og( | |
# image, | |
# offset=0, | |
# offset_unit="mm", | |
# edge_radius=radius, | |
# finger_clearance=finger_flag, | |
# ) | |
# return ann, outlines, dxf_path, mask | |
# def update_interface(language): | |
# return [ | |
# gr.Image(label=TRANSLATIONS[language]["input_image"], type="numpy"), | |
# gr.Row([ | |
# gr.Number(label=TRANSLATIONS[language]["offset_value"], value=0), | |
# gr.Dropdown(["mm", "inches"], value="mm", | |
# label=TRANSLATIONS[language]["offset_unit"]) | |
# ]), | |
# gr.Slider(minimum=0,maximum=20,step=1,value=5,label=TRANSLATIONS[language]["edge_radius"],visible=False,interactive=True), | |
# gr.Radio(choices=["On", "Off"],value="Off",label=TRANSLATIONS[language]["enable_radius"],), | |
# gr.Image(label=TRANSLATIONS[language]["output_image"]), | |
# gr.Image(label=TRANSLATIONS[language]["outlines"]), | |
# gr.File(label=TRANSLATIONS[language]["dxf_file"]), | |
# gr.Image(label=TRANSLATIONS[language]["mask"]), | |
# gr.Textbox(label=TRANSLATIONS[language]["scaling_factor"],placeholder=TRANSLATIONS[language]["scaling_placeholder"],), | |
# ] | |
# if __name__ == "__main__": | |
# os.makedirs("./outputs", exist_ok=True) | |
# with gr.Blocks() as demo: | |
# language = gr.Dropdown( | |
# choices=["english", "dutch"], | |
# value="english", | |
# label="Select Language", | |
# interactive=True | |
# ) | |
# input_image = gr.Image(label=TRANSLATIONS["english"]["input_image"], type="numpy") | |
# with gr.Row(): | |
# offset = gr.Number(label=TRANSLATIONS["english"]["offset_value"], value=0) | |
# offset_unit = gr.Dropdown([ | |
# "mm", "inches" | |
# ], value="mm", label=TRANSLATIONS["english"]["offset_unit"]) | |
# finger_toggle = gr.Radio( | |
# choices=["On", "Off"], | |
# value="Off", | |
# label=TRANSLATIONS["english"]["enable_finger"] | |
# ) | |
# edge_radius = gr.Slider( | |
# minimum=0, | |
# maximum=20, | |
# step=1, | |
# value=5, | |
# label=TRANSLATIONS["english"]["edge_radius"], | |
# visible=False, | |
# interactive=True | |
# ) | |
# radius_toggle = gr.Radio( | |
# choices=["On", "Off"], | |
# value="Off", | |
# label=TRANSLATIONS["english"]["enable_radius"], | |
# interactive=True | |
# ) | |
# def toggle_radius(choice): | |
# if choice == "On": | |
# return gr.Slider(visible=True) | |
# return gr.Slider(visible=False, value=0) | |
# radius_toggle.change( | |
# fn=toggle_radius, | |
# inputs=radius_toggle, | |
# outputs=edge_radius | |
# ) | |
# output_image = gr.Image(label=TRANSLATIONS["english"]["output_image"]) | |
# outlines = gr.Image(label=TRANSLATIONS["english"]["outlines"]) | |
# dxf_file = gr.File(label=TRANSLATIONS["english"]["dxf_file"]) | |
# mask = gr.Image(label=TRANSLATIONS["english"]["mask"]) | |
# scaling = gr.Textbox( | |
# label=TRANSLATIONS["english"]["scaling_factor"], | |
# placeholder=TRANSLATIONS["english"]["scaling_placeholder"] | |
# ) | |
# submit_btn = gr.Button("Submit") | |
# language.change( | |
# fn=lambda x: [ | |
# gr.update(label=TRANSLATIONS[x]["input_image"]), | |
# gr.update(label=TRANSLATIONS[x]["offset_value"]), | |
# gr.update(label=TRANSLATIONS[x]["offset_unit"]), | |
# gr.update(label=TRANSLATIONS[x]["output_image"]), | |
# gr.update(label=TRANSLATIONS[x]["outlines"]), | |
# gr.update(label=TRANSLATIONS[x]["enable_finger"]), | |
# gr.update(label=TRANSLATIONS[x]["dxf_file"]), | |
# gr.update(label=TRANSLATIONS[x]["mask"]), | |
# gr.update(label=TRANSLATIONS[x]["enable_radius"]), | |
# gr.update(label=TRANSLATIONS[x]["edge_radius"]), | |
# gr.update( | |
# label=TRANSLATIONS[x]["scaling_factor"], | |
# placeholder=TRANSLATIONS[x]["scaling_placeholder"] | |
# ), | |
# ], | |
# inputs=[language], | |
# outputs=[ | |
# input_image, offset, offset_unit, | |
# output_image, outlines, finger_toggle, dxf_file, | |
# mask, radius_toggle, edge_radius, scaling | |
# ] | |
# ) | |
# def custom_predict_and_format(*args): | |
# output_image, outlines, dxf_path, mask, scaling = predict_og(*args) | |
# if output_image is None: | |
# return ( | |
# None, None, None, None, "Reference coin not detected!" | |
# ) | |
# return ( | |
# output_image, outlines, dxf_path, mask, scaling | |
# ) | |
# submit_btn.click( | |
# fn=custom_predict_and_format, | |
# inputs=[input_image, offset, offset_unit, edge_radius, finger_toggle], | |
# outputs=[output_image, outlines, dxf_file, mask, scaling] | |
# ) | |
# gr.Examples( | |
# examples=[ | |
# ["./examples/Test20.jpg", 0, "mm"], | |
# ["./examples/Test21.jpg", 0, "mm"], | |
# ["./examples/Test22.jpg", 0, "mm"], | |
# ["./examples/Test23.jpg", 0, "mm"], | |
# ], | |
# inputs=[input_image, offset, offset_unit] | |
# ) | |
# demo.launch(share=True) | |
import os | |
from pathlib import Path | |
from typing import List, Union | |
from PIL import Image | |
import ezdxf.units | |
import numpy as np | |
import torch | |
from torchvision import transforms | |
from ultralytics import YOLOWorld, YOLO | |
from ultralytics.engine.results import Results | |
from ultralytics.utils.plotting import save_one_box | |
from transformers import AutoModelForImageSegmentation | |
import cv2 | |
import ezdxf | |
import gradio as gr | |
import gc | |
from scalingtestupdated import calculate_scaling_factor | |
from scipy.interpolate import splprep, splev | |
from scipy.ndimage import gaussian_filter1d | |
import json | |
import time | |
import signal | |
from shapely.ops import unary_union | |
from shapely.geometry import MultiPolygon, GeometryCollection, Polygon, Point | |
from u2netp import U2NETP # Add U2NETP import | |
import logging | |
import shutil | |
# Initialize logging
# The root logger is configured at INFO level; `logger` is this module's logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Create cache directory for models
# The cache is a ".cache" folder located next to this source file; it is
# created at import time if it does not exist yet.
CACHE_DIR = os.path.join(os.path.dirname(__file__), ".cache")
os.makedirs(CACHE_DIR, exist_ok=True)
# Custom Exception Classes | |
class TimeoutReachedError(Exception):
    """Exception type for timeout conditions.

    NOTE(review): no raise site is visible in this part of the file.
    """
class BoundaryOverlapError(Exception):
    """Exception type for boundary-overlap conditions.

    NOTE(review): no raise site is visible in this part of the file.
    """
class TextOverlapError(Exception):
    """Exception type for text-overlap conditions.

    NOTE(review): no raise site is visible in this part of the file.
    """
class ReferenceBoxNotDetectedError(Exception):
    """Signals that the reference coin could not be found in the image."""
class FingerCutOverlapError(Exception):
    """Signals that a finger cut overlaps geometry that already exists.

    The exception carries a ready-to-display default message, so it can be
    raised without arguments.
    """

    def __init__(
        self,
        message="There was an overlap with fingercuts... Please try again to generate dxf.",
    ):
        # Forward the (possibly default) message to the base Exception.
        Exception.__init__(self, message)
# ===== Lazy model loading (replaces eager global model initialization) =====
# The model globals start as None and are populated on first use by
# get_reference_detector(), get_u2net() and get_birefnet().
print("Initializing lazy model loading...")
reference_detector_global = None
u2net_global = None
birefnet = None
# Model paths - both weight files are resolved inside CACHE_DIR
# (intended to be absolute for Docker; this depends on how __file__ resolves).
reference_model_path = os.path.join(CACHE_DIR, "best1.pt")
u2net_model_path = os.path.join(CACHE_DIR, "u2netp.pth")
# Copy model files into the cache when they are not there yet
def ensure_model_files():
    """Make sure both model weight files are present in the cache.

    For each weight file, nothing happens when the cached copy already
    exists. Otherwise the file is copied from the current working directory
    into the cache.

    Raises:
        FileNotFoundError: if a cached copy is missing and the source file
            is not in the current working directory either.
    """
    wanted = (
        ("best1.pt", reference_model_path),
        ("u2netp.pth", u2net_model_path),
    )
    for source_name, cached_path in wanted:
        if os.path.exists(cached_path):
            continue
        if not os.path.exists(source_name):
            raise FileNotFoundError(f"{source_name} model file not found")
        shutil.copy(source_name, cached_path)


# Run the check once at import time
ensure_model_files()
# device = "cpu" | |
# torch.set_float32_matmul_precision(["high", "highest"][0]) | |
# ===== Lazy loading functions =====
def get_reference_detector():
    """Return the shared reference detector, creating it on first use.

    The YOLO model is built from `reference_model_path` the first time this
    is called and then cached in the module-level global.
    """
    global reference_detector_global
    # Fast path: the model was already created by an earlier call.
    if reference_detector_global is not None:
        return reference_detector_global
    logger.info("Loading reference detector model...")
    reference_detector_global = YOLO(reference_model_path)
    logger.info("Reference detector loaded successfully")
    return reference_detector_global
def get_u2net():
    """Return the shared U2NETP model, loading it on first use.

    The network is built as U2NETP(3, 1), its weights are read from
    `u2net_model_path`, and it is moved to `device` and switched to eval
    mode.

    The module-level cache is only assigned after every step has succeeded.
    If loading the weights raises, the cache stays None and the next call
    retries, instead of handing out a network with uninitialised weights.

    Returns:
        The cached U2NETP instance.
    """
    global u2net_global
    if u2net_global is None:
        logger.info("Loading U2NETP model...")
        # Build into a local first so a failure cannot leave a half-built
        # model in the global cache.
        model = U2NETP(3, 1)
        model.load_state_dict(torch.load(u2net_model_path, map_location="cpu"))
        model.to(device)
        model.eval()
        u2net_global = model
        logger.info("U2NETP model loaded successfully")
    return u2net_global
def load_birefnet_model():
    """Fetch the 'ZhengPeng7/BiRefNet' segmentation model via transformers.

    The model is loaded with `trust_remote_code=True`.
    """
    from transformers import AutoModelForImageSegmentation

    model = AutoModelForImageSegmentation.from_pretrained(
        'ZhengPeng7/BiRefNet', trust_remote_code=True
    )
    return model
def get_birefnet():
    """Return the shared BiRefNet model, loading it on first use.

    The model comes from load_birefnet_model(), is moved to `device` and
    switched to eval mode. The module-level cache is only assigned once all
    of that has succeeded, so a failure part-way through cannot leave a
    model cached that was never put into eval mode.

    Returns:
        The cached BiRefNet instance.
    """
    global birefnet
    if birefnet is None:
        logger.info("Loading BiRefNet model...")
        # Prepare the model in a local and publish it only when ready.
        model = load_birefnet_model()
        model.to(device)
        model.eval()
        birefnet = model
        logger.info("BiRefNet model loaded successfully")
    return birefnet
# Device that the lazily loaded models and their input tensors are moved to.
device = "cpu"
torch.set_float32_matmul_precision("high")
# The models themselves are moved to `device` and put into eval mode inside
# get_u2net() / get_birefnet(), not here.

# Preprocessing applied to images before they are fed to BiRefNet
transform_image = transforms.Compose(
    [
        transforms.Resize((1024, 1024)),
        transforms.ToTensor(),
        transforms.Normalize(
            mean=[0.485, 0.456, 0.406],
            std=[0.229, 0.224, 0.225],
        ),
    ]
)
def remove_bg_u2netp(image: np.ndarray) -> np.ndarray:
    """Compute a foreground mask for the reference object with U2NETP.

    The image is resized to 320x320 and normalised, run through the lazily
    loaded U2NETP model, and the first model output is min-max scaled,
    resized back to the input width/height and converted to uint8 (0-255).

    Any exception is logged and re-raised unchanged.
    """
    try:
        model = get_u2net()
        pil_image = Image.fromarray(image)
        preprocess = transforms.Compose(
            [
                transforms.Resize((320, 320)),
                transforms.ToTensor(),
                transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
            ]
        )
        batch = preprocess(pil_image).unsqueeze(0).to(device)
        with torch.no_grad():
            model_outputs = model(batch)
            saliency = model_outputs[0]
            # Min-max scale; the epsilon guards against a constant map.
            lowest = saliency.min()
            highest = saliency.max()
            saliency = (saliency - lowest) / (highest - lowest + 1e-8)
        mask = saliency.squeeze().cpu().numpy()
        mask = cv2.resize(mask, (pil_image.width, pil_image.height))
        return (mask * 255).astype(np.uint8)
    except Exception as e:
        logger.error(f"Error in U2NETP background removal: {e}")
        raise
def remove_bg(image: np.ndarray) -> np.ndarray:
    """Compute a foreground mask for the main objects with BiRefNet.

    The image goes through `transform_image`, the lazily loaded BiRefNet
    model is evaluated, and the sigmoid of its last output is converted to
    a PIL image. That mask is resized so the longer side of the input maps
    to 1024 pixels, and returned as an array.

    Any exception is logged and re-raised unchanged.
    """
    try:
        model = get_birefnet()
        pil_image = Image.fromarray(image)
        batch = transform_image(pil_image).unsqueeze(0).to(device)
        with torch.no_grad():
            probabilities = model(batch)[-1].sigmoid().cpu()
        mask_tensor = probabilities[0].squeeze()
        mask_pil = transforms.ToPILImage()(mask_tensor)
        # Scale so that the longer input side becomes 1024 pixels.
        ratio = 1024 / max(pil_image.size)
        width, height = pil_image.size[0], pil_image.size[1]
        target_size = (int(width * ratio), int(height * ratio))
        return np.array(mask_pil.resize(target_size))
    except Exception as e:
        logger.error(f"Error in BiRefNet background removal: {e}")
        raise
def resize_img(img: np.ndarray, resize_dim):
    """Resize an array image through PIL and return the result as an array.

    `resize_dim` is handed to `PIL.Image.Image.resize` unchanged.
    """
    pil_image = Image.fromarray(img)
    resized = pil_image.resize(resize_dim)
    return np.array(resized)
def make_square(img: np.ndarray):
    """Pad an image to a square by replicating its border pixels.

    The shorter dimension is padded on both sides up to the length of the
    longer one. When the difference is odd, the extra row/column goes to
    the bottom/right. A third (channel) axis, when present, is not padded.
    """
    rows, cols = img.shape[:2]
    side = max(rows, cols)
    # divmod gives the leading pad and the odd leftover pixel in one step.
    top, leftover_rows = divmod(side - rows, 2)
    left, leftover_cols = divmod(side - cols, 2)
    pad_spec = [
        (top, top + leftover_rows),
        (left, left + leftover_cols),
    ]
    if len(img.shape) == 3:  # Color image: leave the channel axis alone
        pad_spec.append((0, 0))
    return np.pad(img, tuple(pad_spec), mode="edge")
def detect_reference_square(img) -> tuple:
    """Detect the reference coin and return its crop and bounding box.

    The lazily loaded reference detector is run with a confidence threshold
    of 0.70. Of all detected boxes only the one with the largest area is
    kept; other detections are ignored.

    Args:
        img: Image handed to the detector's `predict` and to `save_one_box`.

    Returns:
        A `(crop, box)` tuple: the crop produced by
        `save_one_box(..., save=False)` and the selected xyxy box.

    Raises:
        ReferenceBoxNotDetectedError: if nothing is detected, if no box has
            a positive area, or if any other error occurs during detection
            (the original error is attached as `__cause__`).
    """
    try:
        reference_detector = get_reference_detector()
        res = reference_detector.predict(img, conf=0.70)
        if not res or len(res) == 0 or len(res[0].boxes) == 0:
            raise ReferenceBoxNotDetectedError("Unable to detect the reference coin in the image.")
        # Get all detected boxes
        boxes = res[0].cpu().boxes.xyxy
        # Find the largest box (most likely the reference coin)
        largest_box = None
        max_area = 0
        for box in boxes:
            x_min, y_min, x_max, y_max = box
            area = (x_max - x_min) * (y_max - y_min)
            if area > max_area:
                max_area = area
                largest_box = box
        if largest_box is None:
            # Every box was degenerate (zero area): treat as "not detected"
            # explicitly instead of failing on None below.
            raise ReferenceBoxNotDetectedError("Unable to detect the reference coin in the image.")
        return (
            save_one_box(largest_box.unsqueeze(0), img, save=False),
            largest_box,
        )
    except ReferenceBoxNotDetectedError:
        # Already the domain error callers expect; pass it through untouched.
        raise
    except Exception as e:
        logger.error(f"Error in reference square detection: {e}")
        # Chain the cause so the underlying failure stays visible in tracebacks.
        raise ReferenceBoxNotDetectedError(
            "Error detecting reference coin. Please try again with a clearer image."
        ) from e
def exclude_scaling_box(
    image: np.ndarray,
    bbox: np.ndarray,
    orig_size: tuple,
    processed_size: tuple,
    expansion_factor: float = 1.2,
) -> np.ndarray:
    """Zero out an (expanded) bounding box region of `image` in place.

    `bbox` is an xyxy box expressed in the coordinates of an image of size
    `orig_size` (height, width). It is rescaled to `processed_size`
    (height, width), grown symmetrically by `expansion_factor`, clipped to
    the bounds of `image`, and the covered pixels are set to 0.

    Returns:
        The same `image` array that was passed in, modified in place.
    """
    left, top, right, bottom = map(int, bbox)

    # Map box corners from original to processed coordinates.
    ratio_x = processed_size[1] / orig_size[1]
    ratio_y = processed_size[0] / orig_size[0]
    left = int(left * ratio_x)
    right = int(right * ratio_x)
    top = int(top * ratio_y)
    bottom = int(bottom * ratio_y)

    # Half of the extra width/height added on each side by the expansion.
    margin_x = (expansion_factor - 1) * (right - left) / 2
    margin_y = (expansion_factor - 1) * (bottom - top) / 2

    col_start = max(0, int(left - margin_x))
    col_stop = min(image.shape[1], int(right + margin_x))
    row_start = max(0, int(top - margin_y))
    row_stop = min(image.shape[0], int(bottom + margin_y))

    image[row_start:row_stop, col_start:col_stop] = 0
    return image
def resample_contour(contour, edge_radius_px: int = 0):
    """Resample a contour to 1500 smoothed points on a periodic spline.

    The contour is indexed as `contour[:, 0, :]`, closed if necessary,
    fitted with a periodic interpolating spline (`s=0`), evaluated at 1500
    evenly spaced parameter values and smoothed with a wrap-around Gaussian
    filter. The filter's sigma is `max(2, int(edge_radius_px) // 4)`.

    Returns:
        Array of shape (1500, 2) whose last point equals its first.

    Raises:
        ValueError: if the contour has fewer than 4 points.
        Exception: any error from the spline/filter steps is logged and
            re-raised.
    """
    logger.info(f"Starting resample_contour with contour of shape {contour.shape}")
    sample_count = 1500
    smoothing_sigma = max(2, int(edge_radius_px) // 4)  # Grows with the radius

    # A periodic spline needs at least 4 points.
    point_total = len(contour)
    if point_total < 4:
        error_msg = f"Contour must have at least 4 points, but has {point_total} points."
        logger.error(error_msg)
        raise ValueError(error_msg)

    try:
        pts = contour[:, 0, :]
        logger.debug(f"Reshaped contour to shape {pts.shape}")

        # Close the loop so that the first and last points coincide.
        already_closed = np.array_equal(pts[0], pts[-1])
        if not already_closed:
            pts = np.vstack([pts, pts[0]])

        # Periodic spline through all points.
        tck, u = splprep(pts.T, u=None, s=0, per=True)

        # Evenly spaced samples over the full parameter range.
        params = np.linspace(u.min(), u.max(), sample_count)
        xs, ys = splev(params, tck, der=0)

        if smoothing_sigma > 0:
            # Wrap-around smoothing keeps the curve periodic.
            xs = gaussian_filter1d(xs, sigma=smoothing_sigma, mode='wrap')
            ys = gaussian_filter1d(ys, sigma=smoothing_sigma, mode='wrap')
            # Smoothing moves the endpoints apart; snap the last to the first.
            xs[-1] = xs[0]
            ys[-1] = ys[0]

        result = np.array([xs, ys]).T
        logger.info(f"Completed resample_contour with result shape {result.shape}")
        return result
    except Exception as e:
        logger.error(f"Error in resample_contour: {e}")
        raise
# def save_dxf_spline(inflated_contours, scaling_factor, height, finger_clearance=False): | |
# doc = ezdxf.new(units=ezdxf.units.MM) | |
# doc.header["$INSUNITS"] = ezdxf.units.MM | |
# msp = doc.modelspace() | |
# final_polygons_inch = [] | |
# finger_centers = [] | |
# original_polygons = [] | |
# for contour in inflated_contours: | |
# try: | |
# # Removed the second parameter since it was causing the error | |
# resampled_contour = resample_contour(contour) | |
# points_inch = [(x * scaling_factor, (height - y) * scaling_factor) | |
# for x, y in resampled_contour] | |
# if len(points_inch) < 3: | |
# continue | |
# tool_polygon = build_tool_polygon(points_inch) | |
# original_polygons.append(tool_polygon) | |
# if finger_clearance: | |
# try: | |
# tool_polygon, center = place_finger_cut_adjusted( | |
# tool_polygon, points_inch, finger_centers, final_polygons_inch | |
# ) | |
# except FingerCutOverlapError: | |
# tool_polygon = original_polygons[-1] | |
# exterior_coords = polygon_to_exterior_coords(tool_polygon) | |
# if len(exterior_coords) < 3: | |
# continue | |
# msp.add_spline(exterior_coords, degree=3, dxfattribs={"layer": "TOOLS"}) | |
# final_polygons_inch.append(tool_polygon) | |
# except ValueError as e: | |
# logger.warning(f"Skipping contour: {e}") | |
# dxf_filepath = os.path.join("./outputs", "out.dxf") | |
# doc.saveas(dxf_filepath) | |
# return dxf_filepath, final_polygons_inch, original_polygons | |
def save_dxf_spline(
    inflated_contours,
    scaling_factor,
    height,
    finger_clearance=False,
    scale_correction=1.079,
):
    """Write the contours as splines to "./outputs/out.dxf".

    Each contour is resampled, scaled by `scaling_factor` with the y axis
    flipped against `height`, turned into a polygon and, when
    `finger_clearance` is true, extended with a finger cut. The polygon's
    exterior is then written as a degree-3 spline on the "TOOLS" layer.

    Args:
        inflated_contours: Iterable of contours accepted by
            `resample_contour`.
        scaling_factor: Multiplier applied to pixel coordinates.
        height: Value used to flip the y axis (`height - y`).
        finger_clearance: When true, `place_finger_cut_adjusted` is called
            for every contour.
        scale_correction: Extra factor applied only to the coordinates that
            are written to the DXF (after finger cuts). The returned
            polygons are NOT multiplied by it. Defaults to the previously
            hard-coded 1.079.

    Returns:
        `(dxf_filepath, final_polygons_inch, original_polygons)`.
        NOTE(review): the names say "inch" although the DXF is declared in
        millimetres; actual units depend on `scaling_factor`.
    """
    doc = ezdxf.new(units=ezdxf.units.MM)
    doc.header["$INSUNITS"] = ezdxf.units.MM
    msp = doc.modelspace()
    final_polygons_inch = []
    finger_centers = []
    original_polygons = []
    for contour in inflated_contours:
        try:
            resampled_contour = resample_contour(contour)
            points_inch = [
                (x * scaling_factor, (height - y) * scaling_factor)
                for x, y in resampled_contour
            ]
            if len(points_inch) < 3:
                continue
            tool_polygon = build_tool_polygon(points_inch)
            original_polygons.append(tool_polygon)
            if finger_clearance:
                try:
                    tool_polygon, _ = place_finger_cut_adjusted(
                        tool_polygon, points_inch, finger_centers, final_polygons_inch
                    )
                except FingerCutOverlapError:
                    # Keep the outline without a finger cut.
                    tool_polygon = original_polygons[-1]
            exterior_coords = polygon_to_exterior_coords(tool_polygon)
            if len(exterior_coords) < 3:
                continue
            # Apply scale correction AFTER finger cuts and polygon adjustments
            corrected_coords = [
                (x * scale_correction, y * scale_correction)
                for x, y in exterior_coords
            ]
            msp.add_spline(corrected_coords, degree=3, dxfattribs={"layer": "TOOLS"})
            final_polygons_inch.append(tool_polygon)
        except ValueError as e:
            logger.warning(f"Skipping contour: {e}")
    # The output folder is otherwise only created under __main__; make sure
    # it exists so this also works when the module is imported.
    os.makedirs("./outputs", exist_ok=True)
    dxf_filepath = os.path.join("./outputs", "out.dxf")
    doc.saveas(dxf_filepath)
    return dxf_filepath, final_polygons_inch, original_polygons
def build_tool_polygon(points_inch):
    """Wrap a sequence of (x, y) points in a shapely Polygon."""
    outline = Polygon(shell=points_inch)
    return outline
def polygon_to_exterior_coords(poly):
    """Return the exterior ring of a geometry as a list of coordinates.

    A GeometryCollection or MultiPolygon is first merged with
    `unary_union`; if the result still has several parts, the part with the
    largest area that has an exterior is used. The exterior coordinates are
    then subsampled so that at most 100 evenly spaced points are kept, plus
    the ring's final point when it was not among the samples (so the loop
    stays closed).

    Args:
        poly: A shapely geometry (anything exposing `geom_type`).

    Returns:
        List of coordinate tuples; an empty list when no exterior can be
        obtained or when an error occurs (the error is logged).
    """
    logger.info(f"Starting polygon_to_exterior_coords with input geometry type: {poly.geom_type}")
    try:
        multi_part_types = ("GeometryCollection", "MultiPolygon")
        # 1) If it's a GeometryCollection or MultiPolygon, fuse everything into one shape
        if poly.geom_type in multi_part_types:
            logger.debug(f"Performing unary_union on {poly.geom_type}")
            unified = unary_union(poly)
            if unified.is_empty:
                logger.warning("unary_union produced an empty geometry; returning empty list")
                return []
            # If union still yields multiple disjoint pieces, pick the largest Polygon
            if unified.geom_type in multi_part_types:
                largest = None
                max_area = 0.0
                for g in getattr(unified, "geoms", []):
                    if hasattr(g, "area") and g.area > max_area and hasattr(g, "exterior"):
                        max_area = g.area
                        largest = g
                if largest is None:
                    logger.warning("No valid Polygon found in unified geometry; returning empty list")
                    return []
                poly = largest
            else:
                # The union collapsed to a single geometry
                poly = unified
        # 2) At this point, we must have something with an exterior ring
        if not hasattr(poly, "exterior") or poly.exterior is None:
            logger.warning("Input geometry has no exterior ring; returning empty list")
            return []
        raw_coords = list(poly.exterior.coords)
        total = len(raw_coords)
        logger.info(f"Extracted {total} raw exterior coordinates")
        if total == 0:
            return []
        # 3) Subsample coordinates to at most 100 points (evenly spaced)
        max_pts = 100
        if total <= max_pts:
            return raw_coords
        # Ceiling division: a floor step would be 1 for totals of 101-199
        # (no downsampling at all) and would overshoot the cap in general.
        step = -(-total // max_pts)
        sampled = raw_coords[::step]
        # Ensure we include the last point to close the loop
        if sampled[-1] != raw_coords[-1]:
            sampled.append(raw_coords[-1])
        logger.info(f"Downsampled perimeter from {total} to {len(sampled)} points")
        return sampled
    except Exception as e:
        logger.error(f"Error in polygon_to_exterior_coords: {e}")
        return []
def place_finger_cut_adjusted(
    tool_polygon: Polygon,
    points_inch: list,
    existing_centers: list,
    all_polygons: list,
    circle_diameter: float = 25.4,
    min_gap: float = 0.5,
    max_attempts: int = 100
) -> tuple:
    """Attach a circular finger cut to `tool_polygon`.

    Candidate centres are taken from the polygon's exterior (in random
    order), each tried with five small offsets. A candidate is accepted
    when it:

    * is at least `circle_diameter + min_gap` away from every centre in
      `existing_centers`,
    * overlaps the polygon by at least 30% of the circle's area,
    * does not intersect any other polygon in `all_polygons` buffered by
      `min_gap`, and
    * produces a single-piece union that differs from the input polygon.

    If no candidate passes, the time budget (about 5 seconds) runs out, or
    an unexpected error occurs, a fallback circle is placed at the middle
    element of `points_inch` without any of the checks above.

    Args:
        tool_polygon: Polygon to extend.
        points_inch: Outline points of the polygon; used for the fallback.
        existing_centers: Centres of finger cuts placed so far. The chosen
            centre is appended to this list (mutated in place).
        all_polygons: Other polygons the cut has to stay clear of.
        circle_diameter: Diameter of the finger cut.
        min_gap: Clearance to other polygons and cuts; also sets the size
            of the candidate offsets.
        max_attempts: Kept for backward compatibility. Repeating the search
            cannot change its outcome because the candidates and all inputs
            are identical on every pass, so a single pass is made; values
            below 1 skip the search and go straight to the fallback.

    Returns:
        `(union_polygon, center)`.
    """
    logger.info(f"Starting place_finger_cut_adjusted with {len(points_inch)} input points")
    from shapely.geometry import Point
    import numpy as np
    import time
    import random

    # Precompute values
    r = circle_diameter / 2.0
    needed_center_dist = circle_diameter + min_gap

    # Fallback: if we run out of time or candidates, place in the "middle" of the outline
    def fallback_solution():
        logger.warning("Using fallback approach for finger cut placement")
        # Pick the midpoint of the original outline as a last-resort center
        fallback_center = points_inch[len(points_inch) // 2]
        fallback_circle = Point(fallback_center).buffer(r, resolution=32)
        try:
            union_poly = tool_polygon.union(fallback_circle)
        except Exception as e:
            logger.warning(f"Fallback union failed ({e}); trying buffer-union fallback")
            union_poly = tool_polygon.buffer(0).union(fallback_circle.buffer(0))
        existing_centers.append(fallback_center)
        logger.info(f"Fallback finger cut placed at {fallback_center}")
        return union_poly, fallback_center

    # 1) Get perimeter coordinates of this polygon
    raw_perimeter = polygon_to_exterior_coords(tool_polygon)
    if not raw_perimeter:
        logger.warning("No valid exterior coords found; using fallback immediately")
        return fallback_solution()

    # 2) Possibly subsample to roughly 100 perimeter points
    if len(raw_perimeter) > 100:
        step = len(raw_perimeter) // 100
        perimeter_coords = raw_perimeter[::step]
        logger.info(f"Subsampled perimeter from {len(raw_perimeter)} to {len(perimeter_coords)} points")
    else:
        perimeter_coords = raw_perimeter[:]

    # 3) Randomize the order to avoid bias
    indices = list(range(len(perimeter_coords)))
    random.shuffle(indices)
    logger.debug("Shuffled perimeter indices for candidate order")

    if max_attempts < 1:
        logger.warning(f"No valid spot after {max_attempts} attempts, using fallback")
        return fallback_solution()

    # Five small offsets: (0,0), (±min_gap/2, 0), (0, ±min_gap/2)
    half_gap = min_gap / 2
    offsets = [(0, 0), (-half_gap, 0), (half_gap, 0), (0, -half_gap), (0, half_gap)]

    # 4) Non-blocking timeout setup
    start_time = time.time()
    timeout_secs = 5.0
    attempts = 0
    # Buffered "keep-out" zones of the other polygons. They do not depend on
    # the candidate, so they are built once, on first use.
    keep_out_zones = None
    try:
        # 5) For each shuffled perimeter point, try the small offsets
        for idx in indices:
            if time.time() - start_time > timeout_secs - 0.05:
                logger.warning("Timeout during candidate-point loop")
                return fallback_solution()
            attempts += 1
            cx, cy = perimeter_coords[idx]
            for dx, dy in offsets:
                candidate_center = (cx + dx, cy + dy)
                # 5a) Check distance to existing finger centers
                too_close_finger = any(
                    np.hypot(candidate_center[0] - ex, candidate_center[1] - ey)
                    < needed_center_dist
                    for (ex, ey) in existing_centers
                )
                if too_close_finger:
                    continue
                # 5b) Build candidate circle
                candidate_circle = Point(candidate_center).buffer(r, resolution=32)
                # 5c) Must overlap >= 30% with this polygon
                try:
                    inter_area = tool_polygon.intersection(candidate_circle).area
                except Exception:
                    continue
                if inter_area < 0.3 * candidate_circle.area:
                    continue
                # 5d) Must not intersect any other polygon buffered by min_gap.
                # `intersects` already covers geometries that merely touch.
                if keep_out_zones is None:
                    keep_out_zones = [
                        other_poly.buffer(min_gap)
                        for other_poly in all_polygons
                        # Don't compare against itself
                        if not other_poly.equals(tool_polygon)
                    ]
                if any(zone.intersects(candidate_circle) for zone in keep_out_zones):
                    continue
                # 5e) Candidate passes all tests -> union and return
                try:
                    union_poly = tool_polygon.union(candidate_circle)
                    # If union is a MultiPolygon (more than one piece), reject
                    if union_poly.geom_type == "MultiPolygon" and len(union_poly.geoms) > 1:
                        continue
                    # If union didn't change anything (no real cut), reject
                    if union_poly.equals(tool_polygon):
                        continue
                except Exception:
                    continue
                existing_centers.append(candidate_center)
                logger.info(f"Finger cut placed successfully at {candidate_center} after {attempts} attempts")
                return union_poly, candidate_center
        # Every candidate was rejected; another pass would test the same
        # candidates against the same inputs, so go to the fallback now.
        logger.warning(f"No valid spot after {attempts} attempts, using fallback")
        return fallback_solution()
    except Exception as e:
        logger.error(f"Error in place_finger_cut_adjusted: {e}")
        return fallback_solution()
def extract_outlines(binary_image: np.ndarray) -> tuple:
    """Find the external contours of a binary image.

    Returns:
        `(outline_image, contours)`: an all-255 canvas with the shape and
        dtype of the input, and the contours found with RETR_EXTERNAL /
        CHAIN_APPROX_NONE.
    """
    found = cv2.findContours(binary_image, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE)
    contours = found[0]
    blank_canvas = np.full_like(binary_image, 255)  # White background
    return blank_canvas, contours
def round_edges(mask: np.ndarray, radius_mm: float, scaling_factor: float) -> np.ndarray:
    """Round the edges of a mask by smoothing its contours.

    The mask is returned untouched when `radius_mm` or `scaling_factor` is
    not positive. Masks with fewer than 500 non-zero pixels get a 3x3
    erode-then-dilate instead. Otherwise every external contour with an
    area above 100 is smoothed with `resample_contour` (keeping the
    original contour if that fails) and the results are drawn filled onto
    an empty mask.
    """
    if radius_mm <= 0 or scaling_factor <= 0:
        return mask

    radius_px = max(1, int(radius_mm / scaling_factor))  # At least one pixel

    # Small objects: morphological opening only.
    if np.count_nonzero(mask) < 500:
        eroded = cv2.erode(mask, np.ones((3,3)))
        return cv2.dilate(eroded, np.ones((3,3)))

    found, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE)

    smoothed_contours = []
    for contour in found:
        # Ignore small contours.
        if not cv2.contourArea(contour) > 100:
            continue
        try:
            # Radius-aware smoothing, converted back to OpenCV contour layout.
            smooth = resample_contour(contour, radius_px)
            smoothed_contours.append(smooth.astype(np.int32).reshape((-1, 1, 2)))
        except Exception as e:
            logger.warning(f"Error smoothing contour: {e}")
            smoothed_contours.append(contour)  # Fall back to the raw contour

    # Paint the smoothed shapes, filled, on a blank mask.
    rounded = np.zeros_like(mask)
    cv2.drawContours(rounded, smoothed_contours, -1, 255, thickness=cv2.FILLED)
    return rounded
def cleanup_memory():
    """Release cached CUDA memory (when CUDA is available) and run the GC."""
    cuda_present = torch.cuda.is_available()
    if cuda_present:
        torch.cuda.empty_cache()
    gc.collect()
    logger.info("Memory cleanup completed")
def cleanup_models():
    """Drop the cached model instances and then clean up memory.

    After this call the lazy loaders create the models again on next use.
    """
    global reference_detector_global, u2net_global, birefnet
    # Rebinding to None drops this module's reference to each model.
    reference_detector_global = None
    u2net_global = None
    birefnet = None
    cleanup_memory()
def predict_og(image, offset, offset_unit, edge_radius, finger_clearance=False):
    """Run the full pipeline: detect the coin, segment objects, write the DXF.

    Steps: detect the reference coin, derive the scaling factor from its
    mask (falling back to 20 mm divided by the crop size), segment the main
    objects with BiRefNet, blank out every detected coin, round and dilate
    the mask, extract contours, write them to a DXF and draw previews.

    Args:
        image: Input image array.
        offset: Offset around the objects; must not be negative.
        offset_unit: "inches" converts `offset` to millimetres (x25.4);
            any other value leaves it unchanged.
        edge_radius: Edge radius passed to `round_edges`; None or 0 is
            replaced by 0.0001.
        finger_clearance: Finger cuts are enabled only when this equals the
            string "On".

    Returns:
        `(annotated_image, outlines, dxf_path, mask, scaling_text)`. When
        the reference coin is not detected, the first four entries are None
        and the last one is an error message.

    Raises:
        gr.Error: for a negative offset, an unexpected detection error or a
            finger-cut overlap error.

    The "./outputs" folder is created if needed, and the cached models are
    released on every exit path (including the early return and errors).
    """
    coin_size_mm = 20.0
    if offset_unit == "inches":
        offset *= 25.4
    if edge_radius is None or edge_radius == 0:
        edge_radius = 0.0001
    if offset < 0:
        raise gr.Error("Offset Value Can't be negative")
    # Files are written below; the folder is otherwise only created under
    # __main__, which breaks callers that import this module.
    os.makedirs("./outputs", exist_ok=True)
    try:
        try:
            reference_obj_img, _ = detect_reference_square(image)
        except ReferenceBoxNotDetectedError as e:
            return (
                None,
                None,
                None,
                None,
                f"Error: {str(e)}"
            )
        except Exception as e:
            raise gr.Error(f"Error processing image: {str(e)}") from e
        reference_obj_img = make_square(reference_obj_img)
        # Use U2NETP for reference object background removal
        reference_square_mask = remove_bg_u2netp(reference_obj_img)
        reference_square_mask = resize_img(reference_square_mask, reference_obj_img.shape[:2][::-1])
        try:
            scaling_factor = calculate_scaling_factor(
                target_image=reference_square_mask,
                reference_obj_size_mm=coin_size_mm,
                feature_detector="ORB",
            )
        except Exception as e:
            scaling_factor = None
            logger.warning(f"Error calculating scaling factor: {e}")
        if not scaling_factor:
            # Fallback: assume the crop spans the 20 mm reference.
            ref_size_px = (reference_square_mask.shape[0] + reference_square_mask.shape[1]) / 2
            scaling_factor = coin_size_mm / ref_size_px
            logger.info(f"Fallback scaling: {scaling_factor:.4f} mm/px using 20mm reference")
        # Use BiRefNet for main object background removal
        orig_size = image.shape[:2]
        objects_mask = remove_bg(image)
        processed_size = objects_mask.shape[:2]
        # REMOVE ALL COINS from mask (low confidence on purpose, to catch all)
        res = get_reference_detector().predict(image, conf=0.05)
        boxes = res[0].cpu().boxes.xyxy if res and len(res) > 0 else []
        for box in boxes:
            objects_mask = exclude_scaling_box(
                objects_mask,
                box,
                orig_size,
                processed_size,
                expansion_factor=1.2,
            )
        objects_mask = resize_img(objects_mask, (image.shape[1], image.shape[0]))
        # The same dilation kernel is used for the saved mask and for the
        # rounded mask, so it is built once.
        offset_pixels = (float(offset) / scaling_factor) * 2 + 1 if scaling_factor else 1
        kernel = np.ones((int(offset_pixels), int(offset_pixels)), np.uint8)
        dilated_mask_orig = cv2.dilate(objects_mask, kernel)
        Image.fromarray(dilated_mask_orig).save("./outputs/scaled_mask_original.jpg")
        # Round first, then apply the dilation AFTER rounding
        rounded_mask = round_edges(objects_mask, edge_radius, scaling_factor)
        dilated_mask = cv2.dilate(rounded_mask, kernel)
        _, contours = extract_outlines(dilated_mask)
        finger_enabled = finger_clearance == "On"
        try:
            dxf, finger_polygons, original_polygons = save_dxf_spline(
                contours,
                scaling_factor,
                processed_size[0],
                finger_clearance=finger_enabled
            )
        except FingerCutOverlapError as e:
            raise gr.Error(str(e)) from e
        shrunked_img_contours = image.copy()
        outlines = np.full_like(dilated_mask, 255)
        if finger_enabled:
            for poly in finger_polygons:
                try:
                    # Map the polygon back to pixel coordinates (inverse of
                    # the transform used when writing the DXF).
                    coords = np.array([
                        (int(x / scaling_factor), int(processed_size[0] - y / scaling_factor))
                        for x, y in poly.exterior.coords
                    ], np.int32).reshape((-1, 1, 2))
                    cv2.drawContours(shrunked_img_contours, [coords], -1, 0, thickness=2)
                    cv2.drawContours(outlines, [coords], -1, 0, thickness=2)
                except Exception as e:
                    logger.warning(f"Failed to draw finger cut: {e}")
                    continue
        else:
            cv2.drawContours(shrunked_img_contours, contours, -1, 0, thickness=2)
            cv2.drawContours(outlines, contours, -1, 0, thickness=2)
        return (
            shrunked_img_contours,
            outlines,
            dxf,
            dilated_mask_orig,
            f"{scaling_factor:.4f}")
    finally:
        # Release the models on every exit path, not only on success.
        cleanup_models()
def predict_simple(image):
    """
    Image-only entry point → returns (annotated, outlines, dxf, mask).
    Runs predict_og with a 0 mm offset, no fillet and finger cuts off;
    the scaling text returned by predict_og is dropped.
    """
    annotated, outline_img, dxf_file_path, mask_img, _scaling = predict_og(
        image, 0, "mm", 0, "Off"
    )
    return annotated, outline_img, dxf_file_path, mask_img
def predict_middle(image, enable_fillet, fillet_value_mm):
    """
    Image + fillet toggle ("On"/"Off") + fillet radius → returns
    (annotated, outlines, dxf, mask).
    Runs predict_og with a 0 mm offset and finger cuts off; the radius is
    only used when the toggle is "On".
    """
    if enable_fillet == "On":
        radius = fillet_value_mm
    else:
        radius = 0
    annotated, outline_img, dxf_file_path, mask_img, _scaling = predict_og(
        image, 0, "mm", radius, "Off"
    )
    return annotated, outline_img, dxf_file_path, mask_img
def predict_full(image, enable_fillet, fillet_value_mm, enable_finger_cut):
    """
    Image + fillet toggle/value + finger-cut toggle → returns
    (annotated, outlines, dxf, mask).
    Runs predict_og with a 0 mm offset; both toggles count as enabled only
    when they equal "On".
    """
    if enable_fillet == "On":
        radius = fillet_value_mm
    else:
        radius = 0
    if enable_finger_cut == "On":
        finger_flag = "On"
    else:
        finger_flag = "Off"
    annotated, outline_img, dxf_file_path, mask_img, _scaling = predict_og(
        image, 0, "mm", radius, finger_flag
    )
    return annotated, outline_img, dxf_file_path, mask_img
# Script entry point: build the Gradio UI and launch it.
if __name__ == "__main__":
    # Folder that the pipeline writes its DXF and mask files into.
    os.makedirs("./outputs", exist_ok=True)
    with gr.Blocks() as demo:
        # --- Inputs ---
        input_image = gr.Image(label="Input Image", type="numpy")
        enable_fillet = gr.Radio(
            choices=["On", "Off"],
            value="Off",
            label="Enable Fillet",
            interactive=True
        )
        # Hidden until the fillet toggle is switched to "On" (see toggle_fillet).
        fillet_value_mm = gr.Slider(
            minimum=0,
            maximum=20,
            step=1,
            value=5,
            label="Edge Radius (mm)",
            visible=False,
            interactive=True
        )
        enable_finger_cut = gr.Radio(
            choices=["On", "Off"],
            value="Off",
            label="Enable Finger Cut"
        )

        def toggle_fillet(choice):
            # Show the slider for "On"; otherwise hide it and reset it to 0.
            if choice == "On":
                return gr.update(visible=True)
            return gr.update(visible=False, value=0)

        enable_fillet.change(
            fn=toggle_fillet,
            inputs=enable_fillet,
            outputs=fillet_value_mm
        )
        # --- Outputs ---
        output_image = gr.Image(label="Output Image")
        outlines = gr.Image(label="Outlines of Objects")
        dxf_file = gr.File(label="DXF file")
        mask = gr.Image(label="Mask")
        submit_btn = gr.Button("Submit")
        # predict_full returns four values, matching the four outputs below.
        submit_btn.click(
            fn=predict_full,
            inputs=[input_image, enable_fillet, fillet_value_mm, enable_finger_cut],
            outputs=[output_image, outlines, dxf_file, mask]
        )
    # share=True is passed to launch to request a shareable link.
    demo.launch(share=True)