Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "[improvement](scanner_schedule) reduce memory consumption of scanner #24199" #26613

Merged
merged 1 commit into from
Nov 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions be/src/exec/exec_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
#include "runtime/runtime_state.h"
#include "util/debug_util.h"
#include "util/runtime_profile.h"
#include "util/stack_util.h"
#include "util/uid_util.h"
#include "vec/columns/column_nullable.h"
#include "vec/core/block.h"
Expand Down Expand Up @@ -206,7 +205,7 @@ Status ExecNode::close(RuntimeState* state) {
<< " already closed";
return Status::OK();
}
LOG(INFO) << "fragment_instance_id=" << print_id(state->fragment_instance_id()) << " closed. ";
LOG(INFO) << "fragment_instance_id=" << print_id(state->fragment_instance_id()) << " closed";
_is_closed = true;

Status result;
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ bool ScanOperator::can_read() {
return true;
} else {
if (_node->_scanner_ctx->get_num_running_scanners() == 0 &&
_node->_scanner_ctx->should_be_scheduled()) {
_node->_scanner_ctx->has_enough_space_in_blocks_queue()) {
_node->_scanner_ctx->reschedule_scanner_ctx();
}
return _node->ready_to_read(); // there are some blocks to process
Expand Down
1 change: 0 additions & 1 deletion be/src/runtime/plan_fragment_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
#include "util/container_util.hpp"
#include "util/defer_op.h"
#include "util/pretty_printer.h"
#include "util/stack_util.h"
#include "util/telemetry/telemetry.h"
#include "util/threadpool.h"
#include "util/time.h"
Expand Down
7 changes: 6 additions & 1 deletion be/src/vec/exec/scan/pip_scanner_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,10 @@ class PipScannerContext : public vectorized::ScannerContext {
_free_blocks_memory_usage->add(free_blocks_memory_usage);
}

// Reports whether `_current_used_bytes` is still below the scheduling threshold:
// half of `_max_bytes_in_queue`, multiplied by `_num_parallel_instances`.
// NOTE(review): the division by 2 is evaluated before the multiplication, so with an
// integer `_max_bytes_in_queue` an odd value is rounded down first.
bool has_enough_space_in_blocks_queue() const override {
return _current_used_bytes < _max_bytes_in_queue / 2 * _num_parallel_instances;
}

void _dispose_coloate_blocks_not_in_queue() override {
if (_need_colocate_distribute) {
for (int i = 0; i < _num_parallel_instances; ++i) {
Expand Down Expand Up @@ -217,7 +221,8 @@ class PipScannerContext : public vectorized::ScannerContext {
std::lock_guard<std::mutex> queue_l(*_queue_mutexs[loc]);
_blocks_queues[loc].emplace_back(std::move(_colocate_blocks[loc]));
}
_colocate_blocks[loc] = get_free_block();
bool get_block_not_empty = true;
_colocate_blocks[loc] = get_free_block(&get_block_not_empty, get_block_not_empty);
_colocate_mutable_blocks[loc]->set_muatable_columns(
_colocate_blocks[loc]->mutate_columns());
}
Expand Down
68 changes: 29 additions & 39 deletions be/src/vec/exec/scan/scanner_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ ScannerContext::ScannerContext(doris::RuntimeState* state_, doris::vectorized::V
_process_status(Status::OK()),
_batch_size(state_->batch_size()),
limit(limit_),
_max_bytes_in_queue(max_bytes_in_blocks_queue_ * num_parallel_instances),
_max_bytes_in_queue(max_bytes_in_blocks_queue_),
_scanner_scheduler(state_->exec_env()->scanner_scheduler()),
_scanners(scanners_),
_num_parallel_instances(num_parallel_instances) {
Expand All @@ -63,21 +63,26 @@ ScannerContext::ScannerContext(doris::RuntimeState* state_, doris::vectorized::V
if (limit < 0) {
limit = -1;
}
}

// After init function call, should not access _parent
Status ScannerContext::init() {
// 1. Calculate max concurrency
// TODO: now the max thread num <= config::doris_scanner_thread_pool_thread_num / 4
// should find a more reasonable value.
_max_thread_num = config::doris_scanner_thread_pool_thread_num / 4;
_max_thread_num *= num_parallel_instances;
if (_parent->_shared_scan_opt) {
DCHECK(_num_parallel_instances > 0);
_max_thread_num *= _num_parallel_instances;
}
_max_thread_num = _max_thread_num == 0 ? 1 : _max_thread_num;
DCHECK(_max_thread_num > 0);
_max_thread_num = std::min(_max_thread_num, (int32_t)_scanners.size());
// 1. Calculate max concurrency
// For select * from table limit 10; should just use one thread.
if (_parent->should_run_serial()) {
_max_thread_num = 1;
}
}

// After init function call, should not access _parent
Status ScannerContext::init() {
_scanner_profile = _parent->_scanner_profile;
_scanner_sched_counter = _parent->_scanner_sched_counter;
_scanner_ctx_sched_counter = _parent->_scanner_ctx_sched_counter;
Expand All @@ -99,9 +104,6 @@ Status ScannerContext::init() {
limit == -1 ? _batch_size : std::min(static_cast<int64_t>(_batch_size), limit);
_block_per_scanner = (doris_scanner_row_num + (real_block_size - 1)) / real_block_size;
_free_blocks_capacity = _max_thread_num * _block_per_scanner;
auto block = get_free_block();
_estimated_block_bytes = std::max(block->allocated_bytes(), (size_t)16);
return_free_block(std::move(block));

#ifndef BE_TEST
// 3. get thread token
Expand All @@ -121,33 +123,27 @@ Status ScannerContext::init() {
return Status::OK();
}

vectorized::BlockUPtr ScannerContext::get_free_block() {
vectorized::BlockUPtr ScannerContext::get_free_block(bool* has_free_block,
bool get_block_not_empty) {
vectorized::BlockUPtr block;
if (_free_blocks.try_dequeue(block)) {
DCHECK(block->mem_reuse());
_free_blocks_memory_usage->add(-block->allocated_bytes());
_serving_blocks_num++;
return block;
if (!get_block_not_empty || block->mem_reuse()) {
_free_blocks_capacity--;
_free_blocks_memory_usage->add(-block->allocated_bytes());
return block;
}
}

block = vectorized::Block::create_unique(_output_tuple_desc->slots(), _batch_size,
true /*ignore invalid slots*/);
COUNTER_UPDATE(_newly_create_free_blocks_num, 1);

_serving_blocks_num++;
return block;
return vectorized::Block::create_unique(_output_tuple_desc->slots(), _batch_size,
true /*ignore invalid slots*/);
}

void ScannerContext::return_free_block(std::unique_ptr<vectorized::Block> block) {
_serving_blocks_num--;
if (block->mem_reuse()) {
// Only put blocks with schema to free blocks, because colocate blocks
// need schema.
_estimated_block_bytes = std::max(block->allocated_bytes(), (size_t)16);
block->clear_column_data();
_free_blocks_memory_usage->add(block->allocated_bytes());
_free_blocks.enqueue(std::move(block));
}
block->clear_column_data();
_free_blocks_memory_usage->add(block->allocated_bytes());
_free_blocks.enqueue(std::move(block));
++_free_blocks_capacity;
}

void ScannerContext::append_blocks_to_queue(std::vector<vectorized::BlockUPtr>& blocks) {
Expand Down Expand Up @@ -180,15 +176,14 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo
// (if the scheduler continues to schedule, it will cause a lot of busy running).
// At this point, consumers are required to trigger new scheduling to ensure that
// data can be continuously fetched.
if (should_be_scheduled() && _num_running_scanners == 0) {
if (has_enough_space_in_blocks_queue() && _num_running_scanners == 0) {
auto state = _scanner_scheduler->submit(this);
if (state.ok()) {
_num_scheduling_ctx++;
} else {
set_status_on_error(state, false);
}
}

// Wait for block from queue
if (wait) {
SCOPED_TIMER(_scanner_wait_batch_timer);
Expand All @@ -212,7 +207,6 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo

auto block_bytes = (*block)->allocated_bytes();
_cur_bytes_in_queue -= block_bytes;

_queued_blocks_memory_usage->add(-block_bytes);
return Status::OK();
} else {
Expand Down Expand Up @@ -359,13 +353,7 @@ void ScannerContext::push_back_scanner_and_reschedule(VScannerSPtr scanner) {
_scanners.push_front(scanner);
}
std::lock_guard l(_transfer_lock);

// In the pipeline engine, doris will close scanners when `no_schedule`.
// We have to decrease _num_running_scanners before scheduling, otherwise
// scheduling does not work due to _num_running_scanners.
_num_running_scanners--;

if (should_be_scheduled()) {
if (has_enough_space_in_blocks_queue()) {
auto state = _scanner_scheduler->submit(this);
if (state.ok()) {
_num_scheduling_ctx++;
Expand All @@ -385,6 +373,8 @@ void ScannerContext::push_back_scanner_and_reschedule(VScannerSPtr scanner) {
_is_finished = true;
_blocks_queue_added_cv.notify_one();
}
// In pipeline engine, doris will close scanners when `no_schedule`.
_num_running_scanners--;
_ctx_finish_cv.notify_one();
}

Expand All @@ -394,7 +384,7 @@ void ScannerContext::get_next_batch_of_scanners(std::list<VScannerSPtr>* current
{
// If there is enough space in the blocks queue,
// the number of scanners depends on the number of _free_blocks.
thread_slot_num = get_available_thread_slot_num();
thread_slot_num = cal_thread_slot_num_by_free_block_num();
}

// 2. get #thread_slot_num scanners from ctx->scanners
Expand Down
28 changes: 10 additions & 18 deletions be/src/vec/exec/scan/scanner_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,12 @@ class ScannerContext {
ScannerContext(RuntimeState* state_, VScanNode* parent,
const TupleDescriptor* output_tuple_desc,
const std::list<VScannerSPtr>& scanners_, int64_t limit_,
int64_t max_bytes_in_blocks_queue_, const int num_parallel_instances = 1);
int64_t max_bytes_in_blocks_queue_, const int num_parallel_instances = 0);

virtual ~ScannerContext() = default;
virtual Status init();

vectorized::BlockUPtr get_free_block();
vectorized::BlockUPtr get_free_block(bool* has_free_block, bool get_not_empty_block = false);
void return_free_block(std::unique_ptr<vectorized::Block> block);

// Append blocks from scanners to the blocks queue.
Expand Down Expand Up @@ -136,25 +136,20 @@ class ScannerContext {
virtual bool empty_in_queue(int id);

// TODO(wb): rethink how to calculate `_max_bytes_in_queue` when executing a shared scan
inline bool should_be_scheduled() const {
return (_cur_bytes_in_queue < _max_bytes_in_queue / 2) &&
(_serving_blocks_num < allowed_blocks_num());
virtual inline bool has_enough_space_in_blocks_queue() const {
return _cur_bytes_in_queue < _max_bytes_in_queue / 2;
}

int get_available_thread_slot_num() {
int cal_thread_slot_num_by_free_block_num() {
int thread_slot_num = 0;
thread_slot_num = (allowed_blocks_num() + _block_per_scanner - 1) / _block_per_scanner;
thread_slot_num = (_free_blocks_capacity + _block_per_scanner - 1) / _block_per_scanner;
thread_slot_num = std::min(thread_slot_num, _max_thread_num - _num_running_scanners);
if (thread_slot_num <= 0) {
thread_slot_num = 1;
}
return thread_slot_num;
}

// Returns the smaller of `_free_blocks_capacity` and the ceiling of
// `_max_bytes_in_queue / _estimated_block_bytes`, i.e. a block count bounded both by
// the free-block capacity and by how many estimated-size blocks fit in the queue limit.
// NOTE(review): this divides by `_estimated_block_bytes`; presumably it has been set to a
// non-zero value before this is called — confirm against the call sites.
int32_t allowed_blocks_num() const {
int32_t blocks_num = std::min(_free_blocks_capacity,
int32_t((_max_bytes_in_queue + _estimated_block_bytes - 1) /
_estimated_block_bytes));
return blocks_num;
}

void reschedule_scanner_ctx();

// the unique id of this context
Expand Down Expand Up @@ -208,12 +203,10 @@ class ScannerContext {

// Lazy-allocated blocks for all scanners to share, for memory reuse.
moodycamel::ConcurrentQueue<vectorized::BlockUPtr> _free_blocks;
std::atomic<int32_t> _serving_blocks_num = 0;
// The current number of free blocks available to the scanners.
// Used to limit the memory usage of the scanner.
// NOTE: this is NOT the size of `_free_blocks`.
int32_t _free_blocks_capacity = 0;
int64_t _estimated_block_bytes = 0;
std::atomic_int32_t _free_blocks_capacity = 0;

int _batch_size;
// The limit from SQL's limit clause
Expand All @@ -238,7 +231,6 @@ class ScannerContext {
int64_t _cur_bytes_in_queue = 0;
// The max limit bytes of blocks in blocks queue
const int64_t _max_bytes_in_queue;
std::atomic<int64_t> _bytes_allocated = 0;

doris::vectorized::ScannerScheduler* _scanner_scheduler;
// List "scanners" saves all "unfinished" scanners.
Expand Down
12 changes: 8 additions & 4 deletions be/src/vec/exec/scan/scanner_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,7 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, ScannerContext
int64_t raw_rows_threshold = raw_rows_read + config::doris_scanner_row_num;
int64_t raw_bytes_read = 0;
int64_t raw_bytes_threshold = config::doris_scanner_row_bytes;
bool has_free_block = true;
int num_rows_in_block = 0;

// Only set to true when ctx->done() return true.
Expand All @@ -330,16 +331,17 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, ScannerContext
bool should_stop = false;
// Has to wait for at least one full block; otherwise it will cause a lot of schedule tasks in the
// priority queue, which will affect query latency and query concurrency (for example, ssb 3.3).
while (!eos && raw_bytes_read < raw_bytes_threshold && raw_rows_read < raw_rows_threshold &&
num_rows_in_block < state->batch_size()) {
while (!eos && raw_bytes_read < raw_bytes_threshold &&
((raw_rows_read < raw_rows_threshold && has_free_block) ||
num_rows_in_block < state->batch_size())) {
if (UNLIKELY(ctx->done())) {
// No need to set status on error here.
// Because done() maybe caused by "should_stop"
should_stop = true;
break;
}

BlockUPtr block = ctx->get_free_block();
BlockUPtr block = ctx->get_free_block(&has_free_block);
status = scanner->get_block(state, block.get(), &eos);
VLOG_ROW << "VScanNode input rows: " << block->rows() << ", eos: " << eos;
// The VFileScanner for external table may try to open not exist files,
Expand All @@ -355,11 +357,12 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, ScannerContext
if (status.is<ErrorCode::NOT_FOUND>()) {
// The only case in this "if" branch is that an external table file was deleted and the FE cache has not been updated yet.
// Set status to OK.
LOG(INFO) << "scan range not found: " << scanner->get_current_scan_range_name();
status = Status::OK();
eos = true;
}

raw_bytes_read += block->allocated_bytes();
raw_bytes_read += block->bytes();
num_rows_in_block += block->rows();
if (UNLIKELY(block->rows() == 0)) {
ctx->return_free_block(std::move(block));
Expand Down Expand Up @@ -394,6 +397,7 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, ScannerContext
if (eos || should_stop) {
scanner->mark_to_need_to_close();
}

ctx->push_back_scanner_and_reschedule(scanner);
}

Expand Down