Skip to content

Commit

Permalink
[opt](MultiCast) Avoid copying while holding a lock (apache#37462)
Browse files Browse the repository at this point in the history
Previously, copying was done while holding a lock;
Now, get block while holding the lock and then copy
  • Loading branch information
Mryange committed Aug 23, 2024
1 parent 9d5468d commit 5457e9b
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 77 deletions.
5 changes: 0 additions & 5 deletions be/src/pipeline/exec/multi_cast_data_stream_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,11 +114,6 @@ Status MultiCastDataStreamerSourceOperator::get_block(RuntimeState* state, vecto
return Status::OK();
}

Status MultiCastDataStreamerSourceOperator::close(doris::RuntimeState* state) {
_multi_cast_data_streamer->close_sender(_consumer_id);
return OperatorBase::close(state);
}

RuntimeProfile* MultiCastDataStreamerSourceOperator::get_runtime_profile() const {
return _multi_cast_data_streamer->profile();
}
Expand Down
2 changes: 0 additions & 2 deletions be/src/pipeline/exec/multi_cast_data_stream_source.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,6 @@ class MultiCastDataStreamerSourceOperator final : public OperatorBase,

bool can_read() override;

Status close(doris::RuntimeState* state) override;

[[nodiscard]] RuntimeProfile* get_runtime_profile() const override;

private:
Expand Down
127 changes: 68 additions & 59 deletions be/src/pipeline/exec/multi_cast_data_streamer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,81 +23,97 @@

namespace doris::pipeline {

MultiCastBlock::MultiCastBlock(vectorized::Block* block, int used_count, size_t mem_size)
: _used_count(used_count), _mem_size(mem_size) {
MultiCastBlock::MultiCastBlock(vectorized::Block* block, int used_count, int un_finish_copy,
size_t mem_size)
: _used_count(used_count), _un_finish_copy(un_finish_copy), _mem_size(mem_size) {
_block = vectorized::Block::create_unique(block->get_columns_with_type_and_name());
block->clear();
}

Status MultiCastDataStreamer::pull(int sender_idx, doris::vectorized::Block* block, bool* eos) {
std::lock_guard l(_mutex);
auto& pos_to_pull = _sender_pos_to_read[sender_idx];
if (pos_to_pull != _multi_cast_blocks.end()) {
if (pos_to_pull->_used_count == 1) {
DCHECK(pos_to_pull == _multi_cast_blocks.begin());
pos_to_pull->_block->swap(*block);

_cumulative_mem_size -= pos_to_pull->_mem_size;
pos_to_pull++;
_multi_cast_blocks.pop_front();
} else {
pos_to_pull->_block->create_same_struct_block(0)->swap(*block);
RETURN_IF_ERROR(vectorized::MutableBlock(block).merge(*pos_to_pull->_block));
pos_to_pull->_used_count--;
pos_to_pull++;
int* un_finish_copy = nullptr;
int use_count = 0;
{
std::lock_guard l(_mutex);
auto& pos_to_pull = _sender_pos_to_read[sender_idx];
const auto end = _multi_cast_blocks.end();
DCHECK(pos_to_pull != end);

*block = *pos_to_pull->_block;

_cumulative_mem_size -= pos_to_pull->_mem_size;

pos_to_pull->_used_count--;
use_count = pos_to_pull->_used_count;
un_finish_copy = &pos_to_pull->_un_finish_copy;

pos_to_pull++;

if (pos_to_pull == end) {
_block_reading(sender_idx);
}

*eos = _eos and pos_to_pull == end;
}
*eos = _eos and pos_to_pull == _multi_cast_blocks.end();
if (pos_to_pull == _multi_cast_blocks.end()) {
_block_reading(sender_idx);

if (use_count == 0) {
// will clear _multi_cast_blocks
_wait_copy_block(block, *un_finish_copy);
} else {
_copy_block(block, *un_finish_copy);
}

return Status::OK();
}

void MultiCastDataStreamer::close_sender(int sender_idx) {
std::lock_guard l(_mutex);
auto& pos_to_pull = _sender_pos_to_read[sender_idx];
while (pos_to_pull != _multi_cast_blocks.end()) {
if (pos_to_pull->_used_count == 1) {
DCHECK(pos_to_pull == _multi_cast_blocks.begin());
_cumulative_mem_size -= pos_to_pull->_mem_size;
pos_to_pull++;
_multi_cast_blocks.pop_front();
} else {
pos_to_pull->_used_count--;
pos_to_pull++;
}
void MultiCastDataStreamer::_copy_block(vectorized::Block* block, int& un_finish_copy) {
const auto rows = block->rows();
for (int i = 0; i < block->columns(); ++i) {
block->get_by_position(i).column = block->get_by_position(i).column->clone_resized(rows);
}

std::unique_lock l(_mutex);
un_finish_copy--;
if (un_finish_copy == 0) {
l.unlock();
_cv.notify_one();
}
_closed_sender_count++;
_block_reading(sender_idx);
}

void MultiCastDataStreamer::_wait_copy_block(vectorized::Block* block, int& un_finish_copy) {
std::unique_lock l(_mutex);
_cv.wait(l, [&]() { return un_finish_copy == 0; });
_multi_cast_blocks.pop_front();
}

Status MultiCastDataStreamer::push(RuntimeState* state, doris::vectorized::Block* block, bool eos) {
auto rows = block->rows();
COUNTER_UPDATE(_process_rows, rows);

auto block_mem_size = block->allocated_bytes();
std::lock_guard l(_mutex);
int need_process_count = _cast_sender_count - _closed_sender_count;
if (need_process_count == 0) {
return Status::EndOfFile("All data streamer is EOF");
}
// TODO: if the [queue back block rows + block->rows()] < batch_size, better
// do merge block. but need check the need_process_count and used_count whether
// equal
_multi_cast_blocks.emplace_back(block, need_process_count, block_mem_size);
const auto block_mem_size = block->allocated_bytes();
_cumulative_mem_size += block_mem_size;
COUNTER_SET(_peak_mem_usage, std::max(_cumulative_mem_size, _peak_mem_usage->value()));

auto end = _multi_cast_blocks.end();
end--;
for (int i = 0; i < _sender_pos_to_read.size(); ++i) {
if (_sender_pos_to_read[i] == _multi_cast_blocks.end()) {
_sender_pos_to_read[i] = end;
_set_ready_for_read(i);
{
std::lock_guard l(_mutex);
_multi_cast_blocks.emplace_back(block, _cast_sender_count, _cast_sender_count - 1,
block_mem_size);
// last elem
auto end = std::prev(_multi_cast_blocks.end());
for (int i = 0; i < _sender_pos_to_read.size(); ++i) {
if (_sender_pos_to_read[i] == _multi_cast_blocks.end()) {
_sender_pos_to_read[i] = end;
_set_ready_for_read(i);
}
}
_eos = eos;
}

if (_eos) {
for (auto* read_dep : _dependencies) {
read_dep->set_always_ready();
}
}
_eos = eos;
return Status::OK();
}

Expand All @@ -110,13 +126,6 @@ void MultiCastDataStreamer::_set_ready_for_read(int sender_idx) {
dep->set_ready();
}

void MultiCastDataStreamer::_set_ready_for_read() {
for (auto* dep : _dependencies) {
DCHECK(dep);
dep->set_ready();
}
}

void MultiCastDataStreamer::_block_reading(int sender_idx) {
if (_dependencies.empty()) {
return;
Expand Down
18 changes: 7 additions & 11 deletions be/src/pipeline/exec/multi_cast_data_streamer.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,11 @@ namespace doris::pipeline {

class Dependency;
struct MultiCastBlock {
MultiCastBlock(vectorized::Block* block, int used_count, size_t mem_size);
MultiCastBlock(vectorized::Block* block, int used_count, int need_copy, size_t mem_size);

std::unique_ptr<vectorized::Block> _block;
int _used_count;
int _un_finish_copy;
size_t _mem_size;
};

Expand All @@ -52,8 +53,6 @@ class MultiCastDataStreamer {

Status pull(int sender_idx, vectorized::Block* block, bool* eos);

void close_sender(int sender_idx);

Status push(RuntimeState* state, vectorized::Block* block, bool eos);

// use sink to check can_write, now always true after we support spill to disk
Expand All @@ -68,30 +67,27 @@ class MultiCastDataStreamer {

RuntimeProfile* profile() { return _profile; }

void set_eos() {
std::lock_guard l(_mutex);
_eos = true;
_set_ready_for_read();
}

void set_dep_by_sender_idx(int sender_idx, Dependency* dep) {
_dependencies[sender_idx] = dep;
_block_reading(sender_idx);
}

private:
void _set_ready_for_read(int sender_idx);
void _set_ready_for_read();
void _block_reading(int sender_idx);

void _copy_block(vectorized::Block* block, int& un_finish_copy);

void _wait_copy_block(vectorized::Block* block, int& un_finish_copy);

const RowDescriptor& _row_desc;
RuntimeProfile* _profile = nullptr;
std::list<MultiCastBlock> _multi_cast_blocks;
std::vector<std::list<MultiCastBlock>::iterator> _sender_pos_to_read;
std::condition_variable _cv;
std::mutex _mutex;
bool _eos = false;
int _cast_sender_count = 0;
int _closed_sender_count = 0;
int64_t _cumulative_mem_size = 0;

RuntimeProfile::Counter* _process_rows = nullptr;
Expand Down

0 comments on commit 5457e9b

Please sign in to comment.