-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Test refactor #23
base: develop
Are you sure you want to change the base?
Test refactor #23
Changes from all commits
a70b26c
d0d6905
ea225f7
c1f1f00
d974a63
0fbd58e
d751023
c50cd12
8e25b15
aaf1ef5
db4cd5e
a9b0a01
e37a18b
5a7d459
1190eb2
9755873
9259309
614a566
19a250b
e37613b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,63 @@ | ||
/* | ||
* Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved. | ||
* | ||
* This program is licensed to you under the Snowplow Community License Version 1.0, | ||
* and you may not use this file except in compliance with the Snowplow Community License Version 1.0. | ||
* You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0 | ||
*/ | ||
package com.snowplowanalytics.snowplow.sinks.kinesis | ||
|
||
import cats.effect.{IO, Resource} | ||
import cats.effect.testing.specs2.CatsResource | ||
|
||
import scala.concurrent.duration.{DurationInt, FiniteDuration} | ||
|
||
import org.specs2.mutable.SpecificationLike | ||
|
||
import org.testcontainers.containers.localstack.LocalStackContainer | ||
|
||
import software.amazon.awssdk.regions.providers.DefaultAwsRegionProviderChain | ||
import software.amazon.awssdk.regions.Region | ||
|
||
import com.snowplowanalytics.snowplow.it.kinesis._ | ||
import com.snowplowanalytics.snowplow.sinks.Sinkable | ||
|
||
import Utils._ | ||
|
||
class KinesisSinkSpec extends CatsResource[IO, (Region, LocalStackContainer, String => KinesisSinkConfig)] with SpecificationLike { | ||
|
||
override val Timeout: FiniteDuration = 3.minutes | ||
|
||
/** Resources which are shared across tests */ | ||
override val resource: Resource[IO, (Region, LocalStackContainer, String => KinesisSinkConfig)] = | ||
for { | ||
region <- Resource.eval(IO.blocking((new DefaultAwsRegionProviderChain).getRegion)) | ||
localstack <- Localstack.resource(region, KinesisSinkSpec.getClass.getSimpleName) | ||
} yield (region, localstack, getKinesisSinkConfig(localstack.getEndpoint)(_)) | ||
|
||
override def is = s2""" | ||
KinesisSinkSpec should | ||
write to output stream $e1 | ||
""" | ||
|
||
def e1 = withResource { case (region, localstack, getKinesisSinkConfig) => | ||
val testStream1Name = "test-sink-stream-1" | ||
val testPayload = "test-payload" | ||
val testInput = List(Sinkable(testPayload.getBytes(), Some("myPk"), Map(("", "")))) | ||
|
||
val kinesisClient = getKinesisClient(localstack.getEndpoint, region) | ||
createAndWaitForKinesisStream(kinesisClient, testStream1Name, 1): Unit | ||
val testSinkResource = KinesisSink.resource[IO](getKinesisSinkConfig(testStream1Name)) | ||
|
||
for { | ||
_ <- testSinkResource.use(testSink => testSink.sink(testInput)) | ||
_ <- IO.sleep(3.seconds) | ||
result = getDataFromKinesis(kinesisClient, testStream1Name) | ||
} yield List( | ||
result.events must haveSize(1), | ||
result.events must beEqualTo(List(testPayload)) | ||
) | ||
} | ||
} | ||
|
||
object KinesisSinkSpec {} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,45 +16,48 @@ import org.specs2.mutable.SpecificationLike | |
|
||
import org.testcontainers.containers.localstack.LocalStackContainer | ||
|
||
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient | ||
import software.amazon.awssdk.regions.providers.DefaultAwsRegionProviderChain | ||
|
||
import com.snowplowanalytics.snowplow.sources.EventProcessingConfig | ||
import com.snowplowanalytics.snowplow.sources.EventProcessingConfig.NoWindowing | ||
import com.snowplowanalytics.snowplow.it.kinesis._ | ||
|
||
import java.time.Instant | ||
|
||
import Utils._ | ||
import software.amazon.awssdk.regions.Region | ||
|
||
class KinesisSourceSpec | ||
extends CatsResource[IO, (LocalStackContainer, KinesisAsyncClient, String => KinesisSourceConfig)] | ||
with SpecificationLike { | ||
import KinesisSourceSpec._ | ||
class KinesisSourceSpec extends CatsResource[IO, (Region, LocalStackContainer, String => KinesisSourceConfig)] with SpecificationLike { | ||
|
||
override val Timeout: FiniteDuration = 3.minutes | ||
|
||
/** Resources which are shared across tests */ | ||
override val resource: Resource[IO, (LocalStackContainer, KinesisAsyncClient, String => KinesisSourceConfig)] = | ||
override val resource: Resource[IO, (Region, LocalStackContainer, String => KinesisSourceConfig)] = | ||
for { | ||
region <- Resource.eval(KinesisSourceConfig.getRuntimeRegion[IO]) | ||
localstack <- Localstack.resource(region, KINESIS_INITIALIZE_STREAMS) | ||
kinesisClient <- Resource.eval(getKinesisClient(localstack.getEndpoint, region)) | ||
} yield (localstack, kinesisClient, getKinesisConfig(localstack.getEndpoint)(_)) | ||
region <- Resource.eval(IO.blocking((new DefaultAwsRegionProviderChain).getRegion)) | ||
localstack <- Localstack.resource(region, KinesisSourceSpec.getClass.getSimpleName) | ||
} yield (region, localstack, getKinesisSourceConfig(localstack.getEndpoint)(_)) | ||
|
||
override def is = s2""" | ||
KinesisSourceSpec should | ||
read from input stream $e1 | ||
""" | ||
|
||
def e1 = withResource { case (_, kinesisClient, getKinesisConfig) => | ||
val testPayload = "test-payload" | ||
def e1 = withResource { case (region, localstack, getKinesisSourceConfig) => | ||
val testPayload = "test-payload" | ||
val testStream1Name = "test-source-stream-1" | ||
|
||
val kinesisClient = getKinesisClient(localstack.getEndpoint, region) | ||
createAndWaitForKinesisStream(kinesisClient, testStream1Name, 1) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Here you do something blocking on a thread that does not expect to be blocked. To be honest I have no idea how that affects these tests, whether that's a problem or not. You could avoid the question by dropping this down into the for {
_ <- IO.blocking{ createAndWaitForKinesisStream(kinesisClient, testStream1Name, 1) }
refProcessed <- Ref[IO].of[List[ReceivedEvents]](Nil)
// etc |
||
|
||
for { | ||
|
||
refProcessed <- Ref[IO].of[List[ReceivedEvents]](Nil) | ||
t1 <- IO.realTimeInstant | ||
_ <- putDataToKinesis(kinesisClient, testStream1Name, testPayload) | ||
t2 <- IO.realTimeInstant | ||
processingConfig = new EventProcessingConfig(NoWindowing) | ||
kinesisConfig = getKinesisConfig(testStream1Name) | ||
kinesisConfig = getKinesisSourceConfig(testStream1Name) | ||
sourceAndAck <- KinesisSource.build[IO](kinesisConfig) | ||
stream = sourceAndAck.stream(processingConfig, testProcessor(refProcessed)) | ||
fiber <- stream.compile.drain.start | ||
|
@@ -71,8 +74,4 @@ class KinesisSourceSpec | |
} | ||
} | ||
|
||
object KinesisSourceSpec { | ||
val testStream1Name = "test-stream-1" | ||
val KINESIS_INITIALIZE_STREAMS: String = | ||
List(s"$testStream1Name:1").mkString(",") | ||
} | ||
object KinesisSourceSpec {} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,28 +5,70 @@ | |
* and you may not use this file except in compliance with the Snowplow Community License Version 1.0. | ||
* You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0 | ||
*/ | ||
package com.snowplowanalytics.snowplow.sources.kinesis | ||
package com.snowplowanalytics.snowplow.it.kinesis | ||
|
||
import cats.effect.{IO, Ref} | ||
|
||
import scala.concurrent.duration.FiniteDuration | ||
import scala.jdk.CollectionConverters._ | ||
|
||
import eu.timepit.refined.types.numeric.PosInt | ||
|
||
import software.amazon.awssdk.core.SdkBytes | ||
import software.amazon.awssdk.regions.Region | ||
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient | ||
import software.amazon.awssdk.services.kinesis.model.{PutRecordRequest, PutRecordResponse} | ||
import software.amazon.awssdk.services.kinesis.model.{ | ||
CreateStreamRequest, | ||
GetRecordsRequest, | ||
GetShardIteratorRequest, | ||
PutRecordRequest, | ||
PutRecordResponse | ||
} | ||
|
||
import com.snowplowanalytics.snowplow.sources.{EventProcessor, TokenedEvents} | ||
import com.snowplowanalytics.snowplow.sources.kinesis.KinesisSourceConfig | ||
import com.snowplowanalytics.snowplow.sinks.kinesis.{BackoffPolicy, KinesisSinkConfig} | ||
|
||
import java.net.URI | ||
import java.nio.charset.StandardCharsets | ||
import java.util.UUID | ||
import java.time.Instant | ||
import java.util.concurrent.TimeUnit | ||
|
||
import com.snowplowanalytics.snowplow.sources.{EventProcessor, TokenedEvents} | ||
import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest | ||
|
||
object Utils { | ||
|
||
case class ReceivedEvents(events: List[String], tstamp: Option[Instant]) | ||
|
||
def createAndWaitForKinesisStream( | ||
client: KinesisAsyncClient, | ||
streamName: String, | ||
shardCount: Int | ||
) = { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Please can we have an explicit return type on this function? It helps elderly developers who don't know how to configure intellisense. |
||
val createStreamReq = CreateStreamRequest | ||
.builder() | ||
.streamName(streamName) | ||
.shardCount(shardCount) | ||
.build() | ||
|
||
val resp = client.createStream(createStreamReq).get | ||
|
||
// Block till it's active | ||
while ( | ||
resp.sdkHttpResponse().isSuccessful() && (client | ||
.describeStream(DescribeStreamRequest.builder().streamName(streamName).build()) | ||
.get | ||
.streamDescription | ||
.streamStatusAsString != "ACTIVE") | ||
) { | ||
Thread.sleep(500) | ||
println( | ||
client.describeStream(DescribeStreamRequest.builder().streamName(streamName).build()).get.streamDescription.streamStatusAsString | ||
) | ||
} | ||
} | ||
|
||
def putDataToKinesis( | ||
client: KinesisAsyncClient, | ||
streamName: String, | ||
|
@@ -42,7 +84,51 @@ object Utils { | |
IO.blocking(client.putRecord(record).get()) | ||
} | ||
|
||
def getKinesisConfig(endpoint: URI)(streamName: String): KinesisSourceConfig = KinesisSourceConfig( | ||
/** | ||
* getDataFromKinesis gets the last 1000 records from kinesis, stringifies the data it found, and | ||
* returns a ReceivedEvents. It can be called at the end of simple tests to return data from a | ||
* Kinesis stream. | ||
* | ||
* If required in future, where more data is used we might amend it to poll the stream for data | ||
* and return everything it finds after a period without any data. | ||
*/ | ||
def getDataFromKinesis( | ||
client: KinesisAsyncClient, | ||
streamName: String | ||
): ReceivedEvents = { | ||
|
||
val descStreamResp = client.describeStream(DescribeStreamRequest.builder().streamName(streamName).build()).get | ||
|
||
// We're assuming only one shard here. | ||
// Any future test with multiple shards requires us to create one iterator per shard | ||
val shIterRequest = GetShardIteratorRequest | ||
.builder() | ||
.streamName(streamName) | ||
.shardIteratorType("TRIM_HORIZON") | ||
.shardId(descStreamResp.streamDescription.shards.get(0).shardId) | ||
.build(); | ||
|
||
val shIter = client.getShardIterator(shIterRequest).get.shardIterator | ||
|
||
val request = GetRecordsRequest | ||
.builder() | ||
.streamARN(descStreamResp.streamDescription().streamARN()) | ||
.shardIterator(shIter) | ||
.build() | ||
|
||
val out = | ||
client | ||
.getRecords(request) | ||
.get() | ||
.records() | ||
.asScala | ||
.toList | ||
.map(record => new String(record.data.asByteArray())) | ||
|
||
ReceivedEvents(out, None) | ||
} | ||
|
||
def getKinesisSourceConfig(endpoint: URI)(streamName: String): KinesisSourceConfig = KinesisSourceConfig( | ||
UUID.randomUUID().toString, | ||
streamName, | ||
KinesisSourceConfig.InitialPosition.TrimHorizon, | ||
|
@@ -53,6 +139,14 @@ object Utils { | |
Some(endpoint) | ||
) | ||
|
||
def getKinesisSinkConfig(endpoint: URI)(streamName: String): KinesisSinkConfig = KinesisSinkConfig( | ||
streamName, | ||
BackoffPolicy(FiniteDuration(1, TimeUnit.SECONDS), FiniteDuration(1, TimeUnit.SECONDS), None), | ||
1000, | ||
1000000, | ||
Some(endpoint) | ||
) | ||
|
||
def testProcessor(ref: Ref[IO, List[ReceivedEvents]]): EventProcessor[IO] = | ||
_.evalMap { case TokenedEvents(events, token, tstamp) => | ||
val parsed = events.map(byteBuffer => StandardCharsets.UTF_8.decode(byteBuffer).toString) | ||
|
@@ -61,12 +155,11 @@ object Utils { | |
} yield token | ||
} | ||
|
||
def getKinesisClient(endpoint: URI, region: Region): IO[KinesisAsyncClient] = | ||
IO( | ||
KinesisAsyncClient | ||
.builder() | ||
.endpointOverride(endpoint) | ||
.region(region) | ||
.build() | ||
) | ||
def getKinesisClient(endpoint: URI, region: Region): KinesisAsyncClient = | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I noticed that, and I wonder if strictly speaking neither implementation here is correct. If the def getKinesisClient(endpoint: URI, region: Region): Resource[IO, KinesisAsyncClient] =
??? |
||
KinesisAsyncClient | ||
.builder() | ||
.endpointOverride(endpoint) | ||
.region(region) | ||
.build() | ||
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe simpler to make the return type
Resource[IO, (Region, LocalStackContainer)]
. And then the spec can call getKinesisSinkConfig
when needed.