Skip to content

Commit

Permalink
Akka.Delivery: fix ProducerControllerImpl<T> state bug (#7034)
Browse files Browse the repository at this point in the history
* Troubleshoot ReliableDeliveryRandomSpecs

reproduce #7033

* fixed logging bug

* adding better logging around fuzz factor

* found the bug
  • Loading branch information
Aaronontheweb authored Jan 4, 2024
1 parent 70787ae commit 1c4f071
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 66 deletions.
67 changes: 35 additions & 32 deletions src/core/Akka.Tests/Delivery/ReliableDeliveryRandomSpecs.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@ namespace Akka.Tests.Delivery;

public class ReliableDeliveryRandomSpecs : TestKit.Xunit2.TestKit
{
internal static readonly Config Config = @"akka.reliable-delivery.consumer-controller{
private static readonly Config Config = @"akka.reliable-delivery.consumer-controller{
flow-control-window = 20
resend-interval-min = 500ms
resend-interval-max = 2s
}";
}
akka.loglevel = DEBUG
";

public ReliableDeliveryRandomSpecs(ITestOutputHelper output) : this(output, Config)
{
Expand Down Expand Up @@ -57,16 +59,6 @@ private async Task Test(int numberOfMessages, double producerDropProbability, do
consumerDropProbability, producerDropProbability,
consumerDelay, producerDelay, durableFailProbability, durableDelay);

// RandomFlakyNetwork to simulate lost messages from producerController to consumerController
// Fuzzing callback handed to ConsumerController.CreateWithFuzzing: returns the probability
// with which the given message should be dropped.
// Only SequencedMessage<TestConsumer.Job> is subject to loss; every other message yields 0.
double ConsumerDrop(object msg)
{
    if (msg is ConsumerController.SequencedMessage<TestConsumer.Job>)
    {
        return consumerDropProbability;
    }

    return 0;
}

var consumerEndProbe = CreateTestProbe();
var consumerController = Sys.ActorOf(ConsumerController.CreateWithFuzzing<TestConsumer.Job>(Sys, Option<IActorRef>.None, ConsumerDrop, consumerControllerSettings), $"consumer-controller-{_idCount}");

Expand All @@ -75,18 +67,6 @@ double ConsumerDrop(object msg)
TestConsumer.PropsFor(consumerDelay, numberOfMessages, consumerEndProbe.Ref, consumerController),
$"consumer-{_idCount}");

// RandomFlakyNetwork to simulate lost messages from consumerController to producerController
// Returns the probability with which the given message should be dropped.
// Request, Resend and RegisterConsumer<TestConsumer.Job> share producerDropProbability;
// any other message yields 0.
double ProducerDrop(object msg)
{
    var isDroppable = msg is ProducerController.Request
                      || msg is ProducerController.Resend
                      || msg is ProducerController.RegisterConsumer<TestConsumer.Job>;

    return isDroppable ? producerDropProbability : 0;
}

var stateHolder = new AtomicReference<DurableProducerQueueStateHolder<TestConsumer.Job>>(DurableProducerQueueStateHolder<TestConsumer.Job>.Empty);
var durableQueue = durableFailProbability.Select(p =>
{
Expand All @@ -105,50 +85,73 @@ double ProducerDrop(object msg)
new ConsumerController.RegisterToProducerController<TestConsumer.Job>(producerController));

await consumerEndProbe.ExpectMsgAsync<TestConsumer.Collected>(TimeSpan.FromSeconds(120));
return;

// RandomFlakyNetwork to simulate lost messages from producerController to consumerController
// Fuzzing callback handed to ConsumerController.CreateWithFuzzing: returns the probability
// with which the given message should be dropped.
// Only SequencedMessage<TestConsumer.Job> is subject to loss; every other message yields 0.
double ConsumerDrop(object msg)
{
    if (msg is ConsumerController.SequencedMessage<TestConsumer.Job>)
    {
        return consumerDropProbability;
    }

    return 0;
}

// RandomFlakyNetwork to simulate lost messages from consumerController to producerController
// Returns the probability with which the given message should be dropped.
// Request, Resend and RegisterConsumer<TestConsumer.Job> share producerDropProbability;
// any other message yields 0.
double ProducerDrop(object msg)
{
    var isDroppable = msg is ProducerController.Request
                      || msg is ProducerController.Resend
                      || msg is ProducerController.RegisterConsumer<TestConsumer.Job>;

    return isDroppable ? producerDropProbability : 0;
}
}

