From 9d4135fc439fc2261279c9eba42b02d4eac71697 Mon Sep 17 00:00:00 2001 From: mask <1603849451@qq.com> Date: Sun, 5 Jan 2020 12:16:32 +0800 Subject: [PATCH 1/2] impl uds,without test --- .../org/asynchttpclient/AsyncHandler.java | 9 +- .../AsyncHttpClientConfig.java | 7 ++ .../DefaultAsyncHttpClientConfig.java | 17 +++ .../org/asynchttpclient/DefaultRequest.java | 24 ++-- .../java/org/asynchttpclient/Request.java | 11 +- .../asynchttpclient/RequestBuilderBase.java | 34 +++++- .../channel/DefaultKeepAliveStrategy.java | 3 +- .../channel/KeepAliveStrategy.java | 3 +- .../config/AsyncHttpClientConfigDefaults.java | 5 + .../netty/channel/ChannelManager.java | 35 +++++- .../channel/EpollDomainTransportFactory.java | 44 +++++++ .../channel/KQueueDomainTransportFactory.java | 44 +++++++ .../netty/channel/NettyChannelConnector.java | 15 +-- .../netty/channel/NettyConnectListener.java | 3 +- .../netty/handler/HttpHandler.java | 2 +- .../intercept/Redirect30xInterceptor.java | 7 +- .../netty/request/NettyRequestSender.java | 111 +++++++++++++----- .../netty/timeout/TimeoutTimerTask.java | 17 ++- .../netty/timeout/TimeoutsHolder.java | 9 +- .../resolver/DefaultDomainNameResolver.java | 39 ++++++ .../resolver/RequestHostnameResolver.java | 45 +++++++ .../test/EventCollectingHandler.java | 9 +- .../completable/CompletableFutures.java | 31 +++++ .../AbstractMaybeAsyncHandlerBridge.java | 9 +- .../AsyncHttpClientTypesafeConfig.java | 5 + 25 files changed, 460 insertions(+), 78 deletions(-) create mode 100644 client/src/main/java/org/asynchttpclient/netty/channel/EpollDomainTransportFactory.java create mode 100644 client/src/main/java/org/asynchttpclient/netty/channel/KQueueDomainTransportFactory.java create mode 100644 client/src/main/java/org/asynchttpclient/resolver/DefaultDomainNameResolver.java diff --git a/client/src/main/java/org/asynchttpclient/AsyncHandler.java b/client/src/main/java/org/asynchttpclient/AsyncHandler.java index 6733c94711..8cec31e40d 100644 --- a/client/src/main/java/org/asynchttpclient/AsyncHandler.java +++ b/client/src/main/java/org/asynchttpclient/AsyncHandler.java @@ -21,6 +21,7 @@ import javax.net.ssl.SSLSession; import java.net.InetSocketAddress; +import java.net.SocketAddress; import java.util.List; @@ -132,7 +133,7 @@ default void onHostnameResolutionAttempt(String name) { * @param name the name to be resolved * @param addresses the resolved addresses */ - default void onHostnameResolutionSuccess(String name, List addresses) { + default void onHostnameResolutionSuccess(String name, List addresses) { } /** @@ -153,7 +154,7 @@ default void onHostnameResolutionFailure(String name, Throwable cause) { * * @param remoteAddress the address we try to connect to */ - default void onTcpConnectAttempt(InetSocketAddress remoteAddress) { + default void onTcpConnectAttempt(SocketAddress remoteAddress) { } /** @@ -162,7 +163,7 @@ default void onTcpConnectAttempt(InetSocketAddress remoteAddress) { * @param remoteAddress the address we try to connect to * @param connection the connection */ - default void onTcpConnectSuccess(InetSocketAddress remoteAddress, Channel connection) { + default void onTcpConnectSuccess(SocketAddress remoteAddress, Channel connection) { } /** @@ -173,7 +174,7 @@ default void onTcpConnectSuccess(InetSocketAddress remoteAddress, Channel connec * @param remoteAddress the address we try to connect to * @param cause the cause of the failure */ - default void onTcpConnectFailure(InetSocketAddress remoteAddress, Throwable cause) { + default void onTcpConnectFailure(SocketAddress remoteAddress, Throwable cause) { } // ////////////// TLS /////////////// diff --git a/client/src/main/java/org/asynchttpclient/AsyncHttpClientConfig.java b/client/src/main/java/org/asynchttpclient/AsyncHttpClientConfig.java index 862aa2ce9f..be7c20bdf8 100644 --- a/client/src/main/java/org/asynchttpclient/AsyncHttpClientConfig.java +++ b/client/src/main/java/org/asynchttpclient/AsyncHttpClientConfig.java @@ -294,6 +294,13 @@ public interface AsyncHttpClientConfig { boolean isUseNativeTransport(); + String getUnixSocket(); + + default boolean isUseUnixDomain(){ + String unixSocket = getUnixSocket(); + return unixSocket !=null && !unixSocket.isEmpty(); + } + Consumer getHttpAdditionalChannelInitializer(); Consumer getWsAdditionalChannelInitializer(); diff --git a/client/src/main/java/org/asynchttpclient/DefaultAsyncHttpClientConfig.java b/client/src/main/java/org/asynchttpclient/DefaultAsyncHttpClientConfig.java index d26612fb6d..8d4552a358 100644 --- a/client/src/main/java/org/asynchttpclient/DefaultAsyncHttpClientConfig.java +++ b/client/src/main/java/org/asynchttpclient/DefaultAsyncHttpClientConfig.java @@ -120,6 +120,7 @@ public class DefaultAsyncHttpClientConfig implements AsyncHttpClientConfig { private final Map, Object> channelOptions; private final EventLoopGroup eventLoopGroup; private final boolean useNativeTransport; + private final String unixSocket; private final ByteBufAllocator allocator; private final boolean tcpNoDelay; private final boolean soReuseAddress; @@ -209,6 +210,7 @@ private DefaultAsyncHttpClientConfig(// http Map, Object> channelOptions, EventLoopGroup eventLoopGroup, boolean useNativeTransport, + String unixSocket, ByteBufAllocator allocator, Timer nettyTimer, ThreadFactory threadFactory, @@ -295,6 +297,7 @@ private DefaultAsyncHttpClientConfig(// http this.channelOptions = channelOptions; this.eventLoopGroup = eventLoopGroup; this.useNativeTransport = useNativeTransport; + this.unixSocket = unixSocket; this.allocator = allocator; this.nettyTimer = nettyTimer; this.threadFactory = threadFactory; @@ -621,6 +624,11 @@ public boolean isUseNativeTransport() { return useNativeTransport; } + @Override + public String getUnixSocket() { + return unixSocket; + } + @Override public ByteBufAllocator getAllocator() { return allocator; @@ -738,6 +746,7 @@ public static class Builder { private int httpClientCodecInitialBufferSize = defaultHttpClientCodecInitialBufferSize(); private int chunkedFileChunkSize = defaultChunkedFileChunkSize(); private boolean useNativeTransport = defaultUseNativeTransport(); + private String unixSocket = defaultUnixSocket(); private ByteBufAllocator allocator; private Map, Object> channelOptions = new HashMap<>(); private EventLoopGroup eventLoopGroup; @@ -821,6 +830,7 @@ public Builder(AsyncHttpClientConfig config) { channelOptions.putAll(config.getChannelOptions()); eventLoopGroup = config.getEventLoopGroup(); useNativeTransport = config.isUseNativeTransport(); + unixSocket = config.getUnixSocket(); allocator = config.getAllocator(); nettyTimer = config.getNettyTimer(); threadFactory = config.getThreadFactory(); @@ -1189,6 +1199,12 @@ public Builder setUseNativeTransport(boolean useNativeTransport) { return this; } + public Builder setUnixSocket(String unixSocket) { + setUseNativeTransport(true); + this.unixSocket = unixSocket; + return this; + } + public Builder setAllocator(ByteBufAllocator allocator) { this.allocator = allocator; return this; @@ -1301,6 +1317,7 @@ public DefaultAsyncHttpClientConfig build() { channelOptions.isEmpty() ? Collections.emptyMap() : Collections.unmodifiableMap(channelOptions), eventLoopGroup, useNativeTransport, + unixSocket, allocator, nettyTimer, threadFactory, diff --git a/client/src/main/java/org/asynchttpclient/DefaultRequest.java b/client/src/main/java/org/asynchttpclient/DefaultRequest.java index 4cabb41792..8f2f47fba3 100644 --- a/client/src/main/java/org/asynchttpclient/DefaultRequest.java +++ b/client/src/main/java/org/asynchttpclient/DefaultRequest.java @@ -13,6 +13,7 @@ */ package org.asynchttpclient; +import io.netty.channel.unix.DomainSocketAddress; import io.netty.handler.codec.http.HttpHeaders; import io.netty.handler.codec.http.cookie.Cookie; import io.netty.resolver.NameResolver; @@ -25,6 +26,7 @@ import java.io.File; import java.io.InputStream; import java.net.InetAddress; +import java.net.SocketAddress; import java.nio.ByteBuffer; import java.nio.charset.Charset; import java.util.ArrayList; @@ -39,8 +41,8 @@ public class DefaultRequest implements Request { public final ProxyServer proxyServer; private final String method; private final Uri uri; - private final InetAddress address; - private final InetAddress localAddress; + private final SocketAddress address; + private final SocketAddress localAddress; private final HttpHeaders headers; private final List cookies; private final byte[] byteData; @@ -61,13 +63,14 @@ public class DefaultRequest implements Request { private final Charset charset; private final ChannelPoolPartitioning channelPoolPartitioning; private final NameResolver nameResolver; + private final NameResolver domainNameResolver; // lazily loaded private List queryParams; public DefaultRequest(String method, Uri uri, - InetAddress address, - InetAddress localAddress, + SocketAddress address, + SocketAddress localAddress, HttpHeaders headers, List cookies, byte[] byteData, @@ -88,7 +91,8 @@ public DefaultRequest(String method, long rangeOffset, Charset charset, ChannelPoolPartitioning channelPoolPartitioning, - NameResolver nameResolver) { + NameResolver nameResolver, + NameResolver domainNameResolver) { this.method = method; this.uri = uri; this.address = address; @@ -114,6 +118,7 @@ public DefaultRequest(String method, this.charset = charset; this.channelPoolPartitioning = channelPoolPartitioning; this.nameResolver = nameResolver; + this.domainNameResolver = domainNameResolver; } @Override @@ -132,12 +137,12 @@ public Uri getUri() { } @Override - public InetAddress getAddress() { + public SocketAddress getAddress() { return address; } @Override - public InetAddress getLocalAddress() { + public SocketAddress getLocalAddress() { return localAddress; } @@ -246,6 +251,11 @@ public NameResolver getNameResolver() { return nameResolver; } + @Override + public NameResolver getDomainNameResolver() { + return domainNameResolver; + } + @Override public List getQueryParams() { if (queryParams == null) diff --git a/client/src/main/java/org/asynchttpclient/Request.java b/client/src/main/java/org/asynchttpclient/Request.java index cf6a82dee2..d608c8f9c5 100644 --- a/client/src/main/java/org/asynchttpclient/Request.java +++ b/client/src/main/java/org/asynchttpclient/Request.java @@ -16,6 +16,7 @@ */ package org.asynchttpclient; +import io.netty.channel.unix.DomainSocketAddress; import io.netty.handler.codec.http.HttpHeaders; import io.netty.handler.codec.http.cookie.Cookie; import io.netty.resolver.NameResolver; @@ -28,6 +29,7 @@ import java.io.File; import java.io.InputStream; import java.net.InetAddress; +import java.net.SocketAddress; import java.nio.ByteBuffer; import java.nio.charset.Charset; import java.util.List; @@ -64,12 +66,12 @@ public interface Request { /** * @return the InetAddress to be used to bypass uri's hostname resolution */ - InetAddress getAddress(); + SocketAddress getAddress(); /** * @return the local address to bind from */ - InetAddress getLocalAddress(); + SocketAddress getLocalAddress(); /** * @return the HTTP headers @@ -181,6 +183,11 @@ public interface Request { */ NameResolver getNameResolver(); + /** + * @return the NameResolver to be used to resolve hostnams's IP + */ + NameResolver getDomainNameResolver(); + /** * @return a new request builder using this request as a prototype */ diff --git a/client/src/main/java/org/asynchttpclient/RequestBuilderBase.java b/client/src/main/java/org/asynchttpclient/RequestBuilderBase.java index 35c8145776..95079c5349 100644 --- a/client/src/main/java/org/asynchttpclient/RequestBuilderBase.java +++ b/client/src/main/java/org/asynchttpclient/RequestBuilderBase.java @@ -16,6 +16,7 @@ package org.asynchttpclient; import io.netty.buffer.ByteBuf; +import io.netty.channel.unix.DomainSocketAddress; import io.netty.handler.codec.http.DefaultHttpHeaders; import io.netty.handler.codec.http.HttpHeaders; import io.netty.handler.codec.http.cookie.Cookie; @@ -27,6 +28,7 @@ import org.asynchttpclient.request.body.generator.BodyGenerator; import org.asynchttpclient.request.body.generator.ReactiveStreamsBodyGenerator; import org.asynchttpclient.request.body.multipart.Part; +import org.asynchttpclient.resolver.DefaultDomainNameResolver; import org.asynchttpclient.uri.Uri; import org.asynchttpclient.util.UriEncoder; import org.reactivestreams.Publisher; @@ -36,6 +38,8 @@ import java.io.File; import java.io.InputStream; import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketAddress; import java.nio.ByteBuffer; import java.nio.charset.Charset; import java.util.*; @@ -56,6 +60,7 @@ public abstract class RequestBuilderBase> { private final static Logger LOGGER = LoggerFactory.getLogger(RequestBuilderBase.class); private static final Uri DEFAULT_REQUEST_URL = Uri.create("http://localhost"); public static NameResolver DEFAULT_NAME_RESOLVER = new DefaultNameResolver(ImmediateEventExecutor.INSTANCE); + public static NameResolver DEFAULT_DOMAIN_NAME_RESOLVER = new DefaultDomainNameResolver(ImmediateEventExecutor.INSTANCE); // builder only fields protected UriEncoder uriEncoder; protected List queryParams; @@ -64,8 +69,8 @@ public abstract class RequestBuilderBase> { // request fields protected String method; protected Uri uri; - protected InetAddress address; - protected InetAddress localAddress; + protected SocketAddress address; + protected SocketAddress localAddress; protected HttpHeaders headers; protected ArrayList cookies; protected byte[] byteData; @@ -87,6 +92,7 @@ public abstract class RequestBuilderBase> { protected Charset charset; protected ChannelPoolPartitioning channelPoolPartitioning = ChannelPoolPartitioning.PerHostChannelPoolPartitioning.INSTANCE; protected NameResolver nameResolver = DEFAULT_NAME_RESOLVER; + protected NameResolver domainNameResolver = DEFAULT_DOMAIN_NAME_RESOLVER; protected RequestBuilderBase(String method, boolean disableUrlEncoding) { this(method, disableUrlEncoding, true); @@ -136,6 +142,7 @@ protected RequestBuilderBase(Request prototype, boolean disableUrlEncoding, bool this.charset = prototype.getCharset(); this.channelPoolPartitioning = prototype.getChannelPoolPartitioning(); this.nameResolver = prototype.getNameResolver(); + this.domainNameResolver = prototype.getDomainNameResolver(); } @SuppressWarnings("unchecked") @@ -144,6 +151,9 @@ private T asDerivedType() { } public T setUrl(String url) { + if (!url.contains("://")){ + url = "http://127.0.0.1:80" + url; + } return setUri(Uri.create(url)); } @@ -153,11 +163,20 @@ public T setUri(Uri uri) { } public T setAddress(InetAddress address) { - this.address = address; + this.address = new InetSocketAddress(address,0); return asDerivedType(); } public T setLocalAddress(InetAddress address) { + this.localAddress = new InetSocketAddress(address,0); + return asDerivedType(); + } + public T setDomainAddress(DomainSocketAddress address) { + this.address = address; + return asDerivedType(); + } + + public T setDomainLocalAddress(DomainSocketAddress address) { this.localAddress = address; return asDerivedType(); } @@ -534,6 +553,11 @@ public T setNameResolver(NameResolver nameResolver) { return asDerivedType(); } + public T setDomainNameResolver(NameResolver nameResolver) { + this.domainNameResolver = nameResolver; + return asDerivedType(); + } + public T setSignatureCalculator(SignatureCalculator signatureCalculator) { this.signatureCalculator = signatureCalculator; return asDerivedType(); @@ -580,6 +604,7 @@ private RequestBuilderBase executeSignatureCalculator() { rb.charset = this.charset; rb.channelPoolPartitioning = this.channelPoolPartitioning; rb.nameResolver = this.nameResolver; + rb.domainNameResolver = this.domainNameResolver; Request unsignedRequest = rb.build(); signatureCalculator.calculateAndAddSignature(unsignedRequest, rb); return rb; @@ -642,6 +667,7 @@ public Request build() { rb.rangeOffset, rb.charset, rb.channelPoolPartitioning, - rb.nameResolver); + rb.nameResolver, + rb.domainNameResolver); } } diff --git a/client/src/main/java/org/asynchttpclient/channel/DefaultKeepAliveStrategy.java b/client/src/main/java/org/asynchttpclient/channel/DefaultKeepAliveStrategy.java index f1c6a5f42f..a1f984c474 100644 --- a/client/src/main/java/org/asynchttpclient/channel/DefaultKeepAliveStrategy.java +++ b/client/src/main/java/org/asynchttpclient/channel/DefaultKeepAliveStrategy.java @@ -6,6 +6,7 @@ import org.asynchttpclient.Request; import java.net.InetSocketAddress; +import java.net.SocketAddress; import static io.netty.handler.codec.http.HttpHeaderValues.CLOSE; @@ -18,7 +19,7 @@ public class DefaultKeepAliveStrategy implements KeepAliveStrategy { * Implemented in accordance with RFC 7230 section 6.1 https://tools.ietf.org/html/rfc7230#section-6.1 */ @Override - public boolean keepAlive(InetSocketAddress remoteAddress, Request ahcRequest, HttpRequest request, HttpResponse response) { + public boolean keepAlive(SocketAddress remoteAddress, Request ahcRequest, HttpRequest request, HttpResponse response) { return HttpUtil.isKeepAlive(response) && HttpUtil.isKeepAlive(request) // support non standard Proxy-Connection diff --git a/client/src/main/java/org/asynchttpclient/channel/KeepAliveStrategy.java b/client/src/main/java/org/asynchttpclient/channel/KeepAliveStrategy.java index c748fe76ac..3da3f4c611 100644 --- a/client/src/main/java/org/asynchttpclient/channel/KeepAliveStrategy.java +++ b/client/src/main/java/org/asynchttpclient/channel/KeepAliveStrategy.java @@ -18,6 +18,7 @@ import org.asynchttpclient.Request; import java.net.InetSocketAddress; +import java.net.SocketAddress; public interface KeepAliveStrategy { @@ -30,5 +31,5 @@ public interface KeepAliveStrategy { * @param nettyResponse the HTTP response received from Netty * @return true if the connection should be kept alive, false if it should be closed. */ - boolean keepAlive(InetSocketAddress remoteAddress, Request ahcRequest, HttpRequest nettyRequest, HttpResponse nettyResponse); + boolean keepAlive(SocketAddress remoteAddress, Request ahcRequest, HttpRequest nettyRequest, HttpResponse nettyResponse); } diff --git a/client/src/main/java/org/asynchttpclient/config/AsyncHttpClientConfigDefaults.java b/client/src/main/java/org/asynchttpclient/config/AsyncHttpClientConfigDefaults.java index fa073bc82f..8b1abd4f7f 100644 --- a/client/src/main/java/org/asynchttpclient/config/AsyncHttpClientConfigDefaults.java +++ b/client/src/main/java/org/asynchttpclient/config/AsyncHttpClientConfigDefaults.java @@ -69,6 +69,7 @@ public final class AsyncHttpClientConfigDefaults { public static final String SHUTDOWN_QUIET_PERIOD_CONFIG = "shutdownQuietPeriod"; public static final String SHUTDOWN_TIMEOUT_CONFIG = "shutdownTimeout"; public static final String USE_NATIVE_TRANSPORT_CONFIG = "useNativeTransport"; + public static final String UNIX_SOCKET = "unixSocket"; public static final String IO_THREADS_COUNT_CONFIG = "ioThreadsCount"; public static final String AHC_VERSION; @@ -286,6 +287,10 @@ public static boolean defaultUseNativeTransport() { return AsyncHttpClientConfigHelper.getAsyncHttpClientConfig().getBoolean(ASYNC_CLIENT_CONFIG_ROOT + USE_NATIVE_TRANSPORT_CONFIG); } + public static String defaultUnixSocket() { + return AsyncHttpClientConfigHelper.getAsyncHttpClientConfig().getString(ASYNC_CLIENT_CONFIG_ROOT + UNIX_SOCKET); + } + public static int defaultIoThreadsCount() { return AsyncHttpClientConfigHelper.getAsyncHttpClientConfig().getInt(ASYNC_CLIENT_CONFIG_ROOT + IO_THREADS_COUNT_CONFIG); } diff --git a/client/src/main/java/org/asynchttpclient/netty/channel/ChannelManager.java b/client/src/main/java/org/asynchttpclient/netty/channel/ChannelManager.java index 4488bb6514..ade0f65af7 100755 --- a/client/src/main/java/org/asynchttpclient/netty/channel/ChannelManager.java +++ b/client/src/main/java/org/asynchttpclient/netty/channel/ChannelManager.java @@ -38,6 +38,7 @@ import io.netty.resolver.NameResolver; import io.netty.util.Timer; import io.netty.util.concurrent.*; +import io.netty.util.internal.StringUtil; import org.asynchttpclient.*; import org.asynchttpclient.channel.ChannelPool; import org.asynchttpclient.channel.ChannelPoolPartitioning; @@ -51,6 +52,7 @@ import org.asynchttpclient.netty.ssl.DefaultSslEngineFactory; import org.asynchttpclient.proxy.ProxyServer; import org.asynchttpclient.uri.Uri; +import org.asynchttpclient.util.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -124,7 +126,11 @@ public ChannelManager(final AsyncHttpClientConfig config, Timer nettyTimer) { TransportFactory transportFactory; if (allowReleaseEventLoopGroup) { if (config.isUseNativeTransport()) { - transportFactory = getNativeTransportFactory(); + if (config.isUseUnixDomain()){ + transportFactory = getDomainTransportFactory(); + }else { + transportFactory = getNativeTransportFactory(); + } } else { transportFactory = NioTransportFactory.INSTANCE; } @@ -134,11 +140,22 @@ public ChannelManager(final AsyncHttpClientConfig config, Timer nettyTimer) { eventLoopGroup = config.getEventLoopGroup(); if (eventLoopGroup instanceof NioEventLoopGroup) { + if (config.isUseUnixDomain()){ + throw new IllegalArgumentException("Unix domain socket not support NioEventLoopGroup now!"); + } transportFactory = NioTransportFactory.INSTANCE; } else if (eventLoopGroup instanceof EpollEventLoopGroup) { - transportFactory = new EpollTransportFactory(); + if (config.isUseUnixDomain()){ + transportFactory = new EpollDomainTransportFactory(); + }else { + transportFactory = new EpollTransportFactory(); + } } else if (eventLoopGroup instanceof KQueueEventLoopGroup) { - transportFactory = new KQueueTransportFactory(); + if (config.isUseUnixDomain()){ + transportFactory = new KQueueDomainTransportFactory(); + }else { + transportFactory = new KQueueTransportFactory(); + } } else { throw new IllegalArgumentException("Unknown event loop group " + eventLoopGroup.getClass().getSimpleName()); } @@ -199,6 +216,18 @@ private Bootstrap newBootstrap(ChannelFactory channelFactory, } } + private TransportFactory getDomainTransportFactory() { + try { + return (TransportFactory) Class.forName("org.asynchttpclient.netty.channel.EpollDomainTransportFactory").newInstance(); + } catch (Exception e) { + try { + return (TransportFactory) Class.forName("org.asynchttpclient.netty.channel.KQueueDomainTransportFactory").newInstance(); + } catch (Exception e1) { + throw new IllegalArgumentException("No suitable native transport (epoll or kqueue) available"); + } + } + } + public void configureBootstraps(NettyRequestSender requestSender) { final AsyncHttpClientHandler httpHandler = new HttpHandler(config, this, requestSender); diff --git a/client/src/main/java/org/asynchttpclient/netty/channel/EpollDomainTransportFactory.java b/client/src/main/java/org/asynchttpclient/netty/channel/EpollDomainTransportFactory.java new file mode 100644 index 0000000000..b5cc22c996 --- /dev/null +++ b/client/src/main/java/org/asynchttpclient/netty/channel/EpollDomainTransportFactory.java @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2016 AsyncHttpClient Project. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at + * http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. + */ +package org.asynchttpclient.netty.channel; + +import io.netty.channel.epoll.Epoll; +import io.netty.channel.epoll.EpollDomainSocketChannel; +import io.netty.channel.epoll.EpollEventLoopGroup; + +import java.util.concurrent.ThreadFactory; + +class EpollDomainTransportFactory implements TransportFactory { + + EpollDomainTransportFactory() { + try { + Class.forName("io.netty.channel.epoll.Epoll"); + } catch (ClassNotFoundException e) { + throw new IllegalStateException("The epoll transport is not available"); + } + if (!Epoll.isAvailable()) { + throw new IllegalStateException("The epoll transport is not supported"); + } + } + + @Override + public EpollDomainSocketChannel newChannel() { + return new EpollDomainSocketChannel(); + } + + @Override + public EpollEventLoopGroup newEventLoopGroup(int ioThreadsCount, ThreadFactory threadFactory) { + return new EpollEventLoopGroup(ioThreadsCount, threadFactory); + } +} diff --git a/client/src/main/java/org/asynchttpclient/netty/channel/KQueueDomainTransportFactory.java b/client/src/main/java/org/asynchttpclient/netty/channel/KQueueDomainTransportFactory.java new file mode 100644 index 0000000000..220c8c90d6 --- /dev/null +++ b/client/src/main/java/org/asynchttpclient/netty/channel/KQueueDomainTransportFactory.java @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2019 AsyncHttpClient Project. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at + * http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. + */ +package org.asynchttpclient.netty.channel; + +import io.netty.channel.kqueue.KQueue; +import io.netty.channel.kqueue.KQueueDomainSocketChannel; +import io.netty.channel.kqueue.KQueueEventLoopGroup; + +import java.util.concurrent.ThreadFactory; + +class KQueueDomainTransportFactory implements TransportFactory { + + KQueueDomainTransportFactory() { + try { + Class.forName("io.netty.channel.kqueue.KQueue"); + } catch (ClassNotFoundException e) { + throw new IllegalStateException("The kqueue transport is not available"); + } + if (!KQueue.isAvailable()) { + throw new IllegalStateException("The kqueue transport is not supported"); + } + } + + @Override + public KQueueDomainSocketChannel newChannel() { + return new KQueueDomainSocketChannel(); + } + + @Override + public KQueueEventLoopGroup newEventLoopGroup(int ioThreadsCount, ThreadFactory threadFactory) { + return new KQueueEventLoopGroup(ioThreadsCount, threadFactory); + } +} diff --git a/client/src/main/java/org/asynchttpclient/netty/channel/NettyChannelConnector.java b/client/src/main/java/org/asynchttpclient/netty/channel/NettyChannelConnector.java index 8951bd062e..04b36d7fc9 100644 --- a/client/src/main/java/org/asynchttpclient/netty/channel/NettyChannelConnector.java +++ b/client/src/main/java/org/asynchttpclient/netty/channel/NettyChannelConnector.java @@ -22,6 +22,7 @@ import java.net.InetAddress; import java.net.InetSocketAddress; +import java.net.SocketAddress; import java.util.List; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; @@ -34,16 +35,16 @@ public class NettyChannelConnector { .newUpdater(NettyChannelConnector.class, "i"); private final AsyncHandler asyncHandler; - private final InetSocketAddress localAddress; - private final List remoteAddresses; + private final SocketAddress localAddress; + private final List remoteAddresses; private final AsyncHttpClientState clientState; private volatile int i = 0; - public NettyChannelConnector(InetAddress localAddress, - List remoteAddresses, + public NettyChannelConnector(SocketAddress localAddress, + List remoteAddresses, AsyncHandler asyncHandler, AsyncHttpClientState clientState) { - this.localAddress = localAddress != null ? new InetSocketAddress(localAddress, 0) : null; + this.localAddress = localAddress ; this.remoteAddresses = remoteAddresses; this.asyncHandler = asyncHandler; this.clientState = clientState; @@ -55,7 +56,7 @@ private boolean pickNextRemoteAddress() { } public void connect(final Bootstrap bootstrap, final NettyConnectListener connectListener) { - final InetSocketAddress remoteAddress = remoteAddresses.get(i); + final SocketAddress remoteAddress = remoteAddresses.get(i); try { asyncHandler.onTcpConnectAttempt(remoteAddress); @@ -76,7 +77,7 @@ public void connect(final Bootstrap bootstrap, final NettyConnectListener con } } - private void connect0(Bootstrap bootstrap, final NettyConnectListener connectListener, InetSocketAddress remoteAddress) { + private void connect0(Bootstrap bootstrap, final NettyConnectListener connectListener, SocketAddress remoteAddress) { bootstrap.connect(remoteAddress, localAddress) .addListener(new SimpleChannelFutureListener() { diff --git a/client/src/main/java/org/asynchttpclient/netty/channel/NettyConnectListener.java b/client/src/main/java/org/asynchttpclient/netty/channel/NettyConnectListener.java index 4a6f4dce20..a01e5fbcca 100755 --- a/client/src/main/java/org/asynchttpclient/netty/channel/NettyConnectListener.java +++ b/client/src/main/java/org/asynchttpclient/netty/channel/NettyConnectListener.java @@ -30,6 +30,7 @@ import java.net.ConnectException; import java.net.InetSocketAddress; +import java.net.SocketAddress; /** * Non Blocking connect. @@ -80,7 +81,7 @@ private void writeRequest(Channel channel) { requestSender.writeRequest(future, channel); } - public void onSuccess(Channel channel, InetSocketAddress remoteAddress) { + public void onSuccess(Channel channel, SocketAddress remoteAddress) { if (connectionSemaphore != null) { // transfer lock from future to channel diff --git a/client/src/main/java/org/asynchttpclient/netty/handler/HttpHandler.java b/client/src/main/java/org/asynchttpclient/netty/handler/HttpHandler.java index a52f75fc83..2393aca855 100755 --- a/client/src/main/java/org/asynchttpclient/netty/handler/HttpHandler.java +++ b/client/src/main/java/org/asynchttpclient/netty/handler/HttpHandler.java @@ -70,7 +70,7 @@ private void handleHttpResponse(final HttpResponse response, final Channel chann HttpRequest httpRequest = future.getNettyRequest().getHttpRequest(); logger.debug("\n\nRequest {}\n\nResponse {}\n", httpRequest, response); - future.setKeepAlive(config.getKeepAliveStrategy().keepAlive((InetSocketAddress) channel.remoteAddress(), future.getTargetRequest(), httpRequest, response)); + future.setKeepAlive(config.getKeepAliveStrategy().keepAlive(channel.remoteAddress(), future.getTargetRequest(), httpRequest, response)); NettyResponseStatus status = new NettyResponseStatus(future.getUri(), response, channel); HttpHeaders responseHeaders = response.headers(); diff --git a/client/src/main/java/org/asynchttpclient/netty/handler/intercept/Redirect30xInterceptor.java b/client/src/main/java/org/asynchttpclient/netty/handler/intercept/Redirect30xInterceptor.java index d56b90fd24..56cfd293d8 100644 --- a/client/src/main/java/org/asynchttpclient/netty/handler/intercept/Redirect30xInterceptor.java +++ b/client/src/main/java/org/asynchttpclient/netty/handler/intercept/Redirect30xInterceptor.java @@ -32,6 +32,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketAddress; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -90,11 +93,13 @@ public boolean exitAfterHandlingRedirect(Channel channel, && (statusCode == MOVED_PERMANENTLY_301 || statusCode == SEE_OTHER_303 || (statusCode == FOUND_302 && !config.isStrict302Handling())); boolean keepBody = statusCode == TEMPORARY_REDIRECT_307 || statusCode == PERMANENT_REDIRECT_308 || (statusCode == FOUND_302 && config.isStrict302Handling()); + InetSocketAddress localAddress = (InetSocketAddress) request.getLocalAddress(); final RequestBuilder requestBuilder = new RequestBuilder(switchToGet ? GET : originalMethod) .setChannelPoolPartitioning(request.getChannelPoolPartitioning()) .setFollowRedirect(true) - .setLocalAddress(request.getLocalAddress()) + .setLocalAddress(localAddress.getAddress()) .setNameResolver(request.getNameResolver()) + .setDomainNameResolver(request.getDomainNameResolver()) .setProxyServer(request.getProxyServer()) .setRealm(request.getRealm()) .setRequestTimeout(request.getRequestTimeout()); diff --git a/client/src/main/java/org/asynchttpclient/netty/request/NettyRequestSender.java b/client/src/main/java/org/asynchttpclient/netty/request/NettyRequestSender.java index 32720acc10..20c5a0acdb 100755 --- a/client/src/main/java/org/asynchttpclient/netty/request/NettyRequestSender.java +++ b/client/src/main/java/org/asynchttpclient/netty/request/NettyRequestSender.java @@ -18,6 +18,7 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelProgressivePromise; import io.netty.channel.ChannelPromise; +import io.netty.channel.unix.DomainSocketAddress; import io.netty.handler.codec.http.*; import io.netty.util.Timer; import io.netty.util.concurrent.Future; @@ -296,35 +297,65 @@ private ListenableFuture sendRequestWithNewChannel(Request request, // exit and don't try to resolve address return future; } + if (config.isUseUnixDomain()){ + resolveDomainAddresses(request, proxy, future, asyncHandler) + .addListener(new SimpleFutureListener>() { + + @Override + protected void onSuccess(List addresses) { + NettyConnectListener connectListener = new NettyConnectListener<>(future, + NettyRequestSender.this, channelManager, connectionSemaphore); + NettyChannelConnector connector = new NettyChannelConnector(request.getLocalAddress(), + addresses, asyncHandler, clientState); + if (!future.isDone()) { + // Do not throw an exception when we need an extra connection for a redirect + // FIXME why? This violate the max connection per host handling, right? + channelManager.getBootstrap(request.getUri(), null, null) + .addListener((Future whenBootstrap) -> { + if (whenBootstrap.isSuccess()) { + connector.connect(whenBootstrap.get(), connectListener); + } else { + abort(null, future, whenBootstrap.cause()); + } + }); + } + } - resolveAddresses(request, proxy, future, asyncHandler) - .addListener(new SimpleFutureListener>() { - - @Override - protected void onSuccess(List addresses) { - NettyConnectListener connectListener = new NettyConnectListener<>(future, - NettyRequestSender.this, channelManager, connectionSemaphore); - NettyChannelConnector connector = new NettyChannelConnector(request.getLocalAddress(), - addresses, asyncHandler, clientState); - if (!future.isDone()) { - // Do not throw an exception when we need an extra connection for a redirect - // FIXME why? This violate the max connection per host handling, right? - channelManager.getBootstrap(request.getUri(), request.getNameResolver(), proxy) - .addListener((Future whenBootstrap) -> { - if (whenBootstrap.isSuccess()) { - connector.connect(whenBootstrap.get(), connectListener); - } else { - abort(null, future, whenBootstrap.cause()); - } - }); + @Override + protected void onFailure(Throwable cause) { + abort(null, future, getCause(cause)); + } + }); + }else { + resolveAddresses(request, proxy, future, asyncHandler) + .addListener(new SimpleFutureListener>() { + + @Override + protected void onSuccess(List addresses) { + NettyConnectListener connectListener = new NettyConnectListener<>(future, + NettyRequestSender.this, channelManager, connectionSemaphore); + NettyChannelConnector connector = new NettyChannelConnector(request.getLocalAddress(), + addresses, asyncHandler, clientState); + if (!future.isDone()) { + // Do not throw an exception when we need an extra connection for a redirect + // FIXME why? This violate the max connection per host handling, right? + channelManager.getBootstrap(request.getUri(), request.getNameResolver(), proxy) + .addListener((Future whenBootstrap) -> { + if (whenBootstrap.isSuccess()) { + connector.connect(whenBootstrap.get(), connectListener); + } else { + abort(null, future, whenBootstrap.cause()); + } + }); + } } - } - @Override - protected void onFailure(Throwable cause) { - abort(null, future, getCause(cause)); - } - }); + @Override + protected void onFailure(Throwable cause) { + abort(null, future, getCause(cause)); + } + }); + } return future; } @@ -351,13 +382,35 @@ private Future> resolveAddresses(Request request, if (request.getAddress() != null) { // bypass resolution - InetSocketAddress inetSocketAddress = new InetSocketAddress(request.getAddress(), port); - return promise.setSuccess(singletonList(inetSocketAddress)); + InetSocketAddress address = (InetSocketAddress) request.getAddress(); + if (address.getPort() != port){ + address = new InetSocketAddress(address.getAddress(), port); + } + return promise.setSuccess(singletonList(address)); } else { return RequestHostnameResolver.INSTANCE.resolve(request.getNameResolver(), unresolvedRemoteAddress, asyncHandler); } } } + private Future> resolveDomainAddresses(Request request, + ProxyServer proxy, + NettyResponseFuture future, + AsyncHandler asyncHandler) { + + if (proxy != null ) { + throw new IllegalArgumentException("Unix domain socket not support proxy"); + } else { + + DomainSocketAddress socketAddress = new DomainSocketAddress(config.getUnixSocket()); + scheduleRequestTimeout(future, socketAddress); + + if (request.getAddress() != null) { + throw new IllegalArgumentException("Unix domain socket not support set address !"); + } else { + return RequestHostnameResolver.INSTANCE.resolve(request.getDomainNameResolver(), socketAddress, asyncHandler); + } + } + } private NettyResponseFuture newNettyResponseFuture(Request request, AsyncHandler asyncHandler, @@ -442,7 +495,7 @@ private void configureTransferAdapter(AsyncHandler handler, HttpRequest httpR } private void scheduleRequestTimeout(NettyResponseFuture nettyResponseFuture, - InetSocketAddress originalRemoteAddress) { + SocketAddress originalRemoteAddress) { nettyResponseFuture.touch(); TimeoutsHolder timeoutsHolder = new TimeoutsHolder(nettyTimer, nettyResponseFuture, this, config, originalRemoteAddress); diff --git a/client/src/main/java/org/asynchttpclient/netty/timeout/TimeoutTimerTask.java b/client/src/main/java/org/asynchttpclient/netty/timeout/TimeoutTimerTask.java index 034502785c..3e3d1da7f2 100755 --- a/client/src/main/java/org/asynchttpclient/netty/timeout/TimeoutTimerTask.java +++ b/client/src/main/java/org/asynchttpclient/netty/timeout/TimeoutTimerTask.java @@ -13,6 +13,7 @@ */ package org.asynchttpclient.netty.timeout; +import io.netty.channel.unix.DomainSocketAddress; import io.netty.util.TimerTask; import org.asynchttpclient.netty.NettyResponseFuture; import org.asynchttpclient.netty.request.NettyRequestSender; @@ -20,6 +21,7 @@ import org.slf4j.LoggerFactory; import java.net.InetSocketAddress; +import java.net.SocketAddress; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; @@ -54,11 +56,16 @@ public void clean() { } void appendRemoteAddress(StringBuilder sb) { - InetSocketAddress remoteAddress = timeoutsHolder.remoteAddress(); - sb.append(remoteAddress.getHostString()); - if (!remoteAddress.isUnresolved()) { - sb.append('/').append(remoteAddress.getAddress().getHostAddress()); + SocketAddress socketAddress = timeoutsHolder.remoteAddress(); + if (socketAddress instanceof InetSocketAddress){ + InetSocketAddress remoteAddress = (InetSocketAddress) socketAddress; + sb.append(remoteAddress.getHostString()); + if (!remoteAddress.isUnresolved()) { + sb.append('/').append(remoteAddress.getAddress().getHostAddress()); + } + sb.append(':').append(remoteAddress.getPort()); + }else if (socketAddress instanceof DomainSocketAddress){ + sb.append(((DomainSocketAddress) socketAddress).path()); } - sb.append(':').append(remoteAddress.getPort()); } } diff --git a/client/src/main/java/org/asynchttpclient/netty/timeout/TimeoutsHolder.java b/client/src/main/java/org/asynchttpclient/netty/timeout/TimeoutsHolder.java index 89d3faf586..2d41bf57c9 100755 --- a/client/src/main/java/org/asynchttpclient/netty/timeout/TimeoutsHolder.java +++ b/client/src/main/java/org/asynchttpclient/netty/timeout/TimeoutsHolder.java @@ -22,6 +22,7 @@ import org.asynchttpclient.netty.request.NettyRequestSender; import java.net.InetSocketAddress; +import java.net.SocketAddress; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -37,9 +38,9 @@ public class TimeoutsHolder { private final int readTimeoutValue; private volatile Timeout readTimeout; private volatile NettyResponseFuture nettyResponseFuture; - private volatile InetSocketAddress remoteAddress; + private volatile SocketAddress remoteAddress; - public TimeoutsHolder(Timer nettyTimer, NettyResponseFuture nettyResponseFuture, NettyRequestSender requestSender, AsyncHttpClientConfig config, InetSocketAddress originalRemoteAddress) { + public TimeoutsHolder(Timer nettyTimer, NettyResponseFuture nettyResponseFuture, NettyRequestSender requestSender, AsyncHttpClientConfig config, SocketAddress originalRemoteAddress) { this.nettyTimer = nettyTimer; this.nettyResponseFuture = nettyResponseFuture; this.requestSender = requestSender; @@ -64,11 +65,11 @@ public TimeoutsHolder(Timer nettyTimer, NettyResponseFuture nettyResponseFutu } } - public void setResolvedRemoteAddress(InetSocketAddress address) { + public void setResolvedRemoteAddress(SocketAddress address) { remoteAddress = address; } - InetSocketAddress remoteAddress() { + SocketAddress remoteAddress() { return remoteAddress; } diff --git a/client/src/main/java/org/asynchttpclient/resolver/DefaultDomainNameResolver.java b/client/src/main/java/org/asynchttpclient/resolver/DefaultDomainNameResolver.java new file mode 100644 index 0000000000..7afa36d682 --- /dev/null +++ b/client/src/main/java/org/asynchttpclient/resolver/DefaultDomainNameResolver.java @@ -0,0 +1,39 @@ +/* + * Copyright (c) 2015 AsyncHttpClient Project. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at + * http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. + */ +package org.asynchttpclient.resolver; + +import io.netty.channel.unix.DomainSocketAddress; +import io.netty.resolver.SimpleNameResolver; +import io.netty.util.concurrent.EventExecutor; +import io.netty.util.concurrent.Promise; +import io.netty.util.internal.SocketUtils; + +import java.util.Arrays; +import java.util.List; + +public class DefaultDomainNameResolver extends SimpleNameResolver { + public DefaultDomainNameResolver(EventExecutor executor) { + super(executor); + } + + @Override + protected void doResolve(String path, Promise promise) throws Exception { + promise.setSuccess(new DomainSocketAddress(path)); + } + + @Override + protected void doResolveAll(String path, Promise> promise) throws Exception { + promise.setSuccess(Arrays.asList(new DomainSocketAddress(path))); + } +} diff --git a/client/src/main/java/org/asynchttpclient/resolver/RequestHostnameResolver.java b/client/src/main/java/org/asynchttpclient/resolver/RequestHostnameResolver.java index da42fcf660..a54b40a682 100644 --- a/client/src/main/java/org/asynchttpclient/resolver/RequestHostnameResolver.java +++ b/client/src/main/java/org/asynchttpclient/resolver/RequestHostnameResolver.java @@ -13,6 +13,7 @@ */ package org.asynchttpclient.resolver; +import io.netty.channel.unix.DomainSocketAddress; import io.netty.resolver.NameResolver; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.ImmediateEventExecutor; @@ -33,6 +34,50 @@ public enum RequestHostnameResolver { private static final Logger LOGGER = LoggerFactory.getLogger(RequestHostnameResolver.class); + public Future> resolve(NameResolver nameResolver, DomainSocketAddress address, AsyncHandler asyncHandler) { + + final Promise> promise = ImmediateEventExecutor.INSTANCE.newPromise(); + + try { + asyncHandler.onHostnameResolutionAttempt(address.path()); + } catch (Exception e) { + LOGGER.error("onHostnameResolutionAttempt crashed", e); + promise.tryFailure(e); + return promise; + } + + final Future> whenResolved = nameResolver.resolveAll(address.path()); + + whenResolved.addListener(new SimpleFutureListener>() { + + @Override + protected void onSuccess(List socketAddresses) { + try { + asyncHandler.onHostnameResolutionSuccess(address.path(), socketAddresses); + } catch (Exception e) { + LOGGER.error("onHostnameResolutionSuccess crashed", e); + promise.tryFailure(e); + return; + } + promise.trySuccess(socketAddresses); + } + + @Override + protected void onFailure(Throwable t) { + try { + asyncHandler.onHostnameResolutionFailure(address.path(), t); + } catch (Exception e) { + LOGGER.error("onHostnameResolutionFailure crashed", e); + promise.tryFailure(e); + return; + } + promise.tryFailure(t); + } + }); + + return promise; + } + public Future> resolve(NameResolver nameResolver, InetSocketAddress unresolvedAddress, AsyncHandler asyncHandler) { final String hostname = unresolvedAddress.getHostString(); diff --git a/client/src/test/java/org/asynchttpclient/test/EventCollectingHandler.java b/client/src/test/java/org/asynchttpclient/test/EventCollectingHandler.java index 8047c5f843..a3bb6145ea 100644 --- a/client/src/test/java/org/asynchttpclient/test/EventCollectingHandler.java +++ b/client/src/test/java/org/asynchttpclient/test/EventCollectingHandler.java @@ -22,6 +22,7 @@ import javax.net.ssl.SSLSession; import java.net.InetSocketAddress; +import java.net.SocketAddress; import java.util.List; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; @@ -94,17 +95,17 @@ public State onContentWritten() { } @Override - public void onTcpConnectAttempt(InetSocketAddress address) { + public void onTcpConnectAttempt(SocketAddress address) { firedEvents.add(CONNECTION_OPEN_EVENT); } @Override - public void onTcpConnectSuccess(InetSocketAddress address, Channel connection) { + public void onTcpConnectSuccess(SocketAddress address, Channel connection) { firedEvents.add(CONNECTION_SUCCESS_EVENT); } @Override - public void onTcpConnectFailure(InetSocketAddress address, Throwable t) { + public void onTcpConnectFailure(SocketAddress address, Throwable t) { firedEvents.add(CONNECTION_FAILURE_EVENT); } @@ -114,7 +115,7 @@ public void onHostnameResolutionAttempt(String name) { } @Override - public void onHostnameResolutionSuccess(String name, List addresses) { + public void onHostnameResolutionSuccess(String name, List addresses) { firedEvents.add(HOSTNAME_RESOLUTION_SUCCESS_EVENT); } diff --git a/example/src/main/java/org/asynchttpclient/example/completable/CompletableFutures.java b/example/src/main/java/org/asynchttpclient/example/completable/CompletableFutures.java index f8a5eb1c0b..bbb5df2917 100644 --- a/example/src/main/java/org/asynchttpclient/example/completable/CompletableFutures.java +++ b/example/src/main/java/org/asynchttpclient/example/completable/CompletableFutures.java @@ -17,11 +17,13 @@ package org.asynchttpclient.example.completable; import org.asynchttpclient.AsyncHttpClient; +import org.asynchttpclient.DefaultAsyncHttpClientConfig; import org.asynchttpclient.Response; import java.io.IOException; import static org.asynchttpclient.Dsl.asyncHttpClient; +import static org.asynchttpclient.Dsl.config; public class CompletableFutures { public static void main(String[] args) throws IOException { @@ -34,5 +36,34 @@ public static void main(String[] args) throws IOException { .thenAccept(System.out::println) .join(); } + if (!isWindows()) { + // support unix domain socket + DefaultAsyncHttpClientConfig.Builder config = config(); + config.setUnixSocket("/root/server.socket"); // when the unixSocket is set, the useNativeTransport will be true. + try (AsyncHttpClient asyncHttpClient = asyncHttpClient(config)) { + asyncHttpClient + .prepareGet("http://www.example.com/") + .execute() + .toCompletableFuture() + .thenApply(Response::getResponseBody) + .thenAccept(System.out::println) + .join(); + } + // support unix domain socket + try (AsyncHttpClient asyncHttpClient = asyncHttpClient(config)) { + asyncHttpClient + .prepareGet("/hello/unix") // will add http:127.0.0.1:80 + .execute() + .toCompletableFuture() + .thenApply(Response::getResponseBody) + .thenAccept(System.out::println) + .join(); + } + } + + } + + private static boolean isWindows(){ + return System.getProperty("os.name").toUpperCase().contains("WINDOW"); } } diff --git a/extras/rxjava2/src/main/java/org/asynchttpclient/extras/rxjava2/maybe/AbstractMaybeAsyncHandlerBridge.java b/extras/rxjava2/src/main/java/org/asynchttpclient/extras/rxjava2/maybe/AbstractMaybeAsyncHandlerBridge.java index 6a5f8dca7a..c174a6ebbb 100644 --- a/extras/rxjava2/src/main/java/org/asynchttpclient/extras/rxjava2/maybe/AbstractMaybeAsyncHandlerBridge.java +++ b/extras/rxjava2/src/main/java/org/asynchttpclient/extras/rxjava2/maybe/AbstractMaybeAsyncHandlerBridge.java @@ -28,6 +28,7 @@ import javax.net.ssl.SSLSession; import java.net.InetSocketAddress; +import java.net.SocketAddress; import java.util.Arrays; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; @@ -155,7 +156,7 @@ public void onHostnameResolutionAttempt(String name) { } @Override - public void onHostnameResolutionSuccess(String name, List addresses) { + public void onHostnameResolutionSuccess(String name, List addresses) { executeUnlessEmitterDisposed(() -> delegate().onHostnameResolutionSuccess(name, addresses)); } @@ -165,17 +166,17 @@ public void onHostnameResolutionFailure(String name, Throwable cause) { } @Override - public void onTcpConnectAttempt(InetSocketAddress remoteAddress) { + public void onTcpConnectAttempt(SocketAddress remoteAddress) { executeUnlessEmitterDisposed(() -> delegate().onTcpConnectAttempt(remoteAddress)); } @Override - public void onTcpConnectSuccess(InetSocketAddress remoteAddress, Channel connection) { + public void onTcpConnectSuccess(SocketAddress remoteAddress, Channel connection) { executeUnlessEmitterDisposed(() -> delegate().onTcpConnectSuccess(remoteAddress, connection)); } @Override - public void onTcpConnectFailure(InetSocketAddress remoteAddress, Throwable cause) { + public void onTcpConnectFailure(SocketAddress remoteAddress, Throwable cause) { executeUnlessEmitterDisposed(() -> delegate().onTcpConnectFailure(remoteAddress, cause)); } diff --git a/extras/typesafeconfig/src/main/java/org/asynchttpclient/extras/typesafeconfig/AsyncHttpClientTypesafeConfig.java b/extras/typesafeconfig/src/main/java/org/asynchttpclient/extras/typesafeconfig/AsyncHttpClientTypesafeConfig.java index 55c88ab251..92ac848441 100644 --- a/extras/typesafeconfig/src/main/java/org/asynchttpclient/extras/typesafeconfig/AsyncHttpClientTypesafeConfig.java +++ b/extras/typesafeconfig/src/main/java/org/asynchttpclient/extras/typesafeconfig/AsyncHttpClientTypesafeConfig.java @@ -309,6 +309,11 @@ public boolean isUseNativeTransport() { return getBooleanOpt(USE_NATIVE_TRANSPORT_CONFIG).orElse(defaultUseNativeTransport()); } + @Override + public String getUnixSocket() { + return getStringOpt(UNIX_SOCKET).orElse(defaultUnixSocket()); + } + @Override public Consumer getHttpAdditionalChannelInitializer() { return null; From 56da156a396706e8c56fb3858b2a2dfc7960e3d3 Mon Sep 17 00:00:00 2001 From: mask <1603849451@qq.com> Date: Mon, 6 Jan 2020 23:21:35 +0800 Subject: [PATCH 2/2] the unix domain socket test success --- .../java/org/asynchttpclient/AsyncHandler.java | 1 - .../src/main/java/org/asynchttpclient/Request.java | 2 +- .../org/asynchttpclient/RequestBuilderBase.java | 11 ++++++----- .../channel/DefaultKeepAliveStrategy.java | 1 - .../asynchttpclient/channel/KeepAliveStrategy.java | 3 +-- .../netty/channel/NettyChannelConnector.java | 2 -- .../netty/channel/NettyConnectListener.java | 1 - .../asynchttpclient/netty/handler/HttpHandler.java | 1 - .../handler/intercept/Redirect30xInterceptor.java | 6 +----- .../netty/request/NettyRequestSender.java | 14 +++++++++----- .../netty/timeout/TimeoutsHolder.java | 1 - .../test/EventCollectingHandler.java | 1 - .../example/completable/CompletableFutures.java | 3 --- .../maybe/AbstractMaybeAsyncHandlerBridge.java | 1 - 14 files changed, 18 insertions(+), 30 deletions(-) diff --git a/client/src/main/java/org/asynchttpclient/AsyncHandler.java b/client/src/main/java/org/asynchttpclient/AsyncHandler.java index 8cec31e40d..f17fd9275c 100644 --- a/client/src/main/java/org/asynchttpclient/AsyncHandler.java +++ b/client/src/main/java/org/asynchttpclient/AsyncHandler.java @@ -20,7 +20,6 @@ import org.asynchttpclient.netty.request.NettyRequest; import javax.net.ssl.SSLSession; -import java.net.InetSocketAddress; import java.net.SocketAddress; import java.util.List; diff --git a/client/src/main/java/org/asynchttpclient/Request.java b/client/src/main/java/org/asynchttpclient/Request.java index d608c8f9c5..10d9c9a0f4 100644 --- a/client/src/main/java/org/asynchttpclient/Request.java +++ b/client/src/main/java/org/asynchttpclient/Request.java @@ -64,7 +64,7 @@ public interface Request { String getUrl(); /** - * @return the InetAddress to be used to bypass uri's hostname resolution + * @return the SocketAddress to be used to bypass uri's hostname or unix domain path resolution */ SocketAddress getAddress(); diff --git a/client/src/main/java/org/asynchttpclient/RequestBuilderBase.java b/client/src/main/java/org/asynchttpclient/RequestBuilderBase.java index 95079c5349..06921420ca 100644 --- a/client/src/main/java/org/asynchttpclient/RequestBuilderBase.java +++ b/client/src/main/java/org/asynchttpclient/RequestBuilderBase.java @@ -167,16 +167,17 @@ public T setAddress(InetAddress address) { return asDerivedType(); } - public T setLocalAddress(InetAddress address) { - this.localAddress = new InetSocketAddress(address,0); + public T setAddress(SocketAddress address) { + this.address = address; return asDerivedType(); } - public T setDomainAddress(DomainSocketAddress address) { - this.address = address; + + public T setLocalAddress(InetAddress address) { + this.localAddress = new InetSocketAddress(address,0); return asDerivedType(); } - public T setDomainLocalAddress(DomainSocketAddress address) { + public T setLocalAddress(SocketAddress address) { this.localAddress = address; return asDerivedType(); } diff --git a/client/src/main/java/org/asynchttpclient/channel/DefaultKeepAliveStrategy.java b/client/src/main/java/org/asynchttpclient/channel/DefaultKeepAliveStrategy.java index a1f984c474..e90f0ce76f 100644 --- a/client/src/main/java/org/asynchttpclient/channel/DefaultKeepAliveStrategy.java +++ b/client/src/main/java/org/asynchttpclient/channel/DefaultKeepAliveStrategy.java @@ -5,7 +5,6 @@ import io.netty.handler.codec.http.HttpUtil; import org.asynchttpclient.Request; -import java.net.InetSocketAddress; import java.net.SocketAddress; import static io.netty.handler.codec.http.HttpHeaderValues.CLOSE; diff --git a/client/src/main/java/org/asynchttpclient/channel/KeepAliveStrategy.java b/client/src/main/java/org/asynchttpclient/channel/KeepAliveStrategy.java index 3da3f4c611..002a8aaef7 100644 --- a/client/src/main/java/org/asynchttpclient/channel/KeepAliveStrategy.java +++ b/client/src/main/java/org/asynchttpclient/channel/KeepAliveStrategy.java @@ -17,7 +17,6 @@ import io.netty.handler.codec.http.HttpResponse; import org.asynchttpclient.Request; -import java.net.InetSocketAddress; import java.net.SocketAddress; public interface KeepAliveStrategy { @@ -25,7 +24,7 @@ public interface KeepAliveStrategy { /** * Determines whether the connection should be kept alive after this HTTP message exchange. * - * @param remoteAddress the remote InetSocketAddress associated with the request + * @param remoteAddress the remote SocketAddress associated with the request * @param ahcRequest the Request, as built by AHC * @param nettyRequest the HTTP request sent to Netty * @param nettyResponse the HTTP response received from Netty diff --git a/client/src/main/java/org/asynchttpclient/netty/channel/NettyChannelConnector.java b/client/src/main/java/org/asynchttpclient/netty/channel/NettyChannelConnector.java index 04b36d7fc9..92a5b31012 100644 --- a/client/src/main/java/org/asynchttpclient/netty/channel/NettyChannelConnector.java +++ b/client/src/main/java/org/asynchttpclient/netty/channel/NettyChannelConnector.java @@ -20,8 +20,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.net.InetAddress; -import java.net.InetSocketAddress; import java.net.SocketAddress; import java.util.List; import java.util.concurrent.RejectedExecutionException; diff --git a/client/src/main/java/org/asynchttpclient/netty/channel/NettyConnectListener.java b/client/src/main/java/org/asynchttpclient/netty/channel/NettyConnectListener.java index a01e5fbcca..997b1c1b9b 100755 --- a/client/src/main/java/org/asynchttpclient/netty/channel/NettyConnectListener.java +++ b/client/src/main/java/org/asynchttpclient/netty/channel/NettyConnectListener.java @@ -29,7 +29,6 @@ import org.slf4j.LoggerFactory; import java.net.ConnectException; -import java.net.InetSocketAddress; import java.net.SocketAddress; /** diff --git a/client/src/main/java/org/asynchttpclient/netty/handler/HttpHandler.java b/client/src/main/java/org/asynchttpclient/netty/handler/HttpHandler.java index 2393aca855..db9fb7cc24 100755 --- a/client/src/main/java/org/asynchttpclient/netty/handler/HttpHandler.java +++ b/client/src/main/java/org/asynchttpclient/netty/handler/HttpHandler.java @@ -30,7 +30,6 @@ import org.asynchttpclient.netty.request.NettyRequestSender; import java.io.IOException; -import java.net.InetSocketAddress; @Sharable public final class HttpHandler extends AsyncHttpClientHandler { diff --git a/client/src/main/java/org/asynchttpclient/netty/handler/intercept/Redirect30xInterceptor.java b/client/src/main/java/org/asynchttpclient/netty/handler/intercept/Redirect30xInterceptor.java index 56cfd293d8..1253f8722a 100644 --- a/client/src/main/java/org/asynchttpclient/netty/handler/intercept/Redirect30xInterceptor.java +++ b/client/src/main/java/org/asynchttpclient/netty/handler/intercept/Redirect30xInterceptor.java @@ -32,9 +32,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.SocketAddress; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -93,11 +90,10 @@ public boolean exitAfterHandlingRedirect(Channel channel, && (statusCode == MOVED_PERMANENTLY_301 || statusCode == SEE_OTHER_303 || (statusCode == FOUND_302 && !config.isStrict302Handling())); boolean keepBody = statusCode == TEMPORARY_REDIRECT_307 || statusCode == PERMANENT_REDIRECT_308 || (statusCode == FOUND_302 && config.isStrict302Handling()); - InetSocketAddress localAddress = (InetSocketAddress) request.getLocalAddress(); final RequestBuilder requestBuilder = new RequestBuilder(switchToGet ? GET : originalMethod) .setChannelPoolPartitioning(request.getChannelPoolPartitioning()) .setFollowRedirect(true) - .setLocalAddress(localAddress.getAddress()) + .setLocalAddress(request.getLocalAddress()) .setNameResolver(request.getNameResolver()) .setDomainNameResolver(request.getDomainNameResolver()) .setProxyServer(request.getProxyServer()) diff --git a/client/src/main/java/org/asynchttpclient/netty/request/NettyRequestSender.java b/client/src/main/java/org/asynchttpclient/netty/request/NettyRequestSender.java index 20c5a0acdb..f7cb2fcaaf 100755 --- a/client/src/main/java/org/asynchttpclient/netty/request/NettyRequestSender.java +++ b/client/src/main/java/org/asynchttpclient/netty/request/NettyRequestSender.java @@ -47,6 +47,7 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.net.SocketAddress; +import java.util.Arrays; import java.util.List; import static io.netty.handler.codec.http.HttpHeaderNames.EXPECT; @@ -240,7 +241,7 @@ private ListenableFuture sendRequestWithOpenChannel(NettyResponseFuture Future> resolveDomainAddresses(Request req if (proxy != null ) { throw new IllegalArgumentException("Unix domain socket not support proxy"); } else { - DomainSocketAddress socketAddress = new DomainSocketAddress(config.getUnixSocket()); scheduleRequestTimeout(future, socketAddress); - - if (request.getAddress() != null) { - throw new IllegalArgumentException("Unix domain socket not support set address !"); + SocketAddress address = request.getAddress(); + if (address != null) { + final Promise> promise = ImmediateEventExecutor.INSTANCE.newPromise(); + if (!(address instanceof DomainSocketAddress)){ + throw new IllegalArgumentException("address must be instance of DomainSocketAddress"); + } + return promise.setSuccess(singletonList((DomainSocketAddress) address)); } else { return RequestHostnameResolver.INSTANCE.resolve(request.getDomainNameResolver(), socketAddress, asyncHandler); } diff --git a/client/src/main/java/org/asynchttpclient/netty/timeout/TimeoutsHolder.java b/client/src/main/java/org/asynchttpclient/netty/timeout/TimeoutsHolder.java index 2d41bf57c9..06c2607450 100755 --- a/client/src/main/java/org/asynchttpclient/netty/timeout/TimeoutsHolder.java +++ b/client/src/main/java/org/asynchttpclient/netty/timeout/TimeoutsHolder.java @@ -21,7 +21,6 @@ import org.asynchttpclient.netty.NettyResponseFuture; import org.asynchttpclient.netty.request.NettyRequestSender; -import java.net.InetSocketAddress; import java.net.SocketAddress; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; diff --git a/client/src/test/java/org/asynchttpclient/test/EventCollectingHandler.java b/client/src/test/java/org/asynchttpclient/test/EventCollectingHandler.java index a3bb6145ea..9a88ae513d 100644 --- a/client/src/test/java/org/asynchttpclient/test/EventCollectingHandler.java +++ b/client/src/test/java/org/asynchttpclient/test/EventCollectingHandler.java @@ -21,7 +21,6 @@ import org.testng.Assert; import javax.net.ssl.SSLSession; -import java.net.InetSocketAddress; import java.net.SocketAddress; import java.util.List; import java.util.Queue; diff --git a/example/src/main/java/org/asynchttpclient/example/completable/CompletableFutures.java b/example/src/main/java/org/asynchttpclient/example/completable/CompletableFutures.java index bbb5df2917..f36878143a 100644 --- a/example/src/main/java/org/asynchttpclient/example/completable/CompletableFutures.java +++ b/example/src/main/java/org/asynchttpclient/example/completable/CompletableFutures.java @@ -48,9 +48,6 @@ public static void main(String[] args) throws IOException { .thenApply(Response::getResponseBody) .thenAccept(System.out::println) .join(); - } - // support unix domain socket - try (AsyncHttpClient asyncHttpClient = asyncHttpClient(config)) { asyncHttpClient .prepareGet("/hello/unix") // will add http:127.0.0.1:80 .execute() diff --git a/extras/rxjava2/src/main/java/org/asynchttpclient/extras/rxjava2/maybe/AbstractMaybeAsyncHandlerBridge.java b/extras/rxjava2/src/main/java/org/asynchttpclient/extras/rxjava2/maybe/AbstractMaybeAsyncHandlerBridge.java index c174a6ebbb..528e1f4123 100644 --- a/extras/rxjava2/src/main/java/org/asynchttpclient/extras/rxjava2/maybe/AbstractMaybeAsyncHandlerBridge.java +++ b/extras/rxjava2/src/main/java/org/asynchttpclient/extras/rxjava2/maybe/AbstractMaybeAsyncHandlerBridge.java @@ -27,7 +27,6 @@ import org.slf4j.LoggerFactory; import javax.net.ssl.SSLSession; -import java.net.InetSocketAddress; import java.net.SocketAddress; import java.util.Arrays; import java.util.List;