-
Notifications
You must be signed in to change notification settings - Fork 1
/
threadtoolkit.py
executable file
·50 lines (45 loc) · 1.44 KB
/
threadtoolkit.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
#coding=utf-8
from multiprocessing import cpu_count
from multiprocessing import Process,Queue,Pool
from functools import partial
import traceback
import time
import os
import wml_utils as wmlu
# Default worker count: use every core on small machines (fewer than 4 cores),
# otherwise leave one core free.
DEFAULT_THREAD_NR = cpu_count()
if DEFAULT_THREAD_NR >= 4:
    DEFAULT_THREAD_NR -= 1
def fn_wraper(datas, fn, is_factory_fn=False):
    """Apply ``fn`` to every item of one chunk and collect indexed results.

    Args:
        datas: Sized iterable of ``(item, index)`` pairs.
        fn: Callable invoked as ``fn(item)``. When ``is_factory_fn`` is true,
            ``fn`` is first called once with no arguments and its return
            value is used as the per-item callable instead.
        is_factory_fn: Whether ``fn`` is a zero-argument factory (see ``fn``).

    Returns:
        A list of ``(index, result)`` tuples, in iteration order, for the
        items that were processed successfully. Items whose call raised an
        ``Exception`` are omitted; their traceback is printed.
    """
    if is_factory_fn:
        fn = fn()
    results = []
    print(f"Process {os.getpid()}: data nr {len(datas)}.")
    for data, i in datas:
        try:
            res = fn(data)
        except Exception:
            # Best-effort per item: report the failure and keep going.
            # Only ``Exception`` is caught so that SystemExit and
            # KeyboardInterrupt are not silently swallowed.
            traceback.print_exc()
        else:
            results.append((i, res))
    print(f"Process {os.getpid()} is finished.")
    return results
def par_for_each(data, fn, thread_nr=DEFAULT_THREAD_NR, is_factory_fn=False, timeout=None):
    """Apply ``fn`` to every item of ``data`` using a pool of worker processes.

    Args:
        data: Sized, iterable collection of items to process.
        fn: Callable applied to each item (or a zero-argument factory
            returning such a callable when ``is_factory_fn`` is true). It is
            forwarded to ``fn_wraper`` through ``Pool.map``.
        thread_nr: Upper bound on the number of worker processes; capped at
            ``len(data)``.
        is_factory_fn: Forwarded to ``fn_wraper``.
        timeout: Accepted for interface compatibility; this implementation
            does not use it.

    Returns:
        ``[]`` when ``data`` is empty. Otherwise a tuple of results ordered by
        the position of the corresponding input item. Items for which
        ``fn_wraper`` recorded no result are absent, so the tuple may be
        shorter than ``data`` (and empty if nothing succeeded).
    """
    if len(data) == 0:
        return []
    thread_nr = min(len(data), thread_nr)
    # Tag every item with its position so the input order can be restored
    # after the per-chunk results are merged.
    indexed = [(item, idx) for idx, item in enumerate(data)]
    # NOTE(review): list_to_2dlistv2 presumably splits the list into
    # thread_nr chunks -- confirm against wml_utils.
    datas = wmlu.list_to_2dlistv2(indexed, thread_nr)
    worker = partial(fn_wraper, fn=fn, is_factory_fn=is_factory_fn)
    pool = Pool(thread_nr)
    try:
        raw_res = pool.map(worker, datas)
    finally:
        # Always release the worker processes, even when map() raises.
        pool.close()
        pool.join()
    merged = [pair for chunk in raw_res for pair in chunk]
    merged.sort(key=lambda pair: pair[0])
    # Built directly instead of via zip(*...) so that an empty ``merged``
    # yields an empty tuple rather than an unpacking error.
    return tuple(res for _, res in merged)
def par_for_each_no_return(data, fn, thread_nr=DEFAULT_THREAD_NR):
    """Run ``fn`` over chunks of ``data`` in a process pool, discarding results.

    Unlike ``par_for_each``, ``fn`` is handed to ``Pool.map`` directly, so it
    is called once per element of the structure returned by
    ``wmlu.list_to_2dlistv2`` rather than once per item of ``data``.

    Args:
        data: Sized collection of items to process.
        fn: Callable passed to ``Pool.map``.
        thread_nr: Upper bound on the number of worker processes; capped at
            ``len(data)``.

    Returns:
        None.
    """
    if len(data) == 0:
        # Nothing to do; also avoids creating a Pool with zero processes,
        # which raises ValueError.
        return
    thread_nr = min(len(data), thread_nr)
    # NOTE(review): list_to_2dlistv2 presumably splits the list into
    # thread_nr chunks -- confirm against wml_utils.
    datas = wmlu.list_to_2dlistv2(data, thread_nr)
    pool = Pool(thread_nr)
    try:
        pool.map(fn, datas)
    finally:
        # Always release the worker processes, even when map() raises.
        pool.close()
        pool.join()