Skip to content

Commit

Permalink
[fix][Java Client] Fix large message sometimes cannot be split into c…
Browse files Browse the repository at this point in the history
…hunks after PIP-132 (#16196)

Fixes #16195

### Motivation

[PIP-132](#14007) considers the
message metadata size when computing the payload chunk size and the
number of chunks. However, this can prevent some messages whose size is
less than `maxMessageSize` from being sent. There are two reasons:
1. The `MessageMetadata` will be updated after computing the payload
   chunk size, i.e. the actual metadata size would be greater.
2. `OpSendMsg#getMessageHeaderAndPayloadSize` doesn't exclude all bytes
   other than the metadata and payload, e.g. the 4 bytes checksum field.

For example, if the max message size is 100, send a string whose size is
60 with chunking enabled.
1. The initial metadata size is 25 so the chunk size is 75, the message
   won't be split into chunks.
2. After `serializeAndSendMessage`, the metadata size becomes 32, so the
   serialized header's total size is 4 + 8 + 6 + 4 + 32 = 54, and the
  total size is 54 + 60 = 114, see `headerContentSize` in
  `serializeCommandSendWithSize`.
3. In `getMessageHeaderAndPayloadSize`, the returned value is computed
   as 114 - 8 - 4 = 102 > 100, because the 6 bytes of magic and checksum
   and the 4-byte metadata length field are not subtracted.

### Modifications

- Update the message metadata before computing the chunk size.
- Compute the correct size in `getMessageHeaderAndPayloadSize`.

### Verifying this change

Add `testChunkSize` to verify all sizes in range [1, maxMessageSize] can
be sent successfully when chunking is enabled.
  • Loading branch information
BewareMyPower authored Jun 28, 2022
1 parent b596a9c commit 7421589
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SizeUnit;
import org.apache.pulsar.client.impl.MessageImpl.SchemaState;
import org.apache.pulsar.client.impl.ProducerImpl.OpSendMsg;
Expand Down Expand Up @@ -550,6 +551,28 @@ public void testReaderChunkingConfiguration() throws Exception {
assertEquals(consumer.conf.getExpireTimeOfIncompleteChunkedMessageMillis(), 12);
}

/**
 * Sends one message for every payload size in {@code [1, maxMessageSize]} with chunking enabled and
 * batching disabled, and checks the class of the returned message id: payloads of at most
 * {@code payloadChunkSize} bytes must yield a {@code MessageIdImpl}, larger payloads a
 * {@code ChunkMessageIdImpl}.
 *
 * @throws Exception if the producer cannot be created or any send fails
 */
@Test
public void testChunkSize() throws Exception {
    final int maxMessageSize = 50;
    final int payloadChunkSize = maxMessageSize - 32/* the default message metadata size for string schema */;
    this.conf.setMaxMessageSize(maxMessageSize);

    // try-with-resources closes the producer even when a send or an assertion fails, so the test
    // does not leave a live producer behind for the tests that run after it.
    try (Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
            .topic("my-property/my-ns/test-chunk-size")
            .enableChunking(true)
            .enableBatching(false)
            .create()) {
        for (int size = 1; size <= maxMessageSize; size++) {
            final MessageId messageId = producer.send(createMessagePayload(size));
            log.info("Send {} bytes to {}", size, messageId);
            if (size <= payloadChunkSize) {
                // Fits into a single chunk: a plain message id is expected.
                assertEquals(messageId.getClass(), MessageIdImpl.class);
            } else {
                // Larger than one chunk's payload budget: the message must have been split.
                assertEquals(messageId.getClass(), ChunkMessageIdImpl.class);
            }
        }
    }
}

private String createMessagePayload(int size) {
StringBuilder str = new StringBuilder();
Random rand = new Random();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,7 @@ public void sendAsync(Message<?> message, SendCallback callback) {
MessageImpl<?> msg = (MessageImpl<?>) message;
MessageMetadata msgMetadata = msg.getMessageBuilder();
ByteBuf payload = msg.getDataBuffer();
int uncompressedSize = payload.readableBytes();
final int uncompressedSize = payload.readableBytes();

if (!canEnqueueRequest(callback, message.getSequenceId(), uncompressedSize)) {
return;
Expand Down Expand Up @@ -488,6 +488,10 @@ public void sendAsync(Message<?> message, SendCallback callback) {
return;
}

// Update the message metadata before computing the payload chunk size; otherwise a large message might not be
// split into chunks.
final long sequenceId = updateMessageMetadata(msgMetadata, uncompressedSize);

// send in chunks
int totalChunks;
int payloadChunkSize;
Expand Down Expand Up @@ -526,13 +530,6 @@ public void sendAsync(Message<?> message, SendCallback callback) {
try {
synchronized (this) {
int readStartIndex = 0;
long sequenceId;
if (!msgMetadata.hasSequenceId()) {
sequenceId = msgIdGeneratorUpdater.getAndIncrement(this);
msgMetadata.setSequenceId(sequenceId);
} else {
sequenceId = msgMetadata.getSequenceId();
}
String uuid = totalChunks > 1 ? String.format("%s-%d", producerName, sequenceId) : null;
ChunkedMessageCtx chunkedMessageCtx = totalChunks > 1 ? ChunkedMessageCtx.get(totalChunks) : null;
byte[] schemaVersion = totalChunks > 1 && msg.getMessageBuilder().hasSchemaVersion()
Expand All @@ -554,7 +551,7 @@ public void sendAsync(Message<?> message, SendCallback callback) {
}
serializeAndSendMessage(msg, payload, sequenceId, uuid, chunkId, totalChunks,
readStartIndex, payloadChunkSize, compressedPayload, compressed,
compressedPayload.readableBytes(), uncompressedSize, callback, chunkedMessageCtx);
compressedPayload.readableBytes(), callback, chunkedMessageCtx);
readStartIndex = ((chunkId + 1) * payloadChunkSize);
}
}
Expand All @@ -567,6 +564,38 @@ public void sendAsync(Message<?> message, SendCallback callback) {
}
}

/**
 * Populates the message metadata fields that are not chunk-specific; fields that are set per chunk are
 * left untouched here.
 *
 * <p>The sequence id is always resolved. The publish time, producer name, compression codec and
 * uncompressed size are only written when the metadata carries no publish time yet.
 *
 * @param msgMetadata the metadata to update in place
 * @param uncompressedSize the value recorded as the uncompressed size when the publish fields are written
 * @return the sequence id already present in {@code msgMetadata}, or the newly generated one
 */
private long updateMessageMetadata(final MessageMetadata msgMetadata, final int uncompressedSize) {
    final long sequenceId;
    if (msgMetadata.hasSequenceId()) {
        // Reuse the sequence id that is already set on the metadata.
        sequenceId = msgMetadata.getSequenceId();
    } else {
        // No sequence id yet: take the next one from the generator and record it on the metadata.
        sequenceId = msgIdGeneratorUpdater.getAndIncrement(this);
        msgMetadata.setSequenceId(sequenceId);
    }

    // A publish time that is already present means the publish fields below are not rewritten.
    if (msgMetadata.hasPublishTime()) {
        return sequenceId;
    }

    msgMetadata.setPublishTime(client.getClientClock().millis());

    // The producer name must not have been set at this point.
    checkArgument(!msgMetadata.hasProducerName());
    msgMetadata.setProducerName(producerName);

    // The compression codec is only written when compression is actually configured.
    if (conf.getCompressionType() != CompressionType.NONE) {
        msgMetadata.setCompression(
                CompressionCodecProvider.convertToWireProtocol(conf.getCompressionType()));
    }
    msgMetadata.setUncompressedSize(uncompressedSize);
    return sequenceId;
}

@Override
public int getNumOfPartitions() {
return 0;
Expand All @@ -583,7 +612,6 @@ private void serializeAndSendMessage(MessageImpl<?> msg,
ByteBuf compressedPayload,
boolean compressed,
int compressedPayloadSize,
int uncompressedSize,
SendCallback callback,
ChunkedMessageCtx chunkedMessageCtx) throws IOException {
ByteBuf chunkPayload = compressedPayload;
Expand All @@ -603,19 +631,6 @@ private void serializeAndSendMessage(MessageImpl<?> msg,
.setNumChunksFromMsg(totalChunks)
.setTotalChunkMsgSize(compressedPayloadSize);
}
if (!msgMetadata.hasPublishTime()) {
msgMetadata.setPublishTime(client.getClientClock().millis());

checkArgument(!msgMetadata.hasProducerName());

msgMetadata.setProducerName(producerName);

if (conf.getCompressionType() != CompressionType.NONE) {
msgMetadata
.setCompression(CompressionCodecProvider.convertToWireProtocol(conf.getCompressionType()));
}
msgMetadata.setUncompressedSize(uncompressedSize);
}

if (canAddToBatch(msg) && totalChunks <= 1) {
if (canAddToCurrentBatch(msg)) {
Expand Down Expand Up @@ -1483,7 +1498,11 @@ public int getMessageHeaderAndPayloadSize() {
cmdHeader.markReaderIndex();
int totalSize = cmdHeader.readInt();
int cmdSize = cmdHeader.readInt();
int msgHeadersAndPayloadSize = totalSize - cmdSize - 4;
// The totalSize includes:
// | cmdLength | cmdSize | magic and checksum | msgMetadataLength | msgMetadata |
// | --------- | ------- | ------------------ | ----------------- | ----------- |
// | 4 | | 6 | 4 | |
int msgHeadersAndPayloadSize = totalSize - 4 - cmdSize - 6 - 4;
cmdHeader.resetReaderIndex();
return msgHeadersAndPayloadSize;
}
Expand Down Expand Up @@ -2214,8 +2233,8 @@ private void recoverProcessOpSendMsgFrom(ClientCnx cnx, MessageImpl from, long e
/**
* Check if final message size for non-batch and non-chunked messages is larger than max message size.
*/
public boolean isMessageSizeExceeded(OpSendMsg op) {
if (op.msg != null && op.totalChunks <= 1) {
private boolean isMessageSizeExceeded(OpSendMsg op) {
if (op.msg != null && !conf.isChunkingEnabled()) {
int messageSize = op.getMessageHeaderAndPayloadSize();
if (messageSize > ClientCnx.getMaxMessageSize()) {
releaseSemaphoreForSendOp(op);
Expand Down

0 comments on commit 7421589

Please sign in to comment.