Skip to content

Commit

Permalink
Minor: remove tokio_stream dependency
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Jun 6, 2023
1 parent 39ee59a commit 9f1f104
Show file tree
Hide file tree
Showing 5 changed files with 16 additions and 11 deletions.
1 change: 0 additions & 1 deletion datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion datafusion/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ smallvec = { version = "1.6", features = ["union"] }
sqlparser = { version = "0.34", features = ["visitor"] }
tempfile = "3"
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync", "fs", "parking_lot"] }
tokio-stream = "0.1"
tokio-util = { version = "0.7.4", features = ["io"] }
url = "2.2"
uuid = { version = "1.0", features = ["v4"] }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,8 +231,7 @@ mod tests {
use arrow::compute::SortOptions;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use futures::FutureExt;
use tokio_stream::StreamExt;
use futures::{FutureExt, StreamExt};

use crate::arrow::array::{Int32Array, StringArray, TimestampNanosecondArray};
use crate::from_slice::FromSlice;
Expand Down
20 changes: 14 additions & 6 deletions datafusion/core/src/physical_plan/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ use log::debug;
use pin_project_lite::pin_project;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::task::JoinSet;
use tokio_stream::wrappers::ReceiverStream;

use super::metrics::BaselineMetrics;
use super::{ExecutionPlan, RecordBatchStream, SendableRecordBatchStream};
Expand Down Expand Up @@ -192,18 +191,27 @@ impl RecordBatchReceiverStreamBuilder {
// unwrap Option / only return the error
.filter_map(|item| async move { item });

// Convert the receiver into a stream
let rx_stream = futures::stream::unfold(rx, |mut rx| async move {
let next_item = rx.recv().await;
next_item.map(|next_item| (next_item, rx))
});

// Merge the streams together so whichever is ready first
// produces the batch
let inner =
futures::stream::select(ReceiverStream::new(rx), check_stream).boxed();
let inner = futures::stream::select(rx_stream, check_stream).boxed();

Box::pin(RecordBatchReceiverStream { schema, inner })
}
}

/// Adapter for a tokio [`ReceiverStream`] that implements the
/// [`SendableRecordBatchStream`] interface and propagates panics and
/// errors. Use [`Self::builder`] to construct one.
/// A [`SendableRecordBatchStream`] that combines [`RecordBatch`]es from multiple inputs,
/// on new tokio Tasks, increasing the potential parallelism.
///
/// This structure also handles propagating panics and cancelling the
/// underlying tasks correctly.
///
/// Use [`Self::builder`] to construct one.
pub struct RecordBatchReceiverStream {
schema: SchemaRef,
inner: BoxStream<'static, Result<RecordBatch>>,
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/tests/parquet/page_pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ use datafusion_common::{ScalarValue, Statistics, ToDFSchema};
use datafusion_expr::{col, lit, Expr};
use datafusion_physical_expr::create_physical_expr;
use datafusion_physical_expr::execution_props::ExecutionProps;
use futures::StreamExt;
use object_store::path::Path;
use object_store::ObjectMeta;
use tokio_stream::StreamExt;

async fn get_parquet_exec(state: &SessionState, filter: Expr) -> ParquetExec {
let object_store_url = ObjectStoreUrl::local_filesystem();
Expand Down

0 comments on commit 9f1f104

Please sign in to comment.