From 2ee8f82bcfe58903ae56cdf078176f89f7abeb38 Mon Sep 17 00:00:00 2001 From: James Newton-King Date: Sun, 27 Oct 2019 16:27:49 +1300 Subject: [PATCH 1/4] Support duplex streaming in TestServer --- src/Hosting/TestHost/src/ClientHandler.cs | 37 ++- .../TestHost/src/HttpContextBuilder.cs | 50 ++- src/Hosting/TestHost/src/TestServer.cs | 4 +- src/Hosting/TestHost/src/WebSocketClient.cs | 2 +- ...Microsoft.AspNetCore.TestHost.Tests.csproj | 4 + src/Hosting/TestHost/test/TestClientTests.cs | 305 +++++++++++++++++- src/Hosting/TestHost/test/Utilities.cs | 17 +- 7 files changed, 383 insertions(+), 36 deletions(-) diff --git a/src/Hosting/TestHost/src/ClientHandler.cs b/src/Hosting/TestHost/src/ClientHandler.cs index 14b26869792a..86a425465e8b 100644 --- a/src/Hosting/TestHost/src/ClientHandler.cs +++ b/src/Hosting/TestHost/src/ClientHandler.cs @@ -5,6 +5,7 @@ using System.Collections.Generic; using System.Diagnostics.Contracts; using System.IO; +using System.IO.Pipelines; using System.Linq; using System.Net; using System.Net.Http; @@ -65,8 +66,33 @@ protected override async Task SendAsync( var contextBuilder = new HttpContextBuilder(_application, AllowSynchronousIO, PreserveExecutionContext); var requestContent = request.Content ?? new StreamContent(Stream.Null); - var body = await requestContent.ReadAsStreamAsync(); - contextBuilder.Configure(context => + + // Read content from the request HttpContent into a pipe in a background task. This will allow the request + // delegate to start before the request HttpContent is complete. A background task allows duplex streaming scenarios. + contextBuilder.SendRequestStream(async writer => + { + if (requestContent is StreamContent) + { + // This is odd but required for backwards compat. If StreamContent is passed in then seek to beginning. + // This is safe because StreamContent.ReadAsStreamAsync doesn't block. It will return the inner stream. + var body = await requestContent.ReadAsStreamAsync(); + if (body.CanSeek) + { + // This body may have been consumed before, rewind it. + body.Seek(0, SeekOrigin.Begin); + } + + await body.CopyToAsync(writer); + } + else + { + await requestContent.CopyToAsync(writer.AsStream()); + } + + await writer.CompleteAsync(); + }); + + contextBuilder.Configure((context, reader) => { var req = context.Request; @@ -115,12 +141,7 @@ protected override async Task SendAsync( } } - if (body.CanSeek) - { - // This body may have been consumed before, rewind it. - body.Seek(0, SeekOrigin.Begin); - } - req.Body = new AsyncStreamWrapper(body, () => contextBuilder.AllowSynchronousIO); + req.Body = new AsyncStreamWrapper(reader.AsStream(), () => contextBuilder.AllowSynchronousIO); }); var response = new HttpResponseMessage(); diff --git a/src/Hosting/TestHost/src/HttpContextBuilder.cs b/src/Hosting/TestHost/src/HttpContextBuilder.cs index 2e5e25bb6f15..60cb8cdc50f5 100644 --- a/src/Hosting/TestHost/src/HttpContextBuilder.cs +++ b/src/Hosting/TestHost/src/HttpContextBuilder.cs @@ -26,7 +26,10 @@ internal class HttpContextBuilder : IHttpBodyControlFeature, IHttpResetFeature private bool _pipelineFinished; private bool _returningResponse; private object _testContext; + private Pipe _requestPipe; + private Action _responseReadCompleteCallback; + private Task _sendRequestStreamTask; internal HttpContextBuilder(ApplicationWrapper application, bool allowSynchronousIO, bool preserveExecutionContext) { @@ -41,9 +44,11 @@ internal HttpContextBuilder(ApplicationWrapper application, bool allowSynchronou request.Protocol = "HTTP/1.1"; request.Method = HttpMethods.Get; - var pipe = new Pipe(); - _responseReaderStream = new ResponseBodyReaderStream(pipe, ClientInitiatedAbort, () => _responseReadCompleteCallback?.Invoke(_httpContext)); - _responsePipeWriter = new ResponseBodyPipeWriter(pipe, ReturnResponseMessageAsync); + _requestPipe = new Pipe(); + + var responsePipe = new Pipe(); + _responseReaderStream = new ResponseBodyReaderStream(responsePipe, ClientInitiatedAbort, () => _responseReadCompleteCallback?.Invoke(_httpContext)); + _responsePipeWriter = new ResponseBodyPipeWriter(responsePipe, ReturnResponseMessageAsync); _responseFeature.Body = new ResponseBodyWriterStream(_responsePipeWriter, () => AllowSynchronousIO); _responseFeature.BodyWriter = _responsePipeWriter; @@ -56,14 +61,24 @@ internal HttpContextBuilder(ApplicationWrapper application, bool allowSynchronou public bool AllowSynchronousIO { get; set; } - internal void Configure(Action configureContext) + internal void Configure(Action configureContext) { if (configureContext == null) { throw new ArgumentNullException(nameof(configureContext)); } - configureContext(_httpContext); + configureContext(_httpContext, _requestPipe.Reader); + } + + internal void SendRequestStream(Func sendRequestStream) + { + if (sendRequestStream == null) + { + throw new ArgumentNullException(nameof(sendRequestStream)); + } + + _sendRequestStreamTask = sendRequestStream(_requestPipe.Writer); } internal void RegisterResponseReadCompleteCallback(Action responseReadCompleteCallback) @@ -92,10 +107,10 @@ async Task RunRequestAsync() // since we are now inside the Server's execution context. If it happens outside this cont // it will be lost when we abandon the execution context. _testContext = _application.CreateContext(_httpContext.Features); - try { await _application.ProcessRequestAsync(_testContext); + await CompleteRequestAsync(); await CompleteResponseAsync(); _application.DisposeContext(_testContext, exception: null); } @@ -134,8 +149,24 @@ internal void ClientInitiatedAbort() // We don't want to trigger the token for already completed responses. _requestLifetimeFeature.Cancel(); } + // Writes will still succeed, the app will only get an error if they check the CT. _responseReaderStream.Abort(new IOException("The client aborted the request.")); + + // Cancel any pending request async activity when the client aborts a duplex + // streaming scenario by disposing the HttpResponseMessage. + AbortRequest(); + } + + private async Task CompleteRequestAsync() + { + if (_sendRequestStreamTask != null) + { + await _sendRequestStreamTask; + } + + await _requestPipe.Writer.CompleteAsync(); + await _requestPipe.Reader.CompleteAsync(); } internal async Task CompleteResponseAsync() @@ -192,6 +223,13 @@ internal void Abort(Exception exception) _responseReaderStream.Abort(exception); _requestLifetimeFeature.Cancel(); _responseTcs.TrySetException(exception); + AbortRequest(); + } + + private void AbortRequest() + { + _requestPipe.Writer.CancelPendingFlush(); + _requestPipe.Reader.CancelPendingRead(); } void IHttpResetFeature.Reset(int errorCode) diff --git a/src/Hosting/TestHost/src/TestServer.cs b/src/Hosting/TestHost/src/TestServer.cs index de10ff0081e1..1fe56ec9191a 100644 --- a/src/Hosting/TestHost/src/TestServer.cs +++ b/src/Hosting/TestHost/src/TestServer.cs @@ -138,7 +138,7 @@ public async Task SendAsync(Action configureContext, C } var builder = new HttpContextBuilder(Application, AllowSynchronousIO, PreserveExecutionContext); - builder.Configure(context => + builder.Configure((context, reader) => { var request = context.Request; request.Scheme = BaseAddress.Scheme; @@ -154,7 +154,7 @@ public async Task SendAsync(Action configureContext, C } request.PathBase = pathBase; }); - builder.Configure(configureContext); + builder.Configure((context, reader) => configureContext(context)); // TODO: Wrap the request body if any? return await builder.SendAsync(cancellationToken).ConfigureAwait(false); } diff --git a/src/Hosting/TestHost/src/WebSocketClient.cs b/src/Hosting/TestHost/src/WebSocketClient.cs index d312c0ebbd6e..7cf116cd7a75 100644 --- a/src/Hosting/TestHost/src/WebSocketClient.cs +++ b/src/Hosting/TestHost/src/WebSocketClient.cs @@ -51,7 +51,7 @@ public async Task ConnectAsync(Uri uri, CancellationToken cancellatio { WebSocketFeature webSocketFeature = null; var contextBuilder = new HttpContextBuilder(_application, AllowSynchronousIO, PreserveExecutionContext); - contextBuilder.Configure(context => + contextBuilder.Configure((context, reader) => { var request = context.Request; var scheme = uri.Scheme; diff --git a/src/Hosting/TestHost/test/Microsoft.AspNetCore.TestHost.Tests.csproj b/src/Hosting/TestHost/test/Microsoft.AspNetCore.TestHost.Tests.csproj index bc32f814ab5b..6f77fe58d6e6 100644 --- a/src/Hosting/TestHost/test/Microsoft.AspNetCore.TestHost.Tests.csproj +++ b/src/Hosting/TestHost/test/Microsoft.AspNetCore.TestHost.Tests.csproj @@ -4,6 +4,10 @@ $(DefaultNetCoreTargetFramework) + + + + diff --git a/src/Hosting/TestHost/test/TestClientTests.cs b/src/Hosting/TestHost/test/TestClientTests.cs index 322ab54a6f8b..53e7952fc705 100644 --- a/src/Hosting/TestHost/test/TestClientTests.cs +++ b/src/Hosting/TestHost/test/TestClientTests.cs @@ -5,6 +5,7 @@ using System.Globalization; using System.IO; using System.Linq; +using System.Net; using System.Net.Http; using System.Net.WebSockets; using System.Text; @@ -13,6 +14,7 @@ using Microsoft.AspNetCore.Builder; using Microsoft.AspNetCore.Hosting; using Microsoft.AspNetCore.Http; +using Microsoft.AspNetCore.Internal; using Microsoft.AspNetCore.Testing; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; @@ -89,17 +91,20 @@ public async Task PutAsyncWorks() { // Arrange RequestDelegate appDelegate = async ctx => - await ctx.Response.WriteAsync(await new StreamReader(ctx.Request.Body).ReadToEndAsync() + " PUT Response"); + { + var content = await new StreamReader(ctx.Request.Body).ReadToEndAsync(); + await ctx.Response.WriteAsync(content + " PUT Response"); + }; var builder = new WebHostBuilder().Configure(app => app.Run(appDelegate)); var server = new TestServer(builder); var client = server.CreateClient(); // Act var content = new StringContent("Hello world"); - var response = await client.PutAsync("http://localhost:12345", content); + var response = await client.PutAsync("http://localhost:12345", content).WithTimeout(); // Assert - Assert.Equal("Hello world PUT Response", await response.Content.ReadAsStringAsync()); + Assert.Equal("Hello world PUT Response", await response.Content.ReadAsStringAsync().WithTimeout()); } [Fact] @@ -114,10 +119,10 @@ public async Task PostAsyncWorks() // Act var content = new StringContent("Hello world"); - var response = await client.PostAsync("http://localhost:12345", content); + var response = await client.PostAsync("http://localhost:12345", content).WithTimeout(); // Assert - Assert.Equal("Hello world POST Response", await response.Content.ReadAsStringAsync()); + Assert.Equal("Hello world POST Response", await response.Content.ReadAsStringAsync().WithTimeout()); } [Fact] @@ -162,6 +167,296 @@ public void Dispose() } } + [Fact] + public async Task ClientStreamingWorks() + { + // Arrange + var responseStartedSyncPoint = new SyncPoint(); + var requestEndingSyncPoint = new SyncPoint(); + var requestStreamSyncPoint = new SyncPoint(); + + RequestDelegate appDelegate = async ctx => + { + // Send headers + await ctx.Response.BodyWriter.FlushAsync(); + + // Ensure headers received by client + await responseStartedSyncPoint.WaitToContinue(); + + await ctx.Response.WriteAsync("STARTED"); + + // ReadToEndAsync will wait until request body is complete + var requestString = await new StreamReader(ctx.Request.Body).ReadToEndAsync(); + await ctx.Response.WriteAsync(requestString + " POST Response"); + + await requestEndingSyncPoint.WaitToContinue(); + }; + + Stream requestStream = null; + + var builder = new WebHostBuilder().Configure(app => app.Run(appDelegate)); + var server = new TestServer(builder); + var client = server.CreateClient(); + + var httpRequest = new HttpRequestMessage(HttpMethod.Post, "http://localhost:12345"); + httpRequest.Version = new Version(2, 0); + httpRequest.Content = new PushContent(async stream => + { + // Initial flush to ensure headers are sent + await stream.FlushAsync(); + + requestStream = stream; + await requestStreamSyncPoint.WaitToContinue(); + }); + + // Act + var response = await client.SendAsync(httpRequest, HttpCompletionOption.ResponseHeadersRead).WithTimeout(); + + await responseStartedSyncPoint.WaitForSyncPoint().WithTimeout(); + responseStartedSyncPoint.Continue(); + + var responseContent = await response.Content.ReadAsStreamAsync().WithTimeout(); + + // Assert + + // Ensure request stream has started + await requestStreamSyncPoint.WaitForSyncPoint(); + + byte[] buffer = new byte[1024]; + var length = await responseContent.ReadAsync(buffer).AsTask().WithTimeout(); + Assert.Equal("STARTED", Encoding.UTF8.GetString(buffer, 0, length)); + + // Send content and finish request body + await requestStream.WriteAsync(Encoding.UTF8.GetBytes("Hello world")).AsTask().WithTimeout(); + await requestStream.FlushAsync().WithTimeout(); + requestStreamSyncPoint.Continue(); + + // Ensure content is received while request is in progress + length = await responseContent.ReadAsync(buffer).AsTask().WithTimeout(); + Assert.Equal("Hello world POST Response", Encoding.UTF8.GetString(buffer, 0, length)); + + // Request is ending + await requestEndingSyncPoint.WaitForSyncPoint().WithTimeout(); + requestEndingSyncPoint.Continue(); + + // No more response content + length = await responseContent.ReadAsync(buffer).AsTask().WithTimeout(); + Assert.Equal(0, length); + } + + [Fact] + public async Task ClientStreaming_Cancellation() + { + // Arrange + var responseStartedSyncPoint = new SyncPoint(); + var responseReadSyncPoint = new SyncPoint(); + var responseEndingSyncPoint = new SyncPoint(); + var requestStreamSyncPoint = new SyncPoint(); + var readCanceled = false; + + RequestDelegate appDelegate = async ctx => + { + // Send headers + await ctx.Response.BodyWriter.FlushAsync(); + + // Ensure headers received by client + await responseStartedSyncPoint.WaitToContinue(); + + var serverBuffer = new byte[1024]; + var serverLength = await ctx.Request.Body.ReadAsync(serverBuffer); + + Assert.Equal("SENT", Encoding.UTF8.GetString(serverBuffer, 0, serverLength)); + + await responseReadSyncPoint.WaitToContinue(); + + try + { + await ctx.Request.Body.ReadAsync(serverBuffer); + } + catch (OperationCanceledException) + { + readCanceled = true; + } + + await responseEndingSyncPoint.WaitToContinue(); + }; + + Stream requestStream = null; + + var builder = new WebHostBuilder().Configure(app => app.Run(appDelegate)); + var server = new TestServer(builder); + var client = server.CreateClient(); + + var httpRequest = new HttpRequestMessage(HttpMethod.Post, "http://localhost:12345"); + httpRequest.Version = new Version(2, 0); + httpRequest.Content = new PushContent(async stream => + { + // Initial flush to ensure headers are sent + await stream.FlushAsync(); + + requestStream = stream; + await requestStreamSyncPoint.WaitToContinue(); + }); + + // Act + var response = await client.SendAsync(httpRequest, HttpCompletionOption.ResponseHeadersRead).WithTimeout(); + + await responseStartedSyncPoint.WaitForSyncPoint().WithTimeout(); + responseStartedSyncPoint.Continue(); + + var responseContent = await response.Content.ReadAsStreamAsync().WithTimeout(); + + // Assert + + // Ensure request stream has started + await requestStreamSyncPoint.WaitForSyncPoint(); + + // Write to request + await requestStream.WriteAsync(Encoding.UTF8.GetBytes("SENT")).AsTask().WithTimeout(); + await requestStream.FlushAsync().WithTimeout(); + await responseReadSyncPoint.WaitForSyncPoint().WithTimeout(); + + // Cancel request. Disposing response must be used because SendAsync has finished. + response.Dispose(); + responseReadSyncPoint.Continue(); + + await responseEndingSyncPoint.WaitForSyncPoint().WithTimeout(); + responseEndingSyncPoint.Continue(); + + Assert.True(readCanceled); + + requestStreamSyncPoint.Continue(); + } + + [Fact] + public async Task ClientStreaming_ResponseCompletesWithoutReadingRequest() + { + // Arrange + var requestStreamSyncPoint = new SyncPoint(); + + RequestDelegate appDelegate = async ctx => + { + await ctx.Response.WriteAsync("POST Response"); + }; + + Stream requestStream = null; + + var builder = new WebHostBuilder().Configure(app => app.Run(appDelegate)); + var server = new TestServer(builder); + var client = server.CreateClient(); + + var httpRequest = new HttpRequestMessage(HttpMethod.Post, "http://localhost:12345"); + httpRequest.Version = new Version(2, 0); + httpRequest.Content = new PushContent(async stream => + { + // Initial flush to ensure headers are sent + await stream.FlushAsync(); + + requestStream = stream; + await requestStreamSyncPoint.WaitToContinue(); + }); + + // Act + var response = await client.SendAsync(httpRequest, HttpCompletionOption.ResponseHeadersRead).WithTimeout(); + + var responseContent = await response.Content.ReadAsStreamAsync().WithTimeout(); + + // Assert + + // Read response + byte[] buffer = new byte[1024]; + var length = await responseContent.ReadAsync(buffer).AsTask().WithTimeout(); + Assert.Equal("POST Response", Encoding.UTF8.GetString(buffer, 0, length)); + + // Ensure request stream has started + await requestStreamSyncPoint.WaitForSyncPoint(); + + // Send content and finish request body + await requestStream.WriteAsync(Encoding.UTF8.GetBytes("Hello world")).AsTask().WithTimeout(); + await requestStream.FlushAsync().WithTimeout(); + requestStreamSyncPoint.Continue(); + + // No more response content + length = await responseContent.ReadAsync(buffer).AsTask().WithTimeout(); + Assert.Equal(0, length); + } + + [Fact] + public async Task ClientStreaming_ServerAbort() + { + // Arrange + var requestStreamSyncPoint = new SyncPoint(); + var responseEndingSyncPoint = new SyncPoint(); + + RequestDelegate appDelegate = async ctx => + { + // Send headers + await ctx.Response.BodyWriter.FlushAsync(); + + ctx.Abort(); + await responseEndingSyncPoint.WaitToContinue(); + }; + + Stream requestStream = null; + + var builder = new WebHostBuilder().Configure(app => app.Run(appDelegate)); + var server = new TestServer(builder); + var client = server.CreateClient(); + + var httpRequest = new HttpRequestMessage(HttpMethod.Post, "http://localhost:12345"); + httpRequest.Version = new Version(2, 0); + httpRequest.Content = new PushContent(async stream => + { + // Initial flush to ensure headers are sent + await stream.FlushAsync(); + + requestStream = stream; + await requestStreamSyncPoint.WaitToContinue(); + }); + + // Act + var response = await client.SendAsync(httpRequest, HttpCompletionOption.ResponseHeadersRead).WithTimeout(); + + var responseContent = await response.Content.ReadAsStreamAsync().WithTimeout(); + + // Assert + + // Ensure server has aborted + await responseEndingSyncPoint.WaitForSyncPoint(); + + // Ensure request stream has started + await requestStreamSyncPoint.WaitForSyncPoint(); + + // Send content and finish request body + await ExceptionAssert.ThrowsAsync( + () => requestStream.WriteAsync(Encoding.UTF8.GetBytes("Hello world")).AsTask(), + "Flush was canceled on underlying PipeWriter.").WithTimeout(); + + responseEndingSyncPoint.Continue(); + requestStreamSyncPoint.Continue(); + } + + private class PushContent : HttpContent + { + private readonly Func _sendContent; + + public PushContent(Func sendContent) + { + _sendContent = sendContent; + } + + protected override Task SerializeToStreamAsync(Stream stream, TransportContext context) + { + return _sendContent(stream); + } + + protected override bool TryComputeLength(out long length) + { + length = -1; + return false; + } + } + [Fact] public async Task WebSocketWorks() { diff --git a/src/Hosting/TestHost/test/Utilities.cs b/src/Hosting/TestHost/test/Utilities.cs index 91603563552c..da5d433f4c2a 100644 --- a/src/Hosting/TestHost/test/Utilities.cs +++ b/src/Hosting/TestHost/test/Utilities.cs @@ -3,6 +3,7 @@ using System; using System.Threading.Tasks; +using Microsoft.AspNetCore.Testing; namespace Microsoft.AspNetCore.TestHost { @@ -10,20 +11,8 @@ internal static class Utilities { internal static readonly TimeSpan DefaultTimeout = TimeSpan.FromSeconds(15); - internal static Task WithTimeout(this Task task) => task.WithTimeout(DefaultTimeout); + internal static Task WithTimeout(this Task task) => task.TimeoutAfter(DefaultTimeout); - internal static async Task WithTimeout(this Task task, TimeSpan timeout) - { - var completedTask = await Task.WhenAny(task, Task.Delay(timeout)); - - if (completedTask == task) - { - return await task; - } - else - { - throw new TimeoutException("The task has timed out."); - } - } + internal static Task WithTimeout(this Task task) => task.TimeoutAfter(DefaultTimeout); } } From 284fc04acc20ada6e98e33b829b6bd948c38dd15 Mon Sep 17 00:00:00 2001 From: James Newton-King Date: Wed, 30 Oct 2019 13:25:46 +1300 Subject: [PATCH 2/4] PR feedback --- .../TestHost/src/HttpContextBuilder.cs | 24 +++++++++-- src/Hosting/TestHost/test/TestClientTests.cs | 40 +++++++++---------- 2 files changed, 40 insertions(+), 24 deletions(-) diff --git a/src/Hosting/TestHost/src/HttpContextBuilder.cs b/src/Hosting/TestHost/src/HttpContextBuilder.cs index 60cb8cdc50f5..edd8e9793b29 100644 --- a/src/Hosting/TestHost/src/HttpContextBuilder.cs +++ b/src/Hosting/TestHost/src/HttpContextBuilder.cs @@ -160,13 +160,29 @@ internal void ClientInitiatedAbort() private async Task CompleteRequestAsync() { - if (_sendRequestStreamTask != null) + if (!_requestPipe.Reader.TryRead(out var result) || !result.IsCompleted) + { + // If request is still in progress then abort it. + AbortRequest(); + } + else { - await _sendRequestStreamTask; + await _requestPipe.Writer.CompleteAsync(); + await _requestPipe.Reader.CompleteAsync(); } - await _requestPipe.Writer.CompleteAsync(); - await _requestPipe.Reader.CompleteAsync(); + if (_sendRequestStreamTask != null) + { + try + { + // Ensure duplex request is either completely read or has been aborted. + await _sendRequestStreamTask; + } + catch (OperationCanceledException) + { + // Request was canceled, likely because it wasn't read before the request ended. + } + } } internal async Task CompleteResponseAsync() diff --git a/src/Hosting/TestHost/test/TestClientTests.cs b/src/Hosting/TestHost/test/TestClientTests.cs index 53e7952fc705..9ae6caebfe32 100644 --- a/src/Hosting/TestHost/test/TestClientTests.cs +++ b/src/Hosting/TestHost/test/TestClientTests.cs @@ -202,9 +202,6 @@ public async Task ClientStreamingWorks() httpRequest.Version = new Version(2, 0); httpRequest.Content = new PushContent(async stream => { - // Initial flush to ensure headers are sent - await stream.FlushAsync(); - requestStream = stream; await requestStreamSyncPoint.WaitToContinue(); }); @@ -291,9 +288,6 @@ public async Task ClientStreaming_Cancellation() httpRequest.Version = new Version(2, 0); httpRequest.Content = new PushContent(async stream => { - // Initial flush to ensure headers are sent - await stream.FlushAsync(); - requestStream = stream; await requestStreamSyncPoint.WaitToContinue(); }); @@ -332,11 +326,13 @@ public async Task ClientStreaming_Cancellation() public async Task ClientStreaming_ResponseCompletesWithoutReadingRequest() { // Arrange - var requestStreamSyncPoint = new SyncPoint(); + var requestStreamTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var responseEndingSyncPoint = new SyncPoint(); RequestDelegate appDelegate = async ctx => { await ctx.Response.WriteAsync("POST Response"); + await responseEndingSyncPoint.WaitToContinue(); }; Stream requestStream = null; @@ -349,11 +345,8 @@ public async Task ClientStreaming_ResponseCompletesWithoutReadingRequest() httpRequest.Version = new Version(2, 0); httpRequest.Content = new PushContent(async stream => { - // Initial flush to ensure headers are sent - await stream.FlushAsync(); - requestStream = stream; - await requestStreamSyncPoint.WaitToContinue(); + await requestStreamTcs.Task; }); // Act @@ -368,17 +361,27 @@ public async Task ClientStreaming_ResponseCompletesWithoutReadingRequest() var length = await responseContent.ReadAsync(buffer).AsTask().WithTimeout(); Assert.Equal("POST Response", Encoding.UTF8.GetString(buffer, 0, length)); - // Ensure request stream has started - await requestStreamSyncPoint.WaitForSyncPoint(); + // Send large content and block on back pressure + var writeTask = Task.Run(async () => + { + try + { + await requestStream.WriteAsync(Encoding.UTF8.GetBytes(new string('!', 1024 * 1024 * 50))).AsTask().WithTimeout(); + requestStreamTcs.SetResult(null); + } + catch (Exception ex) + { + requestStreamTcs.SetException(ex); + } + }); - // Send content and finish request body - await requestStream.WriteAsync(Encoding.UTF8.GetBytes("Hello world")).AsTask().WithTimeout(); - await requestStream.FlushAsync().WithTimeout(); - requestStreamSyncPoint.Continue(); + responseEndingSyncPoint.Continue(); // No more response content length = await responseContent.ReadAsync(buffer).AsTask().WithTimeout(); Assert.Equal(0, length); + + await writeTask; } [Fact] @@ -407,9 +410,6 @@ public async Task ClientStreaming_ServerAbort() httpRequest.Version = new Version(2, 0); httpRequest.Content = new PushContent(async stream => { - // Initial flush to ensure headers are sent - await stream.FlushAsync(); - requestStream = stream; await requestStreamSyncPoint.WaitToContinue(); }); From d5119e644fd1b3f528d3e976a0f65a47d5dc78e8 Mon Sep 17 00:00:00 2001 From: James Newton-King Date: Wed, 30 Oct 2019 22:11:20 +1300 Subject: [PATCH 3/4] PR feedback --- src/Hosting/TestHost/src/HttpContextBuilder.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Hosting/TestHost/src/HttpContextBuilder.cs b/src/Hosting/TestHost/src/HttpContextBuilder.cs index edd8e9793b29..9a478685e577 100644 --- a/src/Hosting/TestHost/src/HttpContextBuilder.cs +++ b/src/Hosting/TestHost/src/HttpContextBuilder.cs @@ -167,7 +167,7 @@ private async Task CompleteRequestAsync() } else { - await _requestPipe.Writer.CompleteAsync(); + // Writer was already completed in send request callback. await _requestPipe.Reader.CompleteAsync(); } From df21112e40d12bc3a26a9fb97343305d6551e16a Mon Sep 17 00:00:00 2001 From: James Newton-King Date: Thu, 31 Oct 2019 08:54:16 +1300 Subject: [PATCH 4/4] PR feedback --- src/Hosting/TestHost/src/HttpContextBuilder.cs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/Hosting/TestHost/src/HttpContextBuilder.cs b/src/Hosting/TestHost/src/HttpContextBuilder.cs index 9a478685e577..aab30590aa38 100644 --- a/src/Hosting/TestHost/src/HttpContextBuilder.cs +++ b/src/Hosting/TestHost/src/HttpContextBuilder.cs @@ -155,7 +155,7 @@ internal void ClientInitiatedAbort() // Cancel any pending request async activity when the client aborts a duplex // streaming scenario by disposing the HttpResponseMessage. - AbortRequest(); + CancelRequestBody(); } private async Task CompleteRequestAsync() @@ -163,7 +163,7 @@ private async Task CompleteRequestAsync() if (!_requestPipe.Reader.TryRead(out var result) || !result.IsCompleted) { // If request is still in progress then abort it. - AbortRequest(); + CancelRequestBody(); } else { @@ -239,10 +239,10 @@ internal void Abort(Exception exception) _responseReaderStream.Abort(exception); _requestLifetimeFeature.Cancel(); _responseTcs.TrySetException(exception); - AbortRequest(); + CancelRequestBody(); } - private void AbortRequest() + private void CancelRequestBody() { _requestPipe.Writer.CancelPendingFlush(); _requestPipe.Reader.CancelPendingRead();