From 6bf384d4748175ebcc7df98468c210b812da00ae Mon Sep 17 00:00:00 2001 From: Caleb Lloyd Date: Fri, 1 Jul 2022 13:35:59 -0400 Subject: [PATCH] web socket tests Signed-off-by: Caleb Lloyd --- AlterNats.sln | 24 ++++ sandbox/BlazorWasm/BlazorWasm.sln | 64 --------- sandbox/BlazorWasm/README.md | 33 +++++ sandbox/BlazorWasm/docker/nats.sh | 12 -- .../BlazorWasm/{docker => }/nats-server.conf | 4 - src/AlterNats/NatsConnection.cs | 21 ++- src/AlterNats/NatsUri.cs | 42 ++++-- .../NatsConnectionTest.Sharding.cs | 13 +- .../NatsConnectionTest.Transports.cs | 15 +++ tests/AlterNats.Tests/NatsConnectionTest.cs | 51 ++++---- tests/AlterNats.Tests/_NatsServer.cs | 123 +++++++++++++----- tests/AlterNats.Tests/_NatsServerOptions.cs | 96 ++++++++++++++ tests/AlterNats.Tests/test.runsettings | 2 +- 13 files changed, 332 insertions(+), 168 deletions(-) delete mode 100644 sandbox/BlazorWasm/BlazorWasm.sln create mode 100644 sandbox/BlazorWasm/README.md delete mode 100755 sandbox/BlazorWasm/docker/nats.sh rename sandbox/BlazorWasm/{docker => }/nats-server.conf (58%) create mode 100644 tests/AlterNats.Tests/NatsConnectionTest.Transports.cs create mode 100644 tests/AlterNats.Tests/_NatsServerOptions.cs diff --git a/AlterNats.sln b/AlterNats.sln index ae3edf0..ed1dcc5 100644 --- a/AlterNats.sln +++ b/AlterNats.sln @@ -33,6 +33,14 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "AlterNats.Hosting", "src\Al EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MinimumWebApp", "sandbox\MinimumWebApp\MinimumWebApp.csproj", "{44881DEE-8B49-44EA-B0BA-8BDA4F706E1A}" EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "BlazorWasm", "BlazorWasm", "{326017A7-18B3-4567-B9F9-5521E4467198}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "BlazorWasm.Client", "sandbox\BlazorWasm\Client\BlazorWasm.Client.csproj", "{EFBCEF7F-51AA-4717-8E3E-304B6506F1AC}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "BlazorWasm.Server", "sandbox\BlazorWasm\Server\BlazorWasm.Server.csproj", "{36B9F490-681F-432D-B8FA-64440EC076D9}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "BlazorWasm.Shared", "sandbox\BlazorWasm\Shared\BlazorWasm.Shared.csproj", "{35D141D0-F80A-4E9B-A2AA-A1AEA925C9C2}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -71,6 +79,18 @@ Global {44881DEE-8B49-44EA-B0BA-8BDA4F706E1A}.Debug|Any CPU.Build.0 = Debug|Any CPU {44881DEE-8B49-44EA-B0BA-8BDA4F706E1A}.Release|Any CPU.ActiveCfg = Release|Any CPU {44881DEE-8B49-44EA-B0BA-8BDA4F706E1A}.Release|Any CPU.Build.0 = Release|Any CPU + {EFBCEF7F-51AA-4717-8E3E-304B6506F1AC}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {EFBCEF7F-51AA-4717-8E3E-304B6506F1AC}.Debug|Any CPU.Build.0 = Debug|Any CPU + {EFBCEF7F-51AA-4717-8E3E-304B6506F1AC}.Release|Any CPU.ActiveCfg = Release|Any CPU + {EFBCEF7F-51AA-4717-8E3E-304B6506F1AC}.Release|Any CPU.Build.0 = Release|Any CPU + {36B9F490-681F-432D-B8FA-64440EC076D9}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {36B9F490-681F-432D-B8FA-64440EC076D9}.Debug|Any CPU.Build.0 = Debug|Any CPU + {36B9F490-681F-432D-B8FA-64440EC076D9}.Release|Any CPU.ActiveCfg = Release|Any CPU + {36B9F490-681F-432D-B8FA-64440EC076D9}.Release|Any CPU.Build.0 = Release|Any CPU + {35D141D0-F80A-4E9B-A2AA-A1AEA925C9C2}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {35D141D0-F80A-4E9B-A2AA-A1AEA925C9C2}.Debug|Any CPU.Build.0 = Debug|Any CPU + {35D141D0-F80A-4E9B-A2AA-A1AEA925C9C2}.Release|Any CPU.ActiveCfg = Release|Any CPU + {35D141D0-F80A-4E9B-A2AA-A1AEA925C9C2}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -84,6 +104,10 @@ Global {7B518D4A-93EA-4E5E-956A-CCC34C005AD4} = {4827B3EC-73D8-436D-AE2A-5E29AC95FD0C} {D3F09B30-1ED5-48C2-81CD-A2AD88E751AC} = {4827B3EC-73D8-436D-AE2A-5E29AC95FD0C} {44881DEE-8B49-44EA-B0BA-8BDA4F706E1A} = {95A69671-16CA-4133-981C-CC381B7AAA30} + {326017A7-18B3-4567-B9F9-5521E4467198} = {95A69671-16CA-4133-981C-CC381B7AAA30} + {EFBCEF7F-51AA-4717-8E3E-304B6506F1AC} = {326017A7-18B3-4567-B9F9-5521E4467198} + {36B9F490-681F-432D-B8FA-64440EC076D9} = {326017A7-18B3-4567-B9F9-5521E4467198} + {35D141D0-F80A-4E9B-A2AA-A1AEA925C9C2} = {326017A7-18B3-4567-B9F9-5521E4467198} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {8CBB7278-D093-448E-B3DE-B5991209A1AA} diff --git a/sandbox/BlazorWasm/BlazorWasm.sln b/sandbox/BlazorWasm/BlazorWasm.sln deleted file mode 100644 index b1b4d34..0000000 --- a/sandbox/BlazorWasm/BlazorWasm.sln +++ /dev/null @@ -1,64 +0,0 @@ -Microsoft Visual Studio Solution File, Format Version 12.00 -# Visual Studio Version 16 -VisualStudioVersion = 16.0.0.0 -MinimumVisualStudioVersion = 16.0.0.0 -Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "BlazorWasm.Server", "Server\BlazorWasm.Server.csproj", "{EFFBE146-5F20-407A-B01B-C80D51C43861}" -EndProject -Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "BlazorWasm.Client", "Client\BlazorWasm.Client.csproj", "{B21E5013-1D4C-45E9-A8C2-7E6E6AAA17C1}" -EndProject -Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "BlazorWasm.Shared", "Shared\BlazorWasm.Shared.csproj", "{D1ED5F0E-D170-4DEA-8B75-5F92DC3CE72B}" -EndProject -Global - GlobalSection(SolutionConfigurationPlatforms) = preSolution - Debug|Any CPU = Debug|Any CPU - Debug|x64 = Debug|x64 - Debug|x86 = Debug|x86 - Release|Any CPU = Release|Any CPU - Release|x64 = Release|x64 - Release|x86 = Release|x86 - EndGlobalSection - GlobalSection(ProjectConfigurationPlatforms) = postSolution - {B21E5013-1D4C-45E9-A8C2-7E6E6AAA17C1}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {B21E5013-1D4C-45E9-A8C2-7E6E6AAA17C1}.Debug|Any CPU.Build.0 = Debug|Any CPU - {B21E5013-1D4C-45E9-A8C2-7E6E6AAA17C1}.Debug|x64.ActiveCfg = Debug|Any CPU - {B21E5013-1D4C-45E9-A8C2-7E6E6AAA17C1}.Debug|x64.Build.0 = Debug|Any CPU - {B21E5013-1D4C-45E9-A8C2-7E6E6AAA17C1}.Debug|x86.ActiveCfg = Debug|Any CPU - {B21E5013-1D4C-45E9-A8C2-7E6E6AAA17C1}.Debug|x86.Build.0 = Debug|Any CPU - {B21E5013-1D4C-45E9-A8C2-7E6E6AAA17C1}.Release|Any CPU.ActiveCfg = Release|Any CPU - {B21E5013-1D4C-45E9-A8C2-7E6E6AAA17C1}.Release|Any CPU.Build.0 = Release|Any CPU - {B21E5013-1D4C-45E9-A8C2-7E6E6AAA17C1}.Release|x64.ActiveCfg = Release|Any CPU - {B21E5013-1D4C-45E9-A8C2-7E6E6AAA17C1}.Release|x64.Build.0 = Release|Any CPU - {B21E5013-1D4C-45E9-A8C2-7E6E6AAA17C1}.Release|x86.ActiveCfg = Release|Any CPU - {B21E5013-1D4C-45E9-A8C2-7E6E6AAA17C1}.Release|x86.Build.0 = Release|Any CPU - {EFFBE146-5F20-407A-B01B-C80D51C43861}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {EFFBE146-5F20-407A-B01B-C80D51C43861}.Debug|Any CPU.Build.0 = Debug|Any CPU - {EFFBE146-5F20-407A-B01B-C80D51C43861}.Debug|x64.ActiveCfg = Debug|Any CPU - {EFFBE146-5F20-407A-B01B-C80D51C43861}.Debug|x64.Build.0 = Debug|Any CPU - {EFFBE146-5F20-407A-B01B-C80D51C43861}.Debug|x86.ActiveCfg = Debug|Any CPU - {EFFBE146-5F20-407A-B01B-C80D51C43861}.Debug|x86.Build.0 = Debug|Any CPU - {EFFBE146-5F20-407A-B01B-C80D51C43861}.Release|Any CPU.ActiveCfg = Release|Any CPU - {EFFBE146-5F20-407A-B01B-C80D51C43861}.Release|Any CPU.Build.0 = Release|Any CPU - {EFFBE146-5F20-407A-B01B-C80D51C43861}.Release|x64.ActiveCfg = Release|Any CPU - {EFFBE146-5F20-407A-B01B-C80D51C43861}.Release|x64.Build.0 = Release|Any CPU - {EFFBE146-5F20-407A-B01B-C80D51C43861}.Release|x86.ActiveCfg = Release|Any CPU - {EFFBE146-5F20-407A-B01B-C80D51C43861}.Release|x86.Build.0 = Release|Any CPU - {D1ED5F0E-D170-4DEA-8B75-5F92DC3CE72B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {D1ED5F0E-D170-4DEA-8B75-5F92DC3CE72B}.Debug|Any CPU.Build.0 = Debug|Any CPU - {D1ED5F0E-D170-4DEA-8B75-5F92DC3CE72B}.Debug|x64.ActiveCfg = Debug|Any CPU - {D1ED5F0E-D170-4DEA-8B75-5F92DC3CE72B}.Debug|x64.Build.0 = Debug|Any CPU - {D1ED5F0E-D170-4DEA-8B75-5F92DC3CE72B}.Debug|x86.ActiveCfg = Debug|Any CPU - {D1ED5F0E-D170-4DEA-8B75-5F92DC3CE72B}.Debug|x86.Build.0 = Debug|Any CPU - {D1ED5F0E-D170-4DEA-8B75-5F92DC3CE72B}.Release|Any CPU.ActiveCfg = Release|Any CPU - {D1ED5F0E-D170-4DEA-8B75-5F92DC3CE72B}.Release|Any CPU.Build.0 = Release|Any CPU - {D1ED5F0E-D170-4DEA-8B75-5F92DC3CE72B}.Release|x64.ActiveCfg = Release|Any CPU - {D1ED5F0E-D170-4DEA-8B75-5F92DC3CE72B}.Release|x64.Build.0 = Release|Any CPU - {D1ED5F0E-D170-4DEA-8B75-5F92DC3CE72B}.Release|x86.ActiveCfg = Release|Any CPU - {D1ED5F0E-D170-4DEA-8B75-5F92DC3CE72B}.Release|x86.Build.0 = Release|Any CPU - EndGlobalSection - GlobalSection(SolutionProperties) = preSolution - HideSolutionNode = FALSE - EndGlobalSection - GlobalSection(ExtensibilityGlobals) = postSolution - SolutionGuid = {86EA8CC1-A7C6-43DD-B602-F1A3ABAD77CC} - EndGlobalSection -EndGlobal \ No newline at end of file diff --git a/sandbox/BlazorWasm/README.md b/sandbox/BlazorWasm/README.md new file mode 100644 index 0000000..6c9f7cf --- /dev/null +++ b/sandbox/BlazorWasm/README.md @@ -0,0 +1,33 @@ +# BlazorWasm + +Blazor WebAssembly demo with NATS request/reply + +## Pre-requisites + +Start `nats-server` with a WebSocket listener on port 4280 + +**With nats-server** + +```bash +nats-server -c nats-server.conf +``` + +**With docker** + + +```bash +docker run --rm \ + --name nats-server \ + -p 4222:4222 \ + -p 4280:4280 \ + -v "$(pwd)/nats-server.conf:/etc/nats/nats-server.conf" \ + nats +``` + +## Run the Project + +```bash +dotnet run --project Server +``` + +Navigate to http://localhost:5000/fetchdata diff --git a/sandbox/BlazorWasm/docker/nats.sh b/sandbox/BlazorWasm/docker/nats.sh deleted file mode 100755 index b2da913..0000000 --- a/sandbox/BlazorWasm/docker/nats.sh +++ /dev/null @@ -1,12 +0,0 @@ -#!/usr/bin/env bash - -cd "$(dirname "$0")" -set -e - -docker run --rm \ - -p 4222:4222 \ - -p 4280:4280 \ - -p 8222:8222 \ - -v "$(pwd)/nats-server.conf:/etc/nats/nats-server.conf" \ - nats:2.8.2-alpine \ - "$@" diff --git a/sandbox/BlazorWasm/docker/nats-server.conf b/sandbox/BlazorWasm/nats-server.conf similarity index 58% rename from sandbox/BlazorWasm/docker/nats-server.conf rename to sandbox/BlazorWasm/nats-server.conf index c9a9064..2f5451b 100644 --- a/sandbox/BlazorWasm/docker/nats-server.conf +++ b/sandbox/BlazorWasm/nats-server.conf @@ -1,8 +1,4 @@ -port: 4222 - websocket { port: 4280 no_tls: true } - -monitor_port: 8222 diff --git a/src/AlterNats/NatsConnection.cs b/src/AlterNats/NatsConnection.cs index 8b4d080..072c735 100644 --- a/src/AlterNats/NatsConnection.cs +++ b/src/AlterNats/NatsConnection.cs @@ -151,16 +151,15 @@ async ValueTask InitialConnectAsync() target = await OnConnectingAsync(target).ConfigureAwait(false); } + logger.LogInformation("Try to connect NATS {0}", uri); if (uri.IsWebSocket) { - logger.LogInformation("Try to connect NATS {0}", uri.Uri); var conn = new WebSocketConnection(); await conn.ConnectAsync(uri.Uri, Options.ConnectTimeout).ConfigureAwait(false); this.socket = conn; } else { - logger.LogInformation("Try to connect NATS {0}:{1}", uri.Host, uri.Port); var conn = new TcpConnection(); await conn.ConnectAsync(target.Host, target.Port, Options.ConnectTimeout).ConfigureAwait(false); this.socket = conn; @@ -171,7 +170,7 @@ async ValueTask InitialConnectAsync() } catch (Exception ex) { - logger.LogError(ex, "Fail to connect NATS {0}:{1}.", uri.Host, uri.Port); + logger.LogError(ex, "Fail to connect NATS {0}", uri); } } if (this.socket == null) @@ -235,7 +234,7 @@ async ValueTask InitialConnectAsync() lock (gate) { var url = currentConnectUri; - logger.LogInformation("Connect succeed {0}, NATS {1}:{2}", name, url?.Host, url?.Port); + logger.LogInformation("Connect succeed {0}, NATS {1}", name, url); this.ConnectionState = NatsConnectionState.Open; this.pingTimerCancellationTokenSource = new CancellationTokenSource(); StartPingTimer(pingTimerCancellationTokenSource.Token); @@ -288,10 +287,11 @@ async void ReconnectLoop() // Dispose current and create new await socket.DisposeAsync().ConfigureAwait(false); - NatsUri[] urls = Array.Empty(); + NatsUri[] urls; + var defaultScheme = currentConnectUri?.Uri.Scheme ?? NatsUri.DefaultScheme; if (Options.NoRandomize) { - urls = this.ServerInfo?.ClientConnectUrls?.Select(x => new NatsUri(x)).Distinct().ToArray() ?? Array.Empty(); + urls = this.ServerInfo?.ClientConnectUrls?.Select(x => new NatsUri(x, defaultScheme)).Distinct().ToArray() ?? Array.Empty(); if (urls.Length == 0) { urls = Options.GetSeedUris(); @@ -299,7 +299,7 @@ async void ReconnectLoop() } else { - urls = this.ServerInfo?.ClientConnectUrls?.Select(x => new NatsUri(x)).OrderBy(_ => Guid.NewGuid()).Distinct().ToArray() ?? Array.Empty(); + urls = this.ServerInfo?.ClientConnectUrls?.Select(x => new NatsUri(x, defaultScheme)).OrderBy(_ => Guid.NewGuid()).Distinct().ToArray() ?? Array.Empty(); if (urls.Length == 0) { urls = Options.GetSeedUris(); @@ -328,16 +328,15 @@ async void ReconnectLoop() target = await OnConnectingAsync(target).ConfigureAwait(false); } + logger.LogInformation("Try to connect NATS {0}", url); if (url.IsWebSocket) { - logger.LogInformation("Try to connect NATS {0}", url.Uri); var conn = new WebSocketConnection(); await conn.ConnectAsync(url.Uri, Options.ConnectTimeout).ConfigureAwait(false); this.socket = conn; } else { - logger.LogInformation("Try to connect NATS {0}:{1}", url.Host, url.Port); var conn = new TcpConnection(); await conn.ConnectAsync(target.Host, target.Port, Options.ConnectTimeout).ConfigureAwait(false); this.socket = conn; @@ -375,7 +374,7 @@ async void ReconnectLoop() { if (url != null) { - logger.LogError(ex, "Fail to connect NATS {0}:{1}.", url.Host, url.Port); + logger.LogError(ex, "Fail to connect NATS {0}", url); } if (socketWriter != null) @@ -402,7 +401,7 @@ async void ReconnectLoop() lock (gate) { - logger.LogInformation("Connect succeed {0}, NATS {1}:{2}", name, url.Host, url.Port); + logger.LogInformation("Connect succeed {0}, NATS {1}", name, url); this.ConnectionState = NatsConnectionState.Open; this.pingTimerCancellationTokenSource = new CancellationTokenSource(); StartPingTimer(pingTimerCancellationTokenSource.Token); diff --git a/src/AlterNats/NatsUri.cs b/src/AlterNats/NatsUri.cs index 482e277..78e6f51 100644 --- a/src/AlterNats/NatsUri.cs +++ b/src/AlterNats/NatsUri.cs @@ -2,38 +2,52 @@ internal sealed class NatsUri : IEquatable { - const string DefaultScheme = "nats://"; - public static readonly NatsUri Default = new NatsUri("nats://localhost:4222"); - public const int DefaultPort = 4222; + public const string DefaultScheme = "nats"; - internal readonly Uri Uri; - public bool IsSecure { get; } - public bool IsWebSocket { get; } + public readonly Uri Uri; + public readonly bool IsSecure; + public readonly bool IsWebSocket; public string Host => Uri.Host; public int Port => Uri.Port; - public NatsUri(string urlString) + public NatsUri(string urlString, string defaultScheme = DefaultScheme) { if (!urlString.Contains("://")) { - urlString = DefaultScheme + urlString; + urlString = $"{defaultScheme}://{urlString}"; } - this.Uri = new Uri(urlString); - if (Uri.Scheme is "tls" or "wss") + var uriBuilder = new UriBuilder(new Uri(urlString, UriKind.Absolute)); + if (string.IsNullOrEmpty(uriBuilder.Host)) { - IsSecure = true; + uriBuilder.Host = "localhost"; } - if (Uri.Scheme is "ws" or "wss") + switch (uriBuilder.Scheme) { - IsWebSocket = true; + case "nats": + if (uriBuilder.Port == -1) + { + uriBuilder.Port = 4222; + } + break; + case "ws": + IsWebSocket = true; + break; + case "wss": + IsWebSocket = true; + IsSecure = true; + break; + default: + throw new ArgumentException($"unsupported scheme {uriBuilder.Scheme} in nats URL {urlString}", urlString); } + + Uri = uriBuilder.Uri; } public override string ToString() { - return Uri.ToString().Trim('/'); + return IsWebSocket && Uri.AbsolutePath != "/" ? Uri.ToString() : Uri.ToString().Trim('/'); } public override int GetHashCode() => Uri.GetHashCode(); diff --git a/tests/AlterNats.Tests/NatsConnectionTest.Sharding.cs b/tests/AlterNats.Tests/NatsConnectionTest.Sharding.cs index 5c834d4..f2610e5 100644 --- a/tests/AlterNats.Tests/NatsConnectionTest.Sharding.cs +++ b/tests/AlterNats.Tests/NatsConnectionTest.Sharding.cs @@ -1,12 +1,12 @@ namespace AlterNats.Tests; -public partial class NatsConnectionTest +public abstract partial class NatsConnectionTest { // TODO:do. [Fact] public async Task ConnectionPoolTest() { - await using var server = new NatsServer(output); + await using var server = new NatsServer(output, transportType); var conn = server.CreatePooledClientConnection(); @@ -24,11 +24,12 @@ public async Task ConnectionPoolTest() [Fact] public async Task ShardingConnectionTest() { - await using var server1 = new NatsServer(output); - await using var server2 = new NatsServer(output); - await using var server3 = new NatsServer(output); + await using var server1 = new NatsServer(output, transportType); + await using var server2 = new NatsServer(output, transportType); + await using var server3 = new NatsServer(output, transportType); - var urls = new[] { server1.Port, server2.Port, server3.Port }.Select(x => $"nats://localhost:{x}").ToArray(); + var urls = new[] { server1.Ports.ServerPort, server2.Ports.ServerPort, server3.Ports.ServerPort } + .Select(x => $"nats://localhost:{x}").ToArray(); var shardedConnection = new NatsShardingConnection(1, NatsOptions.Default, urls); diff --git a/tests/AlterNats.Tests/NatsConnectionTest.Transports.cs b/tests/AlterNats.Tests/NatsConnectionTest.Transports.cs new file mode 100644 index 0000000..3f04f6a --- /dev/null +++ b/tests/AlterNats.Tests/NatsConnectionTest.Transports.cs @@ -0,0 +1,15 @@ +namespace AlterNats.Tests; + +public class NatsConnectionTestTcp : NatsConnectionTest +{ + public NatsConnectionTestTcp(ITestOutputHelper output) : base(output, TransportType.Tcp) + { + } +} + +public class NatsConnectionTestWs : NatsConnectionTest +{ + public NatsConnectionTestWs(ITestOutputHelper output) : base(output, TransportType.WebSocket) + { + } +} diff --git a/tests/AlterNats.Tests/NatsConnectionTest.cs b/tests/AlterNats.Tests/NatsConnectionTest.cs index da26c3c..cac5da5 100644 --- a/tests/AlterNats.Tests/NatsConnectionTest.cs +++ b/tests/AlterNats.Tests/NatsConnectionTest.cs @@ -1,30 +1,23 @@ -using Cysharp.Diagnostics; +using System.Text; using MessagePack; -using System; -using System.Collections.Generic; -using System.Linq; -using System.Net.Sockets; -using System.Runtime.InteropServices; -using System.Text; -using System.Threading; -using System.Threading.Tasks; -using Xunit.Abstractions; namespace AlterNats.Tests; -public partial class NatsConnectionTest +public abstract partial class NatsConnectionTest { readonly ITestOutputHelper output; + readonly TransportType transportType; - public NatsConnectionTest(ITestOutputHelper output) + protected NatsConnectionTest(ITestOutputHelper output, TransportType transportType) { this.output = output; + this.transportType = transportType; } [Fact] public async Task SimplePubSubTest() { - await using var server = new NatsServer(output); + await using var server = new NatsServer(output, transportType); await using var subConnection = server.CreateClientConnection(); await using var pubConnection = server.CreateClientConnection(); @@ -57,7 +50,7 @@ await subConnection.SubscribeAsync(key, x => [Fact] public async Task EncodingTest() { - await using var server = new NatsServer(output); + await using var server = new NatsServer(output, transportType); var serializer1 = NatsOptions.Default.Serializer; var serializer2 = new MessagePackNatsSerializer(); @@ -97,7 +90,7 @@ public async Task EncodingTest() [InlineData(32768)] // 32 KiB public async Task RequestTest(int minSize) { - await using var server = new NatsServer(output); + await using var server = new NatsServer(output, transportType); var options = NatsOptions.Default with { RequestTimeout = TimeSpan.FromSeconds(5) }; await using var subConnection = server.CreateClientConnection(options); @@ -134,7 +127,12 @@ await Assert.ThrowsAsync(async () => [Fact] public async Task ReconnectSingleTest() { - await using var server = new NatsServer(output); + using var ports = new NatsServerPorts(new NatsServerPortOptions + { + WebSocket = transportType == TransportType.WebSocket, + ServerDisposeReturnsPorts = false + }); + await using var server = new NatsServer(output, transportType, ports); var key = Guid.NewGuid().ToString(); await using var subConnection = server.CreateClientConnection(); @@ -153,6 +151,7 @@ public async Task ReconnectSingleTest() { waitForReceive300.Pulse(); } + if (x == 500) { waitForReceiveFinish.Pulse(); @@ -177,7 +176,7 @@ public async Task ReconnectSingleTest() // start new nats server on same port output.WriteLine("START NEW SERVER"); - await using var newServer = new NatsServer(output, server.Port); + await using var newServer = new NatsServer(output, transportType, ports); await subConnection.ConnectAsync(); // wait open again await pubConnection.ConnectAsync(); // wait open again @@ -192,7 +191,7 @@ public async Task ReconnectSingleTest() [Fact(Timeout = 15000)] public async Task ReconnectClusterTest() { - await using var cluster = new NatsCluster(output); + await using var cluster = new NatsCluster(output, transportType); await Task.Delay(TimeSpan.FromSeconds(5)); // wait for cluster completely connected. var key = Guid.NewGuid().ToString(); @@ -205,9 +204,12 @@ public async Task ReconnectClusterTest() await connection2.ConnectAsync(); await connection3.ConnectAsync(); - output.WriteLine("Server1 ClientConnectUrls:" + String.Join(", ", connection1.ServerInfo?.ClientConnectUrls ?? Array.Empty())); - output.WriteLine("Server2 ClientConnectUrls:" + String.Join(", ", connection2.ServerInfo?.ClientConnectUrls ?? Array.Empty())); - output.WriteLine("Server3 ClientConnectUrls:" + String.Join(", ", connection3.ServerInfo?.ClientConnectUrls ?? Array.Empty())); + output.WriteLine("Server1 ClientConnectUrls:" + + String.Join(", ", connection1.ServerInfo?.ClientConnectUrls ?? Array.Empty())); + output.WriteLine("Server2 ClientConnectUrls:" + + String.Join(", ", connection2.ServerInfo?.ClientConnectUrls ?? Array.Empty())); + output.WriteLine("Server3 ClientConnectUrls:" + + String.Join(", ", connection3.ServerInfo?.ClientConnectUrls ?? Array.Empty())); connection1.ServerInfo!.ClientConnectUrls!.Select(x => new NatsUri(x).Port).Distinct().Count().ShouldBe(3); connection2.ServerInfo!.ClientConnectUrls!.Select(x => new NatsUri(x).Port).Distinct().Count().ShouldBe(3); @@ -224,6 +226,7 @@ public async Task ReconnectClusterTest() { waitForReceive300.Pulse(); } + if (x == 500) { waitForReceiveFinish.Pulse(); @@ -238,13 +241,14 @@ public async Task ReconnectClusterTest() var disconnectSignal = connection1.ConnectionDisconnectedAsAwaitable(); // register disconnect before kill - output.WriteLine($"TRY KILL SERVER1 Port:{cluster.Server1.Port}"); + output.WriteLine($"TRY KILL SERVER1 Port:{cluster.Server1.Ports.ServerPort}"); await cluster.Server1.DisposeAsync(); // process kill await disconnectSignal; await connection1.ConnectAsync(); // wait for reconnect complete. - connection1.ServerInfo!.Port.Should().BeOneOf(cluster.Server2.Port, cluster.Server3.Port); + connection1.ServerInfo!.Port.Should() + .BeOneOf(cluster.Server2.Ports.ServerPort, cluster.Server3.Ports.ServerPort); await connection2.PublishAsync(key, 400); await connection2.PublishAsync(key, 500); @@ -258,6 +262,7 @@ public class SampleClass : IEquatable { [Key(0)] public int Id { get; set; } + [Key(1)] public string Name { get; set; } diff --git a/tests/AlterNats.Tests/_NatsServer.cs b/tests/AlterNats.Tests/_NatsServer.cs index 8d3ed74..524199c 100644 --- a/tests/AlterNats.Tests/_NatsServer.cs +++ b/tests/AlterNats.Tests/_NatsServer.cs @@ -1,6 +1,6 @@ -using Cysharp.Diagnostics; -using System.Net.Sockets; +using System.Net.Sockets; using System.Runtime.InteropServices; +using Cysharp.Diagnostics; namespace AlterNats.Tests; @@ -10,29 +10,48 @@ public class NatsServer : IAsyncDisposable static readonly string natsServerPath = $"../../../../../tools/nats-server{ext}"; readonly CancellationTokenSource cancellationTokenSource = new(); + readonly string? configFileName; readonly ITestOutputHelper outputHelper; readonly Task processOut; readonly Task processErr; - - public int Port { get; } - + readonly TransportType transportType; bool isDisposed; - public NatsServer(ITestOutputHelper outputHelper, string argument = "") - : this(outputHelper, Random.Shared.Next(5000, 8000), argument) + public readonly NatsServerPorts Ports; + + public NatsServer(ITestOutputHelper outputHelper, TransportType transportType, string argument = "") + : this(outputHelper, transportType, new NatsServerPorts(new NatsServerPortOptions + { + WebSocket = transportType == TransportType.WebSocket + }), argument) { } - public NatsServer(ITestOutputHelper outputHelper, int port, string argument = "") + public NatsServer(ITestOutputHelper outputHelper, TransportType transportType, NatsServerPorts ports, + string argument = "") { this.outputHelper = outputHelper; - this.Port = port; - var cmd = $"{natsServerPath} -p {Port} {argument}".Trim(); + this.transportType = transportType; + Ports = ports; + var cmd = $"{natsServerPath} -p {Ports.ServerPort} {argument}".Trim(); + + if (transportType == TransportType.WebSocket) + { + configFileName = Path.GetTempFileName(); + var contents = ""; + contents += "websocket {" + Environment.NewLine; + contents += $" port: {Ports.WebSocketPort}" + Environment.NewLine; + contents += " no_tls: true" + Environment.NewLine; + contents += "}" + Environment.NewLine; + File.WriteAllText(configFileName, contents); + cmd = $"{cmd} -c {configFileName}"; + } + outputHelper.WriteLine("ProcessStart: " + cmd); var (p, stdout, stderror) = ProcessX.GetDualAsyncEnumerable(cmd); - this.processOut = EnumerteWithLogsAsync(stdout, cancellationTokenSource.Token); - this.processErr = EnumerteWithLogsAsync(stderror, cancellationTokenSource.Token); + processOut = EnumerateWithLogsAsync(stdout, cancellationTokenSource.Token); + processErr = EnumerateWithLogsAsync(stderror, cancellationTokenSource.Token); // Check for start server Task.Run(async () => @@ -42,7 +61,7 @@ public NatsServer(ITestOutputHelper outputHelper, int port, string argument = "" { try { - await client.ConnectAsync("localhost", Port, cancellationTokenSource.Token); + await client.ConnectAsync("localhost", Ports.ServerPort, cancellationTokenSource.Token); if (client.Connected) return; } catch @@ -54,16 +73,17 @@ public NatsServer(ITestOutputHelper outputHelper, int port, string argument = "" } }).Wait(5000); // timeout - if (this.processOut.IsFaulted) + if (processOut.IsFaulted) { - this.processOut.GetAwaiter().GetResult(); // throw exception + processOut.GetAwaiter().GetResult(); // throw exception } - if (this.processErr.IsFaulted) + + if (processErr.IsFaulted) { - this.processErr.GetAwaiter().GetResult(); // throw exception + processErr.GetAwaiter().GetResult(); // throw exception } - outputHelper.WriteLine("OK to Process Start, Port:" + Port); + outputHelper.WriteLine("OK to Process Start, Port:" + Ports.ServerPort); } public async ValueTask DisposeAsync() @@ -78,18 +98,32 @@ public async ValueTask DisposeAsync() var processLogs = await processErr; // wait for process exit, nats output info to stderror if (processLogs.Length != 0) { - outputHelper.WriteLine("Process Logs of " + Port); + outputHelper.WriteLine("Process Logs of " + Ports.ServerPort); foreach (var item in processLogs) { outputHelper.WriteLine(item); } } } - catch (OperationCanceledException) { } + catch (OperationCanceledException) + { + } + finally + { + if (configFileName != null) + { + File.Delete(configFileName); + } + + if (Ports.ServerDisposeReturnsPorts) + { + Ports.Dispose(); + } + } } } - async Task EnumerteWithLogsAsync(ProcessAsyncEnumerable enumerable, CancellationToken cancellationToken) + async Task EnumerateWithLogsAsync(ProcessAsyncEnumerable enumerable, CancellationToken cancellationToken) { var l = new List(); try @@ -102,6 +136,7 @@ async Task EnumerteWithLogsAsync(ProcessAsyncEnumerable enumerable, Ca catch (OperationCanceledException) { } + return l.ToArray(); } @@ -119,6 +154,13 @@ public NatsConnectionPool CreatePooledClientConnection(NatsOptions options) return new NatsConnectionPool(4, ClientOptions(options)); } + public string ClientUrl => transportType switch + { + TransportType.Tcp => $"localhost:{Ports.ServerPort}", + TransportType.WebSocket => $"ws://localhost:{Ports.WebSocketPort}", + _ => throw new ArgumentOutOfRangeException() + }; + public NatsOptions ClientOptions(NatsOptions options) { return options with @@ -127,29 +169,44 @@ public NatsOptions ClientOptions(NatsOptions options) //ConnectTimeout = TimeSpan.FromSeconds(1), //ReconnectWait = TimeSpan.Zero, //ReconnectJitter = TimeSpan.Zero, - Url = $"localhost:{Port}" + Url = ClientUrl }; } } public class NatsCluster : IAsyncDisposable { - readonly ITestOutputHelper outputHelper; - public NatsServer Server1 { get; } public NatsServer Server2 { get; } public NatsServer Server3 { get; } - public NatsCluster(ITestOutputHelper outputHelper) + public NatsCluster(ITestOutputHelper outputHelper, TransportType transportType) { - var Port1 = Random.Shared.Next(10000, 13000); - var Port2 = Random.Shared.Next(10000, 13000); - var Port3 = Random.Shared.Next(10000, 13000); - - this.outputHelper = outputHelper; - this.Server1 = new NatsServer(outputHelper, $"--cluster_name test-cluster -cluster nats://localhost:{Port1} -routes nats://localhost:{Port2},nats://localhost:{Port3}"); - this.Server2 = new NatsServer(outputHelper, $"--cluster_name test-cluster -cluster nats://localhost:{Port2} -routes nats://localhost:{Port1},nats://localhost:{Port3}"); - this.Server3 = new NatsServer(outputHelper, $"--cluster_name test-cluster -cluster nats://localhost:{Port3} -routes nats://localhost:{Port1},nats://localhost:{Port2}"); + var port1 = new NatsServerPorts(new NatsServerPortOptions + { + WebSocket = transportType == TransportType.WebSocket, + Clustering = true + }); + var port2 = new NatsServerPorts(new NatsServerPortOptions + { + WebSocket = transportType == TransportType.WebSocket, + Clustering = true + }); + var port3 = new NatsServerPorts(new NatsServerPortOptions + { + WebSocket = transportType == TransportType.WebSocket, + Clustering = true + }); + + var baseArgument = + $"--cluster_name test-cluster -routes nats://localhost:{port1.ClusteringPort},nats://localhost:{port2.ClusteringPort},nats://localhost:{port3.ClusteringPort}"; + + Server1 = new NatsServer(outputHelper, transportType, port1, + $"{baseArgument} -cluster nats://localhost:{port1.ClusteringPort}"); + Server2 = new NatsServer(outputHelper, transportType, port2, + $"{baseArgument} -cluster nats://localhost:{port2.ClusteringPort}"); + Server3 = new NatsServer(outputHelper, transportType, port3, + $"{baseArgument} -cluster nats://localhost:{port3.ClusteringPort}"); } public async ValueTask DisposeAsync() diff --git a/tests/AlterNats.Tests/_NatsServerOptions.cs b/tests/AlterNats.Tests/_NatsServerOptions.cs new file mode 100644 index 0000000..f4f3e2a --- /dev/null +++ b/tests/AlterNats.Tests/_NatsServerOptions.cs @@ -0,0 +1,96 @@ +using System.Collections.Concurrent; +using System.Net.NetworkInformation; + +namespace AlterNats.Tests; + +public class NatsServerPorts : IDisposable +{ + static readonly Lazy> portFactory = new(() => + { + const int start = 1024; + const int size = 4096; + var properties = IPGlobalProperties.GetIPGlobalProperties(); + var activePorts = new HashSet(properties.GetActiveTcpListeners() + .Where(m => m.Port is >= start and < start + size) + .Select(m => m.Port)); + var freePorts = new HashSet(Enumerable.Range(start, size)); + freePorts.ExceptWith(activePorts); + return new ConcurrentQueue(freePorts); + }); + + static int LeasePort() + { + if (portFactory.Value.TryDequeue(out var port)) + { + return port; + } + + throw new Exception("unable to allocate port"); + } + + static void ReturnPort(int port) + { + portFactory.Value.Enqueue(port); + } + + public readonly int ServerPort; + public readonly int? ClusteringPort; + public readonly int? WebSocketPort; + public readonly bool ServerDisposeReturnsPorts; + + bool _isDisposed; + + public NatsServerPorts() : this(new NatsServerPortOptions()) + { + } + + public NatsServerPorts(NatsServerPortOptions portOptions) + { + ServerPort = LeasePort(); + ServerDisposeReturnsPorts = portOptions.ServerDisposeReturnsPorts; + if (portOptions.Clustering) + { + ClusteringPort = LeasePort(); + } + + if (portOptions.WebSocket) + { + WebSocketPort = LeasePort(); + } + } + + public void Dispose() + { + if (_isDisposed) + { + return; + } + + _isDisposed = true; + ReturnPort(ServerPort); + if (ClusteringPort.HasValue) + { + ReturnPort(ClusteringPort.Value); + } + + if (WebSocketPort.HasValue) + { + ReturnPort(WebSocketPort.Value); + } + } +} + +public sealed record NatsServerPortOptions +{ + public bool Clustering { get; init; } = false; + + public bool WebSocket { get; init; } = false; + + public bool ServerDisposeReturnsPorts { get; init; } = true; +} + +public enum TransportType +{ + Tcp, + WebSocket +} diff --git a/tests/AlterNats.Tests/test.runsettings b/tests/AlterNats.Tests/test.runsettings index 8a3feba..004e361 100644 --- a/tests/AlterNats.Tests/test.runsettings +++ b/tests/AlterNats.Tests/test.runsettings @@ -2,6 +2,6 @@ 1 - 30000 + 60000