From 9888afcb49b9841ac52e19fe0ff3616aedf95964 Mon Sep 17 00:00:00 2001 From: Oguzhan Unlu Date: Thu, 14 Sep 2023 14:34:07 +0300 Subject: [PATCH] test feedback --- .../snowplow/sources/KinesisSourceSpec.scala | 60 ------------------- .../snowplow/sources/kinesis/Containers.scala | 30 ++++++++++ .../sources/kinesis/KinesisSourceSpec.scala | 53 ++++++++++++++++ .../sources/{ => kinesis}/Utils.scala | 54 ++++++----------- .../sources/kinesis}/KinesisSource.scala | 21 +++---- .../kinesis}/KinesisSourceConfig.scala | 3 +- 6 files changed, 109 insertions(+), 112 deletions(-) delete mode 100644 snowplow-common-internal/kinesis/src/it/scala/com/snowplowanalytics/snowplow/sources/KinesisSourceSpec.scala create mode 100644 snowplow-common-internal/kinesis/src/it/scala/com/snowplowanalytics/snowplow/sources/kinesis/Containers.scala create mode 100644 snowplow-common-internal/kinesis/src/it/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSourceSpec.scala rename snowplow-common-internal/kinesis/src/it/scala/com/snowplowanalytics/snowplow/sources/{ => kinesis}/Utils.scala (52%) rename snowplow-common-internal/kinesis/src/main/scala/{com.snowplowanalytics.snowplow/sources => com/snowplowanalytics/snowplow/sources/kinesis}/KinesisSource.scala (95%) rename snowplow-common-internal/kinesis/src/main/scala/{com.snowplowanalytics.snowplow/sources => com/snowplowanalytics/snowplow/sources/kinesis}/KinesisSourceConfig.scala (97%) diff --git a/snowplow-common-internal/kinesis/src/it/scala/com/snowplowanalytics/snowplow/sources/KinesisSourceSpec.scala b/snowplow-common-internal/kinesis/src/it/scala/com/snowplowanalytics/snowplow/sources/KinesisSourceSpec.scala deleted file mode 100644 index c308122..0000000 --- a/snowplow-common-internal/kinesis/src/it/scala/com/snowplowanalytics/snowplow/sources/KinesisSourceSpec.scala +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved. - * - * This program is licensed to you under the Snowplow Community License Version 1.0, - * and you may not use this file except in compliance with the Snowplow Community License Version 1.0. - * You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0 - */ -package com.snowplowanalytics.snowplow.sources - -import cats.effect.testing.specs2.CatsEffect -import cats.effect.{IO, Ref} -import cats.effect.unsafe.implicits.global - -import org.slf4j.LoggerFactory - -import org.testcontainers.containers.output.Slf4jLogConsumer - -import org.specs2.mutable.Specification - -import software.amazon.awssdk.regions.Region - -import scala.annotation.nowarn -import scala.concurrent.duration.DurationInt - -import com.snowplowanalytics.snowplow.sources.EventProcessingConfig.NoWindowing -import com.snowplowanalytics.snowplow.sources.Utils._ - -@nowarn("msg=unused value of type org.specs2.specification.core.Fragment") -class KinesisSourceSpec extends Specification with CatsEffect { - - "Kinesis source" should { - "read from input stream" in { - val inputStreamName = "read-input-test-stream" - val shardCount = 1 - val localstack = getLocalstackContainer(4566, inputStreamName, shardCount) - localstack.start() - - val logger = LoggerFactory.getLogger("kinesis-source-test") - val logs = new Slf4jLogConsumer(logger) - localstack.followOutput(logs) - - val region = Region.of(localstack.getRegion) - val config = getKinesisConfig(localstack.getEndpoint, inputStreamName) - - val examplePayload = "example-payload" - - val prog = for { - refProcessed <- Ref[IO].of[List[String]](Nil) - _ <- putDataToKinesis(getKinesisClient(localstack.getEndpoint, region), inputStreamName, examplePayload) - sourceAndAck = KinesisSource.build[IO](config).stream(new EventProcessingConfig(NoWindowing), testProcessor(refProcessed)) - fiber <- sourceAndAck.compile.drain.start - _ <- IO.sleep(2.minutes) - processed <- refProcessed.get - _ <- fiber.cancel - } yield processed must contain(examplePayload) - - prog.unsafeRunSync() - } - } -} diff --git a/snowplow-common-internal/kinesis/src/it/scala/com/snowplowanalytics/snowplow/sources/kinesis/Containers.scala b/snowplow-common-internal/kinesis/src/it/scala/com/snowplowanalytics/snowplow/sources/kinesis/Containers.scala new file mode 100644 index 0000000..d326294 --- /dev/null +++ b/snowplow-common-internal/kinesis/src/it/scala/com/snowplowanalytics/snowplow/sources/kinesis/Containers.scala @@ -0,0 +1,30 @@ +/* + * Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Snowplow Community License Version 1.0, + * and you may not use this file except in compliance with the Snowplow Community License Version 1.0. + * You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0 + */ +package com.snowplowanalytics.snowplow.sources.kinesis + +import org.testcontainers.containers.localstack.LocalStackContainer +import org.testcontainers.containers.localstack.LocalStackContainer.Service +import org.testcontainers.containers.wait.strategy.Wait +import org.testcontainers.utility.DockerImageName + +object Containers { + + val LOCALSTACK_EXPOSED_PORT = 4566 + val testStream1Name = "test-stream-1" + val kinesisInitializeStreams: String = List(s"$testStream1Name:1").mkString(",") + + val localstack: LocalStackContainer = { + val localstack = new LocalStackContainer(DockerImageName.parse("localstack/localstack:2.2.0")) + localstack.withServices(Service.KINESIS) + localstack.addEnv("KINESIS_INITIALIZE_STREAMS", kinesisInitializeStreams) + localstack.addExposedPort(LOCALSTACK_EXPOSED_PORT) + localstack.setWaitStrategy(Wait.forLogMessage(".*Ready.*", 1)) + localstack + } + +} diff --git a/snowplow-common-internal/kinesis/src/it/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSourceSpec.scala b/snowplow-common-internal/kinesis/src/it/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSourceSpec.scala new file mode 100644 index 0000000..066b2fd --- /dev/null +++ b/snowplow-common-internal/kinesis/src/it/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSourceSpec.scala @@ -0,0 +1,53 @@ +/* + * Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Snowplow Community License Version 1.0, + * and you may not use this file except in compliance with the Snowplow Community License Version 1.0. + * You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0 + */ +package com.snowplowanalytics.snowplow.sources.kinesis + +import cats.effect.{IO, Ref} +import cats.effect.unsafe.implicits.global + +import org.specs2.mutable.Specification +import org.specs2.specification.BeforeAfterAll + +import scala.annotation.nowarn +import scala.concurrent.duration.DurationInt + +import Containers._ +import Utils._ + +import com.snowplowanalytics.snowplow.sources.EventProcessingConfig +import com.snowplowanalytics.snowplow.sources.EventProcessingConfig.NoWindowing + +@nowarn("msg=unused value of type org.specs2.specification.core.Fragment") +class KinesisSourceSpec extends Specification with BeforeAfterAll { + + def beforeAll(): Unit = localstack.start() + def afterAll(): Unit = localstack.stop() + + "Kinesis source" should { + "read from input stream" in { + val testPayload = "test-payload" + + val prog = for { + region <- awsRegion + refProcessed <- Ref[IO].of[List[String]](Nil) + kinesisClient <- getKinesisClient(localstack.getEndpoint, region) + _ <- putDataToKinesis(kinesisClient, Containers.testStream1Name, testPayload) + processingConfig = new EventProcessingConfig(NoWindowing) + kinesisConfig = getKinesisConfig(region) + sourceAndAck = KinesisSource.build[IO](kinesisConfig).stream(processingConfig, testProcessor(refProcessed)) + fiber <- sourceAndAck.compile.drain.start + _ <- IO.sleep(2.minutes) + processed <- refProcessed.get + _ <- fiber.cancel + } yield processed must contain(testPayload) + + prog.unsafeRunSync() + + } + } +} diff --git a/snowplow-common-internal/kinesis/src/it/scala/com/snowplowanalytics/snowplow/sources/Utils.scala b/snowplow-common-internal/kinesis/src/it/scala/com/snowplowanalytics/snowplow/sources/kinesis/Utils.scala similarity index 52% rename from snowplow-common-internal/kinesis/src/it/scala/com/snowplowanalytics/snowplow/sources/Utils.scala rename to snowplow-common-internal/kinesis/src/it/scala/com/snowplowanalytics/snowplow/sources/kinesis/Utils.scala index e9e2f94..fc95065 100644 --- a/snowplow-common-internal/kinesis/src/it/scala/com/snowplowanalytics/snowplow/sources/Utils.scala +++ b/snowplow-common-internal/kinesis/src/it/scala/com/snowplowanalytics/snowplow/sources/kinesis/Utils.scala @@ -5,15 +5,10 @@ * and you may not use this file except in compliance with the Snowplow Community License Version 1.0. * You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0 */ -package com.snowplowanalytics.snowplow.sources +package com.snowplowanalytics.snowplow.sources.kinesis import cats.effect.{IO, Ref} - -import org.testcontainers.containers.localstack.LocalStackContainer -import org.testcontainers.containers.localstack.LocalStackContainer.Service -import org.testcontainers.containers.wait.strategy.Wait -import org.testcontainers.utility.DockerImageName - +import com.snowplowanalytics.snowplow.sources.{EventProcessor, TokenedEvents} import software.amazon.awssdk.core.SdkBytes import software.amazon.awssdk.regions.Region import software.amazon.awssdk.services.kinesis.KinesisAsyncClient @@ -24,35 +19,23 @@ import java.nio.charset.StandardCharsets import java.util.UUID object Utils { - - val region: Region = Region.of("eu-central-1") + val awsRegion: IO[Region] = IO(Region.of(Containers.localstack.getRegion)) def testProcessor(ref: Ref[IO, List[String]]): EventProcessor[IO] = _.evalMap { case TokenedEvents(events, token) => for { - _ <- ref.update(_ ::: events.map(bytes => new String(bytes, StandardCharsets.UTF_8))) + _ <- ref.update(_ ::: events.map(byteBuffer => StandardCharsets.UTF_8.decode(byteBuffer).toString)) } yield token } - def getLocalstackContainer( - exposedPort: Int, - streamName: String, - shardCount: Int - ): LocalStackContainer = { - val localstack = new LocalStackContainer(DockerImageName.parse("localstack/localstack:2.2.0")) - localstack.withServices(Service.KINESIS) - localstack.addEnv("KINESIS_INITIALIZE_STREAMS", s"$streamName:$shardCount") - localstack.addExposedPort(exposedPort) - localstack.setWaitStrategy(Wait.forLogMessage(".*Ready.*", 1)) - localstack - } - - def getKinesisClient(endpoint: URI, region: Region): KinesisAsyncClient = - KinesisAsyncClient - .builder() - .endpointOverride(endpoint) - .region(region) - .build() + def getKinesisClient(endpoint: URI, region: Region): IO[KinesisAsyncClient] = + IO.blocking( + KinesisAsyncClient + .builder() + .endpointOverride(endpoint) + .region(region) + .build() + ) def putDataToKinesis( client: KinesisAsyncClient, @@ -69,18 +52,15 @@ object Utils { IO.blocking(client.putRecord(record).get()) } - def getKinesisConfig( - endpoint: URI, - streamName: String - ): KinesisSourceConfig = KinesisSourceConfig( + def getKinesisConfig(region: Region): KinesisSourceConfig = KinesisSourceConfig( UUID.randomUUID().toString, - streamName, + Containers.testStream1Name, Some(region), KinesisSourceConfig.InitPosition.TrimHorizon, KinesisSourceConfig.Retrieval.Polling(1), 1, - Some(endpoint), - Some(endpoint), - Some(endpoint) + Some(Containers.localstack.getEndpoint), + Some(Containers.localstack.getEndpoint), + Some(Containers.localstack.getEndpoint) ) } diff --git a/snowplow-common-internal/kinesis/src/main/scala/com.snowplowanalytics.snowplow/sources/KinesisSource.scala b/snowplow-common-internal/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSource.scala similarity index 95% rename from snowplow-common-internal/kinesis/src/main/scala/com.snowplowanalytics.snowplow/sources/KinesisSource.scala rename to snowplow-common-internal/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSource.scala index 83ebb8d..42ca8e4 100644 --- a/snowplow-common-internal/kinesis/src/main/scala/com.snowplowanalytics.snowplow/sources/KinesisSource.scala +++ b/snowplow-common-internal/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSource.scala @@ -5,42 +5,37 @@ * and you may not use this file except in compliance with the Snowplow Community License Version 1.0. * You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0 */ -package com.snowplowanalytics.snowplow.sources +package com.snowplowanalytics.snowplow.sources.kinesis import cats._ -import cats.implicits._ - import cats.effect.{Async, Resource, Sync} - +import cats.implicits._ import eu.timepit.refined.types.all.PosInt - import fs2.Stream import fs2.aws.kinesis.{CommittableRecord, Kinesis, KinesisConsumerSettings} - import org.typelevel.log4cats.Logger import org.typelevel.log4cats.slf4j.Slf4jLogger - import software.amazon.awssdk.regions.Region import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient import software.amazon.awssdk.services.kinesis.KinesisAsyncClient import java.net.URI -import java.util.concurrent.Semaphore import java.util.{Date, UUID} +import java.util.concurrent.Semaphore // kinesis import software.amazon.kinesis.common.{ConfigsBuilder, InitialPositionInStream, InitialPositionInStreamExtended} import software.amazon.kinesis.coordinator.Scheduler -import software.amazon.kinesis.processor.SingleStreamTracker import software.amazon.kinesis.exceptions.ShutdownException import software.amazon.kinesis.metrics.MetricsLevel -import software.amazon.kinesis.processor.ShardRecordProcessorFactory +import software.amazon.kinesis.processor.{ShardRecordProcessorFactory, SingleStreamTracker} import software.amazon.kinesis.retrieval.fanout.FanOutConfig import software.amazon.kinesis.retrieval.polling.PollingConfig // snowplow import com.snowplowanalytics.snowplow.sources.internal.{Checkpointer, LowLevelEvents, LowLevelSource} +import com.snowplowanalytics.snowplow.sources.SourceAndAck object KinesisSource { @@ -51,8 +46,8 @@ object KinesisSource { private type KinesisCheckpointer[F[_]] = Checkpointer[F, Map[String, KinesisMetadata[F]]] - implicit class RichCommitableRecord[F[_]: Sync](val cr: CommittableRecord) extends AnyVal { - def toMetadata: KinesisMetadata[F] = KinesisMetadata(cr.shardId, cr.sequenceNumber, cr.isLastInShard, cr.lastRecordSemaphore, cr.checkpoint) + implicit class RichCommitableRecord(val cr: CommittableRecord) extends AnyVal { + def toMetadata[F[_]: Sync]: KinesisMetadata[F] = KinesisMetadata(cr.shardId, cr.sequenceNumber, cr.isLastInShard, cr.lastRecordSemaphore, cr.checkpoint) } final case class KinesisMetadata[F[_]]( @@ -146,7 +141,7 @@ object KinesisSource { .toList .groupBy(_.shardId) .view - .mapValues(_.maxBy(_.sequenceNumber).toMetadata) + .mapValues(_.maxBy(_.sequenceNumber).toMetadata[F]) .toMap LowLevelEvents(chunk.toList.map(_.record.data()), ack) } diff --git a/snowplow-common-internal/kinesis/src/main/scala/com.snowplowanalytics.snowplow/sources/KinesisSourceConfig.scala b/snowplow-common-internal/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSourceConfig.scala similarity index 97% rename from snowplow-common-internal/kinesis/src/main/scala/com.snowplowanalytics.snowplow/sources/KinesisSourceConfig.scala rename to snowplow-common-internal/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSourceConfig.scala index c27a2eb..0ccfa3c 100644 --- a/snowplow-common-internal/kinesis/src/main/scala/com.snowplowanalytics.snowplow/sources/KinesisSourceConfig.scala +++ b/snowplow-common-internal/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSourceConfig.scala @@ -5,11 +5,10 @@ * and you may not use this file except in compliance with the Snowplow Community License Version 1.0. * You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0 */ -package com.snowplowanalytics.snowplow.sources +package com.snowplowanalytics.snowplow.sources.kinesis import io.circe._ import io.circe.generic.semiauto._ - import software.amazon.awssdk.regions.Region import software.amazon.awssdk.regions.providers.DefaultAwsRegionProviderChain