From 964b2263f78db67e987c5193a3c9a487ab08926e Mon Sep 17 00:00:00 2001 From: Yevhen Sukhomud Date: Fri, 5 Aug 2022 15:24:44 +0700 Subject: [PATCH] Update Kinesis destination to use outputRecordCollector to properly store state (#15348) * Update Kinesis destination to use outputRecordCollector to properly store state * Bump version * auto-bump connector version [ci skip] Co-authored-by: Octavia Squidington III --- .../seed/destination_definitions.yaml | 2 +- .../resources/seed/destination_specs.yaml | 2 +- .../connectors/destination-kinesis/Dockerfile | 2 +- .../destination-kinesis/build.gradle | 1 + .../kinesis/KinesisDestination.java | 11 +++-- .../kinesis/KinesisMessageConsumer.java | 17 +++---- .../kinesis/KinesisRecordConsumerTest.java | 47 +++++++++++++++++++ 7 files changed, 64 insertions(+), 18 deletions(-) create mode 100644 airbyte-integrations/connectors/destination-kinesis/src/test/java/io/airbyte/integrations/destination/kinesis/KinesisRecordConsumerTest.java diff --git a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml index df851ce5587f..05be34b8d059 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml @@ -140,7 +140,7 @@ - name: Kinesis destinationDefinitionId: 6d1d66d4-26ab-4602-8d32-f85894b04955 dockerRepository: airbyte/destination-kinesis - dockerImageTag: 0.1.3 + dockerImageTag: 0.1.4 documentationUrl: https://docs.airbyte.io/integrations/destinations/kinesis icon: kinesis.svg releaseStage: alpha diff --git a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml index db3f3c3c91de..d07d6934f191 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml @@ -2296,7 +2296,7 @@ supportsDBT: false supported_destination_sync_modes: - "append" -- dockerImage: "airbyte/destination-kinesis:0.1.3" +- dockerImage: "airbyte/destination-kinesis:0.1.4" spec: documentationUrl: "https://docs.airbyte.io/integrations/destinations/kinesis" connectionSpecification: diff --git a/airbyte-integrations/connectors/destination-kinesis/Dockerfile b/airbyte-integrations/connectors/destination-kinesis/Dockerfile index 4513fa652bb8..05a5d4bd26d6 100644 --- a/airbyte-integrations/connectors/destination-kinesis/Dockerfile +++ b/airbyte-integrations/connectors/destination-kinesis/Dockerfile @@ -16,5 +16,5 @@ ENV APPLICATION destination-kinesis COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=0.1.3 +LABEL io.airbyte.version=0.1.4 LABEL io.airbyte.name=airbyte/destination-kinesis diff --git a/airbyte-integrations/connectors/destination-kinesis/build.gradle b/airbyte-integrations/connectors/destination-kinesis/build.gradle index 5861e1f4a3f2..b92d73d3f8d4 100644 --- a/airbyte-integrations/connectors/destination-kinesis/build.gradle +++ b/airbyte-integrations/connectors/destination-kinesis/build.gradle @@ -24,6 +24,7 @@ dependencies { testImplementation "org.assertj:assertj-core:${assertVersion}" testImplementation "org.testcontainers:localstack:${testContainersVersion}" + testImplementation project(':airbyte-integrations:bases:standard-destination-test') integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-destination-test') diff --git a/airbyte-integrations/connectors/destination-kinesis/src/main/java/io/airbyte/integrations/destination/kinesis/KinesisDestination.java b/airbyte-integrations/connectors/destination-kinesis/src/main/java/io/airbyte/integrations/destination/kinesis/KinesisDestination.java index 5c88b6f87bf1..2170fba76afd 100644 --- a/airbyte-integrations/connectors/destination-kinesis/src/main/java/io/airbyte/integrations/destination/kinesis/KinesisDestination.java +++ b/airbyte-integrations/connectors/destination-kinesis/src/main/java/io/airbyte/integrations/destination/kinesis/KinesisDestination.java @@ -34,7 +34,7 @@ public static void main(String[] args) throws Exception { * @return AirbyteConnectionStatus status of the connection result. */ @Override - public AirbyteConnectionStatus check(JsonNode config) { + public AirbyteConnectionStatus check(final JsonNode config) { KinesisStream kinesisStream = null; var streamName = "test_stream"; try { @@ -69,10 +69,11 @@ public AirbyteConnectionStatus check(JsonNode config) { * @return KinesisMessageConsumer for consuming Airbyte messages and streaming them to Kinesis. */ @Override - public AirbyteMessageConsumer getConsumer(JsonNode config, - ConfiguredAirbyteCatalog configuredCatalog, - Consumer outputRecordCollector) { - return new KinesisMessageConsumer(new KinesisConfig(config), configuredCatalog, outputRecordCollector); + public AirbyteMessageConsumer getConsumer(final JsonNode config, + final ConfiguredAirbyteCatalog configuredCatalog, + final Consumer outputRecordCollector) { + final KinesisStream kinesisStream = new KinesisStream(new KinesisConfig(config)); + return new KinesisMessageConsumer(configuredCatalog, kinesisStream, outputRecordCollector); } } diff --git a/airbyte-integrations/connectors/destination-kinesis/src/main/java/io/airbyte/integrations/destination/kinesis/KinesisMessageConsumer.java b/airbyte-integrations/connectors/destination-kinesis/src/main/java/io/airbyte/integrations/destination/kinesis/KinesisMessageConsumer.java index b76ce98c93e1..2bd0360f5a43 100644 --- a/airbyte-integrations/connectors/destination-kinesis/src/main/java/io/airbyte/integrations/destination/kinesis/KinesisMessageConsumer.java +++ b/airbyte-integrations/connectors/destination-kinesis/src/main/java/io/airbyte/integrations/destination/kinesis/KinesisMessageConsumer.java @@ -29,13 +29,11 @@ public class KinesisMessageConsumer extends FailureTrackingAirbyteMessageConsume private final Map kinesisStreams; - private AirbyteMessage lastMessage = null; - - public KinesisMessageConsumer(KinesisConfig kinesisConfig, - ConfiguredAirbyteCatalog configuredCatalog, - Consumer outputRecordCollector) { + public KinesisMessageConsumer(final ConfiguredAirbyteCatalog configuredCatalog, + final KinesisStream kinesisStream, + final Consumer outputRecordCollector) { this.outputRecordCollector = outputRecordCollector; - this.kinesisStream = new KinesisStream(kinesisConfig); + this.kinesisStream = kinesisStream; var nameTransformer = new KinesisNameTransformer(); this.kinesisStreams = configuredCatalog.getStreams().stream() .collect(Collectors.toUnmodifiableMap( @@ -60,7 +58,7 @@ protected void startTracked() { * @param message received from the Airbyte source. */ @Override - protected void acceptTracked(AirbyteMessage message) { + protected void acceptTracked(final AirbyteMessage message) { if (message.getType() == AirbyteMessage.Type.RECORD) { var messageRecord = message.getRecord(); @@ -84,7 +82,7 @@ protected void acceptTracked(AirbyteMessage message) { // throw exception and end sync? }); } else if (message.getType() == AirbyteMessage.Type.STATE) { - this.lastMessage = message; + outputRecordCollector.accept(message); } else { LOGGER.warn("Unsupported airbyte message type: {}", message.getType()); } @@ -97,13 +95,12 @@ protected void acceptTracked(AirbyteMessage message) { * @param hasFailed flag for indicating if the operation has failed. */ @Override - protected void close(boolean hasFailed) { + protected void close(final boolean hasFailed) { try { if (!hasFailed) { kinesisStream.flush(e -> { LOGGER.error("Error while streaming data to Kinesis", e); }); - this.outputRecordCollector.accept(lastMessage); } } finally { kinesisStream.close(); diff --git a/airbyte-integrations/connectors/destination-kinesis/src/test/java/io/airbyte/integrations/destination/kinesis/KinesisRecordConsumerTest.java b/airbyte-integrations/connectors/destination-kinesis/src/test/java/io/airbyte/integrations/destination/kinesis/KinesisRecordConsumerTest.java new file mode 100644 index 000000000000..a96787cce0ad --- /dev/null +++ b/airbyte-integrations/connectors/destination-kinesis/src/test/java/io/airbyte/integrations/destination/kinesis/KinesisRecordConsumerTest.java @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.kinesis; + +import io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer; +import io.airbyte.integrations.standardtest.destination.PerStreamStateMessageTest; +import io.airbyte.protocol.models.AirbyteMessage; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import java.util.function.Consumer; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@DisplayName("KinesisRecordConsumer") +@ExtendWith(MockitoExtension.class) +public class KinesisRecordConsumerTest extends PerStreamStateMessageTest { + + @Mock + private Consumer outputRecordCollector; + + @Mock + private ConfiguredAirbyteCatalog catalog; + @Mock + private KinesisStream kinesisStream; + + private KinesisMessageConsumer consumer; + + @BeforeEach + public void init() { + consumer = new KinesisMessageConsumer(catalog, kinesisStream, outputRecordCollector); + } + + @Override + protected Consumer getMockedConsumer() { + return outputRecordCollector; + } + + @Override + protected FailureTrackingAirbyteMessageConsumer getMessageConsumer() { + return consumer; + } + +}