Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use new PipeWriter Json overloads #55740

Merged
merged 6 commits into from
Jul 12, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/Hosting/TestHost/src/ResponseBodyPipeWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -95,4 +95,8 @@ public override Span<byte> GetSpan(int sizeHint = 0)
CheckNotComplete();
return _pipe.Writer.GetSpan(sizeHint);
}

public override bool CanGetUnflushedBytes => _pipe.Writer.CanGetUnflushedBytes;

public override long UnflushedBytes => _pipe.Writer.UnflushedBytes;
}
119 changes: 91 additions & 28 deletions src/Http/Http.Extensions/src/HttpResponseJsonExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// The .NET Foundation licenses this file to you under the MIT license.

using System.Diagnostics.CodeAnalysis;
using System.IO.Pipelines;
using System.Text.Json;
using System.Text.Json.Serialization;
using System.Text.Json.Serialization.Metadata;
Expand Down Expand Up @@ -89,13 +90,23 @@ public static Task WriteAsJsonAsync<TValue>(

response.ContentType = contentType ?? JsonConstants.JsonContentTypeWithCharset;

var startTask = Task.CompletedTask;
if (!response.HasStarted)
{
// Flush headers before starting Json serialization. This avoids an extra layer of buffering before the first flush.
startTask = response.StartAsync(cancellationToken);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It'd be nice to get some HTTP/2 numbers too. I don't think this optimization helps in that case, but it probably doesn't hurt either. @sebastienros Do we have any HTTP/2 scenarios that use WriteAsJsonAsync?

}

// if no user provided token, pass the RequestAborted token and ignore OperationCanceledException
if (!cancellationToken.CanBeCanceled)
if (!startTask.IsCompleted || !cancellationToken.CanBeCanceled)
{
return WriteAsJsonAsyncSlow(response.Body, value, options, response.HttpContext.RequestAborted);
return WriteAsJsonAsyncSlow(startTask, response.BodyWriter, value, options,
ignoreOCE: !cancellationToken.CanBeCanceled,
cancellationToken.CanBeCanceled ? cancellationToken : response.HttpContext.RequestAborted);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The "Slow" path is normal one right, since the middleware doesn't pass a cancellation token?

I think it would be better to change WriteAsJsonAsync not pass in RequestAborted by default similar to WriteAsync. Essentially serializing JSON data to /dev/null should be faster most of the time than handling an OCE anyway unless you're doing something unusual like IAsyncEnumerable serialization, where you would probably want to explicitly pass in RequestAborted.

Right now, the only way to get the fast pass is to pass in a token from a cancellable source and then not cancel it. Passing in RequestAborted explicitly mostly works for this, but that's annoying and could result in OCEs in your logs that you don't want.

@JamesNK @Tratcher do you have opinions on not passing in RequestAborted by default?

}

return JsonSerializer.SerializeAsync(response.Body, value, options, cancellationToken);
startTask.GetAwaiter().GetResult();
return JsonSerializer.SerializeAsync(response.BodyWriter, value, options, cancellationToken);
}

/// <summary>
Expand All @@ -120,21 +131,33 @@ public static Task WriteAsJsonAsync<TValue>(

response.ContentType = contentType ?? JsonConstants.JsonContentTypeWithCharset;

var startTask = Task.CompletedTask;
if (!response.HasStarted)
{
// Flush headers before starting Json serialization. This avoids an extra layer of buffering before the first flush.
startTask = response.StartAsync(cancellationToken);
}

// if no user provided token, pass the RequestAborted token and ignore OperationCanceledException
if (!cancellationToken.CanBeCanceled)
if (!startTask.IsCompleted || !cancellationToken.CanBeCanceled)
{
return WriteAsJsonAsyncSlow(response, value, jsonTypeInfo);
return WriteAsJsonAsyncSlow(startTask, response, value, jsonTypeInfo,
ignoreOCE: !cancellationToken.CanBeCanceled,
cancellationToken.CanBeCanceled ? cancellationToken : response.HttpContext.RequestAborted);
}

return JsonSerializer.SerializeAsync(response.Body, value, jsonTypeInfo, cancellationToken);
startTask.GetAwaiter().GetResult();
return JsonSerializer.SerializeAsync(response.BodyWriter, value, jsonTypeInfo, cancellationToken);

static async Task WriteAsJsonAsyncSlow(HttpResponse response, TValue value, JsonTypeInfo<TValue> jsonTypeInfo)
static async Task WriteAsJsonAsyncSlow(Task startTask, HttpResponse response, TValue value, JsonTypeInfo<TValue> jsonTypeInfo,
bool ignoreOCE, CancellationToken cancellationToken)
{
try
{
await JsonSerializer.SerializeAsync(response.Body, value, jsonTypeInfo, response.HttpContext.RequestAborted);
await startTask;
await JsonSerializer.SerializeAsync(response.BodyWriter, value, jsonTypeInfo, cancellationToken);
}
catch (OperationCanceledException) { }
catch (OperationCanceledException) when (ignoreOCE) { }
}
}

