Skip to content

Commit

Permalink
Fix tests by adding gRPC server to simulate gossip responses
Browse files Browse the repository at this point in the history
  • Loading branch information
shaan1337 committed May 29, 2020
1 parent ccc0aaa commit 6d6fa17
Show file tree
Hide file tree
Showing 3 changed files with 129 additions and 32 deletions.
2 changes: 1 addition & 1 deletion src/EventStore.Client/EventStore.Client.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
</ItemGroup>
<ItemGroup>
<Protobuf Include="..\EventStore.Client.Common\protos\shared.proto" GrpcServices="Client" ProtoRoot="..\EventStore.Client.Common\protos" />
<Protobuf Include="..\EventStore.Client.Common\protos\gossip.proto" GrpcServices="Client" ProtoRoot="..\EventStore.Client.Common\protos" />
<Protobuf Include="..\EventStore.Client.Common\protos\gossip.proto" ProtoRoot="..\EventStore.Client.Common\protos" />
</ItemGroup>
<ItemGroup>
<Compile Remove="..\EventStore.Client.Common\TypedExceptionInterceptor.cs" />
Expand Down
3 changes: 3 additions & 0 deletions test/EventStore.Client.Tests/EventStore.Client.Tests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
<ItemGroup>
<ProjectReference Include="..\..\src\EventStore.Client\EventStore.Client.csproj" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Grpc" Version="2.27.0" />
</ItemGroup>
<ItemGroup>
<Compile Remove="..\EventStore.Client.Tests.Common\*.cs" />
</ItemGroup>
Expand Down
156 changes: 125 additions & 31 deletions test/EventStore.Client.Tests/GossipBasedEndpointDiscovererTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,25 @@
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Text.Json;
using System.Text.Json.Serialization;
using System.Net.Sockets;
using System.Security.Cryptography;
using System.Security.Cryptography.X509Certificates;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using EventStore.Client.Gossip;
using Grpc.Core;
using Xunit;

