diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java index 00e6c2f78e317..783e971f391f3 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java @@ -53,6 +53,7 @@ import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Reader; +import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SizeUnit; import org.apache.pulsar.client.impl.MessageImpl.SchemaState; import org.apache.pulsar.client.impl.ProducerImpl.OpSendMsg; @@ -550,6 +551,28 @@ public void testReaderChunkingConfiguration() throws Exception { assertEquals(consumer.conf.getExpireTimeOfIncompleteChunkedMessageMillis(), 12); } + @Test + public void testChunkSize() throws Exception { + final int maxMessageSize = 50; + final int payloadChunkSize = maxMessageSize - 32/* the default message metadata size for string schema */; + this.conf.setMaxMessageSize(maxMessageSize); + + final Producer producer = pulsarClient.newProducer(Schema.STRING) + .topic("my-property/my-ns/test-chunk-size") + .enableChunking(true) + .enableBatching(false) + .create(); + for (int size = 1; size <= maxMessageSize; size++) { + final MessageId messageId = producer.send(createMessagePayload(size)); + log.info("Send {} bytes to {}", size, messageId); + if (size <= payloadChunkSize) { + assertEquals(messageId.getClass(), MessageIdImpl.class); + } else { + assertEquals(messageId.getClass(), ChunkMessageIdImpl.class); + } + } + } + private String createMessagePayload(int size) { StringBuilder str = new StringBuilder(); Random rand = new Random(); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index 163a672d1c1a0..bb114e7f0e36b 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -441,7 +441,7 @@ public void sendAsync(Message message, SendCallback callback) { MessageImpl msg = (MessageImpl) message; MessageMetadata msgMetadata = msg.getMessageBuilder(); ByteBuf payload = msg.getDataBuffer(); - int uncompressedSize = payload.readableBytes(); + final int uncompressedSize = payload.readableBytes(); if (!canEnqueueRequest(callback, message.getSequenceId(), uncompressedSize)) { return; @@ -488,6 +488,10 @@ public void sendAsync(Message message, SendCallback callback) { return; } + // Update the message metadata before computing the payload chunk size to avoid a large message cannot be split + // into chunks. + final long sequenceId = updateMessageMetadata(msgMetadata, uncompressedSize); + // send in chunks int totalChunks; int payloadChunkSize; @@ -526,13 +530,6 @@ public void sendAsync(Message message, SendCallback callback) { try { synchronized (this) { int readStartIndex = 0; - long sequenceId; - if (!msgMetadata.hasSequenceId()) { - sequenceId = msgIdGeneratorUpdater.getAndIncrement(this); - msgMetadata.setSequenceId(sequenceId); - } else { - sequenceId = msgMetadata.getSequenceId(); - } String uuid = totalChunks > 1 ? String.format("%s-%d", producerName, sequenceId) : null; ChunkedMessageCtx chunkedMessageCtx = totalChunks > 1 ? ChunkedMessageCtx.get(totalChunks) : null; byte[] schemaVersion = totalChunks > 1 && msg.getMessageBuilder().hasSchemaVersion() @@ -554,7 +551,7 @@ public void sendAsync(Message message, SendCallback callback) { } serializeAndSendMessage(msg, payload, sequenceId, uuid, chunkId, totalChunks, readStartIndex, payloadChunkSize, compressedPayload, compressed, - compressedPayload.readableBytes(), uncompressedSize, callback, chunkedMessageCtx); + compressedPayload.readableBytes(), callback, chunkedMessageCtx); readStartIndex = ((chunkId + 1) * payloadChunkSize); } } @@ -567,6 +564,38 @@ public void sendAsync(Message message, SendCallback callback) { } } + /** + * Update the message metadata except those fields that will be updated for chunks later. + * + * @param msgMetadata + * @param uncompressedSize + * @return the sequence id + */ + private long updateMessageMetadata(final MessageMetadata msgMetadata, final int uncompressedSize) { + final long sequenceId; + if (!msgMetadata.hasSequenceId()) { + sequenceId = msgIdGeneratorUpdater.getAndIncrement(this); + msgMetadata.setSequenceId(sequenceId); + } else { + sequenceId = msgMetadata.getSequenceId(); + } + + if (!msgMetadata.hasPublishTime()) { + msgMetadata.setPublishTime(client.getClientClock().millis()); + + checkArgument(!msgMetadata.hasProducerName()); + + msgMetadata.setProducerName(producerName); + + if (conf.getCompressionType() != CompressionType.NONE) { + msgMetadata + .setCompression(CompressionCodecProvider.convertToWireProtocol(conf.getCompressionType())); + } + msgMetadata.setUncompressedSize(uncompressedSize); + } + return sequenceId; + } + @Override public int getNumOfPartitions() { return 0; @@ -583,7 +612,6 @@ private void serializeAndSendMessage(MessageImpl msg, ByteBuf compressedPayload, boolean compressed, int compressedPayloadSize, - int uncompressedSize, SendCallback callback, ChunkedMessageCtx chunkedMessageCtx) throws IOException { ByteBuf chunkPayload = compressedPayload; @@ -603,19 +631,6 @@ private void serializeAndSendMessage(MessageImpl msg, .setNumChunksFromMsg(totalChunks) .setTotalChunkMsgSize(compressedPayloadSize); } - if (!msgMetadata.hasPublishTime()) { - msgMetadata.setPublishTime(client.getClientClock().millis()); - - checkArgument(!msgMetadata.hasProducerName()); - - msgMetadata.setProducerName(producerName); - - if (conf.getCompressionType() != CompressionType.NONE) { - msgMetadata - .setCompression(CompressionCodecProvider.convertToWireProtocol(conf.getCompressionType())); - } - msgMetadata.setUncompressedSize(uncompressedSize); - } if (canAddToBatch(msg) && totalChunks <= 1) { if (canAddToCurrentBatch(msg)) { @@ -1483,7 +1498,11 @@ public int getMessageHeaderAndPayloadSize() { cmdHeader.markReaderIndex(); int totalSize = cmdHeader.readInt(); int cmdSize = cmdHeader.readInt(); - int msgHeadersAndPayloadSize = totalSize - cmdSize - 4; + // The totalSize includes: + // | cmdLength | cmdSize | magic and checksum | msgMetadataLength | msgMetadata | + // | --------- | ------- | ------------------ | ----------------- | ----------- | + // | 4 | | 6 | 4 | | + int msgHeadersAndPayloadSize = totalSize - 4 - cmdSize - 6 - 4; cmdHeader.resetReaderIndex(); return msgHeadersAndPayloadSize; } @@ -2214,8 +2233,8 @@ private void recoverProcessOpSendMsgFrom(ClientCnx cnx, MessageImpl from, long e /** * Check if final message size for non-batch and non-chunked messages is larger than max message size. */ - public boolean isMessageSizeExceeded(OpSendMsg op) { - if (op.msg != null && op.totalChunks <= 1) { + private boolean isMessageSizeExceeded(OpSendMsg op) { + if (op.msg != null && !conf.isChunkingEnabled()) { int messageSize = op.getMessageHeaderAndPayloadSize(); if (messageSize > ClientCnx.getMaxMessageSize()) { releaseSemaphoreForSendOp(op);