Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

A helper to provide a bytes interface for r/w on Python object. #286

Merged
merged 3 commits into from
Jun 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 130 additions & 0 deletions python/vineyard/data/pickle.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
#! /usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright 2021 Alibaba Group Holding Limited.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

'''
Pickle support for arbitrary vineyard objects.
'''

from io import BytesIO

import pickle
if pickle.HIGHEST_PROTOCOL < 5:
import pickle5 as pickle


class PickledReader:
    ''' Serialize a python object in zero-copy fashion and provide a
        bytes-like read interface.

        The object is pickled with protocol 5 so that large buffers are kept
        out-of-band instead of being copied into the pickle stream. The bytes
        produced by successive :meth:`read` calls are laid out as::

            b'__VINEYARD__'
            <number of out-of-band buffers>       (8 bytes, big-endian)
            <size of every out-of-band buffer>    (8 bytes each, big-endian)
            <size of the pickle stream>           (8 bytes, big-endian)
            <the pickle stream>
            <raw bytes of every out-of-band buffer, in order>
    '''
    def __init__(self, value):
        ''' Pickle :code:`value` and prepare the chunks to be read.

            Parameters:
                value: the python object to serialize.
        '''
        self._value = value

        out_of_band = []
        payload = pickle.dumps(value, protocol=5, fix_imports=True,
                               buffer_callback=out_of_band.append)
        # ``raw()`` exposes every out-of-band buffer as a flat view of bytes
        # without copying the underlying memory.
        raw_buffers = [buf.raw() for buf in out_of_band]

        header = BytesIO()
        header.write(b'__VINEYARD__')
        self._poke_int(header, len(raw_buffers))
        for raw in raw_buffers:
            self._poke_int(header, len(raw))
        self._poke_int(header, len(payload))
        header.write(payload)

        # Chunk 0 is the header plus the pickle stream, the remaining chunks
        # are the out-of-band buffers.
        self._buffers = [memoryview(header.getbuffer())] + raw_buffers
        self._store_size = sum(len(chunk) for chunk in self._buffers)

        # Read cursor: which chunk we are in, and how far into that chunk.
        self._chunk_index = 0
        self._chunk_offset = 0

    @property
    def value(self):
        ''' The original python object that is being serialized. '''
        return self._value

    @property
    def store_size(self):
        ''' Total number of bytes that will be produced by :meth:`read`. '''
        return self._store_size

    def _poke_int(self, bs, value):
        ''' Append :code:`value` to the stream :code:`bs` as an 8-byte
            big-endian integer.
        '''
        bs.write(int.to_bytes(value, length=8, byteorder='big'))

    def read(self, block_size):
        ''' Read the next block of the serialized bytes.

            A block never spans two chunks, so the result may be shorter than
            :code:`block_size` even when more data is available.

            Parameters:
                block_size: the maximum number of bytes to return.

            Returns:
                A bytes-like object with at most :code:`block_size` bytes. An
                empty :code:`bytes` is returned once the stream is exhausted
                (repeatedly, if called again) or if :code:`block_size` is 0.

            Raises:
                ValueError: if :code:`block_size` is negative.
        '''
        if block_size < 0:
            raise ValueError(
                "The next chunk size to read must not be negative")
        if block_size == 0:
            return b''

        # Skip chunks that are exhausted or empty. Returning an empty chunk
        # would be mistaken for end-of-stream by the caller.
        nchunks = len(self._buffers)
        while (self._chunk_index < nchunks and self._chunk_offset >= len(
                self._buffers[self._chunk_index])):
            self._chunk_index += 1
            self._chunk_offset = 0
        if self._chunk_index >= nchunks:
            return b''

        chunk = self._buffers[self._chunk_index]
        start = self._chunk_offset
        stop = min(len(chunk), start + block_size)
        self._chunk_offset = stop
        return chunk[start:stop]


