diff --git a/pulsar-client-cpp/lib/ProducerImpl.cc b/pulsar-client-cpp/lib/ProducerImpl.cc index 8c87086297f8d..20133c50fc932 100644 --- a/pulsar-client-cpp/lib/ProducerImpl.cc +++ b/pulsar-client-cpp/lib/ProducerImpl.cc @@ -417,37 +417,18 @@ void ProducerImpl::sendAsyncWithStatsUpdate(const Message& msg, const SendCallba callback(result, {}); }; + auto& msgMetadata = msg.impl_->metadata; const bool compressed = !canAddToBatch(msg); const auto payload = compressed ? applyCompression(uncompressedPayload, conf_.getCompressionType()) : uncompressedPayload; const auto compressedSize = static_cast(payload.readableBytes()); const auto maxMessageSize = static_cast(ClientConnection::getMaxMessageSize()); - if (compressed && compressedSize > ClientConnection::getMaxMessageSize() && !chunkingEnabled_) { - LOG_WARN(getName() << " - compressed Message payload size " << payload.readableBytes() - << " cannot exceed " << ClientConnection::getMaxMessageSize() - << " bytes unless chunking is enabled"); - handleFailedResult(ResultMessageTooBig); - return; - } - - auto& msgMetadata = msg.impl_->metadata; if (!msgMetadata.has_replicated_from() && msgMetadata.has_producer_name()) { handleFailedResult(ResultInvalidMessage); return; } - const int totalChunks = - canAddToBatch(msg) ? 1 : getNumOfChunks(compressedSize, ClientConnection::getMaxMessageSize()); - // Each chunk should be sent individually, so try to acquire extra permits for chunks. - for (int i = 0; i < (totalChunks - 1); i++) { - const auto result = canEnqueueRequest(0); // size is 0 because the memory has already reserved - if (result != ResultOk) { - handleFailedResult(result); - return; - } - } - Lock lock(mutex_); uint64_t sequenceId; if (!msgMetadata.has_sequence_id()) { @@ -457,6 +438,31 @@ void ProducerImpl::sendAsyncWithStatsUpdate(const Message& msg, const SendCallba } setMessageMetadata(msg, sequenceId, uncompressedSize); + auto payloadChunkSize = maxMessageSize; + int totalChunks; + if (!compressed || !chunkingEnabled_) { + totalChunks = 1; + } else { + const auto metadataSize = static_cast(msgMetadata.ByteSizeLong()); + if (metadataSize >= maxMessageSize) { + LOG_WARN(getName() << " - metadata size " << metadataSize << " cannot exceed " << maxMessageSize + << " bytes"); + handleFailedResult(ResultMessageTooBig); + return; + } + payloadChunkSize = maxMessageSize - metadataSize; + totalChunks = getNumOfChunks(compressedSize, payloadChunkSize); + } + + // Each chunk should be sent individually, so try to acquire extra permits for chunks. + for (int i = 0; i < (totalChunks - 1); i++) { + const auto result = canEnqueueRequest(0); // size is 0 because the memory has already reserved + if (result != ResultOk) { + handleFailedResult(result); + return; + } + } + if (canAddToBatch(msg)) { // Batching is enabled and the message is not delayed if (!batchMessageContainer_->hasEnoughSpace(msg)) { @@ -508,7 +514,7 @@ void ProducerImpl::sendAsyncWithStatsUpdate(const Message& msg, const SendCallba if (sendChunks) { msgMetadata.set_chunk_id(chunkId); } - const uint32_t endIndex = std::min(compressedSize, beginIndex + maxMessageSize); + const uint32_t endIndex = std::min(compressedSize, beginIndex + payloadChunkSize); auto chunkedPayload = payload.slice(beginIndex, endIndex - beginIndex); beginIndex = endIndex; @@ -517,10 +523,26 @@ void ProducerImpl::sendAsyncWithStatsUpdate(const Message& msg, const SendCallba handleFailedResult(ResultCryptoError); return; } + OpSendMsg op{msgMetadata, encryptedPayload, (chunkId == totalChunks - 1) ? callback : nullptr, + producerId_, sequenceId, conf_.getSendTimeout(), + 1, uncompressedSize}; + + if (!chunkingEnabled_) { + const uint32_t msgMetadataSize = op.metadata_.ByteSize(); + const uint32_t payloadSize = op.payload_.readableBytes(); + const uint32_t msgHeadersAndPayloadSize = msgMetadataSize + payloadSize; + if (msgHeadersAndPayloadSize > maxMessageSize) { + lock.unlock(); + releaseSemaphoreForSendOp(op); + LOG_WARN(getName() + << " - compressed Message size " << msgHeadersAndPayloadSize << " cannot exceed " + << maxMessageSize << " bytes unless chunking is enabled"); + handleFailedResult(ResultMessageTooBig); + return; + } + } - sendMessage(OpSendMsg{msgMetadata, encryptedPayload, - (chunkId == totalChunks - 1) ? callback : nullptr, producerId_, sequenceId, - conf_.getSendTimeout(), 1, uncompressedSize}); + sendMessage(op); } } } diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc b/pulsar-client-cpp/tests/BasicEndToEndTest.cc index 5c4f7d216237a..3431978a7ecc8 100644 --- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc +++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc @@ -593,8 +593,8 @@ TEST(BasicEndToEndTest, testMessageTooBig) { result = producer.send(msg); ASSERT_EQ(ResultMessageTooBig, result); - // Anything up to MaxMessageSize should be allowed - size = ClientConnection::getMaxMessageSize(); + // Anything up to MaxMessageSize - MetadataSize should be allowed + size = ClientConnection::getMaxMessageSize() - 32; /*the default message metadata size for string schema*/ msg = MessageBuilder().setAllocatedContent(content, size).build(); result = producer.send(msg); ASSERT_EQ(ResultOk, result); diff --git a/pulsar-client-cpp/tests/ProducerTest.cc b/pulsar-client-cpp/tests/ProducerTest.cc index 6315ac16f5145..d351ee9cdbc57 100644 --- a/pulsar-client-cpp/tests/ProducerTest.cc +++ b/pulsar-client-cpp/tests/ProducerTest.cc @@ -34,6 +34,9 @@ using namespace pulsar; static const std::string serviceUrl = "pulsar://localhost:6650"; static const std::string adminUrl = "http://localhost:8080/"; +// See the `maxMessageSize` config in test-conf/standalone-ssl.conf +static constexpr size_t maxMessageSize = 1024000; + TEST(ProducerTest, producerNotInitialized) { Producer producer; @@ -211,6 +214,63 @@ TEST(ProducerTest, testBacklogQuotasExceeded) { client.close(); } +class ProducerTest : public ::testing::TestWithParam {}; + +TEST_P(ProducerTest, testMaxMessageSize) { + Client client(serviceUrl); + + const std::string topic = "ProducerTest-NoBatchMaxMessageSize-" + std::to_string(time(nullptr)); + + Consumer consumer; + ASSERT_EQ(ResultOk, client.subscribe(topic, "sub", consumer)); + + Producer producer; + ProducerConfiguration conf; + conf.setBatchingEnabled(GetParam()); + ASSERT_EQ(ResultOk, client.createProducer(topic, conf, producer)); + + std::string msg = std::string(maxMessageSize / 2, 'a'); + ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent(msg).build())); + Message message; + ASSERT_EQ(ResultOk, consumer.receive(message)); + ASSERT_EQ(msg, message.getDataAsString()); + + std::string orderKey = std::string(maxMessageSize, 'a'); + ASSERT_EQ(ResultMessageTooBig, producer.send(MessageBuilder().setOrderingKey(orderKey).build())); + + ASSERT_EQ(ResultMessageTooBig, + producer.send(MessageBuilder().setContent(std::string(maxMessageSize, 'b')).build())); + + client.close(); +} + +TEST_P(ProducerTest, testChunkingMaxMessageSize) { + Client client(serviceUrl); + + const std::string topic = "ProducerTest-ChunkingMaxMessageSize-" + std::to_string(time(nullptr)); + + Consumer consumer; + ASSERT_EQ(ResultOk, client.subscribe(topic, "sub", consumer)); + + Producer producer; + ProducerConfiguration conf; + conf.setBatchingEnabled(false); + conf.setChunkingEnabled(true); + ASSERT_EQ(ResultOk, client.createProducer(topic, conf, producer)); + + std::string orderKey = std::string(maxMessageSize, 'a'); + ASSERT_EQ(ResultMessageTooBig, producer.send(MessageBuilder().setOrderingKey(orderKey).build())); + + std::string msg = std::string(2 * maxMessageSize + 10, 'b'); + Message message; + ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent(msg).build())); + ASSERT_EQ(ResultOk, consumer.receive(message)); + ASSERT_EQ(msg, message.getDataAsString()); + ASSERT_LE(1L, message.getMessageId().entryId()); + + client.close(); +} + TEST(ProducerTest, testExclusiveProducer) { Client client(serviceUrl); @@ -234,3 +294,5 @@ TEST(ProducerTest, testExclusiveProducer) { producerConfiguration3.setProducerName("p-name-3"); ASSERT_EQ(ResultProducerBusy, client.createProducer(topicName, producerConfiguration3, producer3)); } + +INSTANTIATE_TEST_CASE_P(Pulsar, ProducerTest, ::testing::Values(true, false));