Skip to content

Commit

Permalink
🎉 Destination DynamoDB: Handle per-stream state (#15350)
Browse files Browse the repository at this point in the history
* [15304] 🎉 Destination DynamoDB: Handle per-stream state

* [15304] 🎉 Destination DynamoDB: Handle per-stream state

* [15304] 🎉 Destination DynamoDB: Handle per-stream state

* [15304] 🎉 Destination DynamoDB: Handle per-stream state

* auto-bump connector version [ci skip]

Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
  • Loading branch information
1 parent f0e1fbf commit ac33e19
Show file tree
Hide file tree
Showing 7 changed files with 55 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@
- name: DynamoDB
destinationDefinitionId: 8ccd8909-4e99-4141-b48d-4984b70b2d89
dockerRepository: airbyte/destination-dynamodb
dockerImageTag: 0.1.4
dockerImageTag: 0.1.5
documentationUrl: https://docs.airbyte.io/integrations/destinations/dynamodb
icon: dynamodb.svg
releaseStage: alpha
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1141,7 +1141,7 @@
supported_destination_sync_modes:
- "overwrite"
- "append"
- dockerImage: "airbyte/destination-dynamodb:0.1.4"
- dockerImage: "airbyte/destination-dynamodb:0.1.5"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/destinations/dynamodb"
connectionSpecification:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION destination-dynamodb

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.1.4
LABEL io.airbyte.version=0.1.5
LABEL io.airbyte.name=airbyte/destination-dynamodb
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ dependencies {
implementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs)
implementation 'com.amazonaws:aws-java-sdk-dynamodb:1.12.47'

testImplementation project(':airbyte-integrations:bases:standard-destination-test')

integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-destination-test')
integrationTestJavaImplementation project(':airbyte-integrations:connectors:destination-dynamodb')
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ public class DynamodbConsumer extends FailureTrackingAirbyteMessageConsumer {
private final Consumer<AirbyteMessage> outputRecordCollector;
private final Map<AirbyteStreamNameNamespacePair, DynamodbWriter> streamNameAndNamespaceToWriters;

private AirbyteMessage lastStateMessage = null;

public DynamodbConsumer(final DynamodbDestinationConfig dynamodbDestinationConfig,
final ConfiguredAirbyteCatalog configuredCatalog,
final Consumer<AirbyteMessage> outputRecordCollector) {
Expand Down Expand Up @@ -80,7 +78,7 @@ protected void startTracked() throws Exception {
@Override
protected void acceptTracked(final AirbyteMessage airbyteMessage) throws Exception {
if (airbyteMessage.getType() == AirbyteMessage.Type.STATE) {
this.lastStateMessage = airbyteMessage;
outputRecordCollector.accept(airbyteMessage);
return;
} else if (airbyteMessage.getType() != AirbyteMessage.Type.RECORD) {
return;
Expand All @@ -105,10 +103,6 @@ protected void close(final boolean hasFailed) throws Exception {
for (final DynamodbWriter handler : streamNameAndNamespaceToWriters.values()) {
handler.close(hasFailed);
}
// DynamoDB stream uploader is all or nothing if a failure happens in the destination.
if (!hasFailed) {
outputRecordCollector.accept(lastStateMessage);
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.dynamodb;

import io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer;
import io.airbyte.integrations.standardtest.destination.PerStreamStateMessageTest;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import java.util.function.Consumer;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

@ExtendWith(MockitoExtension.class)
class DynamodbConsumerTest extends PerStreamStateMessageTest {

@Mock
private Consumer<AirbyteMessage> outputRecordCollector;

@InjectMocks
private DynamodbConsumer consumer;

@Mock
private DynamodbDestinationConfig destinationConfig;

@Mock
private ConfiguredAirbyteCatalog catalog;

@BeforeEach
private void init() {
consumer = new DynamodbConsumer(destinationConfig, catalog, outputRecordCollector);
}

@Override
protected Consumer<AirbyteMessage> getMockedConsumer() {
return outputRecordCollector;
}

@Override
protected FailureTrackingAirbyteMessageConsumer getMessageConsumer() {
return consumer;
}

}
1 change: 1 addition & 0 deletions docs/integrations/destinations/dynamodb.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ This connector by default uses 10 capacity units for both Read and Write in Dyna

| Version | Date | Pull Request | Subject |
| :--- | :--- | :--- | :--- |
| 0.1.5 | 2022-08-05 | [\#15350](https://github.com/airbytehq/airbyte/pull/15350) | Added per-stream handling |
| 0.1.4 | 2022-06-16 | [\#13852](https://github.com/airbytehq/airbyte/pull/13852) | Updated stacktrace format for any trace message errors |
| 0.1.3 | 2022-05-17 | [12820](https://github.com/airbytehq/airbyte/pull/12820) | Improved 'check' operation performance |
| 0.1.2 | 2022-02-14 | [10256](https://github.com/airbytehq/airbyte/pull/10256) | Add `-XX:+ExitOnOutOfMemoryError` JVM option |
Expand Down

0 comments on commit ac33e19

Please sign in to comment.