Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
andresgutierrez committed Jul 31, 2024
2 parents a7c4400 + 400197c commit 2496869
Show file tree
Hide file tree
Showing 19 changed files with 757 additions and 14 deletions.
60 changes: 60 additions & 0 deletions Nixie.Tests/Actors/RouteeActorStruct.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@

using Nixie.Routers;

namespace Nixie.Tests.Actors;

public readonly struct RouterMessageStruct : IConsistentHashable
{
public RouterMessageType Type { get; }

public string Data { get; }

public RouterMessageStruct(RouterMessageType type, string data)
{
Type = type;
Data = data;
}

public int GetHash()
{
return Data switch
{
"aaa" => 0,
"bbb" => 1,
"ccc" => 2,
"ddd" => 3,
"eee" => 4,
_ => 0
};
}
}

public sealed class RouteeActorStruct : IActorStruct<RouterMessageStruct>
{
private int receivedMessages;

private readonly IActorContextStruct<RouteeActorStruct, RouterMessageStruct> context;

public RouteeActorStruct(IActorContextStruct<RouteeActorStruct, RouterMessageStruct> context)
{
this.context = context;
}

public int GetMessages()
{
return receivedMessages;
}

private void IncrMessage()
{
receivedMessages++;
}

public async Task Receive(RouterMessageStruct message)
{
await Task.Yield();

if (message.Type == RouterMessageType.Route)
IncrMessage();
}
}
29 changes: 29 additions & 0 deletions Nixie.Tests/TestRouters.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,35 @@ public async Task TestCreateRoundRobinRouter()
Assert.Equal(1, routeeActor.GetMessages());
}
}

[Fact]
public async Task TestCreateRoundRobinRouterStruct()
{
using ActorSystem asx = new();

IActorRefStruct<RoundRobinActorStruct<RouteeActorStruct, RouterMessageStruct>, RouterMessageStruct> router =
asx.SpawnStruct<RoundRobinActorStruct<RouteeActorStruct, RouterMessageStruct>, RouterMessageStruct>("my-router", 5);

router.Send(new RouterMessageStruct(RouterMessageType.Route, "aaa"));
router.Send(new RouterMessageStruct(RouterMessageType.Route, "bbb"));
router.Send(new RouterMessageStruct(RouterMessageType.Route, "ccc"));
router.Send(new RouterMessageStruct(RouterMessageType.Route, "ddd"));
router.Send(new RouterMessageStruct(RouterMessageType.Route, "eee"));

await asx.Wait();

Assert.IsAssignableFrom<RoundRobinActorStruct<RouteeActorStruct, RouterMessageStruct>>(router.Runner.Actor);

RoundRobinActorStruct<RouteeActorStruct, RouterMessageStruct> routerActor = (RoundRobinActorStruct<RouteeActorStruct, RouterMessageStruct>)router.Runner.Actor!;

foreach (IActorRefStruct<RouteeActorStruct, RouterMessageStruct> routee in routerActor.Instances)
{
Assert.IsAssignableFrom<RouteeActorStruct>(routee.Runner.Actor);

RouteeActorStruct routeeActor = (RouteeActorStruct)routee.Runner.Actor!;
Assert.Equal(1, routeeActor.GetMessages());
}
}

[Fact]
public async Task TestCreateRoundRobinRouterExt()
Expand Down
25 changes: 25 additions & 0 deletions Nixie.Tests/TestShutdown.cs
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,31 @@ public async Task TestSpawnFireAndForgetActorAndGracefulShutdownByName()
Assert.Null(actor2);
}

[Fact]
public async Task TestSpawnFireAndForgetActorAndScheduleShutdownByRef()
{
using ActorSystem asx = new();

IActorRef<ShutdownActor, string> actor = asx.Spawn<ShutdownActor, string>("my-actor");

Assert.IsAssignableFrom<ShutdownActor>(actor.Runner.Actor);

actor.Send("TestSpawnFireAndForgetActorAndShutdownByName");

await asx.Wait();

Assert.Equal(1, ((ShutdownActor)actor.Runner.Actor!).GetMessages());

asx.ScheduleShutdown(actor, TimeSpan.FromMilliseconds(1000));

await Task.Delay(2000);

await asx.Wait();

IActorRef<ShutdownActor, string>? actor2 = asx.Get<ShutdownActor, string>("my-actor");
Assert.Null(actor2);
}

