Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: use vertx executor service #1153

Merged
merged 1 commit into from
May 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -161,10 +161,8 @@ ManagedChannelBuilder<?> defaultChannelBuilder(String target) {
if (target == null) {
throw new IllegalArgumentException("At least one endpoint should be provided");
}
if (vertx == null) {
vertx = Vertx.vertx(new VertxOptions().setUseDaemonThread(true));
}
final VertxChannelBuilder channelBuilder = VertxChannelBuilder.forTarget(vertx, target);

final VertxChannelBuilder channelBuilder = VertxChannelBuilder.forTarget(vertx(), target);

if (builder.authority() != null) {
channelBuilder.overrideAuthority(builder.authority());
Expand Down Expand Up @@ -224,4 +222,16 @@ public void start(Listener<RespT> responseListener, Metadata headers) {

return channelBuilder;
}

Vertx vertx() {
if (this.vertx == null) {
synchronized (this.lock) {
if (this.vertx == null) {
this.vertx = Vertx.vertx(new VertxOptions().setUseDaemonThread(true));
}
}
}

return this.vertx;
}
}
68 changes: 30 additions & 38 deletions jetcd-core/src/main/java/io/etcd/jetcd/impl/LeaseImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,6 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import io.etcd.jetcd.Lease;
Expand Down Expand Up @@ -186,13 +183,11 @@ public synchronized void close() {
* The KeepAliver hold a background task and stream for keep aliaves.
*/
private final class KeepAlive extends Service {
private volatile ScheduledFuture<?> task;
private volatile ScheduledFuture<?> restart;
private volatile ScheduledExecutorService executor;
private volatile Long task;
private volatile Long restart;
private volatile WriteStream<LeaseKeepAliveRequest> requestStream;

public KeepAlive() {
this.executor = Executors.newScheduledThreadPool(2);
}

@Override
Expand All @@ -208,12 +203,10 @@ public void doStop() {
requestStream.end();
}
if (this.restart != null) {
this.restart.cancel(true);
this.restart = null;
connectionManager().vertx().cancelTimer(this.restart);
}
if (this.task != null) {
this.task.cancel(true);
this.task = null;
connectionManager().vertx().cancelTimer(this.task);
}
}

Expand All @@ -223,17 +216,17 @@ public void close() {

this.task = null;
this.restart = null;
this.executor.shutdownNow();
}

private void writeHandler(WriteStream<LeaseKeepAliveRequest> stream) {
requestStream = stream;

task = executor.scheduleAtFixedRate(
() -> keepAlives.values().forEach(element -> sendKeepAlive(element, stream)),
task = connectionManager().vertx().setPeriodic(
0,
500,
TimeUnit.MILLISECONDS);
l -> {
keepAlives.values().forEach(element -> sendKeepAlive(element, stream));
});
}

private void sendKeepAlive(KeepAliveObserver observer, WriteStream<LeaseKeepAliveRequest> stream) {
Expand Down Expand Up @@ -272,50 +265,49 @@ private synchronized void handleException(Throwable unused) {
return;
}

restart = this.executor.schedule(this::restart, 500, TimeUnit.MILLISECONDS);
restart = connectionManager().vertx().setTimer(
500,
l -> {
if (isRunning()) {
restart();
}
});
}
}

/**
* The DeadLiner hold a background task to check deadlines.
*/
private class DeadLine extends Service {
private volatile ScheduledFuture<?> task;
private volatile ScheduledExecutorService executor;
private volatile Long task;

public DeadLine() {
this.executor = Executors.newScheduledThreadPool(2);
}

@Override
public void doStart() {
this.task = executor.scheduleAtFixedRate(() -> {
long now = System.currentTimeMillis();

keepAlives.values().removeIf(ka -> {
if (ka.getDeadLine() < now) {
ka.onCompleted();
return true;
}
return false;
this.task = connectionManager().vertx().setPeriodic(
0,
1000,
l -> {
long now = System.currentTimeMillis();

keepAlives.values().removeIf(ka -> {
if (ka.getDeadLine() < now) {
ka.onCompleted();
return true;
}
return false;
});
});
}, 0, 1000, TimeUnit.MILLISECONDS);
}

@Override
public void doStop() {
if (this.task != null) {
this.task.cancel(true);
connectionManager().vertx().cancelTimer(this.task);
}
}

@Override
public void close() {
super.close();

this.executor.shutdownNow();
}

}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,27 +22,17 @@
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.IntStream;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.DockerClientFactory;
import org.testcontainers.containers.Network;

import com.github.dockerjava.api.command.SyncDockerCmd;

import static java.util.stream.Collectors.toList;

public class EtcdClusterImpl implements EtcdCluster {
private static final Logger LOGGER = LoggerFactory.getLogger(EtcdClusterImpl.class);

private final List<EtcdContainer> containers;
private final String clusterName;
private final Network network;
private final List<String> endpoints;
private final AtomicBoolean hookIsSet;

public EtcdClusterImpl(
String image,
Expand All @@ -51,11 +41,10 @@ public EtcdClusterImpl(
int nodes,
boolean ssl,
Collection<String> additionalArgs,
Network network, boolean shouldMountDataDirectory) {
Network network,
boolean shouldMountDataDirectory) {

this.clusterName = clusterName;
this.network = network;
this.hookIsSet = new AtomicBoolean(false);
this.endpoints = IntStream.range(0, nodes)
.mapToObj(i -> (prefix == null ? "etcd" : prefix + "etcd") + i)
.collect(toList());
Expand All @@ -70,30 +59,8 @@ public EtcdClusterImpl(
.collect(toList());
}

private void execQuietly(SyncDockerCmd<?> cmd) {
try {
cmd.exec();
} catch (Exception e) {
LOGGER.warn("", e);
}
}

private void performCleanup() {
if (this.network != null && network.getId() != null) {
execQuietly(
DockerClientFactory.instance().client().removeNetworkCmd(this.network.getId()));
}
}

@Override
public void start() {
if (hookIsSet.compareAndSet(false, true)) {
// If the JVM stops without containers being stopped, try and stop the container.
Runtime
.getRuntime()
.addShutdownHook(new Thread(this::performCleanup));
}

final CountDownLatch latch = new CountDownLatch(containers.size());
final AtomicReference<Exception> failedToStart = new AtomicReference<>();

Expand Down