Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(rdb save): add blob compression on snapshot #505

Merged
merged 1 commit into from
Nov 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/build-from-source.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ git clone --recursive https://github.com/dragonflydb/dragonfly && cd dragonfly
```bash
# Install dependencies
sudo apt install ninja-build libunwind-dev libboost-fiber-dev libssl-dev \
autoconf-archive libtool cmake g++
autoconf-archive libtool cmake g++ libzstd-dev
```

## Step 3
Expand Down
57 changes: 27 additions & 30 deletions src/redis/redis_aux.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,9 @@ void InitRedisTables() {
server.hash_max_listpack_entries = 512;
server.hash_max_listpack_value = 32; // decreased from redis default 64.

server.rdb_compression = 1;

server.stream_node_max_bytes = 4096;
server.stream_node_max_entries = 100;
}
}

// These functions are moved here from server.c
int htNeedsResize(dict* dict) {
Expand Down Expand Up @@ -73,7 +71,7 @@ int dictPtrKeyCompare(dict* privdata, const void* key1, const void* key2) {
return key1 == key2;
}

int dictSdsKeyCompare(dict *d, const void* key1, const void* key2) {
int dictSdsKeyCompare(dict* d, const void* key1, const void* key2) {
int l1, l2;
DICT_NOTUSED(d);

Expand All @@ -84,7 +82,7 @@ int dictSdsKeyCompare(dict *d, const void* key1, const void* key2) {
return memcmp(key1, key2, l1) == 0;
}

void dictSdsDestructor(dict *d, void* val) {
void dictSdsDestructor(dict* d, void* val) {
DICT_NOTUSED(d);

sdsfree(val);
Expand All @@ -100,29 +98,28 @@ size_t sdsZmallocSize(sds s) {

/* Toggle the 64 bit unsigned integer pointed by *p from little endian to
* big endian */
void memrev64(void *p) {
unsigned char *x = p, t;

t = x[0];
x[0] = x[7];
x[7] = t;
t = x[1];
x[1] = x[6];
x[6] = t;
t = x[2];
x[2] = x[5];
x[5] = t;
t = x[3];
x[3] = x[4];
x[4] = t;
void memrev64(void* p) {
unsigned char *x = p, t;

t = x[0];
x[0] = x[7];
x[7] = t;
t = x[1];
x[1] = x[6];
x[6] = t;
t = x[2];
x[2] = x[5];
x[5] = t;
t = x[3];
x[3] = x[4];
x[4] = t;
}

uint64_t intrev64(uint64_t v) {
memrev64(&v);
return v;
memrev64(&v);
return v;
}


/* Set dictionary type. Keys are SDS strings, values are not used. */
dictType setDictType = {
dictSdsHash, /* hash function */
Expand All @@ -147,11 +144,11 @@ dictType zsetDictType = {

/* Hash type hash table (note that small hashes are represented with listpacks) */
dictType hashDictType = {
dictSdsHash, /* hash function */
NULL, /* key dup */
NULL, /* val dup */
dictSdsKeyCompare, /* key compare */
dictSdsDestructor, /* key destructor */
dictSdsDestructor, /* val destructor */
NULL /* allow to expand */
dictSdsHash, /* hash function */
NULL, /* key dup */
NULL, /* val dup */
dictSdsKeyCompare, /* key compare */
dictSdsDestructor, /* key destructor */
dictSdsDestructor, /* val destructor */
NULL /* allow to expand */
};
75 changes: 35 additions & 40 deletions src/redis/redis_aux.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,35 +4,34 @@
#include "dict.h"
#include "sds.h"

#define HASHTABLE_MIN_FILL 10 /* Minimal hash table fill 10% */
#define HASHTABLE_MAX_LOAD_FACTOR 1.618 /* Maximum hash table load factor. */
#define HASHTABLE_MIN_FILL 10 /* Minimal hash table fill 10% */
#define HASHTABLE_MAX_LOAD_FACTOR 1.618 /* Maximum hash table load factor. */

/* Redis maxmemory strategies. Instead of using just incremental number
* for this defines, we use a set of flags so that testing for certain
* properties common to multiple policies is faster. */
#define MAXMEMORY_FLAG_LRU (1<<0)
#define MAXMEMORY_FLAG_LFU (1<<1)
#define MAXMEMORY_FLAG_ALLKEYS (1<<2)
#define MAXMEMORY_FLAG_NO_SHARED_INTEGERS (MAXMEMORY_FLAG_LRU|MAXMEMORY_FLAG_LFU)
#define MAXMEMORY_FLAG_LRU (1 << 0)
#define MAXMEMORY_FLAG_LFU (1 << 1)
#define MAXMEMORY_FLAG_ALLKEYS (1 << 2)
#define MAXMEMORY_FLAG_NO_SHARED_INTEGERS (MAXMEMORY_FLAG_LRU | MAXMEMORY_FLAG_LFU)

#define LFU_INIT_VAL 5

#define MAXMEMORY_VOLATILE_LRU ((0<<8)|MAXMEMORY_FLAG_LRU)
#define MAXMEMORY_VOLATILE_LFU ((1<<8)|MAXMEMORY_FLAG_LFU)
#define MAXMEMORY_VOLATILE_TTL (2<<8)
#define MAXMEMORY_VOLATILE_RANDOM (3<<8)
#define MAXMEMORY_ALLKEYS_LRU ((4<<8)|MAXMEMORY_FLAG_LRU|MAXMEMORY_FLAG_ALLKEYS)
#define MAXMEMORY_ALLKEYS_LFU ((5<<8)|MAXMEMORY_FLAG_LFU|MAXMEMORY_FLAG_ALLKEYS)
#define MAXMEMORY_ALLKEYS_RANDOM ((6<<8)|MAXMEMORY_FLAG_ALLKEYS)
#define MAXMEMORY_NO_EVICTION (7<<8)

#define MAXMEMORY_VOLATILE_LRU ((0 << 8) | MAXMEMORY_FLAG_LRU)
#define MAXMEMORY_VOLATILE_LFU ((1 << 8) | MAXMEMORY_FLAG_LFU)
#define MAXMEMORY_VOLATILE_TTL (2 << 8)
#define MAXMEMORY_VOLATILE_RANDOM (3 << 8)
#define MAXMEMORY_ALLKEYS_LRU ((4 << 8) | MAXMEMORY_FLAG_LRU | MAXMEMORY_FLAG_ALLKEYS)
#define MAXMEMORY_ALLKEYS_LFU ((5 << 8) | MAXMEMORY_FLAG_LFU | MAXMEMORY_FLAG_ALLKEYS)
#define MAXMEMORY_ALLKEYS_RANDOM ((6 << 8) | MAXMEMORY_FLAG_ALLKEYS)
#define MAXMEMORY_NO_EVICTION (7 << 8)

#define CONFIG_RUN_ID_SIZE 40U

#define EVPOOL_CACHED_SDS_SIZE 255
#define EVPOOL_SIZE 16

int htNeedsResize(dict *dict); // moved from server.cc
int htNeedsResize(dict* dict); // moved from server.cc

/* Hash table types */
extern dictType zsetDictType;
Expand All @@ -52,39 +51,35 @@ extern dictType hashDictType;
* Empty entries have the key pointer set to NULL. */

struct evictionPoolEntry {
unsigned long long idle; /* Object idle time (inverse frequency for LFU) */
sds key; /* Key name. */
sds cached; /* Cached SDS object for key name. */
int dbid; /* Key DB number. */
unsigned long long idle; /* Object idle time (inverse frequency for LFU) */
sds key; /* Key name. */
sds cached; /* Cached SDS object for key name. */
int dbid; /* Key DB number. */
};

uint64_t dictSdsHash(const void *key);
int dictSdsKeyCompare(dict *privdata, const void *key1, const void *key2);
void dictSdsDestructor(dict *privdata, void *val);
size_t sdsZmallocSize(sds s) ;
uint64_t dictSdsHash(const void* key);
int dictSdsKeyCompare(dict* privdata, const void* key1, const void* key2);
void dictSdsDestructor(dict* privdata, void* val);
size_t sdsZmallocSize(sds s);

typedef struct ServerStub {
int rdb_compression;

int lfu_decay_time; /* LFU counter decay factor. */
/* should not be used. Use FLAGS_list_max_ziplist_size and FLAGS_list_compress_depth instead. */
// int list_compress_depth;
// int list_max_ziplist_size;
int lfu_decay_time; /* LFU counter decay factor. */
/* should not be used. Use FLAGS_list_max_ziplist_size and FLAGS_list_compress_depth instead. */
// int list_compress_depth;
// int list_max_ziplist_size;

// unused - left so that object.c will compile.
int maxmemory_policy; /* Policy for key eviction */
// unused - left so that object.c will compile.
int maxmemory_policy; /* Policy for key eviction */

unsigned long page_size;
size_t hash_max_listpack_entries,
hash_max_listpack_value;
size_t zset_max_listpack_entries;
size_t zset_max_listpack_value;
unsigned long page_size;
size_t hash_max_listpack_entries, hash_max_listpack_value;
size_t zset_max_listpack_entries;
size_t zset_max_listpack_value;

size_t stream_node_max_bytes;
long long stream_node_max_entries;
size_t stream_node_max_bytes;
long long stream_node_max_entries;
} Server;


extern Server server;

void InitRedisTables();
Expand Down
2 changes: 1 addition & 1 deletion src/server/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ add_library(dragonfly_lib channel_slice.cc command_registry.cc
zset_family.cc version.cc bitops_family.cc container_utils.cc)

cxx_link(dragonfly_lib dfly_transaction dfly_facade redis_lib strings_lib html_lib
absl::random_random TRDP::jsoncons)
absl::random_random TRDP::jsoncons zstd)

add_library(dfly_test_lib test_utils.cc)
cxx_link(dfly_test_lib dragonfly_lib epoll_fiber_lib facade_test gtest_main_ext)
Expand Down
4 changes: 3 additions & 1 deletion src/server/generic_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ extern "C" {

ABSL_FLAG(uint32_t, dbnum, 16, "Number of databases");
ABSL_FLAG(uint32_t, keys_output_limit, 8192, "Maximum number of keys output by keys command");
ABSL_DECLARE_FLAG(int, compression_mode);

namespace dfly {
using namespace std;
Expand Down Expand Up @@ -429,7 +430,8 @@ OpResult<std::string> OpDump(const OpArgs& op_args, string_view key) {
if (IsValid(it)) {
DVLOG(1) << "Dump: key '" << key << "' successfully found, going to dump it";
std::unique_ptr<::io::StringSink> sink = std::make_unique<::io::StringSink>();
RdbSerializer serializer(sink.get());
int compression_mode = absl::GetFlag(FLAGS_compression_mode);
RdbSerializer serializer(sink.get(), compression_mode != 0);

// According to Redis code we need to
// 1. Save the value itself - without the key
Expand Down
2 changes: 2 additions & 0 deletions src/server/rdb_extensions.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,5 @@
// to notify that it finished streaming static data and is ready
// to switch to the stable state replication phase.
const uint8_t RDB_OPCODE_FULLSYNC_END = 200;
const uint8_t RDB_OPCODE_COMPRESSED_BLOB_START = 201;
const uint8_t RDB_OPCODE_COMPRESSED_BLOB_END = 202;
Loading