Skip to content

Commit

Permalink
feat(server): active derag task WIP for high level flow #448
Browse files Browse the repository at this point in the history
Signed-off-by: Boaz Sade <boaz@dragonflydb.io>
  • Loading branch information
boazsade committed Nov 22, 2022
1 parent da03cd8 commit 59597f3
Show file tree
Hide file tree
Showing 2 changed files with 120 additions and 0 deletions.
95 changes: 95 additions & 0 deletions src/server/engine_shard_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@ extern "C" {

#include "base/flags.h"
#include "base/logging.h"
#include "base/proc_util.h"
#include "server/blocking_controller.h"
#include "server/server_state.h"
#include "server/tiered_storage.h"
#include "server/transaction.h"
#include "strings/human_readable.h"
#include "util/fiber_sched_algo.h"
#include "util/varz.h"

Expand All @@ -30,6 +32,17 @@ ABSL_FLAG(bool, cache_mode, false,
"If true, the backend behaves like a cache, "
"by evicting entries when getting close to maxmemory limit");

// memory de-fragmentation related flags
ABSL_FLAG(float, mem_defrag_threshold, 20.0,
"Threshold level to run memory fragmentation between total available memory and "
"currently commited memory");

ABSL_FLAG(float, commit_use_threshold, 1.3,
"The threshold to apply memory fragmentation between commited and used memory");

ABSL_FLAG(float, mem_utilization_threshold, 0.5,
"Threshold for memory page under utilization. Bellow this threshold a pointer in this "
"page will be moved in memory");
namespace dfly {

using namespace util;
Expand All @@ -39,8 +52,19 @@ using absl::GetFlag;

namespace {

constexpr DbIndex DEFAULT_DB_INDEX = 0;

vector<EngineShardSet::CachedStats> cached_stats; // initialized in EngineShardSet::Init

inline bool MemUsageChanged(uint64_t current, uint64_t now) {
// TODO: remove commented out code
// static const float commit_use_threshold = GetFlag(FLAGS_commit_use_threshold);
// we only care whether the memory usage is going up
// otherwise we don't really case
return true; // no op for now
// return (current * commit_use_threshold) < float(now);
}

} // namespace

constexpr size_t kQueueLen = 64;
Expand All @@ -56,6 +80,72 @@ EngineShard::Stats& EngineShard::Stats::operator+=(const EngineShard::Stats& o)
return *this;
}

// This function checks 3 things:
// 1. Don't try memory fragmentation if we don't use "enough" memory (control by
// mem_defrag_threshold flag)
// 2. That we had change in memory usage - to prevent endless loop
// 3. in case the above is OK, make sure that we have a "gap" between usage and commited memory
// (control by commit_use_threshold flag)
bool EngineShard::DefragTaskArgs::IsRequired() {
static const uint64_t mem_size = max_memory_limit;
static const float threshold_mem = mem_size / GetFlag(FLAGS_mem_defrag_threshold);
static const float commit_use_threshold = GetFlag(FLAGS_commit_use_threshold);

uint64_t commited = GetMallocCurrentCommitted();
uint64_t mem_in_use = used_mem_current.load(memory_order_relaxed);

// we want to make sure that we are not running this to many times - i.e.
// if there was no change to the memory, don't run this
if (MemUsageChanged(current_mem_commit, commited) ||
MemUsageChanged(current_mem_used, mem_in_use)) {
if (threshold_mem < commited && mem_in_use != 0 &&
(float(mem_in_use * commit_use_threshold) < commited)) {
// we have way more commited then actual usage
current_mem_commit = commited;
current_mem_used = mem_in_use;
return true;
}
}
return false;
}

// for now this does nothing
bool EngineShard::DoDefrag() {
// TODO: impl!
// Placeholder for now!
return defrag_args_.cursor > 0;
}

void EngineShard::DefragTaskArgs::Init() {
current_mem_commit = GetMallocCurrentCommitted();
current_mem_used = used_mem_current.load(memory_order_relaxed);
cursor = 0u;
}

// the memory de-fragmenting task is as follow:
// 1. Check if memory usage is high enough
// 2. Check if diff between commited and used memory is high enough
// 3. Check if we have memory changes (to ensure that we not running endlessly).
// 4. if all the above pass -> run on the shard and try to de-fragment memory by re-allocating
// values
// if the cursor for this is signal that we are not done, schedule the task to run at high
// priority otherwise lower the task priority so that it would not use the CPU when not required
uint32_t EngineShard::DefragTask() {
const auto shard_id = db_slice().shard_id();
bool required_state = (defrag_args_.cursor == 0) && defrag_args_.IsRequired();
if (required_state) {
VLOG(1) << shard_id << ": need to update memory - commited memory "
<< strings::HumanReadableNumBytes(defrag_args_.current_mem_commit)
<< ", the use memory is "
<< strings::HumanReadableNumBytes(defrag_args_.current_mem_used);
if (DoDefrag()) { // we didn't finish the scan
return util::ProactorBase::kOnIdleMaxLevel;
}
}
// by default we just want to not get in the way..
return 0u;
}

EngineShard::EngineShard(util::ProactorBase* pb, bool update_db_time, mi_heap_t* heap)
: queue_(kQueueLen), txq_([](const Transaction* t) { return t->txid(); }), mi_resource_(heap),
db_slice_(pb->GetIndex(), GetFlag(FLAGS_cache_mode), this) {
Expand All @@ -75,6 +165,9 @@ EngineShard::EngineShard(util::ProactorBase* pb, bool update_db_time, mi_heap_t*
tmp_str1 = sdsempty();

db_slice_.UpdateExpireBase(absl::GetCurrentTimeNanos() / 1000000, 0);
// start the de-fermentation task here
defrag_args_.Init();
defrag_task_ = pb->AddOnIdleTask([this]() { return this->DefragTask(); });
}

EngineShard::~EngineShard() {
Expand All @@ -92,6 +185,8 @@ void EngineShard::Shutdown() {
if (periodic_task_) {
ProactorBase::me()->CancelPeriodic(periodic_task_);
}

ProactorBase::me()->RemoveOnIdleTask(defrag_task_);
}

void EngineShard::InitThreadLocal(ProactorBase* pb, bool update_db_time) {
Expand Down
25 changes: 25 additions & 0 deletions src/server/engine_shard_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,18 @@ class EngineShard {
void TEST_EnableHeartbeat();

private:
struct DefragTaskArgs {
uint64_t current_mem_commit = 0u;
uint64_t current_mem_used = 0u;
uint64_t cursor = 0u;

void Init();

// check the current threshold and return true if
// we need to do the de-fermentation
bool IsRequired();
};

EngineShard(util::ProactorBase* pb, bool update_db_time, mi_heap_t* heap);

// blocks the calling fiber.
Expand All @@ -153,6 +165,17 @@ class EngineShard {

void CacheStats();

// We are running a task that checks whether we need to
// do memory de-fragmentation here, this task only run
// when there are available CPU time.
uint32_t DefragTask();

// scan the shard with the cursor and apply
// de-fragmentation option for entries. This function will return the new cursor at the end of the
// scan This function is called from context of StartDefragTask
// return true if we did not complete the shard scan
bool DoDefrag();

::util::fibers_ext::FiberQueue queue_;
::boost::fibers::fiber fiber_q_;

Expand All @@ -170,6 +193,8 @@ class EngineShard {
IntentLock shard_lock_;

uint32_t periodic_task_ = 0;
uint32_t defrag_task_ = 0;
DefragTaskArgs defrag_args_;
std::unique_ptr<TieredStorage> tiered_storage_;
std::unique_ptr<BlockingController> blocking_controller_;

Expand Down

0 comments on commit 59597f3

Please sign in to comment.