Skip to content

Commit

Permalink
election,lease,maintenance,watch: Register handlers before request
Browse files Browse the repository at this point in the history
Fixes #1308

Signed-off-by: tunefun <965728225@qq.com>
  • Loading branch information
tunefun committed Mar 5, 2024
1 parent 10b07f5 commit 8ff98bd
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 38 deletions.
2 changes: 1 addition & 1 deletion gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ commonsIo = "2.15.1"
commonCompress = "1.26.0"
autoService = "1.1.1"
errorprone = "2.25.0"
vertx = "4.5.3"
vertx = "4.5.4"
picocli = "4.7.5"
restAssured = "5.4.0"
javaxAnnotation = "1.3.2"
Expand Down
8 changes: 4 additions & 4 deletions jetcd-core/src/main/java/io/etcd/jetcd/impl/ElectionImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,10 @@ public void observe(ByteSequence electionName, Listener listener) {
.setName(ByteString.copyFrom(electionName.getBytes()))
.build();

stub.observe(request)
.handler(value -> listener.onNext(new LeaderResponse(value, namespace)))
.endHandler(ignored -> listener.onCompleted())
.exceptionHandler(error -> listener.onError(toEtcdException(error)));
stub.observeWithHandler(request,
value -> listener.onNext(new LeaderResponse(value, namespace)),
ignored -> listener.onCompleted(),
error -> listener.onError(toEtcdException(error)));
}

@Override
Expand Down
36 changes: 20 additions & 16 deletions jetcd-core/src/main/java/io/etcd/jetcd/impl/LeaseImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -148,19 +148,21 @@ public CompletableFuture<LeaseKeepAliveResponse> keepAliveOnce(long leaseId) {
final LeaseKeepAliveRequest req = LeaseKeepAliveRequest.newBuilder().setID(leaseId).build();

leaseStub
.leaseKeepAlive(s -> {
ref.set(s);
s.write(req);
})
.handler(r -> {
if (r.getTTL() != 0) {
future.complete(new LeaseKeepAliveResponse(r));
} else {
future.completeExceptionally(
newEtcdException(ErrorCode.NOT_FOUND, "etcdserver: requested lease not found"));
}
})
.exceptionHandler(future::completeExceptionally);
.leaseKeepAliveWithHandler(
s -> {
ref.set(s);
s.write(req);
},
r -> {
if (r.getTTL() != 0) {
future.complete(new LeaseKeepAliveResponse(r));
} else {
future.completeExceptionally(
newEtcdException(ErrorCode.NOT_FOUND, "etcdserver: requested lease not found"));
}
},
null,
future::completeExceptionally);

return future.whenComplete((r, t) -> ref.get().end(req));
}
Expand Down Expand Up @@ -194,9 +196,11 @@ public KeepAlive() {

@Override
public void doStart() {
leaseStub.leaseKeepAlive(this::writeHandler)
.handler(this::handleResponse)
.exceptionHandler(this::handleException);
leaseStub.leaseKeepAliveWithHandler(
this::writeHandler,
this::handleResponse,
null,
this::handleException);
}

@Override
Expand Down
22 changes: 12 additions & 10 deletions jetcd-core/src/main/java/io/etcd/jetcd/impl/MaintenanceImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -119,19 +119,20 @@ public CompletableFuture<Long> snapshot(OutputStream outputStream) {
final CompletableFuture<Long> answer = new CompletableFuture<>();
final AtomicLong bytes = new AtomicLong(0);

this.stub.snapshot(SnapshotRequest.getDefaultInstance())
.handler(r -> {
this.stub.snapshotWithHandler(
SnapshotRequest.getDefaultInstance(),
r -> {
try {
r.getBlob().writeTo(outputStream);
bytes.addAndGet(r.getBlob().size());
} catch (IOException e) {
answer.completeExceptionally(toEtcdException(e));
}
})
.endHandler(event -> {
},
event -> {
answer.complete(bytes.get());
})
.exceptionHandler(e -> {
},
e -> {
answer.completeExceptionally(toEtcdException(e));
});

Expand All @@ -141,9 +142,10 @@ public CompletableFuture<Long> snapshot(OutputStream outputStream) {
@Override
public void snapshot(StreamObserver<io.etcd.jetcd.maintenance.SnapshotResponse> observer) {

this.stub.snapshot(SnapshotRequest.getDefaultInstance())
.handler(r -> observer.onNext(new io.etcd.jetcd.maintenance.SnapshotResponse(r)))
.endHandler(event -> observer.onCompleted())
.exceptionHandler(e -> observer.onError(toEtcdException(e)));
this.stub.snapshotWithHandler(
SnapshotRequest.getDefaultInstance(),
r -> observer.onNext(new io.etcd.jetcd.maintenance.SnapshotResponse(r)),
event -> observer.onCompleted(),
e -> observer.onError(toEtcdException(e)));
}
}
16 changes: 9 additions & 7 deletions jetcd-core/src/main/java/io/etcd/jetcd/impl/WatchImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -178,13 +178,15 @@ void resume() {
builder.addFilters(WatchCreateRequest.FilterType.NOPUT);
}

rstream = Util.applyRequireLeader(option.withRequireLeader(), stub).watchWithExceptionHandler(stream -> {
wstream.set(stream);
stream.write(WatchRequest.newBuilder().setCreateRequest(builder).build());
}, this::onError);

rstream.handler(this::onNext);
rstream.endHandler(event -> onCompleted());
rstream = Util.applyRequireLeader(option.withRequireLeader(), stub)
.watchWithHandler(
stream -> {
wstream.set(stream);
stream.write(WatchRequest.newBuilder().setCreateRequest(builder).build());
},
this::onNext,
event -> onCompleted(),
this::onError);
}
}

Expand Down
33 changes: 33 additions & 0 deletions jetcd-core/src/test/java/io/etcd/jetcd/impl/ElectionTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,39 @@ public void onCompleted() {
leaseClient.revoke(leaseId).get();
}

@Test
public void testObserveExistingLeader() throws Exception {
final AtomicInteger electionsSeen = new AtomicInteger(0);
ByteSequence electionName = ByteSequence.from(randomString(), StandardCharsets.UTF_8);

long leaseId = leaseClient.grant(10).get().getID();

ByteSequence proposal = ByteSequence.from(randomString(), StandardCharsets.UTF_8);
electionClient.campaign(electionName, leaseId, proposal)
.get(OPERATION_TIMEOUT, TimeUnit.SECONDS);

electionClient.observe(electionName, new Election.Listener() {
@Override
public void onNext(LeaderResponse response) {
electionsSeen.incrementAndGet();
}

@Override
public void onError(Throwable error) {
}

@Override
public void onCompleted() {
}
});

TestUtil.waitForCondition(
() -> electionsSeen.get() == 1, OPERATION_TIMEOUT * 1000,
"Observer did not receive expected notifications, got: " + electionsSeen.get());

leaseClient.revoke(leaseId).get();
}

@Test
public void testSynchronizationBarrier() throws Exception {
final int threadCount = 5;
Expand Down

0 comments on commit 8ff98bd

Please sign in to comment.