Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a HOCON setting to control DData replicator restart on failure #5145

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ public ReplicatorResiliencySpec(ITestOutputHelper helper) : base(SpecConfig, hel

var settings = ReplicatorSettings.Create(Sys)
.WithGossipInterval(TimeSpan.FromSeconds(1.0))
.WithMaxDeltaElements(10);
.WithMaxDeltaElements(10)
.WithRestartReplicatorOnFailure(true);

var props = BackoffSupervisor.Props(
Backoff.OnStop(
Expand Down Expand Up @@ -116,7 +117,6 @@ public async Task Handle_Durable_Store_Exception()

public async Task DurableStoreActorCrash()
{

const string replicatorActorPath = "/user/replicatorSuper/replicator";
const string durableStoreActorPath = "/user/replicatorSuper/replicator/durableStore";

Expand All @@ -129,9 +129,20 @@ public async Task DurableStoreActorCrash()
Watch(replicator);
Watch(durableStore);
durableStore.Tell(new InitFail());

var terminated = ExpectMsg<Terminated>(TimeSpan.FromSeconds(10));
if (!terminated.ActorRef.Path.Equals(durableStore.Path) && !terminated.ActorRef.Path.Equals(replicator.Path))
{
throw new Exception(
$"Expecting termination of either durable storage or replicator, found {terminated.ActorRef.Path} instead.");
}

ExpectTerminated(durableStore,TimeSpan.FromSeconds(10));
ExpectTerminated(replicator,TimeSpan.FromSeconds(10));
terminated = ExpectMsg<Terminated>(TimeSpan.FromSeconds(10));
if (!terminated.ActorRef.Path.Equals(durableStore.Path) && !terminated.ActorRef.Path.Equals(replicator.Path))
{
throw new Exception(
$"Expecting termination of either durable storage or replicator, found {terminated.ActorRef.Path} instead.");
}

//The supervisor should have restarted the replicator actor by now
await AwaitAssertAsync(async () =>
Expand All @@ -146,12 +157,38 @@ await newReplicator.Ask<ActorIdentity>(new Identify(Guid.NewGuid().ToString())).
Assert.Equal(replicatorActorPath,r.Result.Subject.Path.ToStringWithoutAddress());
});
},TimeSpan.FromSeconds(10));

}
}

/// <summary>
/// Checks that the replicator obtained from <c>DistributedData.Get(_sys1)</c> is not recreated
/// after its durable store child is sent <c>InitFail</c> and both actors terminate.
/// </summary>
/// <remarks>
/// NOTE(review): presumably <c>_sys1</c> runs with the restart setting left at its default;
/// the configuration of <c>_sys1</c> is not visible in this method - confirm.
/// </remarks>
[Fact]
public async Task DistributedData_Replicator_Defaults_to_NoSupervisor()
{
// Paths used to look the actors up by selection below.
const string replicatorActorPath = "/user/ddataReplicator";
const string durableStoreActorPath = "/user/ddataReplicator/durableStore";

await InitCluster();
var replicator = DistributedData.Get(_sys1).Replicator;

// Resolve the durable store by path inside AwaitAssertAsync; each resolve attempt is
// given 3 seconds. The trailing arguments are presumably the overall time limit (10 s)
// and the retry interval (100 ms) - confirm against the TestKit signature.
IActorRef durableStore = null;
await AwaitAssertAsync(() =>
{
durableStore = _sys1.ActorSelection(durableStoreActorPath).ResolveOne(TimeSpan.FromSeconds(3))
.ContinueWith(
m => m.Result).Result;
}, TimeSpan.FromSeconds(10), TimeSpan.FromMilliseconds(100));

// Watch both actors before triggering the failure so their Terminated messages are received.
Watch(replicator);
Watch(durableStore);
durableStore.Tell(new InitFail());

// The durable store is expected to terminate first, followed by the replicator.
ExpectTerminated(durableStore, TimeSpan.FromSeconds(10));
ExpectTerminated(replicator, TimeSpan.FromSeconds(10));

// The replicator should not have been recreated, so expect ActorNotFound
await Assert.ThrowsAsync<ActorNotFoundException>(() =>
_sys1.ActorSelection(replicatorActorPath).ResolveOne(TimeSpan.FromSeconds(5)));
}
}

