From daafb61e3bb0dce9cd6feb7eace0e6c17229b658 Mon Sep 17 00:00:00 2001 From: Benoit Moriceau Date: Wed, 3 Aug 2022 16:25:58 -0700 Subject: [PATCH] Add generic test to test the per stream state behavior (#15267) * Add generic test to test the per stream state behavior * Add missing dependency * Add license * Add missing newline --- .../standard-destination-test/build.gradle | 1 + .../PerStreamStateMessageTest.java | 63 +++++++++++++++++++ 2 files changed, 64 insertions(+) create mode 100644 airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/PerStreamStateMessageTest.java diff --git a/airbyte-integrations/bases/standard-destination-test/build.gradle b/airbyte-integrations/bases/standard-destination-test/build.gradle index 79c7a2c8aa62..bc9aee6076fe 100644 --- a/airbyte-integrations/bases/standard-destination-test/build.gradle +++ b/airbyte-integrations/bases/standard-destination-test/build.gradle @@ -11,4 +11,5 @@ dependencies { implementation(enforcedPlatform('org.junit:junit-bom:5.8.2')) implementation 'org.junit.jupiter:junit-jupiter-api' implementation 'org.junit.jupiter:junit-jupiter-params' + implementation 'org.mockito:mockito-core:4.6.1' } diff --git a/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/PerStreamStateMessageTest.java b/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/PerStreamStateMessageTest.java new file mode 100644 index 000000000000..120f9f1a8729 --- /dev/null +++ b/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/PerStreamStateMessageTest.java @@ -0,0 +1,63 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.standardtest.destination; + +import io.airbyte.commons.json.Jsons; +import io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer; +import io.airbyte.protocol.models.AirbyteMessage; +import io.airbyte.protocol.models.AirbyteMessage.Type; +import io.airbyte.protocol.models.AirbyteStateMessage; +import io.airbyte.protocol.models.AirbyteStateMessage.AirbyteStateType; +import io.airbyte.protocol.models.AirbyteStreamState; +import io.airbyte.protocol.models.StreamDescriptor; +import java.util.function.Consumer; +import org.junit.jupiter.api.Test; +import org.mockito.InOrder; +import org.mockito.Mockito; + +public abstract class PerStreamStateMessageTest { + + protected abstract Consumer getMockedConsumer(); + + protected abstract FailureTrackingAirbyteMessageConsumer getMessageConsumer(); + + @Test + void ensureAllStateMessageAreEmitted() throws Exception { + final AirbyteMessage airbyteMessage1 = AirbyteMessageCreator.createStreamStateMessage("name_one", "state_one"); + final AirbyteMessage airbyteMessage2 = AirbyteMessageCreator.createStreamStateMessage("name_two", "state_two"); + final AirbyteMessage airbyteMessage3 = AirbyteMessageCreator.createStreamStateMessage("name_three", "state_three"); + final FailureTrackingAirbyteMessageConsumer messageConsumer = getMessageConsumer(); + + messageConsumer.accept(airbyteMessage1); + messageConsumer.accept(airbyteMessage2); + messageConsumer.accept(airbyteMessage3); + + final Consumer mConsumer = getMockedConsumer(); + final InOrder inOrder = Mockito.inOrder(mConsumer); + + inOrder.verify(mConsumer).accept(airbyteMessage1); + inOrder.verify(mConsumer).accept(airbyteMessage2); + inOrder.verify(mConsumer).accept(airbyteMessage3); + } + + class AirbyteMessageCreator { + + public static AirbyteMessage createStreamStateMessage(final String name, final String value) { + return new AirbyteMessage() + .withType(Type.STATE) + .withState( + new AirbyteStateMessage() + .withType(AirbyteStateType.STREAM) + .withStream( + new AirbyteStreamState() + .withStreamDescriptor( + new StreamDescriptor() + .withName(name)) + .withStreamState(Jsons.jsonNode(value)))); + } + + } + +}