
[Arrow] Adding ArrowTensorTypeV2 to support tensors larger than 2Gb #47832

Merged: 34 commits into ray-project:master on Oct 1, 2024

Conversation

@alexeykudinkin (Contributor) commented Sep 26, 2024

Why are these changes needed?

Currently, when using the tensor type in Ray Data, if a single tensor in a block grows above 2Gb (due to the use of signed int32 offsets), it results in the following failure:

pyarrow.lib.ArrowInvalid: offset overflow while concatenating arrays

Consequently, this change adds support for tensors larger than 2Gb, while maintaining compatibility with existing datasets already using tensors.

This is done by forking ArrowTensorType in two:

  • ArrowTensorType (v1) remains intact
  • ArrowTensorTypeV2 is rebased on Arrow's LargeListType and now uses int64 offsets (see the sketch below)
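
For illustration, here is a minimal sketch (not from this PR) of the underlying PyArrow behavior: int32 list offsets overflow once a single array's flattened values exceed 2**31 - 1 elements, while int64 large_list offsets do not. The sizes below are arbitrary, the exact error text may vary by PyArrow version, and running this needs a few GiB of free RAM.

```
import numpy as np
import pyarrow as pa

# One ListArray chunk holding 1024 tensors of 2**20 uint8 values each,
# i.e. 2**30 flattened values (~1Gb) behind int32 offsets.
chunk = pa.array([np.ones(2**20, dtype=np.uint8)] * 1024)
chunked = pa.chunked_array([chunk, chunk, chunk])  # ~3Gb of values in total

try:
    # Combining re-offsets all values into a single int32 ListArray,
    # and 3 * 2**30 exceeds 2**31 - 1, so the offsets overflow.
    chunked.combine_chunks()
except pa.ArrowInvalid as e:
    print(e)  # e.g. "offset overflow while concatenating arrays"

# The same data behind int64 offsets combines fine.
chunked.cast(pa.large_list(pa.uint8())).combine_chunks()
```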

Related issue number

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests; see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

@bveeramani (Member) commented Sep 27, 2024

@alexeykudinkin is my understanding correct that v2 is off by default? Are we planning on removing v1 at some point? If not, how do we avoid maintaining both v1 and v2 indefinitely?

Also, what's the motivation for introducing a new version rather than changing the implementation? To avoid making a breaking change?

@alexeykudinkin (Contributor, Author):

@alexeykudinkin is my understanding correct that v2 is off by default? Are we planning on removing v1 at some point? If not, how do we avoid maintaining both v1 and v2 indefinitely?

Correct.

V1 is going to stay around for a while for BWC (backward compatibility) reasons, hence we have to offer a migration path:

  1. Users switch to ArrowTensorTypeV2 by setting RAY_DATA_USE_ARROW_TENSOR_V2 (or updating DataContext.use_arrow_tensor_v2), as sketched below
  2. Reading the V1 path will still be supported, but all new datasets will be written in the V2 format
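
A minimal sketch of the two opt-in paths named in step 1 (the accepted string values for the environment variable are an assumption here, and it must be set before Ray Data is initialized):

```
import os

# Option 1: opt in via the environment variable ("1" is an assumed value).
os.environ["RAY_DATA_USE_ARROW_TENSOR_V2"] = "1"

# Option 2: opt in programmatically on the current DataContext.
from ray.data import DataContext

DataContext.get_current().use_arrow_tensor_v2 = True
```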

Also, what's the motivation for introducing a new version rather than changing the implementation? To avoid making a breaking change?

Correct.

@bveeramani (Member) left a comment:

LGTM.

@alexeykudinkin do we need to make an analogous change to ArrowVariableShapedTensorType?

Reading the V1 path will still be supported, but all new datasets will be written in the V2 format

Not sure if I missed it, but will new datasets be written with v2 after this PR?

@@ -1204,6 +1205,92 @@ def test_partitioning_in_dataset_kwargs_raises_error(ray_start_regular_shared):
)


def test_tensors_in_tables_parquet(
Member:

Any reason not to parameterize this test?

Contributor (Author):

No need -- this test verifies BWC

Comment on lines +255 to +266
@classmethod
def __arrow_ext_deserialize__(cls, storage_type, serialized):
    shape = tuple(json.loads(serialized))
    return cls(shape, storage_type.value_type)
Member:

Dumb question -- why can't we put this in _BaseArrowTensorType?

Comment on lines +782 to +801
@pytest.mark.parametrize("tensor_format", ["v1", "v2"])
def test_large_arrow_tensor_array(restore_data_context, tensor_format):
    DataContext.get_current().use_arrow_tensor_v2 = tensor_format == "v2"

    test_arr = np.ones((1000, 550), dtype=np.uint8)

    if tensor_format == "v1":
        with pytest.raises(ArrowConversionError) as exc_info:
            ta = ArrowTensorArray.from_numpy([test_arr] * 4000)

        assert (
            repr(exc_info.value.__cause__)
            == "ArrowInvalid('Negative offsets in list array')"
        )
    else:
        ta = ArrowTensorArray.from_numpy([test_arr] * 4000)
        assert len(ta) == 4000
        for arr in ta:
            assert np.asarray(arr).shape == (1000, 550)

Member:

Nit: IMO this would be more readable as two separate tests, since separate tests would avoid the conditionals and there's only one shared line between the two parameterizations.

Contributor (Author):

I definitely see your point; however, splitting it into 2 tests obscures its intention, which is to show that the same input fails with V1 and works with V2.



@pytest.mark.parametrize("tensor_format", ["v1", "v2"])
Contributor:

Nit: define an autouse fixture to avoid updating each test case, e.g.:

@pytest.fixture(autouse=True, scope="module", params=["v1", "v2"])
def tensor_format(request):
    # Hypothetical completion: mirrors the inline DataContext toggle in the tests above.
    DataContext.get_current().use_arrow_tensor_v2 = request.param == "v2"
    yield request.param

Contributor (Author):

Oh, interesting, didn't know about this trick.

Would prefer not to unwind all 40 tests now just to remove this parametrize, though.

python/ray/air/_internal/tensorflow_utils.py (review comment outdated; resolved)
@@ -286,6 +288,7 @@ class DataContext:
min_parallelism: int = DEFAULT_MIN_PARALLELISM
read_op_min_num_blocks: int = DEFAULT_READ_OP_MIN_NUM_BLOCKS
enable_tensor_extension_casting: bool = DEFAULT_ENABLE_TENSOR_EXTENSION_CASTING
use_arrow_tensor_v2: bool = DEFAULT_USE_ARROW_TENSOR_V2
Contributor:

Can you add a comment in the docstring explaining the differences between v1 and v2, as well as the backward-compatibility story?

Contributor (Author):

What docstring are you referring to (attached to what)?

Contributor (Author):

Added to the DEFAULT_USE_ARROW_TENSOR_V2 definition

Contributor:

There is a long docstring below class DataContext covering all fields; I was referring to it. But I think we should break it down into per-field definitions.

python/ray/data/dataset.py (review comment outdated; resolved)
@raulchen (Contributor) left a comment:

Can you also add a comment explaining what exactly is not compatible? I assume the issue is that data written in v1 cannot be read using v2.

Reading the V1 path will still be supported, but all new datasets will be written in the V2 format

This sounds like a good idea. We can default to V2 and print a warning prompting users to switch when reading V1 data. This can be done in a follow-up PR, though.

@alexeykudinkin (Contributor, Author):

This sounds like a good idea. We can default to V2 and print a warning prompting users to switch when reading V1 data. This can be done in a follow-up PR, though.

I'd rather decouple switching the default from supporting it: keeping V2 opt-in in this PR gets us more mileage to validate V2 before flipping it on by default.

@alexeykudinkin (Contributor, Author):

Can you also add a comment explaining what exactly is not compatible? I assume the issue is that data written in v1 cannot be read using v2.

The wire format is not compatible:

  • V1 uses int32 offsets, while V2 uses int64
  • V1 uses PA's ListType, while V2 uses LargeListType (see the sketch below)
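
A short sketch of that storage difference; the uint8 value type is illustrative, not the PR's actual tensor storage schema:

```
import pyarrow as pa

# V1-style storage: ListType offsets are int32, capping one array's
# flattened values at 2**31 - 1 elements (~2Gb for uint8).
v1_storage = pa.list_(pa.uint8())

# V2-style storage: LargeListType offsets are int64, removing that cap.
v2_storage = pa.large_list(pa.uint8())

print(v1_storage)  # list<item: uint8>
print(v2_storage)  # large_list<item: uint8>
```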

Peter Wang and others added 6 commits September 30, 2024 15:39

…Array` type def;

Forked off `ArrowTensorTypeV2` from `ArrowTensorType`, now using int64 offsets

…rrowTensorType`)

Signed-off-by: Peter Wang <peter.wang9812@gmail.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
@raulchen merged commit 1bab09b into ray-project:master on Oct 1, 2024. 5 checks passed.
ujjawal-khare pushed several commits to ujjawal-khare-27/ray that referenced this pull request Oct 15, 2024, each repeating the PR description above (…ray-project#47832):

Signed-off-by: Peter Wang <peter.wang9812@gmail.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Co-authored-by: Peter Wang <peter.wang9812@gmail.com>
Signed-off-by: ujjawal-khare <ujjawal.khare@dream11.com>
Labels
go add ONLY when ready to merge, run all tests
Projects
None yet
4 participants