From 1365c68a901a5a12a9144ece9360dd801747af5c Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 29 Nov 2019 12:47:49 +0100 Subject: [PATCH] Better deadLetter logging of wrapped messages, #28109 (#28253) * specifically AdaptMessage that is used for Typed messageAdapter and ActorContext.ask --- .../typed/scaladsl/MessageAdapterSpec.scala | 28 ++++++++++++++++++- .../issue-28109-wrapped-message.excludes | 3 ++ .../typed/internal/InternalMessage.scala | 7 +++-- .../src/main/scala/akka/actor/Actor.scala | 1 + .../src/main/scala/akka/actor/ActorRef.scala | 24 +++++++++++++++- .../scala/akka/actor/ActorSelection.scala | 5 +++- .../scala/akka/event/DeadLetterListener.scala | 8 ++++-- .../akka/routing/ConsistentHashing.scala | 7 ++++- .../typed/ShardingMessageExtractor.scala | 3 +- .../pubsub/DistributedPubSubMediator.scala | 15 ++++++++-- 10 files changed, 89 insertions(+), 12 deletions(-) create mode 100644 akka-actor-typed/src/main/mima-filters/2.6.0.backwards.excludes/issue-28109-wrapped-message.excludes diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/MessageAdapterSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/MessageAdapterSpec.scala index 0553fa14d2c..43633a010b1 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/MessageAdapterSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/MessageAdapterSpec.scala @@ -17,10 +17,11 @@ import akka.actor.testkit.typed.scaladsl.LogCapturing import akka.actor.typed.eventstream.EventStream import com.typesafe.config.ConfigFactory import org.scalatest.WordSpecLike +import org.slf4j.event.Level object MessageAdapterSpec { val config = ConfigFactory.parseString(""" - akka.log-dead-letters = off + akka.log-dead-letters = on ping-pong-dispatcher { executor = thread-pool-executor type = PinnedDispatcher @@ -272,4 +273,29 @@ class MessageAdapterSpec } + "log wrapped message of DeadLetter" in { + case class Ping(sender: ActorRef[Pong]) + case class Pong(greeting: String) + case class PingReply(response: Pong) + + val pingProbe = createTestProbe[Ping]() + + val snitch = Behaviors.setup[PingReply] { context => + val replyTo = context.messageAdapter[Pong](PingReply) + pingProbe.ref ! Ping(replyTo) + Behaviors.stopped + } + val ref = spawn(snitch) + + createTestProbe().expectTerminated(ref) + + LoggingTestKit.empty + .withLogLevel(Level.INFO) + .withMessageRegex("Pong.*wrapped in.*AdaptMessage.*dead letters encountered") + .expect { + pingProbe.receiveMessage().sender ! Pong("hi") + } + + } + } diff --git a/akka-actor-typed/src/main/mima-filters/2.6.0.backwards.excludes/issue-28109-wrapped-message.excludes b/akka-actor-typed/src/main/mima-filters/2.6.0.backwards.excludes/issue-28109-wrapped-message.excludes new file mode 100644 index 00000000000..5f1abadd985 --- /dev/null +++ b/akka-actor-typed/src/main/mima-filters/2.6.0.backwards.excludes/issue-28109-wrapped-message.excludes @@ -0,0 +1,3 @@ +# 28109 WrappedMessage for better dead letter logging + +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.typed.internal.AdaptMessage.msg") diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/InternalMessage.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/InternalMessage.scala index d76987d67c3..506fc66cdce 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/InternalMessage.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/InternalMessage.scala @@ -4,6 +4,7 @@ package akka.actor.typed.internal +import akka.actor.WrappedMessage import akka.annotation.InternalApi /** @@ -22,6 +23,8 @@ import akka.annotation.InternalApi * function. Used by `ActorContext.spawnMessageAdapter` and `ActorContext.ask` so that the function is * applied in the "parent" actor (for better thread safety).. */ -@InternalApi private[akka] final case class AdaptMessage[U, T](msg: U, adapter: U => T) extends InternalMessage { - def adapt(): T = adapter(msg) +@InternalApi private[akka] final case class AdaptMessage[U, T](message: U, adapter: U => T) + extends InternalMessage + with WrappedMessage { + def adapt(): T = adapter(message) } diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index 15ed96aa178..fb11e0c8452 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -288,6 +288,7 @@ final case class UnhandledMessage( @BeanProperty sender: ActorRef, @BeanProperty recipient: ActorRef) extends NoSerializationVerificationNeeded + with WrappedMessage /** * Classes for passing status back to the sender. diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 651134c17a9..15b3e5fb677 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -479,7 +479,7 @@ private[akka] trait MinimalActorRef extends InternalActorRef with LocalRef { * Subscribe to this class to be notified about all [[DeadLetter]] (also the suppressed ones) * and [[Dropped]]. */ -sealed trait AllDeadLetters { +sealed trait AllDeadLetters extends WrappedMessage { def message: Any def sender: ActorRef def recipient: ActorRef @@ -533,6 +533,28 @@ object Dropped { Dropped(message, reason, ActorRef.noSender, recipient) } +object WrappedMessage { + + /** + * Unwrap [[WrappedMessage]] recursively. + */ + @tailrec def unwrap(message: Any): Any = { + message match { + case w: WrappedMessage => unwrap(w.message) + case _ => message + + } + } +} + +/** + * Message envelopes may implement this trait for better logging, such as logging of + * message class name of the wrapped message instead of the envelope class name. + */ +trait WrappedMessage { + def message: Any +} + private[akka] object DeadLetterActorRef { @SerialVersionUID(1L) class SerializedDeadLetterActorRef extends Serializable { //TODO implement as Protobuf for performance? diff --git a/akka-actor/src/main/scala/akka/actor/ActorSelection.scala b/akka-actor/src/main/scala/akka/actor/ActorSelection.scala index 4a10451930e..2da231eba22 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSelection.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSelection.scala @@ -315,12 +315,15 @@ private[akka] final case class ActorSelectionMessage( elements: immutable.Iterable[SelectionPathElement], wildcardFanOut: Boolean) extends AutoReceivedMessage - with PossiblyHarmful { + with PossiblyHarmful + with WrappedMessage { def identifyRequest: Option[Identify] = msg match { case x: Identify => Some(x) case _ => None } + + override def message: Any = msg } /** diff --git a/akka-actor/src/main/scala/akka/event/DeadLetterListener.scala b/akka-actor/src/main/scala/akka/event/DeadLetterListener.scala index 099b3a075be..be2cf76c2f3 100644 --- a/akka-actor/src/main/scala/akka/event/DeadLetterListener.scala +++ b/akka-actor/src/main/scala/akka/event/DeadLetterListener.scala @@ -13,6 +13,7 @@ import akka.actor.AllDeadLetters import akka.actor.DeadLetter import akka.actor.DeadLetterActorRef import akka.actor.Dropped +import akka.actor.WrappedMessage import akka.event.Logging.Info import akka.util.PrettyDuration._ @@ -96,13 +97,16 @@ class DeadLetterListener extends Actor { private def logDeadLetter(d: AllDeadLetters, doneMsg: String): Unit = { val origin = if (isReal(d.sender)) s" from ${d.sender}" else "" + val unwrapped = WrappedMessage.unwrap(d.message) + val messageStr = unwrapped.getClass.getName + val wrappedIn = if (d.message.isInstanceOf[WrappedMessage]) s" wrapped in [${d.message.getClass.getName}]" else "" val logMessage = d match { case dropped: Dropped => val destination = if (isReal(d.recipient)) s" to ${d.recipient}" else "" - s"Message [${d.message.getClass.getName}]$origin$destination was dropped. ${dropped.reason}. " + + s"Message [$messageStr]$wrappedIn$origin$destination was dropped. ${dropped.reason}. " + s"[$count] dead letters encountered$doneMsg. " case _ => - s"Message [${d.message.getClass.getName}]$origin to ${d.recipient} was not delivered. " + + s"Message [$messageStr]$wrappedIn$origin to ${d.recipient} was not delivered. " + s"[$count] dead letters encountered$doneMsg. " + s"If this is not an expected behavior then ${d.recipient} may have terminated unexpectedly. " } diff --git a/akka-actor/src/main/scala/akka/routing/ConsistentHashing.scala b/akka-actor/src/main/scala/akka/routing/ConsistentHashing.scala index 1660145ebd6..13b1f4036bf 100644 --- a/akka-actor/src/main/scala/akka/routing/ConsistentHashing.scala +++ b/akka-actor/src/main/scala/akka/routing/ConsistentHashing.scala @@ -5,6 +5,7 @@ package akka.routing import scala.collection.immutable + import akka.dispatch.Dispatchers import com.typesafe.config.Config import akka.actor.SupervisorStrategy @@ -13,10 +14,13 @@ import akka.actor.Address import akka.actor.ExtendedActorSystem import akka.actor.ActorSystem import java.util.concurrent.atomic.AtomicReference + import akka.serialization.SerializationExtension import scala.util.control.NonFatal + import akka.event.Logging import akka.actor.ActorPath +import akka.actor.WrappedMessage object ConsistentHashingRouter { @@ -51,7 +55,8 @@ object ConsistentHashingRouter { @SerialVersionUID(1L) final case class ConsistentHashableEnvelope(message: Any, hashKey: Any) extends ConsistentHashable - with RouterEnvelope { + with RouterEnvelope + with WrappedMessage { override def consistentHashKey: Any = hashKey } diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ShardingMessageExtractor.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ShardingMessageExtractor.scala index 7cef8193b55..dc1f3f2b7ab 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ShardingMessageExtractor.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ShardingMessageExtractor.scala @@ -4,6 +4,7 @@ package akka.cluster.sharding.typed +import akka.actor.WrappedMessage import akka.util.unused object ShardingMessageExtractor { @@ -105,4 +106,4 @@ abstract class HashCodeNoEnvelopeMessageExtractor[M](val numberOfShards: Int) ex * The alternative way of routing messages through sharding is to not use envelopes, * and have the message types themselves carry identifiers. */ -final case class ShardingEnvelope[M](entityId: String, message: M) // TODO think if should remain a case class +final case class ShardingEnvelope[M](entityId: String, message: M) extends WrappedMessage diff --git a/akka-cluster-tools/src/main/scala/akka/cluster/pubsub/DistributedPubSubMediator.scala b/akka-cluster-tools/src/main/scala/akka/cluster/pubsub/DistributedPubSubMediator.scala index 799e9aa2d3d..5dbe7e57ecc 100644 --- a/akka-cluster-tools/src/main/scala/akka/cluster/pubsub/DistributedPubSubMediator.scala +++ b/akka-cluster-tools/src/main/scala/akka/cluster/pubsub/DistributedPubSubMediator.scala @@ -190,23 +190,32 @@ object DistributedPubSubMediator { @SerialVersionUID(1L) final case class SubscribeAck(subscribe: Subscribe) extends DeadLetterSuppression @SerialVersionUID(1L) final case class UnsubscribeAck(unsubscribe: Unsubscribe) @SerialVersionUID(1L) final case class Publish(topic: String, msg: Any, sendOneMessageToEachGroup: Boolean) - extends DistributedPubSubMessage { + extends DistributedPubSubMessage + with WrappedMessage { def this(topic: String, msg: Any) = this(topic, msg, sendOneMessageToEachGroup = false) + + override def message: Any = msg } object Publish { def apply(topic: String, msg: Any) = new Publish(topic, msg) } @SerialVersionUID(1L) final case class Send(path: String, msg: Any, localAffinity: Boolean) - extends DistributedPubSubMessage { + extends DistributedPubSubMessage + with WrappedMessage { /** * Convenience constructor with `localAffinity` false */ def this(path: String, msg: Any) = this(path, msg, localAffinity = false) + + override def message: Any = msg } @SerialVersionUID(1L) final case class SendToAll(path: String, msg: Any, allButSelf: Boolean = false) - extends DistributedPubSubMessage { + extends DistributedPubSubMessage + with WrappedMessage { def this(path: String, msg: Any) = this(path, msg, allButSelf = false) + + override def message: Any = msg } sealed abstract class GetTopics