Skip to content

Commit

Permalink
implement bindings and un-bindings (#19)
Browse files Browse the repository at this point in the history
* implement bindings and unbindings
---------

Signed-off-by: Gabriele Santomaggio <g.santomaggio@gmail.com>
Co-authored-by: Luke Bakken <luke@bakken.io>
  • Loading branch information
Gsantomaggio and lukebakken authored Jul 15, 2024
1 parent 7eb6a19 commit 930385e
Show file tree
Hide file tree
Showing 17 changed files with 662 additions and 60 deletions.
9 changes: 8 additions & 1 deletion .ci/ubuntu/gha-setup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,14 @@ script_dir="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
readonly script_dir
echo "[INFO] script_dir: '$script_dir'"

readonly rabbitmq_image="${RABBITMQ_IMAGE:-pivotalrabbitmq/rabbitmq:main}"
if [[ $3 == 'arm' ]]
then
readonly rabbitmq_image="${RABBITMQ_IMAGE:-pivotalrabbitmq/rabbitmq-arm64:main}"
else
readonly rabbitmq_image="${RABBITMQ_IMAGE:-pivotalrabbitmq/rabbitmq:main}"
fi


readonly docker_name_prefix='rabbitmq-amqp-dotnet-client'
readonly docker_network_name="$docker_name_prefix-network"

Expand Down
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ build:
test: build
dotnet test -c Debug $(CURDIR)/Tests/Tests.csproj --no-build --logger:"console;verbosity=detailed" /p:AltCover=true

rabbitmq-server-start:
./.ci/ubuntu/gha-setup.sh start
rabbitmq-server-start-arm:
./.ci/ubuntu/gha-setup.sh start pull arm

rabbitmq-server-stop:
./.ci/ubuntu/gha-setup.sh stop
Expand Down
39 changes: 32 additions & 7 deletions RabbitMQ.AMQP.Client/IEntities.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,23 @@ public interface IEntityInfo
}

/// <summary>
/// Generic interface for declaring entities
/// Generic interface for declaring entities with result of type T
/// </summary>
/// <typeparam name="T"></typeparam>
public interface IEntityDeclaration<T> where T : IEntityInfo
public interface IEntityInfoDeclaration<T> where T : IEntityInfo
{
Task<T> Declare();
}

public interface IQueueSpecification : IEntityDeclaration<IQueueInfo>
/// <summary>
/// Generic interface for declaring entities without result
/// </summary>
public interface IEntityDeclaration
{
Task Declare();
}

public interface IQueueSpecification : IEntityInfoDeclaration<IQueueInfo>
{
IQueueSpecification Name(string name);
public string Name();
Expand Down Expand Up @@ -44,22 +52,39 @@ public interface IQueueDeletion
Task<IEntityInfo> Delete(string name);
}

public interface IExchangeSpecification : IEntityDeclaration<IExchangeInfo>
public interface IExchangeSpecification : IEntityDeclaration
{
IExchangeSpecification Name(string name);

IExchangeSpecification AutoDelete(bool autoDelete);

IExchangeSpecification Type(ExchangeType type);

IExchangeSpecification Type(string type);
IExchangeSpecification Type(string type); // TODO: Add this

IExchangeSpecification Argument(string key, object value);
}


public interface IExchangeDeletion
{
// TODO consider returning a ExchangeStatus object with some info after deletion
Task<IEntityInfo> Delete(string name);
Task Delete(string name);
}

public interface IBindingSpecification
{
IBindingSpecification SourceExchange(string exchange);

IBindingSpecification DestinationQueue(string queue);

IBindingSpecification DestinationExchange(string exchange);

IBindingSpecification Key(string key);

IBindingSpecification Argument(string key, object value);

IBindingSpecification Arguments(Dictionary<string, object> arguments);

Task Bind();
Task Unbind();
}
3 changes: 0 additions & 3 deletions RabbitMQ.AMQP.Client/IEntitiesInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,3 @@ public enum ExchangeType
HEADERS
}

public interface IExchangeInfo : IEntityInfo
{
}
3 changes: 2 additions & 1 deletion RabbitMQ.AMQP.Client/IManagement.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ public interface IManagement : IClosable

