Skip to content

Commit

Permalink
test
Browse files Browse the repository at this point in the history
  • Loading branch information
Yifei Zhang committed Oct 12, 2024
1 parent d4079f3 commit 36e1436
Showing 1 changed file with 45 additions and 21 deletions.
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
package org.jgroups.tests.election;

import org.jgroups.Address;
import org.jgroups.Global;
import org.jgroups.Header;
import org.jgroups.JChannel;
import org.jgroups.Message;
import org.jgroups.View;
import org.jgroups.*;
import org.jgroups.protocols.SHARED_LOOPBACK;
import org.jgroups.protocols.pbcast.GMS;
import org.jgroups.protocols.raft.RAFT;
import org.jgroups.protocols.raft.election.BaseElection;
import org.jgroups.protocols.raft.election.VoteResponse;
import org.jgroups.stack.Protocol;
import org.jgroups.tests.harness.AbstractRaftTest;
import org.jgroups.tests.harness.BaseRaftElectionTest;
import org.jgroups.util.Util;

Expand All @@ -20,9 +17,9 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;
import java.util.stream.IntStream;

import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import static java.util.Arrays.stream;
Expand All @@ -41,11 +38,14 @@
public class NetworkPartitionChannelTest extends BaseRaftElectionTest.ChannelBased {
private final int[] indexes;
private volatile Semaphore newTerm;
private final AtomicInteger slowVoteResponses = new AtomicInteger();

{
clusterSize = 5;
indexes = IntStream.range(0, clusterSize).toArray();
recreatePerMethod = true;
System.setProperty(AbstractRaftTest.ENABLE_TRACE_CLASSES,
"org.jgroups.protocols.raft.ELECTION,org.jgroups.protocols.raft.ELECTION2");
}

public void electionAfterMerge(Class<?> ignore) throws Exception {
Expand Down Expand Up @@ -79,27 +79,28 @@ public void electionAfterMerge(Class<?> ignore) throws Exception {

merge(leader, coord);

int finalLeader = leader;
assertThat(eventually(() -> coordIndex(finalLeader) == coord && coordIndex(coord) == coord, 10, TimeUnit.SECONDS))
.isTrue();
System.out.println("after merge: " + view(coord));

// since the term is not advanced yet, new coordinator has accepted the existing leader, and stopping the
// voting thread, but voting thread is just interrupted, it's still running, the term will be advanced anyway,
// and the VoteRequest will be sent, if the voting process goes wrong, e.g. waiting response timeout then it
// won't retry the voting process since the thread has been interrupted.
// voting runner, but voting thread is just interrupted, because the voting process almost uninterruptible,
// so it's still running, the term will be advanced anyway, and the VoteRequest will be sent, if the voting
// process goes wrong, e.g. waiting response timeout then it won't retry the voting process since the runner
// has been stopped and the thread has been interrupted.
waitUntilLeaderElected(3000, indexes);
System.out.println(dumpLeaderAndTerms());

// try to make waiting VoteResponse timeout
election(channel(coord)).voteTimeout(1);
// slow down the responses, coordinator won't get majority vote responses after waiting timeout
slowVoteResponses.set(3);

// unblock the voting thread
newTerm.release();
newTerm = null;

int finalLeader = leader;
assertThat(eventually(() -> coordIndex(finalLeader) == coord && coordIndex(coord) == coord, 10, TimeUnit.SECONDS))
.isTrue();
System.out.println("after merge: " + view(coord));

// ELECTION may be timeout, ELECTION2 always pass.
waitUntilLeaderElected(5000, indexes);
waitUntilLeaderElected(3000, indexes);
System.out.println(dumpLeaderAndTerms());
}

Expand All @@ -115,6 +116,23 @@ public long createNewTerm() {
};
}

@Override
protected Protocol[] baseProtocolStackForNode(String name) throws Exception {
Protocol[] protocols = super.baseProtocolStackForNode(name);
protocols[0] = new SHARED_LOOPBACK() {
@Override
public Object down(Message msg) {
if (!addr().equals(msg.dest())) {
Header h = msg.getHeader((short) 520);
if (h == null) h = msg.getHeader((short) 524);
if (h instanceof VoteResponse && slowVoteResponses.getAndDecrement() > 0) park(1000);
}
return super.down(msg);
}
};
return protocols;
}

private void partition(int[]... partitions) throws TimeoutException {
List<List<JChannel>> parts = stream(partitions).map(t -> stream(t).mapToObj(this::channel).collect(toList()))
.collect(toList());
Expand All @@ -136,8 +154,7 @@ private void merge(int... coordinators) throws TimeoutException {
}

private View view(int index) {
GMS gms = channel(index).stack().findProtocol(GMS.class);
return gms.view();
return channel(index).stack().<GMS>findProtocol(GMS.class).view();
}

private int coordIndex(int index) {
Expand All @@ -147,4 +164,11 @@ private int coordIndex(int index) {
private int index(Address addr) {
return stream(indexes).filter(t -> channel(t).address().equals(addr)).findAny().getAsInt();
}

private void park(int ms) {
long deadline = System.nanoTime() + ms * 1000_000L;
do {
LockSupport.parkUntil(deadline);
} while (System.nanoTime() < deadline);
}
}

0 comments on commit 36e1436

Please sign in to comment.