Skip to content

Commit

Permalink
Fix/inbox actor reference (#4073)
Browse files Browse the repository at this point in the history
* Inbox.Actor refactoring

* Inbox Actor changes and StringLike changes

* Formatted

* Formatting
  • Loading branch information
Jonathan Nagy authored and Aaronontheweb committed Dec 20, 2019
1 parent fed9051 commit 8f97c3c
Showing 1 changed file with 69 additions and 67 deletions.
136 changes: 69 additions & 67 deletions src/core/Akka/Actor/Inbox.Actor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public InboxActor(int size)
/// TBD
/// </summary>
/// <param name="query">TBD</param>
public void EnqueueQuery(IQuery query)
private void EnqueueQuery(IQuery query)
{
var q = query.WithClient(Sender);
_clients.Enqueue(q);
Expand All @@ -56,7 +56,7 @@ public void EnqueueQuery(IQuery query)
/// TBD
/// </summary>
/// <param name="message">TBD</param>
public void EnqueueMessage(object message)
private void EnqueueMessage(object message)
{
if (_messages.Count < _size)
{
Expand All @@ -77,9 +77,9 @@ public void EnqueueMessage(object message)
/// </summary>
/// <param name="query">TBD</param>
/// <returns>TBD</returns>
public bool ClientPredicate(IQuery query)
private bool ClientPredicate(IQuery query)
{
if(query is Select select)
if (query is Select select)
return select.Predicate(_currentMessage);

return query is Get;
Expand All @@ -90,7 +90,7 @@ public bool ClientPredicate(IQuery query)
/// </summary>
/// <param name="message">TBD</param>
/// <returns>TBD</returns>
public bool MessagePredicate(object message)
private bool MessagePredicate(object message)
{
if (_currentSelect.HasValue)
return _currentSelect.Value.Predicate(message);
Expand All @@ -105,82 +105,84 @@ public bool MessagePredicate(object message)
/// <returns>TBD</returns>
protected override bool Receive(object message)
{
switch (message)
if (message is Get get)
{
case Get get:
if (_messages.Count == 0)
{
EnqueueQuery(get);
}
else
{
Sender.Tell(_messages.Dequeue());
}
break;
case Select select:
if (_messages.Count == 0)
if (_messages.Count == 0)
{
EnqueueQuery(get);
}
else
{
Sender.Tell(_messages.Dequeue());
}
}
else if (message is Select select)
{
if (_messages.Count == 0)
{
EnqueueQuery(select);
}
else
{
_currentSelect = select;
var firstMatch = _messages.DequeueFirstOrDefault(MessagePredicate);
if (firstMatch == null)
{
EnqueueQuery(select);
}
else
{
_currentSelect = select;
var firstMatch = _messages.DequeueFirstOrDefault(MessagePredicate);
if (firstMatch == null)
{
EnqueueQuery(select);
}
else
{
Sender.Tell(firstMatch);
}
_currentSelect = null;
Sender.Tell(firstMatch);
}
_currentSelect = null;
}
}
else if (message is StartWatch startwatch)
{
if (startwatch.Message == null)
Context.Watch(startwatch.Target);
else
Context.WatchWith(startwatch.Target, startwatch.Message);
}
else if (message is StopWatch stopwatch)
{
Context.Unwatch(stopwatch.Target);
}
else if (message is Kick)
{
var now = Context.System.Scheduler.MonotonicClock;
var overdue = _clientsByTimeout.TakeWhile(q => q.Deadline < now);

break;
case StartWatch startWatch:
if (startWatch.Message == null)
Context.Watch(startWatch.Target);
else
Context.WatchWith(startWatch.Target, startWatch.Message);
break;
case StopWatch stopWatch:
Context.Unwatch(stopWatch.Target);
break;
case Kick _:
var now = Context.System.Scheduler.MonotonicClock;
var overdue = _clientsByTimeout.TakeWhile(q => q.Deadline < now);

foreach (var query in overdue)
{
query.Client.Tell(new Status.Failure(new TimeoutException("Deadline passed")));
}
_clients.RemoveAll(q => q.Deadline < now);
foreach (var query in overdue)
{
query.Client.Tell(new Status.Failure(new TimeoutException("Deadline passed")));
}
_clients.RemoveAll(q => q.Deadline < now);

var afterDeadline = _clientsByTimeout.Where(q => q.Deadline >= now);
_clientsByTimeout.IntersectWith(afterDeadline);
break;
default:
if (_clients.Count == 0)
var afterDeadline = _clientsByTimeout.Where(q => q.Deadline >= now);
_clientsByTimeout.IntersectWith(afterDeadline);
}
else
{
if (_clients.Count == 0)
{
EnqueueMessage(message);
}
else
{
_currentMessage = message;
var firstMatch = _matched[0] = _clients.DequeueFirstOrDefault(ClientPredicate); //TODO: this should work as DequeueFirstOrDefault
if (firstMatch != null)
{
EnqueueMessage(message);
_clientsByTimeout.ExceptWith(_matched);
firstMatch.Client.Tell(message);
}
else
{
_currentMessage = message;
var firstMatch = _matched[0] = _clients.DequeueFirstOrDefault(ClientPredicate); //TODO: this should work as DequeueFirstOrDefault
if (firstMatch != null)
{
_clientsByTimeout.ExceptWith(_matched);
firstMatch.Client.Tell(message);
}
else
{
EnqueueMessage(message);
}
_currentMessage = null;
EnqueueMessage(message);
}
break;
_currentMessage = null;
}
}

if (_clients.Count == 0)
Expand Down

0 comments on commit 8f97c3c

Please sign in to comment.