Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactors how Slic keeps connections alive #3670

Merged
merged 8 commits into from
Oct 4, 2023
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,31 +1,39 @@
// Copyright (c) ZeroC, Inc.

using IceRpc.Transports;
using System.Buffers;
using System.Diagnostics;

namespace IceRpc.Transports.Internal;
namespace IceRpc.Internal;

/// <summary>Decorates <see cref="ReadAsync" /> to fail if no byte is received for over readIdleTimeout. Also decorates
/// <see cref="WriteAsync" /> to schedule a keep alive action (writeIdleTimeout / 2) after a successful write. Both
/// sides of the connection are expected to use the same idle timeouts.</summary>
internal class IdleTimeoutDuplexConnectionDecorator : IDuplexConnection
internal class IceDuplexConnectionDecorator : IDuplexConnection
{
private readonly IDuplexConnection _decoratee;
private Timer? _keepAliveTimer;
private readonly Timer _writerTimer;
private readonly CancellationTokenSource _readCts = new();
private TimeSpan _readIdleTimeout = Timeout.InfiniteTimeSpan;
private TimeSpan _writeIdleTimeout = Timeout.InfiniteTimeSpan;
private readonly TimeSpan _readIdleTimeout;
private readonly TimeSpan _writeIdleTimeout;

public Task<TransportConnectionInformation> ConnectAsync(CancellationToken cancellationToken) =>
_decoratee.ConnectAsync(cancellationToken);
/// <summary>Connects the decorated connection, then arms the write timer.</summary>
/// <param name="cancellationToken">The cancellation token forwarded to the decoratee.</param>
/// <returns>The connection information returned by the decoratee.</returns>
public async Task<TransportConnectionInformation> ConnectAsync(CancellationToken cancellationToken)
{
    TransportConnectionInformation information =
        await _decoratee.ConnectAsync(cancellationToken).ConfigureAwait(false);

    // The connection is now established: (re)schedule the keep alive. This line is not reached when the
    // decoratee's ConnectAsync throws.
    ResetWriteTimer();

    return information;
}

public void Dispose()
{
_decoratee.Dispose();
_readCts.Dispose();

// Using Dispose is fine, there's no need to wait for the keep alive action to terminate if it's running.
_keepAliveTimer?.Dispose();
_writerTimer.Dispose();
}

public ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken)
Expand Down Expand Up @@ -72,50 +80,30 @@ async ValueTask PerformWriteAsync()
{
await _decoratee.WriteAsync(buffer, cancellationToken).ConfigureAwait(false);

// After each successful write, we schedule one ping (keep alive or heartbeat) at _writeIdleTimeout / 2 in
// the future. Since each ping is itself a write, if there is no application activity at all, we'll send
// successive pings at _writeIdleTimeout / 2 intervals.
ScheduleKeepAlive();
// After each successful write, we (re)schedule one ping (heartbeat) at _writeIdleTimeout / 2 in the future.
// Since each ping is itself a write, if there is no application activity at all, we'll send successive
// pings at _writeIdleTimeout / 2 intervals.
ResetWriteTimer();
}
}

/// <summary>Constructs a decorator that does nothing until it is enabled by a call to <see cref="Enable" />.
/// </summary>
internal IdleTimeoutDuplexConnectionDecorator(IDuplexConnection decoratee) => _decoratee = decoratee;

/// <summary>Constructs a decorator that ensures a call to <see cref="ReadAsync" /> will fail after readIdleTimeout.
/// This decorator also schedules a keepAliveAction after each write (see <see cref="ScheduleKeepAlive" />).
/// </summary>
/// <remarks>Do not call <see cref="Enable" /> on a decorator constructed with this constructor.</remarks>
internal IdleTimeoutDuplexConnectionDecorator(
/// This decorator also schedules a keepAliveAction after each write (see <see cref="ResetWriteTimer" />).</summary>
internal IceDuplexConnectionDecorator(
IDuplexConnection decoratee,
TimeSpan readIdleTimeout,
TimeSpan writeIdleTimeout,
Action keepAliveAction)
: this(decoratee)
{
Debug.Assert(writeIdleTimeout != Timeout.InfiniteTimeSpan);
_decoratee = decoratee;
_readIdleTimeout = readIdleTimeout; // can be infinite i.e. disabled
_writeIdleTimeout = writeIdleTimeout;
_keepAliveTimer = new Timer(_ => keepAliveAction());
}
_writerTimer = new Timer(_ => keepAliveAction());

/// <summary>Enables the read and write idle timeouts; also schedules one keep-alive.</summary>
internal void Enable(TimeSpan idleTimeout, Action? keepAliveAction)
{
Debug.Assert(idleTimeout != Timeout.InfiniteTimeSpan);
Debug.Assert(_keepAliveTimer is null);

_readIdleTimeout = idleTimeout;
_writeIdleTimeout = idleTimeout;

if (keepAliveAction is not null)
{
_keepAliveTimer = new Timer(_ => keepAliveAction());
ScheduleKeepAlive();
}
// We can't schedule a keep alive right away because the connection is not connected yet.
}

/// <summary>Schedules one keep alive in writeIdleTimeout / 2.</summary>
internal void ScheduleKeepAlive() => _keepAliveTimer?.Change(_writeIdleTimeout / 2, Timeout.InfiniteTimeSpan);
/// <summary>Restarts the write timer so that it fires once, <c>_writeIdleTimeout / 2</c> from now. The keep alive
/// action runs when the timer fires.</summary>
private void ResetWriteTimer()
{
    // An infinite period makes this a one-shot timer: it fires a single time unless it is reset again.
    TimeSpan dueTime = _writeIdleTimeout / 2;
    _ = _writerTimer.Change(dueTime, Timeout.InfiniteTimeSpan);
}
}
10 changes: 1 addition & 9 deletions src/IceRpc/Internal/IceProtocolConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,6 @@ internal sealed class IceProtocolConnection : IProtocolConnection
// A connection refuses invocations when it's disposed, shut down, shutting down or merely "shutdown requested".
private bool _refuseInvocations;

