Skip to content

Commit

Permalink
Update deps
Browse files Browse the repository at this point in the history
Signed-off-by: Luca Burgazzoli <lburgazzoli@gmail.com>
  • Loading branch information
lburgazzoli committed May 28, 2024
1 parent aa9ebaa commit 58c80db
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 14 deletions.
10 changes: 5 additions & 5 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,15 @@ picocli = "4.7.6"
restAssured = "5.4.0"
javaxAnnotation = "1.3.2"

versionsPlugin = "0.50.0"
versionsPlugin = "0.51.0"
errorPronePlugin = "3.1.0"
spotlessPlugin = "6.23.3"
spotlessPlugin = "6.25.0"
shadowPlugin = "8.1.1"
testLoggerPlugin = "4.0.0"
protobufPlugin = "0.9.4"
nexusPublishPlugin = "1.3.0"
axionReleasePlugin = "1.16.1"
testRetryPlugin = "1.5.8"
nexusPublishPlugin = "2.0.0"
axionReleasePlugin = "1.17.2"
testRetryPlugin = "1.5.9"


[libraries]
Expand Down
22 changes: 13 additions & 9 deletions jetcd-core/src/main/java/io/etcd/jetcd/impl/WatchImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,19 @@

import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.Watch;
import io.etcd.jetcd.api.*;
import io.etcd.jetcd.api.VertxWatchGrpc;
import io.etcd.jetcd.api.WatchCancelRequest;
import io.etcd.jetcd.api.WatchCreateRequest;
import io.etcd.jetcd.api.WatchProgressRequest;
import io.etcd.jetcd.api.WatchRequest;
import io.etcd.jetcd.api.WatchResponse;
import io.etcd.jetcd.common.exception.ErrorCode;
import io.etcd.jetcd.common.exception.EtcdException;
import io.etcd.jetcd.options.OptionsUtil;
import io.etcd.jetcd.options.WatchOption;
import io.etcd.jetcd.support.Errors;
import io.etcd.jetcd.support.Util;
import io.grpc.Status;
import io.vertx.core.streams.ReadStream;
import io.vertx.core.streams.WriteStream;

import com.google.common.base.Strings;
Expand All @@ -46,7 +50,10 @@
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;

import static io.etcd.jetcd.common.exception.EtcdExceptionFactory.*;
import static io.etcd.jetcd.common.exception.EtcdExceptionFactory.newClosedWatchClientException;
import static io.etcd.jetcd.common.exception.EtcdExceptionFactory.newCompactedException;
import static io.etcd.jetcd.common.exception.EtcdExceptionFactory.newEtcdException;
import static io.etcd.jetcd.common.exception.EtcdExceptionFactory.toEtcdException;

/**
* watch Implementation.
Expand Down Expand Up @@ -117,10 +124,8 @@ final class WatcherImpl implements Watcher {
private final Listener listener;
private final AtomicBoolean closed;

//private StreamObserver<WatchRequest> stream;
private final AtomicReference<WriteStream<WatchRequest>> wstream;
private final AtomicBoolean started;
private ReadStream<WatchResponse> rstream;
private long revision;
private long id;

Expand All @@ -132,7 +137,6 @@ final class WatcherImpl implements Watcher {

this.started = new AtomicBoolean();
this.wstream = new AtomicReference<>();
this.rstream = null;
this.id = -1;
this.revision = this.option.getRevision();
}
Expand All @@ -154,7 +158,7 @@ void resume() {
}

if (started.compareAndSet(false, true)) {
// id is not really useful today but it may be in etcd 3.4
// id is not really useful today, but it may be in etcd 3.4
id = -1;

WatchCreateRequest.Builder builder = WatchCreateRequest.newBuilder()
Expand All @@ -166,7 +170,7 @@ void resume() {
.map(endKey -> Util.prefixNamespaceToRangeEnd(endKey, namespace))
.ifPresent(builder::setRangeEnd);

if (!option.getEndKey().isPresent() && option.isPrefix()) {
if (option.getEndKey().isEmpty() && option.isPrefix()) {
ByteSequence endKey = OptionsUtil.prefixEndOf(key);
builder.setRangeEnd(Util.prefixNamespaceToRangeEnd(endKey, namespace));
}
Expand All @@ -179,7 +183,7 @@ void resume() {
builder.addFilters(WatchCreateRequest.FilterType.NOPUT);
}

rstream = Util.applyRequireLeader(option.withRequireLeader(), stub)
var ignored = Util.applyRequireLeader(option.withRequireLeader(), stub)
.watchWithHandler(
stream -> {
wstream.set(stream);
Expand Down

0 comments on commit 58c80db

Please sign in to comment.