Skip to content

Commit

Permalink
closer step by step
Browse files Browse the repository at this point in the history
  • Loading branch information
cgardens committed May 26, 2021
1 parent d39b2a7 commit 49e9da6
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -203,9 +203,10 @@ protected void close(boolean hasFailed) throws Exception {
}

try {
// means at least one state message worth of records were committed, so we will try complete the
// task.
onClose.accept(lastFlushedState != null);
// if any state message flushed that means we can still go for at least a partial success. if none
// was emitted, if there were still no failures, then we can still succeed. the latter case is full
// refresh.
onClose.accept(lastFlushedState == null && hasFailed);
// if one close succeeds without exception then we can emit the state record because it means its
// records were not only flushed, but committed.
outputRecordCollector.accept(lastFlushedState);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,15 @@

package io.airbyte.integrations.destination.buffered_stream_consumer;

import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import io.airbyte.commons.concurrency.VoidCallable;
import io.airbyte.commons.functional.CheckedConsumer;
import io.airbyte.commons.functional.CheckedFunction;
Expand Down Expand Up @@ -193,6 +195,34 @@ void test1StreamWithStateAndThenMoreRecordsSmallerThanBuffer() throws Exception

verifyStartAndClose();

final List<AirbyteMessage> expectedRecords = Lists.newArrayList(expectedRecordsBatch1, expectedRecordsBatch2)
.stream()
.flatMap(Collection::stream)
.collect(Collectors.toList());
verifyRecords(STREAM_NAME, SCHEMA_NAME, expectedRecords);

verify(checkpointConsumer).accept(STATE_MESSAGE1);
}

@Test
void testExceptionAfterOneStateMessage() throws Exception {
final List<AirbyteMessage> expectedRecordsBatch1 = getNRecords(10);
final List<AirbyteMessage> expectedRecordsBatch2 = getNRecords(10, 20);
final List<AirbyteMessage> expectedRecordsBatch3 = getNRecords(20, 21);

consumer.start();
consumeRecords(consumer, expectedRecordsBatch1);
consumer.accept(STATE_MESSAGE1);
consumeRecords(consumer, expectedRecordsBatch2);
when(isValidRecord.apply(any())).thenThrow(new IllegalStateException("induced exception"));
assertThrows(IllegalStateException.class, () -> consumer.accept(expectedRecordsBatch3.get(0)));
consumer.close();

verifyStartAndClose();

verifyRecords(STREAM_NAME, SCHEMA_NAME, expectedRecordsBatch1);

verify(checkpointConsumer).accept(STATE_MESSAGE1);
}

@Test
Expand Down Expand Up @@ -280,4 +310,4 @@ private void verifyRecords(String streamName, String namespace, Collection<Airby
expectedRecords.stream().map(AirbyteMessage::getRecord).collect(Collectors.toList()));
}

}
}

0 comments on commit 49e9da6

Please sign in to comment.