diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumer.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumer.java index 69c2070ac35e..26b690f8ac24 100644 --- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumer.java +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumer.java @@ -209,7 +209,9 @@ protected void close(boolean hasFailed) throws Exception { onClose.accept(lastFlushedState == null && hasFailed); // if one close succeeds without exception then we can emit the state record because it means its // records were not only flushed, but committed. - outputRecordCollector.accept(lastFlushedState); + if (lastFlushedState != null) { + outputRecordCollector.accept(lastFlushedState); + } } catch (Exception e) { LOGGER.error("on close failed.", e); } diff --git a/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumerTest.java b/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumerTest.java index 853bfa262e29..56aa30cde290 100644 --- a/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumerTest.java +++ b/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumerTest.java @@ -26,9 +26,11 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; import com.google.common.collect.ImmutableMap; @@ -84,7 +86,7 @@ public class BufferedStreamConsumerTest { private RecordWriter recordWriter; private CheckedConsumer onClose; private CheckedFunction isValidRecord; - private Consumer checkpointConsumer; + private Consumer outputRecordCollector; @SuppressWarnings("unchecked") @BeforeEach @@ -93,9 +95,9 @@ void setup() throws Exception { recordWriter = mock(RecordWriter.class); onClose = mock(CheckedConsumer.class); isValidRecord = mock(CheckedFunction.class); - checkpointConsumer = mock(Consumer.class); + outputRecordCollector = mock(Consumer.class); consumer = new BufferedStreamConsumer( - checkpointConsumer, + outputRecordCollector, onStart, recordWriter, onClose, @@ -119,7 +121,7 @@ void test1StreamWith1State() throws Exception { verifyRecords(STREAM_NAME, SCHEMA_NAME, expectedRecords); - verify(checkpointConsumer).accept(STATE_MESSAGE1); + verify(outputRecordCollector).accept(STATE_MESSAGE1); } @Test @@ -136,7 +138,7 @@ void test1StreamWith2State() throws Exception { verifyRecords(STREAM_NAME, SCHEMA_NAME, expectedRecords); - verify(checkpointConsumer, times(1)).accept(STATE_MESSAGE2); + verify(outputRecordCollector, times(1)).accept(STATE_MESSAGE2); } @Test @@ -152,7 +154,6 @@ void test1StreamWith0State() throws Exception { verifyRecords(STREAM_NAME, SCHEMA_NAME, expectedRecords); } - // todo (cgardens) - split testing buffer flushing into own test. @Test void test1StreamWithStateAndThenMoreRecordsBiggerThanBuffer() throws Exception { final List expectedRecordsBatch1 = getNRecords(10); @@ -169,7 +170,7 @@ void test1StreamWithStateAndThenMoreRecordsBiggerThanBuffer() throws Exception { verifyRecords(STREAM_NAME, SCHEMA_NAME, expectedRecordsBatch1); verifyRecords(STREAM_NAME, SCHEMA_NAME, expectedRecordsBatch2); - verify(checkpointConsumer).accept(STATE_MESSAGE1); + verify(outputRecordCollector).accept(STATE_MESSAGE1); } @Test @@ -179,7 +180,7 @@ void test1StreamWithStateAndThenMoreRecordsSmallerThanBuffer() throws Exception // consumer with big enough buffered that we see both batches are flushed in one go. final BufferedStreamConsumer consumer = new BufferedStreamConsumer( - checkpointConsumer, + outputRecordCollector, onStart, recordWriter, onClose, @@ -201,7 +202,7 @@ void test1StreamWithStateAndThenMoreRecordsSmallerThanBuffer() throws Exception .collect(Collectors.toList()); verifyRecords(STREAM_NAME, SCHEMA_NAME, expectedRecords); - verify(checkpointConsumer).accept(STATE_MESSAGE1); + verify(outputRecordCollector).accept(STATE_MESSAGE1); } @Test @@ -222,7 +223,48 @@ void testExceptionAfterOneStateMessage() throws Exception { verifyRecords(STREAM_NAME, SCHEMA_NAME, expectedRecordsBatch1); - verify(checkpointConsumer).accept(STATE_MESSAGE1); + verify(outputRecordCollector).accept(STATE_MESSAGE1); + } + + @Test + void testExceptionAfterNoStateMessages() throws Exception { + final List expectedRecordsBatch1 = getNRecords(10); + final List expectedRecordsBatch2 = getNRecords(10, 20); + final List expectedRecordsBatch3 = getNRecords(20, 21); + + consumer.start(); + consumeRecords(consumer, expectedRecordsBatch1); + consumeRecords(consumer, expectedRecordsBatch2); + when(isValidRecord.apply(any())).thenThrow(new IllegalStateException("induced exception")); + assertThrows(IllegalStateException.class, () -> consumer.accept(expectedRecordsBatch3.get(0))); + consumer.close(); + + verify(onStart).call(); + verify(onClose).accept(true); + + verifyRecords(STREAM_NAME, SCHEMA_NAME, expectedRecordsBatch1); + + verifyNoInteractions(outputRecordCollector); + } + + @Test + void testExceptionADuringOnClose() throws Exception { + doThrow(new IllegalStateException("induced exception")).when(onClose).accept(false); + + final List expectedRecordsBatch1 = getNRecords(10); + final List expectedRecordsBatch2 = getNRecords(10, 20); + + consumer.start(); + consumeRecords(consumer, expectedRecordsBatch1); + consumer.accept(STATE_MESSAGE1); + consumeRecords(consumer, expectedRecordsBatch2); + consumer.close(); + + verifyStartAndClose(); + + verifyRecords(STREAM_NAME, SCHEMA_NAME, expectedRecordsBatch1); + + verifyNoInteractions(outputRecordCollector); } @Test @@ -245,7 +287,7 @@ void test2StreamWith1State() throws Exception { verifyRecords(STREAM_NAME, SCHEMA_NAME, expectedRecordsStream1); verifyRecords(STREAM_NAME2, SCHEMA_NAME, expectedRecordsStream2); - verify(checkpointConsumer).accept(STATE_MESSAGE1); + verify(outputRecordCollector).accept(STATE_MESSAGE1); } @Test @@ -269,7 +311,7 @@ void test2StreamWith2State() throws Exception { verifyRecords(STREAM_NAME, SCHEMA_NAME, expectedRecordsStream1); verifyRecords(STREAM_NAME2, SCHEMA_NAME, expectedRecordsStream2); - verify(checkpointConsumer, times(1)).accept(STATE_MESSAGE2); + verify(outputRecordCollector, times(1)).accept(STATE_MESSAGE2); } private void verifyStartAndClose() throws Exception { @@ -310,4 +352,4 @@ private void verifyRecords(String streamName, String namespace, Collection