Skip to content

Commit

Permalink
test feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
oguzhanunlu committed Sep 14, 2023
1 parent 382a4a2 commit 9888afc
Show file tree
Hide file tree
Showing 6 changed files with 109 additions and 112 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Snowplow Community License Version 1.0,
* and you may not use this file except in compliance with the Snowplow Community License Version 1.0.
* You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0
*/
package com.snowplowanalytics.snowplow.sources.kinesis

import org.testcontainers.containers.localstack.LocalStackContainer
import org.testcontainers.containers.localstack.LocalStackContainer.Service
import org.testcontainers.containers.wait.strategy.Wait
import org.testcontainers.utility.DockerImageName

object Containers {

  // Single port behind which localstack multiplexes every emulated AWS service.
  val LOCALSTACK_EXPOSED_PORT = 4566

  // Kinesis stream that localstack pre-creates for the integration specs.
  val testStream1Name = "test-stream-1"

  // "<streamName>:<shardCount>" entries consumed by localstack's
  // KINESIS_INITIALIZE_STREAMS bootstrap variable.
  val kinesisInitializeStreams: String = List(s"$testStream1Name:1").mkString(",")

  /**
   * Shared localstack container exposing a Kinesis emulation.
   * Lifecycle (start/stop) is driven by the specs' BeforeAfterAll hooks;
   * readiness is detected from the container's "Ready" log line.
   */
  val localstack: LocalStackContainer = {
    val container = new LocalStackContainer(DockerImageName.parse("localstack/localstack:2.2.0"))
    container.withServices(Service.KINESIS)
    container.addEnv("KINESIS_INITIALIZE_STREAMS", kinesisInitializeStreams)
    container.addExposedPort(LOCALSTACK_EXPOSED_PORT)
    container.setWaitStrategy(Wait.forLogMessage(".*Ready.*", 1))
    container
  }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Snowplow Community License Version 1.0,
* and you may not use this file except in compliance with the Snowplow Community License Version 1.0.
* You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0
*/
package com.snowplowanalytics.snowplow.sources.kinesis

import cats.effect.{IO, Ref}
import cats.effect.unsafe.implicits.global

import org.specs2.mutable.Specification
import org.specs2.specification.BeforeAfterAll

import scala.annotation.nowarn
import scala.concurrent.duration.DurationInt

import Containers._
import Utils._

import com.snowplowanalytics.snowplow.sources.EventProcessingConfig
import com.snowplowanalytics.snowplow.sources.EventProcessingConfig.NoWindowing

@nowarn("msg=unused value of type org.specs2.specification.core.Fragment")
class KinesisSourceSpec extends Specification with BeforeAfterAll {

  /** Starts the shared localstack container before any example runs. */
  def beforeAll(): Unit = localstack.start()

  /** Stops the shared localstack container after all examples have run. */
  def afterAll(): Unit = localstack.stop()

  /**
   * Polls `ref` once per second until the processor has recorded at least one
   * payload, or until `attemptsLeft` polls have elapsed (default 120 ~= the
   * original 2-minute deadline).
   *
   * Replaces an unconditional `IO.sleep(2.minutes)`: the spec now finishes as
   * soon as the record is consumed instead of always paying the full timeout,
   * while keeping the same worst-case wait before asserting.
   */
  private def waitForProcessed(ref: Ref[IO, List[String]], attemptsLeft: Int = 120): IO[List[String]] =
    ref.get.flatMap {
      case Nil if attemptsLeft > 0 => IO.sleep(1.second) >> waitForProcessed(ref, attemptsLeft - 1)
      case processed               => IO.pure(processed)
    }

