Skip to content

Commit

Permalink
Merge pull request #860 from Aaronontheweb/pinned-dispatcher-fix
Browse files Browse the repository at this point in the history
PinnedDispatcher fixes + dispatcher teardown capabilities across all dispatchers
  • Loading branch information
Aaronontheweb committed Apr 16, 2015
2 parents 5506e45 + 6007c68 commit dc23ba9
Show file tree
Hide file tree
Showing 9 changed files with 174 additions and 101 deletions.
9 changes: 9 additions & 0 deletions src/core/Akka.Tests/Dispatch/DispatchersSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,15 @@ public void Dispatchers_must_return_separate_instances_of_dispatchers_with_diffe
d1.ShouldNotBeSame(d3);
}


[Fact]
public void PinnedDispatchers_must_return_new_instance_each_time()
{
var d1 = Lookup("myapp.my-pinned-dispatcher");
var d2 = Lookup("myapp.my-pinned-dispatcher");
d1.ShouldNotBeSame(d2);
}

#endregion

#region Support methods and classes
Expand Down
1 change: 1 addition & 0 deletions src/core/Akka/Actor/ActorCell.FaultHandling.cs
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,7 @@ private void FinishTerminate()
SwapMailbox(deadLetterMailbox);
mailbox.BecomeClosed();
mailbox.CleanUp();
Dispatcher.Detach(this);
}
}
finally
Expand Down
1 change: 1 addition & 0 deletions src/core/Akka/Actor/ActorCell.cs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ internal static ActorCell Current
public void Init(bool sendSupervise, Func<Mailbox> createMailbox /*, MailboxType mailboxType*/) //TODO: switch from Func<Mailbox> createMailbox to MailboxType mailboxType
{
var mailbox = createMailbox(); //Akka: dispatcher.createMailbox(this, mailboxType)
Dispatcher.Attach(this);
mailbox.Setup(Dispatcher);
mailbox.SetActor(this);
_mailbox = mailbox;
Expand Down
1 change: 1 addition & 0 deletions src/core/Akka/Akka.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@
<Compile Include="Actor\UntypedReceive.cs" />
<Compile Include="Dispatch\Mailboxes.cs" />
<Compile Include="Dispatch\MessageQueues\IMessageQueue.cs" />
<Compile Include="Dispatch\SingleThreadDispatcher.cs" />
<Compile Include="Dispatch\SysMsg\ISystemMessage.cs" />
<Compile Include="Dispatch\TaskDispatcher.cs" />
<Compile Include="Dispatch\ThreadPoolBuilder.cs" />
Expand Down
69 changes: 29 additions & 40 deletions src/core/Akka/Dispatch/AbstractDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -147,27 +147,6 @@ public override MessageDispatcher Dispatcher()
}
}

/// <summary>
/// Used to create instances of the <see cref="SingleThreadDispatcher"/>.
/// <remarks>
/// Always returns the same instance.
/// </remarks>
/// </summary>
class PinnedDispatcherConfigurator : MessageDispatcherConfigurator
{
public PinnedDispatcherConfigurator(Config config, IDispatcherPrerequisites prerequisites) : base(config, prerequisites)
{
_dispatcher = new SingleThreadDispatcher(this);
}

private readonly SingleThreadDispatcher _dispatcher;

public override MessageDispatcher Dispatcher()
{
return _dispatcher;
}
}

/// <summary>
/// Used to create instances of the <see cref="CurrentSynchronizationContextDispatcher"/>.
///
Expand All @@ -188,25 +167,8 @@ public override MessageDispatcher Dispatcher()
}

/// <summary>
/// Lookup list for different types of out-of-the-box <see cref="Dispatcher"/>s.
/// </summary>
public enum DispatcherType
{
Dispatcher,
TaskDispatcher,
PinnedDispatcher,
SynchronizedDispatcher,
}
public static class DispatcherTypeMembers
{
public static string GetName(this DispatcherType self)
{
//TODO: switch case return string?
return self.ToString();
}
}
/// <summary>
/// Class MessageDispatcher.
/// Class responsible for pushing messages from an actor's mailbox into its
/// receive methods. Comes in many different flavors.
/// </summary>
public abstract class MessageDispatcher
{
Expand Down Expand Up @@ -267,6 +229,33 @@ public virtual void SystemDispatch(ActorCell cell, Envelope envelope)
{
cell.SystemInvoke(envelope);
}

/// <summary>
/// Attaches the dispatcher to the <see cref="ActorCell"/>
///
/// <remarks>
/// Practically, doesn't do very much right now - dispatchers aren't responsible for creating
/// mailboxes in Akka.NET
/// </remarks>
/// </summary>
/// <param name="cell">The ActorCell belonging to the actor who's attaching to this dispatcher.</param>
public virtual void Attach(ActorCell cell)
{

}

/// <summary>
/// Detaches the dispatcher to the <see cref="ActorCell"/>
///
/// <remarks>
/// Only really used in dispatchers with 1:1 relationship with dispatcher.
/// </remarks>
/// </summary>
/// <param name="cell">The ActorCell belonging to the actor who's deatching from this dispatcher.</param>
public virtual void Detach(ActorCell cell)
{

}
}
}

