Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PIP 83 : Pulsar client: Message consumption with pooled buffer #10184

Merged
merged 1 commit into from
Apr 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4029,4 +4029,4 @@ public void testPartitionTopicsOnSeparateListner() throws Exception {
blockedMessageLatch.countDown();
log.info("-- Exiting {} test --", methodName);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.impl;

import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.UUID.randomUUID;
import static org.apache.pulsar.broker.service.BrokerService.BROKER_SERVICE_CONFIGURATION_PATH;
import static org.mockito.Mockito.any;
Expand All @@ -30,12 +31,14 @@
import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

import io.netty.buffer.ByteBuf;
import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
Expand All @@ -55,7 +58,6 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.Cleanup;

import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
Expand Down Expand Up @@ -132,6 +134,11 @@ public Object[][] subType() {
return new Object[][] { { SubscriptionType.Shared }, { SubscriptionType.Failover } };
}

@DataProvider(name = "booleanFlagProvider")
public Object[][] booleanFlagProvider() {
return new Object[][] { { true }, { false } };
}

/**
* Verifies unload namespace-bundle doesn't close shared connection used by other namespace-bundle.
*
Expand Down Expand Up @@ -918,4 +925,98 @@ public void testJsonSchemaProducerConsumerWithSpecifiedReaderAndWriter() throws
private static final class TestMessageObject{
private String value;
}
}

/**
* It validates pooled message consumption for batch and non-batch messages.
*
* @throws Exception
*/
@Test(dataProvider = "booleanFlagProvider")
public void testConsumerWithPooledMessages(boolean isBatchingEnabled) throws Exception {
log.info("-- Starting {} test --", methodName);

@Cleanup
PulsarClient newPulsarClient = PulsarClient.builder().serviceUrl(lookupUrl.toString()).build();

final String topic = "persistent://my-property/my-ns/testConsumerWithPooledMessages" + isBatchingEnabled;

@Cleanup
Consumer<ByteBuffer> consumer = newPulsarClient.newConsumer(Schema.BYTEBUFFER).topic(topic)
.subscriptionName("my-sub").poolMessages(true).subscribe();

@Cleanup
Producer<byte[]> producer = newPulsarClient.newProducer().topic(topic).enableBatching(isBatchingEnabled).create();

final int numMessages = 100;
for (int i = 0; i < numMessages; i++) {
producer.newMessage().value(("value-" + i).getBytes(UTF_8))
.eventTime((i + 1) * 100L).sendAsync();
}
producer.flush();

// Reuse pre-allocated pooled buffer to process every message
byte[] val = null;
int size = 0;
for (int i = 0; i < numMessages; i++) {
Message<ByteBuffer> msg = consumer.receive();
ByteBuffer value;
try {
value = msg.getValue();
int capacity = value.remaining();
// expand the size of buffer if needed
if (capacity > size) {
val = new byte[capacity];
size = capacity;
}
// read message into pooled buffer
value.get(val, 0, capacity);
// process the message
assertEquals(("value-" + i), new String(val, 0, capacity));
} finally {
msg.release();
}
}
consumer.close();
producer.close();
}

/**
* It verifies that expiry/redelivery of messages relesaes the messages without leak.
*
* @param isBatchingEnabled
* @throws Exception
*/
@Test(dataProvider = "booleanFlagProvider")
public void testPooledMessageWithAckTimeout(boolean isBatchingEnabled) throws Exception {
log.info("-- Starting {} test --", methodName);

@Cleanup
PulsarClient newPulsarClient = PulsarClient.builder().serviceUrl(lookupUrl.toString()).build();

final String topic = "persistent://my-property/my-ns/testPooledMessageWithAckTimeout" + isBatchingEnabled;

@Cleanup
ConsumerImpl<ByteBuffer> consumer = (ConsumerImpl<ByteBuffer>) newPulsarClient.newConsumer(Schema.BYTEBUFFER)
.topic(topic).subscriptionName("my-sub").poolMessages(true).subscribe();

@Cleanup
Producer<byte[]> producer = newPulsarClient.newProducer().topic(topic).enableBatching(isBatchingEnabled)
.create();

final int numMessages = 100;
for (int i = 0; i < numMessages; i++) {
producer.newMessage().value(("value-" + i).getBytes(UTF_8)).eventTime((i + 1) * 100L).sendAsync();
}
producer.flush();

retryStrategically((test) -> consumer.incomingMessages.peek() != null, 5, 500);
MessageImpl<ByteBuffer> msg = (MessageImpl) consumer.incomingMessages.peek();
assertNotNull(msg);
ByteBuf payload = ((MessageImpl) msg).getPayload();
assertNotEquals(payload.refCnt(), 0);
consumer.redeliverUnacknowledgedMessages();
assertEquals(payload.refCnt(), 0);
consumer.close();
producer.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -731,4 +731,14 @@ public interface ConsumerBuilder<T> extends Cloneable {
* @return
*/
ConsumerBuilder<T> expireTimeOfIncompleteChunkedMessage(long duration, TimeUnit unit);

/**
* Enable pooling of messages and the underlying data buffers.
* <p/>
* When pooling is enabled, the application is responsible for calling Message.release() after the handling of every
* received message. If “release()” is not called on a received message, there will be a memory leak. If an
* application attempts to use and already “released” message, it might experience undefined behavior (eg: memory
* corruption, deserialization error, etc.).
*/
ConsumerBuilder<T> poolMessages(boolean poolMessages);
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,13 @@ public interface Message<T> {
*/
byte[] getData();

/**
* Get the uncompressed message payload size in bytes.
*
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should specify wether it's compressed or uncompressed size.

* @return size in bytes.
*/
int size();

/**
* Get the de-serialized value of the message, according the configured {@link Schema}.
*
Expand Down Expand Up @@ -217,4 +224,12 @@ public interface Message<T> {
* @return the name of cluster, from which the message is replicated.
*/
String getReplicatedFrom();

/**
* Release a message back to the pool. This is required only if the consumer was created with the option to pool
* messages, otherwise it will have no effect.
*
* @since 2.8.0
*/
void release();
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.api;

import static org.apache.pulsar.client.internal.DefaultImplementation.getBytes;
import java.nio.ByteBuffer;
import java.sql.Time;
import java.sql.Timestamp;
Expand Down Expand Up @@ -120,6 +121,22 @@ default T decode(byte[] bytes, byte[] schemaVersion) {
return decode(bytes);
}

/**
* Decode a ByteBuffer into an object using a given version. <br/>
*
* @param data
* the ByteBuffer to decode
* @param schemaVersion
* the schema version to decode the object. null indicates using latest version.
* @return the deserialized object
*/
default T decode(ByteBuffer data, byte[] schemaVersion) {
if (data == null) {
return null;
}
return decode(getBytes(data), schemaVersion);
}

/**
* @return an object that represents the Schema associated metadata
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -494,4 +494,24 @@ public static BatcherBuilder newKeyBasedBatcherBuilder() {
() -> (BatcherBuilder) getConstructor("org.apache.pulsar.client.impl.KeyBasedBatcherBuilder")
.newInstance());
}

/**
* Retrieves ByteBuffer data into byte[].
*
* @param byteBuffer
* @return
*/
public static byte[] getBytes(ByteBuffer byteBuffer) {
if (byteBuffer == null) {
return null;
}
if (byteBuffer.hasArray() && byteBuffer.arrayOffset() == 0
&& byteBuffer.array().length == byteBuffer.remaining()) {
return byteBuffer.array();
}
// Direct buffer is not backed by array and it needs to be read from direct memory
byte[] array = new byte[byteBuffer.remaining()];
byteBuffer.get(array);
return array;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
package org.apache.pulsar.client.cli;

import static org.apache.commons.lang3.StringUtils.isNotBlank;

import static org.apache.pulsar.client.internal.DefaultImplementation.getBytes;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
import com.beust.jcommander.Parameters;
Expand All @@ -31,6 +31,7 @@
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
Expand Down Expand Up @@ -122,7 +123,9 @@ public class CmdConsume {
@Parameter(names = { "-st", "--schema-type"}, description = "Set a schema type on the consumer, it can be 'bytes' or 'auto_consume'")
private String schematype = "bytes";


@Parameter(names = { "-pm", "--pool-messages" }, description = "Use the pooled message")
private boolean poolMessages = true;

private ClientBuilder clientBuilder;
private Authentication authentication;
private String serviceURL;
Expand Down Expand Up @@ -171,6 +174,8 @@ private String interpretMessage(Message<?> message, boolean displayHex) throws I
} else if (value instanceof GenericRecord) {
Map<String, Object> asMap = genericRecordToMap((GenericRecord) value);
data = asMap.toString();
} else if (value instanceof ByteBuffer) {
data = new String(getBytes((ByteBuffer) value));
} else {
data = value.toString();
}
Expand Down Expand Up @@ -233,7 +238,7 @@ private int consume(String topic) {
try {
ConsumerBuilder<?> builder;
PulsarClient client = clientBuilder.build();
Schema<?> schema = Schema.BYTES;
Schema<?> schema = poolMessages ? Schema.BYTEBUFFER : Schema.BYTES;
if ("auto_consume".equals(schematype)) {
schema = Schema.AUTO_CONSUME();
} else if (!"bytes".equals(schematype)) {
Expand All @@ -243,7 +248,8 @@ private int consume(String topic) {
.subscriptionName(this.subscriptionName)
.subscriptionType(subscriptionType)
.subscriptionMode(subscriptionMode)
.subscriptionInitialPosition(subscriptionInitialPosition);
.subscriptionInitialPosition(subscriptionInitialPosition)
.poolMessages(poolMessages);

if (isRegex) {
builder.topicsPattern(Pattern.compile(topic));
Expand Down Expand Up @@ -275,15 +281,19 @@ private int consume(String topic) {
if (msg == null) {
LOG.debug("No message to consume after waiting for 5 seconds.");
} else {
numMessagesConsumed += 1;
if (!hideContent) {
System.out.println(MESSAGE_BOUNDARY);
String output = this.interpretMessage(msg, displayHex);
System.out.println(output);
} else if (numMessagesConsumed % 1000 == 0) {
System.out.println("Received " + numMessagesConsumed + " messages");
try {
numMessagesConsumed += 1;
if (!hideContent) {
System.out.println(MESSAGE_BOUNDARY);
String output = this.interpretMessage(msg, displayHex);
System.out.println(output);
} else if (numMessagesConsumed % 1000 == 0) {
System.out.println("Received " + numMessagesConsumed + " messages");
}
consumer.acknowledge(msg);
} finally {
msg.release();
}
consumer.acknowledge(msg);
}
}
client.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -908,23 +908,28 @@ protected boolean hasPendingBatchReceive() {
}

protected void increaseIncomingMessageSize(final Message<?> message) {
INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(
this, message.getData() == null ? 0 : message.getData().length);
INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, message.size());
}

protected void resetIncomingMessageSize() {
INCOMING_MESSAGES_SIZE_UPDATER.set(this, 0);
}

protected void decreaseIncomingMessageSize(final Message<?> message) {
INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this,
(message.getData() != null) ? -message.getData().length : 0);
INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, -message.size());
}

public long getIncomingMessageSize() {
return INCOMING_MESSAGES_SIZE_UPDATER.get(this);
}

protected void clearIncomingMessages() {
// release messages if they are pooled messages
incomingMessages.forEach(Message::release);
incomingMessages.clear();
resetIncomingMessageSize();
}

protected abstract void completeOpBatchReceive(OpBatchReceive<T> op);

private ExecutorService getExecutor(Message<T> msg) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -458,4 +458,9 @@ public ConsumerBuilder<T> expireTimeOfIncompleteChunkedMessage(long duration, Ti
return this;
}

@Override
public ConsumerBuilder<T> poolMessages(boolean poolMessages) {
conf.setPoolMessages(poolMessages);
return this;
}
}
Loading