diff --git a/CHANGES.md b/CHANGES.md index 436ef3f2de2ab..885be2c4747fc 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,6 +1,10 @@ ## Changelog -### 1.71 (2018-11-05) +### 1.72 + * Feature: #577 Added implementation for incremental serializer for numpy records + * Bugfix: #648 Fix issue with Timezone aware Pandas types, which don't contain hasobject attribute + +### 1.71 (2018-11-05) * Bugfix: #645 Fix write errors for Pandas DataFrame that has mixed object/string types in multi-index column ### 1.70 (2018-10-30) diff --git a/arctic/_util.py b/arctic/_util.py index e8df9f14f9030..775604285e65b 100644 --- a/arctic/_util.py +++ b/arctic/_util.py @@ -1,11 +1,16 @@ -from pandas import DataFrame -from pandas.util.testing import assert_frame_equal import logging + +import numpy as np import pymongo +from pandas import DataFrame +from pandas.util.testing import assert_frame_equal logger = logging.getLogger(__name__) +MAX_DOCUMENT_SIZE = int(pymongo.common.MAX_BSON_SIZE * 0.8) +NP_OBJECT_DTYPE = np.dtype('O') + # Avoid import-time extra logic _use_new_count_api = None diff --git a/arctic/exceptions.py b/arctic/exceptions.py index 0afc89cf2eb5c..0105d37c0f02c 100644 --- a/arctic/exceptions.py +++ b/arctic/exceptions.py @@ -40,6 +40,8 @@ class DataIntegrityException(ArcticException): """ pass +class ArcticSerializationException(ArcticException): + pass class ConcurrentModificationException(DataIntegrityException): pass diff --git a/arctic/serialization/incremental.py b/arctic/serialization/incremental.py new file mode 100644 index 0000000000000..1e5738c30474f --- /dev/null +++ b/arctic/serialization/incremental.py @@ -0,0 +1,230 @@ +import abc +import hashlib +import logging +import os +from threading import RLock + +import numpy as np +import pandas as pd +from bson import Binary + +from arctic.serialization.numpy_records import PandasSerializer +from .._compression import compress +from ..exceptions import ArcticSerializationException +from .._util import MAX_DOCUMENT_SIZE, NP_OBJECT_DTYPE + +ARCTIC_AUTO_EXPAND_CHUNK_SIZE = bool(os.environ.get('ARCTIC_AUTO_EXPAND_CHUNK_SIZE')) + +ABC = abc.ABCMeta('ABC', (object,), {}) + +log = logging.getLogger(__name__) + + +def incremental_checksum(item, curr_sha=None, is_bytes=False): + curr_sha = hashlib.sha1() if curr_sha is None else curr_sha + curr_sha.update(item if is_bytes else item.tostring()) + return curr_sha + + +class LazyIncrementalSerializer(ABC): + def __init__(self, serializer, input_data, chunk_size): + if chunk_size < 1: + raise ArcticSerializationException("LazyIncrementalSerializer can't be initialized " + "with chunk_size < 1 ({})".format(chunk_size)) + if not serializer: + raise ArcticSerializationException("LazyIncrementalSerializer can't be initialized " + "with a None serializer object") + self.input_data = input_data + self.chunk_size = chunk_size + self._serializer = serializer + self._initialized = False + self._checksum = None + + @abc.abstractmethod + def __len__(self): + pass + + @abc.abstractproperty + def generator(self): + pass + + @abc.abstractproperty + def generator_bytes(self): + pass + + @abc.abstractproperty + def serialize(self): + pass + + +class IncrementalPandasToRecArraySerializer(LazyIncrementalSerializer): + def __init__(self, serializer, input_data, chunk_size, string_max_len=None): + super(IncrementalPandasToRecArraySerializer, self).__init__(serializer, input_data, chunk_size) + if not isinstance(serializer, PandasSerializer): + raise ArcticSerializationException("IncrementalPandasToRecArraySerializer requires a 
serializer of " + "type PandasSerializer.") + if not isinstance(input_data, (pd.DataFrame, pd.Series)): + raise ArcticSerializationException("IncrementalPandasToRecArraySerializer requires a pandas DataFrame or " + "Series as data source input.") + if string_max_len and string_max_len < 1: + raise ArcticSerializationException("IncrementalPandasToRecArraySerializer can't be initialized " + "with string_max_len < 1 ({})".format(string_max_len)) + self.string_max_len = string_max_len + # The state which needs to be lazily initialized + self._dtype = None + self._shape = None + self._rows_per_chunk = 0 + self._total_chunks = 0 + self._has_string_object = False + self._lock = RLock() + + def _dtype_convert_to_max_len_string(self, input_ndtype, fname): + if input_ndtype.type not in (np.string_, np.unicode_): + return input_ndtype, False + type_sym = 'S' if input_ndtype.type == np.string_ else 'U' + max_str_len = len(max(self.input_data[fname].astype(type_sym), key=len)) + str_field_dtype = np.dtype('{}{:d}'.format(type_sym, max_str_len)) if max_str_len > 0 else input_ndtype + return str_field_dtype, True + + def _get_dtype(self): + # Serializer is being called only if can_convert_to_records_without_objects() has passed, + # which means that the resulting recarray does not contain objects but only numpy types, string, or unicode + + # Serialize the first row to obtain info about row size in bytes (cache first few rows only) + # Also raise an Exception early, if data are not serializable + first_chunk, serialized_dtypes = self._serializer.serialize( + self.input_data[0:10] if len(self) > 0 else self.input_data, + string_max_len=self.string_max_len) + + # This is the common case, where first row's dtype represents well the whole dataframe's dtype + if serialized_dtypes is None or \ + len(self.input_data) == 0 or \ + NP_OBJECT_DTYPE not in self.input_data.dtypes.values: + return first_chunk, serialized_dtypes, False + + # Reaching here means we have at least one column of type object + # To correctly serialize incrementally, we need to know the final dtype (type and fixed length), + # using length-conversion information from all values of the object columns + + dtype_arr = [] + has_string_object = False + for field_name in serialized_dtypes.names: # include all column names, along with the expanded multi-index + field_dtype = serialized_dtypes[field_name] + if field_name not in self.input_data or self.input_data.dtypes[field_name] is NP_OBJECT_DTYPE: + # Note: .hasobject breaks for timezone-aware datetime64 pandas columns, so compare with dtype('O') + # if column is an expanded multi index or doesn't contain objects, the serialized 1st row dtype is safe + field_dtype, with_str_object = self._dtype_convert_to_max_len_string(field_dtype, field_name) + has_string_object |= with_str_object + dtype_arr.append((field_name, field_dtype)) + return first_chunk, np.dtype(dtype_arr), has_string_object + + def _lazy_init(self): + if self._initialized: + return + + with self._lock: + if self._initialized: # intentional double check here + return + # Get the dtype of the serialized array (takes into account object types, converted to fixed length strings) + first_chunk, dtype, has_string_object = self._get_dtype() + + # Compute the number of rows which can fit in a chunk + rows_per_chunk = 0 + if len(self) > 0 and self.chunk_size > 1: + rows_per_chunk = IncrementalPandasToRecArraySerializer._calculate_rows_per_chunk(self.chunk_size, first_chunk) + + # Initialize object's state + self._dtype = dtype + shp = 
list(first_chunk.shape) + shp[0] = len(self) + self._shape = tuple(shp) + self._has_string_object = has_string_object + self._rows_per_chunk = rows_per_chunk + self._total_chunks = int(np.ceil(float(len(self)) / self._rows_per_chunk)) if rows_per_chunk > 0 else 0 + self._initialized = True + + @staticmethod + def _calculate_rows_per_chunk(max_chunk_size, chunk): + sze = int(chunk.dtype.itemsize * np.prod(chunk.shape[1:])) + sze = sze if sze < max_chunk_size else max_chunk_size + rows_per_chunk = int(max_chunk_size / sze) + if rows_per_chunk < 1 and ARCTIC_AUTO_EXPAND_CHUNK_SIZE: + # If a row size is larger than chunk_size, use the maximum document size + logging.warning('Chunk size of {} is too small to fit a row ({}). ' + 'Using maximum document size.'.format(max_chunk_size, MAX_DOCUMENT_SIZE)) + # For huge rows, fall-back to using a very large document size, less than max-allowed by MongoDB + rows_per_chunk = int(MAX_DOCUMENT_SIZE / sze) + if rows_per_chunk < 1: + raise ArcticSerializationException("Serialization failed to split data into max sized chunks.") + return rows_per_chunk + + def __len__(self): + return len(self.input_data) + + @property + def shape(self): + self._lazy_init() + return self._shape + + @property + def dtype(self): + self._lazy_init() + return self._dtype + + @property + def rows_per_chunk(self): + self._lazy_init() + return self._rows_per_chunk + + def checksum(self, from_idx, to_idx): + if self._checksum is None: + self._lazy_init() + total_sha = None + for chunk_bytes, dtype in self.generator_bytes(from_idx=from_idx, to_idx=to_idx): + # TODO: what about compress_array here in batches? + compressed_chunk = compress(chunk_bytes) + total_sha = incremental_checksum(compressed_chunk, curr_sha=total_sha, is_bytes=True) + self._checksum = Binary(total_sha.digest()) + return self._checksum + + def generator(self, from_idx=None, to_idx=None): + return self._generator(from_idx=from_idx, to_idx=to_idx) + + def generator_bytes(self, from_idx=None, to_idx=None): + return self._generator(from_idx=from_idx, to_idx=to_idx, get_bytes=True) + + def _generator(self, from_idx, to_idx, get_bytes=False): + # Note that the range is: [from_idx, to_idx) + self._lazy_init() + + my_lenth = len(self) + + # Take into account default arguments and negative indexing (from end offset) + from_idx = 0 if from_idx is None else from_idx + if from_idx < 0: + from_idx = my_lenth + from_idx + to_idx = my_lenth if to_idx is None else min(to_idx, my_lenth) + if to_idx < 0: + to_idx = my_lenth + to_idx + + # No data, finish iteration + if my_lenth == 0 or from_idx >= my_lenth or from_idx >= to_idx: + return + + # Perform serialization for each chunk + while from_idx < to_idx: + curr_stop = min(from_idx+self._rows_per_chunk, to_idx) + + chunk, _ = self._serializer.serialize( + self.input_data[from_idx: curr_stop], + string_max_len=self.string_max_len, + forced_dtype=self.dtype if self._has_string_object else None) + + # Let the gc collect the intermediate serialized chunk as early as possible + chunk = chunk.tostring() if chunk is not None and get_bytes else chunk + + yield chunk, self.dtype, from_idx, curr_stop + from_idx = curr_stop + + def serialize(self): + return self._serializer.serialize(self.input_data, self.string_max_len) diff --git a/arctic/serialization/numpy_records.py b/arctic/serialization/numpy_records.py index 9af4b5732ee38..02b322497f02d 100644 --- a/arctic/serialization/numpy_records.py +++ b/arctic/serialization/numpy_records.py @@ -4,6 +4,7 @@ import numpy as np from pandas import 
DataFrame, MultiIndex, Series, DatetimeIndex, Index from ..exceptions import ArcticException +from .._util import NP_OBJECT_DTYPE try: # 0.21+ Compatibility from pandas._libs.tslib import Timestamp from pandas._libs.tslibs.timezones import get_timezone @@ -27,19 +28,21 @@ def set_fast_check_df_serializable(config): _FAST_CHECK_DF_SERIALIZABLE = bool(config) -def _to_primitive(arr, string_max_len=None): +def _to_primitive(arr, string_max_len=None, forced_dtype=None): if arr.dtype.hasobject: if len(arr) > 0 and isinstance(arr[0], Timestamp): return np.array([t.value for t in arr], dtype=DTN64_DTYPE) - if string_max_len: - str_array = np.array(arr.astype('U{:d}'.format(string_max_len))) + if forced_dtype is not None: + casted_arr = arr.astype(dtype=forced_dtype, copy=False) + elif string_max_len is not None: + casted_arr = np.array(arr.astype('U{:d}'.format(string_max_len))) else: - str_array = np.array(list(arr)) + casted_arr = np.array(list(arr)) # Pick any unwanted data conversions (e.g. np.NaN to 'nan') - if np.array_equal(arr, str_array): - return str_array + if np.array_equal(arr, casted_arr): + return casted_arr return arr @@ -48,7 +51,7 @@ def _multi_index_to_records(index, empty_index): if not empty_index: ix_vals = list(map(np.array, [index.get_level_values(i) for i in range(index.nlevels)])) else: - # empty multi index has no size, create empty arrays for recarry.. + # empty multi index has no size, create empty arrays for recarry. ix_vals = [np.array([]) for n in index.names] index_names = list(index.names) count = 0 @@ -110,7 +113,7 @@ def _index_from_records(self, recarr): rtn = MultiIndex.from_arrays(level_arrays, names=index) return rtn - def _to_records(self, df, string_max_len=None): + def _to_records(self, df, string_max_len=None, forced_dtype=None): """ Similar to DataFrame.to_records() Differences: @@ -134,11 +137,16 @@ def _to_records(self, df, string_max_len=None): names = index_names + columns arrays = [] - for arr in ix_vals + column_vals: - arrays.append(_to_primitive(arr, string_max_len)) - - dtype = np.dtype([(str(x), v.dtype) if len(v.shape) == 1 else (str(x), v.dtype, v.shape[1]) for x, v in zip(names, arrays)], - metadata=metadata) + for arr, name in zip(ix_vals + column_vals, index_names + columns): + arrays.append(_to_primitive(arr, string_max_len, + forced_dtype=None if forced_dtype is None else forced_dtype[name])) + + if forced_dtype is None: + dtype = np.dtype([(str(x), v.dtype) if len(v.shape) == 1 else (str(x), v.dtype, v.shape[1]) + for x, v in zip(names, arrays)], + metadata=metadata) + else: + dtype = forced_dtype # The argument names is ignored when dtype is passed rtn = np.rec.fromarrays(arrays, dtype=dtype, names=names) @@ -166,8 +174,8 @@ def fast_check_serializable(self, df): mappings, and empty dict otherwise. 
""" i_dtype, f_dtypes = df.index.dtype, df.dtypes - index_has_object = df.index.dtype.hasobject - fields_with_object = [f for f in df.columns if f_dtypes[f] is np.dtype('O')] + index_has_object = df.index.dtype is NP_OBJECT_DTYPE + fields_with_object = [f for f in df.columns if f_dtypes[f] is NP_OBJECT_DTYPE] if df.empty or (not index_has_object and not fields_with_object): arr, _ = self._to_records(df.iloc[:10]) # only first few rows for performance return arr, {} @@ -202,7 +210,7 @@ def can_convert_to_records_without_objects(self, df, symbol): else: return True - def serialize(self, item): + def serialize(self, item, string_max_len=None, forced_dtype=None): raise NotImplementedError def deserialize(self, item): @@ -224,8 +232,8 @@ def deserialize(self, item): name = item.dtype.names[-1] return Series.from_array(item[name], index=index, name=name) - def serialize(self, item, string_max_len=None): - return self._to_records(item, string_max_len) + def serialize(self, item, string_max_len=None, forced_dtype=None): + return self._to_records(item, string_max_len, forced_dtype) class DataFrameSerializer(PandasSerializer): @@ -267,5 +275,5 @@ def deserialize(self, item): return df - def serialize(self, item, string_max_len=None): - return self._to_records(item, string_max_len) + def serialize(self, item, string_max_len=None, forced_dtype=None): + return self._to_records(item, string_max_len, forced_dtype) diff --git a/arctic/store/_pandas_ndarray_store.py b/arctic/store/_pandas_ndarray_store.py index ccb054aa63fb1..9f03a8a132226 100644 --- a/arctic/store/_pandas_ndarray_store.py +++ b/arctic/store/_pandas_ndarray_store.py @@ -1,6 +1,7 @@ import ast import logging +from arctic._util import NP_OBJECT_DTYPE from bson.binary import Binary from pandas import DataFrame, Series, Panel import numpy as np @@ -153,7 +154,7 @@ def can_write_type(data): def can_write(self, version, symbol, data): if self.can_write_type(data): # Series has always a single-column - if data.dtype.hasobject or data.index.dtype.hasobject: + if data.dtype is NP_OBJECT_DTYPE or data.index.dtype is NP_OBJECT_DTYPE: return self.SERIALIZER.can_convert_to_records_without_objects(data, symbol) return True return False @@ -184,7 +185,7 @@ def can_write_type(data): def can_write(self, version, symbol, data): if self.can_write_type(data): - if np.any(data.dtypes.values == 'object') or data.index.dtype.hasobject: + if NP_OBJECT_DTYPE in data.dtypes.values or data.index.dtype is NP_OBJECT_DTYPE: return self.SERIALIZER.can_convert_to_records_without_objects(data, symbol) return True return False @@ -215,7 +216,7 @@ def can_write_type(data): def can_write(self, version, symbol, data): if self.can_write_type(data): frame = data.to_frame(filter_observations=False) - if np.any(frame.dtypes.values == 'object') or data.index.dtype.hasobject: + if NP_OBJECT_DTYPE in frame.dtypes.values or data.index.dtype is NP_OBJECT_DTYPE: return self.SERIALIZER.can_convert_to_records_without_objects(frame, symbol) return True return False diff --git a/tests/integration/store/test_version_store.py b/tests/integration/store/test_version_store.py index cb0c45220b697..817283a15fb83 100644 --- a/tests/integration/store/test_version_store.py +++ b/tests/integration/store/test_version_store.py @@ -3,7 +3,7 @@ import struct from datetime import datetime as dt, timedelta as dtd import pandas as pd -from arctic import VERSION_STORE +from arctic import VERSION_STORE, PandasDataFrameStore, PandasSeriesStore from pandas.util.testing import assert_frame_equal, 
assert_series_equal from pymongo.errors import OperationFailure from pymongo.server_type import SERVER_TYPE @@ -1587,3 +1587,23 @@ def test_write_series_with_some_objects(library, input_series): library.write(symbol='symX', data=input_series) read_data = library.read(symbol='symX').data assert_series_equal(input_series, read_data) + + +def test_can_write_tz_aware_data_df(library): + mydf = _mixed_test_data()['index_tz_aware'][0] + library.write(symbol='symTz', data=mydf) + read_data = library.read(symbol='symTz').data + # Arctic converts by default the data to UTC, convert back + read_data.colB = read_data.colB.dt.tz_localize('UTC').dt.tz_convert(read_data.index.tzinfo) + assert library._versions.find_one({'symbol': 'symTz'})['type'] == PandasDataFrameStore.TYPE + assert_frame_equal(mydf, read_data) + + +def test_can_write_tz_aware_data_series(library): + myseries = _mixed_test_data()['index_tz_aware'][0]['colB'] + library.write(symbol='symTzSer', data=myseries) + read_data = library.read(symbol='symTzSer').data + # Arctic converts by default the data to UTC, convert back + read_data = read_data.dt.tz_localize('UTC').dt.tz_convert(read_data.index.tzinfo) + assert library._versions.find_one({'symbol': 'symTzSer'})['type'] == PandasSeriesStore.TYPE + assert_series_equal(myseries, read_data) diff --git a/tests/unit/serialization/serialization_test_data.py b/tests/unit/serialization/serialization_test_data.py index 15623b2829551..6151b012c3c43 100644 --- a/tests/unit/serialization/serialization_test_data.py +++ b/tests/unit/serialization/serialization_test_data.py @@ -105,32 +105,69 @@ def _new_np_nd_array(val): multi_column_with_some_objects = multi_column_no_multiindex.copy() multi_column_with_some_objects.iloc[1:, 1:2] = 'Convert this columnt dtype to object' + # Index with timezone-aware datetime + index_tz_aware = pd.DataFrame(data={'colA': range(10), + 'colB': pd.date_range('20130101', periods=10, tz='US/Eastern')}, + index=pd.date_range('20130101', periods=10, tz='US/Eastern')) + index_tz_aware.index.name = 'index' + _TEST_DATA = { - 'onerow': (onerow_ts, df_serializer.serialize(onerow_ts)), - 'small': (small_ts, df_serializer.serialize(small_ts)), - 'medium': (medium_ts, df_serializer.serialize(medium_ts)), - 'large': (large_ts, df_serializer.serialize(large_ts)), - 'empty': (empty_ts, df_serializer.serialize(empty_ts)), - 'empty_index': (empty_index, df_serializer.serialize(empty_index)), - 'with_some_objects': (with_some_objects_ts, df_serializer.serialize(with_some_objects_ts)), - 'large_with_some_objects': (large_with_some_objects, df_serializer.serialize(large_with_some_objects)), - 'with_string': (with_string_ts, df_serializer.serialize(with_string_ts)), - 'with_unicode': (with_unicode_ts, df_serializer.serialize(with_unicode_ts)), - 'with_some_none': (with_some_none_ts, df_serializer.serialize(with_some_none_ts)), - 'multiindex': (multiindex_ts, df_serializer.serialize(multiindex_ts)), - 'multiindex_with_object': (multi_index_with_object, df_serializer.serialize(multi_index_with_object)), - 'empty_multiindex': (empty_multiindex_ts, df_serializer.serialize(empty_multiindex_ts)), - 'large_multi_index': (large_multi_index, df_serializer.serialize(large_multi_index)), - 'empty_multicolumn': (empty_multi_column_ts, df_serializer.serialize(empty_multi_column_ts)), - 'multi_column_no_multiindex': (multi_column_no_multiindex, - df_serializer.serialize(multi_column_no_multiindex)), - 'large_multi_column': (large_multi_column, df_serializer.serialize(large_multi_column)), - 
'multi_column_int_levels': (multi_column_int_levels, df_serializer.serialize(multi_column_int_levels)), - 'multi_column_and_multi_index': (multi_column_and_multi_index, - df_serializer.serialize(multi_column_and_multi_index)), - 'multi_column_with_some_objects': (multi_column_with_some_objects, - df_serializer.serialize(multi_column_with_some_objects)), - 'n_dimensional_df': (n_dimensional_df, Exception), - 'mixed_dtypes_df': (mixed_dtypes_df, df_serializer.serialize(mixed_dtypes_df)) + 'onerow': (onerow_ts, df_serializer.serialize(onerow_ts), + df_serializer.can_convert_to_records_without_objects(small_ts, 'symA')), + 'small': (small_ts, df_serializer.serialize(small_ts), + df_serializer.can_convert_to_records_without_objects(small_ts, 'symA')), + 'medium': (medium_ts, df_serializer.serialize(medium_ts), + df_serializer.can_convert_to_records_without_objects(medium_ts, 'symA')), + 'large': (large_ts, df_serializer.serialize(large_ts), + df_serializer.can_convert_to_records_without_objects(large_ts, 'symA')), + 'empty': (empty_ts, df_serializer.serialize(empty_ts), + df_serializer.can_convert_to_records_without_objects(empty_ts, 'symA')), + 'empty_index': (empty_index, df_serializer.serialize(empty_index), + df_serializer.can_convert_to_records_without_objects(empty_index, 'symA')), + 'with_some_objects': (with_some_objects_ts, df_serializer.serialize(with_some_objects_ts), + df_serializer.can_convert_to_records_without_objects(with_some_objects_ts, 'symA')), + 'large_with_some_objects': ( + large_with_some_objects, df_serializer.serialize(large_with_some_objects), + df_serializer.can_convert_to_records_without_objects(large_with_some_objects, 'symA')), + 'with_string': (with_string_ts, df_serializer.serialize(with_string_ts), + df_serializer.can_convert_to_records_without_objects(with_string_ts, 'symA')), + 'with_unicode': (with_unicode_ts, df_serializer.serialize(with_unicode_ts), + df_serializer.can_convert_to_records_without_objects(with_unicode_ts, 'symA')), + 'with_some_none': (with_some_none_ts, df_serializer.serialize(with_some_none_ts), + df_serializer.can_convert_to_records_without_objects(with_some_none_ts, 'symA')), + 'multiindex': (multiindex_ts, df_serializer.serialize(multiindex_ts), + df_serializer.can_convert_to_records_without_objects(multiindex_ts, 'symA')), + 'multiindex_with_object': ( + multi_index_with_object, df_serializer.serialize(multi_index_with_object), + df_serializer.can_convert_to_records_without_objects(multi_index_with_object, 'symA')), + 'empty_multiindex': (empty_multiindex_ts, df_serializer.serialize(empty_multiindex_ts), + df_serializer.can_convert_to_records_without_objects(empty_multiindex_ts, 'symA')), + 'large_multi_index': (large_multi_index, df_serializer.serialize(large_multi_index), + df_serializer.can_convert_to_records_without_objects(large_multi_index, 'symA')), + 'empty_multicolumn': (empty_multi_column_ts, df_serializer.serialize(empty_multi_column_ts), + df_serializer.can_convert_to_records_without_objects(empty_multi_column_ts, 'symA')), + 'multi_column_no_multiindex': ( + multi_column_no_multiindex, df_serializer.serialize(multi_column_no_multiindex), + df_serializer.can_convert_to_records_without_objects(multi_column_no_multiindex, 'symA')), + 'large_multi_column': (large_multi_column, df_serializer.serialize(large_multi_column), + df_serializer.can_convert_to_records_without_objects(large_multi_column, 'symA')), + 'multi_column_int_levels': ( + multi_column_int_levels, df_serializer.serialize(multi_column_int_levels), + 
df_serializer.can_convert_to_records_without_objects(multi_column_int_levels, 'symA')), + 'multi_column_and_multi_index': ( + multi_column_and_multi_index, df_serializer.serialize(multi_column_and_multi_index), + df_serializer.can_convert_to_records_without_objects(multi_column_and_multi_index, 'symA')), + 'multi_column_with_some_objects': ( + multi_column_with_some_objects, df_serializer.serialize(multi_column_with_some_objects), + df_serializer.can_convert_to_records_without_objects(multi_column_with_some_objects, 'symA')), + 'n_dimensional_df': (n_dimensional_df, Exception, None), + 'mixed_dtypes_df': (mixed_dtypes_df, df_serializer.serialize(mixed_dtypes_df), + df_serializer.can_convert_to_records_without_objects(mixed_dtypes_df, 'symA')), + 'index_tz_aware': (index_tz_aware, df_serializer.serialize(index_tz_aware), + df_serializer.can_convert_to_records_without_objects(index_tz_aware, 'symA')) } return _TEST_DATA + + +def is_test_data_serializable(input_df_descr): + return _mixed_test_data()[input_df_descr][2] diff --git a/tests/unit/serialization/test_incremental.py b/tests/unit/serialization/test_incremental.py new file mode 100644 index 0000000000000..26dddf3a1e6d9 --- /dev/null +++ b/tests/unit/serialization/test_incremental.py @@ -0,0 +1,137 @@ +import itertools +import pytest + +from arctic.exceptions import ArcticSerializationException +from arctic.serialization.incremental import IncrementalPandasToRecArraySerializer +from arctic.serialization.numpy_records import DataFrameSerializer +from tests.unit.serialization.serialization_test_data import _mixed_test_data, is_test_data_serializable + +_CHUNK_SIZE = 2 * 1024 * 1024 - 2048 +NON_HOMOGENEOUS_DTYPE_PATCH_SIZE_ROWS = 50 +_TEST_DATA = None + +df_serializer = DataFrameSerializer() + + +def test_incremental_bad_init(): + with pytest.raises(ArcticSerializationException): + IncrementalPandasToRecArraySerializer(df_serializer, 'hello world', chunk_size=_CHUNK_SIZE) + with pytest.raises(ArcticSerializationException): + IncrementalPandasToRecArraySerializer(df_serializer, 1234, chunk_size=_CHUNK_SIZE) + with pytest.raises(ArcticSerializationException): + IncrementalPandasToRecArraySerializer(df_serializer, _mixed_test_data()['small'][0], chunk_size=0) + with pytest.raises(ArcticSerializationException): + IncrementalPandasToRecArraySerializer(df_serializer, _mixed_test_data()['small'][0], chunk_size=-1) + with pytest.raises(ArcticSerializationException): + IncrementalPandasToRecArraySerializer(df_serializer, _mixed_test_data()['small'][0], chunk_size=_CHUNK_SIZE, string_max_len=-1) + + +def test_none_df(): + with pytest.raises(ArcticSerializationException): + incr_ser = IncrementalPandasToRecArraySerializer(df_serializer, None, chunk_size=_CHUNK_SIZE) + incr_ser.serialize() + with pytest.raises(ArcticSerializationException): + incr_ser = IncrementalPandasToRecArraySerializer(df_serializer, None, chunk_size=_CHUNK_SIZE) + incr_ser.generator_bytes() + + +@pytest.mark.parametrize("input_df_descr", _mixed_test_data().keys()) +def test_serialize_pandas_to_recarray(input_df_descr): + if not is_test_data_serializable(input_df_descr): + return + + df = _mixed_test_data()[input_df_descr][0] + expectation = _mixed_test_data()[input_df_descr][1] + + incr_ser = IncrementalPandasToRecArraySerializer(df_serializer, df, chunk_size=_CHUNK_SIZE) + if not isinstance(expectation, tuple) and issubclass(expectation, Exception): + with pytest.raises(expectation): + [chunk for chunk, _, _, _ in incr_ser.generator_bytes()] + else: + incr_ser_data, 
incr_ser_dtype = incr_ser.serialize() + matching = expectation[0].tostring() == incr_ser_data.tostring() + assert matching + assert expectation[1] == incr_ser_dtype + + +@pytest.mark.parametrize("input_df_descr", _mixed_test_data().keys()) +def test_serialize_incremental_pandas_to_recarray(input_df_descr): + if not is_test_data_serializable(input_df_descr): + return + + df = _mixed_test_data()[input_df_descr][0] + expectation = _mixed_test_data()[input_df_descr][1] + + incr_ser = IncrementalPandasToRecArraySerializer(df_serializer, df, chunk_size=_CHUNK_SIZE) + + if not isinstance(expectation, tuple) and issubclass(expectation, Exception): + with pytest.raises(expectation): + [chunk for chunk, _, _, _ in incr_ser.generator_bytes()] + else: + chunk_bytes = [chunk_b for chunk_b, _, _, _ in incr_ser.generator_bytes()] + matching = expectation[0].tostring() == b''.join(chunk_bytes) + assert matching + assert expectation[1] == incr_ser.dtype + + +@pytest.mark.parametrize("input_df_descr", _mixed_test_data().keys()) +def test_serialize_incremental_chunk_size_pandas_to_recarray(input_df_descr): + if not is_test_data_serializable(input_df_descr): + return + + df = _mixed_test_data()[input_df_descr][0] + expectation = _mixed_test_data()[input_df_descr][1] + + if not isinstance(expectation, tuple) and issubclass(expectation, Exception): + for div in (1, 4, 8): + chunk_size = div * 8 * 1024 ** 2 + with pytest.raises(expectation): + incr_ser = IncrementalPandasToRecArraySerializer(df_serializer, df, chunk_size=chunk_size) + [chunk for chunk, _, _, _ in incr_ser.generator_bytes()] + return + + for div in (1, 4, 8): + chunk_size = div * 8 * 1024 ** 2 + if input_df_descr is not None and len(expectation) > 0: + row_size = int(expectation[0].dtype.itemsize) + chunk_size = NON_HOMOGENEOUS_DTYPE_PATCH_SIZE_ROWS * row_size / div + incr_ser = IncrementalPandasToRecArraySerializer(df_serializer, df, chunk_size=chunk_size) + chunk_bytes = [chunk for chunk, _, _, _ in incr_ser.generator_bytes()] + matching = expectation[0].tostring() == b''.join(chunk_bytes) + assert matching + assert expectation[1] == incr_ser.dtype + + +@pytest.mark.parametrize("input_df_descr", _mixed_test_data().keys()) +def test_shape(input_df_descr): + if not is_test_data_serializable(input_df_descr): + return + + df = _mixed_test_data()[input_df_descr][0] + expectation = _mixed_test_data()[input_df_descr][1] + + incr_ser = IncrementalPandasToRecArraySerializer(df_serializer, df, chunk_size=_CHUNK_SIZE) + + if not isinstance(expectation, tuple) and issubclass(expectation, Exception): + with pytest.raises(expectation): + [chunk for chunk, _, _, _ in incr_ser.shape] + else: + assert incr_ser.shape == expectation[0].shape + + +@pytest.mark.parametrize("from_idx, to_idx", + [(x, y) for (x, y) in itertools.product(range(-10, len(_mixed_test_data()['large'][0])+100, 500), + range(-10, len(_mixed_test_data()['large'][0])+100, 500)) + if x <= y] + ) +def test_generator_bytes_range(from_idx, to_idx): + # Tests also negative indexing + df = _mixed_test_data()['large'][0] + expectation = _mixed_test_data()['large'][1] + + incr_ser = IncrementalPandasToRecArraySerializer(df_serializer, df, chunk_size=_CHUNK_SIZE) + + chunk_bytes = [chunk_b for chunk_b, _, _, _ in incr_ser.generator_bytes(from_idx=from_idx, to_idx=to_idx)] + matching = expectation[0][from_idx:to_idx].tostring() == b''.join(chunk_bytes) + assert matching + assert expectation[1] == incr_ser.dtype
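
Reviewer note: the snippet below is an illustrative usage sketch, not part of the patch. It shows how the new IncrementalPandasToRecArraySerializer introduced above is driven, following the unit tests in tests/unit/serialization/test_incremental.py; the DataFrame contents and chunk size are placeholder assumptions.

# Illustrative only -- mirrors the API exercised by the new tests.
import pandas as pd

from arctic.serialization.incremental import IncrementalPandasToRecArraySerializer
from arctic.serialization.numpy_records import DataFrameSerializer

df = pd.DataFrame({'a': range(100)})  # placeholder data, any serializable frame works
incr_ser = IncrementalPandasToRecArraySerializer(DataFrameSerializer(), df,
                                                 chunk_size=2 * 1024 * 1024)  # ~2MB chunks (assumed)

# Whole-object serialization, equivalent to the existing non-incremental path
recarr, dtype = incr_ser.serialize()

# Incremental serialization: each iteration yields (chunk, dtype, from_idx, to_idx)
for chunk_bytes, chunk_dtype, start, stop in incr_ser.generator_bytes():
    pass  # e.g. compress and persist each chunk as its own document

# Lazily initialized metadata and an incremental checksum over a row range
print(incr_ser.shape, incr_ser.dtype, incr_ser.rows_per_chunk)
digest = incr_ser.checksum(from_idx=0, to_idx=len(df))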