Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IEnvironment XML doc #105

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 17 additions & 13 deletions RabbitMQ.AMQP.Client/IEnvironment.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,38 +2,42 @@
// and the Mozilla Public License, version 2.0.
// Copyright (c) 2017-2024 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.

using System.Collections.ObjectModel;
using System.Threading.Tasks;

namespace RabbitMQ.AMQP.Client
{
/// <summary>
/// Interface to create IConnections and manage them.
/// <para>
/// The <see cref="IEnvironment"/> is the main entry point to a node or a cluster of nodes.
/// </para>
/// <para>
/// The <see cref="CreateConnectionAsync()"/> method allows creating <see cref="IConnection"/> instances.
/// An application is expected to maintain a single <see cref="IEnvironment"/> instance and to close that instance
/// upon application exit.
/// </para>
/// <para>
/// <see cref="IEnvironment"/> instances are expected to be thread-safe.
/// </para>
/// </summary>
public interface IEnvironment
{
/// <summary>
/// Create a new connection with the given connection settings.
/// Create a new <see cref="IConnection"/> with the given connection settings.
/// </summary>
/// <param name="connectionSettings"></param>
/// <returns>IConnection</returns>
/// <returns><see cref="Task{IConnection}"/> instance.</returns>
public Task<IConnection> CreateConnectionAsync(ConnectionSettings connectionSettings);

/// <summary>
/// Create a new connection with the default connection settings.
/// Create a new <see cref="IConnection"/> with the default connection settings.
/// </summary>
/// <returns>IConnection</returns>
/// <returns><see cref="Task{IConnection}"/> instance.</returns>
public Task<IConnection> CreateConnectionAsync();

/// <summary>
/// Get all connections.
/// Close this environment and its resources.
/// </summary>
public ReadOnlyCollection<IConnection> GetConnections();

/// <summary>
/// Close all connections.
/// </summary>
/// <returns></returns>
/// <returns><see cref="Task"/></returns>
// TODO cancellation token
Task CloseAsync();
}
Expand Down
37 changes: 32 additions & 5 deletions RabbitMQ.AMQP.Client/Impl/AmqpEnvironment.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,21 @@
// Copyright (c) 2017-2024 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.

using System.Collections.Concurrent;
using System.Collections.ObjectModel;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace RabbitMQ.AMQP.Client.Impl
{
/// <summary>
/// <para>
/// <see cref="AmqpEnvironment"/> is the implementation of <see cref="IEnvironment"/>.
/// </para>
/// <para>
/// The <see cref="CreateConnectionAsync()"/> method allows creating <see cref="IConnection"/> instances.
/// </para>
/// </summary>
public class AmqpEnvironment : IEnvironment
{
private ConnectionSettings ConnectionSettings { get; }
Expand All @@ -23,12 +31,24 @@ private AmqpEnvironment(ConnectionSettings connectionSettings, IMetricsReporter?
_metricsReporter = metricsReporter;
}

// TODO to play nicely with IoC containers, we should not have static Create methods
/// <summary>
/// Create a new <see cref="IEnvironment"/> instance, using the provided <see cref="ConnectionSettings"/>
/// and optional <see cref="IMetricsReporter"/>
/// </summary>
/// <param name="connectionSettings"></param>
/// <param name="metricsReporter"></param>
/// <returns><see cref="IEnvironment"/> instance.</returns>
public static IEnvironment Create(ConnectionSettings connectionSettings, IMetricsReporter? metricsReporter = default)
{
// TODO to play nicely with IoC containers, we should not have static Create methods
return new AmqpEnvironment(connectionSettings, metricsReporter);
}

/// <summary>
/// Create a new <see cref="IConnection"/> instance, using the provided <see cref="ConnectionSettings"/>.
/// </summary>
/// <param name="connectionSettings"></param>
/// <returns><see cref="Task{IConnection}"/> instance.</returns>
public async Task<IConnection> CreateConnectionAsync(ConnectionSettings connectionSettings)
{
IConnection c = await AmqpConnection.CreateAsync(connectionSettings, _metricsReporter).ConfigureAwait(false);
Expand All @@ -49,6 +69,10 @@ public async Task<IConnection> CreateConnectionAsync(ConnectionSettings connecti
return c;
}

/// <summary>
/// Create a new <see cref="IConnection"/> instance, using the <see cref="IEnvironment"/> <see cref="ConnectionSettings"/>.
/// </summary>
/// <returns><see cref="Task{IConnection}"/> instance.</returns>
public Task<IConnection> CreateConnectionAsync()
{
if (ConnectionSettings is null)
Expand All @@ -59,13 +83,16 @@ public Task<IConnection> CreateConnectionAsync()
return CreateConnectionAsync(ConnectionSettings);
}

public ReadOnlyCollection<IConnection> GetConnections() =>
new(_connections.Values.ToList());

/// <summary>
/// Close this environment and its resources.
/// </summary>
/// <returns><see cref="Task"/></returns>
// TODO cancellation token
public Task CloseAsync()
{
return Task.WhenAll(_connections.Values.Select(c => c.CloseAsync()));
}

internal IList<IConnection> Connections => _connections.Values.ToList();
}
}
2 changes: 0 additions & 2 deletions RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,6 @@ RabbitMQ.AMQP.Client.IEnvironment
RabbitMQ.AMQP.Client.IEnvironment.CloseAsync() -> System.Threading.Tasks.Task!
RabbitMQ.AMQP.Client.IEnvironment.CreateConnectionAsync() -> System.Threading.Tasks.Task<RabbitMQ.AMQP.Client.IConnection!>!
RabbitMQ.AMQP.Client.IEnvironment.CreateConnectionAsync(RabbitMQ.AMQP.Client.ConnectionSettings! connectionSettings) -> System.Threading.Tasks.Task<RabbitMQ.AMQP.Client.IConnection!>!
RabbitMQ.AMQP.Client.IEnvironment.GetConnections() -> System.Collections.ObjectModel.ReadOnlyCollection<RabbitMQ.AMQP.Client.IConnection!>!
RabbitMQ.AMQP.Client.IExchangeSpecification
RabbitMQ.AMQP.Client.IExchangeSpecification.Argument(string! key, object! value) -> RabbitMQ.AMQP.Client.IExchangeSpecification!
RabbitMQ.AMQP.Client.IExchangeSpecification.Arguments(System.Collections.Generic.Dictionary<string!, object!>! arguments) -> RabbitMQ.AMQP.Client.IExchangeSpecification!
Expand Down Expand Up @@ -361,7 +360,6 @@ RabbitMQ.AMQP.Client.Impl.AmqpEnvironment
RabbitMQ.AMQP.Client.Impl.AmqpEnvironment.CloseAsync() -> System.Threading.Tasks.Task!
RabbitMQ.AMQP.Client.Impl.AmqpEnvironment.CreateConnectionAsync() -> System.Threading.Tasks.Task<RabbitMQ.AMQP.Client.IConnection!>!
RabbitMQ.AMQP.Client.Impl.AmqpEnvironment.CreateConnectionAsync(RabbitMQ.AMQP.Client.ConnectionSettings! connectionSettings) -> System.Threading.Tasks.Task<RabbitMQ.AMQP.Client.IConnection!>!
RabbitMQ.AMQP.Client.Impl.AmqpEnvironment.GetConnections() -> System.Collections.ObjectModel.ReadOnlyCollection<RabbitMQ.AMQP.Client.IConnection!>!
RabbitMQ.AMQP.Client.Impl.AmqpExchangeSpecification
RabbitMQ.AMQP.Client.Impl.AmqpExchangeSpecification.AmqpExchangeSpecification(RabbitMQ.AMQP.Client.Impl.AmqpManagement! management) -> void
RabbitMQ.AMQP.Client.Impl.AmqpExchangeSpecification.Argument(string! key, object! value) -> RabbitMQ.AMQP.Client.IExchangeSpecification!
Expand Down
6 changes: 4 additions & 2 deletions Tests/ClusterTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,17 @@ public async Task CreateConnectionWithEnvironmentAndMultipleUris()
ConnectionSettings connectionSettings = connectionSettingBuilder.Build();

IEnvironment env = AmqpEnvironment.Create(connectionSettings);
var amqpEnv = (AmqpEnvironment)env;

// Note: by using _connection, the test will dispose the object on teardown
_connection = await env.CreateConnectionAsync();
Assert.NotNull(_connection);
Assert.NotEmpty(env.GetConnections());

Assert.NotEmpty(amqpEnv.Connections);

await env.CloseAsync();

Assert.Equal(State.Closed, _connection.State);
Assert.Empty(env.GetConnections());
Assert.Empty(amqpEnv.Connections);
}
}
40 changes: 25 additions & 15 deletions Tests/EnvironmentTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,82 +18,92 @@ public class EnvironmentTests(ITestOutputHelper testOutputHelper)
public async Task CreateAConnectionWithEnvironment()
{
IEnvironment env = AmqpEnvironment.Create(ConnectionSettingsBuilder.Create().Build());
var amqpEnv = (AmqpEnvironment)env;

IConnection connection = await env.CreateConnectionAsync();

Assert.NotNull(connection);
Assert.NotEmpty(env.GetConnections());
Assert.NotEmpty(amqpEnv.Connections);
await env.CloseAsync();
Assert.Equal(State.Closed, connection.State);
Assert.Empty(env.GetConnections());
Assert.Empty(amqpEnv.Connections);
}

