-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ARROW-369: [Python] Convert multiple record batches at once to Pandas #216
ARROW-369: [Python] Convert multiple record batches at once to Pandas #216
Conversation
ping @wesm. I still need to check that the RecordBatch schemas are equal and add some negative unit tests, but I wanted to check with you if the approach so far looks good. Could you please take a look when you get a chance? Thanks! |
c_col.reset(new CColumn(schema.sp_schema.get().field(i), c_array_vec[i])) | ||
# TODO - why need PyOject ref? arr is placeholder | ||
check_status(pyarrow.ConvertColumnToPandas( | ||
c_col, <PyObject*> arr, &np_arr)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the PyObject *
param doesn't seem to do anything, is it needed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you can remove the cast.
If you instead construct a Table from a list of RecordBatches you can avoid the code duplication. See Table.to_pandas
|
||
if (arr->null_count() > 0) { | ||
if (data->null_count() > 0) { | ||
RETURN_NOT_OK(AllocateOutput(NPY_FLOAT64)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just to make sure, an Integer column only needs to be casted to a double if there are Null values right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
right
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems fine so far, minor comments
''' | ||
|
||
@staticmethod | ||
def to_pandas(batches): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a Java-like construct -- just make this a plain old function "dataframe_from_batches" or something like that
c_col.reset(new CColumn(schema.sp_schema.get().field(i), c_array_vec[i])) | ||
# TODO - why need PyOject ref? arr is placeholder | ||
check_status(pyarrow.ConvertColumnToPandas( | ||
c_col, <PyObject*> arr, &np_arr)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you can remove the cast.
If you instead construct a Table from a list of RecordBatches you can avoid the code duplication. See Table.to_pandas
for batch in batches: | ||
for i in range(K): | ||
arr = batch[i] | ||
c_array_vec[i].push_back(arr.sp_array) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you instead loop over column index, then loop over batches, you can avoid this c_array_vec
business and construct the Column directly.
|
||
if (arr->null_count() > 0) { | ||
if (data->null_count() > 0) { | ||
RETURN_NOT_OK(AllocateOutput(NPY_FLOAT64)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
right
arrow::PrimitiveArray* prim_arr = static_cast<arrow::PrimitiveArray*>( | ||
arr.get()); | ||
const T* in_values = reinterpret_cast<const T*>(prim_arr->data()->data()); | ||
T* out_values = reinterpret_cast<T*>(PyArray_DATA(out_)) + chunk_offset; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you're welcome to use auto
with these casts for better DRY
|
||
# TODO: Enable when subclass of unittest.testcase | ||
#with self.assertRaises(pyarrow.ArrowException): | ||
# self.assertRaises(pa.dataframe_from_batches([batch1, batch2])) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can these tests be changed to a unittest.testcase subclass? I could open another JIRA to do this if it's ok
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't have a strong opinion -- I think we've been trying to use pytest (i.e. fixtures over TestCase classes), @xhochy do you have an opinion?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pytest is fine too. I saw unittest being used somewhere, so I wasn't sure if there was a preference.
Thanks for the review @wesm , I fixed the issues you pointed out and added a check that the schema in each batch is equal. |
# check schemas are equal | ||
for i in range(1, len(batches)): | ||
schema_comp = batches[i].schema | ||
if not schema.sp_schema.get().Equals(schema_comp.sp_schema): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should the method CSchema.Equals
also be in the pyarrow class?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
definitely, pyarrow.schema.Schema.equals
@wesm, hopefully this is good to go now. Please take another look when you can, thanks! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1, LGTM
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
# check schemas are equal | ||
if any((not schema.equals(other.schema) for other in batches[1:])): | ||
raise ArrowException("Error converting list of RecordBatches to " | ||
"DataFrame, not all schemas are equal") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Later we'll want to display the mismatched schemas in the error message but this is ok for now
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I should have added that.. I'll make a note to do that later.
I was thinking a better API for this would be:
what do you think? |
yeah, it would be more flexible that way. I can go ahead and make a PR for it. |
See https://github.com/apache/parquet-mr/blob/df9d8e415436292ae33e1ca0b8da256640de9710/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java#L86, this number should be 1 for Parquet 1.0 files, I believe. Author: Wes McKinney <wes.mckinney@twosigma.com> Closes apache#216 from wesm/PARQUET-828 and squashes the following commits: ab6773c [Wes McKinney] Do not implicitly cast ParquetVersion enum to int. Set 1.0 to 1, 2.0 to 2
See https://github.com/apache/parquet-mr/blob/df9d8e415436292ae33e1ca0b8da256640de9710/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java#L86, this number should be 1 for Parquet 1.0 files, I believe. Author: Wes McKinney <wes.mckinney@twosigma.com> Closes apache#216 from wesm/PARQUET-828 and squashes the following commits: ab6773c [Wes McKinney] Do not implicitly cast ParquetVersion enum to int. Set 1.0 to 1, 2.0 to 2 Change-Id: I7cb74e2c2ea4d567ff0d04cd4510efda9115a53f
See https://github.com/apache/parquet-mr/blob/df9d8e415436292ae33e1ca0b8da256640de9710/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java#L86, this number should be 1 for Parquet 1.0 files, I believe. Author: Wes McKinney <wes.mckinney@twosigma.com> Closes apache#216 from wesm/PARQUET-828 and squashes the following commits: ab6773c [Wes McKinney] Do not implicitly cast ParquetVersion enum to int. Set 1.0 to 1, 2.0 to 2 Change-Id: I7cb74e2c2ea4d567ff0d04cd4510efda9115a53f
See https://github.com/apache/parquet-mr/blob/df9d8e415436292ae33e1ca0b8da256640de9710/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java#L86, this number should be 1 for Parquet 1.0 files, I believe. Author: Wes McKinney <wes.mckinney@twosigma.com> Closes apache#216 from wesm/PARQUET-828 and squashes the following commits: ab6773c [Wes McKinney] Do not implicitly cast ParquetVersion enum to int. Set 1.0 to 1, 2.0 to 2 Change-Id: I7cb74e2c2ea4d567ff0d04cd4510efda9115a53f
See https://github.com/apache/parquet-mr/blob/df9d8e415436292ae33e1ca0b8da256640de9710/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java#L86, this number should be 1 for Parquet 1.0 files, I believe. Author: Wes McKinney <wes.mckinney@twosigma.com> Closes apache#216 from wesm/PARQUET-828 and squashes the following commits: ab6773c [Wes McKinney] Do not implicitly cast ParquetVersion enum to int. Set 1.0 to 1, 2.0 to 2 Change-Id: I7cb74e2c2ea4d567ff0d04cd4510efda9115a53f
Modified Pandas adapter to handle columns with multiple chunks with
ConvertColumnToPandas
. This modifies the pyarrow public API by adding a classRecordBatchList
and static methodtoPandas
which takes a list of Arrow RecordBatches and outputs a Pandas DataFrame.Adds unit test in test_table.py to do the conversion for each column with typed specialization.