Skip to content

Commit

Permalink
Use new PipeWriter Json overloads (#55740)
Browse files Browse the repository at this point in the history
  • Loading branch information
BrennanConroy authored Jul 12, 2024
1 parent fe84bd0 commit cd24d14
Show file tree
Hide file tree
Showing 18 changed files with 474 additions and 44 deletions.
4 changes: 4 additions & 0 deletions src/Hosting/TestHost/src/ResponseBodyPipeWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -95,4 +95,8 @@ public override Span<byte> GetSpan(int sizeHint = 0)
CheckNotComplete();
return _pipe.Writer.GetSpan(sizeHint);
}

public override bool CanGetUnflushedBytes => _pipe.Writer.CanGetUnflushedBytes;

public override long UnflushedBytes => _pipe.Writer.UnflushedBytes;
}
119 changes: 91 additions & 28 deletions src/Http/Http.Extensions/src/HttpResponseJsonExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// The .NET Foundation licenses this file to you under the MIT license.

using System.Diagnostics.CodeAnalysis;
using System.IO.Pipelines;
using System.Text.Json;
using System.Text.Json.Serialization;
using System.Text.Json.Serialization.Metadata;
Expand Down Expand Up @@ -89,13 +90,23 @@ public static Task WriteAsJsonAsync<TValue>(

response.ContentType = contentType ?? JsonConstants.JsonContentTypeWithCharset;

var startTask = Task.CompletedTask;
if (!response.HasStarted)
{
// Flush headers before starting Json serialization. This avoids an extra layer of buffering before the first flush.
startTask = response.StartAsync(cancellationToken);
}

// if no user provided token, pass the RequestAborted token and ignore OperationCanceledException
if (!cancellationToken.CanBeCanceled)
if (!startTask.IsCompleted || !cancellationToken.CanBeCanceled)
{
return WriteAsJsonAsyncSlow(response.Body, value, options, response.HttpContext.RequestAborted);
return WriteAsJsonAsyncSlow(startTask, response.BodyWriter, value, options,
ignoreOCE: !cancellationToken.CanBeCanceled,
cancellationToken.CanBeCanceled ? cancellationToken : response.HttpContext.RequestAborted);
}

return JsonSerializer.SerializeAsync(response.Body, value, options, cancellationToken);
startTask.GetAwaiter().GetResult();
return JsonSerializer.SerializeAsync(response.BodyWriter, value, options, cancellationToken);
}

/// <summary>
Expand All @@ -120,21 +131,33 @@ public static Task WriteAsJsonAsync<TValue>(

response.ContentType = contentType ?? JsonConstants.JsonContentTypeWithCharset;

var startTask = Task.CompletedTask;
if (!response.HasStarted)
{
// Flush headers before starting Json serialization. This avoids an extra layer of buffering before the first flush.
startTask = response.StartAsync(cancellationToken);
}

// if no user provided token, pass the RequestAborted token and ignore OperationCanceledException
if (!cancellationToken.CanBeCanceled)
if (!startTask.IsCompleted || !cancellationToken.CanBeCanceled)
{
return WriteAsJsonAsyncSlow(response, value, jsonTypeInfo);
return WriteAsJsonAsyncSlow(startTask, response, value, jsonTypeInfo,
ignoreOCE: !cancellationToken.CanBeCanceled,
cancellationToken.CanBeCanceled ? cancellationToken : response.HttpContext.RequestAborted);
}

return JsonSerializer.SerializeAsync(response.Body, value, jsonTypeInfo, cancellationToken);
startTask.GetAwaiter().GetResult();
return JsonSerializer.SerializeAsync(response.BodyWriter, value, jsonTypeInfo, cancellationToken);

static async Task WriteAsJsonAsyncSlow(HttpResponse response, TValue value, JsonTypeInfo<TValue> jsonTypeInfo)
static async Task WriteAsJsonAsyncSlow(Task startTask, HttpResponse response, TValue value, JsonTypeInfo<TValue> jsonTypeInfo,
bool ignoreOCE, CancellationToken cancellationToken)
{
try
{
await JsonSerializer.SerializeAsync(response.Body, value, jsonTypeInfo, response.HttpContext.RequestAborted);
await startTask;
await JsonSerializer.SerializeAsync(response.BodyWriter, value, jsonTypeInfo, cancellationToken);
}
catch (OperationCanceledException) { }
catch (OperationCanceledException) when (ignoreOCE) { }
}
}