Expand All @@ -161,37 +184,52 @@ public static Task WriteAsJsonAsync(

response.ContentType = contentType ?? JsonConstants.JsonContentTypeWithCharset;

var startTask = Task.CompletedTask;
if (!response.HasStarted)
{
// Flush headers before starting Json serialization. This avoids an extra layer of buffering before the first flush.
startTask = response.StartAsync(cancellationToken);
}

// if no user provided token, pass the RequestAborted token and ignore OperationCanceledException
if (!cancellationToken.CanBeCanceled)
if (!startTask.IsCompleted || !cancellationToken.CanBeCanceled)
{
return WriteAsJsonAsyncSlow(response, value, jsonTypeInfo);
return WriteAsJsonAsyncSlow(startTask, response, value, jsonTypeInfo,
ignoreOCE: !cancellationToken.CanBeCanceled,
cancellationToken.CanBeCanceled ? cancellationToken : response.HttpContext.RequestAborted);
}

return JsonSerializer.SerializeAsync(response.Body, value, jsonTypeInfo, cancellationToken);
startTask.GetAwaiter().GetResult();
return JsonSerializer.SerializeAsync(response.BodyWriter, value, jsonTypeInfo, cancellationToken);

static async Task WriteAsJsonAsyncSlow(HttpResponse response, object? value, JsonTypeInfo jsonTypeInfo)
static async Task WriteAsJsonAsyncSlow(Task startTask, HttpResponse response, object? value, JsonTypeInfo jsonTypeInfo,
bool ignoreOCE, CancellationToken cancellationToken)
{
try
{
await JsonSerializer.SerializeAsync(response.Body, value, jsonTypeInfo, response.HttpContext.RequestAborted);
await startTask;
await JsonSerializer.SerializeAsync(response.BodyWriter, value, jsonTypeInfo, cancellationToken);
}
catch (OperationCanceledException) { }
catch (OperationCanceledException) when (ignoreOCE) { }
}
}

[RequiresUnreferencedCode(RequiresUnreferencedCodeMessage)]
[RequiresDynamicCode(RequiresDynamicCodeMessage)]
private static async Task WriteAsJsonAsyncSlow<TValue>(
Stream body,
Task startTask,
PipeWriter body,
TValue value,
JsonSerializerOptions? options,
bool ignoreOCE,
CancellationToken cancellationToken)
{
try
{
await startTask;
await JsonSerializer.SerializeAsync(body, value, options, cancellationToken);
}
catch (OperationCanceledException) { }
catch (OperationCanceledException) when (ignoreOCE) { }
}

/// <summary>
Expand Down Expand Up @@ -266,29 +304,42 @@ public static Task WriteAsJsonAsync(

response.ContentType = contentType ?? JsonConstants.JsonContentTypeWithCharset;

var startTask = Task.CompletedTask;
if (!response.HasStarted)
{
// Flush headers before starting Json serialization. This avoids an extra layer of buffering before the first flush.
startTask = response.StartAsync(cancellationToken);
}

// if no user provided token, pass the RequestAborted token and ignore OperationCanceledException
if (!cancellationToken.CanBeCanceled)
if (!startTask.IsCompleted || !cancellationToken.CanBeCanceled)
{
return WriteAsJsonAsyncSlow(response.Body, value, type, options, response.HttpContext.RequestAborted);
return WriteAsJsonAsyncSlow(startTask, response.BodyWriter, value, type, options,
ignoreOCE: !cancellationToken.CanBeCanceled,
cancellationToken.CanBeCanceled ? cancellationToken : response.HttpContext.RequestAborted);
}

return JsonSerializer.SerializeAsync(response.Body, value, type, options, cancellationToken);
startTask.GetAwaiter().GetResult();
return JsonSerializer.SerializeAsync(response.BodyWriter, value, type, options, cancellationToken);
}

