Skip to content

Commit

Permalink
Merge branch 'dev'
Browse files Browse the repository at this point in the history
  • Loading branch information
Arkatufus committed Mar 24, 2022
2 parents 3bb2ed4 + 5fc8bc5 commit 9fc3ca0
Show file tree
Hide file tree
Showing 14 changed files with 356 additions and 24 deletions.
8 changes: 6 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ akka.persistence.journal.redis {
### Configuration
- `configuration-string` - connection string, as described here: https://stackexchange.github.io/StackExchange.Redis/Configuration#basic-configuration-strings
- `key-prefix` - Redis journals key prefixes. Leave it for default or change it to customized value. WARNING: don't change this value after you've started persisting data in production.
- `database` - Set the Redis default database to use. If you added `defaultDatabase` to the `connection-strings`, you have to set `database` to the value of `defaultDatabase`.
- `use-database-number-from-connection-string` - determines redis database precedence when a user adds defaultDatabase to the connection-strings. For Redis Cluster, the `defaultDatabase` is 0! See below:

NOTE: Redis Standalone supports deploying multiple instances, but Redis cluster does not. The default database with Redis Cluster is always 0 - If you are deploying Redis Cluster, you don't need to add the `defaultDatabase` to the `connection-string`'! [cluster-spec](https://redis.io/topics/cluster-spec#implemented-subset)

## Snapshot Store
To activate the snapshot plugin, add the following line to your HOCON config:
Expand All @@ -57,7 +61,7 @@ This will run the snapshot-store with its default settings. The default settings
```
akka.persistence.snapshot-store.redis {
# qualified type name of the Redis persistence journal actor
class = "Akka.Persistence.Redis.Journal.RedisJournal, Akka.Persistence.Redis"
class = "Akka.Persistence.Redis.Snapshot.RedisSnapshotStore, Akka.Persistence.Redis"
# connection string, as described here: https://stackexchange.github.io/StackExchange.Redis/Configuration#basic-configuration-strings
configuration-string = ""
Expand Down Expand Up @@ -170,4 +174,4 @@ akka.actor {
"Akka.Persistence.Redis.Snapshot.SnapshotEntry, Akka.Persistence.Redis" = redis
}
}
```
```
4 changes: 4 additions & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
#### 1.4.35 March 23 2022 ####
* Fix default database 0 overriding StackExchange.Redis' defaultDatabase in configuration-strings [#194](https://github.com/akkadotnet/Akka.Persistence.Redis/pull/194)
* Upgraded to [Akka.NET 1.4.35](https://github.com/akkadotnet/akka.net/releases/tag/1.4.35)

#### 1.4.31 December 21 2021 ####
* Upgraded to [Akka.NET 1.4.31](https://github.com/akkadotnet/akka.net/releases/tag/1.4.31)
* [Upgraded StackExchange.Redis to 2.2.88](https://github.com/akkadotnet/Akka.Persistence.Redis/pull/179)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// -----------------------------------------------------------------------
// <copyright file="RedisJournalSpec.cs" company="Akka.NET Project">
// Copyright (C) 2013-2021 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
// -----------------------------------------------------------------------

using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using Akka.Configuration;
using Akka.Persistence.TCK.Journal;
using FluentAssertions;
using StackExchange.Redis;
using Xunit;
using Xunit.Abstractions;

namespace Akka.Persistence.Redis.Cluster.Tests
{
[Collection("RedisClusterSpec")]
public class RedisJournalDefaultDatabaseSpec : JournalSpec
{
private readonly RedisClusterFixture _fixture;

public static Config Config(RedisClusterFixture fixture)
{
DbUtils.Initialize(fixture);

return ConfigurationFactory.ParseString($@"
akka.loglevel = INFO
akka.persistence.journal.plugin = ""akka.persistence.journal.redis""
akka.persistence.journal.redis {{
class = ""Akka.Persistence.Redis.Journal.RedisJournal, Akka.Persistence.Redis""
plugin-dispatcher = ""akka.actor.default-dispatcher""
configuration-string = ""{DbUtils.ConnectionString},defaultDatabase=0""
use-database-number-from-connection-string = true
}}
akka.test.single-expect-default = 3s")
.WithFallback(RedisPersistence.DefaultConfig());
}

public RedisJournalDefaultDatabaseSpec(ITestOutputHelper output, RedisClusterFixture fixture)
: base(Config(fixture), nameof(RedisJournalSpec), output)
{
_fixture = fixture;

RedisPersistence.Get(Sys);
Initialize();
}

protected override bool SupportsRejectingNonSerializableObjects { get; } = false;

protected override void Dispose(bool disposing)
{
base.Dispose(disposing);
DbUtils.Clean();
}

[Fact]
public void Randomly_distributed_RedisKey_with_HashTag_should_be_distributed_relatively_equally_between_cluster_master()
{
var totalEntries = 10000;

var redis = ConnectionMultiplexer.Connect(_fixture.ConnectionString);
var db = redis.GetDatabase();
var journalHelper = new JournalHelper(Sys, "foo");
var dict = new Dictionary<EndPoint, int>();

for (var i = 0; i < totalEntries; ++i)
{
var id = $"{Guid.NewGuid():N}-{i}";
var ep = db.IdentifyEndpoint(journalHelper.GetJournalKey(id, true));
if (!dict.TryGetValue(ep, out _))
{
dict[ep] = 1;
}
else
{
dict[ep]++;
}
}

var values = dict.Values.AsEnumerable().ToArray();

// Evaluate standard deviation
var standardDeviation = StandardDeviation(values);
Output.WriteLine($"Server assignment distribution: [{string.Join(",", values)}]. Standard deviation: [{standardDeviation}]");

// Should be less than 1 percent of total keys
StandardDeviation(values).Should().BeLessThan(totalEntries * 0.01);
}

private double StandardDeviation(int[] values)
{
var mean = values.Average();
var sum = values.Sum(d => Math.Pow(d - mean, 2));
return Math.Sqrt((sum) / (values.Count() - 1));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// -----------------------------------------------------------------------
// <copyright file="RedisSnapshotStoreSpec.cs" company="Akka.NET Project">
// Copyright (C) 2013-2021 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
// -----------------------------------------------------------------------

using Akka.Configuration;
using Akka.Persistence.TCK.Snapshot;
using Xunit;
using Xunit.Abstractions;

namespace Akka.Persistence.Redis.Cluster.Tests
{
[Collection("RedisClusterSpec")]
public class RedisSnapshotStoreDefaultDatabaseSpec : SnapshotStoreSpec
{
public static Config Config(RedisClusterFixture fixture)
{
DbUtils.Initialize(fixture);

return ConfigurationFactory.ParseString($@"
akka.test.single-expect-default = 3s
akka.persistence {{
publish-plugin-commands = on
snapshot-store {{
plugin = ""akka.persistence.snapshot-store.redis""
redis {{
class = ""Akka.Persistence.Redis.Snapshot.RedisSnapshotStore, Akka.Persistence.Redis""
configuration-string = ""{fixture.ConnectionString},defaultDatabase=0""
use-database-number-from-connection-string = true
plugin-dispatcher = ""akka.actor.default-dispatcher""
}}
}}
}}
akka.actor {{
serializers {{
persistence-snapshot = ""Akka.Persistence.Redis.Serialization.PersistentSnapshotSerializer, Akka.Persistence.Redis""
}}
serialization-bindings {{
""Akka.Persistence.SelectedSnapshot, Akka.Persistence"" = persistence-snapshot
}}
serialization-identifiers {{
""Akka.Persistence.Redis.Serialization.PersistentSnapshotSerializer, Akka.Persistence.Redis"" = 48
}}
}}").WithFallback(RedisPersistence.DefaultConfig());
}

public RedisSnapshotStoreDefaultDatabaseSpec(ITestOutputHelper output, RedisClusterFixture fixture)
: base(Config(fixture), nameof(RedisSnapshotStoreSpec), output)
{
RedisPersistence.Get(Sys);
Initialize();
}

protected override void Dispose(bool disposing)
{
base.Dispose(disposing);
DbUtils.Clean();
}
}
}
39 changes: 25 additions & 14 deletions src/Akka.Persistence.Redis.Tests/RedisFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ public sealed class RedisSpecsFixture : ICollectionFixture<RedisFixture>
public class RedisFixture : IAsyncLifetime
{
protected readonly string RedisContainerName = $"redis-{Guid.NewGuid():N}";
protected readonly string RedisContainerName2 = $"redis-{Guid.NewGuid():N}";
protected DockerClient Client;

public RedisFixture()
Expand Down Expand Up @@ -64,15 +65,23 @@ await Client.Images.CreateImageAsync(
: $"{message.ID} {message.Status} {message.ProgressMessage}");
}));

var redisHostPort = ThreadLocalRandom.Current.Next(9000, 10000);
var redisHostPort1 = ThreadLocalRandom.Current.Next(9000, 10000);
var redisHostPort2 = ThreadLocalRandom.Current.Next(9000, 10000);

await CreateContainer(redisHostPort1, RedisContainerName);
await CreateContainer(redisHostPort2, RedisContainerName2);

ConnectionString = $"localhost:{redisHostPort1},localhost:{redisHostPort2}";
}
private async ValueTask CreateContainer(int redisHostPort, string redisContainerName)
{
// create the container
await Client.Containers.CreateContainerAsync(new CreateContainerParameters
{
Image = RedisImageName,
Name = RedisContainerName,
Name = redisContainerName,
Tty = true,
ExposedPorts = new Dictionary<string, EmptyStruct> {{"6379/tcp", new EmptyStruct()}},
ExposedPorts = new Dictionary<string, EmptyStruct> { { "6379/tcp", new EmptyStruct() } },
HostConfig = new HostConfig
{
PortBindings = new Dictionary<string, IList<PortBinding>>
Expand All @@ -86,29 +95,31 @@ await Client.Containers.CreateContainerAsync(new CreateContainerParameters


// start the container
await Client.Containers.StartContainerAsync(RedisContainerName, new ContainerStartParameters());
await Client.Containers.StartContainerAsync(redisContainerName, new ContainerStartParameters());

// Provide a 30 second startup delay
await Task.Delay(TimeSpan.FromSeconds(10));

ConnectionString = $"localhost:{redisHostPort}";
}

public async Task DisposeAsync()
{
if (Client != null)
{
// Delay to make sure that all tests has completed cleanup.
await Task.Delay(TimeSpan.FromSeconds(5));

// Kill the container, we can't simply stop the container because Redis can hung indefinetly
// if we simply stop the container.
await Client.Containers.KillContainerAsync(RedisContainerName, new ContainerKillParameters());

await Client.Containers.RemoveContainerAsync(RedisContainerName,
new ContainerRemoveParameters {Force = true});
await KillContainer(RedisContainerName);
await KillContainer(RedisContainerName2);

Client.Dispose();
}
}
private async ValueTask KillContainer(string container)
{
// Kill the container, we can't simply stop the container because Redis can hung indefinetly
// if we simply stop the container.
await Client.Containers.KillContainerAsync(container, new ContainerKillParameters());

await Client.Containers.RemoveContainerAsync(container,
new ContainerRemoveParameters { Force = true });
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// -----------------------------------------------------------------------
// <copyright file="RedisJournalSpec.cs" company="Akka.NET Project">
// Copyright (C) 2013-2021 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
// -----------------------------------------------------------------------

using Akka.Configuration;
using Akka.Persistence.TCK.Journal;
using Xunit;
using Xunit.Abstractions;

namespace Akka.Persistence.Redis.Tests
{
[Collection("RedisSpec")]
public class RedisJournalDefaultDatabaseSpec : JournalSpec
{
public const int Database = 1;

public static Config Config(RedisFixture fixture, int id)
{
DbUtils.Initialize(fixture);

return ConfigurationFactory.ParseString($@"
akka.loglevel = INFO
akka.persistence.journal.plugin = ""akka.persistence.journal.redis""
akka.persistence.journal.redis {{
class = ""Akka.Persistence.Redis.Journal.RedisJournal, Akka.Persistence.Redis""
plugin-dispatcher = ""akka.actor.default-dispatcher""
configuration-string = ""{fixture.ConnectionString},defaultDatabase=1""
use-database-number-from-connection-string = true
database = {id}
}}
akka.test.single-expect-default = 3s")
.WithFallback(RedisPersistence.DefaultConfig());
}

public RedisJournalDefaultDatabaseSpec(ITestOutputHelper output, RedisFixture fixture) : base(Config(fixture, Database),
nameof(RedisJournalSpec), output)
{
RedisPersistence.Get(Sys);
Initialize();
}

protected override bool SupportsRejectingNonSerializableObjects { get; } = false;

protected override void Dispose(bool disposing)
{
base.Dispose(disposing);
DbUtils.Clean(Database);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// -----------------------------------------------------------------------
// <copyright file="RedisSnapshotStoreSpec.cs" company="Akka.NET Project">
// Copyright (C) 2013-2021 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
// -----------------------------------------------------------------------

using Akka.Configuration;
using Akka.Persistence.TCK.Snapshot;
using Xunit;
using Xunit.Abstractions;

namespace Akka.Persistence.Redis.Tests
{
[Collection("RedisSpec")]
public class RedisSnapshotStoreDefaultDatabaseSpec : SnapshotStoreSpec
{
public const int Database = 1;

public static Config Config(RedisFixture fixture, int id)
{
DbUtils.Initialize(fixture);

return ConfigurationFactory.ParseString($@"
akka.test.single-expect-default = 3s
akka.persistence {{
publish-plugin-commands = on
snapshot-store {{
plugin = ""akka.persistence.snapshot-store.redis""
redis {{
class = ""Akka.Persistence.Redis.Snapshot.RedisSnapshotStore, Akka.Persistence.Redis""
configuration-string = ""{fixture.ConnectionString},defaultDatabase=1""
plugin-dispatcher = ""akka.actor.default-dispatcher""
use-database-number-from-connection-string = true
database = ""{id}""
}}
}}
}}
akka.actor {{
serializers {{
persistence-snapshot = ""Akka.Persistence.Redis.Serialization.PersistentSnapshotSerializer, Akka.Persistence.Redis""
}}
serialization-bindings {{
""Akka.Persistence.SelectedSnapshot, Akka.Persistence"" = persistence-snapshot
}}
serialization-identifiers {{
""Akka.Persistence.Redis.Serialization.PersistentSnapshotSerializer, Akka.Persistence.Redis"" = 48
}}
}}").WithFallback(RedisPersistence.DefaultConfig());
}

public RedisSnapshotStoreDefaultDatabaseSpec(ITestOutputHelper output, RedisFixture fixture)
: base(Config(fixture, Database), typeof(RedisSnapshotStoreSpec).Name, output)
{
RedisPersistence.Get(Sys);
Initialize();
}

protected override void Dispose(bool disposing)
{
base.Dispose(disposing);
DbUtils.Clean(Database);
}
}
}
Loading

0 comments on commit 9fc3ca0

Please sign in to comment.