Skip to content

Commit

Permalink
Code optimization
Browse files Browse the repository at this point in the history
  • Loading branch information
silence-coding committed Jan 18, 2020
2 parents 56da156 + 677c43f commit 6408219
Show file tree
Hide file tree
Showing 11 changed files with 133 additions and 104 deletions.
21 changes: 19 additions & 2 deletions client/src/main/java/org/asynchttpclient/DefaultRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.io.File;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
Expand Down Expand Up @@ -137,12 +138,28 @@ public Uri getUri() {
}

@Override
public SocketAddress getAddress() {
public InetAddress getAddress() {
if (!(address instanceof InetSocketAddress)) {
throw new IllegalArgumentException("address can't cast to InetAddress, please use the method of getSocketAddress");
}
return ((InetSocketAddress) address).getAddress();
}

@Override
public InetAddress getLocalAddress() {
if (!(localAddress instanceof InetSocketAddress)) {
throw new IllegalArgumentException("address can't cast to InetAddress, please use the method of getSocketAddress");
}
return ((InetSocketAddress) localAddress).getAddress();
}

@Override
public SocketAddress getSocketAddress() {
return address;
}

@Override
public SocketAddress getLocalAddress() {
public SocketAddress getLocalSocketAddress() {
return localAddress;
}

Expand Down
16 changes: 13 additions & 3 deletions client/src/main/java/org/asynchttpclient/Request.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,24 @@ public interface Request {
String getUrl();

/**
* @return the SocketAddress to be used to bypass uri's hostname or unix domain path resolution
* @return the InetAddress to be used to bypass uri's hostname or unix domain path resolution
*/
InetAddress getAddress();

/**
* @return the local address to bind from
*/
SocketAddress getAddress();
InetAddress getLocalAddress();

/**
* @return the local address to bind from
*/
SocketAddress getLocalAddress();
SocketAddress getLocalSocketAddress();

/**
* @return the SocketAddress to be used to bypass uri's hostname or unix domain path resolution
*/
SocketAddress getSocketAddress();

/**
* @return the HTTP headers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,8 @@ protected RequestBuilderBase(Request prototype, boolean disableUrlEncoding, bool
this.method = prototype.getMethod();
this.uriEncoder = UriEncoder.uriEncoder(disableUrlEncoding);
this.uri = prototype.getUri();
this.address = prototype.getAddress();
this.localAddress = prototype.getLocalAddress();
this.address = prototype.getSocketAddress();
this.localAddress = prototype.getLocalSocketAddress();
this.headers = new DefaultHttpHeaders(validateHeaders);
this.headers.add(prototype.getHeaders());
if (isNonEmpty(prototype.getCookies())) {
Expand Down Expand Up @@ -151,9 +151,6 @@ private T asDerivedType() {
}

public T setUrl(String url) {
if (!url.contains("://")){
url = "http://127.0.0.1:80" + url;
}
return setUri(Uri.create(url));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
import io.netty.resolver.NameResolver;
import io.netty.util.Timer;
import io.netty.util.concurrent.*;
import io.netty.util.internal.StringUtil;
import io.netty.util.internal.PlatformDependent;
import org.asynchttpclient.*;
import org.asynchttpclient.channel.ChannelPool;
import org.asynchttpclient.channel.ChannelPoolPartitioning;
Expand All @@ -52,7 +52,6 @@
import org.asynchttpclient.netty.ssl.DefaultSslEngineFactory;
import org.asynchttpclient.proxy.ProxyServer;
import org.asynchttpclient.uri.Uri;
import org.asynchttpclient.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -126,11 +125,7 @@ public ChannelManager(final AsyncHttpClientConfig config, Timer nettyTimer) {
TransportFactory<? extends Channel, ? extends EventLoopGroup> transportFactory;
if (allowReleaseEventLoopGroup) {
if (config.isUseNativeTransport()) {
if (config.isUseUnixDomain()){
transportFactory = getDomainTransportFactory();
}else {
transportFactory = getNativeTransportFactory();
}
transportFactory = config.isUseUnixDomain() ? getDomainTransportFactory() : getNativeTransportFactory();
} else {
transportFactory = NioTransportFactory.INSTANCE;
}
Expand All @@ -145,17 +140,9 @@ public ChannelManager(final AsyncHttpClientConfig config, Timer nettyTimer) {
}
transportFactory = NioTransportFactory.INSTANCE;
} else if (eventLoopGroup instanceof EpollEventLoopGroup) {
if (config.isUseUnixDomain()){
transportFactory = new EpollDomainTransportFactory();
}else {
transportFactory = new EpollTransportFactory();
}
transportFactory = config.isUseUnixDomain() ? new EpollDomainTransportFactory() : new EpollTransportFactory();
} else if (eventLoopGroup instanceof KQueueEventLoopGroup) {
if (config.isUseUnixDomain()){
transportFactory = new KQueueDomainTransportFactory();
}else {
transportFactory = new KQueueTransportFactory();
}
transportFactory = config.isUseUnixDomain()? new KQueueDomainTransportFactory():new KQueueTransportFactory();
} else {
throw new IllegalArgumentException("Unknown event loop group " + eventLoopGroup.getClass().getSimpleName());
}
Expand Down Expand Up @@ -205,27 +192,33 @@ private Bootstrap newBootstrap(ChannelFactory<? extends Channel> channelFactory,

@SuppressWarnings("unchecked")
private TransportFactory<? extends Channel, ? extends EventLoopGroup> getNativeTransportFactory() {
try {
return (TransportFactory<? extends Channel, ? extends EventLoopGroup>) Class.forName("org.asynchttpclient.netty.channel.EpollTransportFactory").newInstance();
} catch (Exception e) {
try {
return (TransportFactory<? extends Channel, ? extends EventLoopGroup>) Class.forName("org.asynchttpclient.netty.channel.KQueueTransportFactory").newInstance();
} catch (Exception e1) {
throw new IllegalArgumentException("No suitable native transport (epoll or kqueue) available");
}
String nativeTransportFactoryClassName = null;
if (PlatformDependent.isOsx()) {
nativeTransportFactoryClassName = "org.asynchttpclient.netty.channel.KQueueTransportFactory";
} else if (!PlatformDependent.isWindows()) {
nativeTransportFactoryClassName = "org.asynchttpclient.netty.channel.EpollTransportFactory";
}
return loadNativeTransportFactory(nativeTransportFactoryClassName);
}

private TransportFactory<? extends Channel, ? extends EventLoopGroup> getDomainTransportFactory() {
private TransportFactory<? extends Channel, ? extends EventLoopGroup> loadNativeTransportFactory(String nativeTransportFactoryClassName) {
try {
return (TransportFactory<? extends Channel, ? extends EventLoopGroup>) Class.forName("org.asynchttpclient.netty.channel.EpollDomainTransportFactory").newInstance();
} catch (Exception e) {
try {
return (TransportFactory<? extends Channel, ? extends EventLoopGroup>) Class.forName("org.asynchttpclient.netty.channel.KQueueDomainTransportFactory").newInstance();
} catch (Exception e1) {
throw new IllegalArgumentException("No suitable native transport (epoll or kqueue) available");
if (nativeTransportFactoryClassName != null) {
return (TransportFactory<? extends Channel, ? extends EventLoopGroup>) Class.forName(nativeTransportFactoryClassName).newInstance();
}
} catch (Exception e) {
e.printStackTrace();
}
throw new IllegalArgumentException("No suitable native transport available");
}
private TransportFactory<? extends Channel, ? extends EventLoopGroup> getDomainTransportFactory() {
String nativeTransportFactoryClassName = null;
if (PlatformDependent.isOsx()) {
nativeTransportFactoryClassName = "org.asynchttpclient.netty.channel.KQueueDomainTransportFactory";
} else if (!PlatformDependent.isWindows()) {
nativeTransportFactoryClassName = "org.asynchttpclient.netty.channel.EpollDomainTransportFactory";
}
return loadNativeTransportFactory(nativeTransportFactoryClassName);
}

public void configureBootstraps(NettyRequestSender requestSender) {
Expand Down Expand Up @@ -382,6 +375,11 @@ public Future<Channel> updatePipelineForHttpTunneling(ChannelPipeline pipeline,

if (requestUri.isWebSocket()) {
pipeline.addAfter(AHC_HTTP_HANDLER, AHC_WS_HANDLER, wsHandler);

if (config.isEnableWebSocketCompression()) {
pipeline.addBefore(AHC_WS_HANDLER, WS_COMPRESSOR_HANDLER, WebSocketClientCompressionHandler.INSTANCE);
}

pipeline.remove(AHC_HTTP_HANDLER);
}
return whenHanshaked;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,23 +13,16 @@
*/
package org.asynchttpclient.netty.channel;

import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollDomainSocketChannel;
import io.netty.channel.epoll.EpollEventLoopGroup;
import org.asynchttpclient.util.ReflectionUtil;

import java.util.concurrent.ThreadFactory;

class EpollDomainTransportFactory implements TransportFactory<EpollDomainSocketChannel, EpollEventLoopGroup> {

EpollDomainTransportFactory() {
try {
Class.forName("io.netty.channel.epoll.Epoll");
} catch (ClassNotFoundException e) {
throw new IllegalStateException("The epoll transport is not available");
}
if (!Epoll.isAvailable()) {
throw new IllegalStateException("The epoll transport is not supported");
}
ReflectionUtil.loadEpollClass();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,23 +13,16 @@
*/
package org.asynchttpclient.netty.channel;

import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollSocketChannel;
import org.asynchttpclient.util.ReflectionUtil;

import java.util.concurrent.ThreadFactory;

class EpollTransportFactory implements TransportFactory<EpollSocketChannel, EpollEventLoopGroup> {

EpollTransportFactory() {
try {
Class.forName("io.netty.channel.epoll.Epoll");
} catch (ClassNotFoundException e) {
throw new IllegalStateException("The epoll transport is not available");
}
if (!Epoll.isAvailable()) {
throw new IllegalStateException("The epoll transport is not supported");
}
ReflectionUtil.loadEpollClass();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,23 +13,16 @@
*/
package org.asynchttpclient.netty.channel;

import io.netty.channel.kqueue.KQueue;
import io.netty.channel.kqueue.KQueueDomainSocketChannel;
import io.netty.channel.kqueue.KQueueEventLoopGroup;
import org.asynchttpclient.util.ReflectionUtil;

import java.util.concurrent.ThreadFactory;

class KQueueDomainTransportFactory implements TransportFactory<KQueueDomainSocketChannel, KQueueEventLoopGroup> {

KQueueDomainTransportFactory() {
try {
Class.forName("io.netty.channel.kqueue.KQueue");
} catch (ClassNotFoundException e) {
throw new IllegalStateException("The kqueue transport is not available");
}
if (!KQueue.isAvailable()) {
throw new IllegalStateException("The kqueue transport is not supported");
}
ReflectionUtil.loadKQueueClass();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,23 +13,16 @@
*/
package org.asynchttpclient.netty.channel;

import io.netty.channel.kqueue.KQueue;
import io.netty.channel.kqueue.KQueueEventLoopGroup;
import io.netty.channel.kqueue.KQueueSocketChannel;
import org.asynchttpclient.util.ReflectionUtil;

import java.util.concurrent.ThreadFactory;

class KQueueTransportFactory implements TransportFactory<KQueueSocketChannel, KQueueEventLoopGroup> {

KQueueTransportFactory() {
try {
Class.forName("io.netty.channel.kqueue.KQueue");
} catch (ClassNotFoundException e) {
throw new IllegalStateException("The kqueue transport is not available");
}
if (!KQueue.isAvailable()) {
throw new IllegalStateException("The kqueue transport is not supported");
}
ReflectionUtil.loadKQueueClass();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ private <T> ListenableFuture<T> sendRequestWithNewChannel(Request request,
protected void onSuccess(List<DomainSocketAddress> addresses) {
NettyConnectListener<T> connectListener = new NettyConnectListener<>(future,
NettyRequestSender.this, channelManager, connectionSemaphore);
NettyChannelConnector connector = new NettyChannelConnector(request.getLocalAddress(),
NettyChannelConnector connector = new NettyChannelConnector(request.getLocalSocketAddress(),
addresses, asyncHandler, clientState);
if (!future.isDone()) {
// Do not throw an exception when we need an extra connection for a redirect
Expand Down Expand Up @@ -335,7 +335,7 @@ protected void onFailure(Throwable cause) {
protected void onSuccess(List<InetSocketAddress> addresses) {
NettyConnectListener<T> connectListener = new NettyConnectListener<>(future,
NettyRequestSender.this, channelManager, connectionSemaphore);
NettyChannelConnector connector = new NettyChannelConnector(request.getLocalAddress(),
NettyChannelConnector connector = new NettyChannelConnector(request.getLocalSocketAddress(),
addresses, asyncHandler, clientState);
if (!future.isDone()) {
// Do not throw an exception when we need an extra connection for a redirect
Expand Down Expand Up @@ -381,9 +381,9 @@ private <T> Future<List<InetSocketAddress>> resolveAddresses(Request request,
InetSocketAddress unresolvedRemoteAddress = InetSocketAddress.createUnresolved(uri.getHost(), port);
scheduleRequestTimeout(future, unresolvedRemoteAddress);

if (request.getAddress() != null) {
if (request.getSocketAddress() != null) {
// bypass resolution
InetSocketAddress address = (InetSocketAddress) request.getAddress();
InetSocketAddress address = (InetSocketAddress) request.getSocketAddress();
if (address.getPort() != port){
address = new InetSocketAddress(address.getAddress(), port);
}
Expand All @@ -403,7 +403,7 @@ private <T> Future<List<DomainSocketAddress>> resolveDomainAddresses(Request req
} else {
DomainSocketAddress socketAddress = new DomainSocketAddress(config.getUnixSocket());
scheduleRequestTimeout(future, socketAddress);
SocketAddress address = request.getAddress();
SocketAddress address = request.getSocketAddress();
if (address != null) {
final Promise<List<DomainSocketAddress>> promise = ImmediateEventExecutor.INSTANCE.newPromise();
if (!(address instanceof DomainSocketAddress)){
Expand Down
41 changes: 41 additions & 0 deletions client/src/main/java/org/asynchttpclient/util/ReflectionUtil.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright (c) 2017 AsyncHttpClient Project. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache License Version 2.0.
* You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the Apache License Version 2.0 is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
*/
package org.asynchttpclient.util;

import io.netty.channel.epoll.Epoll;
import io.netty.channel.kqueue.KQueue;

public abstract class ReflectionUtil {

public static void loadEpollClass() {
try {
Class.forName("io.netty.channel.epoll.Epoll");
} catch (ClassNotFoundException e) {
throw new IllegalStateException("The epoll transport is not available");
}
if (!Epoll.isAvailable()) {
throw new IllegalStateException("The epoll transport is not supported");
}
}

public static void loadKQueueClass() {
try {
Class.forName("io.netty.channel.kqueue.KQueue");
} catch (ClassNotFoundException e) {
throw new IllegalStateException("The kqueue transport is not available");
}
if (!KQueue.isAvailable()) {
throw new IllegalStateException("The kqueue transport is not supported");
}
}
}
Loading

0 comments on commit 6408219

Please sign in to comment.