Skip to content

Commit

Permalink
refactor: use vertx executor service
Browse files Browse the repository at this point in the history
Signed-off-by: Luca Burgazzoli <lburgazzoli@gmail.com>
  • Loading branch information
lburgazzoli committed May 31, 2023
1 parent 879bb22 commit 9cd1e98
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 77 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -161,10 +161,8 @@ ManagedChannelBuilder<?> defaultChannelBuilder(String target) {
if (target == null) {
throw new IllegalArgumentException("At least one endpoint should be provided");
}
if (vertx == null) {
vertx = Vertx.vertx(new VertxOptions().setUseDaemonThread(true));
}
final VertxChannelBuilder channelBuilder = VertxChannelBuilder.forTarget(vertx, target);

final VertxChannelBuilder channelBuilder = VertxChannelBuilder.forTarget(vertx(), target);

if (builder.authority() != null) {
channelBuilder.overrideAuthority(builder.authority());
Expand Down Expand Up @@ -224,4 +222,16 @@ public void start(Listener<RespT> responseListener, Metadata headers) {

return channelBuilder;
}

Vertx vertx() {
if (this.vertx == null) {
synchronized (this.lock) {
if (this.vertx == null) {
this.vertx = Vertx.vertx(new VertxOptions().setUseDaemonThread(true));
}
}
}

return this.vertx;
}
}
68 changes: 30 additions & 38 deletions jetcd-core/src/main/java/io/etcd/jetcd/impl/LeaseImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,6 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import io.etcd.jetcd.Lease;
Expand Down Expand Up @@ -186,13 +183,11 @@ public synchronized void close() {
* The KeepAliver hold a background task and stream for keep aliaves.
*/
private final class KeepAlive extends Service {
private volatile ScheduledFuture<?> task;
private volatile ScheduledFuture<?> restart;
private volatile ScheduledExecutorService executor;
private volatile Long task;
private volatile Long restart;
private volatile WriteStream<LeaseKeepAliveRequest> requestStream;

public KeepAlive() {
this.executor = Executors.newScheduledThreadPool(2);
}

@Override
Expand All @@ -208,12 +203,10 @@ public void doStop() {
requestStream.end();
}
if (this.restart != null) {
this.restart.cancel(true);
this.restart = null;
connectionManager().vertx().cancelTimer(this.restart);
}
if (this.task != null) {
this.task.cancel(true);
this.task = null;
connectionManager().vertx().cancelTimer(this.task);
}
}

Expand All @@ -223,17 +216,17 @@ public void close() {

this.task = null;
this.restart = null;
this.executor.shutdownNow();
}

private void writeHandler(WriteStream<LeaseKeepAliveRequest> stream) {
requestStream = stream;

task = executor.scheduleAtFixedRate(
() -> keepAlives.values().forEach(element -> sendKeepAlive(element, stream)),
task = connectionManager().vertx().setPeriodic(
0,
500,
TimeUnit.MILLISECONDS);
l -> {
keepAlives.values().forEach(element -> sendKeepAlive(element, stream));
});
}

private void sendKeepAlive(KeepAliveObserver observer, WriteStream<LeaseKeepAliveRequest> stream) {
Expand Down Expand Up @@ -272,50 +265,49 @@ private synchronized void handleException(Throwable unused) {
return;
}

restart = this.executor.schedule(this::restart, 500, TimeUnit.MILLISECONDS);
restart = connectionManager().vertx().setTimer(
500,
l -> {
if (isRunning()) {
restart();
}
});
}
}

/**
* The DeadLiner hold a background task to check deadlines.
*/
private class DeadLine extends Service {
private volatile ScheduledFuture<?> task;
private volatile ScheduledExecutorService executor;
private volatile Long task;

public DeadLine() {
this.executor = Executors.newScheduledThreadPool(2);
}

@Override
public void doStart() {
this.task = executor.scheduleAtFixedRate(() -> {
long now = System.currentTimeMillis();

keepAlives.values().removeIf(ka -> {
if (ka.getDeadLine() < now) {
ka.onCompleted();
return true;
}
return false;
this.task = connectionManager().vertx().setPeriodic(
0,
1000,
l -> {
long now = System.currentTimeMillis();

keepAlives.values().removeIf(ka -> {
if (ka.getDeadLine() < now) {
ka.onCompleted();
return true;
}
return false;
});
});
}, 0, 1000, TimeUnit.MILLISECONDS);
}

@Override
public void doStop() {
if (this.task != null) {
this.task.cancel(true);
connectionManager().vertx().cancelTimer(this.task);
}
}

@Override
public void close() {
super.close();

this.executor.shutdownNow();
}

}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,27 +22,17 @@
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.IntStream;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.DockerClientFactory;
import org.testcontainers.containers.Network;

import com.github.dockerjava.api.command.SyncDockerCmd;

import static java.util.stream.Collectors.toList;

public class EtcdClusterImpl implements EtcdCluster {
private static final Logger LOGGER = LoggerFactory.getLogger(EtcdClusterImpl.class);

private final List<EtcdContainer> containers;
private final String clusterName;
private final Network network;
private final List<String> endpoints;
private final AtomicBoolean hookIsSet;

public EtcdClusterImpl(
String image,
Expand All @@ -51,11 +41,10 @@ public EtcdClusterImpl(
int nodes,
boolean ssl,
Collection<String> additionalArgs,
Network network, boolean shouldMountDataDirectory) {
Network network,
boolean shouldMountDataDirectory) {

this.clusterName = clusterName;
this.network = network;
this.hookIsSet = new AtomicBoolean(false);
this.endpoints = IntStream.range(0, nodes)
.mapToObj(i -> (prefix == null ? "etcd" : prefix + "etcd") + i)
.collect(toList());
Expand All @@ -70,30 +59,8 @@ public EtcdClusterImpl(
.collect(toList());
}

private void execQuietly(SyncDockerCmd<?> cmd) {
try {
cmd.exec();
} catch (Exception e) {
LOGGER.warn("", e);
}
}

private void performCleanup() {
if (this.network != null && network.getId() != null) {
execQuietly(
DockerClientFactory.instance().client().removeNetworkCmd(this.network.getId()));
}
}

@Override
public void start() {
if (hookIsSet.compareAndSet(false, true)) {
// If the JVM stops without containers being stopped, try and stop the container.
Runtime
.getRuntime()
.addShutdownHook(new Thread(this::performCleanup));
}

final CountDownLatch latch = new CountDownLatch(containers.size());
final AtomicReference<Exception> failedToStart = new AtomicReference<>();

Expand Down

0 comments on commit 9cd1e98

Please sign in to comment.