Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed: IWrappedMessage + IDeadLetterSuppression handling #7414

Merged
merged 10 commits into from
Dec 20, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -3686,7 +3686,7 @@ namespace Akka.Event
}
public sealed class SuppressedDeadLetter : Akka.Event.AllDeadLetters
{
public SuppressedDeadLetter(Akka.Event.IDeadLetterSuppression message, Akka.Actor.IActorRef sender, Akka.Actor.IActorRef recipient) { }
public SuppressedDeadLetter(object message, Akka.Actor.IActorRef sender, Akka.Actor.IActorRef recipient) { }
}
public class TraceLogger : Akka.Actor.UntypedActor
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3676,7 +3676,7 @@ namespace Akka.Event
}
public sealed class SuppressedDeadLetter : Akka.Event.AllDeadLetters
{
public SuppressedDeadLetter(Akka.Event.IDeadLetterSuppression message, Akka.Actor.IActorRef sender, Akka.Actor.IActorRef recipient) { }
public SuppressedDeadLetter(object message, Akka.Actor.IActorRef sender, Akka.Actor.IActorRef recipient) { }
}
public class TraceLogger : Akka.Actor.UntypedActor
{
Expand Down
47 changes: 44 additions & 3 deletions src/core/Akka.Tests/Actor/DeadLettersSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@

namespace Akka.Tests
{

public class DeadLettersSpec : AkkaSpec
{
[Fact]
Expand All @@ -23,6 +22,48 @@ public async Task Can_send_messages_to_dead_letters()
Sys.DeadLetters.Tell("foobar");
await ExpectMsgAsync<DeadLetter>(deadLetter=>deadLetter.Message.Equals("foobar"));
}
}
}

private sealed record WrappedClass(object Message) : IWrappedMessage;

private sealed class SuppressedMessage : IDeadLetterSuppression
{

}

[Fact]
public async Task ShouldLogNormalWrappedMessages()
{
Sys.EventStream.Subscribe(TestActor, typeof(DeadLetter));
Sys.DeadLetters.Tell(new WrappedClass("chocolate-beans"));
await ExpectMsgAsync<DeadLetter>();
}

[Fact]
public async Task ShouldNotLogWrappedMessagesWithDeadLetterSuppression()
{
Sys.EventStream.Subscribe(TestActor, typeof(AllDeadLetters));
Sys.DeadLetters.Tell(new WrappedClass(new SuppressedMessage()));
var msg = await ExpectMsgAsync<SuppressedDeadLetter>();
msg.Message.ToString()!.Contains("SuppressedMessage").ShouldBeTrue();
}

[Fact]
public async Task ShouldLogNormalActorSelectionWrappedMessages()
{
Sys.EventStream.Subscribe(TestActor, typeof(DeadLetter));
var selection = Sys.ActorSelection("/user/foobar");
selection.Tell(new WrappedClass("chocolate-beans"));
await ExpectMsgAsync<DeadLetter>();
}

[Fact]
public async Task ShouldNotLogActorSelectionWrappedMessagesWithDeadLetterSuppression()
{
Sys.EventStream.Subscribe(TestActor, typeof(AllDeadLetters));
var selection = Sys.ActorSelection("/user/foobar");
selection.Tell(new WrappedClass(new SuppressedMessage()));
var msg = await ExpectMsgAsync<SuppressedDeadLetter>();
msg.Message.ToString()!.Contains("SuppressedMessage").ShouldBeTrue();
}
}
}
54 changes: 54 additions & 0 deletions src/core/Akka.Tests/Actor/WrappedMessagesSpec.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// -----------------------------------------------------------------------
// <copyright file="WrappedMessagesSpec.cs" company="Akka.NET Project">
// Copyright (C) 2009-2024 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2024 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
// -----------------------------------------------------------------------

using Akka.Actor;
using Akka.Event;
using Akka.TestKit;
using Xunit;

namespace Akka.Tests;

public class WrappedMessagesSpec
{
private sealed record WrappedClass(object Message) : IWrappedMessage;

private sealed record WrappedSuppressedClass(object Message) : IWrappedMessage, IDeadLetterSuppression;

private sealed class SuppressedMessage : IDeadLetterSuppression
{

}


[Fact]
public void ShouldUnwrapWrappedMessage()
{
var message = new WrappedClass("chocolate-beans");
var unwrapped = WrappedMessage.Unwrap(message);
unwrapped.ShouldBe("chocolate-beans");
}

public static readonly TheoryData<object, bool> SuppressedMessages = new()
{
{new SuppressedMessage(), true},
{new WrappedClass(new SuppressedMessage()), true},
{new WrappedClass(new WrappedClass(new SuppressedMessage())), true},
{new WrappedClass(new WrappedClass("chocolate-beans")), false},
{new WrappedSuppressedClass("foo"), true},
{new WrappedClass(new WrappedSuppressedClass("chocolate-beans")), true},
{new WrappedClass("chocolate-beans"), false},
{"chocolate-beans", false}
};

[Theory]
[MemberData(nameof(SuppressedMessages))]
public void ShouldDetectIfWrappedMessageIsSuppressed(object message, bool shouldBeSuppressed)
{
var isSuppressed = WrappedMessage.IsDeadLetterSuppressedAnywhere(message);
isSuppressed.ShouldBe(shouldBeSuppressed);
}
}
35 changes: 24 additions & 11 deletions src/core/Akka/Actor/BuiltInActors.cs
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,18 @@ public static object Unwrap(object message)
}
return message;
}

