Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implement the unix domain socket #1688

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions client/src/main/java/org/asynchttpclient/AsyncHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import org.asynchttpclient.netty.request.NettyRequest;

import javax.net.ssl.SSLSession;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.List;


Expand Down Expand Up @@ -132,7 +132,7 @@ default void onHostnameResolutionAttempt(String name) {
* @param name the name to be resolved
* @param addresses the resolved addresses
*/
default void onHostnameResolutionSuccess(String name, List<InetSocketAddress> addresses) {
default void onHostnameResolutionSuccess(String name, List<? extends SocketAddress> addresses) {
}

/**
Expand All @@ -153,7 +153,7 @@ default void onHostnameResolutionFailure(String name, Throwable cause) {
*
* @param remoteAddress the address we try to connect to
*/
default void onTcpConnectAttempt(InetSocketAddress remoteAddress) {
default void onTcpConnectAttempt(SocketAddress remoteAddress) {
}

/**
Expand All @@ -162,7 +162,7 @@ default void onTcpConnectAttempt(InetSocketAddress remoteAddress) {
* @param remoteAddress the address we try to connect to
* @param connection the connection
*/
default void onTcpConnectSuccess(InetSocketAddress remoteAddress, Channel connection) {
default void onTcpConnectSuccess(SocketAddress remoteAddress, Channel connection) {
}

/**
Expand All @@ -173,7 +173,7 @@ default void onTcpConnectSuccess(InetSocketAddress remoteAddress, Channel connec
* @param remoteAddress the address we try to connect to
* @param cause the cause of the failure
*/
default void onTcpConnectFailure(InetSocketAddress remoteAddress, Throwable cause) {
default void onTcpConnectFailure(SocketAddress remoteAddress, Throwable cause) {
}

// ////////////// TLS ///////////////
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,13 @@ public interface AsyncHttpClientConfig {

boolean isUseNativeTransport();

String getUnixSocket();

default boolean isUseUnixDomain(){
String unixSocket = getUnixSocket();
return unixSocket !=null && !unixSocket.isEmpty();
}

Consumer<Channel> getHttpAdditionalChannelInitializer();

Consumer<Channel> getWsAdditionalChannelInitializer();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ public class DefaultAsyncHttpClientConfig implements AsyncHttpClientConfig {
private final Map<ChannelOption<Object>, Object> channelOptions;
private final EventLoopGroup eventLoopGroup;
private final boolean useNativeTransport;
private final String unixSocket;
private final ByteBufAllocator allocator;
private final boolean tcpNoDelay;
private final boolean soReuseAddress;
Expand Down Expand Up @@ -209,6 +210,7 @@ private DefaultAsyncHttpClientConfig(// http
Map<ChannelOption<Object>, Object> channelOptions,
EventLoopGroup eventLoopGroup,
boolean useNativeTransport,
String unixSocket,
ByteBufAllocator allocator,
Timer nettyTimer,
ThreadFactory threadFactory,
Expand Down Expand Up @@ -295,6 +297,7 @@ private DefaultAsyncHttpClientConfig(// http
this.channelOptions = channelOptions;
this.eventLoopGroup = eventLoopGroup;
this.useNativeTransport = useNativeTransport;
this.unixSocket = unixSocket;
this.allocator = allocator;
this.nettyTimer = nettyTimer;
this.threadFactory = threadFactory;
Expand Down Expand Up @@ -621,6 +624,11 @@ public boolean isUseNativeTransport() {
return useNativeTransport;
}

@Override
public String getUnixSocket() {
return unixSocket;
}

@Override
public ByteBufAllocator getAllocator() {
return allocator;
Expand Down Expand Up @@ -738,6 +746,7 @@ public static class Builder {
private int httpClientCodecInitialBufferSize = defaultHttpClientCodecInitialBufferSize();
private int chunkedFileChunkSize = defaultChunkedFileChunkSize();
private boolean useNativeTransport = defaultUseNativeTransport();
private String unixSocket = defaultUnixSocket();
private ByteBufAllocator allocator;
private Map<ChannelOption<Object>, Object> channelOptions = new HashMap<>();
private EventLoopGroup eventLoopGroup;
Expand Down Expand Up @@ -821,6 +830,7 @@ public Builder(AsyncHttpClientConfig config) {
channelOptions.putAll(config.getChannelOptions());
eventLoopGroup = config.getEventLoopGroup();
useNativeTransport = config.isUseNativeTransport();
unixSocket = config.getUnixSocket();
allocator = config.getAllocator();
nettyTimer = config.getNettyTimer();
threadFactory = config.getThreadFactory();
Expand Down Expand Up @@ -1189,6 +1199,12 @@ public Builder setUseNativeTransport(boolean useNativeTransport) {
return this;
}

public Builder setUnixSocket(String unixSocket) {
setUseNativeTransport(true);
this.unixSocket = unixSocket;
return this;
}

public Builder setAllocator(ByteBufAllocator allocator) {
this.allocator = allocator;
return this;
Expand Down Expand Up @@ -1301,6 +1317,7 @@ public DefaultAsyncHttpClientConfig build() {
channelOptions.isEmpty() ? Collections.emptyMap() : Collections.unmodifiableMap(channelOptions),
eventLoopGroup,
useNativeTransport,
unixSocket,
allocator,
nettyTimer,
threadFactory,
Expand Down
39 changes: 33 additions & 6 deletions client/src/main/java/org/asynchttpclient/DefaultRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package org.asynchttpclient;

import io.netty.channel.unix.DomainSocketAddress;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.cookie.Cookie;
import io.netty.resolver.NameResolver;
Expand All @@ -25,6 +26,8 @@
import java.io.File;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.ArrayList;
Expand All @@ -39,8 +42,8 @@ public class DefaultRequest implements Request {
public final ProxyServer proxyServer;
private final String method;
private final Uri uri;
private final InetAddress address;
private final InetAddress localAddress;
private final SocketAddress address;
private final SocketAddress localAddress;
private final HttpHeaders headers;
private final List<Cookie> cookies;
private final byte[] byteData;
Expand All @@ -61,13 +64,14 @@ public class DefaultRequest implements Request {
private final Charset charset;
private final ChannelPoolPartitioning channelPoolPartitioning;
private final NameResolver<InetAddress> nameResolver;
private final NameResolver<DomainSocketAddress> domainNameResolver;
// lazily loaded
private List<Param> queryParams;

public DefaultRequest(String method,
Uri uri,
InetAddress address,
InetAddress localAddress,
SocketAddress address,
SocketAddress localAddress,
HttpHeaders headers,
List<Cookie> cookies,
byte[] byteData,
Expand All @@ -88,7 +92,8 @@ public DefaultRequest(String method,
long rangeOffset,
Charset charset,
ChannelPoolPartitioning channelPoolPartitioning,
NameResolver<InetAddress> nameResolver) {
NameResolver<InetAddress> nameResolver,
NameResolver<DomainSocketAddress> domainNameResolver) {
this.method = method;
this.uri = uri;
this.address = address;
Expand All @@ -114,6 +119,7 @@ public DefaultRequest(String method,
this.charset = charset;
this.channelPoolPartitioning = channelPoolPartitioning;
this.nameResolver = nameResolver;
this.domainNameResolver = domainNameResolver;
}

@Override
Expand All @@ -133,11 +139,27 @@ public Uri getUri() {

@Override
public InetAddress getAddress() {
return address;
if (!(address instanceof InetSocketAddress)) {
throw new IllegalArgumentException("address can't cast to InetAddress, please use the method of getSocketAddress");
}
return ((InetSocketAddress) address).getAddress();
}

@Override
public InetAddress getLocalAddress() {
if (!(localAddress instanceof InetSocketAddress)) {
throw new IllegalArgumentException("localAddress can't cast to InetAddress, please use the method of getLocalSocketAddress");
}
return ((InetSocketAddress) localAddress).getAddress();
}

@Override
public SocketAddress getSocketAddress() {
return address;
}

@Override
public SocketAddress getLocalSocketAddress() {
return localAddress;
}

Expand Down Expand Up @@ -246,6 +268,11 @@ public NameResolver<InetAddress> getNameResolver() {
return nameResolver;
}

@Override
public NameResolver<DomainSocketAddress> getDomainNameResolver() {
return domainNameResolver;
}

@Override
public List<Param> getQueryParams() {
if (queryParams == null)
Expand Down
19 changes: 18 additions & 1 deletion client/src/main/java/org/asynchttpclient/Request.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.asynchttpclient;

import io.netty.channel.unix.DomainSocketAddress;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.cookie.Cookie;
import io.netty.resolver.NameResolver;
Expand All @@ -28,6 +29,7 @@
import java.io.File;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.List;
Expand Down Expand Up @@ -62,7 +64,7 @@ public interface Request {
String getUrl();

/**
* @return the InetAddress to be used to bypass uri's hostname resolution
* @return the InetAddress to be used to bypass uri's hostname or unix domain path resolution
*/
InetAddress getAddress();

Expand All @@ -71,6 +73,16 @@ public interface Request {
*/
InetAddress getLocalAddress();

/**
* @return the local address to bind from
*/
SocketAddress getLocalSocketAddress();

/**
* @return the SocketAddress to be used to bypass uri's hostname or unix domain path resolution
*/
SocketAddress getSocketAddress();

/**
* @return the HTTP headers
*/
Expand Down Expand Up @@ -181,6 +193,11 @@ public interface Request {
*/
NameResolver<InetAddress> getNameResolver();

/**
* @return the NameResolver to be used to resolve hostnams's IP
*/
NameResolver<DomainSocketAddress> getDomainNameResolver();

/**
* @return a new request builder using this request as a prototype
*/
Expand Down
Loading