Skip to content

Commit

Permalink
Multi journal config feature implementation (#272)
Browse files Browse the repository at this point in the history
* A proof of concept of how multi journal config can be implemented

* Integrate options

* Add sample project

* Update Directory.Packages.props

---------

Co-authored-by: Aaron Stannard <aaron@petabridge.com>
  • Loading branch information
Arkatufus and Aaronontheweb authored Mar 15, 2023
1 parent 4bc801e commit 05368d5
Show file tree
Hide file tree
Showing 13 changed files with 372 additions and 16 deletions.
11 changes: 11 additions & 0 deletions Akka.Persistence.Azure.sln
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "build", "build", "{320BFA6C
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Akka.Persistence.Azure.Hosting", "src\Akka.Persistence.Azure.Hosting\Akka.Persistence.Azure.Hosting.csproj", "{64C6B877-9262-456B-8A1C-60C4F272DA19}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Samples", "Samples", "{34372787-606F-4D85-A24F-534A1E8348E3}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Akka.Persistence.Azure.Sample.MultiJournal", "src\Akka.Persistence.Azure.Sample.MultiJournal\Akka.Persistence.Azure.Sample.MultiJournal.csproj", "{C79F19DE-30AC-4F1E-A3FA-949276413AB9}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand All @@ -45,11 +49,18 @@ Global
{64C6B877-9262-456B-8A1C-60C4F272DA19}.Debug|Any CPU.Build.0 = Debug|Any CPU
{64C6B877-9262-456B-8A1C-60C4F272DA19}.Release|Any CPU.ActiveCfg = Release|Any CPU
{64C6B877-9262-456B-8A1C-60C4F272DA19}.Release|Any CPU.Build.0 = Release|Any CPU
{C79F19DE-30AC-4F1E-A3FA-949276413AB9}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{C79F19DE-30AC-4F1E-A3FA-949276413AB9}.Debug|Any CPU.Build.0 = Debug|Any CPU
{C79F19DE-30AC-4F1E-A3FA-949276413AB9}.Release|Any CPU.ActiveCfg = Release|Any CPU
{C79F19DE-30AC-4F1E-A3FA-949276413AB9}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {A2D867B3-6A2B-46B6-B249-81408CFD7A50}
EndGlobalSection
GlobalSection(NestedProjects) = preSolution
{C79F19DE-30AC-4F1E-A3FA-949276413AB9} = {34372787-606F-4D85-A24F-534A1E8348E3}
EndGlobalSection
EndGlobal
Original file line number Diff line number Diff line change
Expand Up @@ -114,18 +114,22 @@ internal void Apply(AkkaConfigurationBuilder builder)
if (AzureCredential is null || ServiceUri is null)
throw new ConfigurationException($"Both {nameof(ServiceUri)} and {nameof(AzureCredential)} need to be declared to use {nameof(AzureCredential)}");

var setup = builder.Setups.FirstOrDefault(s => s is AzureTableStorageJournalSetup) as AzureTableStorageJournalSetup;
var multiSetup = builder.Setups.FirstOrDefault(s => s is AzureTableStorageMultiJournalSetup) as AzureTableStorageMultiJournalSetup;
multiSetup ??= new AzureTableStorageMultiJournalSetup();

var setup = multiSetup.Get(Identifier);
setup ??= new AzureTableStorageJournalSetup();
Apply(setup);
multiSetup.Set(Identifier, setup);

builder.AddSetup(setup);
builder.AddSetup(multiSetup);
}

