ThongCoder commited on
Commit
d70d685
·
verified ·
1 Parent(s): 709c473

Update scan.py

Browse files
Files changed (1) hide show
  1. scan.py +202 -202
scan.py CHANGED
@@ -1,202 +1,202 @@
1
- import time
2
- from concurrent.futures import Future, ThreadPoolExecutor, as_completed
3
- from typing import Generator, TypeAlias
4
-
5
- import api
6
- import cloudflare
7
- from misc import log_pair, trigger_push
8
- import db.persistence as persistence
9
- from models import Pair, PendingPair
10
-
11
- Failed: TypeAlias = set[PendingPair]
12
- Futures: TypeAlias = dict[Future[Pair], PendingPair]
13
- Headers: TypeAlias = dict[str, str]
14
-
15
-
16
- def valid_pending_pairs(
17
- allow_numbers: bool,
18
- *,
19
- failed: Failed,
20
- futures: Futures,
21
- order: persistence.PendingPairOrder,
22
- ) -> Generator[PendingPair, None, None]:
23
- for pending_pair in persistence.select_pending_pairs(order):
24
- if not allow_numbers and pending_pair.numeric:
25
- continue
26
-
27
- if pending_pair in failed:
28
- continue
29
-
30
- if pending_pair in futures.values():
31
- continue
32
-
33
- yield pending_pair
34
-
35
-
36
- def queue_pair(
37
- executor: ThreadPoolExecutor,
38
- pending_pair: PendingPair,
39
- futures: Futures,
40
- *,
41
- headers: Headers,
42
- ) -> None:
43
- futures[
44
- executor.submit(
45
- api.make_pair_exp_backoff,
46
- pending_pair,
47
- headers,
48
- timeout=5,
49
- )
50
- ] = pending_pair
51
-
52
-
53
- def push_one_future(
54
- executor: ThreadPoolExecutor,
55
- futures: Futures,
56
- *,
57
- allow_numbers: bool,
58
- failed: Failed,
59
- headers: Headers,
60
- order: persistence.PendingPairOrder,
61
- ) -> bool:
62
- for pending_pair in valid_pending_pairs(
63
- allow_numbers,
64
- failed=failed,
65
- futures=futures,
66
- order=order,
67
- ):
68
- queue_pair(executor, pending_pair, futures, headers=headers)
69
- return True
70
- return False
71
-
72
-
73
- def handle_completed_futures(
74
- futures: Futures,
75
- *,
76
- failed: Failed,
77
- timeout: float,
78
- ) -> Generator[Pair | None, None, None]:
79
- n_elements, n_pairs = persistence.counts()
80
- log_line = f"Pairs: {n_pairs:,d} Elements: {n_elements:,d}"
81
- last_n_elements = n_elements
82
- for future in as_completed(futures, timeout=timeout):
83
- pending_pair = futures.pop(future)
84
- try:
85
- pair = future.result()
86
- except TimeoutError:
87
- print(f"[API TIMED OUT] {pending_pair}".ljust(len(log_line)))
88
- print(log_line, end="\r")
89
- failed.add(pending_pair)
90
- yield None
91
- continue
92
- except Exception as e:
93
- print(f"[API FAILED - {e!r}] {pending_pair}".ljust(len(log_line)))
94
- print(log_line, end="\r")
95
- failed.add(pending_pair)
96
- yield None
97
- continue
98
-
99
- try:
100
- persistence.record_pair(pair)
101
- except Exception as e:
102
- print(f"[DATABASE FAILED - {e!r}] {pair}".ljust(len(log_line)))
103
- print(log_line, end="\r")
104
- failed.add(pending_pair)
105
- yield None
106
- continue
107
-
108
- yield pair
109
-
110
- n_elements, n_pairs = persistence.counts()
111
- log_line = f"Pairs: {n_pairs:,d} Elements: {n_elements:,d}"
112
-
113
- print(f"Pair #{n_pairs}: {str(pair)}")
114
- log_pair(f"Pair #{n_pairs}: {str(pair)}")
115
- if n_elements != last_n_elements:
116
- res_name = pair.result.name
117
- res_emoji = pair.result.emoji
118
- res_id = pair.result.database_id
119
- print(f'New element: {res_emoji} {res_name} (ID {res_id})')
120
- log_pair(f'New element: {res_emoji} {res_name} (ID {res_id})')
121
- if n_pairs % 10000 == 0:
122
- print(f'Reached {n_pairs} pairs. Sending to DB.')
123
- trigger_push()
124
- time.sleep(90)
125
-
126
- def now() -> float:
127
- return time.perf_counter()
128
-
129
-
130
- def scan(allow_numbers: bool, seconds_per_request: float, threads: int) -> None:
131
- threads = max(threads, 1)
132
-
133
- headers: Headers = cloudflare.get_headers()
134
- failed: Failed = set()
135
- futures: Futures = {}
136
-
137
- orders = persistence.PENDING_PAIR_ORDERS.copy()
138
-
139
- with ThreadPoolExecutor(threads) as executor:
140
-
141
- def shutdown() -> None:
142
- executor.shutdown(False, cancel_futures=True)
143
- incomplete_futures = [f for f in futures if not f.done()]
144
- if not incomplete_futures:
145
- return
146
-
147
- n = len(incomplete_futures)
148
-
149
- before = time.perf_counter()
150
- print(f"[SHUTTING DOWN] 0/{n} threads terminated...", end="\r")
151
- for i, _ in enumerate(as_completed(incomplete_futures), 1):
152
- print(f"[SHUTTING DOWN] {i}/{n} threads terminated...", end="\r")
153
- duration = 1000 * (time.perf_counter() - before)
154
- print(f"[SHUTDOWN] {n} thread(s) completed in {duration:.2f} milliseconds.")
155
-
156
- while True:
157
- if len(futures) < threads * 2:
158
- pushed = push_one_future(
159
- executor,
160
- futures,
161
- allow_numbers=allow_numbers,
162
- failed=failed,
163
- headers=headers,
164
- order=orders[0],
165
- )
166
-
167
- if not pushed:
168
- if failed:
169
- failed.clear()
170
- continue
171
-
172
- if not futures:
173
- print("Completed! All possible pairs have been made!")
174
- return
175
-
176
- next_future_at = now() + seconds_per_request
177
- try:
178
- for pair in handle_completed_futures(
179
- futures,
180
- failed=failed,
181
- timeout=next_future_at - now(),
182
- ):
183
- if not pair or pair.result.name.lower() == "nothing":
184
- orders.insert(0, orders.pop())
185
- except TimeoutError:
186
- pass
187
- except Exception as e:
188
- pass
189
-
190
- delay_remaining = next_future_at - now()
191
- if delay_remaining < 0:
192
- continue
193
-
194
- try:
195
- time.sleep(delay_remaining)
196
- except:
197
- shutdown()
198
- raise
199
-
200
-
201
- if __name__ == "__main__":
202
- scan(False, 0.25, 8)
 
