Remove unnecessary locks (apache#8207)
### Motivation
MultiTopicsConsumerImpl takes many unnecessary locks, which hurt performance.
BlockingQueue is inherently thread-safe, so most of these lock acquisitions are redundant.
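As a minimal standalone sketch (not Pulsar code) of why external locking is redundant here: a `BlockingQueue` can be shared between producer and consumer threads with no additional synchronization.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BlockingQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        // A bounded, thread-safe queue; no external lock is needed around it.
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(100);

        Thread producer = new Thread(() -> {
            for (int i = 0; i < 10_000; i++) {
                try {
                    queue.put(i); // blocks while the queue is full
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        });

        Thread consumer = new Thread(() -> {
            for (int i = 0; i < 10_000; i++) {
                try {
                    queue.take(); // blocks while the queue is empty
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        });

        producer.start();
        consumer.start();
        producer.join();
        consumer.join();
        System.out.println("10000 items transferred with no external lock");
    }
}
```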

### Modifications
Remove the unnecessary locks; only the batch-receive callback is guarded, by a dedicated ReentrantLock.
 
### Verifying this change
Use the pulsar-perf tool on three 8-core / 16 GB nodes; each recording run lasts about 2 minutes.

1. Prepare a 3-node Pulsar cluster and produce some data (topic with 4 partitions).
2. Use pulsar-perf on another machine:
3. `bin/pulsar-perf consume -u 'http://x.x.x.x:8080' -s my-sub-6 -sp Earliest -q 100000 persistent://public/default/p-topic`
4. Run the pressure test twice: first with the original client, then again after replacing pulsar-client-original.jar in the lib folder with the patched build.

Before removing the locks:
Aggregated throughput stats --- 11715556 records received --- 68813.420 msg/s --- 537.605 Mbit/s

After removing the locks:
Aggregated throughput stats --- 25062077 records received --- 161656.814 msg/s --- 1262.944 Mbit/s
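That works out to roughly a 2.3x throughput improvement (161,656 vs 68,813 msg/s).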
315157973 authored and merlimat committed Nov 23, 2020
1 parent e852e45 commit 1daf44c
Showing 2 changed files with 55 additions and 87 deletions.
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -31,6 +31,8 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import io.netty.util.Timeout;
import org.apache.pulsar.client.api.BatchReceivePolicy;
@@ -80,6 +82,7 @@ enum ConsumerType {
.newUpdater(ConsumerBase.class, "incomingMessagesSize");
protected volatile long incomingMessagesSize = 0;
protected volatile Timeout batchReceiveTimeout = null;
protected final Lock reentrantLock = new ReentrantLock();

protected ConsumerBase(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf,
int receiverQueueSize, ExecutorService listenerExecutor,
@@ -613,8 +616,7 @@ protected boolean canEnqueueMessage(Message<T> message) {
}

protected boolean enqueueMessageAndCheckBatchReceive(Message<T> message) {
if (canEnqueueMessage(message)) {
incomingMessages.add(message);
if (canEnqueueMessage(message) && incomingMessages.offer(message)) {
INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(
this, message.getData() == null ? 0 : message.getData().length);
}
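One detail worth noting in the hunk above: on a bounded queue, `add()` throws `IllegalStateException` when the queue is full, while `offer()` returns `false`, so the size counter is only updated when the message was actually enqueued. A tiny standalone illustration (not Pulsar code):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class OfferVsAdd {
    public static void main(String[] args) {
        BlockingQueue<String> q = new ArrayBlockingQueue<>(1);
        q.add("first");                       // succeeds: there is capacity
        boolean accepted = q.offer("second"); // full queue: returns false instead of throwing
        System.out.println("offer accepted: " + accepted);
    }
}
```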
@@ -678,19 +680,19 @@ protected void notifyPendingBatchReceivedCallBack() {
if (opBatchReceive == null || opBatchReceive.future == null) {
return;
}
notifyPendingBatchReceivedCallBack(opBatchReceive);
try {
reentrantLock.lock();
notifyPendingBatchReceivedCallBack(opBatchReceive);
} finally {
reentrantLock.unlock();
}
}

protected void notifyPendingBatchReceivedCallBack(OpBatchReceive<T> opBatchReceive) {
MessagesImpl<T> messages = getNewMessagesImpl();
Message<T> msgPeeked = incomingMessages.peek();
while (msgPeeked != null && messages.canAdd(msgPeeked)) {
Message<T> msg = null;
try {
msg = incomingMessages.poll(0L, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
// ignore
}
Message<T> msg = incomingMessages.poll();
if (msg != null) {
messageProcessed(msg);
Message<T> interceptMsg = beforeConsume(msg);
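Two notes on the hunk above: the non-blocking `poll()` is equivalent to the old `poll(0L, TimeUnit.MILLISECONDS)` but is declared without `InterruptedException`, so the empty catch block can be dropped. Also, the conventional `ReentrantLock` idiom calls `lock()` just before the `try` block rather than inside it, so that `unlock()` in `finally` can never run for a lock that was not acquired; with `ReentrantLock.lock()`, which does not throw, this is a style point rather than a bug.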
pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -218,52 +218,39 @@ private void receiveMessageFromConsumer(ConsumerImpl<T> consumer) {
// Process the message, add to the queue and trigger listener or async callback
messageReceived(consumer, message);

// we're modifying pausedConsumers
lock.writeLock().lock();
try {
int size = incomingMessages.size();
if (size >= maxReceiverQueueSize
|| (size > sharedQueueResumeThreshold && !pausedConsumers.isEmpty())) {
// mark this consumer to be resumed later: if No more space left in shared queue,
// or if any consumer is already paused (to create fair chance for already paused consumers)
pausedConsumers.add(consumer);
} else {
// Schedule next receiveAsync() if the incoming queue is not full. Use a different thread to avoid
// recursion and stack overflow
client.eventLoopGroup().execute(() -> {
receiveMessageFromConsumer(consumer);
});
}
} finally {
lock.writeLock().unlock();
int size = incomingMessages.size();
if (size >= maxReceiverQueueSize
|| (size > sharedQueueResumeThreshold && !pausedConsumers.isEmpty())) {
// mark this consumer to be resumed later: if No more space left in shared queue,
// or if any consumer is already paused (to create fair chance for already paused consumers)
pausedConsumers.add(consumer);
} else {
// Schedule next receiveAsync() if the incoming queue is not full. Use a different thread to avoid
// recursion and stack overflow
client.eventLoopGroup().execute(() -> {
receiveMessageFromConsumer(consumer);
});
}
});
}

private void messageReceived(ConsumerImpl<T> consumer, Message<T> message) {
checkArgument(message instanceof MessageImpl);
lock.writeLock().lock();
try {
TopicMessageImpl<T> topicMessage = new TopicMessageImpl<>(
TopicMessageImpl<T> topicMessage = new TopicMessageImpl<>(
consumer.getTopic(), consumer.getTopicNameWithoutPartition(), message);

if (log.isDebugEnabled()) {
log.debug("[{}][{}] Received message from topics-consumer {}",
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Received message from topics-consumer {}",
topic, subscription, message.getMessageId());
}
}

// if asyncReceive is waiting : return message to callback without adding to incomingMessages queue
if (!pendingReceives.isEmpty()) {
CompletableFuture<Message<T>> receivedFuture = pendingReceives.poll();
unAckedMessageTracker.add(topicMessage.getMessageId());
listenerExecutor.execute(() -> receivedFuture.complete(topicMessage));
} else if (enqueueMessageAndCheckBatchReceive(topicMessage)) {
if (hasPendingBatchReceive()) {
notifyPendingBatchReceivedCallBack();
}
}
} finally {
lock.writeLock().unlock();
// if asyncReceive is waiting : return message to callback without adding to incomingMessages queue
CompletableFuture<Message<T>> receivedFuture = pendingReceives.poll();
if (receivedFuture != null) {
unAckedMessageTracker.add(topicMessage.getMessageId());
listenerExecutor.execute(() -> receivedFuture.complete(topicMessage));
} else if (enqueueMessageAndCheckBatchReceive(topicMessage) && hasPendingBatchReceive()) {
notifyPendingBatchReceivedCallBack();
}

if (listener != null) {
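The rewrite above also replaces a check-then-act pair (`!pendingReceives.isEmpty()` followed by `poll()`) with a single `poll()` plus null check, which matters once the write lock no longer serializes callers. A minimal standalone sketch of the difference (hypothetical queue, not Pulsar code):

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class CheckThenActDemo {
    static final Queue<Runnable> pending = new ConcurrentLinkedQueue<>();

    // Racy without a lock: another thread can drain the queue between
    // isEmpty() and poll(), so poll() may return null despite the check.
    static void racy() {
        if (!pending.isEmpty()) {
            Runnable r = pending.poll();
            if (r != null) {
                r.run();
            }
        }
    }

    // Atomic: a single poll() both tests for and removes an element.
    static void atomic() {
        Runnable r = pending.poll();
        if (r != null) {
            r.run();
        }
    }

    public static void main(String[] args) {
        pending.add(() -> System.out.println("ran"));
        atomic();
    }
}
```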
@@ -304,23 +291,17 @@ protected synchronized void messageProcessed(Message<?> msg) {
}

private void resumeReceivingFromPausedConsumersIfNeeded() {
lock.readLock().lock();
try {
if (incomingMessages.size() <= sharedQueueResumeThreshold && !pausedConsumers.isEmpty()) {
while (true) {
ConsumerImpl<T> consumer = pausedConsumers.poll();
if (consumer == null) {
break;
}

// if messages are readily available on consumer we will attempt to writeLock on the same thread
client.eventLoopGroup().execute(() -> {
receiveMessageFromConsumer(consumer);
});
if (incomingMessages.size() <= sharedQueueResumeThreshold && !pausedConsumers.isEmpty()) {
while (true) {
ConsumerImpl<T> consumer = pausedConsumers.poll();
if (consumer == null) {
break;
}

client.eventLoopGroup().execute(() -> {
receiveMessageFromConsumer(consumer);
});
}
} finally {
lock.readLock().unlock();
}
}

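With the read lock gone, `resumeReceivingFromPausedConsumersIfNeeded()` may observe a stale queue size, but the race is benign: a consumer resumed too eagerly re-checks the size in `receiveMessageFromConsumer()` and simply pauses itself again.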
@@ -405,26 +386,16 @@ protected CompletableFuture<Messages<T>> internalBatchReceiveAsync() {
@Override
protected CompletableFuture<Message<T>> internalReceiveAsync() {
CompletableFuture<Message<T>> result = new CompletableFuture<>();
Message<T> message;
try {
lock.writeLock().lock();
message = incomingMessages.poll(0, TimeUnit.SECONDS);
if (message == null) {
pendingReceives.add(result);
} else {
INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, -message.getData().length);
checkState(message instanceof TopicMessageImpl);
unAckedMessageTracker.add(message.getMessageId());
resumeReceivingFromPausedConsumersIfNeeded();
result.complete(message);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
result.completeExceptionally(new PulsarClientException(e));
} finally {
lock.writeLock().unlock();
Message<T> message = incomingMessages.poll();
if (message == null) {
pendingReceives.add(result);
} else {
INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, -message.getData().length);
checkState(message instanceof TopicMessageImpl);
unAckedMessageTracker.add(message.getMessageId());
resumeReceivingFromPausedConsumersIfNeeded();
result.complete(message);
}

return result;
}

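Here the simplification mirrors `notifyPendingBatchReceivedCallBack`: the non-blocking `poll()` cannot be interrupted, so the `InterruptedException` branch disappears along with the write lock, and correctness now rests on `incomingMessages` and `pendingReceives` being thread-safe queues.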
@@ -592,14 +563,9 @@ public CompletableFuture<Void> closeAsync() {
}

private void failPendingReceive() {
lock.readLock().lock();
try {
if (listenerExecutor != null && !listenerExecutor.isShutdown()) {
failPendingReceives(pendingReceives);
failPendingBatchReceives(pendingBatchReceives);
}
} finally {
lock.readLock().unlock();
if (listenerExecutor != null && !listenerExecutor.isShutdown()) {
failPendingReceives(pendingReceives);
failPendingBatchReceives(pendingBatchReceives);
}
}
