Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix](local exchange) Fix local exchange blocked by a huge data block #38657

Merged
merged 1 commit into from
Aug 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions be/src/pipeline/local_exchange/local_exchange_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ class LocalExchangeSinkLocalState final : public PipelineXSinkLocalState<LocalEx
friend class PassToOneExchanger;
friend class LocalMergeSortExchanger;
friend class AdaptivePassthroughExchanger;
template <typename BlockType>
friend class Exchanger;

ExchangerBase* _exchanger = nullptr;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,13 @@ std::string LocalExchangeSourceLocalState::debug_string(int indentation_level) c
fmt::memory_buffer debug_string_buffer;
fmt::format_to(debug_string_buffer,
"{}, _channel_id: {}, _num_partitions: {}, _num_senders: {}, _num_sources: {}, "
"_running_sink_operators: {}, _running_source_operators: {}, mem_usage: {}",
"_running_sink_operators: {}, _running_source_operators: {}, mem_usage: {}, "
"data queue info: {}",
Base::debug_string(indentation_level), _channel_id, _exchanger->_num_partitions,
_exchanger->_num_senders, _exchanger->_num_sources,
_exchanger->_running_sink_operators, _exchanger->_running_source_operators,
_shared_state->mem_usage.load());
_shared_state->mem_usage.load(),
_exchanger->data_queue_debug_string(_channel_id));
size_t i = 0;
fmt::format_to(debug_string_buffer, ", MemTrackers: ");
for (auto* mem_tracker : _shared_state->mem_trackers) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ class LocalExchangeSourceLocalState final : public PipelineXLocalState<LocalExch
friend class PassToOneExchanger;
friend class LocalMergeSortExchanger;
friend class AdaptivePassthroughExchanger;
template <typename BlockType>
friend class Exchanger;

ExchangerBase* _exchanger = nullptr;
int _channel_id;
Expand Down
115 changes: 50 additions & 65 deletions be/src/pipeline/local_exchange/local_exchanger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,37 @@