[RequiresUnreferencedCode(RequiresUnreferencedCodeMessage)]
[RequiresDynamicCode(RequiresDynamicCodeMessage)]
private static async Task WriteAsJsonAsyncSlow(
Stream body,
Task startTask,
PipeWriter body,
object? value,
Type type,
JsonSerializerOptions? options,
bool ignoreOCE,
CancellationToken cancellationToken)
{
try
{
await startTask;
await JsonSerializer.SerializeAsync(body, value, type, options, cancellationToken);
}
catch (OperationCanceledException) { }
catch (OperationCanceledException) when (ignoreOCE) { }
}

/// <summary>
Expand Down Expand Up @@ -316,21 +367,33 @@ public static Task WriteAsJsonAsync(

response.ContentType = contentType ?? JsonConstants.JsonContentTypeWithCharset;

var startTask = Task.CompletedTask;
if (!response.HasStarted)
{
// Flush headers before starting Json serialization. This avoids an extra layer of buffering before the first flush.
startTask = response.StartAsync(cancellationToken);
}

// if no user provided token, pass the RequestAborted token and ignore OperationCanceledException
if (!cancellationToken.CanBeCanceled)
if (!startTask.IsCompleted || !cancellationToken.CanBeCanceled)
{
return WriteAsJsonAsyncSlow();
return WriteAsJsonAsyncSlow(startTask, response.BodyWriter, value, type, context,
ignoreOCE: !cancellationToken.CanBeCanceled,
cancellationToken.CanBeCanceled ? cancellationToken : response.HttpContext.RequestAborted);
}

return JsonSerializer.SerializeAsync(response.Body, value, type, context, cancellationToken);
startTask.GetAwaiter().GetResult();
return JsonSerializer.SerializeAsync(response.BodyWriter, value, type, context, cancellationToken);

async Task WriteAsJsonAsyncSlow()
static async Task WriteAsJsonAsyncSlow(Task startTask, PipeWriter body, object? value, Type type, JsonSerializerContext context,
bool ignoreOCE, CancellationToken cancellationToken)
{
try
{
await JsonSerializer.SerializeAsync(response.Body, value, type, context, cancellationToken);
await startTask;
await JsonSerializer.SerializeAsync(body, value, type, context, cancellationToken);
}
catch (OperationCanceledException) { }
catch (OperationCanceledException) when (ignoreOCE) { }
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -494,10 +494,7 @@ private class TestStream : Stream
public override long Length { get; }
public override long Position { get; set; }

public override void Flush()
{
throw new NotImplementedException();
}
public override void Flush() { }

public override int Read(byte[] buffer, int offset, int count)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,21 +83,25 @@ public sealed override async Task WriteResponseBodyAsync(OutputFormatterWriteCon
}
}

var responseStream = httpContext.Response.Body;
if (selectedEncoding.CodePage == Encoding.UTF8.CodePage)
{
try
{
var responseWriter = httpContext.Response.BodyWriter;
if (!httpContext.Response.HasStarted)
{
// Flush headers before starting Json serialization. This avoids an extra layer of buffering before the first flush.
await httpContext.Response.StartAsync();
}

if (jsonTypeInfo is not null)
{
await JsonSerializer.SerializeAsync(responseStream, context.Object, jsonTypeInfo, httpContext.RequestAborted);
await JsonSerializer.SerializeAsync(responseWriter, context.Object, jsonTypeInfo, httpContext.RequestAborted);
}
else
{
await JsonSerializer.SerializeAsync(responseStream, context.Object, SerializerOptions, httpContext.RequestAborted);
await JsonSerializer.SerializeAsync(responseWriter, context.Object, SerializerOptions, httpContext.RequestAborted);
}

await responseStream.FlushAsync(httpContext.RequestAborted);
}
catch (OperationCanceledException) when (context.HttpContext.RequestAborted.IsCancellationRequested) { }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,18 @@ public async Task ExecuteAsync(ActionContext context, JsonResult result)
var objectType = value?.GetType() ?? typeof(object);

// Keep this code in sync with SystemTextJsonOutputFormatter
var responseStream = response.Body;
if (resolvedContentTypeEncoding.CodePage == Encoding.UTF8.CodePage)
{
try
{
await JsonSerializer.SerializeAsync(responseStream, value, objectType, jsonSerializerOptions, context.HttpContext.RequestAborted);
await responseStream.FlushAsync(context.HttpContext.RequestAborted);
var responseWriter = response.BodyWriter;
if (!response.HasStarted)
{
// Flush headers before starting Json serialization. This avoids an extra layer of buffering before the first flush.
await response.StartAsync();
}

await JsonSerializer.SerializeAsync(responseWriter, value, objectType, jsonSerializerOptions, context.HttpContext.RequestAborted);
}
catch (OperationCanceledException) when (context.HttpContext.RequestAborted.IsCancellationRequested) { }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,19 @@ public void Advance(int bytes)
}
}

public long UnflushedBytes
{
get
{
var bytes = _position + _advancedBytesForChunk + _pipeWriter.UnflushedBytes;
for (var i = 0; i < _completedSegments?.Count; i++)
{
bytes += _completedSegments[i].Length;
}
BrennanConroy marked this conversation as resolved.
Show resolved Hide resolved
return bytes;
}
}

public void CancelPendingFlush()
{
_pipeWriter.CancelPendingFlush();
Expand Down
2 changes: 2 additions & 0 deletions src/Servers/Kestrel/Core/src/Internal/Http/HttpProtocol.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1482,6 +1482,8 @@ public void Advance(int bytes)
}
}

