Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: asyncio LoopExecutor and async fsspec source #992

Merged
merged 47 commits into from
Oct 19, 2023
Merged
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
fbe57df
add LoopExecutor
lobis Oct 15, 2023
7578d35
use LoopExecutor for async-capable file systems
lobis Oct 15, 2023
cfb3c45
use similar submit interface
lobis Oct 15, 2023
c588cea
add comment
lobis Oct 15, 2023
7dd99fb
handle trivial executor
lobis Oct 15, 2023
3cf0866
renamed boolean switch variable
lobis Oct 16, 2023
b7aad49
add some types
lobis Oct 16, 2023
8b218e4
typing
lobis Oct 16, 2023
3624f87
from __future__ import annotations
lobis Oct 16, 2023
62bcd51
wait for loop to start
lobis Oct 16, 2023
3d4435c
do not return self
lobis Oct 16, 2023
96f9a6e
debug
lobis Oct 16, 2023
ae5bcef
debug
lobis Oct 16, 2023
b508df9
Revert "debug"
lobis Oct 16, 2023
803158b
Revert "do not return self"
lobis Oct 16, 2023
549ae7d
remove debugs
lobis Oct 16, 2023
7bc543c
Merge remote-tracking branch 'origin/fsspec-async' into fsspec-async
lobis Oct 16, 2023
202783a
Instantiate async filesystem inside the desired loop
nsmith- Oct 16, 2023
c020ae2
style: pre-commit fixes
pre-commit-ci[bot] Oct 16, 2023
300c8fe
trying to make it work
lobis Oct 16, 2023
69ea245
Revert "trying to make it work"
lobis Oct 16, 2023
4f01c1e
use isinstance
lobis Oct 16, 2023
0a161bc
add assertions for loop fsspec
lobis Oct 17, 2023
c615eca
do not shutdown loop executor and make thread daemon so it stops
lobis Oct 17, 2023
ef1f022
exit file before shutting down loop
lobis Oct 17, 2023
ffbd7cd
use singleton loop executor
lobis Oct 17, 2023
ba7a6bf
do not use thread pool executor, use loop executor instead
lobis Oct 17, 2023
80049db
Revert "do not use thread pool executor, use loop executor instead"
lobis Oct 17, 2023
a6500fc
Merge branch 'main' into fsspec-async
lobis Oct 18, 2023
5c2c9ae
add exceptions for loop executor submit
lobis Oct 18, 2023
49ba900
singleton start / stop, disable shutdown
lobis Oct 18, 2023
d57f499
merge and fix conflicts
lobis Oct 18, 2023
1baa3f4
pass the storage options to new fs
lobis Oct 18, 2023
bcc9e7b
test also when use_threads is false
lobis Oct 18, 2023
5be63b0
simplify loop attachment (is it safe?)
lobis Oct 18, 2023
ff07ab3
only instantiate filesystem once
lobis Oct 18, 2023
1aefc5f
do not shutdown executor on exit (singleton)
lobis Oct 18, 2023
a710ea0
add missing shutdown
lobis Oct 18, 2023
0a1c46b
use fsspec's own loop
lobis Oct 19, 2023
08349cb
add ABC Executor
lobis Oct 19, 2023
0210d0f
simplified if branching
lobis Oct 19, 2023
60b892a
add property to check if async or not
lobis Oct 19, 2023
17ec460
add `use_async` hidden option to allow testing the non-async executor…
lobis Oct 19, 2023
4fc0206
fix return type annotation
lobis Oct 19, 2023
8b96409
Merge branch 'main' into fsspec-async
lobis Oct 19, 2023
b5065ca
Merge branch 'main' into fsspec-async
lobis Oct 19, 2023
bbd4ecc
Merge branch 'main' into fsspec-async
lobis Oct 19, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions src/uproot/source/chunk.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,9 @@ def chunk(self, start, stop) -> Chunk:
:doc:`uproot.source.chunk.Chunk`.
"""

def chunks(self, ranges, notifications: queue.Queue) -> list[Chunk]:
def chunks(
self, ranges: list[(int, int)], notifications: queue.Queue
) -> list[Chunk]:
"""
Args:
ranges (list of (int, int) 2-tuples): Intervals to fetch
Expand Down Expand Up @@ -164,7 +166,9 @@ def chunk(self, start, stop) -> Chunk:
self._executor.submit(future)
return chunk

