diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumer.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumer.java index 0e9dfe14ddb0..525c48ae141d 100644 --- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumer.java +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumer.java @@ -68,14 +68,14 @@ *

* *

- * Throughout the lifecycle of the consumer, records get promoted from buffered to flushed to - * committed. A record when it is received is immediately buffered. When the buffer fills up, all - * buffered records are flushed out of memory using the user-provided recordWriter. When this flush - * happens, a state message is moved from pending to flushed. On close, if the user-provided onClose - * function is successful, then the flushed state record is considered committed and is then - * emitted. We expect this class to only ever emit either 1 state message (in the case of a full or - * partial success) or 0 state messages (in the case where the onClose step was never reached or did - * not complete without exception). + * Throughout the lifecycle of the consumer, messages get promoted from buffered to flushed to + * committed. A record message when it is received is immediately buffered. When the buffer fills + * up, all buffered records are flushed out of memory using the user-provided recordWriter. When + * this flush happens, a state message is moved from pending to flushed. On close, if the + * user-provided onClose function is successful, then the flushed state record is considered + * committed and is then emitted. We expect this class to only ever emit either 1 state message (in + * the case of a full or partial success) or 0 state messages (in the case where the onClose step + * was never reached or did not complete without exception). *

* *

diff --git a/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumerTest.java b/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumerTest.java index e6f02e67c175..9b13066153ec 100644 --- a/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumerTest.java +++ b/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumerTest.java @@ -258,7 +258,7 @@ void testExceptionDuringOnClose() throws Exception { consumeRecords(consumer, expectedRecordsBatch1); consumer.accept(STATE_MESSAGE1); consumeRecords(consumer, expectedRecordsBatch2); - consumer.close(); + assertThrows(IllegalStateException.class, () -> consumer.close()); verifyStartAndClose();