From 39db316fc86faa69231d7c8894056051721db781 Mon Sep 17 00:00:00 2001 From: Yevhen Sukhomud Date: Thu, 4 Aug 2022 21:19:26 +0700 Subject: [PATCH] Update Cassandra destination to use outputRecordCollector to properly store state (#15294) * Update Cassandra destination to use outputRecordCollector to properly store state * Bump version * auto-bump connector version [ci skip] Co-authored-by: Octavia Squidington III --- .../seed/destination_definitions.yaml | 2 +- .../resources/seed/destination_specs.yaml | 2 +- .../destination-cassandra/Dockerfile | 2 +- .../destination-cassandra/build.gradle | 1 + .../cassandra/CassandraDestination.java | 12 +++-- .../cassandra/CassandraMessageConsumer.java | 22 ++++----- .../cassandra/CassandraMessageConsumerIT.java | 2 +- .../CassandraRecordConsumerTest.java | 48 +++++++++++++++++++ 8 files changed, 70 insertions(+), 21 deletions(-) create mode 100644 airbyte-integrations/connectors/destination-cassandra/src/test/java/io/airbyte/integrations/destination/cassandra/CassandraRecordConsumerTest.java diff --git a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml index ff6078a608dd..df851ce5587f 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml @@ -53,7 +53,7 @@ - name: Cassandra destinationDefinitionId: 707456df-6f4f-4ced-b5c6-03f73bcad1c5 dockerRepository: airbyte/destination-cassandra - dockerImageTag: 0.1.2 + dockerImageTag: 0.1.3 documentationUrl: https://docs.airbyte.io/integrations/destinations/cassandra icon: cassandra.svg releaseStage: alpha diff --git a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml index 371c90249353..db3f3c3c91de 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml @@ -693,7 +693,7 @@ supported_destination_sync_modes: - "overwrite" - "append" -- dockerImage: "airbyte/destination-cassandra:0.1.2" +- dockerImage: "airbyte/destination-cassandra:0.1.3" spec: documentationUrl: "https://docs.airbyte.io/integrations/destinations/cassandra" connectionSpecification: diff --git a/airbyte-integrations/connectors/destination-cassandra/Dockerfile b/airbyte-integrations/connectors/destination-cassandra/Dockerfile index 5bb5b6b4dac1..4c44ecc48a1c 100644 --- a/airbyte-integrations/connectors/destination-cassandra/Dockerfile +++ b/airbyte-integrations/connectors/destination-cassandra/Dockerfile @@ -16,5 +16,5 @@ ENV APPLICATION destination-cassandra COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=0.1.2 +LABEL io.airbyte.version=0.1.3 LABEL io.airbyte.name=airbyte/destination-cassandra diff --git a/airbyte-integrations/connectors/destination-cassandra/build.gradle b/airbyte-integrations/connectors/destination-cassandra/build.gradle index 593aa67c4e44..b5cca5c52e3f 100644 --- a/airbyte-integrations/connectors/destination-cassandra/build.gradle +++ b/airbyte-integrations/connectors/destination-cassandra/build.gradle @@ -26,6 +26,7 @@ dependencies { // https://mvnrepository.com/artifact/org.assertj/assertj-core testImplementation "org.assertj:assertj-core:${assertVersion}" testImplementation libs.connectors.testcontainers.cassandra + testImplementation project(':airbyte-integrations:bases:standard-destination-test') integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-destination-test') diff --git a/airbyte-integrations/connectors/destination-cassandra/src/main/java/io/airbyte/integrations/destination/cassandra/CassandraDestination.java b/airbyte-integrations/connectors/destination-cassandra/src/main/java/io/airbyte/integrations/destination/cassandra/CassandraDestination.java index 25a9a2f71811..eb4bb82332da 100644 --- a/airbyte-integrations/connectors/destination-cassandra/src/main/java/io/airbyte/integrations/destination/cassandra/CassandraDestination.java +++ b/airbyte-integrations/connectors/destination-cassandra/src/main/java/io/airbyte/integrations/destination/cassandra/CassandraDestination.java @@ -26,7 +26,7 @@ public static void main(String[] args) throws Exception { } @Override - public AirbyteConnectionStatus check(JsonNode config) { + public AirbyteConnectionStatus check(final JsonNode config) { var cassandraConfig = new CassandraConfig(config); // add random uuid to avoid conflicts with existing tables. String tableName = "table_" + UUID.randomUUID().toString().replace("-", ""); @@ -55,10 +55,12 @@ public AirbyteConnectionStatus check(JsonNode config) { } @Override - public AirbyteMessageConsumer getConsumer(JsonNode config, - ConfiguredAirbyteCatalog configuredCatalog, - Consumer outputRecordCollector) { - return new CassandraMessageConsumer(new CassandraConfig(config), configuredCatalog, outputRecordCollector); + public AirbyteMessageConsumer getConsumer(final JsonNode config, + final ConfiguredAirbyteCatalog configuredCatalog, + final Consumer outputRecordCollector) { + final CassandraConfig cassandraConfig = new CassandraConfig(config); + final CassandraCqlProvider cassandraCqlProvider = new CassandraCqlProvider(cassandraConfig); + return new CassandraMessageConsumer(cassandraConfig, configuredCatalog, cassandraCqlProvider, outputRecordCollector); } } diff --git a/airbyte-integrations/connectors/destination-cassandra/src/main/java/io/airbyte/integrations/destination/cassandra/CassandraMessageConsumer.java b/airbyte-integrations/connectors/destination-cassandra/src/main/java/io/airbyte/integrations/destination/cassandra/CassandraMessageConsumer.java index 6bfbab11a46f..023110d00cab 100644 --- a/airbyte-integrations/connectors/destination-cassandra/src/main/java/io/airbyte/integrations/destination/cassandra/CassandraMessageConsumer.java +++ b/airbyte-integrations/connectors/destination-cassandra/src/main/java/io/airbyte/integrations/destination/cassandra/CassandraMessageConsumer.java @@ -27,14 +27,13 @@ class CassandraMessageConsumer extends FailureTrackingAirbyteMessageConsumer { private final CassandraCqlProvider cassandraCqlProvider; - private AirbyteMessage lastMessage = null; - - public CassandraMessageConsumer(CassandraConfig cassandraConfig, - ConfiguredAirbyteCatalog configuredCatalog, - Consumer outputRecordCollector) { + public CassandraMessageConsumer(final CassandraConfig cassandraConfig, + final ConfiguredAirbyteCatalog configuredCatalog, + final CassandraCqlProvider provider, + final Consumer outputRecordCollector) { this.cassandraConfig = cassandraConfig; this.outputRecordCollector = outputRecordCollector; - this.cassandraCqlProvider = new CassandraCqlProvider(cassandraConfig); + this.cassandraCqlProvider = provider; var nameTransformer = new CassandraNameTransformer(cassandraConfig); this.cassandraStreams = configuredCatalog.getStreams().stream() .collect(Collectors.toUnmodifiableMap( @@ -55,7 +54,7 @@ protected void startTracked() { } @Override - protected void acceptTracked(AirbyteMessage message) { + protected void acceptTracked(final AirbyteMessage message) { if (message.getType() == AirbyteMessage.Type.RECORD) { var messageRecord = message.getRecord(); var streamConfig = @@ -66,14 +65,14 @@ protected void acceptTracked(AirbyteMessage message) { var data = Jsons.serialize(messageRecord.getData()); cassandraCqlProvider.insert(streamConfig.getKeyspace(), streamConfig.getTempTableName(), data); } else if (message.getType() == AirbyteMessage.Type.STATE) { - this.lastMessage = message; + outputRecordCollector.accept(message); } else { LOGGER.warn("Unsupported airbyte message type: {}", message.getType()); } } @Override - protected void close(boolean hasFailed) { + protected void close(final boolean hasFailed) { if (!hasFailed) { cassandraStreams.forEach((k, v) -> { try { @@ -88,17 +87,16 @@ protected void close(boolean hasFailed) { } default -> throw new UnsupportedOperationException(); } - } catch (Exception e) { + } catch (final Exception e) { LOGGER.error("Error while copying data to table {}: : ", v.getTableName(), e); } }); - outputRecordCollector.accept(lastMessage); } cassandraStreams.forEach((k, v) -> { try { cassandraCqlProvider.dropTableIfExists(v.getKeyspace(), v.getTempTableName()); - } catch (Exception e) { + } catch (final Exception e) { LOGGER.error("Error while deleting temp table {} with reason: ", v.getTempTableName(), e); } }); diff --git a/airbyte-integrations/connectors/destination-cassandra/src/test-integration/java/io/airbyte/integrations/destination/cassandra/CassandraMessageConsumerIT.java b/airbyte-integrations/connectors/destination-cassandra/src/test-integration/java/io/airbyte/integrations/destination/cassandra/CassandraMessageConsumerIT.java index 37c79aa5bd18..104f099264fb 100644 --- a/airbyte-integrations/connectors/destination-cassandra/src/test-integration/java/io/airbyte/integrations/destination/cassandra/CassandraMessageConsumerIT.java +++ b/airbyte-integrations/connectors/destination-cassandra/src/test-integration/java/io/airbyte/integrations/destination/cassandra/CassandraMessageConsumerIT.java @@ -53,8 +53,8 @@ void setup() { var catalog = TestDataFactory.createConfiguredAirbyteCatalog(cStream1, cStream2); - cassandraMessageConsumer = new CassandraMessageConsumer(cassandraConfig, catalog, message -> {}); cassandraCqlProvider = new CassandraCqlProvider(cassandraConfig); + cassandraMessageConsumer = new CassandraMessageConsumer(cassandraConfig, catalog, cassandraCqlProvider, message -> {}); nameTransformer = new CassandraNameTransformer(cassandraConfig); } diff --git a/airbyte-integrations/connectors/destination-cassandra/src/test/java/io/airbyte/integrations/destination/cassandra/CassandraRecordConsumerTest.java b/airbyte-integrations/connectors/destination-cassandra/src/test/java/io/airbyte/integrations/destination/cassandra/CassandraRecordConsumerTest.java new file mode 100644 index 000000000000..5b4eddfc38b9 --- /dev/null +++ b/airbyte-integrations/connectors/destination-cassandra/src/test/java/io/airbyte/integrations/destination/cassandra/CassandraRecordConsumerTest.java @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.cassandra; + +import io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer; +import io.airbyte.integrations.standardtest.destination.PerStreamStateMessageTest; +import io.airbyte.protocol.models.AirbyteMessage; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import java.util.function.Consumer; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +public class CassandraRecordConsumerTest extends PerStreamStateMessageTest { + + @Mock + private Consumer outputRecordCollector; + + @InjectMocks + private CassandraMessageConsumer consumer; + @Mock + private CassandraConfig config; + @Mock + private ConfiguredAirbyteCatalog catalog; + @Mock + private CassandraCqlProvider provider; + + @BeforeEach + public void init() { + consumer = new CassandraMessageConsumer(config, catalog, provider, outputRecordCollector); + } + + @Override + protected Consumer getMockedConsumer() { + return outputRecordCollector; + } + + @Override + protected FailureTrackingAirbyteMessageConsumer getMessageConsumer() { + return consumer; + } + +}