[Fact]
public async Task CreateMoreConnectionsWithDifferentParametersEnvironment()
{
string envConnectionName = "EnvironmentConnection_" + Guid.NewGuid();

IEnvironment env = AmqpEnvironment.Create(
ConnectionSettingsBuilder.Create().ContainerId(envConnectionName).Build());
var amqpEnv = (AmqpEnvironment)env;

IConnection connection = await env.CreateConnectionAsync();

Assert.NotNull(connection);
await WaitUntilConnectionIsOpen(envConnectionName);
Assert.NotEmpty(env.GetConnections());
Assert.Single(env.GetConnections());
Assert.NotEmpty(amqpEnv.Connections);
Assert.Single(amqpEnv.Connections);

string envConnectionName2 = "EnvironmentConnection2_" + Guid.NewGuid();

IConnection connection2 = await env.CreateConnectionAsync(
ConnectionSettingsBuilder.Create().ContainerId(envConnectionName2).Build());
Assert.NotNull(connection2);
Assert.Equal(2, env.GetConnections().Count);
Assert.Equal(2, amqpEnv.Connections.Count);
await WaitUntilConnectionIsOpen(envConnectionName2);

await env.CloseAsync();
Assert.Equal(State.Closed, connection.State);
Assert.Equal(State.Closed, connection2.State);
Assert.Empty(env.GetConnections());
Assert.Empty(amqpEnv.Connections);
}

