Skip to content

Commit

Permalink
SAS7BDAT parser: Improve subheader lookup performance (#47656)
Browse files Browse the repository at this point in the history
* SAS7BDAT parser: Improve subheader lookup performance

* Fix ssize_t type

* Update _sas.pyi

* Lint
  • Loading branch information
jonashaag authored Oct 4, 2022
1 parent 9f94480 commit 39fc318
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 105 deletions.
2 changes: 2 additions & 0 deletions pandas/io/sas/_sas.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,5 @@ from pandas.io.sas.sas7bdat import SAS7BDATReader
class Parser:
def __init__(self, parser: SAS7BDATReader) -> None: ...
def read(self, nrows: int) -> None: ...

def get_subheader_index(signature: bytes) -> int: ...
69 changes: 58 additions & 11 deletions pandas/io/sas/sas.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ from libc.stdint cimport (
int64_t,
uint8_t,
uint16_t,
uint32_t,
uint64_t,
)
from libc.stdlib cimport (
calloc,
Expand All @@ -17,6 +19,9 @@ import numpy as np
import pandas.io.sas.sas_constants as const


cdef object np_nan = np.nan


cdef struct Buffer:
# Convenience wrapper for uint8_t data to allow fast and safe reads and writes.
# We use this as a replacement for np.array(..., dtype=np.uint8) because it's
Expand Down Expand Up @@ -53,9 +58,6 @@ cdef inline buf_free(Buffer buf):
if buf.data != NULL:
free(buf.data)


cdef object np_nan = np.nan

# rle_decompress decompresses data using a Run Length Encoding
# algorithm. It is partially documented here:
#
Expand Down Expand Up @@ -231,7 +233,7 @@ cdef enum ColumnTypes:
column_type_string = 2


# type the page_data types
# Const aliases
assert len(const.page_meta_types) == 2
cdef:
int page_meta_types_0 = const.page_meta_types[0]
Expand All @@ -240,6 +242,53 @@ cdef:
int page_data_type = const.page_data_type
int subheader_pointers_offset = const.subheader_pointers_offset

# Copy of subheader_signature_to_index that allows for much faster lookups.
# Lookups are done in get_subheader_index. The C structures are initialized
# in _init_subheader_signatures().
uint32_t subheader_signatures_32bit[13]
int subheader_indices_32bit[13]
uint64_t subheader_signatures_64bit[17]
int subheader_indices_64bit[17]
int data_subheader_index = const.SASIndex.data_subheader_index


def _init_subheader_signatures():
subheaders_32bit = [(sig, idx) for sig, idx in const.subheader_signature_to_index.items() if len(sig) == 4]
subheaders_64bit = [(sig, idx) for sig, idx in const.subheader_signature_to_index.items() if len(sig) == 8]
assert len(subheaders_32bit) == 13
assert len(subheaders_64bit) == 17
assert len(const.subheader_signature_to_index) == 13 + 17
for i, (signature, idx) in enumerate(subheaders_32bit):
subheader_signatures_32bit[i] = (<uint32_t *><char *>signature)[0]
subheader_indices_32bit[i] = idx
for i, (signature, idx) in enumerate(subheaders_64bit):
subheader_signatures_64bit[i] = (<uint64_t *><char *>signature)[0]
subheader_indices_64bit[i] = idx


_init_subheader_signatures()


def get_subheader_index(bytes signature):
"""Fast version of 'subheader_signature_to_index.get(signature)'."""
cdef:
uint32_t sig32
uint64_t sig64
Py_ssize_t i
assert len(signature) in (4, 8)
if len(signature) == 4:
sig32 = (<uint32_t *><char *>signature)[0]
for i in range(len(subheader_signatures_32bit)):
if subheader_signatures_32bit[i] == sig32:
return subheader_indices_32bit[i]
else:
sig64 = (<uint64_t *><char *>signature)[0]
for i in range(len(subheader_signatures_64bit)):
if subheader_signatures_64bit[i] == sig64:
return subheader_indices_64bit[i]

return data_subheader_index


cdef class Parser:

Expand Down Expand Up @@ -355,7 +404,7 @@ cdef class Parser:
cdef bint readline(self) except? True:

cdef:
int offset, bit_offset, align_correction
int offset, length, bit_offset, align_correction
int subheader_pointer_length, mn
bint done, flag

Expand All @@ -379,12 +428,10 @@ cdef class Parser:
if done:
return True
continue
current_subheader_pointer = (
self.parser._current_page_data_subheader_pointers[
self.current_row_on_page_index])
self.process_byte_array_with_data(
current_subheader_pointer.offset,
current_subheader_pointer.length)
offset, length = self.parser._current_page_data_subheader_pointers[
self.current_row_on_page_index
]
self.process_byte_array_with_data(offset, length)
return False
elif self.current_page_type == page_mix_type:
align_correction = (
Expand Down
151 changes: 57 additions & 94 deletions pandas/io/sas/sas7bdat.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,10 @@
)

from pandas.io.common import get_handle
from pandas.io.sas._sas import Parser
from pandas.io.sas._sas import (
Parser,
get_subheader_index,
)
import pandas.io.sas.sas_constants as const
from pandas.io.sas.sasreader import ReaderBase

Expand Down Expand Up @@ -87,19 +90,6 @@ def _convert_datetimes(sas_datetimes: pd.Series, unit: str) -> pd.Series:
return s_series


class _SubheaderPointer:
offset: int
length: int
compression: int
ptype: int

def __init__(self, offset: int, length: int, compression: int, ptype: int) -> None:
self.offset = offset
self.length = length
self.compression = compression
self.ptype = ptype


class _Column:
col_id: int
name: str | bytes
Expand Down Expand Up @@ -189,7 +179,7 @@ def __init__(
self.column_formats: list[str | bytes] = []
self.columns: list[_Column] = []

self._current_page_data_subheader_pointers: list[_SubheaderPointer] = []
self._current_page_data_subheader_pointers: list[tuple[int, int]] = []
self._cached_page = None
self._column_data_lengths: list[int] = []
self._column_data_offsets: list[int] = []
Expand All @@ -205,6 +195,19 @@ def __init__(

self._path_or_buf = self.handles.handle

# Same order as const.SASIndex
self._subheader_processors = [
self._process_rowsize_subheader,
self._process_columnsize_subheader,
self._process_subheader_counts,
self._process_columntext_subheader,
self._process_columnname_subheader,
self._process_columnattributes_subheader,
self._process_format_subheader,
self._process_columnlist_subheader,
None, # Data
]

try:
self._get_properties()
self._parse_metadata()
Expand Down Expand Up @@ -426,89 +429,47 @@ def _process_page_metadata(self) -> None:
bit_offset = self._page_bit_offset

for i in range(self._current_page_subheaders_count):
pointer = self._process_subheader_pointers(
const.subheader_pointers_offset + bit_offset, i
)
if pointer.length == 0:
continue
if pointer.compression == const.truncated_subheader_id:
continue
subheader_signature = self._read_subheader_signature(pointer.offset)
subheader_index = self._get_subheader_index(
subheader_signature, pointer.compression, pointer.ptype
)
self._process_subheader(subheader_index, pointer)

def _get_subheader_index(self, signature: bytes, compression, ptype) -> int:
# TODO: return here could be made an enum
index = const.subheader_signature_to_index.get(signature)
if index is None:
f1 = (compression == const.compressed_subheader_id) or (compression == 0)
f2 = ptype == const.compressed_subheader_type
if (self.compression != b"") and f1 and f2:
index = const.SASIndex.data_subheader_index
else:
self.close()
raise ValueError("Unknown subheader signature")
return index

def _process_subheader_pointers(
self, offset: int, subheader_pointer_index: int
) -> _SubheaderPointer:

subheader_pointer_length = self._subheader_pointer_length
total_offset = offset + subheader_pointer_length * subheader_pointer_index
offset = const.subheader_pointers_offset + bit_offset
total_offset = offset + self._subheader_pointer_length * i

subheader_offset = self._read_int(total_offset, self._int_length)
total_offset += self._int_length
subheader_offset = self._read_int(total_offset, self._int_length)
total_offset += self._int_length

subheader_length = self._read_int(total_offset, self._int_length)
total_offset += self._int_length
subheader_length = self._read_int(total_offset, self._int_length)
total_offset += self._int_length

subheader_compression = self._read_int(total_offset, 1)
total_offset += 1

subheader_type = self._read_int(total_offset, 1)

x = _SubheaderPointer(
subheader_offset, subheader_length, subheader_compression, subheader_type
)
subheader_compression = self._read_int(total_offset, 1)
total_offset += 1

return x
subheader_type = self._read_int(total_offset, 1)

def _read_subheader_signature(self, offset: int) -> bytes:
subheader_signature = self._read_bytes(offset, self._int_length)
return subheader_signature

def _process_subheader(
self, subheader_index: int, pointer: _SubheaderPointer
) -> None:
offset = pointer.offset
length = pointer.length

if subheader_index == const.SASIndex.row_size_index:
processor = self._process_rowsize_subheader
elif subheader_index == const.SASIndex.column_size_index:
processor = self._process_columnsize_subheader
elif subheader_index == const.SASIndex.column_text_index:
processor = self._process_columntext_subheader
elif subheader_index == const.SASIndex.column_name_index:
processor = self._process_columnname_subheader
elif subheader_index == const.SASIndex.column_attributes_index:
processor = self._process_columnattributes_subheader
elif subheader_index == const.SASIndex.format_and_label_index:
processor = self._process_format_subheader
elif subheader_index == const.SASIndex.column_list_index:
processor = self._process_columnlist_subheader
elif subheader_index == const.SASIndex.subheader_counts_index:
processor = self._process_subheader_counts
elif subheader_index == const.SASIndex.data_subheader_index:
self._current_page_data_subheader_pointers.append(pointer)
return
else:
raise ValueError("unknown subheader index")
if (
subheader_length == 0
or subheader_compression == const.truncated_subheader_id
):
continue

processor(offset, length)
subheader_signature = self._read_bytes(subheader_offset, self._int_length)
subheader_index = get_subheader_index(subheader_signature)
subheader_processor = self._subheader_processors[subheader_index]

if subheader_processor is None:
f1 = (
subheader_compression == const.compressed_subheader_id
or subheader_compression == 0
)
f2 = subheader_type == const.compressed_subheader_type
if self.compression and f1 and f2:
self._current_page_data_subheader_pointers.append(
(subheader_offset, subheader_length)
)
else:
self.close()
raise ValueError(
f"Unknown subheader signature {subheader_signature}"
)
else:
subheader_processor(subheader_offset, subheader_length)

def _process_rowsize_subheader(self, offset: int, length: int) -> None:

Expand All @@ -523,10 +484,12 @@ def _process_rowsize_subheader(self, offset: int, length: int) -> None:
lcp_offset += 378

self.row_length = self._read_int(
offset + const.row_length_offset_multiplier * int_len, int_len
offset + const.row_length_offset_multiplier * int_len,
int_len,
)
self.row_count = self._read_int(
offset + const.row_count_offset_multiplier * int_len, int_len
offset + const.row_count_offset_multiplier * int_len,
int_len,
)
self.col_count_p1 = self._read_int(
offset + const.col_count_p1_multiplier * int_len, int_len
Expand Down

0 comments on commit 39fc318

Please sign in to comment.