-
Notifications
You must be signed in to change notification settings - Fork 0
/
__init__.py
132 lines (116 loc) · 4.79 KB
/
__init__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
from functools import wraps
from typing import Any, Union
from kthread_sleep import sleep
import sys
from collections import deque
import threading
repeatdec_vars = sys.modules[__name__]
repeatdec_vars.active = lambda: threading.active_count()
repeatdec_vars.lock = threading.RLock()
def repeat_func(
f_py: Any = None,
repeat_time: Union[float, int] = 1.0,
variablename: str = "threadresults",
activate_lock: bool = False,
ignore_exceptions: bool = False,
exception_value: Any = None,
max_len_allresults: Union[int, None] = None,
print_results: bool = True,
print_exceptions: bool = True,
execution_limit: int = -1,
max_concurrent_threads: int = -1,
check_max_threads_every_n_seconds: Union[float, int] = 0.015,
) -> Any:
"""Summary of repeat_func.
Args:
f_py (Any)
Description Don't use - reserved for the function
Default None
repeat_time (Union[float,int])
Description Execute each n seconds
Default 1.0
variablename (str)
Description Creates a dict for the results in repeatdec_vars - Use repeatdecorator.repeatdec_vars.variablename to access it
Default "threadresults"
activate_lock (bool)
Description Threading Rlock
Default False
ignore_exceptions (bool)
Description Continue on Exceptions?
Default False
exception_value (Any)
Description Ignored if ignore_exceptions is False
Default None
max_len_allresults (Union[int,None])
Description If None, results are stored in a list (no limit), if int, a deque is used to store
the results. Results can be found here: repeatdecorator.repeatdec_vars.variablename["results"]
Make sure to save the results before calling the function again.
Default None
print_results (bool)
Description Print the return value from each function?
Default True
print_exceptions (bool)
Description Print Exceptions
Default True
execution_limit (int)
Description Stop timer after n executions / -1 = No limit
Default -1
max_concurrent_threads (int)
Description Thread limit / -1 = No limit
Default -1
check_max_threads_every_n_seconds (Union[float,int])
Description Sleep time before checking the current number of threads again.
Default 0.015
Returns:
Any: Description of return value
"""
assert callable(f_py) or f_py is None
setattr(repeatdec_vars, variablename, {})
if max_len_allresults:
getattr(repeatdec_vars, variablename)["results"] = deque([], max_len_allresults)
else:
getattr(repeatdec_vars, variablename)["results"] = []
getattr(repeatdec_vars, variablename)["enabled"] = True
getattr(repeatdec_vars, variablename)["total_count"] = 0
def _decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
if getattr(repeatdec_vars, variablename)["enabled"]:
if (
execution_limit == -1
or execution_limit
> getattr(repeatdec_vars, variablename)["total_count"]
):
while True:
if (
repeatdec_vars.active() < max_concurrent_threads
or max_concurrent_threads == -1
):
t = threading.Timer(repeat_time, wrapper, args, kwargs)
t.start()
getattr(repeatdec_vars, variablename)["total_count"] += 1
break
else:
if not getattr(repeatdec_vars, variablename)["enabled"]:
return
sleep(check_max_threads_every_n_seconds)
try:
if activate_lock:
with repeatdec_vars.lock:
fures = func(*args, **kwargs)
else:
fures = func(*args, **kwargs)
getattr(repeatdec_vars, variablename)["results"].append(fures)
if print_results:
print(fures)
except Exception as fe:
if print_exceptions:
print(fe)
if ignore_exceptions:
getattr(repeatdec_vars, variablename)["results"].append(
exception_value
)
else:
raise fe
return wrapper
return _decorator(f_py) if callable(f_py) else _decorator