[Fact]
public async Task CloseConnectionsIndividually()
{
string envConnectionName = "EnvironmentConnection_" + Guid.NewGuid();

IEnvironment env = AmqpEnvironment.Create(
ConnectionSettingsBuilder.Create().ContainerId(envConnectionName).Build());
var amqpEnv = (AmqpEnvironment)env;

IConnection connection = await env.CreateConnectionAsync();

await WaitUntilConnectionIsOpen(envConnectionName);
Assert.Single(env.GetConnections());
Assert.Equal(1, env.GetConnections()[0].Id);
Assert.Single(amqpEnv.Connections);
Assert.Equal(1, amqpEnv.Connections[0].Id);

string envConnectionName2 = "EnvironmentConnection2_" + Guid.NewGuid().ToString();
IConnection connection2 = await env.CreateConnectionAsync(
ConnectionSettingsBuilder.Create().ContainerId(envConnectionName2).Build());
Assert.Equal(2, env.GetConnections().Count);
Assert.Equal(2, env.GetConnections()[1].Id);
Assert.Equal(2, amqpEnv.Connections.Count);
Assert.Equal(2, amqpEnv.Connections[1].Id);
await WaitUntilConnectionIsOpen(envConnectionName2);

string envConnectionName3 = "EnvironmentConnection3_" + Guid.NewGuid().ToString();
IConnection connection3 = await env.CreateConnectionAsync(
ConnectionSettingsBuilder.Create().ContainerId(envConnectionName3).Build());
Assert.Equal(3, env.GetConnections().Count);
Assert.Equal(3, env.GetConnections()[2].Id);
Assert.Equal(3, amqpEnv.Connections.Count);
Assert.Equal(3, amqpEnv.Connections[2].Id);
await WaitUntilConnectionIsOpen(envConnectionName3);

// closing
await connection.CloseAsync();
Assert.Equal(State.Closed, connection.State);
Assert.Equal(2, env.GetConnections().Count);
Assert.Equal(2, amqpEnv.Connections.Count);

await WaitUntilConnectionIsClosed(envConnectionName);
await connection2.CloseAsync();
Assert.Equal(State.Closed, connection2.State);
Assert.Single(env.GetConnections());
Assert.Single(amqpEnv.Connections);
await WaitUntilConnectionIsClosed(envConnectionName2);

await connection3.CloseAsync();
Assert.Equal(State.Closed, connection3.State);
await WaitUntilConnectionIsClosed(envConnectionName3);

Assert.Empty(env.GetConnections());
Assert.Empty(amqpEnv.Connections);
await env.CloseAsync();
}
}
Loading