Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unify http channels and exception handling #31379

Merged
merged 3 commits into from
Jun 19, 2018
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,41 @@ public static void close(final Exception ex, final Iterable<? extends Closeable>
}
}

/**
* Closes all given {@link Closeable}s. Some of the {@linkplain Closeable}s may be null; they are
* ignored. After everything is closed, the method either throws the first {@link RuntimeException} it
* hit while closing with other exceptions added as suppressed, or completes normally if there were
* no exceptions. If the first exception is not a {@linkplain RuntimeException}, it is wrapped in a
* {@linkplain RuntimeException}.
*
* @param objects objects to close
*/
public static void closeAndConvertToRuntimeExceptions(final Iterable<? extends Closeable> objects) {
RuntimeException closingException = null;
for (final Closeable object : objects) {
try {
if (object != null) {
object.close();
}
} catch (RuntimeException e) {
if (closingException == null) {
closingException = e;
} else {
closingException.addSuppressed(e);
}
} catch (IOException e) {
if (closingException == null) {
closingException = new RuntimeException(e);
} {
closingException.addSuppressed(e);
}
}
}
if (closingException != null) {
throw closingException;
}
}

/**
* Closes all given {@link Closeable}s, suppressing all thrown exceptions. Some of the {@link Closeable}s may be null, they are ignored.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.netty.channel.Channel;
import io.netty.channel.ChannelPromise;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.concurrent.CompletableContext;
import org.elasticsearch.http.HttpChannel;
import org.elasticsearch.http.HttpResponse;
import org.elasticsearch.transport.netty4.Netty4Utils;
Expand All @@ -31,9 +32,23 @@
public class Netty4HttpChannel implements HttpChannel {

private final Channel channel;
private final CompletableContext<Void> closeContext = new CompletableContext<>();

Netty4HttpChannel(Channel channel) {
this.channel = channel;
this.channel.closeFuture().addListener(f -> {
if (f.isSuccess()) {
closeContext.complete(null);
} else {
Throwable cause = f.cause();
if (cause instanceof Error) {
Netty4Utils.maybeDie(cause);
closeContext.completeExceptionally(new Exception(cause));
} else {
closeContext.completeExceptionally((Exception) cause);
}
}
});
}

@Override
Expand Down Expand Up @@ -65,6 +80,16 @@ public InetSocketAddress getRemoteAddress() {
return (InetSocketAddress) channel.remoteAddress();
}

@Override
public void addCloseListener(ActionListener<Void> listener) {
closeContext.addListener(ActionListener.toBiConsumer(listener));
}

@Override
public boolean isOpen() {
return channel.isOpen();
}

@Override
public void close() {
channel.close();
Expand All @@ -73,4 +98,12 @@ public void close() {
public Channel getNettyChannel() {
return channel;
}

@Override
public String toString() {
return "Netty4HttpChannel{" +
"localAddress=" + getLocalAddress() +
", remoteAddress=" + getRemoteAddress() +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import org.elasticsearch.http.HttpPipelinedRequest;
import org.elasticsearch.transport.netty4.Netty4Utils;

import static org.elasticsearch.http.netty4.Netty4HttpServerTransport.HTTP_CHANNEL_KEY;

@ChannelHandler.Sharable
class Netty4HttpRequestHandler extends SimpleChannelInboundHandler<HttpPipelinedRequest<FullHttpRequest>> {

Expand All @@ -40,7 +42,7 @@ class Netty4HttpRequestHandler extends SimpleChannelInboundHandler<HttpPipelined

@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpPipelinedRequest<FullHttpRequest> msg) throws Exception {
Netty4HttpChannel channel = ctx.channel().attr(Netty4HttpServerTransport.HTTP_CHANNEL_KEY).get();
Netty4HttpChannel channel = ctx.channel().attr(HTTP_CHANNEL_KEY).get();
FullHttpRequest request = msg.getRequest();

try {
Expand Down Expand Up @@ -75,7 +77,12 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpPipelinedRequest<Full
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
Netty4Utils.maybeDie(cause);
serverTransport.exceptionCaught(ctx, cause);
Netty4HttpChannel httpChannel = ctx.channel().attr(HTTP_CHANNEL_KEY).get();
if (cause instanceof Error) {
serverTransport.onException(httpChannel, new Exception(cause));
} else {
serverTransport.onException(httpChannel, (Exception) cause);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,13 @@
import io.netty.handler.timeout.ReadTimeoutException;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.util.AttributeKey;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.network.CloseableChannel;
import org.elasticsearch.common.network.NetworkAddress;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.NetworkExceptionHelper;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
Expand All @@ -57,14 +55,14 @@
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.http.AbstractHttpServerTransport;
import org.elasticsearch.http.BindHttpException;
import org.elasticsearch.http.HttpChannel;
import org.elasticsearch.http.HttpHandlingSettings;
import org.elasticsearch.http.HttpStats;
import org.elasticsearch.http.netty4.cors.Netty4CorsConfig;
import org.elasticsearch.http.netty4.cors.Netty4CorsConfigBuilder;
import org.elasticsearch.http.netty4.cors.Netty4CorsHandler;
import org.elasticsearch.rest.RestUtils;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.netty4.Netty4OpenChannelsHandler;
import org.elasticsearch.transport.netty4.Netty4Utils;

import java.io.IOException;
Expand Down Expand Up @@ -171,10 +169,6 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {

protected final List<Channel> serverChannels = new ArrayList<>();

// package private for testing
Netty4OpenChannelsHandler serverOpenChannels;


private final Netty4CorsConfig corsConfig;

public Netty4HttpServerTransport(Settings settings, NetworkService networkService, BigArrays bigArrays, ThreadPool threadPool,
Expand Down Expand Up @@ -216,8 +210,6 @@ public Settings settings() {
protected void doStart() {
boolean success = false;
try {
this.serverOpenChannels = new Netty4OpenChannelsHandler(logger);

serverBootstrap = new ServerBootstrap();

serverBootstrap.group(new NioEventLoopGroup(workerCount, daemonThreadFactory(settings,
Expand Down Expand Up @@ -281,10 +273,9 @@ static Netty4CorsConfig buildCorsConfig(Settings settings) {
builder.allowCredentials();
}
String[] strMethods = Strings.tokenizeToStringArray(SETTING_CORS_ALLOW_METHODS.get(settings), ",");
HttpMethod[] methods = Arrays.asList(strMethods)
.stream()
HttpMethod[] methods = Arrays.stream(strMethods)
.map(HttpMethod::valueOf)
.toArray(size -> new HttpMethod[size]);
.toArray(HttpMethod[]::new);
return builder.allowedRequestMethods(methods)
.maxAge(SETTING_CORS_MAX_AGE.get(settings))
.allowedRequestHeaders(Strings.tokenizeToStringArray(SETTING_CORS_ALLOW_HEADERS.get(settings), ","))
Expand Down Expand Up @@ -327,15 +318,21 @@ protected void doStop() {
Netty4Utils.closeChannels(serverChannels);
} catch (IOException e) {
logger.trace("exception while closing channels", e);
} finally {
serverChannels.clear();
}
serverChannels.clear();
}
}

if (serverOpenChannels != null) {
serverOpenChannels.close();
serverOpenChannels = null;
// TODO: Move all of channel closing to abstract class once server channels are handled
try {
CloseableChannel.closeChannels(new ArrayList<>(httpChannels), true);
} catch (Exception e) {
logger.warn("unexpected exception while closing http channels", e);
}
httpChannels.clear();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would be cleaner if we do it in a finally?




if (serverBootstrap != null) {
serverBootstrap.config().group().shutdownGracefully(0, 5, TimeUnit.SECONDS).awaitUninterruptibly();
Expand All @@ -349,38 +346,18 @@ protected void doClose() {

@Override
public HttpStats stats() {
Netty4OpenChannelsHandler channels = serverOpenChannels;
return new HttpStats(channels == null ? 0 : channels.numberOfOpenChannels(), channels == null ? 0 : channels.totalChannels());
return new HttpStats(httpChannels.size(), totalChannelsAccepted.get());
}

public Netty4CorsConfig getCorsConfig() {
return corsConfig;
}

protected void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
@Override
protected void onException(HttpChannel channel, Exception cause) {
if (cause instanceof ReadTimeoutException) {
if (logger.isTraceEnabled()) {
logger.trace("Read timeout [{}]", ctx.channel().remoteAddress());
logger.trace("Http read timeout {}", channel);
}
ctx.channel().close();
CloseableChannel.closeChannel(channel);;
} else {
if (!lifecycle.started()) {
// ignore
return;
}
if (!NetworkExceptionHelper.isCloseConnectionException(cause)) {
logger.warn(
(Supplier<?>) () -> new ParameterizedMessage(
"caught exception while handling client http traffic, closing connection {}", ctx.channel()),
cause);
ctx.channel().close();
} else {
logger.debug(
(Supplier<?>) () -> new ParameterizedMessage(
"caught exception while handling client http traffic, closing connection {}", ctx.channel()),
cause);
ctx.channel().close();
}
super.onException(channel, cause);
}
}

Expand All @@ -404,9 +381,8 @@ protected HttpChannelHandler(final Netty4HttpServerTransport transport, final Ht

@Override
protected void initChannel(Channel ch) throws Exception {
Netty4HttpChannel nettyTcpChannel = new Netty4HttpChannel(ch);
ch.attr(HTTP_CHANNEL_KEY).set(nettyTcpChannel);
ch.pipeline().addLast("openChannels", transport.serverOpenChannels);
Netty4HttpChannel nettyHttpChannel = new Netty4HttpChannel(ch);
ch.attr(HTTP_CHANNEL_KEY).set(nettyHttpChannel);
ch.pipeline().addLast("read_timeout", new ReadTimeoutHandler(transport.readTimeoutMillis, TimeUnit.MILLISECONDS));
final HttpRequestDecoder decoder = new HttpRequestDecoder(
handlingSettings.getMaxInitialLineLength(),
Expand All @@ -423,10 +399,11 @@ protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast("encoder_compress", new HttpContentCompressor(handlingSettings.getCompressionLevel()));
}
if (handlingSettings.isCorsEnabled()) {
ch.pipeline().addLast("cors", new Netty4CorsHandler(transport.getCorsConfig()));
ch.pipeline().addLast("cors", new Netty4CorsHandler(transport.corsConfig));
}
ch.pipeline().addLast("pipelining", new Netty4HttpPipeliningHandler(transport.logger, transport.pipeliningMaxEvents));
ch.pipeline().addLast("handler", requestHandler);
transport.serverAcceptedChannel(nettyHttpChannel);
}

@Override
Expand Down

This file was deleted.

Loading