
[Data] Update Dataset.count() to avoid unnecessarily keeping BlockRefs in-memory #46369

Merged · 21 commits · Jul 10, 2024

Conversation

@scottjlee (Contributor) commented Jul 1, 2024

Why are these changes needed?

Currently, the implementation of Dataset.count() retrieves the entire list of BlockRefs associated with the Dataset when calculating the number of rows per block. This PR is a minor performance improvement: it iterates over the BlockRefs instead, so that each reference can be dropped as soon as its block's row count is read, and the entire list of BlockRefs never needs to be held in memory.
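For illustration, a minimal, self-contained sketch of the before/after behavior, using mock objects in place of ObjectRef[Block] and row-count metadata (not Ray's actual code):

```python
from typing import Iterator, List, Tuple


class MockBlockRef:
    """Stands in for an ObjectRef[Block] pinning a block in the object store."""


def count_with_list(refs_md: List[Tuple[MockBlockRef, int]]) -> int:
    # Before: the full list of (ref, num_rows) pairs stays referenced
    # until the entire sum completes.
    return sum(num_rows for _, num_rows in refs_md)


def count_with_iterator(refs_md: Iterator[Tuple[MockBlockRef, int]]) -> int:
    # After: each consumed pair becomes unreachable as soon as its row
    # count is added, so the refs can be dropped incrementally.
    total = 0
    for _, num_rows in refs_md:
        total += num_rows
    return total


assert count_with_list([(MockBlockRef(), n) for n in (10, 20, 30)]) == 60
assert count_with_iterator((MockBlockRef(), n) for n in (10, 20, 30)) == 60
```

With a true generator source, as in the second call, nothing retains the consumed pairs, so each mock ref becomes unreachable as soon as its count is added.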

Related issue number

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

sjl and others added 2 commits July 1, 2024 23:23
@@ -4577,8 +4601,6 @@ def get_internal_block_refs(self) -> List[ObjectRef[Block]]:
>>> ds.get_internal_block_refs()
[ObjectRef(...)]

Time complexity: O(1)
Contributor Author: Removing this because it's no longer accurate.

@scottjlee scottjlee changed the title from "[Data] Update Dataset.count() to avoid unnecessary keeping BlockRefs in-memory" to "[Data] Update Dataset.count() to avoid unnecessarily keeping BlockRefs in-memory" Jul 2, 2024
@scottjlee scottjlee marked this pull request as ready for review July 2, 2024 03:19
An iterator over references to this Dataset's blocks.
"""
iter_block_refs_md, _, _ = self._plan.execute_to_iterator()
iter_block_refs = (block_ref for block_ref, _ in iter_block_refs_md)
Contributor: Just realized that we already have block metadata here, so there's no need to submit additional tasks to count rows. We can update this method to return Iterator[RefBundle].

Member: +1
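A hedged sketch of the suggestion above: if each yielded bundle already carries its metadata row count, counting needs no extra Ray tasks and never fetches block data. SimpleBundle and its fields below are illustrative stand-ins, not Ray's actual RefBundle.

```python
from dataclasses import dataclass
from typing import Iterator, Optional


@dataclass
class SimpleBundle:
    block_ref: object          # stands in for ObjectRef[Block]
    num_rows: Optional[int]    # row count recorded in the block's metadata


def count_rows(bundles: Iterator[SimpleBundle]) -> int:
    total = 0
    for bundle in bundles:
        # The row count comes straight from metadata, so the block
        # itself is never fetched or deserialized.
        assert bundle.num_rows is not None
        total += bundle.num_rows
    return total


print(count_rows(iter([SimpleBundle("ref-a", 5), SimpleBundle("ref-b", 7)])))  # 12
```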

Comment on lines 4573 to 4574
This function can be used for zero-copy access to the data. It does not
keep the data materialized in-memory.
Member: What does zero-copy access mean here? You might copy the data when you get the block reference, right?

Contributor Author: I had thought that when we get the RefBundle / BlockRef, it does not copy the data. That's the main advantage of passing the references instead of the blocks themselves, right?

Member: Oh, yeah. If you don't call ray.get there won't be any copies, although the way I read this makes it sound like I can access the actual Block without copies.

Contributor Author: Good point, let me just remove the line. I think saying "It does not keep the data materialized in-memory." is the more important point to get across.
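For context on the thread above, a small runnable illustration of the copy semantics (assumes a local Ray installation; only the public ray.put/ray.get API is used):

```python
import ray

ray.init(ignore_reinit_error=True)

# The list is serialized once into the shared object store.
ref = ray.put(list(range(1_000_000)))

# `ref` is a lightweight handle; no block data has been copied into
# this process yet. The copy (deserialization into local memory)
# happens only here:
data = ray.get(ref)
print(len(data))  # 1000000
```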

def iter_internal_block_refs(self) -> Iterator[ObjectRef[Block]]:
"""Get an iterator over references to the underlying blocks of this Dataset.

This function can be used for zero-copy access to the data. It does not
Member:

Suggested change
- This function can be used for zero-copy access to the data. It does not
+ This function can be used for zero-copy access to the data. It doesn't

An iterator over references to this Dataset's blocks.
"""
iter_block_refs_md, _, _ = self._plan.execute_to_iterator()
iter_block_refs = (block_ref for block_ref, _ in iter_block_refs_md)
Member: +1

sjl added 6 commits July 3, 2024 04:32
Signed-off-by: sjl <sjl@anyscale.com>
Signed-off-by: sjl <sjl@anyscale.com>
Signed-off-by: Scott Lee <sjl@anyscale.com>
fix
Signed-off-by: Scott Lee <sjl@anyscale.com>
Signed-off-by: sjl <sjl@anyscale.com>
@scottjlee scottjlee requested review from raulchen and bveeramani July 3, 2024 21:27
Comment on lines 2467 to 2470
"""Count the number of records in the dataset. For `Dataset`s
which only read Parquet files (created with :meth:`~ray.data.read_parquet`),
this method reads the file metadata to efficiently count the number of records
without reading in the entire data.
Member: Nit: docstring summary should be one line. Also, replace "records" with "rows" for consistency with our terminology elsewhere.

Suggested change
- """Count the number of records in the dataset. For `Dataset`s
- which only read Parquet files (created with :meth:`~ray.data.read_parquet`),
- this method reads the file metadata to efficiently count the number of records
- without reading in the entire data.
+ """Count the number of rows in the dataset.
+ For `Dataset`s
+ which only read Parquet files (created with :meth:`~ray.data.read_parquet`),
+ this method reads the file metadata to efficiently count the number of rows
+ without reading in the entire data.

Member: We use the metadata for other APIs, too (read_images, from_pandas, etc.). I'm wondering if we should just remove the thing about Parquet? It's an implementation detail.

Contributor Author: I think the specific case around Parquet has come up in questions from OSS users sometimes, which is why I expanded on the existing O(1) note for Parquet to clarify what it means. Let me know if you think it's too confusing, and we can remove it instead.

Member: Ah, got it. If people have been asking about it, sounds good to keep.

blocks: Tuple[ObjectRef[Block], BlockMetadata],
) -> RefBundle:
# Set `owns_blocks=True` so we can destroy the blocks eagerly
# after getting count from metadata.
Contributor:

  • Let's also update _plan.execute_to_iterator to return RefBundles?
  • Eager free won't work here unless we explicitly call destroy_if_owned after count. I think it's fine to not eager free; just a note about the comment.

Contributor Author:

> Let's also update _plan.execute_to_iterator to return RefBundles?

I'm thinking I can do that in a future PR, where I also replace get_internal_block_refs usages with the new iter_internal_ref_bundles method. What do you think?

> Eager free won't work here unless we explicitly call destroy_if_owned after count.

Ah, thanks, I had misunderstood how that worked. I will remove the comment but keep the owns_blocks=True usage.

Member:

> Let's also update _plan.execute_to_iterator to return RefBundles?

@scottjlee @raulchen I'll do this. It was a planned clean-up item from the LazyBlockList removal.
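To make the eager-free point in the thread above concrete, a hedged sketch with mock objects (the destroy() method stands in for Ray's destroy_if_owned; none of this is Ray's actual implementation): owning blocks only makes freeing possible, and memory is released only if the consumer explicitly calls the destroy hook.

```python
from typing import List, Optional


class OwnedBundle:
    def __init__(self, num_rows: int, owns_blocks: bool = True) -> None:
        self.num_rows = num_rows
        self.owns_blocks = owns_blocks
        self.block: Optional[bytes] = b"x" * 1024  # stand-in block data

    def destroy(self) -> None:
        # In Ray this would free object-store memory; it runs only if
        # the consumer explicitly calls it. owns_blocks alone frees nothing.
        if self.owns_blocks:
            self.block = None


def count_and_free(bundles: List[OwnedBundle]) -> int:
    total = 0
    for b in bundles:
        total += b.num_rows
        b.destroy()  # explicit eager free after reading the metadata count
    return total


bundles = [OwnedBundle(3), OwnedBundle(4)]
assert count_and_free(bundles) == 7
assert all(b.block is None for b in bundles)
```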

num_rows = ref_bundle.num_rows()
# Executing the dataset always returns blocks with valid `num_rows`.
assert num_rows is not None
total_rows += num_rows
Contributor: Not an issue with this PR, but we should make BlockMetadata.num_rows non-nullable, to avoid repeating this check.

Member: Yeah, let's definitely do this when we separate BlockMetadata from read tasks. Currently, BlockMetadata.num_rows must be nullable because some datasources don't know how many rows are yielded by each read task.

self._synchronize_progress_bar()
return iter_ref_bundles

@ConsumptionAPI(pattern="")
@DeveloperAPI
def get_internal_block_refs(self) -> List[ObjectRef[Block]]:
Contributor: (Can do this later.) There are only a few use cases of get_internal_block_refs; we can also update them to use iter_internal_block_refs.

Scott Lee added 3 commits July 3, 2024 14:55
Signed-off-by: Scott Lee <sjl@anyscale.com>
Signed-off-by: Scott Lee <sjl@anyscale.com>
Signed-off-by: Scott Lee <sjl@anyscale.com>
@raulchen raulchen enabled auto-merge (squash) July 3, 2024 22:07
@github-actions github-actions bot added the go add ONLY when ready to merge, run all tests label Jul 3, 2024
sjl added 3 commits July 3, 2024 23:01
Signed-off-by: sjl <sjl@anyscale.com>
…01-count

Signed-off-by: sjl <sjl@anyscale.com>
@github-actions github-actions bot disabled auto-merge July 4, 2024 00:19
Signed-off-by: Scott Lee <sjl@anyscale.com>
Comment on lines 89 to 92
# Execute the dataset to get full schema.
ds = ds.materialize()
assert "{col1: int64, col2: int64, col3: object, col4: object}" in str(ds)

Contributor Author: Need to update tests for Zip, because in the test, we call ds.count() before attempting to check the schema from the Dataset.__str__ representation. After updating ds.count() to no longer execute and get the list of underlying Blocks, the schema is unknown for N-ary operators without executing: https://github.com/ray-project/ray/blob/master/python/ray/data/_internal/plan.py#L155-L158

@scottjlee (Contributor Author):

One side effect we overlooked with this PR: now that we avoid calling Dataset.get_internal_block_refs(), which calls ExecutionPlan.execute(), we no longer save the snapshot bundle to the ExecutionPlan after execution (which is what ExecutionPlan.execute() does). This has several effects:

  • Calling str(ds) on an unexecuted Dataset without a row count from metadata will result in Dataset(num_rows=?), since the row count is always unknown prior to execution. If one calls ds.count() and then str(ds), the proper num_rows is displayed, because the row count is obtained from ExecutionPlan._snapshot_bundle, which is saved during ds.count().
  • Similarly, the schema for Datasets involving N-ary operators (e.g. Zip) is no longer known.

For both cases, I think it makes sense that the count / schema is unknown prior to execution, since we would need to see all of the data to be certain. What do you think @raulchen @bveeramani?

Also, one optimization we could make: whenever Dataset.count() is called, we can cache the row count on the ExecutionPlan and reuse it in ExecutionPlan.get_plan_as_string() (equivalent to str(Dataset)), so that it's available without snapshot_bundle.
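A hedged sketch of that caching optimization (class and method names are illustrative, not ExecutionPlan's actual API):

```python
from typing import List, Optional


class PlanWithCachedCount:
    def __init__(self, bundle_row_counts: List[int]) -> None:
        # Stand-in for per-bundle metadata row counts.
        self._bundle_row_counts = bundle_row_counts
        self._cached_num_rows: Optional[int] = None

    def count(self) -> int:
        if self._cached_num_rows is None:
            # Stand-in for iterating ref bundles and summing metadata rows.
            self._cached_num_rows = sum(self._bundle_row_counts)
        return self._cached_num_rows

    def plan_as_string(self) -> str:
        n = self._cached_num_rows
        return f"Dataset(num_rows={n if n is not None else '?'})"


plan = PlanWithCachedCount([10, 20])
print(plan.plan_as_string())  # Dataset(num_rows=?)
plan.count()
print(plan.plan_as_string())  # Dataset(num_rows=30)
```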

Signed-off-by: Scott Lee <sjl@anyscale.com>
@bveeramani (Member):

> For both cases, I think it makes sense that the count / schema is unknown prior to execution, since we would need to see all of the data to be certain. What do you think @raulchen @bveeramani?

Yeah, that sounds reasonable to me. Makes sense to merge this PR now and somehow cache the count and metadata in a follow-up PR.

Scott Lee added 3 commits July 8, 2024 14:58
Signed-off-by: Scott Lee <sjl@anyscale.com>
Signed-off-by: Scott Lee <sjl@anyscale.com>
Signed-off-by: Scott Lee <sjl@anyscale.com>
@scottjlee (Contributor Author):

> Yeah, that sounds reasonable to me. Makes sense to merge this PR now and somehow cache the count and metadata in a follow-up PR.

Unfortunately, we can't merge this PR without implementing it; otherwise a number of tests will fail. But I have added the implementation here; waiting for tests to pass, then will send for review again.

Signed-off-by: Scott Lee <sjl@anyscale.com>
@scottjlee scottjlee requested review from bveeramani and raulchen July 9, 2024 02:13
@bveeramani (Member) left a comment: Nice

# and count. This is calculated and cached when the plan is executed as an
# iterator (`execute_to_iterator()`), and avoids caching
# all of the output blocks in memory like in `self.snapshot_bundle`.
self._snapshot_metadata: Optional[BlockMetadata] = None
Member: I don't have any ideas off the top of my head, but I think it'd be good if we simplified how we cache bundles and metadata at some point. It might be confusing how execute_to_iterator uses _snapshot_metadata but execute doesn't.

Contributor Author: Good point, added a TODO comment.

Signed-off-by: Scott Lee <sjl@anyscale.com>
@bveeramani bveeramani merged commit f8ee70a into ray-project:master Jul 10, 2024
5 checks passed
bveeramani pushed a commit that referenced this pull request Jul 16, 2024
…dles` instead of `(Block, BlockMetadata)` (#46575)

Followup to #46369 and #46455. Update `ExecutionPlan.execute_to_iterator()` to return `RefBundles` instead of `(Block, BlockMetadata)`, to unify the logic between `RefBundle`s and `Block`s. Also refactor the `iter_batches()` code path accordingly to handle `RefBundle`s instead of raw `Block` and `BlockMetadata`.

Signed-off-by: sjl <sjl@anyscale.com>
Signed-off-by: Scott Lee <sjl@anyscale.com>
Labels
go add ONLY when ready to merge, run all tests

3 participants