  "Kinesis source" should {
    "read from input stream" in {
      val testPayload = "test-payload"

      val prog = for {
        region        <- awsRegion
        refProcessed  <- Ref[IO].of[List[String]](Nil)
        kinesisClient <- getKinesisClient(localstack.getEndpoint, region)
        _             <- putDataToKinesis(kinesisClient, Containers.testStream1Name, testPayload)
        processingConfig = new EventProcessingConfig(NoWindowing)
        kinesisConfig    = getKinesisConfig(region)
        sourceAndAck     = KinesisSource.build[IO](kinesisConfig).stream(processingConfig, testProcessor(refProcessed))
        // Run the source in the background while we wait for the payload to show up.
        fiber     <- sourceAndAck.compile.drain.start
        processed <- waitForProcessed(refProcessed)
        _         <- fiber.cancel
      } yield processed must contain(testPayload)

      prog.unsafeRunSync()

    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,10 @@
* and you may not use this file except in compliance with the Snowplow Community License Version 1.0.
* You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0
*/
package com.snowplowanalytics.snowplow.sources
package com.snowplowanalytics.snowplow.sources.kinesis

import cats.effect.{IO, Ref}

import org.testcontainers.containers.localstack.LocalStackContainer
import org.testcontainers.containers.localstack.LocalStackContainer.Service
import org.testcontainers.containers.wait.strategy.Wait
import org.testcontainers.utility.DockerImageName

import com.snowplowanalytics.snowplow.sources.{EventProcessor, TokenedEvents}
import software.amazon.awssdk.core.SdkBytes
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient
Expand All @@ -24,35 +19,23 @@ import java.nio.charset.StandardCharsets
import java.util.UUID

object Utils {

val region: Region = Region.of("eu-central-1")
val awsRegion: IO[Region] = IO(Region.of(Containers.localstack.getRegion))

def testProcessor(ref: Ref[IO, List[String]]): EventProcessor[IO] =
_.evalMap { case TokenedEvents(events, token) =>
for {
_ <- ref.update(_ ::: events.map(bytes => new String(bytes, StandardCharsets.UTF_8)))
_ <- ref.update(_ ::: events.map(byteBuffer => StandardCharsets.UTF_8.decode(byteBuffer).toString))
} yield token
}

def getLocalstackContainer(
exposedPort: Int,
streamName: String,
shardCount: Int
): LocalStackContainer = {
val localstack = new LocalStackContainer(DockerImageName.parse("localstack/localstack:2.2.0"))
localstack.withServices(Service.KINESIS)
localstack.addEnv("KINESIS_INITIALIZE_STREAMS", s"$streamName:$shardCount")
localstack.addExposedPort(exposedPort)
localstack.setWaitStrategy(Wait.forLogMessage(".*Ready.*", 1))
localstack
}

def getKinesisClient(endpoint: URI, region: Region): KinesisAsyncClient =
KinesisAsyncClient
.builder()
.endpointOverride(endpoint)
.region(region)
.build()
def getKinesisClient(endpoint: URI, region: Region): IO[KinesisAsyncClient] =
IO.blocking(
KinesisAsyncClient
.builder()
.endpointOverride(endpoint)
.region(region)
.build()
)

def putDataToKinesis(
client: KinesisAsyncClient,
Expand All @@ -69,18 +52,15 @@ object Utils {
IO.blocking(client.putRecord(record).get())
}

def getKinesisConfig(
endpoint: URI,
streamName: String
): KinesisSourceConfig = KinesisSourceConfig(
def getKinesisConfig(region: Region): KinesisSourceConfig = KinesisSourceConfig(
UUID.randomUUID().toString,
streamName,
Containers.testStream1Name,
Some(region),
KinesisSourceConfig.InitPosition.TrimHorizon,
KinesisSourceConfig.Retrieval.Polling(1),
1,
Some(endpoint),
Some(endpoint),
Some(endpoint)
Some(Containers.localstack.getEndpoint),
Some(Containers.localstack.getEndpoint),
Some(Containers.localstack.getEndpoint)
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,42 +5,37 @@
* and you may not use this file except in compliance with the Snowplow Community License Version 1.0.
* You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0
*/
package com.snowplowanalytics.snowplow.sources
package com.snowplowanalytics.snowplow.sources.kinesis

import cats._
import cats.implicits._

import cats.effect.{Async, Resource, Sync}

import cats.implicits._
import eu.timepit.refined.types.all.PosInt

import fs2.Stream
import fs2.aws.kinesis.{CommittableRecord, Kinesis, KinesisConsumerSettings}

import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger

import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient

import java.net.URI
import java.util.concurrent.Semaphore
import java.util.{Date, UUID}
import java.util.concurrent.Semaphore

// kinesis
import software.amazon.kinesis.common.{ConfigsBuilder, InitialPositionInStream, InitialPositionInStreamExtended}
import software.amazon.kinesis.coordinator.Scheduler
import software.amazon.kinesis.processor.SingleStreamTracker
import software.amazon.kinesis.exceptions.ShutdownException
import software.amazon.kinesis.metrics.MetricsLevel
import software.amazon.kinesis.processor.ShardRecordProcessorFactory
import software.amazon.kinesis.processor.{ShardRecordProcessorFactory, SingleStreamTracker}
import software.amazon.kinesis.retrieval.fanout.FanOutConfig
import software.amazon.kinesis.retrieval.polling.PollingConfig

// snowplow
import com.snowplowanalytics.snowplow.sources.internal.{Checkpointer, LowLevelEvents, LowLevelSource}
import com.snowplowanalytics.snowplow.sources.SourceAndAck

object KinesisSource {

Expand All @@ -51,8 +46,8 @@ object KinesisSource {

private type KinesisCheckpointer[F[_]] = Checkpointer[F, Map[String, KinesisMetadata[F]]]

implicit class RichCommitableRecord[F[_]: Sync](val cr: CommittableRecord) extends AnyVal {
def toMetadata: KinesisMetadata[F] = KinesisMetadata(cr.shardId, cr.sequenceNumber, cr.isLastInShard, cr.lastRecordSemaphore, cr.checkpoint)
implicit class RichCommitableRecord(val cr: CommittableRecord) extends AnyVal {
def toMetadata[F[_]: Sync]: KinesisMetadata[F] = KinesisMetadata(cr.shardId, cr.sequenceNumber, cr.isLastInShard, cr.lastRecordSemaphore, cr.checkpoint)
}

final case class KinesisMetadata[F[_]](
Expand Down Expand Up @@ -146,7 +141,7 @@ object KinesisSource {
.toList
.groupBy(_.shardId)
.view
.mapValues(_.maxBy(_.sequenceNumber).toMetadata)
.mapValues(_.maxBy(_.sequenceNumber).toMetadata[F])
.toMap
LowLevelEvents(chunk.toList.map(_.record.data()), ack)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,10 @@
* and you may not use this file except in compliance with the Snowplow Community License Version 1.0.
* You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0
*/
package com.snowplowanalytics.snowplow.sources
package com.snowplowanalytics.snowplow.sources.kinesis

import io.circe._
import io.circe.generic.semiauto._

import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.regions.providers.DefaultAwsRegionProviderChain

Expand Down

0 comments on commit 9888afc

Please sign in to comment.