From ce8be10949856637108b1339bfc8f97e4011b62d Mon Sep 17 00:00:00 2001 From: Matt Ueckermann Date: Wed, 20 Nov 2019 16:49:38 -0500 Subject: [PATCH 1/6] ENH: Adding multi-threading to algorithm eval node. This should allow IO parallelism. --- podpac/core/algorithm/algorithm.py | 28 ++++++++++++++++++++++++++-- 1 file changed, 26 insertions(+), 2 deletions(-) diff --git a/podpac/core/algorithm/algorithm.py b/podpac/core/algorithm/algorithm.py index a2bb8e9b7..631a667e8 100644 --- a/podpac/core/algorithm/algorithm.py +++ b/podpac/core/algorithm/algorithm.py @@ -4,6 +4,7 @@ from __future__ import division, unicode_literals, print_function, absolute_import +from multiprocessing.pool import ThreadPool from collections import OrderedDict import inspect @@ -18,6 +19,7 @@ from podpac.core.node import COMMON_NODE_DOC from podpac.core.node import node_eval from podpac.core.utils import common_doc +from podpac.core.settings import settings COMMON_DOC = COMMON_NODE_DOC.copy() @@ -67,8 +69,30 @@ def eval(self, coordinates, output=None): self._requested_coordinates = coordinates inputs = {} - for key, node in self._inputs.items(): - inputs[key] = node.eval(coordinates) + if settings["MULTITHREADING"]: + # Create a function for each thread to execute asynchronously + def f(node): + return node.eval(coordinates) + + # Create pool of size settings["N_THREADS"] + pool = ThreadPool(processes=settings.get("N_THREADS", 10)) + + # Evaluate nodes in parallel/asynchronously + results = [pool.apply_async(f, [node]) for node in self._inputs.values()] + + # This prevents any more tasks from being submitted to the pool, and will close the workers one done + pool.close() + + # This waits for worker processes to exist. 
+ pool.join() + + # Collect the results in dictionary + for key, res in zip(self._inputs.keys(), results): + inputs[key] = res.get() + else: + # Evaluate nodes in serial + for key, node in self._inputs.items(): + inputs[key] = node.eval(coordinates) # accumulate output coordinates coords_list = [Coordinates.from_xarray(a.coords, crs=a.attrs.get("crs")) for a in inputs.values()] From 39dfeaa6368aa25836e7e7dab6d4a5ff385076a6 Mon Sep 17 00:00:00 2001 From: Matt Ueckermann Date: Thu, 21 Nov 2019 11:20:01 -0500 Subject: [PATCH 2/6] TEST: Adding unit test for multithreaded algorithm Nodes. --- podpac/core/algorithm/test/test_algorithm.py | 28 ++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/podpac/core/algorithm/test/test_algorithm.py b/podpac/core/algorithm/test/test_algorithm.py index 3647dc91a..fd8d47adb 100644 --- a/podpac/core/algorithm/test/test_algorithm.py +++ b/podpac/core/algorithm/test/test_algorithm.py @@ -4,6 +4,7 @@ import pytest from collections import OrderedDict +import numpy as np import podpac from podpac.core.algorithm.utility import Arange @@ -40,3 +41,30 @@ def test_base_definition(self): assert "B" in d["inputs"] # TODO value of d['inputs']['A'], etc + + def test_multi_threading(self): + coords = podpac.Coordinates([[1, 2, 3]], ["lat"]) + with podpac.settings: + podpac.settings["MULTITHREADING"] = True + podpac.settings["N_THREADS"] = 8 + podpac.settings["CACHE_OUTPUT_DEFAULT"] = False + podpac.settings["DEFAULT_CACHE"] = [] + podpac.settings["RAM_CACHE_ENABLED"] = False + podpac.settings.set_unsafe_eval(True) + node1 = Arithmetic(A=Arange(), B=Arange(), eqn="A+B") + node2 = Arithmetic(A=node1, B=Arange(), eqn="A+B") + + omt = node2.eval(coords) + + with podpac.settings: + podpac.settings["MULTITHREADING"] = False + podpac.settings["CACHE_OUTPUT_DEFAULT"] = False + podpac.settings["DEFAULT_CACHE"] = [] + podpac.settings["RAM_CACHE_ENABLED"] = False + podpac.settings.set_unsafe_eval(True) + node1 = Arithmetic(A=Arange(), 
B=Arange(), eqn="A+B") + node2 = Arithmetic(A=node1, B=Arange(), eqn="A+B") + + ost = node2.eval(coords) + + np.testing.assert_array_equal(omt, ost) From 6bff2348c5585c44c017668d489ef6c43ab3a1ce Mon Sep 17 00:00:00 2001 From: Matt Ueckermann Date: Thu, 21 Nov 2019 16:52:57 -0500 Subject: [PATCH 3/6] ENH: Added a ThreadManager to control the total number of threads used by an 'eval'. --- podpac/core/algorithm/algorithm.py | 23 +++++--- podpac/core/compositor.py | 20 ++++++- podpac/core/managers/multi_threading.py | 71 +++++++++++++++++++++++++ podpac/datalib/gfs.py | 6 +-- 4 files changed, 107 insertions(+), 13 deletions(-) create mode 100644 podpac/core/managers/multi_threading.py diff --git a/podpac/core/algorithm/algorithm.py b/podpac/core/algorithm/algorithm.py index 631a667e8..58b060671 100644 --- a/podpac/core/algorithm/algorithm.py +++ b/podpac/core/algorithm/algorithm.py @@ -20,6 +20,7 @@ from podpac.core.node import node_eval from podpac.core.utils import common_doc from podpac.core.settings import settings +from podpac.core.managers.multi_threading import thread_manager COMMON_DOC = COMMON_NODE_DOC.copy() @@ -69,26 +70,32 @@ def eval(self, coordinates, output=None): self._requested_coordinates = coordinates inputs = {} + if settings["MULTITHREADING"]: + n_threads = thread_manager.request_n_threads(len(self._inputs)) + else: + n_threads = 0 + + if settings["MULTITHREADING"] and n_threads > 0: # Create a function for each thread to execute asynchronously def f(node): return node.eval(coordinates) - # Create pool of size settings["N_THREADS"] - pool = ThreadPool(processes=settings.get("N_THREADS", 10)) + # Create pool of size n_threads, note, this may be created from a sub-thread (i.e. 
not the main thread) + pool = ThreadPool(processes=n_threads) # Evaluate nodes in parallel/asynchronously results = [pool.apply_async(f, [node]) for node in self._inputs.values()] - # This prevents any more tasks from being submitted to the pool, and will close the workers one done - pool.close() - - # This waits for worker processes to exist. - pool.join() - # Collect the results in dictionary for key, res in zip(self._inputs.keys(), results): inputs[key] = res.get() + + # This prevents any more tasks from being submitted to the pool, and will close the workers one done + pool.close() + + # Release these number of threads back to the thread pool + thread_manager.release_n_threads(n_threads) else: # Evaluate nodes in serial for key, node in self._inputs.items(): diff --git a/podpac/core/compositor.py b/podpac/core/compositor.py index 04cea0ec7..ce75f714f 100644 --- a/podpac/core/compositor.py +++ b/podpac/core/compositor.py @@ -20,6 +20,7 @@ from podpac.core.data.datasource import COMMON_DATA_DOC from podpac.core.data.interpolation import interpolation_trait from podpac.core.utils import trait_is_defined +from podpac.core.managers.multi_threading import thread_manager COMMON_COMPOSITOR_DOC = COMMON_DATA_DOC.copy() # superset of COMMON_NODE_DOC @@ -219,21 +220,36 @@ def iteroutputs(self, coordinates): nc = merge_dims([Coordinates(np.atleast_1d(c), dims=[coords_dim]), self.shared_coordinates]) if trait_is_defined(s, "native_coordinates") is False: - s.set_trait('native_coordinates', nc) + s.set_trait("native_coordinates", nc) if settings["MULTITHREADING"]: + n_threads = thread_manager.request_n_threads(len(src_subset)) + else: + n_threads = 0 + + if settings["MULTITHREADING"] and n_threads > 0: # TODO pool of pre-allocated scratch space # TODO: docstring? def f(src): return src.eval(coordinates) - pool = ThreadPool(processes=settings.get("N_THREADS", 10)) + # Create pool of size n_threads, note, this may be created from a sub-thread (i.e. 
not the main thread) + pool = ThreadPool(processes=n_threads) + + # Evaluate nodes in parallel/asynchronously results = [pool.apply_async(f, [src]) for src in src_subset] + # Yield results as they are being requested, blocking when the thread is not finished for src, res in zip(src_subset, results): yield res.get() # src._output = None # free up memory + # This prevents any more tasks from being submitted to the pool, and will close the workers one done + pool.close() + + # Release these number of threads back to the thread pool + thread_manager.release_n_threads(n_threads) + else: output = None # scratch space for src in src_subset: diff --git a/podpac/core/managers/multi_threading.py b/podpac/core/managers/multi_threading.py new file mode 100644 index 000000000..0c4687eb9 --- /dev/null +++ b/podpac/core/managers/multi_threading.py @@ -0,0 +1,71 @@ +""" +Module for dealing with multi-threaded execution. + +This is used to ensure that the total number of threads specified in the settings is not exceeded. + +""" + +from __future__ import division, unicode_literals, print_function, absolute_import + +from multiprocessing import Lock + +from podpac.core.settings import settings + +DEFAULT_N_THREADS = 10 + + +class ThreadManager(object): + """ This is a singleton class that keeps track of the total number of threads used in an application. + """ + + _lock = Lock() + _n_threads_used = 0 + __instance = None + + def __new__(cls): + if ThreadManager.__instance is None: + ThreadManager.__instance = object.__new__(cls) + return ThreadManager.__instance + + def request_n_threads(self, n): + """ Returns the number of threads allowed for a pool taking into account all other threads application, as + specified by podpac.settings["N_THREADS"]. + + Parameters + ----------- + n : int + Number of threads requested by operation + + Returns + -------- + int + Number of threads a pool may use. Note, this may be less than or equal to n, and may be 0. 
+ """ + self._lock.acquire() + available = max(0, settings.get("N_THREADS", DEFAULT_N_THREADS) - self._n_threads_used) + claimed = min(available, n) + self._n_threads_used += claimed + self._lock.release() + return claimed + + def release_n_threads(self, n): + """ This releases the number of threads specified. + + Parameters + ------------ + n : int + Number of threads to be released + + Returns + -------- + int + Number of threads available after releases 'n' threads + """ + self._lock.acquire() + self._n_threads_used = max(0, self._n_threads_used - n) + available = max(0, settings.get("N_THREADS", DEFAULT_N_THREADS) - self._n_threads_used) + self._lock.release() + return available + + +thread_manager = ThreadManager() diff --git a/podpac/datalib/gfs.py b/podpac/datalib/gfs.py index 1f7969da6..9386d2e11 100644 --- a/podpac/datalib/gfs.py +++ b/podpac/datalib/gfs.py @@ -109,7 +109,7 @@ def init(self): base_time = datetime.datetime.strptime("%s %s" % (self.date, self.hour), "%Y%m%d %H%M") forecast_times = [base_time + datetime.timedelta(hours=int(h)) for h in self.forecasts] tc = Coordinates([[dt.strftime("%Y-%m-%d %H:%M") for dt in forecast_times]], dims=["time"]) - self.set_trait('native_coordinates', merge_dims([nc, tc])) + self.set_trait("native_coordinates", merge_dims([nc, tc])) def get_data(self, coordinates, coordinates_index): data = self.create_output_array(coordinates) @@ -125,13 +125,13 @@ def init(self): now = datetime.datetime.now() # date - self.set_trait('date', now.strftime("%Y%m%d")) + self.set_trait("date", now.strftime("%Y%m%d")) # hour prefix = "%s/%s/%s/" % (self.parameter, self.level, self.date) objs = bucket.objects.filter(Prefix=prefix) hours = set(obj.key.split("/")[3] for obj in objs) if hours: - self.set_trait('hour', max(hours)) + self.set_trait("hour", max(hours)) super(GFSLatest, self).init() From 66b4bd110423ed2a1097bda38c22c04605a707fa Mon Sep 17 00:00:00 2001 From: Matt Ueckermann Date: Fri, 22 Nov 2019 09:56:47 -0500 Subject: 
[PATCH 4/6] TEST: Added test for cache race condition. Also added a lock to make sure there should be no issues with caching outputs of Nodes. I think this is overkill given the GIL, but should now be pretty safe. --- podpac/core/algorithm/generic.py | 2 ++ podpac/core/algorithm/test/test_algorithm.py | 31 +++++++++++++++++--- podpac/core/managers/multi_threading.py | 1 + podpac/core/node.py | 10 +++++-- 4 files changed, 38 insertions(+), 6 deletions(-) diff --git a/podpac/core/algorithm/generic.py b/podpac/core/algorithm/generic.py index 5c1c8d94d..5d7a8de5f 100644 --- a/podpac/core/algorithm/generic.py +++ b/podpac/core/algorithm/generic.py @@ -97,6 +97,8 @@ def algorithm(self, inputs): f_locals = dict(zip(fields, res)) try: + import numexpr.evaluate # Needed for some systems to get around lazy_module issues + result = ne.evaluate(eqn, f_locals) except (NotImplementedError, ImportError): result = eval(eqn, f_locals) diff --git a/podpac/core/algorithm/test/test_algorithm.py b/podpac/core/algorithm/test/test_algorithm.py index fd8d47adb..74ea008e6 100644 --- a/podpac/core/algorithm/test/test_algorithm.py +++ b/podpac/core/algorithm/test/test_algorithm.py @@ -44,6 +44,9 @@ def test_base_definition(self): def test_multi_threading(self): coords = podpac.Coordinates([[1, 2, 3]], ["lat"]) + node1 = Arithmetic(A=Arange(), B=Arange(), eqn="A+B") + node2 = Arithmetic(A=node1, B=Arange(), eqn="A+B") + with podpac.settings: podpac.settings["MULTITHREADING"] = True podpac.settings["N_THREADS"] = 8 @@ -51,8 +54,6 @@ def test_multi_threading(self): podpac.settings["DEFAULT_CACHE"] = [] podpac.settings["RAM_CACHE_ENABLED"] = False podpac.settings.set_unsafe_eval(True) - node1 = Arithmetic(A=Arange(), B=Arange(), eqn="A+B") - node2 = Arithmetic(A=node1, B=Arange(), eqn="A+B") omt = node2.eval(coords) @@ -62,9 +63,31 @@ def test_multi_threading(self): podpac.settings["DEFAULT_CACHE"] = [] podpac.settings["RAM_CACHE_ENABLED"] = False podpac.settings.set_unsafe_eval(True) - node1 = 
Arithmetic(A=Arange(), B=Arange(), eqn="A+B") - node2 = Arithmetic(A=node1, B=Arange(), eqn="A+B") ost = node2.eval(coords) np.testing.assert_array_equal(omt, ost) + + def test_multi_threading_cache_race(self): + coords = podpac.Coordinates([np.linspace(0, 1, 1024)], ["lat"]) + with podpac.settings: + podpac.settings["MULTITHREADING"] = True + podpac.settings["N_THREADS"] = 3 + podpac.settings["CACHE_OUTPUT_DEFAULT"] = True + podpac.settings["DEFAULT_CACHE"] = ["ram"] + podpac.settings["RAM_CACHE_ENABLED"] = True + podpac.settings.set_unsafe_eval(True) + A = Arithmetic(A=Arange(), eqn="A**2") + B = Arithmetic(A=Arange(), eqn="A**2") + C = Arithmetic(A=Arange(), eqn="A**2") + D = Arithmetic(A=Arange(), eqn="A**2") + E = Arithmetic(A=Arange(), eqn="A**2") + F = Arithmetic(A=Arange(), eqn="A**2") + + node2 = Arithmetic(A=A, B=B, C=C, D=D, E=E, F=F, eqn="A+B+C+D+E+F") + + om = node2.eval(coords) + + from_cache = [n._from_cache for n in node2.inputs.values()] + + assert sum(from_cache) > 0 diff --git a/podpac/core/managers/multi_threading.py b/podpac/core/managers/multi_threading.py index 0c4687eb9..6cd4fa130 100644 --- a/podpac/core/managers/multi_threading.py +++ b/podpac/core/managers/multi_threading.py @@ -19,6 +19,7 @@ class ThreadManager(object): """ _lock = Lock() + cache_lock = Lock() _n_threads_used = 0 __instance = None diff --git a/podpac/core/node.py b/podpac/core/node.py index 940a9987e..8d6074c37 100644 --- a/podpac/core/node.py +++ b/podpac/core/node.py @@ -25,6 +25,7 @@ from podpac.core.coordinates import Coordinates from podpac.core.style import Style from podpac.core.cache import CacheCtrl, get_default_cache_ctrl, S3CacheStore, make_cache_ctrl +from podpac.core.managers.multi_threading import thread_manager COMMON_NODE_DOC = { @@ -478,11 +479,12 @@ def put_cache(self, data, key, coordinates=None, overwrite=False): NodeException Cached data already exists (and overwrite is False) """ - if not overwrite and self.has_cache(key, coordinates=coordinates): 
raise NodeException("Cached data already exists for key '%s' and coordinates %s" % (key, coordinates)) + thread_manager.cache_lock.acquire() self.cache_ctrl.put(self, data, key, coordinates=coordinates, update=overwrite) + thread_manager.cache_lock.release() def has_cache(self, key, coordinates=None): """ @@ -500,7 +502,10 @@ def has_cache(self, key, coordinates=None): bool True if there is cached data for this node, key, and coordinates. """ - return self.cache_ctrl.has(self, key, coordinates=coordinates) + thread_manager.cache_lock.aquire() + has_cache = self.cache_ctrl.has(self, key, coordinates=coordinates) + thread_manager.cache_lock.release() + return has_cache def rem_cache(self, key, coordinates=None, mode=None): """ @@ -807,6 +812,7 @@ def wrapper(self, coordinates, output=None): self._requested_coordinates = coordinates key = cache_key cache_coordinates = coordinates.transpose(*sorted(coordinates.dims)) # order agnostic caching + if self.has_cache(key, cache_coordinates) and not self.cache_update: data = self.get_cache(key, cache_coordinates) if output is not None: From 55928e229152aeffed1acba0083e6cb3accc93eb Mon Sep 17 00:00:00 2001 From: Matt Ueckermann Date: Fri, 22 Nov 2019 09:59:05 -0500 Subject: [PATCH 5/6] FIX: Typo. --- podpac/core/node.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/podpac/core/node.py b/podpac/core/node.py index 8d6074c37..22a565831 100644 --- a/podpac/core/node.py +++ b/podpac/core/node.py @@ -502,7 +502,7 @@ def has_cache(self, key, coordinates=None): bool True if there is cached data for this node, key, and coordinates. """ - thread_manager.cache_lock.aquire() + thread_manager.cache_lock.acquire() has_cache = self.cache_ctrl.has(self, key, coordinates=coordinates) thread_manager.cache_lock.release() return has_cache From 050d3168da87944ee743e84fffc89b554a20259f Mon Sep 17 00:00:00 2001 From: Matt Ueckermann Date: Fri, 22 Nov 2019 14:35:12 -0500 Subject: [PATCH 6/6] FIX: Address review comments. 
* Made the ThreadPool creation part of the thread_manager * Doing serial computation if N_THREADS == 1 (had to release the obtained thread) * Added the _multi_threaded node attribute to help with testing/debugging multi-threaded execution * Using the Lock context manager instead of 'acquire/release' * Added test to stress the number of threads in the execution and checking to make sure that the correct number of threads causes a cascade to lower levels in the pipeline. --- podpac/core/algorithm/algorithm.py | 9 +++-- podpac/core/algorithm/test/test_algorithm.py | 39 ++++++++++++++++++++ podpac/core/compositor.py | 10 +++-- podpac/core/managers/multi_threading.py | 36 ++++++++++++------ podpac/core/node.py | 13 +++---- 5 files changed, 82 insertions(+), 25 deletions(-) diff --git a/podpac/core/algorithm/algorithm.py b/podpac/core/algorithm/algorithm.py index 58b060671..b5940e40b 100644 --- a/podpac/core/algorithm/algorithm.py +++ b/podpac/core/algorithm/algorithm.py @@ -4,7 +4,6 @@ from __future__ import division, unicode_literals, print_function, absolute_import -from multiprocessing.pool import ThreadPool from collections import OrderedDict import inspect @@ -73,16 +72,18 @@ def eval(self, coordinates, output=None): if settings["MULTITHREADING"]: n_threads = thread_manager.request_n_threads(len(self._inputs)) + if n_threads == 1: + thread_manager.release_n_threads(n_threads) else: n_threads = 0 - if settings["MULTITHREADING"] and n_threads > 0: + if settings["MULTITHREADING"] and n_threads > 1: # Create a function for each thread to execute asynchronously def f(node): return node.eval(coordinates) # Create pool of size n_threads, note, this may be created from a sub-thread (i.e.
not the main thread) - pool = ThreadPool(processes=n_threads) + pool = thread_manager.get_thread_pool(processes=n_threads) # Evaluate nodes in parallel/asynchronously results = [pool.apply_async(f, [node]) for node in self._inputs.values()] @@ -96,10 +97,12 @@ def f(node): # Release these number of threads back to the thread pool thread_manager.release_n_threads(n_threads) + self._multi_threaded = True else: # Evaluate nodes in serial for key, node in self._inputs.items(): inputs[key] = node.eval(coordinates) + self._multi_threaded = False # accumulate output coordinates coords_list = [Coordinates.from_xarray(a.coords, crs=a.attrs.get("crs")) for a in inputs.values()] diff --git a/podpac/core/algorithm/test/test_algorithm.py b/podpac/core/algorithm/test/test_algorithm.py index 74ea008e6..646413498 100644 --- a/podpac/core/algorithm/test/test_algorithm.py +++ b/podpac/core/algorithm/test/test_algorithm.py @@ -91,3 +91,42 @@ def test_multi_threading_cache_race(self): from_cache = [n._from_cache for n in node2.inputs.values()] assert sum(from_cache) > 0 + + def test_multi_threading_stress_nthreads(self): + coords = podpac.Coordinates([np.linspace(0, 1, 4)], ["lat"]) + + A = Arithmetic(A=Arange(), eqn="A**0") + B = Arithmetic(A=Arange(), eqn="A**1") + C = Arithmetic(A=Arange(), eqn="A**2") + D = Arithmetic(A=Arange(), eqn="A**3") + E = Arithmetic(A=Arange(), eqn="A**4") + F = Arithmetic(A=Arange(), eqn="A**5") + + node2 = Arithmetic(A=A, B=B, C=C, D=D, E=E, F=F, eqn="A+B+C+D+E+F") + node3 = Arithmetic(A=A, B=B, C=C, D=D, E=E, F=F, G=node2, eqn="A+B+C+D+E+F+G") + + with podpac.settings: + podpac.settings["MULTITHREADING"] = True + podpac.settings["N_THREADS"] = 8 + podpac.settings["CACHE_OUTPUT_DEFAULT"] = False + podpac.settings["DEFAULT_CACHE"] = [] + podpac.settings["RAM_CACHE_ENABLED"] = False + podpac.settings.set_unsafe_eval(True) + + omt = node3.eval(coords) + + assert node3._multi_threaded + assert not node2._multi_threaded + + with podpac.settings: + 
podpac.settings["MULTITHREADING"] = True + podpac.settings["N_THREADS"] = 9 # 2 threads available after first 7 + podpac.settings["CACHE_OUTPUT_DEFAULT"] = False + podpac.settings["DEFAULT_CACHE"] = [] + podpac.settings["RAM_CACHE_ENABLED"] = False + podpac.settings.set_unsafe_eval(True) + + omt = node3.eval(coords) + + assert node3._multi_threaded + assert node2._multi_threaded diff --git a/podpac/core/compositor.py b/podpac/core/compositor.py index ce75f714f..bae3ce9d2 100644 --- a/podpac/core/compositor.py +++ b/podpac/core/compositor.py @@ -5,7 +5,6 @@ from __future__ import division, unicode_literals, print_function, absolute_import -from multiprocessing.pool import ThreadPool import numpy as np import traitlets as tl @@ -224,17 +223,19 @@ def iteroutputs(self, coordinates): if settings["MULTITHREADING"]: n_threads = thread_manager.request_n_threads(len(src_subset)) + if n_threads == 1: + thread_manager.release_n_threads(n_threads) else: n_threads = 0 - if settings["MULTITHREADING"] and n_threads > 0: + if settings["MULTITHREADING"] and n_threads > 1: # TODO pool of pre-allocated scratch space # TODO: docstring? def f(src): return src.eval(coordinates) # Create pool of size n_threads, note, this may be created from a sub-thread (i.e. 
not the main thread) - pool = ThreadPool(processes=n_threads) + pool = thread_manager.get_thread_pool(processes=n_threads) # Evaluate nodes in parallel/asynchronously results = [pool.apply_async(f, [src]) for src in src_subset] @@ -249,13 +250,14 @@ def f(src): # Release these number of threads back to the thread pool thread_manager.release_n_threads(n_threads) - + self._multi_threaded = True else: output = None # scratch space for src in src_subset: output = src.eval(coordinates, output) yield output # output[:] = np.nan + self._multi_threaded = False @node_eval @common_doc(COMMON_COMPOSITOR_DOC) diff --git a/podpac/core/managers/multi_threading.py b/podpac/core/managers/multi_threading.py index 6cd4fa130..d8b4a8b58 100644 --- a/podpac/core/managers/multi_threading.py +++ b/podpac/core/managers/multi_threading.py @@ -8,6 +8,7 @@ from __future__ import division, unicode_literals, print_function, absolute_import from multiprocessing import Lock +from multiprocessing.pool import ThreadPool from podpac.core.settings import settings @@ -42,12 +43,11 @@ def request_n_threads(self, n): int Number of threads a pool may use. Note, this may be less than or equal to n, and may be 0. """ - self._lock.acquire() - available = max(0, settings.get("N_THREADS", DEFAULT_N_THREADS) - self._n_threads_used) - claimed = min(available, n) - self._n_threads_used += claimed - self._lock.release() - return claimed + with self._lock: + available = max(0, settings.get("N_THREADS", DEFAULT_N_THREADS) - self._n_threads_used) + claimed = min(available, n) + self._n_threads_used += claimed + return claimed def release_n_threads(self, n): """ This releases the number of threads specified. 
@@ -62,11 +62,25 @@ def release_n_threads(self, n): int Number of threads available after releases 'n' threads """ - self._lock.acquire() - self._n_threads_used = max(0, self._n_threads_used - n) - available = max(0, settings.get("N_THREADS", DEFAULT_N_THREADS) - self._n_threads_used) - self._lock.release() - return available + with self._lock: + self._n_threads_used = max(0, self._n_threads_used - n) + available = max(0, settings.get("N_THREADS", DEFAULT_N_THREADS) - self._n_threads_used) + return available + + def get_thread_pool(self, processes): + """ Creates a threadpool that can be used to run jobs in parallel. + + Parameters + ----------- + processes : int + The number of threads or workers that will be part of the pool + + Returns + -------- + multiprocessing.ThreadPool + An instance of the ThreadPool class + """ + return ThreadPool(processes=processes) thread_manager = ThreadManager() diff --git a/podpac/core/node.py b/podpac/core/node.py index 22a565831..77b221066 100644 --- a/podpac/core/node.py +++ b/podpac/core/node.py @@ -137,6 +137,8 @@ def _validate_units(self, d): _requested_coordinates = tl.Instance(Coordinates, allow_none=True) _output = tl.Instance(UnitsDataArray, allow_none=True) _from_cache = tl.Bool(allow_none=True, default_value=None) + # Flag that is True if the Node was run multi-threaded, or None if the question doesn't apply + _multi_threaded = tl.Bool(allow_none=True, default_value=None) def __init__(self, **kwargs): """ Do not overwrite me """ @@ -482,9 +484,8 @@ def put_cache(self, data, key, coordinates=None, overwrite=False): if not overwrite and self.has_cache(key, coordinates=coordinates): raise NodeException("Cached data already exists for key '%s' and coordinates %s" % (key, coordinates)) - thread_manager.cache_lock.acquire() - self.cache_ctrl.put(self, data, key, coordinates=coordinates, update=overwrite) - thread_manager.cache_lock.release() + with thread_manager.cache_lock: + self.cache_ctrl.put(self, data, key, 
coordinates=coordinates, update=overwrite) def has_cache(self, key, coordinates=None): """ @@ -502,10 +503,8 @@ def has_cache(self, key, coordinates=None): bool True if there is cached data for this node, key, and coordinates. """ - thread_manager.cache_lock.acquire() - has_cache = self.cache_ctrl.has(self, key, coordinates=coordinates) - thread_manager.cache_lock.release() - return has_cache + with thread_manager.cache_lock: + return self.cache_ctrl.has(self, key, coordinates=coordinates) def rem_cache(self, key, coordinates=None, mode=None): """