# GitHub Actions
# Deploy to Hugging Face Space: product-image-update-port-10
# 18faf97
# ----------------------------------------------------------------------
# IMPORTS
# ----------------------------------------------------------------------
import io
import time
import requests
from PIL import Image, ImageOps
from typing import List
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from src.utils import ProcessingContext, create_pipeline_step, LOG_LEVEL_MAP, EMOJI_MAP
# ----------------------------------------------------------------------
# GLOBAL CONSTANTS
# ----------------------------------------------------------------------
# Per-request timeout (seconds) passed to session.get for each download.
BATCH_DOWNLOAD_TIMEOUT = 30
# Application-level retry attempts in download_image_with_retry
# (applied on top of the transport-level retries configured below).
MAX_RETRIES = 3
# Base delay (seconds) before the first application-level retry.
RETRY_DELAY = 2
# Exponential backoff multiplier applied per retry attempt.
BACKOFF_MULTIPLIER = 1.5
# Transport-level (urllib3 Retry) attempt budget on the session adapter.
MAX_RETRIES_PER_REQUEST = 2
# ----------------------------------------------------------------------
# SESSION CONFIGURATION
# ----------------------------------------------------------------------
# Shared HTTP session: gives us connection pooling plus transport-level
# retries for rate limiting (429) and transient server errors (5xx).
session = requests.Session()
session.headers.update({
    'User-Agent': 'Mozilla/5.0 (compatible; ImageProcessor/1.0)',
    'Accept': 'image/*',
    'Accept-Encoding': 'gzip, deflate',
    'Connection': 'keep-alive',
})

# Retries handled inside urllib3, below the application-level retry loop.
retry_strategy = Retry(
    allowed_methods=["GET"],
    status_forcelist=[429, 500, 502, 503, 504],
    backoff_factor=1,
    total=MAX_RETRIES_PER_REQUEST,
)
adapter = HTTPAdapter(max_retries=retry_strategy)
for _scheme in ("http://", "https://"):
    session.mount(_scheme, adapter)
# ----------------------------------------------------------------------
# CORE IMPLEMENTATION
# ----------------------------------------------------------------------
def download_image_with_retry(url, max_retries=MAX_RETRIES, timeout=BATCH_DOWNLOAD_TIMEOUT):
    """Fetch *url* through the shared session with exponential-backoff retries.

    Args:
        url: Image URL to download.
        max_retries: Extra attempts after the first one (total attempts = max_retries + 1).
        timeout: Per-request timeout in seconds.

    Returns:
        Tuple of (response body bytes, number of attempts actually used).

    Raises:
        The last exception encountered (HTTP error, timeout, or ValueError
        for a non-image Content-Type) once all attempts are exhausted.
    """
    for attempt in range(max_retries + 1):
        # Back off before every attempt after the first.
        if attempt:
            time.sleep(RETRY_DELAY * (BACKOFF_MULTIPLIER ** attempt))
        try:
            response = session.get(url, timeout=timeout)
            response.raise_for_status()
            if "image" not in response.headers.get("Content-Type", ""):
                raise ValueError("Non-image content received")
        except Exception:
            # Out of attempts: surface the current failure to the caller.
            if attempt == max_retries:
                raise
        else:
            return response.content, attempt + 1
