Skip to content

Commit

Permalink
Fixes #6168 - Improve handling of unconsumed content
Browse files Browse the repository at this point in the history
Added or expanded the scope of catch blocks to properly handle exceptions thrown by `HttpInput.Interceptor`.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
  • Loading branch information
sbordet authored Apr 13, 2021
1 parent 57d0bae commit fe359ac
Show file tree
Hide file tree
Showing 3 changed files with 570 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,14 @@ else if (filled < 0)
}
}
}
catch (Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("{} caught exception {}", this, _channel.getState(), x);
BufferUtil.clear(_requestBuffer);
releaseRequestBuffer();
close();
}
finally
{
setCurrentConnection(last);
Expand Down Expand Up @@ -322,10 +330,7 @@ protected boolean fillAndParseForContent()
private int fillRequestBuffer()
{
if (_contentBufferReferences.get() > 0)
{
LOG.warn("{} fill with unconsumed content!", this);
return 0;
}
throw new IllegalStateException("fill with unconsumed content on " + this);

if (BufferUtil.isEmpty(_requestBuffer))
{
Expand Down Expand Up @@ -353,11 +358,13 @@ else if (filled < 0)
}
catch (IOException e)
{
LOG.debug(e);
if (LOG.isDebugEnabled())
LOG.debug(e);
_parser.atEOF();
return -1;
}
}

return 0;
}