42 changes: 0 additions & 42 deletions src/core/Akka/Dispatch/Dispatchers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -99,48 +99,6 @@ public override void Schedule(Action run)
}
}

/// <summary>
/// Class SingleThreadDispatcher.
/// </summary>
public class SingleThreadDispatcher : MessageDispatcher
{
/// <summary>
/// The queue
/// </summary>
private readonly BlockingCollection<Action> queue = new BlockingCollection<Action>();

/// <summary>
/// The running
/// </summary>
private volatile bool running = true;

/// <summary>
/// Initializes a new instance of the <see cref="SingleThreadDispatcher" /> class.
/// </summary>
public SingleThreadDispatcher(MessageDispatcherConfigurator configurator)
: base(configurator)
{
var thread = new Thread(_ =>
{
foreach (var next in queue.GetConsumingEnumerable())
{
next();
if (!running) return;
}
});
thread.Start(); //thread won't start automatically without this
}

/// <summary>
/// Schedules the specified run.
/// </summary>
/// <param name="run">The run.</param>
public override void Schedule(Action run)
{
queue.Add(run);
}
}

/// <summary>
/// The registry of all <see cref="MessageDispatcher"/> instances available to this <see cref="ActorSystem"/>.
/// </summary>
Expand Down
22 changes: 4 additions & 18 deletions src/core/Akka/Dispatch/ForkJoinDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ public class ForkJoinDispatcherConfigurator : MessageDispatcherConfigurator
public ForkJoinDispatcherConfigurator(Config config, IDispatcherPrerequisites prerequisites) : base(config, prerequisites)
{
var dtp = config.GetConfig("dedicated-thread-pool");
if(dtp.IsEmpty) throw new ConfigurationException(string.Format("must define section dedicated-thread-pool for ForkJoinDispatcher {0}", config.GetString("id", "unknown")));
if (dtp == null || dtp.IsEmpty) throw new ConfigurationException(string.Format("must define section dedicated-thread-pool for ForkJoinDispatcher {0}", config.GetString("id", "unknown")));

var settings = new DedicatedThreadPoolSettings(dtp.GetInt("thread-count"),
ConfigureThreadType(dtp.GetString("threadtype", ThreadType.Background.ToString())),
GetSafeDeadlockTimeout(dtp));
var settings = new DedicatedThreadPoolSettings(dtp.GetInt("thread-count"),
DedicatedThreadPoolConfigHelpers.ConfigureThreadType(dtp.GetString("threadtype", ThreadType.Background.ToString())),
DedicatedThreadPoolConfigHelpers.GetSafeDeadlockTimeout(dtp));
_instance = new ForkJoinDispatcher(this, settings);
}

Expand All @@ -37,20 +37,6 @@ public override MessageDispatcher Dispatcher()
{
return _instance;
}

private static TimeSpan? GetSafeDeadlockTimeout(Config cfg)
{
var timespan = cfg.GetTimeSpan("deadlock-timeout", TimeSpan.FromSeconds(-1));
if (timespan.TotalSeconds < 0)
return null;
return timespan;
}

private static ThreadType ConfigureThreadType(string threadType)
{
return string.Compare(threadType, ThreadType.Foreground.ToString(), StringComparison.InvariantCultureIgnoreCase) == 0 ?
ThreadType.Foreground : ThreadType.Background;
}
}

/// <summary>
Expand Down
105 changes: 105 additions & 0 deletions src/core/Akka/Dispatch/SingleThreadDispatcher.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
//-----------------------------------------------------------------------
// <copyright file="SingleThreadDispatcher.cs" company="Akka.NET Project">
// Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
// Copyright (C) 2013-2015 Akka.NET project <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

using System;
using Akka.Actor;
using Akka.Configuration;
using Helios.Concurrency;

