
Commit

Merge pull request pandas-dev#647 from dimosped/incremental-serialization-disabled-final

Incremental serialization complete implementation (not used, code complete)
dimosped authored Nov 6, 2018
2 parents 9295032 + 8577d70 commit 38d17f8
Showing 9 changed files with 497 additions and 53 deletions.
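
The bulk of the change is the new arctic/serialization/incremental.py below, which chunks a DataFrame into record arrays lazily instead of serializing it in one shot (the new code path is not yet wired into the version store, per the commit message). As a rough orientation before the diffs, here is a minimal usage sketch, assuming only the constructor, generator() and checksum() signatures introduced in this PR and a made-up DataFrame:

    import pandas as pd
    from arctic.serialization.numpy_records import DataFrameSerializer
    from arctic.serialization.incremental import IncrementalPandasToRecArraySerializer

    # Hypothetical input frame; any frame that serializes to records would do.
    df = pd.DataFrame({'a': range(1000), 'b': ['x'] * 1000})

    # Wrap a plain records serializer; chunk_size is a per-chunk byte budget.
    inc = IncrementalPandasToRecArraySerializer(DataFrameSerializer(), df, chunk_size=1024 * 1024)

    # Chunks are produced lazily as (recarray, dtype, start_row, stop_row) tuples.
    for chunk, dtype, start, stop in inc.generator():
        pass  # e.g. compress each chunk and write it as a separate document

    # SHA-1 checksum over the compressed bytes of a row range, returned as a bson Binary.
    sha = inc.checksum(from_idx=0, to_idx=len(df))
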
6 changes: 5 additions & 1 deletion CHANGES.md
@@ -1,6 +1,10 @@
## Changelog

### 1.71 (2018-11-05)
### 1.72
* Feature: #577 Added implementation for incremental serializer for numpy records
* Bugfix: #648 Fix issue with Timezone aware Pandas types, which don't contain hasobject attribute

### 1.71 (2018-11-05)
* Bugfix: #645 Fix write errors for Pandas DataFrame that has mixed object/string types in multi-index column

### 1.70 (2018-10-30)
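For context on bugfix #648: timezone-aware columns carry a pandas extension dtype (e.g. datetime64[ns, UTC]) that, on the pandas versions targeted here, does not expose the numpy .hasobject attribute, so the diffs below compare dtypes against np.dtype('O') instead. A small illustrative sketch (the frame and column names are made up, and the getattr guard is only for demonstration):

    import numpy as np
    import pandas as pd

    NP_OBJECT_DTYPE = np.dtype('O')  # mirrors the constant added to arctic/_util.py

    df = pd.DataFrame({'ts': pd.date_range('2018-01-01', periods=3, tz='UTC'),
                       'txt': ['a', 'b', 'c']})

    for col in df.columns:
        dtype = df.dtypes[col]
        # .hasobject may simply not exist on the timezone-aware extension dtype,
        # depending on the pandas version, so guard the attribute access.
        has_object_attr = getattr(dtype, 'hasobject', 'missing')
        # The identity check used in the fix works for every column, because
        # numpy caches builtin dtypes such as dtype('O').
        is_object_column = dtype is NP_OBJECT_DTYPE
        print(col, dtype, has_object_attr, is_object_column)
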
9 changes: 7 additions & 2 deletions arctic/_util.py
@@ -1,11 +1,16 @@
from pandas import DataFrame
from pandas.util.testing import assert_frame_equal
import logging

import numpy as np
import pymongo
from pandas import DataFrame
from pandas.util.testing import assert_frame_equal


logger = logging.getLogger(__name__)

MAX_DOCUMENT_SIZE = int(pymongo.common.MAX_BSON_SIZE * 0.8)
NP_OBJECT_DTYPE = np.dtype('O')

# Avoid import-time extra logic
_use_new_count_api = None

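The new MAX_DOCUMENT_SIZE constant above is 80% of pymongo's BSON document limit, presumably to leave headroom for per-document metadata. Assuming pymongo's standard 16 MB MAX_BSON_SIZE, the arithmetic works out as below (a quick check, not part of the diff):

    import pymongo.common

    # pymongo.common.MAX_BSON_SIZE is 16 MiB (16 * 1024 * 1024 = 16777216 bytes).
    MAX_DOCUMENT_SIZE = int(pymongo.common.MAX_BSON_SIZE * 0.8)
    print(MAX_DOCUMENT_SIZE)  # 13421772 bytes, roughly 12.8 MiB
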
2 changes: 2 additions & 0 deletions arctic/exceptions.py
@@ -40,6 +40,8 @@ class DataIntegrityException(ArcticException):
"""
pass

class ArcticSerializationException(ArcticException):
pass

class ConcurrentModificationException(DataIntegrityException):
pass
230 changes: 230 additions & 0 deletions arctic/serialization/incremental.py
@@ -0,0 +1,230 @@
import abc
import hashlib
import logging
import os
from threading import RLock

import numpy as np
import pandas as pd
from bson import Binary

from arctic.serialization.numpy_records import PandasSerializer
from .._compression import compress
from ..exceptions import ArcticSerializationException
from .._util import MAX_DOCUMENT_SIZE, NP_OBJECT_DTYPE

ARCTIC_AUTO_EXPAND_CHUNK_SIZE = bool(os.environ.get('ARCTIC_AUTO_EXPAND_CHUNK_SIZE'))

ABC = abc.ABCMeta('ABC', (object,), {})

log = logging.getLogger(__name__)


def incremental_checksum(item, curr_sha=None, is_bytes=False):
curr_sha = hashlib.sha1() if curr_sha is None else curr_sha
curr_sha.update(item if is_bytes else item.tostring())
return curr_sha


class LazyIncrementalSerializer(ABC):
def __init__(self, serializer, input_data, chunk_size):
if chunk_size < 1:
raise ArcticSerializationException("LazyIncrementalSerializer can't be initialized "
"with chunk_size < 1 ({})".format(chunk_size))
if not serializer:
raise ArcticSerializationException("LazyIncrementalSerializer can't be initialized "
"with a None serializer object")
self.input_data = input_data
self.chunk_size = chunk_size
self._serializer = serializer
self._initialized = False
self._checksum = None

@abc.abstractmethod
def __len__(self):
pass

@abc.abstractproperty
def generator(self):
pass

@abc.abstractproperty
def generator_bytes(self):
pass

@abc.abstractproperty
def serialize(self):
pass


class IncrementalPandasToRecArraySerializer(LazyIncrementalSerializer):
def __init__(self, serializer, input_data, chunk_size, string_max_len=None):
super(IncrementalPandasToRecArraySerializer, self).__init__(serializer, input_data, chunk_size)
if not isinstance(serializer, PandasSerializer):
raise ArcticSerializationException("IncrementalPandasToRecArraySerializer requires a serializer of "
"type PandasSerializer.")
if not isinstance(input_data, (pd.DataFrame, pd.Series)):
raise ArcticSerializationException("IncrementalPandasToRecArraySerializer requires a pandas DataFrame or "
"Series as data source input.")
if string_max_len and string_max_len < 1:
raise ArcticSerializationException("IncrementalPandasToRecArraySerializer can't be initialized "
"with string_max_len < 1 ({})".format(string_max_len))
self.string_max_len = string_max_len
# The state which needs to be lazily initialized
self._dtype = None
self._shape = None
self._rows_per_chunk = 0
self._total_chunks = 0
self._has_string_object = False
self._lock = RLock()

def _dtype_convert_to_max_len_string(self, input_ndtype, fname):
if input_ndtype.type not in (np.string_, np.unicode_):
return input_ndtype, False
type_sym = 'S' if input_ndtype.type == np.string_ else 'U'
max_str_len = len(max(self.input_data[fname].astype(type_sym), key=len))
str_field_dtype = np.dtype('{}{:d}'.format(type_sym, max_str_len)) if max_str_len > 0 else input_ndtype
return str_field_dtype, True

def _get_dtype(self):
# Serializer is being called only if can_convert_to_records_without_objects() has passed,
# which means that the resulting recarray does not contain objects but only numpy types, string, or unicode

# Serialize the first row to obtain info about row size in bytes (cache first few rows only)
# Also raise an Exception early, if data are not serializable
first_chunk, serialized_dtypes = self._serializer.serialize(
self.input_data[0:10] if len(self) > 0 else self.input_data,
string_max_len=self.string_max_len)

# This is the common case, where first row's dtype represents well the whole dataframe's dtype
if serialized_dtypes is None or \
len(self.input_data) == 0 or \
NP_OBJECT_DTYPE not in self.input_data.dtypes.values:
return first_chunk, serialized_dtypes, False

# Reaching here means we have at least one column of type object
# To correctly serialize incrementally, we need to know the final dtype (type and fixed length),
# using length-conversion information from all values of the object columns

dtype_arr = []
has_string_object = False
for field_name in serialized_dtypes.names: # include all column names, along with the expanded multi-index
field_dtype = serialized_dtypes[field_name]
if field_name not in self.input_data or self.input_data.dtypes[field_name] is NP_OBJECT_DTYPE:
# Note: .hasobject breaks for timezone-aware datetime64 pandas columns, so compare with dtype('O')
# if column is an expanded multi index or doesn't contain objects, the serialized 1st row dtype is safe
field_dtype, with_str_object = self._dtype_convert_to_max_len_string(field_dtype, field_name)
has_string_object |= with_str_object
dtype_arr.append((field_name, field_dtype))
return first_chunk, np.dtype(dtype_arr), has_string_object

def _lazy_init(self):
if self._initialized:
return

with self._lock:
if self._initialized: # intentional double check here
return
# Get the dtype of the serialized array (takes into account object types, converted to fixed length strings)
first_chunk, dtype, has_string_object = self._get_dtype()

# Compute the number of rows which can fit in a chunk
rows_per_chunk = 0
if len(self) > 0 and self.chunk_size > 1:
rows_per_chunk = IncrementalPandasToRecArraySerializer._calculate_rows_per_chunk(self.chunk_size, first_chunk)

# Initialize object's state
self._dtype = dtype
shp = list(first_chunk.shape)
shp[0] = len(self)
self._shape = tuple(shp)
self._has_string_object = has_string_object
self._rows_per_chunk = rows_per_chunk
self._total_chunks = int(np.ceil(float(len(self)) / self._rows_per_chunk)) if rows_per_chunk > 0 else 0
self._initialized = True

@staticmethod
def _calculate_rows_per_chunk(max_chunk_size, chunk):
sze = int(chunk.dtype.itemsize * np.prod(chunk.shape[1:]))
sze = sze if sze < max_chunk_size else max_chunk_size
rows_per_chunk = int(max_chunk_size / sze)
if rows_per_chunk < 1 and ARCTIC_AUTO_EXPAND_CHUNK_SIZE:
# If a row size is larger than chunk_size, use the maximum document size
logging.warning('Chunk size of {} is too small to fit a row ({}). '
'Using maximum document size.'.format(max_chunk_size, MAX_DOCUMENT_SIZE))
# For huge rows, fall-back to using a very large document size, less than max-allowed by MongoDB
rows_per_chunk = int(MAX_DOCUMENT_SIZE / sze)
if rows_per_chunk < 1:
raise ArcticSerializationException("Serialization failed to split data into max sized chunks.")
return rows_per_chunk

def __len__(self):
return len(self.input_data)

@property
def shape(self):
self._lazy_init()
return self._shape

@property
def dtype(self):
self._lazy_init()
return self._dtype

@property
def rows_per_chunk(self):
self._lazy_init()
return self._rows_per_chunk

def checksum(self, from_idx, to_idx):
if self._checksum is None:
self._lazy_init()
total_sha = None
for chunk_bytes, dtype in self.generator_bytes(from_idx=from_idx, to_idx=to_idx):
# TODO: what about compress_array here in batches?
compressed_chunk = compress(chunk_bytes)
total_sha = incremental_checksum(compressed_chunk, curr_sha=total_sha, is_bytes=True)
self._checksum = Binary(total_sha.digest())
return self._checksum

def generator(self, from_idx=None, to_idx=None):
return self._generator(from_idx=from_idx, to_idx=to_idx)

def generator_bytes(self, from_idx=None, to_idx=None):
return self._generator(from_idx=from_idx, to_idx=to_idx, get_bytes=True)

def _generator(self, from_idx, to_idx, get_bytes=False):
# Note that the range is: [from_idx, to_idx)
self._lazy_init()

        my_length = len(self)

        # Take into account default arguments and negative indexing (from end offset)
        from_idx = 0 if from_idx is None else from_idx
        if from_idx < 0:
            from_idx = my_length + from_idx
        to_idx = my_length if to_idx is None else min(to_idx, my_length)
        if to_idx < 0:
            to_idx = my_length + to_idx

        # No data, finish iteration
        if my_length == 0 or from_idx >= my_length or from_idx >= to_idx:
return

# Perform serialization for each chunk
while from_idx < to_idx:
curr_stop = min(from_idx+self._rows_per_chunk, to_idx)

chunk, _ = self._serializer.serialize(
self.input_data[from_idx: curr_stop],
string_max_len=self.string_max_len,
forced_dtype=self.dtype if self._has_string_object else None)

# Let the gc collect the intermediate serialized chunk as early as possible
chunk = chunk.tostring() if chunk is not None and get_bytes else chunk

yield chunk, self.dtype, from_idx, curr_stop
from_idx = curr_stop

def serialize(self):
return self._serializer.serialize(self.input_data, self.string_max_len)
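
To make the chunking arithmetic in _calculate_rows_per_chunk concrete: for a one-dimensional record array the per-row size is just the record dtype's itemsize, and the row budget is chunk_size divided by that. Below is a rough worked sketch, assuming a made-up two-column float frame whose default integer index arctic serializes as an extra int64 field, for roughly 24 bytes per record:

    import numpy as np
    import pandas as pd
    from arctic.serialization.numpy_records import DataFrameSerializer
    from arctic.serialization.incremental import IncrementalPandasToRecArraySerializer

    # Hypothetical frame: int64 index + two float64 columns -> 24 bytes per record.
    df = pd.DataFrame({'x': np.arange(100000, dtype=np.float64),
                       'y': np.arange(100000, dtype=np.float64)})

    inc = IncrementalPandasToRecArraySerializer(DataFrameSerializer(), df, chunk_size=1024 * 1024)

    # rows_per_chunk = int(1048576 / 24) = 43690, so the 100000 rows should come
    # out as ceil(100000 / 43690) = 3 chunks.
    print(inc.rows_per_chunk)
    print(sum(1 for _ in inc.generator()))
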
48 changes: 28 additions & 20 deletions arctic/serialization/numpy_records.py
@@ -4,6 +4,7 @@
import numpy as np
from pandas import DataFrame, MultiIndex, Series, DatetimeIndex, Index
from ..exceptions import ArcticException
from .._util import NP_OBJECT_DTYPE
try: # 0.21+ Compatibility
from pandas._libs.tslib import Timestamp
from pandas._libs.tslibs.timezones import get_timezone
@@ -27,19 +28,21 @@ def set_fast_check_df_serializable(config):
_FAST_CHECK_DF_SERIALIZABLE = bool(config)


def _to_primitive(arr, string_max_len=None):
def _to_primitive(arr, string_max_len=None, forced_dtype=None):
if arr.dtype.hasobject:
if len(arr) > 0 and isinstance(arr[0], Timestamp):
return np.array([t.value for t in arr], dtype=DTN64_DTYPE)

if string_max_len:
str_array = np.array(arr.astype('U{:d}'.format(string_max_len)))
if forced_dtype is not None:
casted_arr = arr.astype(dtype=forced_dtype, copy=False)
elif string_max_len is not None:
casted_arr = np.array(arr.astype('U{:d}'.format(string_max_len)))
else:
str_array = np.array(list(arr))
casted_arr = np.array(list(arr))

# Pick any unwanted data conversions (e.g. np.NaN to 'nan')
if np.array_equal(arr, str_array):
return str_array
if np.array_equal(arr, casted_arr):
return casted_arr
return arr


Expand All @@ -48,7 +51,7 @@ def _multi_index_to_records(index, empty_index):
if not empty_index:
ix_vals = list(map(np.array, [index.get_level_values(i) for i in range(index.nlevels)]))
else:
# empty multi index has no size, create empty arrays for recarry..
# empty multi index has no size, create empty arrays for recarry.
ix_vals = [np.array([]) for n in index.names]
index_names = list(index.names)
count = 0
@@ -110,7 +113,7 @@ def _index_from_records(self, recarr):
rtn = MultiIndex.from_arrays(level_arrays, names=index)
return rtn

def _to_records(self, df, string_max_len=None):
def _to_records(self, df, string_max_len=None, forced_dtype=None):
"""
Similar to DataFrame.to_records()
Differences:
@@ -134,11 +137,16 @@ def _to_records(self, df, string_max_len=None):
names = index_names + columns

arrays = []
for arr in ix_vals + column_vals:
arrays.append(_to_primitive(arr, string_max_len))

dtype = np.dtype([(str(x), v.dtype) if len(v.shape) == 1 else (str(x), v.dtype, v.shape[1]) for x, v in zip(names, arrays)],
metadata=metadata)
for arr, name in zip(ix_vals + column_vals, index_names + columns):
arrays.append(_to_primitive(arr, string_max_len,
forced_dtype=None if forced_dtype is None else forced_dtype[name]))

if forced_dtype is None:
dtype = np.dtype([(str(x), v.dtype) if len(v.shape) == 1 else (str(x), v.dtype, v.shape[1])
for x, v in zip(names, arrays)],
metadata=metadata)
else:
dtype = forced_dtype

# The argument names is ignored when dtype is passed
rtn = np.rec.fromarrays(arrays, dtype=dtype, names=names)
@@ -166,8 +174,8 @@ def fast_check_serializable(self, df):
mappings, and empty dict otherwise.
"""
i_dtype, f_dtypes = df.index.dtype, df.dtypes
index_has_object = df.index.dtype.hasobject
fields_with_object = [f for f in df.columns if f_dtypes[f] is np.dtype('O')]
index_has_object = df.index.dtype is NP_OBJECT_DTYPE
fields_with_object = [f for f in df.columns if f_dtypes[f] is NP_OBJECT_DTYPE]
if df.empty or (not index_has_object and not fields_with_object):
arr, _ = self._to_records(df.iloc[:10]) # only first few rows for performance
return arr, {}
@@ -202,7 +210,7 @@ def can_convert_to_records_without_objects(self, df, symbol):
else:
return True

def serialize(self, item):
def serialize(self, item, string_max_len=None, forced_dtype=None):
raise NotImplementedError

def deserialize(self, item):
@@ -224,8 +232,8 @@ def deserialize(self, item):
name = item.dtype.names[-1]
return Series.from_array(item[name], index=index, name=name)

def serialize(self, item, string_max_len=None):
return self._to_records(item, string_max_len)
def serialize(self, item, string_max_len=None, forced_dtype=None):
return self._to_records(item, string_max_len, forced_dtype)


class DataFrameSerializer(PandasSerializer):
@@ -267,5 +275,5 @@ def deserialize(self, item):

return df

def serialize(self, item, string_max_len=None):
return self._to_records(item, string_max_len)
def serialize(self, item, string_max_len=None, forced_dtype=None):
return self._to_records(item, string_max_len, forced_dtype)
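
The forced_dtype argument threaded through serialize() and _to_records() above is what lets the incremental serializer pin object columns to one fixed-width string dtype across all chunks, instead of letting each chunk infer its own width. A hedged sketch of the effect, using made-up data (arctic names an unnamed index field 'index'):

    import numpy as np
    import pandas as pd
    from arctic.serialization.numpy_records import DataFrameSerializer

    df = pd.DataFrame({'s': ['a', 'bb', 'ccc']})
    serializer = DataFrameSerializer()

    # Without forced_dtype the string width is inferred from this slice alone ('U3' here).
    recs, inferred_dtype = serializer.serialize(df)

    # With forced_dtype every slice can be cast to one agreed dtype, e.g. a wider
    # unicode field than this particular slice needs, so chunks stay compatible.
    forced = np.dtype([('index', np.int64), ('s', 'U16')])
    recs_forced, dtype_forced = serializer.serialize(df, forced_dtype=forced)
    print(inferred_dtype['s'], dtype_forced['s'])  # <U3 vs <U16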