Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Cleanup AnySupport #106

Merged
merged 1 commit into from
Dec 18, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 0 additions & 114 deletions akka-javasdk/src/main/scala/akka/javasdk/impl/AnySupport.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ import scalapb.GeneratedMessageCompanion
import scalapb.options.Scalapb
import scala.collection.compat.immutable.ArraySeq

import akka.runtime.sdk.spi.BytesPayload

/**
* INTERNAL API
*/
Expand Down Expand Up @@ -61,9 +59,6 @@ private[akka] object AnySupport {
if (typeUrl.startsWith(KalixJsonTypeUrlPrefix)) JsonTypeUrlPrefix + typeUrl.stripPrefix(KalixJsonTypeUrlPrefix)
else typeUrl

def stripJsonTypeUrlPrefix(typeUrl: String): String =
typeUrl.stripPrefix(AnySupport.JsonTypeUrlPrefix).stripPrefix(KalixJsonTypeUrlPrefix)

sealed abstract class Primitive[T: ClassTag] {
val name = fieldType.name().toLowerCase(Locale.ROOT)
val fullName = KalixPrimitive + name
Expand Down Expand Up @@ -204,115 +199,6 @@ private[akka] object AnySupport {
}
}

def extractBytes(bytes: ByteString): ByteString = bytesToPrimitive(BytesPrimitive, bytes)

// FIXME we should not need these conversions
def toSpiBytesPayload(pbAny: ScalaPbAny): BytesPayload = {
if (pbAny.typeUrl.startsWith(DefaultTypeUrlPrefix))
new BytesPayload(ByteStringUtils.toAkkaByteStringUnsafe(pbAny.value), pbAny.typeUrl)
else
new BytesPayload(decodeLengthEncodedByteArrayToAkkaByteString(pbAny.value), pbAny.typeUrl)
}

private def decodeLengthEncodedByteArrayToAkkaByteString(value: ByteString): akka.util.ByteString =
if (value.isEmpty) akka.util.ByteString.empty
else {
val codedInput = value.newCodedInput()
codedInput.readTag()
ByteStringUtils.toAkkaByteStringUnsafe(codedInput.readBytes())
}

// FIXME we should not need these conversions
def toScalaPbAny(bytesPayload: BytesPayload): ScalaPbAny = {
if (bytesPayload.contentType.startsWith(DefaultTypeUrlPrefix))
ScalaPbAny(
typeUrl = bytesPayload.contentType,
value = ByteStringUtils.toProtoByteStringUnsafe(bytesPayload.bytes))
else
ScalaPbAny(
typeUrl = bytesPayload.contentType,
value = encodeByteArray(ByteStringUtils.toProtoByteStringUnsafe(bytesPayload.bytes)))
}

private def encodeByteArray(bytes: ByteString): ByteString = {
if (bytes.isEmpty) {
ByteString.EMPTY
} else {
// Create a byte array the right size. It needs to have the tag and enough space to hold the length of the data
// (up to 5 bytes).
// Length encoding consumes 1 byte for every 7 bits of the field
val bytesLengthFieldSize = ((31 - Integer.numberOfLeadingZeros(bytes.size())) / 7) + 1
val byteArray = new Array[Byte](1 + bytesLengthFieldSize)
val stream = CodedOutputStream.newInstance(byteArray)
stream.writeTag(1, WireFormat.WIRETYPE_LENGTH_DELIMITED)
stream.writeUInt32NoTag(bytes.size())
UnsafeByteOperations.unsafeWrap(byteArray).concat(bytes)
}
}

object ByteStringUtils {
import java.nio.ByteBuffer
import akka.util.ByteString.ByteString1
import akka.util.ByteString.ByteString1C
import akka.util.{ ByteString => AkkaByteString }
import com.google.protobuf.{ ByteOutput, ByteString, UnsafeByteOperations }

def toAkkaByteStringUnsafe(bytes: ByteString): AkkaByteString = {
var out = AkkaByteString.empty
UnsafeByteOperations.unsafeWriteTo(
bytes,
new ByteOutput {
override def write(value: Byte): Unit =
out ++= AkkaByteString(value)

override def write(value: Array[Byte], offset: Int, length: Int): Unit =
out ++= AkkaByteString.fromArray(value, offset, length)

override def writeLazy(value: Array[Byte], offset: Int, length: Int): Unit =
out ++= AkkaByteString.fromArrayUnsafe(value, offset, length)

override def write(value: ByteBuffer): Unit = if (value.hasRemaining) {
out ++= AkkaByteString.fromByteBuffer(value)
}

override def writeLazy(value: ByteBuffer): Unit = {
if (value.hasRemaining) {
if (value.hasArray) {
out ++= AkkaByteString.fromArrayUnsafe(value.array(), value.arrayOffset(), value.remaining())
} else {
out ++= AkkaByteString.fromByteBuffer(value)
}
}
}
})

out
}

def toProtoByteStringUnsafe(bytes: AkkaByteString): ByteString = {
bytes match {
case _ if bytes.isEmpty =>
ByteString.EMPTY
case _: ByteString1 | _: ByteString1C =>
UnsafeByteOperations.unsafeWrap(bytes.toArrayUnsafe())
case _ =>
// zero copy, reuse the same underlying byte arrays
bytes.asByteBuffers.foldLeft(ByteString.EMPTY) { (acc, byteBuffer) =>
acc.concat(UnsafeByteOperations.unsafeWrap(byteBuffer))
}
}
}

def toProtoByteStringUnsafe(bytes: Array[Byte]): ByteString = {
if (bytes.isEmpty)
ByteString.EMPTY
else {
UnsafeByteOperations.unsafeWrap(bytes)
}
}

}

}

class AnySupport(
Expand Down
Loading