PIP 83: Pulsar client: Message consumption with pooled buffer #10184
Conversation
Overall it looks good to me. I left one comment in the Schema API, PTAL.
 * @return the deserialized object
 */
default T decode(ByteBuf bytes, byte[] schemaVersion) {
    return null;
Can you please write a default implementation that calls the other decode method? Otherwise custom schemas may break if, in the future, we rely more on this method.
Custom schemas will not break: MessageImpl already handles this and calls the default decode. Calling the default decode from here would require converting the ByteBuf to byte[], which should be handled at the top level, and in this case we handle it in MessageImpl.
If you look at this from the Pulsar point of view, you are right. But sometimes in your application or in a Pulsar IO Sink you have a Schema instance and you want to decode a given payload using that Schema. In that case, if you see there is a decode(ByteBuf) method and you already have a ByteBuf, you are tempted to use the new method, but it is not implemented for most of the system Schemas and it is certainly not implemented for new custom Schemas. This is why I believe we should provide a default implementation that relies on the fact that the decode(byte[], ...) method is always implemented. The default implementation is straightforward and I believe it is worth adding.
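For reference, a minimal sketch of what such a default could look like, assuming only that the byte[]-based decode is already implemented and that io.netty.buffer.ByteBuf is imported (this is an illustration, not the merged code):

default T decode(ByteBuf byteBuf, byte[] schemaVersion) {
    if (byteBuf == null) {
        return null;
    }
    // Copy the readable bytes out of the ByteBuf and delegate to the
    // existing byte[]-based decode method.
    byte[] bytes = new byte[byteBuf.readableBytes()];
    byteBuf.getBytes(byteBuf.readerIndex(), bytes);
    return decode(bytes, schemaVersion);
}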
@@ -171,6 +174,12 @@ private String interpretMessage(Message<?> message, boolean displayHex) throws I
        } else if (value instanceof GenericRecord) {
            Map<String, Object> asMap = genericRecordToMap((GenericRecord) value);
            data = asMap.toString();
        } else if (value instanceof ByteBuffer) {
            ByteBuffer payload = ((ByteBuffer) value);
What about using the internal array of the ByteBuffer when hasArray() is true, the offset is 0, and the length equals remaining()? I did the same in BytesKafkaSource; this way we can save a copy.
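A sketch of that zero-copy fast path, assuming the helper is only used for read-only access to the payload (the helper name is illustrative, not from the PR):

// Use the backing array directly when it exactly covers the remaining bytes;
// otherwise fall back to copying them out.
private static byte[] toBytes(ByteBuffer payload) {
    if (payload.hasArray()
            && payload.arrayOffset() == 0
            && payload.position() == 0
            && payload.array().length == payload.remaining()) {
        return payload.array(); // zero-copy fast path
    }
    byte[] data = new byte[payload.remaining()];
    payload.duplicate().get(data); // copy without disturbing the original position
    return data;
}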
@@ -66,6 +66,13 @@
     */
    byte[] getData();

    /**
     * Get the message payload size in bytes.
     *
We should specify whether it's the compressed or uncompressed size.
 *            the schema version to decode the object. null indicates using latest version.
 * @return the deserialized object
 */
default T decode(ByteBuf bytes, byte[] schemaVersion) {
Since Schema is also part of the public API, we can't use Netty ByteBuf. Instead we can use java.nio.ByteBuffer.
I introduced ByteBuf earlier because AbstractSchema already has the same API, so it wouldn't impact other existing Schema implementations, as all of them extend AbstractSchema. But I agree: Schema is part of the public API and it should not expose ByteBuf. So, changed to ByteBuffer.
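For comparison with the ByteBuf sketch above, a ByteBuffer-based default could delegate to the byte[] decode in the same way (again an illustration, not the merged code):

default T decode(ByteBuffer data, byte[] schemaVersion) {
    if (data == null) {
        return null;
    }
    // Copy the remaining bytes without moving the caller's position,
    // then reuse the byte[]-based decode.
    byte[] bytes = new byte[data.remaining()];
    data.duplicate().get(bytes);
    return decode(bytes, schemaVersion);
}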
@@ -122,7 +123,9 @@
    @Parameter(names = { "-st", "--schema-type"}, description = "Set a schema type on the consumer, it can be 'bytes' or 'auto_consume'")
    private String schematype = "bytes";

    @Parameter(names = { "-pm", "--pool-messages" }, description = "Use the pooled message")
    private boolean poolMessages = false;
I'd say not to make it configurable for the tool. Just enable it always.
I changed the default value to true. Let's keep this configuration for a release, because on a client host with a small amount of memory, users might want to use heap memory: GC can help free some space when the internal queue size is not tuned correctly and all the memory is used.
@@ -167,6 +172,9 @@
    @Parameter(names = {"--batch-index-ack" }, description = "Enable or disable the batch index acknowledgment")
    public boolean batchIndexAck = false;

    @Parameter(names = { "-pm", "--pool-messages" }, description = "Use the pooled message")
    private boolean poolMessages = false;
Again, for tools, I'd keep it always enabled.
if (null == byteBuf) {
    return null;
} else if (byteBuf.isDirect()) {
    return byteBuf.nioBuffer();
We can return the NIO buffer even if it's on the heap.
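In other words, the isDirect() branch isn't needed. A possible simplification, with an illustrative helper name and the usual io.netty.buffer.ByteBuf / java.nio.ByteBuffer imports assumed:

private static ByteBuffer toNioBuffer(ByteBuf byteBuf) {
    // nioBuffer() exposes the readable bytes as a java.nio.ByteBuffer view
    // for both direct and heap-backed buffers.
    return byteBuf == null ? null : byteBuf.nioBuffer();
}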
msg.payload = payload;
payload.retain();
Suggested change:
- msg.payload = payload;
- payload.retain();
+ msg.payload = payload.retain();
msg.cnx = cnx;
msg.redeliveryCount = redeliveryCount;

msg.poolMessage = poolMessage;
This portion is repeated from the other init() method. Is there a way to reuse that?
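One way to do that, sketched here with the field names taken from the quoted diff (the helper name and exact parameter list are assumptions, not the actual change):

// Shared initialization used by both init() overloads.
private static <T> void initCommonFields(MessageImpl<T> msg, ClientCnx cnx,
                                         int redeliveryCount, boolean poolMessage) {
    msg.cnx = cnx;
    msg.redeliveryCount = redeliveryCount;
    msg.poolMessage = poolMessage;
}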
@@ -310,6 +378,11 @@ public boolean publishedEarlierThan(long timestamp) {
        if (msgMetadata.isNullValue()) {
            return null;
        }
        if (poolMessage) {
Why do we need the special case here?
if (msgMetadata.isNullValue()) {
    return 0;
}
return poolMessage ? payload.readableBytes() : getData().length;
Shouldn't payload.readableBytes() work in both cases?
Force-pushed: a5159c1 to 910354c
fix api, buffer-access, duplicate code
Force-pushed: 910354c to ab537da
LGTM, great work.
@eolivelli @merlimat addressed the feedback.
I have already approved.
Motivation
The Pulsar client library should provide an API that allows the application to access the message payload from pooled buffers. The library also has to provide an associated release API to release and deallocate the pooled buffers used by the message.
Modification
Add a Message::release() API to release the pooled message payload.
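A hedged usage sketch of the feature described above. It assumes the consumer opt-in is a poolMessages(...) option on the consumer builder and that Message exposes release(), as discussed in this PR; check the merged API for the exact names.

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;

public class PooledConsumeExample {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();

        Consumer<byte[]> consumer = client.newConsumer(Schema.BYTES)
                .topic("my-topic")
                .subscriptionName("my-sub")
                .poolMessages(true)   // opt in to pooled message payloads (assumed option name)
                .subscribe();

        Message<byte[]> msg = consumer.receive();
        try {
            // Use the payload only while the pooled buffer is still valid.
            byte[] value = msg.getValue();
            System.out.println("received " + value.length + " bytes");
            consumer.acknowledge(msg);
        } finally {
            // Return the pooled buffer; the message must not be used afterwards.
            msg.release();
        }

        consumer.close();
        client.close();
    }
}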