Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: asyncio LoopExecutor and async fsspec source #992

Merged
merged 47 commits into from
Oct 19, 2023
Merged
Show file tree
Hide file tree
Changes from 41 commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
fbe57df
add LoopExecutor
lobis Oct 15, 2023
7578d35
use LoopExecutor for async-capable file systems
lobis Oct 15, 2023
cfb3c45
use similar submit interface
lobis Oct 15, 2023
c588cea
add comment
lobis Oct 15, 2023
7dd99fb
handle trivial executor
lobis Oct 15, 2023
3cf0866
renamed boolean switch variable
lobis Oct 16, 2023
b7aad49
add some types
lobis Oct 16, 2023
8b218e4
typing
lobis Oct 16, 2023
3624f87
from __future__ import annotations
lobis Oct 16, 2023
62bcd51
wait for loop to start
lobis Oct 16, 2023
3d4435c
do not return self
lobis Oct 16, 2023
96f9a6e
debug
lobis Oct 16, 2023
ae5bcef
debug
lobis Oct 16, 2023
b508df9
Revert "debug"
lobis Oct 16, 2023
803158b
Revert "do not return self"
lobis Oct 16, 2023
549ae7d
remove debugs
lobis Oct 16, 2023
7bc543c
Merge remote-tracking branch 'origin/fsspec-async' into fsspec-async
lobis Oct 16, 2023
202783a
Instantiate async filesystem inside the desired loop
nsmith- Oct 16, 2023
c020ae2
style: pre-commit fixes
pre-commit-ci[bot] Oct 16, 2023
300c8fe
trying to make it work
lobis Oct 16, 2023
69ea245
Revert "trying to make it work"
lobis Oct 16, 2023
4f01c1e
use isinstance
lobis Oct 16, 2023
0a161bc
add assertions for loop fsspec
lobis Oct 17, 2023
c615eca
do not shutdown loop executor and make thread daemon so it stops
lobis Oct 17, 2023
ef1f022
exit file before shutting down loop
lobis Oct 17, 2023
ffbd7cd
use singleton loop executor
lobis Oct 17, 2023
ba7a6bf
do not use thread pool executor, use loop executor instead
lobis Oct 17, 2023
80049db
Revert "do not use thread pool executor, use loop executor instead"
lobis Oct 17, 2023
a6500fc
Merge branch 'main' into fsspec-async
lobis Oct 18, 2023
5c2c9ae
add exceptions for loop executor submit
lobis Oct 18, 2023
49ba900
singleton start / stop, disable shutdown
lobis Oct 18, 2023
d57f499
merge and fix conflicts
lobis Oct 18, 2023
1baa3f4
pass the storage options to new fs
lobis Oct 18, 2023
bcc9e7b
test also when use_threads is false
lobis Oct 18, 2023
5be63b0
simplify loop attachment (is it safe?)
lobis Oct 18, 2023
ff07ab3
only instantiate filesystem once
lobis Oct 18, 2023
1aefc5f
do not shutdown executor on exit (singleton)
lobis Oct 18, 2023
a710ea0
add missing shutdown
lobis Oct 18, 2023
0a1c46b
use fsspec's own loop
lobis Oct 19, 2023
08349cb
add ABC Executor
lobis Oct 19, 2023
0210d0f
simplified if branching
lobis Oct 19, 2023
60b892a
add property to check if async or not
lobis Oct 19, 2023
17ec460
add `use_async` hidden option to allow testing the non-async executor…
lobis Oct 19, 2023
4fc0206
fix return type annotation
lobis Oct 19, 2023
8b96409
Merge branch 'main' into fsspec-async
lobis Oct 19, 2023
b5065ca
Merge branch 'main' into fsspec-async
lobis Oct 19, 2023
bbd4ecc
Merge branch 'main' into fsspec-async
lobis Oct 19, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 30 additions & 9 deletions src/uproot/source/fsspec.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from __future__ import annotations

