Skip to content

Commit

Permalink
Fix server cannot exit properly when enabling cluster mode (#969)
Browse files Browse the repository at this point in the history
  • Loading branch information
git-hulk authored Oct 11, 2022
1 parent d501248 commit d33ef02
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 37 deletions.
79 changes: 48 additions & 31 deletions src/slot_migrate.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,11 @@ SlotMigrate::SlotMigrate(Server *svr, int speed, int pipeline_size, int seq_gap)
// because metadata_cf_handle_ and db_ will be destroyed if DB is reopened.
// [Situation]:
// 1. Start an empty slave server.
// 2. Connect to master which has amount of data, and trigger full sychronization.
// 2. Connect to master which has amounted of data, and trigger full synchronization.
// 3. After replication, change slave to master and start slot migrate.
// 4. It will occur segment fault when using metadata_cf_handle_ to create iterator of rocksdb.
// [Reason]:
// After full sychronization, DB will be reopened, db_ and metadata_cf_handle_ will be released.
// After full synchronization, DB will be reopened, db_ and metadata_cf_handle_ will be released.
// Then, if we create rocksdb iterator with metadata_cf_handle_, it will go wrong.
// [Solution]:
// db_ and metadata_cf_handle_ will be replaced by storage_->GetDB() and storage_->GetCFHandle("metadata")
Expand All @@ -75,7 +75,7 @@ SlotMigrate::SlotMigrate(Server *svr, int speed, int pipeline_size, int seq_gap)
migrate_slot_ = -1;
migrate_failed_slot_ = -1;
migrate_state_ = kMigrateNone;
stop_ = false;
stop_migrate_ = false;
slot_snapshot_ = nullptr;

if (svr->IsSlave()) {
Expand Down Expand Up @@ -121,10 +121,20 @@ Status SlotMigrate::MigrateStart(Server *svr, const std::string &node_id, const
return Status::OK();
}

SlotMigrate::~SlotMigrate() {
if (thread_state_ == ThreadState::Running) {
stop_migrate_ = true;
thread_state_ = ThreadState::Terminated;
job_cv_.notify_all();
t_.join();
}
}

Status SlotMigrate::CreateMigrateHandleThread(void) {
try {
t_ = std::thread([this]() {
Util::ThreadSetName("slot-migrate");
thread_state_ = ThreadState::Running;
this->Loop(static_cast<void*>(this));
});
} catch(const std::exception &e) {
Expand All @@ -136,11 +146,15 @@ Status SlotMigrate::CreateMigrateHandleThread(void) {
void *SlotMigrate::Loop(void *arg) {
while (true) {
std::unique_lock<std::mutex> ul(this->job_mutex_);
while (this->slot_job_ == nullptr) {
while (!IsTerminated() && this->slot_job_ == nullptr) {
this->job_cv_.wait(ul);
}
ul.unlock();

if (IsTerminated()) {
return nullptr;
}

LOG(INFO) << "[migrate] migrate_slot: " << slot_job_->migrate_slot_
<< ", dst_ip: " << slot_job_->dst_ip_
<< ", dst_port: " << slot_job_->dst_port_
Expand All @@ -155,12 +169,16 @@ void *SlotMigrate::Loop(void *arg) {

StateMachine();
}
return nullptr;
}

void SlotMigrate::StateMachine(void) {
state_machine_ = kSlotMigrateStart;
while (true) {
if (IsTerminated()) {
LOG(WARNING) << "[migrate] Will stop state machine, because the thread was terminated";
return;
}

switch (state_machine_) {
case kSlotMigrateStart: {
auto s = Start();
Expand Down Expand Up @@ -281,12 +299,12 @@ Status SlotMigrate::SendSnapshot(void) {
ComposeSlotKeyPrefix(namespace_, slot, &prefix);
LOG(INFO) << "[migrate] Iterate keys of slot, key's prefix: " << prefix;

// Seek to the begining of keys start with 'prefix' and iterate all these keys
// Seek to the beginning of keys start with 'prefix' and iterate all these keys
for (iter->Seek(prefix); iter->Valid(); iter->Next()) {
// The migrating task has to be stopped, if server role is changed from master to slave
// or flush command (flushdb or flushall) is executed
if (stop_) {
LOG(ERROR) << "[migrate] Stop migrating snapshot due to the thread stoped";
if (stop_migrate_) {
LOG(ERROR) << "[migrate] Stop migrating snapshot due to the thread stopped";
return Status(Status::NotOK);
}

Expand Down Expand Up @@ -346,7 +364,7 @@ Status SlotMigrate::SyncWal(void) {
}

Status SlotMigrate::Success(void) {
if (stop_) {
if (stop_migrate_) {
LOG(ERROR) << "[migrate] Stop migrating slot " << migrate_slot_;
return Status(Status::NotOK);
}
Expand All @@ -370,7 +388,7 @@ Status SlotMigrate::Fail(void) {
if (!SetDstImportStatus(slot_job_->slot_fd_, kImportFailed)) {
LOG(INFO) << "[migrate] Failed to notify the destination that data migration failed";
}
// Stop slot forbiding writing
// Stop slot will forbid writing
migrate_failed_slot_ = migrate_slot_;
forbidden_slot_ = -1;
return Status::OK();
Expand Down Expand Up @@ -443,7 +461,7 @@ bool SlotMigrate::CheckResponseOnce(int sock_fd) {
// ltrim Redis::SimpleString -Err\r\n
// linsert Redis::Integer
// lset Redis::SimpleString
// hdel Redis::Intege
// hdel Redis::Integer
// srem Redis::Integer
// zrem Redis::Integer
// lpop Redis::NilString $-1\r\n
Expand All @@ -460,7 +478,7 @@ bool SlotMigrate::CheckResponseWithCounts(int sock_fd, int total) {
return false;
}

// Set socket recieve timeout first
// Set socket receive timeout first
struct timeval tv;
tv.tv_sec = 1;
tv.tv_usec = 0;
Expand All @@ -472,7 +490,7 @@ bool SlotMigrate::CheckResponseWithCounts(int sock_fd, int total) {
stat_ = ArrayLen;
UniqueEvbuf evbuf;
while (true) {
// Read response data from socket buffer to event buffer
// Read response data from socket buffer to the event buffer
if (evbuffer_read(evbuf.get(), sock_fd, -1) <= 0) {
LOG(ERROR) << "[migrate] Failed to read response, Err: " + std::string(strerror(errno));
return false;
Expand Down Expand Up @@ -541,7 +559,6 @@ bool SlotMigrate::CheckResponseWithCounts(int sock_fd, int total) {
}
}
}
return true; // Can't reach here
}

Status SlotMigrate::MigrateOneKey(const rocksdb::Slice &key, const rocksdb::Slice &value, std::string *restore_cmds) {
Expand Down Expand Up @@ -621,9 +638,9 @@ bool SlotMigrate::MigrateComplexKey(const rocksdb::Slice &key, const Metadata &m
std::string slot_key, prefix_subkey;
AppendNamespacePrefix(key, &slot_key);
InternalKey(slot_key, "", metadata.version, true).Encode(&prefix_subkey);
int itermscount = 0;
int item_count = 0;
for (iter->Seek(prefix_subkey); iter->Valid(); iter->Next()) {
if (stop_) {
if (stop_migrate_) {
LOG(ERROR) << "[migrate] Stop migrating complex key due to task stopped";
return false;
}
Expand All @@ -633,7 +650,7 @@ bool SlotMigrate::MigrateComplexKey(const rocksdb::Slice &key, const Metadata &m
}

// Parse values of the complex key
// InternalKey is adopt to get compex key's value
// InternalKey is adopted to get complex key's value
// from the formatted key return by iterator of rocksdb
InternalKey inkey(iter->key(), true);
switch (metadata.Type()) {
Expand Down Expand Up @@ -669,18 +686,18 @@ bool SlotMigrate::MigrateComplexKey(const rocksdb::Slice &key, const Metadata &m
break;
}

// Check iterms count
// Check item count
// Exclude bitmap because it does not have hmset-like command
if (metadata.Type() != kRedisBitmap) {
itermscount++;
if (itermscount >= kMaxItemsInCommand) {
item_count++;
if (item_count >= kMaxItemsInCommand) {
*restore_cmds += Redis::MultiBulkString(user_cmd, false);
current_pipeline_size_++;
itermscount = 0;
// Have to clear saved iterms
item_count = 0;
// Have to clear saved items
user_cmd.erase(user_cmd.begin() + 2, user_cmd.end());

// Maybe key has amout of elements, have to check pipeline here
// Maybe key has amounted of elements, have to check pipeline here
if (!SendCmdsPipelineIfNeed(restore_cmds, false)) {
LOG(INFO) << "[migrate] Failed to send complex key part";
return false;
Expand All @@ -689,8 +706,8 @@ bool SlotMigrate::MigrateComplexKey(const rocksdb::Slice &key, const Metadata &m
}
}

// Have to check the iterm count of the last command list
if (itermscount % kMaxItemsInCommand) {
// Have to check the item count of the last command list
if (item_count % kMaxItemsInCommand) {
*restore_cmds += Redis::MultiBulkString(user_cmd, false);
current_pipeline_size_++;
}
Expand Down Expand Up @@ -749,7 +766,7 @@ bool SlotMigrate::MigrateBitmapKey(const InternalKey &inkey,

bool SlotMigrate::SendCmdsPipelineIfNeed(std::string *commands, bool need) {
// Stop migrating or not
if (stop_) {
if (stop_migrate_) {
LOG(ERROR) << "[migrate] Stop sending data due to migrating thread stopped"
<< ", current migrating slot: " << migrate_slot_;
return false;
Expand Down Expand Up @@ -782,7 +799,7 @@ bool SlotMigrate::SendCmdsPipelineIfNeed(std::string *commands, bool need) {
return false;
}

// Clear commands and currentpipeline
// Clear commands and running pipeline
commands->clear();
current_pipeline_size_ = 0;
return true;
Expand Down Expand Up @@ -843,21 +860,21 @@ Status SlotMigrate::MigrateIncrementData(std::unique_ptr<rocksdb::TransactionLog
std::string commands;
commands.clear();
while (true) {
if (stop_) {
if (stop_migrate_) {
LOG(ERROR) << "[migrate] Migration task end during migrating WAL data";
return Status(Status::NotOK);
}
auto batch = (*iter)->GetBatch();
if (batch.sequence != next_seq) {
LOG(ERROR) << "[migrate] WAL iterator is discrete, some seq might be lost"
<< ", expectd sequence: " << next_seq << ", but got sequence: " << batch.sequence;
<< ", expected sequence: " << next_seq << ", but got sequence: " << batch.sequence;
return Status(Status::NotOK);
}

// Generate commands by iterating write bacth
// Generate commands by iterating write batch
auto s = GenerateCmdsFromBatch(&batch, &commands);
if (!s.IsOK()) {
LOG(ERROR) << "[migrate] Failed to generate commands from wirte batch";
LOG(ERROR) << "[migrate] Failed to generate commands from write batch";
return Status(Status::NotOK);
}

Expand Down
14 changes: 11 additions & 3 deletions src/slot_migrate.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ class SlotMigrate : public Redis::Database {
public:
explicit SlotMigrate(Server *svr, int speed = kMigrateSpeed,
int pipeline_size = kPipelineSize, int seq_gap = kSeqGapLimit);
~SlotMigrate() {}
~SlotMigrate();

Status CreateMigrateHandleThread(void);
void *Loop(void *arg);
Expand All @@ -93,12 +93,13 @@ class SlotMigrate : public Redis::Database {
void SetMigrateSpeedLimit(int speed) { if (speed >= 0) migrate_speed_ = speed; }
void SetPipelineSize(uint32_t size) { if (size > 0) pipeline_size_limit_ = size; }
void SetSequenceGapSize(int size) { if (size > 0) seq_gap_limit_ = size; }
void SetMigrateStopFlag(bool state) { stop_ = state; }
void SetMigrateStopFlag(bool state) { stop_migrate_ = state; }
int16_t GetMigrateState() { return migrate_state_; }
int16_t GetMigrateStateMachine() { return state_machine_; }
int16_t GetForbiddenSlot(void) { return forbidden_slot_; }
int16_t GetMigratingSlot(void) { return migrate_slot_; }
void GetMigrateInfo(std::string *info);
bool IsTerminated() { return thread_state_ == ThreadState::Terminated; }

private:
void StateMachine(void);
Expand Down Expand Up @@ -145,6 +146,13 @@ class SlotMigrate : public Redis::Database {
};
ParserState stat_ = ArrayLen;

enum class ThreadState {
Uninitialized,
Running,
Terminated
};
ThreadState thread_state_ = ThreadState::Uninitialized;

static const size_t kProtoInlineMaxSize = 16 * 1024L;
static const size_t kProtoBulkMaxSize = 512 * 1024L * 1024L;
static const int kMaxNotifyRetryTimes = 3;
Expand All @@ -169,7 +177,7 @@ class SlotMigrate : public Redis::Database {
std::atomic<int16_t> migrate_slot_;
int16_t migrate_failed_slot_;
std::atomic<MigrateTaskState> migrate_state_;
std::atomic<bool> stop_;
std::atomic<bool> stop_migrate_; // stop_migrate_ is true will stop migrate but the migration thread won't destroy.
std::string current_migrate_key_;
uint64_t slot_snapshot_time_;
const rocksdb::Snapshot *slot_snapshot_;
Expand Down
4 changes: 1 addition & 3 deletions tests/gocase/util/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,7 @@ func (s *KvrocksServer) Close() {

func (s *KvrocksServer) close(keepDir bool) {
require.NoError(s.t, s.cmd.Process.Signal(syscall.SIGTERM))
// TODO: activate require after server issue resolved
// https://github.com/apache/incubator-kvrocks/issues/946#issuecomment-1272445443
f := func(err error) { /* require.NoError(s.t, err) */ }
f := func(err error) { require.NoError(s.t, err) }
timer := time.AfterFunc(defaultGracePeriod, func() {
require.NoError(s.t, s.cmd.Process.Kill())
f = func(err error) { require.EqualError(s.t, err, "signal: killed") }
Expand Down

0 comments on commit d33ef02

Please sign in to comment.