Repeatedly running run_in_worker_thread #267
It's hard to give specific advice without knowing more about your setup, but in general there are a few possible strategies here.

First, all else being equal, if you can avoid juggling threads altogether by doing everything inside trio, then that's great -- it makes your life simpler. (That's what the docs recommend.) Then, if you're using threads, there are two general approaches: you could stay in trio most of the time and only use `run_in_worker_thread` for the individual blocking calls, or you could keep a dedicated thread and feed it work through a queue.

Any mechanism for crossing back and forth between threads is going to have some cost, unfortunately -- have you measured it and found it problematic?

Why does it matter that all the device operations use the same thread? In your example code, you have two different tasks trying to use the same device at the same time.
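The dedicated-thread-plus-queue approach mentioned above can be sketched with just the stdlib (no trio involved; `DeviceWorker` and its method names are invented for illustration -- in trio, the reply would be delivered by rescheduling the waiting task rather than by a blocking `reply.get()`):

```python
import threading
import queue

class DeviceWorker:
    """One long-lived thread that serializes all operations for one device."""

    def __init__(self):
        self._jobs = queue.Queue()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def _run(self):
        while True:
            job = self._jobs.get()
            if job is None:  # sentinel: shut down
                return
            fn, args, reply = job
            try:
                reply.put(('ok', fn(*args)))
            except BaseException as exc:
                reply.put(('error', exc))

    def submit(self, fn, *args):
        # Blocking call-and-wait, for illustration only; an async framework
        # would instead have the worker schedule a callback to wake the
        # waiting task.
        reply = queue.Queue()
        self._jobs.put((fn, args, reply))
        status, value = reply.get()
        if status == 'error':
            raise value
        return value

    def stop(self):
        self._jobs.put(None)
        self._thread.join()
```

Because every `submit` for one device lands on that device's single thread, calls to the same device line up while different devices proceed independently.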
First, thanks for the response!

Regarding using a queue: I didn't want to do that because I liked, and wanted, the exception injection into a task offered by `run_in_worker_thread`. If I did stuff with a queue and passed the results back, I would lose that.

Regarding why I want to run them in the same thread: that's because a call may block, and I don't want devices to block each other. I.e. I don't mind a device blocking other calls to the same device, but I don't want that to leak between devices. Similarly, I don't want to have more than one thread per device, because I'd rather multiple calls to the same device line up than spawn more threads, due to the dubious multi-threaded safety of the underlying devices.

Anyway, following is an implementation of what I wanted:

```python
import threading
import queue as stdlib_queue

import trio


class Executor(object):
    _thread = None
    name = 'Executor'
    _exec_queue = None

    def __del__(self):
        self.stop_executor(block=False)

    def start_executor(self):
        queue = self._exec_queue = stdlib_queue.Queue()
        # daemon=True because it might get left behind if we cancel, and in
        # this case shouldn't block process exit.
        thread = self._thread = threading.Thread(
            target=self.worker_thread_fn, name=self.name, daemon=True,
            args=(queue, ))
        thread.start()

    def stop_executor(self, block=True):
        if not self._thread:
            return
        if not self._exec_queue:
            self._thread = None
            return
        self._exec_queue.put('eof', block=block)
        if block:
            self._thread.join()
        self._thread = self._exec_queue = None

    def report_back_in_trio_thread_fn(self, task_container):
        # This function gets scheduled into the trio run loop to deliver the
        # thread's result.
        def do_release_then_return_result():
            # If canceled, do the cancellation; otherwise return the result.
            if task_container[1] is not None:
                task_container[1]()
            return task_container[2].unwrap()
        result = trio.Result.capture(do_release_then_return_result)
        trio.hazmat.reschedule(task_container[0], result)

    def worker_thread_fn(self, queue):
        # This is the function that runs in the worker thread to do the
        # actual work and then schedule the calls to
        # report_back_in_trio_thread_fn.
        while True:
            item = queue.get(block=True)
            if item == 'eof':
                return
            sync_fn, args, task_container, call_soon = item
            if task_container[1] is None:
                task_container[2] = trio.Result.capture(sync_fn, *args)
            try:
                call_soon(self.report_back_in_trio_thread_fn, task_container)
            except trio.RunFinishedError:
                # The entire run finished, so our particular tasks are
                # certainly long gone -- it must have been cancelled.
                # Continue eating the queue.
                pass

    @trio.hazmat.enable_ki_protection
    async def run_in_worker_thread(self, sync_fn, *args, cancellable=False):
        await trio.hazmat.yield_if_cancelled()
        if self._thread is None:
            self.start_executor()
        # Holds a reference to the task that's blocked in this function
        # waiting for the result, as well as to the cancel callback and the
        # result (when not canceled).
        task_container = [trio.current_task(), None, None]
        self._exec_queue.put(
            (sync_fn, args, task_container,
             trio.hazmat.current_call_soon_thread_and_signal_safe()))

        def abort(raise_cancel):
            if cancellable:
                task_container[1] = raise_cancel
            return trio.hazmat.Abort.FAILED
        return await trio.hazmat.yield_indefinitely(abort)


if __name__ == '__main__':
    executor = Executor()

    def get_value(value):
        return value

    async def run_sequence():
        for i in range(10):
            print(await executor.run_in_worker_thread(get_value, i))

    async def execute_thread(i):
        print(await executor.run_in_worker_thread(get_value, i))

    async def spawner():
        async with trio.open_nursery() as nursery:
            for i in range(10):
                nursery.spawn(execute_thread, i)

    trio.run(run_sequence)
    print('done run_sequence')
    trio.run(spawner)
    print('done spawner')
```

Which, when run, prints:
Here's a timeit result compared to the trio one:

```python
from timeit import timeit

import trio

from src.playground import Executor

executor = Executor()


def get_value(value):
    return value


async def run_sequence():
    for i in range(1000):
        await executor.run_in_worker_thread(get_value, i)


async def run_sequence_trio():
    for i in range(1000):
        await trio.run_in_worker_thread(get_value, i)


print(timeit(
    stmt='trio.run(run_sequence)',
    setup='from __main__ import executor, trio, run_sequence, run_sequence_trio',
    number=10))
print(timeit(
    stmt='trio.run(run_sequence_trio)',
    setup='from __main__ import executor, trio, run_sequence, run_sequence_trio',
    number=10))
```

prints:
Now, back to my original request: I was hoping that I (or trio) could implement this kind of executor as a supported pattern.

I really appreciate the level of detail of the docs written in trio :) but I do think they could benefit from some trimming, because it is somewhat difficult to get a broad picture of the trio API. Maybe for now just listing all the API signatures would provide the best of both worlds. Sphinx doesn't make it very easy to collapse API docs so you can just see the signatures (I really wish they had a template that did that).

The reason why I think this executor example would be useful is not only my use case that I described above, but that with some modifications you can use it to schedule an event in an existing event loop framework (e.g. when you're using some GUI framework that provides one) and have a trio task proxy it fully. This is really what I wanted: a high-level, bullet-proof way of proxying an event scheduled in another event loop with a trio task. E.g. a trio task interface that an event loop designer can call when their event executes, is canceled, or raises an exception, which passes that on to the trio task. Then a user scheduling a callback with the framework's event loop can also await it. This would be great for compatibility reasons.

Looking at my executor class, it is not "that" hard to implement, especially working by example. But there are enough things one needs to look out for that having such an example pattern to start from would be beneficial for framework authors.
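A thread-based sketch of that "proxy a foreign event" idea, using `concurrent.futures.Future` from the stdlib as the completion handle (a real trio version would park the task and reschedule it from the foreign loop via trio's thread-safe call-soon instead; `schedule_on_foreign_loop` and the queue-draining stand-in loop are invented for illustration):

```python
import threading
import queue
from concurrent.futures import Future

def schedule_on_foreign_loop(loop_call_soon, fn, *args):
    """Schedule fn on some other event loop and return a Future that the
    caller can wait on; the foreign loop resolves it when the callback runs,
    fails, or is cancelled before running."""
    future = Future()

    def wrapper():
        if not future.set_running_or_notify_cancel():
            return  # cancelled before the foreign loop got to it
        try:
            future.set_result(fn(*args))
        except BaseException as exc:
            future.set_exception(exc)

    loop_call_soon(wrapper)
    return future

# Stand-in "foreign event loop": a thread draining a queue of callbacks.
_events = queue.Queue()

def _foreign_loop():
    while True:
        cb = _events.get()
        if cb is None:  # sentinel: shut the loop down
            return
        cb()

_thread = threading.Thread(target=_foreign_loop, daemon=True)
_thread.start()

f = schedule_on_foreign_loop(_events.put, lambda: 'done')
print(f.result())  # blocks until the foreign loop runs the callback
_events.put(None)
_thread.join()
```

The Future carries exactly the three outcomes described above (result, exception, cancellation), which is why it makes a reasonable stand-in for a parked trio task in this sketch.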
It sounds like using a lock per device would also work for your use case, then?
That's pretty interesting. What platform did you use to measure this? Windows?
Just because if the operation has already completed successfully, then it wasn't cancelled :-). As much as possible, for the low-level trio operations I try to maintain the rule that cancelled means the operation had no effect. If a timeout expires, but the operation completes successfully before you can react to the timeout... why throw away that work and pretend it failed? It's already done :-).
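The same "completed beats cancelled" convention shows up in the stdlib's `concurrent.futures`, which makes for a quick illustration (this is an analogy, not trio code):

```python
from concurrent.futures import Future

f = Future()
f.set_result(42)

# Once the operation has finished, a late cancel is refused rather than
# throwing away the completed work:
assert f.cancel() is False
assert f.result() == 42
```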
Right, this is definitely interesting! It does require some case-by-case work, of course. You don't actually want to use a queue to communicate between trio and kivy, for example, because you don't want to do blocking queue operations in either thread, plus they both have a solid set of non-blocking tools to work with. Instead you should use each loop's own thread-safe scheduling call to hand callbacks across.

Regarding your comments on docs and examples: the feedback is definitely appreciated. I may not be able to do too much about it in the immediate future, because of limited hours in the day and because I'm still trying to focus on getting trio feature-complete :-). But it's useful to hear in any case. (And if you have more specific suggestions, or get an itch and decide to submit some edits as a PR, that'd be very welcome too.)

Hope that helps!
Yeah, I could have a lock per device and then not care which threads they run on, as long as there are enough threads. But I do like the design of a thread per device, so everything is self-contained and I don't have to worry about devices blocking each other if there are not enough threads, or about spawning more threads than really needed.
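The lock-per-device alternative can be sketched with the stdlib (`Device`, its `read` method, and the pool size are invented for illustration). Note what it gives up relative to thread-per-device: operations on one device still serialize via the lock, but may run on different pool threads, and devices can block each other if the pool runs out of workers:

```python
import threading
from concurrent.futures import ThreadPoolExecutor

class Device:
    """Each device gets its own lock; the threads come from a shared pool."""

    def __init__(self, name):
        self.name = name
        self._lock = threading.Lock()

    def read(self):
        with self._lock:  # only one operation per device at a time
            return f'data from {self.name}'

pool = ThreadPoolExecutor(max_workers=4)
devices = [Device('a'), Device('b')]

# Different devices run concurrently; repeat calls to one device line up.
results = [pool.submit(dev.read) for dev in devices]
print([f.result() for f in results])
pool.shutdown()
```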
Yes, I'm on windows 10.
Right, I couldn't be sure where exactly we were when cancel was called. I.e. when cancel is called in the trio thread, the other thread may or may not have executed the task yet (I check for cancellation in the second thread before executing the task). It would be a lot more complicated to make sure the cancel exception is only raised if the cancel arrived before the second thread executed the task.

And I guess for my use case, if I canceled one device, other devices may have also been canceled and closed and had other side effects. So imagine that I don't raise cancel until the next checkpoint, and continue processing the result from the second thread because we pretended it wasn't cancelled. Now I'm in a state where the overall program is canceled, yet I still continue to process the result for this task as if it isn't, which may cause issues. I'd rather that once cancel is called, nothing proceeds. Of course with well-designed code this may not be an issue, but still, I'd rather avoid it by canceling aggressively.

I'm closing the issue as it's pretty much resolved for me.
I have been thinking how to upgrade some of my tools to use async, specifically trio, now that I switched to py3 (finally). However, I have run into some conceptual threading issues that I'm not sure the best way to solve.
My tool runs scientific experiments and currently does it all in a callback framework (kivy). So e.g. it runs through a series of stages that does various things. More importantly, it opens a bunch of devices in secondary threads which continuously read or write to them in a blocking manner and update the main thread.
Trio is perfect for the stages part, because I can just await each stage to finish in a tree-like manner (stages have sub-stages etc.). The devices part is where it's more difficult. Obviously I can run `run_in_worker_thread` for each read and write, but the problem is two-fold: a) the cost of spawning a thread for every r/w, and b) I would like to run all operations on the same device in the same thread.

In the ideal world it'd look like something like this in trio. For every device,
Then later someplace:
And elsewhere at the same time:
I guess this is not that different from having trio interface with an event loop running in its own (or even the main) thread. I had initially written something like that, using the Python async library callbacks, so I could await in the main thread a task running in an event loop in a different thread, before I found trio. I can see why that may not mesh with your idea that in the long term there would be no blocking calls, so there would be no need for this. And also that interfacing with a second (and third) event loop, which would require callbacks, makes things ugly, rather than running things in a very temporary and singular `run_in_worker_thread` call. But I'm not sure how to make my use case work in trio given the two issues I mentioned.