File size: 6,040 Bytes
d70d685 3dd82ef 700eb8c d70d685 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 |
import time
from concurrent.futures import Future, ThreadPoolExecutor, as_completed
from typing import Generator, TypeAlias
import api
import cloudflare
from misc import log_pair, trigger_push
import db.persistence as persistence
from models import Pair, PendingPair
# Headers obtained from cloudflare.get_headers() and forwarded to every API call.
Headers: TypeAlias = dict[str, str]
# Pending pairs whose last attempt failed; valid_pending_pairs skips them until
# the set is cleared.
Failed: TypeAlias = set[PendingPair]
# In-flight API calls: each submitted future maps back to the pending pair it makes.
Futures: TypeAlias = dict[Future[Pair], PendingPair]
def valid_pending_pairs(
    allow_numbers: bool,
    *,
    failed: Failed,
    futures: Futures,
    order: persistence.PendingPairOrder,
) -> Generator[PendingPair, None, None]:
    """Lazily yield pending pairs that are eligible to be queued.

    Candidates come from ``persistence.select_pending_pairs(order)``. A
    candidate is rejected when it is numeric and ``allow_numbers`` is false,
    when it is in ``failed``, or when one of ``futures`` is already making it.
    """
    # dict_values is a live view, so later changes to ``futures`` are seen.
    in_flight = futures.values()
    for candidate in persistence.select_pending_pairs(order):
        rejected = (
            (not allow_numbers and candidate.numeric)
            or candidate in failed
            or candidate in in_flight
        )
        if not rejected:
            yield candidate
def queue_pair(
    executor: ThreadPoolExecutor,
    pending_pair: PendingPair,
    futures: Futures,
    *,
    headers: Headers,
) -> None:
    """Submit an API call that makes ``pending_pair`` and track it in ``futures``.

    Runs ``api.make_pair_exp_backoff(pending_pair, headers, timeout=5)`` on
    ``executor``. The resulting future becomes a key of ``futures`` that maps
    back to ``pending_pair``.
    """
    job = executor.submit(
        api.make_pair_exp_backoff,
        pending_pair,
        headers,
        timeout=5,
    )
    futures[job] = pending_pair
def push_one_future(
    executor: ThreadPoolExecutor,
    futures: Futures,
    *,
    allow_numbers: bool,
    failed: Failed,
    headers: Headers,
    order: persistence.PendingPairOrder,
) -> bool:
    """Queue at most one eligible pending pair.

    Returns:
        True if a pair from ``valid_pending_pairs`` was submitted, False if
        that generator produced nothing.
    """
    exhausted = object()  # sentinel: distinguishes "no candidate" from any yielded value
    candidates = valid_pending_pairs(
        allow_numbers,
        failed=failed,
        futures=futures,
        order=order,
    )
    first = next(candidates, exhausted)
    if first is exhausted:
        return False
    queue_pair(executor, first, futures, headers=headers)
    return True
def _report_failure(message: str, log_line: str) -> None:
    """Print ``message`` padded over the status line, then redraw the status line."""
    # Padding to the status line's width overwrites the previous "\r" line.
    print(message.ljust(len(log_line)))
    print(log_line, end="\r")


def handle_completed_futures(
    futures: Futures,
    *,
    failed: Failed,
    timeout: float,
) -> Generator[Pair | None, None, None]:
    """Consume finished API calls from ``futures`` and record the successful ones.

    Iterates ``as_completed(futures, timeout=timeout)``. Each completed future is
    removed from ``futures``. If the call raised, or ``persistence.record_pair``
    raised, an error line is printed, the pending pair is added to ``failed``
    and ``None`` is yielded. Otherwise the pair is recorded and yielded. When
    the consumer resumes the generator, the pair is printed and passed to
    ``log_pair``. A "New element" message is emitted whenever the element count
    grew since the previous check. ``trigger_push`` is called when the pair
    count is a multiple of 10,000.

    Raises:
        concurrent.futures.TimeoutError: from ``as_completed`` when some futures
            are still unfinished after ``timeout`` seconds (this is the builtin
            ``TimeoutError`` on Python 3.11+).
    """
    n_elements, n_pairs = persistence.counts()
    log_line = f"Pairs: {n_pairs:,d} Elements: {n_elements:,d}"
    last_n_elements = n_elements
    for future in as_completed(futures, timeout=timeout):
        pending_pair = futures.pop(future)
        try:
            # The future is already done, so this never blocks; a TimeoutError
            # here was raised by the API call itself.
            pair = future.result()
        except TimeoutError:
            _report_failure(f"[API TIMED OUT] {pending_pair}", log_line)
            failed.add(pending_pair)
            yield None
            continue
        except Exception as e:
            _report_failure(f"[API FAILED - {e!r}] {pending_pair}", log_line)
            failed.add(pending_pair)
            yield None
            continue
        try:
            persistence.record_pair(pair)
        except Exception as e:
            _report_failure(f"[DATABASE FAILED - {e!r}] {pair}", log_line)
            failed.add(pending_pair)
            yield None
            continue
        yield pair

        n_elements, n_pairs = persistence.counts()
        log_line = f"Pairs: {n_pairs:,d} Elements: {n_elements:,d}"
        pair_message = f"Pair #{n_pairs}: {pair!s}"
        print(pair_message)
        log_pair(pair_message)
        if n_elements != last_n_elements:
            res_name = pair.result.name
            res_emoji = pair.result.emoji
            res_id = pair.result.database_id
            element_message = f'New element: {res_emoji} {res_name} (ID {res_id})'
            print(element_message)
            log_pair(element_message)
            # Remember the new count. Without this, every later pair in the same
            # batch would also be announced as a new element.
            last_n_elements = n_elements
        if n_pairs % 10000 == 0:
            print(f'Reached {n_pairs} pairs. Sending to DB.')
            trigger_push(n_pairs)
            # NOTE(review): presumably gives the push time to finish; this
            # blocks the scan loop for 90 s while workers keep running — confirm.
            time.sleep(90)
