Skip to content

Commit

Permalink
Immediately mark outbound as complete when sending Mono or Object
Browse files Browse the repository at this point in the history
Reduce then number of ChannelFuture listeners attached to the last message.
  • Loading branch information
violetagg committed May 21, 2024
1 parent b44c103 commit b936723
Show file tree
Hide file tree
Showing 7 changed files with 672 additions and 86 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@ else if (!SCHEME_PATTERN.matcher(tempUri).matches()) {
*/
protected abstract HttpMessage outboundHttpMessage();

HttpMessage prepareHttpMessage(ByteBuf buffer) {
protected HttpMessage prepareHttpMessage(ByteBuf buffer) {
HttpMessage msg;
if (HttpUtil.getContentLength(outboundHttpMessage(), -1) == 0 ||
isContentAlwaysEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.DecoderResult;
Expand Down Expand Up @@ -57,7 +55,7 @@
*
* @author Violeta Georgieva
*/
final class Http2StreamBridgeServerHandler extends ChannelDuplexHandler implements ChannelFutureListener {
final class Http2StreamBridgeServerHandler extends ChannelDuplexHandler {

final BiPredicate<HttpServerRequest, HttpServerResponse> compress;
final ServerCookieDecoder cookieDecoder;
Expand Down Expand Up @@ -195,31 +193,11 @@ else if (msg instanceof HttpResponse && HttpResponseStatus.CONTINUE.equals(((Htt
}
else {
//"FutureReturnValueIgnored" this is deliberate
ChannelFuture f = ctx.write(msg, promise);
ctx.write(msg, promise);
if (msg instanceof LastHttpContent) {
pendingResponse = false;
f.addListener(this);
ctx.read();
}
}
}

@Override
public void operationComplete(ChannelFuture future) {
if (!future.isSuccess()) {
if (HttpServerOperations.log.isDebugEnabled()) {
HttpServerOperations.log.debug(format(future.channel(),
"Sending last HTTP packet was not successful, terminating the channel"),
future.cause());
}
}
else {
if (HttpServerOperations.log.isDebugEnabled()) {
HttpServerOperations.log.debug(format(future.channel(),
"Last HTTP packet was sent, terminating the channel"));
}
}

HttpServerOperations.cleanHandlerTerminate(future.channel());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.util.AsciiString;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.GenericFutureListener;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
Expand Down Expand Up @@ -109,7 +110,7 @@
* @author Stephane Maldini1
*/
class HttpServerOperations extends HttpOperations<HttpServerRequest, HttpServerResponse>
implements HttpServerRequest, HttpServerResponse {
implements HttpServerRequest, HttpServerResponse, GenericFutureListener<io.netty.util.concurrent.Future<? super Void>> {

final BiPredicate<HttpServerRequest, HttpServerResponse> configuredCompressionPredicate;
final ConnectionInfo connectionInfo;
Expand All @@ -133,6 +134,7 @@ class HttpServerOperations extends HttpOperations<HttpServerRequest, HttpServerR
String path;
Future<?> requestTimeoutFuture;
Consumer<? super HttpHeaders> trailerHeadersConsumer;
HttpMessage fullHttpResponse;

volatile Context currentContext;

Expand All @@ -148,6 +150,7 @@ class HttpServerOperations extends HttpOperations<HttpServerRequest, HttpServerR
this.formDecoderProvider = replaced.formDecoderProvider;
this.is100ContinueExpected = replaced.is100ContinueExpected;
this.isHttp2 = replaced.isHttp2;
this.fullHttpResponse = replaced.fullHttpResponse;
this.mapHandle = replaced.mapHandle;
this.nettyRequest = replaced.nettyRequest;
this.nettyResponse = replaced.nettyResponse;
Expand Down Expand Up @@ -514,11 +517,90 @@ public ZonedDateTime timestamp() {
return timestamp;
}

@Override
@SuppressWarnings("unchecked")
public NettyOutbound send(Publisher<? extends ByteBuf> source) {
if (!channel().isActive()) {
return then(Mono.error(AbortedException.beforeSend()));
}
if (source instanceof Mono) {
return new PostHeadersNettyOutbound(((Mono<ByteBuf>) source)
.flatMap(b -> {
if (!hasSentHeaders()) {
try {
beforeMarkSentHeaders();

fullHttpResponse = prepareHttpMessage(b);

afterMarkSentHeaders();
}
catch (RuntimeException e) {
b.release();
return Mono.error(e);
}

onComplete();
return Mono.<Void>empty();
}

log.debug(format(channel(), "Dropped HTTP content, since response has been sent already: {}"), b);
b.release();
return Mono.empty();
})
.doOnDiscard(ByteBuf.class, ByteBuf::release), this, null);
}
return super.send(source);
}

@Override
public NettyOutbound sendObject(Object message) {
if (!channel().isActive()) {
ReactorNetty.safeRelease(message);
return then(Mono.error(AbortedException.beforeSend()));
}
if (message instanceof ByteBuf) {
ByteBuf b = (ByteBuf) message;
return new PostHeadersNettyOutbound(Mono.create(sink -> {
if (!hasSentHeaders()) {
try {
beforeMarkSentHeaders();

fullHttpResponse = prepareHttpMessage(b);

afterMarkSentHeaders();
}
catch (RuntimeException e) {
// If afterMarkSentHeaders throws an exception there is no need to release the ByteBuf here.
// It will be released by PostHeadersNettyOutbound as there are on error/cancel hooks
sink.error(e);
return;
}

onComplete();
sink.success();
}
else {
log.debug(format(channel(), "Dropped HTTP content, since response has been sent already: {}"), b);
b.release();
sink.success();
}
}), this, b);
}
return super.sendObject(message);
}

@Override
public Mono<Void> send() {
return FutureMono.deferFuture(() -> markSentHeaderAndBody() ?
channel().writeAndFlush(newFullBodyMessage(EMPTY_BUFFER)) :
channel().newSucceededFuture());
return Mono.create(sink -> {
if (!hasSentHeaders()) {
onComplete();
sink.success();
}
else {
log.debug(format(channel(), "Response has been sent already."));
sink.success();
}
});
}

@Override
Expand Down Expand Up @@ -575,6 +657,46 @@ public HttpServerResponse status(HttpResponseStatus status) {
return this;
}

@Override
public Mono<Void> then() {
if (!channel().isActive()) {
return Mono.error(AbortedException.beforeSend());
}

if (hasSentHeaders()) {
return Mono.empty();
}

return FutureMono.deferFuture(() -> {
if (!hasSentHeaders()) {
beforeMarkSentHeaders();

HttpMessage msg = outboundHttpMessage();
boolean last = false;
int contentLength = HttpUtil.getContentLength(msg, -1);
if (contentLength == 0 || isContentAlwaysEmpty()) {
last = true;
msg = newFullBodyMessage(Unpooled.EMPTY_BUFFER);
}
else if (contentLength > 0) {
responseHeaders.remove(HttpHeaderNames.TRANSFER_ENCODING);
}

afterMarkSentHeaders();

if (!last) {
return markSentHeaders() ? channel().writeAndFlush(msg) : channel().newSucceededFuture();
}
else {
return markSentHeaderAndBody() ? channel().writeAndFlush(msg) : channel().newSucceededFuture();
}
}
else {
return channel().newSucceededFuture();
}
});
}

@Override
public HttpServerResponse trailerHeaders(Consumer<? super HttpHeaders> trailerHeaders) {
this.trailerHeadersConsumer = Objects.requireNonNull(trailerHeaders, "trailerHeaders");
Expand Down Expand Up @@ -749,11 +871,10 @@ protected void onOutboundComplete() {
}
if (markSentHeaderAndBody()) {
if (log.isDebugEnabled()) {
log.debug(format(channel(), "No sendHeaders() called before complete, sending " +
"zero-length header"));
log.debug(format(channel(), "Headers are not sent before onComplete()."));
}

f = channel().writeAndFlush(newFullBodyMessage(EMPTY_BUFFER));
f = channel().writeAndFlush(fullHttpResponse != null ? fullHttpResponse : newFullBodyMessage(EMPTY_BUFFER));
}
else if (markSentBody()) {
HttpHeaders trailerHeaders = null;
Expand Down Expand Up @@ -790,35 +911,29 @@ else if (markSentBody()) {
}
else {
discard();
terminate();
return;
}
f.addListener(s -> {
discard();
if (!s.isSuccess() && log.isDebugEnabled()) {
log.debug(format(channel(), "Failed flushing last frame"), s.cause());
}
});

f.addListener(this);
}

static void cleanHandlerTerminate(Channel ch) {
ChannelOperations<?, ?> ops = get(ch);

if (ops == null) {
return;
}

ops.discard();

//Try to defer the disposing to leave a chance for any synchronous complete following this callback
if (!ops.isSubscriptionDisposed()) {
ch.eventLoop()
.execute(((HttpServerOperations) ops)::terminate);
@Override
public void operationComplete(io.netty.util.concurrent.Future<? super Void> future) {
if (!future.isSuccess()) {
if (log.isDebugEnabled()) {
log.debug(format(channel(), "Sending last HTTP packet was not successful, terminating the channel"),
future.cause());
}
}
else {
//if already disposed, we can immediately call terminate
((HttpServerOperations) ops).terminate();
if (log.isDebugEnabled()) {
log.debug(format(channel(), "Last HTTP packet was sent, terminating the channel"));
}
}

discard();

terminate();
}

static long requestsCounter(Channel channel) {
Expand Down Expand Up @@ -896,8 +1011,15 @@ else if (cause instanceof TooLongHttpHeaderException) {
}
}

//"FutureReturnValueIgnored" this is deliberate
ctx.channel().writeAndFlush(response);
if (ops instanceof HttpServerOperations) {
HttpServerOperations serverOps = (HttpServerOperations) ops;
serverOps.fullHttpResponse = response;
serverOps.onComplete();
}
else {
//"FutureReturnValueIgnored" this is deliberate
ctx.channel().writeAndFlush(response);
}

listener.onStateChange(ops, REQUEST_DECODING_FAILED);
}
Expand All @@ -922,13 +1044,15 @@ protected void onOutboundError(Throwable err) {
nettyResponse.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
responseHeaders.set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE);
channel().writeAndFlush(newFullBodyMessage(EMPTY_BUFFER))
.addListener(this)
.addListener(ChannelFutureListener.CLOSE);
return;
}

markSentBody();
log.error(format(channel(), "Error finishing response. Closing connection"), err);
channel().writeAndFlush(EMPTY_BUFFER)
.addListener(this)
.addListener(ChannelFutureListener.CLOSE);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.util.function.BiPredicate;

import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
Expand Down Expand Up @@ -62,8 +61,7 @@
* Replace {@link io.netty.handler.codec.http.HttpServerKeepAliveHandler} with extra
* handler management.
*/
final class HttpTrafficHandler extends ChannelDuplexHandler
implements Runnable, ChannelFutureListener {
final class HttpTrafficHandler extends ChannelDuplexHandler implements Runnable {

static final String MULTIPART_PREFIX = "multipart";

Expand Down Expand Up @@ -339,13 +337,11 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
pendingResponses);
}
ctx.write(msg, promise.unvoid())
.addListener(this)
.addListener(ChannelFutureListener.CLOSE);
return;
}

ctx.write(msg, promise.unvoid())
.addListener(this);
ctx.write(msg, promise);

if (!persistentConnection) {
return;
Expand Down Expand Up @@ -457,25 +453,6 @@ public void run() {
overflow = false;
}

@Override
public void operationComplete(ChannelFuture future) {
if (!future.isSuccess()) {
if (HttpServerOperations.log.isDebugEnabled()) {
HttpServerOperations.log.debug(format(future.channel(),
"Sending last HTTP packet was not successful, terminating the channel"),
future.cause());
}
}
else {
if (HttpServerOperations.log.isDebugEnabled()) {
HttpServerOperations.log.debug(format(future.channel(),
"Last HTTP packet was sent, terminating the channel"));
}
}

HttpServerOperations.cleanHandlerTerminate(future.channel());
}

@Override
public void handlerRemoved(ChannelHandlerContext ctx) {
discard();
Expand Down
Loading

0 comments on commit b936723

Please sign in to comment.