Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

back off recovery #8

Merged
merged 4 commits into from
Jun 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# RabbitMQ Amqp1.0 DotNet Client
# RabbitMQ AMQP 1.0 DotNet Client

See the [internal documentation](https://docs.google.com/document/d/1afO2ugGpTIZYUeXH_0GtMxedV51ZzmsbC3-mRdoSI_o/edit#heading=h.kqd38uu4iku)

This library is in early stages of development.
It is meant to be used with RabbitMQ 4.0.



16 changes: 8 additions & 8 deletions RabbitMQ.AMQP.Client/IClosable.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
namespace RabbitMQ.AMQP.Client;

public enum Status
public enum State
{
Closed,
Reconneting,
// Opening,
Open,
Reconnecting,
Closing,
Closed,
}

public class Error
Expand All @@ -15,13 +17,11 @@ public class Error

public interface IClosable
{
public Status Status { get; }
public State State { get; }

Task CloseAsync();

public delegate void ChangeStatusCallBack(object sender, Status from, Status to, Error? error);
public delegate void LifeCycleCallBack(object sender, State previousState, State currentState, Error? failureCause);

event ChangeStatusCallBack ChangeStatus;


event LifeCycleCallBack ChangeState;
}
5 changes: 5 additions & 0 deletions RabbitMQ.AMQP.Client/IEntities.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@ public interface IEntityInfo
{
}


/// <summary>
/// Generic interface for declaring entities
/// </summary>
/// <typeparam name="T"></typeparam>
public interface IEntityDeclaration<T> where T : IEntityInfo
{
Task<T> Declare();
Expand Down
2 changes: 1 addition & 1 deletion RabbitMQ.AMQP.Client/IManagement.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ namespace RabbitMQ.AMQP.Client;

public class ModelException(string message) : Exception(message);

public class PreconditionFailException(string message) : Exception(message);
public class PreconditionFailedException(string message) : Exception(message);

public interface IManagement : IClosable
{
Expand Down
50 changes: 49 additions & 1 deletion RabbitMQ.AMQP.Client/IRecoveryConfiguration.cs
Original file line number Diff line number Diff line change
@@ -1,15 +1,63 @@
namespace RabbitMQ.AMQP.Client;


/// <summary>
/// Interface for the recovery configuration.
/// </summary>
public interface IRecoveryConfiguration
{
/// <summary>
/// Define if the recovery is activated.
/// If is not activated the connection will not try to reconnect.
/// </summary>
/// <param name="activated"></param>
/// <returns></returns>
IRecoveryConfiguration Activated(bool activated);

bool IsActivate();

// IRecoveryConfiguration BackOffDelayPolicy(BackOffDelayPolicy backOffDelayPolicy);
/// <summary>
/// Define the backoff delay policy.
/// It is used when the connection is trying to reconnect.
/// </summary>
/// <param name="backOffDelayPolicy"></param>
/// <returns></returns>
IRecoveryConfiguration BackOffDelayPolicy(IBackOffDelayPolicy backOffDelayPolicy);

/// <summary>
/// Define if the recovery of the topology is activated.
/// When Activated the connection will try to recover the topology after a reconnection.
/// It is valid only if the recovery is activated.
/// </summary>
/// <param name="activated"></param>
/// <returns></returns>
IRecoveryConfiguration Topology(bool activated);

bool IsTopologyActive();

}

/// <summary>
/// Interface for the backoff delay policy.
/// Used during the recovery of the connection.
/// </summary>
public interface IBackOffDelayPolicy
{
/// <summary>
/// Get the next delay in milliseconds.
/// </summary>
/// <returns></returns>
int Delay();

/// <summary>
/// Reset the backoff delay policy.
/// </summary>
void Reset();

/// <summary>
/// Define if the backoff delay policy is active.
/// Can be used to disable the backoff delay policy after a certain number of retries.
/// or when the user wants to disable the backoff delay policy.
/// </summary>
bool IsActive { get; }
}
4 changes: 4 additions & 0 deletions RabbitMQ.AMQP.Client/ITopologyListener.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,8 @@ public interface ITopologyListener
void QueueDeclared(IQueueSpecification specification);

void QueueDeleted(string name);

void Clear();

int QueueCount();
}
131 changes: 94 additions & 37 deletions RabbitMQ.AMQP.Client/Impl/AmqpConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,39 +8,47 @@ internal class Visitor(AmqpManagement management) : IVisitor
{
private AmqpManagement Management { get; } = management;


public void VisitQueues(List<QueueSpec> queueSpec)
public async Task VisitQueues(List<QueueSpec> queueSpec)
{
foreach (var spec in queueSpec)
{
Trace.WriteLine(TraceLevel.Information, $"Recovering queue {spec.Name}");
Management.Queue(spec).Declare();
await Management.Queue(spec).Declare();
}
}
}


/// <summary>
/// AmqpConnection is the concrete implementation of <see cref="IConnection"/>
/// It is a wrapper around the AMQP.Net Lite <see cref="Connection"/> class
/// </summary>
public class AmqpConnection : IConnection
{
private const string ConnectionNotRecoveredCode = "CONNECTION_NOT_RECOVERED";
private const string ConnectionNotRecoveredMessage = "Connection not recovered";

// The native AMQP.Net Lite connection
private Connection? _nativeConnection;
private readonly AmqpManagement _management = new();


private readonly RecordingTopologyListener _recordingTopologyListener = new();

private readonly ConnectionSettings _connectionSettings;

/// <summary>
/// Creates a new instance of <see cref="AmqpConnection"/>
/// Through the Connection is possible to create:
/// - Management. See <see cref="AmqpManagement"/>
/// - Publishers and Consumers: TODO: Implement
/// </summary>
/// <param name="connectionSettings"></param>
/// <returns></returns>
public static async Task<AmqpConnection> CreateAsync(ConnectionSettings connectionSettings)
{
var connection = new AmqpConnection(connectionSettings);
await connection.EnsureConnectionAsync();

return connection;
}

Expand Down Expand Up @@ -86,75 +94,116 @@ [new Symbol("connection_name")] = _connectionSettings.ConnectionName(),
_nativeConnection.AddClosedCallback(MaybeRecoverConnection());
}

OnNewStatus(Status.Open, null);
OnNewStatus(State.Open, null);
}
catch (AmqpException e)
{
throw new ConnectionException("AmqpException: Connection failed", e);
throw new ConnectionException($"AmqpException: Connection failed. Info: {ToString()} ", e);
}
catch (OperationCanceledException e)
{
// wrong virtual host
throw new ConnectionException("OperationCanceledException: Connection failed", e);
throw new ConnectionException($"OperationCanceledException: Connection failed. Info: {ToString()}", e);
}

catch (NotSupportedException e)
{
// wrong schema
throw new ConnectionException("NotSupportedException: Connection failed", e);
throw new ConnectionException($"NotSupportedException: Connection failed. Info: {ToString()}", e);
}
}

