Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extract common http logic to server #31311

Merged
merged 31 commits into from
Jun 14, 2018
Merged
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
2fe6a19
WIP
Tim-Brooks Jun 7, 2018
cf8eb63
WIP
Tim-Brooks Jun 7, 2018
ca521f7
WIP
Tim-Brooks Jun 8, 2018
39c7caf
WIP
Tim-Brooks Jun 8, 2018
26b5d7a
WIP
Tim-Brooks Jun 8, 2018
6322fa2
Work on transition nio to new server work
Tim-Brooks Jun 9, 2018
e713fce
Make some adjustments to how responses are created
Tim-Brooks Jun 9, 2018
6333342
Merge remote-tracking branch 'upstream/master' into default_rest_channel
Tim-Brooks Jun 11, 2018
bc9154a
Fix checkstyle
Tim-Brooks Jun 11, 2018
fc04691
Continue getting nio to work with new path
Tim-Brooks Jun 11, 2018
42a013e
Fix tests
Tim-Brooks Jun 11, 2018
2a31aed
WIP
Tim-Brooks Jun 12, 2018
1ed37e6
Merge remote-tracking branch 'upstream/master' into default_rest_channel
Tim-Brooks Jun 13, 2018
50ca17f
Merge remote-tracking branch 'upstream/master' into default_rest_channel
Tim-Brooks Jun 13, 2018
9b796d6
Work on netty module integration
Tim-Brooks Jun 13, 2018
26c1d5e
WIP
Tim-Brooks Jun 13, 2018
c3639c4
Alot of renaming
Tim-Brooks Jun 13, 2018
48fafd2
WIP
Tim-Brooks Jun 13, 2018
e7e63e1
WIP
Tim-Brooks Jun 13, 2018
1aac362
Fix tests
Tim-Brooks Jun 13, 2018
893501d
Renname channel
Tim-Brooks Jun 13, 2018
dc03bb2
Cleanups
Tim-Brooks Jun 13, 2018
ea79116
Fix license
Tim-Brooks Jun 14, 2018
29d2483
Fix head requests
Tim-Brooks Jun 14, 2018
1d7455d
Merge remote-tracking branch 'upstream/master' into default_rest_channel
Tim-Brooks Jun 14, 2018
0da1945
Merge remote-tracking branch 'upstream/master' into default_rest_channel
Tim-Brooks Jun 14, 2018
d67d6d7
Changes from review
Tim-Brooks Jun 14, 2018
a0948f2
Fix checkstyle
Tim-Brooks Jun 14, 2018
9bc92cd
Fix checkstyle
Tim-Brooks Jun 14, 2018
b67825c
Fix forbidden
Tim-Brooks Jun 14, 2018
3a8dfe9
Fix test
Tim-Brooks Jun 14, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,252 +19,58 @@

package org.elasticsearch.http.netty4;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.cookie.ServerCookieDecoder;
import io.netty.handler.codec.http.cookie.ServerCookieEncoder;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.http.HttpHandlingSettings;
import org.elasticsearch.http.netty4.cors.Netty4CorsHandler;
import org.elasticsearch.rest.AbstractRestChannel;
import org.elasticsearch.rest.RestResponse;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.http.HttpChannel;
import org.elasticsearch.http.HttpResponse;
import org.elasticsearch.transport.netty4.Netty4Utils;

import java.util.Collections;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.net.InetSocketAddress;

