Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Gossip fixes #2092

Merged
merged 6 commits into from
Jan 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions ProtoActor.sln
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "KubernetesDiagnostics", "be
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Proto.Cluster.SeedNode.MongoDb", "src\Proto.Cluster.SeedNode.MongoDb\Proto.Cluster.SeedNode.MongoDb.csproj", "{6611DA4A-6471-45CE-A288-45BC7BF00B52}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "GossipDecoder", "benchmarks\GossipDecoder\GossipDecoder.csproj", "{FC144547-78F5-4C0B-B886-B7BC1563893B}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -1402,6 +1404,18 @@ Global
{6611DA4A-6471-45CE-A288-45BC7BF00B52}.Release|x64.Build.0 = Release|Any CPU
{6611DA4A-6471-45CE-A288-45BC7BF00B52}.Release|x86.ActiveCfg = Release|Any CPU
{6611DA4A-6471-45CE-A288-45BC7BF00B52}.Release|x86.Build.0 = Release|Any CPU
{FC144547-78F5-4C0B-B886-B7BC1563893B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{FC144547-78F5-4C0B-B886-B7BC1563893B}.Debug|Any CPU.Build.0 = Debug|Any CPU
{FC144547-78F5-4C0B-B886-B7BC1563893B}.Debug|x64.ActiveCfg = Debug|Any CPU
{FC144547-78F5-4C0B-B886-B7BC1563893B}.Debug|x64.Build.0 = Debug|Any CPU
{FC144547-78F5-4C0B-B886-B7BC1563893B}.Debug|x86.ActiveCfg = Debug|Any CPU
{FC144547-78F5-4C0B-B886-B7BC1563893B}.Debug|x86.Build.0 = Debug|Any CPU
{FC144547-78F5-4C0B-B886-B7BC1563893B}.Release|Any CPU.ActiveCfg = Release|Any CPU
{FC144547-78F5-4C0B-B886-B7BC1563893B}.Release|Any CPU.Build.0 = Release|Any CPU
{FC144547-78F5-4C0B-B886-B7BC1563893B}.Release|x64.ActiveCfg = Release|Any CPU
{FC144547-78F5-4C0B-B886-B7BC1563893B}.Release|x64.Build.0 = Release|Any CPU
{FC144547-78F5-4C0B-B886-B7BC1563893B}.Release|x86.ActiveCfg = Release|Any CPU
{FC144547-78F5-4C0B-B886-B7BC1563893B}.Release|x86.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -1530,6 +1544,7 @@ Global
{BDB67DAB-12F8-4D9F-BF7E-9F5D9E723816} = {3D12F5E5-9774-4D7E-8A5B-B1F64544925B}
{5FECD1A8-A873-4927-81C3-E5C5A37D80C5} = {0F3AB331-C042-4371-A2F0-0AFDFA13DC9F}
{6611DA4A-6471-45CE-A288-45BC7BF00B52} = {3D12F5E5-9774-4D7E-8A5B-B1F64544925B}
{FC144547-78F5-4C0B-B886-B7BC1563893B} = {0F3AB331-C042-4371-A2F0-0AFDFA13DC9F}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {CD0D1E44-8118-4682-8793-6B20ABFA824C}
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/ClusterBenchmark/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
services:

consul-agent-1: &consul-agent
image: consul:latest
image: hashicorp/consul:latest
networks:
- consul
command: "agent -retry-join consul-server-bootstrap -client 0.0.0.0"
Expand Down
14 changes: 14 additions & 0 deletions benchmarks/GossipDecoder/GossipDecoder.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net7.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\..\src\Proto.Cluster\Proto.Cluster.csproj" />
</ItemGroup>

</Project>
11 changes: 11 additions & 0 deletions benchmarks/GossipDecoder/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
// See https://aka.ms/new-console-template for more information

using System.Runtime.InteropServices.JavaScript;
using Google.Protobuf;
using Google.Protobuf.WellKnownTypes;
using Proto.Cluster;

Console.WriteLine("Hello, World!");
var data2 = "abc"; //enter base64 encoded data here
var message = ClusterTopology.Parser.ParseFrom(ByteString.FromBase64(data2));
Console.WriteLine(message);
1 change: 1 addition & 0 deletions examples/ActorMetrics/ActorMetrics.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

<ItemGroup>
<ProjectReference Include="..\..\src\Proto.Actor\Proto.Actor.csproj" />
<ProjectReference Include="..\..\src\Proto.Cluster.Consul\Proto.Cluster.Consul.csproj" />
<ProjectReference Include="..\..\src\Proto.Cluster\Proto.Cluster.csproj" />
<ProjectReference Include="..\..\src\Proto.OpenTelemetry\Proto.OpenTelemetry.csproj" />
<ProjectReference Include="..\..\src\Proto.Remote\Proto.Remote.csproj" />
Expand Down
5 changes: 3 additions & 2 deletions examples/ActorMetrics/RunDummyCluster.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using Microsoft.Extensions.Logging;
using Proto;
using Proto.Cluster;
using Proto.Cluster.Consul;
using Proto.Cluster.Partition;
using Proto.Cluster.Seed;
using Proto.Remote;
Expand All @@ -32,7 +33,7 @@ public static void Run()

var clusterConfig =
ClusterConfig
.Setup("MyCluster", new SeedNodeClusterProvider(),
.Setup("MyCluster", new ConsulProvider(new ConsulProviderConfig()),
new PartitionIdentityLookup()
);

Expand Down Expand Up @@ -62,7 +63,7 @@ public static void Run()
var clusterConfig2 =
ClusterConfig
.Setup("MyCluster",
new SeedNodeClusterProvider(new SeedNodeClusterProviderOptions(system.GetAddress())),
new ConsulProvider(new ConsulProviderConfig()),
new PartitionIdentityLookup()
)
.WithClusterKind("somekind", props);
Expand Down
1 change: 1 addition & 0 deletions examples/AspNetGrains/Node1/Node1.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\..\..\src\Proto.Cluster.Consul\Proto.Cluster.Consul.csproj" />
<ProjectReference Include="..\..\..\src\Proto.Cluster.SeedNode.Redis\Proto.Cluster.SeedNode.Redis.csproj" />
<ProjectReference Include="..\..\..\src\Proto.Cluster\Proto.Cluster.csproj" />
<ProjectReference Include="..\Messages\Messages.csproj" />
Expand Down
9 changes: 2 additions & 7 deletions examples/AspNetGrains/Node1/Program.cs
Original file line number Diff line number Diff line change
@@ -1,20 +1,15 @@
using AspNetGrains.Messages;
using Proto;
using Proto.Cluster;
using Proto.Cluster.Seed;
using Proto.Cluster.SeedNode.Redis;
using Proto.Cluster.Consul;
using Proto.Remote;
using StackExchange.Redis;

var builder = WebApplication.CreateBuilder(args);
builder.Services.AddLogging(x => x.AddConsole());

var multiplexer = ConnectionMultiplexer.Connect("localhost:6379");
var discovery = new RedisSeedNodeDiscovery(multiplexer);

builder.Services.AddProtoCluster("MyCluster", port: 0,
configureRemote: r => r.WithProtoMessages(AspNetGrains.Messages.ProtosReflection.Descriptor),
configureCluster: c => c, clusterProvider: SeedNodeClusterProvider.JoinWithDiscovery(discovery));
configureCluster: c => c, clusterProvider: new ConsulProvider(new ConsulProviderConfig()));

builder.Services.AddHealthChecks().AddCheck<ClusterHealthCheck>("proto", null, new[] { "ready", "live" });

Expand Down
1 change: 1 addition & 0 deletions examples/AspNetGrains/Node2/Node2.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\..\..\src\Proto.Cluster.Consul\Proto.Cluster.Consul.csproj" />
<ProjectReference Include="..\..\..\src\Proto.Cluster.SeedNode.Redis\Proto.Cluster.SeedNode.Redis.csproj" />
<ProjectReference Include="..\Messages\Messages.csproj" />
</ItemGroup>
Expand Down
9 changes: 2 additions & 7 deletions examples/AspNetGrains/Node2/Program.cs
Original file line number Diff line number Diff line change
@@ -1,23 +1,18 @@
using AspNetGrains.Messages;
using Proto;
using Proto.Cluster;
using Proto.Cluster.Seed;
using Proto.Cluster.SeedNode.Redis;
using Proto.Cluster.Consul;
using Proto.Remote;
using StackExchange.Redis;

var builder = WebApplication.CreateBuilder(args);

builder.Services.AddLogging(x => x.AddConsole());

var multiplexer = ConnectionMultiplexer.Connect("localhost:6379");
var discovery = new RedisSeedNodeDiscovery(multiplexer);

builder.Services.AddProtoCluster("MyCluster", port: 0,
configureRemote: r => r.WithProtoMessages(AspNetGrains.Messages.ProtosReflection.Descriptor),
configureCluster: c =>
c.WithClusterKind(HelloGrainActor.GetClusterKind((ctx, ci) => new Node2.HelloGrain(ctx, ci.Identity))),
clusterProvider: SeedNodeClusterProvider.JoinWithDiscovery(discovery));
clusterProvider: new ConsulProvider(new ConsulProviderConfig()));

