Fix panic propagation in CoalescePartitions, consolidates panic propagation into RecordBatchReceiverStream #6507
Conversation
Another try for fixing apache#3104. RepartitionExec might need a similar fix.
@@ -183,32 +168,6 @@ impl ExecutionPlan for CoalescePartitionsExec {
    }
}

struct MergeStream {
I basically taught RecordBatchReceiverStream how to propagate panics and then updated CoalescePartitionsExec to use it.
@@ -270,4 +231,19 @@ mod tests {

    Ok(())
}

#[tokio::test]
This is the new test from @nvartolomei
force-pushed from d887201 to fe8b82d
force-pushed from b9535e7 to e1c827a
/// Stream wrapper that records `BaselineMetrics` for a particular
/// partition
pub(crate) struct ObservedStream {
Moved from Union so it can be reused
        let num_partitions = 2;
        let input = PanicingExec::new(schema.clone(), num_partitions)
            .with_partition_panic(0, 10)
            .with_partition_panic(1, 3); // partition 1 should panic first (after 3)
Here is a test showing that when the second partition panics it is properly reported
@@ -560,40 +561,6 @@ impl Stream for CombinedRecordBatchStream {
    }
}

/// Stream wrapper that records `BaselineMetrics` for a particular
moved
@@ -137,27 +131,17 @@ impl ExecutionPlan for CoalescePartitionsExec {
        // use a stream that allows each sender to put in at
        // least one result in an attempt to maximize
        // parallelism.
        let (sender, receiver) =
            mpsc::channel::<Result<RecordBatch>>(input_partitions);
        let mut builder =
I am quite pleased that this is now all encapsulated into RecordBatchReceiverStream
            // unwrap Option / only return the error
            .filter_map(|item| async move { item });

        let inner = ReceiverStream::new(rx).chain(check_stream).boxed();
This change looks much better than mine. Wondering if it can be improved further and if it makes sense at all.

With this implementation, the panics will be propagated only after all input (from other partitions) is consumed. Probably fine, as this shouldn't happen during normal operation and is more of a correctness check. Also, the check future will not make any progress until all the inputs are exhausted. Shouldn't be much work, fine for it to be sequential.

As an alternative, what if we build a "supervisor" task (tokio::spawn) which is launched to do all that work, and then in the check_stream we just check the JoinHandle of the "supervisor" task? This way the supervisor task will be able to make progress concurrently and panic/report errors early.

Thought about this after looking at RepartitionStream, which would need something similar (a supervisor) to get task failures and then multiplex them to all the output partitions. Then, all output streams would only have to ensure that the supervisor didn't die. Currently, in RepartitionStream there are |output partitions| "supervisors" (wait_for_task) which aren't checked for success either. Wondering if it could fail at all though (tokio-rs/tokio#5744).
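For illustration, here is a minimal sketch (plain tokio, not the PR's or RepartitionExec's code) of the supervisor idea: worker tasks run in a JoinSet, and a separate supervisor task awaits them and forwards any panic onto the same channel the consumer reads from, so the failure surfaces without waiting for the rest of the input to drain. The partition loop and channel types are assumptions made up for the example.

```rust
use tokio::{sync::mpsc, task::JoinSet};

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<Result<u64, String>>(4);

    // Worker tasks: each produces data for one "partition".
    let mut workers = JoinSet::new();
    for partition in 0..2u64 {
        let tx = tx.clone();
        workers.spawn(async move {
            if partition == 1 {
                panic!("partition {partition} failed");
            }
            let _ = tx.send(Ok(partition)).await;
        });
    }

    // Supervisor task: awaits the workers concurrently with the consumer and
    // reports a panic as soon as it is observed, instead of only after the
    // other partitions' output has been drained.
    tokio::spawn(async move {
        while let Some(result) = workers.join_next().await {
            if let Err(e) = result {
                let _ = tx.send(Err(format!("worker task failed: {e}"))).await;
            }
        }
        drop(tx); // closing the channel signals that all workers finished
    });

    // Consumer: stops on the first reported error.
    while let Some(item) = rx.recv().await {
        match item {
            Ok(p) => println!("got batch from partition {p}"),
            Err(e) => {
                println!("error: {e}");
                break;
            }
        }
    }
}
```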
https://docs.rs/futures/latest/futures/prelude/stream/trait.StreamExt.html#method.take_while is possibly a better way to formulate this
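As a rough illustration of that suggestion (not the PR's code), take_while stops polling the underlying stream as soon as the predicate returns false, e.g. once an error shows up; the item types here are made up for the example.

```rust
use futures::stream::{self, StreamExt};

#[tokio::main]
async fn main() {
    let items = stream::iter(vec![Ok(1), Ok(2), Err("boom"), Ok(3)]);

    // take_while stops driving the underlying stream once the predicate is
    // false, yielding only the leading successful items.
    let ok_prefix: Vec<Result<i32, &str>> = items
        .take_while(|item| futures::future::ready(item.is_ok()))
        .collect()
        .await;

    assert_eq!(ok_prefix, vec![Ok(1), Ok(2)]);
}
```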
With this implementation, the panics will be propagated only after all input

This is my worry as well. I think you could move the check future into another task (one that holds the join set and is also aborted on drop, like a two-level join set) and that sends the error to tx.
I am working on tests for this behavior
I believe I fixed this in b1a817c
After trying several other approaches, I found https://docs.rs/tokio-stream/latest/tokio_stream/trait.StreamExt.html#method.merge which did exactly what I wanted 💯
It is tested in record_batch_receiver_stream_propagates_panics_early_shutdown
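For context, here is a minimal sketch of how tokio_stream's merge races two streams; the "check" side is simulated with tokio_stream::once and the item types are illustrative, so this is not the PR's actual implementation.

```rust
use tokio_stream::{wrappers::ReceiverStream, StreamExt};

#[tokio::main]
async fn main() {
    let (tx, rx) = tokio::sync::mpsc::channel::<Result<i32, String>>(2);

    // Data stream: batches arriving on the channel from producer tasks.
    let data = ReceiverStream::new(rx);

    // "Check" stream, simulated here with a single ready error; in the PR it
    // would report task panics instead.
    let check = tokio_stream::once(Err::<i32, String>("worker panicked".to_string()));

    // merge() polls both sides and yields from whichever is ready first, so a
    // failure does not have to wait until the data stream is exhausted.
    let mut merged = data.merge(check);

    tokio::spawn(async move {
        let _ = tx.send(Ok(1)).await;
    });

    while let Some(item) = merged.next().await {
        println!("{item:?}");
        if item.is_err() {
            break;
        }
    }
}
```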
        while let Some(item) = stream.next().await {
            // If send fails, plan being torn down,
            // there is no place to send the error.
I think this should also short-circuit if item is an error; as written I think it will drive execution to completion.
Fixed in 56a26eb
Tested in record_batch_receiver_stream_error_does_not_drive_completion
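A hypothetical sketch of the short-circuiting forwarding loop discussed here (the forward function, channel, and item types are made up for the example, not the PR's exact code): the loop stops driving the input as soon as an error has been forwarded, and also stops if the receiver has gone away.

```rust
use futures::{Stream, StreamExt};
use tokio::sync::mpsc::Sender;

// Forward items from a stream into a channel, stopping early on error.
async fn forward<S>(mut stream: S, tx: Sender<Result<i32, String>>)
where
    S: Stream<Item = Result<i32, String>> + Unpin,
{
    while let Some(item) = stream.next().await {
        let is_err = item.is_err();
        // If send fails the receiver is gone (the plan is being torn down),
        // so there is nowhere to report anything; just stop.
        if tx.send(item).await.is_err() {
            return;
        }
        // Short-circuit: no point pulling more input after an error.
        if is_err {
            return;
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = tokio::sync::mpsc::channel(2);
    let input = futures::stream::iter(vec![Ok(1), Err("boom".to_string()), Ok(2)]);
    tokio::spawn(forward(input, tx));

    // Prints Ok(1) and Err("boom"); Ok(2) is never pulled from the input.
    while let Some(item) = rx.recv().await {
        println!("{item:?}");
    }
}
```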
I like where this is headed, left some comments to potentially improve it further
Would be nice to be a bit more eager w/ error reporting, but this is at least better than the status quo.
I plan to work on improving the panic checks to be more eager later today
I believe I have resolved all outstanding comments in this PR. Please take another look if you have time
Looks good to me, left some minor comments you can take or leave
        // Merge the streams together so whichever is ready first
        // produces the batch (since futures::stream::StreamExt is
        // already in scope, need to call it explicitly)
FWIW https://docs.rs/futures/latest/futures/stream/fn.select.html is the futures crate version of this, not sure if there is a material difference between the two impls
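A small sketch of futures::stream::select for comparison (illustrative only, with made-up inputs): like merge, it polls both streams and yields from whichever is ready, ending only when both are exhausted.

```rust
use futures::stream::{self, StreamExt};

#[tokio::main]
async fn main() {
    let a = stream::iter(vec![1, 2, 3]);
    let b = stream::iter(vec![10, 20]);

    // select() interleaves the two streams, yielding from whichever side is
    // ready, and finishes once both inputs are done.
    let merged: Vec<i32> = stream::select(a, b).collect().await;
    assert_eq!(merged.len(), 5);
    println!("{merged:?}");
}
```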
Changed in fb17af8 -- I didn't see select. TIL!
(it also inspired me to do #6565)
                // the JoinSet were aborted, which in turn
                // would imply that the receiver has been
                // dropped and this code is not running
                return Some(Err(DataFusionError::Internal(format!(
If this is unreachable (which I'm fairly certain it is) I'm not sure why we don't just panic here, making this future infallible and therefore an ideal candidate for https://docs.rs/futures/latest/futures/stream/trait.StreamExt.html#method.take_until
I haven't studied the tokio JoinHandle code or under what conditions it currently (or in the future) might return an error (e.g. if the task is cancelled in some way, will it error?). Given that the API returns an error, I think handling and propagating the error is the most future-proof thing to do.
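For reference, a minimal standalone sketch of handling tokio's JoinError (not DataFusion code): the error distinguishes a panicked task from a cancelled one, and a panic payload can be resumed so it propagates to the caller.

```rust
use tokio::task::JoinSet;

#[tokio::main]
async fn main() {
    let mut set = JoinSet::new();
    set.spawn(async { panic!("worker panicked") });

    while let Some(result) = set.join_next().await {
        match result {
            Ok(()) => println!("task completed"),
            // A JoinError distinguishes a panicked task from a cancelled one;
            // resuming the panic re-raises it with the original payload.
            Err(e) if e.is_panic() => std::panic::resume_unwind(e.into_panic()),
            Err(e) if e.is_cancelled() => println!("task was cancelled"),
            Err(e) => println!("join error: {e}"),
        }
    }
}
```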
great work @alamb! 👏
Thank you @nvartolomei for starting the process (and providing the tests!)
Which issue does this PR close?
Based on #6449 from @nvartolomei
Closes #3104
Closes #6449
Rationale for this change
I wanted to centralize the logic for propagating panics from tasks.

What changes are included in this PR?
- Adds RecordBatchReceiverStreamBuilder, which handles doing the abort-on-drop dance using tokio::task::JoinSet, as shown by @nvartolomei
- Updates CoalescePartitionsExec and AnalyzeExec to use this builder

Are these changes tested?
Yes

Are there any user-facing changes?
Yes, panics are no longer ignored
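For readers unfamiliar with the pattern, here is a minimal standalone sketch of the abort-on-drop idea; the names (BuilderSketch, ReceiverStreamSketch) and types are hypothetical and this is not DataFusion's actual RecordBatchReceiverStreamBuilder API. The key property is that the JoinSet is owned by the returned stream, so dropping the stream aborts any still-running producer tasks.

```rust
use tokio::sync::mpsc;
use tokio::task::JoinSet;

struct ReceiverStreamSketch {
    rx: mpsc::Receiver<Result<i32, String>>,
    // Held only for its Drop behaviour: dropping a JoinSet aborts its tasks.
    _tasks: JoinSet<()>,
}

struct BuilderSketch {
    tx: mpsc::Sender<Result<i32, String>>,
    rx: mpsc::Receiver<Result<i32, String>>,
    tasks: JoinSet<()>,
}

impl BuilderSketch {
    fn new(capacity: usize) -> Self {
        let (tx, rx) = mpsc::channel(capacity);
        Self { tx, rx, tasks: JoinSet::new() }
    }

    // Spawn one producer task per input partition; each gets its own sender.
    fn run_input(&mut self, partition: i32) {
        let tx = self.tx.clone();
        self.tasks.spawn(async move {
            let _ = tx.send(Ok(partition)).await;
        });
    }

    // The builder's own sender is dropped here, so the channel closes once
    // every spawned producer has finished (or been aborted).
    fn build(self) -> ReceiverStreamSketch {
        ReceiverStreamSketch { rx: self.rx, _tasks: self.tasks }
    }
}

#[tokio::main]
async fn main() {
    let mut builder = BuilderSketch::new(2);
    builder.run_input(0);
    builder.run_input(1);

    let mut stream = builder.build();
    while let Some(item) = stream.rx.recv().await {
        println!("{item:?}");
    }
}
```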