diff --git a/test/FunctionalTests/Client/StreamingTests.cs b/test/FunctionalTests/Client/StreamingTests.cs index d279ed25a..e737ff0d8 100644 --- a/test/FunctionalTests/Client/StreamingTests.cs +++ b/test/FunctionalTests/Client/StreamingTests.cs @@ -350,9 +350,33 @@ writeContext.Exception is InvalidOperationException && return false; }); + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + async Task ClientStreamedData(IAsyncStreamReader requestStream, ServerCallContext context) + { + context.CancellationToken.Register(() => tcs.SetResult(null)); + + var total = 0L; + await foreach (var message in requestStream.ReadAllAsync()) + { + total += message.Data.Length; + + if (message.ServerDelayMilliseconds > 0) + { + await Task.Delay(message.ServerDelayMilliseconds); + } + } + + return new DataComplete + { + Size = total + }; + } + // Arrange var data = CreateTestData(1024 * 64); // 64 KB + var method = Fixture.DynamicGrpc.AddClientStreamingMethod(ClientStreamedData); + var httpClient = Fixture.CreateClient(); httpClient.Timeout = TimeSpan.FromSeconds(0.5); @@ -362,24 +386,21 @@ writeContext.Exception is InvalidOperationException && LoggerFactory = LoggerFactory }); - var client = new StreamService.StreamServiceClient(channel); + var client = TestClientFactory.Create(channel, method); + var dataMessage = new DataMessage { Data = ByteString.CopyFrom(data) }; // Act - var call = client.ClientStreamedData(); + var call = client.ClientStreamingCall(); - var ex = await ExceptionAssert.ThrowsAsync(async () => - { - while (true) - { - await call.RequestStream.WriteAsync(dataMessage).DefaultTimeout(); + await call.RequestStream.WriteAsync(dataMessage).DefaultTimeout(); - await Task.Delay(100); - } - }).DefaultTimeout(); + await tcs.Task.DefaultTimeout(); + + var ex = await ExceptionAssert.ThrowsAsync(() => call.RequestStream.WriteAsync(dataMessage)).DefaultTimeout(); // Assert Assert.AreEqual(StatusCode.Cancelled, ex.StatusCode);