File size: 7,025 Bytes
18faf97
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
# ----------------------------------------------------------------------
# IMPORTS
# ----------------------------------------------------------------------
import io
import time
import requests
from PIL import Image, ImageOps
from typing import List
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from src.utils import ProcessingContext, create_pipeline_step, LOG_LEVEL_MAP, EMOJI_MAP

# ----------------------------------------------------------------------
# GLOBAL CONSTANTS
# ----------------------------------------------------------------------
BATCH_DOWNLOAD_TIMEOUT = 30      # per-request timeout (seconds) passed to session.get
MAX_RETRIES = 3                  # application-level retry attempts in download_image_with_retry
RETRY_DELAY = 2                  # base delay (seconds) before the first application-level retry
BACKOFF_MULTIPLIER = 1.5         # exponential factor applied per retry attempt
MAX_RETRIES_PER_REQUEST = 2      # transport-level (urllib3 Retry) retries per HTTP request

# ----------------------------------------------------------------------
# SESSION CONFIGURATION
# ----------------------------------------------------------------------
# Module-level shared session: keep-alive lets all downloads in a batch
# reuse TCP connections instead of reconnecting per image.
session = requests.Session()
session.headers.update({
    'User-Agent': 'Mozilla/5.0 (compatible; ImageProcessor/1.0)',
    'Accept': 'image/*',
    'Accept-Encoding': 'gzip, deflate',
    'Connection': 'keep-alive'
})

# Transport-level retries (handled inside urllib3, below the application
# retry loop in download_image_with_retry): retry GETs on rate limiting
# and transient server errors.
retry_strategy = Retry(
    total=MAX_RETRIES_PER_REQUEST,
    status_forcelist=[429, 500, 502, 503, 504],
    backoff_factor=1,
    allowed_methods=["GET"]
)
adapter = HTTPAdapter(max_retries=retry_strategy)
session.mount("http://", adapter)
session.mount("https://", adapter)

# ----------------------------------------------------------------------
# CORE IMPLEMENTATION
# ----------------------------------------------------------------------
def download_image_with_retry(url, max_retries=MAX_RETRIES, timeout=BATCH_DOWNLOAD_TIMEOUT):
    """Download ``url`` through the shared session, retrying with exponential backoff.

    Args:
        url: Image URL to fetch.
        max_retries: Retries allowed after the initial attempt (total attempts
            is ``max_retries + 1``).
        timeout: Per-request timeout in seconds.

    Returns:
        Tuple ``(content_bytes, attempts_used)`` where ``attempts_used`` is
        1-based (1 means the first attempt succeeded).

    Raises:
        requests.RequestException: transport/HTTP failure on the final attempt.
        ValueError: the final response did not carry image content, or no
            attempt could be made (``max_retries < 0``).
    """
    last_exception = None

    for attempt in range(max_retries + 1):
        # Exponential backoff: no delay before the first attempt, then
        # RETRY_DELAY * BACKOFF_MULTIPLIER**attempt seconds before each retry.
        if attempt > 0:
            time.sleep(RETRY_DELAY * (BACKOFF_MULTIPLIER ** attempt))

        try:
            resp = session.get(url, timeout=timeout)
            resp.raise_for_status()

            # Reject e.g. HTML error pages served with HTTP 200.
            if "image" not in resp.headers.get("Content-Type", ""):
                raise ValueError("Non-image content received")

            return resp.content, attempt + 1

        # Only retry on retryable failures (network/HTTP errors and bad
        # content type); programming errors propagate immediately instead of
        # being swallowed by the retry loop.
        except (requests.RequestException, ValueError) as e:
            last_exception = e

    # All attempts exhausted. Guard the max_retries < 0 edge case, where the
    # loop body never ran and the original code fell through returning None.
    if last_exception is None:
        raise ValueError("Non-image content received")
    raise last_exception

