Skip to content

Commit

Permalink
Support zero-byte reads on HTTP response streams (#61913)
Browse files Browse the repository at this point in the history
* Allow zero byte reads on raw HTTP/1.1 response streams

* Allow zero byte reads on the rest of HTTP/1.1 response streams

* Allow zero byte reads on HTTP/2 response streams

* Allow zero byte reads on HTTP/3 response streams

* Enable sync zero-byte reads

* Add zero-byte read tests

* Fully enable zero-byte reads on HTTP/2 and 3

* Add zero-byte read tests for HTTP/2 and HTTP/3

* Remove unsafe-ish code from PeekChunkFromConnectionBuffer

* Add comments when we do extra zero-byte reads

* Update MockQuicStreamConformanceTests to allow zero-byte reads

* Update ConnectedStream tests to allow zero-byte reads

* Skip zero-byte read tests on Browser

* Update comment on explicit zero-byte reads in ChunkedEncodingReadStream
  • Loading branch information
MihaZupan authored Nov 29, 2021
1 parent 027d755 commit d7f3f58
Show file tree
Hide file tree
Showing 14 changed files with 446 additions and 82 deletions.
12 changes: 0 additions & 12 deletions src/libraries/Common/src/System/Net/StreamBuffer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,6 @@ public void EndWrite()

private (bool wait, int bytesRead) TryReadFromBuffer(Span<byte> buffer)
{
Debug.Assert(buffer.Length > 0);

Debug.Assert(!Monitor.IsEntered(SyncObject));
lock (SyncObject)
{
Expand Down Expand Up @@ -225,11 +223,6 @@ public void EndWrite()

public int Read(Span<byte> buffer)
{
if (buffer.Length == 0)
{
return 0;
}

(bool wait, int bytesRead) = TryReadFromBuffer(buffer);
if (wait)
{
Expand All @@ -246,11 +239,6 @@ public async ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken can
{
cancellationToken.ThrowIfCancellationRequested();

if (buffer.Length == 0)
{
return 0;
}

(bool wait, int bytesRead) = TryReadFromBuffer(buffer.Span);
if (wait)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public static IEnumerable<object[]> AllSeekModesAndValue(object value) =>
from mode in Enum.GetValues<SeekMode>()
select new object[] { mode, value };

protected async Task<int> ReadAsync(ReadWriteMode mode, Stream stream, byte[] buffer, int offset, int count, CancellationToken cancellationToken = default)
public static async Task<int> ReadAsync(ReadWriteMode mode, Stream stream, byte[] buffer, int offset, int count, CancellationToken cancellationToken = default)
{
if (mode == ReadWriteMode.SyncByte)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ public class UnidirectionalConnectedStreamsTests : ConnectedStreamConformanceTes
{
protected override int BufferedSize => StreamBuffer.DefaultMaxBufferSize;
protected override bool FlushRequiredToWriteData => false;
protected override bool BlocksOnZeroByteReads => true;

protected override Task<StreamPair> CreateConnectedStreamsAsync() =>
Task.FromResult<StreamPair>(ConnectedStreams.CreateUnidirectional());
Expand All @@ -18,6 +19,7 @@ public class BidirectionalConnectedStreamsTests : ConnectedStreamConformanceTest
{
protected override int BufferedSize => StreamBuffer.DefaultMaxBufferSize;
protected override bool FlushRequiredToWriteData => false;
protected override bool BlocksOnZeroByteReads => true;

protected override Task<StreamPair> CreateConnectedStreamsAsync() =>
Task.FromResult<StreamPair>(ConnectedStreams.CreateBidirectional());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,17 +37,27 @@ public ChunkedEncodingReadStream(HttpConnection connection, HttpResponseMessage

public override int Read(Span<byte> buffer)
{
if (_connection == null || buffer.Length == 0)
if (_connection == null)
{
// Response body fully consumed or the caller didn't ask for any data.
// Response body fully consumed
return 0;
}

// Try to consume from data we already have in the buffer.
int bytesRead = ReadChunksFromConnectionBuffer(buffer, cancellationRegistration: default);
if (bytesRead > 0)
if (buffer.Length == 0)
{
if (PeekChunkFromConnectionBuffer())
{
return 0;
}
}
else
{
return bytesRead;
// Try to consume from data we already have in the buffer.
int bytesRead = ReadChunksFromConnectionBuffer(buffer, cancellationRegistration: default);
if (bytesRead > 0)
{
return bytesRead;
}
}

// Nothing available to consume. Fall back to I/O.
Expand All @@ -68,7 +78,8 @@ public override int Read(Span<byte> buffer)
// as the connection buffer. That avoids an unnecessary copy while still reading
// the maximum amount we'd otherwise read at a time.
Debug.Assert(_connection.RemainingBuffer.Length == 0);
bytesRead = _connection.Read(buffer.Slice(0, (int)Math.Min((ulong)buffer.Length, _chunkBytesRemaining)));
Debug.Assert(buffer.Length != 0);
int bytesRead = _connection.Read(buffer.Slice(0, (int)Math.Min((ulong)buffer.Length, _chunkBytesRemaining)));
if (bytesRead == 0)
{
throw new IOException(SR.Format(SR.net_http_invalid_response_premature_eof_bytecount, _chunkBytesRemaining));
Expand All @@ -81,15 +92,35 @@ public override int Read(Span<byte> buffer)
return bytesRead;
}

if (buffer.Length == 0)
{
// User requested a zero-byte read, and we have no data available in the buffer for processing.
// This zero-byte read indicates their desire to trade off the extra cost of a zero-byte read
// for reduced memory consumption when data is not immediately available.
// So, we will issue our own zero-byte read against the underlying stream to allow it to make use of
// optimizations, such as deferring buffer allocation until data is actually available.
_connection.Read(buffer);
}

// We're only here if we need more data to make forward progress.
_connection.Fill();

// Now that we have more, see if we can get any response data, and if
// we can we're done.
int bytesCopied = ReadChunksFromConnectionBuffer(buffer, cancellationRegistration: default);
if (bytesCopied > 0)
if (buffer.Length == 0)
{
return bytesCopied;
if (PeekChunkFromConnectionBuffer())
{
return 0;
}
}
else
{
int bytesCopied = ReadChunksFromConnectionBuffer(buffer, cancellationRegistration: default);
if (bytesCopied > 0)
{
return bytesCopied;
}
}
}
}
Expand All @@ -102,17 +133,27 @@ public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken
return ValueTask.FromCanceled<int>(cancellationToken);
}

if (_connection == null || buffer.Length == 0)
if (_connection == null)
{
// Response body fully consumed or the caller didn't ask for any data.
// Response body fully consumed
return new ValueTask<int>(0);
}

// Try to consume from data we already have in the buffer.
int bytesRead = ReadChunksFromConnectionBuffer(buffer.Span, cancellationRegistration: default);
if (bytesRead > 0)
if (buffer.Length == 0)
{
return new ValueTask<int>(bytesRead);
if (PeekChunkFromConnectionBuffer())
{
return new ValueTask<int>(0);
}
}
else
{
// Try to consume from data we already have in the buffer.
int bytesRead = ReadChunksFromConnectionBuffer(buffer.Span, cancellationRegistration: default);
if (bytesRead > 0)
{
return new ValueTask<int>(bytesRead);
}
}

// We may have just consumed the remainder of the response (with no actual data
Expand All @@ -132,7 +173,6 @@ private async ValueTask<int> ReadAsyncCore(Memory<byte> buffer, CancellationToke
// Should only be called if ReadChunksFromConnectionBuffer returned 0.

Debug.Assert(_connection != null);
Debug.Assert(buffer.Length > 0);

CancellationTokenRegistration ctr = _connection.RegisterCancellation(cancellationToken);
try
Expand All @@ -154,6 +194,7 @@ private async ValueTask<int> ReadAsyncCore(Memory<byte> buffer, CancellationToke
// as the connection buffer. That avoids an unnecessary copy while still reading
// the maximum amount we'd otherwise read at a time.
Debug.Assert(_connection.RemainingBuffer.Length == 0);
Debug.Assert(buffer.Length != 0);
int bytesRead = await _connection.ReadAsync(buffer.Slice(0, (int)Math.Min((ulong)buffer.Length, _chunkBytesRemaining))).ConfigureAwait(false);
if (bytesRead == 0)
{
Expand All @@ -167,15 +208,35 @@ private async ValueTask<int> ReadAsyncCore(Memory<byte> buffer, CancellationToke
return bytesRead;
}

if (buffer.Length == 0)
{
// User requested a zero-byte read, and we have no data available in the buffer for processing.
// This zero-byte read indicates their desire to trade off the extra cost of a zero-byte read
// for reduced memory consumption when data is not immediately available.
// So, we will issue our own zero-byte read against the underlying stream to allow it to make use of
// optimizations, such as deferring buffer allocation until data is actually available.
await _connection.ReadAsync(buffer).ConfigureAwait(false);
}

// We're only here if we need more data to make forward progress.
await _connection.FillAsync(async: true).ConfigureAwait(false);

// Now that we have more, see if we can get any response data, and if
// we can we're done.
int bytesCopied = ReadChunksFromConnectionBuffer(buffer.Span, ctr);
if (bytesCopied > 0)
if (buffer.Length == 0)
{
return bytesCopied;
if (PeekChunkFromConnectionBuffer())
{
return 0;
}
}
else
{
int bytesCopied = ReadChunksFromConnectionBuffer(buffer.Span, ctr);
if (bytesCopied > 0)
{
return bytesCopied;
}
}
}
}
Expand Down Expand Up @@ -208,8 +269,7 @@ private async Task CopyToAsyncCore(Stream destination, CancellationToken cancell
{
while (true)
{
ReadOnlyMemory<byte> bytesRead = ReadChunkFromConnectionBuffer(int.MaxValue, ctr);
if (bytesRead.Length == 0)
if (ReadChunkFromConnectionBuffer(int.MaxValue, ctr) is not ReadOnlyMemory<byte> bytesRead || bytesRead.Length == 0)
{
break;
}
Expand All @@ -235,28 +295,33 @@ private async Task CopyToAsyncCore(Stream destination, CancellationToken cancell
}
}

private bool PeekChunkFromConnectionBuffer()
{
return ReadChunkFromConnectionBuffer(maxBytesToRead: 0, cancellationRegistration: default).HasValue;
}

private int ReadChunksFromConnectionBuffer(Span<byte> buffer, CancellationTokenRegistration cancellationRegistration)
{
Debug.Assert(buffer.Length > 0);
int totalBytesRead = 0;
while (buffer.Length > 0)
{
ReadOnlyMemory<byte> bytesRead = ReadChunkFromConnectionBuffer(buffer.Length, cancellationRegistration);
Debug.Assert(bytesRead.Length <= buffer.Length);
if (bytesRead.Length == 0)
if (ReadChunkFromConnectionBuffer(buffer.Length, cancellationRegistration) is not ReadOnlyMemory<byte> bytesRead || bytesRead.Length == 0)
{
break;
}

Debug.Assert(bytesRead.Length <= buffer.Length);
totalBytesRead += bytesRead.Length;
bytesRead.Span.CopyTo(buffer);
buffer = buffer.Slice(bytesRead.Length);
}
return totalBytesRead;
}

private ReadOnlyMemory<byte> ReadChunkFromConnectionBuffer(int maxBytesToRead, CancellationTokenRegistration cancellationRegistration)
private ReadOnlyMemory<byte>? ReadChunkFromConnectionBuffer(int maxBytesToRead, CancellationTokenRegistration cancellationRegistration)
{
Debug.Assert(maxBytesToRead > 0 && _connection != null);
Debug.Assert(_connection != null);

try
{
Expand Down Expand Up @@ -310,7 +375,7 @@ private ReadOnlyMemory<byte> ReadChunkFromConnectionBuffer(int maxBytesToRead, C
}

int bytesToConsume = Math.Min(maxBytesToRead, (int)Math.Min((ulong)connectionBuffer.Length, _chunkBytesRemaining));
Debug.Assert(bytesToConsume > 0);
Debug.Assert(bytesToConsume > 0 || maxBytesToRead == 0);

_connection.ConsumeFromRemainingBuffer(bytesToConsume);
_chunkBytesRemaining -= (ulong)bytesToConsume;
Expand Down Expand Up @@ -441,8 +506,7 @@ public override async ValueTask<bool> DrainAsync(int maxDrainBytes)
drainedBytes += _connection.RemainingBuffer.Length;
while (true)
{
ReadOnlyMemory<byte> bytesRead = ReadChunkFromConnectionBuffer(int.MaxValue, ctr);
if (bytesRead.Length == 0)
if (ReadChunkFromConnectionBuffer(int.MaxValue, ctr) is not ReadOnlyMemory<byte> bytesRead || bytesRead.Length == 0)
{
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@ public ConnectionCloseReadStream(HttpConnection connection) : base(connection)
public override int Read(Span<byte> buffer)
{
HttpConnection? connection = _connection;
if (connection == null || buffer.Length == 0)
if (connection == null)
{
// Response body fully consumed or the caller didn't ask for any data
// Response body fully consumed
return 0;
}

int bytesRead = connection.Read(buffer);
if (bytesRead == 0)
if (bytesRead == 0 && buffer.Length != 0)
{
// We cannot reuse this connection, so close it.
_connection = null;
Expand All @@ -40,9 +40,9 @@ public override async ValueTask<int> ReadAsync(Memory<byte> buffer, Cancellation
CancellationHelper.ThrowIfCancellationRequested(cancellationToken);

HttpConnection? connection = _connection;
if (connection == null || buffer.Length == 0)
if (connection == null)
{
// Response body fully consumed or the caller didn't ask for any data
// Response body fully consumed
return 0;
}

Expand All @@ -69,7 +69,7 @@ public override async ValueTask<int> ReadAsync(Memory<byte> buffer, Cancellation
}
}

if (bytesRead == 0)
if (bytesRead == 0 && buffer.Length != 0)
{
// If cancellation is requested and tears down the connection, it could cause the read
// to return 0, which would otherwise signal the end of the data, but that would lead
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ public ContentLengthReadStream(HttpConnection connection, ulong contentLength) :

public override int Read(Span<byte> buffer)
{
if (_connection == null || buffer.Length == 0)
if (_connection == null)
{
// Response body fully consumed or the caller didn't ask for any data.
// Response body fully consumed
return 0;
}

Expand All @@ -35,7 +35,7 @@ public override int Read(Span<byte> buffer)
}

int bytesRead = _connection.Read(buffer);
if (bytesRead <= 0)
if (bytesRead <= 0 && buffer.Length != 0)
{
// Unexpected end of response stream.
throw new IOException(SR.Format(SR.net_http_invalid_response_premature_eof_bytecount, _contentBytesRemaining));
Expand All @@ -58,9 +58,9 @@ public override async ValueTask<int> ReadAsync(Memory<byte> buffer, Cancellation
{
CancellationHelper.ThrowIfCancellationRequested(cancellationToken);

if (_connection == null || buffer.Length == 0)
if (_connection == null)
{
// Response body fully consumed or the caller didn't ask for any data
// Response body fully consumed
return 0;
}

Expand Down Expand Up @@ -94,7 +94,7 @@ public override async ValueTask<int> ReadAsync(Memory<byte> buffer, Cancellation
}
}

if (bytesRead <= 0)
if (bytesRead == 0 && buffer.Length != 0)
{
// A cancellation request may have caused the EOF.
CancellationHelper.ThrowIfCancellationRequested(cancellationToken);
Expand Down
Loading

0 comments on commit d7f3f58

Please sign in to comment.