From 03062cae8ae5a83a0fe93097c8c5145509dc6481 Mon Sep 17 00:00:00 2001
From: Boaz Sade
Date: Tue, 22 Nov 2022 08:21:00 +0200
Subject: [PATCH] feat(server): active memory defrag high-level flow

Signed-off-by: Boaz Sade
---
 src/core/compact_object_test.cc | 99 +++++++++++++++++++++++++++++++--
 src/redis/zmalloc.h | 6 ++
 src/redis/zmalloc_mi.c | 4 ++
 src/server/CMakeLists.txt | 2 +-
 src/server/dragonfly_test.cc | 28 ++++++++++
 src/server/engine_shard_set.cc | 81 +++++++++++++++++++++++++++
 src/server/engine_shard_set.h | 40 +++++++++++++
 7 files changed, 255 insertions(+), 5 deletions(-)

diff --git a/src/core/compact_object_test.cc b/src/core/compact_object_test.cc
index 500b4459d9f1..2365ff39b356 100644
--- a/src/core/compact_object_test.cc
+++ b/src/core/compact_object_test.cc
@@ -3,9 +3,9 @@
 //

 #include "core/compact_object.h"
+#include
 #include
-#include

 #include "base/gtest.h"
 #include "base/logging.h"
@@ -34,6 +34,36 @@ void PrintTo(const CompactObj& cobj, std::ostream* os) {
 *os << "cobj: [" << cobj.ObjType() << "]";
 }

+// This is for the mimalloc test - being able to find an address in memory
+// where we have memory underutilization
+// see issue number 448 (https://github.com/dragonflydb/dragonfly/issues/448)
+std::vector<void*> AllocateForTest(int size, std::size_t allocate_size, int factor1 = 1,
+ int factor2 = 1) {
+ std::vector<void*> ptrs;
+ for (int index = 0; index < size; index++) {
+ auto alloc_size = index % 13 == 0 ? allocate_size * factor1 : allocate_size * factor2;
+ auto heap_alloc = mi_heap_get_backing();
+ void* ptr = mi_heap_malloc(heap_alloc, alloc_size);
+ ptrs.push_back(ptr);
+ }
+ return ptrs;
+}
+
+bool FindUnderutilizedMemory(const std::vector<void*>& ptrs, float ratio) {
+ auto it = std::find_if(ptrs.begin(), ptrs.end(), [&](auto p) {
+ int r = p && zmalloc_page_is_underutilized(p, ratio);
+ return r > 0;
+ });
+ return it != ptrs.end();
+}
+
+void DecimateMemory(std::vector<void*>* ptrs, size_t steps) {
+ for (size_t i = 24; i < ptrs->size(); i += steps) {
+ mi_free(ptrs->at(i));
+ ptrs->at(i) = nullptr;
+ }
+}
+
 class CompactObjectTest : public ::testing::Test {
 protected:
 static void SetUpTestSuite() {
@@ -110,7 +140,6 @@ TEST_F(CompactObjectTest, InlineAsciiEncoded) {
 EXPECT_EQ(s.size(), obj.Size());
 }

-
 TEST_F(CompactObjectTest, Int) {
 cobj_.SetString("0");
 EXPECT_EQ(0, cobj_.TryGetInt());
@@ -211,13 +240,13 @@ TEST_F(CompactObjectTest, FlatSet) {
 size_t allocated2, resident2, active2;

 zmalloc_get_allocator_info(&allocated1, &active1, &resident1);
- dict *d = dictCreate(&setDictType);
+ dict* d = dictCreate(&setDictType);
 constexpr size_t kTestSize = 2000;

 for (size_t i = 0; i < kTestSize; ++i) {
 sds key = sdsnew("key:000000000000");
 key = sdscatfmt(key, "%U", i);
- dictEntry *de = dictAddRaw(d, key,NULL);
+ dictEntry* de = dictAddRaw(d, key, NULL);
 de->v.val = NULL;
 }

@@ -259,4 +288,66 @@ TEST_F(CompactObjectTest, StreamObj) {
 EXPECT_FALSE(cobj_.IsInline());
 }

+TEST_F(CompactObjectTest, MimallocUnderutilzation) {
+ // We are testing with the same object size allocation here
+ // This test is for https://github.com/dragonflydb/dragonfly/issues/448
+ size_t allocation_size = 94;
+ int count = 2000;
+ std::vector<void*> ptrs = AllocateForTest(count, allocation_size);
+ bool found = FindUnderutilizedMemory(ptrs, 1.0f);
+ ASSERT_FALSE(found);
+ DecimateMemory(&ptrs, 26);
+ found = FindUnderutilizedMemory(ptrs, 1.0f);
+ ASSERT_TRUE(found);
+ for (auto* ptr : ptrs) {
+ mi_free(ptr);
+ }
+}
+
+TEST_F(CompactObjectTest, MimallocUnderutilzationDifferentSizes) {
+ // This test uses different object sizes to cover more use cases
+ // related to issue https://github.com/dragonflydb/dragonfly/issues/448
+ size_t allocation_size = 97;
+ int count = 2000;
+ int mem_factor_1 = 3;
+ int mem_factor_2 = 2;
+ std::vector<void*> ptrs = AllocateForTest(count, allocation_size, mem_factor_1, mem_factor_2);
+ bool found = FindUnderutilizedMemory(ptrs, 1.0f);
+ ASSERT_FALSE(found);
+ DecimateMemory(&ptrs, 26);
+ found = FindUnderutilizedMemory(ptrs, 1.0f);
+ ASSERT_TRUE(found);
+ for (auto* ptr : ptrs) {
+ mi_free(ptr);
+ }
+}
+
+TEST_F(CompactObjectTest, MimallocUnderutilzationWithRealloc) {
+ // This test checks underutilization with reallocation as well as deallocation
+ // of the memory - see issue https://github.com/dragonflydb/dragonfly/issues/448
+ size_t allocation_size = 102;
+ int count = 2000;
+ int mem_factor_1 = 4;
+ int mem_factor_2 = 1;
+ std::vector<void*> ptrs = AllocateForTest(count, allocation_size, mem_factor_1, mem_factor_2);
+ bool found = FindUnderutilizedMemory(ptrs, 1.0f);
+ ASSERT_FALSE(found);
+ DecimateMemory(&ptrs, 26);
+ // TestMiMallocUnderutilized(ptrs, run_reallocation, allocation_size);
+ // This is another case, where we fill the "gaps" by doing re-allocations.
+ // Since we are not setting all the values back, there should still be
+ // places that are not used. In addition, since we are not looking only at the first page,
+ // other pages should be underutilized.
+ for (size_t i = 24; i < ptrs.size(); i += 26) {
+ if (!ptrs[i]) {
+ ptrs[i] = mi_heap_malloc(mi_heap_get_backing(), allocation_size);
+ }
+ }
+ found = FindUnderutilizedMemory(ptrs, 1.0f);
+ ASSERT_TRUE(found);
+ for (auto* ptr : ptrs) {
+ mi_free(ptr);
+ }
+}
+
 } // namespace dfly
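For reference, the pattern the tests above rely on can be condensed into the following standalone sketch. It is not part of the patch; it assumes Dragonfly's patched mimalloc (mi_heap_page_is_underutilized is not in upstream mimalloc), the zmalloc API added in the hunks below, and an include path of "redis/zmalloc.h"; the helper name HasUnderutilizedPage is illustrative only. Same-size allocations share mimalloc pages, so freeing a slice of them leaves pages that zmalloc_page_is_underutilized() reports at ratio 1.0.

#include <mimalloc.h>

#include <cstddef>
#include <vector>

extern "C" {
#include "redis/zmalloc.h"  // assumed include path for the header patched below
}

// Returns true if punching holes into same-size allocations leaves at least one
// underutilized mimalloc page - the condition the tests above assert.
static bool HasUnderutilizedPage() {
  init_zmalloc_threadlocal(mi_heap_get_backing());
  std::vector<void*> blocks(2000);
  for (auto& p : blocks)
    p = mi_heap_malloc(mi_heap_get_backing(), 94);  // one size class -> shared pages
  for (std::size_t i = 24; i < blocks.size(); i += 26) {  // free every 26th block
    mi_free(blocks[i]);
    blocks[i] = nullptr;
  }
  bool found = false;
  for (void* p : blocks)
    found |= (p != nullptr && zmalloc_page_is_underutilized(p, 1.0f) > 0);
  for (void* p : blocks)
    mi_free(p);
  return found;
}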
diff --git a/src/redis/zmalloc.h b/src/redis/zmalloc.h
index 2eb9e46501c8..01a87691860d 100644
--- a/src/redis/zmalloc.h
+++ b/src/redis/zmalloc.h
@@ -111,6 +111,12 @@ size_t zmalloc_get_smap_bytes_by_field(char *field, long pid);
 size_t zmalloc_get_memory_size(void);
 size_t zmalloc_usable_size(const void* p);

+/*
+ * Checks whether the page that the pointer ptr is located at is underutilized.
+ * This uses the current thread-local heap.
+ * Returns 0 if not, 1 if underutilized.
+ */
+int zmalloc_page_is_underutilized(void *ptr, float ratio);
 // roman: void zlibc_free(void *ptr);

 void init_zmalloc_threadlocal(void* heap);
diff --git a/src/redis/zmalloc_mi.c b/src/redis/zmalloc_mi.c
index 42899a216ca3..8f9e5140c39e 100644
--- a/src/redis/zmalloc_mi.c
+++ b/src/redis/zmalloc_mi.c
@@ -137,3 +137,7 @@ void init_zmalloc_threadlocal(void* heap) {
 return;
 zmalloc_heap = heap;
 }
+
+int zmalloc_page_is_underutilized(void* ptr, float ratio) {
+ return mi_heap_page_is_underutilized(zmalloc_heap, ptr, ratio);
+}
diff --git a/src/server/CMakeLists.txt b/src/server/CMakeLists.txt
index 72592ca4bad7..9887cf5d1be2 100644
--- a/src/server/CMakeLists.txt
+++ b/src/server/CMakeLists.txt
@@ -7,7 +7,7 @@ if (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND CMAKE_BUILD_TYPE STREQUAL "Rele
 set_source_files_properties(dfly_main.cc PROPERTIES COMPILE_FLAGS -march=core2
 COMPILE_DEFINITIONS SOURCE_PATH_FROM_BUILD_ENV=${CMAKE_SOURCE_DIR})
 endif()

-add_library(dfly_transaction db_slice.cc engine_shard_set.cc blocking_controller.cc common.cc
+add_library(dfly_transaction db_slice.cc malloc_stats.cc engine_shard_set.cc blocking_controller.cc common.cc
 io_mgr.cc journal/journal.cc journal/journal_slice.cc table.cc tiered_storage.cc
 transaction.cc)
 cxx_link(dfly_transaction uring_fiber_lib dfly_core strings_lib)
diff --git a/src/server/dragonfly_test.cc b/src/server/dragonfly_test.cc
index b5083ef6b3f5..20936aa61a5b 100644
--- a/src/server/dragonfly_test.cc
+++ b/src/server/dragonfly_test.cc
@@ -50,6 +50,13 @@ class DflyEngineTest : public BaseFamilyTest {
 }
 };

+class DefragDflyEngineTest : public DflyEngineTest {
+ protected:
+ DefragDflyEngineTest() : DflyEngineTest() {
+ num_threads_ = 1;
+ }
+};
+
 // TODO: to implement equivalent parsing in redis parser.
 TEST_F(DflyEngineTest, Sds) {
 int argc;
@@ -749,6 +756,27 @@ TEST_F(DflyEngineTest, Bug496) {
 });
 }

+TEST_F(DefragDflyEngineTest, TestDefragOption) {
+ // Fill data into dragonfly and then check if we have
+ // any location in memory to defrag. See issue #448 for details about this.
+ EXPECT_GE(max_memory_limit, 100'000);
+ const int NUMBER_OF_KEYS = 184000;
+
+ RespExpr resp = Run({"DEBUG", "POPULATE", std::to_string(NUMBER_OF_KEYS), "key-name", "130"});
+ ASSERT_EQ(resp, "OK");
+ resp = Run({"DBSIZE"});
+ EXPECT_THAT(resp, IntArg(NUMBER_OF_KEYS));
+
+ shard_set->pool()->AwaitFiberOnAll([&](unsigned index, ProactorBase* base) {
+ EngineShard* shard = EngineShard::tlocal();
+ ASSERT_FALSE(shard == nullptr); // we only have one shard and it should not be empty!
+ EXPECT_EQ(shard->GetDefragStats().success_count, 0);
+ // the defrag task should not have run at all!
+ EXPECT_EQ(shard->GetDefragStats().tries, 0);
+ EXPECT_GT(GetMallocCurrentCommitted(), NUMBER_OF_KEYS);
+ });
+}
+
 // TODO: to test transactions with a single shard since then all transactions become local.
 // To consider having a parameter in dragonfly engine controlling number of shards
 // unconditionally from number of cpus. TO TEST BLPOP under multi for single/multi argument case.
diff --git a/src/server/engine_shard_set.cc b/src/server/engine_shard_set.cc
index ba1d1860e3e0..14deb79b6026 100644
--- a/src/server/engine_shard_set.cc
+++ b/src/server/engine_shard_set.cc
@@ -30,6 +30,18 @@ ABSL_FLAG(bool, cache_mode, false, "If true, the backend behaves like a cache, "
 "by evicting entries when getting close to maxmemory limit");

+// memory defragmentation related flags
+ABSL_FLAG(float, mem_defrag_threshold,
+ 1, // The default of 1 disables the task from running! change this to 0.05 to enable it
+ "Minimum percentage of used memory relative to total available memory before running "
+ "defragmentation");
+
+ABSL_FLAG(float, commit_use_threshold, 1.3,
+ "The ratio threshold between committed and used memory above which memory defragmentation is applied");
+
+ABSL_FLAG(float, mem_utilization_threshold, 0.8,
+ "Memory page underutilization threshold. Ratio between used and committed size; pages "
+ "below this utilization threshold will be defragmented");

 namespace dfly {

 using namespace util;
@@ -39,6 +51,8 @@ using absl::GetFlag;

 namespace {

+constexpr DbIndex DEFAULT_DB_INDEX = 0;
+
 vector cached_stats; // initialized in EngineShardSet::Init

 } // namespace
@@ -56,6 +70,66 @@ EngineShard::Stats& EngineShard::Stats::operator+=(const EngineShard::Stats& o)

 return *this;
 }

+// This function checks 3 things:
+// 1. Don't try memory defragmentation if we don't use "enough" memory (controlled by the
+// mem_defrag_threshold flag)
+// 2. That there was a change in memory usage - to prevent an endless loop - out of scope for now
+// 3. In case the above is OK, make sure that there is a "gap" between usage and committed memory
+// (controlled by the commit_use_threshold flag)
+bool EngineShard::DefragTaskState::IsRequired() {
+ const double threshold_mem = max_memory_limit * GetFlag(FLAGS_mem_defrag_threshold);
+ const double commit_use_threshold = GetFlag(FLAGS_commit_use_threshold);
+
+ if (cursor > 0) {
+ return true;
+ }
+
+ uint64_t commited = GetMallocCurrentCommitted();
+ uint64_t mem_in_use = used_mem_current.load(memory_order_relaxed);
+
+ // we want to make sure that we are not running this too many times - i.e.
+ // if there was no change to the memory, don't run this
+ if (threshold_mem < commited && mem_in_use != 0 &&
+ (uint64_t(mem_in_use * commit_use_threshold) < commited)) {
+ // we have way more committed memory than actual usage
+ return true;
+ }
+
+ return false;
+}
+
+// for now this does nothing
+bool EngineShard::DoDefrag() {
+ // TODO - Impl!!
+ return defrag_state_.cursor > 0;
+}
+
+void EngineShard::DefragTaskState::Init() {
+ cursor = 0u;
+}
+
+// the memory defragmentation task is as follows:
+// 1. Check if memory usage is high enough
+// 2. Check if the diff between committed and used memory is high enough
+// 3. Check if we have memory changes (to ensure that we are not running endlessly) - TODO
+// 4. If all the above pass -> run on the shard and try to defragment memory by re-allocating
+// values
+// If the cursor signals that we are not done, schedule the task to run at high
+// priority; otherwise lower the task priority so that it will not use the CPU when not required
+uint32_t EngineShard::DefragTask() {
+ const auto shard_id = db_slice().shard_id();
+ bool required_state = defrag_state_.IsRequired();
+ if (required_state) {
+ VLOG(1) << shard_id << ": need to run defrag memory cursor state: " << defrag_state_.cursor;
+ if (DoDefrag()) {
+ // we didn't finish the scan
+ return util::ProactorBase::kOnIdleMaxLevel;
+ }
+ }
+ // by default we just want to stay out of the way
+ return 0u;
+}
+
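To make the thresholds above concrete, here is a small self-contained sketch, not part of the patch, of the decision that DefragTaskState::IsRequired() implements; the helper name DefragRequiredSketch and the example numbers are illustrative only, using the suggested non-default flag values 0.05 and 1.3.

#include <cstdint>
#include <cstdio>

// Mirrors the condition in DefragTaskState::IsRequired(): defrag is worth trying only
// when committed memory is above the usage threshold and there is a large enough gap
// between committed and actually used memory.
static bool DefragRequiredSketch(uint64_t max_memory, uint64_t committed, uint64_t used,
                                 float defrag_threshold, float commit_use_threshold) {
  const double threshold_mem = max_memory * defrag_threshold;
  return threshold_mem < committed && used != 0 &&
         uint64_t(used * commit_use_threshold) < committed;
}

int main() {
  // 1 GiB limit, 800 MiB committed, 500 MiB used, with flag values 0.05 / 1.3:
  // threshold_mem = ~53 MiB < 800 MiB, and 500 MiB * 1.3 = 650 MiB < 800 MiB,
  // so the idle task would attempt defragmentation.
  bool run = DefragRequiredSketch(1ull << 30, 800ull << 20, 500ull << 20, 0.05f, 1.3f);
  std::printf("defrag required: %s\n", run ? "yes" : "no");
  return 0;
}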
 EngineShard::EngineShard(util::ProactorBase* pb, bool update_db_time, mi_heap_t* heap)
 : queue_(kQueueLen), txq_([](const Transaction* t) { return t->txid(); }), mi_resource_(heap),
 db_slice_(pb->GetIndex(), GetFlag(FLAGS_cache_mode), this) {
@@ -75,6 +149,9 @@ EngineShard::EngineShard(util::ProactorBase* pb, bool update_db_time, mi_heap_t*
 tmp_str1 = sdsempty();

 db_slice_.UpdateExpireBase(absl::GetCurrentTimeNanos() / 1000000, 0);
+ // start the defragmentation task here
+ defrag_state_.Init();
+ defrag_task_ = pb->AddOnIdleTask([this]() { return this->DefragTask(); });
 }

 EngineShard::~EngineShard() {
@@ -92,6 +169,10 @@ void EngineShard::Shutdown() {
 if (periodic_task_) {
 ProactorBase::me()->CancelPeriodic(periodic_task_);
 }
+
+ if (defrag_task_ != 0) {
+ ProactorBase::me()->RemoveOnIdleTask(defrag_task_);
+ }
 }

 void EngineShard::InitThreadLocal(ProactorBase* pb, bool update_db_time) {
diff --git a/src/server/engine_shard_set.h b/src/server/engine_shard_set.h
index 6436cdc6e9a9..77a3fb9215c3 100644
--- a/src/server/engine_shard_set.h
+++ b/src/server/engine_shard_set.h
@@ -41,6 +41,11 @@ class EngineShard {
 Stats& operator+=(const Stats&);
 };

+ struct DefragStats {
+ uint64_t success_count = 0; // how many objects were moved
+ uint64_t tries = 0; // how many times we tried
+ };
+
 // EngineShard() is private down below.
 ~EngineShard();

@@ -141,9 +146,25 @@ class EngineShard {
 journal_ = j;
 }

+ const DefragStats& GetDefragStats() const {
+ return defrag_state_.stats;
+ }
+
 void TEST_EnableHeartbeat();

 private:
+ struct DefragTaskState {
+ // we will add more data members later
+ uint64_t cursor = 0u;
+ DefragStats stats;
+
+ void Init();
+
+ // check the current threshold and return true if
+ // we need to do the de-fragmentation
+ bool IsRequired();
+ };
+
 EngineShard(util::ProactorBase* pb, bool update_db_time, mi_heap_t* heap);

 // blocks the calling fiber.
@@ -153,6 +174,23 @@ class EngineShard {

 void CacheStats();

+ // We are running a task that checks whether we need to
+ // do memory de-fragmentation here; this task only runs
+ // when there is available CPU time.
+ // --------------------------------------------------------------------------
+ // NOTE: This task is running with exclusive access to the shard.
+ // i.e. - since we are using shared-nothing access here, and all accesses
+ // are done using fibers, this fiber runs only when no other fiber in the
+ // context of the controlling thread will access this shard!
+ // --------------------------------------------------------------------------
+ uint32_t DefragTask();
+
+ // Scan the shard with the cursor and apply de-fragmentation to entries.
+ // This function will return the new cursor at the end of the scan.
+ // It is called from the context of DefragTask.
+ // Returns true if we did not complete the shard scan.
+ bool DoDefrag();
+
 ::util::fibers_ext::FiberQueue queue_;
 ::boost::fibers::fiber fiber_q_;

@@ -170,6 +208,8 @@ class EngineShard {
 IntentLock shard_lock_;

 uint32_t periodic_task_ = 0;
+ uint32_t defrag_task_ = 0;
+ DefragTaskState defrag_state_;
 std::unique_ptr<TieredStorage> tiered_storage_;
 std::unique_ptr<BlockingController> blocking_controller_;
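As a usage note (illustrative only, not part of the patch): the new per-shard counters are meant to be read from each shard's own thread, the same way DefragDflyEngineTest does above. A rough sketch of such a read follows; the function name LogDefragStatsSketch and the include paths are assumptions, while the calls themselves reuse only names that appear in this patch.

#include "base/logging.h"             // for VLOG (assumed include path)
#include "server/engine_shard_set.h"  // declares EngineShard and shard_set (assumed include path)

namespace dfly {

// Sketch only: inspect the defrag counters from within each shard's thread,
// mirroring the AwaitFiberOnAll pattern used in DefragDflyEngineTest.
void LogDefragStatsSketch() {
  shard_set->pool()->AwaitFiberOnAll([](unsigned /*index*/, util::ProactorBase* /*pb*/) {
    EngineShard* shard = EngineShard::tlocal();
    if (shard != nullptr) {
      const EngineShard::DefragStats& ds = shard->GetDefragStats();
      VLOG(1) << "defrag tries: " << ds.tries << ", objects moved: " << ds.success_count;
    }
  });
}

}  // namespace dfly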