Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: partition table query optimize #1594

Merged
merged 8 commits into from
Nov 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -99,24 +99,24 @@ UInt64(16367588166920223437),Timestamp(1651737067000),String("horaedb9"),Int32(0
-- SQLNESS REPLACE duration=\d+.?\d*(µ|m|n) duration=xx
-- SQLNESS REPLACE compute=\d+.?\d*(µ|m|n) compute=xx
-- SQLNESS REPLACE time=\d+.?\d*(µ|m|n) time=xx
-- SQLNESS REPLACE metrics=\[.*?s\] metrics=xx
-- SQLNESS REPLACE elapsed_compute=\d+.?\d*(µ|m|n) elapsed_compute=xx
-- SQLNESS REPLACE scan_memtable_\d+ scan_memtable_n
EXPLAIN ANALYZE SELECT * from partition_table_t where name = "ceresdb0";

plan_type,plan,
String("Plan with Metrics"),String("ResolvedPartitionedScan: pushdown_continue:false, partition_count:1, metrics=xx\n ScanTable: table=__partition_table_t_1, parallelism=8, priority=Low, partition_count=UnknownPartitioning(8), metrics=[\nPredicate { exprs:[name = Utf8(\"ceresdb0\")], time_range:TimeRange { inclusive_start: Timestamp(-9223372036854775808), exclusive_end: Timestamp(9223372036854775807) } }\nscan_table:\n do_merge_sort=true\n iter_num=1\n merge_iter_0:\n init_duration=xxs\n num_memtables=0\n num_ssts=0\n scan_count=1\n scan_duration=xxs\n times_fetch_row_from_multiple=0\n times_fetch_rows_from_one=0\n total_rows_fetch_from_one=0\n scan_memtable_n, fetched_columns:[tsid,t,name,id,value]:\n=0]\n=0]\n"),
String("Plan with Metrics"),String("ResolvedPartitionedScan: pushdown_continue:false, partition_count:1, metrics=[\npartition_table_t:\n __partition_table_t_1:\n poll_duration=xxs\n total_duration=xxs\n wait_duration=xxs\n\n__partition_table_t_1:\nCoalescePartitionsExec, metrics=[output_rows=0, elapsed_compute=xxs]\n ScanTable: table=__partition_table_t_1, parallelism=8, priority=Low, partition_count=UnknownPartitioning(8), metrics=[\nPredicate { exprs:[name = Utf8(\"ceresdb0\")], time_range:TimeRange { inclusive_start: Timestamp(-9223372036854775808), exclusive_end: Timestamp(9223372036854775807) } }\nscan_table:\n do_merge_sort=true\n iter_num=1\n merge_iter_0:\n init_duration=xxs\n num_memtables=0\n num_ssts=0\n scan_count=1\n scan_duration=xxs\n times_fetch_row_from_multiple=0\n times_fetch_rows_from_one=0\n total_rows_fetch_from_one=0\n scan_memtable_n, fetched_columns:[tsid,t,name,id,value]:\n=0]\n=0]\n"),


-- SQLNESS REPLACE duration=\d+.?\d*(µ|m|n) duration=xx
-- SQLNESS REPLACE compute=\d+.?\d*(µ|m|n) compute=xx
-- SQLNESS REPLACE __partition_table_t_\d __partition_table_t_x
-- SQLNESS REPLACE time=\d+.?\d*(µ|m|n) time=xx
-- SQLNESS REPLACE metrics=\[.*?s\] metrics=xx
-- SQLNESS REPLACE elapsed_compute=\d+.?\d*(µ|m|n) elapsed_compute=xx
-- SQLNESS REPLACE scan_memtable_\d+ scan_memtable_n
EXPLAIN ANALYZE SELECT * from partition_table_t where name in ("ceresdb0", "ceresdb1", "ceresdb2", "ceresdb3", "ceresdb4");

plan_type,plan,
String("Plan with Metrics"),String("ResolvedPartitionedScan: pushdown_continue:false, partition_count:3, metrics=xx\n ScanTable: table=__partition_table_t_x, parallelism=8, priority=Low, partition_count=UnknownPartitioning(8), metrics=xx\n ScanTable: table=__partition_table_t_x, parallelism=8, priority=Low, partition_count=UnknownPartitioning(8), metrics=xx\n ScanTable: table=__partition_table_t_x, parallelism=8, priority=Low, partition_count=UnknownPartitioning(8), metrics=[\nPredicate { exprs:[name IN ([Utf8(\"ceresdb0\"), Utf8(\"ceresdb1\"), Utf8(\"ceresdb2\"), Utf8(\"ceresdb3\"), Utf8(\"ceresdb4\")])], time_range:TimeRange { inclusive_start: Timestamp(-9223372036854775808), exclusive_end: Timestamp(9223372036854775807) } }\nscan_table:\n do_merge_sort=true\n iter_num=1\n merge_iter_0:\n init_duration=xxs\n num_memtables=0\n num_ssts=0\n scan_count=1\n scan_duration=xxs\n times_fetch_row_from_multiple=0\n times_fetch_rows_from_one=0\n total_rows_fetch_from_one=0\n scan_memtable_n, fetched_columns:[tsid,t,name,id,value]:\n=0]\n=0]\n"),
String("Plan with Metrics"),String("ResolvedPartitionedScan: pushdown_continue:false, partition_count:3, metrics=[\npartition_table_t:\n __partition_table_t_x:\n poll_duration=xxs\n total_duration=xxs\n wait_duration=xxs\n __partition_table_t_x:\n poll_duration=xxs\n total_duration=xxs\n wait_duration=xxs\n __partition_table_t_x:\n poll_duration=xxs\n total_duration=xxs\n wait_duration=xxs\n\n__partition_table_t_x:\nCoalescePartitionsExec, metrics=[output_rows=0, elapsed_compute=xxs]\n ScanTable: table=__partition_table_t_x, parallelism=8, priority=Low, partition_count=UnknownPartitioning(8), metrics=[\nPredicate { exprs:[name IN ([Utf8(\"ceresdb2\")])], time_range:TimeRange { inclusive_start: Timestamp(-9223372036854775808), exclusive_end: Timestamp(9223372036854775807) } }\nscan_table:\n do_merge_sort=true\n iter_num=1\n merge_iter_0:\n init_duration=xxs\n num_memtables=0\n num_ssts=0\n scan_count=1\n scan_duration=xxs\n times_fetch_row_from_multiple=0\n times_fetch_rows_from_one=0\n total_rows_fetch_from_one=0\n scan_memtable_n, fetched_columns:[tsid,t,name,id,value]:\n=0]\n\n__partition_table_t_x:\nCoalescePartitionsExec, metrics=[output_rows=0, elapsed_compute=xxs]\n ScanTable: table=__partition_table_t_x, parallelism=8, priority=Low, partition_count=UnknownPartitioning(8), metrics=[\nPredicate { exprs:[name IN ([Utf8(\"ceresdb0\"), Utf8(\"ceresdb4\")])], time_range:TimeRange { inclusive_start: Timestamp(-9223372036854775808), exclusive_end: Timestamp(9223372036854775807) } }\nscan_table:\n do_merge_sort=true\n iter_num=1\n merge_iter_0:\n init_duration=xxs\n num_memtables=0\n num_ssts=0\n scan_count=1\n scan_duration=xxs\n times_fetch_row_from_multiple=0\n times_fetch_rows_from_one=0\n total_rows_fetch_from_one=0\n scan_memtable_n, fetched_columns:[tsid,t,name,id,value]:\n=0]\n\n__partition_table_t_x:\nCoalescePartitionsExec, metrics=[output_rows=0, elapsed_compute=xxs]\n ScanTable: table=__partition_table_t_x, parallelism=8, priority=Low, partition_count=UnknownPartitioning(8), metrics=[\nPredicate { exprs:[name IN ([Utf8(\"ceresdb1\"), Utf8(\"ceresdb3\")])], time_range:TimeRange { inclusive_start: Timestamp(-9223372036854775808), exclusive_end: Timestamp(9223372036854775807) } }\nscan_table:\n do_merge_sort=true\n iter_num=1\n merge_iter_0:\n init_duration=xxs\n num_memtables=0\n num_ssts=0\n scan_count=1\n scan_duration=xxs\n times_fetch_row_from_multiple=0\n times_fetch_rows_from_one=0\n total_rows_fetch_from_one=0\n scan_memtable_n, fetched_columns:[tsid,t,name,id,value]:\n=0]\n=0]\n"),


ALTER TABLE partition_table_t ADD COLUMN (b string);
Expand Down
4 changes: 2 additions & 2 deletions integration_tests/cases/env/cluster/ddl/partition_table.sql
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,15 @@ SELECT * from partition_table_t where name in ("horaedb5", "horaedb6", "horaedb7
-- SQLNESS REPLACE duration=\d+.?\d*(µ|m|n) duration=xx
-- SQLNESS REPLACE compute=\d+.?\d*(µ|m|n) compute=xx
-- SQLNESS REPLACE time=\d+.?\d*(µ|m|n) time=xx
-- SQLNESS REPLACE metrics=\[.*?s\] metrics=xx
-- SQLNESS REPLACE elapsed_compute=\d+.?\d*(µ|m|n) elapsed_compute=xx
-- SQLNESS REPLACE scan_memtable_\d+ scan_memtable_n
EXPLAIN ANALYZE SELECT * from partition_table_t where name = "ceresdb0";

-- SQLNESS REPLACE duration=\d+.?\d*(µ|m|n) duration=xx
-- SQLNESS REPLACE compute=\d+.?\d*(µ|m|n) compute=xx
-- SQLNESS REPLACE __partition_table_t_\d __partition_table_t_x
-- SQLNESS REPLACE time=\d+.?\d*(µ|m|n) time=xx
-- SQLNESS REPLACE metrics=\[.*?s\] metrics=xx
zealchen marked this conversation as resolved.
Show resolved Hide resolved
-- SQLNESS REPLACE elapsed_compute=\d+.?\d*(µ|m|n) elapsed_compute=xx
-- SQLNESS REPLACE scan_memtable_\d+ scan_memtable_n
EXPLAIN ANALYZE SELECT * from partition_table_t where name in ("ceresdb0", "ceresdb1", "ceresdb2", "ceresdb3", "ceresdb4");

Expand Down
5 changes: 4 additions & 1 deletion src/df_engine_extensions/src/dist_sql_query/physical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ use datafusion::{
};
use futures::{future::BoxFuture, FutureExt, Stream, StreamExt};
use runtime::Priority;
use table_engine::{remote::model::TableIdentifier, table::ReadRequest};
use table_engine::{predicate::Predicate, remote::model::TableIdentifier, table::ReadRequest};
use trace_metric::{collector::FormatCollectorVisitor, MetricsCollector, TraceMetricWhenDrop};

use crate::dist_sql_query::{RemotePhysicalPlanExecutor, RemoteTaskContext, TableScanContext};
Expand All @@ -62,13 +62,15 @@ pub struct UnresolvedPartitionedScan {
pub table_scan_ctx: TableScanContext,
pub metrics_collector: MetricsCollector,
pub priority: Priority,
pub predicates: Option<Vec<Predicate>>,
}

impl UnresolvedPartitionedScan {
pub fn new(
table_name: &str,
sub_tables: Vec<TableIdentifier>,
read_request: ReadRequest,
predicates: Option<Vec<Predicate>>,
) -> Self {
let metrics_collector = MetricsCollector::new(table_name.to_string());
let table_scan_ctx = TableScanContext {
Expand All @@ -83,6 +85,7 @@ impl UnresolvedPartitionedScan {
table_scan_ctx,
metrics_collector,
priority: read_request.priority,
predicates,
}
}
}
Expand Down
16 changes: 14 additions & 2 deletions src/df_engine_extensions/src/dist_sql_query/resolver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,10 +135,22 @@ impl Resolver {
let sub_tables = unresolved.sub_tables.clone();
let remote_plans = sub_tables
.into_iter()
.map(|table| {
.enumerate()
.map(|(idx, table)| {
let plan = Arc::new(UnresolvedSubTableScan {
table: table.clone(),
table_scan_ctx: unresolved.table_scan_ctx.clone(),
table_scan_ctx: if let Some(ref predicates) = unresolved.predicates {
// Since all each partition has different predicate, so we shall build
// seperate ctx regarding each partition
// with different predicate
let mut ctx = unresolved.table_scan_ctx.clone();
// overwrite old predicate (it's the predidcate before partiton
// calculation) with optimized predicate
ctx.predicate = Arc::new(predicates[idx].clone());
zealchen marked this conversation as resolved.
Show resolved Hide resolved
ctx
} else {
unresolved.table_scan_ctx.clone()
},
});
let sub_metrics_collect = metrics_collector.span(table.table.clone());

Expand Down
2 changes: 2 additions & 0 deletions src/df_engine_extensions/src/dist_sql_query/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,7 @@ impl TestContext {
"test",
sub_tables,
self.request.clone(),
None,
));

let filter: Arc<dyn ExecutionPlan> =
Expand Down Expand Up @@ -364,6 +365,7 @@ impl TestContext {
"test",
self.sub_table_groups[0].clone(),
self.request.clone(),
None,
));

self.build_aggr_plan_with_input(unresolved_scan)
Expand Down
49 changes: 46 additions & 3 deletions src/partition_table_engine/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,17 @@ use std::sync::Arc;

use analytic_engine::TableOptions;
use async_trait::async_trait;
use datafusion::logical_expr::expr::{Expr, InList};
use generic_error::BoxError;
use snafu::{OptionExt, ResultExt};
use snafu::{ensure, OptionExt, ResultExt};
use table_engine::{
engine::{
CloseShardRequest, CloseTableRequest, CreateTableParams, CreateTableRequest,
DropTableRequest, OpenShardRequest, OpenShardResult, OpenTableRequest, Result, TableEngine,
Unexpected, UnexpectedNoCause,
DropTableRequest, InvalidPartitionContext, OpenShardRequest, OpenShardResult,
OpenTableRequest, Result, TableEngine, Unexpected, UnexpectedNoCause,
},
partition::rule::df_adapter::PartitionedFilterKeyIndex,
predicate::Predicate,
remote::RemoteEngineRef,
table::TableRef,
PARTITION_TABLE_ENGINE_TYPE,
Expand Down Expand Up @@ -110,3 +113,43 @@ impl TableEngine for PartitionTableEngine {
vec![Ok("".to_string())]
}
}

pub fn partitioned_predicates(
predicate: Arc<Predicate>,
partitions: &[usize],
partitioned_key_indices: &mut PartitionedFilterKeyIndex,
) -> Result<Vec<Predicate>> {
ensure!(
partitions.len() == partitioned_key_indices.keys().len(),
InvalidPartitionContext {
msg: format!(
"partitions length:{}, partitioned_key_indices length: {}",
partitions.len(),
partitioned_key_indices.keys().len()
)
}
);
let mut predicates = vec![(*predicate).clone(); partitions.len()];
for (idx, predicate) in predicates.iter_mut().enumerate() {
let partition = partitions[idx];
if let Some(filter_indices) = partitioned_key_indices.get(&partition) {
let exprs = predicate.mut_exprs();
for (filter_idx, key_indices) in filter_indices {
if let Expr::InList(InList {
list,
negated: false,
..
}) = &mut exprs[*filter_idx]
{
let mut idx = 0;
list.retain(|_| {
let should_kept = key_indices.contains(&idx);
idx += 1;
should_kept
});
}
}
}
}
Ok(predicates)
}
8 changes: 4 additions & 4 deletions src/partition_table_engine/src/partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ use table_engine::{
partition::{
format_sub_partition_table_name,
rule::{
df_adapter::DfPartitionRuleAdapter, PartitionedRow, PartitionedRows,
PartitionedRowsIter,
df_adapter::{DfPartitionRuleAdapter, PartitionedFilterKeyIndex},
PartitionedRow, PartitionedRows, PartitionedRowsIter,
},
PartitionInfo,
},
Expand Down Expand Up @@ -289,14 +289,14 @@ impl Table for PartitionTableImpl {
.context(CreatePartitionRule)?
}
};

let mut partitioned_key_indices = PartitionedFilterKeyIndex::new();
// Evaluate expr and locate partition.
let partitions = {
let _locate_timer = PARTITION_TABLE_PARTITIONED_READ_DURATION_HISTOGRAM
.with_label_values(&["locate"])
.start_timer();
df_partition_rule
.locate_partitions_for_read(request.predicate.exprs())
.locate_partitions_for_read(request.predicate.exprs(), &mut partitioned_key_indices)
.box_err()
.context(LocatePartitions)?
};
Expand Down
Loading
Loading