Skip to content

Commit

Permalink
KAFKA-10188: Prevent SinkTask::preCommit from being called after Sink…
Browse files Browse the repository at this point in the history
…Task::stop (apache#8910)
  • Loading branch information
C0urante authored and Rafa García committed Oct 20, 2020
1 parent 3ddb215 commit f9e1cd0
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ class WorkerSinkTask extends WorkerTask {
private int commitFailures;
private boolean pausedForRedelivery;
private boolean committing;
private boolean taskStopped;
private final WorkerErrantRecordReporter workerErrantRecordReporter;

public WorkerSinkTask(ConnectorTaskId id,
Expand Down Expand Up @@ -138,6 +139,7 @@ public WorkerSinkTask(ConnectorTaskId id,
this.sinkTaskMetricsGroup.recordOffsetSequenceNumber(commitSeqno);
this.consumer = consumer;
this.isTopicTrackingEnabled = workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG);
this.taskStopped = false;
this.workerErrantRecordReporter = workerErrantRecordReporter;
}

Expand Down Expand Up @@ -168,6 +170,7 @@ protected void close() {
} catch (Throwable t) {
log.warn("Could not stop task", t);
}
taskStopped = true;
Utils.closeQuietly(consumer, "consumer");
Utils.closeQuietly(transformationChain, "transformation chain");
Utils.closeQuietly(retryWithToleranceOperator, "retry operator");
Expand Down Expand Up @@ -712,6 +715,10 @@ else if (!context.pausedPartitions().isEmpty())

@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
if (taskStopped) {
log.trace("Skipping partition revocation callback as task has already been stopped");
return;
}
log.debug("{} Partitions revoked", WorkerSinkTask.this);
try {
closePartitions();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,56 @@ public void testPause() throws Exception {
PowerMock.verifyAll();
}

@Test
public void testShutdown() throws Exception {
createTask(initialState);

expectInitializeTask();
expectTaskGetTopic(true);

// first iteration
expectPollInitialAssignment();

// second iteration
EasyMock.expect(sinkTask.preCommit(EasyMock.anyObject())).andReturn(Collections.emptyMap());
expectConsumerPoll(1);
expectConversionAndTransformation(1);
sinkTask.put(EasyMock.<Collection<SinkRecord>>anyObject());
EasyMock.expectLastCall();

// WorkerSinkTask::stop
consumer.wakeup();
PowerMock.expectLastCall();
sinkTask.stop();
PowerMock.expectLastCall();

// WorkerSinkTask::close
consumer.close();
PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() {
@Override
public Object answer() throws Throwable {
rebalanceListener.getValue().onPartitionsRevoked(
asList(TOPIC_PARTITION, TOPIC_PARTITION2)
);
return null;
}
});
transformationChain.close();
PowerMock.expectLastCall();

PowerMock.replayAll();

workerTask.initialize(TASK_CONFIG);
workerTask.initializeAndStart();
workerTask.iteration();
sinkTaskContext.getValue().requestCommit(); // Force an offset commit
workerTask.iteration();
workerTask.stop();
workerTask.close();

PowerMock.verifyAll();
}

@Test
public void testPollRedelivery() throws Exception {
createTask(initialState);
Expand Down

0 comments on commit f9e1cd0

Please sign in to comment.