Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][client] Fix for early hit beforeConsume for MultiTopicConsumer #23141

Merged

Conversation

coderzc
Copy link
Member

@coderzc coderzc commented Aug 8, 2024

Motivation

When using MultiTopicConsumer with ConsumerInterceptor , the beforeConsume method was hit many times
before receiving the first message. The root cause of this is that before the MultiTopicsConsumer receives the message, MultiTopicsConsumer has already completed receiving the message from the internal consumer and executed the beforeConsume method, so we can let beforeConsume be called in MultiTopicsConsumer to avoid this issue.

private void startReceivingMessages(List<ConsumerImpl<T>> newConsumers) {
if (log.isDebugEnabled()) {
log.debug("[{}] startReceivingMessages for {} new consumers in topics consumer, state: {}",
topic, newConsumers.size(), getState());
}
if (getState() == State.Ready) {
newConsumers.forEach(consumer -> {
consumer.increaseAvailablePermits(consumer.getConnectionHandler().cnx(),
consumer.getCurrentReceiverQueueSize());
internalPinnedExecutor.execute(() -> receiveMessageFromConsumer(consumer, true));
});
}
}
private void receiveMessageFromConsumer(ConsumerImpl<T> consumer, boolean batchReceive) {
if (duringSeek) {
log.info("[{}] Pause receiving messages for topic {} due to seek", subscription, consumer.getTopic());
return;
}
CompletableFuture<List<Message<T>>> messagesFuture;
if (batchReceive) {
messagesFuture = consumer.batchReceiveAsync().thenApply(msgs -> ((MessagesImpl<T>) msgs).getMessageList());
} else {
messagesFuture = consumer.receiveAsync().thenApply(Collections::singletonList);
}
messagesFuture.thenAcceptAsync(messages -> {

Modifications

  • Call beforeConsume method in MultiTopicsConsumer.
  • Wrap ConsumerInterceptors for internal consumer the and remove beforeConsume logic

Verifying this change

  • Make sure that the change passes the CI checks.

(Please pick either of the following options)

This change is a trivial rework / code cleanup without any test coverage.

(or)

This change is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end deployment with large payloads (10MB)
  • Extended integration test for recovery after broker failure

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository:

Copy link

github-actions bot commented Aug 8, 2024

@coderzc Please add the following content to your PR description and select a checkbox:

- [ ] `doc` <!-- Your PR contains doc changes -->
- [ ] `doc-required` <!-- Your PR changes impact docs and you will update later -->
- [ ] `doc-not-needed` <!-- Your PR changes do not impact docs -->
- [ ] `doc-complete` <!-- Docs have been already added -->

@coderzc coderzc added type/bug The PR fixed a bug or issue reported a bug area/client labels Aug 8, 2024
@coderzc coderzc added this to the 3.4.0 milestone Aug 8, 2024
@github-actions github-actions bot added doc-not-needed Your PR changes do not impact docs and removed doc-label-missing labels Aug 8, 2024
@coderzc coderzc requested review from codelipenghui and poorbarcode and removed request for codelipenghui August 8, 2024 06:54
@coderzc coderzc self-assigned this Aug 8, 2024
Copy link
Contributor

@codelipenghui codelipenghui left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The fix for beforeConsume method looks good.
@coderzc Could you please help also check other methods in the ConsumerInterceptor to make sure they are working as expected without issues like beforeConsume method.

@coderzc coderzc force-pushed the fix_multiTopicConsumer_beforeconsume branch from 8cccceb to 8eaf5c6 Compare August 9, 2024 05:14
@coderzc
Copy link
Member Author

coderzc commented Aug 9, 2024

The fix for beforeConsume method looks good. @coderzc Could you please help also check other methods in the ConsumerInterceptor to make sure they are working as expected without issues like beforeConsume method.

I checked other methods in the ConsumerInterceptor, they work well.

@coderzc coderzc force-pushed the fix_multiTopicConsumer_beforeconsume branch from 9bf5427 to 202b943 Compare August 9, 2024 08:24
@coderzc coderzc force-pushed the fix_multiTopicConsumer_beforeconsume branch from 7c3d04e to c575886 Compare August 12, 2024 02:54
@codecov-commenter
Copy link

codecov-commenter commented Aug 12, 2024

Codecov Report

Attention: Patch coverage is 72.72727% with 6 lines in your changes missing coverage. Please review.

Project coverage is 74.55%. Comparing base (bbc6224) to head (c575886).
Report is 603 commits behind head on master.

Files with missing lines Patch % Lines
...he/pulsar/client/impl/MultiTopicsConsumerImpl.java 72.72% 6 Missing ⚠️
Additional details and impacted files

Impacted file tree graph

@@             Coverage Diff              @@
##             master   #23141      +/-   ##
============================================
+ Coverage     73.57%   74.55%   +0.98%     
- Complexity    32624    34093    +1469     
============================================
  Files          1877     1919      +42     
  Lines        139502   144287    +4785     
  Branches      15299    15778     +479     
============================================
+ Hits         102638   107576    +4938     
+ Misses        28908    28467     -441     
- Partials       7956     8244     +288     
Flag Coverage Δ
inttests 27.87% <13.63%> (+3.28%) ⬆️
systests 24.76% <9.09%> (+0.43%) ⬆️
unittests 73.89% <72.72%> (+1.04%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

Files with missing lines Coverage Δ
...he/pulsar/client/impl/MultiTopicsConsumerImpl.java 77.61% <72.72%> (-0.12%) ⬇️

... and 500 files with indirect coverage changes

@coderzc
Copy link
Member Author

coderzc commented Aug 13, 2024

@codelipenghui PTAL~

@codelipenghui codelipenghui merged commit c07b158 into apache:master Aug 14, 2024
52 checks passed
coderzc added a commit to coderzc/pulsar that referenced this pull request Aug 14, 2024
lhotari pushed a commit that referenced this pull request Aug 14, 2024
nikhil-ctds pushed a commit to datastax/pulsar that referenced this pull request Aug 16, 2024
nikhil-ctds pushed a commit to datastax/pulsar that referenced this pull request Aug 16, 2024
srinath-ctds pushed a commit to datastax/pulsar that referenced this pull request Aug 20, 2024
grssam pushed a commit to grssam/pulsar that referenced this pull request Sep 4, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants