API Proposal: Add PipeReader.ReadAsync overloads to allow waiting for a specific amount of data #25063
I like it
Should it be a suggestion/hint of size? What happens if there is a large delay before the next bit of data? (e.g. TLS may want to close the frame early and send anyway.) There are often variable-sized protocols which have a preferred frame size (e.g. TCP at packet sizes, TLS at frame size) and which can close the frame early and send to the next pipe stage/output; however, they are more efficient doing the send on frame boundaries.
Then you cancel the read to yield it.
So the pattern would be?

public async Task Write15LinesAsync()
{
    PipeWriter buffer = Pipe.Writer;
    var length = 'z' - 'a' + 3;
    for (var i = 0; i < 15; i++)
    {
        var written = WriteAtoZ(buffer.GetMemory(length));
        buffer.Advance(written);
        await buffer.FlushAsync(); // Write data
    }
    Pipe.Reader.CancelPendingRead(); // Clear any buffered data (returns void, so not awaited)
}
public int WriteAtoZ(Memory<byte> memory)
{
    Span<byte> span = memory.Span;
    var i = 0;
    for (var c = 'a'; c <= 'z'; c++)
    {
        span[i] = (byte)c;
        i++;
    }
    span[i] = (byte)'\r';     // i already points past the last letter
    span[i + 1] = (byte)'\n';
    return i + 2;
}
Or would the cancel/wake-up happen with Flush but not Write, so it would be:

public async Task Write15LinesAsync()
{
    PipeWriter buffer = Pipe.Writer;
    var length = 'z' - 'a' + 3;
    for (var i = 0; i < 15; i++)
    {
        var written = WriteAtoZ(buffer.GetMemory(length));
        buffer.Advance(written);
        await buffer.WriteAsync(); // Write data
    }
    await buffer.FlushAsync(); // Clear any buffered data
}

e.g. how do you
The question is really about where the knowledge is (reader or writer). In this simple example you can just flush at the end and not Write for each Advance (so it's a bit contrived).
Both sides have knowledge: the Writer knows when it has no more pending data; the Reader knows its ideal chunk sizes, or whether it has enough data to parse.
It's simplified for ease of concepts; the point is you can't Advance forever or you'll run out of memory, and you get no backpressure feedback. Ok, a more complex example:

public async Task ExecuteAsync()
{
try
{
var bytesWritten = 0u;
while (true)
{
if (!_pipe.Reader.TryRead(out var result))
{
// No waiting input data
if (bytesWritten > 0)
{
// Flush any written data before waiting for more input
var flushResult = _pipe.Writer.FlushAsync();
if (!flushResult.IsCompleted)
{
await flushResult;
}
bytesWritten = 0u;
}
// Await for more input
result = await _pipe.Reader.ReadAsync();
}
var inputBuffer = result.Buffer;
var consumed = inputBuffer.Start;
var examined = inputBuffer.End;
try
{
if (inputBuffer.IsEmpty && result.IsCompleted)
{
break;
}
ParseRequest(inputBuffer, out consumed, out examined);
if (_state != State.Body && result.IsCompleted)
{
// Bad request, finish request processing
break;
}
if (_state == State.Body)
{
bytesWritten += await ProcessRequestAsync();
_state = State.StartLine;
}
}
finally
{
_pipe.Reader.AdvanceTo(consumed, examined);
}
}
// Flush any pending data
await _pipe.Writer.FlushAsync();
_pipe.Reader.Complete();
}
catch (Exception ex)
{
_pipe.Reader.Complete(ex);
}
finally
{
_pipe.Writer.Complete();
}
}
private async ValueTask<uint> ProcessRequestAsync()
{
// Processing a request Writes, but doesn't Flush,
// unless it needs to trigger immediate sending
// like after a websocket message, server sent event or
// at end of </head> to get early browser read of css, fonts and preload, prefetch links
await _pipe.Writer.WriteAsync(_plainTextBytes);
return (uint)_plainTextBytes.Length;
}

So the question is how do you differentiate between a forced early wake-up and an enough-data wake-up, on both the Writer and Reader ends. I'm suggesting using
Also, you want to default the first param of

public virtual PipeAwaiter<FlushResult> WriteAsync(ReadOnlyMemory<byte> source = default, CancellationToken cancellationToken = default)

so you can do await WriteAsync(); when you are only writing using Span and Advance.
Then don't FlushAsync, use a reasonable threshold that won't run out of memory? What's the scenario where you will need to buffer so much that you could run out?
WriteAsync without a buffer is not an easily understood concept that carries over to normal usage.
You need to describe how the other side of the writer wants to handle consuming the data written during request processing.
Second scenario: TextWriter without arrays as buffers (generally the encoding happens in Flush, so it needs to hold conversion buffers), and encoding directly to the Pipe without intermediary copies. Holding onto these arrays causes issues (https://github.com/dotnet/coreclr/issues/16389) as well as contention getting them from the pool.
With Pipes the arrays just aren't needed, as the Pipe brings its own memory. A Stream that wraps a Pipe:

public class DuplexPipeStream : Stream, IDuplexPipe
{
private Pipe _pipe;
PipeReader IDuplexPipe.Input => _pipe.Reader;
PipeWriter IDuplexPipe.Output => _pipe.Writer;
public override async Task FlushAsync(CancellationToken cancellationToken)
=> await _pipe.Writer.FlushAsync(cancellationToken);
public override async Task WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken = default)
    => await _pipe.Writer.WriteAsync(source, cancellationToken);
public override async ValueTask<int> ReadAsync(Memory<byte> destination, CancellationToken cancellationToken = default)
{
while (true)
{
var result = await _pipe.Reader.ReadAsync();
var readableBuffer = result.Buffer;
try
{
if (!readableBuffer.IsEmpty)
{
// readableBuffer.Length is long; destination.Length is int
var count = (int)Math.Min(readableBuffer.Length, destination.Length);
readableBuffer = readableBuffer.Slice(0, count);
readableBuffer.CopyTo(destination.Span);
return count;
}
else if (result.IsCompleted)
{
return 0;
}
}
finally
{
_pipe.Reader.AdvanceTo(readableBuffer.End, readableBuffer.End);
}
}
}
public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
=> WriteAsync(new Memory<byte>(buffer, offset, count), cancellationToken);
public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
=> ReadAsync(new Memory<byte>(buffer, offset, count), cancellationToken).AsTask();
// Other stream overloads
}

Then a TextWriter that works with that Stream, encoding on the go in WriteAsync rather than waiting for FlushAsync and holding no internal buffer; but it would rely on WriteAsync only occasionally acting as a Flush (as StreamWriter does currently) to maintain performance.

public class HttpResponseStreamWriter : TextWriter
{
internal const int DefaultBufferSize = 16 * 1024;
private IDuplexPipe _pipe;
private StreamWriter _streamWriter;
private readonly Encoder _encoder;
public HttpResponseStreamWriter(Stream stream, Encoding encoding)
    : this(stream, encoding, DefaultBufferSize) {}
public HttpResponseStreamWriter(Stream stream, Encoding encoding, int bufferSize)
{
Encoding = encoding;
if (stream is IDuplexPipe pipe)
{
_pipe = pipe;
}
else
{
_streamWriter = new StreamWriter(stream, encoding, bufferSize);
}
}
public override Encoding Encoding { get; }
public unsafe override void Write(char value)
{
    if (_pipe != null)
    {
        Span<byte> bytes = _pipe.Output.GetSpan(Encoding.GetMaxByteCount(1));
        var encoded = Encoding.GetBytes(new ReadOnlySpan<char>(&value, 1), bytes);
        _pipe.Output.Advance(encoded);
    }
    else
    {
        _streamWriter.Write(value);
    }
}
public override async Task WriteAsync(ReadOnlyMemory<char> source)
{
if (_pipe != null)
{
Write(source, default(CancellationToken));
await _pipe.Output.WriteAsync();
}
else
{
await _streamWriter.WriteAsync(source);
}
}
private void Write(ReadOnlyMemory<char> source, CancellationToken token)
{
ReadOnlySpan<char> input = source.Span;
int minBytes = Encoding.GetMaxByteCount(1);
while (input.Length > 0)
{
Span<byte> bytes = _pipe.Output.GetSpan(minBytes);
int totalEncoded = 0;
while (bytes.Length > 0)
{
int toEncode = Math.Min(bytes.Length / minBytes, input.Length); // conservative count that is guaranteed to fit
var encoded = Encoding.GetBytes(input.Slice(0, toEncode), bytes);
input = input.Slice(toEncode);
bytes = bytes.Slice(encoded);
totalEncoded += encoded;
if (bytes.Length < minBytes)
{
break;
}
}
_pipe.Output.Advance(totalEncoded);
}
}
public override Task WriteAsync(char[] values, int index, int count)
=> WriteAsync(new ReadOnlyMemory<char>(values, index, count));
public override Task WriteAsync(string value)
=> WriteAsync(value.AsMemory());
public override async Task FlushAsync()
{
if (_pipe != null)
{
await _pipe.Output.FlushAsync();
}
else
{
await _streamWriter.FlushAsync();
}
}
// Other TextWriter overloads
}
So the question is: if WriteAsync called FlushAsync as now, then TextWriter.FlushAsync wouldn't call FlushAsync; instead it would need to cancel the reader of the next stage in the pipeline.
So the wrapping Stream would additionally need the next stage in the pipeline, so it could call the void method:

public override async Task FlushAsync()
{
    if (_pipe != null)
    {
        // Where does _nextPipe come from?
        // Is it a problem CancelPendingRead is acting as a non-awaitable Flush?
        _nextPipe.Input.CancelPendingRead();
    }
    else
    {
        await _streamWriter.FlushAsync();
    }
}
btw I think |
You still haven't shown the consuming side. What's on the other end of the pipe that's flushing? I'd like to see the transport code, basically, and how it uses these 2 features. What I hear you asking for is a force flush from the writer side even though the reader is looking for a specific buffer size.
Something along the lines of:

const int PreferredFrameSize = 1024; // Varies based on Reader's framing size
public async Task WriteFramesAsync(CancellationToken cancellationToken)
{
try
{
await WriteFramesAsync(_pipe.Reader, cancellationToken).ConfigureAwait(false);
}
catch (Exception ex)
{
_pipe.Reader.Complete(ex);
}
finally
{
_pipe.Writer.Complete();
}
}
private async Task WriteFramesAsync(PipeReader reader, CancellationToken cancellationToken)
{
while (true)
{
// Read specifying preferred readsize hint
ReadResult result = await reader.ReadAsync(PreferredFrameSize, cancellationToken);
ReadOnlySequence<byte> inputBuffer = result.Buffer;
SequencePosition consumed = inputBuffer.Start;
SequencePosition examined = inputBuffer.End;
bool shouldConsumeAll = (result.IsFlush || result.IsCompleted);
try
{
while (true)
{
cancellationToken.ThrowIfCancellationRequested();
// Output frame, passing on the Flush if this is the last frame of the current data.
// (async methods can't have out parameters, so OutputFrameAsync is assumed
// to return the consumed/examined positions as a tuple)
(consumed, examined) = await OutputFrameAsync(
    inputBuffer,
    flush: inputBuffer.Length <= PreferredFrameSize && shouldConsumeAll,
    cancellationToken);
inputBuffer = inputBuffer.Slice(consumed);
if (inputBuffer.IsEmpty)
{
// Frame completed and no more data, await more data
break;
}
if (!result.IsCompleted && inputBuffer.Length < PreferredFrameSize)
{
// Under preferred size, check if more input was made
// available while writing Frames, Completed won't have more data
reader.AdvanceTo(consumed, examined);
if (reader.TryRead(out ReadResult newResult))
{
result = newResult;
inputBuffer = result.Buffer;
consumed = inputBuffer.Start;
examined = inputBuffer.End;
// Apply initial Flush if originally requested
shouldConsumeAll = (shouldConsumeAll || result.IsFlush || result.IsCompleted);
}
}
if (!shouldConsumeAll && inputBuffer.Length < PreferredFrameSize)
{
// Not enough left to fill a frame and not Flush or Complete, await more data
break;
}
}
if (inputBuffer.IsEmpty && result.IsCompleted)
{
// Finished, exit
break;
}
}
finally
{
reader.AdvanceTo(consumed, examined);
}
}
reader.Complete();
}
I think the problem here is that you need to guarantee that the writer absolutely is going to flush, or you can end up in a situation where you don't have enough data for a frame and you haven't consumed enough to release backpressure. That's fundamentally what I think is broken about the reader using IsFlush to make decisions like this. That's why I think you always need a timeout on the reader side that tries to consume data as a failsafe.
If you are in back pressure you have... If the reader stops making progress while in back pressure, then it should throw. Then it would tear everything down?
So IsFlush is also true if you WriteAsync and cross the threshold?
But how do you code around that?
Yes, because the "buffer" is full. Much like a TextWriter or a BufferedStream adds a FlushAsync to a WriteAsync when its internal buffer gets full.
Added
As you could call... Thinking about it: perhaps the values are already there and no more values are needed? Expose the thresholds on the reader:

public abstract partial class PipeReader
{
    public long PauseWriterThreshold => _pauseWriterThreshold;
    public long ResumeWriterThreshold => _resumeWriterThreshold;
    public PipeAwaiter<ReadResult> ReadAsync()
        => ReadAsync(Math.Max(ResumeWriterThreshold, inputBuffer.Length + 1)); // semantically
    public abstract PipeAwaiter<ReadResult> ReadAsync(long waitForBytes);
}

Then the reader has a chance to decide whether it wants to defer while the pressure is going up, but has to consume once it's hit the top. Defaults for sockets could be something like:

new PipeOptions(
    pauseWriterThreshold: 65535, // 64kB
    resumeWriterThreshold: 1400  // or MTU - TCP/IP headers
);

Basing it on the PipeOptions means it's config (which would make @tmds happy), also using the same value for... Also exposing the thresholds makes it easier for a wrapping stream, as it can pick up the wrapped Reader's limits and make decisions using them.
Minimal implementation using IsFlush (as it needs to be passed on):

private async Task PassThroughAsync(PipeReader reader, PipeWriter writer, CancellationToken cancellationToken)
{
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> inputBuffer = result.Buffer;
SequencePosition consumed = inputBuffer.Start;
SequencePosition examined = inputBuffer.End;
try
{
cancellationToken.ThrowIfCancellationRequested();
if (inputBuffer.IsEmpty && result.IsCompleted)
{
await writer.FlushAsync(cancellationToken);
break;
}
// The while loop could be a WriteAsync(ReadOnlySequence<byte>) extension
while (inputBuffer.TryGet(ref consumed, out var memory, advance: true))
{
await writer.WriteAsync(memory, cancellationToken);
}
if (result.IsFlush)
{
await writer.FlushAsync(cancellationToken);
}
}
finally
{
reader.AdvanceTo(consumed, examined);
}
}
reader.Complete();
}
public async Task PassThroughAsync(CancellationToken cancellationToken)
{
try
{
await PassThroughAsync(_pipe.Reader, _pipe.Writer, cancellationToken).ConfigureAwait(false);
}
catch (Exception ex)
{
_pipe.Reader.Complete(ex);
}
finally
{
_pipe.Writer.Complete();
}
}
I think we need to split up between:
- a normal ReadAsync() that returns when any data is available
- a ReadAsync(amount) that waits for a specific amount of data

For the first, I think the reader needs to respect the writer's FlushAsync (as it is the way to indicate no more data is expected soon). The way of working in larger chunks is to buffer data from WriteAsync up to an amount (cfr dotnet/apireviews#59 (comment)). The reader can use the IsFlush flag to distinguish between an explicit flush by FlushAsync and an implicit flush by WriteAsync. I think we can use the low threshold as the WriteAsync buffering threshold. For the second, the FlushAsyncs won't cause ReadAsync to complete until the amount is available (or an error occurs, e.g. timeout).
I didn't read through the whole thread, but the initial summary of this issue is something I have seen that Boost ASIO does in the C++ world fairly well: https://www.boost.org/doc/libs/1_69_0/doc/html/boost_asio/reference/read_until.html This is very helpful when consuming protocols that have a header of what the total expected length is. |
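For a length-prefixed protocol, consumption with the proposed overload could look something like this sketch. It assumes the ReadAtLeastAsync shape proposed in this thread; ReadInt32LittleEndian and ProcessMessage are hypothetical stand-ins for the application's header parsing and message handling:

```csharp
// Sketch only: ReadAtLeastAsync is the proposed API; the helpers are assumed.
async Task ConsumeAsync(PipeReader reader, CancellationToken cancellationToken)
{
    while (true)
    {
        // Wait until at least the 4-byte length header is buffered
        ReadResult result = await reader.ReadAtLeastAsync(4, cancellationToken);
        if (result.Buffer.Length < 4)
        {
            break; // reader completed before a full header arrived
        }

        int payloadLength = ReadInt32LittleEndian(result.Buffer); // hypothetical helper
        reader.AdvanceTo(result.Buffer.Start); // nothing consumed yet

        // Wait until the whole message (header + payload) is buffered
        result = await reader.ReadAtLeastAsync(4 + payloadLength, cancellationToken);
        if (result.Buffer.Length < 4 + payloadLength)
        {
            break; // connection ended mid-message
        }

        ReadOnlySequence<byte> message = result.Buffer.Slice(4, payloadLength);
        ProcessMessage(message); // hypothetical handler
        reader.AdvanceTo(result.Buffer.GetPosition(4 + payloadLength));
    }
    reader.Complete();
}
```

A real implementation would also validate payloadLength against a maximum before asking the pipe to buffer that much.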
One of my projects at work was interfacing with two (different) IoT devices that had custom application protocols over TCP, which were length-prefixed. The farther up the application/message stack you get, the harder it is to do something like partial parsing; yes, you just want to pull the entire message off the stream and handle it. Of course, I'm pretty sure most of the actual messages being passed fit inside a single TCP frame, so it's pretty likely my code only had to actually
@halter73 showed interest in doing this
I haven't read back through the whole issue but... There is never a scenario (outside cancellation/error) where you say "I need 5k" but actually want 3k... In TLS, due to the encrypted nature, it's all or nothing. Secondly, surely if your "asked for" amount is greater than the backpressure threshold you should throw straight away, as your requirement is already bigger than your backpressure anyway?
@Drawaes the back pressure mechanic was changed completely in the last version so it's no longer a problem. |
Given the old behavior where backpressure was based on the number of unconsumed bytes, this makes a lot of sense. Given the new behavior where backpressure is based on the number of unexamined bytes, I think it's fair to say you are "examining" all the bytes until you get at least the number of bytes you expected. Really this feature is just meant to be a more optimized version of the following:

public async ValueTask<ReadResult> ReadAtLeast(PipeReader pipeReader, long minBytes)
{
    var result = await pipeReader.ReadAsync();
    while (result.Buffer.Length < minBytes)
    {
        if (result.IsCompleted || result.IsCanceled)
        {
            // No more data is coming; return what we have rather than loop forever
            break;
        }
        // Mark everything as examined so the next ReadAsync waits for more data
        pipeReader.AdvanceTo(result.Buffer.Start, result.Buffer.End);
        result = await pipeReader.ReadAsync();
    }
    return result;
}
Also, that will be the implementation in the PipeReader abstract base class. I'll turn this into a formal API proposal. |
namespace System.IO.Pipelines
{
partial class PipeReader
{
public ValueTask<ReadResult> ReadAtLeastAsync(int minimumBytes, CancellationToken cancellationToken = default);
protected virtual ValueTask<ReadResult> ReadAtLeastAsyncCore(int minimumBytes, CancellationToken cancellationToken);
}
}

@GrabYourPitchforks seems to have had some existential concerns with this. The shape is approved as long as we don't outright axe the feature.
My concern specifically was with this API being promoted as a way to deal with TLV-structured data. What I absolutely do not want to have happen is for this pattern to become commonplace:

int payloadLength = await reader.ReadInt32Async();
await reader.ReadAtLeastAsync(payloadLength); // buffer data
// consume buffer once await returns

If the untrusted payload contains a TLV header which says "2GB of data follows," then flowing this value as-is to ReadAtLeastAsync is a problem. I don't think this is an existential threat to the API as proposed, since we have lots of APIs which misbehave when given untrusted input. But we really need to be careful with documentation and sample code here. And if we don't think docs are sufficient then we should consider placing restrictions on how large a number this API is willing to accept by default.
If we want to bound the size (4K or something?), the first release is the time to do it. I see length prefixed values were even called out in the top proposal as a scenario. |
We don't need to bound anything; the nice thing about pipelines is that it'll give you nicely split-up buffers without destroying the heap. Now we can talk about the fact that memory is being bloated, but that's not a concern here; let the upper layers deal with maximum message sizes before calling this API. It's what all serializers of this form do already.
That was my specific concern. Ultimately it doesn't matter whether the API allocates a single 100MB buffer or 10,000 x 10KB buffers. That's an implementation detail. The problem fundamentally is that we don't want the untrusted remote endpoint to coerce the application into doing significantly more work / consuming significantly more resources than the application intended.
If we put a blurb like this in the doc and sample code, I think that would address the concerns. Thanks!
It should be one of your concerns though, because there are 2 common "issues" here. Code like this:

// This is bad code, do not copy it
int payloadLength = await stream.ReadInt32Async();
var buffer = new byte[payloadLength]; // Untrusted length; did the client even send this much data? Why are we assuming they did before actually reading it?
await stream.ReadAsync(buffer); // read the data

This pattern is one of the traps this API helps you avoid. Plus the other issue you mention (not having a maximum message size), e.g. https://github.com/protocolbuffers/protobuf/blob/3f5fc4df1de8e12b2235c3006593e22d6993c3f5/csharp/src/Google.Protobuf/ParsingPrimitives.cs#L469-L475
That's absolutely reasonable since it's already a concern today but I can see why this API invites you to do it (since the pipe helps you buffer). |
I notice we didn't define the behavior of ReadAtLeast(0). I think it should signify "wait until you have any data" (since we have TryRead for synchronous attempts). But I guess that naturally falls out 😄
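Assuming the base-class implementation sketched earlier in this thread, the zero case does fall out naturally: any read result satisfies a minimum of zero, so the wait-loop never runs.

```csharp
// With minimumBytes == 0 the condition result.Buffer.Length < 0 is never true,
// so ReadAtLeastAsync(0) completes on the first read, like a plain ReadAsync()
ReadResult result = await reader.ReadAtLeastAsync(0, cancellationToken);
// the buffer may still be empty if the pipe was completed or canceled
```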
When using this API it's important to have a timeout in case |
How is that different from any other async API? That is what cancellation is for |
Some timeouts happen without the use of a CancellationToken. It depends on the API how important the timeout case is. For this API (and related ones) it's important.
What API times out without a token? |
For the |
I'm fine if it's done through a |
Those aren't APIs in the same vein as this, so they're not comparable (and the PipeWriter in ASP.NET Core is extremely specific and tied to Kestrel's implementation...). Timeouts are handled via cancellation for async APIs.
Often a protocol knows how much data it is waiting for (TLS frames being one example, size-headered binary protocols being another).
Instead, you ReadAsync(), then check the header and see you don't have enough, so you ReadAsync() again, check if you have enough, and keep looping until you have a complete frame. If you could do a ReadAsync(xxxxx) that wouldn't return unless there was an error/completion or at least that amount of data was available, it would remove the extra looping from every downstream protocol.
Proposed API
public abstract partial class PipeReader
{
+    public virtual ValueTask<ReadResult> ReadAsync(int minimumBytes, CancellationToken cancellationToken = default);
}
/cc @Drawaes