Skip to content

Commit

Permalink
chore: Cleanup AnySupport (#106)
Browse files Browse the repository at this point in the history
* some stuff not used any more
  • Loading branch information
patriknw authored Dec 18, 2024
1 parent 273ac52 commit 51f8997
Showing 1 changed file with 0 additions and 114 deletions.
114 changes: 0 additions & 114 deletions akka-javasdk/src/main/scala/akka/javasdk/impl/AnySupport.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ import scalapb.GeneratedMessageCompanion
import scalapb.options.Scalapb
import scala.collection.compat.immutable.ArraySeq

import akka.runtime.sdk.spi.BytesPayload

/**
* INTERNAL API
*/
Expand Down Expand Up @@ -61,9 +59,6 @@ private[akka] object AnySupport {
if (typeUrl.startsWith(KalixJsonTypeUrlPrefix)) JsonTypeUrlPrefix + typeUrl.stripPrefix(KalixJsonTypeUrlPrefix)
else typeUrl

def stripJsonTypeUrlPrefix(typeUrl: String): String =
typeUrl.stripPrefix(AnySupport.JsonTypeUrlPrefix).stripPrefix(KalixJsonTypeUrlPrefix)

sealed abstract class Primitive[T: ClassTag] {
val name = fieldType.name().toLowerCase(Locale.ROOT)
val fullName = KalixPrimitive + name
Expand Down Expand Up @@ -204,115 +199,6 @@ private[akka] object AnySupport {
}
}

def extractBytes(bytes: ByteString): ByteString = bytesToPrimitive(BytesPrimitive, bytes)

// FIXME we should not need these conversions
def toSpiBytesPayload(pbAny: ScalaPbAny): BytesPayload = {
if (pbAny.typeUrl.startsWith(DefaultTypeUrlPrefix))
new BytesPayload(ByteStringUtils.toAkkaByteStringUnsafe(pbAny.value), pbAny.typeUrl)
else
new BytesPayload(decodeLengthEncodedByteArrayToAkkaByteString(pbAny.value), pbAny.typeUrl)
}

private def decodeLengthEncodedByteArrayToAkkaByteString(value: ByteString): akka.util.ByteString =
if (value.isEmpty) akka.util.ByteString.empty
else {
val codedInput = value.newCodedInput()
codedInput.readTag()
ByteStringUtils.toAkkaByteStringUnsafe(codedInput.readBytes())
}

// FIXME we should not need these conversions
def toScalaPbAny(bytesPayload: BytesPayload): ScalaPbAny = {
if (bytesPayload.contentType.startsWith(DefaultTypeUrlPrefix))
ScalaPbAny(
typeUrl = bytesPayload.contentType,
value = ByteStringUtils.toProtoByteStringUnsafe(bytesPayload.bytes))
else
ScalaPbAny(
typeUrl = bytesPayload.contentType,
value = encodeByteArray(ByteStringUtils.toProtoByteStringUnsafe(bytesPayload.bytes)))
}

private def encodeByteArray(bytes: ByteString): ByteString = {
if (bytes.isEmpty) {
ByteString.EMPTY
} else {
// Create a byte array the right size. It needs to have the tag and enough space to hold the length of the data
// (up to 5 bytes).
// Length encoding consumes 1 byte for every 7 bits of the field
val bytesLengthFieldSize = ((31 - Integer.numberOfLeadingZeros(bytes.size())) / 7) + 1
val byteArray = new Array[Byte](1 + bytesLengthFieldSize)
val stream = CodedOutputStream.newInstance(byteArray)
stream.writeTag(1, WireFormat.WIRETYPE_LENGTH_DELIMITED)
stream.writeUInt32NoTag(bytes.size())
UnsafeByteOperations.unsafeWrap(byteArray).concat(bytes)
}
}

object ByteStringUtils {
import java.nio.ByteBuffer
import akka.util.ByteString.ByteString1
import akka.util.ByteString.ByteString1C
import akka.util.{ ByteString => AkkaByteString }
import com.google.protobuf.{ ByteOutput, ByteString, UnsafeByteOperations }

def toAkkaByteStringUnsafe(bytes: ByteString): AkkaByteString = {
var out = AkkaByteString.empty
UnsafeByteOperations.unsafeWriteTo(
bytes,
new ByteOutput {
override def write(value: Byte): Unit =
out ++= AkkaByteString(value)

override def write(value: Array[Byte], offset: Int, length: Int): Unit =
out ++= AkkaByteString.fromArray(value, offset, length)

override def writeLazy(value: Array[Byte], offset: Int, length: Int): Unit =
out ++= AkkaByteString.fromArrayUnsafe(value, offset, length)

override def write(value: ByteBuffer): Unit = if (value.hasRemaining) {
out ++= AkkaByteString.fromByteBuffer(value)
}

override def writeLazy(value: ByteBuffer): Unit = {
if (value.hasRemaining) {
if (value.hasArray) {
out ++= AkkaByteString.fromArrayUnsafe(value.array(), value.arrayOffset(), value.remaining())
} else {
out ++= AkkaByteString.fromByteBuffer(value)
}
}
}
})

out
}

def toProtoByteStringUnsafe(bytes: AkkaByteString): ByteString = {
bytes match {
case _ if bytes.isEmpty =>
ByteString.EMPTY
case _: ByteString1 | _: ByteString1C =>
UnsafeByteOperations.unsafeWrap(bytes.toArrayUnsafe())
case _ =>
// zero copy, reuse the same underlying byte arrays
bytes.asByteBuffers.foldLeft(ByteString.EMPTY) { (acc, byteBuffer) =>
acc.concat(UnsafeByteOperations.unsafeWrap(byteBuffer))
}
}
}

def toProtoByteStringUnsafe(bytes: Array[Byte]): ByteString = {
if (bytes.isEmpty)
ByteString.EMPTY
else {
UnsafeByteOperations.unsafeWrap(bytes)
}
}

}

}

class AnySupport(
Expand Down

0 comments on commit 51f8997

Please sign in to comment.