
[Data] Add support for objects to Arrow blocks #45272

Conversation

terraflops1048576
Contributor

Why are these changes needed?

Currently, Ray does not support blocks/batches with objects and multi-dimensional arrays in different columns. This causes Ray Data to throw exceptions when these are provided because:

  1. Since there's an arbitrary object in the batch, the Arrow block format fails with an ArrowNotImplementedError (dtype 17), which triggers the fallback to return pd.DataFrame(dict(batch)) in BlockAccessor.batch_to_block.
  2. However, this particular DataFrame constructor does not support columns backed by multi-dimensional numpy.ndarray values, so it throws the exception listed in the linked issue (a minimal repro is sketched below).
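For illustration, a minimal sketch of the kind of batch that hits this path (the UDF and the column names here are made up; the actual report is in the linked issue):

```python
import numpy as np
import ray


class Custom:
    """Stand-in for an arbitrary Python object that Arrow can't convert natively."""


def add_columns(batch):
    n = len(batch["id"])
    # One column of arbitrary Python objects...
    batch["obj"] = np.array([Custom() for _ in range(n)], dtype=object)
    # ...next to a multi-dimensional array column in the same batch.
    batch["image"] = np.zeros((n, 32, 32, 3))
    return batch


ds = ray.data.range(8).map_batches(add_columns)
# Without this change, materializing fails: the Arrow conversion rejects the
# object column, and the pandas fallback rejects the multi-dimensional column.
ds.materialize()
```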

This change enables Python object storage in the Arrow blocks by defining an Arrow extension type that simply represents the Python objects as a variable-sized large binary. I suppose the alleged performance benefits listed in the comments are an extra benefit.

I'm not sure that this is the correct approach or that I've properly patched all of the places, so some help would be appreciated!
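For readers unfamiliar with PyArrow extension types, here is a minimal sketch of the idea (not the exact implementation in this PR; the class and helper names are illustrative): pickle each object and store the bytes in a large_binary-backed extension array.

```python
import pickle

import pyarrow as pa


class PythonObjectType(pa.ExtensionType):
    """Extension type whose storage is large_binary; each value is a pickled object."""

    def __init__(self):
        super().__init__(pa.large_binary(), "example.python_object")

    def __arrow_ext_serialize__(self) -> bytes:
        return b""  # the type has no parameters to serialize

    @classmethod
    def __arrow_ext_deserialize__(cls, storage_type, serialized):
        return cls()


def objects_to_array(objs) -> pa.ExtensionArray:
    """Pickle each object and wrap the binary storage in the extension type."""
    storage = pa.array([pickle.dumps(o) for o in objs], type=pa.large_binary())
    return pa.ExtensionArray.from_storage(PythonObjectType(), storage)


def array_to_objects(arr: pa.ExtensionArray) -> list:
    """Unpickle the stored bytes back into Python objects."""
    return [pickle.loads(buf.as_py()) for buf in arr.storage]
```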

Related issue number

Resolves #45235

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

@terraflops1048576 terraflops1048576 force-pushed the terraflops/add_objects_to_arrow_blocks branch from abe7c58 to 516ff57 on May 12, 2024 05:46
…fix type annnotation

@terraflops1048576 terraflops1048576 force-pushed the terraflops/add_objects_to_arrow_blocks branch from 516ff57 to 3b38131 on May 12, 2024 05:59
@terraflops1048576 terraflops1048576 changed the title from "Add support for objects to Arrow blocks" to "[Data] Add support for objects to Arrow blocks" on May 12, 2024
@terraflops1048576 terraflops1048576 force-pushed the terraflops/add_objects_to_arrow_blocks branch 2 times, most recently from 2d82297 to 978b30e on May 12, 2024 19:23
@terraflops1048576 terraflops1048576 force-pushed the terraflops/add_objects_to_arrow_blocks branch from 978b30e to 014949f on May 12, 2024 20:26
@raulchen raulchen self-assigned this May 13, 2024
@anyscalesam
Contributor

@terraflops1048576 this could be a big contribution, but we want to think a little deeper about this with you to properly co-develop it; can you reach out to me on Ray Slack so we can set up some time to discuss further?

My handle is "Sam (Ray Team)"

We should get a formal REP in ray-enhancements for this as well: https://github.com/ray-project/enhancements

cc @bveeramani

@anyscalesam anyscalesam added the triage (Needs triage: priority, bug/not-bug, and owning component) and data (Ray Data-related issues) labels May 29, 2024
@anyscalesam anyscalesam reopened this May 30, 2024
@anyscalesam anyscalesam removed the triage label May 30, 2024
@raulchen
Contributor

Sorry, I had some disruptions last week; doing the 2nd pass of review now.
Also, could you resolve the conflicts when you have a chance? Thanks.

Contributor

@raulchen raulchen left a comment

Looks good to me at a high level.
Apologies for my late review. Also, none of the active Ray Data maintainers are familiar with the pyarrow extension API, so it'd be appreciated if you could help explain the questions and add more comments.
Thanks.


def object_extension_type_allowed() -> bool:
    return (
        PYARROW_VERSION is None
Contributor

In which case will this be None? And why should this return True when it is None?

Contributor Author

Actually, I'm not sure. I just borrowed these version checks from the existing ArrowTensorArray implementation, which also passes if the PYARROW_VERSION is None. It turns out the way I've done it is buggy, so I'll revise it. I can also defensively just make this check fail if PYARROW_VERSION is None.

The more general question is whether we would be open to bumping the minimum PyArrow version to 9, which is almost 2 years old at this point. Currently, it is at 6.0.1, which appears to be almost 3 years old.
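For reference, a sketch of the defensive variant discussed here (the constant and its value are illustrative, not Ray's actual minimum):

```python
from typing import Optional

from packaging.version import Version

# Assumed minimum for the object extension type; the real requirement may differ.
MIN_PYARROW_VERSION_FOR_OBJECTS = Version("9.0.0")


def object_extension_type_allowed(pyarrow_version: Optional[str]) -> bool:
    # Fail closed: if the installed pyarrow version can't be determined,
    # don't enable the object extension type.
    if pyarrow_version is None:
        return False
    return Version(pyarrow_version) >= MIN_PYARROW_VERSION_FOR_OBJECTS
```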



@PublicAPI(stability="alpha")
class ArrowPythonObjectType(pa.ExtensionType):
Contributor

Could you document these classes and methods in this file?
Not many people are familiar with the pyarrow extension API, so adding some comments would help improve readability.

Contributor Author

Will do

except (
    pyarrow.ArrowInvalid,
    pyarrow.ArrowNotImplementedError,
    pyarrow.ArrowTypeError,
Contributor

  • Is it possible to detect whether the input data contains unsupported types, instead of using try-except?
  • If not, can the above exception types be caused by reasons other than unsupported types?

Contributor Author

I believe this is the way that the fallback to Pandas is calculated; in any case, this code is outdated after the merge conflict because they have now all been coalesced into ArrowConversionError.

It is possible to check that all of the objects inside are of some Arrow type, but I believe this is quite cumbersome -- you have to manually check whether it is convertible to any PyArrow type, and you have to check all of the elements. It's easier to try conversion and see if it works.

In any case, the conversion into the ArrowPythonObjectArray should never fail if everything is pickleable, and I assume this is better than falling back to Pandas.
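A sketch of that "try the native conversion first, then fall back" flow (the function is a placeholder, not the actual Ray internals; objects_to_array refers to the object-array helper sketched in the PR description above):

```python
import pyarrow as pa


def column_to_arrow(values):
    try:
        # Let pyarrow infer a native type (ints, floats, strings, ...).
        return pa.array(values)
    except (pa.ArrowInvalid, pa.ArrowNotImplementedError, pa.ArrowTypeError):
        # Anything pickleable can still be stored via the object extension type,
        # which should be preferable to falling back to a pandas block.
        return objects_to_array(values)
```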

if log_once(f"arrow_object_pickle_{col_name}"):
    logger.warning(
        f"Failed to interpret {col_name} as "
        "multi-dimensional arrays. It will be pickled."
Contributor

in which case would this happen?

Contributor Author

The current code looks at any column which has a numpy dtype of object and checks if it can be converted into a list of tensors. If it can't be, then we fall back to ArrowPythonObjectArray. This happens when the numpy array has a dtype of object because you have custom objects in there.
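Roughly, the decision described above looks like this (a sketch; to_tensor_array and to_object_array are stand-ins for Ray's actual converters):

```python
import numpy as np
import pyarrow as pa


def convert_column(values: np.ndarray, to_tensor_array, to_object_array):
    if values.dtype != object:
        # Plain numeric/string columns go through the normal conversion path.
        return pa.array(values)
    try:
        # Object-dtype columns whose elements are all ndarrays become tensor columns.
        return to_tensor_array(values)
    except Exception:
        # Custom objects (or anything else that isn't tensor-like) get pickled.
        return to_object_array(list(values))
```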

ds = ds.materialize()
block = ray.get(ds.get_internal_block_refs()[0])
# TODO: Once we support converting dict to a supported arrow type,
# the block type should be Arrow.
Contributor

what does this comment mean? isn't the block already pyarrow?

Contributor Author

Ah yes, this used to be a test that checked the fallback behavior to Pandas blocks, but these changes are supposed to remove that fallback behavior for the case given. I cut the test from where it originally was and pasted it here, but I must've forgotten to delete the comment.

python/ray/data/tests/test_arrow_block.py (resolved review thread)
f"but schema is not ArrayTensorType: {tensor_array_types[0]}"
)
schema_field_overrides[col_name] = new_type
arrow_tensor_types = (ArrowVariableShapedTensorType, ArrowTensorType)
Contributor

can you explain what this change is doing?

Contributor Author

Basically this is a refactor of the unify_schemas function, which serves a similar purpose to the PyArrow unify_schemas. Its job is to take the schemas of the tables to be concatenated and produce a "unified" schema which contains the correct types for all of the columns of the constituent tables.

The previous code checked only the first table being concatenated, so it would behave weirdly if the first table didn't have a tensor type but a later one did: it would create a schema where the tensor types were simply erased (though the error would be caught elsewhere).

However, for the object array, we need to support the case where the first table has a column of type, say, int64, and the second table has a column of type SomeObject. The old code would simply set the type to int64, which would cause an error. The refactored code ensures that the int64s get pickled as well (yes, this is not very efficient, but it is the only way to support this mix). So the new code scans all of the table schemas, finds which columns contain tensors or objects in any table, and handles them accordingly.

This is basically type promotion for the ArrowTensorArray and the ArrowVariableShapedTensorArray and likewise for the ArrowPythonObjectArray and other types. Should we set the promotion options differently too?
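A rough sketch of the "scan every schema, then override the promoted columns" shape of this refactor (needs_promotion and promote are placeholders for the tensor/object promotion rules described above, not the actual implementation):

```python
import pyarrow as pa


def unify_schemas_sketch(schemas, needs_promotion, promote):
    # Collect the per-column types across *all* schemas, not just the first one.
    overrides = {}
    for name in {n for s in schemas for n in s.names}:
        col_types = [s.field(name).type for s in schemas if name in s.names]
        if needs_promotion(col_types):
            overrides[name] = promote(col_types)
    # Rewrite each schema with the promoted types, then let pyarrow unify the rest.
    patched = []
    for s in schemas:
        for name, new_type in overrides.items():
            if name in s.names:
                s = s.set(s.get_field_index(name), pa.field(name, new_type))
        patched.append(s)
    return pa.unify_schemas(patched)
```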

Contributor

Thanks for the detailed explanation! Can you also update the docstring of this method to reflect this change?
also do you mind explaining again what you mean by "set the promotion options differently"?

…cts_to_arrow_blocks

@@ -181,6 +185,32 @@ def test_arrow_concat_tensor_extension_uniform_but_different():
# fails for this case.


@pytest.mark.skipif(
Contributor Author

I'm not sure if these actually get run in CI if I put the skipif and require PyArrow version >= 9.0.0. Can someone check?
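The marker in question is along these lines (illustrative, not the exact test); whether the test body actually runs in CI then depends only on the pyarrow version installed in the CI environment:

```python
import pyarrow as pa
import pytest
from packaging.version import Version


@pytest.mark.skipif(
    Version(pa.__version__) < Version("9.0.0"),
    reason="object extension type requires pyarrow >= 9.0.0",
)
def test_arrow_python_object_roundtrip():
    ...
```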

@terraflops1048576 terraflops1048576 force-pushed the terraflops/add_objects_to_arrow_blocks branch from f41298e to 7854e67 on June 26, 2024 04:55
@terraflops1048576
Contributor Author

terraflops1048576 commented Jun 26, 2024

After trying to fix the test failures that relate to the new exception info, I realized that this is actually rather problematic -- it fixes way too many errors that would otherwise be reported to the user, like the overflow error resulting from [1, 2**100], trying to concatenate np.array([2**100]) and np.array([1]), etc. After all, it is perfectly legal to pickle all of these things and store them in a column, but we obviously don't want to do this for performance reasons. We would rather error and tell the user to adjust the problematic values.

I'm not sure what we should do with the np.array case, as they are technically objects, but maybe we shouldn't fall back if anything related to ArrowTensorArray fails.
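For context, this is the kind of conversion error being referred to (the exact exception class varies across pyarrow versions, hence the broad except here):

```python
import pyarrow as pa

try:
    pa.array([1, 2**100])  # 2**100 does not fit in int64
except (OverflowError, pa.ArrowException) as e:
    # Pickling [1, 2**100] as objects would "work", but it silently hides a
    # problem the user probably wants to know about.
    print(type(e).__name__, e)
```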

@terraflops1048576 terraflops1048576 force-pushed the terraflops/add_objects_to_arrow_blocks branch from 9c131f1 to 6564543 on June 26, 2024 08:22
@terraflops1048576 terraflops1048576 force-pushed the terraflops/add_objects_to_arrow_blocks branch from 2edc1bb to 924f653 on June 26, 2024 16:36
Contributor

@raulchen raulchen left a comment

Thanks for the new comments and explanation.

it fixes way too many errors that would otherwise be reported to the user, like the overflow error resulting from [1, 2**100], trying to concatenate np.array([2**100]) and np.array([1]), etc. After all, it is perfectly legal to pickle all of these things and store them in a column, but we obviously don't want to do this for performance reasons.

To confirm my understanding: in the UDF return, if any value overflows, all values will fall back to objects; if nothing overflows, the type will still be integers. Right?

This seems fine, because we don't know whether the user intends to return overflowed integers either. Maybe we can introduce a flag to allow disabling the fallback.

I'm not sure what we should do with the np.array case, as they are technically objects, but maybe we shouldn't fall back if anything related to ArrowTensorArray fails.

Are you concerned about np.arrays with object dtype? I think it's okay to not handle that. Users can just return a python list instead.

def is_object_fixable_error(e: ArrowConversionError) -> bool:
    """Returns whether this error can be fixed by using an ArrowPythonObjectArray"""
    return any(
        err in "".join(traceback.format_exception(type(e), e, e.__traceback__))
Contributor

Nit: it's not efficient to format the entire error as a string.
Can we iterate over the causes and check their types?
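A sketch of what that suggestion could look like (the set of "object-fixable" exception types here is illustrative; the PR's actual check matches specific error messages):

```python
import pyarrow as pa

_OBJECT_FIXABLE_TYPES = (
    pa.ArrowInvalid,
    pa.ArrowNotImplementedError,
    pa.ArrowTypeError,
)


def is_object_fixable_error(exc: BaseException) -> bool:
    """Walk the __cause__/__context__ chain instead of string-matching a traceback."""
    seen = set()
    cur = exc
    while cur is not None and id(cur) not in seen:
        seen.add(id(cur))
        if isinstance(cur, _OBJECT_FIXABLE_TYPES):
            return True
        cur = cur.__cause__ or cur.__context__
    return False
```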


@raulchen
Contributor

Some tests are failing. Can you take a look? https://buildkite.com/ray-project/microcheck/builds/2214

@terraflops1048576
Contributor Author

I've tried to separate out overflows and some other errors, like mismatched float/int types, and make them continue to report errors as in the tests I've fixed, instead of saving the values as objects in the PyArrow table, primarily to avoid unintended consequences.

You are correct that, if we enabled catching all ArrowConversionErrors and turning things into objects, then as long as a block contained only non-overflowed integers it would still be stored with an integer type. Currently, np.ndarrays with object dtype (such as when a user tries to put in overflowed integers) are just pickled as objects, because it's really hard to detect whether that's the result of overflowed integers or of plain objects being in there.

Contributor

@raulchen raulchen left a comment

Thanks for the updates. LGTM

python/ray/data/_internal/arrow_block.py (outdated; resolved review thread)
@raulchen raulchen enabled auto-merge (squash) July 11, 2024 02:34
@github-actions github-actions bot added the go (add ONLY when ready to merge, run all tests) label Jul 11, 2024
@raulchen
Contributor

@terraflops1048576 the CI failures are unrelated and already fixed in master. Can you merge the latest master again?

@github-actions github-actions bot disabled auto-merge July 12, 2024 15:29
@terraflops1048576
Contributor Author

terraflops1048576 commented Jul 13, 2024

I'm not sure why the tests that I fixed here weren't broken before, but I included a fix.

Interestingly enough, the documentation for ray.data.Dataset.union says

The datasets must have the same schema as this dataset, otherwise the behavior is undefined.

However, this isn't actually the case anymore, and tests like python/ray/data/tests/test_consumption.py::test_union actually depend on different behavior. Nevertheless, I fixed unify_schemas to reflect this behavior.

Also, I realized today while doing this fix that PyArrow tables can actually have duplicate column names -- this is an edge case that I don't really see treated anywhere (in particular, converting to pandas, which does not allow this, seems broken). I added something that just throws an exception if it sees it in this particular case.
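A quick illustration of that edge case (pyarrow itself is happy to build such a table):

```python
import pyarrow as pa

t = pa.table([pa.array([1, 2]), pa.array([3, 4])], names=["a", "a"])
print(t.column_names)  # ['a', 'a'] -- duplicate names are allowed by pyarrow
```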

Successfully merging this pull request may close these issues.

[Data] Can't return array-like data from UDF if batch contains unsupported type