-
Notifications
You must be signed in to change notification settings - Fork 981
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(rdb save): add blob compression on snapshot #505
Conversation
src/server/rdb_save.cc
Outdated
@@ -773,13 +778,16 @@ class RdbSaver::Impl { | |||
RdbSerializer meta_serializer_; | |||
SliceSnapshot::RecordChannel channel_; | |||
std::optional<AlignedBuffer> aligned_buf_; | |||
bool enhanced_compression_; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please add comment saying what it means.
src/server/snapshot.h
Outdated
@@ -19,6 +22,22 @@ struct Entry; | |||
|
|||
class RdbSerializer; | |||
|
|||
class ZstdCompress { | |||
public: | |||
ZstdCompress() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe add deleted constructor and operator= to prevent copying by chance.
src/server/snapshot.h
Outdated
size_t channel_bytes_ = 0; | ||
size_t serialized_ = 0, skipped_ = 0, side_saved_ = 0, savecb_calls_ = 0; | ||
uint64_t rec_id_ = 0; | ||
uint32_t num_records_in_blob_ = 0; | ||
|
||
bool enhanced_compression_ = false; | ||
ZstdCompress compressor_; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
make it unique_ptr<ZstdCompress>
to avoid allocating the context in case compression is false.
src/server/snapshot.cc
Outdated
size_t compressed_size = | ||
ZSTD_compressCCtx(cctx_, compr_buf_.data(), compr_buf_.capacity(), str.data(), str.size(), 1); | ||
if (compressed_size > str.size() * 0.85) { | ||
++comperssion_no_effective_; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
comperssion -> compression
src/server/snapshot.cc
Outdated
void SliceSnapshot::PushFileToChannel(io::StringFile* sfile, DbIndex db_index, unsigned num_records, | ||
bool should_compress) { | ||
string string_to_push = std::move(sfile->val); | ||
if (should_compress && string_to_push.size() > 4096) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please introduce a named variable and maybe reduce its value to 256.
also please check if compression makes sense at all.
src/server/snapshot.h
Outdated
@@ -19,6 +22,22 @@ struct Entry; | |||
|
|||
class RdbSerializer; | |||
|
|||
class ZstdCompress { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: i would move this class definition into cc and just forward declare it here.
why - to avoid bringing "zstd.h" includes to all the cc files that include this header since it's really an implementation detail. Also see my comment below about unique_ptr
src/server/snapshot.cc
Outdated
compr_buf_.reserve(buf_size); | ||
} | ||
size_t compressed_size = | ||
ZSTD_compressCCtx(cctx_, compr_buf_.data(), compr_buf_.capacity(), str.data(), str.size(), 1); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please introduce a flag snapshot_compression_level
- it will be convenient for playing with compression.
5b0acf1
to
c76a42b
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great work. Are you going to send test code in a separate pr?
src/server/rdb_load.cc
Outdated
// Push uncompressed blob to its membuf | ||
uncompressed_mem_buf_.Reserve(uncompressed_blob.size()); | ||
IoBuf::Bytes dest = uncompressed_mem_buf_.AppendBuffer(); | ||
CHECK_GE(dest.size(), uncompressed_blob.size()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please avoid to check failing as a result of bad data. please use error_code for this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe my comment is wrong here. You can not check-fail as a result of bad data, you verify the invariant, right?
src/server/rdb_load.cc
Outdated
uncompressed_mem_buf_.Reserve(uncompressed_blob.size()); | ||
IoBuf::Bytes dest = uncompressed_mem_buf_.AppendBuffer(); | ||
CHECK_GE(dest.size(), uncompressed_blob.size()); | ||
memcpy(dest.data(), uncompressed_blob.data(), uncompressed_blob.size()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
there is some inefficiency here (double write to memory). maybe add todo: to adjust interfaces so that we avoid redundant memory copy.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see, I am fixing this now
src/server/rdb_save.cc
Outdated
@@ -737,7 +743,7 @@ class RdbSaver::Impl { | |||
public: | |||
// We pass K=sz to say how many producers are pushing data in order to maintain | |||
// correct closing semantics - channel is closing when K producers marked it as closed. | |||
Impl(bool align_writes, unsigned producers_len, io::Sink* sink); | |||
Impl(bool align_writes, unsigned producers_len, bool multi_entires_compression, io::Sink* sink); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
multi_entires_compression -> multi_entries_compression
src/server/snapshot.h
Outdated
size_t channel_bytes_ = 0; | ||
size_t serialized_ = 0, skipped_ = 0, side_saved_ = 0, savecb_calls_ = 0; | ||
uint64_t rec_id_ = 0; | ||
uint32_t num_records_in_blob_ = 0; | ||
|
||
bool multi_entires_compression_ = false; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
multi_entries_compression_
src/server/snapshot.cc
Outdated
} | ||
auto comp_res = zstd_serializer_->Compress(string_to_push); | ||
if (comp_res.first) { | ||
string_to_push = comp_res.second; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
string_to_push.swap(comp_res.second);
src/server/rdb_save.cc
Outdated
// Compress the string with end blob opcode | ||
string to_compress(str.data(), str.size()); | ||
opcode = RDB_OPCODE_COMPRESSED_BLOB_END; | ||
to_compress.append(reinterpret_cast<const char*>(&opcode), 1); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
to_compress.push_back(opcode);
src/server/rdb_save.cc
Outdated
#include "base/logging.h" | ||
#include "server/engine_shard_set.h" | ||
#include "server/error.h" | ||
#include "server/rdb_extensions.h" | ||
#include "server/snapshot.h" | ||
#include "util/fibers/simple_channel.h" | ||
|
||
ABSL_FLAG(bool, save_enhanced_compression, true, "DF save compressed blobs of multiple entries"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
there is a bug when you run dragonfly with save_enhanced_compression=false
it does not compress strings with lzf as it did beforehand.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Owww not only that but I do double compression lzf and zstd when I have the flag=true
77f0817
to
a4b69b4
Compare
src/server/rdb_save.cc
Outdated
producer_count = 0; | ||
if (compression_mode == 2) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lets keep it as RDB (no need for multi-entry here).
src/server/rdb_save.cc
Outdated
} | ||
|
||
// Compress the string | ||
string_view compresse_res = impl_->Compress(str); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
compresse_res -> compressed_res
src/server/rdb_save.cc
Outdated
// Write encoded compressed string len and than the compressed string | ||
serialized_compressed_blob.append(reinterpret_cast<const char*>(buf), enclen); | ||
serialized_compressed_blob.append(compresse_res); | ||
return std::make_pair(true, serialized_compressed_blob); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
std::move(serialized_compressed_blob)
src/server/rdb_save.h
Outdated
~ZstdCompressSerializer(); | ||
|
||
// Returns a pair consisting of an bool denoting whether the string was compressed | ||
// and a string the result of compression. If givven string was not compressed returned |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
givven -> given
a4b69b4
to
b52c0f0
Compare
b52c0f0
to
fe6bba2
Compare
Signed-off-by: adi_holden <adi@dragonflydb.io>
fe6bba2
to
9a7cc25
Compare
Signed-off-by: adi_holden adi@dragonflydb.io