private void OnNewStatus(Status newStatus, Error? error)

private void OnNewStatus(State newState, Error? error)
{
if (Status == newStatus) return;
var oldStatus = Status;
Status = newStatus;
ChangeStatus?.Invoke(this, oldStatus, newStatus, error);
if (State == newState) return;
var oldStatus = State;
State = newState;
ChangeState?.Invoke(this, oldStatus, newState, error);
}

private ClosedCallback MaybeRecoverConnection()
{
return (sender, error) =>
return async (sender, error) =>
{
if (error != null)
{
// TODO: Implement Dump Interface
Trace.WriteLine(TraceLevel.Warning, $"connection is closed unexpected" +
$"{sender} {error} {Status} " +
$"{_nativeConnection!.IsClosed}");
Trace.WriteLine(TraceLevel.Warning, $"connection is closed unexpectedly. " +
$"Info: {ToString()}");

if (!_connectionSettings.RecoveryConfiguration.IsActivate())
{
OnNewStatus(Status.Closed, Utils.ConvertError(error));
OnNewStatus(State.Closed, Utils.ConvertError(error));
return;
}

// TODO: Block the publishers and consumers
OnNewStatus(State.Reconnecting, Utils.ConvertError(error));

OnNewStatus(Status.Reconneting, Utils.ConvertError(error));

Thread.Sleep(1000);
// TODO: Replace with Backoff pattern
var t = Task.Run(async () =>
await Task.Run(async () =>
{
Trace.WriteLine(TraceLevel.Information, "Recovering connection");
await EnsureConnectionAsync();
Trace.WriteLine(TraceLevel.Information, "Recovering topology");
var connected = false;
// as first step we try to recover the connection
// so the connected flag is false
while (!connected &&
// we have to check if the recovery is active.
// The user may want to disable the recovery mechanism
// the user can use the lifecycle callback to handle the error
_connectionSettings.RecoveryConfiguration.IsActivate() &&
// we have to check if the backoff policy is active
// the user may want to disable the backoff policy or
// the backoff policy is not active due of some condition
// for example: Reaching the maximum number of retries and avoid the forever loop
_connectionSettings.RecoveryConfiguration.GetBackOffDelayPolicy().IsActive)
{
try
{
var next = _connectionSettings.RecoveryConfiguration.GetBackOffDelayPolicy().Delay();
Trace.WriteLine(TraceLevel.Information,
$"Trying Recovering connection in {next} milliseconds. Info: {ToString()})");
await Task.Delay(
TimeSpan.FromMilliseconds(next));

await EnsureConnectionAsync();
connected = true;
}
catch (Exception e)
{
Trace.WriteLine(TraceLevel.Warning,
$"Error trying to recover connection {e}. Info: {this}");
}
}

_connectionSettings.RecoveryConfiguration.GetBackOffDelayPolicy().Reset();
var connectionDescription = connected ? "recovered" : "not recovered";
Trace.WriteLine(TraceLevel.Information,
$"Connection {connectionDescription}. Info: {ToString()}");

if (!connected)
{
Trace.WriteLine(TraceLevel.Verbose, $"connection is closed. Info: {ToString()}");
OnNewStatus(State.Closed, new Error()
{
Description =
$"{ConnectionNotRecoveredMessage}, recover status: {_connectionSettings.RecoveryConfiguration}",
ErrorCode = ConnectionNotRecoveredCode
});
return;
}


if (_connectionSettings.RecoveryConfiguration.IsTopologyActive())
{
_recordingTopologyListener.Accept(new Visitor(_management));
Trace.WriteLine(TraceLevel.Information, $"Recovering topology. Info: {ToString()}");
await _recordingTopologyListener.Accept(new Visitor(_management));
}
});
t.WaitAsync(TimeSpan.FromSeconds(10));
return;
}


Trace.WriteLine(TraceLevel.Verbose, $"connection is closed" +
$"{sender} {error} {Status} " +
$"{_nativeConnection!.IsClosed}");
OnNewStatus(Status.Closed, Utils.ConvertError(error));
Trace.WriteLine(TraceLevel.Verbose, $"connection is closed. Info: {ToString()}");
OnNewStatus(State.Closed, Utils.ConvertError(error));
};
}

Expand All @@ -166,12 +215,20 @@ private ClosedCallback MaybeRecoverConnection()

public async Task CloseAsync()
{
OnNewStatus(Status.Closed, null);
_recordingTopologyListener.Clear();
if (State == State.Closed) return;
OnNewStatus(State.Closing, null);
if (_nativeConnection is { IsClosed: false }) await _nativeConnection.CloseAsync();
await _management.CloseAsync();
}

public event IClosable.ChangeStatusCallBack? ChangeStatus;
public event IClosable.LifeCycleCallBack? ChangeState;

public State State { get; private set; } = State.Closed;

public Status Status { get; private set; } = Status.Closed;
public override string ToString()
{
var info = $"AmqpConnection{{ConnectionSettings='{_connectionSettings}', Status='{State.ToString()}'}}";
return info;
}
}
Loading