diff --git a/.all-contributorsrc b/.all-contributorsrc index 7ffdff5e1..a5ed074fd 100644 --- a/.all-contributorsrc +++ b/.all-contributorsrc @@ -506,6 +506,15 @@ "contributions": [ "doc" ] + }, + { + "login": "GaetanLepage", + "name": "Gaétan Lepage", + "avatar_url": "https://avatars.githubusercontent.com/u/33058747?v=4", + "profile": "http://glepage.com", + "contributions": [ + "test" + ] } ], "contributorsPerLine": 7, diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 998444e07..742949656 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -30,7 +30,7 @@ repos: - repo: https://github.com/asottile/pyupgrade - rev: v3.13.0 + rev: v3.15.0 hooks: - id: pyupgrade args: ["--py38-plus"] diff --git a/README.md b/README.md index 82e07b561..d58bb564b 100644 --- a/README.md +++ b/README.md @@ -178,6 +178,7 @@ Thanks especially to the gracious help of Uproot contributors (including the [or ioanaif
ioanaif

💻 OTABI Tomoya
OTABI Tomoya

⚠️ Jost Migenda
Jost Migenda

📖 + Gaétan Lepage
Gaétan Lepage

⚠️ diff --git a/src/uproot/const.py b/src/uproot/const.py index 731453c98..0d4424b5a 100644 --- a/src/uproot/const.py +++ b/src/uproot/const.py @@ -129,18 +129,25 @@ 7: "float64", 8: "float32", 9: "float16", - 10: "int64", - 11: "int32", - 12: "int16", - 13: "int8", - 14: "uint32", # SplitIndex64 delta encoding - 15: "uint64", # SplitIndex32 delta encoding + 10: "uint64", + 11: "uint32", + 12: "uint16", + 13: "uint8", + 14: "uint64", # SplitIndex64 delta encoding + 15: "uint32", # SplitIndex32 delta encoding 16: "float64", # split 17: "float32", # split 18: "float16", # split - 19: "int64", # split - 20: "int32", # split - 21: "int16", # split + 19: "uint64", # split + 20: "uint32", # split + 21: "uint16", # split + 22: "int64", + 23: "int32", + 24: "int16", + 25: "int8", + 26: "int64", # split + zigzag encoding + 27: "int32", # split + zigzag encoding + 28: "int16", # split + zigzag encoding } rntuple_col_num_to_size_dict = { 1: 64, @@ -156,14 +163,21 @@ 11: 32, 12: 16, 13: 8, - 14: 32, # SplitIndex64 delta encoding - 15: 64, # SplitIndex32 delta encoding + 14: 64, # SplitIndex64 delta encoding + 15: 32, # SplitIndex32 delta encoding 16: 64, # split 17: 32, # split 18: 16, # split 19: 64, # split 20: 32, # split 21: 16, # split + 22: 64, + 23: 32, + 24: 16, + 25: 8, + 26: 64, # split + zigzag encoding + 27: 32, # split + zigzag encoding + 28: 16, # split + zigzag encoding } rntuple_col_type_to_num_dict = { @@ -176,10 +190,10 @@ "real64": 7, "real32": 8, "real16": 9, - "int64": 10, - "int32": 11, - "int16": 12, - "int8": 13, + "uint64": 10, + "uint32": 11, + "uint16": 12, + "uint8": 13, "splitindex64": 14, "splitindex32": 15, "splitreal64": 16, @@ -188,6 +202,13 @@ "splitin64": 19, "splitint32": 20, "splitint16": 21, + "int64": 22, + "int32": 23, + "int16": 24, + "int8": 25, + "splitzigzagint64": 26, + "splitzigzagint32": 27, + "splitzigzagint16": 28, } rntuple_role_leaf = 0 diff --git a/src/uproot/models/RNTuple.py b/src/uproot/models/RNTuple.py index 7bb3f3d52..b3d4fe871 100644 --- a/src/uproot/models/RNTuple.py +++ b/src/uproot/models/RNTuple.py @@ -28,6 +28,10 @@ _rntuple_cluster_summary_format = struct.Struct("> 1 ^ -(n & 1) + + def _envelop_header(chunk, cursor, context): env_version, min_version = cursor.fields( chunk, uproot.const._rntuple_frame_format, context @@ -326,7 +330,7 @@ def to_akform(self): form = ak.forms.RecordForm(recordlist, topnames, form_key="toplevel") return form - def read_pagedesc(self, destination, desc, dtype_str, dtype): + def read_pagedesc(self, destination, desc, dtype_str, dtype, nbits, split): loc = desc.locator context = {} # bool in RNTuple is always stored as bits @@ -339,6 +343,44 @@ def read_pagedesc(self, destination, desc, dtype_str, dtype): content = cursor.array( decomp_chunk, num_elements_toread, dtype, context, move=False ) + + if split: + content = content.view(numpy.uint8) + + if nbits == 16: + # AAAAABBBBB needs to become + # ABABABABAB + res = numpy.empty(len(content), numpy.uint8) + res[0::2] = content[len(res) * 0 // 2 : len(res) * 1 // 2] + res[1::2] = content[len(res) * 1 // 2 : len(res) * 2 // 2] + res = res.view(numpy.uint16) + + elif nbits == 32: + # AAAAABBBBBCCCCCDDDDD needs to become + # ABCDABCDABCDABCDABCD + res = numpy.empty(len(content), numpy.uint8) + res[0::4] = content[len(res) * 0 // 4 : len(res) * 1 // 4] + res[1::4] = content[len(res) * 1 // 4 : len(res) * 2 // 4] + res[2::4] = content[len(res) * 2 // 4 : len(res) * 3 // 4] + res[3::4] = content[len(res) * 3 // 4 : len(res) * 4 // 4] + res = res.view(numpy.uint32) + + elif nbits == 64: + # AAAAABBBBBCCCCCDDDDDEEEEEFFFFFGGGGGHHHHH needs to become + # ABCDEFGHABCDEFGHABCDEFGHABCDEFGHABCDEFGH + res = numpy.empty(len(content), numpy.uint8) + res[0::8] = content[len(res) * 0 // 8 : len(res) * 1 // 8] + res[1::8] = content[len(res) * 1 // 8 : len(res) * 2 // 8] + res[2::8] = content[len(res) * 2 // 8 : len(res) * 3 // 8] + res[3::8] = content[len(res) * 3 // 8 : len(res) * 4 // 8] + res[4::8] = content[len(res) * 4 // 8 : len(res) * 5 // 8] + res[5::8] = content[len(res) * 5 // 8 : len(res) * 6 // 8] + res[6::8] = content[len(res) * 6 // 8 : len(res) * 7 // 8] + res[7::8] = content[len(res) * 7 // 8 : len(res) * 8 // 8] + res = res.view(numpy.uint64) + + content = res + if isbit: content = ( numpy.unpackbits(content.view(dtype=numpy.uint8)) @@ -368,14 +410,24 @@ def read_col_page(self, ncol, cluster_i): total_len = numpy.sum([desc.num_elements for desc in pagelist]) res = numpy.empty(total_len, dtype) tracker = 0 + split = 14 <= dtype_byte <= 21 or 26 <= dtype_byte <= 28 + nbits = uproot.const.rntuple_col_num_to_size_dict[dtype_byte] for page_desc in pagelist: n_elements = page_desc.num_elements tracker_end = tracker + n_elements - self.read_pagedesc(res[tracker:tracker_end], page_desc, dtype_str, dtype) + self.read_pagedesc( + res[tracker:tracker_end], page_desc, dtype_str, dtype, nbits, split + ) tracker = tracker_end if dtype_byte <= uproot.const.rntuple_col_type_to_num_dict["index32"]: res = numpy.insert(res, 0, 0) # for offsets + zigzag = 26 <= dtype_byte <= 28 + delta = 14 <= dtype_byte <= 15 + if zigzag: + res = from_zigzag(res) + elif delta: + numpy.cumsum(res) return res def arrays( @@ -645,6 +697,15 @@ def read(self, chunk, cursor, context): return out + def read_extension_header(self, out, chunk, cursor, context): + out.field_records = self.list_field_record_frames.read(chunk, cursor, context) + out.column_records = self.list_column_record_frames.read(chunk, cursor, context) + out.alias_columns = self.list_alias_column_frames.read(chunk, cursor, context) + out.extra_type_infos = self.list_extra_type_info_reader.read( + chunk, cursor, context + ) + return out + class ColumnGroupRecordReader: def read(self, chunk, cursor, context): @@ -672,9 +733,29 @@ def read(self, chunk, cursor, context): return out +class RNTupleSchemaExtension: + def read(self, chunk, cursor, context): + out = MetaData(type(self).__name__) + out.size = cursor.field(chunk, struct.Struct(" str: """ A path to the file (or URL). """ @@ -49,7 +49,7 @@ class Source: the file. """ - def chunk(self, start, stop) -> Chunk: + def chunk(self, start: int, stop: int) -> Chunk: """ Args: start (int): Seek position of the first byte to include. @@ -60,7 +60,9 @@ def chunk(self, start, stop) -> Chunk: :doc:`uproot.source.chunk.Chunk`. """ - def chunks(self, ranges, notifications: queue.Queue) -> list[Chunk]: + def chunks( + self, ranges: list[(int, int)], notifications: queue.Queue + ) -> list[Chunk]: """ Args: ranges (list of (int, int) 2-tuples): Intervals to fetch @@ -90,7 +92,7 @@ def chunks(self, ranges, notifications: queue.Queue) -> list[Chunk]: """ @property - def file_path(self): + def file_path(self) -> str: """ A path to the file (or URL). """ @@ -154,7 +156,7 @@ def __repr__(self): type(self).__name__, path, self.num_workers, id(self) ) - def chunk(self, start, stop) -> Chunk: + def chunk(self, start: int, stop: int) -> Chunk: self._num_requests += 1 self._num_requested_chunks += 1 self._num_requested_bytes += stop - start @@ -164,7 +166,9 @@ def chunk(self, start, stop) -> Chunk: self._executor.submit(future) return chunk - def chunks(self, ranges, notifications: queue.Queue) -> list[Chunk]: + def chunks( + self, ranges: list[(int, int)], notifications: queue.Queue + ) -> list[Chunk]: self._num_requests += 1 self._num_requested_chunks += len(ranges) self._num_requested_bytes += sum(stop - start for start, stop in ranges) @@ -186,7 +190,7 @@ def executor(self): return self._executor @property - def num_workers(self): + def num_workers(self) -> int: """ The number of :doc:`uproot.source.futures.ResourceWorker` threads in the :doc:`uproot.source.futures.ResourceThreadPoolExecutor`. @@ -240,7 +244,7 @@ class Chunk: _dtype = numpy.dtype(numpy.uint8) @classmethod - def wrap(cls, source, data, start=0): + def wrap(cls, source: Source, data: numpy.ndarray | bytes, start: int = 0): """ Args: source (:doc:`uproot.source.chunk.Source`): Source to attach to @@ -255,7 +259,9 @@ def wrap(cls, source, data, start=0): future = uproot.source.futures.TrivialFuture(data) return Chunk(source, start, start + len(data), future) - def __init__(self, source, start, stop, future, is_memmap=False): + def __init__( + self, source: Source, start: int, stop: int, future, is_memmap: bool = False + ): self._source = source self._start = start self._stop = stop @@ -267,21 +273,21 @@ def __repr__(self): return f"" @property - def source(self): + def source(self) -> Source: """ Source from which this Chunk is derived. """ return self._source @property - def start(self): + def start(self) -> int: """ Seek position of the first byte to include. """ return self._start @property - def stop(self): + def stop(self) -> int: """ Seek position of the first byte to exclude (one greater than the last byte to include). @@ -297,7 +303,7 @@ def future(self): return self._future @property - def is_memmap(self): + def is_memmap(self) -> bool: """ If True, the `raw_data` is or will be a view into a memmap file, which must be handled carefully. Accessing that data after the file is closed @@ -326,7 +332,7 @@ def detach_memmap(self): else: return self - def __contains__(self, range): + def __contains__(self, range: tuple[int, int]): start, stop = range if isinstance(start, uproot.source.cursor.Cursor): start = start.index @@ -334,7 +340,7 @@ def __contains__(self, range): stop = stop.index return self._start <= start and stop <= self._stop - def wait(self, insist=True): + def wait(self, insist: bool = True): """ Args: insist (bool or int): If True, raise an OSError if ``raw_data`` does @@ -368,7 +374,7 @@ def wait(self, insist=True): self._future = None @property - def raw_data(self): + def raw_data(self) -> numpy.ndarray | bytes: """ Data from the Source as a ``numpy.ndarray`` of ``numpy.uint8``. @@ -379,7 +385,9 @@ def raw_data(self): self.wait() return self._raw_data - def get(self, start, stop, cursor, context): + def get( + self, start: int, stop: int, cursor: uproot.source.cursor.Cursor, context: dict + ) -> numpy.ndarray: """ Args: start (int): Seek position of the first byte to include. @@ -417,7 +425,9 @@ def get(self, start, stop, cursor, context): self._source.file_path, ) - def remainder(self, start, cursor, context): + def remainder( + self, start: int, cursor: uproot.source.cursor.Cursor, context: dict + ) -> numpy.ndarray: """ Args: start (int): Seek position of the first byte to include. diff --git a/src/uproot/source/cursor.py b/src/uproot/source/cursor.py index fc0c4f9e0..f28386484 100644 --- a/src/uproot/source/cursor.py +++ b/src/uproot/source/cursor.py @@ -6,6 +6,7 @@ the lowest level of interpretation (numbers, strings, raw arrays, etc.). """ +from __future__ import annotations import datetime import struct @@ -44,7 +45,7 @@ class Cursor: requested by :doc:`uproot.deserialization.read_object_any`. """ - def __init__(self, index, origin=0, refs=None): + def __init__(self, index: int, origin: int = 0, refs: dict | None = None): self._index = index self._origin = origin self._refs = refs @@ -66,7 +67,7 @@ def __repr__(self): return f"Cursor({self._index}{o}{r})" @property - def index(self): + def index(self) -> int: """ Global seek position in the ROOT file or local position in an uncompressed :doc:`uproot.source.chunk.Chunk`. @@ -74,7 +75,7 @@ def index(self): return self._index @property - def origin(self): + def origin(self) -> int: """ Zero-point for numerical keys in :ref:`uproot.source.cursor.Cursor.refs`. @@ -82,7 +83,7 @@ def origin(self): return self._origin @property - def refs(self): + def refs(self) -> dict: """ References to data already read in :doc:`uproot.deserialization.read_object_any`. @@ -91,7 +92,7 @@ def refs(self): self._refs = {} return self._refs - def displacement(self, other=None): + def displacement(self, other: Cursor = None) -> int: """ The number of bytes between this :doc:`uproot.source.cursor.Cursor` and its :ref:`uproot.source.cursor.Cursor.origin` (if None) @@ -105,7 +106,7 @@ def displacement(self, other=None): else: return self._index - other._index - def copy(self, link_refs=True): + def copy(self, link_refs: bool = True) -> Cursor: """ Returns a copy of this :doc:`uproot.source.cursor.Cursor`. If ``link_refs`` is True, any :ref:`uproot.source.cursor.Cursor.refs` @@ -116,14 +117,14 @@ def copy(self, link_refs=True): else: return Cursor(self._index, origin=self._origin, refs=dict(self._refs)) - def move_to(self, index): + def move_to(self, index: int): """ Move the :ref:`uproot.source.cursor.Cursor.index` to a specified seek position. """ self._index = index - def skip(self, num_bytes): + def skip(self, num_bytes: int): """ Move the :ref:`uproot.source.cursor.Cursor.index` forward ``num_bytes``. @@ -148,7 +149,7 @@ def skip_after(self, obj): ) self._index = start_cursor.index + num_bytes - def skip_over(self, chunk, context): + def skip_over(self, chunk: uproot.source.chunk.Chunk, context: dict): """ Args: chunk (:doc:`uproot.source.chunk.Chunk`): Buffer of contiguous data @@ -171,7 +172,13 @@ def skip_over(self, chunk, context): self._index += num_bytes return True - def fields(self, chunk, format, context, move=True): + def fields( + self, + chunk: uproot.source.chunk.Chunk, + format: struct.Struct, + context: dict, + move: bool = True, + ): """ Args: chunk (:doc:`uproot.source.chunk.Chunk`): Buffer of contiguous data @@ -193,7 +200,13 @@ def fields(self, chunk, format, context, move=True): self._index = stop return format.unpack(chunk.get(start, stop, self, context)) - def field(self, chunk, format, context, move=True): + def field( + self, + chunk: uproot.source.chunk.Chunk, + format: struct.Struct, + context: dict, + move: bool = True, + ): """ Args: chunk (:doc:`uproot.source.chunk.Chunk`): Buffer of contiguous data @@ -215,7 +228,9 @@ def field(self, chunk, format, context, move=True): self._index = stop return format.unpack(chunk.get(start, stop, self, context))[0] - def double32(self, chunk, context, move=True): + def double32( + self, chunk: uproot.source.chunk.Chunk, context: dict, move: bool = True + ) -> float: """ Args: chunk (:doc:`uproot.source.chunk.Chunk`): Buffer of contiguous data @@ -235,7 +250,13 @@ def double32(self, chunk, context, move=True): self._index = stop return _raw_double32.unpack(chunk.get(start, stop, self, context))[0] - def float16(self, chunk, num_bits, context, move=True): + def float16( + self, + chunk: uproot.source.chunk.Chunk, + num_bits: int, + context: dict, + move: bool = True, + ) -> float: """ Args: chunk (:doc:`uproot.source.chunk.Chunk`): Buffer of contiguous data @@ -268,7 +289,7 @@ def float16(self, chunk, num_bits, context, move=True): return out.item() - def byte(self, chunk, context, move=True): + def byte(self, chunk: uproot.source.chunk.Chunk, context: dict, move: bool = True): """ Args: chunk (:doc:`uproot.source.chunk.Chunk`): Buffer of contiguous data @@ -306,7 +327,14 @@ def bytes(self, chunk, length, context, move=True): self._index = stop return chunk.get(start, stop, self, context) - def array(self, chunk, length, dtype, context, move=True): + def array( + self, + chunk: uproot.source.chunk.Chunk, + length: int, + dtype: numpy.dtype, + context: dict, + move: bool = True, + ) -> numpy.ndarray: """ Args: chunk (:doc:`uproot.source.chunk.Chunk`): Buffer of contiguous data @@ -330,7 +358,9 @@ def array(self, chunk, length, dtype, context, move=True): _u1 = numpy.dtype("u1") _i4 = numpy.dtype(">i4") - def bytestring(self, chunk, context, move=True): + def bytestring( + self, chunk: uproot.source.chunk.Chunk, context: dict, move: bool = True + ) -> bytes: """ Args: chunk (:doc:`uproot.source.chunk.Chunk`): Buffer of contiguous data @@ -361,7 +391,9 @@ def bytestring(self, chunk, context, move=True): self._index = stop return uproot._util.tobytes(chunk.get(start, stop, self, context)) - def string(self, chunk, context, move=True): + def string( + self, chunk: uproot.source.chunk.Chunk, context: dict, move: bool = True + ) -> str: """ Args: chunk (:doc:`uproot.source.chunk.Chunk`): Buffer of contiguous data @@ -382,7 +414,13 @@ def string(self, chunk, context, move=True): errors="surrogateescape" ) - def bytestring_with_length(self, chunk, context, length, move=True): + def bytestring_with_length( + self, + chunk: uproot.source.chunk.Chunk, + context: dict, + length: int, + move: bool = True, + ) -> bytes: """ Args: chunk (:doc:`uproot.source.chunk.Chunk`): Buffer of contiguous data @@ -403,7 +441,13 @@ def bytestring_with_length(self, chunk, context, length, move=True): data = chunk.get(start, stop, self, context) return uproot._util.tobytes(data) - def string_with_length(self, chunk, context, length, move=True): + def string_with_length( + self, + chunk: uproot.source.chunk.Chunk, + context: dict, + length: int, + move: bool = True, + ) -> str: """ Args: chunk (:doc:`uproot.source.chunk.Chunk`): Buffer of contiguous data @@ -421,7 +465,9 @@ def string_with_length(self, chunk, context, length, move=True): errors="surrogateescape" ) - def classname(self, chunk, context, move=True): + def classname( + self, chunk: uproot.source.chunk.Chunk, context: dict, move: bool = True + ) -> str: """ Args: chunk (:doc:`uproot.source.chunk.Chunk`): Buffer of contiguous data @@ -455,7 +501,9 @@ def classname(self, chunk, context, move=True): errors="surrogateescape" ) - def rntuple_string(self, chunk, context, move=True): + def rntuple_string( + self, chunk: uproot.source.chunk.Chunk, context: dict, move: bool = True + ) -> str: if move: length = self.field(chunk, _rntuple_string_length, context) return self.string_with_length(chunk, context, length) @@ -465,17 +513,19 @@ def rntuple_string(self, chunk, context, move=True): self._index = index return out - def rntuple_datetime(self, chunk, context, move=True): + def rntuple_datetime( + self, chunk: uproot.source.chunk.Chunk, context: dict, move: bool = True + ) -> datetime.datetime: raw = self.field(chunk, _rntuple_datetime, context, move=move) return datetime.datetime.fromtimestamp(raw) def debug( self, - chunk, - context={}, # noqa: B006 (it's not actually mutated in the function) - limit_bytes=None, - dtype=None, - offset=0, + chunk: uproot.source.chunk.Chunk, + context: dict = {}, # noqa: B006 (it's not actually mutated in the function) + limit_bytes: int | None = None, + dtype: numpy.dtype = None, + offset: int = 0, stream=sys.stdout, ): """ diff --git a/src/uproot/source/file.py b/src/uproot/source/file.py index feae3d0b9..2cf03c01c 100644 --- a/src/uproot/source/file.py +++ b/src/uproot/source/file.py @@ -32,7 +32,7 @@ class FileResource(uproot.source.chunk.Resource): A :doc:`uproot.source.chunk.Resource` for a simple file handle. """ - def __init__(self, file_path): + def __init__(self, file_path: str): self._file_path = file_path try: self._file = open(self._file_path, "rb") @@ -56,7 +56,7 @@ def __enter__(self): def __exit__(self, exception_type, exception_value, traceback): self._file.__exit__(exception_type, exception_value, traceback) - def get(self, start, stop): + def get(self, start: int, stop: int) -> bytes: """ Args: start (int): Seek position of the first byte to include. @@ -69,7 +69,7 @@ def get(self, start, stop): return self._file.read(stop - start) @staticmethod - def future(source, start, stop): + def future(source: uproot.source.chunk.Source, start: int, stop: int): """ Args: source (:doc:`uproot.source.file.MultithreadedFileSource`): The @@ -99,7 +99,7 @@ class MemmapSource(uproot.source.chunk.Source): _dtype = uproot.source.chunk.Chunk._dtype - def __init__(self, file_path, **options): + def __init__(self, file_path: str, **options): self._num_fallback_workers = options["num_fallback_workers"] self._fallback_opts = options self._num_requests = 0 @@ -139,7 +139,7 @@ def __repr__(self): fallback = " with fallback" return f"<{type(self).__name__} {path}{fallback} at 0x{id(self):012x}>" - def chunk(self, start, stop) -> uproot.source.chunk.Chunk: + def chunk(self, start: int, stop: int) -> uproot.source.chunk.Chunk: if self._fallback is None: if self.closed: raise OSError(f"memmap is closed for file {self._file_path}") @@ -156,7 +156,7 @@ def chunk(self, start, stop) -> uproot.source.chunk.Chunk: return self._fallback.chunk(start, stop) def chunks( - self, ranges, notifications: queue.Queue + self, ranges: list[(int, int)], notifications: queue.Queue ) -> list[uproot.source.chunk.Chunk]: if self._fallback is None: if self.closed: @@ -242,7 +242,7 @@ class MultithreadedFileSource(uproot.source.chunk.MultithreadedSource): ResourceClass = FileResource - def __init__(self, file_path, **options): + def __init__(self, file_path: str, **options): self._num_requests = 0 self._num_requested_chunks = 0 self._num_requested_bytes = 0 diff --git a/src/uproot/source/fsspec.py b/src/uproot/source/fsspec.py index 7f4680f3a..fd4f59672 100644 --- a/src/uproot/source/fsspec.py +++ b/src/uproot/source/fsspec.py @@ -22,7 +22,7 @@ class FSSpecSource(uproot.source.chunk.Source): to get many chunks in one request. """ - def __init__(self, file_path, **options): + def __init__(self, file_path: str, **options): import fsspec.core default_options = uproot.reading.open.defaults @@ -76,7 +76,7 @@ def __exit__(self, exception_type, exception_value, traceback): self._executor.shutdown() self._file.__exit__(exception_type, exception_value, traceback) - def chunk(self, start, stop) -> uproot.source.chunk.Chunk: + def chunk(self, start: int, stop: int) -> uproot.source.chunk.Chunk: """ Args: start (int): Seek position of the first byte to include. @@ -98,7 +98,7 @@ def chunk(self, start, stop) -> uproot.source.chunk.Chunk: return uproot.source.chunk.Chunk(self, start, stop, future) def chunks( - self, ranges, notifications: queue.Queue + self, ranges: list[(int, int)], notifications: queue.Queue ) -> list[uproot.source.chunk.Chunk]: """ Args: diff --git a/src/uproot/source/futures.py b/src/uproot/source/futures.py index e5bca5d53..f10b84e5a 100644 --- a/src/uproot/source/futures.py +++ b/src/uproot/source/futures.py @@ -21,6 +21,8 @@ These classes implement a *subset* of Python's Future and Executor interfaces. """ +from __future__ import annotations + import os import queue import sys @@ -77,7 +79,7 @@ def submit(self, task, *args): """ return TrivialFuture(task(*args)) - def shutdown(self, wait=True): + def shutdown(self, wait: bool = True): """ Does nothing, since this object does not have threads to stop. """ @@ -144,13 +146,13 @@ class Worker(threading.Thread): :doc:`uproot.source.futures.ThreadPoolExecutor`. """ - def __init__(self, work_queue): + def __init__(self, work_queue: queue.Queue): super().__init__() self.daemon = True self._work_queue = work_queue @property - def work_queue(self): + def work_queue(self) -> queue.Queue: """ The worker calls ``get`` on this queue for tasks in the form of :doc:`uproot.source.futures.Future` objects and runs them. If it ever @@ -188,7 +190,7 @@ class ThreadPoolExecutor: class. """ - def __init__(self, max_workers=None): + def __init__(self, max_workers: int | None = None): if max_workers is None: if hasattr(os, "cpu_count"): self._max_workers = os.cpu_count() @@ -224,7 +226,7 @@ def num_workers(self) -> int: return len(self._workers) @property - def workers(self): + def workers(self) -> list[Worker]: """ A list of workers (:doc:`uproot.source.futures.Worker`). """ @@ -241,7 +243,7 @@ def submit(self, task, *args): self._work_queue.put(future) return future - def shutdown(self, wait=True): + def shutdown(self, wait: bool = True): """ Stop every :doc:`uproot.source.futures.Worker` by putting None on the :ref:`uproot.source.futures.Worker.work_queue` until none of diff --git a/src/uproot/source/http.py b/src/uproot/source/http.py index ac80cf038..956f7f32d 100644 --- a/src/uproot/source/http.py +++ b/src/uproot/source/http.py @@ -17,9 +17,11 @@ from __future__ import annotations import base64 +import http.client import queue import re import sys +import urllib.parse from urllib.parse import urlparse import uproot @@ -27,7 +29,7 @@ import uproot.source.futures -def make_connection(parsed_url, timeout): +def make_connection(parsed_url: urllib.parse.ParseResult, timeout: float | None): """ Args: parsed_url (``urllib.parse.ParseResult``): The URL to connect to, which @@ -51,7 +53,7 @@ def make_connection(parsed_url, timeout): ) -def full_path(parsed_url): +def full_path(parsed_url) -> str: """ Returns the ``parsed_url.path`` with ``"?"`` and the ``parsed_url.query`` if it exists, just the path otherwise. @@ -78,7 +80,7 @@ def basic_auth_headers(parsed_url): return ret -def get_num_bytes(file_path, parsed_url, timeout): +def get_num_bytes(file_path: str, parsed_url: urllib.parse.ParseResult, timeout) -> int: """ Args: file_path (str): The URL to access as a raw string. @@ -158,7 +160,7 @@ def __init__(self, file_path, timeout): self._auth_headers = basic_auth_headers(self._parsed_url) @property - def timeout(self): + def timeout(self) -> float | None: """ The timeout in seconds or None. """ @@ -184,7 +186,7 @@ def __enter__(self): def __exit__(self, exception_type, exception_value, traceback): pass - def get(self, connection, start, stop): + def get(self, connection, start: int, stop: int) -> bytes: """ Args: start (int): Seek position of the first byte to include. @@ -235,7 +237,7 @@ def get(self, connection, start, stop): connection.close() @staticmethod - def future(source, start, stop): + def future(source: uproot.source.chunk.Source, start: int, stop: int): """ Args: source (:doc:`uproot.source.http.HTTPSource` or :doc:`uproot.source.http.MultithreadedHTTPSource`): The @@ -260,7 +262,9 @@ def task(resource): return uproot.source.futures.ResourceFuture(task) @staticmethod - def multifuture(source, ranges, futures, results): + def multifuture( + source: uproot.source.chunk.Source, ranges: list[(int, int)], futures, results + ): """ Args: source (:doc:`uproot.source.http.HTTPSource`): The data source. @@ -351,7 +355,9 @@ def task(resource): ) _content_range = re.compile(b"Content-Range: bytes ([0-9]+-[0-9]+)", re.I) - def is_multipart_supported(self, ranges, response): + def is_multipart_supported( + self, ranges: list[(int, int)], response: http.client.HTTPResponse + ) -> bool: """ Helper function for :ref:`uproot.source.http.HTTPResource.multifuture` to check for multipart GET support. @@ -368,7 +374,13 @@ def is_multipart_supported(self, ranges, response): else: return True - def handle_no_multipart(self, source, ranges, futures, results): + def handle_no_multipart( + self, + source: uproot.source.chunk.Source, + ranges: list[(int, int)], + futures, + results, + ): """ Helper function for :ref:`uproot.source.http.HTTPResource.multifuture` to handle a lack of multipart GET support. @@ -383,7 +395,14 @@ def handle_no_multipart(self, source, ranges, futures, results): results[chunk.start, chunk.stop] = chunk.raw_data futures[chunk.start, chunk.stop]._run(self) - def handle_multipart(self, source, futures, results, response, ranges): + def handle_multipart( + self, + source: uproot.source.chunk.Source, + futures, + results, + response: http.client.HTTPResponse, + ranges: list[(int, int)], + ): """ Helper function for :ref:`uproot.source.http.HTTPResource.multifuture` to handle the multipart GET response. @@ -477,7 +496,7 @@ def next_header(self, response_buffer): return range_string, size @staticmethod - def partfuture(results, start, stop): + def partfuture(results, start: int, stop: int): """ Returns a :doc:`uproot.source.futures.ResourceFuture` to simply select the ``(start, stop)`` item from the ``results`` dict. @@ -500,7 +519,7 @@ def __init__(self, stream): self.already_read = b"" self.stream = stream - def read(self, length): + def read(self, length: int): if length < len(self.already_read): out = self.already_read[:length] self.already_read = self.already_read[length:] @@ -514,7 +533,7 @@ def read(self, length): else: return self.stream.read(length) - def readline(self): + def readline(self) -> bytes: while True: try: index = self.already_read.index(b"\n") @@ -545,7 +564,7 @@ class HTTPSource(uproot.source.chunk.Source): ResourceClass = HTTPResource - def __init__(self, file_path, **options): + def __init__(self, file_path: str, **options): self._num_fallback_workers = options["num_fallback_workers"] self._timeout = options["timeout"] self._num_requests = 0 @@ -594,7 +613,7 @@ def __repr__(self): fallback = " with fallback" return f"<{type(self).__name__} {path}{fallback} at 0x{id(self):012x}>" - def chunk(self, start, stop) -> uproot.source.chunk.Chunk: + def chunk(self, start: int, stop: int) -> uproot.source.chunk.Chunk: self._num_requests += 1 self._num_requested_chunks += 1 self._num_requested_bytes += stop - start @@ -605,7 +624,7 @@ def chunk(self, start, stop) -> uproot.source.chunk.Chunk: return chunk def chunks( - self, ranges, notifications: queue.Queue + self, ranges: list[(int, int)], notifications: queue.Queue ) -> list[uproot.source.chunk.Chunk]: if self._fallback is None: self._num_requests += 1 diff --git a/src/uproot/source/object.py b/src/uproot/source/object.py index 25d02a74a..ce7c3ad14 100644 --- a/src/uproot/source/object.py +++ b/src/uproot/source/object.py @@ -8,6 +8,7 @@ has exactly one worker (we can't assume that the object is thread-safe). """ +from __future__ import annotations import uproot import uproot.source.chunk @@ -52,7 +53,7 @@ def __exit__(self, exception_type, exception_value, traceback): if hasattr(self._obj, "__exit__"): self._obj.__exit__(exception_type, exception_value, traceback) - def get(self, start, stop): + def get(self, start: int, stop: int): """ Args: start (int): Seek position of the first byte to include. @@ -65,7 +66,7 @@ def get(self, start, stop): return self._obj.read(stop - start) @staticmethod - def future(source, start, stop): + def future(source: uproot.source.chunk.Source, start: int, stop: int): """ Args: source (:doc:`uproot.source.object.ObjectSource`): The data source. diff --git a/src/uproot/source/s3.py b/src/uproot/source/s3.py index 44e432ae1..fbcb3adbd 100644 --- a/src/uproot/source/s3.py +++ b/src/uproot/source/s3.py @@ -4,6 +4,8 @@ This module defines a physical layer for remote files, accessed via S3. """ +from __future__ import annotations + import os from urllib.parse import parse_qsl, urlparse @@ -27,7 +29,7 @@ class S3Source(uproot.source.http.HTTPSource): def __init__( self, - file_path, + file_path: str, endpoint="s3.amazonaws.com", access_key=None, secret_key=None, diff --git a/src/uproot/source/xrootd.py b/src/uproot/source/xrootd.py index 0efd8d883..0571b6713 100644 --- a/src/uproot/source/xrootd.py +++ b/src/uproot/source/xrootd.py @@ -85,7 +85,7 @@ class XRootDResource(uproot.source.chunk.Resource): A :doc:`uproot.source.chunk.Resource` for XRootD connections. """ - def __init__(self, file_path, timeout): + def __init__(self, file_path: str, timeout: float | None): self._file_path = file_path self._timeout = timeout self._open() @@ -108,7 +108,7 @@ def __setstate__(self, state): self.__dict__ = state self._open() - def _xrd_timeout(self): + def _xrd_timeout(self) -> int: if self._timeout is None: return 0 else: @@ -129,7 +129,7 @@ def _xrd_error(self, status): ) @property - def timeout(self): + def timeout(self) -> float | None: """ The timeout in seconds or None. """ @@ -159,7 +159,7 @@ def __enter__(self): def __exit__(self, exception_type, exception_value, traceback): self._file.close(timeout=self._xrd_timeout()) - def get(self, start, stop): + def get(self, start: int, stop: int) -> bytes: """ Args: start (int): Seek position of the first byte to include. @@ -176,7 +176,7 @@ def get(self, start, stop): return data @staticmethod - def future(source, start, stop): + def future(source: uproot.source.chunk.Source, start: int, stop: int): """ Args: source (:doc:`uproot.source.xrootd.MultithreadedXRootDSource`): The @@ -196,7 +196,7 @@ def task(resource): return uproot.source.futures.ResourceFuture(task) @staticmethod - def partfuture(results, start, stop): + def partfuture(results, start: int, stop: int): """ Returns a :doc:`uproot.source.futures.ResourceFuture` to simply select the ``(start, stop)`` item from the ``results`` dict. @@ -265,7 +265,7 @@ class XRootDSource(uproot.source.chunk.Source): ResourceClass = XRootDResource - def __init__(self, file_path, **options): + def __init__(self, file_path: str, **options): self._timeout = options["timeout"] self._desired_max_num_elements = options["max_num_elements"] self._use_threads = options["use_threads"] @@ -315,7 +315,7 @@ def __repr__(self): path = repr("..." + self._file_path[-10:]) return f"<{type(self).__name__} {path} at 0x{id(self):012x}>" - def chunk(self, start, stop) -> uproot.source.chunk.Chunk: + def chunk(self, start: int, stop: int) -> uproot.source.chunk.Chunk: self._num_requests += 1 self._num_requested_chunks += 1 self._num_requested_bytes += stop - start @@ -325,7 +325,7 @@ def chunk(self, start, stop) -> uproot.source.chunk.Chunk: return uproot.source.chunk.Chunk(self, start, stop, future) def chunks( - self, ranges, notifications: queue.Queue + self, ranges: list[(int, int)], notifications: queue.Queue ) -> list[uproot.source.chunk.Chunk]: self._num_requests += 1 self._num_requested_chunks += len(ranges) @@ -339,7 +339,9 @@ def chunks( # this is to track which requests were split into smaller ranges and have to be merged sub_ranges = {} - def add_request_range(start, length, sub_ranges_list): + def add_request_range( + start: int, length: int, sub_ranges_list: list[(int, int)] + ): if len(all_request_ranges[-1]) >= self._max_num_elements: all_request_ranges.append([]) all_request_ranges[-1].append((start, length)) @@ -412,7 +414,7 @@ def resource(self): return self._resource @property - def timeout(self): + def timeout(self) -> float | None: """ The timeout in seconds or None. """ @@ -455,7 +457,7 @@ class MultithreadedXRootDSource(uproot.source.chunk.MultithreadedSource): ResourceClass = XRootDResource - def __init__(self, file_path, **options): + def __init__(self, file_path: str, **options): self._num_workers = options["num_workers"] self._timeout = options["timeout"] self._use_threads = options["use_threads"] @@ -472,7 +474,7 @@ def _open(self): self._executor = uproot.source.futures.ResourceThreadPoolExecutor( [ XRootDResource(self._file_path, self._timeout) - for x in range(self._num_workers) + for _ in range(self._num_workers) ] ) else: @@ -490,7 +492,7 @@ def __setstate__(self, state): self._open() @property - def timeout(self): + def timeout(self) -> float | None: """ The timeout in seconds or None. """ diff --git a/tests/test_0630-rntuple-basics.py b/tests/test_0630-rntuple-basics.py index 207b17599..a6703e6a6 100644 --- a/tests/test_0630-rntuple-basics.py +++ b/tests/test_0630-rntuple-basics.py @@ -13,6 +13,7 @@ pytest.importorskip("awkward") +@pytest.mark.skip(reason="RNTUPLE UPDATE: ignore test with previous file for now.") def test_flat(): filename = skhep_testdata.data_path("test_ntuple_int_float.root") with uproot.open(filename) as f: diff --git a/tests/test_0705-rntuple-writing-metadata.py b/tests/test_0705-rntuple-writing-metadata.py index 968b4616e..012df70e7 100644 --- a/tests/test_0705-rntuple-writing-metadata.py +++ b/tests/test_0705-rntuple-writing-metadata.py @@ -14,6 +14,7 @@ ak = pytest.importorskip("awkward") +@pytest.mark.skip(reason="RNTUPLE UPDATE: ignore test with previous file for now.") def test_header(tmp_path): filepath = os.path.join(tmp_path, "test.root") diff --git a/tests/test_0962-RNTuple-update.py b/tests/test_0962-RNTuple-update.py new file mode 100644 index 000000000..bc4b8ef9a --- /dev/null +++ b/tests/test_0962-RNTuple-update.py @@ -0,0 +1,37 @@ +# BSD 3-Clause License; see https://github.com/scikit-hep/uproot5/blob/main/LICENSE + +import pytest +import uproot +import skhep_testdata +import numpy as np + + +def test_new_support_RNTuple_split_int32_reading(): + with uproot.open( + skhep_testdata.data_path("uproot_ntuple_int_5e4_629_01.root") + ) as f: + obj = f["ntuple"] + df = obj.arrays() + assert len(df) == 5e4 + assert len(df.one_integers) == 5e4 + assert np.all(df.one_integers == np.arange(5e4 + 1)[::-1][:-1]) + + +def test_new_support_RNTuple_bit_bool_reading(): + with uproot.open(skhep_testdata.data_path("uproot_ntuple_bit_629_01.root")) as f: + obj = f["ntuple"] + df = obj.arrays() + assert np.all(df.one_bit == np.asarray([1, 0, 0, 1, 0, 0, 1, 0, 0, 1])) + + +def test_new_support_RNTuple_split_int16_reading(): + with uproot.open( + skhep_testdata.data_path("uproot_ntuple_int_multicluster_629_01.root") + ) as f: + obj = f["ntuple"] + df = obj.arrays() + assert len(df.one_integers) == 1e8 + assert df.one_integers[0] == 2 + assert df.one_integers[-1] == 1 + assert np.all(np.unique(df.one_integers[: len(df.one_integers) // 2]) == [2]) + assert np.all(np.unique(df.one_integers[len(df.one_integers) / 2 + 1 :]) == [1]) diff --git a/tests/test_0965-inverted-axes-variances-hist-888.py b/tests/test_0965-inverted-axes-variances-hist-888.py index cbc3fd8ab..5dd33dad0 100644 --- a/tests/test_0965-inverted-axes-variances-hist-888.py +++ b/tests/test_0965-inverted-axes-variances-hist-888.py @@ -2,10 +2,10 @@ import pytest import numpy -import hist import uproot import skhep_testdata +pytest.importorskip("hist") ROOT = pytest.importorskip("ROOT")