Skip to content

Commit

Permalink
feat(server): WIP memory fragmentation WIP, high level flow only drag…
Browse files Browse the repository at this point in the history
…onflydb#448

Signed-off-by: Boaz Sade <boaz@dragonflydb.io>
  • Loading branch information
boazsade committed Nov 22, 2022
1 parent da03cd8 commit 78bcef2
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 0 deletions.
89 changes: 89 additions & 0 deletions src/server/engine_shard_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@ extern "C" {

#include "base/flags.h"
#include "base/logging.h"
#include "base/proc_util.h"
#include "server/blocking_controller.h"
#include "server/server_state.h"
#include "server/tiered_storage.h"
#include "server/transaction.h"
#include "strings/human_readable.h"
#include "util/fiber_sched_algo.h"
#include "util/varz.h"

Expand All @@ -30,6 +32,17 @@ ABSL_FLAG(bool, cache_mode, false,
"If true, the backend behaves like a cache, "
"by evicting entries when getting close to maxmemory limit");

// memory de-fragmentation related flags
ABSL_FLAG(uint64_t, mem_defrag_threshold, 20,
"Threshold level to run memory fragmentation between total available memory and "
"currently commited memory");

ABSL_FLAG(float, commit_use_threshold, 1.3,
"The threshold to apply memory fragmentation between commited and used memory");

ABSL_FLAG(float, mem_utilization_threshold, 0.5,
"Threshold for memory page under utilization. Bellow this threshold a pointer in this "
"page will be moved in memory");
namespace dfly {

using namespace util;
Expand All @@ -39,8 +52,45 @@ using absl::GetFlag;

namespace {

constexpr DbIndex DEFAULT_DB_INDEX = 0;

vector<EngineShardSet::CachedStats> cached_stats; // initialized in EngineShardSet::Init

inline bool MemUsageChanged(uint64_t current, uint64_t now) {
static const float commit_use_threshold = GetFlag(FLAGS_commit_use_threshold);
// we only care whether the memory usage is going up
// otherwise we don't really case
return (current * commit_use_threshold) < float(now);
}

// This function checks 3 things:
// 1. Don't try memory fragmentation if we don't use "enough" memory (control by
// mem_defrag_threshold flag)
// 2. That we had change in memory usage - to prevent endless loop
// 3. in case the above is OK, make sure that we have a "gap" between usage and commited memory
// (control by commit_use_threshold flag)
bool IsMemDefragRequired(uint64_t* current_commit, uint64_t* current_use) {
static const uint64_t mem_size = max_memory_limit;
static const uint64_t threshold_mem = mem_size / GetFlag(FLAGS_mem_defrag_threshold);
static const float commit_use_threshold = GetFlag(FLAGS_commit_use_threshold);

uint64_t commited = GetMallocCurrentCommitted();
uint64_t mem_in_use = used_mem_current.load(memory_order_relaxed);

// we want to make sure that we are not running this to many times - i.e.
// if there was no change to the memory, don't run this
if (MemUsageChanged(*current_commit, commited) || MemUsageChanged(*current_use, mem_in_use)) {
if (threshold_mem < commited && mem_in_use != 0 &&
(float(mem_in_use * commit_use_threshold) < commited)) {
// we have way more commited then actual usage
*current_commit = commited;
*current_use = mem_in_use;
return true;
}
}
return false;
}

} // namespace

constexpr size_t kQueueLen = 64;
Expand All @@ -56,6 +106,41 @@ EngineShard::Stats& EngineShard::Stats::operator+=(const EngineShard::Stats& o)
return *this;
}

// for now this does nothing
uint64_t EngineShard::DoDefragIfRequired(uint64_t cursor) {
// Placeholder for now!
return cursor;
}

// the memory de-fragmenting task is as follow:
// 1. Check if memory usage is high enough
// 2. Check if diff between commited and used memory is high enough
// 3. Check if we have memory changes (to ensure that we not running endlessly).
// 4. if all the above pass -> run on the shard and try to de-fragment memory by re-allocating
// values
// if the cursor for this is signal that we are not done, schedule the task to run at high
// priority otherwise lower the task priority so that it would not use the CPU when not required
void EngineShard::StartDefragTask(util::ProactorBase* pb) {
uint64_t commited = GetMallocCurrentCommitted();
uint64_t mem_in_use = used_mem_current.load(memory_order_relaxed);

defrag_task_ = pb->AddOnIdleTask([this, commited, mem_in_use, cursor = 0ul]() mutable {
const auto shard_id = db_slice().shard_id();
bool required_state = IsMemDefragRequired(&commited, &mem_in_use);
if (required_state) {
VLOG(1) << shard_id << ": need to update memory - commited memory "
<< strings::HumanReadableNumBytes(commited) << ", the use memory is "
<< strings::HumanReadableNumBytes(mem_in_use);
cursor = DoDefragIfRequired(cursor); // we are only checking for DB == 0
if (cursor > 0) {
return util::ProactorBase::kOnIdleMaxLevel; // we need to run more of this
}
}
// by default we just want to not get in the way..
return 0u;
});
}

EngineShard::EngineShard(util::ProactorBase* pb, bool update_db_time, mi_heap_t* heap)
: queue_(kQueueLen), txq_([](const Transaction* t) { return t->txid(); }), mi_resource_(heap),
db_slice_(pb->GetIndex(), GetFlag(FLAGS_cache_mode), this) {
Expand All @@ -75,6 +160,8 @@ EngineShard::EngineShard(util::ProactorBase* pb, bool update_db_time, mi_heap_t*
tmp_str1 = sdsempty();

db_slice_.UpdateExpireBase(absl::GetCurrentTimeNanos() / 1000000, 0);

StartDefragTask(pb);
}

EngineShard::~EngineShard() {
Expand All @@ -92,6 +179,8 @@ void EngineShard::Shutdown() {
if (periodic_task_) {
ProactorBase::me()->CancelPeriodic(periodic_task_);
}

ProactorBase::me()->RemoveOnIdleTask(defrag_task_);
}

void EngineShard::InitThreadLocal(ProactorBase* pb, bool update_db_time) {
Expand Down
11 changes: 11 additions & 0 deletions src/server/engine_shard_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,16 @@ class EngineShard {

void CacheStats();

// We are running a task that checks whether we need to
// do memory de-fragmentation here, this task only run
// when there are available CPU time.
void StartDefragTask(util::ProactorBase* pb);

// scan the shard with the cursor and apply
// de-fragmentation option for entries. This function will return the new cursor at the end of the
// scan This function is called from context of StartDefragTask
uint64_t DoDefragIfRequired(uint64_t cursor);

::util::fibers_ext::FiberQueue queue_;
::boost::fibers::fiber fiber_q_;

Expand All @@ -170,6 +180,7 @@ class EngineShard {
IntentLock shard_lock_;

uint32_t periodic_task_ = 0;
uint32_t defrag_task_ = 0;
std::unique_ptr<TieredStorage> tiered_storage_;
std::unique_ptr<BlockingController> blocking_controller_;

Expand Down

0 comments on commit 78bcef2

Please sign in to comment.