KAFKA-10188: Prevent SinkTask::preCommit from being called after SinkTask::stop #8910

Merged 1 commit on Oct 6, 2020

C0urante
Contributor

Jira

The general lifecycle for a sink task is:

  1. Instantiate the SinkTask object
  2. Invoke SinkTask::initialize
  3. Invoke SinkTask::start
  4. While the task is still running:
     • Poll Kafka for records
     • Give those records to the task via SinkTask::put
     • Periodically commit offsets, which involves calling SinkTask::preCommit and committing the resulting map of TopicPartition to offset to Kafka
  5. Commit offsets a penultimate time (including the call to SinkTask::preCommit)
  6. Invoke SinkTask::stop
  7. Close the consumer for the task
  8. Commit offsets a final time (also including the call to SinkTask::preCommit)

This final offset commit happens indirectly: closing the consumer for a sink task causes the rebalance listener for that consumer to be triggered, and the rebalance listener the framework uses for its consumers performs an offset commit for the task when partitions are revoked.
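The ordering described above can be sketched without any Kafka dependency. This is a minimal illustration only: `FakeConsumer`, `RevocationListener`, and `RecordingTask` are hypothetical stand-ins, not the real Connect or consumer classes, and the only behavior modeled is that closing the consumer fires the revocation callback.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins; none of these types are the real Kafka Connect classes.
public class ShutdownOrderSketch {
    interface RevocationListener {
        void onPartitionsRevoked();
    }

    // Mimics a consumer whose close() triggers the revocation callback.
    static class FakeConsumer {
        private final RevocationListener listener;

        FakeConsumer(RevocationListener listener) {
            this.listener = listener;
        }

        void close() {
            listener.onPartitionsRevoked();
        }
    }

    // Records the order in which the framework calls into the task.
    static class RecordingTask {
        final List<String> calls = new ArrayList<>();

        void preCommit() {
            calls.add("preCommit");
        }

        void stop() {
            calls.add("stop");
        }
    }

    static List<String> shutDown() {
        RecordingTask task = new RecordingTask();
        // Pre-fix behavior: the listener always commits offsets on revocation.
        FakeConsumer consumer = new FakeConsumer(task::preCommit);
        task.preCommit();  // penultimate offset commit
        task.stop();       // task releases its resources
        consumer.close();  // closing the consumer triggers the final commit
        return task.calls;
    }

    public static void main(String[] args) {
        System.out.println(shutDown()); // prints [preCommit, stop, preCommit]
    }
}
```

The last entry in the recorded sequence is the call that this PR removes.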

This is a bit of a problem because the framework calls SinkTask::stop before closing the consumer for the task. It's possible and even likely that tasks will have de-allocated resources necessary for their preCommit method and will fail unexpectedly at this point.
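To make the failure mode concrete, here is a sketch of a task that releases state in `stop` and then cannot serve a later `preCommit`. `BufferingTask` and its string-keyed offset map are assumptions made for illustration; a real task would extend `SinkTask` and work with `TopicPartition` and `OffsetAndMetadata`.

```java
import java.util.HashMap;
import java.util.Map;

public class StoppedTaskSketch {
    // Hypothetical sink task; maps a partition name to the next offset to commit.
    static class BufferingTask {
        private Map<String, Long> flushedOffsets = new HashMap<>();

        void put(String partition, long offset) {
            flushedOffsets.put(partition, offset + 1);
        }

        Map<String, Long> preCommit() {
            if (flushedOffsets == null) {
                throw new IllegalStateException("preCommit called after stop");
            }
            return new HashMap<>(flushedOffsets);
        }

        void stop() {
            flushedOffsets = null; // de-allocate state that preCommit depends on
        }
    }

    static boolean failsAfterStop() {
        BufferingTask task = new BufferingTask();
        task.put("topic-0", 41L);
        task.preCommit(); // fine: the task is still running
        task.stop();
        try {
            task.preCommit(); // what the pre-fix framework does on consumer close
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(failsAfterStop()); // prints true
    }
}
```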

Since the framework already ensures that offsets are committed after the last call to SinkTask::put, it should be fine to remove this extra offset commit. There is still a chance that some data may be dropped in the case that a task performs completely asynchronous writes to Kafka and has written data between the pre-stop call to SinkTask::preCommit and the post-stop one, but there will be no loss of delivery guarantees provided by the framework, and this change will adhere to the publicly-stated API for sink tasks.
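One way to picture the change is a flag that is set once the task has been stopped and checked in the revocation callback. The sketch below is an assumption-laden simplification: `Worker` is a hypothetical stand-in for `WorkerSinkTask`, and the consumer close is modeled as a direct call to the callback.

```java
import java.util.ArrayList;
import java.util.List;

public class StoppedGuardSketch {
    // Hypothetical stand-in for the worker that drives a sink task.
    static class Worker {
        final List<String> calls = new ArrayList<>();
        private boolean taskStopped = false;

        void commitOffsets() {
            calls.add("preCommit");
        }

        void onPartitionsRevoked() {
            if (taskStopped) {
                return; // skip the commit: the task may have released its resources
            }
            commitOffsets();
        }

        void close() {
            calls.add("stop");
            taskStopped = true;
            onPartitionsRevoked(); // stands in for closing the consumer
        }
    }

    static List<String> run() {
        Worker worker = new Worker();
        worker.onPartitionsRevoked(); // ordinary rebalance while running: commits
        worker.commitOffsets();       // penultimate commit before shutdown
        worker.close();               // no preCommit after stop
        return worker.calls;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [preCommit, preCommit, stop]
    }
}
```

Revocations that happen while the task is running still commit offsets; only the one triggered by shutdown is skipped.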

A unit test is added that covers the internal WorkerSinkTask::close method and ensures that SinkTask::preCommit is not called during that method.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

Contributor

@gharris1727 gharris1727 left a comment

LGTM, thanks @C0urante for the fix!

Contributor

@rhauch rhauch left a comment

Nice work, @C0urante. One question below.

@@ -138,6 +139,7 @@ public WorkerSinkTask(ConnectorTaskId id,
         this.sinkTaskMetricsGroup.recordOffsetSequenceNumber(commitSeqno);
         this.consumer = consumer;
         this.isTopicTrackingEnabled = workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG);
+        this.taskStopped = false;
Contributor

@rhauch rhauch Sep 28, 2020

Shouldn't this be volatile?

Yes, it's true that WorkerSinkTask.close() is always and only called from within WorkerTask.doRun() after the task determines it will stop. However, the onPartitionsRevoked(...) method is called from the consumer thread, and making the field volatile is the only way to ensure that the consumer thread reads a non-cached value.

Contributor Author

The Javadocs for the ConsumerRebalanceListener state that the callback "will only execute in the user thread as part of the poll(long) call"; I think we have a guarantee here that onPartitionsRevoked will be called on the same thread that sets taskStopped to true. A fun way to verify this is to view the exceptions that get thrown by this bug; the stack traces include these lines:

	at org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance.onPartitionsRevoked(WorkerSinkTask.java:695)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsRevoked(ConsumerCoordinator.java:312)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onLeavePrepare(ConsumerCoordinator.java:744)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.close(AbstractCoordinator.java:976)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.close(ConsumerCoordinator.java:888)
	at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2368)
	at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2335)
	at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2285)
	at org.apache.kafka.common.utils.Utils.closeQuietly(Utils.java:933)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.close(WorkerSinkTask.java:174)
	at org.apache.kafka.connect.runtime.WorkerTask.doClose(WorkerTask.java:164)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:191)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)

The only edge case I can think of might be with asynchronous offset commits, but from what I can tell those don't trigger asynchronous rebalance listener callbacks (if they trigger rebalances or rebalance listener callbacks at all).
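The threading argument can be illustrated with a small sketch. It models only what the stack trace above shows: a callback invoked synchronously from inside `close()`. `FakeConsumer` is a hypothetical stand-in, so this illustrates the reasoning rather than proving anything about `KafkaConsumer` itself.

```java
public class CallbackThreadSketch {
    // Hypothetical consumer that invokes its callback synchronously from close().
    static class FakeConsumer {
        private final Runnable onRevoked;

        FakeConsumer(Runnable onRevoked) {
            this.onRevoked = onRevoked;
        }

        void close() {
            onRevoked.run();
        }
    }

    static boolean callbackRunsOnClosingThread() throws InterruptedException {
        Thread[] callbackThread = new Thread[1];
        FakeConsumer consumer =
                new FakeConsumer(() -> callbackThread[0] = Thread.currentThread());

        // The task thread is the one that calls close(), as in WorkerTask.doRun().
        Thread taskThread = new Thread(consumer::close, "task-thread");
        taskThread.start();
        taskThread.join(); // join() makes the callback's write visible here

        return callbackThread[0] == taskThread;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(callbackRunsOnClosingThread()); // prints true
    }
}
```

If the flag is written and read on that single thread, ordinary program order is enough and `volatile` adds nothing. That conclusion holds only as long as the callback really is synchronous.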

workerTask.stop();
workerTask.close();

PowerMock.verifyAll();
Contributor

Verified locally that this test fails when the additions to the onPartitionsRevoked(...) method above are removed. Nice work, @C0urante.

@rhauch rhauch merged commit 4c6b962 into apache:trunk Oct 6, 2020
@C0urante C0urante deleted the kafka-10188 branch October 6, 2020 18:25
rhauch pushed a commit that referenced this pull request Oct 6, 2020
rhauch pushed a commit that referenced this pull request Oct 6, 2020