Skip to content

Commit

Permalink
lease: Exceptions of lease keepAlive can be perceived
Browse files Browse the repository at this point in the history
Fixes #1322

Signed-off-by: tunefun <965728225@qq.com>
  • Loading branch information
tunefun authored and lburgazzoli committed Mar 5, 2024
1 parent 4274990 commit 10b07f5
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 3 deletions.
4 changes: 3 additions & 1 deletion jetcd-core/src/main/java/io/etcd/jetcd/impl/LeaseImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -262,11 +262,13 @@ private synchronized void handleResponse(io.etcd.jetcd.api.LeaseKeepAliveRespons
}
}

private synchronized void handleException(Throwable unused) {
private synchronized void handleException(Throwable throwable) {
if (!this.isRunning()) {
return;
}

keepAlives.values().forEach(ka -> ka.onError(throwable));

restart = connectionManager().vertx().setTimer(
500,
l -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import static io.etcd.jetcd.common.exception.EtcdExceptionFactory.toEtcdException;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.awaitility.Awaitility.await;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Mockito.after;
import static org.mockito.Mockito.timeout;
Expand All @@ -57,6 +58,7 @@
@Timeout(value = 1, unit = TimeUnit.MINUTES)
@ExtendWith(MockitoExtension.class)
public class LeaseUnitTest {
private static final long TIME_OUT_SECONDS = 30;

private Lease leaseCli;
private AtomicReference<StreamObserver<LeaseKeepAliveResponse>> responseObserverRef;
Expand Down Expand Up @@ -230,8 +232,10 @@ public void testKeepAliveReceivesExpiredLease() {

@Test
public void testKeepAliveResetOnStreamErrors() {
final StreamObserver<io.etcd.jetcd.lease.LeaseKeepAliveResponse> observer = Observers.observer(response -> {
});
final AtomicReference<Throwable> errorRecorder = new AtomicReference<>();
final StreamObserver<io.etcd.jetcd.lease.LeaseKeepAliveResponse> observer = Observers.<io.etcd.jetcd.lease.LeaseKeepAliveResponse> builder()
.onError(errorRecorder::set)
.build();

try (CloseableClient client = this.leaseCli.keepAlive(LEASE_ID_1, observer)) {
Throwable t = Status.ABORTED.asRuntimeException();
Expand All @@ -241,6 +245,7 @@ public void testKeepAliveResetOnStreamErrors() {

// expect keep alive requests are still sending even with reset.
verify(this.requestStreamObserverMock, timeout(2000).atLeast(3)).onNext(argThat(hasLeaseID(LEASE_ID_1)));
await().atMost(TIME_OUT_SECONDS, TimeUnit.SECONDS).untilAsserted(() -> assertThat(errorRecorder.get()).isNotNull());
}
}

Expand Down

0 comments on commit 10b07f5

Please sign in to comment.