From f0bd80fb33b1080e539cbdfd08fea088bcc3dcec Mon Sep 17 00:00:00 2001 From: Cristian Ferretti Date: Tue, 12 Mar 2024 02:13:41 -0400 Subject: [PATCH] Retry auth failures and require leader in ElectionImpl. Signed-off-by: Cristian Ferretti --- .../java/io/etcd/jetcd/impl/ElectionImpl.java | 98 +++++++++++++------ .../java/io/etcd/jetcd/impl/LockImpl.java | 20 +++- 2 files changed, 88 insertions(+), 30 deletions(-) diff --git a/jetcd-core/src/main/java/io/etcd/jetcd/impl/ElectionImpl.java b/jetcd-core/src/main/java/io/etcd/jetcd/impl/ElectionImpl.java index 02e16b51..9d2de940 100644 --- a/jetcd-core/src/main/java/io/etcd/jetcd/impl/ElectionImpl.java +++ b/jetcd-core/src/main/java/io/etcd/jetcd/impl/ElectionImpl.java @@ -32,6 +32,7 @@ import io.etcd.jetcd.election.NotLeaderException; import io.etcd.jetcd.election.ProclaimResponse; import io.etcd.jetcd.election.ResignResponse; +import io.etcd.jetcd.support.Errors; import io.etcd.jetcd.support.Util; import io.grpc.StatusRuntimeException; @@ -51,6 +52,27 @@ final class ElectionImpl extends Impl implements Election { this.namespace = connectionManager.getNamespace(); } + // Election operations are done in a context where a client is trying to implement + // some fault tolerance related use case; in that type of context, it makes sense to always + // apply require leader, since we don't want a client connected to a non-raft-leader server + // to have an election method just go silent if the server the client happens to be connected to + // becomes partitioned from the actual raft-leader server in the etcd servers cluster: + // in that scenario and without required leader, an attempt to campaign could block forever + // not because some other client is already an election leader, but because the server the client + // is connected to is partitioned and can't tell. + // With require leader, in that case the call will fail and we give + // the client the ability to (a) know (b) retry on a different server. 
+ // The retry on a different server should happen automatically if the connection manager is using + // a round robin strategy. + // + // Beware in the context of this election API, the word "leader" is overloaded. + // In the paragraph above when we say "raft-leader" we are talking about the etcd server that is a leader + // of the etcd servers cluster according to raft, we are not talking about the client that + // happens to be the leader of an election using the election API in this file. + private VertxElectionGrpc.ElectionVertxStub stubWithLeader() { + return Util.applyRequireLeader(true, stub); + } + @Override public CompletableFuture campaign(ByteSequence electionName, long leaseId, ByteSequence proposal) { requireNonNull(electionName, "election name should not be null"); @@ -62,10 +84,11 @@ public CompletableFuture campaign(ByteSequence electionName, l .setLease(leaseId) .build(); - return completable( - stub.campaign(request), - CampaignResponse::new, - this::convertException); + return wrapConvertException( + execute( + () -> stubWithLeader().campaign(request), + CampaignResponse::new, + Errors::isRetryable)); } @Override @@ -84,10 +107,11 @@ public CompletableFuture proclaim(LeaderKey leaderKey, ByteSeq .setValue(ByteString.copyFrom(proposal.getBytes())) .build(); - return completable( - stub.proclaim(request), - ProclaimResponse::new, - this::convertException); + return wrapConvertException( + execute( + () -> stubWithLeader().proclaim(request), + ProclaimResponse::new, + Errors::isRetryable)); } @Override @@ -98,10 +122,11 @@ public CompletableFuture leader(ByteSequence electionName) { .setName(Util.prefixNamespace(electionName, namespace)) .build(); - return completable( - stub.leader(request), - r -> new LeaderResponse(r, namespace), - this::convertException); + return wrapConvertException( + execute( + () -> stubWithLeader().leader(request), + response -> new LeaderResponse(response, namespace), + Errors::isRetryable)); } @Override @@ -113,7 
+138,7 @@ public void observe(ByteSequence electionName, Listener listener) { .setName(ByteString.copyFrom(electionName.getBytes())) .build(); - stub.observeWithHandler(request, + stubWithLeader().observeWithHandler(request, value -> listener.onNext(new LeaderResponse(value, namespace)), ignored -> listener.onCompleted(), error -> listener.onError(toEtcdException(error))); @@ -133,25 +158,42 @@ public CompletableFuture resign(LeaderKey leaderKey) { .build()) .build(); - return completable( - stub.resign(request), - ResignResponse::new, - this::convertException); + return wrapConvertException( + execute( + () -> stubWithLeader().resign(request), + ResignResponse::new, + Errors::isRetryable)); } - private Throwable convertException(Throwable e) { - if (e instanceof StatusRuntimeException) { - StatusRuntimeException exception = (StatusRuntimeException) e; - String description = exception.getStatus().getDescription(); - // different APIs use different messages. we cannot distinguish missing leader error otherwise, - // because communicated status is always UNKNOWN - if ("election: not leader".equals(description)) { - return new NotLeaderException(); - } else if ("election: no leader".equals(description)) { - return new NoLeaderException(); + private CompletableFuture wrapConvertException(CompletableFuture future) { + return future.exceptionally(e -> { + throw convertException(e); + }); + } + + private RuntimeException convertException(Throwable e) { + Throwable cause = e; + while (cause != null) { + if (cause instanceof StatusRuntimeException) { + StatusRuntimeException exception = (StatusRuntimeException) cause; + String description = exception.getStatus().getDescription(); + // different APIs use different messages. we cannot distinguish missing leader error otherwise, + // because communicated status is always UNKNOWN + if ("election: not leader".equals(description)) { + // Candidate is not a leader at the moment. 
+ // Note there is a one letter difference, but this exception type is not the same as + // NoLeaderException. + return new NotLeaderException(); + } + if ("election: no leader".equals(description)) { + // Leader for given election does not exist. + // Note there is a one letter difference, but this exception type is not the same as + // NotLeaderException. + return new NoLeaderException(); + } } + cause = cause.getCause(); } - return toEtcdException(e); } } diff --git a/jetcd-core/src/main/java/io/etcd/jetcd/impl/LockImpl.java b/jetcd-core/src/main/java/io/etcd/jetcd/impl/LockImpl.java index 191aabe6..4518082f 100644 --- a/jetcd-core/src/main/java/io/etcd/jetcd/impl/LockImpl.java +++ b/jetcd-core/src/main/java/io/etcd/jetcd/impl/LockImpl.java @@ -34,6 +34,22 @@ final class LockImpl extends Impl implements Lock { private final VertxLockGrpc.LockVertxStub stub; private final ByteSequence namespace; + // Lock operations are done in a context where a client is trying to implement + // some strict mutual exclusion use case; in that type of context, it makes sense to always + // apply require leader, since we don't want a client connected to a non-raft-leader server + // to have a lock method just go silent if the server the client happens to be connected to + // becomes partitioned from the actual raft-leader server in the etcd servers cluster: + // in that scenario and without required leader, an attempt to lock could block forever + // not because some other client is already holding a lock, but because the server the client + // is connected to is partitioned and can't tell. + // With require leader, in that case the call will fail and the client has the ability to + // (a) know (b) retry on a different server. + // The retry on a different server should happen automatically if the connection manager is using + // a round robin strategy. 
+ private VertxLockGrpc.LockVertxStub stubWithLeader() { + return Util.applyRequireLeader(true, stub); + } + LockImpl(ClientConnectionManager connectionManager) { super(connectionManager); @@ -51,7 +67,7 @@ public CompletableFuture lock(ByteSequence name, long leaseId) { .build(); return execute( - () -> stub.lock(request), + () -> stubWithLeader().lock(request), response -> new LockResponse(response, namespace), Errors::isRetryable); } @@ -65,7 +81,7 @@ public CompletableFuture unlock(ByteSequence lockKey) { .build(); return execute( - () -> stub.unlock(request), + () -> stubWithLeader().unlock(request), UnlockResponse::new, Errors::isRetryable); }