Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove unnecessary locks #8207

Merged
merged 3 commits into from
Oct 12, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -613,8 +613,7 @@ protected boolean canEnqueueMessage(Message<T> message) {
}

protected boolean enqueueMessageAndCheckBatchReceive(Message<T> message) {
if (canEnqueueMessage(message)) {
incomingMessages.add(message);
if (canEnqueueMessage(message) && incomingMessages.offer(message)) {
INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(
this, message.getData() == null ? 0 : message.getData().length);
}
Expand Down Expand Up @@ -678,19 +677,16 @@ protected void notifyPendingBatchReceivedCallBack() {
if (opBatchReceive == null || opBatchReceive.future == null) {
return;
}
notifyPendingBatchReceivedCallBack(opBatchReceive);
synchronized (incomingMessages) {
notifyPendingBatchReceivedCallBack(opBatchReceive);
}
}

protected void notifyPendingBatchReceivedCallBack(OpBatchReceive<T> opBatchReceive) {
MessagesImpl<T> messages = getNewMessagesImpl();
Message<T> msgPeeked = incomingMessages.peek();
while (msgPeeked != null && messages.canAdd(msgPeeked)) {
Message<T> msg = null;
try {
msg = incomingMessages.poll(0L, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
// ignore
}
Message<T> msg = incomingMessages.poll();
if (msg != null) {
messageProcessed(msg);
Message<T> interceptMsg = beforeConsume(msg);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,52 +218,39 @@ private void receiveMessageFromConsumer(ConsumerImpl<T> consumer) {
// Process the message, add to the queue and trigger listener or async callback
messageReceived(consumer, message);

// we're modifying pausedConsumers
lock.writeLock().lock();
try {
int size = incomingMessages.size();
if (size >= maxReceiverQueueSize
|| (size > sharedQueueResumeThreshold && !pausedConsumers.isEmpty())) {
// mark this consumer to be resumed later: if No more space left in shared queue,
// or if any consumer is already paused (to create fair chance for already paused consumers)
pausedConsumers.add(consumer);
} else {
// Schedule next receiveAsync() if the incoming queue is not full. Use a different thread to avoid
// recursion and stack overflow
client.eventLoopGroup().execute(() -> {
receiveMessageFromConsumer(consumer);
});
}
} finally {
lock.writeLock().unlock();
int size = incomingMessages.size();
if (size >= maxReceiverQueueSize
|| (size > sharedQueueResumeThreshold && !pausedConsumers.isEmpty())) {
// mark this consumer to be resumed later: if No more space left in shared queue,
// or if any consumer is already paused (to create fair chance for already paused consumers)
pausedConsumers.add(consumer);
} else {
// Schedule next receiveAsync() if the incoming queue is not full. Use a different thread to avoid
// recursion and stack overflow
client.eventLoopGroup().execute(() -> {
receiveMessageFromConsumer(consumer);
});
}
});
}

private void messageReceived(ConsumerImpl<T> consumer, Message<T> message) {
checkArgument(message instanceof MessageImpl);
lock.writeLock().lock();
try {
TopicMessageImpl<T> topicMessage = new TopicMessageImpl<>(
TopicMessageImpl<T> topicMessage = new TopicMessageImpl<>(
consumer.getTopic(), consumer.getTopicNameWithoutPartition(), message);

if (log.isDebugEnabled()) {
log.debug("[{}][{}] Received message from topics-consumer {}",
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Received message from topics-consumer {}",
topic, subscription, message.getMessageId());
}
}

// if asyncReceive is waiting : return message to callback without adding to incomingMessages queue
if (!pendingReceives.isEmpty()) {
CompletableFuture<Message<T>> receivedFuture = pendingReceives.poll();
unAckedMessageTracker.add(topicMessage.getMessageId());
listenerExecutor.execute(() -> receivedFuture.complete(topicMessage));
} else if (enqueueMessageAndCheckBatchReceive(topicMessage)) {
if (hasPendingBatchReceive()) {
notifyPendingBatchReceivedCallBack();
}
}
} finally {
lock.writeLock().unlock();
// if asyncReceive is waiting : return message to callback without adding to incomingMessages queue
CompletableFuture<Message<T>> receivedFuture = pendingReceives.poll();
if (receivedFuture != null) {
unAckedMessageTracker.add(topicMessage.getMessageId());
listenerExecutor.execute(() -> receivedFuture.complete(topicMessage));
} else if (enqueueMessageAndCheckBatchReceive(topicMessage) && hasPendingBatchReceive()) {
notifyPendingBatchReceivedCallBack();
}

if (listener != null) {
Expand Down Expand Up @@ -304,23 +291,17 @@ protected synchronized void messageProcessed(Message<?> msg) {
}

private void resumeReceivingFromPausedConsumersIfNeeded() {
lock.readLock().lock();
try {
if (incomingMessages.size() <= sharedQueueResumeThreshold && !pausedConsumers.isEmpty()) {
while (true) {
ConsumerImpl<T> consumer = pausedConsumers.poll();
if (consumer == null) {
break;
}

// if messages are readily available on consumer we will attempt to writeLock on the same thread
client.eventLoopGroup().execute(() -> {
receiveMessageFromConsumer(consumer);
});
if (incomingMessages.size() <= sharedQueueResumeThreshold && !pausedConsumers.isEmpty()) {
while (true) {
ConsumerImpl<T> consumer = pausedConsumers.poll();
if (consumer == null) {
break;
}

client.eventLoopGroup().execute(() -> {
receiveMessageFromConsumer(consumer);
});
}
} finally {
lock.readLock().unlock();
}
}

Expand Down Expand Up @@ -405,26 +386,16 @@ protected CompletableFuture<Messages<T>> internalBatchReceiveAsync() {
@Override
protected CompletableFuture<Message<T>> internalReceiveAsync() {
CompletableFuture<Message<T>> result = new CompletableFuture<>();
Message<T> message;
try {
lock.writeLock().lock();
message = incomingMessages.poll(0, TimeUnit.SECONDS);
if (message == null) {
pendingReceives.add(result);
} else {
INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, -message.getData().length);
checkState(message instanceof TopicMessageImpl);
unAckedMessageTracker.add(message.getMessageId());
resumeReceivingFromPausedConsumersIfNeeded();
result.complete(message);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
result.completeExceptionally(new PulsarClientException(e));
} finally {
lock.writeLock().unlock();
Message<T> message = incomingMessages.poll();
if (message == null) {
pendingReceives.add(result);
} else {
INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, -message.getData().length);
checkState(message instanceof TopicMessageImpl);
unAckedMessageTracker.add(message.getMessageId());
resumeReceivingFromPausedConsumersIfNeeded();
result.complete(message);
}

return result;
}

Expand Down Expand Up @@ -592,14 +563,9 @@ public CompletableFuture<Void> closeAsync() {
}

private void failPendingReceive() {
lock.readLock().lock();
try {
if (listenerExecutor != null && !listenerExecutor.isShutdown()) {
failPendingReceives(pendingReceives);
failPendingBatchReceives(pendingBatchReceives);
}
} finally {
lock.readLock().unlock();
if (listenerExecutor != null && !listenerExecutor.isShutdown()) {
failPendingReceives(pendingReceives);
failPendingBatchReceives(pendingBatchReceives);
}
}

Expand Down