#nullable enable
namespace EventStore.Client {
public class GossipBasedEndpointDiscovererTests {
public class GossipBasedEndpointDiscovererTests: IAsyncLifetime {
private readonly Fixture _fixture;

public GossipBasedEndpointDiscovererTests() {
_fixture = new Fixture();
}

[Fact]
public async Task should_issue_gossip_to_gossip_seed() {
HttpRequestMessage? request = null;
Expand All @@ -26,12 +36,12 @@ public async Task should_issue_gossip_to_gossip_seed() {
}
};

var handler = new FakeMessageHandler(req => {
var handler = new CustomMessageHandler(req => {
request = req;
return ResponseFromGossip(gossip);
_fixture.CurrentClusterInfo.Members = gossip.Members;
});

var gossipSeed = new DnsEndPoint("gossip_seed_endpoint", 1114);
var gossipSeed = new DnsEndPoint(_fixture.Host, _fixture.Port);

var sut = new ClusterEndpointDiscoverer(1, new[] {
gossipSeed,
Expand Down Expand Up @@ -84,16 +94,16 @@ public async Task should_be_able_to_discover_twice() {
}
};

var handler = new FakeMessageHandler(req => {
var handler = new CustomMessageHandler(req => {
if (isFirstGossip) {
isFirstGossip = false;
return ResponseFromGossip(firstGossip);
_fixture.CurrentClusterInfo.Members = firstGossip.Members;
} else {
return ResponseFromGossip(secondGossip);
_fixture.CurrentClusterInfo.Members = secondGossip.Members;
}
});

var gossipSeed = new DnsEndPoint("gossip_seed_endpoint", 1114);
var gossipSeed = new DnsEndPoint(_fixture.Host, _fixture.Port);

var sut = new ClusterEndpointDiscoverer(5, new[] {
gossipSeed,
Expand All @@ -119,13 +129,13 @@ public async Task should_not_exceed_max_discovery_attempts() {
int maxDiscoveryAttempts = 5;
int discoveryAttempts = 0;

var handler = new FakeMessageHandler(request => {
var handler = new CustomMessageHandler(request => {
discoveryAttempts++;
throw new Exception();
});

var sut = new ClusterEndpointDiscoverer(maxDiscoveryAttempts, new[] {
new DnsEndPoint("localhost", 1114),
new DnsEndPoint(_fixture.Host, _fixture.Port),
}, Timeout.InfiniteTimeSpan, TimeSpan.Zero, NodePreference.Leader, handler);

await Assert.ThrowsAsync<DiscoveryException>(() => sut.DiscoverAsync());
Expand Down Expand Up @@ -157,9 +167,11 @@ public async Task should_not_be_able_to_pick_invalid_node(ClusterMessages.VNodeS
}
};

var handler = new FakeMessageHandler(req => ResponseFromGossip(gossip));
var handler = new CustomMessageHandler(req => {
_fixture.CurrentClusterInfo.Members = gossip.Members;
});

var sut = new ClusterEndpointDiscoverer(1, new[] { new DnsEndPoint("localhost", 1113),
var sut = new ClusterEndpointDiscoverer(1, new[] { new DnsEndPoint(_fixture.Host, _fixture.Port),
}, Timeout.InfiniteTimeSpan, TimeSpan.Zero, NodePreference.Leader, handler);

await Assert.ThrowsAsync<DiscoveryException>(() => sut.DiscoverAsync());
Expand Down Expand Up @@ -206,10 +218,12 @@ public async Task should_pick_node_based_on_preference(NodePreference preference
},
}
};
var handler = new FakeMessageHandler(req => ResponseFromGossip(gossip));
var handler = new CustomMessageHandler(req => {
_fixture.CurrentClusterInfo.Members = gossip.Members;
});

var sut = new ClusterEndpointDiscoverer(1, new[] {
new DnsEndPoint("localhost", 1113)
new DnsEndPoint(_fixture.Host, _fixture.Port)
}, Timeout.InfiniteTimeSpan, TimeSpan.Zero, preference, handler);

var result = await sut.DiscoverAsync();
Expand Down Expand Up @@ -237,36 +251,116 @@ public async Task falls_back_to_first_alive_node_if_a_preferred_node_is_not_foun
},
}
};
var handler = new FakeMessageHandler(req => ResponseFromGossip(gossip));
var handler = new CustomMessageHandler(req => {
_fixture.CurrentClusterInfo.Members = gossip.Members;
});

var sut = new ClusterEndpointDiscoverer(1, new[] {
new DnsEndPoint("localhost", 1113)
new DnsEndPoint(_fixture.Host, _fixture.Port)
}, Timeout.InfiniteTimeSpan, TimeSpan.Zero, NodePreference.Leader, handler);

var result = await sut.DiscoverAsync();
Assert.Equal(result.GetPort(),
gossip.Members.Last(x => x.State == ClusterMessages.VNodeState.Follower).HttpEndPointPort);
}

private HttpResponseMessage ResponseFromGossip(ClusterMessages.ClusterInfo gossip) =>
new HttpResponseMessage(HttpStatusCode.OK) {
Content = new StringContent(JsonSerializer.Serialize(gossip, new JsonSerializerOptions {
PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
PropertyNameCaseInsensitive = true,
Converters = {new JsonStringEnumConverter(JsonNamingPolicy.CamelCase)}
}))
};
private class CustomMessageHandler : HttpClientHandler {
private readonly Action<HttpRequestMessage> _handle;

private class FakeMessageHandler : HttpMessageHandler {
private readonly Func<HttpRequestMessage, HttpResponseMessage> _handle;

public FakeMessageHandler(Func<HttpRequestMessage, HttpResponseMessage> handle) {
public CustomMessageHandler(Action<HttpRequestMessage> handle) {
_handle = handle;
ServerCertificateCustomValidationCallback = delegate { return true; };
}

protected override Task<HttpResponseMessage> SendAsync(HttpRequestMessage request,
CancellationToken cancellationToken) {
return Task.FromResult(_handle(request));
_handle(request);
return base.SendAsync(request, cancellationToken);
}
}

public Task InitializeAsync() => _fixture.InitializeAsync();
public Task DisposeAsync() => _fixture.DisposeAsync();

public class Fixture : IAsyncLifetime {
public readonly string Host = "localhost";
public readonly int Port = GetFreePort();
public readonly ClusterMessages.ClusterInfo CurrentClusterInfo = new ClusterMessages.ClusterInfo();
private Server? _server;

private static int GetFreePort() {
using var socket =
new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp) {
ExclusiveAddressUse = false
};
socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, true);
socket.Bind(new IPEndPoint(IPAddress.Loopback, 0));
return ((IPEndPoint)socket.LocalEndPoint).Port;
}

private void StartGrpcServer() {
var keyCertificatePair = GenerateKeyCertificatePair();
_server = new Server
{
Services = { Gossip.Gossip.BindService(new GossipImplementation(CurrentClusterInfo)) },
Ports = { new ServerPort(Host, Port, new SslServerCredentials(new [] {keyCertificatePair})) }
};
_server.Start();
}

private KeyCertificatePair GenerateKeyCertificatePair() {
using (RSA rsa = RSA.Create())
{
var certReq = new CertificateRequest("CN=hello", rsa, HashAlgorithmName.SHA256, RSASignaturePadding.Pkcs1);
var certificate = certReq.CreateSelfSigned(DateTimeOffset.UtcNow.AddMonths(-1), DateTimeOffset.UtcNow.AddMonths(1));
var pemCertificateBuilder = new StringBuilder();
pemCertificateBuilder.AppendLine("-----BEGIN CERTIFICATE-----");
pemCertificateBuilder.AppendLine(Convert.ToBase64String(certificate.Export(X509ContentType.Cert), Base64FormattingOptions.InsertLineBreaks));
pemCertificateBuilder.AppendLine("-----END CERTIFICATE-----");
var pemCertificate = pemCertificateBuilder.ToString();

var pemKeyBuilder = new StringBuilder();
pemKeyBuilder.AppendLine("-----BEGIN RSA PRIVATE KEY-----");
pemKeyBuilder.AppendLine(Convert.ToBase64String(rsa.ExportRSAPrivateKey(), Base64FormattingOptions.InsertLineBreaks));
pemKeyBuilder.AppendLine("-----END RSA PRIVATE KEY-----");
var pemKey = pemKeyBuilder.ToString();

return new KeyCertificatePair(pemCertificate, pemKey);
}
}

private class GossipImplementation : Gossip.Gossip.GossipBase {
private readonly ClusterMessages.ClusterInfo _currentClusterInfo;

public GossipImplementation(ClusterMessages.ClusterInfo currentClusterInfo) {
_currentClusterInfo = currentClusterInfo;
}
public override Task<ClusterInfo> Read(Empty request, ServerCallContext context) {
if (_currentClusterInfo.Members == null) {
return Task.FromResult(new ClusterInfo());
}
var members = Array.ConvertAll(_currentClusterInfo.Members, x => new MemberInfo {
InstanceId = Uuid.FromGuid(x.InstanceId).ToDto(),
State = (MemberInfo.Types.VNodeState)x.State,
IsAlive = x.IsAlive,
HttpEndPoint = new Gossip.EndPoint {
Address = x.HttpEndPointIp,
Port = (uint) x.HttpEndPointPort
}
}).ToArray();
var info = new ClusterInfo();
info.Members.AddRange(members);
return Task.FromResult(info);
}
}

public Task InitializeAsync() {
StartGrpcServer();
return Task.CompletedTask;
}

public Task DisposeAsync() {
return _server == null ? Task.CompletedTask : _server.ShutdownAsync();
}
}
}
Expand Down

0 comments on commit 6d6fa17

Please sign in to comment.