namespace Akka.Dispatch
{
/// <summary>
/// Used to create instances of the <see cref="SingleThreadDispatcher"/>.
///
/// Each actor created using the pinned dispatcher gets its own unique thread.
/// <remarks>
/// Always returns a new instance.
/// </remarks>
/// </summary>
class PinnedDispatcherConfigurator : MessageDispatcherConfigurator
{
private readonly DedicatedThreadPoolSettings _settings;

public PinnedDispatcherConfigurator(Config config, IDispatcherPrerequisites prerequisites)
: base(config, prerequisites)
{
var dtp = config.GetConfig("dedicated-thread-pool");
if (dtp == null || dtp.IsEmpty)
{
_settings = DedicatedThreadPoolConfigHelpers.DefaultSingleThreadPoolSettings;
}
else
{
_settings = new DedicatedThreadPoolSettings(1,
DedicatedThreadPoolConfigHelpers.ConfigureThreadType(dtp.GetString("threadtype", ThreadType.Background.ToString())),
DedicatedThreadPoolConfigHelpers.GetSafeDeadlockTimeout(dtp));
}
}

public override MessageDispatcher Dispatcher()
{
return new SingleThreadDispatcher(this, _settings);
}
}


/// <summary>
/// Used to power the <see cref="PinnedDispatcherConfigurator"/>.
///
/// Guaranteed to provide one new thread instance per actor.
///
/// Uses <see cref="DedicatedThreadPool"/> with 1 thread in order
/// to take advantage of standard cleanup / teardown / queueing mechanics.
///
/// /// Relevant configuration options:
/// <code>
/// my-forkjoin-dispatcher{
/// type = PinnedDispatcher
/// throughput = 100
/// dedicated-thread-pool{ #settings for Helios.DedicatedThreadPool
/// #deadlock-timeout = 3s #optional timeout for deadlock detection
/// threadtype = background #values can be "background" or "foreground"
/// }
/// }
///
/// my-other-forkjoin-dispatcher{
/// type = PinnedDispatcher
/// # dedicated-thread-pool section is optional
/// }
/// </code>
/// <remarks>
/// Worth noting that unlike the <see cref="ForkJoinDispatcher"/>, the <see cref="SingleThreadDispatcher"/>
/// does not respect the <c>dedicated-thread-pool.thread-count</c> property in configuration. That value is
/// always equal to 1 in the <see cref="SingleThreadDispatcher"/>.
/// </remarks>
/// </summary>
public class SingleThreadDispatcher : MessageDispatcher
{
private readonly DedicatedThreadPool _dedicatedThreadPool;

internal SingleThreadDispatcher(MessageDispatcherConfigurator configurator, DedicatedThreadPoolSettings settings)
: base(configurator)
{
_dedicatedThreadPool = new DedicatedThreadPool(settings);
}

/// <summary>
/// Schedules the specified run.
/// </summary>
/// <param name="run">The run.</param>
public override void Schedule(Action run)
{
_dedicatedThreadPool.QueueUserWorkItem(run);
}

public override void Detach(ActorCell cell)
{
//shut down the dedicated thread pool
_dedicatedThreadPool.Dispose();
}
}
}
25 changes: 24 additions & 1 deletion src/core/Akka/Dispatch/ThreadPoolBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,34 @@

using System;
using Akka.Configuration;
using Helios.Concurrency;

namespace Akka.Dispatch
{
class ThreadPoolBuilder
/// <summary>
/// <see cref="Config"/> helper class for configuring <see cref="MessageDispatcherConfigurator"/>
/// instances who depend on the Helios <see cref="DedicatedThreadPool"/>.
/// </summary>
internal static class DedicatedThreadPoolConfigHelpers
{
internal static TimeSpan? GetSafeDeadlockTimeout(Config cfg)
{
var timespan = cfg.GetTimeSpan("deadlock-timeout", TimeSpan.FromSeconds(-1));
if (timespan.TotalSeconds < 0)
return null;
return timespan;
}

internal static ThreadType ConfigureThreadType(string threadType)
{
return string.Compare(threadType, ThreadType.Foreground.ToString(), StringComparison.InvariantCultureIgnoreCase) == 0 ?
ThreadType.Foreground : ThreadType.Background;
}

/// <summary>
/// Default settings for <see cref="SingleThreadDispatcher"/> instances.
/// </summary>
internal static readonly DedicatedThreadPoolSettings DefaultSingleThreadPoolSettings = new DedicatedThreadPoolSettings(1);
}

/// <summary>
Expand Down

0 comments on commit dc23ba9

Please sign in to comment.