Skip to content

Commit

Permalink
A helper to provide a bytes interface for r/w on Python object. (v6d-…
Browse files Browse the repository at this point in the history
…io#286)


* A helper to provide a bytes interface for r/w on Python object.

Signed-off-by: Tao He <linzhu.ht@alibaba-inc.com>
  • Loading branch information
sighingnow authored Jun 1, 2021
1 parent 90b9e7d commit 01e1e50
Show file tree
Hide file tree
Showing 2 changed files with 224 additions and 0 deletions.
130 changes: 130 additions & 0 deletions python/vineyard/data/pickle.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
#! /usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright 2021 Alibaba Group Holding Limited.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

'''
Pickle support for arbitrary vineyard objects.
'''

from io import BytesIO

import pickle
if pickle.HIGHEST_PROTOCOL < 5:
import pickle5 as pickle


class PickledReader:
''' Serialize a python object in zero-copy fashion and provides a bytes-like
read interface.
'''
def __init__(self, value):
self._value = value
self._buffers = [None]
self._store_size = 0

buffers = []
bs = pickle.dumps(value, protocol=5, fix_imports=True, buffer_callback=buffers.append)

meta = BytesIO()
meta.write(b'__VINEYARD__')
self._poke_int(meta, len(buffers))
for buf in buffers:
raw = buf.raw()
self._buffers.append(raw)
self._store_size += len(raw)
self._poke_int(meta, len(raw))

self._poke_int(meta, len(bs))
meta.write(bs)
self._buffers[0] = memoryview(meta.getbuffer())
self._store_size += len(self._buffers[0])

self._chunk_index = 0
self._chunk_offset = 0

@property
def value(self):
return self._value

@property
def store_size(self):
return self._store_size

def _poke_int(self, bs, value):
bs.write(int.to_bytes(value, length=8, byteorder='big'))

def read(self, block_size):
assert block_size >= 0, "The next chunk size to read must be greater than 0"
if self._chunk_offset == len(self._buffers[self._chunk_index]):
self._chunk_index += 1
self._chunk_offset = 0
if self._chunk_index >= len(self._buffers):
return b''
chunk = self._buffers[self._chunk_index]
offset = self._chunk_offset
next_offset = min(len(chunk), offset + block_size)
result = self._buffers[self._chunk_index][offset:next_offset]
self._chunk_offset += len(result)
return result


class PickledWriter:
''' Deserialize a pickled bytes into a python object in zero-copy fashion.
'''
def __init__(self, store_size):
self._buffer = BytesIO(initial_bytes=b'\x00' * store_size)
self._buffer.seek(0)
self._value = None

@property
def value(self):
return self._value

def write(self, bs):
self._buffer.write(bs)

def close(self):
bs = self._buffer.getbuffer()
buffers = []
buffer_sizes = []
offset, _ = self._peek_any(bs, 0, b'__VINEYARD__')
offset, nbuffers = self._peek_int(bs, offset)
for _ in range(nbuffers):
offset, sz = self._peek_int(bs, offset)
buffer_sizes.append(sz)
offset, metalen = self._peek_int(bs, offset)
offset, meta = self._peek_buffer(bs, offset, metalen)
for nlen in buffer_sizes:
offset, block = self._peek_buffer(bs, offset, nlen)
buffers.append(block)
self._value = pickle.loads(meta, fix_imports=True, buffers=buffers)

def _peek_any(self, bs, offset, target):
value = bs[offset:offset + len(target)]
assert value == target, "Unexpected bytes: " + value
return offset + len(target), value

def _peek_int(self, bs, offset):
value = int.from_bytes(bs[offset:offset + 8], byteorder='big')
return offset + 8, value

def _peek_buffer(self, bs, offset, size):
value = bs[offset:offset + size]
return offset + size, value


__all__ = ['PickledReader', 'PickledWriter']
94 changes: 94 additions & 0 deletions python/vineyard/data/tests/test_pickle.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
#! /usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright 2020 Alibaba Group Holding Limited.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import pandas as pd
import pytest
import numpy as np

from vineyard.data.pickle import PickledReader, PickledWriter

b1m = 1 * 1024 * 1024
b16m = 16 * 1024 * 1024
b64m = 64 * 1024 * 1024
b128m = 128 * 1024 * 1024

values = [
(b1m, 1),
(b1m, (1, 2, 3)),
(b1m, [1, 2, 3, 4]),
(b1m, "dsdfsdf"),
(b1m, (1, "sdfsdfs")),
(b1m, [1] * 100000000),
(b1m, np.arange(1024 * 1024 * 400)),
(b16m, np.zeros((1024, 1024, 48))),
(b64m, np.zeros((1024, 1024, 512))),
(b1m, pd.DataFrame({
'a': np.ones(1024),
'b': np.zeros(1024),
})),
(b16m, pd.DataFrame({
'a': np.ones(1024 * 1024),
'b': np.zeros(1024 * 1024),
})),
(b64m, pd.DataFrame({
'a': np.ones(1024 * 1024 * 4),
'b': np.zeros(1024 * 1024 * 4),
})),
(b128m, pd.DataFrame({
'a': np.ones(1024 * 1024 * 16),
'b': np.zeros(1024 * 1024 * 16),
})),
]


@pytest.mark.parametrize("block_size, value", values)
def test_bytes_io_roundtrip(block_size, value):
reader = PickledReader(value)
bs, nlen = [], 0
while True:
block = reader.read(block_size)
if block:
bs.append(block)
nlen += len(block)
else:
break
assert nlen == reader.store_size

writer = PickledWriter(reader.store_size)
for block in bs:
writer.write(block)
assert writer.value is None
writer.close()
assert writer.value is not None
target = writer.value

# compare values
if isinstance(value, np.ndarray):
# FIXME why `assert_array_equal` are so slow ...
#
# np.testing.assert_array_equal(target, value)
#
assert (target == value).all()
elif isinstance(value, pd.DataFrame):
pd.testing.assert_frame_equal(target, value)
elif isinstance(value, pd.Index):
pd.testing.assert_index_equal(target, value)
elif isinstance(value, pd.Series):
pd.testing.assert_series_equal(target, value)
else:
assert target == value

0 comments on commit 01e1e50

Please sign in to comment.