Skip to content

Commit

Permalink
feat: adding a very basic FSSpecSource (#967)
Browse files Browse the repository at this point in the history
* Initial fsspec source implementation

* Some basic test

* Actually add the test

* Skip test if XRootD module not available

* Keep a file handle active to speed up individual chunk requests

* add `s3_handler`, `use_threads` to exclusion list for fsspec options

* remove aiohttp

* add aiohttp

* add comment, refactor exclusion

* add fsspec to dependencies

* move fsspec dependencies to dev instead of test

* load default options

* fsspec open mode set to "r" from "rb" to avoid error on github spec

* add s3fs to test deps

* rename fsspec test (replace hyphen with underscore)

* add tests for other protocols

* remove read-only from fsspec open

* add fsspec dependencies in test insted of dev

* add skip to github test

* remove top level skip

* attempt to fix ci

* add back fsspec

* remove commented s3fs

* Address #967 (comment)

---------

Co-authored-by: Nick Smith <nick.smith@cern.ch>
  • Loading branch information
lobis and nsmith- authored Oct 4, 2023
1 parent 64da20c commit 546b0cb
Show file tree
Hide file tree
Showing 3 changed files with 202 additions and 0 deletions.
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ dev = [
test = [
"lz4",
"minio",
"aiohttp",
"fsspec",
"fsspec-xrootd",
"pytest>=6",
"pytest-timeout",
"pytest-rerunfailures",
Expand Down
126 changes: 126 additions & 0 deletions src/uproot/source/fsspec.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
import uproot.source.chunk


class FSSpecSource(uproot.source.chunk.Source):
"""
Args:
file_path (str): A URL for the file to open.
**kwargs (dict): any extra arguments to be forwarded to the particular
FileSystem instance constructor. This might include S3 access keys,
or HTTP headers, etc.
A :doc:`uproot.source.chunk.Source` that uses FSSpec's cat_ranges feature
to get many chunks in one request.
"""

def __init__(self, file_path, **kwargs):
import fsspec.core

# TODO: is timeout always valid?

# Remove uproot-specific options (should be done earlier)
exclude_keys = set(uproot.reading.open.defaults.keys())
opts = {k: v for k, v in kwargs.items() if k not in exclude_keys}

self._fs, self._file_path = fsspec.core.url_to_fs(file_path, **opts)
# TODO: set mode to "read-only" in a way that works for all filesystems
self._file = self._fs.open(self._file_path)
self._fh = None
self._num_requests = 0
self._num_requested_chunks = 0
self._num_requested_bytes = 0
self.__enter__()

def __repr__(self):
path = repr(self._file_path)
if len(self._file_path) > 10:
path = repr("..." + self._file_path[-10:])
return f"<{type(self).__name__} {path} at 0x{id(self):012x}>"

def __enter__(self):
self._fh = self._file.__enter__()
return self

def __exit__(self, exception_type, exception_value, traceback):
self._fh = None
self._file.__exit__(exception_type, exception_value, traceback)

def chunk(self, start, stop):
"""
Args:
start (int): Seek position of the first byte to include.
stop (int): Seek position of the first byte to exclude
(one greater than the last byte to include).
Request a byte range of data from the file as a
:doc:`uproot.source.chunk.Chunk`.
"""
self._num_requests += 1
self._num_requested_chunks += 1
self._num_requested_bytes += stop - start
if self._fh:
self._fh.seek(start)
data = self._fh.read(stop - start)
else:
data = self._fs.cat_file(self._file_path, start, stop)
future = uproot.source.futures.TrivialFuture(data)
return uproot.source.chunk.Chunk(self, start, stop, future)

def chunks(self, ranges, notifications):
"""
Args:
ranges (list of (int, int) 2-tuples): Intervals to fetch
as (start, stop) pairs in a single request, if possible.
notifications (``queue.Queue``): Indicator of completed
chunks. After each gets filled, it is ``put`` on the
queue; a listener should ``get`` from this queue
``len(ranges)`` times.
Request a set of byte ranges from the file.
This method has two outputs:
* The method returns a list of unfilled
:doc:`uproot.source.chunk.Chunk` objects, which get filled
in a background thread. If you try to read data from an
unfilled chunk, it will wait until it is filled.
* The method also puts the same :doc:`uproot.source.chunk.Chunk`
objects onto the ``notifications`` queue as soon as they are
filled.
Reading data from chunks on the queue can be more efficient than
reading them from the returned list. The total reading time is the
same, but work on the filled chunks can be better parallelized if
it is triggered by already-filled chunks, rather than waiting for
chunks to be filled.
"""
self._num_requests += 1
self._num_requested_chunks += len(ranges)
self._num_requested_bytes += sum(stop - start for start, stop in ranges)
data = self._fs.cat_ranges(
[self._file_path] * len(ranges),
[start for start, _ in ranges],
[stop for _, stop in ranges],
)
chunks = []
for item, (start, stop) in zip(data, ranges):
future = uproot.source.futures.TrivialFuture(item)
chunk = uproot.source.chunk.Chunk(self, start, stop, future)
uproot.source.chunk.notifier(chunk, notifications)()
chunks.append(chunk)
return chunks

@property
def num_bytes(self):
"""
The number of bytes in the file.
"""
return self._fs.size(self._file_path)

@property
def closed(self):
"""
True if the associated file/connection/thread pool is closed; False
otherwise.
"""
return False
73 changes: 73 additions & 0 deletions tests/test_0692_fsspec.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
# BSD 3-Clause License; see https://github.com/scikit-hep/uproot4/blob/main/LICENSE

import pytest

import uproot
import uproot.source.fsspec


@pytest.mark.network
def test_open_fsspec_http():
with uproot.open(
"https://github.com/CoffeaTeam/coffea/raw/master/tests/samples/nano_dy.root",
http_handler=uproot.source.fsspec.FSSpecSource,
) as f:
data = f["Events/MET_pt"].array(library="np")
assert len(data) == 40


@pytest.mark.network
def test_open_fsspec_github():
pytest.skip("not working yet")
with uproot.open(
"github://CoffeaTeam:coffea@master/tests/samples/nano_dy.root",
http_handler=uproot.source.fsspec.FSSpecSource,
) as f:
data = f["Events/MET_pt"].array(library="np")
assert len(data) == 40


@pytest.mark.network
def test_open_fsspec_local(tmp_path):
url = "https://github.com/CoffeaTeam/coffea/raw/master/tests/samples/nano_dy.root"

# download file to local
local_path = str(tmp_path / "nano_dy.root")
import fsspec

with fsspec.open(url) as f:
with open(local_path, "wb") as fout:
fout.write(f.read())

with uproot.open(
local_path,
file_handler=uproot.source.fsspec.FSSpecSource,
) as f:
data = f["Events/MET_pt"].array(library="np")
assert len(data) == 40


@pytest.mark.network
def test_open_fsspec_s3():
pytest.importorskip("s3fs")

with uproot.open(
"s3://pivarski-princeton/pythia_ppZee_run17emb.picoDst.root:PicoDst",
anon=True,
s3_handler=uproot.source.fsspec.FSSpecSource,
) as f:
data = f["Event/Event.mEventId"].array(library="np")
assert len(data) == 8004


@pytest.mark.network
@pytest.mark.xrootd
def test_open_fsspec_xrootd():
pytest.importorskip("XRootD")
with uproot.open(
"root://eospublic.cern.ch//eos/root-eos/cms_opendata_2012_nanoaod/Run2012B_DoubleMuParked.root",
xrootd_handler=uproot.source.fsspec.FSSpecSource,
) as f:
data = f["Events/run"].array(library="np", entry_stop=20)
assert len(data) == 20
assert (data == 194778).all()

0 comments on commit 546b0cb

Please sign in to comment.