| import time | |
| from concurrent.futures import Future, ThreadPoolExecutor, as_completed | |
| from typing import Generator, TypeAlias | |
| import api | |
| import cloudflare | |
| from misc import log_pair, trigger_push | |
| import db.persistence as persistence | |
| from models import Pair, PendingPair | |
| Failed: TypeAlias = set[PendingPair] | |
| Futures: TypeAlias = dict[Future[Pair], PendingPair] | |
| Headers: TypeAlias = dict[str, str] | |
| def valid_pending_pairs( | |
| allow_numbers: bool, | |
| *, | |
| failed: Failed, | |
| futures: Futures, | |
| order: persistence.PendingPairOrder, | |
| ) -> Generator[PendingPair, None, None]: | |
| for pending_pair in persistence.select_pending_pairs(order): | |
| if not allow_numbers and pending_pair.numeric: | |
| continue | |
| if pending_pair in failed: | |
| continue | |
| if pending_pair in futures.values(): | |
| continue | |
| yield pending_pair | |
| def queue_pair( | |
| executor: ThreadPoolExecutor, | |
| pending_pair: PendingPair, | |
| futures: Futures, | |
| *, | |
| headers: Headers, | |
| ) -> None: | |
| futures[ | |
| executor.submit( | |
| api.make_pair_exp_backoff, | |
| pending_pair, | |
| headers, | |
| timeout=5, | |
| ) | |
| ] = pending_pair | |
| def push_one_future( | |
| executor: ThreadPoolExecutor, | |
| futures: Futures, | |
| *, | |
| allow_numbers: bool, | |
| failed: Failed, | |
| headers: Headers, | |
| order: persistence.PendingPairOrder, | |
| ) -> bool: | |
| for pending_pair in valid_pending_pairs( | |
| allow_numbers, | |
| failed=failed, | |
| futures=futures, | |
| order=order, | |
| ): | |
| queue_pair(executor, pending_pair, futures, headers=headers) | |
| return True | |
| return False | |
| def handle_completed_futures( | |
| futures: Futures, | |
| *, | |
| failed: Failed, | |
| timeout: float, | |
| ) -> Generator[Pair | None, None, None]: | |
| n_elements, n_pairs = persistence.counts() | |
| log_line = f"Pairs: {n_pairs:,d} Elements: {n_elements:,d}" | |
| last_n_elements = n_elements | |
| for future in as_completed(futures, timeout=timeout): | |
| pending_pair = futures.pop(future) | |
| try: | |
| pair = future.result() | |
| except TimeoutError: | |
| print(f"[API TIMED OUT] {pending_pair}".ljust(len(log_line))) | |
| print(log_line, end="\r") | |
| failed.add(pending_pair) | |
| yield None | |
| continue | |
| except Exception as e: | |
| print(f"[API FAILED - {e!r}] {pending_pair}".ljust(len(log_line))) | |
| print(log_line, end="\r") | |
| failed.add(pending_pair) | |
| yield None | |
| continue | |
| try: | |
| persistence.record_pair(pair) | |
| except Exception as e: | |
| print(f"[DATABASE FAILED - {e!r}] {pair}".ljust(len(log_line))) | |
| print(log_line, end="\r") | |
| failed.add(pending_pair) | |
| yield None | |
| continue | |
| yield pair | |
| n_elements, n_pairs = persistence.counts() | |
| log_line = f"Pairs: {n_pairs:,d} Elements: {n_elements:,d}" | |
| print(f"Pair #{n_pairs}: {str(pair)}") | |
| log_pair(f"Pair #{n_pairs}: {str(pair)}") | |
| if n_elements != last_n_elements: | |
| res_name = pair.result.name | |
| res_emoji = pair.result.emoji | |
| res_id = pair.result.database_id | |
| print(f'New element: {res_emoji} {res_name} (ID {res_id})') | |
| log_pair(f'New element: {res_emoji} {res_name} (ID {res_id})') | |
| if n_pairs % 10000 == 0: | |
| print(f'Reached {n_pairs} pairs. Sending to DB.') | |
| trigger_push(n_pairs) | |
| time.sleep(90) | |
| def now() -> float: | |
| return time.perf_counter() | |
| def scan(allow_numbers: bool, seconds_per_request: float, threads: int) -> None: | |
| threads = max(threads, 1) | |
| headers: Headers = cloudflare.get_headers() | |
| failed: Failed = set() | |
| futures: Futures = {} | |
| orders = persistence.PENDING_PAIR_ORDERS.copy() | |
| with ThreadPoolExecutor(threads) as executor: | |
| def shutdown() -> None: | |
| executor.shutdown(False, cancel_futures=True) | |
| incomplete_futures = [f for f in futures if not f.done()] | |
| if not incomplete_futures: | |
| return | |
| n = len(incomplete_futures) | |
| before = time.perf_counter() | |
| print(f"[SHUTTING DOWN] 0/{n} threads terminated...", end="\r") | |
| for i, _ in enumerate(as_completed(incomplete_futures), 1): | |
| print(f"[SHUTTING DOWN] {i}/{n} threads terminated...", end="\r") | |
| duration = 1000 * (time.perf_counter() - before) | |
| print(f"[SHUTDOWN] {n} thread(s) completed in {duration:.2f} milliseconds.") | |
| while True: | |
| if len(futures) < threads * 2: | |
| pushed = push_one_future( | |
| executor, | |
| futures, | |
| allow_numbers=allow_numbers, | |
| failed=failed, | |
| headers=headers, | |
| order=orders[0], | |
| ) | |
| if not pushed: | |
| if failed: | |
| failed.clear() | |
| continue | |
| if not futures: | |
| print("Completed! All possible pairs have been made!") | |
| return | |
| next_future_at = now() + seconds_per_request | |
| try: | |
| for pair in handle_completed_futures( | |
| futures, | |
| failed=failed, | |
| timeout=next_future_at - now(), | |
| ): | |
| if not pair or pair.result.name.lower() == "nothing": | |
| orders.insert(0, orders.pop()) | |
| except TimeoutError: | |
| pass | |
| except Exception as e: | |
| pass | |
| delay_remaining = next_future_at - now() | |
| if delay_remaining < 0: | |
| continue | |
| try: | |
| time.sleep(delay_remaining) | |
| except: | |
| shutdown() | |
| raise | |
| if __name__ == "__main__": | |
| scan(False, 0.25, 8) | |