Skip to content

Commit

Permalink
Retry auth failures and require leader in ElectionImpl.
Browse files Browse the repository at this point in the history
Signed-off-by: Cristian Ferretti <jcferretti2020@gmail.com>
  • Loading branch information
jcferretti committed Mar 15, 2024
1 parent 2cd1293 commit f0bd80f
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 30 deletions.
98 changes: 70 additions & 28 deletions jetcd-core/src/main/java/io/etcd/jetcd/impl/ElectionImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.etcd.jetcd.election.NotLeaderException;
import io.etcd.jetcd.election.ProclaimResponse;
import io.etcd.jetcd.election.ResignResponse;
import io.etcd.jetcd.support.Errors;
import io.etcd.jetcd.support.Util;
import io.grpc.StatusRuntimeException;

Expand All @@ -51,6 +52,27 @@ final class ElectionImpl extends Impl implements Election {
this.namespace = connectionManager.getNamespace();
}

// Election operations are done in a context where a client is trying to implement
// some fault tolerance related use case; in that type of context, it makes sense to always
// apply require leader, since we don't want a client connected to a non-raft-leader server
// have an election method just go silent if the server the client happens to be connected to
// becomes partitioned from the actual raft-leader server in the etcd servers cluster:
// in that scenario and without required leader, an attempt to campaign could block forever
// not because some other client is already an election leader, but because the server the client
// is connected to is partitioned and can't tell.
// With require leader, in that case the call will fail and we give
// the client the ability to (a) know (b) retry on a different server.
// The retry on a different server should happen automatically if the connection manager is using
// a round robin strategy.
//
// Beware in the context of this election API, the word "leader" is overloaded.
// In the paragraph above when we say "raft-leader" we are talking about the etcd server that is a leader
// of the etcd servers cluster according to raft, we are not talking about the client that
// happens to be the leader of an election using the election API in this file.
private VertxElectionGrpc.ElectionVertxStub stubWithLeader() {
return Util.applyRequireLeader(true, stub);
}

@Override
public CompletableFuture<CampaignResponse> campaign(ByteSequence electionName, long leaseId, ByteSequence proposal) {
requireNonNull(electionName, "election name should not be null");
Expand All @@ -62,10 +84,11 @@ public CompletableFuture<CampaignResponse> campaign(ByteSequence electionName, l
.setLease(leaseId)
.build();

return completable(
stub.campaign(request),
CampaignResponse::new,
this::convertException);
return wrapConvertException(
execute(
() -> stubWithLeader().campaign(request),
CampaignResponse::new,
Errors::isRetryable));
}

@Override
Expand All @@ -84,10 +107,11 @@ public CompletableFuture<ProclaimResponse> proclaim(LeaderKey leaderKey, ByteSeq
.setValue(ByteString.copyFrom(proposal.getBytes()))
.build();

return completable(
stub.proclaim(request),
ProclaimResponse::new,
this::convertException);
return wrapConvertException(
execute(
() -> stubWithLeader().proclaim(request),
ProclaimResponse::new,
Errors::isRetryable));
}

@Override
Expand All @@ -98,10 +122,11 @@ public CompletableFuture<LeaderResponse> leader(ByteSequence electionName) {
.setName(Util.prefixNamespace(electionName, namespace))
.build();

return completable(
stub.leader(request),
r -> new LeaderResponse(r, namespace),
this::convertException);
return wrapConvertException(
execute(
() -> stubWithLeader().leader(request),
response -> new LeaderResponse(response, namespace),
Errors::isRetryable));
}

@Override
Expand All @@ -113,7 +138,7 @@ public void observe(ByteSequence electionName, Listener listener) {
.setName(ByteString.copyFrom(electionName.getBytes()))
.build();

stub.observeWithHandler(request,
stubWithLeader().observeWithHandler(request,
value -> listener.onNext(new LeaderResponse(value, namespace)),
ignored -> listener.onCompleted(),
error -> listener.onError(toEtcdException(error)));
Expand All @@ -133,25 +158,42 @@ public CompletableFuture<ResignResponse> resign(LeaderKey leaderKey) {
.build())
.build();

return completable(
stub.resign(request),
ResignResponse::new,
this::convertException);
return wrapConvertException(
execute(
() -> stubWithLeader().resign(request),
ResignResponse::new,
Errors::isRetryable));
}

private Throwable convertException(Throwable e) {
if (e instanceof StatusRuntimeException) {
StatusRuntimeException exception = (StatusRuntimeException) e;
String description = exception.getStatus().getDescription();
// different APIs use different messages. we cannot distinguish missing leader error otherwise,
// because communicated status is always UNKNOWN
if ("election: not leader".equals(description)) {
return new NotLeaderException();
} else if ("election: no leader".equals(description)) {
return new NoLeaderException();
private <S> CompletableFuture<S> wrapConvertException(CompletableFuture<S> future) {
return future.exceptionally(e -> {
throw convertException(e);
});
}

private RuntimeException convertException(Throwable e) {
Throwable cause = e;
while (cause != null) {
if (cause instanceof StatusRuntimeException) {
StatusRuntimeException exception = (StatusRuntimeException) cause;
String description = exception.getStatus().getDescription();
// different APIs use different messages. we cannot distinguish missing leader error otherwise,
// because communicated status is always UNKNOWN
if ("election: not leader".equals(description)) {
// Candidate is not a leader at the moment.
// Note there is a one letter difference, but this exception type is not the same as
// NoLeaderException.
return new NotLeaderException();
}
if ("election: no leader".equals(description)) {
// Leader for given election does not exist.
// Note there is a one letter difference, but this exception type is not the same as
// NotLeaderException.
return new NoLeaderException();
}
}
cause = cause.getCause();
}

return toEtcdException(e);
}
}
20 changes: 18 additions & 2 deletions jetcd-core/src/main/java/io/etcd/jetcd/impl/LockImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,22 @@ final class LockImpl extends Impl implements Lock {
private final VertxLockGrpc.LockVertxStub stub;
private final ByteSequence namespace;

// Lock operations are done in a context where a client is trying to implement
// some strict mutual exclusion use case; in that type of context, it makes sense to always
// apply require leader, since we don't want a client connected to a non-raft-leader server
// to have a lock method just go silent if the server the client happens to be connected to
// becomes partitioned from the actual raft-leader server in the etcd servers cluster:
// in that scenario and without required leader, an attempt to lock could block forever
// not because some other client is already holding a lock, but because the server the client
// is connected to is partitioned and can't tell.
// With require leader, in that case the call will fail and the client has the ability to
// (a) know (b) retry on a different server.
// The retry on a different server should happen automatically if the connection manager is using
// a round robin strategy.
private VertxLockGrpc.LockVertxStub stubWithLeader() {
return Util.applyRequireLeader(true, stub);
}

LockImpl(ClientConnectionManager connectionManager) {
super(connectionManager);

Expand All @@ -51,7 +67,7 @@ public CompletableFuture<LockResponse> lock(ByteSequence name, long leaseId) {
.build();

return execute(
() -> stub.lock(request),
() -> stubWithLeader().lock(request),
response -> new LockResponse(response, namespace),
Errors::isRetryable);
}
Expand All @@ -65,7 +81,7 @@ public CompletableFuture<UnlockResponse> unlock(ByteSequence lockKey) {
.build();

return execute(
() -> stub.unlock(request),
() -> stubWithLeader().unlock(request),
UnlockResponse::new,
Errors::isRetryable);
}
Expand Down

0 comments on commit f0bd80f

Please sign in to comment.