# cachy.py
import concurrent.futures
import functools
import random
import threading
import time
import traceback
from concurrent.futures.thread import _threads_queues

import cachetools.func
from flaskthreads.thread_helpers import has_app_context, _app_ctx_stack


def get_context():
    """Return the current Flask app context, or None outside of one."""
    return _app_ctx_stack.top if has_app_context() else None
class TPEMod(concurrent.futures.ThreadPoolExecutor):
    """ThreadPoolExecutor that propagates the Flask app context into workers."""

    def submit(self, fn, *a, **kw):
        context = get_context()

        def fnwrapper(*aa, **akw):
            # Re-enter the caller's app context (if any) inside the worker.
            if context:
                with context:
                    return fn(*aa, **akw)
            else:
                return fn(*aa, **akw)

        res = super().submit(fnwrapper, *a, **kw)
        # Hack: forget the worker queues so interpreter shutdown does not
        # join them, which would otherwise prevent Ctrl-C.
        _threads_queues.clear()
        return res


tpe = TPEMod(max_workers=256)
class AppContextThreadMod(threading.Thread):
    """Implements Thread with flask AppContext."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.app_ctx = get_context()

    def run(self):
        if self.app_ctx:
            with self.app_ctx:
                super().run()
        else:
            super().run()


Thread = AppContextThreadMod
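# Hedged usage sketch (comments only, assuming a hypothetical Flask `app`):
# work submitted to `tpe` inside a request can still read flask.g, because
# TPEMod.submit captures the pushed app context and re-enters it in the
# worker thread.
#
#   from flask import Flask, g
#   app = Flask(__name__)
#
#   @app.route('/kick')
#   def kick():
#       g.user = 'alice'
#       def job():
#           return g.user  # visible here thanks to TPEMod.submit
#       return tpe.submit(job).result()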
tm = time.monotonic

# StaleBuffer states
empty = 0        # nothing cached yet
idle = 1         # cached value present, no refresh in flight
dispatching = 2  # background refresh in flight


def tsr():
    """Sleep for a random sub-100ms interval (test helper)."""
    time.sleep(random.random() * .1)
# buffer that refreshes in the background
class StaleBuffer:
    """Serve a cached value while refreshing it in the background.

    `f` returns the value to serve; `ttr` is the age after which a background
    refresh is dispatched, `ttl` the age after which callers block on a
    synchronous refresh.
    """

    def __init__(self, f, ttr=5, ttl=10):  # time to refresh / time to live
        self.a = None             # cached value
        self.ts = tm()            # monotonic timestamp of last refresh
        self.l = threading.Lock()
        self.state = empty
        self.f = f
        self.ttr = ttr
        self.ttl = ttl
        assert ttl > ttr
    def refresh_threaded(self):
        try:
            r = self.f()
        except Exception:
            traceback.print_exc()
            with self.l:
                # Refresh failed: go back to idle and keep the stale value.
                self.state = idle
        else:
            with self.l:
                self.state = idle
                self.a = r
                self.ts = tm()

    def dispatch_refresh(self):
        tpe.submit(self.refresh_threaded)
    def get(self):
        past = tm() - self.ts
        state = self.state
        # Lock-free fast path: we can't afford expensive locking every time.
        if state == idle and past < self.ttr:
            return self.a
        elif state == dispatching:
            return self.a
        else:
            with self.l:
                # Re-read under the lock: another thread may have refreshed
                # or dispatched while we were waiting for it.
                past = tm() - self.ts
                state = self.state
                # cache is empty
                if state == empty:
                    self.a = self.f()
                    self.ts = tm()
                    self.state = idle
                # cache is not empty, no dispatch on the way
                elif state == idle:
                    if past > self.ttl:
                        # too old: refresh synchronously
                        self.a = self.f()
                        self.ts = tm()
                    elif past > self.ttr:
                        # kinda old: refresh in the background
                        self.state = dispatching
                        self.dispatch_refresh()
                    # else: cache is fresh, nothing to do
            return self.a
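# A minimal usage sketch (kept disabled, in the style of the demo blocks
# below): wrap a slow producer so callers get the cached value instantly
# while refreshes run in the background. `slow_fetch` is a hypothetical
# stand-in for an expensive call.
if 0 and __name__ == '__main__':
    def slow_fetch():
        time.sleep(2)  # pretend this is a slow API call or query
        return tm()

    buf = StaleBuffer(slow_fetch, ttr=5, ttl=60)
    print(buf.get())   # first call blocks to fill the cache
    print(buf.get())   # later calls within ttr return immediately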
# Coarse global clock: reading a variable is cheaper than calling
# time.monotonic() on every get(), at the cost of ~0.2s resolution
# (and one permanently occupied pool worker).
tmg = tm()


def update_tmg():
    global tmg
    while 1:
        tmg = tm()
        time.sleep(0.2)


tpe.submit(update_tmg)
def StaleBufferFunctional(f, ttr=10, ttl=1800):
    """Closure-based StaleBuffer: returns a get() function.

    Avoids attribute lookups and per-call time.monotonic() by comparing the
    coarse global clock `tmg` against precomputed deadlines.
    """
    a = None             # cached value
    tspttr = 0           # deadline after which a background refresh is due
    tspttl = 0           # deadline after which a blocking refresh is due
    l = threading.Lock()
    state = empty

    def update_t():
        nonlocal tspttl, tspttr
        tspttr = tmg + ttr
        tspttl = tmg + ttl

    def refresh_threaded():
        nonlocal a, state
        try:
            res = f()
        except Exception:
            traceback.print_exc()
            with l:
                # Refresh failed: go back to idle and keep the stale value.
                state = idle
        else:
            with l:
                state = idle
                a = res
                update_t()

    def dispatch_refresh():
        tpe.submit(refresh_threaded)

    def get():
        nonlocal a, state, tspttl, tspttr
        # Lock-free fast path: we can't afford expensive locking every time.
        if state == idle and tmg < tspttr:
            pass
        elif state == dispatching:
            pass
        else:
            with l:
                # `state` and `tmg` are re-read here, so checks under the
                # lock see current values.
                # cache is empty
                if state == empty:
                    a = f()
                    update_t()
                    state = idle
                # cache is not empty, no dispatch on the way
                elif state == idle:
                    if tmg > tspttl:
                        # too old: refresh synchronously
                        a = f()
                        update_t()
                    elif tmg > tspttr:
                        # kinda old: refresh in the background
                        state = dispatching
                        dispatch_refresh()
                    # else: cache is fresh, nothing to do
        return a

    return get
if 1 and __name__ == '__main__':
    from commons_static import timethis

    def by33():
        return random.random() + random.random() * 111

    sb = StaleBuffer(by33, 15, 1000)
    sbf = StaleBufferFunctional(by33)
    timethis('$by33()')
    timethis('$sb.get()')
    timethis('$sbf()')
if 0 and __name__ == '__main__':
    def kg():
        j = 1

        def k():
            nonlocal j
            j += 1
            time.sleep(1)
            return j
        return k

    sb = StaleBuffer(kg(), ttr=1, ttl=6)
    sbf = StaleBufferFunctional(kg(), ttr=1, ttl=6)
    for i in range(10):
        print('old', sb.get(), sb.state)
        print('new', sbf())
        time.sleep(0.3)
    print('stalebuf test end')
def stale_cache_old(ttr=3, ttl=6, maxsize=128):
    """Decorator: one StaleBuffer per distinct argument tuple (class-based)."""
    def stale_cache_wrapper(f):
        @cachetools.func.lru_cache(maxsize=maxsize)
        def get_stale_buffer(*a, **kw):
            def sbw():
                return f(*a, **kw)
            sb = StaleBuffer(sbw, ttr=ttr, ttl=ttl)
            return sb

        def stale_cache_inner(*a, **kw):
            sb = get_stale_buffer(*a, **kw)
            return sb.get()
        return stale_cache_inner
    return stale_cache_wrapper
def stale_cache(ttr=3, ttl=6, maxsize=128):
    """Decorator: one StaleBufferFunctional per distinct argument tuple.

    Arguments must be hashable, since functools.lru_cache keys the buffers.
    """
    def stale_cache_wrapped(f):
        @functools.lru_cache(maxsize=maxsize)
        def get_stale_buffer(*a, **kw):
            return StaleBufferFunctional(
                lambda: f(*a, **kw),
                ttr=ttr,
                ttl=ttl,
            )

        def stale_cache_inner(*a, **kw):
            return get_stale_buffer(*a, **kw)()
        return stale_cache_inner
    return stale_cache_wrapped
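# A minimal usage sketch (disabled, in the style of the demo blocks here):
# each distinct argument gets its own buffer and refreshes independently.
# `dashboard` is a hypothetical expensive loader.
if 0 and __name__ == '__main__':
    @stale_cache(ttr=10, ttl=600)
    def dashboard(user_id):
        time.sleep(1)  # pretend this is an expensive query
        return {'user': user_id, 'at': tm()}

    print(dashboard(1))  # blocks once to fill the buffer for user 1
    print(dashboard(1))  # served from cache; background refresh after ttr
    print(dashboard(2))  # separate buffer for a different argument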
if 1 and __name__ == '__main__':
    from commons_static import timethis
    print('00000' * 5)

    @stale_cache_old()
    def by33():
        return random.random() + random.random() * 111

    @stale_cache()
    def by34():
        return random.random() + random.random() * 111

    timethis('$by33()')
    timethis('$by34()')
if 0 and __name__ == '__main__':
    def return3():
        return 31234019374194

    future = tpe.submit(return3)
    print(future.result())

    j = 1
    k = 1

    # Compare the class-based ('old') and closure-based ('new') decorators.
    @stale_cache_old(ttr=1.5)
    def a(i):
        global j
        j += 1
        time.sleep(.5)
        return i * j

    @stale_cache(ttr=1.5)
    def a2(i):
        global j
        j += 1
        time.sleep(.5)
        return i * j

    @stale_cache_old(ttr=3)
    def b(n):
        global k
        k += 1
        time.sleep(.7)
        return k * n

    @stale_cache(ttr=3)
    def b2(n):
        global k
        k += 1
        time.sleep(.7)
        return k * n

    for i in range(20):
        print('old', a(3.5), b(6))
        print('new', a2(3.5), b2(6))
        time.sleep(0.4)