diff --git a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml index 0567df71b651..6e0f45f3a816 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml @@ -1,28 +1,28 @@ - destinationDefinitionId: a625d593-bba5-4a1c-a53d-2d246268a816 name: Local JSON dockerRepository: airbyte/destination-local-json - dockerImageTag: 0.2.7 + dockerImageTag: 0.2.8 documentationUrl: https://docs.airbyte.io/integrations/destinations/local-json - destinationDefinitionId: 8be1cf83-fde1-477f-a4ad-318d23c9f3c6 name: Local CSV dockerRepository: airbyte/destination-csv - dockerImageTag: 0.2.7 + dockerImageTag: 0.2.8 documentationUrl: https://docs.airbyte.io/integrations/destinations/local-csv - destinationDefinitionId: 25c5221d-dce2-4163-ade9-739ef790f503 name: Postgres dockerRepository: airbyte/destination-postgres - dockerImageTag: 0.3.7 + dockerImageTag: 0.3.8 documentationUrl: https://docs.airbyte.io/integrations/destinations/postgres icon: postgresql.svg - destinationDefinitionId: 22f6c74f-5699-40ff-833c-4a879ea40133 name: BigQuery dockerRepository: airbyte/destination-bigquery - dockerImageTag: 0.3.7 + dockerImageTag: 0.3.8 documentationUrl: https://docs.airbyte.io/integrations/destinations/bigquery - destinationDefinitionId: 079d5540-f236-4294-ba7c-ade8fd918496 name: BigQuery (denormalized typed struct) dockerRepository: airbyte/destination-bigquery-denormalized - dockerImageTag: 0.1.0 + dockerImageTag: 0.1.1 documentationUrl: https://docs.airbyte.io/integrations/destinations/bigquery - destinationDefinitionId: ca8f6566-e555-4b40-943a-545bf123117a name: Google Cloud Storage (GCS) @@ -37,7 +37,7 @@ - destinationDefinitionId: 424892c4-daac-4491-b35d-c6688ba547ba name: Snowflake dockerRepository: airbyte/destination-snowflake - dockerImageTag: 0.3.10 + dockerImageTag: 0.3.11 documentationUrl: https://docs.airbyte.io/integrations/destinations/snowflake - destinationDefinitionId: 4816b78f-1489-44c1-9060-4b19d5fa9362 name: S3 @@ -47,26 +47,26 @@ - destinationDefinitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc name: Redshift dockerRepository: airbyte/destination-redshift - dockerImageTag: 0.3.11 + dockerImageTag: 0.3.12 documentationUrl: https://docs.airbyte.io/integrations/destinations/redshift icon: redshift.svg - destinationDefinitionId: af7c921e-5892-4ff2-b6c1-4a5ab258fb7e name: MeiliSearch dockerRepository: airbyte/destination-meilisearch - dockerImageTag: 0.2.7 + dockerImageTag: 0.2.8 documentationUrl: https://docs.airbyte.io/integrations/destinations/meilisearch - destinationDefinitionId: ca81ee7c-3163-4246-af40-094cc31e5e42 name: MySQL dockerRepository: airbyte/destination-mysql - dockerImageTag: 0.1.8 + dockerImageTag: 0.1.9 documentationUrl: https://docs.airbyte.io/integrations/destinations/mysql - destinationDefinitionId: d4353156-9217-4cad-8dd7-c108fd4f74cf name: MS SQL Server dockerRepository: airbyte/destination-mssql - dockerImageTag: 0.1.5 + dockerImageTag: 0.1.6 documentationUrl: https://docs.airbyte.io/integrations/destinations/mssql - destinationDefinitionId: 3986776d-2319-4de9-8af8-db14c0996e72 name: Oracle (Alpha) dockerRepository: airbyte/destination-oracle - dockerImageTag: 0.1.2 + dockerImageTag: 0.1.3 documentationUrl: https://docs.airbyte.io/integrations/destinations/oracle diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java index ed2f242f13ac..5ebdd25d2069 100644 --- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java @@ -166,8 +166,7 @@ static void consumeWriteStream(AirbyteMessageConsumer consumer) throws Exception if (singerMessageOptional.isPresent()) { consumer.accept(singerMessageOptional.get()); } else { - // todo (cgardens) - decide if we want to throw here instead. - LOGGER.error(inputString); + LOGGER.error("Received invalid message: " + inputString); } } } diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumer.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumer.java index a01fc7f8d52f..525c48ae141d 100644 --- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumer.java +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumer.java @@ -66,6 +66,28 @@ *

