Skip to content

Commit

Permalink
Working on surrogates
Browse files Browse the repository at this point in the history
  • Loading branch information
rogeralsing committed Mar 1, 2015
1 parent 06f33c1 commit 4dfdc40
Show file tree
Hide file tree
Showing 8 changed files with 32 additions and 100 deletions.
2 changes: 1 addition & 1 deletion src/core/Akka.Cluster/Routing/ClusterRoutingConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public sealed class ClusterRouterGroup : Group, IClusterRouterConfigBase
private readonly ClusterRouterSettingsBase _settings;

public ClusterRouterGroup(Group local, ClusterRouterGroupSettings settings)
: base(settings.AllowLocalRoutees ? settings.RouteesPaths.ToArray() : null,local.RouterDispatcher)
: base(settings.AllowLocalRoutees ? settings.RouteesPaths.ToArray() : Enumerable.Empty<string>(),local.RouterDispatcher)
{
_settings = settings;
_local = local;
Expand Down
36 changes: 1 addition & 35 deletions src/core/Akka.Remote/Routing/RemoteRouterConfig.cs
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.Remoting.Contexts;
using System.Text;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Configuration;
using Akka.Routing;
using Akka.Util;
using Akka.Util.Internal;
Expand Down Expand Up @@ -37,7 +33,7 @@ public override ISurrogate ToSurrogate(ActorSystem system)
/// </summary>
private readonly AtomicCounter _childNameCounter = new AtomicCounter();

public RemoteRouterConfig(Pool local, IEnumerable<Address> nodes)
public RemoteRouterConfig(Pool local, IEnumerable<Address> nodes) : base(local.NrOfInstances,local.Resizer,local.SupervisorStrategy,local.RouterDispatcher,local.UsePoolDispatcher)
{

Local = local;
Expand All @@ -46,36 +42,6 @@ public RemoteRouterConfig(Pool local, IEnumerable<Address> nodes)
_nodeAddrEnumerator = Nodes.GetContinuousEnumerator();
}

#region Property overrides

public override SupervisorStrategy SupervisorStrategy
{
get { return Local.SupervisorStrategy; }
set
{
Local.SupervisorStrategy = value;
}
}

public override Resizer Resizer
{
get { return Local.Resizer; }
set { Local.Resizer = value; }
}

public override int NrOfInstances
{
get { return Local.NrOfInstances; }
set { Local.NrOfInstances = value; }
}

public override string RouterDispatcher
{
get { return Local.RouterDispatcher; }
}

#endregion

#region Trivial method overrides

internal override RouterActor CreateRouterActor()
Expand Down
15 changes: 2 additions & 13 deletions src/core/Akka/Routing/Broadcast.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
using System;
using System.Collections.Generic;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Configuration;
using Akka.Util;
Expand Down Expand Up @@ -50,10 +47,6 @@ public override ISurrogate ToSurrogate(ActorSystem system)
};
}

protected BroadcastPool()
{
}

/// <summary>
/// Initializes a new instance of the <see cref="BroadcastPool"/> class.
/// </summary>
Expand All @@ -80,7 +73,7 @@ public BroadcastPool(int nrOfInstances, Resizer resizer,SupervisorStrategy super
/// Simple form of BroadcastPool constructor
/// </summary>
/// <param name="nrOfInstances">The nr of instances.</param>
public BroadcastPool(int nrOfInstances) : base(nrOfInstances, null, Pool.DefaultStrategy, null) { }
public BroadcastPool(int nrOfInstances) : base(nrOfInstances, null, DefaultStrategy, null) { }

/// <summary>
/// Creates the router.
Expand Down Expand Up @@ -113,10 +106,6 @@ public override ISurrogate ToSurrogate(ActorSystem system)
};
}

protected BroadcastGroup()
{

}
/// <summary>
/// Initializes a new instance of the <see cref="BroadcastGroup" /> class.
/// </summary>
Expand Down
8 changes: 0 additions & 8 deletions src/core/Akka/Routing/ConsistentHash.cs
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,6 @@ public override ISurrogate ToSurrogate(ActorSystem system)
};
}

protected ConsistentHashingGroup()
{
}

public ConsistentHashingGroup(Config config)
: base(config.GetStringList("routees.paths"))
{
Expand Down Expand Up @@ -167,10 +163,6 @@ public override ISurrogate ToSurrogate(ActorSystem system)
};
}

