Enable datafusion.optimizer.filter_null_join_keys by default #12369

Closed
wants to merge 23 commits
2 changes: 1 addition & 1 deletion datafusion/common/src/config.rs
@@ -514,7 +514,7 @@ config_namespace! {
/// a nullable and non-nullable column to filter out nulls on the nullable side. This
/// filter can add additional overhead when the file format does not fully support
/// predicate push down.
pub filter_null_join_keys: bool, default = false
pub filter_null_join_keys: bool, default = true

/// Should DataFusion repartition data using the aggregate keys to execute aggregates
/// in parallel using the provided `target_partitions` level
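Since this flips the default to `true`, a user who hits a regression can still opt back out per session. A minimal sketch of the two usual ways to override the option (assuming the standard `SessionConfig` / `SessionContext` APIs and the SQL `SET` statement; illustrative, not part of this diff):

```rust
use datafusion::error::Result;
use datafusion::prelude::{SessionConfig, SessionContext};

#[tokio::main]
async fn main() -> Result<()> {
    // Programmatic override: turn the (now default-on) rule back off.
    let mut config = SessionConfig::new();
    config.options_mut().optimizer.filter_null_join_keys = false;
    let ctx = SessionContext::new_with_config(config);

    // Equivalent per-session override through SQL.
    ctx.sql("SET datafusion.optimizer.filter_null_join_keys = false")
        .await?
        .collect()
        .await?;
    Ok(())
}
```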
40 changes: 39 additions & 1 deletion datafusion/core/benches/sql_query_with_io.rs
@@ -42,7 +42,7 @@ use rand::{rngs::StdRng, Rng, SeedableRng};
use tokio::runtime::Runtime;
use url::Url;

const THREADS: usize = 4;
const THREADS: usize = 10;
const TABLES: usize = 3;
const TABLE_PARTITIONS: usize = 10;
const PARTITION_FILES: usize = 2;
@@ -58,22 +58,31 @@ fn schema() -> SchemaRef {
    Arc::new(Schema::new(vec![
        Field::new("id", DataType::UInt64, false),
        Field::new("payload", DataType::Int64, false),
        Field::new("optional_id", DataType::UInt64, true),
    ]))
}

fn create_parquet_file(rng: &mut StdRng, id_offset: usize) -> Bytes {
    let schema = schema();
    let mut id_builder = UInt64Builder::new();
    let mut payload_builder = Int64Builder::new();
    let mut optional_id_builder = UInt64Builder::new();

    for row in 0..FILE_ROWS {
        id_builder.append_value((row + id_offset) as u64);
        payload_builder.append_value(rng.gen());
        if row % 2 == 0 {
            optional_id_builder.append_null();
        } else {
            optional_id_builder.append_value((row + id_offset) as u64);
        }
    }
    let batch = RecordBatch::try_new(
        Arc::clone(&schema),
        vec![
            Arc::new(id_builder.finish()),
            Arc::new(payload_builder.finish()),
            Arc::new(optional_id_builder.finish()),
        ],
    )
    .unwrap();
@@ -256,6 +265,35 @@ fn criterion_benchmark(c: &mut Criterion) {
        &format!("{join_query} WHERE {table0_name}.partition = 0"),
        PARTITION_FILES * FILE_ROWS,
    );

    let mut join_query = "SELECT * FROM".to_owned();
    for table_id in 0..TABLES {
        let table_name = table_name(table_id);
        if table_id == 0 {
            write!(join_query, " {table_name}").unwrap();
        } else {
            write!(
                join_query,
                " INNER JOIN {table_name} on {table_name}.optional_id = {table0_name}.id AND {table_name}.partition = {table0_name}.partition",
            ).unwrap();
        }
    }
    bench_query(
        c,
        &ctx,
        &rt,
        "IO: INNER JOIN (nullable), all tables, all partitions",
        &join_query,
        TABLE_PARTITIONS * PARTITION_FILES * FILE_ROWS / 2,
    );
    bench_query(
        c,
        &ctx,
        &rt,
        "IO: INNER JOIN (nullable), all tables, single partition",
        &format!("{join_query} WHERE {table0_name}.partition = 0"),
        PARTITION_FILES * FILE_ROWS / 2,
    );
}

criterion_group!(benches, criterion_benchmark);
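For context on the expected row counts passed to `bench_query` above: `create_parquet_file` writes a NULL `optional_id` for every even row, so an inner join keyed on `optional_id` can only ever match the odd rows, i.e. half of the data, hence the `/ 2`. A standalone sanity check of that arithmetic (the `FILE_ROWS` value here is an assumption; the real constant lives in the benchmark):

```rust
const FILE_ROWS: usize = 1_000; // assumed value; the benchmark defines its own constant

fn main() {
    // Mirrors the generator: even rows get a NULL optional_id, odd rows get a value.
    let joinable_rows = (0..FILE_ROWS).filter(|row| *row % 2 != 0).count();
    // Only rows with a non-NULL key can survive the inner join on optional_id.
    assert_eq!(joinable_rows, FILE_ROWS / 2);
    println!("{} of {} rows carry a non-NULL optional_id", joinable_rows, FILE_ROWS);
}
```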
4 changes: 3 additions & 1 deletion datafusion/optimizer/src/filter_null_join_keys.rs
@@ -50,7 +50,9 @@ impl OptimizerRule for FilterNullJoinKeys {
            return Ok(Transformed::no(plan));
        }
        match plan {
            LogicalPlan::Join(mut join) if !join.on.is_empty() => {
            LogicalPlan::Join(mut join)
                if !join.on.is_empty() && !join.null_equals_null =>
            {
                let (left_preserved, right_preserved) =
                    on_lr_is_preserved(join.join_type);

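One non-obvious detail in the guard above: a join with `null_equals_null = true` (for example the semi-joins DataFusion builds for set operations such as INTERSECT) may legitimately match NULL keys, so pre-filtering them would change results; that is presumably why the intersect test below no longer expects the `IS NOT NULL` filters. For the case the rule does target, a minimal, hypothetical sketch (table, query, and expected plan shape are illustrative assumptions, not part of this diff):

```rust
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    // Default config: filter_null_join_keys is now enabled.
    let ctx = SessionContext::new();
    ctx.sql("CREATE TABLE t (id BIGINT, opt_id BIGINT) AS VALUES (1, NULL), (2, 2)")
        .await?
        .collect()
        .await?;

    // For a plain inner equi-join on a nullable key, the optimized logical plan
    // is expected to gain a `... IS NOT NULL` filter under each nullable join
    // input, since a NULL key can never satisfy `a.opt_id = b.id`.
    ctx.sql("EXPLAIN SELECT * FROM t a INNER JOIN t b ON a.opt_id = b.id")
        .await?
        .show()
        .await?;
    Ok(())
}
```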
21 changes: 8 additions & 13 deletions datafusion/optimizer/tests/optimizer_integration.rs
@@ -177,15 +177,12 @@ fn intersect() -> Result<()> {
let plan = test_sql(sql)?;
let expected =
"LeftSemi Join: test.col_int32 = test.col_int32, test.col_utf8 = test.col_utf8\
\n Aggregate: groupBy=[[test.col_int32, test.col_utf8]], aggr=[[]]\
\n LeftSemi Join: test.col_int32 = test.col_int32, test.col_utf8 = test.col_utf8\
\n Aggregate: groupBy=[[test.col_int32, test.col_utf8]], aggr=[[]]\
\n Filter: test.col_int32 IS NOT NULL AND test.col_utf8 IS NOT NULL\
\n TableScan: test projection=[col_int32, col_utf8]\
\n Filter: test.col_int32 IS NOT NULL AND test.col_utf8 IS NOT NULL\
\n TableScan: test projection=[col_int32, col_utf8]\
\n Filter: test.col_int32 IS NOT NULL AND test.col_utf8 IS NOT NULL\
\n TableScan: test projection=[col_int32, col_utf8]";
\n Aggregate: groupBy=[[test.col_int32, test.col_utf8]], aggr=[[]]\
\n LeftSemi Join: test.col_int32 = test.col_int32, test.col_utf8 = test.col_utf8\
\n Aggregate: groupBy=[[test.col_int32, test.col_utf8]], aggr=[[]]\
\n TableScan: test projection=[col_int32, col_utf8]\
\n TableScan: test projection=[col_int32, col_utf8]\
\n TableScan: test projection=[col_int32, col_utf8]";
assert_eq!(expected, format!("{plan}"));
Ok(())
}
@@ -281,11 +278,9 @@ fn test_same_name_but_not_ambiguous() {
let expected = "LeftSemi Join: t1.col_int32 = t2.col_int32\
\n Aggregate: groupBy=[[t1.col_int32]], aggr=[[]]\
\n SubqueryAlias: t1\
\n Filter: test.col_int32 IS NOT NULL\
Review thread:

Contributor Author: This was actually a regression in https://github.com/apache/datafusion/pull/

Contributor: the link seems to be incomplete

Contributor Author: This is #12348; I think I'll move the fix out of this PR so it can be reviewed separately.

\n TableScan: test projection=[col_int32]\
\n TableScan: test projection=[col_int32]\
\n SubqueryAlias: t2\
\n Filter: test.col_int32 IS NOT NULL\
\n TableScan: test projection=[col_int32]";
\n TableScan: test projection=[col_int32]";
assert_eq!(expected, format!("{plan}"));
}

93 changes: 61 additions & 32 deletions datafusion/sqllogictest/test_files/group_by.slt
@@ -2009,9 +2009,11 @@ logical_plan
03)----Aggregate: groupBy=[[l.col0, l.col1, l.col2]], aggr=[[last_value(r.col1) ORDER BY [r.col0 ASC NULLS LAST]]]
04)------Inner Join: l.col0 = r.col0
05)--------SubqueryAlias: l
06)----------TableScan: tab0 projection=[col0, col1, col2]
07)--------SubqueryAlias: r
08)----------TableScan: tab0 projection=[col0, col1]
06)----------Filter: tab0.col0 IS NOT NULL
07)------------TableScan: tab0 projection=[col0, col1, col2]
08)--------SubqueryAlias: r
09)----------Filter: tab0.col0 IS NOT NULL
10)------------TableScan: tab0 projection=[col0, col1]
physical_plan
01)SortPreservingMergeExec: [col0@0 ASC NULLS LAST]
02)--SortExec: expr=[col0@0 ASC NULLS LAST], preserve_partitioning=[true]
@@ -2020,12 +2022,21 @@ physical_plan
05)--------CoalesceBatchesExec: target_batch_size=8192
06)----------RepartitionExec: partitioning=Hash([col0@0, col1@1, col2@2], 4), input_partitions=4
07)------------AggregateExec: mode=Partial, gby=[col0@0 as col0, col1@1 as col1, col2@2 as col2], aggr=[last_value(r.col1) ORDER BY [r.col0 ASC NULLS LAST]]
08)--------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
09)----------------ProjectionExec: expr=[col0@2 as col0, col1@3 as col1, col2@4 as col2, col0@0 as col0, col1@1 as col1]
10)------------------CoalesceBatchesExec: target_batch_size=8192
11)--------------------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(col0@0, col0@0)]
12)----------------------MemoryExec: partitions=1, partition_sizes=[3]
13)----------------------MemoryExec: partitions=1, partition_sizes=[3]
08)--------------ProjectionExec: expr=[col0@2 as col0, col1@3 as col1, col2@4 as col2, col0@0 as col0, col1@1 as col1]
09)----------------CoalesceBatchesExec: target_batch_size=8192
10)------------------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(col0@0, col0@0)]
11)--------------------CoalesceBatchesExec: target_batch_size=8192
12)----------------------RepartitionExec: partitioning=Hash([col0@0], 4), input_partitions=4
13)------------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
14)--------------------------CoalesceBatchesExec: target_batch_size=8192
15)----------------------------FilterExec: col0@0 IS NOT NULL
16)------------------------------MemoryExec: partitions=1, partition_sizes=[3]
17)--------------------CoalesceBatchesExec: target_batch_size=8192
18)----------------------RepartitionExec: partitioning=Hash([col0@0], 4), input_partitions=4
19)------------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
20)--------------------------CoalesceBatchesExec: target_batch_size=8192
21)----------------------------FilterExec: col0@0 IS NOT NULL
22)------------------------------MemoryExec: partitions=1, partition_sizes=[3]

# Columns in the table are a,b,c,d. Source is CsvExec which is ordered by
# a,b,c column. Column a has cardinality 2, column b has cardinality 4.
@@ -2868,18 +2879,24 @@ logical_plan
04)------Projection: s.zip_code, s.country, s.sn, s.ts, s.currency, e.sn, e.amount
05)--------Inner Join: s.currency = e.currency Filter: s.ts >= e.ts
06)----------SubqueryAlias: s
07)------------TableScan: sales_global projection=[zip_code, country, sn, ts, currency]
08)----------SubqueryAlias: e
09)------------TableScan: sales_global projection=[sn, ts, currency, amount]
07)------------Filter: sales_global.currency IS NOT NULL
Review thread:

Contributor:

> Benefits: Input to join is smaller, so smaller input, faster build, no nulls need to be hashed

But in order to skip hashing nulls, the input array would have to be "filtered" (aka copy the matching rows).

> lower chance for data skew, other join can be planned, downstream kernels are faster, can possibly be pushed down into scan, etc. In a distributed setting, might save a lot of IO as well.

The argument in the distributed setting makes sense to me, but the other ones all seem to be of the class "faster in some cases but slower in others".

Contributor Author (@Dandandan, Sep 9, 2024):

> But in order to skip hashing nulls, the input array would have to be "filtered" (aka copy the matching rows)

Correct, but you also save some copying in RepartitionExec / the build-side concatenate, and the copying / checking of key columns on the probe side. In case there aren't any nulls (even if the column is nullable), no copying happens.

Even with CSV / MemTable, in many cases the null filter can be combined with existing filter expressions, so no extra copying happens (in fact less copying, as fewer rows need to be copied).

08)--------------TableScan: sales_global projection=[zip_code, country, sn, ts, currency]
09)----------SubqueryAlias: e
10)------------Filter: sales_global.currency IS NOT NULL
11)--------------TableScan: sales_global projection=[sn, ts, currency, amount]
physical_plan
01)SortExec: expr=[sn@2 ASC NULLS LAST], preserve_partitioning=[false]
02)--ProjectionExec: expr=[zip_code@1 as zip_code, country@2 as country, sn@0 as sn, ts@3 as ts, currency@4 as currency, last_value(e.amount) ORDER BY [e.sn ASC NULLS LAST]@5 as last_rate]
03)----AggregateExec: mode=Single, gby=[sn@2 as sn, zip_code@0 as zip_code, country@1 as country, ts@3 as ts, currency@4 as currency], aggr=[last_value(e.amount) ORDER BY [e.sn ASC NULLS LAST]]
04)------ProjectionExec: expr=[zip_code@2 as zip_code, country@3 as country, sn@4 as sn, ts@5 as ts, currency@6 as currency, sn@0 as sn, amount@1 as amount]
05)--------CoalesceBatchesExec: target_batch_size=8192
06)----------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(currency@2, currency@4)], filter=ts@0 >= ts@1, projection=[sn@0, amount@3, zip_code@4, country@5, sn@6, ts@7, currency@8]
07)------------MemoryExec: partitions=1, partition_sizes=[1]
08)------------MemoryExec: partitions=1, partition_sizes=[1]
07)------------CoalesceBatchesExec: target_batch_size=8192
08)--------------FilterExec: currency@2 IS NOT NULL
09)----------------MemoryExec: partitions=1, partition_sizes=[1]
10)------------CoalesceBatchesExec: target_batch_size=8192
11)--------------FilterExec: currency@4 IS NOT NULL
12)----------------MemoryExec: partitions=1, partition_sizes=[1]

query ITIPTR rowsort
SELECT s.zip_code, s.country, s.sn, s.ts, s.currency, LAST_VALUE(e.amount ORDER BY e.sn) AS last_rate
@@ -3864,20 +3881,26 @@ logical_plan
05)--------Projection: l.a, l.d, row_n
06)----------Inner Join: l.d = r.d Filter: CAST(l.a AS Int64) >= CAST(r.a AS Int64) - Int64(10)
07)------------SubqueryAlias: l
08)--------------TableScan: multiple_ordered_table projection=[a, d]
09)------------Projection: r.a, r.d, row_number() ORDER BY [r.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS row_n
10)--------------WindowAggr: windowExpr=[[row_number() ORDER BY [r.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
11)----------------SubqueryAlias: r
12)------------------TableScan: multiple_ordered_table projection=[a, d]
08)--------------Filter: multiple_ordered_table.d IS NOT NULL
09)----------------TableScan: multiple_ordered_table projection=[a, d], partial_filters=[multiple_ordered_table.d IS NOT NULL]
10)------------Projection: r.a, r.d, row_number() ORDER BY [r.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS row_n
11)--------------Filter: r.d IS NOT NULL
12)----------------WindowAggr: windowExpr=[[row_number() ORDER BY [r.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
13)------------------SubqueryAlias: r
14)--------------------TableScan: multiple_ordered_table projection=[a, d]
physical_plan
01)ProjectionExec: expr=[last_value(l.d) ORDER BY [l.a ASC NULLS LAST]@1 as amount_usd]
02)--AggregateExec: mode=Single, gby=[row_n@2 as row_n], aggr=[last_value(l.d) ORDER BY [l.a ASC NULLS LAST]], ordering_mode=Sorted
03)----CoalesceBatchesExec: target_batch_size=2
04)------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(d@1, d@1)], filter=CAST(a@0 AS Int64) >= CAST(a@1 AS Int64) - 10, projection=[a@0, d@1, row_n@4]
05)--------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true
06)--------ProjectionExec: expr=[a@0 as a, d@1 as d, row_number() ORDER BY [r.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as row_n]
07)----------BoundedWindowAggExec: wdw=[row_number() ORDER BY [r.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: "row_number() ORDER BY [r.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(NULL)), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]
08)------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true
05)--------CoalesceBatchesExec: target_batch_size=2
06)----------FilterExec: d@1 IS NOT NULL
07)------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true
08)--------ProjectionExec: expr=[a@0 as a, d@1 as d, row_number() ORDER BY [r.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as row_n]
09)----------CoalesceBatchesExec: target_batch_size=2
10)------------FilterExec: d@1 IS NOT NULL
11)--------------BoundedWindowAggExec: wdw=[row_number() ORDER BY [r.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: "row_number() ORDER BY [r.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(NULL)), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]
12)----------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true

# reset partition number to 8.
statement ok
@@ -4021,21 +4044,27 @@ logical_plan
03)----SubqueryAlias: lhs
04)------Projection: multiple_ordered_table_with_pk.c, multiple_ordered_table_with_pk.b, sum(multiple_ordered_table_with_pk.d) AS sum1
05)--------Aggregate: groupBy=[[multiple_ordered_table_with_pk.c, multiple_ordered_table_with_pk.b]], aggr=[[sum(CAST(multiple_ordered_table_with_pk.d AS Int64))]]
06)----------TableScan: multiple_ordered_table_with_pk projection=[b, c, d]
07)----SubqueryAlias: rhs
08)------Projection: multiple_ordered_table_with_pk.c, multiple_ordered_table_with_pk.b, sum(multiple_ordered_table_with_pk.d) AS sum1
09)--------Aggregate: groupBy=[[multiple_ordered_table_with_pk.c, multiple_ordered_table_with_pk.b]], aggr=[[sum(CAST(multiple_ordered_table_with_pk.d AS Int64))]]
10)----------TableScan: multiple_ordered_table_with_pk projection=[b, c, d]
06)----------Filter: multiple_ordered_table_with_pk.b IS NOT NULL
07)------------TableScan: multiple_ordered_table_with_pk projection=[b, c, d], partial_filters=[multiple_ordered_table_with_pk.b IS NOT NULL]
08)----SubqueryAlias: rhs
09)------Projection: multiple_ordered_table_with_pk.c, multiple_ordered_table_with_pk.b, sum(multiple_ordered_table_with_pk.d) AS sum1
10)--------Aggregate: groupBy=[[multiple_ordered_table_with_pk.c, multiple_ordered_table_with_pk.b]], aggr=[[sum(CAST(multiple_ordered_table_with_pk.d AS Int64))]]
11)----------Filter: multiple_ordered_table_with_pk.b IS NOT NULL
12)------------TableScan: multiple_ordered_table_with_pk projection=[b, c, d], partial_filters=[multiple_ordered_table_with_pk.b IS NOT NULL]
physical_plan
01)ProjectionExec: expr=[c@0 as c, c@2 as c, sum1@1 as sum1, sum1@3 as sum1]
02)--CoalesceBatchesExec: target_batch_size=2
03)----HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(b@1, b@1)], projection=[c@0, sum1@2, c@3, sum1@5]
04)------ProjectionExec: expr=[c@0 as c, b@1 as b, sum(multiple_ordered_table_with_pk.d)@2 as sum1]
05)--------AggregateExec: mode=Single, gby=[c@1 as c, b@0 as b], aggr=[sum(multiple_ordered_table_with_pk.d)], ordering_mode=PartiallySorted([0])
06)----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[b, c, d], output_ordering=[c@1 ASC NULLS LAST], has_header=true
07)------ProjectionExec: expr=[c@0 as c, b@1 as b, sum(multiple_ordered_table_with_pk.d)@2 as sum1]
08)--------AggregateExec: mode=Single, gby=[c@1 as c, b@0 as b], aggr=[sum(multiple_ordered_table_with_pk.d)], ordering_mode=PartiallySorted([0])
09)----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[b, c, d], output_ordering=[c@1 ASC NULLS LAST], has_header=true
06)----------CoalesceBatchesExec: target_batch_size=2
07)------------FilterExec: b@0 IS NOT NULL
08)--------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[b, c, d], output_ordering=[c@1 ASC NULLS LAST], has_header=true
09)------ProjectionExec: expr=[c@0 as c, b@1 as b, sum(multiple_ordered_table_with_pk.d)@2 as sum1]
10)--------AggregateExec: mode=Single, gby=[c@1 as c, b@0 as b], aggr=[sum(multiple_ordered_table_with_pk.d)], ordering_mode=PartiallySorted([0])
11)----------CoalesceBatchesExec: target_batch_size=2
12)------------FilterExec: b@0 IS NOT NULL
13)--------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[b, c, d], output_ordering=[c@1 ASC NULLS LAST], has_header=true

query TT
EXPLAIN SELECT lhs.c, rhs.c, lhs.sum1, rhs.sum1