From 165f36b3245816c39fbde658b40dde7b4c303be3 Mon Sep 17 00:00:00 2001
From: anjakefala
Date: Thu, 24 Aug 2023 14:04:25 -0700
Subject: [PATCH 1/5] [Python] use IPC for pickle serialisation

Existing pickling serialises the whole buffer, even if the Array is
sliced. Pickling now goes through Arrow's IPC serialisation, whose
buffer truncation writes only the sliced data. This relies on a
RecordBatch wrapper, which adds ~230 bytes to the pickled payload per
Array chunk.

Closes #26685
---
 python/pyarrow/array.pxi           |  32 ++++-
 python/pyarrow/table.pxi           |  90 ++++++++++++--
 python/pyarrow/tests/test_array.py |  68 +++++++---
 python/pyarrow/tests/test_table.py | 191 +++++++++++++++++++++++++++--
 4 files changed, 337 insertions(+), 44 deletions(-)

diff --git a/python/pyarrow/array.pxi b/python/pyarrow/array.pxi
index e36d8b2f04315..9745beca7a430 100644
--- a/python/pyarrow/array.pxi
+++ b/python/pyarrow/array.pxi
@@ -674,12 +674,18 @@ cdef shared_ptr[CArrayData] _reconstruct_array_data(data):
                                    offset)
 
 
-def _restore_array(data):
+def _restore_array(buffer):
     """
-    Reconstruct an Array from pickled ArrayData.
+    Restore an IPC serialized Arrow Array.
+
+    Workaround for a pickling sliced Array issue,
+    where the whole buffer would be serialized:
+    https://github.com/apache/arrow/issues/26685
     """
-    cdef shared_ptr[CArrayData] ad = _reconstruct_array_data(data)
-    return pyarrow_wrap_array(MakeArray(ad))
+    from pyarrow.ipc import RecordBatchStreamReader
+
+    with RecordBatchStreamReader(buffer) as reader:
+        return reader.read_next_batch().column(0)
 
 
 cdef class _PandasConvertible(_Weakrefable):
@@ -1100,8 +1106,22 @@ cdef class Array(_PandasConvertible):
                             memory_pool=memory_pool)
 
     def __reduce__(self):
-        return _restore_array, \
-            (_reduce_array_data(self.sp_array.get().data().get()),)
+        """
+        Use Arrow IPC format for serialization.
+
+        Workaround for a pickling sliced Array issue,
+        where the whole buffer would be serialized:
+        https://github.com/apache/arrow/issues/26685
+        """
+        from pyarrow.ipc import RecordBatchStreamWriter
+        from pyarrow.lib import RecordBatch, BufferOutputStream
+
+        batch = RecordBatch.from_arrays([self], [''])
+        sink = BufferOutputStream()
+        with RecordBatchStreamWriter(sink, schema=batch.schema) as writer:
+            writer.write_batch(batch)
+
+        return _restore_array, (sink.getvalue(),)
 
     @staticmethod
     def from_buffers(DataType type, length, buffers, null_count=-1, offset=0,
diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi
index 2eae38485dca4..b12ab9e62549b 100644
--- a/python/pyarrow/table.pxi
+++ b/python/pyarrow/table.pxi
@@ -16,6 +16,7 @@
 # under the License.
 
 import warnings
+import functools
 
 
 cdef class ChunkedArray(_PandasConvertible):
@@ -67,7 +68,21 @@ cdef class ChunkedArray(_PandasConvertible):
         self.chunked_array = chunked_array.get()
 
     def __reduce__(self):
-        return chunked_array, (self.chunks, self.type)
+        """
+        Use Arrow IPC format for serialization.
+
+        Workaround for a pickling sliced Array issue,
+        where the whole buffer would be serialized:
+        https://github.com/apache/arrow/issues/26685
+
+        Adds ~230 extra bytes to the pickled payload per Array chunk.
+ """ + import pyarrow as pa + + # IPC serialization requires wrapping in RecordBatch + table = pa.Table.from_arrays([self], names=[""]) + reconstruct_table, serialised = table.__reduce__() + return functools.partial(_reconstruct_chunked_array, reconstruct_table), serialised @property def data(self): @@ -1390,6 +1405,16 @@ def chunked_array(arrays, type=None): c_result = GetResultValue(CChunkedArray.Make(c_arrays, c_type)) return pyarrow_wrap_chunked_array(c_result) +def _reconstruct_chunked_array(restore_table, buffer): + """ + Restore an IPC serialized ChunkedArray. + + Workaround for a pickling sliced Array issue, + where the whole buffer would be serialized: + https://github.com/apache/arrow/issues/26685 + """ + return restore_table(buffer).column(0) + cdef _schema_from_arrays(arrays, names, metadata, shared_ptr[CSchema]* schema): cdef: @@ -2196,7 +2221,21 @@ cdef class RecordBatch(_Tabular): return self.batch != NULL def __reduce__(self): - return _reconstruct_record_batch, (self.columns, self.schema) + """ + Use Arrow IPC format for serialization. + + Workaround for a pickling sliced RecordBatch issue, + where the whole buffer would be serialized: + https://github.com/apache/arrow/issues/26685 + """ + from pyarrow.ipc import RecordBatchStreamWriter + from pyarrow.lib import RecordBatch, BufferOutputStream + + sink = BufferOutputStream() + with RecordBatchStreamWriter(sink, schema=self.schema) as writer: + writer.write_batch(self) + + return _reconstruct_record_batch, (sink.getvalue(),) def validate(self, *, full=False): """ @@ -2984,11 +3023,18 @@ cdef class RecordBatch(_Tabular): return pyarrow_wrap_batch(c_batch) -def _reconstruct_record_batch(columns, schema): +def _reconstruct_record_batch(buffer): """ - Internal: reconstruct RecordBatch from pickled components. + Restore an IPC serialized Arrow RecordBatch. + + Workaround for a pickling sliced RecordBatch issue, + where the whole buffer would be serialized: + https://github.com/apache/arrow/issues/26685 """ - return RecordBatch.from_arrays(columns, schema=schema) + from pyarrow.ipc import RecordBatchStreamReader + + with RecordBatchStreamReader(buffer) as reader: + return reader.read_next_batch() def table_to_blocks(options, Table table, categories, extension_columns): @@ -3170,10 +3216,23 @@ cdef class Table(_Tabular): check_status(self.table.Validate()) def __reduce__(self): - # Reduce the columns as ChunkedArrays to avoid serializing schema - # data twice - columns = [col for col in self.columns] - return _reconstruct_table, (columns, self.schema) + """ + Use Arrow IPC format for serialization. + + Workaround for a pickling sliced Table issue, + where the whole buffer would be serialized: + https://github.com/apache/arrow/issues/26685 + + Adds ~230 extra bytes to pickled payload per Array chunk. + """ + from pyarrow.ipc import RecordBatchStreamWriter + from pyarrow.lib import RecordBatch, BufferOutputStream + + sink = BufferOutputStream() + with RecordBatchStreamWriter(sink, schema=self.schema) as writer: + writer.write_table(self) + + return _reconstruct_table, (sink.getvalue(), ) def slice(self, offset=0, length=None): """ @@ -4754,11 +4813,18 @@ cdef class Table(_Tabular): ) -def _reconstruct_table(arrays, schema): +def _reconstruct_table(buffer): """ - Internal: reconstruct pa.Table from pickled components. + Restore an IPC serialized Arrow Table. 
+ + Workaround for a pickling sliced Table issue, + where the whole buffer would be serialized: + https://github.com/apache/arrow/issues/26685 """ - return Table.from_arrays(arrays, schema=schema) + from pyarrow.ipc import RecordBatchStreamReader + + with RecordBatchStreamReader(buffer) as reader: + return reader.read_all() def record_batch(data, names=None, schema=None, metadata=None): diff --git a/python/pyarrow/tests/test_array.py b/python/pyarrow/tests/test_array.py index fca094b519fa5..b7fa17985aeb2 100644 --- a/python/pyarrow/tests/test_array.py +++ b/python/pyarrow/tests/test_array.py @@ -1985,20 +1985,35 @@ def test_cast_identities(ty, values): pickle_test_parametrize = pytest.mark.parametrize( - ('data', 'typ'), - [ - ([True, False, True, True], pa.bool_()), - ([1, 2, 4, 6], pa.int64()), - ([1.0, 2.5, None], pa.float64()), - (['a', None, 'b'], pa.string()), - ([], None), - ([[1, 2], [3]], pa.list_(pa.int64())), - ([[4, 5], [6]], pa.large_list(pa.int16())), - ([['a'], None, ['b', 'c']], pa.list_(pa.string())), - ([(1, 'a'), (2, 'c'), None], - pa.struct([pa.field('a', pa.int64()), pa.field('b', pa.string())])) - ] -) + ('data', 'typ'), + [ + # Int array + (list(range(999)) + [None], pa.int64()), + # Float array + (list(map(float, range(999))) + [None], pa.float64()), + # Boolean array + ([True, False, None, True] * 250, pa.bool_()), + # String array + (['a', 'b', 'cd', None, 'efg'] * 200, pa.string()), + # List array + ([[1, 2], [3], [None, 4, 5], [6]] * 250, pa.list_(pa.int64())), + # Large list array + ( + [[4, 5], [6], [None, 7], [8, 9, 10]] * 250, + pa.large_list(pa.int16()) + ), + # String list array + ( + [['a'], None, ['b', 'cd'], ['efg']] * 250, + pa.list_(pa.string()) + ), + # Struct array + ( + [(1, 'a'), (2, 'c'), None, (3, 'b')] * 250, + pa.struct([pa.field('a', pa.int64()), pa.field('b', pa.string())]) + ), + # Empty array + ]) @pickle_test_parametrize @@ -2049,6 +2064,31 @@ def test_array_pickle_protocol5(data, typ, pickle_module): for buf in result.buffers()] assert result_addresses == addresses +@pickle_test_parametrize +def test_array_pickle_slice_truncation(data, typ, pickle_module): + arr = pa.array(data, type=typ) + serialized = pickle_module.dumps(arr) + + slice_arr = arr.slice(10, 2) + serialized = pickle_module.dumps(slice_arr) + + # Check truncation upon serialization + assert len(serialized) <= 0.2 * len(serialized) + + post_pickle_slice = pickle_module.loads(serialized) + + # Check for post-roundtrip equality + assert post_pickle_slice.equals(slice_arr) + + # Check that pickling reset the offset + assert post_pickle_slice.offset == 0 + + # Check that after pickling the slice buffer was trimmed to only contain the sliced data + buf_size = arr.get_total_buffer_size() + post_pickle_slice_buf_size = post_pickle_slice.get_total_buffer_size() + assert buf_size / post_pickle_slice_buf_size - len(arr) / len(post_pickle_slice) < 10 + + @pytest.mark.parametrize( 'narr', diff --git a/python/pyarrow/tests/test_table.py b/python/pyarrow/tests/test_table.py index f93c6bbc2c204..7a81918acaae5 100644 --- a/python/pyarrow/tests/test_table.py +++ b/python/pyarrow/tests/test_table.py @@ -285,21 +285,38 @@ def ne(xarrs, yarrs): # ARROW-4822 assert not pa.chunked_array([], type=pa.int32()).equals(None) - -@pytest.mark.parametrize( - ('data', 'typ'), +pickle_test_parametrize = pytest.mark.parametrize( + ('data', 'typ'), [ - ([True, False, True, True], pa.bool_()), - ([1, 2, 4, 6], pa.int64()), - ([1.0, 2.5, None], pa.float64()), - (['a', None, 'b'], pa.string()), - ([], 
pa.list_(pa.uint8())), - ([[1, 2], [3]], pa.list_(pa.int64())), - ([['a'], None, ['b', 'c']], pa.list_(pa.string())), - ([(1, 'a'), (2, 'c'), None], - pa.struct([pa.field('a', pa.int64()), pa.field('b', pa.string())])) + # Int array + (list(range(999)) + [None], pa.int64()), + # Float array + (list(map(float, range(999))) + [None], pa.float64()), + # Boolean array + ([True, False, None, True] * 250, pa.bool_()), + # String array + (['a', 'b', 'cd', None, 'efg'] * 200, pa.string()), + # List array + ([[1, 2], [3], [None, 4, 5], [6]] * 250, pa.list_(pa.int64())), + # Large list array + ( + [[4, 5], [6], [None, 7], [8, 9, 10]] * 250, + pa.large_list(pa.int16()), + ), + # String list array + ( + [['a'], None, ['b', 'cd'], ['efg']] * 250, + pa.list_(pa.string()), + ), + # Struct array + ( + [(1, 'a'), (2, 'c'), None, (3, 'b')] * 250, + pa.struct([pa.field('a', pa.int64()), pa.field('b', pa.string())]), + ), ] ) + +@pickle_test_parametrize def test_chunked_array_pickle(data, typ, pickle_module): arrays = [] while data: @@ -311,6 +328,38 @@ def test_chunked_array_pickle(data, typ, pickle_module): result.validate() assert result.equals(array) +@pickle_test_parametrize +def test_chunked_array_pickle_slice_truncation(data, typ, pickle_module): + # create chunked array + arrays = [] + while data: + arrays.append(pa.array(data[:2], type=typ)) + data = data[2:] + array = pa.chunked_array(arrays, type=typ) + array.validate() + serialised_array = pickle_module.dumps(array) + + slice_array = array.slice(10, 20) + serialised_slice = pickle_module.dumps(slice_array) + + # Check truncation upon serialisation + assert len(serialised_slice) <= 0.2 * len(serialised_array) + + post_pickle_slice = pickle_module.loads(serialised_slice) + + # Check for roundtrip equality + assert post_pickle_slice.equals(slice_array) + + # Check that pickling reset the offset + assert post_pickle_slice.chunk(0).offset == 0 + + # Check that after pickling the slice buffer was trimmed to only contain sliced data + buf_size = array.get_total_buffer_size() + post_pickle_slice_buf_size = post_pickle_slice.get_total_buffer_size() + assert buf_size / post_pickle_slice_buf_size - len(array) / len(post_pickle_slice) < 10 + + + @pytest.mark.pandas def test_chunked_array_to_pandas(): @@ -678,6 +727,64 @@ def test_recordbatch_pickle(pickle_module): assert result.equals(batch) assert result.schema == schema +def test_recordbatch_pickle_slice_truncation(pickle_module): + data = [ + # Int array + pa.array(list(range(999)) + [None], type=pa.int64()), + # Float array. + pa.array(list(map(float, range(999))) + [None], type=pa.float64()), + # Boolean array. + pa.array([True, False, None, True] * 250, type=pa.bool_()), + # String array. + pa.array(['a', 'b', 'cd', None, 'efg'] * 200, type=pa.string()), + # List array. + pa.array([[1, 2], [3], [None, 4, 5], [6]] * 250, type=pa.list_(pa.int64())), + # Large list array. + pa.array( + [[4, 5], [6], [None, 7], [8, 9, 10]] * 250, + type=pa.large_list(pa.int16()), + ), + # String list array. 
+ pa.array( + [['a'], None, ['b', 'cd'], ['efg']] * 250, + type=pa.list_(pa.string()), + ), + ] + + fields = [ + pa.field('ints', pa.int64()), + pa.field('floats', pa.float64()), + pa.field('bools', pa.bool_()), + pa.field("strs", pa.string()), + pa.field("lists", pa.list_(pa.int64())), + pa.field("large-lists", pa.large_list(pa.int16())), + pa.field("str-list", pa.list_(pa.string())), + ] + + schema = pa.schema(fields, metadata={b'foo': b'bar'}) + batch = pa.record_batch(data, schema=schema) + serialised_batch = pickle_module.dumps(batch) + + slice_batch = batch.slice(10, 2) + serialised_slice = pickle_module.dumps(slice_batch) + + # Check that buffer was truncated upon serialisation + assert len(serialised_slice) <= 0.2 * len(serialised_batch) + + post_pickle_slice = pickle_module.loads(serialised_slice) + + # Roundtrip equality + assert post_pickle_slice.equals(slice_batch) + + # Slice offset was reset + for col in post_pickle_slice.columns: + assert col.offset == 0 + + # Post-pickle slice buffer should only contain the sliced data + buf_size = batch.get_total_buffer_size() + post_pickle_slice_buf_size = post_pickle_slice.get_total_buffer_size() + assert buf_size / post_pickle_slice_buf_size - len(batch) / len(post_pickle_slice) < 10 + def test_recordbatch_get_field(): data = [ @@ -1035,6 +1142,66 @@ def test_table_pickle(pickle_module): result.validate() assert result.equals(table) +def test_table_pickle_slice_truncation(pickle_module): + data = [ + # Int array + pa.chunked_array([list(range(999))] + [[None]], type=pa.int64()), + # Float array + pa.chunked_array([list(map(float, range(999)))] + [[None]], type=pa.float64()), + # Boolean array + pa.chunked_array([[True, False, None, True]] * 250, type=pa.bool_()), + # String array + pa.chunked_array([['a', 'b', 'cd', None, 'efg']] * 200, type=pa.string()), + # List array + pa.chunked_array([[[1, 2], [3], [None, 4, 5], [6]]] * 250, type=pa.list_(pa.int64())), + # Large list array + pa.chunked_array( + [[[4, 5], [6], [None, 7], [8, 9, 10]]] * 250, + type=pa.large_list(pa.int16()), + ), + # String list array. 
+ pa.chunked_array( + [[['a'], None, ['b', 'cd'], ['efg']]] * 250, + type=pa.list_(pa.string()), + ), + ] + + fields = [ + pa.field('ints', pa.int64()), + pa.field('floats', pa.float64()), + pa.field('bools', pa.bool_()), + pa.field("strings", pa.string()), + pa.field("lists", pa.list_(pa.int64())), + pa.field("large-lists", pa.large_list(pa.int16())), + pa.field("string-list", pa.list_(pa.string())), + ] + + schema = pa.schema(fields, metadata= {b'foo': b'bar'}) + + table = pa.Table.from_arrays(data, schema=schema) + serialised_table = pickle_module.dumps(table) + + slice_table = table.slice(10, 2) + serialised_slice = pickle_module.dumps(slice_table) + + # Check buffer truncation + assert len(serialised_slice) <= 0.2 * len(serialised_table) + + post_pickle_slice = pickle_module.loads(serialised_slice) + post_pickle_slice.validate() + + # Roundtrip-equality + assert post_pickle_slice.equals(slice_table) + + # Offset reset post-pickle + for col in post_pickle_slice.columns: + assert col.chunk(0).offset == 0 + + # Post-pickle slice buffer only contains slice data + buf_size = table.get_total_buffer_size() + post_pickle_slice_buf_size = post_pickle_slice.get_total_buffer_size() + assert buf_size / post_pickle_slice_buf_size - len(table) / len(post_pickle_slice) < 10 + def test_table_get_field(): data = [ From 2124aab25ce244b15f49b9ebf419b2ffa29c2c96 Mon Sep 17 00:00:00 2001 From: anjakefala Date: Tue, 12 Sep 2023 11:42:53 -0700 Subject: [PATCH 2/5] Ran linter --- python/pyarrow/table.pxi | 1 + python/pyarrow/tests/test_array.py | 65 +++++++++++++++--------------- python/pyarrow/tests/test_table.py | 57 ++++++++++++++------------ 3 files changed, 66 insertions(+), 57 deletions(-) diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi index b12ab9e62549b..bb32604916e9e 100644 --- a/python/pyarrow/table.pxi +++ b/python/pyarrow/table.pxi @@ -1405,6 +1405,7 @@ def chunked_array(arrays, type=None): c_result = GetResultValue(CChunkedArray.Make(c_arrays, c_type)) return pyarrow_wrap_chunked_array(c_result) + def _reconstruct_chunked_array(restore_table, buffer): """ Restore an IPC serialized ChunkedArray. 
diff --git a/python/pyarrow/tests/test_array.py b/python/pyarrow/tests/test_array.py index b7fa17985aeb2..47a6a2a4c9a17 100644 --- a/python/pyarrow/tests/test_array.py +++ b/python/pyarrow/tests/test_array.py @@ -1985,35 +1985,35 @@ def test_cast_identities(ty, values): pickle_test_parametrize = pytest.mark.parametrize( - ('data', 'typ'), - [ - # Int array - (list(range(999)) + [None], pa.int64()), - # Float array - (list(map(float, range(999))) + [None], pa.float64()), - # Boolean array - ([True, False, None, True] * 250, pa.bool_()), - # String array - (['a', 'b', 'cd', None, 'efg'] * 200, pa.string()), - # List array - ([[1, 2], [3], [None, 4, 5], [6]] * 250, pa.list_(pa.int64())), - # Large list array - ( - [[4, 5], [6], [None, 7], [8, 9, 10]] * 250, - pa.large_list(pa.int16()) - ), - # String list array - ( - [['a'], None, ['b', 'cd'], ['efg']] * 250, - pa.list_(pa.string()) - ), - # Struct array - ( - [(1, 'a'), (2, 'c'), None, (3, 'b')] * 250, - pa.struct([pa.field('a', pa.int64()), pa.field('b', pa.string())]) - ), - # Empty array - ]) + ('data', 'typ'), + [ + # Int array + (list(range(999)) + [None], pa.int64()), + # Float array + (list(map(float, range(999))) + [None], pa.float64()), + # Boolean array + ([True, False, None, True] * 250, pa.bool_()), + # String array + (['a', 'b', 'cd', None, 'efg'] * 200, pa.string()), + # List array + ([[1, 2], [3], [None, 4, 5], [6]] * 250, pa.list_(pa.int64())), + # Large list array + ( + [[4, 5], [6], [None, 7], [8, 9, 10]] * 250, + pa.large_list(pa.int16()) + ), + # String list array + ( + [['a'], None, ['b', 'cd'], ['efg']] * 250, + pa.list_(pa.string()) + ), + # Struct array + ( + [(1, 'a'), (2, 'c'), None, (3, 'b')] * 250, + pa.struct([pa.field('a', pa.int64()), pa.field('b', pa.string())]) + ), + # Empty array + ]) @pickle_test_parametrize @@ -2064,6 +2064,7 @@ def test_array_pickle_protocol5(data, typ, pickle_module): for buf in result.buffers()] assert result_addresses == addresses + @pickle_test_parametrize def test_array_pickle_slice_truncation(data, typ, pickle_module): arr = pa.array(data, type=typ) @@ -2083,11 +2084,11 @@ def test_array_pickle_slice_truncation(data, typ, pickle_module): # Check that pickling reset the offset assert post_pickle_slice.offset == 0 - # Check that after pickling the slice buffer was trimmed to only contain the sliced data + # After pickling the slice buffer trimmed to only contain the sliced data buf_size = arr.get_total_buffer_size() post_pickle_slice_buf_size = post_pickle_slice.get_total_buffer_size() - assert buf_size / post_pickle_slice_buf_size - len(arr) / len(post_pickle_slice) < 10 - + assert buf_size / post_pickle_slice_buf_size - \ + len(arr) / len(post_pickle_slice) < 10 @pytest.mark.parametrize( diff --git a/python/pyarrow/tests/test_table.py b/python/pyarrow/tests/test_table.py index 7a81918acaae5..a98bac3670fc9 100644 --- a/python/pyarrow/tests/test_table.py +++ b/python/pyarrow/tests/test_table.py @@ -285,8 +285,9 @@ def ne(xarrs, yarrs): # ARROW-4822 assert not pa.chunked_array([], type=pa.int32()).equals(None) + pickle_test_parametrize = pytest.mark.parametrize( - ('data', 'typ'), + ('data', 'typ'), [ # Int array (list(range(999)) + [None], pa.int64()), @@ -316,6 +317,7 @@ def ne(xarrs, yarrs): ] ) + @pickle_test_parametrize def test_chunked_array_pickle(data, typ, pickle_module): arrays = [] @@ -328,6 +330,7 @@ def test_chunked_array_pickle(data, typ, pickle_module): result.validate() assert result.equals(array) + @pickle_test_parametrize def 
test_chunked_array_pickle_slice_truncation(data, typ, pickle_module): # create chunked array @@ -339,7 +342,7 @@ def test_chunked_array_pickle_slice_truncation(data, typ, pickle_module): array.validate() serialised_array = pickle_module.dumps(array) - slice_array = array.slice(10, 20) + slice_array = array.slice(10, 20) serialised_slice = pickle_module.dumps(slice_array) # Check truncation upon serialisation @@ -356,9 +359,8 @@ def test_chunked_array_pickle_slice_truncation(data, typ, pickle_module): # Check that after pickling the slice buffer was trimmed to only contain sliced data buf_size = array.get_total_buffer_size() post_pickle_slice_buf_size = post_pickle_slice.get_total_buffer_size() - assert buf_size / post_pickle_slice_buf_size - len(array) / len(post_pickle_slice) < 10 - - + assert buf_size / post_pickle_slice_buf_size - \ + len(array) / len(post_pickle_slice) < 10 @pytest.mark.pandas @@ -727,6 +729,7 @@ def test_recordbatch_pickle(pickle_module): assert result.equals(batch) assert result.schema == schema + def test_recordbatch_pickle_slice_truncation(pickle_module): data = [ # Int array @@ -752,14 +755,14 @@ def test_recordbatch_pickle_slice_truncation(pickle_module): ] fields = [ - pa.field('ints', pa.int64()), - pa.field('floats', pa.float64()), - pa.field('bools', pa.bool_()), - pa.field("strs", pa.string()), - pa.field("lists", pa.list_(pa.int64())), - pa.field("large-lists", pa.large_list(pa.int16())), - pa.field("str-list", pa.list_(pa.string())), - ] + pa.field('ints', pa.int64()), + pa.field('floats', pa.float64()), + pa.field('bools', pa.bool_()), + pa.field("strs", pa.string()), + pa.field("lists", pa.list_(pa.int64())), + pa.field("large-lists", pa.large_list(pa.int16())), + pa.field("str-list", pa.list_(pa.string())), + ] schema = pa.schema(fields, metadata={b'foo': b'bar'}) batch = pa.record_batch(data, schema=schema) @@ -783,7 +786,8 @@ def test_recordbatch_pickle_slice_truncation(pickle_module): # Post-pickle slice buffer should only contain the sliced data buf_size = batch.get_total_buffer_size() post_pickle_slice_buf_size = post_pickle_slice.get_total_buffer_size() - assert buf_size / post_pickle_slice_buf_size - len(batch) / len(post_pickle_slice) < 10 + assert buf_size / post_pickle_slice_buf_size - \ + len(batch) / len(post_pickle_slice) < 10 def test_recordbatch_get_field(): @@ -1142,6 +1146,7 @@ def test_table_pickle(pickle_module): result.validate() assert result.equals(table) + def test_table_pickle_slice_truncation(pickle_module): data = [ # Int array @@ -1153,7 +1158,8 @@ def test_table_pickle_slice_truncation(pickle_module): # String array pa.chunked_array([['a', 'b', 'cd', None, 'efg']] * 200, type=pa.string()), # List array - pa.chunked_array([[[1, 2], [3], [None, 4, 5], [6]]] * 250, type=pa.list_(pa.int64())), + pa.chunked_array([[[1, 2], [3], [None, 4, 5], [6]]] + * 250, type=pa.list_(pa.int64())), # Large list array pa.chunked_array( [[[4, 5], [6], [None, 7], [8, 9, 10]]] * 250, @@ -1167,16 +1173,16 @@ def test_table_pickle_slice_truncation(pickle_module): ] fields = [ - pa.field('ints', pa.int64()), - pa.field('floats', pa.float64()), - pa.field('bools', pa.bool_()), - pa.field("strings", pa.string()), - pa.field("lists", pa.list_(pa.int64())), - pa.field("large-lists", pa.large_list(pa.int16())), - pa.field("string-list", pa.list_(pa.string())), - ] + pa.field('ints', pa.int64()), + pa.field('floats', pa.float64()), + pa.field('bools', pa.bool_()), + pa.field("strings", pa.string()), + pa.field("lists", pa.list_(pa.int64())), + 
pa.field("large-lists", pa.large_list(pa.int16())), + pa.field("string-list", pa.list_(pa.string())), + ] - schema = pa.schema(fields, metadata= {b'foo': b'bar'}) + schema = pa.schema(fields, metadata={b'foo': b'bar'}) table = pa.Table.from_arrays(data, schema=schema) serialised_table = pickle_module.dumps(table) @@ -1200,7 +1206,8 @@ def test_table_pickle_slice_truncation(pickle_module): # Post-pickle slice buffer only contains slice data buf_size = table.get_total_buffer_size() post_pickle_slice_buf_size = post_pickle_slice.get_total_buffer_size() - assert buf_size / post_pickle_slice_buf_size - len(table) / len(post_pickle_slice) < 10 + assert buf_size / post_pickle_slice_buf_size - \ + len(table) / len(post_pickle_slice) < 10 def test_table_get_field(): From d1cdc4e30abb0e7ed520ad33e72dbd0f6184876f Mon Sep 17 00:00:00 2001 From: anjakefala Date: Tue, 12 Sep 2023 12:10:43 -0700 Subject: [PATCH 3/5] Fix variable names --- python/pyarrow/tests/test_array.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/python/pyarrow/tests/test_array.py b/python/pyarrow/tests/test_array.py index 47a6a2a4c9a17..7f8a7add55db5 100644 --- a/python/pyarrow/tests/test_array.py +++ b/python/pyarrow/tests/test_array.py @@ -2068,15 +2068,15 @@ def test_array_pickle_protocol5(data, typ, pickle_module): @pickle_test_parametrize def test_array_pickle_slice_truncation(data, typ, pickle_module): arr = pa.array(data, type=typ) - serialized = pickle_module.dumps(arr) + serialized_arr = pickle_module.dumps(arr) slice_arr = arr.slice(10, 2) - serialized = pickle_module.dumps(slice_arr) + serialized_slice = pickle_module.dumps(slice_arr) # Check truncation upon serialization - assert len(serialized) <= 0.2 * len(serialized) + assert len(serialized_slice) <= 0.2 * len(serialized_arr) - post_pickle_slice = pickle_module.loads(serialized) + post_pickle_slice = pickle_module.loads(serialized_slice) # Check for post-roundtrip equality assert post_pickle_slice.equals(slice_arr) From d79b837298a2106f6ef8cf58d124886f90f708e0 Mon Sep 17 00:00:00 2001 From: anjakefala Date: Thu, 5 Oct 2023 14:44:40 -0700 Subject: [PATCH 4/5] Refactor pickle tests to use test-specific parameters --- python/pyarrow/tests/test_array.py | 76 ++++++++++++++++++------------ python/pyarrow/tests/test_table.py | 46 +++++++++++------- 2 files changed, 75 insertions(+), 47 deletions(-) diff --git a/python/pyarrow/tests/test_array.py b/python/pyarrow/tests/test_array.py index 7f8a7add55db5..e3feed720afeb 100644 --- a/python/pyarrow/tests/test_array.py +++ b/python/pyarrow/tests/test_array.py @@ -1985,35 +1985,21 @@ def test_cast_identities(ty, values): pickle_test_parametrize = pytest.mark.parametrize( - ('data', 'typ'), - [ - # Int array - (list(range(999)) + [None], pa.int64()), - # Float array - (list(map(float, range(999))) + [None], pa.float64()), - # Boolean array - ([True, False, None, True] * 250, pa.bool_()), - # String array - (['a', 'b', 'cd', None, 'efg'] * 200, pa.string()), - # List array - ([[1, 2], [3], [None, 4, 5], [6]] * 250, pa.list_(pa.int64())), - # Large list array - ( - [[4, 5], [6], [None, 7], [8, 9, 10]] * 250, - pa.large_list(pa.int16()) - ), - # String list array - ( - [['a'], None, ['b', 'cd'], ['efg']] * 250, - pa.list_(pa.string()) - ), - # Struct array - ( - [(1, 'a'), (2, 'c'), None, (3, 'b')] * 250, - pa.struct([pa.field('a', pa.int64()), pa.field('b', pa.string())]) - ), - # Empty array - ]) + ('data', 'typ'), + [ + ([True, False, True, True], pa.bool_()), + ([1, 2, 4, 6], pa.int64()), + ([1.0, 2.5, 
None], pa.float64()), + (['a', None, 'b'], pa.string()), + ([], None), + ([[1, 2], [3]], pa.list_(pa.int64())), + ([[4, 5], [6]], pa.large_list(pa.int16())), + ([['a'], None, ['b', 'c']], pa.list_(pa.string())), + ([(1, 'a'), (2, 'c'), None], + pa.struct([pa.field('a', pa.int64()), pa.field('b', pa.string())])) + ] + ) + @pickle_test_parametrize @@ -2065,7 +2051,37 @@ def test_array_pickle_protocol5(data, typ, pickle_module): assert result_addresses == addresses -@pickle_test_parametrize +@pytest.mark.parametrize( + ('data', 'typ'), + [ + # Int array + (list(range(999)) + [None], pa.int64()), + # Float array + (list(map(float, range(999))) + [None], pa.float64()), + # Boolean array + ([True, False, None, True] * 250, pa.bool_()), + # String array + (['a', 'b', 'cd', None, 'efg'] * 200, pa.string()), + # List array + ([[1, 2], [3], [None, 4, 5], [6]] * 250, pa.list_(pa.int64())), + # Large list array + ( + [[4, 5], [6], [None, 7], [8, 9, 10]] * 250, + pa.large_list(pa.int16()) + ), + # String list array + ( + [['a'], None, ['b', 'cd'], ['efg']] * 250, + pa.list_(pa.string()) + ), + # Struct array + ( + [(1, 'a'), (2, 'c'), None, (3, 'b')] * 250, + pa.struct([pa.field('a', pa.int64()), pa.field('b', pa.string())]) + ), + # Empty array + ]) + def test_array_pickle_slice_truncation(data, typ, pickle_module): arr = pa.array(data, type=typ) serialized_arr = pickle_module.dumps(arr) diff --git a/python/pyarrow/tests/test_table.py b/python/pyarrow/tests/test_table.py index a98bac3670fc9..d6a467f53867c 100644 --- a/python/pyarrow/tests/test_table.py +++ b/python/pyarrow/tests/test_table.py @@ -286,7 +286,35 @@ def ne(xarrs, yarrs): assert not pa.chunked_array([], type=pa.int32()).equals(None) -pickle_test_parametrize = pytest.mark.parametrize( + + +@pytest.mark.parametrize( + ('data', 'typ'), + [ + ([True, False, True, True], pa.bool_()), + ([1, 2, 4, 6], pa.int64()), + ([1.0, 2.5, None], pa.float64()), + (['a', None, 'b'], pa.string()), + ([], pa.list_(pa.uint8())), + ([[1, 2], [3]], pa.list_(pa.int64())), + ([['a'], None, ['b', 'c']], pa.list_(pa.string())), + ([(1, 'a'), (2, 'c'), None], + pa.struct([pa.field('a', pa.int64()), pa.field('b', pa.string())])) + ] +) +def test_chunked_array_pickle(data, typ, pickle_module): + arrays = [] + while data: + arrays.append(pa.array(data[:2], type=typ)) + data = data[2:] + array = pa.chunked_array(arrays, type=typ) + array.validate() + result = pickle_module.loads(pickle_module.dumps(array)) + result.validate() + assert result.equals(array) + + +@pytest.mark.parametrize( ('data', 'typ'), [ # Int array @@ -316,22 +344,6 @@ def ne(xarrs, yarrs): ), ] ) - - -@pickle_test_parametrize -def test_chunked_array_pickle(data, typ, pickle_module): - arrays = [] - while data: - arrays.append(pa.array(data[:2], type=typ)) - data = data[2:] - array = pa.chunked_array(arrays, type=typ) - array.validate() - result = pickle_module.loads(pickle_module.dumps(array)) - result.validate() - assert result.equals(array) - - -@pickle_test_parametrize def test_chunked_array_pickle_slice_truncation(data, typ, pickle_module): # create chunked array arrays = [] From 3e54f205f8216fe0f52f0e3af8ccd1e6b4cd3770 Mon Sep 17 00:00:00 2001 From: anjakefala Date: Thu, 5 Oct 2023 16:07:14 -0700 Subject: [PATCH 5/5] Run linter and fix indenting to minimise diff --- python/pyarrow/tests/test_array.py | 29 ++++++++++++++--------------- python/pyarrow/tests/test_table.py | 2 -- 2 files changed, 14 insertions(+), 17 deletions(-) diff --git a/python/pyarrow/tests/test_array.py 
b/python/pyarrow/tests/test_array.py index e3feed720afeb..406cdab77e176 100644 --- a/python/pyarrow/tests/test_array.py +++ b/python/pyarrow/tests/test_array.py @@ -1985,20 +1985,20 @@ def test_cast_identities(ty, values): pickle_test_parametrize = pytest.mark.parametrize( - ('data', 'typ'), - [ - ([True, False, True, True], pa.bool_()), - ([1, 2, 4, 6], pa.int64()), - ([1.0, 2.5, None], pa.float64()), - (['a', None, 'b'], pa.string()), - ([], None), - ([[1, 2], [3]], pa.list_(pa.int64())), - ([[4, 5], [6]], pa.large_list(pa.int16())), - ([['a'], None, ['b', 'c']], pa.list_(pa.string())), - ([(1, 'a'), (2, 'c'), None], - pa.struct([pa.field('a', pa.int64()), pa.field('b', pa.string())])) - ] - ) + ('data', 'typ'), + [ + ([True, False, True, True], pa.bool_()), + ([1, 2, 4, 6], pa.int64()), + ([1.0, 2.5, None], pa.float64()), + (['a', None, 'b'], pa.string()), + ([], None), + ([[1, 2], [3]], pa.list_(pa.int64())), + ([[4, 5], [6]], pa.large_list(pa.int16())), + ([['a'], None, ['b', 'c']], pa.list_(pa.string())), + ([(1, 'a'), (2, 'c'), None], + pa.struct([pa.field('a', pa.int64()), pa.field('b', pa.string())])) + ] +) @@ -2081,7 +2081,6 @@ def test_array_pickle_protocol5(data, typ, pickle_module): ), # Empty array ]) - def test_array_pickle_slice_truncation(data, typ, pickle_module): arr = pa.array(data, type=typ) serialized_arr = pickle_module.dumps(arr) diff --git a/python/pyarrow/tests/test_table.py b/python/pyarrow/tests/test_table.py index d6a467f53867c..4776296f17fb5 100644 --- a/python/pyarrow/tests/test_table.py +++ b/python/pyarrow/tests/test_table.py @@ -286,8 +286,6 @@ def ne(xarrs, yarrs): assert not pa.chunked_array([], type=pa.int32()).equals(None) - - @pytest.mark.parametrize( ('data', 'typ'), [
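
For reference, a minimal sketch of the user-visible behaviour this series
implements (not part of the patches; assumes a pyarrow build that includes
them, plus the stdlib pickle module; the size threshold is illustrative):

    import pickle
    import pyarrow as pa

    # A large array and a tiny slice of it.
    arr = pa.array(range(1_000_000), type=pa.int64())
    tiny = arr.slice(10, 2)

    # Previously, pickling `tiny` serialised the entire backing buffer.
    # With the IPC-based __reduce__, the buffers are truncated to the
    # slice, plus ~230 bytes of RecordBatch framing.
    assert len(pickle.dumps(tiny)) < 0.01 * len(pickle.dumps(arr))

    # The slice round-trips equal, with its offset reset to zero.
    restored = pickle.loads(pickle.dumps(tiny))
    assert restored.equals(tiny)
    assert restored.offset == 0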
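The ~230-byte figure quoted in the docstrings is per-chunk framing overhead,
since each chunk is written as its own RecordBatch in the IPC stream. A rough
way to observe it (again a sketch under the same assumptions; the exact
constant depends on schema and metadata):

    import pickle
    import pyarrow as pa

    # Compare a one-chunk ChunkedArray against one with 100 extra chunks;
    # the size difference divided by 100 approximates the per-chunk cost.
    one = pa.chunked_array([[0]], type=pa.int64())
    many = pa.chunked_array([[i] for i in range(101)], type=pa.int64())
    per_chunk = (len(pickle.dumps(many)) - len(pickle.dumps(one))) / 100
    print(f"~{per_chunk:.0f} bytes of IPC framing per chunk")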