|
import time |
|
from concurrent.futures import Future, ThreadPoolExecutor, as_completed |
|
from typing import Generator, TypeAlias |
|
|
|
import api |
|
import cloudflare |
|
from misc import log_pair, trigger_push |
|
import db.persistence as persistence |
|
from models import Pair, PendingPair |
|
|
|
# Type aliases shared by the scanner helpers below.

# HTTP headers sent along with every API request.
Headers: TypeAlias = dict[str, str]

# Pending pairs whose API request or database write failed; they are skipped
# when selecting new work until the scan loop clears this set.
Failed: TypeAlias = set[PendingPair]

# In-flight API requests, mapping each future back to the pending pair it serves.
Futures: TypeAlias = dict[Future[Pair], PendingPair]
|
|
|
|
|
def valid_pending_pairs(
    allow_numbers: bool,
    *,
    failed: Failed,
    futures: Futures,
    order: persistence.PendingPairOrder,
) -> Generator[PendingPair, None, None]:
    """Lazily yield pending pairs that may be submitted right now.

    Candidates come from ``persistence.select_pending_pairs(order)``. A
    candidate is skipped when it is numeric while ``allow_numbers`` is false,
    when it is in ``failed``, or when a request for it is already tracked in
    ``futures``.
    """
    for candidate in persistence.select_pending_pairs(order):
        # Same short-circuit order as the individual checks: numeric, failed, in flight.
        rejected = (
            (not allow_numbers and candidate.numeric)
            or candidate in failed
            or candidate in futures.values()
        )
        if rejected:
            continue
        yield candidate
|
|
|
|
|
def queue_pair(
    executor: ThreadPoolExecutor,
    pending_pair: PendingPair,
    futures: Futures,
    *,
    headers: Headers,
) -> None:
    """Submit an API request for ``pending_pair`` and record it in ``futures``.

    The request runs ``api.make_pair_exp_backoff(pending_pair, headers,
    timeout=5)`` on ``executor``. The returned future becomes the key in
    ``futures``, with ``pending_pair`` as its value.
    """
    submitted = executor.submit(
        api.make_pair_exp_backoff,
        pending_pair,
        headers,
        timeout=5,
    )
    futures[submitted] = pending_pair
|
|
|
|
|
def push_one_future(
    executor: ThreadPoolExecutor,
    futures: Futures,
    *,
    allow_numbers: bool,
    failed: Failed,
    headers: Headers,
    order: persistence.PendingPairOrder,
) -> bool:
    """Queue the first eligible pending pair, if there is one.

    Returns:
        True when a request was submitted, False when ``valid_pending_pairs``
        produced no candidate.
    """
    exhausted = object()  # private sentinel: distinguishes "no candidate" from any yielded value
    candidates = valid_pending_pairs(
        allow_numbers,
        failed=failed,
        futures=futures,
        order=order,
    )
    first = next(candidates, exhausted)
    if first is exhausted:
        return False
    queue_pair(executor, first, futures, headers=headers)
    return True
|
|
|
|
|
def _print_failure(message: str, log_line: str) -> None:
    """Print a failure message on its own line, then redraw the status line."""
    # Pad to the status line's width so the "\r"-terminated status text is fully overwritten.
    print(message.ljust(len(log_line)))
    print(log_line, end="\r")


def handle_completed_futures(
    futures: Futures,
    *,
    failed: Failed,
    timeout: float,
) -> Generator[Pair | None, None, None]:
    """Drain finished API requests and record each resulting pair.

    Iterates ``futures`` with ``as_completed(futures, timeout=timeout)``, so the
    iteration raises ``TimeoutError`` if not every future finishes within
    ``timeout`` seconds. Each completed future is popped from ``futures``.

    After a pair is recorded, this prints and logs it, announces a new element
    when the element count grew, and calls ``trigger_push`` (then sleeps 90
    seconds) whenever the pair count is a multiple of 10,000.

    Args:
        futures: In-flight requests mapped to their pending pairs (mutated).
        failed: Set that receives pending pairs whose API call or database
            write raised (mutated).
        timeout: Seconds to wait for completions before ``as_completed`` times out.

    Yields:
        The recorded ``Pair`` on success, or ``None`` after a failure.
    """
    n_elements, n_pairs = persistence.counts()
    log_line = f"Pairs: {n_pairs:,d} Elements: {n_elements:,d}"
    last_n_elements = n_elements

    for future in as_completed(futures, timeout=timeout):
        pending_pair = futures.pop(future)

        try:
            pair = future.result()
        except TimeoutError:
            _print_failure(f"[API TIMED OUT] {pending_pair}", log_line)
            failed.add(pending_pair)
            yield None
            continue
        except Exception as e:
            _print_failure(f"[API FAILED - {e!r}] {pending_pair}", log_line)
            failed.add(pending_pair)
            yield None
            continue

        try:
            persistence.record_pair(pair)
        except Exception as e:
            _print_failure(f"[DATABASE FAILED - {e!r}] {pair}", log_line)
            failed.add(pending_pair)
            yield None
            continue

        yield pair

        n_elements, n_pairs = persistence.counts()
        log_line = f"Pairs: {n_pairs:,d} Elements: {n_elements:,d}"

        pair_message = f"Pair #{n_pairs}: {pair!s}"
        print(pair_message)
        log_pair(pair_message)

        if n_elements != last_n_elements:
            res_name = pair.result.name
            res_emoji = pair.result.emoji
            res_id = pair.result.database_id
            element_message = f'New element: {res_emoji} {res_name} (ID {res_id})'
            print(element_message)
            log_pair(element_message)
        # Track the latest count; otherwise every later pair in this drain
        # would be reported as a new element once the count grew once.
        last_n_elements = n_elements

        if n_pairs % 10000 == 0:
            print(f'Reached {n_pairs} pairs. Sending to DB.')
            trigger_push(n_pairs)
            time.sleep(90)
