From 7b29a55880fb207fa06daf06ae95423d8e2b8e03 Mon Sep 17 00:00:00 2001 From: Bryan Cutler Date: Tue, 22 Nov 2016 16:10:12 -0800 Subject: [PATCH 1/9] Initial working version of RecordBatch list to_pandas, need more tests and cleanup --- python/pyarrow/__init__.py | 4 +- python/pyarrow/includes/libarrow.pxd | 3 + python/pyarrow/table.pyx | 50 +++++ python/pyarrow/tests/test_convert_pandas.py | 3 + python/pyarrow/tests/test_table.py | 18 ++ python/src/pyarrow/adapters/pandas.cc | 211 ++++++++++++-------- 6 files changed, 209 insertions(+), 80 deletions(-) diff --git a/python/pyarrow/__init__.py b/python/pyarrow/__init__.py index 775ce7ec47578..f1ab90a848a5d 100644 --- a/python/pyarrow/__init__.py +++ b/python/pyarrow/__init__.py @@ -41,5 +41,7 @@ list_, struct, field, DataType, Field, Schema, schema) -from pyarrow.table import Column, RecordBatch, Table, from_pandas_dataframe +from pyarrow.table import (Column, RecordBatch, RecordBatchList, Table, + from_pandas_dataframe) + from pyarrow.version import version as __version__ diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index 3ae1789170303..c88d610964b07 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -155,6 +155,9 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil: CColumn(const shared_ptr[CField]& field, const shared_ptr[CArray]& data) + CColumn(const shared_ptr[CField]& field, + const vector[shared_ptr[CArray]]& chunks) + int64_t length() int64_t null_count() const c_string& name() diff --git a/python/pyarrow/table.pyx b/python/pyarrow/table.pyx index 5459f26b80aa4..05040fb141b77 100644 --- a/python/pyarrow/table.pyx +++ b/python/pyarrow/table.pyx @@ -296,6 +296,56 @@ cdef class RecordBatch: return result +cdef class RecordBatchList: + ''' + A utility class used for conversions with RecordBatch lists + ''' + + @staticmethod + def to_pandas(batches): + cdef: + vector[vector[shared_ptr[CArray]]] c_array_vec + shared_ptr[CColumn] c_col + PyObject* np_arr + Array arr + Schema schema + + import pandas as pd + + schema = batches[0].schema + K = batches[0].num_columns + + # TODO - check schemas are equal + + # make a vector of ArrayVectors to store column chunks + for i in range(K): + c_array_vec.push_back(vector[shared_ptr[CArray]]()) + + # copy each batch into a chunk + for batch in batches: + for i in range(K): + arr = batch[i] + c_array_vec[i].push_back(arr.sp_array) + + # create columns from the chunks + names = [] + data = [] + for i in range(K): + c_col.reset(new CColumn(schema.sp_schema.get().field(i), c_array_vec[i])) + # TODO - why need PyObject ref? arr is placeholder + check_status(pyarrow.ConvertColumnToPandas( + c_col, arr, &np_arr)) + names.append(frombytes(c_col.get().name())) + data.append(PyObject_to_object(np_arr)) + + return pd.DataFrame(dict(zip(names, data)), columns=names) + + ''' + frames = [batch.to_pandas() for batch in batches] + return pd.concat(frames, ignore_index=True) + ''' + + cdef class Table: ''' A collection of top-level named, equal length Arrow arrays. 
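A note on the conversion strategy introduced above: to_pandas gathers arrays column-wise before converting anything, so each of the K columns collects its slice of every batch into one list of chunks, and each chunk list then backs a single chunked CColumn that ConvertColumnToPandas collapses into one NumPy array. A minimal pure-Python sketch of the gather step, with plain lists standing in for the C++ ArrayVectors (gather_chunks is an illustrative name, not part of this patch):

def gather_chunks(batches, num_columns):
    # one chunk list per column; chunks[i] collects column i's Array from every batch
    chunks = [[] for _ in range(num_columns)]
    for batch in batches:
        for i in range(num_columns):
            chunks[i].append(batch[i])  # batch[i] is the Arrow Array for column i
    return chunks

Each of these chunk lists maps onto one CColumn, so the per-column conversion runs once regardless of how many batches were supplied.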
diff --git a/python/pyarrow/tests/test_convert_pandas.py b/python/pyarrow/tests/test_convert_pandas.py index b527ca7e80816..447769ba04b7c 100644 --- a/python/pyarrow/tests/test_convert_pandas.py +++ b/python/pyarrow/tests/test_convert_pandas.py @@ -204,6 +204,9 @@ def test_timestamps_notimezone_nulls(self): }) self._check_pandas_roundtrip(df, timestamps_to_ms=False) + def test_chunked_array_conversion(self): + pass + # def test_category(self): # repeats = 1000 # values = [b'foo', None, u'bar', 'qux', np.nan] diff --git a/python/pyarrow/tests/test_table.py b/python/pyarrow/tests/test_table.py index 4c9d302106af8..ea65a14738ab9 100644 --- a/python/pyarrow/tests/test_table.py +++ b/python/pyarrow/tests/test_table.py @@ -50,6 +50,24 @@ def test_recordbatch_from_to_pandas(): assert_frame_equal(data, result) +def test_recordbatchlist_to_pandas(): + # NOTE - dtype='uint32' is converted to float64 when not zero copy + data = pd.DataFrame({ + 'c1': np.array([1.0, 2.0, 3.0, 4.0, 5.0], dtype='float64'), + 'c2': ['foo', 'bar', None, 'baz', 'qux'] + }) + + split = int(len(data) / 2) + data1 = data[:split] + data2 = data[split:] + + batch1 = pa.RecordBatch.from_pandas(data1) + batch2 = pa.RecordBatch.from_pandas(data2) + + result = pa.RecordBatchList.to_pandas([batch1, batch2]) + assert_frame_equal(data, result) + + def test_table_basics(): data = [ pa.from_pylist(range(5)), diff --git a/python/src/pyarrow/adapters/pandas.cc b/python/src/pyarrow/adapters/pandas.cc index 1f5b7009e6a08..433d3e59d0d29 100644 --- a/python/src/pyarrow/adapters/pandas.cc +++ b/python/src/pyarrow/adapters/pandas.cc @@ -597,14 +597,10 @@ class ArrowDeserializer { Status Convert(PyObject** out) { const std::shared_ptr data = col_->data(); - if (data->num_chunks() > 1) { - return Status::NotImplemented("Chunked column conversion NYI"); - } - - auto chunk = data->chunk(0); - RETURN_NOT_OK(ConvertValues(chunk)); + RETURN_NOT_OK(ConvertValues(data)); *out = reinterpret_cast(out_); + return Status::OK(); } @@ -634,7 +630,7 @@ class ArrowDeserializer { if (out_ == NULL) { // Error occurred, trust that SimpleNew set the error state - return Status::OK(); + return Status::OK();//Invalid("Error in PyArray_SimpleNewFromData"); } set_numpy_metadata(type, col_->type().get(), out_); @@ -654,27 +650,54 @@ class ArrowDeserializer { } template - inline typename std::enable_if< - arrow_traits::is_pandas_numeric_nullable, Status>::type - ConvertValues(const std::shared_ptr& arr) { + Status ConvertValuesZeroCopy(std::shared_ptr arr) { typedef typename arrow_traits::T T; arrow::PrimitiveArray* prim_arr = static_cast( arr.get()); const T* in_values = reinterpret_cast(prim_arr->data()->data()); - if (arr->null_count() > 0) { - RETURN_NOT_OK(AllocateOutput(arrow_traits::npy_type)); + // Zero-Copy. We can pass the data pointer directly to NumPy. + void* data = const_cast(in_values); + int type = arrow_traits::npy_type; + RETURN_NOT_OK(OutputFromData(type, data)); + + return Status::OK(); + } - T* out_values = reinterpret_cast(PyArray_DATA(out_)); - for (int64_t i = 0; i < arr->length(); ++i) { - out_values[i] = arr->IsNull(i) ? 
arrow_traits::na_value : in_values[i]; + template + inline typename std::enable_if< + arrow_traits::is_pandas_numeric_nullable, Status>::type + ConvertValues(const std::shared_ptr& data) { + typedef typename arrow_traits::T T; + bool has_null_values = data->null_count() > 0; + size_t chunk_offset = 0; + + if (data->num_chunks() == 1 && !has_null_values) { + return ConvertValuesZeroCopy(data->chunk(0)); + } + + RETURN_NOT_OK(AllocateOutput(arrow_traits::npy_type)); + + for (int c = 0; c < data->num_chunks(); c++) { + const std::shared_ptr arr = data->chunk(c); + arrow::PrimitiveArray* prim_arr = static_cast( + arr.get()); + const T* in_values = reinterpret_cast(prim_arr->data()->data()); + T* out_values = reinterpret_cast(PyArray_DATA(out_)) + chunk_offset; + + if (has_null_values) { + for (int64_t i = 0; i < arr->length(); ++i) { + out_values[i] = arr->IsNull(i) ? arrow_traits::na_value : in_values[i]; + } + } else { + // TODO - copy entire chunk? + for (int64_t i = 0; i < arr->length(); ++i) { + out_values[i] = in_values[i]; + } } - } else { - // Zero-Copy. We can pass the data pointer directly to NumPy. - void* data = const_cast(in_values); - int type = arrow_traits::npy_type; - RETURN_NOT_OK(OutputFromData(type, data)); + + chunk_offset += arr->length(); } return Status::OK(); @@ -684,27 +707,37 @@ class ArrowDeserializer { template inline typename std::enable_if< arrow_traits::is_pandas_numeric_not_nullable, Status>::type - ConvertValues(const std::shared_ptr& arr) { + ConvertValues(const std::shared_ptr& data) { typedef typename arrow_traits::T T; + bool has_null_values = data->null_count() > 0; + size_t chunk_offset = 0; - arrow::PrimitiveArray* prim_arr = static_cast( - arr.get()); - - const T* in_values = reinterpret_cast(prim_arr->data()->data()); + if (data->num_chunks() == 1 && !has_null_values) { + return ConvertValuesZeroCopy(data->chunk(0)); + } - if (arr->null_count() > 0) { - RETURN_NOT_OK(AllocateOutput(NPY_FLOAT64)); + // TODO - why cast to double? + RETURN_NOT_OK(AllocateOutput(NPY_FLOAT64)); + for (int c = 0; c < data->num_chunks(); c++) { + const std::shared_ptr arr = data->chunk(c); + arrow::PrimitiveArray* prim_arr = static_cast( + arr.get()); + const T* in_values = reinterpret_cast(prim_arr->data()->data()); // Upcast to double, set NaN as appropriate - double* out_values = reinterpret_cast(PyArray_DATA(out_)); - for (int i = 0; i < arr->length(); ++i) { - out_values[i] = prim_arr->IsNull(i) ? NAN : in_values[i]; + double* out_values = reinterpret_cast(PyArray_DATA(out_)) + chunk_offset; + + if (arr->null_count() > 0) { + for (int i = 0; i < arr->length(); ++i) { + out_values[i] = prim_arr->IsNull(i) ? NAN : in_values[i]; + } + } else { + for (int i = 0; i < arr->length(); ++i) { + out_values[i] = in_values[i]; + } } - } else { - // Zero-Copy. We can pass the data pointer directly to NumPy. 
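An aside on the "why cast to double?" TODO above: pandas at this time has no missing-value representation for NumPy integer arrays, so a column of a pandas-non-nullable type that carries nulls must be upcast to float64 with NaN as the sentinel; this is also why the test added in this patch notes that uint32 data comes back as float64 when the copying path is taken. A small NumPy illustration of that rule (illustrative only, not code from the patch):

import numpy as np

values = np.array([1, 2, 3], dtype='uint32')
null_mask = np.array([False, True, False])  # True marks a null slot

out = values.astype('float64')  # NaN exists only for float dtypes
out[null_mask] = np.nan         # so a null-bearing uint32 column becomes float64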
- void* data = const_cast(in_values); - int type = arrow_traits::npy_type; - RETURN_NOT_OK(OutputFromData(type, data)); + + chunk_offset += arr->length(); } return Status::OK(); @@ -714,35 +747,48 @@ class ArrowDeserializer { template inline typename std::enable_if< arrow_traits::is_boolean, Status>::type - ConvertValues(const std::shared_ptr& arr) { + ConvertValues(const std::shared_ptr& data) { + size_t chunk_offset = 0; PyAcquireGIL lock; - arrow::BooleanArray* bool_arr = static_cast(arr.get()); - - if (arr->null_count() > 0) { + if (data->null_count() > 0) { RETURN_NOT_OK(AllocateOutput(NPY_OBJECT)); - PyObject** out_values = reinterpret_cast(PyArray_DATA(out_)); - for (int64_t i = 0; i < arr->length(); ++i) { - if (bool_arr->IsNull(i)) { - Py_INCREF(Py_None); - out_values[i] = Py_None; - } else if (bool_arr->Value(i)) { - // True - Py_INCREF(Py_True); - out_values[i] = Py_True; - } else { - // False - Py_INCREF(Py_False); - out_values[i] = Py_False; + for (int c = 0; c < data->num_chunks(); c++) { + const std::shared_ptr arr = data->chunk(c); + arrow::BooleanArray* bool_arr = static_cast(arr.get()); + PyObject** out_values = reinterpret_cast(PyArray_DATA(out_)) + chunk_offset; + + for (int64_t i = 0; i < arr->length(); ++i) { + if (bool_arr->IsNull(i)) { + Py_INCREF(Py_None); + out_values[i] = Py_None; + } else if (bool_arr->Value(i)) { + // True + Py_INCREF(Py_True); + out_values[i] = Py_True; + } else { + // False + Py_INCREF(Py_False); + out_values[i] = Py_False; + } } + + chunk_offset += bool_arr->length(); } } else { RETURN_NOT_OK(AllocateOutput(arrow_traits::npy_type)); - uint8_t* out_values = reinterpret_cast(PyArray_DATA(out_)); - for (int64_t i = 0; i < arr->length(); ++i) { - out_values[i] = static_cast(bool_arr->Value(i)); + for (int c = 0; c < data->num_chunks(); c++) { + const std::shared_ptr arr = data->chunk(c); + arrow::BooleanArray* bool_arr = static_cast(arr.get()); + uint8_t* out_values = reinterpret_cast(PyArray_DATA(out_)) + chunk_offset; + + for (int64_t i = 0; i < arr->length(); ++i) { + out_values[i] = static_cast(bool_arr->Value(i)); + } + + chunk_offset += bool_arr->length(); } } @@ -753,42 +799,49 @@ class ArrowDeserializer { template inline typename std::enable_if< T2 == arrow::Type::STRING, Status>::type - ConvertValues(const std::shared_ptr& arr) { + ConvertValues(const std::shared_ptr& data) { + size_t chunk_offset = 0; PyAcquireGIL lock; RETURN_NOT_OK(AllocateOutput(NPY_OBJECT)); - PyObject** out_values = reinterpret_cast(PyArray_DATA(out_)); - - arrow::StringArray* string_arr = static_cast(arr.get()); - - const uint8_t* data; - int32_t length; - if (arr->null_count() > 0) { - for (int64_t i = 0; i < arr->length(); ++i) { - if (string_arr->IsNull(i)) { - Py_INCREF(Py_None); - out_values[i] = Py_None; - } else { - data = string_arr->GetValue(i, &length); - - out_values[i] = make_pystring(data, length); + for (int c = 0; c < data->num_chunks(); c++) { + const std::shared_ptr arr = data->chunk(c); + arrow::StringArray* string_arr = static_cast(arr.get()); + PyObject** out_values = reinterpret_cast(PyArray_DATA(out_)) + chunk_offset; + + const uint8_t* data_ptr; + int32_t length; + if (data->null_count() > 0) { + for (int64_t i = 0; i < arr->length(); ++i) { + if (string_arr->IsNull(i)) { + Py_INCREF(Py_None); + out_values[i] = Py_None; + } else { + data_ptr = string_arr->GetValue(i, &length); + + out_values[i] = make_pystring(data_ptr, length); + if (out_values[i] == nullptr) { + return Status::UnknownError("String initialization failed"); + } + } + } + } 
else { + for (int64_t i = 0; i < arr->length(); ++i) { + data_ptr = string_arr->GetValue(i, &length); + out_values[i] = make_pystring(data_ptr, length); if (out_values[i] == nullptr) { return Status::UnknownError("String initialization failed"); } } } - } else { - for (int64_t i = 0; i < arr->length(); ++i) { - data = string_arr->GetValue(i, &length); - out_values[i] = make_pystring(data, length); - if (out_values[i] == nullptr) { - return Status::UnknownError("String initialization failed"); - } - } + + chunk_offset += string_arr->length(); } + return Status::OK(); } + private: std::shared_ptr col_; PyObject* py_ref_; From 398b18d121141850d7adce25424d9cc442b27567 Mon Sep 17 00:00:00 2001 From: Bryan Cutler Date: Sun, 27 Nov 2016 21:59:20 -0800 Subject: [PATCH 2/9] Fixed case for Integer specialization without nulls --- python/pyarrow/tests/test_table.py | 19 ++++++---- python/src/pyarrow/adapters/pandas.cc | 54 ++++++++++++++------------- 2 files changed, 41 insertions(+), 32 deletions(-) diff --git a/python/pyarrow/tests/test_table.py b/python/pyarrow/tests/test_table.py index ea65a14738ab9..fed893a68d22a 100644 --- a/python/pyarrow/tests/test_table.py +++ b/python/pyarrow/tests/test_table.py @@ -51,20 +51,25 @@ def test_recordbatch_from_to_pandas(): def test_recordbatchlist_to_pandas(): - # NOTE - dtype='uint32' is converted to float64 when not zero copy - data = pd.DataFrame({ - 'c1': np.array([1.0, 2.0, 3.0, 4.0, 5.0], dtype='float64'), - 'c2': ['foo', 'bar', None, 'baz', 'qux'] + data1 = pd.DataFrame({ + 'c1': np.array([1, 1, 2], dtype='uint32'), + 'c2': np.array([1.0, 2.0, 3.0], dtype='float64'), + 'c3': [True, None, False], + 'c4': ['foo', 'bar', None] }) - split = int(len(data) / 2) - data1 = data[:split] - data2 = data[split:] + data2 = pd.DataFrame({ + 'c1': np.array([3, 5], dtype='uint32'), + 'c2': np.array([4.0, 5.0], dtype='float64'), + 'c3': [True, True], + 'c4': ['baz', 'qux'] + }) batch1 = pa.RecordBatch.from_pandas(data1) batch2 = pa.RecordBatch.from_pandas(data2) result = pa.RecordBatchList.to_pandas([batch1, batch2]) + data = pd.concat([data1, data2], ignore_index=True) assert_frame_equal(data, result) diff --git a/python/src/pyarrow/adapters/pandas.cc b/python/src/pyarrow/adapters/pandas.cc index 433d3e59d0d29..008b29a161f93 100644 --- a/python/src/pyarrow/adapters/pandas.cc +++ b/python/src/pyarrow/adapters/pandas.cc @@ -630,7 +630,7 @@ class ArrowDeserializer { if (out_ == NULL) { // Error occurred, trust that SimpleNew set the error state - return Status::OK();//Invalid("Error in PyArray_SimpleNewFromData") + return Status::OK(); } set_numpy_metadata(type, col_->type().get(), out_); @@ -670,10 +670,9 @@ class ArrowDeserializer { arrow_traits::is_pandas_numeric_nullable, Status>::type ConvertValues(const std::shared_ptr& data) { typedef typename arrow_traits::T T; - bool has_null_values = data->null_count() > 0; size_t chunk_offset = 0; - if (data->num_chunks() == 1 && !has_null_values) { + if (data->num_chunks() == 1 && data->null_count() == 0) { return ConvertValuesZeroCopy(data->chunk(0)); } @@ -686,15 +685,12 @@ class ArrowDeserializer { const T* in_values = reinterpret_cast(prim_arr->data()->data()); T* out_values = reinterpret_cast(PyArray_DATA(out_)) + chunk_offset; - if (has_null_values) { + if (arr->null_count() > 0) { for (int64_t i = 0; i < arr->length(); ++i) { out_values[i] = arr->IsNull(i) ? arrow_traits::na_value : in_values[i]; } } else { - // TODO - copy entire chunk? 
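The removal continuing below resolves the "copy entire chunk?" TODO from patch 1: the per-element copy for null-free chunks becomes a single memcpy per chunk, with the running chunk_offset stitching successive chunks into the one contiguous NumPy buffer allocated up front. The same stitching pattern expressed in NumPy (a sketch, not code from the patch):

import numpy as np

chunks = [np.array([1.0, 2.0]), np.array([3.0, 4.0, 5.0])]
out = np.empty(sum(len(c) for c in chunks), dtype='float64')

offset = 0
for c in chunks:
    out[offset:offset + len(c)] = c  # one bulk copy per chunk, like memcpy
    offset += len(c)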
- for (int64_t i = 0; i < arr->length(); ++i) { - out_values[i] = in_values[i]; - } + memcpy(out_values, in_values, sizeof(T) * arr->length()); } chunk_offset += arr->length(); @@ -709,35 +705,43 @@ class ArrowDeserializer { arrow_traits::is_pandas_numeric_not_nullable, Status>::type ConvertValues(const std::shared_ptr& data) { typedef typename arrow_traits::T T; - bool has_null_values = data->null_count() > 0; size_t chunk_offset = 0; - if (data->num_chunks() == 1 && !has_null_values) { + if (data->num_chunks() == 1 && data->null_count() == 0) { return ConvertValuesZeroCopy(data->chunk(0)); } - // TODO - why cast to double? - RETURN_NOT_OK(AllocateOutput(NPY_FLOAT64)); + if (data->null_count() > 0) { + RETURN_NOT_OK(AllocateOutput(NPY_FLOAT64)); - for (int c = 0; c < data->num_chunks(); c++) { - const std::shared_ptr arr = data->chunk(c); - arrow::PrimitiveArray* prim_arr = static_cast( - arr.get()); - const T* in_values = reinterpret_cast(prim_arr->data()->data()); - // Upcast to double, set NaN as appropriate - double* out_values = reinterpret_cast(PyArray_DATA(out_)) + chunk_offset; + for (int c = 0; c < data->num_chunks(); c++) { + const std::shared_ptr arr = data->chunk(c); + arrow::PrimitiveArray* prim_arr = static_cast( + arr.get()); + const T* in_values = reinterpret_cast(prim_arr->data()->data()); + // Upcast to double, set NaN as appropriate + double* out_values = reinterpret_cast(PyArray_DATA(out_)) + chunk_offset; - if (arr->null_count() > 0) { for (int i = 0; i < arr->length(); ++i) { out_values[i] = prim_arr->IsNull(i) ? NAN : in_values[i]; } - } else { - for (int i = 0; i < arr->length(); ++i) { - out_values[i] = in_values[i]; - } + + chunk_offset += arr->length(); } + } else { + RETURN_NOT_OK(AllocateOutput(arrow_traits::npy_type)); - chunk_offset += arr->length(); + for (int c = 0; c < data->num_chunks(); c++) { + const std::shared_ptr arr = data->chunk(c); + arrow::PrimitiveArray* prim_arr = static_cast( + arr.get()); + const T* in_values = reinterpret_cast(prim_arr->data()->data()); + T* out_values = reinterpret_cast(PyArray_DATA(out_)) + chunk_offset; + + memcpy(out_values, in_values, sizeof(T) * arr->length()); + + chunk_offset += arr->length(); + } } return Status::OK(); From 3ee51e625f5c072dba5b8918682585f8014fdcf9 Mon Sep 17 00:00:00 2001 From: Bryan Cutler Date: Sun, 27 Nov 2016 22:01:40 -0800 Subject: [PATCH 3/9] cleanup --- python/pyarrow/__init__.py | 2 +- python/pyarrow/tests/test_convert_pandas.py | 3 --- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/python/pyarrow/__init__.py b/python/pyarrow/__init__.py index f1ab90a848a5d..e3ba1f92f4062 100644 --- a/python/pyarrow/__init__.py +++ b/python/pyarrow/__init__.py @@ -41,7 +41,7 @@ list_, struct, field, DataType, Field, Schema, schema) -from pyarrow.table import (Column, RecordBatch, RecordBatchList, Table, +from pyarrow.table import (Column, RecordBatch, RecordBatchList, Table, from_pandas_dataframe) from pyarrow.version import version as __version__ diff --git a/python/pyarrow/tests/test_convert_pandas.py b/python/pyarrow/tests/test_convert_pandas.py index 447769ba04b7c..b527ca7e80816 100644 --- a/python/pyarrow/tests/test_convert_pandas.py +++ b/python/pyarrow/tests/test_convert_pandas.py @@ -204,9 +204,6 @@ def test_timestamps_notimezone_nulls(self): }) self._check_pandas_roundtrip(df, timestamps_to_ms=False) - def test_chunked_array_conversion(self): - pass - # def test_category(self): # repeats = 1000 # values = [b'foo', None, u'bar', 'qux', np.nan] From 
c3d7e8fd82878c288cfe6b4929626df64d5343ff Mon Sep 17 00:00:00 2001 From: Bryan Cutler Date: Tue, 29 Nov 2016 16:27:09 -0800 Subject: [PATCH 4/9] Changed conversion to make Table from columns first; conversion is now just a free function --- python/pyarrow/__init__.py | 2 +- python/pyarrow/includes/libarrow.pxd | 3 ++ python/pyarrow/table.pyx | 78 ++++++++++++++-------- python/pyarrow/tests/test_table.py | 2 +- 4 files changed, 43 insertions(+), 42 deletions(-) diff --git a/python/pyarrow/__init__.py b/python/pyarrow/__init__.py index e3ba1f92f4062..d4d0f00c52346 100644 --- a/python/pyarrow/__init__.py +++ b/python/pyarrow/__init__.py @@ -41,7 +41,7 @@ list_, struct, field, DataType, Field, Schema, schema) -from pyarrow.table import (Column, RecordBatch, RecordBatchList, Table, +from pyarrow.table import (Column, RecordBatch, dataframe_from_batches, Table, from_pandas_dataframe) from pyarrow.version import version as __version__ diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index c88d610964b07..350ebe30c9b89 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -88,6 +88,9 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil: cdef cppclass CSchema" arrow::Schema": CSchema(const vector[shared_ptr[CField]]& fields) + + c_bool Equals(const shared_ptr[CSchema]& other) + const shared_ptr[CField]& field(int i) int num_fields() c_string ToString() diff --git a/python/pyarrow/table.pyx b/python/pyarrow/table.pyx index 05040fb141b77..ae59f1a96d99f 100644 --- a/python/pyarrow/table.pyx +++ b/python/pyarrow/table.pyx @@ -28,6 +28,7 @@ cimport pyarrow.includes.pyarrow as pyarrow import pyarrow.config from pyarrow.array cimport Array, box_arrow_array +from pyarrow.error import ArrowException from pyarrow.error cimport check_status from pyarrow.schema cimport box_data_type, box_schema @@ -296,54 +297,51 @@ cdef class RecordBatch: return result -cdef class RecordBatchList: - ''' - A utility class used for conversions with RecordBatch lists - ''' - - @staticmethod - def to_pandas(batches): - cdef: - vector[vector[shared_ptr[CArray]]] c_array_vec - shared_ptr[CColumn] c_col - PyObject* np_arr - Array arr - Schema schema +def dataframe_from_batches(batches): + """ + Convert a list of Arrow RecordBatches to one pandas.DataFrame - import pandas as pd + Parameters + ---------- - schema = batches[0].schema - K = batches[0].num_columns + batches: list of RecordBatch + RecordBatch list with identical schema to be converted + """ - # TODO - check schemas are equal + cdef: + vector[shared_ptr[CArray]] c_array_chunks + vector[shared_ptr[CColumn]] c_columns + shared_ptr[CTable] c_table + Array arr + Schema schema + Schema schema_comp - # make a vector of ArrayVectors to store column chunks - for i in range(K): - c_array_vec.push_back(vector[shared_ptr[CArray]]()) + import pandas as pd - # copy each batch into a chunk - for batch in batches: - for i in range(K): - arr = batch[i] - c_array_vec[i].push_back(arr.sp_array) + schema = batches[0].schema - # create columns from the chunks - names = [] - data = [] - for i in range(K): - c_col.reset(new CColumn(schema.sp_schema.get().field(i), c_array_vec[i])) - # TODO - why need PyObject ref? 
arr is placeholder - check_status(pyarrow.ConvertColumnToPandas( - c_col, arr, &np_arr)) - names.append(frombytes(c_col.get().name())) - data.append(PyObject_to_object(np_arr)) + # check schemas are equal + for i in range(1, len(batches)): + schema_comp = batches[i].schema + if not schema.sp_schema.get().Equals(schema_comp.sp_schema): + raise ArrowException("Error converting list of RecordBatches to DataFrame, not all schemas are equal") - return pd.DataFrame(dict(zip(names, data)), columns=names) + cdef int K = batches[0].num_columns - ''' - frames = [batch.to_pandas() for batch in batches] - return pd.concat(frames, ignore_index=True) - ''' + # create chunked columns from the batches + c_columns.resize(K) + for i in range(K): + for batch in batches: + arr = batch[i] + c_array_chunks.push_back(arr.sp_array) + c_columns[i].reset(new CColumn(schema.sp_schema.get().field(i), c_array_chunks)) + c_array_chunks.clear() + + # create a Table from columns and convert to DataFrame + c_table.reset(new CTable('', schema.sp_schema, c_columns)) + table = Table() + table.init(c_table) + return table.to_pandas() cdef class Table: diff --git a/python/pyarrow/tests/test_table.py b/python/pyarrow/tests/test_table.py index fed893a68d22a..3a3ac76a6b34b 100644 --- a/python/pyarrow/tests/test_table.py +++ b/python/pyarrow/tests/test_table.py @@ -68,7 +68,7 @@ def test_recordbatchlist_to_pandas(): batch1 = pa.RecordBatch.from_pandas(data1) batch2 = pa.RecordBatch.from_pandas(data2) - result = pa.RecordBatchList.to_pandas([batch1, batch2]) + result = pa.dataframe_from_batches([batch1, batch2]) data = pd.concat([data1, data2], ignore_index=True) assert_frame_equal(data, result) From bd2a7209f8fc7a6b8d8faca17d374db7b5902873 Mon Sep 17 00:00:00 2001 From: Bryan Cutler Date: Tue, 29 Nov 2016 16:54:53 -0800 Subject: [PATCH 5/9] added testcase for schema not equal, disabled now --- python/pyarrow/tests/test_table.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/python/pyarrow/tests/test_table.py b/python/pyarrow/tests/test_table.py index 3a3ac76a6b34b..6b8454c083d1c 100644 --- a/python/pyarrow/tests/test_table.py +++ b/python/pyarrow/tests/test_table.py @@ -73,6 +73,18 @@ def test_recordbatchlist_to_pandas(): assert_frame_equal(data, result) +def test_recordbatchlist_errors(): + data1 = pd.DataFrame({'c1': np.array([1], dtype='uint32')}) + data2 = pd.DataFrame({'c1': np.array([4.0, 5.0], dtype='float64')}) + + batch1 = pa.RecordBatch.from_pandas(data1) + batch2 = pa.RecordBatch.from_pandas(data2) + + # TODO: Enable when subclass of unittest.testcase + #with self.assertRaises(pyarrow.ArrowException): + # self.assertRaises(pa.dataframe_from_batches([batch1, batch2])) + + def test_table_basics(): data = [ pa.from_pylist(range(5)), From 9edb0ba2e10b231908b77a574967d7bb1e778cdd Mon Sep 17 00:00:00 2001 From: Bryan Cutler Date: Tue, 29 Nov 2016 17:32:44 -0800 Subject: [PATCH 6/9] used auto keyword where some typecasting was done in ConvertValues --- python/src/pyarrow/adapters/pandas.cc | 38 ++++++++++++--------------- 1 file changed, 17 insertions(+), 21 deletions(-) diff --git a/python/src/pyarrow/adapters/pandas.cc b/python/src/pyarrow/adapters/pandas.cc index 008b29a161f93..adb27e83ef120 100644 --- a/python/src/pyarrow/adapters/pandas.cc +++ b/python/src/pyarrow/adapters/pandas.cc @@ -653,9 +653,8 @@ class ArrowDeserializer { Status ConvertValuesZeroCopy(std::shared_ptr arr) { typedef typename arrow_traits::T T; - arrow::PrimitiveArray* prim_arr = static_cast( - arr.get()); - const T* in_values = 
reinterpret_cast(prim_arr->data()->data()); + auto prim_arr = static_cast(arr.get()); + auto in_values = reinterpret_cast(prim_arr->data()->data()); // Zero-Copy. We can pass the data pointer directly to NumPy. void* data = const_cast(in_values); @@ -680,10 +679,9 @@ class ArrowDeserializer { for (int c = 0; c < data->num_chunks(); c++) { const std::shared_ptr arr = data->chunk(c); - arrow::PrimitiveArray* prim_arr = static_cast( - arr.get()); - const T* in_values = reinterpret_cast(prim_arr->data()->data()); - T* out_values = reinterpret_cast(PyArray_DATA(out_)) + chunk_offset; + auto prim_arr = static_cast(arr.get()); + auto in_values = reinterpret_cast(prim_arr->data()->data()); + auto out_values = reinterpret_cast(PyArray_DATA(out_)) + chunk_offset; if (arr->null_count() > 0) { for (int64_t i = 0; i < arr->length(); ++i) { @@ -716,11 +714,10 @@ class ArrowDeserializer { for (int c = 0; c < data->num_chunks(); c++) { const std::shared_ptr arr = data->chunk(c); - arrow::PrimitiveArray* prim_arr = static_cast( - arr.get()); - const T* in_values = reinterpret_cast(prim_arr->data()->data()); + auto prim_arr = static_cast(arr.get()); + auto in_values = reinterpret_cast(prim_arr->data()->data()); // Upcast to double, set NaN as appropriate - double* out_values = reinterpret_cast(PyArray_DATA(out_)) + chunk_offset; + auto out_values = reinterpret_cast(PyArray_DATA(out_)) + chunk_offset; for (int i = 0; i < arr->length(); ++i) { out_values[i] = prim_arr->IsNull(i) ? NAN : in_values[i]; @@ -733,10 +730,9 @@ class ArrowDeserializer { for (int c = 0; c < data->num_chunks(); c++) { const std::shared_ptr arr = data->chunk(c); - arrow::PrimitiveArray* prim_arr = static_cast( - arr.get()); - const T* in_values = reinterpret_cast(prim_arr->data()->data()); - T* out_values = reinterpret_cast(PyArray_DATA(out_)) + chunk_offset; + auto prim_arr = static_cast(arr.get()); + auto in_values = reinterpret_cast(prim_arr->data()->data()); + auto out_values = reinterpret_cast(PyArray_DATA(out_)) + chunk_offset; memcpy(out_values, in_values, sizeof(T) * arr->length()); @@ -760,8 +756,8 @@ class ArrowDeserializer { for (int c = 0; c < data->num_chunks(); c++) { const std::shared_ptr arr = data->chunk(c); - arrow::BooleanArray* bool_arr = static_cast(arr.get()); - PyObject** out_values = reinterpret_cast(PyArray_DATA(out_)) + chunk_offset; + auto bool_arr = static_cast(arr.get()); + auto out_values = reinterpret_cast(PyArray_DATA(out_)) + chunk_offset; for (int64_t i = 0; i < arr->length(); ++i) { if (bool_arr->IsNull(i)) { @@ -785,8 +781,8 @@ class ArrowDeserializer { for (int c = 0; c < data->num_chunks(); c++) { const std::shared_ptr arr = data->chunk(c); - arrow::BooleanArray* bool_arr = static_cast(arr.get()); - uint8_t* out_values = reinterpret_cast(PyArray_DATA(out_)) + chunk_offset; + auto bool_arr = static_cast(arr.get()); + auto out_values = reinterpret_cast(PyArray_DATA(out_)) + chunk_offset; for (int64_t i = 0; i < arr->length(); ++i) { out_values[i] = static_cast(bool_arr->Value(i)); @@ -811,8 +807,8 @@ class ArrowDeserializer { for (int c = 0; c < data->num_chunks(); c++) { const std::shared_ptr arr = data->chunk(c); - arrow::StringArray* string_arr = static_cast(arr.get()); - PyObject** out_values = reinterpret_cast(PyArray_DATA(out_)) + chunk_offset; + auto string_arr = static_cast(arr.get()); + auto out_values = reinterpret_cast(PyArray_DATA(out_)) + chunk_offset; const uint8_t* data_ptr; int32_t length; From da6534553830d296e2ab341543c8f1f0106fb860 Mon Sep 17 00:00:00 2001 From: Bryan Cutler Date: 
Wed, 30 Nov 2016 12:02:40 -0800 Subject: [PATCH 7/9] fixed test case for schema checking --- python/pyarrow/tests/test_table.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/python/pyarrow/tests/test_table.py b/python/pyarrow/tests/test_table.py index 6b8454c083d1c..dc4f37a830e5f 100644 --- a/python/pyarrow/tests/test_table.py +++ b/python/pyarrow/tests/test_table.py @@ -19,6 +19,7 @@ from pandas.util.testing import assert_frame_equal import pandas as pd +import pytest import pyarrow as pa @@ -73,16 +74,15 @@ def test_recordbatchlist_to_pandas(): assert_frame_equal(data, result) -def test_recordbatchlist_errors(): +def test_recordbatchlist_schema_equals(): data1 = pd.DataFrame({'c1': np.array([1], dtype='uint32')}) data2 = pd.DataFrame({'c1': np.array([4.0, 5.0], dtype='float64')}) batch1 = pa.RecordBatch.from_pandas(data1) batch2 = pa.RecordBatch.from_pandas(data2) - # TODO: Enable when subclass of unittest.testcase - #with self.assertRaises(pyarrow.ArrowException): - # self.assertRaises(pa.dataframe_from_batches([batch1, batch2])) + with pytest.raises(pa.ArrowException): + pa.dataframe_from_batches([batch1, batch2]) def test_table_basics(): From edf056e0bf43ab88d20d5867cbb820d778379cf4 Mon Sep 17 00:00:00 2001 From: Bryan Cutler Date: Thu, 1 Dec 2016 10:00:44 -0800 Subject: [PATCH 8/9] simplified with pyarrow.schema.Schema.equals --- python/pyarrow/table.pyx | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/python/pyarrow/table.pyx b/python/pyarrow/table.pyx index 187ef27344080..4dfa3ad4fc439 100644 --- a/python/pyarrow/table.pyx +++ b/python/pyarrow/table.pyx @@ -432,17 +432,15 @@ def dataframe_from_batches(batches): shared_ptr[CTable] c_table Array arr Schema schema - Schema schema_comp import pandas as pd schema = batches[0].schema # check schemas are equal - for i in range(1, len(batches)): - schema_comp = batches[i].schema - if not schema.sp_schema.get().Equals(schema_comp.sp_schema): - raise ArrowException("Error converting list of RecordBatches to DataFrame, not all schemas are equal") + if any((not schema.equals(other.schema) for other in batches[1:])): + raise ArrowException("Error converting list of RecordBatches to " + "DataFrame, not all schemas are equal") cdef int K = batches[0].num_columns From b6c998668b60c607525d1725ace9ea317a85f3a1 Mon Sep 17 00:00:00 2001 From: Bryan Cutler Date: Thu, 1 Dec 2016 10:06:16 -0800 Subject: [PATCH 9/9] fixed formatting --- python/pyarrow/table.pyx | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/python/pyarrow/table.pyx b/python/pyarrow/table.pyx index 4dfa3ad4fc439..45cf7becceefa 100644 --- a/python/pyarrow/table.pyx +++ b/python/pyarrow/table.pyx @@ -417,13 +417,13 @@ cdef class RecordBatch: def dataframe_from_batches(batches): """ - Convert a list of Arrow RecordBatches to one pandas.DataFrame + Convert a list of Arrow RecordBatches to a pandas.DataFrame Parameters ---------- batches: list of RecordBatch - RecordBatch list with identical schema to be converted + RecordBatch list to be converted, schemas must be equal """ cdef: @@ -450,7 +450,8 @@ def dataframe_from_batches(batches): for batch in batches: arr = batch[i] c_array_chunks.push_back(arr.sp_array) - c_columns[i].reset(new CColumn(schema.sp_schema.get().field(i), c_array_chunks)) + c_columns[i].reset(new CColumn(schema.sp_schema.get().field(i), + c_array_chunks)) c_array_chunks.clear() # create a Table from columns and convert to DataFrame
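Taken together, the series replaces the interim RecordBatchList helper with a single free function, pa.dataframe_from_batches, which validates that all batch schemas are equal and then builds one chunked Table before converting. A usage sketch mirroring the tests above (assumes a pyarrow build that includes these patches):

import numpy as np
import pandas as pd
import pyarrow as pa

data1 = pd.DataFrame({'c1': np.array([1, 2], dtype='uint32')})
data2 = pd.DataFrame({'c1': np.array([3, 4], dtype='uint32')})
batches = [pa.RecordBatch.from_pandas(d) for d in (data1, data2)]

# Schemas of all batches must be equal, otherwise ArrowException is raised.
df = pa.dataframe_from_batches(batches)
# Result matches pd.concat([data1, data2], ignore_index=True)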