
Commit

Add max and min values to Datafusion Statistics (#327)
viirya authored Jul 19, 2021
1 parent 2748a4e commit 7e6da30
Showing 2 changed files with 148 additions and 13 deletions.
116 changes: 103 additions & 13 deletions rust/src/delta_datafusion.rs
@@ -30,8 +30,8 @@ use datafusion::datasource::TableProvider;
 use datafusion::logical_plan::{combine_filters, Expr};
 use datafusion::physical_plan::parquet::{ParquetExec, ParquetPartition, RowGroupPredicateBuilder};
 use datafusion::physical_plan::ExecutionPlan;
+use datafusion::scalar::ScalarValue;
 
-use crate::action::ColumnCountStat;
 use crate::delta;
 use crate::schema;
 
@@ -74,10 +74,16 @@ impl TableProvider for delta::DeltaTable {
                     null_count: statistics
                         .null_count
                         .get(field.get_name())
-                        .and_then(stat_to_val),
-                    max_value: None, // TODO: max/min/distinct
-                    min_value: None,
-                    distinct_count: None,
+                        .and_then(|f| f.as_value().map(|v| v as usize)),
+                    max_value: statistics
+                        .max_values
+                        .get(field.get_name())
+                        .and_then(|f| to_scalar_value(f.as_value()?)),
+                    min_value: statistics
+                        .min_values
+                        .get(field.get_name())
+                        .and_then(|f| to_scalar_value(f.as_value()?)),
+                    distinct_count: None, // TODO: distinct
                 })
                 .collect(),
             ),
@@ -147,12 +153,52 @@ impl TableProvider for delta::DeltaTable {
                         .get(field.get_name())
                         .and_then(|x| {
                             let null_count_acc = stats.null_count?;
-                            let null_count = stat_to_val(x)?;
+                            let null_count = x.as_value()? as usize;
                             Some(null_count_acc + null_count)
                         }),
-                    max_value: None,
-                    min_value: None,
-                    distinct_count: None,
+                    max_value: new_stats.max_values.get(field.get_name()).and_then(
+                        |x| {
+                            let old_stats = stats.clone();
+                            let max_value = to_scalar_value(x.as_value()?);
+
+                            match (max_value, old_stats.max_value) {
+                                (Some(max_value), Some(old_max_value)) => {
+                                    if left_larger_than_right(
+                                        old_max_value.clone(),
+                                        max_value.clone(),
+                                    ) {
+                                        Some(old_max_value)
+                                    } else {
+                                        Some(max_value)
+                                    }
+                                }
+                                (Some(max_value), None) => Some(max_value),
+                                (None, old) => old,
+                            }
+                        },
+                    ),
+                    min_value: new_stats.min_values.get(field.get_name()).and_then(
+                        |x| {
+                            let old_stats = stats.clone();
+                            let min_value = to_scalar_value(x.as_value()?);
+
+                            match (min_value, old_stats.min_value) {
+                                (Some(min_value), Some(old_min_value)) => {
+                                    if left_larger_than_right(
+                                        min_value.clone(),
+                                        old_min_value.clone(),
+                                    ) {
+                                        Some(old_min_value)
+                                    } else {
+                                        Some(min_value)
+                                    }
+                                }
+                                (Some(min_value), None) => Some(min_value),
+                                (None, old) => old,
+                            }
+                        },
+                    ),
+                    distinct_count: None, // TODO: distinct
                 })
                 .collect()
             }),
@@ -163,9 +209,53 @@ impl TableProvider for delta::DeltaTable {
     }
 }
 
-fn stat_to_val(stat: &ColumnCountStat) -> Option<usize> {
-    match stat {
-        ColumnCountStat::Value(val) => Some(*val as usize),
-        ColumnCountStat::Column(_) => None,
+fn to_scalar_value(stat_val: &serde_json::Value) -> Option<datafusion::scalar::ScalarValue> {
+    if stat_val.is_number() {
+        if let Some(val) = stat_val.as_i64() {
+            Some(ScalarValue::from(val))
+        } else if let Some(val) = stat_val.as_u64() {
+            Some(ScalarValue::from(val))
+        } else {
+            stat_val.as_f64().map(ScalarValue::from)
+        }
+    } else {
+        None
+    }
+}
+
+fn left_larger_than_right(
+    left: datafusion::scalar::ScalarValue,
+    right: datafusion::scalar::ScalarValue,
+) -> bool {
+    match left {
+        ScalarValue::Float64(Some(v)) => {
+            let f_right = f64::try_from(right).unwrap();
+            v > f_right
+        }
+        ScalarValue::Float32(Some(v)) => {
+            let f_right = f32::try_from(right).unwrap();
+            v > f_right
+        }
+        ScalarValue::Int8(Some(v)) => {
+            let i_right = i8::try_from(right).unwrap();
+            v > i_right
+        }
+        ScalarValue::Int16(Some(v)) => {
+            let i_right = i16::try_from(right).unwrap();
+            v > i_right
+        }
+        ScalarValue::Int32(Some(v)) => {
+            let i_right = i32::try_from(right).unwrap();
+            v > i_right
+        }
+        ScalarValue::Int64(Some(v)) => {
+            let i_right = i64::try_from(right).unwrap();
+            v > i_right
+        }
+        _ => unimplemented!(
+            "Scalar value comparison unimplemented for {:?} and {:?}",
+            left,
+            right
+        ),
     }
 }
45 changes: 45 additions & 0 deletions rust/tests/datafusion_test.rs
@@ -6,6 +6,7 @@ mod datafusion {
     use datafusion::datasource::TableProvider;
     use datafusion::error::Result;
     use datafusion::execution::context::ExecutionContext;
+    use datafusion::scalar::ScalarValue;
 
     #[tokio::test]
     async fn test_datafusion_simple_query() -> Result<()> {
@@ -66,12 +67,56 @@ mod datafusion {
         assert_eq!(
             statistics
                 .column_statistics
+                .clone()
                 .unwrap()
                 .iter()
                 .map(|x| x.null_count)
                 .collect::<Vec<Option<usize>>>(),
             vec![Some(0)],
         );
+
+        let mut ctx = ExecutionContext::new();
+        ctx.register_table("test_table", Arc::new(table))?;
+
+        let batches = ctx
+            .sql("SELECT max(value), min(value) FROM test_table")?
+            .collect()
+            .await?;
+
+        assert_eq!(batches.len(), 1);
+        let batch = &batches[0];
+        assert_eq!(
+            batch.column(0).as_ref(),
+            Arc::new(Int32Array::from(vec![4])).as_ref(),
+        );
+
+        assert_eq!(
+            batch.column(1).as_ref(),
+            Arc::new(Int32Array::from(vec![0])).as_ref(),
+        );
+
+        assert_eq!(
+            statistics
+                .column_statistics
+                .clone()
+                .unwrap()
+                .iter()
+                .map(|x| x.max_value.as_ref())
+                .collect::<Vec<Option<&ScalarValue>>>(),
+            vec![Some(&ScalarValue::from(4 as i64))],
+        );
+
+        assert_eq!(
+            statistics
+                .column_statistics
+                .clone()
+                .unwrap()
+                .iter()
+                .map(|x| x.min_value.as_ref())
+                .collect::<Vec<Option<&ScalarValue>>>(),
+            vec![Some(&ScalarValue::from(0 as i64))],
+        );
+
         Ok(())
     }
 }

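One detail the new assertions rely on, shown with a tiny sketch that is not part of the commit: to_scalar_value reads integer statistics through serde_json's as_i64, so the min and max of an Int32 column surface as ScalarValue::Int64 even though the query result batches are still Int32Array -- hence the `4 as i64` / `0 as i64` comparisons above. Assuming the datafusion crate as a dependency:

// Illustrative only: ScalarValue::from on an i64 yields the Int64 variant,
// which is what the min/max assertions in the test compare against.
use datafusion::scalar::ScalarValue;

fn main() {
    assert_eq!(ScalarValue::from(4_i64), ScalarValue::Int64(Some(4)));
}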