Skip to content

Commit

Permalink
GH-68: Continue to check for errors in flush even after stopped (#70)
Browse files Browse the repository at this point in the history
* GH-68: Continue to check for errors in flush even after stopped

* GH-68: Add unit test
  • Loading branch information
C0urante authored Jan 19, 2021
1 parent 54c8d8e commit d724654
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,9 @@ public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) {

// Return immediately here since the executor will already be shutdown
if (stopped) {
// Still have to check for errors in order to prevent offsets being committed for records that
// we've failed to write
executor.maybeThrowEncounteredErrors();
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Matchers.any;
Expand Down Expand Up @@ -465,6 +466,40 @@ public void testEmptyFlush() {
testTask.flush(Collections.emptyMap());
}

@Test
public void testFlushAfterStop() {
Map<String, String> properties = propertiesFactory.getProperties();
Storage storage = mock(Storage.class);

BigQuery bigQuery = mock(BigQuery.class);
when(bigQuery.insertAll(any()))
.thenThrow(
new BigQueryException(400, "Oops", new BigQueryError("invalid", "global", "oops")));

SchemaRetriever schemaRetriever = mock(SchemaRetriever.class);
SchemaManager schemaManager = mock(SchemaManager.class);

SinkTaskContext sinkTaskContext = mock(SinkTaskContext.class);
BigQuerySinkTask testTask = new BigQuerySinkTask(bigQuery, schemaRetriever, storage, schemaManager);
testTask.initialize(sinkTaskContext);
testTask.start(properties);

testTask.put(Collections.singletonList(spoofSinkRecord("t")));
assertThrows(
"first call to flush should fail",
Exception.class,
() -> testTask.flush(Collections.emptyMap()));
assertThrows(
"second call to flush should fail",
Exception.class,
() -> testTask.flush(Collections.emptyMap()));
testTask.stop();
assertThrows(
"third call to flush (after task stop) should fail",
Exception.class,
() -> testTask.flush(Collections.emptyMap()));
}

@Test
public void testBigQuery5XXRetry() {
final String topic = "test_topic";
Expand Down

0 comments on commit d724654

Please sign in to comment.