public long UnflushedBytes => Output.UnflushedBytes;

public Memory<byte> GetMemory(int sizeHint = 0)
{
_isLeasedMemoryInvalid = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ public void Abort()
}
}

public override bool CanGetUnflushedBytes => true;
public override long UnflushedBytes => _pipeControl.UnflushedBytes;

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private void ValidateState(CancellationToken cancellationToken = default)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ internal interface IHttpOutputProducer
Task WriteDataAsync(ReadOnlySpan<byte> data, CancellationToken cancellationToken);
ValueTask<FlushResult> WriteStreamSuffixAsync();
void Advance(int bytes);
long UnflushedBytes { get; }
Span<byte> GetSpan(int sizeHint = 0);
Memory<byte> GetMemory(int sizeHint = 0);
void CancelPendingFlush();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ internal interface IHttpResponseControl
Memory<byte> GetMemory(int sizeHint = 0);
Span<byte> GetSpan(int sizeHint = 0);
void Advance(int bytes);
long UnflushedBytes { get; }
ValueTask<FlushResult> FlushPipeAsync(CancellationToken cancellationToken);
ValueTask<FlushResult> WritePipeAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken);
void CancelPendingFlush();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,8 @@ public void Advance(int bytes)
}
}

public long UnflushedBytes => _pipeWriter.UnflushedBytes;

public Span<byte> GetSpan(int sizeHint = 0)
{
lock (_dataWriterLock)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,8 @@ public void Advance(int bytes)
}
}

public long UnflushedBytes => _pipeWriter.UnflushedBytes;

public void CancelPendingFlush()
{
lock (_dataWriterLock)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,15 @@ public override void Complete(Exception? exception = null)
}
}

public override bool CanGetUnflushedBytes => _innerPipeWriter.CanGetUnflushedBytes;
public override long UnflushedBytes
{
get
{
return _innerPipeWriter.UnflushedBytes + _bytesBuffered;
}
}

public void Abort()
{
_aborted = true;
Expand Down
4 changes: 4 additions & 0 deletions src/Servers/Kestrel/shared/CompletionPipeWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ public override Span<byte> GetSpan(int sizeHint = 0)
return _inner.GetSpan(sizeHint);
}

public override bool CanGetUnflushedBytes => _inner.CanGetUnflushedBytes;

public override long UnflushedBytes => _inner.UnflushedBytes;

public void Reset()
{
IsCompleted = false;
Expand Down
Loading
Loading