diff --git a/sandbox/ConsoleApp/Program.cs b/sandbox/ConsoleApp/Program.cs index f3850d5..c4e901f 100644 --- a/sandbox/ConsoleApp/Program.cs +++ b/sandbox/ConsoleApp/Program.cs @@ -22,6 +22,9 @@ // create connection(default, connect to nats://localhost:4222) +// var conn = new NatsConnectionPool(1).GetConnection(); + + await using var conn = new NatsConnection(); conn.OnConnectingAsync = async x => { @@ -33,6 +36,7 @@ + // Server await conn.SubscribeRequestAsync("foobar", (int x) => $"Hello {x}"); diff --git a/src/AlterNats.Hosting/NatsHostingExtensions.cs b/src/AlterNats.Hosting/NatsHostingExtensions.cs index 1fda662..d691ed6 100644 --- a/src/AlterNats.Hosting/NatsHostingExtensions.cs +++ b/src/AlterNats.Hosting/NatsHostingExtensions.cs @@ -10,7 +10,7 @@ public static class NatsHostingExtensions /// Add NatsConnection/Pool to ServiceCollection. When poolSize = 1, registered `NatsConnection` and `INatsCommand` as singleton. /// Others, registered `NatsConnectionPool` as singleton, `NatsConnection` and `INatsCommand` as transient(get from pool). /// - public static IServiceCollection AddNats(this IServiceCollection services, int poolSize = 1, Func? configureOptions = null) + public static IServiceCollection AddNats(this IServiceCollection services, int poolSize = 1, Func? configureOptions = null, Action? configureConnection = null) { poolSize = Math.Max(poolSize, 1); @@ -24,7 +24,7 @@ public static IServiceCollection AddNats(this IServiceCollection services, int p options = configureOptions(options); } - return new NatsConnectionPool(poolSize, options); + return new NatsConnectionPool(poolSize, options, configureConnection ?? (_ => { })); }); services.TryAddTransient(static provider => @@ -48,7 +48,12 @@ public static IServiceCollection AddNats(this IServiceCollection services, int p { options = configureOptions(options); } - return new NatsConnection(options); + var conn = new NatsConnection(options); + if (configureConnection != null) + { + configureConnection(conn); + } + return conn; }); services.TryAddSingleton(static provider => @@ -63,7 +68,7 @@ public static IServiceCollection AddNats(this IServiceCollection services, int p /// /// Add Singleton NatsShardingConnection to ServiceCollection. /// - public static IServiceCollection AddNats(this IServiceCollection services, int poolSize, string[] urls, Func? configureOptions = null) + public static IServiceCollection AddNats(this IServiceCollection services, int poolSize, string[] urls, Func? configureOptions = null, Action? configureConnection = null) { services.TryAddSingleton(provider => { @@ -73,7 +78,7 @@ public static IServiceCollection AddNats(this IServiceCollection services, int p options = configureOptions(options); } - return new NatsShardingConnection(poolSize, options, urls); + return new NatsShardingConnection(poolSize, options, urls, configureConnection ?? (_ => { })); }); return services; diff --git a/src/AlterNats/NatsConnectionPool.cs b/src/AlterNats/NatsConnectionPool.cs index e784db3..d1d4580 100644 --- a/src/AlterNats/NatsConnectionPool.cs +++ b/src/AlterNats/NatsConnectionPool.cs @@ -6,29 +6,36 @@ public sealed class NatsConnectionPool : IAsyncDisposable int index = -1; public NatsConnectionPool() - : this(Environment.ProcessorCount / 2, NatsOptions.Default) + : this(Environment.ProcessorCount / 2, NatsOptions.Default, _ => { }) { } public NatsConnectionPool(int poolSize) - : this(poolSize, NatsOptions.Default) + : this(poolSize, NatsOptions.Default, _ => { }) { } public NatsConnectionPool(NatsOptions options) - : this(Environment.ProcessorCount / 2, options) + : this(Environment.ProcessorCount / 2, options, _ => { }) { } public NatsConnectionPool(int poolSize, NatsOptions options) + : this(poolSize, options, _ => { }) + { + } + + public NatsConnectionPool(int poolSize, NatsOptions options, Action configureConnection) { poolSize = Math.Max(1, poolSize); connections = new NatsConnection[poolSize]; for (int i = 0; i < connections.Length; i++) { var name = (options.ConnectOptions.Name == null) ? $"#{i}" : $"{options.ConnectOptions.Name}#{i}"; - connections[i] = new NatsConnection(options with { ConnectOptions = options.ConnectOptions with { Name = name } }); + var conn = new NatsConnection(options with { ConnectOptions = options.ConnectOptions with { Name = name } }); + configureConnection(conn); + connections[i] = conn; } } diff --git a/src/AlterNats/NatsShardingConnection.cs b/src/AlterNats/NatsShardingConnection.cs index 4eba9c9..0b85cae 100644 --- a/src/AlterNats/NatsShardingConnection.cs +++ b/src/AlterNats/NatsShardingConnection.cs @@ -9,12 +9,17 @@ public sealed class NatsShardingConnection : IAsyncDisposable readonly NatsConnectionPool[] pools; public NatsShardingConnection(int poolSize, NatsOptions options, string[] urls) + : this(poolSize, options, urls, _ => { }) + { + } + + public NatsShardingConnection(int poolSize, NatsOptions options, string[] urls, Action configureConnection) { poolSize = Math.Max(1, poolSize); pools = new NatsConnectionPool[urls.Length]; for (int i = 0; i < urls.Length; i++) { - pools[i] = new NatsConnectionPool(poolSize, options with { Url = urls[i] }); + pools[i] = new NatsConnectionPool(poolSize, options with { Url = urls[i] }, configureConnection); } }