diff --git a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml index 2cb599631b18..a0fbaa91712f 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml @@ -80,7 +80,7 @@ - name: DynamoDB destinationDefinitionId: 8ccd8909-4e99-4141-b48d-4984b70b2d89 dockerRepository: airbyte/destination-dynamodb - dockerImageTag: 0.1.4 + dockerImageTag: 0.1.5 documentationUrl: https://docs.airbyte.io/integrations/destinations/dynamodb icon: dynamodb.svg releaseStage: alpha diff --git a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml index 349d0913c5d8..59e11741de8f 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml @@ -1141,7 +1141,7 @@ supported_destination_sync_modes: - "overwrite" - "append" -- dockerImage: "airbyte/destination-dynamodb:0.1.4" +- dockerImage: "airbyte/destination-dynamodb:0.1.5" spec: documentationUrl: "https://docs.airbyte.io/integrations/destinations/dynamodb" connectionSpecification: diff --git a/airbyte-integrations/connectors/destination-dynamodb/Dockerfile b/airbyte-integrations/connectors/destination-dynamodb/Dockerfile index 5f80d086a636..027c8b00d737 100644 --- a/airbyte-integrations/connectors/destination-dynamodb/Dockerfile +++ b/airbyte-integrations/connectors/destination-dynamodb/Dockerfile @@ -16,5 +16,5 @@ ENV APPLICATION destination-dynamodb COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=0.1.4 +LABEL io.airbyte.version=0.1.5 LABEL io.airbyte.name=airbyte/destination-dynamodb diff --git a/airbyte-integrations/connectors/destination-dynamodb/build.gradle b/airbyte-integrations/connectors/destination-dynamodb/build.gradle index 41bb9c7c3437..f9b3c74559ac 100644 --- a/airbyte-integrations/connectors/destination-dynamodb/build.gradle +++ b/airbyte-integrations/connectors/destination-dynamodb/build.gradle @@ -17,6 +17,8 @@ dependencies { implementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs) implementation 'com.amazonaws:aws-java-sdk-dynamodb:1.12.47' + testImplementation project(':airbyte-integrations:bases:standard-destination-test') + integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-destination-test') integrationTestJavaImplementation project(':airbyte-integrations:connectors:destination-dynamodb') } diff --git a/airbyte-integrations/connectors/destination-dynamodb/src/main/java/io/airbyte/integrations/destination/dynamodb/DynamodbConsumer.java b/airbyte-integrations/connectors/destination-dynamodb/src/main/java/io/airbyte/integrations/destination/dynamodb/DynamodbConsumer.java index efd8ee51f5cc..babd12b13af3 100644 --- a/airbyte-integrations/connectors/destination-dynamodb/src/main/java/io/airbyte/integrations/destination/dynamodb/DynamodbConsumer.java +++ b/airbyte-integrations/connectors/destination-dynamodb/src/main/java/io/airbyte/integrations/destination/dynamodb/DynamodbConsumer.java @@ -29,8 +29,6 @@ public class DynamodbConsumer extends FailureTrackingAirbyteMessageConsumer { private final Consumer outputRecordCollector; private final Map streamNameAndNamespaceToWriters; - private AirbyteMessage lastStateMessage = null; - public DynamodbConsumer(final DynamodbDestinationConfig dynamodbDestinationConfig, final ConfiguredAirbyteCatalog configuredCatalog, final Consumer outputRecordCollector) { @@ -80,7 +78,7 @@ protected void startTracked() throws Exception { @Override protected void acceptTracked(final AirbyteMessage airbyteMessage) throws Exception { if (airbyteMessage.getType() == AirbyteMessage.Type.STATE) { - this.lastStateMessage = airbyteMessage; + outputRecordCollector.accept(airbyteMessage); return; } else if (airbyteMessage.getType() != AirbyteMessage.Type.RECORD) { return; @@ -105,10 +103,6 @@ protected void close(final boolean hasFailed) throws Exception { for (final DynamodbWriter handler : streamNameAndNamespaceToWriters.values()) { handler.close(hasFailed); } - // DynamoDB stream uploader is all or nothing if a failure happens in the destination. - if (!hasFailed) { - outputRecordCollector.accept(lastStateMessage); - } } } diff --git a/airbyte-integrations/connectors/destination-dynamodb/src/test/java/io/airbyte/integrations/destination/dynamodb/DynamodbConsumerTest.java b/airbyte-integrations/connectors/destination-dynamodb/src/test/java/io/airbyte/integrations/destination/dynamodb/DynamodbConsumerTest.java new file mode 100644 index 000000000000..73b544aacaea --- /dev/null +++ b/airbyte-integrations/connectors/destination-dynamodb/src/test/java/io/airbyte/integrations/destination/dynamodb/DynamodbConsumerTest.java @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.dynamodb; + +import io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer; +import io.airbyte.integrations.standardtest.destination.PerStreamStateMessageTest; +import io.airbyte.protocol.models.AirbyteMessage; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import java.util.function.Consumer; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class DynamodbConsumerTest extends PerStreamStateMessageTest { + + @Mock + private Consumer outputRecordCollector; + + @InjectMocks + private DynamodbConsumer consumer; + + @Mock + private DynamodbDestinationConfig destinationConfig; + + @Mock + private ConfiguredAirbyteCatalog catalog; + + @BeforeEach + private void init() { + consumer = new DynamodbConsumer(destinationConfig, catalog, outputRecordCollector); + } + + @Override + protected Consumer getMockedConsumer() { + return outputRecordCollector; + } + + @Override + protected FailureTrackingAirbyteMessageConsumer getMessageConsumer() { + return consumer; + } + +} diff --git a/docs/integrations/destinations/dynamodb.md b/docs/integrations/destinations/dynamodb.md index 727218d51386..b02293b2bd30 100644 --- a/docs/integrations/destinations/dynamodb.md +++ b/docs/integrations/destinations/dynamodb.md @@ -58,6 +58,7 @@ This connector by default uses 10 capacity units for both Read and Write in Dyna | Version | Date | Pull Request | Subject | | :--- | :--- | :--- | :--- | +| 0.1.5 | 2022-08-05 | [\#15350](https://github.com/airbytehq/airbyte/pull/15350) | Added per-stream handling | | 0.1.4 | 2022-06-16 | [\#13852](https://github.com/airbytehq/airbyte/pull/13852) | Updated stacktrace format for any trace message errors | | 0.1.3 | 2022-05-17 | [12820](https://github.com/airbytehq/airbyte/pull/12820) | Improved 'check' operation performance | | 0.1.2 | 2022-02-14 | [10256](https://github.com/airbytehq/airbyte/pull/10256) | Add `-XX:+ExitOnOutOfMemoryError` JVM option |