
ENH: Adding multi-threading to algorithm eval node. #343

Merged: 8 commits from feature/async-algorithm-eval into develop on Nov 25, 2019

Conversation

@mpu-creare (Contributor) commented Nov 21, 2019

This should allow I/O parallelism.

Initial testing looks good. Running https://github.com/creare-com/podpac-drought-monitor/blob/develop/notebooks/Drought-Monitor-Pipeline.ipynb with two additional cells:

%%timeit
with podpac.settings:
    podpac.settings['DEFAULT_CACHE'] = []
    podpac.settings['RAM_CACHE_ENABLED'] = False
    podpac.settings["CACHE_OUTPUT_DEFAUL"] = False
    podpac.settings['MULTITHREADING'] = True
    alg.eval(coords)

2.48 s ± 50.9 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

%%timeit
with podpac.settings:
    podpac.settings['DEFAULT_CACHE'] = []
    podpac.settings['RAM_CACHE_ENABLED'] = False
    podpac.settings["CACHE_OUTPUT_DEFAUL"] = False
    podpac.settings['MULTITHREADING'] = False
    alg.eval(coords)

5.71 s ± 133 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

This suggests roughly a 2x speedup.

This closes #312.

@mpu-creare mpu-creare added the enhancement New feature or request label Nov 21, 2019
@mpu-creare mpu-creare requested a review from jmilloy November 21, 2019 16:06
@mpu-creare mpu-creare self-assigned this Nov 21, 2019
@mpu-creare (Contributor, Author)

@jmilloy It didn't seem to make sense to do a mixin -- the structure between this and the compositor was different enough.

I do wonder if we should have a common/global ThreadPool, because right now this structure allows threads to create additional threads... in fact, it probably SHOULD have a common thread pool. Do you agree?

@mpu-creare (Contributor, Author)

Actually, I take it back about the common thread pool. I can see cases where a common thread pool would cause the whole thing to hang:

1. A generates 10 threads for its 10 B inputs, taking up the entire thread pool.
2. The 10 B's each try to queue work for an additional 10 threads, but the thread pool is full.
3. A waits for the B's to finish, but work on the B's cannot start, so the whole thing stalls.
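
A minimal sketch of this deadlock using Python's multiprocessing.pool.ThreadPool (a toy illustration, not podpac code):

from multiprocessing.pool import ThreadPool
import multiprocessing

pool = ThreadPool(2)  # stand-in for a small common thread pool

def inner():
    return 1

def outer():
    # queue inner() on the same shared pool and wait for it
    return pool.apply_async(inner).get()

# both workers are now busy running outer(); the queued inner() tasks
# can never start, so every get() would block forever: deadlock
results = [pool.apply_async(outer) for _ in range(2)]
try:
    values = [r.get(timeout=2) for r in results]  # time out instead of hanging
except multiprocessing.TimeoutError:
    print("deadlocked: inner tasks never ran")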

Do you agree, @jmilloy?

@coveralls commented Nov 21, 2019

Coverage increased (+0.1%) to 81.994% when pulling 050d316 on feature/async-algorithm-eval into 3c52a2b on develop.

@mpu-creare mpu-creare changed the title WIP: Adding multi-threading to algorithm eval node. ENH: Adding multi-threading to algorithm eval node. Nov 21, 2019
@mpu-creare (Contributor, Author)

Okay @jmilloy, this should be ready to merge.
Question for you:

  • Is there a reason you didn't use .close() and .join() on the ThreadPool in the implementation for the compositor?

@jmilloy (Collaborator) commented Nov 21, 2019

You bring up a serious question. If I set n_threads to 10, then I should expect there to be no more than 10 threads total, not 10 threads per Algorithm node. There needs to be a way to separate the execution into separate threads up to a point, and then force subsequent nodes to stay within their thread.

Secondly, are there caching issues here to be wary of? For example, if two nodes A1 and A2 require the results from the same input node B, when they evaluate serially, A1 will cause B to evaluate, and A2 will just use the cached results from B. If they execute in parallel, B will start evaluation twice, and one will finish first and cache its result, and the second will try to cache the same result. We need to add a test to make sure that this still works. Either the second one should ignore the exception that is raised by there now being a cached result, or, even better, the first one should mark that a cached result will be available so that the second one can wait for it.
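
To make that scenario concrete, here is a toy model of the race in plain Python (the cache dict, lock, and node names are all illustrative, not podpac's actual cache):

import threading

cache = {}
cache_lock = threading.Lock()

def eval_cached(key, compute):
    # A1 and A2 may both reach here for the shared input B before either
    # has cached it, so B can be computed twice; the second store must
    # tolerate (or skip) the already-present entry rather than raise
    with cache_lock:
        if key in cache:
            return cache[key]
    result = compute()  # potentially duplicated work
    with cache_lock:
        cache.setdefault(key, result)  # quietly ignore the duplicate
        return cache[key]

def eval_A(offset):
    b = eval_cached("B", lambda: sum(range(10000)))  # shared input B
    return b + offset

threads = [threading.Thread(target=eval_A, args=(i,)) for i in (1, 2)]
for t in threads:
    t.start()
for t in threads:
    t.join()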

@jmilloy (Collaborator) commented Nov 21, 2019

Is there a reason you didn't use .close() and .join() on the ThreadPool in the implementation for the compositor?

I'm not sure. The get waits for the result, so the join is unnecessary. Maybe we should still use close, though. When I search for apply_async examples, none of them use close or join.
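
For reference, a small example of why get() alone suffices (standard multiprocessing.pool.ThreadPool semantics, not podpac-specific):

from multiprocessing.pool import ThreadPool

pool = ThreadPool(4)
results = [pool.apply_async(pow, (i, 2)) for i in range(8)]

# get() blocks until each individual task finishes, so once this loop
# completes, all submitted work is done -- no join() required
values = [r.get() for r in results]

# close()/join() only matter for tearing the pool itself down:
pool.close()  # no further tasks may be submitted
pool.join()   # wait for the worker threads to exit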

@mpu-creare (Contributor, Author)

I just spoke with @mls. I think the correct behavior is to create threads until the common thread pool is not full. If the thread pool is full, then evaluate serially in the current thread.

This should be OK even if you only have 10 threads but need 16 threads at a particular level. At this level, 6 threads will be waiting. One level lower, when a thread tries to create more threads, it will notice the thread pool is full and proceed serially.

Caching... I didn't think about that one. It will be tricky to test, I'm guessing... well, maybe not. I'll work on it.

So, two things I'll work on:

  • Make the ThreadPool common for the whole application
  • Test caching behavior.

@jmilloy (Collaborator) commented Nov 21, 2019

Python docs:

Note that the methods of a pool should only ever be used by the process which created it.

I'm not sure if this applies to a thread pool, as well. Should methods of a threadpool only ever be used by the thread which created it? Unfortunately, the threadpool is not really documented anywhere.

I think the correct behavior is to create threads until the common thread pool is not full.

This seems backwards. Just wanted to check.

If the thread pool is full, then evaluate serially in the current thread.

Yeah. I don't know how hard it will be to keep track of how many threads are available. Even if 9 out of the 10 threads are used, and you need 100 threads, you can use the thread pool. But I think you will have race-conditions if you are not careful. You probably need to lock the thread pool when you are checking if it is full and starting workers so that the first worker doesn't start claiming more workers itself.

if MULTITHREADING:
    pool_lock.acquire()
    if pool_in_use < settings['n_threads']:
        pool_full = False
        results = [pool.apply_async(f, [node]) for node in self._inputs.values()]
        pool_in_use += len(results)
    else:
        pool_full = True
    pool_lock.release()

    if not pool_full:
        for key, res in zip(self._inputs.keys(), results):
            inputs[key] = res.get()
        pool_in_use -= len(results)

if not MULTITHREADING or pool_full:
    <evaluate in serial>

@jmilloy (Collaborator) commented Nov 21, 2019

Maybe you can get away with something like this (by incrementing the in-use count before starting workers). Even if sibling nodes check whether the pool is full at the same time and both start claiming workers, they won't interfere with each other. I don't know. Have fun...

if MULTITHREADING and pool_in_use < settings['n_threads']:
    pool_in_use += len(self._inputs)  # claim workers before starting them
    results = [pool.apply_async(f, [node]) for node in self._inputs.values()]
    for key, res in zip(self._inputs.keys(), results):
        inputs[key] = res.get()
    pool_in_use -= len(results)
else:
    <evaluate in serial>

@jmilloy (Collaborator) commented Nov 21, 2019

Could even free up the pool a bit faster like this. Okay, I'll stop now.

if settings['MULTITHREADING'] and pool_in_use < settings['n_threads']:
    pool_in_use += len(self._inputs)  # claim workers before starting them
    results = [pool.apply_async(f, [node]) for node in self._inputs.values()]
    for key, res in zip(self._inputs.keys(), results):
        inputs[key] = res.get()
        pool_in_use -= 1
else:
    <evaluate in serial>

@mpu-creare (Contributor, Author)

So, based on

Note that the methods of a pool should only ever be used by the process which created it.

It seems like the better practice is to create new thread pools in each thread? That doesn't seem right. I understand the reason for it in the process pool case, but it doesn't seem correct in the thread case.

What's the general feeling about threads creating threads that create threads, etc... ?

@jmilloy (Collaborator) commented Nov 21, 2019

I think it's just that they don't recommend trying to interact with the pool object from child processes, one reason being the deadlock scenario that you brought up. Of course, creating a new pool in the child process and using that would be fine.

I would think it is the same for a thread pool. I think it's fine for threads to create threads that create threads, and I think it's fine to create thread pools in threads in the same way. It's just probably not wise to try to re-use a common thread pool across tiers of threads, because of the race conditions and deadlock. Maybe our scenario is simple enough that we can manage it and avoid those issues.

Perhaps a safer version of this, but still flexible, is to create the threadpool ad-hoc but track globally how many threads you have made. Here n_current_threads is a global state variable, initially 0.

if settings['MULTITHREADING'] and n_current_threads < settings['n_threads']:
    n_threads_available = settings['n_threads'] - n_current_threads # don't take more than you can
    n_threads_needed = min(n_threads_available, len(self._inputs)) # don't take more than you need
    n_current_threads += n_threads_needed # "claim" your threads globally
    pool = ThreadPool(n_threads_needed)
    results = [pool.apply_async(f, [node]) for node in self._inputs.values()]
    for key, res in zip(self._inputs.keys(), results):
        inputs[key] = res.get()
    n_current_threads -= n_threads_needed
else:
    <evaluate in serial>

@jmilloy (Collaborator) commented Nov 21, 2019

I guess you could always allow one worker (which is the same as executing serially...)

if settings['MULTITHREADING']:
    n_threads_available = max(1, settings['n_threads'] - n_current_threads) # don't take more than you can
    n_threads_used = min(n_threads_available, len(self._inputs)) # don't take more than you need
    n_current_threads += n_threads_used # "claim" your threads globally
    pool = ThreadPool(n_threads_used)
    results = [pool.apply_async(f, [node]) for node in self._inputs.values()]
    for key, res in zip(self._inputs.keys(), results):
        inputs[key] = res.get()
    n_current_threads -= n_threads_used
else:
    <evaluate in serial>

@mpu-creare (Contributor, Author)

I thought about that; it makes the code a little cleaner, but I don't feel great about daisy-chaining threads unnecessarily.

@mpu-creare (Contributor, Author) commented Nov 21, 2019

That was fun; seems like it works:

%%timeit
with podpac.settings:
    podpac.settings['DEFAULT_CACHE'] = []
    podpac.settings['RAM_CACHE_ENABLED'] = False
    podpac.settings["CACHE_OUTPUT_DEFAUL"] = False
    podpac.settings['MULTITHREADING'] = True
    podpac.settings['N_THREADS'] = 32
    alg.eval(coords)

2.46 s ± 73.9 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

%%timeit
with podpac.settings:
    podpac.settings['DEFAULT_CACHE'] = []
    podpac.settings['RAM_CACHE_ENABLED'] = False
    podpac.settings["CACHE_OUTPUT_DEFAUL"] = False
    podpac.settings['MULTITHREADING'] = True
    podpac.settings['N_THREADS'] = 4

    alg.eval(coords)

4.36 s ± 313 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

%%timeit
with podpac.settings:
    podpac.settings['DEFAULT_CACHE'] = []
    podpac.settings['RAM_CACHE_ENABLED'] = False
    podpac.settings["CACHE_OUTPUT_DEFAUL"] = False
    podpac.settings['MULTITHREADING'] = False
    alg.eval(coords)

5.49 s ± 345 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

Note that when I give it fewer threads it takes longer again!

@mpu-creare (Contributor, Author)

TODO: Add a test for the race condition re: caching.

…sure there should be no issues with caching outputs of Nodes. I think this is overkill given the GIL, but should now be pretty safe.
@mpu-creare (Contributor, Author)

Added a race condition test. This is ready to go. Let me know if/when you're happy, @jmilloy, so we can merge.

@jmilloy (Collaborator) left a comment

I don't see any tests for the thread manager. I guess it would be okay not to have explicit thread manager tests, because I think we should add an algorithm test that falls back to serial evaluation when no more threads are available (or did I miss that your test actually does that?).

I can make the changes I've proposed here if you don't have a chance to. Let me know if you want me to (and which changes you actually agree with and think are worth it).

Review comments left on: podpac/core/algorithm/algorithm.py, podpac/core/algorithm/test/test_algorithm.py, podpac/core/managers/multi_threading.py, podpac/core/node.py (all resolved).
* Made the ThreadPool creation part of the thread_manager (see the sketch after this list)
* Doing serial computation if N_THREADS == 1 (had to release the obtained thread)
* Added the _multi_threaded node attribute to help with testing/debugging multi-threaded execution
* Using the Lock context manager instead of 'acquire/release'
* Added a test to stress the number of threads in the execution, checking that the correct number of threads causes a cascade to lower levels in the pipeline.
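
A minimal sketch of the claim/release bookkeeping those bullets describe, assuming a global counter behind a Lock (the class and method names here are illustrative, not necessarily what landed in podpac/core/managers/multi_threading.py):

from multiprocessing.pool import ThreadPool
import threading

class ThreadManager:
    """Global accounting for how many threads the whole pipeline may use."""

    def __init__(self, n_threads_max):
        self.n_threads_max = n_threads_max
        self._n_threads_used = 0
        self._lock = threading.Lock()

    def request_n_threads(self, n):
        # atomically claim up to n threads; returns how many were granted
        with self._lock:  # Lock as a context manager, not acquire()/release()
            available = max(0, self.n_threads_max - self._n_threads_used)
            granted = min(available, n)
            self._n_threads_used += granted
            return granted

    def release_n_threads(self, n):
        with self._lock:
            self._n_threads_used = max(0, self._n_threads_used - n)

    def get_thread_pool(self, n):
        # pool creation lives in the manager, per the first bullet above
        return ThreadPool(processes=n)

A node would then request len(inputs) threads, fall back to serial evaluation if fewer than 2 are granted (covering the N_THREADS == 1 case), and release them in a finally block.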
@mpu-creare (Contributor, Author)

Okay @jmilloy, have another peek. Should now be good to go.

@jmilloy (Collaborator) left a comment

Great. I like the _multi_threaded debug flag, too.

@mpu-creare mpu-creare merged commit 85716a4 into develop Nov 25, 2019
@mpu-creare mpu-creare deleted the feature/async-algorithm-eval branch November 25, 2019 14:15