From 4e5f85620b77297b836310d6b04c7eb252bf2db4 Mon Sep 17 00:00:00 2001 From: Jonas Haag Date: Sat, 9 Jul 2022 22:55:41 +0200 Subject: [PATCH 1/4] SAS7BDAT parser: Improve subheader lookup performance --- pandas/io/sas/sas.pyx | 72 +++++++++++++++--- pandas/io/sas/sas7bdat.py | 151 ++++++++++++++------------------------ 2 files changed, 118 insertions(+), 105 deletions(-) diff --git a/pandas/io/sas/sas.pyx b/pandas/io/sas/sas.pyx index d8591c0b033a6..1177e4223f775 100644 --- a/pandas/io/sas/sas.pyx +++ b/pandas/io/sas/sas.pyx @@ -1,13 +1,18 @@ # cython: profile=False # cython: boundscheck=False, initializedcheck=False from cython cimport Py_ssize_t +from libc.stdint cimport ( + int64_t, + uint8_t, + uint16_t, + uint32_t, + uint64_t, +) + import numpy as np import pandas.io.sas.sas_constants as const -ctypedef signed long long int64_t -ctypedef unsigned char uint8_t -ctypedef unsigned short uint16_t # rle_decompress decompresses data using a Run Length Encoding # algorithm. It is partially documented here: @@ -194,7 +199,7 @@ cdef enum ColumnTypes: column_type_string = 2 -# type the page_data types +# Const aliases assert len(const.page_meta_types) == 2 cdef: int page_meta_types_0 = const.page_meta_types[0] @@ -203,6 +208,53 @@ cdef: int page_data_type = const.page_data_type int subheader_pointers_offset = const.subheader_pointers_offset + # Copy of subheader_signature_to_index that allows for much faster lookups. + # Lookups are done in get_subheader_index. The C structures are initialized + # in _init_subheader_signatures(). + uint32_t subheader_signatures_32bit[13] + int subheader_indices_32bit[13] + uint64_t subheader_signatures_64bit[17] + int subheader_indices_64bit[17] + int data_subheader_index = const.SASIndex.data_subheader_index + + +def _init_subheader_signatures(): + subheaders_32bit = [(sig, idx) for sig, idx in const.subheader_signature_to_index.items() if len(sig) == 4] + subheaders_64bit = [(sig, idx) for sig, idx in const.subheader_signature_to_index.items() if len(sig) == 8] + assert len(subheaders_32bit) == 13 + assert len(subheaders_64bit) == 17 + assert len(const.subheader_signature_to_index) == 13 + 17 + for i, (signature, idx) in enumerate(subheaders_32bit): + subheader_signatures_32bit[i] = (signature)[0] + subheader_indices_32bit[i] = idx + for i, (signature, idx) in enumerate(subheaders_64bit): + subheader_signatures_64bit[i] = (signature)[0] + subheader_indices_64bit[i] = idx + + +_init_subheader_signatures() + + +def get_subheader_index(bytes signature): + """Fast version of 'subheader_signature_to_index.get(signature)'.""" + cdef: + uint32_t sig32 + uint64_t sig64 + size_t i + assert len(signature) in (4, 8) + if len(signature) == 4: + sig32 = (signature)[0] + for i in range(len(subheader_signatures_32bit)): + if subheader_signatures_32bit[i] == sig32: + return subheader_indices_32bit[i] + else: + sig64 = (signature)[0] + for i in range(len(subheader_signatures_64bit)): + if subheader_signatures_64bit[i] == sig64: + return subheader_indices_64bit[i] + + return data_subheader_index + cdef class Parser: @@ -314,7 +366,7 @@ cdef class Parser: cdef bint readline(self) except? True: cdef: - int offset, bit_offset, align_correction + int offset, length, bit_offset, align_correction int subheader_pointer_length, mn bint done, flag @@ -338,12 +390,10 @@ cdef class Parser: if done: return True continue - current_subheader_pointer = ( - self.parser._current_page_data_subheader_pointers[ - self.current_row_on_page_index]) - self.process_byte_array_with_data( - current_subheader_pointer.offset, - current_subheader_pointer.length) + offset, length = self.parser._current_page_data_subheader_pointers[ + self.current_row_on_page_index + ] + self.process_byte_array_with_data(offset, length) return False elif self.current_page_type == page_mix_type: align_correction = ( diff --git a/pandas/io/sas/sas7bdat.py b/pandas/io/sas/sas7bdat.py index 7282affe1b5e6..9d2285b992150 100644 --- a/pandas/io/sas/sas7bdat.py +++ b/pandas/io/sas/sas7bdat.py @@ -42,7 +42,10 @@ ) from pandas.io.common import get_handle -from pandas.io.sas._sas import Parser +from pandas.io.sas._sas import ( + Parser, + get_subheader_index, +) import pandas.io.sas.sas_constants as const from pandas.io.sas.sasreader import ReaderBase @@ -87,19 +90,6 @@ def _convert_datetimes(sas_datetimes: pd.Series, unit: str) -> pd.Series: return s_series -class _SubheaderPointer: - offset: int - length: int - compression: int - ptype: int - - def __init__(self, offset: int, length: int, compression: int, ptype: int) -> None: - self.offset = offset - self.length = length - self.compression = compression - self.ptype = ptype - - class _Column: col_id: int name: str | bytes @@ -187,7 +177,7 @@ def __init__( self.column_formats: list[str | bytes] = [] self.columns: list[_Column] = [] - self._current_page_data_subheader_pointers: list[_SubheaderPointer] = [] + self._current_page_data_subheader_pointers: list[tuple[int, int]] = [] self._cached_page = None self._column_data_lengths: list[int] = [] self._column_data_offsets: list[int] = [] @@ -203,6 +193,19 @@ def __init__( self._path_or_buf = self.handles.handle + # Same order as const.SASIndex + self._subheader_processors = [ + self._process_rowsize_subheader, + self._process_columnsize_subheader, + self._process_subheader_counts, + self._process_columntext_subheader, + self._process_columnname_subheader, + self._process_columnattributes_subheader, + self._process_format_subheader, + self._process_columnlist_subheader, + None, # Data + ] + try: self._get_properties() self._parse_metadata() @@ -422,89 +425,47 @@ def _process_page_metadata(self) -> None: bit_offset = self._page_bit_offset for i in range(self._current_page_subheaders_count): - pointer = self._process_subheader_pointers( - const.subheader_pointers_offset + bit_offset, i - ) - if pointer.length == 0: - continue - if pointer.compression == const.truncated_subheader_id: - continue - subheader_signature = self._read_subheader_signature(pointer.offset) - subheader_index = self._get_subheader_index( - subheader_signature, pointer.compression, pointer.ptype - ) - self._process_subheader(subheader_index, pointer) - - def _get_subheader_index(self, signature: bytes, compression, ptype) -> int: - # TODO: return here could be made an enum - index = const.subheader_signature_to_index.get(signature) - if index is None: - f1 = (compression == const.compressed_subheader_id) or (compression == 0) - f2 = ptype == const.compressed_subheader_type - if (self.compression != b"") and f1 and f2: - index = const.SASIndex.data_subheader_index - else: - self.close() - raise ValueError("Unknown subheader signature") - return index - - def _process_subheader_pointers( - self, offset: int, subheader_pointer_index: int - ) -> _SubheaderPointer: - - subheader_pointer_length = self._subheader_pointer_length - total_offset = offset + subheader_pointer_length * subheader_pointer_index + offset = const.subheader_pointers_offset + bit_offset + total_offset = offset + self._subheader_pointer_length * i - subheader_offset = self._read_int(total_offset, self._int_length) - total_offset += self._int_length + subheader_offset = self._read_int(total_offset, self._int_length) + total_offset += self._int_length - subheader_length = self._read_int(total_offset, self._int_length) - total_offset += self._int_length + subheader_length = self._read_int(total_offset, self._int_length) + total_offset += self._int_length - subheader_compression = self._read_int(total_offset, 1) - total_offset += 1 - - subheader_type = self._read_int(total_offset, 1) - - x = _SubheaderPointer( - subheader_offset, subheader_length, subheader_compression, subheader_type - ) + subheader_compression = self._read_int(total_offset, 1) + total_offset += 1 - return x + subheader_type = self._read_int(total_offset, 1) - def _read_subheader_signature(self, offset: int) -> bytes: - subheader_signature = self._read_bytes(offset, self._int_length) - return subheader_signature - - def _process_subheader( - self, subheader_index: int, pointer: _SubheaderPointer - ) -> None: - offset = pointer.offset - length = pointer.length - - if subheader_index == const.SASIndex.row_size_index: - processor = self._process_rowsize_subheader - elif subheader_index == const.SASIndex.column_size_index: - processor = self._process_columnsize_subheader - elif subheader_index == const.SASIndex.column_text_index: - processor = self._process_columntext_subheader - elif subheader_index == const.SASIndex.column_name_index: - processor = self._process_columnname_subheader - elif subheader_index == const.SASIndex.column_attributes_index: - processor = self._process_columnattributes_subheader - elif subheader_index == const.SASIndex.format_and_label_index: - processor = self._process_format_subheader - elif subheader_index == const.SASIndex.column_list_index: - processor = self._process_columnlist_subheader - elif subheader_index == const.SASIndex.subheader_counts_index: - processor = self._process_subheader_counts - elif subheader_index == const.SASIndex.data_subheader_index: - self._current_page_data_subheader_pointers.append(pointer) - return - else: - raise ValueError("unknown subheader index") + if ( + subheader_length == 0 + or subheader_compression == const.truncated_subheader_id + ): + continue - processor(offset, length) + subheader_signature = self._read_bytes(subheader_offset, self._int_length) + subheader_index = get_subheader_index(subheader_signature) + subheader_processor = self._subheader_processors[subheader_index] + + if subheader_processor is None: + f1 = ( + subheader_compression == const.compressed_subheader_id + or subheader_compression == 0 + ) + f2 = subheader_type == const.compressed_subheader_type + if self.compression and f1 and f2: + self._current_page_data_subheader_pointers.append( + (subheader_offset, subheader_length) + ) + else: + self.close() + raise ValueError( + f"Unknown subheader signature {subheader_signature}" + ) + else: + subheader_processor(subheader_offset, subheader_length) def _process_rowsize_subheader(self, offset: int, length: int) -> None: @@ -519,10 +480,12 @@ def _process_rowsize_subheader(self, offset: int, length: int) -> None: lcp_offset += 378 self.row_length = self._read_int( - offset + const.row_length_offset_multiplier * int_len, int_len + offset + const.row_length_offset_multiplier * int_len, + int_len, ) self.row_count = self._read_int( - offset + const.row_count_offset_multiplier * int_len, int_len + offset + const.row_count_offset_multiplier * int_len, + int_len, ) self.col_count_p1 = self._read_int( offset + const.col_count_p1_multiplier * int_len, int_len From 8350d046469bdc5b14bfe4522e32fb655e75256f Mon Sep 17 00:00:00 2001 From: Jonas Haag Date: Sun, 10 Jul 2022 22:22:49 +0200 Subject: [PATCH 2/4] Fix ssize_t type --- pandas/io/sas/sas.pyx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pandas/io/sas/sas.pyx b/pandas/io/sas/sas.pyx index 1177e4223f775..f3128780b33bc 100644 --- a/pandas/io/sas/sas.pyx +++ b/pandas/io/sas/sas.pyx @@ -240,7 +240,7 @@ def get_subheader_index(bytes signature): cdef: uint32_t sig32 uint64_t sig64 - size_t i + Py_ssize_t i assert len(signature) in (4, 8) if len(signature) == 4: sig32 = (signature)[0] From dbaf39b97decaf72f25b8372e8a5930bfbb65a56 Mon Sep 17 00:00:00 2001 From: Jonas Haag Date: Mon, 11 Jul 2022 08:49:42 +0200 Subject: [PATCH 3/4] Update _sas.pyi --- pandas/io/sas/_sas.pyi | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pandas/io/sas/_sas.pyi b/pandas/io/sas/_sas.pyi index 527193dd71e57..5d65e2b56b591 100644 --- a/pandas/io/sas/_sas.pyi +++ b/pandas/io/sas/_sas.pyi @@ -3,3 +3,5 @@ from pandas.io.sas.sas7bdat import SAS7BDATReader class Parser: def __init__(self, parser: SAS7BDATReader) -> None: ... def read(self, nrows: int) -> None: ... + +def get_subheader_index(signature: bytes) -> int: ... From a14f3b0cb33779b204f5b5c1db3d83cb1ba30059 Mon Sep 17 00:00:00 2001 From: Jonas Haag Date: Tue, 4 Oct 2022 11:55:43 +0200 Subject: [PATCH 4/4] Lint --- pandas/io/sas/sas.pyx | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pandas/io/sas/sas.pyx b/pandas/io/sas/sas.pyx index 926013ddd83bb..9406900b69998 100644 --- a/pandas/io/sas/sas.pyx +++ b/pandas/io/sas/sas.pyx @@ -4,10 +4,10 @@ from cython cimport Py_ssize_t from libc.stddef cimport size_t from libc.stdint cimport ( int64_t, + uint8_t, uint16_t, uint32_t, uint64_t, - uint8_t, ) from libc.stdlib cimport ( calloc, @@ -18,6 +18,7 @@ import numpy as np import pandas.io.sas.sas_constants as const + cdef object np_nan = np.nan