Skip to content
This repository has been archived by the owner on Dec 1, 2022. It is now read-only.

Commit

Permalink
fixed pursueLeaderDone
Browse files Browse the repository at this point in the history
  • Loading branch information
bright-starry-sky committed Jun 22, 2021
1 parent 4677c6f commit db8ccd5
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 3 deletions.
18 changes: 16 additions & 2 deletions src/kvstore/Listener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,9 @@ void Listener::doApply() {
persist(committedLogId_, term_, lastApplyLogId_);
VLOG(1) << idStr_ << "Listener succeeded apply log to " << lastApplyLogId_;
lastApplyTime_ = time::WallClock::fastNowInMilliSec();
VLOG(1) << folly::sformat("Commit snapshot to : committedLogId={},"
"committedLogTerm={}, lastApplyLogId_={}",
committedLogId_, term_, lastApplyLogId_);
}
});
}
Expand Down Expand Up @@ -244,14 +247,17 @@ std::pair<int64_t, int64_t> Listener::commitSnapshot(const std::vector<std::stri
persist(committedLogId, committedLogTerm, lastApplyLogId_);
LOG(INFO) << idStr_ << "Listener succeeded apply log to " << lastApplyLogId_;
lastApplyTime_ = time::WallClock::fastNowInMilliSec();
VLOG(3) << folly::sformat("Commit snapshot to : committedLogId={},"
"committedLogTerm={}, lastApplyLogId_={}",
committedLogId, committedLogTerm, lastApplyLogId_);
}
return std::make_pair(count, size);
}

void Listener::resetListener() {
std::lock_guard<std::mutex> g(raftLock_);
reset();
VLOG(3) << folly::sformat("The listener has been reset : leaderCommitId={},"
VLOG(1) << folly::sformat("The listener has been reset : leaderCommitId={},"
"proposedTerm={}, lastLogTerm={}, term={},"
"lastApplyLogId={}",
leaderCommitId_, proposedTerm_, lastLogTerm_,
Expand All @@ -260,9 +266,17 @@ void Listener::resetListener() {

bool Listener::pursueLeaderDone() {
std::lock_guard<std::mutex> g(raftLock_);
if (status_ == Status::STARTING || status_ == Status::WAITING_SNAPSHOT) {
if (status_ != Status::RUNNING) {
return false;
}
// if the leaderCommitId_ and lastApplyLogId_ all are 0. means the snapshot gap.
if (leaderCommitId_ == 0 && lastApplyLogId_ == 0) {
return false;
}
// VLOG(1) << folly::sformat("pursue leader : leaderCommitId={}, lastApplyLogId_={}",
// leaderCommitId_, lastApplyLogId_);
LOG(INFO) << folly::sformat("pursue leader : leaderCommitId={}, lastApplyLogId_={}",
leaderCommitId_, lastApplyLogId_);
return (leaderCommitId_ - lastApplyLogId_) <= FLAGS_listener_pursue_leader_threshold;
}
} // namespace kvstore
Expand Down
7 changes: 6 additions & 1 deletion src/storage/admin/RebuildFTIndexTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ RebuildFTIndexTask::genSubTasks() {
if (!listener) {
return nebula::cpp2::ErrorCode::E_LISTENER_NOT_FOUND;
}
if (!listener->isRunning()) {
LOG(ERROR) << "listener not ready, may be starting or waiting snapshot";
// TODO : add ErrorCode for listener not ready.
return nebula::cpp2::ErrorCode::E_LISTENER_NOT_FOUND;
}
VLOG(3) << folly::sformat("Processing fulltext rebuild subtask, space={}, part={}",
*ctx_.parameters_.space_id_ref(), part);
std::function<nebula::cpp2::ErrorCode()> task =
Expand All @@ -58,7 +63,7 @@ RebuildFTIndexTask::taskByPart(nebula::kvstore::Listener* listener) {
if (listener->pursueLeaderDone()) {
return nebula::cpp2::ErrorCode::SUCCEEDED;
}
VLOG(3) << folly::sformat(
VLOG(1) << folly::sformat(
"Processing fulltext rebuild subtask, part={}, rebuild_log={}",
part, listener->getApplyId());
}
Expand Down

0 comments on commit db8ccd5

Please sign in to comment.