KAFKA-12268: Implement task idling semantics via currentLag API #10137
Conversation
This reverts commit fdcf8fb.
Implements KIP-695. Reverts a previous behavior change to Consumer.poll and replaces it with a new Consumer.currentLag API, which returns the client's currently cached lag. Uses this new API to implement the desired task idling semantics improvement from KIP-695.
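For context, here is a minimal usage sketch of the new API as described above. The bootstrap server, group id, topic, and class name are placeholders, not part of the PR.

import java.time.Duration;
import java.util.Collections;
import java.util.OptionalLong;
import java.util.Properties;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class CurrentLagSketch {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "current-lag-sketch");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        try (Consumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            final TopicPartition tp = new TopicPartition("input-topic", 0);
            consumer.assign(Collections.singleton(tp));
            consumer.poll(Duration.ofMillis(100)); // populates the cached position and end offset

            // Purely local lookup of the cached lag; no broker round trip.
            final OptionalLong lag = consumer.currentLag(tp);
            if (lag.isPresent() && lag.getAsLong() == 0L) {
                // caught up: the cached position equals the cached end offset
            } else if (!lag.isPresent()) {
                // lag not known yet (nothing cached for this partition so far)
            }
        }
    }
}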
/**
 * @see KafkaConsumer#currentLag(TopicPartition)
 */
OptionalLong currentLag(TopicPartition topicPartition);
Pardon me, KIP-695 does not include this change. It seems KIP-695 is still based on metadata? Please correct me if I misunderstand anything :)
Woah, you are fast, @chia7712 !
I just sent a message to the vote thread. I wanted to submit this PR first so that the vote thread message can have the full context available.
Do you mind reading over what I said there? If it sounds good to you, then I'll update the KIP, and we can maybe put this whole mess to bed.
Quick question: should the API take a Collection<TopicPartition> like other APIs?
Thanks @ijuma, I considered it, but decided on the current API because:
- This is a very quick, local in-memory lookup, so there's no reason to batch multiple requests into one.
- It complicates the return type. We'd have to return either a Map<TP, Long>, with mappings missing for unknown lags (which creates unfortunate null semantics for users), or a Map<TP, OptionalLong>, which creates a complex-to-understand two-hop lookup (lag := result.get(tp).get()); see the sketch below. Or else, we could return a more complex domain object like @chia7712 proposed in the mailing list. All these complications seem like unnecessary complexity in the case of this particular API, given the first point.
WDYT?
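For illustration only, here is roughly what the two shapes would look like to a caller. The batched method name currentLags is hypothetical; only the single-partition currentLag exists in this PR.

// Single-partition API from this PR: one call, no null handling.
final OptionalLong lag = consumer.currentLag(tp);
final long known = lag.orElse(-1L);

// Hypothetical batched variant returning Map<TopicPartition, OptionalLong>:
// the caller needs the "two hop" lookup mentioned above.
final Map<TopicPartition, OptionalLong> lags = consumer.currentLags(partitions); // hypothetical method
final OptionalLong forTp = lags.get(tp);                       // hop 1: may be null if tp was not in the request
final long value = forTp == null ? -1L : forTp.orElse(-1L);    // hop 2: unwrap the OptionalLong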
If there is no batching benefit, then the simpler API makes sense. @hachikuji Any reason why batching could be useful here?
For API calls that may incur a broker round trip, having batching of partitions makes sense. For this API I think single-partition lookup is good enough.
If we are concerned that users may call this function too frequently while looping over a large number of partitions, and each call synchronizes on the subscription state, then maybe we can make it a batching mode.
That's a good point, @guozhangwang. Entering the synchronized block will have some overhead each time it's called.
I think we can just reason about the use cases here. My guess is that people would either tend to spot-check specific lags, as we are doing here, or they would tend to periodically check all lags. In the former case, I'd hazard that the current API is fine. In the latter case, we'd face more overhead. I'm sure this is motivated reasoning, but perhaps we can lump the latter case in with @chia7712 's suggestion to expose more metadata and defer it to the future.
 */
@Override
public OptionalLong currentLag(TopicPartition topicPartition) {
    final Long lag = subscriptions.partitionLag(topicPartition, isolationLevel);
Other methods call acquireAndEnsureOpen() first and then call release() in the finally block. Should this new method follow the same pattern?
Thanks, I overlooked that, and it's a good idea.
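A sketch of the method following that pattern, based on the snippet above; the null-to-empty mapping is an assumption from the OptionalLong return type, not necessarily the final code.

@Override
public OptionalLong currentLag(final TopicPartition topicPartition) {
    acquireAndEnsureOpen();    // guard against concurrent use and a closed consumer, like the other methods
    try {
        final Long lag = subscriptions.partitionLag(topicPartition, isolationLevel);
        return lag == null ? OptionalLong.empty() : OptionalLong.of(lag);
    } finally {
        release();             // always release in the finally block
    }
}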
Made a pass over the PR, overall LGTM! Just one comment about the lag provider call frequency.
@@ -156,24 +134,24 @@ public boolean readyToProcess(final long wallClockTime) {
     final TopicPartition partition = entry.getKey();
     final RecordQueue queue = entry.getValue();

-    final Long nullableFetchedLag = fetchedLags.get(partition);
+    final OptionalLong fetchedLag = lagProvider.apply(partition);
Wearing my paranoid hat here: readyToProcess is on the critical path, called per record, while we would only update the underlying lag at most as frequently as the consumer poll rate. And in practice we would fall into the first condition, !queue.isEmpty(), most of the time. On the other hand, the partitionLag call on SubscriptionState is synchronized and could slow down the fetching thread (well, maybe just a bit). So could we call the provider only when necessary, i.e. when the queue is empty and the lag is either == 0 or not present?
Thanks, this is a good suggestion.
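A rough sketch of that guard, reusing the names from the diff above; the surrounding loop and the exact caught-up condition are assumptions, not the final code.

// Only consult the (synchronized) lag provider when the queue is empty,
// so the common buffered-data case skips the lookup entirely.
if (!queue.isEmpty()) {
    continue; // data is buffered locally; this partition does not block processing
}
final OptionalLong fetchedLag = lagProvider.apply(partition);
if (!fetchedLag.isPresent()) {
    return false; // no cached lag yet, so we cannot tell whether the partition is caught up
}
if (fetchedLag.getAsLong() > 0L) {
    return false; // the broker still has records for this partition; wait instead of enforcing
}
// lag == 0 and the queue is empty: the partition is caught up and does not block processing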
Just one nit on log4j entry, otherwise LGTM!
@@ -211,7 +192,7 @@ public boolean readyToProcess(final long wallClockTime) {
         return false;
     } else {
         enforcedProcessingSensor.record(1.0d, wallClockTime);
-        logger.info("Continuing to process although some partition timestamps were not buffered locally." +
+        logger.trace("Continuing to process although some partitions are empty on the broker." +
I thought that if the user did not set this to MAX_TASK_IDLE_MS_DISABLED, they may care about when enforced processing starts, so would this be better kept at INFO?
Thanks @guozhangwang, I thought so as well, but @ableegoldman pointed out to me that it actually prints out on every invocation while we are enforcing processing, which turns out to flood the logs.
Maybe we could leave this detailed logging at TRACE, and just print a single message at WARN the first time this enforced processing occurs?
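One possible shape for that, assuming a new boolean field on the task; the field name, method name, and messages are placeholders.

private boolean enforcedProcessingLogged = false;

private void logEnforcedProcessing() {
    if (!enforcedProcessingLogged) {
        enforcedProcessingLogged = true;
        // loud, one-time notice when enforced processing starts
        logger.warn("Begin enforcing processing although some partitions are empty on the broker.");
    } else {
        // per-invocation detail stays quiet
        logger.trace("Continuing to process although some partitions are empty on the broker.");
    }
}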
LGTM overall
idlePartitionDeadlines.putIfAbsent(partition, wallClockTime + maxTaskIdleMs);
final long deadline = idlePartitionDeadlines.get(partition);
if (wallClockTime < deadline) {
    final OptionalLong fetchedLag = lagProvider.apply(partition);
It was nullable before (when the partition has not been fetched). For this new API, getting the lag for such a partition can produce an exception. Is this a potential bug?
Thanks, @chia7712. Yes, it can produce an exception now, but only if the partition is not assigned. If it simply has not been fetched, then the OptionalLong will be empty.
I think this exception is fine to raise, since the partition being unassigned at this point seems like an illegal state.
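To make the two cases concrete, a sketch of the expected behavior; the variable names are placeholders and the exception type mirrors how SubscriptionState treats unassigned partitions, which is worth double-checking.

// Case 1: assigned but nothing fetched yet -> empty result, no exception.
final OptionalLong lag = consumer.currentLag(assignedButUnfetchedPartition);
assert !lag.isPresent();

// Case 2: not assigned at all -> expected to fail fast,
// e.g. IllegalStateException("No current assignment for partition ..."),
// which the comment above treats as an illegal state rather than a bug.
try {
    consumer.currentLag(unassignedPartition);
} catch (final IllegalStateException expected) {
    // an unassigned partition at this point indicates a programming error in the caller
}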
One unrelated test failure:
* apache-github/trunk: (37 commits)
  KAFKA-10357: Extract setup of changelog from Streams partition assignor (apache#10163)
  KAFKA-10251: increase timeout for consuming records (apache#10228)
  KAFKA-12394; Return `TOPIC_AUTHORIZATION_FAILED` in delete topic response if no describe permission (apache#10223)
  MINOR: Disable transactional/idempotent system tests for Raft quorums (apache#10224)
  KAFKA-10766: Unit test cases for RocksDBRangeIterator (apache#9717)
  KAFKA-12289: Adding test cases for prefix scan in InMemoryKeyValueStore (apache#10052)
  KAFKA-12268: Implement task idling semantics via currentLag API (apache#10137)
  MINOR: Time and log producer state recovery phases (apache#10241)
  MINOR: correct the error message of validating uint32 (apache#10193)
  MINOR: Format the revoking active log output in `StreamsPartitionAssignor` (apache#10242)
  KAFKA-12323 Follow-up: Refactor the unit test a bit (apache#10205)
  MINOR: Remove stack trace of the lock exception in a debug log4j (apache#10231)
  MINOR: Word count should account for extra whitespaces between words (apache#10229)
  MINOR; Small refactor in `GroupMetadata` (apache#10236)
  KAFKA-10340: Proactively close producer when cancelling source tasks (apache#10016)
  KAFKA-12329; kafka-reassign-partitions command should give a better error message when a topic does not exist (apache#10141)
  KAFKA-12254: Ensure MM2 creates topics with source topic configs (apache#10217)
  MINOR: fix kafka-metadata-shell.sh (apache#10226)
  KAFKA-12374: Add missing config sasl.mechanism.controller.protocol (apache#10199)
  KAFKA-10101: Fix edge cases in Log.recoverLog and LogManager.loadLogs (apache#8812)
  ...
KAFKA-12268: Implement task idling semantics via currentLag API (apache#10137)

Implements KIP-695. Reverts a previous behavior change to Consumer.poll and replaces it with a new Consumer.currentLag API, which returns the client's currently cached lag. Uses this new API to implement the desired task idling semantics improvement from KIP-695. Reverts fdcf8fb / KAFKA-10866: Add metadata to ConsumerRecords (apache#9836).

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Guozhang Wang <guozhang@apache.org>
Replaces the poll-time behavior change with a new Consumer#currentLag() API, which returns the client's currently cached lag. The reverted PR is KAFKA-10866: Add metadata to ConsumerRecords (#9836), and the reverted Jira is KAFKA-10866. This work also relates to KAFKA-10867, but this PR needed to revert KAFKA-10866 and adapt KAFKA-10867.