Skip to content

Commit

Permalink
consolidate
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed May 31, 2023
1 parent 0a96357 commit d887201
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 82 deletions.
33 changes: 9 additions & 24 deletions datafusion/core/src/physical_plan/analyze.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,9 @@ use crate::{
};
use arrow::{array::StringBuilder, datatypes::SchemaRef, record_batch::RecordBatch};
use futures::StreamExt;
use tokio::task::JoinSet;

use super::expressions::PhysicalSortExpr;
use super::stream::RecordBatchStreamAdapter;
use super::stream::{RecordBatchReceiverStreamBuilder, RecordBatchStreamAdapter};
use super::{Distribution, SendableRecordBatchStream};
use crate::execution::context::TaskContext;

Expand Down Expand Up @@ -121,23 +120,15 @@ impl ExecutionPlan for AnalyzeExec {
// Gather futures that will run each input partition in
// parallel (on a separate tokio task) using a JoinSet to
// cancel outstanding futures on drop
let mut set = JoinSet::new();
let num_input_partitions = self.input.output_partitioning().partition_count();
let mut builder =
RecordBatchReceiverStreamBuilder::new(self.schema(), num_input_partitions);

for input_partition in 0..num_input_partitions {
let input_stream = self.input.execute(input_partition, context.clone());

set.spawn(async move {
let mut total_rows = 0;
let mut input_stream = input_stream?;
while let Some(batch) = input_stream.next().await {
let batch = batch?;
total_rows += batch.num_rows();
}
Ok(total_rows) as Result<usize>
});
builder.run_input(self.input.clone(), input_partition, context.clone());
}

// Create future that computes thefinal output
let start = Instant::now();
let captured_input = self.input.clone();
let captured_schema = self.schema.clone();
Expand All @@ -146,18 +137,12 @@ impl ExecutionPlan for AnalyzeExec {
// future that gathers the results from all the tasks in the
// JoinSet that computes the overall row count and final
// record batch
let mut input_stream = builder.build();
let output = async move {
let mut total_rows = 0;
while let Some(res) = set.join_next().await {
// translate join errors (aka task panic's) into ExecutionErrors
match res {
Ok(row_count) => total_rows += row_count?,
Err(e) => {
return Err(DataFusionError::Execution(format!(
"Join error in AnalyzeExec: {e}"
)))
}
}
while let Some(batch) = input_stream.next().await {
let batch = batch?;
total_rows += batch.num_rows();
}

let duration = Instant::now() - start;
Expand Down
10 changes: 1 addition & 9 deletions datafusion/core/src/physical_plan/coalesce_partitions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ use crate::physical_plan::{

use super::SendableRecordBatchStream;
use crate::execution::context::TaskContext;
use crate::physical_plan::common::spawn_execution;

/// Merge execution plan executes partitions in parallel and combines them into a single
/// partition. No guarantees are made about the order of the resulting partition.
Expand Down Expand Up @@ -140,14 +139,7 @@ impl ExecutionPlan for CoalescePartitionsExec {
// spawn independent tasks whose resulting streams (of batches)
// are sent to the channel for consumption.
for part_i in 0..input_partitions {
let sender = builder.tx();
spawn_execution(
builder.join_set_mut(),
self.input.clone(),
sender,
part_i,
context.clone(),
);
builder.run_input(self.input.clone(), part_i, context.clone());
}

Ok(builder.build())
Expand Down
43 changes: 2 additions & 41 deletions datafusion/core/src/physical_plan/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,15 @@
use super::SendableRecordBatchStream;
use crate::error::{DataFusionError, Result};
use crate::execution::context::TaskContext;
use crate::execution::memory_pool::MemoryReservation;
use crate::physical_plan::stream::RecordBatchReceiverStream;
use crate::physical_plan::{displayable, ColumnStatistics, ExecutionPlan, Statistics};
use crate::physical_plan::{ColumnStatistics, ExecutionPlan, Statistics};
use arrow::datatypes::Schema;
use arrow::ipc::writer::{FileWriter, IpcWriteOptions};
use arrow::record_batch::RecordBatch;
use datafusion_physical_expr::expressions::{BinaryExpr, Column};
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr};
use futures::{Future, StreamExt, TryStreamExt};
use log::debug;
use parking_lot::Mutex;
use pin_project_lite::pin_project;
use std::fs;
Expand All @@ -38,7 +36,7 @@ use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::task::{Context, Poll};
use tokio::sync::mpsc;
use tokio::task::{JoinHandle, JoinSet};
use tokio::task::JoinHandle;

/// [`MemoryReservation`] used across query execution streams
pub(crate) type SharedMemoryReservation = Arc<Mutex<MemoryReservation>>;
Expand Down Expand Up @@ -96,43 +94,6 @@ fn build_file_list_recurse(
Ok(())
}

/// Spawns a task to the tokio threadpool and writes its outputs to the provided mpsc sender
pub(crate) fn spawn_execution(
join_set: &mut JoinSet<()>,
input: Arc<dyn ExecutionPlan>,
output: mpsc::Sender<Result<RecordBatch>>,
partition: usize,
context: Arc<TaskContext>,
) {
join_set.spawn(async move {
let mut stream = match input.execute(partition, context) {
Err(e) => {
// If send fails, plan being torn down,
// there is no place to send the error.
output.send(Err(e)).await.ok();
debug!(
"Stopping execution: error executing input: {}",
displayable(input.as_ref()).one_line()
);
return;
}
Ok(stream) => stream,
};

while let Some(item) = stream.next().await {
// If send fails, plan being torn down,
// there is no place to send the error.
if output.send(item).await.is_err() {
debug!(
"Stopping execution: output is gone, plan cancelling: {}",
displayable(input.as_ref()).one_line()
);
return;
}
}
});
}

/// If running in a tokio context spawns the execution of `stream` to a separate task
/// allowing it to execute in parallel with an intermediate buffer of size `buffer`
pub(crate) fn spawn_buffered(
Expand Down
56 changes: 48 additions & 8 deletions datafusion/core/src/physical_plan/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,26 @@

//! Stream wrappers for physical operators
use std::sync::Arc;

use crate::error::Result;
use crate::physical_plan::displayable;
use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
use datafusion_common::DataFusionError;
use datafusion_execution::TaskContext;
use futures::stream::BoxStream;
use futures::{Future, Stream, StreamExt};
use log::debug;
use pin_project_lite::pin_project;
use tokio::task::{JoinHandle, JoinSet};
use tokio_stream::wrappers::ReceiverStream;

use super::common::AbortOnDropSingle;
use super::{RecordBatchStream, SendableRecordBatchStream};
use super::{ExecutionPlan, RecordBatchStream, SendableRecordBatchStream};

/// Builder for [`RecordBatchReceiverStream`]
pub struct RecordBatchReceiverStreamBuilder {
/// Builder for [`RecordBatchReceiverStream`] that propagates errors
/// and panic's correctly.
pub(crate) struct RecordBatchReceiverStreamBuilder {
tx: tokio::sync::mpsc::Sender<Result<RecordBatch>>,
rx: tokio::sync::mpsc::Receiver<Result<RecordBatch>>,
schema: SchemaRef,
Expand All @@ -55,11 +61,6 @@ impl RecordBatchReceiverStreamBuilder {
self.tx.clone()
}

/// Get a handle to the `JoinSet` on which tasks are launched
pub fn join_set_mut(&mut self) -> &mut JoinSet<()> {
&mut self.join_set
}

/// Spawn task that will be aborted if this builder (or the stream
/// built from it) are dropped
///
Expand All @@ -73,6 +74,45 @@ impl RecordBatchReceiverStreamBuilder {
self.join_set.spawn(task);
}

/// runs the input_partition of the `input` ExecutionPlan on the
/// tokio threadpool and writes its outputs to this stream
pub(crate) fn run_input(
&mut self,
input: Arc<dyn ExecutionPlan>,
partition: usize,
context: Arc<TaskContext>,
) {
let output = self.tx();

self.spawn(async move {
let mut stream = match input.execute(partition, context) {
Err(e) => {
// If send fails, plan being torn down,
// there is no place to send the error.
output.send(Err(e)).await.ok();
debug!(
"Stopping execution: error executing input: {}",
displayable(input.as_ref()).one_line()
);
return;
}
Ok(stream) => stream,
};

while let Some(item) = stream.next().await {
// If send fails, plan being torn down,
// there is no place to send the error.
if output.send(item).await.is_err() {
debug!(
"Stopping execution: output is gone, plan cancelling: {}",
displayable(input.as_ref()).one_line()
);
return;
}
}
});
}

/// Create a stream of all `RecordBatch`es written to `tx`
pub fn build(self) -> SendableRecordBatchStream {
let Self {
Expand Down

0 comments on commit d887201

Please sign in to comment.