Skip to content

Commit

Permalink
[fix](tls) Manually track memory in Allocator instead of mem hook and…
Browse files Browse the repository at this point in the history
… ThreadContext life cycle to manual control (apache#26904)

Manually track query/load/compaction/etc. memory in Allocator instead of mem hook.
Can still use Mem Hook when cannot manually track memory code segments and find memory locations during debugging.
This will cause memory tracking loss for Query, loss less than 10% compared to the past, but this is expected to be more controllable.
Similarly, Mem Hook will no longer track unowned memory to the orphan mem tracker by default, so the total memory of all MemTrackers will be less than before.
Not need to get memory size from jemalloc in Mem Hook each memory alloc and free, which would lose performance in the past.
Not require caching bthread local in pthread local for memory hook, in the past this has caused core dumps inside bthread, seems to be a bug in bthread.
ThreadContext life cycle to manual control
In the past, ThreadContext was automatically created when it was used for the first time (this was usually in the Jemalloc Hook when the first malloc memory), and was automatically destroyed when the thread exited.
Now instead of manually controlling the create and destroy of ThreadContext, it is mainly created manually when the task thread start and destroyed before the task thread end.
Run 43 clickbench query tests.
Use MemHook in the past:
  • Loading branch information
xinyiZzz authored and 胥剑旭 committed Dec 14, 2023
1 parent f9f3e67 commit af7e273
Show file tree
Hide file tree
Showing 27 changed files with 397 additions and 513 deletions.
1 change: 0 additions & 1 deletion be/src/io/fs/stream_load_pipe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ StreamLoadPipe::StreamLoadPipe(size_t max_buffered_bytes, size_t min_chunk_size,
_use_proto(use_proto) {}

StreamLoadPipe::~StreamLoadPipe() {
SCOPED_TRACK_MEMORY_TO_UNKNOWN();
while (!_buf_queue.empty()) {
_buf_queue.pop_front();
}
Expand Down
8 changes: 4 additions & 4 deletions be/src/olap/schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ using SchemaSPtr = std::shared_ptr<const Schema>;
class Schema {
public:
Schema(TabletSchemaSPtr tablet_schema) {
SCOPED_MEM_COUNT(&_mem_size);
SCOPED_MEM_COUNT_BY_HOOK(&_mem_size);
size_t num_columns = tablet_schema->num_columns();
// ignore this column
if (tablet_schema->columns().back().name() == BeConsts::ROW_STORE_COL) {
Expand Down Expand Up @@ -86,7 +86,7 @@ class Schema {

// All the columns of one table may exist in the columns param, but col_ids is only a subset.
Schema(const std::vector<TabletColumn>& columns, const std::vector<ColumnId>& col_ids) {
SCOPED_MEM_COUNT(&_mem_size);
SCOPED_MEM_COUNT_BY_HOOK(&_mem_size);
size_t num_key_columns = 0;
_unique_ids.resize(columns.size());
for (size_t i = 0; i < columns.size(); ++i) {
Expand All @@ -109,7 +109,7 @@ class Schema {

// Only for UT
Schema(const std::vector<TabletColumn>& columns, size_t num_key_columns) {
SCOPED_MEM_COUNT(&_mem_size);
SCOPED_MEM_COUNT_BY_HOOK(&_mem_size);
std::vector<ColumnId> col_ids(columns.size());
_unique_ids.resize(columns.size());
for (uint32_t cid = 0; cid < columns.size(); ++cid) {
Expand All @@ -121,7 +121,7 @@ class Schema {
}

Schema(const std::vector<const Field*>& cols, size_t num_key_columns) {
SCOPED_MEM_COUNT(&_mem_size);
SCOPED_MEM_COUNT_BY_HOOK(&_mem_size);
std::vector<ColumnId> col_ids(cols.size());
_unique_ids.resize(cols.size());
for (uint32_t cid = 0; cid < cols.size(); ++cid) {
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/tablet_schema.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -720,7 +720,7 @@ void TabletSchema::clear_columns() {
}

void TabletSchema::init_from_pb(const TabletSchemaPB& schema) {
SCOPED_MEM_COUNT(&_mem_size);
SCOPED_MEM_COUNT_BY_HOOK(&_mem_size);
_keys_type = schema.keys_type();
_num_columns = 0;
_num_variant_columns = 0;
Expand Down
11 changes: 2 additions & 9 deletions be/src/runtime/buffer_control_block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ void GetResultBatchCtx::on_failure(const Status& status) {
status.to_protobuf(result->mutable_status());
{
// call by result sink
SCOPED_TRACK_MEMORY_TO_UNKNOWN();
done->Run();
}
delete this;
Expand All @@ -57,10 +56,7 @@ void GetResultBatchCtx::on_close(int64_t packet_seq, QueryStatistics* statistics
}
result->set_packet_seq(packet_seq);
result->set_eos(true);
{
SCOPED_TRACK_MEMORY_TO_UNKNOWN();
done->Run();
}
{ done->Run(); }
delete this;
}

Expand All @@ -85,10 +81,7 @@ void GetResultBatchCtx::on_data(const std::unique_ptr<TFetchDataResult>& t_resul
result->set_eos(eos);
}
st.to_protobuf(result->mutable_status());
{
SCOPED_TRACK_MEMORY_TO_UNKNOWN();
done->Run();
}
{ done->Run(); }
delete this;
}

Expand Down
68 changes: 42 additions & 26 deletions be/src/runtime/memory/jemalloc_hook.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,18 @@ extern "C" {
// mem hook should avoid nesting new/malloc.

void* doris_malloc(size_t size) __THROW {
CONSUME_MEM_TRACKER(jenallocx(size, 0));
CONSUME_THREAD_MEM_TRACKER_BY_HOOK_WITH_FN([](size_t size) { return jenallocx(size, 0); },
size);
void* ptr = jemalloc(size);
if (UNLIKELY(ptr == nullptr)) {
RELEASE_MEM_TRACKER(jenallocx(size, 0));
RELEASE_THREAD_MEM_TRACKER_BY_HOOK_WITH_FN([](size_t size) { return jenallocx(size, 0); },
size);
}
return ptr;
}

void doris_free(void* p) __THROW {
RELEASE_MEM_TRACKER(jemalloc_usable_size(p));
RELEASE_THREAD_MEM_TRACKER_BY_HOOK_WITH_FN([](void* p) { return jemalloc_usable_size(p); }, p);
jefree(p);
}

Expand All @@ -60,87 +62,101 @@ void* doris_realloc(void* p, size_t size) __THROW {

#if USE_MEM_TRACKER
int64_t old_size = jemalloc_usable_size(p);
#endif

CONSUME_MEM_TRACKER(jenallocx(size, 0) - old_size);
CONSUME_THREAD_MEM_TRACKER_BY_HOOK_WITH_FN(
[](size_t size, int64_t old_size) { return jenallocx(size, 0) - old_size; }, size,
old_size);
void* ptr = jerealloc(p, size);
if (UNLIKELY(ptr == nullptr)) {
RELEASE_MEM_TRACKER(jenallocx(size, 0) - old_size);
RELEASE_THREAD_MEM_TRACKER_BY_HOOK_WITH_FN(
[](size_t size, int64_t old_size) { return jenallocx(size, 0) - old_size; }, size,
old_size);
}
return ptr;
#else
void* ptr = jerealloc(p, size);
return ptr;
#endif
}

void* doris_calloc(size_t n, size_t size) __THROW {
if (UNLIKELY(size == 0)) {
return nullptr;
}

CONSUME_MEM_TRACKER(n * size);
CONSUME_THREAD_MEM_TRACKER_BY_HOOK(n * size);
void* ptr = jecalloc(n, size);
if (UNLIKELY(ptr == nullptr)) {
RELEASE_MEM_TRACKER(n * size);
RELEASE_THREAD_MEM_TRACKER_BY_HOOK(n * size);
} else {
CONSUME_MEM_TRACKER(jemalloc_usable_size(ptr) - n * size);
CONSUME_THREAD_MEM_TRACKER_BY_HOOK_WITH_FN(
[](void* ptr, size_t size) { return jemalloc_usable_size(ptr) - size; }, ptr,
n * size);
}
return ptr;
}

void doris_cfree(void* ptr) __THROW {
RELEASE_MEM_TRACKER(jemalloc_usable_size(ptr));
RELEASE_THREAD_MEM_TRACKER_BY_HOOK_WITH_FN([](void* ptr) { return jemalloc_usable_size(ptr); },
ptr);
jefree(ptr);
}

void* doris_memalign(size_t align, size_t size) __THROW {
CONSUME_MEM_TRACKER(size);
CONSUME_THREAD_MEM_TRACKER_BY_HOOK(size);
void* ptr = jealigned_alloc(align, size);
if (UNLIKELY(ptr == nullptr)) {
RELEASE_MEM_TRACKER(size);
RELEASE_THREAD_MEM_TRACKER_BY_HOOK(size);
} else {
CONSUME_MEM_TRACKER(jemalloc_usable_size(ptr) - size);
CONSUME_THREAD_MEM_TRACKER_BY_HOOK_WITH_FN(
[](void* ptr, size_t size) { return jemalloc_usable_size(ptr) - size; }, ptr, size);
}
return ptr;
}

void* doris_aligned_alloc(size_t align, size_t size) __THROW {
CONSUME_MEM_TRACKER(size);
CONSUME_THREAD_MEM_TRACKER_BY_HOOK(size);
void* ptr = jealigned_alloc(align, size);
if (UNLIKELY(ptr == nullptr)) {
RELEASE_MEM_TRACKER(size);
RELEASE_THREAD_MEM_TRACKER_BY_HOOK(size);
} else {
CONSUME_MEM_TRACKER(jemalloc_usable_size(ptr) - size);
CONSUME_THREAD_MEM_TRACKER_BY_HOOK_WITH_FN(
[](void* ptr, size_t size) { return jemalloc_usable_size(ptr) - size; }, ptr, size);
}
return ptr;
}

void* doris_valloc(size_t size) __THROW {
CONSUME_MEM_TRACKER(size);
CONSUME_THREAD_MEM_TRACKER_BY_HOOK(size);
void* ptr = jevalloc(size);
if (UNLIKELY(ptr == nullptr)) {
RELEASE_MEM_TRACKER(size);
RELEASE_THREAD_MEM_TRACKER_BY_HOOK(size);
} else {
CONSUME_MEM_TRACKER(jemalloc_usable_size(ptr) - size);
CONSUME_THREAD_MEM_TRACKER_BY_HOOK_WITH_FN(
[](void* ptr, size_t size) { return jemalloc_usable_size(ptr) - size; }, ptr, size);
}
return ptr;
}

void* doris_pvalloc(size_t size) __THROW {
CONSUME_MEM_TRACKER(size);
CONSUME_THREAD_MEM_TRACKER_BY_HOOK(size);
void* ptr = jevalloc(size);
if (UNLIKELY(ptr == nullptr)) {
RELEASE_MEM_TRACKER(size);
RELEASE_THREAD_MEM_TRACKER_BY_HOOK(size);
} else {
CONSUME_MEM_TRACKER(jemalloc_usable_size(ptr) - size);
CONSUME_THREAD_MEM_TRACKER_BY_HOOK_WITH_FN(
[](void* ptr, size_t size) { return jemalloc_usable_size(ptr) - size; }, ptr, size);
}
return ptr;
}

int doris_posix_memalign(void** r, size_t align, size_t size) __THROW {
CONSUME_MEM_TRACKER(size);
CONSUME_THREAD_MEM_TRACKER_BY_HOOK(size);
int ret = jeposix_memalign(r, align, size);
if (UNLIKELY(ret != 0)) {
RELEASE_MEM_TRACKER(size);
RELEASE_THREAD_MEM_TRACKER_BY_HOOK(size);
} else {
CONSUME_MEM_TRACKER(jemalloc_usable_size(*r) - size);
CONSUME_THREAD_MEM_TRACKER_BY_HOOK_WITH_FN(
[](void* ptr, size_t size) { return jemalloc_usable_size(ptr) - size; }, *r, size);
}
return ret;
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/memory/mem_tracker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ void MemTracker::bind_parent(MemTrackerLimiter* parent) {
if (parent) {
_parent_label = parent->label();
_parent_group_num = parent->group_num();
} else if (thread_context_ptr.init) {
} else if (is_thread_context_init()) {
_parent_label = thread_context()->thread_mem_tracker()->label();
_parent_group_num = thread_context()->thread_mem_tracker()->group_num();
}
Expand Down
28 changes: 22 additions & 6 deletions be/src/runtime/memory/mem_tracker_limiter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ void MemTrackerLimiter::refresh_global_counter() {

void MemTrackerLimiter::make_process_snapshots(std::vector<MemTracker::Snapshot>* snapshots) {
MemTrackerLimiter::refresh_global_counter();
int64_t process_mem_sum = 0;
int64_t all_tracker_mem_sum = 0;
Snapshot snapshot;
for (auto it : MemTrackerLimiter::TypeMemSum) {
snapshot.type = type_string(it.first);
Expand All @@ -145,7 +145,7 @@ void MemTrackerLimiter::make_process_snapshots(std::vector<MemTracker::Snapshot>
snapshot.cur_consumption = it.second->current_value();
snapshot.peak_consumption = it.second->peak_value();
(*snapshots).emplace_back(snapshot);
process_mem_sum += it.second->current_value();
all_tracker_mem_sum += it.second->current_value();
}

snapshot.type = "tc/jemalloc_free_memory";
Expand All @@ -154,14 +154,28 @@ void MemTrackerLimiter::make_process_snapshots(std::vector<MemTracker::Snapshot>
snapshot.cur_consumption = MemInfo::allocator_cache_mem();
snapshot.peak_consumption = -1;
(*snapshots).emplace_back(snapshot);
process_mem_sum += MemInfo::allocator_cache_mem();
all_tracker_mem_sum += MemInfo::allocator_cache_mem();

snapshot.type = "process";
snapshot.type = "all_tracker_sum (is virtual memory)";
snapshot.label = "";
snapshot.limit = -1;
snapshot.cur_consumption = process_mem_sum;
snapshot.cur_consumption = all_tracker_mem_sum;
snapshot.peak_consumption = -1;
(*snapshots).emplace_back(snapshot);

snapshot.type = "process resident memory (from /proc VmRSS VmHWM)";
snapshot.label = "";
snapshot.limit = -1;
snapshot.cur_consumption = PerfCounters::get_vm_rss();
snapshot.peak_consumption = PerfCounters::get_vm_hwm();
(*snapshots).emplace_back(snapshot);

snapshot.type = "process virtual memory (from /proc VmSize VmPeak)";
snapshot.label = "";
snapshot.limit = -1;
snapshot.cur_consumption = PerfCounters::get_vm_size();
snapshot.peak_consumption = PerfCounters::get_vm_peak();
(*snapshots).emplace_back(snapshot);
}

void MemTrackerLimiter::make_type_snapshots(std::vector<MemTracker::Snapshot>* snapshots,
Expand Down Expand Up @@ -343,7 +357,9 @@ std::string MemTrackerLimiter::tracker_limit_exceeded_str() {
err_msg += fmt::format(
" exec node:<{}>, can `set exec_mem_limit=8G` to change limit, details see "
"be.INFO.",
doris::thread_context()->thread_mem_tracker_mgr->last_consumer_tracker());
doris::is_thread_context_init()
? doris::thread_context()->thread_mem_tracker_mgr->last_consumer_tracker()
: "");
} else if (_type == Type::SCHEMA_CHANGE) {
err_msg += fmt::format(
" can modify `memory_limitation_per_thread_for_schema_change_bytes` in be.conf to "
Expand Down
3 changes: 3 additions & 0 deletions be/src/runtime/memory/mem_tracker_limiter.h
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,9 @@ class MemTrackerLimiter final : public MemTracker {

// Transfer 'bytes' of consumption from this tracker to 'dst'.
void transfer_to(int64_t size, MemTrackerLimiter* dst) {
if (label() == dst->label()) {
return;
}
cache_consume(-size);
dst->cache_consume(size);
}
Expand Down
4 changes: 2 additions & 2 deletions be/src/runtime/memory/tcmalloc_hook.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,11 @@
// destructor to control the behavior of consume can lead to unexpected behavior,
// like this: if (LIKELY(doris::start_thread_mem_tracker)) {
void new_hook(const void* ptr, size_t size) {
CONSUME_MEM_TRACKER(tc_nallocx(size, 0));
CONSUME_THREAD_MEM_TRACKER_BY_HOOK(tc_nallocx(size, 0));
}

void delete_hook(const void* ptr) {
RELEASE_MEM_TRACKER(tc_malloc_size(const_cast<void*>(ptr)));
RELEASE_THREAD_MEM_TRACKER_BY_HOOK(tc_malloc_size(const_cast<void*>(ptr)));
}

void init_hook() {
Expand Down
16 changes: 9 additions & 7 deletions be/src/runtime/memory/thread_mem_tracker_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,13 @@ namespace doris {
// Memory Hook is counted in the memory tracker of the current thread.
class ThreadMemTrackerMgr {
public:
ThreadMemTrackerMgr() {}
ThreadMemTrackerMgr() = default;

~ThreadMemTrackerMgr() {
// if _init == false, exec env is not initialized when init(). and never consumed mem tracker once.
if (_init) flush_untracked_mem();
if (_init) {
flush_untracked_mem();
}
}

bool init();
Expand Down Expand Up @@ -77,7 +79,7 @@ class ThreadMemTrackerMgr {
// such as calling LOG/iostream/sstream/stringstream/etc. related methods,
// must increase the control to avoid entering infinite recursion, otherwise it may cause crash or stuck,
// Returns whether the memory exceeds limit, and will consume mem trcker no matter whether the limit is exceeded.
void consume(int64_t size, bool large_memory_check = false);
void consume(int64_t size, int skip_large_memory_check = 0);
void flush_untracked_mem();

bool is_attach_query() { return _fragment_instance_id != TUniqueId(); }
Expand All @@ -92,7 +94,7 @@ class ThreadMemTrackerMgr {
}

void disable_wait_gc() { _wait_gc = false; }
bool wait_gc() { return _wait_gc; }
[[nodiscard]] bool wait_gc() const { return _wait_gc; }
void cancel_instance(const std::string& exceed_msg);

std::string print_debug_string() {
Expand Down Expand Up @@ -161,9 +163,9 @@ inline void ThreadMemTrackerMgr::pop_consumer_tracker() {
_consumer_tracker_stack.pop_back();
}

inline void ThreadMemTrackerMgr::consume(int64_t size, bool large_memory_check) {
inline void ThreadMemTrackerMgr::consume(int64_t size, int skip_large_memory_check) {
_untracked_mem += size;
if (!ExecEnv::ready()) {
if (!_init && !ExecEnv::ready()) {
return;
}
// When some threads `0 < _untracked_mem < config::mem_tracker_consume_min_size_bytes`
Expand All @@ -176,7 +178,7 @@ inline void ThreadMemTrackerMgr::consume(int64_t size, bool large_memory_check)
flush_untracked_mem();
}

if (large_memory_check && doris::config::large_memory_check_bytes > 0 &&
if (skip_large_memory_check == 0 && doris::config::large_memory_check_bytes > 0 &&
size > doris::config::large_memory_check_bytes) {
_stop_consume = true;
LOG(WARNING) << fmt::format(
Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/runtime_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ Status RuntimeState::check_query_state(const std::string& msg) {
//
// If the thread MemTrackerLimiter exceeds the limit, an error status is returned.
// Usually used after SCOPED_ATTACH_TASK, during query execution.
if (thread_context()->thread_mem_tracker()->limit_exceeded() &&
if (is_thread_context_init() && thread_context()->thread_mem_tracker()->limit_exceeded() &&
!config::enable_query_memory_overcommit) {
auto failed_msg =
fmt::format("{}, {}", msg,
Expand Down
Loading

0 comments on commit af7e273

Please sign in to comment.