protected ConsistentHashingPool()
{
}

/// <summary>
/// Initializes a new instance of the <see cref="ConsistentHashingPool"/> class.
/// </summary>
Expand Down
2 changes: 1 addition & 1 deletion src/core/Akka/Routing/RoundRobin.cs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public class RoundRobinPoolSurrogate : ISurrogate
{
public ISurrogated FromSurrogate(ActorSystem system)
{
return new RandomPool(NrOfInstances, Resizer, SupervisorStrategy, RouterDispatcher, UsePoolDispatcher);
return new RoundRobinPool(NrOfInstances, Resizer, SupervisorStrategy, RouterDispatcher, UsePoolDispatcher);
}

public int NrOfInstances { get; set; }
Expand Down
19 changes: 6 additions & 13 deletions src/core/Akka/Routing/ScatterGatherFirstCompleted.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Configuration;
Expand Down Expand Up @@ -36,7 +34,7 @@ public ScatterGatherFirstCompletedRoutees(Routee[] routees, TimeSpan within)
_within = within;
}

public override void Send(object message, Actor.ActorRef sender)
public override void Send(object message, ActorRef sender)
{
var tasks = new List<Task>();
foreach(var routee in _routees)
Expand Down Expand Up @@ -71,10 +69,6 @@ public override ISurrogate ToSurrogate(ActorSystem system)
};
}

protected ScatterGatherFirstCompletedGroup()
{

}
/// <summary>
/// Initializes a new instance of the <see cref="ScatterGatherFirstCompletedGroup" /> class.
/// </summary>
Expand All @@ -88,6 +82,7 @@ public ScatterGatherFirstCompletedGroup(Config config)
/// <summary>
/// Initializes a new instance of the <see cref="ScatterGatherFirstCompletedGroup" /> class.
/// </summary>
/// <param name="within">Expect a response within the given timespan</param>
/// <param name="paths">The paths.</param>
public ScatterGatherFirstCompletedGroup(TimeSpan within,params string[] paths)
: base(paths)
Expand All @@ -99,6 +94,7 @@ public ScatterGatherFirstCompletedGroup(TimeSpan within,params string[] paths)
/// Initializes a new instance of the <see cref="ScatterGatherFirstCompletedGroup" /> class.
/// </summary>
/// <param name="paths">The paths.</param>
/// <param name="within">Expect a response within the given timespan</param>
public ScatterGatherFirstCompletedGroup(IEnumerable<string> paths,TimeSpan within) : base(paths)
{
Within = within;
Expand All @@ -108,6 +104,7 @@ public ScatterGatherFirstCompletedGroup(IEnumerable<string> paths,TimeSpan withi
/// Initializes a new instance of the <see cref="ScatterGatherFirstCompletedGroup" /> class.
/// </summary>
/// <param name="routees">The routees.</param>
/// <param name="within">Expect a response within the given timespan</param>
public ScatterGatherFirstCompletedGroup(IEnumerable<ActorRef> routees,TimeSpan within) : base(routees)
{
Within = within;
Expand Down Expand Up @@ -165,6 +162,7 @@ public override ISurrogate ToSurrogate(ActorSystem system)
/// <param name="resizer">The resizer.</param>
/// <param name="supervisorStrategy">The supervisor strategy.</param>
/// <param name="routerDispatcher">The router dispatcher.</param>
/// <param name="within">Expect a response within the given timespan</param>
/// <param name="usePoolDispatcher">if set to <c>true</c> [use pool dispatcher].</param>
public ScatterGatherFirstCompletedPool(int nrOfInstances, Resizer resizer, SupervisorStrategy supervisorStrategy,
string routerDispatcher,TimeSpan within, bool usePoolDispatcher = false)
Expand All @@ -178,16 +176,11 @@ public ScatterGatherFirstCompletedPool(Config config) : base(config)
_within = config.GetTimeSpan("within");
}

protected ScatterGatherFirstCompletedPool()
{

}

/// <summary>
/// Simple form of RoundRobin constructor
/// </summary>
/// <param name="nrOfInstances">The nr of instances.</param>
public ScatterGatherFirstCompletedPool(int nrOfInstances) : base(nrOfInstances, null, Pool.DefaultStrategy, null) { }
public ScatterGatherFirstCompletedPool(int nrOfInstances) : base(nrOfInstances, null, DefaultStrategy, null) { }

/// <summary>
/// Creates the router.
Expand Down
4 changes: 0 additions & 4 deletions src/core/Akka/Routing/SmallestMailbox.cs
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,6 @@ public SmallestMailboxPool(Config config) : base(config)

}

protected SmallestMailboxPool()
{
}

public SmallestMailboxPool(int nrOfInstances) : base(nrOfInstances, null, Pool.DefaultStrategy, null) { }

public override Router CreateRouter(ActorSystem system)
Expand Down
46 changes: 21 additions & 25 deletions src/core/Akka/Routing/TailChoppingRoutingLogic.cs
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Akka.Util;
using Akka.Actor;
using System.Threading;
using Akka.Configuration;
using Akka.Util;
using Akka.Util.Internal;

namespace Akka.Routing
Expand Down Expand Up @@ -38,6 +36,7 @@ public sealed class TailChoppingRoutingLogic : RoutingLogic
/// </summary>
/// <param name="within">The time within which at least one response is expected.</param>
/// <param name="interval">The duration after which the next routee will be picked.</param>
/// <param name="scheduler">The scheduler to use</param>
public TailChoppingRoutingLogic(TimeSpan within, TimeSpan interval, Scheduler scheduler)
{
_within = within;
Expand Down Expand Up @@ -130,7 +129,7 @@ public override void Send(object message, ActorRef sender)
}, token);

