Skip to content

Commit

Permalink
feat(server): active memory defrag helio and UT dragonflydb#448
Browse files Browse the repository at this point in the history
Signed-off-by: Boaz Sade <boaz@dragonflydb.io>
  • Loading branch information
boazsade committed Nov 24, 2022
1 parent 39a231d commit 6ec1ca1
Show file tree
Hide file tree
Showing 6 changed files with 235 additions and 23 deletions.
2 changes: 1 addition & 1 deletion helio
109 changes: 105 additions & 4 deletions src/core/compact_object_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
//
#include "core/compact_object.h"

#include <absl/strings/str_cat.h>
#include <mimalloc.h>
#include <xxhash.h>
#include <absl/strings/str_cat.h>

#include "base/gtest.h"
#include "base/logging.h"
Expand Down Expand Up @@ -34,6 +34,76 @@ void PrintTo(const CompactObj& cobj, std::ostream* os) {
*os << "cobj: [" << cobj.ObjType() << "]";
}

// This is for the mimalloc test - being able to find an address in memory
// where we have memory underutilization
// see issue number 448 (https://github.com/dragonflydb/dragonfly/issues/448)
// Allocates `size` blocks from the mimalloc backing heap and returns their
// addresses; ownership passes to the caller (release with mi_free).
// Every 13th allocation (index % 13 == 0) is scaled by `factor1`, all the
// others by `factor2`, so callers can create mixed-size allocation patterns.
std::vector<void*> AllocateForTest(int size, std::size_t allocate_size, int factor1 = 1,
                                   int factor2 = 1) {
  std::vector<void*> ptrs;
  ptrs.reserve(size);  // single allocation for the bookkeeping vector
  // The backing heap does not change between iterations - fetch it once
  // instead of on every allocation.
  auto heap_alloc = mi_heap_get_backing();
  for (int index = 0; index < size; index++) {
    const std::size_t alloc_size =
        index % 13 == 0 ? allocate_size * factor1 : allocate_size * factor2;
    ptrs.push_back(mi_heap_malloc(heap_alloc, alloc_size));
  }
  return ptrs;
}

// Verifies zmalloc_page_is_underutilized() against the given allocations:
// 1. While all pages are fully used (or have no prior history), no pointer
//    should be reported as sitting in an underutilized page.
// 2. After freeing a sparse subset of the pointers, at least one remaining
//    pointer must be reported as underutilized.
// 3. Optionally (realloc == true), some freed slots are re-filled with new
//    allocations of `allocate_size`; since the gaps are only partially
//    re-filled, underutilized pages must still be found.
// Takes `ptrs` by value on purpose - the function frees every pointer before
// returning, so the caller's copies must not be reused.
void TestMiMallocUnderutilized(std::vector<void*> ptrs, bool realloc = false,
                               std::size_t allocate_size = 0) {
  size_t failed_count = 0;
  // Returns true if any live pointer lies in an underutilized page; counts
  // into failed_count the pointers inspected that were not underutilized.
  auto searchUnderutilized = [&]() {
    auto it = std::find_if(ptrs.begin(), ptrs.end(), [&](auto p) {
      int r = p && zmalloc_page_is_underutilized(p, 1);
      if (r) {
        return true;
      } else {
        failed_count++;
        return false;
      }
    });
    return it != ptrs.end();
  };

  // At this step all pages are full, so nothing should be reported.
  bool found = searchUnderutilized();
  EXPECT_FALSE(found);
  EXPECT_EQ(failed_count, ptrs.size());

  // Decimate the allocations so that some pages end up with use < capacity.
  for (size_t i = 24; i < ptrs.size(); i += 13) {
    mi_free(ptrs[i]);
    ptrs[i] = nullptr;
  }

  // Now there should be a location in memory that is not fully used, so we
  // expect the underutilization check to succeed for at least one pointer.
  failed_count = 0;
  found = searchUnderutilized();
  EXPECT_TRUE(found);
  EXPECT_NE(failed_count, ptrs.size());

  if (realloc) {
    // This is another case, where we fill some of the "gaps" by doing
    // re-allocations. Since we are not refilling all the freed slots, there
    // should still be places that are not used, and pages other than the
    // first should remain underutilized.
    for (size_t i = 24; i < ptrs.size(); i += 26) {
      if (!ptrs[i]) {
        ptrs[i] = mi_heap_malloc(mi_heap_get_backing(), allocate_size);
      }
    }
    // Reset the counter so the assertion below reflects this search only,
    // not a total accumulated across searches (previously the count carried
    // over, making the EXPECT_NE comparison meaningless).
    failed_count = 0;
    found = searchUnderutilized();
    EXPECT_TRUE(found);
    EXPECT_NE(failed_count, ptrs.size());
  }

  // cleanup - mi_free(nullptr) is a no-op, so freed slots are safe.
  for (auto* ptr : ptrs) {
    mi_free(ptr);
  }
}