internal static bool IsDeadLetterSuppressedAnywhere(object message)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Function to check, up and down the stack, if a message was marked as IDeadLetterSuppression anywhere within

{
var isSuppressed = message is IDeadLetterSuppression;
while(!isSuppressed && message is IWrappedMessage wm)
{
message = wm.Message;
isSuppressed = message is IDeadLetterSuppression;
}

return isSuppressed;
}
}

/// <summary>
Expand Down Expand Up @@ -226,19 +238,20 @@ public DeadLetterActorRef(IActorRefProvider provider, ActorPath path, EventStrea
/// <exception cref="InvalidMessageException">This exception is thrown if the given <paramref name="message"/> is undefined.</exception>
protected override void TellInternal(object message, IActorRef sender)
{
if (message == null) throw new InvalidMessageException("Message is null");
var i = message as Identify;
if (i != null)
switch (message)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Refomatting - nothing of significance here

{
sender.Tell(new ActorIdentity(i.MessageId, ActorRefs.Nobody));
return;
}
var d = message as DeadLetter;
if (d != null)
{
if (!SpecialHandle(d.Message, d.Sender)) { _eventStream.Publish(d); }
return;
case null:
throw new InvalidMessageException("Message is null");
case Identify i:
sender.Tell(new ActorIdentity(i.MessageId, ActorRefs.Nobody));
return;
case DeadLetter d:
{
if (!SpecialHandle(d.Message, d.Sender)) { _eventStream.Publish(d); }
return;
}
}

if (!SpecialHandle(message, sender)) { _eventStream.Publish(new DeadLetter(message, sender.IsNobody() ? Provider.DeadLetters : sender, this)); }
}

Expand Down
8 changes: 6 additions & 2 deletions src/core/Akka/Actor/DeadLetterMailbox.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,13 @@ public DeadLetterMessageQueue(IActorRef deadLetters)
/// <param name="envelope">TBD</param>
public void Enqueue(IActorRef receiver, Envelope envelope)
{
if (envelope.Message is DeadLetter)
if (envelope.Message is AllDeadLetters)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Important fix - now that we can produce SupressedDeadLetters more easily, this safety check has to be expanded to encompass AllDeadLetters and their derived types, otherwise this will produce a StackOverflowException.

{
// actor subscribing to DeadLetter. Drop it.
/* We're receiving a DeadLetter sent to us by someone else (which is not normal - usually only happens
* if we were explicitly subscribed to DeadLetters on the EventStream).
*
* Have to terminate here in order to prevent a stack overflow.
*/
return;
}

Expand Down
10 changes: 5 additions & 5 deletions src/core/Akka/Actor/EmptyLocalActorRef.cs
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,9 @@ protected virtual bool SpecialHandle(object message, IActorRef sender)
}
else
{
if (actorSelectionMessage.Message is IDeadLetterSuppression selectionDeadLetterSuppression)
if (WrappedMessage.IsDeadLetterSuppressedAnywhere(actorSelectionMessage.Message))
{
PublishSupressedDeadLetter(selectionDeadLetterSuppression, sender);
PublishSupressedDeadLetter(actorSelectionMessage.Message, sender);
}
else
{
Expand All @@ -123,16 +123,16 @@ protected virtual bool SpecialHandle(object message, IActorRef sender)
return true;
}

if (message is IDeadLetterSuppression deadLetterSuppression)
if (WrappedMessage.IsDeadLetterSuppressedAnywhere(message))
{
PublishSupressedDeadLetter(deadLetterSuppression, sender);
PublishSupressedDeadLetter(message, sender);
return true;
}

return false;
}

private void PublishSupressedDeadLetter(IDeadLetterSuppression msg, IActorRef sender)
private void PublishSupressedDeadLetter(object msg, IActorRef sender)
{
_eventStream.Publish(new SuppressedDeadLetter(msg, sender.IsNobody() ? _provider.DeadLetters : sender, this));
}
Expand Down
2 changes: 1 addition & 1 deletion src/core/Akka/Event/DeadLetter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public sealed class SuppressedDeadLetter : AllDeadLetters
/// <exception cref="ArgumentNullException">
/// This exception is thrown when either the sender or the recipient is undefined.
/// </exception>
public SuppressedDeadLetter(IDeadLetterSuppression message, IActorRef sender, IActorRef recipient) : base(message, sender, recipient)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Breaking API change here, kind of - needed to relax this type constraint otherwise we'd lose data in the SuppressedDeadLetter logging

public SuppressedDeadLetter(object message, IActorRef sender, IActorRef recipient) : base(message, sender, recipient)
{
if (sender == null) throw new ArgumentNullException(nameof(sender), "SuppressedDeadLetter sender may not be null");
if (recipient == null) throw new ArgumentNullException(nameof(recipient), "SuppressedDeadLetter recipient may not be null");
Expand Down
Loading