Skip to content

Commit

Permalink
Add test for #1161
Browse files Browse the repository at this point in the history
Signed-off-by: Luca Burgazzoli <lburgazzoli@gmail.com>
  • Loading branch information
lburgazzoli committed Jun 10, 2023
1 parent 73c7120 commit 23aeac5
Show file tree
Hide file tree
Showing 7 changed files with 107 additions and 31 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ subprojects {
maxParallelForks = Runtime.runtime.availableProcessors() ?: 1

retry {
maxRetries = 3
maxRetries = 1
maxFailures = 5
}
}
Expand Down
28 changes: 22 additions & 6 deletions jetcd-core/src/main/java/io/etcd/jetcd/ClientBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,7 @@
import java.net.URI;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -66,6 +61,7 @@ public final class ClientBuilder implements Cloneable {
private ByteSequence namespace = ByteSequence.EMPTY;
private long retryDelay = 500;
private long retryMaxDelay = 2500;
private int retryMaxAttempts = 2;
private ChronoUnit retryChronoUnit = ChronoUnit.MILLIS;
private Duration keepaliveTime = Duration.ofSeconds(30L);
private Duration keepaliveTimeout = Duration.ofSeconds(10L);
Expand Down Expand Up @@ -540,6 +536,26 @@ public ClientBuilder retryMaxDelay(long retryMaxDelay) {
return this;
}

/**
* Returns the max number of retry attempts
*
* @return max retry attempts.
*/
public int retryMaxAttempts() {
return retryMaxAttempts;
}

/**
* Set the max number of retry attempts
*
* @param retryMaxAttempts The max retry attempts.
* @return this builder
*/
public ClientBuilder retryMaxAttempts(int retryMaxAttempts) {
this.retryMaxAttempts = retryMaxAttempts;
return this;
}

/**
* Returns the keep alive time.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,8 @@
import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.ClientBuilder;
import io.etcd.jetcd.support.Util;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.*;
import io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.netty.NegotiationType;
import io.grpc.stub.AbstractStub;
import io.netty.channel.ChannelOption;
Expand Down
35 changes: 27 additions & 8 deletions jetcd-core/src/main/java/io/etcd/jetcd/impl/Impl.java
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,30 @@ protected <S, T> CompletableFuture<T> execute(
Function<S, T> resultConvert,
Predicate<Status> doRetry) {

RetryPolicy<S> retryPolicy = new RetryPolicy<S>()
return Failsafe
.with(retryPolicy(doRetry))
.with(connectionManager.getExecutorService())
.getStageAsync(() -> supplier.get().toCompletionStage())
.thenApply(resultConvert);
}

protected <S> RetryPolicy<S> retryPolicy(Predicate<Status> doRetry) {
RetryPolicy<S> policy = new RetryPolicy<S>()
.onFailure(e -> {
logger.warn("retry failure (attempt: {}, error: {})",
e.getAttemptCount(),
e.getFailure() != null ? e.getFailure().getMessage() : "<none>");
})
.onRetry(e -> {
logger.debug("retry (attempt: {}, error: {})",
e.getAttemptCount(),
e.getLastFailure() != null ? e.getLastFailure().getMessage() : "<none>");
})
.onRetriesExceeded(e -> {
logger.warn("maximum number of auto retries reached (attempt: {}, error: {})",
e.getAttemptCount(),
e.getFailure() != null ? e.getFailure().getMessage() : "<none>");
})
.handleIf(throwable -> {
Status status = Status.fromThrowable(throwable);
if (isInvalidTokenError(status)) {
Expand All @@ -119,20 +142,16 @@ protected <S, T> CompletableFuture<T> execute(
}
return doRetry.test(status);
})
.onRetriesExceeded(e -> logger.warn("maximum number of auto retries reached"))
.withMaxRetries(connectionManager.builder().retryMaxAttempts())
.withBackoff(
connectionManager.builder().retryDelay(),
connectionManager.builder().retryMaxDelay(),
connectionManager.builder().retryChronoUnit());

if (connectionManager.builder().retryMaxDuration() != null) {
retryPolicy = retryPolicy.withMaxDuration(connectionManager.builder().retryMaxDuration());
policy = policy.withMaxDuration(connectionManager.builder().retryMaxDuration());
}

return Failsafe
.with(retryPolicy)
.with(connectionManager.getExecutorService())
.getStageAsync(() -> supplier.get().toCompletionStage())
.thenApply(resultConvert);
return policy;
}
}
47 changes: 47 additions & 0 deletions jetcd-core/src/test/java/io/etcd/jetcd/impl/RetryTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package io.etcd.jetcd.impl;

import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

import io.etcd.jetcd.Client;
import io.etcd.jetcd.ClientBuilder;
import io.grpc.StatusRuntimeException;

import static io.etcd.jetcd.impl.TestUtil.bytesOf;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

@Timeout(value = 30, unit = TimeUnit.SECONDS)
public class RetryTest {
@Test
public void testReconnect() throws Exception {
ClientBuilder builder = Client.builder()
.endpoints("http://127.0.0.1:9999")
.connectTimeout(Duration.ofMillis(250))
.waitForReady(false)
.retryMaxAttempts(5)
.retryDelay(250);

AtomicReference<Throwable> error = new AtomicReference<>();

try (Client client = builder.build()) {
CompletableFuture<?> unused = client.getKVClient().put(bytesOf("sample_key"), bytesOf("sample_value")).whenComplete(
(r, t) -> {
if (t != null) {
error.set(t);
}
});

await().untilAsserted(() -> {
assertThat(error.get()).isNotNull();
assertThat(error.get()).hasCauseInstanceOf(StatusRuntimeException.class);
assertThat(error.get().getCause()).hasMessage("UNAVAILABLE: io exception");
});
}
}
}
5 changes: 5 additions & 0 deletions jetcd-core/src/test/java/io/etcd/jetcd/impl/TestUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.Client;
import io.etcd.jetcd.ClientBuilder;
import io.etcd.jetcd.launcher.EtcdCluster;
import io.etcd.jetcd.test.EtcdClusterExtension;
import io.etcd.jetcd.watch.WatchResponse;

Expand Down Expand Up @@ -89,4 +90,8 @@ public static void noOpWatchResponseConsumer(WatchResponse response) {
public static ClientBuilder client(EtcdClusterExtension extension) {
return Client.builder().target("cluster://" + extension.clusterName());
}

public static ClientBuilder client(EtcdCluster cluster) {
return Client.builder().target("cluster://" + cluster.clusterName());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,12 @@
<!-- package loggers -->
<Logger name="org.testcontainers" level="INFO"/>
<Logger name="com.github.dockerjava" level="INFO"/>
<Logger name="🐳 [gcr.io/etcd-development/etcd:v3.3]" level="WARN"/>
<Logger name="🐳 [gcr.io/etcd-development/etcd:v3.4]" level="WARN"/>
<Logger name="🐳 [gcr.io/etcd-development/etcd:v3.4.7]" level="WARN"/>
<Logger name="io.etcd.jetcd.internal.infrastructure" level="INFO"/>
<Logger name="io.etcd.jetcd.launcher.EtcdCluster" level="INFO"/>
<Logger name="io.etcd.jetcd.launcher.EtcdContainer" level="INFO"/>
<Logger name="io.etcd" level="INFO"/>
<Logger name="io.etcd.jetcd.launcher.EtcdCluster" level="DEBUG"/>
<Logger name="io.etcd.jetcd.launcher.EtcdContainer" level="DEBUG"/>
<Logger name="io.etcd" level="TRACE"/>

<!-- main logger -->
<Root level="DEBUG">
<Root level="TRACE">
<AppenderRef ref="STDOUT" />
</Root>
</Loggers>
Expand Down

0 comments on commit 23aeac5

Please sign in to comment.