Commit 3da6bd9

fix(server): Fix bug in flushing in snapshot (#654)
* fix(server): Fix bug in flushing in snapshot

Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
dranikpg authored Jan 8, 2023
1 parent 7ddde7c commit 3da6bd9
Showing 4 changed files with 32 additions and 38 deletions.
30 changes: 12 additions & 18 deletions src/server/rdb_save.cc
@@ -655,23 +655,8 @@ error_code RdbSerializer::WriteRaw(const io::Bytes& buf) {
   return error_code{};
 }
 
-io::Bytes RdbSerializer::Flush() {
-  size_t sz = mem_buf_.InputLen();
-  if (sz == 0)
-    return mem_buf_.InputBuffer();
-
-  if (compression_mode_ == CompressionMode::MULTY_ENTRY_ZSTD ||
-      compression_mode_ == CompressionMode::MULTY_ENTRY_LZ4) {
-    CompressBlob();
-    // After blob was compressed membuf was overwritten with compressed data
-    sz = mem_buf_.InputLen();
-  }
-
-  return mem_buf_.InputBuffer();
-}
-
 error_code RdbSerializer::FlushToSink(io::Sink* s) {
-  auto bytes = Flush();
+  auto bytes = PrepareFlush();
   if (bytes.empty())
     return error_code{};
 
@@ -687,8 +672,17 @@ size_t RdbSerializer::SerializedLen() const {
   return mem_buf_.InputLen();
 }
 
-void RdbSerializer::Clear() {
-  mem_buf_.Clear();
+io::Bytes RdbSerializer::PrepareFlush() {
+  size_t sz = mem_buf_.InputLen();
+  if (sz == 0)
+    return mem_buf_.InputBuffer();
+
+  if (compression_mode_ == CompressionMode::MULTY_ENTRY_ZSTD ||
+      compression_mode_ == CompressionMode::MULTY_ENTRY_LZ4) {
+    CompressBlob();
+  }
+
+  return mem_buf_.InputBuffer();
 }
 
 error_code RdbSerializer::WriteJournalEntries(absl::Span<const journal::Entry> entries) {

9 changes: 3 additions & 6 deletions src/server/rdb_save.h
@@ -118,18 +118,12 @@ class RdbSerializer {
 
   ~RdbSerializer();
 
-  // Get access to internal buffer, compressed, if enabled.
-  io::Bytes Flush();
-
   // Internal buffer size. Might shrink after flush due to compression.
   size_t SerializedLen() const;
 
   // Flush internal buffer to sink.
   std::error_code FlushToSink(io::Sink* s);
 
-  // Clear internal buffer contents.
-  void Clear();
-
   std::error_code SelectDb(uint32_t dbid);
 
   // Must be called in the thread to which `it` belongs.
@@ -161,6 +155,9 @@
   std::error_code SendFullSyncCut();
 
  private:
+  // Prepare internal buffer for flush. Compress it.
+  io::Bytes PrepareFlush();
+
   std::error_code SaveLzfBlob(const ::io::Bytes& src, size_t uncompressed_len);
   std::error_code SaveObject(const PrimeValue& pv);
   std::error_code SaveListObject(const robj* obj);

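Not part of the commit: the sketch below is a minimal, self-contained C++ model of the API shape the two files above converge on, using stand-in types only (Sink, StringSink, Serializer with a plain std::string buffer are invented here; they are not Dragonfly's io::Sink, io::StringFile, or RdbSerializer). It illustrates the refactoring: the public Flush()/Clear() pair disappears, PrepareFlush() becomes a private preparation step, and FlushToSink() is the single entry point that prepares the buffer, writes it, and resets it. The reset happening inside FlushToSink is an inference from the diff (Clear() is removed while callers keep flushing repeatedly), since the middle of that function is elided above.

#include <cstddef>
#include <string>
#include <system_error>

// Stand-in for an io::Sink-like byte consumer.
struct Sink {
  virtual ~Sink() = default;
  virtual std::error_code Write(const std::string& bytes) = 0;
};

// Stand-in for io::StringFile: a sink that appends into a string.
struct StringSink : Sink {
  std::string val;
  std::error_code Write(const std::string& bytes) override {
    val += bytes;
    return {};
  }
};

class Serializer {
 public:
  void WriteRaw(const std::string& blob) { buf_ += blob; }

  std::size_t SerializedLen() const { return buf_.size(); }

  // The only public flush path: prepare (optionally compress), write, reset.
  std::error_code FlushToSink(Sink* s) {
    const std::string& bytes = PrepareFlush();
    if (bytes.empty())
      return {};
    if (std::error_code ec = s->Write(bytes); ec)
      return ec;
    buf_.clear();  // the serializer, not the caller, resets its own buffer
    return {};
  }

 private:
  // Formerly the public Flush(); now an internal preparation step.
  const std::string& PrepareFlush() {
    if (!buf_.empty() && compress_)
      CompressBlob();  // would overwrite buf_ with a compressed blob
    return buf_;
  }

  void CompressBlob() { /* placeholder for the ZSTD/LZ4 framing */ }

  std::string buf_;
  bool compress_ = false;
};

int main() {
  Serializer ser;
  StringSink file;
  ser.WriteRaw("entry-1");
  ser.FlushToSink(&file);  // drains and resets the buffer in one call
  ser.FlushToSink(&file);  // an empty flush is a no-op
  return 0;
}

With a single entry point, a caller can no longer flush without resetting the buffer, or reset a buffer whose bytes were never pushed anywhere.
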
27 changes: 15 additions & 12 deletions src/server/snapshot.cc
@@ -209,7 +209,7 @@ unsigned SliceSnapshot::SerializeBucket(DbIndex db_index, PrimeTable::bucket_ite
   }
 
   if (tmp_serializer) {
-    PushBytesToChannel(db_index, tmp_serializer->Flush());
+    PushBytesToChannel(db_index, &*tmp_serializer);
     VLOG(1) << "Pushed " << result << " entries via tmp_serializer";
   }
 
@@ -231,27 +231,30 @@ void SliceSnapshot::SerializeEntry(DbIndex db_indx, const PrimeKey& pk, const Pr
   ++type_freq_map_[*res];
 }
 
-void SliceSnapshot::PushBytesToChannel(DbIndex db_index, io::Bytes bytes) {
+size_t SliceSnapshot::PushBytesToChannel(DbIndex db_index, RdbSerializer* serializer) {
   auto id = rec_id_++;
   DVLOG(2) << "Pushed " << id;
 
-  stats_.channel_bytes += bytes.size();
-  DbRecord db_rec{.db_index = db_index, .id = id, .value = string(io::View(bytes))};
+  io::StringFile sfile;
+  serializer->FlushToSink(&sfile);
+
+  size_t serialized = sfile.val.size();
+  if (serialized == 0)
+    return 0;
+  stats_.channel_bytes += serialized;
+
+  DbRecord db_rec{.db_index = db_index, .id = id, .value = std::move(sfile.val)};
 
   dest_->Push(std::move(db_rec));
+  return serialized;
 }
 
 bool SliceSnapshot::FlushDefaultBuffer(bool force) {
   if (!force && default_serializer_->SerializedLen() < 4096)
     return false;
 
-  auto bytes = default_serializer_->Flush();
-  if (bytes.empty())
-    return false;
-
-  VLOG(2) << "FlushDefaultBuffer " << bytes.size() << " bytes";
-  PushBytesToChannel(current_db_, bytes);
-  default_serializer_->Clear();
+  size_t written = PushBytesToChannel(current_db_, default_serializer_.get());
+  VLOG(2) << "FlushDefaultBuffer " << written << " bytes";
   return true;
 }
 
@@ -290,7 +293,7 @@ void SliceSnapshot::OnJournalEntry(const journal::Entry& entry) {
   serializer_ptr->WriteJournalEntries(absl::Span{&entry, 1});
 
   if (tmp_serializer) {
-    PushBytesToChannel(entry.dbid, tmp_serializer->Flush());
+    PushBytesToChannel(entry.dbid, &*tmp_serializer);
   } else {
     // This is the only place that flushes in streaming mode
     // once the iterate buckets fiber finished.

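Also not part of the commit: a rough, self-contained model of the new push path in snapshot.cc. Every definition here is a stand-in rather than Dragonfly's real type (the DbRecord struct, the channel, the serializer, and the io::StringFile role are all reduced to a few lines, and MaybeFlush mimics FlushDefaultBuffer). It shows the flow the diff above introduces: drain the serializer into an in-memory string sink, skip the push entirely when nothing was serialized, and report how many bytes went out.

#include <cstddef>
#include <cstdint>
#include <cstdio>
#include <deque>
#include <string>
#include <utility>

// Stand-in record and channel, mimicking the shapes used by SliceSnapshot.
struct DbRecord {
  uint32_t db_index;
  uint64_t id;
  std::string value;
};

struct Channel {
  std::deque<DbRecord> records;
  void Push(DbRecord rec) { records.push_back(std::move(rec)); }
};

// Reduced serializer: FlushTo plays the role of FlushToSink(&string_file),
// handing over the buffered bytes and resetting itself.
struct MiniSerializer {
  std::string buf;
  void FlushTo(std::string* out) {
    out->append(buf);
    buf.clear();
  }
};

// Mirrors the shape of the new PushBytesToChannel: returns how many bytes were
// pushed, 0 meaning the serializer had nothing buffered and no record was sent.
std::size_t PushBytesToChannel(Channel* dest, uint32_t db_index, uint64_t* rec_id,
                               MiniSerializer* serializer) {
  std::string sink;  // plays the role of io::StringFile
  serializer->FlushTo(&sink);

  std::size_t serialized = sink.size();
  if (serialized == 0)
    return 0;

  dest->Push(DbRecord{db_index, (*rec_id)++, std::move(sink)});
  return serialized;
}

// Mirrors FlushDefaultBuffer: flush only when forced or once enough bytes have
// accumulated; the empty-buffer handling now lives inside the push path above.
bool MaybeFlush(Channel* dest, uint32_t db, uint64_t* rec_id, MiniSerializer* ser, bool force) {
  if (!force && ser->buf.size() < 4096)
    return false;

  std::size_t written = PushBytesToChannel(dest, db, rec_id, ser);
  std::printf("MaybeFlush pushed %zu bytes\n", written);  // stand-in for VLOG(2)
  return true;
}

int main() {
  Channel ch;
  MiniSerializer ser;
  uint64_t rec_id = 0;

  ser.buf.assign(5000, 'x');                            // pretend 5000 serialized bytes
  MaybeFlush(&ch, 0, &rec_id, &ser, /*force=*/false);   // over threshold: pushes one record
  MaybeFlush(&ch, 0, &rec_id, &ser, /*force=*/true);    // buffer now empty: pushes nothing
  return 0;
}

The empty-buffer check, the byte accounting, and the buffer reset now all sit behind the push path, so the caller reduces to a size threshold plus one call.
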
4 changes: 2 additions & 2 deletions src/server/snapshot.h
@@ -95,8 +95,8 @@ class SliceSnapshot {
   void SerializeEntry(DbIndex db_index, const PrimeKey& pk, const PrimeValue& pv,
                       std::optional<uint64_t> expire, RdbSerializer* serializer);
 
-  // Push byte slice to channel.
-  void PushBytesToChannel(DbIndex db_index, io::Bytes bytes);
+  // Push rdb serializer's internal buffer to channel. Return how many bytes were written.
+  size_t PushBytesToChannel(DbIndex db_index, RdbSerializer* serializer);
 
   // DbChange listener
   void OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req);
