diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index 8f7b176a979a4d..b6cd342d3988be 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -113,6 +113,23 @@ Status HashJoinBuildSinkLocalState::open(RuntimeState* state) {
 Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_status) {
     auto p = _parent->cast();
     Defer defer {[&]() {
+        if (_should_build_hash_table) {
+            // The build-side hash key column may not be needed in the output, but it must stay
+            // in the block because it is compared with the probe-side hash key column.
+            // A key column positioned at or past the end of the flags is never cleared by
+            // clear_column_mem_not_keep, so skip the flag instead of indexing out of bounds.
+            if (p._should_keep_hash_key_column && _build_col_ids.size() == 1 &&
+                static_cast<size_t>(_build_col_ids[0]) < p._should_keep_column_flags.size()) {
+                p._should_keep_column_flags[_build_col_ids[0]] = true;
+            }
+
+            if (_shared_state->build_block) {
+                // release the memory of unused column in probe stage
+                _shared_state->build_block->clear_column_mem_not_keep(
+                        p._should_keep_column_flags, bool(p._shared_hashtable_controller));
+            }
+        }
+
         if (_should_build_hash_table && p._shared_hashtable_controller) {
             p._shared_hashtable_controller->signal_finish(p.node_id());
         }
@@ -386,7 +403,9 @@ void HashJoinBuildSinkLocalState::_hash_table_init(RuntimeState* state) {
                     default:
                         _shared_state->hash_table_variants
                                 ->emplace();
+                        return;
                     }
+                    p._should_keep_hash_key_column = true;
                     return;
                 }
 
@@ -433,6 +452,10 @@ Status HashJoinBuildSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* st
     RETURN_IF_ERROR(JoinBuildSinkOperatorX::init(tnode, state));
     DCHECK(tnode.__isset.hash_join_node);
 
+    if (tnode.hash_join_node.__isset.hash_output_slot_ids) {
+        _hash_output_slot_ids = tnode.hash_join_node.hash_output_slot_ids;
+    }
+
     const bool build_stores_null = _join_op == TJoinOp::RIGHT_OUTER_JOIN ||
                                    _join_op == TJoinOp::FULL_OUTER_JOIN ||
                                    _join_op == TJoinOp::RIGHT_ANTI_JOIN;
@@ -494,6 +517,17 @@ Status HashJoinBuildSinkOperatorX::open(RuntimeState* state) {
             _shared_hash_table_context = _shared_hashtable_controller->get_context(node_id());
         }
     }
+    auto init_keep_column_flags = [&](auto& tuple_descs, auto& output_slot_flags) {
+        for (const auto& tuple_desc : tuple_descs) {
+            for (const auto& slot_desc : tuple_desc->slots()) {
+                output_slot_flags.emplace_back(
+                        _hash_output_slot_ids.empty() ||
+                        std::find(_hash_output_slot_ids.begin(), _hash_output_slot_ids.end(),
+                                  slot_desc->id()) != _hash_output_slot_ids.end());
+            }
+        }
+    };
+    init_keep_column_flags(row_desc().tuple_descriptors(), _should_keep_column_flags);
     RETURN_IF_ERROR(vectorized::VExpr::prepare(_build_expr_ctxs, state, _child->row_desc()));
     return vectorized::VExpr::open(_build_expr_ctxs, state);
 }
@@ -565,7 +599,6 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
             _shared_hash_table_context->build_indexes_null =
                     local_state._shared_state->build_indexes_null;
             local_state._runtime_filter_slots->copy_to_shared_context(_shared_hash_table_context);
-            _shared_hashtable_controller->signal(node_id());
         }
     } else if (!local_state._should_build_hash_table) {
         DCHECK(_shared_hashtable_controller != nullptr);
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h b/be/src/pipeline/exec/hashjoin_build_sink.h
index cf677833fb5b64..a544cdcf4563a4 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.h
+++ b/be/src/pipeline/exec/hashjoin_build_sink.h
@@ -173,6 +173,10 @@ class HashJoinBuildSinkOperatorX final
 
     const std::vector _partition_exprs;
     const bool _need_local_merge;
+
+    std::vector _hash_output_slot_ids;
+    std::vector _should_keep_column_flags;
+    bool _should_keep_hash_key_column = false;
 };
 
 template
