diff --git a/src/server/rdb_save.cc b/src/server/rdb_save.cc index daa4bb16314d..704d6fab68ae 100644 --- a/src/server/rdb_save.cc +++ b/src/server/rdb_save.cc @@ -655,23 +655,8 @@ error_code RdbSerializer::WriteRaw(const io::Bytes& buf) { return error_code{}; } -io::Bytes RdbSerializer::Flush() { - size_t sz = mem_buf_.InputLen(); - if (sz == 0) - return mem_buf_.InputBuffer(); - - if (compression_mode_ == CompressionMode::MULTY_ENTRY_ZSTD || - compression_mode_ == CompressionMode::MULTY_ENTRY_LZ4) { - CompressBlob(); - // After blob was compressed membuf was overwirten with compressed data - sz = mem_buf_.InputLen(); - } - - return mem_buf_.InputBuffer(); -} - error_code RdbSerializer::FlushToSink(io::Sink* s) { - auto bytes = Flush(); + auto bytes = PrepareFlush(); if (bytes.empty()) return error_code{}; @@ -687,8 +672,17 @@ size_t RdbSerializer::SerializedLen() const { return mem_buf_.InputLen(); } -void RdbSerializer::Clear() { - mem_buf_.Clear(); +io::Bytes RdbSerializer::PrepareFlush() { + size_t sz = mem_buf_.InputLen(); + if (sz == 0) + return mem_buf_.InputBuffer(); + + if (compression_mode_ == CompressionMode::MULTY_ENTRY_ZSTD || + compression_mode_ == CompressionMode::MULTY_ENTRY_LZ4) { + CompressBlob(); + } + + return mem_buf_.InputBuffer(); } error_code RdbSerializer::WriteJournalEntries(absl::Span entries) { diff --git a/src/server/rdb_save.h b/src/server/rdb_save.h index 15f6e0a240a2..cbc4f6ca52ba 100644 --- a/src/server/rdb_save.h +++ b/src/server/rdb_save.h @@ -118,18 +118,12 @@ class RdbSerializer { ~RdbSerializer(); - // Get access to internal buffer, compressed, if enabled. - io::Bytes Flush(); - // Internal buffer size. Might shrink after flush due to compression. size_t SerializedLen() const; // Flush internal buffer to sink. std::error_code FlushToSink(io::Sink* s); - // Clear internal buffer contents. - void Clear(); - std::error_code SelectDb(uint32_t dbid); // Must be called in the thread to which `it` belongs. @@ -161,6 +155,9 @@ class RdbSerializer { std::error_code SendFullSyncCut(); private: + // Prepare internal buffer for flush. Compress it. + io::Bytes PrepareFlush(); + std::error_code SaveLzfBlob(const ::io::Bytes& src, size_t uncompressed_len); std::error_code SaveObject(const PrimeValue& pv); std::error_code SaveListObject(const robj* obj); diff --git a/src/server/snapshot.cc b/src/server/snapshot.cc index b7f6f0cea388..fafa5c8b9d0c 100644 --- a/src/server/snapshot.cc +++ b/src/server/snapshot.cc @@ -209,7 +209,7 @@ unsigned SliceSnapshot::SerializeBucket(DbIndex db_index, PrimeTable::bucket_ite } if (tmp_serializer) { - PushBytesToChannel(db_index, tmp_serializer->Flush()); + PushBytesToChannel(db_index, &*tmp_serializer); VLOG(1) << "Pushed " << result << " entries via tmp_serializer"; } @@ -231,27 +231,30 @@ void SliceSnapshot::SerializeEntry(DbIndex db_indx, const PrimeKey& pk, const Pr ++type_freq_map_[*res]; } -void SliceSnapshot::PushBytesToChannel(DbIndex db_index, io::Bytes bytes) { +size_t SliceSnapshot::PushBytesToChannel(DbIndex db_index, RdbSerializer* serializer) { auto id = rec_id_++; DVLOG(2) << "Pushed " << id; - stats_.channel_bytes += bytes.size(); - DbRecord db_rec{.db_index = db_index, .id = id, .value = string(io::View(bytes))}; + io::StringFile sfile; + serializer->FlushToSink(&sfile); + + size_t serialized = sfile.val.size(); + if (serialized == 0) + return 0; + stats_.channel_bytes += serialized; + + DbRecord db_rec{.db_index = db_index, .id = id, .value = std::move(sfile.val)}; dest_->Push(std::move(db_rec)); + return serialized; } bool SliceSnapshot::FlushDefaultBuffer(bool force) { if (!force && default_serializer_->SerializedLen() < 4096) return false; - auto bytes = default_serializer_->Flush(); - if (bytes.empty()) - return false; - - VLOG(2) << "FlushDefaultBuffer " << bytes.size() << " bytes"; - PushBytesToChannel(current_db_, bytes); - default_serializer_->Clear(); + size_t written = PushBytesToChannel(current_db_, default_serializer_.get()); + VLOG(2) << "FlushDefaultBuffer " << written << " bytes"; return true; } @@ -290,7 +293,7 @@ void SliceSnapshot::OnJournalEntry(const journal::Entry& entry) { serializer_ptr->WriteJournalEntries(absl::Span{&entry, 1}); if (tmp_serializer) { - PushBytesToChannel(entry.dbid, tmp_serializer->Flush()); + PushBytesToChannel(entry.dbid, &*tmp_serializer); } else { // This is the only place that flushes in streaming mode // once the iterate buckets fiber finished. diff --git a/src/server/snapshot.h b/src/server/snapshot.h index 2cde5d401a3d..2d81b6f36780 100644 --- a/src/server/snapshot.h +++ b/src/server/snapshot.h @@ -95,8 +95,8 @@ class SliceSnapshot { void SerializeEntry(DbIndex db_index, const PrimeKey& pk, const PrimeValue& pv, std::optional expire, RdbSerializer* serializer); - // Push byte slice to channel. - void PushBytesToChannel(DbIndex db_index, io::Bytes bytes); + // Push rdb serializer's internal buffer to channel. Return now many bytes were written. + size_t PushBytesToChannel(DbIndex db_index, RdbSerializer* serializer); // DbChange listener void OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req);