Skip to content

Commit

Permalink
Use EasyNetQ.Management.Client
Browse files Browse the repository at this point in the history
* Allow specifying `MaxFrameSize` for a connection.
* Move HTTP API interaction to `EasyNetQ.Management.Client`

* No need to start toxiproxy yet

* Use `uint.MinValue` to mean `unlimited` for max frame size
*

Implement the Environment class to manage the connections (#36)

* Implement the Environment Closes #35
---------

Signed-off-by: Gabriele Santomaggio <g.santomaggio@gmail.com>

Use `EasyNetQ.Management.Client`

* Allow specifying `MaxFrameSize` for a connection.
* Move HTTP API interaction to `EasyNetQ.Management.Client`

* No need to start toxiproxy yet

* Use `uint.MinValue` to mean `unlimited` for max frame size
*

* Add to public API

Continue migrating to EasyNetQ.Management.Client
  • Loading branch information
lukebakken committed Jul 26, 2024
1 parent 955c730 commit d36a7b4
Show file tree
Hide file tree
Showing 12 changed files with 160 additions and 96 deletions.
10 changes: 5 additions & 5 deletions .ci/ubuntu/gha-setup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -46,18 +46,18 @@ else
readonly docker_pull_args=''
fi

set -o nounset

declare -r rabbitmq_docker_name="$docker_name_prefix-rabbitmq"
declare -r toxiproxy_docker_name="$docker_name_prefix-toxiproxy"

if [[ $1 == 'stop' ]]
then
docker stop "$rabbitmq_docker_name"
docker stop "$toxiproxy_docker_name"
exit 0
fi

set -o nounset

declare -r rabbitmq_docker_name="$docker_name_prefix-rabbitmq"
declare -r toxiproxy_docker_name="$docker_name_prefix-toxiproxy"

function start_toxiproxy
{
if [[ $run_toxiproxy == 'true' ]]
Expand Down
11 changes: 7 additions & 4 deletions .github/workflows/build-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,17 @@ jobs:
run: dotnet format ${{ github.workspace }}/Build.csproj --no-restore --verify-no-changes
- name: Start RabbitMQ
id: start-rabbitmq
run: ${{ github.workspace }}/.ci/ubuntu/gha-setup.sh toxiproxy
# Note: not using toxiproxy yet
# run: ${{ github.workspace }}/.ci/ubuntu/gha-setup.sh toxiproxy
run: ${{ github.workspace }}/.ci/ubuntu/gha-setup.sh
- name: Test
run: dotnet test ${{ github.workspace }}/Build.csproj --no-restore --no-build --logger "console;verbosity=detailed" /p:AltCover=true /p:AltCoverStrongNameKey=${{github.workspace}}/rabbit.snk
- name: Check for errors in RabbitMQ logs
run: ${{ github.workspace}}/.ci/ubuntu/gha-log-check.sh
- name: Maybe collect toxiproxy logs
if: failure()
run: docker logs rabbitmq-amqp-dotnet-client-toxiproxy > ${{ github.workspace }}/.ci/ubuntu/log/toxiproxy.log
# Note: not using toxiproxy yet
# - name: Maybe collect toxiproxy logs
# if: failure()
# run: docker logs rabbitmq-amqp-dotnet-client-toxiproxy > ${{ github.workspace }}/.ci/ubuntu/log/toxiproxy.log
- name: Maybe upload RabbitMQ logs
if: failure()
uses: actions/upload-artifact@v4
Expand Down
1 change: 1 addition & 0 deletions RabbitMQ.AMQP.Client/IConnectionSettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ public interface IConnectionSettings : IEquatable<IConnectionSettings>
string ConnectionName { get; }
string Path { get; }
bool UseSsl { get; }
uint MaxFrameSize { get; }
SaslMechanism SaslMechanism { get; }
ITlsSettings? TlsSettings { get; }

Expand Down
11 changes: 10 additions & 1 deletion RabbitMQ.AMQP.Client/Impl/AmqpConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -182,9 +182,18 @@ private async Task EnsureConnection()
var open = new Open
{
HostName = $"vhost:{_connectionSettings.VirtualHost}",
Properties = new Fields() { [new Symbol("connection_name")] = _connectionSettings.ConnectionName, }
Properties = new Fields()
{
[new Symbol("connection_name")] = _connectionSettings.ConnectionName,
}
};

if (_connectionSettings.MaxFrameSize > uint.MinValue)
{
// Note: when set here, there is no need to set cf.AMQP.MaxFrameSize
open.MaxFrameSize = _connectionSettings.MaxFrameSize;
}

void onOpened(Amqp.IConnection connection, Open open1)
{
Trace.WriteLine(TraceLevel.Verbose, $"Connection opened. Info: {ToString()}");
Expand Down
37 changes: 30 additions & 7 deletions RabbitMQ.AMQP.Client/Impl/ConnectionSettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ public class ConnectionSettingBuilder
private string _scheme = "AMQP";
private string _connectionName = "AMQP.NET";
private string _virtualHost = "/";
private uint _maxFrameSize = Consts.DefaultMaxFrameSize;
private SaslMechanism _saslMechanism = Client.SaslMechanism.Plain;
private IRecoveryConfiguration _recoveryConfiguration = Impl.RecoveryConfiguration.Create();

Expand Down Expand Up @@ -69,6 +70,17 @@ public ConnectionSettingBuilder VirtualHost(string virtualHost)
return this;
}

public ConnectionSettingBuilder MaxFrameSize(uint maxFrameSize)
{
_maxFrameSize = maxFrameSize;
if (_maxFrameSize != uint.MinValue && _maxFrameSize < 512)
{
throw new ArgumentOutOfRangeException(nameof(maxFrameSize),
"maxFrameSize must be greater or equal to 512");
}
return this;
}

public ConnectionSettingBuilder SaslMechanism(SaslMechanism saslMechanism)
{
_saslMechanism = saslMechanism;
Expand All @@ -89,9 +101,9 @@ public ConnectionSettingBuilder RecoveryConfiguration(IRecoveryConfiguration rec

public ConnectionSettings Build()
{
var c = new ConnectionSettings(_host, _port, _user,
var c = new ConnectionSettings(_scheme, _host, _port, _user,
_password, _virtualHost,
_scheme, _connectionName, _saslMechanism)
_connectionName, _saslMechanism, _maxFrameSize)
{
Recovery = (RecoveryConfiguration)_recoveryConfiguration
};
Expand All @@ -106,8 +118,9 @@ public ConnectionSettings Build()
public class ConnectionSettings : IConnectionSettings
{
private readonly Address _address;
private readonly string _connectionName = "";
private readonly string _virtualHost = "/";
private readonly string _connectionName = "";
private readonly uint _maxFrameSize = Consts.DefaultMaxFrameSize;
private readonly ITlsSettings? _tlsSettings;
private readonly SaslMechanism _saslMechanism = SaslMechanism.Plain;

Expand All @@ -122,17 +135,27 @@ public ConnectionSettings(string address, ITlsSettings? tlsSettings = null)
}
}

public ConnectionSettings(string host, int port,
public ConnectionSettings(string scheme, string host, int port,
string? user, string? password,
string virtualHost, string scheme, string connectionName,
SaslMechanism saslMechanism, ITlsSettings? tlsSettings = null)
string virtualHost, string connectionName,
SaslMechanism saslMechanism,
uint maxFrameSize = Consts.DefaultMaxFrameSize,
ITlsSettings? tlsSettings = null)
{
_address = new Address(host: host, port: port,
user: user, password: password,
path: "/", scheme: scheme);
_connectionName = connectionName;
_virtualHost = virtualHost;
_saslMechanism = saslMechanism;

_maxFrameSize = maxFrameSize;
if (_maxFrameSize != uint.MinValue && _maxFrameSize < 512)
{
throw new ArgumentOutOfRangeException(nameof(maxFrameSize),
"maxFrameSize must be greater or equal to 512");
}

_tlsSettings = tlsSettings;

if (_address.UseSsl && _tlsSettings == null)
Expand All @@ -150,8 +173,8 @@ public ConnectionSettings(string host, int port,
public string ConnectionName => _connectionName;
public string Path => _address.Path;
public bool UseSsl => _address.UseSsl;
public uint MaxFrameSize => _maxFrameSize;
public SaslMechanism SaslMechanism => _saslMechanism;

public ITlsSettings? TlsSettings => _tlsSettings;
public IRecoveryConfiguration Recovery { get; init; } = RecoveryConfiguration.Create();

Expand Down
4 changes: 4 additions & 0 deletions RabbitMQ.AMQP.Client/Impl/Consts.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,8 @@ public class Consts
public const string Queues = "queues";
public const string Bindings = "bindings";

/// <summary>
/// <code>uint.MinValue</code> means "no limit"
/// </summary>
public const uint DefaultMaxFrameSize = uint.MinValue; // NOTE: Azure/amqpnetlite uses 256 * 1024
}
6 changes: 5 additions & 1 deletion RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#nullable enable
abstract RabbitMQ.AMQP.Client.Impl.AbstractLifeCycle.CloseAsync() -> System.Threading.Tasks.Task!
const RabbitMQ.AMQP.Client.Impl.Consts.Bindings = "bindings" -> string!
const RabbitMQ.AMQP.Client.Impl.Consts.DefaultMaxFrameSize = 0 -> uint
const RabbitMQ.AMQP.Client.Impl.Consts.Exchanges = "exchanges" -> string!
const RabbitMQ.AMQP.Client.Impl.Consts.Key = "key" -> string!
const RabbitMQ.AMQP.Client.Impl.Consts.Queues = "queues" -> string!
Expand Down Expand Up @@ -80,6 +81,7 @@ RabbitMQ.AMQP.Client.IConnection.PublisherBuilder() -> RabbitMQ.AMQP.Client.IPub
RabbitMQ.AMQP.Client.IConnectionSettings
RabbitMQ.AMQP.Client.IConnectionSettings.ConnectionName.get -> string!
RabbitMQ.AMQP.Client.IConnectionSettings.Host.get -> string!
RabbitMQ.AMQP.Client.IConnectionSettings.MaxFrameSize.get -> uint
RabbitMQ.AMQP.Client.IConnectionSettings.Password.get -> string?
RabbitMQ.AMQP.Client.IConnectionSettings.Path.get -> string!
RabbitMQ.AMQP.Client.IConnectionSettings.Port.get -> int
Expand Down Expand Up @@ -328,6 +330,7 @@ RabbitMQ.AMQP.Client.Impl.ConnectionSettingBuilder
RabbitMQ.AMQP.Client.Impl.ConnectionSettingBuilder.Build() -> RabbitMQ.AMQP.Client.Impl.ConnectionSettings!
RabbitMQ.AMQP.Client.Impl.ConnectionSettingBuilder.ConnectionName(string! connectionName) -> RabbitMQ.AMQP.Client.Impl.ConnectionSettingBuilder!
RabbitMQ.AMQP.Client.Impl.ConnectionSettingBuilder.Host(string! host) -> RabbitMQ.AMQP.Client.Impl.ConnectionSettingBuilder!
RabbitMQ.AMQP.Client.Impl.ConnectionSettingBuilder.MaxFrameSize(uint maxFrameSize) -> RabbitMQ.AMQP.Client.Impl.ConnectionSettingBuilder!
RabbitMQ.AMQP.Client.Impl.ConnectionSettingBuilder.Password(string! password) -> RabbitMQ.AMQP.Client.Impl.ConnectionSettingBuilder!
RabbitMQ.AMQP.Client.Impl.ConnectionSettingBuilder.Port(int port) -> RabbitMQ.AMQP.Client.Impl.ConnectionSettingBuilder!
RabbitMQ.AMQP.Client.Impl.ConnectionSettingBuilder.RecoveryConfiguration(RabbitMQ.AMQP.Client.IRecoveryConfiguration! recoveryConfiguration) -> RabbitMQ.AMQP.Client.Impl.ConnectionSettingBuilder!
Expand All @@ -338,10 +341,11 @@ RabbitMQ.AMQP.Client.Impl.ConnectionSettingBuilder.VirtualHost(string! virtualHo
RabbitMQ.AMQP.Client.Impl.ConnectionSettings
RabbitMQ.AMQP.Client.Impl.ConnectionSettings.ConnectionName.get -> string!
RabbitMQ.AMQP.Client.Impl.ConnectionSettings.ConnectionSettings(string! address, RabbitMQ.AMQP.Client.ITlsSettings? tlsSettings = null) -> void
RabbitMQ.AMQP.Client.Impl.ConnectionSettings.ConnectionSettings(string! host, int port, string? user, string? password, string! virtualHost, string! scheme, string! connectionName, RabbitMQ.AMQP.Client.SaslMechanism! saslMechanism, RabbitMQ.AMQP.Client.ITlsSettings? tlsSettings = null) -> void
RabbitMQ.AMQP.Client.Impl.ConnectionSettings.ConnectionSettings(string! scheme, string! host, int port, string? user, string? password, string! virtualHost, string! connectionName, RabbitMQ.AMQP.Client.SaslMechanism! saslMechanism, uint maxFrameSize = 0, RabbitMQ.AMQP.Client.ITlsSettings? tlsSettings = null) -> void
RabbitMQ.AMQP.Client.Impl.ConnectionSettings.Equals(RabbitMQ.AMQP.Client.IConnectionSettings? other) -> bool
RabbitMQ.AMQP.Client.Impl.ConnectionSettings.Equals(RabbitMQ.AMQP.Client.Impl.ConnectionSettings! other) -> bool
RabbitMQ.AMQP.Client.Impl.ConnectionSettings.Host.get -> string!
RabbitMQ.AMQP.Client.Impl.ConnectionSettings.MaxFrameSize.get -> uint
RabbitMQ.AMQP.Client.Impl.ConnectionSettings.Password.get -> string?
RabbitMQ.AMQP.Client.Impl.ConnectionSettings.Path.get -> string!
RabbitMQ.AMQP.Client.Impl.ConnectionSettings.Port.get -> int
Expand Down
9 changes: 6 additions & 3 deletions Tests/BindingsTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ await management.Binding().SourceExchange(sourceExchange).DestinationQueue(queue
await management.QueueDeletion().Delete(queueDestination);
await connection.CloseAsync();
SystemUtils.WaitUntil(() => !SystemUtils.ExchangeExists(sourceExchange));
SystemUtils.WaitUntil(() => !SystemUtils.QueueExists(queueDestination));

await SystemUtils.WaitUntilQueueDeletedAsync(queueDestination);
}

[Fact]
Expand Down Expand Up @@ -75,8 +76,9 @@ await management.Binding().SourceExchange("exchange_bind_two_times").Destination

await management.QueueDeletion().Delete("queue_bind_two_times");
await connection.CloseAsync();

SystemUtils.WaitUntil(() => !SystemUtils.ExchangeExists("exchange_bind_two_times"));
SystemUtils.WaitUntil(() => !SystemUtils.QueueExists("queue_bind_two_times"));
await SystemUtils.WaitUntilQueueDeletedAsync("queue_bind_two_times");
}


Expand Down Expand Up @@ -154,8 +156,9 @@ await management.Binding().SourceExchange("exchange_bindings_with_arguments")
await management.ExchangeDeletion().Delete("exchange_bindings_with_arguments");
await management.QueueDeletion().Delete("queue_bindings_with_arguments");
await connection.CloseAsync();

SystemUtils.WaitUntil(() => !SystemUtils.ExchangeExists("exchange_bindings_with_arguments"));
SystemUtils.WaitUntil(() => !SystemUtils.QueueExists("queue_bindings_with_arguments"));
await SystemUtils.WaitUntilQueueDeletedAsync("queue_bindings_with_arguments");
}

// TODO: test with multi-bindings with parameters with list as value
Expand Down
11 changes: 8 additions & 3 deletions Tests/ConnectionRecoveryTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -232,10 +232,13 @@ public async Task RecoveryTopologyShouldRecoverTheTempQueues()
await SystemUtils.WaitUntilConnectionIsKilled(connectionName);
await completion.Task.WaitAsync(TimeSpan.FromSeconds(10));
SystemUtils.WaitUntil(() => recoveryEvents == 2);
SystemUtils.WaitUntil(() => SystemUtils.QueueExists(queueName));

await SystemUtils.WaitUntilQueueExistsAsync(queueName);

await connection.CloseAsync();
SystemUtils.WaitUntil(() => !SystemUtils.QueueExists(queueName));

await SystemUtils.WaitUntilQueueDeletedAsync(queueName);

TestOutputHelper.WriteLine(
$"Recover: Queue count: {management.TopologyListener().QueueCount()} , events: {recoveryEvents}");
Assert.Equal(0, management.TopologyListener().QueueCount());
Expand Down Expand Up @@ -277,7 +280,9 @@ public async Task RecoveryTopologyShouldNotRecoverTheTempQueues()

await SystemUtils.WaitUntilConnectionIsKilled(connectionName);
await completion.Task.WaitAsync(TimeSpan.FromSeconds(10));
SystemUtils.WaitUntil(() => SystemUtils.QueueExists(queueName) == false);

await SystemUtils.WaitUntilQueueDeletedAsync(queueName);

await connection.CloseAsync();
Assert.Equal(0, management.TopologyListener().QueueCount());
TestOutputHelper.WriteLine(
Expand Down
12 changes: 6 additions & 6 deletions Tests/ConnectionTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ public class ConnectionTests(ITestOutputHelper output)
[Fact]
public void ValidateAddress()
{
ConnectionSettings connectionSettings = new("localhost", 5672, "guest-user",
"guest-password", "vhost_1", "amqp1", "connection_name", SaslMechanism.External);
ConnectionSettings connectionSettings = new("amqp1", "localhost", 5672, "guest-user",
"guest-password", "vhost_1", "connection_name", SaslMechanism.External);
Assert.Equal("localhost", connectionSettings.Host);
Assert.Equal(5672, connectionSettings.Port);
Assert.Equal("guest-user", connectionSettings.User);
Expand All @@ -23,13 +23,13 @@ public void ValidateAddress()
Assert.Equal("amqp1", connectionSettings.Scheme);
Assert.Equal(SaslMechanism.External, connectionSettings.SaslMechanism);

ConnectionSettings second = new("localhost", 5672, "guest-user",
"guest-password", "path/", "amqp1", "connection_name", SaslMechanism.External);
ConnectionSettings second = new("amqp1", "localhost", 5672, "guest-user",
"guest-password", "path/", "connection_name", SaslMechanism.External);

Assert.Equal(connectionSettings, second);

ConnectionSettings third = new("localhost", 5672, "guest-user",
"guest-password", "path/", "amqp2", "connection_name", SaslMechanism.Plain);
ConnectionSettings third = new("amqp2", "localhost", 5672, "guest-user",
"guest-password", "path/", "connection_name", SaslMechanism.Plain);

Assert.NotEqual(connectionSettings, third);
}
Expand Down
Loading

0 comments on commit d36a7b4

Please sign in to comment.