diff --git a/be/src/pipeline/exec/join/process_hash_table_probe_impl.h b/be/src/pipeline/exec/join/process_hash_table_probe_impl.h
index 653cc8ab4473dd..6bb5a2006ab9b0 100644
--- a/be/src/pipeline/exec/join/process_hash_table_probe_impl.h
+++ b/be/src/pipeline/exec/join/process_hash_table_probe_impl.h
@@ -640,9 +640,13 @@ Status ProcessHashTableProbe::process_data_in_hashtable(
                                        mcol.size(), _right_col_len, _right_col_idx);
         }
         for (size_t j = 0; j < _right_col_len; ++j) {
-            const auto& column = *_build_block->safe_get_by_position(j).column;
-            mcol[j + _right_col_idx]->insert_indices_from(column, _build_indexs.data(),
-                                                          _build_indexs.data() + block_size);
+            if (_right_output_slot_flags->at(j)) {
+                const auto& column = *_build_block->safe_get_by_position(j).column;
+                mcol[j + _right_col_idx]->insert_indices_from(column, _build_indexs.data(),
+                                                              _build_indexs.data() + block_size);
+            } else {
+                mcol[j + _right_col_idx]->resize(block_size);
+            }
         }
 
         // just resize the left table column in case with other conjunct to make block size is not zero
diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp
index 6efec59544ec23..2e54bd76fff78e 100644
--- a/be/src/vec/core/block.cpp
+++ b/be/src/vec/core/block.cpp
@@ -753,6 +753,34 @@ void Block::erase_tmp_columns() noexcept {
     }
 }
 
+// Releases the memory of every column whose keep flag is false by replacing it with an
+// empty clone. Flags are matched to columns by position. Nothing is done when the flags
+// are empty or cover more columns than the block holds. If need_keep_first is set and the
+// first column is not kept, it becomes a placeholder resized to the row count that was
+// captured before clearing.
+void Block::clear_column_mem_not_keep(const std::vector& column_keep_flags,
+                                      bool need_keep_first) {
+    // `data.size() >= column_keep_flags.size()` alone also holds for empty flags, which
+    // would let the need_keep_first branch read column_keep_flags[0] and data[0].
+    if (column_keep_flags.empty() || data.size() < column_keep_flags.size()) {
+        return;
+    }
+
+    // Capture the row count before any column is emptied.
+    const auto origin_rows = rows();
+    for (size_t i = 0; i < column_keep_flags.size(); ++i) {
+        if (!column_keep_flags[i]) {
+            data[i].column = data[i].column->clone_empty();
+        }
+    }
+
+    if (need_keep_first && !column_keep_flags[0]) {
+        auto first_column = data[0].column->clone_empty();
+        first_column->resize(origin_rows);
+        data[0].column = std::move(first_column);
+    }
+}
+
 void Block::swap(Block& other) noexcept {
     SCOPED_SKIP_MEMORY_CHECK();
     data.swap(other.data);
diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h
index 9abc75141290f5..f180460169342a 100644
--- a/be/src/vec/core/block.h
+++ b/be/src/vec/core/block.h
@@ -404,6 +404,9 @@ class Block {
     // we built some temporary columns into block
     void erase_tmp_columns() noexcept;
 
+    void clear_column_mem_not_keep(const std::vector& column_keep_flags,
+                                   bool need_keep_first);
+
 private:
     void erase_impl(size_t position);
 };
diff --git a/be/src/vec/runtime/shared_hash_table_controller.cpp b/be/src/vec/runtime/shared_hash_table_controller.cpp
index 4b77b1ed8a34bd..e5a28bed6ce50a 100644
--- a/be/src/vec/runtime/shared_hash_table_controller.cpp
+++ b/be/src/vec/runtime/shared_hash_table_controller.cpp
@@ -42,7 +42,7 @@ SharedHashTableContextPtr SharedHashTableController::get_context(int my_node_id)
     return _shared_contexts[my_node_id];
 }
 
-void SharedHashTableController::signal(int my_node_id) {
+void SharedHashTableController::signal_finish(int my_node_id) {
     std::lock_guard lock(_mutex);
     auto it = _shared_contexts.find(my_node_id);
     if (it != _shared_contexts.cend()) {
@@ -52,10 +52,6 @@ void SharedHashTableController::signal(int my_node_id) {
     for (auto& dep : _dependencies[my_node_id]) {
         dep->set_ready();
     }
-}
-
-void SharedHashTableController::signal_finish(int my_node_id) {
-    std::lock_guard lock(_mutex);
     for (auto& dep : _finish_dependencies[my_node_id]) {
         dep->set_ready();
     }
diff --git a/be/src/vec/runtime/shared_hash_table_controller.h b/be/src/vec/runtime/shared_hash_table_controller.h
index b04b1cdba064b9..173f9d46e890c8 100644
--- a/be/src/vec/runtime/shared_hash_table_controller.h
+++ b/be/src/vec/runtime/shared_hash_table_controller.h
@@ -76,7 +76,6 @@ class SharedHashTableController {
     void set_builder_and_consumers(TUniqueId builder, int node_id);
     TUniqueId get_builder_fragment_instance_id(int my_node_id);
     SharedHashTableContextPtr get_context(int my_node_id);
-    void signal(int my_node_id);
     void signal_finish(int my_node_id);
     void append_dependency(int node_id, std::shared_ptr dep,
                            std::shared_ptr finish_dep) {