Skip to content

Commit

Permalink
[opt](hash_join) Merging all build blocks at once will cause performa…
Browse files Browse the repository at this point in the history
…nce issue
  • Loading branch information
mrhhsg committed Jul 8, 2024
1 parent 111886a commit 65f88c4
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 38 deletions.
21 changes: 4 additions & 17 deletions be/src/pipeline/exec/hashjoin_build_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,7 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
vectorized::MutableBlock::build_mutable_block(&tmp_build_block);
}

if (in_block->rows() != 0) {
if (!in_block->empty()) {
std::vector<int> res_col_ids(_build_expr_ctxs.size());
RETURN_IF_ERROR(local_state._do_evaluate(*in_block, local_state._build_expr_ctxs,
*local_state._build_expr_call_timer,
Expand All @@ -533,27 +533,14 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block*

local_state._mem_tracker->consume(in_block->bytes());
COUNTER_UPDATE(local_state._build_blocks_memory_usage, in_block->bytes());
local_state._build_blocks.emplace_back(std::move(*in_block));

RETURN_IF_ERROR(local_state._build_side_mutable_block.merge_ignore_overflow(
std::move(*in_block)));
}
}

if (local_state._should_build_hash_table && eos) {
DCHECK(!local_state._build_side_mutable_block.empty());

for (auto& column : local_state._build_side_mutable_block.mutable_columns()) {
column->reserve(local_state._build_side_rows);
}

{
SCOPED_TIMER(local_state._build_side_merge_block_timer);
for (auto& block : local_state._build_blocks) {
RETURN_IF_ERROR(local_state._build_side_mutable_block.merge_ignore_overflow(block));

vectorized::Block temp;
std::swap(block, temp);
}
}

local_state._shared_state->build_block = std::make_shared<vectorized::Block>(
local_state._build_side_mutable_block.to_block());

Expand Down
1 change: 0 additions & 1 deletion be/src/pipeline/exec/hashjoin_build_sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ class HashJoinBuildSinkLocalState final
int64_t _build_side_last_mem_used = 0;

size_t _build_side_rows = 0;
std::vector<vectorized::Block> _build_blocks;

vectorized::MutableBlock _build_side_mutable_block;
std::shared_ptr<VRuntimeFilterSlots> _runtime_filter_slots;
Expand Down
44 changes: 24 additions & 20 deletions be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,30 +100,33 @@ size_t PartitionedHashJoinSinkLocalState::revocable_mem_size(RuntimeState* state
}

Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeState* state) {
_shared_state->need_to_spill = true;
auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>();
_shared_state->inner_shared_state->hash_table_variants.reset();
auto row_desc = p._child_x->row_desc();
const auto num_slots = row_desc.num_slots();
std::vector<vectorized::Block> build_blocks;
vectorized::Block build_block;
auto inner_sink_state_ = _shared_state->inner_runtime_state->get_sink_local_state();
if (inner_sink_state_) {
auto inner_sink_state = assert_cast<HashJoinBuildSinkLocalState*>(inner_sink_state_);
build_blocks = std::move(inner_sink_state->_build_blocks);
build_block = inner_sink_state->_build_side_mutable_block.to_block();
}

if (build_blocks.empty()) {
if (build_block.rows() <= 1) {
LOG(WARNING) << "has no data to revoke, node: " << _parent->node_id()
<< ", task: " << state->task_id();
return Status::OK();
}

auto spill_func = [build_blocks = std::move(build_blocks), state, num_slots, this]() mutable {
if (build_block.columns() > num_slots) {
build_block.erase(num_slots);
}

auto spill_func = [build_block = std::move(build_block), state, this]() mutable {
auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>();
auto& partitioned_blocks = _shared_state->partitioned_build_blocks;
std::vector<std::vector<uint32_t>> partitions_indexes(p._partition_count);

const auto reserved_size = 4096;
const size_t reserved_size = 4096;
std::for_each(partitions_indexes.begin(), partitions_indexes.end(),
[](std::vector<uint32_t>& indices) { indices.reserve(reserved_size); });

Expand All @@ -142,24 +145,27 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta
return true;
};

for (size_t block_idx = 0; block_idx != build_blocks.size(); ++block_idx) {
auto& build_block = build_blocks[block_idx];
const auto is_last_block = (block_idx == (build_blocks.size() - 1));
if (UNLIKELY(build_block.empty())) {
continue;
}
size_t total_rows = build_block.rows();
size_t offset = 1;
while (offset < total_rows) {
auto sub_block = build_block.clone_empty();
size_t this_run = std::min(reserved_size, total_rows - offset);

if (build_block.columns() > num_slots) {
build_block.erase(num_slots);
for (size_t i = 0; i != build_block.columns(); ++i) {
sub_block.get_by_position(i).column =
build_block.get_by_position(i).column->cut(offset, this_run);
}

offset += this_run;
const auto is_last_block = offset == total_rows;

{
SCOPED_TIMER(_partition_timer);
(void)_partitioner->do_partitioning(state, &build_block, _mem_tracker.get());
(void)_partitioner->do_partitioning(state, &sub_block, _mem_tracker.get());
}

auto* channel_ids = _partitioner->get_channel_ids().get<uint32_t>();
for (size_t i = 0; i != build_block.rows(); ++i) {
const auto* channel_ids = _partitioner->get_channel_ids().get<uint32_t>();
for (size_t i = 0; i != sub_block.rows(); ++i) {
partitions_indexes[channel_ids[i]].emplace_back(i);
}

Expand All @@ -176,7 +182,7 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta

{
SCOPED_TIMER(_partition_shuffle_timer);
Status st = partition_block->add_rows(&build_block, begin, end);
Status st = partition_block->add_rows(&sub_block, begin, end);
if (!st.ok()) {
std::unique_lock<std::mutex> lock(_spill_lock);
_spill_status = st;
Expand All @@ -195,8 +201,6 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta
vectorized::MutableBlock::create_unique(build_block.clone_empty());
}
}

build_block.clear();
}

_dependency->set_ready();
Expand Down

0 comments on commit 65f88c4

Please sign in to comment.