def download_images_batch(contexts, batch_logs):
    """Download the image for every context, appending per-item logs.

    Side effects:
        - Stores the raw bytes on ``ctx._download_content`` for each success.
        - Sets ``ctx.skip_run`` on failed contexts; if ANY download fails,
          every context is marked ``skip_run`` (the whole batch aborts).
        - Appends one log dict per context, a summary record, and — on any
          failure — a batch-abort record, to ``batch_logs``.

    Args:
        contexts: Iterable of ProcessingContext objects with ``url``,
            ``skip_run`` and ``skip_processing`` attributes.
        batch_logs: Mutable list of log dicts, extended in place.

    Returns:
        Tuple ``(batch_logs, downloaded_count, skipped_count, error_count)``.
    """
    function_name = "download_images_batch"
    start_time = time.perf_counter()
    downloaded_count = 0
    skipped_count = 0
    error_count = 0

    for ctx in contexts:
        log_item = {
            "image_url": ctx.url,
            "function": function_name,
            "data": {}
        }

        if ctx.skip_run or ctx.skip_processing:
            log_item["status"] = "skipped"
            log_item["data"]["reason"] = "marked_for_skip"
            batch_logs.append(log_item)
            skipped_count += 1
            continue

        # Explicit sentinel instead of the previous `'download_start' in
        # locals()` check, which could pick up a stale value from an earlier
        # loop iteration.
        download_start = None
        try:
            download_start = time.perf_counter()
            content, attempts = download_image_with_retry(ctx.url)
            download_time = time.perf_counter() - download_start

            # Best-effort Content-Type lookup for logging only: a HEAD
            # failure must not discard an already-successful download.
            try:
                content_type = session.head(ctx.url, timeout=5).headers.get("Content-Type", "unknown")
            except Exception:
                content_type = "unknown"

            # Image.open is lazy; reading .size parses only the header and
            # validates that the bytes are a decodable image.
            img = Image.open(io.BytesIO(content))
            original_size = img.size

            ctx._download_content = content

            log_item["status"] = "ok"
            log_item["data"].update({
                "download_time": round(download_time, 4),
                "attempts": attempts,
                "content_size": len(content),
                "content_type": content_type,
                "image_size": original_size
            })
            downloaded_count += 1

        except Exception as e:
            log_item["status"] = "error"
            log_item["exception"] = str(e)
            elapsed = time.perf_counter() - download_start if download_start is not None else 0
            log_item["data"]["download_time"] = round(elapsed, 4)
            ctx.skip_run = True
            error_count += 1

        batch_logs.append(log_item)

    processing_time = time.perf_counter() - start_time
    # Success rate is over attempted downloads only (skipped items excluded).
    download_summary = {
        "function": "download_summary",
        "status": "info",
        "data": {
            "total_time": round(processing_time, 4),
            "downloaded_count": downloaded_count,
            "skipped_count": skipped_count,
            "error_count": error_count,
            "success_rate": f"{downloaded_count/(downloaded_count+error_count):.2%}" if (downloaded_count + error_count) > 0 else "N/A"
        }
    }
    batch_logs.append(download_summary)

    if error_count > 0:
        # All-or-nothing policy: any download failure aborts the whole batch.
        batch_abort_log = {
            "function": "batch_abort_due_to_download_failures",
            "status": "error", 
            "data": {
                "reason": "One or more images failed to download",
                "total_contexts": len(contexts),
                "download_errors": error_count,
                "downloaded_successfully": downloaded_count,
                "action": "Aborting entire batch processing"
            }
        }
        batch_logs.append(batch_abort_log)

        for ctx in contexts:
            ctx.skip_run = True

    return batch_logs, downloaded_count, skipped_count, error_count

def image_download_batch_implementation(contexts, batch_logs):
    """Adapter around download_images_batch that returns only the log list."""
    logs, _downloaded, _skipped, _errors = download_images_batch(contexts, batch_logs)
    return logs

# ----------------------------------------------------------------------
# MAIN PIPELINE FUNCTION
# ----------------------------------------------------------------------
def _ensure_models_loaded():
    """Delegate model loading to the app module.

    Imported lazily inside the function — presumably to avoid a circular
    import at module load time; confirm against app's import graph.
    """
    import app as _app
    _app.ensure_models_loaded()


# Pipeline decorator applied to the entry point below; ensures models are
# loaded before the step runs.
pipeline_step = create_pipeline_step(_ensure_models_loaded)

@pipeline_step
def image_download(
    contexts: List[ProcessingContext],
    batch_logs: List[dict] | None = None
):
    """Pipeline step: download the source image for every context.

    Logs the retry/backoff configuration, delegates the actual work to
    image_download_batch_implementation, and emits timing log messages.

    Args:
        contexts: Contexts whose ``url`` should be downloaded.
        batch_logs: Existing log list to extend; a fresh list is created
            when None.

    Returns:
        The (possibly newly created) list of log dicts.
    """
    import logging

    if batch_logs is None:
        batch_logs = []

    # Record the active download/retry tuning so each run's logs are
    # self-describing.
    batch_logs.append({
        "function": "image_download_calibration_info",
        "status": "info",
        "data": {
            "download_timeout": BATCH_DOWNLOAD_TIMEOUT,
            "max_retries": MAX_RETRIES,
            "retry_delay": RETRY_DELAY,
            "backoff_multiplier": BACKOFF_MULTIPLIER,
            "max_retries_per_request": MAX_RETRIES_PER_REQUEST
        }
    })

    started = time.perf_counter()
    logging.log(LOG_LEVEL_MAP["INFO"], f"{EMOJI_MAP['INFO']} Starting image_download for {len(contexts)} items")

    image_download_batch_implementation(contexts, batch_logs)

    elapsed = time.perf_counter() - started
    logging.log(LOG_LEVEL_MAP["SUCCESS"], f"{EMOJI_MAP['SUCCESS']} Completed image_download for {len(contexts)} items in {elapsed:.3f}s")

    return batch_logs