feat(server): active memory defrag high level flow
Signed-off-by: Boaz Sade <boaz@dragonflydb.io>
boazsade committed Nov 29, 2022
1 parent 685b441 commit 03062ca
Showing 7 changed files with 255 additions and 5 deletions.
99 changes: 95 additions & 4 deletions src/core/compact_object_test.cc
@@ -3,9 +3,9 @@
//
#include "core/compact_object.h"

#include <absl/strings/str_cat.h>
#include <mimalloc.h>
#include <xxhash.h>
#include <absl/strings/str_cat.h>

#include "base/gtest.h"
#include "base/logging.h"
@@ -34,6 +34,36 @@ void PrintTo(const CompactObj& cobj, std::ostream* os) {
*os << "cobj: [" << cobj.ObjType() << "]";
}

// This is for the mimalloc test - being able to find an address in memory
// where we have memory underutilization
// see issue number 448 (https://github.com/dragonflydb/dragonfly/issues/448)
std::vector<void*> AllocateForTest(int size, std::size_t allocate_size, int factor1 = 1,
int factor2 = 1) {
std::vector<void*> ptrs;
for (int index = 0; index < size; index++) {
auto alloc_size = index % 13 == 0 ? allocate_size * factor1 : allocate_size * factor2;
auto heap_alloc = mi_heap_get_backing();
void* ptr = mi_heap_malloc(heap_alloc, alloc_size);
ptrs.push_back(ptr);
}
return ptrs;
}

bool FindUnderutilizedMemory(const std::vector<void*>& ptrs, float ratio) {
auto it = std::find_if(ptrs.begin(), ptrs.end(), [&](auto p) {
int r = p && zmalloc_page_is_underutilized(p, ratio);
return r > 0;
});
return it != ptrs.end();
}

void DecimateMemory(std::vector<void*>* ptrs, size_t steps) {
for (size_t i = 24; i < ptrs->size(); i += steps) {
mi_free(ptrs->at(i));
ptrs->at(i) = nullptr;
}
}

class CompactObjectTest : public ::testing::Test {
protected:
static void SetUpTestSuite() {
@@ -110,7 +140,6 @@ TEST_F(CompactObjectTest, InlineAsciiEncoded) {
EXPECT_EQ(s.size(), obj.Size());
}


TEST_F(CompactObjectTest, Int) {
cobj_.SetString("0");
EXPECT_EQ(0, cobj_.TryGetInt());
@@ -211,13 +240,13 @@ TEST_F(CompactObjectTest, FlatSet) {
size_t allocated2, resident2, active2;

zmalloc_get_allocator_info(&allocated1, &active1, &resident1);
dict *d = dictCreate(&setDictType);
dict* d = dictCreate(&setDictType);
constexpr size_t kTestSize = 2000;

for (size_t i = 0; i < kTestSize; ++i) {
sds key = sdsnew("key:000000000000");
key = sdscatfmt(key, "%U", i);
dictEntry *de = dictAddRaw(d, key,NULL);
dictEntry* de = dictAddRaw(d, key, NULL);
de->v.val = NULL;
}

@@ -259,4 +288,66 @@ TEST_F(CompactObjectTest, StreamObj) {
EXPECT_FALSE(cobj_.IsInline());
}

TEST_F(CompactObjectTest, MimallocUnderutilzation) {
// We are testing with the same object size allocation here
// This test is for https://github.com/dragonflydb/dragonfly/issues/448
size_t allocation_size = 94;
int count = 2000;
std::vector<void*> ptrs = AllocateForTest(count, allocation_size);
bool found = FindUnderutilizedMemory(ptrs, 1.0f);
ASSERT_FALSE(found);
DecimateMemory(&ptrs, 26);
found = FindUnderutilizedMemory(ptrs, 1.0f);
ASSERT_TRUE(found);
for (auto* ptr : ptrs) {
mi_free(ptr);
}
}

TEST_F(CompactObjectTest, MimallocUnderutilzationDifferentSizes) {
// This test uses different object sizes to cover more use cases
// related to issue https://github.com/dragonflydb/dragonfly/issues/448
size_t allocation_size = 97;
int count = 2000;
int mem_factor_1 = 3;
int mem_factor_2 = 2;
std::vector<void*> ptrs = AllocateForTest(count, allocation_size, mem_factor_1, mem_factor_2);
bool found = FindUnderutilizedMemory(ptrs, 1.0f);
ASSERT_FALSE(found);
DecimateMemory(&ptrs, 26);
found = FindUnderutilizedMemory(ptrs, 1.0f);
ASSERT_TRUE(found);
for (auto* ptr : ptrs) {
mi_free(ptr);
}
}

TEST_F(CompactObjectTest, MimallocUnderutilzationWithRealloc) {
// This test checks underutilization with reallocation as well as deallocation
// of the memory - see issue https://github.com/dragonflydb/dragonfly/issues/448
size_t allocation_size = 102;
int count = 2000;
int mem_factor_1 = 4;
int mem_factor_2 = 1;
std::vector<void*> ptrs = AllocateForTest(count, allocation_size, mem_factor_1, mem_factor_2);
bool found = FindUnderutilizedMemory(ptrs, 1.0f);
ASSERT_FALSE(found);
DecimateMemory(&ptrs, 26);
// TestMiMallocUnderutilized(ptrs, run_reallocation, allocation_size);
// This is another case, where we are filling the "gaps" by doing re-allocations.
// In this case, since we are not setting all the values back, there should still be
// places that are not used. Plus, since we are not looking at the first page,
// other pages should be underutilized.
for (size_t i = 24; i < ptrs.size(); i += 26) {
if (!ptrs[i]) {
ptrs[i] = mi_heap_malloc(mi_heap_get_backing(), allocation_size);
}
}
found = FindUnderutilizedMemory(ptrs, 1.0f);
ASSERT_TRUE(found);
for (auto* ptr : ptrs) {
mi_free(ptr);
}
}

} // namespace dfly
6 changes: 6 additions & 0 deletions src/redis/zmalloc.h
@@ -111,6 +111,12 @@ size_t zmalloc_get_smap_bytes_by_field(char *field, long pid);
size_t zmalloc_get_memory_size(void);
size_t zmalloc_usable_size(const void* p);

/*
* Checks whether the page that the pointer ptr is located at is underutilized.
* This uses the current local thread heap.
* Returns 0 if not, 1 if underutilized.
*/
int zmalloc_page_is_underutilized(void *ptr, float ratio);
// roman: void zlibc_free(void *ptr);

void init_zmalloc_threadlocal(void* heap);
4 changes: 4 additions & 0 deletions src/redis/zmalloc_mi.c
@@ -137,3 +137,7 @@ void init_zmalloc_threadlocal(void* heap) {
return;
zmalloc_heap = heap;
}

int zmalloc_page_is_underutilized(void* ptr, float ratio) {
return mi_heap_page_is_underutilized(zmalloc_heap, ptr, ratio);
}
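
A note on the new helper: zmalloc_page_is_underutilized simply forwards to mimalloc's mi_heap_page_is_underutilized on the thread-local heap. The following is a minimal, hypothetical usage sketch (not part of this commit) mirroring the pattern used by the tests in compact_object_test.cc: allocate many equally-sized blocks, free most of them, and the surviving blocks should sit on pages reported as underutilized.

// Hypothetical sketch, assuming only the declarations shown in this commit.
#include <mimalloc.h>
#include <vector>

extern "C" int zmalloc_page_is_underutilized(void* ptr, float ratio);

bool PageBecomesUnderutilizedAfterFrees() {
  std::vector<void*> ptrs;
  for (int i = 0; i < 2000; ++i)
    ptrs.push_back(mi_heap_malloc(mi_heap_get_backing(), 96));

  // Free most of the blocks so that each page keeps only a few live allocations.
  for (size_t i = 0; i < ptrs.size(); ++i) {
    if (i % 26 != 0) {
      mi_free(ptrs[i]);
      ptrs[i] = nullptr;
    }
  }

  bool found = false;
  for (void* p : ptrs)
    found |= (p != nullptr) && zmalloc_page_is_underutilized(p, 1.0f);

  for (void* p : ptrs)
    mi_free(p);  // mi_free(nullptr) is a no-op
  return found;
}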
2 changes: 1 addition & 1 deletion src/server/CMakeLists.txt
@@ -7,7 +7,7 @@ if (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND CMAKE_BUILD_TYPE STREQUAL "Rele
set_source_files_properties(dfly_main.cc PROPERTIES COMPILE_FLAGS -march=core2 COMPILE_DEFINITIONS SOURCE_PATH_FROM_BUILD_ENV=${CMAKE_SOURCE_DIR})
endif()

add_library(dfly_transaction db_slice.cc engine_shard_set.cc blocking_controller.cc common.cc
add_library(dfly_transaction db_slice.cc malloc_stats.cc engine_shard_set.cc blocking_controller.cc common.cc
io_mgr.cc journal/journal.cc journal/journal_slice.cc table.cc
tiered_storage.cc transaction.cc)
cxx_link(dfly_transaction uring_fiber_lib dfly_core strings_lib)
28 changes: 28 additions & 0 deletions src/server/dragonfly_test.cc
@@ -50,6 +50,13 @@ class DflyEngineTest : public BaseFamilyTest {
}
};

class DefragDflyEngineTest : public DflyEngineTest {
protected:
DefragDflyEngineTest() : DflyEngineTest() {
num_threads_ = 1;
}
};

// TODO: to implement equivalent parsing in redis parser.
TEST_F(DflyEngineTest, Sds) {
int argc;
@@ -749,6 +756,27 @@ TEST_F(DflyEngineTest, Bug496) {
});
}

TEST_F(DefragDflyEngineTest, TestDefragOption) {
// Fill data into dragonfly and then check if we have
// any location in memory to defrag. See issue #448 for details about this.
EXPECT_GE(max_memory_limit, 100'000);
const int NUMBER_OF_KEYS = 184000;

RespExpr resp = Run({"DEBUG", "POPULATE", std::to_string(NUMBER_OF_KEYS), "key-name", "130"});
ASSERT_EQ(resp, "OK");
resp = Run({"DBSIZE"});
EXPECT_THAT(resp, IntArg(NUMBER_OF_KEYS));

shard_set->pool()->AwaitFiberOnAll([&](unsigned index, ProactorBase* base) {
EngineShard* shard = EngineShard::tlocal();
ASSERT_FALSE(shard == nullptr); // we only have one and it should not be empty!
EXPECT_EQ(shard->GetDefragStats().success_count, 0);
// should not run at all!!
EXPECT_EQ(shard->GetDefragStats().tries, 0);
EXPECT_GT(GetMallocCurrentCommitted(), NUMBER_OF_KEYS);
});
}

// TODO: to test transactions with a single shard since then all transactions become local.
// To consider having a parameter in dragonfly engine controlling number of shards
// unconditionally from number of cpus. TO TEST BLPOP under multi for single/multi argument case.
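
For context, the test above runs with the default flag values, under which the defrag task never fires (tries stays at 0). Once DoDefrag is fleshed out, the flow would presumably be exercised by starting the server with the thresholds lowered; the flag names below come from engine_shard_set.cc in this commit, while the binary name and the values are illustrative:

  ./dragonfly --mem_defrag_threshold=0.05 --commit_use_threshold=1.3 --mem_utilization_threshold=0.8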
81 changes: 81 additions & 0 deletions src/server/engine_shard_set.cc
@@ -30,6 +30,18 @@ ABSL_FLAG(bool, cache_mode, false,
"If true, the backend behaves like a cache, "
"by evicting entries when getting close to maxmemory limit");

// memory defragmentation related flags
ABSL_FLAG(float, mem_defrag_threshold,
1, // The default now is to disable the task from running! change this to 0.05!!
"Minimum percentage of used memory relative to total available memory before running "
"defragmentation");

ABSL_FLAG(float, commit_use_threshold, 1.3,
"The threshold to apply memory fragmentation between commited and used memory");

ABSL_FLAG(float, mem_utilization_threshold, 0.8,
"memory page under utilization threshold. Ratio between used and commited size, above "
"this, memory in this page will defragmented");
namespace dfly {

using namespace util;
@@ -39,6 +51,8 @@ using absl::GetFlag;

namespace {

constexpr DbIndex DEFAULT_DB_INDEX = 0;

vector<EngineShardSet::CachedStats> cached_stats; // initialized in EngineShardSet::Init

} // namespace
@@ -56,6 +70,66 @@ EngineShard::Stats& EngineShard::Stats::operator+=(const EngineShard::Stats& o)
return *this;
}

// This function checks 3 things:
// 1. Don't try memory defragmentation if we don't use "enough" memory (controlled by the
// mem_defrag_threshold flag)
// 2. That we had a change in memory usage - to prevent an endless loop - out of scope for now
// 3. In case the above is OK, make sure that we have a "gap" between usage and committed memory
// (controlled by the commit_use_threshold flag)
bool EngineShard::DefragTaskState::IsRequired() {
const double threshold_mem = max_memory_limit * GetFlag(FLAGS_mem_defrag_threshold);
const double commit_use_threshold = GetFlag(FLAGS_commit_use_threshold);

if (cursor > 0) {
return true;
}

uint64_t commited = GetMallocCurrentCommitted();
uint64_t mem_in_use = used_mem_current.load(memory_order_relaxed);

// we want to make sure that we are not running this too many times - i.e.
// if there was no change to the memory, don't run this
if (threshold_mem < commited && mem_in_use != 0 &&
(uint64_t(mem_in_use * commit_use_threshold) < commited)) {
// we have way more committed than actual usage
return true;
}

return false;
}

// for now this does nothing
bool EngineShard::DoDefrag() {
// TODO - Impl!!
return defrag_state_.cursor > 0;
}

void EngineShard::DefragTaskState::Init() {
cursor = 0u;
}

// The memory defragmentation task works as follows:
// 1. Check if memory usage is high enough
// 2. Check if the diff between committed and used memory is high enough
// 3. Check if we have memory changes (to ensure that we are not running endlessly). - TODO
// 4. If all the above pass -> run on the shard and try to defragment memory by re-allocating
// values
// If the cursor signals that we are not done, schedule the task to run at high
// priority; otherwise lower the task priority so that it does not use the CPU when not required
uint32_t EngineShard::DefragTask() {
const auto shard_id = db_slice().shard_id();
bool required_state = defrag_state_.IsRequired();
if (required_state) {
VLOG(1) << shard_id << ": need to run defrag memory cursor state: " << defrag_state_.cursor;
if (DoDefrag()) {
// we didn't finish the scan
return util::ProactorBase::kOnIdleMaxLevel;
}
}
// by default we just want to not get in the way..
return 0u;
}

EngineShard::EngineShard(util::ProactorBase* pb, bool update_db_time, mi_heap_t* heap)
: queue_(kQueueLen), txq_([](const Transaction* t) { return t->txid(); }), mi_resource_(heap),
db_slice_(pb->GetIndex(), GetFlag(FLAGS_cache_mode), this) {
@@ -75,6 +149,9 @@ EngineShard::EngineShard(util::ProactorBase* pb, bool update_db_time, mi_heap_t*
tmp_str1 = sdsempty();

db_slice_.UpdateExpireBase(absl::GetCurrentTimeNanos() / 1000000, 0);
// start the defragmentation task here
defrag_state_.Init();
defrag_task_ = pb->AddOnIdleTask([this]() { return this->DefragTask(); });
}

EngineShard::~EngineShard() {
@@ -92,6 +169,10 @@ void EngineShard::Shutdown() {
if (periodic_task_) {
ProactorBase::me()->CancelPeriodic(periodic_task_);
}

if (defrag_task_ != 0) {
ProactorBase::me()->RemoveOnIdleTask(defrag_task_);
}
}

void EngineShard::InitThreadLocal(ProactorBase* pb, bool update_db_time) {
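
To make the DefragTaskState::IsRequired thresholds concrete, here is an illustrative calculation (the numbers are made up; only the flags and the comparison itself come from the code above):

  max_memory_limit = 1 GiB, mem_defrag_threshold = 0.05  ->  threshold_mem ~ 53.7 MB
  committed memory = 300 MB, used memory = 200 MB, commit_use_threshold = 1.3
  threshold_mem (53.7 MB) < committed (300 MB)            ->  enough memory is committed
  used * 1.3 = 260 MB < committed (300 MB)                ->  a large enough commit/use gap
  =>  IsRequired() returns true, and DefragTask escalates to kOnIdleMaxLevel once DoDefrag reports an unfinished scan.

With the default mem_defrag_threshold of 1 that this commit ships, threshold_mem equals max_memory_limit, so the first comparison practically never passes and the task stays dormant - matching the "disable the task from running" note next to the flag definition.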
40 changes: 40 additions & 0 deletions src/server/engine_shard_set.h
@@ -41,6 +41,11 @@ class EngineShard {
Stats& operator+=(const Stats&);
};

struct DefragStats {
uint64_t success_count = 0; // how many objects were moved
uint64_t tries = 0; // how many times we tried
};

// EngineShard() is private down below.
~EngineShard();

@@ -141,9 +146,25 @@ class EngineShard {
journal_ = j;
}

const DefragStats& GetDefragStats() const {
return defrag_state_.stats;
}

void TEST_EnableHeartbeat();

private:
struct DefragTaskState {
// we will add more data members later
uint64_t cursor = 0u;
DefragStats stats;

void Init();

// check the current threshold and return true if
// we need to do the defragmentation
bool IsRequired();
};

EngineShard(util::ProactorBase* pb, bool update_db_time, mi_heap_t* heap);

// blocks the calling fiber.
@@ -153,6 +174,23 @@

void CacheStats();

// We are running a task that checks whether we need to
// do memory defragmentation here; this task only runs
// when there is available CPU time.
// --------------------------------------------------------------------------
// NOTE: This task runs with exclusive access to the shard.
// i.e. - since we are using shared-nothing access here, and all accesses
// are done using fibers, this fiber runs only when no other fiber in the
// context of the controlling thread will access this shard!
// --------------------------------------------------------------------------
uint32_t DefragTask();

// Scan the shard with the cursor and apply
// defragmentation for its entries. This function will return the new cursor at the end of the
// scan. It is called from the context of StartDefragTask.
// Returns true if we did not complete the shard scan.
bool DoDefrag();

::util::fibers_ext::FiberQueue queue_;
::boost::fibers::fiber fiber_q_;

@@ -170,6 +208,8 @@
IntentLock shard_lock_;

uint32_t periodic_task_ = 0;
uint32_t defrag_task_ = 0;
DefragTaskState defrag_state_;
std::unique_ptr<TieredStorage> tiered_storage_;
std::unique_ptr<BlockingController> blocking_controller_;

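
DoDefrag is intentionally left as a stub in this commit (it only inspects the cursor). The sketch below illustrates the reallocation principle the comments describe, in isolation from the shard's data structures; ReallocateIfUnderutilized is an assumed helper, not part of the commit, and a real implementation would walk the shard's table with defrag_state_.cursor and re-allocate value payloads instead of raw buffers.

// Hypothetical helper (not from the commit): move a buffer off an underutilized page.
// Returns the (possibly new) pointer; 'size' must be the original allocation size.
#include <cstring>
#include <mimalloc.h>

extern "C" int zmalloc_page_is_underutilized(void* ptr, float ratio);

void* ReallocateIfUnderutilized(void* ptr, size_t size, float ratio) {
  if (ptr == nullptr || !zmalloc_page_is_underutilized(ptr, ratio))
    return ptr;  // page is dense enough, keep the value in place
  void* fresh = mi_heap_malloc(mi_heap_get_backing(), size);
  if (fresh == nullptr)
    return ptr;  // allocation failed, keep the old location
  std::memcpy(fresh, ptr, size);
  mi_free(ptr);  // the old page can be reclaimed once it empties out
  return fresh;
}

In such a design, DefragTask would call a helper like this in small batches while the proactor is idle, bumping defrag_state_.stats.success_count for every moved value and persisting the scan position in defrag_state_.cursor between invocations.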
