Skip to content

Commit

Permalink
scalafmtAll
Browse files Browse the repository at this point in the history
  • Loading branch information
colmsnowplow committed Nov 10, 2023
1 parent 17bb0ab commit 5fca90f
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ class KinesisSinkSpec extends CatsResource[IO, (Region, LocalStackContainer, Str
override val Timeout: FiniteDuration = 3.minutes

/** Resources which are shared across tests */
override val resource: Resource[IO, (Region, LocalStackContainer, String => KinesisSinkConfig )] =
override val resource: Resource[IO, (Region, LocalStackContainer, String => KinesisSinkConfig)] =
for {
region <- Resource.eval(IO.blocking((new DefaultAwsRegionProviderChain).getRegion))
localstack <- Localstack.resource(region, KinesisSinkSpec.getClass.getSimpleName)
} yield (region, localstack, getKinesisSinkConfig(localstack.getEndpoint)(_))
localstack <- Localstack.resource(region, KinesisSinkSpec.getClass.getSimpleName)
} yield (region, localstack, getKinesisSinkConfig(localstack.getEndpoint)(_))

override def is = s2"""
KinesisSinkSpec should
Expand All @@ -42,14 +42,14 @@ class KinesisSinkSpec extends CatsResource[IO, (Region, LocalStackContainer, Str

def e1 = withResource { case (region, localstack, getKinesisSinkConfig) =>
val testStream1Name = "test-sink-stream-1"
val testPayload = "test-payload"
val testInput = List(Sinkable(testPayload.getBytes(), Some("myPk"), Map(("", ""))))
val testPayload = "test-payload"
val testInput = List(Sinkable(testPayload.getBytes(), Some("myPk"), Map(("", ""))))

val kinesisClient = getKinesisClient(localstack.getEndpoint, region)
createAndWaitForKinesisStream(kinesisClient, testStream1Name, 1): Unit
val testSinkResource = KinesisSink.resource[IO](getKinesisSinkConfig(testStream1Name))

for {
for {
_ <- testSinkResource.use(testSink => testSink.sink(testInput))
_ <- IO.sleep(3.seconds)
result = getDataFromKinesis(kinesisClient, testStream1Name)
Expand All @@ -60,5 +60,4 @@ class KinesisSinkSpec extends CatsResource[IO, (Region, LocalStackContainer, Str
}
}

object KinesisSinkSpec {
}
object KinesisSinkSpec {}
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,7 @@ import java.time.Instant
import Utils._
import software.amazon.awssdk.regions.Region

class KinesisSourceSpec
extends CatsResource[IO, (Region, LocalStackContainer, String => KinesisSourceConfig)]
with SpecificationLike {
class KinesisSourceSpec extends CatsResource[IO, (Region, LocalStackContainer, String => KinesisSourceConfig)] with SpecificationLike {

override val Timeout: FiniteDuration = 3.minutes

Expand All @@ -47,7 +45,7 @@ class KinesisSourceSpec
"""

def e1 = withResource { case (region, localstack, getKinesisSourceConfig) =>
val testPayload = "test-payload"
val testPayload = "test-payload"
val testStream1Name = "test-source-stream-1"

val kinesisClient = getKinesisClient(localstack.getEndpoint, region)
Expand Down Expand Up @@ -77,5 +75,4 @@ class KinesisSourceSpec
}
}

object KinesisSourceSpec {
}
object KinesisSourceSpec {}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,13 @@ import eu.timepit.refined.types.numeric.PosInt
import software.amazon.awssdk.core.SdkBytes
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient
import software.amazon.awssdk.services.kinesis.model.{GetRecordsRequest, PutRecordRequest, PutRecordResponse, GetShardIteratorRequest, CreateStreamRequest}
import software.amazon.awssdk.services.kinesis.model.{
CreateStreamRequest,
GetRecordsRequest,
GetShardIteratorRequest,
PutRecordRequest,
PutRecordResponse
}

import com.snowplowanalytics.snowplow.sources.{EventProcessor, TokenedEvents}
import com.snowplowanalytics.snowplow.sources.kinesis.KinesisSourceConfig
Expand All @@ -37,19 +43,29 @@ object Utils {

def createAndWaitForKinesisStream(
client: KinesisAsyncClient,
streamName: String,
shardCount: Int) = {
streamName: String,
shardCount: Int
) = {
val createStreamReq = CreateStreamRequest
.builder().streamName(streamName)
.builder()
.streamName(streamName)
.shardCount(shardCount)
.build()

val resp = client.createStream(createStreamReq).get

// Block till it's active
while (resp.sdkHttpResponse().isSuccessful() && (client.describeStream(DescribeStreamRequest.builder().streamName(streamName).build()).get.streamDescription.streamStatusAsString != "ACTIVE")) {
while (
resp.sdkHttpResponse().isSuccessful() && (client
.describeStream(DescribeStreamRequest.builder().streamName(streamName).build())
.get
.streamDescription
.streamStatusAsString != "ACTIVE")
) {
Thread.sleep(500)
println(client.describeStream(DescribeStreamRequest.builder().streamName(streamName).build()).get.streamDescription.streamStatusAsString)
println(
client.describeStream(DescribeStreamRequest.builder().streamName(streamName).build()).get.streamDescription.streamStatusAsString
)
}
}

Expand Down Expand Up @@ -101,12 +117,13 @@ object Utils {
.build()

val out =
client.getRecords(request)
.get()
.records()
.asScala
.toList
.map(record => new String(record.data.asByteArray()))
client
.getRecords(request)
.get()
.records()
.asScala
.toList
.map(record => new String(record.data.asByteArray()))

ReceivedEvents(out, None)
}
Expand Down Expand Up @@ -139,10 +156,10 @@ object Utils {
}

def getKinesisClient(endpoint: URI, region: Region): KinesisAsyncClient =
KinesisAsyncClient
.builder()
.endpointOverride(endpoint)
.region(region)
.build()
KinesisAsyncClient
.builder()
.endpointOverride(endpoint)
.region(region)
.build()

}

0 comments on commit 5fca90f

Please sign in to comment.