internal void Apply(AzureTableStorageJournalSetup setup)
{
setup.ServiceUri = ServiceUri;
setup.AzureCredential = AzureCredential;
setup.TableClientOptions = TableClientOptions;
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net6.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<IsPackable>false</IsPackable>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Akka.Cluster.Hosting" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\Akka.Persistence.Azure.Hosting\Akka.Persistence.Azure.Hosting.csproj" />
</ItemGroup>

</Project>
74 changes: 74 additions & 0 deletions src/Akka.Persistence.Azure.Sample.MultiJournal/Customers.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// -----------------------------------------------------------------------
// <copyright file="Customers.cs" company="Akka.NET Project">
// Copyright (C) 2013-2023 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
// -----------------------------------------------------------------------

using Akka.Actor;
using Akka.Event;

namespace Akka.Persistence.Azure.Sample.MultiJournal;

public sealed class PurchaseItem
{
public readonly string ItemName;

public PurchaseItem(string itemName)
{
ItemName = itemName;
}
}

public class Customer : ReceivePersistentActor
{
public static Props Props(string id) => Akka.Actor.Props.Create(() => new Customer(id));

private readonly ILoggingAdapter _log;
private readonly string _name;
private List<string> _purchasedItems = new();
private int _counter;

public Customer(string persistenceId)
{
PersistenceId = persistenceId;
_log = Context.GetLogger();
_name = Uri.UnescapeDataString(Self.Path.Name);

Recover<SnapshotOffer>(msg =>
{
_purchasedItems = (List<string>) msg.Snapshot;
_log.Info(
@$"'{_name}' restored purchases.
All items: [{string.Join(", ", _purchasedItems)}]
--------------------------");
});

Recover<PurchaseItem>(HandlePurchase);

Command<PurchaseItem>(purchase =>
{
Persist(purchase, HandlePurchase);
});

Command<SaveSnapshotSuccess>(msg =>
{
DeleteMessages(msg.Metadata.SequenceNr);
});

Command<DeleteMessagesSuccess>(_ => { });
}

private void HandlePurchase(PurchaseItem item)
{
_purchasedItems.Add(item.ItemName);
_log.Info(
@$"'{_name}' purchased '{item.ItemName}'.
All items: [{string.Join(", ", _purchasedItems)}]
--------------------------");
if (_counter > 0 && _counter % 10 == 0)
SaveSnapshot(_purchasedItems);
_counter++;
}

public override string PersistenceId { get; }
}
43 changes: 43 additions & 0 deletions src/Akka.Persistence.Azure.Sample.MultiJournal/MessageExtractor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// -----------------------------------------------------------------------
// <copyright file="MessageExtractor.cs" company="Akka.NET Project">
// Copyright (C) 2013-2023 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
// -----------------------------------------------------------------------

using Akka.Cluster.Sharding;

namespace Akka.Persistence.Azure.Sample.MultiJournal;

public sealed class ShardEnvelope
{
public string EntityId { get; }
public object Payload { get; }

public ShardEnvelope(string entityId, object payload)
{
EntityId = entityId;
Payload = payload;
}
}

public sealed class MessageExtractor : HashCodeMessageExtractor
{
public MessageExtractor(int maxNumberOfShards) : base(maxNumberOfShards)
{
}

public override string? EntityId(object message)
=> message switch
{
ShardRegion.StartEntity start => start.EntityId,
ShardEnvelope e => e.EntityId,
_ => null
};

public override object EntityMessage(object message)
=> message switch
{
ShardEnvelope e => e.Payload,
_ => message
};
}
82 changes: 82 additions & 0 deletions src/Akka.Persistence.Azure.Sample.MultiJournal/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// -----------------------------------------------------------------------
// <copyright file="Program.cs" company="Akka.NET Project">
// Copyright (C) 2013-2023 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
// -----------------------------------------------------------------------

using Akka.Actor;
using Akka.Cluster;
using Akka.Cluster.Hosting;
using Akka.Cluster.Sharding;
using Akka.Hosting;
using Akka.Persistence.Azure.Hosting;
using Akka.Persistence.Azure.Sample.MultiJournal;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

const string shardConnectionString = "UseDevelopmentStorage=true";
const string connectionString = "UseDevelopmentStorage=true";

using var host = new HostBuilder()
.ConfigureLogging(builder =>
{
builder.AddConsole();
})
.ConfigureServices((context, services) =>
{
services.AddAkka("multi-journal", (builder, provider) =>
{
var persistenceJournal = new AzureTableStorageJournalOptions(true)
{
Identifier = "azure-journal",
ConnectionString = connectionString,
AutoInitialize = true
};
var shardJournal = new AzureTableStorageJournalOptions(false)
{
Identifier = "azure-shard-journal",
ConnectionString = shardConnectionString,
AutoInitialize = true
};
builder
.WithClustering()
.WithAzureTableJournal(persistenceJournal)
.WithAzureTableJournal(shardJournal)
.WithAzureBlobsSnapshotStore(new AzureBlobSnapshotOptions
{
ConnectionString = connectionString,
AutoInitialize = true
})
.WithShardRegion<ShardRegionKey>(
"region-1",
Customer.Props,
new MessageExtractor(10),
new ShardOptions
{
JournalOptions = shardJournal,
StateStoreMode = StateStoreMode.Persistence
})
.WithActors((system, registry, resolver) =>
{
var actor = system.ActorOf(resolver.Props<Purchaser>());
registry.Register<Purchaser>(actor);
});
});
}).Build();

await host.StartAsync();

var sys = host.Services.GetRequiredService<ActorSystem>();
var cluster = Cluster.Get(sys);
var registry = host.Services.GetRequiredService<ActorRegistry>();
var purchaser = registry.Get<Purchaser>();
cluster.RegisterOnMemberUp(() =>
{
purchaser.Tell("start");
});
cluster.Join(cluster.SelfAddress);

Console.ReadKey();

await host.StopAsync();
65 changes: 65 additions & 0 deletions src/Akka.Persistence.Azure.Sample.MultiJournal/Purchaser.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// -----------------------------------------------------------------------
// <copyright file="Purchaser.cs" company="Akka.NET Project">
// Copyright (C) 2013-2023 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
// -----------------------------------------------------------------------

using Akka.Actor;
using Akka.Hosting;
using Akka.Util;

namespace Akka.Persistence.Azure.Sample.MultiJournal;

public class Purchaser: UntypedActor, IWithTimers
{
private const string TimerKey = "timer-key";
private const string Timer = "timer";
private readonly string[] _customers;
private readonly string[] _items;
private readonly IActorRef _shardRegion;
private readonly TimeSpan _purchaseInterval = TimeSpan.FromSeconds(3);

public Purchaser(IRequiredActor<ShardRegionKey> shardActor)
{
_customers = new[]
{
"Yoda", "Obi-Wan", "Darth Vader", "Princess Leia",
"Luke Skywalker", "R2D2", "Han Solo", "Chewbacca", "Jabba"
};

_items = new[]
{
"Yoghurt", "Fruits", "Light Saber", "Fluffy toy", "Dream Catcher",
"Candies", "Cigars", "Chicken nuggets", "French fries"
};

_shardRegion = shardActor.ActorRef;
}

public ITimerScheduler Timers { get; set; }

protected override void OnReceive(object message)
{
if (message is not string str)
return;

switch (str)
{
case "start":
Timers.StartPeriodicTimer(TimerKey, Timer, _purchaseInterval);
return;
case Timer:
var customer = PickRandom(_customers);
var item = PickRandom(_items);

// A shard message needs to be wrapped inside an envelope so the system knows which
// shard and actor it should route the message to.
var envelope = new ShardEnvelope(customer, new PurchaseItem(item));

_shardRegion.Tell(envelope);
return;
}
}

private static T PickRandom<T>(IReadOnlyList<T> items) => items[ThreadLocalRandom.Current.Next(items.Count)];
}
10 changes: 10 additions & 0 deletions src/Akka.Persistence.Azure.Sample.MultiJournal/ShardRegionKey.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
// -----------------------------------------------------------------------
// <copyright file="ShardRegionKey.cs" company="Akka.NET Project">
// Copyright (C) 2013-2023 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
// -----------------------------------------------------------------------

namespace Akka.Persistence.Azure.Sample.MultiJournal;

public sealed class ShardRegionKey
{ }
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,15 @@ public AzureTableStorageJournal(Config config = null)
if (setup.HasValue)
_settings = setup.Value.Apply(_settings);

var multiSetup = Context.System.Settings.Setup.Get<AzureTableStorageMultiJournalSetup>();
if (multiSetup.HasValue)
{
var journalId = Self.Path.Name.SplitDottedPathHonouringQuotes().Last();
setup = multiSetup.Value.Get(journalId);
if(setup.HasValue)
_settings = setup.Value.Apply(_settings);
}

_serialization = new SerializationHelper(Context.System);

if (_settings.Development)
Expand Down
Loading

0 comments on commit 05368d5

Please sign in to comment.