From 948aa67cc48359b0022f834875f47c667ea4bf63 Mon Sep 17 00:00:00 2001 From: tunefun <965728225@qq.com> Date: Tue, 5 Mar 2024 00:08:57 +0800 Subject: [PATCH] election,lease,maintenance,watch: Register handlers before request Fixes #1308 Signed-off-by: tunefun <965728225@qq.com> --- gradle/libs.versions.toml | 2 +- .../java/io/etcd/jetcd/impl/ElectionImpl.java | 8 ++--- .../java/io/etcd/jetcd/impl/LeaseImpl.java | 36 ++++++++++--------- .../io/etcd/jetcd/impl/MaintenanceImpl.java | 22 ++++++------ .../java/io/etcd/jetcd/impl/WatchImpl.java | 16 +++++---- .../java/io/etcd/jetcd/impl/ElectionTest.java | 33 +++++++++++++++++ 6 files changed, 79 insertions(+), 38 deletions(-) diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index e7b235eb..4504f43f 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -14,7 +14,7 @@ commonsIo = "2.15.1" commonCompress = "1.26.0" autoService = "1.1.1" errorprone = "2.25.0" -vertx = "4.5.3" +vertx = "4.5.4" picocli = "4.7.5" restAssured = "5.4.0" javaxAnnotation = "1.3.2" diff --git a/jetcd-core/src/main/java/io/etcd/jetcd/impl/ElectionImpl.java b/jetcd-core/src/main/java/io/etcd/jetcd/impl/ElectionImpl.java index f8a5fe5a..02e16b51 100644 --- a/jetcd-core/src/main/java/io/etcd/jetcd/impl/ElectionImpl.java +++ b/jetcd-core/src/main/java/io/etcd/jetcd/impl/ElectionImpl.java @@ -113,10 +113,10 @@ public void observe(ByteSequence electionName, Listener listener) { .setName(ByteString.copyFrom(electionName.getBytes())) .build(); - stub.observe(request) - .handler(value -> listener.onNext(new LeaderResponse(value, namespace))) - .endHandler(ignored -> listener.onCompleted()) - .exceptionHandler(error -> listener.onError(toEtcdException(error))); + stub.observeWithHandler(request, + value -> listener.onNext(new LeaderResponse(value, namespace)), + ignored -> listener.onCompleted(), + error -> listener.onError(toEtcdException(error))); } @Override diff --git a/jetcd-core/src/main/java/io/etcd/jetcd/impl/LeaseImpl.java b/jetcd-core/src/main/java/io/etcd/jetcd/impl/LeaseImpl.java index 6e57fa93..67d9ac04 100644 --- a/jetcd-core/src/main/java/io/etcd/jetcd/impl/LeaseImpl.java +++ b/jetcd-core/src/main/java/io/etcd/jetcd/impl/LeaseImpl.java @@ -148,19 +148,21 @@ public CompletableFuture keepAliveOnce(long leaseId) { final LeaseKeepAliveRequest req = LeaseKeepAliveRequest.newBuilder().setID(leaseId).build(); leaseStub - .leaseKeepAlive(s -> { - ref.set(s); - s.write(req); - }) - .handler(r -> { - if (r.getTTL() != 0) { - future.complete(new LeaseKeepAliveResponse(r)); - } else { - future.completeExceptionally( - newEtcdException(ErrorCode.NOT_FOUND, "etcdserver: requested lease not found")); - } - }) - .exceptionHandler(future::completeExceptionally); + .leaseKeepAliveWithHandler( + s -> { + ref.set(s); + s.write(req); + }, + r -> { + if (r.getTTL() != 0) { + future.complete(new LeaseKeepAliveResponse(r)); + } else { + future.completeExceptionally( + newEtcdException(ErrorCode.NOT_FOUND, "etcdserver: requested lease not found")); + } + }, + null, + future::completeExceptionally); return future.whenComplete((r, t) -> ref.get().end(req)); } @@ -194,9 +196,11 @@ public KeepAlive() { @Override public void doStart() { - leaseStub.leaseKeepAlive(this::writeHandler) - .handler(this::handleResponse) - .exceptionHandler(this::handleException); + leaseStub.leaseKeepAliveWithHandler( + this::writeHandler, + this::handleResponse, + null, + this::handleException); } @Override diff --git a/jetcd-core/src/main/java/io/etcd/jetcd/impl/MaintenanceImpl.java b/jetcd-core/src/main/java/io/etcd/jetcd/impl/MaintenanceImpl.java index 0a977c84..4b742642 100644 --- a/jetcd-core/src/main/java/io/etcd/jetcd/impl/MaintenanceImpl.java +++ b/jetcd-core/src/main/java/io/etcd/jetcd/impl/MaintenanceImpl.java @@ -119,19 +119,20 @@ public CompletableFuture snapshot(OutputStream outputStream) { final CompletableFuture answer = new CompletableFuture<>(); final AtomicLong bytes = new AtomicLong(0); - this.stub.snapshot(SnapshotRequest.getDefaultInstance()) - .handler(r -> { + this.stub.snapshotWithHandler( + SnapshotRequest.getDefaultInstance(), + r -> { try { r.getBlob().writeTo(outputStream); bytes.addAndGet(r.getBlob().size()); } catch (IOException e) { answer.completeExceptionally(toEtcdException(e)); } - }) - .endHandler(event -> { + }, + event -> { answer.complete(bytes.get()); - }) - .exceptionHandler(e -> { + }, + e -> { answer.completeExceptionally(toEtcdException(e)); }); @@ -141,9 +142,10 @@ public CompletableFuture snapshot(OutputStream outputStream) { @Override public void snapshot(StreamObserver observer) { - this.stub.snapshot(SnapshotRequest.getDefaultInstance()) - .handler(r -> observer.onNext(new io.etcd.jetcd.maintenance.SnapshotResponse(r))) - .endHandler(event -> observer.onCompleted()) - .exceptionHandler(e -> observer.onError(toEtcdException(e))); + this.stub.snapshotWithHandler( + SnapshotRequest.getDefaultInstance(), + r -> observer.onNext(new io.etcd.jetcd.maintenance.SnapshotResponse(r)), + event -> observer.onCompleted(), + e -> observer.onError(toEtcdException(e))); } } diff --git a/jetcd-core/src/main/java/io/etcd/jetcd/impl/WatchImpl.java b/jetcd-core/src/main/java/io/etcd/jetcd/impl/WatchImpl.java index 21623ec5..f8e3ff44 100644 --- a/jetcd-core/src/main/java/io/etcd/jetcd/impl/WatchImpl.java +++ b/jetcd-core/src/main/java/io/etcd/jetcd/impl/WatchImpl.java @@ -178,13 +178,15 @@ void resume() { builder.addFilters(WatchCreateRequest.FilterType.NOPUT); } - rstream = Util.applyRequireLeader(option.withRequireLeader(), stub).watchWithExceptionHandler(stream -> { - wstream.set(stream); - stream.write(WatchRequest.newBuilder().setCreateRequest(builder).build()); - }, this::onError); - - rstream.handler(this::onNext); - rstream.endHandler(event -> onCompleted()); + rstream = Util.applyRequireLeader(option.withRequireLeader(), stub) + .watchWithHandler( + stream -> { + wstream.set(stream); + stream.write(WatchRequest.newBuilder().setCreateRequest(builder).build()); + }, + this::onNext, + event -> onCompleted(), + this::onError); } } diff --git a/jetcd-core/src/test/java/io/etcd/jetcd/impl/ElectionTest.java b/jetcd-core/src/test/java/io/etcd/jetcd/impl/ElectionTest.java index 99324aa8..a7626f40 100644 --- a/jetcd-core/src/test/java/io/etcd/jetcd/impl/ElectionTest.java +++ b/jetcd-core/src/test/java/io/etcd/jetcd/impl/ElectionTest.java @@ -197,6 +197,39 @@ public void onCompleted() { leaseClient.revoke(leaseId).get(); } + @Test + public void testObserveExistingLeader() throws Exception { + final AtomicInteger electionsSeen = new AtomicInteger(0); + ByteSequence electionName = ByteSequence.from(randomString(), StandardCharsets.UTF_8); + + long leaseId = leaseClient.grant(10).get().getID(); + + ByteSequence proposal = ByteSequence.from(randomString(), StandardCharsets.UTF_8); + electionClient.campaign(electionName, leaseId, proposal) + .get(OPERATION_TIMEOUT, TimeUnit.SECONDS); + + electionClient.observe(electionName, new Election.Listener() { + @Override + public void onNext(LeaderResponse response) { + electionsSeen.incrementAndGet(); + } + + @Override + public void onError(Throwable error) { + } + + @Override + public void onCompleted() { + } + }); + + TestUtil.waitForCondition( + () -> electionsSeen.get() == 1, OPERATION_TIMEOUT * 1000, + "Observer did not receive expected notifications, got: " + electionsSeen.get()); + + leaseClient.revoke(leaseId).get(); + } + @Test public void testSynchronizationBarrier() throws Exception { final int threadCount = 5;