From 9cd1e9891114625b7f2b0aaa5f71e7fff29b40ef Mon Sep 17 00:00:00 2001 From: Luca Burgazzoli Date: Tue, 30 May 2023 15:01:34 +0200 Subject: [PATCH] refactor: use vertx executor service Signed-off-by: Luca Burgazzoli --- .../jetcd/impl/ClientConnectionManager.java | 18 +++-- .../java/io/etcd/jetcd/impl/LeaseImpl.java | 68 ++++++++----------- .../etcd/jetcd/launcher/EtcdClusterImpl.java | 37 +--------- 3 files changed, 46 insertions(+), 77 deletions(-) diff --git a/jetcd-core/src/main/java/io/etcd/jetcd/impl/ClientConnectionManager.java b/jetcd-core/src/main/java/io/etcd/jetcd/impl/ClientConnectionManager.java index 9bfcf44ee..17c034b3c 100644 --- a/jetcd-core/src/main/java/io/etcd/jetcd/impl/ClientConnectionManager.java +++ b/jetcd-core/src/main/java/io/etcd/jetcd/impl/ClientConnectionManager.java @@ -161,10 +161,8 @@ ManagedChannelBuilder defaultChannelBuilder(String target) { if (target == null) { throw new IllegalArgumentException("At least one endpoint should be provided"); } - if (vertx == null) { - vertx = Vertx.vertx(new VertxOptions().setUseDaemonThread(true)); - } - final VertxChannelBuilder channelBuilder = VertxChannelBuilder.forTarget(vertx, target); + + final VertxChannelBuilder channelBuilder = VertxChannelBuilder.forTarget(vertx(), target); if (builder.authority() != null) { channelBuilder.overrideAuthority(builder.authority()); @@ -224,4 +222,16 @@ public void start(Listener responseListener, Metadata headers) { return channelBuilder; } + + Vertx vertx() { + if (this.vertx == null) { + synchronized (this.lock) { + if (this.vertx == null) { + this.vertx = Vertx.vertx(new VertxOptions().setUseDaemonThread(true)); + } + } + } + + return this.vertx; + } } diff --git a/jetcd-core/src/main/java/io/etcd/jetcd/impl/LeaseImpl.java b/jetcd-core/src/main/java/io/etcd/jetcd/impl/LeaseImpl.java index b90eaa7cf..813c35916 100644 --- a/jetcd-core/src/main/java/io/etcd/jetcd/impl/LeaseImpl.java +++ b/jetcd-core/src/main/java/io/etcd/jetcd/impl/LeaseImpl.java @@ -21,9 +21,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import io.etcd.jetcd.Lease; @@ -186,13 +183,11 @@ public synchronized void close() { * The KeepAliver hold a background task and stream for keep aliaves. */ private final class KeepAlive extends Service { - private volatile ScheduledFuture task; - private volatile ScheduledFuture restart; - private volatile ScheduledExecutorService executor; + private volatile Long task; + private volatile Long restart; private volatile WriteStream requestStream; public KeepAlive() { - this.executor = Executors.newScheduledThreadPool(2); } @Override @@ -208,12 +203,10 @@ public void doStop() { requestStream.end(); } if (this.restart != null) { - this.restart.cancel(true); - this.restart = null; + connectionManager().vertx().cancelTimer(this.restart); } if (this.task != null) { - this.task.cancel(true); - this.task = null; + connectionManager().vertx().cancelTimer(this.task); } } @@ -223,17 +216,17 @@ public void close() { this.task = null; this.restart = null; - this.executor.shutdownNow(); } private void writeHandler(WriteStream stream) { requestStream = stream; - task = executor.scheduleAtFixedRate( - () -> keepAlives.values().forEach(element -> sendKeepAlive(element, stream)), + task = connectionManager().vertx().setPeriodic( 0, 500, - TimeUnit.MILLISECONDS); + l -> { + keepAlives.values().forEach(element -> sendKeepAlive(element, stream)); + }); } private void sendKeepAlive(KeepAliveObserver observer, WriteStream stream) { @@ -272,7 +265,13 @@ private synchronized void handleException(Throwable unused) { return; } - restart = this.executor.schedule(this::restart, 500, TimeUnit.MILLISECONDS); + restart = connectionManager().vertx().setTimer( + 500, + l -> { + if (isRunning()) { + restart(); + } + }); } } @@ -280,42 +279,35 @@ private synchronized void handleException(Throwable unused) { * The DeadLiner hold a background task to check deadlines. */ private class DeadLine extends Service { - private volatile ScheduledFuture task; - private volatile ScheduledExecutorService executor; + private volatile Long task; public DeadLine() { - this.executor = Executors.newScheduledThreadPool(2); } @Override public void doStart() { - this.task = executor.scheduleAtFixedRate(() -> { - long now = System.currentTimeMillis(); - - keepAlives.values().removeIf(ka -> { - if (ka.getDeadLine() < now) { - ka.onCompleted(); - return true; - } - return false; + this.task = connectionManager().vertx().setPeriodic( + 0, + 1000, + l -> { + long now = System.currentTimeMillis(); + + keepAlives.values().removeIf(ka -> { + if (ka.getDeadLine() < now) { + ka.onCompleted(); + return true; + } + return false; + }); }); - }, 0, 1000, TimeUnit.MILLISECONDS); } @Override public void doStop() { if (this.task != null) { - this.task.cancel(true); + connectionManager().vertx().cancelTimer(this.task); } } - - @Override - public void close() { - super.close(); - - this.executor.shutdownNow(); - } - } /** diff --git a/jetcd-launcher/src/main/java/io/etcd/jetcd/launcher/EtcdClusterImpl.java b/jetcd-launcher/src/main/java/io/etcd/jetcd/launcher/EtcdClusterImpl.java index 39763146f..7fb4f4c23 100644 --- a/jetcd-launcher/src/main/java/io/etcd/jetcd/launcher/EtcdClusterImpl.java +++ b/jetcd-launcher/src/main/java/io/etcd/jetcd/launcher/EtcdClusterImpl.java @@ -22,27 +22,17 @@ import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.IntStream; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.testcontainers.DockerClientFactory; import org.testcontainers.containers.Network; -import com.github.dockerjava.api.command.SyncDockerCmd; - import static java.util.stream.Collectors.toList; public class EtcdClusterImpl implements EtcdCluster { - private static final Logger LOGGER = LoggerFactory.getLogger(EtcdClusterImpl.class); - private final List containers; private final String clusterName; - private final Network network; private final List endpoints; - private final AtomicBoolean hookIsSet; public EtcdClusterImpl( String image, @@ -51,11 +41,10 @@ public EtcdClusterImpl( int nodes, boolean ssl, Collection additionalArgs, - Network network, boolean shouldMountDataDirectory) { + Network network, + boolean shouldMountDataDirectory) { this.clusterName = clusterName; - this.network = network; - this.hookIsSet = new AtomicBoolean(false); this.endpoints = IntStream.range(0, nodes) .mapToObj(i -> (prefix == null ? "etcd" : prefix + "etcd") + i) .collect(toList()); @@ -70,30 +59,8 @@ public EtcdClusterImpl( .collect(toList()); } - private void execQuietly(SyncDockerCmd cmd) { - try { - cmd.exec(); - } catch (Exception e) { - LOGGER.warn("", e); - } - } - - private void performCleanup() { - if (this.network != null && network.getId() != null) { - execQuietly( - DockerClientFactory.instance().client().removeNetworkCmd(this.network.getId())); - } - } - @Override public void start() { - if (hookIsSet.compareAndSet(false, true)) { - // If the JVM stops without containers being stopped, try and stop the container. - Runtime - .getRuntime() - .addShutdownHook(new Thread(this::performCleanup)); - } - final CountDownLatch latch = new CountDownLatch(containers.size()); final AtomicReference failedToStart = new AtomicReference<>();