refactor(replication): Reduce memory copying during incremental synchronization

Optimized how replica nodes handle PSYNC incremental data, removing an unnecessary data copy and simplifying the corresponding loop logic.
RiversJin committed Dec 11, 2024
1 parent 698c3d4 commit 514b57d
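
At its core, the change replaces a copy-then-rebuild pattern with a single move: the bulk payload pulled out of the network buffer becomes a rocksdb::WriteBatch once, and that same batch is shared by the apply and parse steps. A minimal sketch of the pattern, assuming RocksDB's string-move WriteBatch constructor; the names are illustrative, not the exact kvrocks code:

#include <cstddef>
#include <string>

#include "rocksdb/write_batch.h"

// Before this commit, the pulled-up bytes were copied into a std::string and then
// copied again into a temporary std::string for ReplicaApplyWriteBatch, plus rebuilt
// into another WriteBatch inside parseWriteBatch. After the commit, one WriteBatch
// owns the bytes and is reused for both steps.
void apply_incremental_batch_sketch(const char *bulk_data, size_t len) {
  std::string bulk_string(bulk_data, len);            // the only copy out of the buffer
  rocksdb::WriteBatch batch(std::move(bulk_string));  // takes ownership of the bytes
  // storage->ReplicaApplyWriteBatch(&batch);         // write the batch to the local DB
  // parseWriteBatch(batch);                          // iterate the same batch afterwards
}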
Showing 4 changed files with 56 additions and 42 deletions.
73 changes: 39 additions & 34 deletions src/cluster/replication.cc
@@ -30,13 +30,16 @@
#include <atomic>
#include <csignal>
#include <future>
#include <memory>
#include <string>
#include <string_view>
#include <thread>

#include "commands/error_constants.h"
#include "event_util.h"
#include "fmt/format.h"
#include "io_util.h"
#include "rocksdb/write_batch.h"
#include "rocksdb_crc32c.h"
#include "scope_exit.h"
#include "server/redis_reply.h"
@@ -123,8 +126,8 @@ void FeedSlaveThread::loop() {
// iter_ would be always valid here
auto batch = iter_->GetBatch();
if (batch.sequence != curr_seq) {
LOG(ERROR) << "Fatal error encountered, WAL iterator is discrete, some seq might be lost"
<< ", sequence " << curr_seq << " expected, but got " << batch.sequence;
LOG(ERROR) << "Fatal error encountered, WAL iterator is discrete, some seq might be lost" << ", sequence "
<< curr_seq << " expected, but got " << batch.sequence;
Stop();
return;
}
@@ -470,8 +473,7 @@ ReplicationThread::CBState ReplicationThread::replConfReadCB(bufferevent *bev) {
// on unknown option: first try without announce ip, if it fails again - do nothing (to prevent infinite loop)
if (isUnknownOption(line.View()) && !next_try_without_announce_ip_address_) {
next_try_without_announce_ip_address_ = true;
LOG(WARNING) << "The old version master, can't handle ip-address, "
<< "try without it again";
LOG(WARNING) << "The old version master, can't handle ip-address, " << "try without it again";
// Retry previous state, i.e. send replconf again
return CBState::PREV;
}
@@ -537,8 +539,7 @@ ReplicationThread::CBState ReplicationThread::tryPSyncReadCB(bufferevent *bev) {

if (line[0] == '-' && isWrongPsyncNum(line.View())) {
next_try_old_psync_ = true;
LOG(WARNING) << "The old version master, can't handle new PSYNC, "
<< "try old PSYNC again";
LOG(WARNING) << "The old version master, can't handle new PSYNC, " << "try old PSYNC again";
// Retry previous state, i.e. send PSYNC again
return CBState::PREV;
}
@@ -557,7 +558,6 @@ ReplicationThread::CBState ReplicationThread::tryPSyncReadCB(bufferevent *bev) {
}

ReplicationThread::CBState ReplicationThread::incrementBatchLoopCB(bufferevent *bev) {
char *bulk_data = nullptr;
repl_state_.store(kReplConnected, std::memory_order_relaxed);
auto input = bufferevent_get_input(bev);
while (true) {
@@ -576,31 +576,38 @@ ReplicationThread::CBState ReplicationThread::incrementBatchLoopCB(bufferevent *
}
case Incr_batch_data:
// Read bulk data (batch data)
if (incr_bulk_len_ + 2 <= evbuffer_get_length(input)) { // We got enough data
bulk_data = reinterpret_cast<char *>(evbuffer_pullup(input, static_cast<ssize_t>(incr_bulk_len_ + 2)));
std::string bulk_string = std::string(bulk_data, incr_bulk_len_);
if (incr_bulk_len_ + 2 > evbuffer_get_length(input)) { // If data not enough
return CBState::AGAIN;
}

const char *bulk_data =
reinterpret_cast<const char *>(evbuffer_pullup(input, static_cast<ssize_t>(incr_bulk_len_ + 2)));
std::string bulk_string = std::string(bulk_data, incr_bulk_len_);
evbuffer_drain(input, incr_bulk_len_ + 2);
incr_state_ = Incr_batch_size;

if (bulk_string == "ping") {
// master would send the ping heartbeat packet to check whether the slave was alive or not,
// don't write ping to db here.
if (bulk_string != "ping") {
auto s = storage_->ReplicaApplyWriteBatch(std::string(bulk_data, incr_bulk_len_));
if (!s.IsOK()) {
LOG(ERROR) << "[replication] CRITICAL - Failed to write batch to local, " << s.Msg() << ". batch: 0x"
<< util::StringToHex(bulk_string);
return CBState::RESTART;
}

s = parseWriteBatch(bulk_string);
if (!s.IsOK()) {
LOG(ERROR) << "[replication] CRITICAL - failed to parse write batch 0x" << util::StringToHex(bulk_string)
<< ": " << s.Msg();
return CBState::RESTART;
}
}
evbuffer_drain(input, incr_bulk_len_ + 2);
incr_state_ = Incr_batch_size;
} else {
return CBState::AGAIN;
}

rocksdb::WriteBatch batch(std::move(bulk_string));

auto s = storage_->ReplicaApplyWriteBatch(&batch);
if (!s.IsOK()) {
LOG(ERROR) << "[replication] CRITICAL - Failed to write batch to local, " << s.Msg() << ". batch: 0x"
<< util::StringToHex(batch.Data());
return CBState::RESTART;
}

s = parseWriteBatch(batch);
if (!s.IsOK()) {
LOG(ERROR) << "[replication] CRITICAL - failed to parse write batch 0x" << util::StringToHex(batch.Data())
<< ": " << s.Msg();
return CBState::RESTART;
}

break;
}
}
@@ -815,10 +822,9 @@ Status ReplicationThread::parallelFetchFile(const std::string &dir,
fetch_cnt.fetch_add(1);
uint32_t cur_skip_cnt = skip_cnt.load();
uint32_t cur_fetch_cnt = fetch_cnt.load();
LOG(INFO) << "[fetch] "
<< "Fetched " << fetch_file << ", crc32: " << fetch_crc << ", skip count: " << cur_skip_cnt
<< ", fetch count: " << cur_fetch_cnt << ", progress: " << cur_skip_cnt + cur_fetch_cnt << "/"
<< files_count;
LOG(INFO) << "[fetch] " << "Fetched " << fetch_file << ", crc32: " << fetch_crc
<< ", skip count: " << cur_skip_cnt << ", fetch count: " << cur_fetch_cnt
<< ", progress: " << cur_skip_cnt + cur_fetch_cnt << "/" << files_count;
};
// For master using old version, it only supports to fetch a single file by one
// command, so we need to fetch all files by multiple command interactions.
@@ -981,8 +987,7 @@ void ReplicationThread::TimerCB(int, int16_t) {
}
}

Status ReplicationThread::parseWriteBatch(const std::string &batch_string) {
rocksdb::WriteBatch write_batch(batch_string);
Status ReplicationThread::parseWriteBatch(const rocksdb::WriteBatch &write_batch) {
WriteBatchHandler write_batch_handler;

auto db_status = write_batch.Iterate(&write_batch_handler);
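For readers unfamiliar with the parseWriteBatch side of this hunk: rocksdb::WriteBatch::Iterate() is a const member that walks the batch with a user-provided handler, which is why the new signature can take a const rocksdb::WriteBatch & instead of rebuilding a batch from a string. A rough sketch under that assumption, using a bare rocksdb::WriteBatch::Handler rather than kvrocks' own WriteBatchHandler:

#include <cstdint>

#include "rocksdb/slice.h"
#include "rocksdb/status.h"
#include "rocksdb/write_batch.h"

// Toy handler that only counts Put records, purely to show the Iterate() call shape;
// kvrocks' WriteBatchHandler does real work in its callbacks.
class CountingHandler : public rocksdb::WriteBatch::Handler {
 public:
  rocksdb::Status PutCF(uint32_t /*column_family_id*/, const rocksdb::Slice & /*key*/,
                        const rocksdb::Slice & /*value*/) override {
    ++put_count;
    return rocksdb::Status::OK();
  }
  uint64_t put_count = 0;
};

rocksdb::Status parse_write_batch_sketch(const rocksdb::WriteBatch &write_batch) {
  CountingHandler handler;
  return write_batch.Iterate(&handler);  // const method, so a const reference suffices
}
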
3 changes: 2 additions & 1 deletion src/cluster/replication.h
@@ -33,6 +33,7 @@

#include "event_util.h"
#include "io_util.h"
#include "rocksdb/write_batch.h"
#include "server/redis_connection.h"
#include "status.h"
#include "storage/storage.h"
@@ -209,7 +210,7 @@ class ReplicationThread : private EventCallbackBase<ReplicationThread> {
static bool isWrongPsyncNum(std::string_view err);
static bool isUnknownOption(std::string_view err);

Status parseWriteBatch(const std::string &batch_string);
Status parseWriteBatch(const rocksdb::WriteBatch &write_batch);
};

/*
16 changes: 11 additions & 5 deletions src/storage/storage.cc
@@ -43,6 +43,8 @@
#include "redis_db.h"
#include "redis_metadata.h"
#include "rocksdb/cache.h"
#include "rocksdb/options.h"
#include "rocksdb/write_batch.h"
#include "rocksdb_crc32c.h"
#include "server/server.h"
#include "storage/batch_indexer.h"
@@ -766,22 +768,26 @@ rocksdb::Status Storage::FlushScripts(engine::Context &ctx, const rocksdb::Write
return Write(ctx, options, batch->GetWriteBatch());
}

Status Storage::ReplicaApplyWriteBatch(std::string &&raw_batch) {
return ApplyWriteBatch(default_write_opts_, std::move(raw_batch));
Status Storage::ReplicaApplyWriteBatch(rocksdb::WriteBatch *batch) {
return applyWriteBatch(default_write_opts_, batch);
}

Status Storage::ApplyWriteBatch(const rocksdb::WriteOptions &options, std::string &&raw_batch) {
Status Storage::applyWriteBatch(const rocksdb::WriteOptions &options, rocksdb::WriteBatch *batch) {
if (db_size_limit_reached_) {
return {Status::NotOK, "reach space limit"};
}
auto batch = rocksdb::WriteBatch(std::move(raw_batch));
auto s = db_->Write(options, &batch);
auto s = db_->Write(options, batch);
if (!s.ok()) {
return {Status::NotOK, s.ToString()};
}
return Status::OK();
}

Status Storage::ApplyWriteBatch(const rocksdb::WriteOptions &options, std::string &&raw_batch) {
auto batch = rocksdb::WriteBatch(std::move(raw_batch));
return applyWriteBatch(options, &batch);
}

void Storage::RecordStat(StatType type, uint64_t v) {
switch (type) {
case StatType::FlushCount:
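The storage.cc side is a small delegation refactor: the string-based ApplyWriteBatch keeps its public signature but now wraps the raw bytes into a WriteBatch and forwards to a shared private helper, while the replica path hands its already-built batch straight through. A simplified sketch of that shape (no size-limit check, and a plain rocksdb::DB pointer standing in for kvrocks' Storage wrapper):

#include <string>
#include <utility>

#include "rocksdb/db.h"
#include "rocksdb/write_batch.h"

struct StorageSketch {
  rocksdb::DB *db = nullptr;
  rocksdb::WriteOptions default_write_opts;

  // Shared path: exactly one db->Write() call, no string-to-batch conversion here.
  rocksdb::Status applyWriteBatch(const rocksdb::WriteOptions &options, rocksdb::WriteBatch *batch) {
    return db->Write(options, batch);
  }

  // Replica path: the caller already owns a WriteBatch, so pass it through untouched.
  rocksdb::Status ReplicaApplyWriteBatch(rocksdb::WriteBatch *batch) {
    return applyWriteBatch(default_write_opts, batch);
  }

  // Legacy string path: wrap the raw representation once, then reuse the shared path.
  rocksdb::Status ApplyWriteBatch(const rocksdb::WriteOptions &options, std::string &&raw_batch) {
    rocksdb::WriteBatch batch(std::move(raw_batch));
    return applyWriteBatch(options, &batch);
  }
};
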
6 changes: 4 additions & 2 deletions src/storage/storage.h
@@ -41,6 +41,7 @@
#include "config/config.h"
#include "lock_manager.h"
#include "observer_or_unique.h"
#include "rocksdb/write_batch.h"
#include "status.h"

#if defined(__sparc__) || defined(__arm__)
@@ -230,7 +231,7 @@ class Storage {
Status RestoreFromBackup();
Status RestoreFromCheckpoint();
Status GetWALIter(rocksdb::SequenceNumber seq, std::unique_ptr<rocksdb::TransactionLogIterator> *iter);
Status ReplicaApplyWriteBatch(std::string &&raw_batch);
Status ReplicaApplyWriteBatch(rocksdb::WriteBatch *batch);
Status ApplyWriteBatch(const rocksdb::WriteOptions &options, std::string &&raw_batch);
rocksdb::SequenceNumber LatestSeqNumber();

@@ -380,13 +381,14 @@ class Storage {
// command, so it won't have multi transactions to be executed at the same time.
std::unique_ptr<rocksdb::WriteBatchWithIndex> txn_write_batch_;

rocksdb::WriteOptions default_write_opts_ = rocksdb::WriteOptions();
rocksdb::WriteOptions default_write_opts_;

// rocksdb used global block cache
std::shared_ptr<rocksdb::Cache> shared_block_cache_;

rocksdb::Status writeToDB(engine::Context &ctx, const rocksdb::WriteOptions &options, rocksdb::WriteBatch *updates);
void recordKeyspaceStat(const rocksdb::ColumnFamilyHandle *column_family, const rocksdb::Status &s);
Status applyWriteBatch(const rocksdb::WriteOptions &options, rocksdb::WriteBatch *batch);
};

/// Context passes fixed snapshot and batch between APIs
