Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[pipelineX](improvement) Support global runtime filter #28692

Merged
merged 4 commits into from
Dec 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 34 additions & 4 deletions be/src/exprs/runtime_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -946,10 +946,11 @@ class RuntimePredicateWrapper {
Status IRuntimeFilter::create(RuntimeFilterParamsContext* state, ObjectPool* pool,
const TRuntimeFilterDesc* desc, const TQueryOptions* query_options,
const RuntimeFilterRole role, int node_id, IRuntimeFilter** res,
bool build_bf_exactly) {
*res = pool->add(new IRuntimeFilter(state, pool, desc));
bool build_bf_exactly, bool is_global, int parallel_tasks) {
*res = pool->add(new IRuntimeFilter(state, pool, desc, is_global, parallel_tasks));
(*res)->set_role(role);
return (*res)->init_with_desc(desc, query_options, node_id, build_bf_exactly);
return (*res)->init_with_desc(desc, query_options, node_id,
is_global ? false : build_bf_exactly);
}

void IRuntimeFilter::copy_to_shared_context(vectorized::SharedRuntimeFilterContext& context) {
Expand All @@ -972,9 +973,35 @@ void IRuntimeFilter::insert_batch(const vectorized::ColumnPtr column, size_t sta
_wrapper->insert_batch(column, start);
}

Status IRuntimeFilter::merge_local_filter(RuntimePredicateWrapper* wrapper, int* merged_num) {
SCOPED_TIMER(_merge_local_rf_timer);
std::unique_lock lock(_local_merge_mutex);
if (_merged_rf_num == 0) {
_wrapper = wrapper;
} else {
RETURN_IF_ERROR(merge_from(wrapper));
}
*merged_num = ++_merged_rf_num;
return Status::OK();
}

Status IRuntimeFilter::publish() {
DCHECK(is_producer());
if (_has_local_target) {
if (_is_global) {
std::vector<IRuntimeFilter*> filters;
RETURN_IF_ERROR(_state->get_query_ctx()->runtime_filter_mgr()->get_consume_filters(
_filter_id, filters));
// push down
for (auto filter : filters) {
int merged_num = 0;
RETURN_IF_ERROR(filter->merge_local_filter(_wrapper, &merged_num));
if (merged_num == _parallel_build_tasks) {
filter->update_runtime_filter_type_to_profile();
filter->signal();
}
}
return Status::OK();
} else if (_has_local_target) {
std::vector<IRuntimeFilter*> filters;
RETURN_IF_ERROR(_state->runtime_filter_mgr->get_consume_filters(_filter_id, filters));
// push down
Expand Down Expand Up @@ -1297,6 +1324,9 @@ void IRuntimeFilter::init_profile(RuntimeProfile* parent_profile) {
_profile_init = true;
parent_profile->add_child(_profile.get(), true, nullptr);
_profile->add_info_string("Info", _format_status());
if (_is_global) {
_merge_local_rf_timer = ADD_TIMER(_profile.get(), "MergeLocalRuntimeFilterTime");
}
if (_runtime_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER) {
update_runtime_filter_type_to_profile();
}
Expand Down
22 changes: 19 additions & 3 deletions be/src/exprs/runtime_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ enum RuntimeFilterState {
class IRuntimeFilter {
public:
IRuntimeFilter(RuntimeFilterParamsContext* state, ObjectPool* pool,
const TRuntimeFilterDesc* desc)
const TRuntimeFilterDesc* desc, bool is_global = false, int parallel_tasks = -1)
: _state(state),
_pool(pool),
_filter_id(desc->filter_id),
Expand All @@ -206,14 +206,17 @@ class IRuntimeFilter {
_runtime_filter_type(get_runtime_filter_type(desc)),
_name(fmt::format("RuntimeFilter: (id = {}, type = {})", _filter_id,
to_string(_runtime_filter_type))),
_profile(new RuntimeProfile(_name)) {}
_profile(new RuntimeProfile(_name)),
_is_global(is_global),
_parallel_build_tasks(parallel_tasks) {}

~IRuntimeFilter() = default;

static Status create(RuntimeFilterParamsContext* state, ObjectPool* pool,
const TRuntimeFilterDesc* desc, const TQueryOptions* query_options,
const RuntimeFilterRole role, int node_id, IRuntimeFilter** res,
bool build_bf_exactly = false);
bool build_bf_exactly = false, bool is_global = false,
int parallel_tasks = 0);

void copy_to_shared_context(vectorized::SharedRuntimeFilterContext& context);
Status copy_from_shared_context(vectorized::SharedRuntimeFilterContext& context);
Expand Down Expand Up @@ -359,6 +362,8 @@ class IRuntimeFilter {

void set_filter_timer(std::shared_ptr<pipeline::RuntimeFilterTimer>);

Status merge_local_filter(RuntimePredicateWrapper* wrapper, int* merged_num);

protected:
// serialize _wrapper to protobuf
void to_protobuf(PInFilter* filter);
Expand Down Expand Up @@ -452,7 +457,18 @@ class IRuntimeFilter {
// parent profile
// only effect on consumer
std::unique_ptr<RuntimeProfile> _profile;
RuntimeProfile::Counter* _merge_local_rf_timer = nullptr;
bool _opt_remote_rf;
// `_is_global` indicates whether this runtime filter is global on this BE.
// All runtime filters should be merged on each BE if it is global.
// This is improvement for pipelineX.
const bool _is_global = false;
std::mutex _local_merge_mutex;
// There are `_parallel_build_tasks` pipeline tasks to build runtime filter.
// We should call `signal` once all runtime filters are done and merged to one
// (e.g. `_merged_rf_num` is equal to `_parallel_build_tasks`).
int _merged_rf_num = 0;
const int _parallel_build_tasks = -1;

std::vector<std::shared_ptr<pipeline::RuntimeFilterTimer>> _filter_timer;
};
Expand Down
12 changes: 9 additions & 3 deletions be/src/exprs/runtime_filter_slots.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,10 @@ class VRuntimeFilterSlots {
public:
VRuntimeFilterSlots(
const std::vector<std::shared_ptr<vectorized::VExprContext>>& build_expr_ctxs,
const std::vector<TRuntimeFilterDesc>& runtime_filter_descs)
: _build_expr_context(build_expr_ctxs), _runtime_filter_descs(runtime_filter_descs) {}
const std::vector<TRuntimeFilterDesc>& runtime_filter_descs, bool is_global = false)
: _build_expr_context(build_expr_ctxs),
_runtime_filter_descs(runtime_filter_descs),
_is_global(is_global) {}

Status init(RuntimeState* state, int64_t hash_table_size) {
// runtime filter effect strategy
Expand All @@ -45,7 +47,10 @@ class VRuntimeFilterSlots {

std::map<int, bool> has_in_filter;

auto ignore_local_filter = [state](int filter_id) {
auto ignore_local_filter = [&](int filter_id) {
if (_is_global) {
return Status::OK();
}
std::vector<IRuntimeFilter*> filters;
RETURN_IF_ERROR(state->runtime_filter_mgr()->get_consume_filters(filter_id, filters));
if (filters.empty()) {
Expand Down Expand Up @@ -236,6 +241,7 @@ class VRuntimeFilterSlots {
private:
const std::vector<std::shared_ptr<vectorized::VExprContext>>& _build_expr_context;
const std::vector<TRuntimeFilterDesc>& _runtime_filter_descs;
const bool _is_global = false;
// prob_contition index -> [IRuntimeFilter]
std::map<int, std::list<IRuntimeFilter*>> _runtime_filters;
};
Expand Down
10 changes: 5 additions & 5 deletions be/src/pipeline/exec/aggregation_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -366,15 +366,15 @@ class AggSinkOperatorX : public DataSinkOperatorX<LocalStateType> {
Status sink(RuntimeState* state, vectorized::Block* in_block,
SourceState source_state) override;

std::vector<TExpr> get_local_shuffle_exprs() const override { return _partition_exprs; }
ExchangeType get_local_exchange_type() const override {
DataDistribution get_local_exchange_type() const override {
if (_probe_expr_ctxs.empty()) {
return _needs_finalize || DataSinkOperatorX<LocalStateType>::_child_x
->ignore_data_distribution()
? ExchangeType::PASSTHROUGH
: ExchangeType::NOOP;
? DataDistribution(ExchangeType::PASSTHROUGH)
: DataSinkOperatorX<LocalStateType>::get_local_exchange_type();
}
return _is_colocate ? ExchangeType::BUCKET_HASH_SHUFFLE : ExchangeType::HASH_SHUFFLE;
return _is_colocate ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
: DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
}

using DataSinkOperatorX<LocalStateType>::id;
Expand Down
5 changes: 3 additions & 2 deletions be/src/pipeline/exec/aggregation_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -506,8 +506,9 @@ Status AggLocalState::_get_without_key_result(RuntimeState* state, vectorized::B
if (!column_type->is_nullable() || data_types[i]->is_nullable() ||
!remove_nullable(column_type)->equals(*data_types[i])) {
return Status::InternalError(
"column_type not match data_types, column_type={}, data_types={}",
column_type->get_name(), data_types[i]->get_name());
"node id = {}, column_type not match data_types, column_type={}, "
"data_types={}",
_parent->node_id(), column_type->get_name(), data_types[i]->get_name());
}
}

Expand Down
11 changes: 6 additions & 5 deletions be/src/pipeline/exec/analytic_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,14 +107,15 @@ class AnalyticSinkOperatorX final : public DataSinkOperatorX<AnalyticSinkLocalSt

Status sink(RuntimeState* state, vectorized::Block* in_block,
SourceState source_state) override;
std::vector<TExpr> get_local_shuffle_exprs() const override { return _partition_exprs; }
ExchangeType get_local_exchange_type() const override {
DataDistribution get_local_exchange_type() const override {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: method 'get_local_exchange_type' can be made static [readability-convert-member-functions-to-static]

Suggested change
DataDistribution get_local_exchange_type() const override {
static DataDistribution get_local_exchange_type() override {

if (_partition_by_eq_expr_ctxs.empty()) {
return ExchangeType::PASSTHROUGH;
return {ExchangeType::PASSTHROUGH};
} else if (_order_by_eq_expr_ctxs.empty()) {
return _is_colocate ? ExchangeType::BUCKET_HASH_SHUFFLE : ExchangeType::HASH_SHUFFLE;
return _is_colocate
? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
: DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
}
return ExchangeType::NOOP;
return DataSinkOperatorX<AnalyticSinkLocalState>::get_local_exchange_type();
}

private:
Expand Down
4 changes: 3 additions & 1 deletion be/src/pipeline/exec/assert_num_rows_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,9 @@ class AssertNumRowsOperatorX final : public StreamingOperatorX<AssertNumRowsLoca

[[nodiscard]] bool is_source() const override { return false; }

ExchangeType get_local_exchange_type() const override { return ExchangeType::PASSTHROUGH; }
DataDistribution get_local_exchange_type() const override {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: method 'get_local_exchange_type' can be made static [readability-convert-member-functions-to-static]

Suggested change
DataDistribution get_local_exchange_type() const override {
static DataDistribution get_local_exchange_type() override {

return {ExchangeType::PASSTHROUGH};
}

private:
friend class AssertNumRowsLocalState;
Expand Down
7 changes: 5 additions & 2 deletions be/src/pipeline/exec/exchange_source_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,11 @@ class ExchangeSourceOperatorX final : public OperatorX<ExchangeLocalState> {
return _sub_plan_query_statistics_recvr;
}

bool need_to_local_shuffle() const override {
return !_is_hash_partition || OperatorX<ExchangeLocalState>::ignore_data_distribution();
DataDistribution get_local_exchange_type() const override {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: method 'get_local_exchange_type' can be made static [readability-convert-member-functions-to-static]

Suggested change
DataDistribution get_local_exchange_type() const override {
static DataDistribution get_local_exchange_type() override {

if (!_is_hash_partition || OperatorX<ExchangeLocalState>::ignore_data_distribution()) {
return {ExchangeType::NOOP};
}
return {ExchangeType::HASH_SHUFFLE};
}

private:
Expand Down
9 changes: 6 additions & 3 deletions be/src/pipeline/exec/hashjoin_build_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,8 @@ Status HashJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo
_runtime_filters.resize(p._runtime_filter_descs.size());
for (size_t i = 0; i < p._runtime_filter_descs.size(); i++) {
RETURN_IF_ERROR(state->runtime_filter_mgr()->register_producer_filter(
p._runtime_filter_descs[i], state->query_options(), _build_expr_ctxs.size() == 1));
p._runtime_filter_descs[i], state->query_options(), _build_expr_ctxs.size() == 1,
p._use_global_rf, p._child_x->parallel_tasks()));
RETURN_IF_ERROR(state->runtime_filter_mgr()->get_producer_filter(
p._runtime_filter_descs[i].filter_id, &_runtime_filters[i]));
}
Expand Down Expand Up @@ -386,12 +387,14 @@ void HashJoinBuildSinkLocalState::_hash_table_init(RuntimeState* state) {

HashJoinBuildSinkOperatorX::HashJoinBuildSinkOperatorX(ObjectPool* pool, int operator_id,
const TPlanNode& tnode,
const DescriptorTbl& descs)
const DescriptorTbl& descs,
bool use_global_rf)
: JoinBuildSinkOperatorX(pool, operator_id, tnode, descs),
_join_distribution(tnode.hash_join_node.__isset.dist_type ? tnode.hash_join_node.dist_type
: TJoinDistributionType::NONE),
_is_broadcast_join(tnode.hash_join_node.__isset.is_broadcast_join &&
tnode.hash_join_node.is_broadcast_join) {
tnode.hash_join_node.is_broadcast_join),
_use_global_rf(use_global_rf) {
_runtime_filter_descs = tnode.runtime_filters;
}

Expand Down
18 changes: 10 additions & 8 deletions be/src/pipeline/exec/hashjoin_build_sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ class HashJoinBuildSinkOperatorX final
: public JoinBuildSinkOperatorX<HashJoinBuildSinkLocalState> {
public:
HashJoinBuildSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode,
const DescriptorTbl& descs);
const DescriptorTbl& descs, bool use_global_rf);
Status init(const TDataSink& tsink) override {
return Status::InternalError("{} should not init with TDataSink",
JoinBuildSinkOperatorX<HashJoinBuildSinkLocalState>::_name);
Expand All @@ -155,18 +155,18 @@ class HashJoinBuildSinkOperatorX final
._should_build_hash_table;
}

std::vector<TExpr> get_local_shuffle_exprs() const override { return _partition_exprs; }
ExchangeType get_local_exchange_type() const override {
DataDistribution get_local_exchange_type() const override {
if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
return ExchangeType::NOOP;
return {ExchangeType::NOOP};
} else if (_is_broadcast_join) {
return _child_x->ignore_data_distribution() ? ExchangeType::BROADCAST
: ExchangeType::NOOP;
return _child_x->ignore_data_distribution()
? DataDistribution(ExchangeType::PASS_TO_ONE)
: DataDistribution(ExchangeType::NOOP);
}
return _join_distribution == TJoinDistributionType::BUCKET_SHUFFLE ||
_join_distribution == TJoinDistributionType::COLOCATE
? ExchangeType::BUCKET_HASH_SHUFFLE
: ExchangeType::HASH_SHUFFLE;
? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
: DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
}

private:
Expand All @@ -187,6 +187,8 @@ class HashJoinBuildSinkOperatorX final
vectorized::SharedHashTableContextPtr _shared_hash_table_context = nullptr;
std::vector<TRuntimeFilterDesc> _runtime_filter_descs;
std::vector<TExpr> _partition_exprs;

const bool _use_global_rf;
};

} // namespace pipeline
Expand Down
12 changes: 6 additions & 6 deletions be/src/pipeline/exec/hashjoin_probe_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -163,17 +163,17 @@ class HashJoinProbeOperatorX final : public JoinProbeOperatorX<HashJoinProbeLoca
SourceState& source_state) const override;

bool need_more_input_data(RuntimeState* state) const override;
std::vector<TExpr> get_local_shuffle_exprs() const override { return _partition_exprs; }
ExchangeType get_local_exchange_type() const override {
DataDistribution get_local_exchange_type() const override {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: method 'get_local_exchange_type' can be made static [readability-convert-member-functions-to-static]

Suggested change
DataDistribution get_local_exchange_type() const override {
static DataDistribution get_local_exchange_type() override {

if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
return ExchangeType::NOOP;
return {ExchangeType::NOOP};
}
return _is_broadcast_join
? ExchangeType::PASSTHROUGH
? DataDistribution(ExchangeType::PASSTHROUGH)
: (_join_distribution == TJoinDistributionType::BUCKET_SHUFFLE ||
_join_distribution == TJoinDistributionType::COLOCATE
? ExchangeType::BUCKET_HASH_SHUFFLE
: ExchangeType::HASH_SHUFFLE);
? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE,
_partition_exprs)
: DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs));
}

private:
Expand Down
8 changes: 8 additions & 0 deletions be/src/pipeline/exec/nested_loop_join_build_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,14 @@ class NestedLoopJoinBuildSinkOperatorX final
Status sink(RuntimeState* state, vectorized::Block* in_block,
SourceState source_state) override;

DataDistribution get_local_exchange_type() const override {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: method 'get_local_exchange_type' can be made static [readability-convert-member-functions-to-static]

Suggested change
DataDistribution get_local_exchange_type() const override {
static DataDistribution get_local_exchange_type() override {

if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
return {ExchangeType::NOOP};
}
return _child_x->ignore_data_distribution() ? DataDistribution(ExchangeType::BROADCAST)
: DataDistribution(ExchangeType::NOOP);
}

private:
friend class NestedLoopJoinBuildSinkLocalState;

Expand Down
6 changes: 3 additions & 3 deletions be/src/pipeline/exec/nested_loop_join_probe_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -227,11 +227,11 @@ class NestedLoopJoinProbeOperatorX final
return _old_version_flag ? _row_descriptor : *_intermediate_row_desc;
}

ExchangeType get_local_exchange_type() const override {
DataDistribution get_local_exchange_type() const override {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: method 'get_local_exchange_type' can be made static [readability-convert-member-functions-to-static]

Suggested change
DataDistribution get_local_exchange_type() const override {
static DataDistribution get_local_exchange_type() override {

if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
return ExchangeType::NOOP;
return {ExchangeType::NOOP};
}
return ExchangeType::ADAPTIVE_PASSTHROUGH;
return {ExchangeType::ADAPTIVE_PASSTHROUGH};
}

const RowDescriptor& row_desc() override {
Expand Down
6 changes: 3 additions & 3 deletions be/src/pipeline/exec/partition_sort_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,11 +105,11 @@ class PartitionSortSinkOperatorX final : public DataSinkOperatorX<PartitionSortS
Status open(RuntimeState* state) override;
Status sink(RuntimeState* state, vectorized::Block* in_block,
SourceState source_state) override;
ExchangeType get_local_exchange_type() const override {
DataDistribution get_local_exchange_type() const override {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: method 'get_local_exchange_type' can be made static [readability-convert-member-functions-to-static]

Suggested change
DataDistribution get_local_exchange_type() const override {
static DataDistribution get_local_exchange_type() override {

if (_topn_phase == TPartTopNPhase::TWO_PHASE_GLOBAL) {
return ExchangeType::NOOP;
return DataSinkOperatorX<PartitionSortSinkLocalState>::get_local_exchange_type();
}
return ExchangeType::PASSTHROUGH;
return {ExchangeType::PASSTHROUGH};
}

private:
Expand Down
Loading
Loading