diff --git a/client/src/main/java/org/asynchttpclient/AsyncHandler.java b/client/src/main/java/org/asynchttpclient/AsyncHandler.java index 6733c94711..f17fd9275c 100644 --- a/client/src/main/java/org/asynchttpclient/AsyncHandler.java +++ b/client/src/main/java/org/asynchttpclient/AsyncHandler.java @@ -20,7 +20,7 @@ import org.asynchttpclient.netty.request.NettyRequest; import javax.net.ssl.SSLSession; -import java.net.InetSocketAddress; +import java.net.SocketAddress; import java.util.List; @@ -132,7 +132,7 @@ default void onHostnameResolutionAttempt(String name) { * @param name the name to be resolved * @param addresses the resolved addresses */ - default void onHostnameResolutionSuccess(String name, List addresses) { + default void onHostnameResolutionSuccess(String name, List addresses) { } /** @@ -153,7 +153,7 @@ default void onHostnameResolutionFailure(String name, Throwable cause) { * * @param remoteAddress the address we try to connect to */ - default void onTcpConnectAttempt(InetSocketAddress remoteAddress) { + default void onTcpConnectAttempt(SocketAddress remoteAddress) { } /** @@ -162,7 +162,7 @@ default void onTcpConnectAttempt(InetSocketAddress remoteAddress) { * @param remoteAddress the address we try to connect to * @param connection the connection */ - default void onTcpConnectSuccess(InetSocketAddress remoteAddress, Channel connection) { + default void onTcpConnectSuccess(SocketAddress remoteAddress, Channel connection) { } /** @@ -173,7 +173,7 @@ default void onTcpConnectSuccess(InetSocketAddress remoteAddress, Channel connec * @param remoteAddress the address we try to connect to * @param cause the cause of the failure */ - default void onTcpConnectFailure(InetSocketAddress remoteAddress, Throwable cause) { + default void onTcpConnectFailure(SocketAddress remoteAddress, Throwable cause) { } // ////////////// TLS /////////////// diff --git a/client/src/main/java/org/asynchttpclient/AsyncHttpClientConfig.java b/client/src/main/java/org/asynchttpclient/AsyncHttpClientConfig.java index 862aa2ce9f..be7c20bdf8 100644 --- a/client/src/main/java/org/asynchttpclient/AsyncHttpClientConfig.java +++ b/client/src/main/java/org/asynchttpclient/AsyncHttpClientConfig.java @@ -294,6 +294,13 @@ public interface AsyncHttpClientConfig { boolean isUseNativeTransport(); + String getUnixSocket(); + + default boolean isUseUnixDomain(){ + String unixSocket = getUnixSocket(); + return unixSocket !=null && !unixSocket.isEmpty(); + } + Consumer getHttpAdditionalChannelInitializer(); Consumer getWsAdditionalChannelInitializer(); diff --git a/client/src/main/java/org/asynchttpclient/DefaultAsyncHttpClientConfig.java b/client/src/main/java/org/asynchttpclient/DefaultAsyncHttpClientConfig.java index d26612fb6d..8d4552a358 100644 --- a/client/src/main/java/org/asynchttpclient/DefaultAsyncHttpClientConfig.java +++ b/client/src/main/java/org/asynchttpclient/DefaultAsyncHttpClientConfig.java @@ -120,6 +120,7 @@ public class DefaultAsyncHttpClientConfig implements AsyncHttpClientConfig { private final Map, Object> channelOptions; private final EventLoopGroup eventLoopGroup; private final boolean useNativeTransport; + private final String unixSocket; private final ByteBufAllocator allocator; private final boolean tcpNoDelay; private final boolean soReuseAddress; @@ -209,6 +210,7 @@ private DefaultAsyncHttpClientConfig(// http Map, Object> channelOptions, EventLoopGroup eventLoopGroup, boolean useNativeTransport, + String unixSocket, ByteBufAllocator allocator, Timer nettyTimer, ThreadFactory threadFactory, @@ -295,6 +297,7 @@ private DefaultAsyncHttpClientConfig(// http this.channelOptions = channelOptions; this.eventLoopGroup = eventLoopGroup; this.useNativeTransport = useNativeTransport; + this.unixSocket = unixSocket; this.allocator = allocator; this.nettyTimer = nettyTimer; this.threadFactory = threadFactory; @@ -621,6 +624,11 @@ public boolean isUseNativeTransport() { return useNativeTransport; } + @Override + public String getUnixSocket() { + return unixSocket; + } + @Override public ByteBufAllocator getAllocator() { return allocator; @@ -738,6 +746,7 @@ public static class Builder { private int httpClientCodecInitialBufferSize = defaultHttpClientCodecInitialBufferSize(); private int chunkedFileChunkSize = defaultChunkedFileChunkSize(); private boolean useNativeTransport = defaultUseNativeTransport(); + private String unixSocket = defaultUnixSocket(); private ByteBufAllocator allocator; private Map, Object> channelOptions = new HashMap<>(); private EventLoopGroup eventLoopGroup; @@ -821,6 +830,7 @@ public Builder(AsyncHttpClientConfig config) { channelOptions.putAll(config.getChannelOptions()); eventLoopGroup = config.getEventLoopGroup(); useNativeTransport = config.isUseNativeTransport(); + unixSocket = config.getUnixSocket(); allocator = config.getAllocator(); nettyTimer = config.getNettyTimer(); threadFactory = config.getThreadFactory(); @@ -1189,6 +1199,12 @@ public Builder setUseNativeTransport(boolean useNativeTransport) { return this; } + public Builder setUnixSocket(String unixSocket) { + setUseNativeTransport(true); + this.unixSocket = unixSocket; + return this; + } + public Builder setAllocator(ByteBufAllocator allocator) { this.allocator = allocator; return this; @@ -1301,6 +1317,7 @@ public DefaultAsyncHttpClientConfig build() { channelOptions.isEmpty() ? Collections.emptyMap() : Collections.unmodifiableMap(channelOptions), eventLoopGroup, useNativeTransport, + unixSocket, allocator, nettyTimer, threadFactory, diff --git a/client/src/main/java/org/asynchttpclient/DefaultRequest.java b/client/src/main/java/org/asynchttpclient/DefaultRequest.java index 4cabb41792..142c701b7d 100644 --- a/client/src/main/java/org/asynchttpclient/DefaultRequest.java +++ b/client/src/main/java/org/asynchttpclient/DefaultRequest.java @@ -13,6 +13,7 @@ */ package org.asynchttpclient; +import io.netty.channel.unix.DomainSocketAddress; import io.netty.handler.codec.http.HttpHeaders; import io.netty.handler.codec.http.cookie.Cookie; import io.netty.resolver.NameResolver; @@ -25,6 +26,8 @@ import java.io.File; import java.io.InputStream; import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketAddress; import java.nio.ByteBuffer; import java.nio.charset.Charset; import java.util.ArrayList; @@ -39,8 +42,8 @@ public class DefaultRequest implements Request { public final ProxyServer proxyServer; private final String method; private final Uri uri; - private final InetAddress address; - private final InetAddress localAddress; + private final SocketAddress address; + private final SocketAddress localAddress; private final HttpHeaders headers; private final List cookies; private final byte[] byteData; @@ -61,13 +64,14 @@ public class DefaultRequest implements Request { private final Charset charset; private final ChannelPoolPartitioning channelPoolPartitioning; private final NameResolver nameResolver; + private final NameResolver domainNameResolver; // lazily loaded private List queryParams; public DefaultRequest(String method, Uri uri, - InetAddress address, - InetAddress localAddress, + SocketAddress address, + SocketAddress localAddress, HttpHeaders headers, List cookies, byte[] byteData, @@ -88,7 +92,8 @@ public DefaultRequest(String method, long rangeOffset, Charset charset, ChannelPoolPartitioning channelPoolPartitioning, - NameResolver nameResolver) { + NameResolver nameResolver, + NameResolver domainNameResolver) { this.method = method; this.uri = uri; this.address = address; @@ -114,6 +119,7 @@ public DefaultRequest(String method, this.charset = charset; this.channelPoolPartitioning = channelPoolPartitioning; this.nameResolver = nameResolver; + this.domainNameResolver = domainNameResolver; } @Override @@ -133,11 +139,27 @@ public Uri getUri() { @Override public InetAddress getAddress() { - return address; + if (!(address instanceof InetSocketAddress)) { + throw new IllegalArgumentException("address can't cast to InetAddress, please use the method of getSocketAddress"); + } + return ((InetSocketAddress) address).getAddress(); } @Override public InetAddress getLocalAddress() { + if (!(localAddress instanceof InetSocketAddress)) { + throw new IllegalArgumentException("localAddress can't cast to InetAddress, please use the method of getLocalSocketAddress"); + } + return ((InetSocketAddress) localAddress).getAddress(); + } + + @Override + public SocketAddress getSocketAddress() { + return address; + } + + @Override + public SocketAddress getLocalSocketAddress() { return localAddress; } @@ -246,6 +268,11 @@ public NameResolver getNameResolver() { return nameResolver; } + @Override + public NameResolver getDomainNameResolver() { + return domainNameResolver; + } + @Override public List getQueryParams() { if (queryParams == null) diff --git a/client/src/main/java/org/asynchttpclient/Request.java b/client/src/main/java/org/asynchttpclient/Request.java index cf6a82dee2..97de025003 100644 --- a/client/src/main/java/org/asynchttpclient/Request.java +++ b/client/src/main/java/org/asynchttpclient/Request.java @@ -16,6 +16,7 @@ */ package org.asynchttpclient; +import io.netty.channel.unix.DomainSocketAddress; import io.netty.handler.codec.http.HttpHeaders; import io.netty.handler.codec.http.cookie.Cookie; import io.netty.resolver.NameResolver; @@ -28,6 +29,7 @@ import java.io.File; import java.io.InputStream; import java.net.InetAddress; +import java.net.SocketAddress; import java.nio.ByteBuffer; import java.nio.charset.Charset; import java.util.List; @@ -62,7 +64,7 @@ public interface Request { String getUrl(); /** - * @return the InetAddress to be used to bypass uri's hostname resolution + * @return the InetAddress to be used to bypass uri's hostname or unix domain path resolution */ InetAddress getAddress(); @@ -71,6 +73,16 @@ public interface Request { */ InetAddress getLocalAddress(); + /** + * @return the local address to bind from + */ + SocketAddress getLocalSocketAddress(); + + /** + * @return the SocketAddress to be used to bypass uri's hostname or unix domain path resolution + */ + SocketAddress getSocketAddress(); + /** * @return the HTTP headers */ @@ -181,6 +193,11 @@ public interface Request { */ NameResolver getNameResolver(); + /** + * @return the NameResolver to be used to resolve hostnams's IP + */ + NameResolver getDomainNameResolver(); + /** * @return a new request builder using this request as a prototype */ diff --git a/client/src/main/java/org/asynchttpclient/RequestBuilderBase.java b/client/src/main/java/org/asynchttpclient/RequestBuilderBase.java index 35c8145776..13d3fc474d 100644 --- a/client/src/main/java/org/asynchttpclient/RequestBuilderBase.java +++ b/client/src/main/java/org/asynchttpclient/RequestBuilderBase.java @@ -16,6 +16,7 @@ package org.asynchttpclient; import io.netty.buffer.ByteBuf; +import io.netty.channel.unix.DomainSocketAddress; import io.netty.handler.codec.http.DefaultHttpHeaders; import io.netty.handler.codec.http.HttpHeaders; import io.netty.handler.codec.http.cookie.Cookie; @@ -27,6 +28,7 @@ import org.asynchttpclient.request.body.generator.BodyGenerator; import org.asynchttpclient.request.body.generator.ReactiveStreamsBodyGenerator; import org.asynchttpclient.request.body.multipart.Part; +import org.asynchttpclient.resolver.DefaultDomainNameResolver; import org.asynchttpclient.uri.Uri; import org.asynchttpclient.util.UriEncoder; import org.reactivestreams.Publisher; @@ -36,6 +38,8 @@ import java.io.File; import java.io.InputStream; import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketAddress; import java.nio.ByteBuffer; import java.nio.charset.Charset; import java.util.*; @@ -56,6 +60,7 @@ public abstract class RequestBuilderBase> { private final static Logger LOGGER = LoggerFactory.getLogger(RequestBuilderBase.class); private static final Uri DEFAULT_REQUEST_URL = Uri.create("http://localhost"); public static NameResolver DEFAULT_NAME_RESOLVER = new DefaultNameResolver(ImmediateEventExecutor.INSTANCE); + public static NameResolver DEFAULT_DOMAIN_NAME_RESOLVER = new DefaultDomainNameResolver(ImmediateEventExecutor.INSTANCE); // builder only fields protected UriEncoder uriEncoder; protected List queryParams; @@ -64,8 +69,8 @@ public abstract class RequestBuilderBase> { // request fields protected String method; protected Uri uri; - protected InetAddress address; - protected InetAddress localAddress; + protected SocketAddress address; + protected SocketAddress localAddress; protected HttpHeaders headers; protected ArrayList cookies; protected byte[] byteData; @@ -87,6 +92,7 @@ public abstract class RequestBuilderBase> { protected Charset charset; protected ChannelPoolPartitioning channelPoolPartitioning = ChannelPoolPartitioning.PerHostChannelPoolPartitioning.INSTANCE; protected NameResolver nameResolver = DEFAULT_NAME_RESOLVER; + protected NameResolver domainNameResolver = DEFAULT_DOMAIN_NAME_RESOLVER; protected RequestBuilderBase(String method, boolean disableUrlEncoding) { this(method, disableUrlEncoding, true); @@ -106,8 +112,8 @@ protected RequestBuilderBase(Request prototype, boolean disableUrlEncoding, bool this.method = prototype.getMethod(); this.uriEncoder = UriEncoder.uriEncoder(disableUrlEncoding); this.uri = prototype.getUri(); - this.address = prototype.getAddress(); - this.localAddress = prototype.getLocalAddress(); + this.address = prototype.getSocketAddress(); + this.localAddress = prototype.getLocalSocketAddress(); this.headers = new DefaultHttpHeaders(validateHeaders); this.headers.add(prototype.getHeaders()); if (isNonEmpty(prototype.getCookies())) { @@ -136,6 +142,7 @@ protected RequestBuilderBase(Request prototype, boolean disableUrlEncoding, bool this.charset = prototype.getCharset(); this.channelPoolPartitioning = prototype.getChannelPoolPartitioning(); this.nameResolver = prototype.getNameResolver(); + this.domainNameResolver = prototype.getDomainNameResolver(); } @SuppressWarnings("unchecked") @@ -153,11 +160,21 @@ public T setUri(Uri uri) { } public T setAddress(InetAddress address) { + this.address = new InetSocketAddress(address,0); + return asDerivedType(); + } + + public T setAddress(SocketAddress address) { this.address = address; return asDerivedType(); } public T setLocalAddress(InetAddress address) { + this.localAddress = new InetSocketAddress(address,0); + return asDerivedType(); + } + + public T setLocalAddress(SocketAddress address) { this.localAddress = address; return asDerivedType(); } @@ -534,6 +551,11 @@ public T setNameResolver(NameResolver nameResolver) { return asDerivedType(); } + public T setDomainNameResolver(NameResolver nameResolver) { + this.domainNameResolver = nameResolver; + return asDerivedType(); + } + public T setSignatureCalculator(SignatureCalculator signatureCalculator) { this.signatureCalculator = signatureCalculator; return asDerivedType(); @@ -580,6 +602,7 @@ private RequestBuilderBase executeSignatureCalculator() { rb.charset = this.charset; rb.channelPoolPartitioning = this.channelPoolPartitioning; rb.nameResolver = this.nameResolver; + rb.domainNameResolver = this.domainNameResolver; Request unsignedRequest = rb.build(); signatureCalculator.calculateAndAddSignature(unsignedRequest, rb); return rb; @@ -642,6 +665,7 @@ public Request build() { rb.rangeOffset, rb.charset, rb.channelPoolPartitioning, - rb.nameResolver); + rb.nameResolver, + rb.domainNameResolver); } } diff --git a/client/src/main/java/org/asynchttpclient/channel/DefaultKeepAliveStrategy.java b/client/src/main/java/org/asynchttpclient/channel/DefaultKeepAliveStrategy.java index f1c6a5f42f..e90f0ce76f 100644 --- a/client/src/main/java/org/asynchttpclient/channel/DefaultKeepAliveStrategy.java +++ b/client/src/main/java/org/asynchttpclient/channel/DefaultKeepAliveStrategy.java @@ -5,7 +5,7 @@ import io.netty.handler.codec.http.HttpUtil; import org.asynchttpclient.Request; -import java.net.InetSocketAddress; +import java.net.SocketAddress; import static io.netty.handler.codec.http.HttpHeaderValues.CLOSE; @@ -18,7 +18,7 @@ public class DefaultKeepAliveStrategy implements KeepAliveStrategy { * Implemented in accordance with RFC 7230 section 6.1 https://tools.ietf.org/html/rfc7230#section-6.1 */ @Override - public boolean keepAlive(InetSocketAddress remoteAddress, Request ahcRequest, HttpRequest request, HttpResponse response) { + public boolean keepAlive(SocketAddress remoteAddress, Request ahcRequest, HttpRequest request, HttpResponse response) { return HttpUtil.isKeepAlive(response) && HttpUtil.isKeepAlive(request) // support non standard Proxy-Connection diff --git a/client/src/main/java/org/asynchttpclient/channel/KeepAliveStrategy.java b/client/src/main/java/org/asynchttpclient/channel/KeepAliveStrategy.java index c748fe76ac..002a8aaef7 100644 --- a/client/src/main/java/org/asynchttpclient/channel/KeepAliveStrategy.java +++ b/client/src/main/java/org/asynchttpclient/channel/KeepAliveStrategy.java @@ -17,18 +17,18 @@ import io.netty.handler.codec.http.HttpResponse; import org.asynchttpclient.Request; -import java.net.InetSocketAddress; +import java.net.SocketAddress; public interface KeepAliveStrategy { /** * Determines whether the connection should be kept alive after this HTTP message exchange. * - * @param remoteAddress the remote InetSocketAddress associated with the request + * @param remoteAddress the remote SocketAddress associated with the request * @param ahcRequest the Request, as built by AHC * @param nettyRequest the HTTP request sent to Netty * @param nettyResponse the HTTP response received from Netty * @return true if the connection should be kept alive, false if it should be closed. */ - boolean keepAlive(InetSocketAddress remoteAddress, Request ahcRequest, HttpRequest nettyRequest, HttpResponse nettyResponse); + boolean keepAlive(SocketAddress remoteAddress, Request ahcRequest, HttpRequest nettyRequest, HttpResponse nettyResponse); } diff --git a/client/src/main/java/org/asynchttpclient/config/AsyncHttpClientConfigDefaults.java b/client/src/main/java/org/asynchttpclient/config/AsyncHttpClientConfigDefaults.java index fa073bc82f..8b1abd4f7f 100644 --- a/client/src/main/java/org/asynchttpclient/config/AsyncHttpClientConfigDefaults.java +++ b/client/src/main/java/org/asynchttpclient/config/AsyncHttpClientConfigDefaults.java @@ -69,6 +69,7 @@ public final class AsyncHttpClientConfigDefaults { public static final String SHUTDOWN_QUIET_PERIOD_CONFIG = "shutdownQuietPeriod"; public static final String SHUTDOWN_TIMEOUT_CONFIG = "shutdownTimeout"; public static final String USE_NATIVE_TRANSPORT_CONFIG = "useNativeTransport"; + public static final String UNIX_SOCKET = "unixSocket"; public static final String IO_THREADS_COUNT_CONFIG = "ioThreadsCount"; public static final String AHC_VERSION; @@ -286,6 +287,10 @@ public static boolean defaultUseNativeTransport() { return AsyncHttpClientConfigHelper.getAsyncHttpClientConfig().getBoolean(ASYNC_CLIENT_CONFIG_ROOT + USE_NATIVE_TRANSPORT_CONFIG); } + public static String defaultUnixSocket() { + return AsyncHttpClientConfigHelper.getAsyncHttpClientConfig().getString(ASYNC_CLIENT_CONFIG_ROOT + UNIX_SOCKET); + } + public static int defaultIoThreadsCount() { return AsyncHttpClientConfigHelper.getAsyncHttpClientConfig().getInt(ASYNC_CLIENT_CONFIG_ROOT + IO_THREADS_COUNT_CONFIG); } diff --git a/client/src/main/java/org/asynchttpclient/netty/channel/ChannelManager.java b/client/src/main/java/org/asynchttpclient/netty/channel/ChannelManager.java index 046b1d9e4f..f249112448 100755 --- a/client/src/main/java/org/asynchttpclient/netty/channel/ChannelManager.java +++ b/client/src/main/java/org/asynchttpclient/netty/channel/ChannelManager.java @@ -125,7 +125,7 @@ public ChannelManager(final AsyncHttpClientConfig config, Timer nettyTimer) { TransportFactory transportFactory; if (allowReleaseEventLoopGroup) { if (config.isUseNativeTransport()) { - transportFactory = getNativeTransportFactory(); + transportFactory = config.isUseUnixDomain() ? getDomainTransportFactory() : getNativeTransportFactory(); } else { transportFactory = NioTransportFactory.INSTANCE; } @@ -135,11 +135,14 @@ public ChannelManager(final AsyncHttpClientConfig config, Timer nettyTimer) { eventLoopGroup = config.getEventLoopGroup(); if (eventLoopGroup instanceof NioEventLoopGroup) { + if (config.isUseUnixDomain()){ + throw new IllegalArgumentException("Unix domain socket not support NioEventLoopGroup now!"); + } transportFactory = NioTransportFactory.INSTANCE; } else if (eventLoopGroup instanceof EpollEventLoopGroup) { - transportFactory = new EpollTransportFactory(); + transportFactory = config.isUseUnixDomain() ? new EpollDomainTransportFactory() : new EpollTransportFactory(); } else if (eventLoopGroup instanceof KQueueEventLoopGroup) { - transportFactory = new KQueueTransportFactory(); + transportFactory = config.isUseUnixDomain()? new KQueueDomainTransportFactory():new KQueueTransportFactory(); } else { throw new IllegalArgumentException("Unknown event loop group " + eventLoopGroup.getClass().getSimpleName()); } @@ -195,14 +198,27 @@ private Bootstrap newBootstrap(ChannelFactory channelFactory, } else if (!PlatformDependent.isWindows()) { nativeTransportFactoryClassName = "org.asynchttpclient.netty.channel.EpollTransportFactory"; } + return loadNativeTransportFactory(nativeTransportFactoryClassName); + } + private TransportFactory loadNativeTransportFactory(String nativeTransportFactoryClassName) { try { if (nativeTransportFactoryClassName != null) { return (TransportFactory) Class.forName(nativeTransportFactoryClassName).newInstance(); } } catch (Exception e) { + e.printStackTrace(); + } + throw new IllegalArgumentException("No suitable native transport available"); + } + private TransportFactory getDomainTransportFactory() { + String nativeTransportFactoryClassName = null; + if (PlatformDependent.isOsx()) { + nativeTransportFactoryClassName = "org.asynchttpclient.netty.channel.KQueueDomainTransportFactory"; + } else if (!PlatformDependent.isWindows()) { + nativeTransportFactoryClassName = "org.asynchttpclient.netty.channel.EpollDomainTransportFactory"; } - throw new IllegalArgumentException("No suitable native transport (epoll or kqueue) available"); + return loadNativeTransportFactory(nativeTransportFactoryClassName); } public void configureBootstraps(NettyRequestSender requestSender) { diff --git a/client/src/main/java/org/asynchttpclient/netty/channel/EpollDomainTransportFactory.java b/client/src/main/java/org/asynchttpclient/netty/channel/EpollDomainTransportFactory.java new file mode 100644 index 0000000000..6b381e38cc --- /dev/null +++ b/client/src/main/java/org/asynchttpclient/netty/channel/EpollDomainTransportFactory.java @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2016 AsyncHttpClient Project. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at + * http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. + */ +package org.asynchttpclient.netty.channel; + +import io.netty.channel.epoll.EpollDomainSocketChannel; +import io.netty.channel.epoll.EpollEventLoopGroup; +import org.asynchttpclient.util.ReflectionUtil; + +import java.util.concurrent.ThreadFactory; + +class EpollDomainTransportFactory implements TransportFactory { + + EpollDomainTransportFactory() { + ReflectionUtil.loadEpollClass(); + } + + @Override + public EpollDomainSocketChannel newChannel() { + return new EpollDomainSocketChannel(); + } + + @Override + public EpollEventLoopGroup newEventLoopGroup(int ioThreadsCount, ThreadFactory threadFactory) { + return new EpollEventLoopGroup(ioThreadsCount, threadFactory); + } +} diff --git a/client/src/main/java/org/asynchttpclient/netty/channel/EpollTransportFactory.java b/client/src/main/java/org/asynchttpclient/netty/channel/EpollTransportFactory.java index 8f84272916..29570fd3fb 100644 --- a/client/src/main/java/org/asynchttpclient/netty/channel/EpollTransportFactory.java +++ b/client/src/main/java/org/asynchttpclient/netty/channel/EpollTransportFactory.java @@ -13,23 +13,16 @@ */ package org.asynchttpclient.netty.channel; -import io.netty.channel.epoll.Epoll; import io.netty.channel.epoll.EpollEventLoopGroup; import io.netty.channel.epoll.EpollSocketChannel; +import org.asynchttpclient.util.ReflectionUtil; import java.util.concurrent.ThreadFactory; class EpollTransportFactory implements TransportFactory { EpollTransportFactory() { - try { - Class.forName("io.netty.channel.epoll.Epoll"); - } catch (ClassNotFoundException e) { - throw new IllegalStateException("The epoll transport is not available"); - } - if (!Epoll.isAvailable()) { - throw new IllegalStateException("The epoll transport is not supported"); - } + ReflectionUtil.loadEpollClass(); } @Override diff --git a/client/src/main/java/org/asynchttpclient/netty/channel/KQueueDomainTransportFactory.java b/client/src/main/java/org/asynchttpclient/netty/channel/KQueueDomainTransportFactory.java new file mode 100644 index 0000000000..eb23fd3ab5 --- /dev/null +++ b/client/src/main/java/org/asynchttpclient/netty/channel/KQueueDomainTransportFactory.java @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2019 AsyncHttpClient Project. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at + * http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. + */ +package org.asynchttpclient.netty.channel; + +import io.netty.channel.kqueue.KQueueDomainSocketChannel; +import io.netty.channel.kqueue.KQueueEventLoopGroup; +import org.asynchttpclient.util.ReflectionUtil; + +import java.util.concurrent.ThreadFactory; + +class KQueueDomainTransportFactory implements TransportFactory { + + KQueueDomainTransportFactory() { + ReflectionUtil.loadKQueueClass(); + } + + @Override + public KQueueDomainSocketChannel newChannel() { + return new KQueueDomainSocketChannel(); + } + + @Override + public KQueueEventLoopGroup newEventLoopGroup(int ioThreadsCount, ThreadFactory threadFactory) { + return new KQueueEventLoopGroup(ioThreadsCount, threadFactory); + } +} diff --git a/client/src/main/java/org/asynchttpclient/netty/channel/KQueueTransportFactory.java b/client/src/main/java/org/asynchttpclient/netty/channel/KQueueTransportFactory.java index f54fe46157..11ab74884f 100644 --- a/client/src/main/java/org/asynchttpclient/netty/channel/KQueueTransportFactory.java +++ b/client/src/main/java/org/asynchttpclient/netty/channel/KQueueTransportFactory.java @@ -13,23 +13,16 @@ */ package org.asynchttpclient.netty.channel; -import io.netty.channel.kqueue.KQueue; import io.netty.channel.kqueue.KQueueEventLoopGroup; import io.netty.channel.kqueue.KQueueSocketChannel; +import org.asynchttpclient.util.ReflectionUtil; import java.util.concurrent.ThreadFactory; class KQueueTransportFactory implements TransportFactory { KQueueTransportFactory() { - try { - Class.forName("io.netty.channel.kqueue.KQueue"); - } catch (ClassNotFoundException e) { - throw new IllegalStateException("The kqueue transport is not available"); - } - if (!KQueue.isAvailable()) { - throw new IllegalStateException("The kqueue transport is not supported"); - } + ReflectionUtil.loadKQueueClass(); } @Override diff --git a/client/src/main/java/org/asynchttpclient/netty/channel/NettyChannelConnector.java b/client/src/main/java/org/asynchttpclient/netty/channel/NettyChannelConnector.java index 8951bd062e..92a5b31012 100644 --- a/client/src/main/java/org/asynchttpclient/netty/channel/NettyChannelConnector.java +++ b/client/src/main/java/org/asynchttpclient/netty/channel/NettyChannelConnector.java @@ -20,8 +20,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.net.InetAddress; -import java.net.InetSocketAddress; +import java.net.SocketAddress; import java.util.List; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; @@ -34,16 +33,16 @@ public class NettyChannelConnector { .newUpdater(NettyChannelConnector.class, "i"); private final AsyncHandler asyncHandler; - private final InetSocketAddress localAddress; - private final List remoteAddresses; + private final SocketAddress localAddress; + private final List remoteAddresses; private final AsyncHttpClientState clientState; private volatile int i = 0; - public NettyChannelConnector(InetAddress localAddress, - List remoteAddresses, + public NettyChannelConnector(SocketAddress localAddress, + List remoteAddresses, AsyncHandler asyncHandler, AsyncHttpClientState clientState) { - this.localAddress = localAddress != null ? new InetSocketAddress(localAddress, 0) : null; + this.localAddress = localAddress ; this.remoteAddresses = remoteAddresses; this.asyncHandler = asyncHandler; this.clientState = clientState; @@ -55,7 +54,7 @@ private boolean pickNextRemoteAddress() { } public void connect(final Bootstrap bootstrap, final NettyConnectListener connectListener) { - final InetSocketAddress remoteAddress = remoteAddresses.get(i); + final SocketAddress remoteAddress = remoteAddresses.get(i); try { asyncHandler.onTcpConnectAttempt(remoteAddress); @@ -76,7 +75,7 @@ public void connect(final Bootstrap bootstrap, final NettyConnectListener con } } - private void connect0(Bootstrap bootstrap, final NettyConnectListener connectListener, InetSocketAddress remoteAddress) { + private void connect0(Bootstrap bootstrap, final NettyConnectListener connectListener, SocketAddress remoteAddress) { bootstrap.connect(remoteAddress, localAddress) .addListener(new SimpleChannelFutureListener() { diff --git a/client/src/main/java/org/asynchttpclient/netty/channel/NettyConnectListener.java b/client/src/main/java/org/asynchttpclient/netty/channel/NettyConnectListener.java index 4a6f4dce20..997b1c1b9b 100755 --- a/client/src/main/java/org/asynchttpclient/netty/channel/NettyConnectListener.java +++ b/client/src/main/java/org/asynchttpclient/netty/channel/NettyConnectListener.java @@ -29,7 +29,7 @@ import org.slf4j.LoggerFactory; import java.net.ConnectException; -import java.net.InetSocketAddress; +import java.net.SocketAddress; /** * Non Blocking connect. @@ -80,7 +80,7 @@ private void writeRequest(Channel channel) { requestSender.writeRequest(future, channel); } - public void onSuccess(Channel channel, InetSocketAddress remoteAddress) { + public void onSuccess(Channel channel, SocketAddress remoteAddress) { if (connectionSemaphore != null) { // transfer lock from future to channel diff --git a/client/src/main/java/org/asynchttpclient/netty/handler/HttpHandler.java b/client/src/main/java/org/asynchttpclient/netty/handler/HttpHandler.java index a52f75fc83..db9fb7cc24 100755 --- a/client/src/main/java/org/asynchttpclient/netty/handler/HttpHandler.java +++ b/client/src/main/java/org/asynchttpclient/netty/handler/HttpHandler.java @@ -30,7 +30,6 @@ import org.asynchttpclient.netty.request.NettyRequestSender; import java.io.IOException; -import java.net.InetSocketAddress; @Sharable public final class HttpHandler extends AsyncHttpClientHandler { @@ -70,7 +69,7 @@ private void handleHttpResponse(final HttpResponse response, final Channel chann HttpRequest httpRequest = future.getNettyRequest().getHttpRequest(); logger.debug("\n\nRequest {}\n\nResponse {}\n", httpRequest, response); - future.setKeepAlive(config.getKeepAliveStrategy().keepAlive((InetSocketAddress) channel.remoteAddress(), future.getTargetRequest(), httpRequest, response)); + future.setKeepAlive(config.getKeepAliveStrategy().keepAlive(channel.remoteAddress(), future.getTargetRequest(), httpRequest, response)); NettyResponseStatus status = new NettyResponseStatus(future.getUri(), response, channel); HttpHeaders responseHeaders = response.headers(); diff --git a/client/src/main/java/org/asynchttpclient/netty/handler/intercept/Redirect30xInterceptor.java b/client/src/main/java/org/asynchttpclient/netty/handler/intercept/Redirect30xInterceptor.java index d56b90fd24..b0e6002eeb 100644 --- a/client/src/main/java/org/asynchttpclient/netty/handler/intercept/Redirect30xInterceptor.java +++ b/client/src/main/java/org/asynchttpclient/netty/handler/intercept/Redirect30xInterceptor.java @@ -93,8 +93,9 @@ public boolean exitAfterHandlingRedirect(Channel channel, final RequestBuilder requestBuilder = new RequestBuilder(switchToGet ? GET : originalMethod) .setChannelPoolPartitioning(request.getChannelPoolPartitioning()) .setFollowRedirect(true) - .setLocalAddress(request.getLocalAddress()) + .setLocalAddress(request.getLocalSocketAddress()) .setNameResolver(request.getNameResolver()) + .setDomainNameResolver(request.getDomainNameResolver()) .setProxyServer(request.getProxyServer()) .setRealm(request.getRealm()) .setRequestTimeout(request.getRequestTimeout()); diff --git a/client/src/main/java/org/asynchttpclient/netty/request/NettyRequestSender.java b/client/src/main/java/org/asynchttpclient/netty/request/NettyRequestSender.java index 32720acc10..2cceb93b8c 100755 --- a/client/src/main/java/org/asynchttpclient/netty/request/NettyRequestSender.java +++ b/client/src/main/java/org/asynchttpclient/netty/request/NettyRequestSender.java @@ -18,6 +18,7 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelProgressivePromise; import io.netty.channel.ChannelPromise; +import io.netty.channel.unix.DomainSocketAddress; import io.netty.handler.codec.http.*; import io.netty.util.Timer; import io.netty.util.concurrent.Future; @@ -46,6 +47,7 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.net.SocketAddress; +import java.util.Arrays; import java.util.List; import static io.netty.handler.codec.http.HttpHeaderNames.EXPECT; @@ -239,7 +241,7 @@ private ListenableFuture sendRequestWithOpenChannel(NettyResponseFuture ListenableFuture sendRequestWithNewChannel(Request request, // exit and don't try to resolve address return future; } + if (config.isUseUnixDomain()){ + resolveDomainAddresses(request, proxy, future, asyncHandler) + .addListener(new SimpleFutureListener>() { + + @Override + protected void onSuccess(List addresses) { + NettyConnectListener connectListener = new NettyConnectListener<>(future, + NettyRequestSender.this, channelManager, connectionSemaphore); + NettyChannelConnector connector = new NettyChannelConnector(request.getLocalSocketAddress(), + addresses, asyncHandler, clientState); + if (!future.isDone()) { + // Do not throw an exception when we need an extra connection for a redirect + // FIXME why? This violate the max connection per host handling, right? + channelManager.getBootstrap(request.getUri(), null, null) + .addListener((Future whenBootstrap) -> { + if (whenBootstrap.isSuccess()) { + connector.connect(whenBootstrap.get(), connectListener); + } else { + abort(null, future, whenBootstrap.cause()); + } + }); + } + } - resolveAddresses(request, proxy, future, asyncHandler) - .addListener(new SimpleFutureListener>() { - - @Override - protected void onSuccess(List addresses) { - NettyConnectListener connectListener = new NettyConnectListener<>(future, - NettyRequestSender.this, channelManager, connectionSemaphore); - NettyChannelConnector connector = new NettyChannelConnector(request.getLocalAddress(), - addresses, asyncHandler, clientState); - if (!future.isDone()) { - // Do not throw an exception when we need an extra connection for a redirect - // FIXME why? This violate the max connection per host handling, right? - channelManager.getBootstrap(request.getUri(), request.getNameResolver(), proxy) - .addListener((Future whenBootstrap) -> { - if (whenBootstrap.isSuccess()) { - connector.connect(whenBootstrap.get(), connectListener); - } else { - abort(null, future, whenBootstrap.cause()); - } - }); + @Override + protected void onFailure(Throwable cause) { + abort(null, future, getCause(cause)); + } + }); + }else { + resolveAddresses(request, proxy, future, asyncHandler) + .addListener(new SimpleFutureListener>() { + + @Override + protected void onSuccess(List addresses) { + NettyConnectListener connectListener = new NettyConnectListener<>(future, + NettyRequestSender.this, channelManager, connectionSemaphore); + NettyChannelConnector connector = new NettyChannelConnector(request.getLocalSocketAddress(), + addresses, asyncHandler, clientState); + if (!future.isDone()) { + // Do not throw an exception when we need an extra connection for a redirect + // FIXME why? This violate the max connection per host handling, right? + channelManager.getBootstrap(request.getUri(), request.getNameResolver(), proxy) + .addListener((Future whenBootstrap) -> { + if (whenBootstrap.isSuccess()) { + connector.connect(whenBootstrap.get(), connectListener); + } else { + abort(null, future, whenBootstrap.cause()); + } + }); + } } - } - @Override - protected void onFailure(Throwable cause) { - abort(null, future, getCause(cause)); - } - }); + @Override + protected void onFailure(Throwable cause) { + abort(null, future, getCause(cause)); + } + }); + } return future; } @@ -349,15 +381,40 @@ private Future> resolveAddresses(Request request, InetSocketAddress unresolvedRemoteAddress = InetSocketAddress.createUnresolved(uri.getHost(), port); scheduleRequestTimeout(future, unresolvedRemoteAddress); - if (request.getAddress() != null) { + if (request.getSocketAddress() != null) { // bypass resolution - InetSocketAddress inetSocketAddress = new InetSocketAddress(request.getAddress(), port); - return promise.setSuccess(singletonList(inetSocketAddress)); + InetSocketAddress address = (InetSocketAddress) request.getSocketAddress(); + if (address.getPort() != port){ + address = new InetSocketAddress(address.getAddress(), port); + } + return promise.setSuccess(singletonList(address)); } else { return RequestHostnameResolver.INSTANCE.resolve(request.getNameResolver(), unresolvedRemoteAddress, asyncHandler); } } } + private Future> resolveDomainAddresses(Request request, + ProxyServer proxy, + NettyResponseFuture future, + AsyncHandler asyncHandler) { + + if (proxy != null ) { + throw new IllegalArgumentException("Unix domain socket not support proxy"); + } else { + DomainSocketAddress socketAddress = new DomainSocketAddress(config.getUnixSocket()); + scheduleRequestTimeout(future, socketAddress); + SocketAddress address = request.getSocketAddress(); + if (address != null) { + final Promise> promise = ImmediateEventExecutor.INSTANCE.newPromise(); + if (!(address instanceof DomainSocketAddress)){ + throw new IllegalArgumentException("address must be instance of DomainSocketAddress"); + } + return promise.setSuccess(singletonList((DomainSocketAddress) address)); + } else { + return RequestHostnameResolver.INSTANCE.resolve(request.getDomainNameResolver(), socketAddress, asyncHandler); + } + } + } private NettyResponseFuture newNettyResponseFuture(Request request, AsyncHandler asyncHandler, @@ -442,7 +499,7 @@ private void configureTransferAdapter(AsyncHandler handler, HttpRequest httpR } private void scheduleRequestTimeout(NettyResponseFuture nettyResponseFuture, - InetSocketAddress originalRemoteAddress) { + SocketAddress originalRemoteAddress) { nettyResponseFuture.touch(); TimeoutsHolder timeoutsHolder = new TimeoutsHolder(nettyTimer, nettyResponseFuture, this, config, originalRemoteAddress); diff --git a/client/src/main/java/org/asynchttpclient/netty/timeout/TimeoutTimerTask.java b/client/src/main/java/org/asynchttpclient/netty/timeout/TimeoutTimerTask.java index 034502785c..3e3d1da7f2 100755 --- a/client/src/main/java/org/asynchttpclient/netty/timeout/TimeoutTimerTask.java +++ b/client/src/main/java/org/asynchttpclient/netty/timeout/TimeoutTimerTask.java @@ -13,6 +13,7 @@ */ package org.asynchttpclient.netty.timeout; +import io.netty.channel.unix.DomainSocketAddress; import io.netty.util.TimerTask; import org.asynchttpclient.netty.NettyResponseFuture; import org.asynchttpclient.netty.request.NettyRequestSender; @@ -20,6 +21,7 @@ import org.slf4j.LoggerFactory; import java.net.InetSocketAddress; +import java.net.SocketAddress; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; @@ -54,11 +56,16 @@ public void clean() { } void appendRemoteAddress(StringBuilder sb) { - InetSocketAddress remoteAddress = timeoutsHolder.remoteAddress(); - sb.append(remoteAddress.getHostString()); - if (!remoteAddress.isUnresolved()) { - sb.append('/').append(remoteAddress.getAddress().getHostAddress()); + SocketAddress socketAddress = timeoutsHolder.remoteAddress(); + if (socketAddress instanceof InetSocketAddress){ + InetSocketAddress remoteAddress = (InetSocketAddress) socketAddress; + sb.append(remoteAddress.getHostString()); + if (!remoteAddress.isUnresolved()) { + sb.append('/').append(remoteAddress.getAddress().getHostAddress()); + } + sb.append(':').append(remoteAddress.getPort()); + }else if (socketAddress instanceof DomainSocketAddress){ + sb.append(((DomainSocketAddress) socketAddress).path()); } - sb.append(':').append(remoteAddress.getPort()); } } diff --git a/client/src/main/java/org/asynchttpclient/netty/timeout/TimeoutsHolder.java b/client/src/main/java/org/asynchttpclient/netty/timeout/TimeoutsHolder.java index 89d3faf586..06c2607450 100755 --- a/client/src/main/java/org/asynchttpclient/netty/timeout/TimeoutsHolder.java +++ b/client/src/main/java/org/asynchttpclient/netty/timeout/TimeoutsHolder.java @@ -21,7 +21,7 @@ import org.asynchttpclient.netty.NettyResponseFuture; import org.asynchttpclient.netty.request.NettyRequestSender; -import java.net.InetSocketAddress; +import java.net.SocketAddress; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -37,9 +37,9 @@ public class TimeoutsHolder { private final int readTimeoutValue; private volatile Timeout readTimeout; private volatile NettyResponseFuture nettyResponseFuture; - private volatile InetSocketAddress remoteAddress; + private volatile SocketAddress remoteAddress; - public TimeoutsHolder(Timer nettyTimer, NettyResponseFuture nettyResponseFuture, NettyRequestSender requestSender, AsyncHttpClientConfig config, InetSocketAddress originalRemoteAddress) { + public TimeoutsHolder(Timer nettyTimer, NettyResponseFuture nettyResponseFuture, NettyRequestSender requestSender, AsyncHttpClientConfig config, SocketAddress originalRemoteAddress) { this.nettyTimer = nettyTimer; this.nettyResponseFuture = nettyResponseFuture; this.requestSender = requestSender; @@ -64,11 +64,11 @@ public TimeoutsHolder(Timer nettyTimer, NettyResponseFuture nettyResponseFutu } } - public void setResolvedRemoteAddress(InetSocketAddress address) { + public void setResolvedRemoteAddress(SocketAddress address) { remoteAddress = address; } - InetSocketAddress remoteAddress() { + SocketAddress remoteAddress() { return remoteAddress; } diff --git a/client/src/main/java/org/asynchttpclient/resolver/DefaultDomainNameResolver.java b/client/src/main/java/org/asynchttpclient/resolver/DefaultDomainNameResolver.java new file mode 100644 index 0000000000..7afa36d682 --- /dev/null +++ b/client/src/main/java/org/asynchttpclient/resolver/DefaultDomainNameResolver.java @@ -0,0 +1,39 @@ +/* + * Copyright (c) 2015 AsyncHttpClient Project. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at + * http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. + */ +package org.asynchttpclient.resolver; + +import io.netty.channel.unix.DomainSocketAddress; +import io.netty.resolver.SimpleNameResolver; +import io.netty.util.concurrent.EventExecutor; +import io.netty.util.concurrent.Promise; +import io.netty.util.internal.SocketUtils; + +import java.util.Arrays; +import java.util.List; + +public class DefaultDomainNameResolver extends SimpleNameResolver { + public DefaultDomainNameResolver(EventExecutor executor) { + super(executor); + } + + @Override + protected void doResolve(String path, Promise promise) throws Exception { + promise.setSuccess(new DomainSocketAddress(path)); + } + + @Override + protected void doResolveAll(String path, Promise> promise) throws Exception { + promise.setSuccess(Arrays.asList(new DomainSocketAddress(path))); + } +} diff --git a/client/src/main/java/org/asynchttpclient/resolver/RequestHostnameResolver.java b/client/src/main/java/org/asynchttpclient/resolver/RequestHostnameResolver.java index da42fcf660..a54b40a682 100644 --- a/client/src/main/java/org/asynchttpclient/resolver/RequestHostnameResolver.java +++ b/client/src/main/java/org/asynchttpclient/resolver/RequestHostnameResolver.java @@ -13,6 +13,7 @@ */ package org.asynchttpclient.resolver; +import io.netty.channel.unix.DomainSocketAddress; import io.netty.resolver.NameResolver; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.ImmediateEventExecutor; @@ -33,6 +34,50 @@ public enum RequestHostnameResolver { private static final Logger LOGGER = LoggerFactory.getLogger(RequestHostnameResolver.class); + public Future> resolve(NameResolver nameResolver, DomainSocketAddress address, AsyncHandler asyncHandler) { + + final Promise> promise = ImmediateEventExecutor.INSTANCE.newPromise(); + + try { + asyncHandler.onHostnameResolutionAttempt(address.path()); + } catch (Exception e) { + LOGGER.error("onHostnameResolutionAttempt crashed", e); + promise.tryFailure(e); + return promise; + } + + final Future> whenResolved = nameResolver.resolveAll(address.path()); + + whenResolved.addListener(new SimpleFutureListener>() { + + @Override + protected void onSuccess(List socketAddresses) { + try { + asyncHandler.onHostnameResolutionSuccess(address.path(), socketAddresses); + } catch (Exception e) { + LOGGER.error("onHostnameResolutionSuccess crashed", e); + promise.tryFailure(e); + return; + } + promise.trySuccess(socketAddresses); + } + + @Override + protected void onFailure(Throwable t) { + try { + asyncHandler.onHostnameResolutionFailure(address.path(), t); + } catch (Exception e) { + LOGGER.error("onHostnameResolutionFailure crashed", e); + promise.tryFailure(e); + return; + } + promise.tryFailure(t); + } + }); + + return promise; + } + public Future> resolve(NameResolver nameResolver, InetSocketAddress unresolvedAddress, AsyncHandler asyncHandler) { final String hostname = unresolvedAddress.getHostString(); diff --git a/client/src/main/java/org/asynchttpclient/util/ReflectionUtil.java b/client/src/main/java/org/asynchttpclient/util/ReflectionUtil.java new file mode 100644 index 0000000000..0e5149d7e9 --- /dev/null +++ b/client/src/main/java/org/asynchttpclient/util/ReflectionUtil.java @@ -0,0 +1,41 @@ +/* + * Copyright (c) 2017 AsyncHttpClient Project. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. + */ +package org.asynchttpclient.util; + +import io.netty.channel.epoll.Epoll; +import io.netty.channel.kqueue.KQueue; + +public abstract class ReflectionUtil { + + public static void loadEpollClass() { + try { + Class.forName("io.netty.channel.epoll.Epoll"); + } catch (ClassNotFoundException e) { + throw new IllegalStateException("The epoll transport is not available"); + } + if (!Epoll.isAvailable()) { + throw new IllegalStateException("The epoll transport is not supported"); + } + } + + public static void loadKQueueClass() { + try { + Class.forName("io.netty.channel.kqueue.KQueue"); + } catch (ClassNotFoundException e) { + throw new IllegalStateException("The kqueue transport is not available"); + } + if (!KQueue.isAvailable()) { + throw new IllegalStateException("The kqueue transport is not supported"); + } + } +} diff --git a/client/src/test/java/org/asynchttpclient/test/EventCollectingHandler.java b/client/src/test/java/org/asynchttpclient/test/EventCollectingHandler.java index 8047c5f843..9a88ae513d 100644 --- a/client/src/test/java/org/asynchttpclient/test/EventCollectingHandler.java +++ b/client/src/test/java/org/asynchttpclient/test/EventCollectingHandler.java @@ -21,7 +21,7 @@ import org.testng.Assert; import javax.net.ssl.SSLSession; -import java.net.InetSocketAddress; +import java.net.SocketAddress; import java.util.List; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; @@ -94,17 +94,17 @@ public State onContentWritten() { } @Override - public void onTcpConnectAttempt(InetSocketAddress address) { + public void onTcpConnectAttempt(SocketAddress address) { firedEvents.add(CONNECTION_OPEN_EVENT); } @Override - public void onTcpConnectSuccess(InetSocketAddress address, Channel connection) { + public void onTcpConnectSuccess(SocketAddress address, Channel connection) { firedEvents.add(CONNECTION_SUCCESS_EVENT); } @Override - public void onTcpConnectFailure(InetSocketAddress address, Throwable t) { + public void onTcpConnectFailure(SocketAddress address, Throwable t) { firedEvents.add(CONNECTION_FAILURE_EVENT); } @@ -114,7 +114,7 @@ public void onHostnameResolutionAttempt(String name) { } @Override - public void onHostnameResolutionSuccess(String name, List addresses) { + public void onHostnameResolutionSuccess(String name, List addresses) { firedEvents.add(HOSTNAME_RESOLUTION_SUCCESS_EVENT); } diff --git a/example/src/main/java/org/asynchttpclient/example/completable/CompletableFutures.java b/example/src/main/java/org/asynchttpclient/example/completable/CompletableFutures.java index f8a5eb1c0b..88aeff8b1a 100644 --- a/example/src/main/java/org/asynchttpclient/example/completable/CompletableFutures.java +++ b/example/src/main/java/org/asynchttpclient/example/completable/CompletableFutures.java @@ -17,11 +17,13 @@ package org.asynchttpclient.example.completable; import org.asynchttpclient.AsyncHttpClient; +import org.asynchttpclient.DefaultAsyncHttpClientConfig; import org.asynchttpclient.Response; import java.io.IOException; import static org.asynchttpclient.Dsl.asyncHttpClient; +import static org.asynchttpclient.Dsl.config; public class CompletableFutures { public static void main(String[] args) throws IOException { @@ -34,5 +36,25 @@ public static void main(String[] args) throws IOException { .thenAccept(System.out::println) .join(); } +// example of use unix domain socket +// if (!isWindows()) { +// // support unix domain socket +// DefaultAsyncHttpClientConfig.Builder config = config(); +// config.setUnixSocket("/root/server.socket"); // when the unixSocket is set, the useNativeTransport will be true. +// try (AsyncHttpClient asyncHttpClient = asyncHttpClient(config)) { +// asyncHttpClient +// .prepareGet("http://www.example.com/") +// .execute() +// .toCompletableFuture() +// .thenApply(Response::getResponseBody) +// .thenAccept(System.out::println) +// .join(); +// } +// } + + } + + private static boolean isWindows(){ + return System.getProperty("os.name").toUpperCase().contains("WINDOW"); } } diff --git a/extras/rxjava2/src/main/java/org/asynchttpclient/extras/rxjava2/maybe/AbstractMaybeAsyncHandlerBridge.java b/extras/rxjava2/src/main/java/org/asynchttpclient/extras/rxjava2/maybe/AbstractMaybeAsyncHandlerBridge.java index 6a5f8dca7a..528e1f4123 100644 --- a/extras/rxjava2/src/main/java/org/asynchttpclient/extras/rxjava2/maybe/AbstractMaybeAsyncHandlerBridge.java +++ b/extras/rxjava2/src/main/java/org/asynchttpclient/extras/rxjava2/maybe/AbstractMaybeAsyncHandlerBridge.java @@ -27,7 +27,7 @@ import org.slf4j.LoggerFactory; import javax.net.ssl.SSLSession; -import java.net.InetSocketAddress; +import java.net.SocketAddress; import java.util.Arrays; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; @@ -155,7 +155,7 @@ public void onHostnameResolutionAttempt(String name) { } @Override - public void onHostnameResolutionSuccess(String name, List addresses) { + public void onHostnameResolutionSuccess(String name, List addresses) { executeUnlessEmitterDisposed(() -> delegate().onHostnameResolutionSuccess(name, addresses)); } @@ -165,17 +165,17 @@ public void onHostnameResolutionFailure(String name, Throwable cause) { } @Override - public void onTcpConnectAttempt(InetSocketAddress remoteAddress) { + public void onTcpConnectAttempt(SocketAddress remoteAddress) { executeUnlessEmitterDisposed(() -> delegate().onTcpConnectAttempt(remoteAddress)); } @Override - public void onTcpConnectSuccess(InetSocketAddress remoteAddress, Channel connection) { + public void onTcpConnectSuccess(SocketAddress remoteAddress, Channel connection) { executeUnlessEmitterDisposed(() -> delegate().onTcpConnectSuccess(remoteAddress, connection)); } @Override - public void onTcpConnectFailure(InetSocketAddress remoteAddress, Throwable cause) { + public void onTcpConnectFailure(SocketAddress remoteAddress, Throwable cause) { executeUnlessEmitterDisposed(() -> delegate().onTcpConnectFailure(remoteAddress, cause)); } diff --git a/extras/typesafeconfig/src/main/java/org/asynchttpclient/extras/typesafeconfig/AsyncHttpClientTypesafeConfig.java b/extras/typesafeconfig/src/main/java/org/asynchttpclient/extras/typesafeconfig/AsyncHttpClientTypesafeConfig.java index 55c88ab251..92ac848441 100644 --- a/extras/typesafeconfig/src/main/java/org/asynchttpclient/extras/typesafeconfig/AsyncHttpClientTypesafeConfig.java +++ b/extras/typesafeconfig/src/main/java/org/asynchttpclient/extras/typesafeconfig/AsyncHttpClientTypesafeConfig.java @@ -309,6 +309,11 @@ public boolean isUseNativeTransport() { return getBooleanOpt(USE_NATIVE_TRANSPORT_CONFIG).orElse(defaultUseNativeTransport()); } + @Override + public String getUnixSocket() { + return getStringOpt(UNIX_SOCKET).orElse(defaultUnixSocket()); + } + @Override public Consumer getHttpAdditionalChannelInitializer() { return null;