Skip to content

Commit

Permalink
Improve shutdown of non-persistent HTTP/1 connections #12212 (#12216)
Browse files Browse the repository at this point in the history
* Improve shutdown of non-persistent HTTP/1 connections

 + shutdown in SendCallback

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>

---------

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
Co-authored-by: Simone Bordet <simone.bordet@gmail.com>
  • Loading branch information
gregw and sbordet authored Sep 2, 2024
1 parent 5439f17 commit 551710e
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -318,15 +318,16 @@ public void setUseOutputDirectByteBuffers(boolean useOutputDirectByteBuffers)
@Override
public ByteBuffer onUpgradeFrom()
{
if (!isRequestBufferEmpty())
if (isRequestBufferEmpty())
{
ByteBuffer unconsumed = ByteBuffer.allocateDirect(_retainableByteBuffer.remaining());
unconsumed.put(_retainableByteBuffer.getByteBuffer());
unconsumed.flip();
releaseRequestBuffer();
return unconsumed;
return null;
}
return null;
ByteBuffer unconsumed = ByteBuffer.allocateDirect(_retainableByteBuffer.remaining());
unconsumed.put(_retainableByteBuffer.getByteBuffer());
unconsumed.flip();
releaseRequestBuffer();
return unconsumed;
}

@Override
Expand All @@ -341,10 +342,10 @@ void releaseRequestBuffer()
{
if (LOG.isDebugEnabled())
LOG.debug("releaseRequestBuffer {}", this);
if (_retainableByteBuffer.release())
_retainableByteBuffer = null;
else
throw new IllegalStateException("unreleased buffer " + _retainableByteBuffer);
RetainableByteBuffer buffer = _retainableByteBuffer;
_retainableByteBuffer = null;
if (!buffer.release())
throw new IllegalStateException("unreleased buffer " + buffer);
}
}

Expand All @@ -369,7 +370,9 @@ public void onFillable()
HttpConnection last = setCurrentConnection(this);
try
{
while (getEndPoint().isOpen())
// We must loop until we fill -1 or there is an async pause in handling.
// Note that the endpoint might already be closed in some special circumstances.
while (true)
{
// Fill the request buffer (if needed).
int filled = fillRequestBuffer();
Expand Down Expand Up @@ -906,6 +909,13 @@ private void releaseChunk()
@Override
protected void onCompleteSuccess()
{
// If we are a non-persistent connection and have succeeded the last write...
if (_shutdownOut && !(_httpChannel.getRequest().getAttribute(HttpStream.UPGRADE_CONNECTION_ATTRIBUTE) instanceof Connection))
{
// then we shutdown the output here so that the client sees the body termination ASAP and
// cannot be delayed by any further server handling before the stream callback is completed.
getEndPoint().shutdownOutput();
}
release().succeeded();
}

Expand Down Expand Up @@ -1513,8 +1523,7 @@ public void succeeded()
return;
}

Connection upgradeConnection = (Connection)_httpChannel.getRequest().getAttribute(HttpStream.UPGRADE_CONNECTION_ATTRIBUTE);
if (upgradeConnection != null)
if (_httpChannel.getRequest().getAttribute(HttpStream.UPGRADE_CONNECTION_ATTRIBUTE) instanceof Connection upgradeConnection)
{
getEndPoint().upgrade(upgradeConnection);
_httpChannel.recycle();
Expand All @@ -1523,13 +1532,8 @@ public void succeeded()
return;
}

// As this is not an upgrade, we can shutdown the output if we know we are not persistent
if (_sendCallback._shutdownOut)
getEndPoint().shutdownOutput();

_httpChannel.recycle();


// If a 100 Continue is still expected to be sent, but no content was read, then
// close the parser so that seeks EOF below, not the next request.
if (_expects100Continue)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,15 @@
import org.eclipse.jetty.util.Blocker;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.NanoTime;
import org.eclipse.jetty.util.StringUtil;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -68,6 +72,7 @@
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.startsWith;
import static org.junit.jupiter.api.Assertions.assertEquals;
Expand Down Expand Up @@ -1085,6 +1090,56 @@ public void testCloseWhileWriteBlocked() throws Exception
}
}

@Test
public void testCloseWhileCompletePending() throws Exception
{
String content = "The End!\r\n";
CountDownLatch handleComplete = new CountDownLatch(1);
startServer(new Handler.Abstract()
{
@Override
public boolean handle(Request request, Response response, Callback callback) throws Exception
{
FutureCallback writeComplete = new FutureCallback();
Content.Sink.write(response, true, content, writeComplete);
// Wait until the write is complete
writeComplete.get(30, TimeUnit.SECONDS);

// Wait until test lets the handling complete
assertTrue(handleComplete.await(30, TimeUnit.SECONDS));

callback.succeeded();
return true;
}
});

try (Socket client = newSocket(_serverURI.getHost(), _serverURI.getPort()))
{
OutputStream output = client.getOutputStream();
output.write("""
GET / HTTP/1.1\r
Host: localhost:%d\r
Connection: close\r
\r
""".formatted(_serverURI.getPort())
.getBytes());
output.flush();

client.setSoTimeout(5000);
long start = NanoTime.now();
HttpTester.Input input = HttpTester.from(client.getInputStream());
HttpTester.Response response = HttpTester.parseResponse(input);
assertEquals(HttpStatus.OK_200, response.getStatus());
assertEquals(content, response.getContent());
assertFalse(input.isEOF());
assertEquals(-1, input.fillBuffer());
assertTrue(input.isEOF());
assertThat(NanoTime.secondsSince(start), lessThan(5L));

}
handleComplete.countDown();
}

@Test
public void testBigBlocks() throws Exception
{
Expand Down Expand Up @@ -1813,8 +1868,9 @@ public void testChunkedShutdown() throws Exception
}
}

@Test
public void testHoldContent() throws Exception
@ParameterizedTest
@ValueSource(booleans = {false /* TODO, true */})
public void testHoldContent(boolean close) throws Exception
{
Queue<Content.Chunk> contents = new ConcurrentLinkedQueue<>();
final int bufferSize = 1024;
Expand Down Expand Up @@ -1857,6 +1913,10 @@ public void onClosed(Connection connection)
}

response.setStatus(200);

if (close)
request.getConnectionMetaData().getConnection().getEndPoint().close();

callback.succeeded();
return true;
}
Expand Down Expand Up @@ -1897,9 +1957,12 @@ public void onClosed(Connection connection)
out.flush();

// check the response
HttpTester.Response response = HttpTester.parseResponse(client.getInputStream());
assertNotNull(response);
assertThat(response.getStatus(), is(200));
if (!close)
{
HttpTester.Response response = HttpTester.parseResponse(client.getInputStream());
assertNotNull(response);
assertThat(response.getStatus(), is(200));
}
}

assertTrue(closed.await(10, TimeUnit.SECONDS));
Expand Down

0 comments on commit 551710e

Please sign in to comment.