Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix panic propagation in CoalescePartitions, consolidates panic propagation into RecordBatchReceiverStream #6507

Merged
merged 18 commits into from
Jun 6, 2023
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 8 additions & 24 deletions datafusion/core/src/physical_plan/analyze.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,9 @@ use crate::{
};
use arrow::{array::StringBuilder, datatypes::SchemaRef, record_batch::RecordBatch};
use futures::StreamExt;
use tokio::task::JoinSet;

use super::expressions::PhysicalSortExpr;
use super::stream::RecordBatchStreamAdapter;
use super::stream::{RecordBatchReceiverStream, RecordBatchStreamAdapter};
use super::{Distribution, SendableRecordBatchStream};
use datafusion_execution::TaskContext;

Expand Down Expand Up @@ -121,23 +120,15 @@ impl ExecutionPlan for AnalyzeExec {
// Gather futures that will run each input partition in
// parallel (on a separate tokio task) using a JoinSet to
// cancel outstanding futures on drop
let mut set = JoinSet::new();
let num_input_partitions = self.input.output_partitioning().partition_count();
let mut builder =
RecordBatchReceiverStream::builder(self.schema(), num_input_partitions);

for input_partition in 0..num_input_partitions {
let input_stream = self.input.execute(input_partition, context.clone());

set.spawn(async move {
let mut total_rows = 0;
let mut input_stream = input_stream?;
while let Some(batch) = input_stream.next().await {
let batch = batch?;
total_rows += batch.num_rows();
}
Ok(total_rows) as Result<usize>
});
builder.run_input(self.input.clone(), input_partition, context.clone());
}

// Create future that computes thefinal output
let start = Instant::now();
let captured_input = self.input.clone();
let captured_schema = self.schema.clone();
Expand All @@ -146,18 +137,11 @@ impl ExecutionPlan for AnalyzeExec {
// future that gathers the results from all the tasks in the
// JoinSet that computes the overall row count and final
// record batch
let mut input_stream = builder.build();
let output = async move {
let mut total_rows = 0;
while let Some(res) = set.join_next().await {
// translate join errors (aka task panic's) into ExecutionErrors
match res {
Ok(row_count) => total_rows += row_count?,
Err(e) => {
return Err(DataFusionError::Execution(format!(
"Join error in AnalyzeExec: {e}"
)))
}
}
while let Some(batch) = input_stream.next().await.transpose()? {
total_rows += batch.num_rows();
}

let duration = Instant::now() - start;
Expand Down
75 changes: 25 additions & 50 deletions datafusion/core/src/physical_plan/coalesce_partitions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,25 +20,19 @@

use std::any::Any;
use std::sync::Arc;
use std::task::Poll;

use futures::Stream;
use tokio::sync::mpsc;

use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;

use super::common::AbortOnDropMany;
use super::expressions::PhysicalSortExpr;
use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use super::{RecordBatchStream, Statistics};
use super::stream::{ObservedStream, RecordBatchReceiverStream};
use super::Statistics;
use crate::error::{DataFusionError, Result};
use crate::physical_plan::{
DisplayFormatType, EquivalenceProperties, ExecutionPlan, Partitioning,
};

use super::SendableRecordBatchStream;
use crate::physical_plan::common::spawn_execution;
use datafusion_execution::TaskContext;

/// Merge execution plan executes partitions in parallel and combines them into a single
Expand Down Expand Up @@ -137,27 +131,17 @@ impl ExecutionPlan for CoalescePartitionsExec {
// use a stream that allows each sender to put in at
// least one result in an attempt to maximize
// parallelism.
let (sender, receiver) =
mpsc::channel::<Result<RecordBatch>>(input_partitions);
let mut builder =
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am quite pleased that this is now all encapsulated into RecordBatchReceiverStream

RecordBatchReceiverStream::builder(self.schema(), input_partitions);

// spawn independent tasks whose resulting streams (of batches)
// are sent to the channel for consumption.
let mut join_handles = Vec::with_capacity(input_partitions);
for part_i in 0..input_partitions {
join_handles.push(spawn_execution(
self.input.clone(),
sender.clone(),
part_i,
context.clone(),
));
builder.run_input(self.input.clone(), part_i, context.clone());
}

Ok(Box::pin(MergeStream {
input: receiver,
schema: self.schema(),
baseline_metrics,
drop_helper: AbortOnDropMany(join_handles),
}))
let stream = builder.build();
Ok(Box::pin(ObservedStream::new(stream, baseline_metrics)))
}
}
}
Expand All @@ -183,32 +167,6 @@ impl ExecutionPlan for CoalescePartitionsExec {
}
}

struct MergeStream {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I basically taught RecordBatchReceiverStream how to propagate panics and then updated CoalescePartitionsExec to use it

schema: SchemaRef,
input: mpsc::Receiver<Result<RecordBatch>>,
baseline_metrics: BaselineMetrics,
#[allow(unused)]
drop_helper: AbortOnDropMany<()>,
}

impl Stream for MergeStream {
type Item = Result<RecordBatch>;

fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Option<Self::Item>> {
let poll = self.input.poll_recv(cx);
self.baseline_metrics.record_poll(poll)
}
}

impl RecordBatchStream for MergeStream {
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
}

#[cfg(test)]
mod tests {

Expand All @@ -218,7 +176,9 @@ mod tests {
use super::*;
use crate::physical_plan::{collect, common};
use crate::prelude::SessionContext;
use crate::test::exec::{assert_strong_count_converges_to_zero, BlockingExec};
use crate::test::exec::{
assert_strong_count_converges_to_zero, BlockingExec, PanicExec,
};
use crate::test::{self, assert_is_pending};

#[tokio::test]
Expand Down Expand Up @@ -270,4 +230,19 @@ mod tests {

Ok(())
}

#[tokio::test]
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the new test from @nvartolomei

#[should_panic(expected = "PanickingStream did panic")]
async fn test_panic() {
let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
let schema =
Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, true)]));

