From 869deb043bbae428f5a5a090b243b9567b2917df Mon Sep 17 00:00:00 2001
From: Kang
Date: Wed, 8 Nov 2023 09:43:20 -0600
Subject: [PATCH] Revert "[improvement](scanner_schedule) reduce memory
 consumption of scanner #24199 (#25547)"

This reverts commit 9a19581a2c55c6e78e7e2812a45e1bc3842ffec7.
---
 be/src/exec/exec_node.cpp                  |  3 +-
 be/src/pipeline/exec/scan_operator.cpp     |  2 +-
 be/src/runtime/plan_fragment_executor.cpp  |  1 -
 be/src/vec/exec/scan/pip_scanner_context.h |  7 ++-
 be/src/vec/exec/scan/scanner_context.cpp   | 68 +++++++++-------------
 be/src/vec/exec/scan/scanner_context.h     | 28 ++++-----
 be/src/vec/exec/scan/scanner_scheduler.cpp | 12 ++--
 7 files changed, 55 insertions(+), 66 deletions(-)

diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp
index c8f46b2ed124e0..dc30bf163a58a5 100644
--- a/be/src/exec/exec_node.cpp
+++ b/be/src/exec/exec_node.cpp
@@ -41,7 +41,6 @@
 #include "runtime/runtime_state.h"
 #include "util/debug_util.h"
 #include "util/runtime_profile.h"
-#include "util/stack_util.h"
 #include "util/uid_util.h"
 #include "vec/columns/column_nullable.h"
 #include "vec/core/block.h"
@@ -206,7 +205,7 @@ Status ExecNode::close(RuntimeState* state) {
                   << " already closed";
         return Status::OK();
     }
-    LOG(INFO) << "fragment_instance_id=" << print_id(state->fragment_instance_id()) << " closed. ";
+    LOG(INFO) << "fragment_instance_id=" << print_id(state->fragment_instance_id()) << " closed";
 
     _is_closed = true;
     Status result;
diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp
index 1f15b1d61f8e59..f34461a9fd200c 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -44,7 +44,7 @@ bool ScanOperator::can_read() {
             return true;
         } else {
             if (_node->_scanner_ctx->get_num_running_scanners() == 0 &&
-                _node->_scanner_ctx->should_be_scheduled()) {
+                _node->_scanner_ctx->has_enough_space_in_blocks_queue()) {
                 _node->_scanner_ctx->reschedule_scanner_ctx();
             }
             return _node->ready_to_read(); // there are some blocks to process
diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp
index a98085c342fb76..2d7f8c79520159 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -53,7 +53,6 @@
 #include "util/container_util.hpp"
 #include "util/defer_op.h"
 #include "util/pretty_printer.h"
-#include "util/stack_util.h"
 #include "util/telemetry/telemetry.h"
 #include "util/threadpool.h"
 #include "util/time.h"
diff --git a/be/src/vec/exec/scan/pip_scanner_context.h b/be/src/vec/exec/scan/pip_scanner_context.h
index f52bd3bf3c747a..b98c628368eeeb 100644
--- a/be/src/vec/exec/scan/pip_scanner_context.h
+++ b/be/src/vec/exec/scan/pip_scanner_context.h
@@ -166,6 +166,10 @@ class PipScannerContext : public vectorized::ScannerContext {
         _free_blocks_memory_usage->add(free_blocks_memory_usage);
     }
 
+    bool has_enough_space_in_blocks_queue() const override {
+        return _current_used_bytes < _max_bytes_in_queue / 2 * _num_parallel_instances;
+    }
+
     void _dispose_coloate_blocks_not_in_queue() override {
         if (_need_colocate_distribute) {
             for (int i = 0; i < _num_parallel_instances; ++i) {
@@ -217,7 +221,8 @@
                     std::lock_guard queue_l(*_queue_mutexs[loc]);
                     _blocks_queues[loc].emplace_back(std::move(_colocate_blocks[loc]));
                 }
-                _colocate_blocks[loc] = get_free_block();
+                bool get_block_not_empty = true;
+                _colocate_blocks[loc] = get_free_block(&get_block_not_empty, get_block_not_empty);
                 _colocate_mutable_blocks[loc]->set_muatable_columns(
                         _colocate_blocks[loc]->mutate_columns());
             }
         }
diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp
index 8deb21534786fe..478d9fb4cb71cd 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -52,7 +52,7 @@ ScannerContext::ScannerContext(doris::RuntimeState* state_, doris::vectorized::V
           _process_status(Status::OK()),
           _batch_size(state_->batch_size()),
           limit(limit_),
-          _max_bytes_in_queue(max_bytes_in_blocks_queue_ * num_parallel_instances),
+          _max_bytes_in_queue(max_bytes_in_blocks_queue_),
          _scanner_scheduler(state_->exec_env()->scanner_scheduler()),
          _scanners(scanners_),
          _num_parallel_instances(num_parallel_instances) {
@@ -63,21 +63,26 @@ ScannerContext::ScannerContext(doris::RuntimeState* state_, doris::vectorized::V
     if (limit < 0) {
         limit = -1;
     }
+}
+
+// After init function call, should not access _parent
+Status ScannerContext::init() {
+    // 1. Calculate max concurrency
+    // TODO: now the max thread num <= config::doris_scanner_thread_pool_thread_num / 4
+    // should find a more reasonable value.
     _max_thread_num = config::doris_scanner_thread_pool_thread_num / 4;
-    _max_thread_num *= num_parallel_instances;
+    if (_parent->_shared_scan_opt) {
+        DCHECK(_num_parallel_instances > 0);
+        _max_thread_num *= _num_parallel_instances;
+    }
     _max_thread_num = _max_thread_num == 0 ? 1 : _max_thread_num;
     DCHECK(_max_thread_num > 0);
     _max_thread_num = std::min(_max_thread_num, (int32_t)_scanners.size());
-    // 1. Calculate max concurrency
     // For select * from table limit 10; should just use one thread.
     if (_parent->should_run_serial()) {
         _max_thread_num = 1;
     }
-}
-
-// After init function call, should not access _parent
-Status ScannerContext::init() {
 
     _scanner_profile = _parent->_scanner_profile;
     _scanner_sched_counter = _parent->_scanner_sched_counter;
     _scanner_ctx_sched_counter = _parent->_scanner_ctx_sched_counter;
@@ -99,9 +104,6 @@ Status ScannerContext::init() {
             limit == -1 ? _batch_size : std::min(static_cast<int64_t>(_batch_size), limit);
     _block_per_scanner = (doris_scanner_row_num + (real_block_size - 1)) / real_block_size;
     _free_blocks_capacity = _max_thread_num * _block_per_scanner;
-    auto block = get_free_block();
-    _estimated_block_bytes = std::max(block->allocated_bytes(), (size_t)16);
-    return_free_block(std::move(block));
 
 #ifndef BE_TEST
     // 3. get thread token
@@ -121,33 +123,27 @@ Status ScannerContext::init() {
     return Status::OK();
 }
 
-vectorized::BlockUPtr ScannerContext::get_free_block() {
+vectorized::BlockUPtr ScannerContext::get_free_block(bool* has_free_block,
+                                                     bool get_block_not_empty) {
     vectorized::BlockUPtr block;
     if (_free_blocks.try_dequeue(block)) {
-        DCHECK(block->mem_reuse());
-        _free_blocks_memory_usage->add(-block->allocated_bytes());
-        _serving_blocks_num++;
-        return block;
+        if (!get_block_not_empty || block->mem_reuse()) {
+            _free_blocks_capacity--;
+            _free_blocks_memory_usage->add(-block->allocated_bytes());
+            return block;
+        }
     }
-    block = vectorized::Block::create_unique(_output_tuple_desc->slots(), _batch_size,
-                                             true /*ignore invalid slots*/);
     COUNTER_UPDATE(_newly_create_free_blocks_num, 1);
-
-    _serving_blocks_num++;
-    return block;
+    return vectorized::Block::create_unique(_output_tuple_desc->slots(), _batch_size,
+                                            true /*ignore invalid slots*/);
 }
 
 void ScannerContext::return_free_block(std::unique_ptr<vectorized::Block> block) {
-    _serving_blocks_num--;
-    if (block->mem_reuse()) {
-        // Only put blocks with schema to free blocks, because colocate blocks
-        // need schema.
-        _estimated_block_bytes = std::max(block->allocated_bytes(), (size_t)16);
-        block->clear_column_data();
-        _free_blocks_memory_usage->add(block->allocated_bytes());
-        _free_blocks.enqueue(std::move(block));
-    }
+    block->clear_column_data();
+    _free_blocks_memory_usage->add(block->allocated_bytes());
+    _free_blocks.enqueue(std::move(block));
+    ++_free_blocks_capacity;
 }
 
 void ScannerContext::append_blocks_to_queue(std::vector<vectorized::BlockUPtr>& blocks) {
@@ -180,7 +176,7 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo
     // (if the scheduler continues to schedule, it will cause a lot of busy running).
     // At this point, consumers are required to trigger new scheduling to ensure that
     // data can be continuously fetched.
-    if (should_be_scheduled() && _num_running_scanners == 0) {
+    if (has_enough_space_in_blocks_queue() && _num_running_scanners == 0) {
         auto state = _scanner_scheduler->submit(this);
         if (state.ok()) {
             _num_scheduling_ctx++;
@@ -188,7 +184,6 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo
             set_status_on_error(state, false);
         }
     }
-
     // Wait for block from queue
     if (wait) {
         SCOPED_TIMER(_scanner_wait_batch_timer);
@@ -212,7 +207,6 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo
         auto block_bytes = (*block)->allocated_bytes();
         _cur_bytes_in_queue -= block_bytes;
-        _queued_blocks_memory_usage->add(-block_bytes);
 
         return Status::OK();
     } else {
@@ -359,13 +353,7 @@ void ScannerContext::push_back_scanner_and_reschedule(VScannerSPtr scanner) {
         _scanners.push_front(scanner);
     }
     std::lock_guard l(_transfer_lock);
-
-    // In pipeline engine, doris will close scanners when `no_schedule`.
-    // We have to decrease _num_running_scanners before schedule, otherwise
-    // schedule does not woring due to _num_running_scanners.
-    _num_running_scanners--;
-
-    if (should_be_scheduled()) {
+    if (has_enough_space_in_blocks_queue()) {
         auto state = _scanner_scheduler->submit(this);
         if (state.ok()) {
             _num_scheduling_ctx++;
@@ -385,6 +373,8 @@ void ScannerContext::push_back_scanner_and_reschedule(VScannerSPtr scanner) {
         _is_finished = true;
         _blocks_queue_added_cv.notify_one();
     }
+    // In pipeline engine, doris will close scanners when `no_schedule`.
+    _num_running_scanners--;
 
     _ctx_finish_cv.notify_one();
 }
@@ -394,7 +384,7 @@ void ScannerContext::get_next_batch_of_scanners(std::list<VScannerSPtr>* current
     {
         // If there are enough space in blocks queue,
        // the scanner number depends on the _free_blocks numbers
-        thread_slot_num = get_available_thread_slot_num();
+        thread_slot_num = cal_thread_slot_num_by_free_block_num();
     }
 
     // 2. get #thread_slot_num scanners from ctx->scanners
diff --git a/be/src/vec/exec/scan/scanner_context.h b/be/src/vec/exec/scan/scanner_context.h
index a345bfc03dd114..3aad0d6263fa23 100644
--- a/be/src/vec/exec/scan/scanner_context.h
+++ b/be/src/vec/exec/scan/scanner_context.h
@@ -62,12 +62,12 @@ class ScannerContext {
     ScannerContext(RuntimeState* state_, VScanNode* parent,
                    const TupleDescriptor* output_tuple_desc,
                    const std::list<VScannerSPtr>& scanners_, int64_t limit_,
-                   int64_t max_bytes_in_blocks_queue_, const int num_parallel_instances = 1);
+                   int64_t max_bytes_in_blocks_queue_, const int num_parallel_instances = 0);
 
     virtual ~ScannerContext() = default;
     virtual Status init();
 
-    vectorized::BlockUPtr get_free_block();
+    vectorized::BlockUPtr get_free_block(bool* has_free_block, bool get_not_empty_block = false);
     void return_free_block(std::unique_ptr<vectorized::Block> block);
 
     // Append blocks from scanners to the blocks queue.
@@ -136,25 +136,20 @@
     virtual bool empty_in_queue(int id);
 
     // todo(wb) rethinking how to calculate ```_max_bytes_in_queue``` when executing shared scan
-    inline bool should_be_scheduled() const {
-        return (_cur_bytes_in_queue < _max_bytes_in_queue / 2) &&
-               (_serving_blocks_num < allowed_blocks_num());
+    virtual inline bool has_enough_space_in_blocks_queue() const {
+        return _cur_bytes_in_queue < _max_bytes_in_queue / 2;
     }
 
-    int get_available_thread_slot_num() {
+    int cal_thread_slot_num_by_free_block_num() {
         int thread_slot_num = 0;
-        thread_slot_num = (allowed_blocks_num() + _block_per_scanner - 1) / _block_per_scanner;
+        thread_slot_num = (_free_blocks_capacity + _block_per_scanner - 1) / _block_per_scanner;
         thread_slot_num = std::min(thread_slot_num, _max_thread_num - _num_running_scanners);
+        if (thread_slot_num <= 0) {
+            thread_slot_num = 1;
+        }
         return thread_slot_num;
     }
 
-    int32_t allowed_blocks_num() const {
-        int32_t blocks_num = std::min(_free_blocks_capacity,
-                                      int32_t((_max_bytes_in_queue + _estimated_block_bytes - 1) /
-                                              _estimated_block_bytes));
-        return blocks_num;
-    }
-
     void reschedule_scanner_ctx();
 
     // the unique id of this context
@@ -208,12 +203,10 @@
     // Lazy-allocated blocks for all scanners to share, for memory reuse.
     moodycamel::ConcurrentQueue<vectorized::BlockUPtr> _free_blocks;
 
-    std::atomic<int32_t> _serving_blocks_num = 0;
     // The current number of free blocks available to the scanners.
     // Used to limit the memory usage of the scanner.
     // NOTE: this is NOT the size of `_free_blocks`.
-    int32_t _free_blocks_capacity = 0;
-    int64_t _estimated_block_bytes = 0;
+    std::atomic_int32_t _free_blocks_capacity = 0;
 
     int _batch_size;
     // The limit from SQL's limit clause
@@ -238,7 +231,6 @@
     int64_t _cur_bytes_in_queue = 0;
     // The max limit bytes of blocks in blocks queue
     const int64_t _max_bytes_in_queue;
-    std::atomic<int64_t> _bytes_allocated = 0;
 
     doris::vectorized::ScannerScheduler* _scanner_scheduler;
     // List "scanners" saves all "unfinished" scanners.
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp
index 2529ce67e5e159..3481128a1d2de4 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -321,6 +321,7 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, ScannerContext
     int64_t raw_rows_threshold = raw_rows_read + config::doris_scanner_row_num;
     int64_t raw_bytes_read = 0;
     int64_t raw_bytes_threshold = config::doris_scanner_row_bytes;
+    bool has_free_block = true;
     int num_rows_in_block = 0;
 
     // Only set to true when ctx->done() return true.
@@ -330,8 +331,9 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, ScannerContext
     bool should_stop = false;
     // Has to wait at least one full block, or it will cause a lot of schedule task in priority
     // queue, it will affect query latency and query concurrency for example ssb 3.3.
-    while (!eos && raw_bytes_read < raw_bytes_threshold && raw_rows_read < raw_rows_threshold &&
-           num_rows_in_block < state->batch_size()) {
+    while (!eos && raw_bytes_read < raw_bytes_threshold &&
+           ((raw_rows_read < raw_rows_threshold && has_free_block) ||
+            num_rows_in_block < state->batch_size())) {
         if (UNLIKELY(ctx->done())) {
             // No need to set status on error here.
             // Because done() maybe caused by "should_stop"
@@ -339,7 +341,7 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, ScannerContext
             break;
         }
 
-        BlockUPtr block = ctx->get_free_block();
+        BlockUPtr block = ctx->get_free_block(&has_free_block);
         status = scanner->get_block(state, block.get(), &eos);
         VLOG_ROW << "VScanNode input rows: " << block->rows() << ", eos: " << eos;
         // The VFileScanner for external table may try to open not exist files,
@@ -355,11 +357,12 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, ScannerContext
         if (status.is<ErrorCode::NOT_FOUND>()) {
             // The only case in this "if" branch is external table file delete and fe cache has not been updated yet.
             // Set status to OK.
+            LOG(INFO) << "scan range not found: " << scanner->get_current_scan_range_name();
             status = Status::OK();
             eos = true;
         }
 
-        raw_bytes_read += block->allocated_bytes();
+        raw_bytes_read += block->bytes();
         num_rows_in_block += block->rows();
         if (UNLIKELY(block->rows() == 0)) {
             ctx->return_free_block(std::move(block));
@@ -394,6 +397,7 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, ScannerContext
     if (eos || should_stop) {
         scanner->mark_to_need_to_close();
     }
+
     ctx->push_back_scanner_and_reschedule(scanner);
 }