
[improve][client][PIP-389] Add a producer config to improve compression performance #23525

Open
wants to merge 4 commits into master
Conversation

liangyepianzhou
Contributor

@liangyepianzhou liangyepianzhou commented Oct 29, 2024

PIP: #23526

Motivation

The motivation of this PIP is to improve compression performance by skipping the compression of small messages.
We want to add a new configuration, compressMinMsgBodySize, to the producer configuration.
This configuration lets the user set the minimum message body size that will be compressed:
if the message body is smaller than compressMinMsgBodySize, the message will not be compressed.
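
For illustration, a minimal sketch of how a producer might enable this, assuming the ProducerBuilder gains the compressMinMsgBodySize method proposed in this PIP (the method name and the 1 KB threshold are taken from the proposal; everything else is the standard Pulsar client API):

    import org.apache.pulsar.client.api.CompressionType;
    import org.apache.pulsar.client.api.Producer;
    import org.apache.pulsar.client.api.PulsarClient;

    public class CompressionThresholdExample {
        public static void main(String[] args) throws Exception {
            try (PulsarClient client = PulsarClient.builder()
                    .serviceUrl("pulsar://localhost:6650")
                    .build()) {
                // Compress with LZ4, but skip compression for message bodies
                // smaller than 1 KB.
                Producer<byte[]> producer = client.newProducer()
                        .topic("persistent://public/default/my-topic")
                        .compressionType(CompressionType.LZ4)
                        .compressMinMsgBodySize(1024) // proposed by this PIP
                        .create();
                producer.send("hello".getBytes()); // small body: sent uncompressed
                producer.close();
            }
        }
    }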

Verifying this change

  • Make sure that the change passes the CI checks.

(Please pick one of the following options)

This change is a trivial rework / code cleanup without any test coverage.

(or)

This change is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end deployment with large payloads (10MB)
  • Extended integration test for recovery after broker failure

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository:

@liangyepianzhou liangyepianzhou marked this pull request as draft October 29, 2024 08:34
@github-actions github-actions bot added the doc-required label (Your PR changes impact docs and you will update later.) and removed the doc-label-missing label Oct 29, 2024
@liangyepianzhou liangyepianzhou changed the title from "[improve][client] Add a producer config to improve compaction performance" to "[improve][client] Add a producer config to improve compression performance" Oct 29, 2024
@lhotari
Member

lhotari commented Oct 30, 2024

Please add the PIP number to the PR title as we usually do.

@BewareMyPower BewareMyPower changed the title from "[improve][client] Add a producer config to improve compression performance" to "[improve][client][PIP-389] Add a producer config to improve compression performance" Oct 30, 2024
@lhotari
Member

lhotari commented Oct 30, 2024

@liangyepianzhou Regarding performance optimizations for compression in Pulsar, there's also other work that should be done.
For example, the current gzip compression/decompression implementation is very inefficient:

public ByteBuf encode(ByteBuf source) {
    byte[] array;
    int length = source.readableBytes();
    int sizeEstimate = (int) Math.ceil(source.readableBytes() * 1.001) + 14;
    ByteBuf compressed = PulsarByteBufAllocator.DEFAULT.heapBuffer(sizeEstimate);
    int offset = 0;
    if (source.hasArray()) {
        array = source.array();
        offset = source.arrayOffset() + source.readerIndex();
    } else {
        // If it's a direct buffer, we need to copy it
        array = new byte[length];
        source.getBytes(source.readerIndex(), array);
    }
    Deflater deflater = this.deflater.get();
    deflater.reset();
    deflater.setInput(array, offset, length);
    while (!deflater.needsInput()) {
        deflate(deflater, compressed);
    }
    return compressed;
}
Another detail is that the current implementation isn't using the "zero copy" approaches that are available.
For example, in Snappy:
@Override
public ByteBuf encode(ByteBuf source) {
    int uncompressedLength = source.readableBytes();
    int maxLength = Snappy.maxCompressedLength(uncompressedLength);
    ByteBuffer sourceNio = source.nioBuffer(source.readerIndex(), source.readableBytes());
    ByteBuf target = PooledByteBufAllocator.DEFAULT.buffer(maxLength, maxLength);
    ByteBuffer targetNio = target.nioBuffer(0, maxLength);
    int compressedLength = 0;
    try {
        compressedLength = Snappy.compress(sourceNio, targetNio);
    } catch (IOException e) {
        log.error("Failed to compress to Snappy: {}", e.getMessage());
    }
    target.writerIndex(compressedLength);
    return target;
}

@Override
public ByteBuf decode(ByteBuf encoded, int uncompressedLength) throws IOException {
    ByteBuf uncompressed = PooledByteBufAllocator.DEFAULT.buffer(uncompressedLength, uncompressedLength);
    ByteBuffer uncompressedNio = uncompressed.nioBuffer(0, uncompressedLength);
    ByteBuffer encodedNio = encoded.nioBuffer(encoded.readerIndex(), encoded.readableBytes());
    Snappy.uncompress(encodedNio, uncompressedNio);
    uncompressed.writerIndex(uncompressedLength);
    return uncompressed;
}

In BookKeeper, I added zero-copy for calculating checksums in apache/bookkeeper#4196. The ByteBufVisitor approach could be used to avoid copying source buffers to an extra nio buffer.
Calling Netty's io.netty.buffer.CompositeByteBuf#nioBuffer will allocate a new nio ByteBuffer in the heap and copy the content there. That's not great from a performance perspective, especially when we want to reduce allocations and garbage. With the ByteBufVisitor approach, it's possible to read the source direct byte buffers without extra copies.
Have you considered addressing this performance issue in the Pulsar message compression solution?
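
To make the idea concrete, here is a minimal sketch (an assumption about one possible approach, not Pulsar's actual implementation) that feeds a possibly composite or direct source buffer to a Deflater without the intermediate heap-array copy, using Deflater.setInput(ByteBuffer) from Java 11 and Netty's nioBuffers(), which exposes the underlying memory regions component by component instead of merging them into a fresh heap buffer:

    import java.nio.ByteBuffer;
    import java.util.zip.Deflater;

    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.PooledByteBufAllocator;

    public final class ZeroCopyGzipSketch {

        // Compress `source` without copying direct or composite buffers into a
        // temporary heap array. The Deflater would typically come from a
        // thread-local, as in the existing codec.
        public static ByteBuf encode(ByteBuf source, Deflater deflater) {
            int length = source.readableBytes();
            int sizeEstimate = (int) Math.ceil(length * 1.001) + 14;
            ByteBuf compressed = PooledByteBufAllocator.DEFAULT.heapBuffer(sizeEstimate);
            deflater.reset();
            // nioBuffers() returns the underlying regions; for a CompositeByteBuf
            // this is one ByteBuffer per component, with no merging copy
            // (unlike nioBuffer(), which may allocate and copy).
            for (ByteBuffer chunk : source.nioBuffers(source.readerIndex(), length)) {
                deflater.setInput(chunk); // Java 11+: accepts a (direct) ByteBuffer
                while (!deflater.needsInput()) {
                    deflate(deflater, compressed);
                }
            }
            deflater.finish();
            while (!deflater.finished()) {
                deflate(deflater, compressed);
            }
            return compressed;
        }

        private static void deflate(Deflater deflater, ByteBuf out) {
            out.ensureWritable(64);
            int written = deflater.deflate(out.array(),
                    out.arrayOffset() + out.writerIndex(), out.writableBytes());
            out.writerIndex(out.writerIndex() + written);
        }
    }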

@liangyepianzhou
Contributor Author

Have you considered addressing this performance issue in the Pulsar message compression solution?

Sounds good; maybe I can try optimizing it in other PRs.

@lhotari
Member

lhotari commented Oct 30, 2024

Have you considered addressing this performance issue in the Pulsar message compression solution?

Sounds good; maybe I can try optimizing it in other PRs.

+1. In the Pulsar code base, we have a special module called microbench for microbenchmarks with JMH: https://github.com/apache/pulsar/tree/master/microbench. Testing performance with JMH could be useful for such improvements.
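
As an illustration, a benchmark for the threshold idea could look roughly like this (a hypothetical sketch; the class name, parameters, and the plain-GZIP baseline are assumptions, not taken from the microbench module):

    import java.io.ByteArrayOutputStream;
    import java.util.Random;
    import java.util.concurrent.TimeUnit;
    import java.util.zip.GZIPOutputStream;

    import org.openjdk.jmh.annotations.Benchmark;
    import org.openjdk.jmh.annotations.BenchmarkMode;
    import org.openjdk.jmh.annotations.Mode;
    import org.openjdk.jmh.annotations.OutputTimeUnit;
    import org.openjdk.jmh.annotations.Param;
    import org.openjdk.jmh.annotations.Scope;
    import org.openjdk.jmh.annotations.Setup;
    import org.openjdk.jmh.annotations.State;

    @State(Scope.Benchmark)
    @BenchmarkMode(Mode.Throughput)
    @OutputTimeUnit(TimeUnit.MILLISECONDS)
    public class CompressionThresholdBenchmark {

        @Param({"64", "512", "4096"})
        public int payloadSize;

        public byte[] payload;

        @Setup
        public void setup() {
            payload = new byte[payloadSize];
            new Random(42).nextBytes(payload);
        }

        @Benchmark
        public byte[] compressAlways() throws Exception {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            try (GZIPOutputStream gzip = new GZIPOutputStream(bos)) {
                gzip.write(payload);
            }
            return bos.toByteArray();
        }

        @Benchmark
        public byte[] skipBelowThreshold() throws Exception {
            // Hypothetical 1 KB threshold, mirroring compressMinMsgBodySize.
            return payload.length < 1024 ? payload : compressAlways();
        }
    }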

@liangyepianzhou
Contributor Author

+1. In the Pulsar code base, we have a special module called microbench for microbenchmarks with JMH [...] Testing performance with JMH could be useful for such improvements.

Thanks for the reminder.

@liangyepianzhou
Contributor Author

@liangyepianzhou Regarding performance optimizations for compression in Pulsar, there's also other work that should be done. [...] Have you considered addressing this performance issue in the Pulsar message compression solution?

@lhotari I tried to optimize it yesterday, but I found that Pulsar does not use CompositeByteBuf when sending messages. Single sends use io.netty.buffer.UnpooledHeapByteBuf memory, while batch sends use io.netty.buffer.PooledUnsafeDirectByteBuf memory. Both expose a memory address or a backing array, so the current implementation is already zero-copy. It seems there is no need to optimize?
Also, the example you gave, CompressionCodecSnappyJNI.java, is a compression class used only in tests, so it seems there is no need to optimize it.
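
A small check that illustrates this claim (a sketch assuming Netty on a JVM where sun.misc.Unsafe is available, so pooled direct buffers expose a native memory address; the class name is hypothetical):

    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.PooledByteBufAllocator;
    import io.netty.buffer.Unpooled;

    public class BufferKindCheck {
        public static void main(String[] args) {
            ByteBuf heap = Unpooled.buffer(128); // heap buffer, as in single sends
            ByteBuf direct = PooledByteBufAllocator.DEFAULT.directBuffer(128); // as in batch sends
            System.out.println(heap.hasArray());           // true -> array-based path, no copy
            System.out.println(direct.hasMemoryAddress()); // true (with Unsafe) -> address-based path
            direct.release();
        }
    }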

@lhotari
Member

lhotari commented Nov 11, 2024

@lhotari I tried to optimize it yesterday, but I found that Pulsar does not use CompositeByteBuf when sending messages. [...] It seems there is no need to optimize?

@liangyepianzhou I created #23586 to clarify the possible optimization.

@liangyepianzhou
Contributor Author

@lhotari Do you have time to review the code changes for PIP-389, which has been approved on the mailing list?
Thank you!

Member

@lhotari lhotari left a comment


Please use Pulsar code style. IDE instructions: https://pulsar.apache.org/contribute/setup-ide/#configure-code-style