IExchangeDeletion ExchangeDeletion();

IBindingSpecification Binding();

ITopologyListener TopologyListener();
}

170 changes: 170 additions & 0 deletions RabbitMQ.AMQP.Client/Impl/AmqpBindingSpecification.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
using Amqp.Types;

namespace RabbitMQ.AMQP.Client.Impl;

public abstract class BindingSpecificationBase
{
protected string Source = "";
protected string Destination = "";
protected string RoutingKey = "";
protected bool ToQueue = true;
protected Dictionary<string, object> _arguments = new();

protected Map ArgsToMap()
{
Map argMap = new();

foreach ((string key, object value) in _arguments)
{
argMap[key] = value;
}

return argMap;
}
}

public class AmqpBindingSpecification(AmqpManagement management) : BindingSpecificationBase, IBindingSpecification
{
private AmqpManagement Management { get; } = management;

public async Task Bind()
{
var kv = new Map
{
{ "source", Source },
{ "binding_key", RoutingKey },
{ "arguments", ArgsToMap() },
{ ToQueue ? "destination_queue" : "destination_exchange", Destination }
};

await Management.Request(kv, $"/{Consts.Bindings}",
AmqpManagement.Post,
[
AmqpManagement.Code204,
]).ConfigureAwait(false);
}

public async Task Unbind()
{
string destinationCharacter = ToQueue ? "dstq" : "dste";
if (_arguments.Count == 0)
{
string target =
$"/{Consts.Bindings}/src={Utils.EncodePathSegment(Source)};" +
$"{($"{destinationCharacter}={Utils.EncodePathSegment(Destination)};" +
$"key={Utils.EncodePathSegment(RoutingKey)};args=")}";

await Management.Request(
null, target,
AmqpManagement.Delete, new[] { AmqpManagement.Code204 }).ConfigureAwait(false);
}
else
{
string path = BindingsTarget(destinationCharacter, Source, Destination, RoutingKey);
List<Map> bindings = await GetBindings(path).ConfigureAwait(false);
string? uri = MatchBinding(bindings, RoutingKey, ArgsToMap());
if (uri != null)
{
await Management.Request(
null, uri,
AmqpManagement.Delete, new[] { AmqpManagement.Code204 }).ConfigureAwait(false);
}
}
}

public IBindingSpecification SourceExchange(string exchange)
{
ToQueue = false;
Source = exchange;
return this;
}

public IBindingSpecification DestinationQueue(string queue)
{
ToQueue = true;
Destination = queue;
return this;
}

public IBindingSpecification DestinationExchange(string exchange)
{
Destination = exchange;
return this;
}

public IBindingSpecification Key(string key)
{
RoutingKey = key;
return this;
}

public IBindingSpecification Argument(string key, object value)
{
_arguments[key] = value;
return this;
}

public IBindingSpecification Arguments(Dictionary<string, object> arguments)
{
_arguments = arguments;
return this;
}

private string BindingsTarget(
string destinationField, string source, string destination, string key)
{
return "/bindings?src="
+ Utils.EncodeHttpParameter(source)
+ "&"
+ destinationField
+ "="
+ Utils.EncodeHttpParameter(destination)
+ "&key="
+ Utils.EncodeHttpParameter(key);
}

private async Task<List<Map>> GetBindings(string path)
{
var result = await Management.Request(
null, path,
AmqpManagement.Get, new[] { AmqpManagement.Code200 }).ConfigureAwait(false);

if (result.Body is not List list)
{
return [];
}


var l = new List<Map>() { };
foreach (object o in list)
{
if (o is Map item)
{
l.Add(item);
}
}

return l;
}

private string? MatchBinding(List<Map> bindings, string key, Map arguments)
{
string? uri = null;
foreach (Map binding in bindings)
{
string bindingKey = (string)binding["binding_key"];
Map bindingArguments = (Map)binding["arguments"];
if ((key == null && bindingKey == null) || (key != null && key.Equals(bindingKey)))
{
if ((arguments == null && bindingArguments == null) ||
(arguments != null && Utils.CompareMap(arguments, bindingArguments)))
{
uri = binding["location"].ToString();
break;
}
}
}

return uri;
}
}
8 changes: 8 additions & 0 deletions RabbitMQ.AMQP.Client/Impl/AmqpConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public class AmqpConnection : AbstractClosable, IConnection

