generated from fastai/nbdev_template
-
Notifications
You must be signed in to change notification settings - Fork 276
/
parallel.py
134 lines (117 loc) · 4.82 KB
/
parallel.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
# AUTOGENERATED! DO NOT EDIT! File to edit: nbs/03a_parallel.ipynb (unless otherwise specified).
__all__ = ['threaded', 'startthread', 'set_num_threads', 'ThreadPoolExecutor', 'ProcessPoolExecutor', 'parallel',
'run_procs', 'parallel_gen']
# Cell
from .imports import *
from .foundation import *
from .basics import *
from .xtras import *
from functools import wraps
# from contextlib import contextmanager,ExitStack
from multiprocessing import Process, Queue
import concurrent.futures,time
from multiprocessing import Manager
from threading import Thread
# Cell
def threaded(f):
"Run `f` in a thread, and returns the thread"
@wraps(f)
def _f(*args, **kwargs):
res = Thread(target=f, args=args, kwargs=kwargs)
res.start()
return res
return _f
# Cell
def startthread(f):
"Like `threaded`, but start thread immediately"
threaded(f)()
# Cell
def set_num_threads(nt):
"Get numpy (and others) to use `nt` threads"
try: import mkl; mkl.set_num_threads(nt)
except: pass
try: import torch; torch.set_num_threads(nt)
except: pass
os.environ['IPC_ENABLE']='1'
for o in ['OPENBLAS_NUM_THREADS','NUMEXPR_NUM_THREADS','OMP_NUM_THREADS','MKL_NUM_THREADS']:
os.environ[o] = str(nt)
# Cell
def _call(lock, pause, n, g, item):
l = False
if pause:
try:
l = lock.acquire(timeout=pause*(n+2))
time.sleep(pause)
finally:
if l: lock.release()
return g(item)
# Cell
class ThreadPoolExecutor(concurrent.futures.ThreadPoolExecutor):
"Same as Python's ThreadPoolExecutor, except can pass `max_workers==0` for serial execution"
def __init__(self, max_workers=defaults.cpus, on_exc=print, pause=0, **kwargs):
if max_workers is None: max_workers=defaults.cpus
store_attr()
self.not_parallel = max_workers==0
if self.not_parallel: max_workers=1
super().__init__(max_workers, **kwargs)
def map(self, f, items, *args, timeout=None, chunksize=1, **kwargs):
self.lock = Manager().Lock()
g = partial(f, *args, **kwargs)
if self.not_parallel: return map(g, items)
_g = partial(_call, self.lock, self.pause, self.max_workers, g)
try: return super().map(_g, items, timeout=timeout, chunksize=chunksize)
except Exception as e: self.on_exc(e)
# Cell
class ProcessPoolExecutor(concurrent.futures.ProcessPoolExecutor):
"Same as Python's ProcessPoolExecutor, except can pass `max_workers==0` for serial execution"
def __init__(self, max_workers=defaults.cpus, on_exc=print, pause=0, **kwargs):
if max_workers is None: max_workers=defaults.cpus
store_attr()
self.not_parallel = max_workers==0
if self.not_parallel: max_workers=1
super().__init__(max_workers, **kwargs)
def map(self, f, items, *args, timeout=None, chunksize=1, **kwargs):
self.lock = Manager().Lock()
g = partial(f, *args, **kwargs)
if self.not_parallel: return map(g, items)
_g = partial(_call, self.lock, self.pause, self.max_workers, g)
try: return super().map(_g, items, timeout=timeout, chunksize=chunksize)
except Exception as e: self.on_exc(e)
# Cell
try: from fastprogress import progress_bar
except: progress_bar = None
# Cell
def parallel(f, items, *args, n_workers=defaults.cpus, total=None, progress=None, pause=0,
threadpool=False, timeout=None, chunksize=1, **kwargs):
"Applies `func` in parallel to `items`, using `n_workers`"
pool = ThreadPoolExecutor if threadpool else ProcessPoolExecutor
with pool(n_workers, pause=pause) as ex:
r = ex.map(f,items, *args, timeout=timeout, chunksize=chunksize, **kwargs)
if progress and progress_bar:
if total is None: total = len(items)
r = progress_bar(r, total=total, leave=False)
return L(r)
# Cell
def run_procs(f, f_done, args):
"Call `f` for each item in `args` in parallel, yielding `f_done`"
processes = L(args).map(Process, args=arg0, target=f)
for o in processes: o.start()
yield from f_done()
processes.map(Self.join())
# Cell
def _f_pg(obj, queue, batch, start_idx):
for i,b in enumerate(obj(batch)): queue.put((start_idx+i,b))
def _done_pg(queue, items): return (queue.get() for _ in items)
# Cell
def parallel_gen(cls, items, n_workers=defaults.cpus, **kwargs):
"Instantiate `cls` in `n_workers` procs & call each on a subset of `items` in parallel."
if n_workers==0:
yield from enumerate(list(cls(**kwargs)(items)))
return
batches = L(chunked(items, n_chunks=n_workers))
idx = L(itertools.accumulate(0 + batches.map(len)))
queue = Queue()
if progress_bar: items = progress_bar(items, leave=False)
f=partial(_f_pg, cls(**kwargs), queue)
done=partial(_done_pg, queue, items)
yield from run_procs(f, done, L(batches,idx).zip())