public class FakeDurableStore : ReceiveActor
{
public FakeDurableStore(Config config)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// //-----------------------------------------------------------------------
// // <copyright file="DDataSettingsSpec.cs" company="Akka.NET Project">
// // Copyright (C) 2009-2021 Lightbend Inc. <http://www.lightbend.com>
// // Copyright (C) 2013-2021 .NET Foundation <https://github.com/akkadotnet/akka.net>
// // </copyright>
// //-----------------------------------------------------------------------

using System;
using Akka.Actor;
using Akka.Configuration;
using Akka.Dispatch;
using Akka.TestKit;
using Xunit;

namespace Akka.DistributedData.Tests
{
/// <summary>
/// Verifies the values reported by <see cref="ReplicatorSettings"/> and by the related HOCON
/// entries for an actor system whose configuration is built on
/// <c>DistributedData.DefaultConfig()</c>.
/// </summary>
public class ReplicatorSettingsSpec : AkkaSpec
{
    // Empty configuration with the distributed-data defaults as fallback.
    private static readonly Config DefaultDDataConfig =
        ConfigurationFactory.Empty.WithFallback(DistributedData.DefaultConfig());

    public ReplicatorSettingsSpec()
        : base(DefaultDDataConfig)
    {
    }

    /// <summary>
    /// Asserts every default exposed through <see cref="ReplicatorSettings"/>, then the raw
    /// configuration values for the serializer cache, durable store and serializer bindings.
    /// </summary>
    [Fact]
    public void SettingsShouldContainProperDefaultValues()
    {
        // Values surfaced through the strongly typed settings object.
        var replicatorSettings = ReplicatorSettings.Create(Sys);

        replicatorSettings.Role.ShouldBe(string.Empty);
        replicatorSettings.GossipInterval.ShouldBe(TimeSpan.FromSeconds(2));
        replicatorSettings.NotifySubscribersInterval.ShouldBe(TimeSpan.FromMilliseconds(500));
        replicatorSettings.MaxDeltaElements.ShouldBe(500);
        replicatorSettings.Dispatcher.ShouldBe("akka.actor.internal-dispatcher");
        replicatorSettings.PruningInterval.ShouldBe(TimeSpan.FromSeconds(120));
        replicatorSettings.MaxPruningDissemination.ShouldBe(TimeSpan.FromSeconds(300));
        replicatorSettings.PruningMarkerTimeToLive.ShouldBe(TimeSpan.FromHours(6));
        replicatorSettings.RestartReplicatorOnFailure.ShouldBeFalse();
        replicatorSettings.MaxDeltaSize.ShouldBe(50);
        replicatorSettings.DurableKeys.Count.ShouldBe(0);
        replicatorSettings.DurableStoreProps.ShouldBe(Props.Empty);
        replicatorSettings.DurablePruningMarkerTimeToLive.ShouldBe(TimeSpan.FromDays(10));

        // Values read straight from the actor system configuration.
        var hocon = Sys.Settings.Config;

        var cacheTtl = hocon.GetTimeSpan("akka.cluster.distributed-data.serializer-cache-time-to-live");
        cacheTtl.ShouldBe(TimeSpan.FromSeconds(10));

        var storeActorClass = hocon.GetString("akka.cluster.distributed-data.durable.store-actor-class");
        storeActorClass.ShouldBe("Akka.DistributedData.LightningDB.LmdbDurableStore, Akka.DistributedData.LightningDB");

        var storeDispatcher = hocon.GetString("akka.cluster.distributed-data.durable.use-dispatcher");
        storeDispatcher.ShouldBe("akka.cluster.distributed-data.durable.pinned-store");

        // Serializer registrations.
        var messageSerializer = hocon.GetString("akka.actor.serializers.akka-data-replication");
        messageSerializer.ShouldBe("Akka.DistributedData.Serialization.ReplicatorMessageSerializer, Akka.DistributedData");

        var dataSerializer = hocon.GetString("akka.actor.serializers.akka-replicated-data");
        dataSerializer.ShouldBe("Akka.DistributedData.Serialization.ReplicatedDataSerializer, Akka.DistributedData");

        // Serialization bindings.
        var messageBinding = hocon.GetString("akka.actor.serialization-bindings.\"Akka.DistributedData.IReplicatorMessage, Akka.DistributedData\"");
        messageBinding.ShouldBe("akka-data-replication");

        var dataBinding = hocon.GetString("akka.actor.serialization-bindings.\"Akka.DistributedData.IReplicatedDataSerialization, Akka.DistributedData\"");
        dataBinding.ShouldBe("akka-replicated-data");

        // Serializer identifiers.
        var dataSerializerId = hocon.GetInt("akka.actor.serialization-identifiers.\"Akka.DistributedData.Serialization.ReplicatedDataSerializer, Akka.DistributedData\"");
        dataSerializerId.ShouldBe(11);

        var messageSerializerId = hocon.GetInt("akka.actor.serialization-identifiers.\"Akka.DistributedData.Serialization.ReplicatorMessageSerializer, Akka.DistributedData\"");
        messageSerializerId.ShouldBe(12);
    }
}
}
4 changes: 3 additions & 1 deletion src/contrib/cluster/Akka.DistributedData/DistributedData.cs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,9 @@ public DistributedData(ExtendedActorSystem system)
else
{
var name = config.GetString("name", null);
Replicator = system.ActorOf(GetSupervisedReplicator(_settings, name), name+"Supervisor");
Replicator = _settings.RestartReplicatorOnFailure
? system.ActorOf(GetSupervisedReplicator(_settings, name), name+"Supervisor")
: system.ActorOf(Akka.DistributedData.Replicator.Props(_settings), name);
}
}

