From a914d06851e5d3f4f0d0d8f7dd06c0ddef05ff9e Mon Sep 17 00:00:00 2001
From: Stephen Carman
Date: Mon, 19 Feb 2024 16:10:03 -0500
Subject: [PATCH 1/6] feat: Logical Node for find files

---
 .../delta_datafusion/find_files/logical.rs  |  47 +++++
 .../src/delta_datafusion/find_files/mod.rs  | 160 ++++++++++++++++++
 .../delta_datafusion/find_files/physical.rs | 124 ++++++++++++++
 crates/core/src/delta_datafusion/mod.rs     |   5 +-
 crates/core/src/table/state_arrow.rs        |   8 +-
 5 files changed, 339 insertions(+), 5 deletions(-)
 create mode 100644 crates/core/src/delta_datafusion/find_files/logical.rs
 create mode 100644 crates/core/src/delta_datafusion/find_files/mod.rs
 create mode 100644 crates/core/src/delta_datafusion/find_files/physical.rs

diff --git a/crates/core/src/delta_datafusion/find_files/logical.rs b/crates/core/src/delta_datafusion/find_files/logical.rs
new file mode 100644
index 0000000000..7b6c6d8b83
--- /dev/null
+++ b/crates/core/src/delta_datafusion/find_files/logical.rs
@@ -0,0 +1,47 @@
+use std::collections::HashSet;
+
+use datafusion_common::DFSchemaRef;
+use datafusion_expr::{Expr, LogicalPlan, UserDefinedLogicalNodeCore};
+
+#[derive(Debug, Hash, Eq, PartialEq, Clone)]
+pub struct FindFilesNode {
+    pub id: String,
+    pub input: LogicalPlan,
+    pub predicates: Vec<Expr>,
+    pub files: Vec<String>,
+    pub schema: DFSchemaRef,
+}
+
+impl UserDefinedLogicalNodeCore for FindFilesNode {
+    fn name(&self) -> &str {
+        "FindFiles"
+    }
+
+    fn inputs(&self) -> Vec<&LogicalPlan> {
+        vec![]
+    }
+
+    fn schema(&self) -> &DFSchemaRef {
+        &self.schema
+    }
+
+    fn expressions(&self) -> Vec<Expr> {
+        vec![]
+    }
+
+    fn prevent_predicate_push_down_columns(&self) -> HashSet<String> {
+        HashSet::new()
+    }
+
+    fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        write!(
+            f,
+            "FindFiles id={}, predicate={:?}, files={:?}",
+            &self.id, self.predicates, self.files
+        )
+    }
+
+    fn from_template(&self, _exprs: &[Expr], _inputs: &[LogicalPlan]) -> Self {
+        self.clone()
+    }
+}
diff --git a/crates/core/src/delta_datafusion/find_files/mod.rs b/crates/core/src/delta_datafusion/find_files/mod.rs
new file mode 100644
index 0000000000..345616f820
--- /dev/null
+++ b/crates/core/src/delta_datafusion/find_files/mod.rs
@@ -0,0 +1,160 @@
+use std::sync::Arc;
+
+use arrow_array::{RecordBatch, StringArray};
+use arrow_array::cast::AsArray;
+use arrow_schema::{DataType, Field, Schema};
+use arrow_schema::SchemaBuilder;
+use async_trait::async_trait;
+use datafusion::execution::context::{QueryPlanner, SessionState};
+use datafusion::physical_plan::ExecutionPlan;
+use datafusion::physical_planner::{DefaultPhysicalPlanner, ExtensionPlanner, PhysicalPlanner};
+use datafusion::prelude::{ParquetReadOptions, SessionContext};
+use datafusion_common::Result;
+use datafusion_expr::{Expr, LogicalPlan, UserDefinedLogicalNode};
+
+use crate::delta_datafusion::find_files::logical::FindFilesNode;
+use crate::delta_datafusion::find_files::physical::FindFilesExec;
+use crate::delta_datafusion::PATH_COLUMN;
+
+pub mod logical;
+pub mod physical;
+
+#[inline]
+fn only_file_path_schema() -> Arc<Schema> {
+    let mut builder = SchemaBuilder::new();
+    builder.push(Field::new(PATH_COLUMN, DataType::Utf8, false));
+    Arc::new(builder.finish())
+}
+
+struct FindFilesPlannerExtension {}
+
+struct FindFilesPlanner {}
+
+#[async_trait]
+impl ExtensionPlanner for FindFilesPlannerExtension {
+    async fn plan_extension(
+        &self,
+        _planner: &dyn PhysicalPlanner,
+        node: &dyn UserDefinedLogicalNode,
+        _logical_inputs: &[&LogicalPlan],
+        _physical_inputs: &[Arc<dyn ExecutionPlan>],
+        _session_state: &SessionState,
+    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
+        if let Some(node) = node.as_any().downcast_ref::<FindFilesNode>() {
+            dbg!(&node.files, &node.predicates);
+            let schema = Arc::new(Schema::from(node.schema.as_ref()));
+
+            return Ok(Some(Arc::new(FindFilesExec::new(
+                node.files.clone(),
+                node.predicates[0].clone(),
+                schema,
+            )?)));
+        }
+        Ok(None)
+    }
+}
+
+#[async_trait]
+impl QueryPlanner for FindFilesPlanner {
+    async fn create_physical_plan(
+        &self,
+        logical_plan: &LogicalPlan,
+        session_state: &SessionState,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        let planner = Arc::new(Box::new(DefaultPhysicalPlanner::with_extension_planners(
+            vec![Arc::new(FindFilesPlannerExtension {})],
+        )));
+        planner
+            .create_physical_plan(logical_plan, session_state)
+            .await
+    }
+}
+
+async fn scan_memory_table_batch(batch: RecordBatch, predicate: Expr) -> Result<RecordBatch> {
+    let ctx = SessionContext::new();
+    let mut batches = vec![];
+
+    if let Some(column) = batch.column_by_name(PATH_COLUMN) {
+        let mut column_iter = column.as_string::<i32>().into_iter();
+        while let Some(Some(row)) = column_iter.next() {
+            let df = ctx
+                .read_parquet(row, ParquetReadOptions::default())
+                .await?
+                .filter(predicate.to_owned())?;
+            if df.count().await? > 0 {
+                batches.push(row);
+            }
+        }
+    }
+    let str_array = Arc::new(StringArray::from(batches));
+    RecordBatch::try_new(only_file_path_schema(), vec![str_array]).map_err(Into::into)
+}
+
+#[cfg(test)]
+pub mod tests {
+    use std::sync::Arc;
+
+    use arrow_cast::pretty::print_batches;
+    use arrow_schema::{DataType, Field, Fields, Schema, SchemaBuilder};
+    use datafusion::prelude::{DataFrame, SessionContext};
+    use datafusion_common::ToDFSchema;
+    use datafusion_expr::{col, Extension, lit, LogicalPlan, LogicalPlanBuilder};
+
+    use crate::{DeltaOps, DeltaResult};
+    use crate::delta_datafusion::find_files::FindFilesPlanner;
+    use crate::delta_datafusion::find_files::logical::FindFilesNode;
+    use crate::delta_datafusion::PATH_COLUMN;
+    use crate::operations::collect_sendable_stream;
+    use crate::writer::test_utils::{create_bare_table, get_record_batch};
+
+    #[inline]
+    fn find_files_schema(fields: &Fields) -> Arc<Schema> {
+        let mut builder = SchemaBuilder::from(fields);
+        builder.reverse();
+        builder.push(Field::new(PATH_COLUMN, DataType::Utf8, false));
+        builder.reverse();
+        Arc::new(builder.finish())
+    }
+
+    async fn make_table() -> DeltaOps {
+        let batch = get_record_batch(None, false);
+        let write = DeltaOps(create_bare_table())
+            .write(vec![batch.clone()])
+            .await
+            .unwrap();
+        DeltaOps(write)
+    }
+
+    #[tokio::test]
+    pub async fn test_find_files() -> DeltaResult<()> {
+        let ctx = SessionContext::new();
+        let state = ctx
+            .state()
+            .with_query_planner(Arc::new(FindFilesPlanner {}));
+        let table = make_table().await;
+        let files = table.0.get_file_uris()?.collect::<Vec<String>>();
+        let plan = LogicalPlanBuilder::empty(false).build()?;
+
+        let schema =
+            find_files_schema(table.0.snapshot()?.arrow_schema()?.fields()).to_dfschema_ref()?;
+        let find_files_node = LogicalPlan::Extension(Extension {
+            node: Arc::new(FindFilesNode {
+                id: "my_cool_id".to_string(),
+                input: plan,
+                predicates: vec![col("id").eq(lit("A"))],
+                files,
+                schema,
+            }),
+        });
+        let df = DataFrame::new(state.clone(), find_files_node);
+        let p = state
+            .clone()
+            .create_physical_plan(df.logical_plan())
+            .await
+            .unwrap();
+
+        let e = p.execute(0, state.task_ctx())?;
+        let s = collect_sendable_stream(e).await.unwrap();
+        print_batches(&s)?;
+        Ok(())
+    }
+}
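[Illustrative sketch, not part of the patch: how a session picks up a custom
planner like the one defined above. FindFilesPlanner is the type from this
file; the new_with_state constructor is assumed to exist in the DataFusion
version in use.]

    use std::sync::Arc;
    use datafusion::prelude::SessionContext;

    fn session_with_find_files_planner() -> SessionContext {
        let ctx = SessionContext::new();
        // Swap in the custom query planner; Extension nodes that downcast to
        // FindFilesNode are then planned by FindFilesPlannerExtension.
        let state = ctx
            .state()
            .with_query_planner(Arc::new(FindFilesPlanner {}));
        SessionContext::new_with_state(state)
    }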
diff --git a/crates/core/src/delta_datafusion/find_files/physical.rs b/crates/core/src/delta_datafusion/find_files/physical.rs
new file mode 100644
index 0000000000..af2ac7b04f
--- /dev/null
+++ b/crates/core/src/delta_datafusion/find_files/physical.rs
@@ -0,0 +1,124 @@
+use std::any::Any;
+use std::fmt::{Debug, Formatter};
+
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use arrow_array::{RecordBatch, StringArray};
+use arrow_schema::SchemaRef;
+use datafusion::error::Result;
+use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
+use datafusion::physical_plan::memory::MemoryStream;
+use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan};
+use datafusion_expr::Expr;
+use datafusion_physical_expr::{Partitioning, PhysicalSortExpr};
+use futures::stream::BoxStream;
+use futures::{Stream, StreamExt, TryStreamExt};
+
+use crate::delta_datafusion::find_files::{only_file_path_schema, scan_memory_table_batch};
+
+pub struct FindFilesExec {
+    schema: SchemaRef,
+    files: Vec<String>,
+    predicate: Expr,
+}
+
+impl FindFilesExec {
+    pub fn new(files: Vec<String>, predicate: Expr, schema: SchemaRef) -> Result<Self> {
+        Ok(Self {
+            schema,
+            files,
+            predicate,
+        })
+    }
+}
+
+struct FindFilesStream<'a> {
+    mem_stream: BoxStream<'a, Result<RecordBatch>>,
+}
+
+impl<'a> FindFilesStream<'a> {
+    pub fn new(mem_stream: BoxStream<'a, Result<RecordBatch>>) -> Result<Self> {
+        Ok(Self { mem_stream })
+    }
+}
+
+impl<'a> RecordBatchStream for FindFilesStream<'a> {
+    fn schema(&self) -> SchemaRef {
+        only_file_path_schema()
+    }
+}
+
+impl<'a> Stream for FindFilesStream<'a> {
+    type Item = Result<RecordBatch>;
+
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        self.as_mut().mem_stream.poll_next_unpin(cx)
+    }
+}
+
+impl Debug for FindFilesExec {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        write!(
+            f,
+            "FindFilesExec[schema={:?}, files={:?}]",
+            self.schema, self.files
+        )
+    }
+}
+
+impl DisplayAs for FindFilesExec {
+    fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
+        write!(
+            f,
+            "FindFilesExec[schema={:?}, files={:?}]",
+            self.schema, self.files
+        )
+    }
+}
+
+impl ExecutionPlan for FindFilesExec {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+
+    fn output_partitioning(&self) -> Partitioning {
+        Partitioning::UnknownPartitioning(0)
+    }
+
+    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
+        None
+    }
+
+    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+        vec![]
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        _children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        Ok(self)
+    }
+
+    fn execute(
+        &self,
+        _partition: usize,
+        _context: Arc<TaskContext>,
+    ) -> Result<SendableRecordBatchStream> {
+        let array = Arc::new(StringArray::from(self.files.clone()));
+        let record_batch = RecordBatch::try_new(only_file_path_schema(), vec![array])?;
+        let predicate = self.predicate.clone();
+        let mem_stream =
+            MemoryStream::try_new(vec![record_batch.clone()], only_file_path_schema(), None)?
+                .and_then(move |batch| scan_memory_table_batch(batch, predicate.clone()))
+                .boxed();
+
+        Ok(Box::pin(FindFilesStream::new(mem_stream)?))
+    }
+}
diff --git a/crates/core/src/delta_datafusion/mod.rs b/crates/core/src/delta_datafusion/mod.rs
index 6ea60a0bda..3ebfb1373d 100644
--- a/crates/core/src/delta_datafusion/mod.rs
+++ b/crates/core/src/delta_datafusion/mod.rs
@@ -87,6 +87,8 @@ pub mod expr;
 pub mod logical;
 pub mod physical;
 
+mod find_files;
+
 impl From<DeltaTableError> for DataFusionError {
     fn from(err: DeltaTableError) -> Self {
         match err {
@@ -1145,6 +1147,7 @@ impl TreeNodeVisitor for FindFilesExprProperties {
     }
 }
 
+#[derive(Debug, Hash, Eq, PartialEq)]
 /// Representing the result of the [find_files] function.
 pub struct FindFiles {
     /// A list of `Add` objects that match the given predicate
@@ -1198,7 +1201,7 @@ fn join_batches_with_add_actions(
     Ok(files)
 }
 
-/// Determine which files contain a record that statisfies the predicate
+/// Determine which files contain a record that satisfies the predicate
pub(crate) async fn find_files_scan<'a>(
     snapshot: &DeltaTableState,
     log_store: LogStoreRef,
diff --git a/crates/core/src/table/state_arrow.rs b/crates/core/src/table/state_arrow.rs
index 143ab23d1c..5aae2bdb79 100644
--- a/crates/core/src/table/state_arrow.rs
+++ b/crates/core/src/table/state_arrow.rs
@@ -91,7 +91,7 @@ impl DeltaTableState {
                 .fields
                 .iter()
                 .map(|field| Cow::Owned(field.name().clone()))
-                .zip(partition_cols_batch.columns().iter().map(Arc::clone)),
+                .zip(partition_cols_batch.columns().iter().cloned()),
         )
     }
@@ -103,7 +103,7 @@ impl DeltaTableState {
                     .fields
                     .iter()
                     .map(|field| Cow::Owned(field.name().clone()))
-                    .zip(stats.columns().iter().map(Arc::clone)),
+                    .zip(stats.columns().iter().cloned()),
             );
         }
         if files.iter().any(|add| add.deletion_vector.is_some()) {
@@ -114,7 +114,7 @@ impl DeltaTableState {
                     .fields
                     .iter()
                     .map(|field| Cow::Owned(field.name().clone()))
-                    .zip(delvs.columns().iter().map(Arc::clone)),
+                    .zip(delvs.columns().iter().cloned()),
             );
         }
         if files.iter().any(|add| {
@@ -129,7 +129,7 @@ impl DeltaTableState {
                     .fields
                     .iter()
                     .map(|field| Cow::Owned(field.name().clone()))
-                    .zip(tags.columns().iter().map(Arc::clone)),
+                    .zip(tags.columns().iter().cloned()),
             );
         }
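[Illustrative sketch, not part of the patch: the shape of the batch the new
node ultimately produces - a single non-null Utf8 column named by PATH_COLUMN,
which the tests added later in this series show to be "__delta_rs_path". The
file name below is invented for the example.]

    use std::sync::Arc;
    use arrow_array::{RecordBatch, StringArray};
    use arrow_schema::{DataType, Field, Schema};

    fn example_output_batch() -> RecordBatch {
        // One row per file that may contain records matching the predicate.
        let schema = Arc::new(Schema::new(vec![Field::new(
            "__delta_rs_path",
            DataType::Utf8,
            false,
        )]));
        let paths = StringArray::from(vec!["part-00000-example.snappy.parquet"]);
        RecordBatch::try_new(schema, vec![Arc::new(paths)]).unwrap()
    }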
From 640b74458bd2d5d071b4941aa86d893ff7af726a Mon Sep 17 00:00:00 2001
From: Stephen Carman
Date: Fri, 23 Feb 2024 14:34:18 -0500
Subject: [PATCH 2/6] feat: Logical Node for find files

---
 .../delta_datafusion/find_files/logical.rs  | 46 ++++++++++++++++---
 .../src/delta_datafusion/find_files/mod.rs  | 38 ++++++---------
 .../delta_datafusion/find_files/physical.rs | 23 ++--------
 crates/core/src/kernel/snapshot/log_data.rs |  5 +-
 4 files changed, 61 insertions(+), 51 deletions(-)

diff --git a/crates/core/src/delta_datafusion/find_files/logical.rs b/crates/core/src/delta_datafusion/find_files/logical.rs
index 7b6c6d8b83..81aa2b264b 100644
--- a/crates/core/src/delta_datafusion/find_files/logical.rs
+++ b/crates/core/src/delta_datafusion/find_files/logical.rs
@@ -1,15 +1,47 @@
 use std::collections::HashSet;
+use std::path::Path;
 
-use datafusion_common::DFSchemaRef;
+use datafusion_common::{DFSchemaRef, ToDFSchema};
 use datafusion_expr::{Expr, LogicalPlan, UserDefinedLogicalNodeCore};
 
+use crate::delta_datafusion::find_files::only_file_path_schema;
+use crate::kernel::EagerSnapshot;
+
 #[derive(Debug, Hash, Eq, PartialEq, Clone)]
 pub struct FindFilesNode {
-    pub id: String,
-    pub input: LogicalPlan,
-    pub predicates: Vec<Expr>,
-    pub files: Vec<String>,
-    pub schema: DFSchemaRef,
+    id: String,
+    predicate: Expr,
+    files: Vec<String>,
+    schema: DFSchemaRef,
+}
+
+impl FindFilesNode {
+    pub fn new(
+        id: String,
+        eager_snapshot: EagerSnapshot,
+        predicate: Expr,
+    ) -> datafusion_common::Result<Self> {
+        let files: Vec<String> = eager_snapshot
+            .files()
+            .map(|f| f.object_store_path().to_string())
+            .collect();
+
+
+        Ok(Self {
+            id,
+            predicate,
+            files,
+            schema: only_file_path_schema().to_dfschema_ref()?,
+        })
+    }
+
+    pub fn predicate(&self) -> &Expr {
+        &self.predicate
+    }
+
+    pub fn files(&self) -> Vec<String> {
+        self.files.clone()
+    }
 }
 
 impl UserDefinedLogicalNodeCore for FindFilesNode {
@@ -37,7 +69,7 @@ impl UserDefinedLogicalNodeCore for FindFilesNode {
         write!(
             f,
             "FindFiles id={}, predicate={:?}, files={:?}",
-            &self.id, self.predicates, self.files
+            &self.id, self.predicate, self.files
         )
     }
 
diff --git a/crates/core/src/delta_datafusion/find_files/mod.rs b/crates/core/src/delta_datafusion/find_files/mod.rs
index 345616f820..6ad257664d 100644
--- a/crates/core/src/delta_datafusion/find_files/mod.rs
+++ b/crates/core/src/delta_datafusion/find_files/mod.rs
@@ -1,9 +1,9 @@
 use std::sync::Arc;
 
-use arrow_array::{RecordBatch, StringArray};
 use arrow_array::cast::AsArray;
-use arrow_schema::{DataType, Field, Schema};
+use arrow_array::{RecordBatch, StringArray};
 use arrow_schema::SchemaBuilder;
+use arrow_schema::{DataType, Field, Schema};
 use async_trait::async_trait;
 use datafusion::execution::context::{QueryPlanner, SessionState};
 use datafusion::physical_plan::ExecutionPlan;
@@ -40,14 +40,10 @@ impl ExtensionPlanner for FindFilesPlannerExtension {
         _physical_inputs: &[Arc<dyn ExecutionPlan>],
         _session_state: &SessionState,
     ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
-        if let Some(node) = node.as_any().downcast_ref::<FindFilesNode>() {
-            dbg!(&node.files, &node.predicates);
-            let schema = Arc::new(Schema::from(node.schema.as_ref()));
-
+        if let Some(find_files_node) = node.as_any().downcast_ref::<FindFilesNode>() {
             return Ok(Some(Arc::new(FindFilesExec::new(
-                node.files.clone(),
-                node.predicates[0].clone(),
-                schema,
+                find_files_node.files(),
+                find_files_node.predicate().clone(),
             )?)));
         }
         Ok(None)
@@ -97,15 +93,14 @@ pub mod tests {
     use std::sync::Arc;
 
     use arrow_cast::pretty::print_batches;
     use arrow_schema::{DataType, Field, Fields, Schema, SchemaBuilder};
     use datafusion::prelude::{DataFrame, SessionContext};
-    use datafusion_common::ToDFSchema;
-    use datafusion_expr::{col, Extension, lit, LogicalPlan, LogicalPlanBuilder};
+    use datafusion_expr::{col, lit, Extension, LogicalPlan};
 
-    use crate::{DeltaOps, DeltaResult};
-    use crate::delta_datafusion::find_files::FindFilesPlanner;
     use crate::delta_datafusion::find_files::logical::FindFilesNode;
+    use crate::delta_datafusion::find_files::FindFilesPlanner;
     use crate::delta_datafusion::PATH_COLUMN;
     use crate::operations::collect_sendable_stream;
     use crate::writer::test_utils::{create_bare_table, get_record_batch};
+    use crate::{DeltaOps, DeltaResult};
 
     #[inline]
     fn find_files_schema(fields: &Fields) -> Arc<Schema> {
@@ -132,18 +127,13 @@ pub mod tests {
             .state()
             .with_query_planner(Arc::new(FindFilesPlanner {}));
         let table = make_table().await;
-        let files = table.0.get_file_uris()?.collect::<Vec<String>>();
-        let plan = LogicalPlanBuilder::empty(false).build()?;
-
-        let schema = find_files_schema(table.0.snapshot()?.arrow_schema()?.fields()).to_dfschema_ref()?;
+        table.0.get_file_uris()?.for_each(|f| println!("{:?}", f));
         let find_files_node = LogicalPlan::Extension(Extension {
-            node: Arc::new(FindFilesNode {
-                id: "my_cool_id".to_string(),
-                input: plan,
-                predicates: vec![col("id").eq(lit("A"))],
-                files,
-                schema,
-            }),
+            node: Arc::new(FindFilesNode::new(
+                "my_cool_plan".into(),
+                table.0.snapshot()?.snapshot.clone(),
+                col("id").eq(lit("A")),
+            )?),
         });
         let df = DataFrame::new(state.clone(), find_files_node);
         let p = state
diff --git a/crates/core/src/delta_datafusion/find_files/physical.rs b/crates/core/src/delta_datafusion/find_files/physical.rs
index af2ac7b04f..f489a42d47 100644
--- a/crates/core/src/delta_datafusion/find_files/physical.rs
+++ b/crates/core/src/delta_datafusion/find_files/physical.rs
@@ -19,18 +19,13 @@ use futures::{Stream, StreamExt, TryStreamExt};
 use crate::delta_datafusion::find_files::{only_file_path_schema, scan_memory_table_batch};
 
 pub struct FindFilesExec {
-    schema: SchemaRef,
     files: Vec<String>,
     predicate: Expr,
 }
 
 impl FindFilesExec {
-    pub fn new(files: Vec<String>, predicate: Expr, schema: SchemaRef) -> Result<Self> {
-        Ok(Self {
-            schema,
-            files,
-            predicate,
-        })
+    pub fn new(files: Vec<String>, predicate: Expr) -> Result<Self> {
+        Ok(Self { files, predicate })
     }
 }
 
@@ -60,21 +55,13 @@ impl<'a> Stream for FindFilesStream<'a> {
 
 impl Debug for FindFilesExec {
     fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
-        write!(
-            f,
-            "FindFilesExec[schema={:?}, files={:?}]",
-            self.schema, self.files
-        )
+        write!(f, "FindFilesExec[schema={:?}, files={:?}]", 1, 2)
     }
 }
 
 impl DisplayAs for FindFilesExec {
     fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
-        write!(
-            f,
-            "FindFilesExec[schema={:?}, files={:?}]",
-            self.schema, self.files
-        )
+        write!(f, "FindFilesExec[schema={:?}, files={:?}]", 1, 2)
     }
 }
 
@@ -84,7 +71,7 @@ impl ExecutionPlan for FindFilesExec {
     }
 
     fn schema(&self) -> SchemaRef {
-        self.schema.clone()
+        only_file_path_schema()
     }
 
     fn output_partitioning(&self) -> Partitioning {
diff --git a/crates/core/src/kernel/snapshot/log_data.rs b/crates/core/src/kernel/snapshot/log_data.rs
index 686a4110fe..ffcad36dda 100644
--- a/crates/core/src/kernel/snapshot/log_data.rs
+++ b/crates/core/src/kernel/snapshot/log_data.rs
@@ -120,9 +120,9 @@ impl<'a> DeletionVectorView<'a> {
     }
 }
 
-/// A view into the log data representiang a single logical file.
+/// A view into the log data representing a single logical file.
 ///
-/// This stuct holds a pointer to a specific row in the log data and provides access to the
+/// This struct holds a pointer to a specific row in the log data and provides access to the
 /// information stored in that row by tracking references to the underlying arrays.
 ///
 /// Additionally, references to some table metadata is tracked to provide higher level
@@ -345,6 +345,7 @@ impl<'a> FileStatsAccessor<'a> {
         schema: &'a StructType,
     ) -> DeltaResult<Self> {
         let paths = extract_and_cast::<StringArray>(data, "add.path")?;
+        dbg!(&paths);
         let sizes = extract_and_cast::<Int64Array>(data, "add.size")?;
         let modification_times = extract_and_cast::<Int64Array>(data, "add.modificationTime")?;
         let stats = extract_and_cast::<StructArray>(data, "add.stats_parsed")?;

From 54be04e8a3ce23d74a75e25b25f0c207de618ba1 Mon Sep 17 00:00:00 2001
From: Stephen Carman
Date: Mon, 26 Feb 2024 11:54:48 -0500
Subject: [PATCH 3/6] feat: Logical Node for find files

---
 .../delta_datafusion/find_files/logical.rs  | 24 ++++++-----
 .../src/delta_datafusion/find_files/mod.rs  | 42 +++++++++----------
 .../delta_datafusion/find_files/physical.rs | 12 +++---
 crates/core/src/kernel/snapshot/log_data.rs |  1 -
 4 files changed, 40 insertions(+), 39 deletions(-)

diff --git a/crates/core/src/delta_datafusion/find_files/logical.rs b/crates/core/src/delta_datafusion/find_files/logical.rs
index 81aa2b264b..3d2c0345d5 100644
--- a/crates/core/src/delta_datafusion/find_files/logical.rs
+++ b/crates/core/src/delta_datafusion/find_files/logical.rs
@@ -1,11 +1,11 @@
 use std::collections::HashSet;
-use std::path::Path;
 
-use datafusion_common::{DFSchemaRef, ToDFSchema};
+use datafusion_common::DFSchemaRef;
 use datafusion_expr::{Expr, LogicalPlan, UserDefinedLogicalNodeCore};
 
-use crate::delta_datafusion::find_files::only_file_path_schema;
-use crate::kernel::EagerSnapshot;
+use crate::delta_datafusion::find_files::ONLY_FILES_DF_SCHEMA;
+use crate::logstore::LogStoreRef;
+use crate::table::state::DeltaTableState;
 
 #[derive(Debug, Hash, Eq, PartialEq, Clone)]
 pub struct FindFilesNode {
@@ -13,25 +13,27 @@ pub struct FindFilesNode {
     predicate: Expr,
     files: Vec<String>,
     schema: DFSchemaRef,
+    version: i64,
 }
 
 impl FindFilesNode {
     pub fn new(
         id: String,
-        eager_snapshot: EagerSnapshot,
+        eager_snapshot: DeltaTableState,
+        log_store: LogStoreRef,
         predicate: Expr,
     ) -> datafusion_common::Result<Self> {
         let files: Vec<String> = eager_snapshot
-            .files()
-            .map(|f| f.object_store_path().to_string())
+            .file_paths_iter()
+            .map(|f| log_store.to_uri(&f))
             .collect();
 
-
         Ok(Self {
             id,
             predicate,
             files,
-            schema: only_file_path_schema().to_dfschema_ref()?,
+            schema: ONLY_FILES_DF_SCHEMA.clone(),
+            version: eager_snapshot.version(),
         })
     }
 
@@ -68,8 +70,8 @@ impl UserDefinedLogicalNodeCore for FindFilesNode {
         write!(
             f,
-            "FindFiles id={}, predicate={:?}, files={:?}",
-            &self.id, self.predicate, self.files
+            "FindFiles id={}, predicate={:?}, version={:?}",
+            &self.id, self.predicate, self.version
         )
     }
 
diff --git a/crates/core/src/delta_datafusion/find_files/mod.rs b/crates/core/src/delta_datafusion/find_files/mod.rs
index 6ad257664d..2d2e734806 100644
--- a/crates/core/src/delta_datafusion/find_files/mod.rs
+++ b/crates/core/src/delta_datafusion/find_files/mod.rs
@@ -9,8 +9,9 @@ use datafusion::execution::context::{QueryPlanner, SessionState};
 use datafusion::physical_plan::ExecutionPlan;
 use datafusion::physical_planner::{DefaultPhysicalPlanner, ExtensionPlanner, PhysicalPlanner};
 use datafusion::prelude::{ParquetReadOptions, SessionContext};
-use datafusion_common::Result;
+use datafusion_common::{DFSchemaRef, Result, ToDFSchema};
 use datafusion_expr::{Expr, LogicalPlan, UserDefinedLogicalNode};
+use lazy_static::lazy_static;
 
 use crate::delta_datafusion::find_files::logical::FindFilesNode;
 use crate::delta_datafusion::find_files::physical::FindFilesExec;
@@ -19,11 +20,14 @@ use crate::delta_datafusion::PATH_COLUMN;
 pub mod logical;
 pub mod physical;
 
-#[inline]
-fn only_file_path_schema() -> Arc<Schema> {
-    let mut builder = SchemaBuilder::new();
-    builder.push(Field::new(PATH_COLUMN, DataType::Utf8, false));
-    Arc::new(builder.finish())
+lazy_static! {
+    static ref ONLY_FILES_SCHEMA: Arc<Schema> = {
+        let mut builder = SchemaBuilder::new();
+        builder.push(Field::new(PATH_COLUMN, DataType::Utf8, false));
+        Arc::new(builder.finish())
+    };
+    static ref ONLY_FILES_DF_SCHEMA: DFSchemaRef =
+        ONLY_FILES_SCHEMA.clone().to_dfschema_ref().unwrap();
 }
 
@@ -69,6 +73,11 @@ impl QueryPlanner for FindFilesPlanner {
 async fn scan_memory_table_batch(batch: RecordBatch, predicate: Expr) -> Result<RecordBatch> {
     let ctx = SessionContext::new();
     let mut batches = vec![];
+    let columns = predicate
+        .to_columns()?
+        .into_iter()
+        .map(Expr::Column)
+        .collect::<Vec<Expr>>();
 
     if let Some(column) = batch.column_by_name(PATH_COLUMN) {
         let mut column_iter = column.as_string::<i32>().into_iter();
@@ -76,14 +85,16 @@ async fn scan_memory_table_batch(batch: RecordBatch, predicate: Expr) -> Result<RecordBatch> {
             let df = ctx
                 .read_parquet(row, ParquetReadOptions::default())
                 .await?
-                .filter(predicate.to_owned())?;
+                .select(columns.clone())?
+                .filter(predicate.clone())?
+                .limit(0, Some(1))?;
             if df.count().await? > 0 {
                 batches.push(row);
             }
         }
     }
     let str_array = Arc::new(StringArray::from(batches));
-    RecordBatch::try_new(only_file_path_schema(), vec![str_array]).map_err(Into::into)
+    RecordBatch::try_new(ONLY_FILES_SCHEMA.clone(), vec![str_array]).map_err(Into::into)
 }
 
 #[cfg(test)]
@@ -91,26 +102,15 @@ pub mod tests {
     use std::sync::Arc;
 
     use arrow_cast::pretty::print_batches;
-    use arrow_schema::{DataType, Field, Fields, Schema, SchemaBuilder};
     use datafusion::prelude::{DataFrame, SessionContext};
     use datafusion_expr::{col, lit, Extension, LogicalPlan};
 
     use crate::delta_datafusion::find_files::logical::FindFilesNode;
     use crate::delta_datafusion::find_files::FindFilesPlanner;
-    use crate::delta_datafusion::PATH_COLUMN;
     use crate::operations::collect_sendable_stream;
     use crate::writer::test_utils::{create_bare_table, get_record_batch};
     use crate::{DeltaOps, DeltaResult};
 
-    #[inline]
-    fn find_files_schema(fields: &Fields) -> Arc<Schema> {
-        let mut builder = SchemaBuilder::from(fields);
-        builder.reverse();
-        builder.push(Field::new(PATH_COLUMN, DataType::Utf8, false));
-        builder.reverse();
-        Arc::new(builder.finish())
-    }
-
     async fn make_table() -> DeltaOps {
         let batch = get_record_batch(None, false);
         let write = DeltaOps(create_bare_table())
@@ -127,11 +127,11 @@ pub mod tests {
             .state()
             .with_query_planner(Arc::new(FindFilesPlanner {}));
         let table = make_table().await;
-        table.0.get_file_uris()?.for_each(|f| println!("{:?}", f));
         let find_files_node = LogicalPlan::Extension(Extension {
             node: Arc::new(FindFilesNode::new(
                 "my_cool_plan".into(),
-                table.0.snapshot()?.snapshot.clone(),
+                table.0.snapshot()?.clone(),
+                table.0.log_store.clone(),
                 col("id").eq(lit("A")),
             )?),
         });
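[Illustrative sketch, not part of the patch: what the select/limit changes
above amount to for a single candidate file. probe_file is a hypothetical
helper name; the DataFrame calls are stock DataFusion.]

    use datafusion::prelude::*;

    async fn probe_file(
        ctx: &SessionContext,
        path: &str,
        predicate: Expr,
    ) -> datafusion_common::Result<bool> {
        // Project only the columns the predicate references ...
        let columns: Vec<Expr> = predicate
            .to_columns()?
            .into_iter()
            .map(Expr::Column)
            .collect();
        let df = ctx
            .read_parquet(path, ParquetReadOptions::default())
            .await?
            .select(columns)?
            .filter(predicate)?
            // ... and stop scanning as soon as one matching row is found.
            .limit(0, Some(1))?;
        Ok(df.count().await? > 0)
    }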
diff --git a/crates/core/src/delta_datafusion/find_files/physical.rs b/crates/core/src/delta_datafusion/find_files/physical.rs
index f489a42d47..58892dff77 100644
--- a/crates/core/src/delta_datafusion/find_files/physical.rs
+++ b/crates/core/src/delta_datafusion/find_files/physical.rs
@@ -16,7 +16,7 @@ use datafusion_physical_expr::{Partitioning, PhysicalSortExpr};
 use futures::stream::BoxStream;
 use futures::{Stream, StreamExt, TryStreamExt};
 
-use crate::delta_datafusion::find_files::{only_file_path_schema, scan_memory_table_batch};
+use crate::delta_datafusion::find_files::{scan_memory_table_batch, ONLY_FILES_SCHEMA};
 
 pub struct FindFilesExec {
     files: Vec<String>,
@@ -41,7 +41,7 @@ impl<'a> FindFilesStream<'a> {
 
 impl<'a> RecordBatchStream for FindFilesStream<'a> {
     fn schema(&self) -> SchemaRef {
-        only_file_path_schema()
+        ONLY_FILES_SCHEMA.clone()
     }
 }
 
@@ -71,11 +71,11 @@ impl ExecutionPlan for FindFilesExec {
     }
 
     fn schema(&self) -> SchemaRef {
-        only_file_path_schema()
+        ONLY_FILES_SCHEMA.clone()
     }
 
     fn output_partitioning(&self) -> Partitioning {
-        Partitioning::UnknownPartitioning(0)
+        Partitioning::RoundRobinBatch(num_cpus::get())
     }
 
     fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
         None
@@ -99,10 +99,10 @@ impl ExecutionPlan for FindFilesExec {
         _context: Arc<TaskContext>,
     ) -> Result<SendableRecordBatchStream> {
         let array = Arc::new(StringArray::from(self.files.clone()));
-        let record_batch = RecordBatch::try_new(only_file_path_schema(), vec![array])?;
+        let record_batch = RecordBatch::try_new(ONLY_FILES_SCHEMA.clone(), vec![array])?;
         let predicate = self.predicate.clone();
         let mem_stream =
-            MemoryStream::try_new(vec![record_batch.clone()], only_file_path_schema(), None)?
+            MemoryStream::try_new(vec![record_batch.clone()], ONLY_FILES_SCHEMA.clone(), None)?
                 .and_then(move |batch| scan_memory_table_batch(batch, predicate.clone()))
                 .boxed();
 
diff --git a/crates/core/src/kernel/snapshot/log_data.rs b/crates/core/src/kernel/snapshot/log_data.rs
index ffcad36dda..972aeb6f9a 100644
--- a/crates/core/src/kernel/snapshot/log_data.rs
+++ b/crates/core/src/kernel/snapshot/log_data.rs
@@ -345,7 +345,6 @@ impl<'a> FileStatsAccessor<'a> {
         schema: &'a StructType,
     ) -> DeltaResult<Self> {
         let paths = extract_and_cast::<StringArray>(data, "add.path")?;
-        dbg!(&paths);
         let sizes = extract_and_cast::<Int64Array>(data, "add.size")?;
         let modification_times = extract_and_cast::<Int64Array>(data, "add.modificationTime")?;
         let stats = extract_and_cast::<StructArray>(data, "add.stats_parsed")?;

From 6301ef6a11c1d27eeb8f2bd0a501221a8e9009ba Mon Sep 17 00:00:00 2001
From: Stephen Carman
Date: Sat, 2 Mar 2024 16:40:16 -0500
Subject: [PATCH 4/6] feat: Logical Node for find files

---
 .../delta_datafusion/find_files/logical.rs  |  55 ++--
 .../src/delta_datafusion/find_files/mod.rs  | 241 ++++++++++++++----
 .../delta_datafusion/find_files/physical.rs |  69 +++--
 3 files changed, 274 insertions(+), 91 deletions(-)

diff --git a/crates/core/src/delta_datafusion/find_files/logical.rs b/crates/core/src/delta_datafusion/find_files/logical.rs
index 3d2c0345d5..6234cbe5c2 100644
--- a/crates/core/src/delta_datafusion/find_files/logical.rs
+++ b/crates/core/src/delta_datafusion/find_files/logical.rs
@@ -1,4 +1,5 @@
 use std::collections::HashSet;
+use std::hash::{Hash, Hasher};
 
 use datafusion_common::DFSchemaRef;
 use datafusion_expr::{Expr, LogicalPlan, UserDefinedLogicalNodeCore};
@@ -7,42 +8,58 @@ use crate::delta_datafusion::find_files::ONLY_FILES_DF_SCHEMA;
 use crate::logstore::LogStoreRef;
 use crate::table::state::DeltaTableState;
 
-#[derive(Debug, Hash, Eq, PartialEq, Clone)]
+#[derive(Debug, Clone)]
 pub struct FindFilesNode {
     id: String,
     predicate: Expr,
-    files: Vec<String>,
-    schema: DFSchemaRef,
+    table_state: DeltaTableState,
+    log_store: LogStoreRef,
     version: i64,
 }
 
 impl FindFilesNode {
     pub fn new(
         id: String,
-        eager_snapshot: DeltaTableState,
+        table_state: DeltaTableState,
         log_store: LogStoreRef,
         predicate: Expr,
     ) -> datafusion_common::Result<Self> {
-        let files: Vec<String> = eager_snapshot
-            .file_paths_iter()
-            .map(|f| log_store.to_uri(&f))
-            .collect();
-
+        let version =
+        let version = table_state.version();
         Ok(Self {
             id,
             predicate,
-            files,
-            schema: ONLY_FILES_DF_SCHEMA.clone(),
-            version: eager_snapshot.version(),
+            log_store,
+            table_state,
+            version,
         })
     }
 
-    pub fn predicate(&self) -> &Expr {
-        &self.predicate
+    pub fn predicate(&self) -> Expr {
+        self.predicate.clone()
+    }
+
+    pub fn state(&self) -> DeltaTableState {
+        self.table_state.clone()
+    }
+
+    pub fn log_store(&self) -> LogStoreRef {
+        self.log_store.clone()
     }
+}
+
+impl Eq for FindFilesNode {}
+
+impl PartialEq for FindFilesNode {
+    fn eq(&self, other: &Self) -> bool {
+        self.id == other.id
+    }
+}
 
-    pub fn files(&self) -> Vec<String> {
-        self.files.clone()
+impl Hash for FindFilesNode {
+    fn hash<H: Hasher>(&self, state: &mut H) {
+        state.write(self.id.as_bytes());
+        state.finish();
     }
 }
 
@@ -56,7 +73,7 @@ impl UserDefinedLogicalNodeCore for FindFilesNode {
     }
 
     fn schema(&self) -> &DFSchemaRef {
-        &self.schema
+        &ONLY_FILES_DF_SCHEMA
     }
 
     fn expressions(&self) -> Vec<Expr> {
@@ -70,8 +87,8 @@ impl UserDefinedLogicalNodeCore for FindFilesNode {
     fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
         write!(
             f,
-            "FindFiles id={}, predicate={:?}, version={:?}",
-            &self.id, self.predicate, self.version
+            "FindFiles[id={}, predicate=\"{}\", version={}]",
+            &self.id, self.predicate, self.version,
         )
     }
 
diff --git a/crates/core/src/delta_datafusion/find_files/mod.rs b/crates/core/src/delta_datafusion/find_files/mod.rs
index 2d2e734806..2f826ae766 100644
--- a/crates/core/src/delta_datafusion/find_files/mod.rs
+++ b/crates/core/src/delta_datafusion/find_files/mod.rs
@@ -1,21 +1,31 @@
 use std::sync::Arc;
 
-use arrow_array::cast::AsArray;
-use arrow_array::{RecordBatch, StringArray};
+use arrow_array::RecordBatch;
 use arrow_schema::SchemaBuilder;
-use arrow_schema::{DataType, Field, Schema};
+use arrow_schema::{ArrowError, DataType, Field, Schema, SchemaRef};
+use arrow_select::concat::concat_batches;
 use async_trait::async_trait;
+use datafusion::datasource::MemTable;
 use datafusion::execution::context::{QueryPlanner, SessionState};
+use datafusion::execution::TaskContext;
+use datafusion::physical_plan::filter::FilterExec;
+use datafusion::physical_plan::limit::LocalLimitExec;
 use datafusion::physical_plan::ExecutionPlan;
 use datafusion::physical_planner::{DefaultPhysicalPlanner, ExtensionPlanner, PhysicalPlanner};
-use datafusion::prelude::{ParquetReadOptions, SessionContext};
+use datafusion::prelude::SessionContext;
 use datafusion_common::{DFSchemaRef, Result, ToDFSchema};
-use datafusion_expr::{Expr, LogicalPlan, UserDefinedLogicalNode};
+use datafusion_expr::{col, Expr, LogicalPlan, UserDefinedLogicalNode};
+use datafusion_physical_expr::create_physical_expr;
 use lazy_static::lazy_static;
 
 use crate::delta_datafusion::find_files::logical::FindFilesNode;
 use crate::delta_datafusion::find_files::physical::FindFilesExec;
-use crate::delta_datafusion::PATH_COLUMN;
+use crate::delta_datafusion::{
+    df_logical_schema, register_store, DeltaScanBuilder, DeltaScanConfigBuilder, PATH_COLUMN,
+};
+use crate::logstore::LogStoreRef;
+use crate::table::state::DeltaTableState;
+use crate::DeltaTableError;
 
 pub mod logical;
 pub mod physical;
 
@@ -23,7 +33,11 @@ pub mod physical;
 lazy_static! {
     static ref ONLY_FILES_SCHEMA: Arc<Schema> = {
         let mut builder = SchemaBuilder::new();
-        builder.push(Field::new(PATH_COLUMN, DataType::Utf8, false));
+        builder.push(Field::new(
+            PATH_COLUMN,
+            DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8)),
+            false,
+        ));
         Arc::new(builder.finish())
     };
     static ref ONLY_FILES_DF_SCHEMA: DFSchemaRef =
@@ -46,8 +60,9 @@ impl ExtensionPlanner for FindFilesPlannerExtension {
     ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
         if let Some(find_files_node) = node.as_any().downcast_ref::<FindFilesNode>() {
             return Ok(Some(Arc::new(FindFilesExec::new(
-                find_files_node.files(),
-                find_files_node.predicate().clone(),
+                find_files_node.state(),
+                find_files_node.log_store(),
+                find_files_node.predicate(),
             )?)));
         }
         Ok(None)
@@ -70,81 +85,199 @@ impl QueryPlanner for FindFilesPlanner {
     }
 }
 
-async fn scan_memory_table_batch(batch: RecordBatch, predicate: Expr) -> Result<RecordBatch> {
+async fn scan_table_by_partitions(batch: RecordBatch, predicate: Expr) -> Result<RecordBatch> {
+    let mut arrays = Vec::new();
+    let mut fields = Vec::new();
+
+    let schema = batch.schema();
+
+    arrays.push(
+        batch
+            .column_by_name("path")
+            .ok_or(DeltaTableError::Generic(
+                "Column with name `path` does not exist".to_owned(),
+            ))?
+            .to_owned(),
+    );
+    fields.push(Field::new(PATH_COLUMN, DataType::Utf8, false));
+
+    for field in schema.fields() {
+        if field.name().starts_with("partition.") {
+            let name = field.name().strip_prefix("partition.").unwrap();
+
+            arrays.push(batch.column_by_name(field.name()).unwrap().to_owned());
+            fields.push(Field::new(
+                name,
+                field.data_type().to_owned(),
+                field.is_nullable(),
+            ));
+        }
+    }
+
+    let schema = Arc::new(Schema::new(fields));
+    let batch = RecordBatch::try_new(schema, arrays)?;
+    let mem_table = MemTable::try_new(batch.schema(), vec![vec![batch]])?;
+
     let ctx = SessionContext::new();
-    let mut batches = vec![];
-    let columns = predicate
-        .to_columns()?
-        .into_iter()
-        .map(Expr::Column)
-        .collect::<Vec<Expr>>();
-
-    if let Some(column) = batch.column_by_name(PATH_COLUMN) {
-        let mut column_iter = column.as_string::<i32>().into_iter();
-        while let Some(Some(row)) = column_iter.next() {
-            let df = ctx
-                .read_parquet(row, ParquetReadOptions::default())
-                .await?
-                .select(columns.clone())?
-                .filter(predicate.clone())?
-                .limit(0, Some(1))?;
-            if df.count().await? > 0 {
-                batches.push(row);
-            }
-        }
-    }
-    let str_array = Arc::new(StringArray::from(batches));
-    RecordBatch::try_new(ONLY_FILES_SCHEMA.clone(), vec![str_array]).map_err(Into::into)
+    let mut df = ctx.read_table(Arc::new(mem_table))?;
+    df = df
+        .filter(predicate.to_owned())?
+        .select(vec![col(PATH_COLUMN)])?;
+    let df_schema = df.schema().clone();
+    let batches = df.collect().await?;
+    Ok(concat_batches(&SchemaRef::from(df_schema), &batches)?)
+}
+
+async fn scan_table_by_files(
+    snapshot: DeltaTableState,
+    log_store: LogStoreRef,
+    state: SessionState,
+    expression: Expr,
+) -> Result<RecordBatch> {
+    register_store(log_store.clone(), state.runtime_env().clone());
+    let scan_config = DeltaScanConfigBuilder {
+        include_file_column: true,
+        ..Default::default()
+    }
+    .build(&snapshot)?;
+
+    let logical_schema = df_logical_schema(&snapshot, &scan_config)?;
+
+    // Identify which columns we need to project
+    let mut used_columns = expression
+        .to_columns()?
+        .into_iter()
+        .map(|column| logical_schema.index_of(&column.name))
+        .collect::<Result<Vec<usize>, ArrowError>>()?;
+    // Add path column
+    used_columns.push(logical_schema.index_of(scan_config.file_column_name.as_ref().unwrap())?);
+
+    let scan = DeltaScanBuilder::new(&snapshot, log_store, &state)
+        .with_filter(Some(expression.clone()))
+        .with_projection(Some(&used_columns))
+        .with_scan_config(scan_config)
+        .build()
+        .await?;
+
+    let scan = Arc::new(scan);
+    let input_schema = scan.logical_schema.as_ref().to_owned();
+    let input_dfschema = input_schema.clone().try_into()?;
+
+    let predicate_expr = create_physical_expr(
+        &Expr::IsTrue(Box::new(expression.clone())),
+        &input_dfschema,
+        state.execution_props(),
+    )?;
+
+    let filter: Arc<dyn ExecutionPlan> =
+        Arc::new(FilterExec::try_new(predicate_expr, scan.clone())?);
+    let limit: Arc<dyn ExecutionPlan> = Arc::new(LocalLimitExec::new(filter, 1));
+    let field_idx = input_schema.index_of(PATH_COLUMN)?;
+    let task_ctx = Arc::new(TaskContext::from(&state));
+    let path_batches: Vec<RecordBatch> = datafusion::physical_plan::collect(limit, task_ctx)
+        .await?
+        .into_iter()
+        .map(|batch| batch.project(&[field_idx]).unwrap())
+        .collect();
+
+    let result_batches = concat_batches(&ONLY_FILES_SCHEMA.clone(), &path_batches)?;
+
+    Ok(result_batches)
+}
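[Illustrative sketch, not part of the patch: the idea behind
scan_table_by_partitions. A predicate that touches only partition columns can
be answered from the log's add actions alone - modelled here as a small
in-memory table - without opening any data file. All names and values are
invented for the example.]

    use std::sync::Arc;
    use arrow_array::{Int32Array, RecordBatch, StringArray};
    use arrow_schema::{DataType, Field, Schema};
    use datafusion::datasource::MemTable;
    use datafusion::prelude::*;

    async fn partition_only_example() -> datafusion_common::Result<()> {
        // Two add actions and their partition values, as recorded in the log.
        let schema = Arc::new(Schema::new(vec![
            Field::new("path", DataType::Utf8, false),
            Field::new("year", DataType::Int32, false),
        ]));
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(StringArray::from(vec![
                    "year=2020/f1.parquet",
                    "year=2021/f2.parquet",
                ])),
                Arc::new(Int32Array::from(vec![2020, 2021])),
            ],
        )?;
        let ctx = SessionContext::new();
        let table = MemTable::try_new(schema, vec![vec![batch]])?;
        let df = ctx
            .read_table(Arc::new(table))?
            .filter(col("year").eq(lit(2020)))?
            .select(vec![col("path")])?;
        df.show().await?; // prints only year=2020/f1.parquet
        Ok(())
    }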
{ + ["+---------------------------------------------------------------------------------------------+", + "| __delta_rs_path |", + "+---------------------------------------------------------------------------------------------+", + "| year=2020/month=1/day=1/part-00000-8eafa330-3be9-4a39-ad78-fd13c2027c7e.c000.snappy.parquet |", + "| year=2020/month=2/day=3/part-00000-94d16827-f2fd-42cd-a060-f67ccc63ced9.c000.snappy.parquet |", + "| year=2020/month=2/day=5/part-00000-89cdd4c8-2af7-4add-8ea3-3990b2f027b5.c000.snappy.parquet |", + "+---------------------------------------------------------------------------------------------+"], + &s + } + Ok(()) + } + + #[tokio::test] + pub async fn test_find_files_unpartitioned() -> DeltaResult<()> { + let table = crate::open_table("../test/tests/data/simple_table").await?; + let expr: Expr = col("id").in_list(vec![lit(9i64), lit(7i64)], false); + let s = test_plan(table, expr).await?; + + assert_batches_sorted_eq! { + ["+---------------------------------------------------------------------+", + "| __delta_rs_path |", + "+---------------------------------------------------------------------+", + "| part-00007-3a0e4727-de0d-41b6-81ef-5223cf40f025-c000.snappy.parquet |", + "| part-00004-315835fe-fb44-4562-98f6-5e6cfa3ae45d-c000.snappy.parquet |", + "+---------------------------------------------------------------------+"], + &s + } + Ok(()) + } + + #[tokio::test] + pub async fn test_find_files_unpartitioned2() -> DeltaResult<()> { + let table = crate::open_table("../test/tests/data/simple_table").await?; + let expr: Expr = col("id").is_not_null(); + let s = test_plan(table, expr).await?; + + assert_batches_sorted_eq! { + ["+---------------------------------------------------------------------+", + "| __delta_rs_path |", + "+---------------------------------------------------------------------+", + "| part-00001-7891c33d-cedc-47c3-88a6-abcfb049d3b4-c000.snappy.parquet |", + "| part-00004-315835fe-fb44-4562-98f6-5e6cfa3ae45d-c000.snappy.parquet |", + "| part-00007-3a0e4727-de0d-41b6-81ef-5223cf40f025-c000.snappy.parquet |", + "+---------------------------------------------------------------------+"], + &s + } Ok(()) } } diff --git a/crates/core/src/delta_datafusion/find_files/physical.rs b/crates/core/src/delta_datafusion/find_files/physical.rs index 58892dff77..9b9238dd86 100644 --- a/crates/core/src/delta_datafusion/find_files/physical.rs +++ b/crates/core/src/delta_datafusion/find_files/physical.rs @@ -1,31 +1,42 @@ use std::any::Any; use std::fmt::{Debug, Formatter}; - use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; -use arrow_array::{RecordBatch, StringArray}; +use arrow_array::RecordBatch; use arrow_schema::SchemaRef; use datafusion::error::Result; use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext}; use datafusion::physical_plan::memory::MemoryStream; use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan}; +use datafusion::prelude::SessionContext; +use datafusion_common::tree_node::TreeNode; use datafusion_expr::Expr; use datafusion_physical_expr::{Partitioning, PhysicalSortExpr}; use futures::stream::BoxStream; -use futures::{Stream, StreamExt, TryStreamExt}; +use futures::{FutureExt, Stream, StreamExt, TryStreamExt}; -use crate::delta_datafusion::find_files::{scan_memory_table_batch, ONLY_FILES_SCHEMA}; +use crate::delta_datafusion::find_files::{ + scan_table_by_files, scan_table_by_partitions, ONLY_FILES_SCHEMA, +}; +use crate::delta_datafusion::FindFilesExprProperties; 
+use crate::logstore::LogStoreRef;
+use crate::table::state::DeltaTableState;
 
 pub struct FindFilesExec {
-    files: Vec<String>,
     predicate: Expr,
+    state: DeltaTableState,
+    log_store: LogStoreRef,
 }
 
 impl FindFilesExec {
-    pub fn new(files: Vec<String>, predicate: Expr) -> Result<Self> {
-        Ok(Self { files, predicate })
+    pub fn new(state: DeltaTableState, log_store: LogStoreRef, predicate: Expr) -> Result<Self> {
+        Ok(Self {
+            predicate,
+            log_store,
+            state,
+        })
     }
 }
 
@@ -55,13 +66,13 @@ impl<'a> Stream for FindFilesStream<'a> {
 
 impl Debug for FindFilesExec {
     fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
-        write!(f, "FindFilesExec[schema={:?}, files={:?}]", 1, 2)
+        write!(f, "FindFilesExec[predicate=\"{}\"]", self.predicate)
     }
 }
 
 impl DisplayAs for FindFilesExec {
     fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
-        write!(f, "FindFilesExec[schema={:?}, files={:?}]", 1, 2)
+        write!(f, "FindFilesExec[predicate=\"{}\"]", self.predicate)
     }
 }
 
@@ -98,14 +109,36 @@ impl ExecutionPlan for FindFilesExec {
         _partition: usize,
         _context: Arc<TaskContext>,
     ) -> Result<SendableRecordBatchStream> {
-        let array = Arc::new(StringArray::from(self.files.clone()));
-        let record_batch = RecordBatch::try_new(ONLY_FILES_SCHEMA.clone(), vec![array])?;
-        let predicate = self.predicate.clone();
-        let mem_stream =
-            MemoryStream::try_new(vec![record_batch.clone()], ONLY_FILES_SCHEMA.clone(), None)?
-                .and_then(move |batch| scan_memory_table_batch(batch, predicate.clone()))
-                .boxed();
-
-        Ok(Box::pin(FindFilesStream::new(mem_stream)?))
+        let current_metadata = self.state.metadata();
+        let mut expr_properties = FindFilesExprProperties {
+            partition_only: true,
+            partition_columns: current_metadata.partition_columns.clone(),
+            result: Ok(()),
+        };
+
+        TreeNode::visit(&self.predicate, &mut expr_properties)?;
+        expr_properties.result?;
+
+        if expr_properties.partition_only {
+            let actions_table = self.state.add_actions_table(true)?;
+            let predicate = self.predicate.clone();
+            let schema = actions_table.schema();
+            let mem_stream =
+                MemoryStream::try_new(vec![actions_table.clone()], schema.clone(), None)?
+                    .and_then(move |batch| scan_table_by_partitions(batch, predicate.clone()))
+                    .boxed();
+
+            Ok(Box::pin(FindFilesStream::new(mem_stream)?))
+        } else {
+            let ctx = SessionContext::new();
+            let state = ctx.state();
+            let table_state = self.state.clone();
+            let predicate = self.predicate.clone();
+            let output_files =
+                scan_table_by_files(table_state, self.log_store.clone(), state, predicate);
+
+            let mem_stream = output_files.into_stream().boxed();
+            Ok(Box::pin(FindFilesStream::new(mem_stream)?))
+        }
     }
 }

From 26949ce5c6e876214c4630a9582f93b4f6e6040f Mon Sep 17 00:00:00 2001
From: Stephen Carman
Date: Sun, 17 Mar 2024 14:44:15 -0400
Subject: [PATCH 5/6] feat: Logical Node for find files

- added the ability to determine whether or not to wrap partition columns in
  dictionary encodings; this is on by default

---
 crates/benchmarks/src/bin/merge.rs          |  1 +
 .../src/delta_datafusion/find_files/mod.rs  | 15 +++----
 crates/core/src/delta_datafusion/mod.rs     | 45 +++++++++++++------
 3 files changed, 37 insertions(+), 24 deletions(-)

diff --git a/crates/benchmarks/src/bin/merge.rs b/crates/benchmarks/src/bin/merge.rs
index ea43171052..ddfe0bb239 100644
--- a/crates/benchmarks/src/bin/merge.rs
+++ b/crates/benchmarks/src/bin/merge.rs
@@ -199,6 +199,7 @@ async fn benchmark_merge_tpcds(
         table.snapshot()?.clone(),
         table.log_store(),
         DeltaScanConfig {
+            wrap_partition_values: true,
             file_column_name: Some("file_path".to_string()),
         },
     )
diff --git a/crates/core/src/delta_datafusion/find_files/mod.rs b/crates/core/src/delta_datafusion/find_files/mod.rs
index 2f826ae766..7bcc4c1b2d 100644
--- a/crates/core/src/delta_datafusion/find_files/mod.rs
+++ b/crates/core/src/delta_datafusion/find_files/mod.rs
@@ -33,11 +33,7 @@ pub mod physical;
 lazy_static! {
     static ref ONLY_FILES_SCHEMA: Arc<Schema> = {
         let mut builder = SchemaBuilder::new();
-        builder.push(Field::new(
-            PATH_COLUMN,
-            DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8)),
-            false,
-        ));
+        builder.push(Field::new(PATH_COLUMN, DataType::Utf8, false));
         Arc::new(builder.finish())
     };
     static ref ONLY_FILES_DF_SCHEMA: DFSchemaRef =
@@ -135,11 +131,10 @@ async fn scan_table_by_files(
     expression: Expr,
 ) -> Result<RecordBatch> {
     register_store(log_store.clone(), state.runtime_env().clone());
-    let scan_config = DeltaScanConfigBuilder {
-        include_file_column: true,
-        ..Default::default()
-    }
-    .build(&snapshot)?;
+    let scan_config = DeltaScanConfigBuilder::new()
+        .wrap_partition_values(false)
+        .with_file_column(true)
+        .build(&snapshot)?;
 
     let logical_schema = df_logical_schema(&snapshot, &scan_config)?;
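[Hedged usage sketch, not part of the patch: how the builder methods
introduced here compose. DeltaScanConfigBuilder, DeltaScanConfig and
DeltaTableState are the crate's own types, as defined in this patch.]

    use crate::delta_datafusion::{DeltaScanConfig, DeltaScanConfigBuilder};
    use crate::table::state::DeltaTableState;
    use crate::DeltaResult;

    fn example_scan_config(snapshot: &DeltaTableState) -> DeltaResult<DeltaScanConfig> {
        DeltaScanConfigBuilder::new()
            .wrap_partition_values(false) // plain Utf8 instead of Dictionary<UInt16, Utf8>
            .with_file_column(true)       // expose the source path of every record
            .build(snapshot)
    }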
diff --git a/crates/core/src/delta_datafusion/mod.rs b/crates/core/src/delta_datafusion/mod.rs
index b9ff5d81b0..057dcaca87 100644
--- a/crates/core/src/delta_datafusion/mod.rs
+++ b/crates/core/src/delta_datafusion/mod.rs
@@ -298,8 +298,9 @@ pub(crate) fn register_store(store: LogStoreRef, env: Arc<RuntimeEnv>) {
     env.register_object_store(url, store.object_store());
 }
 
-/// The logical schema for a Deltatable is different then protocol level schema since partiton columns must appear at the end of the schema.
-/// This is to align with how partition are handled at the physical level
+/// The logical schema for a Delta table is different from the protocol level schema since partition
+/// columns must appear at the end of the schema. This is to align with how partitions are handled
+/// at the physical level
 pub(crate) fn df_logical_schema(
     snapshot: &DeltaTableState,
     scan_config: &DeltaScanConfig,
@@ -324,11 +325,7 @@ pub(crate) fn df_logical_schema(
     }
 
     if let Some(file_column_name) = &scan_config.file_column_name {
-        fields.push(Arc::new(Field::new(
-            file_column_name,
-            arrow_schema::DataType::Utf8,
-            true,
-        )));
+        fields.push(Arc::new(Field::new(file_column_name, DataType::Utf8, true)));
     }
 
     Ok(Arc::new(ArrowSchema::new(fields)))
@@ -337,13 +334,15 @@ pub(crate) fn df_logical_schema(
 #[derive(Debug, Clone, Default)]
 /// Used to specify if additional metadata columns are exposed to the user
 pub struct DeltaScanConfigBuilder {
-    /// Include the source path for each record. The name of this column is determine by `file_column_name`
+    /// Include the source path for each record. The name of this column is determined by `file_column_name`
     include_file_column: bool,
     /// Column name that contains the source path.
     ///
     /// If include_file_column is true and the name is None then it will be auto-generated
     /// Otherwise the user provided name will be used
     file_column_name: Option<String>,
+    /// Whether to wrap partition values in a dictionary encoding to potentially save space
+    wrap_partition_values: Option<bool>,
 }
 
 impl DeltaScanConfigBuilder {
@@ -367,6 +366,12 @@ impl DeltaScanConfigBuilder {
         self
     }
 
+    /// Whether to wrap partition values in a dictionary encoding
+    pub fn wrap_partition_values(mut self, wrap: bool) -> Self {
+        self.wrap_partition_values = Some(wrap);
+        self
+    }
+
     /// Build a DeltaScanConfig and ensure no column name conflicts occur during downstream processing
     pub fn build(&self, snapshot: &DeltaTableState) -> DeltaResult<DeltaScanConfig> {
         let input_schema = snapshot.input_schema()?;
@@ -403,7 +408,10 @@ impl DeltaScanConfigBuilder {
             }
         }
 
-        Ok(DeltaScanConfig { file_column_name })
+        Ok(DeltaScanConfig {
+            file_column_name,
+            wrap_partition_values: self.wrap_partition_values.unwrap_or(true),
+        })
     }
 }
 
@@ -412,6 +420,8 @@ impl DeltaScanConfigBuilder {
 pub struct DeltaScanConfig {
     /// Include the source path for each record
     pub file_column_name: Option<String>,
+    /// Wrap partition values in a dictionary encoding
+    pub wrap_partition_values: bool,
 }
 
 #[derive(Debug)]
@@ -536,10 +546,12 @@ impl<'a> DeltaScanBuilder<'a> {
             let mut part = partitioned_file_from_action(action, table_partition_cols, &schema);
 
             if config.file_column_name.is_some() {
-                part.partition_values
-                    .push(wrap_partition_value_in_dict(ScalarValue::Utf8(Some(
-                        action.path.clone(),
-                    ))));
+                let partition_value = if config.wrap_partition_values {
+                    wrap_partition_value_in_dict(ScalarValue::Utf8(Some(action.path.clone())))
+                } else {
+                    ScalarValue::Utf8(Some(action.path.clone()))
+                };
+                part.partition_values.push(partition_value);
             }
 
             file_groups
@@ -563,9 +575,14 @@ impl<'a> DeltaScanBuilder<'a> {
             .collect::<Result<Vec<Field>, ArrowError>>()?;
 
         if let Some(file_column_name) = &config.file_column_name {
+            let field_name_datatype = if config.wrap_partition_values {
+                wrap_partition_type_in_dict(DataType::Utf8)
+            } else {
+                DataType::Utf8
+            };
             table_partition_cols.push(Field::new(
                 file_column_name.clone(),
-                wrap_partition_type_in_dict(DataType::Utf8),
+                field_name_datatype,
                 false,
             ));
         }
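[Illustrative sketch, not part of the patch: the Dictionary<UInt16, Utf8>
round-trip that patch 6 below relies on. values() recovers the distinct Utf8
values backing the dictionary, which for the path column is the file list
itself.]

    use std::sync::Arc;
    use arrow_array::cast::AsArray;
    use arrow_array::types::UInt16Type;
    use arrow_array::{Array, DictionaryArray, StringArray, UInt16Array};

    fn unwrap_dictionary_paths() {
        let values = StringArray::from(vec!["part-0.parquet", "part-1.parquet"]);
        let keys = UInt16Array::from(vec![0u16, 1, 0]);
        let dict = DictionaryArray::<UInt16Type>::try_new(keys, Arc::new(values)).unwrap();

        // Mirrors batch.column(idx).as_dictionary::<UInt16Type>().values()
        // in the patched scan_table_by_files below.
        let col: Arc<dyn Array> = Arc::new(dict);
        let paths = col.as_dictionary::<UInt16Type>().values().clone();
        assert_eq!(paths.len(), 2); // distinct values, not row count
    }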
From d33d8d33817e23234754747398e6933e6a991a9a Mon Sep 17 00:00:00 2001
From: Stephen Carman
Date: Sun, 24 Mar 2024 11:04:05 -0400
Subject: [PATCH 6/6] feat: Logical Node for find files

- removed turning off dictionary encoding; now it is always turned on and the
  batch is rebuilt after the path column is unwrapped

---
 crates/core/src/delta_datafusion/find_files/mod.rs | 12 ++++++++++--
 python/src/lib.rs                                  |  1 -
 python/src/schema.rs                               |  1 -
 3 files changed, 10 insertions(+), 4 deletions(-)

diff --git a/crates/core/src/delta_datafusion/find_files/mod.rs b/crates/core/src/delta_datafusion/find_files/mod.rs
index 7bcc4c1b2d..65d113ee5b 100644
--- a/crates/core/src/delta_datafusion/find_files/mod.rs
+++ b/crates/core/src/delta_datafusion/find_files/mod.rs
@@ -1,5 +1,7 @@
+use arrow_array::cast::AsArray;
 use std::sync::Arc;
 
+use arrow_array::types::UInt16Type;
 use arrow_array::RecordBatch;
 use arrow_schema::SchemaBuilder;
 use arrow_schema::{ArrowError, DataType, Field, Schema, SchemaRef};
@@ -132,7 +134,7 @@ async fn scan_table_by_files(
 ) -> Result<RecordBatch> {
     register_store(log_store.clone(), state.runtime_env().clone());
     let scan_config = DeltaScanConfigBuilder::new()
-        .wrap_partition_values(false)
+        .wrap_partition_values(true)
         .with_file_column(true)
         .build(&snapshot)?;
 
@@ -172,7 +174,13 @@ async fn scan_table_by_files(
     let path_batches: Vec<RecordBatch> = datafusion::physical_plan::collect(limit, task_ctx)
         .await?
         .into_iter()
-        .map(|batch| batch.project(&[field_idx]).unwrap())
+        .map(|batch| {
+            let col = batch
+                .column(field_idx)
+                .as_dictionary::<UInt16Type>()
+                .values();
+            RecordBatch::try_from_iter(vec![(PATH_COLUMN, col.clone())]).unwrap()
+        })
         .collect();
 
     let result_batches = concat_batches(&ONLY_FILES_SCHEMA.clone(), &path_batches)?;
diff --git a/python/src/lib.rs b/python/src/lib.rs
index 445ef921ea..2401b17a0e 100644
--- a/python/src/lib.rs
+++ b/python/src/lib.rs
@@ -6,7 +6,6 @@ mod schema;
 mod utils;
 
 use std::collections::{HashMap, HashSet};
-use std::convert::TryFrom;
 use std::future::IntoFuture;
 use std::sync::Arc;
 use std::time;
diff --git a/python/src/schema.rs b/python/src/schema.rs
index c56010f131..0d0823cbff 100644
--- a/python/src/schema.rs
+++ b/python/src/schema.rs
@@ -13,7 +13,6 @@ use deltalake::kernel::{
 use pyo3::exceptions::{PyException, PyNotImplementedError, PyTypeError, PyValueError};
 use pyo3::prelude::*;
 use pyo3::types::IntoPyDict;
-use pyo3::{PyRef, PyResult};
 use std::collections::HashMap;
 
 // PyO3 doesn't yet support converting classes with inheritance with Python