import io
import logging
import time
from typing import List

import requests
from PIL import Image
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

from src.utils import ProcessingContext, create_pipeline_step, LOG_LEVEL_MAP, EMOJI_MAP

# Download tuning knobs.
BATCH_DOWNLOAD_TIMEOUT = 30   # per-request timeout, in seconds
MAX_RETRIES = 3               # application-level retry attempts per URL
RETRY_DELAY = 2               # base delay before the first retry, in seconds
BACKOFF_MULTIPLIER = 1.5      # exponential growth factor for the retry delay
MAX_RETRIES_PER_REQUEST = 2   # transport-level retries handled by urllib3

# Shared HTTP session: keep-alive connections plus transport-level retries
# for transient failures (rate limiting and 5xx responses).
session = requests.Session()
session.headers.update({
    'User-Agent': 'Mozilla/5.0 (compatible; ImageProcessor/1.0)',
    'Accept': 'image/*',
    'Accept-Encoding': 'gzip, deflate',
    'Connection': 'keep-alive'
})

retry_strategy = Retry(
    total=MAX_RETRIES_PER_REQUEST,
    status_forcelist=[429, 500, 502, 503, 504],
    backoff_factor=1,
    allowed_methods=["GET"]
)
adapter = HTTPAdapter(max_retries=retry_strategy)
session.mount("http://", adapter)
session.mount("https://", adapter)


def download_image_with_retry(url, max_retries=MAX_RETRIES, timeout=BATCH_DOWNLOAD_TIMEOUT):
    """Download url with exponential backoff.

    Returns (content bytes, attempts used, Content-Type). Raises the last
    exception if every attempt fails.
    """
    last_exception = None

    for attempt in range(max_retries + 1):
        try:
            # Sleep before each retry (never before the first attempt),
            # growing the delay exponentially.
            if attempt > 0:
                time.sleep(RETRY_DELAY * (BACKOFF_MULTIPLIER ** attempt))

            resp = session.get(url, timeout=timeout)
            resp.raise_for_status()

            # Reject responses that are not images (e.g. HTML error pages
            # served with a 200 status).
            content_type = resp.headers.get("Content-Type", "")
            if "image" not in content_type:
                raise ValueError("Non-image content received")

            return resp.content, attempt + 1, content_type

        except Exception as e:
            last_exception = e
            if attempt >= max_retries:
                raise last_exception
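
# Note: with RETRY_DELAY=2 and BACKOFF_MULTIPLIER=1.5, the sleeps before
# retries 1..3 are 3.0s, 4.5s, and 6.75s. Because the session also applies
# urllib3's transport-level Retry, a single call can issue up to
# (MAX_RETRIES + 1) * (MAX_RETRIES_PER_REQUEST + 1) HTTP requests for one
# URL (12 with the defaults above).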


def download_images_batch(contexts, batch_logs):
    """Download each context's image, appending per-item and summary logs.

    The batch is all-or-nothing: if any download fails, every context is
    marked skip_run so downstream steps abort the whole batch.
    """
    function_name = "download_images_batch"
    start_time = time.perf_counter()
    downloaded_count = 0
    skipped_count = 0
    error_count = 0

    for ctx in contexts:
        log_item = {
            "image_url": ctx.url,
            "function": function_name,
            "data": {}
        }

        if ctx.skip_run or ctx.skip_processing:
            log_item["status"] = "skipped"
            log_item["data"]["reason"] = "marked_for_skip"
            batch_logs.append(log_item)
            skipped_count += 1
            continue

        download_start = time.perf_counter()
        try:
            # Content-Type comes from the GET response itself, so no extra
            # HEAD round-trip is needed.
            content, attempts, content_type = download_image_with_retry(ctx.url)
            download_time = time.perf_counter() - download_start
            content_size = len(content)

            # Open lazily just to read the dimensions; later steps perform
            # the full decode.
            img = Image.open(io.BytesIO(content))
            original_size = img.size

            # Stash the raw bytes on the context for downstream steps.
            ctx._download_content = content

            log_item["status"] = "ok"
            log_item["data"].update({
                "download_time": round(download_time, 4),
                "attempts": attempts,
                "content_size": content_size,
                "content_type": content_type,
                "image_size": original_size
            })
            downloaded_count += 1

        except Exception as e:
            log_item["status"] = "error"
            log_item["exception"] = str(e)
            log_item["data"]["download_time"] = round(time.perf_counter() - download_start, 4)
            ctx.skip_run = True
            error_count += 1

        batch_logs.append(log_item)

    processing_time = time.perf_counter() - start_time
    download_summary = {
        "function": "download_summary",
        "status": "info",
        "data": {
            "total_time": round(processing_time, 4),
            "downloaded_count": downloaded_count,
            "skipped_count": skipped_count,
            "error_count": error_count,
            "success_rate": f"{downloaded_count / (downloaded_count + error_count):.2%}" if (downloaded_count + error_count) > 0 else "N/A"
        }
    }
    batch_logs.append(download_summary)

    if error_count > 0:
        batch_abort_log = {
            "function": "batch_abort_due_to_download_failures",
            "status": "error",
            "data": {
                "reason": "One or more images failed to download",
                "total_contexts": len(contexts),
                "download_errors": error_count,
                "downloaded_successfully": downloaded_count,
                "action": "Aborting entire batch processing"
            }
        }
        batch_logs.append(batch_abort_log)

        # All-or-nothing semantics: a single failed download voids the batch.
        for ctx in contexts:
            ctx.skip_run = True

    return batch_logs, downloaded_count, skipped_count, error_count


def image_download_batch_implementation(contexts, batch_logs):
    batch_logs, _downloaded, _skipped, _errors = download_images_batch(contexts, batch_logs)
    return batch_logs


def _ensure_models_loaded():
    # Imported here rather than at module top so `app` is only loaded when a
    # pipeline step actually runs.
    import app
    app.ensure_models_loaded()


pipeline_step = create_pipeline_step(_ensure_models_loaded)


@pipeline_step
def image_download(
    contexts: List[ProcessingContext],
    batch_logs: List[dict] | None = None
):
    """Pipeline step: download the source image for every context."""
    if batch_logs is None:
        batch_logs = []

    # Record the tuning parameters in effect so each batch log is
    # self-describing.
    calibration_info = {
        "function": "image_download_calibration_info",
        "status": "info",
        "data": {
            "download_timeout": BATCH_DOWNLOAD_TIMEOUT,
            "max_retries": MAX_RETRIES,
            "retry_delay": RETRY_DELAY,
            "backoff_multiplier": BACKOFF_MULTIPLIER,
            "max_retries_per_request": MAX_RETRIES_PER_REQUEST
        }
    }
    batch_logs.append(calibration_info)

    start_time = time.perf_counter()
    logging.log(LOG_LEVEL_MAP["INFO"], f"{EMOJI_MAP['INFO']} Starting image_download for {len(contexts)} items")

    image_download_batch_implementation(contexts, batch_logs)

    processing_time = time.perf_counter() - start_time
    logging.log(LOG_LEVEL_MAP["SUCCESS"], f"{EMOJI_MAP['SUCCESS']} Completed image_download for {len(contexts)} items in {processing_time:.3f}s")

    return batch_logs
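

# Minimal usage sketch (hypothetical): the ProcessingContext constructor and
# the field names below are assumptions for illustration; check src.utils for
# the real API.
#
#     contexts = [ProcessingContext(url=u) for u in [
#         "https://example.com/a.jpg",
#         "https://example.com/b.png",
#     ]]
#     logs = image_download(contexts)
#     for entry in logs:
#         print(entry.get("function"), entry.get("status"))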