Expand All @@ -161,37 +184,52 @@ public static Task WriteAsJsonAsync(

response.ContentType = contentType ?? JsonConstants.JsonContentTypeWithCharset;

var startTask = Task.CompletedTask;
if (!response.HasStarted)
{
// Flush headers before starting Json serialization. This avoids an extra layer of buffering before the first flush.
startTask = response.StartAsync(cancellationToken);
}

// if no user provided token, pass the RequestAborted token and ignore OperationCanceledException
if (!cancellationToken.CanBeCanceled)
if (!startTask.IsCompleted || !cancellationToken.CanBeCanceled)
{
return WriteAsJsonAsyncSlow(response, value, jsonTypeInfo);
return WriteAsJsonAsyncSlow(startTask, response, value, jsonTypeInfo,
ignoreOCE: !cancellationToken.CanBeCanceled,
cancellationToken.CanBeCanceled ? cancellationToken : response.HttpContext.RequestAborted);
}

return JsonSerializer.SerializeAsync(response.Body, value, jsonTypeInfo, cancellationToken);
startTask.GetAwaiter().GetResult();
return JsonSerializer.SerializeAsync(response.BodyWriter, value, jsonTypeInfo, cancellationToken);

static async Task WriteAsJsonAsyncSlow(HttpResponse response, object? value, JsonTypeInfo jsonTypeInfo)
static async Task WriteAsJsonAsyncSlow(Task startTask, HttpResponse response, object? value, JsonTypeInfo jsonTypeInfo,
bool ignoreOCE, CancellationToken cancellationToken)
{
try
{
await JsonSerializer.SerializeAsync(response.Body, value, jsonTypeInfo, response.HttpContext.RequestAborted);
await startTask;
await JsonSerializer.SerializeAsync(response.BodyWriter, value, jsonTypeInfo, cancellationToken);
}
catch (OperationCanceledException) { }
catch (OperationCanceledException) when (ignoreOCE) { }
}
}

[RequiresUnreferencedCode(RequiresUnreferencedCodeMessage)]
[RequiresDynamicCode(RequiresDynamicCodeMessage)]
private static async Task WriteAsJsonAsyncSlow<TValue>(
Stream body,
Task startTask,
PipeWriter body,
TValue value,
JsonSerializerOptions? options,
bool ignoreOCE,
CancellationToken cancellationToken)
{
try
{
await startTask;
await JsonSerializer.SerializeAsync(body, value, options, cancellationToken);
}
catch (OperationCanceledException) { }
catch (OperationCanceledException) when (ignoreOCE) { }
}

/// <summary>
Expand Down Expand Up @@ -266,29 +304,42 @@ public static Task WriteAsJsonAsync(

response.ContentType = contentType ?? JsonConstants.JsonContentTypeWithCharset;

var startTask = Task.CompletedTask;
if (!response.HasStarted)
{
// Flush headers before starting Json serialization. This avoids an extra layer of buffering before the first flush.
startTask = response.StartAsync(cancellationToken);
}

// if no user provided token, pass the RequestAborted token and ignore OperationCanceledException
if (!cancellationToken.CanBeCanceled)
if (!startTask.IsCompleted || !cancellationToken.CanBeCanceled)
{
return WriteAsJsonAsyncSlow(response.Body, value, type, options, response.HttpContext.RequestAborted);
return WriteAsJsonAsyncSlow(startTask, response.BodyWriter, value, type, options,
ignoreOCE: !cancellationToken.CanBeCanceled,
cancellationToken.CanBeCanceled ? cancellationToken : response.HttpContext.RequestAborted);
}

return JsonSerializer.SerializeAsync(response.Body, value, type, options, cancellationToken);
startTask.GetAwaiter().GetResult();
return JsonSerializer.SerializeAsync(response.BodyWriter, value, type, options, cancellationToken);
}

