Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🎉 Destination DynamoDB: Handle per-stream state #15350

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@
- name: DynamoDB
destinationDefinitionId: 8ccd8909-4e99-4141-b48d-4984b70b2d89
dockerRepository: airbyte/destination-dynamodb
dockerImageTag: 0.1.4
dockerImageTag: 0.1.5
documentationUrl: https://docs.airbyte.io/integrations/destinations/dynamodb
icon: dynamodb.svg
releaseStage: alpha
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1141,7 +1141,7 @@
supported_destination_sync_modes:
- "overwrite"
- "append"
- dockerImage: "airbyte/destination-dynamodb:0.1.4"
- dockerImage: "airbyte/destination-dynamodb:0.1.5"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/destinations/dynamodb"
connectionSpecification:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION destination-dynamodb

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.1.4
LABEL io.airbyte.version=0.1.5
LABEL io.airbyte.name=airbyte/destination-dynamodb
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ dependencies {
implementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs)
implementation 'com.amazonaws:aws-java-sdk-dynamodb:1.12.47'

testImplementation project(':airbyte-integrations:bases:standard-destination-test')

integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-destination-test')
integrationTestJavaImplementation project(':airbyte-integrations:connectors:destination-dynamodb')
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ public class DynamodbConsumer extends FailureTrackingAirbyteMessageConsumer {
private final Consumer<AirbyteMessage> outputRecordCollector;
private final Map<AirbyteStreamNameNamespacePair, DynamodbWriter> streamNameAndNamespaceToWriters;

private AirbyteMessage lastStateMessage = null;

public DynamodbConsumer(final DynamodbDestinationConfig dynamodbDestinationConfig,
final ConfiguredAirbyteCatalog configuredCatalog,
final Consumer<AirbyteMessage> outputRecordCollector) {
Expand Down Expand Up @@ -80,7 +78,7 @@ protected void startTracked() throws Exception {
@Override
protected void acceptTracked(final AirbyteMessage airbyteMessage) throws Exception {
if (airbyteMessage.getType() == AirbyteMessage.Type.STATE) {
this.lastStateMessage = airbyteMessage;
outputRecordCollector.accept(airbyteMessage);
return;
} else if (airbyteMessage.getType() != AirbyteMessage.Type.RECORD) {
return;
Expand All @@ -105,10 +103,6 @@ protected void close(final boolean hasFailed) throws Exception {
for (final DynamodbWriter handler : streamNameAndNamespaceToWriters.values()) {
handler.close(hasFailed);
}
// DynamoDB stream uploader is all or nothing if a failure happens in the destination.
if (!hasFailed) {
outputRecordCollector.accept(lastStateMessage);
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.dynamodb;

import io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer;
import io.airbyte.integrations.standardtest.destination.PerStreamStateMessageTest;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import java.util.function.Consumer;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

@ExtendWith(MockitoExtension.class)
class DynamodbConsumerTest extends PerStreamStateMessageTest {

@Mock
private Consumer<AirbyteMessage> outputRecordCollector;

@InjectMocks
private DynamodbConsumer consumer;

@Mock
private DynamodbDestinationConfig destinationConfig;

@Mock
private ConfiguredAirbyteCatalog catalog;

@BeforeEach
private void init() {
consumer = new DynamodbConsumer(destinationConfig, catalog, outputRecordCollector);
}

@Override
protected Consumer<AirbyteMessage> getMockedConsumer() {
return outputRecordCollector;
}

@Override
protected FailureTrackingAirbyteMessageConsumer getMessageConsumer() {
return consumer;
}

}
1 change: 1 addition & 0 deletions docs/integrations/destinations/dynamodb.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ This connector by default uses 10 capacity units for both Read and Write in Dyna

| Version | Date | Pull Request | Subject |
| :--- | :--- | :--- | :--- |
| 0.1.5 | 2022-08-05 | [\#15350](https://github.com/airbytehq/airbyte/pull/15350) | Added per-stream handling |
| 0.1.4 | 2022-06-16 | [\#13852](https://github.com/airbytehq/airbyte/pull/13852) | Updated stacktrace format for any trace message errors |
| 0.1.3 | 2022-05-17 | [12820](https://github.com/airbytehq/airbyte/pull/12820) | Improved 'check' operation performance |
| 0.1.2 | 2022-02-14 | [10256](https://github.com/airbytehq/airbyte/pull/10256) | Add `-XX:+ExitOnOutOfMemoryError` JVM option |
Expand Down