final class Netty4HttpChannel extends AbstractRestChannel {
public class Netty4HttpChannel implements HttpChannel {

private final Netty4HttpServerTransport transport;
private final Channel channel;
private final FullHttpRequest nettyRequest;
private final int sequence;
private final ThreadContext threadContext;
private final HttpHandlingSettings handlingSettings;

/**
* @param transport The corresponding <code>NettyHttpServerTransport</code> where this channel belongs to.
* @param request The request that is handled by this channel.
* @param sequence The pipelining sequence number for this request
* @param handlingSettings true if error messages should include stack traces.
* @param threadContext the thread context for the channel
*/
Netty4HttpChannel(Netty4HttpServerTransport transport, Netty4HttpRequest request, int sequence, HttpHandlingSettings handlingSettings,
ThreadContext threadContext) {
super(request, handlingSettings.getDetailedErrorsEnabled());
this.transport = transport;
this.channel = request.getChannel();
this.nettyRequest = request.request();
this.sequence = sequence;
this.threadContext = threadContext;
this.handlingSettings = handlingSettings;
Netty4HttpChannel(Channel channel) {
this.channel = channel;
}

@Override
protected BytesStreamOutput newBytesOutput() {
return new ReleasableBytesStreamOutput(transport.bigArrays);
}

@Override
public void sendResponse(RestResponse response) {
// if the response object was created upstream, then use it;
// otherwise, create a new one
ByteBuf buffer = Netty4Utils.toByteBuf(response.content());
final FullHttpResponse resp;
if (HttpMethod.HEAD.equals(nettyRequest.method())) {
resp = newResponse(Unpooled.EMPTY_BUFFER);
} else {
resp = newResponse(buffer);
}
resp.setStatus(getStatus(response.status()));

Netty4CorsHandler.setCorsResponseHeaders(nettyRequest, resp, transport.getCorsConfig());

String opaque = nettyRequest.headers().get("X-Opaque-Id");
if (opaque != null) {
setHeaderField(resp, "X-Opaque-Id", opaque);
}

// Add all custom headers
addCustomHeaders(resp, response.getHeaders());
addCustomHeaders(resp, threadContext.getResponseHeaders());

BytesReference content = response.content();
boolean releaseContent = content instanceof Releasable;
boolean releaseBytesStreamOutput = bytesOutputOrNull() instanceof ReleasableBytesStreamOutput;
try {
// If our response doesn't specify a content-type header, set one
setHeaderField(resp, HttpHeaderNames.CONTENT_TYPE.toString(), response.contentType(), false);
// If our response has no content-length, calculate and set one
setHeaderField(resp, HttpHeaderNames.CONTENT_LENGTH.toString(), String.valueOf(buffer.readableBytes()), false);

addCookies(resp);

final ChannelPromise promise = channel.newPromise();

if (releaseContent) {
promise.addListener(f -> ((Releasable) content).close());
}

if (releaseBytesStreamOutput) {
promise.addListener(f -> bytesOutputOrNull().close());
}

if (isCloseConnection()) {
promise.addListener(ChannelFutureListener.CLOSE);
}

Netty4HttpResponse newResponse = new Netty4HttpResponse(sequence, resp);

channel.writeAndFlush(newResponse, promise);
releaseContent = false;
releaseBytesStreamOutput = false;
} finally {
if (releaseContent) {
((Releasable) content).close();
}
if (releaseBytesStreamOutput) {
bytesOutputOrNull().close();
}
}
}

private void setHeaderField(HttpResponse resp, String headerField, String value) {
setHeaderField(resp, headerField, value, true);
}

private void setHeaderField(HttpResponse resp, String headerField, String value, boolean override) {
if (override || !resp.headers().contains(headerField)) {
resp.headers().add(headerField, value);
}
}

private void addCookies(HttpResponse resp) {
if (handlingSettings.isResetCookies()) {
String cookieString = nettyRequest.headers().get(HttpHeaderNames.COOKIE);
if (cookieString != null) {
Set<io.netty.handler.codec.http.cookie.Cookie> cookies = ServerCookieDecoder.STRICT.decode(cookieString);
if (!cookies.isEmpty()) {
// Reset the cookies if necessary.
resp.headers().set(HttpHeaderNames.SET_COOKIE, ServerCookieEncoder.STRICT.encode(cookies));
}
}
}
}

private void addCustomHeaders(HttpResponse response, Map<String, List<String>> customHeaders) {
if (customHeaders != null) {
for (Map.Entry<String, List<String>> headerEntry : customHeaders.entrySet()) {
for (String headerValue : headerEntry.getValue()) {
setHeaderField(response, headerEntry.getKey(), headerValue);
public void sendResponse(HttpResponse response, ActionListener<Void> listener) {
ChannelPromise writePromise = channel.newPromise();
writePromise.addListener(f -> {
if (f.isSuccess()) {
listener.onResponse(null);
} else {
final Throwable cause = f.cause();
Netty4Utils.maybeDie(cause);
if (cause instanceof Error) {
listener.onFailure(new Exception(cause));
} else {
listener.onFailure((Exception) cause);
}
}
}
});
channel.writeAndFlush(response, writePromise);
}

// Determine if the request protocol version is HTTP 1.0
private boolean isHttp10() {
return nettyRequest.protocolVersion().equals(HttpVersion.HTTP_1_0);
}

// Determine if the request connection should be closed on completion.
private boolean isCloseConnection() {
final boolean http10 = isHttp10();
return HttpHeaderValues.CLOSE.contentEqualsIgnoreCase(nettyRequest.headers().get(HttpHeaderNames.CONNECTION)) ||
(http10 && !HttpHeaderValues.KEEP_ALIVE.contentEqualsIgnoreCase(nettyRequest.headers().get(HttpHeaderNames.CONNECTION)));
@Override
public InetSocketAddress getLocalAddress() {
return (InetSocketAddress) channel.localAddress();
}

// Create a new {@link HttpResponse} to transmit the response for the netty request.
private FullHttpResponse newResponse(ByteBuf buffer) {
final boolean http10 = isHttp10();
final boolean close = isCloseConnection();
// Build the response object.
final HttpResponseStatus status = HttpResponseStatus.OK; // default to initialize
final FullHttpResponse response;
if (http10) {
response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_0, status, buffer);
if (!close) {
response.headers().add(HttpHeaderNames.CONNECTION, "Keep-Alive");
}
} else {
response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status, buffer);
}
return response;
@Override
public InetSocketAddress getRemoteAddress() {
return (InetSocketAddress) channel.remoteAddress();
}

private static Map<RestStatus, HttpResponseStatus> MAP;

static {
EnumMap<RestStatus, HttpResponseStatus> map = new EnumMap<>(RestStatus.class);
map.put(RestStatus.CONTINUE, HttpResponseStatus.CONTINUE);
map.put(RestStatus.SWITCHING_PROTOCOLS, HttpResponseStatus.SWITCHING_PROTOCOLS);
map.put(RestStatus.OK, HttpResponseStatus.OK);
map.put(RestStatus.CREATED, HttpResponseStatus.CREATED);
map.put(RestStatus.ACCEPTED, HttpResponseStatus.ACCEPTED);
map.put(RestStatus.NON_AUTHORITATIVE_INFORMATION, HttpResponseStatus.NON_AUTHORITATIVE_INFORMATION);
map.put(RestStatus.NO_CONTENT, HttpResponseStatus.NO_CONTENT);
map.put(RestStatus.RESET_CONTENT, HttpResponseStatus.RESET_CONTENT);
map.put(RestStatus.PARTIAL_CONTENT, HttpResponseStatus.PARTIAL_CONTENT);
map.put(RestStatus.MULTI_STATUS, HttpResponseStatus.INTERNAL_SERVER_ERROR); // no status for this??
map.put(RestStatus.MULTIPLE_CHOICES, HttpResponseStatus.MULTIPLE_CHOICES);
map.put(RestStatus.MOVED_PERMANENTLY, HttpResponseStatus.MOVED_PERMANENTLY);
map.put(RestStatus.FOUND, HttpResponseStatus.FOUND);
map.put(RestStatus.SEE_OTHER, HttpResponseStatus.SEE_OTHER);
map.put(RestStatus.NOT_MODIFIED, HttpResponseStatus.NOT_MODIFIED);
map.put(RestStatus.USE_PROXY, HttpResponseStatus.USE_PROXY);
map.put(RestStatus.TEMPORARY_REDIRECT, HttpResponseStatus.TEMPORARY_REDIRECT);
map.put(RestStatus.BAD_REQUEST, HttpResponseStatus.BAD_REQUEST);
map.put(RestStatus.UNAUTHORIZED, HttpResponseStatus.UNAUTHORIZED);
map.put(RestStatus.PAYMENT_REQUIRED, HttpResponseStatus.PAYMENT_REQUIRED);
map.put(RestStatus.FORBIDDEN, HttpResponseStatus.FORBIDDEN);
map.put(RestStatus.NOT_FOUND, HttpResponseStatus.NOT_FOUND);
map.put(RestStatus.METHOD_NOT_ALLOWED, HttpResponseStatus.METHOD_NOT_ALLOWED);
map.put(RestStatus.NOT_ACCEPTABLE, HttpResponseStatus.NOT_ACCEPTABLE);
map.put(RestStatus.PROXY_AUTHENTICATION, HttpResponseStatus.PROXY_AUTHENTICATION_REQUIRED);
map.put(RestStatus.REQUEST_TIMEOUT, HttpResponseStatus.REQUEST_TIMEOUT);
map.put(RestStatus.CONFLICT, HttpResponseStatus.CONFLICT);
map.put(RestStatus.GONE, HttpResponseStatus.GONE);
map.put(RestStatus.LENGTH_REQUIRED, HttpResponseStatus.LENGTH_REQUIRED);
map.put(RestStatus.PRECONDITION_FAILED, HttpResponseStatus.PRECONDITION_FAILED);
map.put(RestStatus.REQUEST_ENTITY_TOO_LARGE, HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE);
map.put(RestStatus.REQUEST_URI_TOO_LONG, HttpResponseStatus.REQUEST_URI_TOO_LONG);
map.put(RestStatus.UNSUPPORTED_MEDIA_TYPE, HttpResponseStatus.UNSUPPORTED_MEDIA_TYPE);
map.put(RestStatus.REQUESTED_RANGE_NOT_SATISFIED, HttpResponseStatus.REQUESTED_RANGE_NOT_SATISFIABLE);
map.put(RestStatus.EXPECTATION_FAILED, HttpResponseStatus.EXPECTATION_FAILED);
map.put(RestStatus.UNPROCESSABLE_ENTITY, HttpResponseStatus.BAD_REQUEST);
map.put(RestStatus.LOCKED, HttpResponseStatus.BAD_REQUEST);
map.put(RestStatus.FAILED_DEPENDENCY, HttpResponseStatus.BAD_REQUEST);
map.put(RestStatus.TOO_MANY_REQUESTS, HttpResponseStatus.TOO_MANY_REQUESTS);
map.put(RestStatus.INTERNAL_SERVER_ERROR, HttpResponseStatus.INTERNAL_SERVER_ERROR);
map.put(RestStatus.NOT_IMPLEMENTED, HttpResponseStatus.NOT_IMPLEMENTED);
map.put(RestStatus.BAD_GATEWAY, HttpResponseStatus.BAD_GATEWAY);
map.put(RestStatus.SERVICE_UNAVAILABLE, HttpResponseStatus.SERVICE_UNAVAILABLE);
map.put(RestStatus.GATEWAY_TIMEOUT, HttpResponseStatus.GATEWAY_TIMEOUT);
map.put(RestStatus.HTTP_VERSION_NOT_SUPPORTED, HttpResponseStatus.HTTP_VERSION_NOT_SUPPORTED);
MAP = Collections.unmodifiableMap(map);
@Override
public void close() {
channel.close();
}

private static HttpResponseStatus getStatus(RestStatus status) {
return MAP.getOrDefault(status, HttpResponseStatus.INTERNAL_SERVER_ERROR);
public Channel getNettyChannel() {
return channel;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public void write(final ChannelHandlerContext ctx, final Object msg, final Chann
try {
List<Tuple<Netty4HttpResponse, ChannelPromise>> readyResponses = aggregator.write(response, promise);
for (Tuple<Netty4HttpResponse, ChannelPromise> readyResponse : readyResponses) {
ctx.write(readyResponse.v1().getResponse(), readyResponse.v2());
ctx.write(readyResponse.v1(), readyResponse.v2());
}
success = true;
} catch (IllegalStateException e) {
Expand Down
Loading