def download_images_batch(contexts, batch_logs):
    """Download the image for every context in the batch, logging each item.

    For each context: skips items already marked to skip, otherwise downloads
    the image bytes (with retries), decodes them with PIL to capture the size,
    and stashes the raw bytes on ``ctx._download_content`` for later steps.
    Appends one log dict per item plus a summary entry to *batch_logs*; if any
    download failed, marks every context ``skip_run`` to abort the batch.

    Args:
        contexts: Iterable of processing contexts (need ``url``, ``skip_run``,
            ``skip_processing`` attributes).
        batch_logs: Mutable list of log dicts, appended to in place.

    Returns:
        Tuple of (batch_logs, downloaded_count, skipped_count, error_count).
    """
    function_name = "download_images_batch"
    start_time = time.perf_counter()
    downloaded_count = 0
    skipped_count = 0
    error_count = 0
    for ctx in contexts:
        log_item = {
            "image_url": ctx.url,
            "function": function_name,
            "data": {}
        }
        if ctx.skip_run or ctx.skip_processing:
            log_item["status"] = "skipped"
            log_item["data"]["reason"] = "marked_for_skip"
            batch_logs.append(log_item)
            skipped_count += 1
            continue
        # Initialized before the try so the except path can always report the
        # elapsed time (previously relied on a fragile `in locals()` check).
        download_start = time.perf_counter()
        try:
            content, attempts = download_image_with_retry(ctx.url)
            download_time = time.perf_counter() - download_start
            content_size = len(content)
            img = Image.open(io.BytesIO(content))
            original_size = img.size
            # Derive the MIME type from the decoded image instead of issuing a
            # second (HEAD) request: the extra round-trip was redundant and its
            # failure could turn an already-successful download into an error.
            content_type = Image.MIME.get(img.format, "unknown")
            ctx._download_content = content
            log_item["status"] = "ok"
            log_item["data"].update({
                "download_time": round(download_time, 4),
                "attempts": attempts,
                "content_size": content_size,
                "content_type": content_type,
                "image_size": original_size
            })
            downloaded_count += 1
        except Exception as e:
            log_item["status"] = "error"
            log_item["exception"] = str(e)
            log_item["data"]["download_time"] = round(time.perf_counter() - download_start, 4)
            ctx.skip_run = True
            error_count += 1
        batch_logs.append(log_item)
    processing_time = time.perf_counter() - start_time
    download_summary = {
        "function": "download_summary",
        "status": "info",
        "data": {
            "total_time": round(processing_time, 4),
            "downloaded_count": downloaded_count,
            "skipped_count": skipped_count,
            "error_count": error_count,
            # Success rate over attempted downloads only (skipped items excluded).
            "success_rate": f"{downloaded_count/(downloaded_count+error_count):.2%}" if (downloaded_count + error_count) > 0 else "N/A"
        }
    }
    batch_logs.append(download_summary)
    if error_count > 0:
        # All-or-nothing policy: any download failure aborts the whole batch.
        batch_abort_log = {
            "function": "batch_abort_due_to_download_failures",
            "status": "error",
            "data": {
                "reason": "One or more images failed to download",
                "total_contexts": len(contexts),
                "download_errors": error_count,
                "downloaded_successfully": downloaded_count,
                "action": "Aborting entire batch processing"
            }
        }
        batch_logs.append(batch_abort_log)
        for ctx in contexts:
            ctx.skip_run = True
    return batch_logs, downloaded_count, skipped_count, error_count
def image_download_batch_implementation(contexts, batch_logs):
    """Run the batch download and return only the accumulated logs.

    The per-category counts returned by download_images_batch are discarded;
    they are already recorded in the summary log entry.
    """
    logs, _downloaded, _skipped, _errors = download_images_batch(contexts, batch_logs)
    return logs
# ----------------------------------------------------------------------
# MAIN PIPELINE FUNCTION
# ----------------------------------------------------------------------
def _ensure_models_loaded():
    """Trigger the application's model loading.

    The import is deferred to call time — presumably to avoid a circular
    import at module load (NOTE(review): confirm against app.py).
    """
    import app as _app
    _app.ensure_models_loaded()


# Decorator that wires this step into the pipeline, ensuring models are
# loaded before the step body runs.
pipeline_step = create_pipeline_step(_ensure_models_loaded)
@pipeline_step
def image_download(
    contexts: List[ProcessingContext],
    batch_logs: List[dict] | None = None
):
    """Pipeline step: download the image for every context in the batch.

    Args:
        contexts: Processing contexts to download images for.
        batch_logs: Optional pre-existing log list; a new one is created
            when omitted.

    Returns:
        The log list, extended with calibration info, per-item download
        logs, and a batch summary.
    """
    import logging

    batch_logs = [] if batch_logs is None else batch_logs
    # Record the retry/timeout configuration up front so a log reader can
    # interpret the per-item attempt counts and timings.
    batch_logs.append({
        "function": "image_download_calibration_info",
        "status": "info",
        "data": {
            "download_timeout": BATCH_DOWNLOAD_TIMEOUT,
            "max_retries": MAX_RETRIES,
            "retry_delay": RETRY_DELAY,
            "backoff_multiplier": BACKOFF_MULTIPLIER,
            "max_retries_per_request": MAX_RETRIES_PER_REQUEST
        }
    })
    started = time.perf_counter()
    logging.log(LOG_LEVEL_MAP["INFO"], f"{EMOJI_MAP['INFO']} Starting image_download for {len(contexts)} items")
    image_download_batch_implementation(contexts, batch_logs)
    elapsed = time.perf_counter() - started
    logging.log(LOG_LEVEL_MAP["SUCCESS"], f"{EMOJI_MAP['SUCCESS']} Completed image_download for {len(contexts)} items in {elapsed:.3f}s")
    return batch_logs