Skip to content

Commit

Permalink
adding ClusterSingleton hosting methods (#51)
Browse files Browse the repository at this point in the history
* adding `ClusterSingleton` hosting methods

* fixed compilation errors

* added test output logger

* fixed package metadata issue

* fixed bug with singleton proxy startup

* made tests more configurable

* working on fixing specs

* fixed ClusterSingletonProxy configuration

* fixed bug with singleton and proxy method

* Update AkkaClusterHostingExtensions.cs
  • Loading branch information
Aaronontheweb authored May 19, 2022
1 parent 36363ac commit a7ad10e
Show file tree
Hide file tree
Showing 4 changed files with 277 additions and 2 deletions.
6 changes: 6 additions & 0 deletions Akka.Hosting.sln
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Akka.Hosting.SqlSharding",
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Akka.Remote.Hosting.Tests", "src\Akka.Remote.Hosting.Tests\Akka.Remote.Hosting.Tests.csproj", "{4D748F16-AC22-4E8B-94D7-3DAF6B7CBD00}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Akka.Cluster.Hosting.Tests", "src\Akka.Cluster.Hosting.Tests\Akka.Cluster.Hosting.Tests.csproj", "{EEFCC5A9-94BB-41DA-A9D3-12ACB889FE42}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -81,6 +83,10 @@ Global
{4D748F16-AC22-4E8B-94D7-3DAF6B7CBD00}.Debug|Any CPU.Build.0 = Debug|Any CPU
{4D748F16-AC22-4E8B-94D7-3DAF6B7CBD00}.Release|Any CPU.ActiveCfg = Release|Any CPU
{4D748F16-AC22-4E8B-94D7-3DAF6B7CBD00}.Release|Any CPU.Build.0 = Release|Any CPU
{EEFCC5A9-94BB-41DA-A9D3-12ACB889FE42}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{EEFCC5A9-94BB-41DA-A9D3-12ACB889FE42}.Debug|Any CPU.Build.0 = Debug|Any CPU
{EEFCC5A9-94BB-41DA-A9D3-12ACB889FE42}.Release|Any CPU.ActiveCfg = Release|Any CPU
{EEFCC5A9-94BB-41DA-A9D3-12ACB889FE42}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down
17 changes: 17 additions & 0 deletions src/Akka.Cluster.Hosting.Tests/Akka.Cluster.Hosting.Tests.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>$(TestsNetCoreFramework)</TargetFramework>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Akka.TestKit.Xunit2" Version="$(AkkaVersion)" />
<PackageReference Include="FluentAssertions" Version="6.7.0" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="$(TestSdkVersion)" />
<PackageReference Include="xunit" Version="$(XunitVersion)" />
<PackageReference Include="xunit.runner.visualstudio" Version="$(XunitVersion)" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\Akka.Cluster.Hosting\Akka.Cluster.Hosting.csproj" />
</ItemGroup>
</Project>
130 changes: 130 additions & 0 deletions src/Akka.Cluster.Hosting.Tests/ClusterSingletonSpecs.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Event;
using Akka.Hosting;
using Akka.Remote.Hosting;
using Akka.TestKit.Xunit2.Internals;
using FluentAssertions;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Xunit;
using Xunit.Abstractions;

namespace Akka.Cluster.Hosting.Tests;

public class ClusterSingletonSpecs
{
public ClusterSingletonSpecs(ITestOutputHelper output)
{
Output = output;
}

public ITestOutputHelper Output { get; }

private class MySingletonActor : ReceiveActor
{
public static Props MyProps => Props.Create(() => new MySingletonActor());

public MySingletonActor()
{
ReceiveAny(_ => Sender.Tell(_));
}
}

private async Task<IHost> CreateHost(Action<AkkaConfigurationBuilder> specBuilder, ClusterOptions options)
{
var tcs = new TaskCompletionSource();
using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(10));

var host = new HostBuilder()
.ConfigureServices(collection =>
{
collection.AddAkka("TestSys", (configurationBuilder, provider) =>
{
configurationBuilder
.WithRemoting("localhost", 0)
.WithClustering(options)
.WithActors((system, registry) =>
{
var extSystem = (ExtendedActorSystem)system;
var logger = extSystem.SystemActorOf(Props.Create(() => new TestOutputLogger(Output)), "log-test");
logger.Tell(new InitializeLogger(system.EventStream));
})
.WithActors(async (system, registry) =>
{
var cluster = Cluster.Get(system);
cluster.RegisterOnMemberUp(() =>
{
tcs.SetResult();
});
if (options.SeedNodes == null || options.SeedNodes.Length == 0)
{
var myAddress = cluster.SelfAddress;
await cluster.JoinAsync(myAddress); // force system to wait until we're up
}
});
specBuilder(configurationBuilder);
});
}).Build();

await host.StartAsync(cancellationTokenSource.Token);
await (tcs.Task.WaitAsync(cancellationTokenSource.Token));

return host;
}

[Fact]
public async Task Should_launch_ClusterSingletonAndProxy()
{
// arrange
using var host = await CreateHost(
builder => { builder.WithSingleton<MySingletonActor>("my-singleton", MySingletonActor.MyProps); },
new ClusterOptions(){ Roles = new[] { "my-host"}});

var registry = host.Services.GetRequiredService<ActorRegistry>();
var singletonProxy = registry.Get<MySingletonActor>();

// act

// verify round-trip to the singleton proxy and back
var respond = await singletonProxy.Ask<string>("hit", TimeSpan.FromSeconds(3));

// assert
respond.Should().Be("hit");

await host.StopAsync();
}

[Fact]
public async Task Should_launch_ClusterSingleton_and_Proxy_separately()
{
// arrange

var singletonOptions = new ClusterSingletonOptions() { Role = "my-host" };
using var singletonHost = await CreateHost(
builder => { builder.WithSingleton<MySingletonActor>("my-singleton", MySingletonActor.MyProps, singletonOptions, createProxyToo:false); },
new ClusterOptions(){ Roles = new[] { "my-host"}});

var singletonSystem = singletonHost.Services.GetRequiredService<ActorSystem>();
var address = Cluster.Get(singletonSystem).SelfAddress;

using var singletonProxyHost = await CreateHost(
builder => { builder.WithSingletonProxy<MySingletonActor>("my-singleton", singletonOptions); },
new ClusterOptions(){ Roles = new[] { "proxy" }, SeedNodes = new Address[]{ address } });

var registry = singletonProxyHost.Services.GetRequiredService<ActorRegistry>();
var singletonProxy = registry.Get<MySingletonActor>();

// act

// verify round-trip to the singleton proxy and back
var respond = await singletonProxy.Ask<string>("hit", TimeSpan.FromSeconds(3));

// assert
respond.Should().Be("hit");

await Task.WhenAll(singletonHost.StopAsync(), singletonProxyHost.StopAsync());
}
}
126 changes: 124 additions & 2 deletions src/Akka.Cluster.Hosting/AkkaClusterHostingExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,13 @@ public sealed class ClusterOptions
public Address[] SeedNodes { get; set; }
}

