feat: asyncio LoopExecutor and async fsspec source #992

Conversation
I've highlighted and commented upon the parts that are relevant for this PR (distinguishing them from the typing improvements).
I approve the PR; it should get merged once the test issues are resolved.
src/uproot/source/fsspec.py
Outdated
if self._use_threads:
    if self._fs.async_impl:
        self._executor = uproot.source.futures.LoopExecutor()

        # Bind the loop to the filesystem
        async def make_fs():
            return fsspec.filesystem(
                protocol=self._fs.protocol, loop=self._executor.loop
            )

        self._fs = self._executor.submit(make_fs).result()
    else:
        self._executor = concurrent.futures.ThreadPoolExecutor(
            max_workers=self._num_workers
        )
else:
    self._executor = uproot.source.futures.TrivialExecutor()
The decision to use an event loop, a thread pool, or nothing (synchronous/blocking).

I don't see any reason why we should keep the ThreadPoolExecutor-based solution around. Maybe there will be some advantage to letting the event loop use multiple threads, or to giving each thread its own event loop, but the main thing that the old ThreadPoolExecutor-based solution gets wrong is that it does not send all of the requests immediately.

The switch that falls back on ThreadPoolExecutor is self._fs.async_impl. If an fsspec backend doesn't implement async, shouldn't we use an event loop anyway? That is, if the function that makes a request does so when it is called and doesn't return an output until the response is in, couldn't we wrap that up in a future manually?

None of the above should delay this PR, though. It's a next step.
If the backend doesn't implement async, then the synchronous interface implies that some thread will block waiting for a request. So if we want any concurrency, we need multiple threads. Event loops will not work any better than a for loop with code that is blocking on IO.
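A minimal sketch (not code from the PR; blocking_cat_file is a hypothetical stand-in for a synchronous backend call) of exactly that point: awaiting blocking code gains nothing, and the requests only overlap once threads are involved, here via loop.run_in_executor, which dispatches to a thread pool under the hood.

import asyncio
import time


def blocking_cat_file(path, start, stop):
    time.sleep(0.1)  # stands in for a blocking network read
    return b"\x00" * (stop - start)


async def sequential(ranges):
    # each call blocks the loop's thread, so this is just a for loop: ~0.1 s per range
    return [blocking_cat_file("file.root", a, b) for a, b in ranges]


async def overlapped(ranges):
    # run_in_executor hands each blocking call to a worker thread,
    # so all the requests are in flight together: ~0.1 s total
    loop = asyncio.get_running_loop()
    tasks = [
        loop.run_in_executor(None, blocking_cat_file, "file.root", a, b)
        for a, b in ranges
    ]
    return await asyncio.gather(*tasks)

# e.g. asyncio.run(overlapped([(0, 100), (100, 200)]))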
I wish I had read this comment before ba7a6bf 🤦🏻♂️ 😄
src/uproot/source/fsspec.py
Outdated
chunks = []
# _cat_file is async while cat_file is not
use_async = (
    self._fs.async_impl and type(self._executor).__name__ == "LoopExecutor"
)
cat_file = self._fs._cat_file if use_async else self._fs.cat_file
for start, stop in ranges:
    future = self._executor.submit(cat_file, self._file_path, start, stop)
    chunk = uproot.source.chunk.Chunk(self, start, stop, future)
    future.add_done_callback(uproot.source.chunk.notifier(chunk, notifications))
    chunks.append(chunk)
Submitting all of the requests on what may be the event loop, may be the ThreadPoolExecutor, and may be a TrivialExecutor, wrapped up in something that has the interface of an executor.

The extra indirection might not be necessary. If we're going to switch between three (or two?) possible methods, why not do it right here?

Again, that's just something to think about.
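A sketch (not code from the PR) of that suggestion, reusing the attributes from the surrounding diff (self._fs, self._executor, self._file_path, and the Chunk/notifier helpers): the async/sync choice is made inline for each request instead of being hidden behind the executor's interface.

for start, stop in ranges:
    if self._fs.async_impl and isinstance(self._executor, uproot.source.futures.LoopExecutor):
        # async backend: schedule the coroutine on the event loop thread
        future = self._executor.submit(self._fs._cat_file, self._file_path, start, stop)
    else:
        # sync backend: run the blocking cat_file on the thread pool (or inline)
        future = self._executor.submit(self._fs.cat_file, self._file_path, start, stop)
    chunk = uproot.source.chunk.Chunk(self, start, stop, future)
    future.add_done_callback(uproot.source.chunk.notifier(chunk, notifications))
    chunks.append(chunk)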
src/uproot/source/futures.py
Outdated
class LoopExecutor:
    def __repr__(self):
        return f"<LoopExecutor at 0x{id(self):012x}>"

    def __init__(self):
        self._loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self._run)
        self.start()

    def start(self):
        self._thread.start()
        return self

    def shutdown(self):
        self._loop.call_soon_threadsafe(self._loop.stop)
        self._thread.join()

    def _run(self):
        asyncio.set_event_loop(self._loop)
        try:
            self._loop.run_forever()
        finally:
            self._loop.run_until_complete(self._loop.shutdown_asyncgens())
            self._loop.close()

    def __enter__(self):
        self.start()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.shutdown()

    @property
    def loop(self) -> asyncio.AbstractEventLoop:
        return self._loop

    def submit(self, coroutine, *args) -> asyncio.Future:
        coroutine_object = coroutine(*args)
        return asyncio.run_coroutine_threadsafe(coroutine_object, self._loop)
Implementation of the LoopExecutor. This is the part that had the possible race condition. I don't know asyncio well enough to be sure that it's resolved, but CI was sensitive to it, so when everything passes, we'll take that as a sign that it's likely okay.
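As an aside, one common way to rule out start-up races in this kind of wrapper (a sketch only, not the fix that was adopted in the PR) is to make start() idempotent and block until the loop is actually running before accepting work:

import asyncio
import threading


class LoopExecutorSketch:
    # hypothetical variant of the class above with an explicit
    # "the loop is running" handshake between the two threads

    def __init__(self):
        self._loop = asyncio.new_event_loop()
        self._started = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self.start()

    def start(self):
        if not self._thread.is_alive() and not self._started.is_set():
            self._thread.start()
        self._started.wait()  # don't return until run_forever is live
        return self

    def _run(self):
        asyncio.set_event_loop(self._loop)
        self._loop.call_soon(self._started.set)  # fires on the first loop iteration
        try:
            self._loop.run_forever()
        finally:
            self._loop.run_until_complete(self._loop.shutdown_asyncgens())
            self._loop.close()

    def shutdown(self):
        self._loop.call_soon_threadsafe(self._loop.stop)
        self._thread.join()

    def submit(self, coroutine, *args):
        # only safe once start() has returned, i.e. the loop is known to be running
        return asyncio.run_coroutine_threadsafe(coroutine(*args), self._loop)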
src/uproot/source/fsspec.py
Outdated
self._executor = uproot.source.futures.LoopExecutor()

# Bind the loop to the filesystem
async def make_fs():
    return fsspec.filesystem(
        protocol=self._fs.protocol, loop=self._executor.loop
    )

self._fs = self._executor.submit(make_fs).result()
This is some of the problematic code. This fix was added by @nsmith- and fixed one of the race conditions. I am not sure if this is still the problem though.
To be clear, the issue that was fixed was not a race condition but an initialization routine accessing some data that is only (apparently) valid when accessed from the thread running the event loop. So we schedule the initialization in that loop and wait for it here.

Looks like something "out of our control" (fsspec?) is submitting tasks to the loop after it has finished processing the requested tasks (getting the chunks). This fails if the loop is not running (we shut it down because we don't expect to do any more work). Why this happens (and why it only happens on some operating systems...) is a mystery to me. Maybe the pattern of spawning a thread bound to a source has some fundamental flaw? Maybe we should use a single loop for everything async-related, so making the …
For anyone following up: I think I know what the problem is: something is trying to shut down the executor while the intended tasks are running, and this should not happen. Now I only need to find it.
This makes sense - I was curious as to whether this was happening in #992 (comment). However, we should be blocking on the future that corresponds with the loop result, so I am surprised. I can take a look at that today!
I realised that fsspec provides and manages its own loop - exactly what we want, so I trashed the previous implementation of the LoopExecutor. It should probably be reviewed again since it has undergone significant changes since the last review.
FSSpec provides a loop for all of its IO in a daemon thread that lasts the duration of the program. It is constructed on first use in a threadsafe manner. I think this is a reasonable solution, though I suppose it should implement shutting down async generators and waiting for pending tasks as is done in the standard library asyncio.run. Perhaps @martindurant knows if this is purposely not implemented.

Edit: since it is a daemon thread, it will be terminated abruptly when the main thread exits. There is no way for it to block shutdown to finish things.
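A small illustration of that "loop in a can" behaviour (treat the exact accessor as an assumption; as far as I can tell fsspec exposes it as fsspec.asyn.get_loop()): the loop is created lazily on first use, lives in a daemon thread, and the same object is handed to every caller.

import asyncio
import fsspec.asyn


async def probe():
    return asyncio.get_running_loop()


loop = fsspec.asyn.get_loop()           # created on first call, runs in a daemon thread
assert fsspec.asyn.get_loop() is loop   # later calls return the same loop

# coroutines scheduled from any thread run on that daemon-thread loop
assert asyncio.run_coroutine_threadsafe(probe(), loop).result() is loop

# because the thread is a daemon, it cannot block interpreter exit:
# whatever is still pending on the loop is abandoned when the main thread ends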
This PR adds a new type of executor, exclusive to the fsspec source, that allows tasks to be run asynchronously in a single thread using the fsspec event loop (for async-capable backends).

The list of commits for this PR is very long because the design changed a few times:

- First we added a generic LoopExecutor capable of submitting tasks to a loop running in a different thread. This class was responsible for managing the lifetime of the loop and thread. There were some issues when submitting fsspec coroutines (I still don't really understand why).
- Finally I realised that fsspec provides its own loop, so we avoid having to manage all this and just use fsspec's implementation of a "loop in a can" (@nsmith-). This executor is defined in the fsspec.py source file, as currently it only makes sense to use it from the fsspec source since you have to provide a loop. The executor does nothing; it's just a wrapper for compatibility (a minimal sketch of what such a wrapper can look like is shown below).

When implementing this thin executor I thought that implementing an ABC executor class would be a good idea (IMHO).
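For reference, a minimal sketch (class and parameter names are illustrative, not the PR's exact code) of such a thin executor: it owns nothing and merely forwards coroutines to a loop that fsspec already runs in its own daemon thread (e.g. the loop attribute of an async filesystem instance).

import asyncio
import concurrent.futures


class FSSpecLoopExecutor:
    def __init__(self, loop: asyncio.AbstractEventLoop):
        # fsspec's long-lived loop; this wrapper never starts or stops it
        self.loop = loop

    def submit(self, coroutine, *args) -> concurrent.futures.Future:
        # schedule the coroutine on the loop's thread and hand back a future
        # that the chunk/notification machinery can wait on
        return asyncio.run_coroutine_threadsafe(coroutine(*args), self.loop)

    def shutdown(self, wait: bool = True):
        # nothing to tear down: the loop belongs to fsspec
        pass

Deriving this and the other executors from a common ABC, as mentioned above, mainly serves to pin down the submit/shutdown interface they all share.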