From e169fddb6c9f5174e312e02e436a5714da0c1893 Mon Sep 17 00:00:00 2001 From: tunefun <965728225@qq.com> Date: Mon, 4 Mar 2024 23:34:12 +0800 Subject: [PATCH] lease: Exceptions of lease keepAlive can be perceived Fixes #1322 Signed-off-by: tunefun <965728225@qq.com> --- .../src/main/java/io/etcd/jetcd/impl/LeaseImpl.java | 4 +++- .../src/test/java/io/etcd/jetcd/impl/LeaseUnitTest.java | 9 +++++++-- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/jetcd-core/src/main/java/io/etcd/jetcd/impl/LeaseImpl.java b/jetcd-core/src/main/java/io/etcd/jetcd/impl/LeaseImpl.java index 70cb00a5a..6e57fa938 100644 --- a/jetcd-core/src/main/java/io/etcd/jetcd/impl/LeaseImpl.java +++ b/jetcd-core/src/main/java/io/etcd/jetcd/impl/LeaseImpl.java @@ -262,11 +262,13 @@ private synchronized void handleResponse(io.etcd.jetcd.api.LeaseKeepAliveRespons } } - private synchronized void handleException(Throwable unused) { + private synchronized void handleException(Throwable throwable) { if (!this.isRunning()) { return; } + keepAlives.values().forEach(ka -> ka.onError(throwable)); + restart = connectionManager().vertx().setTimer( 500, l -> { diff --git a/jetcd-core/src/test/java/io/etcd/jetcd/impl/LeaseUnitTest.java b/jetcd-core/src/test/java/io/etcd/jetcd/impl/LeaseUnitTest.java index 0c09edb66..00dc2c396 100644 --- a/jetcd-core/src/test/java/io/etcd/jetcd/impl/LeaseUnitTest.java +++ b/jetcd-core/src/test/java/io/etcd/jetcd/impl/LeaseUnitTest.java @@ -48,6 +48,7 @@ import static io.etcd.jetcd.common.exception.EtcdExceptionFactory.toEtcdException; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.awaitility.Awaitility.await; import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.Mockito.after; import static org.mockito.Mockito.timeout; @@ -57,6 +58,7 @@ @Timeout(value = 1, unit = TimeUnit.MINUTES) @ExtendWith(MockitoExtension.class) public class LeaseUnitTest { + private static final long TIME_OUT_SECONDS = 30; private Lease leaseCli; private AtomicReference> responseObserverRef; @@ -230,8 +232,10 @@ public void testKeepAliveReceivesExpiredLease() { @Test public void testKeepAliveResetOnStreamErrors() { - final StreamObserver observer = Observers.observer(response -> { - }); + final AtomicReference errorRecorder = new AtomicReference<>(); + final StreamObserver observer = Observers.builder() + .onError(errorRecorder::set) + .build(); try (CloseableClient client = this.leaseCli.keepAlive(LEASE_ID_1, observer)) { Throwable t = Status.ABORTED.asRuntimeException(); @@ -241,6 +245,7 @@ public void testKeepAliveResetOnStreamErrors() { // expect keep alive requests are still sending even with reset. verify(this.requestStreamObserverMock, timeout(2000).atLeast(3)).onNext(argThat(hasLeaseID(LEASE_ID_1))); + await().atMost(TIME_OUT_SECONDS, TimeUnit.SECONDS).untilAsserted(() -> assertThat(errorRecorder.get()).isNotNull()); } }