[Fact]
public async Task ReliableDelivery_with_random_failures_must_work_with_flaky_network()
public Task ReliableDelivery_with_random_failures_must_work_with_flaky_network()
{
NextId();
var consumerDropProbability = 0.1 + ThreadLocalRandom.Current.NextDouble() * 0.2;
var producerDropProbability = 0.1 + ThreadLocalRandom.Current.NextDouble() * 0.2;

await Test(numberOfMessages: 63, producerDropProbability: producerDropProbability,
return Test(numberOfMessages: 63, producerDropProbability: producerDropProbability,
consumerDropProbability: consumerDropProbability, Option<double>.None, true);
}

[Fact]
public async Task ReliableDelivery_with_random_failures_must_work_with_flaky_DurableProducerQueue()
public Task ReliableDelivery_with_random_failures_must_work_with_flaky_DurableProducerQueue()
{
NextId();
var durableFailProbability = 0.1 + ThreadLocalRandom.Current.NextDouble() * 0.1;

await Test(numberOfMessages: 31, producerDropProbability: 0.0,
return Test(numberOfMessages: 31, producerDropProbability: 0.0,
consumerDropProbability:0.0, durableFailProbability, true);
}

[Fact]
public async Task ReliableDelivery_with_random_failures_must_work_with_flaky_network_and_flaky_DurableProducerQueue()
public Task ReliableDelivery_with_random_failures_must_work_with_flaky_network_and_flaky_DurableProducerQueue()
{
NextId();
var consumerDropProbability = 0.1 + ThreadLocalRandom.Current.NextDouble() * 0.1;
var producerDropProbability = 0.1 + ThreadLocalRandom.Current.NextDouble() * 0.1;
var durableFailProbability = 0.1 + ThreadLocalRandom.Current.NextDouble() * 0.1;


await Test(numberOfMessages: 17, producerDropProbability: producerDropProbability,
return Test(numberOfMessages: 17, producerDropProbability: producerDropProbability,
consumerDropProbability: consumerDropProbability, durableFailProbability, true);
}

[Fact]
public async Task ReliableDelivery_with_random_failures_must_work_with_flaky_network_without_resending()
public Task ReliableDelivery_with_random_failures_must_work_with_flaky_network_without_resending()
{
NextId();
var consumerDropProbability = 0.1 + ThreadLocalRandom.Current.NextDouble() * 0.4;
var producerDropProbability = 0.1 + ThreadLocalRandom.Current.NextDouble() * 0.3;

await Test(numberOfMessages: 63, producerDropProbability: producerDropProbability,
return Test(numberOfMessages: 63, producerDropProbability: producerDropProbability,
consumerDropProbability: consumerDropProbability, Option<double>.None, false);
}
}
70 changes: 38 additions & 32 deletions src/core/Akka/Delivery/Internal/ConsumerControllerImpl.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,11 @@ protected internal override bool AroundReceive(Receive receive, object message)
{
// TESTING PURPOSES ONLY - used to simulate network failures.
if (_fuzzingControl != null && ThreadLocalRandom.Current.NextDouble() < _fuzzingControl(message))
{
_log.Debug("[Testing] dropping message [{0}] due to fuzzing factor", message);
return true;
}

return base.AroundReceive(receive, message);
}

Expand Down Expand Up @@ -281,38 +285,6 @@ private void WaitingForConfirmation(SequencedMessage<T> sequencedMessage)
if (_log.IsDebugEnabled)
_log.Debug("Received Confirmed seqNr [{0}] from consumer, stashed size [{1}].", seqNr, Stash.Count);

// Decides what to send back to the producer controller after the consumer confirmed `seqNr`,
// and returns the requested-up-to sequence number that applies afterwards.
// Three outcomes, checked in order:
//   1. first message         -> send a Request and return the new upper bound;
//   2. half the window used  -> send a Request extending the bound, restart the retry timer;
//   3. otherwise             -> optionally send an Ack, return the current bound unchanged.
// Side effects: Tell(...) to CurrentState.ProducerController and, in case 2, _retryTimer.Start().
long ComputeNextSeqNr()
{
if (sequencedMessage.First)
{
// confirm the first message immediately to cancel resending of first
// New bound is one flow-control window counted from the confirmed seqNr (seqNr itself included).
var newRequestedSeqNr = seqNr - 1 + Settings.FlowControlWindow;
_log.Debug("Sending Request after first with confirmedSeqNr [{0}], requestUpToSeqNr [{1}]", seqNr,
newRequestedSeqNr);
CurrentState.ProducerController.Tell(new Request(seqNr, newRequestedSeqNr, ResendLost, false));
return newRequestedSeqNr;
}

// Exactly half a window remains between the confirmed seqNr and the currently requested bound:
// extend the bound by another half window.
// NOTE(review): presumably FlowControlWindow is an integer, making `/ 2` integer division — confirm.
if (CurrentState.RequestedSeqNr - seqNr == Settings.FlowControlWindow / 2)
{
var newRequestedSeqNr = CurrentState.RequestedSeqNr + Settings.FlowControlWindow / 2;
_log.Debug("Sending Request with confirmedSeqNr [{0}], requestUpToSeqNr [{1}]", seqNr,
newRequestedSeqNr);
CurrentState.ProducerController.Tell(new Request(seqNr, newRequestedSeqNr, ResendLost, false));
_retryTimer.Start(); // reset interval since Request was just sent
return newRequestedSeqNr;
}

// No new Request needed; send an Ack only when the message is flagged with Ack.
if (sequencedMessage.Ack)
{
if (_log.IsDebugEnabled)
_log.Debug("Sending Ack seqNr [{0}]", seqNr);
CurrentState.ProducerController.Tell(new Ack(seqNr));
}

// Requested bound is unchanged on this path.
return CurrentState.RequestedSeqNr;
}

var requestedSeqNr = ComputeNextSeqNr();
if (CurrentState.Stopping && Stash.IsEmpty)
{
Expand Down Expand Up @@ -351,6 +323,40 @@ async Task ShutDownAndStop()
Stash.Unstash();
Become(Active);
}

return;

// Decides what to send back to the producer controller after the consumer confirmed `seqNr`,
// and returns the requested-up-to sequence number that applies afterwards.
// Three outcomes, checked in order:
//   1. first message         -> send a Request and return the new upper bound;
//   2. half the window used  -> send a Request extending the bound, restart the retry timer;
//   3. otherwise             -> optionally send an Ack, return the current bound unchanged.
// Side effects: Tell(...) to CurrentState.ProducerController and, in case 2, _retryTimer.Start().
long ComputeNextSeqNr()
{
if (sequencedMessage.First)
{
// confirm the first message immediately to cancel resending of first
// New bound is one flow-control window counted from the confirmed seqNr (seqNr itself included).
var newRequestedSeqNr = seqNr - 1 + Settings.FlowControlWindow;
_log.Debug("Sending Request after first with confirmedSeqNr [{0}], requestUpToSeqNr [{1}]", seqNr,
newRequestedSeqNr);
CurrentState.ProducerController.Tell(new Request(seqNr, newRequestedSeqNr, ResendLost, false));
return newRequestedSeqNr;
}

// Exactly half a window remains between the confirmed seqNr and the currently requested bound:
// extend the bound by another half window.
// NOTE(review): presumably FlowControlWindow is an integer, making `/ 2` integer division — confirm.
if (CurrentState.RequestedSeqNr - seqNr == Settings.FlowControlWindow / 2)
{
var newRequestedSeqNr = CurrentState.RequestedSeqNr + Settings.FlowControlWindow / 2;
_log.Debug("Sending Request with confirmedSeqNr [{0}], requestUpToSeqNr [{1}]", seqNr,
newRequestedSeqNr);
CurrentState.ProducerController.Tell(new Request(seqNr, newRequestedSeqNr, ResendLost, false));
_retryTimer.Start(); // reset interval since Request was just sent
return newRequestedSeqNr;
}

// No new Request needed; send an Ack only when the message is flagged with Ack.
if (sequencedMessage.Ack)
{
if (_log.IsDebugEnabled)
_log.Debug("Sending Ack seqNr [{0}]", seqNr);
CurrentState.ProducerController.Tell(new Ack(seqNr));
}

// Requested bound is unchanged on this path.
return CurrentState.RequestedSeqNr;
}
});

Receive<SequencedMessage<T>>(msg =>
Expand Down
4 changes: 2 additions & 2 deletions src/core/Akka/Delivery/Internal/ProducerControllerImpl.cs
Original file line number Diff line number Diff line change
Expand Up @@ -490,10 +490,10 @@ private void ReceiveRequest(long newConfirmedSeqNr, long newRequestedSeqNr, bool
: newRequestedSeqNr;

if (newRequestedSeqNr2 != newRequestedSeqNr)
_log.Debug("Expanded requestedSeqNr from [{0}] to [{1}], because current [{3}] and all were probably lost.",
_log.Debug("Expanded requestedSeqNr from [{0}] to [{1}], because current [{2}] and all were probably lost.",
newRequestedSeqNr, newRequestedSeqNr2, stateAfterAck.CurrentSeqNr);

if (newRequestedSeqNr > CurrentState.RequestedSeqNr)
if (newRequestedSeqNr2 > CurrentState.RequestedSeqNr)
{
bool newRequested;
if (CurrentState.StoreMessageSentInProgress != 0)
Expand Down

0 comments on commit 1c4f071

Please sign in to comment.