* All other message types are ignored. *

+ * + *

+ * Throughout the lifecycle of the consumer, messages get promoted from buffered to flushed to + * committed. A record message when it is received is immediately buffered. When the buffer fills + * up, all buffered records are flushed out of memory using the user-provided recordWriter. When + * this flush happens, a state message is moved from pending to flushed. On close, if the + * user-provided onClose function is successful, then the flushed state record is considered + * committed and is then emitted. We expect this class to only ever emit either 1 state message (in + * the case of a full or partial success) or 0 state messages (in the case where the onClose step + * was never reached or did not complete without exception). + *

+ * + *

+ * When a record is "flushed" it is moved from the docker container to the destination. By + * convention, it is usually placed in some sort of temporary storage on the destination (e.g. a + * temporary database or file store). The logic in close handles committing the temporary + * representation data to the final store (e.g. final table). In the case of Copy destinations they + * often have additional temporary stores. The common pattern for copy destination is that flush + * pushes the data into cloud storage and then close copies from cloud storage to a temporary table + * AND then copies from the temporary table into the final table. This abstraction is blind to that + * detail as it implementation detail of how copy destinations implement close. + *

*/ public class BufferedStreamConsumer extends FailureTrackingAirbyteMessageConsumer implements AirbyteMessageConsumer { @@ -85,7 +107,7 @@ public class BufferedStreamConsumer extends FailureTrackingAirbyteMessageConsume private boolean hasStarted; private boolean hasClosed; - private AirbyteMessage lastCommittedState; + private AirbyteMessage lastFlushedState; private AirbyteMessage pendingState; public BufferedStreamConsumer(Consumer outputRecordCollector, @@ -165,7 +187,7 @@ private void flushQueueToDestination() throws Exception { } if (pendingState != null) { - lastCommittedState = pendingState; + lastFlushedState = pendingState; pendingState = null; } } @@ -192,15 +214,23 @@ protected void close(boolean hasFailed) throws Exception { } try { - onClose.accept(hasFailed); - // todo (cgardens) - For now we are using this conditional to maintain existing behavior. When we - // enable checkpointing, we will need to get feedback from onClose on whether any data was persisted - // or not. If it was then, the state message will be emitted. - if (!hasFailed && lastCommittedState != null) { - outputRecordCollector.accept(lastCommittedState); + // if no state was was emitted (i.e. full refresh), if there were still no failures, then we can + // still succeed. + if (lastFlushedState == null) { + onClose.accept(hasFailed); + } else { + // if any state message flushed that means we can still go for at least a partial success. + onClose.accept(false); + } + + // if one close succeeds without exception then we can emit the state record because it means its + // records were not only flushed, but committed. + if (lastFlushedState != null) { + outputRecordCollector.accept(lastFlushedState); } } catch (Exception e) { - LOGGER.error("on close failed.", e); + LOGGER.error("Close failed.", e); + throw e; } } diff --git a/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumerTest.java b/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumerTest.java index 5857c36e4d9f..9b13066153ec 100644 --- a/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumerTest.java +++ b/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumerTest.java @@ -24,13 +24,17 @@ package io.airbyte.integrations.destination.buffered_stream_consumer; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; import io.airbyte.commons.concurrency.VoidCallable; import io.airbyte.commons.functional.CheckedConsumer; import io.airbyte.commons.functional.CheckedFunction; @@ -82,7 +86,7 @@ public class BufferedStreamConsumerTest { private RecordWriter recordWriter; private CheckedConsumer onClose; private CheckedFunction isValidRecord; - private Consumer checkpointConsumer; + private Consumer outputRecordCollector; @SuppressWarnings("unchecked") @BeforeEach @@ -91,9 +95,9 @@ void setup() throws Exception { recordWriter = mock(RecordWriter.class); onClose = mock(CheckedConsumer.class); isValidRecord = mock(CheckedFunction.class); - checkpointConsumer = mock(Consumer.class); + outputRecordCollector = mock(Consumer.class); consumer = new BufferedStreamConsumer( - checkpointConsumer, + outputRecordCollector, onStart, recordWriter, onClose, @@ -117,7 +121,7 @@ void test1StreamWith1State() throws Exception { verifyRecords(STREAM_NAME, SCHEMA_NAME, expectedRecords); - verify(checkpointConsumer).accept(STATE_MESSAGE1); + verify(outputRecordCollector).accept(STATE_MESSAGE1); } @Test @@ -134,7 +138,7 @@ void test1StreamWith2State() throws Exception { verifyRecords(STREAM_NAME, SCHEMA_NAME, expectedRecords); - verify(checkpointConsumer, times(1)).accept(STATE_MESSAGE2); + verify(outputRecordCollector, times(1)).accept(STATE_MESSAGE2); } @Test @@ -150,7 +154,6 @@ void test1StreamWith0State() throws Exception { verifyRecords(STREAM_NAME, SCHEMA_NAME, expectedRecords); } - // todo (cgardens) - split testing buffer flushing into own test. @Test void test1StreamWithStateAndThenMoreRecordsBiggerThanBuffer() throws Exception { final List expectedRecordsBatch1 = getNRecords(10); @@ -167,7 +170,7 @@ void test1StreamWithStateAndThenMoreRecordsBiggerThanBuffer() throws Exception { verifyRecords(STREAM_NAME, SCHEMA_NAME, expectedRecordsBatch1); verifyRecords(STREAM_NAME, SCHEMA_NAME, expectedRecordsBatch2); - verify(checkpointConsumer).accept(STATE_MESSAGE1); + verify(outputRecordCollector).accept(STATE_MESSAGE1); } @Test @@ -177,7 +180,7 @@ void test1StreamWithStateAndThenMoreRecordsSmallerThanBuffer() throws Exception // consumer with big enough buffered that we see both batches are flushed in one go. final BufferedStreamConsumer consumer = new BufferedStreamConsumer( - checkpointConsumer, + outputRecordCollector, onStart, recordWriter, onClose, @@ -193,6 +196,75 @@ void test1StreamWithStateAndThenMoreRecordsSmallerThanBuffer() throws Exception verifyStartAndClose(); + final List expectedRecords = Lists.newArrayList(expectedRecordsBatch1, expectedRecordsBatch2) + .stream() + .flatMap(Collection::stream) + .collect(Collectors.toList()); + verifyRecords(STREAM_NAME, SCHEMA_NAME, expectedRecords); + + verify(outputRecordCollector).accept(STATE_MESSAGE1); + } + + @Test + void testExceptionAfterOneStateMessage() throws Exception { + final List expectedRecordsBatch1 = getNRecords(10); + final List expectedRecordsBatch2 = getNRecords(10, 20); + final List expectedRecordsBatch3 = getNRecords(20, 21); + + consumer.start(); + consumeRecords(consumer, expectedRecordsBatch1); + consumer.accept(STATE_MESSAGE1); + consumeRecords(consumer, expectedRecordsBatch2); + when(isValidRecord.apply(any())).thenThrow(new IllegalStateException("induced exception")); + assertThrows(IllegalStateException.class, () -> consumer.accept(expectedRecordsBatch3.get(0))); + consumer.close(); + + verifyStartAndClose(); + + verifyRecords(STREAM_NAME, SCHEMA_NAME, expectedRecordsBatch1); + + verify(outputRecordCollector).accept(STATE_MESSAGE1); + } + + @Test + void testExceptionAfterNoStateMessages() throws Exception { + final List expectedRecordsBatch1 = getNRecords(10); + final List expectedRecordsBatch2 = getNRecords(10, 20); + final List expectedRecordsBatch3 = getNRecords(20, 21); + + consumer.start(); + consumeRecords(consumer, expectedRecordsBatch1); + consumeRecords(consumer, expectedRecordsBatch2); + when(isValidRecord.apply(any())).thenThrow(new IllegalStateException("induced exception")); + assertThrows(IllegalStateException.class, () -> consumer.accept(expectedRecordsBatch3.get(0))); + consumer.close(); + + verify(onStart).call(); + verify(onClose).accept(true); + + verifyRecords(STREAM_NAME, SCHEMA_NAME, expectedRecordsBatch1); + + verifyNoInteractions(outputRecordCollector); + } + + @Test + void testExceptionDuringOnClose() throws Exception { + doThrow(new IllegalStateException("induced exception")).when(onClose).accept(false); + + final List expectedRecordsBatch1 = getNRecords(10); + final List expectedRecordsBatch2 = getNRecords(10, 20); + + consumer.start(); + consumeRecords(consumer, expectedRecordsBatch1); + consumer.accept(STATE_MESSAGE1); + consumeRecords(consumer, expectedRecordsBatch2); + assertThrows(IllegalStateException.class, () -> consumer.close()); + + verifyStartAndClose(); + + verifyRecords(STREAM_NAME, SCHEMA_NAME, expectedRecordsBatch1); + + verifyNoInteractions(outputRecordCollector); } @Test @@ -215,7 +287,7 @@ void test2StreamWith1State() throws Exception { verifyRecords(STREAM_NAME, SCHEMA_NAME, expectedRecordsStream1); verifyRecords(STREAM_NAME2, SCHEMA_NAME, expectedRecordsStream2); - verify(checkpointConsumer).accept(STATE_MESSAGE1); + verify(outputRecordCollector).accept(STATE_MESSAGE1); } @Test @@ -239,7 +311,7 @@ void test2StreamWith2State() throws Exception { verifyRecords(STREAM_NAME, SCHEMA_NAME, expectedRecordsStream1); verifyRecords(STREAM_NAME2, SCHEMA_NAME, expectedRecordsStream2); - verify(checkpointConsumer, times(1)).accept(STATE_MESSAGE2); + verify(outputRecordCollector, times(1)).accept(STATE_MESSAGE2); } private void verifyStartAndClose() throws Exception { diff --git a/airbyte-integrations/connectors/destination-bigquery-denormalized/Dockerfile b/airbyte-integrations/connectors/destination-bigquery-denormalized/Dockerfile index 8c59152b4805..be87de7e5042 100644 --- a/airbyte-integrations/connectors/destination-bigquery-denormalized/Dockerfile +++ b/airbyte-integrations/connectors/destination-bigquery-denormalized/Dockerfile @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar RUN tar xf ${APPLICATION}.tar --strip-components=1 -LABEL io.airbyte.version=0.1.0 +LABEL io.airbyte.version=0.1.1 LABEL io.airbyte.name=airbyte/destination-bigquery-denormalized diff --git a/airbyte-integrations/connectors/destination-bigquery/Dockerfile b/airbyte-integrations/connectors/destination-bigquery/Dockerfile index 8238127d56fb..59bf25145cdd 100644 --- a/airbyte-integrations/connectors/destination-bigquery/Dockerfile +++ b/airbyte-integrations/connectors/destination-bigquery/Dockerfile @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar RUN tar xf ${APPLICATION}.tar --strip-components=1 -LABEL io.airbyte.version=0.3.7 +LABEL io.airbyte.version=0.3.8 LABEL io.airbyte.name=airbyte/destination-bigquery diff --git a/airbyte-integrations/connectors/destination-csv/Dockerfile b/airbyte-integrations/connectors/destination-csv/Dockerfile index bbc24451788e..fdaacb200ddb 100644 --- a/airbyte-integrations/connectors/destination-csv/Dockerfile +++ b/airbyte-integrations/connectors/destination-csv/Dockerfile @@ -7,5 +7,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar RUN tar xf ${APPLICATION}.tar --strip-components=1 -LABEL io.airbyte.version=0.2.7 +LABEL io.airbyte.version=0.2.8 LABEL io.airbyte.name=airbyte/destination-csv diff --git a/airbyte-integrations/connectors/destination-local-json/Dockerfile b/airbyte-integrations/connectors/destination-local-json/Dockerfile index 47e0b28c60c5..f355b2ef7b5c 100644 --- a/airbyte-integrations/connectors/destination-local-json/Dockerfile +++ b/airbyte-integrations/connectors/destination-local-json/Dockerfile @@ -7,5 +7,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar RUN tar xf ${APPLICATION}.tar --strip-components=1 -LABEL io.airbyte.version=0.2.7 +LABEL io.airbyte.version=0.2.8 LABEL io.airbyte.name=airbyte/destination-local-json diff --git a/airbyte-integrations/connectors/destination-meilisearch/Dockerfile b/airbyte-integrations/connectors/destination-meilisearch/Dockerfile index 466d243cc5e7..fd29451c32c5 100644 --- a/airbyte-integrations/connectors/destination-meilisearch/Dockerfile +++ b/airbyte-integrations/connectors/destination-meilisearch/Dockerfile @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar RUN tar xf ${APPLICATION}.tar --strip-components=1 -LABEL io.airbyte.version=0.2.7 +LABEL io.airbyte.version=0.2.8 LABEL io.airbyte.name=airbyte/destination-meilisearch diff --git a/airbyte-integrations/connectors/destination-mssql/Dockerfile b/airbyte-integrations/connectors/destination-mssql/Dockerfile index 927c4197ffed..9018245d3f82 100644 --- a/airbyte-integrations/connectors/destination-mssql/Dockerfile +++ b/airbyte-integrations/connectors/destination-mssql/Dockerfile @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar RUN tar xf ${APPLICATION}.tar --strip-components=1 -LABEL io.airbyte.version=0.1.5 +LABEL io.airbyte.version=0.1.6 LABEL io.airbyte.name=airbyte/destination-mssql diff --git a/airbyte-integrations/connectors/destination-mysql/Dockerfile b/airbyte-integrations/connectors/destination-mysql/Dockerfile index 5bc950ab07e3..29e6ee7c8191 100644 --- a/airbyte-integrations/connectors/destination-mysql/Dockerfile +++ b/airbyte-integrations/connectors/destination-mysql/Dockerfile @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar RUN tar xf ${APPLICATION}.tar --strip-components=1 -LABEL io.airbyte.version=0.1.8 +LABEL io.airbyte.version=0.1.9 LABEL io.airbyte.name=airbyte/destination-mysql diff --git a/airbyte-integrations/connectors/destination-oracle/Dockerfile b/airbyte-integrations/connectors/destination-oracle/Dockerfile index a8b34afebb62..d02e7ed230b5 100644 --- a/airbyte-integrations/connectors/destination-oracle/Dockerfile +++ b/airbyte-integrations/connectors/destination-oracle/Dockerfile @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar RUN tar xf ${APPLICATION}.tar --strip-components=1 -LABEL io.airbyte.version=0.1.2 +LABEL io.airbyte.version=0.1.3 LABEL io.airbyte.name=airbyte/destination-oracle diff --git a/airbyte-integrations/connectors/destination-postgres/Dockerfile b/airbyte-integrations/connectors/destination-postgres/Dockerfile index 64cccf52c761..6dcf5092a4e5 100644 --- a/airbyte-integrations/connectors/destination-postgres/Dockerfile +++ b/airbyte-integrations/connectors/destination-postgres/Dockerfile @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar RUN tar xf ${APPLICATION}.tar --strip-components=1 -LABEL io.airbyte.version=0.3.7 +LABEL io.airbyte.version=0.3.8 LABEL io.airbyte.name=airbyte/destination-postgres diff --git a/airbyte-integrations/connectors/destination-redshift/Dockerfile b/airbyte-integrations/connectors/destination-redshift/Dockerfile index 5fec62e11c03..8943eed911c0 100644 --- a/airbyte-integrations/connectors/destination-redshift/Dockerfile +++ b/airbyte-integrations/connectors/destination-redshift/Dockerfile @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar RUN tar xf ${APPLICATION}.tar --strip-components=1 -LABEL io.airbyte.version=0.3.11 +LABEL io.airbyte.version=0.3.12 LABEL io.airbyte.name=airbyte/destination-redshift diff --git a/airbyte-integrations/connectors/destination-snowflake/Dockerfile b/airbyte-integrations/connectors/destination-snowflake/Dockerfile index f2031c35e37b..270df9074f85 100644 --- a/airbyte-integrations/connectors/destination-snowflake/Dockerfile +++ b/airbyte-integrations/connectors/destination-snowflake/Dockerfile @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar RUN tar xf ${APPLICATION}.tar --strip-components=1 -LABEL io.airbyte.version=0.3.10 +LABEL io.airbyte.version=0.3.11 LABEL io.airbyte.name=airbyte/destination-snowflake