Skip to content

Commit

Permalink
Unify http channels and exception handling (#31379)
Browse files Browse the repository at this point in the history
This is a general cleanup of channels and exception handling in http.
This commit introduces a CloseableChannel that is a superclass of
TcpChannel and HttpChannel. This allows us to unify the closing logic
between tcp and http transports. Additionally, the normal http channels
are extracted to the abstract server transport.

Finally, this commit (mostly) unifies the exception handling between nio
and netty4 http server transports.
  • Loading branch information
Tim-Brooks authored Jun 19, 2018
1 parent 8fd1f5f commit 529e704
Show file tree
Hide file tree
Showing 28 changed files with 353 additions and 392 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.netty.channel.Channel;
import io.netty.channel.ChannelPromise;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.concurrent.CompletableContext;
import org.elasticsearch.http.HttpChannel;
import org.elasticsearch.http.HttpResponse;
import org.elasticsearch.transport.netty4.Netty4Utils;
Expand All @@ -31,9 +32,23 @@
public class Netty4HttpChannel implements HttpChannel {

private final Channel channel;
private final CompletableContext<Void> closeContext = new CompletableContext<>();

Netty4HttpChannel(Channel channel) {
this.channel = channel;
this.channel.closeFuture().addListener(f -> {
if (f.isSuccess()) {
closeContext.complete(null);
} else {
Throwable cause = f.cause();
if (cause instanceof Error) {
Netty4Utils.maybeDie(cause);
closeContext.completeExceptionally(new Exception(cause));
} else {
closeContext.completeExceptionally((Exception) cause);
}
}
});
}

@Override
Expand Down Expand Up @@ -65,6 +80,16 @@ public InetSocketAddress getRemoteAddress() {
return (InetSocketAddress) channel.remoteAddress();
}

@Override
public void addCloseListener(ActionListener<Void> listener) {
closeContext.addListener(ActionListener.toBiConsumer(listener));
}

@Override
public boolean isOpen() {
return channel.isOpen();
}

@Override
public void close() {
channel.close();
Expand All @@ -73,4 +98,12 @@ public void close() {
public Channel getNettyChannel() {
return channel;
}

@Override
public String toString() {
return "Netty4HttpChannel{" +
"localAddress=" + getLocalAddress() +
", remoteAddress=" + getRemoteAddress() +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import org.elasticsearch.http.HttpPipelinedRequest;
import org.elasticsearch.transport.netty4.Netty4Utils;

import static org.elasticsearch.http.netty4.Netty4HttpServerTransport.HTTP_CHANNEL_KEY;

@ChannelHandler.Sharable
class Netty4HttpRequestHandler extends SimpleChannelInboundHandler<HttpPipelinedRequest<FullHttpRequest>> {

Expand All @@ -40,7 +42,7 @@ class Netty4HttpRequestHandler extends SimpleChannelInboundHandler<HttpPipelined

@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpPipelinedRequest<FullHttpRequest> msg) throws Exception {
Netty4HttpChannel channel = ctx.channel().attr(Netty4HttpServerTransport.HTTP_CHANNEL_KEY).get();
Netty4HttpChannel channel = ctx.channel().attr(HTTP_CHANNEL_KEY).get();
FullHttpRequest request = msg.getRequest();

try {
Expand Down Expand Up @@ -75,7 +77,12 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpPipelinedRequest<Full
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
Netty4Utils.maybeDie(cause);
serverTransport.exceptionCaught(ctx, cause);
Netty4HttpChannel httpChannel = ctx.channel().attr(HTTP_CHANNEL_KEY).get();
if (cause instanceof Error) {
serverTransport.onException(httpChannel, new Exception(cause));
} else {
serverTransport.onException(httpChannel, (Exception) cause);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,13 @@
import io.netty.handler.timeout.ReadTimeoutException;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.util.AttributeKey;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.network.CloseableChannel;
import org.elasticsearch.common.network.NetworkAddress;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.NetworkExceptionHelper;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
Expand All @@ -57,14 +55,14 @@
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.http.AbstractHttpServerTransport;
import org.elasticsearch.http.BindHttpException;
import org.elasticsearch.http.HttpChannel;
import org.elasticsearch.http.HttpHandlingSettings;
import org.elasticsearch.http.HttpStats;
import org.elasticsearch.http.netty4.cors.Netty4CorsConfig;
import org.elasticsearch.http.netty4.cors.Netty4CorsConfigBuilder;
import org.elasticsearch.http.netty4.cors.Netty4CorsHandler;
import org.elasticsearch.rest.RestUtils;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.netty4.Netty4OpenChannelsHandler;
import org.elasticsearch.transport.netty4.Netty4Utils;

import java.io.IOException;
Expand Down Expand Up @@ -171,10 +169,6 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {

protected final List<Channel> serverChannels = new ArrayList<>();

// package private for testing
Netty4OpenChannelsHandler serverOpenChannels;


private final Netty4CorsConfig corsConfig;

public Netty4HttpServerTransport(Settings settings, NetworkService networkService, BigArrays bigArrays, ThreadPool threadPool,
Expand Down Expand Up @@ -216,8 +210,6 @@ public Settings settings() {
protected void doStart() {
boolean success = false;
try {
this.serverOpenChannels = new Netty4OpenChannelsHandler(logger);

serverBootstrap = new ServerBootstrap();

serverBootstrap.group(new NioEventLoopGroup(workerCount, daemonThreadFactory(settings,
Expand Down Expand Up @@ -281,10 +273,9 @@ static Netty4CorsConfig buildCorsConfig(Settings settings) {
builder.allowCredentials();
}
String[] strMethods = Strings.tokenizeToStringArray(SETTING_CORS_ALLOW_METHODS.get(settings), ",");
HttpMethod[] methods = Arrays.asList(strMethods)
.stream()
HttpMethod[] methods = Arrays.stream(strMethods)
.map(HttpMethod::valueOf)
.toArray(size -> new HttpMethod[size]);
.toArray(HttpMethod[]::new);
return builder.allowedRequestMethods(methods)
.maxAge(SETTING_CORS_MAX_AGE.get(settings))
.allowedRequestHeaders(Strings.tokenizeToStringArray(SETTING_CORS_ALLOW_HEADERS.get(settings), ","))
Expand Down Expand Up @@ -327,15 +318,21 @@ protected void doStop() {
Netty4Utils.closeChannels(serverChannels);
} catch (IOException e) {
logger.trace("exception while closing channels", e);
} finally {
serverChannels.clear();
}
serverChannels.clear();
}
}

if (serverOpenChannels != null) {
serverOpenChannels.close();
serverOpenChannels = null;
// TODO: Move all of channel closing to abstract class once server channels are handled
try {
CloseableChannel.closeChannels(new ArrayList<>(httpChannels), true);
} catch (Exception e) {
logger.warn("unexpected exception while closing http channels", e);
}
httpChannels.clear();



if (serverBootstrap != null) {
serverBootstrap.config().group().shutdownGracefully(0, 5, TimeUnit.SECONDS).awaitUninterruptibly();
Expand All @@ -349,38 +346,18 @@ protected void doClose() {

@Override
public HttpStats stats() {
Netty4OpenChannelsHandler channels = serverOpenChannels;
return new HttpStats(channels == null ? 0 : channels.numberOfOpenChannels(), channels == null ? 0 : channels.totalChannels());
return new HttpStats(httpChannels.size(), totalChannelsAccepted.get());
}

public Netty4CorsConfig getCorsConfig() {
return corsConfig;
}

protected void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
@Override
protected void onException(HttpChannel channel, Exception cause) {
if (cause instanceof ReadTimeoutException) {
if (logger.isTraceEnabled()) {
logger.trace("Read timeout [{}]", ctx.channel().remoteAddress());
logger.trace("Http read timeout {}", channel);
}
ctx.channel().close();
CloseableChannel.closeChannel(channel);;
} else {
if (!lifecycle.started()) {
// ignore
return;
}
if (!NetworkExceptionHelper.isCloseConnectionException(cause)) {
logger.warn(
(Supplier<?>) () -> new ParameterizedMessage(
"caught exception while handling client http traffic, closing connection {}", ctx.channel()),
cause);
ctx.channel().close();
} else {
logger.debug(
(Supplier<?>) () -> new ParameterizedMessage(
"caught exception while handling client http traffic, closing connection {}", ctx.channel()),
cause);
ctx.channel().close();
}
super.onException(channel, cause);
}
}

Expand All @@ -404,9 +381,8 @@ protected HttpChannelHandler(final Netty4HttpServerTransport transport, final Ht

@Override
protected void initChannel(Channel ch) throws Exception {
Netty4HttpChannel nettyTcpChannel = new Netty4HttpChannel(ch);
ch.attr(HTTP_CHANNEL_KEY).set(nettyTcpChannel);
ch.pipeline().addLast("openChannels", transport.serverOpenChannels);
Netty4HttpChannel nettyHttpChannel = new Netty4HttpChannel(ch);
ch.attr(HTTP_CHANNEL_KEY).set(nettyHttpChannel);
ch.pipeline().addLast("read_timeout", new ReadTimeoutHandler(transport.readTimeoutMillis, TimeUnit.MILLISECONDS));
final HttpRequestDecoder decoder = new HttpRequestDecoder(
handlingSettings.getMaxInitialLineLength(),
Expand All @@ -423,10 +399,11 @@ protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast("encoder_compress", new HttpContentCompressor(handlingSettings.getCompressionLevel()));
}
if (handlingSettings.isCorsEnabled()) {
ch.pipeline().addLast("cors", new Netty4CorsHandler(transport.getCorsConfig()));
ch.pipeline().addLast("cors", new Netty4CorsHandler(transport.corsConfig));
}
ch.pipeline().addLast("pipelining", new Netty4HttpPipeliningHandler(transport.logger, transport.pipeliningMaxEvents));
ch.pipeline().addLast("handler", requestHandler);
transport.serverAcceptedChannel(nettyHttpChannel);
}

@Override
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.CloseableChannel;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -91,7 +92,7 @@ protected void closeConnectionChannel(Transport transport, Transport.Connection
final Netty4Transport t = (Netty4Transport) transport;
@SuppressWarnings("unchecked")
final TcpTransport.NodeChannels channels = (TcpTransport.NodeChannels) connection;
TcpChannel.closeChannels(channels.getChannels().subList(0, randomIntBetween(1, channels.getChannels().size())), true);
CloseableChannel.closeChannels(channels.getChannels().subList(0, randomIntBetween(1, channels.getChannels().size())), true);
}

public void testConnectException() throws UnknownHostException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,17 @@ public class NioHttpChannel extends NioSocketChannel implements HttpChannel {
public void sendResponse(HttpResponse response, ActionListener<Void> listener) {
getContext().sendMessage(response, ActionListener.toBiConsumer(listener));
}

@Override
public void addCloseListener(ActionListener<Void> listener) {
addCloseListener(ActionListener.toBiConsumer(listener));
}

@Override
public String toString() {
return "NioHttpChannel{" +
"localAddress=" + getLocalAddress() +
", remoteAddress=" + getRemoteAddress() +
'}';
}
}
Loading

0 comments on commit 529e704

Please sign in to comment.