# ----------------------------------------------------------------------
# IMPORTS
# ----------------------------------------------------------------------
import io
import time

import requests
from PIL import Image, ImageOps
from typing import List
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

from src.utils import ProcessingContext, create_pipeline_step, LOG_LEVEL_MAP, EMOJI_MAP

# ----------------------------------------------------------------------
# GLOBAL CONSTANTS
# ----------------------------------------------------------------------
BATCH_DOWNLOAD_TIMEOUT = 30   # seconds allowed per GET request
MAX_RETRIES = 3               # application-level retries in download_image_with_retry
RETRY_DELAY = 2               # base delay (seconds) before the first retry
BACKOFF_MULTIPLIER = 1.5      # exponential growth factor between retries
MAX_RETRIES_PER_REQUEST = 2   # transport-level retries handled by urllib3

# ----------------------------------------------------------------------
# SESSION CONFIGURATION
# ----------------------------------------------------------------------
# One module-level session so keep-alive connections are reused across all
# downloads in a batch.
session = requests.Session()
session.headers.update({
    'User-Agent': 'Mozilla/5.0 (compatible; ImageProcessor/1.0)',
    'Accept': 'image/*',
    'Accept-Encoding': 'gzip, deflate',
    'Connection': 'keep-alive'
})

# Transport-level retries (connection resets, 5xx, 429) are layered under the
# application-level retry loop in download_image_with_retry.
retry_strategy = Retry(
    total=MAX_RETRIES_PER_REQUEST,
    status_forcelist=[429, 500, 502, 503, 504],
    backoff_factor=1,
    allowed_methods=["GET"]
)
adapter = HTTPAdapter(max_retries=retry_strategy)
session.mount("http://", adapter)
session.mount("https://", adapter)


# ----------------------------------------------------------------------
# CORE IMPLEMENTATION
# ----------------------------------------------------------------------
def download_image_with_retry(url, max_retries=MAX_RETRIES, timeout=BATCH_DOWNLOAD_TIMEOUT):
    """Download *url* with exponential-backoff retries.

    Args:
        url: Image URL to fetch.
        max_retries: Extra attempts after the first (total attempts = max_retries + 1).
        timeout: Per-request timeout in seconds.

    Returns:
        tuple[bytes, int]: response body and the number of attempts used.

    Raises:
        Exception: the last error encountered once all attempts are exhausted
            (HTTP errors via raise_for_status, or ValueError for non-image
            Content-Type).
    """
    last_exception = None
    for attempt in range(max_retries + 1):
        try:
            if attempt > 0:
                # FIX: first retry now waits exactly RETRY_DELAY, growing by
                # BACKOFF_MULTIPLIER per subsequent attempt. Previously the
                # first retry already included one multiplier step (3 s
                # instead of the documented 2 s base delay).
                time.sleep(RETRY_DELAY * (BACKOFF_MULTIPLIER ** (attempt - 1)))
            resp = session.get(url, timeout=timeout)
            resp.raise_for_status()
            if "image" not in resp.headers.get("Content-Type", ""):
                raise ValueError("Non-image content received")
            return resp.content, attempt + 1
        except Exception as e:
            last_exception = e
    # All attempts failed; surface the most recent failure.
    raise last_exception


