From e6b7702ba841bf864094cb3fbf5e6fa40db1dab2 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Sun, 29 Nov 2015 14:34:14 -0800 Subject: [PATCH] Revert "Actor task scheduler refactoring" --- .../Akka.Tests/Dispatch/AsyncAwaitSpec.cs | 157 ++---------------- .../Akka/Actor/ActorCell.DefaultMessages.cs | 15 +- src/core/Akka/Actor/ActorCell.cs | 15 -- src/core/Akka/Actor/PipeToSupport.cs | 5 +- src/core/Akka/Dispatch/ActorTaskScheduler.cs | 150 +++++++++-------- .../Akka/Dispatch/SysMsg/ISystemMessage.cs | 35 ++-- 6 files changed, 108 insertions(+), 269 deletions(-) diff --git a/src/core/Akka.Tests/Dispatch/AsyncAwaitSpec.cs b/src/core/Akka.Tests/Dispatch/AsyncAwaitSpec.cs index 7706e9e03cb..53d49d538ed 100644 --- a/src/core/Akka.Tests/Dispatch/AsyncAwaitSpec.cs +++ b/src/core/Akka.Tests/Dispatch/AsyncAwaitSpec.cs @@ -6,7 +6,6 @@ //----------------------------------------------------------------------- using System; -using System.Threading; using System.Threading.Tasks; using Akka.Actor; using Akka.TestKit; @@ -26,7 +25,7 @@ public ReceiveTimeoutAsyncActor() Receive(async s => { _replyTo = Sender; - + await Task.Delay(TimeSpan.FromMilliseconds(100)); SetReceiveTimeout(TimeSpan.FromMilliseconds(100)); }); @@ -36,7 +35,7 @@ class AsyncActor : ReceiveActor { public AsyncActor() { - Receive(async s => + Receive( async s => { await Task.Yield(); await Task.Delay(TimeSpan.FromMilliseconds(100)); @@ -211,9 +210,9 @@ public AsyncTplActor() Receive(m => { //this is also safe, all tasks complete in the actor context - RunTask(async () => + RunTask(() => { - await Task.Delay(TimeSpan.FromSeconds(1)) + Task.Delay(TimeSpan.FromSeconds(1)) .ContinueWith(t => { Sender.Tell("done"); }); }); }); @@ -229,11 +228,11 @@ public AsyncTplExceptionActor(IActorRef callback) _callback = callback; Receive(m => { - RunTask(async () => + RunTask(() => { - await Task.Delay(TimeSpan.FromSeconds(1)) + Task.Delay(TimeSpan.FromSeconds(1)) .ContinueWith(t => { throw new Exception("foo"); }); - }); + }); }); } @@ -244,16 +243,6 @@ protected override void PostRestart(Exception reason) } } - public class RestartMessage - { - public object Message { get; private set; } - - public RestartMessage(object message) - { - Message = message; - } - } - public class ActorAsyncAwaitSpec : AkkaSpec { [Fact] @@ -291,8 +280,8 @@ public async Task Actors_should_be_able_to_async_await_ask_message_loop() [Fact] public async Task Actors_should_be_able_to_block_ask_message_loop() { - var actor = Sys.ActorOf(Props.Create().WithDispatcher("akka.actor.task-dispatcher"), "Worker"); - var asker = Sys.ActorOf(Props.Create(() => new BlockingAsker(actor)).WithDispatcher("akka.actor.task-dispatcher"), "Asker"); + var actor = Sys.ActorOf(Props.Create().WithDispatcher("akka.actor.task-dispatcher"),"Worker"); + var asker =Sys.ActorOf(Props.Create(() => new BlockingAsker(actor)).WithDispatcher("akka.actor.task-dispatcher"),"Asker"); var task = asker.Ask("start", TimeSpan.FromSeconds(5)); actor.Tell(123, ActorRefs.NoSender); var res = await task; @@ -302,7 +291,7 @@ public async Task Actors_should_be_able_to_block_ask_message_loop() [Fact(Skip = "Maybe not possible to solve")] public async Task Actors_should_be_able_to_block_ask_self_message_loop() { - var asker = Sys.ActorOf(Props.Create(() => new BlockingAskSelf()), "Asker"); + var asker = Sys.ActorOf(Props.Create(() => new BlockingAskSelf()),"Asker"); var task = asker.Ask("start", TimeSpan.FromSeconds(5)); var res = await task; Assert.Equal("done", res); @@ -355,6 +344,7 @@ public async Task Actor_should_be_able_to_resume_suspend() res.ShouldBe("done"); } + [Fact] public void Actor_should_be_able_to_ReceiveTimeout_after_async_operation() { @@ -363,131 +353,6 @@ public void Actor_should_be_able_to_ReceiveTimeout_after_async_operation() actor.Tell("hello"); ExpectMsg(m => m == "GotIt"); } - - public class AsyncExceptionCatcherActor : ReceiveActor - { - private string _lastMessage; - - public AsyncExceptionCatcherActor() - { - Receive(async m => - { - _lastMessage = m; - try - { - // Throw an exception in the ActorTaskScheduler - await Task.Factory.StartNew(() => - { - throw new Exception("should not restart"); - }); - } - catch (Exception) - { - } - }); - - Receive(_ => Sender.Tell(_lastMessage, Self)); - } - } - - [Fact] - public async Task Actor_should_not_restart_if_exception_is_catched() - { - var actor = Sys.ActorOf(); - - actor.Tell("hello"); - - var lastMessage = await actor.Ask(123); - - lastMessage.ShouldBe("hello"); - } - - public class AsyncFailingActor : ReceiveActor - { - public AsyncFailingActor() - { - Receive(async m => - { - ThrowException(); - }); - } - - protected override void PreRestart(Exception reason, object message) - { - Sender.Tell(new RestartMessage(message), Self); - - base.PreRestart(reason, message); - } - - private static void ThrowException() - { - throw new Exception("foo"); - } - } - - [Fact] - public void Actor_PreRestart_should_give_the_failing_message() - { - var actor = Sys.ActorOf(); - - actor.Tell("hello"); - - ExpectMsg(m => "hello".Equals(m.Message)); - } - - public class AsyncPipeToDelayActor : ReceiveActor - { - public AsyncPipeToDelayActor() - { - Receive(async msg => - { - Task.Run(() => - { - Thread.Sleep(10); - return msg; - }).PipeTo(Sender, Self); //LogicalContext is lost?!? - - Thread.Sleep(3000); - }); - } - } - - public class AsyncReentrantActor : ReceiveActor - { - public AsyncReentrantActor() - { - Receive(async msg => - { - var sender = Sender; - Task.Run(() => - { - //Sleep to make sure the task is not completed when ContinueWith is called - Thread.Sleep(100); - return msg; - }).ContinueWith(_ => sender.Tell(msg)); // ContinueWith will schedule with the implicit ActorTaskScheduler - - Thread.Sleep(3000); - }); - } - } - - [Fact] - public void ActorTaskScheduler_reentrancy_should_not_be_possible() - { - var actor = Sys.ActorOf(); - actor.Tell("hello"); - - ExpectNoMsg(1000); - } - - [Fact] - public void Actor_PipeTo_should_not_be_delayed_by_async_receive() - { - var actor = Sys.ActorOf(); - - actor.Tell("hello"); - ExpectMsg(m => "hello".Equals(m), TimeSpan.FromMilliseconds(1000)); - } } } diff --git a/src/core/Akka/Actor/ActorCell.DefaultMessages.cs b/src/core/Akka/Actor/ActorCell.DefaultMessages.cs index 50c1a4f12dd..dc56e895449 100644 --- a/src/core/Akka/Actor/ActorCell.DefaultMessages.cs +++ b/src/core/Akka/Actor/ActorCell.DefaultMessages.cs @@ -165,7 +165,7 @@ public void SystemInvoke(Envelope envelope) { var m = envelope.Message; - if (m is ActorTaskSchedulerMessage) HandleActorTaskSchedulerMessage(m as ActorTaskSchedulerMessage); + if (m is CompleteTask) HandleCompleteTask(m as CompleteTask); else if (m is Failed) HandleFailed(m as Failed); else if (m is DeathWatchNotification) { @@ -203,17 +203,12 @@ public void SystemInvoke(Envelope envelope) } } - private void HandleActorTaskSchedulerMessage(ActorTaskSchedulerMessage m) + private void HandleCompleteTask(CompleteTask task) { - if (m.Exception != null) - { - HandleInvokeFailure(m.Exception); - return; - } - - m.ExecuteTask(); + CurrentMessage = task.State.Message; + Sender = task.State.Sender; + task.SetResult(); } - public void SwapMailbox(DeadLetterMailbox mailbox) { Mailbox.DebugPrint("{0} Swapping mailbox to DeadLetterMailbox", Self); diff --git a/src/core/Akka/Actor/ActorCell.cs b/src/core/Akka/Actor/ActorCell.cs index 17f4fff635c..40dfdfdc9f3 100644 --- a/src/core/Akka/Actor/ActorCell.cs +++ b/src/core/Akka/Actor/ActorCell.cs @@ -30,7 +30,6 @@ public partial class ActorCell : IUntypedActorContext, ICell private bool _actorHasBeenCleared; private Mailbox _mailbox; private readonly ActorSystemImpl _systemImpl; - private ActorTaskScheduler m_taskScheduler; public ActorCell(ActorSystemImpl system, IInternalActorRef self, Props props, MessageDispatcher dispatcher, IInternalActorRef parent) @@ -67,20 +66,6 @@ internal static ActorCell Current internal bool ActorHasBeenCleared { get { return _actorHasBeenCleared; } } internal static Props TerminatedProps { get { return terminatedProps; } } - public ActorTaskScheduler TaskScheduler - { - get - { - var taskScheduler = Volatile.Read(ref m_taskScheduler); - - if (taskScheduler != null) - return taskScheduler; - - taskScheduler = new ActorTaskScheduler(this); - return Interlocked.CompareExchange(ref m_taskScheduler, taskScheduler, null) ?? taskScheduler; - } - } - public void Init(bool sendSupervise, Func createMailbox /*, MailboxType mailboxType*/) //TODO: switch from Func createMailbox to MailboxType mailboxType { var mailbox = createMailbox(); //Akka: dispatcher.createMailbox(this, mailboxType) diff --git a/src/core/Akka/Actor/PipeToSupport.cs b/src/core/Akka/Actor/PipeToSupport.cs index 19ac16ed247..ea1eb5cd79f 100644 --- a/src/core/Akka/Actor/PipeToSupport.cs +++ b/src/core/Akka/Actor/PipeToSupport.cs @@ -6,7 +6,6 @@ //----------------------------------------------------------------------- using System; -using System.Threading; using System.Threading.Tasks; namespace Akka.Actor @@ -34,7 +33,7 @@ public static Task PipeTo(this Task taskToPipe, ICanTell recipient, IActor recipient.Tell(success != null ? success(tresult.Result) : tresult.Result, sender); - }, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default); + }, TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.AttachedToParent); } /// @@ -48,7 +47,7 @@ public static Task PipeTo(this Task taskToPipe, ICanTell recipient, IActorRef se { if (tresult.IsCanceled || tresult.IsFaulted) recipient.Tell(new Status.Failure(tresult.Exception), sender); - }, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default); + }, TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.AttachedToParent); } } } diff --git a/src/core/Akka/Dispatch/ActorTaskScheduler.cs b/src/core/Akka/Dispatch/ActorTaskScheduler.cs index 7bc078d38b6..550d664c496 100644 --- a/src/core/Akka/Dispatch/ActorTaskScheduler.cs +++ b/src/core/Akka/Dispatch/ActorTaskScheduler.cs @@ -7,6 +7,7 @@ using System; using System.Collections.Generic; +using System.Runtime.Remoting.Messaging; using System.Threading; using System.Threading.Tasks; using Akka.Actor; @@ -14,13 +15,29 @@ namespace Akka.Dispatch { + public class AmbientState + { + public IActorRef Self { get; set; } + public IActorRef Sender { get; set; } + public object Message { get; set; } + } + public class ActorTaskScheduler : TaskScheduler { - private readonly ActorCell _actorCell; + public static readonly TaskScheduler Instance = new ActorTaskScheduler(); + public static readonly TaskFactory TaskFactory = new TaskFactory(Instance); + public static readonly string StateKey = "akka.state"; + private const string Faulted = "faulted"; + private static readonly object Outer = new object(); - internal ActorTaskScheduler(ActorCell actorCell) + public static void SetCurrentState(IActorRef self, IActorRef sender, object message) { - _actorCell = actorCell; + CallContext.LogicalSetData(StateKey, new AmbientState + { + Sender = sender, + Self = self, + Message = message + }); } protected override IEnumerable GetScheduledTasks() @@ -30,44 +47,46 @@ protected override IEnumerable GetScheduledTasks() protected override void QueueTask(Task task) { - if ((task.CreationOptions & TaskCreationOptions.LongRunning) == TaskCreationOptions.LongRunning) + var s = CallContext.LogicalGetData(StateKey) as AmbientState; + if (task.AsyncState == Outer || s == null) { - Thread t = new Thread(state => - { - ScheduleTask((Task)state); - }); - t.IsBackground = true; - t.Name = "ActorTaskScheduler LongRunning"; - t.Start(task); - + TryExecuteTask(task); return; } - // Schedule the task execution, run inline if we are already in the actor context. - if (ActorCell.Current == _actorCell) + //we get here if the task needs to be marshalled back to the mailbox + //e.g. if previous task was an IO completion + s = CallContext.LogicalGetData(StateKey) as AmbientState; + + s.Self.Tell(new CompleteTask(s, () => { + SetCurrentState(s.Self,s.Sender,s.Message); TryExecuteTask(task); - } - else - { - ScheduleTask(task); - } - } + if (task.IsFaulted) + Rethrow(task, null); - private void ScheduleTask(Task task) - { - _actorCell.Self.Tell(new ActorTaskSchedulerMessage(this, task), ActorRefs.NoSender); - } - - internal void ExecuteTask(Task task) - { - TryExecuteTask(task); + }), ActorRefs.NoSender); } protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) { - // Prevent inline execution, it will execute inline anyway in QueueTask if we - // are already in the actor context. + if (taskWasPreviouslyQueued) + return false; + + var s = CallContext.LogicalGetData(StateKey) as AmbientState; + var cell = ActorCell.Current; + + //Is the current cell and the current state the same? + if (cell != null && + s != null && + Equals(cell.Self, s.Self) && + Equals(cell.Sender, s.Sender) && + cell.CurrentMessage == s.Message) + { + var res = TryExecuteTask(task); + return res; + } + return false; } @@ -80,62 +99,47 @@ public static void RunTask(Action action) }); } - public static void RunTask(Func asyncAction) + public static void RunTask(Func action) { var context = ActorCell.Current; - - if (context == null) - throw new InvalidOperationException("RunTask must be call from an actor context."); - var mailbox = context.Mailbox; //suspend the mailbox mailbox.Suspend(MailboxSuspendStatus.AwaitingTask); - ActorTaskScheduler actorScheduler = context.TaskScheduler; - - Task.Factory.StartNew(() => asyncAction(), CancellationToken.None, TaskCreationOptions.None, actorScheduler) - .Unwrap() - .ContinueWith(parent => - { - Exception exception = GetTaskException(parent); - - if (exception == null) - { - mailbox.Resume(MailboxSuspendStatus.AwaitingTask); - context.CheckReceiveTimeout(); - } - else - { - context.Self.Tell(new ActorTaskSchedulerMessage(exception), ActorRefs.NoSender); - } - - }, actorScheduler); - } + SetCurrentState(context.Self, context.Sender, null); - private static Exception GetTaskException(Task task) - { - switch (task.Status) - { - case TaskStatus.Canceled: - return new TaskCanceledException(); + //wrap our action inside a task, so that everything executing + //directly or indirectly from the action is executed on our task scheduler - case TaskStatus.Faulted: - return TryUnwrapAggregateException(task.Exception); - } + Task.Factory.StartNew(async _ => + { - return null; + //start executing our action and potential promise style + //tasks + await action() + //we need to use ContinueWith so that any exception is + //thrown inside the actor context. + //this is needed for IO completion tasks that execute out of context + .ContinueWith( + Rethrow, + Faulted, + TaskContinuationOptions.None); + + //if mailbox was suspended, make sure we re-enable message processing again + mailbox.Resume(MailboxSuspendStatus.AwaitingTask); + context.CheckReceiveTimeout(); + }, + Outer, + CancellationToken.None, + TaskCreationOptions.None, + Instance); } - private static Exception TryUnwrapAggregateException(AggregateException aggregateException) + private static void Rethrow(Task x, object s) { - if (aggregateException == null) - return null; - - if (aggregateException.InnerExceptions.Count == 1) - return aggregateException.InnerExceptions[0]; - - return aggregateException; + //this just rethrows the exception the task contains + x.Wait(); } } } diff --git a/src/core/Akka/Dispatch/SysMsg/ISystemMessage.cs b/src/core/Akka/Dispatch/SysMsg/ISystemMessage.cs index 0d6c66b80a5..b0b112b8395 100644 --- a/src/core/Akka/Dispatch/SysMsg/ISystemMessage.cs +++ b/src/core/Akka/Dispatch/SysMsg/ISystemMessage.cs @@ -243,39 +243,30 @@ public ActorTask(Task task) public Task Task { get; private set; } } - internal sealed class ActorTaskSchedulerMessage : ISystemMessage + public sealed class CompleteTask : ISystemMessage { - private readonly ActorTaskScheduler _scheduler; - private readonly Task _task; - /// - /// Initializes a new instance of the class. + /// Initializes a new instance of the class. /// - public ActorTaskSchedulerMessage(ActorTaskScheduler scheduler, Task task) + /// + /// The action. + public CompleteTask(AmbientState state, Action action) { - _scheduler = scheduler; - _task = task; + State = state; + SetResult = action; } + public AmbientState State { get; private set; } + /// - /// Initializes a new instance of the class. + /// Gets the set result. /// - /// The exception. - public ActorTaskSchedulerMessage(Exception exception) - { - Exception = exception; - } - - public Exception Exception { get; private set; } - - public void ExecuteTask() - { - _scheduler.ExecuteTask(_task); - } + /// The set result. + public Action SetResult { get; private set; } public override string ToString() { - return ""; + return "CompleteTask - AmbientState: " + State; } }