Skip to content

Commit

Permalink
feat(server): memory defrag support - unit tests added to verify #448 (
Browse files Browse the repository at this point in the history
…#523)

feat(server): active memory defrag task #448

Signed-off-by: Boaz Sade <boaz@dragonflydb.io>
  • Loading branch information
boazsade authored Dec 4, 2022
1 parent 8bff194 commit 6ec4cf0
Show file tree
Hide file tree
Showing 9 changed files with 175 additions and 37 deletions.
48 changes: 44 additions & 4 deletions src/core/compact_object.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ using absl::GetFlag;
namespace {

constexpr XXH64_hash_t kHashSeed = 24061983;
constexpr size_t kAlignSize = 8u;

// Approximation since does not account for listpacks.
size_t QlMAllocSize(quicklist* ql) {
Expand Down Expand Up @@ -217,7 +218,7 @@ struct TL {

thread_local TL tl;

constexpr bool kUseSmallStrings = true;
constexpr bool kUseSmallStrings = false;

/// TODO: Ascii encoding becomes slow for large blobs. We should factor it out into a separate
/// file and implement with SIMD instructions.
Expand Down Expand Up @@ -380,6 +381,23 @@ void RobjWrapper::SetString(string_view s, pmr::memory_resource* mr) {
}
}

bool RobjWrapper::DefragIfNeeded(float ratio) {
if (type() == OBJ_STRING) { // only applicable to strings
if (zmalloc_page_is_underutilized(inner_obj(), ratio)) {
return Reallocate(tl.local_mr);
}
}
return false;
}

bool RobjWrapper::Reallocate(std::pmr::memory_resource* mr) {
void* old_ptr = inner_obj_;
inner_obj_ = mr->allocate(sz_, kAlignSize);
memcpy(inner_obj_, old_ptr, sz_);
mr->deallocate(old_ptr, 0, kAlignSize);
return true;
}

void RobjWrapper::Init(unsigned type, unsigned encoding, void* inner) {
type_ = type;
encoding_ = encoding;
Expand All @@ -398,13 +416,13 @@ void RobjWrapper::MakeInnerRoom(size_t current_cap, size_t desired, pmr::memory_
desired += SDS_MAX_PREALLOC;
}

void* newp = mr->allocate(desired, 8);
void* newp = mr->allocate(desired, kAlignSize);
if (sz_) {
memcpy(newp, inner_obj_, sz_);
}

if (current_cap) {
mr->deallocate(inner_obj_, current_cap, 8);
mr->deallocate(inner_obj_, current_cap, kAlignSize);
}
inner_obj_ = newp;
}
Expand Down Expand Up @@ -659,7 +677,7 @@ robj* CompactObj::AsRObj() const {
res->type = u_.r_obj.type();

if (res->type == OBJ_SET) {
LOG(FATAL) << "Should not call AsRObj for type " << res->type;
LOG(FATAL) << "Should not call AsRObj for type " << res->type;
}

if (res->type == OBJ_HASH) {
Expand Down Expand Up @@ -850,6 +868,28 @@ string_view CompactObj::GetSlice(string* scratch) const {
return string_view{};
}

bool CompactObj::DefragIfNeeded(float ratio) {
switch (taglen_) {
case ROBJ_TAG:
// currently only these objet types are supported for this operation
if (u_.r_obj.inner_obj() != nullptr) {
return u_.r_obj.DefragIfNeeded(ratio);
}
return false;
case SMALL_TAG:
// TODO - support this later
return false;
case INT_TAG:
// this is not relevant in this case
return false;
case EXTERNAL_TAG:
return false;
default:
// This is the case when the object is at inline_str
return false;
}
}

bool CompactObj::HasAllocated() const {
if (IsRef() || taglen_ == INT_TAG || IsInline() || taglen_ == EXTERNAL_TAG ||
(taglen_ == ROBJ_TAG && u_.r_obj.inner_obj() == nullptr))
Expand Down
7 changes: 6 additions & 1 deletion src/core/compact_object.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ typedef struct redisObject robj;
namespace dfly {

constexpr unsigned kEncodingIntSet = 0;
constexpr unsigned kEncodingStrMap = 1; // for set/map encodings of strings
constexpr unsigned kEncodingStrMap = 1; // for set/map encodings of strings
constexpr unsigned kEncodingStrMap2 = 2; // for set/map encodings of strings using DenseSet
constexpr unsigned kEncodingListPack = 3;

Expand Down Expand Up @@ -52,7 +52,10 @@ class RobjWrapper {
return std::string_view{reinterpret_cast<char*>(inner_obj_), sz_};
}

bool DefragIfNeeded(float ratio);

private:
bool Reallocate(std::pmr::memory_resource* mr);
size_t InnerObjMallocUsed() const;
void MakeInnerRoom(size_t current_cap, size_t desired, std::pmr::memory_resource* mr);

Expand Down Expand Up @@ -208,6 +211,8 @@ class CompactObj {
return mask_ & IO_PENDING;
}

bool DefragIfNeeded(float ratio);

void SetIoPending(bool b) {
if (b) {
mask_ |= IO_PENDING;
Expand Down
2 changes: 1 addition & 1 deletion src/core/compact_object_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ TEST_F(CompactObjectTest, MimallocUnderutilzationWithRealloc) {
bool found = HasUnderutilizedMemory(ptrs, kUnderUtilizedRatio);
ASSERT_FALSE(found);
DeallocateAtRandom(kRandomStep, &ptrs);
// TestMiMallocUnderutilized(ptrs, run_reallocation, allocation_size);

// This is another case, where we are filling the "gaps" by doing re-allocations
// in this case, since we are not setting all the values back it should still have
// places that are not used. Plus since we are not looking at the first page
Expand Down
1 change: 1 addition & 0 deletions src/server/dfly_main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,7 @@ Usage: dragonfly [FLAGS]
}
mi_option_enable(mi_option_show_errors);
mi_option_set(mi_option_max_warnings, 0);
mi_option_set(mi_option_decommit_delay, 0);

base::sys::KernelVersion kver;
base::sys::GetKernelVersion(&kver);
Expand Down
59 changes: 52 additions & 7 deletions src/server/dragonfly_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -758,21 +758,66 @@ TEST_F(DflyEngineTest, Bug496) {
TEST_F(DefragDflyEngineTest, TestDefragOption) {
// Fill data into dragonfly and then check if we have
// any location in memory to defrag. See issue #448 for details about this.
EXPECT_GE(max_memory_limit, 100'000);
const int NUMBER_OF_KEYS = 184000;
max_memory_limit = 300'000; // control memory size so no need for too many keys
constexpr int kNumberOfKeys = 100'000; // this fill the memory
constexpr int kKeySize = 137;
constexpr int kMaxDefragTriesForTests = 10;

std::vector<std::string> keys2delete;
keys2delete.push_back("del");

// Generate a list of keys that would be deleted
// The keys that we will delete are all in the form of "key-name:1<other digits>"
// This is because we are populating keys that has this format, but we don't want
// to delete all keys, only some random keys so we deleting those that start with 1
constexpr int kFactor = 10;
int kMaxNumKeysToDelete = 10'000;
int current_step = kFactor;
for (int i = 1; i < kMaxNumKeysToDelete; current_step *= kFactor) {
for (; i < current_step; i++) {
int j = i - 1 + current_step;
keys2delete.push_back("key-name:" + std::to_string(j));
}
}

std::vector<std::string_view> keys(keys2delete.begin(), keys2delete.end());

RespExpr resp = Run({"DEBUG", "POPULATE", std::to_string(NUMBER_OF_KEYS), "key-name", "130"});
RespExpr resp = Run(
{"DEBUG", "POPULATE", std::to_string(kNumberOfKeys), "key-name", std::to_string(kKeySize)});
ASSERT_EQ(resp, "OK");
resp = Run({"DBSIZE"});
EXPECT_THAT(resp, IntArg(NUMBER_OF_KEYS));
EXPECT_THAT(resp, IntArg(kNumberOfKeys));

shard_set->pool()->AwaitFiberOnAll([&](unsigned index, ProactorBase* base) {
EngineShard* shard = EngineShard::tlocal();
ASSERT_FALSE(shard == nullptr); // we only have one and its should not be empty!
this_fiber::sleep_for(100ms);
EXPECT_EQ(shard->GetDefragStats().success_count, 0);
// we are not running stats yet
EXPECT_EQ(shard->GetDefragStats().tries, 0);
EXPECT_GT(GetMallocCurrentCommitted(), NUMBER_OF_KEYS);
// we are expecting to have at least one try by now
EXPECT_GT(shard->GetDefragStats().tries, 0);
});

ArgSlice delete_cmd(keys);
auto r = CheckedInt(delete_cmd);
// the first element in this is the command del so size is one less
ASSERT_EQ(r, keys2delete.size() - 1);

// At this point we need to see whether we did running the task and whether the task did something
shard_set->pool()->AwaitFiberOnAll([&](unsigned index, ProactorBase* base) {
EngineShard* shard = EngineShard::tlocal();
ASSERT_FALSE(shard == nullptr); // we only have one and its should not be empty!
// a "busy wait" to ensure that memory defragmentations was successful:
// the task ran and did it work
auto stats = shard->GetDefragStats();
for (int i = 0; i < kMaxDefragTriesForTests; i++) {
stats = shard->GetDefragStats();
if (stats.success_count > 0) {
break;
}
this_fiber::sleep_for(220ms);
}
// make sure that we successfully found places to defrag in memory
EXPECT_GT(stats.success_count, 0);
});
}

Expand Down
77 changes: 57 additions & 20 deletions src/server/engine_shard_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ using absl::GetFlag;

namespace {

constexpr DbIndex DEFAULT_DB_INDEX = 0;
constexpr DbIndex kDefaultDbIndex = 0;
constexpr uint64_t kCursorDoneState = 0u;

vector<EngineShardSet::CachedStats> cached_stats; // initialized in EngineShardSet::Init

Expand Down Expand Up @@ -80,7 +81,7 @@ bool EngineShard::DefragTaskState::IsRequired() {
const uint64_t threshold_mem = max_memory_limit * GetFlag(FLAGS_mem_defrag_threshold);
const double commit_use_threshold = GetFlag(FLAGS_commit_use_threshold);

if (cursor > 0) {
if (cursor > kCursorDoneState) {
return true;
}

Expand All @@ -100,34 +101,73 @@ bool EngineShard::DefragTaskState::IsRequired() {

// for now this does nothing
bool EngineShard::DoDefrag() {
// TODO - Impl!!
return defrag_state_.cursor > 0;
}

void EngineShard::DefragTaskState::Init() {
cursor = 0u;
// --------------------------------------------------------------------------
// NOTE: This task is running with exclusive access to the shard.
// i.e. - Since we are using shared noting access here, and all access
// are done using fibers, This fiber is run only when no other fiber in the
// context of the controlling thread will access this shard!
// --------------------------------------------------------------------------

constexpr size_t kMaxTraverses = 50;
const float threshold = GetFlag(FLAGS_mem_utilization_threshold);

auto& slice = db_slice();
DCHECK(slice.IsDbValid(kDefaultDbIndex));
auto [prime_table, expire_table] = slice.GetTables(kDefaultDbIndex);
PrimeTable::Cursor cur = defrag_state_.cursor;
uint64_t defrag_count = 0;
unsigned traverses_count = 0;

do {
cur = prime_table->Traverse(cur, [&](PrimeIterator it) {
// for each value check whether we should move it because it
// seats on underutilized page of memory, and if so, do it.
bool did = it->second.DefragIfNeeded(threshold);
if (did) {
defrag_count++;
}
});
traverses_count++;
} while (traverses_count < kMaxTraverses && cur);

defrag_state_.cursor = cur.value();
if (defrag_count > 0) {
VLOG(1) << "shard " << slice.shard_id() << ": successfully defrag " << defrag_count
<< " times, did it in " << traverses_count << " cursor is at the "
<< (defrag_state_.cursor == 0 ? "end" : "in progress");
} else {
VLOG(1) << "shard " << slice.shard_id() << ": run the defrag " << traverses_count
<< " times out of maximum " << kMaxTraverses << ", with cursor at "
<< (defrag_state_.cursor == 0 ? "end" : "in progress")
<< " but no location for defrag were found";
}
defrag_state_.stats.success_count += defrag_count;
defrag_state_.stats.tries++;
return defrag_state_.cursor > kCursorDoneState;
}

// the memory defragmentation task is as follow:
// 1. Check if memory usage is high enough
// 2. Check if diff between commited and used memory is high enough
// 3. Check if we have memory changes (to ensure that we not running endlessly). - TODO
// 4. if all the above pass -> run on the shard and try to defragmented memory by re-allocating
// values
// if the cursor for this is signal that we are not done, schedule the task to run at high
// priority otherwise lower the task priority so that it would not use the CPU when not required
// 4. if all the above pass -> scan this shard and try to find whether we can move pointer to
// underutilized pages values
// if the cursor returned from scan is not in done state, schedule the task to run at high
// priority.
// otherwise lower the task priority so that it would not use the CPU when not required
uint32_t EngineShard::DefragTask() {
constexpr uint32_t kRunAtLowPriority = 0u;

const auto shard_id = db_slice().shard_id();
bool required_state = defrag_state_.IsRequired();
if (required_state) {
if (defrag_state_.IsRequired()) {
VLOG(1) << shard_id << ": need to run defrag memory cursor state: " << defrag_state_.cursor;
if (DoDefrag()) {
// we didn't finish the scan
return util::ProactorBase::kOnIdleMaxLevel;
}
}
// by default we just want to not get in the way..
return 0u;

return kRunAtLowPriority;
}

EngineShard::EngineShard(util::ProactorBase* pb, bool update_db_time, mi_heap_t* heap)
Expand All @@ -150,7 +190,6 @@ EngineShard::EngineShard(util::ProactorBase* pb, bool update_db_time, mi_heap_t*

db_slice_.UpdateExpireBase(absl::GetCurrentTimeNanos() / 1000000, 0);
// start the defragmented task here
defrag_state_.Init();
defrag_task_ = pb->AddOnIdleTask([this]() { return this->DefragTask(); });
}

Expand All @@ -170,9 +209,7 @@ void EngineShard::Shutdown() {
ProactorBase::me()->CancelPeriodic(periodic_task_);
}

if (defrag_task_ != 0) {
ProactorBase::me()->RemoveOnIdleTask(defrag_task_);
}
ProactorBase::me()->RemoveOnIdleTask(defrag_task_);
}

void EngineShard::InitThreadLocal(ProactorBase* pb, bool update_db_time) {
Expand Down
2 changes: 0 additions & 2 deletions src/server/engine_shard_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,6 @@ class EngineShard {
uint64_t cursor = 0u;
DefragStats stats;

void Init();

// check the current threshold and return true if
// we need to do the de-fermentation
bool IsRequired();
Expand Down
11 changes: 10 additions & 1 deletion src/server/test_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,15 @@ ABSL_FLAG(bool, force_epoll, false, "If true, uses epoll api instead iouring to

namespace dfly {

std::ostream& operator<<(std::ostream& os, ArgSlice& list) {
os << "[";
if (!list.empty()) {
std::for_each(list.begin(), list.end() - 1, [&os](const auto& val) { os << val << ", "; });
os << (*(list.end() - 1));
}
return os << "]";
}

extern unsigned kInitSegmentLog;

using MP = MemcacheParser;
Expand Down Expand Up @@ -267,7 +276,7 @@ auto BaseFamilyTest::GetMC(MP::CmdType cmd_type, std::initializer_list<std::stri
return conn->SplitLines();
}

int64_t BaseFamilyTest::CheckedInt(std::initializer_list<std::string_view> list) {
int64_t BaseFamilyTest::CheckedInt(ArgSlice list) {
RespExpr resp = Run(list);
if (resp.type == RespExpr::INT64) {
return get<int64_t>(resp.u);
Expand Down
5 changes: 4 additions & 1 deletion src/server/test_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,10 @@ class BaseFamilyTest : public ::testing::Test {
MCResponse RunMC(MemcacheParser::CmdType cmd_type, std::string_view key = std::string_view{});
MCResponse GetMC(MemcacheParser::CmdType cmd_type, std::initializer_list<std::string_view> list);

int64_t CheckedInt(std::initializer_list<std::string_view> list);
int64_t CheckedInt(std::initializer_list<std::string_view> list) {
return CheckedInt(ArgSlice{list.begin(), list.size()});
}
int64_t CheckedInt(ArgSlice list);

bool IsLocked(DbIndex db_index, std::string_view key) const;
ConnectionContext::DebugInfo GetDebugInfo(const std::string& id) const;
Expand Down

0 comments on commit 6ec4cf0

Please sign in to comment.