Skip to content
This repository has been archived by the owner on Jul 17, 2023. It is now read-only.

Commit

Permalink
web socket tests
Browse files Browse the repository at this point in the history
Signed-off-by: Caleb Lloyd <caleb@synadia.com>
  • Loading branch information
Caleb Lloyd committed Jul 28, 2022
1 parent d715153 commit e28baff
Show file tree
Hide file tree
Showing 13 changed files with 346 additions and 182 deletions.
24 changes: 24 additions & 0 deletions AlterNats.sln
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,14 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "AlterNats.Hosting", "src\Al
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MinimumWebApp", "sandbox\MinimumWebApp\MinimumWebApp.csproj", "{44881DEE-8B49-44EA-B0BA-8BDA4F706E1A}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "BlazorWasm", "BlazorWasm", "{326017A7-18B3-4567-B9F9-5521E4467198}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "BlazorWasm.Client", "sandbox\BlazorWasm\Client\BlazorWasm.Client.csproj", "{EFBCEF7F-51AA-4717-8E3E-304B6506F1AC}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "BlazorWasm.Server", "sandbox\BlazorWasm\Server\BlazorWasm.Server.csproj", "{36B9F490-681F-432D-B8FA-64440EC076D9}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "BlazorWasm.Shared", "sandbox\BlazorWasm\Shared\BlazorWasm.Shared.csproj", "{35D141D0-F80A-4E9B-A2AA-A1AEA925C9C2}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -71,6 +79,18 @@ Global
{44881DEE-8B49-44EA-B0BA-8BDA4F706E1A}.Debug|Any CPU.Build.0 = Debug|Any CPU
{44881DEE-8B49-44EA-B0BA-8BDA4F706E1A}.Release|Any CPU.ActiveCfg = Release|Any CPU
{44881DEE-8B49-44EA-B0BA-8BDA4F706E1A}.Release|Any CPU.Build.0 = Release|Any CPU
{EFBCEF7F-51AA-4717-8E3E-304B6506F1AC}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{EFBCEF7F-51AA-4717-8E3E-304B6506F1AC}.Debug|Any CPU.Build.0 = Debug|Any CPU
{EFBCEF7F-51AA-4717-8E3E-304B6506F1AC}.Release|Any CPU.ActiveCfg = Release|Any CPU
{EFBCEF7F-51AA-4717-8E3E-304B6506F1AC}.Release|Any CPU.Build.0 = Release|Any CPU
{36B9F490-681F-432D-B8FA-64440EC076D9}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{36B9F490-681F-432D-B8FA-64440EC076D9}.Debug|Any CPU.Build.0 = Debug|Any CPU
{36B9F490-681F-432D-B8FA-64440EC076D9}.Release|Any CPU.ActiveCfg = Release|Any CPU
{36B9F490-681F-432D-B8FA-64440EC076D9}.Release|Any CPU.Build.0 = Release|Any CPU
{35D141D0-F80A-4E9B-A2AA-A1AEA925C9C2}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{35D141D0-F80A-4E9B-A2AA-A1AEA925C9C2}.Debug|Any CPU.Build.0 = Debug|Any CPU
{35D141D0-F80A-4E9B-A2AA-A1AEA925C9C2}.Release|Any CPU.ActiveCfg = Release|Any CPU
{35D141D0-F80A-4E9B-A2AA-A1AEA925C9C2}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand All @@ -84,6 +104,10 @@ Global
{7B518D4A-93EA-4E5E-956A-CCC34C005AD4} = {4827B3EC-73D8-436D-AE2A-5E29AC95FD0C}
{D3F09B30-1ED5-48C2-81CD-A2AD88E751AC} = {4827B3EC-73D8-436D-AE2A-5E29AC95FD0C}
{44881DEE-8B49-44EA-B0BA-8BDA4F706E1A} = {95A69671-16CA-4133-981C-CC381B7AAA30}
{326017A7-18B3-4567-B9F9-5521E4467198} = {95A69671-16CA-4133-981C-CC381B7AAA30}
{EFBCEF7F-51AA-4717-8E3E-304B6506F1AC} = {326017A7-18B3-4567-B9F9-5521E4467198}
{36B9F490-681F-432D-B8FA-64440EC076D9} = {326017A7-18B3-4567-B9F9-5521E4467198}
{35D141D0-F80A-4E9B-A2AA-A1AEA925C9C2} = {326017A7-18B3-4567-B9F9-5521E4467198}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {8CBB7278-D093-448E-B3DE-B5991209A1AA}
Expand Down
64 changes: 0 additions & 64 deletions sandbox/BlazorWasm/BlazorWasm.sln

This file was deleted.

33 changes: 33 additions & 0 deletions sandbox/BlazorWasm/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# BlazorWasm

