Commit 382a4a2

use bytebuffer

oguzhanunlu committed Sep 13, 2023
1 parent 9b3db8c commit 382a4a2

Showing 6 changed files with 38 additions and 33 deletions.
Processing.scala
@@ -19,7 +19,6 @@ import org.typelevel.log4cats.slf4j.Slf4jLogger

import java.nio.charset.StandardCharsets
import scala.concurrent.duration.FiniteDuration

import com.snowplowanalytics.iglu.client.resolver.registries.{Http4sRegistryLookup, RegistryLookup}
import com.snowplowanalytics.snowplow.analytics.scalasdk.Event
import com.snowplowanalytics.snowplow.badrows.{BadRow, Processor => BadRowProcessor}
@@ -28,6 +27,8 @@ import com.snowplowanalytics.snowplow.sources.{EventProcessingConfig, EventProce
import com.snowplowanalytics.snowplow.lakes.Environment
import com.snowplowanalytics.snowplow.loaders.{NonAtomicFields, SchemaSubVersion, TabledEntity, Transform, TypedTabledEntity}

import java.nio.ByteBuffer

object Processing {

private implicit def logger[F[_]: Sync] = Slf4jLogger.getLogger[F]
@@ -128,12 +129,12 @@ object Processing {
}
}.drain

private def rememberTokens[F[_]: Functor](ref: Ref[F, WindowState]): Pipe[F, TokenedEvents, List[Array[Byte]]] =
private def rememberTokens[F[_]: Functor](ref: Ref[F, WindowState]): Pipe[F, TokenedEvents, List[ByteBuffer]] =
_.evalMap { case TokenedEvents(events, token) =>
ref.update(state => state.copy(tokens = token :: state.tokens)).as(events)
}

private def incrementReceivedCount[F[_]](env: Environment[F]): Pipe[F, List[Array[Byte]], List[Array[Byte]]] =
private def incrementReceivedCount[F[_]](env: Environment[F]): Pipe[F, List[ByteBuffer], List[ByteBuffer]] =
_.evalTap { events =>
env.metrics.addReceived(events.size)
}
@@ -151,17 +152,18 @@ object Processing {
private def parseBytes[F[_]: Async](
env: Environment[F],
processor: BadRowProcessor
): Pipe[F, List[Array[Byte]], (List[BadRow], Batched)] =
): Pipe[F, List[ByteBuffer], (List[BadRow], Batched)] =
_.parEvalMapUnordered(env.cpuParallelism) { list =>
list
.traverse { bytes =>
.traverse { byteBuffer =>
Applicative[F].pure {
val stringified = new String(bytes, StandardCharsets.UTF_8)
val stringified = StandardCharsets.UTF_8.decode(byteBuffer).toString
Event
.parse(stringified)
.map(event => Parsed(event, bytes.size, TabledEntity.forEvent(event)))
.map(event => Parsed(event, byteBuffer.array().length, TabledEntity.forEvent(event)))
.leftMap { failure =>
val payload = BadRowRawPayload(stringified)

BadRow.LoaderParsingError(processor, failure, payload)
}
}
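For context on the parseBytes change above: decoding straight from the ByteBuffer avoids an intermediate Array[Byte] copy, but decode() advances the buffer's position to its limit, and array().length reports the backing array's capacity, which matches the payload size only when the buffer wraps a whole array. A standalone sketch (not part of the commit, with a made-up payload) of the decoding pattern:

import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

object DecodeSketch extends App {
  // Hypothetical payload; in the loader the buffer comes from the source stream.
  val payload: ByteBuffer = ByteBuffer.wrap("example-event\tweb\t2023-09-13".getBytes(StandardCharsets.UTF_8))

  // Take the size before decoding: decode() moves the position to the limit.
  val sizeInBytes = payload.remaining()

  // Decode the remaining bytes directly from the buffer.
  val stringified = StandardCharsets.UTF_8.decode(payload).toString

  println(s"$sizeInBytes bytes -> $stringified")
}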
KafkaSource.scala
@@ -10,13 +10,17 @@ package com.snowplowanalytics.snowplow.sources
import cats.Applicative
import cats.kernel.Semigroup
import cats.implicits._
import cats.effect.{Async, Sync}
import cats.effect.{Async, Resource, Sync}

import fs2.Stream

import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger

import java.nio.ByteBuffer

// kafka
import fs2.kafka.{CommittableConsumerRecord, ConsumerSettings, KafkaConsumer}
import fs2.kafka._
import org.apache.kafka.common.TopicPartition

// snowplow
@@ -65,7 +69,7 @@ object KafkaSource {
.map(joinPartitions[F](_))
}

private type PartitionedStreams[F[_]] = Map[TopicPartition, Stream[F, CommittableConsumerRecord[F, Array[Byte], Array[Byte]]]]
private type PartitionedStreams[F[_]] = Map[TopicPartition, Stream[F, CommittableConsumerRecord[F, Array[Byte], ByteBuffer]]]

private def joinPartitions[F[_]: Async](
partitioned: PartitionedStreams[F]
@@ -112,13 +116,16 @@
.sorted
.mkString(",")

private def consumerSettings[F[_]: Async](config: KafkaSourceConfig): ConsumerSettings[F, Array[Byte], Array[Byte]] =
ConsumerSettings[F, Array[Byte], Array[Byte]]
private implicit def byteBufferDeserializer[F[_]: Sync]: Resource[F, ValueDeserializer[F, ByteBuffer]] =
Resource.pure(Deserializer.lift(arr => Sync[F].pure(ByteBuffer.wrap(arr))))

private def consumerSettings[F[_]: Async](config: KafkaSourceConfig): ConsumerSettings[F, Array[Byte], ByteBuffer] =
ConsumerSettings[F, Array[Byte], ByteBuffer]
.withBootstrapServers(config.bootstrapServers)
.withProperties(config.consumerConf)
.withEnableAutoCommit(false)
.withProperties(
("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer"),
("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
("value.deserializer", "org.apache.kafka.common.serialization.ByteBufferDeserializer")
)
}
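As a usage illustration only (the topic name is hypothetical and not part of this commit), a consumer built from the settings above yields records whose values are already typed as ByteBuffer, matching the new List[ByteBuffer] payload type:

import cats.effect.Async
import fs2.Stream
import fs2.kafka._
import java.nio.ByteBuffer

object ConsumeSketch {
  def consume[F[_]: Async](settings: ConsumerSettings[F, Array[Byte], ByteBuffer]): Stream[F, ByteBuffer] =
    KafkaConsumer
      .stream(settings)
      .subscribeTo("enriched-events") // hypothetical topic name
      .records
      .map(_.record.value) // each record value is already a ByteBuffer
}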
KinesisSource.scala
@@ -25,8 +25,6 @@ import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient

import scala.collection.mutable.ArrayBuffer

import java.net.URI
import java.util.concurrent.Semaphore
import java.util.{Date, UUID}
@@ -53,8 +51,9 @@ object KinesisSource {

private type KinesisCheckpointer[F[_]] = Checkpointer[F, Map[String, KinesisMetadata[F]]]

private def toMetadata[F[_]: Sync]: CommittableRecord => KinesisMetadata[F] = cr =>
KinesisMetadata(cr.shardId, cr.sequenceNumber, cr.isLastInShard, cr.lastRecordSemaphore, cr.checkpoint)
implicit class RichCommitableRecord[F[_]: Sync](val cr: CommittableRecord) extends AnyVal {
def toMetadata: KinesisMetadata[F] = KinesisMetadata(cr.shardId, cr.sequenceNumber, cr.isLastInShard, cr.lastRecordSemaphore, cr.checkpoint)
}

final case class KinesisMetadata[F[_]](
shardId: String,
@@ -147,21 +146,12 @@ object KinesisSource {
.toList
.groupBy(_.shardId)
.view
.mapValues(_.maxBy(_.sequenceNumber))
.mapValues(toMetadata)
.mapValues(_.maxBy(_.sequenceNumber).toMetadata)
.toMap
LowLevelEvents(chunk.toList.map(getPayload), ack)
LowLevelEvents(chunk.toList.map(_.record.data()), ack)
}
}

def getPayload(record: CommittableRecord): Array[Byte] = {
val data = record.record.data
val buffer = ArrayBuffer[Byte]()
while (data.hasRemaining)
buffer.append(data.get)
buffer.toArray
}

private def scheduler[F[_]: Sync](
kinesisClient: KinesisAsyncClient,
dynamoDbClient: DynamoDbAsyncClient,
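The removed getPayload helper copied each KCL record's ByteBuffer into an Array[Byte] one byte at a time; after this change record.data() is passed through as-is. If raw bytes are still needed somewhere downstream, a standalone sketch (not part of the commit) of a safe conversion, since array() exposes the whole backing array and is not available on read-only or direct buffers:

import java.nio.ByteBuffer

object ByteBufferBytes {
  // Copy only the readable bytes (position until limit); work on a duplicate so the
  // original buffer's position is left untouched.
  def toBytes(buffer: ByteBuffer): Array[Byte] = {
    val out = new Array[Byte](buffer.remaining())
    buffer.duplicate().get(out)
    out
  }
}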
PubsubSource.scala
@@ -17,6 +17,8 @@ import fs2.Stream
import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger

import java.nio.ByteBuffer

// pubsub
import com.google.api.core.{ApiFutures, ApiService}
import com.google.api.gax.batching.FlowControlSettings
@@ -72,7 +74,7 @@ object PubsubSource {
}
}

private case class SingleMessage[F[_]](message: Array[Byte], ackReply: AckReplyConsumerWithResponse)
private case class SingleMessage[F[_]](message: ByteBuffer, ackReply: AckReplyConsumerWithResponse)

private def pubsubStream[F[_]: Async](config: PubsubSourceConfig): Stream[F, LowLevelEvents[List[AckReplyConsumerWithResponse]]] = {
val resources = for {
@@ -94,7 +96,7 @@
LowLevelEvents(events, acks)
}
.evalTap { case LowLevelEvents(events, _) =>
val numBytes = events.map(_.size).sum
val numBytes = events.map(_.array().length).sum
semaphore.releaseN(numBytes.toLong)
}
.interruptWhen(sig)
@@ -178,7 +180,7 @@ object PubsubSource {
new MessageReceiverWithAckResponse {
def receiveMessage(message: PubsubMessage, ackReply: AckReplyConsumerWithResponse): Unit = {
val put = semaphore.acquireN(message.getData.size.toLong) *>
queue.offer(SingleMessage(message.getData.toByteArray, ackReply))
queue.offer(SingleMessage(ByteBuffer.wrap(message.getData.toByteArray), ackReply))

val io = put.race(sig.get)
.flatMap {
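The commit keeps the existing copy (getData.toByteArray) and wraps it in a ByteBuffer; protobuf's ByteString also offers a read-only view that would avoid the copy. The variant below is sketched only as a design alternative, not what the commit does, and it presumes downstream code never calls array() on the buffer (as the parseBytes sizing above does), since the view is read-only:

import java.nio.ByteBuffer
import com.google.pubsub.v1.PubsubMessage

object PubsubPayloadSketch {
  // Zero-copy, read-only view over the message payload; array() would throw here.
  def payloadOf(message: PubsubMessage): ByteBuffer =
    message.getData.asReadOnlyByteBuffer()
}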
TokenedEvents.scala
@@ -9,6 +9,8 @@ package com.snowplowanalytics.snowplow.sources

import cats.effect.kernel.Unique

import java.nio.ByteBuffer

/**
 * The events as they are fed into an [[EventProcessor]]
*
@@ -19,4 +21,4 @@ import cats.effect.kernel.Unique
* When the [[EventProcessor]] emits the token, it is an instruction to the [[SourceAndAck]] to
* ack/checkpoint the events.
*/
case class TokenedEvents(events: List[Array[Byte]], ack: Unique.Token)
case class TokenedEvents(events: List[ByteBuffer], ack: Unique.Token)
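A minimal sketch (the pipe shape and names are assumptions, not part of this commit) of a processor consuming the new ByteBuffer payloads and emitting the token, which per the doc comment above tells the [[SourceAndAck]] to ack/checkpoint:

import java.nio.charset.StandardCharsets
import cats.effect.kernel.Unique
import fs2.{Pipe, Stream}
import com.snowplowanalytics.snowplow.sources.TokenedEvents

object ProcessorSketch {
  def decodeAndAck[F[_]]: Pipe[F, TokenedEvents, Unique.Token] =
    _.flatMap { case TokenedEvents(events, token) =>
      // Decode each buffer directly; no intermediate Array[Byte] is materialised.
      val decoded = events.map(buf => StandardCharsets.UTF_8.decode(buf).toString)
      // ... hand `decoded` to the real processing logic here ...
      Stream.emit(token) // emitting the token acks/checkpoints this batch upstream
    }
}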
LowLevelEvents.scala
@@ -7,10 +7,12 @@
*/
package com.snowplowanalytics.snowplow.sources.internal

import java.nio.ByteBuffer

/**
* The events and checkpointable item emitted by a LowLevelSource
*
* This library uses LowLevelEvents internally, but it is never exposed to the high level event
* processor
*/
case class LowLevelEvents[C](events: List[Array[Byte]], ack: C)
case class LowLevelEvents[C](events: List[ByteBuffer], ack: C)
