Skip to content

Commit

Permalink
feat(server): high level active defrag dragonflydb#448
Browse files Browse the repository at this point in the history
Signed-off-by: Boaz Sade <boaz@dragonflydb.io>
  • Loading branch information
boazsade committed Nov 28, 2022
1 parent 2493434 commit 3eb6035
Show file tree
Hide file tree
Showing 12 changed files with 280 additions and 31 deletions.
2 changes: 1 addition & 1 deletion helio
9 changes: 5 additions & 4 deletions src/core/compact_object.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ using absl::GetFlag;
namespace {

constexpr XXH64_hash_t kHashSeed = 24061983;
constexpr size_t kAlignSize = 8u;

// Approximation since does not account for listpacks.
size_t QlMAllocSize(quicklist* ql) {
Expand Down Expand Up @@ -217,7 +218,7 @@ struct TL {

thread_local TL tl;

constexpr bool kUseSmallStrings = true;
constexpr bool kUseSmallStrings = false;

/// TODO: Ascii encoding becomes slow for large blobs. We should factor it out into a separate
/// file and implement with SIMD instructions.
Expand Down Expand Up @@ -398,13 +399,13 @@ void RobjWrapper::MakeInnerRoom(size_t current_cap, size_t desired, pmr::memory_
desired += SDS_MAX_PREALLOC;
}

void* newp = mr->allocate(desired, 8);
void* newp = mr->allocate(desired, kAlignSize);
if (sz_) {
memcpy(newp, inner_obj_, sz_);
}

if (current_cap) {
mr->deallocate(inner_obj_, current_cap, 8);
mr->deallocate(inner_obj_, current_cap, kAlignSize);
}
inner_obj_ = newp;
}
Expand Down Expand Up @@ -659,7 +660,7 @@ robj* CompactObj::AsRObj() const {
res->type = u_.r_obj.type();

if (res->type == OBJ_SET) {
LOG(FATAL) << "Should not call AsRObj for type " << res->type;
LOG(FATAL) << "Should not call AsRObj for type " << res->type;
}

if (res->type == OBJ_HASH) {
Expand Down
2 changes: 1 addition & 1 deletion src/core/compact_object.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ typedef struct redisObject robj;
namespace dfly {

constexpr unsigned kEncodingIntSet = 0;
constexpr unsigned kEncodingStrMap = 1; // for set/map encodings of strings
constexpr unsigned kEncodingStrMap = 1; // for set/map encodings of strings
constexpr unsigned kEncodingStrMap2 = 2; // for set/map encodings of strings using DenseSet
constexpr unsigned kEncodingListPack = 3;

Expand Down
99 changes: 95 additions & 4 deletions src/core/compact_object_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
//
#include "core/compact_object.h"

#include <absl/strings/str_cat.h>
#include <mimalloc.h>
#include <xxhash.h>
#include <absl/strings/str_cat.h>

#include "base/gtest.h"
#include "base/logging.h"
Expand Down Expand Up @@ -34,6 +34,36 @@ void PrintTo(const CompactObj& cobj, std::ostream* os) {
*os << "cobj: [" << cobj.ObjType() << "]";
}

// This is for the mimalloc test - being able to find an address in memory
// where we have memory underutilzation
// see issue number 448 (https://github.com/dragonflydb/dragonfly/issues/448)
std::vector<void*> AllocateForTest(int size, std::size_t allocate_size, int factor1 = 1,
int factor2 = 1) {
std::vector<void*> ptrs;
for (int index = 0; index < size; index++) {
auto alloc_size = index % 13 == 0 ? allocate_size * factor1 : allocate_size * factor2;
auto heap_alloc = mi_heap_get_backing();
void* ptr = mi_heap_malloc(heap_alloc, alloc_size);
ptrs.push_back(ptr);
}
return ptrs;
}

bool FindUnderutilizedMemory(const std::vector<void*>& ptrs, float ratio) {
auto it = std::find_if(ptrs.begin(), ptrs.end(), [&](auto p) {
int r = p && zmalloc_page_is_underutilized(p, ratio);
return r > 0;
});
return it != ptrs.end();
}

void DecimateMemory(std::vector<void*>* ptrs, size_t steps) {
for (size_t i = 24; i < ptrs->size(); i += steps) {
mi_free(ptrs->at(i));
ptrs->at(i) = nullptr;
}
}

class CompactObjectTest : public ::testing::Test {
protected:
static void SetUpTestSuite() {
Expand Down Expand Up @@ -110,7 +140,6 @@ TEST_F(CompactObjectTest, InlineAsciiEncoded) {
EXPECT_EQ(s.size(), obj.Size());
}


TEST_F(CompactObjectTest, Int) {
cobj_.SetString("0");
EXPECT_EQ(0, cobj_.TryGetInt());
Expand Down Expand Up @@ -211,13 +240,13 @@ TEST_F(CompactObjectTest, FlatSet) {
size_t allocated2, resident2, active2;

zmalloc_get_allocator_info(&allocated1, &active1, &resident1);
dict *d = dictCreate(&setDictType);
dict* d = dictCreate(&setDictType);
constexpr size_t kTestSize = 2000;

for (size_t i = 0; i < kTestSize; ++i) {
sds key = sdsnew("key:000000000000");
key = sdscatfmt(key, "%U", i);
dictEntry *de = dictAddRaw(d, key,NULL);
dictEntry* de = dictAddRaw(d, key, NULL);
de->v.val = NULL;
}

Expand Down Expand Up @@ -259,4 +288,66 @@ TEST_F(CompactObjectTest, StreamObj) {
EXPECT_FALSE(cobj_.IsInline());
}

TEST_F(CompactObjectTest, MimallocUnderutilzation) {
// We are testing with the same object size allocation here
// This test is for https://github.com/dragonflydb/dragonfly/issues/448
size_t allocation_size = 94;
int count = 2000;
std::vector<void*> ptrs = AllocateForTest(count, allocation_size);
bool found = FindUnderutilizedMemory(ptrs, 1.0f);
ASSERT_FALSE(found);
DecimateMemory(&ptrs, 26);
found = FindUnderutilizedMemory(ptrs, 1.0f);
ASSERT_TRUE(found);
for (auto* ptr : ptrs) {
mi_free(ptr);
}
}

TEST_F(CompactObjectTest, MimallocUnderutilzationDifferentSizes) {
// This test uses different objects sizes to cover more use cases
// related to issue https://github.com/dragonflydb/dragonfly/issues/448
size_t allocation_size = 97;
int count = 2000;
int mem_factor_1 = 3;
int mem_factor_2 = 2;
std::vector<void*> ptrs = AllocateForTest(count, allocation_size, mem_factor_1, mem_factor_2);
bool found = FindUnderutilizedMemory(ptrs, 1.0f);
ASSERT_FALSE(found);
DecimateMemory(&ptrs, 26);
found = FindUnderutilizedMemory(ptrs, 1.0f);
ASSERT_TRUE(found);
for (auto* ptr : ptrs) {
mi_free(ptr);
}
}

TEST_F(CompactObjectTest, MimallocUnderutilzationWithRealloc) {
// This test is checking underutilzation with reallocation as well as deallocation
// of the memory - see issue https://github.com/dragonflydb/dragonfly/issues/448
size_t allocation_size = 102;
int count = 2000;
int mem_factor_1 = 4;
int mem_factor_2 = 1;
std::vector<void*> ptrs = AllocateForTest(count, allocation_size, mem_factor_1, mem_factor_2);
bool found = FindUnderutilizedMemory(ptrs, 1.0f);
ASSERT_FALSE(found);
DecimateMemory(&ptrs, 26);
// TestMiMallocUnderutilized(ptrs, run_reallocation, allocation_size);
// This is another case, where we are filling the "gaps" by doing re-allocations
// in this case, since we are not setting all the values back it should still have
// places that are not used. Plus since we are not looking at the first page
// other pages should be underutilized.
for (size_t i = 24; i < ptrs.size(); i += 26) {
if (!ptrs[i]) {
ptrs[i] = mi_heap_malloc(mi_heap_get_backing(), allocation_size);
}
}
found = FindUnderutilizedMemory(ptrs, 1.0f);
ASSERT_TRUE(found);
for (auto* ptr : ptrs) {
mi_free(ptr);
}
}

} // namespace dfly
38 changes: 20 additions & 18 deletions src/redis/zmalloc.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@
#define __zm_str(s) #s

#if defined(USE_JEMALLOC)
#define ZMALLOC_LIB ("jemalloc-" __xstr(JEMALLOC_VERSION_MAJOR) "." __xstr(JEMALLOC_VERSION_MINOR) "." __xstr(JEMALLOC_VERSION_BUGFIX))
#define ZMALLOC_LIB \
("jemalloc-" __xstr(JEMALLOC_VERSION_MAJOR) "." __xstr(JEMALLOC_VERSION_MINOR) "." __xstr( \
JEMALLOC_VERSION_BUGFIX))
#include <jemalloc/jemalloc.h>
#if (JEMALLOC_VERSION_MAJOR == 2 && JEMALLOC_VERSION_MINOR >= 1) || (JEMALLOC_VERSION_MAJOR > 2)
#define HAVE_MALLOC_SIZE 1
Expand Down Expand Up @@ -82,35 +84,35 @@
#define HAVE_DEFRAG
#endif

void *zmalloc(size_t size);
void *zcalloc(size_t size);
void *zrealloc(void *ptr, size_t size);
void *ztrymalloc(size_t size);
void *ztrycalloc(size_t size);
void *ztryrealloc(void *ptr, size_t size);
void zfree(void *ptr);
void* zmalloc(size_t size);
void* zcalloc(size_t size);
void* zrealloc(void* ptr, size_t size);
void* ztrymalloc(size_t size);
void* ztrycalloc(size_t size);
void* ztryrealloc(void* ptr, size_t size);
void zfree(void* ptr);

size_t znallocx(size_t size); // Equivalent to nallocx for jemalloc or mi_good_size for mimalloc.
size_t znallocx(size_t size); // Equivalent to nallocx for jemalloc or mi_good_size for mimalloc.
void zfree_size(void* ptr, size_t size); // equivalent to sdallocx or mi_free_size

void *zmalloc_usable(size_t size, size_t *usable);
void *zcalloc_usable(size_t size, size_t *usable);
void *zrealloc_usable(void *ptr, size_t size, size_t *usable);
void *ztrymalloc_usable(size_t size, size_t *usable);
void *ztrycalloc_usable(size_t size, size_t *usable);
void *ztryrealloc_usable(void *ptr, size_t size, size_t *usable);
void* zmalloc_usable(size_t size, size_t* usable);
void* zcalloc_usable(size_t size, size_t* usable);
void* zrealloc_usable(void* ptr, size_t size, size_t* usable);
void* ztrymalloc_usable(size_t size, size_t* usable);
void* ztrycalloc_usable(size_t size, size_t* usable);
void* ztryrealloc_usable(void* ptr, size_t size, size_t* usable);

// size_t zmalloc_used_memory(void);
void zmalloc_set_oom_handler(void (*oom_handler)(size_t));
size_t zmalloc_get_rss(void);
int zmalloc_get_allocator_info(size_t *allocated, size_t *active, size_t *resident);
int zmalloc_get_allocator_info(size_t* allocated, size_t* active, size_t* resident);
void set_jemalloc_bg_thread(int enable);
int jemalloc_purge();
size_t zmalloc_get_private_dirty(long pid);
size_t zmalloc_get_smap_bytes_by_field(char *field, long pid);
size_t zmalloc_get_smap_bytes_by_field(char* field, long pid);
size_t zmalloc_get_memory_size(void);
size_t zmalloc_usable_size(const void* p);

int zmalloc_page_is_underutilized(void* ptr, float ratio);
// roman: void zlibc_free(void *ptr);

void init_zmalloc_threadlocal(void* heap);
Expand Down
4 changes: 4 additions & 0 deletions src/redis/zmalloc_mi.c
Original file line number Diff line number Diff line change
Expand Up @@ -137,3 +137,7 @@ void init_zmalloc_threadlocal(void* heap) {
return;
zmalloc_heap = heap;
}

int zmalloc_page_is_underutilized(void* ptr, float ratio) {
return mi_heap_page_is_underutilized(zmalloc_heap, ptr, ratio);
}
2 changes: 1 addition & 1 deletion src/server/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ if (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND CMAKE_BUILD_TYPE STREQUAL "Rele
set_source_files_properties(dfly_main.cc PROPERTIES COMPILE_FLAGS -march=core2 COMPILE_DEFINITIONS SOURCE_PATH_FROM_BUILD_ENV=${CMAKE_SOURCE_DIR})
endif()

add_library(dfly_transaction db_slice.cc engine_shard_set.cc blocking_controller.cc common.cc
add_library(dfly_transaction db_slice.cc malloc_stats.cc engine_shard_set.cc blocking_controller.cc common.cc
io_mgr.cc journal/journal.cc journal/journal_slice.cc table.cc
tiered_storage.cc transaction.cc)
cxx_link(dfly_transaction uring_fiber_lib dfly_core strings_lib)
Expand Down
28 changes: 28 additions & 0 deletions src/server/dragonfly_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,13 @@ class DflyEngineTest : public BaseFamilyTest {
}
};

class DefragDflyEngineTest : public DflyEngineTest {
protected:
DefragDflyEngineTest() : DflyEngineTest() {
num_threads_ = 1;
}
};

// TODO: to implement equivalent parsing in redis parser.
TEST_F(DflyEngineTest, Sds) {
int argc;
Expand Down Expand Up @@ -749,6 +756,27 @@ TEST_F(DflyEngineTest, Bug496) {
});
}

TEST_F(DefragDflyEngineTest, TestDefragOption) {
// Fill data into dragonfly and then check if we have
// any location in memory to defrag. See issue #448 for details about this.
EXPECT_GE(max_memory_limit, 100'000);
const int NUMBER_OF_KEYS = 184000;

RespExpr resp = Run({"DEBUG", "POPULATE", std::to_string(NUMBER_OF_KEYS), "key-name", "130"});
ASSERT_EQ(resp, "OK");
resp = Run({"DBSIZE"});
EXPECT_THAT(resp, IntArg(NUMBER_OF_KEYS));

shard_set->pool()->AwaitFiberOnAll([&](unsigned index, ProactorBase* base) {
EngineShard* shard = EngineShard::tlocal();
ASSERT_FALSE(shard == nullptr); // we only have one and its should not be empty!
EXPECT_EQ(shard->GetDefragStats().success_count, 0);
// should not run at all!!
EXPECT_EQ(shard->GetDefragStats().tries, 0);
EXPECT_GT(GetMallocCurrentCommitted(), NUMBER_OF_KEYS);
});
}

// TODO: to test transactions with a single shard since then all transactions become local.
// To consider having a parameter in dragonfly engine controlling number of shards
// unconditionally from number of cpus. TO TEST BLPOP under multi for single/multi argument case.
Expand Down
Loading

0 comments on commit 3eb6035

Please sign in to comment.