Skip to content
This repository has been archived by the owner on Jul 17, 2023. It is now read-only.

Commit

Permalink
NatsConnectoinPool, NatsShardingConnection, AddNats support configure…
Browse files Browse the repository at this point in the history
…Connection
  • Loading branch information
neuecc committed Jun 9, 2022
1 parent a575eae commit 48e8b0c
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 10 deletions.
4 changes: 4 additions & 0 deletions sandbox/ConsoleApp/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
// create connection(default, connect to nats://localhost:4222)


// var conn = new NatsConnectionPool(1).GetConnection();


await using var conn = new NatsConnection();
conn.OnConnectingAsync = async x =>
{
Expand All @@ -33,6 +36,7 @@




// Server
await conn.SubscribeRequestAsync("foobar", (int x) => $"Hello {x}");

Expand Down
15 changes: 10 additions & 5 deletions src/AlterNats.Hosting/NatsHostingExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ public static class NatsHostingExtensions
/// Add NatsConnection/Pool to ServiceCollection. When poolSize = 1, registered `NatsConnection` and `INatsCommand` as singleton.
/// Others, registered `NatsConnectionPool` as singleton, `NatsConnection` and `INatsCommand` as transient(get from pool).
/// </summary>
public static IServiceCollection AddNats(this IServiceCollection services, int poolSize = 1, Func<NatsOptions, NatsOptions>? configureOptions = null)
public static IServiceCollection AddNats(this IServiceCollection services, int poolSize = 1, Func<NatsOptions, NatsOptions>? configureOptions = null, Action<NatsConnection>? configureConnection = null)
{
poolSize = Math.Max(poolSize, 1);

Expand All @@ -24,7 +24,7 @@ public static IServiceCollection AddNats(this IServiceCollection services, int p
options = configureOptions(options);
}

return new NatsConnectionPool(poolSize, options);
return new NatsConnectionPool(poolSize, options, configureConnection ?? (_ => { }));
});

services.TryAddTransient<NatsConnection>(static provider =>
Expand All @@ -48,7 +48,12 @@ public static IServiceCollection AddNats(this IServiceCollection services, int p
{
options = configureOptions(options);
}
return new NatsConnection(options);
var conn = new NatsConnection(options);
if (configureConnection != null)
{
configureConnection(conn);
}
return conn;
});

services.TryAddSingleton<INatsCommand>(static provider =>
Expand All @@ -63,7 +68,7 @@ public static IServiceCollection AddNats(this IServiceCollection services, int p
/// <summary>
/// Add Singleton NatsShardingConnection to ServiceCollection.
/// </summary>
public static IServiceCollection AddNats(this IServiceCollection services, int poolSize, string[] urls, Func<NatsOptions, NatsOptions>? configureOptions = null)
public static IServiceCollection AddNats(this IServiceCollection services, int poolSize, string[] urls, Func<NatsOptions, NatsOptions>? configureOptions = null, Action<NatsConnection>? configureConnection = null)
{
services.TryAddSingleton<NatsShardingConnection>(provider =>
{
Expand All @@ -73,7 +78,7 @@ public static IServiceCollection AddNats(this IServiceCollection services, int p
options = configureOptions(options);
}

return new NatsShardingConnection(poolSize, options, urls);
return new NatsShardingConnection(poolSize, options, urls, configureConnection ?? (_ => { }));
});

return services;
Expand Down
15 changes: 11 additions & 4 deletions src/AlterNats/NatsConnectionPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,29 +6,36 @@ public sealed class NatsConnectionPool : IAsyncDisposable
int index = -1;

public NatsConnectionPool()
: this(Environment.ProcessorCount / 2, NatsOptions.Default)
: this(Environment.ProcessorCount / 2, NatsOptions.Default, _ => { })
{
}

public NatsConnectionPool(int poolSize)
: this(poolSize, NatsOptions.Default)
: this(poolSize, NatsOptions.Default, _ => { })
{
}

public NatsConnectionPool(NatsOptions options)
: this(Environment.ProcessorCount / 2, options)
: this(Environment.ProcessorCount / 2, options, _ => { })
{

}

public NatsConnectionPool(int poolSize, NatsOptions options)
: this(poolSize, options, _ => { })
{
}

public NatsConnectionPool(int poolSize, NatsOptions options, Action<NatsConnection> configureConnection)
{
poolSize = Math.Max(1, poolSize);
connections = new NatsConnection[poolSize];
for (int i = 0; i < connections.Length; i++)
{
var name = (options.ConnectOptions.Name == null) ? $"#{i}" : $"{options.ConnectOptions.Name}#{i}";
connections[i] = new NatsConnection(options with { ConnectOptions = options.ConnectOptions with { Name = name } });
var conn = new NatsConnection(options with { ConnectOptions = options.ConnectOptions with { Name = name } });
configureConnection(conn);
connections[i] = conn;
}
}

Expand Down
7 changes: 6 additions & 1 deletion src/AlterNats/NatsShardingConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,17 @@ public sealed class NatsShardingConnection : IAsyncDisposable
readonly NatsConnectionPool[] pools;

public NatsShardingConnection(int poolSize, NatsOptions options, string[] urls)
: this(poolSize, options, urls, _ => { })
{
}

public NatsShardingConnection(int poolSize, NatsOptions options, string[] urls, Action<NatsConnection> configureConnection)
{
poolSize = Math.Max(1, poolSize);
pools = new NatsConnectionPool[urls.Length];
for (int i = 0; i < urls.Length; i++)
{
pools[i] = new NatsConnectionPool(poolSize, options with { Url = urls[i] });
pools[i] = new NatsConnectionPool(poolSize, options with { Url = urls[i] }, configureConnection);
}
}

Expand Down

0 comments on commit 48e8b0c

Please sign in to comment.