From f602ef97f572e03e7b70e1b3bafde3002a584956 Mon Sep 17 00:00:00 2001 From: Alex Wilcoxson Date: Sun, 23 Jun 2024 20:27:58 -0500 Subject: [PATCH 1/5] feat: report DataFusion metrics for DeltaScan --- crates/core/src/delta_datafusion/mod.rs | 39 ++++++++++++++++++++----- 1 file changed, 32 insertions(+), 7 deletions(-) diff --git a/crates/core/src/delta_datafusion/mod.rs b/crates/core/src/delta_datafusion/mod.rs index 723c2bea03..b188b62677 100644 --- a/crates/core/src/delta_datafusion/mod.rs +++ b/crates/core/src/delta_datafusion/mod.rs @@ -51,9 +51,9 @@ use datafusion::execution::FunctionRegistry; use datafusion::physical_optimizer::pruning::PruningPredicate; use datafusion::physical_plan::filter::FilterExec; use datafusion::physical_plan::limit::LocalLimitExec; +use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder, MetricsSet}; use datafusion::physical_plan::{ - DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, SendableRecordBatchStream, - Statistics, + DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, SendableRecordBatchStream, Statistics }; use datafusion_common::scalar::ScalarValue; use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor}; @@ -535,14 +535,19 @@ impl<'a> DeltaScanBuilder<'a> { .map(|expr| context.create_physical_expr(expr, &df_schema).unwrap()); // Perform Pruning of files to scan - let files = match self.files { - Some(files) => files.to_owned(), + let (files, files_scanned, files_pruned) = match self.files { + Some(files) => { + let files = files.to_owned(); + let files_scanned = files.len(); + (files, files_scanned, 0) + } None => { if let Some(predicate) = &logical_filter { let pruning_predicate = PruningPredicate::try_new(predicate.clone(), logical_schema.clone())?; let files_to_prune = pruning_predicate.prune(self.snapshot)?; - self.snapshot + let mut files_pruned = 0usize; + let files = self.snapshot .file_actions_iter()? .zip(files_to_prune.into_iter()) .filter_map( @@ -550,13 +555,19 @@ impl<'a> DeltaScanBuilder<'a> { if keep { Some(action.to_owned()) } else { + files_pruned += 1; None } }, ) - .collect() + .collect::>(); + + let files_scanned = files.len(); + (files, files_scanned, files_pruned) } else { - self.snapshot.file_actions()? + let files = self.snapshot.file_actions()?; + let files_scanned = files.len(); + (files, files_scanned, 0) } } }; @@ -644,11 +655,16 @@ impl<'a> DeltaScanBuilder<'a> { ) .await?; + let metrics = ExecutionPlanMetricsSet::new(); + MetricBuilder::new(&metrics).global_counter("files_scanned").add(files_scanned); + MetricBuilder::new(&metrics).global_counter("files_pruned").add(files_pruned); + Ok(DeltaScan { table_uri: ensure_table_uri(self.log_store.root_uri())?.as_str().into(), parquet_scan: scan, config, logical_schema, + metrics, }) } } @@ -802,6 +818,7 @@ pub struct DeltaScan { pub parquet_scan: Arc, /// The schema of the table to be used when evaluating expressions pub logical_schema: Arc, + metrics: ExecutionPlanMetricsSet, } #[derive(Debug, Serialize, Deserialize)] @@ -849,6 +866,7 @@ impl ExecutionPlan for DeltaScan { config: self.config.clone(), parquet_scan: children[0].clone(), logical_schema: self.logical_schema.clone(), + metrics: self.metrics.clone(), })) } @@ -860,6 +878,10 @@ impl ExecutionPlan for DeltaScan { self.parquet_scan.execute(partition, context) } + fn metrics(&self) -> Option { + Some(self.metrics.clone_inner()) + } + fn statistics(&self) -> DataFusionResult { self.parquet_scan.statistics() } @@ -875,6 +897,7 @@ impl ExecutionPlan for DeltaScan { config: self.config.clone(), parquet_scan, logical_schema: self.logical_schema.clone(), + metrics: self.metrics.clone(), }))) } else { Ok(None) @@ -1236,6 +1259,7 @@ impl PhysicalExtensionCodec for DeltaPhysicalCodec { parquet_scan: (*inputs)[0].clone(), config: wire.config, logical_schema: wire.logical_schema, + metrics: ExecutionPlanMetricsSet::new(), }; Ok(Arc::new(delta_scan)) } @@ -1928,6 +1952,7 @@ mod tests { parquet_scan: Arc::from(EmptyExec::new(schema.clone())), config: DeltaScanConfig::default(), logical_schema: schema.clone(), + metrics: ExecutionPlanMetricsSet::new(), }); let proto: protobuf::PhysicalPlanNode = protobuf::PhysicalPlanNode::try_from_physical_plan(exec_plan.clone(), &codec) From db09b80de131d84a8d20456b479b556b91d14b5b Mon Sep 17 00:00:00 2001 From: Alex Wilcoxson Date: Sun, 23 Jun 2024 20:29:40 -0500 Subject: [PATCH 2/5] fix: fmt --- crates/core/src/delta_datafusion/mod.rs | 29 ++++++++++++++----------- 1 file changed, 16 insertions(+), 13 deletions(-) diff --git a/crates/core/src/delta_datafusion/mod.rs b/crates/core/src/delta_datafusion/mod.rs index b188b62677..466fd34763 100644 --- a/crates/core/src/delta_datafusion/mod.rs +++ b/crates/core/src/delta_datafusion/mod.rs @@ -547,19 +547,18 @@ impl<'a> DeltaScanBuilder<'a> { PruningPredicate::try_new(predicate.clone(), logical_schema.clone())?; let files_to_prune = pruning_predicate.prune(self.snapshot)?; let mut files_pruned = 0usize; - let files = self.snapshot + let files = self + .snapshot .file_actions_iter()? .zip(files_to_prune.into_iter()) - .filter_map( - |(action, keep)| { - if keep { - Some(action.to_owned()) - } else { - files_pruned += 1; - None - } - }, - ) + .filter_map(|(action, keep)| { + if keep { + Some(action.to_owned()) + } else { + files_pruned += 1; + None + } + }) .collect::>(); let files_scanned = files.len(); @@ -656,8 +655,12 @@ impl<'a> DeltaScanBuilder<'a> { .await?; let metrics = ExecutionPlanMetricsSet::new(); - MetricBuilder::new(&metrics).global_counter("files_scanned").add(files_scanned); - MetricBuilder::new(&metrics).global_counter("files_pruned").add(files_pruned); + MetricBuilder::new(&metrics) + .global_counter("files_scanned") + .add(files_scanned); + MetricBuilder::new(&metrics) + .global_counter("files_pruned") + .add(files_pruned); Ok(DeltaScan { table_uri: ensure_table_uri(self.log_store.root_uri())?.as_str().into(), From 8c776229f71c8ed2ac83af64dc7fede9933e5113 Mon Sep 17 00:00:00 2001 From: Alex Wilcoxson Date: Sun, 23 Jun 2024 20:30:44 -0500 Subject: [PATCH 3/5] fix: fmt --- crates/core/src/delta_datafusion/mod.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/crates/core/src/delta_datafusion/mod.rs b/crates/core/src/delta_datafusion/mod.rs index 466fd34763..8487ff5891 100644 --- a/crates/core/src/delta_datafusion/mod.rs +++ b/crates/core/src/delta_datafusion/mod.rs @@ -53,7 +53,8 @@ use datafusion::physical_plan::filter::FilterExec; use datafusion::physical_plan::limit::LocalLimitExec; use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder, MetricsSet}; use datafusion::physical_plan::{ - DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, SendableRecordBatchStream, Statistics + DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, SendableRecordBatchStream, + Statistics, }; use datafusion_common::scalar::ScalarValue; use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor}; From ba3f24510bb9d1ec35a6f3be9b1199daa2f6a163 Mon Sep 17 00:00:00 2001 From: Alex Wilcoxson Date: Mon, 24 Jun 2024 16:14:09 -0500 Subject: [PATCH 4/5] fix: doc string --- crates/core/src/delta_datafusion/mod.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/crates/core/src/delta_datafusion/mod.rs b/crates/core/src/delta_datafusion/mod.rs index 8487ff5891..32933d694f 100644 --- a/crates/core/src/delta_datafusion/mod.rs +++ b/crates/core/src/delta_datafusion/mod.rs @@ -822,6 +822,7 @@ pub struct DeltaScan { pub parquet_scan: Arc, /// The schema of the table to be used when evaluating expressions pub logical_schema: Arc, + /// Metrics for scan reported via DataFusion metrics: ExecutionPlanMetricsSet, } From 5cf4a3eb407afce0864f63621ffc7e53182ba0a9 Mon Sep 17 00:00:00 2001 From: Alex Wilcoxson Date: Thu, 27 Jun 2024 10:22:16 -0500 Subject: [PATCH 5/5] test: add DeltaScan metric tests --- crates/core/tests/integration_datafusion.rs | 41 +++++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/crates/core/tests/integration_datafusion.rs b/crates/core/tests/integration_datafusion.rs index cb3cc41edb..27f942b581 100644 --- a/crates/core/tests/integration_datafusion.rs +++ b/crates/core/tests/integration_datafusion.rs @@ -65,6 +65,8 @@ mod local { #[derive(Debug, Default)] pub struct ExecutionMetricsCollector { scanned_files: HashSet