Skip to content

Commit

Permalink
style: optmize or simplify some code.
Browse files Browse the repository at this point in the history
  • Loading branch information
supervate committed Jan 17, 2024
1 parent d34f270 commit c362df1
Show file tree
Hide file tree
Showing 11 changed files with 34 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,7 @@ public void setReadOnlyDataStoreDirs(String readOnlyDataStoreDirs) {

private String selfAddress;

// groupId#selfIf -> address
// groupId#selfId -> address
private Map<String, String> peerAddressMap;

private final AtomicBoolean inited = new AtomicBoolean(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import io.openmessaging.storage.dledger.protocol.DLedgerResponseCode;
import io.openmessaging.storage.dledger.protocol.GetEntriesResponse;
import io.openmessaging.storage.dledger.util.FileTestUtil;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.awaitility.core.AssertionCondition;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
Expand All @@ -59,7 +58,7 @@ public void testPushCommittedIndex() throws Exception {
appendEntryRequest.setRemoteId(dLedgerServer0.getMemberState().getSelfId());
appendEntryRequest.setBody(new byte[256]);
CompletableFuture<AppendEntryResponse> future = dLedgerServer0.handleAppend(appendEntryRequest);
Assertions.assertTrue(future instanceof AppendFuture);
Assertions.assertInstanceOf(AppendFuture.class, future);
futures.add(future);
}
Assertions.assertEquals(9, dLedgerServer0.getDLedgerStore().getLedgerEndIndex());
Expand All @@ -86,13 +85,12 @@ public void testPushNetworkOffline() throws Exception {
appendEntryRequest.setRemoteId(dLedgerServer0.getMemberState().getSelfId());
appendEntryRequest.setBody(new byte[128]);
CompletableFuture<AppendEntryResponse> future = dLedgerServer0.handleAppend(appendEntryRequest);
Assertions.assertTrue(future instanceof AppendFuture);
Assertions.assertInstanceOf(AppendFuture.class, future);
futures.add(future);
}
Assertions.assertEquals(9, dLedgerServer0.getDLedgerStore().getLedgerEndIndex());
Thread.sleep(dLedgerServer0.getDLedgerConfig().getMaxWaitAckTimeMs() + 1000);
for (int i = 0; i < futures.size(); i++) {
CompletableFuture<AppendEntryResponse> future = futures.get(i);
for (CompletableFuture<AppendEntryResponse> future : futures) {
Assertions.assertTrue(future.isDone());
Assertions.assertEquals(DLedgerResponseCode.WAIT_QUORUM_ACK_TIMEOUT.getCode(), future.get().getCode());
}
Expand All @@ -104,7 +102,7 @@ public void testPushNetworkOffline() throws Exception {
appendEntryRequest.setRemoteId(dLedgerServer0.getMemberState().getSelfId());
appendEntryRequest.setBody(new byte[128]);
CompletableFuture<AppendEntryResponse> future = dLedgerServer0.handleAppend(appendEntryRequest);
Assertions.assertTrue(future instanceof AppendFuture);
Assertions.assertInstanceOf(AppendFuture.class, future);
if (future.isDone()) {
Assertions.assertEquals(DLedgerResponseCode.LEADER_PENDING_FULL.getCode(), future.get().getCode());
hasWait = true;
Expand All @@ -126,12 +124,12 @@ public void testPushNetworkNotStable() throws Exception {
appendEntryRequest.setRemoteId(dLedgerServer0.getMemberState().getSelfId());
appendEntryRequest.setBody(new byte[128]);
CompletableFuture<AppendEntryResponse> future = dLedgerServer0.handleAppend(appendEntryRequest);
Assertions.assertTrue(future instanceof AppendFuture);
Assertions.assertInstanceOf(AppendFuture.class, future);
future.whenComplete((x, ex) -> {
sendSuccess.set(true);
});
Thread.sleep(500);
Assertions.assertTrue(!sendSuccess.get());
Assertions.assertFalse(sendSuccess.get());
//start server1
DLedgerServer dLedgerServer1 = launchServer(group, peers, "n1", "n0", DLedgerConfig.FILE);
Thread.sleep(1500);
Expand All @@ -140,12 +138,12 @@ public void testPushNetworkNotStable() throws Exception {
dLedgerServer1.shutdown();
sendSuccess.set(false);
future = dLedgerServer0.handleAppend(appendEntryRequest);
Assertions.assertTrue(future instanceof AppendFuture);
Assertions.assertInstanceOf(AppendFuture.class, future);
future.whenComplete((x, ex) -> {
sendSuccess.set(true);
});
Thread.sleep(500);
Assertions.assertTrue(!sendSuccess.get());
Assertions.assertFalse(sendSuccess.get());
//restart server1
dLedgerServer1 = launchServer(group, peers, "n1", "n0", DLedgerConfig.FILE);
Thread.sleep(1500);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,13 +122,12 @@ public void testBatchPushNetworkOffline() throws Exception {
appendEntryRequest.setRemoteId(dLedgerServer0.getMemberState().getSelfId());
appendEntryRequest.setBody(new byte[128]);
CompletableFuture<AppendEntryResponse> future = dLedgerServer0.handleAppend(appendEntryRequest);
Assertions.assertTrue(future instanceof AppendFuture);
Assertions.assertInstanceOf(AppendFuture.class, future);
futures.add(future);
}
Assertions.assertEquals(9, dLedgerServer0.getDLedgerStore().getLedgerEndIndex());
Thread.sleep(dLedgerServer0.getDLedgerConfig().getMaxWaitAckTimeMs() + 1000);
for (int i = 0; i < futures.size(); i++) {
CompletableFuture<AppendEntryResponse> future = futures.get(i);
for (CompletableFuture<AppendEntryResponse> future : futures) {
Assertions.assertTrue(future.isDone());
Assertions.assertEquals(DLedgerResponseCode.WAIT_QUORUM_ACK_TIMEOUT.getCode(), future.get().getCode());
}
Expand All @@ -140,7 +139,7 @@ public void testBatchPushNetworkOffline() throws Exception {
appendEntryRequest.setRemoteId(dLedgerServer0.getMemberState().getSelfId());
appendEntryRequest.setBody(new byte[128]);
CompletableFuture<AppendEntryResponse> future = dLedgerServer0.handleAppend(appendEntryRequest);
Assertions.assertTrue(future instanceof AppendFuture);
Assertions.assertInstanceOf(AppendFuture.class, future);
if (future.isDone()) {
Assertions.assertEquals(DLedgerResponseCode.LEADER_PENDING_FULL.getCode(), future.get().getCode());
hasWait = true;
Expand All @@ -162,12 +161,12 @@ public void testBatchPushNetworkNotStable() throws Exception {
appendEntryRequest.setRemoteId(dLedgerServer0.getMemberState().getSelfId());
appendEntryRequest.setBody(new byte[128]);
CompletableFuture<AppendEntryResponse> future = dLedgerServer0.handleAppend(appendEntryRequest);
Assertions.assertTrue(future instanceof AppendFuture);
Assertions.assertInstanceOf(AppendFuture.class, future);
future.whenComplete((x, ex) -> {
sendSuccess.set(true);
});
Thread.sleep(500);
Assertions.assertTrue(!sendSuccess.get());
Assertions.assertFalse(sendSuccess.get());
//start server1
DLedgerServer dLedgerServer1 = launchServerEnableBatchPush(group, peers, "n1", "n0", DLedgerConfig.FILE);
Thread.sleep(1500);
Expand All @@ -176,12 +175,12 @@ public void testBatchPushNetworkNotStable() throws Exception {
dLedgerServer1.shutdown();
sendSuccess.set(false);
future = dLedgerServer0.handleAppend(appendEntryRequest);
Assertions.assertTrue(future instanceof AppendFuture);
Assertions.assertInstanceOf(AppendFuture.class, future);
future.whenComplete((x, ex) -> {
sendSuccess.set(true);
});
Thread.sleep(500);
Assertions.assertTrue(!sendSuccess.get());
Assertions.assertFalse(sendSuccess.get());
//restart server1
dLedgerServer1 = launchServerEnableBatchPush(group, peers, "n1", "n0", DLedgerConfig.FILE);
Thread.sleep(1500);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,7 @@ public void testDisableFastAdvanceCommitIndex() throws Exception {
DLedgerServer server0 = launchServer(group, peers, "n0", "n0");
DLedgerServer server1 = launchServer(group, peers, "n1", "n0");
DLedgerServer server2 = launchServer(group, peers, "n2", "n0");
await().atMost(6, TimeUnit.SECONDS).pollInterval(300, TimeUnit.MILLISECONDS).until(() -> {
return server0.isLeader();
});
await().atMost(6, TimeUnit.SECONDS).pollInterval(300, TimeUnit.MILLISECONDS).until(server0::isLeader);
// at beginning, the commit index is -1
Assertions.assertEquals(-1, server0.getMemberState().getCommittedIndex());
Assertions.assertEquals(-1, server1.getMemberState().getCommittedIndex());
Expand Down Expand Up @@ -133,9 +131,7 @@ public void testDisableFastAdvanceCommitIndex() throws Exception {

// now restart n0
DLedgerServer newServer0 = launchServer(group, peers, "n0", "n0");
await().atMost(6, TimeUnit.SECONDS).pollInterval(300, TimeUnit.MILLISECONDS).until(() -> {
return newServer0.isLeader();
});
await().atMost(6, TimeUnit.SECONDS).pollInterval(300, TimeUnit.MILLISECONDS).until(newServer0::isLeader);
Assertions.assertEquals(1, newServer0.getMemberState().getCommittedIndex());
}

Expand All @@ -146,9 +142,7 @@ public void testEnableFastAdvanceCommitIndex() throws Exception {
DLedgerServer server0 = launchServer(group, peers, "n0", "n0", true);
DLedgerServer server1 = launchServer(group, peers, "n1", "n0", true);
DLedgerServer server2 = launchServer(group, peers, "n2", "n0", true);
await().atMost(6, TimeUnit.SECONDS).pollInterval(300, TimeUnit.MILLISECONDS).until(() -> {
return server0.isLeader();
});
await().atMost(6, TimeUnit.SECONDS).pollInterval(300, TimeUnit.MILLISECONDS).until(server0::isLeader);
// at beginning, the commit index is -1
Assertions.assertEquals(-1, server0.getMemberState().getCommittedIndex());
Assertions.assertEquals(-1, server1.getMemberState().getCommittedIndex());
Expand Down Expand Up @@ -232,9 +226,7 @@ public void testEnableFastAdvanceCommitIndex() throws Exception {
// why not 3? because when leader change, n0 hava already advanced the commit index to 2 (commit index is not stale at that time),
// so it is meaningless to append one more no-op entry to advance the commit index
DLedgerServer newServer0 = launchServer(group, peers, "n0", "n0", true);
await().atMost(6, TimeUnit.SECONDS).pollInterval(300, TimeUnit.MILLISECONDS).until(() -> {
return newServer0.isLeader();
});
await().atMost(6, TimeUnit.SECONDS).pollInterval(300, TimeUnit.MILLISECONDS).until(newServer0::isLeader);
Assertions.assertEquals(2, newServer0.getMemberState().getCommittedIndex());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.junit.jupiter.api.Test;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -80,15 +81,7 @@ public void testThreeServer() throws Exception {
Assertions.assertNotNull(leaderServer);

for (int i = 0; i < 10; i++) {
long maxTerm = servers.stream().max((o1, o2) -> {
if (o1.getMemberState().currTerm() < o2.getMemberState().currTerm()) {
return -1;
} else if (o1.getMemberState().currTerm() > o2.getMemberState().currTerm()) {
return 1;
} else {
return 0;
}
}).get().getMemberState().currTerm();
long maxTerm = servers.stream().max(Comparator.comparingLong(o -> o.getMemberState().currTerm())).get().getMemberState().currTerm();
DLedgerServer candidate = servers.get(i % servers.size());
candidate.getDLedgerLeaderElector().testRevote(maxTerm + 1);
Thread.sleep(2000);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,13 @@ protected synchronized DLedgerServer launchServer(String group, String peers, St
}

protected synchronized DLedgerServer launchServer(String group, String peers, String selfId,
String preferredLeaderId, boolean enbaleFastAdvanceCommitIndex) {
String preferredLeaderId, boolean enableFastAdvanceCommitIndex) {
DLedgerConfig config = new DLedgerConfig();
config.setStoreBaseDir(getBaseDir() + File.separator + group);
config.group(group).selfId(selfId).peers(peers);
config.setStoreType(DLedgerConfig.FILE);
config.setPreferredLeaderId(preferredLeaderId);
config.setEnableFastAdvanceCommitIndex(enbaleFastAdvanceCommitIndex);
config.setEnableFastAdvanceCommitIndex(enableFastAdvanceCommitIndex);
DLedgerServer dLedgerServer = new DLedgerServer(config);
dLedgerServer.startup();
bases.add(config.getDefaultPath());
Expand Down Expand Up @@ -85,8 +85,8 @@ protected synchronized DLedgerServer launchServer(String group, String peers, St


protected DLedgerServer launchServerWithStateMachineDisableSnapshot(String group, String peers,
String selfIf, String leaderId, String storeType, int mappedFileSizeForEntryData, StateMachine stateMachine) {
return this.launchServerWithStateMachine(group, peers, selfIf, leaderId, storeType, false, 0,
String selfId, String leaderId, String storeType, int mappedFileSizeForEntryData, StateMachine stateMachine) {
return this.launchServerWithStateMachine(group, peers, selfId, leaderId, storeType, false, 0,
mappedFileSizeForEntryData, stateMachine);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public void done(Status status) {
try {
latch.await();
} catch (InterruptedException e) {
Assertions.assertTrue(false);
Assertions.fail();
}
}

Expand Down Expand Up @@ -110,7 +110,7 @@ public void done(Status status) {
try {
latch.await();
} catch (InterruptedException e) {
Assertions.assertTrue(false);
Assertions.fail();
}
}

Expand Down Expand Up @@ -138,7 +138,7 @@ public void done(Status status) {
try {
latch.await();
} catch (InterruptedException e) {
Assertions.assertTrue(false);
Assertions.fail();
}
}

Expand Down Expand Up @@ -171,7 +171,7 @@ public void done(Status status) {
try {
latch.await();
} catch (InterruptedException e) {
Assertions.assertTrue(false);
Assertions.fail();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ public void testEncodeDecode() {
public void testCompareEntry() {
DLedgerEntry entry = new DLedgerEntry();
DLedgerEntry other = new DLedgerEntry();
Assertions.assertTrue(!entry.equals(null));
Assertions.assertEquals(entry, other);
Assertions.assertEquals(other, entry);
Assertions.assertEquals(other.hashCode(), entry.hashCode());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

public class MockSnapshotFile {

private static Logger logger = LoggerFactory.getLogger(MockSnapshotFile.class);
private static final Logger logger = LoggerFactory.getLogger(MockSnapshotFile.class);

private final String snapshotStorePath;

Expand All @@ -29,10 +29,10 @@ public boolean save(final long value) {

public long load() throws IOException {
String str = IOUtils.file2String(new File(snapshotStorePath));
if (str != null && str.length() != 0) {
if (str != null && !str.isEmpty()) {
return Long.parseLong(str);
} else {
throw new IOException("Unable to load snapshot data from " + snapshotStorePath);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

public class RegisterSnapshotFile {

private static Logger logger = LoggerFactory.getLogger(RegisterSnapshotFile.class);
private static final Logger logger = LoggerFactory.getLogger(RegisterSnapshotFile.class);

private final String snapshotStorePath;

Expand Down

0 comments on commit c362df1

Please sign in to comment.