diff --git a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java index d9ebb4d59..3f7cd4b24 100644 --- a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java +++ b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java @@ -143,6 +143,9 @@ public void flush(Map offsets) { // Return immediately here since the executor will already be shutdown if (stopped) { + // Still have to check for errors in order to prevent offsets being committed for records that + // we've failed to write + executor.maybeThrowEncounteredErrors(); return; } diff --git a/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTaskTest.java b/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTaskTest.java index 21d9e18fb..f91a755fd 100644 --- a/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTaskTest.java +++ b/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTaskTest.java @@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Matchers.any; @@ -465,6 +466,40 @@ public void testEmptyFlush() { testTask.flush(Collections.emptyMap()); } + @Test + public void testFlushAfterStop() { + Map properties = propertiesFactory.getProperties(); + Storage storage = mock(Storage.class); + + BigQuery bigQuery = mock(BigQuery.class); + when(bigQuery.insertAll(any())) + .thenThrow( + new BigQueryException(400, "Oops", new BigQueryError("invalid", "global", "oops"))); + + SchemaRetriever schemaRetriever = mock(SchemaRetriever.class); + SchemaManager schemaManager = mock(SchemaManager.class); + + SinkTaskContext sinkTaskContext = mock(SinkTaskContext.class); + BigQuerySinkTask testTask = new BigQuerySinkTask(bigQuery, schemaRetriever, storage, schemaManager); + testTask.initialize(sinkTaskContext); + testTask.start(properties); + + testTask.put(Collections.singletonList(spoofSinkRecord("t"))); + assertThrows( + "first call to flush should fail", + Exception.class, + () -> testTask.flush(Collections.emptyMap())); + assertThrows( + "second call to flush should fail", + Exception.class, + () -> testTask.flush(Collections.emptyMap())); + testTask.stop(); + assertThrows( + "third call to flush (after task stop) should fail", + Exception.class, + () -> testTask.flush(Collections.emptyMap())); + } + @Test public void testBigQuery5XXRetry() { final String topic = "test_topic";