-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpmap.py
50 lines (41 loc) · 1.8 KB
/
pmap.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
"""Parallel (multi-threaded) map function for python.
Uses multiprocessing.Pool with error-resistant importing. There are two map
functions:
1) pmap(function, iterable) -> rapid fork-based multi-threaded map function.
2) low_memory_pmap(function, iterable) -> a more memory-efficient version
intended for function calls that are individually long & memory-intensive.
"""
import os
from warnings import warn
from pickle import PicklingError
import progressbar
from time import sleep
import multiprocessing
def fake_pmap(*args, **kwargs):
    """Serial stand-in for the parallel maps in this module.

    Forwards every argument unchanged to the built-in ``map`` and returns the
    results eagerly as a list, i.e. the same return type that ``pmap`` gives.
    """
    lazy_results = map(*args, **kwargs)
    return list(lazy_results)
# Logical CPU count reported by the OS; the pool-size defaults of the map
# functions are derived from it.
CPUs = multiprocessing.cpu_count()
# large_iter_pmap picks its chunksize so the input is split into roughly this
# many chunks (50 per CPU).
CHUNKS = CPUs * 50
def pmap(func, Iter, processes=max(1, CPUs - 1)):
    """Apply ``func`` to every element of ``Iter`` in a pool of worker processes.

    Thin wrapper around ``multiprocessing.Pool.map``: it blocks until every
    result is ready and returns them as a list in input order.

    Args:
        func: Callable applied to each element. It is pickled to reach the
            worker processes, so it must be picklable (e.g. a module-level
            function rather than a lambda).
        Iter: Iterable of inputs.
        processes: Number of worker processes. Defaults to one fewer than the
            CPU count, but never less than 1.

    Returns:
        list: ``func(x)`` for each ``x`` in ``Iter``, in order.
    """
    # The max(1, ...) in the default matters on single-CPU machines: CPUs - 1
    # would be 0, and Pool rejects processes < 1 with ValueError.
    with multiprocessing.Pool(processes=processes) as P:
        return P.map(func, Iter)
def low_memory_pmap(func, Iter, processes=max(1, int(round(CPUs / 2))), chunksize=1):
    """Parallel map intended for individually long, memory-hungry calls.

    Uses fewer workers than ``pmap`` (about half the CPUs) and
    ``Pool.imap``, which, unlike ``Pool.map``, does not convert ``Iter`` to a
    list up front. All results are still gathered into one list.

    Args:
        func: Picklable callable applied to each element.
        Iter: Iterable of inputs.
        processes: Number of worker processes. Defaults to half the CPU count
            (rounded), but never less than 1.
        chunksize: Number of elements sent to a worker per task; forwarded to
            ``Pool.imap``. The default of 1 matches ``imap``'s own default.

    Returns:
        list: ``func(x)`` for each ``x`` in ``Iter``, in order.
    """
    # max(1, ...) in the default: with one CPU, round(0.5) == 0 and Pool
    # rejects processes < 1 with ValueError.
    with multiprocessing.Pool(processes=processes) as P:
        # chunksize used to be accepted but never passed on; forward it so
        # callers can batch many small tasks.
        return list(P.imap(func, Iter, chunksize=chunksize))
def large_iter_pmap(func, Iter, processes=max(1, CPUs - 1), status_bar=True, nice=10, wait_interval=1):
    """Parallel map for large inputs, with an optional text progress bar.

    The input is cut into roughly ``CHUNKS`` chunks and submitted with
    ``Pool.map_async``. While the job runs, the number of unfinished chunks
    is polled every ``wait_interval`` seconds to advance a progress bar.

    Args:
        func: Callable applied to each element. It is pickled to reach the
            workers; if pickling fails the map runs serially (see Warns).
        Iter: Iterable of inputs. Inputs without ``len()`` (e.g. generators)
            are collected into a list first.
        processes: Number of worker processes. Defaults to the CPU count
            minus one, but never less than 1.
        status_bar: If true, display a progress bar while waiting; if false,
            simply block until all results are ready.
        nice: Niceness increment. A positive value is applied inside each
            worker process (via a Pool initializer), leaving the caller's
            priority untouched; it is skipped on platforms without
            ``os.nice``. A negative value needs privileges and is applied to
            the calling process, so a PermissionError is raised here rather
            than inside a worker. 0 disables renicing.
        wait_interval: Seconds between progress-bar refreshes.

    Returns:
        list: ``func`` applied to each element, in input order.

    Warns:
        RuntimeWarning: if ``func`` cannot be pickled; the result is then
            computed serially in this process via ``fake_pmap``.
    """
    # Materialize once: len() is needed for chunking, and the serial fallback
    # must see the same items (a generator would already be consumed).
    items = Iter if hasattr(Iter, "__len__") else list(Iter)

    pool_kwargs = {}
    if nice > 0:
        if hasattr(os, "nice"):
            # Renice only the workers. Calling os.nice() in this process would
            # permanently lower the caller's priority, compounding per call.
            pool_kwargs = {"initializer": os.nice, "initargs": (nice,)}
    elif nice < 0:
        # Raising priority needs privileges; do it here so a failure reaches
        # the caller instead of crashing (and respawning) pool workers.
        os.nice(nice)

    if len(items) == 0:
        return []
    chunksize = max(1, int(round(len(items) / CHUNKS)))

    try:
        with multiprocessing.Pool(processes=processes, **pool_kwargs) as P:
            rs = P.map_async(func, items, chunksize=chunksize)
            if status_bar:
                # NOTE: _number_left (chunks not yet finished) is a private
                # CPython MapResult attribute; map_async exposes no public
                # progress API.
                maxval = rs._number_left
                bar = progressbar.ProgressBar(
                    maxval=maxval,
                    widgets=[
                        progressbar.Percentage(), ' ',
                        progressbar.Bar('=', '[', ']'), ' ',
                        progressbar.ETA(),
                    ],
                )
                # Classic progressbar 2.x requires start() before update();
                # progressbar2 auto-starts, so an explicit start() suits both.
                bar.start()
                while not rs.ready():
                    sleep(wait_interval)
                    bar.update(maxval - rs._number_left)
                bar.finish()
            return rs.get()
    except PicklingError:
        warn("Lambda functions cannot be Pickled for Parallelization. Using single Process.", RuntimeWarning)
        return fake_pmap(func, items)