Skip to content

Commit

Permalink
[pipelineX](improvement) Support global runtime filter (apache#28692)
Browse files Browse the repository at this point in the history
  • Loading branch information
Gabriel39 authored and stephen committed Dec 28, 2023
1 parent e9ed2b3 commit 8701e31
Show file tree
Hide file tree
Showing 45 changed files with 475 additions and 236 deletions.
38 changes: 34 additions & 4 deletions be/src/exprs/runtime_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -946,10 +946,11 @@ class RuntimePredicateWrapper {
Status IRuntimeFilter::create(RuntimeFilterParamsContext* state, ObjectPool* pool,
const TRuntimeFilterDesc* desc, const TQueryOptions* query_options,
const RuntimeFilterRole role, int node_id, IRuntimeFilter** res,
bool build_bf_exactly) {
*res = pool->add(new IRuntimeFilter(state, pool, desc));
bool build_bf_exactly, bool is_global, int parallel_tasks) {
*res = pool->add(new IRuntimeFilter(state, pool, desc, is_global, parallel_tasks));
(*res)->set_role(role);
return (*res)->init_with_desc(desc, query_options, node_id, build_bf_exactly);
return (*res)->init_with_desc(desc, query_options, node_id,
is_global ? false : build_bf_exactly);
}

void IRuntimeFilter::copy_to_shared_context(vectorized::SharedRuntimeFilterContext& context) {
Expand All @@ -972,9 +973,35 @@ void IRuntimeFilter::insert_batch(const vectorized::ColumnPtr column, size_t sta
_wrapper->insert_batch(column, start);
}

Status IRuntimeFilter::merge_local_filter(RuntimePredicateWrapper* wrapper, int* merged_num) {
SCOPED_TIMER(_merge_local_rf_timer);
std::unique_lock lock(_local_merge_mutex);
if (_merged_rf_num == 0) {
_wrapper = wrapper;
} else {
RETURN_IF_ERROR(merge_from(wrapper));
}
*merged_num = ++_merged_rf_num;
return Status::OK();
}

Status IRuntimeFilter::publish() {
DCHECK(is_producer());
if (_has_local_target) {
if (_is_global) {
std::vector<IRuntimeFilter*> filters;
RETURN_IF_ERROR(_state->get_query_ctx()->runtime_filter_mgr()->get_consume_filters(
_filter_id, filters));
// push down
for (auto filter : filters) {
int merged_num = 0;
RETURN_IF_ERROR(filter->merge_local_filter(_wrapper, &merged_num));
if (merged_num == _parallel_build_tasks) {
filter->update_runtime_filter_type_to_profile();
filter->signal();
}
}
return Status::OK();
} else if (_has_local_target) {
std::vector<IRuntimeFilter*> filters;
RETURN_IF_ERROR(_state->runtime_filter_mgr->get_consume_filters(_filter_id, filters));
// push down
Expand Down Expand Up @@ -1297,6 +1324,9 @@ void IRuntimeFilter::init_profile(RuntimeProfile* parent_profile) {
_profile_init = true;
parent_profile->add_child(_profile.get(), true, nullptr);
_profile->add_info_string("Info", _format_status());
if (_is_global) {
_merge_local_rf_timer = ADD_TIMER(_profile.get(), "MergeLocalRuntimeFilterTime");
}
if (_runtime_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER) {
update_runtime_filter_type_to_profile();
}
Expand Down
22 changes: 19 additions & 3 deletions be/src/exprs/runtime_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ enum RuntimeFilterState {
class IRuntimeFilter {
public:
IRuntimeFilter(RuntimeFilterParamsContext* state, ObjectPool* pool,
const TRuntimeFilterDesc* desc)
const TRuntimeFilterDesc* desc, bool is_global = false, int parallel_tasks = -1)
: _state(state),
_pool(pool),
_filter_id(desc->filter_id),
Expand All @@ -206,14 +206,17 @@ class IRuntimeFilter {
_runtime_filter_type(get_runtime_filter_type(desc)),
_name(fmt::format("RuntimeFilter: (id = {}, type = {})", _filter_id,
to_string(_runtime_filter_type))),
_profile(new RuntimeProfile(_name)) {}
_profile(new RuntimeProfile(_name)),
_is_global(is_global),
_parallel_build_tasks(parallel_tasks) {}

~IRuntimeFilter() = default;

static Status create(RuntimeFilterParamsContext* state, ObjectPool* pool,
const TRuntimeFilterDesc* desc, const TQueryOptions* query_options,
const RuntimeFilterRole role, int node_id, IRuntimeFilter** res,
bool build_bf_exactly = false);
bool build_bf_exactly = false, bool is_global = false,
int parallel_tasks = 0);

void copy_to_shared_context(vectorized::SharedRuntimeFilterContext& context);
Status copy_from_shared_context(vectorized::SharedRuntimeFilterContext& context);
Expand Down Expand Up @@ -359,6 +362,8 @@ class IRuntimeFilter {

void set_filter_timer(std::shared_ptr<pipeline::RuntimeFilterTimer>);

Status merge_local_filter(RuntimePredicateWrapper* wrapper, int* merged_num);

protected:
// serialize _wrapper to protobuf
void to_protobuf(PInFilter* filter);
Expand Down Expand Up @@ -452,7 +457,18 @@ class IRuntimeFilter {
// parent profile
// only effect on consumer
std::unique_ptr<RuntimeProfile> _profile;
RuntimeProfile::Counter* _merge_local_rf_timer = nullptr;
bool _opt_remote_rf;
// `_is_global` indicates whether this runtime filter is global on this BE.
// All runtime filters should be merged on each BE if it is global.
// This is improvement for pipelineX.
const bool _is_global = false;
std::mutex _local_merge_mutex;
// There are `_parallel_build_tasks` pipeline tasks to build runtime filter.
// We should call `signal` once all runtime filters are done and merged to one
// (e.g. `_merged_rf_num` is equal to `_parallel_build_tasks`).
int _merged_rf_num = 0;
const int _parallel_build_tasks = -1;

std::vector<std::shared_ptr<pipeline::RuntimeFilterTimer>> _filter_timer;
};
Expand Down
12 changes: 9 additions & 3 deletions be/src/exprs/runtime_filter_slots.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,10 @@ class VRuntimeFilterSlots {
public:
VRuntimeFilterSlots(
const std::vector<std::shared_ptr<vectorized::VExprContext>>& build_expr_ctxs,
const std::vector<TRuntimeFilterDesc>& runtime_filter_descs)
: _build_expr_context(build_expr_ctxs), _runtime_filter_descs(runtime_filter_descs) {}
const std::vector<TRuntimeFilterDesc>& runtime_filter_descs, bool is_global = false)
: _build_expr_context(build_expr_ctxs),
_runtime_filter_descs(runtime_filter_descs),
_is_global(is_global) {}

Status init(RuntimeState* state, int64_t hash_table_size) {
// runtime filter effect strategy
Expand All @@ -45,7 +47,10 @@ class VRuntimeFilterSlots {

std::map<int, bool> has_in_filter;

auto ignore_local_filter = [state](int filter_id) {
auto ignore_local_filter = [&](int filter_id) {
if (_is_global) {
return Status::OK();
}
std::vector<IRuntimeFilter*> filters;
RETURN_IF_ERROR(state->runtime_filter_mgr()->get_consume_filters(filter_id, filters));
if (filters.empty()) {
Expand Down Expand Up @@ -236,6 +241,7 @@ class VRuntimeFilterSlots {
private:
const std::vector<std::shared_ptr<vectorized::VExprContext>>& _build_expr_context;
const std::vector<TRuntimeFilterDesc>& _runtime_filter_descs;
const bool _is_global = false;
// prob_contition index -> [IRuntimeFilter]
std::map<int, std::list<IRuntimeFilter*>> _runtime_filters;
};
Expand Down
10 changes: 5 additions & 5 deletions be/src/pipeline/exec/aggregation_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -366,15 +366,15 @@ class AggSinkOperatorX : public DataSinkOperatorX<LocalStateType> {
Status sink(RuntimeState* state, vectorized::Block* in_block,
SourceState source_state) override;

std::vector<TExpr> get_local_shuffle_exprs() const override { return _partition_exprs; }
ExchangeType get_local_exchange_type() const override {
DataDistribution get_local_exchange_type() const override {
if (_probe_expr_ctxs.empty()) {
return _needs_finalize || DataSinkOperatorX<LocalStateType>::_child_x
->ignore_data_distribution()
? ExchangeType::PASSTHROUGH
: ExchangeType::NOOP;
? DataDistribution(ExchangeType::PASSTHROUGH)
: DataSinkOperatorX<LocalStateType>::get_local_exchange_type();
}
return _is_colocate ? ExchangeType::BUCKET_HASH_SHUFFLE : ExchangeType::HASH_SHUFFLE;
return _is_colocate ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
: DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
}

using DataSinkOperatorX<LocalStateType>::id;
Expand Down
5 changes: 3 additions & 2 deletions be/src/pipeline/exec/aggregation_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -506,8 +506,9 @@ Status AggLocalState::_get_without_key_result(RuntimeState* state, vectorized::B
if (!column_type->is_nullable() || data_types[i]->is_nullable() ||
!remove_nullable(column_type)->equals(*data_types[i])) {
return Status::InternalError(
"column_type not match data_types, column_type={}, data_types={}",
column_type->get_name(), data_types[i]->get_name());
"node id = {}, column_type not match data_types, column_type={}, "
"data_types={}",
_parent->node_id(), column_type->get_name(), data_types[i]->get_name());
}
}

Expand Down
11 changes: 6 additions & 5 deletions be/src/pipeline/exec/analytic_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,14 +107,15 @@ class AnalyticSinkOperatorX final : public DataSinkOperatorX<AnalyticSinkLocalSt

Status sink(RuntimeState* state, vectorized::Block* in_block,
SourceState source_state) override;
std::vector<TExpr> get_local_shuffle_exprs() const override { return _partition_exprs; }
ExchangeType get_local_exchange_type() const override {
DataDistribution get_local_exchange_type() const override {
if (_partition_by_eq_expr_ctxs.empty()) {
return ExchangeType::PASSTHROUGH;
return {ExchangeType::PASSTHROUGH};
} else if (_order_by_eq_expr_ctxs.empty()) {
return _is_colocate ? ExchangeType::BUCKET_HASH_SHUFFLE : ExchangeType::HASH_SHUFFLE;
return _is_colocate
? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
: DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
}
return ExchangeType::NOOP;
return DataSinkOperatorX<AnalyticSinkLocalState>::get_local_exchange_type();
}

private:
Expand Down
4 changes: 3 additions & 1 deletion be/src/pipeline/exec/assert_num_rows_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,9 @@ class AssertNumRowsOperatorX final : public StreamingOperatorX<AssertNumRowsLoca

[[nodiscard]] bool is_source() const override { return false; }

ExchangeType get_local_exchange_type() const override { return ExchangeType::PASSTHROUGH; }
DataDistribution get_local_exchange_type() const override {
return {ExchangeType::PASSTHROUGH};
}

private:
friend class AssertNumRowsLocalState;
Expand Down
7 changes: 5 additions & 2 deletions be/src/pipeline/exec/exchange_source_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,11 @@ class ExchangeSourceOperatorX final : public OperatorX<ExchangeLocalState> {
return _sub_plan_query_statistics_recvr;
}

bool need_to_local_shuffle() const override {
return !_is_hash_partition || OperatorX<ExchangeLocalState>::ignore_data_distribution();
DataDistribution get_local_exchange_type() const override {
if (!_is_hash_partition || OperatorX<ExchangeLocalState>::ignore_data_distribution()) {
return {ExchangeType::NOOP};
}
return {ExchangeType::HASH_SHUFFLE};
}

private:
Expand Down
9 changes: 6 additions & 3 deletions be/src/pipeline/exec/hashjoin_build_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,8 @@ Status HashJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo
_runtime_filters.resize(p._runtime_filter_descs.size());
for (size_t i = 0; i < p._runtime_filter_descs.size(); i++) {
RETURN_IF_ERROR(state->runtime_filter_mgr()->register_producer_filter(
p._runtime_filter_descs[i], state->query_options(), _build_expr_ctxs.size() == 1));
p._runtime_filter_descs[i], state->query_options(), _build_expr_ctxs.size() == 1,
p._use_global_rf, p._child_x->parallel_tasks()));
RETURN_IF_ERROR(state->runtime_filter_mgr()->get_producer_filter(
p._runtime_filter_descs[i].filter_id, &_runtime_filters[i]));
}
Expand Down Expand Up @@ -386,12 +387,14 @@ void HashJoinBuildSinkLocalState::_hash_table_init(RuntimeState* state) {

HashJoinBuildSinkOperatorX::HashJoinBuildSinkOperatorX(ObjectPool* pool, int operator_id,
const TPlanNode& tnode,
const DescriptorTbl& descs)
const DescriptorTbl& descs,
bool use_global_rf)
: JoinBuildSinkOperatorX(pool, operator_id, tnode, descs),
_join_distribution(tnode.hash_join_node.__isset.dist_type ? tnode.hash_join_node.dist_type
: TJoinDistributionType::NONE),
_is_broadcast_join(tnode.hash_join_node.__isset.is_broadcast_join &&
tnode.hash_join_node.is_broadcast_join) {
tnode.hash_join_node.is_broadcast_join),
_use_global_rf(use_global_rf) {
_runtime_filter_descs = tnode.runtime_filters;
}

Expand Down
18 changes: 10 additions & 8 deletions be/src/pipeline/exec/hashjoin_build_sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ class HashJoinBuildSinkOperatorX final
: public JoinBuildSinkOperatorX<HashJoinBuildSinkLocalState> {
public:
HashJoinBuildSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode,
const DescriptorTbl& descs);
const DescriptorTbl& descs, bool use_global_rf);
Status init(const TDataSink& tsink) override {
return Status::InternalError("{} should not init with TDataSink",
JoinBuildSinkOperatorX<HashJoinBuildSinkLocalState>::_name);
Expand All @@ -155,18 +155,18 @@ class HashJoinBuildSinkOperatorX final
._should_build_hash_table;
}

std::vector<TExpr> get_local_shuffle_exprs() const override { return _partition_exprs; }
ExchangeType get_local_exchange_type() const override {
DataDistribution get_local_exchange_type() const override {
if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
return ExchangeType::NOOP;
return {ExchangeType::NOOP};
} else if (_is_broadcast_join) {
return _child_x->ignore_data_distribution() ? ExchangeType::BROADCAST
: ExchangeType::NOOP;
return _child_x->ignore_data_distribution()
? DataDistribution(ExchangeType::PASS_TO_ONE)
: DataDistribution(ExchangeType::NOOP);
}
return _join_distribution == TJoinDistributionType::BUCKET_SHUFFLE ||
_join_distribution == TJoinDistributionType::COLOCATE
? ExchangeType::BUCKET_HASH_SHUFFLE
: ExchangeType::HASH_SHUFFLE;
? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
: DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
}

private:
Expand All @@ -187,6 +187,8 @@ class HashJoinBuildSinkOperatorX final
vectorized::SharedHashTableContextPtr _shared_hash_table_context = nullptr;
std::vector<TRuntimeFilterDesc> _runtime_filter_descs;
std::vector<TExpr> _partition_exprs;

const bool _use_global_rf;
};

} // namespace pipeline
Expand Down
12 changes: 6 additions & 6 deletions be/src/pipeline/exec/hashjoin_probe_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -163,17 +163,17 @@ class HashJoinProbeOperatorX final : public JoinProbeOperatorX<HashJoinProbeLoca
SourceState& source_state) const override;

bool need_more_input_data(RuntimeState* state) const override;
std::vector<TExpr> get_local_shuffle_exprs() const override { return _partition_exprs; }
ExchangeType get_local_exchange_type() const override {
DataDistribution get_local_exchange_type() const override {
if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
return ExchangeType::NOOP;
return {ExchangeType::NOOP};
}
return _is_broadcast_join
? ExchangeType::PASSTHROUGH
? DataDistribution(ExchangeType::PASSTHROUGH)
: (_join_distribution == TJoinDistributionType::BUCKET_SHUFFLE ||
_join_distribution == TJoinDistributionType::COLOCATE
? ExchangeType::BUCKET_HASH_SHUFFLE
: ExchangeType::HASH_SHUFFLE);
? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE,
_partition_exprs)
: DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs));
}

private:
Expand Down
8 changes: 8 additions & 0 deletions be/src/pipeline/exec/nested_loop_join_build_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,14 @@ class NestedLoopJoinBuildSinkOperatorX final
Status sink(RuntimeState* state, vectorized::Block* in_block,
SourceState source_state) override;

DataDistribution get_local_exchange_type() const override {
if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
return {ExchangeType::NOOP};
}
return _child_x->ignore_data_distribution() ? DataDistribution(ExchangeType::BROADCAST)
: DataDistribution(ExchangeType::NOOP);
}

private:
friend class NestedLoopJoinBuildSinkLocalState;

Expand Down
6 changes: 3 additions & 3 deletions be/src/pipeline/exec/nested_loop_join_probe_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -227,11 +227,11 @@ class NestedLoopJoinProbeOperatorX final
return _old_version_flag ? _row_descriptor : *_intermediate_row_desc;
}

ExchangeType get_local_exchange_type() const override {
DataDistribution get_local_exchange_type() const override {
if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
return ExchangeType::NOOP;
return {ExchangeType::NOOP};
}
return ExchangeType::ADAPTIVE_PASSTHROUGH;
return {ExchangeType::ADAPTIVE_PASSTHROUGH};
}

const RowDescriptor& row_desc() override {
Expand Down
6 changes: 3 additions & 3 deletions be/src/pipeline/exec/partition_sort_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,11 +105,11 @@ class PartitionSortSinkOperatorX final : public DataSinkOperatorX<PartitionSortS
Status open(RuntimeState* state) override;
Status sink(RuntimeState* state, vectorized::Block* in_block,
SourceState source_state) override;
ExchangeType get_local_exchange_type() const override {
DataDistribution get_local_exchange_type() const override {
if (_topn_phase == TPartTopNPhase::TWO_PHASE_GLOBAL) {
return ExchangeType::NOOP;
return DataSinkOperatorX<PartitionSortSinkLocalState>::get_local_exchange_type();
}
return ExchangeType::PASSTHROUGH;
return {ExchangeType::PASSTHROUGH};
}

private:
Expand Down
Loading

0 comments on commit 8701e31

Please sign in to comment.