builder.Services.AddHealthChecks().AddCheck<ClusterHealthCheck>("proto", null, new[] { "ready", "live" });

Expand Down
1 change: 1 addition & 0 deletions examples/ClusterGrainHelloWorld/Node1/Node1.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
<LangVersion>10</LangVersion>
</PropertyGroup>
<ItemGroup>
<ProjectReference Include="..\..\..\src\Proto.Cluster.Consul\Proto.Cluster.Consul.csproj" />
<ProjectReference Include="..\..\..\src\Proto.Cluster\Proto.Cluster.csproj"/>
<ProjectReference Include="..\..\..\src\Proto.Remote\Proto.Remote.csproj"/>
<ProjectReference Include="..\Messages\Messages.csproj"/>
Expand Down
6 changes: 2 additions & 4 deletions examples/ClusterGrainHelloWorld/Node1/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,12 @@
// -----------------------------------------------------------------------

using System;
using System.Threading.Tasks;
using ClusterHelloWorld.Messages;
using Microsoft.Extensions.Logging;
using Proto;
using Proto.Cluster;
using Proto.Cluster.Partition;
using Proto.Cluster.Consul;
using Proto.Cluster.PartitionActivator;
using Proto.Cluster.Seed;
using Proto.Remote;
using Proto.Remote.GrpcNet;
using static Proto.CancellationTokens;
Expand All @@ -28,7 +26,7 @@
.WithRemote(GrpcNetRemoteConfig.BindToLocalhost().WithProtoMessages(ProtosReflection.Descriptor))
.WithCluster(ClusterConfig
.Setup("MyCluster",
SeedNodeClusterProvider.JoinSeedNode("127.0.0.1",8090),
new ConsulProvider(new ConsulProviderConfig()),
new PartitionActivatorLookup()));

