Skip to content

Commit

Permalink
Merge #3250 into 2.0.0-M4
Browse files Browse the repository at this point in the history
  • Loading branch information
violetagg committed May 28, 2024
2 parents 65cd7fe + c33825e commit 9f106da
Show file tree
Hide file tree
Showing 8 changed files with 679 additions and 93 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,7 @@ else if (!SCHEME_PATTERN.matcher(tempUri).matches()) {
*/
protected abstract HttpMessage outboundHttpMessage();

HttpMessage prepareHttpMessage(Buffer buffer) {
protected HttpMessage prepareHttpMessage(Buffer buffer) {
HttpMessage msg;
if (HttpUtil.getContentLength(outboundHttpMessage(), -1) == 0 ||
isContentAlwaysEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import java.util.function.BiPredicate;

import io.netty5.buffer.Buffer;
import io.netty5.channel.Channel;
import io.netty5.channel.ChannelHandlerAdapter;
import io.netty5.channel.ChannelHandlerContext;
import io.netty5.handler.codec.DecoderResult;
Expand All @@ -36,7 +35,6 @@
import io.netty5.handler.ssl.SslHandler;
import io.netty5.util.Resource;
import io.netty5.util.concurrent.Future;
import io.netty5.util.concurrent.FutureContextListener;
import reactor.core.publisher.Mono;
import reactor.netty5.Connection;
import reactor.netty5.ConnectionObserver;
Expand All @@ -55,7 +53,7 @@
*
* @author Violeta Georgieva
*/
final class Http2StreamBridgeServerHandler extends ChannelHandlerAdapter implements FutureContextListener<Channel, Void> {
final class Http2StreamBridgeServerHandler extends ChannelHandlerAdapter {

final BiPredicate<HttpServerRequest, HttpServerResponse> compress;
final HttpServerFormDecoderProvider formDecoderProvider;
Expand Down Expand Up @@ -182,29 +180,9 @@ else if (msg instanceof HttpResponse && CONTINUE.equals(((HttpResponse) msg).sta
Future<Void> f = ctx.write(msg);
if (msg instanceof LastHttpContent) {
pendingResponse = false;
f.addListener(ctx.channel(), this);
ctx.read();
}
return f;
}
}

@Override
public void operationComplete(Channel channel, Future<? extends Void> future) {
if (!future.isSuccess()) {
if (HttpServerOperations.log.isDebugEnabled()) {
HttpServerOperations.log.debug(format(channel,
"Sending last HTTP packet was not successful, terminating the channel"),
future.cause());
}
}
else {
if (HttpServerOperations.log.isDebugEnabled()) {
HttpServerOperations.log.debug(format(channel,
"Last HTTP packet was sent, terminating the channel"));
}
}

HttpServerOperations.cleanHandlerTerminate(channel);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import io.netty5.util.AsciiString;
import io.netty5.util.Resource;
import io.netty5.util.concurrent.Future;
import io.netty5.util.concurrent.FutureContextListener;
import io.netty5.util.concurrent.FutureListener;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
Expand Down Expand Up @@ -107,7 +108,7 @@
* @author Stephane Maldini1
*/
class HttpServerOperations extends HttpOperations<HttpServerRequest, HttpServerResponse>
implements HttpServerRequest, HttpServerResponse {
implements HttpServerRequest, HttpServerResponse, FutureContextListener<Channel, Void> {

final BiPredicate<HttpServerRequest, HttpServerResponse> configuredCompressionPredicate;
final ConnectionInfo connectionInfo;
Expand All @@ -129,6 +130,7 @@ class HttpServerOperations extends HttpOperations<HttpServerRequest, HttpServerR
String path;
Future<Void> requestTimeoutFuture;
Consumer<? super HttpHeaders> trailerHeadersConsumer;
HttpMessage fullHttpResponse;

volatile Context currentContext;

Expand All @@ -142,6 +144,7 @@ class HttpServerOperations extends HttpOperations<HttpServerRequest, HttpServerR
this.formDecoderProvider = replaced.formDecoderProvider;
this.is100ContinueExpected = replaced.is100ContinueExpected;
this.isHttp2 = replaced.isHttp2;
this.fullHttpResponse = replaced.fullHttpResponse;
this.mapHandle = replaced.mapHandle;
this.nettyRequest = replaced.nettyRequest;
this.nettyResponse = replaced.nettyResponse;
Expand Down Expand Up @@ -358,7 +361,12 @@ final boolean isHttp2() {

@Override
public HttpServerResponse keepAlive(boolean keepAlive) {
HttpUtil.setKeepAlive(nettyResponse, keepAlive);
if (fullHttpResponse == null) {
HttpUtil.setKeepAlive(nettyResponse, keepAlive);
}
else {
HttpUtil.setKeepAlive(fullHttpResponse, keepAlive);
}
return this;
}

Expand Down Expand Up @@ -503,11 +511,91 @@ public ZonedDateTime timestamp() {
return timestamp;
}

@Override
@SuppressWarnings("unchecked")
public NettyOutbound send(Publisher<? extends Buffer> source) {
if (!channel().isActive()) {
return then(Mono.error(AbortedException.beforeSend()));
}
if (source instanceof Mono) {
return new PostHeadersNettyOutbound(((Mono<Buffer>) source)
.flatMap(b -> {
if (!hasSentHeaders()) {
try {
fullHttpResponse = prepareHttpMessage(b);

afterMarkSentHeaders();
}
catch (RuntimeException e) {
b.close();
return Mono.error(e);
}

onComplete();
return Mono.<Void>empty();
}

if (log.isDebugEnabled()) {
log.debug(format(channel(), "Dropped HTTP content, since response has been sent already: {}"), b);
}
b.close();
return Mono.empty();
})
.doOnDiscard(Buffer.class, Buffer::close), this, null);
}
return super.send(source);
}

@Override
public NettyOutbound sendObject(Object message) {
if (!channel().isActive()) {
ReactorNetty.safeRelease(message);
return then(Mono.error(AbortedException.beforeSend()));
}
if (message instanceof Buffer b) {
return new PostHeadersNettyOutbound(Mono.create(sink -> {
if (!hasSentHeaders()) {
try {
fullHttpResponse = prepareHttpMessage(b);

afterMarkSentHeaders();
}
catch (RuntimeException e) {
// If afterMarkSentHeaders throws an exception there is no need to release the ByteBuf here.
// It will be released by PostHeadersNettyOutbound as there are on error/cancel hooks
sink.error(e);
return;
}

onComplete();
sink.success();
}
else {
if (log.isDebugEnabled()) {
log.debug(format(channel(), "Dropped HTTP content, since response has been sent already: {}"), b);
}
b.close();
sink.success();
}
}), this, b);
}
return super.sendObject(message);
}

@Override
public Mono<Void> send() {
return FutureMono.deferFuture(() -> markSentHeaderAndBody() ?
channel().writeAndFlush(newFullBodyMessage(channel().bufferAllocator().allocate(0))) :
channel().newSucceededFuture());
return Mono.create(sink -> {
if (!hasSentHeaders()) {
onComplete();
sink.success();
}
else {
if (log.isDebugEnabled()) {
log.debug(format(channel(), "Response has been sent already."));
}
sink.success();
}
});
}

@Override
Expand Down Expand Up @@ -564,6 +652,46 @@ public HttpServerResponse status(HttpResponseStatus status) {
return this;
}

@Override
public Mono<Void> then() {
if (!channel().isActive()) {
return Mono.error(AbortedException.beforeSend());
}

if (hasSentHeaders()) {
return Mono.empty();
}

return FutureMono.deferFuture(() -> {
if (!hasSentHeaders()) {
beforeMarkSentHeaders();

HttpMessage msg = outboundHttpMessage();
boolean last = false;
int contentLength = HttpUtil.getContentLength(msg, -1);
if (contentLength == 0 || isContentAlwaysEmpty()) {
last = true;
msg = newFullBodyMessage(channel().bufferAllocator().allocate(0));
}
else if (contentLength > 0) {
responseHeaders.remove(HttpHeaderNames.TRANSFER_ENCODING);
}

afterMarkSentHeaders();

if (!last) {
return markSentHeaders() ? channel().writeAndFlush(msg) : channel().newSucceededFuture();
}
else {
return markSentHeaderAndBody() ? channel().writeAndFlush(msg) : channel().newSucceededFuture();
}
}
else {
return channel().newSucceededFuture();
}
});
}

@Override
public HttpServerResponse trailerHeaders(Consumer<? super HttpHeaders> trailerHeaders) {
this.trailerHeadersConsumer = Objects.requireNonNull(trailerHeaders, "trailerHeaders");
Expand Down Expand Up @@ -741,11 +869,10 @@ protected void onOutboundComplete() {
}
if (markSentHeaderAndBody()) {
if (log.isDebugEnabled()) {
log.debug(format(channel(), "No sendHeaders() called before complete, sending " +
"zero-length header"));
log.debug(format(channel(), "Headers are not sent before onComplete()."));
}

f = channel().writeAndFlush(newFullBodyMessage(channel().bufferAllocator().allocate(0)));
f = channel().writeAndFlush(fullHttpResponse != null ? fullHttpResponse : newFullBodyMessage(channel().bufferAllocator().allocate(0)));
}
else if (markSentBody()) {
HttpHeaders trailerHeaders = null;
Expand Down Expand Up @@ -782,35 +909,29 @@ else if (markSentBody()) {
}
else {
discard();
terminate();
return;
}
f.addListener(s -> {
discard();
if (!s.isSuccess() && log.isDebugEnabled()) {
log.debug(format(channel(), "Failed flushing last frame"), s.cause());
}
});

f.addListener(channel(), this);
}

static void cleanHandlerTerminate(Channel ch) {
ChannelOperations<?, ?> ops = get(ch);

if (ops == null) {
return;
}

ops.discard();

//Try to defer the disposing to leave a chance for any synchronous complete following this callback
if (!ops.isSubscriptionDisposed()) {
ch.executor()
.execute(((HttpServerOperations) ops)::terminate);
@Override
public void operationComplete(Channel channel, Future<? extends Void> future) {
if (!future.isSuccess()) {
if (log.isDebugEnabled()) {
log.debug(format(channel, "Sending last HTTP packet was not successful, terminating the channel"),
future.cause());
}
}
else {
//if already disposed, we can immediately call terminate
((HttpServerOperations) ops).terminate();
if (log.isDebugEnabled()) {
log.debug(format(channel, "Last HTTP packet was sent, terminating the channel"));
}
}

discard();

terminate();
}

static long requestsCounter(Channel channel) {
Expand Down Expand Up @@ -889,7 +1010,14 @@ else if (cause instanceof TooLongHttpHeaderException) {
}
}

ctx.channel().writeAndFlush(response);
if (ops instanceof HttpServerOperations) {
HttpServerOperations serverOps = (HttpServerOperations) ops;
serverOps.fullHttpResponse = response;
serverOps.onComplete();
}
else {
ctx.channel().writeAndFlush(response);
}

listener.onStateChange(ops, REQUEST_DECODING_FAILED);
}
Expand All @@ -914,13 +1042,15 @@ protected void onOutboundError(Throwable err) {
nettyResponse.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
responseHeaders.set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE);
channel().writeAndFlush(newFullBodyMessage(channel().bufferAllocator().allocate(0)))
.addListener(channel(), this)
.addListener(channel(), ChannelFutureListeners.CLOSE);
return;
}

markSentBody();
log.error(format(channel(), "Error finishing response. Closing connection"), err);
channel().writeAndFlush(channel().bufferAllocator().allocate(0))
.addListener(channel(), this)
.addListener(channel(), ChannelFutureListeners.CLOSE);
}

Expand Down
Loading

0 comments on commit 9f106da

Please sign in to comment.