def chunks(self, ranges, notifications: queue.Queue) -> list[Chunk]:
def chunks(
self, ranges: list[(int, int)], notifications: queue.Queue
) -> list[Chunk]:
self._num_requests += 1
self._num_requested_chunks += len(ranges)
self._num_requested_bytes += sum(stop - start for start, stop in ranges)
Expand Down
1 change: 1 addition & 0 deletions src/uproot/source/cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
the lowest level of interpretation (numbers, strings, raw arrays, etc.).
"""

from __future__ import annotations

import datetime
import struct
Expand Down
2 changes: 1 addition & 1 deletion src/uproot/source/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ def chunk(self, start, stop) -> uproot.source.chunk.Chunk:
return self._fallback.chunk(start, stop)

def chunks(
self, ranges, notifications: queue.Queue
self, ranges: list[(int, int)], notifications: queue.Queue
) -> list[uproot.source.chunk.Chunk]:
if self._fallback is None:
if self.closed:
Expand Down
35 changes: 27 additions & 8 deletions src/uproot/source/fsspec.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,22 @@ def __init__(self, file_path, **options):
self._fs, self._file_path = fsspec.core.url_to_fs(file_path, **opts)

if self._use_threads:
self._executor = concurrent.futures.ThreadPoolExecutor(
max_workers=self._num_workers
)
if self._fs.async_impl:
self._executor = uproot.source.futures.LoopExecutor()

# Bind the loop to the filesystem
async def make_fs():
return fsspec.filesystem(
protocol=self._fs.protocol, loop=self._executor.loop
)

self._fs = self._executor.submit(make_fs).result()
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is some of the problematic code. This fix was added by @nsmith- and fixed one of the race conditions. I am not sure if this is still the problem though.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be clear, the issue that was fixed was not a race condition but an initialization routine accessing some data that is (apparently) only valid when accessed from the thread running the event loop. So we schedule the initialization in that loop and wait for it here.

assert self._fs.loop is self._executor.loop, "loop not bound"
assert self._fs.loop.is_running(), "loop not running"
else:
self._executor = concurrent.futures.ThreadPoolExecutor(
max_workers=self._num_workers
)
else:
self._executor = uproot.source.futures.TrivialExecutor()

Expand Down Expand Up @@ -73,8 +86,11 @@ def __enter__(self):

def __exit__(self, exception_type, exception_value, traceback):
self._fh = None
self._executor.shutdown()
self._file.__exit__(exception_type, exception_value, traceback)
# TODO: proper cleanup. Shutting down the loop executor causes problems
# self._executor.shutdown()
if not isinstance(self._executor, uproot.source.futures.LoopExecutor):
self._executor.shutdown()

def chunk(self, start, stop) -> uproot.source.chunk.Chunk:
"""
Expand All @@ -98,7 +114,7 @@ def chunk(self, start, stop) -> uproot.source.chunk.Chunk:
return uproot.source.chunk.Chunk(self, start, stop, future)

def chunks(
self, ranges, notifications: queue.Queue
self, ranges: list[(int, int)], notifications: queue.Queue
) -> list[uproot.source.chunk.Chunk]:
"""
Args:
Expand Down Expand Up @@ -132,10 +148,13 @@ def chunks(
self._num_requested_bytes += sum(stop - start for start, stop in ranges)

chunks = []
# _cat_file is async while cat_file is not
use_async = self._fs.async_impl and isinstance(
self._executor, uproot.source.futures.LoopExecutor
)
cat_file = self._fs._cat_file if use_async else self._fs.cat_file
for start, stop in ranges:
future = self._executor.submit(
self._fs.cat_file, self._file_path, start, stop
)
future = self._executor.submit(cat_file, self._file_path, start, stop)
chunk = uproot.source.chunk.Chunk(self, start, stop, future)
future.add_done_callback(uproot.source.chunk.notifier(chunk, notifications))
chunks.append(chunk)
Expand Down
49 changes: 47 additions & 2 deletions src/uproot/source/futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
These classes implement a *subset* of Python's Future and Executor interfaces.
"""

