Skip to content

Commit

Permalink
Consolidate panic propagation into RecordBatchReceiverStream
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed May 31, 2023
1 parent 742597a commit fe8b82d
Show file tree
Hide file tree
Showing 8 changed files with 241 additions and 235 deletions.
33 changes: 9 additions & 24 deletions datafusion/core/src/physical_plan/analyze.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,9 @@ use crate::{
};
use arrow::{array::StringBuilder, datatypes::SchemaRef, record_batch::RecordBatch};
use futures::StreamExt;
use tokio::task::JoinSet;

use super::expressions::PhysicalSortExpr;
use super::stream::RecordBatchStreamAdapter;
use super::stream::{RecordBatchReceiverStream, RecordBatchStreamAdapter};
use super::{Distribution, SendableRecordBatchStream};
use crate::execution::context::TaskContext;

Expand Down Expand Up @@ -121,23 +120,15 @@ impl ExecutionPlan for AnalyzeExec {
// Gather futures that will run each input partition in
// parallel (on a separate tokio task) using a JoinSet to
// cancel outstanding futures on drop
let mut set = JoinSet::new();
let num_input_partitions = self.input.output_partitioning().partition_count();
let mut builder =
RecordBatchReceiverStream::builder(self.schema(), num_input_partitions);

for input_partition in 0..num_input_partitions {
let input_stream = self.input.execute(input_partition, context.clone());

set.spawn(async move {
let mut total_rows = 0;
let mut input_stream = input_stream?;
while let Some(batch) = input_stream.next().await {
let batch = batch?;
total_rows += batch.num_rows();
}
Ok(total_rows) as Result<usize>
});
builder.run_input(self.input.clone(), input_partition, context.clone());
}

// Create future that computes thefinal output
let start = Instant::now();
let captured_input = self.input.clone();
let captured_schema = self.schema.clone();
Expand All @@ -146,18 +137,12 @@ impl ExecutionPlan for AnalyzeExec {
// future that gathers the results from all the tasks in the
// JoinSet that computes the overall row count and final
// record batch
let mut input_stream = builder.build();
let output = async move {
let mut total_rows = 0;
while let Some(res) = set.join_next().await {
// translate join errors (aka task panic's) into ExecutionErrors
match res {
Ok(row_count) => total_rows += row_count?,
Err(e) => {
return Err(DataFusionError::Execution(format!(
"Join error in AnalyzeExec: {e}"
)))
}
}
while let Some(batch) = input_stream.next().await {
let batch = batch?;
total_rows += batch.num_rows();
}

let duration = Instant::now() - start;
Expand Down
79 changes: 7 additions & 72 deletions datafusion/core/src/physical_plan/coalesce_partitions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,28 +19,21 @@
//! into a single partition
use std::any::Any;
use std::panic;
use std::sync::Arc;
use std::task::Poll;

use futures::{Future, Stream};
use tokio::sync::mpsc;

use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use tokio::task::JoinSet;

use super::expressions::PhysicalSortExpr;
use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use super::{RecordBatchStream, Statistics};
use super::stream::{ObservedStream, RecordBatchReceiverStream};
use super::Statistics;
use crate::error::{DataFusionError, Result};
use crate::physical_plan::{
DisplayFormatType, EquivalenceProperties, ExecutionPlan, Partitioning,
};

use super::SendableRecordBatchStream;
use crate::execution::context::TaskContext;
use crate::physical_plan::common::spawn_execution;

/// Merge execution plan executes partitions in parallel and combines them into a single
/// partition. No guarantees are made about the order of the resulting partition.
Expand Down Expand Up @@ -138,28 +131,17 @@ impl ExecutionPlan for CoalescePartitionsExec {
// use a stream that allows each sender to put in at
// least one result in an attempt to maximize
// parallelism.
let (sender, receiver) =
mpsc::channel::<Result<RecordBatch>>(input_partitions);
let mut builder =
RecordBatchReceiverStream::builder(self.schema(), input_partitions);

// spawn independent tasks whose resulting streams (of batches)
// are sent to the channel for consumption.
let mut tasks = JoinSet::new();
for part_i in 0..input_partitions {
spawn_execution(
&mut tasks,
self.input.clone(),
sender.clone(),
part_i,
context.clone(),
);
builder.run_input(self.input.clone(), part_i, context.clone());
}

Ok(Box::pin(MergeStream {
input: receiver,
schema: self.schema(),
baseline_metrics,
tasks,
}))
let stream = builder.build();
return Ok(Box::pin(ObservedStream::new(stream, baseline_metrics)));
}
}
}
Expand All @@ -185,53 +167,6 @@ impl ExecutionPlan for CoalescePartitionsExec {
}
}

struct MergeStream {
schema: SchemaRef,
input: mpsc::Receiver<Result<RecordBatch>>,
baseline_metrics: BaselineMetrics,
tasks: JoinSet<()>,
}

impl Stream for MergeStream {
type Item = Result<RecordBatch>;

fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Option<Self::Item>> {
let poll = self.input.poll_recv(cx);

// If the input stream is done, wait for all tasks to finish and return
// the failure if any.
if let Poll::Ready(None) = poll {
let fut = self.tasks.join_next();
tokio::pin!(fut);

match fut.poll(cx) {
Poll::Ready(task_poll) => {
if let Some(Err(e)) = task_poll {
if e.is_panic() {
panic::resume_unwind(e.into_panic());
}
return Poll::Ready(Some(Err(DataFusionError::Execution(
format!("{e:?}"),
))));
}
}
Poll::Pending => {}
}
}

self.baseline_metrics.record_poll(poll)
}
}

impl RecordBatchStream for MergeStream {
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
}

#[cfg(test)]
mod tests {

Expand Down
59 changes: 10 additions & 49 deletions datafusion/core/src/physical_plan/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,26 +19,23 @@
use super::SendableRecordBatchStream;
use crate::error::{DataFusionError, Result};
use crate::execution::context::TaskContext;
use crate::execution::memory_pool::MemoryReservation;
use crate::physical_plan::stream::RecordBatchReceiverStream;
use crate::physical_plan::{displayable, ColumnStatistics, ExecutionPlan, Statistics};
use crate::physical_plan::{ColumnStatistics, ExecutionPlan, Statistics};
use arrow::datatypes::Schema;
use arrow::ipc::writer::{FileWriter, IpcWriteOptions};
use arrow::record_batch::RecordBatch;
use datafusion_physical_expr::expressions::{BinaryExpr, Column};
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr};
use futures::{Future, StreamExt, TryStreamExt};
use log::debug;
use parking_lot::Mutex;
use pin_project_lite::pin_project;
use std::fs;
use std::fs::{metadata, File};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::task::{Context, Poll};
use tokio::sync::mpsc;
use tokio::task::{JoinHandle, JoinSet};
use tokio::task::JoinHandle;

/// [`MemoryReservation`] used across query execution streams
pub(crate) type SharedMemoryReservation = Arc<Mutex<MemoryReservation>>;
Expand Down Expand Up @@ -96,66 +93,30 @@ fn build_file_list_recurse(
Ok(())
}

/// Spawns a task to the tokio threadpool and writes its outputs to the provided mpsc sender
pub(crate) fn spawn_execution(
join_set: &mut JoinSet<()>,
input: Arc<dyn ExecutionPlan>,
output: mpsc::Sender<Result<RecordBatch>>,
partition: usize,
context: Arc<TaskContext>,
) {
join_set.spawn(async move {
let mut stream = match input.execute(partition, context) {
Err(e) => {
// If send fails, plan being torn down,
// there is no place to send the error.
output.send(Err(e)).await.ok();
debug!(
"Stopping execution: error executing input: {}",
displayable(input.as_ref()).one_line()
);
return;
}
Ok(stream) => stream,
};

while let Some(item) = stream.next().await {
// If send fails, plan being torn down,
// there is no place to send the error.
if output.send(item).await.is_err() {
debug!(
"Stopping execution: output is gone, plan cancelling: {}",
displayable(input.as_ref()).one_line()
);
return;
}
}
});
}

/// If running in a tokio context spawns the execution of `stream` to a separate task
/// allowing it to execute in parallel with an intermediate buffer of size `buffer`
pub(crate) fn spawn_buffered(
mut input: SendableRecordBatchStream,
buffer: usize,
) -> SendableRecordBatchStream {
// Use tokio only if running from a tokio context (#2201)
let handle = match tokio::runtime::Handle::try_current() {
Ok(handle) => handle,
Err(_) => return input,
if let Err(_) = tokio::runtime::Handle::try_current() {
return input;
};

let schema = input.schema();
let (sender, receiver) = mpsc::channel(buffer);
let join = handle.spawn(async move {
let mut builder = RecordBatchReceiverStream::builder(input.schema(), buffer);

let sender = builder.tx();

builder.spawn(async move {
while let Some(item) = input.next().await {
if sender.send(item).await.is_err() {
return;
}
}
});

RecordBatchReceiverStream::create(&schema, receiver, join)
builder.build()
}

/// Computes the statistics for an in-memory RecordBatch
Expand Down
16 changes: 7 additions & 9 deletions datafusion/core/src/physical_plan/sorts/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ use std::io::BufReader;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tempfile::NamedTempFile;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::mpsc::Sender;
use tokio::task;

struct ExternalSorterMetrics {
Expand Down Expand Up @@ -373,18 +373,16 @@ fn read_spill_as_stream(
path: NamedTempFile,
schema: SchemaRef,
) -> Result<SendableRecordBatchStream> {
let (sender, receiver): (Sender<Result<RecordBatch>>, Receiver<Result<RecordBatch>>) =
tokio::sync::mpsc::channel(2);
let join_handle = task::spawn_blocking(move || {
let mut builder = RecordBatchReceiverStream::builder(schema, 2);
let sender = builder.tx();

builder.spawn_blocking(move || {
if let Err(e) = read_spill(sender, path.path()) {
error!("Failure while reading spill file: {:?}. Error: {}", path, e);
}
});
Ok(RecordBatchReceiverStream::create(
&schema,
receiver,
join_handle,
))

Ok(builder.build())
}

fn write_sorted(
Expand Down
13 changes: 6 additions & 7 deletions datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -792,21 +792,20 @@ mod tests {
let mut streams = Vec::with_capacity(partition_count);

for partition in 0..partition_count {
let (sender, receiver) = tokio::sync::mpsc::channel(1);
let mut builder = RecordBatchReceiverStream::builder(schema.clone(), 1);

let sender = builder.tx();

let mut stream = batches.execute(partition, task_ctx.clone()).unwrap();
let join_handle = tokio::spawn(async move {
builder.spawn(async move {
while let Some(batch) = stream.next().await {
sender.send(batch).await.unwrap();
// This causes the MergeStream to wait for more input
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
}
});

streams.push(RecordBatchReceiverStream::create(
&schema,
receiver,
join_handle,
));
streams.push(builder.build());
}

let metrics = ExecutionPlanMetricsSet::new();
Expand Down
Loading

0 comments on commit fe8b82d

Please sign in to comment.