From c0c8c9aba406654f06ee05212e2bdf730a030d19 Mon Sep 17 00:00:00 2001
From: cgardens
Date: Sat, 22 May 2021 14:13:30 -0700
Subject: [PATCH 1/8] attempt to commit if any records were flushed
---
.../BufferedStreamConsumer.java | 28 +++++++++++++------
1 file changed, 19 insertions(+), 9 deletions(-)
diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumer.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumer.java
index a01fc7f8d52f..6f495d1703af 100644
--- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumer.java
+++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumer.java
@@ -66,6 +66,17 @@
*
* All other message types are ignored.
*
+ *
+ *
+ * Throughout the lifecycle of the consumer, records get promoted from buffered to flushed to
+ * committed. A record when it is received is immediately buffered. When the buffer fills up, all
+ * buffered records are flushed out of memory using the user-provided recordWriter. When this flush
+ * happens, a state message is moved from pending to flushed. On close, if the user-provided onClose
+ * function is successful, then the flushed state record is considered committed and is then
+ * emitted. We expect this class to only ever emit either 1 state message (in the case of a full or
+ * partial success) or 0 state messages (in the case where the onClose step was never reached or did
+ * not complete without exception).
+ *
*/
public class BufferedStreamConsumer extends FailureTrackingAirbyteMessageConsumer implements AirbyteMessageConsumer {
@@ -85,7 +96,7 @@ public class BufferedStreamConsumer extends FailureTrackingAirbyteMessageConsume
private boolean hasStarted;
private boolean hasClosed;
- private AirbyteMessage lastCommittedState;
+ private AirbyteMessage lastFlushedState;
private AirbyteMessage pendingState;
public BufferedStreamConsumer(Consumer outputRecordCollector,
@@ -165,7 +176,7 @@ private void flushQueueToDestination() throws Exception {
}
if (pendingState != null) {
- lastCommittedState = pendingState;
+ lastFlushedState = pendingState;
pendingState = null;
}
}
@@ -192,13 +203,12 @@ protected void close(boolean hasFailed) throws Exception {
}
try {
- onClose.accept(hasFailed);
- // todo (cgardens) - For now we are using this conditional to maintain existing behavior. When we
- // enable checkpointing, we will need to get feedback from onClose on whether any data was persisted
- // or not. If it was then, the state message will be emitted.
- if (!hasFailed && lastCommittedState != null) {
- outputRecordCollector.accept(lastCommittedState);
- }
+ // means at least one state message worth of records were committed, so we will try complete the
+ // task.
+ onClose.accept(lastFlushedState != null);
+ // if one close succeeds without exception then we can emit the state record because it means its
+ // records were not only flushed, but committed.
+ outputRecordCollector.accept(lastFlushedState);
} catch (Exception e) {
LOGGER.error("on close failed.", e);
}
From dd10d5258624788690905fe31a3c65e80c127860 Mon Sep 17 00:00:00 2001
From: cgardens
Date: Sat, 22 May 2021 15:28:30 -0700
Subject: [PATCH 2/8] closer step by step
---
.../BufferedStreamConsumer.java | 7 ++--
.../BufferedStreamConsumerTest.java | 32 ++++++++++++++++++-
2 files changed, 35 insertions(+), 4 deletions(-)
diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumer.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumer.java
index 6f495d1703af..69c2070ac35e 100644
--- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumer.java
+++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumer.java
@@ -203,9 +203,10 @@ protected void close(boolean hasFailed) throws Exception {
}
try {
- // means at least one state message worth of records were committed, so we will try complete the
- // task.
- onClose.accept(lastFlushedState != null);
+ // if any state message flushed that means we can still go for at least a partial success. if none
+ // was emitted, if there were still no failures, then we can still succeed. the latter case is full
+ // refresh.
+ onClose.accept(lastFlushedState == null && hasFailed);
// if one close succeeds without exception then we can emit the state record because it means its
// records were not only flushed, but committed.
outputRecordCollector.accept(lastFlushedState);
diff --git a/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumerTest.java b/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumerTest.java
index 5857c36e4d9f..8a63b3f64eb5 100644
--- a/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumerTest.java
+++ b/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumerTest.java
@@ -24,6 +24,7 @@
package io.airbyte.integrations.destination.buffered_stream_consumer;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
@@ -31,6 +32,7 @@
import static org.mockito.Mockito.when;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
import io.airbyte.commons.concurrency.VoidCallable;
import io.airbyte.commons.functional.CheckedConsumer;
import io.airbyte.commons.functional.CheckedFunction;
@@ -193,6 +195,34 @@ void test1StreamWithStateAndThenMoreRecordsSmallerThanBuffer() throws Exception
verifyStartAndClose();
+ final List expectedRecords = Lists.newArrayList(expectedRecordsBatch1, expectedRecordsBatch2)
+ .stream()
+ .flatMap(Collection::stream)
+ .collect(Collectors.toList());
+ verifyRecords(STREAM_NAME, SCHEMA_NAME, expectedRecords);
+
+ verify(checkpointConsumer).accept(STATE_MESSAGE1);
+ }
+
+ @Test
+ void testExceptionAfterOneStateMessage() throws Exception {
+ final List expectedRecordsBatch1 = getNRecords(10);
+ final List expectedRecordsBatch2 = getNRecords(10, 20);
+ final List expectedRecordsBatch3 = getNRecords(20, 21);
+
+ consumer.start();
+ consumeRecords(consumer, expectedRecordsBatch1);
+ consumer.accept(STATE_MESSAGE1);
+ consumeRecords(consumer, expectedRecordsBatch2);
+ when(isValidRecord.apply(any())).thenThrow(new IllegalStateException("induced exception"));
+ assertThrows(IllegalStateException.class, () -> consumer.accept(expectedRecordsBatch3.get(0)));
+ consumer.close();
+
+ verifyStartAndClose();
+
+ verifyRecords(STREAM_NAME, SCHEMA_NAME, expectedRecordsBatch1);
+
+ verify(checkpointConsumer).accept(STATE_MESSAGE1);
}
@Test
@@ -280,4 +310,4 @@ private void verifyRecords(String streamName, String namespace, Collection
Date: Tue, 25 May 2021 15:45:36 -0700
Subject: [PATCH 3/8] better tests
---
.../BufferedStreamConsumer.java | 4 +-
.../BufferedStreamConsumerTest.java | 68 +++++++++++++++----
2 files changed, 58 insertions(+), 14 deletions(-)
diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumer.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumer.java
index 69c2070ac35e..26b690f8ac24 100644
--- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumer.java
+++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumer.java
@@ -209,7 +209,9 @@ protected void close(boolean hasFailed) throws Exception {
onClose.accept(lastFlushedState == null && hasFailed);
// if one close succeeds without exception then we can emit the state record because it means its
// records were not only flushed, but committed.
- outputRecordCollector.accept(lastFlushedState);
+ if (lastFlushedState != null) {
+ outputRecordCollector.accept(lastFlushedState);
+ }
} catch (Exception e) {
LOGGER.error("on close failed.", e);
}
diff --git a/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumerTest.java b/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumerTest.java
index 8a63b3f64eb5..c0b583c64d75 100644
--- a/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumerTest.java
+++ b/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumerTest.java
@@ -26,9 +26,11 @@
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;
import com.google.common.collect.ImmutableMap;
@@ -84,7 +86,7 @@ public class BufferedStreamConsumerTest {
private RecordWriter recordWriter;
private CheckedConsumer onClose;
private CheckedFunction isValidRecord;
- private Consumer checkpointConsumer;
+ private Consumer outputRecordCollector;
@SuppressWarnings("unchecked")
@BeforeEach
@@ -93,9 +95,9 @@ void setup() throws Exception {
recordWriter = mock(RecordWriter.class);
onClose = mock(CheckedConsumer.class);
isValidRecord = mock(CheckedFunction.class);
- checkpointConsumer = mock(Consumer.class);
+ outputRecordCollector = mock(Consumer.class);
consumer = new BufferedStreamConsumer(
- checkpointConsumer,
+ outputRecordCollector,
onStart,
recordWriter,
onClose,
@@ -119,7 +121,7 @@ void test1StreamWith1State() throws Exception {
verifyRecords(STREAM_NAME, SCHEMA_NAME, expectedRecords);
- verify(checkpointConsumer).accept(STATE_MESSAGE1);
+ verify(outputRecordCollector).accept(STATE_MESSAGE1);
}
@Test
@@ -136,7 +138,7 @@ void test1StreamWith2State() throws Exception {
verifyRecords(STREAM_NAME, SCHEMA_NAME, expectedRecords);
- verify(checkpointConsumer, times(1)).accept(STATE_MESSAGE2);
+ verify(outputRecordCollector, times(1)).accept(STATE_MESSAGE2);
}
@Test
@@ -152,7 +154,6 @@ void test1StreamWith0State() throws Exception {
verifyRecords(STREAM_NAME, SCHEMA_NAME, expectedRecords);
}
- // todo (cgardens) - split testing buffer flushing into own test.
@Test
void test1StreamWithStateAndThenMoreRecordsBiggerThanBuffer() throws Exception {
final List expectedRecordsBatch1 = getNRecords(10);
@@ -169,7 +170,7 @@ void test1StreamWithStateAndThenMoreRecordsBiggerThanBuffer() throws Exception {
verifyRecords(STREAM_NAME, SCHEMA_NAME, expectedRecordsBatch1);
verifyRecords(STREAM_NAME, SCHEMA_NAME, expectedRecordsBatch2);
- verify(checkpointConsumer).accept(STATE_MESSAGE1);
+ verify(outputRecordCollector).accept(STATE_MESSAGE1);
}
@Test
@@ -179,7 +180,7 @@ void test1StreamWithStateAndThenMoreRecordsSmallerThanBuffer() throws Exception
// consumer with big enough buffered that we see both batches are flushed in one go.
final BufferedStreamConsumer consumer = new BufferedStreamConsumer(
- checkpointConsumer,
+ outputRecordCollector,
onStart,
recordWriter,
onClose,
@@ -201,7 +202,7 @@ void test1StreamWithStateAndThenMoreRecordsSmallerThanBuffer() throws Exception
.collect(Collectors.toList());
verifyRecords(STREAM_NAME, SCHEMA_NAME, expectedRecords);
- verify(checkpointConsumer).accept(STATE_MESSAGE1);
+ verify(outputRecordCollector).accept(STATE_MESSAGE1);
}
@Test
@@ -222,7 +223,48 @@ void testExceptionAfterOneStateMessage() throws Exception {
verifyRecords(STREAM_NAME, SCHEMA_NAME, expectedRecordsBatch1);
- verify(checkpointConsumer).accept(STATE_MESSAGE1);
+ verify(outputRecordCollector).accept(STATE_MESSAGE1);
+ }
+
+ @Test
+ void testExceptionAfterNoStateMessages() throws Exception {
+ final List expectedRecordsBatch1 = getNRecords(10);
+ final List expectedRecordsBatch2 = getNRecords(10, 20);
+ final List expectedRecordsBatch3 = getNRecords(20, 21);
+
+ consumer.start();
+ consumeRecords(consumer, expectedRecordsBatch1);
+ consumeRecords(consumer, expectedRecordsBatch2);
+ when(isValidRecord.apply(any())).thenThrow(new IllegalStateException("induced exception"));
+ assertThrows(IllegalStateException.class, () -> consumer.accept(expectedRecordsBatch3.get(0)));
+ consumer.close();
+
+ verify(onStart).call();
+ verify(onClose).accept(true);
+
+ verifyRecords(STREAM_NAME, SCHEMA_NAME, expectedRecordsBatch1);
+
+ verifyNoInteractions(outputRecordCollector);
+ }
+
+ @Test
+ void testExceptionADuringOnClose() throws Exception {
+ doThrow(new IllegalStateException("induced exception")).when(onClose).accept(false);
+
+ final List expectedRecordsBatch1 = getNRecords(10);
+ final List expectedRecordsBatch2 = getNRecords(10, 20);
+
+ consumer.start();
+ consumeRecords(consumer, expectedRecordsBatch1);
+ consumer.accept(STATE_MESSAGE1);
+ consumeRecords(consumer, expectedRecordsBatch2);
+ consumer.close();
+
+ verifyStartAndClose();
+
+ verifyRecords(STREAM_NAME, SCHEMA_NAME, expectedRecordsBatch1);
+
+ verifyNoInteractions(outputRecordCollector);
}
@Test
@@ -245,7 +287,7 @@ void test2StreamWith1State() throws Exception {
verifyRecords(STREAM_NAME, SCHEMA_NAME, expectedRecordsStream1);
verifyRecords(STREAM_NAME2, SCHEMA_NAME, expectedRecordsStream2);
- verify(checkpointConsumer).accept(STATE_MESSAGE1);
+ verify(outputRecordCollector).accept(STATE_MESSAGE1);
}
@Test
@@ -269,7 +311,7 @@ void test2StreamWith2State() throws Exception {
verifyRecords(STREAM_NAME, SCHEMA_NAME, expectedRecordsStream1);
verifyRecords(STREAM_NAME2, SCHEMA_NAME, expectedRecordsStream2);
- verify(checkpointConsumer, times(1)).accept(STATE_MESSAGE2);
+ verify(outputRecordCollector, times(1)).accept(STATE_MESSAGE2);
}
private void verifyStartAndClose() throws Exception {
@@ -310,4 +352,4 @@ private void verifyRecords(String streamName, String namespace, Collection
Date: Wed, 26 May 2021 09:58:12 -0700
Subject: [PATCH 4/8] what's in a name?
---
.../buffered_stream_consumer/BufferedStreamConsumerTest.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumerTest.java b/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumerTest.java
index c0b583c64d75..e6f02e67c175 100644
--- a/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumerTest.java
+++ b/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumerTest.java
@@ -248,7 +248,7 @@ void testExceptionAfterNoStateMessages() throws Exception {
}
@Test
- void testExceptionADuringOnClose() throws Exception {
+ void testExceptionDuringOnClose() throws Exception {
doThrow(new IllegalStateException("induced exception")).when(onClose).accept(false);
final List expectedRecordsBatch1 = getNRecords(10);
From 94208dc90a8dd3c14a33504762abe17af5388811 Mon Sep 17 00:00:00 2001
From: cgardens
Date: Fri, 16 Jul 2021 11:44:59 -0700
Subject: [PATCH 5/8] fix exception handling in BufferedStreamConsumer
---
.../integrations/base/IntegrationRunner.java | 3 +--
.../BufferedStreamConsumer.java | 16 +++++++++++-----
2 files changed, 12 insertions(+), 7 deletions(-)
diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java
index ed2f242f13ac..5ebdd25d2069 100644
--- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java
+++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java
@@ -166,8 +166,7 @@ static void consumeWriteStream(AirbyteMessageConsumer consumer) throws Exception
if (singerMessageOptional.isPresent()) {
consumer.accept(singerMessageOptional.get());
} else {
- // todo (cgardens) - decide if we want to throw here instead.
- LOGGER.error(inputString);
+ LOGGER.error("Received invalid message: " + inputString);
}
}
}
diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumer.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumer.java
index 26b690f8ac24..8502d0eab45c 100644
--- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumer.java
+++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumer.java
@@ -203,17 +203,23 @@ protected void close(boolean hasFailed) throws Exception {
}
try {
- // if any state message flushed that means we can still go for at least a partial success. if none
- // was emitted, if there were still no failures, then we can still succeed. the latter case is full
- // refresh.
- onClose.accept(lastFlushedState == null && hasFailed);
+ // if no state was was emitted (i.e. full refresh), if there were still no failures, then we can
+ // still succeed.
+ if (lastFlushedState == null) {
+ onClose.accept(hasFailed);
+ } else {
+ // if any state message flushed that means we can still go for at least a partial success.
+ onClose.accept(false);
+ }
+
// if one close succeeds without exception then we can emit the state record because it means its
// records were not only flushed, but committed.
if (lastFlushedState != null) {
outputRecordCollector.accept(lastFlushedState);
}
} catch (Exception e) {
- LOGGER.error("on close failed.", e);
+ LOGGER.error("Close failed.", e);
+ throw e;
}
}
From e8b9fd2e6921bf3bdba85924cdca5b069589b4f1 Mon Sep 17 00:00:00 2001
From: cgardens
Date: Fri, 16 Jul 2021 11:50:53 -0700
Subject: [PATCH 6/8] a nod to copy destinations
---
.../BufferedStreamConsumer.java | 11 +++++++++++
1 file changed, 11 insertions(+)
diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumer.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumer.java
index 8502d0eab45c..0e9dfe14ddb0 100644
--- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumer.java
+++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumer.java
@@ -77,6 +77,17 @@
* partial success) or 0 state messages (in the case where the onClose step was never reached or did
* not complete without exception).
*
+ *
+ *
+ * When a record is "flushed" it is moved from the docker container to the destination. By
+ * convention, it is usually placed in some sort of temporary storage on the destination (e.g. a
+ * temporary database or file store). The logic in close handles committing the temporary
+ * representation data to the final store (e.g. final table). In the case of Copy destinations they
+ * often have additional temporary stores. The common pattern for copy destination is that flush
+ * pushes the data into cloud storage and then close copies from cloud storage to a temporary table
+ * AND then copies from the temporary table into the final table. This abstraction is blind to that
+ * detail as it implementation detail of how copy destinations implement close.
+ *
*/
public class BufferedStreamConsumer extends FailureTrackingAirbyteMessageConsumer implements AirbyteMessageConsumer {
From c6fe7aebf4a78823326051f86dbfabf201569aee Mon Sep 17 00:00:00 2001
From: cgardens
Date: Fri, 16 Jul 2021 12:09:29 -0700
Subject: [PATCH 7/8] fix exception test, copy changes
---
.../BufferedStreamConsumer.java | 16 ++++++++--------
.../BufferedStreamConsumerTest.java | 2 +-
2 files changed, 9 insertions(+), 9 deletions(-)
diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumer.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumer.java
index 0e9dfe14ddb0..525c48ae141d 100644
--- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumer.java
+++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumer.java
@@ -68,14 +68,14 @@
*
*
*
- * Throughout the lifecycle of the consumer, records get promoted from buffered to flushed to
- * committed. A record when it is received is immediately buffered. When the buffer fills up, all
- * buffered records are flushed out of memory using the user-provided recordWriter. When this flush
- * happens, a state message is moved from pending to flushed. On close, if the user-provided onClose
- * function is successful, then the flushed state record is considered committed and is then
- * emitted. We expect this class to only ever emit either 1 state message (in the case of a full or
- * partial success) or 0 state messages (in the case where the onClose step was never reached or did
- * not complete without exception).
+ * Throughout the lifecycle of the consumer, messages get promoted from buffered to flushed to
+ * committed. A record message when it is received is immediately buffered. When the buffer fills
+ * up, all buffered records are flushed out of memory using the user-provided recordWriter. When
+ * this flush happens, a state message is moved from pending to flushed. On close, if the
+ * user-provided onClose function is successful, then the flushed state record is considered
+ * committed and is then emitted. We expect this class to only ever emit either 1 state message (in
+ * the case of a full or partial success) or 0 state messages (in the case where the onClose step
+ * was never reached or did not complete without exception).
*
*
*
diff --git a/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumerTest.java b/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumerTest.java
index e6f02e67c175..9b13066153ec 100644
--- a/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumerTest.java
+++ b/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumerTest.java
@@ -258,7 +258,7 @@ void testExceptionDuringOnClose() throws Exception {
consumeRecords(consumer, expectedRecordsBatch1);
consumer.accept(STATE_MESSAGE1);
consumeRecords(consumer, expectedRecordsBatch2);
- consumer.close();
+ assertThrows(IllegalStateException.class, () -> consumer.close());
verifyStartAndClose();
From b5a3f6351da42fb68e5d6003407ee786a7184286 Mon Sep 17 00:00:00 2001
From: cgardens
Date: Wed, 21 Jul 2021 15:00:02 -0700
Subject: [PATCH 8/8] bump destination versions
---
.../seed/destination_definitions.yaml | 22 +++++++++----------
.../Dockerfile | 2 +-
.../destination-bigquery/Dockerfile | 2 +-
.../connectors/destination-csv/Dockerfile | 2 +-
.../destination-local-json/Dockerfile | 2 +-
.../destination-meilisearch/Dockerfile | 2 +-
.../connectors/destination-mssql/Dockerfile | 2 +-
.../connectors/destination-mysql/Dockerfile | 2 +-
.../connectors/destination-oracle/Dockerfile | 2 +-
.../destination-postgres/Dockerfile | 2 +-
.../destination-redshift/Dockerfile | 2 +-
.../destination-snowflake/Dockerfile | 2 +-
12 files changed, 22 insertions(+), 22 deletions(-)
diff --git a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml
index 0567df71b651..6e0f45f3a816 100644
--- a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml
+++ b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml
@@ -1,28 +1,28 @@
- destinationDefinitionId: a625d593-bba5-4a1c-a53d-2d246268a816
name: Local JSON
dockerRepository: airbyte/destination-local-json
- dockerImageTag: 0.2.7
+ dockerImageTag: 0.2.8
documentationUrl: https://docs.airbyte.io/integrations/destinations/local-json
- destinationDefinitionId: 8be1cf83-fde1-477f-a4ad-318d23c9f3c6
name: Local CSV
dockerRepository: airbyte/destination-csv
- dockerImageTag: 0.2.7
+ dockerImageTag: 0.2.8
documentationUrl: https://docs.airbyte.io/integrations/destinations/local-csv
- destinationDefinitionId: 25c5221d-dce2-4163-ade9-739ef790f503
name: Postgres
dockerRepository: airbyte/destination-postgres
- dockerImageTag: 0.3.7
+ dockerImageTag: 0.3.8
documentationUrl: https://docs.airbyte.io/integrations/destinations/postgres
icon: postgresql.svg
- destinationDefinitionId: 22f6c74f-5699-40ff-833c-4a879ea40133
name: BigQuery
dockerRepository: airbyte/destination-bigquery
- dockerImageTag: 0.3.7
+ dockerImageTag: 0.3.8
documentationUrl: https://docs.airbyte.io/integrations/destinations/bigquery
- destinationDefinitionId: 079d5540-f236-4294-ba7c-ade8fd918496
name: BigQuery (denormalized typed struct)
dockerRepository: airbyte/destination-bigquery-denormalized
- dockerImageTag: 0.1.0
+ dockerImageTag: 0.1.1
documentationUrl: https://docs.airbyte.io/integrations/destinations/bigquery
- destinationDefinitionId: ca8f6566-e555-4b40-943a-545bf123117a
name: Google Cloud Storage (GCS)
@@ -37,7 +37,7 @@
- destinationDefinitionId: 424892c4-daac-4491-b35d-c6688ba547ba
name: Snowflake
dockerRepository: airbyte/destination-snowflake
- dockerImageTag: 0.3.10
+ dockerImageTag: 0.3.11
documentationUrl: https://docs.airbyte.io/integrations/destinations/snowflake
- destinationDefinitionId: 4816b78f-1489-44c1-9060-4b19d5fa9362
name: S3
@@ -47,26 +47,26 @@
- destinationDefinitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc
name: Redshift
dockerRepository: airbyte/destination-redshift
- dockerImageTag: 0.3.11
+ dockerImageTag: 0.3.12
documentationUrl: https://docs.airbyte.io/integrations/destinations/redshift
icon: redshift.svg
- destinationDefinitionId: af7c921e-5892-4ff2-b6c1-4a5ab258fb7e
name: MeiliSearch
dockerRepository: airbyte/destination-meilisearch
- dockerImageTag: 0.2.7
+ dockerImageTag: 0.2.8
documentationUrl: https://docs.airbyte.io/integrations/destinations/meilisearch
- destinationDefinitionId: ca81ee7c-3163-4246-af40-094cc31e5e42
name: MySQL
dockerRepository: airbyte/destination-mysql
- dockerImageTag: 0.1.8
+ dockerImageTag: 0.1.9
documentationUrl: https://docs.airbyte.io/integrations/destinations/mysql
- destinationDefinitionId: d4353156-9217-4cad-8dd7-c108fd4f74cf
name: MS SQL Server
dockerRepository: airbyte/destination-mssql
- dockerImageTag: 0.1.5
+ dockerImageTag: 0.1.6
documentationUrl: https://docs.airbyte.io/integrations/destinations/mssql
- destinationDefinitionId: 3986776d-2319-4de9-8af8-db14c0996e72
name: Oracle (Alpha)
dockerRepository: airbyte/destination-oracle
- dockerImageTag: 0.1.2
+ dockerImageTag: 0.1.3
documentationUrl: https://docs.airbyte.io/integrations/destinations/oracle
diff --git a/airbyte-integrations/connectors/destination-bigquery-denormalized/Dockerfile b/airbyte-integrations/connectors/destination-bigquery-denormalized/Dockerfile
index 8c59152b4805..be87de7e5042 100644
--- a/airbyte-integrations/connectors/destination-bigquery-denormalized/Dockerfile
+++ b/airbyte-integrations/connectors/destination-bigquery-denormalized/Dockerfile
@@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar
RUN tar xf ${APPLICATION}.tar --strip-components=1
-LABEL io.airbyte.version=0.1.0
+LABEL io.airbyte.version=0.1.1
LABEL io.airbyte.name=airbyte/destination-bigquery-denormalized
diff --git a/airbyte-integrations/connectors/destination-bigquery/Dockerfile b/airbyte-integrations/connectors/destination-bigquery/Dockerfile
index 8238127d56fb..59bf25145cdd 100644
--- a/airbyte-integrations/connectors/destination-bigquery/Dockerfile
+++ b/airbyte-integrations/connectors/destination-bigquery/Dockerfile
@@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar
RUN tar xf ${APPLICATION}.tar --strip-components=1
-LABEL io.airbyte.version=0.3.7
+LABEL io.airbyte.version=0.3.8
LABEL io.airbyte.name=airbyte/destination-bigquery
diff --git a/airbyte-integrations/connectors/destination-csv/Dockerfile b/airbyte-integrations/connectors/destination-csv/Dockerfile
index bbc24451788e..fdaacb200ddb 100644
--- a/airbyte-integrations/connectors/destination-csv/Dockerfile
+++ b/airbyte-integrations/connectors/destination-csv/Dockerfile
@@ -7,5 +7,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar
RUN tar xf ${APPLICATION}.tar --strip-components=1
-LABEL io.airbyte.version=0.2.7
+LABEL io.airbyte.version=0.2.8
LABEL io.airbyte.name=airbyte/destination-csv
diff --git a/airbyte-integrations/connectors/destination-local-json/Dockerfile b/airbyte-integrations/connectors/destination-local-json/Dockerfile
index 47e0b28c60c5..f355b2ef7b5c 100644
--- a/airbyte-integrations/connectors/destination-local-json/Dockerfile
+++ b/airbyte-integrations/connectors/destination-local-json/Dockerfile
@@ -7,5 +7,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar
RUN tar xf ${APPLICATION}.tar --strip-components=1
-LABEL io.airbyte.version=0.2.7
+LABEL io.airbyte.version=0.2.8
LABEL io.airbyte.name=airbyte/destination-local-json
diff --git a/airbyte-integrations/connectors/destination-meilisearch/Dockerfile b/airbyte-integrations/connectors/destination-meilisearch/Dockerfile
index 466d243cc5e7..fd29451c32c5 100644
--- a/airbyte-integrations/connectors/destination-meilisearch/Dockerfile
+++ b/airbyte-integrations/connectors/destination-meilisearch/Dockerfile
@@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar
RUN tar xf ${APPLICATION}.tar --strip-components=1
-LABEL io.airbyte.version=0.2.7
+LABEL io.airbyte.version=0.2.8
LABEL io.airbyte.name=airbyte/destination-meilisearch
diff --git a/airbyte-integrations/connectors/destination-mssql/Dockerfile b/airbyte-integrations/connectors/destination-mssql/Dockerfile
index 927c4197ffed..9018245d3f82 100644
--- a/airbyte-integrations/connectors/destination-mssql/Dockerfile
+++ b/airbyte-integrations/connectors/destination-mssql/Dockerfile
@@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar
RUN tar xf ${APPLICATION}.tar --strip-components=1
-LABEL io.airbyte.version=0.1.5
+LABEL io.airbyte.version=0.1.6
LABEL io.airbyte.name=airbyte/destination-mssql
diff --git a/airbyte-integrations/connectors/destination-mysql/Dockerfile b/airbyte-integrations/connectors/destination-mysql/Dockerfile
index 5bc950ab07e3..29e6ee7c8191 100644
--- a/airbyte-integrations/connectors/destination-mysql/Dockerfile
+++ b/airbyte-integrations/connectors/destination-mysql/Dockerfile
@@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar
RUN tar xf ${APPLICATION}.tar --strip-components=1
-LABEL io.airbyte.version=0.1.8
+LABEL io.airbyte.version=0.1.9
LABEL io.airbyte.name=airbyte/destination-mysql
diff --git a/airbyte-integrations/connectors/destination-oracle/Dockerfile b/airbyte-integrations/connectors/destination-oracle/Dockerfile
index a8b34afebb62..d02e7ed230b5 100644
--- a/airbyte-integrations/connectors/destination-oracle/Dockerfile
+++ b/airbyte-integrations/connectors/destination-oracle/Dockerfile
@@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar
RUN tar xf ${APPLICATION}.tar --strip-components=1
-LABEL io.airbyte.version=0.1.2
+LABEL io.airbyte.version=0.1.3
LABEL io.airbyte.name=airbyte/destination-oracle
diff --git a/airbyte-integrations/connectors/destination-postgres/Dockerfile b/airbyte-integrations/connectors/destination-postgres/Dockerfile
index 64cccf52c761..6dcf5092a4e5 100644
--- a/airbyte-integrations/connectors/destination-postgres/Dockerfile
+++ b/airbyte-integrations/connectors/destination-postgres/Dockerfile
@@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar
RUN tar xf ${APPLICATION}.tar --strip-components=1
-LABEL io.airbyte.version=0.3.7
+LABEL io.airbyte.version=0.3.8
LABEL io.airbyte.name=airbyte/destination-postgres
diff --git a/airbyte-integrations/connectors/destination-redshift/Dockerfile b/airbyte-integrations/connectors/destination-redshift/Dockerfile
index 5fec62e11c03..8943eed911c0 100644
--- a/airbyte-integrations/connectors/destination-redshift/Dockerfile
+++ b/airbyte-integrations/connectors/destination-redshift/Dockerfile
@@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar
RUN tar xf ${APPLICATION}.tar --strip-components=1
-LABEL io.airbyte.version=0.3.11
+LABEL io.airbyte.version=0.3.12
LABEL io.airbyte.name=airbyte/destination-redshift
diff --git a/airbyte-integrations/connectors/destination-snowflake/Dockerfile b/airbyte-integrations/connectors/destination-snowflake/Dockerfile
index f2031c35e37b..270df9074f85 100644
--- a/airbyte-integrations/connectors/destination-snowflake/Dockerfile
+++ b/airbyte-integrations/connectors/destination-snowflake/Dockerfile
@@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar
RUN tar xf ${APPLICATION}.tar --strip-components=1
-LABEL io.airbyte.version=0.3.10
+LABEL io.airbyte.version=0.3.11
LABEL io.airbyte.name=airbyte/destination-snowflake