From b6a788e99f493ceef0aba821ad9869ce78bbcc5e Mon Sep 17 00:00:00 2001 From: Boaz Sade Date: Tue, 22 Nov 2022 08:21:00 +0200 Subject: [PATCH] feat(server): WIP memory fragmentation WIP, high level flow only #448 Signed-off-by: Boaz Sade --- src/server/engine_shard_set.cc | 95 ++++++++++++++++++++++++++++++++++ src/server/engine_shard_set.h | 25 +++++++++ 2 files changed, 120 insertions(+) diff --git a/src/server/engine_shard_set.cc b/src/server/engine_shard_set.cc index a98a219c07a5..3aa18ddf9ff2 100644 --- a/src/server/engine_shard_set.cc +++ b/src/server/engine_shard_set.cc @@ -11,10 +11,12 @@ extern "C" { #include "base/flags.h" #include "base/logging.h" +#include "base/proc_util.h" #include "server/blocking_controller.h" #include "server/server_state.h" #include "server/tiered_storage.h" #include "server/transaction.h" +#include "strings/human_readable.h" #include "util/fiber_sched_algo.h" #include "util/varz.h" @@ -30,6 +32,17 @@ ABSL_FLAG(bool, cache_mode, false, "If true, the backend behaves like a cache, " "by evicting entries when getting close to maxmemory limit"); +// memory de-fragmentation related flags +ABSL_FLAG(uint64_t, mem_defrag_threshold, 20, + "Threshold level to run memory fragmentation between total available memory and " + "currently commited memory"); + +ABSL_FLAG(float, commit_use_threshold, 1.3, + "The threshold to apply memory fragmentation between commited and used memory"); + +ABSL_FLAG(float, mem_utilization_threshold, 0.5, + "Threshold for memory page under utilization. 
Bellow this threshold a pointer in this " + "page will be moved in memory"); namespace dfly { using namespace util; @@ -39,8 +52,19 @@ using absl::GetFlag; namespace { +constexpr DbIndex DEFAULT_DB_INDEX = 0; + vector cached_stats; // initialized in EngineShardSet::Init +inline bool MemUsageChanged(uint64_t current, uint64_t now) { + // TODO: remove commented out code + // static const float commit_use_threshold = GetFlag(FLAGS_commit_use_threshold); + // we only care whether the memory usage is going up + // otherwise we don't really care + return true; // no op for now + // return (current * commit_use_threshold) < float(now); +} + } // namespace constexpr size_t kQueueLen = 64; @@ -56,6 +80,72 @@ EngineShard::Stats& EngineShard::Stats::operator+=(const EngineShard::Stats& o) return *this; } +// This function checks 3 things: +// 1. Don't try memory de-fragmentation if we don't use "enough" memory (controlled by +// mem_defrag_threshold flag) +// 2. That we had a change in memory usage - to prevent endless loop +// 3. in case the above is OK, make sure that we have a "gap" between usage and committed memory +// (controlled by commit_use_threshold flag) +bool EngineShard::DefragTaskArgs::IsRequired() { + static const uint64_t mem_size = max_memory_limit; + static const uint64_t threshold_mem = mem_size / GetFlag(FLAGS_mem_defrag_threshold); + static const float commit_use_threshold = GetFlag(FLAGS_commit_use_threshold); + + uint64_t commited = GetMallocCurrentCommitted(); + uint64_t mem_in_use = used_mem_current.load(memory_order_relaxed); + + // we want to make sure that we are not running this too many times - i.e. 
+ // if there was no change to the memory, don't run this + if (MemUsageChanged(current_mem_commit, commited) || + MemUsageChanged(current_mem_used, mem_in_use)) { + if (threshold_mem < commited && mem_in_use != 0 && + (float(mem_in_use * commit_use_threshold) < commited)) { + // we have way more committed than actual usage + current_mem_commit = commited; + current_mem_used = mem_in_use; + return true; + } + } + return false; +} + +// for now this does no work: it only reports whether a scan is pending (cursor > 0) +bool EngineShard::DoDefrag() { + // TODO: impl! + // Placeholder for now! + return defrag_args_.cursor > 0; +} + +void EngineShard::DefragTaskArgs::Init() { + current_mem_commit = GetMallocCurrentCommitted(); + current_mem_used = used_mem_current.load(memory_order_relaxed); + cursor = 0u; +} + +// the memory de-fragmenting task is as follows: +// 1. Check if memory usage is high enough +// 2. Check if diff between committed and used memory is high enough +// 3. Check if we have memory changes (to ensure that we are not running endlessly). +// 4. if all the above pass -> run on the shard and try to de-fragment memory by re-allocating +// values +// if the cursor signals that a scan is not done, resume it (skipping checks 1-3) and schedule the +// task at high priority; otherwise lower its priority so that it does not use the CPU needlessly +uint32_t EngineShard::DefragTask() { + const auto shard_id = db_slice().shard_id(); + bool required_state = (defrag_args_.cursor > 0) || defrag_args_.IsRequired(); + if (required_state) { + VLOG(1) << shard_id << ": need to update memory - commited memory " + << strings::HumanReadableNumBytes(defrag_args_.current_mem_commit) + << ", the use memory is " + << strings::HumanReadableNumBytes(defrag_args_.current_mem_used); + if (DoDefrag()) { // we didn't finish the scan + return util::ProactorBase::kOnIdleMaxLevel; + } + } + // by default we just want to not get in the way.. 
+ return 0u; +} + EngineShard::EngineShard(util::ProactorBase* pb, bool update_db_time, mi_heap_t* heap) : queue_(kQueueLen), txq_([](const Transaction* t) { return t->txid(); }), mi_resource_(heap), db_slice_(pb->GetIndex(), GetFlag(FLAGS_cache_mode), this) { @@ -75,6 +165,9 @@ EngineShard::EngineShard(util::ProactorBase* pb, bool update_db_time, mi_heap_t* tmp_str1 = sdsempty(); db_slice_.UpdateExpireBase(absl::GetCurrentTimeNanos() / 1000000, 0); + // start the de-fragmentation task here + defrag_args_.Init(); + defrag_task_ = pb->AddOnIdleTask([this]() { return this->DefragTask(); }); } EngineShard::~EngineShard() { @@ -92,6 +185,8 @@ void EngineShard::Shutdown() { if (periodic_task_) { ProactorBase::me()->CancelPeriodic(periodic_task_); } + + ProactorBase::me()->RemoveOnIdleTask(defrag_task_); } void EngineShard::InitThreadLocal(ProactorBase* pb, bool update_db_time) { diff --git a/src/server/engine_shard_set.h b/src/server/engine_shard_set.h index 9c82c8f73591..5f67939019b4 100644 --- a/src/server/engine_shard_set.h +++ b/src/server/engine_shard_set.h @@ -144,6 +144,18 @@ class EngineShard { void TEST_EnableHeartbeat(); private: + struct DefragTaskArgs { + uint64_t current_mem_commit = 0u; + uint64_t current_mem_used = 0u; + uint64_t cursor = 0u; + + void Init(); + + // check the current threshold and return true if + // we need to do the de-fragmentation + bool IsRequired(); + }; + EngineShard(util::ProactorBase* pb, bool update_db_time, mi_heap_t* heap); // blocks the calling fiber. @@ -153,6 +165,17 @@ class EngineShard { void CacheStats(); + // We are running a task that checks whether we need to + // do memory de-fragmentation here, this task only runs + // when there is available CPU time. + uint32_t DefragTask(); + + // scan the shard with the cursor and apply + // de-fragmentation option for entries. 
The scan position is the cursor stored in defrag_args_. + // This function is called from the context of DefragTask. + // Returns true if we did not complete the shard scan. + bool DoDefrag(); + ::util::fibers_ext::FiberQueue queue_; ::boost::fibers::fiber fiber_q_; @@ -170,6 +193,8 @@ class EngineShard { IntentLock shard_lock_; uint32_t periodic_task_ = 0; + uint32_t defrag_task_ = 0; + DefragTaskArgs defrag_args_; std::unique_ptr tiered_storage_; std::unique_ptr blocking_controller_;