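"""Cropping and padding pipeline step.

Given a batch of ProcessingContext objects carrying an RGBA "original" image and
detection results, this module: (1) crops to the adjusted blue box and keeps only
the largest opaque component, (2) trims fully transparent and near-white borders,
(3) flags straight border lines via coverage/feather heuristics, (4) pads or
coverage-crops the image to a square according to orientation- and
keyword-specific rules, and (5) horizontally centers the object.
"""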
# ----------------------------------------------------------------------
# IMPORTS
# ----------------------------------------------------------------------
import numpy as np
import logging
import time
from PIL import Image, ImageDraw
from typing import Dict, List, Optional, Tuple, Any
from scipy.ndimage import (
binary_fill_holes,
binary_closing,
label,
find_objects
)
from src.utils import ProcessingContext, create_pipeline_step, LOG_LEVEL_MAP, EMOJI_MAP
# ----------------------------------------------------------------------
# CONSTANTS
# ----------------------------------------------------------------------
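# UNIVERSAL_PAD_RATIO: outer padding as a fraction of the longer image side.
# COVERAGE_THRESHOLD: minimum share of opaque pixels on a 1-px border strip for
#   that strip to be considered a candidate straight line.
# FEATHER_THRESHOLD_MIN/MAX: band of acceptable feather ratios; a strip that
#   meets the coverage threshold and whose feather ratio falls outside this
#   band is flagged as a hard border line.
# ENABLE_CROPPING_PADDING: master switch for the whole pipeline step.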
UNIVERSAL_PAD_RATIO = 0.075
COVERAGE_THRESHOLD = 0.25
FEATHER_THRESHOLD_MIN = 0.3
FEATHER_THRESHOLD_MAX = 0.7
ENABLE_CROPPING_PADDING = True
PRODUCT_TYPE_LIST = ["jacket", "shirt", "vest", "jeans", "shorts", "skirt", "overall", "dress"]
HEAD_LIST = ["head"]
SHOES_LIST = ["shoes"]
CLOTHING_FEATURES_LIST = ["neckline", "collar", "sleeve", "closure", "pocket"]
# ----------------------------------------------------------------------
# HELPER FUNCTIONS
# ----------------------------------------------------------------------
def calculate_transparency(image, coverage_threshold=0.9999):
    """Return the fraction of pixels whose alpha is non-zero (opacity coverage).

    ``coverage_threshold`` is accepted for signature compatibility but is not
    used by this function. The image must have an alpha channel.
    """
    alpha = np.asarray(image.getchannel("A"))
    total_pixels = alpha.size
    if total_pixels == 0:
        return 0
    non_transparent = int(np.count_nonzero(alpha))
    return non_transparent / float(total_pixels)
def parse_line_flag(val) -> bool:
if isinstance(val, bool):
return val
if isinstance(val, str):
return val.strip().lower().startswith("true")
return False
def partial_pad_square(
    img: Image.Image,
    pad_left: int,
    pad_right: int,
    pad_top: int,
    pad_bottom: int
) -> Tuple[Image.Image, Dict[str, int]]:
    """Pad each side by the given amount, then center the result on a transparent
    square canvas whose side is the larger of the padded width and height.
    Returns the new image and the per-side padding applied."""
    w, h = img.size
    new_w = w + pad_left + pad_right
    new_h = h + pad_top + pad_bottom
    side = max(new_w, new_h)
    out = Image.new("RGBA", (side, side), (0, 0, 0, 0))
    offx = (side - new_w) // 2 + pad_left
    offy = (side - new_h) // 2 + pad_top
    out.paste(img, (offx, offy))
    changes = {
        "left": pad_left,
        "right": pad_right,
        "top": pad_top,
        "bottom": pad_bottom
    }
    return (out, changes)
def two_step_pad_to_square(
    img: Image.Image,
    orientation: str,
    border_pad: int
) -> Tuple[Image.Image, Dict[str, int]]:
    """Step 1: pad the shorter dimension so the image becomes square (top/bottom
    for Landscape, left/right for Portrait). Step 2: add ``border_pad`` on all
    four sides. Returns the padded image and the per-side padding totals."""
    changes = {"left": 0, "right": 0, "top": 0, "bottom": 0}
    w2, h2 = img.size
    working = img
    if orientation == "Landscape" and w2 > h2:
        diff = w2 - h2
        tpad = diff // 2
        bpad = diff - tpad
        working, c1 = partial_pad_square(working, 0, 0, tpad, bpad)
        for k_ in c1:
            changes[k_] += c1[k_]
    elif orientation == "Portrait" and h2 > w2:
        diff = h2 - w2
        lpad = diff // 2
        rpad = diff - lpad
        working, c2 = partial_pad_square(working, lpad, rpad, 0, 0)
        for k_ in c2:
            changes[k_] += c2[k_]
    wtmp2, c3 = partial_pad_square(working, border_pad, border_pad, border_pad, border_pad)
    for k_ in c3:
        changes[k_] += c3[k_]
    return (wtmp2, changes)
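# Worked example (hypothetical numbers): a 400x300 landscape crop with
# border_pad = 30 is first padded 50 px on top and bottom (step 1 -> 400x400),
# then 30 px on every side (step 2 -> 460x460); the returned change dict is
# {"left": 30, "right": 30, "top": 80, "bottom": 80}.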
def pad_left_right_only(
    img: Image.Image,
    pad_val: int
) -> Tuple[Image.Image, Dict[str, int]]:
    """Pad only the left and right sides by ``pad_val``, then center the result
    on a transparent square canvas sized to the larger dimension."""
    changes = {"left": 0, "right": 0, "top": 0, "bottom": 0}
    w3, h3 = img.size
    new_w = w3 + pad_val * 2
    new_h = h3
    side3 = max(new_w, new_h)
    out3 = Image.new("RGBA", (side3, side3), (0, 0, 0, 0))
    offx3 = (side3 - new_w) // 2 + pad_val
    offy3 = (side3 - new_h) // 2
    out3.paste(img, (offx3, offy3))
    changes["left"] += pad_val
    changes["right"] += pad_val
    return (out3, changes)
def _center_min_square(
    img: Image.Image,
    orientation: str
) -> Tuple[Image.Image, Dict[str, int]]:
    """Fallback crop: take the centered square whose side is min(width, height).
    Negative values in the returned change dict denote pixels removed per side.
    ``orientation`` is accepted but not used."""
    w, h = img.size
    side_ = min(w, h)
    l_ = (w - side_) // 2
    t_ = (h - side_) // 2
    r_ = l_ + side_
    b_ = t_ + side_
    crp = img.crop((l_, t_, r_, b_))
    chg = {
        "left": -l_,
        "top": -t_,
        "right": -(w - r_),
        "bottom": -(h - b_)
    }
    return (crp, chg)
def coverage_crop_with_shorter_dimension(
    img: Image.Image,
    ctx: "ProcessingContext",
    orientation: str,
    force_side_to_min: bool
) -> Tuple[Image.Image, Dict[str, int]]:
    """Crop a square with side min(width, height), centered on the union of
    detected clothing-feature boxes and clamped to the image bounds. Falls back
    to ``ctx.define_result["largest_box"]`` and finally to a plain center crop
    when detections are missing or inconsistent. ``orientation`` is only passed
    to the center-crop fallback and ``force_side_to_min`` is currently unused."""
    w, h = img.size
    rbc_min = min(w, h)

    def fallback_center_crop():
        return _center_min_square(img, orientation)

    dr = ctx.detection_result
    if not dr or dr.get("status") != "ok":
        return fallback_center_crop()
    bxs = dr.get("boxes", [])
    kws = dr.get("final_keywords", [])
    if not bxs or not kws or len(bxs) != len(kws):
        return fallback_center_crop()
    cf_ = [bx for (bx, kw) in zip(bxs, kws) if kw in CLOTHING_FEATURES_LIST]
    if not cf_:
        fallback_box = ctx.define_result.get("largest_box")
        if fallback_box and isinstance(fallback_box, list) and len(fallback_box) == 4:
            cf_ = [fallback_box]
        else:
            return fallback_center_crop()
    # Union of the candidate boxes, clamped to the image bounds.
    x1 = max(0, min(b[0] for b in cf_))
    y1 = max(0, min(b[1] for b in cf_))
    x2 = min(w, max(b[2] for b in cf_))
    y2 = min(h, max(b[3] for b in cf_))
    bw = x2 - x1
    bh = y2 - y1
    if bw <= 0 or bh <= 0:
        return fallback_center_crop()
    side0 = min(bw, bh)
    if side0 < rbc_min:
        side0 = rbc_min
    cx = (x1 + x2) // 2
    cy = (y1 + y2) // 2
    half = side0 // 2
    left_ = cx - half
    top_ = cy - half
    right_ = left_ + side0
    bot_ = top_ + side0
    # Shift the square back inside the image if it overflows an edge.
    if left_ < 0:
        left_ = 0
        right_ = side0
    if top_ < 0:
        top_ = 0
        bot_ = side0
    if right_ > w:
        right_ = w
        left_ = w - side0
    if bot_ > h:
        bot_ = h
        top_ = h - side0
    cropped = img.crop((left_, top_, right_, bot_))
    changes = {
        "left": -left_,
        "top": -top_,
        "right": -(w - right_),
        "bottom": -(h - bot_)
    }
    return (cropped, changes)
# ----------------------------------------------------------------------
# ACTION DICTIONARIES
# ----------------------------------------------------------------------
ACTION_LIB_SQUARE = {
"square_shoes_exception": "SQUARE_SHOES => pad bottom ignoring lower_line, no top",
"square_head_exception": "SQUARE_HEAD => lines => no changes",
"square_has_lines": "SQUARE_HAS_LINES => lines => no changes",
"square_all_false": "SQUARE_ALL_FALSE => no lines => 2-step pad => square",
}
ACTION_LIB_LANDSCAPE = {
"landscape_shoes_exception":"LANDSCAPE_SHOES => pad bottom, coverage-crop => square",
"landscape_coverage": "LANDSCAPE_COVERAGE => any line => coverage-crop => square",
"landscape_all_false": "LANDSCAPE_ALL_FALSE => no lines => 2-step pad => square",
"landscape_head_exception": "LANDSCAPE_HEAD => remove top pad, keep shape square",
}
ACTION_LIB_PORTRAIT = {
"portrait_shoes_exception": "PORTRAIT_SHOES => never pad top, pad bottom ignoring lower_line",
"portrait_lr_any": "PORTRAIT_LR_COVERAGE => (left_line/right_line) => coverage-crop => square",
"portrait_up_low_both": "PORTRAIT_BOTH_UP_LOW => pad left/right only => no top/bottom => square",
"portrait_any_up_low": "PORTRAIT_ANY_UP_OR_LOW => exactly 1 => 2-step pad => square",
"portrait_all_false": "PORTRAIT_ALL_FALSE => no lines => 2-step pad => square",
"portrait_head_exception": "PORTRAIT_HEAD => pad left/right only => no top/bottom => square"
}
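# Scenario selection (see pad_image_box_to_squere_batch below): the orientation
# of the trimmed box picks one of the three action tables above, and detected
# keywords plus border-line flags pick the key. Shoes-type detections always
# pad the bottom and never the top. Square: existing lines mean no change,
# otherwise two-step pad. Landscape: any line triggers a coverage crop,
# otherwise two-step pad. Portrait: left/right lines trigger a coverage crop,
# both upper+lower lines mean left/right padding only, a single upper or lower
# line (or no lines) means two-step padding; head detections pad left/right only.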
# ----------------------------------------------------------------------
# PIPELINE FUNCTIONS
# ----------------------------------------------------------------------
def cropp_batch(contexts: List[ProcessingContext], batch_logs: List[dict]):
function_name = "cropp_batch"
start_time = time.perf_counter()
logging.log(LOG_LEVEL_MAP["INFO"], f"{EMOJI_MAP['INFO']} Starting {function_name} for {len(contexts)} items")
processed_count = 0
skipped_count = 0
error_count = 0
morph_structure = np.ones((5, 5), dtype=np.uint8)
for ctx in contexts:
it_ = {"image_url": ctx.url, "function": function_name}
if ctx.skip_run or ctx.skip_processing:
it_["status"] = "skipped"
batch_logs.append(it_)
skipped_count += 1
continue
        dr = ctx.detection_result
        if not dr or dr.get("status") != "ok":
it_["status"] = "no_detection"
batch_logs.append(it_)
error_count += 1
continue
if "original" not in ctx.pil_img:
it_["status"] = "no_original"
batch_logs.append(it_)
error_count += 1
continue
try:
pi_rgba, orig_filename, _ = ctx.pil_img["original"]
if ctx.adjusted_blue_box:
abx1, aby1, abx2, aby2 = ctx.adjusted_blue_box
W_full = pi_rgba.width
H_full = pi_rgba.height
x1 = 0
x2 = W_full
y1 = max(0, min(aby1, H_full))
y2 = max(0, min(aby2, H_full))
if y2 <= y1:
it_["status"] = "invalid_crop_range"
batch_logs.append(it_)
error_count += 1
continue
cropped = pi_rgba.crop((x1, y1, x2, y2))
else:
cropped = pi_rgba
cropped_np = np.array(cropped)
            if cropped_np.ndim < 3 or cropped_np.shape[2] < 4:
cropped = cropped.convert("RGBA")
cropped_np = np.array(cropped)
alpha = cropped_np[:, :, 3]
bin_mask = (alpha > 0).astype(np.uint8)
bin_mask = binary_fill_holes(bin_mask).astype(np.uint8)
bin_mask = binary_closing(bin_mask, structure=morph_structure, iterations=1).astype(np.uint8)
labeled, num_components = label(bin_mask)
if num_components > 1:
largest_area = 0
largest_label = None
for i in range(1, num_components + 1):
area = (labeled == i).sum()
if area > largest_area:
largest_area = area
largest_label = i
bin_mask = (labeled == largest_label).astype(np.uint8)
alpha_clean = alpha.copy()
alpha_clean[bin_mask == 0] = 0
cropped_np[:, :, 3] = alpha_clean
non_zero_rows = np.where(np.any(bin_mask != 0, axis=1))[0]
non_zero_cols = np.where(np.any(bin_mask != 0, axis=0))[0]
if len(non_zero_rows) > 0 and len(non_zero_cols) > 0:
row_min, row_max = non_zero_rows[0], non_zero_rows[-1]
col_min, col_max = non_zero_cols[0], non_zero_cols[-1]
cropped_np = cropped_np[row_min:row_max + 1, col_min:col_max + 1, :]
final_img = Image.fromarray(cropped_np, mode="RGBA")
ctx.pil_img["original"] = [final_img, orig_filename, None]
it_["status"] = "ok"
processed_count += 1
except Exception as e:
it_["status"] = "error"
it_["exception"] = str(e)
error_count += 1
batch_logs.append(it_)
processing_time = time.perf_counter() - start_time
logging.log(LOG_LEVEL_MAP["SUCCESS"], f"{EMOJI_MAP['SUCCESS']} Completed {function_name}: processed={processed_count}, skipped={skipped_count}, errors={error_count} in {processing_time:.3f}s")
return batch_logs
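# Minimal sketch of the mask clean-up performed above, on assumed toy data
# (the real pipeline also applies binary_closing with a 5x5 structuring element):
#
#   mask = np.array([[0, 1, 1, 0],
#                    [0, 1, 0, 0],
#                    [0, 0, 0, 1]], dtype=np.uint8)
#   filled = binary_fill_holes(mask).astype(np.uint8)   # no holes here, unchanged
#   labeled, n = label(filled)                          # n == 2 separate blobs
#   keep = max(range(1, n + 1), key=lambda i: (labeled == i).sum())
#   mask = (labeled == keep).astype(np.uint8)           # only the larger blob survives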
def shrink_primary_box_batch(contexts: List[ProcessingContext], batch_logs: List[dict]):
function_name = "shrink_primary_box_batch"
start_time = time.perf_counter()
logging.log(LOG_LEVEL_MAP["INFO"], f"{EMOJI_MAP['INFO']} Starting {function_name} for {len(contexts)} items")
processed_count = 0
skipped_count = 0
error_count = 0
WHITE_CUTOFF = 240
for ctx in contexts:
step_log = {
"function": function_name,
"image_url": ctx.url,
"status": None,
"data": {
"primary_box": None,
"primary_box_dimensions": None,
"primary_box_orientation": None,
"primary_box_transparency": None,
"primary_box_border_lines_transparency": {},
"primary_shrinked_box_dimensions": None,
"primary_shrinked_box_transparency": None,
"primary_shrinked_box_border_lines_transparency": {},
"shrink_top": None,
"shrink_bottom": None,
"shrink_left": None,
"shrink_right": None,
"notes": ""
}
}
if ctx.skip_run or ctx.skip_processing:
step_log["status"] = "skipped"
batch_logs.append(step_log)
skipped_count += 1
continue
if "original" not in ctx.pil_img:
step_log["status"] = "error"
step_log["data"]["notes"] = "No original image found in context."
batch_logs.append(step_log)
ctx.skip_run = True
error_count += 1
continue
try:
pil_img_obj = ctx.pil_img["original"][0]
width, height = pil_img_obj.size
alpha = pil_img_obj.getchannel("A")
top, bottom = 0, height - 1
left, right = 0, width - 1
while top < height:
row_data = alpha.crop((0, top, width, top + 1)).tobytes()
if all(v == 0 for v in row_data):
top += 1
else:
break
while bottom >= 0:
row_data = alpha.crop((0, bottom, width, bottom + 1)).tobytes()
if all(v == 0 for v in row_data):
bottom -= 1
else:
break
while left < width:
col_data = alpha.crop((left, 0, left + 1, height)).tobytes()
if all(v == 0 for v in col_data):
left += 1
else:
break
while right >= 0:
col_data = alpha.crop((right, 0, right + 1, height)).tobytes()
if all(v == 0 for v in col_data):
right -= 1
else:
break
pil_rgb = pil_img_obj.convert("RGB")
px = pil_rgb.load()
def is_white_row(row_idx: int) -> bool:
for x in range(left, right + 1):
r, g, b = px[x, row_idx]
if not (r >= WHITE_CUTOFF and g >= WHITE_CUTOFF and b >= WHITE_CUTOFF):
return False
return True
def is_white_col(col_idx: int) -> bool:
for y in range(top, bottom + 1):
r, g, b = px[col_idx, y]
if not (r >= WHITE_CUTOFF and g >= WHITE_CUTOFF and b >= WHITE_CUTOFF):
return False
return True
while top <= bottom:
if is_white_row(top):
top += 1
else:
break
while bottom >= top:
if is_white_row(bottom):
bottom -= 1
else:
break
while left <= right:
if is_white_col(left):
left += 1
else:
break
while right >= left:
if is_white_col(right):
right -= 1
else:
break
if left > right or top > bottom:
step_log["data"]["notes"] += " Entire image trimmed away by alpha/white => skipping"
step_log["status"] = "error"
batch_logs.append(step_log)
ctx.skip_run = True
error_count += 1
continue
shrink_top = top
shrink_bottom = (height - 1) - bottom
shrink_left = left
shrink_right = (width - 1) - right
step_log["data"]["shrink_top"] = shrink_top
step_log["data"]["shrink_bottom"] = shrink_bottom
step_log["data"]["shrink_left"] = shrink_left
step_log["data"]["shrink_right"] = shrink_right
primary_box = [left, top, right, bottom]
w = right - left + 1
h = bottom - top + 1
step_log["data"]["primary_box"] = primary_box
step_log["data"]["primary_box_dimensions"] = [w, h]
orientation = "Square"
if h > w:
orientation = "Portrait"
elif w > h:
orientation = "Landscape"
step_log["data"]["primary_box_orientation"] = orientation
cropped_img = pil_img_obj.crop((left, top, right + 1, bottom + 1))
box_transparency = calculate_transparency(cropped_img)
step_log["data"]["primary_box_transparency"] = box_transparency
ctx.pil_img["original"] = [cropped_img, ctx.pil_img["original"][1], None]
cw, ch = cropped_img.size
step_log["data"]["primary_shrinked_box_dimensions"] = [cw, ch]
step_log["data"]["primary_shrinked_box_transparency"] = box_transparency
step_log["status"] = "ok"
step_log["data"]["notes"] += " alpha+white trim done."
ctx.define_result["primary_box_transparency"] = box_transparency
processed_count += 1
except Exception as e:
step_log["status"] = "error"
step_log["data"]["notes"] = f"Exception: {e}"
error_count += 1
batch_logs.append(step_log)
processing_time = time.perf_counter() - start_time
logging.log(LOG_LEVEL_MAP["SUCCESS"], f"{EMOJI_MAP['SUCCESS']} Completed {function_name}: processed={processed_count}, skipped={skipped_count}, errors={error_count} in {processing_time:.3f}s")
return batch_logs
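# Sketch with assumed numbers: for a 1000x800 image whose content spans
# x = 40..919 and y = 10..789 after the alpha/white trim,
#   shrink_left = 40, shrink_right = 1000 - 1 - 919 = 80,
#   shrink_top = 10,  shrink_bottom = 800 - 1 - 789 = 10,
# and primary_box = [40, 10, 919, 789] with dimensions 880 x 780.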
def detect_border_stright_line_batch(contexts: List[ProcessingContext], batch_logs: List[dict]):
function_name = "detect_border_stright_line_batch"
start_time = time.perf_counter()
logging.log(LOG_LEVEL_MAP["INFO"], f"{EMOJI_MAP['INFO']} Starting {function_name} for {len(contexts)} items")
processed_count = 0
skipped_count = 0
error_count = 0
local_patch_size = 7
std_threshold = 5
for ctx in contexts:
step_log = {
"function": function_name,
"image_url": ctx.url,
"status": None,
"data": {
"left_line": False,
"right_line": False,
"upper_line": False,
"lower_line": False,
"left_line_coverage": 0.0,
"right_line_coverage": 0.0,
"upper_line_coverage": 0.0,
"lower_line_coverage": 0.0,
"left_feather_ratio": 0.0,
"right_feather_ratio": 0.0,
"upper_feather_ratio": 0.0,
"lower_feather_ratio": 0.0,
"performed_action": "single_px_border_feather_ratio_inverted_logic",
"current_feather_threshold": (0.0, 0.0),
"current_coverage_threshold": 0.0
}
}
if ctx.skip_run or ctx.skip_processing:
step_log["status"] = "skipped"
batch_logs.append(step_log)
skipped_count += 1
continue
if "original" not in ctx.pil_img:
step_log["status"] = "error"
step_log["error"] = "No padded image found in context."
ctx.skip_run = True
batch_logs.append(step_log)
error_count += 1
continue
try:
pil_img_obj = ctx.pil_img["original"][0]
w, h = pil_img_obj.size
if w == 0 or h == 0:
step_log["status"] = "error"
step_log["error"] = f"Invalid dims (w={w}, h={h})"
ctx.skip_run = True
batch_logs.append(step_log)
error_count += 1
continue
pil_rgba = pil_img_obj.convert("RGBA")
top_cov, bot_cov, left_cov, right_cov = 0.0, 0.0, 0.0, 0.0
if h > 0:
strip_top = pil_rgba.crop((0, 0, w, 1))
top_cov = calculate_transparency(strip_top)
step_log["data"]["upper_line_coverage"] = round(top_cov, 3)
if h > 1:
strip_bot = pil_rgba.crop((0, h - 1, w, h))
bot_cov = calculate_transparency(strip_bot)
step_log["data"]["lower_line_coverage"] = round(bot_cov, 3)
if w > 0:
strip_left = pil_rgba.crop((0, 0, 1, h))
left_cov = calculate_transparency(strip_left)
step_log["data"]["left_line_coverage"] = round(left_cov, 3)
if w > 1:
strip_right = pil_rgba.crop((w - 1, 0, w, h))
right_cov = calculate_transparency(strip_right)
step_log["data"]["right_line_coverage"] = round(right_cov, 3)
px_data = pil_rgba.load()
def patch_alpha_values(cx, cy):
half = local_patch_size // 2
vals = []
for dy in range(-half, half+1):
for dx in range(-half, half+1):
nx = cx + dx
ny = cy + dy
if 0 <= nx < w and 0 <= ny < h:
_, _, _, a_ = px_data[nx, ny]
vals.append(a_)
return vals
def is_feather_pixel(alpha_vals):
if len(alpha_vals) <= 1:
return True
avg_ = sum(alpha_vals) / len(alpha_vals)
var_ = sum((v - avg_)**2 for v in alpha_vals) / len(alpha_vals)
return (var_**0.5 < std_threshold)
def measure_feather_ratio(x1, y1, x2, y2):
ww = x2 - x1
hh = y2 - y1
total_ = ww * hh
if total_ <= 0:
return 0.0
c_ = 0
for yy in range(y1, y2):
for xx in range(x1, x2):
pv = patch_alpha_values(xx, yy)
if is_feather_pixel(pv):
c_ += 1
return c_ / float(total_)
top_f = 0.0
bot_f = 0.0
left_f = 0.0
right_f = 0.0
if h > 0:
top_f = measure_feather_ratio(0, 0, w, 1)
step_log["data"]["upper_feather_ratio"] = round(top_f, 3)
if h > 1:
bot_f = measure_feather_ratio(0, h - 1, w, h)
step_log["data"]["lower_feather_ratio"] = round(bot_f, 3)
if w > 0:
left_f = measure_feather_ratio(0, 0, 1, h)
step_log["data"]["left_feather_ratio"] = round(left_f, 3)
if w > 1:
right_f = measure_feather_ratio(w - 1, 0, w, h)
step_log["data"]["right_feather_ratio"] = round(right_f, 3)
if top_cov >= COVERAGE_THRESHOLD and (top_f < FEATHER_THRESHOLD_MIN or top_f > FEATHER_THRESHOLD_MAX):
step_log["data"]["upper_line"] = True
if bot_cov >= COVERAGE_THRESHOLD and (bot_f < FEATHER_THRESHOLD_MIN or bot_f > FEATHER_THRESHOLD_MAX):
step_log["data"]["lower_line"] = True
if left_cov >= COVERAGE_THRESHOLD and (left_f < FEATHER_THRESHOLD_MIN or left_f > FEATHER_THRESHOLD_MAX):
step_log["data"]["left_line"] = True
if right_cov >= COVERAGE_THRESHOLD and (right_f < FEATHER_THRESHOLD_MIN or right_f > FEATHER_THRESHOLD_MAX):
step_log["data"]["right_line"] = True
ctx.define_result["borders"] = {
"left_line": step_log["data"]["left_line"],
"right_line": step_log["data"]["right_line"],
"upper_line": step_log["data"]["upper_line"],
"lower_line": step_log["data"]["lower_line"]
}
step_log["data"]["current_feather_threshold"] = (FEATHER_THRESHOLD_MIN, FEATHER_THRESHOLD_MAX)
step_log["data"]["current_coverage_threshold"] = COVERAGE_THRESHOLD
step_log["status"] = "ok"
processed_count += 1
except Exception as e:
step_log["status"] = "error"
step_log["error"] = str(e)
error_count += 1
batch_logs.append(step_log)
processing_time = time.perf_counter() - start_time
logging.log(LOG_LEVEL_MAP["SUCCESS"], f"{EMOJI_MAP['SUCCESS']} Completed {function_name}: processed={processed_count}, skipped={skipped_count}, errors={error_count} in {processing_time:.3f}s")
return batch_logs
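# Decision rule illustrated with assumed numbers: a bottom strip with
# lower_line_coverage = 0.60 (>= COVERAGE_THRESHOLD of 0.25) and
# lower_feather_ratio = 0.90 (outside the 0.3..0.7 band) is flagged as
# lower_line = True; a strip with coverage 0.10 is never flagged, regardless
# of its feather ratio.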
def pad_image_box_to_squere_batch(contexts: List[ProcessingContext], batch_logs: List[dict]):
function_name = "pad_image_box_to_squere_batch"
start_time = time.perf_counter()
logging.log(LOG_LEVEL_MAP["INFO"], f"{EMOJI_MAP['INFO']} Starting {function_name} for {len(contexts)} items")
processed_count = 0
skipped_count = 0
error_count = 0
for ctx in contexts:
step_log = {
"function": function_name,
"image_url": ctx.url,
"status": None,
"data": {
"primary_width": None,
"primary_height": None,
"primary_orientation": None,
"border_lines": {},
"final_width": None,
"final_height": None,
"condition": None,
"actions": []
}
}
if ctx.skip_run or ctx.skip_processing:
step_log["status"]="skipped"
batch_logs.append(step_log)
skipped_count += 1
continue
if "original" not in ctx.pil_img:
step_log["status"]="error"
step_log["data"]["actions"].append("ERROR => RBC missing or no original image.")
ctx.skip_run= True
batch_logs.append(step_log)
error_count += 1
continue
try:
im, fn, _= ctx.pil_img["original"]
w,h= im.size
step_log["data"]["primary_width"]= w
step_log["data"]["primary_height"]= h
if w==h:
orientation= "Square"
elif w> h:
orientation= "Landscape"
else:
orientation= "Portrait"
step_log["data"]["primary_orientation"]= orientation
brds= ctx.define_result.get("borders",{})
left_line = parse_line_flag(brds.get("left_line",False))
right_line = parse_line_flag(brds.get("right_line",False))
upper_line = parse_line_flag(brds.get("upper_line",False))
lower_line = parse_line_flag(brds.get("lower_line",False))
step_log["data"]["border_lines"]={
"left_line": str(left_line),
"right_line": str(right_line),
"upper_line": str(upper_line),
"lower_line": str(lower_line)
}
dr= ctx.detection_result
final_kws= dr.get("final_keywords",[]) if dr else []
shoes_detected= any(k in SHOES_LIST for k in final_kws)
head_detected= any(k in HEAD_LIST for k in final_kws)
            border_pad = int(UNIVERSAL_PAD_RATIO * max(w, h))
final_img= im
px_info= {"left":0,"right":0,"top":0,"bottom":0}
scenario= None
if orientation=="Square":
if shoes_detected:
scenario= "square_shoes_exception"
pl= border_pad if not left_line else 0
pr= border_pad if not right_line else 0
pt= 0
pb= border_pad
padded,cA= partial_pad_square(final_img, pl,pr,pt,pb)
for kk in cA: px_info[kk]+= cA[kk]
final_img= padded
elif (left_line or right_line or upper_line or lower_line):
scenario= "square_has_lines"
else:
scenario= "square_all_false"
wtmp2,cB= two_step_pad_to_square(final_img,"Landscape",border_pad)
for kk in cB: px_info[kk]+= cB[kk]
final_img= wtmp2
if head_detected and not shoes_detected:
scenario= "square_head_exception"
step_log["data"]["condition"]= scenario
step_log["data"]["actions"]= [ ACTION_LIB_SQUARE[scenario] ]
elif orientation=="Landscape":
if shoes_detected:
scenario= "landscape_shoes_exception"
pad_l=0; pad_r=0; pad_t=0; pad_b= border_pad
padded0,c0= partial_pad_square(final_img,pad_l,pad_r,pad_t,pad_b)
for kk in c0: px_info[kk]+= c0[kk]
final_img= padded0
cimg,cx1= coverage_crop_with_shorter_dimension(
final_img, ctx, "landscape", False
)
for kk in cx1: px_info[kk]+= cx1[kk]
final_img= cimg
elif (left_line or right_line or upper_line or lower_line):
scenario= "landscape_coverage"
cimg2,cx2= coverage_crop_with_shorter_dimension(
final_img, ctx, "landscape", False
)
for kk in cx2: px_info[kk]+= cx2[kk]
final_img= cimg2
else:
scenario= "landscape_all_false"
wtmp3,c3= two_step_pad_to_square(final_img,"Landscape",border_pad)
for kk in c3: px_info[kk]+= c3[kk]
final_img= wtmp3
if head_detected and not shoes_detected:
scenario= "landscape_head_exception"
if h> w:
forced_y = h - w
ctx.define_result["largest_box"] = [0, forced_y, w, h]
cimgH,cxH= coverage_crop_with_shorter_dimension(
final_img, ctx, "landscape", False
)
for kk in cxH: px_info[kk]+= cxH[kk]
final_img= cimgH
step_log["data"]["condition"]= scenario
step_log["data"]["actions"]= [ ACTION_LIB_LANDSCAPE[scenario] ]
else:
up_low_count= (1 if upper_line else 0)+(1 if lower_line else 0)
if shoes_detected:
scenario= "portrait_shoes_exception"
pl= border_pad if not left_line else 0
pr= border_pad if not right_line else 0
pt=0
pb= border_pad
paddedP,cS= partial_pad_square(final_img, pl,pr,pt,pb)
for kk in cS: px_info[kk]+= cS[kk]
final_img= paddedP
elif head_detected:
scenario= "portrait_head_exception"
side_diff= h - w if h> w else 0
half_ = side_diff//2
leftover= side_diff- half_
pimg, cHL= partial_pad_square(final_img, half_, leftover, 0, 0)
for kk in cHL: px_info[kk]+= cHL[kk]
final_img= pimg
elif (left_line or right_line):
scenario= "portrait_lr_any"
cimg3,c33= coverage_crop_with_shorter_dimension(
final_img, ctx, "portrait", False
)
for kk in c33: px_info[kk]+= c33[kk]
final_img= cimg3
elif up_low_count==2:
scenario= "portrait_up_low_both"
wtmp4,c44= pad_left_right_only(final_img,border_pad)
for kk in c44: px_info[kk]+= c44[kk]
final_img= wtmp4
elif up_low_count==1:
scenario= "portrait_any_up_low"
wtmp5,c55= two_step_pad_to_square(final_img,"Portrait",border_pad)
for kk in c55: px_info[kk]+= c55[kk]
final_img= wtmp5
else:
scenario= "portrait_all_false"
wtmp6,c66= two_step_pad_to_square(final_img,"Portrait",border_pad)
for kk in c66: px_info[kk]+= c66[kk]
final_img= wtmp6
step_log["data"]["condition"]= scenario
step_log["data"]["actions"]= [ ACTION_LIB_PORTRAIT[scenario] ]
step_log["status"]="ok"
fw,fh= final_img.size
step_log["data"]["final_width"] = fw
step_log["data"]["final_height"]= fh
def plus_minus(dx):
return f"+{dx}" if dx>=0 else str(dx)
step_log["data"]["border_lines"] = {
"left_line": f"{left_line} {plus_minus(px_info['left'])}px",
"right_line": f"{right_line} {plus_minus(px_info['right'])}px",
"upper_line": f"{upper_line} {plus_minus(px_info['top'])}px",
"lower_line": f"{lower_line} {plus_minus(px_info['bottom'])}px"
}
ctx.pil_img["original"]= [final_img, fn, None]
ctx.pad_info.update(px_info)
processed_count += 1
except Exception as e:
step_log["status"]="error"
step_log["data"]["actions"].append(f"ERROR => exception => {repr(e)}")
error_count += 1
batch_logs.append(step_log)
processing_time = time.perf_counter() - start_time
logging.log(LOG_LEVEL_MAP["SUCCESS"], f"{EMOJI_MAP['SUCCESS']} Completed {function_name}: processed={processed_count}, skipped={skipped_count}, errors={error_count} in {processing_time:.3f}s")
return batch_logs
def center_object_batch(contexts: List[ProcessingContext], batch_logs: List[dict]):
function_name = "center_object_batch"
start_time = time.perf_counter()
logging.log(LOG_LEVEL_MAP["INFO"], f"{EMOJI_MAP['INFO']} Starting {function_name} for {len(contexts)} items")
processed_count = 0
skipped_count = 0
error_count = 0
for ctx in contexts:
step_log = {
"function": function_name,
"image_url": ctx.url,
"status": None,
"data": {
"leftmost_x": None,
"rightmost_x": None,
"midpoint_x": None,
"shift_x": None,
"bbox": None,
"notes": ""
}
}
if ctx.skip_run or ctx.skip_processing:
step_log["status"] = "skipped"
batch_logs.append(step_log)
skipped_count += 1
continue
if "original" not in ctx.pil_img:
step_log["status"] = "error"
step_log["data"]["notes"] = "No final image found in context."
ctx.skip_run = True
batch_logs.append(step_log)
error_count += 1
continue
try:
pil_img, _, _ = ctx.pil_img["original"]
image = pil_img.convert("RGBA")
width, height = image.size
center_y = height // 2
alpha = image.split()[3]
non_transparent_xs = []
for x in range(width):
if alpha.getpixel((x, center_y)) != 0:
non_transparent_xs.append(x)
if not non_transparent_xs:
step_log["status"] = "no_op"
step_log["data"]["notes"] = "No non-transparent pixel found on horizontal mid-line."
batch_logs.append(step_log)
skipped_count += 1
continue
leftmost = min(non_transparent_xs)
rightmost = max(non_transparent_xs)
midpoint_x = (leftmost + rightmost) / 2.0
image_center_x = width / 2.0
shift_x = image_center_x - midpoint_x
bbox = alpha.getbbox()
if not bbox:
step_log["status"] = "no_op"
step_log["data"]["notes"] = "Image has no non-transparent bounding box."
batch_logs.append(step_log)
skipped_count += 1
continue
region = image.crop(bbox)
new_left = int(bbox[0] + shift_x)
new_top = bbox[1]
new_image = Image.new("RGBA", (width, height), (0, 0, 0, 0))
new_image.paste(region, (new_left, new_top), region)
ctx.pil_img["original"] = [new_image, ctx.pil_img["original"][1], None]
step_log["status"] = "ok"
step_log["data"]["leftmost_x"] = leftmost
step_log["data"]["rightmost_x"] = rightmost
step_log["data"]["midpoint_x"] = round(midpoint_x, 2)
step_log["data"]["shift_x"] = round(shift_x, 2)
step_log["data"]["bbox"] = bbox
step_log["data"]["notes"] = "Object horizontally centered (red lines removed)."
processed_count += 1
except Exception as e:
step_log["status"] = "error"
step_log["data"]["notes"] = f"Error: {str(e)}"
error_count += 1
batch_logs.append(step_log)
processing_time = time.perf_counter() - start_time
logging.log(LOG_LEVEL_MAP["SUCCESS"], f"{EMOJI_MAP['SUCCESS']} Completed {function_name}: processed={processed_count}, skipped={skipped_count}, errors={error_count} in {processing_time:.3f}s")
return batch_logs
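# Centering example with assumed numbers: if the mid-row spans x = 100..500 in
# an 800-px-wide image, midpoint_x = 300, image_center_x = 400, so the opaque
# bounding box is re-pasted shifted +100 px to the right.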
# ----------------------------------------------------------------------
# MAIN PIPELINE FUNCTION
# ----------------------------------------------------------------------
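# The decorated cropping_padding step runs the batch functions in order:
# cropp_batch -> shrink_primary_box_batch -> detect_border_stright_line_batch
# -> pad_image_box_to_squere_batch -> center_object_batch, each appending its
# own per-image entries to the shared batch_logs list.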
def ensure_models_loaded():
import app
app.ensure_models_loaded()
pipeline_step = create_pipeline_step(ensure_models_loaded)
@pipeline_step
def cropping_padding(contexts: List[ProcessingContext], batch_logs: Optional[List[dict]] = None):
if batch_logs is None:
batch_logs = []
start_time = time.perf_counter()
logging.log(LOG_LEVEL_MAP["INFO"], f"{EMOJI_MAP['INFO']} Starting cropping_padding pipeline for {len(contexts)} items")
if not ENABLE_CROPPING_PADDING:
logging.log(LOG_LEVEL_MAP["WARNING"], f"{EMOJI_MAP['WARNING']} Cropping and padding operations are disabled (ENABLE_CROPPING_PADDING=False)")
logging.log(LOG_LEVEL_MAP["INFO"], f"{EMOJI_MAP['INFO']} Returning original images unchanged")
for ctx in contexts:
skip_log = {
"function": "cropping_padding_pipeline",
"image_url": ctx.url,
"status": "skipped",
"reason": "ENABLE_CROPPING_PADDING is False",
"data": {"operations_performed": "none", "original_image_preserved": True}
}
batch_logs.append(skip_log)
processing_time = time.perf_counter() - start_time
logging.log(LOG_LEVEL_MAP["SUCCESS"], f"{EMOJI_MAP['SUCCESS']} Completed cropping_padding pipeline (skipped) for {len(contexts)} items in {processing_time:.3f}s")
return batch_logs
cropp_batch(contexts, batch_logs)
shrink_primary_box_batch(contexts, batch_logs)
detect_border_stright_line_batch(contexts, batch_logs)
pad_image_box_to_squere_batch(contexts, batch_logs)
center_object_batch(contexts, batch_logs)
processing_time = time.perf_counter() - start_time
logging.log(LOG_LEVEL_MAP["SUCCESS"], f"{EMOJI_MAP['SUCCESS']} Completed cropping_padding pipeline for {len(contexts)} items in {processing_time:.3f}s")
return batch_logs