Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: report DataFusion metrics for DeltaScan #2617

Merged
merged 8 commits into from
Jun 28, 2024
58 changes: 44 additions & 14 deletions crates/core/src/delta_datafusion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ use datafusion::execution::FunctionRegistry;
use datafusion::physical_optimizer::pruning::PruningPredicate;
use datafusion::physical_plan::filter::FilterExec;
use datafusion::physical_plan::limit::LocalLimitExec;
use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder, MetricsSet};
use datafusion::physical_plan::{
DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, SendableRecordBatchStream,
Statistics,
Expand Down Expand Up @@ -535,28 +536,38 @@ impl<'a> DeltaScanBuilder<'a> {
.map(|expr| context.create_physical_expr(expr, &df_schema).unwrap());

// Perform Pruning of files to scan
let files = match self.files {
Some(files) => files.to_owned(),
let (files, files_scanned, files_pruned) = match self.files {
Some(files) => {
let files = files.to_owned();
let files_scanned = files.len();
(files, files_scanned, 0)
}
None => {
if let Some(predicate) = &logical_filter {
let pruning_predicate =
PruningPredicate::try_new(predicate.clone(), logical_schema.clone())?;
let files_to_prune = pruning_predicate.prune(self.snapshot)?;
self.snapshot
let mut files_pruned = 0usize;
let files = self
.snapshot
.file_actions_iter()?
.zip(files_to_prune.into_iter())
.filter_map(
|(action, keep)| {
if keep {
Some(action.to_owned())
} else {
None
}
},
)
.collect()
.filter_map(|(action, keep)| {
if keep {
Some(action.to_owned())
} else {
files_pruned += 1;
None
}
})
.collect::<Vec<_>>();

let files_scanned = files.len();
(files, files_scanned, files_pruned)
} else {
self.snapshot.file_actions()?
let files = self.snapshot.file_actions()?;
let files_scanned = files.len();
(files, files_scanned, 0)
}
}
};
Expand Down Expand Up @@ -644,11 +655,20 @@ impl<'a> DeltaScanBuilder<'a> {
)
.await?;

let metrics = ExecutionPlanMetricsSet::new();
MetricBuilder::new(&metrics)
.global_counter("files_scanned")
.add(files_scanned);
MetricBuilder::new(&metrics)
.global_counter("files_pruned")
.add(files_pruned);

Ok(DeltaScan {
table_uri: ensure_table_uri(self.log_store.root_uri())?.as_str().into(),
parquet_scan: scan,
config,
logical_schema,
metrics,
})
}
}
Expand Down Expand Up @@ -802,6 +822,8 @@ pub struct DeltaScan {
pub parquet_scan: Arc<dyn ExecutionPlan>,
/// The schema of the table to be used when evaluating expressions
pub logical_schema: Arc<ArrowSchema>,
/// Metrics for scan reported via DataFusion
metrics: ExecutionPlanMetricsSet,
}

#[derive(Debug, Serialize, Deserialize)]
Expand Down Expand Up @@ -849,6 +871,7 @@ impl ExecutionPlan for DeltaScan {
config: self.config.clone(),
parquet_scan: children[0].clone(),
logical_schema: self.logical_schema.clone(),
metrics: self.metrics.clone(),
}))
}

Expand All @@ -860,6 +883,10 @@ impl ExecutionPlan for DeltaScan {
self.parquet_scan.execute(partition, context)
}

/// Reports the metrics recorded for this scan to DataFusion.
///
/// Always returns `Some`, holding a `MetricsSet` copied out of the scan's
/// own `ExecutionPlanMetricsSet`.
fn metrics(&self) -> Option<MetricsSet> {
    let recorded = self.metrics.clone_inner();
    Some(recorded)
}

