Skip to content

Commit

Permalink
close #550
Browse files Browse the repository at this point in the history
helps resolve #673

Reimplemented Murmur3, ConsistentHashRouting to use virtual nodes
Added deploy and routerconfig fallback support
Rewrote ActorOf method to LocalActorRefProvider to match Akka
Rewrote ActorOf method to RemoteActorRefProvider to match Akka
Breaking change - renamed ConsistentHashable interface to IConsistentHashable (per #633)
Added MultiNodeTests for ClusterConsistentHashRouting
Implemented Pool routers for Akka.Cluster
  • Loading branch information
Aaronontheweb committed Mar 3, 2015
1 parent f319c27 commit ca9a37a
Show file tree
Hide file tree
Showing 45 changed files with 2,210 additions and 658 deletions.
2 changes: 2 additions & 0 deletions src/core/Akka.Cluster.Tests/Akka.Cluster.Tests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@
<Compile Include="MultiNode\MultiNodeClusterSpec.cs" />
<Compile Include="MultiNode\MultiNodeFact.cs" />
<Compile Include="MultiNode\MultiNodeLoggingConfig.cs" />
<Compile Include="MultiNode\Routing\ClusterConsistentHashingGroupSpec.cs" />
<Compile Include="MultiNode\Routing\ClusterConsistentHashingRouterSpec.cs" />
<Compile Include="NodeMetricsSpec.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="Proto\ClusterMessageSerializerSpec.cs" />
Expand Down
2 changes: 2 additions & 0 deletions src/core/Akka.Cluster.Tests/MultiNode/JoinSeedNodeSpec.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System.Collections.Immutable;
using System.Threading;
using Akka.Actor;
using Akka.Configuration;
using Akka.Remote.TestKit;

namespace Akka.Cluster.Tests.MultiNode
Expand Down Expand Up @@ -31,6 +32,7 @@ public JoinSeedNodeConfig()
_ordinary2 = Role("ordinary2");

CommonConfig = MultiNodeLoggingConfig.LoggingConfig.WithFallback(DebugConfig(true))
.WithFallback(ConfigurationFactory.ParseString(@"akka.cluster.publish-stats-interval = 25s"))
.WithFallback(MultiNodeClusterSpec.ClusterConfig());
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;
using Akka.Actor;
using Akka.Cluster.Routing;
using Akka.Configuration;
using Akka.Remote.TestKit;
using Akka.Routing;
using Akka.TestKit;

namespace Akka.Cluster.Tests.MultiNode.Routing
{
public class ClusterConsistentHashingGroupSpecConfig : MultiNodeConfig
{
#region Test classes
public sealed class Get { }
public sealed class Collected
{
public Collected(HashSet<object> messages)
{
Messages = messages;
}

public HashSet<object> Messages { get; private set; }
}

public class Destination : UntypedActor
{
private readonly HashSet<object> _receivedMessages = new HashSet<object>();
protected override void OnReceive(object message)
{
if (message is Get) Sender.Tell(new Collected(_receivedMessages));
else
{
_receivedMessages.Add(message);
}
}
}

#endregion

private readonly RoleName _first;
public RoleName First { get { return _first; } }

private readonly RoleName _second;
public RoleName Second { get { return _second; } }

private readonly RoleName _third;

public RoleName Third { get { return _third; } }

public ClusterConsistentHashingGroupSpecConfig()
{
_first = Role("first");
_second = Role("second");
_third = Role("third");

CommonConfig = MultiNodeLoggingConfig.LoggingConfig.WithFallback(DebugConfig(false))
.WithFallback(ConfigurationFactory.ParseString(@"
akka.cluster.publish-stats-interval = 5s
"))
.WithFallback(MultiNodeClusterSpec.ClusterConfig());
}
}

public class ClusterConsistentHashingGroupMultiNode1 : ClusterConsistentHashingGroupSpec { }
public class ClusterConsistentHashingGroupMultiNode2 : ClusterConsistentHashingGroupSpec { }
public class ClusterConsistentHashingGroupMultiNode3 : ClusterConsistentHashingGroupSpec { }


public abstract class ClusterConsistentHashingGroupSpec : MultiNodeClusterSpec
{
private readonly ClusterConsistentHashingGroupSpecConfig _config;

protected ClusterConsistentHashingGroupSpec() : this(new ClusterConsistentHashingGroupSpecConfig())
{

}

protected ClusterConsistentHashingGroupSpec(ClusterConsistentHashingGroupSpecConfig config) : base(config)
{
_config = config;
}

protected Routees CurrentRoutees(ActorRef router)
{
var routerAsk = router.Ask<Routees>(new GetRoutees(), GetTimeoutOrDefault(null));
routerAsk.Wait();
return routerAsk.Result;
}

/// <summary>
/// Fills in the self address for local ActorRef
/// </summary>
protected Address FullAddress(ActorRef actorRef)
{
if (string.IsNullOrEmpty(actorRef.Path.Address.Host) || !actorRef.Path.Address.Port.HasValue)
return Cluster.SelfAddress;
return actorRef.Path.Address;
}

[MultiNodeFact]
public void ClusterConsistentHashingGroupSpecs()
{
AClusterRouterWithConsitentHashingGroupMustStartClusterWith3Nodes();
AClusterRouterWithConsistentHashingGroupMustSendToSameDestinationsFromDifferentNodes();
}

protected void AClusterRouterWithConsitentHashingGroupMustStartClusterWith3Nodes()
{
Sys.ActorOf(Props.Create<ClusterConsistentHashingGroupSpecConfig.Destination>(), "dest");
AwaitClusterUp(_config.First, _config.Second, _config.Third);
EnterBarrier("after-1");
}

protected void AClusterRouterWithConsistentHashingGroupMustSendToSameDestinationsFromDifferentNodes()
{
ConsistentHashMapping hashMapping = msg =>
{
if (msg is string) return msg;
return null;
};

var paths = new List<string>() {"/user/dest"};
var router =
Sys.ActorOf(
new ClusterRouterGroup(new ConsistentHashingGroup(paths).WithHashMapping(hashMapping),
new ClusterRouterGroupSettings(10, true, null, ImmutableHashSet.Create<string>(paths.ToArray())))
.Props(), "router");

// it may take some time until router receives cluster member events
AwaitAssert(() =>
{
var members = CurrentRoutees(router).Members;
members.Count().ShouldBe(3);
});
var keys = new[] {"A", "B", "C", "D", "E", "F", "G"};
foreach (var key in Enumerable.Range(1, 10).SelectMany(i => keys))
{
router.Tell(key, TestActor);
}
EnterBarrier("messages-sent");
router.Tell(new Broadcast(new Get()));
var a = ExpectMsg<ClusterConsistentHashingGroupSpecConfig.Collected>().Messages;
var b = ExpectMsg<ClusterConsistentHashingGroupSpecConfig.Collected>().Messages;
var c = ExpectMsg<ClusterConsistentHashingGroupSpecConfig.Collected>().Messages;

a.Intersect(b).Count().ShouldBe(0);
a.Intersect(c).Count().ShouldBe(0);
b.Intersect(c).Count().ShouldBe(0);

(a.Count + b.Count + c.Count).ShouldBe(keys.Length);
EnterBarrier("after-2");
}
}
}
Loading

4 comments on commit ca9a37a

@Petabridge-CI
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TeamCity Akka.NET :: Akka.NET PR Build Build 93 is now running

@Petabridge-CI
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TeamCity Akka.NET :: Akka.NET PR Build Build 93 outcome was FAILURE
Summary: System.Exception: xUnit failed for the following assemblies: D:\BuildAgent\work\49b164d63843fb4\src\core\Akka.Persistence.Tests\bin\Release\Akka.Persistence.Tests.dll, D:\BuildAgent\work\49b164d63843fb4\src\core\Akka.Tests\bin\Release\Akka.Tests.dll ... Build time: 00:09:47

Failed tests

Akka.Persistence.Tests.dll: Akka.Persistence.Tests.PersistentActorSpec.PersistentActor_should_support_Context_Become_during_recovery: <no details avaliable>

Akka.Tests.dll: Akka.Tests.Actor.InboxSpec.Inbox_have_a_default_and_custom_timeouts: <no details avaliable>

Akka.Tests.dll: Akka.Tests.Routing.ConsistentHashingRouterSpec.ConsistentHashingRouterMustAdjustNodeRingWhenRouteeDies: <no details avaliable>

Akka.Tests.dll: Akka.Tests.Routing.RoutingSpec.Router_in_general_must_evict_terminated_routees: <no details avaliable>

@Petabridge-CI
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TeamCity Akka.NET :: Akka.NET PR Build Build 94 is now running

@Petabridge-CI
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TeamCity Akka.NET :: Akka.NET PR Build Build 94 outcome was FAILURE
Summary: System.Exception: xUnit failed for the following assemblies: D:\BuildAgent\work\49b164d63843fb4\src\core\Akka.Persistence.Tests\bin\Release\Akka.Persistence.Tests.dll, D:\BuildAgent\work\49b164d63843fb4\src\core\Akka.Tests\bin\Release\Akka.Tests.dll ... Build time: 00:09:11

Failed tests

Akka.Persistence.Tests.dll: Akka.Persistence.Tests.GuaranteedDeliverySpec.GuaranteedDelivery_must_redeliver_lost_messages_after_restart: <no details avaliable>

Akka.Tests.dll: Akka.Tests.Routing.TailChoppingSpec.Tail_chopping_router_must_throw_exception_if_no_result_will_arrive_within_the_given_time: <no details avaliable>

Akka.Tests.dll: Akka.Tests.Actor.InboxSpec.Inbox_have_a_default_and_custom_timeouts: <no details avaliable>

Please sign in to comment.