Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't optimize AnalyzeExec (#6379) (try 2) #6494

Merged
merged 6 commits into from
May 31, 2023

Conversation

alamb
Copy link
Contributor

@alamb alamb commented May 30, 2023

Which issue does this PR close?

Closes #6380
Closes #6379

Rationale for this change

See #6379

What changes are included in this PR?

This is based off #6380 from @tustvold but it was getting a little big to just push to his branch so I decided to make a new PR

  1. Don't optimize the input to AnalyzeExec
  2. Update AnalyzeExec to handle multiple input streams using futures-fu
  3. Add a new test

Are these changes tested?

Yes

Are there any user-facing changes?

Less confusing EXPLAIN ANALYZE results

@github-actions github-actions bot added the core Core DataFusion crate label May 30, 2023
@alamb alamb changed the title Alamb/explain analyze Don't optimize AnalyzeExec (#6379) (try 2) May 30, 2023
)));
// Gather futures that will run each input partition using a
// JoinSet to cancel outstanding futures on drop
let mut set = JoinSet::new();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This uses the cool JoinSet I learned about from @nvartolomei and @Darksonn on #6449 ❤️

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this logic was just extracted into its own function

#[cfg_attr(tarpaulin, ignore)]
async fn csv_explain_analyze_order_by() {
let ctx = SessionContext::new();
register_aggregate_csv_by_sql(&ctx).await;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here is the new test

// Turn the tasks in the JoinSet into a stream of
// Result<usize> representing the counts of each output
// partition.
let counts_stream = futures::stream::unfold(set, |mut set| async {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think looking at https://github.com/apache/arrow-datafusion/pull/6494/files?w=1 makes it clearer what I did -- which was to change the plumbing to use futures and stream fu rather than channels

@@ -387,7 +387,7 @@ mod sql_tests {
};
let test2 = UnaryTestCase {
source_type: SourceType::Unbounded,
expect_fail: true,
expect_fail: false,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it works now!

@alamb alamb marked this pull request as ready for review May 30, 2023 21:17
Copy link
Contributor

@tustvold tustvold left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good to me

// Turn the tasks in the JoinSet into a stream of
// Result<usize> representing the counts of each output
// partition.
let counts_stream = futures::stream::unfold(set, |mut set| async {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I confirmed that the tokio stream adapaters don't appear to have a JoinSet impl - https://docs.rs/tokio-stream/latest/tokio_stream/wrappers/index.html?search=

let end = Instant::now();
// future that gathers the input counts into an overall output
// count, and makes an output batch
let output = counts_stream
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FWIW you could just use a regular async move here, instead of needing the futures adapters

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is an excellent point -- I got so carried away with being clever I lost sight of that.

I rewrote this logic to use a single future and also moved the output record batch creation into a function to separate the business logic from the async orchestration: 5521a70

@tustvold
Copy link
Contributor

Test failures do not appear to be related

@alamb
Copy link
Contributor Author

alamb commented May 31, 2023

Test failures I think are due to #6495

@github-actions github-actions bot added the optimizer Optimizer rules label May 31, 2023
@alamb alamb merged commit 78def88 into apache:main May 31, 2023
@alamb alamb deleted the alamb/explain_analyze branch May 31, 2023 21:57
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
core Core DataFusion crate optimizer Optimizer rules
Projects
None yet
Development

Successfully merging this pull request may close these issues.

AnalyzeExec Optimized Incorrectly
2 participants