[Fact]
public async Task TestSpawnFireAndForgetSlowActorAndGracefulShutdownByName()
{
Expand Down
4 changes: 2 additions & 2 deletions Nixie/ActorRepositoryReply.cs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public bool IsProcessing(out string? actorName)
{
ActorRunner<TActor, TRequest, TResponse> runner = lazyValue.Value.runner;

if (!runner.IsShutdown && runner.IsProcessing)
if (runner is { IsShutdown: false, IsProcessing: true })
{
actorName = runner.Name;
return true;
Expand Down Expand Up @@ -110,7 +110,7 @@ public IActorRef<TActor, TRequest, TResponse> Spawn(string? name = null, params

Lazy<(ActorRunner<TActor, TRequest, TResponse> runner, ActorRef<TActor, TRequest, TResponse> actorRef)> actor = actors.GetOrAdd(
name,
(string name) => new Lazy<(ActorRunner<TActor, TRequest, TResponse>, ActorRef<TActor, TRequest, TResponse>)>(() => CreateInternal(name, args))
(string _) => new(() => CreateInternal(name, args))
);

return actor.Value.actorRef;
Expand Down
35 changes: 34 additions & 1 deletion Nixie/ActorScheduler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,17 @@ public class ActorScheduler : IDisposable
{
private static int sequence;

private readonly ActorSystem actorSystem;

private readonly ILogger? logger;

private readonly ConcurrentDictionary<object, Lazy<ConcurrentDictionary<long, Lazy<Timer>>>> onceTimers = new();

private readonly ConcurrentDictionary<object, Lazy<ConcurrentDictionary<string, Lazy<Timer>>>> periodicTimers = new();

public ActorScheduler(ILogger? logger)
public ActorScheduler(ActorSystem actorSystem, ILogger? logger)
{
this.actorSystem = actorSystem;
this.logger = logger;
}

Expand Down Expand Up @@ -185,6 +188,36 @@ public Timer ScheduleOnceStruct<TActor, TRequest>(IActorRefStruct<TActor, TReque
Lazy<Timer> timer = timers.Value.GetOrAdd(seq, (long key) => new(() => ScheduleOnceTimerStruct(actorRef, request, delay, seq)));
return timer.Value;
}

/// <summary>
/// Schedule an actor to be terminated after the specified delay.
/// </summary>
/// <typeparam name="TActor"></typeparam>
/// <typeparam name="TRequest"></typeparam>
/// <param name="actorRef"></param>
/// <param name="request"></param>
/// <param name="delay"></param>
/// <returns></returns>
public Timer ScheduleShutdown<TActor, TRequest, TResponse>(IActorRef<TActor, TRequest, TResponse> actorRef, TimeSpan delay)
where TActor : IActor<TRequest, TResponse> where TRequest : class where TResponse : class?
{
return new((state) => actorSystem.Shutdown(actorRef), null, delay, TimeSpan.Zero);
}

/// <summary>
/// Schedule an actor to be terminated after the specified delay.
/// </summary>
/// <typeparam name="TActor"></typeparam>
/// <typeparam name="TRequest"></typeparam>
/// <param name="actorRef"></param>
/// <param name="request"></param>
/// <param name="delay"></param>
/// <returns></returns>
public Timer ScheduleShutdown<TActor, TRequest>(IActorRef<TActor, TRequest> actorRef, TimeSpan delay)
where TActor : IActor<TRequest> where TRequest : class
{
return new((state) => actorSystem.Shutdown(actorRef), null, delay, TimeSpan.Zero);
}

/// <summary>
/// Stops a periodic timer
Expand Down
36 changes: 34 additions & 2 deletions Nixie/ActorSystem.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public ActorSystem(IServiceProvider? serviceProvider = null, ILogger? logger = n
{
this.serviceProvider = serviceProvider;
this.logger = logger;
this.scheduler = new(logger);
this.scheduler = new(this, logger);

Nobody = Spawn<NobodyActor, object>();
}
Expand Down Expand Up @@ -182,7 +182,7 @@ public bool Shutdown<TActor, TRequest, TResponse>(string name) where TActor : IA
/// <param name="name"></param>
/// <returns></returns>
public bool Shutdown<TActor, TRequest, TResponse>(IActorRef<TActor, TRequest, TResponse> actorRef)
where TActor : IActor<TRequest, TResponse> where TRequest : class where TResponse : class
where TActor : IActor<TRequest, TResponse> where TRequest : class where TResponse : class?
{
ActorRepository<TActor, TRequest, TResponse> repository = GetRepository<TActor, TRequest, TResponse>();

Expand Down Expand Up @@ -614,6 +614,38 @@ public void ScheduleOnceStruct<TActor, TRequest>(IActorRefStruct<TActor, TReques
{
scheduler.ScheduleOnceStruct(actorRef, request, delay);
}

/// <summary>
/// Schedule an actor to be terminated after the specified delay.
/// </summary>
/// <typeparam name="TActor"></typeparam>
/// <typeparam name="TRequest"></typeparam>
/// <typeparam name="TResponse"></typeparam>
/// <param name="actorRef"></param>
/// <param name="request"></param>
/// <param name="delay"></param>
/// <returns></returns>
public void ScheduleShutdown<TActor, TRequest, TResponse>(IActorRef<TActor, TRequest, TResponse> actorRef, TimeSpan delay)
where TActor : IActor<TRequest, TResponse> where TRequest : class where TResponse : class?
{
scheduler.ScheduleShutdown(actorRef, delay);
}

/// <summary>
/// Schedule an actor to be terminated after the specified delay.
/// </summary>
/// <typeparam name="TActor"></typeparam>
/// <typeparam name="TRequest"></typeparam>
/// <typeparam name="TResponse"></typeparam>
/// <param name="actorRef"></param>
/// <param name="request"></param>
/// <param name="delay"></param>
/// <returns></returns>
public void ScheduleShutdown<TActor, TRequest>(IActorRef<TActor, TRequest> actorRef, TimeSpan delay)
where TActor : IActor<TRequest> where TRequest : class
{
scheduler.ScheduleShutdown(actorRef, delay);
}

/// <summary>
/// Stops a periodic timer
Expand Down
2 changes: 1 addition & 1 deletion Nixie/IActor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ namespace Nixie;
/// This interface must be implemented by all actors that do not return a response.
/// </summary>
/// <typeparam name="TRequest"></typeparam>
public interface IActor<TRequest> where TRequest : class
public interface IActor<in TRequest> where TRequest : class
{
public Task Receive(TRequest message);
}
2 changes: 1 addition & 1 deletion Nixie/IActorStruct.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ namespace Nixie;
/// This actor type supports struct request messages.
/// </summary>
/// <typeparam name="TRequest"></typeparam>
public interface IActorStruct<TRequest> where TRequest : struct
public interface IActorStruct<in TRequest> where TRequest : struct
{
public Task Receive(TRequest message);
}
2 changes: 1 addition & 1 deletion Nixie/IActorStructReply.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ namespace Nixie;
/// </summary>
/// <typeparam name="TRequest"></typeparam>
/// <typeparam name="TResponse"></typeparam>
public interface IActorStruct<TRequest, TResponse> where TRequest : struct where TResponse : struct
public interface IActorStruct<in TRequest, TResponse> where TRequest : struct where TResponse : struct
{
/// <summary>
/// Passes a message to the actor and returns a response.
Expand Down
2 changes: 1 addition & 1 deletion Nixie/LazyTaskMethodBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ namespace Nixie;

public class LazyTaskMethodBuilder<T>
{
public LazyTaskMethodBuilder() => Task = new LazyTask<T>();
public LazyTaskMethodBuilder() => Task = new();

public static LazyTaskMethodBuilder<T> Create() => new();

Expand Down
2 changes: 1 addition & 1 deletion Nixie/Nixie.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<PackageId>Nixie</PackageId>
<Version>1.0.3</Version>
<Version>1.0.6</Version>
<Description>A Lightweight Actor Model Implementation for C#/.NET</Description>
<Authors>Andres Gutierrez</Authors>
<Company>Andres Gutierrez</Company>
Expand Down
2 changes: 1 addition & 1 deletion Nixie/Routers/ActorSystemExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ List<IActorRef<TActor, TRequest, TResponse>> instances
}

/// <summary>
/// Creates a Round-Robin router specifying number of instances
/// Creates a Consistent Hash router specifying name and number of instances
/// </summary>
/// <typeparam name="TActor"></typeparam>
/// <typeparam name="TRequest"></typeparam>
Expand Down
Loading

0 comments on commit 2496869

Please sign in to comment.