Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

memtable flush reserve process mem and improve logs #45743

Open
wants to merge 1 commit into
base: spill_and_reserve
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions be/src/olap/memtable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -611,6 +611,20 @@ bool MemTable::need_agg() const {
return false;
}

size_t MemTable::get_flush_reserve_memory_size() const {
size_t reserve_size = 0;
if (_keys_type == KeysType::DUP_KEYS) {
if (_tablet_schema->num_key_columns() == 0) {
// no need to reserve
} else {
reserve_size = _input_mutable_block.allocated_bytes();
}
} else {
reserve_size = _input_mutable_block.allocated_bytes();
}
return reserve_size;
}

Status MemTable::_to_block(std::unique_ptr<vectorized::Block>* res) {
size_t same_keys_num = _sort();
if (_keys_type == KeysType::DUP_KEYS || same_keys_num == 0) {
Expand Down
1 change: 1 addition & 0 deletions be/src/olap/memtable.h
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ class MemTable {

int64_t tablet_id() const { return _tablet_id; }
size_t memory_usage() const { return _mem_tracker->consumption(); }
size_t get_flush_reserve_memory_size() const;
// insert tuple from (row_pos) to (row_pos+num_rows)
Status insert(const vectorized::Block* block, const std::vector<uint32_t>& row_idxs);

Expand Down
39 changes: 39 additions & 0 deletions be/src/olap/memtable_flush_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "common/signal_handler.h"
#include "olap/memtable.h"
#include "olap/rowset/rowset_writer.h"
#include "olap/storage_engine.h"
#include "util/debug_points.h"
#include "util/doris_metrics.h"
#include "util/metrics.h"
Expand Down Expand Up @@ -140,6 +141,36 @@ Status FlushToken::wait() {
return Status::OK();
}

Status FlushToken::_try_reserve_memory(int64_t size) {
auto* thread_context = doris::thread_context();
auto* memtable_flush_executor =
ExecEnv::GetInstance()->storage_engine().memtable_flush_executor();
Status st;
do {
// only try to reserve process memory
st = thread_context->reserve_memory(size);
if (st.ok()) {
memtable_flush_executor->inc_flushing_task();
break;
}
if (_is_shutdown()) {
st = Status::Cancelled("flush memtable already cancelled");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

如果在等待期间,要flush的这个memtable 关联的导入任务呗cancel了,怎么处理?

break;
}
// Make sure at least one memtable is flushing even reserve memory failed.
if (memtable_flush_executor->check_and_inc_has_any_flushing_task()) {
// If there are already any flushing task, Wait for some time and retry.
LOG(INFO) << fmt::format(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个每1min 打印一次吧,否则内存满的时候刷屏了

"Failed to reserve memory {} for flush memtable, retry after 100ms", size);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
} else {
st = Status::OK();
break;
}
} while (true);
return st;
}

Status FlushToken::_do_flush_memtable(MemTable* memtable, int32_t segment_id, int64_t* flush_size) {
VLOG_CRITICAL << "begin to flush memtable for tablet: " << memtable->tablet_id()
<< ", memsize: " << memtable->memory_usage()
Expand All @@ -150,10 +181,18 @@ Status FlushToken::_do_flush_memtable(MemTable* memtable, int32_t segment_id, in
SCOPED_ATTACH_TASK(memtable->query_thread_context());
signal::set_signal_task_id(_rowset_writer->load_id());
signal::tablet_id = memtable->tablet_id();

DEFER_RELEASE_RESERVED();

{
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
memtable->query_thread_context().query_mem_tracker->write_tracker());
SCOPED_CONSUME_MEM_TRACKER(memtable->mem_tracker());
auto reserve_size = memtable->get_flush_reserve_memory_size();
RETURN_IF_ERROR(_try_reserve_memory(reserve_size));
Defer defer {[&]() {
ExecEnv::GetInstance()->storage_engine().memtable_flush_executor()->dec_flushing_task();
}};
std::unique_ptr<vectorized::Block> block;
RETURN_IF_ERROR(memtable->to_block(&block));
RETURN_IF_ERROR(_rowset_writer->flush_memtable(block.get(), segment_id, flush_size));
Expand Down
19 changes: 19 additions & 0 deletions be/src/olap/memtable_flush_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ class FlushToken : public std::enable_shared_from_this<FlushToken> {

Status _do_flush_memtable(MemTable* memtable, int32_t segment_id, int64_t* flush_size);

Status _try_reserve_memory(int64_t size);

// Records the current flush status of the tablet.
// Note: Once its value is set to Failed, it cannot return to SUCCESS.
std::shared_mutex _flush_status_lock;
Expand Down Expand Up @@ -140,12 +142,29 @@ class MemTableFlushExecutor {
std::shared_ptr<RowsetWriter> rowset_writer, bool is_high_priority,
std::shared_ptr<WorkloadGroup> wg_sptr);

// return true if it already has any flushing task
bool check_and_inc_has_any_flushing_task() {
// need to use CAS instead of only `if (0 == _flushing_task_count)` statement,
// to avoid concurrent entries both pass the if statement
int expected_count = 0;
if (!_flushing_task_count.compare_exchange_strong(expected_count, 1)) {
return true;
}
DCHECK(expected_count == 0 && _flushing_task_count == 1);
return false;
}

void inc_flushing_task() { _flushing_task_count++; }

void dec_flushing_task() { _flushing_task_count--; }

private:
void _register_metrics();
static void _deregister_metrics();

std::unique_ptr<ThreadPool> _flush_pool;
std::unique_ptr<ThreadPool> _high_prio_flush_pool;
std::atomic<int> _flushing_task_count = 0;
};

} // namespace doris
28 changes: 22 additions & 6 deletions be/src/olap/memtable_memory_limiter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,10 @@ void MemTableMemoryLimiter::handle_workload_group_memtable_flush(WorkloadGroupPt
--sleep_times;
}
// Check process memory again.
handle_memtable_flush();
handle_memtable_flush(wg);
}

void MemTableMemoryLimiter::handle_memtable_flush() {
void MemTableMemoryLimiter::handle_memtable_flush(WorkloadGroupPtr wg) {
// Check the soft limit.
DCHECK(_load_soft_mem_limit > 0);
if (!_soft_limit_reached() || _load_usage_low()) {
Expand All @@ -150,20 +150,34 @@ void MemTableMemoryLimiter::handle_memtable_flush() {
if (need_flush > 0) {
auto limit = _hard_limit_reached() ? Limit::HARD : Limit::SOFT;
LOG(INFO) << "reached memtable memory " << (limit == Limit::HARD ? "hard" : "soft")
<< ", " << GlobalMemoryArbitrator::process_memory_used_details_str()
<< ", " << GlobalMemoryArbitrator::process_memory_used_details_str() << ", "
<< GlobalMemoryArbitrator::sys_mem_available_details_str()
<< ", load mem: " << PrettyPrinter::print_bytes(_mem_tracker->consumption())
<< ", memtable writers num: " << _writers.size()
<< ", active: " << PrettyPrinter::print_bytes(_active_mem_usage)
<< ", queue: " << PrettyPrinter::print_bytes(_queue_mem_usage)
<< ", flush: " << PrettyPrinter::print_bytes(_flush_mem_usage);
<< ", flush: " << PrettyPrinter::print_bytes(_flush_mem_usage)
<< ", wg: " << (wg ? wg->debug_string() : "null\n")
<< doris::ProcessProfile::instance()
->memory_profile()
->process_memory_detail_str();
_flush_active_memtables(0, need_flush);
}
} while (_hard_limit_reached() && !_load_usage_low());
g_memtable_memory_limit_waiting_threads << -1;
timer.stop();
int64_t time_ms = timer.elapsed_time() / 1000 / 1000;
g_memtable_memory_limit_latency_ms << time_ms;
LOG(INFO) << "waited " << time_ms << " ms for memtable memory limit";
LOG(INFO) << "waited " << time_ms << " ms for memtable memory limit"
<< ", " << GlobalMemoryArbitrator::process_memory_used_details_str() << ", "
<< GlobalMemoryArbitrator::sys_mem_available_details_str()
<< ", load mem: " << PrettyPrinter::print_bytes(_mem_tracker->consumption())
<< ", memtable writers num: " << _writers.size()
<< ", active: " << PrettyPrinter::print_bytes(_active_mem_usage)
<< ", queue: " << PrettyPrinter::print_bytes(_queue_mem_usage)
<< ", flush: " << PrettyPrinter::print_bytes(_flush_mem_usage)
<< ", wg: " << (wg ? wg->debug_string() : "null.\n")
<< doris::ProcessProfile::instance()->memory_profile()->process_memory_detail_str();
}

int64_t MemTableMemoryLimiter::flush_workload_group_memtables(uint64_t wg_id, int64_t need_flush) {
Expand Down Expand Up @@ -270,11 +284,13 @@ void MemTableMemoryLimiter::refresh_mem_tracker() {
_last_limit = limit;
_log_timer.reset();
LOG(INFO) << ss.str() << ", " << GlobalMemoryArbitrator::process_memory_used_details_str()
<< ", " << GlobalMemoryArbitrator::sys_mem_available_details_str()
<< ", load mem: " << PrettyPrinter::print_bytes(_mem_tracker->consumption())
<< ", memtable writers num: " << _writers.size()
<< ", active: " << PrettyPrinter::print_bytes(_active_mem_usage)
<< ", queue: " << PrettyPrinter::print_bytes(_queue_mem_usage)
<< ", flush: " << PrettyPrinter::print_bytes(_flush_mem_usage);
<< ", flush: " << PrettyPrinter::print_bytes(_flush_mem_usage) << "\n"
<< doris::ProcessProfile::instance()->memory_profile()->process_memory_detail_str();
}

void MemTableMemoryLimiter::_refresh_mem_tracker() {
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/memtable_memory_limiter.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class MemTableMemoryLimiter {
// If yes, it will flush memtable to try to reduce memory consumption.
// Every write operation will call this API to check if need flush memtable OR hang
// when memory is not available.
void handle_memtable_flush();
void handle_memtable_flush(WorkloadGroupPtr wg);

int64_t flush_workload_group_memtables(uint64_t wg_id, int64_t need_flush_bytes);

Expand Down
14 changes: 8 additions & 6 deletions be/src/runtime/memory/global_memory_arbitrator.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,13 @@ class GlobalMemoryArbitrator {
static inline std::string sys_mem_available_details_str() {
auto msg = fmt::format(
"sys available memory {}(= {}[proc/available] - {}[reserved] - "
"{}B[waiting_refresh])",
"{}B[waiting_refresh] + {}[tc/jemalloc_cache])",
PrettyPrinter::print(sys_mem_available(), TUnit::BYTES),
PrettyPrinter::print(MemInfo::_s_sys_mem_available.load(std::memory_order_relaxed),
TUnit::BYTES),
PrettyPrinter::print(process_reserved_memory(), TUnit::BYTES),
refresh_interval_memory_growth);
refresh_interval_memory_growth,
PrettyPrinter::print_bytes(static_cast<uint64_t>(MemInfo::allocator_cache_mem())));
#ifdef ADDRESS_SANITIZER
msg = "[ASAN]" + msg;
#endif
Expand Down Expand Up @@ -165,15 +166,16 @@ class GlobalMemoryArbitrator {

static std::string process_limit_exceeded_errmsg_str() {
return fmt::format(
"{} exceed limit {} or {} less than low water mark {}", process_memory_used_str(),
MemInfo::mem_limit_str(), sys_mem_available_str(),
"{} exceed limit {} or {} less than low water mark {}",
process_memory_used_details_str(), MemInfo::mem_limit_str(),
sys_mem_available_details_str(),
PrettyPrinter::print(MemInfo::sys_mem_available_low_water_mark(), TUnit::BYTES));
}

static std::string process_soft_limit_exceeded_errmsg_str() {
return fmt::format("{} exceed soft limit {} or {} less than warning water mark {}.",
process_memory_used_str(), MemInfo::soft_mem_limit_str(),
sys_mem_available_str(),
process_memory_used_details_str(), MemInfo::soft_mem_limit_str(),
sys_mem_available_details_str(),
PrettyPrinter::print(MemInfo::sys_mem_available_warning_water_mark(),
TUnit::BYTES));
}
Expand Down
3 changes: 2 additions & 1 deletion be/src/runtime/memory/mem_tracker_limiter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,8 @@ std::string MemTrackerLimiter::tracker_limit_exceeded_str() {
"{}, peak used {}, current used {}. backend {}, {}.",
label(), type_string(_type), MemCounter::print_bytes(limit()),
MemCounter::print_bytes(peak_consumption()), MemCounter::print_bytes(consumption()),
BackendOptions::get_localhost(), GlobalMemoryArbitrator::process_memory_used_str());
BackendOptions::get_localhost(),
GlobalMemoryArbitrator::process_memory_used_details_str());
if (_type == Type::QUERY || _type == Type::LOAD) {
err_msg += fmt::format(
" exec node:<{}>, can `set exec_mem_limit=8G` to change limit, details see "
Expand Down
13 changes: 8 additions & 5 deletions be/src/runtime/memory/memory_profile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -340,14 +340,17 @@ int64_t MemoryProfile::other_current_usage() {
return memory_other_trackers_sum_bytes.get_value();
}

std::string MemoryProfile::process_memory_detail_str() const {
return fmt::format("Process Memory Summary: {}\n, {}\n, {}\n, {}",
GlobalMemoryArbitrator::process_mem_log_str(),
print_memory_overview_profile(), print_global_memory_profile(),
print_top_memory_tasks_profile());
}

void MemoryProfile::print_log_process_usage() {
if (_enable_print_log_process_usage) {
_enable_print_log_process_usage = false;
LOG(WARNING) << "Process Memory Summary: " + GlobalMemoryArbitrator::process_mem_log_str()
<< "\n"
<< print_memory_overview_profile() << "\n"
<< print_global_memory_profile() << "\n"
<< print_top_memory_tasks_profile();
LOG(WARNING) << process_memory_detail_str();
}
}

Expand Down
1 change: 1 addition & 0 deletions be/src/runtime/memory/memory_profile.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ class MemoryProfile {
// process memory changes more than 256M, or the GC ends
void enable_print_log_process_usage() { _enable_print_log_process_usage = true; }
void print_log_process_usage();
std::string process_memory_detail_str() const;

private:
MultiVersion<RuntimeProfile> _memory_overview_profile;
Expand Down
48 changes: 29 additions & 19 deletions be/src/runtime/memory/thread_mem_tracker_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ class ThreadMemTrackerMgr {
void consume(int64_t size, int skip_large_memory_check = 0);
void flush_untracked_mem();

doris::Status try_reserve(int64_t size);
doris::Status try_reserve(int64_t size, bool only_check_process_memory);

void release_reserved();

Expand Down Expand Up @@ -278,34 +278,44 @@ inline void ThreadMemTrackerMgr::flush_untracked_mem() {
_stop_consume = false;
}

inline doris::Status ThreadMemTrackerMgr::try_reserve(int64_t size) {
inline doris::Status ThreadMemTrackerMgr::try_reserve(int64_t size,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

把这个API 的单测补充一下

bool only_check_process_memory) {
DCHECK(_limiter_tracker);
DCHECK(size >= 0);
CHECK(init());
// if _reserved_mem not equal to 0, repeat reserve,
// _untracked_mem store bytes that not synchronized to process reserved memory.
flush_untracked_mem();
auto wg_ptr = _wg_wptr.lock();
if (!_limiter_tracker->try_reserve(size)) {
auto err_msg = fmt::format(
"reserve memory failed, size: {}, because query memory exceeded, memory tracker "
"consumption: {}, limit: {}",
PrettyPrinter::print(size, TUnit::BYTES),
PrettyPrinter::print(_limiter_tracker->consumption(), TUnit::BYTES),
PrettyPrinter::print(_limiter_tracker->limit(), TUnit::BYTES));
return doris::Status::Error<ErrorCode::QUERY_MEMORY_EXCEEDED>(err_msg);
}
if (wg_ptr) {
if (!wg_ptr->add_wg_refresh_interval_memory_growth(size)) {
if (only_check_process_memory) {
_limiter_tracker->reserve(size);
if (wg_ptr) {
wg_ptr->add_wg_refresh_interval_memory_growth(size);
}
} else {
if (!_limiter_tracker->try_reserve(size)) {
auto err_msg = fmt::format(
"reserve memory failed, size: {}, because workload group memory exceeded, "
"workload group: {}",
PrettyPrinter::print(size, TUnit::BYTES), wg_ptr->memory_debug_string());
_limiter_tracker->release(size); // rollback
_limiter_tracker->release_reserved(size); // rollback
return doris::Status::Error<ErrorCode::WORKLOAD_GROUP_MEMORY_EXCEEDED>(err_msg);
"reserve memory failed, size: {}, because query memory exceeded, memory "
"tracker "
"consumption: {}, limit: {}",
PrettyPrinter::print(size, TUnit::BYTES),
PrettyPrinter::print(_limiter_tracker->consumption(), TUnit::BYTES),
PrettyPrinter::print(_limiter_tracker->limit(), TUnit::BYTES));
return doris::Status::Error<ErrorCode::QUERY_MEMORY_EXCEEDED>(err_msg);
}
if (wg_ptr) {
if (!wg_ptr->try_add_wg_refresh_interval_memory_growth(size)) {
auto err_msg = fmt::format(
"reserve memory failed, size: {}, because workload group memory exceeded, "
"workload group: {}",
PrettyPrinter::print(size, TUnit::BYTES), wg_ptr->memory_debug_string());
_limiter_tracker->release(size); // rollback
_limiter_tracker->release_reserved(size); // rollback
return doris::Status::Error<ErrorCode::WORKLOAD_GROUP_MEMORY_EXCEEDED>(err_msg);
}
}
}

if (!doris::GlobalMemoryArbitrator::try_reserve_process_memory(size)) {
auto err_msg =
fmt::format("reserve memory failed, size: {}, because proccess memory exceeded, {}",
Expand Down
7 changes: 0 additions & 7 deletions be/src/runtime/runtime_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -410,13 +410,6 @@ class RuntimeState {

bool enable_page_cache() const;

int partitioned_hash_join_rows_threshold() const {
if (!_query_options.__isset.partitioned_hash_join_rows_threshold) {
return 0;
}
return _query_options.partitioned_hash_join_rows_threshold;
}

int partitioned_hash_agg_rows_threshold() const {
if (!_query_options.__isset.partitioned_hash_agg_rows_threshold) {
return 0;
Expand Down
11 changes: 10 additions & 1 deletion be/src/runtime/thread_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -249,13 +249,22 @@ class ThreadContext {
thread_mem_tracker_mgr->consume(size, skip_large_memory_check);
}

doris::Status reserve_memory(const int64_t size) const {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

xxx 和 try_xxx 区分不清楚是否要检查query和wg的limit。
我们换个名字,增加一个新的函数,try_reserve_process_memory

#ifdef USE_MEM_TRACKER
DCHECK(doris::k_doris_exit || !doris::config::enable_memory_orphan_check ||
thread_mem_tracker()->label() != "Orphan")
<< doris::memory_orphan_check_msg;
#endif
return thread_mem_tracker_mgr->try_reserve(size, true);
}

doris::Status try_reserve_memory(const int64_t size) const {
#ifdef USE_MEM_TRACKER
DCHECK(doris::k_doris_exit || !doris::config::enable_memory_orphan_check ||
thread_mem_tracker()->label() != "Orphan")
<< doris::memory_orphan_check_msg;
#endif
return thread_mem_tracker_mgr->try_reserve(size);
return thread_mem_tracker_mgr->try_reserve(size, false);
}

void release_reserved_memory() const {
Expand Down
Loading
Loading