Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix TestServer from blocking on request stream #15591

Merged
merged 4 commits into from
Oct 31, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 29 additions & 8 deletions src/Hosting/TestHost/src/ClientHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Collections.Generic;
using System.Diagnostics.Contracts;
using System.IO;
using System.IO.Pipelines;
using System.Linq;
using System.Net;
using System.Net.Http;
Expand Down Expand Up @@ -65,8 +66,33 @@ protected override async Task<HttpResponseMessage> SendAsync(
var contextBuilder = new HttpContextBuilder(_application, AllowSynchronousIO, PreserveExecutionContext);

var requestContent = request.Content ?? new StreamContent(Stream.Null);
var body = await requestContent.ReadAsStreamAsync();
contextBuilder.Configure(context =>

// Read content from the request HttpContent into a pipe in a background task. This will allow the request
// delegate to start before the request HttpContent is complete. A background task allows duplex streaming scenarios.
contextBuilder.SendRequestStream(async writer =>
{
if (requestContent is StreamContent)
{
// This is odd but required for backwards compat. If StreamContent is passed in then seek to beginning.
// This is safe because StreamContent.ReadAsStreamAsync doesn't block. It will return the inner stream.
var body = await requestContent.ReadAsStreamAsync();
if (body.CanSeek)
{
// This body may have been consumed before, rewind it.
body.Seek(0, SeekOrigin.Begin);
}

await body.CopyToAsync(writer);
}
else
{
await requestContent.CopyToAsync(writer.AsStream());
}

await writer.CompleteAsync();
});

contextBuilder.Configure((context, reader) =>
{
var req = context.Request;

Expand Down Expand Up @@ -115,12 +141,7 @@ protected override async Task<HttpResponseMessage> SendAsync(
}
}

if (body.CanSeek)
{
// This body may have been consumed before, rewind it.
body.Seek(0, SeekOrigin.Begin);
}
req.Body = new AsyncStreamWrapper(body, () => contextBuilder.AllowSynchronousIO);
req.Body = new AsyncStreamWrapper(reader.AsStream(), () => contextBuilder.AllowSynchronousIO);
});

var response = new HttpResponseMessage();
Expand Down
66 changes: 60 additions & 6 deletions src/Hosting/TestHost/src/HttpContextBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@ internal class HttpContextBuilder : IHttpBodyControlFeature, IHttpResetFeature
private bool _pipelineFinished;
private bool _returningResponse;
private object _testContext;
private Pipe _requestPipe;

private Action<HttpContext> _responseReadCompleteCallback;
private Task _sendRequestStreamTask;

internal HttpContextBuilder(ApplicationWrapper application, bool allowSynchronousIO, bool preserveExecutionContext)
{
Expand All @@ -41,9 +44,11 @@ internal HttpContextBuilder(ApplicationWrapper application, bool allowSynchronou
request.Protocol = "HTTP/1.1";
request.Method = HttpMethods.Get;

var pipe = new Pipe();
_responseReaderStream = new ResponseBodyReaderStream(pipe, ClientInitiatedAbort, () => _responseReadCompleteCallback?.Invoke(_httpContext));
_responsePipeWriter = new ResponseBodyPipeWriter(pipe, ReturnResponseMessageAsync);
_requestPipe = new Pipe();

var responsePipe = new Pipe();
_responseReaderStream = new ResponseBodyReaderStream(responsePipe, ClientInitiatedAbort, () => _responseReadCompleteCallback?.Invoke(_httpContext));
_responsePipeWriter = new ResponseBodyPipeWriter(responsePipe, ReturnResponseMessageAsync);
_responseFeature.Body = new ResponseBodyWriterStream(_responsePipeWriter, () => AllowSynchronousIO);
_responseFeature.BodyWriter = _responsePipeWriter;

Expand All @@ -56,14 +61,24 @@ internal HttpContextBuilder(ApplicationWrapper application, bool allowSynchronou

public bool AllowSynchronousIO { get; set; }

internal void Configure(Action<HttpContext> configureContext)
internal void Configure(Action<HttpContext, PipeReader> configureContext)
{
if (configureContext == null)
{
throw new ArgumentNullException(nameof(configureContext));
}

configureContext(_httpContext);
configureContext(_httpContext, _requestPipe.Reader);
}

internal void SendRequestStream(Func<PipeWriter, Task> sendRequestStream)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The layering still seems odd here. Only ClientHandler needs/uses the request body pipe and _sendRequestStreamTask, why doesn't it own those? What it needs from HttpContextBuilder is a callback for CompleteRequestAsync and/or cancel.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did it this way because the alternative is callbacks everywhere:

  1. After request is complete
  2. Request is aborted from server
  3. Request is aborted from client

Sure HttpContextBuilder owns the request pipe and task, but using it is optional.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea I don’t think this layering is super important

{
if (sendRequestStream == null)
{
throw new ArgumentNullException(nameof(sendRequestStream));
}

_sendRequestStreamTask = sendRequestStream(_requestPipe.Writer);
}

internal void RegisterResponseReadCompleteCallback(Action<HttpContext> responseReadCompleteCallback)
Expand Down Expand Up @@ -92,10 +107,10 @@ async Task RunRequestAsync()
// since we are now inside the Server's execution context. If it happens outside this cont
// it will be lost when we abandon the execution context.
_testContext = _application.CreateContext(_httpContext.Features);

try
{
await _application.ProcessRequestAsync(_testContext);
await CompleteRequestAsync();
await CompleteResponseAsync();
_application.DisposeContext(_testContext, exception: null);
}
Expand Down Expand Up @@ -134,8 +149,40 @@ internal void ClientInitiatedAbort()
// We don't want to trigger the token for already completed responses.
_requestLifetimeFeature.Cancel();
}

// Writes will still succeed, the app will only get an error if they check the CT.
_responseReaderStream.Abort(new IOException("The client aborted the request."));

// Cancel any pending request async activity when the client aborts a duplex
// streaming scenario by disposing the HttpResponseMessage.
CancelRequestBody();
}

private async Task CompleteRequestAsync()
{
if (!_requestPipe.Reader.TryRead(out var result) || !result.IsCompleted)
{
// If request is still in progress then abort it.
CancelRequestBody();
}
else
{
// Writer was already completed in send request callback.
await _requestPipe.Reader.CompleteAsync();
}

if (_sendRequestStreamTask != null)
{
try
{
// Ensure duplex request is either completely read or has been aborted.
await _sendRequestStreamTask;
}
catch (OperationCanceledException)
{
// Request was canceled, likely because it wasn't read before the request ended.
}
}
}

internal async Task CompleteResponseAsync()
Expand Down Expand Up @@ -192,6 +239,13 @@ internal void Abort(Exception exception)
_responseReaderStream.Abort(exception);
_requestLifetimeFeature.Cancel();
_responseTcs.TrySetException(exception);
CancelRequestBody();
}

private void CancelRequestBody()
{
_requestPipe.Writer.CancelPendingFlush();
_requestPipe.Reader.CancelPendingRead();
}

void IHttpResetFeature.Reset(int errorCode)
Expand Down
4 changes: 2 additions & 2 deletions src/Hosting/TestHost/src/TestServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ public async Task<HttpContext> SendAsync(Action<HttpContext> configureContext, C
}

var builder = new HttpContextBuilder(Application, AllowSynchronousIO, PreserveExecutionContext);
builder.Configure(context =>
builder.Configure((context, reader) =>
{
var request = context.Request;
request.Scheme = BaseAddress.Scheme;
Expand All @@ -154,7 +154,7 @@ public async Task<HttpContext> SendAsync(Action<HttpContext> configureContext, C
}
request.PathBase = pathBase;
});
builder.Configure(configureContext);
builder.Configure((context, reader) => configureContext(context));
// TODO: Wrap the request body if any?
return await builder.SendAsync(cancellationToken).ConfigureAwait(false);
}
Expand Down
2 changes: 1 addition & 1 deletion src/Hosting/TestHost/src/WebSocketClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public async Task<WebSocket> ConnectAsync(Uri uri, CancellationToken cancellatio
{
WebSocketFeature webSocketFeature = null;
var contextBuilder = new HttpContextBuilder(_application, AllowSynchronousIO, PreserveExecutionContext);
contextBuilder.Configure(context =>
contextBuilder.Configure((context, reader) =>
{
var request = context.Request;
var scheme = uri.Scheme;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@
<TargetFramework>$(DefaultNetCoreTargetFramework)</TargetFramework>
</PropertyGroup>

<ItemGroup>
<Compile Include="..\..\..\Shared\SyncPoint\SyncPoint.cs" Link="SyncPoint.cs" />
</ItemGroup>

<ItemGroup>
<Reference Include="Microsoft.AspNetCore.TestHost" />
<Reference Include="Microsoft.Extensions.DiagnosticAdapter" />
Expand Down
Loading