Skip to content

Commit

Permalink
The DnsClient does not have a close method to release the underlying …
Browse files Browse the repository at this point in the history
…networking resources.

Add a close method to the client that closes the datagram channel, inflight queries are failed.
  • Loading branch information
vietj committed Aug 22, 2023
1 parent 17f6dce commit 0ce76a8
Show file tree
Hide file tree
Showing 5 changed files with 93 additions and 21 deletions.
2 changes: 2 additions & 0 deletions src/main/asciidoc/dns.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ for non blocking address resolution.
{@link examples.DNSExamples#example1__}
----

A client uses a single event loop for querying purposes, it can safely be used from any thread, including non Vert.x thread.

=== lookup

Try to lookup the A (ipv4) or AAAA (ipv6) record for a given name. The first which is returned will be used,
Expand Down
13 changes: 13 additions & 0 deletions src/main/java/io/vertx/core/dns/DnsClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
* Provides a way to asynchronously lookup information from DNS servers.
* <p>
* Please consult the documentation for more information on DNS clients.
* <p>
* The client is thread safe and can be used from any thread.
*
* @author <a href="mailto:nmaurer@redhat.com">Norman Maurer</a>
*/
Expand Down Expand Up @@ -237,4 +239,15 @@ public interface DnsClient {
* Like {@link #reverseLookup(String, Handler)} but returns a {@code Future} of the asynchronous result
*/
Future<@Nullable String> reverseLookup(String ipaddress);

/**
* Close the client.
*/
void close(Handler<AsyncResult<Void>> handler);

/**
* Like {@link #close(Handler)} but returns a {@code Future} of the asynchronous result
*/
Future<Void> close();

}
49 changes: 38 additions & 11 deletions src/main/java/io/vertx/core/dns/impl/DnsClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.vertx.core.impl.future.PromiseInternal;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.buffer.impl.PartialPooledByteBufAllocator;
import io.vertx.core.net.impl.ConnectionBase;
import io.vertx.core.spi.transport.Transport;

import java.net.Inet4Address;
Expand All @@ -49,40 +50,39 @@ public final class DnsClientImpl implements DnsClient {
private final VertxInternal vertx;
private final LongObjectMap<Query> inProgressMap = new LongObjectHashMap<>();
private final InetSocketAddress dnsServer;
private final ContextInternal actualCtx;
private final ContextInternal context;
private final DatagramChannel channel;
private final DnsClientOptions options;
private volatile Future<Void> closed;

public DnsClientImpl(VertxInternal vertx, DnsClientOptions options) {
Objects.requireNonNull(options, "no null options accepted");
Objects.requireNonNull(options.getHost(), "no null host accepted");

this.options = new DnsClientOptions(options);

ContextInternal creatingContext = vertx.getContext();

this.dnsServer = new InetSocketAddress(options.getHost(), options.getPort());
if (this.dnsServer.isUnresolved()) {
throw new IllegalArgumentException("Cannot resolve the host to a valid ip address");
}
this.vertx = vertx;

Transport transport = vertx.transport();
actualCtx = vertx.getOrCreateContext();
context = vertx.getOrCreateContext();
channel = transport.datagramChannel(this.dnsServer.getAddress() instanceof Inet4Address ? InternetProtocolFamily.IPv4 : InternetProtocolFamily.IPv6);
channel.config().setOption(ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION, true);
MaxMessagesRecvByteBufAllocator bufAllocator = channel.config().getRecvByteBufAllocator();
bufAllocator.maxMessagesPerRead(1);
channel.config().setAllocator(PartialPooledByteBufAllocator.INSTANCE);
actualCtx.nettyEventLoop().register(channel);
context.nettyEventLoop().register(channel);
if (options.getLogActivity()) {
channel.pipeline().addLast("logging", new LoggingHandler(options.getActivityLogFormat()));
}
channel.pipeline().addLast(new DatagramDnsQueryEncoder());
channel.pipeline().addLast(new DatagramDnsResponseDecoder());
channel.pipeline().addLast(new SimpleChannelInboundHandler<DnsResponse>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, DnsResponse msg) throws Exception {
protected void channelRead0(ChannelHandlerContext ctx, DnsResponse msg) {
DefaultDnsQuestion question = msg.recordAt(DnsSection.QUESTION);
Query query = inProgressMap.get(dnsMessageId(msg.id(), question.name()));
if (query != null) {
Expand Down Expand Up @@ -266,9 +266,12 @@ private <T> Future<T> lookupSingle(String name, DnsRecordType... types) {
@SuppressWarnings("unchecked")
private <T> Future<List<T>> lookupList(String name, DnsRecordType... types) {
ContextInternal ctx = vertx.getOrCreateContext();
if (closed != null) {
return ctx.failedFuture(ConnectionBase.CLOSED_EXCEPTION);
}
PromiseInternal<List<T>> promise = ctx.promise();
Objects.requireNonNull(name, "no null name accepted");
EventLoop el = actualCtx.nettyEventLoop();
EventLoop el = context.nettyEventLoop();
Query query = new Query(name, types);
query.promise.addListener(promise);
if (el.inEventLoop()) {
Expand All @@ -285,7 +288,7 @@ private long dnsMessageId(int id, String query) {

// Testing purposes
public void inProgressQueries(Handler<Integer> handler) {
actualCtx.runOnContext(v -> {
context.runOnContext(v -> {
handler.handle(inProgressMap.size());
});
}
Expand All @@ -306,7 +309,7 @@ public Query(String name, DnsRecordType[] types) {
for (DnsRecordType type: types) {
msg.addRecord(DnsSection.QUESTION, new DefaultDnsQuestion(name, type, DnsRecord.CLASS_IN));
}
this.promise = actualCtx.nettyEventLoop().newPromise();
this.promise = context.nettyEventLoop().newPromise();
this.types = types;
this.name = name;
}
Expand Down Expand Up @@ -348,13 +351,13 @@ void run() {
inProgressMap.put(dnsMessageId(msg.id(), name), this);
timerID = vertx.setTimer(options.getQueryTimeout(), id -> {
timerID = -1;
actualCtx.runOnContext(v -> {
context.runOnContext(v -> {
fail(new VertxException("DNS query timeout for " + name));
});
});
channel.writeAndFlush(msg).addListener((ChannelFutureListener) future -> {
if (!future.isSuccess()) {
actualCtx.emit(future.cause(), this::fail);
context.emit(future.cause(), this::fail);
}
});
}
Expand All @@ -368,4 +371,28 @@ private boolean isRequestedType(DnsRecordType dnsRecordType, DnsRecordType[] typ
return false;
}
}

@Override
public void close(Handler<AsyncResult<Void>> handler) {
close().onComplete(handler);
}

@Override
public Future<Void> close() {
PromiseInternal<Void> promise;
synchronized (this) {
if (closed != null) {
return closed;
}
promise = vertx.promise();
closed = promise.future();
}
context.runOnContext(v -> {
new ArrayList<>(inProgressMap.values()).forEach(query -> {
query.fail(ConnectionBase.CLOSED_EXCEPTION);
});
channel.close().addListener(promise);
});
return promise.future();
}
}
46 changes: 36 additions & 10 deletions src/test/java/io/vertx/core/dns/DNSTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,33 +11,28 @@

package io.vertx.core.dns;

import static io.vertx.test.core.TestUtils.assertIllegalStateException;
import static io.vertx.test.core.TestUtils.assertNullPointerException;

import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import io.vertx.test.core.TestUtils;
import io.vertx.test.core.VertxTestBase;
import org.apache.directory.server.dns.messages.DnsMessage;
import org.apache.directory.server.dns.store.RecordStore;
import org.junit.Test;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.Vertx;
import io.vertx.core.VertxException;
import io.vertx.core.VertxOptions;
import io.vertx.core.dns.DnsClient;
import io.vertx.core.dns.DnsClientOptions;
import io.vertx.core.dns.DnsException;
import io.vertx.core.dns.DnsResponseCode;
import io.vertx.core.dns.MxRecord;
import io.vertx.core.dns.SrvRecord;
import io.vertx.core.dns.impl.DnsClientImpl;
import io.vertx.test.fakedns.FakeDNSServer;
import io.vertx.test.netty.TestLoggerFactory;

import java.util.List;

/**
* @author <a href="mailto:nmaurer@redhat.com">Norman Maurer</a>
* @author <a href="http://tfox.org">Tim Fox</a>
Expand Down Expand Up @@ -445,6 +440,37 @@ public void testRecursionNotDesired() throws Exception {
await();
}

@Test
public void testClose() throws Exception {
waitFor(2);
String ip = "10.0.0.1";
RecordStore store = dnsServer.testResolveA(ip).store();
CountDownLatch latch1 = new CountDownLatch(1);
CountDownLatch latch2 = new CountDownLatch(1);
dnsServer.store(question -> {
latch1.countDown();
try {
latch2.await(10, TimeUnit.SECONDS);
} catch (Exception e) {
fail(e);
}
return store.getRecords(question);
});
DnsClient dns = prepareDns();
dns
.resolveA("vertx.io")
.onComplete(onFailure(timeout -> {
assertTrue(timeout.getMessage().contains("closed"));
complete();
}));
awaitLatch(latch1);
dns.close().onComplete(onSuccess(v -> {
complete();
latch2.countDown();
}));
await();
}

private DnsClient prepareDns() throws Exception {
return prepareDns(new DnsClientOptions().setQueryTimeout(15000));
}
Expand Down
4 changes: 4 additions & 0 deletions src/test/java/io/vertx/test/fakedns/FakeDNSServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,10 @@ public static RecordStore A_store(Function<String, String> entries) {
public FakeDNSServer() {
}

public RecordStore store() {
return store;
}

public FakeDNSServer store(RecordStore store) {
this.store = store;
return this;
Expand Down

0 comments on commit 0ce76a8

Please sign in to comment.