From 3eb6035485b490b2c84ca17caccb3132635c6288 Mon Sep 17 00:00:00 2001 From: Boaz Sade Date: Tue, 22 Nov 2022 08:21:00 +0200 Subject: [PATCH] feat(server): high level active defrag #448 Signed-off-by: Boaz Sade --- helio | 2 +- src/core/compact_object.cc | 9 +-- src/core/compact_object.h | 2 +- src/core/compact_object_test.cc | 99 +++++++++++++++++++++++++++++++-- src/redis/zmalloc.h | 38 +++++++------ src/redis/zmalloc_mi.c | 4 ++ src/server/CMakeLists.txt | 2 +- src/server/dragonfly_test.cc | 28 ++++++++++ src/server/engine_shard_set.cc | 83 +++++++++++++++++++++++++++ src/server/engine_shard_set.h | 40 +++++++++++++ src/server/test_utils.cc | 2 +- src/server/test_utils.h | 2 +- 12 files changed, 280 insertions(+), 31 deletions(-) diff --git a/helio b/helio index 3ad1137f0541..3dbdb2ab7bbd 160000 --- a/helio +++ b/helio @@ -1 +1 @@ -Subproject commit 3ad1137f0541381c73a44a3430ea38c7af7796bd +Subproject commit 3dbdb2ab7bbd59932ed2ee89a0c01d43b3eb4672 diff --git a/src/core/compact_object.cc b/src/core/compact_object.cc index f48095c4cfa7..94c5254ef0fb 100644 --- a/src/core/compact_object.cc +++ b/src/core/compact_object.cc @@ -39,6 +39,7 @@ using absl::GetFlag; namespace { constexpr XXH64_hash_t kHashSeed = 24061983; +constexpr size_t kAlignSize = 8u; // Approximation since does not account for listpacks. size_t QlMAllocSize(quicklist* ql) { @@ -217,7 +218,7 @@ struct TL { thread_local TL tl; -constexpr bool kUseSmallStrings = true; +constexpr bool kUseSmallStrings = false; /// TODO: Ascii encoding becomes slow for large blobs. We should factor it out into a separate /// file and implement with SIMD instructions. @@ -398,13 +399,13 @@ void RobjWrapper::MakeInnerRoom(size_t current_cap, size_t desired, pmr::memory_ desired += SDS_MAX_PREALLOC; } - void* newp = mr->allocate(desired, 8); + void* newp = mr->allocate(desired, kAlignSize); if (sz_) { memcpy(newp, inner_obj_, sz_); } if (current_cap) { - mr->deallocate(inner_obj_, current_cap, 8); + mr->deallocate(inner_obj_, current_cap, kAlignSize); } inner_obj_ = newp; } @@ -659,7 +660,7 @@ robj* CompactObj::AsRObj() const { res->type = u_.r_obj.type(); if (res->type == OBJ_SET) { - LOG(FATAL) << "Should not call AsRObj for type " << res->type; + LOG(FATAL) << "Should not call AsRObj for type " << res->type; } if (res->type == OBJ_HASH) { diff --git a/src/core/compact_object.h b/src/core/compact_object.h index 6f56a16432ca..c9f599f6b682 100644 --- a/src/core/compact_object.h +++ b/src/core/compact_object.h @@ -16,7 +16,7 @@ typedef struct redisObject robj; namespace dfly { constexpr unsigned kEncodingIntSet = 0; -constexpr unsigned kEncodingStrMap = 1; // for set/map encodings of strings +constexpr unsigned kEncodingStrMap = 1; // for set/map encodings of strings constexpr unsigned kEncodingStrMap2 = 2; // for set/map encodings of strings using DenseSet constexpr unsigned kEncodingListPack = 3; diff --git a/src/core/compact_object_test.cc b/src/core/compact_object_test.cc index 500b4459d9f1..2365ff39b356 100644 --- a/src/core/compact_object_test.cc +++ b/src/core/compact_object_test.cc @@ -3,9 +3,9 @@ // #include "core/compact_object.h" +#include #include #include -#include #include "base/gtest.h" #include "base/logging.h" @@ -34,6 +34,36 @@ void PrintTo(const CompactObj& cobj, std::ostream* os) { *os << "cobj: [" << cobj.ObjType() << "]"; } +// This is for the mimalloc test - being able to find an address in memory +// where we have memory underutilzation +// see issue number 448 (https://github.com/dragonflydb/dragonfly/issues/448) +std::vector AllocateForTest(int size, std::size_t allocate_size, int factor1 = 1, + int factor2 = 1) { + std::vector ptrs; + for (int index = 0; index < size; index++) { + auto alloc_size = index % 13 == 0 ? allocate_size * factor1 : allocate_size * factor2; + auto heap_alloc = mi_heap_get_backing(); + void* ptr = mi_heap_malloc(heap_alloc, alloc_size); + ptrs.push_back(ptr); + } + return ptrs; +} + +bool FindUnderutilizedMemory(const std::vector& ptrs, float ratio) { + auto it = std::find_if(ptrs.begin(), ptrs.end(), [&](auto p) { + int r = p && zmalloc_page_is_underutilized(p, ratio); + return r > 0; + }); + return it != ptrs.end(); +} + +void DecimateMemory(std::vector* ptrs, size_t steps) { + for (size_t i = 24; i < ptrs->size(); i += steps) { + mi_free(ptrs->at(i)); + ptrs->at(i) = nullptr; + } +} + class CompactObjectTest : public ::testing::Test { protected: static void SetUpTestSuite() { @@ -110,7 +140,6 @@ TEST_F(CompactObjectTest, InlineAsciiEncoded) { EXPECT_EQ(s.size(), obj.Size()); } - TEST_F(CompactObjectTest, Int) { cobj_.SetString("0"); EXPECT_EQ(0, cobj_.TryGetInt()); @@ -211,13 +240,13 @@ TEST_F(CompactObjectTest, FlatSet) { size_t allocated2, resident2, active2; zmalloc_get_allocator_info(&allocated1, &active1, &resident1); - dict *d = dictCreate(&setDictType); + dict* d = dictCreate(&setDictType); constexpr size_t kTestSize = 2000; for (size_t i = 0; i < kTestSize; ++i) { sds key = sdsnew("key:000000000000"); key = sdscatfmt(key, "%U", i); - dictEntry *de = dictAddRaw(d, key,NULL); + dictEntry* de = dictAddRaw(d, key, NULL); de->v.val = NULL; } @@ -259,4 +288,66 @@ TEST_F(CompactObjectTest, StreamObj) { EXPECT_FALSE(cobj_.IsInline()); } +TEST_F(CompactObjectTest, MimallocUnderutilzation) { + // We are testing with the same object size allocation here + // This test is for https://github.com/dragonflydb/dragonfly/issues/448 + size_t allocation_size = 94; + int count = 2000; + std::vector ptrs = AllocateForTest(count, allocation_size); + bool found = FindUnderutilizedMemory(ptrs, 1.0f); + ASSERT_FALSE(found); + DecimateMemory(&ptrs, 26); + found = FindUnderutilizedMemory(ptrs, 1.0f); + ASSERT_TRUE(found); + for (auto* ptr : ptrs) { + mi_free(ptr); + } +} + +TEST_F(CompactObjectTest, MimallocUnderutilzationDifferentSizes) { + // This test uses different objects sizes to cover more use cases + // related to issue https://github.com/dragonflydb/dragonfly/issues/448 + size_t allocation_size = 97; + int count = 2000; + int mem_factor_1 = 3; + int mem_factor_2 = 2; + std::vector ptrs = AllocateForTest(count, allocation_size, mem_factor_1, mem_factor_2); + bool found = FindUnderutilizedMemory(ptrs, 1.0f); + ASSERT_FALSE(found); + DecimateMemory(&ptrs, 26); + found = FindUnderutilizedMemory(ptrs, 1.0f); + ASSERT_TRUE(found); + for (auto* ptr : ptrs) { + mi_free(ptr); + } +} + +TEST_F(CompactObjectTest, MimallocUnderutilzationWithRealloc) { + // This test is checking underutilzation with reallocation as well as deallocation + // of the memory - see issue https://github.com/dragonflydb/dragonfly/issues/448 + size_t allocation_size = 102; + int count = 2000; + int mem_factor_1 = 4; + int mem_factor_2 = 1; + std::vector ptrs = AllocateForTest(count, allocation_size, mem_factor_1, mem_factor_2); + bool found = FindUnderutilizedMemory(ptrs, 1.0f); + ASSERT_FALSE(found); + DecimateMemory(&ptrs, 26); + // TestMiMallocUnderutilized(ptrs, run_reallocation, allocation_size); + // This is another case, where we are filling the "gaps" by doing re-allocations + // in this case, since we are not setting all the values back it should still have + // places that are not used. Plus since we are not looking at the first page + // other pages should be underutilized. + for (size_t i = 24; i < ptrs.size(); i += 26) { + if (!ptrs[i]) { + ptrs[i] = mi_heap_malloc(mi_heap_get_backing(), allocation_size); + } + } + found = FindUnderutilizedMemory(ptrs, 1.0f); + ASSERT_TRUE(found); + for (auto* ptr : ptrs) { + mi_free(ptr); + } +} + } // namespace dfly diff --git a/src/redis/zmalloc.h b/src/redis/zmalloc.h index 2eb9e46501c8..b95bec354468 100644 --- a/src/redis/zmalloc.h +++ b/src/redis/zmalloc.h @@ -38,7 +38,9 @@ #define __zm_str(s) #s #if defined(USE_JEMALLOC) -#define ZMALLOC_LIB ("jemalloc-" __xstr(JEMALLOC_VERSION_MAJOR) "." __xstr(JEMALLOC_VERSION_MINOR) "." __xstr(JEMALLOC_VERSION_BUGFIX)) +#define ZMALLOC_LIB \ + ("jemalloc-" __xstr(JEMALLOC_VERSION_MAJOR) "." __xstr(JEMALLOC_VERSION_MINOR) "." __xstr( \ + JEMALLOC_VERSION_BUGFIX)) #include #if (JEMALLOC_VERSION_MAJOR == 2 && JEMALLOC_VERSION_MINOR >= 1) || (JEMALLOC_VERSION_MAJOR > 2) #define HAVE_MALLOC_SIZE 1 @@ -82,35 +84,35 @@ #define HAVE_DEFRAG #endif -void *zmalloc(size_t size); -void *zcalloc(size_t size); -void *zrealloc(void *ptr, size_t size); -void *ztrymalloc(size_t size); -void *ztrycalloc(size_t size); -void *ztryrealloc(void *ptr, size_t size); -void zfree(void *ptr); +void* zmalloc(size_t size); +void* zcalloc(size_t size); +void* zrealloc(void* ptr, size_t size); +void* ztrymalloc(size_t size); +void* ztrycalloc(size_t size); +void* ztryrealloc(void* ptr, size_t size); +void zfree(void* ptr); -size_t znallocx(size_t size); // Equivalent to nallocx for jemalloc or mi_good_size for mimalloc. +size_t znallocx(size_t size); // Equivalent to nallocx for jemalloc or mi_good_size for mimalloc. void zfree_size(void* ptr, size_t size); // equivalent to sdallocx or mi_free_size -void *zmalloc_usable(size_t size, size_t *usable); -void *zcalloc_usable(size_t size, size_t *usable); -void *zrealloc_usable(void *ptr, size_t size, size_t *usable); -void *ztrymalloc_usable(size_t size, size_t *usable); -void *ztrycalloc_usable(size_t size, size_t *usable); -void *ztryrealloc_usable(void *ptr, size_t size, size_t *usable); +void* zmalloc_usable(size_t size, size_t* usable); +void* zcalloc_usable(size_t size, size_t* usable); +void* zrealloc_usable(void* ptr, size_t size, size_t* usable); +void* ztrymalloc_usable(size_t size, size_t* usable); +void* ztrycalloc_usable(size_t size, size_t* usable); +void* ztryrealloc_usable(void* ptr, size_t size, size_t* usable); // size_t zmalloc_used_memory(void); void zmalloc_set_oom_handler(void (*oom_handler)(size_t)); size_t zmalloc_get_rss(void); -int zmalloc_get_allocator_info(size_t *allocated, size_t *active, size_t *resident); +int zmalloc_get_allocator_info(size_t* allocated, size_t* active, size_t* resident); void set_jemalloc_bg_thread(int enable); int jemalloc_purge(); size_t zmalloc_get_private_dirty(long pid); -size_t zmalloc_get_smap_bytes_by_field(char *field, long pid); +size_t zmalloc_get_smap_bytes_by_field(char* field, long pid); size_t zmalloc_get_memory_size(void); size_t zmalloc_usable_size(const void* p); - +int zmalloc_page_is_underutilized(void* ptr, float ratio); // roman: void zlibc_free(void *ptr); void init_zmalloc_threadlocal(void* heap); diff --git a/src/redis/zmalloc_mi.c b/src/redis/zmalloc_mi.c index 42899a216ca3..8f9e5140c39e 100644 --- a/src/redis/zmalloc_mi.c +++ b/src/redis/zmalloc_mi.c @@ -137,3 +137,7 @@ void init_zmalloc_threadlocal(void* heap) { return; zmalloc_heap = heap; } + +int zmalloc_page_is_underutilized(void* ptr, float ratio) { + return mi_heap_page_is_underutilized(zmalloc_heap, ptr, ratio); +} diff --git a/src/server/CMakeLists.txt b/src/server/CMakeLists.txt index bbeedda33b44..f46f086d6eaa 100644 --- a/src/server/CMakeLists.txt +++ b/src/server/CMakeLists.txt @@ -7,7 +7,7 @@ if (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND CMAKE_BUILD_TYPE STREQUAL "Rele set_source_files_properties(dfly_main.cc PROPERTIES COMPILE_FLAGS -march=core2 COMPILE_DEFINITIONS SOURCE_PATH_FROM_BUILD_ENV=${CMAKE_SOURCE_DIR}) endif() -add_library(dfly_transaction db_slice.cc engine_shard_set.cc blocking_controller.cc common.cc +add_library(dfly_transaction db_slice.cc malloc_stats.cc engine_shard_set.cc blocking_controller.cc common.cc io_mgr.cc journal/journal.cc journal/journal_slice.cc table.cc tiered_storage.cc transaction.cc) cxx_link(dfly_transaction uring_fiber_lib dfly_core strings_lib) diff --git a/src/server/dragonfly_test.cc b/src/server/dragonfly_test.cc index b5083ef6b3f5..20936aa61a5b 100644 --- a/src/server/dragonfly_test.cc +++ b/src/server/dragonfly_test.cc @@ -50,6 +50,13 @@ class DflyEngineTest : public BaseFamilyTest { } }; +class DefragDflyEngineTest : public DflyEngineTest { + protected: + DefragDflyEngineTest() : DflyEngineTest() { + num_threads_ = 1; + } +}; + // TODO: to implement equivalent parsing in redis parser. TEST_F(DflyEngineTest, Sds) { int argc; @@ -749,6 +756,27 @@ TEST_F(DflyEngineTest, Bug496) { }); } +TEST_F(DefragDflyEngineTest, TestDefragOption) { + // Fill data into dragonfly and then check if we have + // any location in memory to defrag. See issue #448 for details about this. + EXPECT_GE(max_memory_limit, 100'000); + const int NUMBER_OF_KEYS = 184000; + + RespExpr resp = Run({"DEBUG", "POPULATE", std::to_string(NUMBER_OF_KEYS), "key-name", "130"}); + ASSERT_EQ(resp, "OK"); + resp = Run({"DBSIZE"}); + EXPECT_THAT(resp, IntArg(NUMBER_OF_KEYS)); + + shard_set->pool()->AwaitFiberOnAll([&](unsigned index, ProactorBase* base) { + EngineShard* shard = EngineShard::tlocal(); + ASSERT_FALSE(shard == nullptr); // we only have one and its should not be empty! + EXPECT_EQ(shard->GetDefragStats().success_count, 0); + // should not run at all!! + EXPECT_EQ(shard->GetDefragStats().tries, 0); + EXPECT_GT(GetMallocCurrentCommitted(), NUMBER_OF_KEYS); + }); +} + // TODO: to test transactions with a single shard since then all transactions become local. // To consider having a parameter in dragonfly engine controlling number of shards // unconditionally from number of cpus. TO TEST BLPOP under multi for single/multi argument case. diff --git a/src/server/engine_shard_set.cc b/src/server/engine_shard_set.cc index ba1d1860e3e0..512771c9977f 100644 --- a/src/server/engine_shard_set.cc +++ b/src/server/engine_shard_set.cc @@ -11,10 +11,12 @@ extern "C" { #include "base/flags.h" #include "base/logging.h" +#include "base/proc_util.h" #include "server/blocking_controller.h" #include "server/server_state.h" #include "server/tiered_storage.h" #include "server/transaction.h" +#include "strings/human_readable.h" #include "util/fiber_sched_algo.h" #include "util/varz.h" @@ -30,6 +32,18 @@ ABSL_FLAG(bool, cache_mode, false, "If true, the backend behaves like a cache, " "by evicting entries when getting close to maxmemory limit"); +// memory defragmented related flags +ABSL_FLAG(float, mem_defrag_threshold, + 1, // The default now is to disable the task from running! change this to 0.05!! + "Minimum percentage of used memory relative to total available memory before running " + "defragmentation"); + +ABSL_FLAG(float, commit_use_threshold, 1.3, + "The threshold to apply memory fragmentation between commited and used memory"); + +ABSL_FLAG(float, mem_utilization_threshold, 0.8, + "memory page under utilization threshold. Ratio between used and commited size, above " + "this, memory in this page will defragmented"); namespace dfly { using namespace util; @@ -39,6 +53,8 @@ using absl::GetFlag; namespace { +constexpr DbIndex DEFAULT_DB_INDEX = 0; + vector cached_stats; // initialized in EngineShardSet::Init } // namespace @@ -56,6 +72,66 @@ EngineShard::Stats& EngineShard::Stats::operator+=(const EngineShard::Stats& o) return *this; } +// This function checks 3 things: +// 1. Don't try memory fragmentation if we don't use "enough" memory (control by +// mem_defrag_threshold flag) +// 2. That we had change in memory usage - to prevent endless loop - out of scope for now +// 3. in case the above is OK, make sure that we have a "gap" between usage and commited memory +// (control by commit_use_threshold flag) +bool EngineShard::DefragTaskState::IsRequired() { + const double threshold_mem = max_memory_limit * GetFlag(FLAGS_mem_defrag_threshold); + const double commit_use_threshold = GetFlag(FLAGS_commit_use_threshold); + + if (cursor > 0) { + return true; + } + + uint64_t commited = GetMallocCurrentCommitted(); + uint64_t mem_in_use = used_mem_current.load(memory_order_relaxed); + + // we want to make sure that we are not running this to many times - i.e. + // if there was no change to the memory, don't run this + if (threshold_mem < commited && mem_in_use != 0 && + (uint64_t(mem_in_use * commit_use_threshold) < commited)) { + // we have way more commited then actual usage + return true; + } + + return false; +} + +// for now this does nothing +bool EngineShard::DoDefrag() { + // TODO - Impl!! + return defrag_state_.cursor > 0; +} + +void EngineShard::DefragTaskState::Init() { + cursor = 0u; +} + +// the memory defragmentation task is as follow: +// 1. Check if memory usage is high enough +// 2. Check if diff between commited and used memory is high enough +// 3. Check if we have memory changes (to ensure that we not running endlessly). - TODO +// 4. if all the above pass -> run on the shard and try to defragmented memory by re-allocating +// values +// if the cursor for this is signal that we are not done, schedule the task to run at high +// priority otherwise lower the task priority so that it would not use the CPU when not required +uint32_t EngineShard::DefragTask() { + const auto shard_id = db_slice().shard_id(); + bool required_state = defrag_state_.IsRequired(); + if (required_state) { + VLOG(1) << shard_id << ": need to run defrag memory cursor state: " << defrag_state_.cursor; + if (DoDefrag()) { + // we didn't finish the scan + return util::ProactorBase::kOnIdleMaxLevel; + } + } + // by default we just want to not get in the way.. + return 0u; +} + EngineShard::EngineShard(util::ProactorBase* pb, bool update_db_time, mi_heap_t* heap) : queue_(kQueueLen), txq_([](const Transaction* t) { return t->txid(); }), mi_resource_(heap), db_slice_(pb->GetIndex(), GetFlag(FLAGS_cache_mode), this) { @@ -75,6 +151,9 @@ EngineShard::EngineShard(util::ProactorBase* pb, bool update_db_time, mi_heap_t* tmp_str1 = sdsempty(); db_slice_.UpdateExpireBase(absl::GetCurrentTimeNanos() / 1000000, 0); + // start the defragmented task here + defrag_state_.Init(); + defrag_task_ = pb->AddOnIdleTask([this]() { return this->DefragTask(); }); } EngineShard::~EngineShard() { @@ -92,6 +171,10 @@ void EngineShard::Shutdown() { if (periodic_task_) { ProactorBase::me()->CancelPeriodic(periodic_task_); } + + if (defrag_task_ != 0) { + ProactorBase::me()->RemoveOnIdleTask(defrag_task_); + } } void EngineShard::InitThreadLocal(ProactorBase* pb, bool update_db_time) { diff --git a/src/server/engine_shard_set.h b/src/server/engine_shard_set.h index 6436cdc6e9a9..77a3fb9215c3 100644 --- a/src/server/engine_shard_set.h +++ b/src/server/engine_shard_set.h @@ -41,6 +41,11 @@ class EngineShard { Stats& operator+=(const Stats&); }; + struct DefragStats { + uint64_t success_count = 0; // how many objects were moved + uint64_t tries = 0; // how many times we tried + }; + // EngineShard() is private down below. ~EngineShard(); @@ -141,9 +146,25 @@ class EngineShard { journal_ = j; } + const DefragStats& GetDefragStats() const { + return defrag_state_.stats; + } + void TEST_EnableHeartbeat(); private: + struct DefragTaskState { + // we will add more data members later + uint64_t cursor = 0u; + DefragStats stats; + + void Init(); + + // check the current threshold and return true if + // we need to do the de-fermentation + bool IsRequired(); + }; + EngineShard(util::ProactorBase* pb, bool update_db_time, mi_heap_t* heap); // blocks the calling fiber. @@ -153,6 +174,23 @@ class EngineShard { void CacheStats(); + // We are running a task that checks whether we need to + // do memory de-fragmentation here, this task only run + // when there are available CPU time. + // -------------------------------------------------------------------------- + // NOTE: This task is running with exclusive access to the shard. + // i.e. - Since we are using shared noting access here, and all access + // are done using fibers, This fiber is run only when no other fiber in the + // context of the controlling thread will access this shard! + // -------------------------------------------------------------------------- + uint32_t DefragTask(); + + // scan the shard with the cursor and apply + // de-fragmentation option for entries. This function will return the new cursor at the end of the + // scan This function is called from context of StartDefragTask + // return true if we did not complete the shard scan + bool DoDefrag(); + ::util::fibers_ext::FiberQueue queue_; ::boost::fibers::fiber fiber_q_; @@ -170,6 +208,8 @@ class EngineShard { IntentLock shard_lock_; uint32_t periodic_task_ = 0; + uint32_t defrag_task_ = 0; + DefragTaskState defrag_state_; std::unique_ptr tiered_storage_; std::unique_ptr blocking_controller_; diff --git a/src/server/test_utils.cc b/src/server/test_utils.cc index 02364dc4b97e..cf5953a2ce16 100644 --- a/src/server/test_utils.cc +++ b/src/server/test_utils.cc @@ -267,7 +267,7 @@ auto BaseFamilyTest::GetMC(MP::CmdType cmd_type, std::initializer_listSplitLines(); } -int64_t BaseFamilyTest::CheckedInt(std::initializer_list list) { +int64_t BaseFamilyTest::CheckedInt(std::initializer_list list) { RespExpr resp = Run(list); if (resp.type == RespExpr::INT64) { return get(resp.u); diff --git a/src/server/test_utils.h b/src/server/test_utils.h index d4dc2957284c..72f462d39433 100644 --- a/src/server/test_utils.h +++ b/src/server/test_utils.h @@ -56,7 +56,7 @@ class BaseFamilyTest : public ::testing::Test { MCResponse RunMC(MemcacheParser::CmdType cmd_type, std::string_view key = std::string_view{}); MCResponse GetMC(MemcacheParser::CmdType cmd_type, std::initializer_list list); - int64_t CheckedInt(std::initializer_list list); + int64_t CheckedInt(std::initializer_list list); bool IsLocked(DbIndex db_index, std::string_view key) const; ConnectionContext::DebugInfo GetDebugInfo(const std::string& id) const;