diff --git a/jetcd-core/src/main/java/io/etcd/jetcd/impl/LeaseImpl.java b/jetcd-core/src/main/java/io/etcd/jetcd/impl/LeaseImpl.java index 70cb00a5..6e57fa93 100644 --- a/jetcd-core/src/main/java/io/etcd/jetcd/impl/LeaseImpl.java +++ b/jetcd-core/src/main/java/io/etcd/jetcd/impl/LeaseImpl.java @@ -262,11 +262,13 @@ private synchronized void handleResponse(io.etcd.jetcd.api.LeaseKeepAliveRespons } } - private synchronized void handleException(Throwable unused) { + private synchronized void handleException(Throwable throwable) { if (!this.isRunning()) { return; } + keepAlives.values().forEach(ka -> ka.onError(throwable)); + restart = connectionManager().vertx().setTimer( 500, l -> { diff --git a/jetcd-core/src/test/java/io/etcd/jetcd/impl/LeaseUnitTest.java b/jetcd-core/src/test/java/io/etcd/jetcd/impl/LeaseUnitTest.java index 0c09edb6..4b5192a4 100644 --- a/jetcd-core/src/test/java/io/etcd/jetcd/impl/LeaseUnitTest.java +++ b/jetcd-core/src/test/java/io/etcd/jetcd/impl/LeaseUnitTest.java @@ -48,6 +48,7 @@ import static io.etcd.jetcd.common.exception.EtcdExceptionFactory.toEtcdException; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.awaitility.Awaitility.await; import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.Mockito.after; import static org.mockito.Mockito.timeout; @@ -57,6 +58,7 @@ @Timeout(value = 1, unit = TimeUnit.MINUTES) @ExtendWith(MockitoExtension.class) public class LeaseUnitTest { + private static final long TIME_OUT_SECONDS = 30; private Lease leaseCli; private AtomicReference> responseObserverRef; @@ -230,8 +232,10 @@ public void testKeepAliveReceivesExpiredLease() { @Test public void testKeepAliveResetOnStreamErrors() { - final StreamObserver observer = Observers.observer(response -> { - }); + final AtomicReference errorRecorder = new AtomicReference<>(); + final StreamObserver observer = Observers. builder() + .onError(errorRecorder::set) + .build(); try (CloseableClient client = this.leaseCli.keepAlive(LEASE_ID_1, observer)) { Throwable t = Status.ABORTED.asRuntimeException(); @@ -241,6 +245,7 @@ public void testKeepAliveResetOnStreamErrors() { // expect keep alive requests are still sending even with reset. verify(this.requestStreamObserverMock, timeout(2000).atLeast(3)).onNext(argThat(hasLeaseID(LEASE_ID_1))); + await().atMost(TIME_OUT_SECONDS, TimeUnit.SECONDS).untilAsserted(() -> assertThat(errorRecorder.get()).isNotNull()); } }