Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge consequent FilterSteps. #64760

Merged
merged 19 commits into from
Jun 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 52 additions & 0 deletions src/Processors/QueryPlan/Optimizations/mergeExpressions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,25 @@
#include <Processors/QueryPlan/FilterStep.h>
#include <Processors/QueryPlan/ExpressionStep.h>
#include <Interpreters/ActionsDAG.h>
#include <Functions/FunctionsLogical.h>
#include <Functions/IFunctionAdaptors.h>

namespace DB::QueryPlanOptimizations
{

/// Erase the first entry of `dag`'s output list that points at `node`.
/// Only one entry is removed even if the same node is listed several times;
/// if no entry points at `node`, the outputs are left untouched.
static void removeFromOutputs(ActionsDAG & dag, const ActionsDAG::Node & node)
{
    auto & dag_outputs = dag.getOutputs();

    /// Outputs are matched by node address, not by name or value.
    auto position = dag_outputs.begin();
    while (position != dag_outputs.end() && *position != &node)
        ++position;

    if (position != dag_outputs.end())
        dag_outputs.erase(position);
}

size_t tryMergeExpressions(QueryPlan::Node * parent_node, QueryPlan::Nodes &)
{
if (parent_node->children.size() != 1)
Expand All @@ -19,6 +34,7 @@ size_t tryMergeExpressions(QueryPlan::Node * parent_node, QueryPlan::Nodes &)
auto * parent_expr = typeid_cast<ExpressionStep *>(parent.get());
auto * parent_filter = typeid_cast<FilterStep *>(parent.get());
auto * child_expr = typeid_cast<ExpressionStep *>(child.get());
auto * child_filter = typeid_cast<FilterStep *>(child.get());

if (parent_expr && child_expr)
{
Expand Down Expand Up @@ -60,6 +76,42 @@ size_t tryMergeExpressions(QueryPlan::Node * parent_node, QueryPlan::Nodes &)
parent_node->children.swap(child_node->children);
return 1;
}
else if (parent_filter && child_filter)
{
const auto & child_actions = child_filter->getExpression();
const auto & parent_actions = parent_filter->getExpression();

if (child_actions->hasArrayJoin())
return 0;

auto actions = child_actions->clone();
const auto & child_filter_node = actions->findInOutputs(child_filter->getFilterColumnName());
if (child_filter->removesFilterColumn())
removeFromOutputs(*actions, child_filter_node);

actions->mergeInplace(std::move(*parent_actions->clone()));

const auto & parent_filter_node = actions->findInOutputs(parent_filter->getFilterColumnName());
if (parent_filter->removesFilterColumn())
removeFromOutputs(*actions, parent_filter_node);

FunctionOverloadResolverPtr func_builder_and = std::make_unique<FunctionToOverloadResolverAdaptor>(std::make_shared<FunctionAnd>());
const auto & condition = actions->addFunction(func_builder_and, {&child_filter_node, &parent_filter_node}, {});
auto & outputs = actions->getOutputs();
outputs.insert(outputs.begin(), &condition);

actions->removeUnusedActions(false);

auto filter = std::make_unique<FilterStep>(child_filter->getInputStreams().front(),
actions,
condition.result_name,
true);
filter->setStepDescription("(" + parent_filter->getStepDescription() + " + " + child_filter->getStepDescription() + ")");

parent_node->step = std::move(filter);
parent_node->children.swap(child_node->children);
return 1;
}

return 0;
}
Expand Down
10 changes: 10 additions & 0 deletions src/Storages/SelectQueryInfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,9 @@ class IMergeTreeDataPart;

using ManyExpressionActions = std::vector<ExpressionActionsPtr>;

struct StorageSnapshot;
using StorageSnapshotPtr = std::shared_ptr<StorageSnapshot>;

/** Query along with some additional data,
* that can be used during query processing
* inside storage engines.
Expand Down Expand Up @@ -173,6 +176,13 @@ struct SelectQueryInfo
/// Local storage limits
StorageLimits local_storage_limits;

/// This is a leaky abstraction.
/// StorageMerge replaces the storage in query_tree. However, column types may differ for the inner table,
/// so the resolved query tree might have incompatible types.
/// StorageDistributed uses this query tree to calculate a header, and throws if we use the storage snapshot.
/// To avoid this, we use the initial merge_storage_snapshot.
StorageSnapshotPtr merge_storage_snapshot;

/// Cluster for the query.
ClusterPtr cluster;
/// Optimized cluster for the query.
Expand Down
2 changes: 1 addition & 1 deletion src/Storages/StorageDistributed.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -846,7 +846,7 @@ void StorageDistributed::read(
remote_storage_id = StorageID{remote_database, remote_table};

auto query_tree_distributed = buildQueryTreeDistributed(modified_query_info,
storage_snapshot,
query_info.merge_storage_snapshot ? query_info.merge_storage_snapshot : storage_snapshot,
remote_storage_id,
remote_table_function_ptr);
header = InterpreterSelectQueryAnalyzer::getSampleBlock(query_tree_distributed, local_context, SelectQueryOptions(processed_stage).analyze());
Expand Down
7 changes: 6 additions & 1 deletion src/Storages/StorageMerge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -889,6 +889,8 @@ SelectQueryInfo ReadFromMerge::getModifiedQueryInfo(const ContextMutablePtr & mo

SelectQueryInfo modified_query_info = query_info;

modified_query_info.merge_storage_snapshot = merge_storage_snapshot;

if (modified_query_info.planner_context)
modified_query_info.planner_context = std::make_shared<PlannerContext>(modified_context, modified_query_info.planner_context);

Expand Down Expand Up @@ -1198,7 +1200,10 @@ ReadFromMerge::ChildPlan ReadFromMerge::createPlanForTable(

if (allow_experimental_analyzer)
{
InterpreterSelectQueryAnalyzer interpreter(modified_query_info.query_tree,
/// Converting query to AST because types might be different in the source table.
/// Need to resolve types again.
auto ast = modified_query_info.query_tree->toAST();
InterpreterSelectQueryAnalyzer interpreter(ast,
modified_context,
SelectQueryOptions(processed_stage));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,6 @@ Filter column: notEquals(__table1.y, 2_UInt8)
> filter is pushed down before CreatingSets
CreatingSets
Filter
Filter
1
3
> one condition of filter is pushed down before LEFT JOIN
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
Filter (((WHERE + (Change column names to column identifiers + (Project names + Projection))) + HAVING))
Filter column: and(notEquals(sum(__table2.number), 0_UInt8), equals(__table1.key, 7_UInt8)) (removed)
Aggregating
Filter (( + (Before GROUP BY + Change column names to column identifiers)))
Filter column: equals(__table1.key, 7_UInt8) (removed)
Filter (((WHERE + (Projection + Before ORDER BY)) + HAVING))
Filter column: and(notEquals(sum(number), 0), equals(key, 7)) (removed)
Aggregating
Filter ((( + Before GROUP BY) + WHERE))
Filter column: and(equals(bitAnd(number, 15), 7), equals(key, 7)) (removed)
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
set allow_experimental_analyzer=1;
select explain from (explain actions = 1 select * from (select sum(number) as v, bitAnd(number, 15) as key from numbers(1e8) group by key having v != 0) where key = 7) where explain like '%Filter%' or explain like '%Aggregating%';

set allow_experimental_analyzer=0;
select explain from (explain actions = 1 select * from (select sum(number) as v, bitAnd(number, 15) as key from numbers(1e8) group by key having v != 0) where key = 7) where explain like '%Filter%' or explain like '%Aggregating%';
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@
Prewhere info
Prewhere filter
Prewhere filter column: and(notEmpty(v), equals(k, 3)) (removed)
Prewhere info
Prewhere filter
Prewhere filter column: and(notEmpty(v), equals(k, 3)) (removed)
Prewhere info
Prewhere filter
Prewhere filter column: and(notEmpty(v), equals(k, 3)) (removed)
2
Filter column: and(equals(k, 3), notEmpty(v)) (removed)
Prewhere info
Expand Down
3 changes: 2 additions & 1 deletion tests/queries/0_stateless/02156_storage_merge_prewhere.sql
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ INSERT INTO t_02156_mt1 SELECT number, toString(number) FROM numbers(10000);
INSERT INTO t_02156_mt2 SELECT number, toString(number) FROM numbers(10000);
INSERT INTO t_02156_log SELECT number, toString(number) FROM numbers(10000);

SELECT replaceRegexpAll(explain, '__table1\.|_UInt8', '') FROM (EXPLAIN actions=1 SELECT count() FROM t_02156_merge1 WHERE k = 3 AND notEmpty(v)) WHERE explain LIKE '%Prewhere%' OR explain LIKE '%Filter column%';
SELECT replaceRegexpAll(explain, '__table1\.|_UInt8', '') FROM (EXPLAIN actions=1 SELECT count() FROM t_02156_merge1 WHERE k = 3 AND notEmpty(v)) WHERE explain LIKE '%Prewhere%' OR explain LIKE '%Filter column%' settings allow_experimental_analyzer=1;
SELECT replaceRegexpAll(explain, '__table1\.|_UInt8', '') FROM (EXPLAIN actions=1 SELECT count() FROM t_02156_merge1 WHERE k = 3 AND notEmpty(v)) WHERE explain LIKE '%Prewhere%' OR explain LIKE '%Filter column%' settings allow_experimental_analyzer=0;
SELECT count() FROM t_02156_merge1 WHERE k = 3 AND notEmpty(v);

SELECT replaceRegexpAll(explain, '__table1\.|_UInt8', '') FROM (EXPLAIN actions=1 SELECT count() FROM t_02156_merge2 WHERE k = 3 AND notEmpty(v)) WHERE explain LIKE '%Prewhere%' OR explain LIKE '%Filter column%';
Expand Down
12 changes: 12 additions & 0 deletions tests/queries/0_stateless/02156_storage_merge_prewhere_2.reference
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
1 a
1 a
2 b
2 b
1 a
1 a
2 b
2 b
1 a
2 b
1 a
2 b
17 changes: 17 additions & 0 deletions tests/queries/0_stateless/02156_storage_merge_prewhere_2.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
DROP TABLE IF EXISTS t_02156_ololo_1;
DROP TABLE IF EXISTS t_02156_ololo_2;
DROP TABLE IF EXISTS t_02156_ololo_dist;

CREATE TABLE t_02156_ololo_1 (k UInt32, v Nullable(String)) ENGINE = MergeTree order by k;
CREATE TABLE t_02156_ololo_2 (k UInt32, v String) ENGINE = MergeTree order by k;
CREATE TABLE t_02156_ololo_dist (k UInt32, v String) ENGINE = Distributed(test_shard_localhost, currentDatabase(), t_02156_ololo_2);
CREATE TABLE t_02156_ololo_dist2 (k UInt32, v Nullable(String)) ENGINE = Distributed(test_shard_localhost, currentDatabase(), t_02156_ololo_1);

insert into t_02156_ololo_1 values (1, 'a');
insert into t_02156_ololo_2 values (2, 'b');

select * from merge('t_02156_ololo') where k != 0 and notEmpty(v) order by k settings optimize_move_to_prewhere=0;
select * from merge('t_02156_ololo') where k != 0 and notEmpty(v) order by k settings optimize_move_to_prewhere=1;

select * from merge('t_02156_ololo_dist') where k != 0 and notEmpty(v) order by k settings optimize_move_to_prewhere=0;
select * from merge('t_02156_ololo_dist') where k != 0 and notEmpty(v) order by k settings optimize_move_to_prewhere=1;
Original file line number Diff line number Diff line change
Expand Up @@ -332,13 +332,12 @@ SETTINGS optimize_aggregators_of_group_by_keys=0 -- avoid removing any() as it d
Expression (Projection)
Sorting (Sorting for ORDER BY)
Expression (Before ORDER BY)
Filter ((WHERE + (Projection + Before ORDER BY)))
Filter (HAVING)
Aggregating
Expression ((Before GROUP BY + Projection))
Sorting (Sorting for ORDER BY)
Expression ((Before ORDER BY + (Projection + Before ORDER BY)))
ReadFromSystemNumbers
Filter (((WHERE + (Projection + Before ORDER BY)) + HAVING))
Aggregating
Expression ((Before GROUP BY + Projection))
Sorting (Sorting for ORDER BY)
Expression ((Before ORDER BY + (Projection + Before ORDER BY)))
ReadFromSystemNumbers
-- execute
1
2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,20 +29,16 @@ WHERE type_1 = \'all\'
ExpressionTransform × 2
(Filter)
FilterTransform × 2
(Filter)
FilterTransform × 2
(Filter)
FilterTransform × 2
(Aggregating)
ExpressionTransform × 2
AggregatingTransform × 2
Copy 1 → 2
(Expression)
ExpressionTransform
(Expression)
ExpressionTransform
(ReadFromMergeTree)
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1
(Aggregating)
ExpressionTransform × 2
AggregatingTransform × 2
Copy 1 → 2
(Expression)
ExpressionTransform
(Expression)
ExpressionTransform
(ReadFromMergeTree)
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1
(Expression)
ExpressionTransform × 2
(Filter)
Expand All @@ -68,14 +64,10 @@ ExpressionTransform × 2
ExpressionTransform × 2
AggregatingTransform × 2
Copy 1 → 2
(Filter)
FilterTransform
(Filter)
FilterTransform
(Expression)
ExpressionTransform
(ReadFromMergeTree)
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1
(Expression)
ExpressionTransform
(ReadFromMergeTree)
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1
(Expression)
ExpressionTransform × 2
(Aggregating)
Expand Down
Loading