import asyncio
import concurrent.futures
import queue

Expand All @@ -23,6 +24,7 @@ class FSSpecSource(uproot.source.chunk.Source):
"""

def __init__(self, file_path: str, **options):
import fsspec.asyn
import fsspec.core

default_options = uproot.reading.open.defaults
Expand All @@ -33,16 +35,21 @@ def __init__(self, file_path: str, **options):

# Remove uproot-specific options (should be done earlier)
exclude_keys = set(default_options.keys())
opts = {k: v for k, v in options.items() if k not in exclude_keys}
storage_options = {k: v for k, v in options.items() if k not in exclude_keys}

self._fs, self._file_path = fsspec.core.url_to_fs(file_path, **opts)
protocol = fsspec.core.split_protocol(file_path)[0]
fs_has_async_impl = fsspec.get_filesystem_class(protocol=protocol).async_impl

if self._use_threads:
if not self._use_threads:
self._executor = uproot.source.futures.TrivialExecutor()
elif fs_has_async_impl:
self._executor = FSSpecLoopExecutor(fsspec.asyn.get_loop())
else:
self._executor = concurrent.futures.ThreadPoolExecutor(
max_workers=self._num_workers
)
else:
self._executor = uproot.source.futures.TrivialExecutor()

self._fs, self._file_path = fsspec.core.url_to_fs(file_path, **storage_options)

# TODO: set mode to "read-only" in a way that works for all filesystems
self._file = self._fs.open(self._file_path)
Expand Down Expand Up @@ -73,8 +80,8 @@ def __enter__(self):

def __exit__(self, exception_type, exception_value, traceback):
self._fh = None
self._executor.shutdown()
self._file.__exit__(exception_type, exception_value, traceback)
lobis marked this conversation as resolved.
Show resolved Hide resolved
self._executor.shutdown()

def chunk(self, start: int, stop: int) -> uproot.source.chunk.Chunk:
"""
Expand Down Expand Up @@ -132,10 +139,11 @@ def chunks(
self._num_requested_bytes += sum(stop - start for start, stop in ranges)

chunks = []
# _cat_file is async while cat_file is not
use_async = isinstance(self._executor, FSSpecLoopExecutor)
cat_file = self._fs._cat_file if use_async else self._fs.cat_file
for start, stop in ranges:
future = self._executor.submit(
self._fs.cat_file, self._file_path, start, stop
)
future = self._executor.submit(cat_file, self._file_path, start, stop)
chunk = uproot.source.chunk.Chunk(self, start, stop, future)
future.add_done_callback(uproot.source.chunk.notifier(chunk, notifications))
chunks.append(chunk)
Expand All @@ -155,3 +163,16 @@ def closed(self) -> bool:
otherwise.
"""
return False


class FSSpecLoopExecutor(uproot.source.futures.Executor):
def __init__(self, loop: asyncio.AbstractEventLoop):
self.loop = loop

def submit(self, coroutine, *args) -> asyncio.Future:
lobis marked this conversation as resolved.
Show resolved Hide resolved
if not asyncio.iscoroutinefunction(coroutine):
raise TypeError("loop executor can only submit coroutines")
if not self.loop.is_running():
raise RuntimeError("cannot submit coroutine while loop is not running")
coroutine_object = coroutine(*args)
return asyncio.run_coroutine_threadsafe(coroutine_object, self.loop)
48 changes: 25 additions & 23 deletions src/uproot/source/futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import sys
import threading
import time
from abc import ABC, abstractmethod


def delayed_raise(exception_class, exception_value, traceback):
Expand All @@ -37,6 +38,25 @@ def delayed_raise(exception_class, exception_value, traceback):
raise exception_value.with_traceback(traceback)


class Executor(ABC):
def __repr__(self):
return f"<{self.__class__.__name__} at 0x{id(self):012x}>"

@abstractmethod
def submit(self, task, *args):
"""
Submit a task to be run in the background and return a Future object
representing that task.
"""
raise NotImplementedError

def shutdown(self, wait: bool = True):
"""
Stop the executor and free its resources.
"""
return


##################### use-case 1: trivial Futures/Executor (satisfying formalities)


Expand All @@ -62,28 +82,20 @@ def result(self, timeout=None):
return self._result


class TrivialExecutor:
class TrivialExecutor(Executor):
"""
Formally satisfies the interface for a
:doc:`uproot.source.futures.ThreadPoolExecutor`, but the
:ref:`uproot.source.futures.TrivialExecutor.submit` method computes its
``task`` synchronously.
"""

def __repr__(self):
return f"<TrivialExecutor at 0x{id(self):012x}>"

def submit(self, task, *args):
"""
Immediately runs ``task(*args)``.
"""
return TrivialFuture(task(*args))

def shutdown(self, wait: bool = True):
"""
Does nothing, since this object does not have threads to stop.
"""


##################### use-case 2: Python-like Futures/Executor for compute

Expand Down Expand Up @@ -176,7 +188,7 @@ def run(self):
future._run()


class ThreadPoolExecutor:
class ThreadPoolExecutor(Executor):
"""
Args:
max_workers (None or int): The maximum number of workers to start.
Expand Down Expand Up @@ -207,9 +219,7 @@ def __init__(self, max_workers: int | None = None):
worker.start()

def __repr__(self):
return "<ThreadPoolExecutor ({} workers) at 0x{:012x}>".format(
len(self._workers), id(self)
)
return f"<{self.__class__.__name__} ({len(self._workers)} workers) at 0x{id(self):012x}>"

@property
def max_workers(self) -> int:
Expand Down Expand Up @@ -317,7 +327,7 @@ class ResourceWorker(Worker):
executes.
"""

def __init__(self, work_queue, resource):
def __init__(self, work_queue: queue.Queue, resource):
super().__init__(work_queue)
self._resource = resource

Expand Down Expand Up @@ -368,11 +378,6 @@ def __init__(self, resources):
for worker in self._workers:
worker.start()

def __repr__(self):
return "<ResourceThreadPoolExecutor ({} workers) at 0x{:012x}>".format(
len(self._workers), id(self)
)

def submit(self, future):
"""
Pass the ``task`` onto the workers'
Expand Down Expand Up @@ -433,10 +438,7 @@ def __init__(self, resource):
self._resource = resource
self._closed = False

def __repr__(self):
return f"<ResourceTrivialExecutor at 0x{id(self):012x}>"

def submit(self, future):
def submit(self, future: ResourceFuture) -> ResourceFuture:
"""
Pass the ``task`` as a
:doc:`uproot.source.futures.ResourceFuture` so that it will be
Expand Down
14 changes: 11 additions & 3 deletions tests/test_0692_fsspec.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,22 @@ def test_open_fsspec_s3():
@pytest.mark.network
@pytest.mark.xrootd
@pytest.mark.parametrize(
"handler",
[uproot.source.fsspec.FSSpecSource, uproot.source.xrootd.XRootDSource, None],
"handler, use_threads",
[
(uproot.source.fsspec.FSSpecSource, True),
(uproot.source.fsspec.FSSpecSource, False),
(uproot.source.xrootd.XRootDSource, True),
(uproot.source.xrootd.XRootDSource, False),
(None, True),
(None, False),
],
)
def test_open_fsspec_xrootd(handler):
def test_open_fsspec_xrootd(handler, use_threads):
pytest.importorskip("XRootD")
with uproot.open(
"root://eospublic.cern.ch//eos/root-eos/cms_opendata_2012_nanoaod/Run2012B_DoubleMuParked.root",
handler=handler,
use_threads=use_threads,
) as f:
data = f["Events/run"].array(library="np", entry_stop=20)
assert len(data) == 20
Expand Down
Loading