Add thread cache #1545 (Merged)

Changes from 3 of the 5 commits:
1d1f871  Add thread cache (njsmith)
f4a57d5  Remove long, obsolete comment (njsmith)
2751323  Swap order of arguments to start_thread_soon (njsmith)
ca6cc7c  Remove unused/unnecessary lock (njsmith)
e1b60f6  Merge branch 'master' into thread-cache (njsmith)
@@ -0,0 +1,6 @@
To speed up `trio.to_thread.run_sync`, Trio now caches and re-uses
worker threads.

And in case you have some exotic use case where you need to spawn
threads manually, but want to take advantage of Trio's cache, you can
do that using the new `trio.lowlevel.start_thread_soon`.
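
For illustration, a minimal sketch of the high-level API that benefits from the cache (the time.sleep payload is just a stand-in for real blocking work):

    import time

    import trio

    async def main():
        # Each call below can now re-use a cached worker thread instead of
        # starting a fresh one.
        await trio.to_thread.run_sync(time.sleep, 0.1)
        await trio.to_thread.run_sync(time.sleep, 0.1)

    trio.run(main)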
This file was deleted.
@@ -0,0 +1,168 @@
from threading import Thread, Lock
import sys
import outcome
from itertools import count

# The "thread cache" is a simple unbounded thread pool, i.e., it automatically | ||
# spawns as many threads as needed to handle all the requests its given. Its | ||
# only purpose is to cache worker threads so that they don't have to be | ||
# started from scratch every time we want to delegate some work to a thread. | ||
# It's expected that some higher-level code will track how many threads are in | ||
# use to avoid overwhelming the system (e.g. the limiter= argument to | ||
# trio.to_thread.run_sync). | ||
# | ||
# To maximize sharing, there's only one thread cache per process, even if you | ||
# have multiple calls to trio.run. | ||
# | ||
# Guarantees: | ||
# | ||
# It's safe to call start_thread_soon simultaneously from | ||
# multiple threads. | ||
# | ||
# Idle threads are chosen in LIFO order, i.e. we *don't* spread work evenly | ||
# over all threads. Instead we try to let some threads do most of the work | ||
# while others sit idle as much as possible. Compared to FIFO, this has better | ||
# memory cache behavior, and it makes it easier to detect when we have too | ||
# many threads, so idle ones can exit. | ||
# | ||
# This code assumes that 'dict' has the following properties: | ||
# | ||
# - __setitem__, __delitem__, and popitem are all thread-safe and atomic with | ||
# respect to each other. This is guaranteed by the GIL. | ||
# | ||
# - popitem returns the most-recently-added item (i.e., __setitem__ + popitem | ||
# give you a LIFO queue). This relies on dicts being insertion-ordered, like | ||
# they are in py36+. | ||
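#
# For example (an illustrative snippet, not part of this module):
#
#     d = {}
#     d["a"] = None
#     d["b"] = None
#     assert d.popitem() == ("b", None)  # most recently added comes out first
#     assert d.popitem() == ("a", None)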

# How long a thread will idle waiting for new work before giving up and
# exiting. This value is pretty arbitrary; I don't think it matters too much.
IDLE_TIMEOUT = 10  # seconds

name_counter = count()


class WorkerThread:
    def __init__(self, thread_cache):
        self._job = None
        self._thread_cache = thread_cache
        # This Lock is used in an unconventional way.
        #
        # "Unlocked" means we have a pending job that's been assigned to us;
        # "locked" means that we don't.
        #
        # Initially we have no job, so it starts out in locked state.
        self._worker_lock = Lock()
        self._worker_lock.acquire()
        thread = Thread(target=self._work, daemon=True)
        thread.name = f"Trio worker thread {next(name_counter)}"
        thread.start()

    def _work(self):
        while True:
            if self._worker_lock.acquire(timeout=IDLE_TIMEOUT):
                # We got a job
                fn, deliver = self._job
                self._job = None
                result = outcome.capture(fn)
                # Tell the cache that we're available to be assigned a new
                # job. We do this *before* calling 'deliver', so that if
                # 'deliver' triggers a new job, it can be assigned to us
                # instead of spawning a new thread.
                self._thread_cache._idle_workers[self] = None
                deliver(result)
            else:
                # Timed out acquiring the lock, so we can probably exit. But
                # there's a race condition: we might be assigned a job *just*
                # as we're about to exit. So we have to check.
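                #
                # Concretely, the interleaving this guards against looks like:
                #
                #   worker:    acquire(timeout=IDLE_TIMEOUT) times out
                #   submitter: popitem() removes this worker, assigns it a
                #              job, and releases _worker_lock
                #   worker:    'del _idle_workers[self]' raises KeyError, so
                #              it loops around, acquires _worker_lock
                #              immediately, and runs the job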
                try:
                    del self._thread_cache._idle_workers[self]
                except KeyError:
                    # Someone else removed us from the idle worker queue, so
                    # they must be in the process of assigning us a job - loop
                    # around and wait for it.
                    continue
                else:
                    # We successfully removed ourselves from the idle
                    # worker queue, so no more jobs are incoming; it's safe to
                    # exit.
                    return


class ThreadCache:
    def __init__(self):
        self._idle_workers = {}
        self._cache_lock = Lock()

    def start_thread_soon(self, fn, deliver):

[Inline review comment on this line] For what it's worth, yes, this new order seems more intuitive to me, as deliver is called after fn.

        try:
            worker, _ = self._idle_workers.popitem()
        except KeyError:
            worker = WorkerThread(self)
        worker._job = (fn, deliver)
        worker._worker_lock.release()


THREAD_CACHE = ThreadCache()


def start_thread_soon(fn, deliver):
    """Runs ``deliver(outcome.capture(fn))`` in a worker thread.

    Generally ``fn`` does some blocking work, and ``deliver`` delivers the
    result back to whoever is interested.

    This is a low-level, no-frills interface, very similar to using
    `threading.Thread` to spawn a thread directly. The main difference is
    that this function tries to re-use threads when possible, so it can be
    a bit faster than `threading.Thread`.

    Worker threads have the `~threading.Thread.daemon` flag set, which means
    that if your main thread exits, worker threads will automatically be
    killed. If you want to make sure that your ``fn`` runs to completion, then
    you should make sure that the main thread remains alive until ``deliver``
    is called.

    It is safe to call this function simultaneously from multiple threads.

    Args:

      fn (sync function): Performs arbitrary blocking work.

      deliver (sync function): Takes the `outcome.Outcome` of ``fn``, and
        delivers it. *Must not block.*

    Because worker threads are cached and reused for multiple calls, neither
    function should mutate thread-level state, like `threading.local` objects
    – or if they do, they should be careful to revert their changes before
    returning.

    Note:

      The split between ``fn`` and ``deliver`` serves two purposes. First,
      it's convenient, since most callers need something like this anyway.

      Second, it avoids a small race condition that could cause too many
      threads to be spawned. Consider a program that wants to run several
      jobs sequentially on a thread, so the main thread submits a job, waits
      for it to finish, submits another job, etc. In theory, this program
      should only need one worker thread. But what could happen is:

      1. Worker thread: First job finishes, and calls ``deliver``.

      2. Main thread: receives notification that the job finished, and calls
         ``start_thread_soon``.

      3. Main thread: sees that no worker threads are marked idle, so spawns
         a second worker thread.

      4. Original worker thread: marks itself as idle.

      To avoid this, threads mark themselves as idle *before* calling
      ``deliver``.

      Is this potential extra thread a major problem? Maybe not, but it's
      easy enough to avoid, and we figure that if the user is trying to
      limit how many threads they're using then it's polite to respect that.

    """
    THREAD_CACHE.start_thread_soon(fn, deliver)
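
For illustration, a minimal sketch of driving this API from a plain script. The threading.Event is just a hypothetical way to keep the main thread alive; the call contract itself (deliver receives the outcome.Outcome of fn, and must not block) is the one documented above:

    import threading

    import outcome
    from trio.lowlevel import start_thread_soon

    done = threading.Event()

    def fn():
        return 6 * 7  # stands in for some blocking work

    def deliver(result):
        # result is an outcome.Value, or an outcome.Error if fn raised
        print(result.unwrap())
        done.set()

    start_thread_soon(fn, deliver)
    # Worker threads are daemonic, so keep the main thread alive until
    # deliver has run.
    done.wait()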
Review comment: You don't seem to use the _cache_lock anywhere. If you remove it, you could even make THREAD_CACHE be the dict directly, and move ThreadCache.start_thread_soon() into the global start_thread_soon().

Reply: Nice catch on _cache_lock! We currently have a test that instantiates a private ThreadCache object (test_race_between_idle_exit_and_job_assignment), and I don't see how to easily make it work otherwise, so I guess I'll leave the class there for now.
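
For reference, a hypothetical sketch of the suggested simplification (not what was merged; it assumes WorkerThread would be reworked to mark itself idle in a module-level dict rather than in a ThreadCache instance):

    # The idle-worker dict itself becomes the process-global cache.
    IDLE_WORKERS = {}

    def start_thread_soon(fn, deliver):
        try:
            worker, _ = IDLE_WORKERS.popitem()
        except KeyError:
            # Hypothetical: WorkerThread taking the dict directly.
            worker = WorkerThread(IDLE_WORKERS)
        worker._job = (fn, deliver)
        worker._worker_lock.release()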