from __future__ import annotations

import asyncio
import os
import queue
import sys
Expand Down Expand Up @@ -224,7 +227,7 @@ def num_workers(self) -> int:
return len(self._workers)

@property
def workers(self):
def workers(self) -> list[Worker]:
"""
A list of workers (:doc:`uproot.source.futures.Worker`).
"""
Expand Down Expand Up @@ -315,7 +318,7 @@ class ResourceWorker(Worker):
executes.
"""

def __init__(self, work_queue, resource):
def __init__(self, work_queue: queue.Queue, resource):
super().__init__(work_queue)
self._resource = resource

Expand Down Expand Up @@ -469,3 +472,45 @@ def __exit__(self, exception_type, exception_value, traceback):
self.shutdown()
self._resource.__exit__(exception_type, exception_value, traceback)
self._closed = True


class LoopExecutor:
def __repr__(self):
    """Return a short identity string for debugging."""
    address = id(self)
    return f"<LoopExecutor at 0x{address:012x}>"

def __init__(self):
    # Create a private event loop; it is driven by a dedicated thread
    # (see _run) rather than the caller's thread.
    self._loop = asyncio.new_event_loop()
    # TODO: remove daemon=True (or not?)
    # NOTE(review): daemon=True lets the interpreter exit even if shutdown()
    # is never called, at the cost of skipping graceful fsspec cleanup —
    # confirm whether a non-daemon thread plus explicit shutdown is viable.
    self._thread = threading.Thread(target=self._run, daemon=True)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we want a daemon here — we want fsspec to be able to gracefully close its resources if possible.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I agree, it should be possible to do this cleanup correctly

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I only added this as part of the quick fix. The daemon together with not shutting down the loop executor made the test pass.

self.start()

def start(self):
    """Launch the event-loop thread and return this executor (fluent style)."""
    self._thread.start()
    return self

def shutdown(self):
    """Stop the event loop and wait for its thread to terminate.

    Idempotent: once the loop has been closed (by ``_run``'s cleanup after
    a previous shutdown), further calls return immediately instead of
    raising ``RuntimeError`` from ``call_soon_threadsafe`` on a closed loop.
    """
    if self._loop.is_closed():
        # Already torn down by a previous shutdown; nothing left to stop.
        return
    self._loop.call_soon_threadsafe(self._loop.stop)
    self._thread.join()

def _run(self):
    """Thread target: run the event loop until stopped, then clean up.

    After ``shutdown`` stops the loop, any still-pending tasks are given a
    chance to finish (shielded from cancellation) before async generators
    are finalized and the loop is closed, so async filesystems can release
    their resources gracefully.
    """
    asyncio.set_event_loop(self._loop)
    try:
        self._loop.run_forever()
    finally:
        try:
            # Drain in-flight tasks; collect exceptions via
            # return_exceptions so one failing task cannot abort cleanup.
            pending = asyncio.all_tasks(self._loop)
            if pending:
                self._loop.run_until_complete(
                    asyncio.gather(
                        *(asyncio.shield(task) for task in pending),
                        return_exceptions=True,
                    )
                )
            self._loop.run_until_complete(self._loop.shutdown_asyncgens())
        finally:
            # Close unconditionally so the loop's selector is released
            # even if the cleanup above raised.
            self._loop.close()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We probably need to run cleanups here too, and shield these from cancellation

    # Find all running tasks:
    pending = asyncio.all_tasks()

    # Run loop until tasks done:
    loop.run_until_complete(
        asyncio.gather(*[
            asyncio.shield(t) for t in pending
            ]
        )
    )

Copy link
Member

@nsmith- nsmith- Oct 17, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is mostly my fault. I don't understand why there isn't a canned "loop-in-a-thread" in the stdlib or elsewhere. So we're designing one.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@nsmith I wouldn't ascribe fault — asyncio is a bit of an ugly beast (I use trio in my personal projects these days), but it's the one we're stuck with.

The intention with asyncio is that you only really have one loop. What we're doing is a bit unconventional, but clearly necessary to handle a blocking high level UX.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, yes! The previous error was an unhandled exception, and I would not be surprised if we have another one.


def __enter__(self):
    """Enter the context, ensuring the loop thread is running.

    ``__init__`` already calls ``start()``, and ``threading.Thread.start``
    raises ``RuntimeError`` if invoked a second time, so the thread is only
    started here when it has never run (``ident`` is ``None`` until the
    first successful start).
    """
    if self._thread.ident is None:
        self.start()
    return self

def __exit__(self, exc_type, exc_val, exc_tb):
    """Leave the context by shutting the executor down."""
    self.shutdown()

@property
def loop(self) -> asyncio.AbstractEventLoop:
    """The asyncio event loop driven by this executor's thread."""
    return self._loop