class CompactObjectTest : public ::testing::Test {
protected:
static void SetUpTestSuite() {
Expand Down Expand Up @@ -110,7 +180,6 @@ TEST_F(CompactObjectTest, InlineAsciiEncoded) {
EXPECT_EQ(s.size(), obj.Size());
}


TEST_F(CompactObjectTest, Int) {
cobj_.SetString("0");
EXPECT_EQ(0, cobj_.TryGetInt());
Expand Down Expand Up @@ -211,13 +280,13 @@ TEST_F(CompactObjectTest, FlatSet) {
size_t allocated2, resident2, active2;

zmalloc_get_allocator_info(&allocated1, &active1, &resident1);
dict *d = dictCreate(&setDictType);
dict* d = dictCreate(&setDictType);
constexpr size_t kTestSize = 2000;

for (size_t i = 0; i < kTestSize; ++i) {
sds key = sdsnew("key:000000000000");
key = sdscatfmt(key, "%U", i);
dictEntry *de = dictAddRaw(d, key,NULL);
dictEntry* de = dictAddRaw(d, key, NULL);
de->v.val = NULL;
}

Expand Down Expand Up @@ -259,4 +328,36 @@ TEST_F(CompactObjectTest, StreamObj) {
EXPECT_FALSE(cobj_.IsInline());
}

TEST_F(CompactObjectTest, MimallocUnderutilzation) {
  // Every allocation in this test uses the same object size.
  // Regression coverage for https://github.com/dragonflydb/dragonfly/issues/448
  constexpr size_t kAllocationSize = 94;
  constexpr int kCount = 2000;
  TestMiMallocUnderutilized(AllocateForTest(kCount, kAllocationSize));
}

TEST_F(CompactObjectTest, MimallocUnderutilzationDifferentSizes) {
  // Mix two allocation sizes (x3 and x2 of the base size) to cover more use
  // cases related to https://github.com/dragonflydb/dragonfly/issues/448
  constexpr size_t kAllocationSize = 97;
  constexpr int kCount = 2000;
  constexpr int kMemFactor1 = 3;
  constexpr int kMemFactor2 = 2;
  TestMiMallocUnderutilized(AllocateForTest(kCount, kAllocationSize, kMemFactor1, kMemFactor2));
}

TEST_F(CompactObjectTest, MimallocUnderutilzationWithRealloc) {
  // Exercises underutilization detection with reallocation as well as
  // deallocation of memory - see https://github.com/dragonflydb/dragonfly/issues/448
  constexpr size_t kAllocationSize = 102;
  constexpr int kCount = 2000;
  constexpr int kMemFactor1 = 4;
  constexpr int kMemFactor2 = 1;
  constexpr bool kRunReallocation = true;
  TestMiMallocUnderutilized(AllocateForTest(kCount, kAllocationSize, kMemFactor1, kMemFactor2),
                            kRunReallocation, kAllocationSize);
}

} // namespace dfly
38 changes: 20 additions & 18 deletions src/redis/zmalloc.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@
#define __zm_str(s) #s

#if defined(USE_JEMALLOC)
#define ZMALLOC_LIB ("jemalloc-" __xstr(JEMALLOC_VERSION_MAJOR) "." __xstr(JEMALLOC_VERSION_MINOR) "." __xstr(JEMALLOC_VERSION_BUGFIX))
#define ZMALLOC_LIB \
("jemalloc-" __xstr(JEMALLOC_VERSION_MAJOR) "." __xstr(JEMALLOC_VERSION_MINOR) "." __xstr( \
JEMALLOC_VERSION_BUGFIX))
#include <jemalloc/jemalloc.h>
#if (JEMALLOC_VERSION_MAJOR == 2 && JEMALLOC_VERSION_MINOR >= 1) || (JEMALLOC_VERSION_MAJOR > 2)
#define HAVE_MALLOC_SIZE 1
Expand Down Expand Up @@ -82,35 +84,35 @@
#define HAVE_DEFRAG
#endif

void *zmalloc(size_t size);
void *zcalloc(size_t size);
void *zrealloc(void *ptr, size_t size);
void *ztrymalloc(size_t size);
void *ztrycalloc(size_t size);
void *ztryrealloc(void *ptr, size_t size);
void zfree(void *ptr);
void* zmalloc(size_t size);
void* zcalloc(size_t size);
void* zrealloc(void* ptr, size_t size);
void* ztrymalloc(size_t size);
void* ztrycalloc(size_t size);
void* ztryrealloc(void* ptr, size_t size);
void zfree(void* ptr);

size_t znallocx(size_t size); // Equivalent to nallocx for jemalloc or mi_good_size for mimalloc.
size_t znallocx(size_t size); // Equivalent to nallocx for jemalloc or mi_good_size for mimalloc.
void zfree_size(void* ptr, size_t size); // equivalent to sdallocx or mi_free_size

void *zmalloc_usable(size_t size, size_t *usable);
void *zcalloc_usable(size_t size, size_t *usable);
void *zrealloc_usable(void *ptr, size_t size, size_t *usable);
void *ztrymalloc_usable(size_t size, size_t *usable);
void *ztrycalloc_usable(size_t size, size_t *usable);
void *ztryrealloc_usable(void *ptr, size_t size, size_t *usable);
void* zmalloc_usable(size_t size, size_t* usable);
void* zcalloc_usable(size_t size, size_t* usable);
void* zrealloc_usable(void* ptr, size_t size, size_t* usable);
void* ztrymalloc_usable(size_t size, size_t* usable);
void* ztrycalloc_usable(size_t size, size_t* usable);
void* ztryrealloc_usable(void* ptr, size_t size, size_t* usable);

// size_t zmalloc_used_memory(void);
void zmalloc_set_oom_handler(void (*oom_handler)(size_t));
size_t zmalloc_get_rss(void);
int zmalloc_get_allocator_info(size_t *allocated, size_t *active, size_t *resident);
int zmalloc_get_allocator_info(size_t* allocated, size_t* active, size_t* resident);
void set_jemalloc_bg_thread(int enable);
int jemalloc_purge();
size_t zmalloc_get_private_dirty(long pid);
size_t zmalloc_get_smap_bytes_by_field(char *field, long pid);
size_t zmalloc_get_smap_bytes_by_field(char* field, long pid);
size_t zmalloc_get_memory_size(void);
size_t zmalloc_usable_size(const void* p);

int zmalloc_page_is_underutilized(void* ptr, float ratio);
// roman: void zlibc_free(void *ptr);

void init_zmalloc_threadlocal(void* heap);
Expand Down
4 changes: 4 additions & 0 deletions src/redis/zmalloc_mi.c
Original file line number Diff line number Diff line change
Expand Up @@ -137,3 +137,7 @@ void init_zmalloc_threadlocal(void* heap) {
return;
zmalloc_heap = heap;
}

/* Returns non-zero if the page containing `ptr` is utilized below `ratio`
 * (fraction of capacity in use), i.e. `ptr` is a candidate for moving during
 * memory defragmentation. Delegates to mi_heap_page_is_underutilized on the
 * thread-local backing heap.
 * NOTE(review): mi_heap_page_is_underutilized is not part of upstream
 * mimalloc - it appears to come from the helio submodule's fork; confirm. */
int zmalloc_page_is_underutilized(void* ptr, float ratio) {
  return mi_heap_page_is_underutilized(zmalloc_heap, ptr, ratio);
}
81 changes: 81 additions & 0 deletions src/server/engine_shard_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@ extern "C" {

#include "base/flags.h"
#include "base/logging.h"
#include "base/proc_util.h"
#include "server/blocking_controller.h"
#include "server/server_state.h"
#include "server/tiered_storage.h"
#include "server/transaction.h"
#include "strings/human_readable.h"
#include "util/fiber_sched_algo.h"
#include "util/varz.h"

Expand All @@ -30,6 +32,17 @@ ABSL_FLAG(bool, cache_mode, false,
"If true, the backend behaves like a cache, "
"by evicting entries when getting close to maxmemory limit");

// Memory-defragmentation related flags.
ABSL_FLAG(float, mem_defrag_threshold, 20.0,
          "Threshold level to run memory defragmentation between total available memory and "
          "currently committed memory");

ABSL_FLAG(float, commit_use_threshold, 1.3,
          "The threshold to apply memory defragmentation between committed and used memory");

ABSL_FLAG(float, mem_utilization_threshold, 0.5,
          "Threshold for memory page underutilization. Below this threshold a pointer in this "
          "page will be moved in memory");
namespace dfly {

using namespace util;
Expand All @@ -39,6 +52,8 @@ using absl::GetFlag;

namespace {

constexpr DbIndex DEFAULT_DB_INDEX = 0;

vector<EngineShardSet::CachedStats> cached_stats; // initialized in EngineShardSet::Init

} // namespace
Expand All @@ -56,6 +71,67 @@ EngineShard::Stats& EngineShard::Stats::operator+=(const EngineShard::Stats& o)
return *this;
}

// This function checks 3 things:
// 1. Don't try memory defragmentation if we don't use "enough" memory
//    (controlled by the mem_defrag_threshold flag)
// 2. That there was a change in memory usage - to prevent an endless loop - out of scope for now
// 3. In case the above is OK, make sure that we have a "gap" between used and committed memory
//    (controlled by the commit_use_threshold flag)
bool EngineShard::DefragTaskArgs::IsRequired() {
  // NOTE(review): these statics are evaluated once, on the first call - later
  // changes to max_memory_limit or to the flags will not be picked up. Confirm
  // that is intended.
  static const uint64_t mem_size = max_memory_limit;
  // Memory level above which defrag is considered: mem_size / mem_defrag_threshold.
  static const float threshold_mem = mem_size / GetFlag(FLAGS_mem_defrag_threshold);
  static const float commit_use_threshold = GetFlag(FLAGS_commit_use_threshold);

  // A non-zero cursor means a scan is already in progress - keep going until done.
  if (cursor > 0) {
    return true;
  }

  uint64_t commited = GetMallocCurrentCommitted();
  uint64_t mem_in_use = used_mem_current.load(memory_order_relaxed);

  // We want to make sure that we are not running this too many times - i.e.
  // if there was no change to the memory, don't run this.
  if (threshold_mem < commited && mem_in_use != 0 &&
      (float(mem_in_use * commit_use_threshold) < commited)) {
    // We have way more committed memory than actual usage.
    return true;
  }

  return false;
}

// Placeholder - the actual defragmentation pass is not implemented yet.
// Reports whether a shard scan is still in progress (cursor moved past 0).
bool EngineShard::DoDefrag() {
  // TODO: impl!
  return defrag_args_.cursor != 0;
}

// Resets the defrag scan state so the next task run starts from scratch.
void EngineShard::DefragTaskArgs::Init() {
  cursor = 0u;  // 0 == no scan in progress
}

// The memory defragmentation task works as follows:
// 1. Check if memory usage is high enough.
// 2. Check if the gap between committed and used memory is high enough.
// 3. Check whether memory actually changed (to ensure we are not running
//    endlessly) - TODO.
// 4. If all of the above pass, run on the shard and try to defragment memory
//    by re-allocating values.
// When the cursor signals that the scan is not done, keep the task at high
// priority; otherwise return 0 so the task stays out of the CPU's way.
uint32_t EngineShard::DefragTask() {
  const auto shard_id = db_slice().shard_id();
  if (!defrag_args_.IsRequired()) {
    // By default we just want to not get in the way.
    return 0u;
  }
  VLOG(1) << shard_id << ": need to run defrag memory cursor state: " << defrag_args_.cursor;
  // DoDefrag() returns true while the shard scan has not finished yet.
  return DoDefrag() ? util::ProactorBase::kOnIdleMaxLevel : 0u;
}

EngineShard::EngineShard(util::ProactorBase* pb, bool update_db_time, mi_heap_t* heap)
: queue_(kQueueLen), txq_([](const Transaction* t) { return t->txid(); }), mi_resource_(heap),
db_slice_(pb->GetIndex(), GetFlag(FLAGS_cache_mode), this) {
Expand All @@ -75,6 +151,9 @@ EngineShard::EngineShard(util::ProactorBase* pb, bool update_db_time, mi_heap_t*
tmp_str1 = sdsempty();

db_slice_.UpdateExpireBase(absl::GetCurrentTimeNanos() / 1000000, 0);
// start the defragmented task here
defrag_args_.Init();
defrag_task_ = pb->AddOnIdleTask([this]() { return this->DefragTask(); });
}

EngineShard::~EngineShard() {
Expand All @@ -92,6 +171,8 @@ void EngineShard::Shutdown() {
if (periodic_task_) {
ProactorBase::me()->CancelPeriodic(periodic_task_);
}

ProactorBase::me()->RemoveOnIdleTask(defrag_task_);
}

void EngineShard::InitThreadLocal(ProactorBase* pb, bool update_db_time) {
Expand Down
24 changes: 24 additions & 0 deletions src/server/engine_shard_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,17 @@ class EngineShard {
void TEST_EnableHeartbeat();

private:
  // State carried between consecutive runs of the idle-time memory
  // defragmentation task.
  struct DefragTaskArgs {
    // we will add more data members later
    // Position of the current shard scan; 0 means no scan is in progress.
    uint64_t cursor = 0u;

    // Resets the scan state (cursor back to 0).
    void Init();

    // Check the current thresholds and return true if we need to run the
    // defragmentation task (or a scan is already in progress).
    bool IsRequired();
  };

EngineShard(util::ProactorBase* pb, bool update_db_time, mi_heap_t* heap);

// blocks the calling fiber.
Expand All @@ -153,6 +164,17 @@ class EngineShard {

void CacheStats();

// We are running a task that checks whether we need to
// do memory de-fragmentation here, this task only run
// when there are available CPU time.
uint32_t DefragTask();

// scan the shard with the cursor and apply
// de-fragmentation option for entries. This function will return the new cursor at the end of the
// scan This function is called from context of StartDefragTask
// return true if we did not complete the shard scan
bool DoDefrag();

::util::fibers_ext::FiberQueue queue_;
::boost::fibers::fiber fiber_q_;

Expand All @@ -170,6 +192,8 @@ class EngineShard {
IntentLock shard_lock_;

uint32_t periodic_task_ = 0;
uint32_t defrag_task_ = 0;
DefragTaskArgs defrag_args_;
std::unique_ptr<TieredStorage> tiered_storage_;
std::unique_ptr<BlockingController> blocking_controller_;

Expand Down

0 comments on commit 6ec1ca1

Please sign in to comment.