// When not null, schedules one keep-alive action in options.IdleTimeout / 2.
private readonly Action? _scheduleKeepAlive;

// Does ShutdownAsync send a close connection frame?
private bool _sendCloseConnectionFrame = true;

Expand Down Expand Up @@ -143,9 +140,6 @@ internal sealed class IceProtocolConnection : IProtocolConnection
throw new InvalidDataException(
$"Expected '{nameof(IceFrameType.ValidateConnection)}' frame but received frame type '{validateConnectionFrame.FrameType}'.");
}

// Schedules a keep-alive to keep the connection alive now that it's established.
_scheduleKeepAlive?.Invoke();
}
}
catch (OperationCanceledException)
Expand Down Expand Up @@ -600,13 +594,11 @@ internal IceProtocolConnection(

if (options.IceIdleTimeout != Timeout.InfiniteTimeSpan)
{
var duplexConnectionDecorator = new IdleTimeoutDuplexConnectionDecorator(
duplexConnection = new IceDuplexConnectionDecorator(
duplexConnection,
readIdleTimeout: options.EnableIceIdleCheck ? options.IceIdleTimeout : Timeout.InfiniteTimeSpan,
writeIdleTimeout: options.IceIdleTimeout,
KeepAlive);
duplexConnection = duplexConnectionDecorator;
_scheduleKeepAlive = duplexConnectionDecorator.ScheduleKeepAlive;
}

_duplexConnection = duplexConnection;
Expand Down
84 changes: 49 additions & 35 deletions src/IceRpc/Transports/Slic/Internal/SlicConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,9 @@ internal class SlicConnection : IMultiplexedConnection
private Task<TransportConnectionInformation>? _connectTask;
private readonly CancellationTokenSource _disposedCts = new();
private Task? _disposeTask;
private readonly IDuplexConnection _duplexConnection;
private readonly SlicDuplexConnectionDecorator _duplexConnection;
private readonly DuplexConnectionReader _duplexConnectionReader;
private readonly SlicDuplexConnectionWriter _duplexConnectionWriter;
private readonly Action<TimeSpan, Action?> _enableIdleTimeoutAndKeepAlive;
private bool _isClosed;
private ulong? _lastRemoteBidirectionalStreamId;
private ulong? _lastRemoteUnidirectionalStreamId;
Expand Down Expand Up @@ -265,9 +264,7 @@ async Task<TransportConnectionInformation> PerformConnectAsync()

if (idleTimeout != Timeout.InfiniteTimeSpan)
{
// Only client connections send ping frames when idle to keep the server connection alive. The server
// sends back a Pong frame in turn to keep alive the client connection.
_enableIdleTimeoutAndKeepAlive(idleTimeout, IsServer ? null : KeepAlive);
_duplexConnection.Enable(idleTimeout);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would be better to keep IDuplexConnection and then do

if (_duplexConnection is SlicDuplexConnectionDecorator duplexConnectionDecoator)
{
    duplexConnectionDecoator.Enable();
} 

Then we can make SlicDuplexConnectionDecorator timers non-nullable, and keep a single constructor.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see how this would work.

The difficulty here is that the idle timeout is negotiated during connection establishment. At construction time, we don't know if the negotiated idle timeout is going to be infinite or some other value.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can pass the idleTimeout; I forgot to add it there, but that is not the point. The point is to avoid creating this decorator for the Server connections.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need the decorator for server connections too. It implements the idle timer.

}

_readFramesTask = ReadFramesAsync(_disposedCts.Token);
Expand Down Expand Up @@ -317,31 +314,6 @@ async Task<TransportConnectionInformation> PerformConnectAsync()
_ => throw new InvalidDataException($"Received unexpected Slic frame: '{frameType}'."),
};

// Keep alive action: writes a Ping frame, unless the pending pong count shows that the connection is being torn
// down.
void KeepAlive()
{
    // An unexpected pong can leave _pendingPongCount below 0. In that case the connection is being torn down and
    // sending a ping frame is pointless.
    if (Interlocked.Increment(ref _pendingPongCount) <= 0)
    {
        return;
    }

    try
    {
        // The Ping frame payload is currently a long that is always 0. It could later carry a ping frame type
        // value if the ping frame gets used for other purposes (e.g. a KeepAlive or RTT ping frame type).
        WriteConnectionFrame(FrameType.Ping, new PingBody(0L).Encode);
    }
    catch (IceRpcException)
    {
        // Expected if the connection is closed.
    }
    catch (Exception unexpected)
    {
        Debug.Fail($"The Slic keep alive timer failed with an unexpected exception: {unexpected}");
        throw;
    }
}

async ValueTask<T> ReadFrameAsync<T>(
Func<FrameType?, ReadOnlySequence<byte>, T> decodeFunc,
CancellationToken cancellationToken)
Expand Down Expand Up @@ -577,10 +549,11 @@ internal SlicConnection(

_closedCancellationToken = _closedCts.Token;

var duplexConnectionDecorator = new IdleTimeoutDuplexConnectionDecorator(duplexConnection);
_enableIdleTimeoutAndKeepAlive = duplexConnectionDecorator.Enable;
// Only the client-side sends pings to keep the connection alive when idle timeout (set later) is not infinite.
_duplexConnection = IsServer ?
new SlicDuplexConnectionDecorator(duplexConnection) :
new SlicDuplexConnectionDecorator(duplexConnection, SendReadPing, SendWritePing);

_duplexConnection = duplexConnectionDecorator;
_duplexConnectionReader = new DuplexConnectionReader(_duplexConnection, options.Pool, options.MinSegmentSize);
_duplexConnectionWriter = new SlicDuplexConnectionWriter(
_duplexConnection,
Expand All @@ -598,6 +571,47 @@ internal SlicConnection(
_nextBidirectionalId = 0;
_nextUnidirectionalId = 2;
}

// Writes a Ping connection frame carrying the given payload. An IceRpcException is swallowed because it is
// expected once the connection is closed; any other exception trips a debug assertion and is rethrown.
void SendPing(long payload)
{
    try
    {
        var pingBody = new PingBody(payload);
        WriteConnectionFrame(FrameType.Ping, pingBody.Encode);
    }
    catch (IceRpcException)
    {
        // Expected if the connection is closed.
    }
    catch (Exception unexpected)
    {
        Debug.Fail($"The sending of a Ping frame failed with an unexpected exception: {unexpected}");
        throw;
    }
}

void SendReadPing()
{
// ReadKeepAlive is no-op unless _pendingPongCount == 0 before the Increment below: we send a Ping only if
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is ReadKeepAlive? There is no such operation.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed. It's the current local function, SendReadPing.

// there is no outstanding Pong.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is "outstanding Pong" correct here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Replaced by pending Pong, like the field name.

if (Interlocked.Increment(ref _pendingPongCount) == 1)
{
SendPing(1L);
}
else
{
Interlocked.Decrement(ref _pendingPongCount);
bernardnormier marked this conversation as resolved.
Show resolved Hide resolved
}
}

// Sends a Ping with payload 0, after counting it as one more pending pong.
void SendWritePing()
{
    // The incremented count can still be <= 0 if an unexpected pong was received. In that case the connection
    // is being torn down and sending a ping frame is pointless.
    int pendingPongCount = Interlocked.Increment(ref _pendingPongCount);
    if (pendingPongCount <= 0)
    {
        return;
    }

    SendPing(0L);
}
}

/// <summary>Fills the given writer with stream data received on the connection.</summary>
Expand Down Expand Up @@ -1190,8 +1204,8 @@ async Task ReadPongFrameAsync(int size, CancellationToken cancellationToken)
(ref SliceDecoder decoder) => new PongBody(ref decoder),
cancellationToken).ConfigureAwait(false);

// For now, we only send a 0 payload value.
if (pongBody.Payload != 0L)
// For now, we only send a 0 or 1 payload value (0 for "write ping" and 1 for "read ping").
if (pongBody.Payload != 0L && pongBody.Payload != 1L)
{
throw new InvalidDataException($"Received {nameof(FrameType.Pong)} with unexpected payload.");
}
Expand Down
Loading