-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[PIP-38] Support batch receive in java client. #4621
[PIP-38] Support batch receive in java client. #4621
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@codelipenghui this is a great change. left a few comments.
Additionally it might be good to have a few more followup changes to optimize this further.
The current pulsar client breaks a message batch to individual messages and collect multiple message into a Messages
. There is a lot of unuseful object conversations.
Ideally the pulsar client implementation should
a) keep a queue of Messages
. Each Messages
is a message batch or multiple message batches.
b) on receiving individual message, it polls a Messages
from the queue, and poll a message out of the Messages
.
This can allow lazy deserialization and object creation, and it will increase the throughput using batch receive api because your cpu cycles can be reduced.
/** | ||
* Max size of message for a single batch receive, 0 or negative means no limit. | ||
*/ | ||
private long maxSizeOfMessages; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maxSizeOfMessages sounds a bit confusing.
I would suggest maxNumMessages
and maxNumBytes
to replace maxNumberOfMessages
and maxSizeOfMessages
. thoughts?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
* client.newConsumer().batchReceivePolicy(BatchReceivePolicy.builder() | ||
* .maxNumberOfMessages(100) | ||
* .maxSizeOfMessages(5 * 1024 * 1024) | ||
* .timeout(100) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it is better to have a builder method timeout(long timeout, TimeUnit timeoutUnit)
rather than having two separated methods.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
@codelipenghui Since this is a significant addition to the client API, can you create a PIP with description and examples of the code? |
@merlimat I have already move it to 2.5.0 and will create a PIP soon. |
@merlimat @sijie I have already create PIP-38 in wiki and google doc, please take a look. Hope to get your advice on google doc, i will sync the update to the PIP wiki. |
@sijie I have addressed your comments, please review again. |
run java8 tests |
6 similar comments
run java8 tests |
run java8 tests |
run java8 tests |
run java8 tests |
run java8 tests |
run java8 tests |
@codelipenghui can you send an email to the dev@ mailing list to start the discussion? |
@sijie Oh, sorry i forgot it, i will send a email soon. |
@merlimat |
run java8 tests |
run java8 tests |
fee1af0
to
d201d8a
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@codelipenghui overall looks good. left some comments there.
* Max number of bytes: 10MB | ||
* Timeout: 100ms | ||
*/ | ||
public static final BatchReceivePolicy DEFAULT_POLICY = new BatchReceivePolicy( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
in general I am not in favor of using "number of messages" in any configuration or policies. In a multi-tenant system, message size varies between tenants and applications. so I would actually remove the limit of number of message just rely on number of bytes for a default policy.
Hence my recommendation would be:
BatchReceivePolicy DEFAULT_POLICY = new BatchReceivePolicy(-1, 10 * 1024 * 1024, 100, TimeUnit.MILLISECONDS);
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fix it
/** | ||
* Max number of messages for a single batch receive, 0 or negative means no limit. | ||
*/ | ||
private int maxNumMessages; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
since you already used builder pattern, just make these variables final
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fix it
/** | ||
* Get the list {@link Message} | ||
*/ | ||
List<Message<T>> getMessageList(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we need to expose this? my feeling is to avoid exposing such method until it is really needed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove
@@ -314,6 +315,14 @@ public ConsumerBuilderImpl(PulsarClientImpl client, Schema<T> schema) { | |||
return this; | |||
} | |||
|
|||
@Override | |||
public ConsumerBuilder<T> batchReceivePolicy(BatchReceivePolicy batchReceivePolicy) { | |||
checkArgument(batchReceivePolicy != null, "batchReceivePolicy must not be null."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@codelipenghui we have already moved all the validation to ConsumerConfigurationData
. Can you move the validation there?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see all the validations are still in ConsumerBuilderImpl.
/** | ||
* Max bytes of messages for a single batch receive, 0 or negative means no limit. | ||
*/ | ||
private long maxNumBytes; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we need long
here. an int
should be good enough. Because you cannot really hold a "long"-sized buffer in memory.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fix it
} catch (InterruptedException | ExecutionException e) { | ||
State state = getState(); | ||
if (state != State.Closing && state != State.Closed) { | ||
stats.incrementNumReceiveFailed(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we need a new metric for batchReceive?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, i will add metrics for batch receive.
protected CompletableFuture<Messages<T>> internalBatchReceiveAsync() { | ||
CompletableFuture<Messages<T>> result = new CompletableFuture<>(); | ||
try { | ||
lock.writeLock().lock(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we use a writeLock or a readLock?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use read lock may cause Messages return early(have not reached capacity yet), so use write lock here.
try { | ||
msg = incomingMessages.poll(0L, TimeUnit.MILLISECONDS); | ||
} catch (InterruptedException e) { | ||
// ignore |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@codelipenghui Can we just use poll()
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes fix it
void notifyPendingBatchReceivedCallBack(OpBatchReceive<T> opBatchReceive) { | ||
MessagesImpl<T> messages = new MessagesImpl<>(batchReceivePolicy.getMaxNumMessages(), | ||
batchReceivePolicy.getMaxNumBytes()); | ||
Message<T> msgPeeked = incomingMessages.peek(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see the similar pattern in a few places. Can we make a function for it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
@@ -80,6 +91,11 @@ protected ConsumerBase(PulsarClientImpl client, String topic, ConsumerConfigurat | |||
this.pendingReceives = Queues.newConcurrentLinkedQueue(); | |||
this.schema = schema; | |||
this.interceptors = interceptors; | |||
this.batchReceivePolicy = conf.getBatchReceivePolicy(); | |||
this.pendingBatchReceives = Queues.newConcurrentLinkedQueue(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you delay the creation of this queue util the first batchReceive
is called? I am try to reduce creating the queue if the consumer is not using batchReceive
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fix it
4e7a9ec
to
5f5692f
Compare
@sijie rebased |
run java8 tests |
1 similar comment
run java8 tests |
Anyone working on adding the support for batchRecieve to Golang client? |
Motivation
Support messages batch receiving, some application scenarios can be made simpler. Users often increase application throughput through batch operations. For example, batch insert or update database.
At present, we provide the ability to receive a single message. If users want to take advantage of batch operating advantages, need to implement a message collector him self. So this proposal aims to provide a universal interface and mechanism for batch receiving messages.
For example:
Verifying this change
Added new UT to verify this change.
Does this pull request potentially affect one of the following parts:
If
yes
was chosen, please highlight the changesDocumentation