Skip to content

Commit

Permalink
Merge pull request #1482 from JeffCyr/ActorTaskScheduler-refactoring
Browse files Browse the repository at this point in the history
Actor task scheduler refactoring
  • Loading branch information
rogeralsing committed Nov 29, 2015
2 parents 3113853 + 7ace1db commit 26daeac
Show file tree
Hide file tree
Showing 6 changed files with 269 additions and 108 deletions.
157 changes: 146 additions & 11 deletions src/core/Akka.Tests/Dispatch/AsyncAwaitSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
//-----------------------------------------------------------------------

using System;
using System.Threading;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.TestKit;
Expand All @@ -25,7 +26,7 @@ public ReceiveTimeoutAsyncActor()
Receive<string>(async s =>
{
_replyTo = Sender;

await Task.Delay(TimeSpan.FromMilliseconds(100));
SetReceiveTimeout(TimeSpan.FromMilliseconds(100));
});
Expand All @@ -35,7 +36,7 @@ class AsyncActor : ReceiveActor
{
public AsyncActor()
{
Receive<string>( async s =>
Receive<string>(async s =>
{
await Task.Yield();
await Task.Delay(TimeSpan.FromMilliseconds(100));
Expand Down Expand Up @@ -210,9 +211,9 @@ public AsyncTplActor()
Receive<string>(m =>
{
//this is also safe, all tasks complete in the actor context
RunTask(() =>
RunTask(async () =>
{
Task.Delay(TimeSpan.FromSeconds(1))
await Task.Delay(TimeSpan.FromSeconds(1))
.ContinueWith(t => { Sender.Tell("done"); });
});
});
Expand All @@ -228,11 +229,11 @@ public AsyncTplExceptionActor(IActorRef callback)
_callback = callback;
Receive<string>(m =>
{
RunTask(() =>
RunTask(async () =>
{
Task.Delay(TimeSpan.FromSeconds(1))
await Task.Delay(TimeSpan.FromSeconds(1))
.ContinueWith(t => { throw new Exception("foo"); });
});
});
});
}

Expand All @@ -243,6 +244,16 @@ protected override void PostRestart(Exception reason)
}
}

public class RestartMessage
{
public object Message { get; private set; }

public RestartMessage(object message)
{
Message = message;
}
}

public class ActorAsyncAwaitSpec : AkkaSpec
{
[Fact]
Expand Down Expand Up @@ -280,8 +291,8 @@ public async Task Actors_should_be_able_to_async_await_ask_message_loop()
[Fact]
public async Task Actors_should_be_able_to_block_ask_message_loop()
{
var actor = Sys.ActorOf(Props.Create<AsyncAwaitActor>().WithDispatcher("akka.actor.task-dispatcher"),"Worker");
var asker =Sys.ActorOf(Props.Create(() => new BlockingAsker(actor)).WithDispatcher("akka.actor.task-dispatcher"),"Asker");
var actor = Sys.ActorOf(Props.Create<AsyncAwaitActor>().WithDispatcher("akka.actor.task-dispatcher"), "Worker");
var asker = Sys.ActorOf(Props.Create(() => new BlockingAsker(actor)).WithDispatcher("akka.actor.task-dispatcher"), "Asker");
var task = asker.Ask("start", TimeSpan.FromSeconds(5));
actor.Tell(123, ActorRefs.NoSender);
var res = await task;
Expand All @@ -291,7 +302,7 @@ public async Task Actors_should_be_able_to_block_ask_message_loop()
[Fact(Skip = "Maybe not possible to solve")]
public async Task Actors_should_be_able_to_block_ask_self_message_loop()
{
var asker = Sys.ActorOf(Props.Create(() => new BlockingAskSelf()),"Asker");
var asker = Sys.ActorOf(Props.Create(() => new BlockingAskSelf()), "Asker");
var task = asker.Ask("start", TimeSpan.FromSeconds(5));
var res = await task;
Assert.Equal("done", res);
Expand Down Expand Up @@ -344,7 +355,6 @@ public async Task Actor_should_be_able_to_resume_suspend()
res.ShouldBe("done");
}


[Fact]
public void Actor_should_be_able_to_ReceiveTimeout_after_async_operation()
{
Expand All @@ -353,6 +363,131 @@ public void Actor_should_be_able_to_ReceiveTimeout_after_async_operation()
actor.Tell("hello");
ExpectMsg<string>(m => m == "GotIt");
}

public class AsyncExceptionCatcherActor : ReceiveActor
{
private string _lastMessage;

public AsyncExceptionCatcherActor()
{
Receive<string>(async m =>
{
_lastMessage = m;
try
{
// Throw an exception in the ActorTaskScheduler
await Task.Factory.StartNew(() =>
{
throw new Exception("should not restart");
});
}
catch (Exception)
{
}
});

Receive<int>(_ => Sender.Tell(_lastMessage, Self));
}
}

[Fact]
public async Task Actor_should_not_restart_if_exception_is_catched()
{
var actor = Sys.ActorOf<AsyncExceptionCatcherActor>();

actor.Tell("hello");

var lastMessage = await actor.Ask(123);

lastMessage.ShouldBe("hello");
}

public class AsyncFailingActor : ReceiveActor
{
public AsyncFailingActor()
{
Receive<string>(async m =>
{
ThrowException();
});
}

protected override void PreRestart(Exception reason, object message)
{
Sender.Tell(new RestartMessage(message), Self);

base.PreRestart(reason, message);
}

private static void ThrowException()
{
throw new Exception("foo");
}
}

[Fact]
public void Actor_PreRestart_should_give_the_failing_message()
{
var actor = Sys.ActorOf<AsyncFailingActor>();

actor.Tell("hello");

ExpectMsg<RestartMessage>(m => "hello".Equals(m.Message));
}

public class AsyncPipeToDelayActor : ReceiveActor
{
public AsyncPipeToDelayActor()
{
Receive<string>(async msg =>
{
Task.Run(() =>
{
Thread.Sleep(10);
return msg;
}).PipeTo(Sender, Self); //LogicalContext is lost?!?

Thread.Sleep(3000);
});
}
}

public class AsyncReentrantActor : ReceiveActor
{
public AsyncReentrantActor()
{
Receive<string>(async msg =>
{
var sender = Sender;
Task.Run(() =>
{
//Sleep to make sure the task is not completed when ContinueWith is called
Thread.Sleep(100);
return msg;
}).ContinueWith(_ => sender.Tell(msg)); // ContinueWith will schedule with the implicit ActorTaskScheduler

Thread.Sleep(3000);
});
}
}

[Fact]
public void ActorTaskScheduler_reentrancy_should_not_be_possible()
{
var actor = Sys.ActorOf<AsyncReentrantActor>();
actor.Tell("hello");

ExpectNoMsg(1000);
}

[Fact]
public void Actor_PipeTo_should_not_be_delayed_by_async_receive()
{
var actor = Sys.ActorOf<AsyncPipeToDelayActor>();

actor.Tell("hello");
ExpectMsg<string>(m => "hello".Equals(m), TimeSpan.FromMilliseconds(1000));
}
}
}

15 changes: 10 additions & 5 deletions src/core/Akka/Actor/ActorCell.DefaultMessages.cs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ public void SystemInvoke(Envelope envelope)
{
var m = envelope.Message;

if (m is CompleteTask) HandleCompleteTask(m as CompleteTask);
if (m is ActorTaskSchedulerMessage) HandleActorTaskSchedulerMessage(m as ActorTaskSchedulerMessage);
else if (m is Failed) HandleFailed(m as Failed);
else if (m is DeathWatchNotification)
{
Expand Down Expand Up @@ -203,12 +203,17 @@ public void SystemInvoke(Envelope envelope)
}
}

private void HandleCompleteTask(CompleteTask task)
private void HandleActorTaskSchedulerMessage(ActorTaskSchedulerMessage m)
{
CurrentMessage = task.State.Message;
Sender = task.State.Sender;
task.SetResult();
if (m.Exception != null)
{
HandleInvokeFailure(m.Exception);
return;
}

m.ExecuteTask();
}

public void SwapMailbox(DeadLetterMailbox mailbox)
{
Mailbox.DebugPrint("{0} Swapping mailbox to DeadLetterMailbox", Self);
Expand Down
15 changes: 15 additions & 0 deletions src/core/Akka/Actor/ActorCell.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public partial class ActorCell : IUntypedActorContext, ICell
private bool _actorHasBeenCleared;
private Mailbox _mailbox;
private readonly ActorSystemImpl _systemImpl;
private ActorTaskScheduler m_taskScheduler;


public ActorCell(ActorSystemImpl system, IInternalActorRef self, Props props, MessageDispatcher dispatcher, IInternalActorRef parent)
Expand Down Expand Up @@ -66,6 +67,20 @@ internal static ActorCell Current
internal bool ActorHasBeenCleared { get { return _actorHasBeenCleared; } }
internal static Props TerminatedProps { get { return terminatedProps; } }

public ActorTaskScheduler TaskScheduler
{
get
{
var taskScheduler = Volatile.Read(ref m_taskScheduler);

if (taskScheduler != null)
return taskScheduler;

taskScheduler = new ActorTaskScheduler(this);
return Interlocked.CompareExchange(ref m_taskScheduler, taskScheduler, null) ?? taskScheduler;
}
}

public void Init(bool sendSupervise, Func<Mailbox> createMailbox /*, MailboxType mailboxType*/) //TODO: switch from Func<Mailbox> createMailbox to MailboxType mailboxType
{
var mailbox = createMailbox(); //Akka: dispatcher.createMailbox(this, mailboxType)
Expand Down
5 changes: 3 additions & 2 deletions src/core/Akka/Actor/PipeToSupport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
//-----------------------------------------------------------------------

using System;
using System.Threading;
using System.Threading.Tasks;

namespace Akka.Actor
Expand Down Expand Up @@ -33,7 +34,7 @@ public static Task PipeTo<T>(this Task<T> taskToPipe, ICanTell recipient, IActor
recipient.Tell(success != null
? success(tresult.Result)
: tresult.Result, sender);
}, TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.AttachedToParent);
}, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
}

/// <summary>
Expand All @@ -47,7 +48,7 @@ public static Task PipeTo(this Task taskToPipe, ICanTell recipient, IActorRef se
{
if (tresult.IsCanceled || tresult.IsFaulted)
recipient.Tell(new Status.Failure(tresult.Exception), sender);
}, TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.AttachedToParent);
}, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
}
}
}
Expand Down
Loading

0 comments on commit 26daeac

Please sign in to comment.