From aaa3aaed015a81e27dac407b005ccef7b6b045d0 Mon Sep 17 00:00:00 2001 From: Tuhai Maksym Date: Wed, 10 Aug 2022 12:43:55 +0300 Subject: [PATCH] 15310: Destination Scylla: Handle per-stream state (#15399) * 15310: Destination Scylla: Handle per-stream state * 15399: test fix * 15318: test fix * 15318: updating version * auto-bump connector version [ci skip] Co-authored-by: Octavia Squidington III --- .../seed/destination_definitions.yaml | 2 +- .../resources/seed/destination_specs.yaml | 2 +- .../connectors/destination-scylla/Dockerfile | 2 +- .../destination-scylla/build.gradle | 1 + .../scylla/ScyllaMessageConsumer.java | 5 +- .../scylla/ScyllaRecordConsumerTest.java | 71 +++++++++++++++++++ docs/integrations/destinations/scylla.md | 5 +- 7 files changed, 80 insertions(+), 8 deletions(-) create mode 100644 airbyte-integrations/connectors/destination-scylla/src/test/java/io/airbyte/integrations/destination/scylla/ScyllaRecordConsumerTest.java diff --git a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml index f9012e4e1cf5..63f6fe788372 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml @@ -297,7 +297,7 @@ - name: Scylla destinationDefinitionId: 3dc6f384-cd6b-4be3-ad16-a41450899bf0 dockerRepository: airbyte/destination-scylla - dockerImageTag: 0.1.2 + dockerImageTag: 0.1.3 documentationUrl: https://docs.airbyte.io/integrations/destinations/scylla icon: scylla.svg releaseStage: alpha diff --git a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml index dfcfb8d22599..2ddd01880f89 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml @@ -5040,7 +5040,7 @@ supported_destination_sync_modes: - "append" - "append_dedup" -- dockerImage: "airbyte/destination-scylla:0.1.2" +- dockerImage: "airbyte/destination-scylla:0.1.3" spec: documentationUrl: "https://docs.airbyte.io/integrations/destinations/scylla" connectionSpecification: diff --git a/airbyte-integrations/connectors/destination-scylla/Dockerfile b/airbyte-integrations/connectors/destination-scylla/Dockerfile index f7e349e0c1b1..62da4ca4a368 100644 --- a/airbyte-integrations/connectors/destination-scylla/Dockerfile +++ b/airbyte-integrations/connectors/destination-scylla/Dockerfile @@ -16,5 +16,5 @@ ENV APPLICATION destination-scylla COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=0.1.2 +LABEL io.airbyte.version=0.1.3 LABEL io.airbyte.name=airbyte/destination-scylla diff --git a/airbyte-integrations/connectors/destination-scylla/build.gradle b/airbyte-integrations/connectors/destination-scylla/build.gradle index 9fcc858fe811..21d45bb766fb 100644 --- a/airbyte-integrations/connectors/destination-scylla/build.gradle +++ b/airbyte-integrations/connectors/destination-scylla/build.gradle @@ -20,6 +20,7 @@ dependencies { implementation "com.scylladb:scylla-driver-core:${scyllaDriver}" + testImplementation project(':airbyte-integrations:bases:standard-destination-test') // https://mvnrepository.com/artifact/org.assertj/assertj-core testImplementation "org.assertj:assertj-core:${assertVersion}" // https://mvnrepository.com/artifact/org.testcontainers/testcontainers diff --git a/airbyte-integrations/connectors/destination-scylla/src/main/java/io/airbyte/integrations/destination/scylla/ScyllaMessageConsumer.java b/airbyte-integrations/connectors/destination-scylla/src/main/java/io/airbyte/integrations/destination/scylla/ScyllaMessageConsumer.java index 8cfbc7d0e94f..a03053e8a3dd 100644 --- a/airbyte-integrations/connectors/destination-scylla/src/main/java/io/airbyte/integrations/destination/scylla/ScyllaMessageConsumer.java +++ b/airbyte-integrations/connectors/destination-scylla/src/main/java/io/airbyte/integrations/destination/scylla/ScyllaMessageConsumer.java @@ -27,8 +27,6 @@ public class ScyllaMessageConsumer extends FailureTrackingAirbyteMessageConsumer private final ScyllaCqlProvider scyllaCqlProvider; - private AirbyteMessage lastMessage = null; - public ScyllaMessageConsumer(ScyllaConfig scyllaConfig, ConfiguredAirbyteCatalog configuredCatalog, Consumer outputRecordCollector) { @@ -66,7 +64,7 @@ protected void acceptTracked(AirbyteMessage message) { var data = Jsons.serialize(messageRecord.getData()); scyllaCqlProvider.insert(streamConfig.getKeyspace(), streamConfig.getTempTableName(), data); } else if (message.getType() == AirbyteMessage.Type.STATE) { - this.lastMessage = message; + outputRecordCollector.accept(message); } else { LOGGER.warn("Unsupported airbyte message type: {}", message.getType()); } @@ -92,7 +90,6 @@ protected void close(boolean hasFailed) { LOGGER.error("Error while copying data to table {}: ", v.getTableName(), e); } }); - outputRecordCollector.accept(lastMessage); } scyllaStreams.forEach((k, v) -> { diff --git a/airbyte-integrations/connectors/destination-scylla/src/test/java/io/airbyte/integrations/destination/scylla/ScyllaRecordConsumerTest.java b/airbyte-integrations/connectors/destination-scylla/src/test/java/io/airbyte/integrations/destination/scylla/ScyllaRecordConsumerTest.java new file mode 100644 index 000000000000..0686797bce14 --- /dev/null +++ b/airbyte-integrations/connectors/destination-scylla/src/test/java/io/airbyte/integrations/destination/scylla/ScyllaRecordConsumerTest.java @@ -0,0 +1,71 @@ +package io.airbyte.integrations.destination.scylla; + +import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer; +import io.airbyte.integrations.standardtest.destination.PerStreamStateMessageTest; +import io.airbyte.integrations.util.HostPortResolver; +import io.airbyte.protocol.models.AirbyteMessage; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import java.util.function.Consumer; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.testcontainers.containers.GenericContainer; + +@DisplayName("ScyllaRecordConsumer") +@ExtendWith(MockitoExtension.class) +public class ScyllaRecordConsumerTest extends PerStreamStateMessageTest { + private static ScyllaContainer scyllaContainer; + + @Mock + private Consumer outputRecordCollector; + + private ScyllaMessageConsumer consumer; + + @Mock + ScyllaConfig scyllaConfig; + + @Mock + private ConfiguredAirbyteCatalog configuredCatalog; + + public static ScyllaContainer initContainer() { + if (scyllaContainer == null) { + scyllaContainer = new ScyllaContainer() + .withExposedPorts(9042) + // single cpu core cluster + .withCommand("--smp 1"); + } + scyllaContainer.start(); + return scyllaContainer; + } + + @BeforeEach + public void init() { + ScyllaContainer scyllaContainer = initContainer(); + JsonNode configJson = TestDataFactory.jsonConfig( + HostPortResolver.resolveHost(scyllaContainer), + HostPortResolver.resolvePort(scyllaContainer)); + var scyllaConfig = new ScyllaConfig(configJson); + consumer = new ScyllaMessageConsumer(scyllaConfig, configuredCatalog, outputRecordCollector); + } + + @Override + protected Consumer getMockedConsumer() { + return outputRecordCollector; + } + + @Override + protected FailureTrackingAirbyteMessageConsumer getMessageConsumer() { + return consumer; + } + + static class ScyllaContainer extends GenericContainer { + + public ScyllaContainer() { + super("scylladb/scylla:4.5.0"); + } + + } +} diff --git a/docs/integrations/destinations/scylla.md b/docs/integrations/destinations/scylla.md index 14ae8435ec0b..386a1fd418aa 100644 --- a/docs/integrations/destinations/scylla.md +++ b/docs/integrations/destinations/scylla.md @@ -42,5 +42,8 @@ and handle any amount of data from the connector. * Replication [optional] [default: 1] ### Setup guide +## Changelog -###### TODO: more info, screenshots?, etc... \ No newline at end of file +| Version | Date | Pull Request | Subject | +|:--------|:-----------| :--- |:----------------------------------------------------------------------------------------------------| +| 0.1.3 | 2022-08-10 | [153999](https://github.com/airbytehq/airbyte/pull/15399) | handling per-stream state |