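"""Bounding-box detection pipeline step.

Combines RT-DETR, YOLOS-Fashionpedia, and a dedicated head-detection model
to locate the main garment in each product image, detect clothing features,
heads, shoes, and optional artifacts, then adjusts and optionally draws the
resulting boxes.
"""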
import spaces  # noqa: F401 -- Hugging Face Spaces integration; imported before torch for ZeroGPU
import torch
import logging
import time
import sys
import traceback

from PIL import ImageDraw
from collections import defaultdict
from typing import List, Optional

from src.utils import ProcessingContext, create_pipeline_step, LOG_LEVEL_MAP, EMOJI_MAP
|
from src.processing.bounding_box.yolos_fashionpedia_model import ( |
|
detect_with_yolos_fashionpedia, |
|
detect_yolos_in_roi, |
|
process_yolos_results |
|
) |
|
from src.processing.bounding_box.rtdetr_model import ( |
|
detect_rtdetr_in_roi, |
|
detect_rtdetr_artifacts_in_roi, |
|
update_fallback_detection, |
|
get_rtdetr_clothing_labels, |
|
get_rtdetr_person_and_product_labels, |
|
get_rtdetr_artifact_labels, |
|
process_rtdetr_results |
|
) |
|
from src.processing.bounding_box.head_model import detect_head_in_roi |
|
|
|
|
|
|
|
|
|
|
|
APPLY_DRAW = False  # draw debug boxes onto the images when True
DETECT_ARTIFACTS = False  # also detect artifacts (e.g. bags, cups) when True
|
|
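# Minimum confidence score (0-1) required per detection category.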
|
PRODUCT_TYPE_BOX_FILTER_THRESHOLD = 0.4 |
|
HEAD_BOX_FILTER_THRESHOLD = 0.5 |
|
SHOES_BOX_FILTER_THRESHOLD = 0.5 |
|
CLOTHING_FEATURES_BOX_FILTER_THRESHOLD = 0.4 |
|
ARTIFACTS_BOX_FILTER_THRESHOLD = 0.7 |
|
|
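# Maximum allowed IoU, in percent, between same-keyword boxes before suppression.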
|
BOX_OVERLAP = 50 |
|
|
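# Largest-box post-processing: extension as a fraction of the shorter image
# side, and the minimum area (percent of the image) below which the full
# image is used instead.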
|
LARGEST_BOX_EXTENSION_RATIO = 0.02 |
|
LARGEST_BOX_THRESHOLD = 5 |
|
|
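# Blue-box adjustment: offsets as fractions of the shorter image side, and
# the area ratio (percent) below which the adjustment falls back to the
# largest box.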
|
CREATE_UPPER_BLUE_LOWER_GREEN_RATIO = 0.02 |
|
CREATE_LOWER_BLUE_LOWER_VIOLET_RATIO = 0.5 |
|
LEFT_RIGHT_BLUE_OFFSET = 0.5 |
|
LOWER_UPPER_BLUE_OFFSET = 0.02 |
|
BLUE_BOX_FALLBACK_THRESHOLD = 33 |
|
|
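# Positional filters: head boxes must sit in the top UPPER_HEAD_FILTER percent
# of the image, shoes in the bottom LOWER_SHOE_FILTER percent, and head boxes
# overlapped by a collar/neckline box beyond NECKLINE_COLLAR_HEAD percent of
# their area are dropped.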
|
UPPER_HEAD_FILTER = 33 |
|
LOWER_SHOE_FILTER = 33 |
|
NECKLINE_COLLAR_HEAD = 50 |
|
|
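# RGBA outline colors for drawn boxes.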
|
BLUE_BOX_PRODUCT_TYPE = (0, 0, 255, 200) |
|
GREEN_BOX_HEAD = (0, 255, 0, 200) |
|
VIOLET_BOX_SHOES = (238, 130, 238, 200) |
|
ORANGE_BOX_CLOTHING_FEATURES = (255, 165, 0, 200) |
|
RED_BOX_ARTIFACTS = (255, 0, 0, 200) |
|
BLACK_BOX_PERSON = (0, 0, 0, 200) |
|
|
|
|
|
|
|
|
|
def extract_category_lists(): |
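    """Merge every model's MODEL_LABEL_CONFIG and collect the detection
    keywords per category (person, product type, head, shoes, clothing
    features, artifacts) into plain lists."""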
|
from .yolos_fashionpedia_model import MODEL_LABEL_CONFIG as YOLOS_CONFIG |
|
from .rtdetr_model import MODEL_LABEL_CONFIG as RTDETR_CONFIG |
|
from .head_model import MODEL_LABEL_CONFIG as HEAD_CONFIG |
|
|
|
MODEL_LABEL_CONFIG = {**YOLOS_CONFIG, **RTDETR_CONFIG, **HEAD_CONFIG} |
|
|
|
lists = { |
|
"PERSON_LIST": set(), |
|
"PRODUCT_TYPE_LIST": set(), |
|
"HEAD_LIST": set(), |
|
"SHOES_LIST": set(), |
|
"CLOTHING_FEATURES_LIST": set(), |
|
"ARTIFACTS_LIST": set() |
|
} |
|
|
|
    for model_config in MODEL_LABEL_CONFIG.values():
|
for list_type, keywords in model_config.items(): |
|
list_key = list_type.upper() |
|
if list_key in lists: |
|
lists[list_key].update(keywords.keys()) |
|
|
|
return {k: list(v) for k, v in lists.items()} |
|
|
|
extracted_lists = extract_category_lists() |
|
PERSON_LIST = extracted_lists["PERSON_LIST"] |
|
PRODUCT_TYPE_LIST = extracted_lists["PRODUCT_TYPE_LIST"] |
|
HEAD_LIST = extracted_lists["HEAD_LIST"] |
|
SHOES_LIST = extracted_lists["SHOES_LIST"] |
|
CLOTHING_FEATURES_LIST = extracted_lists["CLOTHING_FEATURES_LIST"] |
|
ARTIFACTS_LIST = extracted_lists["ARTIFACTS_LIST"] |
|
|
|
|
|
|
|
|
|
def get_label_name_from_model(model, label_id): |
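    """Resolve a numeric label id to a lowercase label name, trying the
    HF config id2label mapping first, then a custom model_labels dict."""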
|
if hasattr(model, 'config') and hasattr(model.config, 'id2label'): |
|
return model.config.id2label.get(label_id, f"unknown_{label_id}").lower() |
|
if hasattr(model, 'model_labels') and isinstance(model.model_labels, dict): |
|
return model.model_labels.get(label_id, f"unknown_{label_id}").lower() |
|
return f"unknown_{label_id}" |
|
|
|
def clamp_box_to_region(box: List[int], region: List[int]) -> List[int]: |
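    """Clamp box coordinates [x1, y1, x2, y2] so they lie inside region."""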
|
x1, y1, x2, y2 = box |
|
rx1, ry1, rx2, ry2 = region |
|
xx1 = max(rx1, min(x1, rx2)) |
|
yy1 = max(ry1, min(y1, ry2)) |
|
xx2 = max(rx1, min(x2, rx2)) |
|
yy2 = max(ry1, min(y2, ry2)) |
|
return [xx1, yy1, xx2, yy2] |
|
|
|
def box_iou(b1, b2): |
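    """Intersection-over-union of two [x1, y1, x2, y2] boxes, in [0, 1]."""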
|
xx1 = max(b1[0], b2[0]) |
|
yy1 = max(b1[1], b2[1]) |
|
xx2 = min(b1[2], b2[2]) |
|
yy2 = min(b1[3], b2[3]) |
|
|
|
iw = max(0, xx2 - xx1) |
|
ih = max(0, yy2 - yy1) |
|
inter_area = iw * ih |
|
|
|
area1 = abs(b1[2] - b1[0]) * abs(b1[3] - b1[1]) |
|
area2 = abs(b2[2] - b2[0]) * abs(b2[3] - b2[1]) |
|
union = area1 + area2 - inter_area |
|
|
|
    if union <= 0:
        return 0.0
    return inter_area / union
|
|
|
def build_keywords(product_type: str) -> List[str]: |
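    """Build the detection keyword list for a product type, e.g.
    build_keywords("dress") -> ["dress", "head", "shoes", "closure",
    "pocket", "neckline", "sleeve"]."""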
|
pt = product_type.lower().strip() |
|
if not pt or pt == "unknown": |
|
return [] |
|
|
|
kw = [pt] |
|
if pt in ["jeans", "shorts", "skirt"]: |
|
kw += ["shoes", "closure", "pocket"] |
|
elif pt in ["jacket", "vest", "shirt"]: |
|
kw += ["head", "closure", "pocket", "collar", "sleeve"] |
|
elif pt in ["overall", "dress"]: |
|
kw += ["head", "shoes", "closure", "pocket", "neckline", "sleeve"] |
|
|
|
if DETECT_ARTIFACTS: |
|
kw += ["bag", "cup"] |
|
return kw |
|
|
|
def apply_box_overlap_filter( |
|
boxes: List[List[int]], |
|
labels: List[int], |
|
scores: List[float], |
|
keywords: List[str], |
|
raw_labels: List[str], |
|
models: List[str] |
|
): |
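    """Per-keyword non-maximum suppression: the highest-scoring box of each
    keyword is always kept, and further boxes of the same keyword survive
    only while their IoU with every kept box stays at or below BOX_OVERLAP
    percent."""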
|
    grouped = defaultdict(list)
    for i in range(len(boxes)):
        grouped[keywords[i]].append({
            "box": boxes[i],
            "label": labels[i],
            "score": scores[i],
            "raw_label": raw_labels[i],
            "model": models[i]
        })

    new_boxes, new_labels, new_scores = [], [], []
    new_kws, new_raws, new_models = [], [], []
    overlap_thresh = BOX_OVERLAP / 100.0

    for kwd, items in grouped.items():
        kept = []
        for item in sorted(items, key=lambda x: x["score"], reverse=True):
            # Keep the box only if it does not overlap an already-kept,
            # higher-scoring box of the same keyword beyond BOX_OVERLAP percent.
            if all(box_iou(item["box"], k["box"]) <= overlap_thresh for k in kept):
                kept.append(item)
        for item in kept:
            new_boxes.append(item["box"])
            new_labels.append(item["label"])
            new_scores.append(round(item["score"], 2))
            new_kws.append(kwd)
            new_raws.append(item["raw_label"])
            new_models.append(item["model"])

    return new_boxes, new_labels, new_scores, new_kws, new_raws, new_models
|
|
|
def get_threshold_for_keyword(kw: str) -> float: |
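    """Return the minimum confidence score required for a keyword's
    category (0.0 if the keyword belongs to no known category)."""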
|
if kw in PRODUCT_TYPE_LIST: |
|
return PRODUCT_TYPE_BOX_FILTER_THRESHOLD |
|
elif kw in HEAD_LIST: |
|
return HEAD_BOX_FILTER_THRESHOLD |
|
elif kw in SHOES_LIST: |
|
return SHOES_BOX_FILTER_THRESHOLD |
|
elif kw in CLOTHING_FEATURES_LIST: |
|
return CLOTHING_FEATURES_BOX_FILTER_THRESHOLD |
|
elif kw in ARTIFACTS_LIST: |
|
return ARTIFACTS_BOX_FILTER_THRESHOLD |
|
return 0.0 |
|
|
|
def map_label_to_keyword(label_name: str, valid_kws: List[str], model_name: str) -> Optional[str]: |
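    """Map a raw model label to one of the caller's keywords, first by
    direct match, then via the model's MODEL_LABEL_CONFIG synonyms."""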
|
ln = label_name.strip().lower() |
|
|
|
if ln in valid_kws: |
|
return ln |
|
|
|
    # Artifact detections come tagged as "rtdetr_artifact" but share the
    # RT-DETR label configuration.
    if model_name == "rtdetr_artifact":
        model_name = "rtdetr_model"

    if model_name == "yolos_fashionpedia_model":
        from .yolos_fashionpedia_model import MODEL_LABEL_CONFIG
    elif model_name == "rtdetr_model":
        from .rtdetr_model import MODEL_LABEL_CONFIG
    elif model_name == "head_model":
        from .head_model import MODEL_LABEL_CONFIG
    else:
        return None

    model_config = MODEL_LABEL_CONFIG.get(model_name, {})
|
|
|
for list_type in ["person_list", "product_type_list", "head_list", |
|
"shoes_list", "clothing_features_list", "artifacts_list"]: |
|
category_config = model_config.get(list_type, {}) |
|
|
|
for keyword, labels in category_config.items(): |
|
if keyword in valid_kws: |
|
                for label in labels:
                    if ln in label.lower():  # equality is subsumed by the substring check
                        return keyword
|
|
|
return None |
|
|
|
|
|
|
|
|
|
def validate_model_configurations(): |
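    """Log a summary of every model's label configuration (keywords and
    prompt counts per category) plus the feature flags."""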
|
from .yolos_fashionpedia_model import MODEL_LABEL_CONFIG as YOLOS_CONFIG |
|
from .rtdetr_model import MODEL_LABEL_CONFIG as RTDETR_CONFIG |
|
from .head_model import MODEL_LABEL_CONFIG as HEAD_CONFIG |
|
|
|
MODEL_LABEL_CONFIG = {**YOLOS_CONFIG, **RTDETR_CONFIG, **HEAD_CONFIG} |
|
|
|
logging.info("=" * 70) |
|
logging.info("✅ MODEL CONFIGURATIONS VALIDATION") |
|
logging.info("=" * 70) |
|
|
|
for model_name, model_config in MODEL_LABEL_CONFIG.items(): |
|
logging.info(f"\n📌 {model_name.upper()}") |
|
logging.info("-" * 50) |
|
|
|
total_prompts = 0 |
|
for category, category_config in model_config.items(): |
|
category_prompts = 0 |
|
            for labels in category_config.values():
|
category_prompts += len(labels) |
|
total_prompts += len(labels) |
|
|
|
if category_config: |
|
logging.info(f" 📂 {category}: {len(category_config)} keywords, {category_prompts} prompts") |
|
|
|
logging.info(f" 🎯 Total: {total_prompts} prompts") |
|
|
|
if model_name == "yolos_fashionpedia_model": |
|
logging.info(" ✨ YOLOS Fashionpedia specializes in fashion detection!") |
|
|
|
logging.info("\n" + "=" * 70) |
|
logging.info("🚀 FEATURE CAPABILITIES:") |
|
logging.info("-" * 50) |
|
logging.info(f" • YOLOS Fashionpedia Detection: ENABLED") |
|
logging.info(f" • Artifact Detection: {'ENABLED' if DETECT_ARTIFACTS else 'DISABLED'}") |
|
logging.info(f" • Apply Draw (Visualization): {'ENABLED' if Apply_Draw else 'DISABLED'}") |
|
logging.info("=" * 70) |
|
|
|
|
|
|
|
|
|
def define_largest_box_batch(contexts, batch_logs, RTDETR_PROCESSOR, RTDETR_MODEL, RTDETR_FULL_PRECISION, DEVICE, MODELS_LOADED, LOAD_ERROR): |
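    """Run RT-DETR on each context's original image and store the largest
    clothing box (slightly extended, with a full-image fallback when the
    box covers less than LARGEST_BOX_THRESHOLD percent of the image) in
    ctx.define_result["largest_box"]."""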
|
|
|
|
function_name = "define_largest_box_batch" |
|
start_time = time.perf_counter() |
|
logging.log(LOG_LEVEL_MAP["INFO"], f"{EMOJI_MAP['INFO']} Starting {function_name} for {len(contexts)} items") |
|
|
|
items_for_batch = [] |
|
valid_ctx_indices = [] |
|
batch_items = [] |
|
image_sizes = [] |
|
|
|
for i, ctx in enumerate(contexts): |
|
item_ = { |
|
"image_url": ctx.url, |
|
"data": {"largest_box": None}, |
|
"function": function_name |
|
} |
|
|
|
if ctx.skip_run or ctx.skip_processing: |
|
item_["status"] = "skipped" |
|
batch_logs.append(item_) |
|
continue |
|
|
|
        if not MODELS_LOADED:
            error_msg = LOAD_ERROR or "Models not loaded"
            # No exception is active here, so format_exc() only yields
            # "NoneType: None"; the load error message is the real signal.
            error_trace = traceback.format_exc()
|
|
|
logging.error(f"CRITICAL: Model not loaded in {function_name}: {error_msg}") |
|
logging.error(f"Traceback:\n{error_trace}") |
|
|
|
item_["status"] = "critical_error" |
|
item_["exception"] = error_msg |
|
item_["traceback"] = error_trace |
|
ctx.skip_run = True |
|
ctx.error = error_msg |
|
ctx.error_traceback = error_trace |
|
batch_logs.append(item_) |
|
|
|
logging.critical("Terminating due to model loading failure") |
|
sys.exit(1) |
|
|
|
if "original" not in ctx.pil_img: |
|
item_["status"] = "error" |
|
item_["exception"] = "No RBC 'original' found" |
|
ctx.skip_run = True |
|
batch_logs.append(item_) |
|
continue |
|
|
|
pi_rgba, _, _ = ctx.pil_img["original"] |
|
rgb_img = pi_rgba.convert("RGB") |
|
items_for_batch.append(rgb_img) |
|
image_sizes.append([rgb_img.height, rgb_img.width]) |
|
valid_ctx_indices.append(i) |
|
batch_items.append(item_) |
|
|
|
if not items_for_batch: |
|
processing_time = time.perf_counter() - start_time |
|
logging.log(LOG_LEVEL_MAP["INFO"], f"{EMOJI_MAP['INFO']} {function_name}: No valid items for batch processing in {processing_time:.3f}s") |
|
return batch_logs |
|
|
|
try: |
|
results = process_rtdetr_batch( |
|
items_for_batch, image_sizes, RTDETR_PROCESSOR, RTDETR_MODEL, DEVICE |
|
) |
|
|
|
rtdetr_clothing_labels = get_rtdetr_clothing_labels() |
|
rtdetr_person_product_labels = get_rtdetr_person_and_product_labels() |
|
|
|
for b_idx, i_ in enumerate(valid_ctx_indices): |
|
ctx = contexts[i_] |
|
item_ = batch_items[b_idx] |
|
pi_rgb = items_for_batch[b_idx] |
|
W, H = pi_rgb.size |
|
|
|
detection_log = { |
|
"model": "RT-DETR", |
|
"largest_area": 0, |
|
"total_area": W * H, |
|
"extension_ratio": LARGEST_BOX_EXTENSION_RATIO, |
|
"threshold": LARGEST_BOX_THRESHOLD, |
|
"raw_detections": [] |
|
} |
|
|
|
            if isinstance(results, list):
                if not results:
                    raise RuntimeError("RT-DETR post-processing returned no results")
                result = results[b_idx] if b_idx < len(results) else results[0]
            else:
                result = results
|
|
|
largest_area = 0 |
|
main_box = None |
|
|
|
for score, label, box in zip(result["scores"], result["labels"], result["boxes"]): |
|
label_id = label.item() |
|
score_val = score.item() |
|
x1, y1, x2, y2 = [int(val) for val in box.tolist()] |
|
label_name = get_label_name_from_model(RTDETR_MODEL, label_id) |
|
|
|
if label_name in rtdetr_person_product_labels: |
|
logging.log(LOG_LEVEL_MAP["INFO"], f"{EMOJI_MAP['INFO']} RT-DETR detected: {label_name} at score {score_val:.3f} | box=[{x1},{y1},{x2},{y2}]") |
|
detection_log["raw_detections"].append({ |
|
"box": [x1, y1, x2, y2], |
|
"score": round(score_val, 3), |
|
"label": label_name, |
|
"is_clothing": label_name in rtdetr_clothing_labels |
|
}) |
|
|
|
if label_name in rtdetr_clothing_labels: |
|
area_ = (x2 - x1) * (y2 - y1) |
|
|
|
if area_ > largest_area: |
|
largest_area = area_ |
|
main_box = [x1, y1, x2, y2] |
|
|
|
if main_box is not None: |
|
ratio_off = int(min(W, H) * LARGEST_BOX_EXTENSION_RATIO) |
|
x1, y1, x2, y2 = main_box |
|
x1e = max(0, x1 - ratio_off) |
|
y1e = max(0, y1 - ratio_off) |
|
x2e = min(W, x2 + ratio_off) |
|
y2e = min(H, y2 + ratio_off) |
|
main_box = [x1e, y1e, x2e, y2e] |
|
|
|
total_area = W * H |
|
if largest_area < total_area * (LARGEST_BOX_THRESHOLD / 100.0): |
|
main_box = [0, 0, W, H] |
|
logging.log(LOG_LEVEL_MAP["INFO"], f"{EMOJI_MAP['INFO']} Fallback to full image: box=[0,0,{W},{H}] | {ctx.url}") |
|
else: |
|
logging.log(LOG_LEVEL_MAP["INFO"], f"{EMOJI_MAP['INFO']} Largest box detected: box=[{x1e},{y1e},{x2e},{y2e}] | area={largest_area} | {ctx.url}") |
|
else: |
|
main_box = [0, 0, W, H] |
|
logging.log(LOG_LEVEL_MAP["INFO"], f"{EMOJI_MAP['INFO']} No clothing detected, using full image: box=[0,0,{W},{H}] | {ctx.url}") |
|
|
|
ctx.define_result["largest_box"] = main_box |
|
item_["data"]["largest_box"] = main_box |
|
|
|
detection_log["largest_area"] = largest_area |
|
item_["data"]["largest_box_detection"] = detection_log |
|
item_["status"] = "ok" |
|
|
|
    except Exception as e:
        error_msg = f"{function_name} error: {e}"
|
error_trace = traceback.format_exc() |
|
|
|
logging.error(f"CRITICAL: {error_msg}") |
|
logging.error(f"Traceback:\n{error_trace}") |
|
|
|
for b_idx, i_ in enumerate(valid_ctx_indices): |
|
ctx = contexts[i_] |
|
item_ = batch_items[b_idx] |
|
item_["status"] = "critical_error" |
|
item_["exception"] = error_msg |
|
item_["traceback"] = error_trace |
|
ctx.skip_run = True |
|
ctx.error = str(e) |
|
ctx.error_traceback = error_trace |
|
|
|
logging.critical("Terminating due to bounding box processing failure") |
|
sys.exit(1) |
|
|
|
for item_ in batch_items: |
|
batch_logs.append(item_) |
|
|
|
processing_time = time.perf_counter() - start_time |
|
logging.log(LOG_LEVEL_MAP["SUCCESS"], f"{EMOJI_MAP['SUCCESS']} Completed {function_name} for {len(batch_items)} items in {processing_time:.3f}s") |
|
return batch_logs |
|
|
|
def process_rtdetr_batch(images, image_sizes, RTDETR_PROCESSOR, RTDETR_MODEL, DEVICE): |
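    """Run one batched RT-DETR forward pass over a list of PIL images and
    return the post-processed per-image detections (empty list on error)."""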
|
from .rtdetr_model import RTDETR_CONF |
|
|
|
try: |
|
rtdetr_inputs = RTDETR_PROCESSOR(images=images, return_tensors="pt").to(DEVICE) |
|
|
|
with torch.no_grad(): |
|
rtdetr_outputs = RTDETR_MODEL(**rtdetr_inputs) |
|
|
|
rtdetr_results = RTDETR_PROCESSOR.post_process_object_detection( |
|
rtdetr_outputs, |
|
target_sizes=torch.tensor(image_sizes).to(DEVICE), |
|
threshold=RTDETR_CONF |
|
) |
|
|
|
return rtdetr_results |
|
|
|
except Exception as e: |
|
logging.error(f"RT-DETR batch processing error: {str(e)}") |
|
return [] |
|
|
|
def process_single_image_detection(roi_rgb, keywords, rx1, ry1, rW, rH, detect_artifacts, detect_head, |
|
YOLOS_PROCESSOR, YOLOS_MODEL, |
|
RTDETR_PROCESSOR, RTDETR_MODEL, |
|
HEAD_PROCESSOR, HEAD_MODEL, HEAD_DETECTION_FULL_PRECISION, |
|
DEVICE): |
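    """Run YOLOS, RT-DETR, optional artifact, and optional head detection
    on a single ROI crop and return the per-model results plus warnings."""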
|
log_item = {"warnings": []} |
|
|
|
yolos_boxes, yolos_labels, yolos_scores, yolos_raws = detect_yolos_in_roi( |
|
roi_rgb, keywords, YOLOS_PROCESSOR, YOLOS_MODEL, DEVICE, log_item |
|
) |
|
|
|
rtdetr_boxes, rtdetr_labels, rtdetr_scores, rtdetr_raws = detect_rtdetr_in_roi( |
|
roi_rgb, RTDETR_PROCESSOR, RTDETR_MODEL, DEVICE, log_item |
|
) |
|
|
|
if detect_artifacts: |
|
artifact_boxes, artifact_labels, artifact_scores, artifact_raws = detect_rtdetr_artifacts_in_roi( |
|
roi_rgb, keywords, RTDETR_PROCESSOR, RTDETR_MODEL, DEVICE, log_item |
|
) |
|
else: |
|
artifact_boxes, artifact_labels, artifact_scores, artifact_raws = [], [], [], [] |
|
|
|
if detect_head: |
|
head_boxes, head_labels, head_scores, head_raws = detect_head_in_roi( |
|
roi_rgb, rx1, ry1, rW, rH, HEAD_PROCESSOR, HEAD_MODEL, |
|
HEAD_DETECTION_FULL_PRECISION, DEVICE, log_item |
|
) |
|
else: |
|
head_boxes, head_labels, head_scores, head_raws = [], [], [], [] |
|
|
|
return { |
|
'yolos': (yolos_boxes, yolos_labels, yolos_scores, yolos_raws), |
|
'rtdetr': (rtdetr_boxes, rtdetr_labels, rtdetr_scores, rtdetr_raws), |
|
'artifacts': (artifact_boxes, artifact_labels, artifact_scores, artifact_raws), |
|
'head': (head_boxes, head_labels, head_scores, head_raws), |
|
'warnings': log_item.get("warnings", []) |
|
} |
|
|
|
def detect_batch(contexts, batch_logs, |
|
HEAD_PROCESSOR, HEAD_MODEL, HEAD_DETECTION_FULL_PRECISION, |
|
RTDETR_PROCESSOR, RTDETR_MODEL, RTDETR_FULL_PRECISION, |
|
YOLOS_PROCESSOR, YOLOS_MODEL, |
|
DEVICE, MODELS_LOADED, LOAD_ERROR): |
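    """Detect product features inside each context's largest box: map raw
    labels to keywords, filter by score and position, suppress overlaps,
    and store the result in ctx.detection_result."""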
|
|
|
function_name = "detect_batch" |
|
start_time = time.perf_counter() |
|
logging.log(LOG_LEVEL_MAP["INFO"], f"{EMOJI_MAP['INFO']} Starting {function_name} for {len(contexts)} items") |
|
|
|
for ctx in contexts: |
|
log_item = { |
|
"image_url": ctx.url, |
|
"data": { |
|
"detection_result_log": {}, |
|
"product_type": ctx.product_type, |
|
"keywords": ctx.keywords |
|
}, |
|
"function": function_name |
|
} |
|
|
|
logging.log(LOG_LEVEL_MAP["INFO"], f"{EMOJI_MAP['INFO']} Processing image: {ctx.url} | Product Type: {ctx.product_type} | Keywords: {ctx.keywords}") |
|
|
|
if ctx.skip_run or ctx.skip_processing: |
|
log_item["status"] = "skipped" |
|
batch_logs.append(log_item) |
|
continue |
|
|
|
if "original" not in ctx.pil_img: |
|
log_item["status"] = "error" |
|
log_item["exception"] = "No RBC 'original'" |
|
ctx.skip_run = True |
|
batch_logs.append(log_item) |
|
continue |
|
|
|
pi_rgba, _, _ = ctx.pil_img["original"] |
|
W, H = pi_rgba.size |
|
|
|
largest_box = ctx.define_result.get("largest_box") |
|
if (not largest_box or |
|
not isinstance(largest_box, list) or |
|
len(largest_box) != 4): |
|
log_item["status"] = "no_detection" |
|
log_item["data"]["detection_result_log"] = "no_main_box_detected" |
|
batch_logs.append(log_item) |
|
continue |
|
|
|
rx1, ry1, rx2, ry2 = largest_box |
|
rW = rx2 - rx1 |
|
rH = ry2 - ry1 |
|
|
|
logging.log(LOG_LEVEL_MAP["INFO"], f"{EMOJI_MAP['INFO']} ROI box: [{rx1},{ry1},{rx2},{ry2}] | size: {rW}x{rH} | {ctx.url}") |
|
|
|
roi = pi_rgba.crop((rx1, ry1, rx2, ry2)) |
|
roi_rgb = roi.convert("RGB") |
|
|
|
all_boxes = [] |
|
all_labels = [] |
|
all_scores = [] |
|
all_raw = [] |
|
all_models = [] |
|
|
|
def box_to_global(x1, y1, x2, y2): |
|
gx1 = int(rx1 + x1) |
|
gy1 = int(ry1 + y1) |
|
gx2 = int(rx1 + x2) |
|
gy2 = int(ry1 + y2) |
|
return [gx1, gy1, gx2, gy2] |
|
|
|
detect_artifacts = DETECT_ARTIFACTS and any(kw in ctx.keywords for kw in ARTIFACTS_LIST) |
|
lower_body_types = ["jeans", "shorts", "skirt"] |
|
detect_head = ctx.product_type.lower() not in lower_body_types |
|
|
|
detection_start = time.perf_counter() |
|
|
|
detection_results = process_single_image_detection( |
|
roi_rgb, ctx.keywords, rx1, ry1, rW, rH, detect_artifacts, detect_head, |
|
YOLOS_PROCESSOR, YOLOS_MODEL, |
|
RTDETR_PROCESSOR, RTDETR_MODEL, |
|
HEAD_PROCESSOR, HEAD_MODEL, HEAD_DETECTION_FULL_PRECISION, |
|
DEVICE |
|
) |
|
|
|
yolos_boxes, yolos_labels, yolos_scores, yolos_raws = detection_results['yolos'] |
|
rtdetr_boxes, rtdetr_labels, rtdetr_scores, rtdetr_raws = detection_results['rtdetr'] |
|
artifact_boxes, artifact_labels, artifact_scores, artifact_raws = detection_results['artifacts'] |
|
head_boxes, head_labels, head_scores, head_raws = detection_results['head'] |
|
|
|
if detection_results['warnings']: |
|
log_item["warnings"] = log_item.get("warnings", []) + detection_results['warnings'] |
|
|
|
detection_time = time.perf_counter() - detection_start |
|
logging.log(LOG_LEVEL_MAP["INFO"], f"{EMOJI_MAP['INFO']} Detection completed in {detection_time:.3f}s") |
|
|
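        # Label ids are offset per model so they stay unique after merging:
        # YOLOS +40000, RT-DETR +10000, RT-DETR artifacts +20000, head = 9999.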
|
for i in range(len(yolos_boxes)): |
|
x1, y1, x2, y2 = yolos_boxes[i] |
|
global_box = box_to_global(x1, y1, x2, y2) |
|
global_box = clamp_box_to_region(global_box, [rx1, ry1, rx2, ry2]) |
|
|
|
all_boxes.append(global_box) |
|
all_labels.append(40000 + yolos_labels[i]) |
|
all_scores.append(yolos_scores[i]) |
|
all_raw.append(yolos_raws[i]) |
|
all_models.append("yolos_fashionpedia_model") |
|
|
|
for i in range(len(rtdetr_boxes)): |
|
x1, y1, x2, y2 = rtdetr_boxes[i] |
|
global_box = box_to_global(x1, y1, x2, y2) |
|
global_box = clamp_box_to_region(global_box, [rx1, ry1, rx2, ry2]) |
|
|
|
all_boxes.append(global_box) |
|
all_labels.append(10000 + rtdetr_labels[i]) |
|
all_scores.append(rtdetr_scores[i]) |
|
all_raw.append(rtdetr_raws[i]) |
|
all_models.append("rtdetr_model") |
|
|
|
for i in range(len(artifact_boxes)): |
|
x1, y1, x2, y2 = artifact_boxes[i] |
|
global_box = box_to_global(x1, y1, x2, y2) |
|
global_box = clamp_box_to_region(global_box, [rx1, ry1, rx2, ry2]) |
|
|
|
all_boxes.append(global_box) |
|
all_labels.append(20000 + artifact_labels[i]) |
|
all_scores.append(artifact_scores[i]) |
|
all_raw.append(artifact_raws[i]) |
|
all_models.append("rtdetr_artifact") |
|
|
|
for i in range(len(head_boxes)): |
|
all_boxes.append(head_boxes[i]) |
|
all_labels.append(9999) |
|
all_scores.append(head_scores[i]) |
|
all_raw.append(head_raws[i]) |
|
all_models.append("head_model") |
|
|
|
logging.log(LOG_LEVEL_MAP["INFO"], f"{EMOJI_MAP['INFO']} Total detections before filtering: {len(all_boxes)} - YOLOS: {sum(1 for m in all_models if m == 'yolos_fashionpedia_model')}, RTDETR: {sum(1 for m in all_models if m == 'rtdetr_model')}, Head: {sum(1 for m in all_models if m == 'head_model')}") |
|
|
|
final_keywords = [None] * len(all_boxes) |
|
|
|
for i_ in range(len(all_boxes)): |
|
lb_ = all_labels[i_] |
|
model_name = all_models[i_] |
|
|
|
if lb_ == 9999: |
|
if "head" in ctx.keywords: |
|
final_keywords[i_] = "head" |
|
else: |
|
label_str = all_raw[i_] |
|
mapped = map_label_to_keyword(label_str, ctx.keywords, model_name) |
|
if mapped: |
|
final_keywords[i_] = mapped |
|
|
|
keep_mask = [True] * len(all_boxes) |
|
for i_ in range(len(all_boxes)): |
|
cat_ = final_keywords[i_] |
|
sc_ = all_scores[i_] |
|
if not cat_: |
|
keep_mask[i_] = False |
|
else: |
|
threshold_needed = get_threshold_for_keyword(cat_) |
|
if sc_ < threshold_needed: |
|
keep_mask[i_] = False |
|
|
|
fb, fl, fs, fk, fr, fm = [], [], [], [], [], [] |
|
for i_ in range(len(all_boxes)): |
|
if keep_mask[i_]: |
|
fb.append(all_boxes[i_]) |
|
fl.append(all_labels[i_]) |
|
fs.append(all_scores[i_]) |
|
fk.append(final_keywords[i_]) |
|
fr.append(all_raw[i_]) |
|
fm.append(all_models[i_]) |
|
|
|
fb, fl, fs, fk, fr, fm = apply_box_overlap_filter( |
|
fb, fl, fs, fk, fr, fm |
|
) |
|
|
|
second_pass = [] |
|
for i_ in range(len(fb)): |
|
cat_ = fk[i_] |
|
(bx1, by1, bx2, by2) = fb[i_] |
|
centerY = 0.5 * (by1 + by2) |
|
if cat_ == "head": |
|
if centerY <= (H * (UPPER_HEAD_FILTER / 100.0)): |
|
second_pass.append(i_) |
|
elif cat_ == "shoes": |
|
if centerY >= (H * (1 - (LOWER_SHOE_FILTER / 100.0))): |
|
second_pass.append(i_) |
|
else: |
|
second_pass.append(i_) |
|
|
|
collar_idx = [i_ for i_, kw_ in enumerate(fk) if kw_ in ["collar", "neckline"]] |
|
collar_boxes = [fb[x] for x in collar_idx] |
|
|
|
def intersect_area(a_, b_): |
|
xx1 = max(a_[0], b_[0]) |
|
yy1 = max(a_[1], b_[1]) |
|
xx2 = min(a_[2], b_[2]) |
|
yy2 = min(a_[3], b_[3]) |
|
iw = max(0, xx2 - xx1) |
|
ih = max(0, yy2 - yy1) |
|
return iw * ih |
|
|
|
final_keep = [] |
|
for idx2 in second_pass: |
|
kw_ = fk[idx2] |
|
if kw_ == "head": |
|
hx1, hy1, hx2, hy2 = fb[idx2] |
|
hArea = (hx2 - hx1) * (hy2 - hy1) |
|
if hArea <= 0: |
|
continue |
|
remove = False |
|
for cb_ in collar_boxes: |
|
ia = intersect_area([hx1, hy1, hx2, hy2], cb_) |
|
if ia / float(hArea) > (NECKLINE_COLLAR_HEAD / 100.0): |
|
remove = True |
|
break |
|
if not remove: |
|
final_keep.append(idx2) |
|
else: |
|
final_keep.append(idx2) |
|
|
|
final_boxes = [fb[x] for x in final_keep] |
|
final_labels = [fl[x] for x in final_keep] |
|
final_scores = [round(fs[x], 2) for x in final_keep] |
|
final_kws = [fk[x] for x in final_keep] |
|
final_raws = [fr[x] for x in final_keep] |
|
final_mods = [fm[x] for x in final_keep] |
|
|
|
logging.log(LOG_LEVEL_MAP["INFO"], f"{EMOJI_MAP['INFO']} Final detections after filtering: {len(final_boxes)} boxes") |
|
for i in range(len(final_boxes)): |
|
box = final_boxes[i] |
|
logging.log(LOG_LEVEL_MAP["INFO"], f"{EMOJI_MAP['INFO']} Final detection: {final_kws[i]} ({final_raws[i]}) at score {final_scores[i]} | box=[{box[0]},{box[1]},{box[2]},{box[3]}]") |
|
|
|
dd_log = defaultdict(list) |
|
for i in range(len(final_boxes)): |
|
cat_ = final_kws[i] |
|
if cat_: |
|
dd_log[cat_].append({ |
|
"box": final_boxes[i], |
|
"score": final_scores[i], |
|
"raw_label": final_raws[i], |
|
"model": final_mods[i] |
|
}) |
|
|
|
        if ctx.product_type.lower() not in dd_log:
|
from .rtdetr_model import RTDETR_CONF |
|
fallback_box = ctx.define_result.get("largest_box", None) |
|
final_boxes, final_labels, final_scores, final_kws, final_raws, final_mods, dd_log = update_fallback_detection( |
|
ctx, pi_rgba, fallback_box, RTDETR_PROCESSOR, RTDETR_MODEL, |
|
DEVICE, RTDETR_CONF, final_boxes, final_labels, final_scores, |
|
final_kws, final_raws, final_mods, dd_log |
|
) |
|
|
|
detection_result = { |
|
"status": "ok", |
|
"boxes": final_boxes, |
|
"labels": final_labels, |
|
"scores": final_scores, |
|
"final_keywords": final_kws, |
|
"raw_labels": final_raws, |
|
"models": final_mods |
|
} |
|
|
|
        is_bottom_clothing = ctx.product_type.lower() in lower_body_types
|
|
|
detection_log_schema = { |
|
"yolos_fashionpedia_model": { |
|
"person_list": {}, |
|
"product_type_list": {}, |
|
"head_list": {}, |
|
"shoes_list": {}, |
|
"clothing_features_list": {}, |
|
"artifacts_list": {} |
|
}, |
|
"rtdetr_model": { |
|
"person_list": {}, |
|
"product_type_list": {}, |
|
"head_list": {}, |
|
"shoes_list": {}, |
|
"clothing_features_list": {}, |
|
"artifacts_list": {} |
|
} |
|
} |
|
|
|
if not is_bottom_clothing: |
|
detection_log_schema["head_model"] = { |
|
"head_list": {} |
|
} |
|
|
|
for i in range(len(final_boxes)): |
|
keyword = final_kws[i] |
|
raw_label = final_raws[i] |
|
score = final_scores[i] |
|
model = final_mods[i] |
|
|
|
if is_bottom_clothing and model == "head_model": |
|
continue |
|
|
|
if model not in detection_log_schema: |
|
continue |
|
|
|
model_dict = detection_log_schema[model] |
|
|
|
if keyword in PERSON_LIST and "person_list" in model_dict: |
|
if keyword not in model_dict["person_list"]: |
|
model_dict["person_list"][keyword] = {} |
|
model_dict["person_list"][keyword][raw_label] = score |
|
elif keyword in PRODUCT_TYPE_LIST and "product_type_list" in model_dict: |
|
if keyword not in model_dict["product_type_list"]: |
|
model_dict["product_type_list"][keyword] = {} |
|
model_dict["product_type_list"][keyword][raw_label] = score |
|
elif keyword in HEAD_LIST and "head_list" in model_dict: |
|
if keyword not in model_dict["head_list"]: |
|
model_dict["head_list"][keyword] = {} |
|
model_dict["head_list"][keyword][raw_label] = score |
|
elif keyword in SHOES_LIST and "shoes_list" in model_dict: |
|
if keyword not in model_dict["shoes_list"]: |
|
model_dict["shoes_list"][keyword] = {} |
|
model_dict["shoes_list"][keyword][raw_label] = score |
|
elif keyword in CLOTHING_FEATURES_LIST and "clothing_features_list" in model_dict: |
|
if keyword not in model_dict["clothing_features_list"]: |
|
model_dict["clothing_features_list"][keyword] = {} |
|
model_dict["clothing_features_list"][keyword][raw_label] = score |
|
elif keyword in ARTIFACTS_LIST and "artifacts_list" in model_dict: |
|
if keyword not in model_dict["artifacts_list"]: |
|
model_dict["artifacts_list"][keyword] = {} |
|
model_dict["artifacts_list"][keyword][raw_label] = score |
|
|
|
for model in list(detection_log_schema.keys()): |
|
model_dict = detection_log_schema[model] |
|
for cat in list(model_dict.keys()): |
|
if not model_dict[cat]: |
|
del model_dict[cat] |
|
if not model_dict: |
|
del detection_log_schema[model] |
|
|
|
log_item["status"] = "ok" |
|
log_item["data"]["detection_result_log"] = detection_log_schema |
|
ctx.detection_result = detection_result |
|
batch_logs.append(log_item) |
|
|
|
processing_time = time.perf_counter() - start_time |
|
logging.log(LOG_LEVEL_MAP["SUCCESS"], f"{EMOJI_MAP['SUCCESS']} Completed {function_name} for {len(contexts)} items in {processing_time:.3f}s") |
|
return batch_logs |
|
|
|
def choose_color_for_feature_batch(contexts, batch_logs): |
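    """Assign a draw color to every detected box based on its keyword
    category and store the list in ctx.box_colors."""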
|
function_name = "choose_color_for_feature_batch" |
|
start_time = time.perf_counter() |
|
logging.log(LOG_LEVEL_MAP["INFO"], f"{EMOJI_MAP['INFO']} Starting {function_name} for {len(contexts)} items") |
|
|
|
processed_count = 0 |
|
skipped_count = 0 |
|
error_count = 0 |
|
|
|
for ctx in contexts: |
|
it_ = { |
|
"image_url": ctx.url, |
|
"function": function_name |
|
} |
|
|
|
if ctx.skip_run or ctx.skip_processing: |
|
it_["status"] = "skipped" |
|
batch_logs.append(it_) |
|
skipped_count += 1 |
|
continue |
|
|
|
dr = ctx.detection_result |
|
if dr.get("status") != "ok": |
|
it_["status"] = "no_detection" |
|
batch_logs.append(it_) |
|
error_count += 1 |
|
continue |
|
|
|
bxs = dr["boxes"] |
|
kws = dr["final_keywords"] |
|
        pt = ctx.product_type.lower()
|
colors = [] |
|
|
|
        for kw in kws:
|
if kw == pt: |
|
colors.append(BLUE_BOX_PRODUCT_TYPE) |
|
elif kw in HEAD_LIST: |
|
colors.append(GREEN_BOX_HEAD) |
|
elif kw in SHOES_LIST: |
|
colors.append(VIOLET_BOX_SHOES) |
|
elif kw in CLOTHING_FEATURES_LIST: |
|
colors.append(ORANGE_BOX_CLOTHING_FEATURES) |
|
elif kw in ARTIFACTS_LIST: |
|
colors.append(RED_BOX_ARTIFACTS) |
|
else: |
|
colors.append(BLUE_BOX_PRODUCT_TYPE) |
|
|
|
ctx.box_colors = colors |
|
it_["status"] = "ok" |
|
it_["data"] = { |
|
"color_assignments": { |
|
"total_boxes": len(bxs), |
|
"color_counts": { |
|
"product_type": colors.count(BLUE_BOX_PRODUCT_TYPE), |
|
"head": colors.count(GREEN_BOX_HEAD), |
|
"shoes": colors.count(VIOLET_BOX_SHOES), |
|
"clothing_features": colors.count(ORANGE_BOX_CLOTHING_FEATURES), |
|
"artifacts": colors.count(RED_BOX_ARTIFACTS) |
|
} |
|
} |
|
} |
|
batch_logs.append(it_) |
|
processed_count += 1 |
|
|
|
processing_time = time.perf_counter() - start_time |
|
logging.log(LOG_LEVEL_MAP["SUCCESS"], f"{EMOJI_MAP['SUCCESS']} Completed {function_name}: processed={processed_count}, skipped={skipped_count}, errors={error_count} in {processing_time:.3f}s") |
|
return batch_logs |
|
|
|
def adjust_blue_box_batch(contexts, batch_logs): |
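    """Extend the main product-type (blue) box horizontally and toward the
    detected head and shoes boxes, clamped to the largest box; fall back to
    the whole largest box when the adjusted area shrinks below
    BLUE_BOX_FALLBACK_THRESHOLD percent of it."""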
|
function_name = "adjust_blue_box_batch" |
|
start_time = time.perf_counter() |
|
logging.log(LOG_LEVEL_MAP["INFO"], f"{EMOJI_MAP['INFO']} Starting {function_name} for {len(contexts)} items") |
|
|
|
processed_count = 0 |
|
skipped_count = 0 |
|
error_count = 0 |
|
|
|
for ctx in contexts: |
|
it_ = { |
|
"image_url": ctx.url, |
|
"function": function_name |
|
} |
|
|
|
if ctx.skip_run or ctx.skip_processing: |
|
it_["status"] = "skipped" |
|
batch_logs.append(it_) |
|
skipped_count += 1 |
|
continue |
|
|
|
dr = ctx.detection_result |
|
if dr.get("status") != "ok": |
|
it_["status"] = "no_detection" |
|
batch_logs.append(it_) |
|
error_count += 1 |
|
continue |
|
|
|
if "original" not in ctx.pil_img: |
|
it_["status"] = "error" |
|
it_["exception"] = "No original image found" |
|
batch_logs.append(it_) |
|
error_count += 1 |
|
continue |
|
|
|
pi_rgba, _, _ = ctx.pil_img["original"] |
|
largest_box = ctx.define_result.get("largest_box") |
|
if not largest_box: |
|
it_["status"] = "error" |
|
it_["exception"] = "No main box detected" |
|
batch_logs.append(it_) |
|
error_count += 1 |
|
continue |
|
|
|
final_boxes = dr["boxes"] |
|
final_kws = dr["final_keywords"] |
|
final_cols = ctx.box_colors |
|
|
|
bx1, by1, bx2, by2 = map(int, largest_box) |
|
largest_green = None |
|
largest_violet = None |
|
largest_blue = None |
|
areaG = 0 |
|
areaV = 0 |
|
areaB = 0 |
|
|
|
for i, (xx1, yy1, xx2, yy2) in enumerate(final_boxes): |
|
area_ = (xx2 - xx1) * (yy2 - yy1) |
|
col_ = final_cols[i] |
|
kw_ = final_kws[i] |
|
if kw_ in HEAD_LIST and area_ > areaG: |
|
areaG = area_ |
|
largest_green = (xx1, yy1, xx2, yy2) |
|
elif kw_ in SHOES_LIST and area_ > areaV: |
|
areaV = area_ |
|
largest_violet = (xx1, yy1, xx2, yy2) |
|
if col_ == BLUE_BOX_PRODUCT_TYPE and area_ > areaB: |
|
areaB = area_ |
|
largest_blue = (xx1, yy1, xx2, yy2) |
|
|
|
adjusted_box = largest_blue |
|
lines_update = {} |
|
|
|
adjustment_log = { |
|
"original_blue_box": largest_blue, |
|
"head_box": largest_green, |
|
"shoes_box": largest_violet, |
|
"adjustments": {} |
|
} |
|
|
|
if adjusted_box and areaB > 0: |
|
x1b, y1b, x2b, y2b = adjusted_box |
|
m = min(pi_rgba.width, pi_rgba.height) |
|
horiz_off = int(m * LEFT_RIGHT_BLUE_OFFSET) |
|
up_ofs = int(m * LOWER_UPPER_BLUE_OFFSET) |
|
dn_ofs = int(m * LOWER_UPPER_BLUE_OFFSET) |
|
|
|
x1_ext = max(x1b - horiz_off, bx1) |
|
x2_ext = min(x2b + horiz_off, bx2) |
|
y1_ext = y1b |
|
y2_ext = y2b |
|
|
|
adjustment_log["adjustments"]["horizontal_offset"] = horiz_off |
|
adjustment_log["adjustments"]["vertical_offset"] = {"up": up_ofs, "down": dn_ofs} |
|
|
|
if largest_green: |
|
gx1, gy1, gx2, gy2 = largest_green |
|
top_off = int(m * CREATE_UPPER_BLUE_LOWER_GREEN_RATIO) |
|
new_top = gy2 - top_off |
|
y1_ext = max(by1, new_top) |
|
adjustment_log["adjustments"]["head_top_offset"] = top_off |
|
else: |
|
y1_ext = max(y1_ext - up_ofs, by1) |
|
adjustment_log["adjustments"]["default_top_offset"] = up_ofs |
|
|
|
if largest_violet: |
|
vx1, vy1, vx2, vy2 = largest_violet |
|
bot_off = int(m * CREATE_LOWER_BLUE_LOWER_VIOLET_RATIO) |
|
new_bot = vy2 + bot_off |
|
y2_ext = min(by2, max(y2_ext, new_bot)) |
|
adjustment_log["adjustments"]["shoes_bottom_offset"] = bot_off |
|
else: |
|
y2_ext = min(y2_ext + dn_ofs, by2) |
|
adjustment_log["adjustments"]["default_bottom_offset"] = dn_ofs |
|
|
|
x1_ext = max(bx1, min(x1_ext, bx2)) |
|
x2_ext = max(bx1, min(x2_ext, bx2)) |
|
y1_ext = max(by1, min(y1_ext, by2)) |
|
y2_ext = max(by1, min(y2_ext, by2)) |
|
adjusted_box = (x1_ext, y1_ext, x2_ext, y2_ext) |
|
|
|
lines_update = { |
|
"left": f"{x1b}->{x1_ext}", |
|
"right": f"{x2b}->{x2_ext}", |
|
"upper": f"{y1b}->{y1_ext}", |
|
"lower": f"{y2b}->{y2_ext}" |
|
} |
|
|
|
adjustment_log["adjustments"]["before"] = [x1b, y1b, x2b, y2b] |
|
adjustment_log["adjustments"]["after"] = [x1_ext, y1_ext, x2_ext, y2_ext] |
|
|
|
largest_area = (bx2 - bx1) * (by2 - by1) |
|
adjusted_area = (x2_ext - x1_ext) * (y2_ext - y1_ext) |
|
|
|
adjustment_log["areas"] = { |
|
"largest_box_area": largest_area, |
|
"adjusted_area": adjusted_area |
|
} |
|
|
|
if largest_area > 0: |
|
ratio = (adjusted_area / float(largest_area)) * 100 |
|
adjustment_log["areas"]["ratio_percent"] = round(ratio, 2) |
|
|
|
if ratio < BLUE_BOX_FALLBACK_THRESHOLD: |
|
adjusted_box = (bx1, by1, bx2, by2) |
|
lines_update["fallback"] = ( |
|
f"Adjusted area {round(ratio,1)}% < {BLUE_BOX_FALLBACK_THRESHOLD}%, " |
|
"fallback to entire largest box." |
|
) |
|
adjustment_log["fallback_used"] = True |
|
adjustment_log["fallback_reason"] = f"Area ratio {round(ratio,2)}% below threshold {BLUE_BOX_FALLBACK_THRESHOLD}%" |
|
else: |
|
adjustment_log["fallback_used"] = False |
|
|
|
ctx.adjusted_blue_box = adjusted_box |
|
it_["status"] = "ok" |
|
it_["lines_update"] = lines_update |
|
it_["data"] = {"adjustment_log": adjustment_log} |
|
batch_logs.append(it_) |
|
processed_count += 1 |
|
|
|
processing_time = time.perf_counter() - start_time |
|
logging.log(LOG_LEVEL_MAP["SUCCESS"], f"{EMOJI_MAP['SUCCESS']} Completed {function_name}: processed={processed_count}, skipped={skipped_count}, errors={error_count} in {processing_time:.3f}s") |
|
return batch_logs |
|
|
|
def draw_batch(contexts, batch_logs): |
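    """Optionally (APPLY_DRAW) draw all detection boxes, the adjusted blue
    box, and the largest box onto the original image, and log the counts."""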
|
function_name = "draw_batch" |
|
start_time = time.perf_counter() |
|
logging.log(LOG_LEVEL_MAP["INFO"], f"{EMOJI_MAP['INFO']} Starting {function_name} for {len(contexts)} items") |
|
|
|
processed_count = 0 |
|
skipped_count = 0 |
|
error_count = 0 |
|
|
|
for ctx in contexts: |
|
it_ = { |
|
"image_url": ctx.url, |
|
"function": function_name |
|
} |
|
|
|
if ctx.skip_run or ctx.skip_processing: |
|
it_["status"] = "skipped" |
|
batch_logs.append(it_) |
|
skipped_count += 1 |
|
continue |
|
|
|
dr = ctx.detection_result |
|
if dr.get("status") != "ok": |
|
it_["status"] = "no_detection" |
|
batch_logs.append(it_) |
|
error_count += 1 |
|
continue |
|
|
|
if "original" not in ctx.pil_img: |
|
it_["status"] = "error" |
|
it_["exception"] = "No original image found" |
|
batch_logs.append(it_) |
|
error_count += 1 |
|
continue |
|
|
|
pi_rgba, orig_filename, _ = ctx.pil_img["original"] |
|
final_boxes = dr["boxes"] |
|
final_kws = dr["final_keywords"] |
|
final_cols = ctx.box_colors |
|
largest_box = ctx.define_result.get("largest_box") |
|
|
|
box_drawn = 0 |
|
color_log = { |
|
"BLACK_BOX_PERSON": {"count": 0, "boxes": []}, |
|
"BLUE_BOX_PRODUCT_TYPE": {"count": 0, "boxes": []}, |
|
"GREEN_BOX_HEAD": {"count": 0, "boxes": []}, |
|
"VIOLET_BOX_SHOES": {"count": 0, "boxes": []}, |
|
"ORANGE_BOX_CLOTHING_FEATURES": {"count": 0, "boxes": []}, |
|
"RED_BOX_ARTIFACTS": {"count": 0, "boxes": []}, |
|
} |
|
|
|
        apply_draw = APPLY_DRAW
|
if apply_draw: |
|
d_ = ImageDraw.Draw(pi_rgba, mode="RGBA") |
|
|
|
for i, (bx1, by1, bx2, by2) in enumerate(final_boxes): |
|
c_ = final_cols[i] |
|
if c_ != BLUE_BOX_PRODUCT_TYPE: |
|
s_ = f"({bx1},{by1},{bx2},{by2})" |
|
d_.rectangle([bx1, by1, bx2, by2], outline=c_, width=2) |
|
box_drawn += 1 |
|
if c_ == GREEN_BOX_HEAD: |
|
color_log["GREEN_BOX_HEAD"]["count"] += 1 |
|
color_log["GREEN_BOX_HEAD"]["boxes"].append(s_) |
|
elif c_ == VIOLET_BOX_SHOES: |
|
color_log["VIOLET_BOX_SHOES"]["count"] += 1 |
|
color_log["VIOLET_BOX_SHOES"]["boxes"].append(s_) |
|
elif c_ == ORANGE_BOX_CLOTHING_FEATURES: |
|
color_log["ORANGE_BOX_CLOTHING_FEATURES"]["count"] += 1 |
|
color_log["ORANGE_BOX_CLOTHING_FEATURES"]["boxes"].append(s_) |
|
elif c_ == RED_BOX_ARTIFACTS: |
|
color_log["RED_BOX_ARTIFACTS"]["count"] += 1 |
|
color_log["RED_BOX_ARTIFACTS"]["boxes"].append(s_) |
|
|
|
if ctx.adjusted_blue_box: |
|
abx1, aby1, abx2, aby2 = ctx.adjusted_blue_box |
|
s_ = f"({abx1},{aby1},{abx2},{aby2})" |
|
d_.rectangle([abx1, aby1, abx2, aby2], outline=BLUE_BOX_PRODUCT_TYPE, width=2) |
|
color_log["BLUE_BOX_PRODUCT_TYPE"]["count"] += 1 |
|
color_log["BLUE_BOX_PRODUCT_TYPE"]["boxes"].append(s_) |
|
box_drawn += 1 |
|
|
|
if largest_box: |
|
lx1, ly1, lx2, ly2 = largest_box |
|
s_ = f"({lx1},{ly1},{lx2},{ly2})" |
|
d_.rectangle([lx1, ly1, lx2, ly2], outline=BLACK_BOX_PERSON, width=4) |
|
color_log["BLACK_BOX_PERSON"]["count"] += 1 |
|
color_log["BLACK_BOX_PERSON"]["boxes"].append(s_) |
|
box_drawn += 1 |
|
|
|
ctx.pil_img["original"] = [pi_rgba, orig_filename, None] |
|
it_["status"] = "ok" if apply_draw else "no_drawing" |
|
it_["boxes_drawn"] = box_drawn |
|
it_["data"] = { |
|
"colors": color_log, |
|
"draw_enabled": apply_draw, |
|
"boxes_count": { |
|
"total": len(final_boxes), |
|
"drawn": box_drawn |
|
} |
|
} |
|
batch_logs.append(it_) |
|
processed_count += 1 |
|
|
|
processing_time = time.perf_counter() - start_time |
|
logging.log(LOG_LEVEL_MAP["SUCCESS"], f"{EMOJI_MAP['SUCCESS']} Completed {function_name}: processed={processed_count}, skipped={skipped_count}, errors={error_count} in {processing_time:.3f}s") |
|
return batch_logs |
|
|
|
|
|
|
|
|
|
def ensure_models_loaded(): |
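    """Ensure all detection models are loaded via the top-level app module."""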
|
import app |
|
app.ensure_models_loaded() |
|
|
|
pipeline_step = create_pipeline_step(ensure_models_loaded) |
|
|
|
@pipeline_step |
|
def bounding_box(contexts: List[ProcessingContext], batch_logs: Optional[List[dict]] = None):
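    """Pipeline step: largest-box definition, feature detection, color
    assignment, blue-box adjustment, and optional drawing for each context."""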
|
if batch_logs is None: |
|
batch_logs = [] |
|
|
|
start_time = time.perf_counter() |
|
logging.log(LOG_LEVEL_MAP["INFO"], f"{EMOJI_MAP['INFO']} Starting bounding_box pipeline for {len(contexts)} items") |
|
|
|
from src.models import model_loader |
|
|
|
RTDETR_PROCESSOR = model_loader.RTDETR_PROCESSOR |
|
RTDETR_MODEL = model_loader.RTDETR_MODEL |
|
RTDETR_FULL_PRECISION = model_loader.RTDETR_FULL_PRECISION |
|
HEAD_PROCESSOR = model_loader.HEAD_PROCESSOR |
|
HEAD_MODEL = model_loader.HEAD_MODEL |
|
HEAD_DETECTION_FULL_PRECISION = model_loader.HEAD_DETECTION_FULL_PRECISION |
|
YOLOS_PROCESSOR = model_loader.YOLOS_PROCESSOR |
|
YOLOS_MODEL = model_loader.YOLOS_MODEL |
|
DEVICE = model_loader.DEVICE |
|
MODELS_LOADED = model_loader.MODELS_LOADED |
|
LOAD_ERROR = model_loader.LOAD_ERROR |
|
|
|
define_largest_box_batch( |
|
contexts, batch_logs, RTDETR_PROCESSOR, RTDETR_MODEL, |
|
RTDETR_FULL_PRECISION, DEVICE, MODELS_LOADED, LOAD_ERROR |
|
) |
|
|
|
detect_batch( |
|
contexts, batch_logs, |
|
HEAD_PROCESSOR, HEAD_MODEL, HEAD_DETECTION_FULL_PRECISION, |
|
RTDETR_PROCESSOR, RTDETR_MODEL, RTDETR_FULL_PRECISION, |
|
YOLOS_PROCESSOR, YOLOS_MODEL, |
|
DEVICE, MODELS_LOADED, LOAD_ERROR |
|
) |
|
|
|
choose_color_for_feature_batch(contexts, batch_logs) |
|
|
|
adjust_blue_box_batch(contexts, batch_logs) |
|
|
|
draw_batch(contexts, batch_logs) |
|
|
|
processing_time = time.perf_counter() - start_time |
|
logging.log(LOG_LEVEL_MAP["SUCCESS"], f"{EMOJI_MAP['SUCCESS']} Completed bounding_box pipeline for {len(contexts)} items in {processing_time:.3f}s") |
|
|
|
return batch_logs |
|
|