Skip to content

Commit

Permalink
PSYNC based on Unique Replication Sequence ID (#538)
Browse files Browse the repository at this point in the history
### Background
Currently, master only checks sequence number when replica asks for PSYNC,
that is not enough since they may have different replication history even
the replica asking sequence is in the range of the master current WAL.

A bug report in #462

PS: Master check also DB name before checking sequence number, but it is
also not enough since it is setting in conf, and don't change when role is
changed.
 
### Solution
We design 'PSYNC based on Unique Replication Sequence ID', we add unique
replication id for every write batch (the operation of each command on the
storage engine), so the combination of replication id and sequence is unique
for write batch. The master can identify whether the replica has the same
replication history by checking replication id and sequence. 
Like Redis, replicas can partially resynchronize with new master if old
master is failed and new selected master has largest offset, and replicas
can also partially resynchronize with master after restarting if the replicas'
conf file has the its origin master.

Unique replication id will be changed when replicas become master.

By default, it is not enabled since this stricter check may easily lead to
full synchronization. You should enable it if you want to data correctness,
maybe we enable it by default in Kvrocks 3.0.
  • Loading branch information
ShooterIT authored May 24, 2022
1 parent fd81d5b commit 9ecf5c7
Show file tree
Hide file tree
Showing 13 changed files with 429 additions and 34 deletions.
14 changes: 14 additions & 0 deletions kvrocks.conf
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,20 @@ tcp-backlog 511
# connect 'master's listening port' when synchronization.
master-use-repl-port no

# Currently, master only checks sequence number when replica asks for PSYNC,
# that is not enough since they may have different replication history even
# the replica asking sequence is in the range of the master current WAL.
#
# We design 'Replication Sequence ID' PSYNC, we add unique replication id for
# every write batch (the operation of each command on the storage engine), so
# the combination of replication id and sequence is unique for write batch.
# The master can identify whether the replica has the same replication history
# by checking replication id and sequence.
#
# By default, it is not enabled since this stricter check may easily lead to
# full synchronization.
use-rsid-psync no

# Master-Slave replication. Use slaveof to make a kvrocks instance a copy of
# another kvrocks server. A few things to understand ASAP about kvrocks replication.
#
Expand Down
12 changes: 11 additions & 1 deletion src/batch_extractor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,22 @@
#include <rocksdb/write_batch.h>
#include <glog/logging.h>

#include "server.h"
#include "redis_bitmap.h"
#include "redis_slot.h"
#include "redis_reply.h"

void WriteBatchExtractor::LogData(const rocksdb::Slice &blob) {
log_data_.Decode(blob);
// Currently, we only have two kinds of log data
if (ServerLogData::IsServerLogData(blob.data())) {
ServerLogData server_log;
if (server_log.Decode(blob).IsOK()) {
// We don't handle server log currently
}
} else {
// Redis type log data
log_data_.Decode(blob);
}
}

rocksdb::Status WriteBatchExtractor::PutCF(uint32_t column_family_id, const Slice &key,
Expand Down
1 change: 1 addition & 0 deletions src/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ Config::Config() {
{"slave-empty-db-before-fullsync", false, new YesNoField(&slave_empty_db_before_fullsync, false)},
{"slave-priority", false, new IntField(&slave_priority, 100, 0, INT_MAX)},
{"slave-read-only", false, new YesNoField(&slave_readonly, true)},
{"use-rsid-psync", true, new YesNoField(&use_rsid_psync, false)},
{"profiling-sample-ratio", false, new IntField(&profiling_sample_ratio, 0, 0, 100)},
{"profiling-sample-record-max-len", false, new IntField(&profiling_sample_record_max_len, 256, 0, INT_MAX)},
{"profiling-sample-record-threshold-ms",
Expand Down
1 change: 1 addition & 0 deletions src/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ struct Config{
bool purge_backup_on_fullsync = false;
bool auto_resize_block_and_sst = true;
int fullsync_recv_file_delay = 0;
bool use_rsid_psync = false;
std::vector<std::string> binds;
std::string dir;
std::string db_dir;
Expand Down
48 changes: 43 additions & 5 deletions src/redis_cmd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3608,22 +3608,58 @@ class CommandStats: public Commander {
class CommandPSync : public Commander {
public:
Status Parse(const std::vector<std::string> &args) override {
size_t seq_arg = 1;
if (args.size() == 3) {
seq_arg = 2;
new_psync = true;
}
try {
auto s = std::stoull(args[1]);
auto s = std::stoull(args[seq_arg]);
next_repl_seq = static_cast<rocksdb::SequenceNumber>(s);
} catch (const std::exception &e) {
return Status(Status::RedisParseErr, "value is not an unsigned long long or out of range");
}
if (new_psync) {
assert(args.size() == 3);
replica_replid = args[1];
if (replica_replid.size() != kReplIdLength) {
return Status(Status::RedisParseErr, "Wrong replication id length");
}
}
return Commander::Parse(args);
}

Status Execute(Server *svr, Connection *conn, std::string *output) override {
LOG(INFO) << "Slave " << conn->GetAddr() << " asks for synchronization"
LOG(INFO) << "Slave " << conn->GetAddr()
<< ", listening port: " << conn->GetListeningPort()
<< " asks for synchronization"
<< " with next sequence: " << next_repl_seq
<< " replication id: " << (replica_replid.length() ? replica_replid : "not supported")
<< ", and local sequence: " << svr->storage_->LatestSeq();
if (!checkWALBoundary(svr->storage_, next_repl_seq).IsOK()) {
svr->stats_.IncrPSyncErrCounter();

bool need_full_sync = false;

// Check replication id of the last sequence log
if (new_psync && svr->GetConfig()->use_rsid_psync) {
std::string replid_in_wal = svr->storage_->GetReplIdFromWalBySeq(next_repl_seq - 1);
LOG(INFO) << "Replication id in WAL: " << replid_in_wal;

// We check replication id only when WAL has this sequence, since there may be no WAL,
// Or WAL may has nothing when starting from db of old version kvrocks.
if (replid_in_wal.length() == kReplIdLength && replid_in_wal != replica_replid) {
*output = "wrong replication id of the last log";
need_full_sync = true;
}
}

// Check Log sequence
if (!need_full_sync && !checkWALBoundary(svr->storage_, next_repl_seq).IsOK()) {
*output = "sequence out of range, please use fullsync";
need_full_sync = true;
}

if (need_full_sync) {
svr->stats_.IncrPSyncErrCounter();
return Status(Status::RedisExecErr, *output);
}

Expand All @@ -3650,6 +3686,8 @@ class CommandPSync : public Commander {

private:
rocksdb::SequenceNumber next_repl_seq = 0;
bool new_psync = false;
std::string replica_replid;

// Return OK if the seq is in the range of the current WAL
Status checkWALBoundary(Engine::Storage *storage,
Expand Down Expand Up @@ -4826,7 +4864,7 @@ CommandAttributes redisCommandTable[] = {
ADD_CMD("stats", 1, "read-only", 0, 0, 0, CommandStats),

ADD_CMD("replconf", -3, "read-only replication no-script", 0, 0, 0, CommandReplConf),
ADD_CMD("psync", 2, "read-only replication no-multi no-script", 0, 0, 0, CommandPSync),
ADD_CMD("psync", -2, "read-only replication no-multi no-script", 0, 0, 0, CommandPSync),
ADD_CMD("_fetch_meta", 1, "read-only replication no-multi no-script", 0, 0, 0, CommandFetchMeta),
ADD_CMD("_fetch_file", 2, "read-only replication no-multi no-script", 0, 0, 0, CommandFetchFile),
ADD_CMD("_db_name", 1, "read-only replication no-multi", 0, 0, 0, CommandDBName),
Expand Down
53 changes: 49 additions & 4 deletions src/replication.cc
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,8 @@ void ReplicationThread::CallbacksStateMachine::EvCallback(bufferevent *bev,
switch (st) {
case CBState::NEXT:
++self->handler_idx_;
case CBState::PREV:
if (st == CBState::PREV) --self->handler_idx_;
if (self->getHandlerEventType(self->handler_idx_) == WRITE) {
SetWriteCB(bev, EvCallback, ctx);
} else {
Expand Down Expand Up @@ -486,10 +488,39 @@ ReplicationThread::CBState ReplicationThread::replConfReadCB(
ReplicationThread::CBState ReplicationThread::tryPSyncWriteCB(
bufferevent *bev, void *ctx) {
auto self = static_cast<ReplicationThread *>(ctx);
auto next_seq = self->storage_->LatestSeq() + 1;
send_string(bev, Redis::MultiBulkString({"PSYNC", std::to_string(next_seq)}));
auto cur_seq = self->storage_->LatestSeq();
auto next_seq = cur_seq + 1;
std::string replid;

// Get replication id
std::string replid_in_wal = self->storage_->GetReplIdFromWalBySeq(cur_seq);
// Set if valid replication id
if (replid_in_wal.length() == kReplIdLength) {
replid = replid_in_wal;
} else {
// Maybe there is no WAL, we can get replication id from db since master
// always write replication id into db before any operation when starting
// new "replication history".
std::string replid_in_db = self->storage_->GetReplIdFromDbEngine();
if (replid_in_db.length() == kReplIdLength) {
replid = replid_in_db;
}
}

// To adapt to old master using old PSYNC, i.e. only use next sequence id.
// Also use old PSYNC if replica can't find replication id from WAL and DB.
if (!self->srv_->GetConfig()->use_rsid_psync ||
self->next_try_old_psync_ || replid.length() != kReplIdLength) {
self->next_try_old_psync_ = false; // Reset next_try_old_psync_
send_string(bev, Redis::MultiBulkString({"PSYNC", std::to_string(next_seq)}));
LOG(INFO) << "[replication] Try to use psync, next seq: " << next_seq;
} else {
// NEW PSYNC "Unique Replication Sequence ID": replication id and sequence id
send_string(bev, Redis::MultiBulkString({"PSYNC", replid, std::to_string(next_seq)}));
LOG(INFO) << "[replication] Try to use new psync, current unique replication sequence id: "
<< replid << ":" << cur_seq;
}
self->repl_state_ = kReplSendPSync;
LOG(INFO) << "[replication] Try to use psync, next seq: " << next_seq;
return CBState::NEXT;
}

Expand All @@ -507,6 +538,16 @@ ReplicationThread::CBState ReplicationThread::tryPSyncReadCB(bufferevent *bev,
LOG(WARNING) << "The master was restoring the db, retry later";
return CBState::RESTART;
}

if (line[0] == '-' && isWrongPsyncNum(line)) {
self->next_try_old_psync_ = true;
free(line);
LOG(WARNING) << "The old version master, can't handle new PSYNC, "
<< "try old PSYNC again";
// Retry previous state, i.e. send PSYNC again
return CBState::PREV;
}

if (strncmp(line, "+OK", 3) != 0) {
// PSYNC isn't OK, we should use FullSync
// Switch to fullsync state machine
Expand Down Expand Up @@ -553,7 +594,7 @@ ReplicationThread::CBState ReplicationThread::incrementBatchLoopCB(
// master would send the ping heartbeat packet to check whether the slave was alive or not,
// don't write ping to db here.
if (bulk_string != "ping") {
auto s = self->storage_->WriteBatch(std::string(bulk_data, self->incr_bulk_len_));
auto s = self->storage_->ReplicaApplyWriteBatch(std::string(bulk_data, self->incr_bulk_len_));
if (!s.IsOK()) {
LOG(ERROR) << "[replication] CRITICAL - Failed to write batch to local, " << s.Msg() << ". batch: 0x"
<< Util::StringToHex(bulk_string);
Expand Down Expand Up @@ -967,6 +1008,10 @@ bool ReplicationThread::isRestoringError(const char *err) {
return std::string(err) == "-ERR restoring the db from backup";
}

bool ReplicationThread::isWrongPsyncNum(const char *err) {
return std::string(err) == "-ERR wrong number of arguments";
}

rocksdb::Status WriteBatchHandler::PutCF(uint32_t column_family_id, const rocksdb::Slice &key,
const rocksdb::Slice &value) {
if (column_family_id == kColumnFamilyIDPubSub) {
Expand Down
10 changes: 10 additions & 0 deletions src/replication.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ class ReplicationThread {
public:
enum class State {
NEXT,
PREV,
AGAIN,
QUIT,
RESTART,
Expand Down Expand Up @@ -148,6 +149,7 @@ class ReplicationThread {
Engine::Storage *storage_ = nullptr;
ReplState repl_state_;
time_t last_io_time_ = 0;
bool next_try_old_psync_ = false;

std::function<void()> pre_fullsync_cb_;
std::function<void()> post_fullsync_cb_;
Expand Down Expand Up @@ -198,6 +200,7 @@ class ReplicationThread {
Status parallelFetchFile(const std::string &dir,
const std::vector<std::pair<std::string, uint32_t>> &files);
static bool isRestoringError(const char *err);
static bool isWrongPsyncNum(const char *err);

static void EventTimerCB(int, int16_t, void *ctx);

Expand All @@ -211,6 +214,13 @@ class WriteBatchHandler : public rocksdb::WriteBatch::Handler {
public:
rocksdb::Status PutCF(uint32_t column_family_id, const rocksdb::Slice &key,
const rocksdb::Slice &value) override;
rocksdb::Status DeleteCF(uint32_t column_family_id, const rocksdb::Slice &key) override {
return rocksdb::Status::OK();
}
rocksdb::Status DeleteRangeCF(uint32_t column_family_id,
const rocksdb::Slice& begin_key, const rocksdb::Slice& end_key) override {
return rocksdb::Status::OK();
}
WriteBatchType Type() { return type_; }
std::string Key() const { return kv_.first; }
std::string Value() const { return kv_.second; }
Expand Down
42 changes: 29 additions & 13 deletions src/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,9 @@ Status Server::Start() {
if (!config_->master_host.empty()) {
Status s = AddMaster(config_->master_host, static_cast<uint32_t>(config_->master_port), false);
if (!s.IsOK()) return s;
} else {
// Generate new replication id if not a replica
storage_->ShiftReplId();
}

if (config_->cluster_enabled) {
Expand Down Expand Up @@ -237,6 +240,7 @@ Status Server::RemoveMaster() {
config_->ClearMaster();
if (replication_thread_) replication_thread_->Stop();
replication_thread_ = nullptr;
storage_->ShiftReplId();
}
slaveof_mu_.unlock();
return Status::OK();
Expand Down Expand Up @@ -1345,7 +1349,7 @@ Status Server::ScriptGet(const std::string &sha, std::string *body) {

void Server::ScriptSet(const std::string &sha, const std::string &body) {
std::string funcname = Engine::kLuaFunctionPrefix + sha;
WriteToPropagateCF(funcname, body);
storage_->WriteToPropagateCF(funcname, body);
}

void Server::ScriptReset() {
Expand All @@ -1359,17 +1363,6 @@ void Server::ScriptFlush() {
ScriptReset();
}

Status Server::WriteToPropagateCF(const std::string &key, const std::string &value) const {
rocksdb::WriteBatch batch;
auto propagateCf = storage_->GetCFHandle(Engine::kPropagateColumnFamilyName);
batch.Put(propagateCf, key, value);
auto s = storage_->Write(rocksdb::WriteOptions(), &batch);
if (!s.ok()) {
return Status(Status::NotOK, s.ToString());
}
return Status::OK();
}

// Generally, we store data into rocksdb and just replicate WAL instead of propagating
// commands. But sometimes, we need to update inner states or do special operations
// for specific commands, such as `script flush`.
Expand All @@ -1380,7 +1373,7 @@ Status Server::Propagate(const std::string &channel, const std::vector<std::stri
for (const auto &iter : tokens) {
value += Redis::BulkString(iter);
}
return WriteToPropagateCF(channel, value);
return storage_->WriteToPropagateCF(channel, value);
}

Status Server::ExecPropagateScriptCommand(const std::vector<std::string> &tokens) {
Expand Down Expand Up @@ -1453,3 +1446,26 @@ void Server::AdjustOpenFilesLimit() {
<< " (it's originally set to " << old_limit << ")";
}
}

std::string ServerLogData::Encode() {
if (type_ == kReplIdLog) {
return std::string(1, kReplIdTag) + " " + content_;
} else {
return content_;
}
}

Status ServerLogData::Decode(const rocksdb::Slice &blob) {
if (blob.size() == 0) {
return Status(Status::NotOK);
}

const char *header = blob.data();
// Only support `kReplIdTag` now
if (*header == kReplIdTag && blob.size() == 2 + kReplIdLength) {
type_ = kReplIdLog;
content_ = std::string(blob.data()+2, blob.size()-2);
return Status::OK();
}
return Status(Status::NotOK);
}
30 changes: 29 additions & 1 deletion src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,35 @@ enum ClientType {
kTypeSlave = (1ULL<<3), // slave client
};

enum ServerLogType {
kServerLogNone,
kReplIdLog
};

class ServerLogData {
public:
// Redis::WriteBatchLogData always starts with digist ascii, we use alphabetic to
// distinguish ServerLogData with Redis::WriteBatchLogData.
static const char kReplIdTag = 'r';
static bool IsServerLogData(const char *header) {
if (header != NULL) return *header == kReplIdTag;
return false;
}

ServerLogData() = default;
explicit ServerLogData(ServerLogType type, const std::string &content) :
type_(type), content_(content) {}

ServerLogType GetType() { return type_; }
std::string GetContent() { return content_; }
std::string Encode();
Status Decode(const rocksdb::Slice &blob);

private:
ServerLogType type_ = kServerLogNone;
std::string content_;
};

class Server {
public:
explicit Server(Engine::Storage *storage, Config *config);
Expand Down Expand Up @@ -155,7 +184,6 @@ class Server {
void ScriptReset();
void ScriptFlush();

Status WriteToPropagateCF(const std::string &key, const std::string &value) const;
Status Propagate(const std::string &channel, const std::vector<std::string> &tokens);
Status ExecPropagatedCommand(const std::vector<std::string> &tokens);
Status ExecPropagateScriptCommand(const std::vector<std::string> &tokens);
Expand Down
Loading

0 comments on commit 9ecf5c7

Please sign in to comment.