Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Datasets] [Out-of-Band Serialization: 1/3] Refactor LazyBlockList. #23821

Merged

Conversation

clarkzinzow
Copy link
Contributor

@clarkzinzow clarkzinzow commented Apr 10, 2022

This PR refactors LazyBlockList in service of out-of-band serialization (see mono-PR) and is a precursor to an execution plan refactor (PR #2) and adding the actual out-of-band serialization APIs (PR #3). The following is included in this refactor:

  1. ReadTasks are now a first-class concept, replacing calls;
  2. read stage progress tracking is consolidated into LazyBlockList._get_blocks_with_metadta() and more of the read task complexity, e.g. the read remote function, was pushed into LazyBlockList to make ray.data.read_datasource() simpler;
  3. we are a bit smarter with how we progressively launch tasks and fetch and cache metadata, including fetching the metadata for read tasks in .iter_blocks_with_metadata() instead of relying on the pre-read task metadata (which will be less accurate), and we also fix some small bugs in the lazy ramp-up around progressive metadata fetching.

(1) is the most important item for supporting out-of-band serialization and fundamentally changes the LazyBlockList data model. This is required since we need to be able to reference the underlying read tasks when rewriting read stages during optimization and when serializing the lineage of the Dataset. See the mono-PR for more context.

Other changes:

  1. Changed stats actor to a global named actor singleton in order to obviate the need for serializing the actor handle with the Dataset stats; without this, we were encountering serialization failures.

Checks

  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

@clarkzinzow clarkzinzow force-pushed the datasets/feat/refactor-lazy-block-list branch from fd7f3a7 to d9b2891 Compare April 10, 2022 08:48
Copy link
Contributor

@ericl ericl left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A few questions on why the test cases change, but the high level approach looks good.

@jianoaix can you take the detailed review?

python/ray/data/impl/lazy_block_list.py Outdated Show resolved Hide resolved
This serves as a cache of fetched block metadata.
ray_remote_args: Ray remote arguments for the read tasks.
stats_uuid: UUID for the dataset stats, used to group and fetch read task
stats. If not provided, a new UUID will be created.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is great.

python/ray/data/tests/test_dataset_formats.py Outdated Show resolved Hide resolved
@@ -750,7 +748,7 @@ def test_numpy_read(ray_start_regular_shared, tmp_path):
np.save(os.path.join(path, "test.npy"), np.expand_dims(np.arange(0, 10), 1))
ds = ray.data.read_numpy(path)
assert str(ds) == (
"Dataset(num_blocks=1, num_rows=None, "
"Dataset(num_blocks=1, num_rows=10, "
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, how are we able to get the number of rows here without eager reading?

Copy link
Contributor Author

@clarkzinzow clarkzinzow Apr 11, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AFAICT, before this PR, we were already eagerly submitting the first read task AND blocking on the read task finishing in order to fetch the schema in this case (1, 2). After this PR, we fetch all of the block metadata so we can also get an more informative view of e.g. the number of rows for datasources lacking metadata peeking, as in this case.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code linked seems still just look at the metadata for the first block? Also the above "test_numpy_roundtrip" test still has num_rows=None.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code linked seems still just look at the metadata for the first block?

@jianoaix That's an older commit in this PR, as you can see in the current state, we block and fetch the metadata for the first block if the schema isn't set.

Also the above "test_numpy_roundtrip" test still has num_rows=None.

The reasoning for that is because the dataset for the test_numpy_read dataset only consists of a single block, so our blocking and fetching the metadata for the first block gives us the exact number of rows in the Dataset, and we can calculate the number of rows off of the block metadata alone. The test_numpy_roundtrip dataset, on the other hand, has 2 blocks, so when we block and fetch the metadata for the first block, we end up having the number of rows for only one of the two blocks, so we can't calculate the number of rows in the full dataset via the block metadata alone.

@ericl ericl added the @author-action-required The PR author is responsible for the next step. Remove tag to send back to the reviewer. label Apr 10, 2022
@ericl
Copy link
Contributor

ericl commented Apr 10, 2022

already-read block partitions are reused when possible to prevent unnecessary re-reads;

I'm a little worried about this introducing unnecessary complexity in the timing / stats, but if you and Jian think it's a good idea I'm ok with it.

@clarkzinzow
Copy link
Contributor Author

clarkzinzow commented Apr 11, 2022

I'm a little worried about this introducing unnecessary complexity in the timing / stats, but if you and Jian think it's a good idea I'm ok with it.

Agreed about the stats; the end-result will be that, for a fused read + transformation stage, the stats for the task operating on the already-read block will only include the transformation time, not the read time. However, I still think that it's worth it in order to avoid unnecessary re-reading, and if the stats discrepancy is untenable, I'd rather do something like a best-effort merge of the pre-fusion read stats + the post-fusion read + transform stats than do unnecessary file re-reads.

@jianoaix
Copy link
Contributor

I'm a little worried about this introducing unnecessary complexity in the timing / stats, but if you and Jian think it's a good idea I'm ok with it.

Agreed about the stats; the end-result will be that, for a fused read + transformation stage, the stats for the task operating on the already-read block will only include the transformation time, not the read time. However, I still think that it's worth it in order to avoid unnecessary re-reading, and if the stats discrepancy is untenable, I'd rather do something like a best-effort merge of the pre-fusion read stats + the post-fusion read + transform stats than do unnecessary file re-reads.

Based on our offline discussion, how about drop this optimization for now in this PR, and evaluate in follow-up whether it's worth the complexity we may add?

Copy link
Contributor

@jianoaix jianoaix left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you Clark for splitting the mono-PR. The splitting and this PR itself look good to me!

python/ray/data/impl/lazy_block_list.py Show resolved Hide resolved
python/ray/data/impl/lazy_block_list.py Outdated Show resolved Hide resolved
fetched_metadata: An optional list of already computed AND fetched metadata.
This serves as a cache of fetched block metadata.
ray_remote_args: Ray remote arguments for the read tasks.
stats_uuid: UUID for the dataset stats, used to group and fetch read task
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This UUID is per dataset, or per <dataset, stage>? If it's the later, name it more accurately will be helpful.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This UUID is per stats object, and amounts to ~per read stage so is basically ~per dataset, but predates the creation of a Dataset and its UUID, hence us have two different UUIDs.

The UUID is only used for gathering stats about read stages, so we could change this to read_stats_uuid if that makes it more clear.

python/ray/data/impl/lazy_block_list.py Outdated Show resolved Hide resolved
python/ray/data/impl/lazy_block_list.py Show resolved Hide resolved
python/ray/data/impl/lazy_block_list.py Outdated Show resolved Hide resolved
python/ray/data/impl/lazy_block_list.py Show resolved Hide resolved
python/ray/data/impl/lazy_block_list.py Outdated Show resolved Hide resolved
@clarkzinzow
Copy link
Contributor Author

Based on our offline discussion, how about drop this optimization for now in this PR, and evaluate in follow-up whether it's worth the complexity we may add?

@jianoaix I'll time-box the stats merging that we talked about (15 minutes), and if it's not straightforward, I'll revert that optimization and add it in a follow-up PR!

@clarkzinzow clarkzinzow force-pushed the datasets/feat/refactor-lazy-block-list branch from 703bace to dade522 Compare April 13, 2022 04:59
@clarkzinzow
Copy link
Contributor Author

@jianoaix I did the time-boxed investigation of stats merging: the merging itself is simple, the complicated bit is propagating the partial-read stats to the downstream read + transform stats creation, which would require adding a "partially computed stats" concept that's passed around in a clunky way. Ultimately, opted to remove the optimization.

python/ray/data/impl/plan.py Outdated Show resolved Hide resolved
@@ -750,7 +748,7 @@ def test_numpy_read(ray_start_regular_shared, tmp_path):
np.save(os.path.join(path, "test.npy"), np.expand_dims(np.arange(0, 10), 1))
ds = ray.data.read_numpy(path)
assert str(ds) == (
"Dataset(num_blocks=1, num_rows=None, "
"Dataset(num_blocks=1, num_rows=10, "
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code linked seems still just look at the metadata for the first block? Also the above "test_numpy_roundtrip" test still has num_rows=None.

python/ray/data/impl/lazy_block_list.py Outdated Show resolved Hide resolved
from ray.data.impl.block_list import BlockList
from ray.data.impl.progress_bar import ProgressBar
from ray.data.impl.remote_fn import cached_remote_fn
from ray.data.impl.stats import DatasetStats, _get_or_create_stats_actor


class LazyBlockList(BlockList):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add some unit tests for the public methods of this class?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm happy to do this as a follow up, but since we don't currently have any unit testing at this layer (all unit testing is done on the Dataset and DatasetPipeline methods), I'd prefer not to block out-of-band serialization from landing on adding a new testing layer.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think in generally we should strive for covering the code with unit test in the same PR, rather than followups, which is essentially a (short-term) tech debt; especially for newly added classes/functions, or new public methods in existing class. This PR is mostly refactoring, and we still jam unit tests together so far, so LG to move forward.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Strongly agree, I'm not suggesting that we should add new abstractions and the corresponding unit tests in separate PRs, I'm pushing for not making a large test coverage change in this PR for a refactoring of an existing abstraction that's blocking some AIR features. I would really like to have a much more strict unit testing standard, since most of our Python tests in core Ray and our ecosystem are technically integration tests, with many intermediate edge cases uncovered. So I'm very happy that you're of the same mind, definitely looking forward to pushing on this!

@jianoaix
Copy link
Contributor

@jianoaix I did the time-boxed investigation of stats merging: the merging itself is simple, the complicated bit is propagating the partial-read stats to the downstream read + transform stats creation, which would require adding a "partially computed stats" concept that's passed around in a clunky way. Ultimately, opted to remove the optimization.

Sounds good, thanks for quick investigation! Please update the PR description and drop the item #2.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
@author-action-required The PR author is responsible for the next step. Remove tag to send back to the reviewer.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants