Skip to content

Commit

Permalink
terminate early on panic
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Jun 6, 2023
1 parent 56a26eb commit b1a817c
Showing 1 changed file with 19 additions and 7 deletions.
26 changes: 19 additions & 7 deletions datafusion/core/src/physical_plan/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,11 @@ use super::{ExecutionPlan, RecordBatchStream, SendableRecordBatchStream};
/// Builder for [`RecordBatchReceiverStream`] that propagates errors
/// and panics correctly.
///
/// [`RecordBatchReceiverStream`] can be used when there are one or
/// more tasks spawned which produce RecordBatches and send them to a
/// single `Receiver`.
/// [`RecordBatchReceiverStream`] is used to spawn one or more tasks
/// that produce `RecordBatch`es and send them to a single
/// `Receiver` which can improve parallelism.
///
/// This also handles propagating panics and canceling the tasks.
pub struct RecordBatchReceiverStreamBuilder {
tx: Sender<Result<RecordBatch>>,
rx: Receiver<Result<RecordBatch>>,
Expand Down Expand Up @@ -94,6 +96,9 @@ impl RecordBatchReceiverStreamBuilder {

/// runs the input_partition of the `input` ExecutionPlan on the
/// tokio threadpool and writes its outputs to this stream
///
/// If the input partition produces an error, the error will be
/// sent to the output stream and no further results are sent.
pub(crate) fn run_input(
&mut self,
input: Arc<dyn ExecutionPlan>,
Expand All @@ -105,8 +110,8 @@ impl RecordBatchReceiverStreamBuilder {
self.spawn(async move {
let mut stream = match input.execute(partition, context) {
Err(e) => {
// If send fails, plan being torn down,
// there is no place to send the error.
// If send fails, the plan is being torn down, so there
// is no place to send the error and no reason to continue.
output.send(Err(e)).await.ok();
debug!(
"Stopping execution: error executing input: {}",
Expand Down Expand Up @@ -135,6 +140,10 @@ impl RecordBatchReceiverStreamBuilder {
// stop after the first error is encountered (don't
// drive all streams to completion)
if is_err {
debug!(
"Stopping execution: plan returned error: {}",
displayable(input.as_ref()).one_line()
);
return;
}
}
Expand All @@ -153,7 +162,7 @@ impl RecordBatchReceiverStreamBuilder {
// don't need tx
drop(tx);

// future that checks the result of the join set
// future that checks the result of the join set, and propagates panic if seen
let check = async move {
while let Some(result) = join_set.join_next().await {
match result {
Expand Down Expand Up @@ -183,7 +192,10 @@ impl RecordBatchReceiverStreamBuilder {
// unwrap Option / only return the error
.filter_map(|item| async move { item });

let inner = ReceiverStream::new(rx).chain(check_stream).boxed();
// Merge the streams together (but futures::stream::StreamExt
// is already in scope, so call it explicitly)
let inner =
tokio_stream::StreamExt::merge(ReceiverStream::new(rx), check_stream).boxed();

Box::pin(RecordBatchReceiverStream { schema, inner })
}
Expand Down

0 comments on commit b1a817c

Please sign in to comment.