1
+ import time
2
+ from concurrent.futures import Future, ThreadPoolExecutor, as_completed
3
+ from typing import Generator, TypeAlias
4
+
5
+ import api
6
+ import cloudflare
7
+ from misc import log_pair, trigger_push
8
+ import db.persistence as persistence
9
+ from models import Pair, PendingPair
10
+
11
+ Failed: TypeAlias = set[PendingPair]
12
+ Futures: TypeAlias = dict[Future[Pair], PendingPair]
13
+ Headers: TypeAlias = dict[str, str]
14
+
15
+
16
+ def valid_pending_pairs(
17
+ allow_numbers: bool,
18
+ *,
19
+ failed: Failed,
20
+ futures: Futures,
21
+ order: persistence.PendingPairOrder,
22
+ ) -> Generator[PendingPair, None, None]:
23
+ for pending_pair in persistence.select_pending_pairs(order):
24
+ if not allow_numbers and pending_pair.numeric:
25
+ continue
26
+
27
+ if pending_pair in failed:
28
+ continue
29
+
30
+ if pending_pair in futures.values():
31
+ continue
32
+
33
+ yield pending_pair
34
+
35
+
36
+ def queue_pair(
37
+ executor: ThreadPoolExecutor,
38
+ pending_pair: PendingPair,
39
+ futures: Futures,
40
+ *,
41
+ headers: Headers,
42
+ ) -> None:
43
+ futures[
44
+ executor.submit(
45
+ api.make_pair_exp_backoff,
46
+ pending_pair,
47
+ headers,
48
+ timeout=5,
49
+ )
50
+ ] = pending_pair
51
+
52
+
53
+ def push_one_future(
54
+ executor: ThreadPoolExecutor,
55
+ futures: Futures,
56
+ *,
57
+ allow_numbers: bool,
58
+ failed: Failed,
59
+ headers: Headers,
60
+ order: persistence.PendingPairOrder,
61
+ ) -> bool:
62
+ for pending_pair in valid_pending_pairs(
63
+ allow_numbers,
64
+ failed=failed,
65
+ futures=futures,
66
+ order=order,
67
+ ):
68
+ queue_pair(executor, pending_pair, futures, headers=headers)
69
+ return True
70
+ return False
71
+
72
+
73
+ def handle_completed_futures(
74
+ futures: Futures,
75
+ *,
76
+ failed: Failed,
77
+ timeout: float,
78
+ ) -> Generator[Pair | None, None, None]:
79
+ n_elements, n_pairs = persistence.counts()
80
+ log_line = f"Pairs: {n_pairs:,d} Elements: {n_elements:,d}"
81
+ last_n_elements = n_elements
82
+ for future in as_completed(futures, timeout=timeout):
83
+ pending_pair = futures.pop(future)
84
+ try:
85
+ pair = future.result()
86
+ except TimeoutError:
87
+ print(f"[API TIMED OUT] {pending_pair}".ljust(len(log_line)))
88
+ print(log_line, end="\r")
89
+ failed.add(pending_pair)
90
+ yield None
91
+ continue
92
+ except Exception as e:
93
+ print(f"[API FAILED - {e!r}] {pending_pair}".ljust(len(log_line)))
94
+ print(log_line, end="\r")
95
+ failed.add(pending_pair)
96
+ yield None
97
+ continue
98
+
99
+ try:
100
+ persistence.record_pair(pair)
101
+ except Exception as e:
102
+ print(f"[DATABASE FAILED - {e!r}] {pair}".ljust(len(log_line)))
103
+ print(log_line, end="\r")
104
+ failed.add(pending_pair)
105
+ yield None
106
+ continue
107
+
108
+ yield pair
109
+
110
+ n_elements, n_pairs = persistence.counts()
111
+ log_line = f"Pairs: {n_pairs:,d} Elements: {n_elements:,d}"
112
+
113
+ print(f"Pair #{n_pairs}: {str(pair)}")
114
+ log_pair(f"Pair #{n_pairs}: {str(pair)}")
115
+ if n_elements != last_n_elements:
116
+ res_name = pair.result.name
117
+ res_emoji = pair.result.emoji
118
+ res_id = pair.result.database_id
119
+ print(f'New element: {res_emoji} {res_name} (ID {res_id})')
120
+ log_pair(f'New element: {res_emoji} {res_name} (ID {res_id})')
121
+ if n_pairs % 10000 == 0:
122
+ print(f'Reached {n_pairs} pairs. Sending to DB.')
123
+ trigger_push(n_pairs)
124
+ time.sleep(90)
125
+
126
+ def now() -> float:
127
+ return time.perf_counter()
128
+
129
+
130
+ def scan(allow_numbers: bool, seconds_per_request: float, threads: int) -> None:
131
+ threads = max(threads, 1)
132
+
133
+ headers: Headers = cloudflare.get_headers()
134
+ failed: Failed = set()
135
+ futures: Futures = {}
136
+
137
+ orders = persistence.PENDING_PAIR_ORDERS.copy()
138
+
139
+ with ThreadPoolExecutor(threads) as executor:
140
+
141
+ def shutdown() -> None:
142
+ executor.shutdown(False, cancel_futures=True)
143
+ incomplete_futures = [f for f in futures if not f.done()]
144
+ if not incomplete_futures:
145
+ return
146
+
147
+ n = len(incomplete_futures)
148
+
149
+ before = time.perf_counter()
150
+ print(f"[SHUTTING DOWN] 0/{n} threads terminated...", end="\r")
151
+ for i, _ in enumerate(as_completed(incomplete_futures), 1):
152
+ print(f"[SHUTTING DOWN] {i}/{n} threads terminated...", end="\r")
153
+ duration = 1000 * (time.perf_counter() - before)
154
+ print(f"[SHUTDOWN] {n} thread(s) completed in {duration:.2f} milliseconds.")
155
+
156
+ while True:
157
+ if len(futures) < threads * 2:
158
+ pushed = push_one_future(
159
+ executor,
160
+ futures,
161
+ allow_numbers=allow_numbers,
162
+ failed=failed,
163
+ headers=headers,
164
+ order=orders[0],
165
+ )
166
+
167
+ if not pushed:
168
+ if failed:
169
+ failed.clear()
170
+ continue
171
+
172
+ if not futures:
173
+ print("Completed! All possible pairs have been made!")
174
+ return
175
+
176
+ next_future_at = now() + seconds_per_request
177
+ try:
178
+ for pair in handle_completed_futures(
179
+ futures,
180
+ failed=failed,
181
+ timeout=next_future_at - now(),
182
+ ):
183
+ if not pair or pair.result.name.lower() == "nothing":
184
+ orders.insert(0, orders.pop())
185
+ except TimeoutError:
186
+ pass
187
+ except Exception as e:
188
+ pass
189
+
190
+ delay_remaining = next_future_at - now()
191
+ if delay_remaining < 0:
192
+ continue
193
+
194
+ try:
195
+ time.sleep(delay_remaining)
196
+ except:
197
+ shutdown()
198
+ raise
199
+
200
+
201
+ if __name__ == "__main__":
202
+ scan(False, 0.25, 8)