diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/HttpOperations.java b/reactor-netty-http/src/main/java/reactor/netty/http/HttpOperations.java index 39c8917263..42eacc8a4b 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/HttpOperations.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/HttpOperations.java @@ -421,7 +421,7 @@ else if (!SCHEME_PATTERN.matcher(tempUri).matches()) { */ protected abstract HttpMessage outboundHttpMessage(); - HttpMessage prepareHttpMessage(ByteBuf buffer) { + protected HttpMessage prepareHttpMessage(ByteBuf buffer) { HttpMessage msg; if (HttpUtil.getContentLength(outboundHttpMessage(), -1) == 0 || isContentAlwaysEmpty()) { diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/server/Http2StreamBridgeServerHandler.java b/reactor-netty-http/src/main/java/reactor/netty/http/server/Http2StreamBridgeServerHandler.java index aba5a38dca..91bc96a341 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/server/Http2StreamBridgeServerHandler.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/server/Http2StreamBridgeServerHandler.java @@ -24,8 +24,6 @@ import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelDuplexHandler; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; import io.netty.handler.codec.DecoderResult; @@ -57,7 +55,7 @@ * * @author Violeta Georgieva */ -final class Http2StreamBridgeServerHandler extends ChannelDuplexHandler implements ChannelFutureListener { +final class Http2StreamBridgeServerHandler extends ChannelDuplexHandler { final BiPredicate compress; final ServerCookieDecoder cookieDecoder; @@ -196,31 +194,11 @@ else if (msg instanceof HttpResponse && HttpResponseStatus.CONTINUE.equals(((Htt } else { //"FutureReturnValueIgnored" this is deliberate - ChannelFuture f = ctx.write(msg, promise); + ctx.write(msg, promise); if (msg instanceof LastHttpContent) { pendingResponse = false; - f.addListener(this); ctx.read(); } } } - - @Override - public void operationComplete(ChannelFuture future) { - if (!future.isSuccess()) { - if (HttpServerOperations.log.isDebugEnabled()) { - HttpServerOperations.log.debug(format(future.channel(), - "Sending last HTTP packet was not successful, terminating the channel"), - future.cause()); - } - } - else { - if (HttpServerOperations.log.isDebugEnabled()) { - HttpServerOperations.log.debug(format(future.channel(), - "Last HTTP packet was sent, terminating the channel")); - } - } - - HttpServerOperations.cleanHandlerTerminate(future.channel()); - } } diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/server/Http3StreamBridgeServerHandler.java b/reactor-netty-http/src/main/java/reactor/netty/http/server/Http3StreamBridgeServerHandler.java index 326add2186..cad7446244 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/server/Http3StreamBridgeServerHandler.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/server/Http3StreamBridgeServerHandler.java @@ -18,7 +18,6 @@ import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; import io.netty.handler.codec.DecoderResult; @@ -49,7 +48,7 @@ import static reactor.netty.ReactorNetty.format; -final class Http3StreamBridgeServerHandler extends ChannelDuplexHandler implements ChannelFutureListener { +final class Http3StreamBridgeServerHandler extends ChannelDuplexHandler { final BiPredicate compress; final ServerCookieDecoder cookieDecoder; final ServerCookieEncoder cookieEncoder; @@ -179,29 +178,9 @@ else if (msg instanceof HttpResponse && HttpResponseStatus.CONTINUE.equals(((Htt ChannelFuture f = ctx.write(msg, promise); if (msg instanceof LastHttpContent) { pendingResponse = false; - f.addListener(this) - .addListener(QuicStreamChannel.SHUTDOWN_OUTPUT); + f.addListener(QuicStreamChannel.SHUTDOWN_OUTPUT); ctx.read(); } } } - - @Override - public void operationComplete(ChannelFuture future) { - if (!future.isSuccess()) { - if (HttpServerOperations.log.isDebugEnabled()) { - HttpServerOperations.log.debug(format(future.channel(), - "Sending last HTTP packet was not successful, terminating the channel"), - future.cause()); - } - } - else { - if (HttpServerOperations.log.isDebugEnabled()) { - HttpServerOperations.log.debug(format(future.channel(), - "Last HTTP packet was sent, terminating the channel")); - } - } - - HttpServerOperations.cleanHandlerTerminate(future.channel()); - } } diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/server/HttpServerOperations.java b/reactor-netty-http/src/main/java/reactor/netty/http/server/HttpServerOperations.java index c2f9741180..9bda3c1f1f 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/server/HttpServerOperations.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/server/HttpServerOperations.java @@ -74,6 +74,7 @@ import io.netty.handler.timeout.ReadTimeoutHandler; import io.netty.util.AsciiString; import io.netty.util.ReferenceCountUtil; +import io.netty.util.concurrent.GenericFutureListener; import org.reactivestreams.Publisher; import org.reactivestreams.Subscription; import reactor.core.CoreSubscriber; @@ -109,7 +110,7 @@ * @author Stephane Maldini1 */ class HttpServerOperations extends HttpOperations - implements HttpServerRequest, HttpServerResponse { + implements HttpServerRequest, HttpServerResponse, GenericFutureListener> { final BiPredicate configuredCompressionPredicate; final ConnectionInfo connectionInfo; @@ -133,6 +134,7 @@ class HttpServerOperations extends HttpOperations requestTimeoutFuture; Consumer trailerHeadersConsumer; + HttpMessage fullHttpResponse; volatile Context currentContext; @@ -148,6 +150,7 @@ class HttpServerOperations extends HttpOperations source) { + if (!channel().isActive()) { + return then(Mono.error(AbortedException.beforeSend())); + } + if (source instanceof Mono) { + return new PostHeadersNettyOutbound(((Mono) source) + .flatMap(b -> { + if (!hasSentHeaders()) { + try { + fullHttpResponse = prepareHttpMessage(b); + + afterMarkSentHeaders(); + } + catch (RuntimeException e) { + b.release(); + return Mono.error(e); + } + + onComplete(); + return Mono.empty(); + } + + if (log.isDebugEnabled()) { + log.debug(format(channel(), "Dropped HTTP content, since response has been sent already: {}"), b); + } + b.release(); + return Mono.empty(); + }) + .doOnDiscard(ByteBuf.class, ByteBuf::release), this, null); + } + return super.send(source); + } + + @Override + public NettyOutbound sendObject(Object message) { + if (!channel().isActive()) { + ReactorNetty.safeRelease(message); + return then(Mono.error(AbortedException.beforeSend())); + } + if (message instanceof ByteBuf) { + ByteBuf b = (ByteBuf) message; + return new PostHeadersNettyOutbound(Mono.create(sink -> { + if (!hasSentHeaders()) { + try { + fullHttpResponse = prepareHttpMessage(b); + + afterMarkSentHeaders(); + } + catch (RuntimeException e) { + // If afterMarkSentHeaders throws an exception there is no need to release the ByteBuf here. + // It will be released by PostHeadersNettyOutbound as there are on error/cancel hooks + sink.error(e); + return; + } + + onComplete(); + sink.success(); + } + else { + if (log.isDebugEnabled()) { + log.debug(format(channel(), "Dropped HTTP content, since response has been sent already: {}"), b); + } + b.release(); + sink.success(); + } + }), this, b); + } + return super.sendObject(message); + } + @Override public Mono send() { - return FutureMono.deferFuture(() -> markSentHeaderAndBody() ? - channel().writeAndFlush(newFullBodyMessage(EMPTY_BUFFER)) : - channel().newSucceededFuture()); + return Mono.create(sink -> { + if (!hasSentHeaders()) { + onComplete(); + sink.success(); + } + else { + if (log.isDebugEnabled()) { + log.debug(format(channel(), "Response has been sent already.")); + } + sink.success(); + } + }); } @Override @@ -575,6 +664,46 @@ public HttpServerResponse status(HttpResponseStatus status) { return this; } + @Override + public Mono then() { + if (!channel().isActive()) { + return Mono.error(AbortedException.beforeSend()); + } + + if (hasSentHeaders()) { + return Mono.empty(); + } + + return FutureMono.deferFuture(() -> { + if (!hasSentHeaders()) { + beforeMarkSentHeaders(); + + HttpMessage msg = outboundHttpMessage(); + boolean last = false; + int contentLength = HttpUtil.getContentLength(msg, -1); + if (contentLength == 0 || isContentAlwaysEmpty()) { + last = true; + msg = newFullBodyMessage(Unpooled.EMPTY_BUFFER); + } + else if (contentLength > 0) { + responseHeaders.remove(HttpHeaderNames.TRANSFER_ENCODING); + } + + afterMarkSentHeaders(); + + if (!last) { + return markSentHeaders() ? channel().writeAndFlush(msg) : channel().newSucceededFuture(); + } + else { + return markSentHeaderAndBody() ? channel().writeAndFlush(msg) : channel().newSucceededFuture(); + } + } + else { + return channel().newSucceededFuture(); + } + }); + } + @Override public HttpServerResponse trailerHeaders(Consumer trailerHeaders) { this.trailerHeadersConsumer = Objects.requireNonNull(trailerHeaders, "trailerHeaders"); @@ -749,11 +878,10 @@ protected void onOutboundComplete() { } if (markSentHeaderAndBody()) { if (log.isDebugEnabled()) { - log.debug(format(channel(), "No sendHeaders() called before complete, sending " + - "zero-length header")); + log.debug(format(channel(), "Headers are not sent before onComplete().")); } - f = channel().writeAndFlush(newFullBodyMessage(EMPTY_BUFFER)); + f = channel().writeAndFlush(fullHttpResponse != null ? fullHttpResponse : newFullBodyMessage(EMPTY_BUFFER)); } else if (markSentBody()) { HttpHeaders trailerHeaders = null; @@ -790,35 +918,29 @@ else if (markSentBody()) { } else { discard(); + terminate(); return; } - f.addListener(s -> { - discard(); - if (!s.isSuccess() && log.isDebugEnabled()) { - log.debug(format(channel(), "Failed flushing last frame"), s.cause()); - } - }); - + f.addListener(this); } - static void cleanHandlerTerminate(Channel ch) { - ChannelOperations ops = get(ch); - - if (ops == null) { - return; - } - - ops.discard(); - - //Try to defer the disposing to leave a chance for any synchronous complete following this callback - if (!ops.isSubscriptionDisposed()) { - ch.eventLoop() - .execute(((HttpServerOperations) ops)::terminate); + @Override + public void operationComplete(io.netty.util.concurrent.Future future) { + if (!future.isSuccess()) { + if (log.isDebugEnabled()) { + log.debug(format(channel(), "Sending last HTTP packet was not successful, terminating the channel"), + future.cause()); + } } else { - //if already disposed, we can immediately call terminate - ((HttpServerOperations) ops).terminate(); + if (log.isDebugEnabled()) { + log.debug(format(channel(), "Last HTTP packet was sent, terminating the channel")); + } } + + discard(); + + terminate(); } static long requestsCounter(Channel channel) { @@ -896,8 +1018,15 @@ else if (cause instanceof TooLongHttpHeaderException) { } } - //"FutureReturnValueIgnored" this is deliberate - ctx.channel().writeAndFlush(response); + if (ops instanceof HttpServerOperations) { + HttpServerOperations serverOps = (HttpServerOperations) ops; + serverOps.fullHttpResponse = response; + serverOps.onComplete(); + } + else { + //"FutureReturnValueIgnored" this is deliberate + ctx.channel().writeAndFlush(response); + } listener.onStateChange(ops, REQUEST_DECODING_FAILED); } @@ -922,6 +1051,7 @@ protected void onOutboundError(Throwable err) { nettyResponse.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR); responseHeaders.set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE); channel().writeAndFlush(newFullBodyMessage(EMPTY_BUFFER)) + .addListener(this) .addListener(ChannelFutureListener.CLOSE); return; } @@ -929,6 +1059,7 @@ protected void onOutboundError(Throwable err) { markSentBody(); log.error(format(channel(), "Error finishing response. Closing connection"), err); channel().writeAndFlush(EMPTY_BUFFER) + .addListener(this) .addListener(ChannelFutureListener.CLOSE); } diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/server/HttpTrafficHandler.java b/reactor-netty-http/src/main/java/reactor/netty/http/server/HttpTrafficHandler.java index bcdf484422..07ba9a170f 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/server/HttpTrafficHandler.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/server/HttpTrafficHandler.java @@ -24,7 +24,6 @@ import java.util.function.BiPredicate; import io.netty.channel.ChannelDuplexHandler; -import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; @@ -62,8 +61,7 @@ * Replace {@link io.netty.handler.codec.http.HttpServerKeepAliveHandler} with extra * handler management. */ -final class HttpTrafficHandler extends ChannelDuplexHandler - implements Runnable, ChannelFutureListener { +final class HttpTrafficHandler extends ChannelDuplexHandler implements Runnable { static final String MULTIPART_PREFIX = "multipart"; @@ -340,13 +338,11 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) pendingResponses); } ctx.write(msg, promise.unvoid()) - .addListener(this) .addListener(ChannelFutureListener.CLOSE); return; } - ctx.write(msg, promise.unvoid()) - .addListener(this); + ctx.write(msg, promise); if (!persistentConnection) { return; @@ -459,25 +455,6 @@ public void run() { overflow = false; } - @Override - public void operationComplete(ChannelFuture future) { - if (!future.isSuccess()) { - if (HttpServerOperations.log.isDebugEnabled()) { - HttpServerOperations.log.debug(format(future.channel(), - "Sending last HTTP packet was not successful, terminating the channel"), - future.cause()); - } - } - else { - if (HttpServerOperations.log.isDebugEnabled()) { - HttpServerOperations.log.debug(format(future.channel(), - "Last HTTP packet was sent, terminating the channel")); - } - } - - HttpServerOperations.cleanHandlerTerminate(future.channel()); - } - @Override public void handlerRemoved(ChannelHandlerContext ctx) { discard(); diff --git a/reactor-netty-http/src/test/java/reactor/netty/http/HttpCompressionClientServerTests.java b/reactor-netty-http/src/test/java/reactor/netty/http/HttpCompressionClientServerTests.java index b7e4cfb39f..6c49cd0388 100644 --- a/reactor-netty-http/src/test/java/reactor/netty/http/HttpCompressionClientServerTests.java +++ b/reactor-netty-http/src/test/java/reactor/netty/http/HttpCompressionClientServerTests.java @@ -34,6 +34,7 @@ import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.codec.compression.Brotli; import io.netty.handler.codec.compression.Zstd; +import io.netty.handler.codec.http.HttpHeaderNames; import io.netty.handler.codec.http.HttpHeaders; import io.netty.handler.codec.http.LastHttpContent; import io.netty.handler.ssl.util.InsecureTrustManagerFactory; @@ -276,12 +277,14 @@ void serverCompressionEnabledSmallResponse(HttpServer server, HttpClient client) .get("/2", (in, out) -> out.sendString(Mono.just("reply"))) .get("/3", (in, out) -> out.header("content-length", "5") //explicit 'content-length' .sendObject(Unpooled.wrappedBuffer("reply".getBytes(Charset.defaultCharset())))) - .get("/4", (in, out) -> out.sendObject(Unpooled.wrappedBuffer("reply".getBytes(Charset.defaultCharset()))))) + .get("/4", (in, out) -> out.sendObject(Unpooled.wrappedBuffer("reply".getBytes(Charset.defaultCharset())))) + .get("/5", (in, out) -> out.header("content-length", "5") //explicit 'content-length')) + .sendString(Flux.just("r", "e", "p", "l", "y")))) .bindNow(Duration.ofSeconds(10)); //don't activate compression on the client options to avoid auto-handling (which removes the header) //edit the header manually to attempt to trigger compression on server side - Flux.range(1, 4) + Flux.range(1, 5) .flatMap(i -> client.port(disposableServer.port()) .headers(h -> h.add("Accept-Encoding", "gzip")) @@ -593,6 +596,14 @@ void testIssue825SendObject() { doTestIssue825_2((b, out) -> out.sendObject(b)); } + @Test + void testIssue825SendHeaders() { + doTestIssue825_2((b, out) -> { + b.release(); + return out.header(HttpHeaderNames.CONTENT_LENGTH, "0").sendHeaders(); + }); + } + private void doTestIssue825_2(BiFunction> serverFn) { int port1 = SocketUtils.findAvailableTcpPort(); int port2 = SocketUtils.findAvailableTcpPort(); @@ -650,8 +661,9 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) { .compress(true) .get() .uri("/") - .responseContent()) - .expectError() + .response((res, bytes) -> Mono.just(res.status().code()))) + .expectNext(500) + .expectComplete() .verify(Duration.ofSeconds(30)); bufferReleased.asMono() diff --git a/reactor-netty-http/src/test/java/reactor/netty/http/client/HttpClientTest.java b/reactor-netty-http/src/test/java/reactor/netty/http/client/HttpClientTest.java index 835b5246ee..5b8eafc93e 100644 --- a/reactor-netty-http/src/test/java/reactor/netty/http/client/HttpClientTest.java +++ b/reactor-netty-http/src/test/java/reactor/netty/http/client/HttpClientTest.java @@ -3180,7 +3180,7 @@ public void testSharedNameResolver_NotSharedClientNoConnectionPool() throws Inte @Test void testHttpClientCancelled() throws InterruptedException { // logged by the server when last http packet is sent and channel is terminated - String serverCancelledLog = "[HttpServer] Channel inbound receiver cancelled (operation cancelled)."; + String serverCancelledLog = "[HttpServer] Channel inbound receiver cancelled (subscription disposed)."; // logged by client when cancelled while receiving response String clientCancelledLog = HttpClientOperations.INBOUND_CANCEL_LOG; diff --git a/reactor-netty-http/src/test/java/reactor/netty/http/server/HttpServerOutboundCompleteTest.java b/reactor-netty-http/src/test/java/reactor/netty/http/server/HttpServerOutboundCompleteTest.java new file mode 100644 index 0000000000..9d7ed779dd --- /dev/null +++ b/reactor-netty-http/src/test/java/reactor/netty/http/server/HttpServerOutboundCompleteTest.java @@ -0,0 +1,495 @@ +/* + * Copyright (c) 2024 VMware, Inc. or its affiliates, All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package reactor.netty.http.server; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; +import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http.HttpContent; +import io.netty.handler.codec.http.HttpHeaderNames; +import io.netty.handler.codec.http.HttpMethod; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http.LastHttpContent; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.publisher.Signal; +import reactor.netty.BaseHttpTest; +import reactor.netty.DisposableServer; +import reactor.netty.http.HttpProtocol; +import reactor.netty.http.client.HttpClient; +import reactor.test.StepVerifier; +import reactor.util.annotation.Nullable; + +import java.nio.charset.Charset; +import java.time.Duration; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; + +import static io.netty.buffer.Unpooled.EMPTY_BUFFER; +import static reactor.netty.NettyPipeline.HttpTrafficHandler; +import static org.assertj.core.api.Assertions.assertThat; + +class HttpServerOutboundCompleteTest extends BaseHttpTest { + static final String REPEAT = createString(1024); + static final String EXPECTED_REPEAT = createString(4096); + + @ParameterizedTest + @EnumSource(value = HttpProtocol.class, names = {"HTTP11", "H2C"}) + void httpGetRespondsSend(HttpProtocol protocol) throws Exception { + CountDownLatch latch = new CountDownLatch(4); + EventsRecorder recorder = new EventsRecorder(latch); + disposableServer = createServer(recorder, protocol, + r -> r.get("/1", (req, res) -> res.send().doOnEach(recorder).doOnCancel(recorder))); + + sendGetRequest(disposableServer.port(), protocol) + .as(StepVerifier::create) + .expectNext(Collections.emptyList()) + .expectComplete() + .verify(Duration.ofSeconds(5)); + + assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue(); + assertThat(recorder.bufferIsReleased.get()).isEqualTo(1); + assertThat(recorder.fullResponseIsSent.get()).isEqualTo(1); + assertThat(recorder.onCompleteIsReceived.get()).isEqualTo(1); + assertThat(recorder.onTerminateIsReceived.get()).isEqualTo(1); + } + + @ParameterizedTest + @EnumSource(value = HttpProtocol.class, names = {"HTTP11", "H2C"}) + void httpGetRespondsSendFlux(HttpProtocol protocol) throws Exception { + CountDownLatch latch = new CountDownLatch(3); + EventsRecorder recorder = new EventsRecorder(latch); + disposableServer = createServer(recorder, protocol, + r -> r.get("/1", (req, res) -> res.sendString(Flux.just(REPEAT, REPEAT, REPEAT, REPEAT).doOnEach(recorder).doOnCancel(recorder)))); + + sendGetRequest(disposableServer.port(), protocol) + .as(StepVerifier::create) + .expectNext(Collections.singletonList(EXPECTED_REPEAT)) + .expectComplete() + .verify(Duration.ofSeconds(5)); + + assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue(); + assertThat(recorder.bufferIsReleased.get()).isEqualTo(1); + assertThat(recorder.fullResponseIsSent.get()).isEqualTo(0); + assertThat(recorder.onCompleteIsReceived.get()).isEqualTo(1); + assertThat(recorder.onTerminateIsReceived.get()).isEqualTo(1); + } + + @ParameterizedTest + @EnumSource(value = HttpProtocol.class, names = {"HTTP11", "H2C"}) + void httpGetRespondsSendFluxContentAlwaysEmpty(HttpProtocol protocol) throws Exception { + CountDownLatch latch = new CountDownLatch(4); + EventsRecorder recorder = new EventsRecorder(latch); + disposableServer = createServer(recorder, protocol, + r -> r.get("/1", (req, res) -> res.status(HttpResponseStatus.NO_CONTENT) + .sendString(Flux.just(REPEAT, REPEAT, REPEAT, REPEAT).doOnEach(recorder).doOnCancel(recorder)))); + + sendGetRequest(disposableServer.port(), protocol) + .as(StepVerifier::create) + .expectNext(Collections.emptyList()) + .expectComplete() + .verify(Duration.ofSeconds(5)); + + assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue(); + assertThat(recorder.bufferIsReleased.get()).isEqualTo(1); + assertThat(recorder.fullResponseIsSent.get()).isEqualTo(1); + assertThat(recorder.onCompleteIsReceived.get()).isEqualTo(1); + assertThat(recorder.onTerminateIsReceived.get()).isEqualTo(1); + } + + @ParameterizedTest + @EnumSource(value = HttpProtocol.class, names = {"HTTP11", "H2C"}) + void httpGetRespondsSendFluxContentLengthZero(HttpProtocol protocol) throws Exception { + CountDownLatch latch = new CountDownLatch(4); + EventsRecorder recorder = new EventsRecorder(latch); + disposableServer = createServer(recorder, protocol, + r -> r.get("/1", (req, res) -> res.header(HttpHeaderNames.CONTENT_LENGTH, "0") + .sendString(Flux.just(REPEAT, REPEAT, REPEAT, REPEAT).doOnEach(recorder).doOnCancel(recorder)))); + + sendGetRequest(disposableServer.port(), protocol) + .as(StepVerifier::create) + .expectNext(Collections.emptyList()) + .expectComplete() + .verify(Duration.ofSeconds(5)); + + assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue(); + assertThat(recorder.bufferIsReleased.get()).isEqualTo(1); + assertThat(recorder.fullResponseIsSent.get()).isEqualTo(1); + assertThat(recorder.onCompleteIsReceived.get()).isEqualTo(1); + assertThat(recorder.onTerminateIsReceived.get()).isEqualTo(1); + } + + @ParameterizedTest + @EnumSource(value = HttpProtocol.class, names = {"HTTP11", "H2C"}) + void httpGetRespondsSendHeaders(HttpProtocol protocol) throws Exception { + CountDownLatch latch = new CountDownLatch(3); + EventsRecorder recorder = new EventsRecorder(latch); + disposableServer = createServer(recorder, protocol, + r -> r.get("/1", (req, res) -> res.sendHeaders().then().doOnEach(recorder).doOnCancel(recorder))); + + sendGetRequest(disposableServer.port(), protocol) + .as(StepVerifier::create) + .expectNext(Collections.emptyList()) + .expectComplete() + .verify(Duration.ofSeconds(5)); + + assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue(); + assertThat(recorder.bufferIsReleased.get()).isEqualTo(1); + assertThat(recorder.fullResponseIsSent.get()).isEqualTo(0); + assertThat(recorder.onCompleteIsReceived.get()).isEqualTo(1); + assertThat(recorder.onTerminateIsReceived.get()).isEqualTo(1); + } + + @ParameterizedTest + @EnumSource(value = HttpProtocol.class, names = {"HTTP11", "H2C"}) + void httpGetRespondsSendMono(HttpProtocol protocol) throws Exception { + CountDownLatch latch = new CountDownLatch(4); + EventsRecorder recorder = new EventsRecorder(latch); + disposableServer = createServer(recorder, protocol, + r -> r.get("/1", (req, res) -> res.sendString(Mono.just(REPEAT).doOnEach(recorder).doOnCancel(recorder)))); + + sendGetRequest(disposableServer.port(), protocol) + .as(StepVerifier::create) + .expectNext(Collections.singletonList(REPEAT)) + .expectComplete() + .verify(Duration.ofSeconds(5)); + + assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue(); + assertThat(recorder.bufferIsReleased.get()).isEqualTo(1); + assertThat(recorder.fullResponseIsSent.get()).isEqualTo(1); + assertThat(recorder.onCompleteIsReceived.get()).isEqualTo(1); + assertThat(recorder.onTerminateIsReceived.get()).isEqualTo(1); + } + + @ParameterizedTest + @EnumSource(value = HttpProtocol.class, names = {"HTTP11", "H2C"}) + void httpGetRespondsSendMonoEmpty(HttpProtocol protocol) throws Exception { + CountDownLatch latch = new CountDownLatch(4); + EventsRecorder recorder = new EventsRecorder(latch); + disposableServer = createServer(recorder, protocol, + r -> r.get("/1", (req, res) -> Mono.empty().doOnEach(recorder).doOnCancel(recorder))); + + sendGetRequest(disposableServer.port(), protocol) + .as(StepVerifier::create) + .expectNext(Collections.emptyList()) + .expectComplete() + .verify(Duration.ofSeconds(5)); + + assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue(); + assertThat(recorder.bufferIsReleased.get()).isEqualTo(1); + assertThat(recorder.fullResponseIsSent.get()).isEqualTo(1); + assertThat(recorder.onCompleteIsReceived.get()).isEqualTo(1); + assertThat(recorder.onTerminateIsReceived.get()).isEqualTo(1); + } + + @ParameterizedTest + @EnumSource(value = HttpProtocol.class, names = {"HTTP11", "H2C"}) + void httpGetRespondsSendObject(HttpProtocol protocol) throws Exception { + CountDownLatch latch = new CountDownLatch(4); + EventsRecorder recorder = new EventsRecorder(latch); + disposableServer = createServer(recorder, protocol, + r -> r.get("/1", (req, res) -> res.sendObject(Unpooled.wrappedBuffer(REPEAT.getBytes(Charset.defaultCharset()))) + .then().doOnEach(recorder).doOnCancel(recorder))); + + sendGetRequest(disposableServer.port(), protocol) + .as(StepVerifier::create) + .expectNext(Collections.singletonList(REPEAT)) + .expectComplete() + .verify(Duration.ofSeconds(5)); + + assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue(); + assertThat(recorder.bufferIsReleased.get()).isEqualTo(1); + assertThat(recorder.fullResponseIsSent.get()).isEqualTo(1); + assertThat(recorder.onCompleteIsReceived.get()).isEqualTo(1); + assertThat(recorder.onTerminateIsReceived.get()).isEqualTo(1); + } + + @ParameterizedTest + @EnumSource(value = HttpProtocol.class, names = {"HTTP11", "H2C"}) + void httpPostRespondsSend(HttpProtocol protocol) throws Exception { + CountDownLatch latch = new CountDownLatch(8); + EventsRecorder recorder = new EventsRecorder(latch); + disposableServer = createServer(recorder, protocol, + r -> r.post("/1", (req, res) -> res.send().doOnEach(recorder).doOnCancel(recorder)) + .post("/2", (req, res) -> req.receive().then(res.send().doOnEach(recorder).doOnCancel(recorder)))); + + sendPostRequest(disposableServer.port(), protocol) + .as(StepVerifier::create) + .expectNext(Collections.emptyList()) + .expectComplete() + .verify(Duration.ofSeconds(5)); + + assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue(); + assertThat(recorder.bufferIsReleased.get()).isEqualTo(2); + assertThat(recorder.fullResponseIsSent.get()).isEqualTo(2); + assertThat(recorder.onCompleteIsReceived.get()).isEqualTo(2); + assertThat(recorder.onTerminateIsReceived.get()).isEqualTo(2); + } + + @ParameterizedTest + @EnumSource(value = HttpProtocol.class, names = {"HTTP11", "H2C"}) + void httpPostRespondsSendFlux(HttpProtocol protocol) throws Exception { + CountDownLatch latch = new CountDownLatch(5); + EventsRecorder recorder = new EventsRecorder(latch); + disposableServer = createServer(recorder, protocol, + r -> r.post("/1", (req, res) -> res.sendString(Flux.just(REPEAT, REPEAT, REPEAT, REPEAT).doOnEach(recorder).doOnCancel(recorder))) + .post("/2", (req, res) -> res.send(req.receive().retain()).then().doOnEach(recorder).doOnCancel(recorder))); + + sendPostRequest(disposableServer.port(), protocol) + .as(StepVerifier::create) + .expectNext(Arrays.asList(EXPECTED_REPEAT, EXPECTED_REPEAT)) + .expectComplete() + .verify(Duration.ofSeconds(5)); + + assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue(); + assertThat(recorder.bufferIsReleased.get()).isEqualTo(1); + assertThat(recorder.fullResponseIsSent.get()).isEqualTo(0); + assertThat(recorder.onCompleteIsReceived.get()).isEqualTo(2); + assertThat(recorder.onTerminateIsReceived.get()).isEqualTo(2); + } + + @ParameterizedTest + @EnumSource(value = HttpProtocol.class, names = {"HTTP11", "H2C"}) + void httpPostRespondsSendHeaders(HttpProtocol protocol) throws Exception { + CountDownLatch latch = new CountDownLatch(6); + EventsRecorder recorder = new EventsRecorder(latch); + disposableServer = createServer(recorder, protocol, + r -> r.post("/1", (req, res) -> res.sendHeaders().then().doOnEach(recorder).doOnCancel(recorder)) + .post("/2", (req, res) -> req.receive().then(res.sendHeaders().then().doOnEach(recorder).doOnCancel(recorder)))); + + sendPostRequest(disposableServer.port(), protocol) + .as(StepVerifier::create) + .expectNext(Collections.emptyList()) + .expectComplete() + .verify(Duration.ofSeconds(5)); + + assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue(); + assertThat(recorder.bufferIsReleased.get()).isEqualTo(2); + assertThat(recorder.fullResponseIsSent.get()).isEqualTo(0); + assertThat(recorder.onCompleteIsReceived.get()).isEqualTo(2); + assertThat(recorder.onTerminateIsReceived.get()).isEqualTo(2); + } + + @ParameterizedTest + @EnumSource(value = HttpProtocol.class, names = {"HTTP11", "H2C"}) + void httpPostRespondsSendMono(HttpProtocol protocol) throws Exception { + CountDownLatch latch = new CountDownLatch(7); + EventsRecorder recorder = new EventsRecorder(latch); + disposableServer = createServer(recorder, protocol, + r -> r.post("/1", (req, res) -> res.sendString(Mono.just(REPEAT).doOnEach(recorder).doOnCancel(recorder))) + .post("/2", (req, res) -> res.send(req.receive().aggregate().retain()).then().doOnEach(recorder).doOnCancel(recorder))); + + sendPostRequest(disposableServer.port(), protocol) + .as(StepVerifier::create) + .expectNext(Arrays.asList(REPEAT, EXPECTED_REPEAT)) + .expectComplete() + .verify(Duration.ofSeconds(5)); + + assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue(); + assertThat(recorder.bufferIsReleased.get()).isEqualTo(1); + assertThat(recorder.fullResponseIsSent.get()).isEqualTo(2); + assertThat(recorder.onCompleteIsReceived.get()).isEqualTo(2); + assertThat(recorder.onTerminateIsReceived.get()).isEqualTo(2); + } + + @ParameterizedTest + @EnumSource(value = HttpProtocol.class, names = {"HTTP11", "H2C"}) + void httpPostRespondsSendMonoEmpty(HttpProtocol protocol) throws Exception { + CountDownLatch latch = new CountDownLatch(8); + EventsRecorder recorder = new EventsRecorder(latch); + disposableServer = createServer(recorder, protocol, + r -> r.post("/1", (req, res) -> Mono.empty().doOnEach(recorder).doOnCancel(recorder)) + .post("/2", (req, res) -> req.receive().then(Mono.empty().doOnEach(recorder).doOnCancel(recorder)))); + + sendPostRequest(disposableServer.port(), protocol) + .as(StepVerifier::create) + .expectNext(Collections.emptyList()) + .expectComplete() + .verify(Duration.ofSeconds(5)); + + assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue(); + assertThat(recorder.bufferIsReleased.get()).isEqualTo(2); + assertThat(recorder.fullResponseIsSent.get()).isEqualTo(2); + assertThat(recorder.onCompleteIsReceived.get()).isEqualTo(2); + assertThat(recorder.onTerminateIsReceived.get()).isEqualTo(2); + } + + @ParameterizedTest + @EnumSource(value = HttpProtocol.class, names = {"HTTP11", "H2C"}) + void httpPostRespondsSendObject(HttpProtocol protocol) throws Exception { + CountDownLatch latch = new CountDownLatch(8); + EventsRecorder recorder = new EventsRecorder(latch); + disposableServer = createServer(recorder, protocol, + r -> r.post("/1", (req, res) -> res.sendObject(Unpooled.wrappedBuffer(REPEAT.getBytes(Charset.defaultCharset()))) + .then().doOnEach(recorder).doOnCancel(recorder)) + .post("/2", (req, res) -> req.receive().then(res.sendObject(Unpooled.wrappedBuffer(REPEAT.getBytes(Charset.defaultCharset()))) + .then().doOnEach(recorder).doOnCancel(recorder)))); + + sendPostRequest(disposableServer.port(), protocol) + .as(StepVerifier::create) + .expectNext(Arrays.asList(REPEAT, REPEAT)) + .expectComplete() + .verify(Duration.ofSeconds(5)); + + assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue(); + assertThat(recorder.bufferIsReleased.get()).isEqualTo(2); + assertThat(recorder.fullResponseIsSent.get()).isEqualTo(2); + assertThat(recorder.onCompleteIsReceived.get()).isEqualTo(2); + assertThat(recorder.onTerminateIsReceived.get()).isEqualTo(2); + } + + static DisposableServer createServer(EventsRecorder recorder, HttpProtocol protocol, Consumer routes) { + return createServer() + .protocol(protocol) + .doOnChannelInit((obs, ch, addr) -> { + if (protocol == HttpProtocol.HTTP11) { + ch.pipeline().addBefore(HttpTrafficHandler, "eventsRecorderHandler", new EventsRecorderHandler(recorder)); + } + }) + .doOnConnection(conn -> { + conn.onTerminate().subscribe(null, null, recorder::recordOnTerminateIsReceived); + if (protocol == HttpProtocol.H2C) { + conn.channel().pipeline().addBefore(HttpTrafficHandler, "eventsRecorderHandler", new EventsRecorderHandler(recorder)); + } + }) + .route(routes) + .bindNow(); + } + + static String createString(int length) { + char[] chars = new char[length]; + Arrays.fill(chars, 'm'); + return new String(chars); + } + + static Mono> sendGetRequest(int port, HttpProtocol protocol) { + return sendRequest(port, protocol, HttpMethod.GET, 1, null); + } + + static Mono> sendPostRequest(int port, HttpProtocol protocol) { + return sendRequest(port, protocol, HttpMethod.POST, 2, Flux.just(REPEAT, REPEAT, REPEAT, REPEAT)); + } + + static Mono> sendRequest(int port, HttpProtocol protocol, HttpMethod method, int numRequests, + @Nullable Publisher body) { + HttpClient client = createClient(port).protocol(protocol); + return Flux.range(1, numRequests) + .flatMap(i -> + client.request(method) + .uri("/" + i) + .send((req, out) -> body != null ? out.sendString(body) : out) + .responseContent() + .aggregate() + .asString()) + .collectList(); + } + + static class EventsRecorder implements Consumer>, Runnable { + final AtomicInteger bufferIsReleased = new AtomicInteger(); + final AtomicInteger fullResponseIsSent = new AtomicInteger(); + final AtomicInteger onCompleteIsReceived = new AtomicInteger(); + final AtomicInteger onTerminateIsReceived = new AtomicInteger(); + + final CountDownLatch latch; + + EventsRecorder(CountDownLatch latch) { + this.latch = latch; + } + + @Override + public void accept(Signal sig) { + if (sig.isOnComplete()) { + onCompleteIsReceived.incrementAndGet(); + latch.countDown(); + } + } + + @Override + public void run() { + onCompleteIsReceived.decrementAndGet(); + latch.countDown(); + } + + void recordBufferIsReleased() { + bufferIsReleased.incrementAndGet(); + latch.countDown(); + } + + void recordFullResponseIsSent() { + fullResponseIsSent.incrementAndGet(); + latch.countDown(); + } + + void recordOnTerminateIsReceived() { + onTerminateIsReceived.incrementAndGet(); + latch.countDown(); + } + } + + static class EventsRecorderHandler extends ChannelDuplexHandler { + final EventsRecorder recorder; + + int counter; + + EventsRecorderHandler(EventsRecorder recorder) { + this.recorder = recorder; + } + + @Override + @SuppressWarnings("ReferenceEquality") + public void channelRead(ChannelHandlerContext ctx, Object msg) { + ByteBuf content = null; + int expectedRefCount = 0; + if (msg instanceof HttpContent) { + content = ((HttpContent) msg).content(); + expectedRefCount = content.refCnt() - 1; + counter++; + } + + ctx.fireChannelRead(msg); + + // "ReferenceEquality" this is deliberate + if (content != null && (content == EMPTY_BUFFER || content.refCnt() == expectedRefCount)) { + counter--; + } + if (msg instanceof LastHttpContent && counter == 0) { + recorder.recordBufferIsReleased(); + } + } + + @Override + @SuppressWarnings("FutureReturnValueIgnored") + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { + if (msg instanceof FullHttpResponse) { + recorder.recordFullResponseIsSent(); + } + + // "FutureReturnValueIgnored" this is deliberate + ctx.write(msg, promise); + } + } +}