Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support zero-byte reads on HTTP response streams #61913

Merged
merged 14 commits into from
Nov 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 0 additions & 12 deletions src/libraries/Common/src/System/Net/StreamBuffer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,6 @@ public void EndWrite()

private (bool wait, int bytesRead) TryReadFromBuffer(Span<byte> buffer)
{
Debug.Assert(buffer.Length > 0);

Debug.Assert(!Monitor.IsEntered(SyncObject));
lock (SyncObject)
{
Expand Down Expand Up @@ -225,11 +223,6 @@ public void EndWrite()

public int Read(Span<byte> buffer)
{
if (buffer.Length == 0)
{
return 0;
}

(bool wait, int bytesRead) = TryReadFromBuffer(buffer);
if (wait)
{
Expand All @@ -246,11 +239,6 @@ public async ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken can
{
cancellationToken.ThrowIfCancellationRequested();

if (buffer.Length == 0)
{
return 0;
}

(bool wait, int bytesRead) = TryReadFromBuffer(buffer.Span);
if (wait)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public static IEnumerable<object[]> AllSeekModesAndValue(object value) =>
from mode in Enum.GetValues<SeekMode>()
select new object[] { mode, value };

protected async Task<int> ReadAsync(ReadWriteMode mode, Stream stream, byte[] buffer, int offset, int count, CancellationToken cancellationToken = default)
public static async Task<int> ReadAsync(ReadWriteMode mode, Stream stream, byte[] buffer, int offset, int count, CancellationToken cancellationToken = default)
{
if (mode == ReadWriteMode.SyncByte)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ public class UnidirectionalConnectedStreamsTests : ConnectedStreamConformanceTes
{
protected override int BufferedSize => StreamBuffer.DefaultMaxBufferSize;
protected override bool FlushRequiredToWriteData => false;
protected override bool BlocksOnZeroByteReads => true;

protected override Task<StreamPair> CreateConnectedStreamsAsync() =>
Task.FromResult<StreamPair>(ConnectedStreams.CreateUnidirectional());
Expand All @@ -18,6 +19,7 @@ public class BidirectionalConnectedStreamsTests : ConnectedStreamConformanceTest
{
protected override int BufferedSize => StreamBuffer.DefaultMaxBufferSize;
protected override bool FlushRequiredToWriteData => false;
protected override bool BlocksOnZeroByteReads => true;

protected override Task<StreamPair> CreateConnectedStreamsAsync() =>
Task.FromResult<StreamPair>(ConnectedStreams.CreateBidirectional());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,17 +37,27 @@ public ChunkedEncodingReadStream(HttpConnection connection, HttpResponseMessage

public override int Read(Span<byte> buffer)
{
if (_connection == null || buffer.Length == 0)
if (_connection == null)
{
// Response body fully consumed or the caller didn't ask for any data.
// Response body fully consumed
return 0;
}

// Try to consume from data we already have in the buffer.
int bytesRead = ReadChunksFromConnectionBuffer(buffer, cancellationRegistration: default);
if (bytesRead > 0)
if (buffer.Length == 0)
{
if (PeekChunkFromConnectionBuffer())
{
return 0;
}
}
else
{
return bytesRead;
// Try to consume from data we already have in the buffer.
int bytesRead = ReadChunksFromConnectionBuffer(buffer, cancellationRegistration: default);
if (bytesRead > 0)
{
return bytesRead;
}
}

// Nothing available to consume. Fall back to I/O.
Expand All @@ -68,7 +78,8 @@ public override int Read(Span<byte> buffer)
// as the connection buffer. That avoids an unnecessary copy while still reading
// the maximum amount we'd otherwise read at a time.
Debug.Assert(_connection.RemainingBuffer.Length == 0);
bytesRead = _connection.Read(buffer.Slice(0, (int)Math.Min((ulong)buffer.Length, _chunkBytesRemaining)));
Debug.Assert(buffer.Length != 0);
int bytesRead = _connection.Read(buffer.Slice(0, (int)Math.Min((ulong)buffer.Length, _chunkBytesRemaining)));
if (bytesRead == 0)
{
throw new IOException(SR.Format(SR.net_http_invalid_response_premature_eof_bytecount, _chunkBytesRemaining));
Expand All @@ -81,15 +92,35 @@ public override int Read(Span<byte> buffer)
return bytesRead;
}

if (buffer.Length == 0)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this necessary? Won't the Fill call immediately below take care of this?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fill will pass the entire connection buffer to the transport stream.
Checking for 0 here here means that we will attempt a zero byte read first if the user requested it and no data was available in the buffer.

This allows us to take advantage of the zero byte read on SslStream.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, that makes sense. I assume we do this for the non-chunked cases too?

We should add a comment explaining this.

I do wonder a bit about cases where we have received part of a chunk already (or even just part of a chunk header). In these cases it seems reasonable to assume that the rest of the chunk will arrive promptly, and thus a zero byte read is probably counterproductive.

In other words, zero-byte read is mostly useful when you have some sort of streaming scenario. In that scenario the server is almost certainly not going to actually pause sending in the middle of a chunk; it's going to do it between chunks. So it's really the case where we have fully consumed the previous chunk and haven't received any of the subsequent chunk where zero-byte read is useful.

In SslStream we only do the zero-byte read on the first read of a new TLS frame (I think).

All that said, perhaps this doesn't really matter in practice. Thoughts?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume we do this for the non-chunked cases too?

Yes, we now do this for all HTTP 1 response streams.
On HTTP/2 and HTTP/3, the response streams also block on zero-byte reads, but those reads are separate from the reads to the underlying transport.

I agree that the most obvious benefits are to idle-ish connections (streaming), but even regular content transfers will often be slow enough for the receive loop to outperform and be forced to wait.

AspNetCore separates this into two parts:

  • The streams (pipes) exposed to the user will block on zero-byte reads
  • Whether the underlying transport uses zero-byte reads is controlled by the WaitForDataBeforeAllocatingBuffer flag which always defaults to true. Their SocketTransport will issue a zero-byte read before every actual read.

The overhead of honoring the user's request to issue a zero-byte read should be low, especially if we already checked that no data has been buffered.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

even regular content transfers will often be slow enough for the receive loop to outperform and be forced to wait.

Yes, that's a good point, and I agree it's reasonable to do a zero-byte read in these cases.

To step back for a second, here's how I think about this generally:

The response streams should always support zero-byte reads -- meaning, a zero-byte read will block until data is actually available and the subsequent read is guaranteed to not block. This enables a user to reduce their overall buffer memory usage: they only need to allocate a buffer when data is actually available.

That said, supporting zero-byte read does not require that the response stream does a zero-byte read against the underlying stream. This is most obvious in the HTTP2 case because of multiplexing. But it also applies to a case like the chunked response stream: we (usually) have an internal read buffer, and so we could just do regular (non-zero-byte) reads into that read buffer and then use that to provide the zero-byte-read semantics to the user.

What that approach doesn't address is, being smart about our own buffer management. Currently, we never release the internal read buffer between read calls, even when it's empty. And currently we never do a zero-byte read against the underlying stream. But if the user issues a zero-byte-read against us, then it seems reasonable to assume that they want to trade off some small CPU overhead for reduced memory usage -- that's the whole point of doing a zero-byte read. And so when that happens, we should avoid holding/allocating a buffer during the user's zero-byte read when possible, and instead do a zero-byte read against the underlying stream, and only allocate a buffer when that zero-byte read returns. (That's the change we made to SslStream in 6.0.)

What's weird about this PR is that we haven't changed the internal buffer management at all -- we still never release the internal read buffer. So even though we do a zero-byte read against the underlying stream, we aren't actually getting any advantages from this in terms of reduced memory usage.

Put differently: there's no point in doing a zero-byte read unless we are going to defer allocation of the read buffer until that zero-byte read completes.

Thoughts?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Re your comments on buffer improvements to Raw/Chunked/HTTP2/etc, that all seems good to me.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's so that the user's zero-byte read translates into a zero-byte read to SslStream (deferring its 32k buffer allocation) while avoiding changes to the lifetimes of HttpConnection owned buffers. I.e. it's a minimal change to get most of the benefit, matching other HTTP/1.x streams.
It's akin to this change (HttpConnection line 1732) - not needed for the zero-byte read to block, but allows us to make use of SslStream's zero-byte read improvements.

I agree the comment is misleading now without further changes, I'll update that.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, makes sense. Let's change the comment and get this in.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to ensure we are on the same page:

It seems like there are really three pieces of work here to fully support zero-byte reads:

(1) Support user zero-byte read so they can be smart about managing their buffers
(2) Use the user's zero-byte read to improve our own internal buffer management
(3) Perform zero-byte read against the underlying stream so that the underlying stream can be smart about their buffer management

This PR contains (1) and (3) for all response streams (except (3) for HTTP2, due to multiplexing).

We will look at (2) in a future PR. This may include changes to buffer management beyond just zero-byte-read cases.

We will also look at (3) for HTTP2 in a future PR.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that matches my understanding exactly

{
// User requested a zero-byte read, and we have no data available in the buffer for processing.
// This zero-byte read indicates their desire to trade off the extra cost of a zero-byte read
// for reduced memory consumption when data is not immediately available.
// So, we will issue our own zero-byte read against the underlying stream to allow it to make use of
// optimizations, such as deferring buffer allocation until data is actually available.
_connection.Read(buffer);
}

// We're only here if we need more data to make forward progress.
_connection.Fill();

// Now that we have more, see if we can get any response data, and if
// we can we're done.
int bytesCopied = ReadChunksFromConnectionBuffer(buffer, cancellationRegistration: default);
if (bytesCopied > 0)
if (buffer.Length == 0)
{
return bytesCopied;
if (PeekChunkFromConnectionBuffer())
{
return 0;
}
}
else
{
int bytesCopied = ReadChunksFromConnectionBuffer(buffer, cancellationRegistration: default);
if (bytesCopied > 0)
{
return bytesCopied;
}
}
}
}
Expand All @@ -102,17 +133,27 @@ public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken
return ValueTask.FromCanceled<int>(cancellationToken);
}

if (_connection == null || buffer.Length == 0)
if (_connection == null)
{
// Response body fully consumed or the caller didn't ask for any data.
// Response body fully consumed
return new ValueTask<int>(0);
}

// Try to consume from data we already have in the buffer.
int bytesRead = ReadChunksFromConnectionBuffer(buffer.Span, cancellationRegistration: default);
if (bytesRead > 0)
if (buffer.Length == 0)
{
return new ValueTask<int>(bytesRead);
if (PeekChunkFromConnectionBuffer())
{
return new ValueTask<int>(0);
}
}
else
{
// Try to consume from data we already have in the buffer.
int bytesRead = ReadChunksFromConnectionBuffer(buffer.Span, cancellationRegistration: default);
if (bytesRead > 0)
{
return new ValueTask<int>(bytesRead);
}
}

// We may have just consumed the remainder of the response (with no actual data
Expand All @@ -132,7 +173,6 @@ private async ValueTask<int> ReadAsyncCore(Memory<byte> buffer, CancellationToke
// Should only be called if ReadChunksFromConnectionBuffer returned 0.

Debug.Assert(_connection != null);
Debug.Assert(buffer.Length > 0);

CancellationTokenRegistration ctr = _connection.RegisterCancellation(cancellationToken);
try
Expand All @@ -154,6 +194,7 @@ private async ValueTask<int> ReadAsyncCore(Memory<byte> buffer, CancellationToke
// as the connection buffer. That avoids an unnecessary copy while still reading
// the maximum amount we'd otherwise read at a time.
Debug.Assert(_connection.RemainingBuffer.Length == 0);
Debug.Assert(buffer.Length != 0);
int bytesRead = await _connection.ReadAsync(buffer.Slice(0, (int)Math.Min((ulong)buffer.Length, _chunkBytesRemaining))).ConfigureAwait(false);
if (bytesRead == 0)
{
Expand All @@ -167,15 +208,35 @@ private async ValueTask<int> ReadAsyncCore(Memory<byte> buffer, CancellationToke
return bytesRead;
}

if (buffer.Length == 0)
MihaZupan marked this conversation as resolved.
Show resolved Hide resolved
{
// User requested a zero-byte read, and we have no data available in the buffer for processing.
// This zero-byte read indicates their desire to trade off the extra cost of a zero-byte read
// for reduced memory consumption when data is not immediately available.
// So, we will issue our own zero-byte read against the underlying stream to allow it to make use of
// optimizations, such as deferring buffer allocation until data is actually available.
await _connection.ReadAsync(buffer).ConfigureAwait(false);
}

// We're only here if we need more data to make forward progress.
await _connection.FillAsync(async: true).ConfigureAwait(false);

// Now that we have more, see if we can get any response data, and if
// we can we're done.
int bytesCopied = ReadChunksFromConnectionBuffer(buffer.Span, ctr);
if (bytesCopied > 0)
if (buffer.Length == 0)
{
return bytesCopied;
if (PeekChunkFromConnectionBuffer())
{
return 0;
}
}
else
{
int bytesCopied = ReadChunksFromConnectionBuffer(buffer.Span, ctr);
if (bytesCopied > 0)
{
return bytesCopied;
}
}
}
}
Expand Down Expand Up @@ -208,8 +269,7 @@ private async Task CopyToAsyncCore(Stream destination, CancellationToken cancell
{
while (true)
{
ReadOnlyMemory<byte> bytesRead = ReadChunkFromConnectionBuffer(int.MaxValue, ctr);
if (bytesRead.Length == 0)
if (ReadChunkFromConnectionBuffer(int.MaxValue, ctr) is not ReadOnlyMemory<byte> bytesRead || bytesRead.Length == 0)
{
break;
}
Expand All @@ -235,28 +295,33 @@ private async Task CopyToAsyncCore(Stream destination, CancellationToken cancell
}
}

private bool PeekChunkFromConnectionBuffer()
{
return ReadChunkFromConnectionBuffer(maxBytesToRead: 0, cancellationRegistration: default).HasValue;
}

private int ReadChunksFromConnectionBuffer(Span<byte> buffer, CancellationTokenRegistration cancellationRegistration)
{
Debug.Assert(buffer.Length > 0);
int totalBytesRead = 0;
while (buffer.Length > 0)
{
ReadOnlyMemory<byte> bytesRead = ReadChunkFromConnectionBuffer(buffer.Length, cancellationRegistration);
Debug.Assert(bytesRead.Length <= buffer.Length);
if (bytesRead.Length == 0)
if (ReadChunkFromConnectionBuffer(buffer.Length, cancellationRegistration) is not ReadOnlyMemory<byte> bytesRead || bytesRead.Length == 0)
{
break;
}

Debug.Assert(bytesRead.Length <= buffer.Length);
totalBytesRead += bytesRead.Length;
bytesRead.Span.CopyTo(buffer);
buffer = buffer.Slice(bytesRead.Length);
}
return totalBytesRead;
}

private ReadOnlyMemory<byte> ReadChunkFromConnectionBuffer(int maxBytesToRead, CancellationTokenRegistration cancellationRegistration)
private ReadOnlyMemory<byte>? ReadChunkFromConnectionBuffer(int maxBytesToRead, CancellationTokenRegistration cancellationRegistration)
{
Debug.Assert(maxBytesToRead > 0 && _connection != null);
Debug.Assert(_connection != null);

try
{
Expand Down Expand Up @@ -310,7 +375,7 @@ private ReadOnlyMemory<byte> ReadChunkFromConnectionBuffer(int maxBytesToRead, C
}

int bytesToConsume = Math.Min(maxBytesToRead, (int)Math.Min((ulong)connectionBuffer.Length, _chunkBytesRemaining));
Debug.Assert(bytesToConsume > 0);
Debug.Assert(bytesToConsume > 0 || maxBytesToRead == 0);

_connection.ConsumeFromRemainingBuffer(bytesToConsume);
_chunkBytesRemaining -= (ulong)bytesToConsume;
Expand Down Expand Up @@ -441,8 +506,7 @@ public override async ValueTask<bool> DrainAsync(int maxDrainBytes)
drainedBytes += _connection.RemainingBuffer.Length;
while (true)
{
ReadOnlyMemory<byte> bytesRead = ReadChunkFromConnectionBuffer(int.MaxValue, ctr);
if (bytesRead.Length == 0)
if (ReadChunkFromConnectionBuffer(int.MaxValue, ctr) is not ReadOnlyMemory<byte> bytesRead || bytesRead.Length == 0)
{
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@ public ConnectionCloseReadStream(HttpConnection connection) : base(connection)
public override int Read(Span<byte> buffer)
{
HttpConnection? connection = _connection;
if (connection == null || buffer.Length == 0)
if (connection == null)
{
// Response body fully consumed or the caller didn't ask for any data
// Response body fully consumed
return 0;
}

int bytesRead = connection.Read(buffer);
if (bytesRead == 0)
if (bytesRead == 0 && buffer.Length != 0)
{
// We cannot reuse this connection, so close it.
_connection = null;
Expand All @@ -40,9 +40,9 @@ public override async ValueTask<int> ReadAsync(Memory<byte> buffer, Cancellation
CancellationHelper.ThrowIfCancellationRequested(cancellationToken);

HttpConnection? connection = _connection;
if (connection == null || buffer.Length == 0)
if (connection == null)
{
// Response body fully consumed or the caller didn't ask for any data
// Response body fully consumed
return 0;
}

Expand All @@ -69,7 +69,7 @@ public override async ValueTask<int> ReadAsync(Memory<byte> buffer, Cancellation
}
}

if (bytesRead == 0)
if (bytesRead == 0 && buffer.Length != 0)
{
// If cancellation is requested and tears down the connection, it could cause the read
// to return 0, which would otherwise signal the end of the data, but that would lead
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ public ContentLengthReadStream(HttpConnection connection, ulong contentLength) :

public override int Read(Span<byte> buffer)
{
if (_connection == null || buffer.Length == 0)
if (_connection == null)
{
// Response body fully consumed or the caller didn't ask for any data.
// Response body fully consumed
return 0;
}

Expand All @@ -35,7 +35,7 @@ public override int Read(Span<byte> buffer)
}

int bytesRead = _connection.Read(buffer);
if (bytesRead <= 0)
if (bytesRead <= 0 && buffer.Length != 0)
{
// Unexpected end of response stream.
throw new IOException(SR.Format(SR.net_http_invalid_response_premature_eof_bytecount, _contentBytesRemaining));
Expand All @@ -58,9 +58,9 @@ public override async ValueTask<int> ReadAsync(Memory<byte> buffer, Cancellation
{
CancellationHelper.ThrowIfCancellationRequested(cancellationToken);

if (_connection == null || buffer.Length == 0)
if (_connection == null)
{
// Response body fully consumed or the caller didn't ask for any data
// Response body fully consumed
return 0;
}

Expand Down Expand Up @@ -94,7 +94,7 @@ public override async ValueTask<int> ReadAsync(Memory<byte> buffer, Cancellation
}
}

if (bytesRead <= 0)
if (bytesRead == 0 && buffer.Length != 0)
{
// A cancellation request may have caused the EOF.
CancellationHelper.ThrowIfCancellationRequested(cancellationToken);
Expand Down
Loading