def now() -> float:
    """Return the current ``time.perf_counter()`` reading in seconds.

    Only differences between two readings are meaningful.
    """
    clock = time.perf_counter
    return clock()
def _shutdown_executor(executor: ThreadPoolExecutor, futures: Futures) -> None:
    """Stop ``executor`` without waiting, then report while running calls finish.

    Queued-but-unstarted calls are cancelled (``cancel_futures=True``). Any
    futures in ``futures`` that are still not done are awaited, with a progress
    counter printed on a single carriage-return-rewritten line.
    """
    executor.shutdown(False, cancel_futures=True)
    incomplete_futures = [f for f in futures if not f.done()]
    if not incomplete_futures:
        return
    n = len(incomplete_futures)
    before = now()
    print(f"[SHUTTING DOWN] 0/{n} threads terminated...", end="\r")
    for i, _ in enumerate(as_completed(incomplete_futures), 1):
        print(f"[SHUTTING DOWN] {i}/{n} threads terminated...", end="\r")
    duration = 1000 * (now() - before)
    print(f"[SHUTDOWN] {n} thread(s) completed in {duration:.2f} milliseconds.")


def scan(allow_numbers: bool, seconds_per_request: float, threads: int) -> None:
    """Make pending pairs through the API until none remain.

    Args:
        allow_numbers: When false, numeric pending pairs are skipped.
        seconds_per_request: Target length of one loop iteration. Each iteration
            submits at most one new request, waits for results, then sleeps out
            whatever remains of this interval.
        threads: Worker thread count (clamped to at least 1). Up to
            ``2 * threads`` requests may be in flight at once.

    Failed pairs are skipped until no other candidate is available. At that
    point the failed set is cleared so they are retried. After a failure, or a
    result named "nothing", the selection order is rotated (last order moves to
    the front). If any exception escapes the loop (including
    ``KeyboardInterrupt``), queued work is cancelled and running calls are
    awaited before re-raising.
    """
    # On Python < 3.11 as_completed raises this class, which is NOT the builtin
    # TimeoutError; on 3.11+ the two names refer to the same class.
    from concurrent.futures import TimeoutError as FuturesTimeoutError

    threads = max(threads, 1)
    headers: Headers = cloudflare.get_headers()
    failed: Failed = set()
    futures: Futures = {}
    orders = persistence.PENDING_PAIR_ORDERS.copy()
    with ThreadPoolExecutor(threads) as executor:
        try:
            while True:
                if len(futures) < threads * 2:
                    pushed = push_one_future(
                        executor,
                        futures,
                        allow_numbers=allow_numbers,
                        failed=failed,
                        headers=headers,
                        order=orders[0],
                    )
                    if not pushed:
                        if failed:
                            # No fresh candidates left: give failed pairs another try.
                            failed.clear()
                            continue
                        if not futures:
                            print("Completed! All possible pairs have been made!")
                            return
                next_future_at = now() + seconds_per_request
                try:
                    for pair in handle_completed_futures(
                        futures,
                        failed=failed,
                        timeout=next_future_at - now(),
                    ):
                        if not pair or pair.result.name.lower() == "nothing":
                            orders.insert(0, orders.pop())
                except (TimeoutError, FuturesTimeoutError):
                    # Expected: not every in-flight call finished within this interval.
                    pass
                except Exception as e:
                    # Keep scanning, but never hide the failure.
                    print(f"[SCAN ERROR - {e!r}]")
                delay_remaining = next_future_at - now()
                if delay_remaining >= 0:
                    time.sleep(delay_remaining)
        except BaseException:
            # Without this, the executor's __exit__ would wait for every queued request.
            _shutdown_executor(executor, futures)
            raise
if __name__ == "__main__":
    # Script entry point: skip numeric pairs, one request per 0.25 s, 8 workers.
    scan(allow_numbers=False, seconds_per_request=0.25, threads=8)
|