Expand Down
170 changes: 95 additions & 75 deletions jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ public int available()
{
produceContent();
}
catch (IOException e)
catch (Throwable e)
{
woken = failed(e);
}
Expand Down Expand Up @@ -390,7 +390,7 @@ protected Content nextContent() throws IOException
*
* @return Content or null
*/
protected Content nextNonSentinelContent()
protected Content nextNonSentinelContent() throws IOException
{
while (true)
{
Expand All @@ -416,7 +416,7 @@ protected Content nextNonSentinelContent()
* @return the content or EOF or null if none available.
* @throws IOException if retrieving the content fails
*/
protected Content produceNextContext() throws IOException
protected Content produceNextContent() throws IOException
{
Content content = nextInterceptedContent();
if (content == null && !isFinished())
Expand All @@ -433,7 +433,7 @@ protected Content produceNextContext() throws IOException
*
* @return Content with remaining, a {@link SentinelContent}, or null
*/
protected Content nextInterceptedContent()
protected Content nextInterceptedContent() throws IOException
{
// If we have a chunk produced by interception
if (_intercepted != null)
Expand All @@ -458,9 +458,10 @@ protected Content nextInterceptedContent()
// Are we intercepting?
if (_interceptor != null)
{
// Intercept the current content (may be called several
// times for the same content
_intercepted = _interceptor.readFrom(_content);
// Intercept the current content.
// The interceptor may be called several
// times for the same content.
_intercepted = intercept(_content);

// If interception produced new content
if (_intercepted != null && _intercepted != _content)
Expand Down Expand Up @@ -492,6 +493,24 @@ protected Content nextInterceptedContent()
return null;
}

private Content intercept(Content content) throws IOException
{
try
{
return _interceptor.readFrom(content);
}
catch (Throwable x)
{
IOException failure = new IOException("Bad content", x);
content.failed(failure);
HttpChannel channel = _channelState.getHttpChannel();
Response response = channel.getResponse();
if (response.isCommitted())
channel.abort(failure);
throw failure;
}
}

private void consume(Content content)
{
if (!isError() && content instanceof EofContent)
Expand Down Expand Up @@ -529,21 +548,6 @@ protected int get(Content content, byte[] buffer, int offset, int length)
return l;
}

/**
* Consumes the given content. Calls the content succeeded if all content consumed.
*
* @param content the content to consume
* @param length the number of bytes to consume
*/
protected void skip(Content content, int length)
{
int l = content.skip(length);

_contentConsumed += l;
if (l > 0 && content.isEmpty())
nextNonSentinelContent(); // hungry succeed
}

/**
* Blocks until some content or some end-of-file event arrives.
*
Expand Down Expand Up @@ -620,10 +624,19 @@ public boolean addContent(Content content)
if (LOG.isDebugEnabled())
LOG.debug("{} addContent {}", this, content);

if (nextInterceptedContent() != null)
return wakeup();
else
return false;
try
{
if (nextInterceptedContent() != null)
return wakeup();
else
return false;
}
catch (Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("", x);
return failed(x);
}
}
}
}
Expand Down Expand Up @@ -686,6 +699,7 @@ public boolean eof()
* Consume all available content without blocking.
* Raw content is counted in the {@link #getContentReceived()} statistics, but
* is not intercepted nor counted in the {@link #getContentConsumed()} statistics
*
* @return True if EOF was reached, false otherwise.
*/
public boolean consumeAll()
Expand Down Expand Up @@ -765,37 +779,39 @@ public boolean isFinished()
@Override
public boolean isReady()
{
try
synchronized (_inputQ)
{
synchronized (_inputQ)
try
{
if (_listener == null)
return true;
if (_state instanceof EOFState)
return true;
if (_waitingForContent)
return false;
if (produceNextContext() != null)
if (produceNextContent() != null)
return true;
_channelState.onReadUnready();
_waitingForContent = true;
return false;
}
catch (Throwable e)
{
if (LOG.isDebugEnabled())
LOG.debug("", e);
failed(e);
return true;
}
return false;
}
catch (IOException e)
{
LOG.ignore(e);
return true;
}
}

@Override
public void setReadListener(ReadListener readListener)
{
boolean woken = false;
try
synchronized (_inputQ)
{
synchronized (_inputQ)
try
{
if (_listener != null)
throw new IllegalStateException("ReadListener already set");
Expand All @@ -808,7 +824,7 @@ public void setReadListener(ReadListener readListener)
}
else
{
Content content = produceNextContext();
Content content = produceNextContent();
if (content != null)
{
_state = ASYNC;
Expand All @@ -827,10 +843,13 @@ else if (_state == EOF)
}
}
}
}
catch (IOException e)
{
throw new RuntimeIOException(e);
catch (Throwable e)
{
if (LOG.isDebugEnabled())
LOG.debug("", e);
failed(e);
woken = _channelState.onReadReady();
}
}

if (woken)
Expand Down Expand Up @@ -895,49 +914,49 @@ private boolean wakeup()
@Override
public void run()
{
final ReadListener listener;
Throwable error;
ReadListener listener = null;
Throwable error = null;
boolean aeof = false;

synchronized (_inputQ)
try
{
listener = _listener;

if (_state == EOF)
return;

if (_state == AEOF)
synchronized (_inputQ)
{
_state = EOF;
aeof = true;
}
listener = _listener;

error = _state.getError();

if (!aeof && error == null)
{
Content content = nextInterceptedContent();
if (content == null)
if (_state == EOF)
return;

// Consume a directly received EOF without first calling onDataAvailable
// So -1 will never be read and only onAddDataRread or onError will be called
if (content instanceof EofContent)
if (_state == AEOF)
{
consume(content);
if (_state == EARLY_EOF)
error = _state.getError();
else if (_state == AEOF)
_state = EOF;
aeof = true;
}

error = _state.getError();

if (!aeof && error == null)
{
Content content = nextInterceptedContent();
if (content == null)
return;

// Consume a directly received EOF without first calling onDataAvailable
// So -1 will never be read and only onAddDataRread or onError will be called
if (content instanceof EofContent)
{
aeof = true;
_state = EOF;
consume(content);
if (_state == EARLY_EOF)
error = _state.getError();
else if (_state == AEOF)
{
aeof = true;
_state = EOF;
}
}
}
}
}

try
{
if (error != null)
{
// TODO is this necessary to add here?
Expand All @@ -958,7 +977,8 @@ else if (aeof)
catch (Throwable e)
{
LOG.warn(e.toString());
LOG.debug(e);
if (LOG.isDebugEnabled())
LOG.debug("", e);
try
{
if (aeof || error == null)
Expand Down Expand Up @@ -1106,7 +1126,7 @@ protected static class EOFState extends State
{
}

protected class ErrorState extends EOFState
protected static class ErrorState extends EOFState
{
final Throwable _error;

Expand Down Expand Up @@ -1155,7 +1175,7 @@ public String toString()
protected static final State ASYNC = new State()
{
@Override
public int noContent() throws IOException
public int noContent()
{
return 0;
}
Expand Down
Loading

0 comments on commit fe359ac

Please sign in to comment.