From 66bab4e9b0b9181bfb030a09f538b8f0fa20287a Mon Sep 17 00:00:00 2001 From: tunefun <965728225@qq.com> Date: Tue, 5 Mar 2024 00:08:57 +0800 Subject: [PATCH] election,lease,maintenance,watch: Register handlers before request Fixes #1308 Signed-off-by: tunefun <965728225@qq.com> --- gradle/libs.versions.toml | 2 +- .../java/io/etcd/jetcd/impl/ElectionImpl.java | 9 +++-- .../java/io/etcd/jetcd/impl/LeaseImpl.java | 39 +++++++++++-------- .../io/etcd/jetcd/impl/MaintenanceImpl.java | 23 ++++++----- .../java/io/etcd/jetcd/impl/WatchImpl.java | 16 ++++---- .../java/io/etcd/jetcd/impl/ElectionTest.java | 33 ++++++++++++++++ 6 files changed, 83 insertions(+), 39 deletions(-) diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index e7b235eb..4504f43f 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -14,7 +14,7 @@ commonsIo = "2.15.1" commonCompress = "1.26.0" autoService = "1.1.1" errorprone = "2.25.0" -vertx = "4.5.3" +vertx = "4.5.4" picocli = "4.7.5" restAssured = "5.4.0" javaxAnnotation = "1.3.2" diff --git a/jetcd-core/src/main/java/io/etcd/jetcd/impl/ElectionImpl.java b/jetcd-core/src/main/java/io/etcd/jetcd/impl/ElectionImpl.java index f8a5fe5a..d7a85abe 100644 --- a/jetcd-core/src/main/java/io/etcd/jetcd/impl/ElectionImpl.java +++ b/jetcd-core/src/main/java/io/etcd/jetcd/impl/ElectionImpl.java @@ -34,6 +34,7 @@ import io.etcd.jetcd.election.ResignResponse; import io.etcd.jetcd.support.Util; import io.grpc.StatusRuntimeException; +import io.vertx.core.streams.ReadStream; import com.google.protobuf.ByteString; @@ -113,10 +114,10 @@ public void observe(ByteSequence electionName, Listener listener) { .setName(ByteString.copyFrom(electionName.getBytes())) .build(); - stub.observe(request) - .handler(value -> listener.onNext(new LeaderResponse(value, namespace))) - .endHandler(ignored -> listener.onCompleted()) - .exceptionHandler(error -> listener.onError(toEtcdException(error))); + ReadStream ignoredRStream = stub.observeWithHandler(request, + value -> listener.onNext(new LeaderResponse(value, namespace)), + ignored -> listener.onCompleted(), + error -> listener.onError(toEtcdException(error))); } @Override diff --git a/jetcd-core/src/main/java/io/etcd/jetcd/impl/LeaseImpl.java b/jetcd-core/src/main/java/io/etcd/jetcd/impl/LeaseImpl.java index 6e57fa93..a83c095c 100644 --- a/jetcd-core/src/main/java/io/etcd/jetcd/impl/LeaseImpl.java +++ b/jetcd-core/src/main/java/io/etcd/jetcd/impl/LeaseImpl.java @@ -42,6 +42,7 @@ import io.etcd.jetcd.support.CloseableClient; import io.etcd.jetcd.support.Util; import io.grpc.stub.StreamObserver; +import io.vertx.core.streams.ReadStream; import io.vertx.core.streams.WriteStream; import static io.etcd.jetcd.common.exception.EtcdExceptionFactory.newClosedLeaseClientException; @@ -147,20 +148,22 @@ public CompletableFuture keepAliveOnce(long leaseId) { final CompletableFuture future = new CompletableFuture<>(); final LeaseKeepAliveRequest req = LeaseKeepAliveRequest.newBuilder().setID(leaseId).build(); - leaseStub - .leaseKeepAlive(s -> { - ref.set(s); - s.write(req); - }) - .handler(r -> { - if (r.getTTL() != 0) { - future.complete(new LeaseKeepAliveResponse(r)); - } else { - future.completeExceptionally( - newEtcdException(ErrorCode.NOT_FOUND, "etcdserver: requested lease not found")); - } - }) - .exceptionHandler(future::completeExceptionally); + ReadStream ignoredRStream = leaseStub + .leaseKeepAliveWithHandler( + s -> { + ref.set(s); + s.write(req); + }, + r -> { + if (r.getTTL() != 0) { + future.complete(new LeaseKeepAliveResponse(r)); + } else { + future.completeExceptionally( + newEtcdException(ErrorCode.NOT_FOUND, "etcdserver: requested lease not found")); + } + }, + null, + future::completeExceptionally); return future.whenComplete((r, t) -> ref.get().end(req)); } @@ -194,9 +197,11 @@ public KeepAlive() { @Override public void doStart() { - leaseStub.leaseKeepAlive(this::writeHandler) - .handler(this::handleResponse) - .exceptionHandler(this::handleException); + ReadStream ignoredRStream = leaseStub.leaseKeepAliveWithHandler( + this::writeHandler, + this::handleResponse, + null, + this::handleException); } @Override diff --git a/jetcd-core/src/main/java/io/etcd/jetcd/impl/MaintenanceImpl.java b/jetcd-core/src/main/java/io/etcd/jetcd/impl/MaintenanceImpl.java index 0a977c84..3a53bab8 100644 --- a/jetcd-core/src/main/java/io/etcd/jetcd/impl/MaintenanceImpl.java +++ b/jetcd-core/src/main/java/io/etcd/jetcd/impl/MaintenanceImpl.java @@ -30,6 +30,7 @@ import io.etcd.jetcd.maintenance.MoveLeaderResponse; import io.etcd.jetcd.maintenance.StatusResponse; import io.grpc.stub.StreamObserver; +import io.vertx.core.streams.ReadStream; import static com.google.common.base.Preconditions.checkArgument; import static io.etcd.jetcd.common.exception.EtcdExceptionFactory.toEtcdException; @@ -119,19 +120,20 @@ public CompletableFuture snapshot(OutputStream outputStream) { final CompletableFuture answer = new CompletableFuture<>(); final AtomicLong bytes = new AtomicLong(0); - this.stub.snapshot(SnapshotRequest.getDefaultInstance()) - .handler(r -> { + ReadStream ignoredRStream = this.stub.snapshotWithHandler( + SnapshotRequest.getDefaultInstance(), + r -> { try { r.getBlob().writeTo(outputStream); bytes.addAndGet(r.getBlob().size()); } catch (IOException e) { answer.completeExceptionally(toEtcdException(e)); } - }) - .endHandler(event -> { + }, + event -> { answer.complete(bytes.get()); - }) - .exceptionHandler(e -> { + }, + e -> { answer.completeExceptionally(toEtcdException(e)); }); @@ -141,9 +143,10 @@ public CompletableFuture snapshot(OutputStream outputStream) { @Override public void snapshot(StreamObserver observer) { - this.stub.snapshot(SnapshotRequest.getDefaultInstance()) - .handler(r -> observer.onNext(new io.etcd.jetcd.maintenance.SnapshotResponse(r))) - .endHandler(event -> observer.onCompleted()) - .exceptionHandler(e -> observer.onError(toEtcdException(e))); + ReadStream ignoredRStream = this.stub.snapshotWithHandler( + SnapshotRequest.getDefaultInstance(), + r -> observer.onNext(new io.etcd.jetcd.maintenance.SnapshotResponse(r)), + event -> observer.onCompleted(), + e -> observer.onError(toEtcdException(e))); } } diff --git a/jetcd-core/src/main/java/io/etcd/jetcd/impl/WatchImpl.java b/jetcd-core/src/main/java/io/etcd/jetcd/impl/WatchImpl.java index 21623ec5..f8e3ff44 100644 --- a/jetcd-core/src/main/java/io/etcd/jetcd/impl/WatchImpl.java +++ b/jetcd-core/src/main/java/io/etcd/jetcd/impl/WatchImpl.java @@ -178,13 +178,15 @@ void resume() { builder.addFilters(WatchCreateRequest.FilterType.NOPUT); } - rstream = Util.applyRequireLeader(option.withRequireLeader(), stub).watchWithExceptionHandler(stream -> { - wstream.set(stream); - stream.write(WatchRequest.newBuilder().setCreateRequest(builder).build()); - }, this::onError); - - rstream.handler(this::onNext); - rstream.endHandler(event -> onCompleted()); + rstream = Util.applyRequireLeader(option.withRequireLeader(), stub) + .watchWithHandler( + stream -> { + wstream.set(stream); + stream.write(WatchRequest.newBuilder().setCreateRequest(builder).build()); + }, + this::onNext, + event -> onCompleted(), + this::onError); } } diff --git a/jetcd-core/src/test/java/io/etcd/jetcd/impl/ElectionTest.java b/jetcd-core/src/test/java/io/etcd/jetcd/impl/ElectionTest.java index 99324aa8..a7626f40 100644 --- a/jetcd-core/src/test/java/io/etcd/jetcd/impl/ElectionTest.java +++ b/jetcd-core/src/test/java/io/etcd/jetcd/impl/ElectionTest.java @@ -197,6 +197,39 @@ public void onCompleted() { leaseClient.revoke(leaseId).get(); } + @Test + public void testObserveExistingLeader() throws Exception { + final AtomicInteger electionsSeen = new AtomicInteger(0); + ByteSequence electionName = ByteSequence.from(randomString(), StandardCharsets.UTF_8); + + long leaseId = leaseClient.grant(10).get().getID(); + + ByteSequence proposal = ByteSequence.from(randomString(), StandardCharsets.UTF_8); + electionClient.campaign(electionName, leaseId, proposal) + .get(OPERATION_TIMEOUT, TimeUnit.SECONDS); + + electionClient.observe(electionName, new Election.Listener() { + @Override + public void onNext(LeaderResponse response) { + electionsSeen.incrementAndGet(); + } + + @Override + public void onError(Throwable error) { + } + + @Override + public void onCompleted() { + } + }); + + TestUtil.waitForCondition( + () -> electionsSeen.get() == 1, OPERATION_TIMEOUT * 1000, + "Observer did not receive expected notifications, got: " + electionsSeen.get()); + + leaseClient.revoke(leaseId).get(); + } + @Test public void testSynchronizationBarrier() throws Exception { final int threadCount = 5;