diff --git a/build.fsx b/build.fsx index 1428c1cc88d..d8e3d65acae 100644 --- a/build.fsx +++ b/build.fsx @@ -217,7 +217,8 @@ Target "RunTests" <| fun _ -> let xunitTestAssemblies = !! "src/**/bin/Release/*.Tests.dll" -- "src/**/bin/Release/Akka.TestKit.VsTest.Tests.dll" -- "src/**/bin/Release/Akka.TestKit.NUnit.Tests.dll" -- - "src/**/bin/Release/Akka.Persistence.SqlServer.Tests.dll" + "src/**/bin/Release/Akka.Persistence.SqlServer.Tests.dll" -- + "src/**/bin/Release/Akka.Persistence.PostgreSql.Tests.dll" mkdir testOutput @@ -271,6 +272,14 @@ Target "RunSqlServerTests" <| fun _ -> (fun p -> { p with OutputDir = testOutput; ToolPath = xunitToolPath }) sqlServerTests +Target "RunPostgreSqlTests" <| fun _ -> + let postgreSqlTests = !! "src/**/bin/Release/Akka.Persistence.PostgreSql.Tests.dll" + let xunitToolPath = findToolInSubPath "xunit.console.exe" "src/packages/xunit.runner.console*/tools" + printfn "Using XUnit runner: %s" xunitToolPath + xUnit2 + (fun p -> { p with OutputDir = testOutput; ToolPath = xunitToolPath }) + postgreSqlTests + //-------------------------------------------------------------------------------- // Nuget targets //-------------------------------------------------------------------------------- diff --git a/src/Akka.sln b/src/Akka.sln index 346cffa32fd..727a47f922c 100644 --- a/src/Akka.sln +++ b/src/Akka.sln @@ -209,6 +209,10 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Akka.MultiNodeTests", "core EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Akka.Persistence.Sql.Common", "contrib\persistence\Akka.Persistence.Sql.Common\Akka.Persistence.Sql.Common.csproj", "{3B9E6211-9488-4DB5-B714-24248693B38F}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Akka.Persistence.PostgreSql", "contrib\persistence\Akka.Persistence.PostgreSql\Akka.Persistence.PostgreSql.csproj", "{4B89227B-5AD1-4061-816F-570067C3727F}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Akka.Persistence.PostgreSql.Tests", "contrib\persistence\Akka.Persistence.PostgreSql.Tests\Akka.Persistence.PostgreSql.Tests.csproj", "{2D1812FD-70C0-43EE-9C25-3980E41F30E1}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug Mono|Any CPU = Debug Mono|Any CPU @@ -774,6 +778,22 @@ Global {3B9E6211-9488-4DB5-B714-24248693B38F}.Release Mono|Any CPU.Build.0 = Release|Any CPU {3B9E6211-9488-4DB5-B714-24248693B38F}.Release|Any CPU.ActiveCfg = Release|Any CPU {3B9E6211-9488-4DB5-B714-24248693B38F}.Release|Any CPU.Build.0 = Release|Any CPU + {4B89227B-5AD1-4061-816F-570067C3727F}.Debug Mono|Any CPU.ActiveCfg = Debug|Any CPU + {4B89227B-5AD1-4061-816F-570067C3727F}.Debug Mono|Any CPU.Build.0 = Debug|Any CPU + {4B89227B-5AD1-4061-816F-570067C3727F}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {4B89227B-5AD1-4061-816F-570067C3727F}.Debug|Any CPU.Build.0 = Debug|Any CPU + {4B89227B-5AD1-4061-816F-570067C3727F}.Release Mono|Any CPU.ActiveCfg = Release|Any CPU + {4B89227B-5AD1-4061-816F-570067C3727F}.Release Mono|Any CPU.Build.0 = Release|Any CPU + {4B89227B-5AD1-4061-816F-570067C3727F}.Release|Any CPU.ActiveCfg = Release|Any CPU + {4B89227B-5AD1-4061-816F-570067C3727F}.Release|Any CPU.Build.0 = Release|Any CPU + {2D1812FD-70C0-43EE-9C25-3980E41F30E1}.Debug Mono|Any CPU.ActiveCfg = Debug|Any CPU + {2D1812FD-70C0-43EE-9C25-3980E41F30E1}.Debug Mono|Any CPU.Build.0 = Debug|Any CPU + {2D1812FD-70C0-43EE-9C25-3980E41F30E1}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {2D1812FD-70C0-43EE-9C25-3980E41F30E1}.Debug|Any CPU.Build.0 = Debug|Any CPU + {2D1812FD-70C0-43EE-9C25-3980E41F30E1}.Release Mono|Any CPU.ActiveCfg = Release|Any CPU + {2D1812FD-70C0-43EE-9C25-3980E41F30E1}.Release Mono|Any CPU.Build.0 = Release|Any CPU + {2D1812FD-70C0-43EE-9C25-3980E41F30E1}.Release|Any CPU.ActiveCfg = Release|Any CPU + {2D1812FD-70C0-43EE-9C25-3980E41F30E1}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -868,5 +888,7 @@ Global {7DBD5C17-5E9D-40C4-9201-D092751532A7} = {7625FD95-4B2C-4A5B-BDD5-94B1493FAC8E} {F0781BEA-5BA0-4AF0-BB15-E3F209B681F5} = {01167D3C-49C4-4CDE-9787-C176D139ACDD} {3B9E6211-9488-4DB5-B714-24248693B38F} = {264C22A4-CAFC-41F6-B82C-4DDC5C196767} + {4B89227B-5AD1-4061-816F-570067C3727F} = {264C22A4-CAFC-41F6-B82C-4DDC5C196767} + {2D1812FD-70C0-43EE-9C25-3980E41F30E1} = {264C22A4-CAFC-41F6-B82C-4DDC5C196767} EndGlobalSection EndGlobal diff --git a/src/contrib/persistence/Akka.Persistence.PostgreSql.Tests/Akka.Persistence.PostgreSql.Tests.csproj b/src/contrib/persistence/Akka.Persistence.PostgreSql.Tests/Akka.Persistence.PostgreSql.Tests.csproj new file mode 100644 index 00000000000..7acb38a43b3 --- /dev/null +++ b/src/contrib/persistence/Akka.Persistence.PostgreSql.Tests/Akka.Persistence.PostgreSql.Tests.csproj @@ -0,0 +1,106 @@ + + + + + Debug + AnyCPU + {2D1812FD-70C0-43EE-9C25-3980E41F30E1} + Library + Properties + Akka.Persistence.PostgreSql.Tests + Akka.Persistence.PostgreSql.Tests + v4.5 + 512 + ..\ + true + + + + true + full + false + bin\Debug\ + DEBUG;TRACE + prompt + 4 + + + pdbonly + true + bin\Release\ + TRACE + prompt + 4 + + + + ..\..\..\packages\Npgsql.2.2.5\lib\net45\Mono.Security.dll + + + ..\..\..\packages\Npgsql.2.2.5\lib\net45\Npgsql.dll + + + + + + + + + + + + + + + + + + + + + + + {7dbd5c17-5e9d-40c4-9201-d092751532a7} + Akka.TestKit.Xunit2 + + + {4b89227b-5ad1-4061-816f-570067c3727f} + Akka.Persistence.PostgreSql + + + {ad9418b6-c452-4169-94fb-d43de0bfa966} + Akka.Persistence.TestKit + + + {fca84dea-c118-424b-9eb8-34375dfef18a} + Akka.Persistence + + + {0d3cbad0-bbdb-43e5-afc4-ed1d3ecdc224} + Akka.TestKit + + + {5deddf90-37f0-48d3-a0b0-a5cbd8a7e377} + Akka + + + {3b9e6211-9488-4db5-b714-24248693b38f} + Akka.Persistence.Sql.Common + + + + + + + This project references NuGet package(s) that are missing on this computer. Enable NuGet Package Restore to download them. For more information, see http://go.microsoft.com/fwlink/?LinkID=322105. The missing file is {0}. + + + + + \ No newline at end of file diff --git a/src/contrib/persistence/Akka.Persistence.PostgreSql.Tests/DbUtils.cs b/src/contrib/persistence/Akka.Persistence.PostgreSql.Tests/DbUtils.cs new file mode 100644 index 00000000000..b9fbc9c43b0 --- /dev/null +++ b/src/contrib/persistence/Akka.Persistence.PostgreSql.Tests/DbUtils.cs @@ -0,0 +1,80 @@ +using System; +using System.Configuration; +using System.Data.SqlClient; +using Akka.Dispatch.SysMsg; +using Npgsql; + +namespace Akka.Persistence.PostgreSql.Tests +{ + public static class DbUtils + { + public static void Initialize() + { + var connectionString = ConfigurationManager.ConnectionStrings["TestDb"].ConnectionString; + var connectionBuilder = new NpgsqlConnectionStringBuilder(connectionString); + + //connect to postgres database to create a new database + var databaseName = connectionBuilder.Database; + connectionBuilder.Database = "postgres"; + connectionString = connectionBuilder.ToString(); + + using (var conn = new NpgsqlConnection(connectionString)) + { + conn.Open(); + + bool dbExists; + using (var cmd = new NpgsqlCommand()) + { + cmd.CommandText = string.Format(@"SELECT TRUE FROM pg_database WHERE datname='{0}'", databaseName); + cmd.Connection = conn; + + var result = cmd.ExecuteScalar(); + dbExists = result != null && Convert.ToBoolean(result); + } + + if (dbExists) + { + DoClean(conn); + } + else + { + DoCreate(conn, databaseName); + } + } + } + + public static void Clean() + { + var connectionString = ConfigurationManager.ConnectionStrings["TestDb"].ConnectionString; + + using (var conn = new NpgsqlConnection(connectionString)) + { + conn.Open(); + + DoClean(conn); + } + } + + private static void DoCreate(NpgsqlConnection conn, string databaseName) + { + using (var cmd = new NpgsqlCommand()) + { + cmd.CommandText = string.Format(@"CREATE DATABASE {0}", databaseName); + cmd.Connection = conn; + cmd.ExecuteNonQuery(); + } + } + + private static void DoClean(NpgsqlConnection conn) + { + using (var cmd = new NpgsqlCommand()) + { + cmd.CommandText = @" + DROP TABLE IF EXISTS public.event_journal; + DROP TABLE IF EXISTS public.snapshot_store"; + cmd.Connection = conn; + cmd.ExecuteNonQuery(); + } + } + } +} \ No newline at end of file diff --git a/src/contrib/persistence/Akka.Persistence.PostgreSql.Tests/PostgreSqlJournalSpec.cs b/src/contrib/persistence/Akka.Persistence.PostgreSql.Tests/PostgreSqlJournalSpec.cs new file mode 100644 index 00000000000..6c9ecd27e92 --- /dev/null +++ b/src/contrib/persistence/Akka.Persistence.PostgreSql.Tests/PostgreSqlJournalSpec.cs @@ -0,0 +1,49 @@ +using System.Configuration; +using Akka.Configuration; +using Akka.Persistence.TestKit.Journal; + +namespace Akka.Persistence.PostgreSql.Tests +{ + public class PostgreSqlJournalSpec : JournalSpec + { + private static readonly Config SpecConfig; + + static PostgreSqlJournalSpec() + { + var connectionString = ConfigurationManager.ConnectionStrings["TestDb"].ConnectionString; + + var config = @" + akka.persistence { + publish-plugin-commands = on + journal { + plugin = ""akka.persistence.journal.postgresql"" + postgresql { + class = ""Akka.Persistence.PostgreSql.Journal.PostgreSqlJournal, Akka.Persistence.PostgreSql"" + plugin-dispatcher = ""akka.actor.default-dispatcher"" + table-name = event_journal + schema-name = public + auto-initialize = on + connection-string = """ + connectionString + @""" + } + } + }"; + + SpecConfig = ConfigurationFactory.ParseString(config); + + //need to make sure db is created before the tests start + DbUtils.Initialize(); + } + + public PostgreSqlJournalSpec() + : base(SpecConfig, "PostgreSqlJournalSpec") + { + Initialize(); + } + + protected override void Dispose(bool disposing) + { + base.Dispose(disposing); + DbUtils.Clean(); + } + } +} \ No newline at end of file diff --git a/src/contrib/persistence/Akka.Persistence.PostgreSql.Tests/PostgreSqlSnapshotStoreSpec.cs b/src/contrib/persistence/Akka.Persistence.PostgreSql.Tests/PostgreSqlSnapshotStoreSpec.cs new file mode 100644 index 00000000000..1056459a04c --- /dev/null +++ b/src/contrib/persistence/Akka.Persistence.PostgreSql.Tests/PostgreSqlSnapshotStoreSpec.cs @@ -0,0 +1,49 @@ +using System.Configuration; +using Akka.Configuration; +using Akka.Persistence.TestKit.Snapshot; + +namespace Akka.Persistence.PostgreSql.Tests +{ + public class PostgreSqlSnapshotStoreSpec : SnapshotStoreSpec + { + private static readonly Config SpecConfig; + + static PostgreSqlSnapshotStoreSpec() + { + var connectionString = ConfigurationManager.ConnectionStrings["TestDb"].ConnectionString; + + var config = @" + akka.persistence { + publish-plugin-commands = on + snapshot-store { + plugin = ""akka.persistence.snapshot-store.postgresql"" + postgresql { + class = ""Akka.Persistence.PostgreSql.Snapshot.PostgreSqlSnapshotStore, Akka.Persistence.PostgreSql"" + plugin-dispatcher = ""akka.actor.default-dispatcher"" + table-name = snapshot_store + schema-name = public + auto-initialize = on + connection-string = """ + connectionString + @""" + } + } + }"; + + SpecConfig = ConfigurationFactory.ParseString(config); + + //need to make sure db is created before the tests start + DbUtils.Initialize(); + } + + public PostgreSqlSnapshotStoreSpec() + : base(SpecConfig, "PostgreSqlSnapshotStoreSpec") + { + Initialize(); + } + + protected override void Dispose(bool disposing) + { + base.Dispose(disposing); + DbUtils.Clean(); + } + } +} \ No newline at end of file diff --git a/src/contrib/persistence/Akka.Persistence.PostgreSql.Tests/Properties/AssemblyInfo.cs b/src/contrib/persistence/Akka.Persistence.PostgreSql.Tests/Properties/AssemblyInfo.cs new file mode 100644 index 00000000000..1970f490550 --- /dev/null +++ b/src/contrib/persistence/Akka.Persistence.PostgreSql.Tests/Properties/AssemblyInfo.cs @@ -0,0 +1,36 @@ +using System.Reflection; +using System.Runtime.CompilerServices; +using System.Runtime.InteropServices; + +// General Information about an assembly is controlled through the following +// set of attributes. Change these attribute values to modify the information +// associated with an assembly. +[assembly: AssemblyTitle("Akka.Persistence.PostgreSql.Tests")] +[assembly: AssemblyDescription("")] +[assembly: AssemblyConfiguration("")] +[assembly: AssemblyCompany("")] +[assembly: AssemblyProduct("Akka.Persistence.PostgreSql.Tests")] +[assembly: AssemblyCopyright("Copyright © 2015")] +[assembly: AssemblyTrademark("")] +[assembly: AssemblyCulture("")] + +// Setting ComVisible to false makes the types in this assembly not visible +// to COM components. If you need to access a type in this assembly from +// COM, set the ComVisible attribute to true on that type. +[assembly: ComVisible(false)] + +// The following GUID is for the ID of the typelib if this project is exposed to COM +[assembly: Guid("8494fd8c-15ae-489e-83aa-1ac37b458964")] + +// Version information for an assembly consists of the following four values: +// +// Major Version +// Minor Version +// Build Number +// Revision +// +// You can specify all the values or you can default the Build and Revision Numbers +// by using the '*' as shown below: +// [assembly: AssemblyVersion("1.0.*")] +[assembly: AssemblyVersion("1.0.0.0")] +[assembly: AssemblyFileVersion("1.0.0.0")] diff --git a/src/contrib/persistence/Akka.Persistence.PostgreSql.Tests/app.config b/src/contrib/persistence/Akka.Persistence.PostgreSql.Tests/app.config new file mode 100644 index 00000000000..010fe0e91f2 --- /dev/null +++ b/src/contrib/persistence/Akka.Persistence.PostgreSql.Tests/app.config @@ -0,0 +1,6 @@ + + + + + + diff --git a/src/contrib/persistence/Akka.Persistence.PostgreSql.Tests/packages.config b/src/contrib/persistence/Akka.Persistence.PostgreSql.Tests/packages.config new file mode 100644 index 00000000000..9c8e7e8768b --- /dev/null +++ b/src/contrib/persistence/Akka.Persistence.PostgreSql.Tests/packages.config @@ -0,0 +1,4 @@ + + + + \ No newline at end of file diff --git a/src/contrib/persistence/Akka.Persistence.PostgreSql/Akka.Persistence.PostgreSql.csproj b/src/contrib/persistence/Akka.Persistence.PostgreSql/Akka.Persistence.PostgreSql.csproj new file mode 100644 index 00000000000..08e88a4798e --- /dev/null +++ b/src/contrib/persistence/Akka.Persistence.PostgreSql/Akka.Persistence.PostgreSql.csproj @@ -0,0 +1,101 @@ + + + + + Debug + AnyCPU + {4B89227B-5AD1-4061-816F-570067C3727F} + Library + Properties + Akka.Persistence.PostgreSql + Akka.Persistence.PostgreSql + v4.5 + 512 + ..\ + true + + + + true + full + false + bin\Debug\ + DEBUG;TRACE + prompt + 4 + + + pdbonly + true + bin\Release\ + TRACE + prompt + 4 + + + + ..\..\..\packages\Npgsql.2.2.5\lib\net45\Mono.Security.dll + + + ..\..\..\packages\Npgsql.2.2.5\lib\net45\Npgsql.dll + + + + + + + + + + + + + + + + + + + + + + + + Always + + + + + {fca84dea-c118-424b-9eb8-34375dfef18a} + Akka.Persistence + + + {5deddf90-37f0-48d3-a0b0-a5cbd8a7e377} + Akka + + + {3b9e6211-9488-4db5-b714-24248693b38f} + Akka.Persistence.Sql.Common + + + + + + + + + + + + This project references NuGet package(s) that are missing on this computer. Enable NuGet Package Restore to download them. For more information, see http://go.microsoft.com/fwlink/?LinkID=322105. The missing file is {0}. + + + + + \ No newline at end of file diff --git a/src/contrib/persistence/Akka.Persistence.PostgreSql/Akka.Persistence.PostgreSql.nuspec b/src/contrib/persistence/Akka.Persistence.PostgreSql/Akka.Persistence.PostgreSql.nuspec new file mode 100644 index 00000000000..d85c4b4a521 --- /dev/null +++ b/src/contrib/persistence/Akka.Persistence.PostgreSql/Akka.Persistence.PostgreSql.nuspec @@ -0,0 +1,20 @@ + + + + @project@ + @project@@title@ + @build.number@ + @authors@ + @authors@ + Akka.NET Persistence journal and snapshot store backed by PostgreSql. + https://github.com/akkadotnet/akka.net/blob/master/LICENSE + https://github.com/akkadotnet/akka.net + http://getakka.net/images/AkkaNetLogo.Normal.png + false + @releaseNotes@ + @copyright@ + @tags@ persistence eventsource postgresql + @dependencies@ + @references@ + + diff --git a/src/contrib/persistence/Akka.Persistence.PostgreSql/Extension.cs b/src/contrib/persistence/Akka.Persistence.PostgreSql/Extension.cs new file mode 100644 index 00000000000..3f7bcbec513 --- /dev/null +++ b/src/contrib/persistence/Akka.Persistence.PostgreSql/Extension.cs @@ -0,0 +1,116 @@ +using System; +using Akka.Actor; +using Akka.Configuration; +using Akka.Persistence.Sql.Common; + +namespace Akka.Persistence.PostgreSql +{ + /// + /// Configuration settings representation targeting PostgreSql journal actor. + /// + public class PostgreSqlJournalSettings : JournalSettings + { + public const string JournalConfigPath = "akka.persistence.journal.postgresql"; + + /// + /// Flag determining in case of event journal table missing, it should be automatically initialized. + /// + public bool AutoInitialize { get; private set; } + + public PostgreSqlJournalSettings(Config config) + : base(config) + { + AutoInitialize = config.GetBoolean("auto-initialize"); + } + } + + /// + /// Configuration settings representation targeting PostgreSql snapshot store actor. + /// + public class PostgreSqlSnapshotStoreSettings : SnapshotStoreSettings + { + public const string SnapshotStoreConfigPath = "akka.persistence.snapshot-store.postgresql"; + + /// + /// Flag determining in case of snapshot store table missing, it should be automatically initialized. + /// + public bool AutoInitialize { get; private set; } + + public PostgreSqlSnapshotStoreSettings(Config config) + : base(config) + { + AutoInitialize = config.GetBoolean("auto-initialize"); + } + } + + /// + /// An actor system extension initializing support for PostgreSql persistence layer. + /// + public class PostgreSqlPersistenceExtension : IExtension + { + /// + /// Journal-related settings loaded from HOCON configuration. + /// + public readonly PostgreSqlJournalSettings JournalSettings; + + /// + /// Snapshot store related settings loaded from HOCON configuration. + /// + public readonly PostgreSqlSnapshotStoreSettings SnapshotStoreSettings; + + public PostgreSqlPersistenceExtension(ExtendedActorSystem system) + { + system.Settings.InjectTopLevelFallback(PostgreSqlPersistence.DefaultConfiguration()); + + JournalSettings = new PostgreSqlJournalSettings(system.Settings.Config.GetConfig(PostgreSqlJournalSettings.JournalConfigPath)); + SnapshotStoreSettings = new PostgreSqlSnapshotStoreSettings(system.Settings.Config.GetConfig(PostgreSqlSnapshotStoreSettings.SnapshotStoreConfigPath)); + + if (JournalSettings.AutoInitialize) + { + PostgreSqlInitializer.CreatePostgreSqlJournalTables(JournalSettings.ConnectionString, JournalSettings.SchemaName, JournalSettings.TableName); + } + + if (SnapshotStoreSettings.AutoInitialize) + { + PostgreSqlInitializer.CreatePostgreSqlSnapshotStoreTables(SnapshotStoreSettings.ConnectionString, SnapshotStoreSettings.SchemaName, SnapshotStoreSettings.TableName); + } + } + } + + /// + /// Singleton class used to setup PostgreSQL backend for akka persistence plugin. + /// + public class PostgreSqlPersistence : ExtensionIdProvider + { + public static readonly PostgreSqlPersistence Instance = new PostgreSqlPersistence(); + + /// + /// Initializes a PostgreSQL persistence plugin inside provided . + /// + public static void Init(ActorSystem actorSystem) + { + Instance.Apply(actorSystem); + } + + private PostgreSqlPersistence() { } + + /// + /// Creates an actor system extension for akka persistence PostgreSQL support. + /// + /// + /// + public override PostgreSqlPersistenceExtension CreateExtension(ExtendedActorSystem system) + { + return new PostgreSqlPersistenceExtension(system); + } + + /// + /// Returns a default configuration for akka persistence PostgreSQL-based journals and snapshot stores. + /// + /// + public static Config DefaultConfiguration() + { + return ConfigurationFactory.FromResource("Akka.Persistence.PostgreSql.postgresql.conf"); + } + } +} \ No newline at end of file diff --git a/src/contrib/persistence/Akka.Persistence.PostgreSql/InternalExtensions.cs b/src/contrib/persistence/Akka.Persistence.PostgreSql/InternalExtensions.cs new file mode 100644 index 00000000000..cec1132ebb5 --- /dev/null +++ b/src/contrib/persistence/Akka.Persistence.PostgreSql/InternalExtensions.cs @@ -0,0 +1,15 @@ +using System; +using System.Data.SqlClient; +using Npgsql; + +namespace Akka.Persistence.PostgreSql +{ + internal static class InternalExtensions + { + public static string QuoteSchemaAndTable(this string sqlQuery, string schemaName, string tableName) + { + var cb = new NpgsqlCommandBuilder(); + return string.Format(sqlQuery, cb.QuoteIdentifier(schemaName), cb.QuoteIdentifier(tableName)); + } + } +} \ No newline at end of file diff --git a/src/contrib/persistence/Akka.Persistence.PostgreSql/Journal/PostgreSqlJournal.cs b/src/contrib/persistence/Akka.Persistence.PostgreSql/Journal/PostgreSqlJournal.cs new file mode 100644 index 00000000000..d942eafa2d6 --- /dev/null +++ b/src/contrib/persistence/Akka.Persistence.PostgreSql/Journal/PostgreSqlJournal.cs @@ -0,0 +1,96 @@ +using System; +using System.Collections.Generic; +using System.Data.Common; +using System.Data.SqlClient; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Akka.Persistence.Journal; +using Npgsql; +using Akka.Persistence.Sql.Common.Journal; +using Akka.Persistence.Sql.Common; + +namespace Akka.Persistence.PostgreSql.Journal +{ + public class PostgreSqlJournalEngine : JournalDbEngine + { + public PostgreSqlJournalEngine(JournalSettings journalSettings, Akka.Serialization.Serialization serialization) + : base(journalSettings, serialization) + { + QueryBuilder = new PostgreSqlJournalQueryBuilder(journalSettings.TableName, journalSettings.SchemaName); + QueryMapper = new PostgreSqlJournalQueryMapper(serialization); + } + + protected override DbConnection CreateDbConnection() + { + return new NpgsqlConnection(Settings.ConnectionString); + } + + protected override void CopyParamsToCommand(DbCommand sqlCommand, JournalEntry entry) + { + sqlCommand.Parameters[":persistence_id"].Value = entry.PersistenceId; + sqlCommand.Parameters[":sequence_nr"].Value = entry.SequenceNr; + sqlCommand.Parameters[":is_deleted"].Value = entry.IsDeleted; + sqlCommand.Parameters[":payload_type"].Value = entry.PayloadType; + sqlCommand.Parameters[":payload"].Value = entry.Payload; + } + } + + /// + /// Persistent journal actor using PostgreSQL as persistence layer. It processes write requests + /// one by one in synchronous manner, while reading results asynchronously. + /// + public class PostgreSqlJournal : SyncWriteJournal + { + private readonly PostgreSqlPersistenceExtension _extension; + private PostgreSqlJournalEngine _engine; + + public PostgreSqlJournal() + { + _extension = PostgreSqlPersistence.Instance.Apply(Context.System); + } + + /// + /// Gets an engine instance responsible for handling all database-related journal requests. + /// + protected virtual JournalDbEngine Engine + { + get + { + return _engine ?? (_engine = new PostgreSqlJournalEngine(_extension.JournalSettings, Context.System.Serialization)); + } + } + + protected override void PreStart() + { + base.PreStart(); + Engine.Open(); + } + + protected override void PostStop() + { + base.PostStop(); + Engine.Close(); + } + + public override Task ReplayMessagesAsync(string persistenceId, long fromSequenceNr, long toSequenceNr, long max, Action replayCallback) + { + return Engine.ReplayMessagesAsync(persistenceId, fromSequenceNr, toSequenceNr, max, Context.Sender, replayCallback); + } + + public override Task ReadHighestSequenceNrAsync(string persistenceId, long fromSequenceNr) + { + return Engine.ReadHighestSequenceNrAsync(persistenceId, fromSequenceNr); + } + + public override void WriteMessages(IEnumerable messages) + { + Engine.WriteMessages(messages); + } + + public override void DeleteMessagesTo(string persistenceId, long toSequenceNr, bool isPermanent) + { + Engine.DeleteMessagesTo(persistenceId, toSequenceNr, isPermanent); + } + } +} \ No newline at end of file diff --git a/src/contrib/persistence/Akka.Persistence.PostgreSql/Journal/QueryBuilder.cs b/src/contrib/persistence/Akka.Persistence.PostgreSql/Journal/QueryBuilder.cs new file mode 100644 index 00000000000..0356f757f8e --- /dev/null +++ b/src/contrib/persistence/Akka.Persistence.PostgreSql/Journal/QueryBuilder.cs @@ -0,0 +1,140 @@ +using System.Data; +using System.Data.SqlClient; +using System.Text; +using Npgsql; +using NpgsqlTypes; +using Akka.Persistence.Sql.Common.Journal; +using System.Data.Common; + +namespace Akka.Persistence.PostgreSql.Journal +{ + internal class PostgreSqlJournalQueryBuilder : IJournalQueryBuilder + { + private readonly string _schemaName; + private readonly string _tableName; + + private readonly string _selectHighestSequenceNrSql; + private readonly string _insertMessagesSql; + + public PostgreSqlJournalQueryBuilder(string tableName, string schemaName) + { + _tableName = tableName; + _schemaName = schemaName; + + _insertMessagesSql = "INSERT INTO {0}.{1} (persistence_id, sequence_nr, is_deleted, payload_type, payload) VALUES (:persistence_id, :sequence_nr, :is_deleted, :payload_type, :payload)" + .QuoteSchemaAndTable(_schemaName, _tableName); + _selectHighestSequenceNrSql = @"SELECT MAX(sequence_nr) FROM {0}.{1} WHERE persistence_id = :persistence_id".QuoteSchemaAndTable(_schemaName, _tableName); + } + + public DbCommand SelectMessages(string persistenceId, long fromSequenceNr, long toSequenceNr, long max) + { + var sql = BuildSelectMessagesSql(fromSequenceNr, toSequenceNr, max); + var command = new NpgsqlCommand(sql) + { + Parameters = { PersistenceIdToSqlParam(persistenceId) } + }; + + return command; + } + + public DbCommand SelectHighestSequenceNr(string persistenceId) + { + var command = new NpgsqlCommand(_selectHighestSequenceNrSql) + { + Parameters = { PersistenceIdToSqlParam(persistenceId) } + }; + + return command; + } + + public DbCommand InsertBatchMessages(IPersistentRepresentation[] messages) + { + var command = new NpgsqlCommand(_insertMessagesSql); + command.Parameters.Add(":persistence_id", NpgsqlDbType.Varchar); + command.Parameters.Add(":sequence_nr", NpgsqlDbType.Bigint); + command.Parameters.Add(":is_deleted", NpgsqlDbType.Boolean); + command.Parameters.Add(":payload_type", NpgsqlDbType.Varchar); + command.Parameters.Add(":payload", NpgsqlDbType.Bytea); + + return command; + } + + public DbCommand DeleteBatchMessages(string persistenceId, long toSequenceNr, bool permanent) + { + var sql = BuildDeleteSql(toSequenceNr, permanent); + var command = new NpgsqlCommand(sql) + { + Parameters = { PersistenceIdToSqlParam(persistenceId) } + }; + + return command; + } + + private string BuildDeleteSql(long toSequenceNr, bool permanent) + { + var sqlBuilder = new StringBuilder(); + + if (permanent) + { + sqlBuilder.Append("DELETE FROM {0}.{1} ".QuoteSchemaAndTable(_schemaName, _tableName)); + } + else + { + sqlBuilder.Append("UPDATE {0}.{1} SET is_deleted = true ".QuoteSchemaAndTable(_schemaName, _tableName)); + } + + sqlBuilder.Append("WHERE persistence_id = :persistence_id"); + + if (toSequenceNr != long.MaxValue) + { + sqlBuilder.Append(" AND sequence_nr <= ").Append(toSequenceNr); + } + + var sql = sqlBuilder.ToString(); + return sql; + } + + private string BuildSelectMessagesSql(long fromSequenceNr, long toSequenceNr, long max) + { + var sqlBuilder = new StringBuilder(); + sqlBuilder.AppendFormat( + @"SELECT + persistence_id, + sequence_nr, + is_deleted, + payload_type, + payload ") + .Append(" FROM {0}.{1} WHERE persistence_id = :persistence_id".QuoteSchemaAndTable(_schemaName, _tableName)); + + // since we guarantee type of fromSequenceNr, toSequenceNr and max + // we can inline them without risk of SQL injection + + if (fromSequenceNr > 0) + { + if (toSequenceNr != long.MaxValue) + sqlBuilder.Append(" AND sequence_nr BETWEEN ") + .Append(fromSequenceNr) + .Append(" AND ") + .Append(toSequenceNr); + else + sqlBuilder.Append(" AND sequence_nr >= ").Append(fromSequenceNr); + } + + if (toSequenceNr != long.MaxValue) + sqlBuilder.Append(" AND sequence_nr <= ").Append(toSequenceNr); + + if (max != long.MaxValue) + { + sqlBuilder.AppendFormat(" LIMIT {0}", max); + } + + var sql = sqlBuilder.ToString(); + return sql; + } + + private static NpgsqlParameter PersistenceIdToSqlParam(string persistenceId, string paramName = null) + { + return new NpgsqlParameter(paramName ?? ":persistence_id", NpgsqlDbType.Varchar, persistenceId.Length) { Value = persistenceId }; + } + } +} \ No newline at end of file diff --git a/src/contrib/persistence/Akka.Persistence.PostgreSql/Journal/QueryMapper.cs b/src/contrib/persistence/Akka.Persistence.PostgreSql/Journal/QueryMapper.cs new file mode 100644 index 00000000000..44b65b9790e --- /dev/null +++ b/src/contrib/persistence/Akka.Persistence.PostgreSql/Journal/QueryMapper.cs @@ -0,0 +1,39 @@ +using System; +using System.Data.Common; +using System.Data.SqlClient; +using Npgsql; +using Akka.Persistence.Sql.Common.Journal; +using Akka.Actor; + +namespace Akka.Persistence.PostgreSql.Journal +{ + internal class PostgreSqlJournalQueryMapper : IJournalQueryMapper + { + private readonly Akka.Serialization.Serialization _serialization; + + public PostgreSqlJournalQueryMapper(Akka.Serialization.Serialization serialization) + { + _serialization = serialization; + } + + public IPersistentRepresentation Map(DbDataReader reader, IActorRef sender = null) + { + var persistenceId = reader.GetString(0); + var sequenceNr = reader.GetInt64(1); + var isDeleted = reader.GetBoolean(2); + var payload = GetPayload(reader); + + return new Persistent(payload, sequenceNr, persistenceId, isDeleted, sender); + } + + private object GetPayload(DbDataReader reader) + { + var payloadType = reader.GetString(3); + var type = Type.GetType(payloadType, true); + var binary = (byte[]) reader[4]; + + var serializer = _serialization.FindSerializerForType(type); + return serializer.FromBinary(binary, type); + } + } +} \ No newline at end of file diff --git a/src/contrib/persistence/Akka.Persistence.PostgreSql/PostgreSqlInitializer.cs b/src/contrib/persistence/Akka.Persistence.PostgreSql/PostgreSqlInitializer.cs new file mode 100644 index 00000000000..6fd32f5a428 --- /dev/null +++ b/src/contrib/persistence/Akka.Persistence.PostgreSql/PostgreSqlInitializer.cs @@ -0,0 +1,99 @@ +using System; +using System.Data.SqlClient; +using Npgsql; + +namespace Akka.Persistence.PostgreSql +{ + internal static class PostgreSqlInitializer + { + private const string SqlJournalFormat = @" + DO + $do$ + BEGIN + IF NOT EXISTS (SELECT 1 FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = '{2}' AND TABLE_NAME = '{3}') THEN + CREATE TABLE {0}.{1} ( + persistence_id VARCHAR(200) NOT NULL, + sequence_nr BIGINT NOT NULL, + is_deleted BOOLEAN NOT NULL, + payload_type VARCHAR(500) NOT NULL, + payload BYTEA NOT NULL, + CONSTRAINT {3}_pk PRIMARY KEY (persistence_id, sequence_nr) + ); + CREATE INDEX {3}_sequence_nr_idx ON {0}.{1}(sequence_nr); + END IF; + END + $do$ + "; + + private const string SqlSnapshotStoreFormat = @" + DO + $do$ + BEGIN + IF NOT EXISTS (SELECT 1 FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = '{2}' AND TABLE_NAME = '{3}') THEN + CREATE TABLE {0}.{1} ( + persistence_id VARCHAR(200) NOT NULL, + sequence_nr BIGINT NOT NULL, + created_at TIMESTAMP WITH TIME ZONE NOT NULL, + created_at_ticks SMALLINT NOT NULL CHECK(created_at_ticks >= 0 AND created_at_ticks < 10), + snapshot_type VARCHAR(500) NOT NULL, + snapshot BYTEA NOT NULL, + CONSTRAINT {3}_pk PRIMARY KEY (persistence_id, sequence_nr) + ); + CREATE INDEX {3}_sequence_nr_idx ON {0}.{1}(sequence_nr); + CREATE INDEX {3}_created_at_idx ON {0}.{1}(created_at); + END IF; + END + $do$ + "; + + /// + /// Initializes a PostgreSQL journal-related tables according to 'schema-name', 'table-name' + /// and 'connection-string' values provided in 'akka.persistence.journal.postgresql' config. + /// + internal static void CreatePostgreSqlJournalTables(string connectionString, string schemaName, string tableName) + { + var sql = InitJournalSql(tableName, schemaName); + ExecuteSql(connectionString, sql); + } + + /// + /// Initializes a PostgreSQL snapshot store related tables according to 'schema-name', 'table-name' + /// and 'connection-string' values provided in 'akka.persistence.snapshot-store.postgresql' config. + /// + internal static void CreatePostgreSqlSnapshotStoreTables(string connectionString, string schemaName, string tableName) + { + var sql = InitSnapshotStoreSql(tableName, schemaName); + ExecuteSql(connectionString, sql); + } + + private static string InitJournalSql(string tableName, string schemaName = null) + { + if (string.IsNullOrEmpty(tableName)) throw new ArgumentNullException("tableName", "Akka.Persistence.PostgreSql journal table name is required"); + schemaName = schemaName ?? "public"; + + var cb = new NpgsqlCommandBuilder(); + return string.Format(SqlJournalFormat, cb.QuoteIdentifier(schemaName), cb.QuoteIdentifier(tableName), cb.UnquoteIdentifier(schemaName), cb.UnquoteIdentifier(tableName)); + } + + private static string InitSnapshotStoreSql(string tableName, string schemaName = null) + { + if (string.IsNullOrEmpty(tableName)) throw new ArgumentNullException("tableName", "Akka.Persistence.PostgreSql snapshot store table name is required"); + schemaName = schemaName ?? "public"; + + var cb = new NpgsqlCommandBuilder(); + return string.Format(SqlSnapshotStoreFormat, cb.QuoteIdentifier(schemaName), cb.QuoteIdentifier(tableName), cb.UnquoteIdentifier(schemaName), cb.UnquoteIdentifier(tableName)); + } + + private static void ExecuteSql(string connectionString, string sql) + { + using (var conn = new NpgsqlConnection(connectionString)) + using (var command = conn.CreateCommand()) + { + conn.Open(); + + command.CommandText = sql; + command.ExecuteNonQuery(); + } + } + } +} \ No newline at end of file diff --git a/src/contrib/persistence/Akka.Persistence.PostgreSql/Properties/AssemblyInfo.cs b/src/contrib/persistence/Akka.Persistence.PostgreSql/Properties/AssemblyInfo.cs new file mode 100644 index 00000000000..72164213d82 --- /dev/null +++ b/src/contrib/persistence/Akka.Persistence.PostgreSql/Properties/AssemblyInfo.cs @@ -0,0 +1,36 @@ +using System.Reflection; +using System.Runtime.CompilerServices; +using System.Runtime.InteropServices; + +// General Information about an assembly is controlled through the following +// set of attributes. Change these attribute values to modify the information +// associated with an assembly. +[assembly: AssemblyTitle("Akka.Persistence.PostgreSql")] +[assembly: AssemblyDescription("")] +[assembly: AssemblyConfiguration("")] +[assembly: AssemblyCompany("")] +[assembly: AssemblyProduct("Akka.Persistence.PostgreSql")] +[assembly: AssemblyCopyright("Copyright © 2015")] +[assembly: AssemblyTrademark("")] +[assembly: AssemblyCulture("")] + +// Setting ComVisible to false makes the types in this assembly not visible +// to COM components. If you need to access a type in this assembly from +// COM, set the ComVisible attribute to true on that type. +[assembly: ComVisible(false)] + +// The following GUID is for the ID of the typelib if this project is exposed to COM +[assembly: Guid("3b21dbd6-ebb9-44cb-8dee-edbfb5bf0a00")] + +// Version information for an assembly consists of the following four values: +// +// Major Version +// Minor Version +// Build Number +// Revision +// +// You can specify all the values or you can default the Build and Revision Numbers +// by using the '*' as shown below: +// [assembly: AssemblyVersion("1.0.*")] +[assembly: AssemblyVersion("1.0.0.0")] +[assembly: AssemblyFileVersion("1.0.0.0")] diff --git a/src/contrib/persistence/Akka.Persistence.PostgreSql/README.md b/src/contrib/persistence/Akka.Persistence.PostgreSql/README.md new file mode 100644 index 00000000000..1cd6b2af191 --- /dev/null +++ b/src/contrib/persistence/Akka.Persistence.PostgreSql/README.md @@ -0,0 +1,88 @@ +## Akka.Persistence.PostgreSql + +Akka Persistence journal and snapshot store backed by PostgreSql database. + +**WARNING: Akka.Persistence.PostgreSql plugin is still in beta and it's mechanics described below may be still subject to change**. + +### Setup + +To activate the journal plugin, add the following lines to actor system configuration file: + +``` +akka.persistence.journal.plugin = "akka.persistence.journal.postgresql" +akka.persistence.journal.postgresql.connection-string = "" +``` + +Similar configuration may be used to setup a PostgreSql snapshot store: + +``` +akka.persistence.snasphot-store.plugin = "akka.persistence.snasphot-store.postgresql" +akka.persistence.snasphot-store.postgresql.connection-string = "" +``` + +Remember that connection string must be provided separately to Journal and Snapshot Store. To finish setup simply initialize plugin using: `PostgreSqlPersistence.Init(actorSystem);` + +### Configuration + +Both journal and snapshot store share the same configuration keys (however they resides in separate scopes, so they are definied distinctly for either journal or snapshot store): + +- `class` (string with fully qualified type name) - determines class to be used as a persistent journal. Default: *Akka.Persistence.PostgreSql.Journal.PostgreSqlJournal, Akka.Persistence.PostgreSql* (for journal) and *Akka.Persistence.PostgreSql.Snapshot.PostgreSqlSnapshotStore, Akka.Persistence.PostgreSql* (for snapshot store). +- `plugin-dispatcher` (string with configuration path) - describes a message dispatcher for persistent journal. Default: *akka.actor.default-dispatcher* +- `connection-string` - connection string used to access PostgreSql database. Default: *none*. +- `connection-timeout` - timespan determining default connection timeouts on database-related operations. Default: *30s* +- `schema-name` - name of the database schema, where journal or snapshot store tables should be placed. Default: *public* +- `table-name` - name of the table used by either journal or snapshot store. Default: *event_journal* (for journal) or *snapshot_store* (for snapshot store) +- `auto-initialize` - flag determining if journal or snapshot store related tables should by automatically created when they have not been found in connected database. Default: *false* + +### Custom SQL data queries + +PostgreSql persistence plugin defines a default table schema used for both journal and snapshot store. + +**EventJournal table**: + + +----------------+-------------+------------+---------------+---------+ + | persistence_id | sequence_nr | is_deleted | payload_type | payload | + +----------------+-------------+------------+---------------+---------+ + | varchar(200) | bigint | boolean | varchar(500) | bytea | + +----------------+-------------+------------+---------------+---------+ + +**SnapshotStore table**: + + +----------------+--------------+--------------------------+------------------+---------------+----------+ + | persistence_id | sequence_nr | created_at | created_at_ticks | snapshot_type | snapshot | + +----------------+--------------+--------------------------+------------------+--------------------------+ + | varchar(200) | bigint | timestamp with time zone | smallint | varchar(500) | bytea | + +----------------+--------------+--------------------------+------------------+--------------------------+ + +**created_at and created_at_ticks - The max precision of a PostgreSQL timestamp is 6. The max precision of a .Net DateTime object is 7. Because of this differences, the additional ticks are saved in a separate column and combined during deserialization. There is also a check constraint restricting created_at_ticks to the range [0,10) to ensure that there are no precision differences in the opposite direction.** + +Underneath Akka.Persistence.PostgreSql uses the Npgsql library to communicate with the database. You may choose not to use a dedicated built in ones, but to create your own being better fit for your use case. To do so, you have to create your own versions of `IJournalQueryBuilder` and `IJournalQueryMapper` (for custom journals) or `ISnapshotQueryBuilder` and `ISnapshotQueryMapper` (for custom snapshot store) and then attach inside journal, just like in the example below: + +```csharp +class MyCustomPostgreSqlJournal: Akka.Persistence.PostgreSql.Journal.PostgreSqlJournal +{ + public MyCustomPostgreSqlJournal() : base() + { + QueryBuilder = new MyCustomJournalQueryBuilder(); + QueryMapper = new MyCustomJournalQueryMapper(); + } +} +``` + +The final step is to setup your custom journal using akka config: + +``` +akka.persistence.journal.postgresql.class = "MyModule.MyCustomPostgreSqlJournal, MyModule" +``` + +### Tests + +The PostgreSql tests are packaged as a separate build task with a target of "RunPostgreSqlTests". + +In order to run the tests, you must do the following things: + +1. Download and install PostgreSql from: http://www.postgresql.org/download/ +2. Install PostgreSql with the default settings. The default connection string uses the following credentials: + 1. Username: postgres + 2. Password: postgres +3. A custom app.config file can be used and needs to be placed in the same folder as the dll \ No newline at end of file diff --git a/src/contrib/persistence/Akka.Persistence.PostgreSql/Snapshot/PostgreSqlSnapshotStore.cs b/src/contrib/persistence/Akka.Persistence.PostgreSql/Snapshot/PostgreSqlSnapshotStore.cs new file mode 100644 index 00000000000..acd38690a94 --- /dev/null +++ b/src/contrib/persistence/Akka.Persistence.PostgreSql/Snapshot/PostgreSqlSnapshotStore.cs @@ -0,0 +1,44 @@ +using System.Collections.Generic; +using System.Data.SqlClient; +using System.Threading; +using System.Threading.Tasks; +using Akka.Persistence.Snapshot; +using Npgsql; +using Akka.Persistence.Sql.Common.Snapshot; +using Akka.Persistence.Sql.Common; +using System; +using System.Data.Common; + +namespace Akka.Persistence.PostgreSql.Snapshot +{ + /// + /// Actor used for storing incoming snapshots into persistent snapshot store backed by PostgreSQL database. + /// + public class PostgreSqlSnapshotStore : DbSnapshotStore + { + private readonly PostgreSqlPersistenceExtension _extension; + private readonly PostgreSqlSnapshotStoreSettings _settings; + + public PostgreSqlSnapshotStore() + { + _extension = PostgreSqlPersistence.Instance.Apply(Context.System); + + _settings = _extension.SnapshotStoreSettings; + QueryBuilder = new PostgreSqlSnapshotQueryBuilder(_settings.SchemaName, _settings.TableName); + QueryMapper = new PostgreSqlSnapshotQueryMapper(Context.System.Serialization); + } + + protected override SnapshotStoreSettings Settings + { + get + { + return _settings; + } + } + + protected override DbConnection CreateDbConnection() + { + return new NpgsqlConnection(Settings.ConnectionString); + } + } +} \ No newline at end of file diff --git a/src/contrib/persistence/Akka.Persistence.PostgreSql/Snapshot/QueryBuilder.cs b/src/contrib/persistence/Akka.Persistence.PostgreSql/Snapshot/QueryBuilder.cs new file mode 100644 index 00000000000..6bcb126f00b --- /dev/null +++ b/src/contrib/persistence/Akka.Persistence.PostgreSql/Snapshot/QueryBuilder.cs @@ -0,0 +1,166 @@ +using System; +using System.Data; +using System.Data.SqlClient; +using System.Text; +using Npgsql; +using NpgsqlTypes; +using Akka.Persistence.Sql.Common.Snapshot; +using System.Data.Common; + +namespace Akka.Persistence.PostgreSql.Snapshot +{ + internal class PostgreSqlSnapshotQueryBuilder : ISnapshotQueryBuilder + { + private readonly string _deleteSql; + private readonly string _insertSql; + private readonly string _selectSql; + + public PostgreSqlSnapshotQueryBuilder(string schemaName, string tableName) + { + _deleteSql = @"DELETE FROM {0}.{1} WHERE persistence_id = :persistence_id ".QuoteSchemaAndTable(schemaName, tableName); + _insertSql = @"INSERT INTO {0}.{1} (persistence_id, sequence_nr, created_at, created_at_ticks, snapshot_type, snapshot) VALUES (:persistence_id, :sequence_nr, :created_at, :created_at_ticks, :snapshot_type, :snapshot)".QuoteSchemaAndTable(schemaName, tableName); + _selectSql = @"SELECT persistence_id, sequence_nr, created_at, created_at_ticks, snapshot_type, snapshot FROM {0}.{1} WHERE persistence_id = :persistence_id".QuoteSchemaAndTable(schemaName, tableName); + } + + public DbCommand DeleteOne(string persistenceId, long sequenceNr, DateTime timestamp) + { + var sqlCommand = new NpgsqlCommand(); + sqlCommand.Parameters.Add(new NpgsqlParameter(":persistence_id", NpgsqlDbType.Varchar, persistenceId.Length) + { + Value = persistenceId + }); + var sb = new StringBuilder(_deleteSql); + + if (sequenceNr < long.MaxValue && sequenceNr > 0) + { + sb.Append(@"AND sequence_nr = :sequence_nr "); + sqlCommand.Parameters.Add(new NpgsqlParameter(":sequence_nr", NpgsqlDbType.Bigint) {Value = sequenceNr}); + } + + if (timestamp > DateTime.MinValue && timestamp < DateTime.MaxValue) + { + sb.Append(@"AND created_at = :created_at AND created_at_ticks = :created_at_ticks"); + sqlCommand.Parameters.Add(new NpgsqlParameter(":created_at", NpgsqlDbType.Timestamp) + { + Value = GetMaxPrecisionTicks(timestamp) + }); + sqlCommand.Parameters.Add(new NpgsqlParameter(":created_at_ticks", NpgsqlDbType.Smallint) + { + Value = GetExtraTicks(timestamp) + }); + } + + sqlCommand.CommandText = sb.ToString(); + + return sqlCommand; + } + + public DbCommand DeleteMany(string persistenceId, long maxSequenceNr, DateTime maxTimestamp) + { + var sqlCommand = new NpgsqlCommand(); + sqlCommand.Parameters.Add(new NpgsqlParameter(":persistence_id", NpgsqlDbType.Varchar, persistenceId.Length) + { + Value = persistenceId + }); + var sb = new StringBuilder(_deleteSql); + + if (maxSequenceNr < long.MaxValue && maxSequenceNr > 0) + { + sb.Append(@" AND sequence_nr <= :sequence_nr "); + sqlCommand.Parameters.Add(new NpgsqlParameter(":sequence_nr", NpgsqlDbType.Bigint) + { + Value = maxSequenceNr + }); + } + + if (maxTimestamp > DateTime.MinValue && maxTimestamp < DateTime.MaxValue) + { + sb.Append( + @" AND (created_at < :created_at OR (created_at = :created_at AND created_at_ticks <= :created_at_ticks)) "); + sqlCommand.Parameters.Add(new NpgsqlParameter(":created_at", NpgsqlDbType.Timestamp) + { + Value = GetMaxPrecisionTicks(maxTimestamp) + }); + sqlCommand.Parameters.Add(new NpgsqlParameter(":created_at_ticks", NpgsqlDbType.Smallint) + { + Value = GetExtraTicks(maxTimestamp) + }); + } + + sqlCommand.CommandText = sb.ToString(); + + return sqlCommand; + } + + public DbCommand InsertSnapshot(SnapshotEntry entry) + { + var sqlCommand = new NpgsqlCommand(_insertSql) + { + Parameters = + { + new NpgsqlParameter(":persistence_id", NpgsqlDbType.Varchar, entry.PersistenceId.Length) { Value = entry.PersistenceId }, + new NpgsqlParameter(":sequence_nr", NpgsqlDbType.Bigint) { Value = entry.SequenceNr }, + new NpgsqlParameter(":created_at", NpgsqlDbType.Timestamp) { Value = GetMaxPrecisionTicks(entry.Timestamp) }, + new NpgsqlParameter(":created_at_ticks", NpgsqlDbType.Smallint) { Value = GetExtraTicks(entry.Timestamp) }, + new NpgsqlParameter(":snapshot_type", NpgsqlDbType.Varchar, entry.SnapshotType.Length) { Value = entry.SnapshotType }, + new NpgsqlParameter(":snapshot", NpgsqlDbType.Bytea, entry.Snapshot.Length) { Value = entry.Snapshot } + } + }; + + return sqlCommand; + } + + public DbCommand SelectSnapshot(string persistenceId, long maxSequenceNr, DateTime maxTimestamp) + { + var sqlCommand = new NpgsqlCommand(); + sqlCommand.Parameters.Add(new NpgsqlParameter(":persistence_id", NpgsqlDbType.Varchar, persistenceId.Length) + { + Value = persistenceId + }); + + var sb = new StringBuilder(_selectSql); + if (maxSequenceNr > 0 && maxSequenceNr < long.MaxValue) + { + sb.Append(" AND sequence_nr <= :sequence_nr "); + sqlCommand.Parameters.Add(new NpgsqlParameter(":sequence_nr", NpgsqlDbType.Bigint) + { + Value = maxSequenceNr + }); + } + + if (maxTimestamp > DateTime.MinValue && maxTimestamp < DateTime.MaxValue) + { + sb.Append( + @" AND (created_at < :created_at OR (created_at = :created_at AND created_at_ticks <= :created_at_ticks)) "); + sqlCommand.Parameters.Add(new NpgsqlParameter(":created_at", NpgsqlDbType.Timestamp) + { + Value = GetMaxPrecisionTicks(maxTimestamp) + }); + sqlCommand.Parameters.Add(new NpgsqlParameter(":created_at_ticks", NpgsqlDbType.Smallint) + { + Value = GetExtraTicks(maxTimestamp) + }); + } + + sb.Append(" ORDER BY sequence_nr DESC"); + sqlCommand.CommandText = sb.ToString(); + return sqlCommand; + } + + private static DateTime GetMaxPrecisionTicks(DateTime date) + { + var ticks = (date.Ticks / 10) * 10; + + ticks = date.Ticks - ticks; + + return date.AddTicks(-1 * ticks); + } + + private static short GetExtraTicks(DateTime date) + { + var ticks = date.Ticks; + + return (short)(ticks % 10); + } + } +} \ No newline at end of file diff --git a/src/contrib/persistence/Akka.Persistence.PostgreSql/Snapshot/QueryMapper.cs b/src/contrib/persistence/Akka.Persistence.PostgreSql/Snapshot/QueryMapper.cs new file mode 100644 index 00000000000..e50c8fdc264 --- /dev/null +++ b/src/contrib/persistence/Akka.Persistence.PostgreSql/Snapshot/QueryMapper.cs @@ -0,0 +1,44 @@ +using System; +using System.Data.Common; +using System.Data.SqlClient; +using Npgsql; +using Akka.Persistence.Sql.Common.Snapshot; + +namespace Akka.Persistence.PostgreSql.Snapshot +{ + internal class PostgreSqlSnapshotQueryMapper : ISnapshotQueryMapper + { + private readonly Akka.Serialization.Serialization _serialization; + + public PostgreSqlSnapshotQueryMapper(Akka.Serialization.Serialization serialization) + { + _serialization = serialization; + } + + public SelectedSnapshot Map(DbDataReader reader) + { + var persistenceId = reader.GetString(0); + var sequenceNr = reader.GetInt64(1); + + var timestamp = reader.GetDateTime(2); + var timestampTicks = reader.GetInt16(3); + timestamp = timestamp.AddTicks(timestampTicks); + + var metadata = new SnapshotMetadata(persistenceId, sequenceNr, timestamp); + var snapshot = GetSnapshot(reader); + + return new SelectedSnapshot(metadata, snapshot); + } + + private object GetSnapshot(DbDataReader reader) + { + var type = Type.GetType(reader.GetString(4), true); + var serializer = _serialization.FindSerializerForType(type); + var binary = (byte[])reader[5]; + + var obj = serializer.FromBinary(binary, type); + + return obj; + } + } +} \ No newline at end of file diff --git a/src/contrib/persistence/Akka.Persistence.PostgreSql/packages.config b/src/contrib/persistence/Akka.Persistence.PostgreSql/packages.config new file mode 100644 index 00000000000..9c8e7e8768b --- /dev/null +++ b/src/contrib/persistence/Akka.Persistence.PostgreSql/packages.config @@ -0,0 +1,4 @@ + + + + \ No newline at end of file diff --git a/src/contrib/persistence/Akka.Persistence.PostgreSql/postgresql.conf b/src/contrib/persistence/Akka.Persistence.PostgreSql/postgresql.conf new file mode 100644 index 00000000000..aba8293065f --- /dev/null +++ b/src/contrib/persistence/Akka.Persistence.PostgreSql/postgresql.conf @@ -0,0 +1,54 @@ +akka.persistence{ + + journal { + postgresql { + + # qualified type name of the PostgreSql persistence journal actor + class = "Akka.Persistence.PostgreSql.Journal.PostgreSqlJournal, Akka.Persistence.PostgreSql" + + # dispatcher used to drive journal actor + plugin-dispatcher = "akka.actor.default-dispatcher" + + # connection string used for database access + connection-string = "" + + # default SQL commands timeout + connection-timeout = 30s + + # PostgreSql schema name to table corresponding with persistent journal + schema-name = public + + # PostgreSql table corresponding with persistent journal + table-name = event_journal + + # should corresponding journal table be initialized automatically + auto-initialize = off + } + } + + snapshot-store { + postgresql { + + # qualified type name of the PostgreSql persistence journal actor + class = "Akka.Persistence.PostgreSql.Snapshot.PostgreSqlSnapshotStore, Akka.Persistence.PostgreSql" + + # dispatcher used to drive journal actor + plugin-dispatcher = ""akka.actor.default-dispatcher"" + + # connection string used for database access + connection-string = "" + + # default SQL commands timeout + connection-timeout = 30s + + # PostgreSql schema name to table corresponding with persistent journal + schema-name = public + + # PostgreSql table corresponding with persistent journal + table-name = snapshot_store + + # should corresponding journal table be initialized automatically + auto-initialize = off + } + } +} \ No newline at end of file