Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add benchmark for planning sorted unions #14157

Merged
merged 1 commit into from
Jan 19, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 84 additions & 0 deletions datafusion/core/benches/sql_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@ mod data_utils;

use crate::criterion::Criterion;
use arrow::datatypes::{DataType, Field, Fields, Schema};
use arrow_array::{ArrayRef, RecordBatch};
use criterion::Bencher;
use datafusion::datasource::MemTable;
use datafusion::execution::context::SessionContext;
use datafusion_common::ScalarValue;
use datafusion_expr::col;
use itertools::Itertools;
use std::fs::File;
use std::io::{BufRead, BufReader};
Expand Down Expand Up @@ -147,6 +149,77 @@ fn benchmark_with_param_values_many_columns(ctx: &SessionContext, b: &mut Benche
});
}

/// Registers a table like this:
/// c0,c1,c2...,c99
/// 0,100...9900
/// 0,200...19800
/// 0,300...29700
fn register_union_order_table(ctx: &SessionContext, num_columns: usize, num_rows: usize) {
// ("c0", [0, 0, ...])
// ("c1": [100, 200, ...])
// etc
let iter = (0..num_columns).map(|i| i as u64).map(|i| {
let array: ArrayRef = Arc::new(arrow::array::UInt64Array::from_iter_values(
(0..num_rows)
.map(|j| j as u64 * 100 + i)
.collect::<Vec<_>>(),
));
(format!("c{}", i), array)
});
let batch = RecordBatch::try_from_iter(iter).unwrap();
let schema = batch.schema();
let partitions = vec![vec![batch]];

// tell DataFusion that the table is sorted by all columns
let sort_order = (0..num_columns)
.map(|i| col(format!("c{}", i)).sort(true, true))
.collect::<Vec<_>>();

// create the table
let table = MemTable::try_new(schema, partitions)
.unwrap()
.with_sort_order(vec![sort_order]);

ctx.register_table("t", Arc::new(table)).unwrap();
}

/// return a query like
/// ```sql
/// select c1, null as c2, ... null as cn from t ORDER BY c1
/// UNION ALL
/// select null as c1, c2, ... null as cn from t ORDER BY c2
/// ...
/// select null as c1, null as c2, ... cn from t ORDER BY cn
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we really need inner ORDER BY if the query got the outer one? 🤔 Shouldn't be inner sorting ignored?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I probably get it, the problem is with the planning of such query not the execution

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I probably get it, the problem is with the planning of such query not the execution

Yes, exactly

do we really need inner ORDER BY if the query got the outer one? 🤔 Shouldn't be inner sorting ignored?

Yes, indeed. I think the way it is ignored is that the sort equivalence code determines that the inner sorts aren't needed (or in this case they are all equivalent, so the top order by can a merge rather than sort)

The sort equivalence code (OrderEquivalenceProperties in particular) is what is consuming all this time

/// ORDER BY c1, c2 ... CN
/// ```
fn union_orderby_query(n: usize) -> String {
let mut query = String::new();
for i in 0..n {
if i != 0 {
query.push_str("\n UNION ALL \n");
}
let select_list = (0..n)
.map(|j| {
if i == j {
format!("c{j}")
} else {
format!("null as c{j}")
}
})
.collect::<Vec<_>>()
.join(", ");
query.push_str(&format!("(SELECT {} FROM t ORDER BY c{})", select_list, i));
}
query.push_str(&format!(
"\nORDER BY {}",
(0..n)
.map(|i| format!("c{}", i))
.collect::<Vec<_>>()
.join(", ")
));
query
}

fn criterion_benchmark(c: &mut Criterion) {
// verify that we can load the clickbench data prior to running the benchmark
if !PathBuf::from(format!("{BENCHMARKS_PATH_1}{CLICKBENCH_DATA_PATH}")).exists()
Expand Down Expand Up @@ -289,6 +362,17 @@ fn criterion_benchmark(c: &mut Criterion) {
});
});

// -- Sorted Queries --
register_union_order_table(&ctx, 100, 1000);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The table has 100 columns


// this query has many expressions in its sort order so stresses
// order equivalence validation
c.bench_function("physical_sorted_union_orderby", |b| {
// SELECT ... UNION ALL ...
let query = union_orderby_query(20);
b.iter(|| physical_plan(&ctx, &query))
});

// --- TPC-H ---

let tpch_ctx = register_defs(SessionContext::new(), tpch_schemas());
Expand Down
Loading