diff --git a/akka-javasdk/src/main/scala/akka/javasdk/impl/AnySupport.scala b/akka-javasdk/src/main/scala/akka/javasdk/impl/AnySupport.scala index 8c07e88d0..280273395 100644 --- a/akka-javasdk/src/main/scala/akka/javasdk/impl/AnySupport.scala +++ b/akka-javasdk/src/main/scala/akka/javasdk/impl/AnySupport.scala @@ -32,8 +32,6 @@ import scalapb.GeneratedMessageCompanion import scalapb.options.Scalapb import scala.collection.compat.immutable.ArraySeq -import akka.runtime.sdk.spi.BytesPayload - /** * INTERNAL API */ @@ -61,9 +59,6 @@ private[akka] object AnySupport { if (typeUrl.startsWith(KalixJsonTypeUrlPrefix)) JsonTypeUrlPrefix + typeUrl.stripPrefix(KalixJsonTypeUrlPrefix) else typeUrl - def stripJsonTypeUrlPrefix(typeUrl: String): String = - typeUrl.stripPrefix(AnySupport.JsonTypeUrlPrefix).stripPrefix(KalixJsonTypeUrlPrefix) - sealed abstract class Primitive[T: ClassTag] { val name = fieldType.name().toLowerCase(Locale.ROOT) val fullName = KalixPrimitive + name @@ -204,115 +199,6 @@ private[akka] object AnySupport { } } - def extractBytes(bytes: ByteString): ByteString = bytesToPrimitive(BytesPrimitive, bytes) - - // FIXME we should not need these conversions - def toSpiBytesPayload(pbAny: ScalaPbAny): BytesPayload = { - if (pbAny.typeUrl.startsWith(DefaultTypeUrlPrefix)) - new BytesPayload(ByteStringUtils.toAkkaByteStringUnsafe(pbAny.value), pbAny.typeUrl) - else - new BytesPayload(decodeLengthEncodedByteArrayToAkkaByteString(pbAny.value), pbAny.typeUrl) - } - - private def decodeLengthEncodedByteArrayToAkkaByteString(value: ByteString): akka.util.ByteString = - if (value.isEmpty) akka.util.ByteString.empty - else { - val codedInput = value.newCodedInput() - codedInput.readTag() - ByteStringUtils.toAkkaByteStringUnsafe(codedInput.readBytes()) - } - - // FIXME we should not need these conversions - def toScalaPbAny(bytesPayload: BytesPayload): ScalaPbAny = { - if (bytesPayload.contentType.startsWith(DefaultTypeUrlPrefix)) - ScalaPbAny( - typeUrl = bytesPayload.contentType, - value = ByteStringUtils.toProtoByteStringUnsafe(bytesPayload.bytes)) - else - ScalaPbAny( - typeUrl = bytesPayload.contentType, - value = encodeByteArray(ByteStringUtils.toProtoByteStringUnsafe(bytesPayload.bytes))) - } - - private def encodeByteArray(bytes: ByteString): ByteString = { - if (bytes.isEmpty) { - ByteString.EMPTY - } else { - // Create a byte array the right size. It needs to have the tag and enough space to hold the length of the data - // (up to 5 bytes). - // Length encoding consumes 1 byte for every 7 bits of the field - val bytesLengthFieldSize = ((31 - Integer.numberOfLeadingZeros(bytes.size())) / 7) + 1 - val byteArray = new Array[Byte](1 + bytesLengthFieldSize) - val stream = CodedOutputStream.newInstance(byteArray) - stream.writeTag(1, WireFormat.WIRETYPE_LENGTH_DELIMITED) - stream.writeUInt32NoTag(bytes.size()) - UnsafeByteOperations.unsafeWrap(byteArray).concat(bytes) - } - } - - object ByteStringUtils { - import java.nio.ByteBuffer - import akka.util.ByteString.ByteString1 - import akka.util.ByteString.ByteString1C - import akka.util.{ ByteString => AkkaByteString } - import com.google.protobuf.{ ByteOutput, ByteString, UnsafeByteOperations } - - def toAkkaByteStringUnsafe(bytes: ByteString): AkkaByteString = { - var out = AkkaByteString.empty - UnsafeByteOperations.unsafeWriteTo( - bytes, - new ByteOutput { - override def write(value: Byte): Unit = - out ++= AkkaByteString(value) - - override def write(value: Array[Byte], offset: Int, length: Int): Unit = - out ++= AkkaByteString.fromArray(value, offset, length) - - override def writeLazy(value: Array[Byte], offset: Int, length: Int): Unit = - out ++= AkkaByteString.fromArrayUnsafe(value, offset, length) - - override def write(value: ByteBuffer): Unit = if (value.hasRemaining) { - out ++= AkkaByteString.fromByteBuffer(value) - } - - override def writeLazy(value: ByteBuffer): Unit = { - if (value.hasRemaining) { - if (value.hasArray) { - out ++= AkkaByteString.fromArrayUnsafe(value.array(), value.arrayOffset(), value.remaining()) - } else { - out ++= AkkaByteString.fromByteBuffer(value) - } - } - } - }) - - out - } - - def toProtoByteStringUnsafe(bytes: AkkaByteString): ByteString = { - bytes match { - case _ if bytes.isEmpty => - ByteString.EMPTY - case _: ByteString1 | _: ByteString1C => - UnsafeByteOperations.unsafeWrap(bytes.toArrayUnsafe()) - case _ => - // zero copy, reuse the same underlying byte arrays - bytes.asByteBuffers.foldLeft(ByteString.EMPTY) { (acc, byteBuffer) => - acc.concat(UnsafeByteOperations.unsafeWrap(byteBuffer)) - } - } - } - - def toProtoByteStringUnsafe(bytes: Array[Byte]): ByteString = { - if (bytes.isEmpty) - ByteString.EMPTY - else { - UnsafeByteOperations.unsafeWrap(bytes) - } - } - - } - } class AnySupport(