Skip to content

Commit

Permalink
Added runner to ActorContext to get number of queued messages
Browse files Browse the repository at this point in the history
  • Loading branch information
andresgutierrez committed Oct 18, 2023
1 parent cf5440e commit b14ab8b
Show file tree
Hide file tree
Showing 10 changed files with 115 additions and 17 deletions.
33 changes: 33 additions & 0 deletions Nixie.Tests/Actors/MessageCountActor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@

namespace Nixie.Tests.Actors;

public class MessageCountRequest
{

}

public class MessageCountResponse
{
public int? Counter { get; }

public MessageCountResponse(int? counter)
{
Counter = counter;
}
}

public sealed class MessageCountActor : IActor<MessageCountRequest, MessageCountResponse>
{
private readonly IActorContext<MessageCountActor, MessageCountRequest, MessageCountResponse> context;

public MessageCountActor(IActorContext<MessageCountActor, MessageCountRequest, MessageCountResponse> context)
{
this.context = context;
}

public Task<MessageCountResponse?> Receive(MessageCountRequest message)
{
return Task.FromResult<MessageCountResponse?>(new(context.Runner?.MessageCount));
}
}

22 changes: 22 additions & 0 deletions Nixie.Tests/TestMessageCount.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@

using Nixie.Tests.Actors;

namespace Nixie.Tests;

[Collection("Nixie")]
public sealed class TestMessageCount
{
[Fact]
public async Task TestMessageCountEmpty()
{
using ActorSystem asx = new();

IActorRef<MessageCountActor, MessageCountRequest, MessageCountResponse> actor = asx.Spawn<MessageCountActor, MessageCountRequest, MessageCountResponse>();

MessageCountResponse? message = await actor.Ask(new MessageCountRequest());
Assert.NotNull(message);

Assert.True(message.Counter.HasValue);
Assert.Equal(0, message.Counter.Value);
}
}
5 changes: 5 additions & 0 deletions Nixie/ActorContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ public sealed class ActorContext<TActor, TRequest> : IActorContext<TActor, TRequ
/// </summary>
public IGenericActorRef? Sender { get; set; }

/// <summary>
/// Returns the actor runner
/// </summary>
public ActorRunner<TActor, TRequest>? Runner { get; set; }

