"""Scan pending element pairs by submitting them to the API on a thread pool.

Pairs are queued at most one per ``seconds_per_request``, completed results are
recorded with ``persistence.record_pair``, and pairs that fail are skipped
until nothing new is left to queue, at which point they are retried.
"""

import time
from concurrent.futures import Future, ThreadPoolExecutor, as_completed
from concurrent.futures import TimeoutError as FuturesTimeoutError
from typing import Generator, TypeAlias

import api
import cloudflare
from misc import log_pair, trigger_push
import db.persistence as persistence
from models import Pair, PendingPair

# Pending pairs whose API call or database write failed; skipped until cleared.
Failed: TypeAlias = set[PendingPair]
# In-flight API requests, mapped back to the pending pair each one is making.
Futures: TypeAlias = dict[Future[Pair], PendingPair]
# Request headers (from ``cloudflare.get_headers()``) handed to the API client.
Headers: TypeAlias = dict[str, str]


def valid_pending_pairs(
    allow_numbers: bool,
    *,
    failed: Failed,
    futures: Futures,
    order: persistence.PendingPairOrder,
) -> Generator[PendingPair, None, None]:
    """Yield pending pairs from the database that are eligible to be queued.

    Args:
        allow_numbers: When False, pairs whose ``numeric`` flag is set are
            skipped.
        failed: Pairs that failed earlier; these are skipped.
        futures: In-flight requests; pairs already being made are skipped.
        order: Ordering forwarded to ``persistence.select_pending_pairs``.
    """
    # A live view: pairs queued while this generator is suspended are seen too.
    # The scan loop keeps ``futures`` small (< 2 * threads), so the linear
    # membership test is cheap.
    in_flight = futures.values()
    for pending_pair in persistence.select_pending_pairs(order):
        if not allow_numbers and pending_pair.numeric:
            continue
        if pending_pair in failed or pending_pair in in_flight:
            continue
        yield pending_pair


def queue_pair(
    executor: ThreadPoolExecutor,
    pending_pair: PendingPair,
    futures: Futures,
    *,
    headers: Headers,
    timeout: float = 5,
) -> None:
    """Submit ``pending_pair`` to the API on ``executor`` and track its future.

    Args:
        executor: Pool that runs ``api.make_pair_exp_backoff``.
        pending_pair: The pair to make.
        futures: Mapping that receives ``future -> pending_pair``.
        headers: Request headers passed positionally to the API call.
        timeout: Forwarded as the ``timeout`` keyword of
            ``api.make_pair_exp_backoff`` (its meaning is defined there).
    """
    future = executor.submit(
        api.make_pair_exp_backoff,
        pending_pair,
        headers,
        timeout=timeout,
    )
    futures[future] = pending_pair


def push_one_future(
    executor: ThreadPoolExecutor,
    futures: Futures,
    *,
    allow_numbers: bool,
    failed: Failed,
    headers: Headers,
    order: persistence.PendingPairOrder,
) -> bool:
    """Queue the first eligible pending pair, if there is one.

    Returns:
        True if a pair was submitted, False if no eligible pair was found.
    """
    for pending_pair in valid_pending_pairs(
        allow_numbers,
        failed=failed,
        futures=futures,
        order=order,
    ):
        queue_pair(executor, pending_pair, futures, headers=headers)
        return True
    return False


def handle_completed_futures(
    futures: Futures,
    *,
    failed: Failed,
    timeout: float,
) -> Generator[Pair | None, None, None]:
    """Process futures as they complete, recording successful pairs.

    Each completed future is removed from ``futures``. If the API call or the
    database write fails, its pending pair is added to ``failed`` and ``None``
    is yielded. Otherwise the recorded ``Pair`` is yielded, and then progress
    is printed and logged. Every 10,000 pairs a push is triggered, followed
    by a 90-second pause.

    Args:
        futures: In-flight requests; completed entries are popped.
        failed: Receives pending pairs whose request or write failed.
        timeout: Seconds passed to ``as_completed``.

    Raises:
        TimeoutError: From ``as_completed`` if futures are still pending when
            ``timeout`` expires (``concurrent.futures.TimeoutError`` on 3.10).
    """
    n_elements, n_pairs = persistence.counts()
    log_line = f"Pairs: {n_pairs:,d} Elements: {n_elements:,d}"
    last_n_elements = n_elements

    def report_failure(message: str) -> None:
        # Pad to the status line's width so the message fully overwrites the
        # "\r"-terminated status line, then redraw the status line under it.
        print(message.ljust(len(log_line)))
        print(log_line, end="\r")

    # as_completed snapshots ``futures``, so popping entries while iterating
    # is safe.
    for future in as_completed(futures, timeout=timeout):
        pending_pair = futures.pop(future)
        try:
            pair = future.result()
        except TimeoutError:
            report_failure(f"[API TIMED OUT] {pending_pair}")
            failed.add(pending_pair)
            yield None
            continue
        except Exception as e:
            report_failure(f"[API FAILED - {e!r}] {pending_pair}")
            failed.add(pending_pair)
            yield None
            continue

        try:
            persistence.record_pair(pair)
        except Exception as e:
            report_failure(f"[DATABASE FAILED - {e!r}] {pair}")
            failed.add(pending_pair)
            yield None
            continue

        yield pair

        n_elements, n_pairs = persistence.counts()
        log_line = f"Pairs: {n_pairs:,d} Elements: {n_elements:,d}"
        pair_message = f"Pair #{n_pairs}: {pair!s}"
        print(pair_message)
        log_pair(pair_message)

        if n_elements != last_n_elements:
            # Track the latest count so later pairs in this same call are not
            # also reported as new elements.
            last_n_elements = n_elements
            result = pair.result
            element_message = (
                f"New element: {result.emoji} {result.name} "
                f"(ID {result.database_id})"
            )
            print(element_message)
            log_pair(element_message)

        if n_pairs % 10000 == 0:
            print(f"Reached {n_pairs} pairs. Sending to DB.")
            trigger_push(n_pairs)
            time.sleep(90)


