Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Test refactor #23

Open
wants to merge 20 commits into
base: develop
Choose a base branch
from
Open

Test refactor #23

wants to merge 20 commits into from

Conversation

colmsnowplow
Copy link
Contributor

No description provided.

@colmsnowplow colmsnowplow changed the base branch from develop to kinesis-sink-tests November 10, 2023 19:42
client: KinesisAsyncClient,
streamName: String,
shardCount: Int
) = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please can we have an explicit return type on this function? It helps elderly developers who don't know how to configure intellisense.


override val Timeout: FiniteDuration = 3.minutes

/** Resources which are shared across tests */
override val resource: Resource[IO, (Region, LocalStackContainer, Sink[IO])] =
override val resource: Resource[IO, (Region, LocalStackContainer, String => KinesisSinkConfig)] =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe simpler to make the return type Resource[IO, (Region, LocalStackContainer)]. And then the spec can call getKinesisSinkConfig when needed.

.region(region)
.build()
)
def getKinesisClient(endpoint: URI, region: Region): KinesisAsyncClient =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I noticed that KinesisAsyncClient has a close() method, and we never call it.

I wonder if strictly speaking neither implementation here is correct. If the close() method is important (I don't know if it is!) then we should manage the client as a resource:

def getKinesisClient(endpoint: URI, region: Region): Resource[IO, KinesisAsyncClient] =
  ???

val testStream1Name = "test-source-stream-1"

val kinesisClient = getKinesisClient(localstack.getEndpoint, region)
createAndWaitForKinesisStream(kinesisClient, testStream1Name, 1)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here you do something blocking on a thread does not expect to be blocked. To be honest I have no idea how that affects these tests, whether that's a problem or not.

You could avoid the question by dropping this down into the for block like this:

for {
  _ <- IO.blocking{ createAndWaitForKinesisStream(kinesisClient, testStream1Name, 1) }
  refProcessed <- Ref[IO].of[List[ReceivedEvents]](Nil)
  // etc

@colmsnowplow colmsnowplow changed the base branch from kinesis-sink-tests to develop November 23, 2023 17:27
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants