From 6ef25d37c311ce5578706a4fb8839bb15dd2bab8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9C=D0=B0=D0=BA=D1=81=D0=B8=D0=BC=20=D0=91=D0=B5=D1=80?= =?UTF-8?q?=D1=8C=D1=8F=D0=BD=D0=BE=D0=B2?= Date: Mon, 1 Jul 2024 12:01:13 +0700 Subject: [PATCH] Hot Reload for creds Signed-off-by: Maxim Beryanov --- pom.xml | 2 +- .../java/com/ibm/etcd/client/EtcdClient.java | 30 +++++++++++++++---- .../java/com/ibm/etcd/client/GrpcClient.java | 19 ++---------- .../com/ibm/etcd/client/KvStoreClient.java | 4 +++ 4 files changed, 33 insertions(+), 22 deletions(-) diff --git a/pom.xml b/pom.xml index 654778a..c15eec2 100644 --- a/pom.xml +++ b/pom.xml @@ -18,7 +18,7 @@ 4.0.0 com.ibm.etcd etcd-java - 0.0.25-SNAPSHOT + 0.0.25 jar etcd-java diff --git a/src/main/java/com/ibm/etcd/client/EtcdClient.java b/src/main/java/com/ibm/etcd/client/EtcdClient.java index a5135c9..bc13653 100644 --- a/src/main/java/com/ibm/etcd/client/EtcdClient.java +++ b/src/main/java/com/ibm/etcd/client/EtcdClient.java @@ -95,9 +95,13 @@ import io.netty.util.IllegalReferenceCountException; import io.netty.util.ReferenceCounted; import io.netty.util.concurrent.FastThreadLocalThread; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class EtcdClient implements KvStoreClient { + private static final Logger logger = LoggerFactory.getLogger(EtcdClient.class); + private static final Key TOKEN_KEY = Key.of("token", Metadata.ASCII_STRING_MARSHALLER); @@ -111,6 +115,7 @@ public class EtcdClient implements KvStoreClient { public static final long DEFAULT_TIMEOUT_MS = 10_000L; // 10sec default public static final int DEFAULT_SESSION_TIMEOUT_SECS = 20; // 20sec default + public static final long AUTH_RETRY_TIMEOUT_MS = 100L; // 100ms default // (not intended to be strict hostname validation here) protected static final Pattern ADDR_PATT = @@ -120,7 +125,7 @@ public class EtcdClient implements KvStoreClient { private final int sessionTimeoutSecs; - private final ByteString name, password; + private ByteString name, password; private final MultithreadEventLoopGroup internalExecutor; private final ScheduledExecutorService sharedInternalExecutor; @@ -580,7 +585,7 @@ public boolean isClosed() { // ------ authentication logic private static final Set OTHER_AUTH_FAILURE_CODES = ImmutableSet.of( - Code.INVALID_ARGUMENT, Code.FAILED_PRECONDITION, Code.PERMISSION_DENIED, Code.UNKNOWN); + Code.INVALID_ARGUMENT, Code.FAILED_PRECONDITION, Code.PERMISSION_DENIED, Code.INTERNAL, Code.UNKNOWN); // Various different errors can imply a re-auth is required (sometimes non-obvious), // so we cover most related messages to be safe. This should not cause problems since @@ -665,11 +670,10 @@ public void applyRequestMetadata(RequestInfo requestInfo, failStatus = Status.UNAUTHENTICATED .withDescription("(Re)authentication RPC failed") .withCause(failure); - authFailRetryTime = System.currentTimeMillis() + 5_000L; + authFailRetryTime = System.currentTimeMillis() + AUTH_RETRY_TIMEOUT_MS; } else { failStatus = Status.fromThrowable(failure); - // If this was a real auth failure, postpone further attempts a bit longer - authFailRetryTime = System.currentTimeMillis() + 15_000L; + authFailRetryTime = System.currentTimeMillis() + AUTH_RETRY_TIMEOUT_MS; } lastAuthFailStatus = failStatus; // Augment with the RPC failure that triggered the re-authentication, @@ -697,6 +701,8 @@ private static Metadata tokenHeader(AuthenticateResponse authResponse) { } private ListenableFuture authenticate() { + logger.error("Auth token seems to be incorrect or uninitialized yet. Getting new token with current etcd creds..."); + AuthenticateRequest request = AuthenticateRequest.newBuilder() .setNameBytes(name).setPasswordBytes(password).build(); // no call creds for auth call @@ -770,6 +776,20 @@ public PersistentLease getSessionLease() { return sl; } + @Override + public void updateCredentials(String name, String password) { + this.name = ByteString.copyFromUtf8(name); + this.password = ByteString.copyFromUtf8(password); + } + + @Override + public List getCredentials() { + return Arrays.asList( + this.name.toStringUtf8(), + this.password.toStringUtf8() + ); + } + public Executor getExecutor() { return grpc.getResponseExecutor(); } diff --git a/src/main/java/com/ibm/etcd/client/GrpcClient.java b/src/main/java/com/ibm/etcd/client/GrpcClient.java index 29ddb41..c368e12 100644 --- a/src/main/java/com/ibm/etcd/client/GrpcClient.java +++ b/src/main/java/com/ibm/etcd/client/GrpcClient.java @@ -264,16 +264,8 @@ private ListenableFuture call(MethodDescriptor method, // multiple retries disabled or deadline expired return Futures.immediateFailedFuture(t); } - boolean reauth = false; - if (authProvider.requiresReauth(t)) { - if (afterReauth) { - // if we have an auth failure immediately following a reauth, give up - // (important to avoid infinite loop of auth failures) - return Futures.immediateFailedFuture(t); - } - reauthenticate(baseCallOpts, t); - reauth = true; - } else if (!retry.retry(t, request)) { + boolean reauth = reauthIfRequired(t, baseCallOpts);; + if (!reauth && !retry.retry(t, request)) { // retry predicate says no (non retryable request and/or error) return Futures.immediateFailedFuture(t); } @@ -442,8 +434,6 @@ final class ResilientBiDiStream { private boolean finished; private Throwable error; - private boolean lastAuthFailed; - /** * * @param method @@ -636,7 +626,6 @@ public void beforeStart(ClientCallStreamObserver rs) { // called from grpc response thread @Override public void onNext(RespT value) { - lastAuthFailed = false; respStream.onNext(value); } // called from grpc response thread @@ -646,10 +635,9 @@ public void onError(Throwable t) { if (finished) { finalError = true; } else { - reauthed = !lastAuthFailed && reauthIfRequired(t, sentCallOptions); + reauthed = reauthIfRequired(t, sentCallOptions); finalError = !reauthed && !retryableStreamError(t); } - lastAuthFailed = reauthed; if (!finalError) { int errCount = -1; String msg; @@ -703,7 +691,6 @@ public void onError(Throwable t) { // called from grpc response thread @Override public void onCompleted() { - lastAuthFailed = false; if (!finished) { logger.warn("Unexpected onCompleted received" + " for stream of method " + method.getFullMethodName()); diff --git a/src/main/java/com/ibm/etcd/client/KvStoreClient.java b/src/main/java/com/ibm/etcd/client/KvStoreClient.java index 3a99662..d45e215 100644 --- a/src/main/java/com/ibm/etcd/client/KvStoreClient.java +++ b/src/main/java/com/ibm/etcd/client/KvStoreClient.java @@ -16,6 +16,7 @@ package com.ibm.etcd.client; import java.io.Closeable; +import java.util.List; import com.ibm.etcd.client.kv.KvClient; import com.ibm.etcd.client.lease.LeaseClient; @@ -44,4 +45,7 @@ public interface KvStoreClient extends Closeable { */ PersistentLease getSessionLease(); + void updateCredentials(String name, String password); + + List getCredentials(); }