|
|
|
def now() -> float:
    """Return a ``time.perf_counter()`` reading in seconds.

    Only differences between two readings are meaningful; the reference point is undefined.
    """
    reading = time.perf_counter()
    return reading
|
|
|
|
|
def scan(allow_numbers: bool, seconds_per_request: float, threads: int) -> None:
    """Repeatedly submit pending pairs to the API and record the results.

    Each tick submits at most one new request while fewer than
    ``2 * threads`` are in flight. It then drains completed requests for up to
    ``seconds_per_request`` seconds and sleeps away whatever remains of the tick.

    When no eligible pending pair is left, the ``failed`` set is cleared so
    those pairs are retried. Once nothing can be submitted and nothing is in
    flight, the scan prints a completion message and returns. A failed
    request, or a result named "nothing" (case-insensitive), rotates the list
    of pending-pair orders: the last order moves to the front.

    Args:
        allow_numbers: Whether pending pairs flagged ``numeric`` may be queued.
        seconds_per_request: Length of one scheduling tick, in seconds.
        threads: Worker thread count; values below 1 are clamped to 1.

    Raises:
        Any exception that escapes the loop, including ``KeyboardInterrupt``.
        It is re-raised after queued requests are cancelled and running ones
        have finished.
    """
    # Before Python 3.11, the timeout raised by as_completed is not the
    # builtin TimeoutError; catch both so the expected per-tick timeout is
    # never mistaken for a real error.
    from concurrent.futures import TimeoutError as FuturesTimeoutError

    threads = max(threads, 1)

    headers: Headers = cloudflare.get_headers()
    failed: Failed = set()
    futures: Futures = {}

    orders = persistence.PENDING_PAIR_ORDERS.copy()

    with ThreadPoolExecutor(threads) as executor:

        def shutdown() -> None:
            """Cancel queued requests, then wait for running ones while printing progress."""
            executor.shutdown(False, cancel_futures=True)
            incomplete_futures = [f for f in futures if not f.done()]
            if not incomplete_futures:
                return

            n = len(incomplete_futures)

            before = now()
            print(f"[SHUTTING DOWN] 0/{n} threads terminated...", end="\r")
            for i, _ in enumerate(as_completed(incomplete_futures), 1):
                print(f"[SHUTTING DOWN] {i}/{n} threads terminated...", end="\r")
            duration = 1000 * (now() - before)
            print(f"[SHUTDOWN] {n} thread(s) completed in {duration:.2f} milliseconds.")

        try:
            while True:
                if len(futures) < threads * 2:
                    pushed = push_one_future(
                        executor,
                        futures,
                        allow_numbers=allow_numbers,
                        failed=failed,
                        headers=headers,
                        order=orders[0],
                    )

                    if not pushed:
                        if failed:
                            # Nothing new is eligible: make failed pairs eligible again.
                            failed.clear()
                            continue

                        if not futures:
                            print("Completed! All possible pairs have been made!")
                            return

                next_future_at = now() + seconds_per_request
                try:
                    for pair in handle_completed_futures(
                        futures,
                        failed=failed,
                        timeout=next_future_at - now(),
                    ):
                        if not pair or pair.result.name.lower() == "nothing":
                            orders.insert(0, orders.pop())
                except (TimeoutError, FuturesTimeoutError):
                    # Expected: not every in-flight request finished within this tick.
                    pass
                except Exception as e:
                    # Best-effort: report the problem and keep scanning instead of
                    # silently discarding it.
                    print(f"[SCAN ERROR - {e!r}]")

                delay_remaining = next_future_at - now()
                if delay_remaining > 0:
                    time.sleep(delay_remaining)
        except BaseException:
            # Interrupts (e.g. Ctrl-C) or crashes anywhere in the loop: cancel
            # queued work rather than letting the executor run it all on exit.
            shutdown()
            raise
|
|
|
|
|
if __name__ == "__main__":
    # Default run: skip numeric pairs, 0.25 s scheduling ticks, 8 worker threads.
    scan(allow_numbers=False, seconds_per_request=0.25, threads=8)
|
|