diff --git a/akka-javasdk-testkit/src/main/java/akka/javasdk/testkit/EventingTestKit.java b/akka-javasdk-testkit/src/main/java/akka/javasdk/testkit/EventingTestKit.java index 8bedc5a8f..86b6e3223 100644 --- a/akka-javasdk-testkit/src/main/java/akka/javasdk/testkit/EventingTestKit.java +++ b/akka-javasdk-testkit/src/main/java/akka/javasdk/testkit/EventingTestKit.java @@ -44,18 +44,39 @@ interface IncomingMessages { /** * Simulate the publishing of a raw message. * - * @param message raw bytestring to be published + * @param message raw protobuf bytestring to be published + * + * @deprecated Use publish with byte array parameter */ + @Deprecated void publish(ByteString message); /** * Simulate the publishing of a raw message. * - * @param message raw bytestring to be published + * @param message raw protobuf bytestring to be published * @param metadata associated with the message + * + * @deprecated Use publish with byte array parameter */ + @Deprecated void publish(ByteString message, Metadata metadata); + /** + * Simulate the publishing of a raw message. + * + * @param message raw byte array to be published + */ + void publish(byte[] message); + + /** + * Simulate the publishing of a raw message. + * + * @param message raw byte array to be published + * @param metadata associated with the message + */ + void publish(byte[] message, Metadata metadata); + /** * Simulate the publishing of a message. * diff --git a/akka-javasdk-testkit/src/main/scala/akka/javasdk/testkit/impl/EventingTestKitImpl.scala b/akka-javasdk-testkit/src/main/scala/akka/javasdk/testkit/impl/EventingTestKitImpl.scala index 2c684f7bc..2b31bf99e 100644 --- a/akka-javasdk-testkit/src/main/scala/akka/javasdk/testkit/impl/EventingTestKitImpl.scala +++ b/akka-javasdk-testkit/src/main/scala/akka/javasdk/testkit/impl/EventingTestKitImpl.scala @@ -302,6 +302,14 @@ private[testkit] class IncomingMessagesImpl(val sourcesHolder: ActorRef, val ser Await.result(addSource, 5.seconds) } + override def publish(message: Array[Byte]): Unit = + publish(message, SdkMetadata.EMPTY) + + override def publish(message: Array[Byte], metadata: SdkMetadata): Unit = { + val addSource = sourcesHolder.ask(SourcesHolder.Publish(ByteString.copyFrom(message), metadata))(5.seconds) + Await.result(addSource, 5.seconds) + } + override def publish(message: TestKitMessage[_]): Unit = message.getPayload match { case javaPb: GeneratedMessageV3 => publish(javaPb.toByteString, message.getMetadata) case scalaPb: GeneratedMessage => publish(scalaPb.toByteString, message.getMetadata) diff --git a/akka-javasdk-testkit/src/main/scala/akka/javasdk/testkit/impl/SourcesHolder.scala b/akka-javasdk-testkit/src/main/scala/akka/javasdk/testkit/impl/SourcesHolder.scala index b58b9a840..ddffccbcc 100644 --- a/akka-javasdk-testkit/src/main/scala/akka/javasdk/testkit/impl/SourcesHolder.scala +++ b/akka-javasdk-testkit/src/main/scala/akka/javasdk/testkit/impl/SourcesHolder.scala @@ -17,8 +17,8 @@ import scala.collection.mutable.ArrayBuffer object SourcesHolder { - case class AddSource(runningSourceProbe: RunningSourceProbe) - case class Publish(message: ByteString, metadata: SdkMetadata) + final case class AddSource(runningSourceProbe: RunningSourceProbe) + final case class Publish(message: ByteString, metadata: SdkMetadata) } class SourcesHolder extends Actor {