private readonly AmqpManagement _management = new();
private readonly RecordingTopologyListener _recordingTopologyListener = new();
private readonly TaskCompletionSource<bool> _connectionCloseTaskCompletionSource = new(TaskCreationOptions.RunContinuationsAsynchronously);

private readonly ConnectionSettings _connectionSettings;
internal readonly AmqpSessionManagement NativePubSubSessions;
Expand Down Expand Up @@ -287,6 +288,8 @@ await _recordingTopologyListener.Accept(visitor)
{
_semaphoreClose.Release();
}

_connectionCloseTaskCompletionSource.SetResult(true);
};
}

Expand Down Expand Up @@ -334,6 +337,11 @@ await _management.CloseAsync()
{
_semaphoreClose.Release();
}

await _connectionCloseTaskCompletionSource.Task.WaitAsync(TimeSpan.FromSeconds(10))
.ConfigureAwait(false);

OnNewStatus(State.Closed, null);
}


Expand Down
21 changes: 5 additions & 16 deletions RabbitMQ.AMQP.Client/Impl/AmqpExchangeSpecification.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,6 @@

namespace RabbitMQ.AMQP.Client.Impl;

public class AmqpExchangeInfo : IExchangeInfo
{
}

public class AmqpExchangeSpecification(AmqpManagement management) : IExchangeSpecification
{
private string _name = "";
Expand All @@ -15,7 +11,7 @@ public class AmqpExchangeSpecification(AmqpManagement management) : IExchangeSpe
private string _typeString = ""; // TODO: add this
private readonly Map _arguments = new();

public async Task<IExchangeInfo> Declare()
public async Task Declare()
{
if (string.IsNullOrEmpty(_name))
{
Expand All @@ -34,15 +30,13 @@ public async Task<IExchangeInfo> Declare()
// TODO: encodePathSegment(queues)
// Message request = await management.Request(kv, $"/{Consts.Exchanges}/{_name}",
// for the moment we won't use the message response
await management.Request(kv, $"/{Consts.Exchanges}/{_name}",
await management.Request(kv, $"/{Consts.Exchanges}/{Utils.EncodePathSegment(_name)}",
AmqpManagement.Put,
[
AmqpManagement.Code204,
AmqpManagement.Code201,
AmqpManagement.Code409
]).ConfigureAwait(false);

return new AmqpExchangeInfo();
}

public IExchangeSpecification Name(string name)
Expand Down Expand Up @@ -76,18 +70,13 @@ public IExchangeSpecification Argument(string key, object value)
}
}

public class DefaultExchangeDeletionInfo : IEntityInfo
{
}

public class AmqpExchangeDeletion(AmqpManagement management) : IExchangeDeletion
{
public async Task<IEntityInfo> Delete(string name)
public async Task Delete(string name)
{
await management
.Request(null, $"/{Consts.Exchanges}/{name}", AmqpManagement.Delete, new[] { AmqpManagement.Code204, })
.Request(null, $"/{Consts.Exchanges}/{Utils.EncodePathSegment(name)}", AmqpManagement.Delete,
new[] { AmqpManagement.Code204, })
.ConfigureAwait(false);

return new DefaultExchangeDeletionInfo();
}
}
8 changes: 8 additions & 0 deletions RabbitMQ.AMQP.Client/Impl/AmqpManagement.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ public class AmqpManagement : AbstractClosable, IManagement // TODO: Implement T
internal const int Code204 = 204; // TODO: handle 204
internal const int Code409 = 409;
internal const string Put = "PUT";
internal const string Get = "GET";
internal const string Post = "POST";

internal const string Delete = "DELETE";

private const string ReplyTo = "$me";
Expand Down Expand Up @@ -74,6 +77,11 @@ public IExchangeDeletion ExchangeDeletion()
return new AmqpExchangeDeletion(this);
}

public IBindingSpecification Binding()
{
return new AmqpBindingSpecification(this);
}

public ITopologyListener TopologyListener()
{
return _recordingTopologyListener!;
Expand Down
Loading

0 comments on commit 930385e

Please sign in to comment.