Blazor WebAssembly demo with NATS request/reply

## Pre-requisites

Start `nats-server` with a WebSocket listener on port 4280

**With nats-server**

```bash
nats-server -c nats-server.conf
```

**With docker**


```bash
docker run --rm \
--name nats-server \
-p 4222:4222 \
-p 4280:4280 \
-v "$(pwd)/nats-server.conf:/etc/nats/nats-server.conf" \
nats
```

## Run the Project

```bash
dotnet run --project Server
```

Navigate to http://localhost:5000/fetchdata
12 changes: 0 additions & 12 deletions sandbox/BlazorWasm/docker/nats.sh

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,8 +1,4 @@
port: 4222

websocket {
port: 4280
no_tls: true
}

monitor_port: 8222
21 changes: 10 additions & 11 deletions src/AlterNats/NatsConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -151,16 +151,15 @@ async ValueTask InitialConnectAsync()
target = await OnConnectingAsync(target).ConfigureAwait(false);
}

logger.LogInformation("Try to connect NATS {0}", uri);
if (uri.IsWebSocket)
{
logger.LogInformation("Try to connect NATS {0}", uri.Uri);
var conn = new WebSocketConnection();
await conn.ConnectAsync(uri.Uri, Options.ConnectTimeout).ConfigureAwait(false);
this.socket = conn;
}
else
{
logger.LogInformation("Try to connect NATS {0}:{1}", uri.Host, uri.Port);
var conn = new TcpConnection();
await conn.ConnectAsync(target.Host, target.Port, Options.ConnectTimeout).ConfigureAwait(false);
this.socket = conn;
Expand All @@ -171,7 +170,7 @@ async ValueTask InitialConnectAsync()
}
catch (Exception ex)
{
logger.LogError(ex, "Fail to connect NATS {0}:{1}.", uri.Host, uri.Port);
logger.LogError(ex, "Fail to connect NATS {0}", uri);
}
}
if (this.socket == null)
Expand Down Expand Up @@ -235,7 +234,7 @@ async ValueTask InitialConnectAsync()
lock (gate)
{
var url = currentConnectUri;
logger.LogInformation("Connect succeed {0}, NATS {1}:{2}", name, url?.Host, url?.Port);
logger.LogInformation("Connect succeed {0}, NATS {1}", name, url);
this.ConnectionState = NatsConnectionState.Open;
this.pingTimerCancellationTokenSource = new CancellationTokenSource();
StartPingTimer(pingTimerCancellationTokenSource.Token);
Expand Down Expand Up @@ -288,18 +287,19 @@ async void ReconnectLoop()
// Dispose current and create new
await socket.DisposeAsync().ConfigureAwait(false);

NatsUri[] urls = Array.Empty<NatsUri>();
NatsUri[] urls;
var defaultScheme = currentConnectUri?.Uri.Scheme ?? NatsUri.DefaultScheme;
if (Options.NoRandomize)
{
urls = this.ServerInfo?.ClientConnectUrls?.Select(x => new NatsUri(x)).Distinct().ToArray() ?? Array.Empty<NatsUri>();
urls = this.ServerInfo?.ClientConnectUrls?.Select(x => new NatsUri(x, defaultScheme)).Distinct().ToArray() ?? Array.Empty<NatsUri>();
if (urls.Length == 0)
{
urls = Options.GetSeedUris();
}
}
else
{
urls = this.ServerInfo?.ClientConnectUrls?.Select(x => new NatsUri(x)).OrderBy(_ => Guid.NewGuid()).Distinct().ToArray() ?? Array.Empty<NatsUri>();
urls = this.ServerInfo?.ClientConnectUrls?.Select(x => new NatsUri(x, defaultScheme)).OrderBy(_ => Guid.NewGuid()).Distinct().ToArray() ?? Array.Empty<NatsUri>();
if (urls.Length == 0)
{
urls = Options.GetSeedUris();
Expand Down Expand Up @@ -328,16 +328,15 @@ async void ReconnectLoop()
target = await OnConnectingAsync(target).ConfigureAwait(false);
}

logger.LogInformation("Try to connect NATS {0}", url);
if (url.IsWebSocket)
{
logger.LogInformation("Try to connect NATS {0}", url.Uri);
var conn = new WebSocketConnection();
await conn.ConnectAsync(url.Uri, Options.ConnectTimeout).ConfigureAwait(false);
this.socket = conn;
}
else
{
logger.LogInformation("Try to connect NATS {0}:{1}", url.Host, url.Port);
var conn = new TcpConnection();
await conn.ConnectAsync(target.Host, target.Port, Options.ConnectTimeout).ConfigureAwait(false);
this.socket = conn;
Expand Down Expand Up @@ -375,7 +374,7 @@ async void ReconnectLoop()
{
if (url != null)
{
logger.LogError(ex, "Fail to connect NATS {0}:{1}.", url.Host, url.Port);
logger.LogError(ex, "Fail to connect NATS {0}", url);
}

if (socketWriter != null)
Expand All @@ -402,7 +401,7 @@ async void ReconnectLoop()

lock (gate)
{
logger.LogInformation("Connect succeed {0}, NATS {1}:{2}", name, url.Host, url.Port);
logger.LogInformation("Connect succeed {0}, NATS {1}", name, url);
this.ConnectionState = NatsConnectionState.Open;
this.pingTimerCancellationTokenSource = new CancellationTokenSource();
StartPingTimer(pingTimerCancellationTokenSource.Token);
Expand Down
42 changes: 28 additions & 14 deletions src/AlterNats/NatsUri.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,38 +2,52 @@

internal sealed class NatsUri : IEquatable<NatsUri>
{
const string DefaultScheme = "nats://";
public static readonly NatsUri Default = new NatsUri("nats://localhost:4222");
public const int DefaultPort = 4222;
public const string DefaultScheme = "nats";

internal readonly Uri Uri;
public bool IsSecure { get; }
public bool IsWebSocket { get; }
public readonly Uri Uri;
public readonly bool IsSecure;
public readonly bool IsWebSocket;
public string Host => Uri.Host;
public int Port => Uri.Port;

public NatsUri(string urlString)
public NatsUri(string urlString, string defaultScheme = DefaultScheme)
{
if (!urlString.Contains("://"))
{
urlString = DefaultScheme + urlString;
urlString = $"{defaultScheme}://{urlString}";
}

this.Uri = new Uri(urlString);
if (Uri.Scheme is "tls" or "wss")
var uriBuilder = new UriBuilder(new Uri(urlString, UriKind.Absolute));
if (string.IsNullOrEmpty(uriBuilder.Host))
{
IsSecure = true;
uriBuilder.Host = "localhost";
}

if (Uri.Scheme is "ws" or "wss")
switch (uriBuilder.Scheme)
{
IsWebSocket = true;
case "nats":
if (uriBuilder.Port == -1)
{
uriBuilder.Port = 4222;
}
break;
case "ws":
IsWebSocket = true;
break;
case "wss":
IsWebSocket = true;
IsSecure = true;
break;
default:
throw new ArgumentException($"unsupported scheme {uriBuilder.Scheme} in nats URL {urlString}", urlString);
}

Uri = uriBuilder.Uri;
}

public override string ToString()
{
return Uri.ToString().Trim('/');
return IsWebSocket && Uri.AbsolutePath != "/" ? Uri.ToString() : Uri.ToString().Trim('/');
}

public override int GetHashCode() => Uri.GetHashCode();
Expand Down
13 changes: 7 additions & 6 deletions tests/AlterNats.Tests/NatsConnectionTest.Sharding.cs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
namespace AlterNats.Tests;

public partial class NatsConnectionTest
public abstract partial class NatsConnectionTest
{
// TODO:do.
[Fact]
public async Task ConnectionPoolTest()
{
await using var server = new NatsServer(output);
await using var server = new NatsServer(output, transportType);

var conn = server.CreatePooledClientConnection();

Expand All @@ -24,11 +24,12 @@ public async Task ConnectionPoolTest()
[Fact]
public async Task ShardingConnectionTest()
{
await using var server1 = new NatsServer(output);
await using var server2 = new NatsServer(output);
await using var server3 = new NatsServer(output);
await using var server1 = new NatsServer(output, transportType);
await using var server2 = new NatsServer(output, transportType);
await using var server3 = new NatsServer(output, transportType);

var urls = new[] { server1.Port, server2.Port, server3.Port }.Select(x => $"nats://localhost:{x}").ToArray();
var urls = new[] { server1.Ports.ServerPort, server2.Ports.ServerPort, server3.Ports.ServerPort }
.Select(x => $"nats://localhost:{x}").ToArray();
var shardedConnection = new NatsShardingConnection(1, NatsOptions.Default, urls);


Expand Down
15 changes: 15 additions & 0 deletions tests/AlterNats.Tests/NatsConnectionTest.Transports.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
namespace AlterNats.Tests;

public class NatsConnectionTestTcp : NatsConnectionTest
{
public NatsConnectionTestTcp(ITestOutputHelper output) : base(output, TransportType.Tcp)
{
}
}

public class NatsConnectionTestWs : NatsConnectionTest
{
public NatsConnectionTestWs(ITestOutputHelper output) : base(output, TransportType.WebSocket)
{
}
}
Loading

0 comments on commit e28baff

Please sign in to comment.