Skip to content

Commit

Permalink
Merge branch 'main' into cfs-election-retry-reqleader
Browse files Browse the repository at this point in the history
  • Loading branch information
jcferretti committed Mar 12, 2024
2 parents 563feb1 + 1054fc2 commit 2ea90e0
Show file tree
Hide file tree
Showing 8 changed files with 96 additions and 48 deletions.
14 changes: 7 additions & 7 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
[versions]
grpc = "1.61.1"
log4j = "2.22.1"
mockito = "5.10.0"
grpc = "1.62.2"
log4j = "2.23.0"
mockito = "5.11.0"
slf4j = "2.0.12"
guava = "33.0.0-jre"
assertj = "3.25.3"
junit = "5.10.2"
testcontainers = "1.19.5"
testcontainers = "1.19.7"
protoc = "3.25.1"
failsafe = "3.3.2"
awaitility = "4.2.0"
commonsIo = "2.15.1"
commonCompress = "1.25.0"
commonCompress = "1.26.0"
autoService = "1.1.1"
errorprone = "2.24.1"
vertx = "4.5.3"
errorprone = "2.25.0"
vertx = "4.5.4"
picocli = "4.7.5"
restAssured = "5.4.0"
javaxAnnotation = "1.3.2"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ ManagedChannelBuilder<?> defaultChannelBuilder(String target) {
if (builder.loadBalancerPolicy() != null) {
channelBuilder.defaultLoadBalancingPolicy(builder.loadBalancerPolicy());
} else {
channelBuilder.defaultLoadBalancingPolicy("pick_first");
channelBuilder.defaultLoadBalancingPolicy("round_robin");
}

if (builder.headers() != null) {
Expand Down
8 changes: 4 additions & 4 deletions jetcd-core/src/main/java/io/etcd/jetcd/impl/ElectionImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,10 @@ public void observe(ByteSequence electionName, Listener listener) {
.setName(ByteString.copyFrom(electionName.getBytes()))
.build();

stubWithLeader().observe(request)
.handler(value -> listener.onNext(new LeaderResponse(value, namespace)))
.endHandler(ignored -> listener.onCompleted())
.exceptionHandler(error -> listener.onError(toEtcdException(error)));
stubWithLeader().observeWithHandler(request,
value -> listener.onNext(new LeaderResponse(value, namespace)),
ignored -> listener.onCompleted(),
error -> listener.onError(toEtcdException(error)));
}

@Override
Expand Down
40 changes: 23 additions & 17 deletions jetcd-core/src/main/java/io/etcd/jetcd/impl/LeaseImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -148,19 +148,21 @@ public CompletableFuture<LeaseKeepAliveResponse> keepAliveOnce(long leaseId) {
final LeaseKeepAliveRequest req = LeaseKeepAliveRequest.newBuilder().setID(leaseId).build();

leaseStub
.leaseKeepAlive(s -> {
ref.set(s);
s.write(req);
})
.handler(r -> {
if (r.getTTL() != 0) {
future.complete(new LeaseKeepAliveResponse(r));
} else {
future.completeExceptionally(
newEtcdException(ErrorCode.NOT_FOUND, "etcdserver: requested lease not found"));
}
})
.exceptionHandler(future::completeExceptionally);
.leaseKeepAliveWithHandler(
s -> {
ref.set(s);
s.write(req);
},
r -> {
if (r.getTTL() != 0) {
future.complete(new LeaseKeepAliveResponse(r));
} else {
future.completeExceptionally(
newEtcdException(ErrorCode.NOT_FOUND, "etcdserver: requested lease not found"));
}
},
null,
future::completeExceptionally);

return future.whenComplete((r, t) -> ref.get().end(req));
}
Expand Down Expand Up @@ -194,9 +196,11 @@ public KeepAlive() {

@Override
public void doStart() {
leaseStub.leaseKeepAlive(this::writeHandler)
.handler(this::handleResponse)
.exceptionHandler(this::handleException);
leaseStub.leaseKeepAliveWithHandler(
this::writeHandler,
this::handleResponse,
null,
this::handleException);
}

@Override
Expand Down Expand Up @@ -262,11 +266,13 @@ private synchronized void handleResponse(io.etcd.jetcd.api.LeaseKeepAliveRespons
}
}

private synchronized void handleException(Throwable unused) {
private synchronized void handleException(Throwable throwable) {
if (!this.isRunning()) {
return;
}

keepAlives.values().forEach(ka -> ka.onError(throwable));

restart = connectionManager().vertx().setTimer(
500,
l -> {
Expand Down
22 changes: 12 additions & 10 deletions jetcd-core/src/main/java/io/etcd/jetcd/impl/MaintenanceImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -119,19 +119,20 @@ public CompletableFuture<Long> snapshot(OutputStream outputStream) {
final CompletableFuture<Long> answer = new CompletableFuture<>();
final AtomicLong bytes = new AtomicLong(0);

this.stub.snapshot(SnapshotRequest.getDefaultInstance())
.handler(r -> {
this.stub.snapshotWithHandler(
SnapshotRequest.getDefaultInstance(),
r -> {
try {
r.getBlob().writeTo(outputStream);
bytes.addAndGet(r.getBlob().size());
} catch (IOException e) {
answer.completeExceptionally(toEtcdException(e));
}
})
.endHandler(event -> {
},
event -> {
answer.complete(bytes.get());
})
.exceptionHandler(e -> {
},
e -> {
answer.completeExceptionally(toEtcdException(e));
});

Expand All @@ -141,9 +142,10 @@ public CompletableFuture<Long> snapshot(OutputStream outputStream) {
@Override
public void snapshot(StreamObserver<io.etcd.jetcd.maintenance.SnapshotResponse> observer) {

this.stub.snapshot(SnapshotRequest.getDefaultInstance())
.handler(r -> observer.onNext(new io.etcd.jetcd.maintenance.SnapshotResponse(r)))
.endHandler(event -> observer.onCompleted())
.exceptionHandler(e -> observer.onError(toEtcdException(e)));
this.stub.snapshotWithHandler(
SnapshotRequest.getDefaultInstance(),
r -> observer.onNext(new io.etcd.jetcd.maintenance.SnapshotResponse(r)),
event -> observer.onCompleted(),
e -> observer.onError(toEtcdException(e)));
}
}
16 changes: 9 additions & 7 deletions jetcd-core/src/main/java/io/etcd/jetcd/impl/WatchImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -178,13 +178,15 @@ void resume() {
builder.addFilters(WatchCreateRequest.FilterType.NOPUT);
}

rstream = Util.applyRequireLeader(option.withRequireLeader(), stub).watchWithExceptionHandler(stream -> {
wstream.set(stream);
stream.write(WatchRequest.newBuilder().setCreateRequest(builder).build());
}, this::onError);

rstream.handler(this::onNext);
rstream.endHandler(event -> onCompleted());
rstream = Util.applyRequireLeader(option.withRequireLeader(), stub)
.watchWithHandler(
stream -> {
wstream.set(stream);
stream.write(WatchRequest.newBuilder().setCreateRequest(builder).build());
},
this::onNext,
event -> onCompleted(),
this::onError);
}
}

Expand Down
33 changes: 33 additions & 0 deletions jetcd-core/src/test/java/io/etcd/jetcd/impl/ElectionTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,39 @@ public void onCompleted() {
leaseClient.revoke(leaseId).get();
}

@Test
public void testObserveExistingLeader() throws Exception {
final AtomicInteger electionsSeen = new AtomicInteger(0);
ByteSequence electionName = ByteSequence.from(randomString(), StandardCharsets.UTF_8);

long leaseId = leaseClient.grant(10).get().getID();

ByteSequence proposal = ByteSequence.from(randomString(), StandardCharsets.UTF_8);
electionClient.campaign(electionName, leaseId, proposal)
.get(OPERATION_TIMEOUT, TimeUnit.SECONDS);

electionClient.observe(electionName, new Election.Listener() {
@Override
public void onNext(LeaderResponse response) {
electionsSeen.incrementAndGet();
}

@Override
public void onError(Throwable error) {
}

@Override
public void onCompleted() {
}
});

TestUtil.waitForCondition(
() -> electionsSeen.get() == 1, OPERATION_TIMEOUT * 1000,
"Observer did not receive expected notifications, got: " + electionsSeen.get());

leaseClient.revoke(leaseId).get();
}

@Test
public void testSynchronizationBarrier() throws Exception {
final int threadCount = 5;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import static io.etcd.jetcd.common.exception.EtcdExceptionFactory.toEtcdException;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.awaitility.Awaitility.await;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Mockito.after;
import static org.mockito.Mockito.timeout;
Expand All @@ -57,6 +58,7 @@
@Timeout(value = 1, unit = TimeUnit.MINUTES)
@ExtendWith(MockitoExtension.class)
public class LeaseUnitTest {
private static final long TIME_OUT_SECONDS = 30;

private Lease leaseCli;
private AtomicReference<StreamObserver<LeaseKeepAliveResponse>> responseObserverRef;
Expand Down Expand Up @@ -230,8 +232,10 @@ public void testKeepAliveReceivesExpiredLease() {

@Test
public void testKeepAliveResetOnStreamErrors() {
final StreamObserver<io.etcd.jetcd.lease.LeaseKeepAliveResponse> observer = Observers.observer(response -> {
});
final AtomicReference<Throwable> errorRecorder = new AtomicReference<>();
final StreamObserver<io.etcd.jetcd.lease.LeaseKeepAliveResponse> observer = Observers.<io.etcd.jetcd.lease.LeaseKeepAliveResponse> builder()
.onError(errorRecorder::set)
.build();

try (CloseableClient client = this.leaseCli.keepAlive(LEASE_ID_1, observer)) {
Throwable t = Status.ABORTED.asRuntimeException();
Expand All @@ -241,6 +245,7 @@ public void testKeepAliveResetOnStreamErrors() {

// expect keep alive requests are still sending even with reset.
verify(this.requestStreamObserverMock, timeout(2000).atLeast(3)).onNext(argThat(hasLeaseID(LEASE_ID_1)));
await().atMost(TIME_OUT_SECONDS, TimeUnit.SECONDS).untilAsserted(() -> assertThat(errorRecorder.get()).isNotNull());
}
}

Expand Down

0 comments on commit 2ea90e0

Please sign in to comment.