/// <summary>
/// Creates a new actor context.
/// </summary>
Expand Down
7 changes: 6 additions & 1 deletion Nixie/ActorContextReply.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ namespace Nixie;
/// Represents an actor context. This class is passed to the actor when it is created.
/// It can be used to create other actors or get the sender and the actor system.
/// </summary>
public sealed class ActorContext<TActor, TRequest, TResponse> : IActorContext<TActor, TRequest, TResponse>
public sealed class ActorContext<TActor, TRequest, TResponse> : IActorContext<TActor, TRequest, TResponse>
where TActor : IActor<TRequest, TResponse> where TRequest : class where TResponse : class?
{
private readonly ActorSystem actorSystem;
Expand Down Expand Up @@ -47,6 +47,11 @@ public sealed class ActorContext<TActor, TRequest, TResponse> : IActorContext<TA
/// </summary>
public bool ByPassReply { get; set; }

/// <summary>
/// Returns the actor runner
/// </summary>
public ActorRunner<TActor, TRequest, TResponse>? Runner { get; set; }

/// <summary>
/// Creates a new actor context.
/// </summary>
Expand Down
2 changes: 1 addition & 1 deletion Nixie/ActorRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public bool HasPendingMessages(out string? actorName)
{
ActorRunner<TActor, TRequest> runner = lazyValue.Value.runner;

if (!runner.IsShutdown && !lazyValue.Value.runner.Inbox.IsEmpty)
if (!runner.IsShutdown && !lazyValue.Value.runner.IsEmpty)
{
actorName = runner.Name;
return true;
Expand Down
2 changes: 1 addition & 1 deletion Nixie/ActorRepositoryReply.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public bool HasPendingMessages(out string? actorName)
{
ActorRunner<TActor, TRequest, TResponse> runner = lazyValue.Value.runner;

if (!runner.IsShutdown && !lazyValue.Value.runner.Inbox.IsEmpty)
if (!runner.IsShutdown && !lazyValue.Value.runner.IsEmpty)
{
actorName = runner.Name;
return true;
Expand Down
26 changes: 20 additions & 6 deletions Nixie/ActorRunner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,26 @@ public sealed class ActorRunner<TActor, TRequest> where TActor : IActor<TRequest

private readonly ILogger? logger;

private readonly ConcurrentQueue<ActorMessage<TRequest>> inbox = new();

private int processing = 1;

private int shutdown = 1;
private int shutdown = 1;

/// <summary>
/// Returns the name of the actor
/// </summary>
public string Name { get; }

/// <summary>
/// The actor's inbox
/// Returns true if the actor's inbox is empty
/// </summary>
public ConcurrentQueue<ActorMessage<TRequest>> Inbox { get; } = new();
public bool IsEmpty => inbox.IsEmpty;

/// <summary>
/// Returns the number of messages in the inbox
/// </summary>
public int MessageCount => inbox.Count;

/// <summary>
/// Reference to the actual actor
Expand Down Expand Up @@ -73,7 +80,7 @@ public void SendAndTryDeliver(TRequest message, IGenericActorRef? sender)
if (shutdown == 0)
return;

Inbox.Enqueue(new ActorMessage<TRequest>(message, sender));
inbox.Enqueue(new ActorMessage<TRequest>(message, sender));

if (1 == Interlocked.Exchange(ref processing, 0))
_ = DeliverMessages();
Expand All @@ -88,16 +95,23 @@ public bool Shutdown()
return 1 == Interlocked.Exchange(ref shutdown, 0);
}

/// <summary>
/// Enqueues a message to the actor and tries to deliver it.
/// The request/response type actors use an object to assign the response once completed.
/// </summary>
/// <returns></returns>
private async Task DeliverMessages()
{
try
{
if (Actor is null || shutdown == 0)
if (Actor is null || ActorContext is null || shutdown == 0)
return;

ActorContext.Runner = this;

do
{
while (Inbox.TryDequeue(out ActorMessage<TRequest> message))
while (inbox.TryDequeue(out ActorMessage<TRequest> message))
{
if (shutdown == 0)
break;
Expand Down
25 changes: 17 additions & 8 deletions Nixie/ActorRunnerReply.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@


using System.Collections.Concurrent;
using Microsoft.Extensions.Logging;

Expand All @@ -16,6 +16,8 @@ public sealed class ActorRunner<TActor, TRequest, TResponse> where TActor : IAct

private readonly ILogger? logger;

private readonly ConcurrentQueue<ActorMessageReply<TRequest, TResponse>> inbox = new();

private int processing = 1;

private int shutdown = 1;
Expand All @@ -26,9 +28,14 @@ public sealed class ActorRunner<TActor, TRequest, TResponse> where TActor : IAct
public string Name { get; }

/// <summary>
/// The inbox of the actor.
/// Returns true if the actor's inbox is empty
/// </summary>
public bool IsEmpty => inbox.IsEmpty;

/// <summary>
/// Returns the number of messages in the inbox
/// </summary>
public ConcurrentQueue<ActorMessageReply<TRequest, TResponse>> Inbox { get; } = new();
public int MessageCount => inbox.Count;

/// <summary>
/// The reference to the actor.
Expand Down Expand Up @@ -66,7 +73,7 @@ public ActorRunner(ActorSystem actorSystem, ILogger? logger, string name)

/// <summary>
/// Enqueues a message to the actor and tries to deliver it.
/// The request/response type actors use an object to assign the response once completed. The "Ask" operations spin until they receive the response.
/// The request/response type actors use an object to assign the response once completed.
/// </summary>
/// <param name="message"></param>
/// <param name="sender"></param>
Expand All @@ -81,15 +88,15 @@ public ActorRunner(ActorSystem actorSystem, ILogger? logger, string name)
if (parentReply is not null)
messageReply = parentReply;
else
messageReply = new(message, sender, promise);
messageReply = new(message, sender, promise);

if (shutdown == 0)
{
promise.SetCanceled();
return promise;
}

Inbox.Enqueue(messageReply);
inbox.Enqueue(messageReply);

if (1 == Interlocked.Exchange(ref processing, 0))
_ = DeliverMessages();
Expand Down Expand Up @@ -118,13 +125,15 @@ private async Task DeliverMessages()
if (Actor is null || ActorContext is null || shutdown == 0)
return;

ActorContext.Runner = this;

do
{
while (Inbox.TryDequeue(out ActorMessageReply<TRequest, TResponse>? message))
while (inbox.TryDequeue(out ActorMessageReply<TRequest, TResponse>? message))
{
if (shutdown == 0 || ActorContext is null)
break;

if (message.Sender is not null)
ActorContext.Sender = message.Sender;
else
Expand Down
5 changes: 5 additions & 0 deletions Nixie/IActorContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,9 @@ public interface IActorContext<TActor, TRequest> where TActor : IActor<TRequest>
/// Returns a reference to the sender of the message
/// </summary>
public IGenericActorRef? Sender { get; set; }

/// <summary>
/// Returns the actor runner
/// </summary>
public ActorRunner<TActor, TRequest>? Runner { get; set; }
}
5 changes: 5 additions & 0 deletions Nixie/IActorContextReply.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,9 @@ public interface IActorContext<TActor, TRequest, TResponse>
/// to allow other consumer to set the response
/// </summary>
public bool ByPassReply { get; set; }

/// <summary>
/// Returns the actor runner
/// </summary>
public ActorRunner<TActor, TRequest, TResponse>? Runner { get; set; }
}

0 comments on commit b14ab8b

Please sign in to comment.