public sealed class ClusterSingletonOptions
{
public int? BufferSize { get; set; } = null;
public string Role { get; set; }
public object TerminationMessage { get; set; }
}

public sealed class ShardOptions
{
public StateStoreMode StateStoreMode { get; set; } = StateStoreMode.DData;
Expand Down Expand Up @@ -125,6 +132,9 @@ public static AkkaConfigurationBuilder WithShardRegion<TKey>(this AkkaConfigurat
.WithRole(shardOptions.Role)
.WithRememberEntities(shardOptions.RememberEntities)
.WithStateStoreMode(shardOptions.StateStoreMode), messageExtractor);

// TODO: should throw here if duplicate key used

registry.TryRegister<TKey>(shardRegion);
});
}
Expand Down Expand Up @@ -162,6 +172,9 @@ public static AkkaConfigurationBuilder WithShardRegion<TKey>(this AkkaConfigurat
.WithRole(shardOptions.Role)
.WithRememberEntities(shardOptions.RememberEntities)
.WithStateStoreMode(shardOptions.StateStoreMode), extractEntityId, extractShardId);

// TODO: should throw here if duplicate key used

registry.TryRegister<TKey>(shardRegion);
});
}
Expand Down Expand Up @@ -191,6 +204,9 @@ public static AkkaConfigurationBuilder WithShardRegionProxy<TKey>(this AkkaConfi
{
var shardRegionProxy = await ClusterSharding.Get(system)
.StartProxyAsync(typeName, roleName, extractEntityId, extractShardId);

// TODO: should throw here if duplicate key used

registry.TryRegister<TKey>(shardRegionProxy);
});
}
Expand All @@ -215,6 +231,9 @@ public static AkkaConfigurationBuilder WithShardRegionProxy<TKey>(this AkkaConfi
{
var shardRegionProxy = await ClusterSharding.Get(system)
.StartProxyAsync(typeName, roleName, messageExtractor);

// TODO: should throw here if duplicate key used

registry.TryRegister<TKey>(shardRegionProxy);
});
}
Expand All @@ -237,13 +256,116 @@ public static AkkaConfigurationBuilder WithDistributedPubSub(this AkkaConfigurat
{
middle = middle.AddHocon($"akka.cluster.pub-sub = \"{role}\"");
}

return middle.WithActors((system, registry) =>
{
// force the initialization
var mediator = DistributedPubSub.Get(system).Mediator;
registry.TryRegister<DistributedPubSub>(mediator);
});
}

