Propagate panics #6449
Conversation
Another try for fixing apache#3104. RepartitionExec might need a similar fix.
My use-case can be seen here apache/datafusion#6449. I want to avoid needless Box::pin calls when I can just poll the underlying task.
// If the input stream is done, wait for all tasks to finish and return
// the failure if any.
if let Poll::Ready(None) = poll {
    match Box::pin(self.tasks.join_next()).poll_unpin(cx) {
I think this would help here a bit tokio-rs/tokio#5721
While I agree that it makes sense to add a poll_join_next method, you don't have to box here. You can use the pin! macro instead of boxing.
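To illustrate the suggestion above, here is a minimal std-only sketch of polling a future once via the stack-pinning pin! macro instead of Box::pin. The noop_waker and poll_once helpers are hypothetical names introduced for this example, not DataFusion or tokio APIs:

```rust
use std::future::Future;
use std::pin::pin;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

// A do-nothing waker, sufficient for a single manual poll.
fn noop_waker() -> Waker {
    fn raw() -> RawWaker {
        static VTABLE: RawWakerVTable =
            RawWakerVTable::new(|_| raw(), |_| {}, |_| {}, |_| {});
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    // Safety: the vtable functions never touch the null data pointer.
    unsafe { Waker::from_raw(raw()) }
}

// Poll a future exactly once with no heap allocation: `pin!` pins it
// on the caller's stack, where `Box::pin` would allocate.
fn poll_once<F: Future>(fut: F) -> Option<F::Output> {
    let mut fut = pin!(fut);
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);
    match fut.as_mut().poll(&mut cx) {
        Poll::Ready(v) => Some(v),
        Poll::Pending => None,
    }
}

fn main() {
    // An async block with no await points completes on its first poll.
    assert_eq!(poll_once(async { 1 + 1 }), Some(2));
    // A future that never resolves stays pending.
    assert_eq!(poll_once(std::future::pending::<u8>()), None);
}
```

The same stack-pinning trick applies to the join_next() future in the diff above, avoiding a fresh allocation on every poll.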
Thank you for this contribution @nvartolomei 🙏
Initially I thought about propagating the JoinError, but (re)panicking seems nicer and is more helpful for debugging.
I agree it is nicer, but I think propagating the JoinError is more consistent with the rest of the DataFusion codebase. For example, RepartitionExec.
In IOx, I think we solved a similar problem using a WatchedTask
Maybe we could apply a similar pattern in DataFusion
cc @crepererum or @tustvold in case you have some thoughts in this area
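For context on the repanicking approach discussed above, here is a std-only sketch of rethrowing a caught panic payload with resume_unwind, the same mechanism one would use on a tokio JoinError via into_panic(). The rethrow_and_read helper is a name made up for this example:

```rust
use std::panic::{catch_unwind, resume_unwind, AssertUnwindSafe};

// Catch a panic, rethrow its payload, and recover the message, much
// like std::panic::resume_unwind(join_error.into_panic()) would do
// for a panicked tokio task.
fn rethrow_and_read() -> String {
    // Silence the default panic hook so the demo output stays clean.
    std::panic::set_hook(Box::new(|_| {}));
    // The Err payload plays the role of JoinError::into_panic().
    let payload = catch_unwind(|| panic!("task failed")).unwrap_err();
    // resume_unwind rethrows the original payload, preserving the
    // message for whoever catches (or crashes on) it next.
    let again =
        catch_unwind(AssertUnwindSafe(|| resume_unwind(payload))).unwrap_err();
    // A panic raised with a string literal carries a &'static str.
    again
        .downcast_ref::<&str>()
        .map(|s| s.to_string())
        .unwrap_or_default()
}

fn main() {
    assert_eq!(rethrow_and_read(), "task failed");
}
```

The upside of repanicking is exactly this: the original panic message and backtrace surface on the polling thread instead of being flattened into an opaque error.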
}

Ok(Box::pin(MergeStream {
    input: receiver,
    schema: self.schema(),
    baseline_metrics,
    drop_helper: AbortOnDropMany(join_handles),
🤔 it does look like https://docs.rs/tokio/latest/tokio/task/struct.JoinSet.html may be a nicer alternative to AbortOnDrop -- what do you think @crepererum ?
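To make the comparison concrete, here is a std-only sketch of the abort-on-drop idea: a guard whose Drop cancels every registered "task" (modelled as an atomic flag, since tokio is not used here). This is a hypothetical stand-in, not DataFusion's actual AbortOnDropMany; tokio's JoinSet builds the same behaviour in, because dropping a JoinSet aborts all tasks it owns:

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

// Hypothetical stand-in for the abort-on-drop pattern: dropping the
// guard cancels every registered task. With tokio's JoinSet this
// wrapper is unnecessary, since drop already aborts all owned tasks.
struct AbortGuard(Vec<Arc<AtomicBool>>);

impl Drop for AbortGuard {
    fn drop(&mut self) {
        for cancelled in &self.0 {
            // Stands in for JoinHandle::abort() on a real task.
            cancelled.store(true, Ordering::SeqCst);
        }
    }
}

// Returns (cancelled-while-guard-alive, cancelled-after-guard-dropped).
fn run_demo() -> (bool, bool) {
    let flag = Arc::new(AtomicBool::new(false));
    let before;
    {
        let _guard = AbortGuard(vec![Arc::clone(&flag)]);
        before = flag.load(Ordering::SeqCst); // task still "running"
    } // guard dropped here: tasks are "aborted"
    (before, flag.load(Ordering::SeqCst))
}

fn main() {
    assert_eq!(run_demo(), (false, true));
}
```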
// If the input stream is done, wait for all tasks to finish and return
// the failure if any.
if let Poll::Ready(None) = poll {
    let fut = self.tasks.join_next();
It seems like this only checks the first future that finishes. Would it work if there were multiple input tasks and the first one that finished was not the one that panicked?
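The concern above — a single join_next() call only surfaces the first completion — is usually addressed by draining the set in a loop. A std-only sketch, modelling the sequence of join_next() outcomes as a plain iterator rather than tokio's actual API (first_failure is a name invented for this example):

```rust
// Drain every task result instead of stopping at the first completion,
// so a failure surfaces even when healthy tasks happen to finish first.
fn first_failure<I>(results: I) -> Result<(), String>
where
    I: IntoIterator<Item = Result<(), String>>,
{
    for r in results {
        // Propagate as soon as any task reports a failure.
        r?;
    }
    // All tasks completed cleanly.
    Ok(())
}

fn main() {
    // The failing task finishes third, yet is still reported.
    let outcomes = vec![Ok(()), Ok(()), Err("task panicked".to_string())];
    assert_eq!(first_failure(outcomes), Err("task panicked".to_string()));
    assert_eq!(first_failure(vec![Ok(()), Ok(())]), Ok(()));
}
```

With a real JoinSet the loop would call join_next().await (or poll it) until it returns None, checking each JoinError as it arrives.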
BTW I have been working on writing a test for this as part of #6507
Good fix and test, thank you. Could you test+fix Short side note on
@@ -98,12 +98,13 @@ fn build_file_list_recurse(

/// Spawns a task to the tokio threadpool and writes its outputs to the provided mpsc sender
pub(crate) fn spawn_execution(
    join_set: &mut JoinSet<()>,
👍
I filed #6513
Another try for fixing #3104.
RepartitionExec might need a similar fix.
Initially I thought about propagating the JoinError, but (re)panicking seems nicer and is more helpful for debugging.
What changes are included in this PR?
Use tokio::task::JoinSet to handle spawning and tearing down of tasks.
Are these changes tested?
Yes
Are there any user-facing changes?
No