var request = completion.Task;
completion.Task.ContinueWith((task) =>
completion.Task.ContinueWith(task =>
{
tokenSource.Cancel(false);
});
Expand Down Expand Up @@ -208,12 +207,15 @@ public TailChoppingPool(int nrOfInstances, Resizer resizer, SupervisorStrategy s
/// </summary>
/// <param name="config">The configuration to use with this instance.</param>
public TailChoppingPool(Config config)
: this(config.GetInt("nr-of-instances"),
DefaultResizer.FromConfig(config),
null,
null, //TODO: what are our defaults? null?
config.GetTimeSpan("within"),
config.GetTimeSpan("tail-chopping-router.interval"),
config.HasPath("pool-dispatcher")
)
{
NrOfInstances = config.GetInt("nr-of-instances");
_within = config.GetTimeSpan("within");
_interval = config.GetTimeSpan("tail-chopping-router.interval");
Resizer = DefaultResizer.FromConfig(config);
UsePoolDispatcher = config.HasPath("pool-dispatcher");
}

/// <summary>
Expand All @@ -222,11 +224,9 @@ public TailChoppingPool(Config config)
/// <param name="nrOfInstances">The initial number of routees in the pool.</param>
/// <param name="within">The amount of time to wait for a response.</param>
/// <param name="interval">The interval to wait before sending to the next routee.</param>
public TailChoppingPool(int nrOfInstances, TimeSpan within, TimeSpan interval)
public TailChoppingPool(int nrOfInstances, TimeSpan within, TimeSpan interval) : this(nrOfInstances,null,null,null,within,interval)
{
NrOfInstances = nrOfInstances;
_within = within;
_interval = interval;
//TODO: what are our defaults? null?
}

/// <summary>
Expand All @@ -236,8 +236,7 @@ public TailChoppingPool(int nrOfInstances, TimeSpan within, TimeSpan interval)
/// <returns>The tail chopping pool.</returns>
public TailChoppingPool WithSupervisorStrategy(SupervisorStrategy strategy)
{
SupervisorStrategy = strategy;
return this;
return new TailChoppingPool(NrOfInstances, Resizer, strategy, RouterDispatcher, _within, _interval, UsePoolDispatcher);
}

/// <summary>
Expand All @@ -247,19 +246,17 @@ public TailChoppingPool WithSupervisorStrategy(SupervisorStrategy strategy)
/// <returns>The tail chopping pool.</returns>
public TailChoppingPool WithResizer(Resizer resizer)
{
Resizer = resizer;
return this;
return new TailChoppingPool(NrOfInstances, resizer, SupervisorStrategy, RouterDispatcher, _within, _interval, UsePoolDispatcher);
}

/// <summary>
/// Sets the router dispatcher to use for the pool.
/// </summary>
/// <param name="dispatcherId">The router dispatcher to use.</param>
/// <param name="routerDispatcher">The router dispatcher to use.</param>
/// <returns>The tail chopping pool.</returns>
public TailChoppingPool WithDispatcher(string dispatcherId)
public TailChoppingPool WithDispatcher(string routerDispatcher)
{
RouterDispatcher = dispatcherId;
return this;
return new TailChoppingPool(NrOfInstances, Resizer, SupervisorStrategy, routerDispatcher, _within, _interval, UsePoolDispatcher);
}

/// <summary>
Expand Down Expand Up @@ -316,8 +313,8 @@ public override ISurrogate ToSurrogate(ActorSystem system)
/// </summary>
/// <param name="config">The configuration to use with this instance.</param>
public TailChoppingGroup(Config config)
: base(config.GetStringList("routees.paths").ToArray())
{
Paths = config.GetStringList("routees.paths").ToArray();
_within = config.GetTimeSpan("within");
_interval = config.GetTimeSpan("tail-chopping-router.interval");
}
Expand All @@ -328,9 +325,8 @@ public TailChoppingGroup(Config config)
/// <param name="routeePaths">The configured routee paths to use with this instance.</param>
/// <param name="within">The amount of time to wait for a response.</param>
/// <param name="interval">The interval to wait before sending to the next routee.</param>
public TailChoppingGroup(string[] routeePaths, TimeSpan within, TimeSpan interval)
public TailChoppingGroup(string[] routeePaths, TimeSpan within, TimeSpan interval) : base(routeePaths)
{
Paths = routeePaths;
_within = within;
_interval = interval;
}
Expand Down

4 comments on commit 4dfdc40

@Petabridge-CI
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TeamCity Akka.NET :: Akka.NET PR Build Build 79 is now running

@Petabridge-CI
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TeamCity Akka.NET :: Akka.NET PR Build Build 79 outcome was FAILURE
Summary: System.Exception: xUnit failed for the following assemblies: D:\BuildAgent\work\49b164d63843fb4\src\core\Akka.Persistence.Tests\bin\Release\Akka.Persistence.Tests.dll at Microsoft.FSharp.Core.Operators.FailWith[T](String message) at Fake.XUnitHel... Build time: 00:11:13

Failed tests

Akka.Persistence.Tests.dll: Akka.Persistence.Tests.PersistentActorSpec.PersistentActor_should_support_Context_Become_during_recovery: <no details avaliable>

Akka.Persistence.Tests.dll: Akka.Persistence.Tests.PersistentActorSpec.PersistentActor_should_be_able_to_opt_out_from_stashing_messages_until_all_events_has_been_processed: <no details avaliable>

Akka.Persistence.Tests.dll: Akka.Persistence.Tests.GuaranteedDeliveryCrashSpec.GuaranteedDelivery_should_not_send_when_actor_crashes: <no details avaliable>

@Petabridge-CI
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TeamCity Akka.NET :: Akka.NET PR Build Build 81 is now running

@Petabridge-CI
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TeamCity Akka.NET :: Akka.NET PR Build Build 81 outcome was FAILURE
Summary: System.Exception: xUnit failed for the following assemblies: D:\BuildAgent\work\49b164d63843fb4\src\core\Akka.Persistence.Tests\bin\Release\Akka.Persistence.Tests.dll, D:\BuildAgent\work\49b164d63843fb4\src\core\Akka.Tests\bin\Release\Akka.Tests.dll ... Build time: 00:10:39

Failed tests

Akka.Persistence.Tests.dll: Akka.Persistence.Tests.GuaranteedDeliverySpec.GuaranteedDelivery_must_redeliver_lost_messages: <no details avaliable>

Akka.Persistence.Tests.dll: Akka.Persistence.Tests.PersistentViewSpec.PersistentView_should_run_updates_on_user_request_and_wait_for_update: <no details avaliable>

Akka.Tests.dll: Akka.Tests.Routing.RoutingSpec.Router_in_general_must_evict_terminated_routees: <no details avaliable>

Akka.Persistence.Tests.dll: Akka.Persistence.Tests.GuaranteedDeliveryCrashSpec.GuaranteedDelivery_should_not_send_when_actor_crashes: <no details avaliable>

Please sign in to comment.