/// <summary>
/// Creates a new <see cref="ClusterSingletonManager"/> to host an actor created via <see cref="actorProps"/>.
///
/// If <paramref name="createProxyToo"/> is set to <c>true</c> then this method will also create a <see cref="ClusterSingletonProxy"/> that
/// will be added to the <see cref="ActorRegistry"/> using the key <see cref="TKey"/>. Otherwise this method will register nothing with
/// the <see cref="ActorRegistry"/>.
/// </summary>
/// <param name="builder">The builder instance being configured.</param>
/// <param name="singletonName">The name of this singleton instance. Will also be used in the <see cref="ActorPath"/> for the <see cref="ClusterSingletonManager"/> and
/// optionally, the <see cref="ClusterSingletonProxy"/> created by this method.</param>
/// <param name="actorProps">The underlying actor type. SHOULD NOT BE CREATED USING <see cref="ClusterSingletonManager.Props"/></param>
/// <param name="options">Optional. The set of options for configuring both the <see cref="ClusterSingletonManager"/> and
/// optionally, the <see cref="ClusterSingletonProxy"/>.</param>
/// <param name="createProxyToo">When set to <c>true></c>, creates a <see cref="ClusterSingletonProxy"/> that automatically points to the <see cref="ClusterSingletonManager"/> created by this method.</param>
/// <typeparam name="TKey">The key type to use for the <see cref="ActorRegistry"/> when <paramref name="createProxyToo"/> is set to <c>true</c>.</typeparam>
/// <returns>The same <see cref="AkkaConfigurationBuilder"/> instance originally passed in.</returns>
public static AkkaConfigurationBuilder WithSingleton<TKey>(this AkkaConfigurationBuilder builder,
string singletonName, Props actorProps, ClusterSingletonOptions options = null, bool createProxyToo = true)
{
return builder.WithActors((system, registry) =>
{
options ??= new ClusterSingletonOptions();
var clusterSingletonManagerSettings =
ClusterSingletonManagerSettings.Create(system).WithSingletonName(singletonName);

var singletonProxySettings =
ClusterSingletonProxySettings.Create(system).WithSingletonName(singletonName);

if (!string.IsNullOrEmpty(options.Role))
{
clusterSingletonManagerSettings = clusterSingletonManagerSettings.WithRole(options.Role);
singletonProxySettings = singletonProxySettings.WithRole(options.Role);
}

var singletonProps = options.TerminationMessage == null
? ClusterSingletonManager.Props(actorProps, clusterSingletonManagerSettings)
: ClusterSingletonManager.Props(actorProps, options.TerminationMessage,
clusterSingletonManagerSettings);

var singletonManagerRef = system.ActorOf(singletonProps, singletonName);

// create a proxy that can talk to the singleton we just created
// and add it to the ActorRegistry
if (createProxyToo)
{
if (options.BufferSize != null)
{
singletonProxySettings = singletonProxySettings.WithBufferSize(options.BufferSize.Value);
}

CreateAndRegisterSingletonProxy<TKey>(singletonManagerRef.Path.Name, $"/user/{singletonManagerRef.Path.Name}", singletonProxySettings, system, registry);
}
});
}

private static void CreateAndRegisterSingletonProxy<TKey>(string singletonActorName, string singletonActorPath,
ClusterSingletonProxySettings singletonProxySettings, ActorSystem system, IActorRegistry registry)
{
var singletonProxyProps = ClusterSingletonProxy.Props(singletonActorPath,
singletonProxySettings);
var singletonProxy = system.ActorOf(singletonProxyProps, $"{singletonActorName}-proxy");

registry.Register<TKey>(singletonProxy);
}

/// <summary>
/// Creates a <see cref="ClusterSingletonProxy"/> and adds it to the <see cref="ActorRegistry"/> using the given
/// <see cref="TKey"/>.
/// </summary>
/// <param name="builder">The builder instance being configured.</param>
/// <param name="singletonName">The name of this singleton instance. Will also be used in the <see cref="ActorPath"/> for the <see cref="ClusterSingletonManager"/> and
/// optionally, the <see cref="ClusterSingletonProxy"/> created by this method.</param>
/// <param name="options">Optional. The set of options for configuring the <see cref="ClusterSingletonProxy"/>.</param>
/// <param name="singletonManagerPath">Optional. By default Akka.Hosting will assume the <see cref="ClusterSingletonManager"/> is hosted at "/user/{singletonName}" - but
/// if for some reason the path is different you can use this property to override that value.</param>
/// <typeparam name="TKey">The key type to use for the <see cref="ActorRegistry"/>.</typeparam>
/// <returns>The same <see cref="AkkaConfigurationBuilder"/> instance originally passed in.</returns>
public static AkkaConfigurationBuilder WithSingletonProxy<TKey>(this AkkaConfigurationBuilder builder,
string singletonName, ClusterSingletonOptions options = null, string singletonManagerPath = null)
{
return builder.WithActors((system, registry) =>
{
options ??= new ClusterSingletonOptions();

var singletonProxySettings =
ClusterSingletonProxySettings.Create(system).WithSingletonName(singletonName);

if (!string.IsNullOrEmpty(options.Role))
{
singletonProxySettings = singletonProxySettings.WithRole(options.Role);
}

if (options.BufferSize != null)
{
singletonProxySettings = singletonProxySettings.WithBufferSize(options.BufferSize.Value);
}

singletonManagerPath ??= $"/user/{singletonName}";

CreateAndRegisterSingletonProxy<TKey>(singletonName, singletonManagerPath, singletonProxySettings, system, registry);
});
}
}
}
}

0 comments on commit a7ad10e

Please sign in to comment.