[RequiresUnreferencedCode(RequiresUnreferencedCodeMessage)]
[RequiresDynamicCode(RequiresDynamicCodeMessage)]
private static async Task WriteAsJsonAsyncSlow(
Stream body,
Task startTask,
PipeWriter body,
object? value,
Type type,
JsonSerializerOptions? options,
bool ignoreOCE,
CancellationToken cancellationToken)
{
try
{
await startTask;
await JsonSerializer.SerializeAsync(body, value, type, options, cancellationToken);
}
catch (OperationCanceledException) { }
catch (OperationCanceledException) when (ignoreOCE) { }
}

/// <summary>
Expand Down Expand Up @@ -316,21 +367,33 @@ public static Task WriteAsJsonAsync(

response.ContentType = contentType ?? JsonConstants.JsonContentTypeWithCharset;

var startTask = Task.CompletedTask;
if (!response.HasStarted)
{
// Flush headers before starting Json serialization. This avoids an extra layer of buffering before the first flush.
startTask = response.StartAsync(cancellationToken);
}

// if no user provided token, pass the RequestAborted token and ignore OperationCanceledException
if (!cancellationToken.CanBeCanceled)
if (!startTask.IsCompleted || !cancellationToken.CanBeCanceled)
{
return WriteAsJsonAsyncSlow();
return WriteAsJsonAsyncSlow(startTask, response.BodyWriter, value, type, context,
ignoreOCE: !cancellationToken.CanBeCanceled,
cancellationToken.CanBeCanceled ? cancellationToken : response.HttpContext.RequestAborted);
}

return JsonSerializer.SerializeAsync(response.Body, value, type, context, cancellationToken);
startTask.GetAwaiter().GetResult();
return JsonSerializer.SerializeAsync(response.BodyWriter, value, type, context, cancellationToken);

async Task WriteAsJsonAsyncSlow()
static async Task WriteAsJsonAsyncSlow(Task startTask, PipeWriter body, object? value, Type type, JsonSerializerContext context,
bool ignoreOCE, CancellationToken cancellationToken)
{
try
{
await JsonSerializer.SerializeAsync(response.Body, value, type, context, cancellationToken);
await startTask;
await JsonSerializer.SerializeAsync(body, value, type, context, cancellationToken);
}
catch (OperationCanceledException) { }
catch (OperationCanceledException) when (ignoreOCE) { }
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -494,10 +494,7 @@ private class TestStream : Stream
public override long Length { get; }
public override long Position { get; set; }

public override void Flush()
{
throw new NotImplementedException();
}
public override void Flush() { }

public override int Read(byte[] buffer, int offset, int count)
{
Expand Down
14 changes: 9 additions & 5 deletions src/Mvc/Mvc.Core/src/Formatters/SystemTextJsonOutputFormatter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -83,21 +83,25 @@ public sealed override async Task WriteResponseBodyAsync(OutputFormatterWriteCon
}
}

var responseStream = httpContext.Response.Body;
if (selectedEncoding.CodePage == Encoding.UTF8.CodePage)
{
try
{
var responseWriter = httpContext.Response.BodyWriter;
if (!httpContext.Response.HasStarted)
{
// Flush headers before starting Json serialization. This avoids an extra layer of buffering before the first flush.
await httpContext.Response.StartAsync();
}

if (jsonTypeInfo is not null)
{
await JsonSerializer.SerializeAsync(responseStream, context.Object, jsonTypeInfo, httpContext.RequestAborted);
await JsonSerializer.SerializeAsync(responseWriter, context.Object, jsonTypeInfo, httpContext.RequestAborted);
}
else
{
await JsonSerializer.SerializeAsync(responseStream, context.Object, SerializerOptions, httpContext.RequestAborted);
await JsonSerializer.SerializeAsync(responseWriter, context.Object, SerializerOptions, httpContext.RequestAborted);
}

await responseStream.FlushAsync(httpContext.RequestAborted);
}
catch (OperationCanceledException) when (context.HttpContext.RequestAborted.IsCancellationRequested) { }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,18 @@ public async Task ExecuteAsync(ActionContext context, JsonResult result)
var objectType = value?.GetType() ?? typeof(object);

// Keep this code in sync with SystemTextJsonOutputFormatter
var responseStream = response.Body;
if (resolvedContentTypeEncoding.CodePage == Encoding.UTF8.CodePage)
{
try
{
await JsonSerializer.SerializeAsync(responseStream, value, objectType, jsonSerializerOptions, context.HttpContext.RequestAborted);
await responseStream.FlushAsync(context.HttpContext.RequestAborted);
var responseWriter = response.BodyWriter;
if (!response.HasStarted)
{
// Flush headers before starting Json serialization. This avoids an extra layer of buffering before the first flush.
await response.StartAsync();
}

await JsonSerializer.SerializeAsync(responseWriter, value, objectType, jsonSerializerOptions, context.HttpContext.RequestAborted);
}
catch (OperationCanceledException) when (context.HttpContext.RequestAborted.IsCancellationRequested) { }
}
Expand Down
12 changes: 12 additions & 0 deletions src/Servers/Kestrel/Core/src/Internal/Http/Http1OutputProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ internal class Http1OutputProducer : IHttpOutputProducer, IDisposable
// Fields needed to store writes before calling either startAsync or Write/FlushAsync
// These should be cleared by the end of the request
private List<CompletedBuffer>? _completedSegments;
private int _completedSegmentsByteCount;
private Memory<byte> _currentSegment;
private IMemoryOwner<byte>? _currentSegmentOwner;
private int _position;
Expand Down Expand Up @@ -273,6 +274,15 @@ public void Advance(int bytes)
}
}

public long UnflushedBytes
{
get
{
var bytes = _position + _advancedBytesForChunk + _pipeWriter.UnflushedBytes + _completedSegmentsByteCount;
return bytes;
}
}

public void CancelPendingFlush()
{
_pipeWriter.CancelPendingFlush();
Expand Down Expand Up @@ -372,6 +382,7 @@ private void WriteDataWrittenBeforeHeaders(ref BufferWriter<PipeWriter> writer)
segment.Return();
}

_completedSegmentsByteCount = 0;
_completedSegments.Clear();
}