namespace doris::pipeline {

template <typename BlockType>
bool Exchanger<BlockType>::_enqueue_data_and_set_ready(int channel_id,
                                                       LocalExchangeSinkLocalState& local_state,
                                                       BlockType&& block) {
    // Hold the exchanger mutex so that the enqueue and the readiness
    // notification appear atomic to a concurrently dequeuing source
    // (see the locked re-check in _dequeue_data).
    std::unique_lock guard(_m);
    const bool accepted = _data_queue[channel_id].enqueue(std::move(block));
    if (accepted) {
        // Wake the source operator bound to this channel.
        local_state._shared_state->set_ready_to_read(channel_id);
    }
    // false means the queue rejected the block (e.g. already closed);
    // the caller is responsible for rolling back any accounting.
    return accepted;
}

// Try to take one block for this source's channel.
// Returns true and fills `block` on success; otherwise sets *eos when all
// sink operators have finished, or blocks the source dependency so the
// pipeline task sleeps until a sink enqueues more data.
template <typename BlockType>
bool Exchanger<BlockType>::_dequeue_data(LocalExchangeSourceLocalState& local_state,
                                         BlockType& block, bool* eos) {
    // Snapshot sink liveness BEFORE the dequeue attempt: if sinks finish
    // after this load, any block they enqueued is still visible to the
    // try_dequeue below, so no data can be lost at shutdown.
    bool all_finished = _running_sink_operators == 0;
    // Fast path: lock-free dequeue.
    if (_data_queue[local_state._channel_id].try_dequeue(block)) {
        return true;
    } else if (all_finished) {
        // Queue empty and no sink will ever produce again -> end of stream.
        *eos = true;
    } else {
        // Slow path: re-check under the mutex that _enqueue_data_and_set_ready
        // holds, closing the race where a sink enqueues + signals readiness
        // between our failed try_dequeue and our dependency->block() below
        // (otherwise the source could block forever on a huge data block).
        std::unique_lock l(_m);
        if (_data_queue[local_state._channel_id].try_dequeue(block)) {
            return true;
        }
        COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
        // Genuinely empty: park this source until a sink signals readiness.
        local_state._dependency->block();
    }
    return false;
}

Status ShuffleExchanger::sink(RuntimeState* state, vectorized::Block* in_block, bool eos,
LocalExchangeSinkLocalState& local_state) {
{
Expand Down Expand Up @@ -74,25 +105,18 @@ Status ShuffleExchanger::get_block(RuntimeState* state, vectorized::Block* block
return Status::OK();
};

bool all_finished = _running_sink_operators == 0;
if (_data_queue[local_state._channel_id].try_dequeue(partitioned_block)) {
if (_dequeue_data(local_state, partitioned_block, eos)) {
SCOPED_TIMER(local_state._copy_data_timer);
mutable_block = vectorized::VectorizedUtils::build_mutable_mem_reuse_block(
block, partitioned_block.first->data_block);
RETURN_IF_ERROR(get_data(block));
} else if (all_finished) {
*eos = true;
} else {
COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
local_state._dependency->block();
}
return Status::OK();
}

Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __restrict channel_ids,
vectorized::Block* block, bool eos,
LocalExchangeSinkLocalState& local_state) {
auto& data_queue = _data_queue;
const auto rows = block->rows();
auto row_idx = std::make_shared<vectorized::PODArray<uint32_t>>(rows);
{
Expand Down Expand Up @@ -135,9 +159,9 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest
if (size > 0) {
local_state._shared_state->add_mem_usage(
it.second, new_block_wrapper->data_block.allocated_bytes(), false);
if (data_queue[it.second].enqueue({new_block_wrapper, {row_idx, start, size}})) {
local_state._shared_state->set_ready_to_read(it.second);
} else {

if (!_enqueue_data_and_set_ready(it.second, local_state,
{new_block_wrapper, {row_idx, start, size}})) {
local_state._shared_state->sub_mem_usage(
it.second, new_block_wrapper->data_block.allocated_bytes(), false);
new_block_wrapper->unref(local_state._shared_state);
Expand All @@ -154,10 +178,8 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest
if (size > 0) {
local_state._shared_state->add_mem_usage(
i % _num_sources, new_block_wrapper->data_block.allocated_bytes(), false);
if (data_queue[i % _num_sources].enqueue(
{new_block_wrapper, {row_idx, start, size}})) {
local_state._shared_state->set_ready_to_read(i % _num_sources);
} else {
if (!_enqueue_data_and_set_ready(i % _num_sources, local_state,
{new_block_wrapper, {row_idx, start, size}})) {
local_state._shared_state->sub_mem_usage(
i % _num_sources, new_block_wrapper->data_block.allocated_bytes(),
false);
Expand All @@ -177,9 +199,8 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest
if (size > 0) {
local_state._shared_state->add_mem_usage(
map[i], new_block_wrapper->data_block.allocated_bytes(), false);
if (data_queue[map[i]].enqueue({new_block_wrapper, {row_idx, start, size}})) {
local_state._shared_state->set_ready_to_read(map[i]);
} else {
if (!_enqueue_data_and_set_ready(map[i], local_state,
{new_block_wrapper, {row_idx, start, size}})) {
local_state._shared_state->sub_mem_usage(
map[i], new_block_wrapper->data_block.allocated_bytes(), false);
new_block_wrapper->unref(local_state._shared_state);
Expand All @@ -203,9 +224,7 @@ Status PassthroughExchanger::sink(RuntimeState* state, vectorized::Block* in_blo
auto channel_id = (local_state._channel_id++) % _num_partitions;
size_t memory_usage = new_block.allocated_bytes();
local_state._shared_state->add_mem_usage(channel_id, memory_usage);
if (_data_queue[channel_id].enqueue(std::move(new_block))) {
local_state._shared_state->set_ready_to_read(channel_id);
} else {
if (!_enqueue_data_and_set_ready(channel_id, local_state, std::move(new_block))) {
yiguolei marked this conversation as resolved.
Show resolved Hide resolved
local_state._shared_state->sub_mem_usage(channel_id, memory_usage);
}

Expand All @@ -224,19 +243,13 @@ void PassthroughExchanger::close(LocalExchangeSourceLocalState& local_state) {
Status PassthroughExchanger::get_block(RuntimeState* state, vectorized::Block* block, bool* eos,
LocalExchangeSourceLocalState& local_state) {
vectorized::Block next_block;
bool all_finished = _running_sink_operators == 0;
if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
if (_dequeue_data(local_state, next_block, eos)) {
block->swap(next_block);
local_state._shared_state->sub_mem_usage(local_state._channel_id, block->allocated_bytes());
if (_free_block_limit == 0 ||
_free_blocks.size_approx() < _free_block_limit * _num_sources) {
_free_blocks.enqueue(std::move(next_block));
}
} else if (all_finished) {
*eos = true;
} else {
COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
local_state._dependency->block();
}
return Status::OK();
}
Expand All @@ -245,9 +258,7 @@ Status PassToOneExchanger::sink(RuntimeState* state, vectorized::Block* in_block
LocalExchangeSinkLocalState& local_state) {
vectorized::Block new_block(in_block->clone_empty());
new_block.swap(*in_block);
if (_data_queue[0].enqueue(std::move(new_block))) {
local_state._shared_state->set_ready_to_read(0);
}
_enqueue_data_and_set_ready(0, local_state, std::move(new_block));

return Status::OK();
}
Expand All @@ -259,14 +270,8 @@ Status PassToOneExchanger::get_block(RuntimeState* state, vectorized::Block* blo
return Status::OK();
}
vectorized::Block next_block;
bool all_finished = _running_sink_operators == 0;
if (_data_queue[0].try_dequeue(next_block)) {
if (_dequeue_data(local_state, next_block, eos)) {
*block = std::move(next_block);
} else if (all_finished) {
*eos = true;
} else {
COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
local_state._dependency->block();
}
return Status::OK();
}
Expand All @@ -283,9 +288,7 @@ Status LocalMergeSortExchanger::sink(RuntimeState* state, vectorized::Block* in_
size_t memory_usage = new_block.allocated_bytes();
add_mem_usage(local_state, memory_usage);

if (_data_queue[local_state._channel_id].enqueue(std::move(new_block))) {
local_state._shared_state->set_ready_to_read(0);
} else {
if (!_enqueue_data_and_set_ready(local_state._channel_id, local_state, std::move(new_block))) {
sub_mem_usage(local_state, memory_usage);
}
if (eos) {
Expand Down Expand Up @@ -402,9 +405,7 @@ Status BroadcastExchanger::sink(RuntimeState* state, vectorized::Block* in_block
for (size_t i = 0; i < _num_partitions; i++) {
auto mutable_block = vectorized::MutableBlock::create_unique(in_block->clone_empty());
RETURN_IF_ERROR(mutable_block->add_rows(in_block, 0, in_block->rows()));
if (_data_queue[i].enqueue(mutable_block->to_block())) {
local_state._shared_state->set_ready_to_read(i);
}
_enqueue_data_and_set_ready(i, local_state, mutable_block->to_block());
}

return Status::OK();
Expand All @@ -421,14 +422,8 @@ void BroadcastExchanger::close(LocalExchangeSourceLocalState& local_state) {
Status BroadcastExchanger::get_block(RuntimeState* state, vectorized::Block* block, bool* eos,
LocalExchangeSourceLocalState& local_state) {
vectorized::Block next_block;
bool all_finished = _running_sink_operators == 0;
if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
if (_dequeue_data(local_state, next_block, eos)) {
*block = std::move(next_block);
} else if (all_finished) {
*eos = true;
} else {
COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
local_state._dependency->block();
}
return Status::OK();
}
Expand All @@ -444,9 +439,8 @@ Status AdaptivePassthroughExchanger::_passthrough_sink(RuntimeState* state,
auto channel_id = (local_state._channel_id++) % _num_partitions;
size_t memory_usage = new_block.allocated_bytes();
local_state._shared_state->add_mem_usage(channel_id, memory_usage);
if (_data_queue[channel_id].enqueue(std::move(new_block))) {
local_state._shared_state->set_ready_to_read(channel_id);
} else {

if (!_enqueue_data_and_set_ready(channel_id, local_state, std::move(new_block))) {
local_state._shared_state->sub_mem_usage(channel_id, memory_usage);
}

Expand Down Expand Up @@ -477,7 +471,6 @@ Status AdaptivePassthroughExchanger::_split_rows(RuntimeState* state,
const uint32_t* __restrict channel_ids,
vectorized::Block* block, bool eos,
LocalExchangeSinkLocalState& local_state) {
auto& data_queue = _data_queue;
const auto rows = block->rows();
auto row_idx = std::make_shared<std::vector<uint32_t>>(rows);
{
Expand Down Expand Up @@ -506,9 +499,7 @@ Status AdaptivePassthroughExchanger::_split_rows(RuntimeState* state,

size_t memory_usage = new_block.allocated_bytes();
local_state._shared_state->add_mem_usage(i, memory_usage);
if (data_queue[i].enqueue(std::move(new_block))) {
local_state._shared_state->set_ready_to_read(i);
} else {
if (!_enqueue_data_and_set_ready(i, local_state, std::move(new_block))) {
local_state._shared_state->sub_mem_usage(i, memory_usage);
}
}
Expand All @@ -532,19 +523,13 @@ Status AdaptivePassthroughExchanger::get_block(RuntimeState* state, vectorized::
bool* eos,
LocalExchangeSourceLocalState& local_state) {
vectorized::Block next_block;
bool all_finished = _running_sink_operators == 0;
if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
if (_dequeue_data(local_state, next_block, eos)) {
block->swap(next_block);
if (_free_block_limit == 0 ||
_free_blocks.size_approx() < _free_block_limit * _num_sources) {
_free_blocks.enqueue(std::move(next_block));
}
local_state._shared_state->sub_mem_usage(local_state._channel_id, block->allocated_bytes());
} else if (all_finished) {
*eos = true;
} else {
COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
local_state._dependency->block();
}
return Status::OK();
}
Expand Down
14 changes: 14 additions & 0 deletions be/src/pipeline/local_exchange/local_exchanger.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ class ExchangerBase {
virtual std::vector<Dependency*> local_sink_state_dependency(int channel_id) { return {}; }
virtual std::vector<Dependency*> local_state_dependency(int channel_id) { return {}; }

virtual std::string data_queue_debug_string(int i) = 0;

protected:
friend struct LocalExchangeSharedState;
friend struct ShuffleBlockWrapper;
Expand Down Expand Up @@ -115,9 +117,21 @@ class Exchanger : public ExchangerBase {
: ExchangerBase(running_sink_operators, num_sources, num_partitions, free_block_limit) {
}
~Exchanger() override = default;
std::string data_queue_debug_string(int i) override {
fmt::memory_buffer debug_string_buffer;
fmt::format_to(debug_string_buffer, "Data Queue {}: [size approx = {}, eos = {}]",
_data_queue[i].data_queue.size_approx(), _data_queue[i].eos);
return fmt::to_string(debug_string_buffer);
}

protected:
bool _enqueue_data_and_set_ready(int channel_id, LocalExchangeSinkLocalState& local_state,
BlockType&& block);
bool _dequeue_data(LocalExchangeSourceLocalState& local_state, BlockType& block, bool* eos);
std::vector<BlockQueue<BlockType>> _data_queue;

private:
std::mutex _m;
};

class LocalExchangeSourceLocalState;
Expand Down
Loading