From 63bc3230d4b5df9d117e87a4eddd56f80502c2fb Mon Sep 17 00:00:00 2001 From: Liren Tu Date: Sun, 4 Sep 2022 11:59:02 -0700 Subject: [PATCH] Add comments about intermediate state emission (#16262) * Add comments about intermediate state emission * Adjust wording * Format code --- .../relationaldb/StateDecoratingIterator.java | 31 ++++++++++++++----- .../StateDecoratingIteratorTest.java | 17 ++++++---- 2 files changed, 35 insertions(+), 13 deletions(-) diff --git a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/StateDecoratingIterator.java b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/StateDecoratingIterator.java index 12476b3b0306..605c3aca8eba 100644 --- a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/StateDecoratingIterator.java +++ b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/StateDecoratingIterator.java @@ -27,15 +27,30 @@ public class StateDecoratingIterator extends AbstractIterator im private final AirbyteStreamNameNamespacePair pair; private final String cursorField; private final JsonSchemaPrimitive cursorType; - private final int stateEmissionFrequency; private final String initialCursor; private String maxCursor; private boolean hasEmittedFinalState; - // The intermediateStateMessage is set to the latest state message. - // For every stateEmissionFrequency messages, emitIntermediateState is set to true and - // the latest intermediateStateMessage will be emitted. + /** + * These parameters are for intermediate state message emission. We can emit an intermediate state + * when the following two conditions are met. + *

+ * 1. The records are sorted by the cursor field. This is true when {@code stateEmissionFrequency > 0}. + * The ordering is guaranteed by {@code AbstractJdbcSource#queryTableIncremental}, in which an + * "ORDER BY" clause is appended to the SQL query if {@code stateEmissionFrequency > 0}. + *

+ * 2. There is a cursor value that is ready for emission. A cursor value is "ready" if there are no + * more records with the same value. We cannot emit a cursor at will, because there may be multiple + * records with the same cursor value. If we emit a cursor ignoring this condition, should the sync + * fail right after the emission, the next sync may skip some records with the same cursor value due + * to {@code WHERE cursor_field > cursor} in {@code AbstractJdbcSource#queryTableIncremental}. + *

+ * The {@code intermediateStateMessage} is set to the latest state message that is ready for + * emission. For every {@code stateEmissionFrequency} messages, {@code emitIntermediateState} is set + * to true and the latest "ready" state will be emitted in the next {@code computeNext} call. + */ + private final int stateEmissionFrequency; private int totalRecordCount = 0; private boolean emitIntermediateState = false; private AirbyteMessage intermediateStateMessage = null; @@ -47,9 +62,11 @@ public class StateDecoratingIterator extends AbstractIterator im * @param cursorField Path to the comparator field used to track the records read so far * @param initialCursor name of the initial cursor column * @param cursorType ENUM type of primitive values that can be used as a cursor for checkpointing - * @param stateEmissionFrequency If larger than 0, intermediate states will be emitted for every - * stateEmissionFrequency records. Only emit intermediate states if the records are sorted by - * the cursor field. + * @param stateEmissionFrequency If larger than 0, the records are sorted by the cursor field, and + * intermediate states will be emitted for every {@code stateEmissionFrequency} records. The + * order of the records is guaranteed in {@code AbstractJdbcSource#queryTableIncremental}, in + * which an "ORDER BY" clause is appended to the SQL query if {@code stateEmissionFrequency} + * > 0. 
*/ public StateDecoratingIterator(final Iterator messageIterator, final StateManager stateManager, diff --git a/airbyte-integrations/connectors/source-relational-db/src/test/java/io/airbyte/integrations/source/relationaldb/StateDecoratingIteratorTest.java b/airbyte-integrations/connectors/source-relational-db/src/test/java/io/airbyte/integrations/source/relationaldb/StateDecoratingIteratorTest.java index c67f537f76a8..7532ca2517b5 100644 --- a/airbyte-integrations/connectors/source-relational-db/src/test/java/io/airbyte/integrations/source/relationaldb/StateDecoratingIteratorTest.java +++ b/airbyte-integrations/connectors/source-relational-db/src/test/java/io/airbyte/integrations/source/relationaldb/StateDecoratingIteratorTest.java @@ -7,7 +7,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNull; -import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -75,6 +74,7 @@ private static AirbyteMessage createStateMessage(final String recordValue) { private Iterator createExceptionIterator() { return new Iterator() { + final Iterator internalMessageIterator = MoreIterators.of(RECORD_MESSAGE_1, RECORD_MESSAGE_2, RECORD_MESSAGE_2, RECORD_MESSAGE_3); @@ -88,7 +88,8 @@ public AirbyteMessage next() { if (internalMessageIterator.hasNext()) { return internalMessageIterator.next(); } else { - // this line throws a RunTimeException wrapped around a SQLException to mimic the flow of when a SQLException is thrown and wrapped in + // this line throws a RunTimeException wrapped around a SQLException to mimic the flow of when a + // SQLException is thrown and wrapped in // StreamingJdbcDatabase#tryAdvance throw new RuntimeException(new SQLException("Connection marked broken because of SQLSTATE(080006)", "08006")); } @@ -186,10 +187,12 @@ void 
testIteratorCatchesExceptionWhenEmissionFrequencyNonZero() { 1); assertEquals(RECORD_MESSAGE_1, iterator.next()); assertEquals(RECORD_MESSAGE_2, iterator.next()); - // continues to emit RECORD_MESSAGE_2 since cursorField has not changed thus not satisfying the condition of "ready" + // continues to emit RECORD_MESSAGE_2 since cursorField has not changed thus not satisfying the + // condition of "ready" assertEquals(RECORD_MESSAGE_2, iterator.next()); assertEquals(RECORD_MESSAGE_3, iterator.next()); - // emits the first state message since the iterator has changed cursorFields (2 -> 3) and met the frequency minimum of 1 record + // emits the first state message since the iterator has changed cursorFields (2 -> 3) and met the + // frequency minimum of 1 record assertEquals(STATE_MESSAGE_2, iterator.next()); // no further records to read since Exception was caught above and marked iterator as endOfData() assertFalse(iterator.hasNext()); @@ -210,8 +213,10 @@ void testIteratorCatchesExceptionWhenEmissionFrequencyZero() { assertEquals(RECORD_MESSAGE_2, iterator.next()); assertEquals(RECORD_MESSAGE_2, iterator.next()); assertEquals(RECORD_MESSAGE_3, iterator.next()); - // since stateEmission is not set to emit frequently, this will catch the error but not emit state message since it wasn't in a ready state - // of having a frequency > 0 but will prevent an exception from causing the iterator to fail by marking iterator as endOfData() + // since stateEmission is not set to emit frequently, this will catch the error but not emit state + // message since it wasn't in a ready state + // of having a frequency > 0 but will prevent an exception from causing the iterator to fail by + // marking iterator as endOfData() assertFalse(iterator.hasNext()); }