Expand Down Expand Up @@ -730,6 +741,7 @@ private void AddSegment(int sizeHint = 0)
// GetMemory was called. In that case we'll take the current segment and call it "completed", but need to
// ignore any empty space in it.
_completedSegments.Add(new CompletedBuffer(_currentSegmentOwner, _currentSegment, _position));
_completedSegmentsByteCount += _position;
}

if (sizeHint <= _memoryPool.MaxBufferSize)
Expand Down
2 changes: 2 additions & 0 deletions src/Servers/Kestrel/Core/src/Internal/Http/HttpProtocol.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1482,6 +1482,8 @@ public void Advance(int bytes)
}
}

public long UnflushedBytes => Output.UnflushedBytes;

public Memory<byte> GetMemory(int sizeHint = 0)
{
_isLeasedMemoryInvalid = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ public void Abort()
}
}

public override bool CanGetUnflushedBytes => true;
public override long UnflushedBytes => _pipeControl.UnflushedBytes;

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private void ValidateState(CancellationToken cancellationToken = default)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ internal interface IHttpOutputProducer
Task WriteDataAsync(ReadOnlySpan<byte> data, CancellationToken cancellationToken);
ValueTask<FlushResult> WriteStreamSuffixAsync();
void Advance(int bytes);
long UnflushedBytes { get; }
Span<byte> GetSpan(int sizeHint = 0);
Memory<byte> GetMemory(int sizeHint = 0);
void CancelPendingFlush();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ internal interface IHttpResponseControl
Memory<byte> GetMemory(int sizeHint = 0);
Span<byte> GetSpan(int sizeHint = 0);
void Advance(int bytes);
long UnflushedBytes { get; }
ValueTask<FlushResult> FlushPipeAsync(CancellationToken cancellationToken);
ValueTask<FlushResult> WritePipeAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken);
void CancelPendingFlush();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,8 @@ public void Advance(int bytes)
}
}

public long UnflushedBytes => _pipeWriter.UnflushedBytes;

public Span<byte> GetSpan(int sizeHint = 0)
{
lock (_dataWriterLock)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,8 @@ public void Advance(int bytes)
}
}

public long UnflushedBytes => _pipeWriter.UnflushedBytes;

public void CancelPendingFlush()
{
lock (_dataWriterLock)
Expand Down
Loading

0 comments on commit cd24d14

Please sign in to comment.