diff --git a/dledger/src/test/java/io/openmessaging/storage/dledger/AppendAndPushTest.java b/dledger/src/test/java/io/openmessaging/storage/dledger/AppendAndPushTest.java index f35c44c6..f65082fa 100644 --- a/dledger/src/test/java/io/openmessaging/storage/dledger/AppendAndPushTest.java +++ b/dledger/src/test/java/io/openmessaging/storage/dledger/AppendAndPushTest.java @@ -114,6 +114,54 @@ public void testPushNetworkOffline() throws Exception { Assertions.assertTrue(hasWait); } + @Test + public void testPushNetworkOfflineWithSmallFallConfig() throws Exception { + String group = UUID.randomUUID().toString(); + String peers = String.format("n0-localhost:%d;n1-localhost:%d", nextPort(), nextPort()); + + DLedgerServer dLedgerServer0 = launchServer(group, peers, "n0", "n0", DLedgerConfig.FILE); + dLedgerServer0.getDLedgerConfig().setMaxPendingCommitBytes(100); + boolean hasWait = false; + for (int i = 0; i < 3; i++) { + AppendEntryRequest appendEntryRequest = new AppendEntryRequest(); + appendEntryRequest.setGroup(group); + appendEntryRequest.setRemoteId(dLedgerServer0.getMemberState().getSelfId()); + appendEntryRequest.setBody(new byte[128]); + CompletableFuture future = dLedgerServer0.handleAppend(appendEntryRequest); + Assertions.assertTrue(future instanceof AppendFuture); + if (future.isDone()) { + Assertions.assertEquals(DLedgerResponseCode.LEADER_PENDING_FULL.getCode(), future.get().getCode()); + hasWait = true; + break; + } + } + Assertions.assertTrue(hasWait); + } + + @Test + public void testPushNetworkOfflineWithSmallPendingCommitIndex() throws Exception { + String group = UUID.randomUUID().toString(); + String peers = String.format("n0-localhost:%d;n1-localhost:%d", nextPort(), nextPort()); + + DLedgerServer dLedgerServer0 = launchServer(group, peers, "n0", "n0", DLedgerConfig.FILE); + dLedgerServer0.getDLedgerConfig().setMaxPendingCommitIndexNum(10); + boolean hasWait = false; + for (int i = 0; i < 12; i++) { + AppendEntryRequest appendEntryRequest = new AppendEntryRequest(); + appendEntryRequest.setGroup(group); + appendEntryRequest.setRemoteId(dLedgerServer0.getMemberState().getSelfId()); + appendEntryRequest.setBody(new byte[128]); + CompletableFuture future = dLedgerServer0.handleAppend(appendEntryRequest); + Assertions.assertTrue(future instanceof AppendFuture); + if (future.isDone()) { + Assertions.assertEquals(DLedgerResponseCode.LEADER_PENDING_FULL.getCode(), future.get().getCode()); + hasWait = true; + break; + } + } + Assertions.assertTrue(hasWait); + } + @Test public void testPushNetworkNotStable() throws Exception { String group = UUID.randomUUID().toString(); diff --git a/dledger/src/test/java/io/openmessaging/storage/dledger/BatchPushTest.java b/dledger/src/test/java/io/openmessaging/storage/dledger/BatchPushTest.java index 07f7a55e..c96385ba 100644 --- a/dledger/src/test/java/io/openmessaging/storage/dledger/BatchPushTest.java +++ b/dledger/src/test/java/io/openmessaging/storage/dledger/BatchPushTest.java @@ -150,6 +150,58 @@ public void testBatchPushNetworkOffline() throws Exception { Assertions.assertTrue(hasWait); } + @Test + public void testBatchPushNetworkOfflineWithSmallFall() throws Exception { + String group = UUID.randomUUID().toString(); + String peers = String.format("n0-localhost:%d;n1-localhost:%d", nextPort(), nextPort()); + + DLedgerServer dLedgerServer0 = launchServerEnableBatchPush(group, peers, "n0", "n0", DLedgerConfig.FILE); + dLedgerServer0.getDLedgerConfig().setMaxPendingCommitBytes(100); + + boolean hasWait = false; + for (int i = 0; i < 3; i++) { + AppendEntryRequest appendEntryRequest = new AppendEntryRequest(); + appendEntryRequest.setGroup(group); + appendEntryRequest.setRemoteId(dLedgerServer0.getMemberState().getSelfId()); + appendEntryRequest.setBody(new byte[128]); + CompletableFuture future = dLedgerServer0.handleAppend(appendEntryRequest); + Assertions.assertTrue(future instanceof AppendFuture); + if (future.isDone()) { + Assertions.assertEquals(DLedgerResponseCode.LEADER_PENDING_FULL.getCode(), future.get().getCode()); + hasWait = true; + break; + } + } + dLedgerServer0.shutdown(); + Assertions.assertTrue(hasWait); + } + + @Test + public void testBatchPushNetworkOfflineWithSmallPendingCommitIndex() throws Exception { + String group = UUID.randomUUID().toString(); + String peers = String.format("n0-localhost:%d;n1-localhost:%d", nextPort(), nextPort()); + + DLedgerServer dLedgerServer0 = launchServerEnableBatchPush(group, peers, "n0", "n0", DLedgerConfig.FILE); + dLedgerServer0.getDLedgerConfig().setMaxPendingCommitIndexNum(10); + + boolean hasWait = false; + for (int i = 0; i < 12; i++) { + AppendEntryRequest appendEntryRequest = new AppendEntryRequest(); + appendEntryRequest.setGroup(group); + appendEntryRequest.setRemoteId(dLedgerServer0.getMemberState().getSelfId()); + appendEntryRequest.setBody(new byte[128]); + CompletableFuture future = dLedgerServer0.handleAppend(appendEntryRequest); + Assertions.assertTrue(future instanceof AppendFuture); + if (future.isDone()) { + Assertions.assertEquals(DLedgerResponseCode.LEADER_PENDING_FULL.getCode(), future.get().getCode()); + hasWait = true; + break; + } + } + dLedgerServer0.shutdown(); + Assertions.assertTrue(hasWait); + } + @Test public void testBatchPushNetworkNotStable() throws Exception { String group = UUID.randomUUID().toString();