
watch without waitForReady sometimes not reschedule when all servers are down #1261

Closed
tunefun opened this issue Oct 25, 2023 · 8 comments · Fixed by #1287

Comments

@tunefun
Contributor

tunefun commented Oct 25, 2023

Versions

  • etcd: 3.5.8
  • jetcd: 0.7.6
  • java: 1.8

Describe the bug
A watch without waitForReady is sometimes not rescheduled when all servers are down. When we reboot some of the servers, the watch streams on those servers are rescheduled as expected; when we shut down all servers and reboot them after a while, the watch is not rescheduled.

Reviewing the code below, we lose the event when it arrives before the handler is set.

rstream = Util.applyRequireLeader(option.withRequireLeader(), stub).watch(stream -> {
    wstream.set(stream);
    stream.write(WatchRequest.newBuilder().setCreateRequest(builder).build());
});
rstream.handler(this::onNext);
rstream.exceptionHandler(this::onError);
rstream.endHandler(event -> onCompleted());

Example of the failing call chain:
connect failed -> StreamObserverReadStream.onError() -> WatcherImpl.onError() -> WatcherImpl.reschedule() -> WatcherImpl.resume() -> WatchVertxStub.watch() -> connect failed -> StreamObserverReadStream.onError() -> StreamObserverReadStream.exceptionHandler is null -> rstream.exceptionHandler(this::onError)
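The race in that chain can be simulated with a minimal, self-contained sketch (hypothetical class names, not the real vert.x implementation): if the connection fails synchronously before the caller attaches its exception handler, the error is silently dropped and no reschedule happens.

```java
import java.util.function.Consumer;

// Minimal stand-in for a read stream whose exceptionHandler starts out null.
class MiniReadStream {
    private Consumer<Throwable> exceptionHandler; // null until registered

    // Buggy behavior: an error delivered before registration is dropped.
    void emitError(Throwable t) {
        if (exceptionHandler != null) {
            exceptionHandler.accept(t);
        }
        // else: silently lost -> onError is never called, no reschedule
    }

    void exceptionHandler(Consumer<Throwable> h) {
        this.exceptionHandler = h;
    }
}

public class RaceDemo {
    public static void main(String[] args) {
        MiniReadStream stream = new MiniReadStream();
        boolean[] rescheduled = {false};

        // Connection fails synchronously, before the caller had a chance
        // to register its exception handler...
        stream.emitError(new RuntimeException("connect failed"));

        // ...and only now is the handler attached, too late.
        stream.exceptionHandler(t -> rescheduled[0] = true);

        System.out.println("rescheduled=" + rescheduled[0]); // prints rescheduled=false
    }
}
```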

To Reproduce
See the description above.

Expected behavior
The watch is rescheduled.

@lburgazzoli
Collaborator

@tenghuanhe can you provide a reproducer, and/or are you willing to work on a PR?

@giri-vsr
Contributor

giri-vsr commented Nov 9, 2023

We are also facing the same issue. I am not able to reproduce it using test containers.

This is what is happening:
To create the watch, io.vertx.grpc.stub.ClientCalls.manyToMany is called, which returns rstream; only after that do we register the exceptionHandler. In this case the exception occurs before rstream is returned, so StreamObserverReadStream.exceptionHandler is still null when the error arrives. onError is never called, which is why the watch is not rescheduled.

The exception occurs in StreamObserver<I> request = delegate.apply(response);

  public static <I, O> ReadStream<O> manyToMany(ContextInternal ctx, Handler<WriteStream<I>> requestHandler, Function<StreamObserver<O>, StreamObserver<I>> delegate) {
    StreamObserverReadStream<O> response = new StreamObserverReadStream<>();
    StreamObserver<I> request = delegate.apply(response);
    requestHandler.handle(new GrpcWriteStream<>(request));
    return response;
  }

@lburgazzoli I am not sure what the fix should be.
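One possible direction, sketched under the assumption that the stream may buffer a terminal event until a handler is attached (hypothetical class names; this is not the actual vert.x patch): remember an error that arrives before registration and replay it when the handler is finally set.

```java
import java.util.function.Consumer;

// Sketch: a read stream that buffers an early terminal error instead of
// dropping it, and replays it once an exception handler is registered.
class BufferingReadStream {
    private Consumer<Throwable> exceptionHandler;
    private Throwable pendingError;

    void emitError(Throwable t) {
        if (exceptionHandler != null) {
            exceptionHandler.accept(t);
        } else {
            pendingError = t; // remember it instead of dropping it
        }
    }

    void exceptionHandler(Consumer<Throwable> h) {
        this.exceptionHandler = h;
        if (pendingError != null) {
            Throwable t = pendingError;
            pendingError = null;
            h.accept(t); // replay the early error
        }
    }
}

public class BufferDemo {
    public static void main(String[] args) {
        BufferingReadStream stream = new BufferingReadStream();
        boolean[] rescheduled = {false};

        stream.emitError(new RuntimeException("connect failed")); // arrives early
        stream.exceptionHandler(t -> rescheduled[0] = true);      // replayed here

        System.out.println("rescheduled=" + rescheduled[0]); // prints rescheduled=true
    }
}
```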

@lburgazzoli
Collaborator

@giri-vsr I have very limited time at this stage, and I need to dig into the issue more to understand what to do.
If you have any time, it may be worth chatting with the vert.x folks to see what a solution could be.

@giri-vsr
Contributor

Maybe we need to move to https://github.com/eclipse-vertx/vertx-grpc since https://github.com/vert-x3/vertx-grpc is deprecated

@lburgazzoli
Collaborator

> Maybe we need to move to https://github.com/eclipse-vertx/vertx-grpc since https://github.com/vert-x3/vertx-grpc is deprecated

Can you try to work on a PR?

@giri-vsr
Contributor

The issue is fixed in https://github.com/vert-x3/vertx-grpc 4.5.1. Moving to https://github.com/eclipse-vertx/vertx-grpc should be handled separately.

@tunefun
Contributor Author

tunefun commented Jan 11, 2024

@lburgazzoli @giri-vsr hello, it seems that election observe has the same problem:

stub.observe(request)
    .handler(value -> listener.onNext(new LeaderResponse(value, namespace)))
    .endHandler(ignored -> listener.onCompleted())
    .exceptionHandler(error -> listener.onError(toEtcdException(error)));

And maybe we can also register the end handler before writing the watch request?
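The idea of ordering registration before the request can be shown with a hedged, self-contained sketch (hypothetical class names; the real vert.x API shapes differ): if all handlers are attached before the request is written, even a synchronous connect failure triggered by the write finds a registered handler.

```java
import java.util.function.Consumer;

// Sketch: a stream where writing the request may fail synchronously.
class ObserveStream {
    private Consumer<Throwable> exceptionHandler;

    void exceptionHandler(Consumer<Throwable> h) {
        this.exceptionHandler = h;
    }

    // Writing the request may fail immediately (e.g. all servers down).
    void writeRequest() {
        if (exceptionHandler != null) {
            exceptionHandler.accept(new RuntimeException("connect failed"));
        }
        // else the error would be dropped, as in the watch case
    }
}

public class OrderDemo {
    public static void main(String[] args) {
        boolean[] sawError = {false};

        ObserveStream stream = new ObserveStream();
        stream.exceptionHandler(t -> sawError[0] = true); // handlers first
        stream.writeRequest();                            // request second

        System.out.println("sawError=" + sawError[0]); // prints sawError=true
    }
}
```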

@tunefun
Contributor Author

tunefun commented Jan 11, 2024

LeaseImpl and MaintenanceImpl are also affected:

leaseStub
    .leaseKeepAlive(s -> {
        ref.set(s);
        s.write(req);
    })
    .handler(r -> {
        if (r.getTTL() != 0) {
            future.complete(new LeaseKeepAliveResponse(r));
        } else {
            future.completeExceptionally(
                newEtcdException(ErrorCode.NOT_FOUND, "etcdserver: requested lease not found"));
        }
    })
    .exceptionHandler(future::completeExceptionally);

leaseStub.leaseKeepAlive(this::writeHandler)
    .handler(this::handleResponse)
    .exceptionHandler(this::handleException);

this.stub.snapshot(SnapshotRequest.getDefaultInstance())
    .handler(r -> {
        try {
            r.getBlob().writeTo(outputStream);
            bytes.addAndGet(r.getBlob().size());
        } catch (IOException e) {
            answer.completeExceptionally(toEtcdException(e));
        }
    })
    .endHandler(event -> {
        answer.complete(bytes.get());
    })
    .exceptionHandler(e -> {
        answer.completeExceptionally(toEtcdException(e));
    });

this.stub.snapshot(SnapshotRequest.getDefaultInstance())
    .handler(r -> observer.onNext(new io.etcd.jetcd.maintenance.SnapshotResponse(r)))
    .endHandler(event -> observer.onCompleted())
    .exceptionHandler(e -> observer.onError(toEtcdException(e)));
