# ----------------------------------------------------------------------
# IMPORTS
# ----------------------------------------------------------------------
import torch
import numpy as np
import logging
import time
import traceback
import spaces
import sys
from PIL import Image, ImageDraw
from collections import defaultdict
from typing import Dict, List, Optional, Tuple, Any, Union
from src.utils import ProcessingContext, create_pipeline_step, LOG_LEVEL_MAP, EMOJI_MAP
from src.processing.bounding_box.yolos_fashionpedia_model import (
    detect_with_yolos_fashionpedia,
    detect_yolos_in_roi,
    process_yolos_results
)
from src.processing.bounding_box.rtdetr_model import (
    detect_rtdetr_in_roi,
    detect_rtdetr_artifacts_in_roi,
    update_fallback_detection,
    get_rtdetr_clothing_labels,
    get_rtdetr_person_and_product_labels,
    get_rtdetr_artifact_labels,
    process_rtdetr_results
)
from src.processing.bounding_box.head_model import detect_head_in_roi
# ----------------------------------------------------------------------
# CONSTANTS
# ----------------------------------------------------------------------
APPLY_DRAW = False
DETECT_ARTIFACTS = False
PRODUCT_TYPE_BOX_FILTER_THRESHOLD = 0.4
HEAD_BOX_FILTER_THRESHOLD = 0.5
SHOES_BOX_FILTER_THRESHOLD = 0.5
CLOTHING_FEATURES_BOX_FILTER_THRESHOLD = 0.4
ARTIFACTS_BOX_FILTER_THRESHOLD = 0.7
BOX_OVERLAP = 50
LARGEST_BOX_EXTENSION_RATIO = 0.02
LARGEST_BOX_THRESHOLD = 5
CREATE_UPPER_BLUE_LOWER_GREEN_RATIO = 0.02
CREATE_LOWER_BLUE_LOWER_VIOLET_RATIO = 0.5
LEFT_RIGHT_BLUE_OFFSET = 0.5
LOWER_UPPER_BLUE_OFFSET = 0.02
BLUE_BOX_FALLBACK_THRESHOLD = 33
UPPER_HEAD_FILTER = 33
LOWER_SHOE_FILTER = 33
NECKLINE_COLLAR_HEAD = 50
LOWER_BODY_PRODUCT_TYPES = ("jeans", "shorts", "skirt")
BLUE_BOX_PRODUCT_TYPE = (0, 0, 255, 200)
GREEN_BOX_HEAD = (0, 255, 0, 200)
VIOLET_BOX_SHOES = (238, 130, 238, 200)
ORANGE_BOX_CLOTHING_FEATURES = (255, 165, 0, 200)
RED_BOX_ARTIFACTS = (255, 0, 0, 200)
BLACK_BOX_PERSON = (0, 0, 0, 200)
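# Units, for reference: the *_BOX_FILTER_THRESHOLD values are confidence
# fractions in [0, 1]; BOX_OVERLAP, LARGEST_BOX_THRESHOLD, UPPER_HEAD_FILTER,
# LOWER_SHOE_FILTER, NECKLINE_COLLAR_HEAD and BLUE_BOX_FALLBACK_THRESHOLD are
# percentages (divided by 100 where used); the *_RATIO and *_OFFSET values
# are fractions of the image's shorter side. Box colors are RGBA tuples.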
# ----------------------------------------------------------------------
# DYNAMIC LIST EXTRACTION
# ----------------------------------------------------------------------
def extract_category_lists():
    from .yolos_fashionpedia_model import MODEL_LABEL_CONFIG as YOLOS_CONFIG
    from .rtdetr_model import MODEL_LABEL_CONFIG as RTDETR_CONFIG
    from .head_model import MODEL_LABEL_CONFIG as HEAD_CONFIG
    MODEL_LABEL_CONFIG = {**YOLOS_CONFIG, **RTDETR_CONFIG, **HEAD_CONFIG}
    lists = {
        "PERSON_LIST": set(),
        "PRODUCT_TYPE_LIST": set(),
        "HEAD_LIST": set(),
        "SHOES_LIST": set(),
        "CLOTHING_FEATURES_LIST": set(),
        "ARTIFACTS_LIST": set()
    }
    for model_name, model_config in MODEL_LABEL_CONFIG.items():
        for list_type, keywords in model_config.items():
            list_key = list_type.upper()
            if list_key in lists:
                lists[list_key].update(keywords.keys())
    return {k: list(v) for k, v in lists.items()}
extracted_lists = extract_category_lists()
PERSON_LIST = extracted_lists["PERSON_LIST"]
PRODUCT_TYPE_LIST = extracted_lists["PRODUCT_TYPE_LIST"]
HEAD_LIST = extracted_lists["HEAD_LIST"]
SHOES_LIST = extracted_lists["SHOES_LIST"]
CLOTHING_FEATURES_LIST = extracted_lists["CLOTHING_FEATURES_LIST"]
ARTIFACTS_LIST = extracted_lists["ARTIFACTS_LIST"]
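# For orientation: extract_category_lists assumes each model module exposes a
# MODEL_LABEL_CONFIG keyed by model name, then category, then keyword, mapping
# to that model's raw label names. A hypothetical minimal shape:
#
#   MODEL_LABEL_CONFIG = {
#       "rtdetr_model": {
#           "product_type_list": {"dress": ["dress", "gown"]},
#           "shoes_list": {"shoes": ["shoe", "sandal"]},
#       }
#   }
#
# Only the keyword keys (e.g. "dress", "shoes") are collected into the *_LIST
# constants above; the raw label values are used later by map_label_to_keyword.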
# ----------------------------------------------------------------------
# SHARED UTILITY FUNCTIONS
# ----------------------------------------------------------------------
def get_label_name_from_model(model, label_id):
    if hasattr(model, 'config') and hasattr(model.config, 'id2label'):
        return model.config.id2label.get(label_id, f"unknown_{label_id}").lower()
    if hasattr(model, 'model_labels') and isinstance(model.model_labels, dict):
        return model.model_labels.get(label_id, f"unknown_{label_id}").lower()
    return f"unknown_{label_id}"
def clamp_box_to_region(box: List[int], region: List[int]) -> List[int]:
    x1, y1, x2, y2 = box
    rx1, ry1, rx2, ry2 = region
    xx1 = max(rx1, min(x1, rx2))
    yy1 = max(ry1, min(y1, ry2))
    xx2 = max(rx1, min(x2, rx2))
    yy2 = max(ry1, min(y2, ry2))
    return [xx1, yy1, xx2, yy2]
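# Illustrative values (not from the pipeline): a box poking out of its
# region on three sides is pulled back to the region's edges.
# >>> clamp_box_to_region([-5, 10, 120, 90], [0, 0, 100, 80])
# [0, 10, 100, 80]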
def box_iou(b1, b2):
    xx1 = max(b1[0], b2[0])
    yy1 = max(b1[1], b2[1])
    xx2 = min(b1[2], b2[2])
    yy2 = min(b1[3], b2[3])
    iw = max(0, xx2 - xx1)
    ih = max(0, yy2 - yy1)
    inter_area = iw * ih
    area1 = abs(b1[2] - b1[0]) * abs(b1[3] - b1[1])
    area2 = abs(b2[2] - b2[0]) * abs(b2[3] - b2[1])
    union = area1 + area2 - inter_area
    if union <= 0:
        return 0
    return inter_area / union
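# Quick sanity values (illustrative): two 10x10 boxes sharing half their
# width intersect in 50 px over a union of 150 px.
# >>> box_iou([0, 0, 10, 10], [5, 0, 15, 10])
# 0.3333333333333333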
def build_keywords(product_type: str) -> List[str]:
    pt = product_type.lower().strip()
    if not pt or pt == "unknown":
        return []
    kw = [pt]
    if pt in LOWER_BODY_PRODUCT_TYPES:
        kw += ["shoes", "closure", "pocket"]
    elif pt in ["jacket", "vest", "shirt"]:
        kw += ["head", "closure", "pocket", "collar", "sleeve"]
    elif pt in ["overall", "dress"]:
        kw += ["head", "shoes", "closure", "pocket", "neckline", "sleeve"]
    if DETECT_ARTIFACTS:
        kw += ["bag", "cup"]
    return kw
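# Example (with DETECT_ARTIFACTS left False, as configured above):
# >>> build_keywords(" Dress ")
# ['dress', 'head', 'shoes', 'closure', 'pocket', 'neckline', 'sleeve']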
def apply_box_overlap_filter(
    boxes: List[List[int]],
    labels: List[int],
    scores: List[float],
    keywords: List[str],
    raw_labels: List[str],
    models: List[str]
):
    # Group detections by keyword, then keep only the single highest-scoring
    # detection per keyword. Despite the name, no IoU test is applied here;
    # BOX_OVERLAP is currently unused by this implementation.
    grouped = defaultdict(list)
    for i in range(len(boxes)):
        grouped[keywords[i]].append({
            "box": boxes[i],
            "label": labels[i],
            "score": scores[i],
            "raw_label": raw_labels[i],
            "model": models[i]
        })
    new_boxes = []
    new_labels = []
    new_scores = []
    new_kws = []
    new_raws = []
    new_models = []
    for kwd, items in grouped.items():
        best_item = max(items, key=lambda x: x["score"])
        new_boxes.append(best_item["box"])
        new_labels.append(best_item["label"])
        new_scores.append(round(best_item["score"], 2))
        new_kws.append(kwd)
        new_raws.append(best_item["raw_label"])
        new_models.append(best_item["model"])
    return new_boxes, new_labels, new_scores, new_kws, new_raws, new_models
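# If per-keyword IoU suppression is ever wanted instead of top-1 selection,
# a minimal sketch (an assumption, not current pipeline behavior) could
# reuse box_iou and the BOX_OVERLAP constant like this:
def _nms_per_keyword_sketch(items: List[dict], overlap_thresh: float = BOX_OVERLAP / 100.0) -> List[dict]:
    # items: one keyword group of {"box": [...], "score": float} dicts.
    kept = []
    for cand in sorted(items, key=lambda x: x["score"], reverse=True):
        # Keep a candidate only if it does not overlap any kept box too much.
        if all(box_iou(cand["box"], k["box"]) <= overlap_thresh for k in kept):
            kept.append(cand)
    return kept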
def get_threshold_for_keyword(kw: str) -> float:
    if kw in PRODUCT_TYPE_LIST:
        return PRODUCT_TYPE_BOX_FILTER_THRESHOLD
    elif kw in HEAD_LIST:
        return HEAD_BOX_FILTER_THRESHOLD
    elif kw in SHOES_LIST:
        return SHOES_BOX_FILTER_THRESHOLD
    elif kw in CLOTHING_FEATURES_LIST:
        return CLOTHING_FEATURES_BOX_FILTER_THRESHOLD
    elif kw in ARTIFACTS_LIST:
        return ARTIFACTS_BOX_FILTER_THRESHOLD
    return 0.0
def map_label_to_keyword(label_name: str, valid_kws: List[str], model_name: str) -> Optional[str]:
    ln = label_name.strip().lower()
    if ln in valid_kws:
        return ln
    if model_name == "yolos_fashionpedia_model":
        from .yolos_fashionpedia_model import MODEL_LABEL_CONFIG
    elif model_name == "rtdetr_model":
        from .rtdetr_model import MODEL_LABEL_CONFIG
    elif model_name == "head_model":
        from .head_model import MODEL_LABEL_CONFIG
    else:
        return None
    model_config = MODEL_LABEL_CONFIG.get(model_name, {})
    for list_type in ["person_list", "product_type_list", "head_list",
                      "shoes_list", "clothing_features_list", "artifacts_list"]:
        category_config = model_config.get(list_type, {})
        for keyword, labels in category_config.items():
            if keyword in valid_kws:
                for label in labels:
                    # Substring match; this also covers exact equality.
                    if ln in label.lower():
                        return keyword
    return None
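# Hypothetical example of the fallback path: if rtdetr_model's
# product_type_list mapped "shirt" -> ["t-shirt", "polo shirt"] (made-up
# labels), then map_label_to_keyword("T-Shirt", ["shirt"], "rtdetr_model")
# would return "shirt" via the substring match above.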
# ----------------------------------------------------------------------
# MODEL VALIDATION
# ----------------------------------------------------------------------
def validate_model_configurations():
    from .yolos_fashionpedia_model import MODEL_LABEL_CONFIG as YOLOS_CONFIG
    from .rtdetr_model import MODEL_LABEL_CONFIG as RTDETR_CONFIG
    from .head_model import MODEL_LABEL_CONFIG as HEAD_CONFIG
    MODEL_LABEL_CONFIG = {**YOLOS_CONFIG, **RTDETR_CONFIG, **HEAD_CONFIG}
    logging.info("=" * 70)
    logging.info("✅ MODEL CONFIGURATIONS VALIDATION")
    logging.info("=" * 70)
    for model_name, model_config in MODEL_LABEL_CONFIG.items():
        logging.info(f"\n📌 {model_name.upper()}")
        logging.info("-" * 50)
        total_prompts = 0
        for category, category_config in model_config.items():
            category_prompts = 0
            for keyword, labels in category_config.items():
                category_prompts += len(labels)
                total_prompts += len(labels)
            if category_config:
                logging.info(f" 📂 {category}: {len(category_config)} keywords, {category_prompts} prompts")
        logging.info(f" 🎯 Total: {total_prompts} prompts")
        if model_name == "yolos_fashionpedia_model":
            logging.info(" ✨ YOLOS Fashionpedia specializes in fashion detection!")
    logging.info("\n" + "=" * 70)
    logging.info("🚀 FEATURE CAPABILITIES:")
    logging.info("-" * 50)
    logging.info(" • YOLOS Fashionpedia Detection: ENABLED")
    logging.info(f" • Artifact Detection: {'ENABLED' if DETECT_ARTIFACTS else 'DISABLED'}")
    logging.info(f" • Apply Draw (Visualization): {'ENABLED' if APPLY_DRAW else 'DISABLED'}")
    logging.info("=" * 70)
# ----------------------------------------------------------------------
# DETECTION PIPELINE FUNCTIONS
# ----------------------------------------------------------------------
def define_largest_box_batch(contexts, batch_logs, RTDETR_PROCESSOR, RTDETR_MODEL, RTDETR_FULL_PRECISION, DEVICE, MODELS_LOADED, LOAD_ERROR):
    function_name = "define_largest_box_batch"
    start_time = time.perf_counter()
    logging.log(LOG_LEVEL_MAP["INFO"], f"{EMOJI_MAP['INFO']} Starting {function_name} for {len(contexts)} items")
    items_for_batch = []
    valid_ctx_indices = []
    batch_items = []
    image_sizes = []
    for i, ctx in enumerate(contexts):
        item_ = {
            "image_url": ctx.url,
            "data": {"largest_box": None},
            "function": function_name
        }
        if ctx.skip_run or ctx.skip_processing:
            item_["status"] = "skipped"
            batch_logs.append(item_)
            continue
        if not MODELS_LOADED:
            # No exception is active on this path, so traceback.format_exc()
            # would only yield "NoneType: None"; record the load error text.
            error_msg = LOAD_ERROR or "Models not loaded"
            logging.error(f"CRITICAL: Model not loaded in {function_name}: {error_msg}")
            item_["status"] = "critical_error"
            item_["exception"] = error_msg
            item_["traceback"] = ""
            ctx.skip_run = True
            ctx.error = error_msg
            ctx.error_traceback = ""
            batch_logs.append(item_)
            logging.critical("Terminating due to model loading failure")
            sys.exit(1)
        if "original" not in ctx.pil_img:
            item_["status"] = "error"
            item_["exception"] = "No RBC 'original' found"
            ctx.skip_run = True
            batch_logs.append(item_)
            continue
        pi_rgba, _, _ = ctx.pil_img["original"]
        rgb_img = pi_rgba.convert("RGB")
        items_for_batch.append(rgb_img)
        image_sizes.append([rgb_img.height, rgb_img.width])
        valid_ctx_indices.append(i)
        batch_items.append(item_)
    if not items_for_batch:
        processing_time = time.perf_counter() - start_time
        logging.log(LOG_LEVEL_MAP["INFO"], f"{EMOJI_MAP['INFO']} {function_name}: No valid items for batch processing in {processing_time:.3f}s")
        return batch_logs
    try:
        results = process_rtdetr_batch(
            items_for_batch, image_sizes, RTDETR_PROCESSOR, RTDETR_MODEL, DEVICE
        )
        rtdetr_clothing_labels = get_rtdetr_clothing_labels()
        rtdetr_person_product_labels = get_rtdetr_person_and_product_labels()
        for b_idx, i_ in enumerate(valid_ctx_indices):
            ctx = contexts[i_]
            item_ = batch_items[b_idx]
            pi_rgb = items_for_batch[b_idx]
            W, H = pi_rgb.size
            detection_log = {
                "model": "RT-DETR",
                "largest_area": 0,
                "total_area": W * H,
                "extension_ratio": LARGEST_BOX_EXTENSION_RATIO,
                "threshold": LARGEST_BOX_THRESHOLD,
                "raw_detections": []
            }
            if isinstance(results, list):
                # Fall back to the first result if the batch came back short.
                result = results[b_idx] if b_idx < len(results) else results[0]
            else:
                result = results
            largest_area = 0
            main_box = None
            for score, label, box in zip(result["scores"], result["labels"], result["boxes"]):
                label_id = label.item()
                score_val = score.item()
                x1, y1, x2, y2 = [int(val) for val in box.tolist()]
                label_name = get_label_name_from_model(RTDETR_MODEL, label_id)
                if label_name in rtdetr_person_product_labels:
                    logging.log(LOG_LEVEL_MAP["INFO"], f"{EMOJI_MAP['INFO']} RT-DETR detected: {label_name} at score {score_val:.3f} | box=[{x1},{y1},{x2},{y2}]")
                    detection_log["raw_detections"].append({
                        "box": [x1, y1, x2, y2],
                        "score": round(score_val, 3),
                        "label": label_name,
                        "is_clothing": label_name in rtdetr_clothing_labels
                    })
                    if label_name in rtdetr_clothing_labels:
                        area_ = (x2 - x1) * (y2 - y1)
                        if area_ > largest_area:
                            largest_area = area_
                            main_box = [x1, y1, x2, y2]
            if main_box is not None:
                # Pad the winning clothing box by a small fraction of the
                # shorter image side, clamped to the image bounds.
                ratio_off = int(min(W, H) * LARGEST_BOX_EXTENSION_RATIO)
                x1, y1, x2, y2 = main_box
                x1e = max(0, x1 - ratio_off)
                y1e = max(0, y1 - ratio_off)
                x2e = min(W, x2 + ratio_off)
                y2e = min(H, y2 + ratio_off)
                main_box = [x1e, y1e, x2e, y2e]
                total_area = W * H
                if largest_area < total_area * (LARGEST_BOX_THRESHOLD / 100.0):
                    main_box = [0, 0, W, H]
                    logging.log(LOG_LEVEL_MAP["INFO"], f"{EMOJI_MAP['INFO']} Fallback to full image: box=[0,0,{W},{H}] | {ctx.url}")
                else:
                    logging.log(LOG_LEVEL_MAP["INFO"], f"{EMOJI_MAP['INFO']} Largest box detected: box=[{x1e},{y1e},{x2e},{y2e}] | area={largest_area} | {ctx.url}")
            else:
                main_box = [0, 0, W, H]
                logging.log(LOG_LEVEL_MAP["INFO"], f"{EMOJI_MAP['INFO']} No clothing detected, using full image: box=[0,0,{W},{H}] | {ctx.url}")
            ctx.define_result["largest_box"] = main_box
            item_["data"]["largest_box"] = main_box
            detection_log["largest_area"] = largest_area
            item_["data"]["largest_box_detection"] = detection_log
            item_["status"] = "ok"
    except Exception as e:
        error_msg = f"{function_name} error: {str(e)}"
        error_trace = traceback.format_exc()
        logging.error(f"CRITICAL: {error_msg}")
        logging.error(f"Traceback:\n{error_trace}")
        for b_idx, i_ in enumerate(valid_ctx_indices):
            ctx = contexts[i_]
            item_ = batch_items[b_idx]
            item_["status"] = "critical_error"
            item_["exception"] = error_msg
            item_["traceback"] = error_trace
            ctx.skip_run = True
            ctx.error = str(e)
            ctx.error_traceback = error_trace
        logging.critical("Terminating due to bounding box processing failure")
        sys.exit(1)
    for item_ in batch_items:
        batch_logs.append(item_)
    processing_time = time.perf_counter() - start_time
    logging.log(LOG_LEVEL_MAP["SUCCESS"], f"{EMOJI_MAP['SUCCESS']} Completed {function_name} for {len(batch_items)} items in {processing_time:.3f}s")
    return batch_logs
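# Worked example of the fallback rule above (made-up numbers): on a 1000x800
# image the total area is 800,000 px; with LARGEST_BOX_THRESHOLD at 5%, a
# 150x200 clothing box (30,000 px) falls below the 40,000 px cutoff, so the
# full frame [0, 0, 1000, 800] is used instead.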
def process_rtdetr_batch(images, image_sizes, RTDETR_PROCESSOR, RTDETR_MODEL, DEVICE):
    from .rtdetr_model import RTDETR_CONF
    try:
        rtdetr_inputs = RTDETR_PROCESSOR(images=images, return_tensors="pt").to(DEVICE)
        with torch.no_grad():
            rtdetr_outputs = RTDETR_MODEL(**rtdetr_inputs)
        rtdetr_results = RTDETR_PROCESSOR.post_process_object_detection(
            rtdetr_outputs,
            target_sizes=torch.tensor(image_sizes).to(DEVICE),
            threshold=RTDETR_CONF
        )
        return rtdetr_results
    except Exception as e:
        logging.error(f"RT-DETR batch processing error: {str(e)}")
        return []
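# Note on shapes (standard transformers behavior): image_sizes is a list of
# [height, width] pairs, one per image, and post_process_object_detection
# returns one dict per image with "scores", "labels" and "boxes" tensors,
# boxes in absolute (x1, y1, x2, y2) pixel coordinates.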
def process_single_image_detection(roi_rgb, keywords, rx1, ry1, rW, rH, detect_artifacts, detect_head,
                                   YOLOS_PROCESSOR, YOLOS_MODEL,
                                   RTDETR_PROCESSOR, RTDETR_MODEL,
                                   HEAD_PROCESSOR, HEAD_MODEL, HEAD_DETECTION_FULL_PRECISION,
                                   DEVICE):
    log_item = {"warnings": []}
    yolos_boxes, yolos_labels, yolos_scores, yolos_raws = detect_yolos_in_roi(
        roi_rgb, keywords, YOLOS_PROCESSOR, YOLOS_MODEL, DEVICE, log_item
    )
    rtdetr_boxes, rtdetr_labels, rtdetr_scores, rtdetr_raws = detect_rtdetr_in_roi(
        roi_rgb, RTDETR_PROCESSOR, RTDETR_MODEL, DEVICE, log_item
    )
    if detect_artifacts:
        artifact_boxes, artifact_labels, artifact_scores, artifact_raws = detect_rtdetr_artifacts_in_roi(
            roi_rgb, keywords, RTDETR_PROCESSOR, RTDETR_MODEL, DEVICE, log_item
        )
    else:
        artifact_boxes, artifact_labels, artifact_scores, artifact_raws = [], [], [], []
    if detect_head:
        head_boxes, head_labels, head_scores, head_raws = detect_head_in_roi(
            roi_rgb, rx1, ry1, rW, rH, HEAD_PROCESSOR, HEAD_MODEL,
            HEAD_DETECTION_FULL_PRECISION, DEVICE, log_item
        )
    else:
        head_boxes, head_labels, head_scores, head_raws = [], [], [], []
    return {
        'yolos': (yolos_boxes, yolos_labels, yolos_scores, yolos_raws),
        'rtdetr': (rtdetr_boxes, rtdetr_labels, rtdetr_scores, rtdetr_raws),
        'artifacts': (artifact_boxes, artifact_labels, artifact_scores, artifact_raws),
        'head': (head_boxes, head_labels, head_scores, head_raws),
        'warnings': log_item.get("warnings", [])
    }
def detect_batch(contexts, batch_logs,
                 HEAD_PROCESSOR, HEAD_MODEL, HEAD_DETECTION_FULL_PRECISION,
                 RTDETR_PROCESSOR, RTDETR_MODEL, RTDETR_FULL_PRECISION,
                 YOLOS_PROCESSOR, YOLOS_MODEL,
                 DEVICE, MODELS_LOADED, LOAD_ERROR):
    function_name = "detect_batch"
    start_time = time.perf_counter()
    logging.log(LOG_LEVEL_MAP["INFO"], f"{EMOJI_MAP['INFO']} Starting {function_name} for {len(contexts)} items")
    for ctx in contexts:
        log_item = {
            "image_url": ctx.url,
            "data": {
                "detection_result_log": {},
                "product_type": ctx.product_type,
                "keywords": ctx.keywords
            },
            "function": function_name
        }
        logging.log(LOG_LEVEL_MAP["INFO"], f"{EMOJI_MAP['INFO']} Processing image: {ctx.url} | Product Type: {ctx.product_type} | Keywords: {ctx.keywords}")
        if ctx.skip_run or ctx.skip_processing:
            log_item["status"] = "skipped"
            batch_logs.append(log_item)
            continue
        if "original" not in ctx.pil_img:
            log_item["status"] = "error"
            log_item["exception"] = "No RBC 'original'"
            ctx.skip_run = True
            batch_logs.append(log_item)
            continue
        pi_rgba, _, _ = ctx.pil_img["original"]
        W, H = pi_rgba.size
        largest_box = ctx.define_result.get("largest_box")
        if (not largest_box or
                not isinstance(largest_box, list) or
                len(largest_box) != 4):
            log_item["status"] = "no_detection"
            log_item["data"]["detection_result_log"] = "no_main_box_detected"
            batch_logs.append(log_item)
            continue
        rx1, ry1, rx2, ry2 = largest_box
        rW = rx2 - rx1
        rH = ry2 - ry1
        logging.log(LOG_LEVEL_MAP["INFO"], f"{EMOJI_MAP['INFO']} ROI box: [{rx1},{ry1},{rx2},{ry2}] | size: {rW}x{rH} | {ctx.url}")
        roi = pi_rgba.crop((rx1, ry1, rx2, ry2))
        roi_rgb = roi.convert("RGB")
        all_boxes = []
        all_labels = []
        all_scores = []
        all_raw = []
        all_models = []

        def box_to_global(x1, y1, x2, y2):
            # Translate ROI-relative coordinates back into full-image space.
            return [int(rx1 + x1), int(ry1 + y1), int(rx1 + x2), int(ry1 + y2)]

        detect_artifacts = DETECT_ARTIFACTS and any(kw in ctx.keywords for kw in ARTIFACTS_LIST)
        # Head detection is skipped for lower-body products.
        detect_head = ctx.product_type.lower() not in LOWER_BODY_PRODUCT_TYPES
        detection_start = time.perf_counter()
        detection_results = process_single_image_detection(
            roi_rgb, ctx.keywords, rx1, ry1, rW, rH, detect_artifacts, detect_head,
            YOLOS_PROCESSOR, YOLOS_MODEL,
            RTDETR_PROCESSOR, RTDETR_MODEL,
            HEAD_PROCESSOR, HEAD_MODEL, HEAD_DETECTION_FULL_PRECISION,
            DEVICE
        )
        yolos_boxes, yolos_labels, yolos_scores, yolos_raws = detection_results['yolos']
        rtdetr_boxes, rtdetr_labels, rtdetr_scores, rtdetr_raws = detection_results['rtdetr']
        artifact_boxes, artifact_labels, artifact_scores, artifact_raws = detection_results['artifacts']
        head_boxes, head_labels, head_scores, head_raws = detection_results['head']
        if detection_results['warnings']:
            log_item["warnings"] = log_item.get("warnings", []) + detection_results['warnings']
        detection_time = time.perf_counter() - detection_start
        logging.log(LOG_LEVEL_MAP["INFO"], f"{EMOJI_MAP['INFO']} Detection completed in {detection_time:.3f}s")
        # The first three sources return ROI-relative boxes; map them into
        # full-image coordinates and add a label-id offset per source model
        # so the origin stays recoverable from the label value alone.
        roi_sources = [
            (yolos_boxes, yolos_labels, yolos_scores, yolos_raws, 40000, "yolos_fashionpedia_model"),
            (rtdetr_boxes, rtdetr_labels, rtdetr_scores, rtdetr_raws, 10000, "rtdetr_model"),
            (artifact_boxes, artifact_labels, artifact_scores, artifact_raws, 20000, "rtdetr_artifact"),
        ]
        for boxes_, labels_, scores_, raws_, offset_, model_ in roi_sources:
            for i in range(len(boxes_)):
                x1, y1, x2, y2 = boxes_[i]
                global_box = clamp_box_to_region(box_to_global(x1, y1, x2, y2), [rx1, ry1, rx2, ry2])
                all_boxes.append(global_box)
                all_labels.append(offset_ + labels_[i])
                all_scores.append(scores_[i])
                all_raw.append(raws_[i])
                all_models.append(model_)
        # Head boxes come back already in full-image coordinates.
        for i in range(len(head_boxes)):
            all_boxes.append(head_boxes[i])
            all_labels.append(9999)
            all_scores.append(head_scores[i])
            all_raw.append(head_raws[i])
            all_models.append("head_model")
        logging.log(LOG_LEVEL_MAP["INFO"], f"{EMOJI_MAP['INFO']} Total detections before filtering: {len(all_boxes)} - YOLOS: {sum(1 for m in all_models if m == 'yolos_fashionpedia_model')}, RTDETR: {sum(1 for m in all_models if m == 'rtdetr_model')}, Head: {sum(1 for m in all_models if m == 'head_model')}")
        # First pass: map every raw label onto one of the requested keywords.
        final_keywords = [None] * len(all_boxes)
        for i_ in range(len(all_boxes)):
            if all_labels[i_] == 9999:
                if "head" in ctx.keywords:
                    final_keywords[i_] = "head"
            else:
                mapped = map_label_to_keyword(all_raw[i_], ctx.keywords, all_models[i_])
                if mapped:
                    final_keywords[i_] = mapped
        # Second pass: drop unmapped boxes and those below their per-category
        # confidence threshold.
        keep_mask = [True] * len(all_boxes)
        for i_ in range(len(all_boxes)):
            cat_ = final_keywords[i_]
            if not cat_:
                keep_mask[i_] = False
            elif all_scores[i_] < get_threshold_for_keyword(cat_):
                keep_mask[i_] = False
        fb, fl, fs, fk, fr, fm = [], [], [], [], [], []
        for i_ in range(len(all_boxes)):
            if keep_mask[i_]:
                fb.append(all_boxes[i_])
                fl.append(all_labels[i_])
                fs.append(all_scores[i_])
                fk.append(final_keywords[i_])
                fr.append(all_raw[i_])
                fm.append(all_models[i_])
        fb, fl, fs, fk, fr, fm = apply_box_overlap_filter(
            fb, fl, fs, fk, fr, fm
        )
        # Positional sanity filter: heads must sit in the upper part of the
        # image, shoes in the lower part.
        second_pass = []
        for i_ in range(len(fb)):
            cat_ = fk[i_]
            (bx1, by1, bx2, by2) = fb[i_]
            centerY = 0.5 * (by1 + by2)
            if cat_ == "head":
                if centerY <= (H * (UPPER_HEAD_FILTER / 100.0)):
                    second_pass.append(i_)
            elif cat_ == "shoes":
                if centerY >= (H * (1 - (LOWER_SHOE_FILTER / 100.0))):
                    second_pass.append(i_)
            else:
                second_pass.append(i_)
        collar_idx = [i_ for i_, kw_ in enumerate(fk) if kw_ in ["collar", "neckline"]]
        collar_boxes = [fb[x] for x in collar_idx]

        def intersect_area(a_, b_):
            xx1 = max(a_[0], b_[0])
            yy1 = max(a_[1], b_[1])
            xx2 = min(a_[2], b_[2])
            yy2 = min(a_[3], b_[3])
            return max(0, xx2 - xx1) * max(0, yy2 - yy1)

        # Drop head boxes that mostly overlap a collar/neckline detection.
        final_keep = []
        for idx2 in second_pass:
            if fk[idx2] == "head":
                hx1, hy1, hx2, hy2 = fb[idx2]
                hArea = (hx2 - hx1) * (hy2 - hy1)
                if hArea <= 0:
                    continue
                remove = False
                for cb_ in collar_boxes:
                    ia = intersect_area([hx1, hy1, hx2, hy2], cb_)
                    if ia / float(hArea) > (NECKLINE_COLLAR_HEAD / 100.0):
                        remove = True
                        break
                if not remove:
                    final_keep.append(idx2)
            else:
                final_keep.append(idx2)
        final_boxes = [fb[x] for x in final_keep]
        final_labels = [fl[x] for x in final_keep]
        final_scores = [round(fs[x], 2) for x in final_keep]
        final_kws = [fk[x] for x in final_keep]
        final_raws = [fr[x] for x in final_keep]
        final_mods = [fm[x] for x in final_keep]
        logging.log(LOG_LEVEL_MAP["INFO"], f"{EMOJI_MAP['INFO']} Final detections after filtering: {len(final_boxes)} boxes")
        for i in range(len(final_boxes)):
            box = final_boxes[i]
            logging.log(LOG_LEVEL_MAP["INFO"], f"{EMOJI_MAP['INFO']} Final detection: {final_kws[i]} ({final_raws[i]}) at score {final_scores[i]} | box=[{box[0]},{box[1]},{box[2]},{box[3]}]")
        dd_log = defaultdict(list)
        for i in range(len(final_boxes)):
            cat_ = final_kws[i]
            if cat_:
                dd_log[cat_].append({
                    "box": final_boxes[i],
                    "score": final_scores[i],
                    "raw_label": final_raws[i],
                    "model": final_mods[i]
                })
        if ctx.product_type not in dd_log:
            # The product itself was never detected; run the RT-DETR fallback
            # on the largest-box region.
            from .rtdetr_model import RTDETR_CONF
            fallback_box = ctx.define_result.get("largest_box", None)
            final_boxes, final_labels, final_scores, final_kws, final_raws, final_mods, dd_log = update_fallback_detection(
                ctx, pi_rgba, fallback_box, RTDETR_PROCESSOR, RTDETR_MODEL,
                DEVICE, RTDETR_CONF, final_boxes, final_labels, final_scores,
                final_kws, final_raws, final_mods, dd_log
            )
        detection_result = {
            "status": "ok",
            "boxes": final_boxes,
            "labels": final_labels,
            "scores": final_scores,
            "final_keywords": final_kws,
            "raw_labels": final_raws,
            "models": final_mods
        }
        is_bottom_clothing = ctx.product_type.lower() in LOWER_BODY_PRODUCT_TYPES
        category_keys = ["person_list", "product_type_list", "head_list",
                         "shoes_list", "clothing_features_list", "artifacts_list"]
        detection_log_schema = {
            "yolos_fashionpedia_model": {k: {} for k in category_keys},
            "rtdetr_model": {k: {} for k in category_keys}
        }
        if not is_bottom_clothing:
            detection_log_schema["head_model"] = {"head_list": {}}
        # Bucket every kept detection into the per-model, per-category log.
        category_map = [
            (PERSON_LIST, "person_list"),
            (PRODUCT_TYPE_LIST, "product_type_list"),
            (HEAD_LIST, "head_list"),
            (SHOES_LIST, "shoes_list"),
            (CLOTHING_FEATURES_LIST, "clothing_features_list"),
            (ARTIFACTS_LIST, "artifacts_list"),
        ]
        for i in range(len(final_boxes)):
            keyword = final_kws[i]
            raw_label = final_raws[i]
            score = final_scores[i]
            model = final_mods[i]
            if is_bottom_clothing and model == "head_model":
                continue
            model_dict = detection_log_schema.get(model)
            if model_dict is None:
                continue
            for category_set, list_key in category_map:
                if keyword in category_set and list_key in model_dict:
                    model_dict[list_key].setdefault(keyword, {})[raw_label] = score
                    break
        # Prune empty categories and empty models from the log schema.
        for model in list(detection_log_schema.keys()):
            model_dict = detection_log_schema[model]
            for cat in list(model_dict.keys()):
                if not model_dict[cat]:
                    del model_dict[cat]
            if not model_dict:
                del detection_log_schema[model]
        log_item["status"] = "ok"
        log_item["data"]["detection_result_log"] = detection_log_schema
        ctx.detection_result = detection_result
        batch_logs.append(log_item)
    processing_time = time.perf_counter() - start_time
    logging.log(LOG_LEVEL_MAP["SUCCESS"], f"{EMOJI_MAP['SUCCESS']} Completed {function_name} for {len(contexts)} items in {processing_time:.3f}s")
    return batch_logs
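# A small helper sketch (assumption: the offsets used in detect_batch are the
# only ones in play) showing how a consumer could decode a label id back to
# its source model (40000+ = YOLOS, 20000+ = RT-DETR artifacts,
# 10000+ = RT-DETR, 9999 = head model):
def _decode_label_source_sketch(label_id: int) -> str:
    if label_id == 9999:
        return "head_model"
    if label_id >= 40000:
        return "yolos_fashionpedia_model"
    if label_id >= 20000:
        return "rtdetr_artifact"
    if label_id >= 10000:
        return "rtdetr_model"
    return "unknown"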
def choose_color_for_feature_batch(contexts, batch_logs):
    function_name = "choose_color_for_feature_batch"
    start_time = time.perf_counter()
    logging.log(LOG_LEVEL_MAP["INFO"], f"{EMOJI_MAP['INFO']} Starting {function_name} for {len(contexts)} items")
    processed_count = 0
    skipped_count = 0
    error_count = 0
    for ctx in contexts:
        it_ = {
            "image_url": ctx.url,
            "function": function_name
        }
        if ctx.skip_run or ctx.skip_processing:
            it_["status"] = "skipped"
            batch_logs.append(it_)
            skipped_count += 1
            continue
        dr = ctx.detection_result
        if dr.get("status") != "ok":
            it_["status"] = "no_detection"
            batch_logs.append(it_)
            error_count += 1
            continue
        bxs = dr["boxes"]
        kws = dr["final_keywords"]
        pt = ctx.product_type
        # Precedence: exact product-type match wins, then head, shoes,
        # clothing features, artifacts; anything else defaults to blue.
        colors = []
        for kw in kws:
            if kw == pt:
                colors.append(BLUE_BOX_PRODUCT_TYPE)
            elif kw in HEAD_LIST:
                colors.append(GREEN_BOX_HEAD)
            elif kw in SHOES_LIST:
                colors.append(VIOLET_BOX_SHOES)
            elif kw in CLOTHING_FEATURES_LIST:
                colors.append(ORANGE_BOX_CLOTHING_FEATURES)
            elif kw in ARTIFACTS_LIST:
                colors.append(RED_BOX_ARTIFACTS)
            else:
                colors.append(BLUE_BOX_PRODUCT_TYPE)
        ctx.box_colors = colors
        it_["status"] = "ok"
        it_["data"] = {
            "color_assignments": {
                "total_boxes": len(bxs),
                "color_counts": {
                    "product_type": colors.count(BLUE_BOX_PRODUCT_TYPE),
                    "head": colors.count(GREEN_BOX_HEAD),
                    "shoes": colors.count(VIOLET_BOX_SHOES),
                    "clothing_features": colors.count(ORANGE_BOX_CLOTHING_FEATURES),
                    "artifacts": colors.count(RED_BOX_ARTIFACTS)
                }
            }
        }
        batch_logs.append(it_)
        processed_count += 1
    processing_time = time.perf_counter() - start_time
    logging.log(LOG_LEVEL_MAP["SUCCESS"], f"{EMOJI_MAP['SUCCESS']} Completed {function_name}: processed={processed_count}, skipped={skipped_count}, errors={error_count} in {processing_time:.3f}s")
    return batch_logs
def adjust_blue_box_batch(contexts, batch_logs):
    function_name = "adjust_blue_box_batch"
    start_time = time.perf_counter()
    logging.log(LOG_LEVEL_MAP["INFO"], f"{EMOJI_MAP['INFO']} Starting {function_name} for {len(contexts)} items")
    processed_count = 0
    skipped_count = 0
    error_count = 0
    for ctx in contexts:
        it_ = {
            "image_url": ctx.url,
            "function": function_name
        }
        if ctx.skip_run or ctx.skip_processing:
            it_["status"] = "skipped"
            batch_logs.append(it_)
            skipped_count += 1
            continue
        dr = ctx.detection_result
        if dr.get("status") != "ok":
            it_["status"] = "no_detection"
            batch_logs.append(it_)
            error_count += 1
            continue
        if "original" not in ctx.pil_img:
            it_["status"] = "error"
            it_["exception"] = "No original image found"
            batch_logs.append(it_)
            error_count += 1
            continue
        pi_rgba, _, _ = ctx.pil_img["original"]
        largest_box = ctx.define_result.get("largest_box")
        if not largest_box:
            it_["status"] = "error"
            it_["exception"] = "No main box detected"
            batch_logs.append(it_)
            error_count += 1
            continue
        final_boxes = dr["boxes"]
        final_kws = dr["final_keywords"]
        final_cols = ctx.box_colors
        bx1, by1, bx2, by2 = map(int, largest_box)
        # Find the largest head (green), shoes (violet) and product (blue)
        # boxes; they anchor the vertical extension of the blue box.
        largest_green = None
        largest_violet = None
        largest_blue = None
        areaG = 0
        areaV = 0
        areaB = 0
        for i, (xx1, yy1, xx2, yy2) in enumerate(final_boxes):
            area_ = (xx2 - xx1) * (yy2 - yy1)
            col_ = final_cols[i]
            kw_ = final_kws[i]
            if kw_ in HEAD_LIST and area_ > areaG:
                areaG = area_
                largest_green = (xx1, yy1, xx2, yy2)
            elif kw_ in SHOES_LIST and area_ > areaV:
                areaV = area_
                largest_violet = (xx1, yy1, xx2, yy2)
            if col_ == BLUE_BOX_PRODUCT_TYPE and area_ > areaB:
                areaB = area_
                largest_blue = (xx1, yy1, xx2, yy2)
        adjusted_box = largest_blue
        lines_update = {}
        adjustment_log = {
            "original_blue_box": largest_blue,
            "head_box": largest_green,
            "shoes_box": largest_violet,
            "adjustments": {}
        }
        if adjusted_box and areaB > 0:
            x1b, y1b, x2b, y2b = adjusted_box
            m = min(pi_rgba.width, pi_rgba.height)
            horiz_off = int(m * LEFT_RIGHT_BLUE_OFFSET)
            up_ofs = int(m * LOWER_UPPER_BLUE_OFFSET)
            dn_ofs = int(m * LOWER_UPPER_BLUE_OFFSET)
            # Widen horizontally, clamped to the largest (black) box.
            x1_ext = max(x1b - horiz_off, bx1)
            x2_ext = min(x2b + horiz_off, bx2)
            y1_ext = y1b
            y2_ext = y2b
            adjustment_log["adjustments"]["horizontal_offset"] = horiz_off
            adjustment_log["adjustments"]["vertical_offset"] = {"up": up_ofs, "down": dn_ofs}
            if largest_green:
                # Stretch the top up to just below the head box.
                gx1, gy1, gx2, gy2 = largest_green
                top_off = int(m * CREATE_UPPER_BLUE_LOWER_GREEN_RATIO)
                new_top = gy2 - top_off
                y1_ext = max(by1, new_top)
                adjustment_log["adjustments"]["head_top_offset"] = top_off
            else:
                y1_ext = max(y1_ext - up_ofs, by1)
                adjustment_log["adjustments"]["default_top_offset"] = up_ofs
            if largest_violet:
                # Stretch the bottom down past the shoes box.
                vx1, vy1, vx2, vy2 = largest_violet
                bot_off = int(m * CREATE_LOWER_BLUE_LOWER_VIOLET_RATIO)
                new_bot = vy2 + bot_off
                y2_ext = min(by2, max(y2_ext, new_bot))
                adjustment_log["adjustments"]["shoes_bottom_offset"] = bot_off
            else:
                y2_ext = min(y2_ext + dn_ofs, by2)
                adjustment_log["adjustments"]["default_bottom_offset"] = dn_ofs
            x1_ext = max(bx1, min(x1_ext, bx2))
            x2_ext = max(bx1, min(x2_ext, bx2))
            y1_ext = max(by1, min(y1_ext, by2))
            y2_ext = max(by1, min(y2_ext, by2))
            adjusted_box = (x1_ext, y1_ext, x2_ext, y2_ext)
            lines_update = {
                "left": f"{x1b}->{x1_ext}",
                "right": f"{x2b}->{x2_ext}",
                "upper": f"{y1b}->{y1_ext}",
                "lower": f"{y2b}->{y2_ext}"
            }
            adjustment_log["adjustments"]["before"] = [x1b, y1b, x2b, y2b]
            adjustment_log["adjustments"]["after"] = [x1_ext, y1_ext, x2_ext, y2_ext]
            largest_area = (bx2 - bx1) * (by2 - by1)
            adjusted_area = (x2_ext - x1_ext) * (y2_ext - y1_ext)
            adjustment_log["areas"] = {
                "largest_box_area": largest_area,
                "adjusted_area": adjusted_area
            }
            if largest_area > 0:
                ratio = (adjusted_area / float(largest_area)) * 100
                adjustment_log["areas"]["ratio_percent"] = round(ratio, 2)
                if ratio < BLUE_BOX_FALLBACK_THRESHOLD:
                    # The adjusted box covers too little of the largest box;
                    # fall back to the whole largest box.
                    adjusted_box = (bx1, by1, bx2, by2)
                    lines_update["fallback"] = (
                        f"Adjusted area {round(ratio, 1)}% < {BLUE_BOX_FALLBACK_THRESHOLD}%, "
                        "fallback to entire largest box."
                    )
                    adjustment_log["fallback_used"] = True
                    adjustment_log["fallback_reason"] = f"Area ratio {round(ratio, 2)}% below threshold {BLUE_BOX_FALLBACK_THRESHOLD}%"
                else:
                    adjustment_log["fallback_used"] = False
        ctx.adjusted_blue_box = adjusted_box
        it_["status"] = "ok"
        it_["lines_update"] = lines_update
        it_["data"] = {"adjustment_log": adjustment_log}
        batch_logs.append(it_)
        processed_count += 1
    processing_time = time.perf_counter() - start_time
    logging.log(LOG_LEVEL_MAP["SUCCESS"], f"{EMOJI_MAP['SUCCESS']} Completed {function_name}: processed={processed_count}, skipped={skipped_count}, errors={error_count} in {processing_time:.3f}s")
    return batch_logs
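# Worked fallback example (made-up numbers): with a 400x600 largest box
# (240,000 px) and an adjusted box of 200x350 (70,000 px), the ratio is
# ~29.17%, below BLUE_BOX_FALLBACK_THRESHOLD (33), so the whole largest
# box is used instead.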
def draw_batch(contexts, batch_logs):
    function_name = "draw_batch"
    start_time = time.perf_counter()
    logging.log(LOG_LEVEL_MAP["INFO"], f"{EMOJI_MAP['INFO']} Starting {function_name} for {len(contexts)} items")
    processed_count = 0
    skipped_count = 0
    error_count = 0
    for ctx in contexts:
        it_ = {
            "image_url": ctx.url,
            "function": function_name
        }
        if ctx.skip_run or ctx.skip_processing:
            it_["status"] = "skipped"
            batch_logs.append(it_)
            skipped_count += 1
            continue
        dr = ctx.detection_result
        if dr.get("status") != "ok":
            it_["status"] = "no_detection"
            batch_logs.append(it_)
            error_count += 1
            continue
        if "original" not in ctx.pil_img:
            it_["status"] = "error"
            it_["exception"] = "No original image found"
            batch_logs.append(it_)
            error_count += 1
            continue
        pi_rgba, orig_filename, _ = ctx.pil_img["original"]
        final_boxes = dr["boxes"]
        final_kws = dr["final_keywords"]
        final_cols = ctx.box_colors
        largest_box = ctx.define_result.get("largest_box")
        box_drawn = 0
        color_log = {
            "BLACK_BOX_PERSON": {"count": 0, "boxes": []},
            "BLUE_BOX_PRODUCT_TYPE": {"count": 0, "boxes": []},
            "GREEN_BOX_HEAD": {"count": 0, "boxes": []},
            "VIOLET_BOX_SHOES": {"count": 0, "boxes": []},
            "ORANGE_BOX_CLOTHING_FEATURES": {"count": 0, "boxes": []},
            "RED_BOX_ARTIFACTS": {"count": 0, "boxes": []},
        }
        apply_draw = APPLY_DRAW
        if apply_draw:
            d_ = ImageDraw.Draw(pi_rgba, mode="RGBA")
            # Draw all non-blue feature boxes first; the blue box is drawn
            # from its adjusted coordinates below.
            for i, (bx1, by1, bx2, by2) in enumerate(final_boxes):
                c_ = final_cols[i]
                if c_ != BLUE_BOX_PRODUCT_TYPE:
                    s_ = f"({bx1},{by1},{bx2},{by2})"
                    d_.rectangle([bx1, by1, bx2, by2], outline=c_, width=2)
                    box_drawn += 1
                    if c_ == GREEN_BOX_HEAD:
                        color_log["GREEN_BOX_HEAD"]["count"] += 1
                        color_log["GREEN_BOX_HEAD"]["boxes"].append(s_)
                    elif c_ == VIOLET_BOX_SHOES:
                        color_log["VIOLET_BOX_SHOES"]["count"] += 1
                        color_log["VIOLET_BOX_SHOES"]["boxes"].append(s_)
                    elif c_ == ORANGE_BOX_CLOTHING_FEATURES:
                        color_log["ORANGE_BOX_CLOTHING_FEATURES"]["count"] += 1
                        color_log["ORANGE_BOX_CLOTHING_FEATURES"]["boxes"].append(s_)
                    elif c_ == RED_BOX_ARTIFACTS:
                        color_log["RED_BOX_ARTIFACTS"]["count"] += 1
                        color_log["RED_BOX_ARTIFACTS"]["boxes"].append(s_)
            if ctx.adjusted_blue_box:
                abx1, aby1, abx2, aby2 = ctx.adjusted_blue_box
                s_ = f"({abx1},{aby1},{abx2},{aby2})"
                d_.rectangle([abx1, aby1, abx2, aby2], outline=BLUE_BOX_PRODUCT_TYPE, width=2)
                color_log["BLUE_BOX_PRODUCT_TYPE"]["count"] += 1
                color_log["BLUE_BOX_PRODUCT_TYPE"]["boxes"].append(s_)
                box_drawn += 1
            if largest_box:
                lx1, ly1, lx2, ly2 = largest_box
                s_ = f"({lx1},{ly1},{lx2},{ly2})"
                d_.rectangle([lx1, ly1, lx2, ly2], outline=BLACK_BOX_PERSON, width=4)
                color_log["BLACK_BOX_PERSON"]["count"] += 1
                color_log["BLACK_BOX_PERSON"]["boxes"].append(s_)
                box_drawn += 1
        ctx.pil_img["original"] = [pi_rgba, orig_filename, None]
        it_["status"] = "ok" if apply_draw else "no_drawing"
        it_["boxes_drawn"] = box_drawn
        it_["data"] = {
            "colors": color_log,
            "draw_enabled": apply_draw,
            "boxes_count": {
                "total": len(final_boxes),
                "drawn": box_drawn
            }
        }
        batch_logs.append(it_)
        processed_count += 1
    processing_time = time.perf_counter() - start_time
    logging.log(LOG_LEVEL_MAP["SUCCESS"], f"{EMOJI_MAP['SUCCESS']} Completed {function_name}: processed={processed_count}, skipped={skipped_count}, errors={error_count} in {processing_time:.3f}s")
    return batch_logs
# ----------------------------------------------------------------------
# MAIN PIPELINE FUNCTION
# ----------------------------------------------------------------------
def ensure_models_loaded():
    import app
    app.ensure_models_loaded()

pipeline_step = create_pipeline_step(ensure_models_loaded)

def bounding_box(contexts: List[ProcessingContext], batch_logs: List[dict] = None):
    if batch_logs is None:
        batch_logs = []
    start_time = time.perf_counter()
    logging.log(LOG_LEVEL_MAP["INFO"], f"{EMOJI_MAP['INFO']} Starting bounding_box pipeline for {len(contexts)} items")
    from src.models import model_loader
    RTDETR_PROCESSOR = model_loader.RTDETR_PROCESSOR
    RTDETR_MODEL = model_loader.RTDETR_MODEL
    RTDETR_FULL_PRECISION = model_loader.RTDETR_FULL_PRECISION
    HEAD_PROCESSOR = model_loader.HEAD_PROCESSOR
    HEAD_MODEL = model_loader.HEAD_MODEL
    HEAD_DETECTION_FULL_PRECISION = model_loader.HEAD_DETECTION_FULL_PRECISION
    YOLOS_PROCESSOR = model_loader.YOLOS_PROCESSOR
    YOLOS_MODEL = model_loader.YOLOS_MODEL
    DEVICE = model_loader.DEVICE
    MODELS_LOADED = model_loader.MODELS_LOADED
    LOAD_ERROR = model_loader.LOAD_ERROR
    define_largest_box_batch(
        contexts, batch_logs, RTDETR_PROCESSOR, RTDETR_MODEL,
        RTDETR_FULL_PRECISION, DEVICE, MODELS_LOADED, LOAD_ERROR
    )
    detect_batch(
        contexts, batch_logs,
        HEAD_PROCESSOR, HEAD_MODEL, HEAD_DETECTION_FULL_PRECISION,
        RTDETR_PROCESSOR, RTDETR_MODEL, RTDETR_FULL_PRECISION,
        YOLOS_PROCESSOR, YOLOS_MODEL,
        DEVICE, MODELS_LOADED, LOAD_ERROR
    )
    choose_color_for_feature_batch(contexts, batch_logs)
    adjust_blue_box_batch(contexts, batch_logs)
    draw_batch(contexts, batch_logs)
    processing_time = time.perf_counter() - start_time
    logging.log(LOG_LEVEL_MAP["SUCCESS"], f"{EMOJI_MAP['SUCCESS']} Completed bounding_box pipeline for {len(contexts)} items in {processing_time:.3f}s")
    return batch_logs
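# Minimal usage sketch (assumes ProcessingContext objects prepared upstream
# with .url, .product_type, .keywords and .pil_img["original"] populated, as
# the stages above expect):
#
#   contexts = [...]                # built by earlier pipeline stages
#   logs = bounding_box(contexts)   # runs all five detection/draw stages
#   for entry in logs:
#       print(entry["function"], entry.get("status"))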