Skip to content

Commit

Permalink
Merge #3250 into 1.2.0-M3
Browse files Browse the repository at this point in the history
  • Loading branch information
violetagg committed May 28, 2024
2 parents 57ee1f3 + 94f48e0 commit c33825e
Show file tree
Hide file tree
Showing 8 changed files with 682 additions and 110 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@ else if (!SCHEME_PATTERN.matcher(tempUri).matches()) {
*/
protected abstract HttpMessage outboundHttpMessage();

HttpMessage prepareHttpMessage(ByteBuf buffer) {
protected HttpMessage prepareHttpMessage(ByteBuf buffer) {
HttpMessage msg;
if (HttpUtil.getContentLength(outboundHttpMessage(), -1) == 0 ||
isContentAlwaysEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.DecoderResult;
Expand Down Expand Up @@ -57,7 +55,7 @@
*
* @author Violeta Georgieva
*/
final class Http2StreamBridgeServerHandler extends ChannelDuplexHandler implements ChannelFutureListener {
final class Http2StreamBridgeServerHandler extends ChannelDuplexHandler {

final BiPredicate<HttpServerRequest, HttpServerResponse> compress;
final ServerCookieDecoder cookieDecoder;
Expand Down Expand Up @@ -196,31 +194,11 @@ else if (msg instanceof HttpResponse && HttpResponseStatus.CONTINUE.equals(((Htt
}
else {
//"FutureReturnValueIgnored" this is deliberate
ChannelFuture f = ctx.write(msg, promise);
ctx.write(msg, promise);
if (msg instanceof LastHttpContent) {
pendingResponse = false;
f.addListener(this);
ctx.read();
}
}
}

@Override
public void operationComplete(ChannelFuture future) {
if (!future.isSuccess()) {
if (HttpServerOperations.log.isDebugEnabled()) {
HttpServerOperations.log.debug(format(future.channel(),
"Sending last HTTP packet was not successful, terminating the channel"),
future.cause());
}
}
else {
if (HttpServerOperations.log.isDebugEnabled()) {
HttpServerOperations.log.debug(format(future.channel(),
"Last HTTP packet was sent, terminating the channel"));
}
}

HttpServerOperations.cleanHandlerTerminate(future.channel());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.DecoderResult;
Expand Down Expand Up @@ -49,7 +48,7 @@

import static reactor.netty.ReactorNetty.format;

final class Http3StreamBridgeServerHandler extends ChannelDuplexHandler implements ChannelFutureListener {
final class Http3StreamBridgeServerHandler extends ChannelDuplexHandler {
final BiPredicate<HttpServerRequest, HttpServerResponse> compress;
final ServerCookieDecoder cookieDecoder;
final ServerCookieEncoder cookieEncoder;
Expand Down Expand Up @@ -179,29 +178,9 @@ else if (msg instanceof HttpResponse && HttpResponseStatus.CONTINUE.equals(((Htt
ChannelFuture f = ctx.write(msg, promise);
if (msg instanceof LastHttpContent) {
pendingResponse = false;
f.addListener(this)
.addListener(QuicStreamChannel.SHUTDOWN_OUTPUT);
f.addListener(QuicStreamChannel.SHUTDOWN_OUTPUT);
ctx.read();
}
}
}

@Override
public void operationComplete(ChannelFuture future) {
if (!future.isSuccess()) {
if (HttpServerOperations.log.isDebugEnabled()) {
HttpServerOperations.log.debug(format(future.channel(),
"Sending last HTTP packet was not successful, terminating the channel"),
future.cause());
}
}
else {
if (HttpServerOperations.log.isDebugEnabled()) {
HttpServerOperations.log.debug(format(future.channel(),
"Last HTTP packet was sent, terminating the channel"));
}
}

HttpServerOperations.cleanHandlerTerminate(future.channel());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.util.AsciiString;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.GenericFutureListener;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
Expand Down Expand Up @@ -109,7 +110,7 @@
* @author Stephane Maldini1
*/
class HttpServerOperations extends HttpOperations<HttpServerRequest, HttpServerResponse>
implements HttpServerRequest, HttpServerResponse {
implements HttpServerRequest, HttpServerResponse, GenericFutureListener<io.netty.util.concurrent.Future<? super Void>> {

final BiPredicate<HttpServerRequest, HttpServerResponse> configuredCompressionPredicate;
final ConnectionInfo connectionInfo;
Expand All @@ -133,6 +134,7 @@ class HttpServerOperations extends HttpOperations<HttpServerRequest, HttpServerR
String path;
Future<?> requestTimeoutFuture;
Consumer<? super HttpHeaders> trailerHeadersConsumer;
HttpMessage fullHttpResponse;

volatile Context currentContext;

Expand All @@ -148,6 +150,7 @@ class HttpServerOperations extends HttpOperations<HttpServerRequest, HttpServerR
this.formDecoderProvider = replaced.formDecoderProvider;
this.is100ContinueExpected = replaced.is100ContinueExpected;
this.isHttp2 = replaced.isHttp2;
this.fullHttpResponse = replaced.fullHttpResponse;
this.mapHandle = replaced.mapHandle;
this.nettyRequest = replaced.nettyRequest;
this.nettyResponse = replaced.nettyResponse;
Expand Down Expand Up @@ -371,7 +374,12 @@ final boolean isHttp2() {

@Override
public HttpServerResponse keepAlive(boolean keepAlive) {
HttpUtil.setKeepAlive(nettyResponse, keepAlive);
if (fullHttpResponse == null) {
HttpUtil.setKeepAlive(nettyResponse, keepAlive);
}
else {
HttpUtil.setKeepAlive(fullHttpResponse, keepAlive);
}
return this;
}

Expand Down Expand Up @@ -514,11 +522,92 @@ public ZonedDateTime timestamp() {
return timestamp;
}

@Override
@SuppressWarnings("unchecked")
public NettyOutbound send(Publisher<? extends ByteBuf> source) {
if (!channel().isActive()) {
return then(Mono.error(AbortedException.beforeSend()));
}
if (source instanceof Mono) {
return new PostHeadersNettyOutbound(((Mono<ByteBuf>) source)
.flatMap(b -> {
if (!hasSentHeaders()) {
try {
fullHttpResponse = prepareHttpMessage(b);

afterMarkSentHeaders();
}
catch (RuntimeException e) {
b.release();
return Mono.error(e);
}

onComplete();
return Mono.<Void>empty();
}

if (log.isDebugEnabled()) {
log.debug(format(channel(), "Dropped HTTP content, since response has been sent already: {}"), b);
}
b.release();
return Mono.empty();
})
.doOnDiscard(ByteBuf.class, ByteBuf::release), this, null);
}
return super.send(source);
}

@Override
public NettyOutbound sendObject(Object message) {
if (!channel().isActive()) {
ReactorNetty.safeRelease(message);
return then(Mono.error(AbortedException.beforeSend()));
}
if (message instanceof ByteBuf) {
ByteBuf b = (ByteBuf) message;
return new PostHeadersNettyOutbound(Mono.create(sink -> {
if (!hasSentHeaders()) {
try {
fullHttpResponse = prepareHttpMessage(b);

afterMarkSentHeaders();
}
catch (RuntimeException e) {
// If afterMarkSentHeaders throws an exception there is no need to release the ByteBuf here.
// It will be released by PostHeadersNettyOutbound as there are on error/cancel hooks
sink.error(e);
return;
}

onComplete();
sink.success();
}
else {
if (log.isDebugEnabled()) {
log.debug(format(channel(), "Dropped HTTP content, since response has been sent already: {}"), b);
}
b.release();
sink.success();
}
}), this, b);
}
return super.sendObject(message);
}

@Override
public Mono<Void> send() {
return FutureMono.deferFuture(() -> markSentHeaderAndBody() ?
channel().writeAndFlush(newFullBodyMessage(EMPTY_BUFFER)) :
channel().newSucceededFuture());
return Mono.create(sink -> {
if (!hasSentHeaders()) {
onComplete();
sink.success();
}
else {
if (log.isDebugEnabled()) {
log.debug(format(channel(), "Response has been sent already."));
}
sink.success();
}
});
}

@Override
Expand Down Expand Up @@ -575,6 +664,46 @@ public HttpServerResponse status(HttpResponseStatus status) {
return this;
}

@Override
public Mono<Void> then() {
if (!channel().isActive()) {
return Mono.error(AbortedException.beforeSend());
}

if (hasSentHeaders()) {
return Mono.empty();
}

return FutureMono.deferFuture(() -> {
if (!hasSentHeaders()) {
beforeMarkSentHeaders();

HttpMessage msg = outboundHttpMessage();
boolean last = false;
int contentLength = HttpUtil.getContentLength(msg, -1);
if (contentLength == 0 || isContentAlwaysEmpty()) {
last = true;
msg = newFullBodyMessage(Unpooled.EMPTY_BUFFER);
}
else if (contentLength > 0) {
responseHeaders.remove(HttpHeaderNames.TRANSFER_ENCODING);
}

afterMarkSentHeaders();

if (!last) {
return markSentHeaders() ? channel().writeAndFlush(msg) : channel().newSucceededFuture();
}
else {
return markSentHeaderAndBody() ? channel().writeAndFlush(msg) : channel().newSucceededFuture();
}
}
else {
return channel().newSucceededFuture();
}
});
}

@Override
public HttpServerResponse trailerHeaders(Consumer<? super HttpHeaders> trailerHeaders) {
this.trailerHeadersConsumer = Objects.requireNonNull(trailerHeaders, "trailerHeaders");
Expand Down Expand Up @@ -749,11 +878,10 @@ protected void onOutboundComplete() {
}
if (markSentHeaderAndBody()) {
if (log.isDebugEnabled()) {
log.debug(format(channel(), "No sendHeaders() called before complete, sending " +
"zero-length header"));
log.debug(format(channel(), "Headers are not sent before onComplete()."));
}

f = channel().writeAndFlush(newFullBodyMessage(EMPTY_BUFFER));
f = channel().writeAndFlush(fullHttpResponse != null ? fullHttpResponse : newFullBodyMessage(EMPTY_BUFFER));
}
else if (markSentBody()) {
HttpHeaders trailerHeaders = null;
Expand Down Expand Up @@ -790,35 +918,29 @@ else if (markSentBody()) {
}
else {
discard();
terminate();
return;
}
f.addListener(s -> {
discard();
if (!s.isSuccess() && log.isDebugEnabled()) {
log.debug(format(channel(), "Failed flushing last frame"), s.cause());
}
});

f.addListener(this);
}

static void cleanHandlerTerminate(Channel ch) {
ChannelOperations<?, ?> ops = get(ch);

if (ops == null) {
return;
}

ops.discard();

//Try to defer the disposing to leave a chance for any synchronous complete following this callback
if (!ops.isSubscriptionDisposed()) {
ch.eventLoop()
.execute(((HttpServerOperations) ops)::terminate);
@Override
public void operationComplete(io.netty.util.concurrent.Future<? super Void> future) {
if (!future.isSuccess()) {
if (log.isDebugEnabled()) {
log.debug(format(channel(), "Sending last HTTP packet was not successful, terminating the channel"),
future.cause());
}
}
else {
//if already disposed, we can immediately call terminate
((HttpServerOperations) ops).terminate();
if (log.isDebugEnabled()) {
log.debug(format(channel(), "Last HTTP packet was sent, terminating the channel"));
}
}

discard();

terminate();
}

static long requestsCounter(Channel channel) {
Expand Down Expand Up @@ -896,8 +1018,15 @@ else if (cause instanceof TooLongHttpHeaderException) {
}
}

//"FutureReturnValueIgnored" this is deliberate
ctx.channel().writeAndFlush(response);
if (ops instanceof HttpServerOperations) {
HttpServerOperations serverOps = (HttpServerOperations) ops;
serverOps.fullHttpResponse = response;
serverOps.onComplete();
}
else {
//"FutureReturnValueIgnored" this is deliberate
ctx.channel().writeAndFlush(response);
}

listener.onStateChange(ops, REQUEST_DECODING_FAILED);
}
Expand All @@ -922,13 +1051,15 @@ protected void onOutboundError(Throwable err) {
nettyResponse.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
responseHeaders.set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE);
channel().writeAndFlush(newFullBodyMessage(EMPTY_BUFFER))
.addListener(this)
.addListener(ChannelFutureListener.CLOSE);
return;
}

markSentBody();
log.error(format(channel(), "Error finishing response. Closing connection"), err);
channel().writeAndFlush(EMPTY_BUFFER)
.addListener(this)
.addListener(ChannelFutureListener.CLOSE);
}

Expand Down
Loading

0 comments on commit c33825e

Please sign in to comment.