diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumer.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumer.java index 6f495d1703af..69c2070ac35e 100644 --- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumer.java +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumer.java @@ -203,9 +203,10 @@ protected void close(boolean hasFailed) throws Exception { } try { - // means at least one state message worth of records were committed, so we will try complete the - // task. - onClose.accept(lastFlushedState != null); + // if any state message flushed that means we can still go for at least a partial success. if none + // was emitted, if there were still no failures, then we can still succeed. the latter case is full + // refresh. + onClose.accept(lastFlushedState == null && hasFailed); // if one close succeeds without exception then we can emit the state record because it means its // records were not only flushed, but committed. outputRecordCollector.accept(lastFlushedState); diff --git a/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumerTest.java b/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumerTest.java index 24397cf5f284..853bfa262e29 100644 --- a/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumerTest.java +++ b/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumerTest.java @@ -24,6 +24,7 @@ package io.airbyte.integrations.destination.buffered_stream_consumer; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -31,6 +32,7 @@ import static org.mockito.Mockito.when; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; import io.airbyte.commons.concurrency.VoidCallable; import io.airbyte.commons.functional.CheckedConsumer; import io.airbyte.commons.functional.CheckedFunction; @@ -193,6 +195,34 @@ void test1StreamWithStateAndThenMoreRecordsSmallerThanBuffer() throws Exception verifyStartAndClose(); + final List expectedRecords = Lists.newArrayList(expectedRecordsBatch1, expectedRecordsBatch2) + .stream() + .flatMap(Collection::stream) + .collect(Collectors.toList()); + verifyRecords(STREAM_NAME, SCHEMA_NAME, expectedRecords); + + verify(checkpointConsumer).accept(STATE_MESSAGE1); + } + + @Test + void testExceptionAfterOneStateMessage() throws Exception { + final List expectedRecordsBatch1 = getNRecords(10); + final List expectedRecordsBatch2 = getNRecords(10, 20); + final List expectedRecordsBatch3 = getNRecords(20, 21); + + consumer.start(); + consumeRecords(consumer, expectedRecordsBatch1); + consumer.accept(STATE_MESSAGE1); + consumeRecords(consumer, expectedRecordsBatch2); + when(isValidRecord.apply(any())).thenThrow(new IllegalStateException("induced exception")); + assertThrows(IllegalStateException.class, () -> consumer.accept(expectedRecordsBatch3.get(0))); + consumer.close(); + + verifyStartAndClose(); + + verifyRecords(STREAM_NAME, SCHEMA_NAME, expectedRecordsBatch1); + + verify(checkpointConsumer).accept(STATE_MESSAGE1); } @Test @@ -280,4 +310,4 @@ private void verifyRecords(String streamName, String namespace, Collection