Expand Down
43 changes: 40 additions & 3 deletions src/contrib/cluster/Akka.DistributedData/ReplicatorSettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ public static ReplicatorSettings Create(Config config)
durableStoreProps: durableStoreProps,
pruningMarkerTimeToLive: config.GetTimeSpan("pruning-marker-time-to-live", TimeSpan.FromHours(6)),
durablePruningMarkerTimeToLive: durableConfig.GetTimeSpan("pruning-marker-time-to-live", TimeSpan.FromDays(10)),
maxDeltaSize: config.GetInt("delta-crdt.max-delta-size", 50));
maxDeltaSize: config.GetInt("delta-crdt.max-delta-size", 50),
restartReplicatorOnFailure: config.GetBoolean("recreate-on-failure", false));
}

/// <summary>
Expand Down Expand Up @@ -149,7 +150,10 @@ public static ReplicatorSettings Create(Config config)
public Props DurableStoreProps { get; }

public int MaxDeltaSize { get; }

public bool RestartReplicatorOnFailure { get; }

[Obsolete]
public ReplicatorSettings(string role,
TimeSpan gossipInterval,
TimeSpan notifySubscribersInterval,
Expand Down Expand Up @@ -177,6 +181,35 @@ public ReplicatorSettings(string role,
MaxDeltaSize = maxDeltaSize;
}

/// <summary>
/// Creates a settings instance from explicit values, including the flag that controls
/// whether the replicator is restarted on failure.
/// </summary>
/// <param name="role">Value assigned to <see cref="Role"/>.</param>
/// <param name="gossipInterval">Value assigned to <see cref="GossipInterval"/>.</param>
/// <param name="notifySubscribersInterval">Value assigned to <see cref="NotifySubscribersInterval"/>.</param>
/// <param name="maxDeltaElements">Value assigned to <see cref="MaxDeltaElements"/>.</param>
/// <param name="dispatcher">Value assigned to <see cref="Dispatcher"/>.</param>
/// <param name="pruningInterval">Value assigned to <see cref="PruningInterval"/>.</param>
/// <param name="maxPruningDissemination">Value assigned to <see cref="MaxPruningDissemination"/>.</param>
/// <param name="durableKeys">Value assigned to <see cref="DurableKeys"/>.</param>
/// <param name="durableStoreProps">Value assigned to <see cref="DurableStoreProps"/>.</param>
/// <param name="pruningMarkerTimeToLive">Value assigned to <see cref="PruningMarkerTimeToLive"/>.</param>
/// <param name="durablePruningMarkerTimeToLive">Value assigned to <see cref="DurablePruningMarkerTimeToLive"/>.</param>
/// <param name="maxDeltaSize">Value assigned to <see cref="MaxDeltaSize"/>.</param>
/// <param name="restartReplicatorOnFailure">Value assigned to <see cref="RestartReplicatorOnFailure"/>.</param>
public ReplicatorSettings(
    string role,
    TimeSpan gossipInterval,
    TimeSpan notifySubscribersInterval,
    int maxDeltaElements,
    string dispatcher,
    TimeSpan pruningInterval,
    TimeSpan maxPruningDissemination,
    IImmutableSet<string> durableKeys,
    Props durableStoreProps,
    TimeSpan pruningMarkerTimeToLive,
    TimeSpan durablePruningMarkerTimeToLive,
    int maxDeltaSize,
    bool restartReplicatorOnFailure)
{
    // Identity and execution.
    Role = role;
    Dispatcher = dispatcher;
    RestartReplicatorOnFailure = restartReplicatorOnFailure;

    // Gossip, notification and delta settings.
    GossipInterval = gossipInterval;
    NotifySubscribersInterval = notifySubscribersInterval;
    MaxDeltaElements = maxDeltaElements;
    MaxDeltaSize = maxDeltaSize;

    // Pruning.
    PruningInterval = pruningInterval;
    MaxPruningDissemination = maxPruningDissemination;
    PruningMarkerTimeToLive = pruningMarkerTimeToLive;
    DurablePruningMarkerTimeToLive = durablePruningMarkerTimeToLive;

    // Durable storage.
    DurableKeys = durableKeys;
    DurableStoreProps = durableStoreProps;
}

private ReplicatorSettings Copy(string role = null,
TimeSpan? gossipInterval = null,
TimeSpan? notifySubscribersInterval = null,
Expand All @@ -188,7 +221,8 @@ private ReplicatorSettings Copy(string role = null,
Props durableStoreProps = null,
TimeSpan? pruningMarkerTimeToLive = null,
TimeSpan? durablePruningMarkerTimeToLive = null,
int? maxDeltaSize = null)
int? maxDeltaSize = null,
bool? restartReplicatorOnFailure = null)
{
return new ReplicatorSettings(
role: role ?? this.Role,
Expand All @@ -202,7 +236,8 @@ private ReplicatorSettings Copy(string role = null,
durableStoreProps: durableStoreProps ?? this.DurableStoreProps,
pruningMarkerTimeToLive: pruningMarkerTimeToLive ?? this.PruningMarkerTimeToLive,
durablePruningMarkerTimeToLive: durablePruningMarkerTimeToLive ?? this.DurablePruningMarkerTimeToLive,
maxDeltaSize: maxDeltaSize ?? this.MaxDeltaSize);
maxDeltaSize: maxDeltaSize ?? this.MaxDeltaSize,
restartReplicatorOnFailure: restartReplicatorOnFailure ?? this.RestartReplicatorOnFailure);
}

public ReplicatorSettings WithRole(string role) => Copy(role: role);
Expand All @@ -217,5 +252,7 @@ public ReplicatorSettings WithPruning(TimeSpan pruningInterval, TimeSpan maxPrun
public ReplicatorSettings WithPruningMarkerTimeToLive(TimeSpan pruningMarkerTtl, TimeSpan durablePruningMarkerTtl) =>
Copy(pruningMarkerTimeToLive: pruningMarkerTtl, durablePruningMarkerTimeToLive: durablePruningMarkerTtl);
public ReplicatorSettings WithMaxDeltaSize(int maxDeltaSize) => Copy(maxDeltaSize: maxDeltaSize);
/// <summary>
/// Returns a copy of these settings with <see cref="RestartReplicatorOnFailure"/> set to
/// <paramref name="restart"/>; all other values are carried over unchanged.
/// </summary>
/// <param name="restart">New value for <see cref="RestartReplicatorOnFailure"/>.</param>
/// <returns>A new <see cref="ReplicatorSettings"/> instance.</returns>
public ReplicatorSettings WithRestartReplicatorOnFailure(bool restart)
{
    return Copy(restartReplicatorOnFailure: restart);
}
}
}
4 changes: 4 additions & 0 deletions src/contrib/cluster/Akka.DistributedData/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ akka.cluster.distributed-data {
# after this duration.
serializer-cache-time-to-live = 10s

# When set to on, a backoff supervisor is attached to the replicator
# so that a failing replicator is restarted.
recreate-on-failure = off

# Settings for delta-CRDT
delta-crdt {
# enable or disable delta-CRDT replication
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -762,7 +762,9 @@ namespace Akka.DistributedData
}
public sealed class ReplicatorSettings
{
[System.ObsoleteAttribute()]
public ReplicatorSettings(string role, System.TimeSpan gossipInterval, System.TimeSpan notifySubscribersInterval, int maxDeltaElements, string dispatcher, System.TimeSpan pruningInterval, System.TimeSpan maxPruningDissemination, System.Collections.Immutable.IImmutableSet<string> durableKeys, Akka.Actor.Props durableStoreProps, System.TimeSpan pruningMarkerTimeToLive, System.TimeSpan durablePruningMarkerTimeToLive, int maxDeltaSize) { }
public ReplicatorSettings(string role, System.TimeSpan gossipInterval, System.TimeSpan notifySubscribersInterval, int maxDeltaElements, string dispatcher, System.TimeSpan pruningInterval, System.TimeSpan maxPruningDissemination, System.Collections.Immutable.IImmutableSet<string> durableKeys, Akka.Actor.Props durableStoreProps, System.TimeSpan pruningMarkerTimeToLive, System.TimeSpan durablePruningMarkerTimeToLive, int maxDeltaSize, bool restartReplicatorOnFailure) { }
public string Dispatcher { get; }
public System.Collections.Immutable.IImmutableSet<string> DurableKeys { get; }
public System.TimeSpan DurablePruningMarkerTimeToLive { get; }
Expand All @@ -775,6 +777,7 @@ namespace Akka.DistributedData
public System.TimeSpan NotifySubscribersInterval { get; }
public System.TimeSpan PruningInterval { get; }
public System.TimeSpan PruningMarkerTimeToLive { get; }
public bool RestartReplicatorOnFailure { get; }
public string Role { get; }
public static Akka.DistributedData.ReplicatorSettings Create(Akka.Actor.ActorSystem system) { }
public static Akka.DistributedData.ReplicatorSettings Create(Akka.Configuration.Config config) { }
Expand All @@ -787,6 +790,7 @@ namespace Akka.DistributedData
public Akka.DistributedData.ReplicatorSettings WithNotifySubscribersInterval(System.TimeSpan notifySubscribersInterval) { }
public Akka.DistributedData.ReplicatorSettings WithPruning(System.TimeSpan pruningInterval, System.TimeSpan maxPruningDissemination) { }
public Akka.DistributedData.ReplicatorSettings WithPruningMarkerTimeToLive(System.TimeSpan pruningMarkerTtl, System.TimeSpan durablePruningMarkerTtl) { }
public Akka.DistributedData.ReplicatorSettings WithRestartReplicatorOnFailure(bool restart) { }
public Akka.DistributedData.ReplicatorSettings WithRole(string role) { }
}
[System.Diagnostics.DebuggerDisplayAttribute("VersionVector({Node}->{Version})")]
Expand Down