def submit(self, coroutine, *args) -> "concurrent.futures.Future":
    """Schedule ``coroutine(*args)`` on the executor's loop thread.

    Args:
        coroutine: An async callable (coroutine function).
        args: Positional arguments passed to ``coroutine``.

    Returns:
        A ``concurrent.futures.Future`` (not an ``asyncio.Future`` —
        ``asyncio.run_coroutine_threadsafe`` returns the thread-safe kind),
        so callers may block on ``.result()`` from any thread.
    """
    coroutine_object = coroutine(*args)
    return asyncio.run_coroutine_threadsafe(coroutine_object, self._loop)
8 changes: 4 additions & 4 deletions src/uproot/source/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ def task(resource):
return uproot.source.futures.ResourceFuture(task)

@staticmethod
def multifuture(source, ranges, futures, results):
def multifuture(source, ranges: list[(int, int)], futures, results):
"""
Args:
source (:doc:`uproot.source.http.HTTPSource`): The data source.
Expand Down Expand Up @@ -351,7 +351,7 @@ def task(resource):
)
_content_range = re.compile(b"Content-Range: bytes ([0-9]+-[0-9]+)", re.I)

def is_multipart_supported(self, ranges, response):
def is_multipart_supported(self, ranges: list[(int, int)], response) -> bool:
"""
Helper function for :ref:`uproot.source.http.HTTPResource.multifuture`
to check for multipart GET support.
Expand All @@ -368,7 +368,7 @@ def is_multipart_supported(self, ranges, response):
else:
return True

def handle_no_multipart(self, source, ranges, futures, results):
def handle_no_multipart(self, source, ranges: list[(int, int)], futures, results):
"""
Helper function for :ref:`uproot.source.http.HTTPResource.multifuture`
to handle a lack of multipart GET support.
Expand Down Expand Up @@ -605,7 +605,7 @@ def chunk(self, start, stop) -> uproot.source.chunk.Chunk:
return chunk

def chunks(
self, ranges, notifications: queue.Queue
self, ranges: list[(int, int)], notifications: queue.Queue
) -> list[uproot.source.chunk.Chunk]:
if self._fallback is None:
self._num_requests += 1
Expand Down
1 change: 1 addition & 0 deletions src/uproot/source/object.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
has exactly one worker (we can't assume that the object is thread-safe).
"""

from __future__ import annotations

import uproot
import uproot.source.chunk
Expand Down
2 changes: 2 additions & 0 deletions src/uproot/source/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
This module defines a physical layer for remote files, accessed via S3.
"""

from __future__ import annotations

import os
from urllib.parse import parse_qsl, urlparse

Expand Down
2 changes: 1 addition & 1 deletion src/uproot/source/xrootd.py
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ def chunk(self, start, stop) -> uproot.source.chunk.Chunk:
return uproot.source.chunk.Chunk(self, start, stop, future)

def chunks(
self, ranges, notifications: queue.Queue
self, ranges: list[(int, int)], notifications: queue.Queue
) -> list[uproot.source.chunk.Chunk]:
self._num_requests += 1
self._num_requested_chunks += len(ranges)
Expand Down
Loading