def download_images_batch(contexts, batch_logs):
    """Download every context's image, appending per-item and summary logs.

    Successful downloads stash the raw bytes on ``ctx._download_content``.
    If ANY download fails, the entire batch is aborted: every context gets
    ``skip_run = True`` so downstream pipeline steps do no work.

    Args:
        contexts: iterable of ProcessingContext objects (mutated in place).
        batch_logs: list the per-item/summary log dicts are appended to.

    Returns:
        tuple: (batch_logs, downloaded_count, skipped_count, error_count)
    """
    function_name = "download_images_batch"
    start_time = time.perf_counter()
    downloaded_count = 0
    skipped_count = 0
    error_count = 0

    for ctx in contexts:
        log_item = {
            "image_url": ctx.url,
            "function": function_name,
            "data": {}
        }

        # Respect contexts already marked for skipping by earlier steps.
        if ctx.skip_run or ctx.skip_processing:
            log_item["status"] = "skipped"
            log_item["data"]["reason"] = "marked_for_skip"
            batch_logs.append(log_item)
            skipped_count += 1
            continue

        # FIX: initialise before the try so the except branch can always
        # compute an elapsed time (previous code probed locals() to guard
        # against an unbound name).
        download_start = time.perf_counter()
        try:
            content, attempts = download_image_with_retry(ctx.url)
            download_time = time.perf_counter() - download_start

            # FIX: the Content-Type probe is best-effort. Previously a failed
            # HEAD *after* a successful GET raised here, marking the context
            # as an error and aborting the whole batch.
            try:
                content_type = session.head(ctx.url, timeout=5).headers.get("Content-Type", "unknown")
            except Exception:
                content_type = "unknown"

            content_size = len(content)
            # Image.open is lazy; reading .size only parses the header, which
            # also validates that the bytes are a decodable image.
            img = Image.open(io.BytesIO(content))
            original_size = img.size

            ctx._download_content = content

            log_item["status"] = "ok"
            log_item["data"].update({
                "download_time": round(download_time, 4),
                "attempts": attempts,
                "content_size": content_size,
                "content_type": content_type,
                "image_size": original_size
            })
            downloaded_count += 1
        except Exception as e:
            log_item["status"] = "error"
            log_item["exception"] = str(e)
            log_item["data"]["download_time"] = round(time.perf_counter() - download_start, 4)
            ctx.skip_run = True
            error_count += 1

        batch_logs.append(log_item)

    processing_time = time.perf_counter() - start_time
    download_summary = {
        "function": "download_summary",
        "status": "info",
        "data": {
            "total_time": round(processing_time, 4),
            "downloaded_count": downloaded_count,
            "skipped_count": skipped_count,
            "error_count": error_count,
            "success_rate": f"{downloaded_count/(downloaded_count+error_count):.2%}" if (downloaded_count + error_count) > 0 else "N/A"
        }
    }
    batch_logs.append(download_summary)

    # All-or-nothing policy: one failed download aborts the whole batch.
    if error_count > 0:
        batch_abort_log = {
            "function": "batch_abort_due_to_download_failures",
            "status": "error",
            "data": {
                "reason": "One or more images failed to download",
                "total_contexts": len(contexts),
                "download_errors": error_count,
                "downloaded_successfully": downloaded_count,
                "action": "Aborting entire batch processing"
            }
        }
        batch_logs.append(batch_abort_log)
        for ctx in contexts:
            ctx.skip_run = True

    return batch_logs, downloaded_count, skipped_count, error_count


def image_download_batch_implementation(contexts, batch_logs):
    """Thin adapter: run the batch download and return only the log list."""
    batch_logs, downloaded, skipped, errors = download_images_batch(contexts, batch_logs)
    return batch_logs


# ----------------------------------------------------------------------
# MAIN PIPELINE FUNCTION
# ----------------------------------------------------------------------
def _ensure_models_loaded():
    # Imported lazily to avoid a circular import at module load time.
    import app
    app.ensure_models_loaded()


pipeline_step = create_pipeline_step(_ensure_models_loaded)


@pipeline_step
def image_download(
    contexts: List[ProcessingContext],
    batch_logs: List[dict] | None = None
):
    """Pipeline step: download all images referenced by *contexts*.

    Args:
        contexts: items to process; mutated in place (skip flags, payload).
        batch_logs: optional existing log list; a new one is created if None.

    Returns:
        list[dict]: the accumulated batch logs, including calibration info.
    """
    import logging

    if batch_logs is None:
        batch_logs = []

    # Record the tuning constants in effect so runs are reproducible from logs.
    calibration_info = {
        "function": "image_download_calibration_info",
        "status": "info",
        "data": {
            "download_timeout": BATCH_DOWNLOAD_TIMEOUT,
            "max_retries": MAX_RETRIES,
            "retry_delay": RETRY_DELAY,
            "backoff_multiplier": BACKOFF_MULTIPLIER,
            "max_retries_per_request": MAX_RETRIES_PER_REQUEST
        }
    }
    batch_logs.append(calibration_info)

    start_time = time.perf_counter()
    logging.log(LOG_LEVEL_MAP["INFO"], f"{EMOJI_MAP['INFO']} Starting image_download for {len(contexts)} items")

    image_download_batch_implementation(contexts, batch_logs)

    processing_time = time.perf_counter() - start_time
    logging.log(LOG_LEVEL_MAP["SUCCESS"], f"{EMOJI_MAP['SUCCESS']} Completed image_download for {len(contexts)} items in {processing_time:.3f}s")
    return batch_logs