diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java index d762c2b262558..db1b661d56453 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java @@ -31,6 +31,8 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLongFieldUpdater; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import io.netty.util.Timeout; import org.apache.pulsar.client.api.BatchReceivePolicy; @@ -80,6 +82,7 @@ enum ConsumerType { .newUpdater(ConsumerBase.class, "incomingMessagesSize"); protected volatile long incomingMessagesSize = 0; protected volatile Timeout batchReceiveTimeout = null; + protected final Lock reentrantLock = new ReentrantLock(); protected ConsumerBase(PulsarClientImpl client, String topic, ConsumerConfigurationData conf, int receiverQueueSize, ExecutorService listenerExecutor, @@ -613,8 +616,7 @@ protected boolean canEnqueueMessage(Message message) { } protected boolean enqueueMessageAndCheckBatchReceive(Message message) { - if (canEnqueueMessage(message)) { - incomingMessages.add(message); + if (canEnqueueMessage(message) && incomingMessages.offer(message)) { INCOMING_MESSAGES_SIZE_UPDATER.addAndGet( this, message.getData() == null ? 0 : message.getData().length); } @@ -678,19 +680,19 @@ protected void notifyPendingBatchReceivedCallBack() { if (opBatchReceive == null || opBatchReceive.future == null) { return; } - notifyPendingBatchReceivedCallBack(opBatchReceive); + try { + reentrantLock.lock(); + notifyPendingBatchReceivedCallBack(opBatchReceive); + } finally { + reentrantLock.unlock(); + } } protected void notifyPendingBatchReceivedCallBack(OpBatchReceive opBatchReceive) { MessagesImpl messages = getNewMessagesImpl(); Message msgPeeked = incomingMessages.peek(); while (msgPeeked != null && messages.canAdd(msgPeeked)) { - Message msg = null; - try { - msg = incomingMessages.poll(0L, TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - // ignore - } + Message msg = incomingMessages.poll(); if (msg != null) { messageProcessed(msg); Message interceptMsg = beforeConsume(msg); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index a61952c893ec7..727a912124d59 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -218,52 +218,39 @@ private void receiveMessageFromConsumer(ConsumerImpl consumer) { // Process the message, add to the queue and trigger listener or async callback messageReceived(consumer, message); - // we're modifying pausedConsumers - lock.writeLock().lock(); - try { - int size = incomingMessages.size(); - if (size >= maxReceiverQueueSize - || (size > sharedQueueResumeThreshold && !pausedConsumers.isEmpty())) { - // mark this consumer to be resumed later: if No more space left in shared queue, - // or if any consumer is already paused (to create fair chance for already paused consumers) - pausedConsumers.add(consumer); - } else { - // Schedule next receiveAsync() if the incoming queue is not full. Use a different thread to avoid - // recursion and stack overflow - client.eventLoopGroup().execute(() -> { - receiveMessageFromConsumer(consumer); - }); - } - } finally { - lock.writeLock().unlock(); + int size = incomingMessages.size(); + if (size >= maxReceiverQueueSize + || (size > sharedQueueResumeThreshold && !pausedConsumers.isEmpty())) { + // mark this consumer to be resumed later: if No more space left in shared queue, + // or if any consumer is already paused (to create fair chance for already paused consumers) + pausedConsumers.add(consumer); + } else { + // Schedule next receiveAsync() if the incoming queue is not full. Use a different thread to avoid + // recursion and stack overflow + client.eventLoopGroup().execute(() -> { + receiveMessageFromConsumer(consumer); + }); } }); } private void messageReceived(ConsumerImpl consumer, Message message) { checkArgument(message instanceof MessageImpl); - lock.writeLock().lock(); - try { - TopicMessageImpl topicMessage = new TopicMessageImpl<>( + TopicMessageImpl topicMessage = new TopicMessageImpl<>( consumer.getTopic(), consumer.getTopicNameWithoutPartition(), message); - if (log.isDebugEnabled()) { - log.debug("[{}][{}] Received message from topics-consumer {}", + if (log.isDebugEnabled()) { + log.debug("[{}][{}] Received message from topics-consumer {}", topic, subscription, message.getMessageId()); - } + } - // if asyncReceive is waiting : return message to callback without adding to incomingMessages queue - if (!pendingReceives.isEmpty()) { - CompletableFuture> receivedFuture = pendingReceives.poll(); - unAckedMessageTracker.add(topicMessage.getMessageId()); - listenerExecutor.execute(() -> receivedFuture.complete(topicMessage)); - } else if (enqueueMessageAndCheckBatchReceive(topicMessage)) { - if (hasPendingBatchReceive()) { - notifyPendingBatchReceivedCallBack(); - } - } - } finally { - lock.writeLock().unlock(); + // if asyncReceive is waiting : return message to callback without adding to incomingMessages queue + CompletableFuture> receivedFuture = pendingReceives.poll(); + if (receivedFuture != null) { + unAckedMessageTracker.add(topicMessage.getMessageId()); + listenerExecutor.execute(() -> receivedFuture.complete(topicMessage)); + } else if (enqueueMessageAndCheckBatchReceive(topicMessage) && hasPendingBatchReceive()) { + notifyPendingBatchReceivedCallBack(); } if (listener != null) { @@ -304,23 +291,17 @@ protected synchronized void messageProcessed(Message msg) { } private void resumeReceivingFromPausedConsumersIfNeeded() { - lock.readLock().lock(); - try { - if (incomingMessages.size() <= sharedQueueResumeThreshold && !pausedConsumers.isEmpty()) { - while (true) { - ConsumerImpl consumer = pausedConsumers.poll(); - if (consumer == null) { - break; - } - - // if messages are readily available on consumer we will attempt to writeLock on the same thread - client.eventLoopGroup().execute(() -> { - receiveMessageFromConsumer(consumer); - }); + if (incomingMessages.size() <= sharedQueueResumeThreshold && !pausedConsumers.isEmpty()) { + while (true) { + ConsumerImpl consumer = pausedConsumers.poll(); + if (consumer == null) { + break; } + + client.eventLoopGroup().execute(() -> { + receiveMessageFromConsumer(consumer); + }); } - } finally { - lock.readLock().unlock(); } } @@ -405,26 +386,16 @@ protected CompletableFuture> internalBatchReceiveAsync() { @Override protected CompletableFuture> internalReceiveAsync() { CompletableFuture> result = new CompletableFuture<>(); - Message message; - try { - lock.writeLock().lock(); - message = incomingMessages.poll(0, TimeUnit.SECONDS); - if (message == null) { - pendingReceives.add(result); - } else { - INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, -message.getData().length); - checkState(message instanceof TopicMessageImpl); - unAckedMessageTracker.add(message.getMessageId()); - resumeReceivingFromPausedConsumersIfNeeded(); - result.complete(message); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - result.completeExceptionally(new PulsarClientException(e)); - } finally { - lock.writeLock().unlock(); + Message message = incomingMessages.poll(); + if (message == null) { + pendingReceives.add(result); + } else { + INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, -message.getData().length); + checkState(message instanceof TopicMessageImpl); + unAckedMessageTracker.add(message.getMessageId()); + resumeReceivingFromPausedConsumersIfNeeded(); + result.complete(message); } - return result; } @@ -592,14 +563,9 @@ public CompletableFuture closeAsync() { } private void failPendingReceive() { - lock.readLock().lock(); - try { - if (listenerExecutor != null && !listenerExecutor.isShutdown()) { - failPendingReceives(pendingReceives); - failPendingBatchReceives(pendingBatchReceives); - } - } finally { - lock.readLock().unlock(); + if (listenerExecutor != null && !listenerExecutor.isShutdown()) { + failPendingReceives(pendingReceives); + failPendingBatchReceives(pendingBatchReceives); } }