class PickledWriter:
    ''' Deserialize a pickled bytes into a python object in zero-copy fashion.

        The bytes are expected to be laid out as::

            b'__VINEYARD__'
            <number of out-of-band buffers>       (8 bytes, big-endian)
            <size of every out-of-band buffer>    (8 bytes each, big-endian)
            <size of the pickle stream>           (8 bytes, big-endian)
            <the pickle stream>
            <raw bytes of every out-of-band buffer, in order>

        Feed the bytes with :meth:`write`, then call :meth:`close` to build
        the object, which is then available as :attr:`value`.
    '''
    def __init__(self, store_size):
        ''' Allocate the receiving buffer.

            Parameters:
                store_size: the total number of bytes that will be written.
        '''
        self._buffer = BytesIO(initial_bytes=b'\x00' * store_size)
        self._buffer.seek(0)
        self._value = None

    @property
    def value(self):
        ''' The deserialized object, :code:`None` until :meth:`close` has
            been called.
        '''
        return self._value

    def write(self, bs):
        ''' Append the next block of serialized bytes. '''
        self._buffer.write(bs)

    def close(self):
        ''' Parse the received bytes and reconstruct the python object.

            Raises:
                ValueError: if the data doesn't start with the expected magic
                    bytes, or is shorter than its header announces.
        '''
        # A memoryview over the received bytes: every slice taken below is a
        # view, not a copy.
        bs = self._buffer.getbuffer()

        offset, _ = self._peek_any(bs, 0, b'__VINEYARD__')
        offset, nbuffers = self._peek_int(bs, offset)
        buffer_sizes = []
        for _ in range(nbuffers):
            offset, sz = self._peek_int(bs, offset)
            buffer_sizes.append(sz)

        offset, metalen = self._peek_int(bs, offset)
        offset, meta = self._peek_buffer(bs, offset, metalen)

        buffers = []
        for nlen in buffer_sizes:
            offset, block = self._peek_buffer(bs, offset, nlen)
            buffers.append(block)

        # NOTE: unpickling can execute arbitrary code, the bytes fed to this
        # writer must come from a trusted source.
        self._value = pickle.loads(meta, fix_imports=True, buffers=buffers)

    def _peek_any(self, bs, offset, target):
        ''' Check that :code:`bs` contains :code:`target` at :code:`offset`.

            Returns:
                A tuple of the offset just after the matched bytes and the
                matched bytes.

            Raises:
                ValueError: if the bytes at :code:`offset` are not
                    :code:`target`.
        '''
        value = bs[offset:offset + len(target)]
        if value != target:
            # ``value`` is a memoryview, convert it to get a readable message.
            raise ValueError("Unexpected bytes: %r" % (bytes(value),))
        return offset + len(target), value

    def _peek_int(self, bs, offset):
        ''' Decode the 8-byte big-endian integer at :code:`offset`.

            Returns:
                A tuple of the offset just after the integer and its value.

            Raises:
                ValueError: if fewer than 8 bytes are available.
        '''
        if offset + 8 > len(bs):
            raise ValueError(
                "Truncated data: expect an integer at offset %d" % offset)
        value = int.from_bytes(bs[offset:offset + 8], byteorder='big')
        return offset + 8, value

    def _peek_buffer(self, bs, offset, size):
        ''' Take a view of :code:`size` bytes starting at :code:`offset`.

            Returns:
                A tuple of the offset just after the block and the block.

            Raises:
                ValueError: if fewer than :code:`size` bytes are available.
        '''
        if offset + size > len(bs):
            raise ValueError(
                "Truncated data: expect %d bytes at offset %d" % (size, offset))
        value = bs[offset:offset + size]
        return offset + size, value


# Names exported by ``from <this module> import *``.
__all__ = ['PickledReader', 'PickledWriter']
94 changes: 94 additions & 0 deletions python/vineyard/data/tests/test_pickle.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
#! /usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright 2020 Alibaba Group Holding Limited.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import pandas as pd
import pytest
import numpy as np

from vineyard.data.pickle import PickledReader, PickledWriter

# Block sizes (in bytes) passed to `PickledReader.read` by the test below.
b1m = 1 * 1024 * 1024
b16m = 16 * 1024 * 1024
b64m = 64 * 1024 * 1024
b128m = 128 * 1024 * 1024

# Test cases as `(block_size, value)` pairs, matching the parameter names
# "block_size, value" of the parametrized test below.
#
# NOTE(review): every value is built when this module is imported, and some of
# them are very large (e.g. an `np.arange` of 400 * 2**20 elements), so
# collecting these tests presumably needs several GB of memory -- confirm this
# is acceptable for the CI machines.
values = [
# plain python objects
(b1m, 1),
(b1m, (1, 2, 3)),
(b1m, [1, 2, 3, 4]),
(b1m, "dsdfsdf"),
(b1m, (1, "sdfsdfs")),
# a list with 100 million elements
(b1m, [1] * 100000000),
# numpy arrays
(b1m, np.arange(1024 * 1024 * 400)),
(b16m, np.zeros((1024, 1024, 48))),
(b64m, np.zeros((1024, 1024, 512))),
# pandas dataframes with two columns of increasing length
(b1m, pd.DataFrame({
'a': np.ones(1024),
'b': np.zeros(1024),
})),
(b16m, pd.DataFrame({
'a': np.ones(1024 * 1024),
'b': np.zeros(1024 * 1024),
})),
(b64m, pd.DataFrame({
'a': np.ones(1024 * 1024 * 4),
'b': np.zeros(1024 * 1024 * 4),
})),
(b128m, pd.DataFrame({
'a': np.ones(1024 * 1024 * 16),
'b': np.zeros(1024 * 1024 * 16),
})),
]


@pytest.mark.parametrize("block_size, value", values)
def test_bytes_io_roundtrip(block_size, value):
    ''' Serialize `value` block by block with `PickledReader`, feed the blocks
        to a `PickledWriter`, and check that the rebuilt object equals the
        original one.
    '''
    # Drain the reader until it yields an empty block.
    reader = PickledReader(value)
    blocks = []
    total = 0
    block = reader.read(block_size)
    while block:
        blocks.append(block)
        total += len(block)
        block = reader.read(block_size)
    assert total == reader.store_size

    # Replay the blocks; the value only becomes available after `close()`.
    writer = PickledWriter(reader.store_size)
    for block in blocks:
        writer.write(block)
    assert writer.value is None
    writer.close()
    assert writer.value is not None
    restored = writer.value

    # Pick the comparison that fits the type of the original value.
    if isinstance(value, np.ndarray):
        # FIXME why `assert_array_equal` are so slow ...
        #
        # np.testing.assert_array_equal(restored, value)
        #
        assert (restored == value).all()
        return
    if isinstance(value, pd.DataFrame):
        pd.testing.assert_frame_equal(restored, value)
        return
    if isinstance(value, pd.Index):
        pd.testing.assert_index_equal(restored, value)
        return
    if isinstance(value, pd.Series):
        pd.testing.assert_series_equal(restored, value)
        return
    assert restored == value