system.EventStream.Subscribe<ClusterTopology>(
Expand Down
1 change: 1 addition & 0 deletions examples/ClusterGrainHelloWorld/Node2/Node2.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
<LangVersion>11</LangVersion>
</PropertyGroup>
<ItemGroup>
<ProjectReference Include="..\..\..\src\Proto.Cluster.Consul\Proto.Cluster.Consul.csproj" />
<ProjectReference Include="..\..\..\src\Proto.Cluster\Proto.Cluster.csproj"/>
<ProjectReference Include="..\..\..\src\Proto.Remote\Proto.Remote.csproj"/>
<ProjectReference Include="..\Messages\Messages.csproj"/>
Expand Down
5 changes: 2 additions & 3 deletions examples/ClusterGrainHelloWorld/Node2/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,8 @@
using Microsoft.Extensions.Logging;
using Proto;
using Proto.Cluster;
using Proto.Cluster.Partition;
using Proto.Cluster.Consul;
using Proto.Cluster.PartitionActivator;
using Proto.Cluster.Seed;
using Proto.Remote;
using Proto.Remote.GrpcNet;
using static System.Threading.Tasks.Task;
Expand All @@ -27,7 +26,7 @@
var system = new ActorSystem(new ActorSystemConfig().WithDeveloperSupervisionLogging(true))
.WithRemote(GrpcNetRemoteConfig.BindToLocalhost(8090).WithProtoMessages(ProtosReflection.Descriptor))
.WithCluster(ClusterConfig
.Setup("MyCluster", SeedNodeClusterProvider.StartSeedNode(), new PartitionActivatorLookup())
.Setup("MyCluster", new ConsulProvider(new ConsulProviderConfig()), new PartitionActivatorLookup())
.WithClusterKind(
HelloGrainActor.GetClusterKind((ctx, identity) => new HelloGrain(ctx, identity.Identity)))
);
Expand Down
50 changes: 50 additions & 0 deletions src/Proto.Cluster/Gossip/Gossip.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,43 @@
// </copyright>
// -----------------------------------------------------------------------


/*
Member abc state:
GossipState
MemberState abc
Key123 - sequence id 1
Key456 - sequence id 2

MemberState def
Key123 - sequence id 1
Key456 - sequence id 2

committed offsets
"abc.def" 1

Member def state:
GossipState
MemberState abc
Key123 - sequence id 1
Key456 - sequence id 2

MemberState def
Key123 - sequence id 3
Key456 - sequence id 2

committed offsets
"abc.def" 2


gossip from def to abc
scan all entries for all member states _except_ for abc (we shouldn´t send their state to them)
find all entries that are higher than the committed offset for the member state
send all entries to the target member

committed offsets is local per member, meaning we will send excessive and needless data to other members
maybe this could be improved by having each nodes committed offsets as part of the MemberState (?)
*/
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
Expand Down Expand Up @@ -281,13 +318,26 @@ private void Purge()
//find all members that have sent topology
var members = _getMembers();

//purge member states
foreach (var memberId in _state.Members.Keys.ToArray())
{
if (!members.Contains(memberId))
{
_state.Members.Remove(memberId);
}
}

//purge committed offsets
foreach (var x in _committedOffsets.Keys.ToArray())
{
var parts = x.Split(".");
var from = parts[0];
var to = parts[1];
if (!members.Contains(from) || !members.Contains(to))
{
_committedOffsets = _committedOffsets.Remove(x);
}
}
}

private void CommitPendingOffsets(ImmutableDictionary<string, long> pendingOffsets)
Expand Down
2 changes: 2 additions & 0 deletions src/Proto.Cluster/GossipContracts.proto
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@ message GossipResponse {
//key + member_id gets it's own entry, if collision, highest version is selected
message GossipState {
message GossipMemberState {
//key is e.g. "cluster.topology", "cluster.heartbeat"
map<string,GossipKeyValue> values = 1;
}

//key is member id
map<string, GossipMemberState> members = 1;
}

Expand Down
2 changes: 1 addition & 1 deletion tests/Proto.Cluster.Tests/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
services:

consul-agent-1: &consul-agent
image: consul:latest
image: hashicorp/consul:latest
networks:
- consul
command: "agent -retry-join consul-server-bootstrap -client 0.0.0.0"
Expand Down
Loading