Skip to content

Commit

Permalink
Publisher first version (#9)
Browse files Browse the repository at this point in the history
* Publisher First version
---------

Signed-off-by: Gabriele Santomaggio <g.santomaggio@gmail.com>
  • Loading branch information
Gsantomaggio authored Jun 28, 2024
1 parent 7a43277 commit 73653a9
Show file tree
Hide file tree
Showing 31 changed files with 997 additions and 164 deletions.
14 changes: 13 additions & 1 deletion Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
</PropertyGroup>
<ItemGroup>
<!-- RabbitMQ.Amqp.Client -->
<PackageVersion Include="AMQPNetLite.Core" Version="2.4.10" />
<PackageVersion Include="AMQPNetLite.Core" Version="2.4.11" />
<PackageVersion Include="Microsoft.CodeAnalysis.PublicApiAnalyzers" Version="3.3.4" />
<PackageVersion Include="Microsoft.NET.Test.Sdk" Version="17.5.0" />
<PackageVersion Include="Microsoft.SourceLink.GitHub" Version="1.1.1" />
Expand All @@ -17,4 +17,16 @@
<PackageVersion Include="coverlet.collector" Version="3.2.0" />
<!-- docs/**/*.csproj -->
</ItemGroup>
<ItemGroup Label=".NET 6 Specific" Condition="'$(TargetFramework)' == 'net6.0'">
<!-- RabbitMQ.Amqp.Client -->
<PackageVersion Include="System.Diagnostics.DiagnosticSource" Version="6.0.0" />
</ItemGroup>
<ItemGroup Label=".NET 7 Specific" Condition="'$(TargetFramework)' == 'net7.0'">
<!-- RabbitMQ.Amqp.Client -->
<PackageVersion Include="System.Diagnostics.DiagnosticSource" Version="7.0.0" />
</ItemGroup>
<ItemGroup Label=".NET 8 Specific" Condition="'$(TargetFramework)' == 'net8.0'">
<!-- RabbitMQ.Amqp.Client -->
<PackageVersion Include="System.Diagnostics.DiagnosticSource" Version="8.0.1" />
</ItemGroup>
</Project>
23 changes: 23 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,28 @@
This library is in early stages of development.
It is meant to be used with RabbitMQ 4.0.

## How to Run
- start the broker with `ci/start-broker.sh`
- run the tests with ` dotnet test ./Build.csproj --logger "console;verbosity=detailed" /p:AltCover=true`

## Getting Started
You can find an example in: `docs/Examples/GettingStarted`


## TODO

- [x] Declare queues
- [ ] Declare exchanges
- [ ] Declare bindings
- [x] Simple Publish messages
- [x] Implement backpressure ( atm it is implemented with MaxInflightMessages `MaxInFlight(2000).`)
- [ ] Simple Consume messages
- [ ] Implement metrics ( See `System.Diagnostics.DiagnosticSource` [Link](https://learn.microsoft.com/en-us/dotnet/core/diagnostics/metrics-instrumentation) )
- [x] Recovery connection on connection lost
- [x] Recovery management on connection lost
- [x] Recovery queues on connection lost
- [ ] Recovery publisher on connection lost
- [ ] Recovery consumer on connection lost
- [ ] Docker image to test in LRE
- [ ] Check the TODO in the code

13 changes: 13 additions & 0 deletions RabbitMQ.AMQP.Client/IAddressBuilder.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
namespace RabbitMQ.AMQP.Client;

public class InvalidAddressException(string message) : Exception(message);

public interface IAddressBuilder<out T>
{

T Exchange(string exchange);

T Queue(string queue);

T Key(string key);
}
15 changes: 10 additions & 5 deletions RabbitMQ.AMQP.Client/IClosable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,20 @@ public enum State
Closed,
}

public class Error
public class Error(string? errorCode, string? description)
{
public string? Description { get; internal set; }
public string? ErrorCode { get; internal set; }
public string? Description { get; } = description;
public string? ErrorCode { get; } = errorCode;

public override string ToString()
{
return $"Code: {ErrorCode} - Description: {Description}";
}
}

public interface IClosable
public interface IClosable // TODO: Create an abstract class with the event and the State property
{
public State State { get; }
public State State { get; }

Task CloseAsync();

Expand Down
4 changes: 2 additions & 2 deletions RabbitMQ.AMQP.Client/IConnection.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
namespace RabbitMQ.AMQP.Client;

public class ConnectionException(string? message, Exception? innerException) : Exception(message, innerException);
public class ConnectionException(string? message) : Exception(message);

public interface IConnection : IClosable
public interface IConnection
{
IManagement Management();
Task ConnectAsync();
Expand Down
6 changes: 3 additions & 3 deletions RabbitMQ.AMQP.Client/IEntities.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ public interface IQueueSpecification : IEntityDeclaration<IQueueInfo>
IQueueSpecification Arguments(Dictionary<object, object> arguments);

public Dictionary<object, object> Arguments();

IQueueSpecification Type(QueueType type);

public QueueType Type();

// IQuorumQueueSpecification Quorum();
}

Expand Down
2 changes: 1 addition & 1 deletion RabbitMQ.AMQP.Client/IManagement.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ public interface IManagement : IClosable
IQueueSpecification Queue(string name);

IQueueDeletion QueueDeletion();

ITopologyListener TopologyListener();
}

4 changes: 1 addition & 3 deletions RabbitMQ.AMQP.Client/IMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@ namespace RabbitMQ.AMQP.Client;

public interface IMessage
{
IMessage Body(object body);
object Body();

public object Body();
// properties
string MessageId();
IMessage MessageId(string id);
Expand Down
26 changes: 26 additions & 0 deletions RabbitMQ.AMQP.Client/IPublisher.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
namespace RabbitMQ.AMQP.Client;

public class PublisherException(string message) : Exception(message);

public enum OutcomeState
{
Accepted,
Failed,
}

public class OutcomeDescriptor(ulong code, string description, OutcomeState state, Error? error)
{
public OutcomeState State { get; internal set; } = state;
public ulong Code { get; internal set; } = code;
public string Description { get; internal set; } = description;

public Error? Error { get; internal set; } = error;
}

public delegate void OutcomeDescriptorCallback(IMessage message, OutcomeDescriptor outcomeDescriptor);

public interface IPublisher : IClosable
{
Task Publish(IMessage message,
OutcomeDescriptorCallback outcomeCallback); // TODO: Add CancellationToken and callBack
}
9 changes: 9 additions & 0 deletions RabbitMQ.AMQP.Client/IPublisherBuilder.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
namespace RabbitMQ.AMQP.Client;

public interface IPublisherBuilder : IAddressBuilder<IPublisherBuilder>
{
IPublisherBuilder PublishTimeout(TimeSpan timeout);

IPublisherBuilder MaxInflightMessages(int maxInFlight);
IPublisher Build();
}
12 changes: 6 additions & 6 deletions RabbitMQ.AMQP.Client/IRecoveryConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public interface IRecoveryConfiguration
/// <param name="activated"></param>
/// <returns></returns>
IRecoveryConfiguration Activated(bool activated);

bool IsActivate();

/// <summary>
Expand All @@ -32,7 +32,7 @@ public interface IRecoveryConfiguration
/// <param name="activated"></param>
/// <returns></returns>
IRecoveryConfiguration Topology(bool activated);

bool IsTopologyActive();

}
Expand All @@ -41,23 +41,23 @@ public interface IRecoveryConfiguration
/// Interface for the backoff delay policy.
/// Used during the recovery of the connection.
/// </summary>
public interface IBackOffDelayPolicy
public interface IBackOffDelayPolicy
{
/// <summary>
/// Get the next delay in milliseconds.
/// </summary>
/// <returns></returns>
int Delay();

/// <summary>
/// Reset the backoff delay policy.
/// </summary>
void Reset();

/// <summary>
/// Define if the backoff delay policy is active.
/// Can be used to disable the backoff delay policy after a certain number of retries.
/// or when the user wants to disable the backoff delay policy.
/// </summary>
bool IsActive { get; }
bool IsActive();
}
4 changes: 2 additions & 2 deletions RabbitMQ.AMQP.Client/ITopologyListener.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ public interface ITopologyListener
void QueueDeclared(IQueueSpecification specification);

void QueueDeleted(string name);

void Clear();

int QueueCount();
}
27 changes: 27 additions & 0 deletions RabbitMQ.AMQP.Client/Impl/AbstractClosable.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
namespace RabbitMQ.AMQP.Client.Impl;

public class AmqpClosedException(string message) : Exception(message);

public abstract class AbstractClosable : IClosable
{
public State State { get; internal set; } = State.Closed;
public abstract Task CloseAsync();
protected void ThrowIfClosed()
{
if (State == State.Closed)
{
throw new AmqpClosedException(GetType().Name);
}
}


protected void OnNewStatus(State newState, Error? error)
{
if (State == newState) return;
var oldStatus = State;
State = newState;
ChangeState?.Invoke(this, oldStatus, newState, error);
}

public event IClosable.LifeCycleCallBack? ChangeState;
}
Loading

0 comments on commit 73653a9

Please sign in to comment.