let panicking_exec = Arc::new(PanicExec::new(Arc::clone(&schema), 2));
let coalesce_partitions_exec =
Arc::new(CoalescePartitionsExec::new(panicking_exec));

collect(coalesce_partitions_exec, task_ctx).await.unwrap();
}
}
56 changes: 9 additions & 47 deletions datafusion/core/src/physical_plan/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,20 @@ use super::SendableRecordBatchStream;
use crate::error::{DataFusionError, Result};
use crate::execution::memory_pool::MemoryReservation;
use crate::physical_plan::stream::RecordBatchReceiverStream;
use crate::physical_plan::{displayable, ColumnStatistics, ExecutionPlan, Statistics};
use crate::physical_plan::{ColumnStatistics, ExecutionPlan, Statistics};
use arrow::datatypes::Schema;
use arrow::ipc::writer::{FileWriter, IpcWriteOptions};
use arrow::record_batch::RecordBatch;
use datafusion_execution::TaskContext;
use datafusion_physical_expr::expressions::{BinaryExpr, Column};
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr};
use futures::{Future, StreamExt, TryStreamExt};
use log::debug;
use parking_lot::Mutex;
use pin_project_lite::pin_project;
use std::fs;
use std::fs::{metadata, File};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::task::{Context, Poll};
use tokio::sync::mpsc;
use tokio::task::JoinHandle;

/// [`MemoryReservation`] used across query execution streams
Expand Down Expand Up @@ -96,65 +93,30 @@ fn build_file_list_recurse(
Ok(())
}

/// Spawns a task to the tokio threadpool and writes its outputs to the provided mpsc sender
pub(crate) fn spawn_execution(
input: Arc<dyn ExecutionPlan>,
output: mpsc::Sender<Result<RecordBatch>>,
partition: usize,
context: Arc<TaskContext>,
) -> JoinHandle<()> {
tokio::spawn(async move {
let mut stream = match input.execute(partition, context) {
Err(e) => {
// If send fails, plan being torn down,
// there is no place to send the error.
output.send(Err(e)).await.ok();
debug!(
"Stopping execution: error executing input: {}",
displayable(input.as_ref()).one_line()
);
return;
}
Ok(stream) => stream,
};

while let Some(item) = stream.next().await {
// If send fails, plan being torn down,
// there is no place to send the error.
if output.send(item).await.is_err() {
debug!(
"Stopping execution: output is gone, plan cancelling: {}",
displayable(input.as_ref()).one_line()
);
return;
}
}
})
}

/// If running in a tokio context spawns the execution of `stream` to a separate task
/// allowing it to execute in parallel with an intermediate buffer of size `buffer`
pub(crate) fn spawn_buffered(
mut input: SendableRecordBatchStream,
buffer: usize,
) -> SendableRecordBatchStream {
// Use tokio only if running from a tokio context (#2201)
let handle = match tokio::runtime::Handle::try_current() {
Ok(handle) => handle,
Err(_) => return input,
if tokio::runtime::Handle::try_current().is_err() {
return input;
};

let schema = input.schema();
let (sender, receiver) = mpsc::channel(buffer);
let join = handle.spawn(async move {
let mut builder = RecordBatchReceiverStream::builder(input.schema(), buffer);

let sender = builder.tx();

builder.spawn(async move {
while let Some(item) = input.next().await {
if sender.send(item).await.is_err() {
return;
}
}
});

RecordBatchReceiverStream::create(&schema, receiver, join)
builder.build()
}

/// Computes the statistics for an in-memory RecordBatch
Expand Down
16 changes: 7 additions & 9 deletions datafusion/core/src/physical_plan/sorts/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ use std::io::BufReader;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tempfile::NamedTempFile;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::mpsc::Sender;
use tokio::task;

struct ExternalSorterMetrics {
Expand Down Expand Up @@ -373,18 +373,16 @@ fn read_spill_as_stream(
path: NamedTempFile,
schema: SchemaRef,
) -> Result<SendableRecordBatchStream> {
let (sender, receiver): (Sender<Result<RecordBatch>>, Receiver<Result<RecordBatch>>) =
tokio::sync::mpsc::channel(2);
let join_handle = task::spawn_blocking(move || {
let mut builder = RecordBatchReceiverStream::builder(schema, 2);
let sender = builder.tx();

builder.spawn_blocking(move || {
if let Err(e) = read_spill(sender, path.path()) {
error!("Failure while reading spill file: {:?}. Error: {}", path, e);
}
});
Ok(RecordBatchReceiverStream::create(
&schema,
receiver,
join_handle,
))

Ok(builder.build())
}

fn write_sorted(
Expand Down
13 changes: 6 additions & 7 deletions datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -792,21 +792,20 @@ mod tests {
let mut streams = Vec::with_capacity(partition_count);

for partition in 0..partition_count {
let (sender, receiver) = tokio::sync::mpsc::channel(1);
let mut builder = RecordBatchReceiverStream::builder(schema.clone(), 1);

let sender = builder.tx();

let mut stream = batches.execute(partition, task_ctx.clone()).unwrap();
let join_handle = tokio::spawn(async move {
builder.spawn(async move {
while let Some(batch) = stream.next().await {
sender.send(batch).await.unwrap();
// This causes the MergeStream to wait for more input
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
}
});

streams.push(RecordBatchReceiverStream::create(
&schema,
receiver,
join_handle,
));
streams.push(builder.build());
}

let metrics = ExecutionPlanMetricsSet::new();
Expand Down
Loading