Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Testing RATIS-2162 #1154 over release-3.1.1. #1157

Open
wants to merge 1 commit into
base: release-3.1.1
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,9 @@ public class ConfigurationManager {
* The current raft configuration. If configurations is not empty, should be
* the last entry of the map. Otherwise is initialConf.
*/
@SuppressWarnings({"squid:S3077"}) // Suppress volatile for generic type
private volatile RaftConfigurationImpl currentConf;
private RaftConfigurationImpl currentConf;
/** Cache the peer corresponding to {@link #id}. */
@SuppressWarnings({"squid:S3077"}) // Suppress volatile for generic type
private volatile RaftPeer currentPeer;
private RaftPeer currentPeer;

ConfigurationManager(RaftPeerId id, RaftConfigurationImpl initialConf) {
this.id = id;
Expand Down Expand Up @@ -78,11 +76,11 @@ private void addRaftConfigurationImpl(long logIndex, RaftConfigurationImpl conf)
}
}

RaftConfigurationImpl getCurrent() {
synchronized RaftConfigurationImpl getCurrent() {
return currentConf;
}

RaftPeer getCurrentPeer() {
synchronized RaftPeer getCurrentPeer() {
return currentPeer;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.ToIntFunction;
Expand Down Expand Up @@ -62,6 +63,7 @@ int update(AtomicInteger outstanding) {
@SuppressWarnings({"squid:S3077"}) // Suppress volatile for generic type
private volatile Timestamp lastRpcTime = creationTime;
private volatile boolean isRunning = true;
private final CompletableFuture<Void> stopped = new CompletableFuture<>();
private final AtomicInteger outstandingOp = new AtomicInteger();

FollowerState(RaftServerImpl server, Object reason) {
Expand Down Expand Up @@ -93,8 +95,10 @@ boolean isCurrentLeaderValid() {
return lastRpcTime.elapsedTime().compareTo(server.properties().minRpcTimeout()) < 0;
}

void stopRunning() {
CompletableFuture<Void> stopRunning() {
this.isRunning = false;
interrupt();
return stopped;
}

boolean lostMajorityHeartbeatsRecently() {
Expand Down Expand Up @@ -122,6 +126,14 @@ private boolean shouldRun() {

@Override
public void run() {
try {
runImpl();
} finally {
stopped.complete(null);
}
}

private void runImpl() {
final TimeDuration sleepDeviationThreshold = server.getSleepDeviationThreshold();
while (shouldRun()) {
final TimeDuration electionTimeout = server.getRandomElectionTimeout();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -183,6 +184,7 @@ public String toString() {
private final String name;
private final LifeCycle lifeCycle;
private final Daemon daemon;
private final CompletableFuture<Void> stopped = new CompletableFuture<>();

private final RaftServerImpl server;
private final boolean skipPreVote;
Expand Down Expand Up @@ -223,8 +225,10 @@ private void startIfNew(Runnable starter) {
}
}

void shutdown() {
CompletableFuture<Void> shutdown() {
lifeCycle.checkStateAndClose();
stopped.complete(null);
return stopped;
}

@VisibleForTesting
Expand All @@ -234,6 +238,14 @@ LifeCycle.State getCurrentState() {

@Override
public void run() {
try {
runImpl();
} finally {
stopped.complete(null);
}
}

private void runImpl() {
if (!lifeCycle.compareAndTransition(STARTING, RUNNING)) {
final LifeCycle.State state = lifeCycle.getCurrentState();
LOG.info("{}: skip running since this is already {}", this, state);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,11 +223,8 @@ boolean removeAll(Collection<LogAppender> c) {
}

CompletableFuture<Void> stopAll() {
final CompletableFuture<?>[] futures = new CompletableFuture<?>[senders.size()];
for(int i = 0; i < futures.length; i++) {
futures[i] = senders.get(i).stopAsync();
}
return CompletableFuture.allOf(futures);
return CompletableFuture.allOf(senders.stream().
map(LogAppender::stopAsync).toArray(CompletableFuture[]::new));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -565,20 +566,23 @@ void setFirstElection(Object reason) {
* @param force Force to start a new {@link FollowerState} even if this server is already a follower.
* @return if the term/votedFor should be updated to the new term
*/
private synchronized boolean changeToFollower(
long newTerm,
boolean force,
boolean allowListener,
Object reason) {
private boolean changeToFollower(long newTerm, boolean force, boolean allowListener, Object reason) {
final AtomicReference<Boolean> metadataUpdated = new AtomicReference<>();
changeToFollowerAsync(newTerm, force, allowListener, reason, metadataUpdated).join();
return metadataUpdated.get();
}

private synchronized CompletableFuture<Void> changeToFollowerAsync(
long newTerm, boolean force, boolean allowListener, Object reason, AtomicReference<Boolean> metadataUpdated) {
final RaftPeerRole old = role.getCurrentRole();
if (old == RaftPeerRole.LISTENER && !allowListener) {
throw new IllegalStateException("Unexpected role " + old);
}
boolean metadataUpdated;
CompletableFuture<Void> future = CompletableFuture.completedFuture(null);
if ((old != RaftPeerRole.FOLLOWER || force) && old != RaftPeerRole.LISTENER) {
setRole(RaftPeerRole.FOLLOWER, reason);
if (old == RaftPeerRole.LEADER) {
role.shutdownLeaderState(false)
future = role.shutdownLeaderState(false)
.exceptionally(e -> {
if (e != null) {
if (!getInfo().isAlive()) {
Expand All @@ -587,21 +591,21 @@ private synchronized boolean changeToFollower(
}
}
throw new CompletionException("Failed to shutdownLeaderState: " + this, e);
})
.join();
});
state.setLeader(null, reason);
} else if (old == RaftPeerRole.CANDIDATE) {
role.shutdownLeaderElection();
future = role.shutdownLeaderElection();
} else if (old == RaftPeerRole.FOLLOWER) {
role.shutdownFollowerState();
future = role.shutdownFollowerState();
}
metadataUpdated = state.updateCurrentTerm(newTerm);

metadataUpdated.set(state.updateCurrentTerm(newTerm));
role.startFollowerState(this, reason);
setFirstElection(reason);
} else {
metadataUpdated = state.updateCurrentTerm(newTerm);
metadataUpdated.set(state.updateCurrentTerm(newTerm));
}
return metadataUpdated;
return future;
}

synchronized void changeToFollowerAndPersistMetadata(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class RoleInfo {
public static final Logger LOG = LoggerFactory.getLogger(RoleInfo.class);

private final RaftPeerId id;
private volatile RaftPeerRole role;
private final AtomicReference<RaftPeerRole> role = new AtomicReference<>();
/** Used when the peer is leader */
private final AtomicReference<LeaderStateImpl> leaderState = new AtomicReference<>();
/** Used when the peer is follower, to monitor election timeout */
Expand All @@ -64,7 +64,7 @@ class RoleInfo {
}

void transitionRole(RaftPeerRole newRole) {
this.role = newRole;
this.role.set(newRole);
this.transitionTime.set(Timestamp.currentTime());
}

Expand All @@ -73,7 +73,7 @@ long getRoleElapsedTimeMs() {
}

RaftPeerRole getCurrentRole() {
return role;
return role.get();
}

boolean isLeaderReady() {
Expand Down Expand Up @@ -113,13 +113,13 @@ void startFollowerState(RaftServerImpl server, Object reason) {
updateAndGet(followerState, new FollowerState(server, reason)).start();
}

void shutdownFollowerState() {
CompletableFuture<Void> shutdownFollowerState() {
final FollowerState follower = followerState.getAndSet(null);
if (follower != null) {
LOG.info("{}: shutdown {}", id, follower);
follower.stopRunning();
follower.interrupt();
if (follower == null) {
return CompletableFuture.completedFuture(null);
}
LOG.info("{}: shutdown {}", id, follower);
return follower.stopRunning();
}

void startLeaderElection(RaftServerImpl server, boolean force) {
Expand All @@ -133,13 +133,13 @@ void setLeaderElectionPause(boolean pause) {
pauseLeaderElection.set(pause);
}

void shutdownLeaderElection() {
CompletableFuture<Void> shutdownLeaderElection() {
final LeaderElection election = leaderElection.getAndSet(null);
if (election != null) {
LOG.info("{}: shutdown {}", id, election);
election.shutdown();
// no need to interrupt the election thread
if (election == null) {
return CompletableFuture.completedFuture(null);
}
LOG.info("{}: shutdown {}", id, election);
return election.shutdown();
}

private <T> T updateAndGet(AtomicReference<T> ref, T current) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ public void start() {

@Override
public boolean isRunning() {
return daemon.isWorking();
return daemon.isWorking() && server.getInfo().isLeader();
}

@Override
Expand Down
Loading