|
|
|
|
|
|
|
import numpy as np |
|
import logging |
|
import time |
|
from PIL import Image, ImageDraw |
|
from typing import Dict, List, Optional, Tuple, Any |
|
from scipy.ndimage import ( |
|
binary_fill_holes, |
|
binary_closing, |
|
label, |
|
find_objects |
|
) |
|
from src.utils import ProcessingContext, create_pipeline_step, LOG_LEVEL_MAP, EMOJI_MAP |
|
|
|
|
|
|
|
|
|
UNIVERSAL_PAD_RATIO = 0.075 |
|
COVERAGE_THRESHOLD = 0.25 |
|
FEATHER_THRESHOLD_MIN = 0.3 |
|
FEATHER_THRESHOLD_MAX = 0.7 |
|
|
|
ENABLE_CROPPING_PADDING = True |
|
|
|
PRODUCT_TYPE_LIST = ["jacket", "shirt", "vest", "jeans", "shorts", "skirt", "overall", "dress"] |
|
HEAD_LIST = ["head"] |
|
SHOES_LIST = ["shoes"] |
|
CLOTHING_FEATURES_LIST = ["neckline", "collar", "sleeve", "closure", "pocket"] |
|
|
|
|
|
|
|
|
|
def calculate_transparency(image, coverage_threshold=0.9999):
    """Return the fraction of pixels with a non-zero alpha value (opaque coverage).

    Note: ``coverage_threshold`` is unused; it is kept only so existing call
    sites that pass it keep working.
    """
    alpha = np.asarray(image.getchannel("A"))
    total_pixels = alpha.size
    if total_pixels == 0:
        return 0.0
    return np.count_nonzero(alpha) / float(total_pixels)
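# Illustrative only: for a 4x1 RGBA strip with one fully opaque pixel, one
# semi-transparent pixel and two fully transparent pixels, the returned
# coverage is 2 / 4 = 0.5.
#
#   strip = Image.new("RGBA", (4, 1), (0, 0, 0, 0))
#   strip.putpixel((0, 0), (255, 255, 255, 255))
#   strip.putpixel((1, 0), (255, 255, 255, 128))
#   calculate_transparency(strip)  # -> 0.5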
|
|
|
def parse_line_flag(val) -> bool: |
|
if isinstance(val, bool): |
|
return val |
|
if isinstance(val, str): |
|
return val.strip().lower().startswith("true") |
|
return False |
|
|
|
def partial_pad_square( |
|
img: Image.Image, |
|
pad_left: int, |
|
pad_right: int, |
|
pad_top: int, |
|
pad_bottom: int |
|
) -> Tuple[Image.Image, Dict[str,int]]: |
|
    w, h = img.size
    new_w = w + pad_left + pad_right
    new_h = h + pad_top + pad_bottom
    side = max(new_w, new_h)

    out = Image.new("RGBA", (side, side), (0, 0, 0, 0))
    offx = (side - new_w) // 2 + pad_left
    offy = (side - new_h) // 2 + pad_top
    out.paste(img, (offx, offy))

    changes = {
        "left": pad_left,
        "right": pad_right,
        "top": pad_top,
        "bottom": pad_bottom,
    }
    return out, changes
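# Illustrative only: padding a 100x60 RGBA image by 10px on the left and right
# yields a 120x120 square canvas, because the canvas side is max(new_w, new_h)
# and the image is centered before the requested pads are applied.
#
#   demo = Image.new("RGBA", (100, 60), (255, 0, 0, 255))
#   padded, deltas = partial_pad_square(demo, 10, 10, 0, 0)
#   padded.size  # -> (120, 120)
#   deltas       # -> {"left": 10, "right": 10, "top": 0, "bottom": 0}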
|
|
|
def two_step_pad_to_square(
    img: Image.Image,
    orientation: str,
    border_pad: int
) -> Tuple[Image.Image, Dict[str, int]]:
    changes = {"left": 0, "right": 0, "top": 0, "bottom": 0}
    w2, h2 = img.size
    working = img

    # Step 1: equalize the longer dimension so the image becomes square.
    if orientation == "Landscape" and w2 > h2:
        diff = w2 - h2
        tpad = diff // 2
        bpad = diff - tpad
        working, c1 = partial_pad_square(working, 0, 0, tpad, bpad)
        for k_ in c1:
            changes[k_] += c1[k_]
    elif orientation == "Portrait" and h2 > w2:
        diff = h2 - w2
        lpad = diff // 2
        rpad = diff - lpad
        working, c2 = partial_pad_square(working, lpad, rpad, 0, 0)
        for k_ in c2:
            changes[k_] += c2[k_]

    # Step 2: add a uniform border pad on all four sides.
    wtmp2, c3 = partial_pad_square(working, border_pad, border_pad, border_pad, border_pad)
    for k_ in c3:
        changes[k_] += c3[k_]

    return wtmp2, changes
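# Illustrative only: a 300x200 landscape image with border_pad=20 is first
# equalized to 300x300 (top/bottom pads of 50 each) and then padded on all
# sides, giving a 340x340 result with accumulated changes
#   {"left": 20, "right": 20, "top": 70, "bottom": 70}.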
|
|
|
def pad_left_right_only(
    img: Image.Image,
    pad_val: int
) -> Tuple[Image.Image, Dict[str, int]]:
    changes = {"left": 0, "right": 0, "top": 0, "bottom": 0}
    w3, h3 = img.size
    new_w = w3 + pad_val * 2
    new_h = h3
    side3 = max(new_w, new_h)

    out3 = Image.new("RGBA", (side3, side3), (0, 0, 0, 0))
    offx3 = (side3 - new_w) // 2 + pad_val
    offy3 = (side3 - new_h) // 2
    out3.paste(img, (offx3, offy3))

    changes["left"] += pad_val
    changes["right"] += pad_val
    return out3, changes
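# Illustrative only: for a 200x300 portrait image and pad_val=60 the padded
# width (320) exceeds the height, so the square canvas is 320x320 and only the
# horizontal change counters are incremented:
#   {"left": 60, "right": 60, "top": 0, "bottom": 0}.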
|
|
|
def _center_min_square(
    img: Image.Image,
    orientation: str
) -> Tuple[Image.Image, Dict[str, int]]:
    # ``orientation`` is accepted for interface symmetry but not used here.
    w, h = img.size
    side_ = min(w, h)
    l_ = (w - side_) // 2
    t_ = (h - side_) // 2
    r_ = l_ + side_
    b_ = t_ + side_

    crp = img.crop((l_, t_, r_, b_))
    chg = {
        "left": -l_,
        "top": -t_,
        "right": -(w - r_),
        "bottom": -(h - b_),
    }
    return crp, chg
|
|
|
def coverage_crop_with_shorter_dimension(
    img: Image.Image,
    ctx: "ProcessingContext",
    orientation: str,
    force_side_to_min: bool
) -> Tuple[Image.Image, Dict[str, int]]:
    # ``force_side_to_min`` is currently unused; the crop side is always the
    # shorter image dimension (see ``side0`` below).
    w, h = img.size
    rbc_min = min(w, h)

    def fallback_center_crop():
        return _center_min_square(img, orientation)

    dr = ctx.detection_result
    if not dr or dr.get("status") != "ok":
        return fallback_center_crop()

    bxs = dr.get("boxes", [])
    kws = dr.get("final_keywords", [])
    if not bxs or not kws or len(bxs) != len(kws):
        return fallback_center_crop()

    # Keep only boxes whose keyword is a clothing feature; otherwise fall back
    # to the pre-computed largest box, and finally to a plain center crop.
    cf_ = [bx for (bx, kw) in zip(bxs, kws) if kw in CLOTHING_FEATURES_LIST]
    if not cf_:
        fallback_box = ctx.define_result.get("largest_box")
        if fallback_box and isinstance(fallback_box, list) and len(fallback_box) == 4:
            cf_ = [fallback_box]
        else:
            return fallback_center_crop()

    # Union of the selected boxes, clamped to the image bounds.
    x1 = max(0, min(b[0] for b in cf_))
    y1 = max(0, min(b[1] for b in cf_))
    x2 = min(w, max(b[2] for b in cf_))
    y2 = min(h, max(b[3] for b in cf_))
    bw = x2 - x1
    bh = y2 - y1
    if bw <= 0 or bh <= 0:
        return fallback_center_crop()

    # The crop is a square whose side is at least the shorter image dimension.
    side0 = min(bw, bh)
    if side0 < rbc_min:
        side0 = rbc_min

    # Center the square on the union box and clamp it inside the image.
    cx = (x1 + x2) // 2
    cy = (y1 + y2) // 2
    half = side0 // 2
    left_ = cx - half
    top_ = cy - half
    right_ = left_ + side0
    bot_ = top_ + side0

    if left_ < 0:
        left_ = 0
        right_ = side0
    if top_ < 0:
        top_ = 0
        bot_ = side0
    if right_ > w:
        right_ = w
        left_ = w - side0
    if bot_ > h:
        bot_ = h
        top_ = h - side0

    cropped = img.crop((left_, top_, right_, bot_))
    changes = {
        "left": -left_,
        "top": -top_,
        "right": -(w - right_),
        "bottom": -(h - bot_),
    }
    return cropped, changes
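# Worked example (illustrative only, assuming ctx.detection_result has the
# shape read above): for an 800x600 image with detections
#   boxes          = [[300, 50, 500, 200], [250, 150, 650, 400]]
#   final_keywords = ["collar", "sleeve"]
# the union box is (250, 50, 650, 400), its shorter side (350) is bumped up to
# min(w, h) = 600, and the centered square is clamped to (150, 0, 750, 600),
# i.e. a 600x600 crop with changes
#   {"left": -150, "top": 0, "right": -50, "bottom": 0}.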
|
|
|
|
|
|
|
|
|
ACTION_LIB_SQUARE = { |
|
"square_shoes_exception": "SQUARE_SHOES => pad bottom ignoring lower_line, no top", |
|
"square_head_exception": "SQUARE_HEAD => lines => no changes", |
|
"square_has_lines": "SQUARE_HAS_LINES => lines => no changes", |
|
"square_all_false": "SQUARE_ALL_FALSE => no lines => 2-step pad => square", |
|
} |
|
|
|
ACTION_LIB_LANDSCAPE = { |
|
"landscape_shoes_exception":"LANDSCAPE_SHOES => pad bottom, coverage-crop => square", |
|
"landscape_coverage": "LANDSCAPE_COVERAGE => any line => coverage-crop => square", |
|
"landscape_all_false": "LANDSCAPE_ALL_FALSE => no lines => 2-step pad => square", |
|
"landscape_head_exception": "LANDSCAPE_HEAD => remove top pad, keep shape square", |
|
} |
|
|
|
ACTION_LIB_PORTRAIT = { |
|
"portrait_shoes_exception": "PORTRAIT_SHOES => never pad top, pad bottom ignoring lower_line", |
|
"portrait_lr_any": "PORTRAIT_LR_COVERAGE => (left_line/right_line) => coverage-crop => square", |
|
"portrait_up_low_both": "PORTRAIT_BOTH_UP_LOW => pad left/right only => no top/bottom => square", |
|
"portrait_any_up_low": "PORTRAIT_ANY_UP_OR_LOW => exactly 1 => 2-step pad => square", |
|
"portrait_all_false": "PORTRAIT_ALL_FALSE => no lines => 2-step pad => square", |
|
"portrait_head_exception": "PORTRAIT_HEAD => pad left/right only => no top/bottom => square" |
|
} |
|
|
|
|
|
|
|
|
|
def cropp_batch(contexts: List[ProcessingContext], batch_logs: List[dict]): |
|
function_name = "cropp_batch" |
|
start_time = time.perf_counter() |
|
logging.log(LOG_LEVEL_MAP["INFO"], f"{EMOJI_MAP['INFO']} Starting {function_name} for {len(contexts)} items") |
|
|
|
processed_count = 0 |
|
skipped_count = 0 |
|
error_count = 0 |
|
morph_structure = np.ones((5, 5), dtype=np.uint8) |
|
|
|
for ctx in contexts: |
|
it_ = {"image_url": ctx.url, "function": function_name} |
|
if ctx.skip_run or ctx.skip_processing: |
|
it_["status"] = "skipped" |
|
batch_logs.append(it_) |
|
skipped_count += 1 |
|
continue |
|
|
|
        dr = ctx.detection_result
        if not dr or dr.get("status") != "ok":
|
it_["status"] = "no_detection" |
|
batch_logs.append(it_) |
|
error_count += 1 |
|
continue |
|
|
|
if "original" not in ctx.pil_img: |
|
it_["status"] = "no_original" |
|
batch_logs.append(it_) |
|
error_count += 1 |
|
continue |
|
|
|
try: |
|
pi_rgba, orig_filename, _ = ctx.pil_img["original"] |
|
|
|
if ctx.adjusted_blue_box: |
|
abx1, aby1, abx2, aby2 = ctx.adjusted_blue_box |
|
W_full = pi_rgba.width |
|
H_full = pi_rgba.height |
|
|
|
x1 = 0 |
|
x2 = W_full |
|
y1 = max(0, min(aby1, H_full)) |
|
y2 = max(0, min(aby2, H_full)) |
|
|
|
if y2 <= y1: |
|
it_["status"] = "invalid_crop_range" |
|
batch_logs.append(it_) |
|
error_count += 1 |
|
continue |
|
|
|
cropped = pi_rgba.crop((x1, y1, x2, y2)) |
|
else: |
|
cropped = pi_rgba |
|
|
|
cropped_np = np.array(cropped) |
|
            if cropped_np.ndim < 3 or cropped_np.shape[2] < 4:
|
cropped = cropped.convert("RGBA") |
|
cropped_np = np.array(cropped) |
|
|
|
alpha = cropped_np[:, :, 3] |
|
|
|
bin_mask = (alpha > 0).astype(np.uint8) |
|
bin_mask = binary_fill_holes(bin_mask).astype(np.uint8) |
|
bin_mask = binary_closing(bin_mask, structure=morph_structure, iterations=1).astype(np.uint8) |
|
|
|
labeled, num_components = label(bin_mask) |
|
if num_components > 1: |
|
largest_area = 0 |
|
largest_label = None |
|
for i in range(1, num_components + 1): |
|
area = (labeled == i).sum() |
|
if area > largest_area: |
|
largest_area = area |
|
largest_label = i |
|
bin_mask = (labeled == largest_label).astype(np.uint8) |
|
|
|
alpha_clean = alpha.copy() |
|
alpha_clean[bin_mask == 0] = 0 |
|
cropped_np[:, :, 3] = alpha_clean |
|
|
|
non_zero_rows = np.where(np.any(bin_mask != 0, axis=1))[0] |
|
non_zero_cols = np.where(np.any(bin_mask != 0, axis=0))[0] |
|
|
|
if len(non_zero_rows) > 0 and len(non_zero_cols) > 0: |
|
row_min, row_max = non_zero_rows[0], non_zero_rows[-1] |
|
col_min, col_max = non_zero_cols[0], non_zero_cols[-1] |
|
cropped_np = cropped_np[row_min:row_max + 1, col_min:col_max + 1, :] |
|
|
|
final_img = Image.fromarray(cropped_np, mode="RGBA") |
|
|
|
ctx.pil_img["original"] = [final_img, orig_filename, None] |
|
it_["status"] = "ok" |
|
processed_count += 1 |
|
except Exception as e: |
|
it_["status"] = "error" |
|
it_["exception"] = str(e) |
|
error_count += 1 |
|
|
|
batch_logs.append(it_) |
|
|
|
processing_time = time.perf_counter() - start_time |
|
logging.log(LOG_LEVEL_MAP["SUCCESS"], f"{EMOJI_MAP['SUCCESS']} Completed {function_name}: processed={processed_count}, skipped={skipped_count}, errors={error_count} in {processing_time:.3f}s") |
|
return batch_logs |
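# A minimal sketch (not wired into the pipeline) of a vectorized alternative to
# the largest-connected-component loop in cropp_batch above, assuming
# ``labeled`` and ``num_components`` come from scipy.ndimage.label as in that
# function.
def _largest_component_mask(labeled: np.ndarray, num_components: int) -> np.ndarray:
    if num_components <= 1:
        return (labeled > 0).astype(np.uint8)
    counts = np.bincount(labeled.ravel())
    # Index 0 is the background label, so skip it when picking the largest area.
    largest_label = int(counts[1:].argmax()) + 1
    return (labeled == largest_label).astype(np.uint8)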
|
|
|
def shrink_primary_box_batch(contexts: List[ProcessingContext], batch_logs: List[dict]): |
|
function_name = "shrink_primary_box_batch" |
|
start_time = time.perf_counter() |
|
logging.log(LOG_LEVEL_MAP["INFO"], f"{EMOJI_MAP['INFO']} Starting {function_name} for {len(contexts)} items") |
|
|
|
processed_count = 0 |
|
skipped_count = 0 |
|
error_count = 0 |
|
WHITE_CUTOFF = 240 |
|
|
|
for ctx in contexts: |
|
step_log = { |
|
"function": function_name, |
|
"image_url": ctx.url, |
|
"status": None, |
|
"data": { |
|
"primary_box": None, |
|
"primary_box_dimensions": None, |
|
"primary_box_orientation": None, |
|
"primary_box_transparency": None, |
|
"primary_box_border_lines_transparency": {}, |
|
"primary_shrinked_box_dimensions": None, |
|
"primary_shrinked_box_transparency": None, |
|
"primary_shrinked_box_border_lines_transparency": {}, |
|
"shrink_top": None, |
|
"shrink_bottom": None, |
|
"shrink_left": None, |
|
"shrink_right": None, |
|
"notes": "" |
|
} |
|
} |
|
|
|
if ctx.skip_run or ctx.skip_processing: |
|
step_log["status"] = "skipped" |
|
batch_logs.append(step_log) |
|
skipped_count += 1 |
|
continue |
|
|
|
if "original" not in ctx.pil_img: |
|
step_log["status"] = "error" |
|
step_log["data"]["notes"] = "No original image found in context." |
|
batch_logs.append(step_log) |
|
ctx.skip_run = True |
|
error_count += 1 |
|
continue |
|
|
|
try: |
|
pil_img_obj = ctx.pil_img["original"][0] |
|
width, height = pil_img_obj.size |
|
alpha = pil_img_obj.getchannel("A") |
|
|
|
top, bottom = 0, height - 1 |
|
left, right = 0, width - 1 |
|
|
|
while top < height: |
|
row_data = alpha.crop((0, top, width, top + 1)).tobytes() |
|
if all(v == 0 for v in row_data): |
|
top += 1 |
|
else: |
|
break |
|
|
|
while bottom >= 0: |
|
row_data = alpha.crop((0, bottom, width, bottom + 1)).tobytes() |
|
if all(v == 0 for v in row_data): |
|
bottom -= 1 |
|
else: |
|
break |
|
|
|
while left < width: |
|
col_data = alpha.crop((left, 0, left + 1, height)).tobytes() |
|
if all(v == 0 for v in col_data): |
|
left += 1 |
|
else: |
|
break |
|
|
|
while right >= 0: |
|
col_data = alpha.crop((right, 0, right + 1, height)).tobytes() |
|
if all(v == 0 for v in col_data): |
|
right -= 1 |
|
else: |
|
break |
|
|
|
pil_rgb = pil_img_obj.convert("RGB") |
|
px = pil_rgb.load() |
|
|
|
def is_white_row(row_idx: int) -> bool: |
|
for x in range(left, right + 1): |
|
r, g, b = px[x, row_idx] |
|
if not (r >= WHITE_CUTOFF and g >= WHITE_CUTOFF and b >= WHITE_CUTOFF): |
|
return False |
|
return True |
|
|
|
def is_white_col(col_idx: int) -> bool: |
|
for y in range(top, bottom + 1): |
|
r, g, b = px[col_idx, y] |
|
if not (r >= WHITE_CUTOFF and g >= WHITE_CUTOFF and b >= WHITE_CUTOFF): |
|
return False |
|
return True |
|
|
|
while top <= bottom: |
|
if is_white_row(top): |
|
top += 1 |
|
else: |
|
break |
|
|
|
while bottom >= top: |
|
if is_white_row(bottom): |
|
bottom -= 1 |
|
else: |
|
break |
|
|
|
while left <= right: |
|
if is_white_col(left): |
|
left += 1 |
|
else: |
|
break |
|
|
|
while right >= left: |
|
if is_white_col(right): |
|
right -= 1 |
|
else: |
|
break |
|
|
|
if left > right or top > bottom: |
|
step_log["data"]["notes"] += " Entire image trimmed away by alpha/white => skipping" |
|
step_log["status"] = "error" |
|
batch_logs.append(step_log) |
|
ctx.skip_run = True |
|
error_count += 1 |
|
continue |
|
|
|
shrink_top = top |
|
shrink_bottom = (height - 1) - bottom |
|
shrink_left = left |
|
shrink_right = (width - 1) - right |
|
|
|
step_log["data"]["shrink_top"] = shrink_top |
|
step_log["data"]["shrink_bottom"] = shrink_bottom |
|
step_log["data"]["shrink_left"] = shrink_left |
|
step_log["data"]["shrink_right"] = shrink_right |
|
|
|
primary_box = [left, top, right, bottom] |
|
w = right - left + 1 |
|
h = bottom - top + 1 |
|
step_log["data"]["primary_box"] = primary_box |
|
step_log["data"]["primary_box_dimensions"] = [w, h] |
|
|
|
orientation = "Square" |
|
if h > w: |
|
orientation = "Portrait" |
|
elif w > h: |
|
orientation = "Landscape" |
|
step_log["data"]["primary_box_orientation"] = orientation |
|
|
|
cropped_img = pil_img_obj.crop((left, top, right + 1, bottom + 1)) |
|
box_transparency = calculate_transparency(cropped_img) |
|
step_log["data"]["primary_box_transparency"] = box_transparency |
|
|
|
ctx.pil_img["original"] = [cropped_img, ctx.pil_img["original"][1], None] |
|
|
|
cw, ch = cropped_img.size |
|
step_log["data"]["primary_shrinked_box_dimensions"] = [cw, ch] |
|
step_log["data"]["primary_shrinked_box_transparency"] = box_transparency |
|
step_log["status"] = "ok" |
|
step_log["data"]["notes"] += " alpha+white trim done." |
|
|
|
ctx.define_result["primary_box_transparency"] = box_transparency |
|
processed_count += 1 |
|
|
|
except Exception as e: |
|
step_log["status"] = "error" |
|
step_log["data"]["notes"] = f"Exception: {e}" |
|
error_count += 1 |
|
|
|
batch_logs.append(step_log) |
|
|
|
processing_time = time.perf_counter() - start_time |
|
logging.log(LOG_LEVEL_MAP["SUCCESS"], f"{EMOJI_MAP['SUCCESS']} Completed {function_name}: processed={processed_count}, skipped={skipped_count}, errors={error_count} in {processing_time:.3f}s") |
|
return batch_logs |
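# A minimal sketch (not used by the pipeline) of computing the same alpha-trim
# bounds with NumPy instead of the per-row/column crop().tobytes() scans used
# in shrink_primary_box_batch, assuming an RGBA input as in that function.
def _alpha_bbox(img: Image.Image) -> Optional[Tuple[int, int, int, int]]:
    alpha = np.asarray(img.getchannel("A"))
    rows = np.where(alpha.any(axis=1))[0]
    cols = np.where(alpha.any(axis=0))[0]
    if rows.size == 0 or cols.size == 0:
        return None
    # (left, top, right, bottom), inclusive coordinates.
    return int(cols[0]), int(rows[0]), int(cols[-1]), int(rows[-1])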
|
|
|
def detect_border_stright_line_batch(contexts: List[ProcessingContext], batch_logs: List[dict]): |
|
function_name = "detect_border_stright_line_batch" |
|
start_time = time.perf_counter() |
|
logging.log(LOG_LEVEL_MAP["INFO"], f"{EMOJI_MAP['INFO']} Starting {function_name} for {len(contexts)} items") |
|
|
|
processed_count = 0 |
|
skipped_count = 0 |
|
error_count = 0 |
|
local_patch_size = 7 |
|
std_threshold = 5 |
|
|
|
for ctx in contexts: |
|
step_log = { |
|
"function": function_name, |
|
"image_url": ctx.url, |
|
"status": None, |
|
"data": { |
|
"left_line": False, |
|
"right_line": False, |
|
"upper_line": False, |
|
"lower_line": False, |
|
"left_line_coverage": 0.0, |
|
"right_line_coverage": 0.0, |
|
"upper_line_coverage": 0.0, |
|
"lower_line_coverage": 0.0, |
|
"left_feather_ratio": 0.0, |
|
"right_feather_ratio": 0.0, |
|
"upper_feather_ratio": 0.0, |
|
"lower_feather_ratio": 0.0, |
|
"performed_action": "single_px_border_feather_ratio_inverted_logic", |
|
"current_feather_threshold": (0.0, 0.0), |
|
"current_coverage_threshold": 0.0 |
|
} |
|
} |
|
|
|
if ctx.skip_run or ctx.skip_processing: |
|
step_log["status"] = "skipped" |
|
batch_logs.append(step_log) |
|
skipped_count += 1 |
|
continue |
|
|
|
if "original" not in ctx.pil_img: |
|
step_log["status"] = "error" |
|
step_log["error"] = "No padded image found in context." |
|
ctx.skip_run = True |
|
batch_logs.append(step_log) |
|
error_count += 1 |
|
continue |
|
|
|
try: |
|
pil_img_obj = ctx.pil_img["original"][0] |
|
w, h = pil_img_obj.size |
|
if w == 0 or h == 0: |
|
step_log["status"] = "error" |
|
step_log["error"] = f"Invalid dims (w={w}, h={h})" |
|
ctx.skip_run = True |
|
batch_logs.append(step_log) |
|
error_count += 1 |
|
continue |
|
|
|
pil_rgba = pil_img_obj.convert("RGBA") |
|
|
|
top_cov, bot_cov, left_cov, right_cov = 0.0, 0.0, 0.0, 0.0 |
|
|
|
if h > 0: |
|
strip_top = pil_rgba.crop((0, 0, w, 1)) |
|
top_cov = calculate_transparency(strip_top) |
|
step_log["data"]["upper_line_coverage"] = round(top_cov, 3) |
|
if h > 1: |
|
strip_bot = pil_rgba.crop((0, h - 1, w, h)) |
|
bot_cov = calculate_transparency(strip_bot) |
|
step_log["data"]["lower_line_coverage"] = round(bot_cov, 3) |
|
if w > 0: |
|
strip_left = pil_rgba.crop((0, 0, 1, h)) |
|
left_cov = calculate_transparency(strip_left) |
|
step_log["data"]["left_line_coverage"] = round(left_cov, 3) |
|
if w > 1: |
|
strip_right = pil_rgba.crop((w - 1, 0, w, h)) |
|
right_cov = calculate_transparency(strip_right) |
|
step_log["data"]["right_line_coverage"] = round(right_cov, 3) |
|
|
|
px_data = pil_rgba.load() |
|
|
|
def patch_alpha_values(cx, cy): |
|
half = local_patch_size // 2 |
|
vals = [] |
|
for dy in range(-half, half+1): |
|
for dx in range(-half, half+1): |
|
nx = cx + dx |
|
ny = cy + dy |
|
if 0 <= nx < w and 0 <= ny < h: |
|
_, _, _, a_ = px_data[nx, ny] |
|
vals.append(a_) |
|
return vals |
|
|
|
def is_feather_pixel(alpha_vals): |
|
if len(alpha_vals) <= 1: |
|
return True |
|
avg_ = sum(alpha_vals) / len(alpha_vals) |
|
var_ = sum((v - avg_)**2 for v in alpha_vals) / len(alpha_vals) |
|
return (var_**0.5 < std_threshold) |
|
|
|
def measure_feather_ratio(x1, y1, x2, y2): |
|
ww = x2 - x1 |
|
hh = y2 - y1 |
|
total_ = ww * hh |
|
if total_ <= 0: |
|
return 0.0 |
|
c_ = 0 |
|
for yy in range(y1, y2): |
|
for xx in range(x1, x2): |
|
pv = patch_alpha_values(xx, yy) |
|
if is_feather_pixel(pv): |
|
c_ += 1 |
|
return c_ / float(total_) |
|
|
|
top_f = 0.0 |
|
bot_f = 0.0 |
|
left_f = 0.0 |
|
right_f = 0.0 |
|
|
|
if h > 0: |
|
top_f = measure_feather_ratio(0, 0, w, 1) |
|
step_log["data"]["upper_feather_ratio"] = round(top_f, 3) |
|
if h > 1: |
|
bot_f = measure_feather_ratio(0, h - 1, w, h) |
|
step_log["data"]["lower_feather_ratio"] = round(bot_f, 3) |
|
if w > 0: |
|
left_f = measure_feather_ratio(0, 0, 1, h) |
|
step_log["data"]["left_feather_ratio"] = round(left_f, 3) |
|
if w > 1: |
|
right_f = measure_feather_ratio(w - 1, 0, w, h) |
|
step_log["data"]["right_feather_ratio"] = round(right_f, 3) |
|
|
|
if top_cov >= COVERAGE_THRESHOLD and (top_f < FEATHER_THRESHOLD_MIN or top_f > FEATHER_THRESHOLD_MAX): |
|
step_log["data"]["upper_line"] = True |
|
|
|
if bot_cov >= COVERAGE_THRESHOLD and (bot_f < FEATHER_THRESHOLD_MIN or bot_f > FEATHER_THRESHOLD_MAX): |
|
step_log["data"]["lower_line"] = True |
|
|
|
if left_cov >= COVERAGE_THRESHOLD and (left_f < FEATHER_THRESHOLD_MIN or left_f > FEATHER_THRESHOLD_MAX): |
|
step_log["data"]["left_line"] = True |
|
|
|
if right_cov >= COVERAGE_THRESHOLD and (right_f < FEATHER_THRESHOLD_MIN or right_f > FEATHER_THRESHOLD_MAX): |
|
step_log["data"]["right_line"] = True |
|
|
|
ctx.define_result["borders"] = { |
|
"left_line": step_log["data"]["left_line"], |
|
"right_line": step_log["data"]["right_line"], |
|
"upper_line": step_log["data"]["upper_line"], |
|
"lower_line": step_log["data"]["lower_line"] |
|
} |
|
|
|
step_log["data"]["current_feather_threshold"] = (FEATHER_THRESHOLD_MIN, FEATHER_THRESHOLD_MAX) |
|
step_log["data"]["current_coverage_threshold"] = COVERAGE_THRESHOLD |
|
|
|
step_log["status"] = "ok" |
|
processed_count += 1 |
|
|
|
except Exception as e: |
|
step_log["status"] = "error" |
|
step_log["error"] = str(e) |
|
error_count += 1 |
|
|
|
batch_logs.append(step_log) |
|
|
|
processing_time = time.perf_counter() - start_time |
|
logging.log(LOG_LEVEL_MAP["SUCCESS"], f"{EMOJI_MAP['SUCCESS']} Completed {function_name}: processed={processed_count}, skipped={skipped_count}, errors={error_count} in {processing_time:.3f}s") |
|
return batch_logs |
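# A minimal sketch of the per-edge decision rule applied above: an edge is
# flagged as a hard border line when its 1px strip has enough opaque coverage
# AND its feather ratio falls outside the "soft edge" band. Illustrative only;
# detect_border_stright_line_batch inlines this logic per edge.
def _is_border_line(coverage: float, feather_ratio: float) -> bool:
    return coverage >= COVERAGE_THRESHOLD and not (
        FEATHER_THRESHOLD_MIN <= feather_ratio <= FEATHER_THRESHOLD_MAX
    )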
|
|
|
def pad_image_box_to_squere_batch(contexts: List[ProcessingContext], batch_logs: List[dict]): |
|
function_name = "pad_image_box_to_squere_batch" |
|
start_time = time.perf_counter() |
|
logging.log(LOG_LEVEL_MAP["INFO"], f"{EMOJI_MAP['INFO']} Starting {function_name} for {len(contexts)} items") |
|
|
|
processed_count = 0 |
|
skipped_count = 0 |
|
error_count = 0 |
|
|
|
for ctx in contexts: |
|
step_log = { |
|
"function": function_name, |
|
"image_url": ctx.url, |
|
"status": None, |
|
"data": { |
|
"primary_width": None, |
|
"primary_height": None, |
|
"primary_orientation": None, |
|
"border_lines": {}, |
|
"final_width": None, |
|
"final_height": None, |
|
"condition": None, |
|
"actions": [] |
|
} |
|
} |
|
|
|
        if ctx.skip_run or ctx.skip_processing:
            step_log["status"] = "skipped"
            batch_logs.append(step_log)
            skipped_count += 1
            continue

        if "original" not in ctx.pil_img:
            step_log["status"] = "error"
            step_log["data"]["actions"].append("ERROR => RBC missing or no original image.")
            ctx.skip_run = True
            batch_logs.append(step_log)
            error_count += 1
            continue
|
|
|
        try:
            im, fn, _ = ctx.pil_img["original"]
            w, h = im.size
            step_log["data"]["primary_width"] = w
            step_log["data"]["primary_height"] = h

            if w == h:
                orientation = "Square"
            elif w > h:
                orientation = "Landscape"
            else:
                orientation = "Portrait"
            step_log["data"]["primary_orientation"] = orientation

            brds = ctx.define_result.get("borders", {})
            left_line = parse_line_flag(brds.get("left_line", False))
            right_line = parse_line_flag(brds.get("right_line", False))
            upper_line = parse_line_flag(brds.get("upper_line", False))
            lower_line = parse_line_flag(brds.get("lower_line", False))

            step_log["data"]["border_lines"] = {
                "left_line": str(left_line),
                "right_line": str(right_line),
                "upper_line": str(upper_line),
                "lower_line": str(lower_line)
            }

            dr = ctx.detection_result
            final_kws = dr.get("final_keywords", []) if dr else []
            shoes_detected = any(k in SHOES_LIST for k in final_kws)
            head_detected = any(k in HEAD_LIST for k in final_kws)

            border_pad = int(UNIVERSAL_PAD_RATIO * max(w, h))
            final_img = im
            px_info = {"left": 0, "right": 0, "top": 0, "bottom": 0}
            scenario = None
|
|
|
if orientation=="Square": |
|
if shoes_detected: |
|
scenario= "square_shoes_exception" |
|
pl= border_pad if not left_line else 0 |
|
pr= border_pad if not right_line else 0 |
|
pt= 0 |
|
pb= border_pad |
|
padded,cA= partial_pad_square(final_img, pl,pr,pt,pb) |
|
for kk in cA: px_info[kk]+= cA[kk] |
|
final_img= padded |
|
elif (left_line or right_line or upper_line or lower_line): |
|
scenario= "square_has_lines" |
|
else: |
|
scenario= "square_all_false" |
|
wtmp2,cB= two_step_pad_to_square(final_img,"Landscape",border_pad) |
|
for kk in cB: px_info[kk]+= cB[kk] |
|
final_img= wtmp2 |
|
|
|
if head_detected and not shoes_detected: |
|
scenario= "square_head_exception" |
|
|
|
step_log["data"]["condition"]= scenario |
|
step_log["data"]["actions"]= [ ACTION_LIB_SQUARE[scenario] ] |
|
|
|
elif orientation=="Landscape": |
|
if shoes_detected: |
|
scenario= "landscape_shoes_exception" |
|
pad_l=0; pad_r=0; pad_t=0; pad_b= border_pad |
|
padded0,c0= partial_pad_square(final_img,pad_l,pad_r,pad_t,pad_b) |
|
for kk in c0: px_info[kk]+= c0[kk] |
|
final_img= padded0 |
|
|
|
cimg,cx1= coverage_crop_with_shorter_dimension( |
|
final_img, ctx, "landscape", False |
|
) |
|
for kk in cx1: px_info[kk]+= cx1[kk] |
|
final_img= cimg |
|
|
|
elif (left_line or right_line or upper_line or lower_line): |
|
scenario= "landscape_coverage" |
|
cimg2,cx2= coverage_crop_with_shorter_dimension( |
|
final_img, ctx, "landscape", False |
|
) |
|
for kk in cx2: px_info[kk]+= cx2[kk] |
|
final_img= cimg2 |
|
|
|
else: |
|
scenario= "landscape_all_false" |
|
wtmp3,c3= two_step_pad_to_square(final_img,"Landscape",border_pad) |
|
for kk in c3: px_info[kk]+= c3[kk] |
|
final_img= wtmp3 |
|
|
|
if head_detected and not shoes_detected: |
|
scenario= "landscape_head_exception" |
|
if h> w: |
|
forced_y = h - w |
|
ctx.define_result["largest_box"] = [0, forced_y, w, h] |
|
cimgH,cxH= coverage_crop_with_shorter_dimension( |
|
final_img, ctx, "landscape", False |
|
) |
|
for kk in cxH: px_info[kk]+= cxH[kk] |
|
final_img= cimgH |
|
|
|
step_log["data"]["condition"]= scenario |
|
step_log["data"]["actions"]= [ ACTION_LIB_LANDSCAPE[scenario] ] |
|
|
|
            else:
                up_low_count = (1 if upper_line else 0) + (1 if lower_line else 0)

                if shoes_detected:
                    scenario = "portrait_shoes_exception"
                    pl = border_pad if not left_line else 0
                    pr = border_pad if not right_line else 0
                    pt = 0
                    pb = border_pad
                    paddedP, cS = partial_pad_square(final_img, pl, pr, pt, pb)
                    for kk in cS:
                        px_info[kk] += cS[kk]
                    final_img = paddedP

                elif head_detected:
                    scenario = "portrait_head_exception"
                    side_diff = h - w if h > w else 0
                    half_ = side_diff // 2
                    leftover = side_diff - half_
                    pimg, cHL = partial_pad_square(final_img, half_, leftover, 0, 0)
                    for kk in cHL:
                        px_info[kk] += cHL[kk]
                    final_img = pimg

                elif left_line or right_line:
                    scenario = "portrait_lr_any"
                    cimg3, c33 = coverage_crop_with_shorter_dimension(
                        final_img, ctx, "portrait", False
                    )
                    for kk in c33:
                        px_info[kk] += c33[kk]
                    final_img = cimg3

                elif up_low_count == 2:
                    scenario = "portrait_up_low_both"
                    wtmp4, c44 = pad_left_right_only(final_img, border_pad)
                    for kk in c44:
                        px_info[kk] += c44[kk]
                    final_img = wtmp4

                elif up_low_count == 1:
                    scenario = "portrait_any_up_low"
                    wtmp5, c55 = two_step_pad_to_square(final_img, "Portrait", border_pad)
                    for kk in c55:
                        px_info[kk] += c55[kk]
                    final_img = wtmp5

                else:
                    scenario = "portrait_all_false"
                    wtmp6, c66 = two_step_pad_to_square(final_img, "Portrait", border_pad)
                    for kk in c66:
                        px_info[kk] += c66[kk]
                    final_img = wtmp6

                step_log["data"]["condition"] = scenario
                step_log["data"]["actions"] = [ACTION_LIB_PORTRAIT[scenario]]
|
|
|
step_log["status"]="ok" |
|
fw,fh= final_img.size |
|
step_log["data"]["final_width"] = fw |
|
step_log["data"]["final_height"]= fh |
|
|
|
def plus_minus(dx): |
|
return f"+{dx}" if dx>=0 else str(dx) |
|
|
|
step_log["data"]["border_lines"] = { |
|
"left_line": f"{left_line} {plus_minus(px_info['left'])}px", |
|
"right_line": f"{right_line} {plus_minus(px_info['right'])}px", |
|
"upper_line": f"{upper_line} {plus_minus(px_info['top'])}px", |
|
"lower_line": f"{lower_line} {plus_minus(px_info['bottom'])}px" |
|
} |
|
|
|
ctx.pil_img["original"]= [final_img, fn, None] |
|
ctx.pad_info.update(px_info) |
|
processed_count += 1 |
|
|
|
        except Exception as e:
            step_log["status"] = "error"
            step_log["data"]["actions"].append(f"ERROR => exception => {repr(e)}")
|
error_count += 1 |
|
|
|
batch_logs.append(step_log) |
|
|
|
processing_time = time.perf_counter() - start_time |
|
logging.log(LOG_LEVEL_MAP["SUCCESS"], f"{EMOJI_MAP['SUCCESS']} Completed {function_name}: processed={processed_count}, skipped={skipped_count}, errors={error_count} in {processing_time:.3f}s") |
|
return batch_logs |
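# Worked example for the portrait_all_false path above (illustrative only): an
# 800x1000 image with no border lines gets
# border_pad = int(UNIVERSAL_PAD_RATIO * 1000) = 75, is first equalized to
# 1000x1000 (left/right pads of 100 each) and then padded on all sides,
# ending at 1150x1150 with
#   px_info == {"left": 175, "right": 175, "top": 75, "bottom": 75}.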
|
|
|
def center_object_batch(contexts: List[ProcessingContext], batch_logs: List[dict]): |
|
function_name = "center_object_batch" |
|
start_time = time.perf_counter() |
|
logging.log(LOG_LEVEL_MAP["INFO"], f"{EMOJI_MAP['INFO']} Starting {function_name} for {len(contexts)} items") |
|
|
|
processed_count = 0 |
|
skipped_count = 0 |
|
error_count = 0 |
|
|
|
for ctx in contexts: |
|
step_log = { |
|
"function": function_name, |
|
"image_url": ctx.url, |
|
"status": None, |
|
"data": { |
|
"leftmost_x": None, |
|
"rightmost_x": None, |
|
"midpoint_x": None, |
|
"shift_x": None, |
|
"bbox": None, |
|
"notes": "" |
|
} |
|
} |
|
|
|
if ctx.skip_run or ctx.skip_processing: |
|
step_log["status"] = "skipped" |
|
batch_logs.append(step_log) |
|
skipped_count += 1 |
|
continue |
|
|
|
if "original" not in ctx.pil_img: |
|
step_log["status"] = "error" |
|
step_log["data"]["notes"] = "No final image found in context." |
|
ctx.skip_run = True |
|
batch_logs.append(step_log) |
|
error_count += 1 |
|
continue |
|
|
|
try: |
|
pil_img, _, _ = ctx.pil_img["original"] |
|
image = pil_img.convert("RGBA") |
|
width, height = image.size |
|
center_y = height // 2 |
|
|
|
alpha = image.split()[3] |
|
non_transparent_xs = [] |
|
|
|
for x in range(width): |
|
if alpha.getpixel((x, center_y)) != 0: |
|
non_transparent_xs.append(x) |
|
|
|
if not non_transparent_xs: |
|
step_log["status"] = "no_op" |
|
step_log["data"]["notes"] = "No non-transparent pixel found on horizontal mid-line." |
|
batch_logs.append(step_log) |
|
skipped_count += 1 |
|
continue |
|
|
|
leftmost = min(non_transparent_xs) |
|
rightmost = max(non_transparent_xs) |
|
midpoint_x = (leftmost + rightmost) / 2.0 |
|
image_center_x = width / 2.0 |
|
shift_x = image_center_x - midpoint_x |
|
|
|
bbox = alpha.getbbox() |
|
if not bbox: |
|
step_log["status"] = "no_op" |
|
step_log["data"]["notes"] = "Image has no non-transparent bounding box." |
|
batch_logs.append(step_log) |
|
skipped_count += 1 |
|
continue |
|
|
|
region = image.crop(bbox) |
|
|
|
new_left = int(bbox[0] + shift_x) |
|
new_top = bbox[1] |
|
|
|
new_image = Image.new("RGBA", (width, height), (0, 0, 0, 0)) |
|
new_image.paste(region, (new_left, new_top), region) |
|
|
|
ctx.pil_img["original"] = [new_image, ctx.pil_img["original"][1], None] |
|
|
|
step_log["status"] = "ok" |
|
step_log["data"]["leftmost_x"] = leftmost |
|
step_log["data"]["rightmost_x"] = rightmost |
|
step_log["data"]["midpoint_x"] = round(midpoint_x, 2) |
|
step_log["data"]["shift_x"] = round(shift_x, 2) |
|
step_log["data"]["bbox"] = bbox |
|
step_log["data"]["notes"] = "Object horizontally centered (red lines removed)." |
|
processed_count += 1 |
|
|
|
except Exception as e: |
|
step_log["status"] = "error" |
|
step_log["data"]["notes"] = f"Error: {str(e)}" |
|
error_count += 1 |
|
|
|
batch_logs.append(step_log) |
|
|
|
processing_time = time.perf_counter() - start_time |
|
logging.log(LOG_LEVEL_MAP["SUCCESS"], f"{EMOJI_MAP['SUCCESS']} Completed {function_name}: processed={processed_count}, skipped={skipped_count}, errors={error_count} in {processing_time:.3f}s") |
|
return batch_logs |
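# Worked example (illustrative only): on a 600px-wide image whose mid-row
# object pixels span x = 100..300, the object midpoint is 200, the image
# center is 300, so shift_x = +100 and the cropped bbox region is re-pasted
# 100px further to the right.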
|
|
|
|
|
|
|
|
|
def ensure_models_loaded(): |
|
import app |
|
app.ensure_models_loaded() |
|
|
|
pipeline_step = create_pipeline_step(ensure_models_loaded) |
|
|
|
@pipeline_step |
|
def cropping_padding(contexts: List[ProcessingContext], batch_logs: Optional[List[dict]] = None):
|
if batch_logs is None: |
|
batch_logs = [] |
|
|
|
start_time = time.perf_counter() |
|
logging.log(LOG_LEVEL_MAP["INFO"], f"{EMOJI_MAP['INFO']} Starting cropping_padding pipeline for {len(contexts)} items") |
|
|
|
if not ENABLE_CROPPING_PADDING: |
|
logging.log(LOG_LEVEL_MAP["WARNING"], f"{EMOJI_MAP['WARNING']} Cropping and padding operations are disabled (ENABLE_CROPPING_PADDING=False)") |
|
logging.log(LOG_LEVEL_MAP["INFO"], f"{EMOJI_MAP['INFO']} Returning original images unchanged") |
|
|
|
for ctx in contexts: |
|
skip_log = { |
|
"function": "cropping_padding_pipeline", |
|
"image_url": ctx.url, |
|
"status": "skipped", |
|
"reason": "ENABLE_CROPPING_PADDING is False", |
|
"data": {"operations_performed": "none", "original_image_preserved": True} |
|
} |
|
batch_logs.append(skip_log) |
|
|
|
processing_time = time.perf_counter() - start_time |
|
logging.log(LOG_LEVEL_MAP["SUCCESS"], f"{EMOJI_MAP['SUCCESS']} Completed cropping_padding pipeline (skipped) for {len(contexts)} items in {processing_time:.3f}s") |
|
return batch_logs |
|
|
|
cropp_batch(contexts, batch_logs) |
|
|
|
shrink_primary_box_batch(contexts, batch_logs) |
|
|
|
detect_border_stright_line_batch(contexts, batch_logs) |
|
|
|
pad_image_box_to_squere_batch(contexts, batch_logs) |
|
|
|
center_object_batch(contexts, batch_logs) |
|
|
|
processing_time = time.perf_counter() - start_time |
|
logging.log(LOG_LEVEL_MAP["SUCCESS"], f"{EMOJI_MAP['SUCCESS']} Completed cropping_padding pipeline for {len(contexts)} items in {processing_time:.3f}s") |
|
|
|
return batch_logs |
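# Hypothetical usage sketch (illustrative only; how ProcessingContext instances
# are constructed and populated lives in src.utils and the calling app, not in
# this module):
#
#   contexts = load_contexts_somehow()   # assumed helper, not defined here
#   logs = cropping_padding(contexts)
#   for entry in logs:
#       print(entry["function"], entry["image_url"], entry["status"])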
|
|