Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SAS7BDAT parser: Improve subheader lookup performance #47656

Merged
merged 5 commits into from
Oct 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pandas/io/sas/_sas.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,5 @@ from pandas.io.sas.sas7bdat import SAS7BDATReader
# Type stubs for the compiled Cython extension (pandas/io/sas/sas.pyx).
class Parser:
    # Cython-backed row parser bound to a SAS7BDATReader instance.
    def __init__(self, parser: SAS7BDATReader) -> None: ...
    # Reads up to `nrows` rows into the reader's buffers; returns nothing.
    def read(self, nrows: int) -> None: ...

# Maps a raw 4- or 8-byte subheader signature to its SASIndex integer.
def get_subheader_index(signature: bytes) -> int: ...
69 changes: 58 additions & 11 deletions pandas/io/sas/sas.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ from libc.stdint cimport (
int64_t,
uint8_t,
uint16_t,
uint32_t,
uint64_t,
)
from libc.stdlib cimport (
calloc,
Expand All @@ -17,6 +19,9 @@ import numpy as np
import pandas.io.sas.sas_constants as const


cdef object np_nan = np.nan


cdef struct Buffer:
# Convenience wrapper for uint8_t data to allow fast and safe reads and writes.
# We use this as a replacement for np.array(..., dtype=np.uint8) because it's
Expand Down Expand Up @@ -53,9 +58,6 @@ cdef inline buf_free(Buffer buf):
if buf.data != NULL:
free(buf.data)


cdef object np_nan = np.nan

# rle_decompress decompresses data using a Run Length Encoding
# algorithm. It is partially documented here:
#
Expand Down Expand Up @@ -231,7 +233,7 @@ cdef enum ColumnTypes:
column_type_string = 2


# type the page_data types
# Const aliases
assert len(const.page_meta_types) == 2
cdef:
int page_meta_types_0 = const.page_meta_types[0]
Expand All @@ -240,6 +242,53 @@ cdef:
int page_data_type = const.page_data_type
int subheader_pointers_offset = const.subheader_pointers_offset

# Copy of subheader_signature_to_index that allows for much faster lookups.
# Lookups are done in get_subheader_index. The C structures are initialized
# in _init_subheader_signatures().
uint32_t subheader_signatures_32bit[13]
int subheader_indices_32bit[13]
uint64_t subheader_signatures_64bit[17]
int subheader_indices_64bit[17]
int data_subheader_index = const.SASIndex.data_subheader_index


def _init_subheader_signatures():
    """Populate the module-level C signature/index arrays from
    const.subheader_signature_to_index so lookups can be done without
    Python dict access (see get_subheader_index).
    """
    # Split the mapping by signature width: 4-byte keys for 32-bit files,
    # 8-byte keys for 64-bit files.
    subheaders_32bit = [(sig, idx) for sig, idx in const.subheader_signature_to_index.items() if len(sig) == 4]
    subheaders_64bit = [(sig, idx) for sig, idx in const.subheader_signature_to_index.items() if len(sig) == 8]
    # The C arrays are declared with exactly these sizes; fail loudly if the
    # constants table ever changes shape.
    assert len(subheaders_32bit) == 13
    assert len(subheaders_64bit) == 17
    assert len(const.subheader_signature_to_index) == 13 + 17
    for i, (signature, idx) in enumerate(subheaders_32bit):
        # Reinterpret the 4 signature bytes as a single uint32 for fast
        # integer comparison later.
        subheader_signatures_32bit[i] = (<uint32_t *><char *>signature)[0]
        subheader_indices_32bit[i] = idx
    for i, (signature, idx) in enumerate(subheaders_64bit):
        # Same trick with 8 bytes -> uint64.
        subheader_signatures_64bit[i] = (<uint64_t *><char *>signature)[0]
        subheader_indices_64bit[i] = idx


# Run once at module import so the arrays are ready before any parsing.
_init_subheader_signatures()


def get_subheader_index(bytes signature):
    """Fast version of 'subheader_signature_to_index.get(signature)'.

    Compares the signature as a single integer against the precomputed
    C arrays (filled by _init_subheader_signatures) instead of doing a
    Python dict lookup.  Unknown signatures fall through to
    data_subheader_index, mirroring the dict's .get() default handling
    in the caller.
    """
    cdef:
        uint32_t sig32
        uint64_t sig64
        Py_ssize_t i
    # Signatures are the file's integer width: 4 bytes (32-bit files)
    # or 8 bytes (64-bit files).
    assert len(signature) in (4, 8)
    if len(signature) == 4:
        # Reinterpret the bytes as uint32 for integer comparison.
        sig32 = (<uint32_t *><char *>signature)[0]
        for i in range(len(subheader_signatures_32bit)):
            if subheader_signatures_32bit[i] == sig32:
                return subheader_indices_32bit[i]
    else:
        sig64 = (<uint64_t *><char *>signature)[0]
        for i in range(len(subheader_signatures_64bit)):
            if subheader_signatures_64bit[i] == sig64:
                return subheader_indices_64bit[i]

    # No match: treat as a data subheader (caller validates further).
    return data_subheader_index


cdef class Parser:

Expand Down Expand Up @@ -355,7 +404,7 @@ cdef class Parser:
cdef bint readline(self) except? True:

cdef:
int offset, bit_offset, align_correction
int offset, length, bit_offset, align_correction
int subheader_pointer_length, mn
bint done, flag

Expand All @@ -379,12 +428,10 @@ cdef class Parser:
if done:
return True
continue
current_subheader_pointer = (
self.parser._current_page_data_subheader_pointers[
self.current_row_on_page_index])
self.process_byte_array_with_data(
current_subheader_pointer.offset,
current_subheader_pointer.length)
offset, length = self.parser._current_page_data_subheader_pointers[
self.current_row_on_page_index
]
self.process_byte_array_with_data(offset, length)
return False
elif self.current_page_type == page_mix_type:
align_correction = (
Expand Down
151 changes: 57 additions & 94 deletions pandas/io/sas/sas7bdat.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,10 @@
)

from pandas.io.common import get_handle
from pandas.io.sas._sas import Parser
from pandas.io.sas._sas import (
Parser,
get_subheader_index,
)
import pandas.io.sas.sas_constants as const
from pandas.io.sas.sasreader import ReaderBase

Expand Down Expand Up @@ -87,19 +90,6 @@ def _convert_datetimes(sas_datetimes: pd.Series, unit: str) -> pd.Series:
return s_series


class _SubheaderPointer:
offset: int
length: int
compression: int
ptype: int

def __init__(self, offset: int, length: int, compression: int, ptype: int) -> None:
self.offset = offset
self.length = length
self.compression = compression
self.ptype = ptype


class _Column:
col_id: int
name: str | bytes
Expand Down Expand Up @@ -189,7 +179,7 @@ def __init__(
self.column_formats: list[str | bytes] = []
self.columns: list[_Column] = []

self._current_page_data_subheader_pointers: list[_SubheaderPointer] = []
self._current_page_data_subheader_pointers: list[tuple[int, int]] = []
self._cached_page = None
self._column_data_lengths: list[int] = []
self._column_data_offsets: list[int] = []
Expand All @@ -205,6 +195,19 @@ def __init__(

self._path_or_buf = self.handles.handle

# Same order as const.SASIndex
self._subheader_processors = [
self._process_rowsize_subheader,
self._process_columnsize_subheader,
self._process_subheader_counts,
self._process_columntext_subheader,
self._process_columnname_subheader,
self._process_columnattributes_subheader,
self._process_format_subheader,
self._process_columnlist_subheader,
None, # Data
]

try:
self._get_properties()
self._parse_metadata()
Expand Down Expand Up @@ -426,89 +429,47 @@ def _process_page_metadata(self) -> None:
bit_offset = self._page_bit_offset

for i in range(self._current_page_subheaders_count):
pointer = self._process_subheader_pointers(
const.subheader_pointers_offset + bit_offset, i
)
if pointer.length == 0:
continue
if pointer.compression == const.truncated_subheader_id:
continue
subheader_signature = self._read_subheader_signature(pointer.offset)
subheader_index = self._get_subheader_index(
subheader_signature, pointer.compression, pointer.ptype
)
self._process_subheader(subheader_index, pointer)

def _get_subheader_index(self, signature: bytes, compression, ptype) -> int:
    """Map a raw subheader signature to its SASIndex integer.

    Unrecognized signatures are treated as data subheaders when the file
    is compressed and the pointer's compression/type fields match the
    compressed-subheader markers; otherwise the reader is closed and a
    ValueError is raised.
    """
    # TODO: return here could be made an enum
    index = const.subheader_signature_to_index.get(signature)
    if index is None:
        # Unknown signature: may still be a (compressed) data subheader.
        f1 = (compression == const.compressed_subheader_id) or (compression == 0)
        f2 = ptype == const.compressed_subheader_type
        if (self.compression != b"") and f1 and f2:
            index = const.SASIndex.data_subheader_index
        else:
            # Close the underlying handle before raising so the file is
            # not leaked.
            self.close()
            raise ValueError("Unknown subheader signature")
    return index

def _process_subheader_pointers(
self, offset: int, subheader_pointer_index: int
) -> _SubheaderPointer:

subheader_pointer_length = self._subheader_pointer_length
total_offset = offset + subheader_pointer_length * subheader_pointer_index
offset = const.subheader_pointers_offset + bit_offset
total_offset = offset + self._subheader_pointer_length * i

subheader_offset = self._read_int(total_offset, self._int_length)
total_offset += self._int_length
subheader_offset = self._read_int(total_offset, self._int_length)
total_offset += self._int_length

subheader_length = self._read_int(total_offset, self._int_length)
total_offset += self._int_length
subheader_length = self._read_int(total_offset, self._int_length)
total_offset += self._int_length

subheader_compression = self._read_int(total_offset, 1)
total_offset += 1

subheader_type = self._read_int(total_offset, 1)

x = _SubheaderPointer(
subheader_offset, subheader_length, subheader_compression, subheader_type
)
subheader_compression = self._read_int(total_offset, 1)
total_offset += 1

return x
subheader_type = self._read_int(total_offset, 1)

def _read_subheader_signature(self, offset: int) -> bytes:
    """Return the raw subheader signature bytes located at *offset*."""
    # The signature occupies one native integer's width (4 or 8 bytes).
    return self._read_bytes(offset, self._int_length)

def _process_subheader(
self, subheader_index: int, pointer: _SubheaderPointer
) -> None:
offset = pointer.offset
length = pointer.length

if subheader_index == const.SASIndex.row_size_index:
processor = self._process_rowsize_subheader
elif subheader_index == const.SASIndex.column_size_index:
processor = self._process_columnsize_subheader
elif subheader_index == const.SASIndex.column_text_index:
processor = self._process_columntext_subheader
elif subheader_index == const.SASIndex.column_name_index:
processor = self._process_columnname_subheader
elif subheader_index == const.SASIndex.column_attributes_index:
processor = self._process_columnattributes_subheader
elif subheader_index == const.SASIndex.format_and_label_index:
processor = self._process_format_subheader
elif subheader_index == const.SASIndex.column_list_index:
processor = self._process_columnlist_subheader
elif subheader_index == const.SASIndex.subheader_counts_index:
processor = self._process_subheader_counts
elif subheader_index == const.SASIndex.data_subheader_index:
self._current_page_data_subheader_pointers.append(pointer)
return
else:
raise ValueError("unknown subheader index")
if (
subheader_length == 0
or subheader_compression == const.truncated_subheader_id
):
continue

processor(offset, length)
subheader_signature = self._read_bytes(subheader_offset, self._int_length)
subheader_index = get_subheader_index(subheader_signature)
subheader_processor = self._subheader_processors[subheader_index]

if subheader_processor is None:
f1 = (
subheader_compression == const.compressed_subheader_id
or subheader_compression == 0
)
f2 = subheader_type == const.compressed_subheader_type
if self.compression and f1 and f2:
self._current_page_data_subheader_pointers.append(
(subheader_offset, subheader_length)
)
else:
self.close()
raise ValueError(
f"Unknown subheader signature {subheader_signature}"
)
else:
subheader_processor(subheader_offset, subheader_length)

def _process_rowsize_subheader(self, offset: int, length: int) -> None:

Expand All @@ -523,10 +484,12 @@ def _process_rowsize_subheader(self, offset: int, length: int) -> None:
lcp_offset += 378

self.row_length = self._read_int(
offset + const.row_length_offset_multiplier * int_len, int_len
offset + const.row_length_offset_multiplier * int_len,
int_len,
)
self.row_count = self._read_int(
offset + const.row_count_offset_multiplier * int_len, int_len
offset + const.row_count_offset_multiplier * int_len,
int_len,
)
self.col_count_p1 = self._read_int(
offset + const.col_count_p1_multiplier * int_len, int_len
Expand Down