diff --git a/src/IceRpc/Transports/Internal/IdleTimeoutDuplexConnectionDecorator.cs b/src/IceRpc/Internal/IceDuplexConnectionDecorator.cs similarity index 59% rename from src/IceRpc/Transports/Internal/IdleTimeoutDuplexConnectionDecorator.cs rename to src/IceRpc/Internal/IceDuplexConnectionDecorator.cs index 031c1bf95c..e75158a817 100644 --- a/src/IceRpc/Transports/Internal/IdleTimeoutDuplexConnectionDecorator.cs +++ b/src/IceRpc/Internal/IceDuplexConnectionDecorator.cs @@ -1,23 +1,31 @@ // Copyright (c) ZeroC, Inc. +using IceRpc.Transports; using System.Buffers; using System.Diagnostics; -namespace IceRpc.Transports.Internal; +namespace IceRpc.Internal; /// Decorates to fail if no byte is received for over readIdleTimeout. Also decorates /// to schedule a keep alive action (writeIdleTimeout / 2) after a successful write. Both /// sides of the connection are expected to use the same idle timeouts. -internal class IdleTimeoutDuplexConnectionDecorator : IDuplexConnection +internal class IceDuplexConnectionDecorator : IDuplexConnection { private readonly IDuplexConnection _decoratee; - private Timer? _keepAliveTimer; + private readonly Timer _writerTimer; private readonly CancellationTokenSource _readCts = new(); - private TimeSpan _readIdleTimeout = Timeout.InfiniteTimeSpan; - private TimeSpan _writeIdleTimeout = Timeout.InfiniteTimeSpan; + private readonly TimeSpan _readIdleTimeout; + private readonly TimeSpan _writeIdleTimeout; - public Task ConnectAsync(CancellationToken cancellationToken) => - _decoratee.ConnectAsync(cancellationToken); + public async Task ConnectAsync(CancellationToken cancellationToken) + { + TransportConnectionInformation connectionInformation = await _decoratee.ConnectAsync(cancellationToken) + .ConfigureAwait(false); + + // Schedule or reschedule a keep alive after a successful connection establishment. + ResetWriteTimer(); + return connectionInformation; + } public void Dispose() { @@ -25,7 +33,7 @@ public void Dispose() _readCts.Dispose(); // Using Dispose is fine, there's no need to wait for the keep alive action to terminate if it's running. - _keepAliveTimer?.Dispose(); + _writerTimer.Dispose(); } public ValueTask ReadAsync(Memory buffer, CancellationToken cancellationToken) @@ -72,50 +80,30 @@ async ValueTask PerformWriteAsync() { await _decoratee.WriteAsync(buffer, cancellationToken).ConfigureAwait(false); - // After each successful write, we schedule one ping (keep alive or heartbeat) at _writeIdleTimeout / 2 in - // the future. Since each ping is itself a write, if there is no application activity at all, we'll send - // successive pings at _writeIdleTimeout / 2 intervals. - ScheduleKeepAlive(); + // After each successful write, we (re)schedule one ping (heartbeat) at _writeIdleTimeout / 2 in the future. + // Since each ping is itself a write, if there is no application activity at all, we'll send successive + // pings at _writeIdleTimeout / 2 intervals. + ResetWriteTimer(); } } - /// Constructs a decorator that does nothing until it is enabled by a call to . - /// - internal IdleTimeoutDuplexConnectionDecorator(IDuplexConnection decoratee) => _decoratee = decoratee; - /// Constructs a decorator that ensures a call to will fail after readIdleTimeout. - /// This decorator also schedules a keepAliveAction after each write (see ). - /// - /// Do not call on a decorator constructed with this constructor. - internal IdleTimeoutDuplexConnectionDecorator( + /// This decorator also schedules a keepAliveAction after each write (see ). + internal IceDuplexConnectionDecorator( IDuplexConnection decoratee, TimeSpan readIdleTimeout, TimeSpan writeIdleTimeout, Action keepAliveAction) - : this(decoratee) { Debug.Assert(writeIdleTimeout != Timeout.InfiniteTimeSpan); + _decoratee = decoratee; _readIdleTimeout = readIdleTimeout; // can be infinite i.e. disabled _writeIdleTimeout = writeIdleTimeout; - _keepAliveTimer = new Timer(_ => keepAliveAction()); - } + _writerTimer = new Timer(_ => keepAliveAction()); - /// Enables the read and write idle timeouts; also schedules one keep-alive.. - internal void Enable(TimeSpan idleTimeout, Action? keepAliveAction) - { - Debug.Assert(idleTimeout != Timeout.InfiniteTimeSpan); - Debug.Assert(_keepAliveTimer is null); - - _readIdleTimeout = idleTimeout; - _writeIdleTimeout = idleTimeout; - - if (keepAliveAction is not null) - { - _keepAliveTimer = new Timer(_ => keepAliveAction()); - ScheduleKeepAlive(); - } + // We can't schedule a keep alive right away because the connection is not connected yet. } - /// Schedules one keep alive in writeIdleTimeout / 2. - internal void ScheduleKeepAlive() => _keepAliveTimer?.Change(_writeIdleTimeout / 2, Timeout.InfiniteTimeSpan); + /// Resets the write timer. We send a keep alive when this timer expires. + private void ResetWriteTimer() => _writerTimer.Change(_writeIdleTimeout / 2, Timeout.InfiniteTimeSpan); } diff --git a/src/IceRpc/Internal/IceProtocolConnection.cs b/src/IceRpc/Internal/IceProtocolConnection.cs index 2150c65f1f..51a3b1f0e9 100644 --- a/src/IceRpc/Internal/IceProtocolConnection.cs +++ b/src/IceRpc/Internal/IceProtocolConnection.cs @@ -56,9 +56,6 @@ internal sealed class IceProtocolConnection : IProtocolConnection // A connection refuses invocations when it's disposed, shut down, shutting down or merely "shutdown requested". private bool _refuseInvocations; - // When not null, schedules one keep-alive action in options.IdleTimeout / 2. - private readonly Action? _scheduleKeepAlive; - // Does ShutdownAsync send a close connection frame? private bool _sendCloseConnectionFrame = true; @@ -143,9 +140,6 @@ internal sealed class IceProtocolConnection : IProtocolConnection throw new InvalidDataException( $"Expected '{nameof(IceFrameType.ValidateConnection)}' frame but received frame type '{validateConnectionFrame.FrameType}'."); } - - // Schedules a keep-alive to keep the connection alive now that it's established. - _scheduleKeepAlive?.Invoke(); } } catch (OperationCanceledException) @@ -600,13 +594,11 @@ internal IceProtocolConnection( if (options.IceIdleTimeout != Timeout.InfiniteTimeSpan) { - var duplexConnectionDecorator = new IdleTimeoutDuplexConnectionDecorator( + duplexConnection = new IceDuplexConnectionDecorator( duplexConnection, readIdleTimeout: options.EnableIceIdleCheck ? options.IceIdleTimeout : Timeout.InfiniteTimeSpan, writeIdleTimeout: options.IceIdleTimeout, KeepAlive); - duplexConnection = duplexConnectionDecorator; - _scheduleKeepAlive = duplexConnectionDecorator.ScheduleKeepAlive; } _duplexConnection = duplexConnection; diff --git a/src/IceRpc/Transports/Slic/Internal/SlicConnection.cs b/src/IceRpc/Transports/Slic/Internal/SlicConnection.cs index 25cdd8569d..47a3c42043 100644 --- a/src/IceRpc/Transports/Slic/Internal/SlicConnection.cs +++ b/src/IceRpc/Transports/Slic/Internal/SlicConnection.cs @@ -55,10 +55,9 @@ internal class SlicConnection : IMultiplexedConnection private Task? _connectTask; private readonly CancellationTokenSource _disposedCts = new(); private Task? _disposeTask; - private readonly IDuplexConnection _duplexConnection; + private readonly SlicDuplexConnectionDecorator _duplexConnection; private readonly DuplexConnectionReader _duplexConnectionReader; private readonly SlicDuplexConnectionWriter _duplexConnectionWriter; - private readonly Action _enableIdleTimeoutAndKeepAlive; private bool _isClosed; private ulong? _lastRemoteBidirectionalStreamId; private ulong? _lastRemoteUnidirectionalStreamId; @@ -265,9 +264,7 @@ async Task PerformConnectAsync() if (idleTimeout != Timeout.InfiniteTimeSpan) { - // Only client connections send ping frames when idle to keep the server connection alive. The server - // sends back a Pong frame in turn to keep alive the client connection. - _enableIdleTimeoutAndKeepAlive(idleTimeout, IsServer ? null : KeepAlive); + _duplexConnection.Enable(idleTimeout); } _readFramesTask = ReadFramesAsync(_disposedCts.Token); @@ -317,31 +314,6 @@ async Task PerformConnectAsync() _ => throw new InvalidDataException($"Received unexpected Slic frame: '{frameType}'."), }; - void KeepAlive() - { - // _pendingPongCount can be < 0 if an unexpected pong is received. If it's the case, the connection is being - // torn down and there's no point in sending a ping frame. - if (Interlocked.Increment(ref _pendingPongCount) > 0) - { - try - { - // For now, the Ping frame payload is just a long which is always set to 0. In the future, it could - // be a ping frame type value if the ping frame is used for different purpose (e.g: a KeepAlive or - // RTT ping frame type). - WriteConnectionFrame(FrameType.Ping, new PingBody(0L).Encode); - } - catch (IceRpcException) - { - // Expected if the connection is closed. - } - catch (Exception exception) - { - Debug.Fail($"The Slic keep alive timer failed with an unexpected exception: {exception}"); - throw; - } - } - } - async ValueTask ReadFrameAsync( Func, T> decodeFunc, CancellationToken cancellationToken) @@ -577,10 +549,11 @@ internal SlicConnection( _closedCancellationToken = _closedCts.Token; - var duplexConnectionDecorator = new IdleTimeoutDuplexConnectionDecorator(duplexConnection); - _enableIdleTimeoutAndKeepAlive = duplexConnectionDecorator.Enable; + // Only the client-side sends pings to keep the connection alive when idle timeout (set later) is not infinite. + _duplexConnection = IsServer ? + new SlicDuplexConnectionDecorator(duplexConnection) : + new SlicDuplexConnectionDecorator(duplexConnection, SendReadPing, SendWritePing); - _duplexConnection = duplexConnectionDecorator; _duplexConnectionReader = new DuplexConnectionReader(_duplexConnection, options.Pool, options.MinSegmentSize); _duplexConnectionWriter = new SlicDuplexConnectionWriter( _duplexConnection, @@ -598,6 +571,42 @@ internal SlicConnection( _nextBidirectionalId = 0; _nextUnidirectionalId = 2; } + + void SendPing(long payload) + { + try + { + WriteConnectionFrame(FrameType.Ping, new PingBody(payload).Encode); + } + catch (IceRpcException) + { + // Expected if the connection is closed. + } + catch (Exception exception) + { + Debug.Fail($"The sending of a Ping frame failed with an unexpected exception: {exception}"); + throw; + } + } + + void SendReadPing() + { + // This local function is no-op if there is already a pending Pong. + if (Interlocked.CompareExchange(ref _pendingPongCount, 1, 0) == 0) + { + SendPing(1L); + } + } + + void SendWritePing() + { + // _pendingPongCount can be <= 0 if an unexpected pong is received. If it's the case, the connection is + // being torn down and there's no point in sending a ping frame. + if (Interlocked.Increment(ref _pendingPongCount) > 0) + { + SendPing(0L); + } + } } /// Fills the given writer with stream data received on the connection. @@ -1190,8 +1199,8 @@ async Task ReadPongFrameAsync(int size, CancellationToken cancellationToken) (ref SliceDecoder decoder) => new PongBody(ref decoder), cancellationToken).ConfigureAwait(false); - // For now, we only send a 0 payload value. - if (pongBody.Payload != 0L) + // For now, we only send a 0 or 1 payload value (0 for "write ping" and 1 for "read ping"). + if (pongBody.Payload != 0L && pongBody.Payload != 1L) { throw new InvalidDataException($"Received {nameof(FrameType.Pong)} with unexpected payload."); } diff --git a/src/IceRpc/Transports/Slic/Internal/SlicDuplexConnectionDecorator.cs b/src/IceRpc/Transports/Slic/Internal/SlicDuplexConnectionDecorator.cs new file mode 100644 index 0000000000..8e1c53c5ad --- /dev/null +++ b/src/IceRpc/Transports/Slic/Internal/SlicDuplexConnectionDecorator.cs @@ -0,0 +1,127 @@ +// Copyright (c) ZeroC, Inc. + +using System.Buffers; +using System.Diagnostics; + +namespace IceRpc.Transports.Slic.Internal; + +/// Decorates to fail if no byte is received for over idle timeout. Also optionally +/// decorates both and to schedule pings that prevent both the local +/// and remote idle timers from expiring. +internal class SlicDuplexConnectionDecorator : IDuplexConnection +{ + private readonly IDuplexConnection _decoratee; + private TimeSpan _idleTimeout = Timeout.InfiniteTimeSpan; + private readonly CancellationTokenSource _readCts = new(); + + private readonly Timer? _readTimer; + private readonly Timer? _writeTimer; + + public Task ConnectAsync(CancellationToken cancellationToken) => + _decoratee.ConnectAsync(cancellationToken); + + public void Dispose() + { + _decoratee.Dispose(); + _readCts.Dispose(); + + // Using Dispose is fine, there's no need to wait for the keep alive action to terminate if it's running. + _readTimer?.Dispose(); + _writeTimer?.Dispose(); + } + + public ValueTask ReadAsync(Memory buffer, CancellationToken cancellationToken) + { + return _idleTimeout == Timeout.InfiniteTimeSpan ? + _decoratee.ReadAsync(buffer, cancellationToken) : + PerformReadAsync(); + + async ValueTask PerformReadAsync() + { + try + { + using CancellationTokenRegistration _ = cancellationToken.UnsafeRegister( + cts => ((CancellationTokenSource)cts!).Cancel(), + _readCts); + _readCts.CancelAfter(_idleTimeout); // enable idle timeout before reading + + int bytesRead = await _decoratee.ReadAsync(buffer, _readCts.Token).ConfigureAwait(false); + + // After each successful read, we schedule one ping some time in the future. + if (bytesRead > 0) + { + ResetReadTimer(); + } + // When 0, the other side called ShutdownWriteAsync, so there is no point to send a ping since we can't + // get back a pong. + + return bytesRead; + } + catch (OperationCanceledException) + { + cancellationToken.ThrowIfCancellationRequested(); + + throw new IceRpcException( + IceRpcError.ConnectionIdle, + $"The connection did not receive any bytes for over {_idleTimeout.TotalSeconds} s."); + } + finally + { + _readCts.CancelAfter(Timeout.InfiniteTimeSpan); // disable idle timeout if not canceled + } + } + } + + public Task ShutdownWriteAsync(CancellationToken cancellationToken) => + _decoratee.ShutdownWriteAsync(cancellationToken); + + public ValueTask WriteAsync(ReadOnlySequence buffer, CancellationToken cancellationToken) + { + return _idleTimeout == Timeout.InfiniteTimeSpan ? + _decoratee.WriteAsync(buffer, cancellationToken) : + PerformWriteAsync(); + + async ValueTask PerformWriteAsync() + { + await _decoratee.WriteAsync(buffer, cancellationToken).ConfigureAwait(false); + + // After each successful write, we schedule one ping some time in the future. Since each ping is itself a + // write, if there is no application activity at all, we'll send successive pings at regular intervals. + ResetWriteTimer(); + } + } + + /// Constructs a decorator that does nothing until it is enabled by a call to . + /// + internal SlicDuplexConnectionDecorator(IDuplexConnection decoratee) => _decoratee = decoratee; + + /// Constructs a decorator that does nothing until it is enabled by a call to . + /// + internal SlicDuplexConnectionDecorator(IDuplexConnection decoratee, Action sendReadPing, Action sendWritePing) + : this(decoratee) + { + _readTimer = new Timer(_ => sendReadPing()); + _writeTimer = new Timer(_ => sendWritePing()); + } + + /// Sets the idle timeout and schedules pings once the connection is established.. + internal void Enable(TimeSpan idleTimeout) + { + Debug.Assert(idleTimeout != Timeout.InfiniteTimeSpan); + _idleTimeout = idleTimeout; + + ResetReadTimer(); + ResetWriteTimer(); + } + + /// Resets the read timer. We send a "read" ping when this timer expires. + /// This method is no-op unless this decorator is constructed with send ping actions. + private void ResetReadTimer() => _readTimer?.Change(_idleTimeout * 0.5, Timeout.InfiniteTimeSpan); + + /// Resets the write timer. We send a "write" ping when this timer expires. + /// This method is no-op unless this decorator is constructed with send ping actions. + // The write timer factor (0.6) was chosen to be greater than the read timer factor (0.5). This way, when the + // connection is completely idle, the read timer expires before the write timer and has time to send a ping that + // resets the write timer. This reduces the likelihood of duplicate "keep alive" pings. + private void ResetWriteTimer() => _writeTimer?.Change(_idleTimeout * 0.6, Timeout.InfiniteTimeSpan); +} diff --git a/tests/IceRpc.Tests/Transports/IdleTimeoutTests.cs b/tests/IceRpc.Tests/IceIdleTimeoutTests.cs similarity index 81% rename from tests/IceRpc.Tests/Transports/IdleTimeoutTests.cs rename to tests/IceRpc.Tests/IceIdleTimeoutTests.cs index 353690a01d..60e00476c6 100644 --- a/tests/IceRpc.Tests/Transports/IdleTimeoutTests.cs +++ b/tests/IceRpc.Tests/IceIdleTimeoutTests.cs @@ -1,18 +1,18 @@ // Copyright (c) ZeroC, Inc. using IceRpc.Tests.Common; -using IceRpc.Transports.Internal; +using IceRpc.Transports.Slic.Internal; using Microsoft.Extensions.DependencyInjection; using NUnit.Framework; using System.Buffers; -namespace IceRpc.Tests.Transports; +namespace IceRpc.Internal; [Parallelizable(scope: ParallelScope.All)] -public class IdleTimeoutTests +public class IceIdleTimeoutTests { [Test] - public async Task Connection_idle_after_idle_timeout() + public async Task Ice_connection_idle_after_idle_timeout() { // Arrange await using ServiceProvider provider = new ServiceCollection() @@ -23,8 +23,11 @@ public async Task Connection_idle_after_idle_timeout() var sut = provider.GetRequiredService(); await sut.AcceptAndConnectAsync(); - using var clientConnection = new IdleTimeoutDuplexConnectionDecorator(sut.Client); - clientConnection.Enable(TimeSpan.FromMilliseconds(500), keepAliveAction: null); + using var clientConnection = new IceDuplexConnectionDecorator( + sut.Client, + readIdleTimeout: TimeSpan.FromMilliseconds(500), + writeIdleTimeout: TimeSpan.FromMilliseconds(500), + keepAliveAction: () => {}); // Write and read data to the connection await sut.Server.WriteAsync(new ReadOnlySequence(new byte[1]), default); @@ -44,7 +47,7 @@ public async Task Connection_idle_after_idle_timeout() } [Test] - public async Task Keep_alive_action_is_called() + public async Task Ice_keep_alive_action_is_called() { // Arrange await using ServiceProvider provider = new ServiceCollection() @@ -56,7 +59,7 @@ public async Task Keep_alive_action_is_called() await sut.AcceptAndConnectAsync(); using var semaphore = new SemaphoreSlim(0, 1); - using var clientConnection = new IdleTimeoutDuplexConnectionDecorator( + using var clientConnection = new IceDuplexConnectionDecorator( sut.Client, readIdleTimeout: Timeout.InfiniteTimeSpan, writeIdleTimeout: TimeSpan.FromMilliseconds(500), diff --git a/tests/IceRpc.Tests/Transports/Slic/SlicIdleTimeoutTests.cs b/tests/IceRpc.Tests/Transports/Slic/SlicIdleTimeoutTests.cs new file mode 100644 index 0000000000..18bdaca0ec --- /dev/null +++ b/tests/IceRpc.Tests/Transports/Slic/SlicIdleTimeoutTests.cs @@ -0,0 +1,81 @@ +// Copyright (c) ZeroC, Inc. + +using IceRpc.Tests.Common; +using IceRpc.Transports.Slic.Internal; +using Microsoft.Extensions.DependencyInjection; +using NUnit.Framework; +using System.Buffers; + +namespace IceRpc.Tests.Transports.Slic; + +[Parallelizable(scope: ParallelScope.All)] +public class SlicIdleTimeoutTests +{ + [Test] + public async Task Slic_connection_idle_after_idle_timeout() + { + // Arrange + await using ServiceProvider provider = new ServiceCollection() + .AddDuplexTransportTest() + .AddColocTransport() + .BuildServiceProvider(validateScopes: true); + + var sut = provider.GetRequiredService(); + await sut.AcceptAndConnectAsync(); + + using var clientConnection = new SlicDuplexConnectionDecorator(sut.Client); + clientConnection.Enable(TimeSpan.FromMilliseconds(500)); + + // Write and read data to the connection + await sut.Server.WriteAsync(new ReadOnlySequence(new byte[1]), default); + Memory buffer = new byte[1]; + await clientConnection.ReadAsync(buffer, default); + + var startTime = TimeSpan.FromMilliseconds(Environment.TickCount64); + + // Act/Assert + Assert.That( + async () => await clientConnection.ReadAsync(buffer, default), + Throws.InstanceOf().With.Property("IceRpcError").EqualTo(IceRpcError.ConnectionIdle)); + + Assert.That( + TimeSpan.FromMilliseconds(Environment.TickCount64) - startTime, + Is.GreaterThan(TimeSpan.FromMilliseconds(490))); + } + + [Test] + public async Task Slic_send_pings_are_called() + { + // Arrange + await using ServiceProvider provider = new ServiceCollection() + .AddDuplexTransportTest() + .AddColocTransport() + .BuildServiceProvider(validateScopes: true); + + var sut = provider.GetRequiredService(); + await sut.AcceptAndConnectAsync(); + + using var readSemaphore = new SemaphoreSlim(0, 1); + using var writeSemaphore = new SemaphoreSlim(0, 1); + + using var clientConnection = new SlicDuplexConnectionDecorator( + sut.Client, + sendReadPing: () => readSemaphore.Release(), + sendWritePing: () => writeSemaphore.Release()); + clientConnection.Enable(TimeSpan.FromMilliseconds(500)); + + // Write and read data. + await clientConnection.WriteAsync(new ReadOnlySequence(new byte[1]), default); + await sut.Server.ReadAsync(new byte[10], default); + + var startTime = TimeSpan.FromMilliseconds(Environment.TickCount64); + + // Act/Assert + Assert.That(readSemaphore.WaitAsync, Throws.Nothing); + Assert.That(writeSemaphore.WaitAsync, Throws.Nothing); + + Assert.That( + TimeSpan.FromMilliseconds(Environment.TickCount64) - startTime, + Is.LessThan(TimeSpan.FromMilliseconds(500))); + } +} diff --git a/tests/IceRpc.Tests/Transports/Slic/SlicTransportTests.cs b/tests/IceRpc.Tests/Transports/Slic/SlicTransportTests.cs index e225e1c8ce..609c201879 100644 --- a/tests/IceRpc.Tests/Transports/Slic/SlicTransportTests.cs +++ b/tests/IceRpc.Tests/Transports/Slic/SlicTransportTests.cs @@ -587,6 +587,49 @@ public async Task Connection_with_idle_timeout_is_not_aborted_when_idle( Assert.That(async () => await acceptStreamTask, Throws.InstanceOf()); } + /// Verifies that setting the idle timeout doesn't abort the connection when there is slow write activity + /// from client to server. + [Test] + public async Task Connection_with_idle_timeout_and_slow_write_is_not_aborted( + [Values(true, false)] bool serverIdleTimeout) + { + // Arrange + var services = new ServiceCollection().AddSlicTest(); + var idleTimeout = TimeSpan.FromSeconds(1); + if (serverIdleTimeout) + { + services.AddOptions("server").Configure(options => options.IdleTimeout = idleTimeout); + } + else + { + services.AddOptions("client").Configure(options => options.IdleTimeout = idleTimeout); + } + + await using ServiceProvider provider = services.BuildServiceProvider(validateScopes: true); + + var sut = provider.GetRequiredService(); + await sut.AcceptAndConnectAsync(); + + ValueTask acceptStreamTask = sut.Server.AcceptStreamAsync(default); + IMultiplexedStream stream = await sut.Client.CreateStreamAsync(bidirectional: false, default); + + // Act + for (int i = 0; i < 4; ++i) + { + // Slow writes to the server (the server doesn't even read them). + _ = await stream.Output.WriteAsync(new byte[1], default); + await Task.Delay(TimeSpan.FromMilliseconds(500)); + } + stream.Output.Complete(); + ValueTask nextAcceptStreamTask = sut.Server.AcceptStreamAsync(default); + + // Assert + Assert.That(acceptStreamTask.IsCompleted, Is.True); + Assert.That(nextAcceptStreamTask.IsCompleted, Is.False); + await sut.Client.CloseAsync(MultiplexedConnectionCloseError.NoError, default); + Assert.That(async () => await nextAcceptStreamTask, Throws.InstanceOf()); + } + /// Verifies the cancellation token of CloseAsync works when the ShutdownAsync of the underlying server /// duplex connection hangs. [Test]