def now() -> float:
    """Return the current ``time.perf_counter()`` reading in seconds."""
    return time.perf_counter()


def scan(allow_numbers: bool, seconds_per_request: float, threads: int) -> None:
    """Make every pending pair, submitting at most one request per tick.

    Keeps fewer than ``2 * threads`` requests in flight. When a result is a
    failure or is named "nothing", the pending-pair orders are rotated. When no
    new pair can be queued, previously failed pairs are retried. Returns once
    nothing is left to queue or await.

    Args:
        allow_numbers: Whether pairs flagged ``numeric`` may be made.
        seconds_per_request: Minimum delay between loop ticks.
        threads: Worker thread count (clamped to at least 1).

    Raises:
        BaseException: Anything escaping the loop (e.g. ``KeyboardInterrupt``)
            is re-raised after queued work is cancelled and running requests
            finish.
    """
    threads = max(threads, 1)
    headers: Headers = cloudflare.get_headers()
    failed: Failed = set()
    futures: Futures = {}
    # Copy so rotating the orders does not mutate the module-level constant.
    orders = persistence.PENDING_PAIR_ORDERS.copy()

    with ThreadPoolExecutor(threads) as executor:

        def shutdown() -> None:
            # Cancel queued work, then wait for the running requests,
            # reporting progress.
            executor.shutdown(False, cancel_futures=True)
            incomplete_futures = [f for f in futures if not f.done()]
            if not incomplete_futures:
                return
            n = len(incomplete_futures)
            before = now()
            print(f"[SHUTTING DOWN] 0/{n} threads terminated...", end="\r")
            for i, _ in enumerate(as_completed(incomplete_futures), 1):
                print(f"[SHUTTING DOWN] {i}/{n} threads terminated...", end="\r")
            duration = 1000 * (now() - before)
            print(f"[SHUTDOWN] {n} thread(s) completed in {duration:.2f} milliseconds.")

        try:
            while True:
                if len(futures) < threads * 2:
                    pushed = push_one_future(
                        executor,
                        futures,
                        allow_numbers=allow_numbers,
                        failed=failed,
                        headers=headers,
                        order=orders[0],
                    )
                    if not pushed:
                        if failed:
                            # Nothing new to queue: make failed pairs eligible again.
                            failed.clear()
                            continue
                        if not futures:
                            print("Completed! All possible pairs have been made!")
                            return

                next_future_at = now() + seconds_per_request
                try:
                    for pair in handle_completed_futures(
                        futures,
                        failed=failed,
                        timeout=next_future_at - now(),
                    ):
                        if not pair or pair.result.name.lower() == "nothing":
                            # Move the last ordering to the front.
                            orders.insert(0, orders.pop())
                except (TimeoutError, FuturesTimeoutError):
                    # Expected: some requests are still running at the tick
                    # deadline. The two names are distinct classes before 3.11.
                    pass
                except Exception as e:
                    # Best effort: report the error instead of hiding it, and
                    # keep scanning. Unhandled futures stay in ``futures``.
                    print(f"[SCAN ERROR - {e!r}]")

                delay_remaining = next_future_at - now()
                if delay_remaining >= 0:
                    time.sleep(delay_remaining)
        except BaseException:
            # Covers interrupts anywhere in the loop, not only during sleep,
            # so queued requests are cancelled instead of awaited.
            shutdown()
            raise


if __name__ == "__main__":
    scan(False, 0.25, 8)