From 211d331d89e33bd40a73c6fef778acaab7d338ca Mon Sep 17 00:00:00 2001 From: Tuhai Maksym Date: Mon, 8 Aug 2022 18:49:11 +0300 Subject: [PATCH] 15302: Destination Azure Blob Storage: Handle per-stream state (#15318) * 15302: Azure blob destination consumer fixed * 15302: Unit test added * 15302: Unit test added * 15318: test fix --- .../build.gradle | 2 + .../AzureBlobStorageConsumer.java | 22 ++++----- .../AzureBlobRecordConsumerTest.java | 47 +++++++++++++++++++ 3 files changed, 57 insertions(+), 14 deletions(-) create mode 100644 airbyte-integrations/connectors/destination-azure-blob-storage/src/test/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobRecordConsumerTest.java diff --git a/airbyte-integrations/connectors/destination-azure-blob-storage/build.gradle b/airbyte-integrations/connectors/destination-azure-blob-storage/build.gradle index 6163033e0f44..1e1882ebb22e 100644 --- a/airbyte-integrations/connectors/destination-azure-blob-storage/build.gradle +++ b/airbyte-integrations/connectors/destination-azure-blob-storage/build.gradle @@ -19,6 +19,8 @@ dependencies { implementation 'com.azure:azure-storage-blob:12.12.0' implementation 'org.apache.commons:commons-csv:1.4' + testImplementation project(':airbyte-integrations:bases:standard-destination-test') + testImplementation 'org.apache.commons:commons-lang3:3.11' integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-destination-test') diff --git a/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageConsumer.java b/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageConsumer.java index 1989560f45f9..be9d14d28eab 100644 --- a/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageConsumer.java +++ b/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageConsumer.java @@ -44,13 +44,11 @@ public class AzureBlobStorageConsumer extends FailureTrackingAirbyteMessageConsu private final Consumer outputRecordCollector; private final Map streamNameAndNamespaceToWriters; - private AirbyteMessage lastStateMessage = null; - public AzureBlobStorageConsumer( - final AzureBlobStorageDestinationConfig azureBlobStorageDestinationConfig, - final ConfiguredAirbyteCatalog configuredCatalog, - final AzureBlobStorageWriterFactory writerFactory, - final Consumer outputRecordCollector) { + final AzureBlobStorageDestinationConfig azureBlobStorageDestinationConfig, + final ConfiguredAirbyteCatalog configuredCatalog, + final AzureBlobStorageWriterFactory writerFactory, + final Consumer outputRecordCollector) { this.azureBlobStorageDestinationConfig = azureBlobStorageDestinationConfig; this.configuredCatalog = configuredCatalog; this.writerFactory = writerFactory; @@ -93,8 +91,8 @@ protected void startTracked() throws Exception { } private void createContainers(final SpecializedBlobClientBuilder specializedBlobClientBuilder, - final AppendBlobClient appendBlobClient, - final ConfiguredAirbyteStream configuredStream) { + final AppendBlobClient appendBlobClient, + final ConfiguredAirbyteStream configuredStream) { // create container if absent (aka SQl Schema) final BlobContainerClient containerClient = appendBlobClient.getContainerClient(); if (!containerClient.exists()) { @@ -103,7 +101,7 @@ private void createContainers(final SpecializedBlobClientBuilder specializedBlob if (DestinationSyncMode.OVERWRITE.equals(configuredStream.getDestinationSyncMode())) { LOGGER.info("Sync mode is selected to OVERRIDE mode. New container will be automatically" + " created or all data would be overridden (if any) for stream:" + configuredStream - .getStream().getName()); + .getStream().getName()); var blobItemList = StreamSupport.stream(containerClient.listBlobs().spliterator(), false) .collect(Collectors.toList()); blobItemList.forEach(blob -> { @@ -121,7 +119,7 @@ private void createContainers(final SpecializedBlobClientBuilder specializedBlob @Override protected void acceptTracked(final AirbyteMessage airbyteMessage) throws Exception { if (airbyteMessage.getType() == Type.STATE) { - this.lastStateMessage = airbyteMessage; + outputRecordCollector.accept(airbyteMessage); return; } else if (airbyteMessage.getType() != Type.RECORD) { return; @@ -154,10 +152,6 @@ protected void close(final boolean hasFailed) throws Exception { for (final AzureBlobStorageWriter handler : streamNameAndNamespaceToWriters.values()) { handler.close(hasFailed); } - - if (!hasFailed) { - outputRecordCollector.accept(lastStateMessage); - } } private static String getOutputFilename(final Timestamp timestamp) { diff --git a/airbyte-integrations/connectors/destination-azure-blob-storage/src/test/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobRecordConsumerTest.java b/airbyte-integrations/connectors/destination-azure-blob-storage/src/test/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobRecordConsumerTest.java new file mode 100644 index 000000000000..bd3c0972bcda --- /dev/null +++ b/airbyte-integrations/connectors/destination-azure-blob-storage/src/test/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobRecordConsumerTest.java @@ -0,0 +1,47 @@ +package io.airbyte.integrations.destination.azure_blob_storage; + +import io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer; +import io.airbyte.integrations.destination.azure_blob_storage.writer.AzureBlobStorageWriterFactory; +import io.airbyte.integrations.standardtest.destination.PerStreamStateMessageTest; +import io.airbyte.protocol.models.AirbyteMessage; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import java.util.function.Consumer; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@DisplayName("AzureBlobRecordConsumer") +@ExtendWith(MockitoExtension.class) +public class AzureBlobRecordConsumerTest extends PerStreamStateMessageTest { + @Mock + private Consumer outputRecordCollector; + + private AzureBlobStorageConsumer consumer; + + @Mock + private AzureBlobStorageDestinationConfig azureBlobStorageDestinationConfig; + + @Mock + private ConfiguredAirbyteCatalog configuredCatalog; + + @Mock + private AzureBlobStorageWriterFactory writerFactory; + + @BeforeEach + public void init() { + consumer = new AzureBlobStorageConsumer(azureBlobStorageDestinationConfig, configuredCatalog, writerFactory, outputRecordCollector); + } + + @Override + protected Consumer getMockedConsumer() { + return outputRecordCollector; + } + + @Override + protected FailureTrackingAirbyteMessageConsumer getMessageConsumer() { + return consumer; + } +}