From 546b0cbb8595d1775f53f938c9bc6bda6c2d5931 Mon Sep 17 00:00:00 2001 From: Luis Antonio Obis Aparicio <35803280+lobis@users.noreply.github.com> Date: Wed, 4 Oct 2023 14:15:03 -0500 Subject: [PATCH] feat: adding a very basic FSSpecSource (#967) * Initial fsspec source implementation * Some basic test * Actually add the test * Skip test if XRootD module not available * Keep a file handle active to speed up individual chunk requests * add `s3_handler`, `use_threads` to exclusion list for fsspec options * remove aiohttp * add aiohttp * add comment, refactor exclusion * add fsspec to dependencies * move fsspec dependencies to dev instead of test * load default options * fsspec open mode set to "r" from "rb" to avoid error on github spec * add s3fs to test deps * rename fsspec test (replace hyphen with underscore) * add tests for other protocols * remove read-only from fsspec open * add fsspec dependencies in test insted of dev * add skip to github test * remove top level skip * attempt to fix ci * add back fsspec * remove commented s3fs * Address https://github.com/scikit-hep/uproot5/pull/967#discussion_r1346179136 --------- Co-authored-by: Nick Smith --- pyproject.toml | 3 + src/uproot/source/fsspec.py | 126 ++++++++++++++++++++++++++++++++++++ tests/test_0692_fsspec.py | 73 +++++++++++++++++++++ 3 files changed, 202 insertions(+) create mode 100644 src/uproot/source/fsspec.py create mode 100644 tests/test_0692_fsspec.py diff --git a/pyproject.toml b/pyproject.toml index 78eba667e..b0da5b2c1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -59,6 +59,9 @@ dev = [ test = [ "lz4", "minio", + "aiohttp", + "fsspec", + "fsspec-xrootd", "pytest>=6", "pytest-timeout", "pytest-rerunfailures", diff --git a/src/uproot/source/fsspec.py b/src/uproot/source/fsspec.py new file mode 100644 index 000000000..4ebb8889b --- /dev/null +++ b/src/uproot/source/fsspec.py @@ -0,0 +1,126 @@ +import uproot.source.chunk + + +class FSSpecSource(uproot.source.chunk.Source): + """ + Args: + file_path (str): A URL for the file to open. + **kwargs (dict): any extra arguments to be forwarded to the particular + FileSystem instance constructor. This might include S3 access keys, + or HTTP headers, etc. + + A :doc:`uproot.source.chunk.Source` that uses FSSpec's cat_ranges feature + to get many chunks in one request. + """ + + def __init__(self, file_path, **kwargs): + import fsspec.core + + # TODO: is timeout always valid? + + # Remove uproot-specific options (should be done earlier) + exclude_keys = set(uproot.reading.open.defaults.keys()) + opts = {k: v for k, v in kwargs.items() if k not in exclude_keys} + + self._fs, self._file_path = fsspec.core.url_to_fs(file_path, **opts) + # TODO: set mode to "read-only" in a way that works for all filesystems + self._file = self._fs.open(self._file_path) + self._fh = None + self._num_requests = 0 + self._num_requested_chunks = 0 + self._num_requested_bytes = 0 + self.__enter__() + + def __repr__(self): + path = repr(self._file_path) + if len(self._file_path) > 10: + path = repr("..." + self._file_path[-10:]) + return f"<{type(self).__name__} {path} at 0x{id(self):012x}>" + + def __enter__(self): + self._fh = self._file.__enter__() + return self + + def __exit__(self, exception_type, exception_value, traceback): + self._fh = None + self._file.__exit__(exception_type, exception_value, traceback) + + def chunk(self, start, stop): + """ + Args: + start (int): Seek position of the first byte to include. + stop (int): Seek position of the first byte to exclude + (one greater than the last byte to include). + + Request a byte range of data from the file as a + :doc:`uproot.source.chunk.Chunk`. + """ + self._num_requests += 1 + self._num_requested_chunks += 1 + self._num_requested_bytes += stop - start + if self._fh: + self._fh.seek(start) + data = self._fh.read(stop - start) + else: + data = self._fs.cat_file(self._file_path, start, stop) + future = uproot.source.futures.TrivialFuture(data) + return uproot.source.chunk.Chunk(self, start, stop, future) + + def chunks(self, ranges, notifications): + """ + Args: + ranges (list of (int, int) 2-tuples): Intervals to fetch + as (start, stop) pairs in a single request, if possible. + notifications (``queue.Queue``): Indicator of completed + chunks. After each gets filled, it is ``put`` on the + queue; a listener should ``get`` from this queue + ``len(ranges)`` times. + + Request a set of byte ranges from the file. + + This method has two outputs: + + * The method returns a list of unfilled + :doc:`uproot.source.chunk.Chunk` objects, which get filled + in a background thread. If you try to read data from an + unfilled chunk, it will wait until it is filled. + * The method also puts the same :doc:`uproot.source.chunk.Chunk` + objects onto the ``notifications`` queue as soon as they are + filled. + + Reading data from chunks on the queue can be more efficient than + reading them from the returned list. The total reading time is the + same, but work on the filled chunks can be better parallelized if + it is triggered by already-filled chunks, rather than waiting for + chunks to be filled. + """ + self._num_requests += 1 + self._num_requested_chunks += len(ranges) + self._num_requested_bytes += sum(stop - start for start, stop in ranges) + data = self._fs.cat_ranges( + [self._file_path] * len(ranges), + [start for start, _ in ranges], + [stop for _, stop in ranges], + ) + chunks = [] + for item, (start, stop) in zip(data, ranges): + future = uproot.source.futures.TrivialFuture(item) + chunk = uproot.source.chunk.Chunk(self, start, stop, future) + uproot.source.chunk.notifier(chunk, notifications)() + chunks.append(chunk) + return chunks + + @property + def num_bytes(self): + """ + The number of bytes in the file. + """ + return self._fs.size(self._file_path) + + @property + def closed(self): + """ + True if the associated file/connection/thread pool is closed; False + otherwise. + """ + return False diff --git a/tests/test_0692_fsspec.py b/tests/test_0692_fsspec.py new file mode 100644 index 000000000..8cf1afca8 --- /dev/null +++ b/tests/test_0692_fsspec.py @@ -0,0 +1,73 @@ +# BSD 3-Clause License; see https://github.com/scikit-hep/uproot4/blob/main/LICENSE + +import pytest + +import uproot +import uproot.source.fsspec + + +@pytest.mark.network +def test_open_fsspec_http(): + with uproot.open( + "https://github.com/CoffeaTeam/coffea/raw/master/tests/samples/nano_dy.root", + http_handler=uproot.source.fsspec.FSSpecSource, + ) as f: + data = f["Events/MET_pt"].array(library="np") + assert len(data) == 40 + + +@pytest.mark.network +def test_open_fsspec_github(): + pytest.skip("not working yet") + with uproot.open( + "github://CoffeaTeam:coffea@master/tests/samples/nano_dy.root", + http_handler=uproot.source.fsspec.FSSpecSource, + ) as f: + data = f["Events/MET_pt"].array(library="np") + assert len(data) == 40 + + +@pytest.mark.network +def test_open_fsspec_local(tmp_path): + url = "https://github.com/CoffeaTeam/coffea/raw/master/tests/samples/nano_dy.root" + + # download file to local + local_path = str(tmp_path / "nano_dy.root") + import fsspec + + with fsspec.open(url) as f: + with open(local_path, "wb") as fout: + fout.write(f.read()) + + with uproot.open( + local_path, + file_handler=uproot.source.fsspec.FSSpecSource, + ) as f: + data = f["Events/MET_pt"].array(library="np") + assert len(data) == 40 + + +@pytest.mark.network +def test_open_fsspec_s3(): + pytest.importorskip("s3fs") + + with uproot.open( + "s3://pivarski-princeton/pythia_ppZee_run17emb.picoDst.root:PicoDst", + anon=True, + s3_handler=uproot.source.fsspec.FSSpecSource, + ) as f: + data = f["Event/Event.mEventId"].array(library="np") + assert len(data) == 8004 + + +@pytest.mark.network +@pytest.mark.xrootd +def test_open_fsspec_xrootd(): + pytest.importorskip("XRootD") + with uproot.open( + "root://eospublic.cern.ch//eos/root-eos/cms_opendata_2012_nanoaod/Run2012B_DoubleMuParked.root", + xrootd_handler=uproot.source.fsspec.FSSpecSource, + ) as f: + data = f["Events/run"].array(library="np", entry_stop=20) + assert len(data) == 20 + assert (data == 194778).all()