/// Returns the statistics of the wrapped `parquet_scan` plan unchanged.
///
/// `DeltaScan` computes nothing itself here; any error from the inner
/// plan is passed straight through to the caller.
fn statistics(&self) -> DataFusionResult<Statistics> {
    let inner_plan = &self.parquet_scan;
    inner_plan.statistics()
}
Expand All @@ -875,6 +902,7 @@ impl ExecutionPlan for DeltaScan {
config: self.config.clone(),
parquet_scan,
logical_schema: self.logical_schema.clone(),
metrics: self.metrics.clone(),
})))
} else {
Ok(None)
Expand Down Expand Up @@ -1236,6 +1264,7 @@ impl PhysicalExtensionCodec for DeltaPhysicalCodec {
parquet_scan: (*inputs)[0].clone(),
config: wire.config,
logical_schema: wire.logical_schema,
metrics: ExecutionPlanMetricsSet::new(),
};
Ok(Arc::new(delta_scan))
}
Expand Down Expand Up @@ -1928,6 +1957,7 @@ mod tests {
parquet_scan: Arc::from(EmptyExec::new(schema.clone())),
config: DeltaScanConfig::default(),
logical_schema: schema.clone(),
metrics: ExecutionPlanMetricsSet::new(),
});
let proto: protobuf::PhysicalPlanNode =
protobuf::PhysicalPlanNode::try_from_physical_plan(exec_plan.clone(), &codec)
Expand Down
41 changes: 41 additions & 0 deletions crates/core/tests/integration_datafusion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ mod local {
/// Test helper that gathers scan information while walking an execution plan.
///
/// It records which files the visited `ParquetExec` nodes report as scanned,
/// and the file-level counters exposed by a visited `DeltaScan` node.
#[derive(Debug, Default)]
pub struct ExecutionMetricsCollector {
/// Labels returned by `get_scanned_files` for every visited `ParquetExec`.
scanned_files: HashSet<Label>,
/// Sum of the `files_pruned` metric read from the visited `DeltaScan`;
/// stays at 0 when the metric is absent. Overwritten (not accumulated) on each visit.
pub skip_count: usize,
/// Sum of the `files_scanned` metric read from the visited `DeltaScan`;
/// stays at 0 when the metric is absent. Overwritten (not accumulated) on each visit.
pub keep_count: usize,
}

impl ExecutionMetricsCollector {
Expand All @@ -83,6 +85,15 @@ mod local {
if let Some(exec) = plan.as_any().downcast_ref::<ParquetExec>() {
let files = get_scanned_files(exec);
self.scanned_files.extend(files);
} else if let Some(exec) = plan.as_any().downcast_ref::<DeltaScan>() {
self.keep_count = exec
.metrics()
.and_then(|m| m.sum_by_name("files_scanned").map(|v| v.as_usize()))
.unwrap_or_default();
self.skip_count = exec
.metrics()
.and_then(|m| m.sum_by_name("files_pruned").map(|v| v.as_usize()))
.unwrap_or_default();
}
Ok(true)
}
Expand Down Expand Up @@ -440,6 +451,10 @@ mod local {
let task_ctx = Arc::new(TaskContext::from(state));
let _result = collect(plan.execute(0, task_ctx)?).await?;
visit_execution_plan(&plan, &mut metrics).unwrap();
} else {
// if scan produces no output from ParquetExec, we still want to visit DeltaScan
// to check its metrics
visit_execution_plan(scan.as_ref(), &mut metrics).unwrap();
}

Ok(metrics)
Expand Down Expand Up @@ -616,6 +631,8 @@ mod local {

let metrics = get_scan_metrics(&table, &state, &[]).await?;
assert_eq!(metrics.num_scanned_files(), 3);
assert_eq!(metrics.num_scanned_files(), metrics.keep_count);
assert_eq!(metrics.skip_count, 0);

// (Column name, value from file 1, value from file 2, value from file 3, non existent value)
let tests = [
Expand Down Expand Up @@ -662,25 +679,33 @@ mod local {
let e = col(column).eq(file1_value.clone());
let metrics = get_scan_metrics(&table, &state, &[e]).await?;
assert_eq!(metrics.num_scanned_files(), 1);
assert_eq!(metrics.num_scanned_files(), metrics.keep_count);
assert_eq!(metrics.skip_count, 2);

// Value does not exist
let e = col(column).eq(non_existent_value.clone());
let metrics = get_scan_metrics(&table, &state, &[e]).await?;
assert_eq!(metrics.num_scanned_files(), 0);
assert_eq!(metrics.num_scanned_files(), metrics.keep_count);
assert_eq!(metrics.skip_count, 3);

// Conjunction
let e = col(column)
.gt(file1_value.clone())
.and(col(column).lt(file2_value.clone()));
let metrics = get_scan_metrics(&table, &state, &[e]).await?;
assert_eq!(metrics.num_scanned_files(), 2);
assert_eq!(metrics.num_scanned_files(), metrics.keep_count);
assert_eq!(metrics.skip_count, 1);

// Disjunction
let e = col(column)
.lt(file1_value.clone())
.or(col(column).gt(file3_value.clone()));
let metrics = get_scan_metrics(&table, &state, &[e]).await?;
assert_eq!(metrics.num_scanned_files(), 2);
assert_eq!(metrics.num_scanned_files(), metrics.keep_count);
assert_eq!(metrics.skip_count, 1);
}

// Validate Boolean type
Expand All @@ -692,10 +717,14 @@ mod local {
let e = col("boolean").eq(lit(true));
let metrics = get_scan_metrics(&table, &state, &[e]).await?;
assert_eq!(metrics.num_scanned_files(), 1);
assert_eq!(metrics.num_scanned_files(), metrics.keep_count);
assert_eq!(metrics.skip_count, 1);

let e = col("boolean").eq(lit(false));
let metrics = get_scan_metrics(&table, &state, &[e]).await?;
assert_eq!(metrics.num_scanned_files(), 1);
assert_eq!(metrics.num_scanned_files(), metrics.keep_count);
assert_eq!(metrics.skip_count, 1);

let tests = [
TestCase::new_wrapped("utf8", |value| lit(value.to_string())),
Expand Down Expand Up @@ -762,23 +791,31 @@ mod local {
let e = col(column).eq(file1_value.clone());
let metrics = get_scan_metrics(&table, &state, &[e]).await?;
assert_eq!(metrics.num_scanned_files(), 1);
assert_eq!(metrics.num_scanned_files(), metrics.keep_count);
assert_eq!(metrics.skip_count, 8);

// Value does not exist
let e = col(column).eq(non_existent_value);
let metrics = get_scan_metrics(&table, &state, &[e]).await?;
assert_eq!(metrics.num_scanned_files(), 0);
assert_eq!(metrics.num_scanned_files(), metrics.keep_count);
assert_eq!(metrics.skip_count, 9);

// Conjunction
let e = col(column)
.gt(file1_value.clone())
.and(col(column).lt(file2_value));
let metrics = get_scan_metrics(&table, &state, &[e]).await?;
assert_eq!(metrics.num_scanned_files(), 2);
assert_eq!(metrics.num_scanned_files(), metrics.keep_count);
assert_eq!(metrics.skip_count, 7);

// Disjunction
let e = col(column).lt(file1_value).or(col(column).gt(file3_value));
let metrics = get_scan_metrics(&table, &state, &[e]).await?;
assert_eq!(metrics.num_scanned_files(), 2);
assert_eq!(metrics.num_scanned_files(), metrics.keep_count);
assert_eq!(metrics.skip_count, 7);

// TODO how to get an expression with the right datatypes eludes me ..
// Validate null pruning
Expand Down Expand Up @@ -808,10 +845,14 @@ mod local {
let e = col("boolean").eq(lit(true));
let metrics = get_scan_metrics(&table, &state, &[e]).await?;
assert_eq!(metrics.num_scanned_files(), 1);
assert_eq!(metrics.num_scanned_files(), metrics.keep_count);
assert_eq!(metrics.skip_count, 1);

let e = col("boolean").eq(lit(false));
let metrics = get_scan_metrics(&table, &state, &[e]).await?;
assert_eq!(metrics.num_scanned_files(), 1);
assert_eq!(metrics.num_scanned_files(), metrics.keep_count);
assert_eq!(metrics.skip_count, 1);

// Ensure that tables without stats and partition columns can be pruned for just partitions
// let table = open_table("./tests/data/delta-0.8.0-null-partition").await?;
Expand Down
Loading