Skip to content

Commit

Permalink
15302: Destination Azure Blob Storage: Handle per-stream state (#15318)
Browse files Browse the repository at this point in the history
* 15302: Azure blob destination consumer fixed

* 15302: Unit test added

* 15302: Unit test added

* 15318: test fix
  • Loading branch information
kimerinn authored Aug 8, 2022
1 parent b5dc550 commit 211d331
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ dependencies {
implementation 'com.azure:azure-storage-blob:12.12.0'
implementation 'org.apache.commons:commons-csv:1.4'

testImplementation project(':airbyte-integrations:bases:standard-destination-test')

testImplementation 'org.apache.commons:commons-lang3:3.11'

integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-destination-test')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,11 @@ public class AzureBlobStorageConsumer extends FailureTrackingAirbyteMessageConsu
private final Consumer<AirbyteMessage> outputRecordCollector;
private final Map<AirbyteStreamNameNamespacePair, AzureBlobStorageWriter> streamNameAndNamespaceToWriters;

private AirbyteMessage lastStateMessage = null;

public AzureBlobStorageConsumer(
final AzureBlobStorageDestinationConfig azureBlobStorageDestinationConfig,
final ConfiguredAirbyteCatalog configuredCatalog,
final AzureBlobStorageWriterFactory writerFactory,
final Consumer<AirbyteMessage> outputRecordCollector) {
final AzureBlobStorageDestinationConfig azureBlobStorageDestinationConfig,
final ConfiguredAirbyteCatalog configuredCatalog,
final AzureBlobStorageWriterFactory writerFactory,
final Consumer<AirbyteMessage> outputRecordCollector) {
this.azureBlobStorageDestinationConfig = azureBlobStorageDestinationConfig;
this.configuredCatalog = configuredCatalog;
this.writerFactory = writerFactory;
Expand Down Expand Up @@ -93,8 +91,8 @@ protected void startTracked() throws Exception {
}

private void createContainers(final SpecializedBlobClientBuilder specializedBlobClientBuilder,
final AppendBlobClient appendBlobClient,
final ConfiguredAirbyteStream configuredStream) {
final AppendBlobClient appendBlobClient,
final ConfiguredAirbyteStream configuredStream) {
// create container if absent (aka SQl Schema)
final BlobContainerClient containerClient = appendBlobClient.getContainerClient();
if (!containerClient.exists()) {
Expand All @@ -103,7 +101,7 @@ private void createContainers(final SpecializedBlobClientBuilder specializedBlob
if (DestinationSyncMode.OVERWRITE.equals(configuredStream.getDestinationSyncMode())) {
LOGGER.info("Sync mode is selected to OVERRIDE mode. New container will be automatically"
+ " created or all data would be overridden (if any) for stream:" + configuredStream
.getStream().getName());
.getStream().getName());
var blobItemList = StreamSupport.stream(containerClient.listBlobs().spliterator(), false)
.collect(Collectors.toList());
blobItemList.forEach(blob -> {
Expand All @@ -121,7 +119,7 @@ private void createContainers(final SpecializedBlobClientBuilder specializedBlob
@Override
protected void acceptTracked(final AirbyteMessage airbyteMessage) throws Exception {
if (airbyteMessage.getType() == Type.STATE) {
this.lastStateMessage = airbyteMessage;
outputRecordCollector.accept(airbyteMessage);
return;
} else if (airbyteMessage.getType() != Type.RECORD) {
return;
Expand Down Expand Up @@ -154,10 +152,6 @@ protected void close(final boolean hasFailed) throws Exception {
for (final AzureBlobStorageWriter handler : streamNameAndNamespaceToWriters.values()) {
handler.close(hasFailed);
}

if (!hasFailed) {
outputRecordCollector.accept(lastStateMessage);
}
}

private static String getOutputFilename(final Timestamp timestamp) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package io.airbyte.integrations.destination.azure_blob_storage;

import io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer;
import io.airbyte.integrations.destination.azure_blob_storage.writer.AzureBlobStorageWriterFactory;
import io.airbyte.integrations.standardtest.destination.PerStreamStateMessageTest;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import java.util.function.Consumer;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

@DisplayName("AzureBlobRecordConsumer")
@ExtendWith(MockitoExtension.class)
public class AzureBlobRecordConsumerTest extends PerStreamStateMessageTest {
@Mock
private Consumer<AirbyteMessage> outputRecordCollector;

private AzureBlobStorageConsumer consumer;

@Mock
private AzureBlobStorageDestinationConfig azureBlobStorageDestinationConfig;

@Mock
private ConfiguredAirbyteCatalog configuredCatalog;

@Mock
private AzureBlobStorageWriterFactory writerFactory;

@BeforeEach
public void init() {
consumer = new AzureBlobStorageConsumer(azureBlobStorageDestinationConfig, configuredCatalog, writerFactory, outputRecordCollector);
}

@Override
protected Consumer<AirbyteMessage> getMockedConsumer() {
return outputRecordCollector;
}

@Override
protected FailureTrackingAirbyteMessageConsumer getMessageConsumer() {
return consumer;
}
}

0 comments on commit 211d331

Please sign in to comment.