
Enforce dtypes in P2P shuffle #7879

Merged (4 commits) on Jun 2, 2023

Conversation

hendrikmakait (Member) commented Jun 2, 2023

Closes #7420
Closes dask/dask#10326
@jrbourbeau: As discussed offline yesterday, here's the version that uses meta instead of a pa.Schema

  • Tests added / passed
  • Passes pre-commit run --all-files

hendrikmakait requested a review from fjetter as a code owner on June 2, 2023 at 15:33
hendrikmakait (Member Author) commented:
cc @wence- to keep you in the loop.

-    return pa.concat_tables(shards)
+    table = pa.concat_tables(shards)
+    df = table.to_pandas(self_destruct=True)
+    return df.astype(meta.dtypes)

hendrikmakait (Member Author):
This might not be the most performant version, but I also don't know if it's much of a problem. I'll run an A/B test on the existing test suite.
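
For context, a minimal standalone sketch of the round-trip this hunk fixes (illustrative only, not code from the PR; df.head(0) stands in for the dask meta): a string[pyarrow] column comes back from Arrow as object dtype, and astype(meta.dtypes) restores it.

```python
import pandas as pd
import pyarrow as pa

df = pd.DataFrame({"x": pd.array(["a", "b"], dtype="string[pyarrow]")})
meta = df.head(0)  # stand-in for the dask collection's meta

table = pa.Table.from_pandas(df)
roundtripped = table.to_pandas()  # "x" is materialized as object dtype
restored = roundtripped.astype(meta.dtypes)  # "x" is string[pyarrow] again

print(roundtripped.dtypes["x"], restored.dtypes["x"])
```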

Member:
My guess is it'd be good to avoid a cast if possible (at least for strings) via types_mapper= in pa.Table.to_pandas(). For example, as is, this will create object-dtype columns of Python strings first and then cast them to string[pyarrow].

This definitely isn't a blocker for this PR, but let's add a # TODO: comment if we don't include that logic now

hendrikmakait (Member Author):
I'll create a follow-up ticket once this is merged.

Member:
This seems like a good first pass. Thanks for putting this together so quickly. My guess is that generating the object dtype columns will slow things down, both for normal reasons, and for GIL + networking reasons.

I like that we didn't feel a need to block on this though.

Collaborator:
Yes, this slows things down significantly. Mapping the string types (pa.string and pa.large_string) to pd.ArrowDtype makes the conversion zero-copy and should also speed up the follow-up astype.
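
A hypothetical sketch of that zero-copy idea (the mapper name and example table are made up; the PR itself does not include this yet): pass a types_mapper to to_pandas() so Arrow string columns are materialized directly as pd.ArrowDtype instead of going through an object-dtype intermediate.

```python
import pandas as pd
import pyarrow as pa

def string_types_mapper(arrow_type: pa.DataType):
    # Map Arrow string types to pandas ArrowDtype; returning None keeps
    # the default conversion for all other types.
    if arrow_type in (pa.string(), pa.large_string()):
        return pd.ArrowDtype(arrow_type)
    return None

table = pa.table({"x": ["lorem", "ipsum"]})
df = table.to_pandas(types_mapper=string_types_mapper)
print(df.dtypes["x"])  # an ArrowDtype column backed by the original Arrow buffers
```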

     left = ext.get_output_partition(
         shuffle_id_left, barrier_left, output_partition
-    ).drop(columns=_HASH_COLUMN_NAME)
+    ).drop(columns=_HASH_COLUMN_NAME, errors="ignore")

hendrikmakait (Member Author):
This is inelegant, but so is adding the hash column to the meta. If anybody has strong preferences, please speak up.

Member:
This seems fine to me. Thanks for adding the informative comment 👌
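
To make the trade-off above concrete, a tiny illustration (the hash column name is assumed for illustration; presumably an empty partition produced straight from the meta never contained the hash column, so drop() needs errors="ignore" to work for both cases):

```python
import pandas as pd

_HASH_COLUMN_NAME = "__hash_partition"  # assumed name, for illustration only

shuffled = pd.DataFrame({"x": [1, 2], _HASH_COLUMN_NAME: [0, 1]})
empty = pd.DataFrame({"x": pd.Series(dtype="int64")})  # meta-derived partition

# The same drop works for both the populated and the empty partition:
shuffled.drop(columns=_HASH_COLUMN_NAME, errors="ignore")
empty.drop(columns=_HASH_COLUMN_NAME, errors="ignore")  # no KeyError raised
```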

hendrikmakait added the needs review and shuffle labels on Jun 2, 2023

jrbourbeau (Member) left a comment:
Nice! Thanks @hendrikmakait. Overall the changes here look good to me. I left some minor comments / questions

Have you tried this out with the example in dask/dask#10326?

@@ -54,7 +54,9 @@ def convert_partition(data: bytes) -> pa.Table:
     while file.tell() < end:
         sr = pa.RecordBatchStreamReader(file)
         shards.append(sr.read_all())
-    return pa.concat_tables(shards)
+    table = pa.concat_tables(shards)
+    df = table.to_pandas(self_destruct=True)

Member:
I'm a little nervous about self_destruct=True as the docstring (https://arrow.apache.org/docs/python/generated/pyarrow.Table.html#pyarrow.Table.to_pandas) says it's experimental and "If you use the object after calling to_pandas with this option it will crash your program".

hendrikmakait (Member Author):

I took it as a recommendation from https://arrow.apache.org/docs/python/pandas.html#reducing-memory-use-in-table-to-pandas

From what I understand, it would wreak havoc if we used table after the call to to_pandas. Given that we return on the very next line, that shouldn't be a problem.

Member:
Thanks for pointing to the extra docs. I guess I was nervous about pyarrow-backed dtypes in pandas specifically. But I would hope in that case memory wouldn't be deallocated if pandas was still using it after the pyarrow -> pandas handoff. If we're running this against a few use cases that do things after a shuffle (which it sounds like we are), then we should have sufficient coverage.
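
As a concrete sketch of that safety argument (the helper name is made up; this is not the code in the PR): self_destruct=True lets Arrow release each column's buffers as it is converted, which is fine as long as the Table is never used again after the call.

```python
import pandas as pd
import pyarrow as pa

def convert_and_release(table: pa.Table, meta: pd.DataFrame) -> pd.DataFrame:
    # Arrow may free the table's buffers during conversion, so `table`
    # must not be touched after this call; only the DataFrame escapes.
    df = table.to_pandas(self_destruct=True)
    return df.astype(meta.dtypes)
```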


@@ -100,13 +102,13 @@ def rearrange_by_column_p2p(
 ) -> DataFrame:
     from dask.dataframe import DataFrame

+    check_dtype_support(df._meta)
+    meta = df._meta.copy()

Member:
Why the copy here?

Member:
I see this is so we can reuse the same meta a few lines below. I'm now wondering why we need a copy there

hendrikmakait (Member Author):
I think we can skip the copy here as long as we have the other one in place.


         out = await self.offload(_)
     except KeyError:
-        out = self.schema.empty_table().to_pandas()
+        out = self.meta.copy()

Member:
Similar question here re: copy

hendrikmakait (Member Author):
I am hesitant to return several references to the same dataframe object for independent partitions since they are not immutable.
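
A short illustration of that concern (synthetic example, not code from the PR): handing out one shared DataFrame for several empty partitions means an in-place change to one of them leaks into the others, which the copy prevents.

```python
import pandas as pd

meta = pd.DataFrame({"x": pd.Series(dtype="int64")})

part_a = meta            # same object returned for two partitions
part_b = meta
part_a["y"] = 1.0        # part_b (and meta itself) now also have a "y" column

part_c = meta.copy()     # independent copy per partition
part_d = meta.copy()
part_c["z"] = 2.0        # part_d is unaffected
```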

Comment on lines +728 to +730:
+    f"col{next(counter)}": pd.array(
+        ["lorem ipsum"] * 100,
+        dtype="string[python]",

Member:
Thanks for adding the extra test coverage here

jrbourbeau (Member) commented:
Hmm looks like there are some related test failures popping up https://github.com/dask/distributed/actions/runs/5157181512/jobs/9289216557?pr=7879

hendrikmakait (Member Author) commented:
> Have you tried this out with the example in dask/dask#10326?

Yes, here are the results:

before

95.183360131
209.716807107

after

95.183360131
89.09515961

hendrikmakait (Member Author) commented:
> Hmm looks like there are some related test failures popping up https://github.com/dask/distributed/actions/runs/5157181512/jobs/9289216557?pr=7879

Whoops, forgot to check in a couple of changes to the tests.

jrbourbeau removed the needs review label on Jun 2, 2023

github-actions bot (Contributor) commented Jun 2, 2023

Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

    20 files  ±0      20 suites  ±0   11h 29m 14s ⏱️ -18m 24s
 3 659 tests  ±0   3 551 ✔️ +5     108 💤 ±0   0 -5
35 380 runs   ±0  33 614 ✔️ +4   1 766 💤 +1   0 -5

Results for commit d2a69cd. ± Comparison against base commit 8301cb7.

jrbourbeau (Member) left a comment:

Thanks @hendrikmakait!

Successfully merging this pull request may close these issues.

p2p shuffled pandas data takes more memory
string[pyarrow] dtype does not roundtrip in P2P shuffling
4 participants